Spaces:
Running
Running
| """Hierarchical Orchestrator using middleware and sub-teams. | |
| This orchestrator implements a hierarchical team pattern where sub-teams | |
| can be composed and coordinated through middleware. It provides more | |
| granular control over the research workflow compared to the simple | |
| orchestrator. | |
| Design Patterns: | |
| - Composite: Teams can contain sub-teams | |
| - Chain of Responsibility: Middleware processes requests in sequence | |
| - Template Method: SubIterationMiddleware defines the iteration skeleton | |
| """ | |
| import asyncio | |
| from collections.abc import AsyncGenerator | |
| import structlog | |
| from src.agents.judge_agent_llm import LLMSubIterationJudge | |
| from src.agents.magentic_agents import create_search_agent | |
| from src.config.domain import ResearchDomain | |
| from src.middleware.sub_iteration import SubIterationMiddleware, SubIterationTeam | |
| from src.orchestrators.base import OrchestratorProtocol | |
| from src.state import init_magentic_state | |
| from src.utils.models import AgentEvent, OrchestratorConfig | |
| from src.utils.service_loader import get_embedding_service_if_available | |
| logger = structlog.get_logger() | |
| # Default timeout for hierarchical orchestrator (5 minutes) | |
| DEFAULT_TIMEOUT_SECONDS = 300.0 | |
| class ResearchTeam(SubIterationTeam): | |
| """Adapts ChatAgent to SubIterationTeam protocol. | |
| This adapter allows the search agent to be used within the | |
| sub-iteration middleware framework. | |
| """ | |
| def __init__(self, domain: ResearchDomain | str | None = None) -> None: | |
| self.agent = create_search_agent(domain=domain) | |
| async def execute(self, task: str) -> str: | |
| """Execute a research task. | |
| Args: | |
| task: The research task description | |
| Returns: | |
| Text response from the agent | |
| """ | |
| response = await self.agent.run(task) | |
| if response.messages: | |
| for msg in reversed(response.messages): | |
| if msg.role == "assistant" and msg.text: | |
| return str(msg.text) | |
| return "No response from agent." | |
| class HierarchicalOrchestrator(OrchestratorProtocol): | |
| """Orchestrator that uses hierarchical teams and sub-iterations. | |
| This orchestrator provides: | |
| - Sub-iteration middleware for fine-grained control | |
| - LLM-based judge for sub-iteration decisions | |
| - Event-driven architecture for UI updates | |
| - Configurable iterations and timeout | |
| """ | |
| def __init__( | |
| self, | |
| config: OrchestratorConfig | None = None, | |
| timeout_seconds: float = DEFAULT_TIMEOUT_SECONDS, | |
| domain: ResearchDomain | str | None = None, | |
| ) -> None: | |
| """Initialize the hierarchical orchestrator. | |
| Args: | |
| config: Optional configuration (uses defaults if not provided) | |
| timeout_seconds: Maximum workflow duration (default: 5 minutes) | |
| domain: Research domain for customization | |
| """ | |
| self.config = config or OrchestratorConfig() | |
| self._timeout_seconds = timeout_seconds | |
| self.domain = domain | |
| self.team = ResearchTeam(domain=domain) | |
| self.judge = LLMSubIterationJudge() | |
| self.middleware = SubIterationMiddleware( | |
| self.team, self.judge, max_iterations=self.config.max_iterations | |
| ) | |
| async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]: | |
| """Run the hierarchical workflow. | |
| Args: | |
| query: User's research question | |
| Yields: | |
| AgentEvent objects for real-time UI updates | |
| """ | |
| logger.info("Starting hierarchical orchestrator", query=query) | |
| service = get_embedding_service_if_available() | |
| init_magentic_state(query, service) | |
| yield AgentEvent(type="started", message=f"Starting research: {query}") | |
| queue: asyncio.Queue[AgentEvent | None] = asyncio.Queue() | |
| async def event_callback(event: AgentEvent) -> None: | |
| await queue.put(event) | |
| try: | |
| async with asyncio.timeout(self._timeout_seconds): | |
| task_future = asyncio.create_task(self.middleware.run(query, event_callback)) | |
| while not task_future.done(): | |
| get_event = asyncio.create_task(queue.get()) | |
| done, _ = await asyncio.wait( | |
| {task_future, get_event}, return_when=asyncio.FIRST_COMPLETED | |
| ) | |
| if get_event in done: | |
| event = get_event.result() | |
| if event: | |
| yield event | |
| else: | |
| get_event.cancel() | |
| # Process remaining events | |
| while not queue.empty(): | |
| ev = queue.get_nowait() | |
| if ev: | |
| yield ev | |
| result, assessment = await task_future | |
| assessment_text = assessment.reasoning if assessment else "None" | |
| yield AgentEvent( | |
| type="complete", | |
| message=( | |
| f"Research complete.\n\nResult:\n{result}\n\nAssessment:\n{assessment_text}" | |
| ), | |
| data={"assessment": assessment.model_dump() if assessment else None}, | |
| ) | |
| except TimeoutError: | |
| logger.warning("Hierarchical workflow timed out", query=query) | |
| yield AgentEvent( | |
| type="complete", | |
| message="Research timed out. Results may be incomplete.", | |
| data={"reason": "timeout"}, | |
| ) | |
| except Exception as e: | |
| logger.error( | |
| "Orchestrator failed", | |
| error=str(e), | |
| error_type=type(e).__name__, | |
| ) | |
| yield AgentEvent(type="error", message=f"Orchestrator failed: {e}") | |