Spaces:
Sleeping
Sleeping
| import os | |
| import asyncio | |
| import json | |
| import uuid | |
| from swarms.utils.file_processing import create_file_in_folder | |
| from abc import ABC | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from typing import ( | |
| Any, | |
| Callable, | |
| Dict, | |
| List, | |
| Optional, | |
| Sequence, | |
| ) | |
| import yaml | |
| from swarms.structs.agent import Agent | |
| from swarms.structs.conversation import Conversation | |
| from swarms.structs.omni_agent_types import AgentType | |
| from pydantic import BaseModel | |
| from swarms.utils.pandas_utils import ( | |
| dict_to_dataframe, | |
| display_agents_info, | |
| pydantic_model_to_dataframe, | |
| ) | |
| from swarms.utils.loguru_logger import initialize_logger | |
| logger = initialize_logger(log_folder="base_swarm") | |
| class BaseSwarm(ABC): | |
| """ | |
| Base Swarm Class for all multi-agent systems | |
| Attributes: | |
| agents (List[Agent]): A list of agents | |
| max_loops (int): The maximum number of loops to run | |
| Methods: | |
| communicate: Communicate with the swarm through the orchestrator, protocols, and the universal communication layer | |
| run: Run the swarm | |
| step: Step the swarm | |
| add_agent: Add a agent to the swarm | |
| remove_agent: Remove a agent from the swarm | |
| broadcast: Broadcast a message to all agents | |
| reset: Reset the swarm | |
| plan: agents must individually plan using a workflow or pipeline | |
| direct_message: Send a direct message to a agent | |
| autoscaler: Autoscaler that acts like kubernetes for autonomous agents | |
| get_agent_by_id: Locate a agent by id | |
| get_agent_by_name: Locate a agent by name | |
| assign_task: Assign a task to a agent | |
| get_all_tasks: Get all tasks | |
| get_finished_tasks: Get all finished tasks | |
| get_pending_tasks: Get all penPding tasks | |
| pause_agent: Pause a agent | |
| resume_agent: Resume a agent | |
| stop_agent: Stop a agent | |
| restart_agent: Restart agent | |
| scale_up: Scale up the number of agents | |
| scale_down: Scale down the number of agents | |
| scale_to: Scale to a specific number of agents | |
| get_all_agents: Get all agents | |
| get_swarm_size: Get the size of the swarm | |
| get_swarm_status: Get the status of the swarm | |
| save_swarm_state: Save the swarm state | |
| loop: Loop through the swarm | |
| run_async: Run the swarm asynchronously | |
| run_batch_async: Run the swarm asynchronously | |
| run_batch: Run the swarm asynchronously | |
| batched_run: Run the swarm asynchronously | |
| abatch_run: Asynchronous batch run with language model | |
| arun: Asynchronous run | |
| """ | |
| def __init__( | |
| self, | |
| name: Optional[str] = None, | |
| description: Optional[str] = None, | |
| agents: Optional[List[Agent]] = None, | |
| models: Optional[List[Any]] = None, | |
| max_loops: Optional[int] = 200, | |
| callbacks: Optional[Sequence[callable]] = None, | |
| autosave: Optional[bool] = False, | |
| logging: Optional[bool] = False, | |
| return_metadata: Optional[bool] = False, | |
| metadata_filename: Optional[ | |
| str | |
| ] = "multiagent_structure_metadata.json", | |
| stopping_function: Optional[Callable] = None, | |
| stopping_condition: Optional[str] = "stop", | |
| stopping_condition_args: Optional[Dict] = None, | |
| agentops_on: Optional[bool] = False, | |
| speaker_selection_func: Optional[Callable] = None, | |
| rules: Optional[str] = None, | |
| collective_memory_system: Optional[Any] = False, | |
| agent_ops_on: bool = False, | |
| output_schema: Optional[BaseModel] = None, | |
| *args, | |
| **kwargs, | |
| ): | |
| """Initialize the swarm with agents""" | |
| self.name = name | |
| self.description = description | |
| self.agents = agents | |
| self.models = models | |
| self.max_loops = max_loops | |
| self.callbacks = callbacks | |
| self.autosave = autosave | |
| self.logging = logging | |
| self.return_metadata = return_metadata | |
| self.metadata_filename = metadata_filename | |
| self.stopping_function = stopping_function | |
| self.stopping_condition = stopping_condition | |
| self.stopping_condition_args = stopping_condition_args | |
| self.agentops_on = agentops_on | |
| self.speaker_selection_func = speaker_selection_func | |
| self.rules = rules | |
| self.collective_memory_system = collective_memory_system | |
| self.agent_ops_on = agent_ops_on | |
| self.output_schema = output_schema | |
| logger.info("Reliability checks activated.") | |
| # Ensure that agents is exists | |
| if self.agents is None: | |
| logger.info("Agents must be provided.") | |
| raise ValueError("Agents must be provided.") | |
| # Ensure that agents is a list | |
| if not isinstance(self.agents, list): | |
| logger.error("Agents must be a list.") | |
| raise TypeError("Agents must be a list.") | |
| # Ensure that agents is not empty | |
| if len(self.agents) == 0: | |
| logger.error("Agents list must not be empty.") | |
| raise ValueError("Agents list must not be empty.") | |
| # Initialize conversation | |
| self.conversation = Conversation( | |
| time_enabled=True, rules=self.rules, *args, **kwargs | |
| ) | |
| # Handle callbacks | |
| if callbacks is not None: | |
| for callback in self.callbacks: | |
| if not callable(callback): | |
| raise TypeError("Callback must be callable.") | |
| # Handle autosave | |
| if autosave: | |
| self.save_to_json(metadata_filename) | |
| # Handle stopping function | |
| if stopping_function is not None: | |
| if not callable(stopping_function): | |
| raise TypeError("Stopping function must be callable.") | |
| if stopping_condition_args is None: | |
| stopping_condition_args = {} | |
| self.stopping_condition_args = stopping_condition_args | |
| self.stopping_condition = stopping_condition | |
| self.stopping_function = stopping_function | |
| # Handle stopping condition | |
| if stopping_condition is not None: | |
| if stopping_condition_args is None: | |
| stopping_condition_args = {} | |
| self.stopping_condition_args = stopping_condition_args | |
| self.stopping_condition = stopping_condition | |
| # If agentops is enabled, try to import agentops | |
| if agentops_on is True: | |
| for agent in self.agents: | |
| agent.agent_ops_on = True | |
| # Handle speaker selection function | |
| if speaker_selection_func is not None: | |
| if not callable(speaker_selection_func): | |
| raise TypeError( | |
| "Speaker selection function must be callable." | |
| ) | |
| self.speaker_selection_func = speaker_selection_func | |
| # Add the check for all the agents to see if agent ops is on! | |
| if agent_ops_on is True: | |
| for agent in self.agents: | |
| agent.agent_ops_on = True | |
| # Agents dictionary with agent name as key and agent object as value | |
| self.agents_dict = { | |
| agent.agent_name: agent for agent in self.agents | |
| } | |
| def communicate(self): | |
| """Communicate with the swarm through the orchestrator, protocols, and the universal communication layer""" | |
| ... | |
| def run(self): | |
| """Run the swarm""" | |
| ... | |
| def __call__( | |
| self, | |
| task, | |
| *args, | |
| **kwargs, | |
| ): | |
| """Call self as a function | |
| Args: | |
| task (_type_): _description_ | |
| Returns: | |
| _type_: _description_ | |
| """ | |
| try: | |
| return self.run(task, *args, **kwargs) | |
| except Exception as error: | |
| logger.error(f"Error running {self.__class__.__name__}") | |
| raise error | |
| def step(self): | |
| """Step the swarm""" | |
| def add_agent(self, agent: AgentType): | |
| """Add a agent to the swarm""" | |
| self.agents.append(agent) | |
| def add_agents(self, agents: List[AgentType]): | |
| """Add a list of agents to the swarm""" | |
| self.agents.extend(agents) | |
| def add_agent_by_id(self, agent_id: str): | |
| """Add a agent to the swarm by id""" | |
| agent = self.get_agent_by_id(agent_id) | |
| self.add_agent(agent) | |
| def remove_agent(self, agent: AgentType): | |
| """Remove a agent from the swarm""" | |
| self.agents.remove(agent) | |
| def get_agent_by_name(self, name: str): | |
| """Get a agent by name""" | |
| for agent in self.agents: | |
| if agent.name == name: | |
| return agent | |
| def reset_all_agents(self): | |
| """Resets the state of all agents.""" | |
| for agent in self.agents: | |
| agent.reset() | |
| def broadcast( | |
| self, message: str, sender: Optional[AgentType] = None | |
| ): | |
| """Broadcast a message to all agents""" | |
| def reset(self): | |
| """Reset the swarm""" | |
| def plan(self, task: str): | |
| """agents must individually plan using a workflow or pipeline""" | |
| def self_find_agent_by_name(self, name: str): | |
| """ | |
| Find an agent by its name. | |
| Args: | |
| name (str): The name of the agent to find. | |
| Returns: | |
| Agent: The Agent object if found, None otherwise. | |
| """ | |
| for agent in self.agents: | |
| if agent.agent_name == name: | |
| return agent | |
| return None | |
| def self_find_agent_by_id(self, id: uuid.UUID): | |
| """ | |
| Find an agent by its id. | |
| Args: | |
| id (str): The id of the agent to find. | |
| Returns: | |
| Agent: The Agent object if found, None otherwise. | |
| """ | |
| for agent in self.agents: | |
| if agent.id == id: | |
| return agent | |
| return None | |
| def agent_exists(self, name: str): | |
| """ | |
| Check if an agent exists in the swarm. | |
| Args: | |
| name (str): The name of the agent to check. | |
| Returns: | |
| bool: True if the agent exists, False otherwise. | |
| """ | |
| return self.self_find_agent_by_name(name) is not None | |
| def direct_message( | |
| self, | |
| message: str, | |
| sender: AgentType, | |
| recipient: AgentType, | |
| ): | |
| """Send a direct message to a agent""" | |
| def autoscaler(self, num_agents: int, agent: List[AgentType]): | |
| """Autoscaler that acts like kubernetes for autonomous agents""" | |
| def get_agent_by_id(self, id: str) -> AgentType: | |
| """Locate a agent by id""" | |
| def assign_task(self, agent: AgentType, task: Any) -> Dict: | |
| """Assign a task to a agent""" | |
| def get_all_tasks(self, agent: AgentType, task: Any): | |
| """Get all tasks""" | |
| def get_finished_tasks(self) -> List[Dict]: | |
| """Get all finished tasks""" | |
| def get_pending_tasks(self) -> List[Dict]: | |
| """Get all pending tasks""" | |
| def pause_agent(self, agent: AgentType, agent_id: str): | |
| """Pause a agent""" | |
| def resume_agent(self, agent: AgentType, agent_id: str): | |
| """Resume a agent""" | |
| def stop_agent(self, agent: AgentType, agent_id: str): | |
| """Stop a agent""" | |
| def restart_agent(self, agent: AgentType): | |
| """Restart agent""" | |
| def scale_up(self, num_agent: int): | |
| """Scale up the number of agents""" | |
| def scale_down(self, num_agent: int): | |
| """Scale down the number of agents""" | |
| def scale_to(self, num_agent: int): | |
| """Scale to a specific number of agents""" | |
| def get_all_agents(self) -> List[AgentType]: | |
| """Get all agents""" | |
| def get_swarm_size(self) -> int: | |
| """Get the size of the swarm""" | |
| # #@abstractmethod | |
| def get_swarm_status(self) -> Dict: | |
| """Get the status of the swarm""" | |
| # #@abstractmethod | |
| def save_swarm_state(self): | |
| """Save the swarm state""" | |
| def batched_run(self, tasks: List[Any], *args, **kwargs): | |
| """_summary_ | |
| Args: | |
| tasks (List[Any]): _description_ | |
| """ | |
| # Implement batched run | |
| return [self.run(task, *args, **kwargs) for task in tasks] | |
| async def abatch_run(self, tasks: List[str], *args, **kwargs): | |
| """Asynchronous batch run with language model | |
| Args: | |
| tasks (List[str]): _description_ | |
| Returns: | |
| _type_: _description_ | |
| """ | |
| return await asyncio.gather( | |
| *(self.arun(task, *args, **kwargs) for task in tasks) | |
| ) | |
| async def arun(self, task: Optional[str] = None, *args, **kwargs): | |
| """Asynchronous run | |
| Args: | |
| task (Optional[str], optional): _description_. Defaults to None. | |
| """ | |
| loop = asyncio.get_event_loop() | |
| result = await loop.run_in_executor( | |
| None, self.run, task, *args, **kwargs | |
| ) | |
| return result | |
| def loop( | |
| self, | |
| task: Optional[str] = None, | |
| *args, | |
| **kwargs, | |
| ): | |
| """Loop through the swarm | |
| Args: | |
| task (Optional[str], optional): _description_. Defaults to None. | |
| """ | |
| # Loop through the self.max_loops | |
| for i in range(self.max_loops): | |
| self.run(task, *args, **kwargs) | |
| async def aloop( | |
| self, | |
| task: Optional[str] = None, | |
| *args, | |
| **kwargs, | |
| ): | |
| """Asynchronous loop through the swarm | |
| Args: | |
| task (Optional[str], optional): _description_. Defaults to None. | |
| """ | |
| # Async Loop through the self.max_loops | |
| loop = asyncio.get_event_loop() | |
| result = await loop.run_in_executor( | |
| None, self.loop, task, *args, **kwargs | |
| ) | |
| return result | |
| def run_async(self, task: Optional[str] = None, *args, **kwargs): | |
| """Run the swarm asynchronously | |
| Args: | |
| task (Optional[str], optional): _description_. Defaults to None. | |
| """ | |
| loop = asyncio.get_event_loop() | |
| result = loop.run_until_complete( | |
| self.arun(task, *args, **kwargs) | |
| ) | |
| return result | |
| def run_batch_async(self, tasks: List[str], *args, **kwargs): | |
| """Run the swarm asynchronously | |
| Args: | |
| task (Optional[str], optional): _description_. Defaults to None. | |
| """ | |
| loop = asyncio.get_event_loop() | |
| result = loop.run_until_complete( | |
| self.abatch_run(tasks, *args, **kwargs) | |
| ) | |
| return result | |
| def run_batch(self, tasks: List[str], *args, **kwargs): | |
| """Run the swarm asynchronously | |
| Args: | |
| task (Optional[str], optional): _description_. Defaults to None. | |
| """ | |
| return self.batched_run(tasks, *args, **kwargs) | |
| def select_agent_by_name(self, agent_name: str): | |
| """ | |
| Select an agent through their name | |
| """ | |
| # Find agent with id | |
| for agent in self.agents: | |
| if agent.name == agent_name: | |
| return agent | |
| def task_assignment_by_id( | |
| self, task: str, agent_id: str, *args, **kwargs | |
| ): | |
| """ | |
| Assign a task to an agent | |
| """ | |
| # Assign task to agent by their agent id | |
| agent = self.select_agent(agent_id) | |
| return agent.run(task, *args, **kwargs) | |
| def task_assignment_by_name( | |
| self, task: str, agent_name: str, *args, **kwargs | |
| ): | |
| """ | |
| Assign a task to an agent | |
| """ | |
| # Assign task to agent by their agent id | |
| agent = self.select_agent_by_name(agent_name) | |
| return agent.run(task, *args, **kwargs) | |
| def concurrent_run(self, task: str) -> List[str]: | |
| """Synchronously run the task on all llms and collect responses""" | |
| with ThreadPoolExecutor() as executor: | |
| future_to_llm = { | |
| executor.submit(agent, task): agent | |
| for agent in self.agents | |
| } | |
| responses = [] | |
| for future in as_completed(future_to_llm): | |
| try: | |
| responses.append(future.result()) | |
| except Exception as error: | |
| print( | |
| f"{future_to_llm[future]} generated an" | |
| f" exception: {error}" | |
| ) | |
| self.last_responses = responses | |
| self.task_history.append(task) | |
| return responses | |
| def add_llm(self, agent: Callable): | |
| """Add an llm to the god mode""" | |
| self.agents.append(agent) | |
| def remove_llm(self, agent: Callable): | |
| """Remove an llm from the god mode""" | |
| self.agents.remove(agent) | |
| def run_all(self, task: str = None, *args, **kwargs): | |
| """Run all agents | |
| Args: | |
| task (str, optional): _description_. Defaults to None. | |
| Returns: | |
| _type_: _description_ | |
| """ | |
| responses = [] | |
| for agent in self.agents: | |
| responses.append(agent(task, *args, **kwargs)) | |
| return responses | |
| def run_on_all_agents(self, task: str = None, *args, **kwargs): | |
| """Run on all agents | |
| Args: | |
| task (str, optional): _description_. Defaults to None. | |
| Returns: | |
| _type_: _description_ | |
| """ | |
| with ThreadPoolExecutor() as executor: | |
| responses = executor.map( | |
| lambda agent: agent(task, *args, **kwargs), | |
| self.agents, | |
| ) | |
| return list(responses) | |
| def add_swarm_entry(self, swarm): | |
| """ | |
| Add the information of a joined Swarm to the registry. | |
| Args: | |
| swarm (SwarmManagerBase): Instance of SwarmManagerBase representing the joined Swarm. | |
| Returns: | |
| None | |
| """ | |
| def add_agent_entry(self, agent: Agent): | |
| """ | |
| Add the information of an Agent to the registry. | |
| Args: | |
| agent (Agent): Instance of Agent representing the Agent. | |
| Returns: | |
| None | |
| """ | |
| def retrieve_swarm_information(self, swarm_id: str): | |
| """ | |
| Retrieve the information of a specific Swarm from the registry. | |
| Args: | |
| swarm_id (str): Unique identifier of the Swarm. | |
| Returns: | |
| SwarmManagerBase: Instance of SwarmManagerBase representing the retrieved Swarm, or None if not found. | |
| """ | |
| def retrieve_joined_agents(self, agent_id: str) -> List[Agent]: | |
| """ | |
| Retrieve the information the Agents which have joined the registry. | |
| Returns: | |
| Agent: Instance of Agent representing the retrieved Agent, or None if not found. | |
| """ | |
| def join_swarm( | |
| self, from_entity: Agent | Agent, to_entity: Agent | |
| ): | |
| """ | |
| Add a relationship between a Swarm and an Agent or other Swarm to the registry. | |
| Args: | |
| from (Agent | SwarmManagerBase): Instance of Agent or SwarmManagerBase representing the source of the relationship. | |
| """ | |
| def metadata(self): | |
| """ | |
| Get the metadata of the multi-agent structure. | |
| Returns: | |
| dict: The metadata of the multi-agent structure. | |
| """ | |
| return { | |
| "agents": self.agents, | |
| "callbacks": self.callbacks, | |
| "autosave": self.autosave, | |
| "logging": self.logging, | |
| "conversation": self.conversation, | |
| } | |
| def save_to_json(self, filename: str): | |
| """ | |
| Save the current state of the multi-agent structure to a JSON file. | |
| Args: | |
| filename (str): The name of the file to save the multi-agent structure to. | |
| Returns: | |
| None | |
| """ | |
| try: | |
| with open(filename, "w") as f: | |
| json.dump(self.__dict__, f) | |
| except Exception as e: | |
| logger.error(e) | |
| def load_from_json(self, filename: str): | |
| """ | |
| Load the state of the multi-agent structure from a JSON file. | |
| Args: | |
| filename (str): The name of the file to load the multi-agent structure from. | |
| Returns: | |
| None | |
| """ | |
| try: | |
| with open(filename) as f: | |
| self.__dict__ = json.load(f) | |
| except Exception as e: | |
| logger.error(e) | |
| def save_to_yaml(self, filename: str): | |
| """ | |
| Save the current state of the multi-agent structure to a YAML file. | |
| Args: | |
| filename (str): The name of the file to save the multi-agent structure to. | |
| Returns: | |
| None | |
| """ | |
| try: | |
| with open(filename, "w") as f: | |
| yaml.dump(self.__dict__, f) | |
| except Exception as e: | |
| logger.error(e) | |
| def load_from_yaml(self, filename: str): | |
| """ | |
| Load the state of the multi-agent structure from a YAML file. | |
| Args: | |
| filename (str): The name of the file to load the multi-agent structure from. | |
| Returns: | |
| None | |
| """ | |
| try: | |
| with open(filename) as f: | |
| self.__dict__ = yaml.load(f) | |
| except Exception as e: | |
| logger.error(e) | |
| def __repr__(self): | |
| return f"{self.__class__.__name__}({self.__dict__})" | |
| def __str__(self): | |
| return f"{self.__class__.__name__}({self.__dict__})" | |
| def __len__(self): | |
| return len(self.agents) | |
| def __getitem__(self, index): | |
| return self.agents[index] | |
| def __setitem__(self, index, value): | |
| self.agents[index] = value | |
| def __delitem__(self, index): | |
| del self.agents[index] | |
| def __iter__(self): | |
| return iter(self.agents) | |
| def __reversed__(self): | |
| return reversed(self.agents) | |
| def __contains__(self, value): | |
| return value in self.agents | |
| def agent_error_handling_check(self): | |
| try: | |
| if self.agents is None: | |
| message = "You have not passed in any agents, you need to input agents to run a swarm" | |
| logger.info(message) | |
| raise ValueError(message) | |
| except Exception as error: | |
| logger.info(error) | |
| raise error | |
| def swarm_initialization(self, *args, **kwargs): | |
| """ | |
| Initializes the hierarchical swarm. | |
| Args: | |
| *args: Additional positional arguments. | |
| **kwargs: Additional keyword arguments. | |
| Returns: | |
| None | |
| """ | |
| logger.info( | |
| f"Initializing the hierarchical swarm: {self.name}" | |
| ) | |
| logger.info(f"Purpose of this swarm: {self.description}") | |
| # Now log number of agnets and their names | |
| logger.info(f"Number of agents: {len(self.agents)}") | |
| logger.info( | |
| f"Agent names: {[agent.name for agent in self.agents]}" | |
| ) | |
| # Now see if agents is not empty | |
| if len(self.agents) == 0: | |
| logger.info( | |
| "No agents found. Please add agents to the swarm." | |
| ) | |
| return None | |
| # Now see if director is not empty | |
| if self.director is None: | |
| logger.info( | |
| "No director found. Please add a director to the swarm." | |
| ) | |
| return None | |
| logger.info( | |
| f"Initialization complete for the hierarchical swarm: {self.name}" | |
| ) | |
| def export_output_schema(self): | |
| """ | |
| Export the output schema of the swarm. | |
| Returns: | |
| dict: The output schema of the swarm. | |
| """ | |
| return self.output_schema.model_dump_json(indent=4) | |
| def export_output_schema_dict(self): | |
| return self.output_schema.model_dump() | |
| def export_and_autosave(self): | |
| content = self.export_output_schema() | |
| create_file_in_folder( | |
| os.getenv("WORKSPACE_DIR"), | |
| self.metadata_filename, | |
| content=content, | |
| ) | |
| return logger.info( | |
| f"Metadata saved to {self.metadata_filename}" | |
| ) | |
| def list_agents(self): | |
| """ | |
| List all agents in the swarm. | |
| Returns: | |
| None | |
| """ | |
| display_agents_info(self.agents) | |
| def agents_to_dataframe(self): | |
| """ | |
| Convert agents to a pandas DataFrame. | |
| """ | |
| data = [agent.agent_output.dict() for agent in self.agents] | |
| return dict_to_dataframe(data) | |
| def model_to_dataframe(self): | |
| """ | |
| Convert the Pydantic model to a pandas DataFrame. | |
| """ | |
| return pydantic_model_to_dataframe(self.output_schema) | |