Merge branch 'kyegomez:master' into master

pull/1057/head
Aksh Parekh 1 month ago committed by GitHub
commit fc451df6a0

@ -0,0 +1,65 @@
# Firecrawl Tool
The Firecrawl tool is a powerful web crawling utility that integrates seamlessly with Swarms agents to extract, analyze, and process content from websites. It leverages the Firecrawl API to crawl entire websites, extract structured data, and provide comprehensive content analysis for various use cases including marketing, research, content creation, and data analysis.
### Key Features
| Feature | Description |
|-----------------------------|-----------------------------------------------------------------------------------------------|
| **Complete Site Crawling** | Crawl entire websites and extract content from multiple pages |
| **Structured Data Extraction** | Automatically parse and structure web content |
| **Agent Integration** | Works seamlessly with Swarms agents for intelligent content processing |
| **Marketing Copy Analysis** | Specialized for analyzing and improving marketing content |
| **Content Optimization** | Identify and enhance key value propositions and calls-to-action |
## Prerequisites
Before getting started, you'll need:
1. **Python 3.8+** installed on your system
2. **Firecrawl API Key** from [firecrawl.dev/app](https://www.firecrawl.dev/app)
3. **OpenAI API Key** for agent functionality
## Install
```bash
pip3 install -U swarms swarms-tools
```
## Environment Setup
```txt
FIRECRAWL_API_KEY=""
OPENAI_API_KEY=""
```
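If you store these keys in a local `.env` file, you can load and validate them before running the agent. The snippet below is a minimal sketch that assumes the `python-dotenv` package is installed; exporting the variables directly in your shell works just as well.
```python
import os

from dotenv import load_dotenv  # requires: pip install python-dotenv

# Load FIRECRAWL_API_KEY and OPENAI_API_KEY from a local .env file
load_dotenv()

# Fail early with a clear message if either key is missing
for key in ("FIRECRAWL_API_KEY", "OPENAI_API_KEY"):
    if not os.getenv(key):
        raise EnvironmentError(f"Missing required environment variable: {key}")
```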
## Usage
```python
from swarms_tools import crawl_entire_site_firecrawl
from swarms import Agent
agent = Agent(
agent_name="Marketing Copy Improver",
model_name="gpt-4.1",
tools=[crawl_entire_site_firecrawl],
dynamic_context_window=True,
dynamic_temperature_enabled=True,
max_loops=1,
system_prompt=(
"You are a world-class marketing copy improver. "
"Given a website URL, your job is to crawl the entire site, analyze all marketing copy, "
"and rewrite it to maximize clarity, engagement, and conversion. "
"Return the improved marketing copy in a structured, easy-to-read format. "
"Be concise, persuasive, and ensure the tone matches the brand. "
"Highlight key value propositions and calls to action."
),
)
out = agent.run(
"Crawl 2-3 pages of swarms.ai and improve the marketing copy found on those pages. Return the improved copy in a structured format."
)
print(out)
```

@ -0,0 +1,84 @@
# Web Scraper Agents
Web scraper agents are specialized AI agents that can automatically extract and process information from websites. These agents combine the power of large language models with web scraping tools to intelligently gather, analyze, and structure data from the web.
In practice, these agents can:
| Capability | Description |
|----------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------|
| **Automatically navigate websites** | Extract relevant information from web pages |
| **Parse and structure data** | Convert HTML content into readable, structured formats |
| **Handle dynamic content** | Process JavaScript-rendered pages and dynamic website elements |
| **Provide intelligent summaries and analysis** | Generate summaries and analyze the scraped content |
| **Scale to multiple websites simultaneously** | Scrape and process data from several websites at once for comprehensive research |
## Install
```bash
pip3 install -U swarms swarms-tools
```
## Environment Setup
```bash
OPENAI_API_KEY="your_openai_api_key_here"
```
## Basic Usage
Here's a simple example of how to create a web scraper agent:
```python
from swarms import Agent
from swarms_tools import scrape_and_format_sync
agent = Agent(
agent_name="Web Scraper Agent",
model_name="gpt-4o-mini",
tools=[scrape_and_format_sync],
dynamic_context_window=True,
dynamic_temperature_enabled=True,
max_loops=1,
system_prompt="You are a web scraper agent. You are given a URL and you need to scrape the website and return the data in a structured format. The format type should be full",
)
out = agent.run(
"Scrape swarms.ai website and provide a full report of the company does. The format type should be full."
)
print(out)
```
## Scraping Multiple Sites
For comprehensive research, you can scrape multiple websites simultaneously using batch execution:
```python
from swarms.structs.multi_agent_exec import batched_grid_agent_execution
from swarms_tools import scrape_and_format_sync
from swarms import Agent
agent = Agent(
agent_name="Web Scraper Agent",
model_name="gpt-4o-mini",
tools=[scrape_and_format_sync],
dynamic_context_window=True,
dynamic_temperature_enabled=True,
max_loops=1,
system_prompt="You are a web scraper agent. You are given a URL and you need to scrape the website and return the data in a structured format. The format type should be full",
)
out = batched_grid_agent_execution(
agents=[agent, agent],
tasks=[
"Scrape swarms.ai website and provide a full report of the company's mission, products, and team. The format type should be full.",
"Scrape langchain.com website and provide a full report of the company's mission, products, and team. The format type should be full.",
],
)
print(out)
```
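The example above reuses a single `agent` instance for both tasks. For more targeted research you can pass differently specialized agents instead; the agent and task lists appear to be paired by index, so the sketch below (with hypothetical agent names and prompts) keeps the two lists in the same order. This is a sketch under that assumption, not a prescribed pattern.
```python
from swarms import Agent
from swarms.structs.multi_agent_exec import batched_grid_agent_execution
from swarms_tools import scrape_and_format_sync

# Hypothetical specialized agents, one per target site
company_agent = Agent(
    agent_name="Company Research Agent",
    model_name="gpt-4o-mini",
    tools=[scrape_and_format_sync],
    max_loops=1,
    system_prompt="You are a web scraper agent focused on company mission and team pages. The format type should be full.",
)

product_agent = Agent(
    agent_name="Product Research Agent",
    model_name="gpt-4o-mini",
    tools=[scrape_and_format_sync],
    max_loops=1,
    system_prompt="You are a web scraper agent focused on product and pricing pages. The format type should be full.",
)

# Agents and tasks are kept in the same order so each agent handles its own site
out = batched_grid_agent_execution(
    agents=[company_agent, product_agent],
    tasks=[
        "Scrape swarms.ai and report on the company's mission and team. The format type should be full.",
        "Scrape langchain.com and report on the company's products and pricing. The format type should be full.",
    ],
)
print(out)
```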
## Conclusion
Web scraper agents combine AI with advanced automation to efficiently gather and process web data at scale. As you master the basics, explore features like batch processing and custom tools to unlock the full power of AI-driven web scraping.

@ -430,11 +430,13 @@ nav:
- Advanced Research: "examples/av.md"
- Browser Use: "examples/browser_use.md"
- Yahoo Finance: "swarms/examples/yahoo_finance.md"
- Firecrawl: "developer_guides/firecrawl.md"
- RAG:
- RAG with Qdrant: "swarms/RAG/qdrant_rag.md"
- Apps:
- Web Scraper Agents: "developer_guides/web_scraper.md"
- Smart Database: "examples/smart_database.md"

@ -1,20 +1,21 @@
# ConcurrentWorkflow Documentation
## Overview
The `ConcurrentWorkflow` class is designed to facilitate the concurrent execution of multiple agents, each tasked with solving a specific query or problem. This class is particularly useful in scenarios where multiple agents need to work in parallel, allowing for efficient resource utilization and faster completion of tasks. The workflow manages the execution, handles streaming callbacks, and provides optional dashboard monitoring for real-time progress tracking.
The `ConcurrentWorkflow` class is designed to facilitate the concurrent execution of multiple agents, each tasked with solving a specific query or problem. This class is particularly useful in scenarios where multiple agents need to work in parallel, allowing for efficient resource utilization and faster completion of tasks. The workflow manages the execution, collects metadata, and optionally saves the results in a structured format.
Full Path: `swarms.structs.concurrent_workflow`
### Key Features
- **Concurrent Execution**: Runs multiple agents simultaneously using Python's `ThreadPoolExecutor`
- **Interactive Mode**: Supports interactive task modification and execution
- **Caching System**: Implements LRU caching for repeated prompts
- **Progress Tracking**: Optional progress bar for task execution
- **Enhanced Error Handling**: Implements retry mechanism with exponential backoff
- **Input Validation**: Validates task inputs before execution
- **Batch Processing**: Supports running tasks in batches
- **Metadata Collection**: Gathers detailed metadata about each agent's execution
- **Customizable Output**: Allows saving metadata to file or returning as string/dictionary
| Feature | Description |
|---------------------------|-----------------------------------------------------------------------------------------------|
| Concurrent Execution | Runs multiple agents simultaneously using Python's `ThreadPoolExecutor` (illustrated in the sketch after this table) |
| Dashboard Monitoring | Optional real-time dashboard for tracking agent status and progress |
| Streaming Support | Full support for streaming callbacks during agent execution |
| Error Handling | Comprehensive error handling with logging and status tracking |
| Batch Processing | Supports running multiple tasks sequentially |
| Resource Management | Automatic cleanup of resources and connections |
| Flexible Output Types | Multiple output format options for conversation history |
| Agent Status Tracking | Real-time tracking of agent execution states (pending, running, completed, error) |
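Conceptually, the concurrent execution is a thread-pool fan-out: the same task is submitted once per agent and the results are gathered as the futures complete. The sketch below illustrates that mechanism with only the standard library; it is an illustration of the idea, not the class's actual implementation.
```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fan_out(agents, task):
    """Run the same task on every agent in parallel and collect outputs by agent name."""
    results = {}
    with ThreadPoolExecutor(max_workers=max(1, len(agents))) as executor:
        # Submit one job per agent; map each future back to the agent that produced it
        futures = {executor.submit(agent.run, task): agent for agent in agents}
        for future in as_completed(futures):
            agent = futures[future]
            results[agent.agent_name] = future.result()
    return results
```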
## Class Definition
@ -40,7 +41,7 @@ The `ConcurrentWorkflow` class is designed to facilitate the concurrent executio
| `_cache` | `dict` | The cache for storing agent outputs. |
| `_progress_bar` | `tqdm` | The progress bar for tracking execution. |
## Methods
## Constructor
### ConcurrentWorkflow.\_\_init\_\_
@ -49,119 +50,202 @@ Initializes the `ConcurrentWorkflow` class with the provided parameters.
#### Parameters
| Parameter | Type | Default Value | Description |
|-----------------------|----------------|----------------------------------------|-----------------------------------------------------------|
|-----------------------|-------------------------------|----------------------------------------|-----------------------------------------------------------|
| `id` | `str` | `swarm_id()` | Unique identifier for the workflow instance. |
| `name` | `str` | `"ConcurrentWorkflow"` | The name of the workflow. |
| `description` | `str` | `"Execution of multiple agents concurrently"` | A brief description of the workflow. |
| `agents` | `List[Agent]` | `[]` | A list of agents to be executed concurrently. |
| `metadata_output_path`| `str` | `"agent_metadata.json"` | Path to save the metadata output. |
| `auto_save` | `bool` | `True` | Flag indicating whether to automatically save the metadata. |
| `output_type` | `str` | `"dict"` | The type of output format. |
| `agents` | `List[Union[Agent, Callable]]`| `None` | A list of agents or callables to be executed concurrently. |
| `auto_save` | `bool` | `True` | Flag indicating whether to automatically save metadata. |
| `output_type` | `str` | `"dict-all-except-first"` | The type of output format. |
| `max_loops` | `int` | `1` | Maximum number of loops for each agent. |
| `return_str_on` | `bool` | `False` | Flag to return output as string. |
| `auto_generate_prompts`| `bool` | `False` | Flag indicating whether to auto-generate prompts for agents. |
| `return_entire_history`| `bool` | `False` | Flag to return entire conversation history. |
| `interactive` | `bool` | `False` | Flag indicating whether to enable interactive mode. |
| `cache_size` | `int` | `100` | The size of the cache. |
| `max_retries` | `int` | `3` | The maximum number of retry attempts. |
| `retry_delay` | `float` | `1.0` | The delay between retry attempts in seconds. |
| `show_progress` | `bool` | `False` | Flag indicating whether to show progress. |
| `show_dashboard` | `bool` | `False` | Flag indicating whether to show real-time dashboard. |
#### Raises
- `ValueError`: If the list of agents is empty or if the description is empty.
- `ValueError`: If no agents are provided or if the agents list is empty.
## Methods
### ConcurrentWorkflow.disable_agent_prints
Disables print statements for all agents in the workflow.
```python
workflow.disable_agent_prints()
```
### ConcurrentWorkflow.fix_agents
Configures agents for dashboard mode by disabling print statements when dashboard is enabled.
#### Returns
- `List[Union[Agent, Callable]]`: The configured list of agents.
```python
agents = workflow.fix_agents()
```
### ConcurrentWorkflow.reliability_check
Validates workflow configuration and ensures agents are properly set up.
#### Raises
- `ValueError`: If no agents are provided or if the agents list is empty.
```python
workflow.reliability_check()
```
### ConcurrentWorkflow.activate_auto_prompt_engineering
Activates the auto-generate prompts feature for all agents in the workflow.
Enables automatic prompt generation for all agents in the workflow.
```python
workflow.activate_auto_prompt_engineering()
```
### ConcurrentWorkflow.enable_progress_bar
Enables the progress bar display for task execution.
```python
workflow.enable_progress_bar()
```
### ConcurrentWorkflow.display_agent_dashboard
Displays real-time dashboard showing agent status and progress.
#### Parameters
| Parameter | Type | Default Value | Description |
|-------------|---------|----------------------------|-----------------------------------------------------------|
| `title` | `str` | `"ConcurrentWorkflow Dashboard"` | Title for the dashboard. |
| `is_final` | `bool` | `False` | Whether this is the final dashboard display. |
```python
workflow.display_agent_dashboard("Execution Progress", is_final=False)
```
### ConcurrentWorkflow.disable_progress_bar
Disables the progress bar display.
```python
workflow.disable_progress_bar()
```
### ConcurrentWorkflow.run_with_dashboard
Executes agents with real-time dashboard monitoring and streaming support.
#### Parameters
| Parameter | Type | Description |
|-----------------------|-----------------------------------|-----------------------------------------------------------|
| `task` | `str` | The task to execute. |
| `img` | `Optional[str]` | Optional image for processing. |
| `imgs` | `Optional[List[str]]` | Optional list of images for processing. |
| `streaming_callback` | `Optional[Callable[[str, str, bool], None]]` | Callback for streaming agent outputs. |
#### Returns
- `Any`: The formatted conversation history based on `output_type`.
```python
result = workflow.run_with_dashboard(
    task="Analyze this data",
    streaming_callback=lambda agent, chunk, done: print(f"{agent}: {chunk}")
)
```
### ConcurrentWorkflow.clear_cache
Clears the task cache.
```python
workflow.clear_cache()
```
### ConcurrentWorkflow._run
Executes agents concurrently without dashboard monitoring.
#### Parameters
| Parameter | Type | Description |
|-----------------------|-----------------------------------|-----------------------------------------------------------|
| `task` | `str` | The task to execute. |
| `img` | `Optional[str]` | Optional image for processing. |
| `imgs` | `Optional[List[str]]` | Optional list of images for processing. |
| `streaming_callback` | `Optional[Callable[[str, str, bool], None]]` | Callback for streaming agent outputs. |
#### Returns
- `Any`: The formatted conversation history based on `output_type`.
```python
result = workflow._run(
    task="Process this task",
    streaming_callback=lambda agent, chunk, done: print(f"{agent}: {chunk}")
)
```
### ConcurrentWorkflow.get_cache_stats
Gets cache statistics.
#### Returns
- `Dict[str, int]`: A dictionary containing cache statistics.
```python
stats = workflow.get_cache_stats()
print(stats)  # {'cache_size': 5, 'max_cache_size': 100}
```
### ConcurrentWorkflow._run_agent_with_streaming
Runs a single agent with streaming callback support.
#### Parameters
| Parameter | Type | Description |
|-----------------------|-----------------------------------|-----------------------------------------------------------|
| `agent` | `Union[Agent, Callable]` | The agent or callable to execute. |
| `task` | `str` | The task to execute. |
| `img` | `Optional[str]` | Optional image for processing. |
| `imgs` | `Optional[List[str]]` | Optional list of images for processing. |
| `streaming_callback` | `Optional[Callable[[str, str, bool], None]]` | Callback for streaming outputs. |
#### Returns
- `str`: The output from the agent execution.
```python
output = workflow._run_agent_with_streaming(
    agent=my_agent,
    task="Analyze data",
    streaming_callback=lambda agent, chunk, done: print(f"{agent}: {chunk}")
)
```
### ConcurrentWorkflow.cleanup
Cleans up resources and connections used by the workflow.
```python
workflow.cleanup()
```
### ConcurrentWorkflow.run
Executes the workflow for the provided task.
Main execution method that runs all agents concurrently.
#### Parameters
| Parameter | Type | Description |
|-------------|---------------------|-----------------------------------------------------------|
| `task` | `Optional[str]` | The task or query to give to all agents. |
| `img` | `Optional[str]` | The image to be processed by the agents. |
| `*args` | `tuple` | Additional positional arguments. |
| `**kwargs` | `dict` | Additional keyword arguments. |
|-----------------------|-----------------------------------|-----------------------------------------------------------|
| `task` | `str` | The task to execute. |
| `img` | `Optional[str]` | Optional image for processing. |
| `imgs` | `Optional[List[str]]` | Optional list of images for processing. |
| `streaming_callback` | `Optional[Callable[[str, str, bool], None]]` | Callback for streaming agent outputs. |
#### Returns
- `Any`: The result of the execution; the format depends on the `output_type` and `return_entire_history` settings.
- `Any`: The formatted conversation history based on `output_type`.
#### Raises
- `ValueError`: If an invalid device is specified.
- `Exception`: If any other error occurs during execution.
```python
result = workflow.run(
task="What are the benefits of renewable energy?",
streaming_callback=lambda agent, chunk, done: print(f"{agent}: {chunk}")
)
```
### ConcurrentWorkflow.run_batched
### ConcurrentWorkflow.batch_run
Runs the workflow for a batch of tasks.
Executes the workflow on multiple tasks sequentially.
#### Parameters
| Parameter | Type | Description |
|-------------|--------------|-----------------------------------------------------------|
| `tasks` | `List[str]` | A list of tasks or queries to give to all agents. |
|-----------------------|-----------------------------------|-----------------------------------------------------------|
| `tasks` | `List[str]` | List of tasks to execute. |
| `imgs` | `Optional[List[str]]` | Optional list of images corresponding to tasks. |
| `streaming_callback` | `Optional[Callable[[str, str, bool], None]]` | Callback for streaming outputs. |
#### Returns
- `List[Any]`: A list of results for each task.
- `List[Any]`: List of results for each task.
```python
results = workflow.batch_run(
tasks=["Task 1", "Task 2", "Task 3"],
streaming_callback=lambda agent, chunk, done: print(f"{agent}: {chunk}")
)
```
## Usage Examples
### Example 1: Basic Usage with Interactive Mode
### Example 1: Basic Concurrent Execution
```python
from swarms import Agent, ConcurrentWorkflow
@ -169,59 +253,136 @@ from swarms import Agent, ConcurrentWorkflow
# Initialize agents
agents = [
Agent(
agent_name=f"Agent-{i}",
system_prompt="You are a helpful assistant.",
agent_name="Research-Agent",
system_prompt="You are a research specialist focused on gathering information.",
model_name="gpt-4",
max_loops=1,
),
Agent(
agent_name="Analysis-Agent",
system_prompt="You are an analysis expert who synthesizes information.",
model_name="gpt-4",
max_loops=1,
),
Agent(
agent_name="Summary-Agent",
system_prompt="You are a summarization expert who creates concise reports.",
model_name="gpt-4",
max_loops=1,
)
for i in range(3)
]
# Initialize workflow with interactive mode
# Initialize workflow
workflow = ConcurrentWorkflow(
name="Interactive Workflow",
name="Research Analysis Workflow",
description="Concurrent execution of research, analysis, and summarization tasks",
agents=agents,
interactive=True,
show_progress=True,
cache_size=100,
max_retries=3,
retry_delay=1.0
auto_save=True,
output_type="dict-all-except-first",
show_dashboard=False
)
# Run workflow
task = "What are the benefits of using Python for data analysis?"
task = "What are the environmental impacts of electric vehicles?"
result = workflow.run(task)
print(result)
```
### Example 2: Batch Processing with Progress Bar
### Example 2: Dashboard Monitoring with Streaming
```python
# Initialize workflow
import time
def streaming_callback(agent_name: str, chunk: str, is_final: bool):
"""Handle streaming output from agents."""
if chunk:
print(f"[{agent_name}] {chunk}", end="", flush=True)
if is_final:
print(f"\n[{agent_name}] Completed\n")
# Initialize workflow with dashboard
workflow = ConcurrentWorkflow(
name="Batch Processing Workflow",
name="Monitored Workflow",
agents=agents,
show_progress=True,
auto_save=True
show_dashboard=True, # Enable real-time dashboard
output_type="dict-all-except-first"
)
# Run with streaming and dashboard
task = "Analyze the future of artificial intelligence in healthcare"
result = workflow.run(
task=task,
streaming_callback=streaming_callback
)
# Define tasks
print("Final Result:", result)
```
### Example 3: Batch Processing Multiple Tasks
```python
# Define multiple tasks
tasks = [
"Analyze the impact of climate change on agriculture",
"Evaluate renewable energy solutions",
"Assess water conservation strategies"
"What are the benefits of renewable energy adoption?",
"How does blockchain technology impact supply chains?",
"What are the challenges of implementing remote work policies?",
"Analyze the growth of e-commerce in developing countries"
]
# Run batch processing
results = workflow.run_batched(tasks)
# Initialize workflow for batch processing
workflow = ConcurrentWorkflow(
name="Batch Analysis Workflow",
agents=agents,
output_type="dict-all-except-first",
show_dashboard=False
)
# Process results
for task, result in zip(tasks, results):
print(f"Task: {task}")
print(f"Result: {result}\n")
# Process all tasks
results = workflow.batch_run(tasks=tasks)
# Display results
for i, (task, result) in enumerate(zip(tasks, results)):
print(f"\n{'='*50}")
print(f"Task {i+1}: {task}")
print(f"{'='*50}")
print(f"Result: {result}")
```
### Example 3: Error Handling and Retries
### Example 4: Auto-Prompt Engineering
```python
# Initialize agents without specific prompts
agents = [
Agent(
agent_name="Creative-Agent",
model_name="gpt-4",
max_loops=1,
),
Agent(
agent_name="Technical-Agent",
model_name="gpt-4",
max_loops=1,
)
]
# Initialize workflow with auto-prompt engineering
workflow = ConcurrentWorkflow(
name="Auto-Prompt Workflow",
agents=agents,
auto_generate_prompts=True, # Enable auto-prompt generation
output_type="dict-all-except-first"
)
# Activate auto-prompt engineering (can also be done in init)
workflow.activate_auto_prompt_engineering()
# Run workflow
task = "Design a mobile app for fitness tracking"
result = workflow.run(task)
print(result)
```
### Example 5: Error Handling and Cleanup
```python
import logging
@ -229,38 +390,103 @@ import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
# Initialize workflow with retry settings
# Initialize workflow
workflow = ConcurrentWorkflow(
name="Reliable Workflow",
agents=agents,
max_retries=3,
retry_delay=1.0,
show_progress=True
output_type="dict-all-except-first"
)
# Run workflow with error handling
# Run workflow with proper error handling
try:
task = "Generate a comprehensive market analysis report"
task = "Generate a comprehensive report on quantum computing applications"
result = workflow.run(task)
print("Workflow completed successfully!")
print(result)
except Exception as e:
logging.error(f"An error occurred: {str(e)}")
logging.error(f"Workflow failed: {str(e)}")
finally:
# Always cleanup resources
workflow.cleanup()
print("Resources cleaned up")
```
## Tips and Best Practices
- **Agent Initialization**: Ensure all agents are correctly initialized with required configurations.
- **Interactive Mode**: Use interactive mode for tasks requiring user input or modification.
- **Caching**: Utilize the caching system for repeated tasks to improve performance.
- **Progress Tracking**: Enable progress bar for long-running tasks to monitor execution.
- **Error Handling**: Implement proper error handling and use retry mechanism for reliability.
- **Resource Management**: Monitor cache size and clear when necessary.
- **Batch Processing**: Use batch processing for multiple related tasks.
- **Logging**: Implement detailed logging for debugging and monitoring.
### Example 6: Working with Images
```python
# Initialize agents capable of image processing
vision_agents = [
Agent(
agent_name="Image-Analysis-Agent",
system_prompt="You are an expert at analyzing images and extracting insights.",
model_name="gpt-4-vision-preview",
max_loops=1,
),
Agent(
agent_name="Content-Description-Agent",
system_prompt="You specialize in creating detailed descriptions of visual content.",
model_name="gpt-4-vision-preview",
max_loops=1,
)
]
# Initialize workflow for image processing
workflow = ConcurrentWorkflow(
name="Image Analysis Workflow",
agents=vision_agents,
output_type="dict-all-except-first",
show_dashboard=True
)
# Run with image input
task = "Analyze this image and provide insights about its content"
image_path = "/path/to/image.jpg"
result = workflow.run(
task=task,
img=image_path,
streaming_callback=lambda agent, chunk, done: print(f"{agent}: {chunk}")
)
print(result)
```
### Example 7: Custom Callable Agents
```python
from typing import Optional
def custom_analysis_agent(task: str, img: Optional[str] = None, **kwargs) -> str:
"""Custom analysis function that can be used as an agent."""
# Custom logic here
return f"Custom analysis result for: {task}"
def sentiment_analysis_agent(task: str, img: Optional[str] = None, **kwargs) -> str:
"""Sentiment analysis function."""
# Custom sentiment analysis logic
return f"Sentiment analysis for: {task}"
# Mix of Agent objects and callable functions
mixed_agents = [
Agent(
agent_name="GPT-Agent",
system_prompt="You are a helpful assistant.",
model_name="gpt-4",
max_loops=1,
),
custom_analysis_agent, # Callable function
sentiment_analysis_agent # Another callable function
]
# Initialize workflow with mixed agent types
workflow = ConcurrentWorkflow(
name="Mixed Agents Workflow",
agents=mixed_agents,
output_type="dict-all-except-first"
)
# Run workflow
task = "Analyze customer feedback and provide insights"
result = workflow.run(task)
print(result)
```
## References and Resources
- [Python's ThreadPoolExecutor Documentation](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor)
- [tqdm Progress Bar Documentation](https://tqdm.github.io/)
- [Python's functools.lru_cache Documentation](https://docs.python.org/3/library/functools.html#functools.lru_cache)
- [Loguru for Logging in Python](https://loguru.readthedocs.io/en/stable/)

@ -12,206 +12,331 @@ graph TD
D --> E[Single Task]
D --> F[Batch Tasks]
D --> G[Concurrent Tasks]
D --> H[Async Tasks]
E --> I[Run Agents]
F --> I
G --> I
H --> I
I --> J[Collect Responses]
J --> K[Consensus Analysis]
K --> L{Consensus Agent?}
L -->|Yes| M[Use Consensus Agent]
L -->|No| N[Use Last Agent]
M --> O[Final Output]
N --> O
O --> P[Save Conversation]
E --> H[Run Agents]
F --> H
G --> H
H --> I[Collect Responses]
I --> J[Consensus Analysis]
J --> K{Consensus Agent?}
K -->|Yes| L[Use Consensus Agent]
K -->|No| M[Use Last Agent]
L --> N[Final Output]
M --> N
N --> O[Return Result]
```
### Key Concepts
- **Majority Voting**: A method to determine the most common response from a set of answers (see the sketch after this list).
- **Agents**: Entities (e.g., models, algorithms) that provide responses to tasks or queries.
- **Output Parser**: A function that processes the responses from the agents before performing the majority voting.
- **Consensus Agent**: An optional agent that analyzes the responses from all agents to determine the final consensus.
- **Conversation History**: A record of all agent interactions and responses during the voting process.
- **Output Types**: Support for different output formats (string, dictionary, list).
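At its core, majority voting simply counts the candidate responses and keeps the most frequent one. The sketch below illustrates that idea with the standard library's `collections.Counter`; it is a conceptual illustration only, since the class can also delegate the final decision to a consensus agent.
```python
from collections import Counter
from typing import List


def simple_majority_vote(responses: List[str]) -> str:
    """Return the most common response; ties resolve to the first one seen."""
    counts = Counter(response.strip() for response in responses)
    winner, _ = counts.most_common(1)[0]
    return winner


# Example: three agents answer, two agree
print(simple_majority_vote(["Paris", "Paris", "Lyon"]))  # -> Paris
```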
## Class Definition: `MajorityVoting`
### Parameters
```python
class MajorityVoting:
def __init__(
self,
id: str = swarm_id(),
name: str = "MajorityVoting",
description: str = "A majority voting system for agents",
agents: List[Agent] = None,
consensus_agent: Optional[Agent] = None,
autosave: bool = False,
verbose: bool = False,
max_loops: int = 1,
output_type: OutputType = "dict",
*args,
**kwargs,
):
```
| Parameter | Type | Description |
|------------------|----------------|-----------------------------------------------------------------------------|
| `name` | `str` | Name of the majority voting system. Default is "MajorityVoting". |
| `description` | `str` | Description of the system. Default is "A majority voting system for agents". |
| `agents` | `List[Agent]` | A list of agents to be used in the majority voting system. |
| `output_parser` | `Callable` | Function to parse agent outputs. Default is `majority_voting` function. |
| `consensus_agent`| `Agent` | Optional agent for analyzing consensus among responses. |
| `autosave` | `bool` | Whether to autosave conversations. Default is `False`. |
| `verbose` | `bool` | Whether to enable verbose logging. Default is `False`. |
| `max_loops` | `int` | Maximum number of voting loops. Default is 1. |
### Constructor Parameters
| Parameter | Type | Default | Description |
|------------------|-------------------|---------------|-----------------------------------------------------------------------------|
| `id` | `str` | `swarm_id()` | Unique identifier for the majority voting system. |
| `name` | `str` | `"MajorityVoting"` | Name of the majority voting system. |
| `description` | `str` | `"A majority voting system for agents"` | Description of the system. |
| `agents` | `List[Agent]` | `None` | A list of agents to be used in the majority voting system. Required. |
| `consensus_agent`| `Optional[Agent]` | `None` | Optional agent for analyzing consensus among responses. If None, uses last agent. |
| `autosave` | `bool` | `False` | Whether to autosave conversations. |
| `verbose` | `bool` | `False` | Whether to enable verbose logging. |
| `max_loops` | `int` | `1` | Maximum number of voting loops. |
| `output_type` | `OutputType` | `"dict"` | Output format: "str", "dict", "list", or other. |
| `*args` | `Any` | - | Variable length argument list passed to Conversation. |
| `**kwargs` | `Any` | - | Arbitrary keyword arguments passed to Conversation. |
### Methods
#### `run(task: str, correct_answer: str, *args, **kwargs) -> List[Any]`
#### `run(task: str, *args, **kwargs) -> Any`
Runs the majority voting system for a single task.
Runs the majority voting system for a single task and returns the consensus result.
**Parameters:**
- `task` (str): The task to be performed by the agents
- `correct_answer` (str): The correct answer for evaluation
- `*args`, `**kwargs`: Additional arguments
- `task` (`str`): The task to be performed by the agents
- `*args`: Variable length argument list passed to agents
- `**kwargs`: Arbitrary keyword arguments passed to agents
**Returns:**
- List[Any]: The conversation history as a string, including the majority vote
- `Any`: The consensus result in the specified output format (string, dict, or list)
**Raises:**
- `ValueError`: If agents list is empty
#### `batch_run(tasks: List[str], *args, **kwargs) -> List[Any]`
Runs multiple tasks in sequence.
Runs the majority voting system for multiple tasks in sequence.
**Parameters:**
- `tasks` (List[str]): List of tasks to be performed
- `*args`, `**kwargs`: Additional arguments
- `tasks` (`List[str]`): List of tasks to be performed by the agents
- `*args`: Variable length argument list passed to each run
- `**kwargs`: Arbitrary keyword arguments passed to each run
**Returns:**
- List[Any]: List of majority votes for each task
- `List[Any]`: List of consensus results for each task
#### `run_concurrently(tasks: List[str], *args, **kwargs) -> List[Any]`
Runs the majority voting system for multiple tasks concurrently using thread pooling.
**Parameters:**
- `tasks` (`List[str]`): List of tasks to be performed by the agents
- `*args`: Variable length argument list passed to each run
- `**kwargs`: Arbitrary keyword arguments passed to each run
**Returns:**
- `List[Any]`: List of consensus results for each task, in completion order
**Note:** Uses `os.cpu_count()` workers for optimal performance.
#### `run_async(tasks: List[str], *args, **kwargs) -> List[Any]`
Runs multiple tasks asynchronously using asyncio.
**Parameters:**
- `tasks` (`List[str]`): List of tasks to be performed
- `*args`, `**kwargs`: Additional arguments
**Returns:**
- `List[Any]`: List of majority votes for each task
## Usage Examples
### Example 1: Basic Single Task Execution with Modern LLMs
### Example 1: Investment Analysis with Consensus Agent
This example demonstrates using MajorityVoting for financial analysis with specialized agents and a dedicated consensus agent.
```python
from swarms import Agent, MajorityVoting
# Initialize multiple agents with different specialties
# Initialize multiple specialized agents
agents = [
Agent(
agent_name="Financial-Analysis-Agent",
agent_description="Personal finance advisor focused on market analysis",
system_prompt="You are a financial advisor specializing in market analysis and investment opportunities.",
agent_name="Market-Analysis-Agent",
agent_description="Market trend analyst",
system_prompt="You are a market analyst specializing in identifying growth opportunities and market trends.",
max_loops=1,
model_name="gpt-4o"
),
Agent(
agent_name="Risk-Assessment-Agent",
agent_description="Risk analysis and portfolio management expert",
system_prompt="You are a risk assessment expert focused on evaluating investment risks and portfolio diversification.",
agent_description="Risk analysis expert",
system_prompt="You are a risk assessment expert focused on evaluating investment risks and volatility.",
max_loops=1,
model_name="gpt-4o"
),
Agent(
agent_name="Tech-Investment-Agent",
agent_description="Technology sector investment specialist",
system_prompt="You are a technology investment specialist focused on AI, emerging tech, and growth opportunities.",
agent_name="Portfolio-Strategy-Agent",
agent_description="Portfolio optimization specialist",
system_prompt="You are a portfolio strategist focused on diversification and long-term growth strategies.",
max_loops=1,
model_name="gpt-4o"
)
]
# Dedicated consensus agent for final analysis
consensus_agent = Agent(
agent_name="Consensus-Agent",
agent_description="Consensus agent focused on analyzing investment advice",
system_prompt="You are a consensus agent focused on analyzing investment advice and providing a final answer.",
agent_name="Investment-Consensus-Agent",
agent_description="Investment consensus analyzer",
system_prompt="You are an expert at synthesizing investment advice from multiple perspectives into a coherent recommendation.",
max_loops=1,
model_name="gpt-4o"
)
# Create majority voting system
majority_voting = MajorityVoting(
name="Investment-Advisory-System",
description="Multi-agent system for investment advice",
investment_system = MajorityVoting(
name="Investment-Analysis-System",
description="Multi-agent investment analysis with consensus evaluation",
agents=agents,
consensus_agent=consensus_agent,
verbose=True,
consensus_agent=consensus_agent
output_type="dict"
)
# Run the analysis with majority voting
result = majority_voting.run(
task="Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.",
correct_answer="" # Optional evaluation metric
# Execute investment analysis
result = investment_system.run(
task="""Analyze the following investment scenario and provide recommendations:
- Budget: $50,000
- Risk tolerance: Moderate
- Time horizon: 5-7 years
- Focus areas: Technology, Healthcare, Renewable Energy
Provide specific ETF/index fund recommendations with allocation percentages."""
)
print("Investment Analysis Results:")
print(result)
```
## Batch Execution
### Example 2: Content Creation with Batch Processing
This example shows how to use batch processing for content creation tasks with multiple writing styles.
```python
from swarms import Agent, MajorityVoting
# Initialize multiple agents with different specialties
agents = [
# Initialize content creation agents with different styles
content_agents = [
Agent(
agent_name="Financial-Analysis-Agent",
agent_description="Personal finance advisor focused on market analysis",
system_prompt="You are a financial advisor specializing in market analysis and investment opportunities.",
agent_name="Creative-Writer",
agent_description="Creative content specialist",
system_prompt="You are a creative writer who produces engaging, story-driven content with vivid descriptions.",
max_loops=1,
model_name="gpt-4o"
),
Agent(
agent_name="Risk-Assessment-Agent",
agent_description="Risk analysis and portfolio management expert",
system_prompt="You are a risk assessment expert focused on evaluating investment risks and portfolio diversification.",
agent_name="Technical-Writer",
agent_description="Technical content specialist",
system_prompt="You are a technical writer who focuses on clarity, accuracy, and structured information.",
max_loops=1,
model_name="gpt-4o"
),
Agent(
agent_name="SEO-Optimized-Writer",
agent_description="SEO content specialist",
system_prompt="You are an SEO specialist who creates content optimized for search engines while maintaining quality.",
max_loops=1,
model_name="gpt-4o"
),
Agent(
agent_name="Tech-Investment-Agent",
agent_description="Technology sector investment specialist",
system_prompt="You are a technology investment specialist focused on AI, emerging tech, and growth opportunities.",
agent_name="Conversational-Writer",
agent_description="Conversational content specialist",
system_prompt="You are a conversational writer who creates relatable, engaging content that connects with readers.",
max_loops=1,
model_name="gpt-4o"
)
]
# Create majority voting system without dedicated consensus agent
content_system = MajorityVoting(
name="Content-Creation-System",
description="Multi-style content creation with majority voting",
agents=content_agents,
verbose=True,
output_type="str"
)
# Define multiple content tasks
content_tasks = [
"Write a blog post about the benefits of renewable energy adoption",
"Create social media content for a new fitness app launch",
"Develop a product description for eco-friendly water bottles",
"Write an email newsletter about artificial intelligence trends"
]
# Execute batch processing
batch_results = content_system.batch_run(content_tasks)
consensus_agent = Agent(
agent_name="Consensus-Agent",
agent_description="Consensus agent focused on analyzing investment advice",
system_prompt="You are a consensus agent focused on analyzing investment advice and providing a final answer.",
print("Batch Content Creation Results:")
for i, result in enumerate(batch_results, 1):
print(f"\nTask {i} Result:")
print(result[:500] + "..." if len(str(result)) > 500 else result)
```
### Example 3: Research Analysis with Concurrent Processing
This example demonstrates concurrent processing for research analysis with multiple research perspectives.
```python
from swarms import Agent, MajorityVoting
# Initialize research agents with different methodologies
research_agents = [
Agent(
agent_name="Quantitative-Researcher",
agent_description="Quantitative research specialist",
system_prompt="You are a quantitative researcher who analyzes data, statistics, and numerical evidence.",
max_loops=1,
model_name="gpt-4o"
),
Agent(
agent_name="Qualitative-Researcher",
agent_description="Qualitative research specialist",
system_prompt="You are a qualitative researcher who focuses on patterns, themes, and contextual understanding.",
max_loops=1,
model_name="gpt-4o"
),
Agent(
agent_name="Literature-Review-Specialist",
agent_description="Literature review expert",
system_prompt="You are a literature review specialist who synthesizes existing research and identifies knowledge gaps.",
max_loops=1,
model_name="gpt-4o"
),
Agent(
agent_name="Methodology-Expert",
agent_description="Research methodology specialist",
system_prompt="You are a methodology expert who evaluates research design, validity, and reliability.",
max_loops=1,
model_name="gpt-4o"
),
Agent(
agent_name="Ethics-Reviewer",
agent_description="Research ethics specialist",
system_prompt="You are an ethics reviewer who ensures research practices are responsible and unbiased.",
max_loops=1,
model_name="gpt-4o"
)
]
# Create majority voting system
majority_voting = MajorityVoting(
name="Investment-Advisory-System",
description="Multi-agent system for investment advice",
agents=agents,
verbose=True,
consensus_agent=consensus_agent
# Consensus agent for synthesizing research findings
research_consensus = Agent(
agent_name="Research-Synthesis-Agent",
agent_description="Research synthesis specialist",
system_prompt="You are an expert at synthesizing diverse research perspectives into comprehensive, well-supported conclusions.",
max_loops=1,
model_name="gpt-4o"
)
# Run the analysis with majority voting
result = majority_voting.batch_run(
task="Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.",
correct_answer="" # Optional evaluation metric
# Create majority voting system for research
research_system = MajorityVoting(
name="Research-Analysis-System",
description="Concurrent multi-perspective research analysis",
agents=research_agents,
consensus_agent=research_consensus,
verbose=True,
output_type="list"
)
print(result)
# Define research questions for concurrent analysis
research_questions = [
"What are the environmental impacts of electric vehicle adoption?",
"How does remote work affect employee productivity and well-being?",
"What are the economic implications of universal basic income?",
"How can AI be used to improve healthcare outcomes?",
"What are the social effects of social media on mental health?"
]
# Execute concurrent research analysis
concurrent_results = research_system.run_concurrently(research_questions)
print("Concurrent Research Analysis Results:")
print(f"Total questions analyzed: {len(concurrent_results)}")
for i, result in enumerate(concurrent_results, 1):
print(f"\nResearch Question {i}:")
if isinstance(result, list) and len(result) > 0:
print(f"Analysis length: {len(str(result))} characters")
print(f"Sample output: {str(result)[:300]}...")
else:
print(f"Result: {result}")
```

@ -0,0 +1,23 @@
from swarms import Agent
IMAGE_GEN_SYSTEM_PROMPT = (
"You are an advanced image generation agent. Given a textual description, generate a high-quality, photorealistic image that matches the prompt. "
"Return only the generated image."
)
image_gen_agent = Agent(
agent_name="Image-Generation-Agent",
agent_description="Agent specialized in generating high-quality, photorealistic images from textual prompts.",
model_name="gemini/gemini-2.5-flash-image-preview", # Replace with your preferred image generation model if available
dynamic_temperature_enabled=True,
max_loops=1,
dynamic_context_window=True,
retry_interval=1,
)
image_gen_out = image_gen_agent.run(
task=f"{IMAGE_GEN_SYSTEM_PROMPT} \n\n Generate a photorealistic image of a futuristic city skyline at sunset.",
)
print("Image Generation Output:")
print(image_gen_out)

@ -20,6 +20,7 @@ SYSTEM_PROMPT = (
"Return the image only."
)
# Agent for AR annotation
agent = Agent(
agent_name="Tactical-Strategist-Agent",
agent_description="Agent specialized in tactical strategy, scenario analysis, and actionable recommendations for complex situations.",
@ -35,4 +36,5 @@ out = agent.run(
img="hk.jpg",
)
print("AR Annotation Output:")
print(out)

@ -0,0 +1,25 @@
from swarms.structs.multi_agent_exec import (
batched_grid_agent_execution,
)
from swarms_tools import scrape_and_format_sync
from swarms import Agent
agent = Agent(
agent_name="Web Scraper Agent",
model_name="gpt-4o-mini",
tools=[scrape_and_format_sync],
dynamic_context_window=True,
dynamic_temperature_enabled=True,
max_loops=1,
system_prompt="You are a web scraper agent. You are given a URL and you need to scrape the website and return the data in a structured format. The format type should be full",
)
out = batched_grid_agent_execution(
agents=[agent, agent],
tasks=[
"Scrape swarms.ai website and provide a full report of the company's mission, products, and team. The format type should be full.",
"Scrape langchain.com website and provide a full report of the company's mission, products, and team. The format type should be full.",
],
)
print(out)

@ -0,0 +1,17 @@
from swarms import Agent
from swarms_tools import scrape_and_format_sync
agent = Agent(
agent_name="Web Scraper Agent",
model_name="gpt-4o-mini",
tools=[scrape_and_format_sync],
dynamic_context_window=True,
dynamic_temperature_enabled=True,
max_loops=1,
system_prompt="You are a web scraper agent. You are given a URL and you need to scrape the website and return the data in a structured format. The format type should be full",
)
out = agent.run(
"Scrape swarms.ai website and provide a full report of the company does. The format type should be full."
)
print(out)

@ -4,11 +4,14 @@ from swarms import Agent
mcp = FastMCP("MCPAgentTool")
@mcp.tool(
name="create_agent",
description="Create an agent with the specified name, system prompt, and model, then run a task.",
)
def create_agent(agent_name: str, system_prompt: str, model_name: str, task: str) -> str:
def create_agent(
agent_name: str, system_prompt: str, model_name: str, task: str
) -> str:
"""
Create an agent with the given parameters and execute the specified task.

@ -0,0 +1,62 @@
from swarms import Agent, ConcurrentWorkflow
def streaming_callback(
agent_name: str, chunk: str, is_complete: bool
):
"""
Print streaming output from each agent as it arrives.
"""
if chunk:
print(f"[{agent_name}] {chunk}", end="", flush=True)
if is_complete:
print(" [DONE]")
def main():
"""
Run a simple concurrent workflow with streaming output.
"""
agents = [
Agent(
agent_name="Financial",
system_prompt="Financial analysis.",
model_name="groq/moonshotai/kimi-k2-instruct",
max_loops=1,
print_on=False,
),
Agent(
agent_name="Legal",
system_prompt="Legal analysis.",
model_name="groq/moonshotai/kimi-k2-instruct",
max_loops=1,
print_on=False,
),
]
workflow = ConcurrentWorkflow(
name="Simple-Streaming",
agents=agents,
show_dashboard=False,
output_type="dict-all-except-first",
)
task = "Give a short analysis of the risks and opportunities for a SaaS startup expanding to Europe."
print("Streaming output:\n")
result = workflow.run(
task=task, streaming_callback=streaming_callback
)
print("\n\nFinal results:")
# Convert list of messages to dict with agent names as keys
agent_outputs = {}
for message in result:
if message["role"] != "User": # Skip the user message
agent_outputs[message["role"]] = message["content"]
for agent, output in agent_outputs.items():
print(f"{agent}: {output}")
if __name__ == "__main__":
main()

@ -0,0 +1,79 @@
from swarms import Agent
from swarms_tools import scrape_and_format_sync
from swarms.structs.concurrent_workflow import ConcurrentWorkflow
import time
agent = Agent(
agent_name="Web Scraper Agent",
model_name="groq/moonshotai/kimi-k2-instruct",
system_prompt="You are a web scraper agent. You are given a URL and you need to scrape the website and return the data in a structured format. The format type should be full",
tools=[scrape_and_format_sync],
dynamic_context_window=True,
dynamic_temperature_enabled=True,
max_loops=1,
streaming_on=True, # Enable streaming mode
print_on=False, # Prevent direct printing (let callback handle it)
)
def streaming_callback(agent_name: str, chunk: str, is_final: bool):
"""
Callback function to handle streaming output from agents.
Args:
agent_name (str): Name of the agent producing the output
chunk (str): Chunk of output text
is_final (bool): Whether this is the final chunk (completion signal)
"""
if is_final:
print(f"\n[{agent_name}] - COMPLETED")
return
if chunk:
# Print timestamp with agent name for each chunk
timestamp = time.strftime("%H:%M:%S", time.localtime())
print(
f"[{timestamp}] [{agent_name}] {chunk}",
end="",
flush=True,
)
else:
# Debug: print when chunk is empty but not final
print(f"[{agent_name}] - EMPTY CHUNK", end="", flush=True)
# Alternative simple callback (uncomment to use instead):
# def simple_streaming_callback(agent_name: str, chunk: str, is_final: bool):
# """Simple callback that just prints agent output without timestamps."""
# if is_final:
# print(f"\n--- {agent_name} FINISHED ---")
# elif chunk:
# print(f"[{agent_name}] {chunk}", end="", flush=True)
# For saving to file (uncomment to use):
# import os
# def file_streaming_callback(agent_name: str, chunk: str, is_final: bool):
# """Callback that saves streaming output to separate files per agent."""
# if not os.path.exists('agent_outputs'):
# os.makedirs('agent_outputs')
#
# filename = f"agent_outputs/{agent_name.replace(' ', '_')}.txt"
#
# with open(filename, 'a', encoding='utf-8') as f:
# if is_final:
# f.write(f"\n--- COMPLETED ---\n")
# elif chunk:
# f.write(chunk)
swarm = ConcurrentWorkflow(
agents=[agent, agent],
name="Web Scraper Swarm",
description="This swarm is used to scrape the web and return the data in a structured format.",
)
swarm.run(
task="Scrape swarms.ai website and provide a full report of the company does. The format type should be full.",
streaming_callback=streaming_callback,
)

@ -0,0 +1,18 @@
from swarms.utils.litellm_wrapper import LiteLLM
# Initialize the LiteLLM wrapper with reasoning support
llm = LiteLLM(
model_name="claude-sonnet-4-20250514", # OpenAI o3 model with reasoning
reasoning_effort="low", # Enable reasoning with high effort
temperature=1,
max_tokens=2000,
stream=False,
thinking_tokens=1024,
)
# Example task that would benefit from reasoning
task = "Solve this step-by-step: A farmer has 17 sheep and all but 9 die. How many sheep does he have left?"
print("=== Running reasoning model ===")
response = llm.run(task)
print(response)

@ -0,0 +1,23 @@
from swarms import Agent
# Initialize the agent with reasoning support
agent = Agent(
model_name="claude-sonnet-4-20250514",  # Claude Sonnet 4 model with reasoning support
reasoning_effort="low",  # Enable reasoning with low effort
temperature=1,
max_tokens=2000,
stream=False,
thinking_tokens=1024,
top_p=0.95,
streaming_on=True,
print_on=False,
)
out = agent.run(
task="Solve this step-by-step: A farmer has 17 sheep and all but 9 die. How many sheep does he have left?",
)
for chunk in out:
# Flush
print(chunk, end="", flush=True)

@ -0,0 +1,25 @@
from swarms_tools import crawl_entire_site_firecrawl
from swarms import Agent
agent = Agent(
agent_name="Marketing Copy Improver",
model_name="gpt-4.1",
tools=[crawl_entire_site_firecrawl],
dynamic_context_window=True,
dynamic_temperature_enabled=True,
max_loops=1,
system_prompt=(
"You are a world-class marketing copy improver. "
"Given a website URL, your job is to crawl the entire site, analyze all marketing copy, "
"and rewrite it to maximize clarity, engagement, and conversion. "
"Return the improved marketing copy in a structured, easy-to-read format. "
"Be concise, persuasive, and ensure the tone matches the brand. "
"Highlight key value propositions and calls to action."
),
)
out = agent.run(
"Crawl 2-3 pages of swarms.ai and improve the marketing copy found on those pages. Return the improved copy in a structured format."
)
print(out)

Binary image file added (1.7 MiB); contents not shown.

@ -39,9 +39,6 @@ from swarms.structs.ma_blocks import (
)
from swarms.structs.majority_voting import (
MajorityVoting,
majority_voting,
most_frequent,
parse_code_completion,
)
from swarms.structs.malt import MALT
from swarms.structs.meme_agent_persona_generator import (
@ -112,9 +109,6 @@ __all__ = [
"Conversation",
"GroupChat",
"MajorityVoting",
"majority_voting",
"most_frequent",
"parse_code_completion",
"AgentRearrange",
"rearrange",
"RoundRobinSwarm",

@ -432,6 +432,10 @@ class Agent:
reasoning_prompt_on: bool = True,
dynamic_context_window: bool = True,
show_tool_execution_output: bool = True,
reasoning_effort: str = None,
drop_params: bool = True,
thinking_tokens: int = None,
reasoning_enabled: bool = True,
*args,
**kwargs,
):
@ -574,6 +578,10 @@ class Agent:
self.dynamic_context_window = dynamic_context_window
self.show_tool_execution_output = show_tool_execution_output
self.mcp_configs = mcp_configs
self.reasoning_effort = reasoning_effort
self.drop_params = drop_params
self.thinking_tokens = thinking_tokens
self.reasoning_enabled = reasoning_enabled
# self.init_handling()
self.setup_config()
@ -718,6 +726,9 @@ class Agent:
"stream": self.streaming_on,
"top_p": self.top_p,
"retries": self.retry_attempts,
"reasoning_effort": self.reasoning_effort,
"thinking_tokens": self.thinking_tokens,
"reasoning_enabled": self.reasoning_enabled,
}
# Initialize tools_list_dictionary, if applicable
@ -974,33 +985,37 @@ class Agent:
)
def print_dashboard(self):
"""
Print a dashboard displaying the agent's current status and configuration.
Uses square brackets instead of emojis for section headers and bullet points.
"""
tools_activated = True if self.tools is not None else False
mcp_activated = True if self.mcp_url is not None else False
formatter.print_panel(
f"""
🤖 Agent {self.agent_name} Dashboard 🚀
[Agent {self.agent_name} Dashboard]
===========================================================
🎯 Agent {self.agent_name} Status: ONLINE & OPERATIONAL
[Agent {self.agent_name} Status]: ONLINE & OPERATIONAL
-----------------------------------------------------------
📋 Agent Identity:
🏷 Name: {self.agent_name}
📝 Description: {self.agent_description}
[Agent Identity]
- [Name]: {self.agent_name}
- [Description]: {self.agent_description}
Technical Specifications:
🤖 Model: {self.model_name}
🔄 Internal Loops: {self.max_loops}
🎯 Max Tokens: {self.max_tokens}
🌡 Dynamic Temperature: {self.dynamic_temperature_enabled}
[Technical Specifications]
- [Model]: {self.model_name}
- [Internal Loops]: {self.max_loops}
- [Max Tokens]: {self.max_tokens}
- [Dynamic Temperature]: {self.dynamic_temperature_enabled}
🔧 System Modules:
🛠 Tools Activated: {tools_activated}
🔗 MCP Activated: {mcp_activated}
[System Modules]
- [Tools Activated]: {tools_activated}
- [MCP Activated]: {mcp_activated}
🚀 Ready for Tasks 🚀
===========================================================
[Ready for Tasks]
""",
title=f"Agent {self.agent_name} Dashboard",

@ -5,6 +5,7 @@ from typing import Callable, List, Optional, Union
from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.conversation import Conversation
from swarms.structs.swarm_id import swarm_id
from swarms.utils.formatter import formatter
from swarms.utils.get_cpu_cores import get_cpu_cores
from swarms.utils.history_output_formatter import (
@ -16,131 +17,14 @@ logger = initialize_logger(log_folder="concurrent_workflow")
class ConcurrentWorkflow(BaseSwarm):
"""
A production-grade concurrent workflow orchestrator that executes multiple agents simultaneously.
ConcurrentWorkflow is designed for high-performance multi-agent orchestration with advanced features
including real-time monitoring, error handling, caching, and flexible output formatting. It's ideal
for scenarios where multiple agents need to process the same task independently and their results
need to be aggregated.
Key Features:
- Concurrent execution using ThreadPoolExecutor for optimal performance
- Real-time dashboard monitoring with status updates
- Comprehensive error handling and recovery
- Flexible output formatting options
- Automatic prompt engineering capabilities
- Conversation history management
- Metadata persistence and auto-saving
- Support for both single and batch processing
- Image input support for multimodal agents
Use Cases:
- Multi-perspective analysis (financial, legal, technical reviews)
- Consensus building and voting systems
- Parallel data processing and analysis
- A/B testing with different agent configurations
- Redundancy and reliability improvements
Args:
name (str, optional): Unique identifier for the workflow instance.
Defaults to "ConcurrentWorkflow".
description (str, optional): Human-readable description of the workflow's purpose.
Defaults to "Execution of multiple agents concurrently".
agents (List[Union[Agent, Callable]], optional): List of Agent instances or callable objects
to execute concurrently. Each agent should implement a `run` method.
Defaults to empty list.
metadata_output_path (str, optional): File path for saving execution metadata and results.
Supports JSON format. Defaults to "agent_metadata.json".
auto_save (bool, optional): Whether to automatically save conversation history and metadata
after each run. Defaults to True.
output_type (str, optional): Format for aggregating agent outputs. Options include:
- "dict-all-except-first": Dictionary with all agent outputs except the first
- "dict": Dictionary with all agent outputs
- "str": Concatenated string of all outputs
- "list": List of individual agent outputs
Defaults to "dict-all-except-first".
max_loops (int, optional): Maximum number of execution loops for each agent.
Defaults to 1.
auto_generate_prompts (bool, optional): Enable automatic prompt engineering for all agents.
When True, agents will enhance their prompts automatically. Defaults to False.
show_dashboard (bool, optional): Enable real-time dashboard display showing agent status,
progress, and outputs. Useful for monitoring and debugging. Defaults to False.
*args: Additional positional arguments passed to the BaseSwarm parent class.
**kwargs: Additional keyword arguments passed to the BaseSwarm parent class.
Raises:
ValueError: If agents list is empty or None.
ValueError: If description is empty or None.
TypeError: If agents list contains non-Agent, non-callable objects.
Attributes:
name (str): The workflow instance name.
description (str): The workflow description.
agents (List[Union[Agent, Callable]]): List of agents to execute.
metadata_output_path (str): Path for metadata output file.
auto_save (bool): Auto-save flag for metadata persistence.
output_type (str): Output aggregation format.
max_loops (int): Maximum execution loops per agent.
auto_generate_prompts (bool): Auto prompt engineering flag.
show_dashboard (bool): Dashboard display flag.
agent_statuses (dict): Real-time status tracking for each agent.
conversation (Conversation): Conversation history manager.
Example:
Basic usage with multiple agents:
>>> from swarms import Agent, ConcurrentWorkflow
>>>
>>> # Create specialized agents
>>> financial_agent = Agent(
... agent_name="Financial-Analyst",
... system_prompt="You are a financial analysis expert...",
... model_name="gpt-4"
... )
>>> legal_agent = Agent(
... agent_name="Legal-Advisor",
... system_prompt="You are a legal expert...",
... model_name="gpt-4"
... )
>>>
>>> # Create workflow
>>> workflow = ConcurrentWorkflow(
... name="Multi-Expert-Analysis",
... agents=[financial_agent, legal_agent],
... show_dashboard=True,
... auto_save=True
... )
>>>
>>> # Execute analysis
>>> task = "Analyze the risks of investing in cryptocurrency"
>>> results = workflow.run(task)
>>> print(f"Analysis complete with {len(results)} perspectives")
Batch processing example:
>>> tasks = [
... "Analyze Q1 financial performance",
... "Review Q2 market trends",
... "Forecast Q3 projections"
... ]
>>> batch_results = workflow.batch_run(tasks)
>>> print(f"Processed {len(batch_results)} quarterly reports")
Note:
- Agents are executed using ThreadPoolExecutor with 95% of available CPU cores
- Each agent runs independently and cannot communicate with others during execution
- The workflow maintains conversation history across all runs for context
- Dashboard mode disables individual agent printing to prevent output conflicts
- Error handling ensures partial results are available even if some agents fail
"""
"""Concurrent workflow for running multiple agents simultaneously."""
def __init__(
self,
id: str = swarm_id(),
name: str = "ConcurrentWorkflow",
description: str = "Execution of multiple agents concurrently",
agents: List[Union[Agent, Callable]] = None,
metadata_output_path: str = "agent_metadata.json",
auto_save: bool = True,
output_type: str = "dict-all-except-first",
max_loops: int = 1,
@ -149,17 +33,6 @@ class ConcurrentWorkflow(BaseSwarm):
*args,
**kwargs,
):
"""
Initialize the ConcurrentWorkflow with configuration parameters.
Performs initialization, validation, and setup of internal components including
conversation management, agent status tracking, and dashboard configuration.
Note:
The constructor automatically performs reliability checks and configures
agents for dashboard mode if enabled. Agent print outputs are disabled
when dashboard mode is active to prevent display conflicts.
"""
super().__init__(
name=name,
description=description,
@ -170,7 +43,9 @@ class ConcurrentWorkflow(BaseSwarm):
self.name = name
self.description = description
self.agents = agents
self.metadata_output_path = metadata_output_path
self.metadata_output_path = (
f"concurrent_workflow_name_{name}_id_{id}.json"
)
self.auto_save = auto_save
self.max_loops = max_loops
self.auto_generate_prompts = auto_generate_prompts
@ -188,58 +63,14 @@ class ConcurrentWorkflow(BaseSwarm):
self.agents = self.fix_agents()
def fix_agents(self):
"""
Configure agents for dashboard mode by disabling individual print outputs.
When dashboard mode is enabled, individual agent print outputs can interfere
with the dashboard display. This method disables print_on for all agents
to ensure clean dashboard rendering.
Returns:
List[Agent]: The modified list of agents with print_on disabled.
Note:
This method only modifies agents when show_dashboard is True.
Agent functionality is not affected, only their output display behavior.
Example:
>>> workflow = ConcurrentWorkflow(show_dashboard=True, agents=[agent1, agent2])
>>> # Agents automatically configured for dashboard mode
>>> all(not agent.print_on for agent in workflow.agents)
True
"""
"""Configure agents for dashboard mode."""
if self.show_dashboard is True:
for agent in self.agents:
agent.print_on = False
return self.agents
def reliability_check(self):
"""
Perform comprehensive validation of workflow configuration and agents.
Validates that the workflow is properly configured with valid agents and
provides warnings for suboptimal configurations. This method is called
automatically during initialization.
Validates:
- Agents list is not None or empty
- At least one agent is provided
- Warns if only one agent is provided (suboptimal for concurrent execution)
Raises:
ValueError: If agents list is None or empty.
Logs:
Warning: If only one agent is provided (concurrent execution not beneficial).
Error: If validation fails with detailed error information.
Example:
>>> workflow = ConcurrentWorkflow(agents=[])
ValueError: ConcurrentWorkflow: No agents provided
>>> workflow = ConcurrentWorkflow(agents=[single_agent])
# Logs warning about single agent usage
"""
"""Validate workflow configuration."""
try:
if self.agents is None:
raise ValueError(
@ -253,7 +84,7 @@ class ConcurrentWorkflow(BaseSwarm):
if len(self.agents) == 1:
logger.warning(
"ConcurrentWorkflow: Only one agent provided. With ConcurrentWorkflow, you should use at least 2+ agents."
"ConcurrentWorkflow: Only one agent provided."
)
except Exception as e:
logger.error(
@ -262,33 +93,7 @@ class ConcurrentWorkflow(BaseSwarm):
raise
def activate_auto_prompt_engineering(self):
"""
Enable automatic prompt engineering for all agents in the workflow.
When activated, each agent will automatically enhance and optimize their
system prompts based on the task context and their previous interactions.
This can improve response quality but may increase execution time.
Side Effects:
- Sets auto_generate_prompt=True for all agents in the workflow
- Affects subsequent agent.run() calls, not retroactively
- May increase latency due to prompt optimization overhead
Note:
This method can be called at any time, but changes only affect
future agent executions. Already running agents are not affected.
Example:
>>> workflow = ConcurrentWorkflow(agents=[agent1, agent2])
>>> workflow.activate_auto_prompt_engineering()
>>> # All agents will now auto-generate optimized prompts
>>> result = workflow.run("Complex analysis task")
>>> # Agents used enhanced prompts for better performance
See Also:
- Agent.auto_generate_prompt: Individual agent prompt engineering
- auto_generate_prompts: Constructor parameter for default behavior
"""
"""Enable automatic prompt engineering."""
if self.auto_generate_prompts is True:
for agent in self.agents:
agent.auto_generate_prompt = True
@ -297,48 +102,8 @@ class ConcurrentWorkflow(BaseSwarm):
self,
title: str = "ConcurrentWorkflow Dashboard",
is_final: bool = False,
) -> None:
"""
Display a real-time dashboard showing the current status of all agents.
Renders a formatted dashboard with agent names, execution status, and
output previews. The dashboard is updated in real-time during workflow
execution to provide visibility into agent progress and results.
Args:
title (str, optional): The dashboard title to display at the top.
Defaults to "🤖 ConcurrentWorkflow Dashboard".
is_final (bool, optional): Whether this is the final dashboard display
after all agents have completed. Changes formatting and styling.
Defaults to False.
Side Effects:
- Prints formatted dashboard to console
- Updates display in real-time during execution
- May clear previous dashboard content for clean updates
Note:
This method is automatically called during workflow execution when
show_dashboard=True. Manual calls are supported for custom monitoring.
Dashboard Status Values:
- "pending": Agent queued but not yet started
- "running": Agent currently executing task
- "completed": Agent finished successfully
- "error": Agent execution failed with error
Example:
>>> workflow = ConcurrentWorkflow(agents=[agent1, agent2], show_dashboard=True)
>>> workflow.display_agent_dashboard("Custom Dashboard Title")
# Displays:
# 🤖 Custom Dashboard Title
# ┌─────────────────┬─────────┬──────────────────┐
# │ Agent Name │ Status │ Output Preview │
# ├─────────────────┼─────────┼──────────────────┤
# │ Financial-Agent │ running │ Analyzing data...│
# │ Legal-Agent │ pending │ │
# └─────────────────┴─────────┴──────────────────┘
"""
):
"""Display real-time dashboard."""
agents_data = [
{
"name": agent.agent_name,
@ -358,69 +123,11 @@ class ConcurrentWorkflow(BaseSwarm):
task: str,
img: Optional[str] = None,
imgs: Optional[List[str]] = None,
streaming_callback: Optional[
Callable[[str, str, bool], None]
] = None,
):
"""
Execute all agents concurrently with real-time dashboard monitoring.
This method provides the same concurrent execution as _run() but with
enhanced real-time monitoring through a visual dashboard. Agent status
updates are displayed in real-time, showing progress, completion, and
any errors that occur during execution.
Args:
task (str): The task description or prompt to be executed by all agents.
This will be passed to each agent's run() method.
img (Optional[str], optional): Path to a single image file for agents
that support multimodal input. Defaults to None.
imgs (Optional[List[str]], optional): List of image file paths for
agents that support multiple image inputs. Defaults to None.
Returns:
Any: Formatted output based on the configured output_type setting.
The return format depends on the output_type parameter set during
workflow initialization.
Raises:
Exception: Re-raises any exceptions from agent execution after
updating the dashboard with error status.
Side Effects:
- Displays initial dashboard showing all agents as "pending"
- Updates dashboard in real-time as agents start, run, and complete
- Displays final dashboard with all results when execution completes
- Adds all task inputs and agent outputs to conversation history
- Automatically cleans up dashboard display resources
Dashboard Flow:
1. Initial dashboard shows all agents as "pending"
2. As agents start, status updates to "running"
3. As agents complete, status updates to "completed" with output preview
4. Final dashboard shows complete results summary
5. Dashboard resources are cleaned up automatically
Performance:
- Uses 95% of available CPU cores for optimal concurrency
- ThreadPoolExecutor manages thread lifecycle automatically
- Real-time updates have minimal performance overhead
Example:
>>> workflow = ConcurrentWorkflow(
... agents=[financial_agent, legal_agent],
... show_dashboard=True
... )
>>> result = workflow.run_with_dashboard(
... task="Analyze the merger proposal",
... img="company_financials.png"
... )
# Dashboard shows real-time progress:
# Agent-1: pending -> running -> completed
# Agent-2: pending -> running -> completed
>>> print("Analysis complete:", result)
Note:
This method is automatically called when show_dashboard=True.
For headless execution without dashboard, use _run() directly.
"""
"""Execute agents with dashboard monitoring."""
try:
self.conversation.add(role="User", content=task)
@ -431,36 +138,27 @@ class ConcurrentWorkflow(BaseSwarm):
"output": "",
}
# Display initial dashboard if enabled
if self.show_dashboard:
self.display_agent_dashboard()
# Use 95% of available CPU cores for optimal performance
max_workers = int(get_cpu_cores() * 0.95)
# Create a list to store all futures and their results
futures = []
results = []
def run_agent_with_status(agent, task, img, imgs):
try:
# Update status to running
self.agent_statuses[agent.agent_name][
"status"
] = "running"
if self.show_dashboard:
self.display_agent_dashboard()
# Create a streaming callback for this agent with throttling
last_update_time = [
0
] # Use list to allow modification in nested function
update_interval = 0.1 # Update dashboard every 100ms for smooth streaming
last_update_time = [0]
update_interval = 0.1
def streaming_callback(chunk: str):
"""Update dashboard with streaming content"""
if self.show_dashboard:
# Append the chunk to the agent's current output
def agent_streaming_callback(chunk: str):
try:
if self.show_dashboard and chunk:
current_output = self.agent_statuses[
agent.agent_name
]["output"]
@ -468,7 +166,6 @@ class ConcurrentWorkflow(BaseSwarm):
"output"
] = (current_output + chunk)
# Throttle dashboard updates for better performance
current_time = time.time()
if (
current_time - last_update_time[0]
@ -477,15 +174,25 @@ class ConcurrentWorkflow(BaseSwarm):
self.display_agent_dashboard()
last_update_time[0] = current_time
# Run the agent with streaming callback
if (
streaming_callback
and chunk is not None
):
streaming_callback(
agent.agent_name, chunk, False
)
except Exception as callback_error:
logger.warning(
f"Dashboard streaming callback failed for {agent.agent_name}: {str(callback_error)}"
)
output = agent.run(
task=task,
img=img,
imgs=imgs,
streaming_callback=streaming_callback,
streaming_callback=agent_streaming_callback,
)
# Update status to completed
self.agent_statuses[agent.agent_name][
"status"
] = "completed"
@ -495,9 +202,11 @@ class ConcurrentWorkflow(BaseSwarm):
if self.show_dashboard:
self.display_agent_dashboard()
if streaming_callback:
streaming_callback(agent.agent_name, "", True)
return output
except Exception as e:
# Update status to error
self.agent_statuses[agent.agent_name][
"status"
] = "error"
@ -506,24 +215,25 @@ class ConcurrentWorkflow(BaseSwarm):
] = f"Error: {str(e)}"
if self.show_dashboard:
self.display_agent_dashboard()
if streaming_callback:
streaming_callback(
agent.agent_name, f"Error: {str(e)}", True
)
raise
# Run agents concurrently using ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
) as executor:
# Submit all agent tasks
futures = [
executor.submit(
run_agent_with_status, agent, task, img, imgs
)
for agent in self.agents
]
# Wait for all futures to complete
concurrent.futures.wait(futures)
# Process results in order of completion
for future, agent in zip(futures, self.agents):
try:
output = future.result()
@ -536,11 +246,9 @@ class ConcurrentWorkflow(BaseSwarm):
(agent.agent_name, f"Error: {str(e)}")
)
# Add all results to conversation
for agent_name, output in results:
self.conversation.add(role=agent_name, content=output)
# Display final dashboard if enabled
if self.show_dashboard:
self.display_agent_dashboard(
"Final ConcurrentWorkflow Dashboard",
@ -548,11 +256,9 @@ class ConcurrentWorkflow(BaseSwarm):
)
return history_output_formatter(
conversation=self.conversation,
type=self.output_type,
conversation=self.conversation, type=self.output_type
)
finally:
# Always clean up the dashboard display
if self.show_dashboard:
formatter.stop_dashboard()
@ -561,316 +267,160 @@ class ConcurrentWorkflow(BaseSwarm):
task: str,
img: Optional[str] = None,
imgs: Optional[List[str]] = None,
streaming_callback: Optional[
Callable[[str, str, bool], None]
] = None,
):
"""
Execute all agents concurrently without dashboard monitoring (headless mode).
This is the core execution method that runs all agents simultaneously using
ThreadPoolExecutor for optimal performance. Results are collected and
formatted according to the configured output_type.
Args:
task (str): The task description or prompt to be executed by all agents.
Each agent receives the same task input for independent processing.
img (Optional[str], optional): Path to a single image file for multimodal
agents. The same image is provided to all agents. Defaults to None.
imgs (Optional[List[str]], optional): List of image file paths for agents
that support multiple image inputs. All agents receive the same image list.
Defaults to None.
Returns:
Any: Formatted output according to the output_type configuration:
- "dict-all-except-first": Dict with all agent outputs except first
- "dict": Dict with all agent outputs keyed by agent name
- "str": Concatenated string of all agent outputs
- "list": List of individual agent outputs in completion order
Side Effects:
- Adds the user task to conversation history
- Adds each agent's output to conversation history upon completion
- No visual output or dashboard updates (headless execution)
Performance Characteristics:
- Uses ThreadPoolExecutor with 95% of available CPU cores
- Agents execute truly concurrently, not sequentially
- Results are collected in completion order, not submission order
- Memory efficient for large numbers of agents
Error Handling:
- Individual agent failures don't stop other agents
- Failed agents have their exceptions logged but execution continues
- Partial results are still returned for successful agents
Thread Safety:
- Conversation object handles concurrent access safely
- Agent status is not tracked in this method (dashboard-free)
- Each agent runs in isolation without shared state
Example:
>>> workflow = ConcurrentWorkflow(
... agents=[agent1, agent2, agent3],
... output_type="dict"
... )
>>> result = workflow._run(
... task="Analyze market trends for Q4",
... img="market_chart.png"
... )
>>> print(f"Received {len(result)} agent analyses")
>>> # Result format: {"agent1": "analysis1", "agent2": "analysis2", ...}
Note:
This method is called automatically by run() when show_dashboard=False.
For monitoring and real-time updates, use run_with_dashboard() instead.
"""
"""Execute agents concurrently without dashboard."""
self.conversation.add(role="User", content=task)
# Use 95% of available CPU cores for optimal performance
max_workers = int(get_cpu_cores() * 0.95)
# Run agents concurrently using ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
) as executor:
# Submit all agent tasks and store with their index
future_to_agent = {
executor.submit(
agent.run, task=task, img=img, imgs=imgs
self._run_agent_with_streaming,
agent,
task,
img,
imgs,
streaming_callback,
): agent
for agent in self.agents
}
# Collect results and add to conversation in completion order
for future in concurrent.futures.as_completed(
future_to_agent
):
agent = future_to_agent[future]
output = future.result()
self.conversation.add(role=agent.name, content=output)
self.conversation.add(
role=agent.agent_name, content=output
)
return history_output_formatter(
conversation=self.conversation,
type=self.output_type,
conversation=self.conversation, type=self.output_type
)
def _run_agent_with_streaming(
self,
agent: Union[Agent, Callable],
task: str,
img: Optional[str] = None,
imgs: Optional[List[str]] = None,
streaming_callback: Optional[
Callable[[str, str, bool], None]
] = None,
):
"""Run single agent with streaming support."""
if streaming_callback is None:
return agent.run(task=task, img=img, imgs=imgs)
def agent_streaming_callback(chunk: str):
try:
# Safely call the streaming callback
if streaming_callback and chunk is not None:
streaming_callback(agent.agent_name, chunk, False)
except Exception as callback_error:
logger.warning(
f"Streaming callback failed for {agent.agent_name}: {str(callback_error)}"
)
try:
output = agent.run(
task=task,
img=img,
imgs=imgs,
streaming_callback=agent_streaming_callback,
)
# Ensure completion callback is called even if there were issues
try:
streaming_callback(agent.agent_name, "", True)
except Exception as callback_error:
logger.warning(
f"Completion callback failed for {agent.agent_name}: {str(callback_error)}"
)
return output
except Exception as e:
error_msg = f"Agent {agent.agent_name} failed: {str(e)}"
logger.error(error_msg)
# Try to send error through callback
try:
streaming_callback(
agent.agent_name, f"Error: {str(e)}", True
)
except Exception as callback_error:
logger.warning(
f"Error callback failed for {agent.agent_name}: {str(callback_error)}"
)
raise
def cleanup(self):
"""Clean up resources and connections."""
try:
# Reset agent statuses
for agent in self.agents:
if hasattr(agent, "cleanup"):
try:
agent.cleanup()
except Exception as e:
logger.warning(
f"Failed to cleanup agent {agent.agent_name}: {str(e)}"
)
# Clear conversation if needed
if hasattr(self, "conversation"):
# Keep the conversation for result formatting but reset for next run
pass
except Exception as e:
logger.error(f"Cleanup failed: {str(e)}")
def run(
self,
task: str,
img: Optional[str] = None,
imgs: Optional[List[str]] = None,
streaming_callback: Optional[
Callable[[str, str, bool], None]
] = None,
):
"""
Execute all agents concurrently on the given task with optional dashboard monitoring.
This is the main entry point for workflow execution. Automatically selects
between dashboard-enabled execution (run_with_dashboard) or headless execution
(_run) based on the show_dashboard configuration.
Args:
task (str): The task description, prompt, or instruction to be executed
by all agents concurrently. Each agent processes the same task
independently.
img (Optional[str], optional): Path to a single image file for agents
that support multimodal (text + image) input. Defaults to None.
imgs (Optional[List[str]], optional): List of image file paths for
agents that support multiple image inputs. Defaults to None.
Returns:
Any: Aggregated results from all agents formatted according to output_type:
- "dict-all-except-first": Dictionary excluding first agent's output
- "dict": Complete dictionary with all agent outputs
- "str": Concatenated string of all outputs
- "list": List of individual agent outputs
Execution Modes:
- Dashboard Mode (show_dashboard=True): Provides real-time visual monitoring
with status updates, progress tracking, and error visibility
- Headless Mode (show_dashboard=False): Silent execution with no visual output,
optimized for performance and automation scenarios
Concurrent Execution:
- All agents run simultaneously using ThreadPoolExecutor
- Utilizes 95% of available CPU cores for optimal performance
- Thread-safe conversation history management
- Independent agent execution without inter-agent communication
Error Resilience:
- Individual agent failures don't halt the entire workflow
- Partial results are returned for successful agents
- Comprehensive error logging for debugging
- Graceful degradation under failure conditions
Example:
Basic concurrent execution:
>>> workflow = ConcurrentWorkflow(agents=[analyst, reviewer, summarizer])
>>> result = workflow.run("Evaluate the new product proposal")
>>> print(f"Received insights from {len(workflow.agents)} experts")
With dashboard monitoring:
>>> workflow = ConcurrentWorkflow(
... agents=[financial_agent, legal_agent],
... show_dashboard=True
... )
>>> result = workflow.run("Review merger agreement")
# Real-time dashboard shows progress and completion status
Multimodal analysis:
>>> result = workflow.run(
... task="Analyze this chart and provide insights",
... img="quarterly_results.png"
... )
Performance Tips:
- Use 2+ agents to benefit from concurrent execution
- Dashboard mode adds minimal overhead for monitoring
- Larger agent counts scale well with available CPU cores
- Consider batch_run() for multiple related tasks
See Also:
- batch_run(): For processing multiple tasks sequentially
- run_with_dashboard(): For direct dashboard execution
- _run(): For direct headless execution
"""
"""Execute all agents concurrently."""
try:
if self.show_dashboard:
return self.run_with_dashboard(task, img, imgs)
result = self.run_with_dashboard(
task, img, imgs, streaming_callback
)
else:
return self._run(task, img, imgs)
result = self._run(
task, img, imgs, streaming_callback
)
return result
finally:
# Always cleanup resources
self.cleanup()
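For illustration, a minimal sketch of driving the new `streaming_callback` hook from user code. The callback signature (agent name, text chunk, completion flag) follows the `Callable[[str, str, bool], None]` annotation above; the agent names and model below are placeholders, not part of this change.

```python
from swarms import Agent, ConcurrentWorkflow

# Hypothetical agents; any Agent configuration works here.
analyst = Agent(agent_name="Analyst", model_name="gpt-4o-mini", max_loops=1)
reviewer = Agent(agent_name="Reviewer", model_name="gpt-4o-mini", max_loops=1)


def on_stream(agent_name: str, chunk: str, is_final: bool) -> None:
    # Invoked for each streamed chunk; is_final=True signals the agent finished (or errored).
    if is_final:
        print(f"\n[{agent_name}] done")
    elif chunk:
        print(chunk, end="", flush=True)


workflow = ConcurrentWorkflow(agents=[analyst, reviewer])
result = workflow.run(
    task="Summarize the key risks in this proposal",
    streaming_callback=on_stream,
)
```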
def batch_run(
self,
tasks: List[str],
imgs: Optional[List[str]] = None,
streaming_callback: Optional[
Callable[[str, str, bool], None]
] = None,
):
"""
Execute the workflow on multiple tasks sequentially with concurrent agent processing.
This method processes a list of tasks one by one, where each task is executed
by all agents concurrently. This is ideal for batch processing scenarios where
you have multiple related tasks that need the same multi-agent analysis.
Args:
tasks (List[str]): List of task descriptions, prompts, or instructions.
Each task will be processed by all agents concurrently before
moving to the next task. Tasks are processed sequentially.
imgs (Optional[List[str]], optional): List of image file paths corresponding
to each task. If provided, imgs[i] will be used for tasks[i].
If fewer images than tasks are provided, remaining tasks will
execute without images. Defaults to None.
Returns:
List[Any]: List of results, one for each task. Each result is formatted
according to the workflow's output_type configuration. The length
of the returned list equals the length of the input tasks list.
Processing Flow:
1. Tasks are processed sequentially (not concurrently with each other)
2. For each task, all agents execute concurrently
3. Results are collected and formatted for each task
4. Conversation history accumulates across all tasks
5. Final result list contains aggregated outputs for each task
Image Handling:
- If imgs is None: All tasks execute without images
- If imgs has fewer items than tasks: Extra tasks execute without images
- If imgs has more items than tasks: Extra images are ignored
- Each task gets at most one corresponding image
Dashboard Behavior:
- If show_dashboard=True, dashboard resets and updates for each task
- Progress is shown separately for each task's agent execution
- Final dashboard shows results from the last task only
Memory and Performance:
- Conversation history grows with each task (cumulative)
- Memory usage scales with number of tasks and agent outputs
- CPU utilization is optimal during each task's concurrent execution
- Consider clearing conversation history for very large batch jobs
Example:
Financial analysis across quarters:
>>> workflow = ConcurrentWorkflow(agents=[analyst1, analyst2, analyst3])
>>> quarterly_tasks = [
... "Analyze Q1 financial performance and market position",
... "Analyze Q2 financial performance and market position",
... "Analyze Q3 financial performance and market position",
... "Analyze Q4 financial performance and market position"
... ]
>>> results = workflow.batch_run(quarterly_tasks)
>>> print(f"Completed {len(results)} quarterly analyses")
>>> # Each result contains insights from all 3 analysts for that quarter
With corresponding images:
>>> tasks = ["Analyze chart trends", "Review performance metrics"]
>>> charts = ["q1_chart.png", "q2_chart.png"]
>>> results = workflow.batch_run(tasks, imgs=charts)
>>> # Task 0 uses q1_chart.png, Task 1 uses q2_chart.png
Batch processing with dashboard:
>>> workflow = ConcurrentWorkflow(
... agents=[agent1, agent2],
... show_dashboard=True
... )
>>> results = workflow.batch_run([
... "Process document batch 1",
... "Process document batch 2",
... "Process document batch 3"
... ])
# Dashboard shows progress for each batch separately
Use Cases:
- Multi-period financial analysis (quarterly/yearly reports)
- Batch document processing with multiple expert reviews
- A/B testing across different scenarios or datasets
- Systematic evaluation of multiple proposals or options
- Comparative analysis across time periods or categories
Note:
Tasks are intentionally processed sequentially to maintain result
order and prevent resource contention. For truly concurrent task
processing, create separate workflow instances.
See Also:
- run(): For single task execution
- ConcurrentWorkflow: For configuration options
"""
"""Execute workflow on multiple tasks sequentially."""
results = []
for idx, task in enumerate(tasks):
img = None
if imgs is not None:
# Use the img at the same index if available, else None
if idx < len(imgs):
if imgs is not None and idx < len(imgs):
img = imgs[idx]
results.append(self.run(task=task, img=img))
results.append(
self.run(
task=task,
img=img,
streaming_callback=streaming_callback,
)
)
return results
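Continuing that sketch, `batch_run` pairs `imgs[i]` with `tasks[i]` and runs any extra tasks without an image; the file names below are placeholders and the `workflow` and `on_stream` objects are reused from the sketch above.

```python
# Reusing the `workflow` instance and `on_stream` callback from the previous sketch.
tasks = ["Analyze the Q1 chart", "Analyze the Q2 chart", "Summarize both quarters"]
charts = ["q1_chart.png", "q2_chart.png"]  # placeholder file names

# imgs[0] pairs with tasks[0], imgs[1] with tasks[1]; the third task runs without an image.
batch_results = workflow.batch_run(tasks, imgs=charts, streaming_callback=on_stream)
print(f"Processed {len(batch_results)} tasks")
```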
# if __name__ == "__main__":
# # Assuming you've already initialized some agents outside of this class
# agents = [
# Agent(
# agent_name=f"Financial-Analysis-Agent-{i}",
# system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
# model_name="gpt-4o",
# max_loops=1,
# )
# for i in range(3) # Adjust number of agents as needed
# ]
# # Initialize the workflow with the list of agents
# workflow = ConcurrentWorkflow(
# agents=agents,
# metadata_output_path="agent_metadata_4.json",
# return_str_on=True,
# )
# # Define the task for all agents
# task = "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria?"
# # Run the workflow and save metadata
# metadata = workflow.run(task)
# print(metadata)

@ -51,6 +51,7 @@ from swarms.prompts.reasoning_prompt import INTERNAL_MONOLGUE_PROMPT
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.structs.ma_utils import list_all_agents
from swarms.structs.omni_agent_types import AgentListType
from swarms.tools.base_tool import BaseTool
from swarms.utils.history_output_formatter import (
history_output_formatter,
@ -672,7 +673,7 @@ class HierarchicalSwarm:
name: str = "HierarchicalAgentSwarm",
description: str = "Distributed task swarm",
director: Optional[Union[Agent, Callable, Any]] = None,
agents: List[Union[Agent, Callable, Any]] = None,
agents: AgentListType = None,
max_loops: int = 1,
output_type: OutputType = "dict-all-except-first",
feedback_director_model_name: str = "gpt-4o-mini",
@ -822,7 +823,7 @@ class HierarchicalSwarm:
# Initialize logger only if verbose is enabled
if self.verbose:
logger.info(
f"🚀 Initializing HierarchicalSwarm: {self.name}"
f"[INIT] Initializing HierarchicalSwarm: {self.name}"
)
self.conversation = Conversation(time_enabled=False)
@ -847,7 +848,7 @@ class HierarchicalSwarm:
if self.verbose:
logger.success(
f" HierarchicalSwarm: {self.name} initialized successfully."
f"[SUCCESS] HierarchicalSwarm: {self.name} initialized successfully."
)
if self.multi_agent_prompt_improvements:
@ -867,7 +868,7 @@ class HierarchicalSwarm:
"""
try:
if self.verbose:
logger.info("📝 Adding agent context to director")
logger.info("[INFO] Adding agent context to director")
list_all_agents(
agents=self.agents,
@ -878,15 +879,15 @@ class HierarchicalSwarm:
if self.verbose:
logger.success(
" Agent context added to director successfully"
"[SUCCESS] Agent context added to director successfully"
)
except Exception as e:
error_msg = (
f" Failed to add context to director: {str(e)}"
f"[ERROR] Failed to add context to director: {str(e)}"
)
logger.error(
f"{error_msg}\n🔍 Traceback: {traceback.format_exc()}"
f"{error_msg}\n[TRACE] Traceback: {traceback.format_exc()}"
)
def setup_director(self):
@ -904,12 +905,12 @@ class HierarchicalSwarm:
"""
try:
if self.verbose:
logger.info("🎯 Setting up director agent")
logger.info("[SETUP] Setting up director agent")
schema = BaseTool().base_model_to_dict(SwarmSpec)
if self.verbose:
logger.debug(f"📋 Director schema: {schema}")
logger.debug(f"[SCHEMA] Director schema: {schema}")
return Agent(
agent_name=self.director_name,
@ -923,7 +924,7 @@ class HierarchicalSwarm:
)
except Exception as e:
error_msg = f" Failed to setup director: {str(e)}\n🔍 Traceback: {traceback.format_exc()}\n🐛 If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
error_msg = f"[ERROR] Failed to setup director: {str(e)}\n[TRACE] Traceback: {traceback.format_exc()}\n[BUG] If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
logger.error(error_msg)
def reliability_checks(self):
@ -963,7 +964,7 @@ class HierarchicalSwarm:
)
except Exception as e:
error_msg = f" Failed to setup director: {str(e)}\n🔍 Traceback: {traceback.format_exc()}\n🐛 If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
error_msg = f"[ERROR] Failed to setup director: {str(e)}\n[TRACE] Traceback: {traceback.format_exc()}\n[BUG] If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
logger.error(error_msg)
def agents_no_print(self):
@ -995,7 +996,9 @@ class HierarchicalSwarm:
"""
try:
if self.verbose:
logger.info(f"🎯 Running director with task: {task}")
logger.info(
f"[RUN] Running director with task: {task}"
)
if self.planning_director_agent is not None:
plan = self.planning_director_agent.run(
@ -1022,15 +1025,17 @@ class HierarchicalSwarm:
)
if self.verbose:
logger.success("✅ Director execution completed")
logger.success(
"[SUCCESS] Director execution completed"
)
logger.debug(
f"📋 Director output type: {type(function_call)}"
f"[OUTPUT] Director output type: {type(function_call)}"
)
return function_call
except Exception as e:
error_msg = f" Failed to setup director: {str(e)}\n🔍 Traceback: {traceback.format_exc()}\n🐛 If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
error_msg = f"[ERROR] Failed to setup director: {str(e)}\n[TRACE] Traceback: {traceback.format_exc()}\n[BUG] If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
logger.error(error_msg)
raise e
@ -1059,7 +1064,7 @@ class HierarchicalSwarm:
try:
if self.verbose:
logger.info(
f"👣 Executing single step for task: {task}"
f"[STEP] Executing single step for task: {task}"
)
# Update dashboard for director execution
@ -1073,7 +1078,7 @@ class HierarchicalSwarm:
if self.verbose:
logger.info(
f"📋 Parsed plan and {len(orders)} orders"
f"[PARSE] Parsed plan and {len(orders)} orders"
)
# Update dashboard with plan and orders information
@ -1094,7 +1099,7 @@ class HierarchicalSwarm:
outputs = self.execute_orders(orders)
if self.verbose:
logger.info(f" Executed {len(outputs)} orders")
logger.info(f"[EXEC] Executed {len(outputs)} orders")
if self.director_feedback_on is True:
feedback = self.feedback_director(outputs)
@ -1102,12 +1107,14 @@ class HierarchicalSwarm:
feedback = outputs
if self.verbose:
logger.success("✅ Step completed successfully")
logger.success(
"[SUCCESS] Step completed successfully"
)
return feedback
except Exception as e:
error_msg = f" Failed to setup director: {str(e)}\n🔍 Traceback: {traceback.format_exc()}\n🐛 If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
error_msg = f"[ERROR] Failed to setup director: {str(e)}\n[TRACE] Traceback: {traceback.format_exc()}\n[BUG] If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
logger.error(error_msg)
def run(
@ -1146,7 +1153,6 @@ class HierarchicalSwarm:
if task is None and self.interactive:
task = self._get_interactive_task()
current_loop = 0
last_output = None
@ -1157,16 +1163,16 @@ class HierarchicalSwarm:
if self.verbose:
logger.info(
f"🚀 Starting hierarchical swarm run: {self.name}"
f"[START] Starting hierarchical swarm run: {self.name}"
)
logger.info(
f"📊 Configuration - Max loops: {self.max_loops}"
f"[CONFIG] Configuration - Max loops: {self.max_loops}"
)
while current_loop < self.max_loops:
if self.verbose:
logger.info(
f"🔄 Loop {current_loop + 1}/{self.max_loops} - Processing task"
f"[LOOP] Loop {current_loop + 1}/{self.max_loops} - Processing task"
)
# Update dashboard loop counter
@ -1196,11 +1202,11 @@ class HierarchicalSwarm:
if self.verbose:
logger.success(
f" Loop {current_loop + 1} completed successfully"
f"[SUCCESS] Loop {current_loop + 1} completed successfully"
)
except Exception as e:
error_msg = f" Failed to setup director: {str(e)}\n🔍 Traceback: {traceback.format_exc()}\n🐛 If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
error_msg = f"[ERROR] Failed to setup director: {str(e)}\n[TRACE] Traceback: {traceback.format_exc()}\n[BUG] If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
logger.error(error_msg)
current_loop += 1
@ -1218,10 +1224,10 @@ class HierarchicalSwarm:
if self.verbose:
logger.success(
f"🎉 Hierarchical swarm run completed: {self.name}"
f"[COMPLETE] Hierarchical swarm run completed: {self.name}"
)
logger.info(
f"📊 Total loops executed: {current_loop}"
f"[STATS] Total loops executed: {current_loop}"
)
return history_output_formatter(
@ -1234,7 +1240,7 @@ class HierarchicalSwarm:
self.dashboard.update_director_status("ERROR")
self.dashboard.stop()
error_msg = f" Failed to setup director: {str(e)}\n🔍 Traceback: {traceback.format_exc()}\n🐛 If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
error_msg = f"[ERROR] Failed to setup director: {str(e)}\n[TRACE] Traceback: {traceback.format_exc()}\n[BUG] If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
logger.error(error_msg)
def _get_interactive_task(self) -> str:
@ -1275,7 +1281,7 @@ class HierarchicalSwarm:
"""
try:
if self.verbose:
logger.info("📝 Generating director feedback")
logger.info("[FEEDBACK] Generating director feedback")
task = f"History: {self.conversation.get_str()} \n\n"
@ -1302,13 +1308,13 @@ class HierarchicalSwarm:
if self.verbose:
logger.success(
" Director feedback generated successfully"
"[SUCCESS] Director feedback generated successfully"
)
return output
except Exception as e:
error_msg = f" Failed to setup director: {str(e)}\n🔍 Traceback: {traceback.format_exc()}\n🐛 If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
error_msg = f"[ERROR] Failed to setup director: {str(e)}\n[TRACE] Traceback: {traceback.format_exc()}\n[BUG] If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
logger.error(error_msg)
def call_single_agent(
@ -1336,7 +1342,7 @@ class HierarchicalSwarm:
"""
try:
if self.verbose:
logger.info(f"📞 Calling agent: {agent_name}")
logger.info(f"[CALL] Calling agent: {agent_name}")
# Find agent by name
agent = None
@ -1373,7 +1379,7 @@ class HierarchicalSwarm:
if self.verbose:
logger.success(
f" Agent {agent_name} completed task successfully"
f"[SUCCESS] Agent {agent_name} completed task successfully"
)
return output
@ -1385,7 +1391,7 @@ class HierarchicalSwarm:
agent_name, "ERROR", task, f"Error: {str(e)}"
)
error_msg = f" Failed to setup director: {str(e)}\n🔍 Traceback: {traceback.format_exc()}\n🐛 If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
error_msg = f"[ERROR] Failed to setup director: {str(e)}\n[TRACE] Traceback: {traceback.format_exc()}\n[BUG] If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
logger.error(error_msg)
def parse_orders(self, output):
@ -1409,8 +1415,8 @@ class HierarchicalSwarm:
"""
try:
if self.verbose:
logger.info("📋 Parsing director orders")
logger.debug(f"📊 Output type: {type(output)}")
logger.info("[PARSE] Parsing director orders")
logger.debug(f"[TYPE] Output type: {type(output)}")
import json
@ -1454,7 +1460,7 @@ class HierarchicalSwarm:
if self.verbose:
logger.success(
f" Successfully parsed plan and {len(orders)} orders"
f"[SUCCESS] Successfully parsed plan and {len(orders)} orders"
)
return plan, orders
@ -1463,7 +1469,7 @@ class HierarchicalSwarm:
) as json_err:
if self.verbose:
logger.warning(
f"⚠️ JSON decode error: {json_err}"
f"[WARN] JSON decode error: {json_err}"
)
pass
# Check if it's a direct function call format
@ -1488,7 +1494,7 @@ class HierarchicalSwarm:
if self.verbose:
logger.success(
f" Successfully parsed plan and {len(orders)} orders"
f"[SUCCESS] Successfully parsed plan and {len(orders)} orders"
)
return plan, orders
@ -1497,7 +1503,7 @@ class HierarchicalSwarm:
) as json_err:
if self.verbose:
logger.warning(
f"⚠️ JSON decode error: {json_err}"
f"[WARN] JSON decode error: {json_err}"
)
pass
# If no function call found, raise error
@ -1515,7 +1521,7 @@ class HierarchicalSwarm:
if self.verbose:
logger.success(
f" Successfully parsed plan and {len(orders)} orders"
f"[SUCCESS] Successfully parsed plan and {len(orders)} orders"
)
return plan, orders
@ -1529,7 +1535,7 @@ class HierarchicalSwarm:
)
except Exception as e:
error_msg = f"❌ Failed to parse orders: {str(e)}\n🔍 Traceback: {traceback.format_exc()}\n🐛 If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
error_msg = f"[ERROR] Failed to parse orders: {str(e)}\n[TRACE] Traceback: {traceback.format_exc()}\n[BUG] If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
logger.error(error_msg)
raise e
@ -1552,13 +1558,13 @@ class HierarchicalSwarm:
"""
try:
if self.verbose:
logger.info(f" Executing {len(orders)} orders")
logger.info(f"[EXEC] Executing {len(orders)} orders")
outputs = []
for i, order in enumerate(orders):
if self.verbose:
logger.info(
f"📋 Executing order {i+1}/{len(orders)}: {order.agent_name}"
f"[ORDER] Executing order {i+1}/{len(orders)}: {order.agent_name}"
)
# Update dashboard for agent execution
@ -1590,13 +1596,13 @@ class HierarchicalSwarm:
if self.verbose:
logger.success(
f" All {len(orders)} orders executed successfully"
f"[SUCCESS] All {len(orders)} orders executed successfully"
)
return outputs
except Exception as e:
error_msg = f" Failed to setup director: {str(e)}\n🔍 Traceback: {traceback.format_exc()}\n🐛 If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
error_msg = f"[ERROR] Failed to setup director: {str(e)}\n[TRACE] Traceback: {traceback.format_exc()}\n[BUG] If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
logger.error(error_msg)
def batched_run(
@ -1624,10 +1630,10 @@ class HierarchicalSwarm:
try:
if self.verbose:
logger.info(
f"🚀 Starting batched hierarchical swarm run: {self.name}"
f"[START] Starting batched hierarchical swarm run: {self.name}"
)
logger.info(
f"📊 Configuration - Max loops: {self.max_loops}"
f"[CONFIG] Configuration - Max loops: {self.max_loops}"
)
# Initialize a list to store the results
@ -1640,18 +1646,18 @@ class HierarchicalSwarm:
if self.verbose:
logger.success(
f"🎉 Batched hierarchical swarm run completed: {self.name}"
f"[COMPLETE] Batched hierarchical swarm run completed: {self.name}"
)
logger.info(
f"[STATS] Total tasks processed: {len(tasks)}"
)
logger.info(f"📊 Total tasks processed: {len(tasks)}")
return results
except Exception as e:
error_msg = (
f"❌ Batched hierarchical swarm run failed: {str(e)}"
)
error_msg = f"[ERROR] Batched hierarchical swarm run failed: {str(e)}"
if self.verbose:
logger.error(error_msg)
logger.error(
f"🔍 Traceback: {traceback.format_exc()}"
f"[TRACE] Traceback: {traceback.format_exc()}"
)

@ -1,149 +1,27 @@
import asyncio
import concurrent.futures
import os
import re
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Optional
from typing import Any, List, Optional
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.structs.multi_agent_exec import run_agents_concurrently
from swarms.utils.output_types import OutputType
from swarms.structs.swarm_id import swarm_id
from swarms.utils.formatter import formatter
from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.output_types import OutputType
logger = initialize_logger(log_folder="majority_voting")
def extract_last_python_code_block(text):
"""
Extracts the last Python code block from the given text.
Args:
text (str): The text to search for Python code blocks.
Returns:
str or None: The last Python code block found in the text, or None if no code block is found.
"""
# The regular expression pattern for Python code blocks
pattern = r"```[pP]ython(.*?)```"
# Find all matches in the text
matches = re.findall(pattern, text, re.DOTALL)
# If there are matches, return the last one
if matches:
return matches[-1].strip()
else:
return None
def parse_code_completion(agent_response, question):
"""
Parses the code completion response from the agent and extracts the last Python code block.
Args:
agent_response (str): The response from the agent.
question (str): The original question.
Returns:
tuple: A tuple containing the parsed Python code and a boolean indicating success.
"""
python_code = extract_last_python_code_block(agent_response)
if python_code is None:
if agent_response.count("impl]") == 0:
python_code = agent_response
else:
python_code_lines = agent_response.split("\n")
python_code = ""
in_func = False
for line in python_code_lines:
if in_func:
python_code += line + "\n"
if "impl]" in line:
in_func = True
if python_code.count("def") == 0:
python_code = question + python_code
return python_code, True
def most_frequent(
clist: list,
cmp_func: callable = None,
):
"""
Finds the most frequent element in a list based on a comparison function.
Args:
clist (list): The list of elements to search.
cmp_func (function, optional): The comparison function used to determine the frequency of elements.
If not provided, the default comparison function is used.
Returns:
tuple: A tuple containing the most frequent element and its frequency.
"""
counter = 0
num = clist[0]
for i in clist:
current_frequency = sum(cmp_func(i, item) for item in clist)
if current_frequency > counter:
counter = current_frequency
num = i
return num, counter
def majority_voting(answers: List[str]):
"""
Performs majority voting on a list of answers and returns the most common answer.
Args:
answers (list): A list of answers.
Returns:
The most common answer in the list.
"""
counter = Counter(answers)
if counter:
answer = counter.most_common(1)[0][0]
else:
answer = "I don't know"
return answer
class MajorityVoting:
"""
Class representing a majority voting system for agents.
Args:
agents (list): A list of agents to be used in the majority voting system.
output_parser (function, optional): A function used to parse the output of the agents.
If not provided, the default majority voting function is used.
autosave (bool, optional): A boolean indicating whether to autosave the conversation to a file.
verbose (bool, optional): A boolean indicating whether to enable verbose logging.
Examples:
>>> from swarms.structs.agent import Agent
>>> from swarms.structs.majority_voting import MajorityVoting
>>> agents = [
... Agent("GPT-3"),
... Agent("Codex"),
... Agent("Tabnine"),
... ]
>>> majority_voting = MajorityVoting(agents)
>>> majority_voting.run("What is the capital of France?")
'Paris'
"""
def __init__(
self,
id: str = swarm_id(),
name: str = "MajorityVoting",
description: str = "A majority voting system for agents",
agents: List[Agent] = [],
output_parser: Optional[Callable] = majority_voting,
agents: List[Agent] = None,
consensus_agent: Optional[Agent] = None,
autosave: bool = False,
verbose: bool = False,
@ -152,10 +30,10 @@ class MajorityVoting:
*args,
**kwargs,
):
self.id = id
self.name = name
self.description = description
self.agents = agents
self.output_parser = output_parser
self.consensus_agent = consensus_agent
self.autosave = autosave
self.verbose = verbose
@ -179,6 +57,10 @@ class MajorityVoting:
title="Majority Voting",
)
if self.consensus_agent is None:
# if no consensus agent is provided, use the last agent
self.consensus_agent = self.agents[-1]
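A minimal usage sketch of the revised constructor surface (no `output_parser`, optional `consensus_agent` that falls back to the last agent, as shown above); the agent names and model are placeholders.

```python
from swarms import Agent
from swarms.structs.majority_voting import MajorityVoting

# Hypothetical voter agents.
voters = [
    Agent(agent_name=f"Voter-{i}", model_name="gpt-4o-mini", max_loops=1)
    for i in range(3)
]
# Optional dedicated consensus agent; if omitted, the last voter is reused.
judge = Agent(agent_name="Consensus-Judge", model_name="gpt-4o-mini", max_loops=1)

mv = MajorityVoting(agents=voters, consensus_agent=judge)
answer = mv.run("What is the capital of France?")
```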
def run(self, task: str, *args, **kwargs) -> List[Any]:
"""
Runs the majority voting system and returns the majority vote.
@ -290,21 +172,3 @@ class MajorityVoting:
future.result()
for future in concurrent.futures.as_completed(futures)
]
async def run_async(
self, tasks: List[str], *args, **kwargs
) -> List[Any]:
"""
Runs the majority voting system concurrently using asyncio.
Args:
tasks (List[str]): List of tasks to be performed by the agents.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Returns:
List[Any]: List of majority votes for each task.
"""
return await asyncio.gather(
*[self.run(task, *args, **kwargs) for task in tasks]
)

@ -144,7 +144,6 @@ def run_agents_concurrently_multiprocess(
def batched_grid_agent_execution(
agents: List[AgentType],
tasks: List[str],
max_workers: int = None,
) -> List[Any]:
"""
Run multiple agents with different tasks concurrently.
@ -158,9 +157,6 @@ def batched_grid_agent_execution(
"The number of agents must match the number of tasks."
)
if max_workers is None:
max_workers = os.cpu_count()
results = []
for agent, task in zip(agents, tasks):
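A brief sketch of calling `batched_grid_agent_execution` now that the `max_workers` parameter has been removed; the import path follows the module shown in this diff, and the agents and tasks are placeholders. The two lists must still be equal in length.

```python
from swarms import Agent
from swarms.structs.multi_agent_exec import batched_grid_agent_execution

# Hypothetical agents, one per task; the lists must have the same length.
agents = [
    Agent(agent_name=f"Worker-{i}", model_name="gpt-4o-mini", max_loops=1)
    for i in range(2)
]
tasks = ["Summarize report A", "Summarize report B"]

# Agent i runs task i; worker count is no longer configurable via max_workers.
results = batched_grid_agent_execution(agents, tasks)
```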

@ -1,20 +1,9 @@
"""
Todo:
- Add multi-agent selection for a task and then run them automatically
- Add shared memory for large instances of agents
"""
import os
from typing import List, Optional
from loguru import logger
from pydantic import BaseModel, Field
from swarms.utils.function_caller_model import OpenAIFunctionCaller
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.utils.output_types import OutputType
from swarms.utils.any_to_str import any_to_str
@ -22,7 +11,7 @@ from swarms.utils.history_output_formatter import (
history_output_formatter,
)
from swarms.utils.formatter import formatter
from typing import Callable, Union
from swarms.structs.omni_agent_types import AgentListType
class AgentResponse(BaseModel):
@ -60,7 +49,7 @@ class MultiAgentRouter:
self,
name: str = "swarm-router",
description: str = "Routes tasks to specialized agents based on their capabilities",
agents: List[Union[Agent, Callable]] = [],
agents: AgentListType = None,
model: str = "gpt-4o-mini",
temperature: float = 0.1,
shared_memory_system: callable = None,
@ -86,7 +75,6 @@ class MultiAgentRouter:
self.model = model
self.temperature = temperature
self.if_print = if_print
# Initialize Agents
self.agents = {agent.name: agent for agent in agents}
self.conversation = Conversation()

@ -1,4 +1,3 @@
import threading
import time
import re
from typing import Any, Callable, Dict, List, Optional
@ -17,13 +16,6 @@ from rich.spinner import Spinner
from rich.markdown import Markdown
# Global lock to ensure only a single Rich Live context is active at any moment.
# Rich's Live render is **not** thread-safe; concurrent Live contexts on the same
# console raise runtime errors. Using a module-level lock serialises access and
# prevents crashes when multiple agents stream simultaneously in different
# threads (e.g., in ConcurrentWorkflow).
live_render_lock = threading.Lock()
# Global Live display for the dashboard
dashboard_live = None
@ -490,10 +482,6 @@ class Formatter:
complete_response = ""
chunks_collected = []
# Acquire the lock so that only one Live panel is active at a time.
# Other threads will wait here until the current streaming completes,
# avoiding Rich.Live concurrency errors.
with live_render_lock:
# TRUE streaming with Rich's automatic text wrapping
with Live(
create_streaming_panel(streaming_text),
@ -509,9 +497,7 @@ class Formatter:
):
# Add ONLY the new chunk to the Text object with random color style
chunk = part.choices[0].delta.content
streaming_text.append(
chunk, style=text_style
)
streaming_text.append(chunk, style=text_style)
complete_response += chunk
# Collect chunks if requested
@ -611,7 +597,6 @@ class Formatter:
title (str): The title of the dashboard.
is_final (bool): Whether this is the final update of the dashboard.
"""
with live_render_lock:
if self._dashboard_live is None:
# Create new Live display if none exists
self._dashboard_live = Live(

@ -7,7 +7,7 @@ from typing import List, Optional
import litellm
import requests
from litellm import acompletion, completion, supports_vision
from litellm import completion, supports_vision
from loguru import logger
from pydantic import BaseModel
@ -231,6 +231,10 @@ class LiteLLM:
base_url: str = None,
api_key: str = None,
api_version: str = None,
reasoning_effort: str = None,
drop_params: bool = True,
thinking_tokens: int = None,
reasoning_enabled: bool = True,
*args,
**kwargs,
):
@ -284,6 +288,9 @@ class LiteLLM:
self.base_url = base_url
self.api_key = api_key
self.api_version = api_version
self.reasoning_effort = reasoning_effort
self.thinking_tokens = thinking_tokens
self.reasoning_enabled = reasoning_enabled
self.modalities = []
self.messages = [] # Initialize messages list
@ -296,7 +303,7 @@ class LiteLLM:
retries # Add retries for better reliability
)
litellm.drop_params = True
litellm.drop_params = drop_params
# Add system prompt if present
if self.system_prompt is not None:
@ -308,6 +315,63 @@ class LiteLLM:
self.init_args = args
self.init_kwargs = kwargs
self.reasoning_check()
def reasoning_check(self):
"""
Check if reasoning is enabled and supported by the model, and adjust temperature accordingly.
If reasoning is enabled and the model supports reasoning, set temperature to 1 for optimal reasoning.
Also logs information or warnings based on the model's reasoning support and configuration.
"""
if (
self.reasoning_enabled is True
and litellm.supports_reasoning(model=self.model_name)
is True
):
logger.info(
f"Model {self.model_name} supports reasoning and reasoning enabled is set to {self.reasoning_enabled}. Temperature will be set to 1 for better reasoning as some models may not work with low temperature."
)
self.temperature = 1
else:
logger.warning(
f"Model {self.model_name} does not support reasoning and reasoning enabled is set to {self.reasoning_enabled}. Temperature will not be set to 1."
)
if (
self.reasoning_enabled is True
and litellm.supports_reasoning(model=self.model_name)
is False
):
logger.warning(
f"Model {self.model_name} may or may not support reasoning and reasoning enabled is set to {self.reasoning_enabled}"
)
if (
self.reasoning_enabled is True
and self.check_if_model_name_uses_anthropic(
model_name=self.model_name
)
is True
):
if self.thinking_tokens is None:
logger.info(
f"Model {self.model_name} is an Anthropic model and reasoning enabled is set to {self.reasoning_enabled}. Thinking tokens is mandatory for Anthropic models."
)
self.thinking_tokens = self.max_tokens / 4
if (
self.reasoning_enabled is True
and self.check_if_model_name_uses_anthropic(
model_name=self.model_name
)
is True
):
logger.info(
"top_p must be greater than 0.95 for Anthropic models with reasoning enabled"
)
self.top_p = 0.95
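To illustrate the new reasoning knobs, a hedged sketch of constructing the wrapper with them. The parameter names come from the constructor changes earlier in this diff; the import path, model name, and token budget are assumptions for illustration only.

```python
from swarms.utils.litellm_wrapper import LiteLLM

# Hypothetical configuration: reasoning_effort is only forwarded when litellm
# reports the model as reasoning-capable, and thinking_tokens becomes the
# "thinking" budget for Anthropic-style models as described above.
llm = LiteLLM(
    model_name="anthropic/claude-3-7-sonnet-20250219",
    max_tokens=4000,
    reasoning_enabled=True,
    reasoning_effort="medium",
    thinking_tokens=1024,
)

out = llm.run("Walk through the trade-offs of caching versus recomputation.")
print(out)  # May include "Reasoning Content:" and "Thinking Blocks:" sections.
```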
def _process_additional_args(
self, completion_params: dict, runtime_args: tuple
):
@ -357,6 +421,61 @@ class LiteLLM:
out = out.model_dump()
return out
def output_for_reasoning(self, response: any):
"""
Handle output for reasoning models, formatting reasoning content and thinking blocks.
Args:
response: The response object from the LLM API call
Returns:
str: Formatted string containing reasoning content, thinking blocks, and main content
"""
output_parts = []
# Check if reasoning content is available
if (
hasattr(response.choices[0].message, "reasoning_content")
and response.choices[0].message.reasoning_content
):
output_parts.append(
f"Reasoning Content:\n{response.choices[0].message.reasoning_content}\n"
)
# Check if thinking blocks are available (Anthropic models)
if (
hasattr(response.choices[0].message, "thinking_blocks")
and response.choices[0].message.thinking_blocks
):
output_parts.append("Thinking Blocks:")
for i, block in enumerate(
response.choices[0].message.thinking_blocks, 1
):
block_type = block.get("type", "")
thinking = block.get("thinking", "")
output_parts.append(
f"Block {i} (Type: {block_type}):"
)
output_parts.append(f" Thinking: {thinking}")
output_parts.append("")
# Include tools if available
if (
hasattr(response.choices[0].message, "tool_calls")
and response.choices[0].message.tool_calls
):
output_parts.append(
f"Tools:\n{self.output_for_tools(response)}\n"
)
# Always include the main content
content = response.choices[0].message.content
if content:
output_parts.append(f"Content:\n{content}")
# Join all parts into a single string
return "\n".join(output_parts)
def _prepare_messages(
self,
task: Optional[str] = None,
@ -648,6 +767,26 @@ class LiteLLM:
f"Model {self.model_name} does not support vision"
)
@staticmethod
def check_if_model_name_uses_anthropic(model_name: str):
"""
Check if the model name uses Anthropic.
"""
if "anthropic" in model_name.lower():
return True
else:
return False
@staticmethod
def check_if_model_name_uses_openai(model_name: str):
"""
Check if the model name uses OpenAI.
"""
if "openai" in model_name.lower():
return True
else:
return False
def run(
self,
task: str,
@ -737,6 +876,29 @@ class LiteLLM:
if self.modalities and len(self.modalities) >= 2:
completion_params["modalities"] = self.modalities
if (
self.reasoning_effort is not None
and litellm.supports_reasoning(model=self.model_name)
is True
):
completion_params["reasoning_effort"] = (
self.reasoning_effort
)
else:
logger.warning(
f"Model {self.model_name} may or may not support reasoning"
)
if (
self.reasoning_enabled is True
and self.thinking_tokens is not None
):
thinking = {
"type": "enabled",
"budget_tokens": self.thinking_tokens,
}
completion_params["thinking"] = thinking
# Process additional args if any
self._process_additional_args(completion_params, args)
@ -747,6 +909,13 @@ class LiteLLM:
if self.stream:
return response # Return the streaming generator directly
# Handle reasoning model output
elif (
self.reasoning_enabled
and self.reasoning_effort is not None
):
return self.output_for_reasoning(response)
# Handle tool-based response
elif self.tools_list_dictionary is not None:
return self.output_for_tools(response)
@ -783,118 +952,6 @@ class LiteLLM:
"""
return self.run(task, *args, **kwargs)
async def arun(self, task: str, *args, **kwargs):
"""
Run the LLM model asynchronously for the given task.
Args:
task (str): The task to run the model for.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
str: The content of the response from the model.
"""
try:
# Extract image parameter from kwargs if present
img = kwargs.pop("img", None) if "img" in kwargs else None
messages = self._prepare_messages(task=task, img=img)
# Prepare common completion parameters
completion_params = {
"model": self.model_name,
"messages": messages,
"stream": self.stream,
"temperature": self.temperature,
"max_tokens": self.max_tokens,
}
# Merge initialization kwargs first (lower priority)
if self.init_kwargs:
completion_params.update(self.init_kwargs)
# Merge runtime kwargs (higher priority - overrides init kwargs)
if kwargs:
completion_params.update(kwargs)
# Handle tool-based completion
if self.tools_list_dictionary is not None:
completion_params.update(
{
"tools": self.tools_list_dictionary,
"tool_choice": self.tool_choice,
"parallel_tool_calls": self.parallel_tool_calls,
}
)
# Process additional args if any
self._process_additional_args(completion_params, args)
# Make the completion call
response = await acompletion(**completion_params)
# Handle tool-based response
if self.tools_list_dictionary is not None:
return (
response.choices[0]
.message.tool_calls[0]
.function.arguments
)
elif self.return_all is True:
return response.model_dump()
elif "gemini" in self.model_name.lower():
return gemini_output_img_handler(response)
else:
# For non-Gemini models, return the content directly
return response.choices[0].message.content
except Exception as error:
logger.error(f"Error in LiteLLM arun: {str(error)}")
# if "rate_limit" in str(error).lower():
# logger.warning(
# "Rate limit hit, retrying with exponential backoff..."
# )
# await asyncio.sleep(2) # Use async sleep
# return await self.arun(task, *args, **kwargs)
raise error
async def _process_batch(
self, tasks: List[str], batch_size: int = 10
):
"""
Process a batch of tasks asynchronously.
Args:
tasks (List[str]): List of tasks to process.
batch_size (int): Size of each batch.
Returns:
List[str]: List of responses.
"""
results = []
for i in range(0, len(tasks), batch_size):
batch = tasks[i : i + batch_size]
batch_results = await asyncio.gather(
*[self.arun(task) for task in batch],
return_exceptions=True,
)
# Handle any exceptions in the batch
for result in batch_results:
if isinstance(result, Exception):
logger.error(
f"Error in batch processing: {str(result)}"
)
results.append(str(result))
else:
results.append(result)
# Add a small delay between batches to avoid rate limits
if i + batch_size < len(tasks):
await asyncio.sleep(0.5)
return results
def batched_run(self, tasks: List[str], batch_size: int = 10):
"""
Run multiple tasks in batches synchronously.
@ -910,21 +967,3 @@ class LiteLLM:
f"Running {len(tasks)} tasks in batches of {batch_size}"
)
return asyncio.run(self._process_batch(tasks, batch_size))
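# Illustrative usage (constructor arguments are assumed, not prescribed):
#   llm = LiteLLM(model_name="gpt-4.1", temperature=0.7, max_tokens=4000)
#   answers = llm.batched_run(
#       ["Summarize document A", "Summarize document B"], batch_size=5
#   )
# Callers already running inside an event loop should prefer batched_arun,
# since batched_run itself calls asyncio.run().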
async def batched_arun(
self, tasks: List[str], batch_size: int = 10
):
"""
Run multiple tasks in batches asynchronously.
Args:
tasks (List[str]): List of tasks to process.
batch_size (int): Size of each batch.
Returns:
List[str]: List of responses.
"""
logger.info(
f"Running {len(tasks)} tasks asynchronously in batches of {batch_size}"
)
return await self._process_batch(tasks, batch_size)

@ -131,9 +131,7 @@ def test_router_description(report):
"""Test ReasoningAgentRouter with custom description (only change description param)"""
start_time = time.time()
try:
result = test_agents_swarm(
description="Test description for router"
)
test_agents_swarm(description="Test description for router")
# Check if the description was set correctly
router = ReasoningAgentRouter(
description="Test description for router"
@ -164,7 +162,7 @@ def test_router_model_name(report):
"""Test ReasoningAgentRouter with custom model_name (only change model_name param)"""
start_time = time.time()
try:
result = test_agents_swarm(model_name="gpt-4")
test_agents_swarm(model_name="gpt-4")
router = ReasoningAgentRouter(model_name="gpt-4")
if router.model_name == "gpt-4":
report.add_result(
@ -192,9 +190,7 @@ def test_router_system_prompt(report):
"""Test ReasoningAgentRouter with custom system_prompt (only change system_prompt param)"""
start_time = time.time()
try:
result = test_agents_swarm(
system_prompt="You are a test router."
)
test_agents_swarm(system_prompt="You are a test router.")
router = ReasoningAgentRouter(
system_prompt="You are a test router."
)
@ -224,7 +220,7 @@ def test_router_max_loops(report):
"""Test ReasoningAgentRouter with custom max_loops (only change max_loops param)"""
start_time = time.time()
try:
result = test_agents_swarm(max_loops=5)
test_agents_swarm(max_loops=5)
router = ReasoningAgentRouter(max_loops=5)
if router.max_loops == 5:
report.add_result(
@ -252,7 +248,7 @@ def test_router_swarm_type(report):
"""Test ReasoningAgentRouter with custom swarm_type (only change swarm_type param)"""
start_time = time.time()
try:
result = test_agents_swarm(swarm_type="reasoning-agent")
test_agents_swarm(swarm_type="reasoning-agent")
router = ReasoningAgentRouter(swarm_type="reasoning-agent")
if router.swarm_type == "reasoning-agent":
report.add_result(
@ -281,7 +277,7 @@ def test_router_num_samples(report):
start_time = time.time()
try:
router = ReasoningAgentRouter(num_samples=3)
output = router.run("How many samples do you use?")
router.run("How many samples do you use?")
if router.num_samples == 3:
report.add_result(
"Parameter: num_samples",
@ -389,7 +385,7 @@ def test_router_eval(report):
"""Test ReasoningAgentRouter with eval enabled (only change eval param)"""
start_time = time.time()
try:
result = test_agents_swarm(eval=True)
test_agents_swarm(eval=True)
router = ReasoningAgentRouter(eval=True)
if router.eval is True:
report.add_result(
@ -417,7 +413,7 @@ def test_router_random_models_on(report):
"""Test ReasoningAgentRouter with random_models_on enabled (only change random_models_on param)"""
start_time = time.time()
try:
result = test_agents_swarm(random_models_on=True)
test_agents_swarm(random_models_on=True)
router = ReasoningAgentRouter(random_models_on=True)
if router.random_models_on is True:
report.add_result(
@ -445,7 +441,7 @@ def test_router_majority_voting_prompt(report):
"""Test ReasoningAgentRouter with custom majority_voting_prompt (only change majority_voting_prompt param)"""
start_time = time.time()
try:
result = test_agents_swarm(
test_agents_swarm(
majority_voting_prompt="Vote for the best answer."
)
router = ReasoningAgentRouter(
@ -526,7 +522,7 @@ def test_router_select_swarm(report):
majority_voting_prompt=DEFAULT_MAJORITY_VOTING_PROMPT,
)
# Run the method to test
result = router.select_swarm()
router.select_swarm()
# Determine if the result is as expected (not raising error is enough for this test)
report.add_result(
"Method: select_swarm()",

@ -31,20 +31,23 @@ logger.add("test_results.log", rotation="10 MB", level="DEBUG")
TEST_CONFIG = {
"server_url": "http://localhost:8080/mcp",
"transport": "streamable_http",
"timeout": 10
"timeout": 10,
}
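# The network-dependent tests below assume an MCP server running locally at
# the URL above; when it is unreachable they raise MCPConnectionError and are
# recorded as SKIP rather than FAIL.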
# Test results storage
test_results = []
def log_test_result(test_name: str, status: str, message: str = "", error: str = ""):
def log_test_result(
test_name: str, status: str, message: str = "", error: str = ""
):
"""Log test result and add to results list"""
result = {
"test_name": test_name,
"status": status,
"message": message,
"error": error,
"timestamp": datetime.now().isoformat()
"timestamp": datetime.now().isoformat(),
}
test_results.append(result)
@ -55,6 +58,7 @@ def log_test_result(test_name: str, status: str, message: str = "", error: str =
else:
logger.info(f"~ {test_name}: {message}")
def test_transform_mcp_tool_to_openai_tool():
"""Test MCP tool to OpenAI tool transformation"""
test_name = "test_transform_mcp_tool_to_openai_tool"
@ -70,7 +74,10 @@ def test_transform_mcp_tool_to_openai_tool():
mock_tool = MockMCPTool(
name="test_function",
description="Test function description",
input_schema={"type": "object", "properties": {"param1": {"type": "string"}}}
input_schema={
"type": "object",
"properties": {"param1": {"type": "string"}},
},
)
result = transform_mcp_tool_to_openai_tool(mock_tool)
@ -78,13 +85,25 @@ def test_transform_mcp_tool_to_openai_tool():
# Validate result structure
assert result["type"] == "function"
assert result["function"]["name"] == "test_function"
assert result["function"]["description"] == "Test function description"
assert (
result["function"]["description"]
== "Test function description"
)
assert result["function"]["parameters"]["type"] == "object"
log_test_result(test_name, "PASS", "Successfully transformed MCP tool to OpenAI format")
log_test_result(
test_name,
"PASS",
"Successfully transformed MCP tool to OpenAI format",
)
except Exception as e:
log_test_result(test_name, "FAIL", error=f"Failed to transform tool: {str(e)}")
log_test_result(
test_name,
"FAIL",
error=f"Failed to transform tool: {str(e)}",
)
def test_get_function_arguments():
"""Test function argument extraction"""
@ -92,13 +111,17 @@ def test_get_function_arguments():
try:
# Test with dict arguments
function_def = {"arguments": {"param1": "value1", "param2": "value2"}}
function_def = {
"arguments": {"param1": "value1", "param2": "value2"}
}
result = _get_function_arguments(function_def)
assert isinstance(result, dict)
assert result["param1"] == "value1"
# Test with string arguments
function_def_str = {"arguments": '{"param1": "value1", "param2": "value2"}'}
function_def_str = {
"arguments": '{"param1": "value1", "param2": "value2"}'
}
result_str = _get_function_arguments(function_def_str)
assert isinstance(result_str, dict)
assert result_str["param1"] == "value1"
@ -108,10 +131,19 @@ def test_get_function_arguments():
result_empty = _get_function_arguments(function_def_empty)
assert result_empty == {}
log_test_result(test_name, "PASS", "Successfully extracted function arguments in all formats")
log_test_result(
test_name,
"PASS",
"Successfully extracted function arguments in all formats",
)
except Exception as e:
log_test_result(test_name, "FAIL", error=f"Failed to extract arguments: {str(e)}")
log_test_result(
test_name,
"FAIL",
error=f"Failed to extract arguments: {str(e)}",
)
def test_transform_openai_tool_call_request_to_mcp_tool_call_request():
"""Test OpenAI tool call to MCP tool call transformation"""
@ -121,20 +153,31 @@ def test_transform_openai_tool_call_request_to_mcp_tool_call_request():
openai_tool = {
"function": {
"name": "test_function",
"arguments": {"param1": "value1", "param2": "value2"}
"arguments": {"param1": "value1", "param2": "value2"},
}
}
result = transform_openai_tool_call_request_to_mcp_tool_call_request(openai_tool)
result = transform_openai_tool_call_request_to_mcp_tool_call_request(
openai_tool
)
assert result.name == "test_function"
assert result.arguments["param1"] == "value1"
assert result.arguments["param2"] == "value2"
log_test_result(test_name, "PASS", "Successfully transformed OpenAI tool call to MCP format")
log_test_result(
test_name,
"PASS",
"Successfully transformed OpenAI tool call to MCP format",
)
except Exception as e:
log_test_result(test_name, "FAIL", error=f"Failed to transform tool call: {str(e)}")
log_test_result(
test_name,
"FAIL",
error=f"Failed to transform tool call: {str(e)}",
)
def test_auto_detect_transport():
"""Test transport auto-detection"""
@ -166,10 +209,19 @@ def test_auto_detect_transport():
transport = auto_detect_transport(unknown_url)
assert transport == "sse" # Default
log_test_result(test_name, "PASS", "Successfully auto-detected all transport types")
log_test_result(
test_name,
"PASS",
"Successfully auto-detected all transport types",
)
except Exception as e:
log_test_result(test_name, "FAIL", error=f"Failed to auto-detect transport: {str(e)}")
log_test_result(
test_name,
"FAIL",
error=f"Failed to auto-detect transport: {str(e)}",
)
def test_connect_to_mcp_server():
"""Test MCP server connection configuration"""
@ -184,10 +236,12 @@ def test_connect_to_mcp_server():
transport="streamable_http",
timeout=10,
headers={"Content-Type": "application/json"},
authorization_token="test_token"
authorization_token="test_token",
)
headers, timeout, transport, url = connect_to_mcp_server(connection)
headers, timeout, transport, url = connect_to_mcp_server(
connection
)
assert url == "http://localhost:8080/mcp"
assert transport == "streamable_http"
@ -195,10 +249,19 @@ def test_connect_to_mcp_server():
assert "Authorization" in headers
assert headers["Authorization"] == "Bearer test_token"
log_test_result(test_name, "PASS", "Successfully configured MCP server connection")
log_test_result(
test_name,
"PASS",
"Successfully configured MCP server connection",
)
except Exception as e:
log_test_result(test_name, "FAIL", error=f"Failed to configure connection: {str(e)}")
log_test_result(
test_name,
"FAIL",
error=f"Failed to configure connection: {str(e)}",
)
async def test_aget_mcp_tools():
"""Test async MCP tools fetching"""
@ -209,16 +272,27 @@ async def test_aget_mcp_tools():
tools = await aget_mcp_tools(
server_path=TEST_CONFIG["server_url"],
format="openai",
transport=TEST_CONFIG["transport"]
transport=TEST_CONFIG["transport"],
)
assert isinstance(tools, list)
log_test_result(test_name, "PASS", f"Successfully fetched {len(tools)} tools from server")
log_test_result(
test_name,
"PASS",
f"Successfully fetched {len(tools)} tools from server",
)
except MCPConnectionError as e:
log_test_result(test_name, "SKIP", f"Server not available: {str(e)}")
log_test_result(
test_name, "SKIP", f"Server not available: {str(e)}"
)
except Exception as e:
log_test_result(test_name, "FAIL", error=f"Failed to fetch tools: {str(e)}")
log_test_result(
test_name,
"FAIL",
error=f"Failed to fetch tools: {str(e)}",
)
def test_get_mcp_tools_sync():
"""Test synchronous MCP tools fetching"""
@ -228,16 +302,27 @@ def test_get_mcp_tools_sync():
tools = get_mcp_tools_sync(
server_path=TEST_CONFIG["server_url"],
format="openai",
transport=TEST_CONFIG["transport"]
transport=TEST_CONFIG["transport"],
)
assert isinstance(tools, list)
log_test_result(test_name, "PASS", f"Successfully fetched {len(tools)} tools synchronously")
log_test_result(
test_name,
"PASS",
f"Successfully fetched {len(tools)} tools synchronously",
)
except MCPConnectionError as e:
log_test_result(test_name, "SKIP", f"Server not available: {str(e)}")
log_test_result(
test_name, "SKIP", f"Server not available: {str(e)}"
)
except Exception as e:
log_test_result(test_name, "FAIL", error=f"Failed to fetch tools sync: {str(e)}")
log_test_result(
test_name,
"FAIL",
error=f"Failed to fetch tools sync: {str(e)}",
)
def test_fetch_tools_for_server():
"""Test fetching tools for a single server"""
@ -247,38 +332,62 @@ def test_fetch_tools_for_server():
tools = _fetch_tools_for_server(
url=TEST_CONFIG["server_url"],
format="openai",
transport=TEST_CONFIG["transport"]
transport=TEST_CONFIG["transport"],
)
assert isinstance(tools, list)
log_test_result(test_name, "PASS", f"Successfully fetched tools for single server: {len(tools)} tools")
log_test_result(
test_name,
"PASS",
f"Successfully fetched tools for single server: {len(tools)} tools",
)
except MCPConnectionError as e:
log_test_result(test_name, "SKIP", f"Server not available: {str(e)}")
log_test_result(
test_name, "SKIP", f"Server not available: {str(e)}"
)
except Exception as e:
log_test_result(test_name, "FAIL", error=f"Failed to fetch tools for server: {str(e)}")
log_test_result(
test_name,
"FAIL",
error=f"Failed to fetch tools for server: {str(e)}",
)
def test_get_tools_for_multiple_mcp_servers():
"""Test fetching tools from multiple servers"""
test_name = "test_get_tools_for_multiple_mcp_servers"
try:
urls = [TEST_CONFIG["server_url"]] # Using single server for testing
urls = [
TEST_CONFIG["server_url"]
] # Using single server for testing
tools = get_tools_for_multiple_mcp_servers(
urls=urls,
format="openai",
transport=TEST_CONFIG["transport"],
max_workers=2
max_workers=2,
)
assert isinstance(tools, list)
log_test_result(test_name, "PASS", f"Successfully fetched tools from multiple servers: {len(tools)} tools")
log_test_result(
test_name,
"PASS",
f"Successfully fetched tools from multiple servers: {len(tools)} tools",
)
except MCPConnectionError as e:
log_test_result(test_name, "SKIP", f"Server not available: {str(e)}")
log_test_result(
test_name, "SKIP", f"Server not available: {str(e)}"
)
except Exception as e:
log_test_result(test_name, "FAIL", error=f"Failed to fetch tools from multiple servers: {str(e)}")
log_test_result(
test_name,
"FAIL",
error=f"Failed to fetch tools from multiple servers: {str(e)}",
)
async def test_execute_tool_call_simple():
"""Test simple tool execution"""
@ -290,11 +399,15 @@ async def test_execute_tool_call_simple():
tools = await aget_mcp_tools(
server_path=TEST_CONFIG["server_url"],
format="openai",
transport=TEST_CONFIG["transport"]
transport=TEST_CONFIG["transport"],
)
if not tools:
log_test_result(test_name, "SKIP", "No tools available for testing")
log_test_result(
test_name,
"SKIP",
"No tools available for testing",
)
return
# Use the first available tool for testing
@ -305,7 +418,7 @@ async def test_execute_tool_call_simple():
tool_call_request = {
"function": {
"name": tool_name,
"arguments": {} # Basic empty arguments
"arguments": {}, # Basic empty arguments
}
}
@ -313,17 +426,30 @@ async def test_execute_tool_call_simple():
response=tool_call_request,
server_path=TEST_CONFIG["server_url"],
transport=TEST_CONFIG["transport"],
output_type="str"
output_type="str",
)
assert result is not None
log_test_result(test_name, "PASS", f"Successfully executed tool call for {tool_name}")
log_test_result(
test_name,
"PASS",
f"Successfully executed tool call for {tool_name}",
)
except MCPConnectionError:
log_test_result(test_name, "SKIP", "Server not available for tool execution test")
log_test_result(
test_name,
"SKIP",
"Server not available for tool execution test",
)
except Exception as e:
log_test_result(test_name, "FAIL", error=f"Failed to execute tool call: {str(e)}")
log_test_result(
test_name,
"FAIL",
error=f"Failed to execute tool call: {str(e)}",
)
async def test_create_server_tool_mapping():
"""Test server tool mapping creation"""
@ -335,16 +461,27 @@ async def test_create_server_tool_mapping():
mapping = await _create_server_tool_mapping_async(
urls=urls,
format="openai",
transport=TEST_CONFIG["transport"]
transport=TEST_CONFIG["transport"],
)
assert isinstance(mapping, dict)
log_test_result(test_name, "PASS", f"Successfully created server tool mapping with {len(mapping)} functions")
log_test_result(
test_name,
"PASS",
f"Successfully created server tool mapping with {len(mapping)} functions",
)
except MCPConnectionError as e:
log_test_result(test_name, "SKIP", f"Server not available: {str(e)}")
log_test_result(
test_name, "SKIP", f"Server not available: {str(e)}"
)
except Exception as e:
log_test_result(test_name, "FAIL", error=f"Failed to create server tool mapping: {str(e)}")
log_test_result(
test_name,
"FAIL",
error=f"Failed to create server tool mapping: {str(e)}",
)
async def test_execute_multiple_tools_on_multiple_servers():
"""Test executing multiple tools across servers"""
@ -358,11 +495,15 @@ async def test_execute_multiple_tools_on_multiple_servers():
tools = await aget_mcp_tools(
server_path=TEST_CONFIG["server_url"],
format="openai",
transport=TEST_CONFIG["transport"]
transport=TEST_CONFIG["transport"],
)
if not tools:
log_test_result(test_name, "SKIP", "No tools available for testing")
log_test_result(
test_name,
"SKIP",
"No tools available for testing",
)
return
# Create test requests using available tools
@ -371,30 +512,49 @@ async def test_execute_multiple_tools_on_multiple_servers():
tool_call = {
"function": {
"name": tool["function"]["name"],
"arguments": {}
"arguments": {},
}
}
responses.append(tool_call)
if not responses:
log_test_result(test_name, "SKIP", "No suitable tools found for testing")
log_test_result(
test_name,
"SKIP",
"No suitable tools found for testing",
)
return
results = await execute_multiple_tools_on_multiple_mcp_servers(
results = (
await execute_multiple_tools_on_multiple_mcp_servers(
responses=responses,
urls=urls,
transport=TEST_CONFIG["transport"],
max_concurrent=2
max_concurrent=2,
)
)
assert isinstance(results, list)
log_test_result(test_name, "PASS", f"Successfully executed {len(results)} tool calls")
log_test_result(
test_name,
"PASS",
f"Successfully executed {len(results)} tool calls",
)
except MCPConnectionError:
log_test_result(test_name, "SKIP", "Server not available for multiple tool execution test")
log_test_result(
test_name,
"SKIP",
"Server not available for multiple tool execution test",
)
except Exception as e:
log_test_result(test_name, "FAIL", error=f"Failed to execute multiple tools: {str(e)}")
log_test_result(
test_name,
"FAIL",
error=f"Failed to execute multiple tools: {str(e)}",
)
def test_execute_multiple_tools_sync():
"""Test synchronous multiple tool execution"""
@ -408,7 +568,7 @@ def test_execute_multiple_tools_sync():
{
"function": {
"name": "test_function", # This will likely fail but tests the sync wrapper
"arguments": {}
"arguments": {},
}
}
]
@ -417,16 +577,27 @@ def test_execute_multiple_tools_sync():
responses=responses,
urls=urls,
transport=TEST_CONFIG["transport"],
max_concurrent=1
max_concurrent=1,
)
assert isinstance(results, list)
log_test_result(test_name, "PASS", f"Successfully ran sync multiple tools execution (got {len(results)} results)")
log_test_result(
test_name,
"PASS",
f"Successfully ran sync multiple tools execution (got {len(results)} results)",
)
except MCPConnectionError as e:
log_test_result(test_name, "SKIP", f"Server not available: {str(e)}")
log_test_result(
test_name, "SKIP", f"Server not available: {str(e)}"
)
except Exception as e:
log_test_result(test_name, "FAIL", error=f"Failed sync multiple tools execution: {str(e)}")
log_test_result(
test_name,
"FAIL",
error=f"Failed sync multiple tools execution: {str(e)}",
)
def test_error_handling():
"""Test error handling for various scenarios"""
@ -437,7 +608,7 @@ def test_error_handling():
try:
get_mcp_tools_sync(
server_path="http://invalid-url:99999/mcp",
transport="streamable_http"
transport="streamable_http",
)
assert False, "Should have raised an exception"
except MCPConnectionError:
@ -454,10 +625,17 @@ def test_error_handling():
transport = auto_detect_transport("")
assert transport == "sse" # Should default to sse
log_test_result(test_name, "PASS", "All error handling tests passed")
log_test_result(
test_name, "PASS", "All error handling tests passed"
)
except Exception as e:
log_test_result(test_name, "FAIL", error=f"Error handling test failed: {str(e)}")
log_test_result(
test_name,
"FAIL",
error=f"Error handling test failed: {str(e)}",
)
async def run_all_tests():
"""Run all test functions"""
@ -481,7 +659,10 @@ async def run_all_tests():
await test_create_server_tool_mapping()
await test_execute_multiple_tools_on_multiple_servers()
logger.info(f"Completed all tests. Total tests run: {len(test_results)}")
logger.info(
f"Completed all tests. Total tests run: {len(test_results)}"
)
def generate_markdown_report():
"""Generate markdown report of test results"""
@ -510,17 +691,27 @@ def generate_markdown_report():
"""
for test in passed_tests:
markdown_content += f"- **{test['test_name']}**: {test['message']}\n"
markdown_content += (
f"- **{test['test_name']}**: {test['message']}\n"
)
if failed_tests:
markdown_content += f"\n### ❌ Failed Tests ({len(failed_tests)})\n"
markdown_content += (
f"\n### ❌ Failed Tests ({len(failed_tests)})\n"
)
for test in failed_tests:
markdown_content += f"- **{test['test_name']}**: {test['error']}\n"
markdown_content += (
f"- **{test['test_name']}**: {test['error']}\n"
)
if skipped_tests:
markdown_content += f"\n### ⏭️ Skipped Tests ({len(skipped_tests)})\n"
markdown_content += (
f"\n### ⏭️ Skipped Tests ({len(skipped_tests)})\n"
)
for test in skipped_tests:
markdown_content += f"- **{test['test_name']}**: {test['message']}\n"
markdown_content += (
f"- **{test['test_name']}**: {test['message']}\n"
)
markdown_content += """
## Detailed Results
@ -530,7 +721,9 @@ def generate_markdown_report():
"""
for test in test_results:
status_emoji = {"PASS": "", "FAIL": "", "SKIP": "⏭️"}.get(test["status"], "")
status_emoji = {"PASS": "", "FAIL": "", "SKIP": "⏭️"}.get(
test["status"], ""
)
message = test.get("message") or test.get("error", "")
markdown_content += f"| {test['test_name']} | {status_emoji} {test['status']} | {message} | {test['timestamp']} |\n"
@ -545,6 +738,7 @@ Generated at: {datetime.now().isoformat()}
return markdown_content
async def main():
"""Main test runner"""
try:
@ -559,9 +753,15 @@ async def main():
logger.info("Test results saved to mcp_test_results.md")
# Print summary
passed = len([r for r in test_results if r["status"] == "PASS"])
failed = len([r for r in test_results if r["status"] == "FAIL"])
skipped = len([r for r in test_results if r["status"] == "SKIP"])
passed = len(
[r for r in test_results if r["status"] == "PASS"]
)
failed = len(
[r for r in test_results if r["status"] == "FAIL"]
)
skipped = len(
[r for r in test_results if r["status"] == "SKIP"]
)
print(f"\n{'='*50}")
print("TEST SUMMARY")
@ -577,5 +777,6 @@ async def main():
logger.error(f"Error running tests: {str(e)}")
logger.error(f"Traceback: {traceback.format_exc()}")
if __name__ == "__main__":
asyncio.run(main())