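"""Streamlit app that renders sortable model leaderboards from the
`model-index` metadata of Hugging Face model cards. Parsed results are cached
to local pickle files and refreshed in a background thread on later loads."""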
import pandas as pd
import streamlit as st
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.repocard import metadata_load
from ascending_metrics import ascending_metrics
import numpy as np
from st_aggrid import AgGrid, GridOptionsBuilder, JsCode
from os.path import exists
import threading

st.set_page_config(layout="wide")
def get_model_infos():
    api = HfApi()
    model_infos = api.list_models(filter="model-index", cardData=True)
    return model_infos
def parse_metric_value(value):
    if isinstance(value, str):
        value = "".join(value.split("%"))  # strip any "%" signs before parsing
        try:
            value = float(value)
        except:  # noqa: E722
            value = None
    elif isinstance(value, list):
        if len(value) > 0:
            value = value[0]
        else:
            value = None
    value = round(value, 2) if isinstance(value, float) else None
    return value
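# Illustrative behavior (a sketch, not exhaustive):
#   "12.5%"  -> 12.5   (percent sign stripped, then parsed as a float)
#   "0.857"  -> 0.86   (rounded to two decimal places)
#   [0.91]   -> 0.91   (first element of a list is used)
#   "n/a"    -> None   (unparseable strings are dropped)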
def parse_metrics_rows(meta, only_verified=False):
    if not isinstance(meta["model-index"], list) or len(meta["model-index"]) == 0 or "results" not in meta["model-index"][0]:
        return None
    for result in meta["model-index"][0]["results"]:
        if not isinstance(result, dict) or "dataset" not in result or "metrics" not in result or "type" not in result["dataset"]:
            continue
        dataset = result["dataset"]["type"]
        row = {"dataset": dataset, "split": "-unspecified-", "config": "-unspecified-"}
        if "split" in result["dataset"]:
            row["split"] = result["dataset"]["split"]
        if "config" in result["dataset"]:
            row["config"] = result["dataset"]["config"]
        no_results = True
        for metric in result["metrics"]:
            name = metric["type"].lower().strip()
            if name in ("model_id", "dataset", "split", "config"):
                # Metrics are not allowed to be named "model_id", "dataset",
                # "split", or "config", since those are reserved column names.
                continue
            value = parse_metric_value(metric.get("value", None))
            if value is None:
                continue
            if name in row:
                new_metric_better = value < row[name] if name in ascending_metrics else value > row[name]
            if name not in row or new_metric_better:
                # Overwrite the metric if the new value is better.
                if only_verified:
                    if "verified" in metric and metric["verified"]:
                        no_results = False
                        row[name] = value
                else:
                    no_results = False
                    row[name] = value
        if no_results:
            continue
        yield row
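# Sketch of the card metadata this generator consumes (values are
# hypothetical; field names mirror the keys read above):
#
#   model-index:
#   - results:
#     - dataset:
#         type: common_voice
#         config: en
#         split: test
#       metrics:
#       - type: wer
#         value: 12.5
#         verified: true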
def get_data_wrapper():

    def get_data():
        data = []
        verified_data = []
        print("getting model infos")
        model_infos = get_model_infos()
        print("got model infos")
        for model_info in model_infos:
            meta = model_info.cardData
            if meta is None:
                continue
            for row in parse_metrics_rows(meta):
                if row is None:
                    continue
                row["model_id"] = model_info.id
                data.append(row)
            for row in parse_metrics_rows(meta, only_verified=True):
                if row is None:
                    continue
                row["model_id"] = model_info.id
                verified_data.append(row)
        dataframe = pd.DataFrame.from_records(data)
        dataframe.to_pickle("cache.pkl")
        verified_dataframe = pd.DataFrame.from_records(verified_data)
        verified_dataframe.to_pickle("verified_cache.pkl")

    if exists("cache.pkl") and exists("verified_cache.pkl"):
        # If we have saved results from a previous run, spawn a background
        # thread to fetch fresh results and update the saved files. Don't make
        # users wait while we fetch the new results; display the old results
        # for now. The new results should be loaded the next time this
        # function is called.
        dataframe = pd.read_pickle("cache.pkl")
        verified_dataframe = pd.read_pickle("verified_cache.pkl")
        t = threading.Thread(name="get_data procs", target=get_data)
        t.start()
    else:
        # We have to make users wait during the first startup of this app.
        get_data()
        dataframe = pd.read_pickle("cache.pkl")
        verified_dataframe = pd.read_pickle("verified_cache.pkl")
    return dataframe, verified_dataframe
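# Note: Streamlit re-executes this whole script on every user interaction, so
# get_data_wrapper() runs on each rerun; once the pickle files exist it returns
# the cached results immediately and refreshes them in the background.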
dataframe, verified_dataframe = get_data_wrapper()

st.markdown("# 🤗 Leaderboards")

only_verified_results = st.sidebar.checkbox(
    "Filter for Verified Results",
)

selectable_datasets = sorted(list(set(dataframe.dataset.tolist())), key=lambda name: name.lower())

if only_verified_results:
    dataframe = verified_dataframe

query_params = st.experimental_get_query_params()
if "first_query_params" not in st.session_state:
    st.session_state.first_query_params = query_params
first_query_params = st.session_state.first_query_params
default_dataset = "common_voice"
if "dataset" in first_query_params:
    if len(first_query_params["dataset"]) > 0 and first_query_params["dataset"][0] in selectable_datasets:
        default_dataset = first_query_params["dataset"][0]
# Fall back to the first dataset if the default is not in the list.
default_index = selectable_datasets.index(default_dataset) if default_dataset in selectable_datasets else 0

dataset = st.sidebar.selectbox(
    "Dataset",
    selectable_datasets,
    index=default_index,
)
st.experimental_set_query_params(**{"dataset": [dataset]})

dataset_df = dataframe[dataframe.dataset == dataset]
dataset_df = dataset_df.dropna(axis="columns", how="all")
if len(dataset_df) > 0:
    selectable_configs = list(set(dataset_df["config"]))
    config = st.sidebar.selectbox(
        "Config",
        selectable_configs,
    )
    dataset_df = dataset_df[dataset_df.config == config]

    selectable_splits = list(set(dataset_df["split"]))
    split = st.sidebar.selectbox(
        "Split",
        selectable_splits,
    )
    dataset_df = dataset_df[dataset_df.split == split]

    selectable_metrics = list(filter(lambda column: column not in ("model_id", "dataset", "split", "config"), dataset_df.columns))
    dataset_df = dataset_df.filter(["model_id"] + selectable_metrics)
    dataset_df = dataset_df.dropna(thresh=2)  # Want at least two non-NaN values (one for model_id and one for a metric).

    sorting_metric = st.sidebar.radio(
        "Sorting Metric",
        selectable_metrics,
    )

    st.markdown(
        "Please click on the model's name to be redirected to its model card."
    )

    st.markdown(
        "Want to beat the leaderboard? Don't see your model here? Simply request an automatic evaluation [here](https://huggingface.co/spaces/autoevaluate/model-evaluator)."
    )

    # Make the sorting metric appear right after the model names.
    cols = dataset_df.columns.tolist()
    cols.remove(sorting_metric)
    cols = cols[:1] + [sorting_metric] + cols[1:]
    dataset_df = dataset_df[cols]

    # Sort the leaderboard, giving the sorting metric highest priority and then
    # ordering by the other metrics in the case of equal values.
    dataset_df = dataset_df.sort_values(by=cols[1:], ascending=[metric in ascending_metrics for metric in cols[1:]])
    dataset_df = dataset_df.replace(np.nan, "-")

    # Make the leaderboard.
    gb = GridOptionsBuilder.from_dataframe(dataset_df)
    gb.configure_default_column(sortable=False)
    gb.configure_column(
        "model_id",
        cellRenderer=JsCode('''function(params) {return '<a target="_blank" href="https://huggingface.co/'+params.value+'">'+params.value+'</a>'}'''),
    )
    for name in selectable_metrics:
        gb.configure_column(name, type=["numericColumn", "numberColumnFilter", "customNumericFormat"], precision=2, aggFunc="sum")
    gb.configure_column(
        sorting_metric,
        sortable=True,
        cellStyle=JsCode('''function(params) { return {'backgroundColor': '#FFD21E'} }'''),
    )
    go = gb.build()
    fit_columns = len(dataset_df.columns) < 10
    AgGrid(dataset_df, gridOptions=go, height=28 * len(dataset_df) + (35 if fit_columns else 41), allow_unsafe_jscode=True, fit_columns_on_grid_load=fit_columns)
else:
    st.markdown(
        "No data to display."
    )
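# To run locally (a sketch; assumes this file is saved as app.py and the local
# ascending_metrics module sits alongside it):
#   pip install streamlit pandas numpy huggingface_hub streamlit-aggrid
#   streamlit run app.py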