"""HIVEX leaderboard Space.

On start-up the script rebuilds one CSV per HIVEX environment from the model
cards published on the Hugging Face Hub, pushes the CSVs to the leaderboard
dataset repository, and serves them through a Gradio UI. A background job
restarts the Space once a day so the data is refreshed.
"""

import json
import os

import gradio as gr
import pandas as pd
import requests
from apscheduler.schedulers.background import BackgroundScheduler
from huggingface_hub import HfApi, hf_hub_download, snapshot_download
from huggingface_hub.repocard import metadata_load
from tqdm.contrib.concurrent import thread_map

from utils import make_clickable_model, make_clickable_user

DATASET_REPO_URL = (
    "https://huggingface.co/datasets/hivex-research/hivex-leaderboard-data"
)
DATASET_REPO_ID = "hivex-research/hivex-leaderboard-data"
SPACE_REPO_ID = "hivex-research/hivex-leaderboard"
HF_TOKEN = os.environ.get("HF_TOKEN")

# Columns used to order the leaderboard, most significant first.
RANKING_SORT_COLUMNS = ("Cumulative Reward", "User", "Model")

# Not used by the UI below; kept so the module-level name remains available.
block = gr.Blocks()
api = HfApi(token=HF_TOKEN)

# Each entry names the Hub tag of an environment and how many tasks it has.
hivex_envs = [
    {
        "hivex_env": "hivex-wind-farm-control",
        "task_count": 2,
    },
    {
        "hivex_env": "hivex-wildfire-resource-management",
        "task_count": 3,
    },
    {
        "hivex_env": "hivex-drone-based-reforestation",
        "task_count": 7,
    },
    {
        "hivex_env": "hivex-ocean-plastic-collection",
        "task_count": 4,
    },
    {
        "hivex_env": "hivex-aerial-wildfire-suppression",
        "task_count": 9,
    },
]


def restart():
    """Restart the leaderboard Space (scheduled daily to refresh the data)."""
    print("RESTART")
    api.restart_space(repo_id=SPACE_REPO_ID)


def download_leaderboard_dataset():
    """Download a snapshot of the leaderboard dataset and return its local path."""
    return snapshot_download(repo_id=DATASET_REPO_ID, repo_type="dataset")


def get_model_ids(hivex_env):
    """Return the ids of all Hub models matching the filter ``hivex_env``."""
    # A fresh, unauthenticated client is used here, as in the original code;
    # it is named differently so it does not shadow the module-level ``api``.
    hub = HfApi()
    return [model.modelId for model in hub.list_models(filter=hivex_env)]


def get_metadata(model_id):
    """Return the metadata parsed from the model's README.md.

    Returns ``None`` when downloading the README raises an HTTP error
    (e.g. 404 because the file does not exist).
    """
    try:
        readme_path = hf_hub_download(
            model_id, filename="README.md", etag_timeout=180
        )
    except requests.exceptions.HTTPError:
        # 404 README.md not found
        return None
    return metadata_load(readme_path)


def rank_dataframe(dataframe):
    """Sort ``dataframe`` for the leaderboard and (re)number a "Ranking" column.

    Rows are ordered descending by "Cumulative Reward", then "User", then
    "Model". Sort columns that are absent (for instance when no model was
    found and the frame is empty) are ignored instead of raising ``KeyError``.

    :return: a new DataFrame with "Ranking" holding 1..len(dataframe)
    """
    sort_columns = [
        column for column in RANKING_SORT_COLUMNS if column in dataframe.columns
    ]
    if sort_columns:
        dataframe = dataframe.sort_values(by=sort_columns, ascending=False)
    else:
        # sort_values would have returned a copy; keep the caller's frame intact.
        dataframe = dataframe.copy()

    # A plain list is assigned positionally, so the shuffled index left behind
    # by sort_values does not affect the numbering.
    ranking = list(range(1, len(dataframe) + 1))
    if "Ranking" not in dataframe.columns:
        dataframe.insert(0, "Ranking", ranking)
    else:
        dataframe["Ranking"] = ranking
    return dataframe


def _parse_metric_value(value):
    """Return the leading number of a ``"mean +/- std"`` metric as a float.

    ``value`` may be a string or already numeric. Raises ``ValueError`` when
    the leading part is not a number.
    """
    return float(str(value).split("+/-")[0].strip())


def _process_model(model_id):
    """Build one leaderboard row for ``model_id``.

    Returns ``None`` when the model has no usable metadata, so that a single
    malformed model card cannot abort the whole leaderboard update.
    """
    meta = get_metadata(model_id)
    if meta is None:
        return None

    try:
        metrics = meta["model-index"][0]["results"][0]["metrics"]
    except (KeyError, IndexError, TypeError):
        print(f"Skipping {model_id}: no metrics in model card metadata")
        return None

    row = {"User": model_id.split("/")[0], "Model": model_id}
    for metric in metrics:
        try:
            row[metric["name"]] = _parse_metric_value(metric["value"])
        except (KeyError, TypeError, ValueError):
            print(f"Skipping unparseable metric for {model_id}: {metric!r}")
    return row


def update_leaderboard_dataset_parallel(hivex_env, path):
    """Rebuild the leaderboard CSV of one environment.

    Collects the metrics of every model tagged ``hivex_env`` (fetched in
    parallel threads), ranks them and writes ``<path>/<hivex_env>.csv``.

    :return: the ranked DataFrame that was written
    """
    model_ids = get_model_ids(hivex_env)
    rows = thread_map(_process_model, model_ids, desc="Processing models")
    # Drop models for which no row could be built.
    rows = [row for row in rows if row is not None]

    ranked_dataframe = rank_dataframe(pd.DataFrame.from_records(rows))
    ranked_dataframe.to_csv(os.path.join(path, f"{hivex_env}.csv"), index=False)
    return ranked_dataframe


def run_update_dataset():
    """Refresh every environment's CSV and upload the folder to the dataset repo.

    :return: local path of the dataset snapshot holding the refreshed CSVs
    """
    path_ = download_leaderboard_dataset()
    for hivex_env in hivex_envs:
        update_leaderboard_dataset_parallel(hivex_env["hivex_env"], path_)

    api.upload_folder(
        folder_path=path_,
        repo_id=DATASET_REPO_ID,
        repo_type="dataset",
        commit_message="Update dataset",
    )
    return path_


def get_data(rl_env, path) -> pd.DataFrame:
    """
    Get data from rl_env with "User" and "Model" rendered as clickable links
    :return: data as a pandas DataFrame
    """
    data = get_data_no_html(rl_env, path)
    # The columns are absent when the CSV was written for an empty leaderboard.
    if "User" in data.columns:
        data["User"] = data["User"].map(make_clickable_user)
    if "Model" in data.columns:
        data["Model"] = data["Model"].map(make_clickable_model)
    return data


def get_data_no_html(rl_env, path) -> pd.DataFrame:
    """
    Get data from rl_env exactly as stored in its CSV file
    :return: data as a pandas DataFrame
    """
    return pd.read_csv(os.path.join(path, f"{rl_env}.csv"))


# Refresh the dataset on start-up and remember where the CSVs live; the UI
# below reads them from this path.
path_ = run_update_dataset()

main_block = gr.Blocks()
with main_block:
    with gr.Row(elem_id="header-row"):
        # NOTE(review): the HTML markup of this title appears to have been
        # stripped from the source (only the text survived); restore the
        # intended heading tag if it is known.
        gr.HTML("\n\nHIVEX-Leaderboard\n\n")

    # NOTE(review): nesting of the tabs relative to the header row could not
    # be recovered from the source; they are placed as a sibling of the row.
    with gr.Tabs(elem_classes="tab-buttons") as tabs:
        for hivex_env in hivex_envs:
            with gr.Tab(hivex_env["hivex_env"]) as tab:
                # Read the CSV once per environment rather than once per task.
                env_data = get_data(hivex_env["hivex_env"], path_)
                # NOTE(review): every task slot currently shows the same
                # environment-level table; per-task filtering is not
                # implemented.
                for _ in range(hivex_env["task_count"]):
                    with gr.Row():
                        gr_dataframe = gr.components.Dataframe(
                            value=env_data,
                            headers=["User", "Model", "Cumulative Reward"],
                            datatype=["markdown", "markdown", "number"],
                            row_count=(100, "fixed"),
                        )

# Restart the Space every 24 hours so the leaderboard is rebuilt.
scheduler = BackgroundScheduler()
scheduler.add_job(restart, "interval", seconds=86400)
scheduler.start()

# Launch the Blocks that actually contains the leaderboard UI.
main_block.launch()