| | """ |
| | task_runner.py — Thin CLI entry point for subprocess GPU workers. |
| | |
| | Usage: |
| | CUDA_VISIBLE_DEVICES=5 python eval/tasks/task_runner.py \ |
| | --task calibration --gpu-id 5 --output /path/to/result.json |
| | """ |
| |
|
| | import argparse |
| | import json |
| | import os |
| | import sys |
| | import traceback |
| | from pathlib import Path |
| |
|
| | |
| | |
| | |
# Make project-root imports (eval.tasks.*) resolvable when this file is
# executed directly as a script rather than as a package module.
# parents[2] == eval/tasks/task_runner.py -> project root, three levels up.
PROJECT_ROOT = Path(__file__).resolve().parents[2]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
| |
|
| | |
| | |
| | |
| |
|
| | def _set_numa_affinity(gpu_id: int) -> None: |
| | """Pin the process to the NUMA node that owns the given GPU. |
| | |
| | GPU 0-3 → cores 0-35 (NUMA node 0) |
| | GPU 4-7 → cores 36-71 (NUMA node 1) |
| | """ |
| | try: |
| | import os |
| | if gpu_id <= 3: |
| | cores = list(range(0, 36)) |
| | else: |
| | cores = list(range(36, 72)) |
| |
|
| | |
| | os.sched_setaffinity(0, cores) |
| | print( |
| | f"[TASK_RUNNER gpu_id={gpu_id}] NUMA affinity set: cores {cores[0]}-{cores[-1]}", |
| | flush=True, |
| | ) |
| | except Exception as exc: |
| | |
| | print( |
| | f"[TASK_RUNNER gpu_id={gpu_id}] WARNING: could not set NUMA affinity: {exc}", |
| | flush=True, |
| | ) |
| |
|
| |
|
| | |
| | |
| | |
| |
|
# Task names accepted by --task. Keep this set in sync with the dispatch
# chain inside _run_task(); the CLI uses sorted(VALID_TASKS) as `choices`.
VALID_TASKS = {
    "calib_nll",
    "calibration",
    "generation",
    "lm_eval",
    "ppl_multi",
    "ppl_single",
    "repetition_grid",
    "token_nll",
}
| |
|
| |
|
| | def _run_task(args: argparse.Namespace) -> dict: |
| | task = args.task |
| | device = "cuda:0" |
| |
|
| | if task == "ppl_single": |
| | if not args.val_file: |
| | raise ValueError("--val-file is required for ppl_single task") |
| | from eval.tasks.ppl_task import eval_ppl_single |
| | result = eval_ppl_single(args.val_file, device) |
| |
|
| | elif task == "ppl_multi": |
| | if not args.val_files: |
| | raise ValueError("--val-files is required for ppl_multi task") |
| | val_files_list = [f.strip() for f in args.val_files.split(",") if f.strip()] |
| | from eval.tasks.ppl_task import eval_ppl_multi |
| | result = eval_ppl_multi(val_files_list, device) |
| |
|
| | elif task == "calibration": |
| | from eval.tasks.calibration_task import eval_calibration |
| | result = eval_calibration(device) |
| |
|
| | elif task == "token_nll": |
| | from eval.tasks.token_nll_task import eval_token_nll |
| | result = eval_token_nll(device) |
| |
|
| | elif task == "calib_nll": |
| | from eval.tasks.calibration_task import eval_calibration |
| | from eval.tasks.token_nll_task import eval_token_nll |
| | calib_result = eval_calibration(device) |
| | nll_result = eval_token_nll(device) |
| | result = {"calibration": calib_result, "token_nll": nll_result} |
| |
|
| | elif task == "generation": |
| | from eval.tasks.generation_task import eval_generation |
| | result = eval_generation(device) |
| |
|
| | elif task == "repetition_grid": |
| | from eval.tasks.generation_task import eval_repetition_grid |
| | result = eval_repetition_grid(device) |
| |
|
| | elif task == "lm_eval": |
| | if not args.hf_model_path: |
| | raise ValueError("--hf-model-path is required for lm_eval task") |
| | if not args.lm_eval_tasks: |
| | raise ValueError("--lm-eval-tasks is required for lm_eval task") |
| | tasks_list = [t.strip() for t in args.lm_eval_tasks.split(",") if t.strip()] |
| |
|
| | if args.fewshot_list: |
| | |
| | fewshot_values = [int(x.strip()) for x in args.fewshot_list.split(",")] |
| | from eval.tasks.lm_eval_task import run_lm_eval_tasks_pipeline |
| | result = run_lm_eval_tasks_pipeline( |
| | args.hf_model_path, |
| | tasks_list, |
| | device, |
| | fewshot_values, |
| | output_dir=str(Path(args.output).parent), |
| | output_prefix=Path(args.output).stem, |
| | ) |
| | else: |
| | from eval.tasks.lm_eval_task import run_lm_eval_tasks |
| | result = run_lm_eval_tasks( |
| | args.hf_model_path, |
| | tasks_list, |
| | device, |
| | num_fewshot=args.num_fewshot, |
| | ) |
| |
|
| | else: |
| | raise ValueError(f"Unknown task: {task!r}. Valid tasks: {sorted(VALID_TASKS)}") |
| |
|
| | return result |
| |
|
| |
|
| | |
| | |
| | |
| |
|
def _parse_args() -> argparse.Namespace:
    """Build the worker CLI parser and parse ``sys.argv``."""
    p = argparse.ArgumentParser(
        description="Thin CLI entry point for subprocess GPU eval workers."
    )

    # --- required for every task -------------------------------------
    p.add_argument(
        "--task",
        required=True,
        choices=sorted(VALID_TASKS),
        help="Eval task to run.",
    )
    p.add_argument(
        "--gpu-id",
        type=int,
        required=True,
        help="Original GPU ID (used for NUMA affinity only).",
    )
    p.add_argument(
        "--output",
        required=True,
        help="Path to write JSON result file.",
    )

    # --- perplexity tasks --------------------------------------------
    p.add_argument(
        "--val-file",
        default=None,
        help="Single validation filename (for ppl_single).",
    )
    p.add_argument(
        "--val-files",
        default=None,
        help="Comma-separated validation filenames (for ppl_multi).",
    )

    # --- lm_eval task ------------------------------------------------
    p.add_argument(
        "--hf-model-path",
        default=None,
        help="HuggingFace model directory (for lm_eval).",
    )
    p.add_argument(
        "--lm-eval-tasks",
        default=None,
        help="Comma-separated lm-eval task names (for lm_eval).",
    )
    p.add_argument(
        "--num-fewshot",
        type=int,
        default=0,
        help="Number of few-shot examples (for lm_eval). Default: 0.",
    )
    p.add_argument(
        "--fewshot-list",
        default=None,
        help="Comma-separated fewshot values to run sequentially, e.g. '0,5'. "
        "Model is loaded once and reused. Overrides --num-fewshot.",
    )
    return p.parse_args()
| |
|
| |
|
| | |
| | |
| | |
| |
|
def main() -> None:
    """CLI entry point: parse args, run one task, write JSON, exit.

    Always writes a result file — on failure the payload carries the error
    string and traceback — so the parent process can inspect the outcome
    regardless of the exit code (0 on success, 1 on task failure).
    """
    cli = _parse_args()

    print(f"[TASK_RUNNER gpu_id={cli.gpu_id}] Starting task={cli.task}", flush=True)

    # Best-effort CPU pinning; never raises.
    _set_numa_affinity(cli.gpu_id)

    status = 0
    try:
        payload = _run_task(cli)
    except Exception as exc:
        # Top-level boundary: record the failure both on stderr and in the
        # JSON payload so the parent can surface it either way.
        trace = traceback.format_exc()
        print(
            f"[TASK_RUNNER gpu_id={cli.gpu_id}] ERROR in task={cli.task}:\n{trace}",
            file=sys.stderr,
            flush=True,
        )
        payload = {"error": str(exc), "traceback": trace}
        status = 1

    out = Path(cli.output)
    out.parent.mkdir(parents=True, exist_ok=True)
    with open(out, "w", encoding="utf-8") as fh:
        # default=str keeps the dump robust against non-JSON-native values.
        json.dump(payload, fh, ensure_ascii=False, indent=2, default=str)

    print(
        f"[TASK_RUNNER gpu_id={cli.gpu_id}] Done. Result saved to {cli.output}",
        flush=True,
    )
    sys.exit(status)
| |
|
| |
|
| | if __name__ == "__main__": |
| | main() |
| |
|