import gradio as gr
import bittensor as bt
import typing
from bittensor.extrinsics.serving import get_metadata
from dataclasses import dataclass
import requests
import wandb
import math
import os
import datetime
import time
import functools
import multiprocessing
from dotenv import load_dotenv
from huggingface_hub import HfApi
from apscheduler.schedulers.background import BackgroundScheduler
from tqdm import tqdm
import concurrent.futures
import numpy as np
load_dotenv()
FONT = (
"""<link href="https://fonts.cdnfonts.com/css/jmh-typewriter" rel="stylesheet">"""
)
TITLE = """<h1 align="center" id="space-title" class="typewriter">MyShell TTS Subnet Leaderboard</h1>"""
IMAGE = """<a href="https://discord.gg/myshell" target="_blank"><img src="https://avatars.githubusercontent.com/u/127754094?s=2000&v=4" alt="MyShell" style="margin: auto; width: 20%; border: 0;" /></a>"""
HEADER = """<h2 align="center" class="typewriter">MyShell TTS Subnet is a groundbreaking project that leverages the power of decentralized collaboration to advance the state-of-the-art in open-source Text-to-Speech (TTS) technology. By harnessing the Bittensor blockchain and a unique incentive mechanism, we aim to create the most advanced and accessible TTS models. By leveraging MyShell's user base of over one million individuals, we are devoted to pushing cutting-edge technology to every end-user.</h3>"""
EVALUATION_DETAILS = """<b>Name</b> is the 🤗 Hugging Face model name (click to go to the model card). <b>Rewards / Day</b> are the expected rewards per day for each model. <b>Block</b> is the Bittensor block that the model was submitted in. More stats on <a href="https://taostats.io/subnets/netuid-3/" target="_blank">taostats</a>."""
EVALUATION_HEADER = """<h3 align="center">Shows the latest internal evaluation statistics as calculated by a validator run by MyShell, the results are just for reference. </h3>"""
VALIDATOR_WANDB_PROJECT = os.environ.get(
    "VALIDATOR_WANDB_PROJECT", "myshell_tc/tts_subnet_validator"
)
H4_TOKEN = os.environ.get("H4_TOKEN", None)
API = HfApi(token=H4_TOKEN)
REPO_ID = "myshell-test/tts-subnet-leaderboard"
METAGRAPH_RETRIES = 10
METAGRAPH_DELAY_SECS = 30
METADATA_TTL = 10
NETUID = 3
SUBNET_START_BLOCK = 2635801
SECONDS_PER_BLOCK = 12
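# One tempo is 360 blocks; at 12 s/block that is ~72 minutes, i.e. ~20 tempos/day.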
SUBTENSOR = os.environ.get("SUBTENSOR", "finney")
@dataclass
class Competition:
id: str
name: str
COMPETITIONS = [
Competition(id="p255", name="anispeech-speaker-new"),
Competition(id="p257", name="anispeech-speaker-old"),
]
DEFAULT_COMPETITION_ID = "p255"
last_refresh = None
def run_in_subprocess(func: functools.partial, ttl: int) -> typing.Any:
"""Runs the provided function on a subprocess with 'ttl' seconds to complete.
Args:
func (functools.partial): Function to be run.
ttl (int): How long to try for in seconds.
Returns:
Any: The value returned by 'func'
"""
def wrapped_func(func: functools.partial, queue: multiprocessing.Queue):
try:
result = func()
queue.put(result)
except (Exception, BaseException) as e:
# Catch exceptions here to add them to the queue.
queue.put(e)
# Use "fork" (the default on all POSIX except macOS), because pickling doesn't seem
# to work on "spawn".
ctx = multiprocessing.get_context("fork")
queue = ctx.Queue()
process = ctx.Process(target=wrapped_func, args=[func, queue])
process.start()
process.join(timeout=ttl)
if process.is_alive():
process.terminate()
process.join()
raise TimeoutError(f"Failed to {func.func.__name__} after {ttl} seconds")
# Raises an error if the queue is empty. This is fine. It means our subprocess timed out.
result = queue.get(block=False)
# If we put an exception on the queue then raise instead of returning.
if isinstance(result, Exception):
raise result
if isinstance(result, BaseException):
raise Exception(f"BaseException raised in subprocess: {str(result)}")
return result
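# Illustrative usage (hypothetical URL), wrapping a flaky network call so it
# cannot hang the caller for more than ten seconds:
#   partial = functools.partial(requests.get, "https://example.com/health")
#   response = run_in_subprocess(partial, ttl=10)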
def get_subtensor_and_metagraph() -> typing.Tuple[bt.subtensor, bt.metagraph]:
    for i in range(METAGRAPH_RETRIES):
try:
print("Connecting to subtensor...")
subtensor: bt.subtensor = bt.subtensor(SUBTENSOR)
print("Pulling metagraph...")
metagraph: bt.metagraph = subtensor.metagraph(NETUID, lite=False)
return subtensor, metagraph
        except Exception:
if i == METAGRAPH_RETRIES - 1:
raise
print(
f"Error connecting to subtensor or pulling metagraph, retry {i + 1} of {METAGRAPH_RETRIES} in {METAGRAPH_DELAY_SECS} seconds..."
)
time.sleep(METAGRAPH_DELAY_SECS)
raise RuntimeError()
@dataclass
class ModelData:
uid: int
hotkey: str
namespace: str
name: str
commit: str
hash: str
block: int
incentive: float
emission: float
competition: str
@classmethod
def from_compressed_str(
cls,
uid: int,
hotkey: str,
cs: str,
block: int,
incentive: float,
emission: float,
):
"""Returns an instance of this class from a compressed string representation"""
tokens = cs.split(":")
return ModelData(
uid=uid,
hotkey=hotkey,
namespace=tokens[0],
name=tokens[1],
commit=tokens[2] if tokens[2] != "None" else "",
hash=tokens[3] if tokens[3] != "None" else "",
competition=tokens[4]
if len(tokens) > 4 and tokens[4] != "None"
else DEFAULT_COMPETITION_ID,
block=block,
incentive=incentive,
emission=emission,
)
def get_tao_price() -> float:
    for i in range(METAGRAPH_RETRIES):
        try:
            return float(
                requests.get(
                    "https://api.mexc.com/api/v3/avgPrice?symbol=TAOUSDT"
                ).json()["price"]
            )
        except Exception:
            if i == METAGRAPH_RETRIES - 1:
                raise
            time.sleep(METAGRAPH_DELAY_SECS)
raise RuntimeError()
def get_validator_weights(
metagraph: bt.metagraph,
) -> typing.Dict[int, typing.Tuple[float, int, typing.Dict[int, float]]]:
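    """Maps each validator UID to (validator_trust, stake, {miner_uid: weight})."""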
ret = {}
for uid in metagraph.uids.tolist():
vtrust = metagraph.validator_trust[uid].item()
if vtrust > 0:
ret[uid] = (vtrust, metagraph.S[uid].item(), {})
for ouid in metagraph.uids.tolist():
if ouid == uid:
continue
weight = round(metagraph.weights[uid][ouid].item(), 4)
if weight > 0:
ret[uid][-1][ouid] = weight
return ret
def get_subnet_data(
subtensor: bt.subtensor, metagraph: bt.metagraph
) -> typing.List[ModelData]:
global last_refresh
# Function to be executed in a thread
def fetch_data(uid):
hotkey = metagraph.hotkeys[uid]
try:
partial = functools.partial(
get_metadata, subtensor, metagraph.netuid, hotkey
)
metadata = run_in_subprocess(partial, METADATA_TTL)
        except Exception:
            # Metadata could not be fetched within METADATA_TTL; skip this hotkey.
            return None
if not metadata:
return None
        commitment = metadata["info"]["fields"][0]
        # The on-chain commitment value is hex with a "0x" prefix; strip and decode.
        hex_data = next(iter(commitment.values()))[2:]
        chain_str = bytes.fromhex(hex_data).decode()
block = metadata["block"]
        incentive = np.nan_to_num(metagraph.incentive[uid]).item()
        # Emission is per tempo (360 blocks, ~72 minutes), so multiply by the
        # ~20 tempos per day to convert to daily TAO.
        emission = np.nan_to_num(metagraph.emission[uid]).item() * 20
try:
model_data = ModelData.from_compressed_str(
uid, hotkey, chain_str, block, incentive, emission
)
        except Exception:
            # Malformed commitment string; skip this entry.
            return None
return model_data
# Use ThreadPoolExecutor to fetch data in parallel
results = []
with concurrent.futures.ThreadPoolExecutor() as executor:
# Prepare the list of futures
futures = [executor.submit(fetch_data, uid) for uid in metagraph.uids.tolist()]
for future in tqdm(
concurrent.futures.as_completed(futures),
desc="Metadata for hotkeys",
total=len(futures),
):
result = future.result()
if result:
results.append(result)
last_refresh = datetime.datetime.now()
return results
def floatable(x) -> bool:
return (
isinstance(x, float) and not math.isnan(x) and not math.isinf(x)
) or isinstance(x, int)
def get_float_score(
key: str, history, competition_id: str
) -> typing.Tuple[typing.Optional[float], bool]:
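    """Returns the newest value of `key` logged for `competition_id`, plus a
    flag that is True when it comes from the most recent step (fresh) rather
    than a stale fallback."""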
if key in history and "competition_id" in history:
data = list(history[key])
if len(data) > 0:
competitions = list(history["competition_id"])
            # Walk back from the most recent step to the newest entry logged
            # for this competition; guard against exhausting either list.
            while competitions and data:
                if competitions.pop() != competition_id:
                    data.pop()
                    continue
                if floatable(data[-1]):
                    return float(data[-1]), True
                # The latest value is unusable; fall back to the newest valid one.
                data = [float(x) for x in data if floatable(x)]
                if data:
                    return float(data[-1]), False
                break
return None, False
def get_sample(
uid, history, competition_id: str
) -> typing.Optional[typing.Tuple[str, str, str]]:
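    """Returns the newest (prompt, response, ground truth) triple logged for
    this UID under `competition_id`, or None if no valid sample is found."""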
prompt_key = f"sample_prompt_data.{uid}"
response_key = f"sample_response_data.{uid}"
truth_key = f"sample_truth_data.{uid}"
if (
prompt_key in history
and response_key in history
and truth_key in history
and "competition_id" in history
):
competitions = list(history["competition_id"])
prompts = list(history[prompt_key])
responses = list(history[response_key])
truths = list(history[truth_key])
        while competitions and prompts and responses and truths:
prompt = prompts.pop()
response = responses.pop()
truth = truths.pop()
if competitions.pop() != competition_id:
continue
if (
isinstance(prompt, str)
and isinstance(response, str)
and isinstance(truth, str)
):
return prompt, response, truth
break
return None
def get_scores(
uids: typing.List[int], competition_id: str
) -> typing.Dict[int, typing.Dict[str, typing.Optional[float | str]]]:
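    """Collects per-UID win rate, win total, weight, and a sample chat from the
    validator's W&B runs, preferring the most recent runs."""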
api = wandb.Api()
runs = list(api.runs(VALIDATOR_WANDB_PROJECT))
result = {}
for run in runs:
history = run.history()
for uid in uids:
            if uid in result:
continue
win_rate, win_rate_fresh = get_float_score(
f"win_rate_data.{uid}", history, competition_id
)
win_total, win_total_fresh = get_float_score(
f"win_total_data.{uid}", history, competition_id
)
weight, weight_fresh = get_float_score(
f"weight_data.{uid}", history, competition_id
)
sample = get_sample(uid, history, competition_id)
result[uid] = {
"win_rate": win_rate,
"win_total": win_total,
"weight": weight,
"sample": sample,
"fresh": win_rate_fresh and win_total_fresh,
}
        if len(result) == len(uids):
break
return result
def format_score(uid, scores, key) -> typing.Optional[float]:
    if uid in scores and key in scores[uid]:
        point = scores[uid][key]
        if floatable(point):
            return round(point, 4)
    return None
def next_tempo(start_block, tempo, block):
start_num = start_block + tempo
intervals = (block - start_num) // tempo
nearest_num = start_num + ((intervals + 1) * tempo)
return nearest_num
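# Worked example: with start_block=2635801 and tempo=360, a current block of
# 2636200 gives start_num=2636161 and intervals=(2636200-2636161)//360 == 0,
# so the next update lands at block 2636161 + 360 = 2636521.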
subtensor, metagraph = get_subtensor_and_metagraph()
tao_price = get_tao_price()
leaderboard_df = get_subnet_data(subtensor, metagraph)
leaderboard_df.sort(key=lambda x: x.incentive, reverse=True)
print(leaderboard_df)
competition_scores = {
y.id: get_scores([x.uid for x in leaderboard_df if x.competition == y.id], y.id)
for y in COMPETITIONS
}
current_block = metagraph.block.item()
next_update = next_tempo(
SUBNET_START_BLOCK,
360,
current_block,
)
blocks_to_go = next_update - current_block
current_time = datetime.datetime.now()
next_update_time = current_time + datetime.timedelta(
seconds=blocks_to_go * SECONDS_PER_BLOCK
)
validator_df = get_validator_weights(metagraph)
weight_keys = set()
for uid, stats in validator_df.items():
weight_keys.update(stats[-1].keys())
def get_next_update():
now = datetime.datetime.now()
delta = next_update_time - now
return f"""<div align="center" style="font-size: larger;">Next reward update: <b>{blocks_to_go}</b> blocks (~{int(delta.total_seconds() // 60)} minutes)</div>"""
def leaderboard_data(
show_stale: bool,
scores: typing.Dict[int, typing.Dict[str, typing.Optional[float | str]]],
competition_id: str,
):
value = [
[
f"[{c.namespace}/{c.name} ({c.commit[0:8]}, UID={c.uid})](https://huggingface.co/{c.namespace}/{c.name}/commit/{c.commit})",
format_score(c.uid, scores, "win_rate"),
format_score(c.uid, scores, "weight"),
c.uid,
c.block,
]
for c in leaderboard_df
        if c.competition == competition_id
        and (scores.get(c.uid, {}).get("fresh") or show_stale)
]
return value
demo = gr.Blocks(css=".typewriter {font-family: 'JMH Typewriter', sans-serif;}")
with demo:
gr.HTML(FONT)
gr.HTML(TITLE)
gr.HTML(IMAGE)
gr.HTML(HEADER)
gr.HTML(value=get_next_update())
with gr.Tabs():
for competition in COMPETITIONS:
with gr.Tab(competition.name):
scores = competition_scores[competition.id]
print(scores)
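                # Normalize the top-10 incentives within this competition so the
                # Label below shows each model's share of current rewards.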
class_denominator = sum(
leaderboard_df[i].incentive
for i in range(0, min(10, len(leaderboard_df)))
if leaderboard_df[i].incentive
and leaderboard_df[i].competition == competition.id
)
                class_values = {
                    f"{leaderboard_df[i].namespace}/{leaderboard_df[i].name} ({leaderboard_df[i].commit[0:8]}, UID={leaderboard_df[i].uid}) · ${round(leaderboard_df[i].emission * tao_price, 2):,} (τ{round(leaderboard_df[i].emission, 2):,})": leaderboard_df[i].incentive
                    / class_denominator
                    for i in range(0, min(10, len(leaderboard_df)))
                    if leaderboard_df[i].incentive
                    and leaderboard_df[i].competition == competition.id
                }
gr.Label(
value=class_values,
num_top_classes=10,
)
with gr.Accordion("Evaluation Stats"):
gr.HTML(
EVALUATION_HEADER.replace(
"{date}",
last_refresh.strftime("refreshed at %H:%M on %Y-%m-%d"),
)
)
with gr.Tabs():
for entry in leaderboard_df:
if entry.competition == competition.id:
                                sample = scores.get(entry.uid, {}).get("sample")
if sample is not None:
name = f"{entry.namespace}/{entry.name} ({entry.commit[0:8]}, UID={entry.uid})"
with gr.Tab(name):
gr.Chatbot([(sample[0], sample[1])])
# gr.Chatbot([(sample[0], f"*{name}*: {sample[1]}"), (None, f"*GPT-4*: {sample[2]}")])
show_stale = gr.Checkbox(label="Show Stale", interactive=True)
leaderboard_table = gr.components.Dataframe(
value=leaderboard_data(
show_stale.value, scores, competition.id
),
headers=[
"Name",
"Win Rate",
"Weight",
"UID",
"Block",
],
datatype=[
"markdown",
"number",
"number",
"number",
"number",
],
elem_id="leaderboard-table",
interactive=False,
visible=True,
)
gr.HTML(EVALUATION_DETAILS)
show_stale.change(
lambda x: leaderboard_data(x, scores, competition.id),
[show_stale],
leaderboard_table,
)
with gr.Accordion("Validator Stats"):
validator_table = gr.components.Dataframe(
value=[
[uid, int(validator_df[uid][1]), round(validator_df[uid][0], 4)]
+ [
validator_df[uid][-1].get(c.uid)
for c in leaderboard_df
if c.incentive
]
                for uid in sorted(
                    validator_df.keys(),
                    key=lambda u: validator_df[u][1],
                    reverse=True,
                )
],
headers=["UID", "Stake (τ)", "V-Trust"]
+ [
f"{c.namespace}/{c.name} ({c.commit[0:8]}, UID={c.uid})"
for c in leaderboard_df
if c.incentive
],
datatype=["number", "number", "number"]
+ ["number" for c in leaderboard_df if c.incentive],
interactive=False,
visible=True,
)
def restart_space():
API.restart_space(repo_id=REPO_ID, token=H4_TOKEN)
# Periodic restarts are handled by an external restarter now; the in-process
# scheduler is kept below for reference.
# scheduler = BackgroundScheduler()
# scheduler.add_job(restart_space, "interval", seconds=60 * 5)  # restart every 5 minutes
# scheduler.start()
demo.launch()