File size: 5,061 Bytes
4962437
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import uuid
from abc import ABC
from typing import Any, Dict, List, Optional

from swarms.memory.schemas import Artifact, Status
from swarms.memory.schemas import Step as APIStep
from swarms.memory.schemas import Task as APITask


class Step(APIStep):
    additional_properties: Optional[Dict[str, str]] = None

class Task(APITask):
    steps: List[Step] = []

class NotFoundException(Exception):
    """
    Exception raised when a resource is not found.
    """

    def __init__(self, item_name: str, item_id: str):
        self.item_name = item_name
        self.item_id = item_id
        super().__init__(f"{item_name} with {item_id} not found.")

class TaskDB(ABC):
    async def create_task(
        self,
        input: Optional[str],
        additional_input: Any = None,
        artifacts: Optional[List[Artifact]] = None,
        steps: Optional[List[Step]] = None,
    ) -> Task:
        raise NotImplementedError

    async def create_step(
        self,
        task_id: str,
        name: Optional[str] = None,
        input: Optional[str] = None,
        is_last: bool = False,
        additional_properties: Optional[Dict[str, str]] = None,
    ) -> Step:
        raise NotImplementedError

    async def create_artifact(
        self,
        task_id: str,
        file_name: str,
        relative_path: Optional[str] = None,
        step_id: Optional[str] = None,
    ) -> Artifact:
        raise NotImplementedError

    async def get_task(self, task_id: str) -> Task:
        raise NotImplementedError

    async def get_step(self, task_id: str, step_id: str) -> Step:
        raise NotImplementedError

    async def get_artifact(self, task_id: str, artifact_id: str) -> Artifact:
        raise NotImplementedError

    async def list_tasks(self) -> List[Task]:
        raise NotImplementedError

    async def list_steps(
        self, task_id: str, status: Optional[Status] = None
    ) -> List[Step]:
        raise NotImplementedError


class InMemoryTaskDB(TaskDB):
    _tasks: Dict[str, Task] = {}

    async def create_task(
        self,
        input: Optional[str],
        additional_input: Any = None,
        artifacts: Optional[List[Artifact]] = None,
        steps: Optional[List[Step]] = None,
    ) -> Task:
        if not steps:
            steps = []
        if not artifacts:
            artifacts = []
        task_id = str(uuid.uuid4())
        task = Task(
            task_id=task_id,
            input=input,
            steps=steps,
            artifacts=artifacts,
            additional_input=additional_input,
        )
        self._tasks[task_id] = task
        return task

    async def create_step(
        self,
        task_id: str,
        name: Optional[str] = None,
        input: Optional[str] = None,
        is_last=False,
        additional_properties: Optional[Dict[str, Any]] = None,
    ) -> Step:
        step_id = str(uuid.uuid4())
        step = Step(
            task_id=task_id,
            step_id=step_id,
            name=name,
            input=input,
            status=Status.created,
            is_last=is_last,
            additional_properties=additional_properties,
        )
        task = await self.get_task(task_id)
        task.steps.append(step)
        return step

    async def get_task(self, task_id: str) -> Task:
        task = self._tasks.get(task_id, None)
        if not task:
            raise NotFoundException("Task", task_id)
        return task

    async def get_step(self, task_id: str, step_id: str) -> Step:
        task = await self.get_task(task_id)
        step = next(filter(lambda s: s.task_id == task_id, task.steps), None)
        if not step:
            raise NotFoundException("Step", step_id)
        return step

    async def get_artifact(self, task_id: str, artifact_id: str) -> Artifact:
        task = await self.get_task(task_id)
        artifact = next(
            filter(lambda a: a.artifact_id == artifact_id, task.artifacts), None
        )
        if not artifact:
            raise NotFoundException("Artifact", artifact_id)
        return artifact

    async def create_artifact(
        self,
        task_id: str,
        file_name: str,
        relative_path: Optional[str] = None,
        step_id: Optional[str] = None,
    ) -> Artifact:
        artifact_id = str(uuid.uuid4())
        artifact = Artifact(
            artifact_id=artifact_id, file_name=file_name, relative_path=relative_path
        )
        task = await self.get_task(task_id)
        task.artifacts.append(artifact)

        if step_id:
            step = await self.get_step(task_id, step_id)
            step.artifacts.append(artifact)

        return artifact

    async def list_tasks(self) -> List[Task]:
        return [task for task in self._tasks.values()]

    async def list_steps(
        self, task_id: str, status: Optional[Status] = None
    ) -> List[Step]:
        task = await self.get_task(task_id)
        steps = task.steps
        if status:
            steps = list(filter(lambda s: s.status == status, steps))
        return steps