Skip to content

Manager

This module defines a list of clusters.

Each cluster will have a list of chain components

For example, the end-to-end conversation chain has the following components:

  • completed_speech2text
  • created_data_text
  • completed_emotion_detection
  • completed_quantization_llm
  • completed_text2speech

ClusterManager

Source code in API/orchestrator/chain/manager.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
class ClusterManager:
    """
    Resolve and trigger the next component of a cluster's processing chain.

    A cluster is looked up by name in the module-level ``CLUSTERS`` mapping.
    It maps component names to configuration dicts; each configuration
    carries an ``"order"`` value that is used to sequence the chain.
    """

    @staticmethod
    def get_cluster(cluster_name: str):
        """
        Get the cluster

        Args:
            cluster_name (str): The cluster name

        Return:
            The entry stored in CLUSTERS under cluster_name, or None if there
            is no such entry
        """
        if cluster_name in CLUSTERS:
            return CLUSTERS[cluster_name]
        return None

    @staticmethod
    def get_next_chain_component(
        cluster: dict, current_component: str
    ) -> Tuple[Optional[str], Optional[dict]]:
        """
        Get the next chain component

        Components are ordered by the "order" value of their configuration.

        Args:
            cluster (dict): The cluster, mapping component names to their configuration
            current_component (str): The current component, or "init" to request
                the first component of the chain

        Return:
            Tuple[Optional[str], Optional[dict]]: The next component and its parameters if exists,
                otherwise (None, None). An empty cluster also yields (None, None) for "init".

        Raises:
            ValueError: If current_component is not "init" and is not a key of cluster
        """
        # Component names sorted by their configured position in the chain
        chain = sorted(cluster, key=lambda component: cluster[component]["order"])
        if current_component == "init":
            # Start of the chain: hand back the first component, if there is one
            if not chain:
                return None, None
            return chain[0], cluster[chain[0]]
        # Position right after the current component
        next_index = chain.index(current_component) + 1
        if next_index >= len(chain):
            # The current component was the last one
            return None, None
        return chain[next_index], cluster[chain[next_index]]

    @classmethod
    def get_next(
        cls, cluster_name: str, current_component: str
    ) -> Optional[Tuple[Optional[str], Optional[dict]]]:
        """
        Get the next component

        Args:
            cluster_name (str): The cluster name
            current_component (str): The current component

        Return:
            None (a bare None, not a tuple) when the cluster name is unknown,
            otherwise the (name, parameters) tuple produced by
            get_next_chain_component
        """
        cluster = cls.get_cluster(cluster_name)
        if cluster is None:
            return None
        return ClusterManager.get_next_chain_component(cluster, current_component)

    @classmethod
    def chain_next(
        cls,
        track_id: Optional[str],
        current_component: str,
        next_component_params: dict,
        name: Optional[str] = None,
        user=None,
    ):
        """
        Chain to the next component

        The cluster name is read from the second "-"-separated field of
        track_id. If the next component's "component_type" is "task", a task
        is created through Task.create_task; if it is "signal" and the
        component is "created_data_text", created_data_text.send is called.

        Args:
            current_component (str): The current component
            track_id (str): The track ID
            next_component_params (dict): The next component parameters
            name (str): The task name, it will be used to aggregate the task
            user (None): The user

        Return:
            The ID of the created task when the next component is a task,
            otherwise None (end of chain, signal component, unknown cluster
            or a track_id from which no cluster name can be derived)
        """
        logger.info(f"Current component: {current_component}")
        logger.info(f"Next component params: {next_component_params}")

        # The cluster name is the second "-"-separated field of the track ID;
        # a missing or malformed track ID means the chain cannot be resolved.
        track_parts = track_id.split("-") if track_id else []
        if len(track_parts) < 2:
            logger.warning(f"Cannot derive cluster name from track_id: {track_id!r}")
            return None
        cluster_name = track_parts[1]

        # get_next returns a bare None (not a tuple) for an unknown cluster,
        # so it must be checked before unpacking.
        next_step = cls.get_next(cluster_name, current_component)
        if next_step is None:
            logger.warning(f"Unknown cluster: {cluster_name}")
            return None
        next_component_name, next_component = next_step
        logger.info(f"Next component: {next_component_name}")

        if next_component_name is None:
            # End of the chain, nothing left to trigger
            return None

        # The component's own extra_params take precedence over caller params
        next_parameters = {
            **next_component_params,
            **next_component.get("extra_params", {}),
        }
        logger.info(next_parameters)
        logger.info(next_component_name)

        component_type = next_component["component_type"]
        if component_type == "task":
            task = Task.create_task(
                user=user,
                name=name or next_component["task_name"],
                task_name=next_component["task_name"],
                parameters=next_parameters,
                track_id=track_id,
            )
            logger.info(f"Task {task.id} created for {next_component['task_name']}")
            return task.id
        if component_type == "signal":
            if next_component_name == "created_data_text":
                created_data_text.send(
                    sender=next_component_params.get("sender"),
                    data=next_component_params.get("data"),
                    track_id=track_id,
                    user=user,
                )
            else:
                # Make an unhandled signal visible instead of dropping it silently
                logger.warning(f"No signal handler for component: {next_component_name}")
        else:
            logger.warning(f"Unknown component type: {component_type}")
        return None

chain_next(track_id, current_component, next_component_params, name=None, user=None) classmethod

Chain to the next component

Parameters:

Name Type Description Default
current_component str

The current component

required
track_id str

The track ID

required
next_component_params dict

The next component parameters

required
name str

The task name, it will be used to aggregate the task

None
user None

The user

None
Source code in API/orchestrator/chain/manager.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
@classmethod
def chain_next(
    cls,
    track_id: Optional[str],
    current_component: str,
    next_component_params: dict,
    name: Optional[str] = None,
    user=None,
):
    """
    Chain to the next component

    The cluster name is read from the second "-"-separated field of
    track_id. If the next component's "component_type" is "task", a task
    is created through Task.create_task; if it is "signal" and the
    component is "created_data_text", created_data_text.send is called.

    Args:
        current_component (str): The current component
        track_id (str): The track ID
        next_component_params (dict): The next component parameters
        name (str): The task name, it will be used to aggregate the task
        user (None): The user

    Return:
        The ID of the created task when the next component is a task,
        otherwise None (end of chain, signal component, unknown cluster
        or a track_id from which no cluster name can be derived)
    """
    logger.info(f"Current component: {current_component}")
    logger.info(f"Next component params: {next_component_params}")

    # The cluster name is the second "-"-separated field of the track ID;
    # a missing or malformed track ID means the chain cannot be resolved.
    track_parts = track_id.split("-") if track_id else []
    if len(track_parts) < 2:
        logger.warning(f"Cannot derive cluster name from track_id: {track_id!r}")
        return None
    cluster_name = track_parts[1]

    # get_next may return a bare None (not a tuple) for an unknown cluster,
    # so it must be checked before unpacking.
    next_step = cls.get_next(cluster_name, current_component)
    if next_step is None:
        logger.warning(f"Unknown cluster: {cluster_name}")
        return None
    next_component_name, next_component = next_step
    logger.info(f"Next component: {next_component_name}")

    if next_component_name is None:
        # End of the chain, nothing left to trigger
        return None

    # The component's own extra_params take precedence over caller params
    next_parameters = {
        **next_component_params,
        **next_component.get("extra_params", {}),
    }
    logger.info(next_parameters)
    logger.info(next_component_name)

    component_type = next_component["component_type"]
    if component_type == "task":
        task = Task.create_task(
            user=user,
            name=name or next_component["task_name"],
            task_name=next_component["task_name"],
            parameters=next_parameters,
            track_id=track_id,
        )
        logger.info(f"Task {task.id} created for {next_component['task_name']}")
        return task.id
    if component_type == "signal":
        if next_component_name == "created_data_text":
            created_data_text.send(
                sender=next_component_params.get("sender"),
                data=next_component_params.get("data"),
                track_id=track_id,
                user=user,
            )
        else:
            # Make an unhandled signal visible instead of dropping it silently
            logger.warning(f"No signal handler for component: {next_component_name}")
    else:
        logger.warning(f"Unknown component type: {component_type}")
    return None

get_cluster(cluster_name) staticmethod

Get the cluster

Parameters:

Name Type Description Default
cluster_name str

The cluster name

required
Source code in API/orchestrator/chain/manager.py
28
29
30
31
32
33
34
35
36
37
38
@staticmethod
def get_cluster(cluster_name: str):
    """
    Look up a cluster by name in CLUSTERS

    Args:
        cluster_name (str): The cluster name

    Return:
        The entry stored in CLUSTERS under cluster_name, or None when
        CLUSTERS has no such entry
    """
    return CLUSTERS[cluster_name] if cluster_name in CLUSTERS else None

get_next(cluster_name, current_component) classmethod

Get the next component

Parameters:

Name Type Description Default
cluster_name str

The cluster name

required
current_component str

The current component

required
Source code in API/orchestrator/chain/manager.py
70
71
72
73
74
75
76
77
78
79
80
81
82
@classmethod
def get_next(cls, cluster_name: str, current_component: str):
    """
    Resolve the component that follows current_component in a named cluster

    Args:
        cluster_name (str): The cluster name
        current_component (str): The current component

    Return:
        A bare None (not a tuple) when cls.get_cluster finds no cluster for
        cluster_name, otherwise whatever
        ClusterManager.get_next_chain_component returns for that cluster
    """
    found_cluster = cls.get_cluster(cluster_name)
    if found_cluster is not None:
        return ClusterManager.get_next_chain_component(
            found_cluster, current_component
        )
    return None

get_next_chain_component(cluster, current_component) staticmethod

Get the next chain component

Parameters:

Name Type Description Default
cluster dict

The cluster

required
current_component str

The current component

required
Return

Tuple[Optional[str], Optional[dict]]: The next component and its parameters if exists, otherwise None

Source code in API/orchestrator/chain/manager.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@staticmethod
def get_next_chain_component(
    cluster: dict, current_component: str
) -> Tuple[Optional[str], Optional[dict]]:
    """
    Get the next chain component

    Components are ordered by the "order" value of their configuration.

    Args:
        cluster (dict): The cluster, mapping component names to their configuration
        current_component (str): The current component, or "init" to request
            the first component of the chain

    Return:
        Tuple[Optional[str], Optional[dict]]: The next component and its parameters if exists,
            otherwise (None, None). An empty cluster also yields (None, None) for "init".

    Raises:
        ValueError: If current_component is not "init" and is not a key of cluster
    """
    # Component names sorted by their configured position in the chain
    chain = sorted(cluster, key=lambda component: cluster[component]["order"])
    if current_component == "init":
        # Start of the chain: hand back the first component, if there is one
        if not chain:
            return None, None
        return chain[0], cluster[chain[0]]
    # Position right after the current component
    next_index = chain.index(current_component) + 1
    if next_index >= len(chain):
        # The current component was the last one
        return None, None
    return chain[next_index], cluster[chain[next_index]]