somratpro commited on
Commit
e41db1d
·
1 Parent(s): 9a3eec7

feat: implement persistent state tracking, hostname inclusion in sync messages, and interval jittering

Browse files
Files changed (1) hide show
  1. hermes-sync.py +62 -30
hermes-sync.py CHANGED
@@ -7,6 +7,8 @@ import logging
7
  import os
8
  import shutil
9
  import signal
 
 
10
  import sys
11
  import tempfile
12
  import threading
@@ -23,6 +25,7 @@ logging.getLogger("huggingface_hub").setLevel(logging.ERROR)
23
 
24
  HERMES_HOME = Path(os.environ.get("HERMES_HOME", "/opt/data"))
25
  STATUS_FILE = Path("/tmp/huggingmes-sync-status.json")
 
26
  INTERVAL = int(os.environ.get("SYNC_INTERVAL", "600"))
27
  INITIAL_DELAY = int(os.environ.get("SYNC_START_DELAY", "10"))
28
  HF_TOKEN = os.environ.get("HF_TOKEN", "").strip()
@@ -41,7 +44,7 @@ EXCLUDED_DIRS = {
41
  "node_modules",
42
  "venv",
43
  }
44
- EXCLUDED_TOP_LEVEL = {"logs"}
45
  if not INCLUDE_ENV:
46
  EXCLUDED_TOP_LEVEL.add(".env")
47
 
@@ -50,15 +53,41 @@ STOP_EVENT = threading.Event()
50
  _REPO_ID_CACHE: str | None = None
51
 
52
 
53
- def write_status(status: str, message: str) -> None:
 
54
  payload = {
55
  "status": status,
56
  "message": message,
57
- "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
58
  }
 
 
59
  tmp_path = STATUS_FILE.with_suffix(".tmp")
60
- tmp_path.write_text(json.dumps(payload), encoding="utf-8")
61
- tmp_path.replace(STATUS_FILE)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
62
 
63
 
64
  def resolve_backup_repo() -> str:
@@ -213,45 +242,45 @@ def restore() -> bool:
213
 
214
 
215
  def sync_once(last_fingerprint: str | None = None, last_marker: tuple[int, int, int] | None = None):
216
- if not HF_TOKEN:
217
- write_status("disabled", "HF_TOKEN is not configured.")
218
- return (last_fingerprint or "", last_marker or (0, 0, 0))
 
 
 
 
 
 
 
 
219
 
220
  repo_id = ensure_repo_exists()
221
  current_marker = metadata_marker(HERMES_HOME)
222
  if last_marker is not None and current_marker == last_marker:
223
- write_status("synced", "No Hermes state changes detected.")
224
  return (last_fingerprint or "", current_marker)
225
 
226
  current_fingerprint = fingerprint_dir(HERMES_HOME)
227
  if last_fingerprint is not None and current_fingerprint == last_fingerprint:
228
- write_status("synced", "No Hermes state changes detected.")
229
  return (last_fingerprint, current_marker)
230
 
231
- write_status("syncing", f"Uploading Hermes state to {repo_id}")
 
232
  snapshot_dir = create_snapshot_dir(HERMES_HOME)
233
  try:
234
- try:
235
- HF_API.upload_large_folder(
236
- repo_id=repo_id,
237
- repo_type="dataset",
238
- folder_path=str(snapshot_dir),
239
- num_workers=2,
240
- print_report=False,
241
- )
242
- except AttributeError:
243
- upload_folder(
244
- folder_path=str(snapshot_dir),
245
- repo_id=repo_id,
246
- repo_type="dataset",
247
- token=HF_TOKEN,
248
- commit_message=f"HuggingMes sync {time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}",
249
- ignore_patterns=[".git/*", ".git"],
250
- )
251
  finally:
252
  shutil.rmtree(snapshot_dir, ignore_errors=True)
253
 
254
- write_status("success", f"Uploaded Hermes state to {repo_id}")
255
  return (current_fingerprint, current_marker)
256
 
257
 
@@ -281,7 +310,10 @@ def loop() -> int:
281
  except Exception as exc:
282
  write_status("error", f"Sync failed: {exc}")
283
  print(f"Hermes sync failed: {exc}")
284
- if STOP_EVENT.wait(INTERVAL):
 
 
 
285
  break
286
  return 0
287
 
 
7
  import os
8
  import shutil
9
  import signal
10
+ import random
11
+ import socket
12
  import sys
13
  import tempfile
14
  import threading
 
25
 
26
  HERMES_HOME = Path(os.environ.get("HERMES_HOME", "/opt/data"))
27
  STATUS_FILE = Path("/tmp/huggingmes-sync-status.json")
28
+ STATE_FILE = HERMES_HOME / ".huggingmes-sync-state.json"
29
  INTERVAL = int(os.environ.get("SYNC_INTERVAL", "600"))
30
  INITIAL_DELAY = int(os.environ.get("SYNC_START_DELAY", "10"))
31
  HF_TOKEN = os.environ.get("HF_TOKEN", "").strip()
 
44
  "node_modules",
45
  "venv",
46
  }
47
+ EXCLUDED_TOP_LEVEL = {"logs", STATE_FILE.name}
48
  if not INCLUDE_ENV:
49
  EXCLUDED_TOP_LEVEL.add(".env")
50
 
 
53
  _REPO_ID_CACHE: str | None = None
54
 
55
 
56
+ def write_status(status: str, message: str, fingerprint: str | None = None, marker: tuple[int, int, int] | None = None) -> None:
57
+ timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
58
  payload = {
59
  "status": status,
60
  "message": message,
61
+ "timestamp": timestamp,
62
  }
63
+
64
+ # Update temporary status file for health checks/dashboard
65
  tmp_path = STATUS_FILE.with_suffix(".tmp")
66
+ try:
67
+ tmp_path.write_text(json.dumps(payload), encoding="utf-8")
68
+ tmp_path.replace(STATUS_FILE)
69
+ except OSError:
70
+ pass
71
+
72
+ # Update persistent state file in HERMES_HOME
73
+ if fingerprint or marker:
74
+ state = {}
75
+ if STATE_FILE.exists():
76
+ try:
77
+ state = json.loads(STATE_FILE.read_text(encoding="utf-8"))
78
+ except Exception:
79
+ pass
80
+
81
+ if fingerprint:
82
+ state["last_fingerprint"] = fingerprint
83
+ if marker:
84
+ state["last_marker"] = list(marker)
85
+ state["last_sync"] = timestamp
86
+
87
+ try:
88
+ STATE_FILE.write_text(json.dumps(state), encoding="utf-8")
89
+ except OSError:
90
+ pass
91
 
92
 
93
  def resolve_backup_repo() -> str:
 
242
 
243
 
244
  def sync_once(last_fingerprint: str | None = None, last_marker: tuple[int, int, int] | None = None):
245
+ # If no state provided, try to load from persistent state file
246
+ if last_fingerprint is None and last_marker is None:
247
+ if STATE_FILE.exists():
248
+ try:
249
+ state = json.loads(STATE_FILE.read_text(encoding="utf-8"))
250
+ last_fingerprint = state.get("last_fingerprint")
251
+ m = state.get("last_marker")
252
+ if m and len(m) == 3:
253
+ last_marker = tuple(m)
254
+ except Exception:
255
+ pass
256
 
257
  repo_id = ensure_repo_exists()
258
  current_marker = metadata_marker(HERMES_HOME)
259
  if last_marker is not None and current_marker == last_marker:
260
+ write_status("synced", "No Hermes state changes detected (marker match).")
261
  return (last_fingerprint or "", current_marker)
262
 
263
  current_fingerprint = fingerprint_dir(HERMES_HOME)
264
  if last_fingerprint is not None and current_fingerprint == last_fingerprint:
265
+ write_status("synced", "No Hermes state changes detected (fingerprint match).")
266
  return (last_fingerprint, current_marker)
267
 
268
+ hostname = socket.gethostname()
269
+ write_status("syncing", f"Uploading Hermes state to {repo_id} from {hostname}")
270
  snapshot_dir = create_snapshot_dir(HERMES_HOME)
271
  try:
272
+ upload_folder(
273
+ folder_path=str(snapshot_dir),
274
+ repo_id=repo_id,
275
+ repo_type="dataset",
276
+ token=HF_TOKEN,
277
+ commit_message=f"HuggingMes sync [{hostname}] {time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}",
278
+ ignore_patterns=[".git/*", ".git"],
279
+ )
 
 
 
 
 
 
 
 
 
280
  finally:
281
  shutil.rmtree(snapshot_dir, ignore_errors=True)
282
 
283
+ write_status("success", f"Uploaded Hermes state to {repo_id}", fingerprint=current_fingerprint, marker=current_marker)
284
  return (current_fingerprint, current_marker)
285
 
286
 
 
310
  except Exception as exc:
311
  write_status("error", f"Sync failed: {exc}")
312
  print(f"Hermes sync failed: {exc}")
313
+
314
+ # Add 10% jitter to interval to avoid synchronized commits from multiple containers
315
+ jitter = random.uniform(0.9, 1.1)
316
+ if STOP_EVENT.wait(INTERVAL * jitter):
317
  break
318
  return 0
319