import fnmatch
import glob
import json
import logging
import os
import pprint

import gradio as gr
import gymnasium as gym
import numpy as np
import pandas as pd
import torch
from apscheduler.schedulers.background import BackgroundScheduler
from huggingface_hub import hf_hub_download, snapshot_download
from huggingface_hub.utils._errors import EntryNotFoundError

from src.css_html_js import dark_mode_gradio_js
from src.envs import API, RESULTS_PATH, RESULTS_REPO, TOKEN
from src.logging import configure_root_logger, setup_logger
logging.getLogger("openai").setLevel(logging.WARNING) | |
logger = setup_logger(__name__) | |
configure_root_logger() | |
logger = setup_logger(__name__) | |
pp = pprint.PrettyPrinter(width=80) | |
ALL_ENV_IDS = list(gym.registry.keys()) | |
def model_hyperlink(link, model_id):
    return f'<a target="_blank" href="{link}" style="color: var(--link-text-color); text-decoration: underline; text-decoration-style: dotted;">{model_id}</a>'


def make_clickable_model(model_id):
    link = f"https://huggingface.co/{model_id}"
    return model_hyperlink(link, model_id)

def pattern_match(patterns, source_list):
    """Return the sorted subset of `source_list` matching any glob pattern in `patterns`."""
    if isinstance(patterns, str):
        patterns = [patterns]

    env_ids = set()
    for pattern in patterns:
        for matching in fnmatch.filter(source_list, pattern):
            env_ids.add(matching)
    return sorted(env_ids)
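# Illustrative only (the tag values below are hypothetical): model tags usually mix an
# environment ID with generic labels, and only the glob matches against the Gymnasium
# registry survive, e.g.
#   pattern_match(["CartPole-v1", "reinforcement-learning"], ALL_ENV_IDS)  # -> ["CartPole-v1"]
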
def evaluate(model_id, revision):
    """Download, safety-check and evaluate an agent, returning per-environment results."""
    tags = API.model_info(model_id, revision=revision).tags
    # Extract the environment IDs from the tags (usually only one)
    env_ids = pattern_match(tags, ALL_ENV_IDS)
    logger.info(f"Selected environments: {env_ids}")

    results = {}

    # Check that the agent file exists
    try:
        agent_path = hf_hub_download(repo_id=model_id, filename="agent.pt")
    except EntryNotFoundError:
        logger.error("Agent not found")
        return None

    # Check the file's safety scan before loading it
    security = next(iter(API.get_paths_info(model_id, "agent.pt", expand=True))).security
    if security is None or "safe" not in security:
        logger.error("Agent safety not available")
        return None
    elif not security["safe"]:
        logger.error("Agent not safe")
        return None

    # Load the agent
    try:
        agent = torch.jit.load(agent_path)
    except Exception as e:
        logger.error(f"Error loading agent: {e}")
        return None

    # Evaluate the agent on the selected environments
    for env_id in env_ids:
        episodic_rewards = []
        env = gym.make(env_id)
        for _ in range(10):
            episodic_reward = 0.0
            observation, info = env.reset()
            done = False
            while not done:
                torch_observation = torch.from_numpy(np.array([observation]))
                action = agent(torch_observation).numpy()[0]
                observation, reward, terminated, truncated, info = env.step(action)
                done = terminated or truncated
                episodic_reward += reward

            episodic_rewards.append(episodic_reward)
        env.close()

        # Cast to float so the report stays JSON-serializable
        mean_reward = float(np.mean(episodic_rewards))
        std_reward = float(np.std(episodic_rewards))
        results[env_id] = {"episodic_return_mean": mean_reward, "episodic_return_std": std_reward}

    return results
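# For reference, `evaluate` returns a mapping like the following (values illustrative),
# or None when the agent is missing, unsafe, or fails to load:
#   {"CartPole-v1": {"episodic_return_mean": 500.0, "episodic_return_std": 0.0}}
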
def _backend_routine():
    # List the reinforcement learning models on the Hub
    rl_models = list(API.list_models(filter="reinforcement-learning"))
    logger.info(f"Found {len(rl_models)} RL models")

    # Keep only the models that ship a TorchScript agent
    compatible_models = []
    for model in rl_models:
        filenames = [sib.rfilename for sib in model.siblings]
        if "agent.pt" in filenames:
            compatible_models.append((model.modelId, model.sha))
    logger.info(f"Found {len(compatible_models)} compatible models")

    # Get the existing results
    snapshot_download(
        repo_id=RESULTS_REPO,
        revision="main",
        local_dir=RESULTS_PATH,
        repo_type="dataset",
        max_workers=60,
        token=TOKEN,
    )
    json_files = glob.glob(f"{RESULTS_PATH}/**/*.json", recursive=True)

    evaluated_models = set()
    for json_filepath in json_files:
        with open(json_filepath) as fp:
            data = json.load(fp)
        evaluated_models.add((data["config"]["model_id"], data["config"]["model_sha"]))

    # Find the models that are not associated with any results
    pending_models = set(compatible_models) - evaluated_models
    logger.info(f"Found {len(pending_models)} pending models")

    # Run an evaluation on the pending models
    for model_id, sha in pending_models:
        logger.info(f"Running evaluation on {model_id}")
        report = {"config": {"model_id": model_id, "model_sha": sha}}
        try:
            evaluations = evaluate(model_id, revision=sha)
        except Exception as e:
            logger.error(f"Error evaluating {model_id}: {e}")
            evaluations = None

        if evaluations is not None:
            report["results"] = evaluations
            report["status"] = "DONE"
        else:
            report["status"] = "FAILED"

        # Write the report locally
        dumped = json.dumps(report, indent=2)
        output_path = os.path.join(RESULTS_PATH, model_id, f"results_{sha}.json")
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        with open(output_path, "w") as f:
            f.write(dumped)

        # Upload the report to the results repo
        API.upload_file(
            path_or_fileobj=output_path,
            path_in_repo=f"{model_id}/results_{sha}.json",
            repo_id=RESULTS_REPO,
            repo_type="dataset",
        )

def backend_routine():
    try:
        _backend_routine()
    except Exception as e:
        logger.error(f"{e.__class__.__name__}: {str(e)}")
def get_leaderboard_df():
    snapshot_download(
        repo_id=RESULTS_REPO,
        revision="main",
        local_dir=RESULTS_PATH,
        repo_type="dataset",
        max_workers=60,
        token=TOKEN,
    )
    json_files = glob.glob(f"{RESULTS_PATH}/**/*.json", recursive=True)

    data = []
    for json_filepath in json_files:
        with open(json_filepath) as fp:
            report = json.load(fp)
        model_id = report["config"]["model_id"]
        row = {"Agent": model_id, "Status": report["status"]}
        if report["status"] == "DONE":
            results = {env_id: result["episodic_return_mean"] for env_id, result in report["results"].items()}
            row.update(results)
        data.append(row)

    # Create the DataFrame and replace NaN values with empty strings
    df = pd.DataFrame(data)
    df = df.fillna("")
    return df

TITLE = """ | |
🚀 Open RL Leaderboard | |
""" | |
INTRODUCTION_TEXT = """ | |
Welcome to the Open RL Leaderboard! This is a community-driven benchmark for reinforcement learning models. | |
""" | |
ABOUT_TEXT = """ | |
The Open RL Leaderboard is a community-driven benchmark for reinforcement learning models. | |
""" | |
def select_column(column_names, data):
    """Keep the "Agent" column plus the selected environment columns, dropping empty rows."""
    column_names = [col for col in column_names if col in data.columns]
    column_names = ["Agent"] + column_names  # add the model name column
    df = data[column_names]

    def check_row(row):  # keep rows with at least one non-empty result
        return not (row.drop("Agent") == "").all()

    mask = df.apply(check_row, axis=1)
    df = df[mask]
    return df
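# Illustrative call (the data and columns below are hypothetical): given a leaderboard
# dataframe with columns ["Agent", "Status", "CartPole-v1", "Hopper-v4"],
#   select_column(["CartPole-v1"], df)
# keeps only ["Agent", "CartPole-v1"] and drops agents with no CartPole-v1 result.
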
with gr.Blocks(js=dark_mode_gradio_js) as demo:
    gr.HTML(TITLE)
    gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text")

    with gr.Tabs(elem_classes="tab-buttons") as tabs:
        with gr.TabItem("🏅 Leaderboard", elem_id="llm-benchmark-tab-table", id=0):
            hidden_df = gr.components.Dataframe(get_leaderboard_df, visible=False, every=60)  # hidden dataframe

            env_checkboxes = gr.components.CheckboxGroup(
                label="Environments",
                choices=ALL_ENV_IDS,
                value=[ALL_ENV_IDS[0]],
                interactive=True,
            )
            leaderboard = gr.components.Dataframe(select_column([ALL_ENV_IDS[0]], get_leaderboard_df()))

            # Events
            env_checkboxes.change(select_column, [env_checkboxes, hidden_df], leaderboard)

            # Update hidden dataframe
            # hidden_df.change(get_leaderboard_df, [], hidden_df, every=10)

        with gr.TabItem("📝 About", elem_id="llm-benchmark-tab-table", id=2):
            gr.Markdown(ABOUT_TEXT)

scheduler = BackgroundScheduler()
scheduler.add_job(func=backend_routine, trigger="interval", seconds=60)
scheduler.start()

if __name__ == "__main__":
    demo.queue().launch()  # server_name="0.0.0.0", show_error=True, server_port=7860