# File size: 4,431 Bytes
# 4962437
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import logging
import uuid
from enum import Enum
from typing import Any

from chromadb.utils import embedding_functions

from swarms.workers.worker import Worker


class TaskStatus(Enum):
    """Enumeration of task status codes.

    NOTE(review): nothing in this file references this enum, so the
    per-member notes below are inferred from the member names only --
    confirm against the code that consumes it.
    """
    QUEUED = 1  # presumably: accepted but not yet started -- confirm
    RUNNING = 2  # presumably: currently being executed -- confirm
    COMPLETED = 3  # presumably: finished successfully -- confirm
    FAILED = 4  # presumably: finished with an error -- confirm

class ScalableGroupChat:
    """
    Group chat between several ``Worker`` agents backed by a Chroma collection.

    Intended flow (as described by the original author):

    Worker -> ScalableGroupChat(Worker * N)
    -> every message is embedded and stored in a Chroma collection
    -> stored messages are retrieved by querying the collection and can be
       passed into a worker's prompt
    -> any worker may message any other worker

    Attributes:
        workers: The ``Worker`` instances, named ``Worker-0`` .. ``Worker-{N-1}``.
        worker_count: Number of workers requested at construction.
        collection_name: Name of the Chroma collection used for storage.
        api_key: OpenAI API key handed to the workers and the embedding function.
        client: The Chroma client that owns ``collection``.
        collection: The Chroma collection all methods read from / write to.
    """

    # Model used by ``embed`` when the caller does not choose one.
    DEFAULT_EMBEDDING_MODEL = "text-embedding-ada-002"

    def __init__(
        self,
        worker_count: int = 5,
        collection_name: str = "swarm",
        api_key: str = None,
    ):
        """Create the workers and the backing Chroma collection.

        Args:
            worker_count: How many ``Worker`` instances to create.
            collection_name: Name of the Chroma collection to open or create.
            api_key: OpenAI API key forwarded to each worker and used by ``embed``.
        """
        self.worker_count = worker_count
        self.collection_name = collection_name
        self.api_key = api_key

        # Every other method reads ``self.collection``; the original never
        # assigned it, so each call failed with AttributeError. The import is
        # kept local so the module-level dependency list is unchanged.
        import chromadb

        self.client = chromadb.Client()
        # NOTE(review): the collection uses Chroma's default embedding function
        # for document-only inserts (``append_to_db``), while ``chat`` stores
        # OpenAI vectors -- confirm both are not mixed in one collection.
        self.collection = self.client.get_or_create_collection(
            name=collection_name
        )

        # One worker per slot, each with a unique name.
        self.workers = [
            Worker(openai_api_key=api_key, ai_name=f"Worker-{i}")
            for i in range(worker_count)
        ]

    def embed(
        self,
        input,
        model_name=DEFAULT_EMBEDDING_MODEL,
    ):
        """Embed ``input`` with an OpenAI embedding model.

        Args:
            input: Forwarded unchanged to Chroma's ``OpenAIEmbeddingFunction``,
                which is documented to take a list of documents.
            model_name: OpenAI embedding model name. Defaults to
                ``DEFAULT_EMBEDDING_MODEL`` so callers may omit it.

        Returns:
            Whatever the embedding function returns for ``input`` (documented
            by Chroma as one embedding per input document).
        """
        embedder = embedding_functions.OpenAIEmbeddingFunction(
            api_key=self.api_key,
            model_name=model_name,
        )
        return embedder(input)

    def retrieve_results(
        self,
        agent_id: int
    ) -> Any:
        """Query the collection using ``str(agent_id)`` as the query text.

        Args:
            agent_id: Identifier whose string form is used as the query.

        Returns:
            The result of ``collection.query`` limited to 10 results.

        Raises:
            Exception: Any error raised by the query is logged and re-raised.
        """
        try:
            return self.collection.query(
                query_texts=[str(agent_id)],
                n_results=10,
            )
        except Exception as e:
            logging.error(f"Failed to retrieve results from agent {agent_id}. Error {e}")
            raise

    def update_vector_db(self, data) -> None:
        """Store a pre-computed vector in the collection.

        Args:
            data: Mapping with a ``"vector"`` entry (the embedding) and a
                ``"task_id"`` entry (used, as a string, for both the document
                text and the record id).

        Raises:
            Exception: Any error (including a missing key) is logged and
                re-raised.
        """
        try:
            task_id = str(data["task_id"])
            self.collection.add(
                embeddings=[data["vector"]],
                documents=[task_id],
                ids=[task_id],
            )
        except Exception as e:
            logging.error(f"Failed to update the vector database. Error: {e}")
            raise

    def get_vector_db(self):
        """Return the Chroma collection used by this group chat."""
        return self.collection

    def append_to_db(
        self,
        result: str
    ):
        """Append a result document to the collection.

        Args:
            result: Text to store as a document.

        Raises:
            Exception: Any error raised by the insert is logged and re-raised.
        """
        try:
            self.collection.add(
                documents=[result],
                # ``id(result)`` is only unique among live objects, so the same
                # string appended twice produced a duplicate id; use a UUID.
                ids=[uuid.uuid4().hex],
            )
        except Exception as e:
            logging.error(f"Failed to append the agent output to database. Error: {e}")
            raise

    def chat(
        self,
        sender_id: int,
        receiver_id: int,
        message: str
    ):
        """
        Let one agent send a message to another through the vector database.

        The message is embedded, stored in the collection, and the sender's
        worker is then asked to carry out the conversation.

        Example:
            # Instantiate the ScalableGroupChat with 10 workers
            group = ScalableGroupChat(worker_count=10, api_key="sk-...")

            # Agent 1 sends a message to Agent 2
            group.chat(sender_id=1, receiver_id=2, message="Hello, Agent 2!")

        Args:
            sender_id: Index of the sending worker, ``0 <= id < worker_count``.
            receiver_id: Index of the receiving worker, same range.
            message: Text of the message.

        Returns:
            Whatever the sender's ``Worker.run`` returns.

        Raises:
            ValueError: If either id is outside ``[0, worker_count)``.
        """
        ids_in_range = (
            0 <= sender_id < self.worker_count
            and 0 <= receiver_id < self.worker_count
        )
        if not ids_in_range:
            raise ValueError("Invalid sender or receiver ID")

        # The embedding function works on a list of documents and returns one
        # vector per document, so wrap the message and unwrap the single result.
        # (The original passed the bare string and omitted ``model_name``.)
        message_vector = self.embed([message])[0]

        # Store the message in the vector database. A UUID suffix keeps the id
        # unique when the same pair of agents exchanges more than one message.
        self.collection.add(
            embeddings=[message_vector],
            documents=[message],
            ids=[f"{sender_id}_to_{receiver_id}_{uuid.uuid4().hex}"],
        )

        # NOTE(review): the original called ``self.run(objective=...)``, which
        # this class never defined. Dispatching to the sender's worker assumes
        # ``Worker.run`` takes the task text as its first positional argument
        # -- confirm against swarms.workers.worker.Worker.
        objective = f"chat with agent {receiver_id} about {message}"
        return self.workers[sender_id].run(objective)