import json
import logging
import os
import subprocess

import gradio as gr
from apscheduler.schedulers.background import BackgroundScheduler
from gradio_leaderboard import Leaderboard, SelectColumns
from gradio_space_ci import enable_space_ci

from src.display.about import (
    INTRODUCTION_TEXT,
    TITLE,
)
from src.display.css_html_js import custom_css
from src.display.utils import (
    AutoEvalColumn,
    fields,
)
from src.envs import (
    API,
    DATA_PATH,
    H4_TOKEN,
    HF_HOME,
    HF_TOKEN_PRIVATE,
    METAINFO_DATASET,
    PERSISTENT_FILE_CHECK,
    PERSISTENT_FILE_CHECK_PATH,
    REPO_ID,
    RESET_JUDGEMENT_ENV,
    SUBMITS_META_FILE,
)
from src.leaderboard.build_leaderboard import build_leadearboard_df, download_meta
os.environ["GRADIO_ANALYTICS_ENABLED"] = "false" | |
# Configure logging | |
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") | |
# Start ephemeral Spaces on PRs (see config in README.md) | |
enable_space_ci() | |
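
# Fetch the leaderboard metadata from the Hub so build_demo() has local files to read.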
download_meta()


def build_demo():
    """Build the Gradio app, or return None when no leaderboard data exists yet."""
    demo = gr.Blocks(title="Chatbot Arena Leaderboard", css=custom_css)
    try:
        leaderboard_df = build_leadearboard_df()
    except FileNotFoundError:
        # No cached results yet: flag a recalculation and skip building the UI.
        change_judgement(1)
        return None
    with demo:
        gr.HTML(TITLE)
        gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text")

        with gr.Tabs(elem_classes="tab-buttons"):
            with gr.TabItem("🏅 LLM Benchmark", elem_id="llm-benchmark-tab-table", id=0):
                Leaderboard(
                    value=leaderboard_df,
                    datatype=[c.type for c in fields(AutoEvalColumn)],
                    select_columns=SelectColumns(
                        default_selection=[c.name for c in fields(AutoEvalColumn) if c.displayed_by_default],
                        cant_deselect=[c.name for c in fields(AutoEvalColumn) if c.never_hidden or c.dummy],
                        label="Select Columns to Display:",
                    ),
                    search_columns=[
                        AutoEvalColumn.model_name.name,
                        AutoEvalColumn.username.name,
                        AutoEvalColumn.link.name,
                    ],
                )
            # with gr.TabItem("📝 About", elem_id="llm-benchmark-tab-table", id=1):
            #     gr.Markdown(LLM_BENCHMARKS_TEXT, elem_classes="markdown-text")
            # with gr.TabItem("❓ FAQ", elem_id="llm-benchmark-tab-table", id=2):
            #     gr.Markdown(FAQ_TEXT, elem_classes="markdown-text")
with gr.TabItem("π Submit ", elem_id="llm-benchmark-tab-table", id=3): | |
with gr.Row(): | |
gr.Markdown("# β¨ Submit your model here!", elem_classes="markdown-text") | |
with gr.Column(): | |
model_name_textbox = gr.Textbox(label="Model name") | |
submitter_username = gr.Textbox(label="Username") # can we get this info from hf?? | |
model_link_web = gr.Textbox(label="Link to Model") # can we get this info from hf?? | |
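
                    # Upload handler: register the submission in the local metadata
                    # file, then push the answers file and the updated metadata to
                    # the private dataset repo and flag a re-judgement.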
                    def upload_file(file, model_name, username, link):
                        # Prefix the uploaded file with the model name so answers
                        # from different models cannot collide.
                        file_name = model_name + "_" + os.path.basename(file.name)

                        with open(f"{DATA_PATH}/{SUBMITS_META_FILE}", "r", encoding="utf-8") as submit_meta_file:
                            current_info = json.load(submit_meta_file)

                        # For now, simply reject resubmissions under an existing model name.
                        if any(entry["model_name"] == model_name for entry in current_info):
                            return False

                        submit_info = {
                            "model_name": model_name,
                            "username": username,
                            "file_name": file_name,
                            "link": link if link else "",
                        }
                        current_info.append(submit_info)

                        with open(f"{DATA_PATH}/{SUBMITS_META_FILE}", "w", encoding="utf-8") as submit_meta_file:
                            submit_meta_file.write(json.dumps(current_info))

                        logging.info(
                            "New submission: file from %s saved to %s with model %s",
                            username,
                            file_name,
                            model_name,
                        )
                        API.upload_file(
                            path_or_fileobj=file.name,
                            path_in_repo="arena-hard-v0.1/model_answers/" + file_name,
                            repo_id=METAINFO_DATASET,
                            repo_type="dataset",
                            token=HF_TOKEN_PRIVATE,
                        )
                        # Upload the metadata file from DATA_PATH, where it was just rewritten.
                        API.upload_file(
                            path_or_fileobj=f"{DATA_PATH}/{SUBMITS_META_FILE}",
                            path_in_repo=SUBMITS_META_FILE,
                            repo_id=METAINFO_DATASET,
                            repo_type="dataset",
                            token=HF_TOKEN_PRIVATE,
                        )
                        change_judgement(1)
                        return file.name
                    # Note: Gradio components are always truthy, so gating on
                    # `model_name_textbox and submitter_username` cannot check the
                    # textbox values; emptiness has to be validated in upload_file.
                    file_output = gr.File()
                    upload_button = gr.UploadButton(
                        "Click to Upload & Submit Answers", file_types=["*"], file_count="single"
                    )
                    upload_button.upload(
                        upload_file,
                        inputs=[upload_button, model_name_textbox, submitter_username, model_link_web],
                        outputs=file_output,
                    )

    return demo


# print(os.system('cd src/gen && ../../.venv/bin/python gen_judgment.py'))
# print(os.system('cd src/gen/ && python show_result.py --output'))


def update_board():
    # Stopgap: rebuild the board only when the flag is set; the flag is mirrored
    # in the PERSISTENT_FILE_CHECK file. Not a robust design, but fast to code.
    need_reset = os.environ.get(RESET_JUDGEMENT_ENV) == "1"
    logging.info("Updating the judgement: %s", need_reset)
    if not need_reset:
        return
    API.restart_space(repo_id=REPO_ID, token=H4_TOKEN)
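

# Persist the recalculation flag locally, mirror it to the metainfo dataset on
# the Hub, and expose it via the environment so update_board() sees the change.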
def change_judgement(need_recalc_state=0):
    with open(PERSISTENT_FILE_CHECK_PATH, "w", encoding="utf-8") as file:
        file.write(str(need_recalc_state))
    API.upload_file(
        path_or_fileobj=PERSISTENT_FILE_CHECK_PATH,
        path_in_repo=PERSISTENT_FILE_CHECK,
        repo_id=METAINFO_DATASET,
        repo_type="dataset",
        token=HF_TOKEN_PRIVATE,
    )
    os.environ[RESET_JUDGEMENT_ENV] = str(need_recalc_state)
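

# Startup flow: read the persisted flag; if a recalculation is pending (or the
# flag file is missing on a fresh start), run the judgement pipeline once and
# clear the flag, then serve the app and poll for the next rebuild.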
if __name__ == "__main__":
    os.environ[RESET_JUDGEMENT_ENV] = "0"

    need_recalc = False
    try:
        with open(PERSISTENT_FILE_CHECK_PATH, "r", encoding="utf-8") as file:
            need_recalc = file.read().strip() == "1"
    except FileNotFoundError:
        need_recalc = True

    if need_recalc:
        # These do not strictly need to run as subprocesses, but the earlier
        # in-process version did not work, so keep the subprocess calls.
        gen_judgement_file = os.path.join(HF_HOME, "src/gen/gen_judgment.py")
        subprocess.run(["python3", gen_judgement_file], check=True)
        show_result_file = os.path.join(HF_HOME, "src/gen/show_result.py")
        subprocess.run(["python3", show_result_file, "--output"], check=True)
        # Results are now generated. If the code above throws, the flag stays
        # set and judgement may be re-attempted on restart; moving this call
        # before the judgement step would safeguard against that.
        change_judgement(0)
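
    # Check every 10 minutes; when the flag is set, update_board restarts the
    # Space, which re-runs this script and regenerates the judgements.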
    scheduler = BackgroundScheduler()
    scheduler.add_job(update_board, "interval", minutes=10)
    scheduler.start()

    demo_app = build_demo()
    # build_demo() returns None when leaderboard data is still missing; in that
    # case just idle until the scheduler restarts the Space.
    if demo_app is not None:
        demo_app.launch(debug=True)