| """Advanced Orchestrator using Microsoft Agent Framework. | |
| This orchestrator uses the ChatAgent pattern from Microsoft's agent-framework-core | |
| package for multi-agent coordination. It provides richer orchestration capabilities | |
| including specialized agents (Search, Hypothesis, Judge, Report) coordinated by | |
| a manager agent. | |
| Note: Previously named 'orchestrator_magentic.py' - renamed to eliminate confusion | |
| with the 'magentic' PyPI package (which is a different library). | |
| Design Patterns: | |
| - Mediator: Manager agent coordinates between specialized agents | |
| - Strategy: Different agents implement different strategies for their tasks | |
| - Observer: Event stream allows UI to observe progress | |
| """ | |

import asyncio
from collections.abc import AsyncGenerator
from typing import TYPE_CHECKING, Any

import structlog
from agent_framework import (
    MagenticAgentDeltaEvent,
    MagenticAgentMessageEvent,
    MagenticBuilder,
    MagenticFinalResultEvent,
    MagenticOrchestratorMessageEvent,
    WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient

from src.agents.magentic_agents import (
    create_hypothesis_agent,
    create_judge_agent,
    create_report_agent,
    create_search_agent,
)
from src.agents.state import init_magentic_state
from src.config.domain import ResearchDomain, get_domain_config
from src.orchestrators.base import OrchestratorProtocol
from src.utils.config import settings
from src.utils.llm_factory import check_magentic_requirements
from src.utils.models import AgentEvent
from src.utils.service_loader import get_embedding_service_if_available

if TYPE_CHECKING:
    from src.services.embedding_protocol import EmbeddingServiceProtocol

logger = structlog.get_logger()


class AdvancedOrchestrator(OrchestratorProtocol):
    """Advanced orchestrator using Microsoft Agent Framework ChatAgent pattern.

    Each agent has an internal LLM that understands natural language
    instructions from the manager and can call tools appropriately.

    This orchestrator provides:
    - Multi-agent coordination (Search, Hypothesis, Judge, Report)
    - Manager agent for workflow orchestration
    - Streaming events for real-time UI updates
    - Configurable timeouts and round limits
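
    Example (illustrative sketch; assumes a valid OpenAI API key is available)::

        orchestrator = AdvancedOrchestrator(
            max_rounds=5,
            api_key="sk-...",       # optional BYOK; omit to use settings/env
            timeout_seconds=600.0,
        )
        async for event in orchestrator.run("research question"):
            print(event.type, event.message)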
| """ | |

    def __init__(
        self,
        max_rounds: int | None = None,
        chat_client: OpenAIChatClient | None = None,
        api_key: str | None = None,
        timeout_seconds: float = 300.0,
        domain: ResearchDomain | str | None = None,
    ) -> None:
        """Initialize the orchestrator.

        Args:
            max_rounds: Maximum coordination rounds
            chat_client: Optional shared chat client for agents
            api_key: Optional OpenAI API key (for BYOK)
            timeout_seconds: Maximum workflow duration (default: 5 minutes)
            domain: Research domain for customization
        """
        # Validate requirements only if no key was provided explicitly
        if not chat_client and not api_key:
            check_magentic_requirements()

        # Use pydantic-validated settings (fails fast on invalid config)
        self._max_rounds = max_rounds if max_rounds is not None else settings.advanced_max_rounds
        self._timeout_seconds = (
            timeout_seconds if timeout_seconds != 300.0 else settings.advanced_timeout
        )
        self.domain = domain
        self.domain_config = get_domain_config(domain)

        self._chat_client: OpenAIChatClient | None
        if chat_client:
            self._chat_client = chat_client
        elif api_key:
            # Create a client with the user-provided key
            self._chat_client = OpenAIChatClient(
                model_id=settings.openai_model,
                api_key=api_key,
            )
        else:
            # Fall back to env vars (will fail later if requirements check wasn't run/passed)
            self._chat_client = None

    def _init_embedding_service(self) -> "EmbeddingServiceProtocol | None":
        """Initialize embedding service if available."""
        return get_embedding_service_if_available()

    def _build_workflow(self) -> Any:
        """Build the workflow with ChatAgent participants."""
        # Create agents with internal LLMs
        search_agent = create_search_agent(self._chat_client, domain=self.domain)
        judge_agent = create_judge_agent(self._chat_client, domain=self.domain)
        hypothesis_agent = create_hypothesis_agent(self._chat_client, domain=self.domain)
        report_agent = create_report_agent(self._chat_client, domain=self.domain)

        # Manager chat client (orchestrates the agents)
        manager_client = self._chat_client or OpenAIChatClient(
            model_id=settings.openai_model,  # Use configured model
            api_key=settings.openai_api_key,
        )

        return (
            MagenticBuilder()
            .participants(
                searcher=search_agent,
                hypothesizer=hypothesis_agent,
                judge=judge_agent,
                reporter=report_agent,
            )
            .with_standard_manager(
                chat_client=manager_client,
                max_round_count=self._max_rounds,
                max_stall_count=3,
                max_reset_count=2,
            )
            .build()
        )

    async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]:
        """Run the workflow.

        Args:
            query: User's research question

        Yields:
            AgentEvent objects for real-time UI updates
        """
        logger.info("Starting Advanced orchestrator", query=query)

        yield AgentEvent(
            type="started",
            message=f"Starting research (Advanced mode): {query}",
            iteration=0,
        )

        # Initialize context state
        embedding_service = self._init_embedding_service()
        init_magentic_state(query, embedding_service)

        workflow = self._build_workflow()

        task = f"""Research {self.domain_config.report_focus} for: {query}

## CRITICAL RULE
When JudgeAgent says "SUFFICIENT EVIDENCE" or "STOP SEARCHING":
→ IMMEDIATELY delegate to ReportAgent for synthesis
→ Do NOT continue searching or gathering more evidence
→ The Judge has determined evidence quality is adequate

## Standard Workflow
1. SearchAgent: Find evidence from PubMed, ClinicalTrials.gov, and Europe PMC
2. HypothesisAgent: Generate mechanistic hypotheses (Drug -> Target -> Pathway -> Effect)
3. JudgeAgent: Evaluate if evidence is sufficient
4. If insufficient -> SearchAgent refines search based on gaps
5. If sufficient -> ReportAgent synthesizes final report

Focus on:
- Identifying specific molecular targets
- Understanding mechanism of action
- Finding clinical evidence supporting hypotheses

The final output should be a structured research report."""

        # UX FIX: Yield thinking state before blocking workflow call
        # The workflow.run_stream() blocks for 2+ minutes on first LLM call
        yield AgentEvent(
            type="thinking",
            message=(
                f"Multi-agent reasoning in progress ({self._max_rounds} rounds max)... "
                f"Estimated time: {self._max_rounds * 45 // 60}-"
                f"{self._max_rounds * 60 // 60} minutes."
            ),
            iteration=0,
        )

        iteration = 0
        final_event_received = False

        try:
            async with asyncio.timeout(self._timeout_seconds):
                async for event in workflow.run_stream(task):
                    agent_event = self._process_event(event, iteration)
                    if agent_event:
                        if isinstance(event, MagenticAgentMessageEvent):
                            iteration += 1
                            # Progress estimation (clamp to avoid negative values)
                            rounds_remaining = max(self._max_rounds - iteration, 0)
                            est_seconds = rounds_remaining * 45
                            if est_seconds >= 60:
                                est_display = f"{est_seconds // 60}m {est_seconds % 60}s"
                            else:
                                est_display = f"{est_seconds}s"
                            progress_msg = (
                                f"Round {iteration}/{self._max_rounds} (~{est_display} remaining)"
                            )
                            # Yield progress update before the agent action
                            yield AgentEvent(
                                type="progress",
                                message=progress_msg,
                                iteration=iteration,
                            )
                        if agent_event.type == "complete":
                            final_event_received = True
                        yield agent_event

            # GUARANTEE: Always emit termination event if stream ends without one
            # (e.g., max rounds reached)
            if not final_event_received:
                logger.warning(
                    "Workflow ended without final event",
                    iterations=iteration,
                )
                yield AgentEvent(
                    type="complete",
                    message=(
                        f"Research completed after {iteration} agent rounds. "
                        "Max iterations reached - results may be partial. "
                        "Try a more specific query for better results."
                    ),
                    data={"iterations": iteration, "reason": "max_rounds_reached"},
                    iteration=iteration,
                )
        except TimeoutError:
            logger.warning("Workflow timed out", iterations=iteration)
            yield AgentEvent(
                type="complete",
                message="Research timed out. Synthesizing available evidence...",
                data={"reason": "timeout", "iterations": iteration},
                iteration=iteration,
            )
        except Exception as e:
            logger.error("Workflow failed", error=str(e))
            yield AgentEvent(
                type="error",
                message=f"Workflow error: {e!s}",
                iteration=iteration,
            )

    def _extract_text(self, message: Any) -> str:
        """Defensively extract text from a message object.

        Fixes a bug where message.text might return the object itself or its repr.
        """
        if not message:
            return ""

        # Priority 1: .content (often the raw string or a list of content parts)
        if hasattr(message, "content") and message.content:
            content = message.content
            # If it's a list (e.g., multi-modal), join the text parts
            if isinstance(content, list):
                return " ".join(str(c.text) for c in content if hasattr(c, "text"))
            return str(content)

        # Priority 2: .text (standard, but sometimes buggy/missing)
        if hasattr(message, "text") and message.text:
            # Verify it's not the object itself or a repr string
            text = str(message.text)
            if text.startswith("<") and "object at" in text:
                # Likely a repr string; ignore it and fall through
                pass
            else:
                return text

        # Fallback: if we can't find clean text, return str(message),
        # taking care to avoid infinite recursion if str() calls .text
        return str(message)

    def _get_event_type_for_agent(self, agent_name: str) -> str:
        """Map agent name to appropriate event type.

        Args:
            agent_name: The agent ID from the workflow event

        Returns:
            Event type string matching AgentEvent.type Literal
        """
        agent_lower = agent_name.lower()
        if "search" in agent_lower:
            return "search_complete"
        if "judge" in agent_lower:
            return "judge_complete"
        if "hypothes" in agent_lower:
            return "hypothesizing"
        if "report" in agent_lower:
            return "synthesizing"
        return "judging"  # Default for unknown agents

    def _process_event(self, event: Any, iteration: int) -> AgentEvent | None:
        """Process workflow event into AgentEvent."""
        if isinstance(event, MagenticOrchestratorMessageEvent):
            text = self._extract_text(event.message)
            if text:
                return AgentEvent(
                    type="judging",
                    message=f"Manager ({event.kind}): {text[:200]}...",
                    iteration=iteration,
                )
        elif isinstance(event, MagenticAgentMessageEvent):
            agent_name = event.agent_id or "unknown"
            text = self._extract_text(event.message)
            event_type = self._get_event_type_for_agent(agent_name)
            # All returned types are valid AgentEvent.type literals
            return AgentEvent(
                type=event_type,  # type: ignore[arg-type]
                message=f"{agent_name}: {text[:200]}...",
                iteration=iteration + 1,
            )
        elif isinstance(event, MagenticFinalResultEvent):
            text = self._extract_text(event.message) if event.message else "No result"
            return AgentEvent(
                type="complete",
                message=text,
                data={"iterations": iteration},
                iteration=iteration,
            )
        elif isinstance(event, MagenticAgentDeltaEvent):
            if event.text:
                return AgentEvent(
                    type="streaming",
                    message=event.text,
                    data={"agent_id": event.agent_id},
                    iteration=iteration,
                )
        elif isinstance(event, WorkflowOutputEvent):
            if event.data:
                return AgentEvent(
                    type="complete",
                    message=str(event.data),
                    iteration=iteration,
                )
        return None


def _create_deprecated_alias() -> type["AdvancedOrchestrator"]:
    """Create a deprecated alias that warns on use."""
    import warnings

    class MagenticOrchestrator(AdvancedOrchestrator):
        """Deprecated alias for AdvancedOrchestrator.

        .. deprecated:: 0.1.0
            Use :class:`AdvancedOrchestrator` instead.
        """

        def __init__(self, *args: Any, **kwargs: Any) -> None:
            """Initialize deprecated MagenticOrchestrator (use AdvancedOrchestrator)."""
            warnings.warn(
                "MagenticOrchestrator is deprecated, use AdvancedOrchestrator instead. "
                "The name 'magentic' was confusing with the 'magentic' PyPI package.",
                DeprecationWarning,
                stacklevel=2,
            )
            super().__init__(*args, **kwargs)

    return MagenticOrchestrator


# Backwards compatibility alias with deprecation warning
MagenticOrchestrator = _create_deprecated_alias()
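

# Illustrative smoke test, not part of the public API: a minimal sketch of how the
# event stream can be consumed. Assumes OpenAI credentials and related settings are
# configured; the sample query below is arbitrary.
if __name__ == "__main__":

    async def _demo() -> None:
        orchestrator = AdvancedOrchestrator(max_rounds=5)
        async for event in orchestrator.run("What pathways does metformin modulate?"):
            print(f"[{event.type}] {event.message}")

    asyncio.run(_demo())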