rodrigomasini's picture
Update app.py
22b332a verified
import gradio as gr
import json
import os
from datetime import datetime, timezone
from apscheduler.schedulers.background import BackgroundScheduler
import pandas as pd
from huggingface_hub import snapshot_download
from src.display.about import (
CITATION_BUTTON_LABEL,
CITATION_BUTTON_TEXT,
EVALUATION_QUEUE_TEXT,
INTRODUCTION_TEXT,
LLM_BENCHMARKS_TEXT,
FAQ_TEXT,
TITLE,
)
from src.display.css_html_js import custom_css
from src.display.utils import (
BENCHMARK_COLS,
COLS,
EVAL_COLS,
EVAL_TYPES,
NUMERIC_INTERVALS,
TYPES,
AutoEvalColumn,
ModelType,
fields,
WeightType,
Precision
)
from src.envs import API, EVAL_REQUESTS_PATH, DYNAMIC_INFO_REPO, DYNAMIC_INFO_FILE_PATH, DYNAMIC_INFO_PATH, EVAL_RESULTS_PATH, H4_TOKEN, IS_PUBLIC, QUEUE_REPO, REPO_ID, RESULTS_REPO
from src.populate import get_evaluation_queue_df, get_leaderboard_df
from src.submission.submit import add_new_eval
from src.tools.collections import update_collections
from src.tools.plots import (
create_metric_plot_obj,
create_plot_df,
create_scores_df,
)
def restart_space():
API.restart_space(repo_id=REPO_ID, token=H4_TOKEN)
def init_space():
try:
print(EVAL_REQUESTS_PATH)
snapshot_download(
repo_id=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30
)
except Exception:
restart_space()
try:
print(DYNAMIC_INFO_PATH)
snapshot_download(
repo_id=DYNAMIC_INFO_REPO, local_dir=DYNAMIC_INFO_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30
)
except Exception:
restart_space()
try:
print(EVAL_RESULTS_PATH)
snapshot_download(
repo_id=RESULTS_REPO, local_dir=EVAL_RESULTS_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30
)
except Exception:
restart_space()
raw_data, original_df = get_leaderboard_df(
results_path=EVAL_RESULTS_PATH,
requests_path=EVAL_REQUESTS_PATH,
dynamic_path=DYNAMIC_INFO_FILE_PATH,
cols=COLS,
benchmark_cols=BENCHMARK_COLS
)
update_collections(original_df.copy())
leaderboard_df = original_df.copy()
plot_df = create_plot_df(create_scores_df(raw_data))
(
finished_eval_queue_df,
running_eval_queue_df,
pending_eval_queue_df,
) = get_evaluation_queue_df(EVAL_REQUESTS_PATH, EVAL_COLS)
return leaderboard_df, original_df, plot_df, finished_eval_queue_df, running_eval_queue_df, pending_eval_queue_df
leaderboard_df, original_df, plot_df, finished_eval_queue_df, running_eval_queue_df, pending_eval_queue_df = init_space()
# Searching and filtering
def update_table(
hidden_df: pd.DataFrame,
columns: list,
type_query: list,
precision_query: str,
size_query: list,
show_deleted: bool,
show_merges: bool,
show_moe: bool,
show_flagged: bool,
query: str,
):
filtered_df = filter_models(hidden_df, type_query, size_query, precision_query, show_deleted, show_merges, show_moe, show_flagged)
filtered_df = filter_queries(query, filtered_df)
df = select_columns(filtered_df, columns)
return df
def load_query(request: gr.Request): # triggered only once at startup => read query parameter if it exists
query = request.query_params.get("query") or ""
return query, query # return one for the "search_bar", one for a hidden component that triggers a reload only if value has changed
def search_table(df: pd.DataFrame, query: str) -> pd.DataFrame:
return df[(df[AutoEvalColumn.dummy.name].str.contains(query, case=False))]
def select_columns(df: pd.DataFrame, columns: list) -> pd.DataFrame:
always_here_cols = [c.name for c in fields(AutoEvalColumn) if c.never_hidden]
dummy_col = [AutoEvalColumn.dummy.name]
#AutoEvalColumn.model_type_symbol.name,
#AutoEvalColumn.model.name,
# We use COLS to maintain sorting
filtered_df = df[
always_here_cols + [c for c in COLS if c in df.columns and c in columns] + dummy_col
]
return filtered_df
def filter_queries(query: str, filtered_df: pd.DataFrame):
"""Added by Abishek"""
final_df = []
if query != "":
queries = [q.strip() for q in query.split(";")]
for _q in queries:
_q = _q.strip()
if _q != "":
temp_filtered_df = search_table(filtered_df, _q)
if len(temp_filtered_df) > 0:
final_df.append(temp_filtered_df)
if len(final_df) > 0:
filtered_df = pd.concat(final_df)
filtered_df = filtered_df.drop_duplicates(
subset=[AutoEvalColumn.model.name, AutoEvalColumn.precision.name, AutoEvalColumn.revision.name]
)
return filtered_df
def filter_models(
df: pd.DataFrame, type_query: list, size_query: list, precision_query: list, show_deleted: bool, show_merges: bool, show_moe:bool, show_flagged: bool
) -> pd.DataFrame:
# Show all models
if show_deleted:
filtered_df = df
else: # Show only still on the hub models
filtered_df = df[df[AutoEvalColumn.still_on_hub.name] == True]
if not show_merges:
filtered_df = filtered_df[filtered_df[AutoEvalColumn.merged.name] == False]
if not show_moe:
filtered_df = filtered_df[filtered_df[AutoEvalColumn.moe.name] == False]
if not show_flagged:
filtered_df = filtered_df[filtered_df[AutoEvalColumn.flagged.name] == False]
type_emoji = [t[0] for t in type_query]
filtered_df = filtered_df.loc[df[AutoEvalColumn.model_type_symbol.name].isin(type_emoji)]
filtered_df = filtered_df.loc[df[AutoEvalColumn.precision.name].isin(precision_query + ["None"])]
numeric_interval = pd.IntervalIndex(sorted([NUMERIC_INTERVALS[s] for s in size_query]))
params_column = pd.to_numeric(df[AutoEvalColumn.params.name], errors="coerce")
mask = params_column.apply(lambda x: any(numeric_interval.contains(x)))
filtered_df = filtered_df.loc[mask]
return filtered_df
leaderboard_df = filter_models(
df=leaderboard_df,
type_query=[t.to_str(" : ") for t in ModelType],
size_query=list(NUMERIC_INTERVALS.keys()),
precision_query=[i.value.name for i in Precision],
show_deleted=False,
show_merges=False,
show_moe=True,
show_flagged=False
)
import unicodedata
def is_valid_unicode(char):
try:
unicodedata.name(char)
return True # Valid Unicode character
except ValueError:
return False # Invalid Unicode character
def remove_invalid_unicode(input_string):
if isinstance(input_string, str):
valid_chars = [char for char in input_string if is_valid_unicode(char)]
return ''.join(valid_chars)
else:
return input_string # Return non-string values as is
dummy1 = gr.Textbox(visible=False)
hidden_leaderboard_table_for_search = gr.components.Dataframe(
headers=COLS,
datatype=TYPES,
visible=False,
line_breaks=False,
interactive=False
)
def display(x, y):
# Assuming df is your DataFrame
for column in leaderboard_df.columns:
if leaderboard_df[column].dtype == 'object':
leaderboard_df[column] = leaderboard_df[column].apply(remove_invalid_unicode)
subset_df = leaderboard_df[COLS]
return subset_df
INTRODUCTION_TEXT = """
This is a copied space from Open LLM Leaderboard. Instead of displaying
the results as table this space was modified to simply provides a gradio API interface.
Using the following python script below, users can access the full leaderboard data easily.
```python
# Import dependencies
from gradio_client import Client
# Initialize the Gradio client with the API URL
client = Client("https://rodrigomasini-data-only-enterprise-scenarios-leaderboard.hf.space/")
try:
# Perform the API call
response = client.predict("","", api_name='/predict')
# Check if response it's directly accessible
if len(response) > 0:
print("Response received!")
headers = response.get('headers', [])
data = response.get('data', [])
print(headers)
# Remove commenst if you want to download the dataset and save in csv format
# Specify the path to your CSV file
#csv_file_path = 'foundational-models-benchmark.csv'
# Open the CSV file for writing
#with open(csv_file_path, mode='w', newline='', encoding='utf-8') as file:
# writer = csv.writer(file)
# Write the headers
# writer.writerow(headers)
# Write the data
# for row in data:
# writer.writerow(row)
#print(f"Results saved to {csv_file_path}")
# If the above line prints a string that looks like JSON, you can parse it with json.loads(response)
# Otherwise, you might need to adjust based on the actual structure of `response`
except Exception as e:
print(f"An error occurred: {e}")
```
"""
interface = gr.Interface(
fn=display,
inputs=[gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text"), dummy1],
outputs=[hidden_leaderboard_table_for_search]
)
scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, "interval", seconds=1800)
scheduler.start()
interface.launch()