import logging import queue import threading from concurrent.futures import ThreadPoolExecutor from enum import Enum from typing import Any, Dict, List import chromadb from chromadb.utils import embedding_functions class TaskStatus(Enum): QUEUED = 1 RUNNING = 2 COMPLETED = 3 FAILED = 4 class Orchestrator: """ The Orchestrator takes in an agent, worker, or boss as input then handles all the logic for - task creation, - task assignment, - and task compeletion. And, the communication for millions of agents to chat with eachother through a vector database that each agent has access to chat with. Each LLM agent chats with the orchestrator through a dedicated communication layer. The orchestrator assigns tasks to each LLM agent, which the agents then complete and return. This setup allows for a high degree of flexibility, scalability, and robustness. In the context of swarm LLMs, one could consider an **Omni-Vector Embedding Database for communication. This database could store and manage the high-dimensional vectors produced by each LLM agent. Strengths: This approach would allow for similarity-based lookup and matching of LLM-generated vectors, which can be particularly useful for tasks that involve finding similar outputs or recognizing patterns. Weaknesses: An Omni-Vector Embedding Database might add complexity to the system in terms of setup and maintenance. It might also require significant computational resources, depending on the volume of data being handled and the complexity of the vectors. The handling and transmission of high-dimensional vectors could also pose challenges in terms of network load. # Orchestrator * Takes in an agent class with vector store, then handles all the communication and scales up a swarm with number of agents and handles task assignment and task completion from swarms import OpenAI, Orchestrator, Swarm orchestrated = Orchestrate(OpenAI, nodes=40) #handles all the task assignment and allocation and agent communication using a vectorstore as a universal communication layer and also handlles the task completion logic Objective = "Make a business website for a marketing consultancy" Swarms = Swarms(orchestrated, auto=True, Objective)) ``` In terms of architecture, the swarm might look something like this: ``` (Orchestrator) / \ Tools + Vector DB -- (LLM Agent)---(Communication Layer) (Communication Layer)---(LLM Agent)-- Tools + Vector DB / | | \ (Task Assignment) (Task Completion) (Task Assignment) (Task Completion) ###Usage ``` from swarms import Orchestrator # Instantiate the Orchestrator with 10 agents orchestrator = Orchestrator(llm, agent_list=[llm]*10, task_queue=[]) # Add tasks to the Orchestrator tasks = [{"content": f"Write a short story about a {animal}."} for animal in ["cat", "dog", "bird", "fish", "lion", "tiger", "elephant", "giraffe", "monkey", "zebra"]] orchestrator.assign_tasks(tasks) # Run the Orchestrator orchestrator.run() # Retrieve the results for task in tasks: print(orchestrator.retrieve_result(id(task))) ``` """ def __init__( self, agent, agent_list: List[Any], task_queue: List[Any], collection_name: str = "swarm", api_key: str = None, model_name: str = None, embed_func = None, worker = None ): self.agent = agent self.agents = queue.Queue() for _ in range(agent_list): self.agents.put(agent()) self.task_queue = queue.Queue() self.chroma_client = chromadb.Client() self.collection = self.chroma_client.create_collection( name = collection_name ) self.current_tasks = {} self.lock = threading.Lock() self.condition = threading.Condition(self.lock) self.executor = ThreadPoolExecutor(max_workers=len(agent_list)) self.embed_func = embed_func if embed_func else self.embed # @abstractmethod def assign_task( self, agent_id: int, task: Dict[str, Any] ) -> None: """Assign a task to a specific agent""" while True: with self.condition: while not self.task_queue: self.condition.wait() agent = self.agents.get() task = self.task_queue.get() try: result = self.worker.run(task["content"]) #using the embed method to get the vector representation of the result vector_representation = self.embed( result, self.api_key, self.model_name ) self.collection.add( embeddings=[vector_representation], documents=[str(id(task))], ids=[str(id(task))] ) logging.info(f"Task {id(str)} has been processed by agent {id(agent)} with") except Exception as error: logging.error(f"Failed to process task {id(task)} by agent {id(agent)}. Error: {error}") finally: with self.condition: self.agents.put(agent) self.condition.notify() def embed(self, input, api_key, model_name): openai = embedding_functions.OpenAIEmbeddingFunction( api_key=api_key, model_name=model_name ) embedding = openai(input) return embedding # @abstractmethod def retrieve_results(self, agent_id: int) -> Any: """Retrieve results from a specific agent""" try: #Query the vector database for documents created by the agents results = self.collection.query( query_texts=[str(agent_id)], n_results=10 ) return results except Exception as e: logging.error(f"Failed to retrieve results from agent {agent_id}. Error {e}") raise # @abstractmethod def update_vector_db(self, data) -> None: """Update the vector database""" try: self.collection.add( embeddings=[data["vector"]], documents=[str(data["task_id"])], ids=[str(data["task_id"])] ) except Exception as e: logging.error(f"Failed to update the vector database. Error: {e}") raise # @abstractmethod def get_vector_db(self): """Retrieve the vector database""" return self.collection def append_to_db( self, result: str ): """append the result of the swarm to a specifici collection in the database""" try: self.collection.add( documents=[result], ids=[str(id(result))] ) except Exception as e: logging.error(f"Failed to append the agent output to database. Error: {e}") raise def run(self, objective:str): """Runs""" if not objective or not isinstance(objective, str): logging.error("Invalid objective") raise ValueError("A valid objective is required") try: self.task_queue.append(objective) results = [ self.assign_task( agent_id, task ) for agent_id, task in zip( range( len(self.agents) ), self.task_queue ) ] for result in results: self.append_to_db(result) logging.info(f"Successfully ran swarms with results: {results}") return results except Exception as e: logging.error(f"An error occured in swarm: {e}") return None def chat( self, sender_id: int, receiver_id: int, message: str ): """ Allows the agents to chat with eachother thrught the vectordatabase # Instantiate the Orchestrator with 10 agents orchestrator = Orchestrator( llm, agent_list=[llm]*10, task_queue=[] ) # Agent 1 sends a message to Agent 2 orchestrator.chat(sender_id=1, receiver_id=2, message="Hello, Agent 2!") """ message_vector = self.embed( message, self.api_key, self.model_name ) #store the mesage in the vector database self.collection.add( embeddings=[message_vector], documents=[message], ids=[f"{sender_id}_to_{receiver_id}"] ) self.run( objective=f"chat with agent {receiver_id} about {message}" ) def add_agents( self, num_agents: int ): for _ in range(num_agents): self.agents.put(self.agent()) self.executor = ThreadPoolExecutor( max_workers=self.agents.qsize() ) def remove_agents(self, num_agents): for _ in range(num_agents): if not self.agents.empty(): self.agents.get() self.executor = ThreadPoolExecutor( max_workers=self.agents.qsize() )