import logging
import datasets
import gradio as gr
import pandas as pd
import datetime
from fetch_utils import (check_dataset_and_get_config,
check_dataset_and_get_split)
import leaderboard
logger = logging.getLogger(__name__)
global update_time
update_time = datetime.datetime.fromtimestamp(0)
def get_records_from_dataset_repo(dataset_id):
dataset_config = check_dataset_and_get_config(dataset_id)
logger.info(f"Dataset {dataset_id} has configs {dataset_config}")
dataset_split = check_dataset_and_get_split(dataset_id, dataset_config[0])
logger.info(f"Dataset {dataset_id} has splits {dataset_split}")
try:
ds = datasets.load_dataset(dataset_id, dataset_config[0], split=dataset_split[0])
df = ds.to_pandas()
return df
except Exception as e:
logger.warning(
f"Failed to load dataset {dataset_id} with config {dataset_config}: {e}"
)
return pd.DataFrame()
def get_model_ids(ds):
logging.info(f"Dataset {ds} column names: {ds['model_id']}")
models = ds["model_id"].tolist()
# return unique elements in the list model_ids
model_ids = list(set(models))
model_ids.insert(0, "Any")
return model_ids
def get_dataset_ids(ds):
logging.info(f"Dataset {ds} column names: {ds['dataset_id']}")
datasets = ds["dataset_id"].tolist()
dataset_ids = list(set(datasets))
dataset_ids.insert(0, "Any")
return dataset_ids
def get_types(ds):
# set types for each column
types = [str(t) for t in ds.dtypes.to_list()]
types = [t.replace("object", "markdown") for t in types]
types = [t.replace("float64", "number") for t in types]
types = [t.replace("int64", "number") for t in types]
return types
def get_display_df(df):
# style all elements in the model_id column
display_df = df.copy()
columns = display_df.columns.tolist()
if "model_id" in columns:
display_df["model_id"] = display_df["model_id"].apply(
lambda x: f'🔗{x}'
)
# style all elements in the dataset_id column
if "dataset_id" in columns:
display_df["dataset_id"] = display_df["dataset_id"].apply(
lambda x: f'🔗{x}'
)
# style all elements in the report_link column
if "report_link" in columns:
display_df["report_link"] = display_df["report_link"].apply(
lambda x: f'🔗{x}'
)
return display_df
def get_demo(leaderboard_tab):
global update_time
update_time = datetime.datetime.now()
logger.info("Loading leaderboard records")
leaderboard.records = get_records_from_dataset_repo(leaderboard.LEADERBOARD)
records = leaderboard.records
model_ids = get_model_ids(records)
dataset_ids = get_dataset_ids(records)
column_names = records.columns.tolist()
issue_columns = column_names[:11]
info_columns = column_names[15:]
default_columns = ["dataset_id", "total_issues", "report_link"]
default_df = records[default_columns] # extract columns selected
types = get_types(default_df)
display_df = get_display_df(default_df) # the styled dataframe to display
with gr.Row():
with gr.Column():
issue_columns_select = gr.CheckboxGroup(
label="Issue Columns",
choices=issue_columns,
value=[],
interactive=True,
)
with gr.Column():
info_columns_select = gr.CheckboxGroup(
label="Info Columns",
choices=info_columns,
value=default_columns,
interactive=True,
)
with gr.Row():
task_select = gr.Dropdown(
label="Task",
choices=["text_classification", "tabular"],
value="text_classification",
interactive=True,
)
model_select = gr.Dropdown(
label="Model id", choices=model_ids, value=model_ids[0], interactive=True
)
dataset_select = gr.Dropdown(
label="Dataset id",
choices=dataset_ids,
value=dataset_ids[0],
interactive=True,
)
with gr.Row():
leaderboard_df = gr.DataFrame(display_df, datatype=types, interactive=False)
def update_leaderboard_records(model_id, dataset_id, issue_columns, info_columns, task):
global update_time
if datetime.datetime.now() - update_time < datetime.timedelta(minutes=10):
return gr.update()
update_time = datetime.datetime.now()
logger.info("Updating leaderboard records")
leaderboard.records = get_records_from_dataset_repo(leaderboard.LEADERBOARD)
return filter_table(model_id, dataset_id, issue_columns, info_columns, task)
leaderboard_tab.select(
fn=update_leaderboard_records,
inputs=[model_select, dataset_select, issue_columns_select, info_columns_select, task_select],
outputs=[leaderboard_df])
@gr.on(
triggers=[
model_select.change,
dataset_select.change,
issue_columns_select.change,
info_columns_select.change,
task_select.change,
],
inputs=[model_select, dataset_select, issue_columns_select, info_columns_select, task_select],
outputs=[leaderboard_df],
)
def filter_table(model_id, dataset_id, issue_columns, info_columns, task):
logger.info("Filtering leaderboard records")
records = leaderboard.records
# filter the table based on task
df = records[(records["task"] == task)]
# filter the table based on the model_id and dataset_id
if model_id and model_id != "Any":
df = df[(df["model_id"] == model_id)]
if dataset_id and dataset_id != "Any":
df = df[(df["dataset_id"] == dataset_id)]
# filter the table based on the columns
issue_columns.sort()
df = df[["model_id"] + info_columns + issue_columns]
types = get_types(df)
display_df = get_display_df(df)
return gr.update(value=display_df, datatype=types, interactive=False)