"""Gradio app for the TTS benchmark leaderboard Space.

On start-up the script downloads the evaluation queue and results datasets,
builds two leaderboard tables from ``results.csv`` (aggregated per benchmark
category and per individual benchmark), and serves a UI with the tables, an
"About" page and a submission form. A background job restarts the Space
every 30 minutes.
"""

import json
import os
import shutil
from pathlib import Path

import gradio as gr
import pandas as pd
from apscheduler.schedulers.background import BackgroundScheduler
from gradio_leaderboard import Leaderboard, SelectColumns
from huggingface_hub import snapshot_download
from ttsds import BenchmarkSuite
from ttsds.benchmarks.benchmark import BenchmarkCategory

from src.css_html_js import custom_css
from src.envs import API, EVAL_REQUESTS_PATH, EVAL_RESULTS_PATH, QUEUE_REPO, REPO_ID, RESULTS_REPO, TOKEN, TAGS
from src.texts import LLM_BENCHMARKS_TEXT, EVALUATION_QUEUE_TEXT, CITATION_TEXT


def rounded_df(df):
    """Return a copy of *df* with float columns rounded to two decimals.

    A column is treated as float when its first value is a ``float``. An
    empty frame is returned as an unchanged copy; without that guard the
    first-value probe raises ``IndexError`` when a filter matches no rows.
    """
    df = df.copy()
    if df.empty:
        return df
    for col in df.columns:
        if isinstance(df[col].values[0], float):
            df[col] = df[col].apply(lambda x: round(x, 2))
    return df


def _mean_columns(columns, exclude_environment=True):
    """Return the columns of *columns* that contribute to the "Mean" score."""
    excluded = {"Mean", "Model", "Tags"}
    if exclude_environment:
        excluded.add("Environment")
    return [col for col in columns if str(col) not in excluded]


def filter_dfs(tags, lb):
    """Return the leaderboard table restricted to rows matching any of *tags*.

    Args:
        tags: Selected tag names; a falsy value disables filtering.
        lb: The currently displayed table. Only its columns are inspected:
            an "Environment" column selects the aggregated table, otherwise
            the per-benchmark table is used.

    Returns:
        A rounded copy of the selected table, filtered by tag.
    """
    source = f_a_df if "Environment" in lb.columns else f_b_df
    lb = source.copy()
    if tags and len(lb) > 0:
        # ``bool(x)`` guards against empty or missing tag entries, which
        # cannot match and (for ``None``) would break the ``in`` test.
        lb = lb[lb["Tags"].apply(lambda x: bool(x) and any(tag in x for tag in tags))]
    return rounded_df(lb)


def change_mean(env, lb):
    """Recompute the "Mean" column of the aggregated leaderboard.

    Args:
        env: When truthy, the "Environment" column is left out of the mean.
        lb: The currently displayed table (unused; the table is rebuilt from
            the module-level aggregated frame).

    Returns:
        A rounded copy of the aggregated table with the updated mean.
    """
    lb = f_a_df.copy()
    mean_cols = _mean_columns(lb.columns, exclude_environment=bool(env))
    lb["Mean"] = lb[mean_cols].mean(axis=1)
    return rounded_df(lb)


def restart_space():
    """Ask the Hub API to restart this Space."""
    API.restart_space(repo_id=REPO_ID)


def submit_eval(model_name, model_tags, web_url, hf_url, code_url, paper_url, inference_details, file_path):
    """Validate a submission and add it to the evaluation queue.

    The uploaded archive is moved into the local requests directory, a JSON
    request file is written next to it, and both are uploaded to the queue
    dataset repository.

    Args:
        model_name: Human-readable model name; also used to derive the file id.
        model_tags: Tags selected for the model.
        web_url, hf_url, code_url, paper_url: Optional links; when given they
            must start with ``http://`` or ``https://``.
        inference_details: Free-text notes stored with the request.
        file_path: Path of the uploaded ``.tar.gz`` archive.

    Returns:
        A status message for display in the UI.
    """
    if not model_name or not model_name.strip():
        return "Please enter a valid model name"
    model_id = model_name.lower().replace(" ", "_")
    # The id becomes part of local file paths and repo paths, so refuse
    # anything that could escape the requests directory.
    if model_id in {".", ".."} or "/" in model_id or "\\" in model_id:
        return "Please enter a valid model name"

    request_path = Path(EVAL_REQUESTS_PATH) / f"{model_id}.json"
    archive_path = Path(EVAL_REQUESTS_PATH) / f"{model_id}.tar.gz"

    # check if model already exists
    if request_path.exists():
        return "Model already exists in the evaluation queue"

    # check which urls are valid
    for url in (web_url, hf_url, code_url, paper_url):
        if url and not url.startswith(("http://", "https://")):
            return "Please enter a valid URL"

    # move file to correct location
    if not file_path or not file_path.endswith(".tar.gz"):
        return "Please upload a .tar.gz file"
    # shutil.move also works when the upload lives on another filesystem,
    # where Path.rename would fail.
    shutil.move(file_path, str(archive_path))

    # build display name - use web_url to link text if available, and emojis for the other urls
    display_name = model_name + " "
    if web_url:
        display_name = f"[{display_name}]({web_url}) "
    if hf_url:
        display_name += f"[🤗]({hf_url})"
    if code_url:
        display_name += f"[💻]({code_url})"
    if paper_url:
        display_name += f"[📄]({paper_url})"

    request_obj = {
        "model_name": model_name,
        "display_name": display_name,
        "model_tags": model_tags or [],
        "web_url": web_url,
        "hf_url": hf_url,
        "code_url": code_url,
        "paper_url": paper_url,
        "inference_details": inference_details,
        "status": "pending",
    }

    try:
        with open(request_path, "w", encoding="utf-8") as f:
            json.dump(request_obj, f)
        API.upload_file(
            path_or_fileobj=str(request_path),
            path_in_repo=f"{model_id}.json",
            repo_id=QUEUE_REPO,
            repo_type="dataset",
            commit_message=f"Add {model_name} to evaluation queue",
        )
        API.upload_file(
            path_or_fileobj=str(archive_path),
            path_in_repo=f"{model_id}.tar.gz",
            repo_id=QUEUE_REPO,
            repo_type="dataset",
            commit_message=f"Add {model_name} to evaluation queue",
        )
    except Exception as e:
        # Drop the request file so the same model can be submitted again.
        if request_path.exists():
            os.remove(request_path)
        return f"Error: {e}"

    return "Model submitted successfully 🎉"


def _download_snapshot(repo_id, local_dir):
    """Download dataset repo *repo_id* into *local_dir*; restart the Space on failure."""
    try:
        print(local_dir)
        snapshot_download(
            repo_id=repo_id,
            local_dir=local_dir,
            repo_type="dataset",
            tqdm_class=None,
            etag_timeout=30,
            token=TOKEN,
        )
    except Exception:
        restart_space()


### Space initialisation
_download_snapshot(QUEUE_REPO, EVAL_REQUESTS_PATH)
_download_snapshot(RESULTS_REPO, EVAL_RESULTS_PATH)

results_df = pd.read_csv(EVAL_RESULTS_PATH + "/results.csv")

# Aggregated table: one row per dataset, one column per benchmark category.
agg_df = BenchmarkSuite.aggregate_df(results_df)
agg_df = agg_df.pivot(index="dataset", columns="benchmark_category", values="score")
agg_df.rename(columns={"OVERALL": "General"}, inplace=True)
agg_df.columns = [x.capitalize() for x in agg_df.columns]
mean_cols = _mean_columns(agg_df.columns, exclude_environment=True)
agg_df["Mean"] = agg_df[mean_cols].mean(axis=1)
# make sure mean is the first column
agg_df = agg_df[["Mean"] + [col for col in agg_df.columns if col != "Mean"]]
agg_df["Tags"] = ""
agg_df.reset_index(inplace=True)
agg_df.rename(columns={"dataset": "Model"}, inplace=True)
agg_df.sort_values("Mean", ascending=False, inplace=True)

# Per-benchmark table: one row per dataset, one column per benchmark.
benchmark_df = results_df.pivot(index="dataset", columns="benchmark_name", values="score")
# get benchmark name order by category
benchmark_order = list(results_df.sort_values("benchmark_category")["benchmark_name"].unique())
benchmark_df = benchmark_df[benchmark_order]
benchmark_df = benchmark_df.reset_index()
benchmark_df.rename(columns={"dataset": "Model"}, inplace=True)
# set index so that the row mean only covers the score columns
benchmark_df.set_index("Model", inplace=True)
benchmark_df["Mean"] = benchmark_df.mean(axis=1)
# make sure mean is the first column
benchmark_df = benchmark_df[["Mean"] + [col for col in benchmark_df.columns if col != "Mean"]]
benchmark_df["Tags"] = ""
benchmark_df.reset_index(inplace=True)
benchmark_df.sort_values("Mean", ascending=False, inplace=True)

# get details for each model
model_detail_files = Path(EVAL_REQUESTS_PATH).glob("*.json")
model_details = {}
for model_detail_file in model_detail_files:
    with open(model_detail_file, encoding="utf-8") as f:
        model_detail = json.load(f)
        model_details[model_detail_file.stem] = model_detail

# replace .tar.gz
benchmark_df["Model"] = benchmark_df["Model"].apply(lambda x: x.replace(".tar.gz", ""))
agg_df["Model"] = agg_df["Model"].apply(lambda x: x.replace(".tar.gz", ""))

# Tags must be looked up before the model id is replaced by its display name.
benchmark_df["Tags"] = benchmark_df["Model"].apply(lambda x: model_details.get(x, {}).get("model_tags", ""))
agg_df["Tags"] = agg_df["Model"].apply(lambda x: model_details.get(x, {}).get("model_tags", ""))

benchmark_df["Model"] = benchmark_df["Model"].apply(lambda x: model_details.get(x, {}).get("display_name", x))
agg_df["Model"] = agg_df["Model"].apply(lambda x: model_details.get(x, {}).get("display_name", x))

f_b_df = benchmark_df.copy()
f_a_df = agg_df.copy()


def init_leaderboard(dataframe):
    """Build the ``Leaderboard`` component for *dataframe*.

    "Model" and "Tags" are rendered as markdown, every other column as a
    number. All columns except "Tags" are selected by default.

    Raises:
        ValueError: If *dataframe* is ``None`` or empty.
    """
    if dataframe is None or dataframe.empty:
        raise ValueError("Leaderboard DataFrame is empty or None.")
    df_types = ["markdown" if col in ("Model", "Tags") else "number" for col in dataframe.columns]
    cols = list(dataframe.columns)
    cols.remove("Tags")
    return Leaderboard(
        value=rounded_df(dataframe),
        select_columns=SelectColumns(
            default_selection=cols,
            cant_deselect=["Model", "Mean"],
            label="Select Columns to Display:",
        ),
        search_columns=["Model", "Tags"],
        filter_columns=[],
        interactive=False,
        datatype=df_types,
    )


app = gr.Blocks(css=custom_css, title="TTS Benchmark Leaderboard")

with app:
    with gr.Tabs(elem_classes="tab-buttons") as tabs:
        with gr.TabItem("🏅 TTSDB Scores", elem_id="llm-benchmark-tab-table", id=0):
            with gr.Group():
                env = gr.Checkbox(value=True, label="Exclude environment from mean.")
                gr.Markdown("**Environment** measures how well the system can reproduce noise in the training data. This doesn't correlate with human judgements for 'naturalness'")
            tags = gr.Dropdown(
                TAGS,
                value=[],
                multiselect=True,
                label="Tags",
                info="Select tags to filter the leaderboard. You can suggest new tags here: https://huggingface.co/spaces/ttsds/benchmark/discussions/1",
            )
            leaderboard = init_leaderboard(f_a_df)
            tags.change(filter_dfs, [tags, leaderboard], [leaderboard])
            env.change(change_mean, [env, leaderboard], [leaderboard])
        with gr.TabItem("🏅 Individual Benchmarks", elem_id="llm-benchmark-tab-table", id=1):
            tags = gr.Dropdown(
                TAGS,
                value=[],
                multiselect=True,
                label="Tags",
                info="Select tags to filter the leaderboard",
            )
            leaderboard = init_leaderboard(f_b_df)
            tags.change(filter_dfs, [tags, leaderboard], [leaderboard])
        with gr.TabItem("📝 About", elem_id="llm-benchmark-tab-table", id=2):
            gr.Markdown(LLM_BENCHMARKS_TEXT, elem_classes="markdown-text")
        with gr.TabItem("🚀 Submit here!", elem_id="llm-benchmark-tab-table", id=3):
            with gr.Column():
                with gr.Row():
                    gr.Markdown(EVALUATION_QUEUE_TEXT, elem_classes="markdown-text")
            with gr.Row():
                gr.Markdown("# ✉️✨ Submit a TTS dataset here!", elem_classes="markdown-text")
            with gr.Row():
                with gr.Column():
                    model_name_textbox = gr.Textbox(label="Model name")
                    model_tags_dropdown = gr.Dropdown(
                        label="Model tags",
                        choices=TAGS,
                        multiselect=True,
                    )
                    website_url_textbox = gr.Textbox(label="Website URL (optional)")
                    hf_url_textbox = gr.Textbox(label="Huggingface URL (optional)")
                    code_url_textbox = gr.Textbox(label="Code URL (optional)")
                    paper_url_textbox = gr.Textbox(label="Paper URL (optional)")
                    inference_details_textbox = gr.TextArea(label="Inference details (optional)")
                    file_input = gr.File(file_types=[".gz"], interactive=True, label=".tar.gz TTS dataset")
            submit_button = gr.Button("Submit Eval")
            submission_result = gr.Markdown()
            submit_button.click(
                submit_eval,
                [
                    model_name_textbox,
                    model_tags_dropdown,
                    website_url_textbox,
                    hf_url_textbox,
                    code_url_textbox,
                    paper_url_textbox,
                    inference_details_textbox,
                    file_input,
                ],
                submission_result,
            )

    with gr.Row():
        with gr.Accordion("Citation", open=False):
            gr.Markdown(f"Copy the BibTeX citation to cite this source:\n\n```bibtext\n{CITATION_TEXT}\n```")

# Restart the Space every 30 minutes.
scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, "interval", seconds=1800)
scheduler.start()

app.queue(default_concurrency_limit=40).launch()