Skip to content

Mock

DataMock

We first extract the audio and video from the video file, and then treat each extracted segment as if it happened at the current time plus its offset into the video.

Then we save them into the data folder, as the others did:

  • audio
    • /audio/uuid/0-datetime.wav
  • video
    • /videos/uuid/datetime.mp4
    • /video/uuid/frames/date-time/xx.jpg

For the mock US-Election debate, the timestamps (mm:ss) are: 02:53, 3:20, 20:20, 20:39, 33:38, 34:18, 55:15, 55:40, 80:05, 80:18

Source code in Client/Listener/mock/data_extraction.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
class DataMock:
    """
    Replay segments of a recorded video as if they were being captured now.

    We first extract the audio and video from the video file, and then treat
    each segment as happening at "current time + offset into the video".

    The results are written below ``DATA_DIR``:

    - audio
        - audio/<uid>/<index>-<YYYYmmddHHMMSS>.wav
    - video
        - videos/<uid>/<YYYY-mm-dd_HH-MM-SS>.mp4 (one file per wall-clock minute)
        - videos/<uid>/frames/<YYYY-mm-dd_HH-MM>/<n>.png
    - raw segment cut
        - mock/output/<input file name>-<start>-<end>.mp4

    For the mock US-Election debate the timestamps (mm:ss) are:
    - 02:53,3:20,20:20,20:39,33:38,34:18,55:15,55:40,80:05,80:18
    """

    def __init__(
        self,
        api_domain: str,
        token: str,
        home_id: Optional[int],
        track_cluster: Optional[str] = None,
    ):
        """
        Create the API client, a fresh session uid and the output folders.

        Args:
            api_domain (str): Passed to ``API`` as ``domain``.
            token (str): Passed to ``API`` as ``token``.
            home_id (Optional[int]): Passed to ``API`` as ``home_id``.
            track_cluster (Optional[str]): Passed to ``API`` as ``track_cluster``.
        """
        self.api = API(
            domain=api_domain, token=token, home_id=home_id, track_cluster=track_cluster
        )
        self.uid = str(uuid4())

        self.audio_dir = DATA_DIR / "audio" / self.uid
        self.audio_dir.mkdir(parents=True, exist_ok=True)
        self.video_dir = DATA_DIR / "videos" / self.uid
        self.frames_dir = self.video_dir / "frames"
        # Creating frames_dir with parents=True also creates video_dir.
        self.frames_dir.mkdir(parents=True, exist_ok=True)
        self.mock_dir = DATA_DIR / "mock" / "output"
        self.mock_dir.mkdir(parents=True, exist_ok=True)

        # Wall-clock origin: second 0 of the input video maps to this instant.
        self.current_time = datetime.now()

    def replay(self, time_ranges: List[Tuple[int, int]], input_video_path: str):
        """
        Replay the given segments of the input video.

        For every range the segment is extracted to disk, a speech-to-text
        task is queued through ``self.api`` and the audio file is posted
        together with the track id returned by the queue call.

        Args:
            time_ranges (List[Tuple[int, int]]): (start, end) offsets in
                seconds into the input video.
            input_video_path (str): Path to the input video file.

        Returns:
            None
        """
        for index, (start_second, end_second) in enumerate(time_ranges):
            start_time = self.current_time + timedelta(seconds=start_second)
            end_time = self.current_time + timedelta(seconds=end_second)
            # Built once so the file written and the file reported cannot drift.
            audio_file_name = f"{index}-{end_time.strftime('%Y%m%d%H%M%S')}.wav"

            self.extract_audio_and_video(
                input_video_path=input_video_path,
                start_second=start_second,
                end_second=end_second,
                start_time=start_time,
                end_time=end_time,
                output_audio_path=self.audio_dir / audio_file_name,
            )

            track_id = self.api.queue_speech_to_text(
                uid=self.uid,
                audio_index=str(index),
                start_time=start_time,
                end_time=end_time,
            )
            self.api.post_audio(
                uid=self.uid,
                sequence_index=index,
                audio_file=audio_file_name,
                start_time=start_time,
                end_time=end_time,
                track_id=track_id,
            )

    @staticmethod
    def _minute_windows(
        start_time: datetime, end_time: datetime, origin: datetime
    ) -> List[Tuple[datetime, datetime, int, int]]:
        """
        List the wall-clock minutes touched by ``[start_time, end_time]``.

        Args:
            start_time (datetime): Wall-clock start of the segment.
            end_time (datetime): Wall-clock end of the segment.
            origin (datetime): Wall-clock instant that maps to video second 0.

        Returns:
            One ``(minute_start, minute_end, from_second, to_second)`` tuple
            per minute, where the two integers are whole-second offsets into
            the video, never negative.
        """
        start_minute = start_time.replace(second=0, microsecond=0)
        end_minute = end_time.replace(second=0, microsecond=0) + timedelta(minutes=1)
        # total_seconds() instead of .seconds: .seconds drops whole days and
        # wraps to ~86400 for negative differences.
        minute_count = int((end_minute - start_minute).total_seconds() // 60)

        windows = []
        for i in range(minute_count):
            minute_start = start_minute + timedelta(minutes=i)
            minute_end = start_minute + timedelta(minutes=i + 1)
            # The first minute can begin before the origin (it is rounded
            # down to :00), so clamp the offsets at the start of the video.
            from_second = max(0, int((minute_start - origin).total_seconds()))
            to_second = max(0, int((minute_end - origin).total_seconds()))
            windows.append((minute_start, minute_end, from_second, to_second))
        return windows

    def extract_audio_and_video(
        self,
        input_video_path: str,
        start_second: int,
        end_second: int,
        start_time: datetime,
        end_time: datetime,
        output_audio_path: str,
    ):
        """
        Extracts the audio and video from a specified segment of a video file.

        The segment is written as an mp4 into ``self.mock_dir`` and its audio
        track into ``output_audio_path``. Afterwards every wall-clock minute
        touched by the segment is cut into its own mp4 in ``self.video_dir``,
        split into frames and reported through ``self.api.post_video``.

        Args:
            input_video_path (str): Path to the input video file.
            start_second (int): Start time in seconds.
            end_second (int): End time in seconds.
            start_time (datetime): Wall-clock time assigned to ``start_second``.
            end_time (datetime): Wall-clock time assigned to ``end_second``.
            output_audio_path (str): Path to save the extracted audio file.

        Returns:
            None
        """
        segment_video_path = (
            self.mock_dir
            / f"{Path(input_video_path).name}-{start_second}-{end_second}.mp4"
        ).as_posix()

        video_clip = VideoFileClip(input_video_path)
        sub_clip = None
        audio_clip = None
        try:
            # Cut the requested segment and write it out.
            sub_clip = video_clip.subclip(start_second, end_second)
            sub_clip.write_videofile(
                segment_video_path, codec="libx264", audio_codec="aac"
            )

            # Write the segment's audio track; replay() hands in a Path, so
            # normalise to the annotated str.
            audio_clip = sub_clip.audio
            audio_clip.write_audiofile(str(output_audio_path))
        finally:
            # Release the clips even when a write fails.
            for clip in (sub_clip, audio_clip, video_clip):
                if clip is not None:
                    clip.close()

        # Split the video by minutes: each minute gets one mp4 file and one
        # folder of frames.
        windows = self._minute_windows(start_time, end_time, self.current_time)
        for i, (minute_start, minute_end, from_second, to_second) in enumerate(
            windows
        ):
            logger.info(f"Processing minute {i}")
            minute_file_name = minute_start.strftime("%Y-%m-%d_%H-%M-%S") + ".mp4"
            minute_video_path = (Path(self.video_dir) / minute_file_name).as_posix()
            logger.info(f"{from_second}-{to_second}")

            # The source is reopened for every minute, as before.
            # NOTE(review): presumably closing a sub-clip also releases the
            # parent's reader, so one clip cannot be shared - confirm.
            video_clip = VideoFileClip(input_video_path)
            minute_clip = None
            try:
                minute_clip = video_clip.subclip(from_second, to_second)
                minute_clip.write_videofile(
                    minute_video_path, codec="libx264", audio_codec="aac"
                )
            finally:
                if minute_clip is not None:
                    minute_clip.close()
                video_clip.close()

            frames_folder = self.frames_dir / minute_start.strftime("%Y-%m-%d_%H-%M")
            frames_folder.mkdir(parents=True, exist_ok=True)
            self.split_video_in_minutes(minute_video_path, frames_folder.as_posix())
            self.api.post_video(
                self.uid,
                minute_file_name,
                start_time=minute_start,
                end_time=minute_end,
            )

    @staticmethod
    def split_video_in_minutes(video_path, output_folder, fps=1):
        """
        Splits a video into images.

        Frames are saved as ``<n>.png`` where ``n`` counts from 0.

        Args:
            video_path (str): Path to the video file.
            output_folder (str): Folder to save the extracted images.
            fps (int): Frames per second to extract. Defaults to 1.
        """
        the_video_clip = VideoFileClip(video_path)
        try:
            # Ensure the output folder exists.
            os.makedirs(output_folder, exist_ok=True)

            for i, frame in enumerate(the_video_clip.iter_frames(fps=fps)):
                imageio.imwrite(os.path.join(output_folder, f"{i}.png"), frame)
        finally:
            # Close the clip even when writing a frame fails.
            the_video_clip.close()

extract_audio_and_video(input_video_path, start_second, end_second, start_time, end_time, output_audio_path)

Extracts the audio and video from a specified segment of a video file.

Parameters:

Name Type Description Default
input_video_path str

Path to the input video file.

required
start_second int

Start time in seconds.

required
end_second int

End time in seconds.

required
output_audio_path str

Path to save the extracted audio file.

required
Source code in Client/Listener/mock/data_extraction.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def extract_audio_and_video(
    self,
    input_video_path: str,
    start_second: int,
    end_second: int,
    start_time: datetime,
    end_time: datetime,
    output_audio_path: str,
):
    """
    Extracts the audio and video from a specified segment of a video file.

    The segment is written as an mp4 into ``self.mock_dir`` and its audio
    track into ``output_audio_path``. Afterwards every wall-clock minute
    touched by the segment is cut into its own mp4 in ``self.video_dir``,
    split into frames and reported through ``self.api.post_video``.

    Args:
        input_video_path (str): Path to the input video file.
        start_second (int): Start time in seconds.
        end_second (int): End time in seconds.
        start_time (datetime): Wall-clock time assigned to ``start_second``.
        end_time (datetime): Wall-clock time assigned to ``end_second``.
        output_audio_path (str): Path to save the extracted audio file.

    Returns:
        None
    """
    segment_video_path = (
        self.mock_dir
        / f"{Path(input_video_path).name}-{start_second}-{end_second}.mp4"
    ).as_posix()

    video_clip = VideoFileClip(input_video_path)
    sub_clip = None
    audio_clip = None
    try:
        # Cut the requested segment and write it out.
        sub_clip = video_clip.subclip(start_second, end_second)
        sub_clip.write_videofile(
            segment_video_path, codec="libx264", audio_codec="aac"
        )

        # Write the segment's audio track.
        audio_clip = sub_clip.audio
        audio_clip.write_audiofile(str(output_audio_path))
    finally:
        # Release the clips even when a write fails.
        for clip in (sub_clip, audio_clip, video_clip):
            if clip is not None:
                clip.close()

    # Split the video by minutes: each minute gets one mp4 file and one
    # folder of frames.
    start_minute = start_time.replace(second=0, microsecond=0)
    end_minute = end_time.replace(second=0, microsecond=0) + timedelta(minutes=1)
    # total_seconds() instead of .seconds: .seconds drops whole days and
    # wraps to ~86400 for negative differences.
    minute_count = int((end_minute - start_minute).total_seconds() // 60)

    for i in range(minute_count):
        logger.info(f"Processing minute {i}")
        minute_start = start_minute + timedelta(minutes=i)
        minute_end = start_minute + timedelta(minutes=i + 1)
        minute_file_name = minute_start.strftime("%Y-%m-%d_%H-%M-%S") + ".mp4"
        minute_video_path = (Path(self.video_dir) / minute_file_name).as_posix()

        # Recover the offsets into the video for this minute. The first
        # minute can begin before self.current_time (it is rounded down to
        # :00), so clamp at the start of the video.
        from_second = max(0, int((minute_start - self.current_time).total_seconds()))
        to_second = max(0, int((minute_end - self.current_time).total_seconds()))
        logger.info(f"{from_second}-{to_second}")

        # The source is reopened for every minute, as before.
        # NOTE(review): presumably closing a sub-clip also releases the
        # parent's reader, so one clip cannot be shared - confirm.
        video_clip = VideoFileClip(input_video_path)
        minute_clip = None
        try:
            minute_clip = video_clip.subclip(from_second, to_second)
            minute_clip.write_videofile(
                minute_video_path, codec="libx264", audio_codec="aac"
            )
        finally:
            if minute_clip is not None:
                minute_clip.close()
            video_clip.close()

        frames_folder = self.frames_dir / minute_start.strftime("%Y-%m-%d_%H-%M")
        frames_folder.mkdir(parents=True, exist_ok=True)
        self.split_video_in_minutes(minute_video_path, frames_folder.as_posix())
        self.api.post_video(
            self.uid,
            minute_file_name,
            start_time=minute_start,
            end_time=minute_end,
        )

replay(time_ranges, input_video_path)

Replays the audio and video from the specified time ranges. Args: time_ranges (List[Tuple[int, int]]): List of (start, end) time ranges in seconds. input_video_path (str): Path to the input video file.

Returns:

Source code in Client/Listener/mock/data_extraction.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def replay(self, time_ranges: List[Tuple[int, int]], input_video_path: str):
    """
    Replay the given segments of the input video.

    Each (start, end) pair is shifted onto the wall clock by adding it to
    ``self.current_time``. The segment is extracted to disk, a
    speech-to-text task is queued through ``self.api`` and the audio file is
    posted together with the track id returned by the queue call.

    Args:
        time_ranges (List[Tuple[int, int]]): (start, end) offsets in seconds
            into the input video.
        input_video_path (str): Path to the input video file.

    Returns:
        None
    """
    origin = self.current_time
    for index, (start_second, end_second) in enumerate(time_ranges):
        start_time = origin + timedelta(seconds=start_second)
        end_time = origin + timedelta(seconds=end_second)

        # The audio file is named after its sequence index and end timestamp.
        stamp = end_time.strftime('%Y%m%d%H%M%S')
        audio_file_name = f"{index}-{stamp}.wav"

        self.extract_audio_and_video(
            input_video_path=input_video_path,
            start_second=start_second,
            end_second=end_second,
            start_time=start_time,
            end_time=end_time,
            output_audio_path=self.audio_dir / audio_file_name,
        )

        # Queue transcription first: its track id is attached to the audio post.
        track_id = self.api.queue_speech_to_text(
            uid=self.uid,
            audio_index=str(index),
            start_time=start_time,
            end_time=end_time,
        )
        self.api.post_audio(
            uid=self.uid,
            sequence_index=index,
            audio_file=audio_file_name,
            start_time=start_time,
            end_time=end_time,
            track_id=track_id,
        )

split_video_in_minutes(video_path, output_folder, fps=1) staticmethod

Splits a video into images.

Parameters:

Name Type Description Default
video_path str

Path to the video file.

required
output_folder str

Folder to save the extracted images.

required
fps int

Frames per second to extract. Defaults to 1.

1
Source code in Client/Listener/mock/data_extraction.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
@staticmethod
def split_video_in_minutes(video_path, output_folder, fps=1):
    """
    Splits a video into images.

    Frames are saved as ``<n>.png`` where ``n`` counts from 0.

    Args:
        video_path (str): Path to the video file.
        output_folder (str): Folder to save the extracted images.
        fps (int): Frames per second to extract. Defaults to 1.
    """
    # Load the video file
    the_video_clip = VideoFileClip(video_path)
    try:
        # Ensure the output folder exists; exist_ok avoids the race between
        # checking for the folder and creating it.
        os.makedirs(output_folder, exist_ok=True)

        # Extract frames and save each one as an image
        for i, frame in enumerate(the_video_clip.iter_frames(fps=fps)):
            imageio.imwrite(os.path.join(output_folder, f"{i}.png"), frame)
    finally:
        # Close the clip even when writing a frame fails
        the_video_clip.close()