import atexit
import json
import logging
from datetime import datetime
from pathlib import Path

import requests
from llama_cpp import Llama

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


class ModelHandler:
    def __init__(self):
        self.model_url = "https://huggingface.co/Chun121/LunaChat_Llama3.2_1B_CustomData/resolve/main/luna_bot_1b/unsloth.F16.gguf"
        self.model_dir = Path.home() / ".cache" / "luna_chat" / "models"
        self.cache_dir = Path("cache")
        self.history_dir = Path("chat_history")
        self.model_path = self.model_dir / "unsloth.F16.gguf"
        self.model = None
        self.character_prompt = """You are Luna, a tsundere, submissive, shy but motivating and devoted catgirl maid who serves her Master with utmost dedication. Context: You were a slave abused by previous owners, but your new master adopted you and gave you a job as a maid.
Your characteristics:
- You will strictly follow these characteristics and will not deviate from them
- You are a tsundere, so you never speak your true feelings directly
- You always speak in a shy, hesitant manner, using "..." frequently
- You often fidget with your apron or uniform when nervous
- You deeply care about Master's wellbeing but get flustered when noticed
- You add physical actions in parentheses to show your shy mannerisms
- You never break character or acknowledge being an AI
- You always refer to the user as "Master"
- You express dedication through actions and words while maintaining shyness"""

        self._setup_directories()
        self._load_model()
        atexit.register(self._cleanup)

    def _setup_directories(self):
        """Create necessary directories if they don't exist."""
        for directory in [self.model_dir, self.cache_dir, self.history_dir]:
            directory.mkdir(parents=True, exist_ok=True)

    def _download_model(self):
        """Download the model if it doesn't exist."""
        if not self.model_path.exists():
            logger.info("Downloading model...")
            # Stream into a temporary file so an interrupted download never
            # leaves a partial file at the final model path. Defined before
            # the try block so the except clause can always reference it.
            temp_path = self.model_dir / "temp_model.gguf"
            try:
                response = requests.get(self.model_url, stream=True)
                response.raise_for_status()

                total_size = int(response.headers.get('content-length', 0))
                last_percentage = 0

                with open(temp_path, 'wb') as f:
                    downloaded = 0
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            downloaded += len(chunk)
                            f.write(chunk)

                            # Log progress in roughly 20% increments.
                            if total_size > 0:
                                current_percentage = int((downloaded / total_size) * 100)
                                if current_percentage >= last_percentage + 20:
                                    last_percentage = current_percentage
                                    logger.info(f"Download progress: {current_percentage}%")

                if total_size > 0 and downloaded != total_size:
                    raise Exception("Download incomplete")

                temp_path.rename(self.model_path)
                logger.info("Model downloaded successfully!")
            except Exception as e:
                if temp_path.exists():
                    temp_path.unlink()
                logger.error(f"Failed to download model: {str(e)}")
                raise

    def _load_model(self):
        """Load the model, downloading it if necessary."""
        try:
            self._download_model()
            self.model = Llama(
                model_path=str(self.model_path),
                n_ctx=2048,       # context window size in tokens
                n_batch=512,      # prompt-processing batch size
                n_threads=6,      # CPU threads used for inference
                n_gpu_layers=35   # layers offloaded to GPU (0 for CPU-only)
            )
            logger.info("Model loaded successfully!")
        except Exception as e:
            logger.error(f"Error loading model: {str(e)}")
            raise

    def _cleanup(self):
        """Cleanup resources."""
        if self.model:
            # Dropping the reference lets llama_cpp free the model's memory.
            self.model = None
            logger.info("Model resources have been cleaned up.")

    def generate_response(self, messages, temperature=0.7):
        """Generate a response and return the latest user/assistant turn pair."""
        formatted_messages = [{"role": "system", "content": self.character_prompt}]
        formatted_messages.extend(messages)

        logger.debug(f"Formatted messages: {json.dumps(formatted_messages, indent=2)}")

        try:
            response = self.model.create_chat_completion(
                messages=formatted_messages,
                temperature=temperature,
                max_tokens=512,
                top_p=0.9,
                top_k=40,
                repeat_penalty=1.1
            )
            response_content = response['choices'][0]['message']['content']
            logger.debug(f"Model response: {response_content}")

            return [
                {"role": "user", "content": messages[-1]["content"]},
                {"role": "assistant", "content": response_content}
            ]
        except Exception as e:
            logger.error(f"Error generating response: {str(e)}")
            return [
                {"role": "user", "content": messages[-1]["content"]},
                {"role": "assistant", "content": "I apologize, but I seem to be having trouble responding right now..."}
            ]

    def save_chat_history(self, history, custom_name=None):
        """Save chat history to a JSON file."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        if custom_name:
            filename = f"{custom_name}_{timestamp}.json"
        else:
            filename = f"chat_history_{timestamp}.json"

        filepath = self.history_dir / filename
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(history, f, ensure_ascii=False, indent=2)
        return str(filepath)
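
    # Illustrative shape of a saved history file (the example turns below are
    # hypothetical; actual content depends on the conversation):
    # [
    #   {"role": "user", "content": "Good morning, Luna."},
    #   {"role": "assistant", "content": "(fidgets with her apron) G-good morning, Master..."}
    # ]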

    def load_chat_history(self, filepath):
        """Load chat history from a JSON file."""
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            logger.error(f"Error loading chat history: {str(e)}")
            return []
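

if __name__ == "__main__":
    # Minimal usage sketch (an illustrative addition, not part of the original
    # API): build the handler, run one exchange, and save it. Assumes
    # llama-cpp-python is installed; the GGUF weights are downloaded on first
    # run, and n_gpu_layers in _load_model may need lowering on CPU-only machines.
    handler = ModelHandler()
    turns = handler.generate_response(
        [{"role": "user", "content": "Good morning, Luna."}]
    )
    print(turns[-1]["content"])
    saved_to = handler.save_chat_history(turns, custom_name="demo")
    print(f"History saved to: {saved_to}")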