Spaces:
Runtime error
Runtime error
File size: 2,967 Bytes
4962437 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
from __future__ import annotations

import concurrent.futures
from typing import Any, Dict, List, Optional, Union

from swarms.artifacts.error_artifact import ErrorArtifact
from swarms.structs.task import BaseTask
class StringTask(BaseTask):
    """A task whose prompt is a plain string template sent to the workflow's LLM.

    The literal placeholder ``{{ parent_input }}`` in the template is replaced
    with the output of the first parent task (or an empty string when the task
    has no parents) before the prompt is passed to ``self.structure.llm``.
    """

    def __init__(self, task):
        """Store the prompt template.

        Args:
            task: Prompt template; ``execute`` calls ``str.replace`` on it, so
                it is expected to be a ``str``.
        """
        super().__init__()
        self.task = task

    def execute(self) -> Any:
        """Render the prompt, call the LLM, and record the response.

        Returns:
            Whatever ``self.structure.llm(prompt)`` returns; the same value is
            also stored on ``self.output``.
        """
        parent_output = self.parents[0].output if self.parents else ""
        # ``str.replace`` only accepts a string replacement. A parent that has
        # not produced output yet (``None``) or that produced a non-string
        # object would otherwise raise ``TypeError`` here.
        parent_text = "" if parent_output is None else str(parent_output)

        prompt = self.task.replace("{{ parent_input }}", parent_text)
        response = self.structure.llm(prompt)
        self.output = response
        return response
class Workflow:
    """
    Run a chain of tasks one after another against a single LLM.

    Workflows are ideal for prescriptive processes that need to be executed
    sequentially. Each added prompt becomes a ``StringTask`` that is linked as
    a child of the previously added task, and ``StringTask`` substitutes the
    ``{{ parent_input }}`` placeholder with the parent task's output.

    Example::

        llm = LLM()
        workflow = Workflow(llm)
        workflow.add("What's the weather in miami")
        workflow.add("Provide details for {{ parent_input }}")
        workflow.add("Summarize the above information: {{ parent_input }}")
        workflow.run()
    """

    def __init__(self, llm, parallel: bool = False):
        """Create an empty workflow.

        Args:
            llm: Callable invoked by tasks as ``structure.llm(prompt)``.
            parallel: When true, ``run`` drives the task chain from a
                ``ThreadPoolExecutor`` worker instead of the calling thread.
        """
        self.llm = llm
        self.tasks: List[BaseTask] = []
        self.parallel = parallel
        # Positional arguments of the run currently in progress (empty otherwise).
        self._execution_args = ()

    def add(self, task: Union[str, BaseTask]) -> BaseTask:
        """Append a task to the end of the chain and return it.

        Args:
            task: A prompt string (wrapped in a ``StringTask``) or an existing
                ``BaseTask`` instance, which is used as-is.

        Returns:
            The task object that was added.
        """
        if not isinstance(task, BaseTask):
            task = StringTask(task)

        # Every task needs a back-reference to reach ``self.llm``; previously
        # only the first task received it here.
        task.structure = self

        previous = self.last_task()
        if previous is not None:
            previous.add_child(task)

        # NOTE(review): ``BaseTask.add_child`` is defined outside this file, so
        # it is not visible here whether it also registers the child in
        # ``self.tasks``. The identity check keeps the list complete without
        # inserting a duplicate in either case.
        if not any(existing is task for existing in self.tasks):
            self.tasks.append(task)
        return task

    def first_task(self) -> Optional[BaseTask]:
        """Return the first task added, or ``None`` when the workflow is empty."""
        return self.tasks[0] if self.tasks else None

    def last_task(self) -> Optional[BaseTask]:
        """Return the most recently added task, or ``None`` when the workflow is empty."""
        return self.tasks[-1] if self.tasks else None

    def run(self, *args) -> Optional[BaseTask]:
        """Reset every task, execute the chain from the first task, and return the last task.

        Args:
            *args: Stored on ``self._execution_args`` for the duration of the run.

        Returns:
            The last task in the workflow, or ``None`` when there are no tasks.
        """
        self._execution_args = args
        try:
            for task in self.tasks:
                task.reset()

            if self.parallel:
                # The chain itself is still executed in order; it just runs on
                # a worker thread. ``result()`` re-raises any task exception.
                with concurrent.futures.ThreadPoolExecutor() as executor:
                    executor.submit(
                        self.__run_from_task, self.first_task()
                    ).result()
            else:
                self.__run_from_task(self.first_task())
        finally:
            # Clear the arguments even when a task raises.
            self._execution_args = ()
        return self.last_task()

    def context(self, task: BaseTask) -> Dict[str, Any]:
        """Build the template context for ``task``.

        Returns:
            A dict with ``parent_output`` (text of the first parent's output, or
            ``None``), ``parent`` (first parent or ``None``) and ``child``
            (first child or ``None``), merged over any context supplied by a
            base class.
        """
        # ``Workflow`` declares no base class, so an unconditional
        # ``super().context(...)`` raises AttributeError. Only delegate when a
        # class later in the MRO actually provides ``context``.
        inherited = getattr(super(), "context", None)
        context = dict(inherited(task)) if callable(inherited) else {}

        parent = task.parents[0] if task.parents else None
        parent_output = None
        if parent is not None and parent.output:
            output = parent.output
            # Outputs set by ``StringTask`` are raw LLM responses and may not
            # offer ``to_text``; fall back to the value itself in that case.
            to_text = getattr(output, "to_text", None)
            parent_output = to_text() if callable(to_text) else output

        context.update(
            {
                "parent_output": parent_output,
                "parent": parent,
                "child": task.children[0] if task.children else None,
            }
        )
        return context

    def __run_from_task(self, task: Optional[BaseTask]) -> None:
        """Execute ``task`` and then each first child in turn.

        Stops at the end of the chain or as soon as a task's ``execute``
        returns an ``ErrorArtifact``.
        """
        current = task
        # Iterative walk: avoids one stack frame per task on long chains.
        while current is not None:
            if isinstance(current.execute(), ErrorArtifact):
                return
            current = next(iter(current.children), None)
|