aristoteles78 committed
Commit b714bbd · Parent: 19b064e

webhook hardening, dedup, init-known-folders

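This commit adds a one-time /init-known-folders endpoint that backfills state so pre-existing datastore folders are never re-processed. A minimal sketch of calling it once after deploy (the Space URL is hypothetical; the endpoint path and X-Webhook-Secret header are the ones defined in app.py below):

import os
import requests

SPACE_URL = "https://<your-space>.hf.space"  # hypothetical; substitute the real Space URL

# One-time backfill: marks every current EEE folder as already known
resp = requests.post(
    f"{SPACE_URL}/init-known-folders",
    headers={"X-Webhook-Secret": os.environ["WEBHOOK_SECRET"]},
    timeout=60,
)
print(resp.json())  # {"action": "initialized", "known_folders_count": ...}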
__pycache__/app.cpython-311.pyc DELETED
Binary file (8.16 kB)
 
__pycache__/worker.cpython-311.pyc DELETED
Binary file (13.1 kB)
 
app.py CHANGED
@@ -2,6 +2,7 @@
 
 Listens for PR merge events on evaleval/EEE_datastore and triggers
 card generation for new benchmarks in a background thread.
+Queued folders are persisted so nothing is lost between webhook events.
 """
 
 import hmac
@@ -17,7 +18,8 @@ from worker import (
     detect_new_benchmarks,
     process_new_benchmarks,
     load_state,
-    save_state,
+    save_pending,
+    pop_pending,
     PERSISTENT_DIR,
 )
 
@@ -29,6 +31,9 @@ logger = logging.getLogger("webhook")
 
 app = FastAPI(title="BenchmarkCard Webhook")
 
+# Max time a generation job can run before we allow new jobs (1 hour)
+MAX_JOB_DURATION_SECONDS = 3600
+
 # Track active generation thread (max 1 concurrent)
 _active_job: dict = {"thread": None, "started_at": None, "folders": []}
 _job_lock = threading.Lock()
@@ -52,12 +57,35 @@ def _is_merged_pr(payload: dict) -> bool:
     )
 
 
+def _is_job_timed_out() -> bool:
+    """Check if the active job has exceeded the timeout."""
+    started = _active_job.get("started_at")
+    if not started:
+        return False
+    try:
+        started_dt = datetime.fromisoformat(started)
+        elapsed = (datetime.now(timezone.utc) - started_dt).total_seconds()
+        return elapsed > MAX_JOB_DURATION_SECONDS
+    except (ValueError, TypeError):
+        return False
+
+
 def _run_generation(new_folders: list[str]):
-    """Background worker: generate cards for new benchmark folders."""
+    """Background worker: generate cards, then drain pending queue."""
     try:
         logger.info("Background generation started for %d folders: %s", len(new_folders), new_folders)
         process_new_benchmarks(new_folders)
         logger.info("Background generation completed")
+
+        # Drain pending queue: process any folders that arrived while we were busy
+        while True:
+            pending = pop_pending()
+            if not pending:
+                break
+            logger.info("Draining pending queue: %d folders: %s", len(pending), pending)
+            # Re-detect to catch any new folders since the pending was saved
+            process_new_benchmarks(pending)
+
     except Exception:
         logger.exception("Background generation failed")
     finally:
@@ -70,14 +98,12 @@ def _run_generation(new_folders: list[str]):
 @app.post("/webhook")
 async def webhook(request: Request):
     """Receive HF webhook events and trigger card generation."""
-    # Verify secret
     secret = request.headers.get("X-Webhook-Secret", "")
     if not _verify_secret(secret):
         return JSONResponse(status_code=403, content={"error": "invalid secret"})
 
     payload = await request.json()
 
-    # Only act on merged PRs
     if not _is_merged_pr(payload):
         event_scope = payload.get("event", {}).get("scope", "unknown")
         discussion = payload.get("discussion", {})
@@ -92,7 +118,6 @@ async def webhook(request: Request):
     pr_num = discussion.get("num", "?")
     logger.info("Merged PR detected: #%s '%s'", pr_num, pr_title)
 
-    # Detect new benchmark folders
     new_folders = detect_new_benchmarks()
     if not new_folders:
         return JSONResponse(
@@ -100,16 +125,32 @@ async def webhook(request: Request):
             content={"action": "no_new_benchmarks", "pr": f"#{pr_num} {pr_title}"},
         )
 
-    # Spawn background thread if not already running
     with _job_lock:
-        if _active_job["thread"] is not None and _active_job["thread"].is_alive():
+        thread_alive = _active_job["thread"] is not None and _active_job["thread"].is_alive()
+        timed_out = thread_alive and _is_job_timed_out()
+
+        if timed_out:
+            logger.warning(
+                "Active job timed out (started %s), allowing new job",
+                _active_job["started_at"],
+            )
+            # Don't kill old thread (daemon, will die on exit), just reset tracking
+            _active_job["thread"] = None
+            _active_job["started_at"] = None
+            _active_job["folders"] = []
+            thread_alive = False
+
+        if thread_alive:
+            # Persist to pending queue so they get processed after current job
+            save_pending(new_folders)
+            logger.info("Job in progress, queued %d folders to pending", len(new_folders))
             return JSONResponse(
                 status_code=200,
                 content={
                     "action": "queued",
-                    "reason": "generation already in progress",
+                    "reason": "generation in progress, folders saved to pending queue",
                     "active_folders": _active_job["folders"],
-                    "new_folders": new_folders,
+                    "queued_folders": new_folders,
                 },
             )
 
@@ -145,14 +186,38 @@ async def status():
     return {
         "active_job": active,
         "known_folders": len(state.get("known_folders", [])),
+        "pending_folders": state.get("pending_folders", []),
        "jobs": state.get("jobs", [])[-20:],
    }
 
 
+@app.post("/init-known-folders")
+async def init_known_folders(request: Request):
+    """One-time: mark all current EEE folders as known."""
+    secret = request.headers.get("X-Webhook-Secret", "")
+    if not _verify_secret(secret):
+        return JSONResponse(status_code=403, content={"error": "invalid secret"})
+
+    from huggingface_hub import HfApi
+    from worker import save_state
+
+    api = HfApi()
+    all_files = api.list_repo_files("evaleval/EEE_datastore", repo_type="dataset")
+    current_folders = sorted({
+        p.split("/")[1] for p in all_files
+        if p.startswith("data/") and len(p.split("/")) >= 2
+    })
+
+    state = load_state()
+    state["known_folders"] = current_folders
+    save_state(state)
+    return {"action": "initialized", "known_folders_count": len(current_folders)}
+
+
 @app.get("/")
 async def root():
     """Root endpoint for HF Space UI."""
-    return {"service": "BenchmarkCard Webhook", "endpoints": ["/webhook", "/status", "/health"]}
+    return {"service": "BenchmarkCard Webhook", "endpoints": ["/webhook", "/status", "/health", "/init-known-folders"]}
 
 
 @app.get("/health")
worker.py CHANGED
@@ -1,18 +1,20 @@
 """Background worker for benchmark card generation.
 
-Detects new benchmarks in EEE_datastore by scanning folders for actual
-benchmark names, resolves them via the Entity Registry for canonical IDs,
-and skips benchmarks that already have cards in evaleval/auto-benchmarkcards.
+Detects new benchmark folders in EEE_datastore, generates cards via
+run_eee_pipeline(), and uploads them to evaleval/auto-benchmarkcards.
+
+Uses Jenny's Entity Registry for canonical ID resolution and dedup.
 """
 
 import json
 import logging
 import os
-import shutil
 import tempfile
+import time
 from datetime import datetime, timezone
+from functools import wraps
 from pathlib import Path
-from typing import Any
+from typing import Any, Optional
 
 import requests
 from huggingface_hub import HfApi, snapshot_download
@@ -21,8 +23,6 @@ logger = logging.getLogger("worker")
 
 EEE_REPO = "evaleval/EEE_datastore"
 CARDS_REPO = "evaleval/auto-benchmarkcards"
-MAX_BENCHMARKS_PER_JOB = 5
-
 ENTITY_REGISTRY_URL = "https://evaleval-entity-registry.hf.space/api/v1"
 
 # Persistent storage on HF Spaces (mounted volume).
@@ -30,176 +30,209 @@ ENTITY_REGISTRY_URL = "https://evaleval-entity-registry.hf.space/api/v1"
 PERSISTENT_DIR = Path(os.environ.get("PERSISTENT_DIR", "/data"))
 STATE_FILE = PERSISTENT_DIR / "state.json"
 
+FORCE_REGENERATE = os.environ.get("FORCE_REGENERATE", "").lower() in ("1", "true", "yes")
+
+
+# -- Retry decorator for transient failures --
+
+def retry(max_attempts=3, delay=5, backoff=2):
+    """Retry decorator with exponential backoff for transient failures."""
+    def decorator(func):
+        @wraps(func)
+        def wrapper(*args, **kwargs):
+            for attempt in range(max_attempts):
+                try:
+                    return func(*args, **kwargs)
+                except Exception as e:
+                    if attempt == max_attempts - 1:
+                        raise
+                    wait = delay * (backoff ** attempt)
+                    logger.warning(
+                        "%s failed (attempt %d/%d), retrying in %ds: %s",
+                        func.__name__, attempt + 1, max_attempts, wait, e,
+                    )
+                    time.sleep(wait)
+        return wrapper
+    return decorator
+
+
+# -- State management (atomic writes) --
 
 def load_state() -> dict:
-    """Load persistent state (known folders, job history)."""
+    """Load persistent state (known folders, job history, pending queue)."""
     if STATE_FILE.exists():
         try:
             return json.loads(STATE_FILE.read_text())
         except Exception:
             logger.exception("Failed to read state file, starting fresh")
-    return {"known_folders": [], "jobs": []}
+    return {"known_folders": [], "jobs": [], "pending_folders": []}
 
 
 def save_state(state: dict) -> None:
-    """Save persistent state to disk (atomic write via temp + rename)."""
+    """Save persistent state atomically (write-then-rename)."""
     PERSISTENT_DIR.mkdir(parents=True, exist_ok=True)
     tmp = STATE_FILE.with_suffix(".tmp")
     tmp.write_text(json.dumps(state, indent=2))
     tmp.rename(STATE_FILE)
 
 
-def _normalize_name(name: str) -> str:
-    """Local fallback normalization (lowercase, collapse separators to underscore)."""
-    return name.lower().replace("-", "_").replace(" ", "_")
+def save_pending(folders: list[str]) -> None:
+    """Add folders to the pending queue."""
+    state = load_state()
+    pending = state.get("pending_folders", [])
+    for f in folders:
+        if f not in pending:
+            pending.append(f)
+    state["pending_folders"] = pending
+    save_state(state)
+
+
+def pop_pending() -> list[str]:
+    """Pop all pending folders from the queue."""
+    state = load_state()
+    pending = state.pop("pending_folders", [])
+    state["pending_folders"] = []
+    save_state(state)
+    return pending
 
 
-def _resolve_names(names: list[str]) -> dict[str, str]:
-    """Resolve benchmark names to canonical IDs via Entity Registry.
+# -- Entity Registry --
 
-    Returns a mapping {raw_name: canonical_id}. Names the registry doesn't
-    recognize get a locally-normalized fallback ID so dedup still works.
+_canonical_cache: dict[str, Optional[str]] = {}
+
+
+def resolve_canonical_id(benchmark_name: str) -> Optional[str]:
+    """Resolve benchmark name to canonical_id via Entity Registry.
+
+    Returns canonical_id string (e.g. "math") or None if not found.
+    Uses an in-memory cache to avoid repeated API calls within a job.
     """
-    resolved = {}
+    if benchmark_name in _canonical_cache:
+        return _canonical_cache[benchmark_name]
 
-    if not names:
-        return resolved
+    try:
+        resp = requests.post(
+            f"{ENTITY_REGISTRY_URL}/resolve",
+            json={"raw_value": benchmark_name, "entity_type": "benchmark"},
+            timeout=10,
+        )
+        resp.raise_for_status()
+        data = resp.json()
+        canonical_id = data.get("canonical_id")
+        _canonical_cache[benchmark_name] = canonical_id
+        if canonical_id:
+            logger.info("Entity Registry: '%s' -> '%s'", benchmark_name, canonical_id)
+        return canonical_id
+    except Exception:
+        logger.debug("Entity Registry lookup failed for '%s'", benchmark_name)
+        _canonical_cache[benchmark_name] = None
+        return None
+
+
+def resolve_canonical_ids_batch(names: list[str]) -> dict[str, Optional[str]]:
+    """Batch-resolve benchmark names to canonical_ids."""
+    # Check cache first, only query uncached names
+    uncached = [n for n in names if n not in _canonical_cache]
+    if not uncached:
+        return {n: _canonical_cache[n] for n in names}
 
     try:
+        payload = [{"raw_value": n, "entity_type": "benchmark"} for n in uncached]
         resp = requests.post(
             f"{ENTITY_REGISTRY_URL}/resolve/batch",
-            json=[{"raw_value": n, "entity_type": "benchmark"} for n in names],
-            timeout=15,
+            json=payload,
+            timeout=30,
         )
         resp.raise_for_status()
         results = resp.json()
+        for name, result in zip(uncached, results):
+            canonical_id = result.get("canonical_id")
+            _canonical_cache[name] = canonical_id
+            if canonical_id:
+                logger.info("Entity Registry: '%s' -> '%s'", name, canonical_id)
+    except Exception:
+        logger.warning("Entity Registry batch resolve failed, using fallback")
+        for name in uncached:
+            _canonical_cache[name] = None
 
-        # Response is a list in the same order as the input
-        for name, item in zip(names, results):
-            canonical = item.get("canonical_id")
-            resolved[name] = canonical if canonical else _normalize_name(name)
-
-        registry_hits = sum(1 for item in results if item.get("canonical_id"))
-        logger.info(
-            "Entity Registry resolved %d/%d names",
-            registry_hits, len(names),
-        )
-    except Exception:
-        logger.warning("Entity Registry unreachable, falling back to local normalization")
-        for n in names:
-            resolved[n] = _normalize_name(n)
-
-    return resolved
+    return {n: _canonical_cache.get(n) for n in names}
+
+
+def _get_card_filename(benchmark_name: str) -> str:
+    """Get the canonical filename for a benchmark card.
+
+    Uses Entity Registry canonical_id when available, falls back to
+    sanitize_benchmark_name from the main package.
+    """
+    canonical = resolve_canonical_id(benchmark_name)
+    if canonical:
+        return canonical
+
+    from auto_benchmarkcard.output import sanitize_benchmark_name
+    return sanitize_benchmark_name(benchmark_name).lower()
+
+
+# -- EEE folder detection --
 
 def _extract_folders(file_list: list[str]) -> set[str]:
     """Extract unique top-level folder names under data/."""
     folders = set()
     for path in file_list:
         parts = path.split("/")
-        if len(parts) >= 3 and parts[0] == "data":
+        if len(parts) >= 2 and parts[0] == "data":
             folders.add(parts[1])
     return folders
 
 
-def _get_existing_cards() -> set[str]:
-    """List card names already in the target dataset (without path/extension)."""
+@retry(max_attempts=3, delay=5)
+def detect_new_benchmarks() -> list[str]:
+    """Compare current EEE_datastore file listing against known state."""
     api = HfApi()
-    try:
-        files = api.list_repo_files(CARDS_REPO, repo_type="dataset")
-    except Exception:
-        logger.exception("Failed to list existing cards")
-        return set()
-
-    names = set()
-    for f in files:
-        if f.startswith("cards/") and f.endswith(".json"):
-            names.add(f[len("cards/"):-len(".json")])
-    return names
+    all_files = api.list_repo_files(EEE_REPO, repo_type="dataset")
+
+    current_folders = _extract_folders(all_files)
+    state = load_state()
+    known = set(state.get("known_folders", []))
+
+    new_folders = sorted(current_folders - known)
+    if new_folders:
+        logger.info("Detected %d new folders: %s", len(new_folders), new_folders)
+    else:
+        logger.info("No new folders (known: %d, current: %d)", len(known), len(current_folders))
+
+    return new_folders
 
 
-def _download_folder(folder_name: str) -> Path:
-    """Download a single EEE folder to a temp directory."""
-    target_dir = tempfile.mkdtemp(prefix=f"eee_{folder_name}_")
-    logger.info("Downloading EEE folder '%s' to %s", folder_name, target_dir)
+# -- Download & upload --
+
+@retry(max_attempts=3, delay=10)
+def _download_folders(folder_names: list[str], target_dir: str) -> Path:
+    """Download EEE folders into a shared temp directory."""
+    patterns = [f"data/{f}/**/*.json" for f in folder_names]
+    logger.info("Downloading %d EEE folders to %s", len(folder_names), target_dir)
 
     snapshot_download(
         repo_id=EEE_REPO,
         repo_type="dataset",
         local_dir=target_dir,
-        allow_patterns=[f"data/{folder_name}/**/*.json"],
+        allow_patterns=patterns,
     )
 
     return Path(target_dir) / "data"
 
 
-def detect_new_benchmarks() -> list[str]:
-    """Find EEE folders that contain benchmarks without cards.
-
-    Returns folder names that have at least one benchmark not yet in the
-    cards dataset. The actual per-benchmark filtering happens during processing.
-    """
-    api = HfApi()
-    try:
-        all_files = api.list_repo_files(EEE_REPO, repo_type="dataset")
-    except Exception:
-        logger.exception("Failed to list EEE_datastore files")
-        return []
-
-    current_folders = _extract_folders(all_files)
-    existing_cards = _get_existing_cards()
-
-    # Normalize existing card names for comparison (match both hyphen and underscore)
-    normalized_cards = set()
-    for c in existing_cards:
-        normalized_cards.add(c)
-        normalized_cards.add(c.replace("-", "_"))
-        normalized_cards.add(c.replace("_", "-"))
-
-    # A folder is "new" if its normalized name doesn't match any card.
-    # This is a coarse filter — per-benchmark dedup happens in process_new_benchmarks.
-    new_folders = sorted(
-        f for f in current_folders
-        if _normalize_name(f) not in normalized_cards
-    )
-
-    if not new_folders:
-        logger.info("All %d folders already have cards", len(current_folders))
-        return []
-
-    if len(new_folders) > MAX_BENCHMARKS_PER_JOB:
-        logger.info(
-            "Found %d folders without cards, limiting to %d per job",
-            len(new_folders), MAX_BENCHMARKS_PER_JOB,
-        )
-        new_folders = new_folders[:MAX_BENCHMARKS_PER_JOB]
-
-    logger.info("Processing %d folders: %s", len(new_folders), new_folders)
-    return new_folders
-
-
-def _upload_card(card: dict, benchmark_name: str, canonical_id: str | None = None) -> bool:
-    """Upload a generated card to the auto-benchmarkcards dataset."""
+@retry(max_attempts=3, delay=5)
+def _upload_card(card: dict, benchmark_name: str, canonical_id: Optional[str] = None) -> bool:
+    """Upload a generated card to evaleval/auto-benchmarkcards."""
     api = HfApi()
+    filename = canonical_id or _get_card_filename(benchmark_name)
+    remote_path = f"cards/{filename}.json"
 
-    # Use canonical ID for filename when available, fall back to local normalization
-    if canonical_id:
-        safe_name = canonical_id
-    else:
-        safe_name = _normalize_name(benchmark_name).replace("/", "_")
-
-    remote_path = f"cards/{safe_name}.json"
-
-    # Embed canonical ID in the card so it's self-consistent with the filename
-    if canonical_id:
-        inner = card.get("benchmark_card", card)
-        inner.setdefault("card_info", {})["canonical_id"] = canonical_id
+    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
+        json.dump(card, f, indent=2)
+        tmp_path = f.name
 
     try:
-        with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
-            json.dump(card, f, indent=2)
-            tmp_path = f.name
-
         api.upload_file(
             path_or_fileobj=tmp_path,
             path_in_repo=remote_path,
@@ -209,10 +242,6 @@ def _upload_card(card: dict, benchmark_name: str, canonical_id: str | None = Non
         )
         logger.info("Uploaded card to %s/%s", CARDS_REPO, remote_path)
         return True
-
-    except Exception:
-        logger.exception("Failed to upload card for %s", benchmark_name)
-        return False
     finally:
         try:
             os.unlink(tmp_path)
@@ -220,30 +249,82 @@ def _upload_card(card: dict, benchmark_name: str, canonical_id: str | None = Non
             pass
 
 
+@retry(max_attempts=2, delay=5)
+def _list_existing_cards() -> set[str]:
+    """List all card filenames (without extension) in the cards repo."""
+    api = HfApi()
+    all_files = api.list_repo_files(CARDS_REPO, repo_type="dataset")
+    cards = set()
+    for path in all_files:
+        if path.startswith("cards/") and path.endswith(".json"):
+            name = path[len("cards/"):-len(".json")]
+            cards.add(name)
+    return cards
+
+
+# -- Main processing --
+
+def _build_dedup_filter(
+    benchmark_names: list[str],
+    existing_cards: set[str],
+) -> list[str]:
+    """Return list of benchmark names that don't already have cards.
+
+    Checks in order: Entity Registry canonical_id, exact fallback name,
+    and parent prefix match (for 'Parent - Child' pattern).
+    """
+    if FORCE_REGENERATE:
+        logger.info("FORCE_REGENERATE=true, skipping dedup")
+        return benchmark_names
+
+    # Batch-resolve all names
+    canonical_map = resolve_canonical_ids_batch(benchmark_names)
+
+    from auto_benchmarkcard.output import sanitize_benchmark_name
+
+    new_benchmarks = []
+    for name in benchmark_names:
+        canonical = canonical_map.get(name)
+        fallback = sanitize_benchmark_name(name).lower()
+
+        # 1. Entity Registry canonical_id match
+        if canonical and canonical in existing_cards:
+            logger.info("Skipping '%s' (card exists as '%s')", name, canonical)
+            continue
+
+        # 2. Exact fallback name match
+        if fallback in existing_cards:
+            logger.info("Skipping '%s' (card exists as '%s')", name, fallback)
+            continue
+
+        # 3. Prefix match: 'MGSM - Bengali' -> check if 'mgsm' card exists
+        if " - " in name:
+            parent = name.split(" - ", 1)[0].strip()
+            parent_lower = sanitize_benchmark_name(parent).lower()
+            if parent_lower in existing_cards:
+                logger.info("Skipping '%s' (parent card exists as '%s')", name, parent_lower)
+                continue
+
+        new_benchmarks.append(name)
+
+    logger.info("Dedup: %d total, %d new, %d existing",
+                len(benchmark_names), len(new_benchmarks),
+                len(benchmark_names) - len(new_benchmarks))
+    return new_benchmarks
+
+
 def process_new_benchmarks(new_folders: list[str]) -> None:
-    """Generate and upload cards for benchmarks that don't have one yet.
+    """Generate and upload cards for benchmarks in new folders.
 
-    Downloads each folder, scans for benchmarks, resolves names via the
-    Entity Registry, and skips any benchmark that already has a card.
+    Delegates to run_eee_pipeline() for the actual generation, using a
+    callback to upload each card as it's generated.
     """
-    from auto_benchmarkcard.tools.eee.eee_tool import (
-        scan_eee_folder,
-        eee_to_pipeline_inputs,
-    )
-    from auto_benchmarkcard.eee_workflow import process_single_benchmark
-    from auto_benchmarkcard.logging_setup import setup_logging_suppression
+    from auto_benchmarkcard.eee_workflow import run_eee_pipeline
+    from auto_benchmarkcard.tools.eee.eee_tool import scan_eee_folder
+    from auto_benchmarkcard.workflow import setup_logging_suppression
 
     setup_logging_suppression(debug_mode=False)
 
-    # Fetch existing cards once for the whole job.
-    # Normalize to both hyphen and underscore forms so we match old and new naming.
-    existing_cards = _get_existing_cards()
-    existing_normalized = set()
-    for c in existing_cards:
-        existing_normalized.add(c)
-        existing_normalized.add(c.replace("-", "_"))
-        existing_normalized.add(c.replace("_", "-"))
-
     state = load_state()
     job_record: dict[str, Any] = {
         "started_at": datetime.now(timezone.utc).isoformat(),
@@ -251,81 +332,102 @@ def process_new_benchmarks(new_folders: list[str]) -> None:
         "results": [],
     }
 
-    for folder_name in new_folders:
-        logger.info("Processing folder: %s", folder_name)
+    # Pre-fetch existing cards for dedup
+    try:
+        existing_cards = _list_existing_cards()
+        logger.info("Found %d existing cards in %s", len(existing_cards), CARDS_REPO)
+    except Exception:
+        logger.warning("Failed to list existing cards, dedup disabled for this job")
+        existing_cards = set()
 
-        tmp_root = None
+    # Download all folders into one shared temp dir
+    with tempfile.TemporaryDirectory(prefix="eee_batch_") as tmpdir:
         try:
-            data_path = _download_folder(folder_name)
-            tmp_root = data_path.parent
+            data_path = _download_folders(new_folders, tmpdir)
         except Exception:
-            logger.exception("Failed to download folder %s", folder_name)
+            logger.exception("Failed to download EEE folders")
             job_record["results"].append({
-                "folder": folder_name, "status": "download_failed",
+                "folders": new_folders, "status": "download_failed",
             })
-            continue
+            job_record["completed_at"] = datetime.now(timezone.utc).isoformat()
+            state["jobs"].append(job_record)
+            state["jobs"] = state["jobs"][-50:]
+            save_state(state)
+            return
 
+        # Scan to discover benchmark names for dedup
         try:
             scan_result = scan_eee_folder(str(data_path))
-
-            # Resolve all benchmark names in this folder at once
-            bench_names = list(scan_result.benchmarks.keys())
-            resolved = _resolve_names(bench_names)
-
-            for name, bench in sorted(scan_result.benchmarks.items()):
-                canonical = resolved.get(name, _normalize_name(name))
-
-                # Per-benchmark dedup: skip if a card already exists
-                if canonical in existing_normalized:
-                    logger.info("Skipping %s: card already exists (canonical: %s)", name, canonical)
-                    job_record["results"].append({
-                        "folder": folder_name, "benchmark": name,
-                        "canonical_id": canonical, "status": "already_exists",
-                    })
-                    continue
-
-                inputs = eee_to_pipeline_inputs(bench)
-
-                if not inputs.get("hf_repo"):
-                    logger.warning("Skipping %s: no HF repo", name)
-                    job_record["results"].append({
-                        "folder": folder_name, "benchmark": name, "status": "no_hf_repo",
-                    })
-                    continue
-
-                card = process_single_benchmark(
-                    benchmark_name=name,
-                    pipeline_inputs=inputs,
-                    base_output_path=str(PERSISTENT_DIR / "output"),
-                    debug=False,
-                )
-
-                if card:
-                    uploaded = _upload_card(card, name, canonical_id=canonical)
-                    job_record["results"].append({
-                        "folder": folder_name, "benchmark": name,
-                        "canonical_id": canonical,
-                        "status": "uploaded" if uploaded else "upload_failed",
-                    })
-                    # Add to existing set so later benchmarks in same job are deduped
-                    if uploaded:
-                        existing_normalized.add(canonical)
-                        existing_normalized.add(canonical.replace("-", "_"))
-                else:
-                    job_record["results"].append({
-                        "folder": folder_name, "benchmark": name, "status": "generation_failed",
-                    })
-
         except Exception:
-            logger.exception("Failed to process folder %s", folder_name)
+            logger.exception("Failed to scan EEE data")
             job_record["results"].append({
-                "folder": folder_name, "status": "scan_failed",
+                "folders": new_folders, "status": "scan_failed",
             })
-
-        finally:
-            if tmp_root and tmp_root.exists():
-                shutil.rmtree(tmp_root, ignore_errors=True)
-
+            job_record["completed_at"] = datetime.now(timezone.utc).isoformat()
+            state["jobs"].append(job_record)
+            state["jobs"] = state["jobs"][-50:]
+            save_state(state)
+            return
+
+        all_names = (
+            list(scan_result.benchmarks.keys())
+            + list(scan_result.composites.keys())
+        )
+        benchmarks_to_generate = _build_dedup_filter(all_names, existing_cards)
+
+        if not benchmarks_to_generate:
+            logger.info("All benchmarks already have cards, nothing to generate")
+            job_record["results"].append({"status": "all_existing"})
+        else:
+            # Upload callback: called by run_eee_pipeline for each generated card
+            def _on_card_generated(name: str, card: dict) -> None:
+                canonical = resolve_canonical_id(name)
+
+                # Enrich card metadata
+                inner = card.get("benchmark_card", card)
+                info = inner.get("card_info", {})
+                info["source"] = "webhook"
+                if canonical:
+                    info["canonical_id"] = canonical
+                inner["card_info"] = info
+
+                try:
+                    _upload_card(card, name, canonical_id=canonical)
+                    job_record["results"].append({
+                        "benchmark": name,
+                        "canonical_id": canonical,
+                        "status": "uploaded",
+                    })
+                except Exception:
+                    logger.exception("Failed to upload card for %s", name)
+                    job_record["results"].append({
+                        "benchmark": name, "status": "upload_failed",
+                    })
+
+            # Run the unified pipeline
+            summary = run_eee_pipeline(
+                eee_path=str(data_path),
+                output_path=str(PERSISTENT_DIR / "output"),
+                benchmarks_filter=benchmarks_to_generate,
+                on_card_generated=_on_card_generated,
+            )
+
+            # Record skipped/failed from pipeline summary
+            for item in summary.get("skipped", []):
+                job_record["results"].append({
+                    "benchmark": item.get("benchmark", "unknown"),
+                    "status": f"skipped:{item.get('reason', 'unknown')}",
+                })
+            for name in summary.get("failed", []):
+                # Only add if not already recorded by callback
+                existing = {r.get("benchmark") for r in job_record["results"]}
+                if name not in existing:
+                    job_record["results"].append({
+                        "benchmark": name, "status": "generation_failed",
+                    })
+
+    # Mark folders as known
+    for folder_name in new_folders:
         if folder_name not in state["known_folders"]:
             state["known_folders"].append(folder_name)
 
@@ -333,8 +435,8 @@ def process_new_benchmarks(new_folders: list[str]) -> None:
 
     results = job_record["results"]
     uploaded = sum(1 for r in results if r.get("status") == "uploaded")
-    failed = sum(1 for r in results if r.get("status") not in ("uploaded", "no_hf_repo", "already_exists"))
-    skipped = sum(1 for r in results if r.get("status") in ("no_hf_repo", "already_exists"))
+    failed = sum(1 for r in results if "failed" in r.get("status", ""))
+    skipped = sum(1 for r in results if r.get("status", "").startswith("skipped"))
     logger.info("Job complete: %d uploaded, %d failed, %d skipped", uploaded, failed, skipped)
 
     state["jobs"].append(job_record)