# Author: hamzabouajila
# Commit: feat: enhance evaluation pipeline and error handling
# Revision: 34052ff
import json
import os
import pandas as pd
from datetime import datetime, timedelta
import dateutil
from src.display.formatting import has_no_nan_values, make_clickable_model
from src.display.utils import AutoEvalColumn, EvalQueueColumn, ModelType, Tasks, Precision, WeightType
from src.leaderboard.read_evals import get_raw_eval_results
def get_leaderboard_df(results_path: str, requests_path: str, cols: list, benchmark_cols: list) -> pd.DataFrame:
    """Build the leaderboard table from the raw evaluation results.

    Args:
        results_path: Forwarded to ``get_raw_eval_results``.
        requests_path: Forwarded to ``get_raw_eval_results``.
        cols: Columns to keep, in order, in the returned frame.
        benchmark_cols: Columns handed to ``has_no_nan_values`` to filter rows.

    Returns:
        A frame restricted to ``cols``, sorted by the average column in
        descending order and rounded to 4 decimals. When there are no
        results, an empty frame with ``cols`` as its columns.
    """
    eval_results = get_raw_eval_results(results_path, requests_path)
    records = [result.to_dict() for result in eval_results]
    leaderboard = pd.DataFrame.from_records(records)

    if not leaderboard.empty:
        average_col = AutoEvalColumn().average.name
        ranked = leaderboard.sort_values(by=[average_col], ascending=False)
        rounded = ranked[cols].round(decimals=4)
        # Presumably a boolean row mask selecting rows with no NaN in the
        # benchmark columns -- verify against src.display.formatting.
        keep_rows = has_no_nan_values(rounded, benchmark_cols)
        return rounded[keep_rows]

    print("No evaluation results found. Returning empty DataFrame with correct columns.")
    return pd.DataFrame(columns=cols)
def _reset_if_stuck(entry: dict, max_age: timedelta) -> None:
    """Downgrade a stale RUNNING queue entry to PENDING (in memory only).

    An entry is stale when its ``submitted_at`` timestamp is older than
    ``max_age``. Entries that are not RUNNING, or that have no
    ``submitted_at`` value, are left untouched. The file on disk is never
    modified.

    Raises:
        ValueError: If ``submitted_at`` is not a valid ISO-8601 string
            (propagated from ``dateutil.parser.isoparse``).
        KeyError: If a stale entry has no ``"model"`` key.
    """
    if entry.get("status") != "RUNNING":
        return
    submitted_at = entry.get("submitted_at")
    if not submitted_at:
        return

    # Import the submodule explicitly: a bare ``import dateutil`` does not
    # guarantee that ``dateutil.parser`` has been loaded.
    import dateutil.parser

    submitted_time = dateutil.parser.isoparse(submitted_at)
    # Build "now" with the same tz-awareness as the parsed timestamp.
    # Comparing a naive datetime with an aware one raises TypeError, which
    # previously caused timestamps ending in "Z" or an offset to be dropped.
    threshold = datetime.now(tz=submitted_time.tzinfo) - max_age
    if submitted_time < threshold:
        print(f"Stuck job detected for {entry['model']}. Changing status to PENDING.")
        entry["status"] = "PENDING"


def get_evaluation_queue_df(save_path: str, cols: list) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Create the dataframes for the evaluation queue requests.

    Recursively reads every ``*.json`` file under ``save_path``, treats each
    as one request entry and groups the entries by their ``"status"`` value.
    RUNNING entries submitted more than one hour ago are shown as PENDING.

    Args:
        save_path: Root directory that is walked for request files.
        cols: Columns of the returned frames, in order.

    Returns:
        A ``(finished, running, pending)`` tuple of frames:
        finished holds statuses starting with ``"FINISHED"`` plus
        ``"PENDING_NEW_EVAL"``; running holds ``"RUNNING"``; pending holds
        ``"PENDING"`` and ``"RERUN"``. Entries with any other status are
        omitted. A group with no entries is an empty frame with ``cols``.

    Files that cannot be read, parsed or processed are reported on stdout
    and skipped, so one bad file never hides the rest of the queue.
    """
    # Age after which a RUNNING job is considered stuck.
    max_running_age = timedelta(hours=1)
    pending, running, finished = [], [], []

    for root, _, files in os.walk(save_path):
        for filename in files:
            if not filename.endswith(".json"):
                continue
            file_path = os.path.join(root, filename)

            # Best-effort boundary: any failure on a single file is logged
            # and the file is skipped.
            try:
                with open(file_path, "r", encoding="utf-8") as fp:
                    entry = json.load(fp)
                if not isinstance(entry, dict):
                    print(f"Error processing file {file_path}: expected a JSON object, got {type(entry).__name__}")
                    continue
                _reset_if_stuck(entry, max_running_age)
                entry[EvalQueueColumn.model.name] = make_clickable_model(entry["model"])
                entry[EvalQueueColumn.revision.name] = entry.get("revision", "main")
            except Exception as e:
                print(f"Error processing file {file_path}: {e}")
                continue

            # A missing or non-string status used to raise outside the
            # try block and abort the whole call; skip such entries instead.
            status = entry.get("status")
            if not isinstance(status, str):
                print(f"Error processing file {file_path}: missing or invalid status {status!r}")
                continue

            # The three groups are mutually exclusive, so an elif chain
            # yields the same membership as three independent filters.
            if status in ("PENDING", "RERUN"):
                pending.append(entry)
            elif status == "RUNNING":
                running.append(entry)
            elif status.startswith("FINISHED") or status == "PENDING_NEW_EVAL":
                finished.append(entry)

    def to_frame(records: list) -> pd.DataFrame:
        """Turn one status group into a frame restricted to ``cols``."""
        if records:
            frame = pd.DataFrame.from_records(records, columns=cols)
        else:
            frame = pd.DataFrame(columns=cols)
        return frame[cols]

    return to_frame(finished), to_frame(running), to_frame(pending)