# Workers in unison.
# Kye Gomez, Jul 13, 4:01pm: we can scale up the number of swarms working on a problem with
# `HiveMind(swarms=4)`, or `swarms="auto"`, which will scale the agents depending on the
# complexity of the task.
# TODO: add type checking, documentation, and deeper error handling.
# TODO: many workers.

import concurrent.futures
import logging

# NOTE: the vector-store imports below assume the langchain FAISS wrapper,
# matching the FAISS(embed_query, index, docstore, index_to_id) call signature used here.
import faiss
from langchain.docstore import InMemoryDocstore
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS

from swarms.swarms.swarms import HierarchicalSwarm

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')

class HiveMind:
    """Runs multiple HierarchicalSwarm instances on a shared objective in parallel."""

    def __init__(
        self,
        openai_api_key: str = "",
        num_swarms: int = 1,
        max_workers=None,
    ):
        self.openai_api_key = openai_api_key
        self.num_swarms = num_swarms
        self.swarms = [HierarchicalSwarm(openai_api_key) for _ in range(num_swarms)]
        self.vectorstore = self.initialize_vectorstore()
        # Default to one thread per swarm, capped at 32.
        self.max_workers = max_workers if max_workers else min(32, num_swarms)

    def initialize_vectorstore(self):
        """Build an in-memory FAISS vector store backed by OpenAI embeddings."""
        try:
            embeddings_model = OpenAIEmbeddings(openai_api_key=self.openai_api_key)
            embedding_size = 1536  # dimensionality of OpenAI text embeddings
            index = faiss.IndexFlatL2(embedding_size)
            return FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {})
        except Exception as e:
            logging.error(f"Failed to initialize vector store: {e}")
            raise

    def run_swarm(self, swarm, objective):
        """Run a single swarm on the objective; returns None if the swarm raises."""
        try:
            return swarm.run(objective)
        except Exception as e:
            logging.error(f"An error occurred while running a swarm: {e}")
            return None

    def run(self, objective, timeout=None):
        """Run the objective on every swarm concurrently and collect the results.

        Note: if `timeout` elapses before all swarms finish, `as_completed`
        raises `concurrent.futures.TimeoutError`.
        """
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {executor.submit(self.run_swarm, swarm, objective) for swarm in self.swarms}
            results = []
            for future in concurrent.futures.as_completed(futures, timeout=timeout):
                try:
                    results.append(future.result())
                except Exception as e:
                    logging.error(f"An error occurred in a swarm: {e}")
        return results
    
    def add_swarm(self):
        self.swarms.append(HierarchicalSwarm(self.openai_api_key))

    def remove_swarm(self, index):
        try:
            self.swarms.pop(index)
        except IndexError:
            logging.error(f"No swarm found at index {index}")
        
    def get_progress(self):
        # Not yet implemented; assumes HierarchicalSwarm will expose a get_progress method.
        pass

    def cancel_swarm(self, index):
        # Assumes HierarchicalSwarm exposes a cancel method.
        try:
            self.swarms[index].cancel()
        except IndexError:
            logging.error(f"No swarm found at index {index}")

    def queue_tasks(self, tasks):
        # Runs tasks sequentially; each task still fans out across all swarms.
        for task in tasks:
            self.run(task)
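A minimal, self-contained sketch of the fan-out/fan-in pattern that `HiveMind.run` relies on, using a `StubSwarm` stand-in for `HierarchicalSwarm` (hypothetical, for illustration only) so it runs without API keys or the swarms package:

```python
import concurrent.futures


class StubSwarm:
    """Stands in for HierarchicalSwarm; simply echoes the objective back."""

    def __init__(self, name):
        self.name = name

    def run(self, objective):
        return f"{self.name}: {objective}"


def fan_out(swarms, objective, max_workers=4, timeout=None):
    # Submit one task per swarm and collect results as each completes,
    # mirroring the as_completed loop in HiveMind.run.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(s.run, objective) for s in swarms}
        return [f.result() for f in concurrent.futures.as_completed(futures, timeout=timeout)]


results = fan_out([StubSwarm(f"swarm-{i}") for i in range(3)], "summarize the report")
```

Completion order is nondeterministic, so consumers should sort or otherwise not depend on result ordering.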