import subprocess
import gradio as gr
import zipfile
import os
import shutil
import pandas as pd
from apscheduler.schedulers.background import BackgroundScheduler
from huggingface_hub import snapshot_download, Repository, HfFolder
from src.about import (
    CITATION_BUTTON_LABEL,
    CITATION_BUTTON_TEXT,
    EVALUATION_QUEUE_TEXT,
    INTRODUCTION_TEXT,
    LLM_BENCHMARKS_TEXT,
    TITLE,
)
from src.display.css_html_js import custom_css
from src.display.utils import (
    BENCHMARK_COLS,
    COLS,
    EVAL_COLS,
    EVAL_TYPES,
    NUMERIC_INTERVALS,
    TYPES,
    AutoEvalColumn,
    ModelType,
    fields,
    WeightType,
    Precision,
)
from src.envs import API, EVAL_REQUESTS_PATH, EVAL_RESULTS_PATH, QUEUE_REPO, REPO_ID, RESULTS_REPO, TOKEN, EVAL_REQUESTS_PATH_BACKEND, EVAL_RESULTS_PATH_BACKEND
from src.populate import get_evaluation_queue_df, get_leaderboard_df
from src.submission.submit import add_new_eval
from src.submission.evaluate import calculate_metrics
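# Handle a new model submission: validate the model name and uploaded zip,
# extract the predictions, compute metrics, and push the results to the results repo.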
def handle_new_eval_submission(model_name, model_zip, model_link=None):
    if not model_name:
        return "Please enter a model name."
    # Check whether the model name is already on the leaderboard
    if model_name in leaderboard_df[AutoEvalColumn.model.name].values:
        return "Model name already exists in the leaderboard. Please choose a different name."
    # Check that the upload is actually a zip file
    if model_zip is None or not zipfile.is_zipfile(model_zip):
        return "Please upload a valid zip file."
    # Check that the model name is a single word
    if len(model_name.split()) > 1:
        return "Model name should be a single word with hyphens."

    extraction_path = EVAL_RESULTS_PATH_BACKEND
    if not os.path.exists(extraction_path):
        os.makedirs(extraction_path)

    # Define the path the zip file will be extracted to
    extraction_path = os.path.join(extraction_path, model_name)

    if model_zip is not None:
        with zipfile.ZipFile(model_zip, 'r') as zip_ref:
            zip_ref.extractall(extraction_path)
        print("File unzipped successfully to:", extraction_path)

    # Evaluate the model's performance
    calculate_metrics(extraction_path, model_name)

    # Upload the computed results to the results repo
    API.upload_file(
        path_or_fileobj=os.path.join(os.getcwd(), EVAL_RESULTS_PATH, '3d-pope', model_name, 'results.json'),
        path_in_repo=os.path.join('3d-pope', model_name, 'results.json'),
        repo_id=RESULTS_REPO,
        repo_type="dataset",
    )

    restart_space()
    return "Submission received and results are being processed. Please check the leaderboard for updates."
def restart_space():
    API.restart_space(repo_id=REPO_ID)
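# On startup, download the evaluation requests and results datasets from the Hub.
# If either download fails, restart the Space and try again.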
try:
    print(EVAL_REQUESTS_PATH)
    snapshot_download(
        repo_id=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30, token=TOKEN
    )
except Exception:
    restart_space()

try:
    print(EVAL_RESULTS_PATH)
    snapshot_download(
        repo_id=RESULTS_REPO, local_dir=EVAL_RESULTS_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30, token=TOKEN
    )
except Exception:
    restart_space()
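# Build the leaderboard dataframe from the downloaded results.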
raw_data, original_df = get_leaderboard_df(EVAL_RESULTS_PATH, EVAL_REQUESTS_PATH, COLS, BENCHMARK_COLS)
leaderboard_df = original_df.copy()
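# Format numeric cells for display: integers without a decimal point,
# floats trimmed to at most two decimal places.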
def custom_format(x):
    if pd.isna(x):
        return x  # Return as-is if NaN
    try:
        float_x = float(x)
        if float_x.is_integer():
            return f"{int(float_x)}"
        else:
            return f"{float_x:.2f}".rstrip('0').rstrip('.')
    except ValueError:
        return x  # Return as-is if conversion to float fails
numeric_cols = [col for col in leaderboard_df.columns if leaderboard_df[col].dtype in ['float64', 'float32']]
leaderboard_df[numeric_cols] = leaderboard_df[numeric_cols].applymap(custom_format)
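# Split the evaluation queue into finished, running, and pending submissions.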
(
    finished_eval_queue_df,
    running_eval_queue_df,
    pending_eval_queue_df,
) = get_evaluation_queue_df(EVAL_REQUESTS_PATH, EVAL_COLS)
# Searching and filtering
def update_table(
    hidden_df: pd.DataFrame,
    columns: list,
    # type_query: list,
    # precision_query: str,
    # size_query: list,
    # show_deleted: bool,
    query: str,
):
    # filtered_df = filter_models(hidden_df, type_query, size_query, precision_query, show_deleted)
    filtered_df = filter_queries(query, hidden_df)
    df = select_columns(filtered_df, columns)
    return df
def search_table(df: pd.DataFrame, query: str) -> pd.DataFrame:
    return df[(df[AutoEvalColumn.model.name].str.contains(query, case=False))]
def select_columns(df: pd.DataFrame, columns: list) -> pd.DataFrame:
    always_here_cols = [
        # AutoEvalColumn.model_type_symbol.name,
        AutoEvalColumn.model.name,
    ]
    # We use COLS to maintain sorting
    filtered_df = df[
        always_here_cols + [c for c in COLS if c in df.columns and c in columns]
    ]
    return filtered_df
def filter_queries(query: str, filtered_df: pd.DataFrame) -> pd.DataFrame:
    final_df = []
    if query != "":
        queries = [q.strip() for q in query.split(";")]
        for _q in queries:
            _q = _q.strip()
            if _q != "":
                temp_filtered_df = search_table(filtered_df, _q)
                if len(temp_filtered_df) > 0:
                    final_df.append(temp_filtered_df)
        if len(final_df) > 0:
            filtered_df = pd.concat(final_df)
            existing_columns = [
                col
                for col in [AutoEvalColumn.model.name, AutoEvalColumn.precision.name, AutoEvalColumn.revision.name]
                if col in filtered_df.columns
            ]
            filtered_df = filtered_df.drop_duplicates(subset=existing_columns)
    return filtered_df
def filter_models(
    df: pd.DataFrame, type_query: list, size_query: list, precision_query: list, show_deleted: bool
) -> pd.DataFrame:
    # Show all models
    # if show_deleted:
    #     filtered_df = df
    # else:  # Show only models that are still on the hub
    #     filtered_df = df[df[AutoEvalColumn.still_on_hub.name] == True]
    filtered_df = df

    type_emoji = [t[0] for t in type_query]
    # filtered_df = filtered_df.loc[df[AutoEvalColumn.model_type_symbol.name].isin(type_emoji)]
    # filtered_df = filtered_df.loc[df[AutoEvalColumn.precision.name].isin(precision_query + ["None"])]

    numeric_interval = pd.IntervalIndex(sorted([NUMERIC_INTERVALS[s] for s in size_query]))
    params_column = pd.to_numeric(df[AutoEvalColumn.params.name], errors="coerce")
    mask = params_column.apply(lambda x: any(numeric_interval.contains(x)))
    filtered_df = filtered_df.loc[mask]

    return filtered_df
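# Build the Gradio app: leaderboard tab, about tab, and submission tab.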
demo = gr.Blocks(css=custom_css)
with demo:
    gr.HTML(TITLE)
    gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text")

    with gr.Tabs(elem_classes="tab-buttons") as tabs:
        with gr.TabItem("πŸ… 3D-POPE Benchmark", elem_id="llm-benchmark-tab-table", id=0):
            with gr.Row():
                with gr.Column():
                    with gr.Row():
                        search_bar = gr.Textbox(
                            placeholder=" πŸ” Search for your model (separate multiple queries with `;`) and press ENTER...",
                            show_label=False,
                            elem_id="search-bar",
                        )
                    with gr.Row():
                        shown_columns = gr.CheckboxGroup(
                            choices=[
                                c.name
                                for c in fields(AutoEvalColumn)
                                if not c.hidden and not c.never_hidden
                            ],
                            value=[
                                c.name
                                for c in fields(AutoEvalColumn)
                                if c.displayed_by_default and not c.hidden and not c.never_hidden
                            ],
                            label="Select columns to show",
                            elem_id="column-select",
                            interactive=True,
                        )
                    # with gr.Row():
                    #     deleted_models_visibility = gr.Checkbox(
                    #         value=False, label="Show gated/private/deleted models", interactive=True
                    #     )
                # with gr.Column(min_width=320):
                #     with gr.Box(elem_id="box-filter"):
            leaderboard_table = gr.components.Dataframe(
                value=leaderboard_df[
                    [c.name for c in fields(AutoEvalColumn) if c.never_hidden]
                    + shown_columns.value
                ],
                headers=[c.name for c in fields(AutoEvalColumn) if c.never_hidden] + shown_columns.value,
                datatype=TYPES,
                elem_id="leaderboard-table",
                interactive=False,
                visible=True,
            )

            # Hidden, unfiltered leaderboard used to restore rows when the user clears the search (e.g. with backspace)
            hidden_leaderboard_table_for_search = gr.components.Dataframe(
                value=original_df[COLS],
                headers=COLS,
                datatype=TYPES,
                visible=False,
            )
            search_bar.submit(
                update_table,
                [
                    hidden_leaderboard_table_for_search,
                    shown_columns,
                    # deleted_models_visibility,
                    search_bar,
                ],
                leaderboard_table,
            )
            for selector in [shown_columns]:
                selector.change(
                    update_table,
                    [
                        hidden_leaderboard_table_for_search,
                        shown_columns,
                        # deleted_models_visibility,
                        search_bar,
                    ],
                    leaderboard_table,
                    queue=True,
                )
with gr.TabItem("πŸ“ About", elem_id="llm-benchmark-tab-table", id=2):
gr.Markdown(LLM_BENCHMARKS_TEXT, elem_classes="markdown-text")
with gr.TabItem("πŸš€ Submit here! ", elem_id="llm-benchmark-tab-table", id=3):
with gr.Column():
with gr.Row():
gr.Markdown(EVALUATION_QUEUE_TEXT, elem_classes="markdown-text")
with gr.Row():
gr.Markdown("# πŸ“‹ Submit your results here!", elem_classes="markdown-text")
with gr.Row():
model_name_textbox = gr.Textbox(label="Model name")
model_zip_file = gr.File(label="Upload model prediction result ZIP file")
# model_link_textbox = gr.Textbox(label="Link to model page")
with gr.Row():
gr.Column()
with gr.Column(scale=2):
submit_button = gr.Button("Submit Model")
submission_result = gr.Markdown()
submit_button.click(
handle_new_eval_submission,
[model_name_textbox, model_zip_file],
submission_result
)
gr.Column()
    with gr.Row():
        with gr.Accordion("πŸ“™ Citation", open=False):
            citation_button = gr.Textbox(
                value=CITATION_BUTTON_TEXT,
                label=CITATION_BUTTON_LABEL,
                lines=20,
                elem_id="citation-button",
                show_copy_button=True,
            )
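# Restart the Space every 30 minutes so newly processed results appear on the leaderboard.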
scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, "interval", seconds=1800)
scheduler.start()
demo.queue(default_concurrency_limit=40).launch()