import json
import os
import zipfile
from pathlib import Path

import gradio as gr
import pandas as pd
from email_validator import EmailNotValidError, validate_email
from huggingface_hub import HfApi, HfFileSystem

from utils import (
    INTRODUCTION_TEXT,
    TITLE,
    format_error,
    format_log,
    format_warning,
    model_hyperlink,
    read_jsonl,
)
TOKEN = os.environ.get("TOKEN", None)

OWNER = "Skywork"
REAL_WORLD_RESULTS_FILE = f"hf://datasets/{OWNER}/submitted_results/leaderboard/real_world_result.parquet"
GUI_GROUNDING_RESULTS_FILE = f"hf://datasets/{OWNER}/submitted_results/leaderboard/gui_grounding_result.parquet"
SUBMISSION_DATASET = f"{OWNER}/submitted_results"
GROUNDING_FOLDER = f"hf://datasets/{OWNER}/agent-studio-data/grounding"
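# The leaderboard parquet files and the reference grounding data live in
# Hugging Face datasets; HfFileSystem resolves the "hf://" URIs above.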


class ScoreManager:
    def __init__(self) -> None:
        self.eval_results: pd.DataFrame
        self.display_eval_results: pd.DataFrame
        self.grounding_results: pd.DataFrame
        self.display_grounding_results: pd.DataFrame
        self.api = HfApi(token=TOKEN)
        self.fs = HfFileSystem(token=TOKEN)
        self.refresh()
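
    # Expected layout of an unzipped real-world submission (inferred from the
    # reads below): <base_path>/{filesystem,gcalendar,gmail,gdocs,desktop_hard,
    # vscode}.jsonl, where each entry carries a ground-truth "score" and a
    # model self-evaluation under "self_eval" -> "score".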
    def calc_real_task_scores(self, base_path: Path):
        apps = ["filesystem", "google", "GUI"]
        scores_per_app = {}
        for app in apps:
            if app == "google":
                data = []
                try:
                    data += read_jsonl((base_path / "gcalendar.jsonl").as_posix())
                    data += read_jsonl((base_path / "gmail.jsonl").as_posix())
                    data += read_jsonl((base_path / "gdocs.jsonl").as_posix())
                except FileNotFoundError:
                    print("No google data found")
                    continue
            elif app == "filesystem":
                try:
                    data = read_jsonl((base_path / "filesystem.jsonl").as_posix())
                except FileNotFoundError:
                    print("No filesystem data found")
                    continue
            elif app == "GUI":
                data = []
                try:
                    data += read_jsonl((base_path / "desktop_hard.jsonl").as_posix())
                    data += read_jsonl((base_path / "vscode.jsonl").as_posix())
                except FileNotFoundError:
                    print("No GUI data found")
                    continue
            else:
                raise ValueError("Invalid app")
            if not data:
                # Avoid a division by zero for apps whose files are empty.
                continue
            scores = [entry["score"] for entry in data]
            # Confusion matrix of the model's self-evaluation against the
            # ground-truth score; a "positive" is an actually successful task.
            tp = 0  # actual success, self-eval success
            fn = 0  # actual success, self-eval failure
            fp = 0  # actual failure, self-eval success
            tn = 0  # actual failure, self-eval failure
            for entry in data:
                if entry["score"] > 0:
                    if entry["self_eval"]["score"] > 0:
                        tp += 1
                    else:
                        fn += 1
                else:
                    if entry["self_eval"]["score"] > 0:
                        fp += 1
                    else:
                        tn += 1
            score = round(sum(scores) / len(scores) * 100, 1)
            accuracy = round((tp + tn) / (tp + tn + fp + fn) * 100, 1)
            scores_per_app[app] = {
                "score": score,
                "success_tasks": tp + fn,
                "total_tasks": tp + fp + tn + fn,
                "accuracy": accuracy,
            }
        return scores_per_app
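
    # A GUI-grounding submission is expected to unzip to
    # <base_path>/<os>/<app>/results.jsonl; each result is checked against the
    # reference task ids in GROUNDING_FOLDER/<os>/<app>/actions.jsonl.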
    def calc_gui_grounding_scores(self, base_path: Path):
        def calc_per_app_grounding_scores(result_dict, task_configs):
            total_tasks = len(result_dict)
            task_ids = {task_config["task_id"] for task_config in task_configs}
            success = 0
            for result in result_dict:
                if result["task_id"] not in task_ids:
                    raise ValueError(f"Task id {result['task_id']} not found!")
                if result["score"] == 1.0:
                    success += 1
            return {
                "score": success / total_tasks * 100,
                "total_tasks": total_tasks,
                "success_tasks": success,
            }

        scores_per_os = {}
        # "os_dir" avoids shadowing the os module.
        for os_dir in base_path.iterdir():
            if not os_dir.is_dir():
                continue
            try:
                scores_per_app = {}
                for app in os_dir.iterdir():
                    if not app.is_dir():
                        continue
                    with self.fs.open(
                        f"{GROUNDING_FOLDER}/{app.relative_to(base_path).as_posix()}/actions.jsonl",
                        "r",
                    ) as f:
                        task_configs = read_jsonl(f)
                    # iterdir() already yields paths prefixed with base_path, so
                    # join results.jsonl onto app directly (joining
                    # base_path / os / app would duplicate the prefix).
                    results_dict = read_jsonl((app / "results.jsonl").as_posix())
                    results = calc_per_app_grounding_scores(results_dict, task_configs)
                    scores_per_app[app.name] = results
                scores_per_os[os_dir.name] = scores_per_app
            except FileNotFoundError:
                print(f"No data found for {os_dir.name}")
                continue
        return scores_per_os
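
    # Build the public leaderboard view: model names become hyperlinks when a
    # URL was supplied, private columns are dropped, and rows are sorted by
    # the average score.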
    def to_displayed_table(self, df: pd.DataFrame):
        df_display = df.copy()
        df_display["model"] = df_display.apply(
            lambda row: model_hyperlink(row["url"], row["model"])
            if (row["url"] != "") and (not pd.isna(row["url"]))
            else row["model"],
            axis=1,
        )
        df_display = df_display.drop(columns=["url", "organization"])
        df_display = df_display.sort_values(by="Average (%) ⬆️", ascending=False)
        df_display = df_display.map(lambda x: round(x, 2) if isinstance(x, float) else x)
        return df_display
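
    # Reload both leaderboards from the Hub, falling back to empty tables when
    # a results file does not exist yet.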
    def refresh(self):
        try:
            with self.fs.open(REAL_WORLD_RESULTS_FILE, "rb") as f:
                self.eval_results = pd.read_parquet(f)
        except FileNotFoundError:
            self.eval_results = pd.DataFrame(
                columns=[
                    "model", "agent_type", "Average (%) ⬆️", "filesystem (%)",
                    "google (%)", "GUI (%)", "organization", "url", "model_family",
                ]
            )
        try:
            with self.fs.open(GUI_GROUNDING_RESULTS_FILE, "rb") as f:
                self.grounding_results = pd.read_parquet(f)
        except FileNotFoundError:
            self.grounding_results = pd.DataFrame(
                columns=[
                    "model", "agent_type", "Average (%) ⬆️", "windows (%)",
                    "linux (%)", "macos (%)", "organization", "url", "model_family",
                ]
            )
        self.display_eval_results = self.to_displayed_table(self.eval_results)
        self.display_grounding_results = self.to_displayed_table(self.grounding_results)
        return self.display_eval_results, self.display_grounding_results
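
    # Validate a submission, unzip the uploaded results, score them, append a
    # row to the matching leaderboard, and persist everything to the Hub.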
    def add_new_eval(
        self,
        dataset_selection: str,
        model_name: str,
        model_family: str,
        agent_type: str,
        url: str,
        uploaded_file_path: str,
        organization: str,
        mail: str,
    ):
        # Mandatory fields
        if model_name == "":
            return format_error("Model name cannot be empty")
        elif model_family == "":
            return format_error("Model family cannot be empty")
        elif agent_type == "":
            return format_error("Agent type cannot be empty")
        elif organization == "":
            return format_error("Organization cannot be empty")
        elif mail == "":
            return format_error("Mail cannot be empty")
        elif uploaded_file_path == "":
            return format_error("File cannot be empty")
        # Check whether the model has already been submitted
        if dataset_selection == "Real-world tasks":
            if model_name.lower() in {m.lower() for m in self.eval_results["model"]} \
                    and organization.lower() in {o.lower() for o in self.eval_results["organization"]}:
                return format_warning("This model has already been submitted.")
        else:
            if model_name.lower() in {m.lower() for m in self.grounding_results["model"]} \
                    and organization.lower() in {o.lower() for o in self.grounding_results["organization"]}:
                return format_warning("This model has already been submitted.")
        # Check that the email is valid
        try:
            validate_email(mail, check_deliverability=True)
        except EmailNotValidError as e:
            return format_error(f"Invalid email: {e}")
        if url == "":
            url = None
        self.refresh()
        try:
            file_path = Path(uploaded_file_path)
            results_folder_path = file_path.parent / model_name
            with zipfile.ZipFile(file_path, "r") as zip_file:
                zip_file.extractall(results_folder_path)
            print(results_folder_path)
            contact_info = {
                "model": model_name,
                "model_family": model_family,
                "url": url,
                "organization": organization,
                "mail": mail,
            }
            if dataset_selection == "Real-world tasks":
                scores = self.calc_real_task_scores(results_folder_path)
                if scores == {}:
                    return format_error(
                        "No data found in the zip file, please make sure the file structure is correct."
                    )
                eval_entry = {
                    "model": model_name,
                    "model_family": model_family,
                    "agent_type": agent_type,
                    "url": url,
                    "organization": organization,
                }
                succ = 0
                total = 0
                # Use a distinct loop variable so the scores dict is not
                # rebound while it is being iterated.
                for app, app_scores in scores.items():
                    eval_entry[f"{app} (%)"] = app_scores["score"]
                    succ += app_scores["success_tasks"]
                    total += app_scores["total_tasks"]
                eval_entry["Average (%) ⬆️"] = succ / total * 100
                print(eval_entry)
                self.eval_results = pd.concat(
                    [self.eval_results, pd.DataFrame([eval_entry])],
                    ignore_index=True,
                )
                self.upload2hub(
                    results_path=REAL_WORLD_RESULTS_FILE,
                    results=self.eval_results,
                    folder_path=results_folder_path,
                    path_in_repo=f"origin/{organization.lower()}/{model_name.lower()}/real_world",
                    contact_info=contact_info,
                )
            elif dataset_selection == "GUI grounding tasks":
                scores = self.calc_gui_grounding_scores(results_folder_path)
                if scores == {}:
                    return format_error(
                        "No data found in the zip file, please make sure the file structure is correct."
                    )
                print(scores)
                eval_entry: dict[str, str | float] = {
                    "model": model_name,
                    "model_family": model_family,
                    "agent_type": agent_type,
                    "url": url,
                    "organization": organization,
                }
                succ = 0
                total = 0
                # These accumulators aggregate across apps within one OS, so
                # name them per-OS rather than per-app.
                for os_name, app_scores in scores.items():
                    succ_per_os = 0
                    total_per_os = 0
                    for app, score in app_scores.items():
                        succ_per_os += score["success_tasks"]
                        total_per_os += score["total_tasks"]
                    succ += succ_per_os
                    total += total_per_os
                    eval_entry[f"{os_name} (%)"] = succ_per_os / total_per_os * 100
                print(f"Total: {total}, Success: {succ}")
                eval_entry["Average (%) ⬆️"] = succ / total * 100
                self.grounding_results = pd.concat(
                    [self.grounding_results, pd.DataFrame([eval_entry])],
                    ignore_index=True,
                )
                self.upload2hub(
                    results_path=GUI_GROUNDING_RESULTS_FILE,
                    results=self.grounding_results,
                    folder_path=results_folder_path,
                    path_in_repo=f"origin/{organization.lower()}/{model_name.lower()}/grounding",
                    contact_info=contact_info,
                )
            else:
                return format_error("Invalid dataset selection")
        except Exception as e:
            return format_error(f"Internal Error: {e}")
        return format_log("Submitted successfully")
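
    # Persist the updated leaderboard parquet, attach the (private) contact
    # info, and upload the raw submission folder to the submissions dataset.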
    def upload2hub(
        self,
        results_path: str,
        results: pd.DataFrame,
        folder_path: Path,
        path_in_repo: str,
        contact_info: dict,
    ) -> None:
        with self.fs.open(results_path, "wb") as f:
            results.to_parquet(f)
        with open(folder_path / "contact_info.json", "w") as f:
            f.write(json.dumps(contact_info))
        self.api.upload_folder(
            folder_path=folder_path,
            path_in_repo=path_in_repo,
            repo_id=SUBMISSION_DATASET,
            repo_type="dataset",
        )
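

# Gradio UI: two leaderboard tabs, a refresh button, and a submission form.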
if __name__ == "__main__":
    score_manager = ScoreManager()
    iface = gr.Blocks()
    with iface:
        gr.HTML(TITLE)
        gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text")
        with gr.Tabs(elem_classes="main_tabs") as main_tabs:
            with gr.TabItem("🎞️ GUI grounding tasks table", id=0):
                leaderboard_gui_grounding_table = gr.components.Dataframe(
                    value=score_manager.display_grounding_results,
                    datatype=["str", "str", "number", "number", "number", "number", "str"],
                    interactive=False,
                    column_widths=["20%"],
                )
            with gr.TabItem("🌍 Real-world tasks table", id=1):
                leaderboard_real_world_table = gr.components.Dataframe(
                    value=score_manager.display_eval_results,
                    datatype=["str", "str", "number", "number", "number", "number", "str"],
                    interactive=False,
                    column_widths=["20%"],
                )
        refresh_button = gr.Button("Refresh")
        refresh_button.click(
            score_manager.refresh,
            inputs=[],
            outputs=[
                leaderboard_real_world_table,
                leaderboard_gui_grounding_table,
            ],
        )
        with gr.Accordion("Submit a new model for evaluation (fields marked with * are required)"):
            with gr.Row():
                with gr.Column():
                    dataset_selection = gr.Radio(
                        ["GUI grounding tasks", "Real-world tasks"],
                        value="GUI grounding tasks",
                    )
                    model_name_textbox = gr.Textbox(label="Model name*")
                    model_family_textbox = gr.Textbox(label="Model family*")
                    agent_type_textbox = gr.Textbox(label="Agent type*")
                    url_textbox = gr.Textbox(label="URL to model information")
                with gr.Column():
                    organization = gr.Textbox(label="Organization*")
                    mail = gr.Textbox(
                        label="Contact email* (will be stored privately and used if there is an issue with your submission)"
                    )
                    file_output = gr.File(label="Upload model output* (one zip file)")
        submit_button = gr.Button("Submit Eval")
        submission_result = gr.Markdown()
        submit_button.click(
            score_manager.add_new_eval,
            [
                dataset_selection,
                model_name_textbox,
                model_family_textbox,
                agent_type_textbox,
                url_textbox,
                file_output,
                organization,
                mail,  # was missing: add_new_eval expects the contact email
            ],
            submission_result,
        )
    iface.launch()