omni_bot / swarms /swarms /autoscaler.py
WAWAA's picture
Upload folder using huggingface_hub
4962437
raw
history blame contribute delete
No virus
3.01 kB
import queue
import threading
from time import sleep
from swarms.utils.decorators import error_decorator, log_decorator, timing_decorator
from swarms.workers.worker import Worker
class AutoScaler:
"""
The AutoScaler is like a kubernetes pod, that autoscales an agent or worker or boss!
# TODO Handle task assignment and task delegation
# TODO: User task => decomposed into very small sub tasks => sub tasks assigned to workers => workers complete and update the swarm, can ask for help from other agents.
# TODO: Missing, Task Assignment, Task delegation, Task completion, Swarm level communication with vector db
Example
```
# usage of usage
auto_scaler = AutoScaler(agent=YourCustomAgent)
auto_scaler.start()
for i in range(100):
auto_scaler.add_task9f"task {I}})
```
"""
@log_decorator
@error_decorator
@timing_decorator
def __init__(
self,
initial_agents=10,
scale_up_factor=1,
idle_threshold=0.2,
busy_threshold=0.7,
agent=None,
):
self.agent = agent or Worker
self.agents_pool = [self.agent() for _ in range(initial_agents)]
self.task_queue = queue.Queue()
self.scale_up_factor = scale_up_factor
self.idle_threshold = idle_threshold
self.lock = threading.Lock()
def add_task(self, task):
self.tasks_queue.put(task)
@log_decorator
@error_decorator
@timing_decorator
def scale_up(self):
with self.lock:
new_agents_counts = len(self.agents_pool) * self.scale_up_factor
for _ in range(new_agents_counts):
self.agents_pool.append(Worker())
def scale_down(self):
with self.lock:
if len(self.agents_pool) > 10: #ensure minmum of 10 agents
del self.agents_pool[-1] #remove last agent
@log_decorator
@error_decorator
@timing_decorator
def monitor_and_scale(self):
while True:
sleep(60)#check minute
pending_tasks = self.task_queue.qsize()
active_agents = sum([1 for agent in self.agents_pool if agent.is_busy()])
if pending_tasks / len(self.agents_pool) > self.busy_threshold:
self.scale_up()
elif active_agents / len(self.agents_pool) < self.idle_threshold:
self.scale_down()
@log_decorator
@error_decorator
@timing_decorator
def start(self):
monitor_thread = threading.Thread(target=self.monitor_and_scale)
monitor_thread.start()
while True:
task = self.task_queue.get()
if task:
available_agent = next((agent for agent in self.agents_pool))
if available_agent:
available_agent.run(task)
def del_agent(self):
with self.lock:
if self.agents_pool:
agent_to_remove = self.agents_poo.pop()
del agent_to_remove