anderson-ufrj
feat: implement comprehensive memory integration system for all agents
426ed22
"""
Agent pooling system for improved performance.
This module provides a pool of pre-initialized agents that can be
reused across requests, avoiding the overhead of creating new instances.
"""
import asyncio
from typing import Dict, Type, Optional, Any, List
from datetime import datetime, timedelta
from contextlib import asynccontextmanager
import weakref
from src.core import get_logger
from src.agents.deodoro import BaseAgent, AgentContext
logger = get_logger(__name__)
class AgentPoolEntry:
"""Entry in the agent pool."""
def __init__(self, agent: BaseAgent):
self.agent = agent
self.in_use = False
self.last_used = datetime.now()
self.usage_count = 0
self.created_at = datetime.now()
self._lock = asyncio.Lock()
@property
def idle_time(self) -> float:
"""Get idle time in seconds."""
return (datetime.now() - self.last_used).total_seconds()
async def acquire(self) -> BaseAgent:
"""Acquire the agent for use."""
async with self._lock:
if self.in_use:
raise RuntimeError("Agent already in use")
self.in_use = True
self.usage_count += 1
return self.agent
async def release(self):
"""Release the agent back to pool."""
async with self._lock:
self.in_use = False
self.last_used = datetime.now()
class AgentPool:
"""
Pool manager for AI agents.
Features:
- Pre-warmed agent instances
- Automatic cleanup of idle agents
- Usage statistics and monitoring
- Thread-safe operations
"""
def __init__(
self,
min_size: int = 2,
max_size: int = 10,
idle_timeout: int = 300, # 5 minutes
max_agent_lifetime: int = 3600, # 1 hour
use_lazy_loading: bool = True
):
"""
Initialize agent pool.
Args:
min_size: Minimum pool size per agent type
max_size: Maximum pool size per agent type
idle_timeout: Seconds before removing idle agents
max_agent_lifetime: Maximum agent lifetime in seconds
use_lazy_loading: Enable lazy loading for agents
"""
self.min_size = min_size
self.max_size = max_size
self.idle_timeout = idle_timeout
self.max_agent_lifetime = max_agent_lifetime
self._use_lazy_loading = use_lazy_loading
# Pool storage: agent_type -> list of entries
self._pools: Dict[Type[BaseAgent], List[AgentPoolEntry]] = {}
# Weak references to track all created agents
self._all_agents: weakref.WeakSet = weakref.WeakSet()
# Statistics
self._stats = {
"created": 0,
"reused": 0,
"evicted": 0,
"errors": 0,
"lazy_loaded": 0
}
# Cleanup task
self._cleanup_task: Optional[asyncio.Task] = None
self._running = False
async def start(self):
"""Start the agent pool and cleanup task."""
self._running = True
self._cleanup_task = asyncio.create_task(self._cleanup_loop())
# Initialize lazy loader if enabled
if self._use_lazy_loading:
from src.services.agent_lazy_loader import agent_lazy_loader
await agent_lazy_loader.start()
logger.info("Agent pool started", lazy_loading=self._use_lazy_loading)
async def stop(self):
"""Stop the agent pool and cleanup resources."""
self._running = False
if self._cleanup_task:
self._cleanup_task.cancel()
try:
await self._cleanup_task
except asyncio.CancelledError:
pass
# Cleanup all agents
for agent_type, entries in self._pools.items():
for entry in entries:
try:
if hasattr(entry.agent, 'cleanup'):
await entry.agent.cleanup()
except Exception as e:
logger.error(f"Error cleaning up agent: {e}")
self._pools.clear()
# Stop lazy loader if enabled
if self._use_lazy_loading:
from src.services.agent_lazy_loader import agent_lazy_loader
await agent_lazy_loader.stop()
logger.info("Agent pool stopped")
@asynccontextmanager
async def acquire(self, agent_type: Type[BaseAgent], context: AgentContext):
"""
Acquire an agent from the pool.
Args:
agent_type: Type of agent to acquire
context: Agent execution context
Yields:
Agent instance
"""
entry = await self._get_or_create_agent(agent_type)
agent = await entry.acquire()
try:
# Update agent context
agent.context = context
yield agent
finally:
# Clear sensitive data
agent.context = None
await entry.release()
async def _get_or_create_agent(self, agent_type: Type[BaseAgent]) -> AgentPoolEntry:
"""Get an available agent or create a new one."""
# Initialize pool for agent type if needed
if agent_type not in self._pools:
self._pools[agent_type] = []
pool = self._pools[agent_type]
# Find available agent
for entry in pool:
if not entry.in_use:
self._stats["reused"] += 1
logger.debug(f"Reusing agent {agent_type.__name__} from pool")
return entry
# Create new agent if under limit
if len(pool) < self.max_size:
agent = await self._create_agent(agent_type)
entry = AgentPoolEntry(agent)
pool.append(entry)
self._stats["created"] += 1
logger.info(f"Created new agent {agent_type.__name__} (pool size: {len(pool)})")
return entry
# Wait for available agent
logger.warning(f"Agent pool full for {agent_type.__name__}, waiting...")
while True:
await asyncio.sleep(0.1)
for entry in pool:
if not entry.in_use:
return entry
async def _create_agent(self, agent_type: Type[BaseAgent]) -> BaseAgent:
"""Create and initialize a new agent."""
try:
# Check if we should use lazy loader
if hasattr(agent_type, '__name__') and self._use_lazy_loading:
# Try to get from lazy loader
from src.services.agent_lazy_loader import agent_lazy_loader
agent_name = agent_type.__name__.replace('Agent', '')
try:
# Use lazy loader to create agent
agent = await agent_lazy_loader.create_agent(agent_name)
self._all_agents.add(agent)
self._stats["lazy_loaded"] += 1
logger.debug(f"Created agent {agent_name} using lazy loader")
except Exception:
# Fallback to direct instantiation
logger.debug(f"Lazy loader failed for {agent_name}, using direct instantiation")
agent = agent_type()
self._all_agents.add(agent)
else:
# Direct instantiation
agent = agent_type()
self._all_agents.add(agent)
# Initialize if needed
if hasattr(agent, 'initialize'):
await agent.initialize()
# Integrate with memory system if available
await self._integrate_agent_memory(agent)
return agent
except Exception as e:
self._stats["errors"] += 1
logger.error(f"Failed to create agent {agent_type.__name__}: {e}")
raise
async def _cleanup_loop(self):
"""Background task to cleanup idle agents."""
while self._running:
try:
await asyncio.sleep(30) # Check every 30 seconds
await self._cleanup_idle_agents()
await self._maintain_minimum_pool()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in cleanup loop: {e}")
async def _cleanup_idle_agents(self):
"""Remove agents that have been idle too long."""
for agent_type, pool in self._pools.items():
to_remove = []
for entry in pool:
# Check idle timeout
if not entry.in_use and entry.idle_time > self.idle_timeout:
# Keep minimum pool size
active_count = sum(1 for e in pool if not e.in_use)
if active_count > self.min_size:
to_remove.append(entry)
# Check lifetime
lifetime = (datetime.now() - entry.created_at).total_seconds()
if lifetime > self.max_agent_lifetime:
to_remove.append(entry)
# Remove identified agents
for entry in to_remove:
if entry.in_use:
continue # Skip if now in use
pool.remove(entry)
self._stats["evicted"] += 1
try:
if hasattr(entry.agent, 'cleanup'):
await entry.agent.cleanup()
except Exception as e:
logger.error(f"Error cleaning up agent: {e}")
logger.debug(f"Evicted idle agent {agent_type.__name__}")
async def _integrate_agent_memory(self, agent: BaseAgent):
"""Integrate agent with memory system if available."""
try:
from src.services.agent_memory_integration import get_memory_integration
memory_integration = get_memory_integration()
if memory_integration:
await memory_integration.integrate_agent(agent)
logger.info(f"Integrated {agent.agent_id} with memory system")
else:
logger.debug("Memory integration not available")
except ImportError:
logger.debug("Memory integration module not available")
except Exception as e:
logger.error(f"Failed to integrate agent with memory: {e}")
async def _maintain_minimum_pool(self):
"""Ensure minimum pool size for each agent type."""
for agent_type, pool in self._pools.items():
available = sum(1 for e in pool if not e.in_use)
# Create agents to maintain minimum
while available < self.min_size and len(pool) < self.max_size:
try:
agent = await self._create_agent(agent_type)
entry = AgentPoolEntry(agent)
pool.append(entry)
available += 1
logger.debug(f"Pre-warmed agent {agent_type.__name__}")
except Exception as e:
logger.error(f"Failed to maintain pool: {e}")
break
async def prewarm(self, agent_types: List[Type[BaseAgent]]):
"""Pre-warm the pool with specified agent types."""
for agent_type in agent_types:
if agent_type not in self._pools:
self._pools[agent_type] = []
# Create minimum agents
pool = self._pools[agent_type]
while len(pool) < self.min_size:
try:
agent = await self._create_agent(agent_type)
entry = AgentPoolEntry(agent)
pool.append(entry)
logger.info(f"Pre-warmed {agent_type.__name__} agent")
except Exception as e:
logger.error(f"Failed to prewarm {agent_type.__name__}: {e}")
break
def get_stats(self) -> Dict[str, Any]:
"""Get pool statistics."""
pool_stats = {}
for agent_type, pool in self._pools.items():
pool_stats[agent_type.__name__] = {
"total": len(pool),
"in_use": sum(1 for e in pool if e.in_use),
"available": sum(1 for e in pool if not e.in_use),
"avg_usage": sum(e.usage_count for e in pool) / len(pool) if pool else 0
}
return {
"pools": pool_stats,
"global_stats": self._stats,
"total_agents": sum(len(p) for p in self._pools.values())
}
# Global agent pool instance
agent_pool = AgentPool()
async def get_agent_pool() -> AgentPool:
"""Get the global agent pool instance."""
if not agent_pool._running:
await agent_pool.start()
return agent_pool