Spaces:
Sleeping
Sleeping
File size: 8,266 Bytes
d8d14f1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 |
from typing import List, Optional
from swarms.structs.agent import Agent
from swarms.structs.rearrange import AgentRearrange
from swarms.structs.output_types import OutputType
from concurrent.futures import ThreadPoolExecutor, as_completed
from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger(log_folder="sequential_workflow")
class SequentialWorkflow:
"""
Initializes a SequentialWorkflow object, which orchestrates the execution of a sequence of agents.
Args:
name (str, optional): The name of the workflow. Defaults to "SequentialWorkflow".
description (str, optional): A description of the workflow. Defaults to "Sequential Workflow, where agents are executed in a sequence."
agents (List[Agent], optional): The list of agents in the workflow. Defaults to None.
max_loops (int, optional): The maximum number of loops to execute the workflow. Defaults to 1.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Raises:
ValueError: If agents list is None or empty, or if max_loops is 0
"""
def __init__(
self,
name: str = "SequentialWorkflow",
description: str = "Sequential Workflow, where agents are executed in a sequence.",
agents: List[Agent] = [],
max_loops: int = 1,
output_type: OutputType = "all",
return_json: bool = False,
shared_memory_system: callable = None,
*args,
**kwargs,
):
self.name = name
self.description = description
self.agents = agents
self.max_loops = max_loops
self.output_type = output_type
self.return_json = return_json
self.shared_memory_system = shared_memory_system
self.reliability_check()
self.flow = self.sequential_flow()
self.agent_rearrange = AgentRearrange(
name=name,
description=description,
agents=agents,
flow=self.flow,
max_loops=max_loops,
output_type=output_type,
return_json=return_json,
shared_memory_system=shared_memory_system,
*args,
**kwargs,
)
def sequential_flow(self):
# Only create flow if agents exist
if self.agents:
# Create flow by joining agent names with arrows
agent_names = []
for agent in self.agents:
try:
# Try to get agent_name, fallback to name if not available
agent_name = (
getattr(agent, "agent_name", None)
or agent.name
)
agent_names.append(agent_name)
except AttributeError:
logger.warning(
f"Could not get name for agent {agent}"
)
continue
if agent_names:
flow = " -> ".join(agent_names)
else:
flow = ""
logger.warning(
"No valid agent names found to create flow"
)
else:
flow = ""
logger.warning("No agents provided to create flow")
return flow
def reliability_check(self):
if self.agents is None or len(self.agents) == 0:
raise ValueError("Agents list cannot be None or empty")
if self.max_loops == 0:
raise ValueError("max_loops cannot be 0")
logger.info("Checks completed your swarm is ready.")
def run(
self,
task: str,
img: Optional[str] = None,
device: str = "cpu",
all_cores: bool = False,
all_gpus: bool = False,
device_id: int = 0,
no_use_clusterops: bool = True,
*args,
**kwargs,
) -> str:
"""
Executes a task through the agents in the dynamically constructed flow.
Args:
task (str): The task for the agents to execute.
device (str): The device to use for the agents to execute.
all_cores (bool): Whether to use all cores.
all_gpus (bool): Whether to use all gpus.
device_id (int): The device id to use for the agents to execute.
no_use_clusterops (bool): Whether to use clusterops.
Returns:
str: The final result after processing through all agents.
Raises:
ValueError: If task is None or empty
Exception: If any error occurs during task execution
"""
try:
return self.agent_rearrange.run(
task=task,
img=img,
device=device,
all_cores=all_cores,
device_id=device_id,
all_gpus=all_gpus,
no_use_clusterops=no_use_clusterops,
*args,
**kwargs,
)
except Exception as e:
logger.error(
f"An error occurred while executing the task: {e}"
)
raise e
def __call__(self, task: str, *args, **kwargs) -> str:
return self.run(task, *args, **kwargs)
def run_batched(self, tasks: List[str]) -> List[str]:
"""
Executes a batch of tasks through the agents in the dynamically constructed flow.
Args:
tasks (List[str]): The tasks for the agents to execute.
Returns:
List[str]: The final results after processing through all agents.
Raises:
ValueError: If tasks is None or empty
Exception: If any error occurs during task execution
"""
if not tasks or not all(
isinstance(task, str) for task in tasks
):
raise ValueError(
"Tasks must be a non-empty list of strings"
)
try:
return [self.agent_rearrange.run(task) for task in tasks]
except Exception as e:
logger.error(
f"An error occurred while executing the batch of tasks: {e}"
)
raise
async def run_async(self, task: str) -> str:
"""
Executes the task through the agents in the dynamically constructed flow asynchronously.
Args:
task (str): The task for the agents to execute.
Returns:
str: The final result after processing through all agents.
Raises:
ValueError: If task is None or empty
Exception: If any error occurs during task execution
"""
if not task or not isinstance(task, str):
raise ValueError("Task must be a non-empty string")
try:
return await self.agent_rearrange.run_async(task)
except Exception as e:
logger.error(
f"An error occurred while executing the task asynchronously: {e}"
)
raise
async def run_concurrent(self, tasks: List[str]) -> List[str]:
"""
Executes a batch of tasks through the agents in the dynamically constructed flow concurrently.
Args:
tasks (List[str]): The tasks for the agents to execute.
Returns:
List[str]: The final results after processing through all agents.
Raises:
ValueError: If tasks is None or empty
Exception: If any error occurs during task execution
"""
if not tasks or not all(
isinstance(task, str) for task in tasks
):
raise ValueError(
"Tasks must be a non-empty list of strings"
)
try:
with ThreadPoolExecutor() as executor:
results = [
executor.submit(self.agent_rearrange.run, task)
for task in tasks
]
return [
result.result()
for result in as_completed(results)
]
except Exception as e:
logger.error(
f"An error occurred while executing the batch of tasks concurrently: {e}"
)
raise
|