|
import os |
|
import logging |
|
from abc import ABC |
|
|
|
import requests |
|
|
|
from langchain_community.llms import LlamaCpp |
|
|
|
from llm.utils.config import config |
|
from llm.utils.lc_interface import LCInterface |
|
|
|
logger = logging.getLogger(__name__) |
|
logger.setLevel(logging.ERROR) |
|
|
|
file_handler = logging.FileHandler( |
|
"logs/chelsea_llm_llamacpp.log") |
|
logger.setLevel(logging.INFO) |
|
|
|
formatted = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") |
|
file_handler.setFormatter(formatted) |
|
logger.addHandler(file_handler) |
|
|
|
work_dir = os.getcwd() |
|
models_dir = os.path.join(work_dir, "llm/models") |
|
|
|
|
|
class LC_TinyLlama(LCInterface, ABC): |
|
def __init__(self): |
|
self.model_config = config["LC_TinyLlama-1.1B-Chat-v1.0-GGUF"] |
|
|
|
try: |
|
get_file = requests.get(self.model_config["model_url"]) |
|
if get_file.status_code == 200: |
|
path_to_model = os.path.join(models_dir, self.model_config["model_name"]) |
|
with open(path_to_model, "wb") as f: |
|
f.write(get_file.content) |
|
logger.info("Model file successfully recorded") |
|
f.close() |
|
except FileExistsError: |
|
print(f"Model file {path_to_model} already exists. Skipping download.") |
|
logger.info(f"Model file {path_to_model} already exists. Skipping download.") |
|
except OSError as e: |
|
print(f"Error while writing a file to directory : {e}") |
|
logger.error(msg="Error while write a file to directory", exc_info=e) |
|
|
|
self.llm = LlamaCpp( |
|
model_path=os.path.join(models_dir, self.model_config["model_name"]), |
|
temperature=self.model_config["temperature"], |
|
max_tokens=self.model_config["max_tokens"], |
|
top_p=self.model_config["top_p"], |
|
top_k=self.model_config["top_k"], |
|
|
|
verbose=True, |
|
) |
|
|
|
def execution(self): |
|
try: |
|
return self.llm |
|
except Exception as e: |
|
print(f"Execution filed in LC_TinyLlama execution function: {e}") |
|
logger.critical(msg="Execution filed in LC_TinyLlama execution function", exc_info=e) |
|
return None |
|
|
|
def clear_llm(self, unused_model_dict, current_lc): |
|
|
|
if len(unused_model_dict) > 1 or unused_model_dict is not None: |
|
|
|
for key, value in unused_model_dict.items(): |
|
|
|
if os.path.exists(value) and key != current_lc: |
|
|
|
os.remove(value) |
|
logger.info(f"Successfully deleted file {value}") |
|
print(f"Successfully deleted file {value}") |
|
else: |
|
logger.info(f"Unfortunately dictionary empty or None") |
|
print(f"Unfortunately dictionary {unused_model_dict} empty or None") |
|
|
|
def get_unused(self, current_lc): |
|
|
|
if len(os.listdir(models_dir)) > 1: |
|
file_names = [os.path.basename(md) for md in os.listdir(models_dir)] |
|
for item in file_names: |
|
if item != current_lc: |
|
unused_model_file = os.path.join(models_dir, item) |
|
return {item: unused_model_file} |
|
else: |
|
return None |
|
|
|
def model_name(self): |
|
return self.model_config["model_name"] |
|
|
|
def __str__(self): |
|
return f"{self.__class__.__name__}_{self.model_name()}" |
|
|
|
def __repr__(self): |
|
llm_info = f"llm={self.llm}" if hasattr(self, 'llm') else 'llm=not initialized' |
|
return f"{self.__class__.__name__}({llm_info})" |
|
|
|
|
|
class LC_Phi3(LCInterface, ABC): |
|
def __init__(self): |
|
self.model_config = config["LC_Phi-3-mini-4k-instruct-gguf"] |
|
|
|
try: |
|
get_file = requests.get(self.model_config["model_url"]) |
|
if get_file.status_code == 200: |
|
path_to_model = os.path.join(models_dir, self.model_config["model_name"]) |
|
with open(path_to_model, "wb") as f: |
|
f.write(get_file.content) |
|
logger.info("Model file successfully recorded") |
|
print("Model file successfully recorded") |
|
f.close() |
|
except FileExistsError: |
|
print(f"Model file {path_to_model} already exists. Skipping download.") |
|
logger.info(f"Model file {path_to_model} already exists. Skipping download.") |
|
except OSError as e: |
|
print(f"Error while writing a file to directory : {e}") |
|
logger.error(msg="Error while write a file to directory", exc_info=e) |
|
|
|
self.llm = LlamaCpp( |
|
model_path=os.path.join(models_dir, self.model_config["model_name"]), |
|
temperature=self.model_config["temperature"], |
|
max_tokens=self.model_config["max_tokens"], |
|
top_p=self.model_config["top_p"], |
|
top_k=self.model_config["top_k"], |
|
|
|
verbose=True, |
|
) |
|
|
|
|
|
def execution(self): |
|
try: |
|
return self.llm |
|
except Exception as e: |
|
print(f"Execution filed in LC_Phi3 execution function: {e}") |
|
logger.critical(msg="Execution filed in LC_Phi3 execution function:", exc_info=e) |
|
return None |
|
|
|
def clear_llm(self, unused_model_dict, current_lc): |
|
|
|
if len(unused_model_dict) > 1 or unused_model_dict is not None: |
|
|
|
for key, value in unused_model_dict.items(): |
|
|
|
if os.path.exists(value) and key != current_lc: |
|
|
|
os.remove(value) |
|
logger.info(f"Successfully deleted file {value}") |
|
print(f"Successfully deleted file {value}") |
|
else: |
|
logger.info(f"Unfortunately dictionary empty or None") |
|
print(f"Unfortunately dictionary {unused_model_dict} empty or None") |
|
|
|
def get_unused(self, current_lc): |
|
|
|
if len(os.listdir(models_dir)) > 1: |
|
file_names = [os.path.basename(md) for md in os.listdir(models_dir)] |
|
for item in file_names: |
|
if item != current_lc: |
|
unused_model_file = os.path.join(models_dir, item) |
|
return {item: unused_model_file} |
|
else: |
|
return None |
|
|
|
def model_name(self): |
|
return self.model_config["model_name"] |
|
|
|
def __str__(self): |
|
return f"{self.__class__.__name__}_{self.model_name()}" |
|
|
|
def __repr__(self): |
|
llm_info = f"llm={self.llm}" if hasattr(self, 'llm') else 'llm=not initialized' |
|
return f"{self.__class__.__name__}({llm_info})" |