Spaces:
Sleeping
Sleeping
import asyncio
import concurrent.futures
import csv
import io
import os
import uuid
from datetime import datetime
from typing import List, Union

import aiofiles
from pydantic import BaseModel, Field

from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
from swarms.telemetry.capture_sys_data import log_agent_data
from swarms.utils.file_processing import create_file_in_folder
from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger(log_folder="spreadsheet_swarm")

# Identifier generated once, when this module is first imported: 32 lowercase
# hex characters from a random UUID. SwarmRunMetadata builds its default
# run_id from it.
run_id = uuid.uuid4().hex
class AgentOutput(BaseModel):
    """One agent's answer to one task, as recorded by the swarm.

    All four fields are required strings. ``timestamp`` is stored as text
    rather than as a datetime object.
    """

    agent_name: str = Field(...)
    task: str = Field(...)
    result: str = Field(...)
    timestamp: str = Field(...)
class SwarmRunMetadata(BaseModel):
    """Summary of a swarm run: identity, participants, timing and outputs.

    ``run_id`` defaults to ``"spreadsheet_swarm_run_<module run_id>"`` and
    ``start_time`` defaults to the current epoch time rendered as a string;
    every other field must be supplied by the caller.
    """

    run_id: str = Field(
        default_factory=lambda: f"spreadsheet_swarm_run_{run_id}"
    )
    name: str = Field(...)
    description: str = Field(...)
    agents: List[str] = Field(...)
    # Seconds since the epoch, kept as text (e.g. "1700000000.123").
    start_time: str = Field(
        default_factory=lambda: str(datetime.now().timestamp()),
        description="The start time of the swarm run.",
    )
    end_time: str = Field(...)
    tasks_completed: int = Field(...)
    outputs: List[AgentOutput] = Field(...)
    number_of_agents: int = Field(
        ...,
        description="The number of agents participating in the swarm.",
    )
class SpreadSheetSwarm(BaseSwarm):
    """
    A swarm that processes tasks concurrently using multiple agents.

    Every agent receives the same task, repeated ``max_loops`` times. The
    collected outputs are stored in ``self.metadata``. When autosave is on,
    they are also appended to a CSV file and dumped to a JSON metadata file.

    Args:
        name (str, optional): The name of the swarm. Defaults to "Spreadsheet-Swarm".
        description (str, optional): The description of the swarm.
        agents (Union[Agent, List[Agent]], optional): A single agent or a list of
            agents participating in the swarm. Defaults to no agents, which makes
            the reliability check fail.
        autosave_on (bool, optional): Whether to save the swarm metadata to CSV and
            JSON after each run. Defaults to True.
        save_file_path (str, optional): Path of the CSV file that outputs are
            appended to. Defaults to a generated name of the form
            "spreadsheet_swarm_<timestamp>_run_id_<run id>.csv", relative to the
            current working directory.
        max_loops (int, optional): The number of times to repeat the swarm tasks. Defaults to 1.
        workspace_dir (str, optional): Directory the JSON metadata is written under.
            Defaults to the "WORKSPACE_DIR" environment variable, read when the
            swarm is created.
        *args: Additional positional arguments forwarded to BaseSwarm.
        **kwargs: Additional keyword arguments forwarded to BaseSwarm.

    Raises:
        ValueError: From ``reliability_check`` if no agents, no save path or a
            falsy ``max_loops`` is configured.
    """

    def __init__(
        self,
        name: str = "Spreadsheet-Swarm",
        description: str = "A swarm that processes tasks concurrently using multiple agents and saves the metadata to a CSV file.",
        agents: Union[Agent, List[Agent], None] = None,
        autosave_on: bool = True,
        save_file_path: str = None,
        max_loops: int = 1,
        workspace_dir: str = None,
        *args,
        **kwargs,
    ):
        # Normalize once: callers may pass a single Agent, a list, or nothing.
        # Everything below (metadata, len(), iteration) relies on a list.
        if agents is None:
            agents = []
        elif not isinstance(agents, list):
            agents = [agents]

        # Read the environment when the swarm is built, not when the module is imported.
        if workspace_dir is None:
            workspace_dir = os.getenv("WORKSPACE_DIR")

        super().__init__(
            *args,
            name=name,
            description=description,
            agents=agents,
            **kwargs,
        )
        self.name = name
        self.description = description
        self.autosave_on = autosave_on
        self.max_loops = max_loops
        self.workspace_dir = workspace_dir

        # Each swarm instance gets its own id. A process-wide id would make
        # two swarms with the same name share a metadata JSON file name.
        swarm_run_id = uuid.uuid4().hex

        # Respect an explicit path; otherwise build a unique one from a
        # timestamp without colons or periods.
        if not save_file_path:
            timestamp = (
                datetime.now().isoformat().replace(":", "_").replace(".", "_")
            )
            save_file_path = (
                f"spreadsheet_swarm_{timestamp}_run_id_{swarm_run_id}.csv"
            )
        self.save_file_path = save_file_path

        self.metadata = SwarmRunMetadata(
            run_id=f"spreadsheet_swarm_run_{swarm_run_id}",
            name=name,
            description=description,
            # NOTE(review): this reads `agent.name`, while _run_agent_task reads
            # `agent.agent_name`; confirm both attributes are populated on Agent.
            agents=[agent.name for agent in agents],
            start_time=str(datetime.now().timestamp()),  # Numeric timestamp
            end_time="",
            tasks_completed=0,
            outputs=[],
            number_of_agents=len(agents),
        )

        self.reliability_check()

    def reliability_check(self):
        """
        Check the reliability of the swarm.

        Raises:
            ValueError: If no agents, no save file path, or a falsy max_loops is configured.
        """
        logger.info("Checking the reliability of the swarm...")

        if not self.agents:
            raise ValueError("No agents are provided.")
        if not self.save_file_path:
            raise ValueError("No save file path is provided.")
        if not self.max_loops:
            raise ValueError("No max loops are provided.")

        logger.info("Swarm reliability check passed.")
        logger.info("Swarm is ready to run.")

    # @profile_func
    def run(self, task: str, *args, **kwargs):
        """
        Run the swarm with the specified task.

        This method blocks until every agent has finished. It works both from
        plain synchronous code and when the calling thread already has a
        running event loop (e.g. a notebook or an async caller).

        Args:
            task (str): The task to be executed by the swarm.
            *args: Additional positional arguments passed to each agent's run().
            **kwargs: Additional keyword arguments passed to each agent's run().

        Returns:
            str: The JSON representation of the swarm metadata.
        """
        logger.info(f"Running the swarm with task: {task}")
        self.metadata.start_time = str(datetime.now().timestamp())  # Numeric timestamp

        self._run_coroutine_sync(self._run_tasks(task, *args, **kwargs))

        self.metadata.end_time = str(datetime.now().timestamp())  # Numeric timestamp

        logger.info("Saving metadata to CSV and JSON...")
        self._run_coroutine_sync(self._save_metadata())

        if self.autosave_on:
            self.data_to_json_file()

        print(log_agent_data(self.metadata.model_dump()))

        return self.metadata.model_dump_json(indent=4)

    @staticmethod
    def _run_coroutine_sync(coro):
        """
        Drive a coroutine to completion from synchronous code and return its result.

        asyncio.run() refuses to start when the calling thread already has a
        running event loop, and a running loop cannot be re-entered with
        run_until_complete(). In that case the coroutine runs on a fresh event
        loop in a worker thread, and this call blocks until it finishes.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: the normal path.
            return asyncio.run(coro)

        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, coro).result()

    async def _run_tasks(self, task: str, *args, **kwargs):
        """
        Run the swarm tasks concurrently.

        Schedules one call per agent per loop, waits for all of them, then
        records each result via _track_output in submission order.

        Args:
            task (str): The task to be executed by the swarm.
            *args: Additional positional arguments.
            **kwargs: Additional keyword arguments.
        """
        # agent.run() is blocking, so each call is pushed to a worker thread.
        pending = [
            asyncio.to_thread(
                self._run_agent_task,
                agent,
                task,
                *args,
                **kwargs,
            )
            for _ in range(self.max_loops)
            for agent in self.agents
        ]

        results = await asyncio.gather(*pending)

        for agent_name, finished_task, result in results:
            self._track_output(agent_name, finished_task, result)

    def _run_agent_task(self, agent, task, *args, **kwargs):
        """
        Run a single agent's task; called from a worker thread.

        Args:
            agent: The agent to run the task for.
            task (str): The task to be executed by the agent.
            *args: Additional positional arguments.
            **kwargs: Additional keyword arguments.

        Returns:
            Tuple[str, str, str]: A tuple containing the agent name, task, and result.
        """
        result = agent.run(task, *args, **kwargs)
        return agent.agent_name, task, result

    def _track_output(self, agent_name: str, task: str, result: str):
        """
        Record the output of a completed task in the swarm metadata.

        Args:
            agent_name (str): The name of the agent that completed the task.
            task (str): The task that was completed.
            result (str): The result of the completed task.
        """
        self.metadata.tasks_completed += 1
        self.metadata.outputs.append(
            AgentOutput(
                agent_name=agent_name,
                task=task,
                result=result,
                timestamp=str(datetime.now().timestamp()),  # Numeric timestamp
            )
        )

    def export_to_json(self):
        """
        Export the swarm metadata to JSON.

        Returns:
            str: The JSON representation of the swarm metadata.
        """
        return self.metadata.model_dump_json(indent=4)

    def data_to_json_file(self):
        """
        Save the swarm metadata to a JSON file under the workspace directory.
        """
        out = self.export_to_json()

        # NOTE(review): if workspace_dir is None, the f-string below yields a
        # relative folder literally named "None"; confirm that is acceptable.
        create_file_in_folder(
            folder_path=f"{self.workspace_dir}/Spreedsheet-Swarm-{self.name}/{self.name}",
            file_name=f"spreedsheet-swarm-{self.metadata.run_id}_metadata.json",
            content=out,
        )

    async def _save_metadata(self):
        """
        Append the tracked outputs to the CSV file when autosave is enabled.

        The JSON metadata file is written separately by run(), via data_to_json_file().
        """
        if self.autosave_on:
            await self._save_to_csv()

    async def _save_to_csv(self):
        """
        Append every tracked output to ``self.save_file_path`` as CSV rows.

        A header row is written first when the file does not exist yet. All
        rows from one call share a freshly generated UUID in the "Run ID"
        column. Fields go through the csv module, so commas, quotes and
        newlines inside tasks or results stay inside their own cell.
        """
        logger.info(f"Saving swarm metadata to: {self.save_file_path}")
        batch_id = uuid.uuid4()

        # Build properly quoted CSV text in memory, then write it in one call.
        buffer = io.StringIO()
        writer = csv.writer(buffer, lineterminator="\n")

        # Check if file exists before opening it (opening in "a" mode creates it).
        if not os.path.exists(self.save_file_path):
            writer.writerow(
                ["Run ID", "Agent Name", "Task", "Result", "Timestamp"]
            )

        writer.writerows(
            [
                batch_id,
                output.agent_name,
                output.task,
                output.result,
                output.timestamp,
            ]
            for output in self.metadata.outputs
        )

        # newline="" keeps the csv module's line endings untranslated.
        # utf-8 avoids encode errors on platforms with a narrow default encoding.
        async with aiofiles.open(
            self.save_file_path, mode="a", newline="", encoding="utf-8"
        ) as file:
            await file.write(buffer.getvalue())