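# NOTE(review): page-header residue captured with the source ("Spaces: Runtime error / Runtime error"); apparently the hosting page's status banner, not part of the program.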
import copy | |
import glob | |
import json | |
import os | |
# Required by `requests`: unless these variables hold a valid CA-bundle path
# (or an empty string), HTTPS connections fail with
# "[Errno 101] Network is unreachable".
os.environ.update(
    dict.fromkeys(
        ("CURL_CA_BUNDLE", "REQUESTS_CA_BUNDLE"),
        "/etc/ssl/certs/ca-certificates.crt"
        if os.path.exists("/etc/ssl/certs/ca-certificates.crt")
        else "",
    )
)
# Echo the effective values so they show up in the start-up log.
print(f"{os.environ.get('CURL_CA_BUNDLE') = }")
print(f"{os.environ.get('REQUESTS_CA_BUNDLE') = }")
import hashlib | |
import time | |
import requests | |
from collections import namedtuple | |
from xml.sax.saxutils import escape as xmlEscape, quoteattr as xmlQuoteAttr | |
from threading import Lock | |
import gradio as gr | |
import pandas as pd | |
from huggingface_hub import HfApi, snapshot_download | |
from compare_significance import SUPPORTED_METRICS | |
# Metrics that may appear as leaderboard columns: everything the significance
# comparison supports, plus macro F1.
VISIBLE_METRICS = SUPPORTED_METRICS + ["macro_f1"]

# Shared Hugging Face Hub client used for uploads.
api = HfApi()

# Dataset repository that stores submissions and tournament results.
ORG = "CZLC"
REPO = ORG + "/LLM_benchmark_data"

# Access token for the repository, taken from the environment (None if unset).
HF_TOKEN = os.getenv("HF_TOKEN")

# JSON file describing the benchmark tasks.
TASKS_METADATA_PATH = "./tasks_metadata.json"
# Markdown control characters mapped to their HTML numeric character
# references, for use as the `entities` argument of
# `xml.sax.saxutils.escape`.  The replacements are applied in insertion
# order, so "#" MUST stay first: every replacement value itself contains a
# "#", which would otherwise be escaped a second time.
MARKDOWN_SPECIAL_CHARACTERS = {
    "#": "&#35;",  # must be first, see the note above
    "\\": "&#92;",
    "`": "&#96;",
    "*": "&#42;",
    "_": "&#95;",
    "{": "&#123;",
    "}": "&#125;",
    "[": "&#91;",
    "]": "&#93;",
    "(": "&#40;",
    ")": "&#41;",
    "+": "&#43;",
    "-": "&#45;",
    ".": "&#46;",
    "!": "&#33;",
    "=": "&#61;",
    "|": "&#124;",
}
def check_significance_send_task(model_a_path, model_b_path):
    """Upload two model result files for a significance comparison.

    Both files are POSTed to the comparison service.  A 202 response means
    the task was accepted.

    Args:
        model_a_path: Path of the first model's result file.
        model_b_path: Path of the second model's result file.

    Returns:
        The URL of the response, which is later polled for the result.

    Raises:
        RuntimeError: If the server answers 429 (too busy) or with any
            status other than 202.
    """
    url = 'https://czechllm.fit.vutbr.cz/benczechmark-leaderboard/compare_significance/'

    # Both files must stay open while the multipart request is being sent.
    with open(model_a_path, 'rb') as model_a_fp, open(model_b_path, 'rb') as model_b_fp:
        response = requests.post(
            url,
            files={'model_a': model_a_fp, 'model_b': model_b_fp},
            timeout=60 * 5,
        )

    status = response.status_code
    # TODO: wrap the callers in try/except and raise gr.Error instead.
    if status == 429:
        raise RuntimeError('Server is too busy. Please try again later.')
    if status != 202:
        raise RuntimeError(f'Failed to submit task. Status code: {status}')
    return response.url
def check_significance_wait_for_result(result_url):
    """Poll *result_url* until the comparison task has finished.

    While the server answers 202 the URL is polled again every 5 seconds.

    Args:
        result_url: URL returned by `check_significance_send_task`.

    Returns:
        The ``"result"`` entry of the JSON response when its ``"state"`` is
        ``"COMPLETED"``.

    Raises:
        RuntimeError: If the server answers with a status other than 200 or
            202, or if the task finished in a state other than
            ``"COMPLETED"`` (the message is the error reported by the server).
    """
    response = requests.get(result_url, timeout=60 * 5)
    while response.status_code == 202:  # still being processed
        time.sleep(5)
        response = requests.get(result_url, timeout=60 * 5)

    if response.status_code != 200:
        # TODO: wrap the callers in try/except and raise gr.Error instead.
        raise RuntimeError(f'Failed to get result. Status code: {response.status_code}')

    result = response.json()
    if result["state"] != "COMPLETED":
        raise RuntimeError(result['result']['error'])
    return result['result']
def check_significance(model_a_path, model_b_path):
    """Submit a significance comparison of two models and wait for its result.

    Returns whatever `check_significance_wait_for_result` returns for the
    task created by `check_significance_send_task`.
    """
    return check_significance_wait_for_result(
        check_significance_send_task(model_a_path, model_b_path)
    )
# Module-level mutex.  LeaderboardServer stores it as `self.pre_submit_lock`
# and holds it while a pending submission is being prepared or saved.
pre_submit_lock = Lock()
class _ReadLock: | |
def __init__(self, lock): | |
self._lock = lock | |
self.reading = 0 | |
def __enter__(self): | |
with self._lock: | |
self.reading += 1 | |
def __exit__(self, exc_type, exc_value, traceback): | |
with self._lock: | |
self.reading -= 1 | |
class ReadWriteLock:
    """
    A lock which makes sure that nobody is reading while a write is in
    progress and that only one writer writes at a time.

    Use `with lock.ro:` for reading and `with lock.rw:` (or `with lock:`)
    for writing.
    """

    def __init__(self):
        self._lock = Lock()
        self.ro = _ReadLock(self._lock)
        self.rw = self

    def __enter__(self):
        # The writer keeps `_lock` for the whole write; readers need the same
        # lock to register themselves, so none can enter in the meantime.
        self._lock.acquire()
        while True:
            active_readers = self.ro.reading
            if active_readers == 0:
                return
            # Give the lock back so that readers can leave (or so that it is
            # not held when the error below is raised).
            self._lock.release()
            if active_readers < 0:
                raise RuntimeError()
            time.sleep(1)
            self._lock.acquire()

    def __exit__(self, exc_type, exc_value, traceback):
        self._lock.release()
class LeaderboardServer:
    """Leaderboard state mirrored from a Hugging Face dataset repository.

    On construction the repository `REPO` is downloaded into the current
    directory, the task metadata file is read, the submissions found in the
    download are indexed and the stored tournament results are loaded.

    Shared state (`submission_ids`, `submission_id_to_file`,
    `tournament_results`) is guarded by `var_lock`; the pending submission
    (`pre_submit`) is guarded by `pre_submit_lock`.
    """

    # A finished but not yet saved tournament: the updated results table, the
    # id of the new submission and the path of its JSON file.
    PreSubmit = namedtuple('PreSubmit', 'tournament_results, submission_id, file')

    def __init__(self):
        self.server_address = REPO
        self.repo_type = "dataset"
        self.local_leaderboard = snapshot_download(
            self.server_address,
            repo_type=self.repo_type,
            token=HF_TOKEN,
            local_dir="./",
        )
        self.TASKS_METADATA = self._read_json(TASKS_METADATA_PATH)
        self.TASKS_CATEGORIES = {self.TASKS_METADATA[task]["category"] for task in self.TASKS_METADATA}
        self.TASKS_CATEGORY_OVERALL = "Overall"

        self.var_lock = ReadWriteLock()
        self.submission_ids = set()
        self.submission_id_to_file = {}  # Map submission ids to file paths

        self.fetch_existing_models()
        self.tournament_results = self.load_tournament_results()

        self.pre_submit_lock = pre_submit_lock
        self.pre_submit = None

    @staticmethod
    def _read_json(path):
        """Parse the JSON file at *path*, closing the file afterwards."""
        with open(path) as json_file:
            return json.load(json_file)

    def update_leaderboard(self):
        """Download the repository again and reload submissions and results."""
        self.local_leaderboard = snapshot_download(
            self.server_address,
            repo_type=self.repo_type,
            token=HF_TOKEN,
            local_dir="./",
        )
        self.fetch_existing_models()
        with self.var_lock.rw:
            self.tournament_results = self.load_tournament_results()

    def load_tournament_results(self):
        """Return the content of ``tournament.json``, or ``{}`` if it is missing."""
        metadata_rank_paths = os.path.join(self.local_leaderboard, "tournament.json")
        if not os.path.exists(metadata_rank_paths):
            return {}
        return self._read_json(metadata_rank_paths)

    def fetch_existing_models(self):
        """Index every ``data/*.json`` submission that carries metadata.

        Files without a ``"metadata"`` entry are skipped.
        """
        # Models data
        for submission_file in glob.glob(os.path.join(self.local_leaderboard, "data") + "/*.json"):
            metadata = self._read_json(submission_file).get('metadata')
            if metadata is None:
                continue

            submission_id = metadata["submission_id"]
            with self.var_lock.rw:
                self.submission_ids.add(submission_id)
                self.submission_id_to_file[submission_id] = submission_file

    def get_leaderboard(self, pre_submit=None, category=None):
        """Build the leaderboard table.

        Args:
            pre_submit: Optional `PreSubmit`; when given, its tournament
                results are shown instead of the stored ones and its
                submission becomes the first row.
            category: Task category to show; a falsy value selects the
                overall view.

        Returns:
            A `pandas.DataFrame` with one row per submission, or a frame with
            the single column ``'No submissions yet'`` when there are no
            tournament results.

        Raises:
            gr.Error: If the file of a submission cannot be found or holds a
                different submission id.
        """
        # NOTE(review): the block nesting of this method was reconstructed
        # from a source whose indentation had been lost - confirm against the
        # deployed version.
        with self.var_lock.ro:
            tournament_results = pre_submit.tournament_results if pre_submit else self.tournament_results
            category = category if category else self.TASKS_CATEGORY_OVERALL

            if len(tournament_results) == 0:
                return pd.DataFrame(columns=['No submissions yet'])

            processed_results = []
            # Collected across ALL submissions, so that a metric column gets
            # its header no matter which submission contributed it.
            visible_metrics_map_word_to_header = {}

            for submission_id in tournament_results.keys():
                path = self.submission_id_to_file.get(submission_id)
                if path is None:
                    # Only the pending submission may be missing from the index.
                    if pre_submit and submission_id == pre_submit.submission_id:
                        data = self._read_json(pre_submit.file)
                    else:
                        raise gr.Error(f"Internal error: Submission [{submission_id}] not found")
                elif path:
                    data = self._read_json(path)
                else:
                    raise gr.Error(f"Submission [{submission_id}] not found")

                if submission_id != data["metadata"]["submission_id"]:
                    raise gr.Error(f"Proper submission [{submission_id}] not found")

                local_results = {}
                win_score = {}
                duels = tournament_results[submission_id]
                competitors = duels.keys() - {submission_id}  # without self

                for task, task_metadata in self.TASKS_METADATA.items():
                    task_category = task_metadata["category"]
                    if category not in (self.TASKS_CATEGORY_OVERALL, task_category):
                        continue

                    # Task score = percentage of competitors beaten on this
                    # task; 100 when there is nobody to compete with.
                    num_of_wins = sum(1 for competitor_id in competitors if duels[competitor_id][task])
                    task_score = num_of_wins / len(competitors) * 100 if competitors else 100
                    win_score.setdefault(task_category, []).append(task_score)

                    if category != task_category:
                        continue

                    local_results[task] = task_score
                    for metric in VISIBLE_METRICS:
                        visible_metrics_map_word_to_header[task + "_" + metric] = task_metadata["abbreviation"] + " " + metric
                        metric_value = data['results'][task].get(metric)
                        if metric_value is not None:
                            local_results[task + "_" + metric] = metric_value * 100
                            break  # Only the first metric of every task

                # Average the task scores inside every category.
                category_score = {c: sum(scores) / len(scores) for c, scores in win_score.items()}

                if category == self.TASKS_CATEGORY_OVERALL:
                    local_results.update(category_score)
                    local_results["average_score"] = sum(category_score.values()) / len(category_score)
                else:
                    local_results["average_score"] = category_score[category]

                model_link = data["metadata"]["link_to_model"]
                model_title = data["metadata"]["team_name"] + "/" + data["metadata"]["model_name"]
                model_title_abbr_team_name = self.abbreviate(data["metadata"]["team_name"], 28)
                model_title_abbr_model_name = self.abbreviate(data["metadata"]["model_name"], 28)
                model_title_abbr_html = f'<div style="font-size: 10px;">{xmlEscape(model_title_abbr_team_name, MARKDOWN_SPECIAL_CHARACTERS)}</div>{xmlEscape(model_title_abbr_model_name, MARKDOWN_SPECIAL_CHARACTERS)}'
                local_results["model"] = f'<a href={xmlQuoteAttr(model_link)} title={xmlQuoteAttr(model_title)}>{model_title_abbr_html}</a>'

                release = data["metadata"].get("submission_timestamp")
                release = time.strftime("%Y-%m-%d", time.gmtime(release)) if release else "N/A"
                local_results["release"] = release
                local_results["model_type"] = data["metadata"]["model_type"]
                local_results["parameters"] = data["metadata"]["parameters"]

                # The pending submission is always shown on top.
                if pre_submit and submission_id == pre_submit.submission_id:
                    processed_results.insert(0, local_results)
                else:
                    processed_results.append(local_results)

            dataframe = pd.DataFrame.from_records(processed_results)

            extra_attributes_map_word_to_header = {
                "model": "Model",
                "release": "Release",
                "average_score": "Average ⬆️",
                "team_name": "Team name",
                "model_name": "Model name",
                "model_type": "Type",
                "parameters": "# θ (B)",
                "input_length": "Input length (# tokens)",
                "precision": "Precision",
                "description": "Description",
                "link_to_model": "Link to model"
            }
            first_attributes = [
                "model",
                "release",
                "model_type",
                "parameters",
                "average_score",
            ]
            # Fixed attributes first, then the tasks in metadata order, then
            # whatever is left; dict.fromkeys removes duplicates while keeping
            # the order.
            df_order = [
                key
                for key in dict.fromkeys(
                    first_attributes
                    + list(self.TASKS_METADATA.keys())
                    + list(dataframe.columns)
                )
                if key in dataframe.columns
            ]
            dataframe = dataframe[df_order]

            attributes_map_word_to_header = {key: value["abbreviation"] for key, value in self.TASKS_METADATA.items()}
            attributes_map_word_to_header.update(extra_attributes_map_word_to_header)
            attributes_map_word_to_header.update(visible_metrics_map_word_to_header)
            dataframe = dataframe.rename(
                columns=attributes_map_word_to_header
            )

            return dataframe

    def start_tournament(self, new_submission_id, new_model_file):
        """Compare a new submission with every known submission.

        For every competitor two comparison tasks are submitted (new model
        vs. competitor and the inverse).  The first five competitors are
        submitted up front; after each finished competitor the next one is
        submitted.

        Args:
            new_submission_id: Id of the new submission.
            new_model_file: Path of the new submission's JSON file.

        Returns:
            A deep copy of the stored tournament results extended with the
            outcomes of the new submission; the stored results are not
            modified.
        """
        with self.var_lock.ro:
            new_tournament = copy.deepcopy(self.tournament_results)
            new_tournament[new_submission_id] = {}
            new_tournament[new_submission_id][new_submission_id] = {
                task: False for task in self.TASKS_METADATA.keys()
            }

            rest_of_competitors = list(self.submission_ids - {new_submission_id})  # without self
            num_of_competitors = len(rest_of_competitors)

            result_url = {}
            result_inverse_url = {}

            def submit_duel(competitor_id):
                # Submit both directions of the comparison for one competitor.
                competitor_file = self.submission_id_to_file[competitor_id]
                result_url[competitor_id] = check_significance_send_task(new_model_file, competitor_file)
                result_inverse_url[competitor_id] = check_significance_send_task(competitor_file, new_model_file)

            while rest_of_competitors:
                next_competitors = []
                while rest_of_competitors and len(next_competitors) < 5:  # 5*2==10 tasks
                    next_competitors.append(rest_of_competitors.pop())

                for competitor_id in next_competitors:
                    submit_duel(competitor_id)

                while next_competitors:
                    competitor_id = next_competitors.pop(0)
                    result = check_significance_wait_for_result(result_url.pop(competitor_id))
                    result_inverse = check_significance_wait_for_result(result_inverse_url.pop(competitor_id))

                    # Refill the freed slot before processing the result.
                    if rest_of_competitors:
                        new_competitor_id = rest_of_competitors.pop()
                        next_competitors.append(new_competitor_id)
                        submit_duel(new_competitor_id)

                    new_tournament[new_submission_id][competitor_id] = {
                        task: data["significant"] for task, data in result.items()
                    }
                    new_tournament[competitor_id][new_submission_id] = {
                        task: data["significant"] for task, data in result_inverse.items()
                    }

                    num_of_competitors_done = num_of_competitors - len(next_competitors) - len(rest_of_competitors)
                    gr.Info(f"Tournament: {num_of_competitors_done}/{num_of_competitors} = {(num_of_competitors_done) * 100 // num_of_competitors}% done")

            return new_tournament

    @staticmethod
    def abbreviate(s, max_length, dots_place="center"):
        """Shorten *s* to at most *max_length* characters using an ellipsis.

        Args:
            s: Text to shorten.
            max_length: Maximal length of the result.
            dots_place: Where the "…" goes: ``"begin"``, ``"center"`` or
                ``"end"``.  ``"center"`` needs ``max_length >= 3``; otherwise
                the ellipsis is put at the end.

        Returns:
            *s* itself when it is short enough, otherwise the shortened text.
            Whitespace next to the ellipsis is stripped, so the result may be
            shorter than *max_length*.
        """
        if len(s) <= max_length:
            return s
        if max_length <= 1:
            return "…"
        if dots_place == "begin":
            return "…" + s[-max_length + 1:].lstrip()
        if dots_place == "center" and max_length >= 3:
            max_length_begin = max_length // 2
            max_length_end = max_length - max_length_begin - 1
            return s[:max_length_begin].rstrip() + "…" + s[-max_length_end:].lstrip()
        # dots_place == "end"
        return s[:max_length - 1].rstrip() + "…"

    @staticmethod
    def create_submission_id(metadata):
        """Derive a submission id from *metadata*.

        The id joins the first 7 characters of the team name, the model name
        and both SHA-256 digests with "_"; "/" and newlines are replaced by
        "_" as well.
        """
        # The ID length must be limited because it is used in a file name
        submission_id = "_".join([metadata[key][:7] for key in (
            "team_name",
            "model_name",
            "model_predictions_sha256",
            "model_results_sha256",
        )])
        submission_id = submission_id.replace("/", "_").replace("\n", "_").strip()
        return submission_id

    @staticmethod
    def get_sha256_hexdigest(obj):
        """Return the SHA-256 hex digest of the canonical JSON form of *obj*.

        Canonical means compact separators, sorted keys and ASCII-only
        output, so equal objects always produce the same digest.
        """
        data = json.dumps(
            obj,
            separators=(',', ':'),
            sort_keys=True,
            ensure_ascii=True,
        ).encode()
        return hashlib.sha256(data).hexdigest()

    def prepare_model_for_submission(self, file, metadata) -> PreSubmit:
        """Attach metadata to a submission file and run its tournament.

        *metadata* is extended in place with the digests of the predictions
        and results, the submission id and the submission timestamp, and
        *file* is rewritten as compact JSON including that metadata.  The
        call then waits until no other submission is pending, refreshes the
        leaderboard and runs the tournament.

        Args:
            file: Path of the submission JSON file (it is overwritten).
            metadata: Submission metadata (it is modified).

        Returns:
            The `PreSubmit` that was stored as the pending submission.
        """
        with open(file, "r") as f:
            data = json.load(f)

        data["metadata"] = metadata

        metadata["model_predictions_sha256"] = self.get_sha256_hexdigest(data["predictions"])
        metadata["model_results_sha256"] = self.get_sha256_hexdigest(data["results"])

        submission_id = self.create_submission_id(metadata)
        metadata["submission_id"] = submission_id

        metadata["submission_timestamp"] = time.time()  # timestamp

        with open(file, "w") as f:
            json.dump(data, f, separators=(',', ':'))  # compact JSON

        while True:
            with self.pre_submit_lock:
                if self.pre_submit is None:
                    gr.Info('Running tournament...', duration=15)
                    self.update_leaderboard()
                    tournament_results = self.start_tournament(submission_id, file)
                    pre_submit = self.PreSubmit(tournament_results, submission_id, file)
                    self.pre_submit = pre_submit
                    # Return the local object: `self.pre_submit` may be reset
                    # by `save_pre_submit` as soon as the lock is released.
                    return pre_submit

            # Another submission is pending; try again later.
            gr.Info("Waiting in queue...", duration=5)
            time.sleep(10)

    def save_pre_submit(self):
        """Upload the pending submission and its tournament results.

        Does nothing when there is no pending submission.  Afterwards the
        pending submission is cleared.
        """
        with self.pre_submit_lock:
            if self.pre_submit:
                tournament_results, submission_id, file = self.pre_submit

                api.upload_file(
                    path_or_fileobj=file,
                    path_in_repo=f"data/{submission_id}.json",
                    repo_id=self.server_address,
                    repo_type=self.repo_type,
                    token=HF_TOKEN,
                )

                # Temporary save tournament results
                tournament_results_path = os.path.join(self.local_leaderboard, "tournament.json")
                with open(tournament_results_path, "w") as f:
                    json.dump(tournament_results, f, sort_keys=True, indent=2)  # readable JSON

                api.upload_file(
                    path_or_fileobj=tournament_results_path,
                    path_in_repo="tournament.json",
                    repo_id=self.server_address,
                    repo_type=self.repo_type,
                    token=HF_TOKEN,
                )

                self.pre_submit = None

    def get_model_detail(self, submission_id):
        """Return the metadata stored in the file of *submission_id*.

        Raises:
            gr.Error: If the submission id is not known.
        """
        with self.var_lock.ro:
            path = self.submission_id_to_file.get(submission_id)
            if path is None:
                raise gr.Error(f"Submission [{submission_id}] not found")
            return self._read_json(path)["metadata"]