Alvinn-aai's picture
24 hours wait time and enable validation
ee85d80
import threading
import time
from datetime import datetime, timezone, timedelta
import os
import requests
import pandas as pd
from datasets import Dataset, get_dataset_config_names
from datasets.exceptions import DatasetNotFoundError
from pandas.api.types import is_integer_dtype
import gradio as gr
from src.datamodel.data import F1Data
from src.display.formatting import styled_error, styled_message
from src.display.utils import ModelType
from src.envs import SUBMISSIONS_REPO, TOKEN
from src.logger import get_logger
from src.validation.validate import is_submission_file_valid, is_valid
logger = get_logger(__name__)
MIN_WAIT_TIME_PER_USER_HRS = 24
RATE_LIMIT_WINDOW_HRS = 24
MAX_SUBMISSIONS_PER_WINDOW = 10
submission_lock = threading.Lock()
def add_new_solutions(
lbdb: F1Data,
username: str,
user_id: str,
system_name: str,
org: str,
submission_path: str,
is_warmup_dataset: bool,
ensure_all_present: bool = False,
):
with submission_lock:
try:
submitted_ids = get_dataset_config_names(SUBMISSIONS_REPO, token=TOKEN)
except (DatasetNotFoundError, FileNotFoundError):
submitted_ids = []
if submitted_ids == ["default"]:
# means empty dataset
submitted_ids = []
logger.info(f"Found {len(submitted_ids)} submissions")
# Rate limits:
# 1. Users must wait MIN_WAIT_TIME_PER_USER_HRS hours between submissions.
# 2. No more than MAX_SUBMISSIONS_PER_WINDOW submissions RATE_LIMIT_WINDOW_HRS hours overall.
sub_df = pd.DataFrame.from_dict(
{
"submission_id": submitted_ids,
"user_id": map(_submission_id_to_user_id, submitted_ids),
"timestamp": map(_submission_id_to_timestamp, submitted_ids),
}
)
# Per user limit
now = datetime.now(timezone.utc)
cutoff_user = now - timedelta(hours=MIN_WAIT_TIME_PER_USER_HRS)
user_last_submission_ts = sub_df[sub_df.user_id == user_id].timestamp.max()
if pd.notna(user_last_submission_ts) and user_last_submission_ts > cutoff_user:
remaining_hrs = (user_last_submission_ts - cutoff_user).total_seconds() / 3600
logger.info(f"{username} must wait {remaining_hrs:.2f} more hours.")
return styled_error(
f"You must wait {MIN_WAIT_TIME_PER_USER_HRS} hours between submissions. "
f"Remaining wait time: {remaining_hrs:.2f} hours"
)
# Overall limit
cutoff_overall = now - timedelta(hours=RATE_LIMIT_WINDOW_HRS)
if len(sub_df.timestamp > cutoff_overall) >= MAX_SUBMISSIONS_PER_WINDOW:
logger.info(
f"Too many submissions in the last {RATE_LIMIT_WINDOW_HRS} hours: {len(sub_df.timestamp > cutoff_overall)}."
)
return styled_error("The leaderboard has reached its submission capacity for now. Please try again later.")
logger.info(
f"Adding new submission: {system_name=}, {org=}, and {submission_path=}",
)
# Double-checking.
for val in [system_name, org]:
assert is_valid(val)
assert is_submission_file_valid(submission_path, is_warmup_dataset=is_warmup_dataset)
try:
submission_df = pd.read_json(submission_path, lines=True)
if ensure_all_present:
_validate_all_submissions_present(lbdb=lbdb, pd_ds=submission_df)
except Exception:
logger.warning("Failed to parse submission DF!", exc_info=True)
return styled_error(
"An error occurred. Please try again later."
) # Use same message as external error. Avoid infoleak.
submission_id = f"{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}_{username}_{user_id}"
# Seems good, creating the eval.
logger.info(f"Adding new submission: {submission_id}")
submission_ts = time.time_ns()
def add_info(row):
return {
**row,
"system_name": system_name,
"organization": org,
"submission_id": submission_id,
"submission_ts": submission_ts,
"evaluation_id": "", # This will be set later when the evaluation is launched in the backend
"evaluation_start_ts": "", # This will be set when the evaluation starts
}
ds = Dataset.from_pandas(submission_df).map(add_info)
with submission_lock:
ds.push_to_hub(
SUBMISSIONS_REPO,
submission_id,
private=True,
)
return styled_message(
"Your request has been submitted to the evaluation queue!\n"
+ "Results may take up to 24 hours to be processed and shown in the leaderboard."
)
def fetch_sub_claim(oauth_token: gr.OAuthToken | None) -> dict | None:
if oauth_token is None:
return None
provider = os.getenv("OPENID_PROVIDER_URL")
if not provider:
return None
try:
oidc_meta = requests.get(f"{provider}/.well-known/openid-configuration", timeout=5)
oidc_meta = oidc_meta.json()
userinfo_ep = oidc_meta["userinfo_endpoint"]
claims = requests.get(userinfo_ep, headers={"Authorization": f"Bearer {oauth_token.token}"}, timeout=5)
logger.info(f"userinfo_endpoint response: status={claims.status_code}\nheaders={dict(claims.headers)}")
claims = claims.json()
# Typical fields: sub (stable id), preferred_username, name, picture
return {
"sub": claims.get("sub"),
"preferred_username": claims.get("preferred_username"),
"name": claims.get("name"),
}
except Exception as e:
logger.warning(f"Failed to fetch user claims: {e}")
return None
def fetch_user_info(oauth_token: gr.OAuthToken | None) -> dict | None:
if oauth_token is None:
return None
try:
headers = {"Authorization": f"Bearer {oauth_token.token}"}
logger.info("HEADERS %s", headers)
r = requests.get("https://huggingface.co/api/whoami-v2", headers=headers)
logger.info("RESP CODE %s", r.status_code)
logger.info("RESP content %s", r.text)
if r.status_code != 200:
return None
return r.json()
except:
logger.exception("Cannot get user info")
return None
def _validate_all_submissions_present(
lbdb: F1Data,
pd_ds: pd.DataFrame,
):
logger.info(f"Validating DS size {len(pd_ds)} columns {pd_ds.columns} set {set(pd_ds.columns)}")
expected_cols = ["problem_id", "solution"]
if set(pd_ds.columns) != set(expected_cols):
return ValueError(f"Expected attributes: {expected_cols}, Got: {pd_ds.columns.tolist()}")
if not is_integer_dtype(pd_ds["problem_id"]):
return ValueError("problem_id must be str convertible to int")
if any(type(v) is not str for v in pd_ds["solution"]):
return ValueError("solution must be of type str")
submitted_ids = set(pd_ds.problem_id.astype(str))
if submitted_ids != lbdb.code_problem_ids:
missing = lbdb.code_problem_ids - submitted_ids
unknown = submitted_ids - lbdb.code_problem_ids
raise ValueError(f"Mismatched problem IDs: {len(missing)} missing, {len(unknown)} unknown")
if len(pd_ds) > len(lbdb.code_problem_ids):
return ValueError("Duplicate problem IDs exist in uploaded file")
def _submission_id_to_user_id(submission_id: str) -> str:
"""
Extracts the user ID from the submission ID: "YYYYMMDD_HHMMSS_username_userid"
"""
return submission_id.rsplit("_", 1)[-1]
def _submission_id_to_timestamp(submission_id: str) -> datetime:
"""
Extracts the timestamp from the submission ID: "YYYYMMDD_HHMMSS_username_userid"
"""
ts_str = "_".join(submission_id.split("_", 2)[:2])
return datetime.strptime(ts_str, "%Y%m%d_%H%M%S").replace(tzinfo=timezone.utc)