from __future__ import annotations

import configparser
import os
import pathlib
import typing

import torch
import transformers
from torch.nn.utils.rnn import pad_sequence

from .config import LYRA_XVERSE_PARAM
from .model import XVERSEModel


class lyraXVERSE:
    def __init__(self, model_path, tokenizer_path=None, dtype='fp16',
                 memopt_mode=1, arch='Ampere', cuda_version=12) -> None:
        self.model_path = model_path
        self.tokenizer_path = tokenizer_path
        self.dtype = dtype
        self.memopt_mode = memopt_mode
        self.arch = arch
        self.cuda_version = cuda_version

        self.model, self.tokenizer = self.load_model_and_tokenizer()
        print("Got model and tokenizer")

    def load_model_and_tokenizer(self):
        if self.tokenizer_path is None:
            tokenizer_path = self.model_path
        else:
            tokenizer_path = self.tokenizer_path

        print(f'Loading tokenizer from {tokenizer_path}')
        tokenizer = transformers.AutoTokenizer.from_pretrained(tokenizer_path)

        checkpoint_path = pathlib.Path(self.model_path)
        config_path = checkpoint_path / 'config.ini'

        if config_path.exists():
            # Read model params from config.
            cfg = configparser.ConfigParser()
            cfg.read(config_path)
            model_name = 'llama'
            inference_data_type = self.dtype
            if inference_data_type is None:
                inference_data_type = cfg.get(model_name, "weight_data_type")
            model_args = dict(
                head_num=cfg.getint(model_name, 'head_num'),
                size_per_head=cfg.getint(model_name, "size_per_head"),
                inter_size=cfg.getint(model_name, 'inter_size'),
                layer_num=cfg.getint(model_name, "num_layer"),
                rotary_embedding_dim=cfg.getint(model_name, 'rotary_embedding'),
                layernorm_eps=cfg.getfloat(model_name, 'layernorm_eps'),
                vocab_size=cfg.getint(model_name, "vocab_size"),
                start_id=cfg.getint(model_name, "start_id"),
                end_id=cfg.getint(model_name, "end_id"),
                weights_data_type=cfg.get(model_name, "weight_data_type"),
                tensor_para_size=cfg.getint(model_name, "tensor_para_size"),
                inference_data_type=inference_data_type)
        else:
            # No config.ini found: fall back to the built-in defaults in LYRA_XVERSE_PARAM.
            inference_data_type = self.dtype
            if inference_data_type is None:
                inference_data_type = LYRA_XVERSE_PARAM.weights_data_type
            model_args = dict(
                head_num=LYRA_XVERSE_PARAM.num_heads,
                size_per_head=LYRA_XVERSE_PARAM.size_per_head,
                inter_size=LYRA_XVERSE_PARAM.inter_size,
                layer_num=LYRA_XVERSE_PARAM.num_layers,
                rotary_embedding_dim=LYRA_XVERSE_PARAM.rotary_embedding,
                layernorm_eps=LYRA_XVERSE_PARAM.layernorm_eps,
                vocab_size=LYRA_XVERSE_PARAM.vocab_size,
                start_id=LYRA_XVERSE_PARAM.start_id or tokenizer.bos_token_id,
                end_id=LYRA_XVERSE_PARAM.end_id or tokenizer.eos_token_id,
                weights_data_type=LYRA_XVERSE_PARAM.weights_data_type,
                tensor_para_size=LYRA_XVERSE_PARAM.tensor_para_size,
                inference_data_type=inference_data_type)

        # Update common parameters.
        # Load the C++ model into a PyTorch model.
sm = "sm80" if self.arch == "Ampere": sm = "sm80" elif self.arch == "Volta": sm = "sm70" else: raise Exception(f"unsupported arch: {self.arch}") cu = 'cu11' if self.cuda_version == 11: cu = 'cu11' elif self.cuda_version == 12: cu = 'cu12' else: raise Exception(f"unsupported cuda version: {self.cuda_version}") lib_path = pathlib.Path(__file__).parent / "ftlib" / f"libth_transformer_{sm}_{cu}.so" model_args.update(dict( lib_path=lib_path, model_path=os.path.join(self.model_path, "1-gpu-fp16.bin"), max_seq_len=0, # for position seq embedding pipeline_para_size=LYRA_XVERSE_PARAM.pipeline_para_size, use_gptj_residual=LYRA_XVERSE_PARAM.use_gptj_residual, memopt_mode=self.memopt_mode )) print('[FT][INFO] Load Our FT Highly Optimized XVERSE model') for k, v in model_args.items(): print(f' - {k.ljust(25, ".")}: {v}') # Check sanity and consistency between the model and tokenizer. checklist = ['head_num', 'size_per_head', 'vocab_size', 'layer_num', 'tensor_para_size', 'tensor_para_size', 'weights_data_type'] if None in [model_args[k] for k in checklist]: none_params = [p for p in checklist if model_args[p] is None] print(f'[FT][WARNING] Found None parameters {none_params}. They must ' f'be provided either by config file or CLI arguments.') if model_args['start_id'] != tokenizer.bos_token_id: print('[FT][WARNING] Given start_id is not matched with the bos token ' 'id of the pretrained tokenizer.') if model_args['end_id'] not in (tokenizer.pad_token_id, tokenizer.eos_token_id): print('[FT][WARNING] Given end_id is not matched with neither pad ' 'token id nor eos token id of the pretrained tokenizer.') print(f'Loading model from {self.model_path}') model = XVERSEModel(**model_args) return model, tokenizer def generate(self, prompts: typing.List[str] | str, output_length: int = 512, beam_width: int = 1, top_k: typing.Optional[torch.IntTensor] = 1, top_p: typing.Optional[torch.FloatTensor] = 1.0, beam_search_diversity_rate: typing.Optional[torch.FloatTensor] = 0.0, temperature: typing.Optional[torch.FloatTensor] = 1.0, len_penalty: typing.Optional[torch.FloatTensor] = 0.0, repetition_penalty: typing.Optional[torch.FloatTensor] = 1.0, presence_penalty: typing.Optional[torch.FloatTensor] = None, min_length: typing.Optional[torch.IntTensor] = None, bad_words_list: typing.Optional[torch.IntTensor] = None, do_sample: bool = False, return_output_length: bool = False, return_cum_log_probs: int = 0): # if isinstance(prompts, str): prompts = [prompts, ] inputs = prompts batch_size = len(inputs) ones_int = torch.ones(size=[batch_size], dtype=torch.int32) ones_float = torch.ones(size=[batch_size], dtype=torch.float32) # we must encode the raw prompt text one by one in order to compute the length of the original text. input_token_ids = [self.tokenizer(text, return_tensors="pt").input_ids.int().squeeze() for text in inputs] input_lengths = torch.IntTensor([len(ids) for ids in input_token_ids]) # after got the length of each input text tokens. we can batchfy the input list to a tensor. padding the right. 
        input_token_ids = pad_sequence(input_token_ids, batch_first=True,
                                       padding_value=self.tokenizer.eos_token_id)

        random_seed = None
        if do_sample:
            random_seed = torch.randint(0, 262144, (batch_size,), dtype=torch.long)

        outputs = self.model(start_ids=input_token_ids,
                             start_lengths=input_lengths,
                             output_len=output_length,
                             beam_width=beam_width,
                             top_k=top_k * ones_int,
                             top_p=top_p * ones_float,
                             beam_search_diversity_rate=beam_search_diversity_rate * ones_float,
                             temperature=temperature * ones_float,
                             len_penalty=len_penalty * ones_float,
                             repetition_penalty=repetition_penalty * ones_float,
                             random_seed=random_seed,
                             return_output_length=return_output_length,
                             return_cum_log_probs=return_cum_log_probs)

        if return_cum_log_probs > 0:
            outputs = outputs[0]  # output_token_ids

        # Slice out the generated token ids of the 1st beam result:
        # output = input tokens + generated tokens.
        output_token_ids = [out[0, length:].cpu()
                            for out, length in zip(outputs, input_lengths)]

        output_texts = self.tokenizer.batch_decode(
            output_token_ids, skip_special_tokens=True)

        return output_texts
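

# The block below is a minimal, hypothetical usage sketch of the class above,
# not part of the library API: it instantiates lyraXVERSE and runs one sampled
# generation. The checkpoint path and prompt are placeholders (assumptions);
# the directory is expected to contain config.ini and the converted
# "1-gpu-fp16.bin" weights referenced in load_model_and_tokenizer().
if __name__ == "__main__":
    model = lyraXVERSE(model_path="/path/to/xverse-checkpoint",  # placeholder path
                       tokenizer_path=None,      # falls back to model_path
                       dtype='fp16',
                       memopt_mode=1,
                       arch='Ampere',            # or 'Volta'
                       cuda_version=12)          # or 11

    # Illustrative sampling settings; tune per use case.
    texts = model.generate(["Hello, who are you?"],
                           output_length=128,
                           top_k=30,
                           top_p=0.85,
                           temperature=1.0,
                           repetition_penalty=1.0,
                           do_sample=True)
    print(texts)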