File size: 2,967 Bytes
4962437
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from __future__ import annotations

import concurrent.futures
from typing import Any, Dict, List, Optional

from swarms.artifacts.error_artifact import ErrorArtifact
from swarms.structs.task import BaseTask

class StringTask(BaseTask):
    """Task whose prompt is a plain string template sent to the structure's LLM.

    Before the LLM is called, every occurrence of a parent placeholder in the
    template is replaced with the output of the first parent task, or with an
    empty string when the task has no parent or the parent has no output yet.

    Args:
        task: Prompt template. May contain ``{{ parent_input }}`` or
            ``{{ parent_output }}``.
    """

    # Both spellings are accepted: ``{{ parent_input }}`` is the historical
    # placeholder, ``{{ parent_output }}`` is the one used in the Workflow
    # usage example.
    PARENT_PLACEHOLDERS = ("{{ parent_input }}", "{{ parent_output }}")

    def __init__(self, task: str):
        super().__init__()
        self.task = task

    def execute(self) -> Any:
        """Render the prompt, call ``self.structure.llm`` and store the result.

        Returns:
            Whatever ``self.structure.llm(prompt)`` returns; the same value is
            also stored on ``self.output``.
        """
        # NOTE(review): ``parents`` and ``structure`` are expected to be
        # provided by BaseTask / the owning workflow -- confirm against BaseTask.
        parent_output = self.parents[0].output if self.parents else None

        # str.replace() only accepts strings, so a missing or non-string parent
        # output must be normalised before substitution.
        replacement = "" if parent_output is None else str(parent_output)

        prompt = self.task
        for placeholder in self.PARENT_PLACEHOLDERS:
            prompt = prompt.replace(placeholder, replacement)

        response = self.structure.llm(prompt)
        self.output = response
        return response

class Workflow:
    """
    Run a chain of tasks one after another against a single LLM.

    Workflows are ideal for prescriptive processes that need to be executed
    sequentially. Each task added becomes the child of the previously added
    task, and execution walks the chain from the first task, stopping early
    if a task returns an ``ErrorArtifact``.

    Example::

        llm = LLM()
        workflow = Workflow(llm)

        workflow.add("What's the weather in miami")
        workflow.add("Provide details for {{ parent_input }}")
        workflow.add("Summarize the above information: {{ parent_input }}")

        workflow.run()

    ``{{ parent_input }}`` is the placeholder that ``StringTask.execute``
    replaces with the output of the parent task.

    Args:
        llm: Callable invoked by tasks as ``structure.llm(prompt)``.
        parallel: When true, the chain is driven from a
            ``ThreadPoolExecutor`` worker instead of the calling thread.
    """

    def __init__(
        self,
        llm,
        parallel: bool = False
    ):
        self.llm = llm
        self.tasks: List[BaseTask] = []
        self.parallel = parallel
        # Positional arguments of the run() call currently in progress.
        self._execution_args: tuple = ()

    def add(
        self,
        task: BaseTask
    ) -> BaseTask:
        """Append a task to the end of the chain and return it.

        Args:
            task: Either a prompt string, which is wrapped in a
                ``StringTask``, or an existing ``BaseTask`` instance, which is
                used as is.

        Returns:
            The task object that was added to the workflow.
        """
        if not isinstance(task, BaseTask):
            task = StringTask(task)

        # Every task needs a back-reference to reach ``structure.llm``.
        task.structure = self

        previous = self.last_task()
        if previous is not None:
            previous.add_child(task)

        # NOTE(review): BaseTask.add_child may already register the child on
        # the structure; the membership check keeps this idempotent either way.
        if task not in self.tasks:
            self.tasks.append(task)
        return task

    def first_task(self) -> Optional[BaseTask]:
        """Return the first task of the chain, or ``None`` when empty."""
        return self.tasks[0] if self.tasks else None

    def last_task(self) -> Optional[BaseTask]:
        """Return the most recently added task, or ``None`` when empty."""
        return self.tasks[-1] if self.tasks else None

    def run(self, *args) -> Optional[BaseTask]:
        """Reset all tasks and execute the chain from the first task.

        Args:
            *args: Stored on ``self._execution_args`` for the duration of the
                run and cleared afterwards.

        Returns:
            The last task of the workflow, or ``None`` if it has no tasks.
        """
        self._execution_args = args
        try:
            for task in self.tasks:
                task.reset()

            if self.parallel:
                with concurrent.futures.ThreadPoolExecutor() as executor:
                    # first_task must be *called*: the worker needs the task
                    # object, not the bound method. list() drains the iterator
                    # so exceptions raised in the worker propagate here.
                    list(
                        executor.map(
                            self.__run_from_task, [self.first_task()]
                        )
                    )
            else:
                self.__run_from_task(self.first_task())
        finally:
            # Clear the arguments even when a task raises.
            self._execution_args = ()

        return self.last_task()

    def context(self, task: BaseTask) -> Dict[str, Any]:
        """Build the template context describing a task's neighbours.

        Args:
            task: Task whose first parent and first child are exposed.

        Returns:
            A dict with the keys ``parent_output``, ``parent`` and ``child``
            (each ``None`` when absent), merged over the base-class context
            when a base class provides one.
        """
        # Workflow has no explicit base class, so only delegate when a
        # cooperating base actually defines context().
        base_context = getattr(super(), "context", None)
        context: Dict[str, Any] = (
            dict(base_context(task)) if callable(base_context) else {}
        )

        parent = task.parents[0] if task.parents else None
        parent_output = None
        if parent is not None and parent.output:
            output = parent.output
            # Outputs exposing to_text() are rendered through it; plain
            # values (e.g. a raw LLM response string) are passed through.
            parent_output = (
                output.to_text() if hasattr(output, "to_text") else output
            )

        context.update(
            {
                "parent_output": parent_output,
                "parent": parent,
                "child": task.children[0] if task.children else None
            }
        )
        return context

    def __run_from_task(self, task: Optional[BaseTask]) -> None:
        """Execute ``task`` and then follow the first child of each task.

        Stops when the chain ends or a task's ``execute()`` returns an
        ``ErrorArtifact``. Implemented as a loop so long chains cannot hit
        the recursion limit.
        """
        current = task
        while current is not None:
            if isinstance(current.execute(), ErrorArtifact):
                return
            current = next(iter(current.children), None)