From 84a844962681dd696741be9a22edc665b7724e85 Mon Sep 17 00:00:00 2001 From: Kye Date: Fri, 1 Dec 2023 00:30:26 -0800 Subject: [PATCH] [FEAT][AutoScaler Prototype][TESTS] --- autoscaler.py | 46 ++++++++++ swarms/structs/agent.py | 9 +- swarms/structs/autoscaler.py | 153 ++++++++++++++++++++++++------- tests/structs/test_autoscaler.py | 140 ++++++++++++++++++++++++++++ 4 files changed, 309 insertions(+), 39 deletions(-) create mode 100644 autoscaler.py create mode 100644 tests/structs/test_autoscaler.py diff --git a/autoscaler.py b/autoscaler.py new file mode 100644 index 00000000..8b808db6 --- /dev/null +++ b/autoscaler.py @@ -0,0 +1,46 @@ +import os + +from dotenv import load_dotenv + +# Import the OpenAIChat model and the Agent struct +from swarms.models import OpenAIChat +from swarms.structs import Agent +from swarms.structs.autoscaler import AutoScaler + + +# Load the environment variables +load_dotenv() + +# Get the API key from the environment +api_key = os.environ.get("OPENAI_API_KEY") + +# Initialize the language model +llm = OpenAIChat( + temperature=0.5, + openai_api_key=api_key, +) + + +## Initialize the workflow +agent = Agent(llm=llm, max_loops=1, dashboard=True) + + +# Load the autoscaler +autoscaler = AutoScaler( + initial_agents=2, + scale_up_factor=1, + idle_threshold=0.2, + busy_threshold=0.7, + agents=[agent], + autoscale=True, + min_agents=1, + max_agents=5, + custom_scale_strategy=None, +) +print(autoscaler) + +# Run the workflow on a task +out = autoscaler.run( + agent.id, "Generate a 10,000 word blog on health and wellness." +) +print(out) diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index c79f4b5c..fe62dc20 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -10,7 +10,10 @@ from typing import Any, Callable, Dict, List, Optional, Tuple from termcolor import colored -from swarms.prompts.agent_system_prompts import FLOW_SYSTEM_PROMPT +from swarms.prompts.agent_system_prompts import ( + FLOW_SYSTEM_PROMPT, + agent_system_prompt_2, +) from swarms.prompts.multi_modal_autonomous_instruction_prompt import ( MULTI_MODAL_AUTO_AGENT_SYSTEM_PROMPT_1, ) @@ -23,11 +26,9 @@ from swarms.utils.parse_code import ( extract_code_in_backticks_in_string, ) from swarms.utils.pdf_to_text import pdf_to_text -from swarms.prompts.agent_system_prompts import ( - agent_system_prompt_2, -) +# Utils # Custom stopping condition def stop_when_repeats(response: str) -> bool: # Stop if the word stop appears in the response diff --git a/swarms/structs/autoscaler.py b/swarms/structs/autoscaler.py index 7d4894ad..1cb31333 100644 --- a/swarms/structs/autoscaler.py +++ b/swarms/structs/autoscaler.py @@ -2,7 +2,9 @@ import logging import queue import threading from time import sleep -from typing import Callable, Dict, List +from typing import Callable, Dict, List, Optional +import asyncio +import concurrent.futures from termcolor import colored @@ -16,38 +18,88 @@ from swarms.utils.decorators import ( class AutoScaler: """ - The AutoScaler is like a kubernetes pod, that autoscales an agent or worker or boss! + AutoScaler class - Wraps around a structure like SequentialWorkflow - and or Agent and parallelizes them on multiple threads so they're split across devices - and you can use them like that - Args: - - initial_agents (int, optional): Number of initial agents. Defaults to 10. - scale_up_factor (int, optional): Scale up factor. Defaults to 1. - idle_threshold (float, optional): Idle threshold. Defaults to 0.2. - busy_threshold (float, optional): Busy threshold. Defaults to 0.7. - agent ([type], optional): Agent. Defaults to None. + The AutoScaler class is responsible for managing the agents pool + and the task queue. It also monitors the health of the agents and + scales the pool up or down based on the number of pending tasks + and the current load of the agents. + Args: + initial_agents (Optional[int], optional): Initial number of + agents to start with. Defaults to 10. + scale_up_factor (int, optional): Factor by which to scale up + the agents pool. Defaults to 1. + idle_threshold (float, optional): Threshold for scaling down + the agents pool. Defaults to 0.2. + busy_threshold (float, optional): Threshold for scaling up + the agents pool. Defaults to 0.7. + agents (List[Agent], optional): List of agents to use in the + pool. Defaults to None. + autoscale (Optional[bool], optional): Whether to autoscale + the agents pool. Defaults to True. + min_agents (Optional[int], optional): Minimum number of + agents to keep in the pool. Defaults to 10. + max_agents (Optional[int], optional): Maximum number of + agents to keep in the pool. Defaults to 100. + custom_scale_strategy (Optional[Callable], optional): Custom + scaling strategy to use. Defaults to None. Methods: - add_task: Add task to queue - scale_up: Scale up - scale_down: Scale down + add_task: Add tasks to queue + scale_up: Add more agents + scale_down: scale down + run: Run agent the task on the agent id monitor_and_scale: Monitor and scale start: Start scaling - del_agent: Delete an agent + check_agent_health: Checks the health of each agent and + replaces unhealthy agents. + balance_load: Distributes tasks among agents based on their + current load. + set_scaling_strategy: Set a custom scaling strategy. + execute_scaling_strategy: Execute the custom scaling strategy + if defined. + report_agent_metrics: Collects and reports metrics from each + agent. + report: Reports the current state of the autoscaler. + print_dashboard: Prints a dashboard of the current state of + the autoscaler. - Usage - ``` - from swarms.swarms import AutoScaler - from swarms.structs.agent import Agent + Examples: + >>> import os + >>> from dotenv import load_dotenv + >>> # Import the OpenAIChat model and the Agent struct + >>> from swarms.models import OpenAIChat + >>> from swarms.structs import Agent + >>> from swarms.structs.autoscaler import AutoScaler + >>> # Load the environment variables + >>> load_dotenv() + >>> # Get the API key from the environment + >>> api_key = os.environ.get("OPENAI_API_KEY") + >>> # Initialize the language model + >>> llm = OpenAIChat( + ... temperature=0.5, + ... openai_api_key=api_key, + ... ) + >>> ## Initialize the workflow + >>> agent = Agent(llm=llm, max_loops=1, dashboard=True) + >>> # Load the autoscaler + >>> autoscaler = AutoScaler( + ... initial_agents=2, + ... scale_up_factor=1, + ... idle_threshold=0.2, + ... busy_threshold=0.7, + ... agents=[agent], + ... autoscale=True, + ... min_agents=1, + ... max_agents=5, + ... custom_scale_strategy=None, + ... ) + >>> print(autoscaler) + >>> # Run the workflow on a task + >>> out = autoscaler.run(agent.id, "Generate a 10,000 word blog on health and wellness.") + >>> print(out) - @AutoScaler - agent = Agent() - - agent.run("what is your name") - ``` """ @log_decorator @@ -55,25 +107,36 @@ class AutoScaler: @timing_decorator def __init__( self, - initial_agents=10, - scale_up_factor=1, - idle_threshold=0.2, - busy_threshold=0.7, - agent=None, + initial_agents: Optional[int] = 10, + scale_up_factor: int = 1, + idle_threshold: float = 0.2, + busy_threshold: float = 0.7, + agents: List[Agent] = None, + autoscale: Optional[bool] = True, + min_agents: Optional[int] = 10, + max_agents: Optional[int] = 100, + custom_scale_strategy: Optional[Callable] = None, + *args, + **kwargs, ): - self.agent = agent or Agent - self.agents_pool = [ - self.agent() for _ in range(initial_agents) + self.agents_pool = agents or [ + agents[0]() for _ in range(initial_agents) ] self.task_queue = queue.Queue() self.scale_up_factor = scale_up_factor self.idle_threshold = idle_threshold + self.busy_threshold = busy_threshold self.lock = threading.Lock() + self.agents = agents + self.autoscale = autoscale + self.min_agents = min_agents + self.max_agents = max_agents + self.custom_scale_strategy = custom_scale_strategy def add_task(self, task): """Add tasks to queue""" try: - self.tasks_queue.put(task) + self.task_queue.put(task) except Exception as error: print( f"Error adding task to queue: {error} try again with" @@ -91,7 +154,7 @@ class AutoScaler: len(self.agents_pool) * self.scale_up_factor ) for _ in range(new_agents_counts): - self.agents_pool.append(Agent()) + self.agents_pool.append(self.agents[0]()) except Exception as error: print( f"Error scaling up: {error} try again with a new task" @@ -111,6 +174,26 @@ class AutoScaler: " task" ) + def run( + self, agent_id, task: Optional[str] = None, *args, **kwargs + ): + """Run agent the task on the agent id + + Args: + agent_id (_type_): _description_ + task (str, optional): _description_. Defaults to None. + + Raises: + ValueError: _description_ + + Returns: + _type_: _description_ + """ + for agent in self.agents_pool: + if agent.id == agent_id: + return agent.run(task, *args, **kwargs) + raise ValueError(f"No agent found with ID {agent_id}") + @log_decorator @error_decorator @timing_decorator diff --git a/tests/structs/test_autoscaler.py b/tests/structs/test_autoscaler.py new file mode 100644 index 00000000..cfb9560e --- /dev/null +++ b/tests/structs/test_autoscaler.py @@ -0,0 +1,140 @@ +import os + +from dotenv import load_dotenv +from pytest import patch + +from swarms.models import OpenAIChat +from swarms.structs import Agent +from swarms.structs.autoscaler import AutoScaler + +load_dotenv() + +api_key = os.environ.get("OPENAI_API_KEY") +llm = OpenAIChat( + temperature=0.5, + openai_api_key=api_key, +) +agent = Agent(llm=llm, max_loops=1) + + +def test_autoscaler_init(): + autoscaler = AutoScaler(initial_agents=5, agent=agent) + assert autoscaler.initial_agents == 5 + assert autoscaler.scale_up_factor == 1 + assert autoscaler.idle_threshold == 0.2 + assert autoscaler.busy_threshold == 0.7 + assert autoscaler.autoscale == True + assert autoscaler.min_agents == 1 + assert autoscaler.max_agents == 5 + assert autoscaler.custom_scale_strategy == None + assert len(autoscaler.agents_pool) == 5 + assert autoscaler.task_queue.empty() == True + + +def test_autoscaler_add_task(): + autoscaler = AutoScaler(initial_agents=5, agent=agent) + autoscaler.add_task("task1") + assert autoscaler.task_queue.empty() == False + + +def test_autoscaler_run(): + autoscaler = AutoScaler(initial_agents=5, agent=agent) + out = autoscaler.run( + agent.id, + "Generate a 10,000 word blog on health and wellness.", + ) + assert ( + out == "Generate a 10,000 word blog on health and wellness." + ) + + +def test_autoscaler_add_agent(): + autoscaler = AutoScaler(initial_agents=5, agent=agent) + autoscaler.add_agent(agent) + assert len(autoscaler.agents_pool) == 6 + + +def test_autoscaler_remove_agent(): + autoscaler = AutoScaler(initial_agents=5, agent=agent) + autoscaler.remove_agent(agent) + assert len(autoscaler.agents_pool) == 4 + + +def test_autoscaler_get_agent(): + autoscaler = AutoScaler(initial_agents=5, agent=agent) + agent = autoscaler.get_agent() + assert isinstance(agent, Agent) + + +def test_autoscaler_get_agent_by_id(): + autoscaler = AutoScaler(initial_agents=5, agent=agent) + agent = autoscaler.get_agent_by_id(agent.id) + assert isinstance(agent, Agent) + + +def test_autoscaler_get_agent_by_id_not_found(): + autoscaler = AutoScaler(initial_agents=5, agent=agent) + agent = autoscaler.get_agent_by_id("fake_id") + assert agent == None + + +@patch("swarms.swarms.Agent.is_healthy") +def test_autoscaler_check_agent_health(mock_is_healthy): + mock_is_healthy.side_effect = [False, True, True, True, True] + autoscaler = AutoScaler(initial_agents=5, agent=agent) + autoscaler.check_agent_health() + assert mock_is_healthy.call_count == 5 + + +def test_autoscaler_balance_load(): + autoscaler = AutoScaler(initial_agents=5, agent=agent) + autoscaler.add_task("task1") + autoscaler.add_task("task2") + autoscaler.balance_load() + assert autoscaler.task_queue.empty() + + +def test_autoscaler_set_scaling_strategy(): + autoscaler = AutoScaler(initial_agents=5, agent=agent) + + def strategy(x, y): + return x - y + + autoscaler.set_scaling_strategy(strategy) + assert autoscaler.custom_scale_strategy == strategy + + +def test_autoscaler_execute_scaling_strategy(): + autoscaler = AutoScaler(initial_agents=5, agent=agent) + + def strategy(x, y): + return x - y + + autoscaler.set_scaling_strategy(strategy) + autoscaler.add_task("task1") + autoscaler.execute_scaling_strategy() + assert len(autoscaler.agents_pool) == 4 + + +def test_autoscaler_report_agent_metrics(): + autoscaler = AutoScaler(initial_agents=5, agent=agent) + metrics = autoscaler.report_agent_metrics() + assert set(metrics.keys()) == { + "completion_time", + "success_rate", + "error_rate", + } + + +@patch("swarms.swarms.AutoScaler.report_agent_metrics") +def test_autoscaler_report(mock_report_agent_metrics): + autoscaler = AutoScaler(initial_agents=5, agent=agent) + autoscaler.report() + mock_report_agent_metrics.assert_called_once() + + +@patch("builtins.print") +def test_autoscaler_print_dashboard(mock_print): + autoscaler = AutoScaler(initial_agents=5, agent=agent) + autoscaler.print_dashboard() + mock_print.assert_called()