"""Sequential workflow: run a list of agents one after another via AgentRearrange."""
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, List, Optional

from swarms.structs.agent import Agent
from swarms.structs.output_types import OutputType
from swarms.structs.rearrange import AgentRearrange
from swarms.utils.loguru_logger import initialize_logger
# Module-level logger used by every SequentialWorkflow method; it is
# initialized with the "sequential_workflow" log folder.
logger = initialize_logger(log_folder="sequential_workflow")
class SequentialWorkflow:
    """
    Orchestrates the execution of a sequence of agents.

    The agents' names are joined into a flow string of the form
    ``"first -> second -> third"`` and handed, together with the agents, to an
    ``AgentRearrange`` instance that performs the actual execution.

    Args:
        name (str, optional): The name of the workflow. Defaults to "SequentialWorkflow".
        description (str, optional): A description of the workflow. Defaults to
            "Sequential Workflow, where agents are executed in a sequence."
        agents (List[Agent], optional): The list of agents in the workflow.
            Defaults to None, which is rejected by ``reliability_check``.
        max_loops (int, optional): The maximum number of loops to execute the
            workflow. Defaults to 1.
        output_type (OutputType, optional): Forwarded to ``AgentRearrange``.
            Defaults to "all".
        return_json (bool, optional): Forwarded to ``AgentRearrange``.
            Defaults to False.
        shared_memory_system (Callable, optional): Forwarded to
            ``AgentRearrange``. Defaults to None.
        *args: Variable length argument list, forwarded to ``AgentRearrange``.
        **kwargs: Arbitrary keyword arguments, forwarded to ``AgentRearrange``.

    Raises:
        ValueError: If agents list is None or empty, or if max_loops is 0
    """

    def __init__(
        self,
        name: str = "SequentialWorkflow",
        description: str = "Sequential Workflow, where agents are executed in a sequence.",
        # None instead of a shared mutable [] default; both are rejected by
        # reliability_check with the same ValueError.
        agents: Optional[List[Agent]] = None,
        max_loops: int = 1,
        output_type: OutputType = "all",
        return_json: bool = False,
        shared_memory_system: Optional[Callable] = None,
        *args,
        **kwargs,
    ):
        self.name = name
        self.description = description
        self.agents = agents
        self.max_loops = max_loops
        self.output_type = output_type
        self.return_json = return_json
        self.shared_memory_system = shared_memory_system

        # Validate before building anything that depends on the agents.
        self.reliability_check()
        self.flow = self.sequential_flow()

        self.agent_rearrange = AgentRearrange(
            name=name,
            description=description,
            agents=agents,
            flow=self.flow,
            max_loops=max_loops,
            output_type=output_type,
            return_json=return_json,
            shared_memory_system=shared_memory_system,
            *args,
            **kwargs,
        )

    def sequential_flow(self) -> str:
        """
        Builds the flow string by joining the agent names with " -> ".

        Each agent contributes its ``agent_name`` attribute, falling back to
        ``name``. Agents that expose neither (or only empty values) are skipped
        with a warning.

        Returns:
            str: The flow string, or "" when there are no agents or no usable names.
        """
        if not self.agents:
            logger.warning("No agents provided to create flow")
            return ""

        agent_names = []
        for agent in self.agents:
            agent_name = getattr(agent, "agent_name", None) or getattr(
                agent, "name", None
            )
            if not agent_name:
                # A missing or empty name cannot take part in the flow string.
                logger.warning(f"Could not get name for agent {agent}")
                continue
            agent_names.append(agent_name)

        if not agent_names:
            logger.warning("No valid agent names found to create flow")
            return ""

        return " -> ".join(agent_names)

    def reliability_check(self) -> None:
        """
        Validates the workflow configuration.

        Raises:
            ValueError: If agents list is None or empty, or if max_loops is 0
        """
        if not self.agents:
            raise ValueError("Agents list cannot be None or empty")

        if self.max_loops == 0:
            raise ValueError("max_loops cannot be 0")

        logger.info("Checks completed your swarm is ready.")

    def run(
        self,
        task: str,
        img: Optional[str] = None,
        device: str = "cpu",
        all_cores: bool = False,
        all_gpus: bool = False,
        device_id: int = 0,
        no_use_clusterops: bool = True,
        *args,
        **kwargs,
    ) -> str:
        """
        Executes a task through the agents in the dynamically constructed flow.

        Every argument is forwarded unchanged to ``AgentRearrange.run``; no
        validation of ``task`` is performed here.

        Args:
            task (str): The task for the agents to execute.
            img (str, optional): Image argument forwarded to the agents. Defaults to None.
            device (str): The device to use for the agents to execute.
            all_cores (bool): Whether to use all cores.
            all_gpus (bool): Whether to use all gpus.
            device_id (int): The device id to use for the agents to execute.
            no_use_clusterops (bool): Whether to use clusterops.

        Returns:
            str: The final result after processing through all agents.

        Raises:
            Exception: If any error occurs during task execution (logged, then re-raised)
        """
        try:
            return self.agent_rearrange.run(
                task=task,
                img=img,
                device=device,
                all_cores=all_cores,
                device_id=device_id,
                all_gpus=all_gpus,
                no_use_clusterops=no_use_clusterops,
                *args,
                **kwargs,
            )
        except Exception as e:
            logger.error(
                f"An error occurred while executing the task: {e}"
            )
            raise

    def __call__(self, task: str, *args, **kwargs) -> str:
        """Makes the workflow callable; equivalent to ``run(task, *args, **kwargs)``."""
        return self.run(task, *args, **kwargs)

    def run_batched(self, tasks: List[str]) -> List[str]:
        """
        Executes a batch of tasks, one after another, through the agents in the flow.

        Args:
            tasks (List[str]): The tasks for the agents to execute.

        Returns:
            List[str]: The final results, in the same order as ``tasks``.

        Raises:
            ValueError: If tasks is None, empty, or contains a non-string
            Exception: If any error occurs during task execution
        """
        if not tasks or not all(
            isinstance(task, str) for task in tasks
        ):
            raise ValueError(
                "Tasks must be a non-empty list of strings"
            )

        try:
            return [self.agent_rearrange.run(task) for task in tasks]
        except Exception as e:
            logger.error(
                f"An error occurred while executing the batch of tasks: {e}"
            )
            raise

    async def run_async(self, task: str) -> str:
        """
        Executes the task through the agents in the flow asynchronously.

        Args:
            task (str): The task for the agents to execute.

        Returns:
            str: The final result after processing through all agents.

        Raises:
            ValueError: If task is None, empty, or not a string
            Exception: If any error occurs during task execution
        """
        if not task or not isinstance(task, str):
            raise ValueError("Task must be a non-empty string")

        try:
            return await self.agent_rearrange.run_async(task)
        except Exception as e:
            logger.error(
                f"An error occurred while executing the task asynchronously: {e}"
            )
            raise

    async def run_concurrent(self, tasks: List[str]) -> List[str]:
        """
        Executes a batch of tasks through the agents in the flow concurrently.

        Each task is submitted to a ``ThreadPoolExecutor``. Results are
        collected in submission order, so ``result[i]`` belongs to ``tasks[i]``.

        NOTE: this coroutine contains no ``await``; it blocks the calling
        thread until every task has finished.

        Args:
            tasks (List[str]): The tasks for the agents to execute.

        Returns:
            List[str]: The final results, in the same order as ``tasks``.

        Raises:
            ValueError: If tasks is None, empty, or contains a non-string
            Exception: If any error occurs during task execution
        """
        if not tasks or not all(
            isinstance(task, str) for task in tasks
        ):
            raise ValueError(
                "Tasks must be a non-empty list of strings"
            )

        try:
            with ThreadPoolExecutor() as executor:
                futures = [
                    executor.submit(self.agent_rearrange.run, task)
                    for task in tasks
                ]
                # Iterate the futures in submission order (not as_completed)
                # so the results stay aligned with the input tasks.
                return [future.result() for future in futures]
        except Exception as e:
            logger.error(
                f"An error occurred while executing the batch of tasks concurrently: {e}"
            )
            raise