import functools
import traceback
import gradio as gr
import bittensor as bt
from typing import Dict, List, Any, Optional, Tuple
from bittensor.extrinsics.serving import get_metadata
from dataclasses import dataclass
import requests
import wandb
import math
import os
import datetime
import time
import json
import pandas as pd
from dotenv import load_dotenv
from huggingface_hub import HfApi
from apscheduler.schedulers.background import BackgroundScheduler
load_dotenv()
# Font markup string; currently empty.
# NOTE(review): presumably injected into the Gradio page elsewhere in the file - confirm.
FONT = (
""""""
)
# Leaderboard title text.
TITLE = """
Subnet 9 Leaderboard
"""
# Introductory text describing what subnet 9 rewards.
HEADER = """
Subnet 9 is a Bittensor subnet that rewards miners for producing pretrained Foundation-Models on the Falcon Refined Web dataset. It acts like a continuous benchmark whereby miners are rewarded for attaining the best losses on randomly sampled pages of Falcon. The models with the best head-to-head loss on the evaluation data receive a steady emission of TAO."""
# Legend explaining the leaderboard columns.
EVALUATION_DETAILS = """
Name: the 🤗 Hugging Face model name (click to go to the model card)
Rewards / Day: the expected rewards per day based on current ranking.
Last Average Loss: the last loss value on the evaluation data for the model as calculated by a validator (lower is better)
UID: the Bittensor UID of the miner
Block: the Bittensor block that the model was submitted in
More stats on taostats."""
# Caption for the validator evaluation statistics.
EVALUATION_HEADER = """
Shows the latest internal evaluation statistics as calculated by the Opentensor validator
"""
# Weights & Biases project path queried by get_scores().
VALIDATOR_WANDB_PROJECT = "opentensor-dev/pretraining-subnet"
# Hugging Face token read from the H4_TOKEN environment variable (None when unset).
H4_TOKEN = os.environ.get("H4_TOKEN", None)
# Hugging Face Hub client created at import time with the token above.
API = HfApi(token=H4_TOKEN)
# W&B API key read from the WANDB_API_KEY environment variable (None when unset);
# passed to wandb.Api in get_scores().
WANDB_TOKEN = os.environ.get("WANDB_API_KEY", None)
# NOTE(review): presumably the Hugging Face repo hosting this leaderboard - not used in the visible code, confirm.
REPO_ID = "RaoFoundation/pretraining-leaderboard"
# NOTE(review): not referenced in the visible code - confirm where this limit is applied.
MAX_AVG_LOSS_POINTS = 1
# Maximum number of attempts made by run_with_retries().
RETRIES = 5
# Seconds run_with_retries() sleeps between failed attempts.
DELAY_SECS = 3
# Subnet id used when loading the metagraph and querying subnet hyperparameters.
NETUID = 9
# Seconds-per-block factor used to turn a block count into a time estimate.
SECONDS_PER_BLOCK = 12
@dataclass
class ModelData:
    """Per-miner model record built from an on-chain commitment string.

    The ``namespace``, ``name``, ``commit`` and ``hash`` fields come from the
    colon-separated commitment string; the remaining fields are supplied by
    the caller.
    """

    # Bittensor UID of the miner.
    uid: int
    # Hotkey the commitment was looked up with.
    hotkey: str
    # First token of the commitment string.
    # NOTE(review): presumably the Hugging Face account/organisation - confirm.
    namespace: str
    # Second token of the commitment string.
    # NOTE(review): presumably the Hugging Face model name - confirm.
    name: str
    # Third token of the commitment string; None when it is the literal "None".
    commit: Optional[str]
    # Fourth token of the commitment string; None when it is the literal "None".
    hash: Optional[str]
    # Block number supplied by the caller alongside the commitment.
    block: int
    # Incentive value supplied by the caller.
    incentive: float
    # Emission value supplied by the caller.
    emission: float

    @classmethod
    def from_compressed_str(
        cls,
        uid: int,
        hotkey: str,
        cs: str,
        block: int,
        incentive: float,
        emission: float,
    ) -> "ModelData":
        """Returns an instance of this class from a compressed string representation.

        Args:
            uid: UID of the miner.
            hotkey: Hotkey of the miner.
            cs: Colon-separated string ``namespace:name:commit:hash``. The
                literal text ``"None"`` in the commit or hash position is
                converted to ``None``. Tokens after the fourth are ignored.
            block: Block number to store on the record.
            incentive: Incentive value to store on the record.
            emission: Emission value to store on the record.

        Returns:
            A new instance of ``cls`` (so subclasses get their own type).

        Raises:
            IndexError: If ``cs`` contains fewer than four tokens.
        """
        tokens = cs.split(":")
        # Build via `cls` rather than the hard-coded class so that subclasses
        # calling this alternate constructor receive their own type.
        return cls(
            uid=uid,
            hotkey=hotkey,
            namespace=tokens[0],
            name=tokens[1],
            commit=tokens[2] if tokens[2] != "None" else None,
            hash=tokens[3] if tokens[3] != "None" else None,
            block=block,
            incentive=incentive,
            emission=emission,
        )
def run_with_retries(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)``, retrying when it raises.

    Up to RETRIES attempts are made, sleeping DELAY_SECS seconds after each
    failed attempt except the last. The result of the first successful call
    is returned; if the final attempt also fails, its exception propagates.
    """
    attempt = 0
    while attempt < RETRIES:
        try:
            return func(*args, **kwargs)
        except Exception:
            attempt += 1
            # Out of attempts: let the most recent failure reach the caller.
            if attempt == RETRIES:
                raise
            time.sleep(DELAY_SECS)
    # Only reachable when RETRIES is not positive, i.e. no attempt was made.
    raise RuntimeError("Should never happen")
def get_subtensor_and_metagraph() -> Tuple[bt.subtensor, bt.metagraph]:
    """Create the "finney" subtensor and the full (non-lite) NETUID metagraph.

    Both objects are built inside a single retried unit, so a failure in
    either one causes both to be re-created on the next attempt.
    """

    def _connect() -> Tuple[bt.subtensor, bt.metagraph]:
        chain = bt.subtensor("finney")
        graph = bt.metagraph(NETUID, lite=False)
        return chain, graph

    return run_with_retries(_connect)
def get_validator_weights(
    metagraph: bt.metagraph,
) -> Dict[int, Tuple[float, int, Dict[int, float]]]:
    """Returns a dictionary of validator UIDs to (vtrust, stake, {uid: weight}).

    Only UIDs whose validator trust is greater than zero are included. For
    each of them, the inner dictionary holds every other UID whose weight,
    rounded to 4 decimal places, is greater than zero.
    """
    weights_by_validator = {}
    for validator_uid in metagraph.uids.tolist():
        trust = metagraph.validator_trust[validator_uid].item()
        # Written as "not >" (rather than "<=") so a NaN trust is skipped too.
        if not trust > 0:
            continue

        outgoing = {}
        weights_by_validator[validator_uid] = (
            trust,
            metagraph.S[validator_uid].item(),
            outgoing,
        )
        for target_uid in metagraph.uids.tolist():
            # A validator's weight on itself is not reported.
            if target_uid == validator_uid:
                continue
            rounded = round(metagraph.weights[validator_uid][target_uid].item(), 4)
            if rounded > 0:
                outgoing[target_uid] = rounded
    return weights_by_validator
def get_subnet_data(
    subtensor: bt.subtensor, metagraph: bt.metagraph
) -> List[ModelData]:
    """Collect a ModelData record for every UID that has a parseable commitment.

    For each UID in the metagraph the on-chain metadata for its hotkey is
    fetched (with retries), the first commitment field is hex-decoded, and
    the resulting string is parsed with ModelData.from_compressed_str.
    This is best-effort: UIDs whose metadata cannot be fetched, is empty, or
    cannot be decoded/parsed are logged and skipped rather than aborting the
    whole scan.

    Args:
        subtensor: Chain connection passed to get_metadata.
        metagraph: Metagraph providing uids, hotkeys, netuid, incentive and
            emission.

    Returns:
        One ModelData per successfully parsed UID, in metagraph UID order.
    """
    result = []
    for uid in metagraph.uids.tolist():
        hotkey = metagraph.hotkeys[uid]
        try:
            metadata = run_with_retries(
                functools.partial(get_metadata, subtensor, metagraph.netuid, hotkey)
            )
        except Exception:
            # `Exception` (not a bare except) so Ctrl-C / SystemExit still stop the scan.
            print(f"Failed to get metadata for UID {uid}: {traceback.format_exc()}")
            continue
        if not metadata:
            continue

        incentive = metagraph.incentive[uid].nan_to_num().item()
        # NOTE(review): the factor 20 presumably is the number of emission
        # periods per day - confirm against the subnet tempo.
        emission = (
            metagraph.emission[uid].nan_to_num().item() * 20
        )  # convert to daily TAO

        try:
            commitment = metadata["info"]["fields"][0]
            # The field is a single-entry mapping; its value is a hex string
            # whose first two characters are dropped before decoding.
            hex_data = next(iter(commitment.values()))[2:]
            chain_str = bytes.fromhex(hex_data).decode()
            model_data = ModelData.from_compressed_str(
                uid, hotkey, chain_str, metadata["block"], incentive, emission
            )
        except Exception as err:
            # One malformed commitment must not take down the whole listing.
            print(f"Skipping UID {uid}: could not parse commitment ({err!r})")
            continue
        result.append(model_data)
    return result
def is_floatable(x) -> bool:
    """Return True when ``x`` is an int, or a float that is neither NaN nor infinite.

    Booleans count as ints. Any other type (including None and numeric
    strings) yields False.
    """
    if isinstance(x, int):
        return True
    return isinstance(x, float) and math.isfinite(x)
def get_scores(
    uids: List[int],
    validator_uid: int = 238,
) -> Dict[int, Dict[str, Any]]:
    """Fetch the latest validator evaluation statistics for the given UIDs.

    Runs are loaded from the VALIDATOR_WANDB_PROJECT Weights & Biases project,
    filtered to validator runs with the given ``validator_uid``. The runs are
    walked in the order returned, and for each requested UID the first run
    that mentions it supplies its statistics.

    Args:
        uids: Miner UIDs to look up. An empty list returns ``{}`` without
            contacting W&B.
        validator_uid: UID of the validator whose runs are queried.

    Returns:
        A mapping of UID to a dict with keys ``avg_loss``, ``win_rate``,
        ``win_total`` and ``weight`` (each possibly None), plus ``fresh``,
        a bool that is True only when the data came from the first run in
        the list. UIDs that appear in no run are absent from the mapping.

    Raises:
        AssertionError: If the runs are not in strictly descending
            timestamp order.
    """
    # De-duplicated view used only for the early-exit check; with duplicates
    # in `uids`, comparing against len(uids) would never match.
    wanted = set(uids)
    if not wanted:
        return {}

    while True:
        api = wandb.Api(api_key=WANDB_TOKEN)
        runs = list(
            api.runs(
                VALIDATOR_WANDB_PROJECT,
                filters={"config.type": "validator", "config.uid": validator_uid},
            )
        )
        if runs:
            break
        # The W&B API is quite unreliable. Wait another minute and try again.
        print("Failed to get runs from Wandb. Trying again in 60 seconds.")
        time.sleep(60)

    result = {}
    previous_timestamp = None
    # Iterate through the runs until we've processed all the uids.
    for i, run in enumerate(runs):
        if "original_format_json" not in run.summary:
            continue
        data = json.loads(run.summary["original_format_json"])
        all_uid_data = data["uid_data"]
        timestamp = data["timestamp"]
        # Make sure runs are indeed in descending time order. Raised
        # explicitly (instead of `assert`) so the check survives `python -O`.
        if previous_timestamp is not None and not (timestamp < previous_timestamp):
            raise AssertionError(
                f"Timestamps are not in descending order: {timestamp} >= {previous_timestamp}"
            )
        previous_timestamp = timestamp

        # Only the most recent run is fresh.
        is_fresh = i == 0
        for uid in uids:
            if uid in result:
                continue
            key = str(uid)
            if key in all_uid_data:
                uid_data = all_uid_data[key]
                result[uid] = {
                    "avg_loss": uid_data.get("average_loss", None),
                    "win_rate": uid_data.get("win_rate", None),
                    "win_total": uid_data.get("win_total", None),
                    "weight": uid_data.get("weight", None),
                    "fresh": is_fresh,
                }
        if len(result) == len(wanted):
            break
    return result
def format_score(uid: int, scores, key) -> Optional[float]:
    """Return ``scores[uid][key]`` rounded to 4 decimal places, or None.

    None is returned when the UID is missing from ``scores``, when the key
    is missing for that UID, or when the stored value does not pass
    is_floatable.
    """
    if uid not in scores or key not in scores[uid]:
        return None
    value = scores[uid][key]
    return round(value, 4) if is_floatable(value) else None
def next_epoch(subtensor: bt.subtensor, block: int) -> int:
    """Return ``block`` plus the subnet tempo minus the blocks since the epoch.

    Both the tempo and the blocks-since-epoch count are queried from
    ``subtensor`` for NETUID.
    NOTE(review): presumably this is the block at which the next epoch
    starts - confirm against the subtensor API.
    """
    tempo = subtensor.get_subnet_hyperparameters(NETUID).tempo
    elapsed = subtensor.blocks_since_epoch(NETUID, block)
    return block + tempo - elapsed
def get_next_update_div(current_block: int, next_update_block: int) -> str:
now = datetime.datetime.now()
blocks_to_go = next_update_block - current_block
next_update_time = now + datetime.timedelta(
seconds=blocks_to_go * SECONDS_PER_BLOCK
)
delta = next_update_time - now
return f"""
Next reward update: {blocks_to_go} blocks (~{int(delta.total_seconds() // 60)} minutes)