63 lines
2.1 KiB
63 lines
2.1 KiB
import threading
|
|
import queue
|
|
from time import sleep
|
|
from swarms.workers.autobot import AutoBot
|
|
|
|
# TODO: Handle task assignment and task delegation.
|
|
# TODO: Decompose each user task into very small subtasks, assign those subtasks to workers, and have workers complete them and update the swarm; workers can ask other agents for help.
|
|
# TODO: Still missing: task assignment, task delegation, task completion, and swarm-level communication via a vector DB.
|
|
|
|
class AutoScaler:
    """Keep a pool of worker agents and resize it according to load.

    Tasks are placed on an internal FIFO queue with ``add_task``.  ``start``
    launches a background monitor thread and then dispatches queued tasks to
    agents forever.  The monitor periodically compares the queue backlog and
    the share of busy agents against the configured thresholds and grows or
    shrinks the pool accordingly.

    Agents are expected to expose ``is_busy()`` and ``run(task)``; those are
    the only agent methods this class calls.

    Args:
        initial_agents: Number of agents created up front.
        scale_up_factor: Multiplier applied to the current pool size to
            decide how many agents ``scale_up`` adds (``1`` doubles the
            pool).  Fractional results are truncated toward zero.
        idle_threshold: Scale down when the fraction of busy agents falls
            below this value.
        busy_threshold: Scale up when pending tasks per agent exceed this
            value.
        min_agents: ``scale_down`` never shrinks the pool to fewer than this
            many agents.
        check_interval: Seconds the monitor sleeps between load checks.
        agent_factory: Zero-argument callable that builds one agent.
            Defaults to ``AutoBot``.
    """

    # Seconds the dispatcher waits before re-scanning the pool when every
    # agent reports busy.
    _IDLE_POLL_SECONDS = 0.1

    def __init__(
        self,
        initial_agents=10,
        scale_up_factor=1,
        idle_threshold=0.2,
        busy_threshold=0.7,
        *,
        min_agents=10,
        check_interval=60,
        agent_factory=None,
    ):
        # AutoBot is only looked up when no factory is supplied.
        self._agent_factory = (
            agent_factory if agent_factory is not None else AutoBot
        )
        self.agents_pool = [
            self._agent_factory() for _ in range(initial_agents)
        ]
        self.task_queue = queue.Queue()
        self.scale_up_factor = scale_up_factor
        self.idle_threshold = idle_threshold
        # Previously accepted but never stored, which made the monitor fail.
        self.busy_threshold = busy_threshold
        self.min_agents = min_agents
        self.check_interval = check_interval
        # Guards every read and mutation of ``agents_pool``.
        self.lock = threading.Lock()

    def add_task(self, task):
        """Queue ``task`` for execution by the dispatch loop in ``start``."""
        self.task_queue.put(task)

    def scale_up(self):
        """Add ``len(pool) * scale_up_factor`` new agents to the pool."""
        with self.lock:
            # int() keeps range() valid when the factor is fractional.
            new_agents_count = int(
                len(self.agents_pool) * self.scale_up_factor
            )
            self.agents_pool.extend(
                self._agent_factory() for _ in range(new_agents_count)
            )

    def scale_down(self):
        """Remove the last agent unless the pool is already at its minimum."""
        with self.lock:
            if len(self.agents_pool) > self.min_agents:
                del self.agents_pool[-1]

    def _snapshot_pool(self):
        """Return a copy of the agent pool taken under the lock."""
        with self.lock:
            return list(self.agents_pool)

    def _scale_once(self):
        """Run one load check and scale the pool up or down if needed."""
        agents = self._snapshot_pool()
        if not agents:
            # No agents means no meaningful ratios (and a zero division).
            return

        pool_size = len(agents)
        pending_tasks = self.task_queue.qsize()
        active_agents = sum(1 for agent in agents if agent.is_busy())

        if pending_tasks / pool_size > self.busy_threshold:
            self.scale_up()
        elif active_agents / pool_size < self.idle_threshold:
            self.scale_down()

    def monitor_and_scale(self):
        """Check the load every ``check_interval`` seconds, forever."""
        while True:
            sleep(self.check_interval)
            self._scale_once()

    def _wait_for_idle_agent(self):
        """Block until some agent reports it is not busy, then return it."""
        while True:
            for agent in self._snapshot_pool():
                if not agent.is_busy():
                    return agent
            # Everyone is busy (or the pool is empty): back off briefly so
            # the monitor thread gets a chance to grow the pool.
            sleep(self._IDLE_POLL_SECONDS)

    def start(self):
        """Start the monitor thread and dispatch queued tasks forever.

        This call does not return.  Falsy tasks pulled from the queue are
        skipped.  Each remaining task is handed to the first agent that is
        not busy, waiting for one if necessary, so tasks are dispatched in
        FIFO order.
        """
        # Daemon thread: the monitor must not keep the process alive if the
        # dispatch loop below terminates with an exception.
        monitor_thread = threading.Thread(
            target=self.monitor_and_scale, daemon=True
        )
        monitor_thread.start()

        while True:
            task = self.task_queue.get()
            if not task:
                continue
            self._wait_for_idle_agent().run(task)
|
|
|
|
|
|
|
|
|
|
|
|
|