import gradio as gr import json import os from datetime import datetime, timezone import pandas as pd from huggingface_hub import snapshot_download from src.display.about import ( CITATION_BUTTON_LABEL, CITATION_BUTTON_TEXT, EVALUATION_QUEUE_TEXT, INTRODUCTION_TEXT, LLM_BENCHMARKS_TEXT, FAQ_TEXT, TITLE, ) from src.display.css_html_js import custom_css from src.display.utils import ( BENCHMARK_COLS, COLS, EVAL_COLS, EVAL_TYPES, NUMERIC_INTERVALS, TYPES, AutoEvalColumn, ModelType, fields, WeightType, Precision ) from src.envs import API, EVAL_REQUESTS_PATH, DYNAMIC_INFO_REPO, DYNAMIC_INFO_FILE_PATH, DYNAMIC_INFO_PATH, EVAL_RESULTS_PATH, H4_TOKEN, IS_PUBLIC, QUEUE_REPO, REPO_ID, RESULTS_REPO from src.populate import get_evaluation_queue_df, get_leaderboard_df from src.submission.submit import add_new_eval from src.tools.collections import update_collections from src.tools.plots import ( create_metric_plot_obj, create_plot_df, create_scores_df, ) def restart_space(): API.restart_space(repo_id=REPO_ID, token=H4_TOKEN) def init_space(): try: print(EVAL_REQUESTS_PATH) snapshot_download( repo_id=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30 ) except Exception: restart_space() try: print(DYNAMIC_INFO_PATH) snapshot_download( repo_id=DYNAMIC_INFO_REPO, local_dir=DYNAMIC_INFO_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30 ) except Exception: restart_space() try: print(EVAL_RESULTS_PATH) snapshot_download( repo_id=RESULTS_REPO, local_dir=EVAL_RESULTS_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30 ) except Exception: restart_space() raw_data, original_df = get_leaderboard_df( results_path=EVAL_RESULTS_PATH, requests_path=EVAL_REQUESTS_PATH, dynamic_path=DYNAMIC_INFO_FILE_PATH, cols=COLS, benchmark_cols=BENCHMARK_COLS ) update_collections(original_df.copy()) leaderboard_df = original_df.copy() plot_df = create_plot_df(create_scores_df(raw_data)) ( finished_eval_queue_df, running_eval_queue_df, pending_eval_queue_df, ) = get_evaluation_queue_df(EVAL_REQUESTS_PATH, EVAL_COLS) return leaderboard_df, original_df, plot_df, finished_eval_queue_df, running_eval_queue_df, pending_eval_queue_df leaderboard_df, original_df, plot_df, finished_eval_queue_df, running_eval_queue_df, pending_eval_queue_df = init_space() # Searching and filtering def update_table( hidden_df: pd.DataFrame, columns: list, type_query: list, precision_query: str, size_query: list, show_deleted: bool, show_merges: bool, show_moe: bool, show_flagged: bool, query: str, ): filtered_df = filter_models(hidden_df, type_query, size_query, precision_query, show_deleted, show_merges, show_moe, show_flagged) filtered_df = filter_queries(query, filtered_df) df = select_columns(filtered_df, columns) return df def load_query(request: gr.Request): # triggered only once at startup => read query parameter if it exists query = request.query_params.get("query") or "" return query, query # return one for the "search_bar", one for a hidden component that triggers a reload only if value has changed def search_table(df: pd.DataFrame, query: str) -> pd.DataFrame: return df[(df[AutoEvalColumn.dummy.name].str.contains(query, case=False))] def select_columns(df: pd.DataFrame, columns: list) -> pd.DataFrame: always_here_cols = [c.name for c in fields(AutoEvalColumn) if c.never_hidden] dummy_col = [AutoEvalColumn.dummy.name] #AutoEvalColumn.model_type_symbol.name, #AutoEvalColumn.model.name, # We use COLS to maintain sorting filtered_df = df[ always_here_cols + [c for c in COLS if c in df.columns and c in columns] + dummy_col ] return filtered_df def filter_queries(query: str, filtered_df: pd.DataFrame): """Added by Abishek""" final_df = [] if query != "": queries = [q.strip() for q in query.split(";")] for _q in queries: _q = _q.strip() if _q != "": temp_filtered_df = search_table(filtered_df, _q) if len(temp_filtered_df) > 0: final_df.append(temp_filtered_df) if len(final_df) > 0: filtered_df = pd.concat(final_df) filtered_df = filtered_df.drop_duplicates( subset=[AutoEvalColumn.model.name, AutoEvalColumn.precision.name, AutoEvalColumn.revision.name] ) return filtered_df def filter_models( df: pd.DataFrame, type_query: list, size_query: list, precision_query: list, show_deleted: bool, show_merges: bool, show_moe:bool, show_flagged: bool ) -> pd.DataFrame: # Show all models if show_deleted: filtered_df = df else: # Show only still on the hub models filtered_df = df[df[AutoEvalColumn.still_on_hub.name] == True] if not show_merges: filtered_df = filtered_df[filtered_df[AutoEvalColumn.merged.name] == False] if not show_moe: filtered_df = filtered_df[filtered_df[AutoEvalColumn.moe.name] == False] if not show_flagged: filtered_df = filtered_df[filtered_df[AutoEvalColumn.flagged.name] == False] type_emoji = [t[0] for t in type_query] filtered_df = filtered_df.loc[df[AutoEvalColumn.model_type_symbol.name].isin(type_emoji)] filtered_df = filtered_df.loc[df[AutoEvalColumn.precision.name].isin(precision_query + ["None"])] numeric_interval = pd.IntervalIndex(sorted([NUMERIC_INTERVALS[s] for s in size_query])) params_column = pd.to_numeric(df[AutoEvalColumn.params.name], errors="coerce") mask = params_column.apply(lambda x: any(numeric_interval.contains(x))) filtered_df = filtered_df.loc[mask] return filtered_df leaderboard_df = filter_models( df=leaderboard_df, type_query=[t.to_str(" : ") for t in ModelType], size_query=list(NUMERIC_INTERVALS.keys()), precision_query=[i.value.name for i in Precision], show_deleted=False, show_merges=False, show_moe=True, show_flagged=False ) import unicodedata def is_valid_unicode(char): try: unicodedata.name(char) return True # Valid Unicode character except ValueError: return False # Invalid Unicode character def remove_invalid_unicode(input_string): if isinstance(input_string, str): valid_chars = [char for char in input_string if is_valid_unicode(char)] return ''.join(valid_chars) else: return input_string # Return non-string values as is dummy1 = gr.Textbox(visible=False) hidden_leaderboard_table_for_search = gr.components.Dataframe( headers=COLS, datatype=TYPES, visible=False, line_breaks=False, interactive=False ) def display(x, y): # Assuming df is your DataFrame for column in leaderboard_df.columns: if leaderboard_df[column].dtype == 'object': leaderboard_df[column] = leaderboard_df[column].apply(remove_invalid_unicode) subset_df = leaderboard_df[COLS] # Ensure the output directory exists #output_dir = 'output' #if not os.path.exists(output_dir): # os.makedirs(output_dir) # ## Save JSON to a file in the output directory #output_file_path = os.path.join(output_dir, 'output.json') #with open(output_file_path, 'w') as file: # file.write(subset_df.to_json(orient='records')) #first_50_rows = subset_df.head(50) #print(first_50_rows.to_string()) #json_data = first_50_rows.to_json(orient='records') #print(json_data) # Print JSON representation return subset_df INTRODUCTION_TEXT = """ This is a copied space from Open Source LLM leaderboard. Instead of displaying the results as table the space simply provides a gradio API interface to access the full leaderboard data easily. Example python on how to access the data: ```python from gradio_client import Client import json client = Client("https://felixz-open-llm-leaderboard.hf.space/") json_data = client.predict("","", api_name='/predict') with open(json_data, 'r') as file: file_data = file.read() # Load the JSON data data = json.loads(file_data) # Get the headers and the data headers = data['headers'] data = data['data'] ``` """ interface = gr.Interface( fn=display, inputs=[gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text"), dummy1], outputs=[hidden_leaderboard_table_for_search] ) interface.launch()