Spaces:
Running
Running
import json | |
import os | |
from ast import literal_eval | |
import pandas as pd | |
import re | |
from src.display.formatting import has_no_nan_values, make_clickable_model | |
from src.display.utils import AutoEvalColumn, EvalQueueColumn | |
from src.leaderboard.read_evals import get_raw_eval_results | |
from src.about import ( | |
nc_tasks, | |
nr_tasks, | |
lp_tasks, | |
) | |
def sanitize_string(input_string): | |
# Remove leading and trailing whitespace | |
input_string = input_string.strip() | |
# Remove leading whitespace on each line | |
sanitized_string = re.sub(r'(?m)^\s+', '', input_string) | |
return sanitized_string | |
''' | |
def get_leaderboard_df(results_path: str, requests_path: str, cols: list, benchmark_cols: list) -> pd.DataFrame: | |
"""Creates a dataframe from all the individual experiment results""" | |
raw_data = get_raw_eval_results(results_path, requests_path) | |
all_data_json = [v.to_dict() for v in raw_data] | |
df = pd.DataFrame.from_records(all_data_json) | |
#df = df.sort_values(by=[AutoEvalColumn.average.name], ascending=False) | |
#df = df[cols].round(decimals=2) | |
# filter out if any of the benchmarks have not been produced | |
#df = df[has_no_nan_values(df, benchmark_cols)] | |
return raw_data, df | |
''' | |
# Function to extract the numerical part before '+' | |
def extract_x(value): | |
return float(value.split('+')[0]) | |
# Function to highlight the highest (or lowest) value based on X | |
def make_bold(df, cols, ascending): | |
df_highlight = df.copy() | |
def apply_highlight(s): | |
if ascending: | |
max_idx = s.apply(extract_x).idxmin() | |
else: | |
max_idx = s.apply(extract_x).idxmax() | |
return ['font-weight: bold' if i == max_idx else '' for i in range(len(s))] | |
styler = df_highlight.style.apply(lambda x: apply_highlight(x) if x.name in cols else ['']*len(x), axis=0) | |
return styler | |
def format_number(num): | |
return f"{num:.3f}" | |
def get_leaderboard_df(EVAL_REQUESTS_PATH, task_type) -> pd.DataFrame: | |
if task_type in ['Node Classification', 'Entity Classification']: | |
ascending = False | |
tasks = nc_tasks | |
task_type = ['Node Classification', 'Entity Classification'] | |
elif task_type in ['Node Regression', 'Entity Regression']: | |
ascending = True | |
tasks = nr_tasks | |
task_type = ['Node Regression', 'Entity Regression'] | |
elif task_type in ['Link Prediction', 'Recommendation']: | |
ascending = False | |
tasks = lp_tasks | |
task_type = ['Link Prediction', 'Recommendation'] | |
model_result_filepaths = [] | |
for root,_, files in os.walk(EVAL_REQUESTS_PATH): | |
if len(files) == 0 or any([not f.endswith(".json") for f in files]): | |
continue | |
for file in files: | |
model_result_filepaths.append(os.path.join(root, file)) | |
model_res = [] | |
for model in model_result_filepaths: | |
import json | |
with open(model) as f: | |
out = json.load(f) | |
if ('task' in out) and (out['task'] in task_type): | |
model_res.append(out) | |
for model in model_res: | |
model["test"] = literal_eval(model["test"].split('}')[0]+'}') | |
model["valid"] = literal_eval(model["valid"].split('}')[0]+'}') | |
#model["params"] = int(model["params"]) | |
model['submitted_time'] = model['submitted_time'].split('T')[0] | |
#model['paper_url'] = '[Link](' + model['paper_url'] + ')' | |
#model['github_url'] = '[Link](' + model['github_url'] + ')' | |
name2short_name = {task.value.benchmark: task.value.benchmark for task in tasks} | |
for model in model_res: | |
model.update({ | |
name2short_name[i]: (f"{format_number(model['test'][i][0])} ± {format_number(model['test'][i][1])}" if i in model['test'] else '-') | |
for i in name2short_name | |
}) | |
columns_to_show = ['model', 'author', 'email', 'paper_url', 'github_url', 'submitted_time'] + list(name2short_name.values()) | |
# Check if model_res is empty | |
if len(model_res) > 0: | |
df_res = pd.DataFrame([{col: model[col] for col in columns_to_show} for model in model_res]) | |
else: | |
# Initialize an empty DataFrame with the desired columns | |
df_res = pd.DataFrame(columns=columns_to_show) | |
#df_res = pd.DataFrame([{col: model[col] for col in columns_to_show} for model in model_res]) | |
ranks = df_res[list(name2short_name.values())].rank(ascending = ascending) | |
df_res.rename(columns={'model': 'Model', 'author': 'Author', 'email': 'Email', 'paper_url': 'Paper URL', 'github_url': 'Github URL', 'submitted_time': 'Time'}, inplace=True) | |
df_res.Model.replace('Relbench User Study', 'Human Data Scientist', inplace=True) | |
df_res['Average Rank⬆️'] = ranks.mean(axis=1) | |
df_res.sort_values(by='Average Rank⬆️', ascending=True, inplace=True) | |
#df_res = make_bold(df_res, list(name2short_name.values()), ascending = ascending) | |
print(df_res) | |
return df_res | |
def get_evaluation_queue_df(save_path: str, cols: list) -> list[pd.DataFrame]: | |
"""Creates the different dataframes for the evaluation queues requestes""" | |
entries = [entry for entry in os.listdir(save_path) if not entry.startswith(".")] | |
all_evals = [] | |
for entry in entries: | |
if ".json" in entry: | |
file_path = os.path.join(save_path, entry) | |
with open(file_path) as fp: | |
data = json.load(fp) | |
data[EvalQueueColumn.model.name] = make_clickable_model(data["model"]) | |
data[EvalQueueColumn.revision.name] = data.get("revision", "main") | |
all_evals.append(data) | |
elif ".md" not in entry: | |
# this is a folder | |
sub_entries = [e for e in os.listdir(f"{save_path}/{entry}") if not e.startswith(".")] | |
for sub_entry in sub_entries: | |
file_path = os.path.join(save_path, entry, sub_entry) | |
with open(file_path) as fp: | |
data = json.load(fp) | |
data[EvalQueueColumn.model.name] = make_clickable_model(data["model"]) | |
data[EvalQueueColumn.revision.name] = data.get("revision", "main") | |
all_evals.append(data) | |
pending_list = [e for e in all_evals if e["status"] in ["PENDING", "RERUN"]] | |
running_list = [e for e in all_evals if e["status"] == "RUNNING"] | |
finished_list = [e for e in all_evals if e["status"].startswith("FINISHED") or e["status"] == "PENDING_NEW_EVAL"] | |
df_pending = pd.DataFrame.from_records(pending_list, columns=cols) | |
df_running = pd.DataFrame.from_records(running_list, columns=cols) | |
df_finished = pd.DataFrame.from_records(finished_list, columns=cols) | |
return df_finished[cols], df_running[cols], df_pending[cols] | |