LatencyBenchmark

For each component, we generally record three values:
- model_latency: the time taken by the model to process the data
- transfer_latency: the time taken to transfer the data to the model
- overall_latency: the total time for the component, covering both model processing and data transfer

The whole pipeline latency is the sum of each component's overall latency, i.e. the span between its start and end timestamps.

Another way to present the performance is a timeline: the start is set to 0, each important time point is expressed as its average time relative to that start, and the points are plotted on the timeline.
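
To make these three values concrete, here is a minimal sketch of how a single component's latency_profile (the structure read by process_task_group below) yields them. The key prefixes follow the source; the component names and numbers are hypothetical.

from datetime import datetime

# Hypothetical latency_profile for one task: "model*" and "transfer*" keys are
# summed, and ts_start_task / ts_end_task bound the overall latency.
latency_profile = {
    "model_stt": "0.42",
    "transfer_audio_upload": "0.10",
    "ts_start_task": "2024-07-01T14:58:35.100000",
    "ts_end_task": "2024-07-01T14:58:36.419352",
}

model_latency = sum(float(v) for k, v in latency_profile.items() if k.startswith("model"))
transfer_latency = sum(float(v) for k, v in latency_profile.items() if k.startswith("transfer"))
overall_latency = (
    datetime.fromisoformat(latency_profile["ts_end_task"])
    - datetime.fromisoformat(latency_profile["ts_start_task"])
).total_seconds()

print(model_latency, transfer_latency, round(overall_latency, 4))  # 0.42 0.1 1.3194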

Source code in API/orchestrator/metrics/latency_benchmark.py
class LatencyBenchmark:
    """
    For each component, we will generally have three values:
    - model_latency: The time taken by the model to process the data
    - transfer_latency: The time taken to transfer the data to the model
    - overall_latency: The total time for the component, covering both model processing and data transfer

    The whole pipeline latency will be the sum of
    - each component's overall latency (from its start and end timestamps)

    Another way to output the performance is the Timeline
    - the start will be 0
    - each important time point is plotted at its average time relative to that start
    """

    def __init__(self, benchmark_cluster: str = CLUSTER_Q_ETE_CONVERSATION_NAME):
        """
        Initialize the benchmark
        Args:
            benchmark_cluster (str): The benchmark cluster
        """
        # if it is a specific name, gather this metric, otherwise, report all existing cluster
        self.benchmark_cluster = benchmark_cluster

    def run(self):
        """
        Run the benchmark
        """
        html_content = ""
        if self.benchmark_cluster == "all":
            for cluster_name in CLUSTERS.keys():
                # add a divider
                html_content += "<hr>"
                html_content += self.process_cluster(cluster_name)
        else:
            if self.benchmark_cluster not in CLUSTERS:
                raise ValueError(f"Cluster {self.benchmark_cluster} not found")
            html_content += "<hr>"
            html_content += self.process_cluster(self.benchmark_cluster)
        return html_content

    def run_detail(self) -> str:
        html_content = ""
        if self.benchmark_cluster == "all":
            for cluster_name in CLUSTERS.keys():
                # add a divider
                html_content += "<hr>"
                html_content += self.process_cluster_detail(cluster_name)
        else:
            if self.benchmark_cluster not in CLUSTERS:
                raise ValueError(f"Cluster {self.benchmark_cluster} not found")
            html_content += "<hr>"
            html_content += self.process_cluster_detail(self.benchmark_cluster)
        return html_content

    def process_cluster(self, cluster_name: str):
        """
        Process the cluster
        Args:
            cluster_name (str): The cluster name
        """
        task_groups, required_tasks_count, tasks = extract_task_group(cluster_name)
        general_desc = f"<h2>Cluster: {cluster_name}</h2>"
        general_desc += f"<p>Required tasks: {required_tasks_count} | Total tasks groups: {len(task_groups)}</p>"
        # loop through the task groups, if the success task is not == required_tasks_count, then we will skip
        success_pipeline = 0
        cluster_latency = []
        for track_id, task_group in task_groups.items():
            success_tasks = [
                task for task in task_group if task.result_status == "completed"
            ]
            if len(success_tasks) != required_tasks_count:
                # the pipeline is not completed, so we will skip
                continue
            success_pipeline += 1
            cluster_latency.append(self.process_task_group(task_group))

        logger.info(
            f"""
                Cluster: {cluster_name}, Success Ratio: {success_pipeline}/{len(task_groups)}
                Required Components: {required_tasks_count}, Total tasks: {len(tasks)}
            """
        )

        general_title = f"Cluster: <b>{cluster_name}</b>, Completed Ratio: {success_pipeline}/{len(task_groups)}"
        # flatten the cluster_latency
        result_df = pd.DataFrame(cluster_latency)
        # get the column split with _ from right, and left element is the component name

        if len(result_df) != 0:
            logger.debug(result_df.describe())
            # result_df.to_csv(settings.LOG_DIR / f"{cluster_name}_benchmark.csv")
            # to html and return it
            logger.debug(result_df.describe())
            desc = result_df.describe().transpose()
            desc = desc.round(4)

            # add another column
            # Extract the latency type from the index and add it as a new column
            desc["latency_type"] = desc.index.str.rsplit("_", n=2).str[1]
            # then update the index to two columns, first will be component
            desc.index = desc.index.str.rsplit("_", n=2, expand=True).get_level_values(
                0
            )
            # reset index, get the index to be the column component
            desc = desc.reset_index()
            # rename the index to be component
            desc = desc.rename(columns={"index": "component"})
            desc_html = self.plot_table(desc, title=f" ({general_title})")
            plot_html = self.plot_distribution(result_df, title=f" ({general_title})")

            return general_desc + desc_html + plot_html
        return general_desc

    def process_cluster_detail(self, cluster_name: str) -> str:
        """
        Process the cluster in detail
        Even if a track is not finished, we will still plot it along with its stop status
        Args:
            cluster_name (str): The cluster name

        Returns:
            str: The HTML content
        """
        task_groups, required_tasks_count, tasks = extract_task_group(cluster_name)
        general_desc = f"<h2>Cluster: {cluster_name}</h2>"
        general_desc += f"<p>Required tasks: {required_tasks_count} | Total tasks groups: {len(task_groups)}</p>"
        # loop through the task groups, if the success task is not == required_tasks_count, then we will skip
        success_pipeline = 0
        cluster_latency = []
        cluster_ts_latency = []
        cluster_tp_latency = []
        for track_id, task_group in task_groups.items():
            success_tasks = [
                task for task in task_group if task.result_status == "completed"
            ]
            if len(success_tasks) == required_tasks_count:
                # the pipeline is fully completed
                success_pipeline += 1
            cluster_latency.append(self.process_task_group_detail(task_group))
            cluster_ts_latency.append(
                self.process_task_group_detail_timeline(task_group)
            )
            cluster_tp_latency.append(
                self.process_task_group_detail_timeline(task_group, timeline=True)
            )
        general_title = f"Cluster: <b>{cluster_name}</b>, Completed Ratio: {success_pipeline}/{len(task_groups)}"
        result_df = pd.DataFrame(cluster_latency)
        if len(result_df) == 0:
            return general_desc

        # only keep the last element in the track_id
        result_df["track_id"] = result_df["track_id"].str.split("-").str[-1]
        # get result into multiple level column, which will split current column into multiple level column name
        # Split the column names into three parts, but only keep the first two
        split_columns = result_df.columns.str.rsplit("_", n=2, expand=True)

        # we only need the first two level, so we will get the first two level
        result_df.columns = [
            split_columns.get_level_values(0),
            split_columns.get_level_values(1),
        ]
        # sort the column
        track_tasks_html = self.plot_table(result_df, title=f" ({general_title})")

        # cluster ts latency
        result_ts_df = pd.DataFrame(cluster_ts_latency)
        # result_ts_df.to_csv(settings.LOG_DIR / f"{cluster_name}_ts_benchmark.csv")
        if len(result_ts_df) == 0:
            return track_tasks_html
        # we will plot a bar
        ts_stacked_html = self.plot_stacked_timeline(result_ts_df, title=general_title)

        # grab the time point latency, and try to draw time point html
        result_tp_df = pd.DataFrame(cluster_tp_latency)
        # result_tp_df.to_csv(settings.LOG_DIR / f"{cluster_name}_tp_benchmark.csv")
        ts_timepoint_html = self.plot_timestamp_timeline_depth(
            result_tp_df, title=general_title
        )
        return general_desc + track_tasks_html + ts_stacked_html + ts_timepoint_html

    @staticmethod
    def process_task_group(task_track: List[Task]):
        """
        This will process each component, and then extract the transfer and model latency total

        Args:
            task_track (List[Task]): The task track

        Returns:
            dict: The benchmark result
        """
        result = {
            "track_id": task_track[0].track_id,
        }
        task_names = get_task_names_order(result["track_id"])
        for task in task_track:
            latency_profile = task.result_json.get("latency_profile", {})
            # NOTE: this requires that the client side does not log overlapping durations
            model_latency = 0
            transfer_latency = 0
            logger.debug(latency_profile)
            task_start_time = None
            task_end_time = None
            for key, value in latency_profile.items():
                if key.startswith("model"):
                    model_latency += float(value)
                if key.startswith("transfer"):
                    transfer_latency += float(value)
                if key.startswith("ts"):
                    if key == "ts_start_task":
                        task_start_time = value
                    if key == "ts_end_task":
                        task_end_time = value
            result[f"{task.task_name}_model_latency"] = model_latency
            result[f"{task.task_name}_transfer_latency"] = transfer_latency
            # look for the ts_start_task and ts_end_task, and the overall_latency should be that value
            # process time into datetime object
            # ts_end_trigger_emotion_model 2024-07-01T14:58:36.419352
            if task_start_time and task_end_time:
                task_start_time_dt = str_to_datetime(task_start_time)
                task_end_time_dt = str_to_datetime(task_end_time)
                result[f"{task.task_name}_overall_latency"] = (  # noqa
                    task_end_time_dt - task_start_time_dt
                ).total_seconds()

            else:
                logger.error(f"Task {task.task_name} does not have start and end time")
                result[f"{task.task_name}_overall_latency"] = (
                    model_latency + transfer_latency
                )
        # total_latency should be the sum of all the overall_latency
        total_latency = 0
        for key, value in result.items():
            if key.endswith("overall_latency"):
                total_latency += value
        result["total_latency"] = total_latency
        # loop all value, get it to decimal 4
        for key, value in result.items():
            if isinstance(value, float):
                result[key] = round(value, 4)

        ordered_result = {
            "track_id": result["track_id"],
        }
        for task_name in task_names:
            ordered_result[task_name + "_model_latency"] = result[
                task_name + "_model_latency"
            ]
            ordered_result[task_name + "_transfer_latency"] = result[
                task_name + "_transfer_latency"
            ]
            ordered_result[task_name + "_overall_latency"] = result[
                task_name + "_overall_latency"
            ]
        ordered_result["total_latency"] = result["total_latency"]
        return ordered_result

    @staticmethod
    def process_task_group_detail(task_track: List[Task]):
        """
        This will process each component, and then extract the transfer and model latency total

        Args:
            task_track (List[Task]): The task track

        Returns:
            dict: The benchmark result
        """
        result = {
            "track_id": task_track[0].track_id,
        }
        task_names = get_task_names_order(result["track_id"])
        for task in task_track:
            if task.result_status != "completed":
                result[f"{task.task_name}_model_latency"] = task.result_status
                result[f"{task.task_name}_transfer_latency"] = task.result_status
                result[f"{task.task_name}_overall_latency"] = task.result_status
                continue
            latency_profile = task.result_json.get("latency_profile", {})
            # NOTE: this requires that the client side does not log overlapping durations
            model_latency = 0
            transfer_latency = 0
            logger.debug(latency_profile)
            task_start_time = None
            task_end_time = None
            for key, value in latency_profile.items():
                if key.startswith("model"):
                    model_latency += float(value)
                if key.startswith("transfer"):
                    transfer_latency += float(value)
                if key.startswith("ts"):
                    if key == "ts_start_task":
                        task_start_time = value
                    if key == "ts_end_task":
                        task_end_time = value
            result[f"{task.task_name}_model_latency"] = model_latency
            result[f"{task.task_name}_transfer_latency"] = transfer_latency
            # look for the ts_start_task and ts_end_task, and the overall_latency should be that value
            # process time into datetime object
            # ts_end_trigger_emotion_model 2024-07-01T14:58:36.419352
            if task_start_time and task_end_time:
                task_start_time_dt = str_to_datetime(task_start_time)
                task_end_time_dt = str_to_datetime(task_end_time)
                result[f"{task.task_name}_overall_latency"] = (  # noqa
                    task_end_time_dt - task_start_time_dt
                ).total_seconds()

            else:
                logger.error(f"Task {task.task_name} does not have start and end time")
                result[f"{task.task_name}_overall_latency"] = (
                    model_latency + transfer_latency
                )

        # sort the key to be the same as the cluster order, also if missed, fill it with missing
        for task_name in task_names:
            if f"{task_name}_overall_latency" not in result:
                result[task_name + "_model_latency"] = "missing"
                result[task_name + "_transfer_latency"] = "missing"
                result[task_name + "_overall_latency"] = "missing"

        # total_latency should be the sum of all the overall_latency
        total_latency = 0
        for key, value in result.items():
            if key.endswith("overall_latency") and isinstance(value, float):
                total_latency += value
            elif key.endswith("overall_latency") and not isinstance(value, float):
                total_latency = "incomplete"
                break
        result["total_latency"] = total_latency
        # loop all value, get it to decimal 4
        for key, value in result.items():
            if isinstance(value, float):
                result[key] = round(value, 4)

        ordered_result = {
            "track_id": result["track_id"],
        }
        for task_name in task_names:
            ordered_result[task_name + "_model_latency"] = result[
                task_name + "_model_latency"
            ]
            ordered_result[task_name + "_transfer_latency"] = result[
                task_name + "_transfer_latency"
            ]
            ordered_result[task_name + "_overall_latency"] = result[
                task_name + "_overall_latency"
            ]

        ordered_result["total_latency"] = result["total_latency"]
        return ordered_result

    @staticmethod
    def process_task_group_detail_timeline(
        task_track: List[Task], timeline: bool = False
    ):
        """
        Based on the result_json => latency_profile,
        we will gather the time points for each task and convert them to relative values in seconds

        If timeline is True, we will only grab the timestamp information.
        Otherwise, we will calculate the relative time to the previous time point

        In the end, we will assemble the time points for the whole track in cluster task order
        Args:
            task_track (List[Task]): The task track
            timeline (bool): If we want to plot the timeline

        Returns:

        """
        result = {
            "track_id": task_track[0].track_id,
        }

        task_names = get_task_names_order(result["track_id"])

        task_results = {}
        for task in task_track:
            if task.result_status != "completed":
                continue
            latency_profile = task.result_json.get("latency_profile", {})
            task_result = {}
            for key, value in latency_profile.items():
                if key.startswith("ts"):
                    task_result[key] = str_to_datetime(value)

            if timeline is False:
                # sort out the whole task_result based on time timestamp
                # and then calculate the relative time to the previous component
                sorted_task_result = dict(
                    sorted(task_result.items(), key=lambda item: item[1])
                )
                previous_time = None
                task_relative_time = {}
                for key, value in sorted_task_result.items():
                    if previous_time is None:
                        task_relative_time[key] = 0
                    else:
                        task_relative_time[key] = (
                            value - previous_time
                        ).total_seconds()
                    previous_time = value
                task_results[task.task_name] = task_relative_time
            else:
                task_results[task.task_name] = task_result

        # sort the key to be the same as the cluster order, calculate the value to add up the previous component
        first_start_task = None
        for task_name in task_names:
            if task_name not in task_results:
                break
            for key, value in task_results[task_name].items():
                new_key = f"{task_name}_{key.split('_', 1)[1]}"
                if key == "ts_start_task":
                    if first_start_task is None:
                        first_start_task = value
                    else:
                        continue
                if new_key not in result:
                    result[new_key] = value

        return result

    @staticmethod
    def plot_table(df: pd.DataFrame, title: str = "") -> str:
        """
        Plot the table
        Args:
            df (pd.DataFrame): The dataframe
            title (str): The title

        Returns:
            str: The plot in HTML
        """
        colors = []
        for col in df.columns:
            col_colors = []
            for val in df[col]:
                if isinstance(val, float) or isinstance(val, int):
                    col_colors.append("lavender")
                else:
                    if val == "missing":
                        col_colors.append("lightcoral")
                    elif val == "started":
                        col_colors.append("lightyellow")
                    elif val == "failed":
                        col_colors.append("lightcoral")
                    elif val == "pending":
                        col_colors.append("lightblue")
                    elif val == "incomplete":
                        col_colors.append("lightgrey")
                    else:
                        col_colors.append("lightgreen")
            colors.append(col_colors)
        # Create a Plotly table
        fig = go.Figure(
            data=[
                go.Table(
                    header=dict(
                        values=[
                            (
                                [f"<b>{c.upper()}</b>" for c in col]
                                if isinstance(col, tuple)
                                else f"<b>{col.upper()}</b>"
                            )
                            for col in df.columns
                        ],
                        fill_color="paleturquoise",
                        align="left",
                    ),
                    cells=dict(
                        values=[df[col] for col in df.columns],
                        fill_color=colors,
                        align="left",
                    ),
                )
            ]
        )
        fig.update_layout(
            title={
                "text": f"Latency Summary: {title}",
                "x": 0.5,
                "xanchor": "center",
                "yanchor": "top",
            },
            #     update margin to be 0
            margin=dict(l=10, r=10, b=0),
            # get the height to be whatever it requires
            height=max((len(df) * 35), 300),
        )
        # Update layout for better appearance
        desc_html = fig.to_html(full_html=False)
        return desc_html

    @staticmethod
    def plot_distribution(df: pd.DataFrame, title: str = "") -> str:
        """
        Plot the distribution of the latency
        Args:
            df (pd.DataFrame): The dataframe
            title (str): The title

        Returns:
            str: The plot in HTML
        """
        # plot the distribution for each column
        # Calculate mean and max for each latency column
        mean_latencies = df[df.columns[1:]].mean()
        max_latencies = df[df.columns[1:]].max()
        min_latencies = df[df.columns[1:]].min()

        # Create a Plotly figure
        fig = go.Figure()
        # Add min latencies to the figure
        fig.add_trace(
            go.Bar(x=min_latencies.index, y=min_latencies.values, name="Min Latency")
        )
        # Add mean latencies to the figure
        fig.add_trace(
            go.Bar(x=mean_latencies.index, y=mean_latencies.values, name="Mean Latency")
        )

        # Add max latencies to the figure
        fig.add_trace(
            go.Bar(x=max_latencies.index, y=max_latencies.values, name="Max Latency")
        )

        # Customize the layout
        fig.update_layout(
            title={
                "text": "Latency Distribution" + title,
                "x": 0.5,
                "xanchor": "center",
                "yanchor": "top",
            },
            xaxis_title="Component and Latency",
            yaxis_title="Latency (s)",
            barmode="group",
            margin=dict(l=10, r=10, b=0),
        )

        # Convert Plotly figure to HTML
        plot_html = fig.to_html(full_html=False)
        return plot_html

    @staticmethod
    def plot_stacked_timeline(df: pd.DataFrame, title: str) -> str:
        """
        Plot the stacked timeline
        Args:
            df (pd.DataFrame): The dataframe
            title (str): The title

        Returns:

        """
        # Create a Plotly figure
        fig = go.Figure()
        # get the track id to be the stacked one
        df["track_id"] = df["track_id"].str.split("-").str[-1]
        # Add a trace for each component
        for col in df.columns[1:]:
            fig.add_trace(
                go.Bar(
                    y=df["track_id"],
                    x=df[col],
                    name=col,
                    orientation="h",
                    hovertemplate="%{x}<br>%{fullData.name}<extra></extra>",
                )
            )

        # Customize the layout
        fig.update_layout(
            title={
                "text": f"Time Interval in Seconds ({title})",
                "x": 0.5,
                "xanchor": "center",
                "yanchor": "top",
            },
            xaxis_title="Relative in Seconds to Start Time",
            yaxis_title="Track ID",
            barmode="stack",
            height=max((len(df) * 35), 300),
        )

        # Convert Plotly figure to HTML
        plot_html = fig.to_html(full_html=False)
        return plot_html

    @staticmethod
    def plot_timestamp_timeline_depth(df: pd.DataFrame, title: str) -> str:
        """
        Plot the timestamp timeline
        Args:
            df (pd.DataFrame): The dataframe
            title (str): The title

        Returns:
            str: The plot in HTML
        """
        fig = go.Figure()
        y_values = list(range(len(df)))
        shapes = []
        for y_value in y_values:
            shapes.append(
                dict(
                    type="line",
                    xref="paper",
                    x0=0,
                    x1=1,
                    yref="y",
                    y0=y_value,
                    y1=y_value,
                    line=dict(color="grey", width=1, dash="dot"),
                )
            )
        y_labels = []

        legend_added = {}
        # Use Plotly's qualitative color sequence 'Dark24' to generate a spectrum of colors
        colors = pcolors.qualitative.Dark24

        # Dynamically generate a color map for each column
        column_colors = {
            col: colors[i % len(colors)] for i, col in enumerate(df.columns[1:])
        }
        for i, row in df.iterrows():
            y_value = y_values[i]
            y_labels.append(row["track_id"].split("-")[-1])
            for col in df.columns[1:]:
                if not pd.isna(row[col]):
                    show_legend = False
                    if col not in legend_added:
                        show_legend = True
                        legend_added[col] = True
                    fig.add_trace(
                        go.Scatter(
                            x=[row[col]],
                            y=[y_value],
                            mode="markers",
                            marker=dict(size=10, color=column_colors[col]),
                            name=f"{col}",
                            hovertemplate="%{x}<br>%{fullData.name}<extra></extra>",
                            showlegend=show_legend,
                        )
                    )
        # Customize the layout
        fig.update_layout(
            title={
                "text": f"Timeline of Events ({title})",
                "x": 0.5,
                "xanchor": "center",
                "yanchor": "top",
            },
            xaxis_title="Time",
            yaxis=dict(
                showline=False,
                showgrid=True,
                zeroline=False,
                tickvals=y_values,
                ticktext=y_labels,
                title="Track ID",
            ),
            showlegend=True,
            shapes=shapes,
            height=max((len(df) * 35), 300),
        )
        # Convert Plotly figure to HTML
        plot_html = fig.to_html(full_html=False)
        return plot_html

    @staticmethod
    def plot_timestamp_timeline(df: pd.DataFrame) -> str:
        """
        Plot the timestamp timeline
        Args:
            df (pd.DataFrame): The dataframe

        Returns:
            str: The plot in HTML
        """
        fig = go.Figure()
        y_values = range(len(df))

        for i, row in df.iterrows():
            y_value = y_values[i]
            for col in df.columns[1:]:
                if not pd.isna(row[col]):
                    fig.add_trace(
                        go.Scatter(
                            x=[row[col]],
                            y=[y_value],
                            mode="markers",
                            marker=dict(size=10),
                            name=f"{col}",
                            hovertemplate="%{x}<br>%{fullData.name}<extra></extra>",
                        )
                    )
            # break
        # Customize the layout
        fig.update_layout(
            title="Timeline of Time Points",
            xaxis_title="Time",
            # show nothing of y, even the label
            yaxis=dict(
                showticklabels=False, showline=False, showgrid=False, zeroline=True
            ),
            showlegend=True,
        )
        # Convert Plotly figure to HTML
        plot_html = fig.to_html(full_html=False)
        return plot_html

__init__(benchmark_cluster=CLUSTER_Q_ETE_CONVERSATION_NAME)

Initialize the benchmark

Args:
benchmark_cluster (str): The benchmark cluster

Source code in API/orchestrator/metrics/latency_benchmark.py
def __init__(self, benchmark_cluster: str = CLUSTER_Q_ETE_CONVERSATION_NAME):
    """
    Initialize the benchmark
    Args:
        benchmark_cluster (str): The benchmark cluster
    """
    # if it is a specific name, gather this metric, otherwise, report all existing cluster
    self.benchmark_cluster = benchmark_cluster

plot_distribution(df, title='') staticmethod

Plot the distribution of the latency

Args:
df (pd.DataFrame): The dataframe
title (str): The title

Returns:

str: The plot in HTML

Source code in API/orchestrator/metrics/latency_benchmark.py
@staticmethod
def plot_distribution(df: pd.DataFrame, title: str = "") -> str:
    """
    Plot the distribution of the latency
    Args:
        df (pd.DataFrame): The dataframe
        title (str): The title

    Returns:
        str: The plot in HTML
    """
    # plot the distribution for each column
    # Calculate mean and max for each latency column
    mean_latencies = df[df.columns[1:]].mean()
    max_latencies = df[df.columns[1:]].max()
    min_latencies = df[df.columns[1:]].min()

    # Create a Plotly figure
    fig = go.Figure()
    # Add min latencies to the figure
    fig.add_trace(
        go.Bar(x=min_latencies.index, y=min_latencies.values, name="Min Latency")
    )
    # Add mean latencies to the figure
    fig.add_trace(
        go.Bar(x=mean_latencies.index, y=mean_latencies.values, name="Mean Latency")
    )

    # Add max latencies to the figure
    fig.add_trace(
        go.Bar(x=max_latencies.index, y=max_latencies.values, name="Max Latency")
    )

    # Customize the layout
    fig.update_layout(
        title={
            "text": "Latency Distribution" + title,
            "x": 0.5,
            "xanchor": "center",
            "yanchor": "top",
        },
        xaxis_title="Component and Latency",
        yaxis_title="Latency (s)",
        barmode="group",
        margin=dict(l=10, r=10, b=0),
    )

    # Convert Plotly figure to HTML
    plot_html = fig.to_html(full_html=False)
    return plot_html
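
A hedged usage sketch: the first column is track_id and every remaining column is treated as a latency series, so the figure shows one min/mean/max bar group per column. The import path is assumed from the "Source code in" note above, and the component columns and values are made up.

import pandas as pd

from API.orchestrator.metrics.latency_benchmark import LatencyBenchmark  # assumed import path

# Tiny latency frame: track_id first, then one column per component latency
df = pd.DataFrame(
    {
        "track_id": ["001", "002", "003"],
        "stt_overall_latency": [1.31, 1.45, 1.28],
        "llm_overall_latency": [2.10, 2.55, 1.98],
    }
)
html = LatencyBenchmark.plot_distribution(df, title=" (demo cluster)")
with open("latency_distribution.html", "w") as f:
    f.write(html)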

plot_stacked_timeline(df, title) staticmethod

Plot the stacked timeline

Args:
df (pd.DataFrame): The dataframe
title (str): The title

Returns:

Source code in API/orchestrator/metrics/latency_benchmark.py
@staticmethod
def plot_stacked_timeline(df: pd.DataFrame, title: str) -> str:
    """
    Plot the stacked timeline
    Args:
        df (pd.DataFrame): The dataframe
        title (str): The title

    Returns:

    """
    # Create a Plotly figure
    fig = go.Figure()
    # get the track id to be the stacked one
    df["track_id"] = df["track_id"].str.split("-").str[-1]
    # Add a trace for each component
    for col in df.columns[1:]:
        fig.add_trace(
            go.Bar(
                y=df["track_id"],
                x=df[col],
                name=col,
                orientation="h",
                hovertemplate="%{x}<br>%{fullData.name}<extra></extra>",
            )
        )

    # Customize the layout
    fig.update_layout(
        title={
            "text": f"Time Interval in Seconds ({title})",
            "x": 0.5,
            "xanchor": "center",
            "yanchor": "top",
        },
        xaxis_title="Relative in Seconds to Start Time",
        yaxis_title="Track ID",
        barmode="stack",
        height=max((len(df) * 35), 300),
    )

    # Convert Plotly figure to HTML
    plot_html = fig.to_html(full_html=False)
    return plot_html
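
A hedged sketch of the expected input (column names are hypothetical): after track_id, each column holds the duration in seconds of one step, so the horizontal bars stack into one relative timeline per track. Note that the method rewrites track_id in place, keeping only the segment after the last "-".

import pandas as pd

from API.orchestrator.metrics.latency_benchmark import LatencyBenchmark  # assumed import path

# One row per track; step columns hold per-step durations in seconds
ts_df = pd.DataFrame(
    {
        "track_id": ["conversation-aaa-001", "conversation-aaa-002"],
        "speech2text_start_task": [0.0, 0.0],
        "speech2text_end_task": [1.32, 1.50],
        "quantization_llm_end_task": [2.10, 2.42],
    }
)
html = LatencyBenchmark.plot_stacked_timeline(ts_df, title="demo cluster")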

plot_table(df, title='') staticmethod

Plot the table

Args:
df (pd.DataFrame): The dataframe
title (str): The title

Returns:

str: The plot in HTML

Source code in API/orchestrator/metrics/latency_benchmark.py
@staticmethod
def plot_table(df: pd.DataFrame, title: str = "") -> str:
    """
    Plot the table
    Args:
        df (pd.DataFrame): The dataframe
        title (str): The title

    Returns:
        str: The plot in HTML
    """
    colors = []
    for col in df.columns:
        col_colors = []
        for val in df[col]:
            if isinstance(val, float) or isinstance(val, int):
                col_colors.append("lavender")
            else:
                if val == "missing":
                    col_colors.append("lightcoral")
                elif val == "started":
                    col_colors.append("lightyellow")
                elif val == "failed":
                    col_colors.append("lightcoral")
                elif val == "pending":
                    col_colors.append("lightblue")
                elif val == "incomplete":
                    col_colors.append("lightgrey")
                else:
                    col_colors.append("lightgreen")
        colors.append(col_colors)
    # Create a Plotly table
    fig = go.Figure(
        data=[
            go.Table(
                header=dict(
                    values=[
                        (
                            [f"<b>{c.upper()}</b>" for c in col]
                            if isinstance(col, tuple)
                            else f"<b>{col.upper()}</b>"
                        )
                        for col in df.columns
                    ],
                    fill_color="paleturquoise",
                    align="left",
                ),
                cells=dict(
                    values=[df[col] for col in df.columns],
                    fill_color=colors,
                    align="left",
                ),
            )
        ]
    )
    fig.update_layout(
        title={
            "text": f"Latency Summary: {title}",
            "x": 0.5,
            "xanchor": "center",
            "yanchor": "top",
        },
        #     update margin to be 0
        margin=dict(l=10, r=10, b=0),
        # get the height to be whatever it requires
        height=max((len(df) * 35), 300),
    )
    # Update layout for better appearance
    desc_html = fig.to_html(full_html=False)
    return desc_html
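
A hedged sketch of the colour rule above in action: numeric cells render as lavender, while status strings such as "failed" or "missing" get their own colours. The import path, column names, and values here are assumptions for illustration.

import pandas as pd

from API.orchestrator.metrics.latency_benchmark import LatencyBenchmark  # assumed import path

# Mixed numeric/status cells, as produced by the detail processing
detail_df = pd.DataFrame(
    {
        "track_id": ["001", "002"],
        "speech2text_overall": [1.32, "failed"],
        "quantization_llm_overall": [2.10, "missing"],
    }
)
html = LatencyBenchmark.plot_table(detail_df, title="(demo)")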

plot_timestamp_timeline(df) staticmethod

Plot the timestamp timeline

Args:
df (pd.DataFrame): The dataframe

Returns:

str: The plot in HTML

Source code in API/orchestrator/metrics/latency_benchmark.py
@staticmethod
def plot_timestamp_timeline(df: pd.DataFrame) -> str:
    """
    Plot the timestamp timeline
    Args:
        df (pd.DataFrame): The dataframe

    Returns:
        str: The plot in HTML
    """
    fig = go.Figure()
    y_values = range(len(df))

    for i, row in df.iterrows():
        y_value = y_values[i]
        for col in df.columns[1:]:
            if not pd.isna(row[col]):
                fig.add_trace(
                    go.Scatter(
                        x=[row[col]],
                        y=[y_value],
                        mode="markers",
                        marker=dict(size=10),
                        name=f"{col}",
                        hovertemplate="%{x}<br>%{fullData.name}<extra></extra>",
                    )
                )
        # break
    # Customize the layout
    fig.update_layout(
        title="Timeline of Time Points",
        xaxis_title="Time",
        # show nothing of y, even the label
        yaxis=dict(
            showticklabels=False, showline=False, showgrid=False, zeroline=True
        ),
        showlegend=True,
    )
    # Convert Plotly figure to HTML
    plot_html = fig.to_html(full_html=False)
    return plot_html

plot_timestamp_timeline_depth(df, title) staticmethod

Plot the timestamp timeline

Args:
df (pd.DataFrame): The dataframe
title (str): The title

Returns:

str: The plot in HTML

Source code in API/orchestrator/metrics/latency_benchmark.py
@staticmethod
def plot_timestamp_timeline_depth(df: pd.DataFrame, title: str) -> str:
    """
    Plot the timestamp timeline
    Args:
        df (pd.DataFrame): The dataframe
        title (str): The title

    Returns:
        str: The plot in HTML
    """
    fig = go.Figure()
    y_values = list(range(len(df)))
    shapes = []
    for y_value in y_values:
        shapes.append(
            dict(
                type="line",
                xref="paper",
                x0=0,
                x1=1,
                yref="y",
                y0=y_value,
                y1=y_value,
                line=dict(color="grey", width=1, dash="dot"),
            )
        )
    y_labels = []

    legend_added = {}
    # Use Plotly's qualitative color sequence 'Dark24' to generate a spectrum of colors
    colors = pcolors.qualitative.Dark24

    # Dynamically generate a color map for each column
    column_colors = {
        col: colors[i % len(colors)] for i, col in enumerate(df.columns[1:])
    }
    for i, row in df.iterrows():
        y_value = y_values[i]
        y_labels.append(row["track_id"].split("-")[-1])
        for col in df.columns[1:]:
            if not pd.isna(row[col]):
                show_legend = False
                if col not in legend_added:
                    show_legend = True
                    legend_added[col] = True
                fig.add_trace(
                    go.Scatter(
                        x=[row[col]],
                        y=[y_value],
                        mode="markers",
                        marker=dict(size=10, color=column_colors[col]),
                        name=f"{col}",
                        hovertemplate="%{x}<br>%{fullData.name}<extra></extra>",
                        showlegend=show_legend,
                    )
                )
    # Customize the layout
    fig.update_layout(
        title={
            "text": f"Timeline of Events ({title})",
            "x": 0.5,
            "xanchor": "center",
            "yanchor": "top",
        },
        xaxis_title="Time",
        yaxis=dict(
            showline=False,
            showgrid=True,
            zeroline=False,
            tickvals=y_values,
            ticktext=y_labels,
            title="Track ID",
        ),
        showlegend=True,
        shapes=shapes,
        height=max((len(df) * 35), 300),
    )
    # Convert Plotly figure to HTML
    plot_html = fig.to_html(full_html=False)
    return plot_html
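
A hedged sketch of the expected input: after track_id, each column holds an absolute timestamp (NaT where a component never reported), and each row becomes a dotted line with one coloured marker per event. The import path and column names are assumptions.

import pandas as pd

from API.orchestrator.metrics.latency_benchmark import LatencyBenchmark  # assumed import path

# One row per track; event columns hold absolute timestamps (NaT = not reported)
tp_df = pd.DataFrame(
    {
        "track_id": ["conversation-aaa-001", "conversation-aaa-002"],
        "speech2text_start_task": pd.to_datetime(
            ["2024-07-01T14:58:35.100000", "2024-07-01T14:59:01.000000"]
        ),
        "speech2text_end_task": pd.to_datetime(["2024-07-01T14:58:36.419352", None]),
    }
)
html = LatencyBenchmark.plot_timestamp_timeline_depth(tp_df, title="demo cluster")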

process_cluster(cluster_name)

Process the cluster

Args:
cluster_name (str): The cluster name

Source code in API/orchestrator/metrics/latency_benchmark.py
def process_cluster(self, cluster_name: str):
    """
    Process the cluster
    Args:
        cluster_name (str): The cluster name
    """
    task_groups, required_tasks_count, tasks = extract_task_group(cluster_name)
    general_desc = f"<h2>Cluster: {cluster_name}</h2>"
    general_desc += f"<p>Required tasks: {required_tasks_count} | Total tasks groups: {len(task_groups)}</p>"
    # loop through the task groups, if the success task is not == required_tasks_count, then we will skip
    success_pipeline = 0
    cluster_latency = []
    for track_id, task_group in task_groups.items():
        success_tasks = [
            task for task in task_group if task.result_status == "completed"
        ]
        if len(success_tasks) != required_tasks_count:
            # the pipeline is not completed, so we will skip
            continue
        success_pipeline += 1
        cluster_latency.append(self.process_task_group(task_group))

    logger.info(
        f"""
            Cluster: {cluster_name}, Success Ratio: {success_pipeline}/{len(task_groups)}
            Required Components: {required_tasks_count}, Total tasks: {len(tasks)}
        """
    )

    general_title = f"Cluster: <b>{cluster_name}</b>, Completed Ratio: {success_pipeline}/{len(task_groups)}"
    # flatten the cluster_latency
    result_df = pd.DataFrame(cluster_latency)
    # get the column split with _ from right, and left element is the component name

    if len(result_df) != 0:
        logger.debug(result_df.describe())
        # result_df.to_csv(settings.LOG_DIR / f"{cluster_name}_benchmark.csv")
        # to html and return it
        logger.debug(result_df.describe())
        desc = result_df.describe().transpose()
        desc = desc.round(4)

        # add another column
        # Extract the latency type from the index and add it as a new column
        desc["latency_type"] = desc.index.str.rsplit("_", n=2).str[1]
        # then update the index to two columns, first will be component
        desc.index = desc.index.str.rsplit("_", n=2, expand=True).get_level_values(
            0
        )
        # reset index, get the index to be the column component
        desc = desc.reset_index()
        # rename the index to be component
        desc = desc.rename(columns={"index": "component"})
        desc_html = self.plot_table(desc, title=f" ({general_title})")
        plot_html = self.plot_distribution(result_df, title=f" ({general_title})")

        return general_desc + desc_html + plot_html
    return general_desc
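
The latency_type / component split above relies on rsplitting the summary index on "_" from the right. A small standalone illustration (the column names are hypothetical):

import pandas as pd

idx = pd.Index(
    ["speech2text_model_latency", "speech2text_transfer_latency", "speech2text_overall_latency"]
)
# middle piece of the right-hand split is the latency type
latency_type = idx.str.rsplit("_", n=2).str[1]
# leftmost piece is the component name
component = idx.str.rsplit("_", n=2, expand=True).get_level_values(0)
print(list(latency_type))  # ['model', 'transfer', 'overall']
print(list(component))     # ['speech2text', 'speech2text', 'speech2text']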

process_cluster_detail(cluster_name)

Process the cluster in detail. Even if a track is not finished, we will still plot it along with its stop status.

Args:
cluster_name (str): The cluster name

Returns:

str: The HTML content
Source code in API/orchestrator/metrics/latency_benchmark.py
def process_cluster_detail(self, cluster_name: str) -> str:
    """
    Process the cluster in detail
    Even if a track is not finished, we will still plot it along with its stop status
    Args:
        cluster_name (str): The cluster name

    Returns:
        str: The HTML content
    """
    task_groups, required_tasks_count, tasks = extract_task_group(cluster_name)
    general_desc = f"<h2>Cluster: {cluster_name}</h2>"
    general_desc += f"<p>Required tasks: {required_tasks_count} | Total tasks groups: {len(task_groups)}</p>"
    # loop through the task groups, if the success task is not == required_tasks_count, then we will skip
    success_pipeline = 0
    cluster_latency = []
    cluster_ts_latency = []
    cluster_tp_latency = []
    for track_id, task_group in task_groups.items():
        success_tasks = [
            task for task in task_group if task.result_status == "completed"
        ]
        if len(success_tasks) == required_tasks_count:
            # the pipeline is fully completed
            success_pipeline += 1
        cluster_latency.append(self.process_task_group_detail(task_group))
        cluster_ts_latency.append(
            self.process_task_group_detail_timeline(task_group)
        )
        cluster_tp_latency.append(
            self.process_task_group_detail_timeline(task_group, timeline=True)
        )
    general_title = f"Cluster: <b>{cluster_name}</b>, Completed Ratio: {success_pipeline}/{len(task_groups)}"
    result_df = pd.DataFrame(cluster_latency)
    if len(result_df) == 0:
        return general_desc

    # only keep the last element in the track_id
    result_df["track_id"] = result_df["track_id"].str.split("-").str[-1]
    # get result into multiple level column, which will split current column into multiple level column name
    # Split the column names into three parts, but only keep the first two
    split_columns = result_df.columns.str.rsplit("_", n=2, expand=True)

    # we only need the first two level, so we will get the first two level
    result_df.columns = [
        split_columns.get_level_values(0),
        split_columns.get_level_values(1),
    ]
    # sort the column
    track_tasks_html = self.plot_table(result_df, title=f" ({general_title})")

    # cluster ts latency
    result_ts_df = pd.DataFrame(cluster_ts_latency)
    # result_ts_df.to_csv(settings.LOG_DIR / f"{cluster_name}_ts_benchmark.csv")
    if len(result_ts_df) == 0:
        return track_tasks_html
    # we will plot a bar
    ts_stacked_html = self.plot_stacked_timeline(result_ts_df, title=general_title)

    # grab the time point latency, and try to draw time point html
    result_tp_df = pd.DataFrame(cluster_tp_latency)
    # result_tp_df.to_csv(settings.LOG_DIR / f"{cluster_name}_tp_benchmark.csv")
    ts_timepoint_html = self.plot_timestamp_timeline_depth(
        result_tp_df, title=general_title
    )
    return general_desc + track_tasks_html + ts_stacked_html + ts_timepoint_html

process_task_group(task_track) staticmethod

This will process each component, and then extract the transfer and model latency total

Parameters:

task_track (List[Task]): The task track (required)

Returns:

dict: The benchmark result

Source code in API/orchestrator/metrics/latency_benchmark.py
@staticmethod
def process_task_group(task_track: List[Task]):
    """
    This will process each component, and then extract the transfer and model latency total

    Args:
        task_track (List[Task]): The task track

    Returns:
        dict: The benchmark result
    """
    result = {
        "track_id": task_track[0].track_id,
    }
    task_names = get_task_names_order(result["track_id"])
    for task in task_track:
        latency_profile = task.result_json.get("latency_profile", {})
        # NOTE: this requires that the client side does not log overlapping durations
        model_latency = 0
        transfer_latency = 0
        logger.debug(latency_profile)
        task_start_time = None
        task_end_time = None
        for key, value in latency_profile.items():
            if key.startswith("model"):
                model_latency += float(value)
            if key.startswith("transfer"):
                transfer_latency += float(value)
            if key.startswith("ts"):
                if key == "ts_start_task":
                    task_start_time = value
                if key == "ts_end_task":
                    task_end_time = value
        result[f"{task.task_name}_model_latency"] = model_latency
        result[f"{task.task_name}_transfer_latency"] = transfer_latency
        # look for the ts_start_task and ts_end_task, and the overall_latency should be that value
        # process time into datetime object
        # ts_end_trigger_emotion_model 2024-07-01T14:58:36.419352
        if task_start_time and task_end_time:
            task_start_time_dt = str_to_datetime(task_start_time)
            task_end_time_dt = str_to_datetime(task_end_time)
            result[f"{task.task_name}_overall_latency"] = (  # noqa
                task_end_time_dt - task_start_time_dt
            ).total_seconds()

        else:
            logger.error(f"Task {task.task_name} does not have start and end time")
            result[f"{task.task_name}_overall_latency"] = (
                model_latency + transfer_latency
            )
    # total_latency should be the sum of all the overall_latency
    total_latency = 0
    for key, value in result.items():
        if key.endswith("overall_latency"):
            total_latency += value
    result["total_latency"] = total_latency
    # loop all value, get it to decimal 4
    for key, value in result.items():
        if isinstance(value, float):
            result[key] = round(value, 4)

    ordered_result = {
        "track_id": result["track_id"],
    }
    for task_name in task_names:
        ordered_result[task_name + "_model_latency"] = result[
            task_name + "_model_latency"
        ]
        ordered_result[task_name + "_transfer_latency"] = result[
            task_name + "_transfer_latency"
        ]
        ordered_result[task_name + "_overall_latency"] = result[
            task_name + "_overall_latency"
        ]
    ordered_result["total_latency"] = result["total_latency"]
    return ordered_result
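
For a hypothetical two-component track, the returned dict looks roughly like this; the component names and numbers are made up, ordering follows get_task_names_order, and floats are rounded to 4 decimals:

# Hypothetical shape of the value returned by process_task_group
ordered_result = {
    "track_id": "conversation-aaa-001",
    "speech2text_model_latency": 0.42,
    "speech2text_transfer_latency": 0.1,
    "speech2text_overall_latency": 1.3194,
    "quantization_llm_model_latency": 1.8,
    "quantization_llm_transfer_latency": 0.05,
    "quantization_llm_overall_latency": 1.95,
    "total_latency": 3.2694,  # sum of the overall latencies
}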

process_task_group_detail(task_track) staticmethod

This will process each component, and then extract the transfer and model latency total

Parameters:

task_track (List[Task]): The task track (required)

Returns:

dict: The benchmark result

Source code in API/orchestrator/metrics/latency_benchmark.py
@staticmethod
def process_task_group_detail(task_track: List[Task]):
    """
    This will process each component, and then extract the transfer and model latency total

    Args:
        task_track (List[Task]): The task track

    Returns:
        dict: The benchmark result
    """
    result = {
        "track_id": task_track[0].track_id,
    }
    task_names = get_task_names_order(result["track_id"])
    for task in task_track:
        if task.result_status != "completed":
            result[f"{task.task_name}_model_latency"] = task.result_status
            result[f"{task.task_name}_transfer_latency"] = task.result_status
            result[f"{task.task_name}_overall_latency"] = task.result_status
            continue
        latency_profile = task.result_json.get("latency_profile", {})
        # NOTE: this requires that the client side does not log overlapping durations
        model_latency = 0
        transfer_latency = 0
        logger.debug(latency_profile)
        task_start_time = None
        task_end_time = None
        for key, value in latency_profile.items():
            if key.startswith("model"):
                model_latency += float(value)
            if key.startswith("transfer"):
                transfer_latency += float(value)
            if key.startswith("ts"):
                if key == "ts_start_task":
                    task_start_time = value
                if key == "ts_end_task":
                    task_end_time = value
        result[f"{task.task_name}_model_latency"] = model_latency
        result[f"{task.task_name}_transfer_latency"] = transfer_latency
        # look for the ts_start_task and ts_end_task, and the overall_latency should be that value
        # process time into datetime object
        # ts_end_trigger_emotion_model 2024-07-01T14:58:36.419352
        if task_start_time and task_end_time:
            task_start_time_dt = str_to_datetime(task_start_time)
            task_end_time_dt = str_to_datetime(task_end_time)
            result[f"{task.task_name}_overall_latency"] = (  # noqa
                task_end_time_dt - task_start_time_dt
            ).total_seconds()

        else:
            logger.error(f"Task {task.task_name} does not have start and end time")
            result[f"{task.task_name}_overall_latency"] = (
                model_latency + transfer_latency
            )

    # sort the key to be the same as the cluster order, also if missed, fill it with missing
    for task_name in task_names:
        if f"{task_name}_overall_latency" not in result:
            result[task_name + "_model_latency"] = "missing"
            result[task_name + "_transfer_latency"] = "missing"
            result[task_name + "_overall_latency"] = "missing"

    # total_latency should be the sum of all the overall_latency
    total_latency = 0
    for key, value in result.items():
        if key.endswith("overall_latency") and isinstance(value, float):
            total_latency += value
        elif key.endswith("overall_latency") and not isinstance(value, float):
            total_latency = "incomplete"
            break
    result["total_latency"] = total_latency
    # loop all value, get it to decimal 4
    for key, value in result.items():
        if isinstance(value, float):
            result[key] = round(value, 4)

    ordered_result = {
        "track_id": result["track_id"],
    }
    for task_name in task_names:
        ordered_result[task_name + "_model_latency"] = result[
            task_name + "_model_latency"
        ]
        ordered_result[task_name + "_transfer_latency"] = result[
            task_name + "_transfer_latency"
        ]
        ordered_result[task_name + "_overall_latency"] = result[
            task_name + "_overall_latency"
        ]

    ordered_result["total_latency"] = result["total_latency"]
    return ordered_result

process_task_group_detail_timeline(task_track, timeline=False) staticmethod

Based on the result_json => latency_profile, we will gather the time points for each task and convert them to relative values in seconds.

If timeline is True, we will only grab the timestamp information. Otherwise, we will calculate the relative time to the previous time point.

In the end, we will assemble the time points for the whole track in cluster task order.

Args:
task_track (List[Task]): The task track
timeline (bool): If we want to plot the timeline

Returns:

Source code in API/orchestrator/metrics/latency_benchmark.py
@staticmethod
def process_task_group_detail_timeline(
    task_track: List[Task], timeline: bool = False
):
    """
    Based on the result_json => latency_profile,
    we will gather the time points for each task and convert them to relative values in seconds

    If timeline is True, we will only grab the timestamp information.
    Otherwise, we will calculate the relative time to the previous time point

    In the end, we will assemble the time points for the whole track in cluster task order
    Args:
        task_track (List[Task]): The task track
        timeline (bool): If we want to plot the timeline

    Returns:

    """
    result = {
        "track_id": task_track[0].track_id,
    }

    task_names = get_task_names_order(result["track_id"])

    task_results = {}
    for task in task_track:
        if task.result_status != "completed":
            continue
        latency_profile = task.result_json.get("latency_profile", {})
        task_result = {}
        for key, value in latency_profile.items():
            if key.startswith("ts"):
                task_result[key] = str_to_datetime(value)

        if timeline is False:
            # sort out the whole task_result based on time timestamp
            # and then calculate the relative time to the previous component
            sorted_task_result = dict(
                sorted(task_result.items(), key=lambda item: item[1])
            )
            previous_time = None
            task_relative_time = {}
            for key, value in sorted_task_result.items():
                if previous_time is None:
                    task_relative_time[key] = 0
                else:
                    task_relative_time[key] = (
                        value - previous_time
                    ).total_seconds()
                previous_time = value
            task_results[task.task_name] = task_relative_time
        else:
            task_results[task.task_name] = task_result

    # sort the key to be the same as the cluster order, calculate the value to add up the previous component
    first_start_task = None
    for task_name in task_names:
        if task_name not in task_results:
            break
        for key, value in task_results[task_name].items():
            new_key = f"{task_name}_{key.split('_', 1)[1]}"
            if key == "ts_start_task":
                if first_start_task is None:
                    first_start_task = value
                else:
                    continue
            if new_key not in result:
                result[new_key] = value

    return result
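
A hedged sketch of the relative-time pass (timeline=False): the "ts_*" points within one task are sorted by timestamp and each becomes its gap in seconds to the previous point. The key names besides ts_start_task / ts_end_task are hypothetical.

from datetime import datetime

# Hypothetical time points reported by one task
task_result = {
    "ts_start_task": datetime(2024, 7, 1, 14, 58, 35, 100000),
    "ts_end_stt_model": datetime(2024, 7, 1, 14, 58, 36, 419352),
    "ts_end_task": datetime(2024, 7, 1, 14, 58, 36, 500000),
}

previous_time, task_relative_time = None, {}
for key, value in sorted(task_result.items(), key=lambda item: item[1]):
    # first point is 0, every later point is its offset from the previous one
    task_relative_time[key] = 0 if previous_time is None else (value - previous_time).total_seconds()
    previous_time = value

print(task_relative_time)
# {'ts_start_task': 0, 'ts_end_stt_model': 1.319352, 'ts_end_task': 0.080648}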

run()

Run the benchmark

Source code in API/orchestrator/metrics/latency_benchmark.py
def run(self):
    """
    Run the benchmark
    """
    html_content = ""
    if self.benchmark_cluster == "all":
        for cluster_name in CLUSTERS.keys():
            # add a divider
            html_content += "<hr>"
            html_content += self.process_cluster(cluster_name)
    else:
        if self.benchmark_cluster not in CLUSTERS:
            raise ValueError(f"Cluster {self.benchmark_cluster} not found")
        html_content += "<hr>"
        html_content += self.process_cluster(self.benchmark_cluster)
    return html_content
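
A hedged usage sketch tying it together; the import path is assumed from the "Source code in" note above, and the output file name is arbitrary:

from API.orchestrator.metrics.latency_benchmark import LatencyBenchmark  # assumed import path

benchmark = LatencyBenchmark(benchmark_cluster="all")  # or a specific cluster name
summary_html = benchmark.run()        # summary table + latency distribution per cluster
detail_html = benchmark.run_detail()  # per-track tables, stacked timeline, time points

with open("latency_report.html", "w") as f:
    f.write(summary_html + detail_html)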