project / easy_sync.py
Rejekts's picture
Update easy_sync.py
e62dba9 verified
raw
history blame
2.36 kB
import subprocess, time, threading
from typing import List, Union
class Channel:
    """Mirror ``source`` into ``destination`` by invoking ``rsync``.

    A channel can perform a single blocking copy (``copy``) or run a
    background thread that re-syncs every ``sync_time`` seconds
    (``open`` / ``close``).

    Attributes:
        source: Path handed to rsync as the source (a ``/`` is appended).
        destination: Path handed to rsync as the destination (a ``/`` is
            appended).
        sync_deletions: When true, ``--delete`` is passed to rsync.
        sync_time: Seconds to wait between two background sync runs.
        exclude: List of patterns, each passed as ``--exclude=<pattern>``.
        command: Base argv (``['rsync', '-aP']``). It is used as a template
            and is never modified by ``copy`` or the background thread.
        stop: Event that tells the background thread to finish.
        syncing_thread: The current background ``threading.Thread``.
    """

    def __init__(self, source, destination, sync_deletions=False, sync_time=60, exclude: Union[str, List, None] = None):  # What args to take
        """Store the configuration; no rsync process is started here.

        Args:
            source: Source path to sync from.
            destination: Destination path to sync to.
            sync_deletions: Pass ``--delete`` to rsync when true.
            sync_time: Pause, in seconds, between background sync runs.
            exclude: A single pattern, a list of patterns, or ``None`` /
                empty for no exclusions.
        """
        self.source = source
        self.destination = destination
        self.stop = threading.Event()
        self.syncing_thread = threading.Thread(target=self._sync, args=())
        self.sync_deletions = sync_deletions
        self.sync_time = sync_time
        if not exclude:
            exclude = []
        elif isinstance(exclude, str):
            exclude = [exclude]
        # Copy so later changes to the caller's list do not leak in here.
        self.exclude = list(exclude)
        self.command = ['rsync', '-aP']

    def _build_command(self):
        """Return a fresh rsync argv for the current configuration.

        A new list is built on every call. Appending to ``self.command``
        directly would make the arguments pile up across calls, so that a
        second run would pass the paths and options to rsync twice.
        """
        command = list(self.command)
        command.extend(f'--exclude={exclusion}' for exclusion in self.exclude)
        if self.sync_deletions:
            command.append('--delete')
        # Options go before the path operands: rsync [OPTION...] SRC... DEST
        command.extend([f'{self.source}/', f'{self.destination}/'])
        return command

    def alive(self):  # Check if the thread is alive
        """Return True while the background sync thread is running."""
        return self.syncing_thread.is_alive()

    def _sync(self):  # Sync constantly
        """Thread target: run rsync repeatedly until ``stop`` is set."""
        command = self._build_command()
        while not self.stop.is_set():
            subprocess.run(command)
            # Event.wait doubles as an interruptible sleep, so close() does
            # not have to sit out the remainder of the interval.
            self.stop.wait(self.sync_time)

    def copy(self):  # Sync once
        """Run rsync once, blocking until it exits.

        Returns:
            Always True; rsync's exit status is not inspected.
        """
        subprocess.run(self._build_command())
        return True

    def open(self):  # Handle threads
        """Start the background sync, restarting it if already running.

        Returns:
            True if the sync thread is alive after the call.
        """
        if self.syncing_thread.is_alive():  # Check if it's running
            self.stop.set()
            self.syncing_thread.join()
        self.stop.clear()
        # A Thread object can be started only once; ident stays None until
        # start() has been called, so a non-None ident means "already used".
        if self.syncing_thread.ident is not None:
            self.syncing_thread = threading.Thread(target=self._sync, args=())  # Create a FRESH thread
        self.syncing_thread.start()  # Start the thread
        return self.alive()

    def close(self):  # Stop the thread and close the process
        """Signal the background thread to stop and wait for it to exit.

        Returns:
            True if the thread is no longer alive.
        """
        if self.alive():
            self.stop.set()
            # join() without a timeout returns only once the thread is done.
            self.syncing_thread.join()
        return not self.alive()