OpenSpace / openspace /utils /ui_integration.py
darkfire514's picture
Upload 160 files
399b80c verified
"""
OpenSpace UI Integration
Integrates the UI system with OpenSpace core components.
Provides hooks and callbacks to update UI in real-time.
"""
import asyncio
from typing import Optional
from openspace.utils.ui import OpenSpaceUI, AgentStatus
from openspace.utils.logging import Logger
logger = Logger.get_logger(__name__)
class UIIntegration:
"""
UI Integration for OpenSpace
Connects OpenSpace components with the UI system to provide real-time
visualization of agent activities and execution flow.
"""
def __init__(self, ui: OpenSpaceUI):
"""
Initialize UI integration
Args:
ui: OpenSpaceUI instance
"""
self.ui = ui
self._update_task: Optional[asyncio.Task] = None
self._running = False
# Tracked components
self._llm_client = None
self._grounding_client = None
def attach_llm_client(self, llm_client):
"""
Attach LLM client
Args:
llm_client: LLMClient instance
"""
self._llm_client = llm_client
logger.debug("UI attached to LLMClient")
def attach_grounding_client(self, grounding_client):
"""
Attach grounding client
Args:
grounding_client: GroundingClient instance
"""
self._grounding_client = grounding_client
logger.debug("UI attached to GroundingClient")
async def start_monitoring(self, poll_interval: float = 0.5):
"""
Start monitoring and updating UI
Args:
poll_interval: Update interval in seconds
"""
if self._running:
logger.warning("UI monitoring already running")
return
self._running = True
# Immediately update UI once before starting the loop
await self._update_ui()
self._update_task = asyncio.create_task(
self._monitor_loop(poll_interval)
)
logger.info("UI monitoring started")
async def stop_monitoring(self):
"""Stop monitoring"""
if not self._running:
return
self._running = False
if self._update_task:
self._update_task.cancel()
try:
await self._update_task
except asyncio.CancelledError:
pass
logger.info("UI monitoring stopped")
async def _monitor_loop(self, poll_interval: float):
"""
Main monitoring loop
Args:
poll_interval: Update interval in seconds
"""
while self._running:
try:
await self._update_ui()
await asyncio.sleep(poll_interval)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"UI update error: {e}", exc_info=True)
async def _update_ui(self):
"""Update UI with current state"""
# Update grounding backends info
if self._grounding_client:
backends = []
try:
# Get list of providers
providers = self._grounding_client.list_providers()
for backend_type, provider in providers.items():
backend_name = backend_type.value if hasattr(backend_type, 'value') else str(backend_type)
backend_info = {
"name": backend_name,
"type": backend_name, # gui, shell, mcp, system, web
"servers": []
}
# For MCP provider, get server names
if backend_name == "mcp":
try:
# Try to get MCP sessions from provider
if hasattr(provider, '_sessions'):
backend_info["servers"] = list(provider._sessions.keys())
except Exception:
pass
backends.append(backend_info)
self.ui.update_grounding_backends(backends)
except Exception as e:
logger.debug(f"Failed to update grounding backends: {e}")
# Refresh display
self.ui.update_display()
# Event handlers - to be called by agents
def on_agent_start(self, agent_name: str, activity: str):
"""
Called when agent starts an activity
Args:
agent_name: Agent name
activity: Activity description
"""
self.ui.update_agent_status(agent_name, AgentStatus.EXECUTING)
self.ui.add_agent_activity(agent_name, activity)
self.ui.add_log(f"{agent_name}: {activity}", level="info")
def on_agent_thinking(self, agent_name: str):
"""
Called when agent is thinking
Args:
agent_name: Agent name
"""
self.ui.update_agent_status(agent_name, AgentStatus.THINKING)
def on_agent_complete(self, agent_name: str, result: str = ""):
"""
Called when agent completes an activity
Args:
agent_name: Agent name
result: Result description
"""
self.ui.update_agent_status(agent_name, AgentStatus.IDLE)
if result:
self.ui.add_log(f"{agent_name}: {result}", level="success")
def on_llm_call(self, model: str, prompt_length: int):
"""
Called when LLM is called
Args:
model: Model name
prompt_length: Prompt length
"""
self.ui.update_metrics(
llm_calls=self.ui.metrics.get("llm_calls", 0) + 1
)
self.ui.add_log(f"LLM call: {model} (prompt: {prompt_length} chars)", level="debug")
def on_grounding_call(self, backend: str, action: str):
"""
Called when grounding backend is called
Args:
backend: Backend name
action: Action description
"""
self.ui.add_grounding_operation(backend, action, status="pending")
self.ui.add_log(f"Grounding [{backend}]: {action}", level="info")
def on_grounding_complete(self, backend: str, action: str, success: bool):
"""
Called when grounding operation completes
Args:
backend: Backend name
action: Action description
success: Whether operation succeeded
"""
status = "success" if success else "error"
# Update last operation status
for op in reversed(self.ui.grounding_operations):
if op["backend"] == backend and op["action"] == action and op["status"] == "pending":
op["status"] = status
break
level = "success" if success else "error"
result = "✓" if success else "✗"
self.ui.add_log(f"Grounding [{backend}]: {action} {result}", level=level)
def on_iteration(self, iteration: int):
"""
Called on each iteration
Args:
iteration: Iteration number
"""
self.ui.update_metrics(iterations=iteration)
def on_error(self, message: str):
"""
Called when an error occurs
Args:
message: Error message
"""
self.ui.add_log(f"ERROR: {message}", level="error")
class UILoggingHandler:
"""
Logging handler that forwards logs to UI
"""
def __init__(self, ui: OpenSpaceUI):
"""
Initialize logging handler
Args:
ui: OpenSpaceUI instance
"""
self.ui = ui
def emit(self, record):
"""
Emit a log record to UI
Args:
record: Log record
"""
level_map = {
"DEBUG": "debug",
"INFO": "info",
"WARNING": "warning",
"ERROR": "error",
"CRITICAL": "error",
}
level = level_map.get(record.levelname, "info")
message = record.getMessage()
# Filter out noisy logs
if any(skip in message.lower() for skip in ["processing card", "workflow poll"]):
return
self.ui.add_log(message, level=level)
def create_integration(ui: OpenSpaceUI) -> UIIntegration:
"""
Create UI integration instance
Args:
ui: OpenSpaceUI instance
Returns:
UIIntegration instance
"""
return UIIntegration(ui)