Swarms / swarms /structs /sequential_workflow.py
harshalmore31's picture
Synced repo using 'sync_with_huggingface' Github Action
d8d14f1 verified
from typing import List, Optional
from swarms.structs.agent import Agent
from swarms.structs.rearrange import AgentRearrange
from swarms.structs.output_types import OutputType
from concurrent.futures import ThreadPoolExecutor, as_completed
from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger(log_folder="sequential_workflow")
class SequentialWorkflow:
"""
Initializes a SequentialWorkflow object, which orchestrates the execution of a sequence of agents.
Args:
name (str, optional): The name of the workflow. Defaults to "SequentialWorkflow".
description (str, optional): A description of the workflow. Defaults to "Sequential Workflow, where agents are executed in a sequence."
agents (List[Agent], optional): The list of agents in the workflow. Defaults to None.
max_loops (int, optional): The maximum number of loops to execute the workflow. Defaults to 1.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Raises:
ValueError: If agents list is None or empty, or if max_loops is 0
"""
def __init__(
self,
name: str = "SequentialWorkflow",
description: str = "Sequential Workflow, where agents are executed in a sequence.",
agents: List[Agent] = [],
max_loops: int = 1,
output_type: OutputType = "all",
return_json: bool = False,
shared_memory_system: callable = None,
*args,
**kwargs,
):
self.name = name
self.description = description
self.agents = agents
self.max_loops = max_loops
self.output_type = output_type
self.return_json = return_json
self.shared_memory_system = shared_memory_system
self.reliability_check()
self.flow = self.sequential_flow()
self.agent_rearrange = AgentRearrange(
name=name,
description=description,
agents=agents,
flow=self.flow,
max_loops=max_loops,
output_type=output_type,
return_json=return_json,
shared_memory_system=shared_memory_system,
*args,
**kwargs,
)
def sequential_flow(self):
# Only create flow if agents exist
if self.agents:
# Create flow by joining agent names with arrows
agent_names = []
for agent in self.agents:
try:
# Try to get agent_name, fallback to name if not available
agent_name = (
getattr(agent, "agent_name", None)
or agent.name
)
agent_names.append(agent_name)
except AttributeError:
logger.warning(
f"Could not get name for agent {agent}"
)
continue
if agent_names:
flow = " -> ".join(agent_names)
else:
flow = ""
logger.warning(
"No valid agent names found to create flow"
)
else:
flow = ""
logger.warning("No agents provided to create flow")
return flow
def reliability_check(self):
if self.agents is None or len(self.agents) == 0:
raise ValueError("Agents list cannot be None or empty")
if self.max_loops == 0:
raise ValueError("max_loops cannot be 0")
logger.info("Checks completed your swarm is ready.")
def run(
self,
task: str,
img: Optional[str] = None,
device: str = "cpu",
all_cores: bool = False,
all_gpus: bool = False,
device_id: int = 0,
no_use_clusterops: bool = True,
*args,
**kwargs,
) -> str:
"""
Executes a task through the agents in the dynamically constructed flow.
Args:
task (str): The task for the agents to execute.
device (str): The device to use for the agents to execute.
all_cores (bool): Whether to use all cores.
all_gpus (bool): Whether to use all gpus.
device_id (int): The device id to use for the agents to execute.
no_use_clusterops (bool): Whether to use clusterops.
Returns:
str: The final result after processing through all agents.
Raises:
ValueError: If task is None or empty
Exception: If any error occurs during task execution
"""
try:
return self.agent_rearrange.run(
task=task,
img=img,
device=device,
all_cores=all_cores,
device_id=device_id,
all_gpus=all_gpus,
no_use_clusterops=no_use_clusterops,
*args,
**kwargs,
)
except Exception as e:
logger.error(
f"An error occurred while executing the task: {e}"
)
raise e
def __call__(self, task: str, *args, **kwargs) -> str:
return self.run(task, *args, **kwargs)
def run_batched(self, tasks: List[str]) -> List[str]:
"""
Executes a batch of tasks through the agents in the dynamically constructed flow.
Args:
tasks (List[str]): The tasks for the agents to execute.
Returns:
List[str]: The final results after processing through all agents.
Raises:
ValueError: If tasks is None or empty
Exception: If any error occurs during task execution
"""
if not tasks or not all(
isinstance(task, str) for task in tasks
):
raise ValueError(
"Tasks must be a non-empty list of strings"
)
try:
return [self.agent_rearrange.run(task) for task in tasks]
except Exception as e:
logger.error(
f"An error occurred while executing the batch of tasks: {e}"
)
raise
async def run_async(self, task: str) -> str:
"""
Executes the task through the agents in the dynamically constructed flow asynchronously.
Args:
task (str): The task for the agents to execute.
Returns:
str: The final result after processing through all agents.
Raises:
ValueError: If task is None or empty
Exception: If any error occurs during task execution
"""
if not task or not isinstance(task, str):
raise ValueError("Task must be a non-empty string")
try:
return await self.agent_rearrange.run_async(task)
except Exception as e:
logger.error(
f"An error occurred while executing the task asynchronously: {e}"
)
raise
async def run_concurrent(self, tasks: List[str]) -> List[str]:
"""
Executes a batch of tasks through the agents in the dynamically constructed flow concurrently.
Args:
tasks (List[str]): The tasks for the agents to execute.
Returns:
List[str]: The final results after processing through all agents.
Raises:
ValueError: If tasks is None or empty
Exception: If any error occurs during task execution
"""
if not tasks or not all(
isinstance(task, str) for task in tasks
):
raise ValueError(
"Tasks must be a non-empty list of strings"
)
try:
with ThreadPoolExecutor() as executor:
results = [
executor.submit(self.agent_rearrange.run, task)
for task in tasks
]
return [
result.result()
for result in as_completed(results)
]
except Exception as e:
logger.error(
f"An error occurred while executing the batch of tasks concurrently: {e}"
)
raise