Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Hermes Agent Data Sync Service | |
| Handles data persistence to/from Hugging Face Dataset | |
| """ | |
| import os | |
| import sys | |
| import time | |
| import json | |
| import shutil | |
| import tarfile | |
| import argparse | |
| from pathlib import Path | |
| from datetime import datetime | |
| from typing import Optional, Dict, List | |
| from huggingface_hub import HfApi, hf_hub_download, upload_folder | |
| from loguru import logger | |
| # 文件监控(可选) | |
| try: | |
| from watchdog.observers import Observer | |
| from watchdog.events import FileSystemEventHandler | |
| WATCHDOG_AVAILABLE = True | |
| except ImportError: | |
| WATCHDOG_AVAILABLE = False | |
| logger.warning("watchdog not installed, file change detection disabled") | |
| class DatasetManager: | |
| """Manages data synchronization with Hugging Face Dataset""" | |
| def __init__(self, dataset_repo: Optional[str] = None, token: Optional[str] = None): | |
| self.dataset_repo = dataset_repo or os.environ.get('HF_DATASET_REPO') | |
| self.token = token or os.environ.get('HF_TOKEN') or os.environ.get('HUGGING_FACE_HUB_TOKEN') | |
| self.api = HfApi(token=self.token) | |
| self.hermes_home = Path(os.environ.get('HERMES_HOME', '/data/.hermes')) | |
| self.temp_dir = Path('/tmp/hermes_sync') | |
| # 数据路径映射 | |
| self.path_mapping = { | |
| 'config': self.hermes_home / 'config.yaml', | |
| 'env': self.hermes_home / '.env', | |
| 'auth': self.hermes_home / 'auth.json', | |
| 'soul': self.hermes_home / 'SOUL.md', | |
| 'memories': self.hermes_home / 'memories', | |
| 'skills': self.hermes_home / 'skills', | |
| 'sessions': self.hermes_home / 'sessions', | |
| 'state_db': self.hermes_home / 'state.db', | |
| 'logs': self.hermes_home / 'logs', | |
| 'cron': self.hermes_home / 'cron', | |
| 'webui_token': Path('/data/.hermes-web-ui') / '.token', | |
| 'image_cache': self.hermes_home / 'image_cache', | |
| 'baoyu_skills': Path('/home/appuser/.baoyu-skills'), | |
| } | |
| def validate(self) -> bool: | |
| """验证配置是否正确""" | |
| if not self.dataset_repo: | |
| logger.error("HF_DATASET_REPO not set") | |
| return False | |
| if not self.token: | |
| logger.warning("HF_TOKEN not set, will try public dataset") | |
| return True | |
| def prepare_backup_data(self) -> Path: | |
| """准备备份数据到临时目录""" | |
| logger.info("Preparing backup data...") | |
| # 清理并创建临时目录 | |
| if self.temp_dir.exists(): | |
| shutil.rmtree(self.temp_dir) | |
| self.temp_dir.mkdir(parents=True) | |
| # 创建目录结构 | |
| (self.temp_dir / 'config').mkdir() | |
| (self.temp_dir / 'personality').mkdir() | |
| (self.temp_dir / 'memories').mkdir() | |
| (self.temp_dir / 'skills').mkdir() | |
| (self.temp_dir / 'sessions').mkdir() | |
| (self.temp_dir / 'state').mkdir() | |
| (self.temp_dir / 'logs').mkdir() | |
| (self.temp_dir / 'cron').mkdir() | |
| (self.temp_dir / 'webui').mkdir() | |
| (self.temp_dir / 'image_cache').mkdir() | |
| (self.temp_dir / 'baoyu_skills').mkdir() | |
| # 复制文件 | |
| try: | |
| # 配置文件 | |
| if self.path_mapping['config'].exists(): | |
| shutil.copy2(self.path_mapping['config'], self.temp_dir / 'config' / 'config.yaml') | |
| # 环境变量(敏感信息) | |
| if self.path_mapping['env'].exists(): | |
| shutil.copy2(self.path_mapping['env'], self.temp_dir / 'config' / '.env') | |
| # OAuth 认证 | |
| if self.path_mapping['auth'].exists(): | |
| shutil.copy2(self.path_mapping['auth'], self.temp_dir / 'config' / 'auth.json') | |
| # 人格定义 | |
| if self.path_mapping['soul'].exists(): | |
| shutil.copy2(self.path_mapping['soul'], self.temp_dir / 'personality' / 'SOUL.md') | |
| # 记忆 | |
| if self.path_mapping['memories'].exists(): | |
| shutil.copytree(self.path_mapping['memories'], self.temp_dir / 'memories', dirs_exist_ok=True) | |
| # 技能 | |
| if self.path_mapping['skills'].exists(): | |
| shutil.copytree(self.path_mapping['skills'], self.temp_dir / 'skills', dirs_exist_ok=True) | |
| # 会话 | |
| if self.path_mapping['sessions'].exists(): | |
| shutil.copytree(self.path_mapping['sessions'], self.temp_dir / 'sessions', dirs_exist_ok=True) | |
| # 数据库 | |
| if self.path_mapping['state_db'].exists(): | |
| shutil.copy2(self.path_mapping['state_db'], self.temp_dir / 'state' / 'state.db') | |
| # 日志 | |
| if self.path_mapping['logs'].exists(): | |
| shutil.copytree(self.path_mapping['logs'], self.temp_dir / 'logs', dirs_exist_ok=True) | |
| # 定时任务 | |
| if self.path_mapping['cron'].exists(): | |
| shutil.copytree(self.path_mapping['cron'], self.temp_dir / 'cron', dirs_exist_ok=True) | |
| # Image Cache | |
| if self.path_mapping['image_cache'].exists(): | |
| shutil.copytree(self.path_mapping['image_cache'], self.temp_dir / 'image_cache', dirs_exist_ok=True) | |
| # baoyu-skills 用户配置 (EXTEND.md 等) | |
| if self.path_mapping['baoyu_skills'].exists(): | |
| shutil.copytree(self.path_mapping['baoyu_skills'], self.temp_dir / 'baoyu_skills', dirs_exist_ok=True) | |
| # WebUI 认证 Token | |
| if self.path_mapping['webui_token'].exists(): | |
| (self.temp_dir / 'webui').mkdir(exist_ok=True) | |
| shutil.copy2(self.path_mapping['webui_token'], self.temp_dir / 'webui' / '.token') | |
| # 修复临时目录权限(源文件可能被设为只读,如 baoyu-imagine scripts 的 555) | |
| # upload_folder 需要能正常读取所有文件 | |
| logger.info("Fixing permissions in temp backup dir...") | |
| import stat | |
| for root, dirs, files in os.walk(self.temp_dir): | |
| for d in dirs: | |
| dir_path = os.path.join(root, d) | |
| try: | |
| os.chmod(dir_path, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) | |
| except Exception: | |
| pass | |
| for f in files: | |
| file_path = os.path.join(root, f) | |
| try: | |
| os.chmod(file_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH) | |
| except Exception: | |
| pass | |
| # 添加元数据 | |
| metadata = { | |
| 'timestamp': datetime.now().isoformat(), | |
| 'version': '0.10.0', | |
| 'hermes_home': str(self.hermes_home) | |
| } | |
| with open(self.temp_dir / 'metadata.json', 'w') as f: | |
| json.dump(metadata, f, indent=2) | |
| logger.success(f"Backup prepared at {self.temp_dir}") | |
| return self.temp_dir | |
| except Exception as e: | |
| logger.error(f"Failed to prepare backup: {e}") | |
| raise | |
| def upload_to_dataset(self, force: bool = False) -> bool: | |
| """上传数据到 Hugging Face Dataset""" | |
| try: | |
| backup_dir = self.prepare_backup_data() | |
| logger.info(f"Uploading to dataset: {self.dataset_repo}") | |
| # 上传文件夹到 dataset | |
| self.api.upload_folder( | |
| folder_path=str(backup_dir), | |
| repo_id=self.dataset_repo, | |
| repo_type="dataset", | |
| commit_message=f"Hermes Agent backup - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" | |
| ) | |
| logger.success("Backup uploaded successfully") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to upload to dataset: {e}") | |
| return False | |
| def download_from_dataset(self) -> bool: | |
| """从 Hugging Face Dataset 下载数据""" | |
| try: | |
| logger.info(f"Downloading from dataset: {self.dataset_repo}") | |
| # 创建临时下载目录 | |
| download_dir = Path('/tmp/hermes_download') | |
| if download_dir.exists(): | |
| shutil.rmtree(download_dir) | |
| download_dir.mkdir(parents=True) | |
| # 下载所有文件 | |
| self.api.snapshot_download( | |
| repo_id=self.dataset_repo, | |
| repo_type="dataset", | |
| local_dir=str(download_dir) | |
| ) | |
| logger.success("Download completed") | |
| # 恢复数据到 Hermes 目录 | |
| self.restore_from_download(download_dir) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to download from dataset: {e}") | |
| return False | |
| def restore_from_download(self, download_dir: Path): | |
| """从下载的目录恢复数据 | |
| 注意: config.yaml 在恢复时被跳过,因为 entrypoint.sh 会根据环境变量 | |
| 重新生成正确的 config.yaml。如果恢复旧的 config.yaml,会导致模型 | |
| 配置被覆盖(例如 minimaxai/minimax-m2.7 被替换为旧模型)。 | |
| """ | |
| logger.info("Restoring data to Hermes home...") | |
| # 确保目标目录存在 | |
| self.hermes_home.mkdir(parents=True, exist_ok=True) | |
| # 恢复策略控制: | |
| # SKIP_CONFIG_RESTORE=true(默认):跳过由 entrypoint.sh 动态生成的配置,防止旧备份覆盖新配置 | |
| # 跳过项:config.yaml、baoyu_skills/ | |
| # 恢复项:skills/(Skills Hub 安装的技能,entrypoint.sh 不会重新安装 cover/illustrator 等) | |
| # 保留项:memories、sessions、state.db、.env、auth.json、logs、cron、webui/.token、SOUL.md(用户数据) | |
| # SKIP_CONFIG_RESTORE=false:恢复所有备份(配置稳定后使用) | |
| skip_restore = os.environ.get('SKIP_CONFIG_RESTORE', 'true').lower() in ('true', '1', 'yes') | |
| # 定义跳过恢复的路径(entrypoint.sh 会重新生成这些配置) | |
| # NOTE: skills 目录不跳过!它包含 Skills Hub 安装的技能(cover-image, article-illustrator 等), | |
| # entrypoint.sh 只重新安装 baoyu-imagine,不会安装其他技能。 | |
| # 恢复后 entrypoint.sh 会覆盖 baoyu-imagine 确保其最新(第652-654行 cp -r)。 | |
| if skip_restore: | |
| skip_paths = { | |
| 'config/config.yaml', # 模型/供应商配置由 entrypoint.sh 根据环境变量生成 | |
| 'baoyu_skills', # baoyu-skills EXTEND.md 由 entrypoint.sh 重新生成 | |
| } | |
| logger.info(f"SKIP_CONFIG_RESTORE=true, skipping: {', '.join(skip_paths)}") | |
| else: | |
| skip_paths = set() | |
| logger.info("SKIP_CONFIG_RESTORE=false, restoring all backed-up configurations") | |
| restore_mapping = { | |
| 'config/.env': self.path_mapping['env'], | |
| 'config/auth.json': self.path_mapping['auth'], | |
| 'personality/SOUL.md': self.path_mapping['soul'], | |
| 'memories': self.path_mapping['memories'], | |
| 'skills': self.path_mapping['skills'], | |
| 'sessions': self.path_mapping['sessions'], | |
| 'state/state.db': self.path_mapping['state_db'], | |
| 'logs': self.path_mapping['logs'], | |
| 'cron': self.path_mapping['cron'], | |
| 'webui/.token': self.path_mapping['webui_token'], | |
| 'image_cache': self.path_mapping['image_cache'], | |
| 'baoyu_skills': self.path_mapping['baoyu_skills'], | |
| } | |
| # config.yaml 特殊处理: | |
| # 即使不在 skip_paths 中,如果 skip_restore=true,也不直接覆盖,而是保存到 .restored 供合并 | |
| if not skip_restore: | |
| restore_mapping['config/config.yaml'] = self.path_mapping['config'] | |
| else: | |
| # 恢复到 .restored 文件,供 entrypoint.sh 合并用户修改的配置区块(如 channels、display 等) | |
| restored_path = self.hermes_home / 'config.yaml.restored' | |
| src = download_dir / 'config' / 'config.yaml' | |
| if src.exists(): | |
| shutil.copy2(src, restored_path) | |
| logger.info("Restored config.yaml to config.yaml.restored for merge") | |
| for src_rel, dst in restore_mapping.items(): | |
| # 检查是否在跳过列表中 | |
| if src_rel in skip_paths: | |
| logger.info(f"Skipping restore of {src_rel} (will be regenerated by entrypoint.sh)") | |
| continue | |
| src = download_dir / src_rel | |
| if src.exists(): | |
| try: | |
| if src.is_file(): | |
| dst.parent.mkdir(parents=True, exist_ok=True) | |
| shutil.copy2(src, dst) | |
| logger.info(f"Restored: {src_rel}") | |
| elif src.is_dir(): | |
| if dst.exists(): | |
| shutil.rmtree(dst) | |
| shutil.copytree(src, dst) | |
| logger.info(f"Restored directory: {src_rel}") | |
| except Exception as e: | |
| logger.error(f"Failed to restore {src_rel}: {e}") | |
| else: | |
| logger.warning(f"Not found in backup: {src_rel}") | |
| logger.success("Data restoration completed") | |
| class ConfigFileHandler(FileSystemEventHandler): | |
| """配置文件变化处理器 - 实时同步到 Dataset 并触发重载""" | |
| # 启动静默期(秒):在此期间内的文件变更不予备份,避免启动阶段冗余上传 | |
| STARTUP_GRACE_PERIOD = 30 | |
| def __init__(self, manager: DatasetManager): | |
| self.manager = manager | |
| self.last_backup_time = 0 | |
| self.backup_cooldown = 5 # 5秒内不重复备份 | |
| self.start_time = time.time() # 记录处理器创建时间 | |
| self._startup_logged = False | |
| def on_modified(self, event): | |
| """文件被修改时触发""" | |
| if event.is_directory: | |
| return | |
| # 启动静默期:跳过启动阶段的配置变更备份 | |
| elapsed = time.time() - self.start_time | |
| if elapsed < self.STARTUP_GRACE_PERIOD: | |
| if not self._startup_logged: | |
| logger.info(f"In startup grace period ({int(self.STARTUP_GRACE_PERIOD - elapsed)}s remaining), skipping backup for: {event.src_path}") | |
| self._startup_logged = True | |
| return | |
| # 只关注关键配置文件 | |
| watched_files = ['config.yaml', '.env', 'auth.json'] | |
| if any(event.src_path.endswith(f) for f in watched_files): | |
| current_time = time.time() | |
| if current_time - self.last_backup_time > self.backup_cooldown: | |
| logger.info(f"Config file changed: {event.src_path}") | |
| logger.info("Triggering immediate backup...") | |
| try: | |
| self.manager.upload_to_dataset() | |
| self.last_backup_time = current_time | |
| logger.success("Immediate backup completed") | |
| # 尝试触发 Hermes 配置重载 | |
| self._trigger_reload() | |
| except Exception as e: | |
| logger.error(f"Immediate backup failed: {e}") | |
| def _trigger_reload(self): | |
| """尝试触发 Hermes 配置重载""" | |
| # 注意:Hermes 目前没有 config reload 命令 | |
| # 配置将在下次 Space 重启时自动生效 | |
| logger.info("Configuration saved. Please restart Space to apply changes immediately.") | |
| def run_daemon(): | |
| """后台守护进程模式 - 定期同步 + 实时文件监听""" | |
| logger.info("Starting data sync daemon...") | |
| sync_interval = int(os.environ.get('SYNC_INTERVAL', '60')) # 默认60秒(实时模式) | |
| manager = DatasetManager() | |
| if not manager.validate(): | |
| logger.error("Configuration invalid, exiting") | |
| sys.exit(1) | |
| logger.info(f"Sync interval: {sync_interval} seconds") | |
| # 如果 watchdog 可用,启动文件监听 | |
| observer = None | |
| if WATCHDOG_AVAILABLE: | |
| try: | |
| logger.info("Starting file watcher for real-time sync...") | |
| event_handler = ConfigFileHandler(manager) | |
| observer = Observer() | |
| observer.schedule(event_handler, str(manager.hermes_home), recursive=False) | |
| observer.start() | |
| logger.success("File watcher started - config changes will trigger immediate backup") | |
| except Exception as e: | |
| logger.error(f"Failed to start file watcher: {e}") | |
| logger.warning("Falling back to scheduled sync only") | |
| observer = None | |
| else: | |
| logger.warning("Watchdog not available, using scheduled sync only") | |
| try: | |
| while True: | |
| try: | |
| time.sleep(sync_interval) | |
| logger.info("Performing scheduled backup...") | |
| manager.upload_to_dataset() | |
| except KeyboardInterrupt: | |
| logger.info("Daemon stopped") | |
| break | |
| except Exception as e: | |
| logger.error(f"Sync error: {e}") | |
| finally: | |
| # 清理文件监听器 | |
| if observer: | |
| logger.info("Stopping file watcher...") | |
| observer.stop() | |
| observer.join() | |
| logger.info("File watcher stopped") | |
| def main(): | |
| parser = argparse.ArgumentParser(description='Hermes Agent Data Sync') | |
| parser.add_argument('action', choices=['backup', 'restore', 'daemon'], | |
| help='Action to perform') | |
| parser.add_argument('--force', '-f', action='store_true', | |
| help='Force backup even if no changes') | |
| args = parser.parse_args() | |
| manager = DatasetManager() | |
| if not manager.validate(): | |
| logger.error("Configuration invalid") | |
| sys.exit(1) | |
| if args.action == 'backup': | |
| success = manager.upload_to_dataset(force=args.force) | |
| sys.exit(0 if success else 1) | |
| elif args.action == 'restore': | |
| success = manager.download_from_dataset() | |
| sys.exit(0 if success else 1) | |
| elif args.action == 'daemon': | |
| run_daemon() | |
| if __name__ == '__main__': | |
| main() | |