""" Populate the GuardBench leaderboard from HuggingFace datasets. """ import json import os import pandas as pd import tempfile from typing import Dict, List, Optional from datetime import datetime import numpy as np from huggingface_hub import hf_hub_download, HfApi from datasets import load_dataset from src.display.utils import GUARDBENCH_COLUMN, DISPLAY_COLS, CATEGORIES from src.envs import RESULTS_DATASET_ID, TOKEN, CACHE_PATH from src.leaderboard.processor import leaderboard_to_dataframe def get_latest_leaderboard(version="v0") -> Optional[Dict]: """ Get the latest leaderboard data from HuggingFace dataset. """ try: # Try to download the leaderboard file leaderboard_path = hf_hub_download( repo_id=RESULTS_DATASET_ID, filename=f"leaderboards/leaderboard_{version}.json", repo_type="dataset", token=TOKEN ) with open(leaderboard_path, 'r') as f: return json.load(f) except Exception as e: print(f"Error downloading leaderboard: {e}") return None def get_model_entry(model_name: str, mode: str, version="v0") -> Optional[Dict]: """ Get a specific model's entry from the entries folder, uniquely identified by model_name, mode, and version. """ try: model_name_safe = model_name.replace("/", "_").replace(" ", "_") mode_safe = str(mode).replace("/", "_").replace(" ", "_").lower() entry_path = hf_hub_download( repo_id=RESULTS_DATASET_ID, filename=f"entries/entry_{model_name_safe}_{mode_safe}_{version}.json", repo_type="dataset", token=TOKEN ) with open(entry_path, 'r') as f: return json.load(f) except Exception as e: print(f"Error downloading model entry: {e}") return None def get_all_entries(version="v0", mode: str = None) -> List[Dict]: """ Get all model entries from the entries folder. If mode is provided, only return entries matching that mode. """ try: api = HfApi(token=TOKEN) files = api.list_repo_files(repo_id=RESULTS_DATASET_ID, repo_type="dataset") if mode is not None: mode_safe = str(mode).replace("/", "_").replace(" ", "_").lower() entry_files = [f for f in files if f.startswith("entries/") and f"_{mode_safe}_" in f and f.endswith(f"_{version}.json")] else: entry_files = [f for f in files if f.startswith("entries/") and f.endswith(f"_{version}.json")] entries = [] for entry_file in entry_files: try: entry_path = hf_hub_download( repo_id=RESULTS_DATASET_ID, filename=entry_file, repo_type="dataset", token=TOKEN ) with open(entry_path, 'r') as f: entry_data = json.load(f) entries.append(entry_data) except Exception as e: print(f"Error loading entry {entry_file}: {e}") return entries except Exception as e: print(f"Error listing entries: {e}") return [] def get_leaderboard_df(version="v0") -> pd.DataFrame: """ Get the leaderboard data as a DataFrame. """ # Get latest leaderboard data leaderboard_data = get_latest_leaderboard(version) if not leaderboard_data: # If no leaderboard exists, try to build it from entries entries = get_all_entries(version) if entries: leaderboard_data = { "entries": entries, "last_updated": datetime.now().isoformat(), "version": version } else: # Return empty DataFrame if no data available return pd.DataFrame(columns=DISPLAY_COLS) # Convert to DataFrame return leaderboard_to_dataframe(leaderboard_data) def get_category_leaderboard_df(category: str, version="v0") -> pd.DataFrame: """ Get the leaderboard data filtered by a specific category. """ # Get latest leaderboard data leaderboard_data = get_latest_leaderboard(version) if not leaderboard_data: # If no leaderboard exists, try to build it from entries entries = get_all_entries(version) if entries: leaderboard_data = { "entries": entries, "last_updated": datetime.now().isoformat(), "version": version } else: # Return empty DataFrame if no data available return pd.DataFrame(columns=DISPLAY_COLS) # Filter entries to only include those with data for the specified category filtered_entries = [] for entry in leaderboard_data.get("entries", []): # Copy all base fields filtered_entry = { "model_name": entry.get("model_name", "Unknown Model"), "model_type": entry.get("model_type", "Unknown"), "guard_model_type": entry.get("guard_model_type", "Unknown"), "mode": entry.get("mode", "Strict"), "submission_date": entry.get("submission_date", ""), "version": entry.get("version", version), "base_model": entry.get("base_model", ""), "revision": entry.get("revision", ""), "precision": entry.get("precision", ""), "weight_type": entry.get("weight_type", "") } if "per_category_metrics" in entry and category in entry["per_category_metrics"]: category_metrics = entry["per_category_metrics"][category] # Add all metrics for each test type for test_type, metrics in category_metrics.items(): if isinstance(metrics, dict): for metric, value in metrics.items(): col_name = f"{test_type}_{metric}" filtered_entry[col_name] = value # Also add the non-binary version for F1 scores if metric == "f1_binary": filtered_entry[f"{test_type}_f1"] = value # Calculate averages f1_values = [] recall_values = [] precision_values = [] accuracy_values = [] category_recall_values = [] total_samples = 0 for test_type in ["default_prompts", "jailbreaked_prompts", "default_answers", "jailbreaked_answers"]: if test_type in category_metrics and isinstance(category_metrics[test_type], dict): test_metrics = category_metrics[test_type] if "f1_binary" in test_metrics and pd.notna(test_metrics["f1_binary"]): f1_values.append(test_metrics["f1_binary"]) if "recall_binary" in test_metrics and pd.notna(test_metrics["recall_binary"]): recall_values.append(test_metrics["recall_binary"]) category_recall_values.append(test_metrics["recall_binary"]) if "precision_binary" in test_metrics and pd.notna(test_metrics["precision_binary"]): precision_values.append(test_metrics["precision_binary"]) if "accuracy" in test_metrics and pd.notna(test_metrics["accuracy"]): accuracy_values.append(test_metrics["accuracy"]) if "sample_count" in test_metrics and pd.notna(test_metrics["sample_count"]): total_samples += test_metrics["sample_count"] # print(f"F1 values: {f1_values}") # print(f1_values, recall_values, precision_values, accuracy_values, total_samples) # Add overall averages if f1_values: filtered_entry["average_f1"] = sum(f1_values) / len(f1_values) if recall_values: filtered_entry["average_recall"] = sum(recall_values) / len(recall_values) if precision_values: filtered_entry["average_precision"] = sum(precision_values) / len(precision_values) # Add category-specific values to standard macro metric keys if accuracy_values: filtered_entry["macro_accuracy"] = sum(accuracy_values) / len(accuracy_values) else: filtered_entry["macro_accuracy"] = np.nan if category_recall_values: filtered_entry["macro_recall"] = sum(category_recall_values) / len(category_recall_values) else: filtered_entry["macro_recall"] = np.nan if total_samples > 0: filtered_entry["total_evals_count"] = total_samples else: filtered_entry["total_evals_count"] = np.nan filtered_entries.append(filtered_entry) # Create a new leaderboard data structure with the filtered entries filtered_leaderboard = { "entries": filtered_entries, "last_updated": leaderboard_data.get("last_updated", datetime.now().isoformat()), "version": version } # print(filtered_leaderboard) # Convert to DataFrame return leaderboard_to_dataframe(filtered_leaderboard) def get_detailed_model_data(model_name: str, mode: str, version="v0") -> Dict: """ Get detailed data for a specific model and mode. """ entry = get_model_entry(model_name, mode, version) if entry: return entry leaderboard_data = get_latest_leaderboard(version) if leaderboard_data: for entry in leaderboard_data.get("entries", []): if entry.get("model_name") == model_name and str(entry.get("mode")).lower() == str(mode).lower(): return entry return {}