# omni_bot/swarms/swarms/simple_swarm.py
# Zack Zitting Bradshaw
# Upload folder using huggingface_hub
# 4962437
from swarms.workers.worker import Worker
from queue import Queue, PriorityQueue
class SimpleSwarm:
    """A minimal swarm that hands queued tasks to a pool of workers.

    Tasks are queued with ``distribute`` and processed by ``run``. Tasks
    queued with a priority are processed first (lowest number first, and
    first-in-first-out among equal priorities); tasks queued without a
    priority follow in first-in-first-out order.

    Usage:
        # Initialize the swarm with 5 workers, an API key, and a name for the AI model
        swarm = SimpleSwarm(num_workers=5, openai_api_key="YOUR_OPENAI_API_KEY", ai_name="Optimus Prime")

        # Normal task without priority
        normal_task = "Describe the process of photosynthesis in simple terms."
        swarm.distribute(normal_task)

        # Priority task; lower numbers indicate higher priority (e.g., 1 is higher priority than 2)
        priority_task = "Translate the phrase 'Hello World' to French."
        swarm.distribute(priority_task, priority=1)

        # Run the tasks and gather the responses
        responses = swarm.run()

        # Print responses
        for response in responses:
            print(response)

    NOTE: earlier usage notes also mentioned ``provide_feedback`` and
    ``health_check``; neither is implemented on this class.
    """

    # Insertion counter used as a tie-breaker for prioritised tasks. Kept as
    # a class-level default so instances that bypass ``__init__`` still work;
    # the first increment creates the per-instance value.
    _task_counter = 0

    def __init__(
        self,
        num_workers,
        openai_api_key,
        ai_name
    ):
        """Create the worker pool and the two empty task queues.

        Args:
            num_workers: How many ``Worker`` instances to create.
            openai_api_key: Forwarded as the first argument of every ``Worker``.
            ai_name: Forwarded as the second argument of every ``Worker``.
        """
        self.workers = [
            Worker(openai_api_key, ai_name) for _ in range(num_workers)
        ]
        self.task_queue = Queue()
        self.priority_queue = PriorityQueue()

    def distribute(
        self,
        task,
        priority=None
    ):
        """Queue a task for the next call to ``run``.

        Args:
            task: The task to hand to a worker's ``run`` method.
            priority: Optional priority; lower values are processed first.
                ``None`` (the default) queues the task as a normal task.
                ``0`` is a valid priority and is the highest one.
        """
        # ``is None`` rather than truthiness: priority 0 must not be
        # mistaken for "no priority".
        if priority is None:
            self.task_queue.put(task)
            return

        # The counter sits between priority and task so that equal
        # priorities are served FIFO and the tasks themselves are never
        # compared (dict tasks, for example, are not orderable).
        self._task_counter += 1
        self.priority_queue.put((priority, self._task_counter, task))

    def _process_task(self, task):
        """Return the first truthy response produced by the workers.

        Workers are tried in order; if none returns a truthy response the
        string ``"All Agents failed"`` is returned instead.
        """
        # TODO: implement load balancing and a fallback mechanism
        # (an exception raised by a worker currently propagates).
        for worker in self.workers:
            response = worker.run(task)
            if response:
                return response
        return "All Agents failed"

    def run(self):
        """Process every queued task and return the list of responses.

        Prioritised tasks are drained first, then normal tasks. Both queues
        are empty afterwards.
        """
        responses = []

        # Process high priority tasks first. The task is always the last
        # element of the queued entry.
        while not self.priority_queue.empty():
            task = self.priority_queue.get()[-1]
            responses.append(self._process_task(task))

        # Process normal tasks.
        while not self.task_queue.empty():
            task = self.task_queue.get()
            responses.append(self._process_task(task))

        return responses

    def run_old(self, task):
        """Send ``task`` to every worker and return all responses in order."""
        return [worker.run(task) for worker in self.workers]

    def __call__(self, task):
        """Queue ``task`` as a normal task and run the swarm.

        Returns the responses for everything that was queued, with ``task``
        processed last among the normal tasks.
        """
        # ``run`` takes no task argument, so the task has to be queued first.
        self.distribute(task)
        return self.run()