import asyncio
import csv
import io
import os
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from typing import List, Union

import aiofiles
from pydantic import BaseModel, Field

from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
from swarms.utils.file_processing import create_file_in_folder
from swarms.telemetry.capture_sys_data import log_agent_data
from swarms.utils.loguru_logger import initialize_logger

logger = initialize_logger(log_folder="spreadsheet_swarm")

# Unique identifier for this run, used in file names and run metadata
run_id = uuid.uuid4().hex

class AgentOutput(BaseModel):
    agent_name: str
    task: str
    result: str
    timestamp: str


class SwarmRunMetadata(BaseModel):
    run_id: str = Field(
        default_factory=lambda: f"spreadsheet_swarm_run_{run_id}"
    )
    name: str
    description: str
    agents: List[str]
    start_time: str = Field(
        default_factory=lambda: str(datetime.now().timestamp()),  # Numeric timestamp
        description="The start time of the swarm run.",
    )
    end_time: str
    tasks_completed: int
    outputs: List[AgentOutput]
    number_of_agents: int = Field(
        ...,
        description="The number of agents participating in the swarm.",
    )


class SpreadSheetSwarm(BaseSwarm):
    """
    A swarm that processes tasks concurrently using multiple agents.

    Args:
        name (str, optional): The name of the swarm. Defaults to "Spreadsheet-Swarm".
        description (str, optional): The description of the swarm. Defaults to "A swarm that processes tasks concurrently using multiple agents.".
        agents (Union[Agent, List[Agent]], optional): The agents participating in the swarm. Defaults to an empty list.
        autosave_on (bool, optional): Whether to enable autosave of swarm metadata. Defaults to True.
        save_file_path (str, optional): The file path for saving swarm metadata as a CSV file. Defaults to an auto-generated, timestamped filename when not provided.
        max_loops (int, optional): The number of times to repeat the swarm tasks. Defaults to 1.
        workspace_dir (str, optional): The directory path of the workspace. Defaults to the value of the "WORKSPACE_DIR" environment variable.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.
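
    Example:
        A minimal usage sketch (the Agent arguments shown here are
        illustrative assumptions, not defined by this module)::

            agent = Agent(agent_name="qa-agent", system_prompt="You answer questions.")
            swarm = SpreadSheetSwarm(agents=[agent], max_loops=1)
            metadata_json = swarm.run("Summarize the quarterly report.")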
    """

    def __init__(
        self,
        name: str = "Spreadsheet-Swarm",
        description: str = "A swarm that processes tasks concurrently using multiple agents and saves the metadata to a CSV file.",
        agents: Union[Agent, List[Agent]] = None,
        autosave_on: bool = True,
        save_file_path: str = None,
        max_loops: int = 1,
        workspace_dir: str = os.getenv("WORKSPACE_DIR"),
        *args,
        **kwargs,
    ):
        # Normalize agents to a list (and avoid a mutable default argument)
        if agents is None:
            agents = []
        elif not isinstance(agents, list):
            agents = [agents]

        super().__init__(
            name=name,
            description=description,
            agents=agents,
            *args,
            **kwargs,
        )
        self.name = name
        self.description = description
        self.autosave_on = autosave_on
        self.max_loops = max_loops
        self.workspace_dir = workspace_dir

        # Honor a caller-supplied path; otherwise default to a timestamped
        # CSV name (colons and periods stripped so the name is valid on
        # all platforms)
        if save_file_path is not None:
            self.save_file_path = save_file_path
        else:
            timestamp = datetime.now().isoformat().replace(":", "_").replace(".", "_")
            self.save_file_path = (
                f"spreadsheet_swarm_{timestamp}_run_id_{run_id}.csv"
            )

        self.metadata = SwarmRunMetadata(
            run_id=f"spreadsheet_swarm_run_{run_id}",
            name=name,
            description=description,
            agents=[agent.agent_name for agent in agents],
            start_time=str(datetime.now().timestamp()),  # Numeric timestamp
            end_time="",
            tasks_completed=0,
            outputs=[],
            number_of_agents=len(agents),
        )

        self.reliability_check()

    def reliability_check(self):
        """
        Check the reliability of the swarm.

        Raises:
            ValueError: If no agents are provided or no save file path is provided.
        """
        logger.info("Checking the reliability of the swarm...")

        if not self.agents:
            raise ValueError("No agents were provided.")
        if not self.save_file_path:
            raise ValueError("No save file path was provided.")
        if not self.max_loops:
            raise ValueError("max_loops must be a positive integer.")

        logger.info("Swarm reliability check passed.")
        logger.info("Swarm is ready to run.")

    def run(self, task: str, *args, **kwargs):
        """
        Run the swarm with the specified task.

        Args:
            task (str): The task to be executed by the swarm.
            *args: Additional positional arguments.
            **kwargs: Additional keyword arguments.

        Returns:
            str: The JSON representation of the swarm metadata.

        """
        logger.info(f"Running the swarm with task: {task}")
        self.metadata.start_time = str(datetime.now().timestamp())  # Numeric timestamp

        # Check if we're already in an event loop
        if asyncio.get_event_loop().is_running():
            # If so, create and run tasks directly using `create_task` without `asyncio.run`
            task_future = asyncio.create_task(self._run_tasks(task, *args, **kwargs))
            asyncio.get_event_loop().run_until_complete(task_future)
        else:
            # If no event loop is running, run using `asyncio.run`
            asyncio.run(self._run_tasks(task, *args, **kwargs))

        self.metadata.end_time = str(datetime.now().timestamp())  # Numeric timestamp

        # Synchronously save metadata
        logger.info("Saving metadata to CSV and JSON...")
        asyncio.run(self._save_metadata())

        if self.autosave_on:
            self.data_to_json_file()

        print(log_agent_data(self.metadata.model_dump()))

        return self.metadata.model_dump_json(indent=4)

    async def _run_tasks(self, task: str, *args, **kwargs):
        """
        Run the swarm tasks concurrently.

        Args:
            task (str): The task to be executed by the swarm.
            *args: Additional positional arguments.
            **kwargs: Additional keyword arguments.
        """
        tasks = []
        for _ in range(self.max_loops):
            for agent in self.agents:
                # Use asyncio.to_thread to run the blocking task in a thread pool
                tasks.append(
                    asyncio.to_thread(
                        self._run_agent_task,
                        agent,
                        task,
                        *args,
                        **kwargs,
                    )
                )

        # Run all tasks concurrently
        results = await asyncio.gather(*tasks)

        # Process the results
        for result in results:
            self._track_output(*result)

    def _run_agent_task(self, agent, task, *args, **kwargs):
        """
        Run a single agent's task in a separate thread.

        Args:
            agent: The agent to run the task for.
            task (str): The task to be executed by the agent.
            *args: Additional positional arguments.
            **kwargs: Additional keyword arguments.

        Returns:
            Tuple[str, str, str]: A tuple containing the agent name, task, and result.
        """
        # agent.run() is a blocking call, which is why this method is
        # dispatched via asyncio.to_thread in _run_tasks
        result = agent.run(task, *args, **kwargs)
        return agent.agent_name, task, result

    def _track_output(self, agent_name: str, task: str, result: str):
        """
        Track the output of a completed task.

        Args:
            agent_name (str): The name of the agent that completed the task.
            task (str): The task that was completed.
            result (str): The result of the completed task.
        """
        self.metadata.tasks_completed += 1
        self.metadata.outputs.append(
            AgentOutput(
                agent_name=agent_name,
                task=task,
                result=result,
                timestamp=str(datetime.now().timestamp()),  # Numeric timestamp
            )
        )

    def export_to_json(self):
        """
        Export the swarm metadata to JSON.

        Returns:
            str: The JSON representation of the swarm metadata.
        """
        return self.metadata.model_dump_json(indent=4)

    def data_to_json_file(self):
        """
        Save the swarm metadata to a JSON file.
        """
        out = self.export_to_json()

        create_file_in_folder(
            folder_path=f"{self.workspace_dir}/Spreadsheet-Swarm-{self.name}/{self.name}",
            file_name=f"spreadsheet-swarm-{self.metadata.run_id}_metadata.json",
            content=out,
        )

    async def _save_metadata(self):
        """
        Save the swarm metadata to CSV and JSON.
        """
        if self.autosave_on:
            await self._save_to_csv()

    async def _save_to_csv(self):
        """
        Save the swarm metadata to a CSV file.
        """
        logger.info(f"Saving swarm metadata to: {self.save_file_path}")

        # Check whether the file exists before opening it, so the header
        # is written only once
        file_exists = os.path.exists(self.save_file_path)

        # Build rows with csv.writer so commas, quotes, and newlines in
        # task results are properly escaped
        buffer = io.StringIO()
        writer = csv.writer(buffer)

        if not file_exists:
            writer.writerow(
                ["Run ID", "Agent Name", "Task", "Result", "Timestamp"]
            )

        # Write each output as a new row, tagged with this run's ID
        for output in self.metadata.outputs:
            writer.writerow(
                [
                    self.metadata.run_id,
                    output.agent_name,
                    output.task,
                    output.result,
                    output.timestamp,
                ]
            )

        async with aiofiles.open(
            self.save_file_path, mode="a", newline=""
        ) as file:
            await file.write(buffer.getvalue())
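

# ---------------------------------------------------------------------------
# Minimal usage sketch. Illustrative only: the Agent keyword arguments below
# (agent_name, system_prompt, model_name) are assumptions about the Agent
# constructor, not guaranteed by this module.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    researcher = Agent(
        agent_name="Researcher",
        system_prompt="You research a topic and return the key findings.",
        model_name="gpt-4o-mini",  # assumed model identifier
    )
    writer = Agent(
        agent_name="Writer",
        system_prompt="You turn findings into a short summary.",
        model_name="gpt-4o-mini",  # assumed model identifier
    )

    swarm = SpreadSheetSwarm(
        name="Demo-Spreadsheet-Swarm",
        agents=[researcher, writer],
        max_loops=1,
        autosave_on=True,
    )

    # Every agent runs the same task concurrently; each result is appended
    # to the run's CSV file and included in the returned JSON metadata.
    print(swarm.run("List three risks of deploying LLM agents in production."))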