Spaces:
Runtime error
Runtime error
File size: 4,431 Bytes
4962437 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
import logging
from enum import Enum
from typing import Any
from chromadb.utils import embedding_functions
from swarms.workers.worker import Worker
class TaskStatus(Enum):
QUEUED = 1
RUNNING = 2
COMPLETED = 3
FAILED = 4
class ScalableGroupChat:
"""
This is a class to enable scalable groupchat like a telegram, it takes an Worker as an input
and handles all the logic to enable multi-agent collaboration at massive scale.
Worker -> ScalableGroupChat(Worker * 10)
-> every response is embedded and placed in chroma
-> every response is then retrieved by querying the database and sent then passed into the prompt of the worker
-> every worker is then updated with the new response
-> every worker can communicate at any time
-> every worker can communicate without restrictions in parallel
"""
def __init__(
self,
worker_count: int = 5,
collection_name: str = "swarm",
api_key: str = None,
):
self.workers = []
self.worker_count = worker_count
self.collection_name = collection_name
self.api_key = api_key
# Create a list of Worker instances with unique names
for i in range(worker_count):
self.workers.append(
Worker(
openai_api_key=api_key,
ai_name=f"Worker-{i}"
)
)
def embed(
self,
input,
model_name
):
"""Embeds an input of size N into a vector of size M"""
openai = embedding_functions.OpenAIEmbeddingFunction(
api_key=self.api_key,
model_name=model_name
)
embedding = openai(input)
return embedding
def retrieve_results(
self,
agent_id: int
) -> Any:
"""Retrieve results from a specific agent"""
try:
#Query the vector database for documents created by the agents
results = self.collection.query(
query_texts=[str(agent_id)],
n_results=10
)
return results
except Exception as e:
logging.error(f"Failed to retrieve results from agent {agent_id}. Error {e}")
raise
# @abstractmethod
def update_vector_db(self, data) -> None:
"""Update the vector database"""
try:
self.collection.add(
embeddings=[data["vector"]],
documents=[str(data["task_id"])],
ids=[str(data["task_id"])]
)
except Exception as e:
logging.error(f"Failed to update the vector database. Error: {e}")
raise
# @abstractmethod
def get_vector_db(self):
"""Retrieve the vector database"""
return self.collection
def append_to_db(
self,
result: str
):
"""append the result of the swarm to a specifici collection in the database"""
try:
self.collection.add(
documents=[result],
ids=[str(id(result))]
)
except Exception as e:
logging.error(f"Failed to append the agent output to database. Error: {e}")
raise
def chat(
self,
sender_id: int,
receiver_id: int,
message: str
):
"""
Allows the agents to chat with eachother thrught the vectordatabase
# Instantiate the ScalableGroupChat with 10 agents
orchestrator = ScalableGroupChat(
llm,
agent_list=[llm]*10,
task_queue=[]
)
# Agent 1 sends a message to Agent 2
orchestrator.chat(sender_id=1, receiver_id=2, message="Hello, Agent 2!")
"""
if sender_id < 0 or sender_id >= self.worker_count or receiver_id < 0 or receiver_id >= self.worker_count:
raise ValueError("Invalid sender or receiver ID")
message_vector = self.embed(
message,
)
#store the mesage in the vector database
self.collection.add(
embeddings=[message_vector],
documents=[message],
ids=[f"{sender_id}_to_{receiver_id}"]
)
self.run(
objective=f"chat with agent {receiver_id} about {message}"
)
|