From e79b369e96fdf96dd34c52527b536d3224f66ea1 Mon Sep 17 00:00:00 2001 From: Kye Date: Wed, 11 Oct 2023 19:55:58 -0400 Subject: [PATCH] autoscaler --- swarms/swarms/autoscaler.py | 26 ++++++++++++++++++++++++-- swarms/swarms/base.py | 10 ++++++++++ 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/swarms/swarms/autoscaler.py b/swarms/swarms/autoscaler.py index c85cd13f..e9d47334 100644 --- a/swarms/swarms/autoscaler.py +++ b/swarms/swarms/autoscaler.py @@ -1,7 +1,6 @@ import queue import threading from time import sleep - from swarms.utils.decorators import error_decorator, log_decorator, timing_decorator from swarms.workers.worker import Worker @@ -14,7 +13,24 @@ class AutoScaler: # TODO: Missing, Task Assignment, Task delegation, Task completion, Swarm level communication with vector db - Example + Args: + + initial_agents (int, optional): Number of initial agents. Defaults to 10. + scale_up_factor (int, optional): Scale up factor. Defaults to 1. + idle_threshold (float, optional): Idle threshold. Defaults to 0.2. + busy_threshold (float, optional): Busy threshold. Defaults to 0.7. + agent ([type], optional): Agent. Defaults to None. + + + Methods: + add_task: Add task to queue + scale_up: Scale up + scale_down: Scale down + monitor_and_scale: Monitor and scale + start: Start scaling + del_agent: Delete an agent + + Usage ``` # usage of usage auto_scaler = AutoScaler(agent=YourCustomAgent) @@ -44,18 +60,21 @@ class AutoScaler: self.lock = threading.Lock() def add_task(self, task): + """Add tasks to queue""" self.tasks_queue.put(task) @log_decorator @error_decorator @timing_decorator def scale_up(self): + """Add more agents""" with self.lock: new_agents_counts = len(self.agents_pool) * self.scale_up_factor for _ in range(new_agents_counts): self.agents_pool.append(Worker()) def scale_down(self): + """scale down""" with self.lock: if len(self.agents_pool) > 10: # ensure minmum of 10 agents del self.agents_pool[-1] # remove last agent @@ -64,6 +83,7 @@ class AutoScaler: @error_decorator @timing_decorator def monitor_and_scale(self): + """Monitor and scale""" while True: sleep(60) # check minute pending_tasks = self.task_queue.qsize() @@ -78,6 +98,7 @@ class AutoScaler: @error_decorator @timing_decorator def start(self): + """Start scaling""" monitor_thread = threading.Thread(target=self.monitor_and_scale) monitor_thread.start() @@ -89,6 +110,7 @@ class AutoScaler: available_agent.run(task) def del_agent(self): + """Delete an agent""" with self.lock: if self.agents_pool: agent_to_remove = self.agents_poo.pop() diff --git a/swarms/swarms/base.py b/swarms/swarms/base.py index 8a0f4428..2c3f6f92 100644 --- a/swarms/swarms/base.py +++ b/swarms/swarms/base.py @@ -3,6 +3,10 @@ from typing import Optional, List from swarms.workers.base import AbstractWorker class AbstractSwarm(ABC): + """ + Abstract class for swarm simulation architectures + + """ # TODO: Pass in abstract LLM class that can utilize Hf or Anthropic models, Move away from OPENAI # TODO: ADD Universal Communication Layer, a ocean vectorstore instance # TODO: BE MORE EXPLICIT ON TOOL USE, TASK DECOMPOSITION AND TASK COMPLETETION AND ALLOCATION @@ -63,3 +67,9 @@ class AbstractSwarm(ABC): ): """Send a direct message to a worker""" pass + + @abstractmethod + def autoscaler(self, num_workers: int, worker: ["AbstractWorker"]): + """Autoscaler that acts like kubernetes for autonomous agents""" + pass +