import argparse
import functools
import traceback
import gradio as gr
import bittensor as bt
from typing import Dict, List, Any, Optional, Tuple
from bittensor.extrinsics.serving import get_metadata
from dataclasses import dataclass
import wandb
import math
import os
import datetime
import time
import json
import pandas as pd
from dotenv import load_dotenv
from huggingface_hub import HfApi
from apscheduler.schedulers.background import BackgroundScheduler

load_dotenv()

FONT = (
    """<link href="https://fonts.cdnfonts.com/css/jmh-typewriter" rel="stylesheet">"""
)
TITLE = """<h1 align="center" id="space-title" class="typewriter">Subnet 9 Leaderboard</h1>"""
HEADER = """<h2 align="center" class="typewriter"><a href="https://github.com/RaoFoundation/pretraining" target="_blank">Subnet 9</a> is a <a href="https://bittensor.com/" target="_blank">Bittensor</a> subnet that rewards miners for producing pretrained foundation models on the <a href="https://huggingface.co/datasets/tiiuae/falcon-refinedweb" target="_blank">Falcon Refined Web dataset</a>. It acts like a continuous benchmark whereby miners are rewarded for attaining the best losses on randomly sampled pages of Falcon.<br/>The models with the best head-to-head loss on the evaluation data receive a steady emission of TAO.</h2>"""
EVALUATION_DETAILS = """<ul><li><b>Name:</b> the 🤗 Hugging Face model name (click to go to the model card)</li><li><b>Rewards / Day:</b> the expected rewards per day based on current ranking.</li><li><b>Last Average Loss:</b> the last loss value on the evaluation data for the model as calculated by a validator (lower is better)</li><li><b>UID:</b> the Bittensor UID of the miner</li><li><b>Block:</b> the Bittensor block that the model was submitted in</li></ul><br/>More stats on <a href="https://taostats.io/subnets/netuid-9/" target="_blank">taostats</a>."""
EVALUATION_HEADER = """<h3 align="center">Shows the latest internal evaluation statistics as calculated by the Opentensor validator</h3>"""
VALIDATOR_WANDB_PROJECT = "opentensor-dev/pretraining-subnet"
BENCHMARK_WANDB_PROJECT = "raofoundation/pretraining-leaderboard-data"

H4_TOKEN = os.environ.get("H4_TOKEN", None)
API = HfApi(token=H4_TOKEN)
WANDB_TOKEN = os.environ.get("WANDB_API_KEY", None)
SUBTENSOR_ENDPOINT = os.environ.get("SUBTENSOR_ENDPOINT", None)
REPO_ID = "RaoFoundation/pretraining-leaderboard"
MAX_AVG_LOSS_POINTS = 1
RETRIES = 5
DELAY_SECS = 3
NETUID = 9  # The netuid of the pretraining subnet.
SECONDS_PER_BLOCK = 12  # Bittensor targets one block every 12 seconds.

@dataclass
class ModelData:
    uid: int
    hotkey: str
    namespace: str
    name: str
    commit: Optional[str]
    hash: Optional[str]
    block: int
    incentive: float
    emission: float

    @classmethod
    def from_compressed_str(
        cls,
        uid: int,
        hotkey: str,
        cs: str,
        block: int,
        incentive: float,
        emission: float,
    ):
        """Returns an instance of this class from a compressed string representation."""
        tokens = cs.split(":")
        return ModelData(
            uid=uid,
            hotkey=hotkey,
            namespace=tokens[0],
            name=tokens[1],
            commit=tokens[2] if tokens[2] != "None" else None,
            hash=tokens[3] if tokens[3] != "None" else None,
            block=block,
            incentive=incentive,
            emission=emission,
        )

def run_with_retries(func, *args, **kwargs):
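    """Calls func, retrying up to RETRIES times with DELAY_SECS between attempts."""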
    for i in range(RETRIES):
        try:
            return func(*args, **kwargs)
        except Exception:
            if i == RETRIES - 1:
                raise
            time.sleep(DELAY_SECS)
    raise RuntimeError("Should never happen")

def get_subtensor_and_metagraph() -> Tuple[bt.subtensor, bt.metagraph]:
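    """Returns a subtensor handle and the subnet's metagraph, retrying on failure."""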
    def _internal() -> Tuple[bt.subtensor, bt.metagraph]:
        if SUBTENSOR_ENDPOINT:
            parser = argparse.ArgumentParser()
            bt.subtensor.add_args(parser)
            subtensor = bt.subtensor(
                config=bt.config(
                    parser=parser,
                    args=["--subtensor.chain_endpoint", SUBTENSOR_ENDPOINT],
                )
            )
        else:
            subtensor = bt.subtensor("finney")
        metagraph = subtensor.metagraph(NETUID, lite=False)
        return subtensor, metagraph

    return run_with_retries(_internal)

def get_validator_weights(
    metagraph: bt.metagraph,
) -> Dict[int, Tuple[float, int, Dict[int, float]]]:
    """Returns a dictionary of validator UIDs to (vtrust, stake, {uid: weight})."""
    ret = {}
    for uid in metagraph.uids.tolist():
        vtrust = metagraph.validator_trust[uid].item()
        if vtrust > 0:
            ret[uid] = (vtrust, metagraph.S[uid].item(), {})
            for ouid in metagraph.uids.tolist():
                if ouid == uid:
                    continue
                weight = round(metagraph.weights[uid][ouid].item(), 4)
                if weight > 0:
                    # Record this validator's nonzero weight on the other UID.
                    ret[uid][-1][ouid] = weight
    return ret

def get_subnet_data(
    subtensor: bt.subtensor, metagraph: bt.metagraph
) -> List[ModelData]:
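    """Reads the on-chain model metadata committed by every UID on the subnet."""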
    result = []
    for uid in metagraph.uids.tolist():
        hotkey = metagraph.hotkeys[uid]
        metadata = None
        try:
            metadata = run_with_retries(
                functools.partial(get_metadata, subtensor, metagraph.netuid, hotkey)
            )
        except Exception:
            print(f"Failed to get metadata for UID {uid}: {traceback.format_exc()}")

        if not metadata:
            continue

        commitment = metadata["info"]["fields"][0]
        hex_data = commitment[list(commitment.keys())[0]][2:]  # Strip the "0x" prefix.
        chain_str = bytes.fromhex(hex_data).decode()
        block = metadata["block"]
        incentive = metagraph.incentive[uid].nan_to_num().item()
        emission = (
            metagraph.emission[uid].nan_to_num().item() * 20
        )  # Multiply the per-tempo emission by ~20 tempos per day to get daily TAO.

        model_data = None
        try:
            model_data = ModelData.from_compressed_str(
                uid, hotkey, chain_str, block, incentive, emission
            )
        except Exception:
            # Ignore commitments that do not parse as model metadata.
            continue

        result.append(model_data)
    return result

def is_floatable(x) -> bool:
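    """Returns True if x is an int, or a float that is neither NaN nor infinite."""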
    return (
        isinstance(x, float) and not math.isnan(x) and not math.isinf(x)
    ) or isinstance(x, int)

def get_wandb_runs(project: str, filters: Dict[str, Any]) -> List:
    """Gets the latest runs from Wandb, retrying indefinitely until we get them."""
    while True:
        api = wandb.Api(api_key=WANDB_TOKEN)
        runs = list(
            api.runs(
                project,
                filters=filters,
            )
        )
        if len(runs) > 0:
            return runs
        # The Wandb API is quite unreliable. Wait another minute and try again.
        print("Failed to get runs from Wandb. Trying again in 60 seconds.")
        time.sleep(60)

def get_scores(
    uids: List[int],
    wandb_runs: List,
) -> Dict[int, Dict[str, Optional[float]]]:
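    """Returns the most recent evaluation stats (loss, win rate, weight) for each UID."""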
    result = {}
    previous_timestamp = None

    # Iterate through the runs until we've processed all the uids.
    for i, run in enumerate(wandb_runs):
        if "original_format_json" not in run.summary:
            continue
        data = json.loads(run.summary["original_format_json"])
        all_uid_data = data["uid_data"]
        timestamp = data["timestamp"]

        # Make sure runs are indeed in descending time order.
        assert (
            previous_timestamp is None or timestamp < previous_timestamp
        ), f"Timestamps are not in descending order: {timestamp} >= {previous_timestamp}"
        previous_timestamp = timestamp

        for uid in uids:
            if uid in result:
                continue
            if str(uid) in all_uid_data:
                uid_data = all_uid_data[str(uid)]
                # Only the most recent run is fresh.
                is_fresh = i == 0
                result[uid] = {
                    "avg_loss": uid_data.get("average_loss", None),
                    "win_rate": uid_data.get("win_rate", None),
                    "win_total": uid_data.get("win_total", None),
                    "weight": uid_data.get("weight", None),
                    "fresh": is_fresh,
                }
        if len(result) == len(uids):
            break
    return result

def get_losses_over_time(wandb_runs: List) -> pd.DataFrame:
    """Returns a dataframe of the best average model loss over time."""
    timestamps = []
    best_losses = []

    for run in wandb_runs:
        if "original_format_json" not in run.summary:
            continue
        data = json.loads(run.summary["original_format_json"])
        all_uid_data = data["uid_data"]
        timestamp = datetime.datetime.fromtimestamp(data["timestamp"])
        best_loss = math.inf
        for _, uid_data in all_uid_data.items():
            loss = uid_data.get("average_loss", math.inf)
            # Filter out the numbers from the exploit and from when validators lost the best model.
            if (
                loss < best_loss
                and (loss > 2.5 or timestamp > datetime.datetime(2024, 2, 12))
                and (loss < 5 or timestamp > datetime.datetime(2024, 3, 27))
            ):
                best_loss = loss
        if best_loss != math.inf:
            timestamps.append(timestamp)
            best_losses.append(best_loss)

    return pd.DataFrame({"timestamp": timestamps, "best_loss": best_losses})

def format_score(uid: int, scores, key) -> Optional[float]:
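    """Rounds the requested score to 4 places, or returns None if missing or non-numeric."""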
    if uid in scores:
        if key in scores[uid]:
            point = scores[uid][key]
            if is_floatable(point):
                return round(point, 4)
    return None

def next_epoch(subtensor: bt.subtensor, block: int) -> int:
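    """Returns the block at which the subnet's next epoch begins."""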
    return (
        block
        + subtensor.get_subnet_hyperparameters(NETUID).tempo
        - subtensor.blocks_since_epoch(NETUID, block)
    )

def get_next_update_div(current_block: int, next_update_block: int) -> str:
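    """Returns an HTML div estimating the time remaining until the next reward update."""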
    now = datetime.datetime.now()
    blocks_to_go = next_update_block - current_block
    next_update_time = now + datetime.timedelta(
        seconds=blocks_to_go * SECONDS_PER_BLOCK
    )
    delta = next_update_time - now
    return f"""<div align="center" style="font-size: larger;">Next reward update: <b>{blocks_to_go}</b> blocks (~{int(delta.total_seconds() // 60)} minutes)</div>"""

def get_last_updated_div() -> str:
    return f"""<div>Last Updated: {datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")} (UTC)</div>"""

def leaderboard_data(
    leaderboard: List[ModelData],
    scores: Dict[int, Dict[str, Optional[float]]],
    show_stale: bool,
) -> List[List[Any]]:
    """Returns the leaderboard data, based on models data and UID scores."""
    return [
        [
            f"[{c.namespace}/{c.name} ({c.commit[0:8]})](https://huggingface.co/{c.namespace}/{c.name}/commit/{c.commit})",
            format_score(c.uid, scores, "win_rate"),
            format_score(c.uid, scores, "avg_loss"),
            format_score(c.uid, scores, "weight"),
            c.uid,
            c.block,
        ]
        for c in leaderboard
        if (c.uid in scores and scores[c.uid]["fresh"]) or show_stale
    ]

def get_benchmarks() -> Tuple[pd.DataFrame, datetime.datetime]:
    """Returns the latest benchmarks and the time they were run."""
    runs = get_wandb_runs(project=BENCHMARK_WANDB_PROJECT, filters=None)
    for run in runs:
        artifacts = list(run.logged_artifacts())
        if artifacts:
            table = artifacts[-1].get("benchmarks")
            if table:
                return table.get_dataframe(), datetime.datetime.strptime(
                    run.metadata["startedAt"], "%Y-%m-%dT%H:%M:%S.%f"
                )
    bt.logging.error("Failed to get benchmarks from Wandb.")
    return None, None

def restart_space():
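    """Restarts the Hugging Face Space so it relaunches with fresh data."""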
    API.restart_space(repo_id=REPO_ID, token=H4_TOKEN)

def main():
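    """Fetches the subnet data, then builds and launches the Gradio dashboard."""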
    # To avoid leaderboard failures, infinitely try until we get all data
    # needed to populate the dashboard.
    while True:
        try:
            subtensor, metagraph = get_subtensor_and_metagraph()

            model_data: List[ModelData] = get_subnet_data(subtensor, metagraph)
            model_data.sort(key=lambda x: x.incentive, reverse=True)

            vali_runs = get_wandb_runs(
                project=VALIDATOR_WANDB_PROJECT,
                filters={"config.type": "validator", "config.uid": 238},
            )

            scores = get_scores([x.uid for x in model_data], vali_runs)

            # TODO: Re-enable once the '"SubtensorModule.BlocksSinceEpoch" not found' issue is resolved.
            # current_block = metagraph.block.item()
            # next_epoch_block = next_epoch(subtensor, current_block)

            validator_df = get_validator_weights(metagraph)
            weight_keys = set()
            for uid, stats in validator_df.items():
                weight_keys.update(stats[-1].keys())

            benchmarks, benchmark_timestamp = get_benchmarks()
            break
        except Exception as e:
            print(f"Failed to get data: {e}")
            time.sleep(30)

    demo = gr.Blocks(css=".typewriter {font-family: 'JMH Typewriter', sans-serif;}")
    with demo:
        gr.HTML(FONT)
        gr.HTML(TITLE)
        gr.HTML(HEADER)

        # TODO: Re-enable once the '"SubtensorModule.BlocksSinceEpoch" not found' issue is resolved.
        # gr.HTML(value=get_next_update_div(current_block, next_epoch_block))

        gr.Label(
            value={
                f"{c.namespace}/{c.name} ({c.commit[0:8]}) · (τ{round(c.emission, 2):,})": c.incentive
                for c in model_data
                if c.incentive
            },
            num_top_classes=10,
        )

        if benchmarks is not None:
            with gr.Accordion("Top Model Benchmarks"):
                gr.components.Dataframe(benchmarks)
                gr.HTML("""<div>PPL computed using a stride of 512. See <a href='https://github.com/RaoFoundation/pretraining/blob/dev/scripts/run_benchmarks.py'>here</a> for the full code.</div>""")
                gr.HTML(f"""<div>Last Updated: {benchmark_timestamp.strftime("%Y-%m-%d %H:%M:%S")} (UTC)</div>""")

        with gr.Accordion("Evaluation Stats"):
            gr.HTML(EVALUATION_HEADER)

            show_stale = gr.Checkbox(label="Show Stale", interactive=True)
            leaderboard_table = gr.components.Dataframe(
                value=leaderboard_data(model_data, scores, show_stale.value),
                headers=["Name", "Win Rate", "Average Loss", "Weight", "UID", "Block"],
                datatype=["markdown", "number", "number", "number", "number", "number"],
                elem_id="leaderboard-table",
                interactive=False,
                visible=True,
            )
            gr.HTML(EVALUATION_DETAILS)
            show_stale.change(
                lambda stale: leaderboard_data(model_data, scores, stale),
                inputs=[show_stale],
                outputs=leaderboard_table,
            )

            gr.LinePlot(
                get_losses_over_time(vali_runs),
                x="timestamp",
                x_title="Date",
                y="best_loss",
                y_title="Average Loss",
                tooltip="best_loss",
                interactive=True,
                visible=True,
                width=1024,
                title="Best Average Loss Over Time",
            )

        with gr.Accordion("Validator Stats"):
            gr.components.Dataframe(
                value=[
                    [uid, int(validator_df[uid][1]), round(validator_df[uid][0], 4)]
                    + [
                        validator_df[uid][-1].get(c.uid)
                        for c in model_data
                        if c.incentive
                    ]
                    # Sort validators by stake, descending.
                    for uid, _ in sorted(
                        zip(
                            validator_df.keys(),
                            [validator_df[x][1] for x in validator_df.keys()],
                        ),
                        key=lambda x: x[1],
                        reverse=True,
                    )
                ],
                headers=["UID", "Stake (τ)", "V-Trust"]
                + [
                    f"{c.namespace}/{c.name} ({c.commit[0:8]})"
                    for c in model_data
                    if c.incentive
                ],
                datatype=["number", "number", "number"]
                + ["number" for c in model_data if c.incentive],
                interactive=False,
                visible=True,
            )
        gr.HTML(value=get_last_updated_div())

    scheduler = BackgroundScheduler()
    scheduler.add_job(
        restart_space, "interval", seconds=60 * 30
    )  # Restart every 30 minutes.
    scheduler.start()

    demo.launch()

if __name__ == "__main__":
    main()