# FrontierBench / app.py
# Uploaded by adpinzonp using huggingface_hub (commit 17f8527, verified)
import re
import pandas as pd
import numpy as np
import gradio as gr
import plotly.graph_objects as go
from sklearn.experimental import enable_iterative_imputer # noqa: F401
from sklearn.impute import IterativeImputer, SimpleImputer
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)
# Google Sheet downloaded by refetch_all. sheet_to_dataframe requires the URL
# to contain both the document id ("/d/<id>") and a tab id ("gid=<n>").
DEFAULT_SHEET_URL = "https://docs.google.com/spreadsheets/d/1ygw8nrqI-FdHzyQGczKR5n3t01d-9sxMB_KVoClhoAg/edit?gid=0#gid=0"
# Columns with currency formatting; the sort helpers order these by their
# numeric value instead of comparing the raw text.
PRICE_COLS = ["Input price per 1MT", "Output price per 1MT"]
# ---------- Carga de Google Sheet ----------
def sheet_to_dataframe(sheet_url: str) -> pd.DataFrame:
    """Load one tab of a Google Sheet through its CSV export endpoint.

    Args:
        sheet_url: Browser URL of the sheet. It must contain the document id
            (``/d/<id>``) and the tab id (``gid=<digits>``).

    Returns:
        The tab parsed by ``pandas.read_csv``.

    Raises:
        ValueError: If either the document id or the tab id is missing.
    """
    id_match = re.search(r'/d/([a-zA-Z0-9-_]+)', sheet_url)
    gid_match = re.search(r'gid=([0-9]+)', sheet_url)
    if id_match is None or gid_match is None:
        raise ValueError("Invalid Google Sheets URL")

    sheet_id = id_match.group(1)
    gid = gid_match.group(1)
    csv_url = f"https://docs.google.com/spreadsheets/d/{sheet_id}/export?format=csv&gid={gid}"
    return pd.read_csv(csv_url)
# ---------- Limpieza / parsing ----------
def _parse_percent_value(v):
    """Turn one raw benchmark cell into a float, or NaN when it holds no number.

    Numbers pass through as floats. Text is stripped of surrounding
    whitespace, ``%`` signs and thousands commas before parsing. ``None``,
    NaN, blanks, "na"/"n/a"/"null"/"none" (any case), lone dashes and
    anything that still fails to parse all become NaN.
    """
    if v is None:
        return np.nan
    if isinstance(v, float) and np.isnan(v):
        return np.nan
    if isinstance(v, (int, float)):
        return float(v)

    text = str(v).strip()
    if text == "" or text.lower() in {"na", "n/a", "null", "none"}:
        return np.nan

    text = text.replace("%", "").replace(",", "").strip()
    # A lone hyphen, en dash or em dash is a common "no score" placeholder.
    if text in {"-", "–", "—"}:
        return np.nan

    try:
        parsed = float(text)
    except ValueError:
        parsed = np.nan
    return parsed
def _split_columns(df: pd.DataFrame):
    """Split the sheet's column names into descriptive and benchmark columns.

    Returns:
        ``(fixed, benches)``: the first four column names and all remaining
        column names, both as lists.

    Raises:
        ValueError: If the frame has fewer than four columns.
    """
    names = list(df.columns)
    if len(names) < 4:
        raise ValueError("Sheet must have at least 4 columns")
    return names[:4], names[4:]
def _clean_benchmarks(df: pd.DataFrame):
    """Return a copy of ``df`` whose benchmark columns hold parsed floats.

    Returns:
        ``(num, benches, fixed)``: the cleaned copy, the benchmark column
        names and the four leading column names. The input frame is not
        modified.
    """
    fixed_cols, bench_cols = _split_columns(df)
    cleaned = df.copy()
    for name in bench_cols:
        cleaned[name] = cleaned[name].apply(_parse_percent_value)
    return cleaned, bench_cols, fixed_cols
# ---------- Estilos ----------
def _style_table(df_display: pd.DataFrame, benches,
                 cmap="RdYlGn", vmin=0.0, vmax=100.0,
                 precision=1, imputed_mask: pd.DataFrame | None = None) -> str:
    """Render ``df_display`` as a styled HTML table.

    Args:
        df_display: Frame to render; its first four columns are shown in a
            heavier font weight.
        benches: Names of the benchmark columns. These are formatted as
            percentages and receive the colour gradient.
        cmap: Colormap name passed to ``background_gradient``.
        vmin: Lower bound of the colour scale.
        vmax: Upper bound of the colour scale.
        precision: Number of decimals shown for benchmark values.
        imputed_mask: Optional boolean frame; cells that are True get a
            dashed yellow border.

    Returns:
        The HTML produced by ``Styler.to_html()``.
    """
    styler = df_display.style.hide(axis="index")
    styler = (
        styler
        # Missing benchmark values are shown as "N/A".
        .format({c: f"{{:.{precision}f}}%" for c in benches}, na_rep="N/A")
        # axis=None: a single colour scale is shared by all benchmark columns.
        .background_gradient(axis=None, subset=benches, cmap=cmap, vmin=vmin, vmax=vmax)
        .set_table_styles([
            {"selector": "th", "props": [("position", "sticky"), ("top", "0"), ("background", "#111"), ("color", "white"), ("z-index", "1")]},
            {"selector": "table", "props": [("border-collapse", "collapse"), ("font-family", "Inter, Roboto, Arial, sans-serif")]},
            {"selector": "td, th", "props": [("border", "1px solid #333"), ("padding", "6px 8px")]},
            {"selector": "tbody tr:nth-child(odd)", "props": [("background-color", "#161616")]},
            {"selector": "tbody tr:nth-child(even)", "props": [("background-color", "#0f0f0f")]}
        ])
        .set_properties(subset=df_display.columns[:4], **{"font-weight": "600"})
    )
    if imputed_mask is not None:
        # imputed_mask must have the same rows/columns as df_display[benches]
        def highlight(df):
            # Start from an all-empty style frame and mark only flagged cells.
            styles = pd.DataFrame("", index=df.index, columns=df.columns)
            for col in benches:
                styles.loc[imputed_mask[col], col] = "border: 2px dashed yellow;"
            return styles
        styler = styler.apply(highlight, axis=None)
    return styler.to_html()
# ---------- Filtros y orden ----------
def _filter_rows(df_raw: pd.DataFrame, df_num: pd.DataFrame, benches,
                 text_query, bench_choice, comparator, threshold):
    """Filter rows by a text query and a benchmark threshold.

    Args:
        df_raw: Frame whose first two columns are searched for the text query.
        df_num: Frame with numeric benchmark columns, sharing ``df_raw``'s index.
        benches: Names of the benchmark columns in ``df_num``.
        text_query: Case-insensitive substring to look for; falsy or blank
            values disable the text filter.
        bench_choice: ``"Any"`` to keep rows where at least one benchmark
            satisfies the comparison, or a single benchmark column name.
            Any other value disables the benchmark filter.
        comparator: ``"≥"`` for greater-or-equal; anything else means
            less-or-equal.
        threshold: Value benchmarks are compared against.

    Returns:
        ``(df_raw, df_num)`` restricted to the matching rows. The original
        index labels are preserved (no reset). Missing benchmark values never
        satisfy a comparison.
    """
    mask = pd.Series(True, index=df_raw.index)

    needle = str(text_query).strip().lower() if text_query else ""
    if needle:
        def _as_text(col: pd.Series) -> pd.Series:
            # Blank out missing cells *before* stringifying; otherwise they
            # turn into the literal text "nan" and match queries such as "nan".
            return col.where(col.notna(), "").astype(str).str.lower()

        haystack = _as_text(df_raw.iloc[:, 0]) + " " + _as_text(df_raw.iloc[:, 1])
        # regex=False: the query is user-typed text, so characters such as
        # "(", "+" or "." must match literally instead of being parsed as a
        # regular expression.
        mask &= haystack.str.contains(needle, regex=False, na=False)

    if bench_choice == "Any":
        scores = df_num[benches]
        hits = scores.ge(threshold) if comparator == "≥" else scores.le(threshold)
        mask &= hits.any(axis=1).fillna(False)
    elif bench_choice and bench_choice in benches:
        column = df_num[bench_choice]
        hits = column.ge(threshold) if comparator == "≥" else column.le(threshold)
        mask &= hits.fillna(False)

    return df_raw.loc[mask], df_num.loc[mask]
def _numeric_key_for_price(series: pd.Series) -> pd.Series:
    """Build a float sort key from price text such as "$1,250.50".

    Every character other than digits, dots and minus signs is removed
    before parsing; values that still cannot be parsed become NaN.
    """
    as_text = series.astype(str)
    stripped = as_text.str.replace(r"[^\d\.\-]", "", regex=True)
    return pd.to_numeric(stripped, errors="coerce")
def _sort_df(df_full: pd.DataFrame, sort_col: str, ascending: bool) -> pd.DataFrame:
    """Sort ``df_full`` by one column, ordering price columns numerically.

    Args:
        df_full: Frame to sort.
        sort_col: Column to sort by. A falsy value, or a name that is not a
            column of ``df_full`` (for example a stale dropdown selection),
            leaves the frame unchanged.
        ascending: Sort direction.

    Returns:
        The sorted frame with missing values last, or ``df_full`` itself when
        no sorting applies.
    """
    if not sort_col or sort_col not in df_full.columns:
        return df_full

    # Price columns hold text like "$1,250.50"; sort them by numeric value.
    # Passing the converter as ``key`` avoids adding a temporary column that
    # could collide with a real one.
    key = _numeric_key_for_price if sort_col in PRICE_COLS else None
    # mergesort is stable, so ties keep their incoming order.
    return df_full.sort_values(
        sort_col,
        ascending=ascending,
        na_position="last",
        kind="mergesort",
        key=key,
    )
def _sort_with_mask(df_full: pd.DataFrame, mask: pd.DataFrame, sort_col: str, ascending: bool):
    """Sort ``df_full`` and reorder ``mask`` so both stay row-aligned.

    Args:
        df_full: Frame to sort.
        mask: Frame sharing ``df_full``'s index labels (the imputed-cell mask).
        sort_col: Column to sort by. A falsy value, or a name that is not a
            column of ``df_full``, leaves both frames unchanged.
        ascending: Sort direction.

    Returns:
        ``(df_full, mask)`` in the new row order, with missing sort values
        placed last.
    """
    if not sort_col or sort_col not in df_full.columns:
        return df_full, mask

    if sort_col in PRICE_COLS:
        # Price text such as "$1,250.50" must be ordered by numeric value.
        key = _numeric_key_for_price(df_full[sort_col])
    else:
        key = df_full[sort_col]

    # mergesort is stable, so ties keep their incoming order. The sorted
    # index labels are then used to reorder both frames identically.
    order = key.sort_values(ascending=ascending, na_position="last", kind="mergesort").index
    return df_full.loc[order], mask.loc[order]
# ---------- Correlación ----------
def _build_correlation_plot(df_num: pd.DataFrame, benches):
    """Build a Plotly heatmap of the correlations between benchmark columns.

    With no benchmark columns an empty, titled figure is returned. With a
    single benchmark the matrix is the 1x1 identity.
    """
    if not benches:
        empty_fig = go.Figure()
        empty_fig.update_layout(title="No benchmark columns found")
        return empty_fig

    scores = df_num[benches].astype(float)
    if scores.shape[1] > 1:
        corr = scores.corr()
    else:
        corr = pd.DataFrame([[1.0]], index=benches, columns=benches)

    heatmap = go.Heatmap(
        z=corr.values,
        x=list(corr.columns),
        y=list(corr.index),
        colorscale="RdYlGn",
        zmin=-1,
        zmax=1,
        colorbar=dict(title="ρ"),
        hoverongaps=False,
    )
    fig = go.Figure(data=heatmap)

    # Cap the number of axis ticks at 20.
    tick_count = min(20, len(benches))
    fig.update_layout(
        title="Correlation between benchmark variables",
        xaxis_nticks=tick_count,
        yaxis_nticks=tick_count,
        margin=dict(l=60, r=20, t=60, b=60),
        height=600,
    )
    return fig
# ---------- Ciclos de carga y UI ----------
def fetch_and_prepare(url):
    """Download the sheet at ``url`` and parse its benchmark columns.

    Returns:
        ``(df_raw, df_num, benches, fixed)``: the sheet as downloaded, the
        copy with numeric benchmark columns, the benchmark column names and
        the four leading column names.
    """
    raw = sheet_to_dataframe(url)
    cleaned, bench_cols, fixed_cols = _clean_benchmarks(raw)
    return raw, cleaned, bench_cols, fixed_cols
def _impute_benchmarks(bench_only: pd.DataFrame) -> pd.DataFrame:
    """Fill missing benchmark scores, returning a frame shaped like the input.

    Columns with no observed value at all are left as NaN: scikit-learn's
    imputers discard such features by default, which would make their output
    narrower than the input and break reassembly into a DataFrame.
    """
    bench_imp = bench_only.copy()
    usable = [c for c in bench_only.columns if bench_only[c].notna().any()]
    if usable and not bench_only.empty:
        if len(usable) > 1:
            imputer = IterativeImputer(random_state=0, sample_posterior=False,
                                       max_iter=15, initial_strategy="mean")
        else:
            # A single usable column has no other features to regress on.
            imputer = SimpleImputer(strategy="mean")
        bench_imp[usable] = imputer.fit_transform(bench_only[usable])
    # Imputed scores can fall below zero; clamp them (as for observed values).
    return bench_imp.clip(lower=0.0)


def refetch_all(
    t1_q, t1_bench, t1_op, t1_thr, t1_sort_col, t1_sort_dir,
    t3_q, t3_bench, t3_op, t3_thr, t3_sort_col, t3_sort_dir
):
    """Reload the sheet and rebuild every view and cached state.

    The ``t1_*`` arguments are the current filter/sort controls of the
    original-table tab and the ``t3_*`` arguments those of the imputed-table
    tab. Benchmark and sort selections that the freshly loaded sheet no
    longer offers are replaced *before* rendering, so each table matches
    what its dropdowns show.

    Returns:
        A 12-tuple: HTML of the original table, the correlation figure, HTML
        of the imputed table, four dropdown updates (tab 1 benchmark, tab 1
        sort column, tab 3 benchmark, tab 3 sort column), then the raw frame,
        the numeric frame, the benchmark names, the imputed benchmark frame
        and the boolean mask of imputed cells.
    """
    df_raw, df_num, benches, fixed = fetch_and_prepare(DEFAULT_SHEET_URL)

    # Correlation
    fig_corr = _build_correlation_plot(df_num, benches)

    # Dropdown options offered by the sheet that was just loaded.
    bench_options = ["Any"] + benches
    sort_options = fixed + benches
    # Only fall back to the input-price column when the sheet really has it.
    default_sort = "Input price per 1MT" if "Input price per 1MT" in sort_options else None

    def _pick(value, options, fallback):
        return value if value in options else fallback

    t1_bench = _pick(t1_bench, bench_options, "Any")
    t3_bench = _pick(t3_bench, bench_options, "Any")
    t1_sort_col = _pick(t1_sort_col, sort_options, default_sort)
    t3_sort_col = _pick(t3_sort_col, sort_options, default_sort)

    # ----- TAB 1: ORIGINAL -----
    html_tab1 = filter_tab1(
        df_raw, df_num, benches,
        t1_q, t1_bench, t1_op, t1_thr,
        t1_sort_col, t1_sort_dir,
    )

    # ----- TAB 3: IMPUTED -----
    bench_only = df_num[benches].astype(float)
    bench_imp = _impute_benchmarks(bench_only)
    # Flag only cells that were missing and actually received a value.
    imputed_mask = bench_only.isna() & bench_imp.notna()
    html_tab3 = filter_tab3(
        df_raw, bench_imp, benches, imputed_mask,
        t3_q, t3_bench, t3_op, t3_thr,
        t3_sort_col, t3_sort_dir,
    )

    return (
        html_tab1,
        fig_corr,
        html_tab3,
        gr.update(choices=bench_options, value=t1_bench),
        gr.update(choices=sort_options, value=t1_sort_col),
        gr.update(choices=bench_options, value=t3_bench),
        gr.update(choices=sort_options, value=t3_sort_col),
        df_raw, df_num, benches, bench_imp, imputed_mask
    )
def filter_tab1(
    s_df_raw, s_df_num, s_benches,
    t1_q, t1_bench, t1_op, t1_thr,
    t1_sort_col, t1_sort_dir
):
    """Re-render the original-table tab from cached state.

    Args:
        s_df_raw: Cached raw sheet frame.
        s_df_num: Cached frame with numeric benchmark columns.
        s_benches: Cached benchmark column names.
        t1_q, t1_bench, t1_op, t1_thr: Text query, benchmark choice,
            comparator and threshold passed to ``_filter_rows``.
        t1_sort_col: Column to sort by.
        t1_sort_dir: ``"asc"`` for ascending; anything else sorts descending.

    Returns:
        HTML for the filtered and sorted table, or a short notice when no
        data has been loaded yet.
    """
    # The states stay None until a load succeeds; show a notice instead of
    # failing on a None frame.
    if s_df_raw is None or s_df_num is None or s_benches is None:
        return "<p>No data loaded yet. Press Reload to fetch the sheet.</p>"

    rows_raw, rows_num = _filter_rows(s_df_raw, s_df_num, s_benches, t1_q, t1_bench, t1_op, t1_thr)
    # Descriptive columns come from the raw sheet, scores from the numeric frame.
    table = pd.concat([rows_raw.iloc[:, :4], rows_num[s_benches]], axis=1)
    table = _sort_df(table, t1_sort_col, ascending=(t1_sort_dir == "asc")).reset_index(drop=True)
    return _style_table(table, s_benches)
def filter_tab3(
    s_df_raw, s_bench_imp, s_benches, s_imput_mask,
    t3_q, t3_bench, t3_op, t3_thr,
    t3_sort_col, t3_sort_dir
):
    """Re-render the imputed-table tab from cached state.

    Args:
        s_df_raw: Cached raw sheet frame.
        s_bench_imp: Cached frame of imputed benchmark scores.
        s_benches: Cached benchmark column names.
        s_imput_mask: Cached boolean frame marking imputed cells.
        t3_q, t3_bench, t3_op, t3_thr: Text query, benchmark choice,
            comparator and threshold passed to ``_filter_rows``.
        t3_sort_col: Column to sort by.
        t3_sort_dir: ``"asc"`` for ascending; anything else sorts descending.

    Returns:
        HTML for the filtered and sorted table with imputed cells
        highlighted, or a short notice when no data has been loaded yet.
    """
    # The states stay None until a load succeeds; show a notice instead of
    # failing on a None frame.
    if s_df_raw is None or s_bench_imp is None or s_benches is None or s_imput_mask is None:
        return "<p>No data loaded yet. Press Reload to fetch the sheet.</p>"

    rows_raw, rows_imp = _filter_rows(s_df_raw, s_bench_imp, s_benches, t3_q, t3_bench, t3_op, t3_thr)
    table = pd.concat([rows_raw.iloc[:, :4], rows_imp[s_benches]], axis=1)
    # Keep the mask limited to the surviving rows, then move it in lockstep
    # with the table through sorting and re-indexing.
    row_mask = s_imput_mask.loc[rows_imp.index]
    table, row_mask = _sort_with_mask(table, row_mask, t3_sort_col, ascending=(t3_sort_dir == "asc"))
    table = table.reset_index(drop=True)
    row_mask = row_mask.reset_index(drop=True)
    return _style_table(table, s_benches, imputed_mask=row_mask)
# ---------- UI ----------
# Gradio interface: a reload button, three tabs (original table, correlation
# matrix, imputed table) and the event wiring between them.
with gr.Blocks(css="""
/* Scroll horizontal */
.table-wrap { overflow-x: auto; }
/* Oculta la columna de índice */
.table-wrap table th.row_heading,
.table-wrap table td.row_heading,
.table-wrap table th.blank {
display: none !important;
}
""") as demo:
    gr.Markdown("## Reasoning Models Benchmarks")
    with gr.Row():
        reload_btn = gr.Button("Reload", variant="primary")
    # States: filled by refetch_all and read by the per-tab filter callbacks.
    s_df_raw = gr.State()
    s_df_num = gr.State()
    s_benches = gr.State()
    s_bench_imp = gr.State()
    s_imput_mask = gr.State()
    with gr.Tabs():
        # Tab 1: Original
        with gr.Tab("Original table"):
            # NOTE(review): all six controls are assumed to share this row;
            # confirm against the intended layout.
            with gr.Row():
                t1_q = gr.Textbox(label="Filter: Model/Company contains", placeholder="e.g., llama", scale=2)
                t1_bench = gr.Dropdown(choices=["Any"], value="Any", label="Benchmark")
                t1_op = gr.Radio(choices=["≥", "≤"], value="≥", label="Comparator")
                t1_thr = gr.Slider(minimum=0, maximum=100, value=0, step=1, label="Threshold (%)")
                # Starts with neutral choices and value; refetch_all updates them.
                t1_sort_col = gr.Dropdown(choices=["Model","Company","Input price per 1MT","Output price per 1MT"],
                                          value=None, label="Sort by")
                t1_sort_dir = gr.Radio(choices=["asc", "desc"], value="asc", label="Direction")
            t1_html = gr.HTML(elem_classes=["table-wrap"])
        # Tab 2: Correlation
        with gr.Tab("Correlation matrix"):
            corr_plot = gr.Plot()
        # Tab 3: Imputed
        with gr.Tab("Imputed table"):
            with gr.Row():
                t3_q = gr.Textbox(label="Filter: Model/Company contains", placeholder="e.g., llama", scale=2)
                t3_bench = gr.Dropdown(choices=["Any"], value="Any", label="Benchmark")
                t3_op = gr.Radio(choices=["≥", "≤"], value="≥", label="Comparator")
                t3_thr = gr.Slider(minimum=0, maximum=100, value=0, step=1, label="Threshold (%)")
                t3_sort_col = gr.Dropdown(choices=["Model","Company","Input price per 1MT","Output price per 1MT"],
                                          value=None, label="Sort by")
                t3_sort_dir = gr.Radio(choices=["asc", "desc"], value="asc", label="Direction")
            t3_html = gr.HTML(elem_classes=["table-wrap"])
    # Load / Reload: both events run refetch_all with the same inputs and
    # outputs. The order of these lists must match refetch_all's parameters
    # and its returned tuple.
    args_reload = [
        t1_q, t1_bench, t1_op, t1_thr, t1_sort_col, t1_sort_dir,
        t3_q, t3_bench, t3_op, t3_thr, t3_sort_col, t3_sort_dir
    ]
    outs_reload = [
        t1_html, corr_plot, t3_html,
        t1_bench, t1_sort_col,
        t3_bench, t3_sort_col,
        s_df_raw, s_df_num, s_benches, s_bench_imp, s_imput_mask
    ]
    demo.load(refetch_all, inputs=args_reload, outputs=outs_reload)
    reload_btn.click(refetch_all, inputs=args_reload, outputs=outs_reload)
    # Live events for tab 1: any control change re-renders the table from
    # the cached states.
    for comp in [t1_q, t1_bench, t1_op, t1_thr, t1_sort_col, t1_sort_dir]:
        comp.change(
            filter_tab1,
            inputs=[s_df_raw, s_df_num, s_benches,
                    t1_q, t1_bench, t1_op, t1_thr,
                    t1_sort_col, t1_sort_dir],
            outputs=[t1_html]
        )
    # Live events for tab 3: same pattern, using the imputed frame and mask.
    for comp in [t3_q, t3_bench, t3_op, t3_thr, t3_sort_col, t3_sort_dir]:
        comp.change(
            filter_tab3,
            inputs=[s_df_raw, s_bench_imp, s_benches, s_imput_mask,
                    t3_q, t3_bench, t3_op, t3_thr,
                    t3_sort_col, t3_sort_dir],
            outputs=[t3_html]
        )
# Start the app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()