|
import itertools |
|
import os |
|
|
|
import numpy as np |
|
import pandas as pd |
|
from datasets import load_dataset |
|
|
|
import style |
|
from style import LANG_SYMBOLS, T_SYMBOLS |
|
|
|
ZERO_SHOT_ONLY = ["BELEBELE"] |
|
FEW_SHOT_ONLY = ["GSM8K", "TruthfulQA"] |
|
|
|
|
|
def init(): |
|
global repo_id, config_name, split_name, hidden_df, task_group_names_list, task_group_type_dict, task_groups_shots_dict, languages_list, model_type_dict |
|
|
|
repo_id = os.getenv("OGX_LEADERBOARD_DATASET_NAME") |
|
config_name = os.getenv("OGX_LEADERBOARD_DATASET_CONFIG") |
|
split_name = os.getenv("OGX_LEADERBOARD_DATASET_SPLIT") |
|
|
|
dataset = load_dataset(repo_id, config_name, split=split_name) |
|
hidden_df = dataset.to_pandas() |
|
|
|
task_group_names_list = hidden_df["Task_Group"].unique().tolist() |
|
task_group_type_df = hidden_df[["Task_Group", "Task_Type"]].drop_duplicates() |
|
task_group_type_dict = task_group_type_df.set_index("Task_Group")["Task_Type"].to_dict() |
|
task_groups_shots_df = hidden_df[hidden_df["Few_Shot"] == True][["Task_Group", "Number_Shots"]].drop_duplicates() |
|
task_groups_shots_dict = task_groups_shots_df.set_index("Task_Group")["Number_Shots"].to_dict() |
|
languages_list = hidden_df["Language"].drop_duplicates().str.upper().tolist() |
|
model_type_df = hidden_df[["Model_Name", "Model_Type"]].drop_duplicates() |
|
model_type_dict = model_type_df.set_index("Model_Name")["Model_Type"].to_dict() |
|
|
|
hidden_df = hidden_df.pivot_table( |
|
columns=["Task_Group", "Few_Shot", "Language"], |
|
index=["Model_Name"], |
|
values="Value", |
|
dropna=False, |
|
).reset_index(inplace=False) |
|
|
|
hidden_df["Type"] = hidden_df["Model_Name"].apply(lambda x: style.T_SYMBOLS[model_type_dict[x]]) |
|
|
|
|
|
def sort_cols(df: pd.DataFrame, fewshot: bool = False) -> pd.DataFrame: |
|
task_cols = get_task_columns(df) |
|
return df.reindex(["Type", "Model_Name", "Average"] + sorted(task_cols), axis=1) |
|
|
|
|
|
def get_task_columns(df: pd.DataFrame) -> pd.DataFrame: |
|
l = list(df.columns) |
|
l.remove("Model_Name") |
|
l.remove("Average") |
|
l.remove("Type") |
|
return l |
|
|
|
|
|
def get_models(df: pd.DataFrame) -> pd.DataFrame: |
|
return df["Model_Name"].unique() |
|
|
|
|
|
def filter_type(df: pd.DataFrame, model_types: list[str]) -> pd.DataFrame: |
|
"""Keep only rows for which model type is in list of types""" |
|
return df[df["Type"].isin(model_types)] |
|
|
|
|
|
def search_model(df: pd.DataFrame, query: str) -> pd.DataFrame: |
|
"""Keep only rows for which model name matches search query""" |
|
query = query.replace(";", "|") |
|
return df[df["Model_Name"].str.contains(query, case=False)] |
|
|
|
|
|
def aggregate_langs(df: pd.DataFrame, tasks: list, langs: list): |
|
"""Aggregates results over langs for each task in tasks. |
|
If a language does not exist for a task, the aggregate for |
|
that task will be shown as NaN. |
|
""" |
|
|
|
langs_lower = [item.lower() for item in langs] |
|
df.columns = ["_".join(filter(None, col)) for col in df.columns] |
|
colset = set(df.columns) |
|
for t in tasks: |
|
cols = [(f"{a}_{b}") for a, b in itertools.product([t], langs_lower)] |
|
if set(cols).issubset(colset): |
|
df.loc[:, t] = df[cols].mean(axis=1, skipna=False) |
|
else: |
|
df.loc[:, t] = np.nan |
|
df.loc[:, "Average"] = df[tasks].mean(axis=1) |
|
return df[["Type", "Model_Name", "Average"] + tasks] |
|
|
|
|
|
def select_shots(df: pd.DataFrame, fewshot: bool = False): |
|
cols = [col for col in df.columns if col[1] == fewshot] + [] |
|
|
|
cols.append(("Model_Name", "", "")) |
|
cols.append(("Type", "", "")) |
|
return df[cols].droplevel(level=1, axis="columns") |
|
|
|
|
|
def update_df( |
|
tasks: list[str], |
|
model_query: str, |
|
langs: list[str], |
|
model_types: list[str], |
|
fewshot: bool = False, |
|
format: bool = True, |
|
) -> pd.DataFrame: |
|
"""Return a filtered dataframe according to selected models, tasks and |
|
languages. The format flag controls whether the output dataframe should |
|
be formatted to tw significant figures. |
|
""" |
|
|
|
df = select_shots(hidden_df, fewshot) |
|
|
|
|
|
df = aggregate_langs(df, tasks, langs) |
|
df = df.sort_values(by="Average", ascending=False) |
|
|
|
|
|
df = search_model(df, model_query) |
|
df = filter_type(df, model_types) |
|
|
|
if format: |
|
return sort_cols(df, fewshot).style.format(precision=2, decimal=".", na_rep="N/A") |
|
else: |
|
return sort_cols(df, fewshot) |
|
|
|
|
|
def get_selected_task_type(task_type_id): |
|
task_types = {0: "accuracy", 1: "misc"} |
|
selected_task_type = task_types[task_type_id] |
|
return selected_task_type |
|
|
|
|
|
def get_available_task_groups(selected_task_type, fewshot): |
|
task_groups = [task_group_name for task_group_name, task_type in task_group_type_dict.items() if task_type == selected_task_type] |
|
|
|
if fewshot: |
|
available_tasks = [c for c in task_groups if c not in ZERO_SHOT_ONLY] |
|
else: |
|
available_tasks = [c for c in task_groups if c not in FEW_SHOT_ONLY] |
|
|
|
return available_tasks |
|
|
|
|
|
init() |
|
|