|
import json |
|
import logging |
|
import os |
|
from tempfile import TemporaryDirectory |
|
from typing import Dict, List, Optional |
|
|
|
import jsonlines |
|
from huggingface_hub import CommitOperationAdd |
|
from huggingface_hub import Discussion, HfApi, HfFileSystem |
|
from tqdm import tqdm |
|
|
|
from .evaluation import METRICS |
|
from .formatting import styled_error, styled_message, styled_warning |
|
from .tasks import TASKS_PRETTY_REVERSE |
|
|
|
|
|
class AlreadyExists(Exception):
    """Exception type for "already exists" conditions.

    It adds nothing to ``Exception`` and is not raised by the code visible in
    this module.
    """
|
|
|
|
|
class SubmissionUploader:
    """Class for adding new files to a dataset on a Hub and opening a PR.

    A submission consists of the attached prediction files plus a one-line
    results file that holds the metrics averaged over those files together
    with the model's metadata. Both are committed to the dataset repository
    in a single pull request.

    Heavily influenced by these amazing spaces:
    * https://huggingface.co/spaces/safetensors/convert
    * https://huggingface.co/spaces/gaia-benchmark/leaderboard
    """

    def __init__(self, dataset_id: str):
        """Create the Hub clients used to talk to ``dataset_id``.

        Args:
            dataset_id: Id of the dataset repository that receives submissions.

        Raises:
            KeyError: If the ``HF_TOKEN`` environment variable is not set.
        """
        token = os.environ["HF_TOKEN"]
        self._api = HfApi(token=token)
        self._fs = HfFileSystem(token=token)
        self._dataset_id = dataset_id

    def _get_previous_pr(self, pr_title: str) -> "Optional[Discussion]":
        """Searches among discussions of dataset repo for a PR with the given title.

        Returns:
            The first open pull request whose title equals ``pr_title``, or
            ``None`` when there is none or the discussions request fails.
        """
        try:
            discussions = self._api.get_repo_discussions(
                repo_id=self._dataset_id, repo_type="dataset"
            )
        except Exception:
            # Best effort: a failed lookup must not block the submission, but
            # it should at least leave a trace in the logs.
            logging.warning(
                "Could not list discussions of %s.", self._dataset_id, exc_info=True
            )
            return None

        for discussion in discussions:
            if (
                discussion.status == "open"
                and discussion.is_pull_request
                and discussion.title == pr_title
            ):
                return discussion
        return None

    def _are_predictions_uploaded(
        self, task_id: str, model_folder: str, filenames: List[str]
    ) -> bool:
        """Tell whether every prediction file is already stored for ``model_folder``.

        Files are compared by base name because that is how
        ``_upload_predictions`` names them in the repository.

        Returns:
            ``True`` only if the model's predictions folder exists and contains
            a file for each entry of ``filenames``.
        """
        folder = f"datasets/{self._dataset_id}/{task_id}/predictions/{model_folder}"
        try:
            entries = self._fs.ls(folder, detail=False)
        except FileNotFoundError:
            # Nothing has been uploaded for this task/model yet.
            return False
        uploaded = {
            # Tolerate both listing flavours: plain paths and detail dicts.
            os.path.basename(entry["name"] if isinstance(entry, dict) else entry)
            for entry in entries
        }
        return all(os.path.basename(filename) in uploaded for filename in filenames)

    def _get_metadata(
        self,
        model_name_pretty: str,
        model_availability: str,
        urls: str,
        context_size: str,
        submitted_by: str,
    ) -> Dict[str, str]:
        """Pack the submission's descriptive fields into the results-row layout."""
        return {
            "model_name": model_name_pretty,
            "model_availability": model_availability,
            "urls": urls,
            "context_size": context_size,
            "submitted_by": submitted_by,
        }

    def _upload_predictions(
        self,
        task_id: str,
        model_folder: str,
        filenames: List[str],
    ) -> "List[CommitOperationAdd]":
        """Build the commit operations that add the prediction files.

        Nothing is sent to the Hub here; each local file is only mapped to
        ``{task_id}/predictions/{model_folder}/<base name>`` in the repository.
        """
        return [
            CommitOperationAdd(
                path_in_repo=f"{task_id}/predictions/{model_folder}/{os.path.basename(filename)}",
                path_or_fileobj=filename,
            )
            for filename in filenames
        ]

    def _compute_metrics_for_predictions(
        self, task_id: str, filenames: Optional[List[str]], temp_directory: str
    ) -> None:
        """Score every predictions file and store the averaged metrics.

        Each file is read as JSON Lines whose records provide ``prediction``
        and ``reference``. Metrics are computed per file and then averaged
        with equal weight per file.

        Writes into ``temp_directory``:
            * ``metrics.jsonl`` - one line of metrics per predictions file;
            * ``final_metrics.json`` - the per-key mean over those lines.

        Raises:
            ValueError: If no files are given or the task has no metrics.
            KeyError: If ``task_id`` is not a key of ``METRICS``.
        """
        if not filenames:
            raise ValueError("Please, attach at least one file with predictions.")

        metrics_module = METRICS[task_id]
        if metrics_module is None:
            raise ValueError(f"Computing metrics for {task_id} is not supported.")

        # Start from a clean state in case the module holds earlier batches.
        metrics_module.reset()
        per_file_metrics = []
        for filename in filenames:
            with jsonlines.open(filename, "r") as reader:
                for example in tqdm(
                    reader, desc=f"Computing metrics for {os.path.basename(filename)}"
                ):
                    metrics_module.add_batch(
                        predictions=[example["prediction"]],
                        references=[example["reference"]],
                    )
            per_file_metrics.append(metrics_module.compute())
            metrics_module.reset()

        with jsonlines.open(
            os.path.join(temp_directory, "metrics.jsonl"), "w"
        ) as writer:
            writer.write_all(per_file_metrics)

        # The first file's keys define which metrics are reported.
        final_metrics_results = {
            key: sum(entry[key] for entry in per_file_metrics) / len(per_file_metrics)
            for key in per_file_metrics[0]
        }
        with open(os.path.join(temp_directory, "final_metrics.json"), "w") as f:
            json.dump(final_metrics_results, f)

    def _upload_results(
        self,
        task_id: str,
        model_folder: str,
        model_name_pretty: str,
        model_availability: str,
        urls: str,
        context_size: str,
        submitted_by: str,
        temp_directory: str,
    ) -> "List[CommitOperationAdd]":
        """Write the results row and build the commit operation that adds it.

        The row merges ``final_metrics.json`` (expected in ``temp_directory``)
        with the model metadata; metadata wins on a key clash. It is written
        to ``final_results.jsonl`` in ``temp_directory`` and mapped to
        ``{task_id}/results/{model_folder}.jsonl`` in the repository.
        """
        with open(os.path.join(temp_directory, "final_metrics.json"), "r") as f:
            final_results = dict(json.load(f))
        final_results.update(
            self._get_metadata(
                model_name_pretty=model_name_pretty,
                model_availability=model_availability,
                urls=urls,
                context_size=context_size,
                submitted_by=submitted_by,
            )
        )

        results_path = os.path.join(temp_directory, "final_results.jsonl")
        with jsonlines.open(results_path, "w") as writer:
            writer.write(final_results)

        return [
            CommitOperationAdd(
                path_in_repo=f"{task_id}/results/{model_folder}.jsonl",
                path_or_fileobj=results_path,
            )
        ]

    def _verify_arguments(
        self,
        model_folder: str,
        model_name_pretty: str,
        model_availability: str,
        urls: str,
        context_size: str,
        submitted_by: str,
        filenames: Optional[List[str]],
    ):
        """Check the submission form and fail on the first invalid field.

        ``urls`` is optional and therefore not checked.

        Raises:
            ValueError: With a user-facing message if a required field is
                empty, ``context_size`` is not an integer, or no prediction
                files are attached.
        """
        # Explicit raises instead of ``assert``: the checks must survive
        # running Python with ``-O``.
        required = (
            (
                model_folder,
                "Please, specify non-empty name for a directory with a model's results.",
            ),
            (model_name_pretty, "Please, specify non-empty name for a model."),
            (
                model_availability,
                "Please, specify non-empty information about a model's availability.",
            ),
            (
                context_size,
                "Please, specify non-empty information about a model's context size.",
            ),
        )
        for value, message in required:
            if not value:
                raise ValueError(message)

        try:
            int(context_size)
        except (TypeError, ValueError) as e:
            raise ValueError(
                "Please, specify a model's context size as an integer (e.g., 16000)."
            ) from e

        if not submitted_by:
            raise ValueError(
                "Please, specify non-empty information about a submission's author(s)."
            )
        if not filenames:
            raise ValueError("Please, attach at least one file with predictions.")

    def upload_files(
        self,
        task_pretty: str,
        model_folder: str,
        model_name_pretty: str,
        model_availability: str,
        urls: str,
        context_size: str,
        submitted_by: str,
        filenames: Optional[List[str]],
        force: bool = False,
    ) -> str:
        """Validate a submission, compute its metrics and open a PR with it.

        Args:
            task_pretty: Task name as shown to users; a key of
                ``TASKS_PRETTY_REVERSE``.
            model_folder: Folder (and results file) name used in the repository.
            model_name_pretty: Model name as shown to users.
            model_availability: Free-form note on the model's availability.
            urls: Free-form list of relevant links; may be empty.
            context_size: Context size; must be parseable as an integer.
            submitted_by: Author(s) of the submission.
            filenames: Local paths of JSON Lines files with predictions.
            force: Skip the "already uploaded" and "already has an open PR"
                checks.

        Returns:
            The output of ``styled_message`` on success, ``styled_warning``
            when the submission is a duplicate, or ``styled_error`` on any
            failure. Exceptions are logged and reported, not propagated.
        """
        try:
            self._verify_arguments(
                model_folder=model_folder,
                model_name_pretty=model_name_pretty,
                model_availability=model_availability,
                urls=urls,
                context_size=context_size,
                submitted_by=submitted_by,
                filenames=filenames,
            )

            # NOTE(review): the non-ASCII characters in the user-facing strings
            # of this method look like mis-decoded emoji. They are kept verbatim
            # because the title is also used to find an already open PR -
            # confirm the intended characters before changing them.
            pr_title = f"π New submission to {task_pretty} task: {model_name_pretty} with {context_size} context size from {submitted_by}"

            task_id = TASKS_PRETTY_REVERSE[task_pretty]

            logging.info("Checking if this request is already submitted...")
            if not force:
                if self._are_predictions_uploaded(
                    task_id=task_id, model_folder=model_folder, filenames=filenames
                ):
                    return styled_warning(
                        f"{model_name_pretty} is already present in {self._dataset_id}."
                    )

                prev_pr = self._get_previous_pr(pr_title)
                if prev_pr is not None:
                    url = f"https://huggingface.co/datasets/{self._dataset_id}/discussions/{prev_pr.num}"
                    return styled_warning(
                        f"{self._dataset_id} already has an open PR for this submission: {url}."
                    )

            logging.info("Processing predictions...")
            predictions_commit_operations = self._upload_predictions(
                task_id=task_id,
                model_folder=model_folder,
                filenames=filenames,
            )

            commit_description = (
                f"New submission to {task_pretty} task in ποΈ Long Code Arena benchmark!\n"
                "\n"
                f"* Model name: {model_name_pretty}\n"
                f"* Model availability: {model_availability}\n"
                f"* Context Size: {context_size}\n"
                f"* Relevant URLs: {urls}\n"
                f"* Submitted By: {submitted_by}\n"
            )

            # The commit is created inside the ``with`` block: the results
            # operation points at a file that lives in the temporary directory.
            with TemporaryDirectory() as d:
                logging.info("Computing metrics...")
                self._compute_metrics_for_predictions(
                    task_id=task_id, filenames=filenames, temp_directory=str(d)
                )

                logging.info("Processing results...")
                results_commit_operations = self._upload_results(
                    task_id=task_id,
                    model_folder=model_folder,
                    model_name_pretty=model_name_pretty,
                    model_availability=model_availability,
                    urls=urls,
                    context_size=context_size,
                    submitted_by=submitted_by,
                    temp_directory=str(d),
                )

                logging.info("Creating commit...")
                new_pr = self._api.create_commit(
                    repo_id=self._dataset_id,
                    operations=predictions_commit_operations + results_commit_operations,
                    commit_message=pr_title,
                    commit_description=commit_description,
                    create_pr=True,
                    repo_type="dataset",
                )
                return styled_message(f"π PR created at {new_pr.pr_url}.")

        except Exception as e:
            # Top-level boundary: the caller gets a formatted message, the
            # traceback goes to the log.
            logging.exception(e)
            if str(e):
                return styled_error(str(e))
            return styled_error("An exception occured.")
|
|