import asyncio
import os
import threading

import torch
from omegaconf import OmegaConf, open_dict
from pytorch_lightning.trainer.trainer import Trainer
from torch.utils.data import DataLoader, Dataset

from nemo.collections.nlp.models.language_modeling.megatron_gpt_model import MegatronGPTModel
from nemo.collections.nlp.modules.common.megatron.megatron_init import fake_initialize_model_parallel
from nemo.collections.nlp.modules.common.megatron_web_server import get_demo
from nemo.collections.nlp.modules.common.text_generation_server import MegatronServer
from nemo.collections.nlp.modules.common.text_generation_utils import generate
from nemo.collections.nlp.modules.common.transformer.text_generation import LengthParam, SamplingParam
from nemo.collections.nlp.parts.nlp_overrides import NLPDDPStrategy, NLPSaveRestoreConnector
from nemo.core.config import hydra_runner
from nemo.utils.app_state import AppState
from nemo.utils.model_utils import inject_model_parallel_rank

try:
    from apex.transformer import parallel_state

    HAVE_APEX = True
except (ImportError, ModuleNotFoundError):
    HAVE_APEX = False

""" |
|
This is the script to run GPT text generation. |
|
|
|
Usage: |
|
Assume the model has TP=1, PP=1 in the following use cases. |
|
a. run greedy inference from a nemo file: |
|
python megatron_gpt_eval.py \ |
|
gpt_model_file=PATH_TO_MODEL \ |
|
inference.greedy=True \ |
|
inference.add_BOS=True \ |
|
trainer.devices=1 \ |
|
trainer.num_nodes=1 \ |
|
tensor_model_parallel_size=1 \ |
|
pipeline_model_parallel_size=1 \ |
|
prompts=[prompt1,prompt2] |
|
|
|
b. run greedy inference from a PTL checkpoint file: |
|
python megatron_gpt_eval.py \ |
|
checkpoint_dir=PATH_TO_CHECKPOINT_FILE \ |
|
checkpoint_name=CHECKPOINT_FILE_NAME \ |
|
hparams_file=HPARAMS_FILE \ |
|
inference.greedy=True \ |
|
inference.add_BOS=True \ |
|
trainer.devices=1 \ |
|
trainer.num_nodes=1 \ |
|
tensor_model_parallel_size=1 \ |
|
pipeline_model_parallel_size=1 \ |
|
prompts=[prompt1,prompt2] |
|
|
|
c. run top_p inference from a nemo file: |
|
python megatron_gpt_eval.py \ |
|
gpt_model_file=PATH_TO_MODEL \ |
|
inference.greedy=False \ |
|
inference.top_k=0 \ |
|
inference.top_p=0.9 \ |
|
inference.repetition_penalty=1.2 \ |
|
inference.add_BOS=True \ |
|
trainer.devices=1 \ |
|
trainer.num_nodes=1 \ |
|
tensor_model_parallel_size=1 \ |
|
pipeline_model_parallel_size=1 \ |
|
prompts=[prompt1,prompt2] |
|
|
|
d. If you don't need to generate tokens and need model to compute logprobs: |
|
python megatron_gpt_eval.py \ |
|
gpt_model_file=PATH_TO_MODEL \ |
|
inference.compute_logprob=True \ |
|
trainer.devices=1 \ |
|
trainer.num_nodes=1 \ |
|
tensor_model_parallel_size=1 \ |
|
pipeline_model_parallel_size=1 \ |
|
prompts=[text to get logprob] |
|
|
|
e. Launch the inference server |
|
python megatron_gpt_eval.py \ |
|
gpt_model_file=PATH_TO_MODEL \ |
|
trainer.devices=1 \ |
|
trainer.num_nodes=1 \ |
|
tensor_model_parallel_size=1 \ |
|
pipeline_model_parallel_size=1 \ |
|
server=True |
|
|
|
To send a request to the server, here is one example code: |
|
```python |
|
import json |
|
import requests |
|
|
|
batch_size = 8 |
|
port_num = 5555 |
|
headers = {"Content-Type": "application/json"} |
|
|
|
|
|
def request_data(data): |
|
resp = requests.put('http://localhost:{}/generate'.format(port_num), |
|
data=json.dumps(data), |
|
headers=headers) |
|
sentences = resp.json()['sentences'] |
|
return sentences |
|
|
|
|
|
data = { |
|
"sentences": [""] * batch_size, |
|
"tokens_to_generate": 300, |
|
"temperature": 1.0, |
|
"add_BOS": True, |
|
"top_k": 0, |
|
"top_p": 0.9, |
|
"greedy": False, |
|
"all_probs": False, |
|
"repetition_penalty": 1.2, |
|
"min_tokens_to_generate": 2, |
|
} |
|
|
|
sentences = request_data(data) |
|
``` |
|
""" |
|
|
|
if not torch.cuda.is_available():
    raise EnvironmentError("A GPU is needed for inference.")


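# Minimal map-style dataset that wraps the prompt strings so they can be
# batched by a DataLoader and consumed by trainer.predict below.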
class RequestDataSet(Dataset):
    def __init__(self, sentences):
        super().__init__()
        self.sentences = sentences

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        return self.sentences[idx]


@hydra_runner(config_path="conf", config_name="megatron_gpt_inference")
def main(cfg) -> None:
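    # One process per GPU: the inference world size must exactly cover the model-parallel grid.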
    trainer = Trainer(strategy=NLPDDPStrategy(), **cfg.trainer)
    assert (
        cfg.trainer.devices * cfg.trainer.num_nodes
        == cfg.tensor_model_parallel_size * cfg.pipeline_model_parallel_size
    ), "devices * num_nodes should equal tensor_model_parallel_size * pipeline_model_parallel_size"

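    # Load the model either from a packaged .nemo file or from a raw PTL checkpoint;
    # the checkpoint path additionally needs an hparams file and a model-parallel rank setup.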
    if cfg.gpt_model_file:
        save_restore_connector = NLPSaveRestoreConnector()
        if os.path.isdir(cfg.gpt_model_file):
            save_restore_connector.model_extracted_dir = cfg.gpt_model_file

        # Restore the config first so that training-only settings can be overridden for inference.
        pretrained_cfg = MegatronGPTModel.restore_from(
            restore_path=cfg.gpt_model_file,
            trainer=trainer,
            return_config=True,
            save_restore_connector=save_restore_connector,
        )
        OmegaConf.set_struct(pretrained_cfg, True)
        with open_dict(pretrained_cfg):
            pretrained_cfg.sequence_parallel = False
            pretrained_cfg.activations_checkpoint_granularity = None
            pretrained_cfg.activations_checkpoint_method = None
            pretrained_cfg.precision = trainer.precision
        model = MegatronGPTModel.restore_from(
            restore_path=cfg.gpt_model_file,
            trainer=trainer,
            override_config_path=pretrained_cfg,
            save_restore_connector=save_restore_connector,
        )
    elif cfg.checkpoint_dir:
        app_state = AppState()
        if cfg.tensor_model_parallel_size > 1 or cfg.pipeline_model_parallel_size > 1:
            # Populate AppState with the model-parallel layout without creating real process groups.
            app_state.model_parallel_size = cfg.tensor_model_parallel_size * cfg.pipeline_model_parallel_size
            app_state.tensor_model_parallel_size = cfg.tensor_model_parallel_size
            app_state.pipeline_model_parallel_size = cfg.pipeline_model_parallel_size
            (
                app_state.tensor_model_parallel_rank,
                app_state.pipeline_model_parallel_rank,
                app_state.model_parallel_size,
                app_state.data_parallel_size,
                app_state.pipeline_model_parallel_split_rank,
                app_state.virtual_pipeline_model_parallel_rank,
            ) = fake_initialize_model_parallel(
                world_size=app_state.model_parallel_size,
                rank=trainer.global_rank,
                tensor_model_parallel_size_=cfg.tensor_model_parallel_size,
                pipeline_model_parallel_size_=cfg.pipeline_model_parallel_size,
                pipeline_model_parallel_split_rank_=cfg.pipeline_model_parallel_split_rank,
            )
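        # inject_model_parallel_rank rewrites the path so each rank loads its own model-parallel shard.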
        checkpoint_path = inject_model_parallel_rank(os.path.join(cfg.checkpoint_dir, cfg.checkpoint_name))
        model = MegatronGPTModel.load_from_checkpoint(checkpoint_path, hparams_file=cfg.hparams_file, trainer=trainer)
    else:
        raise ValueError("Need at least a nemo file (gpt_model_file) or a checkpoint dir (checkpoint_dir).")

    model.freeze()

    # Have to turn off activations_checkpoint_method for inference.
    try:
        model.model.language_model.encoder.activations_checkpoint_method = None
    except AttributeError:
        pass

    length_params: LengthParam = {
        "max_length": cfg.inference.tokens_to_generate,
        "min_length": cfg.inference.min_tokens_to_generate,
    }

    sampling_params: SamplingParam = {
        "use_greedy": cfg.inference.greedy,
        "temperature": cfg.inference.temperature,
        "top_k": cfg.inference.top_k,
        "top_p": cfg.inference.top_p,
        "repetition_penalty": cfg.inference.repetition_penalty,
        "add_BOS": cfg.inference.add_BOS,
        "all_probs": cfg.inference.all_probs,
        "compute_logprob": cfg.inference.compute_logprob,
    }

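    # First way to run generation: call model.generate directly on the prompts.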
    response = model.generate(
        inputs=OmegaConf.to_container(cfg.prompts), length_params=length_params, sampling_params=sampling_params
    )

    print("***************************")
    print(response)
    print("***************************")

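    # Second way to run generation: batch the prompts through a DataLoader and
    # drive the model with trainer.predict, configured via cfg.inference.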
    ds = RequestDataSet(OmegaConf.to_container(cfg.prompts))
    request_dl = DataLoader(dataset=ds, batch_size=2)
    config = OmegaConf.to_container(cfg.inference)
    model.set_inference_config(config)
    response = trainer.predict(model, request_dl)

    print("***************************")
    print(response)
    print("***************************")

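    # Optionally keep the process alive as a text-generation server. Only the first
    # pipeline stage with tensor-parallel rank 0 serves requests (and, if enabled,
    # the web demo); all other ranks wait in the broadcast loop below.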
    if cfg.server:
        if parallel_state.is_pipeline_first_stage() and parallel_state.get_tensor_model_parallel_rank() == 0:
            if cfg.web_server:
                loop = asyncio.new_event_loop()
                thread = threading.Thread(
                    target=get_demo,
                    daemon=True,
                    args=(cfg.share, cfg.username, cfg.password, cfg.port, cfg.web_port, loop),
                )
                thread.start()
            server = MegatronServer(model.cuda())
            server.run("0.0.0.0", port=cfg.port)

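        # Worker ranks wait for the serving rank to broadcast a command from inside
        # its generate path; a value of 0 tells them to join a collective generate() call.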
        while True:
            choice = torch.cuda.LongTensor(1)
            torch.distributed.broadcast(choice, 0)
            if choice[0].item() == 0:
                generate(model.cuda())


if __name__ == '__main__':
    main()