| """ | |
| Shared LLM Model Manager | |
| Single Qwen2.5-Coder-1.5B instance shared by NL translator and AI analysis | |
| Prevents duplicate model loading and memory waste | |
| OPTIMIZED FOR NON-BLOCKING OPERATION: | |
| - Async request submission (returns immediately) | |
| - Result polling (check if ready) | |
| - Request cancellation if game loop needs to continue | |
| """ | |
import threading
import queue
import time
from typing import Optional, Dict, Any, List, Tuple
from pathlib import Path
from enum import Enum

try:
    from llama_cpp import Llama
except ImportError:
    Llama = None

class RequestStatus(Enum):
    """Status of an async request"""
    PENDING = "pending"        # In queue, not yet processed
    PROCESSING = "processing"  # Currently being processed
    COMPLETED = "completed"    # Done, result available
    FAILED = "failed"          # Error occurred
    CANCELLED = "cancelled"    # Request was cancelled

class AsyncRequest:
    """Represents an async LLM request"""

    def __init__(self, request_id: str, messages: List[Dict[str, str]],
                 max_tokens: int, temperature: float):
        self.request_id = request_id
        self.messages = messages
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.status = RequestStatus.PENDING
        self.result_text: Optional[str] = None
        self.error_message: Optional[str] = None
        self.submitted_at = time.time()
        self.completed_at: Optional[float] = None

class SharedModelManager:
    """Thread-safe singleton manager for shared LLM model"""

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # Only initialize once
        if hasattr(self, '_initialized'):
            return
        self._initialized = True

        self.model = None        # type: Optional[Llama]
        self.model_path = None   # type: Optional[str]
        self.model_loaded = False
        self.last_error = None   # type: Optional[str]

        # Async request management
        self._request_queue = queue.Queue()  # type: queue.Queue[AsyncRequest]
        self._requests = {}                  # type: Dict[str, AsyncRequest]
        self._requests_lock = threading.Lock()
        self._worker_thread = None           # type: Optional[threading.Thread]
        self._stop_worker = False
        self._current_request_id: Optional[str] = None  # Track what's being processed

    def load_model(self, model_path: str = "qwen2.5-coder-1.5b-instruct-q4_0.gguf") -> Tuple[bool, Optional[str]]:
        """Load the shared model (thread-safe)"""
        with self._lock:
            if self.model_loaded and self.model_path == model_path:
                return True, None

            if Llama is None:
                self.last_error = "llama-cpp-python not installed"
                return False, self.last_error

            try:
                # Unload previous model if different
                if self.model is not None and self.model_path != model_path:
                    del self.model
                    self.model = None
                    self.model_loaded = False

                # Load new model
                # Try /tmp/rts first (HuggingFace Space download location)
                tmp_path = Path("/tmp/rts") / model_path
                local_path = Path(__file__).parent / model_path
                if tmp_path.exists():
                    full_path = tmp_path
                elif local_path.exists():
                    full_path = local_path
                else:
                    self.last_error = f"Model file not found: {model_path} (checked /tmp/rts/ and {Path(__file__).parent})"
                    return False, self.last_error
                self.model = Llama(
                    model_path=str(full_path),
                    n_ctx=2048,         # Reduced from 4096 for faster processing
                    n_threads=1,        # Token generation: 1 thread (CRITICAL!)
                    n_threads_batch=1,  # Prompt/batch processing: 1 thread
                    n_batch=256,        # Increased from 128 for better throughput
                    verbose=False,
                    chat_format='qwen'
                )
                self.model_path = model_path
                self.model_loaded = True
                self.last_error = None

                # Start worker thread if not running
                if self._worker_thread is None or not self._worker_thread.is_alive():
                    self._stop_worker = False
                    self._worker_thread = threading.Thread(target=self._process_requests, daemon=True)
                    self._worker_thread.start()

                return True, None
            except Exception as e:
                self.last_error = f"Failed to load model: {str(e)}"
                self.model_loaded = False
                return False, self.last_error

    def _process_requests(self):
        """Worker thread to process model requests sequentially (async-friendly)"""
        # Lower thread priority so game gets CPU preference
        import os
        try:
            os.nice(10)  # Lower priority (0=normal, 19=lowest)
            print("📉 LLM worker thread priority lowered (nice +10)")
        except Exception as e:
            print(f"⚠️ Could not lower thread priority: {e}")

        while not self._stop_worker:
            try:
                # Get request with timeout to check stop flag
                try:
                    request = self._request_queue.get(timeout=0.5)
                except queue.Empty:
                    continue

                if not isinstance(request, AsyncRequest):
                    continue
                # Skip requests that were cancelled while still queued,
                # otherwise mark as processing
                with self._requests_lock:
                    if request.status == RequestStatus.CANCELLED:
                        continue
                    self._current_request_id = request.request_id
                    request.status = RequestStatus.PROCESSING
                try:
                    # Check model is loaded
                    if not self.model_loaded or self.model is None:
                        request.status = RequestStatus.FAILED
                        request.error_message = 'Model not loaded'
                        request.completed_at = time.time()
                        continue

                    # Process request (this is the blocking part)
                    start_time = time.time()
                    response = self.model.create_chat_completion(
                        messages=request.messages,
                        max_tokens=request.max_tokens,
                        temperature=request.temperature,
                        stream=False
                    )
                    elapsed = time.time() - start_time

                    # Extract text from response
                    if response and 'choices' in response and len(response['choices']) > 0:
                        text = response['choices'][0].get('message', {}).get('content', '')
                        request.status = RequestStatus.COMPLETED
                        request.result_text = text
                        request.completed_at = time.time()
                        print(f"✅ LLM request completed in {elapsed:.2f}s")
                    else:
                        request.status = RequestStatus.FAILED
                        request.error_message = 'Empty response from model'
                        request.completed_at = time.time()
                except Exception as e:
                    request.status = RequestStatus.FAILED
                    request.error_message = f"Model inference error: {str(e)}"
                    request.completed_at = time.time()
                    print(f"❌ LLM request failed: {e}")
                finally:
                    with self._requests_lock:
                        self._current_request_id = None
            except Exception as e:
                print(f"❌ Worker thread error: {e}")
                time.sleep(0.1)

    def submit_async(self, messages: List[Dict[str, str]], max_tokens: int = 256,
                     temperature: float = 0.7) -> str:
        """
        Submit request asynchronously (non-blocking)

        Args:
            messages: List of {role, content} dicts
            max_tokens: Maximum tokens to generate
            temperature: Sampling temperature

        Returns:
            request_id: Use this to poll for results with get_result()
        """
        if not self.model_loaded:
            raise RuntimeError("Model not loaded. Call load_model() first.")

        # Create unique request ID
        request_id = f"req_{int(time.time() * 1000000)}_{id(threading.current_thread())}"

        # Create request object
        request = AsyncRequest(
            request_id=request_id,
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature
        )

        # Register and submit
        with self._requests_lock:
            self._requests[request_id] = request
        self._request_queue.put(request)

        print(f"📤 LLM request submitted: {request_id}")
        return request_id

    def get_result(self, request_id: str, remove: bool = True) -> Tuple[RequestStatus, Optional[str], Optional[str]]:
        """
        Check result of async request (non-blocking)

        Args:
            request_id: ID returned by submit_async()
            remove: If True, remove request after getting result

        Returns:
            (status, result_text, error_message)
        """
        with self._requests_lock:
            request = self._requests.get(request_id)
            if request is None:
                return RequestStatus.FAILED, None, "Request not found (may have been cleaned up)"

            # Return current status
            status = request.status
            result_text = request.result_text
            error_message = request.error_message

        # Cleanup if requested and completed
        if remove and status in [RequestStatus.COMPLETED, RequestStatus.FAILED, RequestStatus.CANCELLED]:
            with self._requests_lock:
                self._requests.pop(request_id, None)

        return status, result_text, error_message

    def cancel_request(self, request_id: str) -> bool:
        """
        Cancel a pending request (cannot cancel if already processing)

        Returns:
            True if cancelled, False if already processing/completed
        """
        with self._requests_lock:
            request = self._requests.get(request_id)
            if request is None:
                return False

            # Can only cancel pending requests
            if request.status == RequestStatus.PENDING:
                request.status = RequestStatus.CANCELLED
                request.completed_at = time.time()
                return True

            return False

    def generate(self, messages: List[Dict[str, str]], max_tokens: int = 256,
                 temperature: float = 0.7, max_wait: float = 300.0) -> Tuple[bool, Optional[str], Optional[str]]:
        """
        Generate response from model (blocking, for backward compatibility)

        No hard timeout: waits for inference to complete naturally.
        Only cancelled if superseded by a new request of the same type.
        max_wait is a safety limit only.

        Args:
            messages: List of {role, content} dicts
            max_tokens: Maximum tokens to generate
            temperature: Sampling temperature
            max_wait: Safety limit in seconds (default 5 min)

        Returns:
            (success, response_text, error_message)
        """
        try:
            # Submit async
            request_id = self.submit_async(messages, max_tokens, temperature)

            # Poll for result until completion (max_wait is a safety limit only)
            start_time = time.time()
            while time.time() - start_time < max_wait:
                status, result_text, error_message = self.get_result(request_id, remove=False)

                if status == RequestStatus.COMPLETED:
                    # Cleanup and return
                    self.get_result(request_id, remove=True)
                    return True, result_text, None
                elif status == RequestStatus.FAILED:
                    # Cleanup and return
                    self.get_result(request_id, remove=True)
                    return False, None, error_message
                elif status == RequestStatus.CANCELLED:
                    self.get_result(request_id, remove=True)
                    return False, None, "Request was cancelled by newer request"

                # Still pending/processing, wait a bit
                time.sleep(0.1)

            # Safety limit reached (model may be stuck)
            return False, None, f"Request exceeded safety limit ({max_wait}s) - model may be stuck"
        except Exception as e:
            return False, None, f"Error: {str(e)}"

    def cleanup_old_requests(self, max_age: float = 300.0):
        """
        Remove completed/failed requests older than max_age seconds

        Args:
            max_age: Maximum age in seconds (default 5 minutes)
        """
        now = time.time()
        with self._requests_lock:
            to_remove = []
            for request_id, request in self._requests.items():
                if request.completed_at is not None:
                    age = now - request.completed_at
                    if age > max_age:
                        to_remove.append(request_id)

            for request_id in to_remove:
                self._requests.pop(request_id, None)

        if to_remove:
            print(f"🧹 Cleaned up {len(to_remove)} old LLM requests")

    def get_queue_status(self) -> Dict[str, Any]:
        """Get current queue status for monitoring"""
        with self._requests_lock:
            pending = sum(1 for r in self._requests.values() if r.status == RequestStatus.PENDING)
            processing = sum(1 for r in self._requests.values() if r.status == RequestStatus.PROCESSING)
            completed = sum(1 for r in self._requests.values() if r.status == RequestStatus.COMPLETED)
            failed = sum(1 for r in self._requests.values() if r.status == RequestStatus.FAILED)

            return {
                'queue_size': self._request_queue.qsize(),
                'total_requests': len(self._requests),
                'pending': pending,
                'processing': processing,
                'completed': completed,
                'failed': failed,
                'current_request': self._current_request_id
            }

    def shutdown(self):
        """Cleanup resources"""
        self._stop_worker = True
        if self._worker_thread is not None:
            self._worker_thread.join(timeout=2.0)

        with self._lock:
            if self.model is not None:
                del self.model
                self.model = None
            self.model_loaded = False

# Global singleton instance
_shared_model_manager = SharedModelManager()


def get_shared_model() -> SharedModelManager:
    """Get the shared model manager singleton"""
    return _shared_model_manager
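

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the manager's API contract): shows
# how a caller such as the game loop might drive the non-blocking interface
# above, plus the blocking generate() fallback. The model filename comes from
# load_model()'s default; the prompts, token limits, and 0.1 s polling cadence
# below are hypothetical example values.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    manager = get_shared_model()

    # May fail if the GGUF file or llama-cpp-python is missing.
    ok, load_error = manager.load_model()
    if not ok:
        print(f"Model unavailable: {load_error}")
    else:
        # Non-blocking path: submit once, then poll between "frames".
        request_id = manager.submit_async(
            messages=[{"role": "user", "content": "Say hello in one short sentence."}],
            max_tokens=32,
            temperature=0.7,
        )
        while True:
            status, text, error = manager.get_result(request_id)
            if status == RequestStatus.COMPLETED:
                print(f"LLM reply: {text}")
                break
            if status in (RequestStatus.FAILED, RequestStatus.CANCELLED):
                print(f"Request ended without a result: {error}")
                break
            # Still PENDING/PROCESSING: a real game loop would render a frame here.
            time.sleep(0.1)

        # Blocking path (backward compatible): generate() wraps the same queue.
        success, text, error = manager.generate(
            messages=[{"role": "user", "content": "Name one RTS unit type."}],
            max_tokens=16,
        )
        print(text if success else f"generate() failed: {error}")

        manager.shutdown()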