agentcache / sync.py
Yash030's picture
feat: migrate agentmemory to agentcache namespace, endpoints, and tools
12a6c9a
raw
history blame contribute delete
9.13 kB
#!/usr/bin/env python3
"""
Sync agentcache data to/from a private HF Dataset repo.
Usage:
python3 sync.py restore -- download DB from HF on startup
python3 sync.py backup -- upload DB to HF (called in loop)
C4.1: Uses audit log high-water mark instead of mtime-based change detection.
C4.2: Records last sync status in the DB's sync_state_metadata table (key "sync_state") so it can be reported by /health.
"""
import json
import os
import sys
import shutil
import tempfile
import time
import sqlite3
try:
from huggingface_hub import HfApi, snapshot_download, hf_hub_download
from huggingface_hub.utils import EntryNotFoundError, RepositoryNotFoundError
except ImportError:
print("[sync] huggingface_hub not installed, skipping sync")
sys.exit(0)
# Hub access token; restore() and backup() return early when it is empty.
HF_TOKEN = os.getenv("HF_TOKEN", "")

# Dataset repo holding the backup. The new env var wins, then the legacy
# AGENTMEMORY one, then the built-in default.
REPO_ID = (
    os.getenv("AGENTCACHE_DATASET_REPO")
    or os.getenv("AGENTMEMORY_DATASET_REPO")
    or "Yash030/agentmemory-python-data"
)

# Local data directory and the SQLite database inside it.
DATA_DIR = os.path.expanduser("~/.agentcache")
DB_PATH = os.path.join(DATA_DIR, "agentcache.db")

# Only these paths are backed up/restored — everything else is ephemeral.
# Entries are relative to DATA_DIR.
SYNC_FILES = ["agentcache.db", ".hmac"]
SYNC_DIRS = ["second-brain"]

# C4.1: Path of the sync-state file that replaced the mtime-based state file.
# NOTE(review): the load/save helpers visible in this file persist state in
# the sync_state_metadata SQLite table and never touch this path — confirm
# whether anything else still reads it.
STATE_FILE = os.path.join(DATA_DIR, ".sync_state")
def get_api():
    """Build a Hugging Face Hub client authenticated with ``HF_TOKEN``."""
    client = HfApi(token=HF_TOKEN)
    return client
def _collect_sync_targets():
    """Gather every local file that belongs in a backup.

    Returns:
        A list of ``(abs_path, repo_rel_path)`` tuples: first the entries of
        ``SYNC_FILES`` that exist as regular files in ``DATA_DIR``, then every
        file found by walking each existing ``SYNC_DIRS`` directory. Repo
        paths are relative to ``DATA_DIR`` and always use ``/`` separators.
    """
    pairs = []

    # Individual files: skip the ones that are not present locally.
    for name in SYNC_FILES:
        candidate = os.path.join(DATA_DIR, name)
        if not os.path.isfile(candidate):
            continue
        pairs.append((candidate, name))

    # Whole directories: walk them recursively.
    for dir_name in SYNC_DIRS:
        top = os.path.join(DATA_DIR, dir_name)
        if not os.path.isdir(top):
            continue
        for parent, _subdirs, leaves in os.walk(top):
            pairs.extend(
                (
                    os.path.join(parent, leaf),
                    # Normalise Windows separators so repo paths use "/".
                    os.path.relpath(os.path.join(parent, leaf), DATA_DIR).replace("\\", "/"),
                )
                for leaf in leaves
            )

    return pairs
def _get_audit_high_water_mark() -> int:
    """C4.1: Report the largest ``id`` currently stored in ``audit_log``.

    Returns:
        ``MAX(id)`` as an int, or 0 when the DB file does not exist, the
        table has no rows, or the lookup fails for any reason (best-effort:
        errors are swallowed, not logged).
    """
    try:
        if os.path.exists(DB_PATH):
            db = sqlite3.connect(DB_PATH, check_same_thread=False, timeout=5)
            try:
                result = db.execute("SELECT MAX(id) FROM audit_log").fetchone()
            finally:
                db.close()
            # MAX() over an empty table yields a single NULL column.
            if result and result[0] is not None:
                return int(result[0])
        return 0
    except Exception:
        return 0
def _load_sync_state() -> dict:
    """C4.1: Read the persisted sync state from the ``sync_state_metadata`` table.

    The table is created on the fly if it does not exist yet.

    Returns:
        The JSON-decoded value stored under the ``"sync_state"`` key, or a
        default "never synced" dict when the DB file is missing, no row is
        stored, or reading fails (the error is printed, not raised).
    """
    fallback = {"last_synced_audit_id": 0, "last_sync_at": None, "sync_status": "never"}
    try:
        if not os.path.exists(DB_PATH):
            return fallback
        db = sqlite3.connect(DB_PATH, check_same_thread=False, timeout=5)
        try:
            db.execute("""
                CREATE TABLE IF NOT EXISTS sync_state_metadata (
                    key TEXT PRIMARY KEY,
                    value TEXT NOT NULL
                )
            """)
            stored = db.execute(
                "SELECT value FROM sync_state_metadata WHERE key = ?",
                ("sync_state",),
            ).fetchone()
            if stored:
                return json.loads(stored[0])
        finally:
            db.close()
    except Exception as e:
        print(f"[sync] load state error: {e}")
    return fallback
def _save_sync_state(state: dict) -> None:
    """C4.1/C4.2: Write the sync state into the ``sync_state_metadata`` table.

    Args:
        state: JSON-serialisable dict; stored as one row under the
            ``"sync_state"`` key, replacing any previous value.

    Nothing is written when the DB file does not exist. Failures are printed
    and swallowed so a state-write problem never aborts the caller.
    """
    try:
        if not os.path.exists(DB_PATH):
            return
        db = sqlite3.connect(DB_PATH, check_same_thread=False, timeout=5)
        try:
            db.execute("""
                CREATE TABLE IF NOT EXISTS sync_state_metadata (
                    key TEXT PRIMARY KEY,
                    value TEXT NOT NULL
                )
            """)
            row = ("sync_state", json.dumps(state))
            db.execute(
                "INSERT OR REPLACE INTO sync_state_metadata (key, value) VALUES (?, ?)",
                row,
            )
            db.commit()
        finally:
            db.close()
    except Exception as e:
        print(f"[sync] failed to save sync state: {e}")
def restore():
    """Download the synced files from the HF dataset repo into ``DATA_DIR``.

    Restores every entry of ``SYNC_FILES`` plus every repo file listed under
    each ``SYNC_DIRS`` prefix. The operation is best-effort: a file that is
    not in the repo is skipped silently, any other per-file failure is
    printed and the loop moves on. Nothing is done when ``HF_TOKEN`` is
    unset, the repo does not exist, or the repo check fails.
    """
    if not HF_TOKEN:
        print("[sync] No HF_TOKEN — skipping restore")
        return
    os.makedirs(DATA_DIR, exist_ok=True)
    api = get_api()

    # Check repo exists
    try:
        api.repo_info(REPO_ID, repo_type="dataset")
    except RepositoryNotFoundError:
        print(f"[sync] Dataset repo {REPO_ID} not found — fresh start")
        return
    except Exception as e:
        print(f"[sync] restore repo check error: {e}")
        return

    # Individual files are attempted unconditionally; directory contents are
    # discovered from the repo listing, one prefix per SYNC_DIRS entry.
    all_targets = list(SYNC_FILES)
    for dname in SYNC_DIRS:
        all_targets.extend(_list_repo_prefix(api, f"{dname}/"))

    restored = 0
    failed = 0
    for fname in all_targets:
        try:
            local_path = os.path.join(DATA_DIR, fname)
            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            # NOTE(review): local_dir_use_symlinks is deprecated in recent
            # huggingface_hub releases — confirm the pinned version accepts it.
            hf_hub_download(
                repo_id=REPO_ID,
                filename=fname,
                repo_type="dataset",
                token=HF_TOKEN,
                local_dir=DATA_DIR,
                local_dir_use_symlinks=False,
            )
        except EntryNotFoundError:
            continue  # file not yet in repo, skip
        except Exception as e:
            failed += 1
            print(f"[sync] restore {fname} error: {e}")
        else:
            restored += 1
            print(f"[sync] restored {fname}")

    # The target list is never empty (SYNC_FILES is a fixed list), so an
    # empty dataset can only be recognised from the download results.
    if not restored and not failed:
        print("[sync] Dataset empty — fresh start")
        return
    print("[sync] restore complete")
def _list_repo_prefix(api, prefix):
    """List files in repo matching a path prefix.

    Args:
        api: ``HfApi`` client used to query the dataset repo.
        prefix: Repo-relative path prefix, e.g. ``"second-brain/"``.

    Returns:
        The repo-relative paths that start with ``prefix``. An empty list is
        returned when the listing fails; the failure is printed so a network
        or auth problem is not mistaken for an empty repo.
    """
    try:
        repo_files = api.list_repo_files(REPO_ID, repo_type="dataset", token=HF_TOKEN)
    except Exception as e:
        print(f"[sync] list repo files error: {e}")
        return []
    return [f for f in repo_files if f.startswith(prefix)]
def _checkpoint_db():
    """Checkpoint the SQLite WAL file before backing up.

    Runs ``PRAGMA wal_checkpoint(TRUNCATE)`` so that all committed data is in
    the main DB file, which is the file that gets uploaded. Does nothing when
    the DB file does not exist. Best-effort: failures are printed, never
    raised.
    """
    try:
        if not os.path.exists(DB_PATH):
            return
        conn = sqlite3.connect(DB_PATH, check_same_thread=False, timeout=10)
        try:
            row = conn.execute("PRAGMA wal_checkpoint(TRUNCATE)").fetchone()
        finally:
            conn.close()
    except Exception as e:
        print(f"[sync] DB checkpoint failed: {e}")
        return

    # The pragma yields one row (busy, log_frames, checkpointed_frames); a
    # non-zero first column means the checkpoint was blocked from completing,
    # so the WAL may still hold data missing from the main file.
    if row and row[0]:
        print("[sync] DB checkpoint incomplete (database busy, WAL not fully merged)")
    else:
        print("[sync] DB checkpoint complete (WAL merged)")
def backup():
    """Upload the sync targets to the HF dataset repo when the audit log advanced.

    Flow: checkpoint the WAL, collect the local targets, compare the audit-log
    high-water mark (HWM) with the last synced one, make sure the repo exists,
    stage copies of the targets in a temp directory, upload that directory,
    then persist the new sync state.

    The function returns silently when ``HF_TOKEN`` is unset. A backup is
    skipped whenever the current HWM is not greater than the stored one; that
    includes the case where the HWM reads as 0. Any failure after the HWM
    check records ``sync_status = "error"`` instead of raising.
    """
    if not HF_TOKEN:
        return
    api = get_api()

    # Checkpoint WAL changes to main DB file before backup
    _checkpoint_db()

    targets = _collect_sync_targets()
    if not targets:
        print("[sync] nothing to backup")
        return

    # C4.1: Compare audit log high-water mark instead of mtime fingerprint
    current_hwm = _get_audit_high_water_mark()
    state = _load_sync_state()
    # "or 0" guards against a stored null so the comparison cannot raise.
    last_hwm = state.get("last_synced_audit_id", 0) or 0
    if current_hwm <= last_hwm:
        print(f"[sync] no new audit entries (hwm={current_hwm}) — skipping backup")
        return
    print(f"[sync] audit HWM changed ({last_hwm} -> {current_hwm}) — backing up...")

    # Ensure repo exists
    try:
        api.repo_info(REPO_ID, repo_type="dataset")
    except RepositoryNotFoundError:
        print(f"[sync] Creating dataset repo {REPO_ID}")
        # An exception raised inside this handler would not be caught by the
        # sibling "except Exception" clause, so it needs its own guard.
        try:
            api.create_repo(REPO_ID, repo_type="dataset", private=True)
        except Exception as e:
            print(f"[sync] create_repo error: {e}")
            state["sync_status"] = "error"
            _save_sync_state(state)
            return
    except Exception as e:
        print(f"[sync] repo_info error: {e}")
        # C4.2: record error state
        state["sync_status"] = "error"
        _save_sync_state(state)
        return

    # Stage only the targeted files
    staging = tempfile.mkdtemp(prefix="agentcache_sync_")
    try:
        staged = 0
        for full, rel in targets:
            dest = os.path.join(staging, rel.replace("/", os.sep))
            os.makedirs(os.path.dirname(dest), exist_ok=True)
            try:
                shutil.copy2(full, dest)
            except Exception as e:
                print(f"[sync] stage {rel} error: {e}")
            else:
                staged += 1

        # Uploading an empty folder and marking the sync "ok" would advance
        # the HWM without having backed anything up.
        if not staged:
            raise RuntimeError("no files could be staged")

        print(f"[sync] uploading {staged} files to {REPO_ID}...")
        api.upload_folder(
            folder_path=staging,
            repo_id=REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN,
            commit_message="sync: periodic backup",
        )
        print("[sync] backup complete")

        # C4.1/C4.2: update state with new HWM and timestamp
        import datetime

        # Aware "now" converted back to naive UTC keeps the stored format
        # ("...Z" suffix) while avoiding the deprecated utcnow().
        now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        state["last_synced_audit_id"] = current_hwm
        state["last_sync_at"] = now_utc.isoformat() + "Z"
        state["sync_status"] = "ok"
        _save_sync_state(state)
    except Exception as e:
        print(f"[sync] backup error: {e}")
        state["sync_status"] = "error"
        _save_sync_state(state)
    finally:
        shutil.rmtree(staging, ignore_errors=True)
if __name__ == "__main__":
    # Dispatch on the first CLI argument; with no argument, run a backup.
    cli_args = sys.argv[1:]
    cmd = cli_args[0] if cli_args else "backup"
    handlers = {"restore": restore, "backup": backup}
    if cmd not in handlers:
        print(f"[sync] unknown command: {cmd}")
        sys.exit(1)
    handlers[cmd]()