# workers in unison #kye gomez jul 13 4:01pm, can scale up the number of swarms working on a probkem with `hivemind(swarms=4, or swarms=auto which will scale the agents depending on the complexity)` #this needs to change, we need to specify exactly what needs to be imported # add typechecking, documentation, and deeper error handling # TODO: MANY WORKERS import concurrent.futures import logging from swarms.swarms.swarms import HierarchicalSwarm logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') class HiveMind: def __init__( self, openai_api_key="", num_swarms=1, max_workers=None ): self.openai_api_key = openai_api_key self.num_swarms = num_swarms self.swarms = [HierarchicalSwarm(openai_api_key) for _ in range(num_swarms)] self.vectorstore = self.initialize_vectorstore() self.max_workers = max_workers if max_workers else min(32, num_swarms) def initialize_vectorstore(self): try: embeddings_model = OpenAIEmbeddings(openai_api_key=self.openai_api_key) embedding_size = 1536 index = faiss.IndexFlatL2(embedding_size) return FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {}) except Exception as e: logging.error(f"Failed to initialize vector store: {e}") raise def run_swarm(self, swarm, objective): try: return swarm.run(objective) except Exception as e: logging.error(f"An error occurred in run: {e}") def run(self, objective, timeout=None): with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: futures = {executor.submit(self.run_swarm, swarm, objective) for swarm in self.swarms} results = [] for future in concurrent.futures.as_completed(futures, timeout=timeout): try: results.append(future.result()) except Exception as e: logging.error(f"An error occurred in a swarm: {e}") return results def add_swarm(self): self.swarms.append(HierarchicalSwarm(self.openai_api_key)) def remove_swarm(self, index): try: self.swarms.pop(index) except IndexError: logging.error(f"No swarm found at index {index}") def get_progress(self): #this assumes that the swarms class has a get progress method pass def cancel_swarm(self, index): try: self.swarms[index].cancel() except IndexError: logging.error(f"No swarm found at index {index}") def queue_tasks(self, tasks): for task in tasks: self.run(task)