# File: enhanced_gradio_interface.py
import asyncio
import json
import os
import queue
import re
import threading
import time
import traceback
import uuid
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from queue import Queue, Empty
from threading import Lock, Event, Thread
from typing import Any, Callable, Coroutine, Dict, List, Optional

import gradio as gr
from openai import AsyncOpenAI, OpenAI
import pyttsx3
from rich.console import Console
BASE_URL = "http://localhost:1234/v1"
BASE_API_KEY = "not-needed"
# Global async client used by the static generate helper
BASE_CLIENT = AsyncOpenAI(
    base_url=BASE_URL,
    api_key=BASE_API_KEY
)
# Default model ID for the local endpoint
BASEMODEL_ID = "leroydyer/qwen/qwen3-0.6b-q4_k_m.gguf"
# Global sync client used by the queue worker thread
CLIENT = OpenAI(
    base_url=BASE_URL,
    api_key=BASE_API_KEY
)
# --- Global Variables ---
console = Console()
| # --- Configuration --- | |
| LOCAL_BASE_URL = "http://localhost:1234/v1" | |
| LOCAL_API_KEY = "not-needed" | |
| # HuggingFace Spaces configuration | |
| HF_INFERENCE_URL = "https://api-inference.huggingface.co/models/" | |
| HF_API_KEY = os.getenv("HF_API_KEY", "") | |
| # Available model options | |
| MODEL_OPTIONS = { | |
| "Local LM Studio": LOCAL_BASE_URL, | |
| "Codellama 7B": "codellama/CodeLlama-7b-hf", | |
| "Mistral 7B": "mistralai/Mistral-7B-v0.1", | |
| "Llama 2 7B": "meta-llama/Llama-2-7b-chat-hf", | |
| "Falcon 7B": "tiiuae/falcon-7b-instruct" | |
| } | |
| DEFAULT_TEMPERATURE = 0.7 | |
| DEFAULT_MAX_TOKENS = 5000 | |
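# Illustrative helper (an assumption, not referenced elsewhere in this file):
# it sketches how a MODEL_OPTIONS selection can be resolved into an
# endpoint/model pair. "Local LM Studio" maps to a local base URL, while the
# remaining entries are Hugging Face repo ids routed through HF_INFERENCE_URL.
def _resolve_model_choice(choice: str) -> tuple:
    target = MODEL_OPTIONS[choice]
    if choice == "Local LM Studio":
        # Local endpoint, paired with the default local model id
        return target, BASEMODEL_ID
    # HF inference endpoint, with the repo id used as the model name
    return HF_INFERENCE_URL, target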
# --- Canvas Artifact Support ---
@dataclass
class CanvasArtifact:
    id: str
    type: str  # 'code', 'diagram', 'text', 'image'
    content: str
    title: str
    timestamp: float
    metadata: Dict[str, Any]

@dataclass
class LLMMessage:
    role: str
    content: str
    message_id: str = None
    conversation_id: str = None
    timestamp: float = None
    metadata: Dict[str, Any] = None

    def __post_init__(self):
        if self.message_id is None:
            self.message_id = str(uuid.uuid4())
        if self.timestamp is None:
            self.timestamp = time.time()
        if self.metadata is None:
            self.metadata = {}

@dataclass
class LLMRequest:
    message: LLMMessage
    response_event: str = None
    callback: Callable = None

    def __post_init__(self):
        if self.response_event is None:
            self.response_event = f"llm_response_{self.message.message_id}"

@dataclass
class LLMResponse:
    message: LLMMessage
    request_id: str
    success: bool = True
    error: str = None
| # --- Event Manager (copied from your original code or imported) --- | |
| class EventManager: | |
| def __init__(self): | |
| self._handlers = defaultdict(list) | |
| self._lock = threading.Lock() | |
| def register(self, event: str, handler: Callable): | |
| with self._lock: | |
| self._handlers[event].append(handler) | |
| def unregister(self, event: str, handler: Callable): | |
| with self._lock: | |
| if event in self._handlers and handler in self._handlers[event]: | |
| self._handlers[event].remove(handler) | |
| def raise_event(self, event: str, data: Any): | |
| with self._lock: | |
| handlers = self._handlers[event][:] | |
| for handler in handlers: | |
| try: | |
| handler(data) | |
| except Exception as e: | |
| console.log(f"Error in event handler for {event}: {e}", style="bold red") | |
| EVENT_MANAGER = EventManager() | |
| def RegisterEvent(event: str, handler: Callable): | |
| EVENT_MANAGER.register(event, handler) | |
| def RaiseEvent(event: str, data: Any): | |
| EVENT_MANAGER.raise_event(event, data) | |
| def UnregisterEvent(event: str, handler: Callable): | |
| EVENT_MANAGER.unregister(event, handler) | |
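# Minimal usage sketch of the event bus above (purely illustrative; the
# "demo_event" name is an assumption and is not used elsewhere in this file).
# Handlers receive whatever object was passed to RaiseEvent and run
# synchronously on the raising thread.
def _example_event_bus():
    received = []
    def on_demo(data):
        received.append(data)
    RegisterEvent("demo_event", on_demo)
    RaiseEvent("demo_event", {"payload": 42})
    UnregisterEvent("demo_event", on_demo)
    return received  # -> [{"payload": 42}]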
class LLMAgent:
    """Main agent driver.

    Handles multiple messages at once: it provides a message-queuing service as
    well as an async chat method for easy integration with console applications
    as well as UIs."""
| def __init__( | |
| self, | |
| model_id: str = BASEMODEL_ID, | |
| system_prompt: str = None, | |
| max_queue_size: int = 1000, | |
| max_retries: int = 3, | |
| timeout: int = 30000, | |
| max_tokens: int = 5000, | |
| temperature: float = 0.3, | |
| base_url: str = "http://localhost:1234/v1", | |
| api_key: str = "not-needed", | |
| generate_fn: Callable[[List[Dict[str, str]]], Coroutine[Any, Any, str]] = None | |
| ): | |
| self.model_id = model_id | |
| self.system_prompt = system_prompt or "You are a helpful AI assistant." | |
| self.request_queue = Queue(maxsize=max_queue_size) | |
| self.max_retries = max_retries | |
| self.timeout = timeout | |
| self.is_running = False | |
| self._stop_event = Event() | |
| self.processing_thread = None | |
| # Conversation tracking | |
| self.conversations: Dict[str, List[LLMMessage]] = {} | |
| self.max_history_length = 20 | |
| self._generate = generate_fn or self._default_generate | |
| self.api_key = api_key | |
| self.base_url = base_url | |
| self.max_tokens = max_tokens | |
| self.temperature = temperature | |
| self.async_client = self.CreateClient(base_url, api_key) | |
| # Canvas Artifacts - NEW | |
| self.canvas_artifacts: Dict[str, List[CanvasArtifact]] = {} | |
| self.canvas_lock = Lock() | |
| # Active requests waiting for responses | |
| self.pending_requests: Dict[str, LLMRequest] = {} | |
| self.pending_requests_lock = Lock() | |
| # Speech synthesis | |
| try: | |
| self.tts_engine = pyttsx3.init() | |
| self.setup_tts() | |
| self.speech_enabled = True | |
| except Exception as e: | |
| console.log(f"[yellow]TTS not available: {e}[/yellow]") | |
| self.speech_enabled = False | |
| # Register internal event handlers | |
| self._register_event_handlers() | |
| # Start the processing thread immediately | |
| self.start() | |
| def setup_tts(self): | |
| """Configure text-to-speech engine""" | |
| if hasattr(self, 'tts_engine'): | |
| voices = self.tts_engine.getProperty('voices') | |
| if voices: | |
| self.tts_engine.setProperty('voice', voices[0].id) | |
| self.tts_engine.setProperty('rate', 150) | |
| self.tts_engine.setProperty('volume', 0.8) | |
| def speak(self, text: str): | |
| """Convert text to speech in a non-blocking way""" | |
| if not hasattr(self, 'speech_enabled') or not self.speech_enabled: | |
| return | |
| def _speak(): | |
| try: | |
| # Clean text for speech (remove markdown, code blocks) | |
| clean_text = re.sub(r'```.*?```', '', text, flags=re.DOTALL) | |
| clean_text = re.sub(r'`.*?`', '', clean_text) | |
| clean_text = clean_text.strip() | |
| if clean_text: | |
| self.tts_engine.say(clean_text) | |
| self.tts_engine.runAndWait() | |
| else: | |
| self.tts_engine.say(text) | |
| self.tts_engine.runAndWait() | |
| except Exception as e: | |
| console.log(f"[red]TTS Error: {e}[/red]") | |
| thread = threading.Thread(target=_speak, daemon=True) | |
| thread.start() | |
| async def _default_generate(self, messages: List[Dict[str, str]]) -> str: | |
| """Default generate function if none provided""" | |
| return await self.openai_generate(messages) | |
| def _register_event_handlers(self): | |
| """Register internal event handlers for response routing""" | |
| RegisterEvent("llm_internal_response", self._handle_internal_response) | |
| def _handle_internal_response(self, response: LLMResponse): | |
| """Route responses to the appropriate request handlers""" | |
| console.log(f"[bold cyan]Handling internal response for: {response.request_id}[/bold cyan]") | |
| request = None | |
| with self.pending_requests_lock: | |
| if response.request_id in self.pending_requests: | |
| request = self.pending_requests[response.request_id] | |
| del self.pending_requests[response.request_id] | |
| console.log(f"Found pending request for: {response.request_id}") | |
| else: | |
| console.log(f"No pending request found for: {response.request_id}", style="yellow") | |
| return | |
| # Raise the specific response event | |
| if request.response_event: | |
| console.log(f"[bold green]Raising event: {request.response_event}[/bold green]") | |
| RaiseEvent(request.response_event, response) | |
| # Call callback if provided | |
| if request.callback: | |
| try: | |
| console.log(f"[bold yellow]Calling callback for: {response.request_id}[/bold yellow]") | |
| request.callback(response) | |
| except Exception as e: | |
| console.log(f"Error in callback: {e}", style="bold red") | |
| def _add_to_conversation_history(self, conversation_id: str, message: LLMMessage): | |
| """Add message to conversation history""" | |
| if conversation_id not in self.conversations: | |
| self.conversations[conversation_id] = [] | |
| self.conversations[conversation_id].append(message) | |
| # Trim history if too long | |
| if len(self.conversations[conversation_id]) > self.max_history_length * 2: | |
| self.conversations[conversation_id] = self.conversations[conversation_id][-(self.max_history_length * 2):] | |
| def _build_messages_from_conversation(self, conversation_id: str, new_message: LLMMessage) -> List[Dict[str, str]]: | |
| """Build message list from conversation history""" | |
| messages = [] | |
| # Add system prompt | |
| if self.system_prompt: | |
| messages.append({"role": "system", "content": self.system_prompt}) | |
| # Add conversation history | |
| if conversation_id in self.conversations: | |
| for msg in self.conversations[conversation_id][-self.max_history_length:]: | |
| messages.append({"role": msg.role, "content": msg.content}) | |
| # Add the new message | |
| messages.append({"role": new_message.role, "content": new_message.content}) | |
| return messages | |
| def _process_llm_request(self, request: LLMRequest): | |
| """Process a single LLM request""" | |
| console.log(f"[bold green]Processing LLM request: {request.message.message_id}[/bold green]") | |
| try: | |
| # Build messages for LLM | |
| messages = self._build_messages_from_conversation( | |
| request.message.conversation_id or "default", | |
| request.message | |
| ) | |
| console.log(f"Calling LLM with {len(messages)} messages") | |
| # Call LLM - Use sync call for thread compatibility | |
| response_content = self._call_llm_sync(messages) | |
| console.log(f"[bold green]LLM response received: {response_content}...[/bold green]") | |
| # Create response message | |
| response_message = LLMMessage( | |
| role="assistant", | |
| content=response_content, | |
| conversation_id=request.message.conversation_id, | |
| metadata={"request_id": request.message.message_id} | |
| ) | |
| # Update conversation history | |
| self._add_to_conversation_history( | |
| request.message.conversation_id or "default", | |
| request.message | |
| ) | |
| self._add_to_conversation_history( | |
| request.message.conversation_id or "default", | |
| response_message | |
| ) | |
| # Create and send response | |
| response = LLMResponse( | |
| message=response_message, | |
| request_id=request.message.message_id, | |
| success=True | |
| ) | |
| console.log(f"[bold blue]Sending internal response for: {request.message.message_id}[/bold blue]") | |
| RaiseEvent("llm_internal_response", response) | |
| except Exception as e: | |
| console.log(f"[bold red]Error processing LLM request: {e}[/bold red]") | |
| traceback.print_exc() | |
| # Create error response | |
| error_response = LLMResponse( | |
| message=LLMMessage( | |
| role="system", | |
| content=f"Error: {str(e)}", | |
| conversation_id=request.message.conversation_id | |
| ), | |
| request_id=request.message.message_id, | |
| success=False, | |
| error=str(e) | |
| ) | |
| RaiseEvent("llm_internal_response", error_response) | |
def _call_llm_sync(self, messages: List[Dict[str, str]]) -> str:
    """Sync call to the LLM with retry logic"""
    console.log(f"Making LLM call to {self.model_id}")
    for attempt in range(self.max_retries):
        try:
            response = CLIENT.chat.completions.create(
                model=self.model_id,
                messages=messages,
                temperature=self.temperature,
                max_tokens=self.max_tokens
            )
            content = response.choices[0].message.content
            console.log(f"LLM call successful, response length: {len(content)}")
            return content
        except Exception as e:
            console.log(f"LLM call attempt {attempt + 1} failed: {e}")
            if attempt == self.max_retries - 1:
                raise
            # Wait briefly before retrying (simple linear backoff)
            time.sleep(1 + attempt)
| def _process_queue(self): | |
| """Main queue processing loop""" | |
| console.log("[bold cyan]LLM Agent queue processor started[/bold cyan]") | |
| while not self._stop_event.is_set(): | |
| try: | |
| request = self.request_queue.get(timeout=1.0) | |
| if request: | |
| console.log(f"Got request from queue: {request.message.message_id}") | |
| self._process_llm_request(request) | |
| self.request_queue.task_done() | |
| except Empty: | |
| continue | |
| except Exception as e: | |
| console.log(f"Error in queue processing: {e}", style="bold red") | |
| traceback.print_exc() | |
| console.log("[bold cyan]LLM Agent queue processor stopped[/bold cyan]") | |
| def send_message( | |
| self, | |
| content: str, | |
| role: str = "user", | |
| conversation_id: str = None, | |
| response_event: str = None, | |
| callback: Callable = None, | |
| metadata: Dict = None | |
| ) -> str: | |
| """Send a message to the LLM and get response via events""" | |
| if not self.is_running: | |
| raise RuntimeError("LLM Agent is not running. Call start() first.") | |
| # Create message | |
| message = LLMMessage( | |
| role=role, | |
| content=content, | |
| conversation_id=conversation_id, | |
| metadata=metadata or {} | |
| ) | |
| # Create request | |
| request = LLMRequest( | |
| message=message, | |
| response_event=response_event, | |
| callback=callback | |
| ) | |
| # Store in pending requests BEFORE adding to queue | |
| with self.pending_requests_lock: | |
| self.pending_requests[message.message_id] = request | |
| console.log(f"Added to pending requests: {message.message_id}") | |
| # Add to queue | |
| try: | |
| self.request_queue.put(request, timeout=5.0) | |
| console.log(f"[bold magenta]Message queued: {message.message_id}, Content: {content[:50]}...[/bold magenta]") | |
| return message.message_id | |
| except queue.Full: | |
| console.log(f"[bold red]Queue full, cannot send message[/bold red]") | |
| with self.pending_requests_lock: | |
| if message.message_id in self.pending_requests: | |
| del self.pending_requests[message.message_id] | |
| raise RuntimeError("LLM Agent queue is full") | |
| async def chat(self, messages: List[Dict[str, str]]) -> str: | |
| """ | |
| Async chat method that sends message via queue and returns response string. | |
| This is the main method you should use. | |
| """ | |
| # Create future for the response | |
| loop = asyncio.get_event_loop() | |
| response_future = loop.create_future() | |
| def chat_callback(response: LLMResponse): | |
| """Callback when LLM responds - thread-safe""" | |
| console.log(f"[bold yellow]β CHAT CALLBACK TRIGGERED![/bold yellow]") | |
| if not response_future.done(): | |
| if response.success: | |
| content = response.message.content | |
| console.log(f"Callback received content: {content}...") | |
| # Schedule setting the future result on the main event loop | |
| loop.call_soon_threadsafe(response_future.set_result, content) | |
| else: | |
| console.log(f"Error in response: {response.error}") | |
| error_msg = f"β Error: {response.error}" | |
| loop.call_soon_threadsafe(response_future.set_result, error_msg) | |
| else: | |
| console.log(f"[bold red]Future already done, ignoring callback[/bold red]") | |
| console.log(f"Sending message to LLM agent...") | |
| # Extract the actual message content from the messages list | |
| user_message = "" | |
| for msg in messages: | |
| if msg.get("role") == "user": | |
| user_message = msg.get("content", "") | |
| break | |
| if not user_message.strip(): | |
| return "" | |
| # Send message with callback using the queue system | |
| try: | |
| message_id = self.send_message( | |
| content=user_message, | |
| conversation_id="default", | |
| callback=chat_callback | |
| ) | |
| console.log(f"Message sent with ID: {message_id}, waiting for response...") | |
| # Wait for the response and return it | |
| try: | |
| response = await asyncio.wait_for(response_future, timeout=self.timeout) | |
| console.log(f"[bold green]β Chat complete! Response length: {len(response)}[/bold green]") | |
| return response | |
| except asyncio.TimeoutError: | |
| console.log("[bold red]Response timeout[/bold red]") | |
| # Clean up the pending request | |
| with self.pending_requests_lock: | |
| if message_id in self.pending_requests: | |
| del self.pending_requests[message_id] | |
| return "β Response timeout - check if LLM server is running" | |
| except Exception as e: | |
| console.log(f"[bold red]Error sending message: {e}[/bold red]") | |
| traceback.print_exc() | |
| return f"β Error sending message: {e}" | |
| def start(self): | |
| """Start the LLM agent""" | |
| if not self.is_running: | |
| self.is_running = True | |
| self._stop_event.clear() | |
| self.processing_thread = Thread(target=self._process_queue, daemon=True) | |
| self.processing_thread.start() | |
| console.log("[bold green]LLM Agent started[/bold green]") | |
| def stop(self): | |
| """Stop the LLM agent""" | |
| console.log("Stopping LLM Agent...") | |
| self._stop_event.set() | |
| if self.processing_thread and self.processing_thread.is_alive(): | |
| self.processing_thread.join(timeout=10) | |
| self.is_running = False | |
| console.log("LLM Agent stopped") | |
| def get_conversation_history(self, conversation_id: str = "default") -> List[LLMMessage]: | |
| """Get conversation history""" | |
| return self.conversations.get(conversation_id, [])[:] | |
| def clear_conversation(self, conversation_id: str = "default"): | |
| """Clear conversation history""" | |
| if conversation_id in self.conversations: | |
| del self.conversations[conversation_id] | |
| async def _chat(self, messages: List[Dict[str, str]]) -> str: | |
| return await self._generate(messages) | |
@staticmethod
async def openai_generate(messages: List[Dict[str, str]], max_tokens: int = 8096, temperature: float = 0.4, model: str = BASEMODEL_ID, tools=None) -> str:
    """Static method for generating responses via the OpenAI-compatible API"""
    try:
        resp = await BASE_CLIENT.chat.completions.create(
            model=model,
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
            tools=tools
        )
        response_text = resp.choices[0].message.content or ""
        return response_text
    except Exception as e:
        console.log(f"[bold red]Error in openai_generate: {e}[/bold red]")
        return f"[LLM_Agent Error - openai_generate: {str(e)}]"
| async def _call_(self, messages: List[Dict[str, str]]) -> str: | |
| """Internal call method using instance client""" | |
| try: | |
| resp = await self.async_client.chat.completions.create( | |
| model=self.model_id, | |
| messages=messages, | |
| temperature=self.temperature, | |
| max_tokens=self.max_tokens | |
| ) | |
| response_text = resp.choices[0].message.content or "" | |
| return response_text | |
| except Exception as e: | |
| console.log(f"[bold red]Error in _call_: {e}[/bold red]") | |
| return f"[LLM_Agent Error - _call_: {str(e)}]" | |
@staticmethod
def CreateClient(base_url: str, api_key: str) -> AsyncOpenAI:
    '''Create the async OpenAI client required for multitasking'''
    return AsyncOpenAI(
        base_url=base_url,
        api_key=api_key
    )
@staticmethod
async def fetch_available_models(base_url: str, api_key: str) -> List[str]:
    """Fetches available models from the OpenAI-compatible API."""
    try:
        async_client = AsyncOpenAI(base_url=base_url, api_key=api_key)
        models = await async_client.models.list()
        model_choices = [model.id for model in models.data]
        return model_choices
    except Exception as e:
        console.log(f"[bold red]LLM_Agent Error fetching models: {e}[/bold red]")
        return ["LLM_Agent Error fetching models"]
| def get_models(self) -> List[str]: | |
| """Get available models using instance credentials""" | |
| return asyncio.run(self.fetch_available_models(self.base_url, self.api_key)) | |
| def get_queue_size(self) -> int: | |
| """Get current queue size""" | |
| return self.request_queue.qsize() | |
| def get_pending_requests_count(self) -> int: | |
| """Get number of pending requests""" | |
| with self.pending_requests_lock: | |
| return len(self.pending_requests) | |
| def get_status(self) -> Dict[str, Any]: | |
| """Get agent status information""" | |
| return { | |
| "is_running": self.is_running, | |
| "queue_size": self.get_queue_size(), | |
| "pending_requests": self.get_pending_requests_count(), | |
| "conversations_count": len(self.conversations), | |
| "model": self.model_id | |
| } | |
| # --- ADDED CANVAS FUNCTIONALITY --- | |
| def add_canvas_artifact(self, conversation_id: str, artifact_type: str, content: str, title: str = ""): | |
| """Add an artifact to the canvas for a specific conversation.""" | |
| conv_id = conversation_id or "default" | |
| with self.canvas_lock: | |
| if conv_id not in self.canvas_artifacts: | |
| self.canvas_artifacts[conv_id] = [] | |
| artifact = CanvasArtifact( | |
| id=str(uuid.uuid4()), | |
| type=artifact_type, | |
| content=content, | |
| title=title, | |
| timestamp=time.time(), | |
| metadata={} | |
| ) | |
| self.canvas_artifacts[conv_id].append(artifact) | |
| console.log(f"[green]Added {artifact_type} artifact to canvas '{conv_id}'[/green]") | |
| def get_canvas_summary(self, conversation_id: str) -> List[Dict]: | |
| """Get a summary of artifacts on the canvas for JSON display.""" | |
| conv_id = conversation_id or "default" | |
| with self.canvas_lock: | |
| artifacts = self.canvas_artifacts.get(conv_id, []) | |
| # Convert artifacts to dictionaries for JSON serialization | |
| return [ | |
| { | |
| "id": art.id, | |
| "type": art.type, | |
| "title": art.title, | |
| "timestamp": art.timestamp, | |
| "content_preview": art.content[:100] + "..." if len(art.content) > 100 else art.content | |
| } | |
| for art in artifacts | |
| ] | |
| def clear_canvas(self, conversation_id: str): | |
| """Clear all artifacts from the canvas for a specific conversation.""" | |
| conv_id = conversation_id or "default" | |
| with self.canvas_lock: | |
| if conv_id in self.canvas_artifacts: | |
| self.canvas_artifacts[conv_id].clear() | |
| console.log(f"[yellow]Cleared canvas artifacts for '{conv_id}'[/yellow]") | |
async def chat_with_canvas(self, message: str, conversation_id: str, include_canvas: bool = False):
    """Chat method that can optionally include canvas context."""
    messages = [{"role": "user", "content": message}]
    if include_canvas:
        artifacts = self.get_canvas_summary(conversation_id)
        if artifacts:
            canvas_context = "Current Canvas Context:\n" + "\n".join([
                f"- [{art['type'].upper()}] {art['title'] or 'Untitled'}: {art['content_preview']}"
                for art in artifacts
            ])
            messages.insert(0, {"role": "system", "content": canvas_context})
    return await self.chat(messages)
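# Illustrative end-to-end sketch for LLMAgent (an assumption: it requires an
# OpenAI-compatible server listening at BASE_URL). It exercises the queue and
# callback path directly rather than the async chat() wrapper.
def _example_llm_agent_callback():
    agent = LLMAgent(model_id=BASEMODEL_ID)
    def on_reply(response: LLMResponse):
        # Invoked from the worker thread once the request completes
        print("assistant:", response.message.content if response.success else response.error)
    agent.send_message(
        "Summarise the LCARS interface in one sentence.",
        conversation_id="demo",
        callback=on_reply
    )
    print("status:", agent.get_status())
    # agent.stop()  # stop the worker thread when finished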
| class AI_Agent: | |
| def __init__(self, model_id: str, system_prompt: str = "You are a helpful assistant. Respond concisely in 1-2 sentences.", history: List[Dict] = None): | |
| self.model_id = model_id | |
| self.system_prompt = system_prompt | |
| self.history = history or [] | |
| self.conversation_id = f"conv_{uuid.uuid4().hex[:8]}" | |
| # Create agent instance | |
| self.client = LLMAgent( | |
| model_id=model_id, | |
| system_prompt=self.system_prompt, | |
| generate_fn=LLMAgent.openai_generate | |
| ) | |
| console.log(f"[bold green]β MyAgent initialized with model: {model_id}[/bold green]") | |
| async def call_llm(self, messages: List[Dict], use_history: bool = True) -> str: | |
| """ | |
| Send messages to LLM and get response | |
| Args: | |
| messages: List of message dicts with 'role' and 'content' | |
| use_history: Whether to include conversation history | |
| Returns: | |
| str: LLM response | |
| """ | |
| try: | |
| console.log(f"[bold yellow]Sending {len(messages)} messages to LLM (use_history: {use_history})...[/bold yellow]") | |
| # Enhance messages based on history setting | |
| enhanced_messages = await self._enhance_messages(messages, use_history) | |
| response = await self.client.chat(enhanced_messages) | |
| console.log(f"[bold green]β Response received ({len(response)} chars)[/bold green]") | |
| # Update conversation history ONLY if we're using history | |
| if use_history: | |
| self._update_history(messages, response) | |
| return response | |
| except Exception as e: | |
| console.log(f"[bold red]β ERROR: {e}[/bold red]") | |
| traceback.print_exc() | |
| return f"Error: {str(e)}" | |
| async def _enhance_messages(self, messages: List[Dict], use_history: bool) -> List[Dict]: | |
| """Enhance messages with system prompt and optional history""" | |
| enhanced = [] | |
| # Add system prompt if not already in messages | |
| has_system = any(msg.get('role') == 'system' for msg in messages) | |
| if not has_system and self.system_prompt: | |
| enhanced.append({"role": "system", "content": self.system_prompt}) | |
| # Add conversation history only if requested | |
| if use_history and self.history: | |
| enhanced.extend(self.history[-10:]) # Last 10 messages for context | |
| # Add current messages | |
| enhanced.extend(messages) | |
| return enhanced | |
| def _update_history(self, messages: List[Dict], response: str): | |
| """Update conversation history with new exchange""" | |
| # Add user messages to history | |
| for msg in messages: | |
| if msg.get('role') in ['user', 'assistant']: | |
| self.history.append(msg) | |
| # Add assistant response to history | |
| self.history.append({"role": "assistant", "content": response}) | |
| # Keep history manageable (last 20 exchanges) | |
| if len(self.history) > 40: # 20 user + 20 assistant messages | |
| self.history = self.history[-40:] | |
| async def simple_query(self, query: str) -> str: | |
| """Simple one-shot query method - NO history/context""" | |
| messages = [{"role": "user", "content": query}] | |
| return await self.call_llm(messages, use_history=False) | |
| async def multi_turn_chat(self, user_input: str) -> str: | |
| """Multi-turn chat that maintains context across calls""" | |
| messages = [{"role": "user", "content": user_input}] | |
| response = await self.call_llm(messages, use_history=True) | |
| return response | |
| def get_conversation_summary(self) -> Dict: | |
| """Get conversation summary""" | |
| return { | |
| "conversation_id": self.conversation_id, | |
| "total_messages": len(self.history), | |
| "user_messages": len([msg for msg in self.history if msg.get('role') == 'user']), | |
| "assistant_messages": len([msg for msg in self.history if msg.get('role') == 'assistant']), | |
| "recent_exchanges": self.history[-4:] if self.history else [] | |
| } | |
| def clear_history(self): | |
| """Clear conversation history""" | |
| self.history.clear() | |
| console.log("[bold yellow]Conversation history cleared[/bold yellow]") | |
| def update_system_prompt(self, new_prompt: str): | |
| """Update the system prompt""" | |
| self.system_prompt = new_prompt | |
| console.log(f"[bold blue]System prompt updated[/bold blue]") | |
| def stop(self): | |
| """Stop the client gracefully""" | |
| if hasattr(self, 'client') and self.client: | |
| self.client.stop() | |
| console.log("[bold yellow]MyAgent client stopped[/bold yellow]") | |
| async def contextual_query(self, query: str, context_messages: List[Dict] = None, | |
| context_text: str = None, context_files: List[str] = None) -> str: | |
| """ | |
| Query with specific context but doesn't update main history | |
| Args: | |
| query: The user question | |
| context_messages: List of message dicts for context | |
| context_text: Plain text context (will be converted to system message) | |
| context_files: List of file paths to read and include as context | |
| """ | |
| messages = [] | |
| # Add system prompt | |
| if self.system_prompt: | |
| messages.append({"role": "system", "content": self.system_prompt}) | |
| # Handle different context types | |
| if context_messages: | |
| messages.extend(context_messages) | |
| if context_text: | |
| messages.append({"role": "system", "content": f"Additional context: {context_text}"}) | |
| if context_files: | |
| file_context = await self._read_files_context(context_files) | |
| if file_context: | |
| messages.append({"role": "system", "content": f"File contents:\n{file_context}"}) | |
| # Add the actual query | |
| messages.append({"role": "user", "content": query}) | |
| return await self.call_llm(messages, use_history=False) | |
| async def _read_files_context(self, file_paths: List[str]) -> str: | |
| """Read multiple files and return as context string""" | |
| contexts = [] | |
| for file_path in file_paths: | |
| try: | |
| if os.path.exists(file_path): | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| content = f.read() | |
| contexts.append(f"--- {os.path.basename(file_path)} ---\n{content}") | |
| else: | |
| console.log(f"[bold yellow]File not found: {file_path}[/bold yellow]") | |
| except Exception as e: | |
| console.log(f"[bold red]Error reading file {file_path}: {e}[/bold red]") | |
| return "\n\n".join(contexts) if contexts else "" | |
| async def query_with_code_context(self, query: str, code_snippets: List[str] = None, | |
| code_files: List[str] = None) -> str: | |
| """ | |
| Specialized contextual query for code-related questions | |
| """ | |
| code_context = "CODE CONTEXT:\n" | |
| if code_snippets: | |
| for i, snippet in enumerate(code_snippets, 1): | |
| code_context += f"\nSnippet {i}:\n```\n{snippet}\n```\n" | |
| if code_files: | |
| # Read code files and include them | |
| for file_path in code_files: | |
| if file_path.endswith(('.py', '.js', '.java', '.cpp', '.c', '.html', '.css')): | |
| code_context += f"\nFile: {file_path}\n```\n" | |
| try: | |
| with open(file_path, 'r') as f: | |
| code_context += f.read() | |
| except Exception as e: | |
| code_context += f"Error reading file: {e}" | |
| code_context += "\n```\n" | |
| return await self.contextual_query(query, context_text=code_context) | |
| async def multi_context_query(self, query: str, contexts: Dict[str, Any]) -> str: | |
| """ | |
| Advanced contextual query with multiple context types | |
| Args: | |
| query: The user question | |
| contexts: Dict with various context types | |
| - 'messages': List of message dicts | |
| - 'text': Plain text context | |
| - 'files': List of file paths | |
| - 'urls': List of URLs | |
| - 'code': List of code snippets or files | |
| - 'metadata': Any additional metadata | |
| """ | |
| all_context_messages = [] | |
| # Build context from different sources | |
| if contexts.get('text'): | |
| all_context_messages.append({"role": "system", "content": f"Context: {contexts['text']}"}) | |
| if contexts.get('messages'): | |
| all_context_messages.extend(contexts['messages']) | |
| if contexts.get('files'): | |
| file_context = await self._read_files_context(contexts['files']) | |
| if file_context: | |
| all_context_messages.append({"role": "system", "content": f"File Contents:\n{file_context}"}) | |
| if contexts.get('code'): | |
| code_context = "\n".join([f"Code snippet {i}:\n```\n{code}\n```" | |
| for i, code in enumerate(contexts['code'], 1)]) | |
| all_context_messages.append({"role": "system", "content": f"Code Context:\n{code_context}"}) | |
| if contexts.get('metadata'): | |
| all_context_messages.append({"role": "system", "content": f"Metadata: {contexts['metadata']}"}) | |
| return await self.contextual_query(query, context_messages=all_context_messages) | |
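# Illustrative async usage of AI_Agent (an assumption: the local model behind
# BASE_URL is reachable). simple_query() is stateless, while multi_turn_chat()
# maintains the rolling history that _update_history trims to the last 40
# messages.
async def _example_ai_agent():
    agent = AI_Agent(model_id=BASEMODEL_ID)
    print(await agent.simple_query("What is LCARS?"))
    print(await agent.multi_turn_chat("Give me a two-line Python hello world."))
    print(agent.get_conversation_summary())
    agent.stop()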
| class EnhancedAIAgent: | |
| """ | |
| Wrapper around your AI_Agent that adds canvas/artifact management | |
| without modifying the original agent. | |
| """ | |
| def __init__(self, ai_agent): | |
| self.agent = ai_agent | |
| self.canvas_artifacts: Dict[str, List[CanvasArtifact]] = {} | |
| self.max_canvas_artifacts = 50 | |
| console.log("[bold green]β Enhanced AI Agent wrapper initialized[/bold green]") | |
| def add_artifact_to_canvas(self, conversation_id: str, content: str, | |
| artifact_type: str = "code", title: str = None): | |
| """Add artifacts to the collaborative canvas""" | |
| if conversation_id not in self.canvas_artifacts: | |
| self.canvas_artifacts[conversation_id] = [] | |
| artifact = CanvasArtifact( | |
| id=str(uuid.uuid4())[:8], | |
| type=artifact_type, | |
| content=content, | |
| title=title or f"{artifact_type}_{len(self.canvas_artifacts[conversation_id]) + 1}", | |
| timestamp=time.time(), | |
| metadata={"conversation_id": conversation_id} | |
| ) | |
| self.canvas_artifacts[conversation_id].append(artifact) | |
| # Keep only recent artifacts | |
| if len(self.canvas_artifacts[conversation_id]) > self.max_canvas_artifacts: | |
| self.canvas_artifacts[conversation_id] = self.canvas_artifacts[conversation_id][-self.max_canvas_artifacts:] | |
| console.log(f"[green]Added artifact to canvas: {artifact.title}[/green]") | |
| return artifact | |
| def get_canvas_context(self, conversation_id: str) -> str: | |
| """Get formatted canvas context for LLM prompts""" | |
| if conversation_id not in self.canvas_artifacts or not self.canvas_artifacts[conversation_id]: | |
| return "" | |
| context_lines = ["\n=== COLLABORATIVE CANVAS ARTIFACTS ==="] | |
| for artifact in self.canvas_artifacts[conversation_id][-10:]: # Last 10 artifacts | |
| context_lines.append(f"\n--- {artifact.title} [{artifact.type.upper()}] ---") | |
| preview = artifact.content[:500] + "..." if len(artifact.content) > 500 else artifact.content | |
| context_lines.append(preview) | |
| return "\n".join(context_lines) + "\n=================================\n" | |
| async def chat_with_canvas(self, message: str, conversation_id: str = "default", | |
| include_canvas: bool = True) -> str: | |
| """Enhanced chat that includes canvas context""" | |
| # Build context with canvas artifacts if requested | |
| full_message = message | |
| if include_canvas: | |
| canvas_context = self.get_canvas_context(conversation_id) | |
| if canvas_context: | |
| full_message = f"{canvas_context}\n\nUser Query: {message}" | |
| try: | |
| # Use your original agent's multi_turn_chat method | |
| response = await self.agent.multi_turn_chat(full_message) | |
| # Auto-extract and add code artifacts to canvas | |
| self._extract_artifacts_to_canvas(response, conversation_id) | |
| return response | |
| except Exception as e: | |
| error_msg = f"Error in chat_with_canvas: {str(e)}" | |
| console.log(f"[red]{error_msg}[/red]") | |
| return error_msg | |
| def _extract_artifacts_to_canvas(self, response: str, conversation_id: str): | |
| """Automatically extract code blocks and add to canvas""" | |
| # Find all code blocks with optional language specification | |
| code_blocks = re.findall(r'```(?:(\w+)\n)?(.*?)```', response, re.DOTALL) | |
| for i, (lang, code_block) in enumerate(code_blocks): | |
| if len(code_block.strip()) > 10: # Only add substantial code blocks | |
| self.add_artifact_to_canvas( | |
| conversation_id, | |
| code_block.strip(), | |
| "code", | |
| f"code_snippet_{lang or 'unknown'}_{len(self.canvas_artifacts.get(conversation_id, [])) + 1}" | |
| ) | |
| def get_canvas_summary(self, conversation_id: str) -> List[Dict]: | |
| """Get summary of canvas artifacts for display""" | |
| if conversation_id not in self.canvas_artifacts: | |
| return [] | |
| artifacts = [] | |
| for artifact in reversed(self.canvas_artifacts[conversation_id]): # Newest first | |
| artifacts.append({ | |
| "id": artifact.id, | |
| "type": artifact.type.upper(), | |
| "title": artifact.title, | |
| "preview": artifact.content[:100] + "..." if len(artifact.content) > 100 else artifact.content, | |
| "timestamp": time.strftime("%H:%M:%S", time.localtime(artifact.timestamp)) | |
| }) | |
| return artifacts | |
| def get_artifact_by_id(self, conversation_id: str, artifact_id: str) -> Optional[CanvasArtifact]: | |
| """Get specific artifact by ID""" | |
| if conversation_id not in self.canvas_artifacts: | |
| return None | |
| for artifact in self.canvas_artifacts[conversation_id]: | |
| if artifact.id == artifact_id: | |
| return artifact | |
| return None | |
| def clear_canvas(self, conversation_id: str = "default"): | |
| """Clear canvas artifacts""" | |
| if conversation_id in self.canvas_artifacts: | |
| self.canvas_artifacts[conversation_id] = [] | |
| console.log(f"[yellow]Cleared canvas: {conversation_id}[/yellow]") | |
| def get_latest_code_artifact(self, conversation_id: str) -> Optional[str]: | |
| """Get the most recent code artifact content""" | |
| if conversation_id not in self.canvas_artifacts: | |
| return None | |
| for artifact in reversed(self.canvas_artifacts[conversation_id]): | |
| if artifact.type == "code": | |
| return artifact.content | |
| return None | |
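# Illustrative wiring for the wrapper (an assumption, not called anywhere):
# EnhancedAIAgent reuses an existing AI_Agent and layers canvas management on
# top, so fenced code blocks in replies are captured automatically by
# _extract_artifacts_to_canvas.
async def _example_enhanced_agent():
    enhanced = EnhancedAIAgent(AI_Agent(model_id=BASEMODEL_ID))
    reply = await enhanced.chat_with_canvas(
        "Write a Python function that reverses a string.",
        conversation_id="demo"
    )
    print(reply)
    print(enhanced.get_canvas_summary("demo"))        # newest artifacts first
    print(enhanced.get_latest_code_artifact("demo"))  # most recent code block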
| # --- LCARS Styled Gradio Interface --- | |
| class LcarsInterface: | |
| def __init__(self): | |
| # Start with HuggingFace by default for Spaces | |
| self.use_huggingface = True | |
| self.agent = LLMAgent(generate_fn=LLMAgent.openai_generate) | |
| self.current_conversation = "default" | |
| def create_interface(self): | |
| """Create the full LCARS-styled interface""" | |
| lcars_css = """ | |
| :root { | |
| --lcars-orange: #FF9900; | |
| --lcars-red: #FF0033; | |
| --lcars-blue: #6699FF; | |
| --lcars-purple: #CC99FF; | |
| --lcars-pale-blue: #99CCFF; | |
| --lcars-black: #000000; | |
| --lcars-dark-blue: #3366CC; | |
| --lcars-gray: #424242; | |
| --lcars-yellow: #FFFF66; | |
| } | |
| body { | |
| background: var(--lcars-black); | |
| color: var(--lcars-orange); | |
| font-family: 'Antonio', 'LCD', 'Courier New', monospace; | |
| margin: 0; | |
| padding: 0; | |
| } | |
| .gradio-container { | |
| background: var(--lcars-black) !important; | |
| min-height: 100vh; | |
| } | |
| .lcars-container { | |
| background: var(--lcars-black); | |
| border: 4px solid var(--lcars-orange); | |
| border-radius: 0 30px 0 0; | |
| min-height: 100vh; | |
| padding: 20px; | |
| } | |
| .lcars-header { | |
| background: linear-gradient(90deg, var(--lcars-red), var(--lcars-orange)); | |
| padding: 20px 40px; | |
| border-radius: 0 60px 0 0; | |
| margin: -20px -20px 20px -20px; | |
| border-bottom: 6px solid var(--lcars-blue); | |
| } | |
| .lcars-title { | |
| font-size: 2.5em; | |
| font-weight: bold; | |
| color: var(--lcars-black); | |
| margin: 0; | |
| } | |
| .lcars-subtitle { | |
| font-size: 1.2em; | |
| color: var(--lcars-black); | |
| margin: 10px 0 0 0; | |
| } | |
| .lcars-panel { | |
| background: rgba(66, 66, 66, 0.9); | |
| border: 2px solid var(--lcars-orange); | |
| border-radius: 0 20px 0 20px; | |
| padding: 15px; | |
| margin-bottom: 15px; | |
| } | |
| .lcars-button { | |
| background: var(--lcars-orange); | |
| color: var(--lcars-black) !important; | |
| border: none !important; | |
| border-radius: 0 15px 0 15px !important; | |
| padding: 10px 20px !important; | |
| font-family: inherit !important; | |
| font-weight: bold !important; | |
| margin: 5px !important; | |
| } | |
| .lcars-button:hover { | |
| background: var(--lcars-red) !important; | |
| } | |
| .lcars-input { | |
| background: var(--lcars-black) !important; | |
| color: var(--lcars-orange) !important; | |
| border: 2px solid var(--lcars-blue) !important; | |
| border-radius: 0 10px 0 10px !important; | |
| padding: 10px !important; | |
| } | |
| .lcars-chatbot { | |
| background: var(--lcars-black) !important; | |
| border: 2px solid var(--lcars-purple) !important; | |
| border-radius: 0 15px 0 15px !important; | |
| } | |
| .status-indicator { | |
| display: inline-block; | |
| width: 12px; | |
| height: 12px; | |
| border-radius: 50%; | |
| background: var(--lcars-red); | |
| margin-right: 8px; | |
| } | |
| .status-online { | |
| background: var(--lcars-blue); | |
| animation: pulse 2s infinite; | |
| } | |
| @keyframes pulse { | |
| 0% { opacity: 1; } | |
| 50% { opacity: 0.5; } | |
| 100% { opacity: 1; } | |
| } | |
| """ | |
| with gr.Blocks(css=lcars_css, theme=gr.themes.Default(), title="LCARS Terminal") as interface: | |
| with gr.Column(elem_classes="lcars-container"): | |
| # Header | |
| with gr.Sidebar(): | |
| gr.LoginButton() | |
| with gr.Row(elem_classes="lcars-header"): | |
| gr.Markdown(""" | |
| <div style="text-align: center; width: 100%;"> | |
| <div class="lcars-title">π LCARS TERMINAL</div> | |
| <div class="lcars-subtitle">STARFLEET AI DEVELOPMENT CONSOLE</div> | |
| <div style="margin-top: 10px;"> | |
| <span class="status-indicator status-online"></span> | |
| <span style="color: var(--lcars-black); font-weight: bold;">SYSTEM ONLINE</span> | |
| </div> | |
| </div> | |
| """) | |
| # Main Content | |
| with gr.Row(): | |
| # Left Sidebar | |
| with gr.Column(scale=1): | |
| # Configuration Panel | |
| with gr.Column(elem_classes="lcars-panel"): | |
| # Connection Type Selector | |
| with gr.Row(elem_classes="lcars-panel"): | |
| connection_type = gr.Radio(label = "### π CONNECTION TYPE", | |
| choices=["HuggingFace Inference", "Local LM Studio"], | |
| value="HuggingFace Inference", | |
| elem_classes="lcars-input" | |
| ) | |
| gr.Markdown("### π§ CONFIGURATION") | |
| # Connection-specific settings | |
| with gr.Row(visible=False) as local_settings: | |
| base_url = gr.Textbox( | |
| value=LOCAL_BASE_URL, | |
| label="LM Studio URL", | |
| elem_classes="lcars-input" | |
| ) | |
| api_key = gr.Textbox( | |
| value=LOCAL_API_KEY, | |
| label="API Key", | |
| type="password", | |
| elem_classes="lcars-input" | |
| ) | |
| with gr.Row(visible=True) as hf_settings: | |
| hf_api_key = gr.Textbox( | |
| value=HF_API_KEY, | |
| label="HuggingFace API Key", | |
| type="password", | |
| elem_classes="lcars-input", | |
| placeholder="Get from https://huggingface.co/settings/tokens" | |
| ) | |
| with gr.Row(): | |
| model_dropdown = gr.Dropdown( | |
| choices=list(MODEL_OPTIONS.keys())[1:], | |
| value=list(MODEL_OPTIONS.keys())[1], | |
| label="AI Model", | |
| elem_classes="lcars-input" | |
| ) | |
| fetch_models_btn = gr.Button("π‘ Fetch Models", elem_classes="lcars-button") | |
| with gr.Row(): | |
| temperature = gr.Slider(0.0, 2.0, value=0.7, label="Temperature") | |
| max_tokens = gr.Slider(128, 8192, value=2000, step=128, label="Max Tokens") | |
with gr.Row():
    update_config_btn = gr.Button("Apply Config", elem_classes="lcars-button")
    speech_toggle = gr.Checkbox(value=True, label="Speech Output")
| # Canvas Artifacts | |
| with gr.Column(elem_classes="lcars-panel"): | |
| gr.Markdown("### π¨ CANVAS ARTIFACTS") | |
| artifact_display = gr.JSON(label="") | |
| with gr.Row(): | |
| refresh_artifacts_btn = gr.Button("π Refresh", elem_classes="lcars-button") | |
| clear_canvas_btn = gr.Button("ποΈ Clear Canvas", elem_classes="lcars-button") | |
| # Main Content Area | |
| with gr.Column(scale=2): | |
| # Code Canvas | |
| with gr.Accordion("π» COLLABORATIVE CODE CANVAS", open=False): | |
| code_editor = gr.Code( | |
| value="# Welcome to LCARS Collaborative Canvas\\nprint('Hello, Starfleet!')", | |
| language="python", | |
| lines=15, | |
| label="" | |
| ) | |
| with gr.Row(): | |
| load_to_chat_btn = gr.Button("π¬ Discuss Code", elem_classes="lcars-button") | |
| analyze_btn = gr.Button("π Analyze", elem_classes="lcars-button") | |
| optimize_btn = gr.Button("β‘ Optimize", elem_classes="lcars-button") | |
| # Chat Interface | |
with gr.Column(elem_classes="lcars-panel"):
    gr.Markdown("### MISSION LOG")
    chatbot = gr.Chatbot(label="", height=300)
    with gr.Row():
        message_input = gr.Textbox(
            placeholder="Enter your command or query...",
            show_label=False,
            lines=2,
            scale=4
        )
        send_btn = gr.Button("SEND", elem_classes="lcars-button", scale=1)
| # Status | |
| with gr.Row(): | |
| status_display = gr.Textbox( | |
| value="LCARS terminal operational. Awaiting commands.", | |
| label="Status", | |
| max_lines=2 | |
| ) | |
with gr.Column(scale=0):
    clear_chat_btn = gr.Button("Clear Chat", elem_classes="lcars-button")
    new_session_btn = gr.Button("New Session", elem_classes="lcars-button")
| # === EVENT HANDLERS === | |
| def switch_connection(connection_type): | |
| if connection_type == "Local LM Studio": | |
| return [ | |
| gr.update(visible=True), | |
| gr.update(visible=False), | |
| gr.update(choices=list(MODEL_OPTIONS.keys())[1:], value=list(MODEL_OPTIONS.keys())[1]) | |
| ] | |
| else: | |
| return [ | |
| gr.update(visible=False), | |
| gr.update(visible=True), | |
| gr.update(choices=list(MODEL_OPTIONS.keys())[1:], value=list(MODEL_OPTIONS.keys())[1]) | |
| ] | |
| async def fetch_models_updated(connection_type, base_url_val, api_key_val, hf_api_key_val): | |
| # Fixed: Removed the 'use_huggingface' parameter | |
| if connection_type == "Local LM Studio": | |
| models = await LLMAgent.fetch_available_models( | |
| base_url_val, api_key_val | |
| ) | |
| else: | |
| # Using the HF_INFERENCE_URL and the key | |
| models = await LLMAgent.fetch_available_models( | |
| HF_INFERENCE_URL, hf_api_key_val | |
| ) | |
| if models: | |
| return gr.update(choices=models, value=models[0]) | |
| return gr.update(choices=["No models found"]) | |
| def update_agent_connection(connection_type, model_id, base_url_val, api_key_val, hf_api_key_val): | |
| # Fixed: Removed the 'use_huggingface' parameter from the constructor | |
| use_hf = connection_type == "HuggingFace Inference" | |
| if use_hf: | |
| # Use the model_id directly (it's the model name like 'codellama/CodeLlama-7b-hf') | |
| self.agent = LLMAgent( | |
| model_id=model_id, | |
| base_url=HF_INFERENCE_URL, | |
| api_key=hf_api_key_val, | |
| generate_fn=LLMAgent.openai_generate | |
| ) | |
| return f"β Switched to HuggingFace: {model_id}" | |
| else: | |
| self.agent = LLMAgent( | |
| model_id=model_id, | |
| base_url=base_url_val, | |
| api_key=api_key_val, | |
| generate_fn=LLMAgent.openai_generate | |
| ) | |
| return f"β Switched to Local: {base_url_val}" | |
async def process_message(message, history, speech_enabled):
    if not message.strip():
        # Must return four values to match the four Gradio outputs wired below
        return "", history, "Please enter a message", self.agent.get_canvas_summary(self.current_conversation)
    history = history + [[message, None]]
    try:
        # Uses chat_with_canvas so canvas context is included in the prompt
        response = await self.agent.chat_with_canvas(
            message, self.current_conversation, include_canvas=True
        )
        history[-1][1] = response
        if speech_enabled and self.agent.speech_enabled:
            self.agent.speak(response)
        artifacts = self.agent.get_canvas_summary(self.current_conversation)
        status = f"Response received. Canvas artifacts: {len(artifacts)}"
        return "", history, status, artifacts
    except Exception as e:
        error_msg = f"Error: {str(e)}"
        history[-1][1] = error_msg
        return "", history, error_msg, self.agent.get_canvas_summary(self.current_conversation)
| def get_artifacts(): | |
| return self.agent.get_canvas_summary(self.current_conversation) | |
def clear_canvas():
    self.agent.clear_canvas(self.current_conversation)
    return [], "Canvas cleared"
def clear_chat():
    self.agent.clear_conversation(self.current_conversation)
    return [], "Chat cleared"
def new_session():
    self.agent.clear_conversation(self.current_conversation)
    self.agent.clear_canvas(self.current_conversation)
    return [], "# New session started\nprint('Ready!')", "New session started", []
| # Connect events | |
| connection_type.change(switch_connection, inputs=connection_type, | |
| outputs=[local_settings, hf_settings, model_dropdown]) | |
| fetch_models_btn.click(fetch_models_updated, | |
| inputs=[connection_type, base_url, api_key, hf_api_key], | |
| outputs=model_dropdown) | |
| update_config_btn.click(update_agent_connection, | |
| inputs=[connection_type, model_dropdown, base_url, api_key, hf_api_key], | |
| outputs=status_display) | |
| send_btn.click(process_message, | |
| inputs=[message_input, chatbot, speech_toggle], | |
| outputs=[message_input, chatbot, status_display, artifact_display]) | |
| message_input.submit(process_message, | |
| inputs=[message_input, chatbot, speech_toggle], | |
| outputs=[message_input, chatbot, status_display, artifact_display]) | |
| refresh_artifacts_btn.click(get_artifacts, outputs=artifact_display) | |
| clear_canvas_btn.click(clear_canvas, outputs=[artifact_display, status_display]) | |
| clear_chat_btn.click(clear_chat, outputs=[chatbot, status_display]) | |
| new_session_btn.click(new_session, outputs=[chatbot, code_editor, status_display, artifact_display]) | |
| interface.load(get_artifacts, outputs=artifact_display) | |
| return interface | |
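# Optional launch sketch (assumptions: running outside HF Spaces on port 7860).
# Enabling Gradio's request queue is generally a good idea here because the
# send/submit handlers are async coroutines.
def _example_local_launch():
    ui = LcarsInterface().create_interface()
    ui.queue()
    ui.launch(server_name="0.0.0.0", server_port=7860)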
# --- Main Application ---
def main():
    console.log("[bold blue]Starting LCARS Terminal...[/bold blue]")
    is_space = os.getenv('SPACE_ID') is not None
    if is_space:
        console.log("[green]Detected HuggingFace Space[/green]")
    else:
        console.log("[blue]Running locally[/blue]")
| interface = LcarsInterface() | |
| demo = interface.create_interface() | |
| demo.launch( | |
| share=is_space | |
| ) | |
| if __name__ == "__main__": | |
| main() |