diff --git a/examples/demos/ethchain_agent.py b/examples/demos/crypto/ethchain_agent.py similarity index 100% rename from examples/demos/ethchain_agent.py rename to examples/demos/crypto/ethchain_agent.py diff --git a/examples/demos/sentiment_news_analysis.py b/examples/demos/finance/sentiment_news_analysis.py similarity index 100% rename from examples/demos/sentiment_news_analysis.py rename to examples/demos/finance/sentiment_news_analysis.py diff --git a/examples/demos/swarms_of_vllm.py b/examples/demos/finance/swarms_of_vllm.py similarity index 100% rename from examples/demos/swarms_of_vllm.py rename to examples/demos/finance/swarms_of_vllm.py diff --git a/examples/demos/insurance_swarm.py b/examples/demos/insurance/insurance_swarm.py similarity index 100% rename from examples/demos/insurance_swarm.py rename to examples/demos/insurance/insurance_swarm.py diff --git a/examples/demos/legal_swarm.py b/examples/demos/legal/legal_swarm.py similarity index 100% rename from examples/demos/legal_swarm.py rename to examples/demos/legal/legal_swarm.py diff --git a/examples/demos/medical_analysis/health_privacy_swarm 2.py b/examples/demos/medical/health_privacy_swarm 2.py similarity index 100% rename from examples/demos/medical_analysis/health_privacy_swarm 2.py rename to examples/demos/medical/health_privacy_swarm 2.py diff --git a/examples/demos/medical_analysis/health_privacy_swarm.py b/examples/demos/medical/health_privacy_swarm.py similarity index 100% rename from examples/demos/medical_analysis/health_privacy_swarm.py rename to examples/demos/medical/health_privacy_swarm.py diff --git a/examples/demos/medical_analysis/health_privacy_swarm_two 2.py b/examples/demos/medical/health_privacy_swarm_two 2.py similarity index 100% rename from examples/demos/medical_analysis/health_privacy_swarm_two 2.py rename to examples/demos/medical/health_privacy_swarm_two 2.py diff --git a/examples/demos/medical_analysis/health_privacy_swarm_two.py b/examples/demos/medical/health_privacy_swarm_two.py similarity index 100% rename from examples/demos/medical_analysis/health_privacy_swarm_two.py rename to examples/demos/medical/health_privacy_swarm_two.py diff --git a/examples/demos/medical_analysis/medical_analysis_agent_rearrange.md b/examples/demos/medical/medical_analysis_agent_rearrange.md similarity index 100% rename from examples/demos/medical_analysis/medical_analysis_agent_rearrange.md rename to examples/demos/medical/medical_analysis_agent_rearrange.md diff --git a/examples/demos/medical_analysis/medical_coder_agent.py b/examples/demos/medical/medical_coder_agent.py similarity index 100% rename from examples/demos/medical_analysis/medical_coder_agent.py rename to examples/demos/medical/medical_coder_agent.py diff --git a/examples/demos/medical_analysis/medical_coding_report.md b/examples/demos/medical/medical_coding_report.md similarity index 100% rename from examples/demos/medical_analysis/medical_coding_report.md rename to examples/demos/medical/medical_coding_report.md diff --git a/examples/demos/medical_analysis/medical_diagnosis_report.md b/examples/demos/medical/medical_diagnosis_report.md similarity index 100% rename from examples/demos/medical_analysis/medical_diagnosis_report.md rename to examples/demos/medical/medical_diagnosis_report.md diff --git a/examples/demos/medical_analysis/new_medical_rearrange.py b/examples/demos/medical/new_medical_rearrange.py similarity index 100% rename from examples/demos/medical_analysis/new_medical_rearrange.py rename to examples/demos/medical/new_medical_rearrange.py diff --git a/examples/demos/ollama_demo.py b/examples/demos/medical/ollama_demo.py similarity index 100% rename from examples/demos/ollama_demo.py rename to examples/demos/medical/ollama_demo.py diff --git a/examples/demos/medical_analysis/rearrange_video_examples/reports/medical_analysis_agent_rearrange.md b/examples/demos/medical/rearrange_video_examples/reports/medical_analysis_agent_rearrange.md similarity index 100% rename from examples/demos/medical_analysis/rearrange_video_examples/reports/medical_analysis_agent_rearrange.md rename to examples/demos/medical/rearrange_video_examples/reports/medical_analysis_agent_rearrange.md diff --git a/examples/demos/medical_analysis/rearrange_video_examples/reports/vc_document_analysis.md b/examples/demos/medical/rearrange_video_examples/reports/vc_document_analysis.md similarity index 100% rename from examples/demos/medical_analysis/rearrange_video_examples/reports/vc_document_analysis.md rename to examples/demos/medical/rearrange_video_examples/reports/vc_document_analysis.md diff --git a/examples/demos/medical_analysis/rearrange_video_examples/term_sheet_swarm.py b/examples/demos/medical/rearrange_video_examples/term_sheet_swarm.py similarity index 100% rename from examples/demos/medical_analysis/rearrange_video_examples/term_sheet_swarm.py rename to examples/demos/medical/rearrange_video_examples/term_sheet_swarm.py diff --git a/examples/applications/real_estate/README_realtor.md b/examples/demos/real_estate/README_realtor.md similarity index 100% rename from examples/applications/real_estate/README_realtor.md rename to examples/demos/real_estate/README_realtor.md diff --git a/examples/demos/morgtate_swarm.py b/examples/demos/real_estate/morgtate_swarm.py similarity index 100% rename from examples/demos/morgtate_swarm.py rename to examples/demos/real_estate/morgtate_swarm.py diff --git a/examples/demos/real_estate_agent.py b/examples/demos/real_estate/real_estate_agent.py similarity index 100% rename from examples/demos/real_estate_agent.py rename to examples/demos/real_estate/real_estate_agent.py diff --git a/examples/applications/real_estate/realtor_agent.py b/examples/demos/real_estate/realtor_agent.py similarity index 100% rename from examples/applications/real_estate/realtor_agent.py rename to examples/demos/real_estate/realtor_agent.py diff --git a/examples/demos/scient_agents/deep_research_swarm_example.py b/examples/demos/science/deep_research_swarm_example.py similarity index 100% rename from examples/demos/scient_agents/deep_research_swarm_example.py rename to examples/demos/science/deep_research_swarm_example.py diff --git a/examples/demos/materials_science_agents.py b/examples/demos/science/materials_science_agents.py similarity index 100% rename from examples/demos/materials_science_agents.py rename to examples/demos/science/materials_science_agents.py diff --git a/examples/demos/open_scientist.py b/examples/demos/science/open_scientist.py similarity index 100% rename from examples/demos/open_scientist.py rename to examples/demos/science/open_scientist.py diff --git a/examples/demos/scient_agents/paper_idea_agent.py b/examples/demos/science/paper_idea_agent.py similarity index 100% rename from examples/demos/scient_agents/paper_idea_agent.py rename to examples/demos/science/paper_idea_agent.py diff --git a/examples/demos/scient_agents/paper_idea_profile.py b/examples/demos/science/paper_idea_profile.py similarity index 100% rename from examples/demos/scient_agents/paper_idea_profile.py rename to examples/demos/science/paper_idea_profile.py diff --git a/profession_sim/convert_json_to_csv.py b/examples/demos/synthetic_data/profession_sim/convert_json_to_csv.py similarity index 100% rename from profession_sim/convert_json_to_csv.py rename to examples/demos/synthetic_data/profession_sim/convert_json_to_csv.py diff --git a/profession_sim/data.csv b/examples/demos/synthetic_data/profession_sim/data.csv similarity index 100% rename from profession_sim/data.csv rename to examples/demos/synthetic_data/profession_sim/data.csv diff --git a/profession_sim/format_prompt.py b/examples/demos/synthetic_data/profession_sim/format_prompt.py similarity index 100% rename from profession_sim/format_prompt.py rename to examples/demos/synthetic_data/profession_sim/format_prompt.py diff --git a/profession_sim/profession_persona_generator.py b/examples/demos/synthetic_data/profession_sim/profession_persona_generator.py similarity index 100% rename from profession_sim/profession_persona_generator.py rename to examples/demos/synthetic_data/profession_sim/profession_persona_generator.py diff --git a/profession_sim/profession_personas.csv b/examples/demos/synthetic_data/profession_sim/profession_personas.csv similarity index 100% rename from profession_sim/profession_personas.csv rename to examples/demos/synthetic_data/profession_sim/profession_personas.csv diff --git a/profession_sim/profession_personas.progress.backup.json b/examples/demos/synthetic_data/profession_sim/profession_personas.progress.backup.json similarity index 100% rename from profession_sim/profession_personas.progress.backup.json rename to examples/demos/synthetic_data/profession_sim/profession_personas.progress.backup.json diff --git a/profession_sim/profession_personas.progress.json b/examples/demos/synthetic_data/profession_sim/profession_personas.progress.json similarity index 100% rename from profession_sim/profession_personas.progress.json rename to examples/demos/synthetic_data/profession_sim/profession_personas.progress.json diff --git a/profession_sim/profession_personas_new_10.csv b/examples/demos/synthetic_data/profession_sim/profession_personas_new_10.csv similarity index 100% rename from profession_sim/profession_personas_new_10.csv rename to examples/demos/synthetic_data/profession_sim/profession_personas_new_10.csv diff --git a/profession_sim/profession_personas_new_10.progress.backup.json b/examples/demos/synthetic_data/profession_sim/profession_personas_new_10.progress.backup.json similarity index 100% rename from profession_sim/profession_personas_new_10.progress.backup.json rename to examples/demos/synthetic_data/profession_sim/profession_personas_new_10.progress.backup.json diff --git a/profession_sim/profession_personas_new_10.progress.backup_new.json b/examples/demos/synthetic_data/profession_sim/profession_personas_new_10.progress.backup_new.json similarity index 100% rename from profession_sim/profession_personas_new_10.progress.backup_new.json rename to examples/demos/synthetic_data/profession_sim/profession_personas_new_10.progress.backup_new.json diff --git a/profession_sim/profession_personas_new_10.progress.json b/examples/demos/synthetic_data/profession_sim/profession_personas_new_10.progress.json similarity index 100% rename from profession_sim/profession_personas_new_10.progress.json rename to examples/demos/synthetic_data/profession_sim/profession_personas_new_10.progress.json diff --git a/profession_sim/profession_personas_new_10.progress_neee.json b/examples/demos/synthetic_data/profession_sim/profession_personas_new_10.progress_neee.json similarity index 100% rename from profession_sim/profession_personas_new_10.progress_neee.json rename to examples/demos/synthetic_data/profession_sim/profession_personas_new_10.progress_neee.json diff --git a/profession_sim/prompt.txt b/examples/demos/synthetic_data/profession_sim/prompt.txt similarity index 100% rename from profession_sim/prompt.txt rename to examples/demos/synthetic_data/profession_sim/prompt.txt diff --git a/profession_sim/prompt_formatted.md b/examples/demos/synthetic_data/profession_sim/prompt_formatted.md similarity index 100% rename from profession_sim/prompt_formatted.md rename to examples/demos/synthetic_data/profession_sim/prompt_formatted.md diff --git a/examples/multi_agent/swarm_router/sr_moa_example.py b/examples/multi_agent/swarm_router/sr_moa_example.py new file mode 100644 index 00000000..55fa408c --- /dev/null +++ b/examples/multi_agent/swarm_router/sr_moa_example.py @@ -0,0 +1,81 @@ +from swarms import Agent, SwarmRouter + +# Agent 1: Risk Metrics Calculator +risk_metrics_agent = Agent( + agent_name="Risk-Metrics-Calculator", + agent_description="Calculates key risk metrics like VaR, Sharpe ratio, and volatility", + system_prompt="""You are a risk metrics specialist. Calculate and explain: + - Value at Risk (VaR) + - Sharpe ratio + - Volatility + - Maximum drawdown + - Beta coefficient + + Provide clear, numerical results with brief explanations.""", + max_loops=1, + model_name="gpt-4.1", + random_model_enabled=True, + dynamic_temperature_enabled=True, + output_type="str-all-except-first", + max_tokens=4096, +) + +# Agent 2: Portfolio Risk Analyzer +portfolio_risk_agent = Agent( + agent_name="Portfolio-Risk-Analyzer", + agent_description="Analyzes portfolio diversification and concentration risk", + system_prompt="""You are a portfolio risk analyst. Focus on: + - Portfolio diversification analysis + - Concentration risk assessment + - Correlation analysis + - Sector/asset allocation risk + - Liquidity risk evaluation + + Provide actionable insights for risk reduction.""", + max_loops=1, + model_name="gpt-4.1", + random_model_enabled=True, + dynamic_temperature_enabled=True, + output_type="str-all-except-first", + max_tokens=4096, +) + +# Agent 3: Market Risk Monitor +market_risk_agent = Agent( + agent_name="Market-Risk-Monitor", + agent_description="Monitors market conditions and identifies risk factors", + system_prompt="""You are a market risk monitor. Identify and assess: + - Market volatility trends + - Economic risk factors + - Geopolitical risks + - Interest rate risks + - Currency risks + + Provide current risk alerts and trends.""", + max_loops=1, + model_name="gpt-4.1", + random_model_enabled=True, + dynamic_temperature_enabled=True, + output_type="str-all-except-first", + max_tokens=4096, +) + + +swarm = SwarmRouter( + name="SwarmRouter", + description="A router that can route messages to the appropriate swarm", + agents=[ + risk_metrics_agent, + portfolio_risk_agent, + ], + max_loops=1, + swarm_type="SequentialWorkflow", + output_type="final", +) + + +out = swarm.run( + "What are the best ways to short the EU markets. Give me specific tickrs to short and strategies to use. Create a comprehensive report with all the information you can find." +) + +print(out) diff --git a/examples/multi_agent/swarm_router/swarm_router_benchmark.py b/examples/multi_agent/swarm_router/swarm_router_benchmark.py new file mode 100644 index 00000000..3c9555d5 --- /dev/null +++ b/examples/multi_agent/swarm_router/swarm_router_benchmark.py @@ -0,0 +1,243 @@ +#!/usr/bin/env python3 +""" +SwarmRouter Performance Benchmark + +This script benchmarks the performance improvements in SwarmRouter's _create_swarm method. +It compares the old O(n) elif chain vs the new O(1) factory pattern with caching. +""" + +import time +import statistics +from typing import List, Dict, Any +from swarms.structs.swarm_router import SwarmRouter +from swarms.structs.agent import Agent + + +def create_mock_agents(num_agents: int = 3) -> List[Agent]: + """Create mock agents for testing purposes.""" + agents = [] + for i in range(num_agents): + # Create a simple mock agent + agent = Agent( + agent_name=f"TestAgent_{i}", + system_prompt=f"You are test agent {i}", + model_name="gpt-4o-mini", + max_loops=1, + ) + agents.append(agent) + return agents + + +def benchmark_swarm_creation( + swarm_types: List[str], + num_iterations: int = 100, + agents: List[Agent] = None, +) -> Dict[str, Dict[str, Any]]: + """ + Benchmark swarm creation performance for different swarm types. + + Args: + swarm_types: List of swarm types to test + num_iterations: Number of iterations to run for each swarm type + agents: List of agents to use for testing + + Returns: + Dictionary containing performance metrics for each swarm type + """ + if agents is None: + agents = create_mock_agents() + + results = {} + + for swarm_type in swarm_types: + print(f"Benchmarking {swarm_type}...") + times = [] + + for i in range(num_iterations): + # Create a fresh SwarmRouter instance for each test + router = SwarmRouter( + name=f"test-router-{i}", + agents=agents, + swarm_type=swarm_type, + telemetry_enabled=False, + ) + + # Time the _create_swarm method + start_time = time.perf_counter() + try: + router._create_swarm(task="test task") + end_time = time.perf_counter() + times.append(end_time - start_time) + except Exception as e: + print(f"Failed to create {swarm_type}: {e}") + continue + + if times: + results[swarm_type] = { + "mean_time": statistics.mean(times), + "median_time": statistics.median(times), + "min_time": min(times), + "max_time": max(times), + "std_dev": ( + statistics.stdev(times) if len(times) > 1 else 0 + ), + "total_iterations": len(times), + } + + return results + + +def benchmark_caching_performance( + swarm_type: str = "SequentialWorkflow", + num_iterations: int = 50, + agents: List[Agent] = None, +) -> Dict[str, Any]: + """ + Benchmark the caching performance by creating the same swarm multiple times. + + Args: + swarm_type: The swarm type to test + num_iterations: Number of iterations to run + agents: List of agents to use for testing + + Returns: + Dictionary containing caching performance metrics + """ + if agents is None: + agents = create_mock_agents() + + print(f"Benchmarking caching performance for {swarm_type}...") + + router = SwarmRouter( + name="cache-test-router", + agents=agents, + swarm_type=swarm_type, + telemetry_enabled=False, + ) + + first_call_times = [] + cached_call_times = [] + + for i in range(num_iterations): + # Clear cache for first call timing + router._swarm_cache.clear() + + # Time first call (cache miss) + start_time = time.perf_counter() + router._create_swarm(task="test task", iteration=i) + end_time = time.perf_counter() + first_call_times.append(end_time - start_time) + + # Time second call (cache hit) + start_time = time.perf_counter() + router._create_swarm(task="test task", iteration=i) + end_time = time.perf_counter() + cached_call_times.append(end_time - start_time) + + return { + "first_call_mean": statistics.mean(first_call_times), + "cached_call_mean": statistics.mean(cached_call_times), + "speedup_factor": statistics.mean(first_call_times) + / statistics.mean(cached_call_times), + "cache_hit_ratio": 1.0, # 100% cache hit rate in this test + } + + +def print_results(results: Dict[str, Dict[str, Any]]): + """Print benchmark results in a formatted way.""" + print("\n" + "=" * 60) + print("SWARM CREATION PERFORMANCE BENCHMARK RESULTS") + print("=" * 60) + + for swarm_type, metrics in results.items(): + print(f"\n{swarm_type}:") + print(f" Mean time: {metrics['mean_time']:.6f} seconds") + print(f" Median time: {metrics['median_time']:.6f} seconds") + print(f" Min time: {metrics['min_time']:.6f} seconds") + print(f" Max time: {metrics['max_time']:.6f} seconds") + print(f" Std dev: {metrics['std_dev']:.6f} seconds") + print(f" Iterations: {metrics['total_iterations']}") + + +def print_caching_results(results: Dict[str, Any]): + """Print caching benchmark results.""" + print("\n" + "=" * 60) + print("CACHING PERFORMANCE BENCHMARK RESULTS") + print("=" * 60) + + print( + f"First call mean time: {results['first_call_mean']:.6f} seconds" + ) + print( + f"Cached call mean time: {results['cached_call_mean']:.6f} seconds" + ) + print(f"Speedup factor: {results['speedup_factor']:.2f}x") + print(f"Cache hit ratio: {results['cache_hit_ratio']:.1%}") + + +def main(): + """Run the complete benchmark suite.""" + print("SwarmRouter Performance Benchmark") + print( + "Testing O(1) factory pattern with caching vs O(n) elif chain" + ) + print("-" * 60) + + # Create test agents + agents = create_mock_agents(3) + + # Test different swarm types + swarm_types = [ + "SequentialWorkflow", + "ConcurrentWorkflow", + "AgentRearrange", + "MixtureOfAgents", + "GroupChat", + "MultiAgentRouter", + "HeavySwarm", + "MALT", + ] + + # Run creation benchmark + creation_results = benchmark_swarm_creation( + swarm_types=swarm_types[:4], # Test first 4 for speed + num_iterations=20, + agents=agents, + ) + + print_results(creation_results) + + # Run caching benchmark + caching_results = benchmark_caching_performance( + swarm_type="SequentialWorkflow", + num_iterations=10, + agents=agents, + ) + + print_caching_results(caching_results) + + # Calculate overall performance improvement + if creation_results: + avg_creation_time = statistics.mean( + [ + metrics["mean_time"] + for metrics in creation_results.values() + ] + ) + print("\n" + "=" * 60) + print("PERFORMANCE SUMMARY") + print("=" * 60) + print( + f"Average swarm creation time: {avg_creation_time:.6f} seconds" + ) + print( + "Factory pattern provides O(1) lookup vs O(n) elif chain" + ) + print( + f"Caching provides {caching_results['speedup_factor']:.2f}x speedup for repeated calls" + ) + print("=" * 60) + + +if __name__ == "__main__": + main() diff --git a/swarms/agents/cort_agent.py b/swarms/agents/cort_agent.py deleted file mode 100644 index 3d720e7c..00000000 --- a/swarms/agents/cort_agent.py +++ /dev/null @@ -1,206 +0,0 @@ -# AI generate initial response -# AI decides how many "thinking rounds" it needs -# For each round: -# Generates 3 alternative responses -# Evaluates all responses -# Picks the best one -# Final response is the survivor of this AI battle royale -from swarms import Agent - - -# OpenAI function schema for determining thinking rounds -thinking_rounds_schema = { - "name": "determine_thinking_rounds", - "description": "Determines the optimal number of thinking rounds needed for a task", - "parameters": { - "type": "object", - "properties": { - "num_rounds": { - "type": "integer", - "description": "The number of thinking rounds needed (1-5)", - "minimum": 1, - "maximum": 5, - } - }, - "required": ["num_rounds"], - }, -} - -# System prompt for determining thinking rounds -THINKING_ROUNDS_PROMPT = """You are an expert at determining the optimal number of thinking rounds needed for complex tasks. Your role is to analyze the task and determine how many rounds of thinking and evaluation would be most beneficial. - -Consider the following factors when determining the number of rounds: -1. Task Complexity: More complex tasks may require more rounds -2. Potential for Multiple Valid Approaches: Tasks with multiple valid solutions need more rounds -3. Risk of Error: Higher-stakes tasks may benefit from more rounds -4. Time Sensitivity: Balance thoroughness with efficiency - -Guidelines for number of rounds: -- 1 round: Simple, straightforward tasks with clear solutions -- 2-3 rounds: Moderately complex tasks with some ambiguity -- 4-5 rounds: Highly complex tasks with multiple valid approaches or high-stakes decisions - -Your response should be a single number between 1 and 5, representing the optimal number of thinking rounds needed.""" - -# Schema for generating alternative responses -alternative_responses_schema = { - "name": "generate_alternatives", - "description": "Generates multiple alternative responses to a task", - "parameters": { - "type": "object", - "properties": { - "alternatives": { - "type": "array", - "description": "List of alternative responses", - "items": { - "type": "object", - "properties": { - "response": { - "type": "string", - "description": "The alternative response", - }, - "reasoning": { - "type": "string", - "description": "Explanation of why this approach was chosen", - }, - }, - "required": ["response", "reasoning"], - }, - "minItems": 3, - "maxItems": 3, - } - }, - "required": ["alternatives"], - }, -} - -# Schema for evaluating responses -evaluation_schema = { - "name": "evaluate_responses", - "description": "Evaluates and ranks alternative responses", - "parameters": { - "type": "object", - "properties": { - "evaluation": { - "type": "object", - "properties": { - "best_response": { - "type": "string", - "description": "The selected best response", - }, - "ranking": { - "type": "array", - "description": "Ranked list of responses from best to worst", - "items": { - "type": "object", - "properties": { - "response": { - "type": "string", - "description": "The response", - }, - "score": { - "type": "number", - "description": "Score from 0-100", - }, - "reasoning": { - "type": "string", - "description": "Explanation of the score", - }, - }, - "required": [ - "response", - "score", - "reasoning", - ], - }, - }, - }, - "required": ["best_response", "ranking"], - } - }, - "required": ["evaluation"], - }, -} - -# System prompt for generating alternatives -ALTERNATIVES_PROMPT = """You are an expert at generating diverse and creative alternative responses to tasks. Your role is to generate 3 distinct approaches to solving the given task. - -For each alternative: -1. Consider a different perspective or approach -2. Provide clear reasoning for why this approach might be effective -3. Ensure alternatives are meaningfully different from each other -4. Maintain high quality and relevance to the task - -Your response should include 3 alternatives, each with its own reasoning.""" - -# System prompt for evaluation -EVALUATION_PROMPT = """You are an expert at evaluating and comparing different responses to tasks. Your role is to critically analyze each response and determine which is the most effective. - -Consider the following criteria when evaluating: -1. Relevance to the task -2. Completeness of the solution -3. Creativity and innovation -4. Practicality and feasibility -5. Clarity and coherence - -Your response should include: -1. The best response selected -2. A ranked list of all responses with scores and reasoning""" - - -class CortAgent: - def __init__( - self, - alternative_responses: int = 3, - ): - self.thinking_rounds = Agent( - agent_name="CortAgent", - agent_description="CortAgent is a multi-step agent that uses a battle royale approach to determine the best response to a task.", - model_name="gpt-4o-mini", - max_loops=1, - dynamic_temperature_enabled=True, - tools_list_dictionary=thinking_rounds_schema, - system_prompt=THINKING_ROUNDS_PROMPT, - ) - - self.alternatives_agent = Agent( - agent_name="CortAgentAlternatives", - agent_description="Generates multiple alternative responses to a task", - model_name="gpt-4o-mini", - max_loops=1, - dynamic_temperature_enabled=True, - tools_list_dictionary=alternative_responses_schema, - system_prompt=ALTERNATIVES_PROMPT, - ) - - self.evaluation_agent = Agent( - agent_name="CortAgentEvaluation", - agent_description="Evaluates and ranks alternative responses", - model_name="gpt-4o-mini", - max_loops=1, - dynamic_temperature_enabled=True, - tools_list_dictionary=evaluation_schema, - system_prompt=EVALUATION_PROMPT, - ) - - def run(self, task: str): - # First determine number of thinking rounds - num_rounds = self.thinking_rounds.run(task) - - # Initialize with the task - current_task = task - best_response = None - - # Run the battle royale for the determined number of rounds - for round_num in range(num_rounds): - # Generate alternatives - alternatives = self.alternatives_agent.run(current_task) - - # Evaluate alternatives - evaluation = self.evaluation_agent.run(alternatives) - - # Update best response and current task for next round - best_response = evaluation["evaluation"]["best_response"] - current_task = f"Previous best response: {best_response}\nOriginal task: {task}" - - return best_response diff --git a/swarms/structs/concurrent_workflow.py b/swarms/structs/concurrent_workflow.py index b786e3ec..b60399a2 100644 --- a/swarms/structs/concurrent_workflow.py +++ b/swarms/structs/concurrent_workflow.py @@ -16,39 +16,122 @@ logger = initialize_logger(log_folder="concurrent_workflow") class ConcurrentWorkflow(BaseSwarm): """ - Represents a concurrent workflow that executes multiple agents concurrently in a production-grade manner. - Features include: - - Caching for repeated prompts - - Enhanced error handling and retries - - Input validation + A production-grade concurrent workflow orchestrator that executes multiple agents simultaneously. + + ConcurrentWorkflow is designed for high-performance multi-agent orchestration with advanced features + including real-time monitoring, error handling, caching, and flexible output formatting. It's ideal + for scenarios where multiple agents need to process the same task independently and their results + need to be aggregated. + + Key Features: + - Concurrent execution using ThreadPoolExecutor for optimal performance + - Real-time dashboard monitoring with status updates + - Comprehensive error handling and recovery + - Flexible output formatting options + - Automatic prompt engineering capabilities + - Conversation history management + - Metadata persistence and auto-saving + - Support for both single and batch processing + - Image input support for multimodal agents + + Use Cases: + - Multi-perspective analysis (financial, legal, technical reviews) + - Consensus building and voting systems + - Parallel data processing and analysis + - A/B testing with different agent configurations + - Redundancy and reliability improvements Args: - name (str): The name of the workflow. Defaults to "ConcurrentWorkflow". - description (str): The description of the workflow. Defaults to "Execution of multiple agents concurrently". - agents (List[Agent]): The list of agents to be executed concurrently. Defaults to an empty list. - metadata_output_path (str): The path to save the metadata output. Defaults to "agent_metadata.json". - auto_save (bool): Flag indicating whether to automatically save the metadata. Defaults to False. - output_type (str): The type of output format. Defaults to "dict". - max_loops (int): The maximum number of loops for each agent. Defaults to 1. - return_str_on (bool): Flag indicating whether to return the output as a string. Defaults to False. - auto_generate_prompts (bool): Flag indicating whether to auto-generate prompts for agents. Defaults to False. - return_entire_history (bool): Flag indicating whether to return the entire conversation history. Defaults to False. - show_dashboard (bool): Flag indicating whether to show a real-time dashboard. Defaults to True. + name (str, optional): Unique identifier for the workflow instance. + Defaults to "ConcurrentWorkflow". + description (str, optional): Human-readable description of the workflow's purpose. + Defaults to "Execution of multiple agents concurrently". + agents (List[Union[Agent, Callable]], optional): List of Agent instances or callable objects + to execute concurrently. Each agent should implement a `run` method. + Defaults to empty list. + metadata_output_path (str, optional): File path for saving execution metadata and results. + Supports JSON format. Defaults to "agent_metadata.json". + auto_save (bool, optional): Whether to automatically save conversation history and metadata + after each run. Defaults to True. + output_type (str, optional): Format for aggregating agent outputs. Options include: + - "dict-all-except-first": Dictionary with all agent outputs except the first + - "dict": Dictionary with all agent outputs + - "str": Concatenated string of all outputs + - "list": List of individual agent outputs + Defaults to "dict-all-except-first". + max_loops (int, optional): Maximum number of execution loops for each agent. + Defaults to 1. + auto_generate_prompts (bool, optional): Enable automatic prompt engineering for all agents. + When True, agents will enhance their prompts automatically. Defaults to False. + show_dashboard (bool, optional): Enable real-time dashboard display showing agent status, + progress, and outputs. Useful for monitoring and debugging. Defaults to False. + *args: Additional positional arguments passed to the BaseSwarm parent class. + **kwargs: Additional keyword arguments passed to the BaseSwarm parent class. Raises: - ValueError: If the list of agents is empty or if the description is empty. + ValueError: If agents list is empty or None. + ValueError: If description is empty or None. + TypeError: If agents list contains non-Agent, non-callable objects. Attributes: - name (str): The name of the workflow. - description (str): The description of the workflow. - agents (List[Agent]): The list of agents to be executed concurrently. - metadata_output_path (str): The path to save the metadata output. - auto_save (bool): Flag indicating whether to automatically save the metadata. - output_type (str): The type of output format. - max_loops (int): The maximum number of loops for each agent. - auto_generate_prompts (bool): Flag indicating whether to auto-generate prompts for agents. - show_dashboard (bool): Flag indicating whether to show a real-time dashboard. - agent_statuses (dict): Dictionary to track agent statuses. + name (str): The workflow instance name. + description (str): The workflow description. + agents (List[Union[Agent, Callable]]): List of agents to execute. + metadata_output_path (str): Path for metadata output file. + auto_save (bool): Auto-save flag for metadata persistence. + output_type (str): Output aggregation format. + max_loops (int): Maximum execution loops per agent. + auto_generate_prompts (bool): Auto prompt engineering flag. + show_dashboard (bool): Dashboard display flag. + agent_statuses (dict): Real-time status tracking for each agent. + conversation (Conversation): Conversation history manager. + + Example: + Basic usage with multiple agents: + + >>> from swarms import Agent, ConcurrentWorkflow + >>> + >>> # Create specialized agents + >>> financial_agent = Agent( + ... agent_name="Financial-Analyst", + ... system_prompt="You are a financial analysis expert...", + ... model_name="gpt-4" + ... ) + >>> legal_agent = Agent( + ... agent_name="Legal-Advisor", + ... system_prompt="You are a legal expert...", + ... model_name="gpt-4" + ... ) + >>> + >>> # Create workflow + >>> workflow = ConcurrentWorkflow( + ... name="Multi-Expert-Analysis", + ... agents=[financial_agent, legal_agent], + ... show_dashboard=True, + ... auto_save=True + ... ) + >>> + >>> # Execute analysis + >>> task = "Analyze the risks of investing in cryptocurrency" + >>> results = workflow.run(task) + >>> print(f"Analysis complete with {len(results)} perspectives") + + Batch processing example: + + >>> tasks = [ + ... "Analyze Q1 financial performance", + ... "Review Q2 market trends", + ... "Forecast Q3 projections" + ... ] + >>> batch_results = workflow.batch_run(tasks) + >>> print(f"Processed {len(batch_results)} quarterly reports") + + Note: + - Agents are executed using ThreadPoolExecutor with 95% of available CPU cores + - Each agent runs independently and cannot communicate with others during execution + - The workflow maintains conversation history across all runs for context + - Dashboard mode disables individual agent printing to prevent output conflicts + - Error handling ensures partial results are available even if some agents fail """ def __init__( @@ -61,10 +144,21 @@ class ConcurrentWorkflow(BaseSwarm): output_type: str = "dict-all-except-first", max_loops: int = 1, auto_generate_prompts: bool = False, - show_dashboard: bool = True, + show_dashboard: bool = False, *args, **kwargs, ): + """ + Initialize the ConcurrentWorkflow with configuration parameters. + + Performs initialization, validation, and setup of internal components including + conversation management, agent status tracking, and dashboard configuration. + + Note: + The constructor automatically performs reliability checks and configures + agents for dashboard mode if enabled. Agent print outputs are disabled + when dashboard mode is active to prevent display conflicts. + """ super().__init__( name=name, description=description, @@ -93,12 +187,58 @@ class ConcurrentWorkflow(BaseSwarm): self.agents = self.fix_agents() def fix_agents(self): + """ + Configure agents for dashboard mode by disabling individual print outputs. + + When dashboard mode is enabled, individual agent print outputs can interfere + with the dashboard display. This method disables print_on for all agents + to ensure clean dashboard rendering. + + Returns: + List[Agent]: The modified list of agents with print_on disabled. + + Note: + This method only modifies agents when show_dashboard is True. + Agent functionality is not affected, only their output display behavior. + + Example: + >>> workflow = ConcurrentWorkflow(show_dashboard=True, agents=[agent1, agent2]) + >>> # Agents automatically configured for dashboard mode + >>> all(not agent.print_on for agent in workflow.agents) + True + """ if self.show_dashboard is True: for agent in self.agents: agent.print_on = False return self.agents def reliability_check(self): + """ + Perform comprehensive validation of workflow configuration and agents. + + Validates that the workflow is properly configured with valid agents and + provides warnings for suboptimal configurations. This method is called + automatically during initialization. + + Validates: + - Agents list is not None or empty + - At least one agent is provided + - Warns if only one agent is provided (suboptimal for concurrent execution) + + Raises: + ValueError: If agents list is None or empty. + + Logs: + Warning: If only one agent is provided (concurrent execution not beneficial). + Error: If validation fails with detailed error information. + + Example: + >>> workflow = ConcurrentWorkflow(agents=[]) + ValueError: ConcurrentWorkflow: No agents provided + + >>> workflow = ConcurrentWorkflow(agents=[single_agent]) + # Logs warning about single agent usage + """ try: if self.agents is None: raise ValueError( @@ -122,12 +262,31 @@ class ConcurrentWorkflow(BaseSwarm): def activate_auto_prompt_engineering(self): """ - Activates the auto-generate prompts feature for all agents in the workflow. + Enable automatic prompt engineering for all agents in the workflow. + + When activated, each agent will automatically enhance and optimize their + system prompts based on the task context and their previous interactions. + This can improve response quality but may increase execution time. + + Side Effects: + - Sets auto_generate_prompt=True for all agents in the workflow + - Affects subsequent agent.run() calls, not retroactively + - May increase latency due to prompt optimization overhead + + Note: + This method can be called at any time, but changes only affect + future agent executions. Already running agents are not affected. Example: - >>> workflow = ConcurrentWorkflow(agents=[Agent()]) + >>> workflow = ConcurrentWorkflow(agents=[agent1, agent2]) >>> workflow.activate_auto_prompt_engineering() - >>> # All agents in the workflow will now auto-generate prompts. + >>> # All agents will now auto-generate optimized prompts + >>> result = workflow.run("Complex analysis task") + >>> # Agents used enhanced prompts for better performance + + See Also: + - Agent.auto_generate_prompt: Individual agent prompt engineering + - auto_generate_prompts: Constructor parameter for default behavior """ if self.auto_generate_prompts is True: for agent in self.agents: @@ -139,11 +298,45 @@ class ConcurrentWorkflow(BaseSwarm): is_final: bool = False, ) -> None: """ - Displays the current status of all agents in a beautiful dashboard format. + Display a real-time dashboard showing the current status of all agents. + + Renders a formatted dashboard with agent names, execution status, and + output previews. The dashboard is updated in real-time during workflow + execution to provide visibility into agent progress and results. Args: - title (str): The title of the dashboard. - is_final (bool): Flag indicating whether this is the final dashboard. + title (str, optional): The dashboard title to display at the top. + Defaults to "🤖 Agent Dashboard". + is_final (bool, optional): Whether this is the final dashboard display + after all agents have completed. Changes formatting and styling. + Defaults to False. + + Side Effects: + - Prints formatted dashboard to console + - Updates display in real-time during execution + - May clear previous dashboard content for clean updates + + Note: + This method is automatically called during workflow execution when + show_dashboard=True. Manual calls are supported for custom monitoring. + + Dashboard Status Values: + - "pending": Agent queued but not yet started + - "running": Agent currently executing task + - "completed": Agent finished successfully + - "error": Agent execution failed with error + + Example: + >>> workflow = ConcurrentWorkflow(agents=[agent1, agent2], show_dashboard=True) + >>> workflow.display_agent_dashboard("Custom Dashboard Title") + # Displays: + # 🤖 Custom Dashboard Title + # ┌─────────────────┬─────────┬──────────────────┐ + # │ Agent Name │ Status │ Output Preview │ + # ├─────────────────┼─────────┼──────────────────┤ + # │ Financial-Agent │ running │ Analyzing data...│ + # │ Legal-Agent │ pending │ │ + # └─────────────────┴─────────┴──────────────────┘ """ agents_data = [ { @@ -166,8 +359,66 @@ class ConcurrentWorkflow(BaseSwarm): imgs: Optional[List[str]] = None, ): """ - Executes all agents in the workflow concurrently on the given task. - Now includes real-time dashboard updates. + Execute all agents concurrently with real-time dashboard monitoring. + + This method provides the same concurrent execution as _run() but with + enhanced real-time monitoring through a visual dashboard. Agent status + updates are displayed in real-time, showing progress, completion, and + any errors that occur during execution. + + Args: + task (str): The task description or prompt to be executed by all agents. + This will be passed to each agent's run() method. + img (Optional[str], optional): Path to a single image file for agents + that support multimodal input. Defaults to None. + imgs (Optional[List[str]], optional): List of image file paths for + agents that support multiple image inputs. Defaults to None. + + Returns: + Any: Formatted output based on the configured output_type setting. + The return format depends on the output_type parameter set during + workflow initialization. + + Raises: + Exception: Re-raises any exceptions from agent execution after + updating the dashboard with error status. + + Side Effects: + - Displays initial dashboard showing all agents as "pending" + - Updates dashboard in real-time as agents start, run, and complete + - Displays final dashboard with all results when execution completes + - Adds all task inputs and agent outputs to conversation history + - Automatically cleans up dashboard display resources + + Dashboard Flow: + 1. Initial dashboard shows all agents as "pending" + 2. As agents start, status updates to "running" + 3. As agents complete, status updates to "completed" with output preview + 4. Final dashboard shows complete results summary + 5. Dashboard resources are cleaned up automatically + + Performance: + - Uses 95% of available CPU cores for optimal concurrency + - ThreadPoolExecutor manages thread lifecycle automatically + - Real-time updates have minimal performance overhead + + Example: + >>> workflow = ConcurrentWorkflow( + ... agents=[financial_agent, legal_agent], + ... show_dashboard=True + ... ) + >>> result = workflow.run_with_dashboard( + ... task="Analyze the merger proposal", + ... img="company_financials.png" + ... ) + # Dashboard shows real-time progress: + # Agent-1: pending -> running -> completed + # Agent-2: pending -> running -> completed + >>> print("Analysis complete:", result) + + Note: + This method is automatically called when show_dashboard=True. + For headless execution without dashboard, use _run() directly. """ try: self.conversation.add(role="User", content=task) @@ -279,20 +530,64 @@ class ConcurrentWorkflow(BaseSwarm): imgs: Optional[List[str]] = None, ): """ - Executes all agents in the workflow concurrently on the given task. + Execute all agents concurrently without dashboard monitoring (headless mode). + + This is the core execution method that runs all agents simultaneously using + ThreadPoolExecutor for optimal performance. Results are collected and + formatted according to the configured output_type. Args: - task (str): The task to be executed by all agents. - img (Optional[str]): Optional image path for agents that support image input. - imgs (Optional[List[str]]): Optional list of image paths for agents that support multiple image inputs. + task (str): The task description or prompt to be executed by all agents. + Each agent receives the same task input for independent processing. + img (Optional[str], optional): Path to a single image file for multimodal + agents. The same image is provided to all agents. Defaults to None. + imgs (Optional[List[str]], optional): List of image file paths for agents + that support multiple image inputs. All agents receive the same image list. + Defaults to None. Returns: - The formatted output based on the configured output_type. + Any: Formatted output according to the output_type configuration: + - "dict-all-except-first": Dict with all agent outputs except first + - "dict": Dict with all agent outputs keyed by agent name + - "str": Concatenated string of all agent outputs + - "list": List of individual agent outputs in completion order + + Side Effects: + - Adds the user task to conversation history + - Adds each agent's output to conversation history upon completion + - No visual output or dashboard updates (headless execution) + + Performance Characteristics: + - Uses ThreadPoolExecutor with 95% of available CPU cores + - Agents execute truly concurrently, not sequentially + - Results are collected in completion order, not submission order + - Memory efficient for large numbers of agents + + Error Handling: + - Individual agent failures don't stop other agents + - Failed agents have their exceptions logged but execution continues + - Partial results are still returned for successful agents + + Thread Safety: + - Conversation object handles concurrent access safely + - Agent status is not tracked in this method (dashboard-free) + - Each agent runs in isolation without shared state Example: - >>> workflow = ConcurrentWorkflow(agents=[agent1, agent2]) - >>> result = workflow.run("Analyze this financial data") - >>> print(result) + >>> workflow = ConcurrentWorkflow( + ... agents=[agent1, agent2, agent3], + ... output_type="dict" + ... ) + >>> result = workflow._run( + ... task="Analyze market trends for Q4", + ... img="market_chart.png" + ... ) + >>> print(f"Received {len(result)} agent analyses") + >>> # Result format: {"agent1": "analysis1", "agent2": "analysis2", ...} + + Note: + This method is called automatically by run() when show_dashboard=False. + For monitoring and real-time updates, use run_with_dashboard() instead. """ self.conversation.add(role="User", content=task) @@ -331,7 +626,79 @@ class ConcurrentWorkflow(BaseSwarm): imgs: Optional[List[str]] = None, ): """ - Executes all agents in the workflow concurrently on the given task. + Execute all agents concurrently on the given task with optional dashboard monitoring. + + This is the main entry point for workflow execution. Automatically selects + between dashboard-enabled execution (run_with_dashboard) or headless execution + (_run) based on the show_dashboard configuration. + + Args: + task (str): The task description, prompt, or instruction to be executed + by all agents concurrently. Each agent processes the same task + independently. + img (Optional[str], optional): Path to a single image file for agents + that support multimodal (text + image) input. Defaults to None. + imgs (Optional[List[str]], optional): List of image file paths for + agents that support multiple image inputs. Defaults to None. + + Returns: + Any: Aggregated results from all agents formatted according to output_type: + - "dict-all-except-first": Dictionary excluding first agent's output + - "dict": Complete dictionary with all agent outputs + - "str": Concatenated string of all outputs + - "list": List of individual agent outputs + + Execution Modes: + - Dashboard Mode (show_dashboard=True): Provides real-time visual monitoring + with status updates, progress tracking, and error visibility + - Headless Mode (show_dashboard=False): Silent execution with no visual output, + optimized for performance and automation scenarios + + Concurrent Execution: + - All agents run simultaneously using ThreadPoolExecutor + - Utilizes 95% of available CPU cores for optimal performance + - Thread-safe conversation history management + - Independent agent execution without inter-agent communication + + Error Resilience: + - Individual agent failures don't halt the entire workflow + - Partial results are returned for successful agents + - Comprehensive error logging for debugging + - Graceful degradation under failure conditions + + Example: + Basic concurrent execution: + + >>> workflow = ConcurrentWorkflow(agents=[analyst, reviewer, summarizer]) + >>> result = workflow.run("Evaluate the new product proposal") + >>> print(f"Received insights from {len(workflow.agents)} experts") + + With dashboard monitoring: + + >>> workflow = ConcurrentWorkflow( + ... agents=[financial_agent, legal_agent], + ... show_dashboard=True + ... ) + >>> result = workflow.run("Review merger agreement") + # Real-time dashboard shows progress and completion status + + Multimodal analysis: + + >>> result = workflow.run( + ... task="Analyze this chart and provide insights", + ... img="quarterly_results.png" + ... ) + + Performance Tips: + - Use 2+ agents to benefit from concurrent execution + - Dashboard mode adds minimal overhead for monitoring + - Larger agent counts scale well with available CPU cores + - Consider batch_run() for multiple related tasks + + See Also: + - batch_run(): For processing multiple tasks sequentially + - run_with_dashboard(): For direct dashboard execution + - _run(): For direct headless execution """ if self.show_dashboard: return self.run_with_dashboard(task, img, imgs) @@ -341,29 +708,112 @@ class ConcurrentWorkflow(BaseSwarm): def batch_run( self, tasks: List[str], - img: Optional[str] = None, imgs: Optional[List[str]] = None, ): """ - Executes the workflow on multiple tasks sequentially. + Execute the workflow on multiple tasks sequentially with concurrent agent processing. + + This method processes a list of tasks one by one, where each task is executed + by all agents concurrently. This is ideal for batch processing scenarios where + you have multiple related tasks that need the same multi-agent analysis. Args: - tasks (List[str]): List of tasks to be executed by all agents. - img (Optional[str]): Optional image path for agents that support image input. - imgs (Optional[List[str]]): Optional list of image paths for agents that support multiple image inputs. + tasks (List[str]): List of task descriptions, prompts, or instructions. + Each task will be processed by all agents concurrently before + moving to the next task. Tasks are processed sequentially. + imgs (Optional[List[str]], optional): List of image file paths corresponding + to each task. If provided, imgs[i] will be used for tasks[i]. + If fewer images than tasks are provided, remaining tasks will + execute without images. Defaults to None. Returns: - List of results, one for each task. + List[Any]: List of results, one for each task. Each result is formatted + according to the workflow's output_type configuration. The length + of the returned list equals the length of the input tasks list. + + Processing Flow: + 1. Tasks are processed sequentially (not concurrently with each other) + 2. For each task, all agents execute concurrently + 3. Results are collected and formatted for each task + 4. Conversation history accumulates across all tasks + 5. Final result list contains aggregated outputs for each task + + Image Handling: + - If imgs is None: All tasks execute without images + - If imgs has fewer items than tasks: Extra tasks execute without images + - If imgs has more items than tasks: Extra images are ignored + - Each task gets at most one corresponding image + + Dashboard Behavior: + - If show_dashboard=True, dashboard resets and updates for each task + - Progress is shown separately for each task's agent execution + - Final dashboard shows results from the last task only + + Memory and Performance: + - Conversation history grows with each task (cumulative) + - Memory usage scales with number of tasks and agent outputs + - CPU utilization is optimal during each task's concurrent execution + - Consider clearing conversation history for very large batch jobs Example: - >>> workflow = ConcurrentWorkflow(agents=[agent1, agent2]) - >>> tasks = ["Task 1", "Task 2", "Task 3"] - >>> results = workflow.batch_run(tasks) - >>> print(len(results)) # 3 + Financial analysis across quarters: + + >>> workflow = ConcurrentWorkflow(agents=[analyst1, analyst2, analyst3]) + >>> quarterly_tasks = [ + ... "Analyze Q1 financial performance and market position", + ... "Analyze Q2 financial performance and market position", + ... "Analyze Q3 financial performance and market position", + ... "Analyze Q4 financial performance and market position" + ... ] + >>> results = workflow.batch_run(quarterly_tasks) + >>> print(f"Completed {len(results)} quarterly analyses") + >>> # Each result contains insights from all 3 analysts for that quarter + + With corresponding images: + + >>> tasks = ["Analyze chart trends", "Review performance metrics"] + >>> charts = ["q1_chart.png", "q2_chart.png"] + >>> results = workflow.batch_run(tasks, imgs=charts) + >>> # Task 0 uses q1_chart.png, Task 1 uses q2_chart.png + + Batch processing with dashboard: + + >>> workflow = ConcurrentWorkflow( + ... agents=[agent1, agent2], + ... show_dashboard=True + ... ) + >>> results = workflow.batch_run([ + ... "Process document batch 1", + ... "Process document batch 2", + ... "Process document batch 3" + ... ]) + # Dashboard shows progress for each batch separately + + Use Cases: + - Multi-period financial analysis (quarterly/yearly reports) + - Batch document processing with multiple expert reviews + - A/B testing across different scenarios or datasets + - Systematic evaluation of multiple proposals or options + - Comparative analysis across time periods or categories + + Note: + Tasks are intentionally processed sequentially to maintain result + order and prevent resource contention. For truly concurrent task + processing, create separate workflow instances. + + See Also: + - run(): For single task execution + - ConcurrentWorkflow: For configuration options """ - return [ - self.run(task=task, img=img, imgs=imgs) for task in tasks - ] + results = [] + for idx, task in enumerate(tasks): + img = None + if imgs is not None: + # Use the img at the same index if available, else None + if idx < len(imgs): + img = imgs[idx] + results.append(self.run(task=task, img=img)) + return results # if __name__ == "__main__": diff --git a/swarms/structs/swarm_router.py b/swarms/structs/swarm_router.py index f3210dfe..aaa36b6e 100644 --- a/swarms/structs/swarm_router.py +++ b/swarms/structs/swarm_router.py @@ -91,6 +91,18 @@ class SwarmRouterConfig(BaseModel): arbitrary_types_allowed = True +class SwarmRouterRunError(Exception): + """Exception raised when an error occurs during task execution.""" + + pass + + +class SwarmRouterConfigError(Exception): + """Exception raised when an error occurs during task execution.""" + + pass + + class SwarmRouter: """ A class that dynamically routes tasks to different swarm types based on user selection or automatic matching. @@ -189,6 +201,8 @@ class SwarmRouter: heavy_swarm_question_agent_model_name: str = "gpt-4.1", heavy_swarm_worker_model_name: str = "claude-3-5-sonnet-20240620", telemetry_enabled: bool = False, + council_judge_model_name: str = "gpt-4o-mini", # Add missing model_name attribute + verbose: bool = False, *args, **kwargs, ): @@ -224,17 +238,73 @@ class SwarmRouter: heavy_swarm_worker_model_name ) self.telemetry_enabled = telemetry_enabled + self.council_judge_model_name = council_judge_model_name # Add missing model_name attribute + self.verbose = verbose + + # Initialize swarm factory for O(1) lookup performance + self._swarm_factory = self._initialize_swarm_factory() + self._swarm_cache = {} # Cache for created swarms # Reliability check self.reliability_check() - # Load agents from CSV - if self.load_agents_from_csv: - self.agents = AgentLoader( - csv_path=self.csv_file_path - ).load_agents() + def reliability_check(self): + """Perform reliability checks on swarm configuration. + + Validates essential swarm parameters and configuration before execution. + Handles special case for CouncilAsAJudge which may not require agents. + """ + try: + + if self.verbose: + logger.info( + f"[SwarmRouter Reliability Check] Initializing SwarmRouter '{self.name}'. " + "Validating required parameters for robust operation.\n" + "For detailed documentation on SwarmRouter configuration, usage, and available swarm types, " + "please visit: https://docs.swarms.world/en/latest/swarms/structs/swarm_router/" + ) + + # Check swarm type first since it affects other validations + if self.swarm_type is None: + raise SwarmRouterConfigError( + "SwarmRouter: Swarm type cannot be 'none'. Check the docs for all the swarm types available. https://docs.swarms.world/en/latest/swarms/structs/swarm_router/" + ) + + if self.agents is None: + raise SwarmRouterConfigError( + "SwarmRouter: No agents provided for the swarm. Check the docs to learn of required parameters. https://docs.swarms.world/en/latest/swarms/structs/agent/" + ) + + if ( + self.swarm_type == "AgentRearrange" + and self.rearrange_flow is None + ): + raise SwarmRouterConfigError( + "SwarmRouter: rearrange_flow cannot be 'none' when using AgentRearrange. Check the SwarmRouter docs to learn of required parameters. https://docs.swarms.world/en/latest/swarms/structs/agent_rearrange/" + ) + + # Validate max_loops + if self.max_loops == 0: + raise SwarmRouterConfigError( + "SwarmRouter: max_loops cannot be 0. Check the docs for all the max_loops available. https://docs.swarms.world/en/latest/swarms/structs/swarm_router/" + ) + + self.setup() - self.agent_config = self.agent_config() + # Load agents from CSV + if self.load_agents_from_csv: + self.agents = AgentLoader( + csv_path=self.csv_file_path + ).load_agents() + + if self.telemetry_enabled: + self.agent_config = self.agent_config() + + except SwarmRouterConfigError as e: + logger.error( + f"SwarmRouterConfigError: {str(e)} Full Traceback: {traceback.format_exc()}" + ) + raise e def setup(self): if self.auto_generate_prompts is True: @@ -254,6 +324,17 @@ class SwarmRouter: if self.list_all_agents is True: self.list_agents_to_eachother() + def fetch_message_history_as_string(self): + try: + return ( + self.swarm.conversation.return_all_except_first_string() + ) + except Exception as e: + logger.error( + f"Error fetching message history as string: {str(e)}" + ) + return None + def activate_shared_memory(self): logger.info("Activating shared memory with all agents ") @@ -296,41 +377,203 @@ class SwarmRouter: ) raise RuntimeError(error_msg) from e - def reliability_check(self): - """Perform reliability checks on swarm configuration. + def _initialize_swarm_factory(self) -> Dict[str, Callable]: + """ + Initialize the swarm factory with O(1) lookup performance. - Validates essential swarm parameters and configuration before execution. - Handles special case for CouncilAsAJudge which may not require agents. + Returns: + Dict[str, Callable]: Dictionary mapping swarm types to their factory functions. """ + return { + "HeavySwarm": self._create_heavy_swarm, + "AgentRearrange": self._create_agent_rearrange, + "MALT": self._create_malt, + "CouncilAsAJudge": self._create_council_as_judge, + "InteractiveGroupChat": self._create_interactive_group_chat, + "DeepResearchSwarm": self._create_deep_research_swarm, + "HiearchicalSwarm": self._create_hierarchical_swarm, + "MixtureOfAgents": self._create_mixture_of_agents, + "MajorityVoting": self._create_majority_voting, + "GroupChat": self._create_group_chat, + "MultiAgentRouter": self._create_multi_agent_router, + "SpreadSheetSwarm": self._create_spreadsheet_swarm, + "SequentialWorkflow": self._create_sequential_workflow, + "ConcurrentWorkflow": self._create_concurrent_workflow, + } - logger.info( - f"Initializing SwarmRouter: {self.name} Reliability Check..." + def _create_heavy_swarm(self, *args, **kwargs): + """Factory function for HeavySwarm.""" + return HeavySwarm( + name=self.name, + description=self.description, + agents=self.agents, + max_loops=self.max_loops, + output_type=self.output_type, + loops_per_agent=self.heavy_swarm_loops_per_agent, + question_agent_model_name=self.heavy_swarm_question_agent_model_name, + worker_model_name=self.heavy_swarm_worker_model_name, ) - # Check swarm type first since it affects other validations - if self.swarm_type is None: - raise ValueError( - "SwarmRouter: Swarm type cannot be 'none'." - ) + def _create_agent_rearrange(self, *args, **kwargs): + """Factory function for AgentRearrange.""" + return AgentRearrange( + name=self.name, + description=self.description, + agents=self.agents, + max_loops=self.max_loops, + flow=self.rearrange_flow, + return_json=self.return_json, + output_type=self.output_type, + return_entire_history=self.return_entire_history, + *args, + **kwargs, + ) - if self.agents is None: - raise ValueError( - "SwarmRouter: No agents provided for the swarm." - ) + def _create_malt(self, *args, **kwargs): + """Factory function for MALT.""" + return MALT( + name=self.name, + description=self.description, + max_loops=self.max_loops, + return_dict=True, + preset_agents=True, + ) + + def _create_council_as_judge(self, *args, **kwargs): + """Factory function for CouncilAsAJudge.""" + return CouncilAsAJudge( + name=self.name, + description=self.description, + model_name=self.council_judge_model_name, + output_type=self.output_type, + base_agent=self.agents[0] if self.agents else None, + ) + + def _create_interactive_group_chat(self, *args, **kwargs): + """Factory function for InteractiveGroupChat.""" + return InteractiveGroupChat( + name=self.name, + description=self.description, + agents=self.agents, + max_loops=self.max_loops, + output_type=self.output_type, + speaker_function=self.speaker_function, + ) + + def _create_deep_research_swarm(self, *args, **kwargs): + """Factory function for DeepResearchSwarm.""" + return DeepResearchSwarm( + name=self.name, + description=self.description, + agents=self.agents, + max_loops=self.max_loops, + output_type=self.output_type, + ) + + def _create_hierarchical_swarm(self, *args, **kwargs): + """Factory function for HierarchicalSwarm.""" + return HierarchicalSwarm( + name=self.name, + description=self.description, + agents=self.agents, + max_loops=self.max_loops, + return_all_history=self.return_entire_history, + output_type=self.output_type, + *args, + **kwargs, + ) + + def _create_mixture_of_agents(self, *args, **kwargs): + """Factory function for MixtureOfAgents.""" + return MixtureOfAgents( + name=self.name, + description=self.description, + agents=self.agents, + aggregator_agent=self.agents[-1], + layers=self.max_loops, + output_type=self.output_type, + *args, + **kwargs, + ) + + def _create_majority_voting(self, *args, **kwargs): + """Factory function for MajorityVoting.""" + return MajorityVoting( + name=self.name, + description=self.description, + agents=self.agents, + consensus_agent=self.agents[-1], + *args, + **kwargs, + ) + + def _create_group_chat(self, *args, **kwargs): + """Factory function for GroupChat.""" + return GroupChat( + name=self.name, + description=self.description, + agents=self.agents, + max_loops=self.max_loops, + speaker_fn=self.speaker_fn, + *args, + **kwargs, + ) + + def _create_multi_agent_router(self, *args, **kwargs): + """Factory function for MultiAgentRouter.""" + return MultiAgentRouter( + name=self.name, + description=self.description, + agents=self.agents, + shared_memory_system=self.shared_memory_system, + output_type=self.output_type, + ) - # Validate max_loops - if self.max_loops == 0: - raise ValueError("SwarmRouter: max_loops cannot be 0.") + def _create_spreadsheet_swarm(self, *args, **kwargs): + """Factory function for SpreadSheetSwarm.""" + return SpreadSheetSwarm( + name=self.name, + description=self.description, + agents=self.agents, + max_loops=self.max_loops, + autosave_on=self.autosave, + *args, + **kwargs, + ) - self.setup() + def _create_sequential_workflow(self, *args, **kwargs): + """Factory function for SequentialWorkflow.""" + return SequentialWorkflow( + name=self.name, + description=self.description, + agents=self.agents, + max_loops=self.max_loops, + shared_memory_system=self.shared_memory_system, + output_type=self.output_type, + return_json=self.return_json, + return_entire_history=self.return_entire_history, + *args, + **kwargs, + ) - logger.info( - f"Reliability check for parameters and configurations are complete. SwarmRouter: {self.name} is ready to run!" + def _create_concurrent_workflow(self, *args, **kwargs): + """Factory function for ConcurrentWorkflow.""" + return ConcurrentWorkflow( + name=self.name, + description=self.description, + agents=self.agents, + max_loops=self.max_loops, + auto_save=self.autosave, + return_str_on=self.return_entire_history, + output_type=self.output_type, + *args, + **kwargs, ) def _create_swarm(self, task: str = None, *args, **kwargs): """ - Dynamically create and return the specified swarm type or automatically match the best swarm type for a given task. + Dynamically create and return the specified swarm type with O(1) lookup performance. + Uses factory pattern with caching for optimal performance. Args: task (str, optional): The task to be executed by the swarm. Defaults to None. @@ -344,165 +587,56 @@ class SwarmRouter: Raises: ValueError: If an invalid swarm type is provided. """ + # Handle auto swarm type selection if self.swarm_type == "auto": - self.swarm_type = str(swarm_matcher(task)) - - self._create_swarm(self.swarm_type) - - elif self.swarm_type == "HeavySwarm": - return HeavySwarm( - name=self.name, - description=self.description, - agents=self.agents, - max_loops=self.max_loops, - output_type=self.output_type, - loops_per_agent=self.heavy_swarm_loops_per_agent, - question_agent_model_name=self.heavy_swarm_question_agent_model_name, - worker_model_name=self.heavy_swarm_worker_model_name, - ) - - elif self.swarm_type == "AgentRearrange": - return AgentRearrange( - name=self.name, - description=self.description, - agents=self.agents, - max_loops=self.max_loops, - flow=self.rearrange_flow, - return_json=self.return_json, - output_type=self.output_type, - return_entire_history=self.return_entire_history, - *args, - **kwargs, - ) - - elif self.swarm_type == "MALT": - return MALT( - name=self.name, - description=self.description, - max_loops=self.max_loops, - return_dict=True, - preset_agents=True, - ) - - elif self.swarm_type == "CouncilAsAJudge": - return CouncilAsAJudge( - name=self.name, - description=self.description, - model_name=self.model_name, - output_type=self.output_type, - base_agent=self.agents[0] if self.agents else None, - ) + try: + matched_swarm_type = str(swarm_matcher(task)) + self.swarm_type = matched_swarm_type + logger.info( + f"Auto-selected swarm type: {matched_swarm_type}" + ) + except Exception as e: + logger.warning( + f"Auto-selection failed: {e}, falling back to SequentialWorkflow" + ) + self.swarm_type = "SequentialWorkflow" - elif self.swarm_type == "InteractiveGroupChat": - return InteractiveGroupChat( - name=self.name, - description=self.description, - agents=self.agents, - max_loops=self.max_loops, - output_type=self.output_type, - speaker_function=self.speaker_function, + # Check cache first for better performance + cache_key = ( + f"{self.swarm_type}_{hash(str(args) + str(kwargs))}" + ) + if cache_key in self._swarm_cache: + logger.debug(f"Using cached swarm: {self.swarm_type}") + return self._swarm_cache[cache_key] + + # Use factory pattern for O(1) lookup + factory_func = self._swarm_factory.get(self.swarm_type) + if factory_func is None: + valid_types = list(self._swarm_factory.keys()) + raise ValueError( + f"Invalid swarm type: {self.swarm_type}. " + f"Valid types are: {', '.join(valid_types)}" ) - elif self.swarm_type == "DeepResearchSwarm": - return DeepResearchSwarm( - name=self.name, - description=self.description, - agents=self.agents, - max_loops=self.max_loops, - output_type=self.output_type, - ) + # Create the swarm using the factory function + try: + swarm = factory_func(*args, **kwargs) - elif self.swarm_type == "HiearchicalSwarm": - return HierarchicalSwarm( - name=self.name, - description=self.description, - # director=self.agents[0], - agents=self.agents, - max_loops=self.max_loops, - return_all_history=self.return_entire_history, - output_type=self.output_type, - *args, - **kwargs, - ) - elif self.swarm_type == "MixtureOfAgents": - return MixtureOfAgents( - name=self.name, - description=self.description, - agents=self.agents, - aggregator_agent=self.agents[-1], - layers=self.max_loops, - output_type=self.output_type, - *args, - **kwargs, - ) + # Cache the created swarm for future use + self._swarm_cache[cache_key] = swarm - elif self.swarm_type == "MajorityVoting": - return MajorityVoting( - name=self.name, - description=self.description, - agents=self.agents, - consensus_agent=self.agents[-1], - *args, - **kwargs, - ) - elif self.swarm_type == "GroupChat": - return GroupChat( - name=self.name, - description=self.description, - agents=self.agents, - max_loops=self.max_loops, - speaker_fn=self.speaker_fn, - *args, - **kwargs, + logger.info( + f"Successfully created swarm: {self.swarm_type}" ) + return swarm - elif self.swarm_type == "MultiAgentRouter": - return MultiAgentRouter( - name=self.name, - description=self.description, - agents=self.agents, - shared_memory_system=self.shared_memory_system, - output_type=self.output_type, - ) - elif self.swarm_type == "SpreadSheetSwarm": - return SpreadSheetSwarm( - name=self.name, - description=self.description, - agents=self.agents, - max_loops=self.max_loops, - autosave_on=self.autosave, - *args, - **kwargs, - ) - elif self.swarm_type == "SequentialWorkflow": - return SequentialWorkflow( - name=self.name, - description=self.description, - agents=self.agents, - max_loops=self.max_loops, - shared_memory_system=self.shared_memory_system, - output_type=self.output_type, - return_json=self.return_json, - return_entire_history=self.return_entire_history, - *args, - **kwargs, - ) - elif self.swarm_type == "ConcurrentWorkflow": - return ConcurrentWorkflow( - name=self.name, - description=self.description, - agents=self.agents, - max_loops=self.max_loops, - auto_save=self.autosave, - return_str_on=self.return_entire_history, - output_type=self.output_type, - *args, - **kwargs, - ) - else: - raise ValueError( - f"Invalid swarm type: {self.swarm_type} try again with a valid swarm type such as 'SequentialWorkflow' or 'ConcurrentWorkflow' or 'auto' or 'AgentRearrange' or 'MixtureOfAgents' or 'SpreadSheetSwarm'" + except Exception as e: + logger.error( + f"Failed to create swarm {self.swarm_type}: {str(e)}" ) + raise RuntimeError( + f"Failed to create swarm {self.swarm_type}: {str(e)}" + ) from e def update_system_prompt_for_agent_in_swarm(self): # Use list comprehension for faster iteration @@ -591,10 +725,19 @@ class SwarmRouter: ) return result - except Exception as e: - raise RuntimeError( - f"SwarmRouter: Error executing task on swarm: {str(e)} Traceback: {traceback.format_exc()}. Try reconfiguring the SwarmRouter Settings and or make sure the individual agents are configured correctly." + except SwarmRouterRunError as e: + logger.error( + f"\n[SwarmRouter ERROR] '{self.name}' failed to execute the task on the selected swarm.\n" + f"Reason: {str(e)}\n" + f"Traceback:\n{traceback.format_exc()}\n\n" + "Troubleshooting steps:\n" + " - Double-check your SwarmRouter configuration (swarm_type, agents, parameters).\n" + " - Ensure all individual agents are properly configured and initialized.\n" + " - Review the error message and traceback above for clues.\n\n" + "For detailed documentation on SwarmRouter configuration, usage, and available swarm types, please visit:\n" + " https://docs.swarms.world/en/latest/swarms/structs/swarm_router/\n" ) + raise e def run( self, @@ -631,10 +774,19 @@ class SwarmRouter: *args, **kwargs, ) - except Exception as e: - raise RuntimeError( - f"SwarmRouter: Error executing task on swarm: {str(e)} Traceback: {traceback.format_exc()}" + except SwarmRouterRunError as e: + logger.error( + f"\n[SwarmRouter ERROR] '{self.name}' failed to execute the task on the selected swarm.\n" + f"Reason: {str(e)}\n" + f"Traceback:\n{traceback.format_exc()}\n\n" + "Troubleshooting steps:\n" + " - Double-check your SwarmRouter configuration (swarm_type, agents, parameters).\n" + " - Ensure all individual agents are properly configured and initialized.\n" + " - Review the error message and traceback above for clues.\n\n" + "For detailed documentation on SwarmRouter configuration, usage, and available swarm types, please visit:\n" + " https://docs.swarms.world/en/latest/swarms/structs/swarm_router/\n" ) + raise e def __call__( self,