Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Sync agentcache data to/from a private HF Dataset repo. | |
| Usage: | |
| python3 sync.py restore -- download DB from HF on startup | |
| python3 sync.py backup -- upload DB to HF (called in loop) | |
| C4.1: Uses audit log high-water mark instead of mtime-based change detection. | |
| C4.2: Exposes last sync status via a .sync_state JSON file read by /health. | |
| """ | |
| import json | |
| import os | |
| import sys | |
| import shutil | |
| import tempfile | |
| import time | |
| import sqlite3 | |
| try: | |
| from huggingface_hub import HfApi, snapshot_download, hf_hub_download | |
| from huggingface_hub.utils import EntryNotFoundError, RepositoryNotFoundError | |
| except ImportError: | |
| print("[sync] huggingface_hub not installed, skipping sync") | |
| sys.exit(0) | |
| HF_TOKEN = os.environ.get("HF_TOKEN", "") | |
| REPO_ID = os.environ.get("AGENTCACHE_DATASET_REPO") or os.environ.get("AGENTMEMORY_DATASET_REPO") or "Yash030/agentmemory-python-data" | |
| DATA_DIR = os.path.expanduser("~/.agentcache") | |
| DB_PATH = os.path.join(DATA_DIR, "agentcache.db") | |
| # Only these paths are backed up/restored — everything else is ephemeral | |
| SYNC_FILES = [ | |
| "agentcache.db", | |
| ".hmac", | |
| ] | |
| SYNC_DIRS = [ | |
| "second-brain", | |
| ] | |
| # C4.1: High-water mark stored as JSON (replaces mtime STATE_FILE) | |
| STATE_FILE = os.path.join(DATA_DIR, ".sync_state") | |
| def get_api(): | |
| return HfApi(token=HF_TOKEN) | |
| def _collect_sync_targets(): | |
| """Return list of (abs_path, repo_rel_path) for all files to sync.""" | |
| targets = [] | |
| for fname in SYNC_FILES: | |
| full = os.path.join(DATA_DIR, fname) | |
| if os.path.isfile(full): | |
| targets.append((full, fname)) | |
| for dname in SYNC_DIRS: | |
| dpath = os.path.join(DATA_DIR, dname) | |
| if os.path.isdir(dpath): | |
| for root, _, files in os.walk(dpath): | |
| for f in files: | |
| full = os.path.join(root, f) | |
| rel = os.path.relpath(full, DATA_DIR).replace("\\", "/") | |
| targets.append((full, rel)) | |
| return targets | |
| def _get_audit_high_water_mark() -> int: | |
| """C4.1: Return MAX(id) from audit_log, or 0 if DB is absent/empty.""" | |
| try: | |
| if not os.path.exists(DB_PATH): | |
| return 0 | |
| conn = sqlite3.connect(DB_PATH, check_same_thread=False, timeout=5) | |
| try: | |
| row = conn.execute("SELECT MAX(id) FROM audit_log").fetchone() | |
| return int(row[0]) if row and row[0] is not None else 0 | |
| finally: | |
| conn.close() | |
| except Exception: | |
| return 0 | |
| def _load_sync_state() -> dict: | |
| """C4.1: Load the persisted sync state dict from SQLite sync_state_metadata table.""" | |
| try: | |
| if os.path.exists(DB_PATH): | |
| conn = sqlite3.connect(DB_PATH, check_same_thread=False, timeout=5) | |
| try: | |
| conn.execute(""" | |
| CREATE TABLE IF NOT EXISTS sync_state_metadata ( | |
| key TEXT PRIMARY KEY, | |
| value TEXT NOT NULL | |
| ) | |
| """) | |
| row = conn.execute("SELECT value FROM sync_state_metadata WHERE key = ?", ("sync_state",)).fetchone() | |
| if row: | |
| return json.loads(row[0]) | |
| finally: | |
| conn.close() | |
| except Exception as e: | |
| print(f"[sync] load state error: {e}") | |
| return {"last_synced_audit_id": 0, "last_sync_at": None, "sync_status": "never"} | |
| def _save_sync_state(state: dict) -> None: | |
| """C4.1/C4.2: Persist the sync state dict to SQLite sync_state_metadata table.""" | |
| try: | |
| if os.path.exists(DB_PATH): | |
| conn = sqlite3.connect(DB_PATH, check_same_thread=False, timeout=5) | |
| try: | |
| conn.execute(""" | |
| CREATE TABLE IF NOT EXISTS sync_state_metadata ( | |
| key TEXT PRIMARY KEY, | |
| value TEXT NOT NULL | |
| ) | |
| """) | |
| conn.execute( | |
| "INSERT OR REPLACE INTO sync_state_metadata (key, value) VALUES (?, ?)", | |
| ("sync_state", json.dumps(state)) | |
| ) | |
| conn.commit() | |
| finally: | |
| conn.close() | |
| except Exception as e: | |
| print(f"[sync] failed to save sync state: {e}") | |
| def restore(): | |
| if not HF_TOKEN: | |
| print("[sync] No HF_TOKEN — skipping restore") | |
| return | |
| os.makedirs(DATA_DIR, exist_ok=True) | |
| api = get_api() | |
| # Check repo exists | |
| try: | |
| api.repo_info(REPO_ID, repo_type="dataset") | |
| except RepositoryNotFoundError: | |
| print(f"[sync] Dataset repo {REPO_ID} not found — fresh start") | |
| return | |
| except Exception as e: | |
| print(f"[sync] restore repo check error: {e}") | |
| return | |
| # Download each sync target individually | |
| all_targets = SYNC_FILES + [ | |
| f for f in _list_repo_prefix(api, "second-brain/") | |
| ] | |
| if not all_targets: | |
| print("[sync] Dataset empty — fresh start") | |
| return | |
| for fname in all_targets: | |
| try: | |
| local_path = os.path.join(DATA_DIR, fname) | |
| os.makedirs(os.path.dirname(local_path), exist_ok=True) | |
| hf_hub_download( | |
| repo_id=REPO_ID, | |
| filename=fname, | |
| repo_type="dataset", | |
| token=HF_TOKEN, | |
| local_dir=DATA_DIR, | |
| local_dir_use_symlinks=False, | |
| ) | |
| print(f"[sync] restored {fname}") | |
| except EntryNotFoundError: | |
| pass # file not yet in repo, skip | |
| except Exception as e: | |
| print(f"[sync] restore {fname} error: {e}") | |
| print("[sync] restore complete") | |
| def _list_repo_prefix(api, prefix): | |
| """List files in repo matching a path prefix.""" | |
| try: | |
| from huggingface_hub import list_repo_files | |
| return [f for f in list_repo_files(REPO_ID, repo_type="dataset", token=HF_TOKEN) | |
| if f.startswith(prefix)] | |
| except Exception: | |
| return [] | |
| def _checkpoint_db(): | |
| """Checkpoint the SQLite WAL file before backing up to ensure all data is in the main DB file.""" | |
| try: | |
| if os.path.exists(DB_PATH): | |
| conn = sqlite3.connect(DB_PATH, check_same_thread=False, timeout=10) | |
| try: | |
| conn.execute("PRAGMA wal_checkpoint(TRUNCATE)") | |
| print("[sync] DB checkpoint complete (WAL merged)") | |
| finally: | |
| conn.close() | |
| except Exception as e: | |
| print(f"[sync] DB checkpoint failed: {e}") | |
| def backup(): | |
| if not HF_TOKEN: | |
| return | |
| api = get_api() | |
| # Checkpoint WAL changes to main DB file before backup | |
| _checkpoint_db() | |
| targets = _collect_sync_targets() | |
| if not targets: | |
| print("[sync] nothing to backup") | |
| return | |
| # C4.1: Compare audit log high-water mark instead of mtime fingerprint | |
| current_hwm = _get_audit_high_water_mark() | |
| state = _load_sync_state() | |
| last_hwm = state.get("last_synced_audit_id", 0) | |
| if current_hwm <= last_hwm: | |
| print(f"[sync] no new audit entries (hwm={current_hwm}) — skipping backup") | |
| return | |
| print(f"[sync] audit HWM changed ({last_hwm} → {current_hwm}) — backing up...") | |
| # Ensure repo exists | |
| try: | |
| api.repo_info(REPO_ID, repo_type="dataset") | |
| except RepositoryNotFoundError: | |
| print(f"[sync] Creating dataset repo {REPO_ID}") | |
| api.create_repo(REPO_ID, repo_type="dataset", private=True) | |
| except Exception as e: | |
| print(f"[sync] repo_info error: {e}") | |
| # C4.2: record error state | |
| state["sync_status"] = "error" | |
| _save_sync_state(state) | |
| return | |
| # Stage only the targeted files | |
| staging = tempfile.mkdtemp(prefix="agentcache_sync_") | |
| try: | |
| for full, rel in targets: | |
| dest = os.path.join(staging, rel.replace("/", os.sep)) | |
| os.makedirs(os.path.dirname(dest), exist_ok=True) | |
| try: | |
| shutil.copy2(full, dest) | |
| except Exception as e: | |
| print(f"[sync] stage {rel} error: {e}") | |
| print(f"[sync] uploading {len(targets)} files to {REPO_ID}...") | |
| api.upload_folder( | |
| folder_path=staging, | |
| repo_id=REPO_ID, | |
| repo_type="dataset", | |
| token=HF_TOKEN, | |
| commit_message="sync: periodic backup", | |
| ) | |
| print("[sync] backup complete") | |
| # C4.1/C4.2: update state with new HWM and timestamp | |
| import datetime | |
| state["last_synced_audit_id"] = current_hwm | |
| state["last_sync_at"] = datetime.datetime.utcnow().isoformat() + "Z" | |
| state["sync_status"] = "ok" | |
| _save_sync_state(state) | |
| except Exception as e: | |
| print(f"[sync] backup error: {e}") | |
| state["sync_status"] = "error" | |
| _save_sync_state(state) | |
| finally: | |
| shutil.rmtree(staging, ignore_errors=True) | |
| if __name__ == "__main__": | |
| cmd = sys.argv[1] if len(sys.argv) > 1 else "backup" | |
| if cmd == "restore": | |
| restore() | |
| elif cmd == "backup": | |
| backup() | |
| else: | |
| print(f"[sync] unknown command: {cmd}") | |
| sys.exit(1) | |