# FILE: modules/model_manager_public.py
import asyncio
import logging
import os
import sys
import time
import threading
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
from typing import Dict, Any, Optional, List, Callable, Coroutine
# --- Llama.cpp Python Backend Import & Debug ---
logger_import_debug = logging.getLogger("ModelManager_ImportDebug")
if not logger_import_debug.handlers: # Minimal setup for this specific logger
_h = logging.StreamHandler(sys.stdout)
_f = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
_h.setFormatter(_f)
logger_import_debug.addHandler(_h)
logger_import_debug.setLevel(logging.INFO)
# --- Start of FIX: Define LlamaMock unconditionally and manage Llama variable ---
class LlamaMock:
def __init__(self, model_path: str = "mock_model_path", *args, **kwargs):
# Use logger_import_debug as it's defined at this point
logger_import_debug.error(f"LlamaMock initialized for model_path='{model_path}' with args: {args}, kwargs: {kwargs}. llama_cpp is not installed or importable.")
self.model_path = model_path
self.n_ctx_train = kwargs.get('n_ctx', 0) # Mock common attributes
self.metadata = {}
def create_chat_completion(self, messages: List[Dict[str,str]], *args, **kwargs) -> Dict[str, Any]:
logger_import_debug.error(f"LlamaMock: create_chat_completion called for '{self.model_path}', but llama_cpp is unavailable.")
return {
"id": "chatcmpl-mock", "object": "chat.completion", "created": int(time.time()),
"model": self.model_path,
"choices": [{"index": 0, "message": {"role": "assistant", "content": f"[Error: Llama.cpp backend not available for {self.model_path}]"}, "finish_reason": "stop"}],
"usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
}
def __call__(self, prompt: str, *args, **kwargs) -> Dict[str, Any]: # For older direct call style
logger_import_debug.error(f"LlamaMock: __call__ used for '{self.model_path}', but llama_cpp is unavailable.")
return {
"id": "cmpl-mock", "object": "text_completion", "created": int(time.time()),
"model": self.model_path,
"choices": [{"text": f"[Error: Llama.cpp backend not available for {self.model_path}]", "index": 0, "logprobs": None, "finish_reason": "stop"}],
"usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
}
def get_metadata(self) -> Dict[str, Any]:
return self.metadata
def tokenizer(self) -> Any:
class MockTokenizer:
def encode(self, text: str) -> List[int]: return [0] * len(text)
def decode(self, tokens: List[int]) -> str: return "".join([chr(t) for t in tokens if t < 256]) # Simplistic
return MockTokenizer()
# Initialize variables that will hold the class to be used
Llama_imported_class: Any = LlamaMock # Default to mock
LlamaCppError_imported_class: Any = Exception # Default to base Exception
LLAMA_CPP_AVAILABLE: bool = False
try:
from llama_cpp import Llama as RealLlama, LlamaCppError as RealLlamaCppSpecificError
Llama_imported_class = RealLlama
LlamaCppError_imported_class = RealLlamaCppSpecificError
LLAMA_CPP_AVAILABLE = True
    logger_import_debug.info("🎉 Final llama-cpp-python status: AVAILABLE. Using REAL Llama class. Llama.cpp-specific errors will be caught via 'LlamaCppError'.")
except ImportError:
    logger_import_debug.warning("Could not import both 'Llama' and 'LlamaCppError' from 'llama_cpp'. Trying a simpler import (Llama only)...")
try:
from llama_cpp import Llama as RealLlamaOnly # Try again without specific error import (older versions)
Llama_imported_class = RealLlamaOnly
# LlamaCppError_imported_class remains Exception
LLAMA_CPP_AVAILABLE = True
logger_import_debug.warning("Note: 'LlamaCppError' not found at llama_cpp top-level. Using base 'Exception' for LlamaCpp errors.")
logger_import_debug.info("🎉 Final llama-cpp-python status: AVAILABLE (LlamaCppError not found). Using REAL Llama class.")
except ImportError:
# Llama_imported_class remains LlamaMock, LLAMA_CPP_AVAILABLE remains False
        logger_import_debug.critical("CRITICAL FAILURE: Cannot import 'Llama' from 'llama_cpp'. Using LlamaMock; loading real GGUF models will fail.")
except Exception as e_import_general:
# Llama_imported_class remains LlamaMock, LLAMA_CPP_AVAILABLE remains False
logger_import_debug.critical(f"CRITICAL FAILURE: Unexpected error during 'llama_cpp' import: {e_import_general}. Using LlamaMock.", exc_info=True)
# Assign to the names used throughout the rest of this module
Llama = Llama_imported_class
LlamaCppError = LlamaCppError_imported_class
# --- End of FIX ---
# --- End Llama.cpp Python Backend Import & Debug ---
try:
# Assume config_settings_public.py is in the same 'modules' directory or project root is in sys.path
from modules.config_settings_public import (
MODEL_PATHS, MODEL_SPECIFIC_PARAMS, INFERENCE_PRESETS, N_CTX_FALLBACK, VERBOSE_LLAMA_CPP
)
except ImportError as e_conf:
logging.critical(f"CRITICAL IMPORT ERROR in ModelManager: Cannot import from config_settings_public: {e_conf}")
# Provide minimal fallbacks if config cannot be loaded, though this is a critical failure
MODEL_PATHS = {}
MODEL_SPECIFIC_PARAMS = {"_default": {"n_ctx": 2048, "n_gpu_layers": -1, "verbose": False}}
INFERENCE_PRESETS = {"default": {"temperature": 0.7}}
N_CTX_FALLBACK = 2048
VERBOSE_LLAMA_CPP = False
logger = logging.getLogger("ZOTHEOS_ModelManager")
if not logger.handlers:
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - [%(funcName)s] - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
# Timeout for model generation tasks when run in a thread
GENERATION_THREAD_TIMEOUT_SECONDS = 300 # 5 minutes
class ModelManager:
def __init__(self, device_preference: Optional[str] = "cuda", max_model_count: int = 1, max_ram_models_gb: float = 8.0):
self.device_preference = device_preference
self.loaded_models: Dict[str, Any] = {} # Stores Llama instances
self.model_load_status: Dict[str, str] = {} # "unloaded", "loading", "loaded", "error"
self.model_ram_estimate_gb: Dict[str, float] = {}
self.total_estimated_ram_gb: float = 0.0
self.max_model_count = max_model_count
self.max_ram_models_gb = max_ram_models_gb
self.model_load_order: List[str] = [] # Tracks order for LRU
self.model_locks: Dict[str, asyncio.Lock] = {} # Per-model lock for loading
self.executor = ThreadPoolExecutor(max_workers=max_model_count + 1, thread_name_prefix="LLM_Gen") # +1 for summarizer
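        # NOTE: async_generation_wrapper below dispatches work via asyncio.to_thread, which uses the
        # event loop's default executor; this dedicated pool is kept alongside it and is explicitly
        # shut down in shutdown().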
dev_log = "CPU (default)"
if self.device_preference == "cuda":
            # Best-effort CUDA availability check via PyTorch (informational logging only)
try:
import torch
if torch.cuda.is_available():
dev_log = f"CUDA (preference): {torch.cuda.get_device_name(0)}"
else:
dev_log = "CUDA (preference, but torch.cuda.is_available()=False)"
except ImportError:
dev_log = "CUDA (preference, but PyTorch not found for detailed check)"
except Exception as e_cuda_check:
dev_log = f"CUDA (preference, error during torch check: {e_cuda_check})"
# Actual CUDA usage is determined by n_gpu_layers > 0 during Llama init
logger.info(f"🔥 MMgr init. Dev Preference: {dev_log}. MaxRAM Models: {self.max_ram_models_gb}GB. Max Count: {self.max_model_count}.")
if not LLAMA_CPP_AVAILABLE:
logger.critical("❌ Llama.cpp backend CRITICAL FAILURE: The 'Llama' class could not be imported from 'llama_cpp'. ModelManager cannot load GGUF models. Please check installation and logs from 'ModelManager_ImportDebug'.")
logger.info(f"✅ MMGR Config: Max Models: {self.max_model_count}, Max RAM: {self.max_ram_models_gb} GB.")
async def _ensure_model_loaded(self, model_name: str) -> bool:
if model_name not in self.model_locks:
self.model_locks[model_name] = asyncio.Lock()
async with self.model_locks[model_name]:
if self.model_load_status.get(model_name) == "loaded" and model_name in self.loaded_models:
if model_name in self.model_load_order: self.model_load_order.remove(model_name)
self.model_load_order.append(model_name) # Move to end of LRU
return True
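            # Another task is already loading this model: poll its status instead of starting a
            # duplicate load, then report whether that load ultimately succeeded.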
if self.model_load_status.get(model_name) == "loading":
logger.info(f"Model '{model_name}' is already being loaded by another task. Waiting...")
while self.model_load_status.get(model_name) == "loading": await asyncio.sleep(0.5)
return self.model_load_status.get(model_name) == "loaded"
self.model_load_status[model_name] = "loading"
logger.info(f"Attempting to load model '{model_name}'...")
await self._evict_models_if_needed(new_model_name_to_load=model_name)
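            # The actual disk load blocks on llama.cpp initialization, so run it in a worker thread
            # via asyncio.to_thread to keep the event loop responsive.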
model_instance, ram_gb = await asyncio.to_thread(self._load_model_from_disk, model_name)
if model_instance:
self.loaded_models[model_name] = model_instance
self.model_ram_estimate_gb[model_name] = ram_gb
self.total_estimated_ram_gb += ram_gb
self.model_load_status[model_name] = "loaded"
if model_name in self.model_load_order: self.model_load_order.remove(model_name)
self.model_load_order.append(model_name)
logger.info(f"✅ Model '{model_name}' loaded successfully. RAM used by this model: {ram_gb:.2f}GB. Total est. RAM: {self.total_estimated_ram_gb:.2f}GB.")
return True
else:
self.model_load_status[model_name] = "error"
logger.error(f"❌ Failed to load model '{model_name}'.")
# Clean up RAM if an estimate was added prematurely or if partial load occurred
if model_name in self.model_ram_estimate_gb:
self.total_estimated_ram_gb -= self.model_ram_estimate_gb.pop(model_name)
return False
def _load_model_from_disk(self, model_name: str) -> tuple[Optional[Any], float]:
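        # Synchronous helper, invoked via asyncio.to_thread from _ensure_model_loaded: merges the
        # default and per-model load parameters from config, instantiates the Llama class (real or
        # mock), and returns the instance plus a rough RAM estimate in GB.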
# The check Llama == LlamaMock now works because LlamaMock is always defined
if not LLAMA_CPP_AVAILABLE or Llama == LlamaMock: # Llama is the variable pointing to the class
logger.error(f"Cannot load model '{model_name}': Llama.cpp backend not available (LLAMA_CPP_AVAILABLE={LLAMA_CPP_AVAILABLE}, Llama is LlamaMock: {Llama == LlamaMock}).")
return None, 0.0
model_path = MODEL_PATHS.get(model_name)
if not model_path or not os.path.exists(model_path):
logger.error(f"Model path for '{model_name}' not found or invalid: {model_path}")
return None, 0.0
specific_params = MODEL_SPECIFIC_PARAMS.get(model_name, {})
default_params = MODEL_SPECIFIC_PARAMS.get("_default", {})
load_params = default_params.copy()
load_params.update(specific_params)
load_params['model_path'] = model_path
load_params.setdefault('verbose', VERBOSE_LLAMA_CPP) # Use global verbose if not set per model
if 'n_ctx' not in load_params or load_params['n_ctx'] == 0: # Ensure n_ctx is valid
load_params['n_ctx'] = N_CTX_FALLBACK
logger.warning(f"n_ctx for model {model_name} was invalid or not found, using fallback: {N_CTX_FALLBACK}")
logger.info(f"🔄 Loading '{model_name}' from: {model_path}")
logger.debug(f"--- FINAL LOAD PARAMS FOR {model_name.upper()}: {load_params} ---")
start_time = time.perf_counter()
try:
model_instance = Llama(**load_params) # Llama here is the variable assigned above
load_time = time.perf_counter() - start_time
logger.info(f"✅ Model '{model_name}' (Llama instance) initialized by Llama(...). Load time: {load_time:.2f}s.")
ram_gb = os.path.getsize(model_path) / (1024**3)
ram_gb_with_buffer = ram_gb * 1.5 + 1 # Heuristic: file size + 50% + 1GB for KV cache, etc.
logger.info(f"Model '{model_name}' path: '{model_path}'. Est. RAM for this model (with buffer): {ram_gb_with_buffer:.2f}GB.")
return model_instance, ram_gb_with_buffer
except LlamaCppError as e_llama: # Catch specific LlamaCpp errors
logger.error(f"❌ LlamaCppError loading model '{model_name}': {e_llama}", exc_info=True)
except Exception as e:
logger.error(f"❌ Generic error loading model '{model_name}': {e}", exc_info=True)
return None, 0.0
async def _evict_models_if_needed(self, new_model_name_to_load: Optional[str] = None):
new_model_ram_gb_estimate = 0.0
if new_model_name_to_load:
model_path = MODEL_PATHS.get(new_model_name_to_load)
if model_path and os.path.exists(model_path):
new_model_ram_gb_estimate = (os.path.getsize(model_path) / (1024**3)) * 1.5 + 1 # Same heuristic
# Check if eviction is needed:
# 1. If we are at max model count AND the new model isn't already loaded.
# 2. OR if adding the new model would exceed total RAM limit AND we have models loaded.
while (len(self.loaded_models) >= self.max_model_count and (new_model_name_to_load not in self.loaded_models)) or \
(self.total_estimated_ram_gb + new_model_ram_gb_estimate > self.max_ram_models_gb and self.model_load_order):
if not self.model_load_order:
logger.warning("Eviction needed but model_load_order is empty. This should not happen if loaded_models is populated.")
break
model_to_evict = self.model_load_order.pop(0) # Evict LRU
if model_to_evict in self.loaded_models:
logger.warning(f"⚠️ Evicting model '{model_to_evict}' due to resource limits (Count: {len(self.loaded_models)}/{self.max_model_count}, RAM: {self.total_estimated_ram_gb:.2f}/{self.max_ram_models_gb:.2f}GB).")
del self.loaded_models[model_to_evict] # Release model instance
evicted_ram = self.model_ram_estimate_gb.pop(model_to_evict, 0)
self.total_estimated_ram_gb -= evicted_ram
self.model_load_status[model_to_evict] = "unloaded"
logger.info(f"Model '{model_to_evict}' unloaded. RAM reclaimed: {evicted_ram:.2f}GB. Total est. RAM: {self.total_estimated_ram_gb:.2f}GB.")
else:
logger.warning(f"Model '{model_to_evict}' was in load order but not in loaded_models dict. Inconsistency detected.")
def _generation_task_sync(self, model_instance: Any, prompt_messages: List[Dict[str,str]], gen_params: Dict[str, Any]) -> str:
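        # Runs inside a worker thread: performs the blocking create_chat_completion call and
        # normalizes the result to plain text, returning a bracketed error string on failure.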
model_log_name = model_instance.model_path.split(os.sep)[-1] if hasattr(model_instance, 'model_path') else 'UnknownModel'
logger.debug(f"--- [{model_log_name}] ENTERING _generation_task_sync ---")
output_text = "[Error: Generation failed in thread]"
try:
completion = model_instance.create_chat_completion(messages=prompt_messages, **gen_params)
if completion and "choices" in completion and completion["choices"]:
message_content = completion["choices"][0].get("message", {}).get("content")
output_text = message_content.strip() if message_content else "[No content in choice message]"
else:
output_text = "[No choices in completion or completion is empty]"
logger.warning(f"[{model_log_name}] Malformed completion object: {completion}")
logger.debug(f"✅ [{model_log_name}] Sync generation successful. Preview: {output_text[:100].replace(chr(10),' ')}...")
except Exception as e:
logger.error(f"❌ [{model_log_name}] Exception during model generation in thread: {e}", exc_info=True)
output_text = f"[Error: Exception during generation - {type(e).__name__}: {str(e)[:100]}]"
logger.debug(f"--- [{model_log_name}] EXITING _generation_task_sync ---")
return output_text
async def async_generation_wrapper(self, model_instance: Any, prompt_messages: List[Dict[str,str]], gen_params: Dict[str, Any], model_name_for_log: str) -> str:
logger.debug(f"--- [{model_name_for_log}] ENTERING async_generation_wrapper ---")
output_str = f"[Error: Model '{model_name_for_log}' instance not available for generation]"
if not model_instance:
logger.error(f"[{model_name_for_log}] Model instance is None in async_generation_wrapper.")
return output_str
try:
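            # Run the blocking generation in a worker thread and bound it with asyncio.wait_for,
            # allowing a small grace period (+10s) beyond the nominal generation timeout.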
output_str = await asyncio.wait_for(
asyncio.to_thread(self._generation_task_sync, model_instance, prompt_messages, gen_params),
timeout=GENERATION_THREAD_TIMEOUT_SECONDS + 10
)
except asyncio.TimeoutError:
logger.error(f"❌ [{model_name_for_log}] Generation task TIMED OUT in asyncio.wait_for (>{GENERATION_THREAD_TIMEOUT_SECONDS + 10}s).")
output_str = f"[Error: Generation timed out for model {model_name_for_log}]"
except Exception as e:
logger.error(f"❌ [{model_name_for_log}] Exception in async_generation_wrapper: {e}", exc_info=True)
output_str = f"[Error: Async wrapper exception for model {model_name_for_log} - {type(e).__name__}]"
logger.debug(f"--- [{model_name_for_log}] EXITING async_generation_wrapper ---")
return output_str
async def generate_with_model(self, model_name: str, prompt: str, preset_name: str = "default", system_prompt: Optional[str] = None) -> str:
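        # Public entry point: verifies the backend is real (not mocked), ensures the requested model
        # is resident (evicting LRU models if needed), builds the chat messages, and runs generation
        # with the selected inference preset.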
logger.info(f"--- [{model_name}] Received generation request. System prompt: {'Yes' if system_prompt else 'No'}. Preset: {preset_name} ---")
# Check if Llama class is LlamaMock (meaning llama.cpp is not available)
if not LLAMA_CPP_AVAILABLE or Llama == LlamaMock:
logger.error(f"Cannot generate with model '{model_name}': Llama.cpp backend not available (LLAMA_CPP_AVAILABLE={LLAMA_CPP_AVAILABLE}, Llama is LlamaMock: {Llama == LlamaMock}).")
return "[Error: Llama.cpp backend core ('Llama' class) not available or is mocked]"
if not await self._ensure_model_loaded(model_name):
return f"[Error: Model '{model_name}' could not be loaded or is not available.]"
model_instance = self.loaded_models.get(model_name)
if not model_instance:
logger.error(f"Model '{model_name}' instance not found in loaded_models after successful _ensure_model_loaded. This is unexpected.")
return f"[Error: Model '{model_name}' instance unexpectedly not found after load attempt.]"
gen_params = INFERENCE_PRESETS.get(preset_name, INFERENCE_PRESETS.get("balanced", {"temperature": 0.7})).copy() # Fallback to "balanced" then hardcoded default
prompt_messages = []
if system_prompt and system_prompt.strip():
prompt_messages.append({"role": "system", "content": system_prompt})
prompt_messages.append({"role": "user", "content": prompt})
logger.debug(f"[{model_name}] Calling async_generation_wrapper with messages: {prompt_messages}, params: {gen_params}")
response_text = await self.async_generation_wrapper(model_instance, prompt_messages, gen_params, model_name)
logger.info(f"--- [{model_name}] Generation complete. Response length: {len(response_text)} ---")
return response_text
def get_loaded_model_stats(self) -> Dict[str, Any]:
return {
"loaded_model_count": len(self.loaded_models),
"total_ram_estimate_gb": round(self.total_estimated_ram_gb, 2),
"max_ram_models_gb": self.max_ram_models_gb,
"max_model_count": self.max_model_count,
"models_in_memory": list(self.loaded_models.keys()),
"load_order_lru": self.model_load_order, # LRU (oldest) to MRU (newest)
"model_ram_details_gb": {name: round(ram, 2) for name, ram in self.model_ram_estimate_gb.items()},
"model_load_status": self.model_load_status,
"llama_cpp_available": LLAMA_CPP_AVAILABLE,
"llama_class_is_mock": Llama == LlamaMock
}
def shutdown(self):
logger.info("🔌 Shutting down ModelManager...")
self.executor.shutdown(wait=True, cancel_futures=True) # Cancel pending futures on shutdown
self.loaded_models.clear()
self.model_ram_estimate_gb.clear()
self.model_load_order.clear()
self.model_load_status.clear()
self.total_estimated_ram_gb = 0
logger.info("✅ ModelManager shutdown complete.")
# Example usage for direct testing
async def main_model_manager_test():
# Configure logging for the test
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - [%(funcName)s] - %(message)s')
# Set specific loggers to higher levels if too verbose
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logger.info("--- ModelManager Test ---")
if not MODEL_PATHS:
logger.error("MODEL_PATHS is empty. Cannot run test. Check config_settings_public.py")
return
if Llama == LlamaMock: # Check if we are using the mock
logger.warning("Llama.cpp is not available. Running test with LlamaMock. Responses will be mocked.")
# Example: max 2 models, 10GB RAM limit
mm = ModelManager(max_model_count=2, max_ram_models_gb=10.0)
test_models_available = list(MODEL_PATHS.keys())
if not test_models_available:
logger.error("No models defined in MODEL_PATHS for testing.")
mm.shutdown()
return
# Select up to 3 models for testing, or fewer if not available
model1_name = test_models_available[0]
model2_name = test_models_available[1] if len(test_models_available) > 1 else model1_name
model3_name = test_models_available[2] if len(test_models_available) > 2 else model1_name
test_queries = [
(model1_name, "What is the capital of France?", "precise", "You are a helpful geography expert."),
(model2_name, "Explain black holes simply.", "creative", None),
(model1_name, "What is 1+1?", "precise", None), # Query model1 again
]
if len(test_models_available) > 2 and model3_name not in [model1_name, model2_name]: # Only add model3 if it's distinct and available
test_queries.append((model3_name, "Tell me a short joke.", "default", None))
for i, (model_name, query, preset, sys_prompt) in enumerate(test_queries):
logger.info(f"\n--- Test Query {i+1}: Model '{model_name}', Preset '{preset}' ---")
response = await mm.generate_with_model(model_name, query, preset_name=preset, system_prompt=sys_prompt)
print(f"Response from '{model_name}': {response[:150]}...") # Print more of the response
print(f"Stats after '{model_name}': {mm.get_loaded_model_stats()}")
await asyncio.sleep(1) # Small delay between queries
mm.shutdown()
logger.info("--- ModelManager Test Complete ---")
if __name__ == "__main__":
# This allows testing ModelManager directly if needed
# Ensure your MODEL_PATHS in config_settings_public.py point to valid GGUF files
# To run: python -m modules.model_manager_public
asyncio.run(main_model_manager_test())