|
|
|
|
|
import os |
|
import json |
|
import argparse |
|
|
|
import socket |
|
import random |
|
import threading |
|
from datetime import datetime |
|
|
|
from src.backend.run_eval_suite import run_evaluation |
|
from src.backend.manage_requests import check_completed_evals, get_eval_requests, set_eval_request |
|
from src.backend.sort_queue import sort_models_by_priority |
|
from src.backend.envs import Tasks, EVAL_REQUESTS_PATH_BACKEND, EVAL_RESULTS_PATH_BACKEND, DEVICE, Task |
|
from src.backend.manage_requests import EvalRequest |
|
from src.leaderboard.read_evals import EvalResult |
|
|
|
from src.envs import QUEUE_REPO, RESULTS_REPO, API, DEBUG_QUEUE_REPO, DEBUG_RESULTS_REPO |
|
from src.utils import my_snapshot_download, analyze_gpu_stats, parse_nvidia_smi, monitor_gpus, get_gpu_details |
|
|
|
from src.leaderboard.read_evals import get_raw_eval_results |
|
|
|
from typing import Optional |
|
import GPUtil |
|
import time |
|
|
|
import pprint |
|
import logging |
|
|
|
from lm_eval.filters.extraction import RegexFilter |
|
|
|
|
|
|
|
logging.basicConfig( |
|
format="%(asctime)s,%(msecs)03d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s", |
|
datefmt="%Y-%m-%d:%H:%M:%S", |
|
level=logging.WARNING, |
|
) |
|
|
|
|
|
eval_logger = logging.getLogger("lm-eval") |
|
|
|
|
|
eval_logger.setLevel(logging.WARNING) |
|
|
|
def tuple_input_decorator(func): |
|
def wrapper(self, resps, docs): |
|
stripped_resps = [[resp_data[0] for resp_data in group] for group in resps] |
|
|
|
filtered_resps = func(self, stripped_resps, docs) |
|
|
|
combined_resps = [] |
|
for original_group, new_group in zip(resps, filtered_resps): |
|
combined_group = [(new_resp,) + rest_of_data[1:] for new_resp, rest_of_data in zip(new_group, original_group)] |
|
combined_resps.append(combined_group) |
|
|
|
return combined_resps |
|
return wrapper |
|
|
|
|
|
def my_set_eval_request(api, eval_request, set_to_status, hf_repo, local_dir): |
|
for i in range(10): |
|
try: |
|
set_eval_request( |
|
api=api, eval_request=eval_request, set_to_status=set_to_status, hf_repo=hf_repo, local_dir=local_dir |
|
) |
|
return |
|
except Exception as e: |
|
print(f"Error setting eval request to {set_to_status}: {e}. Retrying in 60 seconds") |
|
time.sleep(60) |
|
return |
|
|
|
|
|
logging.getLogger("openai").setLevel(logging.WARNING) |
|
|
|
logging.basicConfig(level=logging.ERROR) |
|
pp = pprint.PrettyPrinter(width=80) |
|
|
|
PENDING_STATUS = "PENDING" |
|
RUNNING_STATUS = "RUNNING" |
|
FINISHED_STATUS = "FINISHED" |
|
FAILED_STATUS = "FAILED" |
|
|
|
TASKS_HARNESS = [task.value for task in Tasks] |
|
|
|
|
|
my_snapshot_download( |
|
repo_id=RESULTS_REPO, revision="main", local_dir=EVAL_RESULTS_PATH_BACKEND, repo_type="dataset", max_workers=60 |
|
) |
|
my_snapshot_download( |
|
repo_id=QUEUE_REPO, revision="main", local_dir=EVAL_REQUESTS_PATH_BACKEND, repo_type="dataset", max_workers=60 |
|
) |
|
|
|
|
|
def sanity_checks(): |
|
print(f"Device: {DEVICE}") |
|
|
|
|
|
|
|
my_snapshot_download( |
|
repo_id=QUEUE_REPO, revision="main", local_dir=EVAL_REQUESTS_PATH_BACKEND, repo_type="dataset", max_workers=60 |
|
) |
|
check_completed_evals( |
|
api=API, |
|
checked_status=RUNNING_STATUS, |
|
completed_status=FINISHED_STATUS, |
|
failed_status=FAILED_STATUS, |
|
hf_repo=QUEUE_REPO, |
|
local_dir=EVAL_REQUESTS_PATH_BACKEND, |
|
hf_repo_results=RESULTS_REPO, |
|
local_dir_results=EVAL_RESULTS_PATH_BACKEND, |
|
) |
|
return |
|
|
|
|
|
def request_to_result_name(request: EvalRequest) -> str: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
org_and_model = request.model.split("/", 1) |
|
if len(org_and_model) == 1: |
|
model = org_and_model[0] |
|
res = f"{model}_{request.precision}" |
|
else: |
|
org = org_and_model[0] |
|
model = org_and_model[1] |
|
res = f"{org}_{model}_{request.precision}" |
|
return res |
|
|
|
|
|
def process_evaluation(task: Task, eval_request: EvalRequest, limit: Optional[int] = None) -> dict: |
|
batch_size = 1 |
|
batch_size = eval_request.batch_size |
|
|
|
init_gpu_info = analyze_gpu_stats(parse_nvidia_smi()) |
|
|
|
|
|
gpu_stats_list = [] |
|
stop_event = threading.Event() |
|
monitor_thread = threading.Thread(target=monitor_gpus, args=(stop_event, 5, gpu_stats_list)) |
|
monitor_thread.start() |
|
|
|
original_apply = RegexFilter.apply |
|
if task.benchmark in ["gsm8k", "gsm8k_cot", "gsm8k_cot_self_consistency", "gsm8k_custom"]: |
|
RegexFilter.apply = tuple_input_decorator(RegexFilter.apply) |
|
else: |
|
RegexFilter.apply = original_apply |
|
|
|
try: |
|
results = run_evaluation( |
|
eval_request=eval_request, |
|
task_names=[task.benchmark], |
|
num_fewshot=task.num_fewshot, |
|
batch_size=batch_size, |
|
device=DEVICE, |
|
use_cache=None, |
|
limit=limit, |
|
) |
|
except RuntimeError as e: |
|
if "No executable batch size found" in str(e): |
|
batch_size = 1 |
|
results = run_evaluation( |
|
eval_request=eval_request, |
|
task_names=[task.benchmark], |
|
num_fewshot=task.num_fewshot, |
|
batch_size=batch_size, |
|
device=DEVICE, |
|
use_cache=None, |
|
limit=limit, |
|
) |
|
else: |
|
raise |
|
|
|
|
|
stop_event.set() |
|
monitor_thread.join() |
|
gpu_info = analyze_gpu_stats(gpu_stats_list) |
|
for task_name in results['results'].keys(): |
|
for key, value in gpu_info.items(): |
|
if "GPU" not in key: |
|
results['results'][task_name][f"{key},none"] = int(value) |
|
else: |
|
results['results'][task_name][f"{key},none"] = value |
|
|
|
results['results'][task_name]['batch_size,none'] = batch_size |
|
results['results'][task_name]['precision,none'] = eval_request.precision |
|
print(f"gpu_stats_list: {gpu_stats_list}") |
|
print("GPU Usage:", gpu_info) |
|
|
|
dumped = json.dumps(results, indent=2, default=lambda o: "<not serializable>") |
|
|
|
|
|
output_path = os.path.join( |
|
EVAL_RESULTS_PATH_BACKEND, *eval_request.model.split("/"), f"results_{datetime.now()}.json" |
|
) |
|
os.makedirs(os.path.dirname(output_path), exist_ok=True) |
|
with open(output_path, "w") as f: |
|
f.write(dumped) |
|
|
|
my_snapshot_download( |
|
repo_id=RESULTS_REPO, revision="main", local_dir=EVAL_RESULTS_PATH_BACKEND, repo_type="dataset", max_workers=60 |
|
) |
|
API.upload_file( |
|
path_or_fileobj=output_path, |
|
path_in_repo=f"{eval_request.model}/results_{datetime.now()}.json", |
|
repo_id=RESULTS_REPO, |
|
repo_type="dataset", |
|
) |
|
|
|
RegexFilter.apply = original_apply |
|
return results |
|
|
|
|
|
def process_finished_requests(thr: int, hard_task_lst: Optional[list[str]] = None) -> bool: |
|
sanity_checks() |
|
|
|
current_finished_status = [FINISHED_STATUS, FAILED_STATUS] |
|
|
|
|
|
eval_requests: list[EvalRequest] = get_eval_requests( |
|
job_status=current_finished_status, hf_repo=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH_BACKEND |
|
) |
|
|
|
eval_requests: list[EvalRequest] = sort_models_by_priority(api=API, models=eval_requests) |
|
|
|
random.shuffle(eval_requests) |
|
|
|
eval_results: list[EvalResult] = get_raw_eval_results(EVAL_RESULTS_PATH_BACKEND, EVAL_REQUESTS_PATH_BACKEND) |
|
|
|
result_name_to_request = {request_to_result_name(r): r for r in eval_requests} |
|
result_name_to_result = {r.eval_name: r for r in eval_results} |
|
|
|
for eval_request in eval_requests: |
|
if eval_request.likes >= thr: |
|
result_name: str = request_to_result_name(eval_request) |
|
|
|
|
|
eval_result: Optional[EvalResult] = ( |
|
result_name_to_result[result_name] if result_name in result_name_to_result else None |
|
) |
|
|
|
|
|
|
|
task_lst = TASKS_HARNESS.copy() |
|
random.shuffle(task_lst) |
|
|
|
|
|
for task in task_lst: |
|
task_name = task.benchmark |
|
|
|
do_run_task = False |
|
if hard_task_lst is None or any(ss in task_name for ss in hard_task_lst): |
|
do_run_task = True |
|
|
|
if (eval_result is None or task_name not in eval_result.results) and do_run_task: |
|
eval_request: EvalRequest = result_name_to_request[result_name] |
|
|
|
my_snapshot_download( |
|
repo_id=QUEUE_REPO, |
|
revision="main", |
|
local_dir=EVAL_REQUESTS_PATH_BACKEND, |
|
repo_type="dataset", |
|
max_workers=60, |
|
) |
|
my_set_eval_request( |
|
api=API, |
|
eval_request=eval_request, |
|
set_to_status=RUNNING_STATUS, |
|
hf_repo=QUEUE_REPO, |
|
local_dir=EVAL_REQUESTS_PATH_BACKEND, |
|
) |
|
|
|
results = process_evaluation(task, eval_request) |
|
|
|
my_snapshot_download( |
|
repo_id=QUEUE_REPO, |
|
revision="main", |
|
local_dir=EVAL_REQUESTS_PATH_BACKEND, |
|
repo_type="dataset", |
|
max_workers=60, |
|
) |
|
my_set_eval_request( |
|
api=API, |
|
eval_request=eval_request, |
|
set_to_status=FINISHED_STATUS, |
|
hf_repo=QUEUE_REPO, |
|
local_dir=EVAL_REQUESTS_PATH_BACKEND, |
|
) |
|
|
|
return True |
|
|
|
return False |
|
|
|
|
|
def maybe_refresh_results(thr: int, hard_task_lst: Optional[list[str]] = None) -> bool: |
|
sanity_checks() |
|
|
|
current_finished_status = [PENDING_STATUS, FINISHED_STATUS, FAILED_STATUS] |
|
|
|
|
|
eval_requests: list[EvalRequest] = get_eval_requests( |
|
job_status=current_finished_status, hf_repo=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH_BACKEND |
|
) |
|
|
|
eval_requests: list[EvalRequest] = sort_models_by_priority(api=API, models=eval_requests) |
|
|
|
random.shuffle(eval_requests) |
|
|
|
eval_results: list[EvalResult] = get_raw_eval_results(EVAL_RESULTS_PATH_BACKEND, EVAL_REQUESTS_PATH_BACKEND) |
|
|
|
result_name_to_request = {request_to_result_name(r): r for r in eval_requests} |
|
result_name_to_result = {r.eval_name: r for r in eval_results} |
|
|
|
for eval_request in eval_requests: |
|
if eval_request.likes >= thr: |
|
result_name: str = request_to_result_name(eval_request) |
|
|
|
|
|
eval_result: Optional[EvalResult] = ( |
|
result_name_to_result[result_name] if result_name in result_name_to_result else None |
|
) |
|
|
|
task_lst = TASKS_HARNESS.copy() |
|
random.shuffle(task_lst) |
|
|
|
|
|
for task in task_lst: |
|
task_name = task.benchmark |
|
|
|
do_run_task = False |
|
if hard_task_lst is None or any(ss in task_name for ss in hard_task_lst): |
|
do_run_task = True |
|
|
|
task_lst = ["nq", "trivia", "tqa", "self"] |
|
if ( |
|
eval_result is None |
|
or do_run_task |
|
or task_name not in eval_result.results |
|
or any(ss in task_name for ss in task_lst) |
|
): |
|
eval_request: EvalRequest = result_name_to_request[result_name] |
|
|
|
my_snapshot_download( |
|
repo_id=QUEUE_REPO, |
|
revision="main", |
|
local_dir=EVAL_REQUESTS_PATH_BACKEND, |
|
repo_type="dataset", |
|
max_workers=60, |
|
) |
|
my_set_eval_request( |
|
api=API, |
|
eval_request=eval_request, |
|
set_to_status=RUNNING_STATUS, |
|
hf_repo=QUEUE_REPO, |
|
local_dir=EVAL_REQUESTS_PATH_BACKEND, |
|
) |
|
|
|
results = process_evaluation(task, eval_request) |
|
|
|
my_snapshot_download( |
|
repo_id=QUEUE_REPO, |
|
revision="main", |
|
local_dir=EVAL_REQUESTS_PATH_BACKEND, |
|
repo_type="dataset", |
|
max_workers=60, |
|
) |
|
my_set_eval_request( |
|
api=API, |
|
eval_request=eval_request, |
|
set_to_status=FINISHED_STATUS, |
|
hf_repo=QUEUE_REPO, |
|
local_dir=EVAL_REQUESTS_PATH_BACKEND, |
|
) |
|
|
|
return True |
|
|
|
return False |
|
|
|
def process_pending_requests() -> bool: |
|
sanity_checks() |
|
print("Processing pending requests") |
|
current_pending_status = [PENDING_STATUS] |
|
|
|
|
|
eval_requests = get_eval_requests( |
|
job_status=current_pending_status, hf_repo=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH_BACKEND |
|
) |
|
|
|
eval_requests = sort_models_by_priority(api=API, models=eval_requests) |
|
|
|
random.shuffle(eval_requests) |
|
|
|
print(f"Found {len(eval_requests)} {','.join(current_pending_status)} eval requests") |
|
|
|
if len(eval_requests) == 0: |
|
return False |
|
|
|
eval_request = eval_requests[0] |
|
pp.pprint(eval_request) |
|
|
|
gpu_type = eval_request.gpu_type |
|
curr_gpu_type = get_gpu_details() |
|
if gpu_type != curr_gpu_type: |
|
print(f"GPU type mismatch: {gpu_type} vs {curr_gpu_type}") |
|
return False |
|
|
|
my_snapshot_download( |
|
repo_id=QUEUE_REPO, revision="main", local_dir=EVAL_REQUESTS_PATH_BACKEND, repo_type="dataset", max_workers=60 |
|
) |
|
my_set_eval_request( |
|
api=API, |
|
eval_request=eval_request, |
|
set_to_status=RUNNING_STATUS, |
|
hf_repo=QUEUE_REPO, |
|
local_dir=EVAL_REQUESTS_PATH_BACKEND, |
|
) |
|
|
|
task_lst = TASKS_HARNESS.copy() |
|
random.shuffle(task_lst) |
|
|
|
for task in task_lst: |
|
results = process_evaluation(task, eval_request) |
|
|
|
my_snapshot_download( |
|
repo_id=QUEUE_REPO, revision="main", local_dir=EVAL_REQUESTS_PATH_BACKEND, repo_type="dataset", max_workers=60 |
|
) |
|
my_set_eval_request( |
|
api=API, |
|
eval_request=eval_request, |
|
set_to_status=FINISHED_STATUS, |
|
hf_repo=QUEUE_REPO, |
|
local_dir=EVAL_REQUESTS_PATH_BACKEND, |
|
) |
|
|
|
return True |
|
|
|
|
|
def get_args(): |
|
parser = argparse.ArgumentParser(description="Run the backend") |
|
parser.add_argument("--debug", action="store_true", help="Run in debug mode") |
|
|
|
parser.add_argument("--task", type=str, default="selfcheckgpt,mmlu, gsm8k", help="Task to debug") |
|
parser.add_argument("--model", type=str, default="mistralai/Mixtral-8x7B-Instruct-v0.1,mistralai/Mixtral-8x7B-v0.1", help="Model to debug") |
|
parser.add_argument("--precision", type=str, default="float32,float16,8bit,4bit", help="Precision to debug") |
|
parser.add_argument("--inference-framework", type=str, default="hf-chat", help="Inference framework to debug") |
|
parser.add_argument("--limit", type=int, default=None, help="Limit for the number of samples") |
|
parser.add_argument("--gpu-type", type=str, default="NVIDIA-A100-PCIe-80GB", |
|
help="GPU type. NVIDIA-A100-PCIe-80GB; NVIDIA-RTX-A5000-24GB; NVIDIA-H100-PCIe-80GB") |
|
parser.add_argument("--debug_repo", action="store_true", help="Use debug repo") |
|
return parser.parse_args() |
|
|
|
|
|
if __name__ == "__main__": |
|
args = get_args() |
|
local_debug = args.debug |
|
|
|
if local_debug and not args.debug_repo: |
|
|
|
|
|
debug_model_names = args.model.split(",") |
|
debug_task_name = args.task.split(",") |
|
precisions = args.precision.split(",") |
|
print(f"debug_model_names: {debug_model_names}, debug_task_name: {debug_task_name}, precisions: {precisions}") |
|
task_lst = TASKS_HARNESS.copy() |
|
RESULTS_REPO = DEBUG_RESULTS_REPO |
|
for precision in precisions: |
|
for debug_model_name in debug_model_names: |
|
for task in task_lst: |
|
task_name = task.benchmark |
|
if task_name not in debug_task_name: |
|
continue |
|
|
|
eval_request = EvalRequest( |
|
model=debug_model_name, |
|
private=False, |
|
status="", |
|
json_filepath="", |
|
precision=precision, |
|
inference_framework=args.inference_framework, |
|
gpu_type=args.gpu_type |
|
) |
|
curr_gpu_type = get_gpu_details() |
|
if eval_request.gpu_type != curr_gpu_type: |
|
print(f"GPU type mismatch: {eval_request.gpu_type} vs {curr_gpu_type}") |
|
raise Exception("GPU type mismatch") |
|
results = process_evaluation(task, eval_request, limit=args.limit) |
|
|
|
|
|
elif local_debug and args.debug_repo: |
|
QUEUE_REPO = DEBUG_QUEUE_REPO |
|
RESULTS_REPO = DEBUG_RESULTS_REPO |
|
while True: |
|
res = False |
|
|
|
res = process_pending_requests() |
|
print(f"waiting for 60 seconds") |
|
time.sleep(60) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
elif not local_debug and not args.debug_repo: |
|
while True: |
|
res = False |
|
|
|
res = process_pending_requests() |
|
print(f"waiting for 60 seconds") |
|
time.sleep(60) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
else: |
|
raise Exception("Cannot use debug_repo without local debug flag") |