import json
import os
import sys
import io
import hmac
import hashlib
import time
from datetime import datetime, timezone

import requests
import pandas as pd
from colorama import Fore
from huggingface_hub import HfApi, snapshot_download
from tqdm.auto import tqdm

# Make the project root importable so that the `src` package can be resolved.
current_script_path = os.path.abspath(__file__)
src_directory = os.path.join(os.path.dirname(current_script_path), '..', '..')
sys.path.append(src_directory)
# print(sys.path)

from src.display.utils import EVAL_COLS, BENCHMARK_COLS, COLS
from src.envs import API, EVAL_REQUESTS_PATH, DYNAMIC_INFO_REPO, DYNAMIC_INFO_FILE_PATH, DYNAMIC_INFO_PATH, EVAL_RESULTS_PATH, TOKEN, IS_PUBLIC, QUEUE_REPO, REPO_ID, RESULTS_REPO
# FlagEval batch status codes -> leaderboard queue statuses.
status_mapping = {
    'P': 'PENDING',
    'R': 'RUNNING',
    'S': 'FINISHED',
    'F': 'FAILED',
    'C': 'CANCELLED'
}

# dataset name -> (metric key in the API response, metric key in the result file)
dataset_metric_mapping = {
    'ChartQA': ('accuracy', 'acc'),
    'CMMMU': ('accuracy', 'acc'),
    'CMMU': ('accuracy', 'acc'),
    'MMMU': ('accuracy', 'acc'),
    'MMMU_Pro_standard': ('accuracy', 'acc'),
    'MMMU_Pro_vision': ('accuracy', 'acc'),
    'OCRBench': ('accuracy', 'acc'),
    'MathVision': ('accuracy', 'acc'),
    'CII-Bench': ('accuracy', 'acc'),
    'Blink': ('accuracy', 'acc'),
}

# In-memory failure counter per model, reset once the retry limit is reached.
failed_mapping = {}
# FlagEval API configuration.
# Production environment:
# base_url = 'https://flageval.baai.ac.cn/api/hf'
# secret = b'M2L84t36MdzwS1Lb'
# Test environment:
base_url = 'http://120.92.17.239:8080/api/hf'
secret = b'Dn29TMCxzvKBGMS8'
# model_id = 'Qwen/Qwen1.5-0.5B'

MAX_GPU_USAGE = 20
LC_A800_QUEUE_ID = "877467e6-808b-487e-8a06-af8e96c83fa6"
A800_QUEUE_ID = "f016ff98-6ec8-4b1e-aed2-9a93753119b2"
A100_QUEUE_ID = "7f8cb309-295f-4f56-8159-f43f60f03f9c"
MAX_A800_USAGE = 1  # at most one concurrent evaluation on the LC_A800 queue
def get_gpu_number(params=0):
    # Map the parameter count to a GPU count and evaluation queue.
    # Originally: parameter count divided by ~35 and rounded up (minimum 1, even
    # when params is 0); replaced by the explicit tiers below.
    # return -(-params // 35)
    if params == 0:
        return 0, A100_QUEUE_ID
    if params < 9:
        return 1, A100_QUEUE_ID
    elif params < 15:
        return 2, A100_QUEUE_ID
    elif params < 35:
        return 4, A100_QUEUE_ID
    elif params < 70:
        return 3, LC_A800_QUEUE_ID
    elif params < 100:
        return 5, LC_A800_QUEUE_ID
    elif params < 140:
        return 6, LC_A800_QUEUE_ID
    else:
        return 8, LC_A800_QUEUE_ID
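# Illustration of the tiers above (assuming `params` is the parameter count in
# billions, as used by the leaderboard queue files):
#   get_gpu_number(7)  -> (1, A100_QUEUE_ID)
#   get_gpu_number(34) -> (4, A100_QUEUE_ID)
#   get_gpu_number(72) -> (5, LC_A800_QUEUE_ID)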
def generate_signature(secret, url, body):
    timestamp = str(int(time.time()))
    to_sign = f'{timestamp}{url}{body}'
    h = hmac.new(secret, to_sign.encode('utf-8'), digestmod=hashlib.sha256)
    sign = h.hexdigest()
    return sign, timestamp
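# The signature is the HMAC-SHA256 hex digest of "<timestamp><url><body>" under the
# shared secret; the digest and the timestamp are sent in the X-Flageval-Sign /
# X-Flageval-Timestamp headers (see the callers below). Sketch with a made-up body:
# sign, ts = generate_signature(secret, f'{base_url}/mm/batches', '{"modelId": "org/model"}')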
def submit_evaluation(base_url, secret, model_id, require_gpus=None, priority=None, gpus_queue_id=None, hf_user_id=None):
    url = f'{base_url}/mm/batches'
    data = {'modelId': model_id}
    if require_gpus is not None:
        data['requireGpus'] = require_gpus
    if priority is not None:
        data['priority'] = priority
    if gpus_queue_id is not None:
        data['gpus_queue_id'] = gpus_queue_id
    if hf_user_id is not None:
        data['hfUserId'] = hf_user_id
    raw_body = json.dumps(data)
    sign, timestamp = generate_signature(secret, url, raw_body)
    headers = {
        'Content-Type': 'application/json',
        'X-Flageval-Sign': sign,
        'X-Flageval-Timestamp': timestamp,
    }
    response = requests.post(url, data=raw_body, headers=headers)
    print("submit_evaluation response", response)
    response_data = response.json()
    evaluation_info = {
        'evaluationId': response_data.get('evaluationId'),
        'eval_id': response_data.get('id')
    }
    return evaluation_info
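# Example call (hypothetical model id):
#   submit_evaluation(base_url, secret, 'org/model', require_gpus=2,
#                     priority='high', gpus_queue_id=A100_QUEUE_ID)
# returns {'evaluationId': ..., 'eval_id': ...}, where 'eval_id' is the batch id
# ('id' in the API response) later passed to poll_evaluation_progress.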
def poll_evaluation_progress(base_url, secret, batch_id):
    url = f'{base_url}/mm/batches/{int(batch_id)}'
    sign, timestamp = generate_signature(secret, url, '')
    headers = {
        'X-Flageval-Sign': sign,
        'X-Flageval-Timestamp': timestamp,
    }
    try:
        response = requests.get(url, headers=headers)
        response.raise_for_status()  # raises HTTPError if the status code is not 2xx
        response_data = response.json()
        evaluation_progress = {
            'evaluationId': response_data.get('evaluationId'),
            'eval_id': response_data.get('batchId'),
            'status': response_data.get('status'),
            'details': response_data.get('details', [])
        }
        return evaluation_progress
    except requests.exceptions.RequestException as e:
        print(f"Request error: {e}")
    except ValueError:
        print(f"Error parsing JSON response: {response}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    return {'status': 'REQUEST_FAILED'}
def update_gpu_usage(change):
    global current_gpu_usage
    current_gpu_usage += change
def get_evaluation_queue_df(save_path: str, cols: list) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    all_evals = []
    for root, dirs, files in os.walk(save_path):
        for file in files:
            if file.endswith(".json"):
                file_path = os.path.join(root, file)
                with open(file_path) as fp:
                    data = json.load(fp)
                # Make sure every requested column exists; missing columns are
                # initialized to None (failed_status defaults to 0).
                for col in cols:
                    if col not in data:
                        if col == "failed_status":
                            data[col] = 0
                        else:
                            data[col] = None
                all_evals.append(data)
    # Order all evaluations by submitted_time.
    all_evals = sorted(all_evals, key=lambda x: x['submitted_time'])
    pending_list = [e for e in all_evals if e["status"] in ["PENDING", "RERUN"]]
    # Stable sorts: pending order ends up as failed_status, then params, then submitted_time.
    pending_list = sorted(pending_list, key=lambda x: x['params'])
    pending_list = sorted(pending_list, key=lambda x: x['failed_status'])
    running_list = [e for e in all_evals if e["status"] == "RUNNING"]
    finished_list = [e for e in all_evals if e["status"].startswith("FINISHED") or e["status"] == "PENDING_NEW_EVAL"]
    df_pending = pd.DataFrame(pending_list) if pending_list else pd.DataFrame(columns=cols)
    df_running = pd.DataFrame(running_list) if running_list else pd.DataFrame(columns=cols)
    df_finished = pd.DataFrame(finished_list) if finished_list else pd.DataFrame(columns=cols)
    return df_finished[cols], df_running[cols], df_pending[cols]
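# Each request file under EVAL_REQUESTS_PATH is expected to look roughly like the
# following (illustrative only; fields inferred from how this script reads them):
# {"model": "org/model", "status": "PENDING", "params": 8,
#  "submitted_time": "...", "eval_id": null, "failed_status": 0}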
def update_evaluation_queue(model_name, nstatus, eval_id=None, flageval_id=None):
    print("update_evaluation_queue", model_name, nstatus, eval_id)
    fail_status = -1
    # A status like "PENDING_2" carries the failed_status counter after the underscore.
    if len(nstatus.split("_")) == 2:
        status, fail_status = nstatus.split("_")[0], int(nstatus.split("_")[1])
    else:
        status = nstatus
    user_name, model_path = model_name.split("/") if "/" in model_name else ("", model_name)
    out_dir = f"{EVAL_REQUESTS_PATH}/{user_name}"
    json_files = [f for f in os.listdir(out_dir) if f.startswith(model_path + '_') and f.endswith(".json")]
    if not json_files:
        print(f"No JSON file found for model {model_name}")
        return
    for json_file in json_files:
        json_path = os.path.join(out_dir, json_file)
        with open(json_path, "r") as f:
            eval_entry = json.load(f)
        print("before update_evaluation_queue", eval_entry['status'], eval_entry['failed_status'])
        eval_entry['status'] = status
        if fail_status >= 0:
            eval_entry['failed_status'] = fail_status
        if eval_id is not None:
            eval_entry['eval_id'] = eval_id
        if flageval_id is not None:
            eval_entry['flageval_id'] = flageval_id
        print("after update_evaluation_queue status change", eval_entry['status'], eval_entry['failed_status'])
        with open(json_path, "w") as f:
            json.dump(eval_entry, f, indent=4)
        api.upload_file(
            path_or_fileobj=json_path,
            path_in_repo=json_path.split(f"{EVAL_REQUESTS_PATH}/")[1],
            repo_id=QUEUE_REPO,
            repo_type="dataset",
            commit_message=f"Update {model_name} status to {status}",
        )
def save_and_upload_results(model_name, details):
    converted_details = {
        "config_general": {
            "model_name": model_name,
            "model_dtype": "float16",
            "model_size": 0
        },
        "results": {},
        "versions": {},
        "config_tasks": {},
        "summary_tasks": {},
        "summary_general": {}
    }
    for detail in details:
        dataset = detail['dataset']
        status = detail['status']
        if status == 'S' and dataset in dataset_metric_mapping:
            acc_key = dataset_metric_mapping[dataset][0]
            acc = detail['accuracy'] if acc_key == 'accuracy' else detail['rawDetails'][acc_key]
            converted_details['results'][dataset] = {
                dataset_metric_mapping[dataset][1]: acc,
                "acc_stderr": 0
            }
            # Also copy the raw per-metric details for this dataset.
            for metric, value in detail['rawDetails'].items():
                converted_details['results'][dataset][metric] = value
    out_dir = f"{EVAL_RESULTS_PATH}/{model_name}"
    os.makedirs(out_dir, exist_ok=True)
    result_path = os.path.join(out_dir, f"results_{datetime.now().strftime('%Y-%m-%dT%H-%M-%S.%f')}.json")
    with open(result_path, "w") as f:
        json.dump(converted_details, f, indent=4)
    api.upload_file(
        path_or_fileobj=result_path,
        path_in_repo=result_path.split(f"{EVAL_RESULTS_PATH}/")[1],
        repo_id=RESULTS_REPO,
        repo_type="dataset",
        commit_message=f"Add results for {model_name}",
    )
class SilentTqdm(tqdm):
    def __init__(self, *args, **kwargs):
        kwargs['bar_format'] = ''
        kwargs['leave'] = False
        super().__init__(*args, **kwargs, file=io.StringIO())

    def update(self, n=1):
        pass

    def close(self):
        pass
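# SilentTqdm writes to an in-memory buffer and ignores update/close, so the periodic
# snapshot_download calls below do not spam the logs; it is passed via the
# `tqdm_class` argument supported by huggingface_hub.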
def snapshot_download_with_retry(max_retries, wait_time, *args, **kwargs):
    for i in range(max_retries):
        try:
            return snapshot_download(*args, **kwargs)
        except Exception as e:
            if i < max_retries - 1:  # i is zero-indexed
                print(f"Error occurred: {e}. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                print("Max retries reached. Raising exception.")
                raise
api = HfApi()
print(EVAL_REQUESTS_PATH)
print(DYNAMIC_INFO_PATH)
print(EVAL_RESULTS_PATH)
prev_running_models = ''
while True:
    snapshot_download_with_retry(5, 10, repo_id=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH, repo_type="dataset", tqdm_class=SilentTqdm, etag_timeout=30)
    snapshot_download_with_retry(5, 10, repo_id=DYNAMIC_INFO_REPO, local_dir=DYNAMIC_INFO_PATH, repo_type="dataset", tqdm_class=SilentTqdm, etag_timeout=30)
    snapshot_download_with_retry(5, 10, repo_id=RESULTS_REPO, local_dir=EVAL_RESULTS_PATH, repo_type="dataset", tqdm_class=SilentTqdm, etag_timeout=30)
    (
        finished_eval_queue_df,
        running_eval_queue_df,
        pending_eval_queue_df,
    ) = get_evaluation_queue_df(EVAL_REQUESTS_PATH, ['model', 'status', 'params', 'eval_id', 'failed_status'])
    # Debug output: dump the pending queue.
    pending_list = [row for _, row in pending_eval_queue_df.iterrows()]
    for pend in pending_list:
        print("pending", pend)
    # Recompute the current GPU usage from the running evaluation queue.
    current_gpu_usage = 0
    current_A800gpu_usage = 0
    for _, row in running_eval_queue_df.iterrows():
        print(get_gpu_number(row['params']), row['params'])
        gpus_num, gpus_queue_id = get_gpu_number(row['params'])
        current_gpu_usage += gpus_num
        if gpus_queue_id == LC_A800_QUEUE_ID:
            current_A800gpu_usage += 1
    # print(f'Current GPU usage: {current_gpu_usage}/{MAX_GPU_USAGE}')
    running_models = ", ".join([row["model"] for _, row in running_eval_queue_df.iterrows()])
    if running_models != prev_running_models:
        print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | GPU usage: {current_gpu_usage}/{MAX_GPU_USAGE} | Running models: {running_models}')
        prev_running_models = running_models
    print("current A800 GPU usage", current_A800gpu_usage)
    # Walk the pending queue and submit evaluations while GPU budget remains.
    # (An earlier version only looked at the first few pending entries; that limit is disabled.)
    if not pending_eval_queue_df.empty:
        for i, row in pending_eval_queue_df.iterrows():
            # if i >= 3: break
            required_gpus, gpus_queue_id = get_gpu_number(row['params'])
            if gpus_queue_id == LC_A800_QUEUE_ID:
                if current_A800gpu_usage >= MAX_A800_USAGE:
                    print(current_A800gpu_usage >= MAX_A800_USAGE, row['model'])
                    continue
            if "princeton-nlp/Llama-3-8B-ProLong-512k" in row['model']:
                # Special case: give this model one extra GPU.
                required_gpus += 1
            if current_gpu_usage + required_gpus <= MAX_GPU_USAGE:
                # Guard against duplicate submissions of a model that is already running.
                if row['model'] in [r["model"] for _, r in running_eval_queue_df.iterrows()]:
                    print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | Evaluation {row["model"]} is already running')
                    # Mark the duplicate pending request as cancelled.
                    update_evaluation_queue(row['model'], 'CANCELLED')
                    continue
                # Submit the evaluation.
                try:
                    evaluation_info = submit_evaluation(base_url, secret, row['model'], require_gpus=required_gpus, priority='high', gpus_queue_id=gpus_queue_id)
                    update_evaluation_queue(row['model'], 'RUNNING', evaluation_info['eval_id'], evaluation_info['evaluationId'])
                    update_gpu_usage(required_gpus)
                    print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | Submitted evaluation {row["model"]} with {required_gpus} GPUs, submit info: {evaluation_info}')
                except Exception as e:
                    print(e)
                    continue
    # Poll the status of running evaluations.
    for _, row in running_eval_queue_df.iterrows():
        progress = poll_evaluation_progress(base_url, secret, row['eval_id'])
        if progress['status'] in ['S', 'F', 'C', 'DI']:
            new_status = status_mapping.get(progress['status'], 'FINISHED')
            update_evaluation_queue(row['model'], new_status)
            gpus_num, gpus_queue_id = get_gpu_number(row['params'])
            update_gpu_usage(-gpus_num)
            if gpus_queue_id == LC_A800_QUEUE_ID:
                current_A800gpu_usage -= 1
            print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | Evaluation {row["model"]} finished with status {progress["status"]}')
            if new_status == 'FAILED':
                print("failed_mapping0", failed_mapping)
                if row['model'] in failed_mapping:
                    failed_mapping[row['model']] += 1
                else:
                    failed_mapping[row['model']] = 1
                print("failed_mapping add", failed_mapping, row['failed_status'])
                # After 5 failures (tracked in failed_mapping), bump the persistent
                # failed_status counter so the model sorts to the back of the pending
                # queue; otherwise just re-queue it as PENDING.
                if failed_mapping[row['model']] == 5:
                    del failed_mapping[row['model']]
                    update_evaluation_queue(row['model'], 'PENDING_' + str(int(row['failed_status'] + 1)))
                else:
                    update_evaluation_queue(row['model'], 'PENDING')
                print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} |--------------- RePending {row["model"]} ------------ ')
            elif new_status == 'FINISHED':
                print(progress)
                save_and_upload_results(row['model'], progress['details'])
    time.sleep(300)  # queue polling interval