# Spaces: Runtime error
# (Hosting-page status text captured together with this source; it is not part of the program.)
import gradio as gr | |
import bittensor as bt | |
from typing import Dict, List, Any, Optional, Tuple | |
from bittensor.extrinsics.serving import get_metadata | |
from dataclasses import dataclass | |
import requests | |
import wandb | |
import math | |
import os | |
import datetime | |
import time | |
import json | |
import pandas as pd | |
from dotenv import load_dotenv | |
from huggingface_hub import HfApi | |
from apscheduler.schedulers.background import BackgroundScheduler | |
# Load variables from a local .env file (if any) into the process environment
# before the os.environ lookups below.
load_dotenv()

# --- Static HTML fragments rendered by main() ------------------------------
FONT = (
    """<link href="https://fonts.cdnfonts.com/css/jmh-typewriter" rel="stylesheet">"""
)
TITLE = """<h1 align="center" id="space-title" class="typewriter">Subnet 9 Leaderboard</h1>"""
# The closing tag now matches the opening <h2> (it was </h3>).
HEADER = """<h2 align="center" class="typewriter"><a href="https://github.com/RaoFoundation/pretraining" target="_blank">Subnet 9</a> is a <a href="https://bittensor.com/" target="_blank">Bittensor</a> subnet that rewards miners for producing pretrained Foundation-Models on the <a href="https://huggingface.co/datasets/tiiuae/falcon-refinedweb" target="_blank">Falcon Refined Web dataset</a>. It acts like a continuous benchmark whereby miners are rewarded for attaining the best losses on randomly sampled pages of Falcon.<br/>The models with the best head-to-head loss on the evaluation data receive a steady emission of TAO.</h2>"""
EVALUATION_DETAILS = """<ul><li><b>Name:</b> the 🤗 Hugging Face model name (click to go to the model card)</li><li><b>Rewards / Day:</b> the expected rewards per day based on current ranking.</li><li><b>Last Average Loss:</b> the last loss value on the evaluation data for the model as calculated by a validator (lower is better)</li><li><b>UID:</b> the Bittensor UID of the miner</li><li><b>Block:</b> the Bittensor block that the model was submitted in</li></ul><br/>More stats on <a href="https://taostats.io/subnets/netuid-9/" target="_blank">taostats</a>."""
EVALUATION_HEADER = """<h3 align="center">Shows the latest internal evaluation statistics as calculated by the Opentensor validator</h3>"""

# --- External services -----------------------------------------------------
# wandb project path queried by get_scores().
VALIDATOR_WANDB_PROJECT = "opentensor-dev/pretraining-subnet"
# Hugging Face token; used for the HfApi client and by restart_space().
H4_TOKEN = os.environ.get("H4_TOKEN", None)
API = HfApi(token=H4_TOKEN)
# API key handed to wandb.Api in get_scores().
WANDB_TOKEN = os.environ.get("WANDB_API_KEY", None)
# Repo id passed to API.restart_space() by restart_space().
REPO_ID = "RusticLuftig/9-leaderboard"

# --- Tunables --------------------------------------------------------------
# NOTE(review): not referenced anywhere in the visible source.
MAX_AVG_LOSS_POINTS = 1
# run_with_retries(): number of attempts and pause between them (seconds).
RETRIES = 5
DELAY_SECS = 3
# Subnet id used for metagraph and hyperparameter lookups.
NETUID = 9
# Used by get_next_update_div() to turn a block count into a time estimate.
SECONDS_PER_BLOCK = 12
@dataclass
class ModelData:
    """One miner's model commitment together with its metagraph statistics.

    The class is a dataclass so that it can be built with keyword arguments,
    which is how ``from_compressed_str`` constructs it.
    """

    uid: int
    hotkey: str
    namespace: str
    name: str
    # ``commit`` and ``hash`` are None when the compressed string carries the
    # literal text "None" in the corresponding position.
    commit: Optional[str]
    hash: Optional[str]
    block: int
    incentive: float
    emission: float

    @classmethod
    def from_compressed_str(
        cls,
        uid: int,
        hotkey: str,
        cs: str,
        block: int,
        incentive: float,
        emission: float,
    ) -> "ModelData":
        """Returns an instance of this class from a compressed string representation.

        Args:
            uid: UID of the miner.
            hotkey: Hotkey of the miner.
            cs: Colon-separated string ``namespace:name:commit:hash``; the
                literal ``"None"`` in the commit or hash slot maps to ``None``.
            block: Block associated with the commitment.
            incentive: Incentive value stored on the instance.
            emission: Emission value stored on the instance.

        Raises:
            IndexError: If ``cs`` has fewer than four colon-separated tokens.
        """
        tokens = cs.split(":")
        return cls(
            uid=uid,
            hotkey=hotkey,
            namespace=tokens[0],
            name=tokens[1],
            commit=tokens[2] if tokens[2] != "None" else None,
            hash=tokens[3] if tokens[3] != "None" else None,
            block=block,
            incentive=incentive,
            emission=emission,
        )
def run_with_retries(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)``, retrying on failure.

    Up to ``RETRIES`` attempts are made with ``DELAY_SECS`` seconds of sleep
    between them. The exception from the final attempt is re-raised.

    Only ``Exception`` subclasses trigger a retry, so ``KeyboardInterrupt``
    and ``SystemExit`` propagate immediately instead of being retried.

    Returns:
        Whatever ``func`` returns on its first successful call.

    Raises:
        Exception: The error raised by the last failed attempt.
        RuntimeError: If the loop finishes without returning or raising
            (only possible when ``RETRIES`` is not positive).
    """
    for attempt in range(RETRIES):
        try:
            return func(*args, **kwargs)
        except Exception:
            if attempt == RETRIES - 1:
                raise
            time.sleep(DELAY_SECS)
    raise RuntimeError("Should never happen")
def get_subtensor_and_metagraph() -> Tuple[bt.subtensor, bt.metagraph]:
    """Create the "finney" subtensor and the full (non-lite) metagraph for NETUID.

    Both objects are built inside one retried call, so a failure in either
    constructor repeats the pair.
    """
    return run_with_retries(
        lambda: (bt.subtensor("finney"), bt.metagraph(NETUID, lite=False))
    )
def get_tao_price() -> float:
    """Return the last traded TAO-USDT price reported by the KuCoin stats API.

    The request is wrapped in ``run_with_retries``. A timeout is set so a
    stalled connection fails (and is retried) instead of blocking forever,
    and HTTP error statuses are raised rather than parsed as if they were a
    valid payload.

    Raises:
        Exception: Whatever the final retry attempt raised (network error,
            HTTP error status, or an unexpected response body).
    """

    def _fetch() -> float:
        response = requests.get(
            "https://api.kucoin.com/api/v1/market/stats?symbol=TAO-USDT",
            timeout=10,
        )
        response.raise_for_status()
        return float(response.json()["data"]["last"])

    return run_with_retries(_fetch)
def get_validator_weights(
    metagraph: bt.metagraph,
) -> Dict[int, Tuple[float, int, Dict[int, float]]]:
    """Returns a dictionary of validator UIDs to (vtrust, stake, {uid: weight}).

    A UID is included only when its validator trust is greater than zero.
    The weight map holds that validator's weights toward every other UID,
    rounded to 4 decimals, keeping only entries that are still positive
    after rounding.
    """
    all_uids = metagraph.uids.tolist()
    validators = {}
    for validator_uid in all_uids:
        trust = metagraph.validator_trust[validator_uid].item()
        if not trust > 0:
            continue
        stake = metagraph.S[validator_uid].item()
        weight_row = metagraph.weights[validator_uid]
        rounded = (
            (miner_uid, round(weight_row[miner_uid].item(), 4))
            for miner_uid in all_uids
            if miner_uid != validator_uid
        )
        validators[validator_uid] = (
            trust,
            stake,
            {miner_uid: weight for miner_uid, weight in rounded if weight > 0},
        )
    return validators
def get_subnet_data(
    subtensor: bt.subtensor, metagraph: bt.metagraph
) -> List[ModelData]:
    """Collect a ModelData entry for every UID that has readable chain metadata.

    For each UID in the metagraph the hotkey's metadata is fetched with
    ``get_metadata``. UIDs without metadata are skipped. UIDs whose
    commitment cannot be decoded or parsed are also skipped, so a single
    malformed commitment does not abort the whole listing.

    Args:
        subtensor: Chain connection passed to ``get_metadata``.
        metagraph: Source of UIDs, hotkeys, incentive and emission.

    Returns:
        ModelData entries in metagraph UID order.
    """
    result = []
    for uid in metagraph.uids.tolist():
        hotkey = metagraph.hotkeys[uid]
        metadata = get_metadata(subtensor, metagraph.netuid, hotkey)
        if not metadata:
            continue

        incentive = metagraph.incentive[uid].nan_to_num().item()
        emission = (
            metagraph.emission[uid].nan_to_num().item() * 20
        )  # convert to daily TAO

        try:
            commitment = metadata["info"]["fields"][0]
            # The first value of the commitment is a hex string; the [2:]
            # slice drops its first two characters (presumably a "0x" prefix).
            hex_data = next(iter(commitment.values()))[2:]
            chain_str = bytes.fromhex(hex_data).decode()
            model_data = ModelData.from_compressed_str(
                uid, hotkey, chain_str, metadata["block"], incentive, emission
            )
        except Exception:
            # Best effort: ignore entries that do not follow the expected
            # layout instead of failing the whole page.
            continue

        result.append(model_data)
    return result
def is_floatable(x) -> bool:
    """Return True for any int, or for a float that is neither NaN nor infinite."""
    if isinstance(x, int):
        return True
    return isinstance(x, float) and math.isfinite(x)
def get_scores(
    uids: List[int],
) -> Dict[int, Dict[str, Optional[float]]]:
    """Look up the latest evaluation stats for each UID from the validator's wandb runs.

    Runs are walked in the order the wandb API returns them, which the
    assertion below requires to be newest-first. Each UID takes its values
    from the first run that mentions it. An entry is marked "fresh" only
    when it comes from the run at index 0.

    Returns:
        Mapping of UID to a dict with keys "avg_loss", "win_rate",
        "win_total", "weight" and "fresh". UIDs found in no run are absent.
    """
    client = wandb.Api(api_key=WANDB_TOKEN)
    validator_runs = list(
        client.runs(
            VALIDATOR_WANDB_PROJECT,
            filters={"config.type": "validator", "config.uid": 238},
        )
    )

    scores = {}
    newer_timestamp = None
    # Iterate through the runs until we've processed all the uids.
    for index, run in enumerate(validator_runs):
        if "original_format_json" not in run.summary:
            continue
        payload = json.loads(run.summary["original_format_json"])
        per_uid = payload["uid_data"]
        timestamp = payload["timestamp"]

        # Make sure runs are indeed in descending time order.
        assert (
            newer_timestamp is None or timestamp < newer_timestamp
        ), f"Timestamps are not in descending order: {timestamp} >= {newer_timestamp}"
        newer_timestamp = timestamp

        # Only the most recent run is fresh.
        from_latest_run = index == 0
        for uid in uids:
            uid_key = str(uid)
            if uid in scores or uid_key not in per_uid:
                continue
            entry = per_uid[uid_key]
            scores[uid] = {
                "avg_loss": entry.get("average_loss"),
                "win_rate": entry.get("win_rate"),
                "win_total": entry.get("win_total"),
                "weight": entry.get("weight"),
                "fresh": from_latest_run,
            }

        if len(scores) == len(uids):
            break
    return scores
def format_score(uid: int, scores, key) -> Optional[float]:
    """Return ``scores[uid][key]`` rounded to 4 decimals, or None.

    None is returned when the UID or key is missing, or when the stored
    value does not pass ``is_floatable``.
    """
    if uid not in scores or key not in scores[uid]:
        return None
    value = scores[uid][key]
    return round(value, 4) if is_floatable(value) else None
def next_epoch(subtensor: bt.subtensor, block: int) -> int:
    """Return ``block`` plus the subnet tempo minus the blocks elapsed since the last epoch.

    Both the tempo and the elapsed-block count are read from ``subtensor``
    for NETUID.
    """
    tempo = subtensor.get_subnet_hyperparameters(NETUID).tempo
    elapsed = subtensor.blocks_since_epoch(NETUID, block)
    return block + tempo - elapsed
def get_next_update_div(current_block: int, next_update_block: int) -> str:
    """Build the HTML banner showing how many blocks and minutes remain until the next update.

    The time estimate assumes SECONDS_PER_BLOCK seconds per block and is
    floored to whole minutes.
    """
    remaining_blocks = next_update_block - current_block
    started_at = datetime.datetime.now()
    eta = started_at + datetime.timedelta(
        seconds=remaining_blocks * SECONDS_PER_BLOCK
    )
    minutes = int((eta - started_at).total_seconds() // 60)
    return f"""<div align="center" style="font-size: larger;">Next reward update: <b>{remaining_blocks}</b> blocks (~{minutes} minutes)</div>"""
def leaderboard_data(
    leaderboard: List[ModelData],
    scores: Dict[int, Dict[str, Optional[float]]],
    show_stale: bool,
) -> List[List[Any]]:
    """Returns the leaderboard data, based on models data and UID scores.

    Args:
        leaderboard: Models to list, in display order.
        scores: Per-UID stats with a "fresh" flag.
        show_stale: When False, only models whose score entry is marked
            fresh are listed. When True, every model is listed.

    Returns:
        One row per listed model: markdown name link, win rate, average
        loss, weight, UID and block.
    """
    rows = []
    for c in leaderboard:
        is_fresh = c.uid in scores and scores[c.uid]["fresh"]
        if not (is_fresh or show_stale):
            continue

        repo = f"{c.namespace}/{c.name}"
        if c.commit:
            name_cell = f"[{repo} ({c.commit[0:8]})](https://huggingface.co/{repo}/commit/{c.commit})"
        else:
            # A model's commit may be None; slicing it would raise TypeError,
            # so link to the repository page instead.
            name_cell = f"[{repo}](https://huggingface.co/{repo})"

        rows.append(
            [
                name_cell,
                format_score(c.uid, scores, "win_rate"),
                format_score(c.uid, scores, "avg_loss"),
                format_score(c.uid, scores, "weight"),
                c.uid,
                c.block,
            ]
        )
    return rows
def restart_space():
    """Ask the Hub API client to restart the Space identified by REPO_ID."""
    API.restart_space(token=H4_TOKEN, repo_id=REPO_ID)
def main():
    """Fetch chain, price and wandb data once, then build and launch the Gradio page.

    The data is read a single time at start-up. A background job restarts
    the Space every 15 minutes, which is what refreshes the page contents.
    """
    subtensor, metagraph = get_subtensor_and_metagraph()
    tao_price = get_tao_price()

    model_data: List[ModelData] = get_subnet_data(subtensor, metagraph)
    model_data.sort(key=lambda x: x.incentive, reverse=True)
    # Only models with non-zero incentive get a label bar and a validator
    # column; compute the filtered list once instead of at every use.
    rewarded = [c for c in model_data if c.incentive]

    scores = get_scores([x.uid for x in model_data])

    current_block = metagraph.block.item()
    next_epoch_block = next_epoch(subtensor, current_block)

    # Maps validator UID -> (vtrust, stake, {miner uid: weight}).
    validator_df = get_validator_weights(metagraph)
    # Validators are listed by descending stake.
    validator_uids = sorted(
        validator_df, key=lambda uid: validator_df[uid][1], reverse=True
    )

    def model_label(c: ModelData) -> str:
        """Return "namespace/name (commit8)", omitting the commit when it is missing."""
        if c.commit:
            return f"{c.namespace}/{c.name} ({c.commit[0:8]})"
        return f"{c.namespace}/{c.name}"

    validator_rows = []
    for uid in validator_uids:
        vtrust, stake, weights = validator_df[uid]
        validator_rows.append(
            [uid, int(stake), round(vtrust, 4)]
            + [weights.get(c.uid) for c in rewarded]
        )

    demo = gr.Blocks(css=".typewriter {font-family: 'JMH Typewriter', sans-serif;}")
    with demo:
        gr.HTML(FONT)
        gr.HTML(TITLE)
        gr.HTML(HEADER)

        gr.HTML(value=get_next_update_div(current_block, next_epoch_block))

        # Bar chart of incentive per rewarded model, labelled with its daily
        # emission in USD and TAO.
        gr.Label(
            value={
                f"{model_label(c)} · ${round(c.emission * tao_price, 2):,} (τ{round(c.emission, 2):,})": c.incentive
                for c in rewarded
            },
            num_top_classes=10,
        )

        with gr.Accordion("Evaluation Stats"):
            gr.HTML(EVALUATION_HEADER)
            show_stale = gr.Checkbox(label="Show Stale", interactive=True)
            leaderboard_table = gr.components.Dataframe(
                value=leaderboard_data(model_data, scores, show_stale.value),
                headers=["Name", "Win Rate", "Average Loss", "Weight", "UID", "Block"],
                datatype=["markdown", "number", "number", "number", "number", "number"],
                elem_id="leaderboard-table",
                interactive=False,
                visible=True,
            )
            gr.HTML(EVALUATION_DETAILS)
            # Re-render the table whenever the checkbox is toggled.
            show_stale.change(
                lambda stale: leaderboard_data(model_data, scores, stale),
                inputs=[show_stale],
                outputs=leaderboard_table,
            )

        with gr.Accordion("Validator Stats"):
            gr.components.Dataframe(
                value=validator_rows,
                headers=["UID", "Stake (τ)", "V-Trust"]
                + [model_label(c) for c in rewarded],
                datatype=["number"] * (3 + len(rewarded)),
                interactive=False,
                visible=True,
            )

    scheduler = BackgroundScheduler()
    scheduler.add_job(
        restart_space, "interval", seconds=60 * 15
    )  # restart every 15 minutes
    scheduler.start()

    demo.launch()


main()