Skip to content

Utils

extract_task_group(cluster_name)

Extract the task group Args: cluster_name (str): The cluster name

Returns:

Source code in API/orchestrator/metrics/utils.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def extract_task_group(
    cluster_name: str,
) -> Tuple[Dict[str, List[Task]], int, List[Task]]:
    """
    Extract the task group
    Args:
        cluster_name (str): The cluster name

    Returns:

    """
    cluster = CLUSTERS.get(cluster_name, None)
    if cluster is None:
        raise ValueError(f"Cluster {cluster_name} not found")

    required_tasks = [
        item for item in cluster.values() if item["component_type"] == "task"
    ]
    required_tasks_count = len(required_tasks)
    logger.info(f"Cluster: {cluster_name}, Required tasks: {required_tasks_count}")
    # get all related tasks, the track_id is like T_{cluster_name}_XXX
    tasks = Task.objects.filter(track_id__startswith=f"T-{cluster_name}-")
    logger.info(f"Cluster: {cluster_name}, Total tasks: {len(tasks)}")
    # group the tasks by the track_id
    task_groups = {}
    for task in tasks:
        track_id = task.track_id
        if track_id not in task_groups:
            task_groups[track_id] = []
        task_groups[track_id].append(task)

    # sort the task groups by the first task created time
    task_groups = dict(
        sorted(
            task_groups.items(),
            key=lambda x: x[1][0].created_at if len(x[1]) > 0 else None,
        )
    )
    return task_groups, required_tasks_count, tasks

get_task_names_order(track_id)

Get the task names order Args: track_id (str): The track ID

Returns:

Name Type Description
str List[str]

The task names order

Source code in API/orchestrator/metrics/utils.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def get_task_names_order(track_id: str) -> List[str]:
    """
    Get the task names order
    Args:
        track_id (str): The track ID

    Returns:
        str: The task names order

    """
    cluster_name = track_id.split("-")[1]
    cluster = CLUSTERS.get(cluster_name)
    task_name_order = [
        item for item in cluster.values() if item["component_type"] == "task"
    ]
    task_name_order = sorted(task_name_order, key=lambda x: x["order"])
    task_names = [item["task_name"] for item in task_name_order]
    return task_names

str_to_datetime(datetime_str)

Convert the datetime string to datetime object Args: datetime_str (str): the string datetime, like this: 2024-07-01T14:58:36.419352

Returns:

Name Type Description
datetime datetime

The datetime object

Source code in API/orchestrator/metrics/utils.py
72
73
74
75
76
77
78
79
80
81
def str_to_datetime(datetime_str: str) -> datetime:
    """
    Convert the datetime string to datetime object
    Args:
        datetime_str (str): the string datetime, like this: 2024-07-01T14:58:36.419352

    Returns:
        datetime: The datetime object
    """
    return datetime.strptime(datetime_str, "%Y-%m-%dT%H:%M:%S.%f")