Alex Jude
New leaderboard design (#19)
6f17dc5
import itertools
import os
import numpy as np
import pandas as pd
from datasets import load_dataset
import style
from style import LANG_SYMBOLS, T_SYMBOLS
ZERO_SHOT_ONLY = ["BELEBELE"]
FEW_SHOT_ONLY = ["GSM8K", "TruthfulQA"]
def init():
global repo_id, config_name, split_name, hidden_df, task_group_names_list, task_group_type_dict, task_groups_shots_dict, languages_list, model_type_dict
repo_id = os.getenv("OGX_LEADERBOARD_DATASET_NAME")
config_name = os.getenv("OGX_LEADERBOARD_DATASET_CONFIG")
split_name = os.getenv("OGX_LEADERBOARD_DATASET_SPLIT")
dataset = load_dataset(repo_id, config_name, split=split_name)
hidden_df = dataset.to_pandas()
task_group_names_list = hidden_df["Task_Group"].unique().tolist()
task_group_type_df = hidden_df[["Task_Group", "Task_Type"]].drop_duplicates()
task_group_type_dict = task_group_type_df.set_index("Task_Group")["Task_Type"].to_dict()
task_groups_shots_df = hidden_df[hidden_df["Few_Shot"] == True][["Task_Group", "Number_Shots"]].drop_duplicates()
task_groups_shots_dict = task_groups_shots_df.set_index("Task_Group")["Number_Shots"].to_dict()
languages_list = hidden_df["Language"].drop_duplicates().str.upper().tolist()
model_type_df = hidden_df[["Model_Name", "Model_Type"]].drop_duplicates()
model_type_dict = model_type_df.set_index("Model_Name")["Model_Type"].to_dict()
hidden_df = hidden_df.pivot_table(
columns=["Task_Group", "Few_Shot", "Language"],
index=["Model_Name"],
values="Value",
dropna=False,
).reset_index(inplace=False)
hidden_df["Type"] = hidden_df["Model_Name"].apply(lambda x: style.T_SYMBOLS[model_type_dict[x]])
def sort_cols(df: pd.DataFrame, fewshot: bool = False) -> pd.DataFrame:
task_cols = get_task_columns(df)
return df.reindex(["Type", "Model_Name", "Average"] + sorted(task_cols), axis=1)
def get_task_columns(df: pd.DataFrame) -> pd.DataFrame:
l = list(df.columns)
l.remove("Model_Name")
l.remove("Average")
l.remove("Type")
return l
def get_models(df: pd.DataFrame) -> pd.DataFrame:
return df["Model_Name"].unique()
def filter_type(df: pd.DataFrame, model_types: list[str]) -> pd.DataFrame:
"""Keep only rows for which model type is in list of types"""
return df[df["Type"].isin(model_types)]
def search_model(df: pd.DataFrame, query: str) -> pd.DataFrame:
"""Keep only rows for which model name matches search query"""
query = query.replace(";", "|")
return df[df["Model_Name"].str.contains(query, case=False)]
def aggregate_langs(df: pd.DataFrame, tasks: list, langs: list):
"""Aggregates results over langs for each task in tasks.
If a language does not exist for a task, the aggregate for
that task will be shown as NaN.
"""
langs_lower = [item.lower() for item in langs]
df.columns = ["_".join(filter(None, col)) for col in df.columns]
colset = set(df.columns)
for t in tasks:
cols = [(f"{a}_{b}") for a, b in itertools.product([t], langs_lower)]
if set(cols).issubset(colset):
df.loc[:, t] = df[cols].mean(axis=1, skipna=False)
else:
df.loc[:, t] = np.nan
df.loc[:, "Average"] = df[tasks].mean(axis=1)
return df[["Type", "Model_Name", "Average"] + tasks]
def select_shots(df: pd.DataFrame, fewshot: bool = False):
cols = [col for col in df.columns if col[1] == fewshot] + []
# Move model name and type icon to the end
cols.append(("Model_Name", "", ""))
cols.append(("Type", "", ""))
return df[cols].droplevel(level=1, axis="columns")
def update_df(
tasks: list[str],
model_query: str,
langs: list[str],
model_types: list[str],
fewshot: bool = False,
format: bool = True,
) -> pd.DataFrame:
"""Return a filtered dataframe according to selected models, tasks and
languages. The format flag controls whether the output dataframe should
be formatted to tw significant figures.
"""
# keep only selected shots
df = select_shots(hidden_df, fewshot)
# aggregate results over languages per task
df = aggregate_langs(df, tasks, langs)
df = df.sort_values(by="Average", ascending=False)
# filter models by search bar and model type
df = search_model(df, model_query)
df = filter_type(df, model_types)
if format:
return sort_cols(df, fewshot).style.format(precision=2, decimal=".", na_rep="N/A")
else:
return sort_cols(df, fewshot)
def get_selected_task_type(task_type_id):
task_types = {0: "accuracy", 1: "misc"}
selected_task_type = task_types[task_type_id]
return selected_task_type
def get_available_task_groups(selected_task_type, fewshot):
task_groups = [task_group_name for task_group_name, task_type in task_group_type_dict.items() if task_type == selected_task_type]
if fewshot:
available_tasks = [c for c in task_groups if c not in ZERO_SHOT_ONLY]
else:
available_tasks = [c for c in task_groups if c not in FEW_SHOT_ONLY]
return available_tasks
init()