Spaces:
Build error
Build error
import os
import json
import logging
from pathlib import Path
from typing import Dict, Generator, List, Optional

# llama-cpp-python is an optional dependency: degrade gracefully when absent
# so the rest of the module can still be imported.
try:
    from llama_cpp import Llama
except ImportError:
    Llama = None
    LLAMA_AVAILABLE = False
    logging.warning("llama-cpp-python not installed. Install with: pip install llama-cpp-python")
else:
    LLAMA_AVAILABLE = True
class ModelManager:
    """Manages loading, unloading and inference of GGUF models.

    Thin wrapper around ``llama_cpp.Llama``. All inference methods raise
    ``ValueError`` (or return ``None``/empty) until ``load_model`` succeeds.
    """

    def __init__(self):
        # Annotation is quoted: Llama is bound to None when llama-cpp-python
        # is not installed, so an unquoted Optional[Llama] would be fragile.
        self.model: Optional["Llama"] = None
        self.model_path: Optional[str] = None
        self.context_size: int = 2048
        self.gpu_layers: int = 0

    def is_loaded(self) -> bool:
        """Check if a model is loaded"""
        return self.model is not None

    def load_model(
        self,
        model_path: str,
        context_size: int = 2048,
        gpu_layers: int = 0,
        n_ctx: Optional[int] = None,
        n_gpu_layers: Optional[int] = None,
        verbose: bool = True
    ) -> bool:
        """Load a GGUF model.

        ``n_ctx`` / ``n_gpu_layers`` are llama.cpp-style aliases that take
        precedence over ``context_size`` / ``gpu_layers`` when provided.

        Returns True on success, False on any failure (never raises).
        """
        if not LLAMA_AVAILABLE:
            logging.error("llama-cpp-python is not installed")
            return False
        try:
            # Unload existing model if any
            if self.model:
                self.unload_model()
            # Aliases win over the plain parameter names when truthy.
            self.context_size = n_ctx or context_size
            self.gpu_layers = n_gpu_layers or gpu_layers
            self.model_path = model_path
            # Load the model
            self.model = Llama(
                model_path=model_path,
                n_ctx=self.context_size,
                n_gpu_layers=self.gpu_layers,
                verbose=verbose,
                embedding=False,
                f16_kv=True,
                use_mmap=True,
                use_mlock=False,
                logits_all=False,
                vocab_only=False
            )
            logging.info(f"Model loaded successfully: {model_path}")
            return True
        except Exception as e:
            # Reset state so is_loaded() stays consistent after a failure.
            logging.error(f"Failed to load model: {str(e)}")
            self.model = None
            self.model_path = None
            return False

    def unload_model(self):
        """Unload the current model and drop the reference to its resources."""
        if self.model:
            del self.model
            self.model = None
            self.model_path = None
            logging.info("Model unloaded")

    def generate(
        self,
        prompt: str,
        temperature: float = 0.7,
        max_tokens: int = 512,
        top_p: float = 0.9,
        repeat_penalty: float = 1.1,
        stop: Optional[List[str]] = None,
        stream: bool = True
    ) -> Generator[str, None, None]:
        """Yield generated text for *prompt*.

        When ``stream`` is True, yields one text fragment per streamed chunk;
        otherwise yields the whole completion as a single string.

        Raises:
            ValueError: if no model is loaded.
        """
        if not self.model:
            raise ValueError("No model loaded")
        try:
            # Generate response
            if stream:
                for chunk in self.model(
                    prompt,
                    max_tokens=max_tokens,
                    temperature=temperature,
                    top_p=top_p,
                    repeat_penalty=repeat_penalty,
                    stop=stop or [],
                    stream=True
                ):
                    if chunk["choices"]:
                        yield chunk["choices"][0]["text"]
            else:
                output = self.model(
                    prompt,
                    max_tokens=max_tokens,
                    temperature=temperature,
                    top_p=top_p,
                    repeat_penalty=repeat_penalty,
                    stop=stop or [],
                    stream=False
                )
                yield output["choices"][0]["text"]
        except Exception as e:
            logging.error(f"Generation error: {str(e)}")
            raise

    def get_model_info(self) -> Optional[Dict]:
        """Return a dict describing the loaded model, or None if none loaded.

        On an unexpected failure returns ``{"error": <message>}`` instead of
        raising.
        """
        if not self.model:
            return None
        try:
            # Extract model metadata
            metadata = getattr(self.model, 'metadata', {})
            # _model.tokenizer() is private llama-cpp-python API; tolerate
            # any failure rather than breaking the whole info call.
            try:
                vocab_size = len(self.model._model.tokenizer().vocab())
            except Exception:  # was a bare except: — would swallow KeyboardInterrupt
                vocab_size = None
            # Basic model info
            info = {
                "model_path": self.model_path,
                "context_size": self.context_size,
                "gpu_layers": self.gpu_layers,
                "vocab_size": vocab_size,
            }
            # Add metadata if available
            if metadata:
                # Extract common metadata fields
                common_fields = [
                    "general.architecture",
                    "llama.vocab_size",
                    "llama.context_length",
                    "llama.embedding_length",
                    "llama.block_count",
                    "llama.feed_forward_length",
                    "llama.attention.head_count",
                    "llama.attention.head_count_kv",
                    "llama.rope.dimension_count",
                    "llama.attention.layer_norm_rms_epsilon",
                    "tokenizer.ggml.model",
                    "tokenizer.ggml.tokens",
                ]
                for field in common_fields:
                    if field in metadata:
                        info[field] = metadata[field]
                # Add all metadata as raw for debugging; skip raw byte blobs
                # that are not generally JSON/log friendly.
                info["raw_metadata"] = {k: v for k, v in metadata.items()
                                        if not isinstance(v, (bytes, bytearray))}
            return info
        except Exception as e:
            logging.error(f"Error getting model info: {str(e)}")
            return {"error": str(e)}

    def tokenize(self, text: str) -> List[int]:
        """Tokenize *text*; returns [] on tokenizer failure (best-effort).

        Raises:
            ValueError: if no model is loaded.
        """
        if not self.model:
            raise ValueError("No model loaded")
        try:
            return self.model.tokenize(text.encode("utf-8"))
        except Exception as e:
            logging.error(f"Tokenization error: {str(e)}")
            return []

    def detokenize(self, tokens: List[int]) -> str:
        """Detokenize *tokens*; returns "" on failure (best-effort).

        Raises:
            ValueError: if no model is loaded.
        """
        if not self.model:
            raise ValueError("No model loaded")
        try:
            return self.model.detokenize(tokens).decode("utf-8")
        except Exception as e:
            logging.error(f"Detokenization error: {str(e)}")
            return ""
def check_model_compatibility(model_path: str) -> Dict:
    """Check if a model file is compatible.

    Returns a dict with:
        exists   -- path exists on disk
        readable -- at least 4 header bytes could be read
        gguf     -- file extension is .gguf (case-insensitive)
        size_mb  -- file size in MiB (0 when the file does not exist)
        error    -- stringified unexpected error, else None
    """
    result = {
        "exists": False,
        "readable": False,
        "gguf": False,
        "size_mb": 0,
        "error": None
    }
    try:
        path = Path(model_path)
        result["exists"] = path.exists()
        if result["exists"]:
            result["size_mb"] = path.stat().st_size / (1024 * 1024)
            result["gguf"] = path.suffix.lower() == ".gguf"
            # Try to read the 4-byte file header (GGUF files begin with a
            # 4-byte magic) to confirm the file is actually readable.
            try:
                with open(path, "rb") as f:
                    header = f.read(4)
                result["readable"] = len(header) == 4
            except OSError:  # was a bare except: — limit to I/O failures
                result["readable"] = False
    except Exception as e:
        result["error"] = str(e)
    return result