import json import os import logging from datetime import datetime from argparse import Namespace from lighteval.main_accelerate import main, EnvConfig, create_model_config, load_model from src.envs import RESULTS_REPO, CACHE_PATH, TOKEN, OWNER from src.backend.manage_requests import EvalRequest from lighteval.logging.evaluation_tracker import EnhancedJSONEncoder from lighteval.models.model_loader import ModelInfo from huggingface_hub.errors import InferenceEndpointTimeoutError logging.getLogger("openai").setLevel(logging.WARNING) class DefaultNamespace(Namespace): def __getattr__(self, name): return self.__dict__.get(name, None) def run_evaluation(eval_request: EvalRequest, task_names: str, batch_size: int, local_dir: str, accelerator: str, region: str, vendor: str, instance_size: str, instance_type: str, limit=None): if limit: print("WARNING: --limit SHOULD ONLY BE USED FOR TESTING. REAL METRICS SHOULD NOT BE COMPUTED USING LIMIT.") args = DefaultNamespace(**{ "model_config": dict(model=dict( type="endpoint", base_params=dict( endpoint_name=f'{eval_request.model.split("/")[1].replace(".", "-").replace("_", "-").lower()}-lighteval'[-32:].strip('-'), model=eval_request.model, revision=eval_request.revision, dtype=eval_request.precision, reuse_existing=False ), instance=dict( accelerator=accelerator, region=region, vendor=vendor, instance_size=instance_size, instance_type=instance_type, framework='pytorch', endpoint_type='protected', namespace=OWNER ), generation=dict( add_special_tokens=True ) )), "max_samples": limit, "job_id": str(datetime.now()), "push_results_to_hub": True, "save_details": False, "push_details_to_hub": False, "public_run": False, "cache_dir": CACHE_PATH, "results_org": OWNER, "output_dir": local_dir, "override_batch_size": batch_size, "custom_tasks": "custom_tasks.py", "tasks": task_names, "dataset_loading_processes": 24, "num_fewshot_seeds": 0 }) try: # in case of timeout, try it again with reuse_existing for i in range(3): try: results = main(args) # if we are i>0, then raise an error so that we call clean up if i > 0: raise Exception() except InferenceEndpointTimeoutError: if i < 3: print('Timed out, trying again...') args.model_config['model']['base_params']['reuse_existing'] = True dumped = json.dumps(results, cls=EnhancedJSONEncoder, indent=2) print(dumped) except Exception as ex: # if eval failed, we force a cleanup import traceback traceback.print_exception(ex) env_config = EnvConfig(token=TOKEN, cache_dir=args.cache_dir) args.model_config['model']['base_params']['reuse_existing'] = True model_config = create_model_config(args=args, accelerator=accelerator) model, _ = load_model(config=model_config, env_config=env_config) print('Cleaning up') model.reuse_existing = False # force it to clean up model.cleanup() results = None return results