feat: increase SwarmRouter speed through caching and improve error logging

pull/970/merge
Kye Gomez 5 days ago
parent c6df819644
commit 610e02fe8c

@@ -0,0 +1,81 @@
from swarms import Agent, SwarmRouter
# Agent 1: Risk Metrics Calculator
risk_metrics_agent = Agent(
agent_name="Risk-Metrics-Calculator",
agent_description="Calculates key risk metrics like VaR, Sharpe ratio, and volatility",
system_prompt="""You are a risk metrics specialist. Calculate and explain:
- Value at Risk (VaR)
- Sharpe ratio
- Volatility
- Maximum drawdown
- Beta coefficient
Provide clear, numerical results with brief explanations.""",
max_loops=1,
model_name="gpt-4.1",
random_model_enabled=True,
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
max_tokens=4096,
)
# Agent 2: Portfolio Risk Analyzer
portfolio_risk_agent = Agent(
agent_name="Portfolio-Risk-Analyzer",
agent_description="Analyzes portfolio diversification and concentration risk",
system_prompt="""You are a portfolio risk analyst. Focus on:
- Portfolio diversification analysis
- Concentration risk assessment
- Correlation analysis
- Sector/asset allocation risk
- Liquidity risk evaluation
Provide actionable insights for risk reduction.""",
max_loops=1,
model_name="gpt-4.1",
random_model_enabled=True,
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
max_tokens=4096,
)
# Agent 3: Market Risk Monitor
market_risk_agent = Agent(
agent_name="Market-Risk-Monitor",
agent_description="Monitors market conditions and identifies risk factors",
system_prompt="""You are a market risk monitor. Identify and assess:
- Market volatility trends
- Economic risk factors
- Geopolitical risks
- Interest rate risks
- Currency risks
Provide current risk alerts and trends.""",
max_loops=1,
model_name="gpt-4.1",
random_model_enabled=True,
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
max_tokens=4096,
)
swarm = SwarmRouter(
name="SwarmRouter",
description="A router that can route messages to the appropriate swarm",
agents=[
risk_metrics_agent,
portfolio_risk_agent,
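        # Note: market_risk_agent is defined above but not included in this router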
],
max_loops=1,
swarm_type="SequentialWorkflow",
output_type="final",
)
out = swarm.run(
"What are the best ways to short the EU markets. Give me specific tickrs to short and strategies to use. Create a comprehensive report with all the information you can find."
)
print(out)

@@ -0,0 +1,243 @@
#!/usr/bin/env python3
"""
SwarmRouter Performance Benchmark
This script benchmarks the performance improvements in SwarmRouter's _create_swarm method.
It compares the old O(n) elif chain vs the new O(1) factory pattern with caching.
"""
import time
import statistics
from typing import List, Dict, Any
from swarms.structs.swarm_router import SwarmRouter
from swarms.structs.agent import Agent
def create_mock_agents(num_agents: int = 3) -> List[Agent]:
"""Create mock agents for testing purposes."""
agents = []
for i in range(num_agents):
# Create a simple mock agent
agent = Agent(
agent_name=f"TestAgent_{i}",
system_prompt=f"You are test agent {i}",
model_name="gpt-4o-mini",
max_loops=1,
)
agents.append(agent)
return agents
def benchmark_swarm_creation(
swarm_types: List[str],
num_iterations: int = 100,
agents: List[Agent] = None,
) -> Dict[str, Dict[str, Any]]:
"""
Benchmark swarm creation performance for different swarm types.
Args:
swarm_types: List of swarm types to test
num_iterations: Number of iterations to run for each swarm type
agents: List of agents to use for testing
Returns:
Dictionary containing performance metrics for each swarm type
"""
if agents is None:
agents = create_mock_agents()
results = {}
for swarm_type in swarm_types:
print(f"Benchmarking {swarm_type}...")
times = []
for i in range(num_iterations):
# Create a fresh SwarmRouter instance for each test
router = SwarmRouter(
name=f"test-router-{i}",
agents=agents,
swarm_type=swarm_type,
telemetry_enabled=False,
)
# Time the _create_swarm method
start_time = time.perf_counter()
try:
router._create_swarm(task="test task")
end_time = time.perf_counter()
times.append(end_time - start_time)
except Exception as e:
print(f"Failed to create {swarm_type}: {e}")
continue
if times:
results[swarm_type] = {
"mean_time": statistics.mean(times),
"median_time": statistics.median(times),
"min_time": min(times),
"max_time": max(times),
"std_dev": (
statistics.stdev(times) if len(times) > 1 else 0
),
"total_iterations": len(times),
}
return results
def benchmark_caching_performance(
swarm_type: str = "SequentialWorkflow",
num_iterations: int = 50,
agents: List[Agent] = None,
) -> Dict[str, Any]:
"""
Benchmark the caching performance by creating the same swarm multiple times.
Args:
swarm_type: The swarm type to test
num_iterations: Number of iterations to run
agents: List of agents to use for testing
Returns:
Dictionary containing caching performance metrics
"""
if agents is None:
agents = create_mock_agents()
print(f"Benchmarking caching performance for {swarm_type}...")
router = SwarmRouter(
name="cache-test-router",
agents=agents,
swarm_type=swarm_type,
telemetry_enabled=False,
)
first_call_times = []
cached_call_times = []
for i in range(num_iterations):
# Clear cache for first call timing
router._swarm_cache.clear()
# Time first call (cache miss)
start_time = time.perf_counter()
router._create_swarm(task="test task", iteration=i)
end_time = time.perf_counter()
first_call_times.append(end_time - start_time)
# Time second call (cache hit)
start_time = time.perf_counter()
router._create_swarm(task="test task", iteration=i)
end_time = time.perf_counter()
cached_call_times.append(end_time - start_time)
return {
"first_call_mean": statistics.mean(first_call_times),
"cached_call_mean": statistics.mean(cached_call_times),
"speedup_factor": statistics.mean(first_call_times)
/ statistics.mean(cached_call_times),
"cache_hit_ratio": 1.0, # 100% cache hit rate in this test
}
def print_results(results: Dict[str, Dict[str, Any]]):
"""Print benchmark results in a formatted way."""
print("\n" + "=" * 60)
print("SWARM CREATION PERFORMANCE BENCHMARK RESULTS")
print("=" * 60)
for swarm_type, metrics in results.items():
print(f"\n{swarm_type}:")
print(f" Mean time: {metrics['mean_time']:.6f} seconds")
print(f" Median time: {metrics['median_time']:.6f} seconds")
print(f" Min time: {metrics['min_time']:.6f} seconds")
print(f" Max time: {metrics['max_time']:.6f} seconds")
print(f" Std dev: {metrics['std_dev']:.6f} seconds")
print(f" Iterations: {metrics['total_iterations']}")
def print_caching_results(results: Dict[str, Any]):
"""Print caching benchmark results."""
print("\n" + "=" * 60)
print("CACHING PERFORMANCE BENCHMARK RESULTS")
print("=" * 60)
print(
f"First call mean time: {results['first_call_mean']:.6f} seconds"
)
print(
f"Cached call mean time: {results['cached_call_mean']:.6f} seconds"
)
print(f"Speedup factor: {results['speedup_factor']:.2f}x")
print(f"Cache hit ratio: {results['cache_hit_ratio']:.1%}")
def main():
"""Run the complete benchmark suite."""
print("SwarmRouter Performance Benchmark")
print(
"Testing O(1) factory pattern with caching vs O(n) elif chain"
)
print("-" * 60)
# Create test agents
agents = create_mock_agents(3)
# Test different swarm types
swarm_types = [
"SequentialWorkflow",
"ConcurrentWorkflow",
"AgentRearrange",
"MixtureOfAgents",
"GroupChat",
"MultiAgentRouter",
"HeavySwarm",
"MALT",
]
# Run creation benchmark
creation_results = benchmark_swarm_creation(
swarm_types=swarm_types[:4], # Test first 4 for speed
num_iterations=20,
agents=agents,
)
print_results(creation_results)
# Run caching benchmark
caching_results = benchmark_caching_performance(
swarm_type="SequentialWorkflow",
num_iterations=10,
agents=agents,
)
print_caching_results(caching_results)
# Calculate overall performance improvement
if creation_results:
avg_creation_time = statistics.mean(
[
metrics["mean_time"]
for metrics in creation_results.values()
]
)
print("\n" + "=" * 60)
print("PERFORMANCE SUMMARY")
print("=" * 60)
print(
f"Average swarm creation time: {avg_creation_time:.6f} seconds"
)
print(
"Factory pattern provides O(1) lookup vs O(n) elif chain"
)
print(
f"Caching provides {caching_results['speedup_factor']:.2f}x speedup for repeated calls"
)
print("=" * 60)
if __name__ == "__main__":
main()

@@ -1,206 +0,0 @@
# AI generate initial response
# AI decides how many "thinking rounds" it needs
# For each round:
# Generates 3 alternative responses
# Evaluates all responses
# Picks the best one
# Final response is the survivor of this AI battle royale
from swarms import Agent
# OpenAI function schema for determining thinking rounds
thinking_rounds_schema = {
"name": "determine_thinking_rounds",
"description": "Determines the optimal number of thinking rounds needed for a task",
"parameters": {
"type": "object",
"properties": {
"num_rounds": {
"type": "integer",
"description": "The number of thinking rounds needed (1-5)",
"minimum": 1,
"maximum": 5,
}
},
"required": ["num_rounds"],
},
}
# System prompt for determining thinking rounds
THINKING_ROUNDS_PROMPT = """You are an expert at determining the optimal number of thinking rounds needed for complex tasks. Your role is to analyze the task and determine how many rounds of thinking and evaluation would be most beneficial.
Consider the following factors when determining the number of rounds:
1. Task Complexity: More complex tasks may require more rounds
2. Potential for Multiple Valid Approaches: Tasks with multiple valid solutions need more rounds
3. Risk of Error: Higher-stakes tasks may benefit from more rounds
4. Time Sensitivity: Balance thoroughness with efficiency
Guidelines for number of rounds:
- 1 round: Simple, straightforward tasks with clear solutions
- 2-3 rounds: Moderately complex tasks with some ambiguity
- 4-5 rounds: Highly complex tasks with multiple valid approaches or high-stakes decisions
Your response should be a single number between 1 and 5, representing the optimal number of thinking rounds needed."""
# Schema for generating alternative responses
alternative_responses_schema = {
"name": "generate_alternatives",
"description": "Generates multiple alternative responses to a task",
"parameters": {
"type": "object",
"properties": {
"alternatives": {
"type": "array",
"description": "List of alternative responses",
"items": {
"type": "object",
"properties": {
"response": {
"type": "string",
"description": "The alternative response",
},
"reasoning": {
"type": "string",
"description": "Explanation of why this approach was chosen",
},
},
"required": ["response", "reasoning"],
},
"minItems": 3,
"maxItems": 3,
}
},
"required": ["alternatives"],
},
}
# Schema for evaluating responses
evaluation_schema = {
"name": "evaluate_responses",
"description": "Evaluates and ranks alternative responses",
"parameters": {
"type": "object",
"properties": {
"evaluation": {
"type": "object",
"properties": {
"best_response": {
"type": "string",
"description": "The selected best response",
},
"ranking": {
"type": "array",
"description": "Ranked list of responses from best to worst",
"items": {
"type": "object",
"properties": {
"response": {
"type": "string",
"description": "The response",
},
"score": {
"type": "number",
"description": "Score from 0-100",
},
"reasoning": {
"type": "string",
"description": "Explanation of the score",
},
},
"required": [
"response",
"score",
"reasoning",
],
},
},
},
"required": ["best_response", "ranking"],
}
},
"required": ["evaluation"],
},
}
# System prompt for generating alternatives
ALTERNATIVES_PROMPT = """You are an expert at generating diverse and creative alternative responses to tasks. Your role is to generate 3 distinct approaches to solving the given task.
For each alternative:
1. Consider a different perspective or approach
2. Provide clear reasoning for why this approach might be effective
3. Ensure alternatives are meaningfully different from each other
4. Maintain high quality and relevance to the task
Your response should include 3 alternatives, each with its own reasoning."""
# System prompt for evaluation
EVALUATION_PROMPT = """You are an expert at evaluating and comparing different responses to tasks. Your role is to critically analyze each response and determine which is the most effective.
Consider the following criteria when evaluating:
1. Relevance to the task
2. Completeness of the solution
3. Creativity and innovation
4. Practicality and feasibility
5. Clarity and coherence
Your response should include:
1. The best response selected
2. A ranked list of all responses with scores and reasoning"""
class CortAgent:
def __init__(
self,
alternative_responses: int = 3,
):
self.thinking_rounds = Agent(
agent_name="CortAgent",
agent_description="CortAgent is a multi-step agent that uses a battle royale approach to determine the best response to a task.",
model_name="gpt-4o-mini",
max_loops=1,
dynamic_temperature_enabled=True,
tools_list_dictionary=thinking_rounds_schema,
system_prompt=THINKING_ROUNDS_PROMPT,
)
self.alternatives_agent = Agent(
agent_name="CortAgentAlternatives",
agent_description="Generates multiple alternative responses to a task",
model_name="gpt-4o-mini",
max_loops=1,
dynamic_temperature_enabled=True,
tools_list_dictionary=alternative_responses_schema,
system_prompt=ALTERNATIVES_PROMPT,
)
self.evaluation_agent = Agent(
agent_name="CortAgentEvaluation",
agent_description="Evaluates and ranks alternative responses",
model_name="gpt-4o-mini",
max_loops=1,
dynamic_temperature_enabled=True,
tools_list_dictionary=evaluation_schema,
system_prompt=EVALUATION_PROMPT,
)
def run(self, task: str):
# First determine number of thinking rounds
num_rounds = self.thinking_rounds.run(task)
# Initialize with the task
current_task = task
best_response = None
# Run the battle royale for the determined number of rounds
for round_num in range(num_rounds):
# Generate alternatives
alternatives = self.alternatives_agent.run(current_task)
# Evaluate alternatives
evaluation = self.evaluation_agent.run(alternatives)
# Update best response and current task for next round
best_response = evaluation["evaluation"]["best_response"]
current_task = f"Previous best response: {best_response}\nOriginal task: {task}"
return best_response

@@ -16,39 +16,122 @@ logger = initialize_logger(log_folder="concurrent_workflow")

class ConcurrentWorkflow(BaseSwarm):
    """
    A production-grade concurrent workflow orchestrator that executes multiple agents simultaneously.

    ConcurrentWorkflow is designed for high-performance multi-agent orchestration with advanced features
    including real-time monitoring, error handling, caching, and flexible output formatting. It's ideal
    for scenarios where multiple agents need to process the same task independently and their results
    need to be aggregated.

    Key Features:
        - Concurrent execution using ThreadPoolExecutor for optimal performance
        - Real-time dashboard monitoring with status updates
        - Comprehensive error handling and recovery
        - Flexible output formatting options
        - Automatic prompt engineering capabilities
        - Conversation history management
        - Metadata persistence and auto-saving
        - Support for both single and batch processing
        - Image input support for multimodal agents

    Use Cases:
        - Multi-perspective analysis (financial, legal, technical reviews)
        - Consensus building and voting systems
        - Parallel data processing and analysis
        - A/B testing with different agent configurations
        - Redundancy and reliability improvements

    Args:
        name (str, optional): Unique identifier for the workflow instance.
            Defaults to "ConcurrentWorkflow".
        description (str, optional): Human-readable description of the workflow's purpose.
            Defaults to "Execution of multiple agents concurrently".
        agents (List[Union[Agent, Callable]], optional): List of Agent instances or callable objects
            to execute concurrently. Each agent should implement a `run` method.
            Defaults to empty list.
        metadata_output_path (str, optional): File path for saving execution metadata and results.
            Supports JSON format. Defaults to "agent_metadata.json".
        auto_save (bool, optional): Whether to automatically save conversation history and metadata
            after each run. Defaults to True.
        output_type (str, optional): Format for aggregating agent outputs. Options include:
            - "dict-all-except-first": Dictionary with all agent outputs except the first
            - "dict": Dictionary with all agent outputs
            - "str": Concatenated string of all outputs
            - "list": List of individual agent outputs
            Defaults to "dict-all-except-first".
        max_loops (int, optional): Maximum number of execution loops for each agent.
            Defaults to 1.
        auto_generate_prompts (bool, optional): Enable automatic prompt engineering for all agents.
            When True, agents will enhance their prompts automatically. Defaults to False.
        show_dashboard (bool, optional): Enable real-time dashboard display showing agent status,
            progress, and outputs. Useful for monitoring and debugging. Defaults to False.
        *args: Additional positional arguments passed to the BaseSwarm parent class.
        **kwargs: Additional keyword arguments passed to the BaseSwarm parent class.

    Raises:
        ValueError: If agents list is empty or None.
        ValueError: If description is empty or None.
        TypeError: If agents list contains non-Agent, non-callable objects.

    Attributes:
        name (str): The workflow instance name.
        description (str): The workflow description.
        agents (List[Union[Agent, Callable]]): List of agents to execute.
        metadata_output_path (str): Path for metadata output file.
        auto_save (bool): Auto-save flag for metadata persistence.
        output_type (str): Output aggregation format.
        max_loops (int): Maximum execution loops per agent.
        auto_generate_prompts (bool): Auto prompt engineering flag.
        show_dashboard (bool): Dashboard display flag.
        agent_statuses (dict): Real-time status tracking for each agent.
        conversation (Conversation): Conversation history manager.

    Example:
        Basic usage with multiple agents:

        >>> from swarms import Agent, ConcurrentWorkflow
        >>>
        >>> # Create specialized agents
        >>> financial_agent = Agent(
        ...     agent_name="Financial-Analyst",
        ...     system_prompt="You are a financial analysis expert...",
        ...     model_name="gpt-4"
        ... )
        >>> legal_agent = Agent(
        ...     agent_name="Legal-Advisor",
        ...     system_prompt="You are a legal expert...",
        ...     model_name="gpt-4"
        ... )
        >>>
        >>> # Create workflow
        >>> workflow = ConcurrentWorkflow(
        ...     name="Multi-Expert-Analysis",
        ...     agents=[financial_agent, legal_agent],
        ...     show_dashboard=True,
        ...     auto_save=True
        ... )
        >>>
        >>> # Execute analysis
        >>> task = "Analyze the risks of investing in cryptocurrency"
        >>> results = workflow.run(task)
        >>> print(f"Analysis complete with {len(results)} perspectives")

        Batch processing example:

        >>> tasks = [
        ...     "Analyze Q1 financial performance",
        ...     "Review Q2 market trends",
        ...     "Forecast Q3 projections"
        ... ]
        >>> batch_results = workflow.batch_run(tasks)
        >>> print(f"Processed {len(batch_results)} quarterly reports")

    Note:
        - Agents are executed using ThreadPoolExecutor with 95% of available CPU cores
        - Each agent runs independently and cannot communicate with others during execution
        - The workflow maintains conversation history across all runs for context
        - Dashboard mode disables individual agent printing to prevent output conflicts
        - Error handling ensures partial results are available even if some agents fail
    """

    def __init__(
@@ -61,10 +144,21 @@ class ConcurrentWorkflow(BaseSwarm):
        output_type: str = "dict-all-except-first",
        max_loops: int = 1,
        auto_generate_prompts: bool = False,
        show_dashboard: bool = False,
        *args,
        **kwargs,
    ):
        """
        Initialize the ConcurrentWorkflow with configuration parameters.

        Performs initialization, validation, and setup of internal components including
        conversation management, agent status tracking, and dashboard configuration.

        Note:
            The constructor automatically performs reliability checks and configures
            agents for dashboard mode if enabled. Agent print outputs are disabled
            when dashboard mode is active to prevent display conflicts.
        """
        super().__init__(
            name=name,
            description=description,
@@ -93,12 +187,58 @@ class ConcurrentWorkflow(BaseSwarm):
        self.agents = self.fix_agents()

    def fix_agents(self):
        """
        Configure agents for dashboard mode by disabling individual print outputs.

        When dashboard mode is enabled, individual agent print outputs can interfere
        with the dashboard display. This method disables print_on for all agents
        to ensure clean dashboard rendering.

        Returns:
            List[Agent]: The modified list of agents with print_on disabled.

        Note:
            This method only modifies agents when show_dashboard is True.
            Agent functionality is not affected, only their output display behavior.

        Example:
            >>> workflow = ConcurrentWorkflow(show_dashboard=True, agents=[agent1, agent2])
            >>> # Agents automatically configured for dashboard mode
            >>> all(not agent.print_on for agent in workflow.agents)
            True
        """
        if self.show_dashboard is True:
            for agent in self.agents:
                agent.print_on = False
        return self.agents

    def reliability_check(self):
        """
        Perform comprehensive validation of workflow configuration and agents.

        Validates that the workflow is properly configured with valid agents and
        provides warnings for suboptimal configurations. This method is called
        automatically during initialization.

        Validates:
            - Agents list is not None or empty
            - At least one agent is provided
            - Warns if only one agent is provided (suboptimal for concurrent execution)

        Raises:
            ValueError: If agents list is None or empty.

        Logs:
            Warning: If only one agent is provided (concurrent execution not beneficial).
            Error: If validation fails with detailed error information.

        Example:
            >>> workflow = ConcurrentWorkflow(agents=[])
            ValueError: ConcurrentWorkflow: No agents provided
            >>> workflow = ConcurrentWorkflow(agents=[single_agent])
            # Logs warning about single agent usage
        """
        try:
            if self.agents is None:
                raise ValueError(
@@ -122,12 +262,31 @@ class ConcurrentWorkflow(BaseSwarm):

    def activate_auto_prompt_engineering(self):
        """
        Enable automatic prompt engineering for all agents in the workflow.

        When activated, each agent will automatically enhance and optimize their
        system prompts based on the task context and their previous interactions.
        This can improve response quality but may increase execution time.

        Side Effects:
            - Sets auto_generate_prompt=True for all agents in the workflow
            - Affects subsequent agent.run() calls, not retroactively
            - May increase latency due to prompt optimization overhead

        Note:
            This method can be called at any time, but changes only affect
            future agent executions. Already running agents are not affected.

        Example:
            >>> workflow = ConcurrentWorkflow(agents=[agent1, agent2])
            >>> workflow.activate_auto_prompt_engineering()
            >>> # All agents will now auto-generate optimized prompts
            >>> result = workflow.run("Complex analysis task")
            >>> # Agents used enhanced prompts for better performance

        See Also:
            - Agent.auto_generate_prompt: Individual agent prompt engineering
            - auto_generate_prompts: Constructor parameter for default behavior
        """
        if self.auto_generate_prompts is True:
            for agent in self.agents:
@@ -139,11 +298,45 @@ class ConcurrentWorkflow(BaseSwarm):
        is_final: bool = False,
    ) -> None:
        """
        Display a real-time dashboard showing the current status of all agents.

        Renders a formatted dashboard with agent names, execution status, and
        output previews. The dashboard is updated in real-time during workflow
        execution to provide visibility into agent progress and results.

        Args:
            title (str, optional): The dashboard title to display at the top.
                Defaults to "🤖 Agent Dashboard".
            is_final (bool, optional): Whether this is the final dashboard display
                after all agents have completed. Changes formatting and styling.
                Defaults to False.

        Side Effects:
            - Prints formatted dashboard to console
            - Updates display in real-time during execution
            - May clear previous dashboard content for clean updates

        Note:
            This method is automatically called during workflow execution when
            show_dashboard=True. Manual calls are supported for custom monitoring.

        Dashboard Status Values:
            - "pending": Agent queued but not yet started
            - "running": Agent currently executing task
            - "completed": Agent finished successfully
            - "error": Agent execution failed with error

        Example:
            >>> workflow = ConcurrentWorkflow(agents=[agent1, agent2], show_dashboard=True)
            >>> workflow.display_agent_dashboard("Custom Dashboard Title")
            # Displays:
            # 🤖 Custom Dashboard Title
            # ┌─────────────────┬─────────┬──────────────────┐
            # │ Agent Name      │ Status  │ Output Preview   │
            # ├─────────────────┼─────────┼──────────────────┤
            # │ Financial-Agent │ running │ Analyzing data...│
            # │ Legal-Agent     │ pending │                  │
            # └─────────────────┴─────────┴──────────────────┘
        """
        agents_data = [
            {
@@ -166,8 +359,66 @@ class ConcurrentWorkflow(BaseSwarm):
        imgs: Optional[List[str]] = None,
    ):
        """
        Execute all agents concurrently with real-time dashboard monitoring.

        This method provides the same concurrent execution as _run() but with
        enhanced real-time monitoring through a visual dashboard. Agent status
        updates are displayed in real-time, showing progress, completion, and
        any errors that occur during execution.

        Args:
            task (str): The task description or prompt to be executed by all agents.
                This will be passed to each agent's run() method.
            img (Optional[str], optional): Path to a single image file for agents
                that support multimodal input. Defaults to None.
            imgs (Optional[List[str]], optional): List of image file paths for
                agents that support multiple image inputs. Defaults to None.

        Returns:
            Any: Formatted output based on the configured output_type setting.
                The return format depends on the output_type parameter set during
                workflow initialization.

        Raises:
            Exception: Re-raises any exceptions from agent execution after
                updating the dashboard with error status.

        Side Effects:
            - Displays initial dashboard showing all agents as "pending"
            - Updates dashboard in real-time as agents start, run, and complete
            - Displays final dashboard with all results when execution completes
            - Adds all task inputs and agent outputs to conversation history
            - Automatically cleans up dashboard display resources

        Dashboard Flow:
            1. Initial dashboard shows all agents as "pending"
            2. As agents start, status updates to "running"
            3. As agents complete, status updates to "completed" with output preview
            4. Final dashboard shows complete results summary
            5. Dashboard resources are cleaned up automatically

        Performance:
            - Uses 95% of available CPU cores for optimal concurrency
            - ThreadPoolExecutor manages thread lifecycle automatically
            - Real-time updates have minimal performance overhead

        Example:
            >>> workflow = ConcurrentWorkflow(
            ...     agents=[financial_agent, legal_agent],
            ...     show_dashboard=True
            ... )
            >>> result = workflow.run_with_dashboard(
            ...     task="Analyze the merger proposal",
            ...     img="company_financials.png"
            ... )
            # Dashboard shows real-time progress:
            # Agent-1: pending -> running -> completed
            # Agent-2: pending -> running -> completed
            >>> print("Analysis complete:", result)

        Note:
            This method is automatically called when show_dashboard=True.
            For headless execution without dashboard, use _run() directly.
        """
        try:
            self.conversation.add(role="User", content=task)
@@ -279,20 +530,64 @@ class ConcurrentWorkflow(BaseSwarm):
        imgs: Optional[List[str]] = None,
    ):
        """
        Execute all agents concurrently without dashboard monitoring (headless mode).

        This is the core execution method that runs all agents simultaneously using
        ThreadPoolExecutor for optimal performance. Results are collected and
        formatted according to the configured output_type.

        Args:
            task (str): The task description or prompt to be executed by all agents.
                Each agent receives the same task input for independent processing.
            img (Optional[str], optional): Path to a single image file for multimodal
                agents. The same image is provided to all agents. Defaults to None.
            imgs (Optional[List[str]], optional): List of image file paths for agents
                that support multiple image inputs. All agents receive the same image list.
                Defaults to None.

        Returns:
            Any: Formatted output according to the output_type configuration:
                - "dict-all-except-first": Dict with all agent outputs except first
                - "dict": Dict with all agent outputs keyed by agent name
                - "str": Concatenated string of all agent outputs
                - "list": List of individual agent outputs in completion order

        Side Effects:
            - Adds the user task to conversation history
            - Adds each agent's output to conversation history upon completion
            - No visual output or dashboard updates (headless execution)

        Performance Characteristics:
            - Uses ThreadPoolExecutor with 95% of available CPU cores
            - Agents execute truly concurrently, not sequentially
            - Results are collected in completion order, not submission order
            - Memory efficient for large numbers of agents

        Error Handling:
            - Individual agent failures don't stop other agents
            - Failed agents have their exceptions logged but execution continues
            - Partial results are still returned for successful agents

        Thread Safety:
            - Conversation object handles concurrent access safely
            - Agent status is not tracked in this method (dashboard-free)
            - Each agent runs in isolation without shared state

        Example:
            >>> workflow = ConcurrentWorkflow(
            ...     agents=[agent1, agent2, agent3],
            ...     output_type="dict"
            ... )
            >>> result = workflow._run(
            ...     task="Analyze market trends for Q4",
            ...     img="market_chart.png"
            ... )
            >>> print(f"Received {len(result)} agent analyses")
            >>> # Result format: {"agent1": "analysis1", "agent2": "analysis2", ...}

        Note:
            This method is called automatically by run() when show_dashboard=False.
            For monitoring and real-time updates, use run_with_dashboard() instead.
        """
        self.conversation.add(role="User", content=task)
@@ -331,7 +626,79 @@ class ConcurrentWorkflow(BaseSwarm):
        imgs: Optional[List[str]] = None,
    ):
        """
        Execute all agents concurrently on the given task with optional dashboard monitoring.

        This is the main entry point for workflow execution. Automatically selects
        between dashboard-enabled execution (run_with_dashboard) or headless execution
        (_run) based on the show_dashboard configuration.

        Args:
            task (str): The task description, prompt, or instruction to be executed
                by all agents concurrently. Each agent processes the same task
                independently.
            img (Optional[str], optional): Path to a single image file for agents
                that support multimodal (text + image) input. Defaults to None.
            imgs (Optional[List[str]], optional): List of image file paths for
                agents that support multiple image inputs. Defaults to None.

        Returns:
            Any: Aggregated results from all agents formatted according to output_type:
                - "dict-all-except-first": Dictionary excluding first agent's output
                - "dict": Complete dictionary with all agent outputs
                - "str": Concatenated string of all outputs
                - "list": List of individual agent outputs

        Execution Modes:
            - Dashboard Mode (show_dashboard=True): Provides real-time visual monitoring
              with status updates, progress tracking, and error visibility
            - Headless Mode (show_dashboard=False): Silent execution with no visual output,
              optimized for performance and automation scenarios

        Concurrent Execution:
            - All agents run simultaneously using ThreadPoolExecutor
            - Utilizes 95% of available CPU cores for optimal performance
            - Thread-safe conversation history management
            - Independent agent execution without inter-agent communication

        Error Resilience:
            - Individual agent failures don't halt the entire workflow
            - Partial results are returned for successful agents
            - Comprehensive error logging for debugging
            - Graceful degradation under failure conditions

        Example:
            Basic concurrent execution:

            >>> workflow = ConcurrentWorkflow(agents=[analyst, reviewer, summarizer])
            >>> result = workflow.run("Evaluate the new product proposal")
            >>> print(f"Received insights from {len(workflow.agents)} experts")

            With dashboard monitoring:

            >>> workflow = ConcurrentWorkflow(
            ...     agents=[financial_agent, legal_agent],
            ...     show_dashboard=True
            ... )
            >>> result = workflow.run("Review merger agreement")
            # Real-time dashboard shows progress and completion status

            Multimodal analysis:

            >>> result = workflow.run(
            ...     task="Analyze this chart and provide insights",
            ...     img="quarterly_results.png"
            ... )

        Performance Tips:
            - Use 2+ agents to benefit from concurrent execution
            - Dashboard mode adds minimal overhead for monitoring
            - Larger agent counts scale well with available CPU cores
            - Consider batch_run() for multiple related tasks

        See Also:
            - batch_run(): For processing multiple tasks sequentially
            - run_with_dashboard(): For direct dashboard execution
            - _run(): For direct headless execution
        """
        if self.show_dashboard:
            return self.run_with_dashboard(task, img, imgs)
@@ -341,29 +708,112 @@ class ConcurrentWorkflow(BaseSwarm):

    def batch_run(
        self,
        tasks: List[str],
        imgs: Optional[List[str]] = None,
    ):
        """
        Execute the workflow on multiple tasks sequentially with concurrent agent processing.

        This method processes a list of tasks one by one, where each task is executed
        by all agents concurrently. This is ideal for batch processing scenarios where
        you have multiple related tasks that need the same multi-agent analysis.

        Args:
            tasks (List[str]): List of task descriptions, prompts, or instructions.
                Each task will be processed by all agents concurrently before
                moving to the next task. Tasks are processed sequentially.
            imgs (Optional[List[str]], optional): List of image file paths corresponding
                to each task. If provided, imgs[i] will be used for tasks[i].
                If fewer images than tasks are provided, remaining tasks will
                execute without images. Defaults to None.

        Returns:
            List[Any]: List of results, one for each task. Each result is formatted
                according to the workflow's output_type configuration. The length
                of the returned list equals the length of the input tasks list.

        Processing Flow:
            1. Tasks are processed sequentially (not concurrently with each other)
            2. For each task, all agents execute concurrently
            3. Results are collected and formatted for each task
            4. Conversation history accumulates across all tasks
            5. Final result list contains aggregated outputs for each task

        Image Handling:
            - If imgs is None: All tasks execute without images
            - If imgs has fewer items than tasks: Extra tasks execute without images
            - If imgs has more items than tasks: Extra images are ignored
            - Each task gets at most one corresponding image

        Dashboard Behavior:
            - If show_dashboard=True, dashboard resets and updates for each task
            - Progress is shown separately for each task's agent execution
            - Final dashboard shows results from the last task only

        Memory and Performance:
            - Conversation history grows with each task (cumulative)
            - Memory usage scales with number of tasks and agent outputs
            - CPU utilization is optimal during each task's concurrent execution
            - Consider clearing conversation history for very large batch jobs

        Example:
            Financial analysis across quarters:

            >>> workflow = ConcurrentWorkflow(agents=[analyst1, analyst2, analyst3])
            >>> quarterly_tasks = [
            ...     "Analyze Q1 financial performance and market position",
            ...     "Analyze Q2 financial performance and market position",
            ...     "Analyze Q3 financial performance and market position",
            ...     "Analyze Q4 financial performance and market position"
            ... ]
            >>> results = workflow.batch_run(quarterly_tasks)
            >>> print(f"Completed {len(results)} quarterly analyses")
            >>> # Each result contains insights from all 3 analysts for that quarter

            With corresponding images:

            >>> tasks = ["Analyze chart trends", "Review performance metrics"]
            >>> charts = ["q1_chart.png", "q2_chart.png"]
            >>> results = workflow.batch_run(tasks, imgs=charts)
            >>> # Task 0 uses q1_chart.png, Task 1 uses q2_chart.png

            Batch processing with dashboard:

            >>> workflow = ConcurrentWorkflow(
            ...     agents=[agent1, agent2],
            ...     show_dashboard=True
            ... )
            >>> results = workflow.batch_run([
            ...     "Process document batch 1",
            ...     "Process document batch 2",
            ...     "Process document batch 3"
            ... ])
            # Dashboard shows progress for each batch separately

        Use Cases:
            - Multi-period financial analysis (quarterly/yearly reports)
            - Batch document processing with multiple expert reviews
            - A/B testing across different scenarios or datasets
            - Systematic evaluation of multiple proposals or options
            - Comparative analysis across time periods or categories

        Note:
            Tasks are intentionally processed sequentially to maintain result
            order and prevent resource contention. For truly concurrent task
            processing, create separate workflow instances.

        See Also:
            - run(): For single task execution
            - ConcurrentWorkflow: For configuration options
        """
        results = []
        for idx, task in enumerate(tasks):
            img = None
            if imgs is not None:
                # Use the img at the same index if available, else None
                if idx < len(imgs):
                    img = imgs[idx]
            results.append(self.run(task=task, img=img))
        return results


# if __name__ == "__main__":
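
The docstrings above describe the executor without showing its body, since the
_run internals fall outside this diff's context lines. Below is a minimal sketch
of the pattern they document (95% of CPU cores, completion-order collection,
partial results on failure); the helper name and agent attributes are
assumptions for illustration, not the library's actual internals:

import os
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_agents_concurrently(agents, task: str) -> dict:
    """Run every agent on the same task with a capped thread pool."""
    # 95% of available cores, always at least one worker
    max_workers = max(1, int((os.cpu_count() or 1) * 0.95))
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(agent.run, task): agent for agent in agents
        }
        # Results arrive in completion order, not submission order
        for future in as_completed(futures):
            agent = futures[future]
            try:
                results[agent.agent_name] = future.result()
            except Exception as error:
                # One agent failing does not stop the others;
                # partial results remain available.
                results[agent.agent_name] = f"error: {error}"
    return results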

@@ -91,6 +91,18 @@ class SwarmRouterConfig(BaseModel):
        arbitrary_types_allowed = True


class SwarmRouterRunError(Exception):
    """Exception raised when an error occurs during task execution."""

    pass


class SwarmRouterConfigError(Exception):
    """Exception raised when the SwarmRouter configuration is invalid."""

    pass


class SwarmRouter:
    """
    A class that dynamically routes tasks to different swarm types based on user selection or automatic matching.

@@ -189,6 +201,8 @@ class SwarmRouter:
        heavy_swarm_question_agent_model_name: str = "gpt-4.1",
        heavy_swarm_worker_model_name: str = "claude-3-5-sonnet-20240620",
        telemetry_enabled: bool = False,
        council_judge_model_name: str = "gpt-4o-mini",
        verbose: bool = False,
        *args,
        **kwargs,
    ):
@@ -224,17 +238,73 @@
            heavy_swarm_worker_model_name
        )
        self.telemetry_enabled = telemetry_enabled
        self.council_judge_model_name = council_judge_model_name
        self.verbose = verbose

        # Initialize the swarm factory for O(1) lookup performance
        self._swarm_factory = self._initialize_swarm_factory()
        self._swarm_cache = {}  # Cache for created swarms

        # Reliability check
        self.reliability_check()

    def reliability_check(self):
        """Perform reliability checks on swarm configuration.

        Validates essential swarm parameters and configuration before execution.
        Handles the special case of CouncilAsAJudge, which may not require agents.
        """
        try:
            if self.verbose:
                logger.info(
                    f"[SwarmRouter Reliability Check] Initializing SwarmRouter '{self.name}'. "
                    "Validating required parameters for robust operation.\n"
                    "For detailed documentation on SwarmRouter configuration, usage, and available swarm types, "
                    "please visit: https://docs.swarms.world/en/latest/swarms/structs/swarm_router/"
                )

            # Check swarm type first since it affects other validations
            if self.swarm_type is None:
                raise SwarmRouterConfigError(
                    "SwarmRouter: Swarm type cannot be 'none'. Check the docs for all the swarm types available: https://docs.swarms.world/en/latest/swarms/structs/swarm_router/"
                )

            if self.agents is None:
                raise SwarmRouterConfigError(
                    "SwarmRouter: No agents provided for the swarm. Check the docs to learn the required parameters: https://docs.swarms.world/en/latest/swarms/structs/agent/"
                )

            if (
                self.swarm_type == "AgentRearrange"
                and self.rearrange_flow is None
            ):
                raise SwarmRouterConfigError(
                    "SwarmRouter: rearrange_flow cannot be 'none' when using AgentRearrange. Check the SwarmRouter docs to learn the required parameters: https://docs.swarms.world/en/latest/swarms/structs/agent_rearrange/"
                )

            # Validate max_loops
            if self.max_loops == 0:
                raise SwarmRouterConfigError(
                    "SwarmRouter: max_loops cannot be 0. Check the docs for valid max_loops values: https://docs.swarms.world/en/latest/swarms/structs/swarm_router/"
                )

            self.setup()

            # Load agents from CSV
            if self.load_agents_from_csv:
                self.agents = AgentLoader(
                    csv_path=self.csv_file_path
                ).load_agents()

            if self.telemetry_enabled:
                self.agent_config = self.agent_config()

        except SwarmRouterConfigError as e:
            logger.error(
                f"SwarmRouterConfigError: {str(e)} Full Traceback: {traceback.format_exc()}"
            )
            raise e

    def setup(self):
        if self.auto_generate_prompts is True:
@@ -254,6 +324,17 @@ class SwarmRouter:
        if self.list_all_agents is True:
            self.list_agents_to_eachother()

    def fetch_message_history_as_string(self):
        try:
            return (
                self.swarm.conversation.return_all_except_first_string()
            )
        except Exception as e:
            logger.error(
                f"Error fetching message history as string: {str(e)}"
            )
            return None

    def activate_shared_memory(self):
        logger.info("Activating shared memory with all agents ")
@@ -296,41 +377,203 @@ class SwarmRouter:
            )
            raise RuntimeError(error_msg) from e

    def _initialize_swarm_factory(self) -> Dict[str, Callable]:
        """
        Initialize the swarm factory for O(1) lookup performance.

        Returns:
            Dict[str, Callable]: Dictionary mapping swarm types to their factory functions.
        """
        # "HiearchicalSwarm" preserves the existing spelling of this swarm_type key.
        return {
            "HeavySwarm": self._create_heavy_swarm,
            "AgentRearrange": self._create_agent_rearrange,
            "MALT": self._create_malt,
            "CouncilAsAJudge": self._create_council_as_judge,
            "InteractiveGroupChat": self._create_interactive_group_chat,
            "DeepResearchSwarm": self._create_deep_research_swarm,
            "HiearchicalSwarm": self._create_hierarchical_swarm,
            "MixtureOfAgents": self._create_mixture_of_agents,
            "MajorityVoting": self._create_majority_voting,
            "GroupChat": self._create_group_chat,
            "MultiAgentRouter": self._create_multi_agent_router,
            "SpreadSheetSwarm": self._create_spreadsheet_swarm,
            "SequentialWorkflow": self._create_sequential_workflow,
            "ConcurrentWorkflow": self._create_concurrent_workflow,
        }

    def _create_heavy_swarm(self, *args, **kwargs):
        """Factory function for HeavySwarm."""
        return HeavySwarm(
            name=self.name,
            description=self.description,
            agents=self.agents,
            max_loops=self.max_loops,
            output_type=self.output_type,
            loops_per_agent=self.heavy_swarm_loops_per_agent,
            question_agent_model_name=self.heavy_swarm_question_agent_model_name,
            worker_model_name=self.heavy_swarm_worker_model_name,
        )

    def _create_agent_rearrange(self, *args, **kwargs):
        """Factory function for AgentRearrange."""
        return AgentRearrange(
            name=self.name,
            description=self.description,
            agents=self.agents,
            max_loops=self.max_loops,
            flow=self.rearrange_flow,
            return_json=self.return_json,
            output_type=self.output_type,
            return_entire_history=self.return_entire_history,
            *args,
            **kwargs,
        )

    def _create_malt(self, *args, **kwargs):
        """Factory function for MALT."""
        return MALT(
            name=self.name,
            description=self.description,
            max_loops=self.max_loops,
            return_dict=True,
            preset_agents=True,
        )

    def _create_council_as_judge(self, *args, **kwargs):
        """Factory function for CouncilAsAJudge."""
        return CouncilAsAJudge(
            name=self.name,
            description=self.description,
            model_name=self.council_judge_model_name,
            output_type=self.output_type,
            base_agent=self.agents[0] if self.agents else None,
        )

    def _create_interactive_group_chat(self, *args, **kwargs):
        """Factory function for InteractiveGroupChat."""
        return InteractiveGroupChat(
            name=self.name,
            description=self.description,
            agents=self.agents,
            max_loops=self.max_loops,
            output_type=self.output_type,
            speaker_function=self.speaker_function,
        )

    def _create_deep_research_swarm(self, *args, **kwargs):
        """Factory function for DeepResearchSwarm."""
        return DeepResearchSwarm(
            name=self.name,
            description=self.description,
            agents=self.agents,
            max_loops=self.max_loops,
            output_type=self.output_type,
        )

    def _create_hierarchical_swarm(self, *args, **kwargs):
        """Factory function for HierarchicalSwarm."""
        return HierarchicalSwarm(
            name=self.name,
            description=self.description,
            agents=self.agents,
            max_loops=self.max_loops,
            return_all_history=self.return_entire_history,
            output_type=self.output_type,
            *args,
            **kwargs,
        )

    def _create_mixture_of_agents(self, *args, **kwargs):
        """Factory function for MixtureOfAgents."""
        return MixtureOfAgents(
            name=self.name,
            description=self.description,
            agents=self.agents,
            aggregator_agent=self.agents[-1],
            layers=self.max_loops,
            output_type=self.output_type,
            *args,
            **kwargs,
        )

    def _create_majority_voting(self, *args, **kwargs):
        """Factory function for MajorityVoting."""
        return MajorityVoting(
            name=self.name,
            description=self.description,
            agents=self.agents,
            consensus_agent=self.agents[-1],
            *args,
            **kwargs,
        )

    def _create_group_chat(self, *args, **kwargs):
        """Factory function for GroupChat."""
        return GroupChat(
            name=self.name,
            description=self.description,
            agents=self.agents,
            max_loops=self.max_loops,
            speaker_fn=self.speaker_fn,
            *args,
            **kwargs,
        )

    def _create_multi_agent_router(self, *args, **kwargs):
        """Factory function for MultiAgentRouter."""
        return MultiAgentRouter(
            name=self.name,
            description=self.description,
            agents=self.agents,
            shared_memory_system=self.shared_memory_system,
            output_type=self.output_type,
        )

    def _create_spreadsheet_swarm(self, *args, **kwargs):
        """Factory function for SpreadSheetSwarm."""
        return SpreadSheetSwarm(
            name=self.name,
            description=self.description,
            agents=self.agents,
            max_loops=self.max_loops,
            autosave_on=self.autosave,
            *args,
            **kwargs,
        )

    def _create_sequential_workflow(self, *args, **kwargs):
        """Factory function for SequentialWorkflow."""
        return SequentialWorkflow(
            name=self.name,
            description=self.description,
            agents=self.agents,
            max_loops=self.max_loops,
            shared_memory_system=self.shared_memory_system,
            output_type=self.output_type,
            return_json=self.return_json,
            return_entire_history=self.return_entire_history,
            *args,
            **kwargs,
        )

    def _create_concurrent_workflow(self, *args, **kwargs):
        """Factory function for ConcurrentWorkflow."""
        return ConcurrentWorkflow(
            name=self.name,
            description=self.description,
            agents=self.agents,
            max_loops=self.max_loops,
            auto_save=self.autosave,
            return_str_on=self.return_entire_history,
            output_type=self.output_type,
            *args,
            **kwargs,
        )

    def _create_swarm(self, task: str = None, *args, **kwargs):
        """
        Dynamically create and return the specified swarm type with O(1) lookup performance.

        Uses the factory pattern with caching for optimal performance.

        Args:
            task (str, optional): The task to be executed by the swarm. Defaults to None.
@@ -344,165 +587,56 @@

        Raises:
            ValueError: If an invalid swarm type is provided.
        """
        # Handle auto swarm type selection
        if self.swarm_type == "auto":
            try:
                matched_swarm_type = str(swarm_matcher(task))
                self.swarm_type = matched_swarm_type
                logger.info(
                    f"Auto-selected swarm type: {matched_swarm_type}"
                )
            except Exception as e:
                logger.warning(
                    f"Auto-selection failed: {e}, falling back to SequentialWorkflow"
                )
                self.swarm_type = "SequentialWorkflow"

        # Check cache first for better performance.
        # Note: the key hashes the string form of args/kwargs, so it assumes
        # the arguments have stable, meaningful reprs.
        cache_key = (
            f"{self.swarm_type}_{hash(str(args) + str(kwargs))}"
        )
        if cache_key in self._swarm_cache:
            logger.debug(f"Using cached swarm: {self.swarm_type}")
            return self._swarm_cache[cache_key]

        # Use the factory pattern for O(1) lookup
        factory_func = self._swarm_factory.get(self.swarm_type)
        if factory_func is None:
            valid_types = list(self._swarm_factory.keys())
            raise ValueError(
                f"Invalid swarm type: {self.swarm_type}. "
                f"Valid types are: {', '.join(valid_types)}"
            )

        # Create the swarm using the factory function
        try:
            swarm = factory_func(*args, **kwargs)

            # Cache the created swarm for future use
            self._swarm_cache[cache_key] = swarm

            logger.info(
                f"Successfully created swarm: {self.swarm_type}"
            )
            return swarm
        except Exception as e:
            logger.error(
                f"Failed to create swarm {self.swarm_type}: {str(e)}"
            )
            raise RuntimeError(
                f"Failed to create swarm {self.swarm_type}: {str(e)}"
            ) from e

    def update_system_prompt_for_agent_in_swarm(self):
        # Use list comprehension for faster iteration
@@ -591,10 +725,19 @@
            )
            return result
        except SwarmRouterRunError as e:
            logger.error(
                f"\n[SwarmRouter ERROR] '{self.name}' failed to execute the task on the selected swarm.\n"
                f"Reason: {str(e)}\n"
                f"Traceback:\n{traceback.format_exc()}\n\n"
                "Troubleshooting steps:\n"
                " - Double-check your SwarmRouter configuration (swarm_type, agents, parameters).\n"
                " - Ensure all individual agents are properly configured and initialized.\n"
                " - Review the error message and traceback above for clues.\n\n"
                "For detailed documentation on SwarmRouter configuration, usage, and available swarm types, please visit:\n"
                " https://docs.swarms.world/en/latest/swarms/structs/swarm_router/\n"
            )
            raise e

    def run(
        self,
@@ -631,10 +774,19 @@
                *args,
                **kwargs,
            )
        except SwarmRouterRunError as e:
            logger.error(
                f"\n[SwarmRouter ERROR] '{self.name}' failed to execute the task on the selected swarm.\n"
                f"Reason: {str(e)}\n"
                f"Traceback:\n{traceback.format_exc()}\n\n"
                "Troubleshooting steps:\n"
                " - Double-check your SwarmRouter configuration (swarm_type, agents, parameters).\n"
                " - Ensure all individual agents are properly configured and initialized.\n"
                " - Review the error message and traceback above for clues.\n\n"
                "For detailed documentation on SwarmRouter configuration, usage, and available swarm types, please visit:\n"
                " https://docs.swarms.world/en/latest/swarms/structs/swarm_router/\n"
            )
            raise e

    def __call__(
        self,
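
A minimal caller-side sketch of the new fail-fast validation and error types,
assuming SwarmRouterConfigError is importable from swarms.structs.swarm_router,
where it is defined above:

from swarms import Agent
from swarms.structs.swarm_router import SwarmRouter, SwarmRouterConfigError

agent = Agent(
    agent_name="Example-Agent",
    system_prompt="You are a helpful analyst.",
    model_name="gpt-4o-mini",
    max_loops=1,
)

try:
    # max_loops=0 is now rejected during reliability_check with a logged,
    # documented SwarmRouterConfigError instead of a late, generic failure.
    router = SwarmRouter(
        agents=[agent],
        swarm_type="SequentialWorkflow",
        max_loops=0,
    )
except SwarmRouterConfigError as e:
    print(f"Configuration rejected: {e}")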
