import glob import json import os import re import pickle from typing import List import huggingface_hub from huggingface_hub import HfApi from tqdm import tqdm from transformers import AutoModel, AutoConfig from accelerate import init_empty_weights from src.display_models.model_metadata_flags import DO_NOT_SUBMIT_MODELS, FLAGGED_MODELS from src.display_models.model_metadata_type import MODEL_TYPE_METADATA, ModelType, model_type_from_str from src.display_models.utils import AutoEvalColumn, model_hyperlink api = HfApi(token=os.environ.get("H4_TOKEN", None)) def get_model_infos_from_hub(leaderboard_data: List[dict]): # load cache from disk try: with open("model_info_cache.pkl", "rb") as f: model_info_cache = pickle.load(f) except (EOFError, FileNotFoundError): model_info_cache = {} try: with open("model_size_cache.pkl", "rb") as f: model_size_cache = pickle.load(f) except (EOFError, FileNotFoundError): model_size_cache = {} for model_data in tqdm(leaderboard_data): model_name = model_data["model_name_for_query"] if model_name in model_info_cache: model_info = model_info_cache[model_name] else: try: model_info = api.model_info(model_name) model_info_cache[model_name] = model_info except huggingface_hub.utils._errors.RepositoryNotFoundError: print("Repo not found!", model_name) model_data[AutoEvalColumn.license.name] = None model_data[AutoEvalColumn.likes.name] = None if model_name not in model_size_cache: model_size_cache[model_name] = get_model_size(model_name, None) model_data[AutoEvalColumn.params.name] = model_size_cache[model_name] model_data[AutoEvalColumn.license.name] = get_model_license(model_info) model_data[AutoEvalColumn.likes.name] = get_model_likes(model_info) if model_name not in model_size_cache: model_size_cache[model_name] = get_model_size(model_name, model_info) model_data[AutoEvalColumn.params.name] = model_size_cache[model_name] # save cache to disk in pickle format with open("model_info_cache.pkl", "wb") as f: pickle.dump(model_info_cache, f) with open("model_size_cache.pkl", "wb") as f: pickle.dump(model_size_cache, f) def get_model_license(model_info): try: return model_info.cardData["license"] except Exception: return "?" def get_model_likes(model_info): return model_info.likes size_pattern = re.compile(r"(\d\.)?\d+(b|m)") def get_model_size(model_name, model_info): # In billions try: return round(model_info.safetensors["total"] / 1e9, 3) except AttributeError: try: config = AutoConfig.from_pretrained(model_name, trust_remote_code=False) with init_empty_weights(): model = AutoModel.from_config(config, trust_remote_code=False) return round(sum(p.numel() for p in model.parameters() if p.requires_grad) / 1e9, 3) except (EnvironmentError, ValueError, KeyError): # model config not found, likely private try: size_match = re.search(size_pattern, model_name.lower()) size = size_match.group(0) return round(float(size[:-1]) if size[-1] == "b" else float(size[:-1]) / 1e3, 3) except AttributeError: return 0 def get_model_type(leaderboard_data: List[dict]): for model_data in leaderboard_data: request_files = os.path.join( "eval-queue", model_data["model_name_for_query"] + "_eval_request_*" + ".json", ) request_files = glob.glob(request_files) # Select correct request file (precision) request_file = "" if len(request_files) == 1: request_file = request_files[0] elif len(request_files) > 1: request_files = sorted(request_files, reverse=True) for tmp_request_file in request_files: with open(tmp_request_file, "r") as f: req_content = json.load(f) if ( req_content["status"] == "FINISHED" and req_content["precision"] == model_data["Precision"].split(".")[-1] ): request_file = tmp_request_file try: with open(request_file, "r") as f: request = json.load(f) model_type = model_type_from_str(request["model_type"]) model_data[AutoEvalColumn.model_type.name] = model_type.value.name model_data[AutoEvalColumn.model_type_symbol.name] = model_type.value.symbol # + ("🔺" if is_delta else "") except Exception: if model_data["model_name_for_query"] in MODEL_TYPE_METADATA: model_data[AutoEvalColumn.model_type.name] = MODEL_TYPE_METADATA[ model_data["model_name_for_query"] ].value.name model_data[AutoEvalColumn.model_type_symbol.name] = MODEL_TYPE_METADATA[ model_data["model_name_for_query"] ].value.symbol # + ("🔺" if is_delta else "") else: model_data[AutoEvalColumn.model_type.name] = ModelType.Unknown.value.name model_data[AutoEvalColumn.model_type_symbol.name] = ModelType.Unknown.value.symbol def flag_models(leaderboard_data: List[dict]): for model_data in leaderboard_data: if model_data["model_name_for_query"] in FLAGGED_MODELS: issue_num = FLAGGED_MODELS[model_data["model_name_for_query"]].split("/")[-1] issue_link = model_hyperlink( FLAGGED_MODELS[model_data["model_name_for_query"]], f"See discussion #{issue_num}", ) model_data[ AutoEvalColumn.model.name ] = f"{model_data[AutoEvalColumn.model.name]} has been flagged! {issue_link}" def remove_forbidden_models(leaderboard_data: List[dict]): indices_to_remove = [] for ix, model in enumerate(leaderboard_data): if model["model_name_for_query"] in DO_NOT_SUBMIT_MODELS: indices_to_remove.append(ix) for ix in reversed(indices_to_remove): leaderboard_data.pop(ix) return leaderboard_data def apply_metadata(leaderboard_data: List[dict]): leaderboard_data = remove_forbidden_models(leaderboard_data) get_model_type(leaderboard_data) get_model_infos_from_hub(leaderboard_data) flag_models(leaderboard_data)