Spaces:
Sleeping
Sleeping
import hashlib
import logging
import os
import sys
import tarfile
import time
import zlib
from datetime import datetime, timezone

from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import EntryNotFoundError, RepositoryNotFoundError
# ── Logging configuration
# The date format below ends in a literal "Z" (the UTC designator), so record
# times must be rendered in UTC. The logging default is local time, which would
# mislabel every timestamp on a host whose timezone is not UTC.
logging.Formatter.converter = time.gmtime
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%SZ",
)
# Logger shared by every function in this script.
log = logging.getLogger("sync")
# ── Configuration
api = HfApi()

# Target dataset repository and access token, both read from the environment.
repo_id = os.environ.get("HF_DATASET")
token = os.environ.get("HF_TOKEN")

# Name of the archive inside the repository, and where it is staged locally.
FILENAME = "latest_backup.tar.gz"
BACKUP_PATH = "/tmp/" + FILENAME

# Directory whose selected entries are backed up and restored.
BASE_DIR = "/home/node/.openclaw"

# Absolute paths of the entries under BASE_DIR that go into a backup.
PATHS_TO_BACKUP = [
    BASE_DIR + "/" + relative
    for relative in (
        "sessions",
        "agents/main/sessions",
        "credentials",
        "workspace",
        "extensions",
        "openclaw.json",
    )
]
# ── Utility functions
def _check_env() -> bool:
    """Report whether both the dataset repo id and the token are configured.

    Returns:
        True when ``repo_id`` and ``token`` are both non-empty; otherwise a
        warning is logged and False is returned.
    """
    if repo_id and token:
        return True
    log.warning("HF_DATASET 或 HF_TOKEN 未设置,跳过同步。")
    return False
| def _sha256(path: str) -> str: | |
| h = hashlib.sha256() | |
| with open(path, "rb") as f: | |
| for chunk in iter(lambda: f.read(65536), b""): | |
| h.update(chunk) | |
| return h.hexdigest() | |
def _verify_tar(path: str) -> bool:
    """Check that *path* is a readable, non-empty gzip-compressed tar archive.

    Args:
        path: Filesystem path of the archive to inspect.

    Returns:
        True when the archive opens and lists at least one member. False when
        it has no members or cannot be read; the reason is logged.
    """
    try:
        with tarfile.open(path, "r:gz") as tar:
            members = tar.getmembers()
    except (tarfile.TarError, OSError, EOFError, zlib.error) as e:
        # Listing the members decompresses the stream, so a truncated or
        # corrupt file (or a missing one) can surface as a gzip/zlib or I/O
        # error instead of a TarError. Callers expect a bool, not a traceback.
        log.error(f"压缩包损坏: {e}")
        return False

    if not members:
        log.warning("压缩包为空,跳过。")
        return False
    log.info(f"压缩包验证通过,共 {len(members)} 个条目。")
    return True
# ── restore
def _safe_extract(tar: tarfile.TarFile, dest: str) -> None:
    """Extract every member of *tar* under *dest*, refusing paths that escape it.

    Uses the standard ``"data"`` extraction filter when the running Python
    provides it. That filter strips leading slashes and rejects members or
    links that would land outside *dest*. On interpreters without filter
    support, member names and link targets are checked by hand before a plain
    extraction.

    Raises:
        tarfile.TarError: If a member or link would resolve outside *dest*.
    """
    if hasattr(tarfile, "data_filter"):
        tar.extractall(path=dest, filter="data")
        return

    root = os.path.realpath(dest)

    def _inside(candidate: str) -> bool:
        return os.path.commonpath([root, os.path.realpath(candidate)]) == root

    for member in tar.getmembers():
        target = os.path.join(root, member.name)
        if not _inside(target):
            raise tarfile.TarError(f"unsafe path in archive: {member.name}")
        if member.issym():
            # Symlink targets are relative to the directory holding the link.
            link_target = os.path.join(os.path.dirname(target), member.linkname)
        elif member.islnk():
            # Hard-link targets are relative to the archive root.
            link_target = os.path.join(root, member.linkname)
        else:
            continue
        if not _inside(link_target):
            raise tarfile.TarError(f"unsafe link in archive: {member.name}")
    tar.extractall(path=dest)


def restore() -> bool:
    """Download the latest backup archive and unpack it into ``BASE_DIR``.

    Steps: check the environment, download ``FILENAME`` from the dataset
    repository, validate the archive, log its SHA-256, then extract it.

    Returns:
        True when the archive was extracted. False when the environment is not
        configured, the repository or file does not exist yet, or the
        download, validation, or extraction fails. Each case is logged.
    """
    if not _check_env():
        return False

    log.info(f"开始恢复:从 {repo_id} 下载 {FILENAME} ...")
    try:
        path = hf_hub_download(
            repo_id=repo_id,
            filename=FILENAME,
            repo_type="dataset",
            token=token,
        )
    except (EntryNotFoundError, RepositoryNotFoundError):
        # Nothing has been uploaded yet: treated as a first run, not an error.
        log.info("仓库中尚无备份文件,首次运行,跳过恢复。")
        return False
    except Exception as e:
        log.error(f"下载失败: {e}")
        return False

    if not _verify_tar(path):
        log.error("备份文件验证失败,放弃解压。")
        return False
    log.info(f"文件 SHA-256: {_sha256(path)}")

    try:
        os.makedirs(BASE_DIR, exist_ok=True)
        with tarfile.open(path, "r:gz") as tar:
            # The archive comes from a remote repository, so it must not be
            # able to write outside BASE_DIR via crafted member names or links.
            _safe_extract(tar, BASE_DIR)
        log.info(f"恢复成功 → {BASE_DIR}")
        return True
    except Exception as e:
        log.error(f"解压失败: {e}")
        return False
# ── backup
def backup() -> bool:
    """Pack the existing backup paths into a tar.gz and upload it to the dataset.

    Steps: check the environment, collect the entries of ``PATHS_TO_BACKUP``
    that exist, archive them with names relative to ``BASE_DIR``, validate the
    archive, then upload it as ``FILENAME``. The local archive at
    ``BACKUP_PATH`` is removed on every exit path once packing has started.

    Returns:
        True when the upload succeeded. False when the environment is not
        configured, no path exists, or packing, validation, or upload fails.
        Each case is logged.
    """
    if not _check_env():
        return False

    existing = [p for p in PATHS_TO_BACKUP if os.path.exists(p)]
    if not existing:
        log.warning("所有备份路径均不存在,跳过备份。")
        return False

    log.info(f"开始备份,共 {len(existing)} 个路径...")
    try:
        try:
            with tarfile.open(BACKUP_PATH, "w:gz") as tar:
                for p in existing:
                    # Store names relative to BASE_DIR so that extraction does
                    # not recreate the absolute path.
                    arcname = os.path.relpath(p, BASE_DIR)
                    tar.add(p, arcname=arcname, recursive=True)
                    log.info(f" 已打包: {p} → {arcname}")
        except Exception as e:
            log.error(f"打包失败: {e}")
            return False

        if not _verify_tar(BACKUP_PATH):
            log.error("生成的压缩包验证失败,取消上传。")
            return False
        log.info(f"压缩包大小: {os.path.getsize(BACKUP_PATH)/1024:.1f} KB,SHA-256: {_sha256(BACKUP_PATH)}")

        try:
            api.upload_file(
                path_or_fileobj=BACKUP_PATH,
                path_in_repo=FILENAME,
                repo_id=repo_id,
                repo_type="dataset",
                token=token,
                commit_message=f"backup {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC",
            )
        except Exception as e:
            log.error(f"上传失败: {e}")
            return False
        log.info(f"备份上传成功 → {repo_id}/{FILENAME}")
        return True
    finally:
        # The archive contains credentials. Remove it even when packing or
        # validation failed, not only after an upload attempt.
        if os.path.exists(BACKUP_PATH):
            os.remove(BACKUP_PATH)
            log.info("本地临时文件已清理。")
# ── Entry point
if __name__ == "__main__":
    # The first command-line argument selects the action; with no argument the
    # script restores.
    action = sys.argv[1] if sys.argv[1:] else "restore"
    if action not in ("backup", "restore"):
        log.error(f"未知命令: {action},用法: python sync.py [backup|restore]")
        sys.exit(1)
    success = backup() if action == "backup" else restore()
    # Exit status mirrors the outcome: 0 on success, 1 on failure.
    sys.exit(0 if success else 1)