@ -3,7 +3,6 @@ import time
from typing import Callable , List , Optional , Union
from swarms . structs . agent import Agent
from swarms . structs . base_swarm import BaseSwarm
from swarms . structs . conversation import Conversation
from swarms . structs . swarm_id import swarm_id
from swarms . utils . formatter import formatter
@ -16,12 +15,59 @@ from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger ( log_folder = " concurrent_workflow " )
class ConcurrentWorkflow ( BaseSwarm ) :
""" Concurrent workflow for running multiple agents simultaneously. """
class ConcurrentWorkflow :
"""
A concurrent workflow system for running multiple agents simultaneously .
This class provides a framework for executing multiple agents concurrently on the same task ,
with optional dashboard monitoring , streaming callbacks , and various output formatting options .
It uses ThreadPoolExecutor to manage concurrent execution and provides real - time status
tracking for each agent .
Attributes :
id ( str ) : Unique identifier for the workflow instance
name ( str ) : Human - readable name for the workflow
description ( str ) : Description of the workflow ' s purpose
agents ( List [ Union [ Agent , Callable ] ] ) : List of agents to execute concurrently
auto_save ( bool ) : Whether to automatically save workflow metadata
output_type ( str ) : Format for output formatting ( e . g . , " dict-all-except-first " )
max_loops ( int ) : Maximum number of execution loops ( currently unused )
auto_generate_prompts ( bool ) : Whether to enable automatic prompt engineering
show_dashboard ( bool ) : Whether to display real - time dashboard during execution
agent_statuses ( dict ) : Dictionary tracking status and output of each agent
metadata_output_path ( str ) : Path for saving workflow metadata
conversation ( Conversation ) : Conversation object for storing agent interactions
Methods :
run : Execute all agents concurrently on a given task
batch_run : Execute workflow on multiple tasks sequentially
run_with_dashboard : Execute agents with real - time dashboard monitoring
cleanup : Clean up resources and connections
fix_agents : Configure agents for dashboard mode
reliability_check : Validate workflow configuration
activate_auto_prompt_engineering : Enable automatic prompt engineering
display_agent_dashboard : Display real - time dashboard
Example :
>> > from swarms import Agent , ConcurrentWorkflow
>> >
>> > # Create agents
>> > agent1 = Agent ( llm = llm , agent_name = " Agent1 " )
>> > agent2 = Agent ( llm = llm , agent_name = " Agent2 " )
>> >
>> > # Create workflow
>> > workflow = ConcurrentWorkflow (
. . . agents = [ agent1 , agent2 ] ,
. . . show_dashboard = True
. . . )
>> >
>> > # Run workflow
>> > result = workflow . run ( " Analyze this data " )
"""
def __init__ (
self ,
id : str = swarm_id ( ) ,
id : str = None ,
name : str = " ConcurrentWorkflow " ,
description : str = " Execution of multiple agents concurrently " ,
agents : List [ Union [ Agent , Callable ] ] = None ,
@ -30,47 +76,63 @@ class ConcurrentWorkflow(BaseSwarm):
max_loops : int = 1 ,
auto_generate_prompts : bool = False ,
show_dashboard : bool = False ,
* args ,
* * kwargs ,
) :
super ( ) . __init__ (
name = name ,
description = description ,
agents = agents ,
* args ,
* * kwargs ,
)
self . id = id if id is not None else swarm_id ( )
self . name = name
self . description = description
self . agents = agents
self . metadata_output_path = (
f " concurrent_workflow_name_ { name } _id_ { id } .json "
)
self . auto_save = auto_save
self . max_loops = max_loops
self . auto_generate_prompts = auto_generate_prompts
self . output_type = output_type
self . show_dashboard = show_dashboard
self . agent_statuses = {
agent . agent_name : { " status " : " pending " , " output " : " " }
for agent in agents
}
self . metadata_output_path = (
f " concurrent_workflow_name_ { name } _id_ { self . id } .json "
)
# Initialize agent statuses if agents are provided
if agents is not None :
self . agent_statuses = {
agent . agent_name : { " status " : " pending " , " output " : " " }
for agent in agents
}
else :
self . agent_statuses = { }
self . reliability_check ( )
self . conversation = Conversation ( )
self . conversation = Conversation ( name = f " concurrent_workflow_name_ { name } _id_ { self . id } _conversation " )
if self . show_dashboard is True :
self . agents = self . fix_agents ( )
def fix_agents ( self ) :
""" Configure agents for dashboard mode. """
"""
Configure agents for dashboard mode .
Disables printing for all agents when dashboard mode is enabled to prevent
console output conflicts with the dashboard display .
Returns :
List [ Union [ Agent , Callable ] ] : The configured list of agents .
"""
if self . show_dashboard is True :
for agent in self . agents :
agent . print_on = False
return self . agents
def reliability_check ( self ) :
""" Validate workflow configuration. """
"""
Validate workflow configuration .
Performs various validation checks to ensure the workflow is properly configured :
- Checks that agents are provided
- Validates that agents list is not empty
- Warns if only one agent is provided ( concurrent execution not beneficial )
Raises :
ValueError : If no agents are provided or agents list is empty .
Exception : If any other validation error occurs .
"""
try :
if self . agents is None :
raise ValueError (
@ -93,7 +155,13 @@ class ConcurrentWorkflow(BaseSwarm):
raise
def activate_auto_prompt_engineering ( self ) :
""" Enable automatic prompt engineering. """
"""
Enable automatic prompt engineering for all agents .
When enabled , this method activates automatic prompt engineering capabilities
for all agents in the workflow , allowing them to generate and optimize
their own prompts dynamically .
"""
if self . auto_generate_prompts is True :
for agent in self . agents :
agent . auto_generate_prompt = True
@ -103,7 +171,16 @@ class ConcurrentWorkflow(BaseSwarm):
title : str = " ConcurrentWorkflow Dashboard " ,
is_final : bool = False ,
) :
""" Display real-time dashboard. """
"""
Display real - time dashboard showing agent status and outputs .
Creates and displays a dashboard showing the current status and output
of each agent in the workflow . This is used for monitoring concurrent execution .
Args :
title ( str ) : Title to display for the dashboard . Defaults to " ConcurrentWorkflow Dashboard " .
is_final ( bool ) : Whether this is the final dashboard display . Defaults to False .
"""
agents_data = [
{
" name " : agent . agent_name ,
@ -127,7 +204,23 @@ class ConcurrentWorkflow(BaseSwarm):
Callable [ [ str , str , bool ] , None ]
] = None ,
) :
""" Execute agents with dashboard monitoring. """
"""
Execute agents with dashboard monitoring .
Runs all agents concurrently while displaying a real - time dashboard that shows
the status and output of each agent . This method provides visual feedback during
execution and supports streaming callbacks for real - time updates .
Args :
task ( str ) : The task to be executed by all agents .
img ( Optional [ str ] ) : Single image path for agents that support image input .
imgs ( Optional [ List [ str ] ] ) : List of image paths for agents that support multiple images .
streaming_callback ( Optional [ Callable [ [ str , str , bool ] , None ] ] ) : Callback function for streaming updates .
Called with ( agent_name , chunk , is_final ) parameters .
Returns :
Union [ Dict , List , str ] : Formatted conversation history based on output_type .
"""
try :
self . conversation . add ( role = " User " , content = task )
@ -271,7 +364,22 @@ class ConcurrentWorkflow(BaseSwarm):
Callable [ [ str , str , bool ] , None ]
] = None ,
) :
""" Execute agents concurrently without dashboard. """
"""
Execute agents concurrently without dashboard .
Internal method that runs all agents concurrently using ThreadPoolExecutor
without displaying the dashboard . This is the core execution logic used when
dashboard mode is disabled .
Args :
task ( str ) : The task to be executed by all agents .
img ( Optional [ str ] ) : Single image path for agents that support image input .
imgs ( Optional [ List [ str ] ] ) : List of image paths for agents that support multiple images .
streaming_callback ( Optional [ Callable [ [ str , str , bool ] , None ] ] ) : Callback function for streaming updates .
Returns :
Union [ Dict , List , str ] : Formatted conversation history based on output_type .
"""
self . conversation . add ( role = " User " , content = task )
max_workers = int ( get_cpu_cores ( ) * 0.95 )
@ -314,7 +422,25 @@ class ConcurrentWorkflow(BaseSwarm):
Callable [ [ str , str , bool ] , None ]
] = None ,
) :
""" Run single agent with streaming support. """
"""
Run single agent with streaming support .
Executes a single agent with optional streaming callback support .
Handles errors gracefully and ensures completion callbacks are called .
Args :
agent ( Union [ Agent , Callable ] ) : The agent to execute .
task ( str ) : The task to be executed by the agent .
img ( Optional [ str ] ) : Single image path for agents that support image input .
imgs ( Optional [ List [ str ] ] ) : List of image paths for agents that support multiple images .
streaming_callback ( Optional [ Callable [ [ str , str , bool ] , None ] ] ) : Callback function for streaming updates .
Returns :
str : The output from the agent .
Raises :
Exception : If the agent execution fails .
"""
if streaming_callback is None :
return agent . run ( task = task , img = img , imgs = imgs )
@ -358,7 +484,16 @@ class ConcurrentWorkflow(BaseSwarm):
raise
def cleanup ( self ) :
""" Clean up resources and connections. """
"""
Clean up resources and connections .
Performs cleanup operations including :
- Calling cleanup methods on all agents if available
- Resetting agent statuses
- Preserving conversation history for result formatting
This method is called automatically after each run to ensure proper resource management .
"""
try :
# Reset agent statuses
for agent in self . agents :
@ -387,7 +522,27 @@ class ConcurrentWorkflow(BaseSwarm):
Callable [ [ str , str , bool ] , None ]
] = None ,
) :
""" Execute all agents concurrently. """
"""
Execute all agents concurrently .
Main entry point for running the concurrent workflow . Executes all agents
simultaneously on the given task , with optional dashboard monitoring and
streaming callbacks . Automatically cleans up resources after execution .
Args :
task ( str ) : The task to be executed by all agents .
img ( Optional [ str ] ) : Single image path for agents that support image input .
imgs ( Optional [ List [ str ] ] ) : List of image paths for agents that support multiple images .
streaming_callback ( Optional [ Callable [ [ str , str , bool ] , None ] ] ) : Callback function for streaming updates .
Called with ( agent_name , chunk , is_final ) parameters .
Returns :
Union [ Dict , List , str ] : Formatted conversation history based on output_type .
Example :
>> > workflow = ConcurrentWorkflow ( agents = [ agent1 , agent2 ] )
>> > result = workflow . run ( " Analyze this data " )
"""
try :
if self . show_dashboard :
result = self . run_with_dashboard (
@ -410,7 +565,25 @@ class ConcurrentWorkflow(BaseSwarm):
Callable [ [ str , str , bool ] , None ]
] = None ,
) :
""" Execute workflow on multiple tasks sequentially. """
"""
Execute workflow on multiple tasks sequentially .
Runs the concurrent workflow on multiple tasks one after another .
Each task is executed with all agents running concurrently , but the tasks
themselves are processed sequentially .
Args :
tasks ( List [ str ] ) : List of tasks to be executed .
imgs ( Optional [ List [ str ] ] ) : List of image paths corresponding to each task .
streaming_callback ( Optional [ Callable [ [ str , str , bool ] , None ] ] ) : Callback function for streaming updates .
Returns :
List [ Union [ Dict , List , str ] ] : List of results for each task .
Example :
>> > workflow = ConcurrentWorkflow ( agents = [ agent1 , agent2 ] )
>> > results = workflow . batch_run ( [ " Task 1 " , " Task 2 " , " Task 3 " ] )
"""
results = [ ]
for idx , task in enumerate ( tasks ) :
img = None