omni_bot / swarms /structs /workflow.py
WAWAA's picture
Upload folder using huggingface_hub
4962437
raw
history blame
2.97 kB
from __future__ import annotations
import concurrent.futures
from typing import Any, Dict, List, Optional
from swarms.artifacts.error_artifact import ErrorArtifact
from swarms.structs.task import BaseTask
class StringTask(BaseTask):
    """Task whose prompt is a string that is sent to the owning structure's LLM.

    Before the LLM is called, the placeholder ``{{ parent_input }}`` in the
    prompt is replaced with the output of the first parent task.
    ``{{ parent_output }}`` is accepted as an alias. When the task has no
    parent, or the parent has produced no output yet, the placeholder is
    replaced with an empty string.

    NOTE(review): ``parents``, ``structure`` and ``output`` are presumably
    provided by ``BaseTask`` (not visible here) -- confirm.
    """

    def __init__(self, task):
        """Store the prompt template.

        Args:
            task: Prompt text, optionally containing a parent placeholder.
        """
        super().__init__()
        self.task = task

    def execute(self) -> Any:
        """Render the prompt, call the LLM, and record the response.

        Returns:
            Whatever ``self.structure.llm(prompt)`` returns; the same value is
            stored on ``self.output``.
        """
        import re

        parent_output = self.parents[0].output if self.parents else ""
        # ``None`` (parent not executed) and non-string outputs would make a
        # plain ``str.replace`` raise TypeError, so normalise to text first.
        parent_text = "" if parent_output is None else str(parent_output)

        # Single pass over the template so text inserted from the parent is
        # never re-scanned for placeholders. A callable replacement keeps
        # backslashes in the parent text literal.
        prompt = re.sub(
            r"\{\{ parent_(?:input|output) \}\}",
            lambda _match: parent_text,
            self.task,
        )

        response = self.structure.llm(prompt)
        self.output = response
        return response
class Workflow:
    """
    Run a chain of string prompts through an LLM, one after another.

    Workflows are ideal for prescriptive processes that need to be executed
    sequentially. Every prompt passed to ``add`` is wrapped in a
    ``StringTask``; ``run`` executes the chain starting at the first task,
    following each task's first child, and stops early when a task returns an
    ``ErrorArtifact``.

    A prompt can embed the previous task's output with the
    ``{{ parent_input }}`` placeholder, which ``StringTask.execute``
    substitutes.

    Example::

        llm = LLM()
        workflow = Workflow(llm)
        workflow.add("What's the weather in miami")
        workflow.add("Provide details for {{ parent_input }}")
        workflow.add("Summarize the above information: {{ parent_input }}")
        workflow.run()
    """

    def __init__(
        self,
        llm,
        parallel: bool = False
    ):
        """Create an empty workflow.

        Args:
            llm: Callable used by the tasks (``StringTask`` calls
                ``structure.llm(prompt)``).
            parallel: When true, ``run`` walks the chain on a
                ``ThreadPoolExecutor`` worker thread instead of the calling
                thread. The tasks themselves still execute one after another.
        """
        self.llm = llm
        self.tasks: List[BaseTask] = []
        self.parallel = parallel
        # Positional arguments of the run in progress; empty outside ``run``.
        self._execution_args: tuple = ()

    def add(
        self,
        task: str
    ) -> BaseTask:
        """Append a prompt to the end of the chain.

        Args:
            task: Prompt text; it is wrapped in a ``StringTask``.

        Returns:
            The newly created task.
        """
        task = StringTask(task)
        previous = self.last_task()
        if previous:
            previous.add_child(task)
        # NOTE(review): ``add_child`` lives on BaseTask, which is not visible
        # here. The two guards below make registration correct whether or not
        # it also propagates ``structure`` / appends to ``self.tasks``.
        if getattr(task, "structure", None) is None:
            task.structure = self
        if task not in self.tasks:
            self.tasks.append(task)
        return task

    def first_task(self) -> Optional[BaseTask]:
        """Return the first task of the chain, or ``None`` when it is empty."""
        return self.tasks[0] if self.tasks else None

    def last_task(self) -> Optional[BaseTask]:
        """Return the last task of the chain, or ``None`` when it is empty."""
        return self.tasks[-1] if self.tasks else None

    def run(self, *args) -> Optional[BaseTask]:
        """Reset every task and execute the chain from the first task.

        Args:
            *args: Stored on ``self._execution_args`` for the duration of the
                run. Nothing in this class reads them.

        Returns:
            The last task of the chain, or ``None`` when no task was added.
        """
        self._execution_args = args
        try:
            for task in self.tasks:
                task.reset()

            if self.parallel:
                with concurrent.futures.ThreadPoolExecutor() as executor:
                    # ``first_task`` must be *called*: mapping over the bound
                    # method itself would hand a function to the runner.
                    # ``list`` drains the iterator so worker exceptions
                    # surface here.
                    list(
                        executor.map(
                            self.__run_from_task, [self.first_task()]
                        )
                    )
            else:
                self.__run_from_task(self.first_task())
        finally:
            # Always clear the per-run arguments, even if a task raised.
            self._execution_args = ()

        return self.last_task()

    def context(self, task: BaseTask) -> Dict[str, Any]:
        """Build the template context for ``task``.

        Args:
            task: Task whose neighbours are described.

        Returns:
            A dict with ``parent_output`` (text of the first parent's output,
            or ``None``), ``parent`` (first parent or ``None``) and ``child``
            (first child or ``None``), merged over any context supplied by a
            cooperating base class.
        """
        # Workflow itself has no base class defining ``context``; delegate
        # only when another class in the MRO provides one.
        inherited = getattr(super(), "context", None)
        context = inherited(task) if callable(inherited) else {}

        parent = task.parents[0] if task.parents else None
        child = task.children[0] if task.children else None

        parent_output = None
        if parent is not None and parent.output:
            output = parent.output
            # Artifact-like outputs expose ``to_text``; StringTask stores the
            # raw LLM response, which may not.
            if hasattr(output, "to_text"):
                parent_output = output.to_text()
            else:
                parent_output = str(output)

        context.update(
            {
                "parent_output": parent_output,
                "parent": parent,
                "child": child,
            }
        )
        return context

    def __run_from_task(self, task: Optional[BaseTask]) -> None:
        """Execute ``task`` and then its first child, repeatedly.

        Stops at the end of the chain or as soon as a task's ``execute``
        returns an ``ErrorArtifact``. Implemented as a loop so long chains do
        not hit the recursion limit.
        """
        while task is not None:
            if isinstance(task.execute(), ErrorArtifact):
                return
            task = next(iter(task.children), None)