From da7f5194a33c2d03d7b5df33470befd6286c6e3c Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Fri, 17 Oct 2025 13:52:22 -0700 Subject: [PATCH] [IMPROVEMENT][Remove BaseSwarm from ConcurrentWorkflow] [Decoupled inheritance and added docs] --- README.md | 1 - .../enhanced_collaboration_example.py | 2 - pyproject.toml | 2 +- swarms/structs/concurrent_workflow.py | 235 +++++++++++++++--- 4 files changed, 205 insertions(+), 35 deletions(-) diff --git a/README.md b/README.md index 43fbc130..dd3de49d 100644 --- a/README.md +++ b/README.md @@ -197,7 +197,6 @@ GROQ_API_KEY="" ``` - ### 🤖 Your First Agent An **Agent** is the fundamental building block of a swarm—an autonomous entity powered by an LLM + Tools + Memory. [Learn more Here](https://docs.swarms.world/en/latest/swarms/structs/agent/) diff --git a/examples/multi_agent/interactive_groupchat_examples/enhanced_collaboration_example.py b/examples/multi_agent/interactive_groupchat_examples/enhanced_collaboration_example.py index 1858372f..558b4800 100644 --- a/examples/multi_agent/interactive_groupchat_examples/enhanced_collaboration_example.py +++ b/examples/multi_agent/interactive_groupchat_examples/enhanced_collaboration_example.py @@ -1,5 +1,3 @@ - - from swarms import Agent from swarms.structs.interactive_groupchat import ( InteractiveGroupChat, diff --git a/pyproject.toml b/pyproject.toml index b05c488d..a2d1d5c7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "8.5.0" +version = "8.5.1" description = "Swarms - TGSC" license = "MIT" authors = ["Kye Gomez "] diff --git a/swarms/structs/concurrent_workflow.py b/swarms/structs/concurrent_workflow.py index d11e7def..286dccb6 100644 --- a/swarms/structs/concurrent_workflow.py +++ b/swarms/structs/concurrent_workflow.py @@ -3,7 +3,6 @@ import time from typing import Callable, List, Optional, Union from swarms.structs.agent import Agent -from swarms.structs.base_swarm import BaseSwarm from swarms.structs.conversation import Conversation from swarms.structs.swarm_id import swarm_id from swarms.utils.formatter import formatter @@ -16,12 +15,59 @@ from swarms.utils.loguru_logger import initialize_logger logger = initialize_logger(log_folder="concurrent_workflow") -class ConcurrentWorkflow(BaseSwarm): - """Concurrent workflow for running multiple agents simultaneously.""" +class ConcurrentWorkflow: + """ + A concurrent workflow system for running multiple agents simultaneously. + + This class provides a framework for executing multiple agents concurrently on the same task, + with optional dashboard monitoring, streaming callbacks, and various output formatting options. + It uses ThreadPoolExecutor to manage concurrent execution and provides real-time status + tracking for each agent. + + Attributes: + id (str): Unique identifier for the workflow instance + name (str): Human-readable name for the workflow + description (str): Description of the workflow's purpose + agents (List[Union[Agent, Callable]]): List of agents to execute concurrently + auto_save (bool): Whether to automatically save workflow metadata + output_type (str): Format for output formatting (e.g., "dict-all-except-first") + max_loops (int): Maximum number of execution loops (currently unused) + auto_generate_prompts (bool): Whether to enable automatic prompt engineering + show_dashboard (bool): Whether to display real-time dashboard during execution + agent_statuses (dict): Dictionary tracking status and output of each agent + metadata_output_path (str): Path for saving workflow metadata + conversation (Conversation): Conversation object for storing agent interactions + + Methods: + run: Execute all agents concurrently on a given task + batch_run: Execute workflow on multiple tasks sequentially + run_with_dashboard: Execute agents with real-time dashboard monitoring + cleanup: Clean up resources and connections + fix_agents: Configure agents for dashboard mode + reliability_check: Validate workflow configuration + activate_auto_prompt_engineering: Enable automatic prompt engineering + display_agent_dashboard: Display real-time dashboard + + Example: + >>> from swarms import Agent, ConcurrentWorkflow + >>> + >>> # Create agents + >>> agent1 = Agent(llm=llm, agent_name="Agent1") + >>> agent2 = Agent(llm=llm, agent_name="Agent2") + >>> + >>> # Create workflow + >>> workflow = ConcurrentWorkflow( + ... agents=[agent1, agent2], + ... show_dashboard=True + ... ) + >>> + >>> # Run workflow + >>> result = workflow.run("Analyze this data") + """ def __init__( self, - id: str = swarm_id(), + id: str = None, name: str = "ConcurrentWorkflow", description: str = "Execution of multiple agents concurrently", agents: List[Union[Agent, Callable]] = None, @@ -30,47 +76,63 @@ class ConcurrentWorkflow(BaseSwarm): max_loops: int = 1, auto_generate_prompts: bool = False, show_dashboard: bool = False, - *args, - **kwargs, ): - super().__init__( - name=name, - description=description, - agents=agents, - *args, - **kwargs, - ) + self.id = id if id is not None else swarm_id() self.name = name self.description = description self.agents = agents - self.metadata_output_path = ( - f"concurrent_workflow_name_{name}_id_{id}.json" - ) self.auto_save = auto_save self.max_loops = max_loops self.auto_generate_prompts = auto_generate_prompts self.output_type = output_type self.show_dashboard = show_dashboard - self.agent_statuses = { - agent.agent_name: {"status": "pending", "output": ""} - for agent in agents - } + self.metadata_output_path = ( + f"concurrent_workflow_name_{name}_id_{self.id}.json" + ) + + # Initialize agent statuses if agents are provided + if agents is not None: + self.agent_statuses = { + agent.agent_name: {"status": "pending", "output": ""} + for agent in agents + } + else: + self.agent_statuses = {} self.reliability_check() - self.conversation = Conversation() + self.conversation = Conversation(name=f"concurrent_workflow_name_{name}_id_{self.id}_conversation") if self.show_dashboard is True: self.agents = self.fix_agents() def fix_agents(self): - """Configure agents for dashboard mode.""" + """ + Configure agents for dashboard mode. + + Disables printing for all agents when dashboard mode is enabled to prevent + console output conflicts with the dashboard display. + + Returns: + List[Union[Agent, Callable]]: The configured list of agents. + """ if self.show_dashboard is True: for agent in self.agents: agent.print_on = False return self.agents def reliability_check(self): - """Validate workflow configuration.""" + """ + Validate workflow configuration. + + Performs various validation checks to ensure the workflow is properly configured: + - Checks that agents are provided + - Validates that agents list is not empty + - Warns if only one agent is provided (concurrent execution not beneficial) + + Raises: + ValueError: If no agents are provided or agents list is empty. + Exception: If any other validation error occurs. + """ try: if self.agents is None: raise ValueError( @@ -93,7 +155,13 @@ class ConcurrentWorkflow(BaseSwarm): raise def activate_auto_prompt_engineering(self): - """Enable automatic prompt engineering.""" + """ + Enable automatic prompt engineering for all agents. + + When enabled, this method activates automatic prompt engineering capabilities + for all agents in the workflow, allowing them to generate and optimize + their own prompts dynamically. + """ if self.auto_generate_prompts is True: for agent in self.agents: agent.auto_generate_prompt = True @@ -103,7 +171,16 @@ class ConcurrentWorkflow(BaseSwarm): title: str = "ConcurrentWorkflow Dashboard", is_final: bool = False, ): - """Display real-time dashboard.""" + """ + Display real-time dashboard showing agent status and outputs. + + Creates and displays a dashboard showing the current status and output + of each agent in the workflow. This is used for monitoring concurrent execution. + + Args: + title (str): Title to display for the dashboard. Defaults to "ConcurrentWorkflow Dashboard". + is_final (bool): Whether this is the final dashboard display. Defaults to False. + """ agents_data = [ { "name": agent.agent_name, @@ -127,7 +204,23 @@ class ConcurrentWorkflow(BaseSwarm): Callable[[str, str, bool], None] ] = None, ): - """Execute agents with dashboard monitoring.""" + """ + Execute agents with dashboard monitoring. + + Runs all agents concurrently while displaying a real-time dashboard that shows + the status and output of each agent. This method provides visual feedback during + execution and supports streaming callbacks for real-time updates. + + Args: + task (str): The task to be executed by all agents. + img (Optional[str]): Single image path for agents that support image input. + imgs (Optional[List[str]]): List of image paths for agents that support multiple images. + streaming_callback (Optional[Callable[[str, str, bool], None]]): Callback function for streaming updates. + Called with (agent_name, chunk, is_final) parameters. + + Returns: + Union[Dict, List, str]: Formatted conversation history based on output_type. + """ try: self.conversation.add(role="User", content=task) @@ -271,7 +364,22 @@ class ConcurrentWorkflow(BaseSwarm): Callable[[str, str, bool], None] ] = None, ): - """Execute agents concurrently without dashboard.""" + """ + Execute agents concurrently without dashboard. + + Internal method that runs all agents concurrently using ThreadPoolExecutor + without displaying the dashboard. This is the core execution logic used when + dashboard mode is disabled. + + Args: + task (str): The task to be executed by all agents. + img (Optional[str]): Single image path for agents that support image input. + imgs (Optional[List[str]]): List of image paths for agents that support multiple images. + streaming_callback (Optional[Callable[[str, str, bool], None]]): Callback function for streaming updates. + + Returns: + Union[Dict, List, str]: Formatted conversation history based on output_type. + """ self.conversation.add(role="User", content=task) max_workers = int(get_cpu_cores() * 0.95) @@ -314,7 +422,25 @@ class ConcurrentWorkflow(BaseSwarm): Callable[[str, str, bool], None] ] = None, ): - """Run single agent with streaming support.""" + """ + Run single agent with streaming support. + + Executes a single agent with optional streaming callback support. + Handles errors gracefully and ensures completion callbacks are called. + + Args: + agent (Union[Agent, Callable]): The agent to execute. + task (str): The task to be executed by the agent. + img (Optional[str]): Single image path for agents that support image input. + imgs (Optional[List[str]]): List of image paths for agents that support multiple images. + streaming_callback (Optional[Callable[[str, str, bool], None]]): Callback function for streaming updates. + + Returns: + str: The output from the agent. + + Raises: + Exception: If the agent execution fails. + """ if streaming_callback is None: return agent.run(task=task, img=img, imgs=imgs) @@ -358,7 +484,16 @@ class ConcurrentWorkflow(BaseSwarm): raise def cleanup(self): - """Clean up resources and connections.""" + """ + Clean up resources and connections. + + Performs cleanup operations including: + - Calling cleanup methods on all agents if available + - Resetting agent statuses + - Preserving conversation history for result formatting + + This method is called automatically after each run to ensure proper resource management. + """ try: # Reset agent statuses for agent in self.agents: @@ -387,7 +522,27 @@ class ConcurrentWorkflow(BaseSwarm): Callable[[str, str, bool], None] ] = None, ): - """Execute all agents concurrently.""" + """ + Execute all agents concurrently. + + Main entry point for running the concurrent workflow. Executes all agents + simultaneously on the given task, with optional dashboard monitoring and + streaming callbacks. Automatically cleans up resources after execution. + + Args: + task (str): The task to be executed by all agents. + img (Optional[str]): Single image path for agents that support image input. + imgs (Optional[List[str]]): List of image paths for agents that support multiple images. + streaming_callback (Optional[Callable[[str, str, bool], None]]): Callback function for streaming updates. + Called with (agent_name, chunk, is_final) parameters. + + Returns: + Union[Dict, List, str]: Formatted conversation history based on output_type. + + Example: + >>> workflow = ConcurrentWorkflow(agents=[agent1, agent2]) + >>> result = workflow.run("Analyze this data") + """ try: if self.show_dashboard: result = self.run_with_dashboard( @@ -410,7 +565,25 @@ class ConcurrentWorkflow(BaseSwarm): Callable[[str, str, bool], None] ] = None, ): - """Execute workflow on multiple tasks sequentially.""" + """ + Execute workflow on multiple tasks sequentially. + + Runs the concurrent workflow on multiple tasks one after another. + Each task is executed with all agents running concurrently, but the tasks + themselves are processed sequentially. + + Args: + tasks (List[str]): List of tasks to be executed. + imgs (Optional[List[str]]): List of image paths corresponding to each task. + streaming_callback (Optional[Callable[[str, str, bool], None]]): Callback function for streaming updates. + + Returns: + List[Union[Dict, List, str]]: List of results for each task. + + Example: + >>> workflow = ConcurrentWorkflow(agents=[agent1, agent2]) + >>> results = workflow.batch_run(["Task 1", "Task 2", "Task 3"]) + """ results = [] for idx, task in enumerate(tasks): img = None