Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
import asyncio | |
import shutil | |
import tempfile | |
import gradio as gr | |
import pandas as pd | |
import plotly.express as px | |
import src.constants as constants | |
from src.hub import glob, load_json_file | |
def load_result_paths_per_model(): | |
return sort_result_paths_per_model(fetch_result_paths()) | |
def fetch_result_paths(): | |
path = f"{constants.RESULTS_DATASET_ID}/**/**/*.json" | |
return glob(path) | |
def sort_result_paths_per_model(paths): | |
from collections import defaultdict | |
d = defaultdict(list) | |
for path in paths: | |
model_id, _ = path[len(constants.RESULTS_DATASET_ID) + 1 :].rsplit("/", 1) | |
d[model_id].append(path) | |
return {model_id: sorted(paths) for model_id, paths in d.items()} | |
def update_load_results_component(): | |
return (gr.Button("Load", interactive=True),) * 2 | |
async def load_results_dataframe(model_id, result_paths_per_model=None): | |
if not model_id or not result_paths_per_model: | |
return | |
result_paths = result_paths_per_model[model_id] | |
results = await asyncio.gather(*[load_json_file(path) for path in result_paths]) | |
data = {"results": {}, "configs": {}} | |
for result in results: | |
data["results"].update(result["results"]) | |
data["configs"].update(result["configs"]) | |
model_name = result.get("model_name", "Model") | |
df = pd.json_normalize([data]) | |
# df.columns = df.columns.str.split(".") # .split return a list instead of a tuple | |
return df.set_index(pd.Index([model_name])) | |
async def load_results(model_ids, result_paths_per_model=None): | |
dfs = await asyncio.gather(*[load_results_dataframe(model_id, result_paths_per_model) for model_id in model_ids]) | |
if dfs: | |
return pd.concat(dfs) | |
def display_results(df, task, hide_std_errors, show_only_differences): | |
if df is None: | |
return None, None | |
df = df.T.rename_axis(columns=None) | |
return ( | |
display_tab("results", df, task, hide_std_errors=hide_std_errors), | |
display_tab("configs", df, task, show_only_differences=show_only_differences), | |
) | |
def display_tab(tab, df, task, hide_std_errors=True, show_only_differences=False): | |
if show_only_differences: | |
any_difference = df.ne(df.iloc[:, 0], axis=0).any(axis=1) | |
df = df.style.format(escape="html", na_rep="") | |
# Hide rows | |
df.hide( | |
[ | |
row | |
for row in df.index | |
if ( | |
not row.startswith(f"{tab}.") | |
or row.startswith(f"{tab}.leaderboard.") | |
or row.endswith(".alias") | |
or ( | |
not row.startswith(f"{tab}.{task}") | |
if task != "All" | |
else row.startswith(f"{tab}.leaderboard_arc_challenge") # Hide legacy ARC | |
) | |
# Hide MATH fewshot_config.samples: <function list_fewshot_samples at 0x7f34d199ab90> | |
or (row.startswith(f"{tab}.leaderboard_math") and row.endswith("fewshot_config.samples")) | |
# Hide std errors | |
or (hide_std_errors and row.endswith("_stderr,none")) | |
# Hide non-different rows | |
or (show_only_differences and not any_difference[row]) | |
) | |
], | |
axis="index", | |
) | |
# Color metric result cells | |
idx = pd.IndexSlice | |
colored_rows = idx[ | |
[ | |
row | |
for row in df.index | |
if row.endswith("acc,none") or row.endswith("acc_norm,none") or row.endswith("exact_match,none") | |
] | |
] # Apply only on numeric cells, otherwise the background gradient will not work | |
subset = idx[colored_rows, idx[:]] | |
df.background_gradient(cmap="PiYG", vmin=0, vmax=1, subset=subset, axis=None) | |
# Format index values: remove prefix and suffix | |
start = len(f"{tab}.leaderboard_") if task == "All" else len(f"{tab}.{task} ") | |
df.format_index(lambda idx: idx[start:].removesuffix(",none"), axis="index") | |
# Fix overflow | |
df.set_table_styles( | |
[ | |
{ | |
"selector": "td", | |
"props": [("overflow-wrap", "break-word"), ("max-width", "1px")], | |
}, | |
{ | |
"selector": ".col_heading", | |
"props": [("width", f"{100 / len(df.columns)}%")], | |
}, | |
] | |
) | |
return df.to_html() | |
def update_tasks_component(): | |
return ( | |
gr.Radio( | |
["All"] + list(constants.TASKS.values()), | |
label="Tasks", | |
info="Evaluation tasks to be displayed", | |
value="All", | |
visible=True, | |
), | |
) * 2 | |
def clear_results(): | |
# model_ids, dataframe, load_results_btn, load_configs_btn, results_task, configs_task | |
return ( | |
gr.Dropdown(value=[]), | |
None, | |
*(gr.Button("Load", interactive=False),) * 2, | |
*( | |
gr.Radio( | |
["All"] + list(constants.TASKS.values()), | |
label="Tasks", | |
info="Evaluation tasks to be displayed", | |
value="All", | |
visible=False, | |
), | |
) | |
* 2, | |
) | |
def display_loading_message_for_results(): | |
return ("<h3 style='text-align: center;'>Loading...</h3>",) * 2 | |
def plot_results(df, task): | |
if df is not None: | |
df = df[ | |
[ | |
col | |
for col in df.columns | |
if col.startswith("results.") | |
and (col.endswith("acc,none") or col.endswith("acc_norm,none") or col.endswith("exact_match,none")) | |
] | |
] | |
tasks = {key: tupl[0] for key, tupl in constants.TASKS.items()} | |
tasks["leaderboard_math"] = tasks["leaderboard_math_hard"] | |
subtasks = {tupl[1]: tupl[0] for tupl in constants.SUBTASKS.get(task, [])} | |
if task == "All": | |
df = df[[col for col in df.columns if col.split(".")[1] in tasks]] | |
# - IFEval: Calculate average of both strict accuracies | |
ifeval_mean = df[ | |
[ | |
"results.leaderboard_ifeval.inst_level_strict_acc,none", | |
"results.leaderboard_ifeval.prompt_level_strict_acc,none", | |
] | |
].mean(axis=1) | |
df = df.drop(columns=[col for col in df.columns if col.split(".")[1] == "leaderboard_ifeval"]) | |
loc = df.columns.get_loc("results.leaderboard_math_hard.exact_match,none") | |
df.insert(loc - 1, "results.leaderboard_ifeval", ifeval_mean) | |
# Rename | |
df = df.rename(columns=lambda col: tasks[col.split(".")[1]]) | |
else: | |
df = df[[col for col in df.columns if col.startswith(f"results.{task}")]] | |
# - IFEval: Return 4 accuracies | |
if task == "leaderboard_ifeval": | |
df = df.rename(columns=lambda col: col.split(".")[2].removesuffix(",none")) | |
else: | |
df = df.rename(columns=lambda col: tasks.get(col.split(".")[1], subtasks.get(col.split(".")[1]))) | |
fig_1 = px.bar( | |
df.T.rename_axis(columns="Model"), | |
barmode="group", | |
labels={"index": "Benchmark" if task == "All" else "Subtask", "value": "Score"}, | |
color_discrete_sequence=px.colors.qualitative.Safe, # TODO: https://plotly.com/python/discrete-color/ | |
) | |
fig_1.update_yaxes(range=[0, 1]) | |
fig_2 = px.line_polar( | |
df.melt(ignore_index=False, var_name="Benchmark", value_name="Score").reset_index(names="Model"), | |
r="Score", | |
theta="Benchmark", | |
color="Model", | |
line_close=True, | |
range_r=[0, 1], | |
color_discrete_sequence=px.colors.qualitative.Safe, # TODO: https://plotly.com/python/discrete-color/ | |
) | |
# Avoid bug with radar: | |
fig_2.update_layout( | |
title_text="", | |
title_font_size=1, | |
) | |
return fig_1, fig_2 | |
else: | |
return None, None | |
tmpdirname = None | |
def download_results(results): | |
global tmpdirname | |
if results: | |
if tmpdirname: | |
shutil.rmtree(tmpdirname) | |
tmpdirname = tempfile.mkdtemp() | |
path = f"{tmpdirname}/results.html" | |
with open(path, "w") as f: | |
f.write(results) | |
return gr.File(path, visible=True) | |
def clear_results_file(): | |
global tmpdirname | |
if tmpdirname: | |
shutil.rmtree(tmpdirname) | |
tmpdirname = None | |
return gr.File(visible=False) | |