Spaces:
Runtime error
Runtime error
import pandas as pd | |
from tqdm.auto import tqdm | |
import streamlit as st | |
from huggingface_hub import HfApi, hf_hub_download | |
from huggingface_hub.repocard import metadata_load | |
from ascending_metrics import ascending_metrics | |
import numpy as np | |
from st_aggrid import AgGrid, GridOptionsBuilder, JsCode | |
from os.path import exists | |
import threading | |
def get_model_ids(author=None):
    """Return the ids of Hub models listed with the ``model-index`` filter.

    Args:
        author: Optional author/organisation name. When given, only models
            belonging to that author are listed; when ``None`` every
            matching model is listed.

    Returns:
        A list with the ``modelId`` of each listed model.
    """
    api = HfApi()
    if author is None:
        models = api.list_models(filter="model-index")
    else:
        # Forward the caller's author; it used to be hard-coded to
        # "autoevaluate", which silently ignored the argument.
        models = api.list_models(filter="model-index", author=author)
    return [model.modelId for model in models]
def get_metadata(model_id):
    """Fetch ``README.md`` for *model_id* and return its loaded metadata.

    Any exception raised while downloading or loading the file is swallowed
    and ``None`` is returned instead, so callers can simply skip the model.
    """
    card_metadata = None
    try:
        card_metadata = metadata_load(
            hf_hub_download(model_id, filename="README.md")
        )
    except Exception:
        # README.md is missing (404) or could not be loaded: treat the
        # model as having no metadata.
        pass
    return card_metadata
def parse_metric_value(value):
    """Normalise a raw metric value to a float rounded to two decimals.

    Args:
        value: The raw value. Strings are parsed as floats after removing
            any ``%`` characters; for lists the first element is used.

    Returns:
        The value rounded to 2 decimal places when it ends up being a
        ``float``; ``None`` otherwise (unparsable string, empty list, or any
        non-float value such as an ``int`` or ``None``).
    """
    if isinstance(value, str):
        # Strip percent signs before parsing. The stripped string used to be
        # computed and thrown away, so values like "95.5%" never parsed.
        try:
            value = float(value.replace("%", ""))
        except ValueError:
            value = None
    elif isinstance(value, list):
        value = value[0] if value else None
    return round(value, 2) if isinstance(value, float) else None
def parse_metrics_rows(meta, only_verified=False):
    """Yield one leaderboard row per usable result in a model card's metadata.

    Only the first entry of ``meta["model-index"]`` is inspected. Each
    yielded row is a dict with the keys ``"dataset"``, ``"split"`` and
    ``"config"`` (the latter two default to ``"-unspecified-"``) plus one key
    per metric name (lower-cased, stripped) mapped to its parsed value.

    Malformed metadata (missing ``model-index``, results or metrics that are
    not of the expected container type, metrics without a string ``type``) is
    skipped instead of raising, so a single bad model card cannot abort the
    whole fetch.

    Args:
        meta: Metadata mapping loaded from a model card.
        only_verified: When true, only metrics whose ``"verified"`` entry is
            truthy are recorded.

    Yields:
        Row dicts; results that end up with no recorded metric are omitted.
    """
    reserved_names = ("model_id", "dataset", "split", "config")

    model_index = meta.get("model-index") if isinstance(meta, dict) else None
    if not isinstance(model_index, list) or not model_index:
        return
    first_entry = model_index[0]
    if not isinstance(first_entry, dict) or "results" not in first_entry:
        return
    results = first_entry["results"]
    if not isinstance(results, list):
        return

    for result in results:
        if not isinstance(result, dict):
            continue
        dataset_info = result.get("dataset")
        metrics = result.get("metrics")
        if (
            not isinstance(dataset_info, dict)
            or "type" not in dataset_info
            or not isinstance(metrics, list)
        ):
            continue

        row = {
            "dataset": dataset_info["type"],
            "split": dataset_info.get("split", "-unspecified-"),
            "config": dataset_info.get("config", "-unspecified-"),
        }
        has_results = False
        for metric in metrics:
            if not isinstance(metric, dict):
                continue
            metric_type = metric.get("type")
            if not isinstance(metric_type, str):
                continue
            name = metric_type.lower().strip()
            if name in reserved_names:
                # Metrics are not allowed to be named like the bookkeeping
                # columns, otherwise they would overwrite them.
                continue
            value = parse_metric_value(metric.get("value", None))
            if value is None:
                continue
            if only_verified and not metric.get("verified"):
                continue
            if name in row:
                # Same metric reported twice: keep the better value. For
                # names in ascending_metrics a smaller value is better.
                if name in ascending_metrics:
                    new_metric_better = value < row[name]
                else:
                    new_metric_better = value > row[name]
                if not new_metric_better:
                    continue
            row[name] = value
            has_results = True

        if has_results:
            yield row
def get_data_wrapper():
    """Return ``(dataframe, verified_dataframe)`` backed by on-disk caches.

    The results are cached in ``cache.pkl`` and ``verified_cache.pkl``. When
    both files exist they are returned immediately and a background thread
    is started to rebuild them, so users see the previous results now and
    the refreshed ones on a later call. On the first start (no cache yet) the
    fetch runs synchronously and the caller has to wait for it.

    Returns:
        A tuple of two DataFrames: all parsed rows, and the rows built from
        verified metrics only.
    """
    # Local import: only ``os.path.exists`` is imported at module level.
    import os

    def save_atomically(frame, path):
        # Write to a per-thread temporary file and rename it into place, so
        # a concurrent reader never sees a half-written pickle.
        tmp_path = f"{path}.{threading.get_ident()}.tmp"
        frame.to_pickle(tmp_path)
        os.replace(tmp_path, path)

    def get_data():
        # Fetch metadata for the first 100 listed models and rebuild both
        # cache files from it.
        data = []
        verified_data = []
        model_ids = get_model_ids()[:100]
        for model_id in tqdm(model_ids):
            meta = get_metadata(model_id)
            if meta is None:
                continue
            for row in parse_metrics_rows(meta):
                row["model_id"] = model_id
                data.append(row)
            for row in parse_metrics_rows(meta, only_verified=True):
                row["model_id"] = model_id
                verified_data.append(row)
        save_atomically(pd.DataFrame.from_records(data), "cache.pkl")
        save_atomically(
            pd.DataFrame.from_records(verified_data), "verified_cache.pkl"
        )

    if exists("cache.pkl") and exists("verified_cache.pkl"):
        # If we have saved the results previously, call an asynchronous
        # process to fetch the results and update the saved file. Don't make
        # users wait while we fetch the new results. Instead, display the old
        # results for now. The new results should be loaded when this method
        # is called again.
        # NOTE: the pickles are only safe to load because this app wrote
        # them itself; never point these paths at untrusted files.
        dataframe = pd.read_pickle("cache.pkl")
        verified_dataframe = pd.read_pickle("verified_cache.pkl")
        t = threading.Thread(name='get_data procs', target=get_data)
        t.start()
    else:
        # We have to make the users wait during the first startup of this app.
        get_data()
        dataframe = pd.read_pickle("cache.pkl")
        verified_dataframe = pd.read_pickle("verified_cache.pkl")
    return dataframe, verified_dataframe
# ---------------------------------------------------------------------------
# Streamlit page: load the cached results, let the user pick a dataset /
# config / split / sorting metric in the sidebar, and render the leaderboard.
# ---------------------------------------------------------------------------
dataframe, verified_dataframe = get_data_wrapper()

# The emoji in the title was mis-encoded ("π€"); restored to the intended one.
st.markdown("# 🤗 Leaderboards")

only_verified_results = st.sidebar.checkbox(
    "Filter for Verified Results",
)

if only_verified_results:
    dataframe = verified_dataframe

# A frame built from zero rows has no "dataset" column at all; stop early
# with a message instead of failing on the column access below.
if "dataset" not in dataframe.columns:
    st.warning("No evaluation results are available yet.")
    st.stop()

# Sort the choices so the sidebar order (and the default selection) does not
# depend on set iteration order. key=str keeps mixed-type values sortable.
selectable_datasets = sorted(set(dataframe["dataset"].tolist()), key=str)

query_params = st.experimental_get_query_params()
default_dataset = "common_voice"
if "dataset" in query_params:
    if len(query_params["dataset"]) > 0 and query_params["dataset"][0] in selectable_datasets:
        default_dataset = query_params["dataset"][0]

# The default dataset is not guaranteed to be among the fetched results;
# fall back to the first entry rather than raising ValueError from .index().
if default_dataset in selectable_datasets:
    default_index = selectable_datasets.index(default_dataset)
else:
    default_index = 0

dataset = st.sidebar.selectbox(
    "Dataset",
    selectable_datasets,
    index=default_index,
)

# Mirror the selection in the URL so the page can be shared/bookmarked.
st.experimental_set_query_params(**{"dataset": [dataset]})

dataset_df = dataframe[dataframe["dataset"] == dataset]
# Drop metric columns that no model reports for this dataset.
dataset_df = dataset_df.dropna(axis="columns", how="all")

selectable_configs = sorted(set(dataset_df["config"]), key=str)
config = st.sidebar.selectbox(
    "Config",
    selectable_configs,
)
dataset_df = dataset_df[dataset_df["config"] == config]

selectable_splits = sorted(set(dataset_df["split"]), key=str)
split = st.sidebar.selectbox(
    "Split",
    selectable_splits,
)
dataset_df = dataset_df[dataset_df["split"] == split]

# Every remaining column that is not a bookkeeping column is a metric.
selectable_metrics = [
    column
    for column in dataset_df.columns
    if column not in ("model_id", "dataset", "split", "config")
]

dataset_df = dataset_df.filter(["model_id"] + selectable_metrics)
dataset_df = dataset_df.dropna(thresh=2)  # Want at least two non-na values (one for model_id and one for a metric).

sorting_metric = st.sidebar.radio(
    "Sorting Metric",
    selectable_metrics,
)

st.markdown(
    "Please click on the model's name to be redirected to its model card."
)

st.markdown(
    "Want to beat the leaderboard? Don't see your model here? Simply request an automatic evaluation [here](https://huggingface.co/spaces/autoevaluate/autoevaluate)."
)

# Make the default metric appear right after model names
cols = dataset_df.columns.tolist()
cols.remove(sorting_metric)
cols = cols[:1] + [sorting_metric] + cols[1:]
dataset_df = dataset_df[cols]

# Sort the leaderboard, giving the sorting metric highest priority and then ordering by other metrics in the case of equal values.
dataset_df = dataset_df.sort_values(by=cols[1:], ascending=[metric in ascending_metrics for metric in cols[1:]])
dataset_df = dataset_df.replace(np.nan, '-')

# Make the leaderboard
gb = GridOptionsBuilder.from_dataframe(dataset_df)
gb.configure_default_column(sortable=False)
# Render each model id as a link to its page on the Hub.
gb.configure_column(
    "model_id",
    cellRenderer=JsCode('''function(params) {return '<a target="_blank" href="https://huggingface.co/'+params.value+'">'+params.value+'</a>'}'''),
)
for name in selectable_metrics:
    gb.configure_column(name, type=["numericColumn","numberColumnFilter","customNumericFormat"], precision=2, aggFunc='sum')

# Highlight the sorting metric and make it the only user-sortable column.
gb.configure_column(
    sorting_metric,
    sortable=True,
    cellStyle=JsCode('''function(params) { return {'backgroundColor': '#FFD21E'}}''')
)

go = gb.build()
AgGrid(dataset_df, gridOptions=go, allow_unsafe_jscode=True)