@ -1,20 +1,21 @@
# ConcurrentWorkflow Documentation
# ConcurrentWorkflow Documentation
## Overview
The `ConcurrentWorkflow` class is designed to facilitate the concurrent execution of multiple agents, each tasked with solving a specific query or problem. This class is particularly useful in scenarios where multiple agents need to work in parallel, allowing for efficient resource utilization and faster completion of tasks. The workflow manages the execution, handles streaming callbacks, and provides optional dashboard monitoring for real-time progress tracking.
The `ConcurrentWorkflow` class is designed to facilitate the concurrent execution of multiple agents, each tasked with solving a specific query or problem. This class is particularly useful in scenarios where multiple agents need to work in parallel, allowing for efficient resource utilization and faster completion of tasks. The workflow manages the execution, collects metadata, and optionally saves the results in a structured format.
Full Path: `swarms.structs.concurrent_workflow`
### Key Features
### Key Features
- **Concurrent Execution** : Runs multiple agents simultaneously using Python's `ThreadPoolExecutor`
| Feature | Description |
- **Interactive Mode** : Supports interactive task modification and execution
|---------------------------|-----------------------------------------------------------------------------------------------|
- **Caching System** : Implements LRU caching for repeated prompts
| Concurrent Execution | Runs multiple agents simultaneously using Python's `ThreadPoolExecutor` |
- **Progress Tracking** : Optional progress bar for task execution
| Dashboard Monitoring | Optional real-time dashboard for tracking agent status and progress |
- **Enhanced Error Handling** : Implements retry mechanism with exponential backoff
| Streaming Support | Full support for streaming callbacks during agent execution |
- **Input Validation** : Validates task inputs before execution
| Error Handling | Comprehensive error handling with logging and status tracking |
- **Batch Processing** : Supports running tasks in batches
| Batch Processing | Supports running multiple tasks sequentially |
- **Metadata Collection** : Gathers detailed metadata about each agent's execution
| Resource Management | Automatic cleanup of resources and connections |
- **Customizable Output** : Allows saving metadata to file or returning as string/dictionary
| Flexible Output Types | Multiple output format options for conversation history |
| Agent Status Tracking | Real-time tracking of agent execution states (pending, running, completed, error) |
## Class Definition
## Class Definition
@ -40,7 +41,7 @@ The `ConcurrentWorkflow` class is designed to facilitate the concurrent executio
| `_cache` | `dict` | The cache for storing agent outputs. |
| `_cache` | `dict` | The cache for storing agent outputs. |
| `_progress_bar` | `tqdm` | The progress bar for tracking execution. |
| `_progress_bar` | `tqdm` | The progress bar for tracking execution. |
## Methods
## Constructor
### ConcurrentWorkflow.\_\_init\_\_
### ConcurrentWorkflow.\_\_init\_\_
@ -49,119 +50,202 @@ Initializes the `ConcurrentWorkflow` class with the provided parameters.
#### Parameters
#### Parameters
| Parameter | Type | Default Value | Description |
| Parameter | Type | Default Value | Description |
|-----------------------|----------------|----------------------------------------|-----------------------------------------------------------|
|-----------------------|-------------------------------|----------------------------------------|-----------------------------------------------------------|
| `id` | `str` | `swarm_id()` | Unique identifier for the workflow instance. |
| `name` | `str` | `"ConcurrentWorkflow"` | The name of the workflow. |
| `name` | `str` | `"ConcurrentWorkflow"` | The name of the workflow. |
| `description` | `str` | `"Execution of multiple agents concurrently"` | A brief description of the workflow. |
| `description` | `str` | `"Execution of multiple agents concurrently"` | A brief description of the workflow. |
| `agents` | `List[Agent]` | `[]` | A list of agents to be executed concurrently. |
| `agents` | `List[Union[Agent, Callable]]` | `None` | A list of agents or callables to be executed concurrently. |
| `metadata_output_path` | `str` | `"agent_metadata.json"` | Path to save the metadata output. |
| `auto_save` | `bool` | `True` | Flag indicating whether to automatically save metadata. |
| `auto_save` | `bool` | `True` | Flag indicating whether to automatically save the metadata. |
| `output_type` | `str` | `"dict-all-except-first"` | The type of output format. |
| `output_type` | `str` | `"dict"` | The type of output format. |
| `max_loops` | `int` | `1` | Maximum number of loops for each agent. |
| `max_loops` | `int` | `1` | Maximum number of loops for each agent. |
| `return_str_on` | `bool` | `False` | Flag to return output as string. |
| `auto_generate_prompts` | `bool` | `False` | Flag indicating whether to auto-generate prompts for agents. |
| `auto_generate_prompts` | `bool` | `False` | Flag indicating whether to auto-generate prompts for agents. |
| `return_entire_history` | `bool` | `False` | Flag to return entire conversation history. |
| `show_dashboard` | `bool` | `False` | Flag indicating whether to show real-time dashboard. |
| `interactive` | `bool` | `False` | Flag indicating whether to enable interactive mode. |
| `cache_size` | `int` | `100` | The size of the cache. |
| `max_retries` | `int` | `3` | The maximum number of retry attempts. |
| `retry_delay` | `float` | `1.0` | The delay between retry attempts in seconds. |
| `show_progress` | `bool` | `False` | Flag indicating whether to show progress. |
#### Raises
#### Raises
- `ValueError` : If the list of agents is empty or if the description is empty.
- `ValueError` : If no agents are provided or if the agents list is empty.
## Methods
### ConcurrentWorkflow.disable_agent_prints
### ConcurrentWorkflow.fix_age nts
Disables print statements for all agents in the workflow.
Configures agents for dashboard mode by disabling print statements when dashboard is enabled.
#### Returns
- `List[Union[Agent, Callable]]` : The configured list of agents.
```python
```python
workflow.disable_agent_prints()
agents = workflow.fix_agents()
```
### ConcurrentWorkflow.reliability_check
Validates workflow configuration and ensures agents are properly set up.
#### Raises
- `ValueError` : If no agents are provided or if the agents list is empty.
```python
workflow.reliability_check()
```
```
### ConcurrentWorkflow.activate_auto_prompt_engineering
### ConcurrentWorkflow.activate_auto_prompt_engineering
Activates the auto-generate prompts feature for all agents in the workflow.
Enables automatic prompt generation for all agents in the workflow.
```python
```python
workflow.activate_auto_prompt_engineering()
workflow.activate_auto_prompt_engineering()
```
```
### ConcurrentWorkflow.enable_progress_bar
### ConcurrentWorkflow.display_agent_dashboard
Displays real-time dashboard showing agent status and progress.
Enables the progress bar display for task execution.
#### Parameters
| Parameter | Type | Default Value | Description |
|-------------|---------|----------------------------|-----------------------------------------------------------|
| `title` | `str` | `"ConcurrentWorkflow Dashboard"` | Title for the dashboard. |
| `is_final` | `bool` | `False` | Whether this is the final dashboard display. |
```python
```python
workflow.enable_progress_bar()
workflow.display_agent_dashboard("Execution Progress", is_final=False )
```
```
### ConcurrentWorkflow.disable_progress_bar
### ConcurrentWorkflow.run_with_dashboard
Executes agents with real-time dashboard monitoring and streaming support.
#### Parameters
| Parameter | Type | Description |
|-----------------------|-----------------------------------|-----------------------------------------------------------|
| `task` | `str` | The task to execute. |
| `img` | `Optional[str]` | Optional image for processing. |
| `imgs` | `Optional[List[str]]` | Optional list of images for processing. |
| `streaming_callback` | `Optional[Callable[[str, str, bool], None]]` | Callback for streaming agent outputs. |
#### Returns
Disables the progress bar display.
- `Any` : The formatted conversation history based on output_type .
```python
```python
workflow.disable_progress_bar()
result = workflow.run_with_dashboard(
task="Analyze this data",
streaming_callback=lambda agent, chunk, done: print(f"{agent}: {chunk}")
)
```
```
### ConcurrentWorkflow.clear_cache
### ConcurrentWorkflow._run
Executes agents concurrently without dashboard monitoring.
#### Parameters
| Parameter | Type | Description |
|-----------------------|-----------------------------------|-----------------------------------------------------------|
| `task` | `str` | The task to execute. |
| `img` | `Optional[str]` | Optional image for processing. |
| `imgs` | `Optional[List[str]]` | Optional list of images for processing. |
| `streaming_callback` | `Optional[Callable[[str, str, bool], None]]` | Callback for streaming agent outputs. |
#### Returns
Clears the task cache.
- `Any` : The formatted conversation history based on output_typ e.
```python
```python
workflow.clear_cache()
result = workflow._run(
task="Process this task",
streaming_callback=lambda agent, chunk, done: print(f"{agent}: {chunk}")
)
```
```
### ConcurrentWorkflow.get_cache_stats
### ConcurrentWorkflow._run_agent_with_streaming
Runs a single agent with streaming callback support.
Gets cache statistics.
#### Parameters
| Parameter | Type | Description |
|-----------------------|-----------------------------------|-----------------------------------------------------------|
| `agent` | `Union[Agent, Callable]` | The agent or callable to execute. |
| `task` | `str` | The task to execute. |
| `img` | `Optional[str]` | Optional image for processing. |
| `imgs` | `Optional[List[str]]` | Optional list of images for processing. |
| `streaming_callback` | `Optional[Callable[[str, str, bool], None]]` | Callback for streaming outputs. |
#### Returns
#### Returns
- `Dict[str, int]` : A dictionary containing cache statistics.
- ` str`: The output from the agent execution .
```python
```python
stats = workflow.get_cache_stats()
output = workflow._run_agent_with_streaming(
print(stats) # {'cache_size': 5, 'max_cache_size': 100}
agent=my_agent,
task="Analyze data",
streaming_callback=lambda agent, chunk, done: print(f"{agent}: {chunk}")
)
```
### ConcurrentWorkflow.cleanup
Cleans up resources and connections used by the workflow.
```python
workflow.cleanup()
```
```
### ConcurrentWorkflow.run
### ConcurrentWorkflow.run
Executes the workflow for the provided task.
Main execution method that runs all agents concurrently .
#### Parameters
#### Parameters
| Parameter | Type | Description |
| Parameter | Type | Description |
|-------------|---------------------|-----------------------------------------------------------|
|----------------------- |-------------- ---------------------|-----------------------------------------------------------|
| `task` | `Optional[str]` | The task or query to give to all agents. |
| `task` | `str` | The task to execute. |
| `img` | `Optional[str]` | The image to be processed by the agents. |
| `img` | `Optional[str]` | Optional image for processing. |
| ` *args` | `tuple` | Additional positional arguments. |
| ` imgs` | `Optional[List[str]]` | Optional list of images for processing. |
| ` **kwargs` | `dict` | Additional keyword arguments. |
| ` streaming_callback` | `Optional[Callable[[str, str, bool], None]]` | Callback for streaming agent outputs. |
#### Returns
#### Returns
- `Any` : The result of the execution, format depends on output_type and return_entire_history settings.
- `Any` : The formatted conversation history based on output_type.
#### Raises
- `ValueError` : If an invalid device is specified.
```python
- `Exception` : If any other error occurs during execution.
result = workflow.run(
task="What are the benefits of renewable energy?",
streaming_callback=lambda agent, chunk, done: print(f"{agent}: {chunk}")
)
```
### ConcurrentWorkflow.run_batched
### ConcurrentWorkflow.batch_run
Runs the workflow for a batch of tasks.
Executes the workflow on multiple tasks sequentially .
#### Parameters
#### Parameters
| Parameter | Type | Description |
| Parameter | Type | Description |
|-------------|--------------|-----------------------------------------------------------|
|-----------------------|-----------------------------------|-----------------------------------------------------------|
| `tasks` | `List[str]` | A list of tasks or queries to give to all agents. |
| `tasks` | `List[str]` | List of tasks to execute. |
| `imgs` | `Optional[List[str]]` | Optional list of images corresponding to tasks. |
| `streaming_callback` | `Optional[Callable[[str, str, bool], None]]` | Callback for streaming outputs. |
#### Returns
#### Returns
- `List[Any]` : A list of results for each task.
- `List[Any]` : List of results for each task.
```python
results = workflow.batch_run(
tasks=["Task 1", "Task 2", "Task 3"],
streaming_callback=lambda agent, chunk, done: print(f"{agent}: {chunk}")
)
```
## Usage Examples
## Usage Examples
### Example 1: Basic Usage with Interactive Mode
### Example 1: Basic Concurrent Execution
```python
```python
from swarms import Agent, ConcurrentWorkflow
from swarms import Agent, ConcurrentWorkflow
@ -169,59 +253,136 @@ from swarms import Agent, ConcurrentWorkflow
# Initialize agents
# Initialize agents
agents = [
agents = [
Agent(
Agent(
agent_name=f"Agent-{i}",
agent_name="Research-Agent",
system_prompt="You are a helpful assistant.",
system_prompt="You are a research specialist focused on gathering information.",
model_name="gpt-4",
max_loops=1,
),
Agent(
agent_name="Analysis-Agent",
system_prompt="You are an analysis expert who synthesizes information.",
model_name="gpt-4",
max_loops=1,
),
Agent(
agent_name="Summary-Agent",
system_prompt="You are a summarization expert who creates concise reports.",
model_name="gpt-4",
model_name="gpt-4",
max_loops=1,
max_loops=1,
)
)
for i in range(3)
]
]
# Initialize workflow with interactive mode
# Initialize workflow
workflow = ConcurrentWorkflow(
workflow = ConcurrentWorkflow(
name="Interactive Workflow",
name="Research Analysis Workflow",
description="Concurrent execution of research, analysis, and summarization tasks",
agents=agents,
agents=agents,
interactive=True,
auto_save=True,
show_progress=True,
output_type="dict-all-except-first",
cache_size=100,
show_dashboard=False
max_retries=3,
retry_delay=1.0
)
)
# Run workflow
# Run workflow
task = "What are the benefits of using Python for data analysi s?"
task = "What are the environmental impacts of electric vehicle s?"
result = workflow.run(task)
result = workflow.run(task)
print(result)
print(result)
```
```
### Example 2: Batch Processing with Progress Bar
### Example 2: Dashboard Monitoring with Streaming
```python
```python
# Initialize workflow
import time
def streaming_callback(agent_name: str, chunk: str, is_final: bool):
"""Handle streaming output from agents."""
if chunk:
print(f"[{agent_name}] {chunk}", end="", flush=True)
if is_final:
print(f"\n[{agent_name}] Completed\n")
# Initialize workflow with dashboard
workflow = ConcurrentWorkflow(
workflow = ConcurrentWorkflow(
name="Batch Processing Workflow",
name="Monitored Workflow",
agents=agents,
agents=agents,
show_progress=True,
show_dashboard=True, # Enable real-time dashboard
auto_save=True
output_type="dict-all-except-first"
)
# Run with streaming and dashboard
task = "Analyze the future of artificial intelligence in healthcare"
result = workflow.run(
task=task,
streaming_callback=streaming_callback
)
)
# Define tasks
print("Final Result:", result)
```
### Example 3: Batch Processing Multiple Tasks
```python
# Define multiple tasks
tasks = [
tasks = [
"Analyze the impact of climate change on agriculture",
"What are the benefits of renewable energy adoption?",
"Evaluate renewable energy solutions",
"How does blockchain technology impact supply chains?",
"Assess water conservation strategies"
"What are the challenges of implementing remote work policies?",
"Analyze the growth of e-commerce in developing countries"
]
]
# Run batch processing
# Initialize workflow for batch processing
results = workflow.run_batched(tasks)
workflow = ConcurrentWorkflow(
name="Batch Analysis Workflow",
agents=agents,
output_type="dict-all-except-first",
show_dashboard=False
)
# Process results
# Process all tasks
for task, result in zip(tasks, results):
results = workflow.batch_run(tasks=tasks)
print(f"Task: {task}")
print(f"Result: {result}\n")
# Display results
for i, (task, result) in enumerate(zip(tasks, results)):
print(f"\n{'='*50}")
print(f"Task {i+1}: {task}")
print(f"{'='*50}")
print(f"Result: {result}")
```
```
### Example 3: Error Handling and Retries
### Example 4: Auto-Prompt Engineering
```python
# Initialize agents without specific prompts
agents = [
Agent(
agent_name="Creative-Agent",
model_name="gpt-4",
max_loops=1,
),
Agent(
agent_name="Technical-Agent",
model_name="gpt-4",
max_loops=1,
)
]
# Initialize workflow with auto-prompt engineering
workflow = ConcurrentWorkflow(
name="Auto-Prompt Workflow",
agents=agents,
auto_generate_prompts=True, # Enable auto-prompt generation
output_type="dict-all-except-first"
)
# Activate auto-prompt engineering (can also be done in init)
workflow.activate_auto_prompt_engineering()
# Run workflow
task = "Design a mobile app for fitness tracking"
result = workflow.run(task)
print(result)
```
### Example 5: Error Handling and Cleanup
```python
```python
import logging
import logging
@ -229,38 +390,103 @@ import logging
# Set up logging
# Set up logging
logging.basicConfig(level=logging.INFO)
logging.basicConfig(level=logging.INFO)
# Initialize workflow with retry settings
# Initialize workflow
workflow = ConcurrentWorkflow(
workflow = ConcurrentWorkflow(
name="Reliable Workflow",
name="Reliable Workflow",
agents=agents,
agents=agents,
max_retries=3,
output_type="dict-all-except-first"
retry_delay=1.0,
show_progress=True
)
)
# Run workflow with error handling
# Run workflow with proper error handling
try:
try:
task = "Generate a comprehensive market analysis report "
task = "Generate a comprehensive report on quantum computing applications "
result = workflow.run(task)
result = workflow.run(task)
print("Workflow completed successfully!")
print(result)
print(result)
except Exception as e:
except Exception as e:
logging.error(f"An error occurred: {str(e)}")
logging.error(f"Workflow failed: {str(e)}")
finally:
# Always cleanup resources
workflow.cleanup()
print("Resources cleaned up")
```
```
## Tips and Best Practices
### Example 6: Working with Imag es
- **Agent Initialization** : Ensure all agents are correctly initialized with required configurations.
```python
- **Interactive Mode** : Use interactive mode for tasks requiring user input or modification.
# Initialize agents capable of image processing
- **Caching** : Utilize the caching system for repeated tasks to improve performance.
vision_agents = [
- **Progress Tracking** : Enable progress bar for long-running tasks to monitor execution.
Agent(
- **Error Handling** : Implement proper error handling and use retry mechanism for reliability.
agent_name="Image-Analysis-Agent",
- **Resource Management** : Monitor cache size and clear when necessary.
system_prompt="You are an expert at analyzing images and extracting insights.",
- **Batch Processing** : Use batch processing for multiple related tasks.
model_name="gpt-4-vision-preview",
- **Logging** : Implement detailed logging for debugging and monitoring.
max_loops=1,
),
Agent(
agent_name="Content-Description-Agent",
system_prompt="You specialize in creating detailed descriptions of visual content.",
model_name="gpt-4-vision-preview",
max_loops=1,
)
]
# Initialize workflow for image processing
workflow = ConcurrentWorkflow(
name="Image Analysis Workflow",
agents=vision_agents,
output_type="dict-all-except-first",
show_dashboard=True
)
# Run with image input
task = "Analyze this image and provide insights about its content"
image_path = "/path/to/image.jpg"
result = workflow.run(
task=task,
img=image_path,
streaming_callback=lambda agent, chunk, done: print(f"{agent}: {chunk}")
)
print(result)
```
### Example 7: Custom Callable Agents
## References and Resources
```python
from typing import Optional
def custom_analysis_agent(task: str, img: Optional[str] = None, **kwargs) -> str:
"""Custom analysis function that can be used as an agent."""
# Custom logic here
return f"Custom analysis result for: {task}"
def sentiment_analysis_agent(task: str, img: Optional[str] = None, **kwargs) -> str:
"""Sentiment analysis function."""
# Custom sentiment analysis logic
return f"Sentiment analysis for: {task}"
# Mix of Agent objects and callable functions
mixed_agents = [
Agent(
agent_name="GPT-Agent",
system_prompt="You are a helpful assistant.",
model_name="gpt-4",
max_loops=1,
),
custom_analysis_agent, # Callable function
sentiment_analysis_agent # Another callable function
]
# Initialize workflow with mixed agent types
workflow = ConcurrentWorkflow(
name="Mixed Agents Workflow",
agents=mixed_agents,
output_type="dict-all-except-first"
)
- [Python's ThreadPoolExecutor Documentation ](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor )
# Run workflow
- [tqdm Progress Bar Documentation ](https://tqdm.github.io/ )
task = "Analyze customer feedback and provide insights"
- [Python's functools.lru_cache Documentation ](https://docs.python.org/3/library/functools.html#functools.lru_cache )
result = workflow.run(task)
- [Loguru for Logging in Python ](https://loguru.readthedocs.io/en/stable/ )
print(result)
```