Spaces:
Sleeping
Sleeping
import random | |
from swarms.structs.base_swarm import BaseSwarm | |
from typing import List | |
from swarms.structs.agent import Agent | |
from pydantic import BaseModel, Field | |
from typing import Optional | |
from datetime import datetime | |
from swarms.schemas.agent_step_schemas import ManySteps | |
import tenacity | |
from swarms.utils.loguru_logger import initialize_logger | |
logger = initialize_logger("round-robin") | |
datetime_stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
class MetadataSchema(BaseModel): | |
swarm_id: Optional[str] = Field( | |
..., description="Unique ID for the run" | |
) | |
name: Optional[str] = Field( | |
"RoundRobinSwarm", description="Name of the swarm" | |
) | |
task: Optional[str] = Field( | |
..., description="Task or query given to all agents" | |
) | |
description: Optional[str] = Field( | |
"Concurrent execution of multiple agents", | |
description="Description of the workflow", | |
) | |
agent_outputs: Optional[List[ManySteps]] = Field( | |
..., description="List of agent outputs and metadata" | |
) | |
timestamp: Optional[str] = Field( | |
default_factory=datetime.now, | |
description="Timestamp of the workflow execution", | |
) | |
max_loops: Optional[int] = Field( | |
1, description="Maximum number of loops to run" | |
) | |
class RoundRobinSwarm(BaseSwarm): | |
""" | |
A swarm implementation that executes tasks in a round-robin fashion. | |
Args: | |
agents (List[Agent], optional): List of agents in the swarm. Defaults to None. | |
verbose (bool, optional): Flag to enable verbose mode. Defaults to False. | |
max_loops (int, optional): Maximum number of loops to run. Defaults to 1. | |
callback (callable, optional): Callback function to be called after each loop. Defaults to None. | |
return_json_on (bool, optional): Flag to return the metadata as a JSON object. Defaults to False. | |
*args: Variable length argument list. | |
**kwargs: Arbitrary keyword arguments. | |
Attributes: | |
agents (List[Agent]): List of agents in the swarm. | |
verbose (bool): Flag to enable verbose mode. | |
max_loops (int): Maximum number of loops to run. | |
index (int): Current index of the agent being executed. | |
Methods: | |
run(task: str, *args, **kwargs) -> Any: Executes the given task on the agents in a round-robin fashion. | |
""" | |
def __init__( | |
self, | |
name: str = "RoundRobinSwarm", | |
description: str = "A swarm implementation that executes tasks in a round-robin fashion.", | |
agents: List[Agent] = None, | |
verbose: bool = False, | |
max_loops: int = 1, | |
callback: callable = None, | |
return_json_on: bool = False, | |
max_retries: int = 3, | |
*args, | |
**kwargs, | |
): | |
try: | |
super().__init__( | |
name=name, | |
description=description, | |
agents=agents, | |
*args, | |
**kwargs, | |
) | |
self.name = name | |
self.description = description | |
self.agents = agents or [] | |
self.verbose = verbose | |
self.max_loops = max_loops | |
self.callback = callback | |
self.return_json_on = return_json_on | |
self.index = 0 | |
self.max_retries = max_retries | |
# Store the metadata for the run | |
self.output_schema = MetadataSchema( | |
name=self.name, | |
swarm_id=datetime_stamp, | |
task="", | |
description=self.description, | |
agent_outputs=[], | |
timestamp=datetime_stamp, | |
max_loops=self.max_loops, | |
) | |
# Set the max loops for every agent | |
if self.agents: | |
for agent in self.agents: | |
agent.max_loops = random.randint(1, 5) | |
logger.info( | |
f"Successfully initialized {self.name} with {len(self.agents)} agents" | |
) | |
except Exception as e: | |
logger.error( | |
f"Failed to initialize {self.name}: {str(e)}" | |
) | |
raise | |
def _execute_agent( | |
self, agent: Agent, task: str, *args, **kwargs | |
) -> str: | |
"""Execute a single agent with retries and error handling""" | |
try: | |
logger.info( | |
f"Running Agent {agent.agent_name} on task: {task}" | |
) | |
result = agent.run(task, *args, **kwargs) | |
self.output_schema.agent_outputs.append( | |
agent.agent_output | |
) | |
return result | |
except Exception as e: | |
logger.error( | |
f"Error executing agent {agent.agent_name}: {str(e)}" | |
) | |
raise | |
def run(self, task: str, *args, **kwargs): | |
""" | |
Executes the given task on the agents in a round-robin fashion. | |
Args: | |
task (str): The task to be executed. | |
*args: Variable length argument list. | |
**kwargs: Arbitrary keyword arguments. | |
Returns: | |
Any: The result of the task execution. | |
Raises: | |
ValueError: If no agents are configured | |
Exception: If an exception occurs during task execution. | |
""" | |
if not self.agents: | |
logger.error("No agents configured for the swarm") | |
raise ValueError("No agents configured for the swarm") | |
try: | |
result = task | |
self.output_schema.task = task | |
n = len(self.agents) | |
logger.info( | |
f"Starting round-robin execution with task '{task}' on {n} agents" | |
) | |
for loop in range(self.max_loops): | |
logger.debug( | |
f"Starting loop {loop + 1}/{self.max_loops}" | |
) | |
for _ in range(n): | |
current_agent = self.agents[self.index] | |
try: | |
result = self._execute_agent( | |
current_agent, result, *args, **kwargs | |
) | |
finally: | |
self.index = (self.index + 1) % n | |
if self.callback: | |
logger.debug( | |
f"Executing callback for loop {loop + 1}" | |
) | |
try: | |
self.callback(loop, result) | |
except Exception as e: | |
logger.error( | |
f"Callback execution failed: {str(e)}" | |
) | |
logger.success( | |
f"Successfully completed {self.max_loops} loops of round-robin execution" | |
) | |
if self.return_json_on: | |
return self.export_metadata() | |
return result | |
except Exception as e: | |
logger.error(f"Round-robin execution failed: {str(e)}") | |
raise | |
def export_metadata(self): | |
"""Export the execution metadata as JSON""" | |
try: | |
return self.output_schema.model_dump_json(indent=4) | |
except Exception as e: | |
logger.error(f"Failed to export metadata: {str(e)}") | |
raise | |