Spaces:
Sleeping
Sleeping
import asyncio
import json
import os
import uuid
from abc import ABC
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial
from typing import (
    Any,
    Callable,
    Dict,
    List,
    Optional,
    Sequence,
)

import yaml
from pydantic import BaseModel

from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.structs.omni_agent_types import AgentType
from swarms.utils.file_processing import create_file_in_folder
from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.pandas_utils import (
    dict_to_dataframe,
    display_agents_info,
    pydantic_model_to_dataframe,
)
# Module-level logger shared by every BaseSwarm method; the "base_swarm"
# folder name is passed straight through to initialize_logger.
logger = initialize_logger(log_folder="base_swarm")
class BaseSwarm(ABC):
    """
    Base class for multi-agent systems ("swarms").

    The class stores a list of agents plus shared configuration, validates
    them at construction time, and offers helpers for looking agents up,
    fanning a task out to them (sequentially, in threads, or through
    asyncio) and persisting the instance state to JSON / YAML.

    Many orchestration hooks (``communicate``, ``step``, ``broadcast``,
    ``plan``, ``scale_up`` ...) have no implementation here: they do
    nothing and return ``None``, presumably so that concrete swarms can
    override them.

    Attributes:
        agents (List[Agent]): The agents managed by the swarm.
        agents_dict (Dict[str, Agent]): Name -> agent lookup, rebuilt from
            ``agents`` whenever the swarm's own mutators change the list.
        max_loops (int): Number of iterations performed by ``loop``.
        conversation (Conversation): Conversation created in ``__init__``.
        task_history (list): Tasks passed to ``concurrent_run``.
        last_responses (list): Responses of the last ``concurrent_run``.
    """

    def __init__(
        self,
        name: Optional[str] = None,
        description: Optional[str] = None,
        agents: Optional[List[Agent]] = None,
        models: Optional[List[Any]] = None,
        max_loops: Optional[int] = 200,
        callbacks: Optional[Sequence[Callable]] = None,
        autosave: Optional[bool] = False,
        logging: Optional[bool] = False,
        return_metadata: Optional[bool] = False,
        metadata_filename: Optional[
            str
        ] = "multiagent_structure_metadata.json",
        stopping_function: Optional[Callable] = None,
        stopping_condition: Optional[str] = "stop",
        stopping_condition_args: Optional[Dict] = None,
        agentops_on: Optional[bool] = False,
        speaker_selection_func: Optional[Callable] = None,
        rules: Optional[str] = None,
        collective_memory_system: Optional[Any] = False,
        agent_ops_on: bool = False,
        output_schema: Optional[BaseModel] = None,
        *args,
        **kwargs,
    ):
        """Initialize the swarm and validate its configuration.

        Extra ``*args`` / ``**kwargs`` are forwarded to ``Conversation``.

        Raises:
            ValueError: If ``agents`` is None or an empty list.
            TypeError: If ``agents`` is not a list, or if a callback, the
                stopping function or the speaker selection function is
                not callable.
        """
        self.name = name
        self.description = description
        self.agents = agents
        self.models = models
        self.max_loops = max_loops
        self.callbacks = callbacks
        self.autosave = autosave
        self.logging = logging
        self.return_metadata = return_metadata
        self.metadata_filename = metadata_filename
        self.stopping_function = stopping_function
        self.stopping_condition = stopping_condition
        self.stopping_condition_args = stopping_condition_args
        self.agentops_on = agentops_on
        self.speaker_selection_func = speaker_selection_func
        self.rules = rules
        self.collective_memory_system = collective_memory_system
        self.agent_ops_on = agent_ops_on
        self.output_schema = output_schema
        # Bookkeeping written by concurrent_run(); created up front so
        # that method never hits a missing attribute.
        self.task_history = []
        self.last_responses = []

        logger.info("Reliability checks activated.")
        if self.agents is None:
            logger.error("Agents must be provided.")
            raise ValueError("Agents must be provided.")
        if not isinstance(self.agents, list):
            logger.error("Agents must be a list.")
            raise TypeError("Agents must be a list.")
        if not self.agents:
            logger.error("Agents list must not be empty.")
            raise ValueError("Agents list must not be empty.")

        self.conversation = Conversation(
            *args, time_enabled=True, rules=self.rules, **kwargs
        )

        if callbacks is not None:
            for callback in callbacks:
                if not callable(callback):
                    raise TypeError("Callback must be callable.")

        if stopping_function is not None and not callable(
            stopping_function
        ):
            raise TypeError("Stopping function must be callable.")

        # Whenever a stopping function or condition is configured, the
        # argument mapping defaults to an empty dict instead of None.
        stopping_configured = (
            stopping_function is not None
            or stopping_condition is not None
        )
        if stopping_configured and stopping_condition_args is None:
            self.stopping_condition_args = {}

        if speaker_selection_func is not None and not callable(
            speaker_selection_func
        ):
            raise TypeError(
                "Speaker selection function must be callable."
            )

        # Both spellings of the flag switch agent ops on for every agent.
        if agentops_on is True or agent_ops_on is True:
            for agent in self.agents:
                agent.agent_ops_on = True

        self._sync_agents_dict()

        # Saved last so the snapshot reflects the fully initialised state.
        if autosave:
            self.save_to_json(metadata_filename)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _agent_names(agent: Any) -> tuple:
        """Return the non-None ``agent_name`` / ``name`` values of *agent*."""
        candidates = (
            getattr(agent, "agent_name", None),
            getattr(agent, "name", None),
        )
        return tuple(
            candidate for candidate in candidates if candidate is not None
        )

    def _sync_agents_dict(self) -> None:
        """Rebuild ``agents_dict`` (name -> agent) from ``self.agents``."""
        registry = {}
        for agent in self.agents:
            names = self._agent_names(agent)
            # Agents without any name (e.g. plain callables) are skipped.
            if names:
                registry[names[0]] = agent
        self.agents_dict = registry

    def _find_agent_by_name(self, name: str):
        """Return the first agent whose agent_name or name equals *name*."""
        for agent in self.agents:
            if name in self._agent_names(agent):
                return agent
        return None

    def _find_agent_by_id(self, agent_id: Any):
        """Return the first agent whose ``id`` matches *agent_id*."""
        for agent in self.agents:
            if not hasattr(agent, "id"):
                continue
            # The string comparison lets a uuid.UUID match an id stored
            # as its string form (and vice versa).
            if agent.id == agent_id or str(agent.id) == str(agent_id):
                return agent
        return None

    @staticmethod
    def _get_or_create_event_loop():
        """Return a usable event loop for the current thread.

        ``asyncio.get_event_loop`` raises RuntimeError when the thread
        has no loop set; in that case, or when the current loop is
        closed, a new loop is created and installed.
        """
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = None
        if loop is None or loop.is_closed():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop

    # ------------------------------------------------------------------
    # Core hooks
    # ------------------------------------------------------------------
    def communicate(self):
        """Communicate with the swarm through the orchestrator, protocols, and the universal communication layer.

        Placeholder: does nothing in the base class.
        """
        ...

    def run(self, task: Optional[str] = None, *args, **kwargs):
        """Run the swarm on *task*.

        Placeholder: the base implementation does nothing and returns
        None.  The signature accepts ``task`` plus arbitrary arguments
        because every helper in this class (``__call__``, ``batched_run``,
        ``arun``, ``loop``) calls ``self.run(task, *args, **kwargs)``.
        """
        ...

    def __call__(
        self,
        task,
        *args,
        **kwargs,
    ):
        """Call the swarm like a function; equivalent to ``run``.

        Args:
            task: The task forwarded to ``run``.

        Returns:
            Whatever ``run`` returns.

        Raises:
            Exception: Any error raised by ``run`` is logged and re-raised.
        """
        try:
            return self.run(task, *args, **kwargs)
        except Exception:
            logger.error(f"Error running {self.__class__.__name__}")
            # Bare raise keeps the original traceback intact.
            raise

    def step(self):
        """Step the swarm. Placeholder: does nothing in the base class."""

    # ------------------------------------------------------------------
    # Agent management
    # ------------------------------------------------------------------
    def add_agent(self, agent: AgentType):
        """Add an agent to the swarm and refresh ``agents_dict``."""
        self.agents.append(agent)
        self._sync_agents_dict()

    def add_agents(self, agents: List[AgentType]):
        """Add a list of agents to the swarm and refresh ``agents_dict``."""
        self.agents.extend(agents)
        self._sync_agents_dict()

    def add_agent_by_id(self, agent_id: str):
        """Add the agent returned by ``get_agent_by_id`` to the swarm.

        Raises:
            ValueError: If ``get_agent_by_id`` finds no agent, so that
                ``None`` is never appended to the agent list.
        """
        agent = self.get_agent_by_id(agent_id)
        if agent is None:
            raise ValueError(f"No agent found with id {agent_id!r}.")
        self.add_agent(agent)

    def remove_agent(self, agent: AgentType):
        """Remove an agent from the swarm and refresh ``agents_dict``.

        Raises:
            ValueError: If *agent* is not in the swarm (from list.remove).
        """
        self.agents.remove(agent)
        self._sync_agents_dict()

    def get_agent_by_name(self, name: str):
        """Return the agent named *name*, or None if there is none.

        Both the ``agent_name`` and ``name`` attributes are considered.
        """
        return self._find_agent_by_name(name)

    def reset_all_agents(self):
        """Call ``reset()`` on every agent."""
        for agent in self.agents:
            agent.reset()

    def broadcast(
        self, message: str, sender: Optional[AgentType] = None
    ):
        """Broadcast a message to all agents. Placeholder: does nothing."""

    def reset(self):
        """Reset the swarm. Placeholder: does nothing."""

    def plan(self, task: str):
        """Agents must individually plan using a workflow or pipeline.

        Placeholder: does nothing in the base class.
        """

    def self_find_agent_by_name(self, name: str):
        """
        Find an agent by its name.

        Args:
            name (str): The name of the agent to find.

        Returns:
            Agent: The Agent object if found, None otherwise.
        """
        return self._find_agent_by_name(name)

    def self_find_agent_by_id(self, id: uuid.UUID):
        """
        Find an agent by its id.

        Args:
            id (uuid.UUID | str): The id of the agent to find.  It is
                compared to each agent's ``id`` directly and by string form.

        Returns:
            Agent: The Agent object if found, None otherwise.
        """
        return self._find_agent_by_id(id)

    def agent_exists(self, name: str):
        """
        Check if an agent exists in the swarm.

        Args:
            name (str): The name of the agent to check.

        Returns:
            bool: True if the agent exists, False otherwise.
        """
        return self._find_agent_by_name(name) is not None

    def direct_message(
        self,
        message: str,
        sender: AgentType,
        recipient: AgentType,
    ):
        """Send a direct message to an agent. Placeholder: does nothing."""

    def autoscaler(self, num_agents: int, agent: List[AgentType]):
        """Autoscaler that acts like kubernetes for autonomous agents.

        Placeholder: does nothing in the base class.
        """

    def get_agent_by_id(self, id: str) -> Optional[AgentType]:
        """Locate an agent by id.

        Returns:
            The first agent in the swarm whose ``id`` matches, else None.
        """
        return self._find_agent_by_id(id)

    def assign_task(self, agent: AgentType, task: Any) -> Dict:
        """Assign a task to an agent. Placeholder: returns None."""

    def get_all_tasks(self, agent: AgentType, task: Any):
        """Get all tasks. Placeholder: returns None."""

    def get_finished_tasks(self) -> List[Dict]:
        """Get all finished tasks. Placeholder: returns None."""

    def get_pending_tasks(self) -> List[Dict]:
        """Get all pending tasks. Placeholder: returns None."""

    def pause_agent(self, agent: AgentType, agent_id: str):
        """Pause an agent. Placeholder: does nothing."""

    def resume_agent(self, agent: AgentType, agent_id: str):
        """Resume an agent. Placeholder: does nothing."""

    def stop_agent(self, agent: AgentType, agent_id: str):
        """Stop an agent. Placeholder: does nothing."""

    def restart_agent(self, agent: AgentType):
        """Restart an agent. Placeholder: does nothing."""

    def scale_up(self, num_agent: int):
        """Scale up the number of agents. Placeholder: does nothing."""

    def scale_down(self, num_agent: int):
        """Scale down the number of agents. Placeholder: does nothing."""

    def scale_to(self, num_agent: int):
        """Scale to a specific number of agents. Placeholder: does nothing."""

    def get_all_agents(self) -> List[AgentType]:
        """Return the swarm's agent list (the list itself, not a copy)."""
        return self.agents

    def get_swarm_size(self) -> int:
        """Return the number of agents in the swarm."""
        return len(self.agents)

    def get_swarm_status(self) -> Dict:
        """Get the status of the swarm. Placeholder: returns None."""

    def save_swarm_state(self):
        """Save the swarm state. Placeholder: does nothing."""

    # ------------------------------------------------------------------
    # Running tasks
    # ------------------------------------------------------------------
    def batched_run(self, tasks: List[Any], *args, **kwargs):
        """Run every task sequentially through ``run``.

        Args:
            tasks (List[Any]): Tasks to run, in order.

        Returns:
            list: One ``run`` result per task, in the same order.
        """
        return [self.run(task, *args, **kwargs) for task in tasks]

    async def abatch_run(self, tasks: List[str], *args, **kwargs):
        """Run every task through ``arun`` concurrently.

        Args:
            tasks (List[str]): Tasks to run.

        Returns:
            list: The ``arun`` results gathered in task order.
        """
        return await asyncio.gather(
            *(self.arun(task, *args, **kwargs) for task in tasks)
        )

    async def arun(self, task: Optional[str] = None, *args, **kwargs):
        """Run ``run`` in the loop's default executor and await the result.

        Args:
            task (Optional[str], optional): Task forwarded to ``run``.
                Defaults to None.
        """
        loop = asyncio.get_running_loop()
        # run_in_executor only forwards positional arguments, so keyword
        # arguments are bound with functools.partial.
        return await loop.run_in_executor(
            None, partial(self.run, task, *args, **kwargs)
        )

    def loop(
        self,
        task: Optional[str] = None,
        *args,
        **kwargs,
    ):
        """Call ``run`` ``self.max_loops`` times; results are discarded.

        Args:
            task (Optional[str], optional): Task forwarded to ``run``.
                Defaults to None.
        """
        for _ in range(self.max_loops):
            self.run(task, *args, **kwargs)

    async def aloop(
        self,
        task: Optional[str] = None,
        *args,
        **kwargs,
    ):
        """Run ``loop`` in the event loop's default executor.

        Args:
            task (Optional[str], optional): Task forwarded to ``loop``.
                Defaults to None.

        Returns:
            The return value of ``loop`` (None).
        """
        loop = asyncio.get_running_loop()
        # See arun(): kwargs must be bound before handing off to the executor.
        return await loop.run_in_executor(
            None, partial(self.loop, task, *args, **kwargs)
        )

    def run_async(self, task: Optional[str] = None, *args, **kwargs):
        """Drive ``arun`` to completion from synchronous code.

        Args:
            task (Optional[str], optional): Task forwarded to ``arun``.
                Defaults to None.
        """
        loop = self._get_or_create_event_loop()
        return loop.run_until_complete(
            self.arun(task, *args, **kwargs)
        )

    def run_batch_async(self, tasks: List[str], *args, **kwargs):
        """Drive ``abatch_run`` to completion from synchronous code.

        Args:
            tasks (List[str]): Tasks forwarded to ``abatch_run``.
        """
        loop = self._get_or_create_event_loop()
        return loop.run_until_complete(
            self.abatch_run(tasks, *args, **kwargs)
        )

    def run_batch(self, tasks: List[str], *args, **kwargs):
        """Run the tasks sequentially; alias of ``batched_run``.

        Args:
            tasks (List[str]): Tasks forwarded to ``batched_run``.
        """
        return self.batched_run(tasks, *args, **kwargs)

    def select_agent_by_name(self, agent_name: str):
        """
        Select an agent through their name.

        Returns the matching agent, or None when no agent matches.
        """
        return self._find_agent_by_name(agent_name)

    def task_assignment_by_id(
        self, task: str, agent_id: str, *args, **kwargs
    ):
        """
        Run *task* on the agent identified by *agent_id*.

        Raises:
            ValueError: If no agent with that id is in the swarm.
        """
        agent = self.get_agent_by_id(agent_id)
        if agent is None:
            raise ValueError(f"No agent found with id {agent_id!r}.")
        return agent.run(task, *args, **kwargs)

    def task_assignment_by_name(
        self, task: str, agent_name: str, *args, **kwargs
    ):
        """
        Run *task* on the agent named *agent_name*.

        Raises:
            ValueError: If no agent with that name is in the swarm.
        """
        agent = self.select_agent_by_name(agent_name)
        if agent is None:
            raise ValueError(
                f"No agent found with name {agent_name!r}."
            )
        return agent.run(task, *args, **kwargs)

    def concurrent_run(self, task: str) -> List[str]:
        """Call every agent with *task* in a thread pool.

        Responses are collected in completion order.  An agent that
        raises is logged and contributes no response.  The responses are
        stored in ``last_responses`` and the task in ``task_history``.
        """
        responses = []
        with ThreadPoolExecutor() as executor:
            future_to_agent = {
                executor.submit(agent, task): agent
                for agent in self.agents
            }
            for future in as_completed(future_to_agent):
                try:
                    responses.append(future.result())
                except Exception as error:
                    logger.error(
                        f"{future_to_agent[future]} generated an"
                        f" exception: {error}"
                    )
        self.last_responses = responses
        # Tolerate instances whose __init__ did not create the history.
        history = getattr(self, "task_history", None)
        if history is None:
            history = self.task_history = []
        history.append(task)
        return responses

    def add_llm(self, agent: Callable):
        """Add a callable to the agent list (same as ``add_agent``)."""
        self.add_agent(agent)

    def remove_llm(self, agent: Callable):
        """Remove a callable from the agent list (same as ``remove_agent``)."""
        self.remove_agent(agent)

    def run_all(self, task: str = None, *args, **kwargs):
        """Call every agent with *task*, one after another.

        Args:
            task (str, optional): Task passed to each agent. Defaults to None.

        Returns:
            list: One response per agent, in agent order.
        """
        return [agent(task, *args, **kwargs) for agent in self.agents]

    def run_on_all_agents(self, task: str = None, *args, **kwargs):
        """Call every agent with *task* in a thread pool.

        Args:
            task (str, optional): Task passed to each agent. Defaults to None.

        Returns:
            list: One response per agent, in agent order.
        """
        with ThreadPoolExecutor() as executor:
            return list(
                executor.map(
                    lambda agent: agent(task, *args, **kwargs),
                    self.agents,
                )
            )

    # ------------------------------------------------------------------
    # Registry placeholders
    # ------------------------------------------------------------------
    def add_swarm_entry(self, swarm):
        """
        Add the information of a joined Swarm to the registry.

        Placeholder: does nothing in the base class.

        Args:
            swarm (SwarmManagerBase): Instance of SwarmManagerBase representing the joined Swarm.

        Returns:
            None
        """

    def add_agent_entry(self, agent: Agent):
        """
        Add the information of an Agent to the registry.

        Placeholder: does nothing in the base class.

        Args:
            agent (Agent): Instance of Agent representing the Agent.

        Returns:
            None
        """

    def retrieve_swarm_information(self, swarm_id: str):
        """
        Retrieve the information of a specific Swarm from the registry.

        Placeholder: returns None in the base class.

        Args:
            swarm_id (str): Unique identifier of the Swarm.
        """

    def retrieve_joined_agents(self, agent_id: str) -> List[Agent]:
        """
        Retrieve the Agents which have joined the registry.

        Placeholder: returns None in the base class.
        """

    def join_swarm(self, from_entity: Agent, to_entity: Agent):
        """
        Add a relationship between a Swarm and an Agent or other Swarm to the registry.

        Placeholder: does nothing in the base class.

        Args:
            from_entity: Source of the relationship.
            to_entity: Target of the relationship.
        """

    # ------------------------------------------------------------------
    # Metadata and persistence
    # ------------------------------------------------------------------
    def metadata(self):
        """
        Get the metadata of the multi-agent structure.

        Returns:
            dict: The agents, callbacks, autosave flag, logging flag and
            conversation of this swarm.
        """
        return {
            "agents": self.agents,
            "callbacks": self.callbacks,
            "autosave": self.autosave,
            "logging": self.logging,
            "conversation": self.conversation,
        }

    def save_to_json(self, filename: str):
        """
        Save the instance ``__dict__`` to a JSON file.

        Best effort: any failure is logged, not raised.  The state is
        serialised before the file is opened, so a state that cannot be
        encoded as JSON no longer leaves a truncated file behind.

        Args:
            filename (str): The name of the file to write.

        Returns:
            None
        """
        try:
            payload = json.dumps(self.__dict__)
            with open(filename, "w", encoding="utf-8") as f:
                f.write(payload)
        except Exception as e:
            logger.error(e)

    def load_from_json(self, filename: str):
        """
        Replace the instance ``__dict__`` with the content of a JSON file.

        Best effort: any failure is logged, not raised.

        Args:
            filename (str): The name of the file to read.

        Returns:
            None
        """
        try:
            with open(filename, encoding="utf-8") as f:
                self.__dict__ = json.load(f)
        except Exception as e:
            logger.error(e)

    def save_to_yaml(self, filename: str):
        """
        Save the instance ``__dict__`` to a YAML file.

        Best effort: any failure is logged, not raised.  The state is
        serialised before the file is opened, so a failed dump does not
        leave a truncated file behind.

        Args:
            filename (str): The name of the file to write.

        Returns:
            None
        """
        try:
            payload = yaml.dump(self.__dict__)
            with open(filename, "w", encoding="utf-8") as f:
                f.write(payload)
        except Exception as e:
            logger.error(e)

    def load_from_yaml(self, filename: str):
        """
        Replace the instance ``__dict__`` with the content of a YAML file.

        Best effort: any failure is logged, not raised.

        Args:
            filename (str): The name of the file to read.

        Returns:
            None
        """
        try:
            with open(filename, encoding="utf-8") as f:
                # SECURITY: yaml.load() without a Loader is rejected by
                # current PyYAML and unsafe on untrusted files, so only
                # plain YAML data is accepted here.  Files containing
                # python-object tags fail to load and the error is logged.
                self.__dict__ = yaml.safe_load(f)
        except Exception as e:
            logger.error(e)

    # ------------------------------------------------------------------
    # Container protocol (delegates to self.agents)
    # ------------------------------------------------------------------
    def __repr__(self):
        """Return the class name followed by the instance ``__dict__``."""
        return f"{self.__class__.__name__}({self.__dict__})"

    def __str__(self):
        """Return the same text as ``__repr__``."""
        return f"{self.__class__.__name__}({self.__dict__})"

    def __len__(self):
        """Return the number of agents."""
        return len(self.agents)

    def __getitem__(self, index):
        """Return the agent(s) at *index*."""
        return self.agents[index]

    def __setitem__(self, index, value):
        """Replace the agent(s) at *index* and refresh ``agents_dict``."""
        self.agents[index] = value
        self._sync_agents_dict()

    def __delitem__(self, index):
        """Delete the agent(s) at *index* and refresh ``agents_dict``."""
        del self.agents[index]
        self._sync_agents_dict()

    def __iter__(self):
        """Iterate over the agents."""
        return iter(self.agents)

    def __reversed__(self):
        """Iterate over the agents in reverse order."""
        return reversed(self.agents)

    def __contains__(self, value):
        """Return True if *value* is one of the agents."""
        return value in self.agents

    # ------------------------------------------------------------------
    # Diagnostics and export
    # ------------------------------------------------------------------
    def agent_error_handling_check(self):
        """Raise if the swarm has no agents.

        Raises:
            ValueError: If ``self.agents`` is None or empty.
        """
        if not self.agents:
            message = "You have not passed in any agents, you need to input agents to run a swarm"
            logger.error(message)
            raise ValueError(message)

    def swarm_initialization(self, *args, **kwargs):
        """
        Log a summary of the swarm and check that it is ready to run.

        Returns early (after logging why) when the swarm has no agents
        or no ``director`` attribute / a ``director`` of None.

        Args:
            *args: Additional positional arguments (unused).
            **kwargs: Additional keyword arguments (unused).

        Returns:
            None
        """
        logger.info(
            f"Initializing the hierarchical swarm: {self.name}"
        )
        logger.info(f"Purpose of this swarm: {self.description}")
        logger.info(f"Number of agents: {len(self.agents)}")
        # Prefer ``name``; fall back to ``agent_name`` for agents that
        # only define the latter.
        agent_names = [
            getattr(agent, "name", getattr(agent, "agent_name", None))
            for agent in self.agents
        ]
        logger.info(f"Agent names: {agent_names}")

        if len(self.agents) == 0:
            logger.info(
                "No agents found. Please add agents to the swarm."
            )
            return None

        # ``director`` is not set by BaseSwarm.__init__; presumably a
        # subclass provides it, so a missing attribute counts as None.
        if getattr(self, "director", None) is None:
            logger.info(
                "No director found. Please add a director to the swarm."
            )
            return None

        logger.info(
            f"Initialization complete for the hierarchical swarm: {self.name}"
        )

    def export_output_schema(self):
        """
        Export the output schema of the swarm as JSON text.

        Returns:
            str: ``output_schema.model_dump_json(indent=4)``.

        Raises:
            ValueError: If no output schema is set.
        """
        if self.output_schema is None:
            raise ValueError("No output schema is set on this swarm.")
        return self.output_schema.model_dump_json(indent=4)

    def export_output_schema_dict(self):
        """
        Export the output schema of the swarm as a dictionary.

        Returns:
            dict: ``output_schema.model_dump()``.

        Raises:
            ValueError: If no output schema is set.
        """
        if self.output_schema is None:
            raise ValueError("No output schema is set on this swarm.")
        return self.output_schema.model_dump()

    def export_and_autosave(self):
        """Write the exported output schema to ``metadata_filename``.

        The target folder is read from the ``WORKSPACE_DIR`` environment
        variable and passed to ``create_file_in_folder`` unchanged.
        """
        content = self.export_output_schema()
        create_file_in_folder(
            os.getenv("WORKSPACE_DIR"),
            self.metadata_filename,
            content=content,
        )
        logger.info(f"Metadata saved to {self.metadata_filename}")

    def list_agents(self):
        """
        Display all agents in the swarm via ``display_agents_info``.

        Returns:
            None
        """
        display_agents_info(self.agents)

    def agents_to_dataframe(self):
        """
        Convert every agent's ``agent_output.dict()`` to a pandas DataFrame.
        """
        data = [agent.agent_output.dict() for agent in self.agents]
        return dict_to_dataframe(data)

    def model_to_dataframe(self):
        """
        Convert the output schema (a Pydantic model) to a pandas DataFrame.
        """
        return pydantic_model_to_dataframe(self.output_schema)