import gradio as gr import pandas as pd from pathlib import Path from typing import Optional from about import ENDPOINTS, API, submissions_repo, results_repo, test_repo from utils import metrics_per_ep from huggingface_hub import hf_hub_download import datetime import io import json, tempfile import re from pydantic import ( BaseModel, Field, model_validator, field_validator, ValidationError ) HF_USERNAME_RE = re.compile(r"^[A-Za-z0-9](?:[A-Za-z0-9-_]{1,38})$") def _safeify_username(username: str) -> str: return str(username.strip()).replace("/", "_").replace(" ", "_") def _unsafify_username(username: str) -> str: return str(username.strip()).replace("/", "_").replace(" ", "_") def _check_required_columns(df: pd.DataFrame, name: str, cols: list[str]): missing = [c for c in cols if c not in df.columns] if missing: raise ValueError(f"{name} is missing required columns: {missing}") class ParticipantRecord(BaseModel): hf_username: str = Field(description="Hugging Face username") display_name: Optional[str] = Field(description="Name to display on leaderboard") participant_name: Optional[str] = Field(default=None, description="Participant's real name") discord_username: Optional[str] = Field(default=None, description="Discord username") email: Optional[str] = Field(default=None, description="Email address") affiliation: Optional[str] = Field(default=None, description="Affiliation") model_tag: Optional[str] = Field(default=None, description="Link to model description") anonymous: bool = Field(default=False, description="Whether to display username as 'anonymous'") consent_publication: bool = Field(default=False, description="Consent to be included in publications") @field_validator("hf_username") @classmethod def validate_hf_username(cls, v: str) -> str: v = v.strip() if not HF_USERNAME_RE.match(v): raise gr.Error("Invalid Hugging Face username (letters, numbers, -, _; min 2, max ~39).") return v @field_validator("display_name") @classmethod def validate_display_name(cls, v: Optional[str]) -> Optional[str]: if v is None: return None v = v.strip() if not v: return None if len(v) > 20: raise ValueError("Display name is too long (max 20 chars).") return v @field_validator("model_tag", mode="before") @classmethod def normalize_url(cls, v): if v is None: return v s = str(v).strip() if not s: return None if "://" not in s: s = "https://" + s return s @model_validator(mode="after") def require_display_name_if_anonymous(self) -> "ParticipantRecord": if self.anonymous and not self.display_name: raise ValueError("Alias is required when anonymous box is checked.") return self class SubmissionMetadata(BaseModel): submission_time_utc: str user: str original_filename: str evaluated: bool participant: ParticipantRecord def submit_data(predictions_file: str, user_state, participant_name: str = "", discord_username: str = "", email: str = "", affiliation: str = "", model_tag: str = "", user_display: str = "", anon_checkbox: bool = False, paper_checkbox: bool = False ): if user_state is None: raise gr.Error("Username or alias is required for submission.") file_path = Path(predictions_file).resolve() if not file_path.exists(): raise gr.Error("Uploaded file object does not have a valid file path.") # Read results file try: results_df = pd.read_csv(file_path) except Exception as e: return f"❌ Error reading results file: {str(e)}" if results_df.empty: return gr.Error("The uploaded file is empty.") missing = set(ENDPOINTS) - set(results_df.columns) if missing: return gr.Error(f"The uploaded file must contain all endpoint predictions {ENDPOINTS} as columns.") # Save participant record try: participant_record = ParticipantRecord( hf_username=user_state, participant_name=participant_name, discord_username=discord_username, email=email, affiliation=affiliation, model_tag=model_tag, display_name=user_display, anonymous=anon_checkbox, consent_publication=paper_checkbox ) except ValidationError as e: return f"❌ Error in participant information: {str(e)}" # Build destination filename in the dataset ts = datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="seconds") # should keep default time so can be deserialized correctly try: meta = SubmissionMetadata( submission_time_utc=ts, user=user_state, original_filename=file_path.name, evaluated=False, participant=participant_record ) except ValidationError as e: return f"❌ Error in metadata information: {str(e)}" safe_user = _safeify_username(user_state) destination_csv = f"submissions/{safe_user}_{ts}.csv" destination_json = destination_csv.replace(".csv", ".json") # Upload the CSV file API.upload_file( path_or_fileobj=str(file_path), path_in_repo=destination_csv, repo_id=submissions_repo, repo_type="dataset", commit_message=f"Add submission for {safe_user} at {ts}" ) # Upload the metadata JSON file meta_bytes = io.BytesIO(json.dumps(meta.model_dump(), indent=2).encode("utf-8")) API.upload_file( path_or_fileobj=meta_bytes, path_in_repo=destination_json, repo_id=submissions_repo, repo_type="dataset", commit_message=f"Add metadata for {user_state} submission at {ts}" ) return "✅ Your submission has been received! Your scores will appear on the leaderboard shortly.", destination_csv def evaluate_data(filename: str) -> None: # Load the submission csv try: local_path = hf_hub_download( repo_id=submissions_repo, repo_type="dataset", filename=filename, ) except Exception as e: raise gr.Error(f"Failed to download submission file: {e}") # Load the test set try: test_path = hf_hub_download( repo_id=test_repo, repo_type="dataset", filename="data/challenge_mock_test_set.csv", #Replace later with "test_dataset.csv", ) except Exception as e: raise gr.Error(f"Failed to download test file: {e}") data_df = pd.read_csv(local_path) test_df = pd.read_csv(test_path) try: results_df = calculate_metrics(data_df, test_df) if not isinstance(results_df, pd.DataFrame) or results_df.empty: raise gr.Error("Evaluation produced no results.") except Exception as e: raise gr.Error(f'Evaluation failed: {e}. No results written to results dataset.') # Load metadata file meta_filename = filename.replace(".csv", ".json") try: meta_path = hf_hub_download( repo_id=submissions_repo, repo_type="dataset", filename=meta_filename, ) with open(meta_path, "r", encoding="utf-8") as f: _meta = json.load(f) meta = SubmissionMetadata(**_meta) username = meta.participant.hf_username timestamp = meta.submission_time_utc report = meta.participant.model_tag if meta.participant.anonymous: display_name = meta.participant.display_name else: display_name = username except Exception as e: raise gr.Error(f"Failed to load metadata file: {e}. No results written to results dataset.") # Write results to results dataset results_df['user'] = display_name results_df['submission_time'] = timestamp results_df['model_report'] = report results_df['anonymous'] = meta.participant.anonymous safe_user = _unsafify_username(username) destination_path = f"results/{safe_user}_{timestamp}_results.csv" tmp_name = None with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as tmp: results_df.to_csv(tmp, index=False) tmp.flush() tmp_name = tmp.name API.upload_file( path_or_fileobj=tmp_name, path_in_repo=destination_path, repo_id=results_repo, repo_type="dataset", commit_message=f"Add result data for {username}" ) Path(tmp_name).unlink() def calculate_metrics( results_dataframe: pd.DataFrame, test_dataframe: pd.DataFrame ): import numpy as np # Do some checks # 1) Check all columns are present _check_required_columns(results_dataframe, "Results file", ["Molecule Name"] + ENDPOINTS) _check_required_columns(test_dataframe, "Test file", ["Molecule Name"] + ENDPOINTS) # 2) Check all Molecules in the test set are present in the predictions merged_df = pd.merge(test_dataframe, results_dataframe, on=['Molecule Name'], how='left', indicator=True) if not (merged_df['_merge'] == 'both').all(): raise gr.Error("The predictions file is missing some molecules present in the test set. Please ensure all molecules are included.") # TODO: What to do when a molecule is duplicated in the Predictions file? df_results = pd.DataFrame(columns=["endpoint", "MAE", "RAE", "R2", "Spearman R", "Kendall's Tau"]) for i, measurement in enumerate(ENDPOINTS): df_pred = results_dataframe[['Molecule Name', measurement]].copy() df_true = test_dataframe[['Molecule Name', measurement]].copy() # coerce numeric columns df_pred[measurement] = pd.to_numeric(df_pred[measurement], errors="coerce") df_true[measurement] = pd.to_numeric(df_true[measurement], errors="coerce") if df_pred[measurement].isnull().all(): # TODO: Allow missing endpoints or raise an error? raise gr.Error(f"All predictions are missing for endpoint {measurement}. Please provide valid predictions.") # Drop NaNs and calculate coverage merged = ( df_pred.rename(columns={measurement: f"{measurement}_pred"}) .merge( df_true.rename(columns={measurement: f"{measurement}_true"}), on="Molecule Name", how="inner", ) .dropna(subset=[f"{measurement}_pred", f"{measurement}_true"]) ) n_total = merged[f"{measurement}_true"].notna().sum() # Valid test set points n_pairs = len(merged) # actual pairs with predictions coverage = (n_pairs / n_total * 100.0) if n_total else 0.0 merged = merged.sort_values("Molecule Name", kind="stable") # validate pairs if n_pairs < 10: mae = rae = r2 = spearman = ktau = np.nan else: y_pred = merged[f"{measurement}_pred"].to_numpy() y_true = merged[f"{measurement}_true"].to_numpy() # Force log scale for all endpoints except LogD (for outliers) if measurement != "LogD": y_pred = np.log10(y_pred) y_true = np.log10(y_true) mae, rae, r2, spearman, ktau = metrics_per_ep(y_pred, y_true) df_results.loc[i, 'endpoint'] = measurement df_results.loc[i, 'MAE'] = mae df_results.loc[i, 'RAE'] = rae df_results.loc[i, 'R2'] = r2 df_results.loc[i, 'Spearman R'] = spearman df_results.loc[i, "Kendall's Tau"] = ktau df_results.loc[i, 'data coverage (%)'] = coverage # Average results num_cols = ["MAE", "RAE", "R2", "Spearman R", "Kendall's Tau", "data coverage (%)"] df_results[num_cols] = df_results[num_cols].apply(pd.to_numeric, errors="coerce") means = df_results[num_cols].mean() avg_row = {"endpoint": "Average", **means.to_dict()} df_with_average = pd.concat([df_results, pd.DataFrame([avg_row])], ignore_index=True) return df_with_average