bhys's picture
Update app.py
2d6e814 verified
raw
history blame
12.3 kB
import os
import json
import datetime
from email.utils import parseaddr
import gradio as gr
import pandas as pd
import numpy as np
from datasets import load_dataset
from apscheduler.schedulers.background import BackgroundScheduler
from huggingface_hub import HfApi, snapshot_download
# InfoStrings
from scorer import question_scorer
from content import format_error, format_warning, format_log, TITLE, INTRODUCTION_TEXT, CITATION_BUTTON_LABEL, \
CITATION_BUTTON_TEXT, model_hyperlink
TOKEN = os.environ.get("TOKEN", None)
print(TOKEN)
OWNER = "autogenCTF"
DATA_DATASET = f"{OWNER}/CTFAIA"
INTERNAL_DATA_DATASET = f"{OWNER}/CTFAIA_internal"
SUBMISSION_DATASET = f"{OWNER}/CTFAIA_submissions_internal"
CONTACT_DATASET = f"{OWNER}/contact_info"
RESULTS_DATASET = f"{OWNER}/test_result"
LEADERBOARD_PATH = f"{OWNER}/agent_ctf_leaderboard"
api = HfApi()
YEAR_VERSION = "2024"
os.makedirs("scored", exist_ok=True)
all_version = ['20240602']
contact_infos = load_dataset(
CONTACT_DATASET,
token=TOKEN,
download_mode="force_redownload",
verification_mode="no_checks"
)
all_gold_dataset = {}
all_gold_results = {}
eval_results = {}
for dataset_version in all_version:
all_gold_dataset[dataset_version] = load_dataset(
INTERNAL_DATA_DATASET,
dataset_version,
token=TOKEN,
download_mode="force_redownload",
verification_mode="no_checks",
trust_remote_code=True
)
all_gold_results[dataset_version] = {
split: {row["task_name"]: row for row in all_gold_dataset[dataset_version][split]}
for split in ["test", "validation"]
}
eval_results[dataset_version] = load_dataset(
RESULTS_DATASET,
dataset_version,
token=TOKEN,
download_mode="force_redownload",
verification_mode="no_checks",
trust_remote_code=True
)
def get_dataframe_from_results(eval_results, split):
local_df = eval_results[split]
local_df = local_df.map(lambda row: {"model": model_hyperlink(row["url"], row["model"])})
local_df = local_df.remove_columns(["url"])
local_df = local_df.rename_column("model", "Model name")
local_df = local_df.rename_column("model_family", "Model family")
# local_df = local_df.rename_column("score", "Average score (%)")
# for i in [1, 2, 3]:
# local_df = local_df.rename_column(f"score_level{i}", f"Level {i} score (%)")
df = pd.DataFrame(local_df)
df = df.sort_values(by=["completion_level"], ascending=False)
numeric_cols = [c for c in local_df.column_names
if c in ["success_rate", "completion_level"]]
df[numeric_cols] = df[numeric_cols].multiply(100).round(decimals=2)
df = df.style.format("{:.2%}", subset=numeric_cols)
return df
eval_dataframe = {}
for dataset_version in all_version:
eval_dataframe[dataset_version] = get_dataframe_from_results(
eval_results=eval_results[dataset_version],
split="validation"
)
def restart_space():
api.restart_space(repo_id=LEADERBOARD_PATH, token=TOKEN)
TYPES = ["markdown", "str", "str", "str", "number", "number", "number", "number"]
LEVELS = ["all", 1, 2, 3]
def round_and_pad(number, ndigits=2):
# 四舍五入到指定的小数位数
rounded_number = round(number, ndigits)
# 转换为字符串
number_str = str(rounded_number)
# 分离整数部分和小数部分
integer_part, decimal_part = number_str.split('.')
# 如果小数部分不足指定的位数,补零
while len(decimal_part) < ndigits:
decimal_part += '0'
# 拼接回去,并转换回数字
return '.'.join([integer_part, decimal_part])
def add_new_eval(
dataset_version: str,
model: str,
model_family: str,
url: str,
path_to_file: str,
organisation: str,
mail: str,
):
val_or_test = 'validation'
# Very basic email parsing
_, parsed_mail = parseaddr(mail)
if not "@" in parsed_mail:
return format_warning("Please provide a valid email adress.")
print("Adding new eval")
# Check if the combination model/org already exists and prints a warning message if yes
if model.lower() in set(
[m.lower() for m in eval_results[dataset_version][val_or_test]["model"]]) and organisation.lower() in set(
[o.lower() for o in eval_results[dataset_version][val_or_test]["organisation"]]):
return format_warning("This model has been already submitted.")
if path_to_file is None:
return format_warning("Please attach a file.")
# Gold answers
gold_results = all_gold_results[dataset_version]
print(gold_results)
# Compute score
file_path = path_to_file.name
success_rate = {'all': 0, 1: 0, 2: 0, 3: 0}
completion_level = {'all': 0, 1: 0, 2: 0, 3: 0}
expertise = {'all': 0, 1: 0, 2: 0, 3: 0}
reasoning = {'all': 0, 1: 0, 2: 0, 3: 0}
comprehension = {'all': 0, 1: 0, 2: 0, 3: 0}
num = {'all': 0, 1: 0, 2: 0, 3: 0}
# with open(f"scored/{organisation}_{model}.jsonl", "w") as scored_file:
with open(f"scored/{organisation}_{model}.jsonl", "w") as scored_file:
with open(file_path, 'r') as f:
for ix, line in enumerate(f):
try:
task = json.loads(line)
except Exception:
return format_error(f"Line {ix} is incorrectly formatted. Please fix it and resubmit your file.")
if "final_answer" not in task:
raise format_error(f"Line {ix} contains no final_answer key. Please fix it and resubmit your file.")
answer = task["final_answer"]
task_name = task["task_name"]
try:
level = int(gold_results[val_or_test][task_name]["Level"])
score = question_scorer(task, gold_results[val_or_test][task_name])
except KeyError:
return format_error(f"{task_name} not found in split {val_or_test}. Are you sure you submitted the correct file?")
scored_file.write(
json.dumps({
"id": task_name,
"final_answer": answer,
"score": score,
"level": level
}) + "\n"
)
num[level] += 1
completion_level[level] += score[0]
expertise[level] += score[1]
reasoning[level] += score[2]
comprehension[level] += score[3]
num['all'] += 1
completion_level['all'] += score[0]
expertise['all'] += score[1]
reasoning['all'] += score[2]
comprehension['all'] += score[3]
if score[0] == 10:
success_rate[level] += 1
success_rate['all'] += 1
for key in LEVELS:
success_rate[key] = round_and_pad(success_rate[key] / num[key] / 100)
completion_level[key] = round_and_pad(completion_level[key] / num[key] / 1000)
expertise[key] = round_and_pad(expertise[key] / num[key])
reasoning[key] = round_and_pad(reasoning[key] / num[key])
comprehension[key] = round_and_pad(comprehension[key] / num[key])
print(success_rate, completion_level, expertise, reasoning, comprehension)
# Save submitted file
api.upload_file(
repo_id=SUBMISSION_DATASET,
path_or_fileobj=path_to_file.name,
path_in_repo=f"{organisation}/{model}/{dataset_version}_{val_or_test}_raw_{datetime.datetime.today()}.jsonl",
repo_type="dataset",
token=TOKEN
)
# Save scored file
api.upload_file(
repo_id=SUBMISSION_DATASET,
path_or_fileobj=f"scored/{organisation}_{model}.jsonl",
path_in_repo=f"{organisation}/{model}/{dataset_version}_{val_or_test}_scored_{datetime.datetime.today()}.jsonl",
repo_type="dataset",
token=TOKEN
)
# Actual submission
eval_entry = {
"model": model,
"model_family": model_family,
"url": url,
"organisation": organisation,
"success_rate": success_rate["all"],
"completion_level": completion_level["all"],
"expertise": expertise["all"],
"reasoning": reasoning["all"],
"comprehension": comprehension["all"]
}
eval_results[dataset_version][val_or_test] = eval_results[dataset_version][val_or_test].add_item(eval_entry)
eval_results[dataset_version].push_to_hub(RESULTS_DATASET, config_name=dataset_version, token=TOKEN)
contact_info = {
"model": model,
"model_family": model_family,
"url": url,
"organisation": organisation,
"mail": mail,
}
contact_infos[val_or_test] = contact_infos[val_or_test].add_item(contact_info)
contact_infos.push_to_hub(CONTACT_DATASET, config_name=YEAR_VERSION, token=TOKEN)
return format_log(
f"Model {model} submitted by {organisation} successfully. \nPlease refresh the leaderboard, and wait a bit to see the score displayed")
def refresh():
eval_results = {}
for dataset_version in all_version:
eval_results[dataset_version] = load_dataset(
RESULTS_DATASET,
dataset_version,
token=TOKEN,
download_mode="force_redownload",
verification_mode="no_checks"
)
new_eval_dataframe = {}
new_leaderboard_tables = []
for dataset_version in all_version:
new_eval_dataframe[dataset_version] = get_dataframe_from_results(
eval_results=eval_results[dataset_version],
split="validation"
)
new_leaderboard_tables.append(new_eval_dataframe[dataset_version])
if len(new_leaderboard_tables) == 1:
return new_leaderboard_tables[0]
else:
return new_leaderboard_tables
def upload_file(files):
file_paths = [file.name for file in files]
return file_paths
demo = gr.Blocks()
with demo:
gr.HTML(TITLE)
gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text")
with gr.Row():
with gr.Accordion("📙 Citation", open=False):
citation_button = gr.Textbox(
value=CITATION_BUTTON_TEXT,
label=CITATION_BUTTON_LABEL,
elem_id="citation-button",
) # .style(show_copy_button=True)
leaderboard_tables = []
for dataset_version in all_version:
with gr.Tab(dataset_version):
leaderboard_tables.append(
gr.components.Dataframe(
value=eval_dataframe[dataset_version], datatype=TYPES, interactive=False,
column_widths=["20%"]
)
)
refresh_button = gr.Button("Refresh")
refresh_button.click(
refresh,
inputs=[],
outputs=leaderboard_tables,
)
with gr.Accordion("Submit a new model for evaluation"):
with gr.Row():
with gr.Column():
level_of_test = gr.Radio(all_version, value=all_version[0], label="dataset_version")
model_name_textbox = gr.Textbox(label="Model name", value='')
model_family_textbox = gr.Textbox(label="Model family", value='')
url_textbox = gr.Textbox(label="Url to model information", value='')
with gr.Column():
organisation = gr.Textbox(label="Organisation", value='')
mail = gr.Textbox(
label="Contact email (will be stored privately, & used if there is an issue with your submission)",
value='')
file_output = gr.File()
submit_button = gr.Button("Submit Eval")
submission_result = gr.Markdown()
submit_button.click(
add_new_eval,
[
level_of_test,
model_name_textbox,
model_family_textbox,
url_textbox,
file_output,
organisation,
mail
],
submission_result,
)
scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, "interval", seconds=3600)
scheduler.start()
demo.launch(debug=True)