"""Gradio app that rebuilds and displays the HIVEX leaderboard.

At import time the script:

1. downloads the leaderboard dataset repo, regenerates one CSV per HIVEX
   environment from the model cards of all models tagged with that
   environment, and uploads the folder back to the dataset repo;
2. builds a Gradio UI with one tab per environment and one sub-tab per task;
3. schedules a Space restart every 24 hours and launches the UI.
"""

import os
import json
import requests

import gradio as gr
import pandas as pd

from huggingface_hub import HfApi, hf_hub_download, snapshot_download
from huggingface_hub.repocard import metadata_load
from apscheduler.schedulers.background import BackgroundScheduler

from tqdm.contrib.concurrent import thread_map

from utils import make_clickable_model, make_clickable_user

DATASET_REPO_URL = (
    "https://huggingface.co/datasets/hivex-research/hivex-leaderboard-data"
)
DATASET_REPO_ID = "hivex-research/hivex-leaderboard-data"
HF_TOKEN = os.environ.get("HF_TOKEN")

block = gr.Blocks()
api = HfApi(token=HF_TOKEN)

# One entry per leaderboard tab: display title, the Hub tag used to find the
# models (also the CSV file stem), and the number of task sub-tabs to render.
hivex_envs = [
    {
        "title": "Wind Farm Control",
        "hivex_env": "hivex-wind-farm-control",
        "task_count": 2,
    },
    {
        "title": "Wildfire Resource Management",
        "hivex_env": "hivex-wildfire-resource-management",
        "task_count": 3,
    },
    {
        "title": "Drone-Based Reforestation",
        "hivex_env": "hivex-drone-based-reforestation",
        "task_count": 7,
    },
    {
        "title": "Ocean Plastic Collection",
        "hivex_env": "hivex-ocean-plastic-collection",
        "task_count": 4,
    },
    {
        "title": "Aerial Wildfire Suppression",
        "hivex_env": "hivex-aerial-wildfire-suppression",
        "task_count": 9,
    },
]


def restart():
    """Log a marker line and ask the Hub to restart the leaderboard Space."""
    print("RESTART")
    api.restart_space(repo_id="hivex-research/hivex-leaderboard")


def download_leaderboard_dataset():
    """Download a snapshot of the leaderboard dataset repo.

    :return: local path of the downloaded snapshot folder
    """
    return snapshot_download(repo_id=DATASET_REPO_ID, repo_type="dataset")


def get_model_ids(hivex_env):
    """List the ids of all Hub models matching the ``hivex_env`` filter.

    :param hivex_env: tag passed as ``filter`` to ``HfApi.list_models``
    :return: list of model ids
    """
    # NOTE: deliberately a fresh, token-less client (as in the original code),
    # not the module-level authenticated ``api``.
    hub = HfApi()
    return [model.modelId for model in hub.list_models(filter=hivex_env)]


def get_metadata(model_id):
    """Load the model-card metadata from a model's README.md.

    :param model_id: Hub repo id of the model
    :return: whatever ``metadata_load`` yields for the README, or ``None``
        when the download fails with an HTTP error
    """
    try:
        readme_path = hf_hub_download(
            model_id, filename="README.md", etag_timeout=180
        )
        return metadata_load(readme_path)
    except requests.exceptions.HTTPError:
        # 404 README.md not found
        return None


# --- Legacy helpers, kept disabled for reference ---------------------------
# def parse_metrics_accuracy(meta):
#     if "model-index" not in meta:
#         return None
#     result = meta["model-index"][0]["results"]
#     metrics = result[0]["metrics"]
#     accuracy = metrics[0]["value"]
#     return accuracy


# def parse_rewards(accuracy):
#     default_std = -1000
#     default_reward = -1000
#     if accuracy != None:
#         accuracy = str(accuracy)
#         parsed = accuracy.split("+/-")
#         if len(parsed) > 1:
#             mean_reward = float(parsed[0].strip())
#             std_reward = float(parsed[1].strip())
#         elif len(parsed) == 1:  # only mean reward
#             mean_reward = float(parsed[0].strip())
#             std_reward = float(0)
#         else:
#             mean_reward = float(default_std)
#             std_reward = float(default_reward)
#     else:
#         mean_reward = float(default_std)
#         std_reward = float(default_reward)
#     return mean_reward, std_reward


# def rank_dataframe(dataframe):
#     dataframe = dataframe.sort_values(
#         by=["Cumulative Reward", "User", "Model"], ascending=False
#     )
#     if not "Ranking" in dataframe.columns:
#         dataframe.insert(0, "Ranking", [i for i in range(1, len(dataframe) + 1)])
#     else:
#         dataframe["Ranking"] = [i for i in range(1, len(dataframe) + 1)]
#     return dataframe


def _parse_metric_mean(value):
    """Return the mean part of a ``"mean +/- std"`` metric value as a float.

    ``value`` is coerced to ``str`` first so that metrics stored as plain
    numbers in the model card are accepted as well.
    """
    return float(str(value).split("+/-")[0].strip())


def update_leaderboard_dataset_parallel(hivex_env, path):
    """Rebuild ``<path>/<hivex_env>.csv`` from the tagged models' cards.

    Each model contributes one row with ``User``, ``Model``, ``Task`` and one
    column per metric listed in the first result of its ``model-index``.
    Models without metadata, or whose metadata does not have the expected
    layout, are skipped.

    :param hivex_env: environment tag used to find models; also the CSV stem
    :param path: folder the CSV is written into
    :return: the DataFrame that was written to disk
    """
    # Get model ids associated with hivex_env
    model_ids = get_model_ids(hivex_env)

    def process_model(model_id):
        meta = get_metadata(model_id)
        # LOADED_MODEL_METADATA[model_id] = meta if meta is not None else ''
        if meta is None:
            return None
        try:
            results = meta["model-index"][0]["results"][0]
            row = {
                "User": model_id.split("/")[0],
                "Model": model_id,
                "Task": results["task"]["task-id"],
            }
            for metric in results["metrics"]:
                row[metric["name"]] = _parse_metric_mean(metric["value"])
        except (KeyError, IndexError, TypeError, ValueError) as err:
            # A single malformed model card must not abort the whole update.
            print(f"Skipping {model_id}: unexpected model card metadata ({err!r})")
            return None
        return row

    rows = [
        row
        for row in thread_map(process_model, model_ids, desc="Processing models")
        # Filter out None results (models with no usable metadata)
        if row is not None
    ]

    # ranked_dataframe = rank_dataframe(pd.DataFrame.from_records(data))
    # new_history = ranked_dataframe
    new_history = pd.DataFrame.from_records(rows)
    new_history.to_csv(os.path.join(path, hivex_env + ".csv"), index=False)

    # The ranking step above is disabled, so `ranked_dataframe` no longer
    # exists; return the frame that was actually built and written.
    return new_history


def run_update_dataset():
    """Regenerate every environment CSV and upload the folder to the Hub."""
    path_ = download_leaderboard_dataset()
    for hivex_env in hivex_envs:
        update_leaderboard_dataset_parallel(hivex_env["hivex_env"], path_)

    api.upload_folder(
        folder_path=path_,
        repo_id=DATASET_REPO_ID,
        repo_type="dataset",
        commit_message="Update dataset",
    )


def get_data(rl_env, task, path) -> pd.DataFrame:
    """
    Get data from rl_env, filter by the given task, and drop the Task column.

    The User and Model columns are converted with ``make_clickable_user`` /
    ``make_clickable_model``. If the CSV is empty or has no ``Task`` column
    (e.g. no model was found for the environment), an empty frame with the
    ``User`` and ``Model`` columns is returned instead of raising.

    :param rl_env: environment name, used as the CSV file stem
    :param task: value the ``Task`` column must equal
    :param path: folder containing the CSV
    :return: filtered data as a pandas DataFrame without the Task column
    """
    csv_path = os.path.join(path, rl_env + ".csv")
    try:
        data = pd.read_csv(csv_path)
    except pd.errors.EmptyDataError:
        data = pd.DataFrame()

    if "Task" not in data.columns:
        return pd.DataFrame(columns=["User", "Model"])

    # Keep only the rows of the requested task, then drop the "Task" column
    filtered_data = data[data["Task"] == task].drop(columns=["Task"])

    # Convert User and Model columns to clickable links
    filtered_data["User"] = filtered_data["User"].map(make_clickable_user)
    filtered_data["Model"] = filtered_data["Model"].map(make_clickable_model)

    return filtered_data


run_update_dataset()

with block:
    with gr.Row(elem_id="header-row"):
        # TITLE + "Total models: " + str(len(HARD_LEADERBOARD_DF)) + ""
        # NOTE(review): the HTML markup of this header (and of the disabled
        # TITLE line above) appears to have been lost; only the visible text
        # is reproduced here. Restore the intended tags if known.
        gr.HTML("\n\nHIVEX-Leaderboard\n\n")

    path_ = download_leaderboard_dataset()
    # gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text")

    # ENVIRONMENT TABS
    with gr.Tabs(elem_classes="tab-buttons"):
        for hivex_env in hivex_envs:
            with gr.Tab(hivex_env["title"]):
                # TASK TABS
                # NOTE(review): tabs are labelled and filtered with the
                # 0-based index j — confirm this matches the "task-id"
                # values stored in the model cards.
                for j in range(hivex_env["task_count"]):
                    with gr.TabItem(f"Task {j}"):
                        with gr.Row():
                            gr.components.Dataframe(
                                value=get_data(hivex_env["hivex_env"], j, path_),
                                headers=["User", "Model"],
                                datatype=["markdown", "markdown"],
                                row_count=(100, 'fixed'),
                            )

# Restart the Space once a day (86400 s).
scheduler = BackgroundScheduler()
scheduler.add_job(restart, "interval", seconds=86400)
scheduler.start()

block.launch()