import logging
import os
from pathlib import Path
from typing import List, Optional, Tuple
import torch
from comet_ml import API
from langchain.llms import HuggingFacePipeline
from peft import LoraConfig, PeftConfig, PeftModel
from transformers import (
AutoModelForCausalLM,
AutoTokenizer,
BitsAndBytesConfig,
StoppingCriteria,
StoppingCriteriaList,
TextIteratorStreamer,
pipeline,
)
from financial_bot import constants
from financial_bot.utils import MockedPipeline
logger = logging.getLogger(__name__)
def download_from_model_registry(
model_id: str, cache_dir: Optional[Path] = None
) -> Path:
"""
    Downloads a model from the Comet ML model registry.
Args:
model_id (str): The ID of the model to download, in the format "workspace/model_name:version".
cache_dir (Optional[Path]): The directory to cache the downloaded model in. Defaults to the value of
`constants.CACHE_DIR`.
Returns:
Path: The path to the downloaded model directory.
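
    Example (illustrative; the workspace, model name, and version are placeholders):
        model_dir = download_from_model_registry(
            model_id="my-workspace/financial-assistant-falcon-7b:1.0.0"
        )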
"""
if cache_dir is None:
cache_dir = constants.CACHE_DIR
output_folder = cache_dir / "models" / model_id
already_downloaded = output_folder.exists()
if not already_downloaded:
workspace, model_id = model_id.split("/")
model_name, version = model_id.split(":")
api = API()
model = api.get_model(workspace=workspace, model_name=model_name)
model.download(version=version, output_folder=output_folder, expand=True)
else:
logger.info(f"Model {model_id=} already downloaded to: {output_folder}")
subdirs = [d for d in output_folder.iterdir() if d.is_dir()]
if len(subdirs) == 1:
model_dir = subdirs[0]
else:
        raise RuntimeError(
            "There should be only one directory inside the model folder. "
            f"Check the downloaded model at: {output_folder}"
        )
logger.info(f"Model {model_id=} downloaded from the registry to: {model_dir}")
return model_dir
class StopOnTokens(StoppingCriteria):
"""
A stopping criteria that stops generation when a specific token is generated.
Args:
stop_ids (List[int]): A list of token ids that will trigger the stopping criteria.
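
    Example (illustrative; assumes an already loaded `tokenizer`):
        stopping_criteria = StoppingCriteriaList(
            [StopOnTokens(stop_ids=[tokenizer.eos_token_id])]
        )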
"""
def __init__(self, stop_ids: List[int]):
super().__init__()
self._stop_ids = stop_ids
def __call__(
self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs
) -> bool:
"""
Check if the last generated token is in the stop_ids list.
Args:
input_ids (torch.LongTensor): The input token ids.
scores (torch.FloatTensor): The scores of the generated tokens.
Returns:
bool: True if the last generated token is in the stop_ids list, False otherwise.
"""
for stop_id in self._stop_ids:
if input_ids[0][-1] == stop_id:
return True
return False
def build_huggingface_pipeline(
llm_model_id: str,
llm_lora_model_id: str,
max_new_tokens: int = constants.LLM_INFERNECE_MAX_NEW_TOKENS,
temperature: float = constants.LLM_INFERENCE_TEMPERATURE,
gradient_checkpointing: bool = False,
use_streamer: bool = False,
cache_dir: Optional[Path] = None,
debug: bool = False,
) -> Tuple[HuggingFacePipeline, Optional[TextIteratorStreamer]]:
"""
    Builds a HuggingFace pipeline for text generation using a custom LLM plus a fine-tuned LoRA checkpoint.
Args:
llm_model_id (str): The ID or path of the LLM model.
llm_lora_model_id (str): The ID or path of the LLM LoRA model.
        max_new_tokens (int, optional): The maximum number of new tokens to generate.
            Defaults to `constants.LLM_INFERNECE_MAX_NEW_TOKENS`.
        temperature (float, optional): The temperature to use for sampling.
            Defaults to `constants.LLM_INFERENCE_TEMPERATURE`.
gradient_checkpointing (bool, optional): Whether to use gradient checkpointing. Defaults to False.
use_streamer (bool, optional): Whether to use a text iterator streamer. Defaults to False.
cache_dir (Optional[Path], optional): The directory to use for caching. Defaults to None.
debug (bool, optional): Whether to use a mocked pipeline for debugging. Defaults to False.
Returns:
Tuple[HuggingFacePipeline, Optional[TextIteratorStreamer]]: A tuple containing the HuggingFace pipeline
and the text iterator streamer (if used).
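
    Example (illustrative; the LoRA model ID is a placeholder):
        hf_pipeline, streamer = build_huggingface_pipeline(
            llm_model_id="tiiuae/falcon-7b-instruct",
            llm_lora_model_id="my-workspace/financial-assistant-falcon-7b:1.0.0",
            use_streamer=True,
        )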
"""
if debug is True:
return (
HuggingFacePipeline(
            pipeline=MockedPipeline(f=lambda _: "You are doing great!")
),
None,
)
model, tokenizer, _ = build_qlora_model(
pretrained_model_name_or_path=llm_model_id,
peft_pretrained_model_name_or_path=llm_lora_model_id,
gradient_checkpointing=gradient_checkpointing,
cache_dir=cache_dir,
)
model.eval()
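    # When streaming, yield tokens as they are generated and stop generation on the EOS token.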
if use_streamer:
streamer = TextIteratorStreamer(
tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True
)
stop_on_tokens = StopOnTokens(stop_ids=[tokenizer.eos_token_id])
stopping_criteria = StoppingCriteriaList([stop_on_tokens])
else:
streamer = None
stopping_criteria = StoppingCriteriaList([])
pipe = pipeline(
"text-generation",
model=model,
tokenizer=tokenizer,
max_new_tokens=max_new_tokens,
temperature=temperature,
streamer=streamer,
stopping_criteria=stopping_criteria,
)
hf = HuggingFacePipeline(pipeline=pipe)
return hf, streamer
def build_qlora_model(
pretrained_model_name_or_path: str = "tiiuae/falcon-7b-instruct",
peft_pretrained_model_name_or_path: Optional[str] = None,
gradient_checkpointing: bool = True,
cache_dir: Optional[Path] = None,
) -> Tuple[AutoModelForCausalLM, AutoTokenizer, PeftConfig]:
"""
    Function that builds a QLoRA LLM model based on the given HuggingFace name:
    1. Create and prepare the bitsandbytes configuration for QLoRA's quantization.
    2. Download, load, and quantize Falcon-7B on the fly.
    3. Create and prepare the LoRA configuration.
    4. Load and configure Falcon-7B's tokenizer.
Args:
pretrained_model_name_or_path (str): The name or path of the pretrained model to use.
peft_pretrained_model_name_or_path (Optional[str]): The name or path of the PEFT pretrained model to use.
gradient_checkpointing (bool): Whether to use gradient checkpointing or not.
cache_dir (Optional[Path]): The directory to cache the downloaded models.
Returns:
Tuple[AutoModelForCausalLM, AutoTokenizer, PeftConfig]:
A tuple containing the QLoRA LLM model, tokenizer, and PEFT config.
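
    Example (illustrative; the PEFT model ID is a placeholder in the Comet ML registry format):
        model, tokenizer, lora_config = build_qlora_model(
            pretrained_model_name_or_path="tiiuae/falcon-7b-instruct",
            peft_pretrained_model_name_or_path="my-workspace/financial-assistant-falcon-7b:1.0.0",
        )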
"""
bnb_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_use_double_quant=True,
bnb_4bit_quant_type="nf4",
bnb_4bit_compute_dtype=torch.bfloat16,
)
model = AutoModelForCausalLM.from_pretrained(
pretrained_model_name_or_path,
revision="main",
quantization_config=bnb_config,
device_map="auto",
trust_remote_code=False,
cache_dir=str(cache_dir) if cache_dir else None,
)
tokenizer = AutoTokenizer.from_pretrained(
pretrained_model_name_or_path,
trust_remote_code=False,
truncation=True,
cache_dir=str(cache_dir) if cache_dir else None,
)
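    # Add a pad token if the tokenizer does not define one (e.g., Falcon's tokenizer)
    # and resize the embedding matrix to match the new vocabulary size.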
if tokenizer.pad_token_id is None:
tokenizer.add_special_tokens({"pad_token": "<|pad|>"})
with torch.no_grad():
model.resize_token_embeddings(len(tokenizer))
model.config.pad_token_id = tokenizer.pad_token_id
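    # Either attach an existing LoRA adapter (from a local directory or the Comet ML
    # model registry) or create a fresh LoRA config for fine-tuning.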
if peft_pretrained_model_name_or_path:
is_model_name = not os.path.isdir(peft_pretrained_model_name_or_path)
if is_model_name:
logger.info(
f"Downloading {peft_pretrained_model_name_or_path} from CometML Model Registry:"
)
peft_pretrained_model_name_or_path = download_from_model_registry(
model_id=peft_pretrained_model_name_or_path,
cache_dir=cache_dir,
)
logger.info(f"Loading Lora Confing from: {peft_pretrained_model_name_or_path}")
lora_config = LoraConfig.from_pretrained(peft_pretrained_model_name_or_path)
        assert (
            lora_config.base_model_name_or_path == pretrained_model_name_or_path
        ), (
            "The LoRA model was trained on a different base model than the one requested: "
            f"{lora_config.base_model_name_or_path} != {pretrained_model_name_or_path}"
        )
logger.info(f"Loading Peft Model from: {peft_pretrained_model_name_or_path}")
model = PeftModel.from_pretrained(model, peft_pretrained_model_name_or_path)
else:
lora_config = LoraConfig(
lora_alpha=16,
lora_dropout=0.1,
r=64,
bias="none",
task_type="CAUSAL_LM",
target_modules=["query_key_value"],
)
if gradient_checkpointing:
model.gradient_checkpointing_enable()
model.config.use_cache = (
False # Gradient checkpointing is not compatible with caching.
)
else:
model.gradient_checkpointing_disable()
model.config.use_cache = True # It is good practice to enable caching when using the model for inference.
return model, tokenizer, lora_config