[FEAT][AutoSwarmRouter] [AutoSwarm]

pull/429/head
Kye 10 months ago
parent 3d12866f7c
commit e412cc0fd1

@ -47,3 +47,4 @@ PSG_CONNECTION_STRING=""
GITHUB_USERNAME="" GITHUB_USERNAME=""
GITHUB_REPO_NAME="" GITHUB_REPO_NAME=""
GITHUB_TOKEN="" GITHUB_TOKEN=""
USE_TELEMETRY=True

1
.gitignore vendored

@ -29,6 +29,7 @@ stderr_log.txt
.vscode .vscode
.DS_STORE .DS_STORE
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
Transcript Generator_state.json
__pycache__/ __pycache__/
*.py[cod] *.py[cod]
*$py.class *$py.class

@ -19,7 +19,4 @@ agent = Agent(
) )
# Run the workflow on a task # Run the workflow on a task
out = agent( agent("Generate a transcript for a youtube video on what swarms are!")
"Generate a transcript for a youtube video on what swarms are!"
)
print(out)

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry] [tool.poetry]
name = "swarms" name = "swarms"
version = "4.5.8" version = "4.6.0"
description = "Swarms - Pytorch" description = "Swarms - Pytorch"
license = "MIT" license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"] authors = ["Kye Gomez <kye@apac.ai>"]
@ -80,10 +80,8 @@ mypy-protobuf = "^3.0.0"
[tool.ruff] [tool.ruff]
line-length = 70 line-length = 70
# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default.
select = ["E4", "E7", "E9", "F"] select = ["E4", "E7", "E9", "F"]
ignore = [] ignore = []
# Allow fix for all enabled rules (when `--fix`) is provided.
fixable = ["ALL"] fixable = ["ALL"]
unfixable = [] unfixable = []

@ -74,6 +74,7 @@ from swarms.structs.utils import (
find_token_in_text, find_token_in_text,
parse_tasks, parse_tasks,
) )
from swarms.structs.auto_swarm import AutoSwarm, AutoSwarmRouter
__all__ = [ __all__ = [
"Agent", "Agent",
@ -139,4 +140,6 @@ __all__ = [
"MultiProcessWorkflow", "MultiProcessWorkflow",
"MultiThreadedWorkflow", "MultiThreadedWorkflow",
"AgentJob", "AgentJob",
"AutoSwarm",
"AutoSwarmRouter",
] ]

@ -0,0 +1,213 @@
from typing import Any, Callable, Dict, Optional, Sequence
from swarms.models.base_llm import AbstractLLM
from swarms.structs.base_swarm import AbstractSwarm
from swarms.utils.loguru_logger import logger
class SequentialAccountingSwarm(AbstractSwarm):
"""SequentialAccountingSwarm class represents a swarm of agents that can be created automatically.
Flow:
name -> router -> swarm entry point
Attributes:
name (str, optional): The name of the swarm. Defaults to "kyegomez/sequential-accounting-swarm".
description (str, optional): The description of the swarm. Defaults to None.
verbose (bool): Whether to enable verbose mode or not. Defaults to False.
custom_params (dict, optional): Custom parameters for the swarm. Defaults to None.
iters (int, optional): The number of iterations for the swarm simulation. Defaults to 100.
max_agents (int, optional): The maximum number of agents in the swarm. Defaults to 100.
agents (Sequence[AbstractLLM], optional): The list of agents in the swarm. Defaults to None.
Methods:
run(task: str = None, *args, **kwargs) -> Any:
Run the swarm simulation.
"""
def __init__(
self,
name: Optional[str] = "kyegomez/sequential-accounting-swarm",
description: Optional[str] = None,
verbose: bool = False,
custom_params: Optional[Dict[str, Any]] = None,
iters: Optional[int] = 100,
max_agents: Optional[int] = 100,
agents: Sequence[AbstractLLM] = None,
):
super().__init__()
self.name = name
self.description = description
self.verbose = verbose
self.custom_params = custom_params
self.iters = iters
self.max_agents = max_agents
self.agents = agents
def run(self, task: str = None, *args, **kwargs):
"""Run the swarm simulation.
Args:
task (str, optional): The task to be performed by the agents. Defaults to None.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Returns:
Any: The final result of the swarm simulation.
"""
for agent in self.agents:
out = agent.run(task, *args, **kwargs)
final = agent.run(out)
return final
class AutoSwarmRouter(AbstractSwarm):
"""AutoSwarmRouter class represents a router for the AutoSwarm class.
This class is responsible for routing tasks to the appropriate swarm based on the provided name.
It allows customization of the preprocessing, routing, and postprocessing of tasks.
Attributes:
name (str): The name of the router.
description (str): The description of the router.
verbose (bool): Whether to enable verbose mode.
custom_params (dict): Custom parameters for the router.
swarms (list): A list of AbstractSwarm objects.
custom_preprocess (callable): Custom preprocessing function for tasks.
custom_postprocess (callable): Custom postprocessing function for task results.
custom_router (callable): Custom routing function for tasks.
Methods:
run(task: str = None, *args, **kwargs) -> Any:
Run the swarm simulation and route the task to the appropriate swarm.
Flow:
name -> router -> swarm entry point
"""
def __init__(
self,
name: Optional[str] = None,
description: Optional[str] = None,
verbose: bool = False,
custom_params: Optional[Dict[str, Any]] = None,
swarms: Sequence[AbstractSwarm] = None,
custom_preprocess: Optional[Callable] = None,
custom_postprocess: Optional[Callable] = None,
custom_router: Optional[Callable] = None,
):
super().__init__()
self.name = name
self.description = description
self.verbose = verbose
self.custom_params = custom_params
self.swarms = swarms
self.custom_preprocess = custom_preprocess
self.custom_postprocess = custom_postprocess
self.custom_router = custom_router
# Create a dictionary of swarms
self.swarm_dict = {swarm.name: swarm for swarm in self.swarms}
def run(self, task: str = None, *args, **kwargs):
try:
"""Run the swarm simulation and route the task to the appropriate swarm."""
if self.custom_preprocess:
# If custom preprocess function is provided then run it
logger.info("Running custom preprocess function.")
task, args, kwargs = self.custom_preprocess(
task, args, kwargs
)
if self.custom_router:
# If custom router function is provided then use it to route the task
logger.info("Running custom router function.")
out = self.custom_router(self, task, *args, **kwargs)
if self.custom_postprocess:
# If custom postprocess function is provided then run it
out = self.custom_postprocess(out)
return out
if self.name in self.swarm_dict:
# If a match is found then send the task to the swarm
out = self.swarm_dict[self.name].run(
task, *args, **kwargs
)
if self.custom_postprocess:
# If custom postprocess function is provided then run it
out = self.custom_postprocess(out)
return out
# If no match is found then return None
raise ValueError(
f"Swarm with name {self.name} not found."
)
except Exception as e:
logger.error(f"Error: {e}")
raise e
class AutoSwarm(AbstractSwarm):
"""AutoSwarm class represents a swarm of agents that can be created automatically.
Flow:
name -> router -> swarm entry point
Args:
name (Optional[str]): The name of the swarm. Defaults to None.
description (Optional[str]): The description of the swarm. Defaults to None.
verbose (bool): Whether to enable verbose mode. Defaults to False.
custom_params (Optional[Dict[str, Any]]): Custom parameters for the swarm. Defaults to None.
router (Optional[AutoSwarmRouter]): The router for the swarm. Defaults to None.
"""
def __init__(
self,
name: Optional[str] = None,
description: Optional[str] = None,
verbose: bool = False,
custom_params: Optional[Dict[str, Any]] = None,
router: Optional[AutoSwarmRouter] = None,
custom_preprocess: Optional[Callable] = None,
custom_postprocess: Optional[Callable] = None,
custom_router: Optional[Callable] = None,
*args,
**kwargs,
):
super().__init__()
self.name = name
self.description = description
self.verbose = verbose
self.custom_params = custom_params
self.router = router
self.custom_preprocess = custom_preprocess
self.custom_postprocess = custom_postprocess
self.router = AutoSwarmRouter(
name=name,
description=description,
verbose=verbose,
custom_params=custom_params,
custom_preprocess=custom_preprocess,
custom_postprocess=custom_postprocess,
custom_router=custom_router,
*args,
**kwargs,
)
def run(self, task: str = None, *args, **kwargs):
"""Run the swarm simulation."""
try:
return self.router.run(task, *args, **kwargs)
except Exception as e:
logger.error(
f"Error: {e} try optimizing the inputs and try again."
)
raise e

@ -2,13 +2,21 @@ import asyncio
import json import json
from abc import ABC from abc import ABC
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Dict, List, Optional, Sequence from typing import (
Any,
Callable,
Dict,
List,
Optional,
Sequence,
)
import yaml import yaml
from swarms.structs.agent import Agent from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation from swarms.structs.conversation import Conversation
from swarms.utils.loguru_logger import logger from swarms.utils.loguru_logger import logger
from swarms.structs.omni_agent_types import agent
class AbstractSwarm(ABC): class AbstractSwarm(ABC):
@ -178,17 +186,15 @@ class AbstractSwarm(ABC):
"""Step the swarm""" """Step the swarm"""
# @abstractmethod # @abstractmethod
def add_agent(self, agent: "Agent"): def add_agent(self, agent: agent):
"""Add a agent to the swarm""" """Add a agent to the swarm"""
# @abstractmethod # @abstractmethod
def remove_agent(self, agent: "Agent"): def remove_agent(self, agent: agent):
"""Remove a agent from the swarm""" """Remove a agent from the swarm"""
# @abstractmethod # @abstractmethod
def broadcast( def broadcast(self, message: str, sender: Optional[agent] = None):
self, message: str, sender: Optional["Agent"] = None
):
"""Broadcast a message to all agents""" """Broadcast a message to all agents"""
# @abstractmethod # @abstractmethod
@ -203,29 +209,29 @@ class AbstractSwarm(ABC):
def direct_message( def direct_message(
self, self,
message: str, message: str,
sender: "Agent", sender: agent,
recipient: "Agent", recipient: agent,
): ):
"""Send a direct message to a agent""" """Send a direct message to a agent"""
# @abstractmethod # @abstractmethod
def autoscaler(self, num_agents: int, agent: ["Agent"]): def autoscaler(self, num_agents: int, agent: [agent]):
"""Autoscaler that acts like kubernetes for autonomous agents""" """Autoscaler that acts like kubernetes for autonomous agents"""
# @abstractmethod # @abstractmethod
def get_agent_by_id(self, id: str) -> "Agent": def get_agent_by_id(self, id: str) -> agent:
"""Locate a agent by id""" """Locate a agent by id"""
# @abstractmethod # @abstractmethod
def get_agent_by_name(self, name: str) -> "Agent": def get_agent_by_name(self, name: str) -> agent:
"""Locate a agent by name""" """Locate a agent by name"""
# @abstractmethod # @abstractmethod
def assign_task(self, agent: "Agent", task: Any) -> Dict: def assign_task(self, agent: agent, task: Any) -> Dict:
"""Assign a task to a agent""" """Assign a task to a agent"""
# @abstractmethod # @abstractmethod
def get_all_tasks(self, agent: "Agent", task: Any): def get_all_tasks(self, agent: agent, task: Any):
"""Get all tasks""" """Get all tasks"""
# @abstractmethod # @abstractmethod
@ -237,19 +243,19 @@ class AbstractSwarm(ABC):
"""Get all pending tasks""" """Get all pending tasks"""
# @abstractmethod # @abstractmethod
def pause_agent(self, agent: "Agent", agent_id: str): def pause_agent(self, agent: agent, agent_id: str):
"""Pause a agent""" """Pause a agent"""
# @abstractmethod # @abstractmethod
def resume_agent(self, agent: "Agent", agent_id: str): def resume_agent(self, agent: agent, agent_id: str):
"""Resume a agent""" """Resume a agent"""
# @abstractmethod # @abstractmethod
def stop_agent(self, agent: "Agent", agent_id: str): def stop_agent(self, agent: agent, agent_id: str):
"""Stop a agent""" """Stop a agent"""
# @abstractmethod # @abstractmethod
def restart_agent(self, agent: "Agent"): def restart_agent(self, agent: agent):
"""Restart agent""" """Restart agent"""
# @abstractmethod # @abstractmethod
@ -265,7 +271,7 @@ class AbstractSwarm(ABC):
"""Scale to a specific number of agents""" """Scale to a specific number of agents"""
# @abstractmethod # @abstractmethod
def get_all_agents(self) -> List["Agent"]: def get_all_agents(self) -> List[agent]:
"""Get all agents""" """Get all agents"""
# @abstractmethod # @abstractmethod
@ -454,18 +460,6 @@ class AbstractSwarm(ABC):
"""Remove an llm from the god mode""" """Remove an llm from the god mode"""
self.agents.remove(agent) self.agents.remove(agent)
# def add_agent(self, agent: Agent = None, *args, **kwargs):
# """Add an agent to the swarm
# Args:
# agent (Agent, optional): _description_. Defaults to None.
# Returns:
# _type_: _description_
# """
# self.agents.append(agent)
# return agent
def run_all(self, task: str = None, *args, **kwargs): def run_all(self, task: str = None, *args, **kwargs):
"""Run all agents """Run all agents

@ -0,0 +1,15 @@
from typing import (
Any,
Callable,
Sequence,
Union,
)
from swarms.models.base_llm import AbstractLLM
from swarms.models.base_multimodal_model import BaseMultiModalModel
from swarms.structs.agent import Agent
# Unified type for agent
agent = Union[Agent, Callable, Any, AbstractLLM, BaseMultiModalModel]
# List of agents
agents = Sequence[agent]
Loading…
Cancel
Save