import subprocess
import threading
import time
from typing import List, Union


class Channel:
    """Mirror ``source`` into ``destination`` with ``rsync``.

    A channel can sync once (:meth:`copy`) or repeatedly on a background
    thread (:meth:`open` / :meth:`close`).

    Args:
        source: Directory to copy from. A trailing ``/`` is appended when the
            rsync command is built, so the directory's contents are synced.
        destination: Directory to copy into (also given a trailing ``/``).
        sync_deletions: When true, pass ``--delete`` to rsync.
        sync_time: Seconds to wait between two runs of the background sync.
        exclude: A single pattern, a list of patterns, or ``None``; each
            pattern becomes an ``--exclude=<pattern>`` argument.
    """

    def __init__(self, source, destination, sync_deletions=False, sync_time=60,
                 exclude: Union[str, List, None] = None):
        self.source = source
        self.destination = destination
        # Set to ask the background loop to finish.
        self.stop = threading.Event()
        self.syncing_thread = threading.Thread(target=self._sync, args=())
        self.sync_deletions = sync_deletions
        self.sync_time = sync_time
        if not exclude:
            exclude = []
        elif isinstance(exclude, str):
            # A lone pattern is accepted as shorthand for a one-item list.
            exclude = [exclude]
        self.exclude = list(exclude)
        # Base command only; per-run arguments are added by _build_command().
        self.command = ['rsync', '-aP']

    def _build_command(self) -> List[str]:
        """Return the full rsync argument list for the current settings.

        A new list is returned on every call so ``self.command`` is never
        mutated; appending to it directly would make the arguments pile up
        across repeated ``copy()`` / ``open()`` calls.
        """
        command = list(self.command)
        command.extend(f'--exclude={exclusion}' for exclusion in self.exclude)
        if self.sync_deletions:
            command.append('--delete')
        # Options are placed before the two path operands.
        command.extend([f'{self.source}/', f'{self.destination}/'])
        return command

    def alive(self) -> bool:
        """Return True while the background sync thread is running."""
        return self.syncing_thread.is_alive()

    def _sync(self) -> None:
        """Thread target: run rsync repeatedly until ``self.stop`` is set."""
        command = self._build_command()
        while not self.stop.is_set():
            subprocess.run(command)
            # Event.wait() returns as soon as stop is set, so close() does not
            # have to sit out the remainder of the pause.
            self.stop.wait(self.sync_time)

    def copy(self) -> bool:
        """Run rsync once in the calling thread.

        Returns:
            Always True; rsync's exit status is not inspected.
        """
        subprocess.run(self._build_command())
        return True

    def open(self) -> bool:
        """Start the background sync, restarting it if it is already running.

        Returns:
            True if the sync thread is alive after being started.
        """
        if self.syncing_thread.is_alive():
            self.stop.set()
            self.syncing_thread.join()
        self.stop.clear()
        # A Thread object can be started only once; ident stays None until
        # start() has been called, so a non-None ident means a fresh Thread
        # is required.
        if self.syncing_thread.ident is not None:
            self.syncing_thread = threading.Thread(target=self._sync, args=())
        self.syncing_thread.start()
        return self.alive()

    def close(self) -> bool:
        """Ask the background sync to stop and wait for the thread to end.

        Returns:
            True once the sync thread is no longer alive.
        """
        if self.syncing_thread.is_alive():
            self.stop.set()
            # join() returns only after the thread has finished.
            self.syncing_thread.join()
        return not self.alive()