Spaces:
Paused
Paused
import torch | |
import logging | |
from auto_gptq import AutoGPTQForCausalLM | |
from huggingface_hub import hf_hub_download | |
from langchain.llms import LlamaCpp, HuggingFacePipeline | |
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler | |
from transformers import ( | |
AutoModelForCausalLM, | |
AutoTokenizer, | |
LlamaForCausalLM, | |
LlamaTokenizer, | |
GenerationConfig, | |
pipeline, | |
TextStreamer | |
) | |
torch.set_grad_enabled(False) | |
from constants import CONTEXT_WINDOW_SIZE, MAX_NEW_TOKENS, N_GPU_LAYERS, N_BATCH, MODELS_PATH | |
def load_quantized_model_gguf_ggml(model_id, model_basename, device_type, logging, stream = False, callbacks = []): | |
""" | |
Load a GGUF/GGML quantized model using LlamaCpp. | |
This function attempts to load a GGUF/GGML quantized model using the LlamaCpp library. | |
If the model is of type GGML, and newer version of LLAMA-CPP is used which does not support GGML, | |
it logs a message indicating that LLAMA-CPP has dropped support for GGML. | |
Parameters: | |
- model_id (str): The identifier for the model on HuggingFace Hub. | |
- model_basename (str): The base name of the model file. | |
- device_type (str): The type of device where the model will run, e.g., 'mps', 'cuda', etc. | |
- logging (logging.Logger): Logger instance for logging messages. | |
Returns: | |
- LlamaCpp: An instance of the LlamaCpp model if successful, otherwise None. | |
Notes: | |
- The function uses the `hf_hub_download` function to download the model from the HuggingFace Hub. | |
- The number of GPU layers is set based on the device type. | |
""" | |
try: | |
logging.info("Using Llamacpp for GGUF/GGML quantized models") | |
model_path = hf_hub_download( | |
repo_id=model_id, | |
filename=model_basename, | |
resume_download=True, | |
cache_dir=MODELS_PATH, | |
) | |
kwargs = { | |
"model_path": model_path, | |
"n_ctx": CONTEXT_WINDOW_SIZE, | |
"max_tokens": MAX_NEW_TOKENS, | |
"n_batch": N_BATCH, | |
# set this based on your GPU & CPU RAM | |
} | |
if device_type.lower() == "mps": | |
kwargs["n_gpu_layers"] = 1 | |
if device_type.lower() == "cuda": | |
kwargs["n_gpu_layers"] = N_GPU_LAYERS # set this based on your GPU | |
kwargs["stream"] = stream | |
if stream == True: | |
kwargs["callbacks"] = callbacks | |
return LlamaCpp(**kwargs) | |
except: | |
if "ggml" in model_basename: | |
logging.INFO("If you were using GGML model, LLAMA-CPP Dropped Support, Use GGUF Instead") | |
return None | |
def load_quantized_model_qptq(model_id, model_basename, device_type, logging): | |
""" | |
Load a GPTQ quantized model using AutoGPTQForCausalLM. | |
This function loads a quantized model that ends with GPTQ and may have variations | |
of .no-act.order or .safetensors in their HuggingFace repo. | |
Parameters: | |
- model_id (str): The identifier for the model on HuggingFace Hub. | |
- model_basename (str): The base name of the model file. | |
- device_type (str): The type of device where the model will run. | |
- logging (logging.Logger): Logger instance for logging messages. | |
Returns: | |
- model (AutoGPTQForCausalLM): The loaded quantized model. | |
- tokenizer (AutoTokenizer): The tokenizer associated with the model. | |
Notes: | |
- The function checks for the ".safetensors" ending in the model_basename and removes it if present. | |
""" | |
# The code supports all huggingface models that ends with GPTQ and have some variation | |
# of .no-act.order or .safetensors in their HF repo. | |
logging.info("Using AutoGPTQForCausalLM for quantized models") | |
if ".safetensors" in model_basename: | |
# Remove the ".safetensors" ending if present | |
model_basename = model_basename.replace(".safetensors", "") | |
tokenizer = AutoTokenizer.from_pretrained(model_id, use_fast=True) | |
logging.info("Tokenizer loaded") | |
model = AutoGPTQForCausalLM.from_quantized( | |
model_id, | |
model_basename=model_basename, | |
use_safetensors=True, | |
trust_remote_code=True, | |
device_map="auto", | |
use_triton=False, | |
quantize_config=None, | |
) | |
return model, tokenizer | |
def load_full_model(model_id, model_basename, device_type, logging): | |
""" | |
Load a full model using either LlamaTokenizer or AutoModelForCausalLM. | |
This function loads a full model based on the specified device type. | |
If the device type is 'mps' or 'cpu', it uses LlamaTokenizer and LlamaForCausalLM. | |
Otherwise, it uses AutoModelForCausalLM. | |
Parameters: | |
- model_id (str): The identifier for the model on HuggingFace Hub. | |
- model_basename (str): The base name of the model file. | |
- device_type (str): The type of device where the model will run. | |
- logging (logging.Logger): Logger instance for logging messages. | |
Returns: | |
- model (Union[LlamaForCausalLM, AutoModelForCausalLM]): The loaded model. | |
- tokenizer (Union[LlamaTokenizer, AutoTokenizer]): The tokenizer associated with the model. | |
Notes: | |
- The function uses the `from_pretrained` method to load both the model and the tokenizer. | |
- Additional settings are provided for NVIDIA GPUs, such as loading in 4-bit and setting the compute dtype. | |
""" | |
if device_type.lower() in ["mps", "cpu"]: | |
logging.info("Using LlamaTokenizer") | |
tokenizer = LlamaTokenizer.from_pretrained(model_id, cache_dir="./models/") | |
model = LlamaForCausalLM.from_pretrained(model_id, cache_dir="./models/") | |
else: | |
logging.info("Using AutoModelForCausalLM for full models") | |
tokenizer = AutoTokenizer.from_pretrained(model_id, cache_dir="./models/") | |
logging.info("Tokenizer loaded") | |
model = AutoModelForCausalLM.from_pretrained( | |
model_id, | |
device_map="auto", | |
torch_dtype=torch.float16, | |
low_cpu_mem_usage=True, | |
cache_dir=MODELS_PATH, | |
# trust_remote_code=True, # set these if you are using NVIDIA GPU | |
# load_in_4bit=True, | |
# bnb_4bit_quant_type="nf4", | |
# bnb_4bit_compute_dtype=torch.float16, | |
max_memory={0: "15GB"} # Uncomment this line with you encounter CUDA out of memory errors | |
) | |
model.tie_weights() | |
return model, tokenizer | |
def load_model(device_type, model_id, model_basename=None, LOGGING=logging, stream=False, callbacks = []): | |
""" | |
Select a model for text generation using the HuggingFace library. | |
If you are running this for the first time, it will download a model for you. | |
subsequent runs will use the model from the disk. | |
Args: | |
device_type (str): Type of device to use, e.g., "cuda" for GPU or "cpu" for CPU. | |
model_id (str): Identifier of the model to load from HuggingFace's model hub. | |
model_basename (str, optional): Basename of the model if using quantized models. | |
Defaults to None. | |
Returns: | |
HuggingFacePipeline: A pipeline object for text generation using the loaded model. | |
Raises: | |
ValueError: If an unsupported model or device type is provided. | |
""" | |
logging.info(f"Loading Model: {model_id}, on: {device_type}") | |
logging.info("This action can take a few minutes!") | |
if model_basename is not None: | |
if ".gguf" in model_basename.lower(): | |
llm = load_quantized_model_gguf_ggml(model_id, model_basename, device_type, LOGGING, stream, callbacks) | |
return llm | |
elif ".ggml" in model_basename.lower(): | |
model, tokenizer = load_quantized_model_gguf_ggml(model_id, model_basename, device_type, LOGGING) | |
else: | |
model, tokenizer = load_quantized_model_qptq(model_id, model_basename, device_type, LOGGING) | |
else: | |
model, tokenizer = load_full_model(model_id, model_basename, device_type, LOGGING) | |
# Load configuration from the model to avoid warnings | |
generation_config = GenerationConfig.from_pretrained(model_id) | |
# see here for details: | |
# https://huggingface.co/docs/transformers/ | |
# main_classes/text_generation#transformers.GenerationConfig.from_pretrained.returns | |
# Create a pipeline for text generation | |
streamer = TextStreamer(tokenizer, skip_prompt=True) | |
pipe = pipeline( | |
"text-generation", | |
model=model, | |
tokenizer=tokenizer, | |
max_length=50, | |
temperature=0.15, | |
top_p=0.1, | |
top_k=40, | |
repetition_penalty=1.0, | |
generation_config=generation_config, | |
streamer=streamer, | |
# callbacks=callbacks | |
) | |
local_llm = HuggingFacePipeline(pipeline=pipe) | |
logging.info("Local LLM Loaded") | |
return [local_llm, streamer] | |