from __future__ import annotations import concurrent.futures from typing import Any, Dict, List, Optional from swarms.artifacts.error_artifact import ErrorArtifact from swarms.structs.task import BaseTask class StringTask(BaseTask): def __init__(self, task): super().__init__() self.task = task def execute(self) -> Any: prompt = self.task.replace( "{{ parent_input }}", self.parents[0].output if self.parents else "" ) response = self.structure.llm(prompt) self.output = response return response class Workflow: """ Workflows are ideal for prescriptive processes that need to be executed sequentially. They string together multiple tasks of varying types, and can use Short-Term Memory or pass specific arguments downstream. ``` llm = LLM() workflow = Workflow(llm) workflow.add("What's the weather in miami") workflow.add("Provide detauls for {{ parent_output }}") workflow.add("Summarize the above information: {{ parent_output}}) workflow.run() """ def __init__( self, llm, parallel: bool = False ): self.llm = llm self.tasks: List[BaseTask] = [] self.parallel = parallel def add( self, task: BaseTask ) -> BaseTask: task = StringTask(task) if self.last_task(): self.last_task().add_child(task) else: task.structure = self self.tasks.append(task) return task def first_task(self) -> Optional[BaseTask]: return self.tasks[0] if self.tasks else None def last_task(self) -> Optional[BaseTask]: return self.tasks[-1] if self.tasks else None def run(self, *args) -> BaseTask: self._execution_args = args [task.reset() for task in self.tasks] if self.parallel: with concurrent.futures.ThreadPoolExecutor() as executor: list(executor.map(self.__run_from_task, [self.first_task])) else: self.__run_from_task(self.first_task()) self._execution_args = () return self.last_task() def context(self, task: BaseTask) -> Dict[str, Any]: context = super().context(task) context.update( { "parent_output": task.parents[0].output.to_text() \ if task.parents and task.parents[0].output else None, "parent": task.parents[0] if task.parents else None, "child": task.children[0] if task.children else None } ) return context def __run_from_task(self, task: Optional[BaseTask]) -> None: if task is None: return else: if isinstance(task.execute(), ErrorArtifact): return else: self.__run_from_task(next(iter(task.children), None))