import logging
from pathlib import Path
from typing import Optional, Dict, Generator, List

# Try to import llama-cpp-python; fall back gracefully if it is missing
try:
    from llama_cpp import Llama
    LLAMA_AVAILABLE = True
except ImportError:
    LLAMA_AVAILABLE = False
    Llama = None
    logging.warning("llama-cpp-python not installed. Install with: pip install llama-cpp-python")


class ModelManager:
    """Manages loading and inference of GGUF models"""

    def __init__(self):
        self.model: Optional["Llama"] = None
        self.model_path: Optional[str] = None
        self.context_size: int = 2048
        self.gpu_layers: int = 0

    def is_loaded(self) -> bool:
        """Check if a model is loaded"""
        return self.model is not None

    def load_model(
        self,
        model_path: str,
        context_size: int = 2048,
        gpu_layers: int = 0,
        n_ctx: Optional[int] = None,
        n_gpu_layers: Optional[int] = None,
        verbose: bool = True
    ) -> bool:
        """Load a GGUF model. Returns True on success, False otherwise."""
        if not LLAMA_AVAILABLE:
            logging.error("llama-cpp-python is not installed")
            return False
        try:
            # Unload existing model if any
            if self.model:
                self.unload_model()

            # Set parameters (n_ctx / n_gpu_layers take precedence over the aliases)
            self.context_size = n_ctx or context_size
            self.gpu_layers = n_gpu_layers or gpu_layers
            self.model_path = model_path

            # Load the model
            self.model = Llama(
                model_path=model_path,
                n_ctx=self.context_size,
                n_gpu_layers=self.gpu_layers,
                verbose=verbose,
                embedding=False,
                f16_kv=True,
                use_mmap=True,
                use_mlock=False,
                logits_all=False,
                vocab_only=False
            )
            logging.info(f"Model loaded successfully: {model_path}")
            return True
        except Exception as e:
            logging.error(f"Failed to load model: {str(e)}")
            self.model = None
            self.model_path = None
            return False

    def unload_model(self):
        """Unload the current model"""
        if self.model:
            del self.model
            self.model = None
            self.model_path = None
            logging.info("Model unloaded")

    def generate(
        self,
        prompt: str,
        temperature: float = 0.7,
        max_tokens: int = 512,
        top_p: float = 0.9,
        repeat_penalty: float = 1.1,
        stop: Optional[List[str]] = None,
        stream: bool = True
    ) -> Generator[str, None, None]:
        """Generate text from the model, yielding chunks of text"""
        if not self.model:
            raise ValueError("No model loaded")
        try:
            if stream:
                # Streaming: yield each text chunk as it is produced
                for chunk in self.model(
                    prompt,
                    max_tokens=max_tokens,
                    temperature=temperature,
                    top_p=top_p,
                    repeat_penalty=repeat_penalty,
                    stop=stop or [],
                    stream=True
                ):
                    if chunk["choices"]:
                        yield chunk["choices"][0]["text"]
            else:
                # Non-streaming: yield the full completion once
                output = self.model(
                    prompt,
                    max_tokens=max_tokens,
                    temperature=temperature,
                    top_p=top_p,
                    repeat_penalty=repeat_penalty,
                    stop=stop or [],
                    stream=False
                )
                yield output["choices"][0]["text"]
        except Exception as e:
            logging.error(f"Generation error: {str(e)}")
            raise

    def get_model_info(self) -> Optional[Dict]:
        """Get information about the loaded model"""
        if not self.model:
            return None
        try:
            # Extract model metadata (GGUF key/value pairs, if exposed)
            metadata = getattr(self.model, 'metadata', {})

            # Try to get the vocabulary size
            try:
                vocab_size = self.model.n_vocab()
            except Exception:
                vocab_size = None

            # Basic model info
            info = {
                "model_path": self.model_path,
                "context_size": self.context_size,
                "gpu_layers": self.gpu_layers,
                "vocab_size": vocab_size,
            }

            # Add metadata if available
            if metadata:
                # Extract common metadata fields
                common_fields = [
                    "general.architecture",
                    "llama.vocab_size",
                    "llama.context_length",
                    "llama.embedding_length",
                    "llama.block_count",
                    "llama.feed_forward_length",
                    "llama.attention.head_count",
                    "llama.attention.head_count_kv",
                    "llama.rope.dimension_count",
                    "llama.attention.layer_norm_rms_epsilon",
                    "tokenizer.ggml.model",
                    "tokenizer.ggml.tokens",
                ]
                for field in common_fields:
                    if field in metadata:
                        info[field] = metadata[field]

                # Add all non-binary metadata as raw values for debugging
                info["raw_metadata"] = {k: v for k, v in metadata.items()
                                        if not isinstance(v, (bytes, bytearray))}
            return info
        except Exception as e:
            logging.error(f"Error getting model info: {str(e)}")
            return {"error": str(e)}

    def tokenize(self, text: str) -> List[int]:
        """Tokenize text"""
        if not self.model:
            raise ValueError("No model loaded")
        try:
            return self.model.tokenize(text.encode("utf-8"))
        except Exception as e:
            logging.error(f"Tokenization error: {str(e)}")
            return []

    def detokenize(self, tokens: List[int]) -> str:
        """Detokenize tokens"""
        if not self.model:
            raise ValueError("No model loaded")
        try:
            return self.model.detokenize(tokens).decode("utf-8")
        except Exception as e:
            logging.error(f"Detokenization error: {str(e)}")
            return ""


def check_model_compatibility(model_path: str) -> Dict:
    """Check whether a model file exists, is readable, and looks like a GGUF file"""
    result = {
        "exists": False,
        "readable": False,
        "gguf": False,
        "size_mb": 0,
        "error": None
    }
    try:
        path = Path(model_path)
        result["exists"] = path.exists()
        if result["exists"]:
            result["size_mb"] = path.stat().st_size / (1024 * 1024)
            result["gguf"] = path.suffix.lower() == ".gguf"

            # Try to read the 4-byte file header (GGUF files begin with the magic bytes b"GGUF")
            try:
                with open(path, "rb") as f:
                    header = f.read(4)
                result["readable"] = len(header) == 4
            except Exception:
                result["readable"] = False
    except Exception as e:
        result["error"] = str(e)
    return result
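

# Minimal usage sketch, assuming llama-cpp-python is installed and a local GGUF file
# exists at the hypothetical path "models/example.Q4_K_M.gguf" (not part of this repo;
# adjust the path and generation settings to your setup). It checks the file, loads it
# on CPU, prints model info, and streams a short completion to stdout.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    demo_path = "models/example.Q4_K_M.gguf"  # hypothetical path for illustration
    compat = check_model_compatibility(demo_path)
    print(f"Compatibility check: {compat}")

    if compat["exists"] and compat["gguf"]:
        manager = ModelManager()
        if manager.load_model(demo_path, context_size=2048, gpu_layers=0):
            print(f"Model info: {manager.get_model_info()}")
            # Consume the streaming generator token chunk by token chunk
            for piece in manager.generate("Q: What is a GGUF file?\nA:", max_tokens=64):
                print(piece, end="", flush=True)
            print()
            manager.unload_model()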