diff --git a/swarms/structs/heavy_swarm.py b/swarms/structs/heavy_swarm.py index 2edb5078..2a2e59ed 100644 --- a/swarms/structs/heavy_swarm.py +++ b/swarms/structs/heavy_swarm.py @@ -10,22 +10,15 @@ from typing import Dict, List, Optional from loguru import logger from rich.console import Console from rich.panel import Panel -from rich.progress import ( - Progress, - SpinnerColumn, - TextColumn, - TimeElapsedColumn, -) +from rich.progress import (Progress, SpinnerColumn, TextColumn, + TimeElapsedColumn) from rich.table import Table - from swarms.structs.agent import Agent from swarms.structs.conversation import Conversation +from swarms.tools.tool_type import tool_type from swarms.utils.formatter import formatter -from swarms.utils.history_output_formatter import ( - history_output_formatter, -) +from swarms.utils.history_output_formatter import history_output_formatter from swarms.utils.litellm_wrapper import LiteLLM -from swarms.tools.tool_type import tool_type RESEARCH_AGENT_PROMPT = """ You are an expert Research Agent with exceptional capabilities in: @@ -144,7 +137,8 @@ You approach analysis with: - Actionable insights - Clear communication -Provide precise, data-driven analysis with clear implications and recommendations.""" +Provide precise, data-driven analysis with clear implications and +recommendations.""" ALTERNATIVES_AGENT_PROMPT = """ You are an expert Alternatives Agent with exceptional capabilities in: @@ -218,7 +212,8 @@ You approach alternatives generation with: - Implementation focus - Value optimization -Provide innovative, practical, and well-evaluated alternative approaches and solutions. +Provide innovative, practical, and well-evaluated alternative approaches +and solutions. """ @@ -304,7 +299,8 @@ You approach verification with: - Quality focus - Practical realism -Provide thorough, objective verification with clear feasibility assessments and risk evaluations.""" +Provide thorough, objective verification with clear feasibility +assessments and risk evaluations.""" SYNTHESIS_AGENT_PROMPT = """ You are an expert Synthesis Agent with advanced capabilities in: @@ -378,35 +374,54 @@ You approach synthesis with: - Value optimization - Implementation focus -Provide comprehensive, integrated analysis with clear, actionable recommendations and detailed implementation guidance.""" +Provide comprehensive, integrated analysis with clear, actionable +recommendations and detailed implementation guidance.""" schema = { "type": "function", "function": { "name": "generate_specialized_questions", - "description": "Generate 4 specialized questions for different agent roles to comprehensively analyze a given task", + "description": ( + "Generate 4 specialized questions for different agent roles to " + "comprehensively analyze a given task" + ), "parameters": { "type": "object", "properties": { "thinking": { "type": "string", - "description": "Your reasoning process for how to break down this task into 4 specialized questions for different agent roles", + "description": ( + "Your reasoning process for how to break down this task " + "into 4 specialized questions for different agent roles" + ), }, "research_question": { "type": "string", - "description": "A detailed research question for the Research Agent to gather comprehensive background information and data", + "description": ( + "A detailed research question for the Research Agent to " + "gather comprehensive background information and data" + ), }, "analysis_question": { "type": "string", - "description": "An analytical question for the Analysis Agent to examine patterns, trends, and insights", + "description": ( + "An analytical question for the Analysis Agent to examine " + "patterns, trends, and insights" + ), }, "alternatives_question": { "type": "string", - "description": "A strategic question for the Alternatives Agent to explore different approaches, options, and solutions", + "description": ( + "A strategic question for the Alternatives Agent to explore " + "different approaches, options, and solutions" + ), }, "verification_question": { "type": "string", - "description": "A verification question for the Verification Agent to validate findings, check accuracy, and assess feasibility", + "description": ( + "A verification question for the Verification Agent to " + "validate findings, check accuracy, and assess feasibility" + ), }, }, "required": [ @@ -425,9 +440,11 @@ schema = [schema] class HeavySwarm: """ - HeavySwarm is a sophisticated multi-agent orchestration system that decomposes complex tasks - into specialized questions and executes them using four specialized agents: Research, Analysis, - Alternatives, and Verification. The results are then synthesized into a comprehensive response. + HeavySwarm is a sophisticated multi-agent orchestration system that + decomposes complex tasks into specialized questions and executes them + using four specialized agents: Research, Analysis, Alternatives, and + Verification. The results are then synthesized into a comprehensive + response. This swarm architecture provides robust task analysis through: - Intelligent question generation for specialized agent roles @@ -435,17 +452,28 @@ class HeavySwarm: - Comprehensive synthesis of multi-perspective results - Real-time progress monitoring with rich dashboard displays - Reliability checks and validation systems + - Multi-loop iterative refinement with context preservation The HeavySwarm follows a structured workflow: 1. Task decomposition into specialized questions 2. Parallel execution by specialized agents 3. Result synthesis and integration 4. Comprehensive final report generation + 5. Optional iterative refinement through multiple loops + + Key Features: + - **Multi-loop Execution**: The max_loops parameter enables iterative + refinement where each subsequent loop builds upon the context and + results from previous loops + - **Context Preservation**: Conversation history is maintained across + all loops, allowing for deeper analysis and refinement + - **Iterative Refinement**: Each loop can refine, improve, or complete + aspects of the analysis based on previous results Attributes: name (str): Name identifier for the swarm instance description (str): Description of the swarm's purpose - agents (List[Agent]): List of agent instances (currently unused, agents are created internally) + agents (Dict[str, Agent]): Dictionary of specialized agent instances (created internally) timeout (int): Maximum execution time per agent in seconds aggregation_strategy (str): Strategy for result aggregation (currently 'synthesis') loops_per_agent (int): Number of execution loops per agent @@ -455,6 +483,7 @@ class HeavySwarm: max_workers (int): Maximum number of concurrent worker threads show_dashboard (bool): Enable rich dashboard with progress visualization agent_prints_on (bool): Enable individual agent output printing + max_loops (int): Maximum number of execution loops for iterative refinement conversation (Conversation): Conversation history tracker console (Console): Rich console for dashboard output @@ -464,15 +493,20 @@ class HeavySwarm: ... description="Market analysis swarm", ... question_agent_model_name="gpt-4o-mini", ... worker_model_name="gpt-4o-mini", - ... show_dashboard=True + ... show_dashboard=True, + ... max_loops=3 ... ) >>> result = swarm.run("Analyze the current cryptocurrency market trends") + >>> # The swarm will run 3 iterations, each building upon the previous results """ def __init__( self, name: str = "HeavySwarm", - description: str = "A swarm of agents that can analyze a task and generate specialized questions for each agent role", + description: str = ( + "A swarm of agents that can analyze a task and generate " + "specialized questions for each agent role" + ), timeout: int = 300, aggregation_strategy: str = "synthesis", loops_per_agent: int = 1, @@ -483,10 +517,10 @@ class HeavySwarm: show_dashboard: bool = False, agent_prints_on: bool = False, output_type: str = "dict-all-except-first", - worker_tools: tool_type = None, + worker_tools: Optional[tool_type] = None, random_loops_per_agent: bool = False, max_loops: int = 1, - ): + ) -> None: """ Initialize the HeavySwarm with configuration parameters. @@ -499,19 +533,28 @@ class HeavySwarm: timeout (int, optional): Maximum execution time per agent in seconds. Defaults to 300. aggregation_strategy (str, optional): Strategy for aggregating results. Currently only 'synthesis' is supported. Defaults to "synthesis". - loops_per_agent (int, optional): Number of execution loops each agent should perform. - Must be greater than 0. Defaults to 1. - question_agent_model_name (str, optional): Language model for question generation. - Defaults to "gpt-4o-mini". - worker_model_name (str, optional): Language model for specialized worker agents. - Defaults to "gpt-4o-mini". + loops_per_agent (int, optional): Number of execution loops each + agent should perform. Must be greater than 0. Defaults to 1. + question_agent_model_name (str, optional): Language model for + question generation. Defaults to "gpt-4o-mini". + worker_model_name (str, optional): Language model for specialized + worker agents. Defaults to "gpt-4o-mini". verbose (bool, optional): Enable detailed logging and debug output. Defaults to False. - max_workers (int, optional): Maximum concurrent workers for parallel execution. - Defaults to 90% of CPU count. - show_dashboard (bool, optional): Enable rich dashboard with progress visualization. - Defaults to False. - agent_prints_on (bool, optional): Enable individual agent output printing. - Defaults to False. + max_workers (int, optional): Maximum concurrent workers for + parallel execution. Defaults to 90% of CPU count. + show_dashboard (bool, optional): Enable rich dashboard with + progress visualization. Defaults to False. + agent_prints_on (bool, optional): Enable individual agent + output printing. Defaults to False. + output_type (str, optional): Output format type for conversation + history. Defaults to "dict-all-except-first". + worker_tools (tool_type, optional): Tools available to worker + agents for enhanced functionality. Defaults to None. + random_loops_per_agent (bool, optional): Enable random number of + loops per agent (1-10 range). Defaults to False. + max_loops (int, optional): Maximum number of execution loops for + the entire swarm. Each loop builds upon previous results for + iterative refinement. Defaults to 1. Raises: ValueError: If loops_per_agent is 0 or negative @@ -519,7 +562,10 @@ class HeavySwarm: Note: The swarm automatically performs reliability checks during initialization - to ensure all required parameters are properly configured. + to ensure all required parameters are properly configured. The max_loops + parameter enables iterative refinement by allowing the swarm to process + the same task multiple times, with each subsequent loop building upon + the context and results from previous loops. """ self.name = name self.description = description @@ -595,17 +641,11 @@ class HeavySwarm: info_table.add_row("Swarm Name", self.name) info_table.add_row("Description", self.description) info_table.add_row("Timeout", f"{self.timeout}s") - info_table.add_row( - "Loops per Agent", str(self.loops_per_agent) - ) - info_table.add_row( - "Question Model", self.question_agent_model_name - ) + info_table.add_row("Loops per Agent", str(self.loops_per_agent)) + info_table.add_row("Question Model", self.question_agent_model_name) info_table.add_row("Worker Model", self.worker_model_name) info_table.add_row("Max Workers", str(self.max_workers)) - info_table.add_row( - "Aggregation Strategy", self.aggregation_strategy - ) + info_table.add_row("Aggregation Strategy", self.aggregation_strategy) # Display dashboard with professional Arasaka styling self.console.print( @@ -646,9 +686,7 @@ class HeavySwarm: if self.show_dashboard: with Progress( SpinnerColumn(), - TextColumn( - "[progress.description]{task.description}" - ), + TextColumn("[progress.description]{task.description}"), transient=True, console=self.console, ) as progress: @@ -732,17 +770,38 @@ class HeavySwarm: title="Reliability Check", ) - def run(self, task: str, img: str = None): + def run(self, task: str, img: Optional[str] = None) -> str: """ - Execute the complete HeavySwarm orchestration flow. + Execute the complete HeavySwarm orchestration flow with multi-loop functionality. + + This method implements the max_loops feature, allowing the HeavySwarm to iterate + multiple times on the same task, with each subsequent loop building upon the + context and results from previous loops. This enables iterative refinement and + deeper analysis of complex tasks. + + The method follows this workflow: + 1. For the first loop: Execute the original task with full HeavySwarm orchestration + 2. For subsequent loops: Combine previous results with original task as context + 3. Maintain conversation history across all loops for context preservation + 4. Return the final synthesized result from the last loop Args: - task (str): The main task to analyze - img (str, optional): Image input if needed + task (str): The main task to analyze and iterate upon + img (str, optional): Image input if needed for visual analysis tasks Returns: - str: Comprehensive final answer from synthesis agent + str: Comprehensive final answer from synthesis agent after all loops complete + + Note: + The max_loops parameter controls how many iterations the swarm will perform. + Each loop builds upon the previous results, enabling iterative refinement. """ + if self.verbose: + logger.info("Starting HeavySwarm execution") + + current_loop = 0 + last_output = None + if self.show_dashboard: self.console.print( Panel( @@ -754,151 +813,209 @@ class HeavySwarm: ) self.console.print() + # Add initial task to conversation self.conversation.add( role="User", content=task, category="input", ) - # Question generation with dashboard - if self.show_dashboard: - with Progress( - SpinnerColumn(), - TextColumn( - "[progress.description]{task.description}" - ), - transient=True, - console=self.console, - ) as progress: - task_gen = progress.add_task( - "[red]⚡ GENERATING SPECIALIZED QUESTIONS...", - total=100, - ) - progress.update(task_gen, advance=30) - questions = self.execute_question_generation(task) - progress.update( - task_gen, - advance=70, - description="[white]✓ QUESTIONS GENERATED SUCCESSFULLY!", - ) - time.sleep(0.5) - else: - questions = self.execute_question_generation(task) - - # if self.show_dashboard: - # # Create questions table - # questions_table = Table( - # title="⚡ GENERATED QUESTIONS FOR SPECIALIZED AGENTS", - # show_header=True, - # header_style="bold red", - # ) - # questions_table.add_column( - # "Agent", style="white", width=20 - # ) - # questions_table.add_column( - # "Specialized Question", style="bright_white", width=60 - # ) - - # questions_table.add_row( - # "Agent 1", - # questions.get("research_question", "N/A"), - # ) - # questions_table.add_row( - # "Agent 2", - # questions.get("analysis_question", "N/A"), - # ) - # questions_table.add_row( - # "Agent 3", - # questions.get("alternatives_question", "N/A"), - # ) - # questions_table.add_row( - # "Agent 4", - # questions.get("verification_question", "N/A"), - # ) - - # self.console.print( - # Panel( - # questions_table, - # title="[bold red]QUESTION GENERATION COMPLETE[/bold red]", - # border_style="red", - # ) - # ) - # self.console.print() - # else: - # formatter.print_panel( - # content=json.dumps(questions, indent=4), - # title="Questions", - # ) + # Main execution loop with comprehensive error handling + try: + while current_loop < self.max_loops: + try: + if self.verbose: + logger.info("Processing task iteration") + + # No additional per-loop panels; keep dashboard output minimal and original-style + + # Determine the task for this loop + if current_loop == 0: + # First loop: use the original task + loop_task = task + if self.verbose: + logger.info("First loop: Using original task") + else: + # Subsequent loops: combine previous results with original task + loop_task = ( + f"Previous loop results: {last_output}\n\n" + f"Original task: {task}\n\n" + "Based on the previous results and analysis, continue with the next iteration. " + "Refine, improve, or complete any remaining aspects of the analysis. " + "Build upon the insights from the previous loop to provide deeper analysis." + ) + if self.verbose: + logger.info( + "Subsequent loop: Building upon previous results" + ) - self.conversation.add( - role="Question Generator Agent", - content=questions, - category="output", - ) + # Question generation with dashboard + try: + if self.show_dashboard: + with Progress( + SpinnerColumn(), + TextColumn( + "[progress.description]{task.description}" + ), + transient=True, + console=self.console, + ) as progress: + task_gen = progress.add_task( + "[red]⚡ GENERATING SPECIALIZED QUESTIONS...", + total=100, + ) + progress.update(task_gen, advance=30) + questions = self.execute_question_generation( + loop_task + ) + progress.update( + task_gen, + advance=70, + description="[white]✓ QUESTIONS GENERATED SUCCESSFULLY!", + ) + time.sleep(0.5) + else: + questions = self.execute_question_generation( + loop_task + ) + + self.conversation.add( + role="Question Generator Agent", + content=questions, + category="output", + ) - if "error" in questions: - return ( - f"Error in question generation: {questions['error']}" - ) + if "error" in questions: + error_msg = f"Error in question generation: {questions['error']}" + logger.error(error_msg) + return error_msg - if self.show_dashboard: - self.console.print( - Panel( - "[bold red]⚡ LAUNCHING SPECIALIZED AGENTS[/bold red]\n" - "[white]Executing 4 agents in parallel for comprehensive analysis[/white]", - title="[bold red]AGENT EXECUTION PHASE[/bold red]", - border_style="red", - ) - ) + except Exception as e: + error_msg = f"Failed to generate questions in loop {current_loop + 1}: {str(e)}" + logger.error(error_msg) + logger.error(f"Traceback: {traceback.format_exc()}") + return error_msg - agent_results = self._execute_agents_parallel( - questions=questions, agents=self.agents, img=img - ) + # Agent execution phase + try: + if self.show_dashboard: + self.console.print( + Panel( + "[bold red]⚡ LAUNCHING SPECIALIZED AGENTS[/bold red]\n" + "[white]Executing 4 agents in parallel for comprehensive analysis[/white]", + title="[bold red]AGENT EXECUTION PHASE[/bold red]", + border_style="red", + ) + ) + + agent_results = self._execute_agents_parallel( + questions=questions, + agents=self.agents, + img=img, + ) - # Synthesis with dashboard - if self.show_dashboard: - with Progress( - SpinnerColumn(), - TextColumn( - "[progress.description]{task.description}" - ), - TimeElapsedColumn(), - console=self.console, - ) as progress: - synthesis_task = progress.add_task( - "[red]Agent 5: SYNTHESIZING COMPREHENSIVE ANALYSIS ••••••••••••••••••••••••••••••••", - total=None, - ) + except Exception as e: + error_msg = f"Failed to execute agents in loop {current_loop + 1}: {str(e)}" + logger.error(error_msg) + logger.error(f"Traceback: {traceback.format_exc()}") + return error_msg - progress.update( - synthesis_task, - description="[red]Agent 5: INTEGRATING AGENT RESULTS ••••••••••••••••••••••••••••••••", - ) - time.sleep(0.5) + # Synthesis phase + try: + if self.show_dashboard: + with Progress( + SpinnerColumn(), + TextColumn( + "[progress.description]{task.description}" + ), + TimeElapsedColumn(), + console=self.console, + ) as progress: + synthesis_task = progress.add_task( + "[red]Agent 5: SYNTHESIZING COMPREHENSIVE ANALYSIS ••••••••••••••••••••••••••••••••", + total=None, + ) + + progress.update( + synthesis_task, + description="[red]Agent 5: INTEGRATING AGENT RESULTS ••••••••••••••••••••••••••••••••", + ) + time.sleep(0.5) + + progress.update( + synthesis_task, + description="[red]Agent 5: Summarizing Results ••••••••••••••••••••••••••••••••", + ) + + final_result = self._synthesize_results( + original_task=loop_task, + questions=questions, + agent_results=agent_results, + ) + + progress.update( + synthesis_task, + description="[white]Agent 5: GENERATING FINAL REPORT ••••••••••••••••••••••••••••••••", + ) + time.sleep(0.3) + + progress.update( + synthesis_task, + description="[bold white]Agent 5: COMPLETE! ••••••••••••••••••••••••••••••••", + ) + time.sleep(0.5) + + self.console.print( + Panel( + "[bold red]⚡ HEAVYSWARM ANALYSIS COMPLETE![/bold red]\n" + "[white]Comprehensive multi-agent analysis delivered successfully[/white]", + title="[bold red]MISSION ACCOMPLISHED[/bold red]", + border_style="red", + ) + ) + self.console.print() + else: + final_result = self._synthesize_results( + original_task=loop_task, + questions=questions, + agent_results=agent_results, + ) + + self.conversation.add( + role="Synthesis Agent", + content=final_result, + category="output", + ) - progress.update( - synthesis_task, - description="[red]Agent 5: Summarizing Results ••••••••••••••••••••••••••••••••", - ) + except Exception as e: + error_msg = f"Failed to synthesize results in loop {current_loop + 1}: {str(e)}" + logger.error(error_msg) + logger.error(f"Traceback: {traceback.format_exc()}") + return error_msg - final_result = self._synthesize_results( - original_task=task, - questions=questions, - agent_results=agent_results, - ) + # Store the result for next loop context + last_output = final_result + current_loop += 1 - progress.update( - synthesis_task, - description="[white]Agent 5: GENERATING FINAL REPORT ••••••••••••••••••••••••••••••••", - ) - time.sleep(0.3) + if self.verbose: + logger.success("Task iteration completed successfully") - progress.update( - synthesis_task, - description="[bold white]Agent 5: ✅ COMPLETE! ••••••••••••••••••••••••••••••••", - ) - time.sleep(0.5) + except Exception as e: + error_msg = ( + f"Failed to execute loop {current_loop + 1}: {str(e)}" + ) + logger.error(error_msg) + logger.error(f"Traceback: {traceback.format_exc()}") + return error_msg + + except Exception as e: + error_msg = f"Critical error in HeavySwarm execution: {str(e)}" + logger.error(error_msg) + logger.error(f"Traceback: {traceback.format_exc()}") + return error_msg + # Final completion message + if self.show_dashboard: self.console.print( Panel( "[bold red]⚡ HEAVYSWARM ANALYSIS COMPLETE![/bold red]\n" @@ -908,18 +1025,9 @@ class HeavySwarm: ) ) self.console.print() - else: - final_result = self._synthesize_results( - original_task=task, - questions=questions, - agent_results=agent_results, - ) - self.conversation.add( - role="Synthesis Agent", - content=final_result, - category="output", - ) + if self.verbose: + logger.success("HeavySwarm execution completed") return history_output_formatter( conversation=self.conversation, @@ -1080,9 +1188,7 @@ class HeavySwarm: """ if self.show_dashboard: - return self._execute_agents_with_dashboard( - questions, agents, img - ) + return self._execute_agents_with_dashboard(questions, agents, img) else: return self._execute_agents_basic(questions, agents, img) @@ -1180,14 +1286,10 @@ class HeavySwarm: } # Collect results as they complete - for future in concurrent.futures.as_completed( - future_to_agent - ): + for future in concurrent.futures.as_completed(future_to_agent): agent_type = future_to_agent[future] try: - agent_name, result = future.result( - timeout=self.timeout - ) + agent_name, result = future.result(timeout=self.timeout) results[agent_name.lower()] = result except concurrent.futures.TimeoutError: logger.error( @@ -1200,9 +1302,7 @@ class HeavySwarm: logger.error( f"❌ Exception in {agent_type} Agent: {str(e)}" ) - results[agent_type.lower()] = ( - f"Exception: {str(e)}" - ) + results[agent_type.lower()] = f"Exception: {str(e)}" return results @@ -1393,16 +1493,12 @@ class HeavySwarm: ) as executor: # Submit all agent tasks future_to_agent = { - executor.submit( - execute_agent_with_progress, task - ): task[1] + executor.submit(execute_agent_with_progress, task): task[1] for task in agent_tasks } # Collect results as they complete - for future in concurrent.futures.as_completed( - future_to_agent - ): + for future in concurrent.futures.as_completed(future_to_agent): agent_key = future_to_agent[future] try: agent_name, result = future.result( @@ -1485,11 +1581,11 @@ class HeavySwarm: - Use bullet points, numbered lists, and section headings where appropriate for clarity and readability. You may reference the conversation history for additional context: - + \n\n - + {self.conversation.return_history_as_string()} - + \n\n Please present your synthesis in the following structure: @@ -1555,12 +1651,8 @@ class HeavySwarm: return { "thinking": arguments.get("thinking", ""), - "research_question": arguments.get( - "research_question", "" - ), - "analysis_question": arguments.get( - "analysis_question", "" - ), + "research_question": arguments.get("research_question", ""), + "analysis_question": arguments.get("analysis_question", ""), "alternatives_question": arguments.get( "alternatives_question", "" ), @@ -1579,9 +1671,7 @@ class HeavySwarm: "function_name": tool_call.function.name, } - def execute_question_generation( - self, task: str - ) -> Dict[str, str]: + def execute_question_generation(self, task: str) -> Dict[str, str]: """ Execute the question generation using the schema with a language model. @@ -1597,7 +1687,7 @@ class HeavySwarm: You are an expert task analyzer. Your job is to break down the following task into 4 specialized questions for different agent roles: 1. Research Agent: Focuses on gathering information, data, and background context - 2. Analysis Agent: Focuses on examining patterns, trends, and deriving insights + 2. Analysis Agent: Focuses on examining patterns, trends, and deriving insights 3. Alternatives Agent: Focuses on exploring different approaches and solutions 4. Verification Agent: Focuses on validating findings and checking feasibility @@ -1671,12 +1761,8 @@ class HeavySwarm: return { "research_question": result.get("research_question", ""), "analysis_question": result.get("analysis_question", ""), - "alternatives_question": result.get( - "alternatives_question", "" - ), - "verification_question": result.get( - "verification_question", "" - ), + "alternatives_question": result.get("alternatives_question", ""), + "verification_question": result.get("verification_question", ""), } def get_questions_as_list(self, task: str) -> List[str]: