cidadao.ai-backend / src /services /agent_lazy_loader.py
anderson-ufrj
feat: implement Oxossi agent - fraud detection specialist
bb53f16
"""
Lazy Loading Service for AI Agents
Optimizes memory usage and startup time by loading agents on-demand
"""
import asyncio
import importlib
import inspect
from typing import Dict, Type, Optional, Any, List, Callable
from datetime import datetime, timedelta
import weakref
from src.core import get_logger
from src.agents.deodoro import BaseAgent
from src.core.exceptions import AgentExecutionError
logger = get_logger(__name__)
class AgentMetadata:
    """Registry record describing one lazily loadable agent.

    The constructor arguments say where the agent class lives
    (``module_path`` + ``class_name``), how it is presented
    (``description``, ``capabilities``) and how the loader should treat it
    (``priority``, ``preload``).

    The runtime bookkeeping attributes start empty and are updated by
    ``AgentLazyLoader``:

    * ``loaded_class`` - the imported class, or ``None`` while not loaded
    * ``last_used``    - timestamp of the last load or cache hit
    * ``usage_count``  - number of loads plus cache hits
    * ``load_time_ms`` - duration of the most recent import, in milliseconds
    """

    def __init__(
        self,
        name: str,
        module_path: str,
        class_name: str,
        description: str,
        capabilities: List[str],
        priority: int = 0,
        preload: bool = False
    ):
        # Identity and import location.
        self.name, self.module_path, self.class_name = name, module_path, class_name
        # Human-facing description (the capabilities list is stored by reference).
        self.description, self.capabilities = description, capabilities
        # Loading policy: higher priority preloads first; preload=True pins it.
        self.priority, self.preload = priority, preload

        # Runtime bookkeeping; nothing is loaded or used yet.
        self.loaded_class: Optional[Type[BaseAgent]] = None
        self.last_used: Optional[datetime] = None
        self.usage_count: int = 0
        self.load_time_ms: Optional[float] = None
class AgentLazyLoader:
    """
    Lazy loading manager for AI agents.

    Features:
    - On-demand agent loading
    - Memory-efficient agent management
    - Automatic unloading of unused agents
    - Preloading for critical agents
    - Usage tracking and statistics
    """

    def __init__(
        self,
        unload_after_minutes: int = 15,
        max_loaded_agents: int = 10
    ):
        """
        Initialize lazy loader.

        Registers the default agents (metadata only; nothing is imported
        until ``start()`` preloads or an agent is requested).

        Args:
            unload_after_minutes: Minutes of inactivity before unloading
            max_loaded_agents: Maximum number of loaded agents in memory
        """
        self.unload_after_minutes = unload_after_minutes
        self.max_loaded_agents = max_loaded_agents

        # Agent registry, keyed by agent name
        self._registry: Dict[str, AgentMetadata] = {}

        # Weak references to live agent instances, keyed "<name>_<id(instance)>".
        # Entries vanish automatically once an instance is garbage collected.
        self._instances: weakref.WeakValueDictionary = weakref.WeakValueDictionary()

        # Loading statistics
        self._stats = {
            "total_loads": 0,
            "cache_hits": 0,
            "cache_misses": 0,
            "total_unloads": 0,
            "avg_load_time_ms": 0.0
        }

        # Background cleanup task, created by start()
        self._cleanup_task: Optional[asyncio.Task] = None
        self._running = False

        # Initialize with default agents
        self._register_default_agents()

    def _register_default_agents(self):
        """Register all available agents."""
        # Core agents - high priority, preload
        self.register_agent(
            name="Zumbi",
            module_path="src.agents.zumbi",
            class_name="ZumbiAgent",
            description="Anomaly detection investigator",
            capabilities=["anomaly_detection", "fraud_analysis", "pattern_recognition"],
            priority=10,
            preload=True
        )
        self.register_agent(
            name="Anita",
            module_path="src.agents.anita",
            class_name="AnitaAgent",
            description="Pattern analysis specialist",
            capabilities=["pattern_analysis", "trend_detection", "correlation_analysis"],
            priority=10,
            preload=True
        )
        self.register_agent(
            name="Tiradentes",
            module_path="src.agents.tiradentes",
            class_name="TiradentesAgent",
            description="Natural language report generation",
            capabilities=["report_generation", "summarization", "natural_language"],
            priority=10,
            preload=True
        )

        # Extended agents - lower priority, lazy load
        self.register_agent(
            name="MariaCurie",
            module_path="src.agents.legacy.mariacurie",
            class_name="MariaCurieAgent",
            description="Scientific research specialist",
            capabilities=["research", "data_analysis", "methodology"],
            priority=5,
            preload=False
        )
        self.register_agent(
            name="Drummond",
            module_path="src.agents.legacy.drummond",
            class_name="DrummondAgent",
            description="Communication and writing specialist",
            capabilities=["writing", "communication", "poetry"],
            priority=5,
            preload=False
        )
        self.register_agent(
            name="JoseBonifacio",
            module_path="src.agents.bonifacio",
            class_name="BonifacioAgent",
            description="Policy and governance analyst",
            capabilities=["policy_analysis", "governance", "regulation"],
            priority=5,
            preload=False
        )
        self.register_agent(
            name="MariaQuiteria",
            module_path="src.agents.maria_quiteria",
            class_name="MariaQuiteriaAgent",
            description="Security auditor and system integrity guardian",
            capabilities=["security_audit", "threat_detection", "compliance"],
            priority=5,
            preload=False
        )
        self.register_agent(
            name="Dandara",
            module_path="src.agents.dandara",
            class_name="DandaraAgent",
            description="Social justice and equity monitoring specialist",
            capabilities=["social_equity", "inclusion_policies", "justice_analysis"],
            priority=5,
            preload=False
        )
        self.register_agent(
            name="Machado",
            module_path="src.agents.machado",
            class_name="MachadoAgent",
            description="Government document analysis and text processing",
            capabilities=["text_analysis", "document_processing", "nlp"],
            priority=5,
            preload=False
        )
        self.register_agent(
            name="Obaluaie",
            module_path="src.agents.obaluaie",
            class_name="CorruptionDetectorAgent",
            description="Corruption detection and systemic pattern analysis",
            capabilities=["corruption_detection", "fraud_detection", "systemic_analysis"],
            priority=8,
            preload=False
        )
        self.register_agent(
            name="OscarNiemeyer",
            module_path="src.agents.oscar_niemeyer",
            class_name="OscarNiemeyerAgent",
            description="Data aggregation and visualization metadata specialist",
            capabilities=["data_aggregation", "visualization", "multidimensional_analysis"],
            priority=5,
            preload=False
        )
        self.register_agent(
            name="Lampiao",
            module_path="src.agents.lampiao",
            class_name="LampiaoAgent",
            description="Regional analysis and geographic data specialist",
            capabilities=["regional_analysis", "spatial_statistics", "inequality_measurement"],
            priority=5,
            preload=False
        )
        self.register_agent(
            name="Oxossi",
            module_path="src.agents.oxossi",
            class_name="OxossiAgent",
            description="Fraud detection and tracking specialist with precision hunting capabilities",
            capabilities=["fraud_detection", "pattern_recognition", "financial_forensics", "evidence_tracking"],
            priority=9,
            preload=False
        )

    def register_agent(
        self,
        name: str,
        module_path: str,
        class_name: str,
        description: str,
        capabilities: List[str],
        priority: int = 0,
        preload: bool = False
    ):
        """Register an agent for lazy loading.

        Only metadata is stored; the module is not imported here. Registering
        an existing name replaces its previous metadata.
        """
        metadata = AgentMetadata(
            name=name,
            module_path=module_path,
            class_name=class_name,
            description=description,
            capabilities=capabilities,
            priority=priority,
            preload=preload
        )
        self._registry[name] = metadata
        logger.info(
            f"Registered agent {name}",
            module=module_path,
            class_name=class_name,
            preload=preload
        )

    async def start(self):
        """Start the lazy loader and preload critical agents.

        Calling this on an already-running loader is a no-op, so repeated
        calls never spawn duplicate background cleanup tasks.
        """
        if self._running:
            return
        self._running = True
        self._cleanup_task = asyncio.create_task(self._cleanup_loop())

        # Preload high-priority agents
        await self._preload_agents()

        logger.info("Agent lazy loader started")

    async def stop(self):
        """Stop the lazy loader and cleanup resources.

        Cancels the cleanup task, then attempts to unload every loaded agent
        (agents that still have live instances are left loaded).
        """
        self._running = False

        if self._cleanup_task:
            self._cleanup_task.cancel()
            try:
                await self._cleanup_task
            except asyncio.CancelledError:
                pass
            # Drop the finished task so a later start() begins from a clean state.
            self._cleanup_task = None

        # Cleanup all loaded agents
        for metadata in self._registry.values():
            if metadata.loaded_class:
                await self._unload_agent(metadata)

        logger.info("Agent lazy loader stopped")

    async def _preload_agents(self):
        """Preload agents registered with ``preload=True``, highest priority first.

        A failure is logged and does not prevent the remaining preloads.
        """
        # sorted() is stable (also with reverse=True), so agents with equal
        # priority keep their registration order.
        preload_agents = sorted(
            (metadata for metadata in self._registry.values() if metadata.preload),
            key=lambda metadata: metadata.priority,
            reverse=True,
        )

        for metadata in preload_agents:
            try:
                await self._load_agent(metadata)
                logger.info(f"Preloaded agent {metadata.name}")
            except Exception as e:
                logger.error(f"Failed to preload agent {metadata.name}: {e}")

    async def get_agent_class(self, name: str) -> Type[BaseAgent]:
        """
        Get agent class, loading if necessary.

        Args:
            name: Agent name

        Returns:
            Agent class (never ``None``)

        Raises:
            AgentExecutionError: If agent not found or load fails
        """
        if name not in self._registry:
            raise AgentExecutionError(
                f"Agent '{name}' not registered",
                details={"available_agents": list(self._registry.keys())}
            )

        metadata = self._registry[name]

        # Return cached class if available
        if metadata.loaded_class:
            metadata.last_used = datetime.now()
            metadata.usage_count += 1
            self._stats["cache_hits"] += 1
            return metadata.loaded_class

        # Load agent class
        self._stats["cache_misses"] += 1
        await self._load_agent(metadata)
        # Capture the class before eviction runs, so the caller always gets it.
        agent_class = metadata.loaded_class

        # Free room if over the limit, but never evict the agent just requested.
        await self._check_memory_pressure(keep=name)

        return agent_class

    async def create_agent(self, name: str, **kwargs) -> BaseAgent:
        """
        Create an agent instance.

        Args:
            name: Agent name
            **kwargs: Agent initialization arguments

        Returns:
            Agent instance. If it has an ``initialize`` method it is called;
            both coroutine and plain (synchronous) ``initialize`` are supported.

        Raises:
            AgentExecutionError: If the agent is not registered or fails to load
        """
        agent_class = await self.get_agent_class(name)

        # Create instance
        instance = agent_class(**kwargs)

        # Track instance weakly; while it is alive the agent cannot be unloaded.
        self._instances[f"{name}_{id(instance)}"] = instance

        # Initialize if needed; only await when initialize() returned an awaitable.
        if hasattr(instance, "initialize"):
            result = instance.initialize()
            if inspect.isawaitable(result):
                await result

        return instance

    async def _load_agent(self, metadata: AgentMetadata):
        """Import the agent's module, resolve its class and record load statistics.

        Raises:
            AgentExecutionError: If the import fails, the attribute is missing,
                or the attribute is not a ``BaseAgent`` subclass. The original
                error is chained as ``__cause__``.
        """
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        try:
            # Import module
            module = importlib.import_module(metadata.module_path)

            # Get class
            agent_class = getattr(module, metadata.class_name)

            # Verify it's a valid agent (isclass first: issubclass raises
            # TypeError on non-class objects)
            if not (inspect.isclass(agent_class) and issubclass(agent_class, BaseAgent)):
                raise ValueError(f"{metadata.class_name} is not a BaseAgent subclass")
        except Exception as e:
            logger.error(
                f"Failed to load agent {metadata.name}",
                error=str(e),
                module=metadata.module_path
            )
            raise AgentExecutionError(
                f"Failed to load agent '{metadata.name}'",
                details={"error": str(e), "module": metadata.module_path}
            ) from e

        metadata.loaded_class = agent_class
        metadata.last_used = datetime.now()
        metadata.usage_count += 1

        # Calculate load time
        load_time = (loop.time() - start_time) * 1000
        metadata.load_time_ms = load_time

        # Update statistics (incremental running mean of load time)
        self._stats["total_loads"] += 1
        total_loads = self._stats["total_loads"]
        avg_load_time = self._stats["avg_load_time_ms"]
        self._stats["avg_load_time_ms"] = (
            (avg_load_time * (total_loads - 1) + load_time) / total_loads
        )

        logger.info(
            f"Loaded agent {metadata.name}",
            load_time_ms=round(load_time, 2),
            module=metadata.module_path
        )

    async def _unload_agent(self, metadata: AgentMetadata):
        """Unload an agent from memory.

        Does nothing if the agent is not loaded or still has live instances.
        Otherwise drops the class reference and removes the agent's module
        from ``sys.modules`` so the next request re-imports it.
        """
        if not metadata.loaded_class:
            return

        # Keys look like "<name>_<id(instance)>". id() yields an integer, which
        # contains no underscore, so splitting on the LAST underscore recovers
        # the exact agent name. A prefix test would wrongly match keys of
        # other agents whose names start with "<name>_".
        active_instances = [
            key for key in self._instances
            if key.rsplit("_", 1)[0] == metadata.name
        ]

        if active_instances:
            logger.debug(
                f"Cannot unload agent {metadata.name}, {len(active_instances)} instances active"
            )
            return

        # Unload the module
        try:
            # Remove class reference
            metadata.loaded_class = None

            # Try to remove from sys.modules
            import sys
            sys.modules.pop(metadata.module_path, None)

            self._stats["total_unloads"] += 1
            logger.info(f"Unloaded agent {metadata.name}")
        except Exception as e:
            logger.error(f"Error unloading agent {metadata.name}: {e}")

    async def _check_memory_pressure(self, keep: Optional[str] = None):
        """Unload least-recently-used agents while more than the limit are loaded.

        Preloaded agents and the agent named ``keep`` are never evicted.
        Candidates are tried oldest-first until the loaded count fits within
        ``max_loaded_agents``; a candidate that ``_unload_agent`` declines to
        unload (live instances) does not count, and the next one is tried.

        Args:
            keep: Name of an agent that must stay loaded (e.g. the one just
                requested). ``None`` protects nothing extra.
        """
        loaded_agents = [
            metadata for metadata in self._registry.values()
            if metadata.loaded_class
        ]

        excess = len(loaded_agents) - self.max_loaded_agents
        # If under limit, no action needed
        if excess <= 0:
            return

        # Eviction candidates, oldest first
        candidates = sorted(
            (
                metadata for metadata in loaded_agents
                if not metadata.preload and metadata.name != keep
            ),
            key=lambda metadata: metadata.last_used or datetime.min,
        )

        for metadata in candidates:
            if excess <= 0:
                break
            await self._unload_agent(metadata)
            if metadata.loaded_class is None:
                excess -= 1

    async def _cleanup_loop(self):
        """Background task to unload unused agents.

        Runs once a minute until ``stop()`` clears ``_running`` or cancels the
        task; unexpected errors are logged and the loop continues.
        """
        while self._running:
            try:
                await asyncio.sleep(60)  # Check every minute
                await self._cleanup_unused_agents()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in cleanup loop: {e}")

    async def _cleanup_unused_agents(self):
        """Unload non-preloaded agents idle for more than ``unload_after_minutes``."""
        cutoff_time = datetime.now() - timedelta(minutes=self.unload_after_minutes)

        for metadata in self._registry.values():
            # Skip if not loaded or preloaded
            if not metadata.loaded_class or metadata.preload:
                continue

            # Check last used time
            if metadata.last_used and metadata.last_used < cutoff_time:
                await self._unload_agent(metadata)

    def get_available_agents(self) -> List[Dict[str, Any]]:
        """Get list of available agents.

        Returns one summary dict per registered agent, sorted by priority
        (descending) and then usage count (descending).
        """
        agents = []

        for name, metadata in self._registry.items():
            agents.append({
                "name": name,
                "description": metadata.description,
                "capabilities": metadata.capabilities,
                "loaded": metadata.loaded_class is not None,
                "usage_count": metadata.usage_count,
                "last_used": metadata.last_used.isoformat() if metadata.last_used else None,
                "load_time_ms": metadata.load_time_ms,
                "priority": metadata.priority,
                "preload": metadata.preload
            })

        # Sort by priority and usage
        agents.sort(key=lambda x: (-x["priority"], -x["usage_count"]))

        return agents

    def get_stats(self) -> Dict[str, Any]:
        """Get lazy loader statistics.

        The ``statistics`` entry is a snapshot copy, so callers cannot mutate
        the loader's internal counters through it.
        """
        loaded_count = sum(
            1 for m in self._registry.values()
            if m.loaded_class
        )

        return {
            "total_agents": len(self._registry),
            "loaded_agents": loaded_count,
            "active_instances": len(self._instances),
            "statistics": dict(self._stats),
            "memory_usage": {
                "max_loaded_agents": self.max_loaded_agents,
                "unload_after_minutes": self.unload_after_minutes
            }
        }
# Global lazy loader instance, shared via get_agent_lazy_loader().
# Constructing it only registers the default agents' metadata; it is not
# started here — get_agent_lazy_loader() calls start() on first use.
agent_lazy_loader = AgentLazyLoader()
async def get_agent_lazy_loader() -> AgentLazyLoader:
    """Return the module-wide loader, starting it on first use.

    If the shared loader is not yet running, ``start()`` is awaited first
    (which launches background cleanup and preloads agents flagged
    ``preload=True``); afterwards the same instance is returned.
    """
    loader = agent_lazy_loader
    if loader._running:
        return loader
    await loader.start()
    return loader