You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
swarms/swarms/structs/auto_swarm.py

230 lines
8.0 KiB

from typing import Any, Callable, Dict, Optional, Sequence
from swarms.structs.base_swarm import BaseSwarm
from swarms.utils.loguru_logger import logger
class AutoSwarmRouter(BaseSwarm):
"""AutoSwarmRouter class represents a router for the AutoSwarm class.
This class is responsible for routing tasks to the appropriate swarm based on the provided name.
It allows customization of the preprocessing, routing, and postprocessing of tasks.
Attributes:
name (str): The name of the router.
description (str): The description of the router.
verbose (bool): Whether to enable verbose mode.
custom_params (dict): Custom parameters for the router.
swarms (list): A list of BaseSwarm objects.
custom_preprocess (callable): Custom preprocessing function for tasks.
custom_postprocess (callable): Custom postprocessing function for task results.
custom_router (callable): Custom routing function for tasks.
Methods:
run(task: str = None, *args, **kwargs) -> Any:
Run the swarm simulation and route the task to the appropriate swarm.
Flow:
name -> router -> swarm entry point
"""
def __init__(
self,
name: Optional[str] = None,
description: Optional[str] = None,
verbose: bool = False,
custom_params: Optional[Dict[str, Any]] = None,
swarms: Sequence[BaseSwarm] = None,
custom_preprocess: Optional[Callable] = None,
custom_postprocess: Optional[Callable] = None,
custom_router: Optional[Callable] = None,
*args,
**kwargs,
):
super().__init__(
name=name, description=description, *args, **kwargs
)
self.name = name
self.description = description
self.verbose = verbose
self.custom_params = custom_params
self.swarms = swarms
self.custom_preprocess = custom_preprocess
self.custom_postprocess = custom_postprocess
self.custom_router = custom_router
# Create a dictionary of swarms
self.swarm_dict = {swarm.name: swarm for swarm in self.swarms}
logger.info(
f"AutoSwarmRouter has been initialized with {self.len_of_swarms()} swarms."
)
def run(self, task: str = None, *args, **kwargs):
try:
"""Run the swarm simulation and route the task to the appropriate swarm."""
if self.custom_preprocess:
# If custom preprocess function is provided then run it
logger.info("Running custom preprocess function.")
task, args, kwargs = self.custom_preprocess(
task, args, kwargs
)
if self.custom_router:
# If custom router function is provided then use it to route the task
logger.info("Running custom router function.")
out = self.custom_router(self, task, *args, **kwargs)
if self.custom_postprocess:
# If custom postprocess function is provided then run it
out = self.custom_postprocess(out)
return out
if self.name in self.swarm_dict:
# If a match is found then send the task to the swarm
out = self.swarm_dict[self.name].run(
task, *args, **kwargs
)
if self.custom_postprocess:
# If custom postprocess function is provided then run it
out = self.custom_postprocess(out)
return out
# If no match is found then return None
raise ValueError(
f"Swarm with name {self.name} not found."
)
except Exception as e:
logger.error(f"Error: {e}")
raise e
def len_of_swarms(self):
return print(len(self.swarms))
def list_available_swarms(self):
for swarm in self.swarms:
try:
logger.info(
f"Swarm Name: {swarm.name} || Swarm Description: {swarm.description} "
)
except Exception as error:
logger.error(
f"Error Detected You may not have swarms available: {error}"
)
raise error
class AutoSwarm(BaseSwarm):
"""AutoSwarm class represents a swarm of agents that can be created automatically.
Flow:
name -> router -> swarm entry point
Args:
name (Optional[str]): The name of the swarm. Defaults to None.
description (Optional[str]): The description of the swarm. Defaults to None.
verbose (bool): Whether to enable verbose mode. Defaults to False.
custom_params (Optional[Dict[str, Any]]): Custom parameters for the swarm. Defaults to None.
router (Optional[AutoSwarmRouter]): The router for the swarm. Defaults to None.
"""
def __init__(
self,
name: Optional[str] = None,
description: Optional[str] = None,
verbose: bool = False,
custom_params: Optional[Dict[str, Any]] = None,
custom_preprocess: Optional[Callable] = None,
custom_postprocess: Optional[Callable] = None,
custom_router: Optional[Callable] = None,
max_loops: int = 1,
*args,
**kwargs,
):
super().__init__()
self.name = name
self.description = description
self.verbose = verbose
self.custom_params = custom_params
self.custom_preprocess = custom_preprocess
self.custom_postprocess = custom_postprocess
self.custom_router = custom_router
self.max_loops = max_loops
self.router = AutoSwarmRouter(
name=name,
description=description,
verbose=verbose,
custom_params=custom_params,
custom_preprocess=custom_preprocess,
custom_postprocess=custom_postprocess,
custom_router=custom_router,
*args,
**kwargs,
)
if name is None:
raise ValueError(
"A name must be provided for the AutoSwarm, what swarm do you want to use?"
)
if verbose is True:
self.init_logging()
def init_logging(self):
logger.info("AutoSwarm has been activated. Ready for usage.")
# def name_swarm_check(self, name: str = None):
def run(self, task: str = None, *args, **kwargs):
"""Run the swarm simulation."""
try:
loop = 0
while loop < self.max_loops:
if self.custom_preprocess:
# If custom preprocess function is provided then run it
logger.info("Running custom preprocess function.")
task, args, kwargs = self.custom_preprocess(
task, args, kwargs
)
if self.custom_router:
# If custom router function is provided then use it to route the task
logger.info("Running custom router function.")
out = self.custom_router(
self, task, *args, **kwargs
)
else:
out = self.router.run(task, *args, **kwargs)
if self.custom_postprocess:
# If custom postprocess function is provided then run it
out = self.custom_postprocess(out)
# LOOP
loop += 1
return out
except Exception as e:
logger.error(
f"Error: {e} try optimizing the inputs and try again."
)
raise e
def list_all_swarms(self):
for swarm in self.swarms:
try:
logger.info(
f"Swarm Name: {swarm.name} || Swarm Description: {swarm.description} "
)
except Exception as error:
logger.error(
f"Error Detected You may not have swarms available: {error}"
)
raise error