| """ |
| AETHER Agent Orchestration. |
| Integrates: |
| - smolagents multi-agent hierarchy (Manager + Workers) |
| - MLPO: Multi-agent guided Leader Policy Optimization |
| - BabyAGI task creation/prioritization/execution loop |
| - Agentic Neural Networks: textual backpropagation |
| - Yunjue Agent: Manager/Executor/Developer/Integrator/Merger/Aggregator roles |
| """ |
|
|
| import torch |
| import torch.nn as nn |
from typing import Any, Dict, List, Optional
| import logging |
| import time |
| from collections import deque |
|
|
| logger = logging.getLogger("AETHER.Agents") |
|
|
|
|
| class AgentRole: |
| """Role definitions inspired by Yunjue Agent multi-agent system.""" |
| MANAGER = "manager" |
| EXECUTOR = "executor" |
| DEVELOPER = "developer" |
| INTEGRATOR = "integrator" |
| MERGER = "merger" |
| AGGREGATOR = "aggregator" |
| RESEARCHER = "researcher" |
|
|
|
|
| class BaseAgent(nn.Module): |
| """Base agent with policy network. Implements MLPO-style leader policy.""" |
| |
| def __init__(self, role: str, hidden_dim: int = 128, |
| vocab_size: int = 32000): |
| super().__init__() |
| self.role = role |
| self.hidden_dim = hidden_dim |
| |
        # Named submodules instead of nn.Sequential: an LSTM inside Sequential
        # is misleading because calling the Sequential directly would fail on
        # the LSTM's (output, state) tuple.
        self.embedding = nn.Embedding(vocab_size, hidden_dim)
        self.lstm = nn.LSTM(hidden_dim, hidden_dim, batch_first=True)
        self.policy_head = nn.Linear(hidden_dim, hidden_dim)
        self.value_head = nn.Linear(hidden_dim, 1)
| |
| self.task_history: deque = deque(maxlen=100) |
| self.performance_log: List[float] = [] |
| |
    def forward(self, input_ids: torch.Tensor) -> Dict[str, torch.Tensor]:
        embeds = self.embedding(input_ids)
        lstm_out, _ = self.lstm(embeds)
        hidden = lstm_out[:, -1, :]  # final-step hidden state summarizes the sequence
| |
| return { |
| "policy_logits": self.policy_head(hidden), |
| "value": self.value_head(hidden), |
| "hidden": hidden, |
| } |
| |
| def act(self, observation: str) -> str: |
| self.task_history.append({ |
| "observation": observation, |
| "timestamp": time.time(), |
| }) |
| |
| role_actions = { |
| AgentRole.MANAGER: f"[MANAGER] Decomposing task: '{observation[:50]}...'", |
| AgentRole.EXECUTOR: f"[EXECUTOR] Executing: '{observation[:50]}...'", |
| AgentRole.DEVELOPER: f"[DEVELOPER] Synthesizing tool for: '{observation[:50]}...'", |
| AgentRole.INTEGRATOR: f"[INTEGRATOR] Integrating components for: '{observation[:50]}...'", |
| AgentRole.MERGER: f"[MERGER] Consolidating tools for: '{observation[:50]}...'", |
| AgentRole.AGGREGATOR: f"[AGGREGATOR] Aggregating results for: '{observation[:50]}...'", |
| AgentRole.RESEARCHER: f"[RESEARCHER] Exploring knowledge for: '{observation[:50]}...'", |
| } |
| |
| return role_actions.get(self.role, f"[{self.role.upper()}] Processing: '{observation}'") |
| |
| def update(self, reward: float): |
| self.performance_log.append(reward) |
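

# A minimal usage sketch (illustrative only, not part of the AETHER API):
# a BaseAgent maps token ids to policy logits and a scalar value estimate.
def _demo_base_agent() -> None:
    agent = BaseAgent(AgentRole.EXECUTOR, hidden_dim=64, vocab_size=1000)
    input_ids = torch.randint(0, 1000, (2, 16))  # batch of 2, sequence length 16
    outputs = agent(input_ids)
    assert outputs["policy_logits"].shape == (2, 64)
    assert outputs["value"].shape == (2, 1)
    print(agent.act("profile the data-loading pipeline"))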
|
|
|
|
| class HierarchicalAgent(nn.Module): |
| """ |
| HiMAC-style hierarchical agent with Macro-Policy and Micro-Policy. |
| Macro: generates blueprint (sub-goals) |
| Micro: executes atomic actions conditioned on blueprint |
| """ |
| |
    def __init__(self, macro_dim: int = 256, micro_dim: int = 128,
                 num_subgoals: int = 5, num_actions: int = 50):
        super().__init__()
        self.macro_dim = macro_dim
        self.micro_dim = micro_dim
        self.num_subgoals = num_subgoals
        self.num_actions = num_actions

        self.macro_encoder = nn.LSTM(macro_dim, macro_dim, batch_first=True)
        self.macro_decoder = nn.LSTM(macro_dim, macro_dim, batch_first=True)
        self.subgoal_head = nn.Linear(macro_dim, num_subgoals)
        self.termination_token = nn.Parameter(torch.randn(macro_dim))

        self.micro_encoder = nn.LSTM(micro_dim + macro_dim, micro_dim, batch_first=True)
        self.action_head = nn.Linear(micro_dim, num_actions)
| |
| self.current_blueprint: Optional[List[str]] = None |
| self.active_subgoal_idx = 0 |
| |
    def generate_blueprint(self, task_embedding: torch.Tensor) -> List[str]:
        # Greedy decoding below relies on .item(), so blueprint generation
        # assumes a single task embedding per call.
        assert task_embedding.size(0) == 1, "generate_blueprint expects batch size 1"
        hidden = (torch.zeros(1, 1, self.macro_dim, device=task_embedding.device),
                  torch.zeros(1, 1, self.macro_dim, device=task_embedding.device))

        blueprints = []
        input_token = task_embedding.unsqueeze(1)
| for _ in range(self.num_subgoals): |
| out, hidden = self.macro_decoder(input_token, hidden) |
| subgoal_logits = self.subgoal_head(out.squeeze(1)) |
| subgoal_id = torch.argmax(subgoal_logits, dim=-1) |
| |
| similarity = torch.cosine_similarity(out.squeeze(1), |
| self.termination_token.unsqueeze(0)) |
| if similarity.item() > 0.9: |
| break |
| |
| blueprints.append(f"subgoal_{subgoal_id.item()}") |
| input_token = out |
| |
| self.current_blueprint = blueprints |
| self.active_subgoal_idx = 0 |
| return blueprints |
| |
    def execute_action(self, observation: torch.Tensor,
                       blueprint: Optional[List[str]] = None) -> torch.Tensor:
        if blueprint is not None:
            self.current_blueprint = blueprint

        if not self.current_blueprint:
            return torch.zeros(1, self.num_actions, device=observation.device)

        active_subgoal = self.current_blueprint[
            min(self.active_subgoal_idx, len(self.current_blueprint) - 1)
        ]
        logger.debug(f"Micro-policy executing under {active_subgoal}")

        # Placeholder conditioning: a learned sub-goal embedding would replace
        # this random vector.
        subgoal_embed = torch.randn(1, self.macro_dim, device=observation.device)
        combined = torch.cat([observation, subgoal_embed], dim=-1)

        out, _ = self.micro_encoder(combined.unsqueeze(1))
        action_logits = self.action_head(out.squeeze(1))
| |
| return action_logits |
| |
| def advance_subgoal(self): |
| self.active_subgoal_idx += 1 |
| |
| def reset(self): |
| self.current_blueprint = None |
| self.active_subgoal_idx = 0 |
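

# A minimal usage sketch (illustrative only): the macro policy drafts a
# blueprint of sub-goals, then the micro policy emits action logits
# conditioned on the active sub-goal.
def _demo_hierarchical_agent() -> None:
    agent = HierarchicalAgent(macro_dim=32, micro_dim=16, num_subgoals=3)
    blueprint = agent.generate_blueprint(torch.randn(1, 32))  # batch size 1
    action_logits = agent.execute_action(torch.randn(1, 16), blueprint)
    assert action_logits.shape == (1, agent.num_actions)
    agent.advance_subgoal()
    agent.reset()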
|
|
|
|
| class BabyAGILoop: |
| """BabyAGI-inspired task-driven autonomous loop.""" |
| |
| def __init__(self, objective: str, max_iterations: int = 50): |
| self.objective = objective |
| self.max_iterations = max_iterations |
| self.task_list: deque = deque() |
| self.completed_tasks: List[Dict] = [] |
| self.results: Dict[int, Any] = {} |
| self.iteration = 0 |
| |
    def create_tasks(self, previous_result: str, task_description: str) -> List[str]:
        # Stub generator: a production loop would prompt an LLM with the
        # objective, the completed task (`task_description`), and its result.
        new_tasks = [
            f"Sub-task {len(self.task_list) + i}: Analyze {previous_result[:30]}..."
            for i in range(3)
        ]
        return new_tasks
| |
    def prioritize_tasks(self) -> List[str]:
        # Score each task by keyword overlap with the objective; sort on the
        # score alone so ties keep their original order instead of falling
        # back to comparing the task strings themselves.
        tasks = list(self.task_list)
        scores = []
        for task in tasks:
            overlap = sum(1 for word in self.objective.lower().split()
                          if word in task.lower())
            scores.append(overlap)

        sorted_tasks = [t for _, t in sorted(zip(scores, tasks),
                                             key=lambda pair: pair[0],
                                             reverse=True)]
        return sorted_tasks
| |
| def execute_task(self, task: str, agent: BaseAgent) -> str: |
| result = agent.act(task) |
| self.completed_tasks.append({ |
| "task": task, |
| "result": result, |
| "iteration": self.iteration, |
| }) |
| return result |
| |
| def run(self, execution_agent: BaseAgent) -> Dict[str, Any]: |
| self.task_list.append(self.objective) |
| |
| while self.iteration < self.max_iterations and self.task_list: |
| prioritized = self.prioritize_tasks() |
| self.task_list = deque(prioritized) |
| |
            current_task = self.task_list.popleft()
            result = self.execute_task(current_task, execution_agent)
| self.results[self.iteration] = result |
| |
| new_tasks = self.create_tasks(result, current_task) |
| for t in new_tasks: |
| if t not in self.task_list: |
| self.task_list.append(t) |
| |
| self.iteration += 1 |
| |
| logger.info(f"BabyAGI iteration {self.iteration}: " |
| f"tasks_remaining={len(self.task_list)}, " |
| f"completed={len(self.completed_tasks)}") |
| |
| return { |
| "completed_tasks": self.completed_tasks, |
| "results": self.results, |
| "iterations": self.iteration, |
| "objective": self.objective, |
| } |
|
|
|
|
| class AetherAgentOrchestrator(nn.Module): |
| """ |
| Multi-agent orchestrator combining: |
    - smolagents-style hierarchical delegation
    - MLPO: train a single leader policy while peer agents stay frozen
    - Agentic Neural Networks: textual backpropagation
    - CoMAS: co-evolution driven by interaction rewards
| """ |
| |
| def __init__(self, config): |
| super().__init__() |
| self.config = config |
| |
        self.agents = nn.ModuleDict({
| "manager": BaseAgent(AgentRole.MANAGER, hidden_dim=config.macro_policy_dim), |
| "executor": BaseAgent(AgentRole.EXECUTOR, hidden_dim=config.micro_policy_dim), |
| "developer": BaseAgent(AgentRole.DEVELOPER, hidden_dim=config.micro_policy_dim), |
| "researcher": BaseAgent(AgentRole.RESEARCHER, hidden_dim=config.micro_policy_dim), |
| }) |
| |
| self.leader = BaseAgent(AgentRole.MANAGER, hidden_dim=config.macro_policy_dim) |
| |
| self.hierarchical = HierarchicalAgent( |
| macro_dim=config.macro_policy_dim, |
| micro_dim=config.micro_policy_dim, |
| ) |
| |
| self.routing_weights = nn.Parameter(torch.ones(len(self.agents))) |
| self.aggregation_gate = nn.Softmax(dim=0) |
| |
| self.agent_tasks: Dict[str, BabyAGILoop] = {} |
| |
| self.task_count = 0 |
| self.agent_interactions: List[Dict] = [] |
| |
    def forward(self, task: str, context: Dict[str, Any]) -> Dict[str, Any]:
        # Placeholder embedding: a production system would encode `task` (and
        # `context`) with a learned text encoder.
        task_embed = torch.randn(1, self.config.macro_policy_dim)
        blueprint = self.hierarchical.generate_blueprint(task_embed)
| |
| routing_probs = self.aggregation_gate(self.routing_weights) |
| |
| agent_outputs = {} |
| for i, (name, agent) in enumerate(self.agents.items()): |
| if name == "manager": |
| continue |
| |
            weight = routing_probs[i].item()
            if weight < 0.15:  # participation threshold: skip weakly-routed agents
                continue
| |
| sub_task = blueprint[min(i, len(blueprint) - 1)] if blueprint else task |
| output = agent.act(f"[{name}] {sub_task}") |
| agent_outputs[name] = { |
| "output": output, |
| "weight": weight, |
| "sub_task": sub_task, |
| } |
| |
| synthesized = self.leader.act( |
| f"Synthesize: {task} with inputs: {list(agent_outputs.keys())}" |
| ) |
| |
| self.agent_interactions.append({ |
| "task": task, |
| "blueprint": blueprint, |
| "agent_outputs": agent_outputs, |
| "leader_synthesis": synthesized, |
| "routing_probs": routing_probs.detach().cpu().tolist(), |
| "timestamp": time.time(), |
| }) |
| |
| self.task_count += 1 |
| |
| return { |
| "output": synthesized, |
| "blueprint": blueprint, |
| "agent_outputs": agent_outputs, |
| "routing_weights": routing_probs.detach().cpu().tolist(), |
| } |
| |
| def execute(self, task: str, kg_context: Any, context: Dict[str, Any]) -> Dict[str, Any]: |
| return self.forward(task, context) |
| |
    def textual_backprop(self, global_gradient: str,
                         performance_feedback: float,
                         beta: float = 0.5) -> Dict[str, str]:
        """Propagate a textual 'gradient' to every agent, blending it with the
        agent's previous gradient via the momentum coefficient `beta`."""
        updates = {}
        for name, agent in self.agents.items():
            local_grad = f"{global_gradient} + Agent {name} performance: {performance_feedback}"

            if hasattr(agent, 'previous_gradient'):
                blended = f"{beta}*{local_grad} + {1 - beta}*{agent.previous_gradient}"
            else:
                blended = local_grad

            agent.previous_gradient = blended
            updates[name] = blended
| |
        # Nudge all routing weights in the direction of the scalar feedback.
        with torch.no_grad():
            self.routing_weights += performance_feedback * 0.01
| |
| return updates |
| |
    def co_evolve_interactions(self) -> List[Dict]:
        """CoMAS-style heuristic: reward recent interactions for agent
        participation and blueprint complexity."""
        rewards = []
| |
| for interaction in self.agent_interactions[-10:]: |
| num_agents_involved = len(interaction.get("agent_outputs", {})) |
| blueprint_complexity = len(interaction.get("blueprint", [])) |
| |
| reward = num_agents_involved * 0.1 + min(blueprint_complexity * 0.05, 0.5) |
| rewards.append({ |
| "interaction_id": id(interaction), |
| "reward": reward, |
| "agents_involved": num_agents_involved, |
| }) |
| |
| return rewards |
| |
| def run_babyagi(self, objective: str, max_iterations: int = 20) -> Dict[str, Any]: |
| loop = BabyAGILoop(objective, max_iterations) |
| result = loop.run(self.agents["manager"]) |
| self.agent_tasks[objective] = loop |
| return result |
| |
| def stats(self) -> Dict[str, Any]: |
| return { |
| "total_tasks": self.task_count, |
| "num_agents": len(self.agents), |
| "total_interactions": len(self.agent_interactions), |
| "routing_weights": self.routing_weights.detach().cpu().tolist(), |
| "active_tasks": len(self.agent_tasks), |
| } |
|
|