import os
import threading
import time
from datetime import datetime, timedelta, timezone

import gradio as gr
import pandas as pd
import requests
from datasets import Dataset, get_dataset_config_names
from datasets.exceptions import DatasetNotFoundError
from pandas.api.types import is_integer_dtype

from src.datamodel.data import F1Data
from src.display.formatting import styled_error, styled_message
from src.display.utils import ModelType
from src.envs import SUBMISSIONS_REPO, TOKEN
from src.logger import get_logger
from src.validation.validate import is_submission_file_valid, is_valid

logger = get_logger(__name__)

MIN_WAIT_TIME_PER_USER_HRS = 24
RATE_LIMIT_WINDOW_HRS = 24
MAX_SUBMISSIONS_PER_WINDOW = 10
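
# Guards the read-check-push sequence below against concurrent submissions
# from the Gradio app; both Hub accesses are taken under this lock.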
submission_lock = threading.Lock()

def add_new_solutions(
    lbdb: F1Data,
    username: str,
    user_id: str,
    system_name: str,
    org: str,
    submission_path: str,
    is_warmup_dataset: bool,
    ensure_all_present: bool = False,
):
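    """Validates a new submission, enforces rate limits, and pushes it to the submissions repo.

    Returns a styled message (success or error) for display in the UI.
    """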
    with submission_lock:
        try:
            submitted_ids = get_dataset_config_names(SUBMISSIONS_REPO, token=TOKEN)
        except (DatasetNotFoundError, FileNotFoundError):
            submitted_ids = []
        if submitted_ids == ["default"]:
            # A lone "default" config means the dataset is empty.
            submitted_ids = []

    logger.info(f"Found {len(submitted_ids)} submissions")
    # Rate limits:
    # 1. Users must wait MIN_WAIT_TIME_PER_USER_HRS hours between submissions.
    # 2. No more than MAX_SUBMISSIONS_PER_WINDOW submissions overall in any RATE_LIMIT_WINDOW_HRS-hour window.
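    # User IDs and timestamps are encoded in the submission IDs themselves
    # (see the _submission_id_to_* helpers below), so no extra Hub metadata fetch is needed.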
    sub_df = pd.DataFrame(
        {
            "submission_id": submitted_ids,
            "user_id": [_submission_id_to_user_id(s) for s in submitted_ids],
            "timestamp": [_submission_id_to_timestamp(s) for s in submitted_ids],
        }
    )
    # Per-user limit
    now = datetime.now(timezone.utc)
    cutoff_user = now - timedelta(hours=MIN_WAIT_TIME_PER_USER_HRS)
    user_last_submission_ts = sub_df[sub_df.user_id == user_id].timestamp.max()
    if pd.notna(user_last_submission_ts) and user_last_submission_ts > cutoff_user:
        remaining_hrs = (user_last_submission_ts - cutoff_user).total_seconds() / 3600
        logger.info(f"{username} must wait {remaining_hrs:.2f} more hours.")
        return styled_error(
            f"You must wait {MIN_WAIT_TIME_PER_USER_HRS} hours between submissions. "
            f"Remaining wait time: {remaining_hrs:.2f} hours"
        )
    # Overall limit. Note: count the True values, not the length of the boolean mask.
    cutoff_overall = now - timedelta(hours=RATE_LIMIT_WINDOW_HRS)
    recent_count = int((sub_df.timestamp > cutoff_overall).sum())
    if recent_count >= MAX_SUBMISSIONS_PER_WINDOW:
        logger.info(f"Too many submissions in the last {RATE_LIMIT_WINDOW_HRS} hours: {recent_count}.")
        return styled_error("The leaderboard has reached its submission capacity for now. Please try again later.")
    logger.info(f"Adding new submission: {system_name=}, {org=}, and {submission_path=}")

    # Double-check; these were already validated upstream.
    for val in [system_name, org]:
        assert is_valid(val)
    assert is_submission_file_valid(submission_path, is_warmup_dataset=is_warmup_dataset)
    try:
        submission_df = pd.read_json(submission_path, lines=True)
        if ensure_all_present:
            _validate_all_submissions_present(lbdb=lbdb, pd_ds=submission_df)
    except Exception:
        logger.warning("Failed to parse submission DF!", exc_info=True)
        # Use the same generic message as for external errors to avoid leaking details.
        return styled_error("An error occurred. Please try again later.")
    submission_id = f"{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}_{username}_{user_id}"

    # Seems good, creating the eval.
    logger.info(f"Adding new submission: {submission_id}")
    submission_ts = time.time_ns()
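
    # add_info stamps every row with the submission metadata; the evaluation fields
    # are left blank here and filled in by the backend once the evaluation is launched.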
    def add_info(row):
        return {
            **row,
            "system_name": system_name,
            "organization": org,
            "submission_id": submission_id,
            "submission_ts": submission_ts,
            "evaluation_id": "",  # Set later, when the evaluation is launched in the backend
            "evaluation_start_ts": "",  # Set when the evaluation starts
        }

    ds = Dataset.from_pandas(submission_df).map(add_info)
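
    # Each submission is pushed as its own config (named by submission_id) of the
    # private submissions dataset; get_dataset_config_names() above enumerates them.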
    with submission_lock:
        ds.push_to_hub(
            SUBMISSIONS_REPO,
            submission_id,
            private=True,
        )

    return styled_message(
        "Your request has been submitted to the evaluation queue!\n"
        "Results may take up to 24 hours to be processed and shown in the leaderboard."
    )

def fetch_sub_claim(oauth_token: gr.OAuthToken | None) -> dict | None:
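    """Resolves OIDC userinfo claims (sub, preferred_username, name) for the signed-in user, if any."""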
    if oauth_token is None:
        return None
    provider = os.getenv("OPENID_PROVIDER_URL")
    if not provider:
        return None
    try:
        # Standard OIDC discovery: the provider's metadata lists the userinfo endpoint.
        resp = requests.get(f"{provider}/.well-known/openid-configuration", timeout=5)
        resp.raise_for_status()
        userinfo_ep = resp.json()["userinfo_endpoint"]
        resp = requests.get(userinfo_ep, headers={"Authorization": f"Bearer {oauth_token.token}"}, timeout=5)
        logger.info(f"userinfo_endpoint response: status={resp.status_code}\nheaders={dict(resp.headers)}")
        resp.raise_for_status()
        claims = resp.json()
        # Typical fields: sub (stable id), preferred_username, name, picture
        return {
            "sub": claims.get("sub"),
            "preferred_username": claims.get("preferred_username"),
            "name": claims.get("name"),
        }
    except Exception as e:
        logger.warning(f"Failed to fetch user claims: {e}")
        return None

def fetch_user_info(oauth_token: gr.OAuthToken | None) -> dict | None:
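    """Fetches the Hugging Face account info for the given OAuth token via the whoami-v2 API."""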
    if oauth_token is None:
        return None
    try:
        # Do not log the headers: they contain the bearer token.
        headers = {"Authorization": f"Bearer {oauth_token.token}"}
        r = requests.get("https://huggingface.co/api/whoami-v2", headers=headers, timeout=5)
        logger.info("whoami-v2 response: status=%s content=%s", r.status_code, r.text)
        if r.status_code != 200:
            return None
        return r.json()
    except Exception:
        logger.exception("Cannot get user info")
        return None

def _validate_all_submissions_present(
    lbdb: F1Data,
    pd_ds: pd.DataFrame,
):
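    """Checks that the uploaded frame has exactly the expected columns, dtypes, and problem-ID set.

    Raises ValueError on the first mismatch found.
    """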
logger.info(f"Validating DS size {len(pd_ds)} columns {pd_ds.columns} set {set(pd_ds.columns)}") | |
expected_cols = ["problem_id", "solution"] | |
if set(pd_ds.columns) != set(expected_cols): | |
return ValueError(f"Expected attributes: {expected_cols}, Got: {pd_ds.columns.tolist()}") | |
if not is_integer_dtype(pd_ds["problem_id"]): | |
return ValueError("problem_id must be str convertible to int") | |
if any(type(v) is not str for v in pd_ds["solution"]): | |
return ValueError("solution must be of type str") | |
submitted_ids = set(pd_ds.problem_id.astype(str)) | |
if submitted_ids != lbdb.code_problem_ids: | |
missing = lbdb.code_problem_ids - submitted_ids | |
unknown = submitted_ids - lbdb.code_problem_ids | |
raise ValueError(f"Mismatched problem IDs: {len(missing)} missing, {len(unknown)} unknown") | |
if len(pd_ds) > len(lbdb.code_problem_ids): | |
return ValueError("Duplicate problem IDs exist in uploaded file") | |

def _submission_id_to_user_id(submission_id: str) -> str:
    """
    Extracts the user ID from a submission ID of the form "YYYYMMDD_HHMMSS_username_userid".
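
    For example, with an illustrative (made-up) submission ID:
    >>> _submission_id_to_user_id("20240131_120000_alice_12345")
    '12345'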
""" | |
return submission_id.rsplit("_", 1)[-1] | |

def _submission_id_to_timestamp(submission_id: str) -> datetime:
    """
    Extracts the UTC timestamp from a submission ID of the form "YYYYMMDD_HHMMSS_username_userid".
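
    For example, with an illustrative (made-up) submission ID:
    >>> _submission_id_to_timestamp("20240131_120000_alice_12345")
    datetime.datetime(2024, 1, 31, 12, 0, tzinfo=datetime.timezone.utc)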
""" | |
ts_str = "_".join(submission_id.split("_", 2)[:2]) | |
return datetime.strptime(ts_str, "%Y%m%d_%H%M%S").replace(tzinfo=timezone.utc) | |