felix
sync with upstream
97453a2
raw
history blame contribute delete
No virus
8.72 kB
import gradio as gr
import json
import os
from datetime import datetime, timezone
import pandas as pd
from huggingface_hub import snapshot_download
from src.display.about import (
CITATION_BUTTON_LABEL,
CITATION_BUTTON_TEXT,
EVALUATION_QUEUE_TEXT,
INTRODUCTION_TEXT,
LLM_BENCHMARKS_TEXT,
FAQ_TEXT,
TITLE,
)
from src.display.css_html_js import custom_css
from src.display.utils import (
BENCHMARK_COLS,
COLS,
EVAL_COLS,
EVAL_TYPES,
NUMERIC_INTERVALS,
TYPES,
AutoEvalColumn,
ModelType,
fields,
WeightType,
Precision
)
from src.envs import API, EVAL_REQUESTS_PATH, DYNAMIC_INFO_REPO, DYNAMIC_INFO_FILE_PATH, DYNAMIC_INFO_PATH, EVAL_RESULTS_PATH, H4_TOKEN, IS_PUBLIC, QUEUE_REPO, REPO_ID, RESULTS_REPO
from src.populate import get_evaluation_queue_df, get_leaderboard_df
from src.submission.submit import add_new_eval
from src.tools.collections import update_collections
from src.tools.plots import (
create_metric_plot_obj,
create_plot_df,
create_scores_df,
)
def restart_space():
API.restart_space(repo_id=REPO_ID, token=H4_TOKEN)
def init_space():
try:
print(EVAL_REQUESTS_PATH)
snapshot_download(
repo_id=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30
)
except Exception:
restart_space()
try:
print(DYNAMIC_INFO_PATH)
snapshot_download(
repo_id=DYNAMIC_INFO_REPO, local_dir=DYNAMIC_INFO_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30
)
except Exception:
restart_space()
try:
print(EVAL_RESULTS_PATH)
snapshot_download(
repo_id=RESULTS_REPO, local_dir=EVAL_RESULTS_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30
)
except Exception:
restart_space()
raw_data, original_df = get_leaderboard_df(
results_path=EVAL_RESULTS_PATH,
requests_path=EVAL_REQUESTS_PATH,
dynamic_path=DYNAMIC_INFO_FILE_PATH,
cols=COLS,
benchmark_cols=BENCHMARK_COLS
)
update_collections(original_df.copy())
leaderboard_df = original_df.copy()
plot_df = create_plot_df(create_scores_df(raw_data))
(
finished_eval_queue_df,
running_eval_queue_df,
pending_eval_queue_df,
) = get_evaluation_queue_df(EVAL_REQUESTS_PATH, EVAL_COLS)
return leaderboard_df, original_df, plot_df, finished_eval_queue_df, running_eval_queue_df, pending_eval_queue_df
leaderboard_df, original_df, plot_df, finished_eval_queue_df, running_eval_queue_df, pending_eval_queue_df = init_space()
# Searching and filtering
def update_table(
hidden_df: pd.DataFrame,
columns: list,
type_query: list,
precision_query: str,
size_query: list,
show_deleted: bool,
show_merges: bool,
show_moe: bool,
show_flagged: bool,
query: str,
):
filtered_df = filter_models(hidden_df, type_query, size_query, precision_query, show_deleted, show_merges, show_moe, show_flagged)
filtered_df = filter_queries(query, filtered_df)
df = select_columns(filtered_df, columns)
return df
def load_query(request: gr.Request): # triggered only once at startup => read query parameter if it exists
query = request.query_params.get("query") or ""
return query, query # return one for the "search_bar", one for a hidden component that triggers a reload only if value has changed
def search_table(df: pd.DataFrame, query: str) -> pd.DataFrame:
return df[(df[AutoEvalColumn.dummy.name].str.contains(query, case=False))]
def select_columns(df: pd.DataFrame, columns: list) -> pd.DataFrame:
always_here_cols = [c.name for c in fields(AutoEvalColumn) if c.never_hidden]
dummy_col = [AutoEvalColumn.dummy.name]
#AutoEvalColumn.model_type_symbol.name,
#AutoEvalColumn.model.name,
# We use COLS to maintain sorting
filtered_df = df[
always_here_cols + [c for c in COLS if c in df.columns and c in columns] + dummy_col
]
return filtered_df
def filter_queries(query: str, filtered_df: pd.DataFrame):
"""Added by Abishek"""
final_df = []
if query != "":
queries = [q.strip() for q in query.split(";")]
for _q in queries:
_q = _q.strip()
if _q != "":
temp_filtered_df = search_table(filtered_df, _q)
if len(temp_filtered_df) > 0:
final_df.append(temp_filtered_df)
if len(final_df) > 0:
filtered_df = pd.concat(final_df)
filtered_df = filtered_df.drop_duplicates(
subset=[AutoEvalColumn.model.name, AutoEvalColumn.precision.name, AutoEvalColumn.revision.name]
)
return filtered_df
def filter_models(
df: pd.DataFrame, type_query: list, size_query: list, precision_query: list, show_deleted: bool, show_merges: bool, show_moe:bool, show_flagged: bool
) -> pd.DataFrame:
# Show all models
if show_deleted:
filtered_df = df
else: # Show only still on the hub models
filtered_df = df[df[AutoEvalColumn.still_on_hub.name] == True]
if not show_merges:
filtered_df = filtered_df[filtered_df[AutoEvalColumn.merged.name] == False]
if not show_moe:
filtered_df = filtered_df[filtered_df[AutoEvalColumn.moe.name] == False]
if not show_flagged:
filtered_df = filtered_df[filtered_df[AutoEvalColumn.flagged.name] == False]
type_emoji = [t[0] for t in type_query]
filtered_df = filtered_df.loc[df[AutoEvalColumn.model_type_symbol.name].isin(type_emoji)]
filtered_df = filtered_df.loc[df[AutoEvalColumn.precision.name].isin(precision_query + ["None"])]
numeric_interval = pd.IntervalIndex(sorted([NUMERIC_INTERVALS[s] for s in size_query]))
params_column = pd.to_numeric(df[AutoEvalColumn.params.name], errors="coerce")
mask = params_column.apply(lambda x: any(numeric_interval.contains(x)))
filtered_df = filtered_df.loc[mask]
return filtered_df
leaderboard_df = filter_models(
df=leaderboard_df,
type_query=[t.to_str(" : ") for t in ModelType],
size_query=list(NUMERIC_INTERVALS.keys()),
precision_query=[i.value.name for i in Precision],
show_deleted=False,
show_merges=False,
show_moe=True,
show_flagged=False
)
import unicodedata
def is_valid_unicode(char):
try:
unicodedata.name(char)
return True # Valid Unicode character
except ValueError:
return False # Invalid Unicode character
def remove_invalid_unicode(input_string):
if isinstance(input_string, str):
valid_chars = [char for char in input_string if is_valid_unicode(char)]
return ''.join(valid_chars)
else:
return input_string # Return non-string values as is
dummy1 = gr.Textbox(visible=False)
hidden_leaderboard_table_for_search = gr.components.Dataframe(
headers=COLS,
datatype=TYPES,
visible=False,
line_breaks=False,
interactive=False
)
def display(x, y):
# Assuming df is your DataFrame
for column in leaderboard_df.columns:
if leaderboard_df[column].dtype == 'object':
leaderboard_df[column] = leaderboard_df[column].apply(remove_invalid_unicode)
subset_df = leaderboard_df[COLS]
# Ensure the output directory exists
#output_dir = 'output'
#if not os.path.exists(output_dir):
# os.makedirs(output_dir)
#
## Save JSON to a file in the output directory
#output_file_path = os.path.join(output_dir, 'output.json')
#with open(output_file_path, 'w') as file:
# file.write(subset_df.to_json(orient='records'))
#first_50_rows = subset_df.head(50)
#print(first_50_rows.to_string())
#json_data = first_50_rows.to_json(orient='records')
#print(json_data) # Print JSON representation
return subset_df
INTRODUCTION_TEXT = """
This is a copied space from Open Source LLM leaderboard. Instead of displaying
the results as table the space simply provides a gradio API interface to access
the full leaderboard data easily.
Example python on how to access the data:
```python
from gradio_client import Client
import json
client = Client("https://felixz-open-llm-leaderboard.hf.space/")
json_data = client.predict("","", api_name='/predict')
with open(json_data, 'r') as file:
file_data = file.read()
# Load the JSON data
data = json.loads(file_data)
# Get the headers and the data
headers = data['headers']
data = data['data']
```
"""
interface = gr.Interface(
fn=display,
inputs=[gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text"), dummy1],
outputs=[hidden_leaderboard_table_for_search]
)
interface.launch()