# llm/llamacpp/lc_model.py
import os
import logging
from abc import ABC
import requests
import yaml
from langchain.prompts import PromptTemplate
from langchain_community.llms import LlamaCpp
from llm.config import config
from llm.lc_interface import LCInterface
# print(os.getcwd())
print("Current path : ", os.path.dirname(os.path.realpath(__file__)))
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# every module logs to a file following the template "logs/chelsea_{module_name}_{dir_name}.log"
file_handler = logging.FileHandler("logs/chelsea_llm_llamacpp.log")
formatted = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
file_handler.setFormatter(formatted)
logger.addHandler(file_handler)
try:
    os.chdir('/home/user/app/llm/')
except FileNotFoundError:
    print("Error: directory '/home/user/app/llm/' does not exist. Staying in the current working directory.")
work_dir = os.getcwd()
models_dir = os.path.join(work_dir, "models")
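# Assumption: the models directory may not exist in a fresh environment; create it so the
# model downloads below have somewhere to write.
os.makedirs(models_dir, exist_ok=True)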
class LC_TinyLlama(LCInterface, ABC):
def __init__(self, prompt_entity: str, prompt_id: int = 0):
self.prompt_entity = prompt_entity
self.prompt_id = prompt_id
self.model_config = config["LC_TinyLlama-1.1B-Chat-v1.0-GGUF"]
        path_to_model = os.path.join(models_dir, self.model_config["model_name"])
        if os.path.exists(path_to_model):
            print(f"Model file {path_to_model} already exists. Skipping download.")
            logger.info(f"Model file {path_to_model} already exists. Skipping download.")
        else:
            try:
                get_file = requests.get(self.model_config["model_url"])
                if get_file.status_code == 200:
                    # the "with" block closes the file, no explicit close() is needed
                    with open(path_to_model, "wb") as f:
                        f.write(get_file.content)
                    logger.info("Model file successfully recorded")
                else:
                    logger.error(f"Model download failed with status code {get_file.status_code}")
            except OSError as e:
                print(f"Error while writing the model file to {models_dir} : {e}")
                logger.error(msg="Error while writing the model file to the models directory", exc_info=e)
@staticmethod
def __read_yaml():
try:
yaml_file = os.path.join(work_dir, 'prompts.yaml')
with open(yaml_file, 'r') as file:
data = yaml.safe_load(file)
return data
        except Exception as e:
            print(f"Reading prompts.yaml failed : {e}")
            logger.error(msg="Reading prompts.yaml failed", exc_info=e)
def execution(self):
try:
data = self.__read_yaml()
            # prompt_id selects which prompt from prompts.yaml is used (0 is the first one)
            prompts = data["prompts"][self.prompt_id]
            template = prompts["prompt_template"]
            prompt = PromptTemplate(template=template, input_variables=["entity"])
llm = LlamaCpp(
model_path=os.path.join(models_dir, self.model_config["model_name"]),
temperature=self.model_config["temperature"],
max_tokens=self.model_config["max_tokens"],
top_p=self.model_config["top_p"],
top_k=self.model_config["top_k"],
# callback_manager=callback_manager,
verbose=True, # Verbose is required to pass to the callback manager
)
logger.info(f"Check llm : {llm}")
llm_chain = prompt | llm
            output = llm_chain.invoke({"entity": self.prompt_entity})  # key matches the template's "entity" input variable
return output
        except Exception as e:
            print(f"Execution failed : {e}")
            logger.critical(msg="Execution failed", exc_info=e)
    def clear_llm(self, unused_model_dict, current_lc):
        # delete every model file in the dictionary except the one currently in use
        if unused_model_dict is not None and len(unused_model_dict) >= 1:
            for key, value in unused_model_dict.items():
                # only remove files that still exist and do not belong to the current model
                if os.path.exists(value) and key != current_lc:
                    os.remove(value)
                    logger.info(f"Successfully deleted file {value}")
        else:
            logger.info("Dictionary of unused models is empty or None")
    def get_unused(self, current_lc):
        # return the first model file in models_dir that is not the currently used one
        if len(os.listdir(models_dir)) > 1:
            for item in os.listdir(models_dir):
                if item != current_lc:
                    return {item: os.path.join(models_dir, item)}
        return None
def __str__(self):
return f"prompt_entity={self.prompt_entity}, prompt_id={self.prompt_id}"
def __repr__(self):
return f"{self.__class__.__name__}(prompt_entity: {type(self.prompt_entity)} = {self.prompt_entity}, prompt_id: {type(self.prompt_id)} = {self.prompt_id})"
class LC_Phi3(LCInterface, ABC):
def __init__(self, prompt_entity: str, prompt_id: int = 0):
self.prompt_entity = prompt_entity
self.prompt_id = prompt_id
self.model_config = config["LC_Phi-3-mini-4k-instruct-gguf"]
        path_to_model = os.path.join(models_dir, self.model_config["model_name"])
        if os.path.exists(path_to_model):
            logger.info(f"Model file {path_to_model} already exists. Skipping download.")
        else:
            try:
                get_file = requests.get(self.model_config["model_url"])
                if get_file.status_code == 200:
                    # the "with" block closes the file, no explicit close() is needed
                    with open(path_to_model, "wb") as f:
                        f.write(get_file.content)
                    logger.info("Model file successfully recorded")
                else:
                    logger.error(f"Model download failed with status code {get_file.status_code}")
            except OSError as e:
                print(f"Error while writing the model file to {models_dir} : {e}")
                logger.error(msg="Error while writing the model file to the models directory", exc_info=e)
@staticmethod
def __read_yaml():
try:
yaml_file = os.path.join(work_dir, 'prompts.yaml')
with open(yaml_file, 'r') as file:
data = yaml.safe_load(file)
return data
        except Exception as e:
            print(f"Reading prompts.yaml failed : {e}")
            logger.error(msg="Reading prompts.yaml failed", exc_info=e)
def execution(self):
try:
data = self.__read_yaml()
            # prompt_id selects which prompt from prompts.yaml is used (0 is the first one)
            prompts = data["prompts"][self.prompt_id]
            template = prompts["prompt_template"]
            prompt = PromptTemplate(template=template, input_variables=["entity"])
llm = LlamaCpp(
model_path=os.path.join(models_dir, self.model_config["model_name"]),
temperature=self.model_config["temperature"],
max_tokens=self.model_config["max_tokens"],
top_p=self.model_config["top_p"],
top_k=self.model_config["top_k"],
# callback_manager=callback_manager,
verbose=True, # Verbose is required to pass to the callback manager
)
logger.info(f"Check llm : {llm}")
llm_chain = prompt | llm
            output = llm_chain.invoke({"entity": self.prompt_entity})  # key matches the template's "entity" input variable
return output
        except Exception as e:
            print(f"Execution failed : {e}")
            logger.critical(msg="Execution failed", exc_info=e)
    def clear_llm(self, unused_model_dict, current_lc):
        # delete every model file in the dictionary except the one currently in use
        if unused_model_dict is not None and len(unused_model_dict) >= 1:
            for key, value in unused_model_dict.items():
                # only remove files that still exist and do not belong to the current model
                if os.path.exists(value) and key != current_lc:
                    os.remove(value)
                    logger.info(f"Successfully deleted file {value}")
        else:
            logger.info("Dictionary of unused models is empty or None")
    def get_unused(self, current_lc):
        # return the first model file in models_dir that is not the currently used one
        if len(os.listdir(models_dir)) > 1:
            for item in os.listdir(models_dir):
                if item != current_lc:
                    return {item: os.path.join(models_dir, item)}
        return None
def __str__(self):
return f"prompt_entity={self.prompt_entity}, prompt_id={self.prompt_id}"
def __repr__(self):
return f"{self.__class__.__name__}(prompt_entity: {type(self.prompt_entity)} = {self.prompt_entity}, prompt_id: {type(self.prompt_id)} = {self.prompt_id})"