import pandas as pd
import yaml
import numpy as np
import argparse
from execute_evaluation import evaluate
import logging
import os
import json
import sys
from mteb import MTEB


def add_model():
    """
    Update the file from which app.py takes the information to build the leaderboard,
    so anyone who wants to add a new model has to run this script.

    1. Read the CSVs, extract the information and simply append a new row.
    """
    # Initialize an empty DataFrame
    df = pd.DataFrame(columns=['dataset_name', 'Accuracy', 'Spearman', 'Category'])
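
    # Illustrative shape of the intermediate DataFrame built below (the values
    # are made up, not real scores); each result in the YAML becomes one row:
    #   dataset_name            Accuracy  Spearman  Category
    #   SomeClassificationTask  71.20     NaN       Classification
    #   SomeSTSTask             NaN       82.50     STS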
    metadata_archive = 'mteb_metadata.yaml'

    with open(metadata_archive, 'r') as file:
        for index, data in enumerate(yaml.safe_load_all(file)):
            if index == 0:
                model_index_list = data.get('model-index', [])
                model_name = model_index_list[0].get('name')
                results_list = model_index_list[0].get('results', [])

                if results_list:
                    for i in range(len(results_list)):
                        task = results_list[i].get('task', {})
                        task_name = task.get("type")
                        dataset_name = results_list[i]['dataset']['name']

                        # Initialize the row with NaN values
                        row = {'dataset_name': dataset_name, 'Accuracy': None, 'Spearman': None}

                        if task_name == "Classification":
                            accuracy = next((metric.get('value') for metric in results_list[i].get('metrics', []) if metric.get('type') == 'accuracy'), None)
                            row['Accuracy'] = accuracy
                            row['Category'] = "Classification"
                        elif task_name == "STS":
                            spearman = next((metric.get('value') for metric in results_list[i].get('metrics', []) if metric.get('type') == 'cos_sim_spearman'), None)
                            row['Spearman'] = spearman
                            row['Category'] = "STS"

                        # Append the row to the DataFrame using pd.concat
                        new_df = pd.DataFrame([row])
                        df = pd.concat([df, new_df], ignore_index=True)
    df['Accuracy'] = pd.to_numeric(df['Accuracy'], errors='coerce')
    classification_average = round(df.loc[df['Category'] == 'Classification', 'Accuracy'].mean(), 2)

    df['Spearman'] = pd.to_numeric(df['Spearman'], errors='coerce')
    sts_spearman_average = round(df.loc[df['Category'] == 'STS', 'Spearman'].mean(), 2)
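
    # Note: results_to_yaml() multiplies every score by 100 before writing the
    # YAML, so these averages are already on a 0-100 scale.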
    ## CLASSIFICATION
    classification_dataframe = pd.read_csv('../data/classification.csv')
    classification_df = df[df['Category'] == 'Classification']

    new_row_data = {'Model name': model_name, 'Average': classification_average}
    for index, row in classification_df.iterrows():
        column_name = row['dataset_name']
        accuracy_value = row['Accuracy']
        new_row_data[column_name] = round(accuracy_value, 2)

    new_row_df = pd.DataFrame(new_row_data, index=[0])
    classification_dataframe = pd.concat([classification_dataframe, new_row_df], ignore_index=True)
    classification_dataframe.to_csv("../data/classification.csv", index=False)
    ## STS
    sts_dataframe = pd.read_csv('../data/sts.csv')
    sts_df = df[df['Category'] == 'STS']

    new_row_data = {'Model name': model_name, 'Average': sts_spearman_average}
    for index, row in sts_df.iterrows():
        column_name = row['dataset_name']
        spearman_value = row['Spearman']
        new_row_data[column_name] = round(spearman_value, 2)

    new_row_df = pd.DataFrame(new_row_data, index=[0])
    sts_dataframe = pd.concat([sts_dataframe, new_row_df], ignore_index=True)
    sts_dataframe.to_csv('../data/sts.csv', index=False)
    ## GENERAL
    general_dataframe = pd.read_csv("../data/general.csv")
    average = round(np.mean([classification_average, sts_spearman_average]), 2)

    ## TODO: fill in the metadata such as Model Size or Embedding Dimensions.
    new_instance = {'Model name': model_name, 'Model Size (GB)': None, 'Embedding Dimensions': None, 'Average': average, 'Classification Average': classification_average, 'Clustering Average': None, 'STS Average': sts_spearman_average, 'Retrieval Average': None}
    new_row_df = pd.DataFrame(new_instance, index=[0])
    general_dataframe = pd.concat([general_dataframe, new_row_df], ignore_index=True)
    general_dataframe.to_csv("../data/general.csv", index=False)
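

# Assumed layout of the leaderboard CSVs updated by add_model() (a sketch, not a
# guaranteed schema): classification.csv and sts.csv have a "Model name" column,
# an "Average" column and one column per dataset name, e.g.
#   Model name,Average,<dataset 1>,<dataset 2>,...
# while general.csv contains the metadata and per-category average columns
# referenced in `new_instance` above.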


def results_to_yaml(results_folder):
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    model_name = results_folder.split("/")[-1]

    all_results = {}
    for file_name in os.listdir(results_folder):
        if not file_name.endswith(".json"):
            logger.info(f"Found non-json file {file_name}")
            raise ValueError("This is not the proper folder. It does not contain the corresponding JSON files.")
        with open(os.path.join(results_folder, file_name), "r", encoding="utf-8") as f:
            results = json.load(f)
            all_results = {**all_results, **{file_name.replace(".json", ""): results}}
# Use "train" split instead | |
TRAIN_SPLIT = ["DanishPoliticalCommentsClassification"] | |
# Use "validation" split instead | |
VALIDATION_SPLIT = ["AFQMC", "Cmnli", "IFlyTek", "TNews", "MSMARCO", "MultilingualSentiment", "Ocnli"] | |
# Use "dev" split instead | |
DEV_SPLIT = ["CmedqaRetrieval", "CovidRetrieval", "DuRetrieval", "EcomRetrieval", "MedicalRetrieval", "MMarcoReranking", "MMarcoRetrieval", "MSMARCO", "T2Reranking", "T2Retrieval", "VideoRetrieval"] | |
MARKER = "---" | |
TAGS = "tags:" | |
MTEB_TAG = "- mteb" | |
HEADER = "model-index:" | |
MODEL = f"- name: {model_name}" | |
RES = " results:" | |
META_STRING = "\n".join([MARKER, TAGS, MTEB_TAG, HEADER, MODEL, RES]) | |
ONE_TASK = " - task:\n type: {}\n dataset:\n type: {}\n name: {}\n config: {}\n split: {}\n revision: {}\n metrics:" | |
ONE_METRIC = " - type: {}\n value: {}" | |
SKIP_KEYS = ["std", "evaluation_time", "main_score", "threshold"] | |
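
    # Illustrative shape of the metadata block assembled below from META_STRING,
    # ONE_TASK and ONE_METRIC (placeholder names and values):
    # ---
    # tags:
    # - mteb
    # model-index:
    # - name: <model_name>
    #   results:
    #   - task:
    #       type: STS
    #     dataset:
    #       type: <hf_hub_name>
    #       name: MTEB <dataset_name> (es)
    #       config: es
    #       split: test
    #       revision: <revision>
    #     metrics:
    #     - type: cos_sim_spearman
    #       value: 82.5
    # ---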
    for ds_name, res_dict in sorted(all_results.items()):
        mteb_desc = (
            MTEB(tasks=[ds_name.replace("CQADupstackRetrieval", "CQADupstackAndroidRetrieval")]).tasks[0].description
        )
        hf_hub_name = mteb_desc.get("hf_hub_name", mteb_desc.get("beir_name"))
        if "CQADupstack" in ds_name:
            hf_hub_name = "BeIR/cqadupstack"
        mteb_type = mteb_desc["type"]
        revision = res_dict.get("dataset_revision")  # Okay if it's None
        split = "test"
        if (ds_name in TRAIN_SPLIT) and ("train" in res_dict):
            split = "train"
        elif (ds_name in VALIDATION_SPLIT) and ("validation" in res_dict):
            split = "validation"
        elif (ds_name in DEV_SPLIT) and ("dev" in res_dict):
            split = "dev"
        elif "test" not in res_dict:
            logger.info(f"Skipping {ds_name} as split {split} not present.")
            continue
        res_dict = res_dict.get(split)
        for lang in mteb_desc["eval_langs"]:
            mteb_name = f"MTEB {ds_name}"
            mteb_name += f" ({lang})" if len(mteb_desc["eval_langs"]) > 1 else ""
            # For English there is no language key if it's the only language
            test_result_lang = res_dict.get(lang) if len(mteb_desc["eval_langs"]) > 1 else res_dict
            # Skip if the language was not found but it has other languages
            if test_result_lang is None:
                continue
            META_STRING += "\n" + ONE_TASK.format(
                mteb_type, hf_hub_name, mteb_name, lang if len(mteb_desc["eval_langs"]) > 1 else "default", split, revision
            )
            for metric, score in test_result_lang.items():
                if not isinstance(score, dict):
                    score = {metric: score}
                for sub_metric, sub_score in score.items():
                    if any([x in sub_metric for x in SKIP_KEYS]):
                        continue
                    META_STRING += "\n" + ONE_METRIC.format(
                        f"{metric}_{sub_metric}" if metric != sub_metric else metric,
                        # All MTEB scores are 0-1; multiply them by 100 for 3 reasons:
                        # 1) It's easier to visually digest (fewer characters: "0.1" -> "10")
                        # 2) Others may multiply them by 100 when building on MTEB, making it confusing what the range is.
                        #    This happened with the Text and Code Embeddings paper (OpenAI) vs the original BEIR paper.
                        # 3) It's accepted practice (SuperGLUE and GLUE are 0-100)
                        sub_score * 100,
                    )

    META_STRING += "\n" + MARKER

    if os.path.exists("./mteb_metadata.yaml"):
        logger.warning("Overwriting mteb_metadata.yaml")

    with open("./mteb_metadata.yaml", "w") as f:
        f.write(META_STRING)
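

# Note: results_to_yaml() writes ./mteb_metadata.yaml, which is the same file
# that add_model() reads (metadata_archive), so both are expected to run from
# the same working directory.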


def main(args):
    if args.execute_eval:
        output_folder = evaluate(args.model_id)
        results_to_yaml(output_folder)
        add_model()
    else:
        if args.output_folder is None:
            raise ValueError("You must indicate where your results are located.")
        results_to_yaml(args.output_folder)
        add_model()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Select the model that you want to add to the Leaderboard.")
    parser.add_argument("--model_id", type=str, required=True, help="Hugging Face model path that you want to evaluate.")
    parser.add_argument("--execute_eval", action="store_true", help="Pass this flag to execute the evaluation before adding the model.")
    parser.add_argument("--output_folder", type=str, default=None, help="Folder in which the results are stored.")
    args = parser.parse_args()
    main(args)
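

# Example invocation (a sketch: it assumes this script is saved as
# add_new_model.py, as referenced in the repository README, and that
# execute_evaluation.evaluate returns the folder containing the JSON results):
#   python add_new_model.py --model_id <org>/<model> --execute_eval
# or, if the evaluation results already exist on disk:
#   python add_new_model.py --model_id <org>/<model> --output_folder <path/to/results>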