From 50d646d069eb50e5c44456569cb07c5f667ca831 Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Thu, 3 Apr 2025 14:51:12 +0800 Subject: [PATCH] cleanup --- cleanup_and_publish.sh | 5 +- swarms/structs/__init__.py | 4 +- swarms/structs/auto_swarm.py | 229 ----------------------------------- 3 files changed, 4 insertions(+), 234 deletions(-) delete mode 100644 swarms/structs/auto_swarm.py diff --git a/cleanup_and_publish.sh b/cleanup_and_publish.sh index 1212e4ef..2baca547 100755 --- a/cleanup_and_publish.sh +++ b/cleanup_and_publish.sh @@ -13,9 +13,10 @@ poetry build && echo "✅ Build successful!" || echo "❌ Build failed!" echo "📦 Publishing to PyPI..." poetry publish && echo "✅ Package published!" || echo "❌ Publishing failed!" -echo "📝 Committing changes to Git..." +echo "📝 Enter your commit message:" +read commit_message git add . && echo "✅ Changes staged!" -git commit -m "Update and publish" && echo "✅ Changes committed!" +git commit -m "$commit_message" && echo "✅ Changes committed!" echo "🚀 Pushing to remote repository..." git push && echo "✅ Changes pushed to remote!" diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index 00f49cf2..5e8db7de 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -2,7 +2,7 @@ from swarms.structs.agent import Agent from swarms.structs.agent_builder import AgentsBuilder from swarms.structs.agents_available import showcase_available_agents from swarms.structs.async_workflow import AsyncWorkflow -from swarms.structs.auto_swarm import AutoSwarm, AutoSwarmRouter +from experimental.auto_swarm import AutoSwarm, AutoSwarmRouter from swarms.structs.base_structure import BaseStructure from swarms.structs.base_swarm import BaseSwarm from swarms.structs.base_workflow import BaseWorkflow @@ -85,8 +85,6 @@ from swarms.structs.swarming_architectures import ( __all__ = [ "Agent", "AsyncWorkflow", - "AutoSwarm", - "AutoSwarmRouter", "BaseStructure", "BaseSwarm", "BaseWorkflow", diff --git a/swarms/structs/auto_swarm.py b/swarms/structs/auto_swarm.py deleted file mode 100644 index 6809bce5..00000000 --- a/swarms/structs/auto_swarm.py +++ /dev/null @@ -1,229 +0,0 @@ -from typing import Any, Callable, Dict, Optional, Sequence - -from swarms.structs.base_swarm import BaseSwarm -from swarms.utils.loguru_logger import logger - - -class AutoSwarmRouter(BaseSwarm): - """AutoSwarmRouter class represents a router for the AutoSwarm class. - - This class is responsible for routing tasks to the appropriate swarm based on the provided name. - It allows customization of the preprocessing, routing, and postprocessing of tasks. - - Attributes: - name (str): The name of the router. - description (str): The description of the router. - verbose (bool): Whether to enable verbose mode. - custom_params (dict): Custom parameters for the router. - swarms (list): A list of BaseSwarm objects. - custom_preprocess (callable): Custom preprocessing function for tasks. - custom_postprocess (callable): Custom postprocessing function for task results. - custom_router (callable): Custom routing function for tasks. - - Methods: - run(task: str = None, *args, **kwargs) -> Any: - Run the swarm simulation and route the task to the appropriate swarm. - - Flow: - name -> router -> swarm entry point - """ - - def __init__( - self, - name: Optional[str] = None, - description: Optional[str] = None, - verbose: bool = False, - custom_params: Optional[Dict[str, Any]] = None, - swarms: Sequence[BaseSwarm] = None, - custom_preprocess: Optional[Callable] = None, - custom_postprocess: Optional[Callable] = None, - custom_router: Optional[Callable] = None, - *args, - **kwargs, - ): - super().__init__( - name=name, description=description, *args, **kwargs - ) - self.name = name - self.description = description - self.verbose = verbose - self.custom_params = custom_params - self.swarms = swarms - self.custom_preprocess = custom_preprocess - self.custom_postprocess = custom_postprocess - self.custom_router = custom_router - - # Create a dictionary of swarms - self.swarm_dict = {swarm.name: swarm for swarm in self.swarms} - - logger.info( - f"AutoSwarmRouter has been initialized with {self.len_of_swarms()} swarms." - ) - - def run(self, task: str = None, *args, **kwargs): - try: - """Run the swarm simulation and route the task to the appropriate swarm.""" - - if self.custom_preprocess: - # If custom preprocess function is provided then run it - logger.info("Running custom preprocess function.") - task, args, kwargs = self.custom_preprocess( - task, args, kwargs - ) - - if self.custom_router: - # If custom router function is provided then use it to route the task - logger.info("Running custom router function.") - out = self.custom_router(self, task, *args, **kwargs) - - if self.custom_postprocess: - # If custom postprocess function is provided then run it - out = self.custom_postprocess(out) - - return out - - if self.name in self.swarm_dict: - # If a match is found then send the task to the swarm - out = self.swarm_dict[self.name].run( - task, *args, **kwargs - ) - - if self.custom_postprocess: - # If custom postprocess function is provided then run it - out = self.custom_postprocess(out) - - return out - - # If no match is found then return None - raise ValueError( - f"Swarm with name {self.name} not found." - ) - except Exception as e: - logger.error(f"Error: {e}") - raise e - - def len_of_swarms(self): - return print(len(self.swarms)) - - def list_available_swarms(self): - for swarm in self.swarms: - try: - logger.info( - f"Swarm Name: {swarm.name} || Swarm Description: {swarm.description} " - ) - except Exception as error: - logger.error( - f"Error Detected You may not have swarms available: {error}" - ) - raise error - - -class AutoSwarm(BaseSwarm): - """AutoSwarm class represents a swarm of agents that can be created automatically. - - Flow: - name -> router -> swarm entry point - - Args: - name (Optional[str]): The name of the swarm. Defaults to None. - description (Optional[str]): The description of the swarm. Defaults to None. - verbose (bool): Whether to enable verbose mode. Defaults to False. - custom_params (Optional[Dict[str, Any]]): Custom parameters for the swarm. Defaults to None. - router (Optional[AutoSwarmRouter]): The router for the swarm. Defaults to None. - """ - - def __init__( - self, - name: Optional[str] = None, - description: Optional[str] = None, - verbose: bool = False, - custom_params: Optional[Dict[str, Any]] = None, - custom_preprocess: Optional[Callable] = None, - custom_postprocess: Optional[Callable] = None, - custom_router: Optional[Callable] = None, - max_loops: int = 1, - *args, - **kwargs, - ): - super().__init__() - self.name = name - self.description = description - self.verbose = verbose - self.custom_params = custom_params - self.custom_preprocess = custom_preprocess - self.custom_postprocess = custom_postprocess - self.custom_router = custom_router - self.max_loops = max_loops - self.router = AutoSwarmRouter( - name=name, - description=description, - verbose=verbose, - custom_params=custom_params, - custom_preprocess=custom_preprocess, - custom_postprocess=custom_postprocess, - custom_router=custom_router, - *args, - **kwargs, - ) - - if name is None: - raise ValueError( - "A name must be provided for the AutoSwarm, what swarm do you want to use?" - ) - - if verbose is True: - self.init_logging() - - def init_logging(self): - logger.info("AutoSwarm has been activated. Ready for usage.") - - # def name_swarm_check(self, name: str = None): - - def run(self, task: str = None, *args, **kwargs): - """Run the swarm simulation.""" - try: - loop = 0 - - while loop < self.max_loops: - if self.custom_preprocess: - # If custom preprocess function is provided then run it - logger.info("Running custom preprocess function.") - task, args, kwargs = self.custom_preprocess( - task, args, kwargs - ) - - if self.custom_router: - # If custom router function is provided then use it to route the task - logger.info("Running custom router function.") - out = self.custom_router( - self, task, *args, **kwargs - ) - - else: - out = self.router.run(task, *args, **kwargs) - - if self.custom_postprocess: - # If custom postprocess function is provided then run it - out = self.custom_postprocess(out) - - # LOOP - loop += 1 - - return out - except Exception as e: - logger.error( - f"Error: {e} try optimizing the inputs and try again." - ) - raise e - - def list_all_swarms(self): - for swarm in self.swarms: - try: - logger.info( - f"Swarm Name: {swarm.name} || Swarm Description: {swarm.description} " - ) - except Exception as error: - logger.error( - f"Error Detected You may not have swarms available: {error}" - ) - raise error