import asyncio
import shutil
import tempfile
from collections import defaultdict

import gradio as gr
import pandas as pd
import plotly.express as px

import src.constants as constants
from src.hub import glob, load_json_file


# Suffixes of the metric rows/columns that get colored in tables and plotted.
_METRIC_SUFFIXES = ("acc,none", "acc_norm,none", "exact_match,none")


def load_result_paths_per_model():
    """Fetch all result file paths and group them per model."""
    return sort_result_paths_per_model(fetch_result_paths())


def fetch_result_paths():
    """Return the JSON result paths found under the results dataset."""
    path = f"{constants.RESULTS_DATASET_ID}/**/**/*.json"
    return glob(path)


def sort_result_paths_per_model(paths):
    """Group result paths by model ID.

    The model ID is the part of each path between the results dataset prefix
    (plus one separator character) and the final ``/``.

    Args:
        paths: Iterable of path strings starting with the results dataset ID.

    Returns:
        Dict mapping each model ID to the sorted list of its paths.
    """
    prefix_len = len(constants.RESULTS_DATASET_ID) + 1
    grouped = defaultdict(list)
    for path in paths:
        model_id, _ = path[prefix_len:].rsplit("/", 1)
        grouped[model_id].append(path)
    return {model_id: sorted(model_paths) for model_id, model_paths in grouped.items()}


async def load_results_dataframe(model_id, result_paths_per_model=None):
    """Load all result files of one model into a single-row DataFrame.

    The "results" and "configs" sections of every loaded file are merged
    (later files override earlier ones) and flattened with
    ``pd.json_normalize``. The row is indexed by the ``model_name`` of the
    last loaded file, falling back to "Model".

    Args:
        model_id: Key into ``result_paths_per_model``.
        result_paths_per_model: Mapping of model ID to list of result paths.

    Returns:
        A one-row DataFrame, or None when there is nothing to load (no model
        ID, no mapping, unknown model ID, or no non-empty result file).
    """
    if not model_id or not result_paths_per_model:
        return None
    try:
        result_paths = result_paths_per_model[model_id]
    except KeyError:
        # An unknown model should not abort the loading of the other models.
        return None
    loaded = await asyncio.gather(*[load_json_file(path) for path in result_paths])
    results = [result for result in loaded if result]
    if not results:
        return None
    data = {"results": {}, "configs": {}}
    for result in results:
        # Tolerate files lacking one of the sections.
        data["results"].update(result.get("results") or {})
        data["configs"].update(result.get("configs") or {})
    model_name = results[-1].get("model_name", "Model")
    df = pd.json_normalize([data])
    return df.set_index(pd.Index([model_name]))


async def load_results(result_paths_per_model, *model_ids_lists):
    """Load the results of several models concurrently.

    Args:
        result_paths_per_model: Mapping of model ID to list of result paths.
        *model_ids_lists: Lists of model IDs; falsy lists are skipped.

    Returns:
        A 2-tuple whose first item is the concatenated DataFrame (one row per
        loaded model) or None when nothing was loaded, and whose second item
        is always None.
    """
    dfs = await asyncio.gather(
        *[
            load_results_dataframe(model_id, result_paths_per_model)
            for model_ids in model_ids_lists
            if model_ids
            for model_id in model_ids
        ]
    )
    dfs = [df for df in dfs if df is not None]
    if not dfs:
        return None, None
    return pd.concat(dfs), None


def display_results(df, task, hide_std_errors, show_only_differences):
    """Render the "results" and "configs" tabs as HTML.

    Args:
        df: DataFrame with one row per model, or None.
        task: Task identifier, or "All".
        hide_std_errors: Whether to hide standard-error rows in the results tab.
        show_only_differences: Whether the configs tab only shows rows that
            differ between models.

    Returns:
        Tuple ``(results_html, configs_html)``, or ``(None, None)`` if ``df``
        is None.
    """
    if df is None:
        return None, None
    # One column per model, one row per flattened key.
    df = df.T.rename_axis(columns=None)
    return (
        display_tab("results", df, task, hide_std_errors=hide_std_errors),
        display_tab("configs", df, task, show_only_differences=show_only_differences),
    )


def _is_hidden_row(row, tab, task, hide_std_errors, any_difference):
    """Tell whether an index label must be hidden from the given tab.

    ``any_difference`` is None when difference filtering is disabled;
    otherwise it is a boolean Series indexed by row label.
    """
    # Keep only rows of this tab; drop the aggregated "leaderboard" entry and aliases.
    if not row.startswith(f"{tab}.") or row.startswith(f"{tab}.leaderboard.") or row.endswith(".alias"):
        return True
    if task != "All":
        if not row.startswith(f"{tab}.{task}"):
            return True
    elif row.startswith(f"{tab}.leaderboard_arc_challenge"):
        # Hide legacy ARC
        return True
    # Hide MATH fewshot_config.samples entries
    if row.startswith(f"{tab}.leaderboard_math") and row.endswith("fewshot_config.samples"):
        return True
    # Hide std errors
    if hide_std_errors and row.endswith("_stderr,none"):
        return True
    # Hide non-different rows
    return any_difference is not None and not any_difference[row]


def display_tab(tab, df, task, hide_std_errors=True, show_only_differences=False):
    """Render one tab ("results" or "configs") of the comparison as HTML.

    Args:
        tab: Prefix of the rows to display ("results" or "configs").
        df: DataFrame with one row per flattened key and one column per model.
        task: Task identifier, or "All".
        hide_std_errors: Hide rows ending with "_stderr,none".
        show_only_differences: Hide rows whose values all equal the first column.

    Returns:
        HTML string of the styled table.
    """
    any_difference = None
    if show_only_differences:
        # NOTE: NaN compares unequal to itself, so rows with missing values
        # are reported as different.
        any_difference = df.ne(df.iloc[:, 0], axis=0).any(axis=1)
    row_labels = df.index
    n_columns = len(df.columns)
    styler = df.style.format(escape="html", na_rep="")
    # Hide rows
    styler.hide(
        [row for row in row_labels if _is_hidden_row(row, tab, task, hide_std_errors, any_difference)],
        axis="index",
    )
    # Color metric result cells.
    # Apply only on numeric cells, otherwise the background gradient will not work
    metric_rows = [row for row in row_labels if row.endswith(_METRIC_SUFFIXES)]
    styler.background_gradient(
        cmap="PiYG", vmin=0, vmax=1, subset=pd.IndexSlice[metric_rows, :], axis=None
    )
    # Format index values: remove prefix and suffix.
    # For a single task, also skip the one character following the task name.
    start = len(f"{tab}.leaderboard_") if task == "All" else len(f"{tab}.{task}") + 1
    styler.format_index(lambda label: label[start:].removesuffix(",none"), axis="index")
    # Fix overflow
    styler.set_table_styles(
        [
            {
                "selector": "td",
                "props": [("overflow-wrap", "break-word"), ("max-width", "1px")],
            },
            {
                "selector": ".col_heading",
                "props": [("width", f"{100 / n_columns}%")],
            },
        ]
    )
    return styler.to_html()


def _tasks_radio(visible):
    """Build the "Tasks" radio component with the given visibility."""
    return gr.Radio(
        ["All"] + list(constants.TASKS.values()),
        label="Tasks",
        info="Evaluation tasks to be displayed",
        value="All",
        visible=visible,
    )


def update_tasks_component():
    """Return the visible "Tasks" radio twice (same instance in both slots)."""
    return (_tasks_radio(visible=True),) * 2


def clear_results():
    """Return the component updates that reset the results view.

    The returned tuple follows this order:
    model_ids, dataframe, load_results_btn, load_configs_btn, results_task, configs_task
    """
    return (
        gr.Dropdown(value=[]),
        None,
        *(gr.Button("Load", interactive=False),) * 2,
        *(_tasks_radio(visible=False),) * 2,
    )


def display_loading_message_for_results():
    """Return the loading placeholder twice."""
    # NOTE(review): in the source this literal spanned several physical lines,
    # which is invalid for a single-quoted string; the line breaks are kept as
    # explicit escapes. Confirm that no markup was lost from the message.
    return ("\n\nLoading...\n\n",) * 2


def plot_results(df, task):
    """Build the bar chart and the radar chart of the metric results.

    Args:
        df: DataFrame with one row per model and flattened "results.*"
            columns, or None.
        task: Task identifier, or "All".

    Returns:
        Tuple ``(bar_figure, radar_figure)``, or ``(None, None)`` if ``df``
        is None.

    Raises:
        KeyError: For ``task == "All"``, if the two IFEval strict-accuracy
            columns or the MATH exact-match column are missing.
    """
    if df is None:
        return None, None
    df = df[[col for col in df.columns if col.startswith("results.") and col.endswith(_METRIC_SUFFIXES)]]
    tasks = {key: tupl[0] for key, tupl in constants.TASKS.items()}
    tasks["leaderboard_math"] = tasks["leaderboard_math_hard"]
    subtasks = {tupl[1]: tupl[0] for tupl in constants.SUBTASKS.get(task, [])}
    if task == "All":
        df = df[[col for col in df.columns if col.split(".")[1] in tasks]]
        # - IFEval: Calculate average of both strict accuracies
        ifeval_mean = df[
            [
                "results.leaderboard_ifeval.inst_level_strict_acc,none",
                "results.leaderboard_ifeval.prompt_level_strict_acc,none",
            ]
        ].mean(axis=1)
        df = df.drop(columns=[col for col in df.columns if col.split(".")[1] == "leaderboard_ifeval"])
        loc = df.columns.get_loc("results.leaderboard_math_hard.exact_match,none")
        # DataFrame.insert rejects negative positions: clamp to the first column.
        df.insert(max(loc - 1, 0), "results.leaderboard_ifeval", ifeval_mean)
        # Rename
        df = df.rename(columns=lambda col: tasks[col.split(".")[1]])
    else:
        df = df[[col for col in df.columns if col.startswith(f"results.{task}")]]
        # - IFEval: Return 4 accuracies
        if task == "leaderboard_ifeval":
            df = df.rename(columns=lambda col: col.split(".")[2].removesuffix(",none"))
        else:
            df = df.rename(columns=lambda col: tasks.get(col.split(".")[1], subtasks.get(col.split(".")[1])))
    fig_1 = px.bar(
        df.T.rename_axis(columns="Model"),
        barmode="group",
        labels={"index": "Benchmark" if task == "All" else "Subtask", "value": "Score"},
        color_discrete_sequence=px.colors.qualitative.Safe,  # TODO: https://plotly.com/python/discrete-color/
    )
    fig_1.update_yaxes(range=[0, 1])
    fig_2 = px.line_polar(
        df.melt(ignore_index=False, var_name="Benchmark", value_name="Score").reset_index(names="Model"),
        r="Score",
        theta="Benchmark",
        color="Model",
        line_close=True,
        range_r=[0, 1],
        color_discrete_sequence=px.colors.qualitative.Safe,  # TODO: https://plotly.com/python/discrete-color/
    )
    # Avoid bug with radar:
    fig_2.update_layout(
        title_text="",
        title_font_size=1,
    )
    return fig_1, fig_2


# Temporary directory holding the last exported results file, if any.
tmpdirname = None


def download_results(results):
    """Write the results HTML to a fresh temporary file and expose it.

    Any directory created by a previous call is removed first.

    Args:
        results: HTML string to export; nothing is done when it is falsy.

    Returns:
        A visible ``gr.File`` pointing at the written file, or None when
        ``results`` is falsy.
    """
    global tmpdirname
    if not results:
        return None
    if tmpdirname:
        # The directory may already have been removed externally.
        shutil.rmtree(tmpdirname, ignore_errors=True)
    tmpdirname = tempfile.mkdtemp()
    path = f"{tmpdirname}/results.html"
    # Explicit encoding: the HTML may contain non-ASCII text.
    with open(path, "w", encoding="utf-8") as f:
        f.write(results)
    return gr.File(path, visible=True)


def clear_results_file():
    """Remove the exported results directory, if any, and hide the file component."""
    global tmpdirname
    if tmpdirname:
        shutil.rmtree(tmpdirname, ignore_errors=True)
        tmpdirname = None
    return gr.File(visible=False)