# Page metadata captured with this file: Spaces status "Runtime error"; file size 2,796 bytes; ref 4962437.
from swarms.workers.worker import Worker
from queue import Queue, PriorityQueue
class SimpleSwarm:
    """Queue tasks and hand each one to the first worker that answers.

    Tasks submitted with a priority are processed before tasks submitted
    without one. Lower priority numbers run first, and tasks that share a
    priority run in the order they were submitted.

    Usage:
        # Initialize the swarm with 5 workers, an API key, and a name for
        # the AI model
        swarm = SimpleSwarm(
            num_workers=5,
            openai_api_key="YOUR_OPENAI_API_KEY",
            ai_name="Optimus Prime",
        )

        # Normal task without priority
        swarm.distribute("Describe the process of photosynthesis in simple terms.")

        # Priority task; lower numbers indicate higher priority
        # (e.g., 1 is higher priority than 2)
        swarm.distribute("Translate the phrase 'Hello World' to French.", priority=1)

        # Run the tasks and gather the responses
        for response in swarm.run():
            print(response)

        # Shortcut: queue a single task and run everything that is pending
        responses = swarm("Summarize the plot of Hamlet.")
    """

    def __init__(self, num_workers, openai_api_key, ai_name):
        """Create the worker pool and the two empty task queues.

        Args:
            num_workers: Number of ``Worker`` instances to create.
            openai_api_key: Passed as the first argument to every ``Worker``.
            ai_name: Passed as the second argument to every ``Worker``.
        """
        self.workers = [
            Worker(openai_api_key, ai_name) for _ in range(num_workers)
        ]
        self.task_queue = Queue()
        self.priority_queue = PriorityQueue()
        # Monotonic tie-breaker stored between the priority and the task so
        # that equal-priority entries never fall back to comparing the task
        # objects themselves (which fails for dicts and other unorderable
        # types) and are served first-in, first-out.
        self._sequence = 0

    def distribute(self, task, priority=None):
        """Queue a task for a later ``run()``.

        Args:
            task: The task to hand to a worker.
            priority: Optional sort key; lower values are processed first.
                ``None`` (the default) places the task on the normal queue,
                which is drained after all prioritized tasks.
        """
        # Compare against None explicitly: 0 is a valid (and the most
        # urgent) priority and must not be mistaken for "no priority".
        if priority is None:
            self.task_queue.put(task)
            return
        self.priority_queue.put((priority, self._sequence, task))
        self._sequence += 1

    def _process_task(self, task):
        """Return the first truthy worker response for ``task``.

        Workers are tried in order; if every worker returns a falsy value
        the string ``"All Agents failed"`` is returned instead.
        """
        # TODO: implement load balancing and a fallback mechanism.
        for worker in self.workers:
            response = worker.run(task)
            if response:
                return response
        return "All Agents failed"

    def run(self, task=None):
        """Drain both queues and return the collected responses.

        Args:
            task: Optional task to queue (without priority) before draining.
                Omitting it processes only what was already distributed.

        Returns:
            A list with one response per processed task: prioritized tasks
            first, then normal tasks in submission order.
        """
        if task is not None:
            self.distribute(task)

        responses = []
        # Process high-priority tasks first; the task is the last element
        # of each queued tuple.
        while not self.priority_queue.empty():
            queued = self.priority_queue.get()[-1]
            responses.append(self._process_task(queued))
        # Then process the normal tasks.
        while not self.task_queue.empty():
            responses.append(self._process_task(self.task_queue.get()))
        return responses

    def run_old(self, task):
        """Send ``task`` to every worker and return all of their responses."""
        return [worker.run(task) for worker in self.workers]

    def __call__(self, task):
        """Queue ``task`` and run everything pending; see ``run``."""
        return self.run(task)
|