# NeMo/examples/nlp/language_modeling/megatron_gpt_eval.py
# Copyright (c) 2021, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import os
import threading
import torch
from omegaconf import OmegaConf, open_dict
from pytorch_lightning.trainer.trainer import Trainer
from torch.utils.data import DataLoader, Dataset
from nemo.collections.nlp.models.language_modeling.megatron_gpt_model import MegatronGPTModel
from nemo.collections.nlp.modules.common.megatron.megatron_init import fake_initialize_model_parallel
from nemo.collections.nlp.modules.common.megatron_web_server import get_demo
from nemo.collections.nlp.modules.common.text_generation_server import MegatronServer
from nemo.collections.nlp.modules.common.text_generation_utils import generate
from nemo.collections.nlp.modules.common.transformer.text_generation import LengthParam, SamplingParam
from nemo.collections.nlp.parts.nlp_overrides import NLPDDPStrategy, NLPSaveRestoreConnector
from nemo.core.config import hydra_runner
from nemo.utils.app_state import AppState
from nemo.utils.model_utils import inject_model_parallel_rank

try:
    from apex.transformer import parallel_state

    HAVE_APEX = True
except (ImportError, ModuleNotFoundError):
    HAVE_APEX = False

"""
This is the script to run GPT text generation.
Usage:
    Assume the model has TP=1, PP=1 in the following use cases.

    a. run greedy inference from a nemo file:
        python megatron_gpt_eval.py \
            gpt_model_file=PATH_TO_MODEL \
            inference.greedy=True \
            inference.add_BOS=True \
            trainer.devices=1 \
            trainer.num_nodes=1 \
            tensor_model_parallel_size=1 \
            pipeline_model_parallel_size=1 \
            prompts=[prompt1,prompt2]

    b. run greedy inference from a PTL checkpoint file:
        python megatron_gpt_eval.py \
            checkpoint_dir=PATH_TO_CHECKPOINT_FILE \
            checkpoint_name=CHECKPOINT_FILE_NAME \
            hparams_file=HPARAMS_FILE \
            inference.greedy=True \
            inference.add_BOS=True \
            trainer.devices=1 \
            trainer.num_nodes=1 \
            tensor_model_parallel_size=1 \
            pipeline_model_parallel_size=1 \
            prompts=[prompt1,prompt2]

    c. run top_p inference from a nemo file:
        python megatron_gpt_eval.py \
            gpt_model_file=PATH_TO_MODEL \
            inference.greedy=False \
            inference.top_k=0 \
            inference.top_p=0.9 \
            inference.repetition_penalty=1.2 \
            inference.add_BOS=True \
            trainer.devices=1 \
            trainer.num_nodes=1 \
            tensor_model_parallel_size=1 \
            pipeline_model_parallel_size=1 \
            prompts=[prompt1,prompt2]

    d. If you don't need to generate tokens and only need the model to compute log probabilities:
        python megatron_gpt_eval.py \
            gpt_model_file=PATH_TO_MODEL \
            inference.compute_logprob=True \
            trainer.devices=1 \
            trainer.num_nodes=1 \
            tensor_model_parallel_size=1 \
            pipeline_model_parallel_size=1 \
            prompts=[text to get logprob]

    e. Launch the inference server:
        python megatron_gpt_eval.py \
            gpt_model_file=PATH_TO_MODEL \
            trainer.devices=1 \
            trainer.num_nodes=1 \
            tensor_model_parallel_size=1 \
            pipeline_model_parallel_size=1 \
            server=True

To send a request to the server, here is one example:
```python
import json
import requests
batch_size = 8
port_num = 5555
headers = {"Content-Type": "application/json"}
def request_data(data):
    resp = requests.put('http://localhost:{}/generate'.format(port_num),
                        data=json.dumps(data),
                        headers=headers)
    sentences = resp.json()['sentences']
    return sentences

data = {
    "sentences": [""] * batch_size,
    "tokens_to_generate": 300,
    "temperature": 1.0,
    "add_BOS": True,
    "top_k": 0,
    "top_p": 0.9,
    "greedy": False,
    "all_probs": False,
    "repetition_penalty": 1.2,
    "min_tokens_to_generate": 2,
}
sentences = request_data(data)
```
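
For example, the same `data` dict and `request_data` helper can be reused to ask for
greedy completions of concrete prompts (the prompt strings below are only placeholders):
```python
data["sentences"] = ["Deep learning is", "The capital of France is"]
data["greedy"] = True
sentences = request_data(data)
print(sentences)
```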
"""

if not torch.cuda.is_available():
    raise EnvironmentError("GPU is needed for the inference")
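

# Simple map-style dataset that wraps the prompt strings so that trainer.predict()
# can consume them through a regular DataLoader (used by the "second method" below).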
class RequestDataSet(Dataset):
    def __init__(self, sentences):
        super().__init__()
        self.sentences = sentences

    def __len__(self,):
        return len(self.sentences)

    def __getitem__(self, idx):
        return self.sentences[idx]


@hydra_runner(config_path="conf", config_name="megatron_gpt_inference")
def main(cfg) -> None:
    # trainer required for restoring model parallel models
    trainer = Trainer(strategy=NLPDDPStrategy(), **cfg.trainer)

    assert (
        cfg.trainer.devices * cfg.trainer.num_nodes
        == cfg.tensor_model_parallel_size * cfg.pipeline_model_parallel_size
    ), "devices * num_nodes should equal tensor_model_parallel_size * pipeline_model_parallel_size"

    if cfg.gpt_model_file:
        save_restore_connector = NLPSaveRestoreConnector()
        if os.path.isdir(cfg.gpt_model_file):
            save_restore_connector.model_extracted_dir = cfg.gpt_model_file

        pretrained_cfg = MegatronGPTModel.restore_from(
            restore_path=cfg.gpt_model_file,
            trainer=trainer,
            return_config=True,
            save_restore_connector=save_restore_connector,
        )
        OmegaConf.set_struct(pretrained_cfg, True)
        with open_dict(pretrained_cfg):
            pretrained_cfg.sequence_parallel = False
            pretrained_cfg.activations_checkpoint_granularity = None
            pretrained_cfg.activations_checkpoint_method = None
            pretrained_cfg.precision = trainer.precision
        model = MegatronGPTModel.restore_from(
            restore_path=cfg.gpt_model_file,
            trainer=trainer,
            override_config_path=pretrained_cfg,
            save_restore_connector=save_restore_connector,
        )
    elif cfg.checkpoint_dir:
        app_state = AppState()
        if cfg.tensor_model_parallel_size > 1 or cfg.pipeline_model_parallel_size > 1:
            app_state.model_parallel_size = cfg.tensor_model_parallel_size * cfg.pipeline_model_parallel_size
            app_state.tensor_model_parallel_size = cfg.tensor_model_parallel_size
            app_state.pipeline_model_parallel_size = cfg.pipeline_model_parallel_size
            (
                app_state.tensor_model_parallel_rank,
                app_state.pipeline_model_parallel_rank,
                app_state.model_parallel_size,
                app_state.data_parallel_size,
                app_state.pipeline_model_parallel_split_rank,
                app_state.virtual_pipeline_model_parallel_rank,
            ) = fake_initialize_model_parallel(
                world_size=app_state.model_parallel_size,
                rank=trainer.global_rank,
                tensor_model_parallel_size_=cfg.tensor_model_parallel_size,
                pipeline_model_parallel_size_=cfg.pipeline_model_parallel_size,
                pipeline_model_parallel_split_rank_=cfg.pipeline_model_parallel_split_rank,
            )
        checkpoint_path = inject_model_parallel_rank(os.path.join(cfg.checkpoint_dir, cfg.checkpoint_name))
        model = MegatronGPTModel.load_from_checkpoint(checkpoint_path, hparams_file=cfg.hparams_file, trainer=trainer)
    else:
        raise ValueError("need at least a nemo file or checkpoint dir")

    model.freeze()

    # Have to turn off activations_checkpoint_method for inference
    try:
        model.model.language_model.encoder.activations_checkpoint_method = None
    except AttributeError:
        pass

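    # Generation length and sampling settings; LengthParam and SamplingParam are the
    # parameter dicts defined in nemo.collections.nlp.modules.common.transformer.text_generation.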
    length_params: LengthParam = {
        "max_length": cfg.inference.tokens_to_generate,
        "min_length": cfg.inference.min_tokens_to_generate,
    }

    sampling_params: SamplingParam = {
        "use_greedy": cfg.inference.greedy,
        "temperature": cfg.inference.temperature,
        "top_k": cfg.inference.top_k,
        "top_p": cfg.inference.top_p,
        "repetition_penalty": cfg.inference.repetition_penalty,
        "add_BOS": cfg.inference.add_BOS,
        "all_probs": cfg.inference.all_probs,
        "compute_logprob": cfg.inference.compute_logprob,
    }

    # First method of running text generation, call model.generate method
    response = model.generate(
        inputs=OmegaConf.to_container(cfg.prompts), length_params=length_params, sampling_params=sampling_params
    )
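    # Depending on the NeMo version, `response` is typically a dict that contains the
    # generated 'sentences' plus token-level details.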
print("***************************")
print(response)
print("***************************")
    # Second method of running text generation, call trainer.predict
    ds = RequestDataSet(OmegaConf.to_container(cfg.prompts))
    request_dl = DataLoader(dataset=ds, batch_size=2)
    config = OmegaConf.to_container(cfg.inference)
    model.set_inference_config(config)
    response = trainer.predict(model, request_dl)

    print("***************************")
    print(response)
    print("***************************")

    # Third method of running text generation, use inference server
    if cfg.server:
        if parallel_state.is_pipeline_first_stage() and parallel_state.get_tensor_model_parallel_rank() == 0:
            if cfg.web_server:
                loop = asyncio.new_event_loop()
                thread = threading.Thread(
                    target=get_demo,
                    daemon=True,
                    args=(cfg.share, cfg.username, cfg.password, cfg.port, cfg.web_port, loop),
                )
                thread.start()
            server = MegatronServer(model.cuda())
            server.run("0.0.0.0", port=cfg.port)
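
        # Ranks other than pipeline-first-stage / tensor-parallel rank 0 never start the
        # HTTP server; they wait here for rank 0 to broadcast a signal and then join the
        # collective generate() call for each incoming request.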
        while True:
            choice = torch.cuda.LongTensor(1)
            torch.distributed.broadcast(choice, 0)
            if choice[0].item() == 0:
                generate(model.cuda())


if __name__ == '__main__':
    main()  # noqa pylint: disable=no-value-for-parameter