from pathlib import Path

import torch
from llama_cookbook.inference.model_utils import load_model as load_model_llamarecipes
from llama_cookbook.inference.model_utils import load_peft_model
from transformers import AutoTokenizer

from src.utils import RankedLogger

log = RankedLogger(__name__, rank_zero_only=True)
def load_model(
    ckpt_path, quantization=None, use_fast_kernels=False, peft_model=False, **kwargs
):
    """Load a Llama model (optionally with a PEFT adapter) and its tokenizer."""
    model = load_model_llamarecipes(
        model_name=ckpt_path,
        quantization=quantization,
        use_fast_kernels=use_fast_kernels,
        device_map="auto",
        **kwargs,
    )
    if peft_model:
        model = load_peft_model(model, peft_model)

    tokenizer = AutoTokenizer.from_pretrained(ckpt_path)
    tokenizer.pad_token = tokenizer.eos_token
    # special_tokens = {"additional_special_tokens": ["<image>"]}
    # tokenizer.add_special_tokens(special_tokens)

    return model, tokenizer
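

# Example usage sketch for load_model. The checkpoint and adapter paths below
# are placeholders, not paths shipped with this repo:
#
#     model, tokenizer = load_model(
#         "meta-llama/Llama-3.1-8B-Instruct",
#         quantization=None,
#         peft_model="outputs/my_peft_adapter",  # or False to use the base model
#     )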
def inference(
    model,
    tokenizer: AutoTokenizer,
    prompt: str,
    add_special_tokens: bool = True,
    temperature: float = 1.0,
    max_new_tokens=1024,
    top_p: float = 1.0,
    top_k: int = 50,
    use_cache: bool = True,
    max_padding_length: int = None,
    do_sample: bool = False,
    min_length: int = None,
    repetition_penalty: float = 1.0,
    length_penalty: int = 1,
    max_prompt_tokens: int = 35_000,
    **kwargs,
):
    """Generate a completion for `prompt`.

    Returns the decoded assistant answer as a string, or the prompt length in tokens
    (an int) if the prompt exceeds `max_prompt_tokens` or generation runs out of GPU memory.

    temperature: float, optional (default=1.0) The value used to modulate the next token probabilities.
    max_new_tokens: int, optional (default=1024) The maximum number of tokens to generate.
    top_p: float, optional (default=1.0) If set to float < 1, only the smallest set of most probable tokens with probabilities that add up to top_p or higher are kept for generation.
    top_k: int, optional (default=50) The number of highest probability vocabulary tokens to keep for top-k filtering.
    use_cache: bool, optional (default=True) Whether or not the model should use the past key/value attentions (if applicable to the model) to speed up decoding.
    max_padding_length: int, optional (default=None) The max padding length used by the tokenizer when padding the prompts.
    do_sample: bool, optional (default=False) Whether or not to use sampling; use greedy decoding otherwise.
    min_length: int, optional (default=None) The minimum length of the sequence to be generated (input prompt + min_new_tokens).
    repetition_penalty: float, optional (default=1.0) The parameter for repetition penalty. 1.0 means no penalty.
    length_penalty: int, optional (default=1) Exponential penalty to the length that is used with beam-based generation.
    max_prompt_tokens: int, optional (default=35_000) Prompts longer than this are not generated from; the prompt length is returned instead.
    """
    if add_special_tokens:
        prompt = f"<|begin_of_text|><|start_header_id|>system<|end_header_id|>\n\nCutting Knowledge Date: December 2023\nToday Date: 26 Jul 2024\n\n<|eot_id|><|start_header_id|>user<|end_header_id|>\n\n{prompt}<|eot_id|><|start_header_id|>assistant<|end_header_id|>"
        # prompt = f"<|begin_of_text|><|start_header_id|>user<|end_header_id|>\n\n{prompt}<|eot_id|><|start_header_id|>assistant<|end_header_id|>"

    batch = tokenizer(
        prompt,
        truncation=True,
        max_length=max_padding_length,
        return_tensors="pt",
    )

    # If the input is too long, return the length of the input instead of generating
    n_tokens = len(batch["input_ids"][0])
    if max_prompt_tokens is not None and n_tokens > max_prompt_tokens:
        return n_tokens

    batch = {k: v.to("cuda") for k, v in batch.items()}

    terminators = [
        tokenizer.eos_token_id,
        tokenizer.convert_tokens_to_ids("<|eot_id|>"),
    ]

    try:
        outputs = model.generate(
            **batch,
            max_new_tokens=max_new_tokens,
            do_sample=do_sample,
            top_p=top_p,
            temperature=temperature,
            min_length=min_length,
            use_cache=use_cache,
            top_k=top_k,
            repetition_penalty=repetition_penalty,
            length_penalty=length_penalty,
            eos_token_id=terminators,
            pad_token_id=tokenizer.eos_token_id,
            **kwargs,
        )
        output_text = tokenizer.decode(outputs[0], skip_special_tokens=False)
        # Keep only the assistant turn and strip the trailing end-of-turn token
        output = output_text.split("<|start_header_id|>assistant<|end_header_id|>")[1]
        output = output.strip()
        output = output.removesuffix("<|eot_id|>")
    except torch.cuda.OutOfMemoryError as e:
        log.error(f"CUDA out of memory error: {e}")
        torch.cuda.empty_cache()
        return n_tokens

    return output
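

# Example usage sketch for inference, assuming `model` and `tokenizer` come from
# load_model above. Callers should handle the int return value, which signals a
# too-long prompt or an out-of-memory failure:
#
#     result = inference(model, tokenizer, "Summarize the following transcript: ...")
#     if isinstance(result, int):
#         print(f"Generation skipped; prompt was {result} tokens long")
#     else:
#         print(result)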
class LlamaInference:
    """Convenience wrapper that loads a Llama model once and exposes it as a callable."""

    def __init__(
        self,
        ckpt_path,
        quantization=None,
        use_fast_kernels=False,
        peft_model=False,
        add_special_tokens: bool = True,
        temperature: float = 1.0,
        max_new_tokens: int = 1024,
        top_p: float = 1.0,
        top_k: int = 50,
        use_cache: bool = True,
        max_padding_length: int = None,
        do_sample: bool = False,
        min_length: int = None,
        repetition_penalty: float = 1.0,
        length_penalty: int = 1,
        max_prompt_tokens: int = 35_000,
        **kwargs,
    ):
        # Check if the LLaMA checkpoint exists
        # if not Path(ckpt_path).exists():
        #     log.warning(f"Model checkpoint does not exist at {ckpt_path}")
        #     return None

        # If a PEFT model is specified, check that it exists
        if peft_model and not Path(peft_model).exists():
            log.warning(f"PEFT model does not exist at {peft_model}")
            return None
        if peft_model:
            log.info(f"PEFT model found at {peft_model}")

        model = load_model_llamarecipes(
            model_name=ckpt_path,
            quantization=quantization,
            use_fast_kernels=use_fast_kernels,
            device_map="auto",
            **kwargs,
        )
        if peft_model:
            model = load_peft_model(model, peft_model)
        model.eval()

        tokenizer = AutoTokenizer.from_pretrained(ckpt_path)
        tokenizer.pad_token = tokenizer.eos_token

        self.model = model
        self.tokenizer = tokenizer
        self.add_special_tokens = add_special_tokens
        self.temperature = temperature
        self.max_new_tokens = max_new_tokens
        self.top_p = top_p
        self.top_k = top_k
        self.use_cache = use_cache
        self.max_padding_length = max_padding_length
        self.do_sample = do_sample
        self.min_length = min_length
        self.repetition_penalty = repetition_penalty
        self.length_penalty = length_penalty
        self.max_prompt_tokens = max_prompt_tokens

    def __call__(self, prompt: str, **kwargs):
        # Build the default generation parameters from the instance attributes
        params = {
            "model": self.model,
            "tokenizer": self.tokenizer,
            "prompt": prompt,
            "add_special_tokens": self.add_special_tokens,
            "temperature": self.temperature,
            "max_new_tokens": self.max_new_tokens,
            "top_p": self.top_p,
            "top_k": self.top_k,
            "use_cache": self.use_cache,
            "max_padding_length": self.max_padding_length,
            "do_sample": self.do_sample,
            "min_length": self.min_length,
            "repetition_penalty": self.repetition_penalty,
            "length_penalty": self.length_penalty,
            "max_prompt_tokens": self.max_prompt_tokens,
        }
        # Override the defaults with any kwargs passed at call time
        params.update(kwargs)

        return inference(**params)
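

# Minimal sketch of how the wrapper might be used; the checkpoint path below is a
# placeholder and must point to an actual Llama checkpoint available to you.
if __name__ == "__main__":
    llama = LlamaInference(
        ckpt_path="meta-llama/Llama-3.1-8B-Instruct",  # placeholder checkpoint
        max_new_tokens=256,
        do_sample=False,
    )
    # Per-call overrides are forwarded to inference() via **kwargs
    answer = llama("What is the capital of France?", temperature=1.0)
    print(answer)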