# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# NOTE: This script is only an example of using NeMo with NeMo-Run's APIs and is subject to change without notice.
# This script runs evaluation on local and Slurm executors using NeMo-Run
# (https://github.com/NVIDIA/NeMo-Run) to configure and execute the runs.
# It uses the `deploy` method from nemo/collections/llm/api.py to deploy a NeMo 2.0 checkpoint on a PyTriton
# server, and the `evaluate` method from the same module to run evaluation against it.
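#
# Example invocations (illustrative; the checkpoint path below is a placeholder you must replace):
#
#   # Local run: deploy the checkpoint and evaluate it on mmlu via the completions endpoint.
#   python evaluation.py --nemo_checkpoint /path/to/nemo2_ckpt --eval_task mmlu --limit 100
#
#   # Slurm run: fill in the slurm_executor() fields in main() first, then use the chat endpoint.
#   python evaluation.py --nemo_checkpoint /path/to/nemo2_ckpt --slurm --endpoint_type chat \
#       --eval_task mmlu_instruct --nodes 1 --devices 8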
import argparse
from typing import Optional

import nemo_run as run

from nemo.collections.llm import deploy, evaluate
from nemo.collections.llm.evaluation.api import ApiEndpoint, ConfigParams, EvaluationConfig, EvaluationTarget

ENDPOINT_TYPES = {"chat": "chat/completions/", "completions": "completions/"}
COMPLETIONS_TASKS = (
"gsm8k",
"mgsm",
"mmlu",
"mmlu_pro",
"mmlu_redux",
)
CHAT_TASKS = (
"gpqa_diamond_cot",
"gsm8k_cot_instruct",
"ifeval",
"mgsm_cot",
"mmlu_instruct",
"mmlu_pro_instruct",
"mmlu_redux_instruct",
"wikilingua",
)
EVAL_TASKS = COMPLETIONS_TASKS + CHAT_TASKS
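
# NOTE: Tasks in COMPLETIONS_TASKS are intended for the `completions` endpoint and tasks in
# CHAT_TASKS for the `chat` endpoint. The script does not enforce this pairing, so make sure
# --endpoint_type matches --eval_task (e.g. --endpoint_type chat --eval_task mmlu_instruct).
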
def get_parser():
    parser = argparse.ArgumentParser(description="NeMo 2.0 Evaluation")
parser.add_argument(
"--nemo_checkpoint",
type=str,
required=True,
help="NeMo 2.0 checkpoint to be evaluated",
)
    parser.add_argument(
        "--triton_http_address", type=str, default="0.0.0.0", help="IP address on which the PyTriton server is started"
    )
    parser.add_argument("--fastapi_port", type=int, default=8080, help="Port on which the FastAPI server is started")
parser.add_argument(
"--endpoint_type",
type=str,
default="completions",
help="Whether to use completions or chat endpoint",
choices=list(ENDPOINT_TYPES),
)
parser.add_argument(
"--max_input_len",
type=int,
default=4096,
help="Max input length of the model",
)
parser.add_argument(
"--tensor_parallelism_size",
type=int,
default=1,
help="Tensor parallelism size to deploy the model",
)
parser.add_argument(
"--pipeline_parallelism_size",
type=int,
default=1,
help="Pipeline parallelism size to deploy the model",
)
parser.add_argument(
"--batch_size",
type=int,
default=2,
help="Batch size for deployment and evaluation",
)
parser.add_argument(
"--eval_task",
type=str,
default="mmlu",
help="Evaluation benchmark to run.",
choices=EVAL_TASKS,
)
parser.add_argument(
"--limit", type=float, default=None, help="Limit evaluation to `limit` samples. Default: use all samples."
)
    parser.add_argument(
        "--parallel_requests",
        type=int,
        default=1,
        help="Number of parallel requests to send to the server. Default: 1.",
    )
    parser.add_argument(
        "--request_timeout",
        type=int,
        default=1000,
        help="Request timeout in seconds for the eval client. Default: 1000.",
    )
parser.add_argument(
"--tag",
type=str,
help="Optional tag for your experiment title which will be appended after the model/exp name.",
required=False,
default="",
)
parser.add_argument(
"--dryrun",
action="store_true",
help="Do a dryrun and exit",
default=False,
)
parser.add_argument(
"--slurm",
action="store_true",
help="Run on slurm using run.SlurmExecutor",
default=False,
)
parser.add_argument('--nodes', type=int, default=1, help="Num nodes for the executor")
parser.add_argument('--devices', type=int, default=8, help="Num devices per node for the executor")
    parser.add_argument(
        '--container_image',
        type=str,
        default="nvcr.io/nvidia/nemo:dev",
        help="Container image for the run; only used for slurm runs. "
        "Can also be a path to a .sqsh file.",
    )
return parser


def slurm_executor(
user: str,
host: str,
remote_job_dir: str,
account: str,
partition: str,
nodes: int,
devices: int,
container_image: str,
time: str = "04:00:00",
custom_mounts: Optional[list[str]] = None,
custom_env_vars: Optional[dict[str, str]] = None,
retries: int = 0,
) -> run.SlurmExecutor:
    if not (user and host and remote_job_dir and account and partition and nodes and devices):
        raise RuntimeError(
            "Please set the user, host, remote_job_dir, account, partition, nodes and devices args to use this "
            "function."
        )
mounts = []
if custom_mounts:
mounts.extend(custom_mounts)
    env_vars = {
        # required for some eval benchmarks from lm-eval-harness
        "HF_DATASETS_TRUST_REMOTE_CODE": "1",
        # placeholder: replace "xxxxxx" with your own Hugging Face token (needed for gated datasets)
        "HF_TOKEN": "xxxxxx",
    }
if custom_env_vars:
env_vars |= custom_env_vars
executor = run.SlurmExecutor(
account=account,
partition=partition,
tunnel=run.SSHTunnel(
user=user,
host=host,
job_dir=remote_job_dir,
),
nodes=nodes,
ntasks_per_node=devices,
exclusive=True,
        # archives and uses the local code. Use packager=run.Packager() to use the code mounted on clusters
packager=run.GitArchivePackager(),
)
executor.container_image = container_image
executor.container_mounts = mounts
executor.env_vars = env_vars
executor.retries = retries
executor.time = time
return executor
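

# A minimal sketch of how slurm_executor() might be filled in; every value below (user, host,
# paths, account, partition) is a hypothetical placeholder for your own cluster:
#
#   executor = slurm_executor(
#       user="jdoe",
#       host="login.mycluster.example.com",
#       remote_job_dir="/scratch/jdoe/nemo-run",
#       account="my_account",
#       partition="batch",
#       nodes=1,
#       devices=8,
#       container_image="nvcr.io/nvidia/nemo:dev",
#   )
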
def local_executor_torchrun() -> run.LocalExecutor:
    env_vars = {
        # required for some eval benchmarks from lm-eval-harness
        "HF_DATASETS_TRUST_REMOTE_CODE": "1",
        # placeholder: replace "xxxxxx" with your own Hugging Face token (needed for gated datasets)
        "HF_TOKEN": "xxxxxx",
    }
executor = run.LocalExecutor(env_vars=env_vars)
return executor


def main():
args = get_parser().parse_args()
if args.tag and not args.tag.startswith("-"):
args.tag = "-" + args.tag
exp_name = "NeMoEvaluation"
deploy_fn = run.Partial(
deploy,
nemo_checkpoint=args.nemo_checkpoint,
fastapi_port=args.fastapi_port,
triton_http_address=args.triton_http_address,
max_input_len=args.max_input_len,
tensor_parallelism_size=args.tensor_parallelism_size,
pipeline_parallelism_size=args.pipeline_parallelism_size,
max_batch_size=args.batch_size,
num_gpus=args.devices,
num_nodes=args.nodes,
)
api_endpoint = run.Config(
ApiEndpoint,
url=f"http://{args.triton_http_address}:{args.fastapi_port}/v1/{ENDPOINT_TYPES[args.endpoint_type]}",
type=args.endpoint_type,
)
eval_target = run.Config(EvaluationTarget, api_endpoint=api_endpoint)
eval_params = run.Config(
ConfigParams,
limit_samples=args.limit,
parallelism=args.parallel_requests,
request_timeout=args.request_timeout,
)
eval_config = run.Config(EvaluationConfig, type=args.eval_task, params=eval_params)
eval_fn = run.Partial(evaluate, target_cfg=eval_target, eval_cfg=eval_config)
executor: run.Executor
    executor_eval: Optional[run.Executor]
if args.slurm:
# TODO: Set your custom parameters for the Slurm Executor.
executor = slurm_executor(
user="",
host="",
remote_job_dir="",
account="",
partition="",
nodes=args.nodes,
devices=args.devices,
container_image=args.container_image,
custom_mounts=[],
)
executor.srun_args = ["--mpi=pmix", "--overlap"]
executor_eval = executor.clone()
        # launch eval only on the main node (the node with index 0)
        executor_eval.srun_args = ["--ntasks-per-node=1", "--nodes=1"]
else:
executor = local_executor_torchrun()
executor_eval = None
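    # The deploy task starts the PyTriton/FastAPI server for the checkpoint; the evaluate task
    # then sends requests to the endpoint URL configured above.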
with run.Experiment(f"{exp_name}{args.tag}") as exp:
if args.slurm:
exp.add(
[deploy_fn, eval_fn],
executor=[executor, executor_eval],
name=exp_name,
                tail_logs=isinstance(executor, run.LocalExecutor),
)
else:
exp.add(deploy_fn, executor=executor, name=f"{exp_name}_deploy")
exp.add(eval_fn, executor=executor, name=f"{exp_name}_evaluate")
if args.dryrun:
exp.dryrun()
else:
exp.run()


if __name__ == "__main__":
main()