import json
import os
from dataclasses import dataclass

from src.display.formatting import make_clickable_model
from src.display.utils import AutoEvalColumn, ModelType, Tasks, Precision, WeightType
from src.submission.check_validity import is_model_on_hub

@dataclass
class EvalResult:
    """Represents one full evaluation. Built from a single result file for a given run."""
    eval_name: str  # org_model_precision (uid)
    full_model: str  # org/model (path on hub)
    org: str
    model: str
    revision: str  # commit hash, "" if main
    results: dict
    precision: Precision = Precision.Unknown
    model_type: ModelType = ModelType.Unknown  # Pretrained, fine tuned, ...
    weight_type: WeightType = WeightType.Original  # Original or Adapter
    architecture: str = "Unknown"
    license: str = "?"
    likes: int = 0
    num_params: int = 0
    date: str = ""  # submission date of request file
    still_on_hub: bool = False

    @staticmethod
    def init_from_json_file(json_filepath):
        """Inits the result from the specific model result file"""
        try:
            with open(json_filepath) as fp:
                data = json.load(fp)

            # Extract model information from the JSON data
            full_model_name = data.get('model')
            org_and_model = full_model_name.split("/", 1)
            org = org_and_model[0]
            model = org_and_model[1]

            # Extract other metadata
            precision_str = data.get('precision', 'Unknown')
            precision = Precision.from_str(precision_str)
            model_type = ModelType.from_str(data.get('model_type', 'Unknown'))
            weight_type = WeightType.from_str(data.get('weight_type', 'Original'))
            revision = data.get('revision', '')
            date = data.get('submitted_at', '')

            # Extract results and metadata
            results = data.get('results', {})
            license = data.get('license', '?')
            likes = data.get('likes', 0)
            num_params = data.get('params', 0)
            architecture = data.get('architecture', 'Unknown')

            # Check if the model is still on the hub
            still_on_hub, _, _ = is_model_on_hub(full_model_name, revision=revision)

            return EvalResult(
                eval_name=f"{org}_{model}_{precision.value.name}",
                full_model=full_model_name,
                org=org,
                model=model,
                revision=revision,
                results=results,
                precision=precision,
                model_type=model_type,
                weight_type=weight_type,
                architecture=architecture,
                license=license,
                likes=likes,
                num_params=num_params,
                date=date,
                still_on_hub=still_on_hub,
            )
        except Exception as e:
            print(f"Error reading evaluation file {json_filepath}: {str(e)}")
            return None

    def to_dict(self):
        """Converts the Eval Result to a dict compatible with our dataframe display"""
        # Calculate the average score for the leaderboard
        task_metrics = [task.value.metric for task in Tasks]
        scores = [v for k, v in self.results.items() if v is not None and k in task_metrics]
        average = sum(scores) / len(scores) if scores else 0

        AutoEvalColumnInstance = AutoEvalColumn()
        data_dict = {
            "eval_name": self.eval_name,
            AutoEvalColumnInstance.precision.name: self.precision.value.name,
            AutoEvalColumnInstance.model_type.name: self.model_type.value.name,
            AutoEvalColumnInstance.model_type_symbol.name: self.model_type.value.symbol,
            AutoEvalColumnInstance.weight_type.name: self.weight_type.value.name,
            AutoEvalColumnInstance.architecture.name: self.architecture,
            AutoEvalColumnInstance.model.name: make_clickable_model(self.full_model),
            AutoEvalColumnInstance.revision.name: self.revision,
            AutoEvalColumnInstance.average.name: average,
            AutoEvalColumnInstance.license.name: self.license,
            AutoEvalColumnInstance.likes.name: self.likes,
            AutoEvalColumnInstance.params.name: self.num_params,
            AutoEvalColumnInstance.still_on_hub.name: self.still_on_hub,
        }

        # Dynamically map metric values to their corresponding column names
        for task in Tasks:
            task_metric = task.value.metric
            task_col_name = task.value.col_name
            data_dict[task_col_name] = self.results.get(task_metric)

        return data_dict
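
# A minimal sketch of the result-file layout that init_from_json_file expects, inferred
# from the data.get(...) calls above. All values below are hypothetical placeholders,
# not real leaderboard data.
EXAMPLE_RESULT_FILE = {
    "model": "my-org/my-model",             # hypothetical org/model path on the hub
    "precision": "float16",                 # parsed with Precision.from_str
    "model_type": "pretrained",             # parsed with ModelType.from_str
    "weight_type": "Original",              # parsed with WeightType.from_str
    "revision": "",                         # commit hash, "" if main
    "submitted_at": "2024-01-01T00:00:00Z",
    "license": "apache-2.0",
    "likes": 0,
    "params": 7,
    "architecture": "Unknown",
    "results": {},                          # keyed by the metric names defined in Tasks
}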

def get_raw_eval_results(results_path: str, requests_path: str) -> list[EvalResult]:
    """From the path of the results folder root, extract all needed info for results"""
    model_result_filepaths = []

    # Recursively find all result files
    for root, _, files in os.walk(results_path):
        json_files = [f for f in files if f.endswith(".json")]
        for file in json_files:
            model_result_filepaths.append(os.path.join(root, file))

    eval_results = []
    for model_result_filepath in model_result_filepaths:
        try:
            eval_result = EvalResult.init_from_json_file(model_result_filepath)
            if eval_result is not None:
                eval_results.append(eval_result)
            else:
                print(f"Skipping invalid evaluation file: {model_result_filepath}")
        except Exception as e:
            print(f"Error processing evaluation file {model_result_filepath}: {str(e)}")
            continue

    return eval_results
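
# Hedged usage sketch (an illustration, not part of the original module): turn the
# EvalResult rows into the display dataframe. Assumes pandas is installed; the two
# directory names below are hypothetical placeholders.
if __name__ == "__main__":
    import pandas as pd

    RESULTS_PATH = "eval-results"   # hypothetical folder holding per-model result JSON files
    REQUESTS_PATH = "eval-queue"    # hypothetical folder holding submission request files

    raw_results = get_raw_eval_results(RESULTS_PATH, REQUESTS_PATH)
    # Each EvalResult.to_dict() yields one leaderboard row, including the average score.
    leaderboard_df = pd.DataFrame([result.to_dict() for result in raw_results])
    print(leaderboard_df.head())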