Skip to content

storage

This is the storage module.

It includes two processes:

  • One pulls data down from the storage
  • The other uploads data to the storage

StorageSolution

Source code in Agent/storage.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
class StorageSolution:
    """Synchronise TTS data between this machine and a storage backend.

    The backend is reported by the API ("volume", "local", "s3" or "api").
    Push methods watch ``DATA_DIR / "tts"`` and upload new files; pull
    methods download new audio/video files into ``CLIENT_DATA_FOLDER``.
    All sync loops block forever (push loops stop on KeyboardInterrupt).
    """

    def __init__(
            self,
            api_domain: str,
            token: str,
            input_source_dir: str = None,
            output_dest_dir: str = None,
            dest_password: str = None,
    ):
        """
        Args:
            api_domain (str): domain of the API server.
            token (str): API authentication token.
            input_source_dir (str): rsync source used by the "local" pull.
            output_dest_dir (str): rsync destination used by the "local" push.
            dest_password (str): SSH password used with sshpass for "local".
        """
        self.api_domain = api_domain
        self.token = token
        self.api = API(domain=api_domain, token=token)
        # The backend choice is decided server side; fetched once at start-up.
        self.storage_solution = self.api.get_storage_solution()
        self.input_source_dir = input_source_dir
        self.output_dest_dir = output_dest_dir
        self.dest_password = dest_password

    def sync_push_data(self):
        """Dispatch to the push loop for the configured backend.

        "volume" means the data lives on a shared volume; nothing to push.
        """
        if self.storage_solution == "volume":
            return
        if self.storage_solution == "s3":
            self.sync_push_s3()
        elif self.storage_solution == "local":
            self.sync_push_local()
        elif self.storage_solution == "api":
            self.sync_push_api()

    def sync_push_local(self):
        """Watch DATA_DIR/"tts" and mirror new files over the local network.

        Blocks forever; stops the observer on KeyboardInterrupt.
        """
        observer = Observer()
        local_handler = LocalSyncHandler(
            src_path=str(DATA_DIR / "tts"),
            dest_path=self.output_dest_dir,
            sshpass=self.dest_password,
        )
        observer.schedule(local_handler, str(DATA_DIR / "tts"), recursive=True)
        observer.start()
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            observer.stop()
        observer.join()

    @staticmethod
    def sync_push_s3():
        """Watch DATA_DIR/"tts" and upload new files to s3.

        Blocks forever; stops the observer on KeyboardInterrupt.
        """
        observer = Observer()
        s3_handler = S3SyncHandler(s3_client=boto3.client("s3"))
        observer.schedule(s3_handler, str(DATA_DIR / "tts"), recursive=True)
        observer.start()
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            observer.stop()
        observer.join()

    def sync_push_api(self):
        """Watch DATA_DIR/"tts" and push new files through the API.

        Blocks forever; stops the observer on KeyboardInterrupt.
        """
        observer = Observer()
        api_handler = APISyncHandler(self.api)
        logger.info(str(DATA_DIR / "tts"))
        observer.schedule(api_handler, str(DATA_DIR / "tts"), recursive=True)
        observer.start()
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            observer.stop()
        observer.join()

    def sync_pull_data(self):
        """Dispatch to the pull loop for the configured backend.

        "volume" means the data is already accessible locally, so this is a
        no-op; the other backends list remote files and fetch missing ones.

        Returns:
            None
        """
        if self.storage_solution == "volume":
            return
        if self.storage_solution == "local":
            self.sync_pull_local()
        elif self.storage_solution == "s3":
            self.sync_pull_s3()
        elif self.storage_solution == "api":
            self.sync_pull_api()

    def sync_pull_local(self):
        """Mirror input_source_dir into CLIENT_DATA_FOLDER via rsync, forever."""
        # NOTE(review): the password and paths are interpolated into a shell
        # string; prefer subprocess.run([...], shell=False) so the password
        # never reaches the shell command line or process list.
        while True:
            os.system(
                "sshpass -p {} rsync -avz {} {}".format(self.dest_password, self.input_source_dir,
                                                        str(CLIENT_DATA_FOLDER))
            )
            time.sleep(1)

    def sync_pull_s3(self):
        """Sync the data from s3 (not implemented yet)."""
        pass

    def sync_pull_api(self):
        """Poll the API every second and download files newer than last poll."""
        from_time = None
        while True:
            try:
                logger.info(f"Syncing data from {from_time}")
                files = self.api.list_files(from_time=from_time)
                # set from time to now for the next sync in timestamp format
                from_time = time.time()
                self.download_data(files)
            except Exception as e:
                logger.error(f"Error syncing data: {e}")
                logger.exception(e)
            time.sleep(1)

    def download_data(self, files):
        """Download any listed audio/video files that are missing locally.

        Args:
            files: mapping with optional "audio_files"/"video_files" lists;
                each entry carries "uid", "id" and the file name.

        Returns:
            None
        """
        audio_files = files.get("audio_files", [])
        video_files = files.get("video_files", [])
        logger.info(
            f"Checking {len(audio_files)} audio files and {len(video_files)} video files"
        )
        for audio_file in audio_files:
            dest_path = (
                    CLIENT_DATA_FOLDER
                    / "audio"
                    / audio_file["uid"]
                    / audio_file["audio_file"]
            )
            if not dest_path.exists():
                logger.info(f"Downloading {audio_file['audio_file']} to {dest_path}")
                dest_path.parent.mkdir(parents=True, exist_ok=True)
                self.download_audio(audio_file["id"], dest_path)
        for video_file in video_files:
            dest_path = (
                    CLIENT_DATA_FOLDER
                    / "videos"
                    / video_file["uid"]
                    / video_file["video_file"]
            )
            if not dest_path.exists():
                logger.info(f"Downloading {video_file['video_file']} to {dest_path}")
                dest_path.parent.mkdir(parents=True, exist_ok=True)
                self.download_video(video_file["id"], dest_path)

    def download_audio(self, audio_file_id, dest_path: Path):
        """Download one audio file to dest_path; errors are logged, not raised.

        Args:
            audio_file_id (str): the audio file id
            dest_path (Path): destination path for the downloaded file
        """
        link_json = self.api.download_file_link(audio_file_id, "audio")
        audio_url = link_json.get("audio_url", None)
        if audio_url is None:
            # Nothing to download.
            return

        try:
            r = requests.get(audio_url, stream=True, timeout=30)
            # BUG FIX: only persist on success; previously any non-404 status
            # (including 5xx) was written to disk.
            if r.ok:
                with open(dest_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=1024):
                        if chunk:
                            f.write(chunk)
            else:
                logger.error(
                    f"Error downloading audio file: {audio_url}, status {r.status_code}"
                )
        except Exception as e:
            logger.error(f"Error downloading audio file: {e}")

    def download_video(self, video_file_id, dest_path: Path):
        """Download one video file plus its frame images.

        Frames land in ``dest_path.parent / "frames"``.

        Args:
            video_file_id (str): the video file id
            dest_path (Path): destination path for the downloaded file
        """
        link_json = self.api.download_file_link(video_file_id, "video")
        video_url = link_json.get("video_url", None)
        # BUG FIX: "frames" may be absent or None, which crashed the loop below.
        frames = link_json.get("frames") or []
        logger.info(f"video_url: {video_url}, frames: {frames}")
        if video_url is not None:
            try:
                r = requests.get(video_url, stream=True, timeout=30)
                # BUG FIX: only persist on success (see download_audio).
                if r.ok:
                    with open(dest_path, "wb") as f:
                        for chunk in r.iter_content(chunk_size=1024):
                            if chunk:
                                f.write(chunk)
                else:
                    logger.error(
                        f"Error downloading video file: {video_url}, status {r.status_code}"
                    )
            except Exception as e:
                logger.error(f"Error downloading video file: {e}")

        for frame_url in frames:
            # keep only the file-name component of the URL
            frame_path = dest_path.parent / "frames" / frame_url.rsplit("/", 3)[-1]
            logger.info(f"Downloading frame {frame_url} to {frame_path}")
            if frame_path.exists():
                continue
            try:
                # BUG FIX: the frames directory was never created before open().
                frame_path.parent.mkdir(parents=True, exist_ok=True)
                r = requests.get(frame_url, stream=True, timeout=30)
                with open(frame_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=1024):
                        if chunk:
                            f.write(chunk)
            except Exception as e:
                logger.error(f"Error downloading frame file: {e}")

download_audio(audio_file_id, dest_path)

Download the audio file Args: audio_file_id (str): the audio file id dest_path (str): the destination

Returns:

Source code in Agent/storage.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def download_audio(self, audio_file_id, dest_path: Path):
    """
    Download one audio file to *dest_path*.

    Args:
        audio_file_id (str): the audio file id
        dest_path (Path): destination path for the downloaded file

    Returns:
        None. Errors are logged rather than raised.
    """
    link_json = self.api.download_file_link(audio_file_id, "audio")
    audio_url = link_json.get("audio_url", None)
    if audio_url is None:
        # The API returned no download URL; nothing to do.
        return

    try:
        r = requests.get(audio_url, stream=True, timeout=30)
        # BUG FIX: only persist the payload on success; previously any
        # non-404 status (including 5xx error pages) was written to disk.
        if r.ok:
            with open(dest_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=1024):
                    if chunk:
                        f.write(chunk)
        else:
            logger.error(
                f"Error downloading audio file: {audio_url}, status {r.status_code}"
            )
    except Exception as e:
        logger.error(f"Error downloading audio file: {e}")

download_data(files)

Download the data from the cloud Args: files:

Returns:

Source code in Agent/storage.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def download_data(self, files):
    """
    Fetch every listed cloud file that is not yet present locally.

    Args:
        files: mapping with optional "audio_files" and "video_files" lists;
            each entry carries "uid", "id" and the file name.

    Returns:
        None
    """
    audio_files = files.get("audio_files", [])
    video_files = files.get("video_files", [])
    logger.info(
        f"Checking {len(audio_files)} audio files and {len(video_files)} video files"
    )
    for entry in audio_files:
        target = CLIENT_DATA_FOLDER / "audio" / entry["uid"] / entry["audio_file"]
        if target.exists():
            continue
        # TODO: do the download here
        logger.info(f"Downloading {entry['audio_file']} to {target}")
        target.parent.mkdir(parents=True, exist_ok=True)
        self.download_audio(entry["id"], target)
    for entry in video_files:
        target = CLIENT_DATA_FOLDER / "videos" / entry["uid"] / entry["video_file"]
        if target.exists():
            continue
        # TODO: do the download here
        logger.info(f"Downloading {entry['video_file']} to {target}")
        target.parent.mkdir(parents=True, exist_ok=True)
        self.download_video(entry["id"], target)

download_video(video_file_id, dest_path)

Download the video file Args: video_file_id (str): the video file id dest_path (str): the destination

Returns:

Source code in Agent/storage.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
def download_video(self, video_file_id, dest_path: Path):
    """
    Download one video file plus its frame images.

    Frames are stored in ``dest_path.parent / "frames"``.

    Args:
        video_file_id (str): the video file id
        dest_path (Path): destination path for the downloaded video

    Returns:
        None. Errors are logged rather than raised.
    """
    link_json = self.api.download_file_link(video_file_id, "video")
    video_url = link_json.get("video_url", None)
    # BUG FIX: "frames" may be absent or None; iterating None crashed below.
    frames = link_json.get("frames") or []
    logger.info(f"video_url: {video_url}, frames: {frames}")
    if video_url is not None:
        try:
            r = requests.get(video_url, stream=True, timeout=30)
            # BUG FIX: only persist the payload on success; previously any
            # non-404 status (including 5xx) was written to disk.
            if r.ok:
                with open(dest_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=1024):
                        if chunk:
                            f.write(chunk)
            else:
                logger.error(
                    f"Error downloading video file: {video_url}, status {r.status_code}"
                )
        except Exception as e:
            logger.error(f"Error downloading video file: {e}")

    for frame_url in frames:
        # keep only the file-name component of the URL
        frame_path = dest_path.parent / "frames" / frame_url.rsplit("/", 3)[-1]
        logger.info(f"Downloading frame {frame_url} to {frame_path}")
        if frame_path.exists():
            continue
        try:
            # BUG FIX: the frames directory was never created before open().
            frame_path.parent.mkdir(parents=True, exist_ok=True)
            r = requests.get(frame_url, stream=True, timeout=30)
            with open(frame_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=1024):
                    if chunk:
                        f.write(chunk)
        except Exception as e:
            logger.error(f"Error downloading frame file: {e}")

sync_pull_api()

Sync the data from api

Source code in Agent/storage.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def sync_pull_api(self):
    """
    Poll the API once a second and download files newer than the last poll.
    Blocks forever; failures are logged and the loop keeps running.
    """
    last_sync = None
    while True:
        try:
            logger.info(f"Syncing data from {last_sync}")
            listing = self.api.list_files(from_time=last_sync)
            # Remember "now" (unix timestamp) so the next poll only
            # considers files created after this one.
            last_sync = time.time()
            self.download_data(listing)
        except Exception as e:
            logger.error(f"Error syncing data: {e}")
            logger.exception(e)
        time.sleep(1)

sync_pull_data()

If storage solution is volume or local, this means the data is accessible locally, do not need to worry about it This will first call cloud to list all audio and video files And then compare them with local ones If there is any new files, download them

Returns:

Source code in Agent/storage.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def sync_pull_data(self):
    """
    Pull new data from the configured storage backend.

    When the backend is "volume" (or effectively "local") the data is
    already reachable on local disk, so "volume" is a no-op; otherwise
    the backend-specific pull routine is invoked.

    Returns:
        None
    """
    backend = self.storage_solution
    if backend == "volume":
        return
    if backend == "local":
        self.sync_pull_local()
    elif backend == "s3":
        self.sync_pull_s3()
    elif backend == "api":
        self.sync_pull_api()

sync_pull_local()

Sync the data from the local network directly run the rsync command

Source code in Agent/storage.py
130
131
132
133
134
135
136
137
138
139
140
def sync_pull_local(self):
    """
    Sync the data from the local network
    directly run the rsync command

    Repeatedly mirrors ``input_source_dir`` into ``CLIENT_DATA_FOLDER``
    once a second; blocks forever.
    """
    # NOTE(review): the password and paths are interpolated into a shell
    # command string — consider subprocess.run([...], shell=False) so the
    # password never appears on the shell command line / process list.
    while True:
        os.system(
            "sshpass -p {} rsync -avz {} {}".format(self.dest_password, self.input_source_dir,
                                                    str(CLIENT_DATA_FOLDER))
        )
        time.sleep(1)

sync_pull_s3()

Sync the data from s3

Source code in Agent/storage.py
142
143
144
145
146
def sync_pull_s3(self):
    """
    Sync the data from s3 (not implemented yet).
    """
    return None

sync_push_api()

Sync the data to the api

Source code in Agent/storage.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def sync_push_api(self):
    """
    Watch the TTS output directory and push new files through the API.
    Blocks until interrupted with Ctrl-C, then shuts the watcher down.
    """
    watcher = Observer()
    handler = APISyncHandler(self.api)
    watch_path = str(DATA_DIR / "tts")
    logger.info(watch_path)
    watcher.schedule(handler, watch_path, recursive=True)
    watcher.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        watcher.stop()
    watcher.join()

sync_push_data()

Sync the data to the storage

Source code in Agent/storage.py
47
48
49
50
51
52
53
54
55
56
57
58
def sync_push_data(self):
    """
    Push local data to the configured storage backend.

    "volume" needs no push (the data is on a shared volume); the other
    backends each get their dedicated push routine.
    """
    backend = self.storage_solution
    if backend == "volume":
        return
    if backend == "s3":
        self.sync_push_s3()
    elif backend == "local":
        self.sync_push_local()
    elif backend == "api":
        self.sync_push_api()

sync_push_local()

Sync the data to the local network

Source code in Agent/storage.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def sync_push_local(self):
    """
    Watch the TTS output directory and mirror new files to a machine on
    the local network (rsync over SSH via the handler).
    Blocks until interrupted with Ctrl-C, then shuts the watcher down.
    """
    watch_path = str(DATA_DIR / "tts")
    watcher = Observer()
    handler = LocalSyncHandler(
        src_path=watch_path,
        dest_path=self.output_dest_dir,
        sshpass=self.dest_password,
    )
    watcher.schedule(handler, watch_path, recursive=True)
    watcher.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        watcher.stop()
    watcher.join()

sync_push_s3() staticmethod

Sync the data to the s3

Source code in Agent/storage.py
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
@staticmethod
def sync_push_s3():
    """
    Watch the TTS output directory and upload new files to s3.
    Blocks until interrupted with Ctrl-C, then shuts the watcher down.
    """
    watch_path = str(DATA_DIR / "tts")
    handler = S3SyncHandler(s3_client=boto3.client("s3"))
    watcher = Observer()
    watcher.schedule(handler, watch_path, recursive=True)
    watcher.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        watcher.stop()
    watcher.join()