"""
evaluate.py - DISBench Evaluation Script

Responsibilities:
1. Scan all JSON submission files in the submissions/ directory
2. Re-evaluate every submission against groundtruth.jsonl to calculate scores
3. Write the deduplicated results to leaderboard_data.json
4. (Optional) Commit the updated leaderboard_data.json back to the HF repository

Execution:
- Automatic: Called when app.py starts (automatically triggered on Space rebuild)
- Manual: python evaluate.py
"""

import json
import logging
import os
import tempfile
from datetime import datetime
from typing import Dict, List, Set, Tuple, Optional

# Configure logging at import time so that both the manual run and any module
# importing this file emit timestamped, level-tagged messages.
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)

# Paths used by the pipeline; relative paths resolve against the current
# working directory.
SUBMISSIONS_DIR = "submissions"          # directory scanned for *.json submissions
GROUND_TRUTH_FILE = "groundtruth.jsonl"  # one gold query per line (JSONL)
LEADERBOARD_FILE = "leaderboard_data.json"  # output written by save_leaderboard()


def compute_em(predicted: Set[str], gold: Set[str]) -> float:
    """Exact Match: return 1.0 if the predicted set equals the gold set, otherwise 0.0."""
    return 1.0 if predicted == gold else 0.0


def compute_f1(predicted: Set[str], gold: Set[str]) -> float:
    """
    F1 Score: harmonic mean of set-based precision and recall.

    Returns 1.0 when both sets are empty (nothing to find, nothing predicted),
    and 0.0 when exactly one of them is empty or when they do not overlap.
    """
    if not predicted and not gold:
        return 1.0
    if not predicted or not gold:
        return 0.0

    tp = len(predicted & gold)
    if tp == 0:
        # precision and recall are both 0; avoid dividing by their sum
        return 0.0

    precision = tp / len(predicted)
    recall = tp / len(gold)
    return 2 * precision * recall / (precision + recall)


def load_ground_truth() -> Dict:
    """
    Load the ground truth file (JSONL format).

    Input format (groundtruth.jsonl), one JSON object per line:
        {
            "query_id": "1",
            "user_id": "...",
            "query": "...",
            "answer": ["photo_id_1", "photo_id_2"],
            "event_type": "intra-event"   // "intra-event" or "inter-event"
        }

    Converted to the internal format:
        {
            "queries": {
                "1": {
                    "type": "intra",   // "intra" or "inter"
                    "gold_photos": ["photo_id_1", "photo_id_2"]
                },
                ...
            }
        }

    Returns:
        The internal-format dict, or an empty dict when GROUND_TRUTH_FILE does
        not exist. Blank lines are ignored; lines that are invalid JSON, lack a
        query_id, or otherwise fail to parse are logged and skipped.
    """
    if not os.path.exists(GROUND_TRUTH_FILE):
        logger.warning(f"Ground truth file not found: {GROUND_TRUTH_FILE}")
        return {}

    queries = {}
    with open(GROUND_TRUTH_FILE, 'r', encoding='utf-8') as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue

            try:
                entry = json.loads(line)
                query_id = entry.get("query_id")
                if query_id is None:
                    # Without an id the query can never be matched to a prediction.
                    logger.warning(f"Missing query_id at line {line_num}, skipping")
                    continue

                # A null answer is treated as "no gold photos".
                answer = entry.get("answer") or []
                event_type = entry.get("event_type", "intra-event")

                # "intra-event" -> "intra", "inter-event" -> "inter"
                query_type = event_type.replace("-event", "")

                # JSON object keys in submissions are always strings, so
                # normalise numeric ids to str to make lookups match.
                queries[str(query_id)] = {
                    "type": query_type,
                    "gold_photos": answer,
                }
            except json.JSONDecodeError as e:
                logger.warning(f"Invalid JSON at line {line_num}: {e}")
                continue
            except Exception as e:
                # Best effort: one malformed entry must not abort the whole load.
                logger.warning(f"Error processing line {line_num}: {e}")
                continue

    return {"queries": queries}


def evaluate_predictions(
    predictions: Dict[str, List[str]],
    ground_truth: Dict
) -> Dict[str, float]:
    """
    Calculate all metrics for a submission's predictions.

    Every query in the ground truth is scored; a query absent from
    ``predictions`` is scored as an empty prediction. Scores are averaged
    overall and per query type, then expressed as percentages rounded to one
    decimal place. A group with no queries scores 0.0.

    Args:
        predictions: Mapping of query_id to the predicted photo ids.
        ground_truth: Dict with a "queries" mapping of query_id to
            {"type": ..., "gold_photos": [...]}.

    Returns:
        {
            "overall_em": float,
            "overall_f1": float,
            "intra_em": float,
            "intra_f1": float,
            "inter_em": float,
            "inter_f1": float
        }
    """
    queries = ground_truth.get("queries", {})

    if not queries:
        logger.warning("Ground truth has no queries, returning zeros.")
        return {
            "overall_em": 0.0, "overall_f1": 0.0,
            "intra_em": 0.0, "intra_f1": 0.0,
            "inter_em": 0.0, "inter_f1": 0.0,
        }

    scores_by_type = {"intra": {"em": [], "f1": []}, "inter": {"em": [], "f1": []}}
    all_em, all_f1 = [], []

    for query_id, query_info in queries.items():
        gold_set = _as_id_set(query_info.get("gold_photos", []))
        pred_set = _as_id_set(predictions.get(query_id, []))
        query_type = query_info.get("type", "intra")

        em = compute_em(pred_set, gold_set)
        f1 = compute_f1(pred_set, gold_set)

        all_em.append(em)
        all_f1.append(f1)

        # Queries of an unknown type still count toward the overall scores.
        if query_type in scores_by_type:
            scores_by_type[query_type]["em"].append(em)
            scores_by_type[query_type]["f1"].append(f1)

    def safe_mean(lst):
        return round(sum(lst) / len(lst) * 100, 1) if lst else 0.0

    return {
        "overall_em": safe_mean(all_em),
        "overall_f1": safe_mean(all_f1),
        "intra_em": safe_mean(scores_by_type["intra"]["em"]),
        "intra_f1": safe_mean(scores_by_type["intra"]["f1"]),
        "inter_em": safe_mean(scores_by_type["inter"]["em"]),
        "inter_f1": safe_mean(scores_by_type["inter"]["f1"]),
    }


def _as_id_set(value) -> Set[str]:
    """Coerce one prediction/gold value into a set of photo ids."""
    if value is None:
        return set()
    if isinstance(value, str):
        # set("abc") would split the id into single characters.
        return {value}
    return set(value)


def get_entry_key(entry: Dict) -> Tuple:
    """
    Generate the unique identifier key for a leaderboard entry.

    The same method may be submitted with several configurations (different
    backbone, retriever, etc.). Two entries are considered the same submission
    only when all key configuration fields are equal.

    Missing fields default to "" ("Standard" for the track).

    Returns: (method, agent, backbone, retriever, track)
    """
    return (
        entry.get("method", ""),
        entry.get("agent", ""),
        entry.get("backbone", ""),
        entry.get("retriever", ""),
        entry.get("track", "Standard"),
    )


def load_leaderboard() -> list:
    """
    Read LEADERBOARD_FILE and return its parsed JSON content.

    Returns an empty list when the file does not exist. A file that exists but
    holds invalid JSON still raises, as before.
    """
    # EAFP: open directly instead of checking existence first, so a file that
    # disappears between the check and the open cannot cause a crash.
    try:
        with open(LEADERBOARD_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        return []


def save_leaderboard(data: list):
    """
    Write ``data`` to LEADERBOARD_FILE as indented UTF-8 JSON.

    The content is first written to a temporary file in the same directory and
    then renamed over the target, so a crash or serialization error mid-write
    cannot leave a truncated leaderboard behind.
    """
    target_dir = os.path.dirname(os.path.abspath(LEADERBOARD_FILE))
    # NOTE: mkstemp creates the file readable/writable by the owner only.
    fd, tmp_path = tempfile.mkstemp(dir=target_dir, prefix=".leaderboard-", suffix=".tmp")
    try:
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        # Same directory => same filesystem, so the rename is atomic.
        os.replace(tmp_path, LEADERBOARD_FILE)
    except BaseException:
        # Do not leave the temporary file behind on failure.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise


def process_submission(filepath: str, ground_truth: Dict) -> Optional[Dict]:
    """
    Process a single submission file and build its leaderboard entry.

    The file must contain a JSON object with a "meta" object (holding at least
    "method_name") and a non-empty "predictions" object.

    Args:
        filepath: Path of the submission JSON file.
        ground_truth: Ground truth dict passed through to evaluate_predictions().

    Returns:
        The leaderboard entry (metadata, today's date and all scores), or None
        when the file is unreadable, malformed, or fails validation. Failures
        are logged, never raised.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            submission = json.load(f)

        if not isinstance(submission, dict):
            logger.warning(f"Skipping {filepath}: top-level JSON value is not an object")
            return None

        # `or {}` also covers explicit nulls in the submission.
        meta = submission.get("meta") or {}
        predictions = submission.get("predictions") or {}

        if not isinstance(meta, dict) or not meta.get("method_name"):
            logger.warning(f"Skipping {filepath}: missing method_name")
            return None

        if not predictions:
            logger.warning(f"Skipping {filepath}: empty predictions")
            return None

        if not isinstance(predictions, dict):
            logger.warning(f"Skipping {filepath}: predictions is not an object keyed by query_id")
            return None

        scores = evaluate_predictions(predictions, ground_truth)

        entry = {
            "method": meta.get("method_name", "Unknown"),
            "url": meta.get("project_url", "#"),
            "org": meta.get("organization", "Anonymous"),
            "agent": meta.get("agent_framework", "Unknown"),
            "backbone": meta.get("backbone_model", "Unknown"),
            "retriever": meta.get("retriever_model", "Unknown"),
            "track": meta.get("track", "Standard"),
            "date": datetime.now().strftime("%Y-%m-%d"),
            **scores,
        }

        logger.info(
            f"Evaluated '{entry['method']}': "
            f"Overall EM={scores['overall_em']}, F1={scores['overall_f1']}"
        )
        return entry

    except Exception as e:
        # Per-file boundary: one bad submission must not stop the pipeline.
        logger.error(f"Error processing {filepath}: {e}")
        return None


def run_evaluation():
    """
    Main evaluation pipeline:
    1. Load ground truth
    2. Scan all files in submissions/ and re-evaluate
    3. Deduplicate using configuration combinations (method, agent, backbone, retriever, track)
    4. If multiple submissions exist for the same configuration, keep the latest
       (files are processed in sorted filename order; the last one wins)
    5. Save the leaderboard and return (number of entries, total entries)

    Both returned values are the number of unique configurations written.
    (0, 0) is returned, and the leaderboard file is left untouched, when the
    ground truth is missing or empty or the submissions directory is absent.

    Notes:
    - No evaluated.json is maintained, all files are re-evaluated on each startup
    - submissions/ is the single source of truth
    - Benefits: simple logic, no state inconsistency, automatic recalculation
      when evaluation logic changes
    """
    ground_truth = load_ground_truth()
    if not ground_truth:
        logger.info("No ground truth file found. Skipping evaluation.")
        return 0, 0

    if not ground_truth.get("queries"):
        # Scoring against zero queries would overwrite every entry with 0.0.
        logger.warning("Ground truth contains no queries. Skipping evaluation.")
        return 0, 0

    if not os.path.isdir(SUBMISSIONS_DIR):
        logger.info("No submissions directory found.")
        return 0, 0

    # config key -> (entry, source filename); later files overwrite earlier ones.
    entries_by_config = {}

    for filename in sorted(os.listdir(SUBMISSIONS_DIR)):
        if not filename.endswith(".json"):
            continue

        filepath = os.path.join(SUBMISSIONS_DIR, filename)
        logger.info(f"Processing submission: {filename}")

        entry = process_submission(filepath, ground_truth)
        if entry is None:
            continue

        config_key = get_entry_key(entry)
        if config_key in entries_by_config:
            old_filename = entries_by_config[config_key][1]
            logger.info(
                f"Config {config_key} already exists (from {old_filename}), "
                f"replacing with {filename}"
            )

        entries_by_config[config_key] = (entry, filename)

    leaderboard = [entry for entry, _ in entries_by_config.values()]

    save_leaderboard(leaderboard)
    logger.info(f"Leaderboard updated: {len(leaderboard)} unique configurations.")

    return len(leaderboard), len(leaderboard)


def commit_leaderboard_to_repo():
    """
    (Optional) Commit the updated leaderboard_data.json back to the HF repository,
    to persist data (avoid re-evaluation on every restart).

    Does nothing unless both the HF_TOKEN and SPACE_ID environment variables
    are set. Any failure (including huggingface_hub not being installed) is
    logged and swallowed, so this step never aborts the caller.

    Note: We no longer commit evaluated.json, as we re-evaluate from submissions/ on each startup.
    """
    hf_token = os.environ.get("HF_TOKEN")
    space_id = os.environ.get("SPACE_ID")

    if not hf_token or not space_id:
        logger.info("HF_TOKEN or SPACE_ID not set, skipping repo commit.")
        return

    try:
        # Imported lazily so the module works without huggingface_hub installed.
        from huggingface_hub import HfApi, CommitOperationAdd

        api = HfApi(token=hf_token)

        if not os.path.exists(LEADERBOARD_FILE):
            logger.warning(f"Leaderboard file {LEADERBOARD_FILE} not found, skipping commit.")
            return

        # Read the bytes up front so the file handle is closed before the
        # network request starts.
        with open(LEADERBOARD_FILE, 'rb') as f:
            payload = f.read()

        api.create_commit(
            repo_id=space_id,
            repo_type="space",
            operations=[
                CommitOperationAdd(
                    path_in_repo=LEADERBOARD_FILE,
                    path_or_fileobj=payload,
                )
            ],
            commit_message="[Auto] Update leaderboard scores",
        )
        logger.info("Leaderboard committed to repo successfully.")

    except Exception as e:
        logger.error(f"Failed to commit to repo: {e}")


# Manual entry point: evaluate everything, then push the leaderboard to the
# repository only when at least one configuration was produced.
if __name__ == "__main__":
    logger.info("=" * 60)
    logger.info("DISBench Evaluation Pipeline - Manual Run")
    logger.info("=" * 60)

    total, _ = run_evaluation()

    if total > 0:
        logger.info("Evaluated all submissions. Committing to repo...")
        commit_leaderboard_to_repo()
    else:
        logger.info("No submissions found.")

    logger.info(f"Leaderboard has {total} unique configurations.")