[FEAT][AutoScaler Prototype][TESTS]

pull/243/head
Kye 1 year ago
parent 8d0b751ec2
commit 84a8449626

@ -0,0 +1,46 @@
import os
from dotenv import load_dotenv
# Import the OpenAIChat model and the Agent struct
from swarms.models import OpenAIChat
from swarms.structs import Agent
from swarms.structs.autoscaler import AutoScaler
# Load the environment variables
load_dotenv()
# Get the API key from the environment
api_key = os.environ.get("OPENAI_API_KEY")
# Initialize the language model
llm = OpenAIChat(
temperature=0.5,
openai_api_key=api_key,
)
## Initialize the workflow
agent = Agent(llm=llm, max_loops=1, dashboard=True)
# Load the autoscaler
autoscaler = AutoScaler(
initial_agents=2,
scale_up_factor=1,
idle_threshold=0.2,
busy_threshold=0.7,
agents=[agent],
autoscale=True,
min_agents=1,
max_agents=5,
custom_scale_strategy=None,
)
print(autoscaler)
# Run the workflow on a task
out = autoscaler.run(
agent.id, "Generate a 10,000 word blog on health and wellness."
)
print(out)

@ -10,7 +10,10 @@ from typing import Any, Callable, Dict, List, Optional, Tuple
from termcolor import colored from termcolor import colored
from swarms.prompts.agent_system_prompts import FLOW_SYSTEM_PROMPT from swarms.prompts.agent_system_prompts import (
FLOW_SYSTEM_PROMPT,
agent_system_prompt_2,
)
from swarms.prompts.multi_modal_autonomous_instruction_prompt import ( from swarms.prompts.multi_modal_autonomous_instruction_prompt import (
MULTI_MODAL_AUTO_AGENT_SYSTEM_PROMPT_1, MULTI_MODAL_AUTO_AGENT_SYSTEM_PROMPT_1,
) )
@ -23,11 +26,9 @@ from swarms.utils.parse_code import (
extract_code_in_backticks_in_string, extract_code_in_backticks_in_string,
) )
from swarms.utils.pdf_to_text import pdf_to_text from swarms.utils.pdf_to_text import pdf_to_text
from swarms.prompts.agent_system_prompts import (
agent_system_prompt_2,
)
# Utils
# Custom stopping condition # Custom stopping condition
def stop_when_repeats(response: str) -> bool: def stop_when_repeats(response: str) -> bool:
# Stop if the word stop appears in the response # Stop if the word stop appears in the response

@ -2,7 +2,9 @@ import logging
import queue import queue
import threading import threading
from time import sleep from time import sleep
from typing import Callable, Dict, List from typing import Callable, Dict, List, Optional
import asyncio
import concurrent.futures
from termcolor import colored from termcolor import colored
@ -16,38 +18,88 @@ from swarms.utils.decorators import (
class AutoScaler: class AutoScaler:
""" """
The AutoScaler is like a kubernetes pod, that autoscales an agent or worker or boss! AutoScaler class
Wraps around a structure like SequentialWorkflow The AutoScaler class is responsible for managing the agents pool
and or Agent and parallelizes them on multiple threads so they're split across devices and the task queue. It also monitors the health of the agents and
and you can use them like that scales the pool up or down based on the number of pending tasks
Args: and the current load of the agents.
initial_agents (int, optional): Number of initial agents. Defaults to 10.
scale_up_factor (int, optional): Scale up factor. Defaults to 1.
idle_threshold (float, optional): Idle threshold. Defaults to 0.2.
busy_threshold (float, optional): Busy threshold. Defaults to 0.7.
agent ([type], optional): Agent. Defaults to None.
Args:
initial_agents (Optional[int], optional): Initial number of
agents to start with. Defaults to 10.
scale_up_factor (int, optional): Factor by which to scale up
the agents pool. Defaults to 1.
idle_threshold (float, optional): Threshold for scaling down
the agents pool. Defaults to 0.2.
busy_threshold (float, optional): Threshold for scaling up
the agents pool. Defaults to 0.7.
agents (List[Agent], optional): List of agents to use in the
pool. Defaults to None.
autoscale (Optional[bool], optional): Whether to autoscale
the agents pool. Defaults to True.
min_agents (Optional[int], optional): Minimum number of
agents to keep in the pool. Defaults to 10.
max_agents (Optional[int], optional): Maximum number of
agents to keep in the pool. Defaults to 100.
custom_scale_strategy (Optional[Callable], optional): Custom
scaling strategy to use. Defaults to None.
Methods: Methods:
add_task: Add task to queue add_task: Add tasks to queue
scale_up: Scale up scale_up: Add more agents
scale_down: Scale down scale_down: scale down
run: Run agent the task on the agent id
monitor_and_scale: Monitor and scale monitor_and_scale: Monitor and scale
start: Start scaling start: Start scaling
del_agent: Delete an agent check_agent_health: Checks the health of each agent and
replaces unhealthy agents.
balance_load: Distributes tasks among agents based on their
current load.
set_scaling_strategy: Set a custom scaling strategy.
execute_scaling_strategy: Execute the custom scaling strategy
if defined.
report_agent_metrics: Collects and reports metrics from each
agent.
report: Reports the current state of the autoscaler.
print_dashboard: Prints a dashboard of the current state of
the autoscaler.
Usage Examples:
``` >>> import os
from swarms.swarms import AutoScaler >>> from dotenv import load_dotenv
from swarms.structs.agent import Agent >>> # Import the OpenAIChat model and the Agent struct
>>> from swarms.models import OpenAIChat
>>> from swarms.structs import Agent
>>> from swarms.structs.autoscaler import AutoScaler
>>> # Load the environment variables
>>> load_dotenv()
>>> # Get the API key from the environment
>>> api_key = os.environ.get("OPENAI_API_KEY")
>>> # Initialize the language model
>>> llm = OpenAIChat(
... temperature=0.5,
... openai_api_key=api_key,
... )
>>> ## Initialize the workflow
>>> agent = Agent(llm=llm, max_loops=1, dashboard=True)
>>> # Load the autoscaler
>>> autoscaler = AutoScaler(
... initial_agents=2,
... scale_up_factor=1,
... idle_threshold=0.2,
... busy_threshold=0.7,
... agents=[agent],
... autoscale=True,
... min_agents=1,
... max_agents=5,
... custom_scale_strategy=None,
... )
>>> print(autoscaler)
>>> # Run the workflow on a task
>>> out = autoscaler.run(agent.id, "Generate a 10,000 word blog on health and wellness.")
>>> print(out)
@AutoScaler
agent = Agent()
agent.run("what is your name")
```
""" """
@log_decorator @log_decorator
@ -55,25 +107,36 @@ class AutoScaler:
@timing_decorator @timing_decorator
def __init__( def __init__(
self, self,
initial_agents=10, initial_agents: Optional[int] = 10,
scale_up_factor=1, scale_up_factor: int = 1,
idle_threshold=0.2, idle_threshold: float = 0.2,
busy_threshold=0.7, busy_threshold: float = 0.7,
agent=None, agents: List[Agent] = None,
autoscale: Optional[bool] = True,
min_agents: Optional[int] = 10,
max_agents: Optional[int] = 100,
custom_scale_strategy: Optional[Callable] = None,
*args,
**kwargs,
): ):
self.agent = agent or Agent self.agents_pool = agents or [
self.agents_pool = [ agents[0]() for _ in range(initial_agents)
self.agent() for _ in range(initial_agents)
] ]
self.task_queue = queue.Queue() self.task_queue = queue.Queue()
self.scale_up_factor = scale_up_factor self.scale_up_factor = scale_up_factor
self.idle_threshold = idle_threshold self.idle_threshold = idle_threshold
self.busy_threshold = busy_threshold
self.lock = threading.Lock() self.lock = threading.Lock()
self.agents = agents
self.autoscale = autoscale
self.min_agents = min_agents
self.max_agents = max_agents
self.custom_scale_strategy = custom_scale_strategy
def add_task(self, task): def add_task(self, task):
"""Add tasks to queue""" """Add tasks to queue"""
try: try:
self.tasks_queue.put(task) self.task_queue.put(task)
except Exception as error: except Exception as error:
print( print(
f"Error adding task to queue: {error} try again with" f"Error adding task to queue: {error} try again with"
@ -91,7 +154,7 @@ class AutoScaler:
len(self.agents_pool) * self.scale_up_factor len(self.agents_pool) * self.scale_up_factor
) )
for _ in range(new_agents_counts): for _ in range(new_agents_counts):
self.agents_pool.append(Agent()) self.agents_pool.append(self.agents[0]())
except Exception as error: except Exception as error:
print( print(
f"Error scaling up: {error} try again with a new task" f"Error scaling up: {error} try again with a new task"
@ -111,6 +174,26 @@ class AutoScaler:
" task" " task"
) )
def run(
self, agent_id, task: Optional[str] = None, *args, **kwargs
):
"""Run agent the task on the agent id
Args:
agent_id (_type_): _description_
task (str, optional): _description_. Defaults to None.
Raises:
ValueError: _description_
Returns:
_type_: _description_
"""
for agent in self.agents_pool:
if agent.id == agent_id:
return agent.run(task, *args, **kwargs)
raise ValueError(f"No agent found with ID {agent_id}")
@log_decorator @log_decorator
@error_decorator @error_decorator
@timing_decorator @timing_decorator

@ -0,0 +1,140 @@
import os
from dotenv import load_dotenv
from pytest import patch
from swarms.models import OpenAIChat
from swarms.structs import Agent
from swarms.structs.autoscaler import AutoScaler
load_dotenv()
api_key = os.environ.get("OPENAI_API_KEY")
llm = OpenAIChat(
temperature=0.5,
openai_api_key=api_key,
)
agent = Agent(llm=llm, max_loops=1)
def test_autoscaler_init():
autoscaler = AutoScaler(initial_agents=5, agent=agent)
assert autoscaler.initial_agents == 5
assert autoscaler.scale_up_factor == 1
assert autoscaler.idle_threshold == 0.2
assert autoscaler.busy_threshold == 0.7
assert autoscaler.autoscale == True
assert autoscaler.min_agents == 1
assert autoscaler.max_agents == 5
assert autoscaler.custom_scale_strategy == None
assert len(autoscaler.agents_pool) == 5
assert autoscaler.task_queue.empty() == True
def test_autoscaler_add_task():
autoscaler = AutoScaler(initial_agents=5, agent=agent)
autoscaler.add_task("task1")
assert autoscaler.task_queue.empty() == False
def test_autoscaler_run():
autoscaler = AutoScaler(initial_agents=5, agent=agent)
out = autoscaler.run(
agent.id,
"Generate a 10,000 word blog on health and wellness.",
)
assert (
out == "Generate a 10,000 word blog on health and wellness."
)
def test_autoscaler_add_agent():
autoscaler = AutoScaler(initial_agents=5, agent=agent)
autoscaler.add_agent(agent)
assert len(autoscaler.agents_pool) == 6
def test_autoscaler_remove_agent():
autoscaler = AutoScaler(initial_agents=5, agent=agent)
autoscaler.remove_agent(agent)
assert len(autoscaler.agents_pool) == 4
def test_autoscaler_get_agent():
autoscaler = AutoScaler(initial_agents=5, agent=agent)
agent = autoscaler.get_agent()
assert isinstance(agent, Agent)
def test_autoscaler_get_agent_by_id():
autoscaler = AutoScaler(initial_agents=5, agent=agent)
agent = autoscaler.get_agent_by_id(agent.id)
assert isinstance(agent, Agent)
def test_autoscaler_get_agent_by_id_not_found():
autoscaler = AutoScaler(initial_agents=5, agent=agent)
agent = autoscaler.get_agent_by_id("fake_id")
assert agent == None
@patch("swarms.swarms.Agent.is_healthy")
def test_autoscaler_check_agent_health(mock_is_healthy):
mock_is_healthy.side_effect = [False, True, True, True, True]
autoscaler = AutoScaler(initial_agents=5, agent=agent)
autoscaler.check_agent_health()
assert mock_is_healthy.call_count == 5
def test_autoscaler_balance_load():
autoscaler = AutoScaler(initial_agents=5, agent=agent)
autoscaler.add_task("task1")
autoscaler.add_task("task2")
autoscaler.balance_load()
assert autoscaler.task_queue.empty()
def test_autoscaler_set_scaling_strategy():
autoscaler = AutoScaler(initial_agents=5, agent=agent)
def strategy(x, y):
return x - y
autoscaler.set_scaling_strategy(strategy)
assert autoscaler.custom_scale_strategy == strategy
def test_autoscaler_execute_scaling_strategy():
autoscaler = AutoScaler(initial_agents=5, agent=agent)
def strategy(x, y):
return x - y
autoscaler.set_scaling_strategy(strategy)
autoscaler.add_task("task1")
autoscaler.execute_scaling_strategy()
assert len(autoscaler.agents_pool) == 4
def test_autoscaler_report_agent_metrics():
autoscaler = AutoScaler(initial_agents=5, agent=agent)
metrics = autoscaler.report_agent_metrics()
assert set(metrics.keys()) == {
"completion_time",
"success_rate",
"error_rate",
}
@patch("swarms.swarms.AutoScaler.report_agent_metrics")
def test_autoscaler_report(mock_report_agent_metrics):
autoscaler = AutoScaler(initial_agents=5, agent=agent)
autoscaler.report()
mock_report_agent_metrics.assert_called_once()
@patch("builtins.print")
def test_autoscaler_print_dashboard(mock_print):
autoscaler = AutoScaler(initial_agents=5, agent=agent)
autoscaler.print_dashboard()
mock_print.assert_called()
Loading…
Cancel
Save