|
|
""" |
|
|
Lazy Loading Service for AI Agents |
|
|
Optimizes memory usage and startup time by loading agents on-demand |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
import importlib |
|
|
import inspect |
|
|
from typing import Dict, Type, Optional, Any, List, Callable |
|
|
from datetime import datetime, timedelta |
|
|
import weakref |
|
|
|
|
|
from src.core import get_logger |
|
|
from src.agents.deodoro import BaseAgent |
|
|
from src.core.exceptions import AgentExecutionError |
|
|
|
|
|
# Module-level logger, obtained from the project's get_logger helper and
# named after this module; used by every class and function below.
logger = get_logger(__name__)
|
|
|
|
|
|
|
|
class AgentMetadata:
    """Registry record describing one lazily loaded agent.

    Holds the static registration data (where the agent class lives and
    how it is described) together with runtime bookkeeping that the
    loader fills in once the class has been imported.
    """

    def __init__(
        self,
        name: str,
        module_path: str,
        class_name: str,
        description: str,
        capabilities: List[str],
        priority: int = 0,
        preload: bool = False
    ):
        """
        Create a metadata record in the "not yet loaded" state.

        Args:
            name: Name the agent is registered under
            module_path: Dotted path of the module that defines the agent
            class_name: Name of the agent class inside that module
            description: Human-readable description of the agent
            capabilities: Capability tags (stored as given, not copied)
            priority: Numeric priority of the agent
            preload: Whether the agent is flagged for preloading
        """
        # --- Import location ---
        self.module_path = module_path
        self.class_name = class_name

        # --- Descriptive data ---
        self.name = name
        self.description = description
        self.capabilities = capabilities

        # --- Scheduling hints ---
        self.priority = priority
        self.preload = preload

        # --- Runtime bookkeeping, empty until the agent is loaded ---
        self.usage_count: int = 0
        self.last_used: Optional[datetime] = None
        self.load_time_ms: Optional[float] = None
        self.loaded_class: Optional[Type[BaseAgent]] = None
|
|
|
|
|
|
|
|
class AgentLazyLoader:
    """
    Lazy loading manager for AI agents.

    Features:
    - On-demand agent loading
    - Memory-efficient agent management
    - Automatic unloading of unused agents
    - Preloading for critical agents
    - Usage tracking and statistics
    """

    def __init__(
        self,
        unload_after_minutes: int = 15,
        max_loaded_agents: int = 10
    ):
        """
        Initialize lazy loader.

        Registers the default agents immediately; nothing is imported and
        no background task is created until ``start()`` is awaited.

        Args:
            unload_after_minutes: Minutes of inactivity before unloading
            max_loaded_agents: Maximum number of loaded agents in memory
        """
        self.unload_after_minutes = unload_after_minutes
        self.max_loaded_agents = max_loaded_agents

        # Agent name -> registration metadata.
        self._registry: Dict[str, AgentMetadata] = {}

        # Live agent instances, held weakly so tracking them never keeps
        # an otherwise unreferenced agent alive. Keys are "<name>_<id>".
        self._instances: weakref.WeakValueDictionary = weakref.WeakValueDictionary()

        # Aggregate counters reported by get_stats().
        self._stats = {
            "total_loads": 0,
            "cache_hits": 0,
            "cache_misses": 0,
            "total_unloads": 0,
            "avg_load_time_ms": 0.0
        }

        # Background cleanup task state (set by start(), cleared by stop()).
        self._cleanup_task: Optional[asyncio.Task] = None
        self._running = False

        self._register_default_agents()

    def _register_default_agents(self):
        """Register all available agents."""
        # (name, module_path, class_name, description, capabilities,
        #  priority, preload) -- kept in registration order.
        default_agents = (
            # Core agents - high priority, preloaded
            (
                "Zumbi", "src.agents.zumbi", "ZumbiAgent",
                "Anomaly detection investigator",
                ["anomaly_detection", "fraud_analysis", "pattern_recognition"],
                10, True,
            ),
            (
                "Anita", "src.agents.anita", "AnitaAgent",
                "Pattern analysis specialist",
                ["pattern_analysis", "trend_detection", "correlation_analysis"],
                10, True,
            ),
            (
                "Tiradentes", "src.agents.tiradentes", "TiradentesAgent",
                "Natural language report generation",
                ["report_generation", "summarization", "natural_language"],
                10, True,
            ),
            # Remaining agents - loaded on demand
            (
                "MariaCurie", "src.agents.legacy.mariacurie", "MariaCurieAgent",
                "Scientific research specialist",
                ["research", "data_analysis", "methodology"],
                5, False,
            ),
            (
                "Drummond", "src.agents.legacy.drummond", "DrummondAgent",
                "Communication and writing specialist",
                ["writing", "communication", "poetry"],
                5, False,
            ),
            (
                "JoseBonifacio", "src.agents.bonifacio", "BonifacioAgent",
                "Policy and governance analyst",
                ["policy_analysis", "governance", "regulation"],
                5, False,
            ),
            (
                "MariaQuiteria", "src.agents.maria_quiteria", "MariaQuiteriaAgent",
                "Security auditor and system integrity guardian",
                ["security_audit", "threat_detection", "compliance"],
                5, False,
            ),
            (
                "Dandara", "src.agents.dandara", "DandaraAgent",
                "Social justice and equity monitoring specialist",
                ["social_equity", "inclusion_policies", "justice_analysis"],
                5, False,
            ),
            (
                "Machado", "src.agents.machado", "MachadoAgent",
                "Government document analysis and text processing",
                ["text_analysis", "document_processing", "nlp"],
                5, False,
            ),
            (
                "Obaluaie", "src.agents.obaluaie", "CorruptionDetectorAgent",
                "Corruption detection and systemic pattern analysis",
                ["corruption_detection", "fraud_detection", "systemic_analysis"],
                8, False,
            ),
            (
                "OscarNiemeyer", "src.agents.oscar_niemeyer", "OscarNiemeyerAgent",
                "Data aggregation and visualization metadata specialist",
                ["data_aggregation", "visualization", "multidimensional_analysis"],
                5, False,
            ),
            (
                "Lampiao", "src.agents.lampiao", "LampiaoAgent",
                "Regional analysis and geographic data specialist",
                ["regional_analysis", "spatial_statistics", "inequality_measurement"],
                5, False,
            ),
            (
                "Oxossi", "src.agents.oxossi", "OxossiAgent",
                "Fraud detection and tracking specialist with precision hunting capabilities",
                ["fraud_detection", "pattern_recognition", "financial_forensics", "evidence_tracking"],
                9, False,
            ),
        )

        for (name, module_path, class_name, description,
             capabilities, priority, preload) in default_agents:
            self.register_agent(
                name=name,
                module_path=module_path,
                class_name=class_name,
                description=description,
                capabilities=capabilities,
                priority=priority,
                preload=preload
            )

    def register_agent(
        self,
        name: str,
        module_path: str,
        class_name: str,
        description: str,
        capabilities: List[str],
        priority: int = 0,
        preload: bool = False
    ):
        """
        Register an agent for lazy loading.

        Registering a name that already exists replaces the previous
        entry. The agent module is not imported here.

        Args:
            name: Name used to look the agent up later
            module_path: Dotted path of the module defining the agent
            class_name: Name of the agent class inside that module
            description: Human-readable description
            capabilities: Capability tags for the agent
            priority: Numeric priority (higher preloads/sorts first)
            preload: Whether to load the agent during ``start()``
        """
        metadata = AgentMetadata(
            name=name,
            module_path=module_path,
            class_name=class_name,
            description=description,
            capabilities=capabilities,
            priority=priority,
            preload=preload
        )

        self._registry[name] = metadata
        logger.info(
            f"Registered agent {name}",
            module=module_path,
            class_name=class_name,
            preload=preload
        )

    async def start(self):
        """
        Start the lazy loader and preload critical agents.

        Calling this while the loader is already running (with a live
        cleanup task) is a no-op, so a second background task is never
        spawned and the first one is never orphaned.
        """
        task = self._cleanup_task
        if self._running and task is not None and not task.done():
            return

        self._running = True
        self._cleanup_task = asyncio.create_task(self._cleanup_loop())

        await self._preload_agents()

        logger.info("Agent lazy loader started")

    async def stop(self):
        """Stop the lazy loader and cleanup resources."""
        self._running = False

        # Detach the task first so a later start() creates a fresh one.
        task, self._cleanup_task = self._cleanup_task, None
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

        # Unload everything that can be unloaded; agents with live
        # instances are left in place by _unload_agent.
        for metadata in self._registry.values():
            if metadata.loaded_class is not None:
                await self._unload_agent(metadata)

        logger.info("Agent lazy loader stopped")

    async def _preload_agents(self):
        """Preload agents flagged with ``preload``, highest priority first."""
        preload_agents = [
            (metadata.priority, name, metadata)
            for name, metadata in self._registry.items()
            if metadata.preload
        ]

        # Stable sort: equal priorities keep their registration order.
        preload_agents.sort(key=lambda entry: entry[0], reverse=True)

        for _, name, metadata in preload_agents:
            try:
                await self._load_agent(metadata)
                logger.info(f"Preloaded agent {name}")
            except Exception as e:
                # A failed preload must not prevent the others from loading.
                logger.error(f"Failed to preload agent {name}: {e}")

    async def get_agent_class(self, name: str) -> Type[BaseAgent]:
        """
        Get agent class, loading if necessary.

        Args:
            name: Agent name

        Returns:
            Agent class

        Raises:
            AgentExecutionError: If agent not found or load fails
        """
        if name not in self._registry:
            raise AgentExecutionError(
                f"Agent '{name}' not registered",
                details={"available_agents": list(self._registry.keys())}
            )

        metadata = self._registry[name]

        # Fast path: class already imported.
        if metadata.loaded_class is not None:
            metadata.last_used = datetime.now()
            metadata.usage_count += 1
            self._stats["cache_hits"] += 1
            return metadata.loaded_class

        self._stats["cache_misses"] += 1
        await self._load_agent(metadata)

        # Capture the class before evicting anything, and protect the
        # agent we were just asked for: otherwise the eviction pass could
        # unload it again and this method would return None.
        agent_class = metadata.loaded_class
        await self._check_memory_pressure(exclude=metadata)

        return agent_class

    async def create_agent(self, name: str, **kwargs) -> BaseAgent:
        """
        Create an agent instance.

        Args:
            name: Agent name
            **kwargs: Agent initialization arguments

        Returns:
            Agent instance

        Raises:
            AgentExecutionError: If agent not found or load fails
        """
        agent_class = await self.get_agent_class(name)

        instance = agent_class(**kwargs)

        # Track the instance weakly so _unload_agent can tell whether the
        # class is still in use.
        self._instances[f"{name}_{id(instance)}"] = instance

        # initialize() may be a coroutine function or a plain method;
        # only await when it actually produced an awaitable.
        if hasattr(instance, 'initialize'):
            outcome = instance.initialize()
            if inspect.isawaitable(outcome):
                await outcome

        return instance

    async def _load_agent(self, metadata: AgentMetadata):
        """
        Load an agent class.

        Imports the module, validates the class, stores it on ``metadata``
        and updates load statistics.

        Args:
            metadata: Registry entry of the agent to load

        Raises:
            AgentExecutionError: If the import or validation fails; the
                original exception is attached as ``__cause__``.
        """
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        try:
            module = importlib.import_module(metadata.module_path)
            agent_class = getattr(module, metadata.class_name)

            # Guard with isclass so a non-class attribute yields the
            # intended message instead of issubclass()'s TypeError.
            if not (inspect.isclass(agent_class) and issubclass(agent_class, BaseAgent)):
                raise ValueError(f"{metadata.class_name} is not a BaseAgent subclass")

            metadata.loaded_class = agent_class
            metadata.last_used = datetime.now()
            metadata.usage_count += 1

            load_time = (loop.time() - start_time) * 1000
            metadata.load_time_ms = load_time

            # Incremental running mean over all successful loads.
            self._stats["total_loads"] += 1
            total_loads = self._stats["total_loads"]
            avg_load_time = self._stats["avg_load_time_ms"]
            self._stats["avg_load_time_ms"] = (
                (avg_load_time * (total_loads - 1) + load_time) / total_loads
            )

            logger.info(
                f"Loaded agent {metadata.name}",
                load_time_ms=round(load_time, 2),
                module=metadata.module_path
            )

        except Exception as e:
            logger.error(
                f"Failed to load agent {metadata.name}",
                error=str(e),
                module=metadata.module_path
            )
            raise AgentExecutionError(
                f"Failed to load agent '{metadata.name}'",
                details={"error": str(e), "module": metadata.module_path}
            ) from e

    async def _unload_agent(self, metadata: AgentMetadata):
        """
        Unload an agent from memory.

        Does nothing if the agent is not loaded or if tracked instances
        of it are still alive. On success ``metadata.loaded_class`` is
        reset to ``None``, which callers can use to detect the outcome.

        Args:
            metadata: Registry entry of the agent to unload
        """
        if metadata.loaded_class is None:
            return

        # Instance keys are "<name>_<id>"; id() is purely numeric, so
        # splitting on the LAST underscore recovers the exact agent name.
        # A plain prefix test would confuse "Foo" with "Foo_Bar".
        active_instances = [
            key for key in self._instances
            if key.rpartition("_")[0] == metadata.name
        ]

        if active_instances:
            logger.debug(
                f"Cannot unload agent {metadata.name}, {len(active_instances)} instances active"
            )
            return

        try:
            metadata.loaded_class = None

            # Drop the module from the import cache so a later load
            # re-imports it.
            import sys
            sys.modules.pop(metadata.module_path, None)

            self._stats["total_unloads"] += 1

            logger.info(f"Unloaded agent {metadata.name}")

        except Exception as e:
            logger.error(f"Error unloading agent {metadata.name}: {e}")

    async def _check_memory_pressure(self, exclude: Optional[AgentMetadata] = None):
        """
        Check if we need to unload agents to free memory.

        When more than ``max_loaded_agents`` classes are loaded, evicts
        the least recently used ones until the limit is met or no
        candidate is left. Preloaded agents and ``exclude`` are never
        evicted, so the loaded count can stay above the limit.

        Args:
            exclude: Registry entry that must not be evicted (typically
                the agent that was just loaded for the current caller)
        """
        loaded = [
            metadata
            for metadata in self._registry.values()
            if metadata.loaded_class is not None
        ]

        excess = len(loaded) - self.max_loaded_agents
        if excess <= 0:
            return

        # Oldest first; entries without a timestamp sort as oldest.
        candidates = sorted(
            (
                metadata for metadata in loaded
                if not metadata.preload and metadata is not exclude
            ),
            key=lambda metadata: metadata.last_used or datetime.min
        )

        for metadata in candidates:
            if excess <= 0:
                break
            await self._unload_agent(metadata)
            # _unload_agent refuses when instances are alive; only count
            # evictions that actually happened and keep trying others.
            if metadata.loaded_class is None:
                excess -= 1

    async def _cleanup_loop(self):
        """Background task to unload unused agents."""
        while self._running:
            try:
                await asyncio.sleep(60)
                await self._cleanup_unused_agents()
            except asyncio.CancelledError:
                break
            except Exception as e:
                # Keep the loop alive; a single failed pass is not fatal.
                logger.error(f"Error in cleanup loop: {e}")

    async def _cleanup_unused_agents(self):
        """Unload agents that haven't been used recently."""
        cutoff_time = datetime.now() - timedelta(minutes=self.unload_after_minutes)

        for metadata in self._registry.values():
            # Preloaded agents are never unloaded by the idle sweep.
            if metadata.loaded_class is None or metadata.preload:
                continue

            if metadata.last_used and metadata.last_used < cutoff_time:
                await self._unload_agent(metadata)

    def get_available_agents(self) -> List[Dict[str, Any]]:
        """
        Get list of available agents.

        Returns:
            One dict per registered agent, sorted by descending priority
            and then descending usage count.
        """
        agents = [
            {
                "name": name,
                "description": metadata.description,
                "capabilities": metadata.capabilities,
                "loaded": metadata.loaded_class is not None,
                "usage_count": metadata.usage_count,
                "last_used": metadata.last_used.isoformat() if metadata.last_used else None,
                "load_time_ms": metadata.load_time_ms,
                "priority": metadata.priority,
                "preload": metadata.preload
            }
            for name, metadata in self._registry.items()
        ]

        agents.sort(key=lambda agent: (-agent["priority"], -agent["usage_count"]))

        return agents

    def get_stats(self) -> Dict[str, Any]:
        """
        Get lazy loader statistics.

        Returns:
            Registry/instance counts, a snapshot copy of the load
            counters, and the configured limits.
        """
        loaded_count = sum(
            1 for metadata in self._registry.values()
            if metadata.loaded_class is not None
        )

        return {
            "total_agents": len(self._registry),
            "loaded_agents": loaded_count,
            "active_instances": len(self._instances),
            # Copy so callers cannot mutate the loader's internal counters.
            "statistics": dict(self._stats),
            "memory_usage": {
                "max_loaded_agents": self.max_loaded_agents,
                "unload_after_minutes": self.unload_after_minutes
            }
        }
|
|
|
|
|
|
|
|
|
|
|
# Global lazy loader instance shared by the module. Constructing it only
# registers the default agents; the background cleanup task and preloading
# begin when start() is awaited (see get_agent_lazy_loader below).
agent_lazy_loader = AgentLazyLoader()
|
|
|
|
|
|
|
|
async def get_agent_lazy_loader() -> AgentLazyLoader:
    """Return the module-wide lazy loader, starting it first if it is not running."""
    loader = agent_lazy_loader
    if loader._running:
        return loader

    await loader.start()
    return loader