long-code-arena / src /submission_uploader.py
saridormi's picture
Display urls on the leaderboard, tweak interface a little and fix CMG descriptions
04f40cd
raw
history blame
14.2 kB
import json
import logging
import os
import re
import time
from tempfile import TemporaryDirectory
from typing import List, Optional
import jsonlines
from huggingface_hub import CommitOperationAdd # type: ignore[import]
from huggingface_hub import Discussion, HfApi, HfFileSystem
from tqdm import tqdm
from .evaluation import METRICS
from .formatting import styled_error, styled_message, styled_warning
from .tasks_content import TASKS_PRETTY_REVERSE
from .utils import MD_LINK_PATTERN
class AlreadyExists(Exception):
pass
class SubmissionUploader:
"""Class for adding new files to a dataset on a Hub and opening a PR.
Heavily influenced by these amazing spaces:
* https://huggingface.co/spaces/safetensors/convert
* https://huggingface.co/spaces/gaia-benchmark/leaderboard
* https://huggingface.co/spaces/HuggingFaceH4/open_llm_leaderboard
"""
def __init__(self, dataset_id: str, private_dataset_id: str):
self._api = HfApi(token=os.environ["HF_TOKEN"])
self._fs = HfFileSystem(token=os.environ["HF_TOKEN"])
self._results_dataset_id = dataset_id
self._requests_dataset_id = private_dataset_id
def _get_previous_pr(self, pr_title: str) -> Optional[Discussion]:
"""Searches among discussions of the results dataset for a PR with the given title."""
try:
discussions = self._api.get_repo_discussions(repo_id=self._results_dataset_id, repo_type="dataset")
except Exception:
return None
for discussion in discussions:
if discussion.status == "open" and discussion.is_pull_request and discussion.title == pr_title:
return discussion
return None
def _upload_request(
self,
task_id: str,
model_folder: str,
model_name_pretty: str,
model_availability: str,
model_url: Optional[str],
urls: Optional[str],
context_size: str,
submitted_by: str,
contact_information: str,
comment: Optional[str],
pr_url: str,
temp_directory: str,
) -> List[CommitOperationAdd]:
"""Adds a file with metadata about the current request to the requests dataset."""
request_metadata = {
"model_folder": model_folder,
"model_name_pretty": model_name_pretty,
"model_availability": model_availability,
"model_url": model_url,
"urls": urls,
"context_size": context_size,
"submitted_by": submitted_by,
"contact_information": contact_information,
"comment": comment,
"timestamp": time.time(),
"pr_url": pr_url,
}
with open(os.path.join(temp_directory, "request_metadata.json"), "w") as f:
json.dump(request_metadata, f)
num_requests_already_present = (
len(self._fs.ls(f"datasets/{self._requests_dataset_id}/{task_id}/"))
if self._fs.isdir(f"datasets/{self._requests_dataset_id}/{task_id}/")
else 0
)
commit_operations = [
CommitOperationAdd(
path_in_repo=f"{task_id}/{num_requests_already_present}_{model_folder}.json",
path_or_fileobj=os.path.join(temp_directory, "request_metadata.json"),
)
]
return commit_operations
def _upload_predictions(
self,
task_id: str,
model_folder: str,
filenames: List[str],
) -> List[CommitOperationAdd]:
"""Adds all files with current model's predictions to the results dataset."""
commit_operations = [
CommitOperationAdd(
path_in_repo=f"{task_id}/predictions/{model_folder}/{os.path.basename(filename)}",
path_or_fileobj=filename,
)
for filename in filenames
]
return commit_operations
def _compute_metrics_for_predictions(self, task_id: str, filenames: List[str], temp_directory: str) -> None:
"""Computes metrics for each submitted file with the current model's predictions."""
metrics_module = METRICS[task_id]
assert metrics_module is not None, f"Computing metrics for {task_id} is not supported."
metrics_module.reset()
open(os.path.join(temp_directory, "metrics.jsonl"), "w").close()
# compute the metrics for each submitted file
for filename in filenames:
with jsonlines.open(filename, "r") as reader:
for example in tqdm(reader, desc=f"Computing metrics for {os.path.basename(filename)}"):
metrics_module.add_batch(
predictions=[example["prediction"]],
references=[example["reference"]],
)
computed_metrics = metrics_module.compute()
metrics_module.reset()
with jsonlines.open(os.path.join(temp_directory, "metrics.jsonl"), "a") as writer:
writer.write(computed_metrics)
# aggregate the metrics over submitted files
with jsonlines.open(os.path.join(temp_directory, "metrics.jsonl"), "r") as reader:
metrics_results = [line for line in reader]
final_metrics_results = {
key: sum(entry[key] for entry in metrics_results) / len(metrics_results) for key in metrics_results[0]
}
with open(os.path.join(temp_directory, "final_metrics.json"), "w") as f:
json.dump(final_metrics_results, f)
def _upload_results(
self,
task_id: str,
model_folder: str,
model_name_pretty: str,
model_availability: str,
model_url: Optional[str],
urls: Optional[str],
context_size: str,
submitted_by: str,
temp_directory: str,
) -> List[CommitOperationAdd]:
"""Adds files with the current model's metrics values to the results dataset."""
final_results = {}
with open(os.path.join(temp_directory, "final_metrics.json"), "r") as f:
metrics = json.load(f)
final_results.update(metrics)
final_results.update(
{
"model_name": model_name_pretty,
"model_availability": model_availability,
"model_url": model_url,
"urls": urls,
"context_size": context_size,
"submitted_by": submitted_by,
}
)
with jsonlines.open(os.path.join(temp_directory, "final_results.jsonl"), "w") as writer:
writer.write(final_results)
return [
CommitOperationAdd(
path_in_repo=f"{task_id}/results/{model_folder}.jsonl",
path_or_fileobj=os.path.join(temp_directory, "final_results.jsonl"),
)
]
def _verify_arguments(
self,
task_pretty: str,
model_folder: str,
model_name_pretty: str,
model_availability: str,
model_url: Optional[str],
urls: Optional[str],
context_size: str,
submitted_by: str,
contact_information: str,
comment: Optional[str],
filenames: Optional[List[str]],
):
"""Verifies that all necessary arguments are not None (and also runs other sanity checks)."""
assert task_pretty and task_pretty in TASKS_PRETTY_REVERSE, "Please, select one of the supported tasks."
assert model_folder, "Please, specify non-empty name for a directory with a model's results."
assert model_name_pretty, "Please, specify non-empty name for a model."
assert model_availability, "Please, specify non-empty information about a model's availability."
assert context_size, "Please, specify non-empty information about a model's context size."
try:
_ = int(context_size)
except:
raise ValueError("Please, specify a model's context size as an integer (e.g., 16000).")
if urls is not None and "," in urls:
urls_list = urls.split(",")
assert all(
re.match(rf"^{MD_LINK_PATTERN}$", url.strip()) for url in urls_list
), 'Please, use the following format for URLs: "[text1](link1), [text2](link2)"'
assert submitted_by, "Please, specify non-empty information about a submission's author(s)."
assert filenames, "Please, attach at least one file with predictions."
assert contact_information, "Please, fill in the field with contact information."
def upload_files(
self,
task_pretty: str,
model_folder: str,
model_name_pretty: str,
model_availability: str,
model_url: Optional[str],
urls: Optional[str],
context_size: str,
submitted_by: str,
contact_information: str,
comment: Optional[str],
filenames: Optional[List[str]],
force: bool = False,
) -> str:
try:
self._verify_arguments(
task_pretty=task_pretty,
model_folder=model_folder,
model_name_pretty=model_name_pretty,
model_availability=model_availability,
model_url=model_url,
urls=urls,
context_size=context_size,
submitted_by=submitted_by,
contact_information=contact_information,
comment=comment,
filenames=filenames,
)
pr_title = f"πŸš€ New submission to {task_pretty} task: {model_name_pretty} with {context_size} context size from {submitted_by}"
logging.info(f"Start processing {pr_title}")
task_id = TASKS_PRETTY_REVERSE[task_pretty]
logging.info("Checking if this request has already been submitted...")
if not force:
if self._fs.isdir(f"datasets/{self._results_dataset_id}/{task_id}/predictions/{model_folder}"):
return styled_warning(
f"{model_folder} is already present in {self._results_dataset_id}, please, select another folder name."
)
prev_pr = self._get_previous_pr(pr_title)
if prev_pr is not None:
url = f"https://huggingface.co/datasets/{self._results_dataset_id}/discussions/{prev_pr.num}"
return styled_warning(
f"{self._results_dataset_id} already has an open PR for this submission: {url}."
)
logging.info("Processing predictions...")
predictions_commit_operations = self._upload_predictions(
task_id=task_id,
model_folder=model_folder,
filenames=filenames,
)
with TemporaryDirectory() as d:
logging.info("Computing metrics...")
self._compute_metrics_for_predictions(task_id=task_id, filenames=filenames, temp_directory=str(d))
logging.info("Processing results...")
results_commit_operations = self._upload_results(
task_id=task_id,
model_folder=model_folder,
model_name_pretty=model_name_pretty,
model_availability=model_availability,
model_url=model_url,
urls=urls,
context_size=context_size,
submitted_by=submitted_by,
temp_directory=str(d),
)
logging.info("Creating commit to the results dataset...")
new_pr = self._api.create_commit(
repo_id=self._results_dataset_id,
operations=predictions_commit_operations + results_commit_operations,
commit_message=pr_title,
commit_description=f"""New submission to {task_pretty} task in 🏟️ Long Code Arena benchmark!\n* Model name: {model_name_pretty}\n* Model availability: {model_availability}\n* Context Size: {context_size}\n* Relevant URLs: {urls}\n* Submitted By: {submitted_by}""",
create_pr=True,
repo_type="dataset",
)
logging.info("Creating commit to the requests dataset...")
request_commit_operations = self._upload_request(
task_id=task_id,
model_folder=model_folder,
temp_directory=str(d),
model_name_pretty=model_name_pretty,
model_availability=model_availability,
model_url=model_url,
urls=urls,
context_size=context_size,
submitted_by=submitted_by,
contact_information=contact_information,
comment=comment,
pr_url=new_pr.pr_url,
)
self._api.create_commit(
repo_id=self._requests_dataset_id,
operations=request_commit_operations,
commit_message=pr_title,
commit_description=f"""New submission to {task_pretty} task in 🏟️ Long Code Arena benchmark!\n* Model name: {model_name_pretty}\n* Model availability: {model_availability}\n* Context Size: {context_size}\n* Relevant URLs: {urls}\n* Submitted By: {submitted_by}\n* PR: {new_pr.pr_url}\n* Contact information: {contact_information}\n* Comment: {comment}""",
create_pr=True,
repo_type="dataset",
)
return styled_message(f"πŸŽ‰ PR created at {new_pr.pr_url}.")
except Exception as e:
exception_msg = str(e)
if exception_msg and os.environ["PRIVATE_DATASET_ID"] in exception_msg:
exception_msg = exception_msg.replace(os.environ["PRIVATE_DATASET_ID"], "{private_dataset}")
if exception_msg:
return styled_error(exception_msg)
return styled_error("An exception occurred. Please, try again.")