# long-code-arena/src/submission_uploader.py
import json
import logging
import os
from tempfile import TemporaryDirectory
from typing import Dict, List, Optional
import jsonlines
from huggingface_hub import CommitOperationAdd # type: ignore[import]
from huggingface_hub import Discussion, HfApi, HfFileSystem
from tqdm import tqdm
from .evaluation import METRICS
from .formatting import styled_error, styled_message, styled_warning
from .tasks import TASKS_PRETTY_REVERSE
class AlreadyExists(Exception):
pass
class SubmissionUploader:
"""Class for adding new files to a dataset on a Hub and opening a PR.
Heavily influenced by these amazing spaces:
* https://huggingface.co/spaces/safetensors/convert
* https://huggingface.co/spaces/gaia-benchmark/leaderboard
"""
def __init__(self, dataset_id: str):
self._api = HfApi(token=os.environ["HF_TOKEN"])
self._fs = HfFileSystem(token=os.environ["HF_TOKEN"])
self._dataset_id = dataset_id
def _get_previous_pr(self, pr_title: str) -> Optional[Discussion]:
"""Searches among discussions of dataset repo for a PR with the given title."""
try:
discussions = self._api.get_repo_discussions(
repo_id=self._dataset_id, repo_type="dataset"
)
except Exception:
return None
for discussion in discussions:
if (
discussion.status == "open"
and discussion.is_pull_request
and discussion.title == pr_title
):
return discussion
return None
def _get_metadata(
self,
model_name_pretty: str,
model_availability: str,
urls: str,
context_size: str,
submitted_by: str,
    ) -> Dict[str, str]:
        """Collects the user-provided submission details into a metadata dictionary."""
return {
"model_name": model_name_pretty,
"model_availability": model_availability,
"urls": urls,
"context_size": context_size,
"submitted_by": submitted_by,
}
def _upload_predictions(
self,
task_id: str,
model_folder: str,
filenames: List[str],
    ) -> List[CommitOperationAdd]:
        """Builds commit operations that add each predictions file under the model's folder for the given task."""
commit_operations = [
CommitOperationAdd(
path_in_repo=f"{task_id}/predictions/{model_folder}/{os.path.basename(filename)}",
path_or_fileobj=filename,
)
for filename in filenames
]
return commit_operations
    def _compute_metrics_for_predictions(
        self, task_id: str, filenames: Optional[List[str]], temp_directory: str
    ) -> None:
        """Computes metrics for each submitted predictions file and aggregates them over all files."""
        metrics_module = METRICS[task_id]
        assert (
            metrics_module is not None
        ), f"Computing metrics for {task_id} is not supported."
        assert filenames, "No predictions files were provided."
        metrics_module.reset()
        # start with an empty per-file metrics log
        open(os.path.join(temp_directory, "metrics.jsonl"), "w").close()
# compute the metrics for each submitted file
for filename in filenames:
with jsonlines.open(filename, "r") as reader:
for example in tqdm(
reader, desc=f"Computing metrics for {os.path.basename(filename)}"
):
metrics_module.add_batch(
predictions=[example["prediction"]],
references=[example["reference"]],
)
computed_metrics = metrics_module.compute()
metrics_module.reset()
with jsonlines.open(
os.path.join(temp_directory, "metrics.jsonl"), "a"
) as writer:
writer.write(computed_metrics)
# aggregate the metrics over submitted files
with jsonlines.open(
os.path.join(temp_directory, "metrics.jsonl"), "r"
) as reader:
metrics_results = [line for line in reader]
final_metrics_results = {
key: sum(entry[key] for entry in metrics_results) / len(metrics_results)
for key in metrics_results[0]
}
with open(os.path.join(temp_directory, "final_metrics.json"), "w") as f:
json.dump(final_metrics_results, f)
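    # A hypothetical example of a single line in a predictions file consumed by the
    # method above, inferred from the keys it reads; real submissions may carry extra
    # fields, which the metrics computation ignores:
    #
    #   {"prediction": "<model output for one example>", "reference": "<ground truth for the same example>"}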
def _upload_results(
self,
task_id: str,
model_folder: str,
model_name_pretty: str,
model_availability: str,
urls: str,
context_size: str,
submitted_by: str,
temp_directory: str,
    ) -> List[CommitOperationAdd]:
        """Merges the aggregated metrics with the submission metadata and builds a commit operation for the results file."""
final_results = {}
with open(os.path.join(temp_directory, "final_metrics.json"), "r") as f:
metrics = json.load(f)
final_results.update(metrics)
metadata_dict = self._get_metadata(
model_name_pretty=model_name_pretty,
model_availability=model_availability,
urls=urls,
context_size=context_size,
submitted_by=submitted_by,
)
final_results.update(metadata_dict)
with jsonlines.open(
os.path.join(temp_directory, "final_results.jsonl"), "w"
) as writer:
writer.write(final_results)
return [
CommitOperationAdd(
path_in_repo=f"{task_id}/results/{model_folder}.jsonl",
path_or_fileobj=os.path.join(temp_directory, "final_results.jsonl"),
)
]
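    # A hypothetical example of the single line written to {task_id}/results/{model_folder}.jsonl
    # by the method above: aggregated metric values merged with the submission metadata.
    # Metric names depend on the task's metrics module and are purely illustrative here:
    #
    #   {"exact_match": 0.42, "model_name": "My Model", "model_availability": "Open weights",
    #    "urls": "https://example.com/my-model", "context_size": "16000", "submitted_by": "Jane Doe"}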
def _verify_arguments(
self,
task_pretty: str,
model_folder: str,
model_name_pretty: str,
model_availability: str,
urls: str,
context_size: str,
submitted_by: str,
filenames: Optional[List[str]],
):
        """Validates the user-provided submission fields before anything is uploaded."""
        assert (
            task_pretty and task_pretty in TASKS_PRETTY_REVERSE
        ), "Please select one of the supported tasks."
        assert (
            model_folder
        ), "Please specify a non-empty name for the directory with the model's results."
        assert model_name_pretty, "Please specify a non-empty model name."
        assert (
            model_availability
        ), "Please specify non-empty information about the model's availability."
        assert (
            context_size
        ), "Please specify non-empty information about the model's context size."
        try:
            _ = int(context_size)
        except ValueError:
            raise ValueError(
                "Please specify the model's context size as an integer (e.g., 16000)."
            )
        assert (
            submitted_by
        ), "Please specify non-empty information about the submission's author(s)."
        assert filenames, "Please attach at least one file with predictions."
def upload_files(
self,
task_pretty: str,
model_folder: str,
model_name_pretty: str,
model_availability: str,
urls: str,
context_size: str,
submitted_by: str,
filenames: Optional[List[str]],
force: bool = False,
    ) -> str:
        """Validates the submission, computes metrics, and opens a PR with the predictions and results."""
try:
self._verify_arguments(
task_pretty=task_pretty,
model_folder=model_folder,
model_name_pretty=model_name_pretty,
model_availability=model_availability,
urls=urls,
context_size=context_size,
submitted_by=submitted_by,
filenames=filenames,
)
pr_title = f"πŸš€ New submission to {task_pretty} task: {model_name_pretty} with {context_size} context size from {submitted_by}"
logging.info(f"Start processing {pr_title}")
task_id = TASKS_PRETTY_REVERSE[task_pretty]
logging.info("Checking if this request has already been submitted...")
            if not force:
                # HfFileSystem.ls returns full repo paths, so compare basenames;
                # predictions are stored under the model folder, not the pretty name
                predictions_path = f"datasets/{self._dataset_id}/{task_id}/predictions"
                existing_folders = [
                    os.path.basename(path)
                    for path in self._fs.ls(predictions_path, detail=False)
                ]
                if model_folder in existing_folders:
                    existing_files = [
                        os.path.basename(path)
                        for path in self._fs.ls(
                            f"{predictions_path}/{model_folder}", detail=False
                        )
                    ]
                    if all(
                        os.path.basename(filename) in existing_files
                        for filename in filenames + ["metadata.json"]
                    ):
                        return styled_warning(
                            f"{model_name_pretty} is already present in {self._dataset_id}."
                        )
prev_pr = self._get_previous_pr(pr_title)
if prev_pr is not None:
url = f"https://huggingface.co/datasets/{self._dataset_id}/discussions/{prev_pr.num}"
return styled_warning(
f"{self._dataset_id} already has an open PR for this submission: {url}."
)
logging.info("Processing predictions...")
predictions_commit_operations = self._upload_predictions(
task_id=task_id,
model_folder=model_folder,
filenames=filenames,
)
with TemporaryDirectory() as d:
logging.info("Computing metrics...")
self._compute_metrics_for_predictions(
task_id=task_id, filenames=filenames, temp_directory=str(d)
)
logging.info("Processing results...")
results_commit_operations = self._upload_results(
task_id=task_id,
model_folder=model_folder,
model_name_pretty=model_name_pretty,
model_availability=model_availability,
urls=urls,
context_size=context_size,
submitted_by=submitted_by,
temp_directory=str(d),
)
logging.info("Creating commit...")
new_pr = self._api.create_commit(
repo_id=self._dataset_id,
operations=predictions_commit_operations
+ results_commit_operations,
commit_message=pr_title,
commit_description=f"""New submission to {task_pretty} task in 🏟️ Long Code Arena benchmark!\n* Model name: {model_name_pretty}\n* Model availability: {model_availability}\n* Context Size: {context_size}\n* Relevant URLs: {urls}\n* Submitted By: {submitted_by}""",
create_pr=True,
repo_type="dataset",
)
return styled_message(f"πŸŽ‰ PR created at {new_pr.pr_url}.")
except Exception as e:
logging.exception(e)
if str(e):
                return styled_error(f"An exception occurred. Please try again.\n{e}")
            return styled_error("An exception occurred. Please try again.")
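# A minimal usage sketch (not part of the original module), assuming HF_TOKEN is set in
# the environment with write access to the target dataset. The dataset id, task name,
# and file path below are placeholders: the task string must match a key in
# TASKS_PRETTY_REVERSE, and the predictions file must follow the format described above.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    uploader = SubmissionUploader(dataset_id="your-org/your-results-dataset")
    message = uploader.upload_files(
        task_pretty="<task name as displayed in the leaderboard>",
        model_folder="my-model",
        model_name_pretty="My Model",
        model_availability="Open weights",
        urls="https://example.com/my-model",
        context_size="16000",
        submitted_by="Jane Doe",
        filenames=["predictions.jsonl"],
    )
    print(message)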