# coding:utf-8
from threading import Thread
from typing import Any, Iterator
class LLAMA2_WRAPPER:
def __init__(self, config: dict = {}):
self.config = config
self.model = None
self.tokenizer = None
def init_model(self):
if self.model is None:
self.model = LLAMA2_WRAPPER.create_llama2_model(
self.config,
)
if not self.config.get("llama_cpp"):
self.model.eval()
def init_tokenizer(self):
if self.tokenizer is None and not self.config.get("llama_cpp"):
self.tokenizer = LLAMA2_WRAPPER.create_llama2_tokenizer(self.config)
@classmethod
def create_llama2_model(cls, config):
model_name = config.get("model_name")
load_in_8bit = config.get("load_in_8bit", True)
load_in_4bit = config.get("load_in_4bit", False)
llama_cpp = config.get("llama_cpp", False)
if llama_cpp:
from llama_cpp import Llama
model = Llama(
model_path=model_name,
n_ctx=config.get("MAX_INPUT_TOKEN_LENGTH"),
n_batch=config.get("MAX_INPUT_TOKEN_LENGTH"),
)
elif load_in_4bit:
from auto_gptq import AutoGPTQForCausalLM
model = AutoGPTQForCausalLM.from_quantized(
model_name,
use_safetensors=True,
trust_remote_code=True,
device="cuda:0",
use_triton=False,
quantize_config=None,
)
else:
import torch
from transformers import AutoModelForCausalLM
model = AutoModelForCausalLM.from_pretrained(
model_name,
device_map="auto",
torch_dtype=torch.float16,
load_in_8bit=load_in_8bit,
)
return model
@classmethod
def create_llama2_tokenizer(cls, config):
model_name = config.get("model_name")
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained(model_name)
return tokenizer
def get_token_length(
self,
prompt: str,
) -> int:
if self.config.get("llama_cpp"):
input_ids = self.model.tokenize(bytes(prompt, "utf-8"))
return len(input_ids)
else:
input_ids = self.tokenizer([prompt], return_tensors="np")["input_ids"]
return input_ids.shape[-1]
def get_input_token_length(
self, message: str, chat_history: list[tuple[str, str]], system_prompt: str
) -> int:
prompt = get_prompt(message, chat_history, system_prompt)
return self.get_token_length(prompt)
def generate(
self,
prompt: str,
max_new_tokens: int = 1024,
temperature: float = 0.8,
top_p: float = 0.95,
top_k: int = 50,
) -> Iterator[str]:
if self.config.get("llama_cpp"):
inputs = self.model.tokenize(bytes(prompt, "utf-8"))
generate_kwargs = dict(
top_p=top_p,
top_k=top_k,
temp=temperature,
)
generator = self.model.generate(inputs, **generate_kwargs)
outputs = []
answer_message =''
new_tokens = []
for token in generator:
if token!='':
try:
new_tokens.append(token)
b_text = self.model.detokenize(new_tokens)
# b_text = self.model.decode(new_tokens)
answer_message+=str(b_text, encoding="utf-8")
new_tokens = []
except:
pass
else:
yield answer_message
break
if 'Human:' in answer_message:
answer_message = answer_message.split('Human:')[0]
yield answer_message
break
if token == self.model.token_eos():
yield answer_message
break
yield answer_message
else:
from transformers import TextIteratorStreamer
inputs = self.tokenizer([prompt], return_tensors="pt").to("cuda")
streamer = TextIteratorStreamer(
self.tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True
)
generate_kwargs = dict(
inputs,
streamer=streamer,
max_new_tokens=max_new_tokens,
do_sample=True,
top_p=top_p,
top_k=top_k,
temperature=temperature,
num_beams=1,
)
t = Thread(target=self.model.generate, kwargs=generate_kwargs)
t.start()
outputs = []
for text in streamer:
outputs.append(text)
yield "".join(outputs)
def run(
self,
message: str,
chat_history: list[tuple[str, str]],
system_prompt: str,
max_new_tokens: int = 1024,
temperature: float = 0.3,
top_p: float = 0.95,
top_k: int = 50,
) -> Iterator[str]:
prompt = get_prompt(message, chat_history, system_prompt)
return self.generate(prompt, max_new_tokens, temperature, top_p, top_k)
def __call__(
self,
prompt: str,
**kwargs: Any,
) -> str:
if self.config.get("llama_cpp"):
return self.model.__call__(prompt, **kwargs)["choices"][0]["text"]
else:
inputs = self.tokenizer([prompt], return_tensors="pt").input_ids.to("cuda")
output = self.model.generate(inputs=inputs, **kwargs)
return self.tokenizer.decode(output[0])
def get_prompt(
message: str, chat_history: list[tuple[str, str]], system_prompt: str
) -> str:
prompt = ''
for user_input, response in chat_history:
prompt += "Human: " + user_input.strip()+"\nAssistant: " + response.strip()+"\n"
prompt += "Human: " + message.strip() +"\nAssistant: "
prompt = prompt[-2048:]
if len(system_prompt)>0:
prompt = 'System: '+system_prompt.strip()+'\n'+ prompt
return prompt