Skip to content

Storage

APISyncHandler

Bases: FileSystemEventHandler

Sync the files to s3 when they are created, modified, moved or deleted

Source code in Client/Listener/storage.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
class APISyncHandler(FileSystemEventHandler):
    """
    Sync the files to s3 when they are created, modified, moved or deleted
    """

    def __init__(self, home_id: int, api: API):
        super().__init__()
        self.home_id = home_id
        self.api = api

    def on_any_event(self, event):
        if event.is_directory:
            return None

        elif event.event_type in ("created", "modified", "moved", "deleted"):
            # print(f"Event type: {event.event_type} - Path: {event.src_path}")
            # only process .avi and .wav files

            if event.src_path.split("/")[-1].split(".")[-1] not in [
                "mp4",
                "wav",
                "mp3",
                "jpg",
                "jpeg",
                "png",
            ]:
                return None
            try:
                self.api.upload_file(
                    event.src_path,
                    f"Listener/{event.src_path.split(DATA_DIR.as_posix())[1].strip('/')}",
                )
                logger.info(f"Uploaded file to server: {event.src_path}")
            except Exception as e:
                logger.error(f"Error uploading file to s3: {e}")

LocalSyncHandler

Bases: FileSystemEventHandler

Sync the files to disk when they are created, modified, moved or deleted

Source code in Client/Listener/storage.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
class LocalSyncHandler(FileSystemEventHandler):
    """
    Sync the files to disk when they are created, modified, moved or deleted
    """

    def __init__(self, src_path: str, dest_path: str, sshpass: str):
        """

        Args:
            src_path (str): The source path to sync
            dest_path (str): The destination path to sync
            sshpass (str): The password to ssh
        """
        super().__init__()
        self.src_path = src_path
        self.dest_path = dest_path
        self.sshpass = sshpass

    def on_any_event(self, event):
        """
        Sync the files to disk when they are created, modified, moved or deleted
        Args:
            event:

        Returns:

        """
        if event.is_directory:
            return None
        else:
            if self.sshpass:
                subprocess.call(
                    [
                        "sshpass",
                        "-p",
                        self.sshpass,
                        "rsync",
                        "-avz",
                        "--delete",
                        self.src_path,
                        self.dest_path,
                    ]
                )
            else:
                # wer can set up the authentication first, then we can use the rsync command
                subprocess.call(
                    ["rsync", "-avz", "--delete", self.src_path, self.dest_path]
                )

__init__(src_path, dest_path, sshpass)

Parameters:

Name Type Description Default
src_path str

The source path to sync

required
dest_path str

The destination path to sync

required
sshpass str

The password to ssh

required
Source code in Client/Listener/storage.py
29
30
31
32
33
34
35
36
37
38
39
40
def __init__(self, src_path: str, dest_path: str, sshpass: str):
    """

    Args:
        src_path (str): The source path to sync
        dest_path (str): The destination path to sync
        sshpass (str): The password to ssh
    """
    super().__init__()
    self.src_path = src_path
    self.dest_path = dest_path
    self.sshpass = sshpass

on_any_event(event)

Sync the files to disk when they are created, modified, moved or deleted Args: event:

Returns:

Source code in Client/Listener/storage.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def on_any_event(self, event):
    """
    Sync the files to disk when they are created, modified, moved or deleted
    Args:
        event:

    Returns:

    """
    if event.is_directory:
        return None
    else:
        if self.sshpass:
            subprocess.call(
                [
                    "sshpass",
                    "-p",
                    self.sshpass,
                    "rsync",
                    "-avz",
                    "--delete",
                    self.src_path,
                    self.dest_path,
                ]
            )
        else:
            # wer can set up the authentication first, then we can use the rsync command
            subprocess.call(
                ["rsync", "-avz", "--delete", self.src_path, self.dest_path]
            )

S3SyncHandler

Bases: FileSystemEventHandler

Sync the files to s3 when they are created, modified, moved or deleted

Source code in Client/Listener/storage.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
class S3SyncHandler(FileSystemEventHandler):
    """
    Sync the files to s3 when they are created, modified, moved or deleted
    """

    def __init__(self, home_id: int, s3_client):
        super().__init__()
        self.home_id = home_id
        self.s3_client = s3_client

    def on_any_event(self, event):
        if event.is_directory:
            return None

        elif event.event_type in ("created", "modified", "moved", "deleted"):
            # print(f"Event type: {event.event_type} - Path: {event.src_path}")
            # only process .avi and .wav files

            if event.src_path.split("/")[-1].split(".")[-1] not in [
                "mp4",
                "wav",
                "mp3",
                "jpg",
                "jpeg",
                "png",
            ]:
                return None
            try:
                self.s3_client.upload_file(
                    event.src_path,
                    S3_BUCKET,
                    f"Listener/{event.src_path.split(DATA_DIR.as_posix())[1].strip('/')}",
                )
                logger.info(f"Uploaded file to s3: {event.src_path}")
                # logger.info(f"Listener/{event.src_path.split(DATA_DIR.as_posix())[1].strip('/')}")
            except Exception as e:
                logger.error(f"Error uploading file to s3: {e}")

StorageHandler

Source code in Client/Listener/storage.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
class StorageHandler:
    def __init__(
        self,
        api_domain: str = "",
        token: str = "",
        home_id: int = None,
        dest_dir: Optional[str] = None,
        dest_password: Optional[str] = None,
    ):
        """
        Args:
            api_domain (str): the api domain
            token (str): the api token
            home_id (int): the home id
            dest_dir (str): the destination directory to sync, like
            dest_password (str): the destination password to sync
        """
        self.home_id = home_id
        self.dest_dir = dest_dir
        self.dest_password = dest_password
        self.api = API(domain=api_domain, token=token, home_id=home_id)
        self.storage_solution = self.api.get_storage_solution()

    def process(self):
        if self.storage_solution == STORAGE_SOLUTION_VOLUME:
            logger.info("No need to process files")
            return

        if self.storage_solution == STORAGE_SOLUTION_S3:
            self.process_s3()

        if self.storage_solution == STORAGE_SOLUTION_LOCAL:
            self.process_local_network()

        if self.storage_solution == STORAGE_SOLUTION_API:
            self.process_api()

    def process_s3(self):
        observer = Observer()
        s3_handler = S3SyncHandler(self.home_id, s3_client=boto3.client("s3"))
        observer.schedule(s3_handler, str(DATA_DIR), recursive=True)
        observer.start()
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            observer.stop()
        observer.join()

    def process_local_network(self):
        observer = Observer()
        if not self.dest_dir:
            logger.error("dest_dir is required for local network sync")
            return
        local_handler = LocalSyncHandler(
            src_path=str(DATA_DIR),
            dest_path=self.dest_dir,
            sshpass=self.dest_password,
        )
        observer.schedule(local_handler, str(DATA_DIR), recursive=True)
        observer.start()
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            observer.stop()
        observer.join()

    def process_api(self):
        observer = Observer()
        api_handler = APISyncHandler(self.home_id, self.api)
        observer.schedule(api_handler, str(DATA_DIR), recursive=True)
        observer.start()
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            observer.stop()
        observer.join()

__init__(api_domain='', token='', home_id=None, dest_dir=None, dest_password=None)

Parameters:

Name Type Description Default
api_domain str

the api domain

''
token str

the api token

''
home_id int

the home id

None
dest_dir str

the destination directory to sync, like

None
dest_password str

the destination password to sync

None
Source code in Client/Listener/storage.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def __init__(
    self,
    api_domain: str = "",
    token: str = "",
    home_id: int = None,
    dest_dir: Optional[str] = None,
    dest_password: Optional[str] = None,
):
    """
    Args:
        api_domain (str): the api domain
        token (str): the api token
        home_id (int): the home id
        dest_dir (str): the destination directory to sync, like
        dest_password (str): the destination password to sync
    """
    self.home_id = home_id
    self.dest_dir = dest_dir
    self.dest_password = dest_password
    self.api = API(domain=api_domain, token=token, home_id=home_id)
    self.storage_solution = self.api.get_storage_solution()