import os
import sys
import time
import requests
import json
import hashlib
import logging
import threading
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from urllib.parse import unquote
					
						
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)
					
						
class Config:
    """Configuration manager."""
    def __init__(self):
        self.data_dir = Path(os.getenv("DATA_DIR", "/app"))
        self.api_key = os.getenv("API_AUTH_KEY")
        self.api_endpoint = os.getenv("API_ENDPOINT")
        self.sync_interval = int(os.getenv("SYNC_INTERVAL", "300"))
        self.use_hash_check = os.getenv("USE_HASH_CHECK", "true").lower() == "true"
        self.max_upload_workers = int(os.getenv("MAX_UPLOAD_WORKERS", "500"))
        self.upload_retry_delay = int(os.getenv("UPLOAD_RETRY_DELAY", "60"))

        # Suffixes to skip when syncing (case-insensitive)
        ignore_suffixes = os.getenv("IGNORE_SUFFIXES", ".db-journal,.tmp,.log,sqlite3-journal")
        self.ignore_suffixes = [s.strip().lower() for s in ignore_suffixes.split(",") if s.strip()]

        # If set, only files with these suffixes are synced
        only_suffixes = os.getenv("ONLY_SUFFIXES", ".db")
        self.only_suffixes = [s.strip().lower() for s in only_suffixes.split(",") if s.strip()]

        if not all([self.api_key, self.api_endpoint]):
            raise ValueError("The API_AUTH_KEY and API_ENDPOINT environment variables must be set")

        if self.ignore_suffixes and self.only_suffixes:
            logger.warning("Both IGNORE_SUFFIXES and ONLY_SUFFIXES are set; ONLY_SUFFIXES takes precedence")
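# Example environment for this Config (illustrative placeholder values,
# not from the original source):
#   export API_AUTH_KEY="your-secret-key"
#   export API_ENDPOINT="https://r2-api.example.workers.dev"
#   export DATA_DIR="/app"
#   export ONLY_SUFFIXES=".db"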
					
						
class SyncHandler(FileSystemEventHandler):
    """Filesystem event handler."""
    def __init__(self, callback, config):
        self.callback = callback
        self.config = config

    def should_sync(self, file_path):
        """Check whether a file should be synced."""
        file_path = str(file_path).lower()

        # ONLY_SUFFIXES takes precedence: sync only matching files
        if self.config.only_suffixes:
            return any(file_path.endswith(suffix) for suffix in self.config.only_suffixes)

        # Otherwise skip files matching IGNORE_SUFFIXES
        if self.config.ignore_suffixes:
            return not any(file_path.endswith(suffix) for suffix in self.config.ignore_suffixes)

        return True

    def on_modified(self, event):
        if not event.is_directory and self.should_sync(event.src_path):
            self.callback(event.src_path)

    def on_created(self, event):
        if not event.is_directory and self.should_sync(event.src_path):
            self.callback(event.src_path)
					
						
class R2Sync:
    """R2 storage synchronizer."""
    def __init__(self, config):
        self.config = config
        self.file_states = {}   # key -> (size, mtime) or (size, mtime, hash)
        self.upload_queue = {}  # local path -> (key, mtime, pending Timer)
        self.remote_files_cache = None
        self.cache_valid = False
        self.executor = ThreadPoolExecutor(max_workers=self.config.max_upload_workers)
        self.cache_lock = threading.Lock()
        self.upload_queue_lock = threading.Lock()

        self.config.data_dir.mkdir(parents=True, exist_ok=True)
					
						
    def should_sync_file(self, file_path):
        """Check whether a file should be synced (same rules as SyncHandler.should_sync)."""
        file_path = str(file_path).lower()

        # ONLY_SUFFIXES takes precedence: sync only matching files
        if self.config.only_suffixes:
            return any(file_path.endswith(suffix) for suffix in self.config.only_suffixes)

        # Otherwise skip files matching IGNORE_SUFFIXES
        if self.config.ignore_suffixes:
            return not any(file_path.endswith(suffix) for suffix in self.config.ignore_suffixes)

        return True
					
						
    def r2_api_request(self, method, path, data=None, max_retries=3):
        """Send an API request, retrying with linear backoff."""
        url = f"{self.config.api_endpoint}/{path}"
        headers = {
            'X-API-Key': self.config.api_key,
            'Content-Type': 'application/octet-stream'
        }

        for attempt in range(max_retries):
            try:
                if method == "GET":
                    resp = requests.get(url, headers=headers)
                elif method == "POST":
                    resp = requests.post(url, data=data, headers=headers)
                elif method == "PUT":
                    resp = requests.put(url, data=data, headers=headers)
                elif method == "DELETE":
                    resp = requests.delete(url, headers=headers)
                else:
                    # Guard against an unbound `resp` for unsupported methods
                    raise ValueError(f"Unsupported HTTP method: {method}")

                resp.raise_for_status()

                if method == "GET":
                    return resp.content
                elif method == "DELETE":
                    return True
                return resp.json()

            except requests.exceptions.RequestException as e:
                if attempt == max_retries - 1:
                    logger.error(f"API request failed after all retries: {e}")
                    return None
                wait_time = (attempt + 1) * 2
                logger.warning(f"API request failed (attempt {attempt + 1}/{max_retries}), retrying in {wait_time}s...")
                time.sleep(wait_time)
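    # The Worker API contract assumed by r2_api_request and its callers, inferred
    # from this client's own requests (illustrative, not an official spec):
    #   GET    {API_ENDPOINT}/list            -> JSON array of {"key": ...} objects
    #   GET    {API_ENDPOINT}/download/{key}  -> raw object bytes
    #   POST   {API_ENDPOINT}/upload/{key}    -> JSON result
    #   DELETE {API_ENDPOINT}/delete/{key}    -> 2xx status on success
    # Every request authenticates via the X-API-Key header.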
					
						
    def get_file_hash(self, file_path):
        """Compute the BLAKE2b hash of a file, reading in 8 KiB chunks."""
        hash_obj = hashlib.blake2b()
        with open(file_path, 'rb') as f:
            while chunk := f.read(8192):
                hash_obj.update(chunk)
        return hash_obj.hexdigest()
					
						
    def get_remote_files(self, force_refresh=False):
        """Return the remote file list, using a cached copy when valid."""
        with self.cache_lock:
            if force_refresh or not self.cache_valid or self.remote_files_cache is None:
                self.remote_files_cache = self._fetch_remote_files()
                # Only mark the cache valid if the fetch actually succeeded,
                # so a failed fetch is reported as None instead of crashing on .copy()
                self.cache_valid = self.remote_files_cache is not None
            if self.remote_files_cache is None:
                return None
            return self.remote_files_cache.copy()
					
						
    def _fetch_remote_files(self):
        """Actually fetch the remote file list from the API."""
        resp = self.r2_api_request("GET", "list")
        if resp is None:
            return None

        try:
            raw_resp = resp.decode('utf-8').strip()
            if raw_resp == "[]":
                return []

            data = json.loads(raw_resp)

            # Normalize the response into a list of objects
            if isinstance(data, dict):
                if 'objects' in data:
                    data = data['objects']
                else:
                    data = [data]
            elif not isinstance(data, list):
                data = [data]

            # Keys may come back URL-encoded; decode them
            for item in data:
                if isinstance(item, dict) and 'key' in item:
                    try:
                        item['key'] = unquote(item['key'])
                    except Exception:
                        pass

            return data
        except Exception as e:
            logger.error(f"Error parsing remote file list: {e}")
            return []
					
						
    def invalidate_cache(self):
        """Invalidate the remote file list cache."""
        with self.cache_lock:
            self.cache_valid = False
					
						
    def download_file(self, key, dest_path):
        """Download a file from remote storage."""
        if not self.should_sync_file(dest_path):
            logger.debug(f"Skipping download (does not match sync rules): {key}")
            return False

        path = f"download/{key}"
        logger.debug(f"Download path: {dest_path} (original key: {key})")

        resp = self.r2_api_request("GET", path)
        if resp is None:
            return False

        dest_path.parent.mkdir(parents=True, exist_ok=True)

        # GET requests return raw bytes, which can be written as-is
        with open(dest_path, 'wb') as f:
            f.write(resp)

        logger.info(f"File saved to: {dest_path} (size: {len(resp)} bytes)")
        return True
					
						
    def _upload_file_task(self, file_path, key):
        """Upload task executed on the thread pool."""
        if not self.should_sync_file(file_path):
            logger.debug(f"Skipping upload (does not match sync rules): {key}")
            return False

        path = f"upload/{key}"
        try:
            with open(file_path, 'rb') as f:
                result = self.r2_api_request("POST", path, f.read()) is not None
            if result:
                self.invalidate_cache()
            return result
        except Exception as e:
            logger.error(f"Error uploading file: {e}")
            return False
					
						
    def upload_file(self, file_path, key):
        """Upload a file (submitted to the thread pool, but blocks until done)."""
        future = self.executor.submit(self._upload_file_task, file_path, key)
        return future.result()
					
						
    def delete_file(self, key):
        """Delete a remote file and refresh the cache."""
        if not self.should_sync_file(key):
            logger.debug(f"Skipping delete (does not match sync rules): {key}")
            return False

        logger.info(f"Deleting remote file: {key}")
        path = f"delete/{key}"
        if not self.r2_api_request("DELETE", path):
            return False

        self.invalidate_cache()

        # Verify the deletion by re-fetching the remote listing
        remote_files = self.get_remote_files(force_refresh=True)
        if remote_files is None:
            return False

        return not any(file_info['key'] == key for file_info in remote_files)
					
						
    def check_api_health(self):
        """Check that the API is reachable."""
        logger.info("Checking API health...")

        files = self.get_remote_files()
        if files is None:
            logger.error("List API test failed")
            return False

        logger.info("API health check passed")
        return True
					
						
    def _download_and_update_state(self, file_info):
        """Thread task: download a file and record its local state."""
        key = file_info['key']
        dest_path = self.config.data_dir / key

        if not self.should_sync_file(dest_path):
            logger.debug(f"Skipping download (does not match sync rules): {key}")
            return

        logger.info(f"Downloading: {key} -> {dest_path}")
        if self.download_file(key, dest_path):
            # Record size/mtime (and optionally hash) so unchanged files are not re-uploaded
            stat = dest_path.stat()
            file_state = (stat.st_size, stat.st_mtime)
            if self.config.use_hash_check:
                file_state += (self.get_file_hash(dest_path),)
            with self.cache_lock:
                self.file_states[key] = file_state
					
						
    def init_sync(self):
        """Initial sync: mirror the remote bucket into the data directory."""
        logger.info("Initializing data directory...")

        if not self.check_api_health():
            logger.error("API check failed; verify the configuration and network connectivity")
            sys.exit(1)

        remote_files = self.get_remote_files()
        if remote_files is None:
            logger.error("Failed to fetch the remote file list")
            sys.exit(1)

        logger.info(f"Found {len(remote_files)} remote files")
        if not remote_files:
            logger.info("Remote bucket is empty; nothing to sync")
            return

        logger.info("Starting parallel download of remote files...")

        futures = []
        for file_info in remote_files:
            futures.append(self.executor.submit(self._download_and_update_state, file_info))

        # Wait for all downloads to finish, surfacing any errors
        for future in futures:
            try:
                future.result()
            except Exception as e:
                logger.error(f"File download task failed: {e}")
					
						
    def handle_file_change(self, file_path):
        """Handle a file change event: queue the upload and (re)arm a debounce timer."""
        try:
            if not self.should_sync_file(file_path):
                logger.debug(f"Ignoring file change (does not match sync rules): {file_path}")
                return

            rel_path = Path(file_path).relative_to(self.config.data_dir)
            key = str(rel_path).replace('\\', '/')

            stat = os.stat(file_path)
            current_mtime = stat.st_mtime

            with self.upload_queue_lock:
                # Cancel any pending timer for this file so rapid changes are debounced
                if file_path in self.upload_queue:
                    _, _, timer = self.upload_queue[file_path]
                    timer.cancel()

                timer = threading.Timer(
                    self.config.upload_retry_delay,
                    self._process_upload_queue,
                    args=(file_path,)
                )
                timer.start()

                self.upload_queue[file_path] = (key, current_mtime, timer)
                logger.info(f"File change detected, queued for upload: {key} (will be checked in {self.config.upload_retry_delay}s)")

        except Exception as e:
            logger.error(f"Error handling file change: {e}")
					
						
    def is_file_modified(self, file_path, last_known_state):
        """Check whether a file has actually changed since the last known state."""
        try:
            stat = os.stat(file_path)
            current_size = stat.st_size

            # A size change always counts as a modification
            if last_known_state[0] == current_size:
                logger.info(f"File size unchanged: {file_path} (size: {current_size})")

                if not self.config.use_hash_check:
                    logger.info(f"Hash check disabled: treating {file_path} as unmodified")
                    return False
                logger.info(f"Same size, comparing hashes: {file_path}")
                current_hash = self.get_file_hash(file_path)
                if len(last_known_state) <= 2:
                    # No hash on record yet: store one and treat the file as modified once
                    rel_path = str(Path(file_path).relative_to(self.config.data_dir)).replace('\\', '/')
                    self.file_states[rel_path] = (current_size, stat.st_mtime, current_hash)
                    return True
                return current_hash != last_known_state[2]
            return True
        except Exception as e:
            logger.error(f"Error checking file modification state: {e}")
            return False
					
						
    def _process_upload_queue(self, file_path):
        """Process a file from the upload queue once its debounce timer fires."""
        with self.upload_queue_lock:
            if file_path not in self.upload_queue:
                return

            key, original_mtime, _ = self.upload_queue[file_path]
            del self.upload_queue[file_path]

        try:
            # The file may have been deleted while it was queued
            if not os.path.exists(file_path):
                logger.info(f"File was deleted, cancelling upload: {key}")
                return

            current_stat = os.stat(file_path)
            current_mtime = current_stat.st_mtime

            # If the file changed again while waiting, re-queue it
            if current_mtime != original_mtime:
                logger.info(f"File {key} changed again while queued; re-queuing")
                self.handle_file_change(file_path)
                return

            if not self.is_file_modified(file_path, self.file_states.get(key, (0, 0))):
                logger.info(f"File {key} has no new changes; skipping upload")
                return

            logger.info(f"Uploading file: {key}")
            if self.upload_file(file_path, key):
                # Record the uploaded state so future events can be deduplicated
                file_state = (current_stat.st_size, current_mtime)
                if self.config.use_hash_check:
                    file_state += (self.get_file_hash(file_path),)
                self.file_states[key] = file_state
            else:
                logger.error(f"Upload failed: {key}")

        except Exception as e:
            logger.error(f"Error processing upload queue: {e}")
					
						
    def sync_deleted_files(self):
        """Propagate local deletions to remote storage."""
        try:
            remote_files = self.get_remote_files()
            if remote_files is None:
                return

            local_files = {
                str(f.relative_to(self.config.data_dir)).replace('\\', '/')
                for f in self.config.data_dir.rglob('*')
                if f.is_file() and self.should_sync_file(f)
            }

            # The upload queue is keyed by local path, so compare against the
            # queued object keys rather than the queue's own keys
            with self.upload_queue_lock:
                queued_keys = {entry[0] for entry in self.upload_queue.values()}

            for file_info in remote_files:
                key = file_info['key']
                if key not in local_files and key not in queued_keys:
                    logger.info(f"Deleting remote file: {key}")
                    if self.delete_file(key):
                        if key in self.file_states:
                            del self.file_states[key]
                    else:
                        logger.error(f"Delete failed: {key}")
        except Exception as e:
            logger.error(f"Error syncing deletions: {e}")
					
						
    def watch_and_sync(self):
        """Watch the data directory and keep it in sync."""
        logger.info("Starting continuous sync service...")

        # Record the initial state of every local file
        for file in self.config.data_dir.rglob('*'):
            if file.is_file() and self.should_sync_file(file):
                rel_path = str(file.relative_to(self.config.data_dir)).replace('\\', '/')
                stat = file.stat()
                file_state = (stat.st_size, stat.st_mtime)
                if self.config.use_hash_check:
                    file_state += (self.get_file_hash(file),)
                self.file_states[rel_path] = file_state

        # Watch for filesystem events
        event_handler = SyncHandler(self.handle_file_change, self.config)
        observer = Observer()
        observer.schedule(event_handler, path=str(self.config.data_dir), recursive=True)
        observer.start()

        try:
            while True:
                # Periodically propagate deletions
                self.sync_deleted_files()
                time.sleep(self.config.sync_interval)
        except KeyboardInterrupt:
            logger.info("Stop signal received, shutting down the watcher...")
            # Cancel all pending upload timers
            with self.upload_queue_lock:
                for file_path, (_, _, timer) in self.upload_queue.items():
                    timer.cancel()
                self.upload_queue.clear()
            observer.stop()
            observer.join()
            self.executor.shutdown()
					
						
def main():
    if len(sys.argv) < 2:
        logger.info("Usage: python r2_sync.py [init|sync]")
        sys.exit(1)

    try:
        config = Config()
    except ValueError as e:
        logger.error(str(e))
        sys.exit(1)

    r2_sync = R2Sync(config)
    command = sys.argv[1]

    if command == "init":
        r2_sync.init_sync()
    elif command == "sync":
        logger.info(f"Starting sync service, interval: {config.sync_interval}s")
        r2_sync.watch_and_sync()
    else:
        logger.error(f"Unknown command: {command}")
        sys.exit(1)


if __name__ == "__main__":
    main()
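# A minimal sketch of programmatic use (illustrative, not part of the original
# script; the environment values are placeholders):
#
#   import os
#   os.environ["API_AUTH_KEY"] = "your-secret-key"
#   os.environ["API_ENDPOINT"] = "https://r2-api.example.workers.dev"
#
#   config = Config()
#   sync = R2Sync(config)
#   sync.init_sync()        # one-shot: mirror the remote bucket locally
#   sync.watch_and_sync()   # then watch for local changes and push them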