import gradio as gr
import bittensor as bt
import typing
from bittensor.extrinsics.serving import get_metadata
from dataclasses import dataclass
import requests
import wandb
import math
import os
import datetime
import time
import functools
import multiprocessing
from dotenv import load_dotenv
from huggingface_hub import HfApi
from apscheduler.schedulers.background import BackgroundScheduler
from tqdm import tqdm
import concurrent.futures
load_dotenv()
FONT = """"""
TITLE = """
Subnet 6 Leaderboard (with Cortex Foundation validator/subtensor)
"""
IMAGE = """"""
HEADER = """
Subnet 6 is a Bittensor subnet that incentivizes the creation of the best open models by evaluating submissions on a constant stream of newly generated synthetic GPT-4 data. The models with the best head-to-head loss on the evaluation data receive a steady emission of TAO."""
EVALUATION_DETAILS = """Name is the 🤗 Hugging Face model name (click to go to the model card). Rewards / Day are the expected rewards per day for each model. Perplexity is represents the loss on all of the evaluation data for the model as calculated by the validator (lower is better). UID is the Bittensor user id of the submitter. Block is the Bittensor block that the model was submitted in. More stats on taostats."""
EVALUATION_HEADER = """
Shows the latest internal evaluation statistics as calculated by a validator run by Cortex Foundation ({date})
"""
VALIDATOR_WANDB_PROJECT = os.environ["VALIDATOR_WANDB_PROJECT"]
H4_TOKEN = os.environ.get("H4_TOKEN", None)
API = HfApi(token=H4_TOKEN)
REPO_ID = "0x9/finetuning_subnet_leaderboard"
METAGRAPH_RETRIES = 10
METAGRAPH_DELAY_SECS = 30
METADATA_TTL = 10
NETUID = 6
SUBNET_START_BLOCK = 2225782
SECONDS_PER_BLOCK = 12
SUBTENSOR = os.environ.get("SUBTENSOR", "finney")
@dataclass
class Competition:
id: str
name: str
COMPETITIONS = [Competition(id="m1", name="mistral-7b"), Competition(id="s1", name="stablelm-2b")]
DEFAULT_COMPETITION_ID = "m1"
last_refresh = None
def run_in_subprocess(func: functools.partial, ttl: int) -> typing.Any:
"""Runs the provided function on a subprocess with 'ttl' seconds to complete.
Args:
func (functools.partial): Function to be run.
ttl (int): How long to try for in seconds.
Returns:
Any: The value returned by 'func'
"""
def wrapped_func(func: functools.partial, queue: multiprocessing.Queue):
try:
result = func()
queue.put(result)
except (Exception, BaseException) as e:
# Catch exceptions here to add them to the queue.
queue.put(e)
# Use "fork" (the default on all POSIX except macOS), because pickling doesn't seem
# to work on "spawn".
ctx = multiprocessing.get_context("fork")
queue = ctx.Queue()
process = ctx.Process(target=wrapped_func, args=[func, queue])
process.start()
process.join(timeout=ttl)
if process.is_alive():
process.terminate()
process.join()
raise TimeoutError(f"Failed to {func.func.__name__} after {ttl} seconds")
# Raises an error if the queue is empty. This is fine. It means our subprocess timed out.
result = queue.get(block=False)
# If we put an exception on the queue then raise instead of returning.
if isinstance(result, Exception):
raise result
if isinstance(result, BaseException):
raise Exception(f"BaseException raised in subprocess: {str(result)}")
return result
def get_subtensor_and_metagraph() -> typing.Tuple[bt.subtensor, bt.metagraph]:
for i in range(0, METAGRAPH_RETRIES):
try:
print("Connecting to subtensor...")
subtensor: bt.subtensor = bt.subtensor(SUBTENSOR)
print("Pulling metagraph...")
metagraph: bt.metagraph = subtensor.metagraph(NETUID, lite=False)
return subtensor, metagraph
except:
if i == METAGRAPH_RETRIES - 1:
raise
print(f"Error connecting to subtensor or pulling metagraph, retry {i + 1} of {METAGRAPH_RETRIES} in {METAGRAPH_DELAY_SECS} seconds...")
time.sleep(METAGRAPH_DELAY_SECS)
raise RuntimeError()
@dataclass
class ModelData:
uid: int
hotkey: str
namespace: str
name: str
commit: str
hash: str
block: int
incentive: float
emission: float
competition: str
@classmethod
def from_compressed_str(cls, uid: int, hotkey: str, cs: str, block: int, incentive: float, emission: float):
"""Returns an instance of this class from a compressed string representation"""
tokens = cs.split(":")
return ModelData(
uid=uid,
hotkey=hotkey,
namespace=tokens[0],
name=tokens[1],
commit=tokens[2] if tokens[2] != "None" else "",
hash=tokens[3] if tokens[3] != "None" else "",
competition=tokens[4] if len(tokens) > 4 and tokens[4] != "None" else DEFAULT_COMPETITION_ID,
block=block,
incentive=incentive,
emission=emission
)
def get_tao_price() -> float:
for i in range(0, METAGRAPH_RETRIES):
try:
return float(requests.get("https://api.mexc.com/api/v3/avgPrice?symbol=TAOUSDT").json()["price"])
except:
if i == METAGRAPH_RETRIES - 1:
raise
time.sleep(METAGRAPH_DELAY_SECS)
raise RuntimeError()
def get_validator_weights(metagraph: bt.metagraph) -> typing.Dict[int, typing.Tuple[float, int, typing.Dict[int, float]]]:
ret = {}
for uid in metagraph.uids.tolist():
vtrust = metagraph.validator_trust[uid].item()
if vtrust > 0:
ret[uid] = (vtrust, metagraph.S[uid].item(), {})
for ouid in metagraph.uids.tolist():
if ouid == uid:
continue
weight = round(metagraph.weights[uid][ouid].item(), 4)
if weight > 0:
ret[uid][-1][ouid] = weight
return ret
def get_subnet_data(subtensor: bt.subtensor, metagraph: bt.metagraph) -> typing.List[ModelData]:
global last_refresh
# Function to be executed in a thread
def fetch_data(uid):
hotkey = metagraph.hotkeys[uid]
try:
partial = functools.partial(get_metadata, subtensor, metagraph.netuid, hotkey)
metadata = run_in_subprocess(partial, METADATA_TTL)
except Exception as e:
return None
if not metadata:
return None
commitment = metadata["info"]["fields"][0]
hex_data = commitment[list(commitment.keys())[0]][2:]
chain_str = bytes.fromhex(hex_data).decode()
block = metadata["block"]
incentive = metagraph.incentive[uid].nan_to_num().item()
emission = metagraph.emission[uid].nan_to_num().item() * 20 # convert to daily TAO
try:
model_data = ModelData.from_compressed_str(uid, hotkey, chain_str, block, incentive, emission)
except Exception as e:
return None
return model_data
# Use ThreadPoolExecutor to fetch data in parallel
results = []
with concurrent.futures.ThreadPoolExecutor() as executor:
# Prepare the list of futures
futures = [executor.submit(fetch_data, uid) for uid in metagraph.uids.tolist()]
for future in tqdm(concurrent.futures.as_completed(futures), desc="Metadata for hotkeys", total=len(futures)):
result = future.result()
if result:
results.append(result)
last_refresh = datetime.datetime.now()
return results
def floatable(x) -> bool:
return (isinstance(x, float) and not math.isnan(x) and not math.isinf(x)) or isinstance(x, int)
def get_float_score(key: str, history, competition_id: str) -> typing.Tuple[typing.Optional[float], bool]:
if key in history and "competition_id" in history:
data = list(history[key])
if len(data) > 0:
competitions = list(history["competition_id"])
while True:
if competitions.pop() != competition_id:
data.pop()
continue
if floatable(data[-1]):
return float(data[-1]), True
else:
data = [float(x) for x in data if floatable(x)]
if len(data) > 0:
return float(data[-1]), False
break
return None, False
def get_sample(uid, history, competition_id: str) -> typing.Optional[typing.Tuple[str, str, str]]:
prompt_key = f"sample_prompt_data.{uid}"
response_key = f"sample_response_data.{uid}"
truth_key = f"sample_truth_data.{uid}"
if prompt_key in history and response_key in history and truth_key in history and "competition_id" in history:
competitions = list(history["competition_id"])
prompts = list(history[prompt_key])
responses = list(history[response_key])
truths = list(history[truth_key])
while True:
prompt = prompts.pop()
response = responses.pop()
truth = truths.pop()
if competitions.pop() != competition_id:
continue
if isinstance(prompt, str) and isinstance(response, str) and isinstance(truth, str):
return prompt, response, truth
break
return None
def get_scores(uids: typing.List[int], competition_id: str) -> typing.Dict[int, typing.Dict[str, typing.Optional[float | str]]]:
api = wandb.Api()
runs = list(api.runs(VALIDATOR_WANDB_PROJECT))
result = {}
for run in runs:
history = run.history()
for uid in uids:
if uid in result.keys():
continue
perplexity, perplexity_fresh = get_float_score(f"perplexity_data.{uid}", history, competition_id)
win_rate, win_rate_fresh = get_float_score(f"win_rate_data.{uid}", history, competition_id)
win_total, win_total_fresh = get_float_score(f"win_total_data.{uid}", history, competition_id)
weight, weight_fresh = get_float_score(f"weight_data.{uid}", history, competition_id)
sample = get_sample(uid, history, competition_id)
result[uid] = {
"perplexity": perplexity,
"win_rate": win_rate,
"win_total": win_total,
"weight": weight,
"sample": sample,
"fresh": perplexity_fresh and win_rate_fresh and win_total_fresh
}
if len(result.keys()) == len(uids):
break
return result
def format_score(uid, scores, key) -> typing.Optional[float]:
if uid in scores:
if key in scores[uid]:
point = scores[uid][key]
if floatable(point):
return round(scores[uid][key], 4)
return None
def next_tempo(start_block, tempo, block):
start_num = start_block + tempo
intervals = (block - start_num) // tempo
nearest_num = start_num + ((intervals + 1) * tempo)
return nearest_num
subtensor, metagraph = get_subtensor_and_metagraph()
tao_price = get_tao_price()
leaderboard_df = get_subnet_data(subtensor, metagraph)
leaderboard_df.sort(key=lambda x: x.incentive, reverse=True)
competition_scores = {
y.id: get_scores([x.uid for x in leaderboard_df if x.competition == y.id], y.id)
for y in COMPETITIONS
}
current_block = metagraph.block.item()
next_update = next_tempo(
SUBNET_START_BLOCK,
subtensor.get_subnet_hyperparameters(NETUID).tempo,
current_block
)
blocks_to_go = next_update - current_block
current_time = datetime.datetime.now()
next_update_time = current_time + datetime.timedelta(seconds=blocks_to_go * SECONDS_PER_BLOCK)
validator_df = get_validator_weights(metagraph)
weight_keys = set()
for uid, stats in validator_df.items():
weight_keys.update(stats[-1].keys())
def get_next_update():
now = datetime.datetime.now()
delta = next_update_time - now
return f"""
Next reward update: {blocks_to_go} blocks (~{int(delta.total_seconds() // 60)} minutes)
"""
def leaderboard_data(show_stale: bool, scores: typing.Dict[int, typing.Dict[str, typing.Optional[float | str]]], competition_id: str):
value = [
[
f'[{c.namespace}/{c.name} ({c.commit[0:8]}, UID={c.uid})](https://huggingface.co/{c.namespace}/{c.name}/commit/{c.commit})',
format_score(c.uid, scores, "win_rate"),
format_score(c.uid, scores, "perplexity"),
format_score(c.uid, scores, "weight"),
c.uid,
c.block
] for c in leaderboard_df if c.competition == competition_id and (scores[c.uid]["fresh"] or show_stale)
]
return value
demo = gr.Blocks(css=".typewriter {font-family: 'JMH Typewriter', sans-serif;}")
with demo:
gr.HTML(FONT)
gr.HTML(TITLE)
gr.HTML(IMAGE)
gr.HTML(HEADER)
gr.HTML(value=get_next_update())
with gr.Tabs():
for competition in COMPETITIONS:
with gr.Tab(competition.name):
scores = competition_scores[competition.id]
print(scores)
class_denominator = sum(leaderboard_df[i].incentive for i in range(0, 10) if leaderboard_df[i].incentive and leaderboard_df[i].competition == competition.id)
class_values = {
f"{leaderboard_df[i].namespace}/{leaderboard_df[i].name} ({leaderboard_df[i].commit[0:8]}, UID={leaderboard_df[i].uid}) · ${round(leaderboard_df[i].emission * tao_price, 2):,} (τ{round(leaderboard_df[i].emission, 2):,})": \
leaderboard_df[i].incentive / class_denominator for i in range(0, 10) if leaderboard_df[i].incentive and leaderboard_df[i].competition == competition.id
}
gr.Label(
value=class_values,
num_top_classes=10,
)
with gr.Accordion("Evaluation Stats"):
gr.HTML(EVALUATION_HEADER.replace("{date}", last_refresh.strftime("refreshed at %H:%M on %Y-%m-%d")))
with gr.Tabs():
for entry in leaderboard_df:
if entry.competition == competition.id:
sample = scores[entry.uid]["sample"]
if sample is not None:
name = f"{entry.namespace}/{entry.name} ({entry.commit[0:8]}, UID={entry.uid})"
with gr.Tab(name):
gr.Chatbot([(sample[0], sample[1])])
# gr.Chatbot([(sample[0], f"*{name}*: {sample[1]}"), (None, f"*GPT-4*: {sample[2]}")])
show_stale = gr.Checkbox(label="Show Stale", interactive=True)
leaderboard_table = gr.components.Dataframe(
value=leaderboard_data(show_stale.value, scores, competition.id),
headers=["Name", "Win Rate", "Perplexity", "Weight", "UID", "Block"],
datatype=["markdown", "number", "number", "number", "number", "number"],
elem_id="leaderboard-table",
interactive=False,
visible=True,
)
gr.HTML(EVALUATION_DETAILS)
show_stale.change(lambda x: leaderboard_data(x, scores, competition.id), [show_stale], leaderboard_table)
with gr.Accordion("Validator Stats"):
validator_table = gr.components.Dataframe(
value=[
[uid, int(validator_df[uid][1]), round(validator_df[uid][0], 4)] + [validator_df[uid][-1].get(c.uid) for c in leaderboard_df if c.incentive]
for uid, _ in sorted(
zip(validator_df.keys(), [validator_df[x][1] for x in validator_df.keys()]),
key=lambda x: x[1],
reverse=True
)
],
headers=["UID", "Stake (Ï„)", "V-Trust"] + [f"{c.namespace}/{c.name} ({c.commit[0:8]}, UID={c.uid})" for c in leaderboard_df if c.incentive],
datatype=["number", "number", "number"] + ["number" for c in leaderboard_df if c.incentive],
interactive=False,
visible=True,
)
def restart_space():
API.restart_space(repo_id=REPO_ID, token=H4_TOKEN)
scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, "interval", seconds=60*5) # restart every 15 minutes
scheduler.start()
demo.launch()