import copy import glob import json import os # Necessary for `requests`. Without set correct path or empty string it fails during process HTTPS connection with this: [Errno 101] Network is unreachable if os.path.exists("/etc/ssl/certs/ca-certificates.crt"): os.environ["CURL_CA_BUNDLE"] = "/etc/ssl/certs/ca-certificates.crt" os.environ["REQUESTS_CA_BUNDLE"] = "/etc/ssl/certs/ca-certificates.crt" else: os.environ["CURL_CA_BUNDLE"] = "" os.environ["REQUESTS_CA_BUNDLE"] = "" print(f"{os.environ.get('CURL_CA_BUNDLE') = }") print(f"{os.environ.get('REQUESTS_CA_BUNDLE') = }") import hashlib import time import requests from collections import namedtuple from xml.sax.saxutils import escape as xmlEscape, quoteattr as xmlQuoteAttr from threading import Lock import gradio as gr import pandas as pd from huggingface_hub import HfApi, snapshot_download from compare_significance import SUPPORTED_METRICS VISIBLE_METRICS = SUPPORTED_METRICS + ["macro_f1"] api = HfApi() ORG = "CZLC" REPO = f"{ORG}/LLM_benchmark_data" HF_TOKEN = os.environ.get("HF_TOKEN") TASKS_METADATA_PATH = "./tasks_metadata.json" MARKDOWN_SPECIAL_CHARACTERS = { "#": "#", # for usage in xml.sax.saxutils.escape as entities must be first "\\": "\", "`": "`", "*": "*", "_": "_", "{": "{", "}": "}", "[": "[", "]": "]", "(": "(", ")": ")", "+": "+", "-": "-", ".": ".", "!": "!", "=": "=", "|": "|" } def check_significance_send_task(model_a_path, model_b_path): url = 'https://czechllm.fit.vutbr.cz/benczechmark-leaderboard/compare_significance/' # prepare and send request with ( open(model_a_path, 'rb') as model_a_fp, open(model_b_path, 'rb') as model_b_fp, ): files = { 'model_a': model_a_fp, 'model_b': model_b_fp, } response = requests.post(url, files=files, timeout=60 * 5) # check response if response.status_code == 202: result_url = response.url #task_id = response.json()['task_id'] elif response.status_code == 429: raise RuntimeError('Server is too busy. Please try again later.') # TODO: try-except do raise gr.error else: raise RuntimeError(f'Failed to submit task. Status code: {response.status_code}') # TODO: try-except do raise gr.error return result_url def check_significance_wait_for_result(result_url): while True: response = requests.get(result_url, timeout=60 * 5) if response.status_code == 200: result = response.json() break elif response.status_code == 202: time.sleep(5) else: raise RuntimeError(f'Failed to get result. Status code: {response.status_code}') # TODO: try-except do raise gr.error if result["state"] == "COMPLETED": return result['result'] else: raise RuntimeError(result['result']['error']) def check_significance(model_a_path, model_b_path): result_url = check_significance_send_task(model_a_path, model_b_path) result = check_significance_wait_for_result(result_url) return result pre_submit_lock = Lock() class _ReadLock: def __init__(self, lock): self._lock = lock self.reading = 0 def __enter__(self): with self._lock: self.reading += 1 def __exit__(self, exc_type, exc_value, traceback): with self._lock: self.reading -= 1 class ReadWriteLock: """ Zámek, který ověří, že nikdo nečte když se zapisuje a že zapisuje pouze jeden """ def __init__(self): self._lock = Lock() self.ro = _ReadLock(self._lock) self.rw = self def __enter__(self): self._lock.acquire() while True: reading = self.ro.reading if reading > 0: self._lock.release() time.sleep(1) self._lock.acquire() elif reading < 0: self._lock.release() raise RuntimeError() else: return def __exit__(self, exc_type, exc_value, traceback): self._lock.release() class LeaderboardServer: def __init__(self): self.server_address = REPO self.repo_type = "dataset" self.local_leaderboard = snapshot_download( self.server_address, repo_type=self.repo_type, token=HF_TOKEN, local_dir="./", ) self.TASKS_METADATA = json.load(open(TASKS_METADATA_PATH)) self.TASKS_CATEGORIES = {self.TASKS_METADATA[task]["category"] for task in self.TASKS_METADATA} self.TASKS_CATEGORY_OVERALL = "Overall" self.var_lock = ReadWriteLock() self.submission_ids = set() self.submission_id_to_file = {} # Map submission ids to file paths self.fetch_existing_models() self.tournament_results = self.load_tournament_results() self.pre_submit_lock = pre_submit_lock self.pre_submit = None def update_leaderboard(self): self.local_leaderboard = snapshot_download( self.server_address, repo_type=self.repo_type, token=HF_TOKEN, local_dir="./", ) self.fetch_existing_models() with self.var_lock.rw: self.tournament_results = self.load_tournament_results() def load_tournament_results(self): metadata_rank_paths = os.path.join(self.local_leaderboard, "tournament.json") if not os.path.exists(metadata_rank_paths): return {} with open(metadata_rank_paths) as ranks_file: results = json.load(ranks_file) return results def fetch_existing_models(self): # Models data for submission_file in glob.glob(os.path.join(self.local_leaderboard, "data") + "/*.json"): data = json.load(open(submission_file)) metadata = data.get('metadata') if metadata is None: continue submission_id = metadata["submission_id"] with self.var_lock.rw: self.submission_ids.add(submission_id) self.submission_id_to_file[submission_id] = submission_file def get_leaderboard(self, pre_submit=None, category=None): with self.var_lock.ro: tournament_results = pre_submit.tournament_results if pre_submit else self.tournament_results category = category if category else self.TASKS_CATEGORY_OVERALL if len(tournament_results) == 0: return pd.DataFrame(columns=['No submissions yet']) else: processed_results = [] for submission_id in tournament_results.keys(): path = self.submission_id_to_file.get(submission_id) if path is None: if pre_submit and submission_id == pre_submit.submission_id: data = json.load(open(pre_submit.file)) else: raise gr.Error(f"Internal error: Submission [{submission_id}] not found") elif path: data = json.load(open(path)) else: raise gr.Error(f"Submission [{submission_id}] not found") if submission_id != data["metadata"]["submission_id"]: raise gr.Error(f"Proper submission [{submission_id}] not found") local_results = {} win_score = {} visible_metrics_map_word_to_header = {} for task in self.TASKS_METADATA.keys(): task_category = self.TASKS_METADATA[task]["category"] if category not in (self.TASKS_CATEGORY_OVERALL, task_category): continue else: # tournament_results num_of_competitors = 0 num_of_wins = 0 for competitor_id in tournament_results[submission_id].keys() - {submission_id}: # without self num_of_competitors += 1 if tournament_results[submission_id][competitor_id][task]: num_of_wins += 1 task_score = num_of_wins / num_of_competitors * 100 if num_of_competitors > 0 else 100 win_score.setdefault(task_category, []).append(task_score) if category == task_category: local_results[task] = task_score for metric in VISIBLE_METRICS: visible_metrics_map_word_to_header[task + "_" + metric] = self.TASKS_METADATA[task]["abbreviation"] + " " + metric metric_value = data['results'][task].get(metric) if metric_value is not None: local_results[task + "_" + metric] = metric_value * 100 break # Only the first metric of every task for c in win_score: win_score[c] = sum(win_score[c]) / len(win_score[c]) if category == self.TASKS_CATEGORY_OVERALL: for c in win_score: local_results[c] = win_score[c] local_results["average_score"] = sum(win_score.values()) / len(win_score) else: local_results["average_score"] = win_score[category] model_link = data["metadata"]["link_to_model"] model_title = data["metadata"]["team_name"] + "/" + data["metadata"]["model_name"] model_title_abbr = self.abbreviate(data["metadata"]["team_name"], 14) + "/" + self.abbreviate(data["metadata"]["model_name"], 14) local_results["model"] = f'{xmlEscape(model_title_abbr, MARKDOWN_SPECIAL_CHARACTERS)}' release = data["metadata"].get("submission_timestamp") release = time.strftime("%Y-%m-%d", time.gmtime(release)) if release else "N/A" local_results["release"] = release local_results["model_type"] = data["metadata"]["model_type"] local_results["parameters"] = data["metadata"]["parameters"] if pre_submit and submission_id == pre_submit.submission_id: processed_results.insert(0, local_results) else: processed_results.append(local_results) dataframe = pd.DataFrame.from_records(processed_results) extra_attributes_map_word_to_header = { "model": "Model", "release": "Release", "average_score": "Average ⬆️", "team_name": "Team name", "model_name": "Model name", "model_type": "Type", "parameters": "# θ (B)", "input_length": "Input length (# tokens)", "precision": "Precision", "description": "Description", "link_to_model": "Link to model" } first_attributes = [ "model", "release", "model_type", "parameters", "average_score", ] df_order = [ key for key in dict.fromkeys( first_attributes + list(self.TASKS_METADATA.keys()) + list(dataframe.columns) ).keys() if key in dataframe.columns ] dataframe = dataframe[df_order] attributes_map_word_to_header = {key: value["abbreviation"] for key, value in self.TASKS_METADATA.items()} attributes_map_word_to_header.update(extra_attributes_map_word_to_header) attributes_map_word_to_header.update(visible_metrics_map_word_to_header) dataframe = dataframe.rename( columns=attributes_map_word_to_header ) return dataframe def start_tournament(self, new_submission_id, new_model_file): with self.var_lock.ro: new_tournament = copy.deepcopy(self.tournament_results) new_tournament[new_submission_id] = {} new_tournament[new_submission_id][new_submission_id] = { task: False for task in self.TASKS_METADATA.keys() } rest_of_competitors = list(self.submission_ids - {new_submission_id}) # without self num_of_competitors = len(rest_of_competitors) result_url = {} result_inverse_url = {} while rest_of_competitors: next_competitors = [] while rest_of_competitors: if len(next_competitors) < 5: # 5*2==10 tasks next_competitors.append(rest_of_competitors.pop()) else: break for competitor_id in next_competitors: result_url[competitor_id] = check_significance_send_task(new_model_file, self.submission_id_to_file[competitor_id]) result_inverse_url[competitor_id] = check_significance_send_task(self.submission_id_to_file[competitor_id], new_model_file) while next_competitors: competitor_id = next_competitors.pop(0) result = check_significance_wait_for_result(result_url.pop(competitor_id)) result_inverse = check_significance_wait_for_result(result_inverse_url.pop(competitor_id)) if rest_of_competitors: new_competitor_id = rest_of_competitors.pop() next_competitors.append(new_competitor_id) result_url[new_competitor_id] = check_significance_send_task(new_model_file, self.submission_id_to_file[new_competitor_id]) result_inverse_url[new_competitor_id] = check_significance_send_task(self.submission_id_to_file[new_competitor_id], new_model_file) new_tournament[new_submission_id][competitor_id] = { task: data["significant"] for task, data in result.items() } new_tournament[competitor_id][new_submission_id] = { task: data["significant"] for task, data in result_inverse.items() } num_of_competitors_done = num_of_competitors - len(next_competitors) - len(rest_of_competitors) gr.Info(f"Tournament: {num_of_competitors_done}/{num_of_competitors} = {(num_of_competitors_done) * 100 // num_of_competitors}% done") return new_tournament @staticmethod def abbreviate(s, max_length, dots_place="center"): if len(s) <= max_length: return s else: if max_length <= 1: return "…" elif dots_place == "begin": return "…" + s[-max_length + 1:].lstrip() elif dots_place == "center" and max_length >= 3: max_length_begin = max_length // 2 max_length_end = max_length - max_length_begin - 1 return s[:max_length_begin].rstrip() + "…" + s[-max_length_end:].lstrip() else: # dots_place == "end" return s[:max_length - 1].rstrip() + "…" @staticmethod def create_submission_id(metadata): # Délka ID musí být omezena, protože se používá v názvu souboru submission_id = "_".join([metadata[key][:7] for key in ( "team_name", "model_name", "model_predictions_sha256", "model_results_sha256", )]) submission_id = submission_id.replace("/", "_").replace("\n", "_").strip() return submission_id @staticmethod def get_sha256_hexdigest(obj): data = json.dumps( obj, separators=(',', ':'), sort_keys=True, ensure_ascii=True, ).encode() result = hashlib.sha256(data).hexdigest() return result PreSubmit = namedtuple('PreSubmit', 'tournament_results, submission_id, file') def prepare_model_for_submission(self, file, metadata) -> PreSubmit: with open(file, "r") as f: data = json.load(f) data["metadata"] = metadata metadata["model_predictions_sha256"] = self.get_sha256_hexdigest(data["predictions"]) metadata["model_results_sha256"] = self.get_sha256_hexdigest(data["results"]) submission_id = self.create_submission_id(metadata) metadata["submission_id"] = submission_id metadata["submission_timestamp"] = time.time() # timestamp with open(file, "w") as f: json.dump(data, f, separators=(',', ':')) # compact JSON while True: with self.pre_submit_lock: if self.pre_submit == None: gr.Info('Running tournament...', duration=15) self.update_leaderboard() tournament_results = self.start_tournament(submission_id, file) self.pre_submit = self.PreSubmit(tournament_results, submission_id, file) break gr.Info("Waiting in queue...", duration=5) time.sleep(10) return self.pre_submit def save_pre_submit(self): with self.pre_submit_lock: if self.pre_submit: tournament_results, submission_id, file = self.pre_submit api.upload_file( path_or_fileobj=file, path_in_repo=f"data/{submission_id}.json", repo_id=self.server_address, repo_type=self.repo_type, token=HF_TOKEN, ) # Temporary save tournament results tournament_results_path = os.path.join(self.local_leaderboard, "tournament.json") with open(tournament_results_path, "w") as f: json.dump(tournament_results, f, sort_keys=True, indent=2) # readable JSON api.upload_file( path_or_fileobj=tournament_results_path, path_in_repo="tournament.json", repo_id=self.server_address, repo_type=self.repo_type, token=HF_TOKEN, ) self.pre_submit = None def get_model_detail(self, submission_id): with self.var_lock.ro: path = self.submission_id_to_file.get(submission_id) if path is None: raise gr.Error(f"Submission [{submission_id}] not found") data = json.load(open(path)) return data["metadata"]