"""
MACPRunner — executor of Multi-Agent Communication Protocol.
Supports both simple sequential execution and adaptive mode
with conditional edges, pruning, fallback, and parallel execution.
Also supports:
- Stratified memory (working/long-term)
- Hidden channels (hidden_state, embeddings)
- Exchange protocol separating visible/hidden data
- Typed errors and handling policies
- Token/request budgets at graph and node levels
- Structured execution event logging
- **Multi-model**: each agent can use its own LLM
Multi-model support:
MACPRunner supports three ways to specify LLMs:
1. A single llm_caller for all agents (legacy)
2. A dict of llm_callers: dict[agent_id, Callable] for different models
3. LLMCallerFactory that creates callers based on AgentLLMConfig
Priority: agent-specific caller > factory > default caller
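Example (illustrative sketch; ``gpt4_caller`` and ``local_caller`` are
placeholder ``(str) -> str`` callables you supply):
    # 1. One caller for every agent (legacy)
    runner = MACPRunner(llm_caller=gpt4_caller)
    # 2. Per-agent callers (highest priority)
    runner = MACPRunner(
        llm_caller=gpt4_caller,
        llm_callers={"reviewer": local_caller},
    )
    # 3. Factory that builds callers from each agent's AgentLLMConfig
    factory = LLMCallerFactory.create_openai_factory(default_api_key="$OPENAI_API_KEY")
    runner = MACPRunner(llm_factory=factory)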
"""
import asyncio
import time
import uuid
from collections import deque
from collections.abc import AsyncIterator, Awaitable, Callable, Iterator
from datetime import UTC, datetime
from typing import Any, NamedTuple
import torch
from pydantic import BaseModel, ConfigDict, Field
from callbacks import (
CallbackManager,
Handler,
get_callback_manager,
)
from core.agent import AgentLLMConfig
from utils.memory import AgentMemory, MemoryConfig, SharedMemoryPool
from .budget import BudgetConfig, BudgetTracker
from .errors import (
ErrorPolicy,
ExecutionError,
ExecutionMetrics,
)
from .scheduler import (
AdaptiveScheduler,
ConditionContext,
ExecutionPlan,
PruningConfig,
RoutingPolicy,
StepResult,
build_execution_order,
extract_agent_adjacency,
filter_reachable_agents,
get_incoming_agents,
get_parallel_groups,
)
from .streaming import (
AgentErrorEvent,
AgentOutputEvent,
AgentStartEvent,
AnyStreamEvent,
AsyncStreamCallback,
FallbackEvent,
ParallelEndEvent,
ParallelStartEvent,
PruneEvent,
RunEndEvent,
RunStartEvent,
StreamCallback,
StreamEvent,
StreamEventType,
TokenEvent,
TopologyChangedEvent,
)
# Tools support (optional import)
try:
from tools import ToolRegistry
TOOLS_AVAILABLE = True
except ImportError:
TOOLS_AVAILABLE = False
# ---------------------------------------------------------------------------
# Module-level constants
# ---------------------------------------------------------------------------
#: Minimum edge weight below which the edge is considered absent (BFS).
_MIN_EDGE_WEIGHT: float = 1e-6
__all__ = [
"AgentMemory",
"AsyncStreamCallback",
"AsyncStructuredLLMCallerProtocol",
"AsyncTopologyHook",
"BudgetConfig",
"EarlyStopCondition",
"ErrorPolicy",
"ExecutionMetrics",
"HiddenState",
# Multi-model support
"LLMCallerFactory",
"LLMCallerProtocol",
"MACPResult",
"MACPRunner",
"MemoryConfig",
"RunnerConfig",
"SharedMemoryPool",
# Dynamic topology
"StepContext",
"StreamCallback",
# Streaming types
"StreamEvent",
"StreamEventType",
# Structured prompt support
"StructuredLLMCallerProtocol",
"StructuredPrompt",
"ToolRegistry",
"TopologyAction",
"TopologyHook",
"create_openai_async_structured_caller",
"create_openai_caller",
"create_openai_structured_caller",
]
# Type aliases for LLM callers
LLMCallerProtocol = Callable[[str], str]
AsyncLLMCallerProtocol = Callable[[str], Awaitable[str]]
# Structured prompt: allows callers to receive proper system/user roles
# instead of a flat string. This is the modern way to call chat LLMs.
StructuredLLMCallerProtocol = Callable[[list[dict[str, str]]], str]
AsyncStructuredLLMCallerProtocol = Callable[[list[dict[str, str]]], Awaitable[str]]
class StructuredPrompt:
"""
Prompt that carries both a flat string (legacy) and structured messages.
When a ``structured_llm_caller`` is available the runner sends
``messages`` (a list of ``{"role": ..., "content": ...}`` dicts)
so the LLM receives proper system / user roles. Otherwise the
runner falls back to the flat ``text`` representation — full
backward compatibility with ``Callable[[str], str]`` callers.
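    Example (illustrative; ``structured_caller`` and ``legacy_caller`` are
    placeholder callables):
        prompt = StructuredPrompt(
            text="System: You are a solver. User: 2 + 2 = ?",
            messages=[
                {"role": "system", "content": "You are a solver."},
                {"role": "user", "content": "2 + 2 = ?"},
            ],
        )
        structured_caller(prompt.messages)  # modern chat path
        legacy_caller(str(prompt))          # flat-string fallback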
"""
__slots__ = ("messages", "text")
def __init__(self, text: str, messages: list[dict[str, str]]) -> None:
self.text = text
self.messages = messages
# Allow `len()`, logging, and anywhere a plain str was expected
def __str__(self) -> str:
return self.text
def __repr__(self) -> str:
return f"StructuredPrompt(text='{self.text[:80]}…', messages={len(self.messages)})"
class LLMCallerFactory:
"""
Factory for creating LLM callers based on agent configuration.
Supports:
- OpenAI-compatible APIs (OpenAI, Azure, Ollama, vLLM, LiteLLM)
- Caching created callers by (base_url, api_key, model_name)
- Fallback to default caller if configuration is not set
Example:
factory = LLMCallerFactory(
default_caller=my_default_caller,
default_config=LLMConfig(model_name="gpt-4", base_url="...")
)
# Automatically creates a caller for the agent
caller = factory.get_caller(agent.get_llm_config())
response = caller(prompt)
# Or with OpenAI
factory = LLMCallerFactory.create_openai_factory(
default_api_key="sk-...",
default_model="gpt-4"
)
"""
def __init__(
self,
default_caller: LLMCallerProtocol | None = None,
default_async_caller: AsyncLLMCallerProtocol | None = None,
default_config: AgentLLMConfig | None = None,
caller_builder: Callable[[AgentLLMConfig], LLMCallerProtocol] | None = None,
async_caller_builder: Callable[[AgentLLMConfig], AsyncLLMCallerProtocol] | None = None,
):
"""
Create an LLM caller factory.
Args:
default_caller: Default caller (for agents without custom configuration).
default_async_caller: Default async caller.
default_config: Default configuration (merged with agent config).
caller_builder: Function for creating a sync caller from configuration.
async_caller_builder: Function for creating an async caller from configuration.
"""
self.default_caller = default_caller
self.default_async_caller = default_async_caller
self.default_config = default_config
self.caller_builder = caller_builder
self.async_caller_builder = async_caller_builder
# Cache callers by config hash
self._caller_cache: dict[str, LLMCallerProtocol] = {}
self._async_caller_cache: dict[str, AsyncLLMCallerProtocol] = {}
def _config_key(self, config: AgentLLMConfig) -> str:
"""Create a cache key for the configuration."""
return f"{config.base_url}|{config.model_name}|{config.api_key}"
def _merge_config(self, config: AgentLLMConfig) -> AgentLLMConfig:
"""Merge the agent configuration with the default configuration."""
if not self.default_config:
return config
return AgentLLMConfig(
model_name=config.model_name or self.default_config.model_name,
base_url=config.base_url or self.default_config.base_url,
api_key=config.api_key or self.default_config.api_key,
max_tokens=config.max_tokens if config.max_tokens is not None else self.default_config.max_tokens,
temperature=config.temperature if config.temperature is not None else self.default_config.temperature,
timeout=config.timeout if config.timeout is not None else self.default_config.timeout,
top_p=config.top_p if config.top_p is not None else self.default_config.top_p,
stop_sequences=config.stop_sequences or self.default_config.stop_sequences,
extra_params={**self.default_config.extra_params, **config.extra_params},
)
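    # Merge example (illustrative, assuming unset agent fields are ``None``):
    #     factory = LLMCallerFactory(default_config=AgentLLMConfig(model_name="gpt-4", temperature=0.2))
    #     merged = factory._merge_config(AgentLLMConfig(model_name="gpt-4o-mini"))
    #     # -> model_name="gpt-4o-mini" (agent value wins), temperature=0.2 (from default)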
def get_caller(
self,
config: AgentLLMConfig | None = None,
_agent_id: str | None = None,
) -> LLMCallerProtocol | None:
"""
Get sync caller for the given configuration.
Args:
config: Agent LLM configuration.
_agent_id: Agent ID (reserved for future use).
Returns:
LLM caller or None if creation failed.
"""
if config is None or not config.is_configured():
return self.default_caller
config = self._merge_config(config)
cache_key = self._config_key(config)
if cache_key in self._caller_cache:
return self._caller_cache[cache_key]
if self.caller_builder:
caller = self.caller_builder(config)
self._caller_cache[cache_key] = caller
return caller
return self.default_caller
def get_async_caller(
self,
config: AgentLLMConfig | None = None,
_agent_id: str | None = None,
) -> AsyncLLMCallerProtocol | None:
"""Get async caller for the given configuration."""
if config is None or not config.is_configured():
return self.default_async_caller
config = self._merge_config(config)
cache_key = self._config_key(config)
if cache_key in self._async_caller_cache:
return self._async_caller_cache[cache_key]
if self.async_caller_builder:
caller = self.async_caller_builder(config)
self._async_caller_cache[cache_key] = caller
return caller
return self.default_async_caller
@classmethod
def create_openai_factory(
cls,
default_api_key: str | None = None,
default_model: str = "gpt-4",
default_base_url: str = "https://api.openai.com/v1",
default_temperature: float = 0.7,
default_max_tokens: int = 2000,
) -> "LLMCallerFactory":
"""
Create a factory for OpenAI-compatible APIs.
Automatically creates callers using the openai library.
Args:
default_api_key: Default API key (or $OPENAI_API_KEY).
default_model: Default model name.
default_base_url: Default API URL.
default_temperature: Default temperature.
default_max_tokens: Default maximum tokens.
Example:
factory = LLMCallerFactory.create_openai_factory(
default_api_key="$OPENAI_API_KEY"
)
runner = MACPRunner(llm_factory=factory)
"""
import os
# Resolve default API key
if default_api_key and default_api_key.startswith("$"):
default_api_key = os.environ.get(default_api_key[1:])
default_config = AgentLLMConfig(
model_name=default_model,
base_url=default_base_url,
api_key=default_api_key,
temperature=default_temperature,
max_tokens=default_max_tokens,
)
def build_sync_caller(config: AgentLLMConfig) -> LLMCallerProtocol:
return _create_openai_caller_from_config(config)
def build_async_caller(config: AgentLLMConfig) -> AsyncLLMCallerProtocol:
return _create_async_openai_caller_from_config(config)
return cls(
default_config=default_config,
caller_builder=build_sync_caller,
async_caller_builder=build_async_caller,
)
def _create_openai_caller_from_config(config: AgentLLMConfig) -> LLMCallerProtocol:
"""Create an OpenAI-compatible sync caller from configuration."""
try:
from openai import OpenAI
except ImportError as e:
msg = "openai package required. Install with: pip install openai"
raise ImportError(msg) from e
api_key = config.resolve_api_key()
client = OpenAI(
api_key=api_key,
base_url=config.base_url,
timeout=config.timeout or 60.0,
)
gen_params = config.to_generation_params()
model = config.model_name or "gpt-4"
def caller(prompt: str) -> str:
response = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
**gen_params,
)
return response.choices[0].message.content or ""
return caller
def _create_async_openai_caller_from_config(config: AgentLLMConfig) -> AsyncLLMCallerProtocol:
"""Create an OpenAI-compatible async caller from configuration."""
try:
from openai import AsyncOpenAI
except ImportError as e:
msg = "openai package required. Install with: pip install openai"
raise ImportError(msg) from e
api_key = config.resolve_api_key()
client = AsyncOpenAI(
api_key=api_key,
base_url=config.base_url,
timeout=config.timeout or 60.0,
)
gen_params = config.to_generation_params()
model = config.model_name or "gpt-4"
async def caller(prompt: str) -> str:
response = await client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
**gen_params,
)
return response.choices[0].message.content or ""
return caller
def create_openai_caller(
api_key: str | None = None,
model: str = "gpt-4",
base_url: str = "https://api.openai.com/v1",
temperature: float = 0.7,
max_tokens: int = 2000,
) -> LLMCallerProtocol:
"""
Create a simple OpenAI caller (convenience function).
Returns a ``llm_caller``-compatible callable: ``(str) -> str``.
Example:
caller = create_openai_caller(api_key="sk-...", model="gpt-4")
runner = MACPRunner(llm_caller=caller)
"""
config = AgentLLMConfig(
model_name=model,
base_url=base_url,
api_key=api_key,
temperature=temperature,
max_tokens=max_tokens,
)
return _create_openai_caller_from_config(config)
def create_openai_structured_caller(
api_key: str | None = None,
model: str = "gpt-4",
base_url: str = "https://api.openai.com/v1",
temperature: float = 0.7,
max_tokens: int = 2000,
) -> StructuredLLMCallerProtocol:
"""
Create an OpenAI structured caller (recommended for chat LLMs).
Returns a ``structured_llm_caller``-compatible callable that receives
a list of ``{"role": ..., "content": ...}`` dicts and returns a string.
This gives the LLM proper system/user role separation — shorter,
more focused responses and fewer tokens in long chains.
Example:
caller = create_openai_structured_caller(api_key="sk-...", model="gpt-4o")
runner = MACPRunner(structured_llm_caller=caller)
"""
try:
from openai import OpenAI
except ImportError as e:
msg = "openai package required. Install with: pip install openai"
raise ImportError(msg) from e
client = OpenAI(api_key=api_key, base_url=base_url)
def caller(messages: list[dict[str, str]]) -> str:
response = client.chat.completions.create(
model=model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
)
return response.choices[0].message.content or ""
return caller
def create_openai_async_structured_caller(
api_key: str | None = None,
model: str = "gpt-4",
base_url: str = "https://api.openai.com/v1",
temperature: float = 0.7,
max_tokens: int = 2000,
) -> AsyncStructuredLLMCallerProtocol:
"""
Create an async OpenAI structured caller.
Returns an ``async_structured_llm_caller``-compatible callable.
Required for parallel execution via ``astream()`` with
``enable_parallel=True``.
Example:
sync_caller = create_openai_structured_caller(api_key="sk-...")
async_caller = create_openai_async_structured_caller(api_key="sk-...")
runner = MACPRunner(
structured_llm_caller=sync_caller,
async_structured_llm_caller=async_caller,
config=RunnerConfig(enable_parallel=True),
)
# Sequential topologies — use stream()
for event in runner.stream(graph):
...
# Parallel topologies — use astream()
async for event in runner.astream(graph):
...
"""
try:
from openai import AsyncOpenAI
except ImportError as e:
msg = "openai package required. Install with: pip install openai"
raise ImportError(msg) from e
client = AsyncOpenAI(api_key=api_key, base_url=base_url)
async def caller(messages: list[dict[str, str]]) -> str:
response = await client.chat.completions.create(
model=model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
)
return response.choices[0].message.content or ""
return caller
class HiddenState(BaseModel):
"""Agent hidden state/embeddings passed via hidden channels."""
model_config = ConfigDict(arbitrary_types_allowed=True)
tensor: torch.Tensor | None = None
embedding: torch.Tensor | None = None
metadata: dict[str, Any] = Field(default_factory=dict)
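    # Example (illustrative): attaching an embedding to an agent's hidden channel.
    #     state = HiddenState(
    #         embedding=torch.randn(768),
    #         metadata={"layer": "last_hidden"},
    #     )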
# =============================================================================
# DYNAMIC TOPOLOGY (runtime graph modification)
# =============================================================================
class StepContext(BaseModel):
"""
Context of the current execution step for making graph modification decisions.
Passed to hooks for dynamic topology modification at runtime.
Attributes:
agent_id: ID of the current agent.
response: Agent response (if already received).
messages: All agent responses so far.
step_result: Result of the current step.
execution_order: Execution order up to the current moment.
remaining_agents: Agents that have not yet been executed.
query: Original query.
total_tokens: Tokens used.
metadata: Arbitrary metadata.
"""
model_config = ConfigDict(arbitrary_types_allowed=True)
agent_id: str
response: str | None = None
messages: dict[str, str] = Field(default_factory=dict)
step_result: StepResult | None = None
execution_order: list[str] = Field(default_factory=list)
remaining_agents: list[str] = Field(default_factory=list)
query: str = ""
total_tokens: int = 0
metadata: dict[str, Any] = Field(default_factory=dict)
class TopologyAction(BaseModel):
"""
Action for modifying the graph/plan topology at runtime.
Returned from hooks for dynamic graph and ExecutionPlan modification.
Supports two levels of modification:
- Graph (add_edges, remove_edges) — for non-adaptive mode.
- Plan (skip_agents, force_agents, condition_skip/unskip) — for adaptive mode.
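    Example (illustrative sketch — ``"reviewer"`` is a hypothetical agent id and
    ``my_llm`` a placeholder caller):
        def my_hook(ctx: StepContext, graph) -> TopologyAction | None:
            if ctx.response and "LGTM" in ctx.response:
                return TopologyAction(condition_skip_agents=["reviewer"])
            if ctx.total_tokens > 8000:
                return TopologyAction(early_stop=True, early_stop_reason="token budget")
            return None
        runner = MACPRunner(
            llm_caller=my_llm,
            config=RunnerConfig(enable_dynamic_topology=True, topology_hooks=[my_hook]),
        )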
"""
# Early stopping
early_stop: bool = False
early_stop_reason: str | None = None
# Adding/removing edges (graph modification)
add_edges: list[tuple[str, str, float]] = Field(default_factory=list) # (src, tgt, weight)
remove_edges: list[tuple[str, str]] = Field(default_factory=list)
# Skip agents (skipped — permanently excluded from the plan)
skip_agents: list[str] = Field(default_factory=list)
# Force agent execution (even if not in the plan — added to the plan)
force_agents: list[str] = Field(default_factory=list)
# Conditional skip (condition_skipped — can be restored when conditions change)
condition_skip_agents: list[str] = Field(default_factory=list)
condition_unskip_agents: list[str] = Field(default_factory=list)
# Add agents to the plan with their unconditional chains
insert_chains: list[tuple[str, str]] = Field(default_factory=list)
# Change the end agent
new_end_agent: str | None = None
# Reserved for future use
trigger_rebuild: bool = False
# Type aliases for hooks
TopologyHook = Callable[[StepContext, Any], TopologyAction | None]
AsyncTopologyHook = Callable[[StepContext, Any], Awaitable[TopologyAction | None]]
class EarlyStopCondition:
"""
Condition for early stopping of execution.
Allows stopping graph execution based on an arbitrary condition.
The condition is any function (ctx: StepContext) -> bool.
Attributes:
condition: Condition evaluation function.
reason: Stop reason (for logging/debugging).
after_agents: List of agents after which to check the condition.
min_agents_executed: Minimum number of agents that must execute before checking.
Example:
# Arbitrary condition — any logic
stop_condition = EarlyStopCondition(
condition=lambda ctx: my_complex_check(ctx.messages, ctx.metadata),
reason="Custom condition met"
)
# Condition based on metrics
stop_condition = EarlyStopCondition(
condition=lambda ctx: ctx.metadata.get("quality_score", 0) > 0.9,
reason="Quality threshold reached"
)
# Stop after reaching a token limit
stop_condition = EarlyStopCondition(
condition=lambda ctx: ctx.total_tokens > 5000,
reason="Token limit reached"
)
runner = MACPRunner(
llm_caller=my_llm,
config=RunnerConfig(early_stop_conditions=[stop_condition])
)
"""
def __init__(
self,
condition: Callable[[StepContext], bool],
reason: str = "Early stop condition met",
after_agents: list[str] | None = None,
min_agents_executed: int = 0,
):
"""
Create an early stop condition.
Args:
condition: Arbitrary condition function (ctx -> bool).
reason: Stop reason (for logging).
after_agents: List of agents after which to check the condition
(if None — check after every agent).
min_agents_executed: Minimum number of agents that must execute
before checking the condition.
"""
self.condition = condition
self.reason = reason
self.after_agents = after_agents
self.min_agents_executed = min_agents_executed
def should_stop(self, ctx: StepContext) -> tuple[bool, str]:
"""Check whether execution should stop."""
# Check minimum number of executed agents
if len(ctx.execution_order) < self.min_agents_executed:
return False, ""
# Check whether the current agent matches
if self.after_agents and ctx.agent_id not in self.after_agents:
return False, ""
try:
if self.condition(ctx):
return True, self.reason
except (ValueError, TypeError, KeyError, AttributeError, RuntimeError):
# Condition evaluation failed, treat as not met
return False, ""
return False, ""
# =========================================================================
# FACTORY METHODS for common conditions
# =========================================================================
@classmethod
def on_keyword(
cls,
keyword: str,
reason: str | None = None,
*,
case_sensitive: bool = False,
in_last_response: bool = True,
) -> "EarlyStopCondition":
"""
Stop if the response contains a keyword.
Args:
keyword: Keyword to search for.
reason: Stop reason.
case_sensitive: Whether to perform a case-sensitive match.
in_last_response: Search only in the last response (otherwise in all).
Example:
stop = EarlyStopCondition.on_keyword("FINAL ANSWER")
"""
def check(ctx: StepContext) -> bool:
            text = (ctx.response or "") if in_last_response else " ".join(ctx.messages.values())
if case_sensitive:
return keyword in text
return keyword.lower() in text.lower()
return cls(condition=check, reason=reason or f"Keyword '{keyword}' found")
@classmethod
def on_token_limit(
cls,
max_tokens: int,
reason: str | None = None,
) -> "EarlyStopCondition":
"""
Stop when the token limit is reached.
Args:
max_tokens: Maximum number of tokens.
reason: Stop reason.
Example:
stop = EarlyStopCondition.on_token_limit(5000)
"""
return cls(
condition=lambda ctx: ctx.total_tokens >= max_tokens,
reason=reason or f"Token limit {max_tokens} reached",
)
@classmethod
def on_agent_count(
cls,
max_agents: int,
reason: str | None = None,
) -> "EarlyStopCondition":
"""
Stop after N agents have been executed.
Args:
max_agents: Maximum number of agents.
reason: Stop reason.
Example:
stop = EarlyStopCondition.on_agent_count(3)
"""
return cls(
condition=lambda ctx: len(ctx.execution_order) >= max_agents,
reason=reason or f"Agent count limit {max_agents} reached",
)
@classmethod
def on_metadata(
cls,
key: str,
value: Any = None,
comparator: Callable[[Any, Any], bool] | None = None,
reason: str | None = None,
) -> "EarlyStopCondition":
"""
Stop based on a value in metadata.
Args:
key: Key in metadata.
value: Expected value (if None — only presence is checked).
comparator: Comparison function (default: ==).
reason: Stop reason.
Example:
# Stop if quality > 0.9
stop = EarlyStopCondition.on_metadata(
"quality", 0.9,
comparator=lambda v, threshold: v > threshold
)
"""
def check(ctx: StepContext) -> bool:
if key not in ctx.metadata:
return False
actual = ctx.metadata[key]
if value is None:
return True
if comparator:
return comparator(actual, value)
return actual == value
return cls(condition=check, reason=reason or f"Metadata condition met: {key}")
@classmethod
def on_custom(
cls,
condition: Callable[[StepContext], bool],
reason: str = "Custom condition met",
**kwargs,
) -> "EarlyStopCondition":
"""
Create a condition with an arbitrary function (constructor alias).
Args:
condition: Arbitrary evaluation function.
reason: Stop reason.
**kwargs: Additional parameters (after_agents, min_agents_executed).
Example:
stop = EarlyStopCondition.on_custom(
lambda ctx: my_rl_agent.should_stop(ctx.messages),
reason="RL agent decided to stop"
)
"""
return cls(condition=condition, reason=reason, **kwargs)
@classmethod
def combine_any(
cls,
conditions: list["EarlyStopCondition"],
reason: str = "One of conditions met",
) -> "EarlyStopCondition":
"""
Combine conditions with OR (stop if at least one is met).
Args:
conditions: List of conditions.
reason: Stop reason.
Example:
stop = EarlyStopCondition.combine_any([
EarlyStopCondition.on_keyword("DONE"),
EarlyStopCondition.on_token_limit(10000),
])
"""
def check(ctx: StepContext) -> bool:
for cond in conditions:
should_stop, _ = cond.should_stop(ctx)
if should_stop:
return True
return False
return cls(condition=check, reason=reason)
@classmethod
def combine_all(
cls,
conditions: list["EarlyStopCondition"],
reason: str = "All conditions met",
) -> "EarlyStopCondition":
"""
Combine conditions with AND (stop if all are met).
Args:
conditions: List of conditions.
reason: Stop reason.
Example:
stop = EarlyStopCondition.combine_all([
EarlyStopCondition.on_keyword("answer"),
EarlyStopCondition.on_metadata("confidence", 0.8, lambda v, t: v > t),
])
"""
def check(ctx: StepContext) -> bool:
for cond in conditions:
should_stop, _ = cond.should_stop(ctx)
if not should_stop:
return False
return True
return cls(condition=check, reason=reason)
class MACPResult(NamedTuple):
"""MACP execution result with messages, metrics, and states."""
messages: dict[str, str]
final_answer: str
final_agent_id: str
execution_order: list[str]
agent_states: dict[str, list[dict[str, Any]]] | None = None
step_results: dict[str, StepResult] | None = None
total_tokens: int = 0
total_time: float = 0.0
topology_changed_count: int = 0
fallback_count: int = 0
pruned_agents: list[str] | None = None
errors: list[ExecutionError] | None = None
hidden_states: dict[str, HiddenState] | None = None
metrics: ExecutionMetrics | None = None
budget_summary: dict[str, Any] | None = None
# Dynamic topology
early_stopped: bool = False
early_stop_reason: str | None = None
topology_modifications: int = 0 # Number of topology modifications
class ExecutionContext(NamedTuple):
"""
Common initialization context for all execution methods.
Assembled by ``MACPRunner._prepare_base_context`` and contains
parsed graph data: indices, agent identifiers, and lookups.
"""
task_idx: int
a_agents: Any # torch.Tensor adjacency matrix
agent_ids: list[str]
query: str
agent_lookup: dict[str, Any]
agent_names: dict[str, str]
class RunnerConfig(BaseModel):
"""Runner configuration: timeouts, adaptivity, parallelism, budgets, and logging."""
model_config = ConfigDict(arbitrary_types_allowed=True)
timeout: float = 60.0
adaptive: bool = False
enable_parallel: bool = True
max_parallel_size: int = 5
max_retries: int = 2
retry_delay: float = 1.0
retry_backoff: float = 2.0
update_states: bool = True
routing_policy: RoutingPolicy = RoutingPolicy.TOPOLOGICAL
pruning_config: PruningConfig | None = None
enable_hidden_channels: bool = False
hidden_combine_strategy: str = "mean"
pass_embeddings: bool = True
error_policy: ErrorPolicy = Field(default_factory=ErrorPolicy)
budget_config: BudgetConfig | None = None
callbacks: list[Handler] = Field(default_factory=list)
# Memory integration
enable_memory: bool = False
memory_config: MemoryConfig | None = None
memory_context_limit: int = 5 # number of memory entries to include in the prompt
# Streaming configuration
enable_token_streaming: bool = False # Enable token-level streaming if LLM supports it
stream_callbacks: list[StreamCallback] = Field(default_factory=list)
async_stream_callbacks: list[AsyncStreamCallback] = Field(default_factory=list)
prompt_preview_length: int = 100 # How many chars of prompt to include in events
# Task query broadcast mode
broadcast_task_to_all: bool = True # True: task query is broadcast to all agents
# False: only to agents directly connected to the task node
# Dynamic topology (graph modification at runtime)
enable_dynamic_topology: bool = False # Enable dynamic topology support
topology_hooks: list[Any] = Field(default_factory=list) # TopologyHook callbacks
async_topology_hooks: list[Any] = Field(default_factory=list) # AsyncTopologyHook callbacks
early_stop_conditions: list[Any] = Field(default_factory=list) # EarlyStopCondition list
# Tools support - if an agent has tools, they are used AUTOMATICALLY
max_tool_iterations: int = 3 # Maximum tool calling iterations per agent
tool_registry: Any | None = None # ToolRegistry (optional, can use global registry)
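# Typical configuration (illustrative sketch — ``my_llm`` is a placeholder caller):
#     config = RunnerConfig(
#         adaptive=True,
#         max_retries=3,
#         enable_memory=True,
#         early_stop_conditions=[EarlyStopCondition.on_token_limit(8000)],
#     )
#     runner = MACPRunner(llm_caller=my_llm, config=config)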
class MACPRunner:
"""
MACP protocol executor for RoleGraph with sync/async and adaptive mode.
    Supports two execution modes:
- Batch: run_round() / arun_round() - returns complete result
- Streaming: stream() / astream() - yields events during execution
Multi-model support:
Each agent can use its own LLM. Three ways to specify models:
1. A single llm_caller for all agents (legacy)
2. llm_callers: dict[agent_id, Callable] — different models for different agents
3. llm_factory: LLMCallerFactory — dynamic caller creation
Priority: llm_callers[agent_id] > factory > default llm_caller
Streaming Example:
runner = MACPRunner(llm_caller=my_llm)
# Sync streaming
for event in runner.stream(graph):
if event.event_type == StreamEventType.AGENT_OUTPUT:
print(f"{event.agent_id}: {event.content}")
# Async streaming
async for event in runner.astream(graph):
print(event)
Multi-model Example:
# Option 1: Different callers for different agents
runner = MACPRunner(
llm_caller=default_caller,
llm_callers={
"solver": gpt4_caller,
"reviewer": claude_caller,
"analyzer": local_llama_caller,
}
)
# Option 2: Factory (automatically creates callers from agent LLM configurations)
factory = LLMCallerFactory.create_openai_factory(default_api_key="...")
runner = MACPRunner(llm_factory=factory)
# Agents with llm_config will automatically receive their callers:
builder.add_agent("solver", llm_backbone="gpt-4", temperature=0.7)
builder.add_agent("analyzer", llm_backbone="gpt-4o-mini", temperature=0.0)
Structured Prompt (recommended for modern chat LLMs):
Instead of the legacy ``llm_caller(str) -> str`` interface, use
``structured_llm_caller`` to send proper system/user roles to the LLM.
This avoids the "prompt as one string" problem where the model has to
re-parse a flat blob, and typically results in shorter, more focused
responses with fewer tokens — especially in long agent chains.
from openai import OpenAI
client = OpenAI(api_key="sk-...")
def my_structured_caller(messages: list[dict[str, str]]) -> str:
resp = client.chat.completions.create(
model="gpt-4o",
messages=messages,
max_tokens=1024,
)
return resp.choices[0].message.content or ""
runner = MACPRunner(structured_llm_caller=my_structured_caller)
result = runner.run_round(graph)
# The runner automatically builds StructuredPrompt with:
# {"role": "system", "content": "<persona + description + tools hint + output_schema>"}
# {"role": "assistant", "content": "<state[0] assistant turn>"} # replayed from agent.state
# {"role": "user", "content": "<state[1] user turn>"} # replayed from agent.state
# ...
# {"role": "user", "content": "<task + input_schema hint + memory + incoming messages>"}
# and passes prompt.messages to your structured_llm_caller.
"""
def __init__(
self,
llm_caller: Callable[[str], str] | None = None,
async_llm_caller: Callable[[str], Awaitable[str]] | None = None,
streaming_llm_caller: Callable[[str], Iterator[str]] | None = None,
async_streaming_llm_caller: Callable[[str], AsyncIterator[str]] | None = None,
token_counter: Callable[[str], int] | None = None,
config: RunnerConfig | None = None,
timeout: int = 60,
memory_pool: SharedMemoryPool | None = None,
# Multi-model support
llm_callers: dict[str, Callable[[str], str]] | None = None,
async_llm_callers: dict[str, Callable[[str], Awaitable[str]]] | None = None,
llm_factory: LLMCallerFactory | None = None,
# Tools support
tool_registry: Any | None = None,
# Structured prompt support (modern chat LLMs)
structured_llm_caller: StructuredLLMCallerProtocol | None = None,
async_structured_llm_caller: AsyncStructuredLLMCallerProtocol | None = None,
):
"""
Create a MACP runner with multi-model and tools support.
Args:
llm_caller: Default synchronous LLM call (returns full response).
async_llm_caller: Default asynchronous LLM call (returns full response).
streaming_llm_caller: Synchronous streaming LLM (yields tokens).
async_streaming_llm_caller: Asynchronous streaming LLM (async yields tokens).
token_counter: Function to estimate tokens in text.
config: Runner configuration (otherwise created with the given timeout).
timeout: Default timeout (seconds), used if not specified in config.
memory_pool: External SharedMemoryPool (if None — created automatically).
# Multi-model support:
llm_callers: Dict agent_id -> sync caller. Has highest priority.
async_llm_callers: Dict agent_id -> async caller.
llm_factory: Factory for creating callers based on agent LLM configurations.
Used if no explicit caller is set in llm_callers for the agent.
# Tools support:
tool_registry: Tool registry (ToolRegistry). Optional.
If an agent has tools, they are used automatically.
# Structured prompt support:
structured_llm_caller: Callable that receives a list of
``{"role": "system"|"user", "content": "..."}`` dicts
and returns a string. When provided the runner sends
proper system/user roles to the LLM instead of a flat
string — this typically produces shorter, more focused
responses and saves tokens in long chains.
async_structured_llm_caller: Async version of the above.
Example:
# Multi-model via caller dictionary
runner = MACPRunner(
llm_caller=default_gpt4_caller,
llm_callers={
"analyzer": create_openai_caller(model="gpt-4o-mini"),
"expert": create_openai_caller(model="gpt-4-turbo"),
}
)
# Multi-model via factory
factory = LLMCallerFactory.create_openai_factory()
runner = MACPRunner(llm_factory=factory)
# Structured prompt (modern chat LLMs)
def my_chat(messages: list[dict[str, str]]) -> str:
return openai_client.chat.completions.create(
model="gpt-4", messages=messages
).choices[0].message.content
runner = MACPRunner(structured_llm_caller=my_chat)
"""
self.structured_llm_caller = structured_llm_caller
self.async_structured_llm_caller = async_structured_llm_caller
# When only a structured caller is provided, create a thin
# str->str wrapper so all existing code paths that check
# ``self.llm_caller is not None`` keep working.
if llm_caller is None and structured_llm_caller is not None:
_sc = structured_llm_caller # capture for closure
def _str_wrapper(prompt: str) -> str:
return _sc([{"role": "user", "content": prompt}])
llm_caller = _str_wrapper
self.llm_caller = llm_caller
self.async_llm_caller = async_llm_caller
self.streaming_llm_caller = streaming_llm_caller
self.async_streaming_llm_caller = async_streaming_llm_caller
self.token_counter = token_counter or self._default_token_counter
self.config = config or RunnerConfig(timeout=float(timeout))
# Multi-model support
self.llm_callers = llm_callers or {}
self.async_llm_callers = async_llm_callers or {}
self.llm_factory = llm_factory
# Tools support
self.tool_registry = tool_registry
self._scheduler = (
AdaptiveScheduler(
policy=self.config.routing_policy,
pruning_config=self.config.pruning_config,
)
if self.config.adaptive
else None
)
self._callback_manager: CallbackManager | None = None
self._budget_tracker: BudgetTracker | None = None
self._metrics: ExecutionMetrics | None = None
# Memory integration
self._memory_pool: SharedMemoryPool | None = memory_pool
self._agent_memories: dict[str, AgentMemory] = {}
def _init_run(
self,
graph_name: str | None = None, # noqa: ARG002
num_agents: int = 0,
query: str = "",
execution_order: list[str] | None = None,
callbacks: list[Handler] | None = None,
) -> uuid.UUID:
"""Initialize callbacks, budgets and metrics before running. Returns run_id."""
# Merge config callbacks with per-run callbacks and context callbacks
all_callbacks = list(self.config.callbacks)
if callbacks:
all_callbacks.extend(callbacks)
# Check for context callback manager
context_manager = get_callback_manager()
if context_manager:
all_callbacks.extend(context_manager.handlers)
self._callback_manager = CallbackManager.configure(handlers=all_callbacks)
if self.config.budget_config:
self._budget_tracker = BudgetTracker(self.config.budget_config)
self._budget_tracker.start()
else:
self._budget_tracker = None
self._metrics = ExecutionMetrics(
start_time=datetime.now(tz=UTC),
total_agents=num_agents,
)
return self._callback_manager.on_run_start(
query=query,
num_agents=num_agents,
execution_order=execution_order or [],
)
def _prepare_base_context(self, role_graph: Any) -> "ExecutionContext | None":
"""
Assemble the common initialization context from role_graph.
Extracts task_idx, adjacency matrix, agent_ids list, query,
and lookup dictionaries. Does not call ``_init_memory`` — each execution
method does so itself (accounting for possible agent filtering).
Returns:
``ExecutionContext`` with graph data, or ``None`` if there are no agents.
"""
task_idx = self._get_task_index(role_graph)
a_agents = extract_agent_adjacency(role_graph.A_com, task_idx)
agent_ids, _ = self._get_agent_ids(role_graph, task_idx)
if not agent_ids:
return None
query = role_graph.query or ""
agent_lookup = {a.agent_id: a for a in role_graph.agents}
agent_names = self._build_agent_names(role_graph)
return ExecutionContext(
task_idx=task_idx,
a_agents=a_agents,
agent_ids=agent_ids,
query=query,
agent_lookup=agent_lookup,
agent_names=agent_names,
)
def _init_memory(self, agent_ids: list[str]) -> None:
"""Initialize memory for agents before execution."""
if not self.config.enable_memory:
return
if self._memory_pool is None:
self._memory_pool = SharedMemoryPool()
mem_config = self.config.memory_config or MemoryConfig()
for agent_id in agent_ids:
if agent_id not in self._agent_memories:
memory = AgentMemory(agent_id, mem_config)
self._agent_memories[agent_id] = memory
self._memory_pool.register(memory)
def _get_memory_context(self, agent_id: str) -> list[dict[str, Any]]:
"""Get the latest entries from the agent's memory for context."""
if not self.config.enable_memory or agent_id not in self._agent_memories:
return []
memory = self._agent_memories[agent_id]
return memory.get_messages(limit=self.config.memory_context_limit)
def _save_to_memory(
self,
agent_id: str,
response: str,
incoming_ids: list[str] | None = None,
) -> None:
"""Save the agent's response to its memory and share with neighbors."""
if not self.config.enable_memory or agent_id not in self._agent_memories:
return
memory = self._agent_memories[agent_id]
entry = memory.add_message(role="assistant", content=response)
# Share with incoming agents (graph neighbors)
if self._memory_pool and incoming_ids:
self._memory_pool.share(agent_id, entry, to_agents=incoming_ids)
def get_agent_memory(self, agent_id: str) -> AgentMemory | None:
"""Get the agent's memory by id (for external access)."""
return self._agent_memories.get(agent_id)
@property
def memory_pool(self) -> SharedMemoryPool | None:
"""Access to the SharedMemoryPool."""
return self._memory_pool
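    # Example (illustrative): enable memory in the config, run, then inspect it.
    #     runner = MACPRunner(llm_caller=my_llm, config=RunnerConfig(enable_memory=True))
    #     runner.run_round(graph)
    #     memory = runner.get_agent_memory("solver")  # "solver" is a hypothetical agent id
    #     recent = memory.get_messages(limit=5) if memory else []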
# =========================================================================
# DYNAMIC TOPOLOGY METHODS
# =========================================================================
def _check_early_stop(
self,
agent_id: str,
response: str | None,
messages: dict[str, str],
execution_order: list[str],
remaining_agents: list[str],
query: str,
total_tokens: int,
) -> tuple[bool, str]:
"""
Check early stop conditions.
Returns:
(should_stop, reason)
"""
if not self.config.early_stop_conditions:
return False, ""
ctx = StepContext(
agent_id=agent_id,
response=response,
messages=messages,
execution_order=execution_order,
remaining_agents=remaining_agents,
query=query,
total_tokens=total_tokens,
)
for condition in self.config.early_stop_conditions:
if isinstance(condition, EarlyStopCondition):
should_stop, reason = condition.should_stop(ctx)
if should_stop:
return True, reason
return False, ""
def _apply_topology_hooks(
self,
agent_id: str,
response: str | None,
step_result: StepResult | None,
messages: dict[str, str],
execution_order: list[str],
remaining_agents: list[str],
query: str,
total_tokens: int,
role_graph: Any,
) -> TopologyAction | None:
"""
Apply sync topology hooks and collect actions.
Returns:
Combined TopologyAction or None.
"""
if not self.config.enable_dynamic_topology or not self.config.topology_hooks:
return None
ctx = StepContext(
agent_id=agent_id,
response=response,
step_result=step_result,
messages=messages,
execution_order=execution_order,
remaining_agents=remaining_agents,
query=query,
total_tokens=total_tokens,
)
combined_action = TopologyAction()
for hook in self.config.topology_hooks:
try:
action = hook(ctx, role_graph)
if action is not None:
combined_action = self._merge_topology_actions(combined_action, action)
except (ValueError, TypeError, KeyError, RuntimeError):
pass # Ignore hook errors
if self._has_topology_action(combined_action):
return combined_action
return None
async def _apply_async_topology_hooks(
self,
agent_id: str,
response: str | None,
step_result: StepResult | None,
messages: dict[str, str],
execution_order: list[str],
remaining_agents: list[str],
query: str,
total_tokens: int,
role_graph: Any,
) -> TopologyAction | None:
"""Apply async topology hooks and collect actions."""
if not self.config.enable_dynamic_topology or not self.config.async_topology_hooks:
return None
ctx = StepContext(
agent_id=agent_id,
response=response,
step_result=step_result,
messages=messages,
execution_order=execution_order,
remaining_agents=remaining_agents,
query=query,
total_tokens=total_tokens,
)
combined_action = TopologyAction()
for hook in self.config.async_topology_hooks:
try:
action = await hook(ctx, role_graph)
if action is not None:
combined_action = self._merge_topology_actions(combined_action, action)
except (ValueError, TypeError, KeyError, RuntimeError):
pass # Ignore async hook errors
if self._has_topology_action(combined_action):
return combined_action
return None
def _merge_topology_actions(
self,
base: TopologyAction,
new: TopologyAction,
) -> TopologyAction:
"""Merge two TopologyAction objects."""
return TopologyAction(
early_stop=base.early_stop or new.early_stop,
early_stop_reason=new.early_stop_reason or base.early_stop_reason,
add_edges=base.add_edges + new.add_edges,
remove_edges=base.remove_edges + new.remove_edges,
skip_agents=list(set(base.skip_agents + new.skip_agents)),
force_agents=list(set(base.force_agents + new.force_agents)),
condition_skip_agents=list(set(base.condition_skip_agents + new.condition_skip_agents)),
condition_unskip_agents=list(set(base.condition_unskip_agents + new.condition_unskip_agents)),
insert_chains=base.insert_chains + new.insert_chains,
new_end_agent=new.new_end_agent or base.new_end_agent,
trigger_rebuild=base.trigger_rebuild or new.trigger_rebuild,
)
def _apply_graph_modifications(
self,
role_graph: Any,
action: TopologyAction,
) -> int:
"""Apply modifications to the graph and return the number of changes."""
modifications = 0
# Remove edges
for src, tgt in action.remove_edges:
if role_graph.remove_edge(src, tgt):
modifications += 1
# Add edges
for src, tgt, weight in action.add_edges:
if role_graph.add_edge(src, tgt, weight):
modifications += 1
return modifications
@staticmethod
def _has_topology_action(action: TopologyAction) -> bool:
"""Check whether the TopologyAction contains any actions."""
return bool(
action.early_stop
or action.add_edges
or action.remove_edges
or action.skip_agents
or action.force_agents
or action.condition_skip_agents
or action.condition_unskip_agents
or action.insert_chains
or action.new_end_agent
or action.trigger_rebuild
)
def _build_conditional_edge_action( # noqa: PLR0912
self,
last_agent: str,
agent_ids: list[str],
step_results: dict[str, StepResult],
messages: dict[str, str],
query: str,
remaining_ids: set[str],
) -> TopologyAction | None:
"""
Built-in topology hook: evaluate conditional edges and return TopologyAction.
Checks outgoing conditional edges of the agent and builds an action:
- condition_skip_agents: agents whose conditions are not met
(only if there are no other unevaluated incoming conditional edges).
- condition_unskip_agents + insert_chains: agents whose conditions are met.
With multiple incoming conditional edges (A→B, C→B):
- If A→B is not met but C has not yet run — B is NOT skipped.
- If C→B is met — B is unskipped.
- B is skipped only if ALL incoming conditional edges have been evaluated and ALL are not met.
Complexity: O(E_cond) — only conditional edges are checked.
"""
if self._scheduler is None:
return None
edge_conditions = self._scheduler._last_edge_conditions # noqa: SLF001
if not edge_conditions or not messages:
return None
evaluator = self._scheduler.condition_evaluator
executed_agents = set(step_results.keys())
skip: list[str] = []
unskip: list[str] = []
chains: list[tuple[str, str]] = []
for (source, target), condition in edge_conditions.items():
if source != last_agent:
continue
if target not in agent_ids:
continue
ctx = ConditionContext(
source_agent=source,
target_agent=target,
messages=messages,
step_results=step_results,
query=query,
)
if evaluator.evaluate(condition, ctx):
unskip.append(target)
if target not in remaining_ids:
chains.append((target, last_agent))
elif target in remaining_ids:
# Skip only if:
# 1. No other unevaluated incoming conditional edges (waiting for them).
# 2. No other already-evaluated incoming conditional edge has passed.
has_pending_incoming = False
has_passed_incoming = False
for (src, tgt), cond in edge_conditions.items():
if tgt != target or src == source:
continue
if src not in executed_agents:
has_pending_incoming = True
break
# Source was already executed — check its condition
other_ctx = ConditionContext(
source_agent=src,
target_agent=target,
messages=messages,
step_results=step_results,
query=query,
)
if evaluator.evaluate(cond, other_ctx):
has_passed_incoming = True
break
if not has_pending_incoming and not has_passed_incoming:
skip.append(target)
if not skip and not unskip and not chains:
return None
return TopologyAction(
condition_skip_agents=skip,
condition_unskip_agents=unskip,
insert_chains=chains,
)
def _apply_topology_to_plan(
self,
plan: ExecutionPlan,
action: TopologyAction,
a_agents: Any,
agent_ids: list[str],
) -> bool:
"""
Apply a TopologyAction to the ExecutionPlan.
Single method for modifying the plan from any source:
user hooks, built-in conditional edge hook, etc.
Args:
plan: Current execution plan.
action: Action to apply.
a_agents: Adjacency matrix (for BFS chains).
agent_ids: List of agent IDs.
Returns:
True if the plan was modified.
"""
changed = False
edge_conditions = self._scheduler._last_edge_conditions if self._scheduler else {} # noqa: SLF001
# 1. Condition skip + cascade skip of unconditional descendants
for agent_id in action.condition_skip_agents:
plan.condition_skipped.add(agent_id)
changed = True
# Cascading condition_skip of unconditional descendants (BFS)
self._cascade_condition_skip(
plan,
agent_id,
a_agents,
agent_ids,
edge_conditions,
)
# 2. Condition unskip
for agent_id in action.condition_unskip_agents:
plan.condition_skipped.discard(agent_id)
changed = True
# 3. Skip (permanent skip)
remaining_ids = {s.agent_id for s in plan.steps[plan.current_index :]}
for agent_id in action.skip_agents:
if agent_id in remaining_ids:
plan.condition_skipped.add(agent_id)
changed = True
# 4. Force agents — add to the plan if not already there
for agent_id in action.force_agents:
if agent_id not in remaining_ids and agent_id in agent_ids:
plan.condition_skipped.discard(agent_id)
added = plan.insert_conditional_step(agent_id=agent_id, predecessors=[])
if added:
remaining_ids.add(agent_id)
changed = True
# 5. Insert chains — add the agent + its unconditional chain
for target, predecessor in action.insert_chains:
plan.condition_skipped.discard(target)
if target not in remaining_ids:
added = plan.insert_conditional_step(
agent_id=target,
predecessors=[predecessor],
)
if added:
remaining_ids.add(target)
changed = True
# BFS — add the unconditional chain after target
self._insert_unconditional_chain(
plan,
target,
a_agents,
agent_ids,
edge_conditions,
remaining_ids,
)
else:
changed = True
return changed
@staticmethod
def _cascade_condition_skip(
plan: ExecutionPlan,
skipped_agent: str,
a_agents: Any,
agent_ids: list[str],
edge_conditions: dict[tuple[str, str], Any],
) -> None:
"""
BFS: cascading condition_skip of unconditional descendants of skipped_agent.
If an agent is condition_skipped, its unconditional descendants should also be
skipped (if they have no other incoming data paths).
"""
queue = deque([skipped_agent])
visited = {skipped_agent}
remaining_ids = {s.agent_id for s in plan.steps[plan.current_index :]}
while queue:
current = queue.popleft()
if current not in agent_ids:
continue
current_idx = agent_ids.index(current)
for j, aid in enumerate(agent_ids):
if aid in visited or aid not in remaining_ids:
continue
weight = a_agents[current_idx, j]
if hasattr(weight, "item"):
weight = weight.item()
if weight <= _MIN_EDGE_WEIGHT:
continue
# Skip conditional edges — they are handled separately
if (current, aid) in edge_conditions:
continue
# Check: does aid have other incoming unconditional edges
# from agents that are NOT condition_skipped?
has_other_source = False
for k, src in enumerate(agent_ids):
if src == current or src in plan.condition_skipped:
continue
w = a_agents[k, j]
if hasattr(w, "item"):
w = w.item()
if w > _MIN_EDGE_WEIGHT and (src, aid) not in edge_conditions:
has_other_source = True
break
if not has_other_source:
visited.add(aid)
plan.condition_skipped.add(aid)
queue.append(aid)
@staticmethod
def _insert_unconditional_chain(
plan: ExecutionPlan,
start_agent: str,
a_agents: Any,
agent_ids: list[str],
edge_conditions: dict[tuple[str, str], Any],
remaining_ids: set[str],
) -> None:
"""
BFS: add a chain of unconditionally linked agents to the plan after start_agent.
Traverses edges without conditions and adds all subsequent agents to the plan.
"""
queue = deque([start_agent])
visited = {start_agent}
while queue:
current = queue.popleft()
if current not in agent_ids:
continue
current_idx = agent_ids.index(current)
for j, aid in enumerate(agent_ids):
if aid in visited:
continue
weight = a_agents[current_idx, j]
if hasattr(weight, "item"):
weight = weight.item()
if weight <= _MIN_EDGE_WEIGHT:
continue
# Skip conditional edges — they are handled separately
if (current, aid) in edge_conditions:
continue
visited.add(aid)
if aid not in remaining_ids:
plan.condition_skipped.discard(aid)
added = plan.insert_conditional_step(agent_id=aid, predecessors=[current])
if added:
remaining_ids.add(aid)
queue.append(aid)
def _run_topology_pipeline(
self,
plan: ExecutionPlan,
last_agent: str,
a_agents: Any,
agent_ids: list[str],
step_results: dict[str, StepResult],
messages: dict[str, str],
query: str,
execution_order: list[str],
total_tokens: int,
role_graph: Any,
) -> bool:
"""
Unified sync pipeline: built-in conditional hook + user hooks → plan.
Called after each step in adaptive methods.
Combines all TopologyAction objects and applies them to the plan.
Returns:
True if the plan was modified.
"""
remaining = [s.agent_id for s in plan.remaining_steps]
# 1. Built-in hook: conditional edges
remaining_ids = {s.agent_id for s in plan.steps[plan.current_index :]}
cond_action = self._build_conditional_edge_action(
last_agent,
agent_ids,
step_results,
messages,
query,
remaining_ids,
)
# 2. User topology hooks
user_action = self._apply_topology_hooks(
last_agent,
messages.get(last_agent),
step_results.get(last_agent),
messages,
execution_order,
remaining,
query,
total_tokens,
role_graph,
)
# 3. Combine actions
combined = TopologyAction()
if cond_action is not None:
combined = self._merge_topology_actions(combined, cond_action)
if user_action is not None:
combined = self._merge_topology_actions(combined, user_action)
if not self._has_topology_action(combined):
return False
# 4. Graph modification (add_edges / remove_edges)
if combined.add_edges or combined.remove_edges:
self._apply_graph_modifications(role_graph, combined)
# 5. Plan modification
return self._apply_topology_to_plan(plan, combined, a_agents, agent_ids)
async def _arun_topology_pipeline(
self,
plan: ExecutionPlan,
last_agent: str,
a_agents: Any,
agent_ids: list[str],
step_results: dict[str, StepResult],
messages: dict[str, str],
query: str,
execution_order: list[str],
total_tokens: int,
role_graph: Any,
) -> bool:
"""
Unified async pipeline: built-in conditional hook + async user hooks → plan.
Returns:
True if the plan was modified.
"""
remaining = [s.agent_id for s in plan.remaining_steps]
# 1. Built-in hook: conditional edges (sync — fast)
remaining_ids = {s.agent_id for s in plan.steps[plan.current_index :]}
cond_action = self._build_conditional_edge_action(
last_agent,
agent_ids,
step_results,
messages,
query,
remaining_ids,
)
# 2. User async topology hooks
user_action = await self._apply_async_topology_hooks(
last_agent,
messages.get(last_agent),
step_results.get(last_agent),
messages,
execution_order,
remaining,
query,
total_tokens,
role_graph,
)
# 3. Combine actions
combined = TopologyAction()
if cond_action is not None:
combined = self._merge_topology_actions(combined, cond_action)
if user_action is not None:
combined = self._merge_topology_actions(combined, user_action)
if not self._has_topology_action(combined):
return False
# 4. Graph modification
if combined.add_edges or combined.remove_edges:
self._apply_graph_modifications(role_graph, combined)
# 5. Plan modification
return self._apply_topology_to_plan(plan, combined, a_agents, agent_ids)
def _finalize_run(
self,
run_id: uuid.UUID,
*,
success: bool,
executed_agents: int,
final_answer: str = "",
error: BaseException | None = None,
executed_agent_ids: list[str] | None = None,
) -> None:
"""Finalize metrics and notify callbacks after execution."""
if self._metrics:
self._metrics.end_time = datetime.now(tz=UTC)
self._metrics.executed_agents = executed_agents
if self._callback_manager:
self._callback_manager.on_run_end(
run_id=run_id,
output=final_answer,
success=success,
error=error,
total_tokens=self._metrics.total_tokens if self._metrics else 0,
total_time_ms=self._metrics.duration_seconds * 1000 if self._metrics else 0,
executed_agents=executed_agent_ids or [],
)
@staticmethod
def _default_token_counter(text: str) -> int:
"""Simple token estimate: 4/3 of the number of words."""
return len(text.split()) * 4 // 3
def _get_caller_for_agent(
self,
agent_id: str,
agent: Any,
) -> Callable[[str], str] | None:
"""
Get sync LLM caller for a specific agent.
Priority:
1. llm_callers[agent_id] — explicitly specified caller for the agent
2. llm_factory.get_caller(agent.llm_config) — from the factory by configuration
3. self.llm_caller — default caller
Returns:
LLM caller or None if none is available.
"""
# 1. Check explicit per-agent caller
if agent_id in self.llm_callers:
return self.llm_callers[agent_id]
# 2. Try factory if agent has LLM config
if self.llm_factory and hasattr(agent, "get_llm_config"):
llm_config = agent.get_llm_config()
if llm_config and llm_config.is_configured():
caller = self.llm_factory.get_caller(llm_config, agent_id)
if caller:
return caller
elif self.llm_factory and hasattr(agent, "llm_config") and agent.llm_config:
caller = self.llm_factory.get_caller(agent.llm_config, agent_id)
if caller:
return caller
# 3. Fallback to default caller
return self.llm_caller
def _get_async_caller_for_agent(
self,
agent_id: str,
agent: Any,
) -> Callable[[str], Awaitable[str]] | None:
"""
Get async LLM caller for a specific agent.
Priority:
1. async_llm_callers[agent_id] — explicitly specified async caller
2. llm_factory.get_async_caller(agent.llm_config) — from the factory
3. self.async_llm_caller — default async caller
Note:
May return ``None`` even when ``async_structured_llm_caller`` is
set. Callers must check ``self.async_structured_llm_caller``
separately before treating ``None`` as a fatal error — see
:meth:`_acall_llm` which dispatches to the structured path first.
"""
# 1. Check explicit per-agent async caller
if agent_id in self.async_llm_callers:
return self.async_llm_callers[agent_id]
# 2. Try factory if agent has LLM config
if self.llm_factory and hasattr(agent, "get_llm_config"):
llm_config = agent.get_llm_config()
if llm_config and llm_config.is_configured():
caller = self.llm_factory.get_async_caller(llm_config, agent_id)
if caller:
return caller
elif self.llm_factory and hasattr(agent, "llm_config") and agent.llm_config:
caller = self.llm_factory.get_async_caller(agent.llm_config, agent_id)
if caller:
return caller
# 3. Fallback to default async caller
return self.async_llm_caller
# ------------------------------------------------------------------
# Structured prompt dispatch
# ------------------------------------------------------------------
def _call_llm(self, caller: Callable, prompt: StructuredPrompt) -> str:
"""
Call the LLM using the best available interface.
If a ``structured_llm_caller`` is registered, sends
``prompt.messages`` (proper system/user roles).
Otherwise falls back to ``caller(prompt.text)`` (flat string).
"""
if self.structured_llm_caller is not None:
return self.structured_llm_caller(prompt.messages)
return caller(prompt.text)
async def _acall_llm(self, async_caller: Callable | None, prompt: StructuredPrompt) -> str:
"""Async version of :meth:`_call_llm`."""
if self.async_structured_llm_caller is not None:
return await self.async_structured_llm_caller(prompt.messages)
if async_caller is None:
msg = "No async LLM caller available"
raise ValueError(msg)
return await async_caller(prompt.text)
def _has_any_caller(self) -> bool:
"""Check whether at least one LLM caller is available."""
return bool(
self.llm_caller
or self.llm_callers
or self.structured_llm_caller
or (self.llm_factory and (self.llm_factory.default_caller or self.llm_factory.caller_builder))
)
def _has_any_async_caller(self) -> bool:
"""Check whether at least one async LLM caller is available."""
return bool(
self.async_llm_caller
or self.async_llm_callers
or self.async_structured_llm_caller
or (self.llm_factory and (self.llm_factory.default_async_caller or self.llm_factory.async_caller_builder))
)
def run_round(
self,
role_graph: Any,
final_agent_id: str | None = None,
start_agent_id: str | None = None,
*,
update_states: bool | None = None,
filter_unreachable: bool = False,
callbacks: list[Handler] | None = None,
) -> MACPResult:
"""
Run a synchronous round (simple or adaptive strategy).
Args:
role_graph: Role graph to execute.
final_agent_id: ID of final agent (overrides role_graph.end_node).
start_agent_id: ID of start agent (overrides role_graph.start_node).
update_states: Whether to update agent states.
filter_unreachable: Exclude isolated nodes from execution.
callbacks: Per-run callback handlers (merged with config.callbacks).
Returns:
MACPResult with execution results.
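Example (minimal sketch; ``runner`` and ``graph`` are assumed to be an
already-configured MACPRunner and role graph, and "writer" is an
illustrative agent id):
result = runner.run_round(graph, final_agent_id="writer")
print(result.final_answer)
print(result.execution_order)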
"""
if not self._has_any_caller():
msg = "llm_caller, llm_callers, or llm_factory is required for synchronous execution"
raise ValueError(msg)
# Get start/end from params or graph
effective_start = start_agent_id or getattr(role_graph, "start_node", None)
effective_end = final_agent_id or getattr(role_graph, "end_node", None)
if self.config.adaptive:
return self._run_adaptive(
role_graph,
effective_end,
effective_start,
update_states=update_states,
filter_unreachable=filter_unreachable,
callbacks=callbacks,
)
return self._run_simple(
role_graph,
effective_end,
effective_start,
update_states=update_states,
filter_unreachable=filter_unreachable,
callbacks=callbacks,
)
async def arun_round(
self,
role_graph: Any,
final_agent_id: str | None = None,
start_agent_id: str | None = None,
*,
update_states: bool | None = None,
filter_unreachable: bool = False,
callbacks: list[Handler] | None = None,
) -> MACPResult:
"""
Run an async round (simple or adaptive strategy).
Args:
role_graph: Role graph to execute.
final_agent_id: ID of final agent (overrides role_graph.end_node).
start_agent_id: ID of start agent (overrides role_graph.start_node).
update_states: Whether to update agent states.
filter_unreachable: Exclude isolated nodes from execution.
callbacks: Per-run callback handlers (merged with config.callbacks).
Returns:
MACPResult with execution results.
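Example (minimal sketch; ``runner`` and ``graph`` are assumed to be an
already-configured MACPRunner and role graph):
import asyncio
result = asyncio.run(runner.arun_round(graph, filter_unreachable=True))
print(result.final_answer)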
"""
if not self._has_any_async_caller():
msg = "async_llm_caller, async_llm_callers, or llm_factory is required for async execution"
raise ValueError(msg)
# Get start/end from params or graph
effective_start = start_agent_id or getattr(role_graph, "start_node", None)
effective_end = final_agent_id or getattr(role_graph, "end_node", None)
if self.config.adaptive:
return await self._arun_adaptive(
role_graph,
effective_end,
effective_start,
update_states=update_states,
filter_unreachable=filter_unreachable,
callbacks=callbacks,
)
return await self._arun_simple(
role_graph,
effective_end,
effective_start,
update_states=update_states,
filter_unreachable=filter_unreachable,
callbacks=callbacks,
)
def _run_simple( # noqa: PLR0912, PLR0915
self,
role_graph: Any,
final_agent_id: str | None,
start_agent_id: str | None,
*,
update_states: bool | None = None,
filter_unreachable: bool = True,
callbacks: list[Handler] | None = None,
) -> MACPResult:
"""
Sequential execution in topological order without adaptation.
Supports multi-model: each agent uses its own LLM caller.
Supports filtering of isolated nodes to save tokens.
"""
if not self._has_any_caller():
msg = "llm_caller, llm_callers, or llm_factory is required for synchronous execution"
raise ValueError(msg)
start_time = time.time()
base = self._prepare_base_context(role_graph)
if base is None:
return MACPResult(messages={}, final_answer="", final_agent_id="", execution_order=[])
_task_idx, a_agents, agent_ids, query, agent_lookup, agent_names = base
# Filter isolated nodes
excluded_agents: list[str] = []
effective_agent_ids = agent_ids
effective_a = a_agents
if filter_unreachable and (start_agent_id is not None or final_agent_id is not None):
relevant, excluded_agents = filter_reachable_agents(a_agents, agent_ids, start_agent_id, final_agent_id)
if relevant and len(relevant) < len(agent_ids):
indices = [agent_ids.index(aid) for aid in relevant]
indices_t = torch.tensor(indices, dtype=torch.long)
effective_a = a_agents[indices_t][:, indices_t]
effective_agent_ids = relevant
exec_order = build_execution_order(effective_a, effective_agent_ids, role_graph.role_sequence)
# Initialize memory (with effective agents after filtering)
self._init_memory(effective_agent_ids)
# Initialize callbacks
run_id = self._init_run(
graph_name=getattr(role_graph, "name", None),
num_agents=len(effective_agent_ids),
query=query,
execution_order=exec_order,
callbacks=callbacks,
)
task_connected = self._get_task_connected_agents(role_graph)
messages: dict[str, str] = {}
total_tokens = 0
actual_exec_order: list[str] = []
early_stopped = False
early_stop_reason: str | None = None
topology_modifications = 0
skipped_by_hooks: set[str] = set()
run_error: BaseException | None = None
# Get disabled nodes from graph
disabled_nodes: set[str] = getattr(role_graph, "disabled_nodes", set())
try:
for step_idx, agent_id in enumerate(exec_order):
# Check if agent was skipped by hooks
if agent_id in skipped_by_hooks:
continue
# Check if node is disabled
if agent_id in disabled_nodes:
if agent_id not in excluded_agents:
excluded_agents.append(agent_id)
continue
agent = agent_lookup.get(agent_id)
if agent is None:
continue
incoming_ids = get_incoming_agents(agent_id, effective_a, effective_agent_ids)
incoming_messages = {aid: messages[aid] for aid in incoming_ids if aid in messages}
include_query = self._should_include_query(agent_id, task_connected)
memory_context = self._get_memory_context(agent_id)
prompt = self._build_prompt(
agent, query, incoming_messages, agent_names, memory_context, include_query=include_query
)
prompt_text = prompt.text
# Notify callbacks of agent start
if self._callback_manager:
self._callback_manager.on_agent_start(
run_id=run_id,
agent_id=agent_id,
agent_name=agent_names.get(agent_id, agent_id),
step_index=step_idx,
prompt=prompt_text[: self.config.prompt_preview_length],
predecessors=incoming_ids,
)
agent_start_time = time.time()
try:
# Get caller for this specific agent (multi-model support)
caller = self._get_caller_for_agent(agent_id, agent)
if caller is None:
error_msg = f"No LLM caller available for agent {agent_id}"
messages[agent_id] = f"[Error: {error_msg}]"
actual_exec_order.append(agent_id)
if self._callback_manager:
self._callback_manager.on_agent_error(
run_id=run_id,
error=ValueError(error_msg),
agent_id=agent_id,
error_type="NoCallerError",
)
continue
# Execute LLM caller with tools support
response, agent_tokens = self._run_agent_with_tools(
caller=caller,
prompt=prompt,
agent=agent,
)
agent_duration_ms = (time.time() - agent_start_time) * 1000
messages[agent_id] = response
total_tokens += agent_tokens
self._save_to_memory(agent_id, response, incoming_ids)
actual_exec_order.append(agent_id)
# Notify callbacks of agent end
if self._callback_manager:
is_final = agent_id == final_agent_id or (final_agent_id is None and agent_id == exec_order[-1])
self._callback_manager.on_agent_end(
run_id=run_id,
agent_id=agent_id,
output=response,
agent_name=agent_names.get(agent_id, agent_id),
step_index=step_idx,
tokens_used=agent_tokens,
duration_ms=agent_duration_ms,
is_final=is_final,
)
except (ExecutionError, ValueError, TypeError, KeyError, RuntimeError, OSError) as e:
messages[agent_id] = f"[Error: {e}]"
actual_exec_order.append(agent_id)
if self._callback_manager:
self._callback_manager.on_agent_error(
run_id=run_id,
error=e,
agent_id=agent_id,
error_type=type(e).__name__,
)
# Check early stopping
remaining = [a for a in exec_order if a not in messages and a not in skipped_by_hooks]
should_stop, reason = self._check_early_stop(
agent_id,
messages.get(agent_id),
messages,
actual_exec_order,
remaining,
query,
total_tokens,
)
if should_stop:
early_stopped = True
early_stop_reason = reason
break
# Apply topology hooks
if self.config.enable_dynamic_topology:
action = self._apply_topology_hooks(
agent_id,
messages.get(agent_id),
None,
messages,
actual_exec_order,
remaining,
query,
total_tokens,
role_graph,
)
if action is not None:
if action.early_stop:
early_stopped = True
early_stop_reason = action.early_stop_reason
break
if action.skip_agents:
skipped_by_hooks.update(action.skip_agents)
topology_modifications += self._apply_graph_modifications(role_graph, action)
except (ExecutionError, ValueError, TypeError, KeyError, RuntimeError, OSError) as e:
run_error = e
final_id = self._determine_final_agent(final_agent_id, actual_exec_order, messages)
final_answer = messages.get(final_id, "")
# Finalize callbacks
self._finalize_run(
run_id=run_id,
success=run_error is None,
executed_agents=len(actual_exec_order),
final_answer=final_answer,
error=run_error,
executed_agent_ids=actual_exec_order,
)
if run_error:
raise run_error
do_update = update_states if update_states is not None else self.config.update_states
agent_states = self._build_agent_states(messages, agent_lookup) if do_update else None
return MACPResult(
messages=messages,
final_answer=final_answer,
final_agent_id=final_id,
execution_order=actual_exec_order,
agent_states=agent_states,
total_tokens=total_tokens,
total_time=time.time() - start_time,
pruned_agents=excluded_agents if excluded_agents else None,
early_stopped=early_stopped,
early_stop_reason=early_stop_reason,
topology_modifications=topology_modifications,
)
async def _arun_simple( # noqa: PLR0912, PLR0915
self,
role_graph: Any,
final_agent_id: str | None,
start_agent_id: str | None,
*,
update_states: bool | None = None,
filter_unreachable: bool = True,
callbacks: list[Handler] | None = None,
) -> MACPResult:
"""
Async sequential execution without adaptation.
Supports multi-model: each agent uses its own LLM caller.
Supports filtering of isolated nodes to save tokens.
"""
if not self._has_any_async_caller():
msg = "async_llm_caller, async_llm_callers, or llm_factory is required for async execution"
raise ValueError(msg)
start_time = time.time()
base = self._prepare_base_context(role_graph)
if base is None:
return MACPResult(messages={}, final_answer="", final_agent_id="", execution_order=[])
_task_idx, a_agents, agent_ids, query, agent_lookup, agent_names = base
# Filter isolated nodes
excluded_agents: list[str] = []
effective_agent_ids = agent_ids
effective_a = a_agents
if filter_unreachable and (start_agent_id is not None or final_agent_id is not None):
relevant, excluded_agents = filter_reachable_agents(a_agents, agent_ids, start_agent_id, final_agent_id)
if relevant and len(relevant) < len(agent_ids):
indices = [agent_ids.index(aid) for aid in relevant]
indices_t = torch.tensor(indices, dtype=torch.long)
effective_a = a_agents[indices_t][:, indices_t]
effective_agent_ids = relevant
exec_order = build_execution_order(effective_a, effective_agent_ids, role_graph.role_sequence)
# Initialize memory (with effective agents after filtering)
self._init_memory(effective_agent_ids)
# Initialize callbacks
run_id = self._init_run(
graph_name=getattr(role_graph, "name", None),
num_agents=len(effective_agent_ids),
query=query,
execution_order=exec_order,
callbacks=callbacks,
)
task_connected = self._get_task_connected_agents(role_graph)
messages: dict[str, str] = {}
total_tokens = 0
actual_exec_order: list[str] = []
early_stopped = False
early_stop_reason: str | None = None
topology_modifications = 0
skipped_by_hooks: set[str] = set()
run_error: BaseException | None = None
# Get disabled nodes from graph
disabled_nodes: set[str] = getattr(role_graph, "disabled_nodes", set())
try:
for step_idx, agent_id in enumerate(exec_order):
# Check if agent was skipped by hooks
if agent_id in skipped_by_hooks:
continue
# Check if node is disabled
if agent_id in disabled_nodes:
if agent_id not in excluded_agents:
excluded_agents.append(agent_id)
continue
agent = agent_lookup.get(agent_id)
if agent is None:
continue
incoming_ids = get_incoming_agents(agent_id, effective_a, effective_agent_ids)
incoming_messages = {aid: messages[aid] for aid in incoming_ids if aid in messages}
include_query = self._should_include_query(agent_id, task_connected)
memory_context = self._get_memory_context(agent_id)
prompt = self._build_prompt(
agent, query, incoming_messages, agent_names, memory_context, include_query=include_query
)
# prompt is a StructuredPrompt; keep a flat preview for callbacks
prompt_text = prompt.text
# Notify callbacks of agent start
if self._callback_manager:
self._callback_manager.on_agent_start(
run_id=run_id,
agent_id=agent_id,
agent_name=agent_names.get(agent_id, agent_id),
step_index=step_idx,
prompt=prompt_text[: self.config.prompt_preview_length],
predecessors=incoming_ids,
)
agent_start_time = time.time()
try:
# Get async caller for this specific agent (multi-model support).
# When only async_structured_llm_caller is configured,
# async_caller may be None — _acall_llm handles that.
async_caller = self._get_async_caller_for_agent(agent_id, agent)
if async_caller is None and self.async_structured_llm_caller is None:
error_msg = f"No async LLM caller available for agent {agent_id}"
messages[agent_id] = f"[Error: {error_msg}]"
actual_exec_order.append(agent_id)
if self._callback_manager:
self._callback_manager.on_agent_error(
run_id=run_id,
error=ValueError(error_msg),
agent_id=agent_id,
error_type="NoCallerError",
)
continue
response = await asyncio.wait_for(
self._acall_llm(async_caller, prompt),
timeout=self.config.timeout,
)
agent_tokens = self.token_counter(prompt_text) + self.token_counter(response)
agent_duration_ms = (time.time() - agent_start_time) * 1000
messages[agent_id] = response
total_tokens += agent_tokens
self._save_to_memory(agent_id, response, incoming_ids)
actual_exec_order.append(agent_id)
# Notify callbacks of agent end
if self._callback_manager:
is_final = agent_id == final_agent_id or (final_agent_id is None and agent_id == exec_order[-1])
self._callback_manager.on_agent_end(
run_id=run_id,
agent_id=agent_id,
output=response,
agent_name=agent_names.get(agent_id, agent_id),
step_index=step_idx,
tokens_used=agent_tokens,
duration_ms=agent_duration_ms,
is_final=is_final,
)
except (ExecutionError, ValueError, TypeError, KeyError, RuntimeError, OSError) as e:
messages[agent_id] = f"[Error: {e}]"
actual_exec_order.append(agent_id)
if self._callback_manager:
self._callback_manager.on_agent_error(
run_id=run_id,
error=e,
agent_id=agent_id,
error_type=type(e).__name__,
)
# Check early stopping
remaining = [a for a in exec_order if a not in messages and a not in skipped_by_hooks]
should_stop, reason = self._check_early_stop(
agent_id,
messages.get(agent_id),
messages,
actual_exec_order,
remaining,
query,
total_tokens,
)
if should_stop:
early_stopped = True
early_stop_reason = reason
break
# Apply async topology hooks
if self.config.enable_dynamic_topology:
action = await self._apply_async_topology_hooks(
agent_id,
messages.get(agent_id),
None,
messages,
actual_exec_order,
remaining,
query,
total_tokens,
role_graph,
)
if action is not None:
if action.early_stop:
early_stopped = True
early_stop_reason = action.early_stop_reason
break
if action.skip_agents:
skipped_by_hooks.update(action.skip_agents)
topology_modifications += self._apply_graph_modifications(role_graph, action)
except (ExecutionError, ValueError, TypeError, KeyError, RuntimeError, OSError) as e:
run_error = e
final_id = self._determine_final_agent(final_agent_id, actual_exec_order, messages)
final_answer = messages.get(final_id, "")
# Finalize callbacks
self._finalize_run(
run_id=run_id,
success=run_error is None,
executed_agents=len(actual_exec_order),
final_answer=final_answer,
error=run_error,
executed_agent_ids=actual_exec_order,
)
if run_error:
raise run_error
do_update = update_states if update_states is not None else self.config.update_states
agent_states = self._build_agent_states(messages, agent_lookup) if do_update else None
return MACPResult(
messages=messages,
final_answer=final_answer,
final_agent_id=final_id,
execution_order=actual_exec_order,
agent_states=agent_states,
total_tokens=total_tokens,
total_time=time.time() - start_time,
pruned_agents=excluded_agents if excluded_agents else None,
early_stopped=early_stopped,
early_stop_reason=early_stop_reason,
topology_modifications=topology_modifications,
)
def _run_adaptive( # noqa: PLR0912, PLR0915
self,
role_graph: Any,
final_agent_id: str | None,
start_agent_id: str | None,
*,
update_states: bool | None = None,
filter_unreachable: bool = True,
callbacks: list[Handler] | None = None,
) -> MACPResult:
"""
Adaptive sync execution with conditional edges and fallback.
Supports multi-model: each agent uses its own LLM caller.
Supports filtering of isolated nodes to save tokens.
Per-call ``callbacks`` are merged with ``RunnerConfig.callbacks``
and context-manager callbacks (similar to ``_run_simple``).
"""
if not self._has_any_caller():
msg = "llm_caller, llm_callers, or llm_factory is required for synchronous execution"
raise ValueError(msg)
if self._scheduler is None:
msg = "Scheduler not initialized for adaptive mode"
raise ValueError(msg)
start_time = time.time()
base = self._prepare_base_context(role_graph)
if base is None:
return MACPResult(messages={}, final_answer="", final_agent_id="", execution_order=[])
task_idx, a_agents, agent_ids, query, agent_lookup, agent_names = base
# Initialize memory
self._init_memory(agent_ids)
p_matrix = self._extract_p_matrix(role_graph, task_idx)
# Get conditions from graph for conditional routing
edge_conditions = self._get_edge_conditions(role_graph)
# Initial context for conditions
condition_ctx = ConditionContext(
source_agent="",
target_agent="",
messages={},
step_results={},
query=query,
)
plan = self._scheduler.build_plan(
a_agents,
agent_ids,
p_matrix,
start_agent=start_agent_id,
end_agent=final_agent_id,
edge_conditions=edge_conditions,
condition_context=condition_ctx,
filter_unreachable=filter_unreachable,
)
# Initialize callbacks (per-call + config + context-manager)
run_id = self._init_run(
graph_name=getattr(role_graph, "name", None),
num_agents=len(agent_ids),
query=query,
execution_order=plan.execution_order,
callbacks=callbacks,
)
messages: dict[str, str] = {}
step_results: dict[str, StepResult] = {}
execution_order: list[str] = []
fallback_attempts: dict[str, int] = {}
topology_changed_count = 0
fallback_count = 0
pruned_agents: list[str] = []
errors: list[ExecutionError] = []
step_idx = 0
run_error: BaseException | None = None
try:
while not plan.is_complete:
step = plan.get_current_step()
if step is None:
break
# Skip agents whose conditions were not met
if step.agent_id in plan.condition_skipped:
plan.advance()
continue
should_prune, reason = self._scheduler.should_prune(
step, plan, step_results.get(execution_order[-1]) if execution_order else None
)
if should_prune:
plan.mark_skipped(step.agent_id)
pruned_agents.append(step.agent_id)
errors.append(
ExecutionError(
message=f"Pruned: {reason}",
agent_id=step.agent_id,
recoverable=False,
)
)
continue
# Notify callbacks of agent start
agent_name = agent_names.get(step.agent_id, step.agent_id)
if self._callback_manager:
_agent_obj = agent_lookup.get(step.agent_id)
_prompt_preview_text = ""
if _agent_obj is not None:
_inc = {p: messages[p] for p in step.predecessors if p in messages}
_mem = self._get_memory_context(step.agent_id)
_sp = self._build_prompt(_agent_obj, query, _inc, agent_names, _mem)
_prompt_preview_text = _sp.text
self._callback_manager.on_agent_start(
run_id=run_id,
agent_id=step.agent_id,
agent_name=agent_name,
step_index=step_idx,
prompt=_prompt_preview_text[: self.config.prompt_preview_length],
predecessors=step.predecessors,
)
agent_start_time = time.time()
result = self._execute_step(step, messages, agent_lookup, agent_names, query)
step_results[step.agent_id] = result
execution_order.append(step.agent_id)
if result.success:
messages[step.agent_id] = result.response or ""
plan.mark_completed(step.agent_id, result.tokens_used)
self._save_to_memory(step.agent_id, result.response or "", step.predecessors)
# Notify callbacks of agent end
if self._callback_manager:
self._callback_manager.on_agent_end(
run_id=run_id,
agent_id=step.agent_id,
output=result.response or "",
agent_name=agent_name,
step_index=step_idx,
tokens_used=result.tokens_used,
duration_ms=(time.time() - agent_start_time) * 1000,
is_final=False,
)
else:
plan.mark_failed(step.agent_id)
errors.append(
ExecutionError(
message=result.error or "Unknown error",
agent_id=step.agent_id,
recoverable=True,
)
)
# Notify callbacks of agent error
if self._callback_manager:
self._callback_manager.on_agent_error(
run_id=run_id,
error=Exception(result.error or "Unknown error"),
agent_id=step.agent_id,
error_type="ExecutionError",
)
attempts = fallback_attempts.get(step.agent_id, 0)
if self._scheduler.should_use_fallback(step, result, attempts):
for fb_agent in step.fallback_agents:
if fb_agent not in plan.completed and fb_agent not in plan.failed:
plan.insert_fallback(fb_agent, plan.current_index - 1)
fallback_count += 1
break
fallback_attempts[step.agent_id] = attempts + 1
# Topology pipeline: conditional edges + user hooks → plan
if self._run_topology_pipeline(
plan,
step.agent_id,
a_agents,
agent_ids,
step_results,
messages,
query,
execution_order,
plan.tokens_used,
role_graph,
):
topology_changed_count += 1
step_idx += 1
except (ExecutionError, ValueError, TypeError, KeyError, RuntimeError, OSError) as e:
run_error = e
final_id = self._determine_final_agent(final_agent_id, execution_order, messages)
final_answer = messages.get(final_id, "")
# Finalize callbacks
self._finalize_run(
run_id=run_id,
success=run_error is None,
executed_agents=len(execution_order),
final_answer=final_answer,
error=run_error,
executed_agent_ids=execution_order,
)
if run_error:
raise run_error
do_update = update_states if update_states is not None else self.config.update_states
agent_states = self._build_agent_states(messages, agent_lookup) if do_update else None
return MACPResult(
messages=messages,
final_answer=final_answer,
final_agent_id=final_id,
execution_order=execution_order,
agent_states=agent_states,
step_results=step_results,
total_tokens=plan.tokens_used,
total_time=time.time() - start_time,
topology_changed_count=topology_changed_count,
fallback_count=fallback_count,
pruned_agents=pruned_agents,
errors=errors if errors else None,
)
async def _arun_adaptive( # noqa: PLR0912, PLR0915
self,
role_graph: Any,
final_agent_id: str | None,
start_agent_id: str | None,
*,
update_states: bool | None = None,
filter_unreachable: bool = True,
callbacks: list[Handler] | None = None,
) -> MACPResult:
"""
Adaptive async execution with parallelism and conditional edges.
Supports multi-model: each agent uses its own LLM caller.
Supports filtering of isolated nodes to save tokens.
Per-call ``callbacks`` are merged with ``RunnerConfig.callbacks``
and context-manager callbacks (similar to ``_arun_simple``).
"""
if not self._has_any_async_caller():
msg = "async_llm_caller, async_llm_callers, or llm_factory is required for async execution"
raise ValueError(msg)
if self._scheduler is None:
msg = "Scheduler not initialized for adaptive mode"
raise ValueError(msg)
start_time = time.time()
base = self._prepare_base_context(role_graph)
if base is None:
return MACPResult(messages={}, final_answer="", final_agent_id="", execution_order=[])
task_idx, a_agents, agent_ids, query, agent_lookup, agent_names = base
# Initialize memory
self._init_memory(agent_ids)
p_matrix = self._extract_p_matrix(role_graph, task_idx)
# Get conditions from graph for conditional routing
edge_conditions = self._get_edge_conditions(role_graph)
# Context for conditions
condition_ctx = ConditionContext(
source_agent="",
target_agent="",
messages={},
step_results={},
query=query,
)
plan = self._scheduler.build_plan(
a_agents,
agent_ids,
p_matrix,
start_agent=start_agent_id,
end_agent=final_agent_id,
edge_conditions=edge_conditions,
condition_context=condition_ctx,
filter_unreachable=filter_unreachable,
)
# Initialize callbacks (per-call + config + context-manager)
run_id = self._init_run(
graph_name=getattr(role_graph, "name", None),
num_agents=len(agent_ids),
query=query,
execution_order=plan.execution_order,
callbacks=callbacks,
)
messages: dict[str, str] = {}
step_results: dict[str, StepResult] = {}
execution_order: list[str] = []
fallback_attempts: dict[str, int] = {}
topology_changed_count = 0
fallback_count = 0
pruned_agents: list[str] = []
errors: list[ExecutionError] = []
step_idx = 0
run_error: BaseException | None = None
try:
while not plan.is_complete:
parallel_group = self._get_parallel_group(plan, messages.keys())
if not parallel_group:
break
valid_steps = []
for step in parallel_group:
# Skip agents whose conditions were not met
if step.agent_id in plan.condition_skipped:
plan.advance()
continue
should_prune, reason = self._scheduler.should_prune(step, plan, None)
if should_prune:
plan.mark_skipped(step.agent_id)
pruned_agents.append(step.agent_id)
errors.append(
ExecutionError(
message=f"Pruned: {reason}",
agent_id=step.agent_id,
recoverable=False,
)
)
else:
valid_steps.append(step)
if not valid_steps:
# All steps in the group were skipped or pruned; break explicitly
# so the while loop cannot stall waiting for agents that never run.
break
# Notify callbacks of agent start for all steps in group
group_start_times: dict[str, float] = {}
for step in valid_steps:
agent_name = agent_names.get(step.agent_id, step.agent_id)
if self._callback_manager:
_agent_obj = agent_lookup.get(step.agent_id)
_prompt_preview_text = ""
if _agent_obj is not None:
_inc = {p: messages[p] for p in step.predecessors if p in messages}
_mem = self._get_memory_context(step.agent_id)
_sp = self._build_prompt(_agent_obj, query, _inc, agent_names, _mem)
_prompt_preview_text = _sp.text
self._callback_manager.on_agent_start(
run_id=run_id,
agent_id=step.agent_id,
agent_name=agent_name,
step_index=step_idx,
prompt=_prompt_preview_text[: self.config.prompt_preview_length],
predecessors=step.predecessors,
)
group_start_times[step.agent_id] = time.time()
step_idx += 1
if self.config.enable_parallel and len(valid_steps) > 1:
results = await self._execute_parallel(valid_steps, messages, agent_lookup, agent_names, query)
else:
results = []
for step in valid_steps:
r = await self._execute_step_async(step, messages, agent_lookup, agent_names, query)
results.append((step, r))
for step, result in results:
step_results[step.agent_id] = result
execution_order.append(step.agent_id)
agent_name = agent_names.get(step.agent_id, step.agent_id)
agent_start_time = group_start_times.get(step.agent_id, time.time())
if result.success:
messages[step.agent_id] = result.response or ""
plan.mark_completed(step.agent_id, result.tokens_used)
self._save_to_memory(step.agent_id, result.response or "", step.predecessors)
# Notify callbacks of agent end
if self._callback_manager:
self._callback_manager.on_agent_end(
run_id=run_id,
agent_id=step.agent_id,
output=result.response or "",
agent_name=agent_name,
step_index=execution_order.index(step.agent_id),
tokens_used=result.tokens_used,
duration_ms=(time.time() - agent_start_time) * 1000,
is_final=False,
)
else:
plan.mark_failed(step.agent_id)
errors.append(
ExecutionError(
message=result.error or "Unknown error",
agent_id=step.agent_id,
recoverable=True,
)
)
# Notify callbacks of agent error
if self._callback_manager:
self._callback_manager.on_agent_error(
run_id=run_id,
error=Exception(result.error or "Unknown error"),
agent_id=step.agent_id,
error_type="ExecutionError",
)
attempts = fallback_attempts.get(step.agent_id, 0)
if self._scheduler.should_use_fallback(step, result, attempts):
for fb_agent in step.fallback_agents:
if fb_agent not in plan.completed and fb_agent not in plan.failed:
plan.insert_fallback(fb_agent, plan.current_index - 1)
fallback_count += 1
break
fallback_attempts[step.agent_id] = attempts + 1
# Topology pipeline for each executed agent in the group
for step, _result in results:
if await self._arun_topology_pipeline(
plan,
step.agent_id,
a_agents,
agent_ids,
step_results,
messages,
query,
execution_order,
plan.tokens_used,
role_graph,
):
topology_changed_count += 1
except (ExecutionError, ValueError, TypeError, KeyError, RuntimeError, OSError) as e:
run_error = e
final_id = self._determine_final_agent(final_agent_id, execution_order, messages)
final_answer = messages.get(final_id, "")
# Finalize callbacks
self._finalize_run(
run_id=run_id,
success=run_error is None,
executed_agents=len(execution_order),
final_answer=final_answer,
error=run_error,
executed_agent_ids=execution_order,
)
if run_error:
raise run_error
do_update = update_states if update_states is not None else self.config.update_states
agent_states = self._build_agent_states(messages, agent_lookup) if do_update else None
return MACPResult(
messages=messages,
final_answer=final_answer,
final_agent_id=final_id,
execution_order=execution_order,
agent_states=agent_states,
step_results=step_results,
total_tokens=plan.tokens_used,
total_time=time.time() - start_time,
topology_changed_count=topology_changed_count,
fallback_count=fallback_count,
pruned_agents=pruned_agents,
errors=errors if errors else None,
)
def _get_task_index(self, role_graph: Any) -> int:
"""Get the rustworkx index of the task node or raise an error."""
if role_graph.task_node is None:
msg = "RoleGraph has no task_node set"
raise ValueError(msg)
task_idx = role_graph.get_node_index(role_graph.task_node)
if task_idx is None:
msg = f"Task node '{role_graph.task_node}' not found"
raise ValueError(msg)
return task_idx
def _get_agent_ids(
self,
role_graph: Any,
task_idx: int,
) -> tuple[list[str], dict[str, int]]:
"""Return the list of agent_ids (excluding task) and the id->adjacency index map."""
agent_ids = []
id_to_idx = {}
adj_idx = 0
for agent in role_graph.agents:
graph_idx = role_graph.get_node_index(agent.agent_id)
if graph_idx == task_idx:
continue
agent_ids.append(agent.agent_id)
id_to_idx[agent.agent_id] = adj_idx
adj_idx += 1
return agent_ids, id_to_idx
def _extract_p_matrix(self, role_graph: Any, task_idx: int) -> torch.Tensor | None:
"""Return the probability matrix without the task row/column."""
if role_graph.p_matrix is None:
return None
n_nodes = role_graph.p_matrix.shape[0]
mask = torch.ones(n_nodes, dtype=torch.bool)
mask[task_idx] = False
return role_graph.p_matrix[mask][:, mask]
def _get_edge_conditions(self, role_graph: Any) -> dict[tuple[str, str], Any]:
"""Get all edge conditions from the graph."""
if hasattr(role_graph, "get_all_edge_conditions"):
return role_graph.get_all_edge_conditions()
# Fallback: check individual attributes
conditions: dict[tuple[str, str], Any] = {}
if hasattr(role_graph, "edge_condition_names"):
conditions.update(role_graph.edge_condition_names)
if hasattr(role_graph, "edge_conditions"):
conditions.update(role_graph.edge_conditions)
return conditions
def _build_agent_names(self, role_graph: Any) -> dict[str, str]:
"""Map id -> display_name/role for building the prompt."""
return {a.agent_id: a.display_name or getattr(a, "role", a.agent_id) for a in role_graph.agents}
def _get_task_connected_agents(self, role_graph: Any) -> set[str]:
"""Get the set of agents directly connected to the task node."""
if role_graph.task_node is None:
return set()
task_idx = role_graph.get_node_index(role_graph.task_node)
if task_idx is None or role_graph.A_com is None:
return set()
connected = set()
for agent in role_graph.agents:
agent_idx = role_graph.get_node_index(agent.agent_id)
if agent_idx is not None and agent_idx != task_idx and role_graph.A_com[task_idx, agent_idx] > 0:
connected.add(agent.agent_id)
return connected
def _should_include_query(self, agent_id: str, task_connected: set[str]) -> bool:
"""Determine whether to include the query in the agent's prompt."""
if self.config.broadcast_task_to_all:
return True
return agent_id in task_connected
def _run_agent_with_tools( # noqa: PLR0912, PLR0915
self,
caller: Any,
prompt: "str | StructuredPrompt",
agent: Any,
) -> tuple[str, int]:
"""
Execute an agent with automatic tools support.
If the agent has tools and the caller accepts a ``tools`` parameter,
they are exposed via native function calling; otherwise the method
falls back to a plain LLM call.
Tools are obtained from:
1. BaseTool objects directly in agent.tools
2. Tool names registered in the global registry or config.tool_registry
``prompt`` may be a plain ``str`` (legacy) or a
``StructuredPrompt`` (modern). For plain LLM calls the method
dispatches via :meth:`_call_llm` which picks the structured
caller when available.
When a ``structured_llm_caller`` is registered the tool-calling
loop also uses structured messages (system/user/tool roles) so
the LLM receives proper role separation throughout the entire
tool-calling conversation.
Args:
caller: LLM caller (must support the tools parameter)
prompt: Agent prompt (str or StructuredPrompt)
agent: Agent profile (AgentProfile with tools)
Returns:
tuple[str, int]: (response, number of tokens)
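Example of a tools-capable caller (sketch; the response object's
attributes are inferred from how this loop consumes them and are not
a guaranteed interface):
def caller(prompt, tools=None):
# Return a plain str to bypass the tool loop, or an object exposing
# .content, .has_tool_calls and .tool_calls (each call exposing
# .name and .arguments) to drive the function-calling iterations.
...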
"""
import inspect
import json as json_module
import logging
logger = logging.getLogger(__name__)
# Normalise prompt — always have both flat text and structured form
prompt_text = prompt.text if isinstance(prompt, StructuredPrompt) else prompt
# Check if the agent has tools
if not TOOLS_AVAILABLE:
response = self._call_llm(caller, prompt) if isinstance(prompt, StructuredPrompt) else caller(prompt)
return response, self.token_counter(prompt_text) + self.token_counter(response)
# Get agent tools (method from AgentProfile)
agent_tools = []
if hasattr(agent, "get_tool_objects"):
agent_tools = agent.get_tool_objects()
elif hasattr(agent, "tools") and agent.tools:
# Fallback for old format (names only)
from tools import get_registry
registry = self.config.tool_registry or get_registry()
tool_names = agent.tools
agent_tools = registry.get_tools(tool_names)
if not agent_tools:
# Agent has no tools — plain call (use structured dispatch)
response = self._call_llm(caller, prompt) if isinstance(prompt, StructuredPrompt) else caller(prompt)
return response, self.token_counter(prompt_text) + self.token_counter(response)
# Check that caller supports tools
sig = inspect.signature(caller)
supports_tools = "tools" in sig.parameters
if not supports_tools:
# Caller does not support tools — plain call
response = self._call_llm(caller, prompt) if isinstance(prompt, StructuredPrompt) else caller(prompt)
return response, self.token_counter(prompt_text) + self.token_counter(response)
# Get schemas for tools
tool_schemas = [t.to_openai_schema() for t in agent_tools]
if not tool_schemas:
response = self._call_llm(caller, prompt) if isinstance(prompt, StructuredPrompt) else caller(prompt)
return response, self.token_counter(prompt_text) + self.token_counter(response)
# Get registry for executing tools
from tools import ToolCall, get_registry
registry = self.config.tool_registry or get_registry()
# Register all agent tools in the registry (for BaseTool objects)
for t in agent_tools:
if not registry.has(t.name):
registry.register(t)
# Execute tool calling loop
total_tokens = 0
# Build the conversation for the tool-calling loop.
# When structured messages are available we keep a proper
# multi-turn conversation; otherwise we accumulate flat text.
if isinstance(prompt, StructuredPrompt):
tool_messages: list[dict[str, str]] = list(prompt.messages)
else:
tool_messages = [{"role": "user", "content": prompt}]
current_prompt = prompt_text # flat fallback
# Cache for tool calls: (tool_name, args_json) -> result
# Prevents repeated execution of identical calls
tool_cache: dict[str, str] = {}
logger.debug("Agent has tools: %s", [s["function"]["name"] for s in tool_schemas])
llm_response: Any = None
use_structured_tools = isinstance(prompt, StructuredPrompt) and self.structured_llm_caller is not None
for iteration in range(self.config.max_tool_iterations):
# Call LLM with tools
logger.debug("Tool calling iteration %d", iteration + 1)
# Prefer structured messages when a structured caller is available;
# this gives the LLM proper system/user/assistant/tool role separation
# throughout the entire tool-calling conversation.
if use_structured_tools:
llm_response = caller(tool_messages, tools=tool_schemas)
else:
llm_response = caller(current_prompt, tools=tool_schemas)
if isinstance(llm_response, str):
# Caller returned a string, not an LLMResponse
return llm_response, self.token_counter(current_prompt) + self.token_counter(llm_response)
# Token counting: use the actual prompt content sent to the LLM
if use_structured_tools:
prompt_tokens = sum(self.token_counter(m.get("content", "")) for m in tool_messages)
else:
prompt_tokens = self.token_counter(current_prompt)
total_tokens += prompt_tokens
if llm_response.content:
total_tokens += self.token_counter(llm_response.content)
# If there are no tool_calls — return the response
if not llm_response.has_tool_calls:
content = llm_response.content or ""
if content:
logger.debug("No tool calls, returning content: %s...", content[:50])
return content, total_tokens
# Execute tool_calls with caching
tool_results: list[str] = []
for tc in llm_response.tool_calls:
# Create a cache key from the tool name and its arguments
cache_key = f"{tc.name}:{json_module.dumps(tc.arguments, sort_keys=True)}"
if cache_key in tool_cache:
# Already called with these arguments — use cache
output = tool_cache[cache_key]
logger.debug("Tool cache hit: %s(%s) -> %s...", tc.name, tc.arguments, output[:50])
else:
# New call — execute and cache
logger.debug("Executing tool: %s(%s)", tc.name, tc.arguments)
tool_call = ToolCall(name=tc.name, arguments=tc.arguments)
result = registry.execute(tool_call)
output = result.output if result.success else f"Error: {result.error}"
tool_cache[cache_key] = output
logger.debug("Tool result: %s...", output[:100])
tool_results.append(f"[{tc.name}]: {output}")
# Add results to the conversation for the next iteration.
# Structured path: append assistant + tool messages to the
# conversation so the LLM sees the full multi-turn history.
tool_results_text = "\n".join(tool_results)
if isinstance(prompt, StructuredPrompt):
# Append the assistant's tool-call reply
if llm_response.content:
tool_messages.append({"role": "assistant", "content": llm_response.content})
# Append tool results as a user message (compatible with
# all providers; some accept role="tool" but "user" is universal)
tool_messages.append({"role": "user", "content": f"Tool results:\n{tool_results_text}"})
# Flat text fallback (used by legacy callers)
current_prompt = f"{prompt_text}\n\nTool results:\n{tool_results_text}"
# Reached max_tool_iterations; return the last content (may be empty)
final_content = (llm_response.content or "") if llm_response is not None else ""
return final_content, total_tokens
def _build_system_prompt_parts(self, agent: Any) -> list[str]:
"""Build system prompt parts: persona, description, tools, output_schema."""
import json as _json
parts: list[str] = []
if hasattr(agent, "persona") and agent.persona:
parts.append(f"You are {agent.persona}.")
elif hasattr(agent, "role") and agent.role:
parts.append(f"You are a {agent.role}.")
if hasattr(agent, "description") and agent.description:
parts.append(agent.description.strip())
# ── tools → brief mention so the agent is aware of its capabilities
tool_names: list[str] = []
if hasattr(agent, "get_tool_names"):
tool_names = agent.get_tool_names()
elif hasattr(agent, "tools") and agent.tools:
tool_names = [t if isinstance(t, str) else getattr(t, "name", str(t)) for t in agent.tools]
if tool_names:
parts.append(f"Available tools: {', '.join(tool_names)}.")
# ── output_schema → compact format instruction ───────────────
output_schema_json = self._extract_schema_json(agent, "output_schema")
if output_schema_json:
schema_text = _json.dumps(output_schema_json, ensure_ascii=False, separators=(",", ":"))
parts.append(f"Respond with JSON matching: {schema_text}")
return parts
def _build_user_prompt_parts(
self,
agent: Any,
query: str,
incoming_messages: dict[str, str],
agent_names: dict[str, str],
memory_context: list[dict[str, Any]] | None,
*,
include_query: bool,
) -> list[str]:
"""Build user prompt parts: query, input_schema, memory, incoming messages."""
import json as _json
user_parts: list[str] = []
# Task query is added only if include_query=True
if include_query and query:
user_parts.append(f"Task: {query}")
# ── input_schema hint (compact) ──────────────────────────────
input_schema_json = self._extract_schema_json(agent, "input_schema")
if input_schema_json:
schema_text = _json.dumps(input_schema_json, ensure_ascii=False, separators=(",", ":"))
user_parts.append(f"\nInput format: {schema_text}")
# Include memory context from SharedMemoryPool
if memory_context:
user_parts.append("\nPrevious context:")
for msg in memory_context:
role = msg.get("role", "unknown")
content = msg.get("content", "")
user_parts.append(f"[{role}]: {content}")
if incoming_messages:
user_parts.append("\nMessages from other agents:")
for sender_id, message in incoming_messages.items():
sender_name = agent_names.get(sender_id, sender_id)
user_parts.append(f"\n[{sender_name}]:\n{message}")
user_parts.append("\nProvide your response:")
return user_parts
def _build_state_text_parts(self, agent: Any) -> list[str]:
"""Build state text parts for flat string representation."""
agent_state: list[dict[str, Any]] = []
if hasattr(agent, "state"):
agent_state = list(agent.state) if agent.state else []
state_text_parts: list[str] = []
if agent_state:
state_text_parts.append("\nConversation history:")
for entry in agent_state:
entry_role = entry.get("role", "unknown")
entry_content = entry.get("content", "")
if entry_content:
state_text_parts.append(f"[{entry_role}]: {entry_content}")
return state_text_parts
def _build_structured_messages(
self,
system_prompt: str,
agent_state: list[dict[str, Any]],
user_content: str,
flat_user: str,
use_structured_state: bool,
) -> list[dict[str, str]]:
"""Build structured messages list for modern chat LLMs."""
messages: list[dict[str, str]] = [
{"role": "system", "content": system_prompt},
]
if use_structured_state:
# Replay agent.state as proper assistant/user turns so the
# LLM sees real conversation history with correct roles.
for entry in agent_state:
entry_role = entry.get("role", "user")
entry_content = entry.get("content", "")
if entry_content:
# Map roles: "agent"/"assistant" → "assistant", everything else → "user"
msg_role = "assistant" if entry_role in ("agent", "assistant") else "user"
messages.append({"role": msg_role, "content": entry_content})
# user_content does NOT contain state (it's in separate messages above)
messages.append({"role": "user", "content": user_content})
else:
# Legacy path or no state: state is inlined in user_content
messages.append({"role": "user", "content": flat_user})
return messages
def _build_prompt(
self,
agent: Any,
query: str,
incoming_messages: dict[str, str],
agent_names: dict[str, str],
memory_context: list[dict[str, Any]] | None = None,
*,
include_query: bool = True,
) -> StructuredPrompt:
"""
Build the agent prompt with persona/description, state, schemas, memory, and messages.
Returns a ``StructuredPrompt`` that carries **both** representations:
* ``prompt.text`` — legacy flat string (backward-compatible)
* ``prompt.messages`` — ``[{"role": "system", ...}, {"role": "user", ...}]``
The system message includes:
- persona / role identity
- description
- output_schema instructions (expected response format)
The user message includes:
- task query
- agent conversation state (previous turns)
- memory context from SharedMemoryPool
- incoming messages from other agents
- input_schema hint (expected input structure)
When a ``structured_llm_caller`` is registered the runner sends
``prompt.messages`` directly to the LLM, giving it proper
system/user role separation. Otherwise ``prompt.text`` is used
with the legacy ``llm_caller(str) -> str`` interface.
Args:
agent: Agent object with description/persona
query: User query string
incoming_messages: Messages from other agents
agent_names: Mapping of agent IDs to names
memory_context: Optional list of memory entries
include_query: Whether to include the task query in the prompt.
Controlled via config.broadcast_task_to_all.
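Example (illustrative; ``runner`` and ``agent`` are assumed to exist):
prompt = runner._build_prompt(agent, "Summarize the findings", {}, {})
prompt.messages  # [{"role": "system", ...}, {"role": "user", ...}]
prompt.text      # the same content flattened into one legacy string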
"""
# Build system prompt
system_parts = self._build_system_prompt_parts(agent)
system_prompt = "\n\n".join(system_parts) if system_parts else "You are a helpful assistant."
# Build user prompt parts
user_parts = self._build_user_prompt_parts(
agent, query, incoming_messages, agent_names, memory_context, include_query=include_query
)
user_content = "".join(user_parts)
# Build state text parts
agent_state: list[dict[str, Any]] = []
if hasattr(agent, "state"):
agent_state = list(agent.state) if agent.state else []
use_structured_state = bool(agent_state) and self.structured_llm_caller is not None
state_text_parts = self._build_state_text_parts(agent)
# Build flat string (legacy)
flat_user = "".join(state_text_parts) + user_content if state_text_parts else user_content
flat = f"{system_prompt}\n\n{flat_user}"
# Build structured messages (modern)
messages = self._build_structured_messages(
system_prompt, agent_state, user_content, flat_user, use_structured_state
)
return StructuredPrompt(text=flat, messages=messages)
@staticmethod
def _extract_schema_json(agent: Any, attr: str) -> dict[str, Any] | None:
"""
Extract a JSON Schema dict from an agent's schema attribute.
Supports:
- ``dict`` — returned as-is (already a JSON Schema)
- Pydantic ``BaseModel`` subclass — converted via ``model_json_schema()``
- ``None`` / missing — returns ``None``
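Example (illustrative; ``agent`` is assumed to exist):
agent.output_schema = {"type": "object", "properties": {"title": {"type": "string"}}}
MACPRunner._extract_schema_json(agent, "output_schema")
# -> the dict above, unchanged; a pydantic model class would instead
#    be converted via model_json_schema().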
"""
schema = getattr(agent, attr, None)
if schema is None:
return None
if isinstance(schema, dict):
return schema
# Pydantic model class
try:
from pydantic import BaseModel
if isinstance(schema, type) and issubclass(schema, BaseModel):
return schema.model_json_schema()
except Exception:  # noqa: BLE001
# Pydantic may be unavailable or the schema may not be a BaseModel;
# this is expected, so fall through to the final ``return None``.
pass
return None
def _execute_step(
self,
step: Any,
messages: dict[str, str],
agent_lookup: dict[str, Any],
agent_names: dict[str, str],
query: str,
) -> StepResult:
"""
Execute a step synchronously with retries and token counting.
Supports multi-model: uses the caller for the specific agent.
"""
agent = agent_lookup.get(step.agent_id)
if agent is None:
return StepResult(
agent_id=step.agent_id,
success=False,
error=f"Agent '{step.agent_id}' not found",
)
# Get caller for this specific agent (multi-model support)
caller = self._get_caller_for_agent(step.agent_id, agent)
if caller is None:
return StepResult(
agent_id=step.agent_id,
success=False,
error=f"No LLM caller available for agent '{step.agent_id}'",
)
incoming = {p: messages[p] for p in step.predecessors if p in messages}
memory_context = self._get_memory_context(step.agent_id)
prompt = self._build_prompt(agent, query, incoming, agent_names, memory_context)
last_error = None
delay = self.config.retry_delay
for attempt in range(self.config.max_retries + 1):
try:
# Execute with tools support
response, tokens = self._run_agent_with_tools(
caller=caller,
prompt=prompt,
agent=agent,
)
quality = 1.0
if self._scheduler and self._scheduler.pruning.quality_scorer:
quality = self._scheduler.pruning.quality_scorer(response)
return StepResult(
agent_id=step.agent_id,
success=True,
response=response,
tokens_used=tokens,
quality_score=quality,
)
except (ExecutionError, ValueError, TypeError, KeyError, RuntimeError, OSError) as e:
last_error = str(e)
if attempt < self.config.max_retries:
time.sleep(delay)
delay *= self.config.retry_backoff
return StepResult(
agent_id=step.agent_id,
success=False,
error=last_error,
)
async def _execute_step_async(
self,
step: Any,
messages: dict[str, str],
agent_lookup: dict[str, Any],
agent_names: dict[str, str],
query: str,
) -> StepResult:
"""
Execute a step asynchronously with retries and timeout.
Supports multi-model: uses the async caller for the specific agent.
"""
agent = agent_lookup.get(step.agent_id)
if agent is None:
return StepResult(
agent_id=step.agent_id,
success=False,
error=f"Agent '{step.agent_id}' not found",
)
# Get async caller for this specific agent (multi-model support).
# _get_async_caller_for_agent may return None when only
# async_structured_llm_caller is configured — _acall_llm handles
# that case internally, so we only error when *neither* is available.
async_caller = self._get_async_caller_for_agent(step.agent_id, agent)
if async_caller is None and self.async_structured_llm_caller is None:
return StepResult(
agent_id=step.agent_id,
success=False,
error=f"No async LLM caller available for agent '{step.agent_id}'",
)
incoming = {p: messages[p] for p in step.predecessors if p in messages}
memory_context = self._get_memory_context(step.agent_id)
prompt = self._build_prompt(agent, query, incoming, agent_names, memory_context)
last_error = None
delay = self.config.retry_delay
for attempt in range(self.config.max_retries + 1):
try:
response = await asyncio.wait_for(
self._acall_llm(async_caller, prompt),
timeout=self.config.timeout,
)
tokens = self.token_counter(prompt.text) + self.token_counter(response)
quality = 1.0
if self._scheduler and self._scheduler.pruning.quality_scorer:
quality = self._scheduler.pruning.quality_scorer(response)
return StepResult(
agent_id=step.agent_id,
success=True,
response=response,
tokens_used=tokens,
quality_score=quality,
)
except TimeoutError:
last_error = f"Timeout after {self.config.timeout}s"
except (ExecutionError, ValueError, TypeError, KeyError, RuntimeError, OSError) as e:
last_error = str(e)
if attempt < self.config.max_retries:
await asyncio.sleep(delay)
delay *= self.config.retry_backoff
return StepResult(
agent_id=step.agent_id,
success=False,
error=last_error,
)
async def _execute_parallel(
self,
steps: list[Any],
messages: dict[str, str],
agent_lookup: dict[str, Any],
agent_names: dict[str, str],
query: str,
) -> list[tuple[Any, StepResult]]:
"""Execute a group of steps in parallel asynchronously."""
tasks = [self._execute_step_async(step, messages, agent_lookup, agent_names, query) for step in steps]
results = await asyncio.gather(*tasks, return_exceptions=True)
output = []
for step, result in zip(steps, results, strict=False):
if isinstance(result, Exception):
sr = StepResult(
agent_id=step.agent_id,
success=False,
error=str(result),
)
else:
sr = result
output.append((step, sr))
return output
def _get_parallel_group(
self,
plan: ExecutionPlan,
completed_agents: Any,
) -> list[Any]:
"""Return a group of steps ready for parallel execution."""
completed = set(completed_agents)
group: list[Any] = []
for step in plan.remaining_steps:
if (
step.agent_id in plan.completed
or step.agent_id in plan.skipped
or step.agent_id in plan.condition_skipped
):
continue
predecessors_done = all(
p in completed or p in plan.skipped or p in plan.condition_skipped for p in step.predecessors
)
if predecessors_done:
group.append(step)
if len(group) >= self.config.max_parallel_size:
break
return group
def _determine_final_agent(
self,
requested: str | None,
exec_order: list[str],
messages: dict[str, str],
) -> str:
"""Select the final agent: the requested one or the last in execution order."""
if requested and requested in messages:
return requested
if exec_order:
return exec_order[-1]
return ""
def _build_agent_states(
self,
messages: dict[str, str],
agent_lookup: dict[str, Any],
) -> dict[str, list[dict[str, Any]]]:
"""Build updated agent states by appending responses to history."""
states: dict[str, list[dict[str, Any]]] = {}
for agent_id, response in messages.items():
agent = agent_lookup.get(agent_id)
if agent is not None:
new_state = list(getattr(agent, "state", []))
new_state.append({"role": "assistant", "content": response})
states[agent_id] = new_state
return states
def _collect_hidden_states(
self,
agent_lookup: dict[str, Any],
) -> dict[str, HiddenState]:
"""Collect the current hidden_state/embedding of agents into a dictionary."""
hidden_states: dict[str, HiddenState] = {}
for agent_id, agent in agent_lookup.items():
hs = HiddenState()
if hasattr(agent, "hidden_state") and agent.hidden_state is not None:
hs.tensor = agent.hidden_state
if hasattr(agent, "embedding") and agent.embedding is not None:
hs.embedding = agent.embedding
if hs.tensor is not None or hs.embedding is not None:
hidden_states[agent_id] = hs
return hidden_states
def _combine_hidden_states(
self,
states: list[HiddenState],
) -> HiddenState | None:
"""Combine a list of hidden states according to the hidden_combine_strategy."""
if not states:
return None
tensors = [s.tensor for s in states if s.tensor is not None]
embeddings = [s.embedding for s in states if s.embedding is not None]
combined = HiddenState()
if tensors:
combined.tensor = self._combine_tensors(tensors)
if embeddings:
combined.embedding = self._combine_tensors(embeddings)
return combined if (combined.tensor is not None or combined.embedding is not None) else None
def _combine_tensors(self, tensors: list[torch.Tensor]) -> torch.Tensor:
"""Combine a list of tensors according to the strategy (mean/sum/concat/attention)."""
if len(tensors) == 1:
return tensors[0]
stacked = torch.stack(tensors)
if self.config.hidden_combine_strategy == "mean":
return stacked.mean(dim=0)
if self.config.hidden_combine_strategy == "sum":
return stacked.sum(dim=0)
if self.config.hidden_combine_strategy == "concat":
return torch.cat(tensors, dim=-1)
if self.config.hidden_combine_strategy == "attention":
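# Note: softmax over a constant vector yields uniform weights, so this
# "attention" strategy currently averages exactly like "mean"; a learned
# or similarity-based scorer would be needed for true attention weighting.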
weights = torch.softmax(torch.ones(len(tensors)), dim=0)
return (stacked * weights.view(-1, *([1] * (stacked.dim() - 1)))).sum(dim=0)
return stacked.mean(dim=0)
def _get_incoming_hidden(
self,
_agent_id: str,
incoming_ids: list[str],
hidden_states: dict[str, HiddenState],
) -> HiddenState | None:
"""Get the combined hidden state of predecessors."""
if not self.config.enable_hidden_channels:
return None
incoming_states = [hidden_states[aid] for aid in incoming_ids if aid in hidden_states]
return self._combine_hidden_states(incoming_states)
def _update_agent_hidden_state(
self,
agent: Any,
response: str,
incoming_hidden: HiddenState | None,
hidden_encoder: Any | None = None,
) -> HiddenState:
"""Update the agent's hidden_state based on the response and incoming hidden state."""
new_hidden = HiddenState()
if hasattr(agent, "embedding") and agent.embedding is not None:
new_hidden.embedding = agent.embedding
if hidden_encoder is not None:
try:
encoded = hidden_encoder.encode([response])
if isinstance(encoded, torch.Tensor) and encoded.numel() > 0:
new_hidden.tensor = encoded[0]
except (ValueError, TypeError, RuntimeError):
pass # Ignore encoding errors
if new_hidden.tensor is None and incoming_hidden is not None:
new_hidden.tensor = incoming_hidden.tensor
new_hidden.metadata = {
"last_response_length": len(response),
"has_incoming": incoming_hidden is not None,
}
return new_hidden
def run_round_with_hidden( # noqa: PLR0912, PLR0915
self,
role_graph: Any,
final_agent_id: str | None = None,
hidden_encoder: Any | None = None,
) -> MACPResult:
"""
Synchronous round with hidden state transfer between agents.
Supports multi-model: each agent uses its own LLM caller.
Supports conditional edge evaluation.
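Example (illustrative; ``runner``, ``graph`` and ``encoder`` are assumed
to exist; ``encoder`` only needs an ``encode(list[str]) -> torch.Tensor``
method, e.g. a sentence-embedding model):
result = runner.run_round_with_hidden(graph, hidden_encoder=encoder)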
"""
if not self._has_any_caller():
msg = "llm_caller, llm_callers, or llm_factory is required"
raise ValueError(msg)
original_hidden_setting = self.config.enable_hidden_channels
self.config.enable_hidden_channels = True
start_time = time.time()
try:
base = self._prepare_base_context(role_graph)
except Exception:
self.config.enable_hidden_channels = original_hidden_setting
raise
if base is None:
self.config.enable_hidden_channels = original_hidden_setting
return MACPResult(messages={}, final_answer="", final_agent_id="", execution_order=[])
task_idx, a_agents, agent_ids, query, agent_lookup, agent_names = base
# Initialize memory
self._init_memory(agent_ids)
hidden_states = self._collect_hidden_states(agent_lookup)
messages: dict[str, str] = {}
step_results: dict[str, StepResult] = {}
execution_order: list[str] = []
fallback_attempts: dict[str, int] = {}
topology_changed_count = 0
fallback_count = 0
pruned_agents: list[str] = []
errors: list[ExecutionError] = []
total_tokens = 0
# Adaptive mode: plan + conditional edge evaluation after each step
if self.config.adaptive and self._scheduler is not None:
p_matrix = self._extract_p_matrix(role_graph, task_idx)
edge_conditions = self._get_edge_conditions(role_graph)
condition_ctx = ConditionContext(
source_agent="",
target_agent="",
messages={},
step_results={},
query=query,
)
plan = self._scheduler.build_plan(
a_agents,
agent_ids,
p_matrix,
start_agent=None,
end_agent=final_agent_id,
edge_conditions=edge_conditions,
condition_context=condition_ctx,
filter_unreachable=True,
)
while not plan.is_complete:
step = plan.get_current_step()
if step is None:
break
if step.agent_id in plan.condition_skipped:
plan.advance()
continue
should_prune, reason = self._scheduler.should_prune(
step, plan, step_results.get(execution_order[-1]) if execution_order else None
)
if should_prune:
plan.mark_skipped(step.agent_id)
pruned_agents.append(step.agent_id)
errors.append(
ExecutionError(
message=f"Pruned: {reason}",
agent_id=step.agent_id,
recoverable=False,
)
)
continue
agent_id = step.agent_id
agent = agent_lookup.get(agent_id)
if agent is None:
plan.advance()
continue
incoming_ids = get_incoming_agents(agent_id, a_agents, agent_ids)
incoming_messages = {aid: messages[aid] for aid in incoming_ids if aid in messages}
incoming_hidden = self._get_incoming_hidden(agent_id, incoming_ids, hidden_states)
memory_context = self._get_memory_context(agent_id)
prompt = self._build_prompt(agent, query, incoming_messages, agent_names, memory_context)
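# Surface incoming hidden-state metadata as a context hint, appended to both the
# flat prompt text and the last user message so structured and plain callers see it.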
if incoming_hidden and incoming_hidden.metadata:
context_hint = self._format_hidden_context(incoming_hidden)
if context_hint:
suffix = f"\n\n[Context: {context_hint}]"
prompt = StructuredPrompt(
text=prompt.text + suffix,
messages=[
*prompt.messages[:-1],
{"role": "user", "content": prompt.messages[-1]["content"] + suffix},
],
)
try:
caller = self._get_caller_for_agent(agent_id, agent)
if caller is None:
error_msg = f"No LLM caller available for agent {agent_id}"
messages[agent_id] = f"[Error: {error_msg}]"
result = StepResult(agent_id=agent_id, success=False, error=error_msg)
step_results[agent_id] = result
execution_order.append(agent_id)
plan.mark_failed(agent_id)
errors.append(
ExecutionError(
message=error_msg,
agent_id=agent_id,
recoverable=True,
)
)
# Fallback when caller is missing
attempts = fallback_attempts.get(agent_id, 0)
if self._scheduler.should_use_fallback(step, result, attempts):
for fb_agent in step.fallback_agents:
if fb_agent not in plan.completed and fb_agent not in plan.failed:
plan.insert_fallback(fb_agent, plan.current_index - 1)
fallback_count += 1
break
fallback_attempts[agent_id] = attempts + 1
continue
response, tokens = self._run_agent_with_tools(
caller=caller,
prompt=prompt,
agent=agent,
)
messages[agent_id] = response
total_tokens += tokens
execution_order.append(agent_id)
self._save_to_memory(agent_id, response, incoming_ids)
hidden_states[agent_id] = self._update_agent_hidden_state(
agent, response, incoming_hidden, hidden_encoder
)
result = StepResult(
agent_id=agent_id,
success=True,
response=response,
tokens_used=tokens,
)
step_results[agent_id] = result
plan.mark_completed(agent_id, tokens)
except (ExecutionError, ValueError, TypeError, KeyError, RuntimeError, OSError) as e:
messages[agent_id] = f"[Error: {e}]"
result = StepResult(agent_id=agent_id, success=False, error=str(e))
step_results[agent_id] = result
execution_order.append(agent_id)
plan.mark_failed(agent_id)
errors.append(
ExecutionError(
message=str(e),
agent_id=agent_id,
recoverable=True,
)
)
# Fallback on execution error
attempts = fallback_attempts.get(agent_id, 0)
if self._scheduler.should_use_fallback(step, result, attempts):
for fb_agent in step.fallback_agents:
if fb_agent not in plan.completed and fb_agent not in plan.failed:
plan.insert_fallback(fb_agent, plan.current_index - 1)
fallback_count += 1
break
fallback_attempts[agent_id] = attempts + 1
# Topology pipeline: conditional edges + user hooks → plan
if self._run_topology_pipeline(
plan,
agent_id,
a_agents,
agent_ids,
step_results,
messages,
query,
execution_order,
total_tokens,
role_graph,
):
topology_changed_count += 1
else:
# Non-adaptive mode: execute agents sequentially in topological order
exec_order = build_execution_order(a_agents, agent_ids, role_graph.role_sequence)
for agent_id in exec_order:
agent = agent_lookup.get(agent_id)
if agent is None:
continue
incoming_ids = get_incoming_agents(agent_id, a_agents, agent_ids)
incoming_messages = {aid: messages[aid] for aid in incoming_ids if aid in messages}
incoming_hidden = self._get_incoming_hidden(agent_id, incoming_ids, hidden_states)
memory_context = self._get_memory_context(agent_id)
prompt = self._build_prompt(agent, query, incoming_messages, agent_names, memory_context)
if incoming_hidden and incoming_hidden.metadata:
context_hint = self._format_hidden_context(incoming_hidden)
if context_hint:
suffix = f"\n\n[Context: {context_hint}]"
prompt = StructuredPrompt(
text=prompt.text + suffix,
messages=[
*prompt.messages[:-1],
{"role": "user", "content": prompt.messages[-1]["content"] + suffix},
],
)
try:
caller = self._get_caller_for_agent(agent_id, agent)
if caller is None:
error_msg = f"No LLM caller available for agent {agent_id}"
messages[agent_id] = f"[Error: {error_msg}]"
result = StepResult(agent_id=agent_id, success=False, error=error_msg)
step_results[agent_id] = result
execution_order.append(agent_id)
errors.append(
ExecutionError(
message=error_msg,
agent_id=agent_id,
recoverable=True,
)
)
continue
response, tokens = self._run_agent_with_tools(
caller=caller,
prompt=prompt,
agent=agent,
)
messages[agent_id] = response
total_tokens += tokens
execution_order.append(agent_id)
self._save_to_memory(agent_id, response, incoming_ids)
hidden_states[agent_id] = self._update_agent_hidden_state(
agent, response, incoming_hidden, hidden_encoder
)
result = StepResult(
agent_id=agent_id,
success=True,
response=response,
tokens_used=tokens,
)
step_results[agent_id] = result
except (ExecutionError, ValueError, TypeError, KeyError, RuntimeError, OSError) as e:
messages[agent_id] = f"[Error: {e}]"
result = StepResult(agent_id=agent_id, success=False, error=str(e))
step_results[agent_id] = result
execution_order.append(agent_id)
errors.append(
ExecutionError(
message=str(e),
agent_id=agent_id,
recoverable=True,
)
)
final_id = self._determine_final_agent(final_agent_id, execution_order, messages)
agent_states = self._build_agent_states(messages, agent_lookup)
result = MACPResult(
messages=messages,
final_answer=messages.get(final_id, ""),
final_agent_id=final_id,
execution_order=execution_order,
topology_changed_count=topology_changed_count,
fallback_count=fallback_count,
agent_states=agent_states,
step_results=step_results,
total_tokens=total_tokens,
total_time=time.time() - start_time,
pruned_agents=pruned_agents,
errors=errors if errors else None,
hidden_states=hidden_states,
)
self.config.enable_hidden_channels = original_hidden_setting
return result
def _format_hidden_context(self, hidden: HiddenState) -> str:
"""Format hidden state metadata for inclusion in the prompt."""
parts = []
if hidden.metadata and "last_response_length" in hidden.metadata:
parts.append(f"previous response length: {hidden.metadata['last_response_length']}")
return ", ".join(parts) if parts else ""
# =========================================================================
# STREAMING EXECUTION METHODS
# =========================================================================
def stream(
self,
role_graph: Any,
final_agent_id: str | None = None,
*,
update_states: bool | None = None,
) -> Iterator[AnyStreamEvent]:
"""
Stream execution events for real-time output.
Yields events as agents execute, allowing live monitoring and display
of intermediate results.
Args:
role_graph: The RoleGraph to execute
final_agent_id: Override which agent produces final answer
update_states: Whether to update agent states after execution
Yields:
StreamEvent instances for each execution phase
Example:
for event in runner.stream(graph):
if event.event_type == StreamEventType.AGENT_OUTPUT:
print(f"{event.agent_id}: {event.content}")
elif event.event_type == StreamEventType.TOKEN:
print(event.token, end="", flush=True)
"""
if not self._has_any_caller() and self.streaming_llm_caller is None:
msg = "llm_caller, llm_callers, llm_factory, or streaming_llm_caller required for streaming"
raise ValueError(msg)
if self.config.adaptive:
yield from self._stream_adaptive(role_graph, final_agent_id, update_states=update_states)
else:
yield from self._stream_simple(role_graph, final_agent_id, update_states=update_states)
async def astream(
self,
role_graph: Any,
final_agent_id: str | None = None,
*,
update_states: bool | None = None,
) -> AsyncIterator[AnyStreamEvent]:
"""
Async streaming execution for real-time output.
Asynchronous counterpart of ``stream()`` for use in async contexts.
Args:
role_graph: The RoleGraph to execute
final_agent_id: Override which agent produces final answer
update_states: Whether to update agent states after execution
Yields:
StreamEvent instances for each execution phase
Example:
async for event in runner.astream(graph):
match event.event_type:
case StreamEventType.AGENT_START:
print(f"Agent {event.agent_id} started")
case StreamEventType.AGENT_OUTPUT:
print(f"Output: {event.content}")
"""
if not self._has_any_async_caller() and self.async_streaming_llm_caller is None:
msg = "async_llm_caller, async_llm_callers, llm_factory, or async_streaming_llm_caller required"
raise ValueError(msg)
if self.config.adaptive:
async for event in self._astream_adaptive(role_graph, final_agent_id, update_states=update_states):
yield event
else:
async for event in self._astream_simple(role_graph, final_agent_id, update_states=update_states):
yield event
def _stream_simple( # noqa: PLR0915
self,
role_graph: Any,
final_agent_id: str | None,
*,
update_states: bool | None,
) -> Iterator[StreamEvent]:
"""Simple sequential streaming execution."""
run_id = str(uuid.uuid4())[:8]
start_time = time.time()
base = self._prepare_base_context(role_graph)
if base is None:
yield RunEndEvent(
run_id=run_id, success=True, final_answer="", final_agent_id="", total_time=time.time() - start_time
)
return
task_idx, a_agents, agent_ids, query, agent_lookup, agent_names = base
del task_idx # not used directly in simple streaming
exec_order = build_execution_order(a_agents, agent_ids, role_graph.role_sequence)
self._init_memory(agent_ids)
task_connected = self._get_task_connected_agents(role_graph)
# Emit run start
yield RunStartEvent(
run_id=run_id,
query=query,
num_agents=len(exec_order),
execution_order=exec_order,
config_summary={
"adaptive": False,
"timeout": self.config.timeout,
"enable_memory": self.config.enable_memory,
"broadcast_task_to_all": self.config.broadcast_task_to_all,
},
)
messages: dict[str, str] = {}
total_tokens = 0
errors: list[str] = []
for step_idx, agent_id in enumerate(exec_order):
agent = agent_lookup.get(agent_id)
if agent is None:
continue
agent_name = agent_names.get(agent_id, agent_id)
incoming_ids = get_incoming_agents(agent_id, a_agents, agent_ids)
incoming_messages = {aid: messages[aid] for aid in incoming_ids if aid in messages}
include_query = self._should_include_query(agent_id, task_connected)
memory_context = self._get_memory_context(agent_id)
prompt = self._build_prompt(
agent, query, incoming_messages, agent_names, memory_context, include_query=include_query
)
# prompt is now a StructuredPrompt — use .text for preview/streaming
prompt_text = prompt.text
# Emit agent start
yield AgentStartEvent(
run_id=run_id,
agent_id=agent_id,
agent_name=agent_name,
step_index=step_idx,
predecessors=incoming_ids,
prompt_preview=prompt_text[: self.config.prompt_preview_length],
)
step_start = time.time()
try:
# Get caller for this specific agent (multi-model support)
caller = self._get_caller_for_agent(agent_id, agent)
if caller is None:
error_msg = f"No LLM caller available for agent {agent_id}"
errors.append(f"{agent_id}: {error_msg}")
messages[agent_id] = f"[Error: {error_msg}]"
yield AgentErrorEvent(
run_id=run_id,
agent_id=agent_id,
error_type="ValueError",
error_message=error_msg,
will_retry=False,
)
continue
# Use streaming LLM if available and enabled
if self.streaming_llm_caller and self.config.enable_token_streaming:
response_parts: list[str] = []
token_idx = 0
for token in self.streaming_llm_caller(prompt_text):
response_parts.append(token)
yield TokenEvent(
run_id=run_id,
agent_id=agent_id,
token=token,
token_index=token_idx,
is_first=(token_idx == 0),
is_last=False,
)
token_idx += 1
# Mark last token
if response_parts:
yield TokenEvent(
run_id=run_id,
agent_id=agent_id,
token="",
token_index=token_idx,
is_first=False,
is_last=True,
)
response = "".join(response_parts)
tokens = self.token_counter(prompt_text) + self.token_counter(response)
else:
# Use regular LLM caller for this agent (with tools support)
# prompt (StructuredPrompt) is passed through — _run_agent_with_tools
# dispatches via _call_llm when structured_llm_caller is available
response, tokens = self._run_agent_with_tools(
caller=caller,
prompt=prompt,
agent=agent,
)
messages[agent_id] = response
total_tokens += tokens
self._save_to_memory(agent_id, response, incoming_ids)
is_final = (step_idx == len(exec_order) - 1) or (agent_id == final_agent_id)
yield AgentOutputEvent(
run_id=run_id,
agent_id=agent_id,
agent_name=agent_name,
content=response,
tokens_used=tokens,
duration_ms=(time.time() - step_start) * 1000,
is_final=is_final,
)
except (TimeoutError, ExecutionError, ValueError, TypeError, KeyError, RuntimeError, OSError) as e:
error_msg = str(e)
errors.append(f"{agent_id}: {error_msg}")
messages[agent_id] = f"[Error: {e}]"
yield AgentErrorEvent(
run_id=run_id,
agent_id=agent_id,
error_type=type(e).__name__,
error_message=error_msg,
will_retry=False,
)
final_id = self._determine_final_agent(final_agent_id, exec_order, messages)
do_update = update_states if update_states is not None else self.config.update_states
agent_states = self._build_agent_states(messages, agent_lookup) if do_update else None
yield RunEndEvent(
run_id=run_id,
success=len(errors) == 0,
final_answer=messages.get(final_id, ""),
final_agent_id=final_id,
total_tokens=total_tokens,
total_time=time.time() - start_time,
executed_agents=list(messages.keys()),
errors=errors,
agent_states=agent_states,
)
async def _astream_simple( # noqa: PLR0912, PLR0915
self,
role_graph: Any,
final_agent_id: str | None,
*,
update_states: bool | None,
) -> AsyncIterator[StreamEvent]:
"""
Async streaming execution with optional parallel support.
When ``config.enable_parallel`` is ``True``, independent agents
(those whose predecessors have all completed) are executed
concurrently via ``asyncio.gather``. This is determined by
:func:`get_parallel_groups` which partitions the topological
order into dependency-based levels.
When ``enable_parallel`` is ``False`` (or the graph is purely
sequential), agents are executed one-by-one — identical to the
synchronous ``_stream_simple`` but using async I/O.
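Illustrative grouping: for a diamond graph A -> {B, C} -> D,
``get_parallel_groups`` yields ``[["A"], ["B", "C"], ["D"]]``,
so B and C execute concurrently in the middle level.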
"""
run_id = str(uuid.uuid4())[:8]
start_time = time.time()
base = self._prepare_base_context(role_graph)
if base is None:
yield RunEndEvent(
run_id=run_id, success=True, final_answer="", final_agent_id="", total_time=time.time() - start_time
)
return
_task_idx, a_agents, agent_ids, query, agent_lookup, agent_names = base
exec_order = build_execution_order(a_agents, agent_ids, role_graph.role_sequence)
self._init_memory(agent_ids)
# When parallel execution is enabled, partition agents into
# dependency-based groups so independent agents run concurrently.
if self.config.enable_parallel:
groups = get_parallel_groups(a_agents, agent_ids)
else:
# Each agent is its own group — strictly sequential.
groups = [[aid] for aid in exec_order]
yield RunStartEvent(
run_id=run_id,
query=query,
num_agents=len(exec_order),
execution_order=exec_order,
config_summary={
"parallel": self.config.enable_parallel,
},
)
messages: dict[str, str] = {}
total_tokens = 0
errors: list[str] = []
step_idx = 0
for group_idx, group in enumerate(groups):
# Filter out unknown agents
group_agents = [aid for aid in group if agent_lookup.get(aid) is not None]
if not group_agents:
continue
# Emit ParallelStartEvent when the group has >1 agent
is_parallel_group = self.config.enable_parallel and len(group_agents) > 1
if is_parallel_group:
yield ParallelStartEvent(
run_id=run_id,
agent_ids=group_agents,
group_index=group_idx,
)
# Emit AgentStartEvent for every agent in the group
agent_prompts: dict[str, StructuredPrompt] = {}
for agent_id in group_agents:
agent = agent_lookup[agent_id]
agent_name = agent_names.get(agent_id, agent_id)
incoming_ids = get_incoming_agents(agent_id, a_agents, agent_ids)
incoming_messages = {aid: messages[aid] for aid in incoming_ids if aid in messages}
memory_context = self._get_memory_context(agent_id)
prompt = self._build_prompt(agent, query, incoming_messages, agent_names, memory_context)
agent_prompts[agent_id] = prompt
yield AgentStartEvent(
run_id=run_id,
agent_id=agent_id,
agent_name=agent_name,
step_index=step_idx,
predecessors=incoming_ids,
prompt_preview=prompt.text[: self.config.prompt_preview_length],
)
step_idx += 1
# ── Execute the group ─────────────────────────────────────
if is_parallel_group:
# Run all agents in the group concurrently
async def _call_agent(aid: str, prompt: StructuredPrompt) -> tuple[str, str | None, str | None]:
"""Returns (agent_id, response, error)."""
try:
resp = await asyncio.wait_for(
self._acall_llm(self.async_llm_caller, prompt),
timeout=self.config.timeout,
)
except (
TimeoutError,
ExecutionError,
ValueError,
TypeError,
KeyError,
RuntimeError,
OSError,
) as exc:
return (aid, None, str(exc))
else:
return (aid, resp, None)
results = await asyncio.gather(*[_call_agent(aid, agent_prompts[aid]) for aid in group_agents])
successful: list[str] = []
failed: list[str] = []
for agent_id, response, error in results:
agent_name = agent_names.get(agent_id, agent_id)
if error is None and response is not None:
messages[agent_id] = response
prompt = agent_prompts[agent_id]
tokens = self.token_counter(prompt.text) + self.token_counter(response)
total_tokens += tokens
incoming_ids = get_incoming_agents(agent_id, a_agents, agent_ids)
self._save_to_memory(agent_id, response, incoming_ids)
successful.append(agent_id)
yield AgentOutputEvent(
run_id=run_id,
agent_id=agent_id,
agent_name=agent_name,
content=response,
tokens_used=tokens,
is_final=(agent_id == final_agent_id),
)
else:
errors.append(f"{agent_id}: {error}")
messages[agent_id] = f"[Error: {error}]"
failed.append(agent_id)
yield AgentErrorEvent(
run_id=run_id,
agent_id=agent_id,
error_type="ExecutionError",
error_message=error or "Unknown error",
)
yield ParallelEndEvent(
run_id=run_id,
agent_ids=group_agents,
group_index=group_idx,
successful=successful,
failed=failed,
)
else:
# Sequential execution — one agent at a time
for agent_id in group_agents:
agent_name = agent_names.get(agent_id, agent_id)
prompt = agent_prompts[agent_id]
step_start = time.time()
try:
# Use async streaming LLM if available
if self.async_streaming_llm_caller and self.config.enable_token_streaming:
response_parts: list[str] = []
token_idx = 0
async for token in self.async_streaming_llm_caller(prompt.text):
response_parts.append(token)
yield TokenEvent(
run_id=run_id,
agent_id=agent_id,
token=token,
token_index=token_idx,
is_first=(token_idx == 0),
is_last=False,
)
token_idx += 1
if response_parts:
yield TokenEvent(
run_id=run_id,
agent_id=agent_id,
token="",
token_index=token_idx,
is_first=False,
is_last=True,
)
response = "".join(response_parts)
else:
if self.async_llm_caller is None and self.async_structured_llm_caller is None:
error_msg = f"No async LLM caller available for agent {agent_id}"
errors.append(f"{agent_id}: {error_msg}")
messages[agent_id] = f"[Error: {error_msg}]"
yield AgentErrorEvent(
run_id=run_id,
agent_id=agent_id,
error_type="ValueError",
error_message=error_msg,
)
continue
response = await asyncio.wait_for(
self._acall_llm(self.async_llm_caller, prompt),
timeout=self.config.timeout,
)
messages[agent_id] = response
tokens = self.token_counter(prompt.text) + self.token_counter(response)
total_tokens += tokens
incoming_ids = get_incoming_agents(agent_id, a_agents, agent_ids)
self._save_to_memory(agent_id, response, incoming_ids)
is_final = (agent_id == final_agent_id) or (
group_idx == len(groups) - 1 and agent_id == group_agents[-1]
)
yield AgentOutputEvent(
run_id=run_id,
agent_id=agent_id,
agent_name=agent_name,
content=response,
tokens_used=tokens,
duration_ms=(time.time() - step_start) * 1000,
is_final=is_final,
)
except (TimeoutError, ExecutionError, ValueError, TypeError, KeyError, RuntimeError, OSError) as e:
error_msg = str(e)
errors.append(f"{agent_id}: {error_msg}")
messages[agent_id] = f"[Error: {e}]"
yield AgentErrorEvent(
run_id=run_id,
agent_id=agent_id,
error_type=type(e).__name__,
error_message=error_msg,
)
final_id = self._determine_final_agent(final_agent_id, exec_order, messages)
do_update = update_states if update_states is not None else self.config.update_states
agent_states = self._build_agent_states(messages, agent_lookup) if do_update else None
yield RunEndEvent(
run_id=run_id,
success=len(errors) == 0,
final_answer=messages.get(final_id, ""),
final_agent_id=final_id,
total_tokens=total_tokens,
total_time=time.time() - start_time,
executed_agents=list(messages.keys()),
errors=errors,
agent_states=agent_states,
)
def _stream_adaptive( # noqa: PLR0912, PLR0915
self,
role_graph: Any,
final_agent_id: str | None,
*,
update_states: bool | None,
) -> Iterator[StreamEvent]:
"""Adaptive streaming execution with conditional edges, pruning, and fallback."""
if self._scheduler is None:
msg = "Scheduler not initialized for adaptive mode"
raise ValueError(msg)
run_id = str(uuid.uuid4())[:8]
start_time = time.time()
base = self._prepare_base_context(role_graph)
if base is None:
yield RunEndEvent(run_id=run_id, success=True, total_time=0)
return
task_idx, a_agents, agent_ids, query, agent_lookup, agent_names = base
self._init_memory(agent_ids)
p_matrix = self._extract_p_matrix(role_graph, task_idx)
edge_conditions = self._get_edge_conditions(role_graph)
condition_ctx = ConditionContext(
source_agent="",
target_agent="",
messages={},
step_results={},
query=query,
)
plan = self._scheduler.build_plan(
a_agents,
agent_ids,
p_matrix,
end_agent=final_agent_id,
edge_conditions=edge_conditions,
condition_context=condition_ctx,
)
yield RunStartEvent(
run_id=run_id,
query=query,
num_agents=len(agent_ids),
execution_order=plan.execution_order,
config_summary={"adaptive": True, "policy": self.config.routing_policy.value},
)
messages: dict[str, str] = {}
step_results: dict[str, StepResult] = {}
execution_order: list[str] = []
fallback_attempts: dict[str, int] = {}
topology_changed_count = 0
errors: list[str] = []
total_tokens = 0
step_idx = 0
while not plan.is_complete:
step = plan.get_current_step()
if step is None:
break
# Skip agents whose conditions were not met
if step.agent_id in plan.condition_skipped:
plan.advance()
continue
# Check for pruning
should_prune, reason = self._scheduler.should_prune(
step, plan, step_results.get(execution_order[-1]) if execution_order else None
)
if should_prune:
plan.mark_skipped(step.agent_id)
yield PruneEvent(
run_id=run_id,
agent_id=step.agent_id,
reason=reason,
)
continue
agent = agent_lookup.get(step.agent_id)
if agent is None:
plan.advance()
continue
agent_name = agent_names.get(step.agent_id, step.agent_id)
incoming = {p: messages[p] for p in step.predecessors if p in messages}
memory_context = self._get_memory_context(step.agent_id)
prompt = self._build_prompt(agent, query, incoming, agent_names, memory_context)
yield AgentStartEvent(
run_id=run_id,
agent_id=step.agent_id,
agent_name=agent_name,
step_index=step_idx,
predecessors=step.predecessors,
prompt_preview=prompt.text[: self.config.prompt_preview_length],
)
step_start = time.time()
result = self._execute_step(step, messages, agent_lookup, agent_names, query)
step_results[step.agent_id] = result
execution_order.append(step.agent_id)
if result.success:
messages[step.agent_id] = result.response or ""
plan.mark_completed(step.agent_id, result.tokens_used)
total_tokens += result.tokens_used
self._save_to_memory(step.agent_id, result.response or "", step.predecessors)
yield AgentOutputEvent(
run_id=run_id,
agent_id=step.agent_id,
agent_name=agent_name,
content=result.response or "",
tokens_used=result.tokens_used,
duration_ms=(time.time() - step_start) * 1000,
)
else:
plan.mark_failed(step.agent_id)
errors.append(f"{step.agent_id}: {result.error}")
yield AgentErrorEvent(
run_id=run_id,
agent_id=step.agent_id,
error_type="ExecutionError",
error_message=result.error or "Unknown error",
)
# Handle fallback
attempts = fallback_attempts.get(step.agent_id, 0)
if self._scheduler.should_use_fallback(step, result, attempts):
for fb_agent in step.fallback_agents:
if fb_agent not in plan.completed and fb_agent not in plan.failed:
plan.insert_fallback(fb_agent, plan.current_index - 1)
yield FallbackEvent(
run_id=run_id,
failed_agent_id=step.agent_id,
fallback_agent_id=fb_agent,
attempt=attempts + 1,
)
break
fallback_attempts[step.agent_id] = attempts + 1
# Topology pipeline: conditional edges + user hooks → plan
old_remaining = [s.agent_id for s in plan.remaining_steps]
if self._run_topology_pipeline(
plan,
step.agent_id,
a_agents,
agent_ids,
step_results,
messages,
query,
execution_order,
total_tokens,
role_graph,
):
topology_changed_count += 1
new_remaining = [s.agent_id for s in plan.remaining_steps]
if old_remaining != new_remaining:
yield TopologyChangedEvent(
run_id=run_id,
reason="Topology pipeline: conditional edges",
old_remaining=old_remaining,
new_remaining=new_remaining,
change_count=topology_changed_count,
)
step_idx += 1
final_id = self._determine_final_agent(final_agent_id, execution_order, messages)
do_update = update_states if update_states is not None else self.config.update_states
agent_states = self._build_agent_states(messages, agent_lookup) if do_update else None
yield RunEndEvent(
run_id=run_id,
success=len(errors) == 0,
final_answer=messages.get(final_id, ""),
final_agent_id=final_id,
total_tokens=total_tokens,
total_time=time.time() - start_time,
executed_agents=execution_order,
errors=errors,
agent_states=agent_states,
)
async def _astream_adaptive( # noqa: PLR0912, PLR0915
self,
role_graph: Any,
final_agent_id: str | None,
*,
update_states: bool | None,
) -> AsyncIterator[StreamEvent]:
"""Async adaptive streaming with parallel execution support."""
if self._scheduler is None:
msg = "Scheduler not initialized for adaptive mode"
raise ValueError(msg)
run_id = str(uuid.uuid4())[:8]
start_time = time.time()
base = self._prepare_base_context(role_graph)
if base is None:
yield RunEndEvent(run_id=run_id, success=True, total_time=0)
return
task_idx, a_agents, agent_ids, query, agent_lookup, agent_names = base
self._init_memory(agent_ids)
p_matrix = self._extract_p_matrix(role_graph, task_idx)
edge_conditions = self._get_edge_conditions(role_graph)
condition_ctx = ConditionContext(
source_agent="",
target_agent="",
messages={},
step_results={},
query=query,
)
plan = self._scheduler.build_plan(
a_agents,
agent_ids,
p_matrix,
end_agent=final_agent_id,
edge_conditions=edge_conditions,
condition_context=condition_ctx,
)
yield RunStartEvent(
run_id=run_id,
query=query,
num_agents=len(agent_ids),
execution_order=plan.execution_order,
config_summary={
"adaptive": True,
"parallel": self.config.enable_parallel,
"policy": self.config.routing_policy.value,
},
)
messages: dict[str, str] = {}
step_results: dict[str, StepResult] = {}
execution_order: list[str] = []
fallback_attempts: dict[str, int] = {}
topology_changed_count = 0
errors: list[str] = []
total_tokens = 0
group_idx = 0
while not plan.is_complete:
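# Gather the next group of plan steps whose predecessors have already produced output.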
parallel_group = self._get_parallel_group(plan, messages.keys())
if not parallel_group:
break
# Filter condition-skipped and pruned steps
valid_steps = []
for step in parallel_group:
# Skip agents whose conditions were not met
if step.agent_id in plan.condition_skipped:
plan.advance()
continue
should_prune, reason = self._scheduler.should_prune(step, plan, None)
if should_prune:
plan.mark_skipped(step.agent_id)
yield PruneEvent(run_id=run_id, agent_id=step.agent_id, reason=reason)
else:
valid_steps.append(step)
if not valid_steps:
# All steps in the group are skipped or pruned — break to avoid stalling.
break
# Emit parallel start event
if self.config.enable_parallel and len(valid_steps) > 1:
yield ParallelStartEvent(
run_id=run_id,
agent_ids=[s.agent_id for s in valid_steps],
group_index=group_idx,
)
# Emit agent start events
for step in valid_steps:
agent_name = agent_names.get(step.agent_id, step.agent_id)
incoming = {p: messages[p] for p in step.predecessors if p in messages}
agent = agent_lookup.get(step.agent_id)
if agent:
memory_context = self._get_memory_context(step.agent_id)
prompt = self._build_prompt(agent, query, incoming, agent_names, memory_context)
yield AgentStartEvent(
run_id=run_id,
agent_id=step.agent_id,
agent_name=agent_name,
predecessors=step.predecessors,
prompt_preview=prompt.text[: self.config.prompt_preview_length],
)
# Execute steps (parallel or sequential)
if self.config.enable_parallel and len(valid_steps) > 1:
results = await self._execute_parallel(valid_steps, messages, agent_lookup, agent_names, query)
else:
results = []
for step in valid_steps:
r = await self._execute_step_async(step, messages, agent_lookup, agent_names, query)
results.append((step, r))
# Process results and emit events
successful: list[str] = []
failed: list[str] = []
for step, result in results:
step_results[step.agent_id] = result
execution_order.append(step.agent_id)
agent_name = agent_names.get(step.agent_id, step.agent_id)
if result.success:
messages[step.agent_id] = result.response or ""
plan.mark_completed(step.agent_id, result.tokens_used)
total_tokens += result.tokens_used
self._save_to_memory(step.agent_id, result.response or "", step.predecessors)
successful.append(step.agent_id)
yield AgentOutputEvent(
run_id=run_id,
agent_id=step.agent_id,
agent_name=agent_name,
content=result.response or "",
tokens_used=result.tokens_used,
)
else:
plan.mark_failed(step.agent_id)
errors.append(f"{step.agent_id}: {result.error}")
failed.append(step.agent_id)
yield AgentErrorEvent(
run_id=run_id,
agent_id=step.agent_id,
error_message=result.error or "Unknown error",
)
# Handle fallback
attempts = fallback_attempts.get(step.agent_id, 0)
if self._scheduler.should_use_fallback(step, result, attempts):
for fb_agent in step.fallback_agents:
if fb_agent not in plan.completed and fb_agent not in plan.failed:
plan.insert_fallback(fb_agent, plan.current_index - 1)
yield FallbackEvent(
run_id=run_id,
failed_agent_id=step.agent_id,
fallback_agent_id=fb_agent,
attempt=attempts + 1,
)
break
fallback_attempts[step.agent_id] = attempts + 1
# Emit parallel end event
if self.config.enable_parallel and len(valid_steps) > 1:
yield ParallelEndEvent(
run_id=run_id,
agent_ids=[s.agent_id for s in valid_steps],
group_index=group_idx,
successful=successful,
failed=failed,
)
# Topology pipeline for each executed agent in the group
old_remaining = [s.agent_id for s in plan.remaining_steps]
for step, _result in results:
if await self._arun_topology_pipeline(
plan,
step.agent_id,
a_agents,
agent_ids,
step_results,
messages,
query,
execution_order,
total_tokens,
role_graph,
):
topology_changed_count += 1
new_remaining = [s.agent_id for s in plan.remaining_steps]
if old_remaining != new_remaining:
yield TopologyChangedEvent(
run_id=run_id,
reason="Topology pipeline: conditional edges",
old_remaining=old_remaining,
new_remaining=new_remaining,
change_count=topology_changed_count,
)
group_idx += 1
final_id = self._determine_final_agent(final_agent_id, execution_order, messages)
do_update = update_states if update_states is not None else self.config.update_states
agent_states = self._build_agent_states(messages, agent_lookup) if do_update else None
yield RunEndEvent(
run_id=run_id,
success=len(errors) == 0,
final_answer=messages.get(final_id, ""),
final_agent_id=final_id,
total_tokens=total_tokens,
total_time=time.time() - start_time,
executed_agents=execution_order,
errors=errors,
agent_states=agent_states,
)
def stream_to_result(
self,
role_graph: Any,
final_agent_id: str | None = None,
*,
update_states: bool | None = None,
) -> tuple[Iterator[StreamEvent], MACPResult]:
"""
Stream execution and also return the final MACPResult.
Useful when you want both a streaming display and the complete result.
Returns:
Tuple of (event iterator, lazy MACPResult proxy). The proxy builds the
result on first attribute access, so read it only after the iterator
has been fully consumed.
Example:
stream, result = runner.stream_to_result(graph)
for event in stream:
print(event)
print(result.final_answer)  # Valid only after the stream is exhausted
"""
events: list[StreamEvent] = []
messages: dict[str, str] = {}
final_answer = ""
final_agent = ""
execution_order: list[str] = []
total_tokens = 0
total_time = 0.0
errors_list: list[str] = []
def collecting_stream() -> Iterator[StreamEvent]:
nonlocal final_answer, final_agent, total_tokens, total_time, errors_list
for event in self.stream(role_graph, final_agent_id, update_states=update_states):
events.append(event)
if isinstance(event, AgentOutputEvent):
messages[event.agent_id] = event.content
execution_order.append(event.agent_id)
elif isinstance(event, RunEndEvent):
final_answer = event.final_answer
final_agent = event.final_agent_id
total_tokens = event.total_tokens
total_time = event.total_time
errors_list = event.errors
yield event
stream = collecting_stream()
# Create a lazy result that becomes valid after stream is exhausted
class LazyResult:
def __init__(self, runner: MACPRunner):
self._runner = runner
self._result: MACPResult | None = None
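# The MACPResult is built lazily on first attribute access, so it is only
# meaningful after the stream has been fully consumed.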
def __getattr__(self, name: str) -> Any:
if self._result is None:
self._result = MACPResult(
messages=messages,
final_answer=final_answer,
final_agent_id=final_agent,
execution_order=execution_order,
total_tokens=total_tokens,
total_time=total_time,
errors=[ExecutionError(message=e, agent_id="", recoverable=False) for e in errors_list]
if errors_list
else None,
)
return getattr(self._result, name)
# LazyResult provides a proxy to access result attributes lazily
lazy_result: MACPResult = LazyResult(self) # type: ignore[assignment]
return stream, lazy_result