import json
import os
import time
import traceback
from datetime import datetime, timezone

from huggingface_hub import ModelInfo

from src.display.formatting import styled_error, styled_message, styled_warning
from src.envs import API, EVAL_REQUESTS_PATH, TOKEN, QUEUE_REPO
from src.submission.check_validity import (
already_submitted_models,
check_model_card,
get_model_size,
is_model_on_hub,
)
from src.evaluator.evaluate import EvaluationStatus

# Module-level caches of previously submitted models, refreshed on every call
# to add_new_eval so duplicate submissions can be detected.
REQUESTED_MODELS = None
USERS_TO_SUBMISSION_DATES = None


def _create_eval_request(
model: str,
base_model: str,
revision: str,
precision: str,
weight_type: str,
model_type: str,
    model_info: ModelInfo,
):
"""
Creates and uploads a JSON file for a new model evaluation request.
This function is a helper for add_new_eval and should not be called directly.
"""
try:
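        # Everything the evaluator needs to pick up this job is serialized here.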
request_data = {
'model': model,
'base_model': base_model,
'revision': revision,
'precision': precision,
'weight_type': weight_type,
'model_type': model_type,
'status': EvaluationStatus.PENDING.value,
'submitted_time': datetime.now(timezone.utc).isoformat(),
'likes': model_info.likes,
'params': get_model_size(model_info, precision),
            # cardData can be None when the repo has no model card.
            'license': (model_info.cardData or {}).get("license"),
'private': model_info.private,
}
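        # The filename encodes model, revision, precision, and weight type so
        # each unique submission maps to exactly one queue file.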
user_name = model.split('/')[0] if '/' in model else 'unknown'
safe_revision = revision.replace('/', '_')
request_filename = f"{model.replace('/', '_')}_eval_request_{safe_revision}_{precision}_{weight_type}.json"
local_dir = os.path.join(EVAL_REQUESTS_PATH, user_name)
os.makedirs(local_dir, exist_ok=True)
local_path = os.path.join(local_dir, request_filename)
print(f"Creating local evaluation request file: {local_path}")
# Use a try-finally block to ensure the local file is always removed
try:
            with open(local_path, 'w') as f:
                json.dump(request_data, f, indent=2)
            # Upload the request file to the Hugging Face queue repository.
            # Hub paths always use forward slashes, so build the repo path
            # explicitly rather than with os.path.join (which would produce
            # backslashes on Windows).
            path_in_repo = f"{user_name}/{request_filename}"
            print(f"Uploading evaluation request to {QUEUE_REPO} at {path_in_repo}")
API.upload_file(
path_or_fileobj=local_path,
path_in_repo=path_in_repo,
repo_id=QUEUE_REPO,
repo_type="dataset",
commit_message=f"Add evaluation request for {model}",
token=TOKEN
)
print(f"Uploaded successfully to {path_in_repo} in {QUEUE_REPO}")
return styled_message(
"Evaluation request created successfully! Please wait for the evaluation to complete."
)
finally:
if os.path.exists(local_path):
os.remove(local_path)
print(f"Local file {local_path} removed.")
except Exception as e:
print(f"Error creating or uploading evaluation request: {str(e)}")
print(f"Full traceback:\n{traceback.format_exc()}")
return styled_error(f"Failed to create evaluation request: {str(e)}")


def add_new_eval(
    model: str,
    base_model: str,
    revision: str,
    precision: str,
    weight_type: str,
    model_type: str,
):
"""
Validates a model and creates an evaluation request for it.
This is the main function to be called by the user.
"""
try:
print("\n=== Starting Evaluation Submission ===")
print(f"Submission time: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC")
print(f"Model: {model}, Base: {base_model}, Revision: {revision}, Precision: {precision}")
        # Keep only the first token of the precision label; the UI value may
        # carry a descriptive suffix after a space.
        precision = precision.split(" ")[0]
if not revision:
revision = "main"
print("Using default revision: main")
# --- Step 1: Check for existing submissions ---
print("\n=== Checking for existing submissions ===")
global REQUESTED_MODELS
global USERS_TO_SUBMISSION_DATES
start_time = time.time()
REQUESTED_MODELS, USERS_TO_SUBMISSION_DATES = already_submitted_models(EVAL_REQUESTS_PATH)
print(f"Cache refresh completed in {time.time() - start_time:.2f} seconds. Found {len(REQUESTED_MODELS)} existing submissions.")
model_key = f"{model}_{revision}_{precision}"
if model_key in REQUESTED_MODELS:
queue_file_path = REQUESTED_MODELS[model_key]
try:
                with open(queue_file_path, 'r') as f:
                    queue_entry = json.load(f)
                status = queue_entry.get('status')
                # Re-submission is only allowed when the previous attempt FAILED.
                if status is not None and status != EvaluationStatus.FAILED.value:
                    return styled_warning(f"This model has already been submitted and is currently in the '{status}' state.")
except Exception as e:
print(f"Error reading queue file: {e}")
print(f"Full traceback:\n{traceback.format_exc()}")
return styled_warning("Error checking model status. Please try again later.")
print(f"No existing submission found for key: {model_key} or previous submission had a FAILED status.")
# --- Step 2: Validate model type and existence on the Hub ---
print("\n=== Validating model existence and card === ")
if not model_type:
return styled_error("Please select a model type.")
try:
# Validate the base model first for delta/adapter weights
if weight_type in ["Delta", "Adapter"]:
print(f"Checking base model '{base_model}' on Hugging Face...")
base_model_on_hub, error, _ = is_model_on_hub(model_name=base_model, revision=revision, token=TOKEN)
if not base_model_on_hub:
return styled_error(f'Base model "{base_model}" was not found on the Hugging Face Hub: {error}')
# Validate the main model
print(f"Checking model '{model}' on Hugging Face...")
model_on_hub, error, _ = is_model_on_hub(model_name=model, revision=revision, token=TOKEN)
if not model_on_hub:
return styled_error(f'Model "{model}" was not found on the Hugging Face Hub: {error}')
# Get model information and validate the model card and license
model_info = API.model_info(repo_id=model, revision=revision)
model_card_ok, error_msg = check_model_card(model)
if not model_card_ok:
return styled_error(error_msg)
if "license" not in model_info.cardData:
return styled_error("Please select a license for your model in its model card.")
except Exception as e:
print(f"Error during model validation: {e}")
print(f"Full traceback:\n{traceback.format_exc()}")
return styled_error(f"Failed to validate model on Hugging Face: {str(e)}")
# --- Step 3: Create the evaluation request ---
print("\n=== Creating and uploading evaluation request ===")
# This function encapsulates the file creation and upload logic.
return _create_eval_request(
model=model,
base_model=base_model,
revision=revision,
precision=precision,
weight_type=weight_type,
model_type=model_type,
model_info=model_info,
)
except Exception as e:
print(f"An unexpected error occurred during submission: {e}")
print(f"Full traceback:\n{traceback.format_exc()}")
return styled_error(f"An unexpected error occurred during submission: {str(e)}")