[FEAT][Reasoning Integration] [Improvement][MajorityVoting Cleanup] [EXAMPLE][Firecrawl Agent] [FEAT][Streaming Callback for ConcurrentWorkflow]

pull/1062/head
Kye Gomez 2 months ago
parent ccd24f3a61
commit da7df166e7

@ -12,206 +12,331 @@ graph TD
D --> E[Single Task]
D --> F[Batch Tasks]
D --> G[Concurrent Tasks]
E --> H[Run Agents]
F --> H
G --> H
H --> I[Collect Responses]
I --> J[Consensus Analysis]
J --> K{Consensus Agent?}
K -->|Yes| L[Use Consensus Agent]
K -->|No| M[Use Last Agent]
L --> N[Final Output]
M --> N
N --> O[Return Result]
```
### Key Concepts
- **Majority Voting**: A method to determine the most common response from a set of answers.
- **Agents**: Entities (e.g., models, algorithms) that provide responses to tasks or queries.
- **Output Parser**: A function that processes the responses from the agents before performing the majority voting.
- **Consensus Agent**: An optional agent that analyzes the responses from all agents to determine the final consensus.
- **Conversation History**: A record of all agent interactions and responses during the voting process.
- **Output Types**: Support for different output formats (string, dictionary, list).
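To make the majority-voting idea above concrete, here is a minimal, library-independent sketch of a most-common-answer vote over a set of agent responses. The response strings are invented for illustration, and this is not the internal implementation of the `MajorityVoting` class (which can also delegate the final decision to a consensus agent).
```python
from collections import Counter

# Hypothetical responses collected from three agents for the same task
responses = ["Paris", "Paris", "Lyon"]

# The majority vote is simply the most frequent answer
majority_answer, votes = Counter(responses).most_common(1)[0]
print(f"Majority answer: {majority_answer} ({votes}/{len(responses)} votes)")
```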
## Class Definition: `MajorityVoting`
### Parameters
```python
class MajorityVoting:
def __init__(
self,
id: str = swarm_id(),
name: str = "MajorityVoting",
description: str = "A majority voting system for agents",
agents: List[Agent] = None,
consensus_agent: Optional[Agent] = None,
autosave: bool = False,
verbose: bool = False,
max_loops: int = 1,
output_type: OutputType = "dict",
*args,
**kwargs,
):
```
### Constructor Parameters
| Parameter | Type | Default | Description |
|------------------|-------------------|---------------|-----------------------------------------------------------------------------|
| `id` | `str` | `swarm_id()` | Unique identifier for the majority voting system. |
| `name` | `str` | `"MajorityVoting"` | Name of the majority voting system. |
| `description` | `str` | `"A majority voting system for agents"` | Description of the system. |
| `agents` | `List[Agent]` | `None` | A list of agents to be used in the majority voting system. Required. |
| `consensus_agent`| `Optional[Agent]` | `None` | Optional agent for analyzing consensus among responses. If None, uses last agent. |
| `autosave` | `bool` | `False` | Whether to autosave conversations. |
| `verbose` | `bool` | `False` | Whether to enable verbose logging. |
| `max_loops` | `int` | `1` | Maximum number of voting loops. |
| `output_type` | `OutputType` | `"dict"` | Output format: "str", "dict", "list", or other. |
| `*args` | `Any` | - | Variable length argument list passed to Conversation. |
| `**kwargs` | `Any` | - | Arbitrary keyword arguments passed to Conversation. |
### Methods
#### `run(task: str, *args, **kwargs) -> Any`
Runs the majority voting system for a single task and returns the consensus result.
**Parameters:**
- `task` (`str`): The task to be performed by the agents
- `*args`: Variable length argument list passed to agents
- `**kwargs`: Arbitrary keyword arguments passed to agents
**Returns:**
- `Any`: The consensus result in the specified output format (string, dict, or list)
**Raises:**
- `ValueError`: If agents list is empty
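As a minimal sketch of the call shape described above (agent prompts, names, and the model are placeholders; the exact consensus text depends on your agents):
```python
from swarms import Agent, MajorityVoting

agents = [
    Agent(
        agent_name=f"Analyst-{i}",
        system_prompt="You are a concise analyst.",
        model_name="gpt-4o",
        max_loops=1,
    )
    for i in range(3)
]

mv = MajorityVoting(agents=agents, output_type="dict")

# Each agent answers the task, then the consensus (or last) agent produces the final result
consensus = mv.run(task="Summarize the main risks of rising interest rates.")
print(consensus)
```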
#### `batch_run(tasks: List[str], *args, **kwargs) -> List[Any]`
Runs the majority voting system for multiple tasks in sequence.
**Parameters:**
- `tasks` (`List[str]`): List of tasks to be performed by the agents
- `*args`: Variable length argument list passed to each run
- `**kwargs`: Arbitrary keyword arguments passed to each run
**Returns:**
- `List[Any]`: List of consensus results for each task
#### `run_concurrently(tasks: List[str], *args, **kwargs) -> List[Any]`
Runs the majority voting system for multiple tasks concurrently using thread pooling.
**Parameters:**
- `tasks` (`List[str]`): List of tasks to be performed by the agents
- `*args`: Variable length argument list passed to each run
- `**kwargs`: Arbitrary keyword arguments passed to each run
**Returns:**
- `List[Any]`: List of consensus results for each task, in completion order
**Note:** Uses `os.cpu_count()` workers for optimal performance.
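A minimal sketch contrasting the two multi-task entry points documented above, assuming `mv` is a configured `MajorityVoting` instance as in the previous sketch; the task strings are placeholders:
```python
tasks = [
    "Compare index funds and ETFs for a beginner investor.",
    "Outline a simple monthly budgeting method.",
]

# Sequential: each task is fully processed before the next one starts
sequential_results = mv.batch_run(tasks)

# Concurrent: tasks run in a thread pool; results come back in completion order
concurrent_results = mv.run_concurrently(tasks)

print(len(sequential_results), len(concurrent_results))
```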
## Usage Examples
### Example 1: Investment Analysis with Consensus Agent
This example demonstrates using MajorityVoting for financial analysis with specialized agents and a dedicated consensus agent.
```python
from swarms import Agent, MajorityVoting
# Initialize multiple specialized agents
agents = [
Agent(
agent_name="Financial-Analysis-Agent",
agent_description="Personal finance advisor focused on market analysis",
system_prompt="You are a financial advisor specializing in market analysis and investment opportunities.",
agent_name="Market-Analysis-Agent",
agent_description="Market trend analyst",
system_prompt="You are a market analyst specializing in identifying growth opportunities and market trends.",
max_loops=1,
model_name="gpt-4o"
),
Agent(
agent_name="Risk-Assessment-Agent",
agent_description="Risk analysis and portfolio management expert",
system_prompt="You are a risk assessment expert focused on evaluating investment risks and portfolio diversification.",
agent_name="Risk-Assessment-Agent",
agent_description="Risk analysis expert",
system_prompt="You are a risk assessment expert focused on evaluating investment risks and volatility.",
max_loops=1,
model_name="gpt-4o"
),
Agent(
agent_name="Tech-Investment-Agent",
agent_description="Technology sector investment specialist",
system_prompt="You are a technology investment specialist focused on AI, emerging tech, and growth opportunities.",
agent_name="Portfolio-Strategy-Agent",
agent_description="Portfolio optimization specialist",
system_prompt="You are a portfolio strategist focused on diversification and long-term growth strategies.",
max_loops=1,
model_name="gpt-4o"
)
]
# Dedicated consensus agent for final analysis
consensus_agent = Agent(
agent_name="Consensus-Agent",
agent_description="Consensus agent focused on analyzing investment advice",
system_prompt="You are a consensus agent focused on analyzing investment advice and providing a final answer.",
agent_name="Investment-Consensus-Agent",
agent_description="Investment consensus analyzer",
system_prompt="You are an expert at synthesizing investment advice from multiple perspectives into a coherent recommendation.",
max_loops=1,
model_name="gpt-4o"
)
# Create majority voting system
investment_system = MajorityVoting(
    name="Investment-Analysis-System",
    description="Multi-agent investment analysis with consensus evaluation",
    agents=agents,
    consensus_agent=consensus_agent,
    verbose=True,
    output_type="dict"
)
# Execute investment analysis
result = investment_system.run(
task="""Analyze the following investment scenario and provide recommendations:
- Budget: $50,000
- Risk tolerance: Moderate
- Time horizon: 5-7 years
- Focus areas: Technology, Healthcare, Renewable Energy
Provide specific ETF/index fund recommendations with allocation percentages."""
)
print("Investment Analysis Results:")
print(result)
```
### Example 2: Content Creation with Batch Processing
This example shows how to use batch processing for content creation tasks with multiple writing styles.
```python
from swarms import Agent, MajorityVoting
# Initialize content creation agents with different styles
content_agents = [
Agent(
agent_name="Creative-Writer",
agent_description="Creative content specialist",
system_prompt="You are a creative writer who produces engaging, story-driven content with vivid descriptions.",
max_loops=1,
model_name="gpt-4o"
),
Agent(
agent_name="Financial-Analysis-Agent",
agent_description="Personal finance advisor focused on market analysis",
system_prompt="You are a financial advisor specializing in market analysis and investment opportunities.",
agent_name="Technical-Writer",
agent_description="Technical content specialist",
system_prompt="You are a technical writer who focuses on clarity, accuracy, and structured information.",
max_loops=1,
model_name="gpt-4o"
),
Agent(
agent_name="Risk-Assessment-Agent",
agent_description="Risk analysis and portfolio management expert",
system_prompt="You are a risk assessment expert focused on evaluating investment risks and portfolio diversification.",
agent_name="SEO-Optimized-Writer",
agent_description="SEO content specialist",
system_prompt="You are an SEO specialist who creates content optimized for search engines while maintaining quality.",
max_loops=1,
model_name="gpt-4o"
),
Agent(
agent_name="Tech-Investment-Agent",
agent_description="Technology sector investment specialist",
system_prompt="You are a technology investment specialist focused on AI, emerging tech, and growth opportunities.",
agent_name="Conversational-Writer",
agent_description="Conversational content specialist",
system_prompt="You are a conversational writer who creates relatable, engaging content that connects with readers.",
max_loops=1,
model_name="gpt-4o"
)
]
# Create majority voting system without dedicated consensus agent
content_system = MajorityVoting(
name="Content-Creation-System",
description="Multi-style content creation with majority voting",
agents=content_agents,
verbose=True,
output_type="str"
)
# Define multiple content tasks
content_tasks = [
"Write a blog post about the benefits of renewable energy adoption",
"Create social media content for a new fitness app launch",
"Develop a product description for eco-friendly water bottles",
"Write an email newsletter about artificial intelligence trends"
]
# Execute batch processing
batch_results = content_system.batch_run(content_tasks)
print("Batch Content Creation Results:")
for i, result in enumerate(batch_results, 1):
print(f"\nTask {i} Result:")
print(result[:500] + "..." if len(str(result)) > 500 else result)
```
### Example 3: Research Analysis with Concurrent Processing
This example demonstrates concurrent processing for research analysis with multiple research perspectives.
```python
from swarms import Agent, MajorityVoting
# Initialize research agents with different methodologies
research_agents = [
Agent(
agent_name="Quantitative-Researcher",
agent_description="Quantitative research specialist",
system_prompt="You are a quantitative researcher who analyzes data, statistics, and numerical evidence.",
max_loops=1,
model_name="gpt-4o"
),
Agent(
agent_name="Qualitative-Researcher",
agent_description="Qualitative research specialist",
system_prompt="You are a qualitative researcher who focuses on patterns, themes, and contextual understanding.",
max_loops=1,
model_name="gpt-4o"
),
Agent(
agent_name="Literature-Review-Specialist",
agent_description="Literature review expert",
system_prompt="You are a literature review specialist who synthesizes existing research and identifies knowledge gaps.",
max_loops=1,
model_name="gpt-4o"
),
Agent(
agent_name="Methodology-Expert",
agent_description="Research methodology specialist",
system_prompt="You are a methodology expert who evaluates research design, validity, and reliability.",
max_loops=1,
model_name="gpt-4o"
),
Agent(
agent_name="Ethics-Reviewer",
agent_description="Research ethics specialist",
system_prompt="You are an ethics reviewer who ensures research practices are responsible and unbiased.",
max_loops=1,
model_name="gpt-4o"
)
]
# Consensus agent for synthesizing research findings
research_consensus = Agent(
agent_name="Research-Synthesis-Agent",
agent_description="Research synthesis specialist",
system_prompt="You are an expert at synthesizing diverse research perspectives into comprehensive, well-supported conclusions.",
max_loops=1,
model_name="gpt-4o"
)
# Create majority voting system for research
research_system = MajorityVoting(
    name="Research-Analysis-System",
    description="Concurrent multi-perspective research analysis",
    agents=research_agents,
    consensus_agent=research_consensus,
    verbose=True,
    output_type="list"
)
# Define research questions for concurrent analysis
research_questions = [
"What are the environmental impacts of electric vehicle adoption?",
"How does remote work affect employee productivity and well-being?",
"What are the economic implications of universal basic income?",
"How can AI be used to improve healthcare outcomes?",
"What are the social effects of social media on mental health?"
]
# Execute concurrent research analysis
concurrent_results = research_system.run_concurrently(research_questions)
print("Concurrent Research Analysis Results:")
print(f"Total questions analyzed: {len(concurrent_results)}")
for i, result in enumerate(concurrent_results, 1):
print(f"\nResearch Question {i}:")
if isinstance(result, list) and len(result) > 0:
print(f"Analysis length: {len(str(result))} characters")
print(f"Sample output: {str(result)[:300]}...")
else:
print(f"Result: {result}")
```

@ -0,0 +1,23 @@
from swarms import Agent
IMAGE_GEN_SYSTEM_PROMPT = (
"You are an advanced image generation agent. Given a textual description, generate a high-quality, photorealistic image that matches the prompt. "
"Return only the generated image."
)
image_gen_agent = Agent(
agent_name="Image-Generation-Agent",
agent_description="Agent specialized in generating high-quality, photorealistic images from textual prompts.",
model_name="gemini/gemini-2.5-flash-image-preview", # Replace with your preferred image generation model if available
dynamic_temperature_enabled=True,
max_loops=1,
dynamic_context_window=True,
retry_interval=1,
)
image_gen_out = image_gen_agent.run(
task=f"{IMAGE_GEN_SYSTEM_PROMPT} \n\n Generate a photorealistic image of a futuristic city skyline at sunset.",
)
print("Image Generation Output:")
print(image_gen_out)

@ -20,6 +20,7 @@ SYSTEM_PROMPT = (
"Return the image only."
)
# Agent for AR annotation
agent = Agent(
agent_name="Tactical-Strategist-Agent",
agent_description="Agent specialized in tactical strategy, scenario analysis, and actionable recommendations for complex situations.",
@ -35,4 +36,5 @@ out = agent.run(
img="hk.jpg",
)
print("AR Annotation Output:")
print(out)

@ -0,0 +1,62 @@
from swarms import Agent, ConcurrentWorkflow
def streaming_callback(
agent_name: str, chunk: str, is_complete: bool
):
"""
Print streaming output from each agent as it arrives.
"""
if chunk:
print(f"[{agent_name}] {chunk}", end="", flush=True)
if is_complete:
print(" [DONE]")
def main():
"""
Run a simple concurrent workflow with streaming output.
"""
agents = [
Agent(
agent_name="Financial",
system_prompt="Financial analysis.",
model_name="groq/moonshotai/kimi-k2-instruct",
max_loops=1,
print_on=False,
),
Agent(
agent_name="Legal",
system_prompt="Legal analysis.",
model_name="groq/moonshotai/kimi-k2-instruct",
max_loops=1,
print_on=False,
),
]
workflow = ConcurrentWorkflow(
name="Simple-Streaming",
agents=agents,
show_dashboard=False,
output_type="dict-all-except-first",
)
task = "Give a short analysis of the risks and opportunities for a SaaS startup expanding to Europe."
print("Streaming output:\n")
result = workflow.run(
task=task, streaming_callback=streaming_callback
)
print("\n\nFinal results:")
# Convert list of messages to dict with agent names as keys
agent_outputs = {}
for message in result:
if message["role"] != "User": # Skip the user message
agent_outputs[message["role"]] = message["content"]
for agent, output in agent_outputs.items():
print(f"{agent}: {output}")
if __name__ == "__main__":
main()

@ -0,0 +1,79 @@
from swarms import Agent
from swarms_tools import scrape_and_format_sync
from swarms.structs.concurrent_workflow import ConcurrentWorkflow
import time
agent = Agent(
agent_name="Web Scraper Agent",
model_name="groq/moonshotai/kimi-k2-instruct",
system_prompt="You are a web scraper agent. You are given a URL and you need to scrape the website and return the data in a structured format. The format type should be full",
tools=[scrape_and_format_sync],
dynamic_context_window=True,
dynamic_temperature_enabled=True,
max_loops=1,
streaming_on=True, # Enable streaming mode
print_on=False, # Prevent direct printing (let callback handle it)
)
def streaming_callback(agent_name: str, chunk: str, is_final: bool):
"""
Callback function to handle streaming output from agents.
Args:
agent_name (str): Name of the agent producing the output
chunk (str): Chunk of output text
is_final (bool): Whether this is the final chunk (completion signal)
"""
if is_final:
print(f"\n[{agent_name}] - COMPLETED")
return
if chunk:
# Print timestamp with agent name for each chunk
timestamp = time.strftime("%H:%M:%S", time.localtime())
print(
f"[{timestamp}] [{agent_name}] {chunk}",
end="",
flush=True,
)
else:
# Debug: print when chunk is empty but not final
print(f"[{agent_name}] - EMPTY CHUNK", end="", flush=True)
# Alternative simple callback (uncomment to use instead):
# def simple_streaming_callback(agent_name: str, chunk: str, is_final: bool):
# """Simple callback that just prints agent output without timestamps."""
# if is_final:
# print(f"\n--- {agent_name} FINISHED ---")
# elif chunk:
# print(f"[{agent_name}] {chunk}", end="", flush=True)
# For saving to file (uncomment to use):
# import os
# def file_streaming_callback(agent_name: str, chunk: str, is_final: bool):
# """Callback that saves streaming output to separate files per agent."""
# if not os.path.exists('agent_outputs'):
# os.makedirs('agent_outputs')
#
# filename = f"agent_outputs/{agent_name.replace(' ', '_')}.txt"
#
# with open(filename, 'a', encoding='utf-8') as f:
# if is_final:
# f.write(f"\n--- COMPLETED ---\n")
# elif chunk:
# f.write(chunk)
swarm = ConcurrentWorkflow(
agents=[agent, agent],
name="Web Scraper Swarm",
description="This swarm is used to scrape the web and return the data in a structured format.",
)
swarm.run(
task="Scrape swarms.ai website and provide a full report of the company does. The format type should be full.",
streaming_callback=streaming_callback,
)

@ -0,0 +1,18 @@
from swarms.utils.litellm_wrapper import LiteLLM
# Initialize the LiteLLM wrapper with reasoning support
llm = LiteLLM(
model_name="claude-sonnet-4-20250514", # OpenAI o3 model with reasoning
reasoning_effort="low", # Enable reasoning with high effort
temperature=1,
max_tokens=2000,
stream=False,
thinking_tokens=1024,
)
# Example task that would benefit from reasoning
task = "Solve this step-by-step: A farmer has 17 sheep and all but 9 die. How many sheep does he have left?"
print("=== Running reasoning model ===")
response = llm.run(task)
print(response)

@ -0,0 +1,23 @@
from swarms import Agent
# Initialize the Agent with reasoning support
agent = Agent(
model_name="claude-sonnet-4-20250514", # OpenAI o3 model with reasoning
reasoning_effort="low", # Enable reasoning with high effort
temperature=1,
max_tokens=2000,
stream=False,
thinking_tokens=1024,
top_p=0.95,
streaming_on=True,
print_on=False,
)
out = agent.run(
task="Solve this step-by-step: A farmer has 17 sheep and all but 9 die. How many sheep does he have left?",
)
for chunk in out:
    # Print each streamed chunk as it arrives
    print(chunk, end="", flush=True)

@ -0,0 +1,25 @@
from swarms_tools import crawl_entire_site_firecrawl
from swarms import Agent
agent = Agent(
agent_name="Marketing Copy Improver",
model_name="gpt-4.1",
tools=[crawl_entire_site_firecrawl],
dynamic_context_window=True,
dynamic_temperature_enabled=True,
max_loops=1,
system_prompt=(
"You are a world-class marketing copy improver. "
"Given a website URL, your job is to crawl the entire site, analyze all marketing copy, "
"and rewrite it to maximize clarity, engagement, and conversion. "
"Return the improved marketing copy in a structured, easy-to-read format. "
"Be concise, persuasive, and ensure the tone matches the brand. "
"Highlight key value propositions and calls to action."
),
)
out = agent.run(
"Crawl 2-3 pages of swarms.ai and improve the marketing copy found on those pages. Return the improved copy in a structured format."
)
print(out)

Binary file not shown.


@ -39,9 +39,6 @@ from swarms.structs.ma_blocks import (
)
from swarms.structs.majority_voting import (
MajorityVoting,
majority_voting,
most_frequent,
parse_code_completion,
)
from swarms.structs.malt import MALT
from swarms.structs.meme_agent_persona_generator import (
@ -112,9 +109,6 @@ __all__ = [
"Conversation",
"GroupChat",
"MajorityVoting",
"majority_voting",
"most_frequent",
"parse_code_completion",
"AgentRearrange",
"rearrange",
"RoundRobinSwarm",

@ -432,6 +432,10 @@ class Agent:
reasoning_prompt_on: bool = True,
dynamic_context_window: bool = True,
show_tool_execution_output: bool = True,
reasoning_effort: str = None,
drop_params: bool = True,
thinking_tokens: int = None,
reasoning_enabled: bool = True,
*args,
**kwargs,
):
@ -574,6 +578,10 @@ class Agent:
self.dynamic_context_window = dynamic_context_window
self.show_tool_execution_output = show_tool_execution_output
self.mcp_configs = mcp_configs
self.reasoning_effort = reasoning_effort
self.drop_params = drop_params
self.thinking_tokens = thinking_tokens
self.reasoning_enabled = reasoning_enabled
# self.init_handling()
self.setup_config()
@ -718,6 +726,9 @@ class Agent:
"stream": self.streaming_on,
"top_p": self.top_p,
"retries": self.retry_attempts,
"reasoning_effort": self.reasoning_effort,
"thinking_tokens": self.thinking_tokens,
"reasoning_enabled": self.reasoning_enabled,
}
# Initialize tools_list_dictionary, if applicable
@ -974,33 +985,37 @@ class Agent:
)
def print_dashboard(self):
"""
Print a dashboard displaying the agent's current status and configuration.
Uses square brackets instead of emojis for section headers and bullet points.
"""
tools_activated = True if self.tools is not None else False
mcp_activated = True if self.mcp_url is not None else False
formatter.print_panel(
f"""
🤖 Agent {self.agent_name} Dashboard 🚀
[Agent {self.agent_name} Dashboard]
===========================================================
🎯 Agent {self.agent_name} Status: ONLINE & OPERATIONAL
[Agent {self.agent_name} Status]: ONLINE & OPERATIONAL
-----------------------------------------------------------
📋 Agent Identity:
🏷 Name: {self.agent_name}
📝 Description: {self.agent_description}
[Agent Identity]
- [Name]: {self.agent_name}
- [Description]: {self.agent_description}
Technical Specifications:
🤖 Model: {self.model_name}
🔄 Internal Loops: {self.max_loops}
🎯 Max Tokens: {self.max_tokens}
🌡 Dynamic Temperature: {self.dynamic_temperature_enabled}
[Technical Specifications]
- [Model]: {self.model_name}
- [Internal Loops]: {self.max_loops}
- [Max Tokens]: {self.max_tokens}
- [Dynamic Temperature]: {self.dynamic_temperature_enabled}
🔧 System Modules:
🛠 Tools Activated: {tools_activated}
🔗 MCP Activated: {mcp_activated}
[System Modules]
- [Tools Activated]: {tools_activated}
- [MCP Activated]: {mcp_activated}
🚀 Ready for Tasks 🚀
===========================================================
[Ready for Tasks]
""",
title=f"Agent {self.agent_name} Dashboard",

@ -5,6 +5,7 @@ from typing import Callable, List, Optional, Union
from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.conversation import Conversation
from swarms.structs.swarm_id import swarm_id
from swarms.utils.formatter import formatter
from swarms.utils.get_cpu_cores import get_cpu_cores
from swarms.utils.history_output_formatter import (
@ -16,131 +17,14 @@ logger = initialize_logger(log_folder="concurrent_workflow")
class ConcurrentWorkflow(BaseSwarm):
"""
A production-grade concurrent workflow orchestrator that executes multiple agents simultaneously.
ConcurrentWorkflow is designed for high-performance multi-agent orchestration with advanced features
including real-time monitoring, error handling, caching, and flexible output formatting. It's ideal
for scenarios where multiple agents need to process the same task independently and their results
need to be aggregated.
Key Features:
- Concurrent execution using ThreadPoolExecutor for optimal performance
- Real-time dashboard monitoring with status updates
- Comprehensive error handling and recovery
- Flexible output formatting options
- Automatic prompt engineering capabilities
- Conversation history management
- Metadata persistence and auto-saving
- Support for both single and batch processing
- Image input support for multimodal agents
Use Cases:
- Multi-perspective analysis (financial, legal, technical reviews)
- Consensus building and voting systems
- Parallel data processing and analysis
- A/B testing with different agent configurations
- Redundancy and reliability improvements
Args:
name (str, optional): Unique identifier for the workflow instance.
Defaults to "ConcurrentWorkflow".
description (str, optional): Human-readable description of the workflow's purpose.
Defaults to "Execution of multiple agents concurrently".
agents (List[Union[Agent, Callable]], optional): List of Agent instances or callable objects
to execute concurrently. Each agent should implement a `run` method.
Defaults to empty list.
metadata_output_path (str, optional): File path for saving execution metadata and results.
Supports JSON format. Defaults to "agent_metadata.json".
auto_save (bool, optional): Whether to automatically save conversation history and metadata
after each run. Defaults to True.
output_type (str, optional): Format for aggregating agent outputs. Options include:
- "dict-all-except-first": Dictionary with all agent outputs except the first
- "dict": Dictionary with all agent outputs
- "str": Concatenated string of all outputs
- "list": List of individual agent outputs
Defaults to "dict-all-except-first".
max_loops (int, optional): Maximum number of execution loops for each agent.
Defaults to 1.
auto_generate_prompts (bool, optional): Enable automatic prompt engineering for all agents.
When True, agents will enhance their prompts automatically. Defaults to False.
show_dashboard (bool, optional): Enable real-time dashboard display showing agent status,
progress, and outputs. Useful for monitoring and debugging. Defaults to False.
*args: Additional positional arguments passed to the BaseSwarm parent class.
**kwargs: Additional keyword arguments passed to the BaseSwarm parent class.
Raises:
ValueError: If agents list is empty or None.
ValueError: If description is empty or None.
TypeError: If agents list contains non-Agent, non-callable objects.
Attributes:
name (str): The workflow instance name.
description (str): The workflow description.
agents (List[Union[Agent, Callable]]): List of agents to execute.
metadata_output_path (str): Path for metadata output file.
auto_save (bool): Auto-save flag for metadata persistence.
output_type (str): Output aggregation format.
max_loops (int): Maximum execution loops per agent.
auto_generate_prompts (bool): Auto prompt engineering flag.
show_dashboard (bool): Dashboard display flag.
agent_statuses (dict): Real-time status tracking for each agent.
conversation (Conversation): Conversation history manager.
Example:
Basic usage with multiple agents:
>>> from swarms import Agent, ConcurrentWorkflow
>>>
>>> # Create specialized agents
>>> financial_agent = Agent(
... agent_name="Financial-Analyst",
... system_prompt="You are a financial analysis expert...",
... model_name="gpt-4"
... )
>>> legal_agent = Agent(
... agent_name="Legal-Advisor",
... system_prompt="You are a legal expert...",
... model_name="gpt-4"
... )
>>>
>>> # Create workflow
>>> workflow = ConcurrentWorkflow(
... name="Multi-Expert-Analysis",
... agents=[financial_agent, legal_agent],
... show_dashboard=True,
... auto_save=True
... )
>>>
>>> # Execute analysis
>>> task = "Analyze the risks of investing in cryptocurrency"
>>> results = workflow.run(task)
>>> print(f"Analysis complete with {len(results)} perspectives")
Batch processing example:
>>> tasks = [
... "Analyze Q1 financial performance",
... "Review Q2 market trends",
... "Forecast Q3 projections"
... ]
>>> batch_results = workflow.batch_run(tasks)
>>> print(f"Processed {len(batch_results)} quarterly reports")
Note:
- Agents are executed using ThreadPoolExecutor with 95% of available CPU cores
- Each agent runs independently and cannot communicate with others during execution
- The workflow maintains conversation history across all runs for context
- Dashboard mode disables individual agent printing to prevent output conflicts
- Error handling ensures partial results are available even if some agents fail
"""
"""Concurrent workflow for running multiple agents simultaneously."""
def __init__(
self,
id: str = swarm_id(),
name: str = "ConcurrentWorkflow",
description: str = "Execution of multiple agents concurrently",
agents: List[Union[Agent, Callable]] = None,
metadata_output_path: str = "agent_metadata.json",
auto_save: bool = True,
output_type: str = "dict-all-except-first",
max_loops: int = 1,
@ -149,17 +33,6 @@ class ConcurrentWorkflow(BaseSwarm):
*args,
**kwargs,
):
"""
Initialize the ConcurrentWorkflow with configuration parameters.
Performs initialization, validation, and setup of internal components including
conversation management, agent status tracking, and dashboard configuration.
Note:
The constructor automatically performs reliability checks and configures
agents for dashboard mode if enabled. Agent print outputs are disabled
when dashboard mode is active to prevent display conflicts.
"""
super().__init__(
name=name,
description=description,
@ -170,7 +43,9 @@ class ConcurrentWorkflow(BaseSwarm):
self.name = name
self.description = description
self.agents = agents
self.metadata_output_path = metadata_output_path
self.metadata_output_path = (
f"concurrent_workflow_name_{name}_id_{id}.json"
)
self.auto_save = auto_save
self.max_loops = max_loops
self.auto_generate_prompts = auto_generate_prompts
@ -188,58 +63,14 @@ class ConcurrentWorkflow(BaseSwarm):
self.agents = self.fix_agents()
def fix_agents(self):
"""
Configure agents for dashboard mode by disabling individual print outputs.
When dashboard mode is enabled, individual agent print outputs can interfere
with the dashboard display. This method disables print_on for all agents
to ensure clean dashboard rendering.
Returns:
List[Agent]: The modified list of agents with print_on disabled.
Note:
This method only modifies agents when show_dashboard is True.
Agent functionality is not affected, only their output display behavior.
Example:
>>> workflow = ConcurrentWorkflow(show_dashboard=True, agents=[agent1, agent2])
>>> # Agents automatically configured for dashboard mode
>>> all(not agent.print_on for agent in workflow.agents)
True
"""
"""Configure agents for dashboard mode."""
if self.show_dashboard is True:
for agent in self.agents:
agent.print_on = False
return self.agents
def reliability_check(self):
"""
Perform comprehensive validation of workflow configuration and agents.
Validates that the workflow is properly configured with valid agents and
provides warnings for suboptimal configurations. This method is called
automatically during initialization.
Validates:
- Agents list is not None or empty
- At least one agent is provided
- Warns if only one agent is provided (suboptimal for concurrent execution)
Raises:
ValueError: If agents list is None or empty.
Logs:
Warning: If only one agent is provided (concurrent execution not beneficial).
Error: If validation fails with detailed error information.
Example:
>>> workflow = ConcurrentWorkflow(agents=[])
ValueError: ConcurrentWorkflow: No agents provided
>>> workflow = ConcurrentWorkflow(agents=[single_agent])
# Logs warning about single agent usage
"""
"""Validate workflow configuration."""
try:
if self.agents is None:
raise ValueError(
@ -253,7 +84,7 @@ class ConcurrentWorkflow(BaseSwarm):
if len(self.agents) == 1:
logger.warning(
"ConcurrentWorkflow: Only one agent provided. With ConcurrentWorkflow, you should use at least 2+ agents."
"ConcurrentWorkflow: Only one agent provided."
)
except Exception as e:
logger.error(
@ -262,33 +93,7 @@ class ConcurrentWorkflow(BaseSwarm):
raise
def activate_auto_prompt_engineering(self):
"""
Enable automatic prompt engineering for all agents in the workflow.
When activated, each agent will automatically enhance and optimize their
system prompts based on the task context and their previous interactions.
This can improve response quality but may increase execution time.
Side Effects:
- Sets auto_generate_prompt=True for all agents in the workflow
- Affects subsequent agent.run() calls, not retroactively
- May increase latency due to prompt optimization overhead
Note:
This method can be called at any time, but changes only affect
future agent executions. Already running agents are not affected.
Example:
>>> workflow = ConcurrentWorkflow(agents=[agent1, agent2])
>>> workflow.activate_auto_prompt_engineering()
>>> # All agents will now auto-generate optimized prompts
>>> result = workflow.run("Complex analysis task")
>>> # Agents used enhanced prompts for better performance
See Also:
- Agent.auto_generate_prompt: Individual agent prompt engineering
- auto_generate_prompts: Constructor parameter for default behavior
"""
"""Enable automatic prompt engineering."""
if self.auto_generate_prompts is True:
for agent in self.agents:
agent.auto_generate_prompt = True
@ -297,48 +102,8 @@ class ConcurrentWorkflow(BaseSwarm):
self,
title: str = "ConcurrentWorkflow Dashboard",
is_final: bool = False,
) -> None:
"""
Display a real-time dashboard showing the current status of all agents.
Renders a formatted dashboard with agent names, execution status, and
output previews. The dashboard is updated in real-time during workflow
execution to provide visibility into agent progress and results.
Args:
title (str, optional): The dashboard title to display at the top.
Defaults to "🤖 ConcurrentWorkflow Dashboard".
is_final (bool, optional): Whether this is the final dashboard display
after all agents have completed. Changes formatting and styling.
Defaults to False.
Side Effects:
- Prints formatted dashboard to console
- Updates display in real-time during execution
- May clear previous dashboard content for clean updates
Note:
This method is automatically called during workflow execution when
show_dashboard=True. Manual calls are supported for custom monitoring.
Dashboard Status Values:
- "pending": Agent queued but not yet started
- "running": Agent currently executing task
- "completed": Agent finished successfully
- "error": Agent execution failed with error
Example:
>>> workflow = ConcurrentWorkflow(agents=[agent1, agent2], show_dashboard=True)
>>> workflow.display_agent_dashboard("Custom Dashboard Title")
# Displays:
# 🤖 Custom Dashboard Title
# ┌─────────────────┬─────────┬──────────────────┐
# │ Agent Name │ Status │ Output Preview │
# ├─────────────────┼─────────┼──────────────────┤
# │ Financial-Agent │ running │ Analyzing data...│
# │ Legal-Agent │ pending │ │
# └─────────────────┴─────────┴──────────────────┘
"""
):
"""Display real-time dashboard."""
agents_data = [
{
"name": agent.agent_name,
@ -358,69 +123,11 @@ class ConcurrentWorkflow(BaseSwarm):
task: str,
img: Optional[str] = None,
imgs: Optional[List[str]] = None,
streaming_callback: Optional[
Callable[[str, str, bool], None]
] = None,
):
"""
Execute all agents concurrently with real-time dashboard monitoring.
This method provides the same concurrent execution as _run() but with
enhanced real-time monitoring through a visual dashboard. Agent status
updates are displayed in real-time, showing progress, completion, and
any errors that occur during execution.
Args:
task (str): The task description or prompt to be executed by all agents.
This will be passed to each agent's run() method.
img (Optional[str], optional): Path to a single image file for agents
that support multimodal input. Defaults to None.
imgs (Optional[List[str]], optional): List of image file paths for
agents that support multiple image inputs. Defaults to None.
Returns:
Any: Formatted output based on the configured output_type setting.
The return format depends on the output_type parameter set during
workflow initialization.
Raises:
Exception: Re-raises any exceptions from agent execution after
updating the dashboard with error status.
Side Effects:
- Displays initial dashboard showing all agents as "pending"
- Updates dashboard in real-time as agents start, run, and complete
- Displays final dashboard with all results when execution completes
- Adds all task inputs and agent outputs to conversation history
- Automatically cleans up dashboard display resources
Dashboard Flow:
1. Initial dashboard shows all agents as "pending"
2. As agents start, status updates to "running"
3. As agents complete, status updates to "completed" with output preview
4. Final dashboard shows complete results summary
5. Dashboard resources are cleaned up automatically
Performance:
- Uses 95% of available CPU cores for optimal concurrency
- ThreadPoolExecutor manages thread lifecycle automatically
- Real-time updates have minimal performance overhead
Example:
>>> workflow = ConcurrentWorkflow(
... agents=[financial_agent, legal_agent],
... show_dashboard=True
... )
>>> result = workflow.run_with_dashboard(
... task="Analyze the merger proposal",
... img="company_financials.png"
... )
# Dashboard shows real-time progress:
# Agent-1: pending -> running -> completed
# Agent-2: pending -> running -> completed
>>> print("Analysis complete:", result)
Note:
This method is automatically called when show_dashboard=True.
For headless execution without dashboard, use _run() directly.
"""
"""Execute agents with dashboard monitoring."""
try:
self.conversation.add(role="User", content=task)
@ -431,61 +138,61 @@ class ConcurrentWorkflow(BaseSwarm):
"output": "",
}
# Display initial dashboard if enabled
if self.show_dashboard:
self.display_agent_dashboard()
# Use 95% of available CPU cores for optimal performance
max_workers = int(get_cpu_cores() * 0.95)
# Create a list to store all futures and their results
futures = []
results = []
def run_agent_with_status(agent, task, img, imgs):
try:
# Update status to running
self.agent_statuses[agent.agent_name][
"status"
] = "running"
if self.show_dashboard:
self.display_agent_dashboard()
# Create a streaming callback for this agent with throttling
last_update_time = [
0
] # Use list to allow modification in nested function
update_interval = 0.1 # Update dashboard every 100ms for smooth streaming
def streaming_callback(chunk: str):
"""Update dashboard with streaming content"""
if self.show_dashboard:
# Append the chunk to the agent's current output
current_output = self.agent_statuses[
agent.agent_name
]["output"]
self.agent_statuses[agent.agent_name][
"output"
] = (current_output + chunk)
# Throttle dashboard updates for better performance
current_time = time.time()
last_update_time = [0]
update_interval = 0.1
def agent_streaming_callback(chunk: str):
try:
if self.show_dashboard and chunk:
current_output = self.agent_statuses[
agent.agent_name
]["output"]
self.agent_statuses[agent.agent_name][
"output"
] = (current_output + chunk)
current_time = time.time()
if (
current_time - last_update_time[0]
>= update_interval
):
self.display_agent_dashboard()
last_update_time[0] = current_time
if (
current_time - last_update_time[0]
>= update_interval
streaming_callback
and chunk is not None
):
self.display_agent_dashboard()
last_update_time[0] = current_time
streaming_callback(
agent.agent_name, chunk, False
)
except Exception as callback_error:
logger.warning(
f"Dashboard streaming callback failed for {agent.agent_name}: {str(callback_error)}"
)
# Run the agent with streaming callback
output = agent.run(
task=task,
img=img,
imgs=imgs,
streaming_callback=streaming_callback,
streaming_callback=agent_streaming_callback,
)
# Update status to completed
self.agent_statuses[agent.agent_name][
"status"
] = "completed"
@ -495,9 +202,11 @@ class ConcurrentWorkflow(BaseSwarm):
if self.show_dashboard:
self.display_agent_dashboard()
if streaming_callback:
streaming_callback(agent.agent_name, "", True)
return output
except Exception as e:
# Update status to error
self.agent_statuses[agent.agent_name][
"status"
] = "error"
@ -506,24 +215,25 @@ class ConcurrentWorkflow(BaseSwarm):
] = f"Error: {str(e)}"
if self.show_dashboard:
self.display_agent_dashboard()
if streaming_callback:
streaming_callback(
agent.agent_name, f"Error: {str(e)}", True
)
raise
# Run agents concurrently using ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
) as executor:
# Submit all agent tasks
futures = [
executor.submit(
run_agent_with_status, agent, task, img, imgs
)
for agent in self.agents
]
# Wait for all futures to complete
concurrent.futures.wait(futures)
# Process results in order of completion
for future, agent in zip(futures, self.agents):
try:
output = future.result()
@ -536,11 +246,9 @@ class ConcurrentWorkflow(BaseSwarm):
(agent.agent_name, f"Error: {str(e)}")
)
# Add all results to conversation
for agent_name, output in results:
self.conversation.add(role=agent_name, content=output)
# Display final dashboard if enabled
if self.show_dashboard:
self.display_agent_dashboard(
"Final ConcurrentWorkflow Dashboard",
@ -548,11 +256,9 @@ class ConcurrentWorkflow(BaseSwarm):
)
return history_output_formatter(
conversation=self.conversation,
type=self.output_type,
conversation=self.conversation, type=self.output_type
)
finally:
# Always clean up the dashboard display
if self.show_dashboard:
formatter.stop_dashboard()
@ -561,316 +267,160 @@ class ConcurrentWorkflow(BaseSwarm):
task: str,
img: Optional[str] = None,
imgs: Optional[List[str]] = None,
streaming_callback: Optional[
Callable[[str, str, bool], None]
] = None,
):
"""
Execute all agents concurrently without dashboard monitoring (headless mode).
This is the core execution method that runs all agents simultaneously using
ThreadPoolExecutor for optimal performance. Results are collected and
formatted according to the configured output_type.
Args:
task (str): The task description or prompt to be executed by all agents.
Each agent receives the same task input for independent processing.
img (Optional[str], optional): Path to a single image file for multimodal
agents. The same image is provided to all agents. Defaults to None.
imgs (Optional[List[str]], optional): List of image file paths for agents
that support multiple image inputs. All agents receive the same image list.
Defaults to None.
Returns:
Any: Formatted output according to the output_type configuration:
- "dict-all-except-first": Dict with all agent outputs except first
- "dict": Dict with all agent outputs keyed by agent name
- "str": Concatenated string of all agent outputs
- "list": List of individual agent outputs in completion order
Side Effects:
- Adds the user task to conversation history
- Adds each agent's output to conversation history upon completion
- No visual output or dashboard updates (headless execution)
Performance Characteristics:
- Uses ThreadPoolExecutor with 95% of available CPU cores
- Agents execute truly concurrently, not sequentially
- Results are collected in completion order, not submission order
- Memory efficient for large numbers of agents
Error Handling:
- Individual agent failures don't stop other agents
- Failed agents have their exceptions logged but execution continues
- Partial results are still returned for successful agents
Thread Safety:
- Conversation object handles concurrent access safely
- Agent status is not tracked in this method (dashboard-free)
- Each agent runs in isolation without shared state
Example:
>>> workflow = ConcurrentWorkflow(
... agents=[agent1, agent2, agent3],
... output_type="dict"
... )
>>> result = workflow._run(
... task="Analyze market trends for Q4",
... img="market_chart.png"
... )
>>> print(f"Received {len(result)} agent analyses")
>>> # Result format: {"agent1": "analysis1", "agent2": "analysis2", ...}
Note:
This method is called automatically by run() when show_dashboard=False.
For monitoring and real-time updates, use run_with_dashboard() instead.
"""
"""Execute agents concurrently without dashboard."""
self.conversation.add(role="User", content=task)
# Use 95% of available CPU cores for optimal performance
max_workers = int(get_cpu_cores() * 0.95)
# Run agents concurrently using ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
) as executor:
# Submit all agent tasks and store with their index
future_to_agent = {
executor.submit(
agent.run, task=task, img=img, imgs=imgs
self._run_agent_with_streaming,
agent,
task,
img,
imgs,
streaming_callback,
): agent
for agent in self.agents
}
# Collect results and add to conversation in completion order
for future in concurrent.futures.as_completed(
future_to_agent
):
agent = future_to_agent[future]
output = future.result()
self.conversation.add(role=agent.name, content=output)
self.conversation.add(
role=agent.agent_name, content=output
)
return history_output_formatter(
conversation=self.conversation,
type=self.output_type,
conversation=self.conversation, type=self.output_type
)
def _run_agent_with_streaming(
self,
agent: Union[Agent, Callable],
task: str,
img: Optional[str] = None,
imgs: Optional[List[str]] = None,
streaming_callback: Optional[
Callable[[str, str, bool], None]
] = None,
):
"""Run single agent with streaming support."""
if streaming_callback is None:
return agent.run(task=task, img=img, imgs=imgs)
def agent_streaming_callback(chunk: str):
try:
# Safely call the streaming callback
if streaming_callback and chunk is not None:
streaming_callback(agent.agent_name, chunk, False)
except Exception as callback_error:
logger.warning(
f"Streaming callback failed for {agent.agent_name}: {str(callback_error)}"
)
try:
output = agent.run(
task=task,
img=img,
imgs=imgs,
streaming_callback=agent_streaming_callback,
)
# Ensure completion callback is called even if there were issues
try:
streaming_callback(agent.agent_name, "", True)
except Exception as callback_error:
logger.warning(
f"Completion callback failed for {agent.agent_name}: {str(callback_error)}"
)
return output
except Exception as e:
error_msg = f"Agent {agent.agent_name} failed: {str(e)}"
logger.error(error_msg)
# Try to send error through callback
try:
streaming_callback(
agent.agent_name, f"Error: {str(e)}", True
)
except Exception as callback_error:
logger.warning(
f"Error callback failed for {agent.agent_name}: {str(callback_error)}"
)
raise
def cleanup(self):
"""Clean up resources and connections."""
try:
# Reset agent statuses
for agent in self.agents:
if hasattr(agent, "cleanup"):
try:
agent.cleanup()
except Exception as e:
logger.warning(
f"Failed to cleanup agent {agent.agent_name}: {str(e)}"
)
# Clear conversation if needed
if hasattr(self, "conversation"):
# Keep the conversation for result formatting but reset for next run
pass
except Exception as e:
logger.error(f"Cleanup failed: {str(e)}")
def run(
self,
task: str,
img: Optional[str] = None,
imgs: Optional[List[str]] = None,
streaming_callback: Optional[
Callable[[str, str, bool], None]
] = None,
):
"""
Execute all agents concurrently on the given task with optional dashboard monitoring.
This is the main entry point for workflow execution. Automatically selects
between dashboard-enabled execution (run_with_dashboard) or headless execution
(_run) based on the show_dashboard configuration.
Args:
task (str): The task description, prompt, or instruction to be executed
by all agents concurrently. Each agent processes the same task
independently.
img (Optional[str], optional): Path to a single image file for agents
that support multimodal (text + image) input. Defaults to None.
imgs (Optional[List[str]], optional): List of image file paths for
agents that support multiple image inputs. Defaults to None.
Returns:
Any: Aggregated results from all agents formatted according to output_type:
- "dict-all-except-first": Dictionary excluding first agent's output
- "dict": Complete dictionary with all agent outputs
- "str": Concatenated string of all outputs
- "list": List of individual agent outputs
Execution Modes:
- Dashboard Mode (show_dashboard=True): Provides real-time visual monitoring
with status updates, progress tracking, and error visibility
- Headless Mode (show_dashboard=False): Silent execution with no visual output,
optimized for performance and automation scenarios
Concurrent Execution:
- All agents run simultaneously using ThreadPoolExecutor
- Utilizes 95% of available CPU cores for optimal performance
- Thread-safe conversation history management
- Independent agent execution without inter-agent communication
Error Resilience:
- Individual agent failures don't halt the entire workflow
- Partial results are returned for successful agents
- Comprehensive error logging for debugging
- Graceful degradation under failure conditions
Example:
Basic concurrent execution:
>>> workflow = ConcurrentWorkflow(agents=[analyst, reviewer, summarizer])
>>> result = workflow.run("Evaluate the new product proposal")
>>> print(f"Received insights from {len(workflow.agents)} experts")
With dashboard monitoring:
>>> workflow = ConcurrentWorkflow(
... agents=[financial_agent, legal_agent],
... show_dashboard=True
... )
>>> result = workflow.run("Review merger agreement")
# Real-time dashboard shows progress and completion status
Multimodal analysis:
>>> result = workflow.run(
... task="Analyze this chart and provide insights",
... img="quarterly_results.png"
... )
Performance Tips:
- Use 2+ agents to benefit from concurrent execution
- Dashboard mode adds minimal overhead for monitoring
- Larger agent counts scale well with available CPU cores
- Consider batch_run() for multiple related tasks
See Also:
- batch_run(): For processing multiple tasks sequentially
- run_with_dashboard(): For direct dashboard execution
- _run(): For direct headless execution
"""
if self.show_dashboard:
return self.run_with_dashboard(task, img, imgs)
else:
return self._run(task, img, imgs)
"""Execute all agents concurrently."""
try:
if self.show_dashboard:
result = self.run_with_dashboard(
task, img, imgs, streaming_callback
)
else:
result = self._run(
task, img, imgs, streaming_callback
)
return result
finally:
# Always cleanup resources
self.cleanup()
def batch_run(
self,
tasks: List[str],
imgs: Optional[List[str]] = None,
streaming_callback: Optional[
Callable[[str, str, bool], None]
] = None,
):
"""
Execute the workflow on multiple tasks sequentially with concurrent agent processing.
This method processes a list of tasks one by one, where each task is executed
by all agents concurrently. This is ideal for batch processing scenarios where
you have multiple related tasks that need the same multi-agent analysis.
Args:
tasks (List[str]): List of task descriptions, prompts, or instructions.
Each task will be processed by all agents concurrently before
moving to the next task. Tasks are processed sequentially.
imgs (Optional[List[str]], optional): List of image file paths corresponding
to each task. If provided, imgs[i] will be used for tasks[i].
If fewer images than tasks are provided, remaining tasks will
execute without images. Defaults to None.
Returns:
List[Any]: List of results, one for each task. Each result is formatted
according to the workflow's output_type configuration. The length
of the returned list equals the length of the input tasks list.
Processing Flow:
1. Tasks are processed sequentially (not concurrently with each other)
2. For each task, all agents execute concurrently
3. Results are collected and formatted for each task
4. Conversation history accumulates across all tasks
5. Final result list contains aggregated outputs for each task
Image Handling:
- If imgs is None: All tasks execute without images
- If imgs has fewer items than tasks: Extra tasks execute without images
- If imgs has more items than tasks: Extra images are ignored
- Each task gets at most one corresponding image
Dashboard Behavior:
- If show_dashboard=True, dashboard resets and updates for each task
- Progress is shown separately for each task's agent execution
- Final dashboard shows results from the last task only
Memory and Performance:
- Conversation history grows with each task (cumulative)
- Memory usage scales with number of tasks and agent outputs
- CPU utilization is optimal during each task's concurrent execution
- Consider clearing conversation history for very large batch jobs
Example:
Financial analysis across quarters:
>>> workflow = ConcurrentWorkflow(agents=[analyst1, analyst2, analyst3])
>>> quarterly_tasks = [
... "Analyze Q1 financial performance and market position",
... "Analyze Q2 financial performance and market position",
... "Analyze Q3 financial performance and market position",
... "Analyze Q4 financial performance and market position"
... ]
>>> results = workflow.batch_run(quarterly_tasks)
>>> print(f"Completed {len(results)} quarterly analyses")
>>> # Each result contains insights from all 3 analysts for that quarter
With corresponding images:
>>> tasks = ["Analyze chart trends", "Review performance metrics"]
>>> charts = ["q1_chart.png", "q2_chart.png"]
>>> results = workflow.batch_run(tasks, imgs=charts)
>>> # Task 0 uses q1_chart.png, Task 1 uses q2_chart.png
Batch processing with dashboard:
>>> workflow = ConcurrentWorkflow(
... agents=[agent1, agent2],
... show_dashboard=True
... )
>>> results = workflow.batch_run([
... "Process document batch 1",
... "Process document batch 2",
... "Process document batch 3"
... ])
# Dashboard shows progress for each batch separately
Use Cases:
- Multi-period financial analysis (quarterly/yearly reports)
- Batch document processing with multiple expert reviews
- A/B testing across different scenarios or datasets
- Systematic evaluation of multiple proposals or options
- Comparative analysis across time periods or categories
Note:
Tasks are intentionally processed sequentially to maintain result
order and prevent resource contention. For truly concurrent task
processing, create separate workflow instances.
See Also:
- run(): For single task execution
- ConcurrentWorkflow: For configuration options
"""
"""Execute workflow on multiple tasks sequentially."""
results = []
for idx, task in enumerate(tasks):
img = None
if imgs is not None:
# Use the img at the same index if available, else None
if idx < len(imgs):
img = imgs[idx]
results.append(self.run(task=task, img=img))
if imgs is not None and idx < len(imgs):
img = imgs[idx]
results.append(
self.run(
task=task,
img=img,
streaming_callback=streaming_callback,
)
)
return results
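# Illustrative sketch (not part of this module): calling batch_run with the new
# streaming_callback parameter. The callback argument order
# (agent name, chunk, completion flag) is inferred from the
# Callable[[str, str, bool], None] type hint above and is an assumption, as are
# the agent variables used here.
#
# def print_stream(agent_name: str, chunk: str, is_final: bool) -> None:
#     suffix = " [done]" if is_final else ""
#     print(f"[{agent_name}] {chunk}{suffix}")
#
# workflow = ConcurrentWorkflow(agents=[agent_a, agent_b])
# results = workflow.batch_run(
#     tasks=["Summarize report A", "Summarize report B"],
#     streaming_callback=print_stream,
# )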
# if __name__ == "__main__":
# # Assuming you've already initialized some agents outside of this class
# agents = [
# Agent(
# agent_name=f"Financial-Analysis-Agent-{i}",
# system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
# model_name="gpt-4o",
# max_loops=1,
# )
# for i in range(3) # Adjust number of agents as needed
# ]
# # Initialize the workflow with the list of agents
# workflow = ConcurrentWorkflow(
# agents=agents,
# metadata_output_path="agent_metadata_4.json",
# return_str_on=True,
# )
# # Define the task for all agents
# task = "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria?"
# # Run the workflow and save metadata
# metadata = workflow.run(task)
# print(metadata)

@@ -1,149 +1,27 @@
import asyncio
import concurrent.futures
import os
import re
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Optional
from typing import Any, List, Optional
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.structs.multi_agent_exec import run_agents_concurrently
from swarms.utils.output_types import OutputType
from swarms.structs.swarm_id import swarm_id
from swarms.utils.formatter import formatter
from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.output_types import OutputType
logger = initialize_logger(log_folder="majority_voting")
def extract_last_python_code_block(text):
"""
Extracts the last Python code block from the given text.
Args:
text (str): The text to search for Python code blocks.
Returns:
str or None: The last Python code block found in the text, or None if no code block is found.
"""
# The regular expression pattern for Python code blocks
pattern = r"```[pP]ython(.*?)```"
# Find all matches in the text
matches = re.findall(pattern, text, re.DOTALL)
# If there are matches, return the last one
if matches:
return matches[-1].strip()
else:
return None
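# Illustrative sketch (not part of this module): the regex grabs the body of
# the final ```python fenced block and strips it, or returns None when the
# text contains no such block.
#
# >>> text = "A:\n```python\nx = 1\n```\nB:\n```Python\ny = 2\n```"
# >>> extract_last_python_code_block(text)
# 'y = 2'
# >>> extract_last_python_code_block("no code here") is None
# True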
def parse_code_completion(agent_response, question):
"""
Parses the code completion response from the agent and extracts the last Python code block.
Args:
agent_response (str): The response from the agent.
question (str): The original question.
Returns:
tuple: A tuple containing the parsed Python code and a boolean indicating success.
"""
python_code = extract_last_python_code_block(agent_response)
if python_code is None:
if agent_response.count("impl]") == 0:
python_code = agent_response
else:
python_code_lines = agent_response.split("\n")
python_code = ""
in_func = False
for line in python_code_lines:
if in_func:
python_code += line + "\n"
if "impl]" in line:
in_func = True
if python_code.count("def") == 0:
python_code = question + python_code
return python_code, True
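# Illustrative sketch (not part of this module): when the agent reply contains
# a fenced Python block, parse_code_completion returns that block with
# success=True; the question argument is only prepended in the fallback path.
#
# >>> reply = "Here you go:\n```python\ndef add(a, b):\n    return a + b\n```"
# >>> code, ok = parse_code_completion(reply, question="def add(a, b):")
# >>> ok
# True
# >>> print(code)
# def add(a, b):
#     return a + b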
def most_frequent(
clist: list,
cmp_func: callable = None,
):
"""
Finds the most frequent element in a list based on a comparison function.
Args:
clist (list): The list of elements to search.
cmp_func (function, optional): The comparison function used to determine the frequency of elements.
If not provided, the default comparison function is used.
Returns:
tuple: A tuple containing the most frequent element and its frequency.
"""
    # Default to simple equality when no comparison function is provided.
    if cmp_func is None:
        cmp_func = lambda a, b: a == b
    counter = 0
num = clist[0]
for i in clist:
current_frequency = sum(cmp_func(i, item) for item in clist)
if current_frequency > counter:
counter = current_frequency
num = i
return num, counter
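# Illustrative sketch (not part of this module): with an equality comparison,
# most_frequent returns the element that matches the most other entries,
# together with that match count.
#
# >>> most_frequent(["a", "b", "a", "c"], cmp_func=lambda x, y: x == y)
# ('a', 2)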
def majority_voting(answers: List[str]):
"""
Performs majority voting on a list of answers and returns the most common answer.
Args:
answers (list): A list of answers.
Returns:
The most common answer in the list.
"""
counter = Counter(answers)
if counter:
answer = counter.most_common(1)[0][0]
else:
answer = "I don't know"
return answer
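# Illustrative sketch (not part of this module): majority_voting returns the
# most common answer, falling back to "I don't know" for an empty list.
#
# >>> majority_voting(["Paris", "Paris", "Lyon"])
# 'Paris'
# >>> majority_voting([])
# "I don't know"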
class MajorityVoting:
"""
Class representing a majority voting system for agents.
Args:
agents (list): A list of agents to be used in the majority voting system.
output_parser (function, optional): A function used to parse the output of the agents.
If not provided, the default majority voting function is used.
autosave (bool, optional): A boolean indicating whether to autosave the conversation to a file.
verbose (bool, optional): A boolean indicating whether to enable verbose logging.
Examples:
>>> from swarms.structs.agent import Agent
>>> from swarms.structs.majority_voting import MajorityVoting
>>> agents = [
... Agent("GPT-3"),
... Agent("Codex"),
... Agent("Tabnine"),
... ]
>>> majority_voting = MajorityVoting(agents)
>>> majority_voting.run("What is the capital of France?")
'Paris'
"""
def __init__(
self,
id: str = swarm_id(),
name: str = "MajorityVoting",
description: str = "A majority voting system for agents",
agents: List[Agent] = [],
output_parser: Optional[Callable] = majority_voting,
agents: List[Agent] = None,
consensus_agent: Optional[Agent] = None,
autosave: bool = False,
verbose: bool = False,
@@ -152,10 +30,10 @@ class MajorityVoting:
*args,
**kwargs,
):
self.id = id
self.name = name
self.description = description
self.agents = agents
self.output_parser = output_parser
self.consensus_agent = consensus_agent
self.autosave = autosave
self.verbose = verbose
@@ -179,6 +57,10 @@ class MajorityVoting:
title="Majority Voting",
)
if self.consensus_agent is None:
# if no consensus agent is provided, use the last agent
self.consensus_agent = self.agents[-1]
def run(self, task: str, *args, **kwargs) -> List[Any]:
"""
Runs the majority voting system and returns the majority vote.
@@ -290,21 +172,3 @@ class MajorityVoting:
future.result()
for future in concurrent.futures.as_completed(futures)
]
async def run_async(
self, tasks: List[str], *args, **kwargs
) -> List[Any]:
"""
Runs the majority voting system concurrently using asyncio.
Args:
tasks (List[str]): List of tasks to be performed by the agents.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Returns:
List[Any]: List of majority votes for each task.
"""
return await asyncio.gather(
*[self.run(task, *args, **kwargs) for task in tasks]
)

@@ -1,4 +1,3 @@
import threading
import time
import re
from typing import Any, Callable, Dict, List, Optional
@@ -17,13 +16,6 @@ from rich.spinner import Spinner
from rich.markdown import Markdown
# Global lock to ensure only a single Rich Live context is active at any moment.
# Rich's Live render is **not** thread-safe; concurrent Live contexts on the same
# console raise runtime errors. Using a module-level lock serialises access and
# prevents crashes when multiple agents stream simultaneously in different
# threads (e.g., in ConcurrentWorkflow).
live_render_lock = threading.Lock()
# Global Live display for the dashboard
dashboard_live = None
@@ -490,62 +482,56 @@ class Formatter:
complete_response = ""
chunks_collected = []
# Acquire the lock so that only one Live panel is active at a time.
# Other threads will wait here until the current streaming completes,
# avoiding Rich.Live concurrency errors.
with live_render_lock:
# TRUE streaming with Rich's automatic text wrapping
with Live(
create_streaming_panel(streaming_text),
console=self.console,
refresh_per_second=20,
) as live:
try:
for part in streaming_response:
if (
hasattr(part, "choices")
and part.choices
and part.choices[0].delta.content
):
# Add ONLY the new chunk to the Text object with random color style
chunk = part.choices[0].delta.content
streaming_text.append(
chunk, style=text_style
)
complete_response += chunk
# Collect chunks if requested
if collect_chunks:
chunks_collected.append(chunk)
# Call chunk callback if provided
if on_chunk_callback:
on_chunk_callback(chunk)
# Update display with new text - Rich handles all wrapping automatically
live.update(
create_streaming_panel(
streaming_text, is_complete=False
)
# TRUE streaming with Rich's automatic text wrapping
with Live(
create_streaming_panel(streaming_text),
console=self.console,
refresh_per_second=20,
) as live:
try:
for part in streaming_response:
if (
hasattr(part, "choices")
and part.choices
and part.choices[0].delta.content
):
# Add ONLY the new chunk to the Text object with random color style
chunk = part.choices[0].delta.content
streaming_text.append(chunk, style=text_style)
complete_response += chunk
# Collect chunks if requested
if collect_chunks:
chunks_collected.append(chunk)
# Call chunk callback if provided
if on_chunk_callback:
on_chunk_callback(chunk)
# Update display with new text - Rich handles all wrapping automatically
live.update(
create_streaming_panel(
streaming_text, is_complete=False
)
# Final update to show completion
live.update(
create_streaming_panel(
streaming_text, is_complete=True
)
)
except Exception as e:
# Handle any streaming errors gracefully
streaming_text.append(
f"\n[Error: {str(e)}]", style="bold red"
# Final update to show completion
live.update(
create_streaming_panel(
streaming_text, is_complete=True
)
live.update(
create_streaming_panel(
streaming_text, is_complete=True
)
)
except Exception as e:
# Handle any streaming errors gracefully
streaming_text.append(
f"\n[Error: {str(e)}]", style="bold red"
)
live.update(
create_streaming_panel(
streaming_text, is_complete=True
)
)
return complete_response
@@ -611,21 +597,20 @@ class Formatter:
title (str): The title of the dashboard.
is_final (bool): Whether this is the final update of the dashboard.
"""
with live_render_lock:
if self._dashboard_live is None:
# Create new Live display if none exists
self._dashboard_live = Live(
self._create_dashboard_table(agents_data, title),
console=self.console,
refresh_per_second=10, # Increased refresh rate
transient=False, # Make display persistent
)
self._dashboard_live.start()
else:
# Update existing Live display
self._dashboard_live.update(
self._create_dashboard_table(agents_data, title)
)
if self._dashboard_live is None:
# Create new Live display if none exists
self._dashboard_live = Live(
self._create_dashboard_table(agents_data, title),
console=self.console,
refresh_per_second=10, # Increased refresh rate
transient=False, # Make display persistent
)
self._dashboard_live.start()
else:
# Update existing Live display
self._dashboard_live.update(
self._create_dashboard_table(agents_data, title)
)
# If this is the final update, add a newline to separate from future output
if is_final:

@@ -7,7 +7,7 @@ from typing import List, Optional
import litellm
import requests
from litellm import acompletion, completion, supports_vision
from litellm import completion, supports_vision
from loguru import logger
from pydantic import BaseModel
@@ -231,6 +231,10 @@ class LiteLLM:
base_url: str = None,
api_key: str = None,
api_version: str = None,
reasoning_effort: str = None,
drop_params: bool = True,
thinking_tokens: int = None,
reasoning_enabled: bool = True,
*args,
**kwargs,
):
@@ -284,6 +288,9 @@ class LiteLLM:
self.base_url = base_url
self.api_key = api_key
self.api_version = api_version
self.reasoning_effort = reasoning_effort
self.thinking_tokens = thinking_tokens
self.reasoning_enabled = reasoning_enabled
self.modalities = []
self.messages = [] # Initialize messages list
@@ -296,7 +303,7 @@ class LiteLLM:
retries # Add retries for better reliability
)
litellm.drop_params = True
litellm.drop_params = drop_params
# Add system prompt if present
if self.system_prompt is not None:
@@ -308,6 +315,63 @@ class LiteLLM:
self.init_args = args
self.init_kwargs = kwargs
self.reasoning_check()
def reasoning_check(self):
"""
Check if reasoning is enabled and supported by the model, and adjust temperature accordingly.
If reasoning is enabled and the model supports reasoning, set temperature to 1 for optimal reasoning.
Also logs information or warnings based on the model's reasoning support and configuration.
"""
if (
self.reasoning_enabled is True
and litellm.supports_reasoning(model=self.model_name)
is True
):
logger.info(
f"Model {self.model_name} supports reasoning and reasoning enabled is set to {self.reasoning_enabled}. Temperature will be set to 1 for better reasoning as some models may not work with low temperature."
)
self.temperature = 1
        else:
            logger.warning(
                f"Reasoning was not applied for model {self.model_name}: either reasoning_enabled is {self.reasoning_enabled} or litellm does not report reasoning support for this model. Temperature is left unchanged."
            )
if (
self.reasoning_enabled is True
and litellm.supports_reasoning(model=self.model_name)
is False
):
logger.warning(
f"Model {self.model_name} may or may not support reasoning and reasoning enabled is set to {self.reasoning_enabled}"
)
if (
self.reasoning_enabled is True
and self.check_if_model_name_uses_anthropic(
model_name=self.model_name
)
is True
):
if self.thinking_tokens is None:
logger.info(
f"Model {self.model_name} is an Anthropic model and reasoning enabled is set to {self.reasoning_enabled}. Thinking tokens is mandatory for Anthropic models."
)
self.thinking_tokens = self.max_tokens / 4
if (
self.reasoning_enabled is True
and self.check_if_model_name_uses_anthropic(
model_name=self.model_name
)
is True
):
logger.info(
"top_p must be greater than 0.95 for Anthropic models with reasoning enabled"
)
self.top_p = 0.95
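    # Illustrative sketch (not part of this class): how the reasoning flags are
    # meant to interact with reasoning_check(). The model string and any
    # constructor arguments not shown in this diff (e.g. max_tokens) are
    # assumptions for the example.
    #
    # llm = LiteLLM(
    #     model_name="anthropic/claude-3-7-sonnet-20250219",
    #     max_tokens=4000,
    #     reasoning_enabled=True,
    #     reasoning_effort="low",
    #     # thinking_tokens left unset: for Anthropic models reasoning_check()
    #     # defaults it to max_tokens // 4 and pins top_p to 0.95.
    # )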
def _process_additional_args(
self, completion_params: dict, runtime_args: tuple
):
@@ -357,6 +421,61 @@ class LiteLLM:
out = out.model_dump()
return out
def output_for_reasoning(self, response: any):
"""
Handle output for reasoning models, formatting reasoning content and thinking blocks.
Args:
response: The response object from the LLM API call
Returns:
str: Formatted string containing reasoning content, thinking blocks, and main content
"""
output_parts = []
# Check if reasoning content is available
if (
hasattr(response.choices[0].message, "reasoning_content")
and response.choices[0].message.reasoning_content
):
output_parts.append(
f"Reasoning Content:\n{response.choices[0].message.reasoning_content}\n"
)
# Check if thinking blocks are available (Anthropic models)
if (
hasattr(response.choices[0].message, "thinking_blocks")
and response.choices[0].message.thinking_blocks
):
output_parts.append("Thinking Blocks:")
for i, block in enumerate(
response.choices[0].message.thinking_blocks, 1
):
block_type = block.get("type", "")
thinking = block.get("thinking", "")
output_parts.append(
f"Block {i} (Type: {block_type}):"
)
output_parts.append(f" Thinking: {thinking}")
output_parts.append("")
# Include tools if available
if (
hasattr(response.choices[0].message, "tool_calls")
and response.choices[0].message.tool_calls
):
output_parts.append(
f"Tools:\n{self.output_for_tools(response)}\n"
)
# Always include the main content
content = response.choices[0].message.content
if content:
output_parts.append(f"Content:\n{content}")
# Join all parts into a single string
return "\n".join(output_parts)
def _prepare_messages(
self,
task: Optional[str] = None,
@@ -648,6 +767,26 @@ class LiteLLM:
f"Model {self.model_name} does not support vision"
)
@staticmethod
def check_if_model_name_uses_anthropic(model_name: str):
"""
Check if the model name uses Anthropic.
"""
if "anthropic" in model_name.lower():
return True
else:
return False
@staticmethod
def check_if_model_name_uses_openai(model_name: str):
"""
Check if the model name uses OpenAI.
"""
if "openai" in model_name.lower():
return True
else:
return False
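    # Illustrative sketch (not part of this class): both helpers are plain
    # case-insensitive substring checks on the model name.
    #
    # >>> LiteLLM.check_if_model_name_uses_anthropic("anthropic/claude-3-haiku")
    # True
    # >>> LiteLLM.check_if_model_name_uses_openai("anthropic/claude-3-haiku")
    # False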
def run(
self,
task: str,
@@ -737,6 +876,29 @@ class LiteLLM:
if self.modalities and len(self.modalities) >= 2:
completion_params["modalities"] = self.modalities
if (
self.reasoning_effort is not None
and litellm.supports_reasoning(model=self.model_name)
is True
):
completion_params["reasoning_effort"] = (
self.reasoning_effort
)
else:
logger.warning(
f"Model {self.model_name} may or may not support reasoning"
)
if (
self.reasoning_enabled is True
and self.thinking_tokens is not None
):
thinking = {
"type": "enabled",
"budget_tokens": self.thinking_tokens,
}
completion_params["thinking"] = thinking
# Process additional args if any
self._process_additional_args(completion_params, args)
@@ -747,6 +909,13 @@ class LiteLLM:
if self.stream:
return response # Return the streaming generator directly
# Handle reasoning model output
elif (
self.reasoning_enabled
and self.reasoning_effort is not None
):
return self.output_for_reasoning(response)
# Handle tool-based response
elif self.tools_list_dictionary is not None:
return self.output_for_tools(response)
@@ -783,118 +952,6 @@ class LiteLLM:
"""
return self.run(task, *args, **kwargs)
async def arun(self, task: str, *args, **kwargs):
"""
Run the LLM model asynchronously for the given task.
Args:
task (str): The task to run the model for.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
str: The content of the response from the model.
"""
try:
# Extract image parameter from kwargs if present
img = kwargs.pop("img", None) if "img" in kwargs else None
messages = self._prepare_messages(task=task, img=img)
# Prepare common completion parameters
completion_params = {
"model": self.model_name,
"messages": messages,
"stream": self.stream,
"temperature": self.temperature,
"max_tokens": self.max_tokens,
}
# Merge initialization kwargs first (lower priority)
if self.init_kwargs:
completion_params.update(self.init_kwargs)
# Merge runtime kwargs (higher priority - overrides init kwargs)
if kwargs:
completion_params.update(kwargs)
# Handle tool-based completion
if self.tools_list_dictionary is not None:
completion_params.update(
{
"tools": self.tools_list_dictionary,
"tool_choice": self.tool_choice,
"parallel_tool_calls": self.parallel_tool_calls,
}
)
# Process additional args if any
self._process_additional_args(completion_params, args)
# Make the completion call
response = await acompletion(**completion_params)
# Handle tool-based response
if self.tools_list_dictionary is not None:
return (
response.choices[0]
.message.tool_calls[0]
.function.arguments
)
elif self.return_all is True:
return response.model_dump()
elif "gemini" in self.model_name.lower():
return gemini_output_img_handler(response)
else:
# For non-Gemini models, return the content directly
return response.choices[0].message.content
except Exception as error:
logger.error(f"Error in LiteLLM arun: {str(error)}")
# if "rate_limit" in str(error).lower():
# logger.warning(
# "Rate limit hit, retrying with exponential backoff..."
# )
# await asyncio.sleep(2) # Use async sleep
# return await self.arun(task, *args, **kwargs)
raise error
async def _process_batch(
self, tasks: List[str], batch_size: int = 10
):
"""
Process a batch of tasks asynchronously.
Args:
tasks (List[str]): List of tasks to process.
batch_size (int): Size of each batch.
Returns:
List[str]: List of responses.
"""
results = []
for i in range(0, len(tasks), batch_size):
batch = tasks[i : i + batch_size]
batch_results = await asyncio.gather(
*[self.arun(task) for task in batch],
return_exceptions=True,
)
# Handle any exceptions in the batch
for result in batch_results:
if isinstance(result, Exception):
logger.error(
f"Error in batch processing: {str(result)}"
)
results.append(str(result))
else:
results.append(result)
# Add a small delay between batches to avoid rate limits
if i + batch_size < len(tasks):
await asyncio.sleep(0.5)
return results
def batched_run(self, tasks: List[str], batch_size: int = 10):
"""
Run multiple tasks in batches synchronously.
@@ -910,21 +967,3 @@ class LiteLLM:
f"Running {len(tasks)} tasks in batches of {batch_size}"
)
return asyncio.run(self._process_batch(tasks, batch_size))
async def batched_arun(
self, tasks: List[str], batch_size: int = 10
):
"""
Run multiple tasks in batches asynchronously.
Args:
tasks (List[str]): List of tasks to process.
batch_size (int): Size of each batch.
Returns:
List[str]: List of responses.
"""
logger.info(
f"Running {len(tasks)} tasks asynchronously in batches of {batch_size}"
)
return await self._process_batch(tasks, batch_size)

@@ -131,9 +131,7 @@ def test_router_description(report):
"""Test ReasoningAgentRouter with custom description (only change description param)"""
start_time = time.time()
try:
result = test_agents_swarm(
description="Test description for router"
)
test_agents_swarm(description="Test description for router")
# Check if the description was set correctly
router = ReasoningAgentRouter(
description="Test description for router"
@@ -164,7 +162,7 @@ def test_router_model_name(report):
"""Test ReasoningAgentRouter with custom model_name (only change model_name param)"""
start_time = time.time()
try:
result = test_agents_swarm(model_name="gpt-4")
test_agents_swarm(model_name="gpt-4")
router = ReasoningAgentRouter(model_name="gpt-4")
if router.model_name == "gpt-4":
report.add_result(
@@ -192,9 +190,7 @@ def test_router_system_prompt(report):
"""Test ReasoningAgentRouter with custom system_prompt (only change system_prompt param)"""
start_time = time.time()
try:
result = test_agents_swarm(
system_prompt="You are a test router."
)
test_agents_swarm(system_prompt="You are a test router.")
router = ReasoningAgentRouter(
system_prompt="You are a test router."
)
@@ -224,7 +220,7 @@ def test_router_max_loops(report):
"""Test ReasoningAgentRouter with custom max_loops (only change max_loops param)"""
start_time = time.time()
try:
result = test_agents_swarm(max_loops=5)
test_agents_swarm(max_loops=5)
router = ReasoningAgentRouter(max_loops=5)
if router.max_loops == 5:
report.add_result(
@@ -252,7 +248,7 @@ def test_router_swarm_type(report):
"""Test ReasoningAgentRouter with custom swarm_type (only change swarm_type param)"""
start_time = time.time()
try:
result = test_agents_swarm(swarm_type="reasoning-agent")
test_agents_swarm(swarm_type="reasoning-agent")
router = ReasoningAgentRouter(swarm_type="reasoning-agent")
if router.swarm_type == "reasoning-agent":
report.add_result(
@@ -281,7 +277,7 @@ def test_router_num_samples(report):
start_time = time.time()
try:
router = ReasoningAgentRouter(num_samples=3)
output = router.run("How many samples do you use?")
router.run("How many samples do you use?")
if router.num_samples == 3:
report.add_result(
"Parameter: num_samples",
@@ -389,7 +385,7 @@ def test_router_eval(report):
"""Test ReasoningAgentRouter with eval enabled (only change eval param)"""
start_time = time.time()
try:
result = test_agents_swarm(eval=True)
test_agents_swarm(eval=True)
router = ReasoningAgentRouter(eval=True)
if router.eval is True:
report.add_result(
@@ -417,7 +413,7 @@ def test_router_random_models_on(report):
"""Test ReasoningAgentRouter with random_models_on enabled (only change random_models_on param)"""
start_time = time.time()
try:
result = test_agents_swarm(random_models_on=True)
test_agents_swarm(random_models_on=True)
router = ReasoningAgentRouter(random_models_on=True)
if router.random_models_on is True:
report.add_result(
@@ -445,7 +441,7 @@ def test_router_majority_voting_prompt(report):
"""Test ReasoningAgentRouter with custom majority_voting_prompt (only change majority_voting_prompt param)"""
start_time = time.time()
try:
result = test_agents_swarm(
test_agents_swarm(
majority_voting_prompt="Vote for the best answer."
)
router = ReasoningAgentRouter(
@@ -526,7 +522,7 @@ def test_router_select_swarm(report):
majority_voting_prompt=DEFAULT_MAJORITY_VOTING_PROMPT,
)
# Run the method to test
result = router.select_swarm()
router.select_swarm()
# Determine if the result is as expected (not raising error is enough for this test)
report.add_result(
"Method: select_swarm()",
