Spaces:
Runtime error
Runtime error
File size: 2,834 Bytes
4962437 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# workers in unison
# kye gomez, jul 13 4:01pm: can scale up the number of swarms working on a problem with `hivemind(swarms=4)`, or `swarms=auto`, which will scale the agents depending on the complexity
# this needs to change: we need to specify exactly what needs to be imported
# add type checking, documentation, and deeper error handling
# TODO: MANY WORKERS
import concurrent.futures
import logging
from swarms.swarms.swarms import HierarchicalSwarm
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
class HiveMind:
    """Fan a single objective out to several ``HierarchicalSwarm`` instances.

    Each swarm is run on its own worker thread via a
    ``concurrent.futures.ThreadPoolExecutor``; the per-swarm return values are
    gathered into a list in completion order.

    Attributes:
        openai_api_key: Key handed to every swarm and to the embeddings model.
        num_swarms: Current number of managed swarms (kept in sync by
            ``add_swarm`` / ``remove_swarm``).
        swarms: The managed ``HierarchicalSwarm`` instances.
        vectorstore: Whatever ``initialize_vectorstore`` returns.
        max_workers: Thread-pool size used by ``run``.
    """

    def __init__(
        self,
        openai_api_key: str = "",
        num_swarms: int = 1,
        max_workers=None,
    ):
        """Create ``num_swarms`` swarms and the shared vector store.

        Args:
            openai_api_key: API key passed to each swarm.
            num_swarms: How many swarms to create up front.
            max_workers: Thread-pool size. When falsy (``None`` or ``0``) it
                defaults to ``min(32, num_swarms)``, but never less than 1.

        Raises:
            Exception: Anything raised by ``initialize_vectorstore`` or by the
                ``HierarchicalSwarm`` constructor propagates to the caller.
        """
        self.openai_api_key = openai_api_key
        self.num_swarms = num_swarms
        self.swarms = [HierarchicalSwarm(openai_api_key) for _ in range(num_swarms)]
        self.vectorstore = self.initialize_vectorstore()
        # ThreadPoolExecutor rejects max_workers <= 0, so clamp the derived
        # default to 1 for the num_swarms == 0 case.
        self.max_workers = max_workers or max(1, min(32, num_swarms))

    def initialize_vectorstore(self):
        """Build and return an in-memory FAISS vector store.

        Returns:
            The ``FAISS`` wrapper constructed around a flat L2 index.

        Raises:
            Exception: Any failure is logged and then re-raised unchanged.
        """
        # NOTE(review): OpenAIEmbeddings, faiss, FAISS and InMemoryDocstore are
        # not imported in the part of the file visible to this review. Unless
        # they are brought into scope elsewhere, this raises NameError (which
        # is logged and re-raised below) -- confirm the module imports.
        try:
            embeddings_model = OpenAIEmbeddings(openai_api_key=self.openai_api_key)
            # Presumably the vector width of the OpenAI embedding model in
            # use -- TODO confirm against the model actually configured.
            embedding_size = 1536
            index = faiss.IndexFlatL2(embedding_size)
            return FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {})
        except Exception as e:
            logging.error("Failed to initialize vector store: %s", e)
            raise

    def run_swarm(self, swarm, objective):
        """Run one swarm on ``objective`` without letting it raise.

        Args:
            swarm: Object exposing ``run(objective)``.
            objective: Value forwarded to ``swarm.run``.

        Returns:
            The swarm's result, or ``None`` if the swarm raised (the error is
            logged). Best-effort by design so one failing swarm does not
            abort the others.
        """
        try:
            return swarm.run(objective)
        except Exception as e:
            logging.error("An error occurred in run: %s", e)
            return None

    def run(self, objective, timeout=None):
        """Run ``objective`` on every swarm concurrently.

        Args:
            objective: Value forwarded to each swarm's ``run``.
            timeout: Seconds to wait for results, passed to
                ``concurrent.futures.as_completed``; ``None`` waits forever.

        Returns:
            A list with one entry per swarm, in completion order. A swarm
            that failed contributes ``None`` (see ``run_swarm``). An empty
            list when there are no swarms.

        Raises:
            TimeoutError: From ``as_completed`` when ``timeout`` elapses
                before every swarm has finished.
        """
        if not self.swarms:
            # Nothing to schedule; skip creating a thread pool altogether.
            return []

        results = []
        # NOTE(review): leaving the ``with`` block waits for running workers,
        # so ``timeout`` bounds result collection, not total wall time.
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {
                executor.submit(self.run_swarm, swarm, objective)
                for swarm in self.swarms
            }
            for future in concurrent.futures.as_completed(futures, timeout=timeout):
                try:
                    results.append(future.result())
                except Exception as e:
                    logging.error("An error occurred in a swarm: %s", e)
        return results

    def add_swarm(self):
        """Append a new ``HierarchicalSwarm`` built with the stored API key."""
        self.swarms.append(HierarchicalSwarm(self.openai_api_key))
        self.num_swarms = len(self.swarms)

    def remove_swarm(self, index):
        """Remove the swarm at ``index``; log instead of raising if absent.

        Args:
            index: Position in ``self.swarms`` (negative indices allowed, as
                with ``list.pop``).
        """
        try:
            self.swarms.pop(index)
        except IndexError:
            logging.error("No swarm found at index %s", index)
        else:
            self.num_swarms = len(self.swarms)

    def get_progress(self):
        """Placeholder for progress reporting; currently returns ``None``.

        TODO: implementing this assumes the swarm class exposes a
        get-progress method, which is not visible from this file.
        """
        return None

    def cancel_swarm(self, index):
        """Call ``cancel()`` on the swarm at ``index``.

        An out-of-range ``index`` is logged rather than raised. This assumes
        the swarm object provides a ``cancel`` method -- TODO confirm.

        Args:
            index: Position in ``self.swarms``.
        """
        try:
            self.swarms[index].cancel()
        except IndexError:
            logging.error("No swarm found at index %s", index)

    def queue_tasks(self, tasks):
        """Run each task in ``tasks`` sequentially through ``run``.

        Args:
            tasks: Iterable of objectives.

        Returns:
            A list holding the ``run`` result list for each task, in the
            order the tasks were given.
        """
        return [self.run(task) for task in tasks]
|