Skip to content

main

AIOrchestrator

This is the AI Orchestrator

We pull the task from the API endpoint and then, based on the type of the task, send it to the respective handler.

Source code in Agent/main.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
class AIOrchestrator:
    """
    This is the AI Orchestrator

    It pulls tasks from the API and, based on the task name, sends each one
    to the respective handler. Handler objects are created lazily, the first
    time a task of their kind arrives.
    """

    def __init__(
        self,
        api_domain: str,
        token: str,
        task_name: Optional[str] = "all",
        time_sleep: Optional[float] = 1.5,
    ):
        """
        Initialize the AI Orchestrator
        Args:
            api_domain (str): The API Domain
            token (str): The API Token
            task_name (str): The task name. Default is "all"
            time_sleep (float): Seconds to sleep between polling iterations.
                Default is 1.5

        Raises:
            Exception: If the token is rejected by ``authenticate_token`` or
                ``pre_env_check`` reports a missing environment variable.
        """
        self.uuid = str(uuid.uuid4())
        self.api_domain = api_domain
        self.token = token
        self.task_name = task_name
        self.api = API(
            domain=api_domain, token=token, task_name=task_name, uuid=self.uuid
        )
        # NOTE(review): the worker is registered before the token is verified
        # below; kept in this order to preserve existing behavior.
        self.api.register_or_update_worker()
        self.storage_solution = self.api.get_storage_solution()
        # controller
        self.counter = 0
        self.time_sleep = time_sleep

        # check whether the token is valid
        if not self.authenticate_token():
            raise Exception("Token is not valid")

        if not self.pre_env_check():
            raise Exception("Pre Environment Check Failed")

        # handler instances, created on first use by the handle_* methods
        self.speech2text = None
        self.text2speech = None
        self.emotion_detection = None
        self.quantization_llm = None
        self.hf_llm = None
        self.general_ml = None
        self.openai_handler = None
        self.rag_handler = None

        # task name -> bound method that processes tasks of that name
        self.task_name_router = {
            TaskName.speech2text.value: self.handle_speech2text_task,
            TaskName.text2speech.value: self.handle_text2speech_task,
            TaskName.emotion_detection.value: self.handle_emotion_detection_task,
            TaskName.quantization_llm.value: self.handle_quantization_llm_task,
            TaskName.hf_llm.value: self.handle_hf_llm_task,
            TaskName.general_ml.value: self.handle_general_ml_task,
            TaskName.openai_gpt4o.value: self.handle_openai_task,
            TaskName.openai_speech2text.value: self.handle_openai_task,
            TaskName.openai_text2speech.value: self.handle_openai_task,
            TaskName.openai_gpt4o_text_only.value: self.handle_openai_task,
            TaskName.openai_gpt_4o_text_and_image.value: self.handle_openai_task,
            TaskName.rag.value: self.handle_rag_task,
        }

    def authenticate_token(self):
        """
        Authenticate the token
        Returns:
            bool: The result of ``self.api.verify_token()``; True if the
                token is valid
        """
        return self.api.verify_token()

    def pre_env_check(self):
        """
        Check that the environment variables required by the task are set.

        ``load_dotenv()`` is called first. ``OPENAI_API_KEY`` is required when
        ``task_name`` is "all" or "text2speech"; ``HF_TOKEN`` is required when
        ``task_name`` is "all" or "hf_llm".

        Returns:
            bool: False if a required variable is missing, True otherwise
        """
        load_dotenv()
        if self.task_name in ("all", "text2speech"):
            if os.getenv("OPENAI_API_KEY") is None:
                logger.error("OpenAI API Key is not set")
                return False
        if self.task_name in ("all", "hf_llm"):
            if os.getenv("HF_TOKEN") is None:
                logger.error("HF_TOKEN is not set")
                return False
        return True

    def run(self):
        """
        Poll the API for tasks until interrupted from the keyboard.

        Every iteration increments ``self.counter``, runs one polling step and
        then sleeps for ``self.time_sleep``. Any ``Exception`` raised by a
        step (including the periodic heartbeat) is logged and the loop
        continues. A ``KeyboardInterrupt`` raised anywhere in the loop, the
        sleep included, is logged and ends the loop.
        """
        logger.info(f"AI Worker Running UUID: {self.uuid}")
        try:
            while True:
                self.counter += 1
                try:
                    self._poll_once()
                except Exception as e:
                    # KeyboardInterrupt is not an Exception subclass, so it
                    # passes through to the outer handler.
                    logger.exception(e)
                time.sleep(self.time_sleep)
        # allow it accepts keyboard interrupt
        except KeyboardInterrupt:
            logger.info("Keyboard Interrupt")

    def _poll_once(self):
        """Run one polling step: periodic heartbeat, fetch a task, handle it."""
        if self.counter % 50 == 0:
            # report to the cloud that we are still alive
            logger.info(f"Still alive. Counter: {self.counter}")
            self.api.register_or_update_worker()
        with timer(logger=logger, message="get_task"):
            task = self.api.get_task()
        if task is None:
            logger.info("No task found")
            return
        self.handle_task(task)

    def handle_task(self, task: dict):
        """
        Handle the task

        The payload is turned into a ``Task``, dispatched by task name and the
        resulting task is posted back through the API. Task names missing from
        the router but containing "openai" go to the OpenAI handler; any other
        unknown name marks the task as failed. If the handler raises, the
        error is logged and the task is marked as failed so that the result is
        still posted.

        Args:
            task (dict): Keyword arguments used to build the ``Task``
        """
        task_obj = Task(**task)
        TimeLogger.log_task(task_obj, "start_task")
        handler = self.task_name_router.get(task_obj.task_name)
        if handler is None and "openai" in task_obj.task_name:
            handler = self.handle_openai_task
        if handler is None:
            logger.error(f"Unknown task type: {task_obj.task_name}")
            task_obj.result_status = ResultStatus.failed.value
            task_obj.description = f"Unknown task type: {task_obj.task_name}"
        else:
            try:
                task_obj = handler(task_obj)
            except Exception as e:
                # task_obj still refers to the task as it was passed in;
                # report the failure rather than leaving it unreported.
                logger.exception(e)
                task_obj.result_status = ResultStatus.failed.value
                task_obj.description = f"Task failed with error: {e}"
        TimeLogger.log_task(task_obj, "end_task")
        # then update the task status
        self.api.post_task_result(task_obj)

    def handle_speech2text_task(self, task: Task):
        """
        Handle the speech2text task
        Args:
            task (Task): The task

        Returns:
            The value returned by ``Speech2Text.handle_task``
        """
        if self.speech2text is None:
            # created on first use
            self.speech2text = Speech2Text()
        return self.speech2text.handle_task(task)

    def handle_text2speech_task(self, task: Task):
        """
        Handle the text2speech task
        Args:
            task (Task): The task

        Returns:
            The value returned by ``Text2Speech.handle_task``
        """
        if self.text2speech is None:
            # created on first use
            self.text2speech = Text2Speech()
        return self.text2speech.handle_task(task)

    def handle_emotion_detection_task(self, task: Task):
        """
        Handle the emotion detection task
        Args:
            task (Task): The task

        Returns:
            The value returned by ``EmotionDetectionHandler.handle_task``
        """
        if self.emotion_detection is None:
            # created on first use
            self.emotion_detection = EmotionDetectionHandler()
        return self.emotion_detection.handle_task(task)

    def handle_quantization_llm_task(self, task: Task):
        """
        Handle the quantization llm task
        Args:
            task (Task): The task

        Returns:
            The value returned by ``QuantizationLLM.handle_task``
        """
        if self.quantization_llm is None:
            # created on first use; this handler is given the API client
            self.quantization_llm = QuantizationLLM(api=self.api)
        return self.quantization_llm.handle_task(task)

    def handle_hf_llm_task(self, task: Task):
        """
        Handle the hf llm task, which requires more time compared to other tasks
        Args:
            task (Task): The task

        Returns:
            The value returned by ``HFLLM.handle_task``
        """
        if self.hf_llm is None:
            # created on first use
            self.hf_llm = HFLLM()
        return self.hf_llm.handle_task(task)

    def handle_general_ml_task(self, task: Task):
        """
        Handle the general ml task
        Args:
            task (Task): The task

        Returns:
            The value returned by ``GeneralMLModel.handle_task``
        """
        if self.general_ml is None:
            # created on first use
            self.general_ml = GeneralMLModel()
        return self.general_ml.handle_task(task)

    def handle_openai_task(self, task: Task):
        """
        Handle the openai task
        Args:
            task (Task): The task

        Returns:
            The value returned by ``OpenAIHandler.handle_task``
        """
        if self.openai_handler is None:
            # created on first use
            self.openai_handler = OpenAIHandler()
        return self.openai_handler.handle_task(task)

    def handle_rag_task(self, task: Task):
        """
        Handle the rag task
        Args:
            task (Task): The task

        Returns:
            The value returned by ``RAGHandler.handle_task``
        """
        if self.rag_handler is None:
            # created on first use
            self.rag_handler = RAGHandler()
        return self.rag_handler.handle_task(task)

__init__(api_domain, token, task_name='all', time_sleep=1.5)

Initialize the AI Orchestrator. Args: api_domain (str): The API domain. token (str): The API token. task_name (str): The task name; default is "all". time_sleep (float): The time to sleep during each loop; default is 1.5.

Source code in Agent/main.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def __init__(
    self,
    api_domain: str,
    token: str,
    task_name: Optional[str] = "all",
    time_sleep: Optional[float] = 1.5,
):
    """
    Set up the worker: connect to the API, validate, prepare the routing table.

    Args:
        api_domain (str): The API Domain
        token (str): The API Token
        task_name (str): The task name. Default is "all"
        time_sleep (float): The time to sleep during each loop. Default is 1.5

    Raises:
        Exception: If ``authenticate_token`` or ``pre_env_check`` returns a
            falsy value.
    """
    # identity and connection settings
    worker_id = str(uuid.uuid4())
    self.uuid = worker_id
    self.api_domain = api_domain
    self.token = token
    self.task_name = task_name
    self.api = API(
        domain=api_domain, token=token, task_name=task_name, uuid=worker_id
    )
    self.api.register_or_update_worker()
    self.storage_solution = self.api.get_storage_solution()

    # loop bookkeeping
    self.counter = 0
    self.time_sleep = time_sleep

    # validation: token first, then required environment variables
    token_ok = self.authenticate_token()
    if not token_ok:
        raise Exception("Token is not valid")

    env_ok = self.pre_env_check()
    if not env_ok:
        raise Exception("Pre Environment Check Failed")

    # handler instances stay empty until the matching handle_* method runs
    self.speech2text = None
    self.text2speech = None
    self.emotion_detection = None
    self.quantization_llm = None
    self.hf_llm = None
    self.general_ml = None
    self.openai_handler = None
    self.rag_handler = None

    # every OpenAI-flavoured task name shares the same entry point
    openai_route = self.handle_openai_task
    self.task_name_router = {
        TaskName.speech2text.value: self.handle_speech2text_task,
        TaskName.text2speech.value: self.handle_text2speech_task,
        TaskName.emotion_detection.value: self.handle_emotion_detection_task,
        TaskName.quantization_llm.value: self.handle_quantization_llm_task,
        TaskName.hf_llm.value: self.handle_hf_llm_task,
        TaskName.general_ml.value: self.handle_general_ml_task,
        TaskName.openai_gpt4o.value: openai_route,
        TaskName.openai_speech2text.value: openai_route,
        TaskName.openai_text2speech.value: openai_route,
        TaskName.openai_gpt4o_text_only.value: openai_route,
        TaskName.openai_gpt_4o_text_and_image.value: openai_route,
        TaskName.rag.value: self.handle_rag_task,
    }

authenticate_token()

Authenticate the token. Returns: bool: True if the token is valid.

Source code in Agent/main.py
 97
 98
 99
100
101
102
103
def authenticate_token(self):
    """
    Ask the API client whether the configured token is valid.

    Returns:
        bool: Whatever ``self.api.verify_token()`` returns; True if the
            token is valid
    """
    api_client = self.api
    return api_client.verify_token()

handle_emotion_detection_task(task)

Handle the emotion detection task Args: task (Task): The task

Source code in Agent/main.py
191
192
193
194
195
196
197
198
199
200
def handle_emotion_detection_task(self, task: Task):
    """
    Run an emotion detection task through its handler.

    The ``EmotionDetectionHandler`` is created the first time this method is
    called and reused afterwards.

    Args:
        task (Task): The task

    Returns:
        The value returned by the handler's ``handle_task``
    """
    handler = self.emotion_detection
    if handler is None:
        handler = EmotionDetectionHandler()
        self.emotion_detection = handler
    return handler.handle_task(task)

handle_general_ml_task(task)

Handle the general ml task Args: task (Task): The task

Returns:

Source code in Agent/main.py
227
228
229
230
231
232
233
234
235
236
237
238
239
def handle_general_ml_task(self, task: Task):
    """
    Run a general ml task through its handler.

    The ``GeneralMLModel`` is created the first time this method is called
    and reused afterwards.

    Args:
        task (Task): The task

    Returns:
        The value returned by the handler's ``handle_task``
    """
    handler = self.general_ml
    if handler is None:
        handler = GeneralMLModel()
        self.general_ml = handler
    return handler.handle_task(task)

handle_hf_llm_task(task)

Handle the hf llm task, which requires more time compared to other tasks. Args: task (Task): The task

Returns:

Source code in Agent/main.py
213
214
215
216
217
218
219
220
221
222
223
224
225
def handle_hf_llm_task(self, task: Task):
    """
    Run an hf llm task, which requires more time compared to other tasks.

    The ``HFLLM`` handler is created the first time this method is called
    and reused afterwards.

    Args:
        task (Task): The task

    Returns:
        The value returned by the handler's ``handle_task``
    """
    handler = self.hf_llm
    if handler is None:
        handler = HFLLM()
        self.hf_llm = handler
    return handler.handle_task(task)

handle_openai_task(task)

Handle the openai task Args: task (Task): The task

Returns:

Source code in Agent/main.py
241
242
243
244
245
246
247
248
249
250
251
252
253
def handle_openai_task(self, task: Task):
    """
    Run an openai task through its handler.

    The ``OpenAIHandler`` is created the first time this method is called
    and reused afterwards.

    Args:
        task (Task): The task

    Returns:
        The value returned by the handler's ``handle_task``
    """
    handler = self.openai_handler
    if handler is None:
        handler = OpenAIHandler()
        self.openai_handler = handler
    return handler.handle_task(task)

handle_quantization_llm_task(task)

Handle the quantization llm task Args: task (Task): The task

Source code in Agent/main.py
202
203
204
205
206
207
208
209
210
211
def handle_quantization_llm_task(self, task: Task):
    """
    Run a quantization llm task through its handler.

    The ``QuantizationLLM`` handler is created the first time this method is
    called, receiving ``self.api``, and is reused afterwards.

    Args:
        task (Task): The task

    Returns:
        The value returned by the handler's ``handle_task``
    """
    handler = self.quantization_llm
    if handler is None:
        handler = QuantizationLLM(api=self.api)
        self.quantization_llm = handler
    return handler.handle_task(task)

handle_rag_task(task)

Handle the rag task Args: task (Task): The task

Returns:

Source code in Agent/main.py
255
256
257
258
259
260
261
262
263
264
265
266
267
def handle_rag_task(self, task: Task):
    """
    Run a rag task through its handler.

    The ``RAGHandler`` is created the first time this method is called and
    reused afterwards.

    Args:
        task (Task): The task

    Returns:
        The value returned by the handler's ``handle_task``
    """
    handler = self.rag_handler
    if handler is None:
        handler = RAGHandler()
        self.rag_handler = handler
    return handler.handle_task(task)

handle_speech2text_task(task)

Handle the speech2text task Args: task (Task): The task

Source code in Agent/main.py
169
170
171
172
173
174
175
176
177
178
def handle_speech2text_task(self, task: Task):
    """
    Run a speech2text task through its handler.

    The ``Speech2Text`` handler is created the first time this method is
    called and reused afterwards.

    Args:
        task (Task): The task

    Returns:
        The value returned by the handler's ``handle_task``
    """
    handler = self.speech2text
    if handler is None:
        handler = Speech2Text()
        self.speech2text = handler
    return handler.handle_task(task)

handle_task(task)

Handle the task Args: task (dict): The task

Source code in Agent/main.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def handle_task(self, task: dict):
    """
    Handle the task

    The payload is turned into a ``Task``, dispatched by task name and the
    resulting task is posted back through the API. Task names missing from
    the router but containing "openai" go to the OpenAI handler; any other
    unknown name marks the task as failed. If the handler raises, the error
    is logged and the task is marked as failed so that the result is still
    posted.

    Args:
        task (dict): Keyword arguments used to build the ``Task``
    """
    task_obj = Task(**task)
    TimeLogger.log_task(task_obj, "start_task")
    handler = self.task_name_router.get(task_obj.task_name)
    if handler is None and "openai" in task_obj.task_name:
        handler = self.handle_openai_task
    if handler is None:
        logger.error(f"Unknown task type: {task_obj.task_name}")
        task_obj.result_status = ResultStatus.failed.value
        task_obj.description = f"Unknown task type: {task_obj.task_name}"
    else:
        try:
            task_obj = handler(task_obj)
        except Exception as e:
            # task_obj still refers to the task as it was passed in; report
            # the failure rather than leaving it unreported.
            logger.exception(e)
            task_obj.result_status = ResultStatus.failed.value
            task_obj.description = f"Task failed with error: {e}"
    TimeLogger.log_task(task_obj, "end_task")
    # then update the task status
    self.api.post_task_result(task_obj)

handle_text2speech_task(task)

Handle the text2speech task Args: task (Task): The task

Source code in Agent/main.py
180
181
182
183
184
185
186
187
188
189
def handle_text2speech_task(self, task: Task):
    """
    Run a text2speech task through its handler.

    The ``Text2Speech`` handler is created the first time this method is
    called and reused afterwards.

    Args:
        task (Task): The task

    Returns:
        The value returned by the handler's ``handle_task``
    """
    handler = self.text2speech
    if handler is None:
        handler = Text2Speech()
        self.text2speech = handler
    return handler.handle_task(task)