import os
import time
import uuid
from enum import Enum
from threading import Thread
from typing import Any, Iterator, Union, List

from llama2_wrapper.types import (
    Completion,
    CompletionChunk,
    ChatCompletion,
    ChatCompletionChunk,
    # ChatCompletionMessage,
    Message,
    B_INST,
    E_INST,
    B_SYS,
    E_SYS,
)


class LLAMA2_WRAPPER:
    def __init__(
        self,
        model_path: str = "",
        backend_type: str = "llama.cpp",
        max_tokens: int = 4000,
        load_in_8bit: bool = True,
        verbose: bool = False,
    ):
        """Load a llama2 model from `model_path`.

        Args:
            model_path: Path to the model.
            backend_type: Backend for llama2, options: llama.cpp, gptq, transformers.
            max_tokens: Maximum context size.
            load_in_8bit: Use bitsandbytes to run the model in 8-bit mode (only for transformers models).
            verbose: Print verbose output to stderr.

        Raises:
            ValueError: If the model path does not exist.

        Returns:
            A LLAMA2_WRAPPER instance.
        """
        self.model_path = model_path
        self.backend_type = BackendType.get_type(backend_type)
        self.max_tokens = max_tokens
        self.load_in_8bit = load_in_8bit

        self.model = None
        self.tokenizer = None

        self.verbose = verbose

        if self.backend_type is BackendType.LLAMA_CPP:
            print("Running on backend llama.cpp.")
        else:
            import torch

            if torch.cuda.is_available():
                print("Running on GPU with backend torch transformers.")
            else:
                print("GPU CUDA not found.")

        self.default_llamacpp_path = "./models/llama-2-7b-chat.Q4_0.gguf"
        self.default_gptq_path = "./models/Llama-2-7b-Chat-GPTQ"
        # Download the default gguf/gptq model if no model path was given.
        if self.model_path == "":
            print("Model path is empty.")
            if self.backend_type is BackendType.LLAMA_CPP:
                print("Use default llama.cpp model path: " + self.default_llamacpp_path)
                if not os.path.exists(self.default_llamacpp_path):
                    print("Start downloading model to: " + self.default_llamacpp_path)
                    from huggingface_hub import hf_hub_download

                    hf_hub_download(
                        repo_id="TheBloke/Llama-2-7b-Chat-GGUF",
                        filename="llama-2-7b-chat.Q4_0.gguf",
                        local_dir="./models/",
                    )
                else:
                    print("Model exists in ./models/llama-2-7b-chat.Q4_0.gguf.")
                self.model_path = self.default_llamacpp_path
            elif self.backend_type is BackendType.GPTQ:
                print("Use default gptq model path: " + self.default_gptq_path)
                if not os.path.exists(self.default_gptq_path):
                    print("Start downloading model to: " + self.default_gptq_path)
                    from huggingface_hub import snapshot_download

                    snapshot_download(
                        "TheBloke/Llama-2-7b-Chat-GPTQ",
                        local_dir=self.default_gptq_path,
                    )
                else:
                    print("Model exists in " + self.default_gptq_path)
                self.model_path = self.default_gptq_path

        self.init_tokenizer()
        self.init_model()

    def init_model(self):
        if self.model is None:
            self.model = LLAMA2_WRAPPER.create_llama2_model(
                self.model_path,
                self.backend_type,
                self.max_tokens,
                self.load_in_8bit,
                self.verbose,
            )
        if self.backend_type is not BackendType.LLAMA_CPP:
            self.model.eval()

    def init_tokenizer(self):
        if self.backend_type is not BackendType.LLAMA_CPP:
            if self.tokenizer is None:
                self.tokenizer = LLAMA2_WRAPPER.create_llama2_tokenizer(self.model_path)

    @classmethod
    def create_llama2_model(
        cls, model_path, backend_type, max_tokens, load_in_8bit, verbose
    ):
        if backend_type is BackendType.LLAMA_CPP:
            from llama_cpp import Llama

            model = Llama(
                model_path=model_path,
                n_ctx=max_tokens,
                n_batch=max_tokens,
                verbose=verbose,
            )
        elif backend_type is BackendType.GPTQ:
            from auto_gptq import AutoGPTQForCausalLM

            model = AutoGPTQForCausalLM.from_quantized(
                model_path,
                use_safetensors=True,
                trust_remote_code=True,
                device="cuda:0",
                use_triton=False,
                quantize_config=None,
            )
        elif backend_type is BackendType.TRANSFORMERS:
            import torch
            from transformers import AutoModelForCausalLM

            model = AutoModelForCausalLM.from_pretrained(
                model_path,
                device_map="auto",
                torch_dtype=torch.float16,
                load_in_8bit=load_in_8bit,
            )
        else:
            print(f"{backend_type} not implemented.")
            model = None
        return model

    @classmethod
    def create_llama2_tokenizer(cls, model_path):
        from transformers import AutoTokenizer

        tokenizer = AutoTokenizer.from_pretrained(model_path)
        return tokenizer

    def get_token_length(
        self,
        prompt: str,
    ) -> int:
        if self.backend_type is BackendType.LLAMA_CPP:
            input_ids = self.model.tokenize(bytes(prompt, "utf-8"))
            return len(input_ids)
        else:
            input_ids = self.tokenizer([prompt], return_tensors="np")["input_ids"]
            return input_ids.shape[-1]

    def get_input_token_length(
        self,
        message: str,
        chat_history: list[tuple[str, str]] = [],
        system_prompt: str = "",
    ) -> int:
        prompt = get_prompt(message, chat_history, system_prompt)
        return self.get_token_length(prompt)

    def generate(
        self,
        prompt: str,
        max_new_tokens: int = 1000,
        temperature: float = 0.9,
        top_p: float = 1.0,
        top_k: int = 40,
        repetition_penalty: float = 1.0,
        **kwargs: Any,
    ) -> Iterator[str]:
        """Create a generator of response from a prompt.

        Examples:
            >>> llama2_wrapper = LLAMA2_WRAPPER()
            >>> prompt = get_prompt("Hi do you know Pytorch?")
            >>> for response in llama2_wrapper.generate(prompt):
            ...     print(response)

        Args:
            prompt: The prompt to generate text from.
            max_new_tokens: The maximum number of tokens to generate.
            temperature: The temperature to use for sampling.
            top_p: The top-p value to use for sampling.
            top_k: The top-k value to use for sampling.
            repetition_penalty: The penalty to apply to repeated tokens.
            kwargs: All other arguments.

        Yields:
            The generated text.
        """
        if self.backend_type is BackendType.LLAMA_CPP:
            result = self.model(
                prompt=prompt,
                stream=True,
                max_tokens=max_new_tokens,
                top_k=top_k,
                top_p=top_p,
                temperature=temperature,
                repeat_penalty=repetition_penalty,
                **kwargs,
            )
            outputs = []
            for part in result:
                text = part["choices"][0]["text"]
                outputs.append(text)
                yield "".join(outputs)
        else:
            from transformers import TextIteratorStreamer

            inputs = self.tokenizer([prompt], return_tensors="pt").to("cuda")

            streamer = TextIteratorStreamer(
                self.tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True
            )
            generate_kwargs = dict(
                inputs,
                streamer=streamer,
                max_new_tokens=max_new_tokens,
                temperature=temperature,
                top_p=top_p,
                top_k=top_k,
                repetition_penalty=repetition_penalty,
                # num_beams=1,
            )
            generate_kwargs = (
                generate_kwargs if not kwargs else {**generate_kwargs, **kwargs}
            )
            t = Thread(target=self.model.generate, kwargs=generate_kwargs)
            t.start()

            outputs = []
            for text in streamer:
                outputs.append(text)
                yield "".join(outputs)

    def run(
        self,
        message: str,
        chat_history: list[tuple[str, str]] = [],
        system_prompt: str = "",
        max_new_tokens: int = 1000,
        temperature: float = 0.9,
        top_p: float = 1.0,
        top_k: int = 40,
        repetition_penalty: float = 1.0,
    ) -> Iterator[str]:
        """Create a generator of response from a chat message.

        Processes the message into a llama2 prompt with chat history
        and system_prompt for the chatbot.

        Args:
            message: The original chat message to generate text from.
            chat_history: Chat history list from chatbot.
            system_prompt: System prompt for chatbot.
            max_new_tokens: The maximum number of tokens to generate.
            temperature: The temperature to use for sampling.
            top_p: The top-p value to use for sampling.
            top_k: The top-k value to use for sampling.
            repetition_penalty: The penalty to apply to repeated tokens.

        Yields:
            The generated text.
""" prompt = get_prompt(message, chat_history, system_prompt) return self.generate( prompt, max_new_tokens, temperature, top_p, top_k, repetition_penalty ) def __call__( self, prompt: str, stream: bool = False, max_new_tokens: int = 1000, temperature: float = 0.9, top_p: float = 1.0, top_k: int = 40, repetition_penalty: float = 1.0, **kwargs: Any, ) -> Union[str, Iterator[str]]: """Generate text from a prompt. Examples: >>> llama2_wrapper = LLAMA2_WRAPPER() >>> prompt = get_prompt("Hi do you know Pytorch?") >>> print(llama2_wrapper(prompt)) Args: prompt: The prompt to generate text from. stream: Whether to stream the results. max_new_tokens: The maximum number of tokens to generate. temperature: The temperature to use for sampling. top_p: The top-p value to use for sampling. top_k: The top-k value to use for sampling. repetition_penalty: The penalty to apply to repeated tokens. kwargs: all other arguments. Raises: ValueError: If the requested tokens exceed the context window. RuntimeError: If the prompt fails to tokenize or the model fails to evaluate the prompt. Returns: Generated text. """ if self.backend_type is BackendType.LLAMA_CPP: completion_or_chunks = self.model.__call__( prompt, stream=stream, max_tokens=max_new_tokens, temperature=temperature, top_p=top_p, top_k=top_k, repeat_penalty=repetition_penalty, **kwargs, ) if stream: def chunk_generator(chunks): for part in chunks: chunk = part["choices"][0]["text"] yield chunk chunks: Iterator[str] = chunk_generator(completion_or_chunks) return chunks return completion_or_chunks["choices"][0]["text"] else: inputs = self.tokenizer([prompt], return_tensors="pt").input_ids prompt_tokens_len = len(inputs[0]) inputs = inputs.to("cuda") generate_kwargs = dict( inputs=inputs, max_new_tokens=max_new_tokens, temperature=temperature, top_p=top_p, top_k=top_k, repetition_penalty=repetition_penalty, # num_beams=1, ) generate_kwargs = ( generate_kwargs if kwargs is None else {**generate_kwargs, **kwargs} ) if stream: from transformers import TextIteratorStreamer streamer = TextIteratorStreamer( self.tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True, ) generate_kwargs["streamer"] = streamer t = Thread(target=self.model.generate, kwargs=generate_kwargs) t.start() return streamer else: output_ids = self.model.generate( **generate_kwargs, ) # skip prompt, skip special tokens output = self.tokenizer.decode( output_ids[0][prompt_tokens_len:], skip_special_tokens=True ) return output def completion( self, prompt: str, stream: bool = False, max_new_tokens: int = 1000, temperature: float = 0.9, top_p: float = 1.0, top_k: int = 40, repetition_penalty: float = 1.0, **kwargs: Any, ) -> Union[Completion, Iterator[CompletionChunk]]: """For OpenAI compatible API /v1/completions Generate text from a prompt. Examples: >>> llama2_wrapper = LLAMA2_WRAPPER() >>> prompt = get_prompt("Hi do you know Pytorch?") >>> print(llm.completion(prompt)) Args: prompt: The prompt to generate text from. stream: Whether to stream the results. max_new_tokens: The maximum number of tokens to generate. temperature: The temperature to use for sampling. top_p: The top-p value to use for sampling. top_k: The top-k value to use for sampling. repetition_penalty: The penalty to apply to repeated tokens. kwargs: all other arguments. Raises: ValueError: If the requested tokens exceed the context window. RuntimeError: If the prompt fails to tokenize or the model fails to evaluate the prompt. Returns: Response object containing the generated text. 
""" completion_id: str = f"cmpl-{str(uuid.uuid4())}" created: int = int(time.time()) model_name: str = ( self.backend_type + " default model" if self.model_path == "" else self.model_path ) if self.backend_type is BackendType.LLAMA_CPP: completion_or_chunks = self.model.__call__( prompt, stream=stream, max_tokens=max_new_tokens, temperature=temperature, top_p=top_p, top_k=top_k, repeat_penalty=repetition_penalty, **kwargs, ) if stream: chunks: Iterator[CompletionChunk] = completion_or_chunks return chunks return completion_or_chunks else: inputs = self.tokenizer([prompt], return_tensors="pt").input_ids prompt_tokens_len = len(inputs[0]) inputs = inputs.to("cuda") generate_kwargs = dict( inputs=inputs, max_new_tokens=max_new_tokens, temperature=temperature, top_p=top_p, top_k=top_k, repetition_penalty=repetition_penalty, # num_beams=1, ) generate_kwargs = ( generate_kwargs if kwargs is None else {**generate_kwargs, **kwargs} ) if stream: from transformers import TextIteratorStreamer streamer = TextIteratorStreamer( self.tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True, ) generate_kwargs["streamer"] = streamer t = Thread(target=self.model.generate, kwargs=generate_kwargs) t.start() def chunk_generator(chunks): for part in chunks: yield { "id": completion_id, "object": "text_completion", "created": created, "model": model_name, "choices": [ { "text": part, "index": 0, "logprobs": None, "finish_reason": None, } ], } chunks: Iterator[CompletionChunk] = chunk_generator(streamer) return chunks else: output_ids = self.model.generate( **generate_kwargs, ) total_tokens_len = len(output_ids[0]) output = self.tokenizer.decode( output_ids[0][prompt_tokens_len:], skip_special_tokens=True ) completion: Completion = { "id": completion_id, "object": "text_completion", "created": created, "model": model_name, "choices": [ { "text": output, "index": 0, "logprobs": None, "finish_reason": None, } ], "usage": { "prompt_tokens": prompt_tokens_len, "completion_tokens": total_tokens_len - prompt_tokens_len, "total_tokens": total_tokens_len, }, } return completion def chat_completion( self, messages: List[Message], stream: bool = False, max_new_tokens: int = 1000, temperature: float = 0.9, top_p: float = 1.0, top_k: int = 40, repetition_penalty: float = 1.0, **kwargs: Any, ) -> Union[ChatCompletion, Iterator[ChatCompletionChunk]]: """For OpenAI compatible API /v1/chat/completions Generate text from a dialog (chat history). Examples: >>> llama2_wrapper = LLAMA2_WRAPPER() >>> dialog = [ { "role":"system", "content":"You are a helpful, respectful and honest assistant. " },{ "role":"user", "content":"Hi do you know Pytorch?", }, ] >>> print(llm.chat_completion(dialog)) Args: dialog: The dialog (chat history) to generate text from. stream: Whether to stream the results. max_new_tokens: The maximum number of tokens to generate. temperature: The temperature to use for sampling. top_p: The top-p value to use for sampling. top_k: The top-k value to use for sampling. repetition_penalty: The penalty to apply to repeated tokens. kwargs: all other arguments. Raises: ValueError: If the requested tokens exceed the context window. RuntimeError: If the prompt fails to tokenize or the model fails to evaluate the prompt. Returns: Response object containing the generated text. 
""" completion_id: str = f"cmpl-{str(uuid.uuid4())}" created: int = int(time.time()) model_name: str = ( self.backend_type + " default model" if self.model_path == "" else self.model_path ) if self.backend_type is BackendType.LLAMA_CPP: completion_or_chunks = self.model.create_chat_completion( messages, stream=stream, max_tokens=max_new_tokens, temperature=temperature, top_p=top_p, top_k=top_k, repeat_penalty=repetition_penalty, **kwargs, ) if stream: chunks: Iterator[ChatCompletionChunk] = completion_or_chunks return chunks return completion_or_chunks else: prompt = get_prompt_for_dialog(messages) inputs = self.tokenizer([prompt], return_tensors="pt").input_ids prompt_tokens_len = len(inputs[0]) inputs = inputs.to("cuda") generate_kwargs = dict( inputs=inputs, max_new_tokens=max_new_tokens, temperature=temperature, top_p=top_p, top_k=top_k, repetition_penalty=repetition_penalty, # num_beams=1, ) generate_kwargs = ( generate_kwargs if kwargs is None else {**generate_kwargs, **kwargs} ) if stream: from transformers import TextIteratorStreamer streamer = TextIteratorStreamer( self.tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True, ) generate_kwargs["streamer"] = streamer t = Thread(target=self.model.generate, kwargs=generate_kwargs) t.start() def chunk_generator(chunks): yield { "id": "chat" + completion_id, "model": model_name, "created": created, "object": "chat.completion.chunk", "choices": [ { "index": 0, "delta": { "role": "assistant", }, "finish_reason": None, } ], } for part in enumerate(chunks): yield { "id": "chat" + completion_id, "model": model_name, "created": created, "object": "chat.completion.chunk", "choices": [ { "index": 0, "delta": { "content": part, }, "finish_reason": None, } ], } chunks: Iterator[ChatCompletionChunk] = chunk_generator(streamer) return chunks else: output_ids = self.model.generate( **generate_kwargs, ) total_tokens_len = len(output_ids[0]) output = self.tokenizer.decode( output_ids[0][prompt_tokens_len:], skip_special_tokens=True ) chatcompletion: ChatCompletion = { "id": "chat" + completion_id, "object": "chat.completion", "created": created, "model": model_name, "choices": [ { "index": 0, "message": { "role": "assistant", "content": output, }, "finish_reason": None, } ], "usage": { "prompt_tokens": prompt_tokens_len, "completion_tokens": total_tokens_len - prompt_tokens_len, "total_tokens": total_tokens_len, }, } return chatcompletion def get_prompt_for_dialog(dialog: List[Message]) -> str: """Process dialog (chat history) to llama2 prompt for OpenAI compatible API /v1/chat/completions. Examples: >>> dialog = [ { "role":"system", "content":"You are a helpful, respectful and honest assistant. " },{ "role":"user", "content":"Hi do you know Pytorch?", }, ] >>> prompt = get_prompt_for_dialog("Hi do you know Pytorch?") Args: dialog: The dialog (chat history) to generate text from. Yields: prompt string. 
""" # add "<>\n{system_prompt}\n<>\n\n" in first dialog if dialog[0]["role"] == "system": dialog = [ { "role": dialog[1]["role"], "content": B_SYS + dialog[0]["content"] + E_SYS + dialog[1]["content"], } ] + dialog[2:] # check roles assert all([msg["role"] == "user" for msg in dialog[::2]]) and all( [msg["role"] == "assistant" for msg in dialog[1::2]] ), ( "model only supports 'system', 'user' and 'assistant' roles, " "starting with 'system', then 'user' and alternating (u/a/u/a/u...)" ) # add chat history texts = [] for prompt, answer in zip( dialog[::2], dialog[1::2], ): texts.append( f"{B_INST} {(prompt['content']).strip()} {E_INST} {(answer['content']).strip()} " ) # check last message if role is user, then add it to prompt text assert ( dialog[-1]["role"] == "user" ), f"Last message must be from user, got {dialog[-1]['role']}" texts.append(f"{B_INST} {(dialog[-1]['content']).strip()} {E_INST}") return "".join(texts) def get_prompt( message: str, chat_history: list[tuple[str, str]] = [], system_prompt: str = "" ) -> str: """Process message to llama2 prompt with chat history and system_prompt for chatbot. Examples: >>> prompt = get_prompt("Hi do you know Pytorch?") Args: message: The origianl chat message to generate text from. chat_history: Chat history list from chatbot. system_prompt: System prompt for chatbot. Yields: prompt string. """ texts = [f"[INST] <>\n{system_prompt}\n<>\n\n"] for user_input, response in chat_history: texts.append(f"{user_input.strip()} [/INST] {response.strip()} [INST] ") texts.append(f"{message.strip()} [/INST]") return "".join(texts) class BackendType(Enum): UNKNOWN = 0 TRANSFORMERS = 1 GPTQ = 2 LLAMA_CPP = 3 @classmethod def get_type(cls, backend_name: str): backend_type = None backend_name_lower = backend_name.lower() if "transformers" in backend_name_lower: backend_type = BackendType.TRANSFORMERS elif "gptq" in backend_name_lower: backend_type = BackendType.GPTQ elif "cpp" in backend_name_lower: backend_type = BackendType.LLAMA_CPP else: raise Exception("Unknown backend: " + backend_name) # backend_type = BackendType.UNKNOWN return backend_type