diff --git a/swarms/structs/heavy_swarm.py b/swarms/structs/heavy_swarm.py index 2edb5078..6c49d076 100644 --- a/swarms/structs/heavy_swarm.py +++ b/swarms/structs/heavy_swarm.py @@ -435,17 +435,27 @@ class HeavySwarm: - Comprehensive synthesis of multi-perspective results - Real-time progress monitoring with rich dashboard displays - Reliability checks and validation systems + - Multi-loop iterative refinement with context preservation The HeavySwarm follows a structured workflow: 1. Task decomposition into specialized questions 2. Parallel execution by specialized agents 3. Result synthesis and integration 4. Comprehensive final report generation + 5. Optional iterative refinement through multiple loops + + Key Features: + - **Multi-loop Execution**: The max_loops parameter enables iterative refinement where + each subsequent loop builds upon the context and results from previous loops + - **Context Preservation**: Conversation history is maintained across all loops, + allowing for deeper analysis and refinement + - **Iterative Refinement**: Each loop can refine, improve, or complete aspects + of the analysis based on previous results Attributes: name (str): Name identifier for the swarm instance description (str): Description of the swarm's purpose - agents (List[Agent]): List of agent instances (currently unused, agents are created internally) + agents (Dict[str, Agent]): Dictionary of specialized agent instances (created internally) timeout (int): Maximum execution time per agent in seconds aggregation_strategy (str): Strategy for result aggregation (currently 'synthesis') loops_per_agent (int): Number of execution loops per agent @@ -455,6 +465,7 @@ class HeavySwarm: max_workers (int): Maximum number of concurrent worker threads show_dashboard (bool): Enable rich dashboard with progress visualization agent_prints_on (bool): Enable individual agent output printing + max_loops (int): Maximum number of execution loops for iterative refinement conversation (Conversation): Conversation history tracker console (Console): Rich console for dashboard output @@ -464,9 +475,11 @@ class HeavySwarm: ... description="Market analysis swarm", ... question_agent_model_name="gpt-4o-mini", ... worker_model_name="gpt-4o-mini", - ... show_dashboard=True + ... show_dashboard=True, + ... max_loops=3 ... ) >>> result = swarm.run("Analyze the current cryptocurrency market trends") + >>> # The swarm will run 3 iterations, each building upon the previous results """ def __init__( @@ -483,10 +496,10 @@ class HeavySwarm: show_dashboard: bool = False, agent_prints_on: bool = False, output_type: str = "dict-all-except-first", - worker_tools: tool_type = None, + worker_tools: Optional[tool_type] = None, random_loops_per_agent: bool = False, max_loops: int = 1, - ): + ) -> None: """ Initialize the HeavySwarm with configuration parameters. @@ -512,6 +525,14 @@ class HeavySwarm: Defaults to False. agent_prints_on (bool, optional): Enable individual agent output printing. Defaults to False. + output_type (str, optional): Output format type for conversation history. + Defaults to "dict-all-except-first". + worker_tools (tool_type, optional): Tools available to worker agents for enhanced functionality. + Defaults to None. + random_loops_per_agent (bool, optional): Enable random number of loops per agent (1-10 range). + Defaults to False. + max_loops (int, optional): Maximum number of execution loops for the entire swarm. + Each loop builds upon previous results for iterative refinement. Defaults to 1. Raises: ValueError: If loops_per_agent is 0 or negative @@ -519,7 +540,10 @@ class HeavySwarm: Note: The swarm automatically performs reliability checks during initialization - to ensure all required parameters are properly configured. + to ensure all required parameters are properly configured. The max_loops + parameter enables iterative refinement by allowing the swarm to process + the same task multiple times, with each subsequent loop building upon + the context and results from previous loops. """ self.name = name self.description = description @@ -732,194 +756,264 @@ class HeavySwarm: title="Reliability Check", ) - def run(self, task: str, img: str = None): + def run(self, task: str, img: Optional[str] = None) -> str: """ - Execute the complete HeavySwarm orchestration flow. + Execute the complete HeavySwarm orchestration flow with multi-loop functionality. + + This method implements the max_loops feature, allowing the HeavySwarm to iterate + multiple times on the same task, with each subsequent loop building upon the + context and results from previous loops. This enables iterative refinement and + deeper analysis of complex tasks. + + The method follows this workflow: + 1. For the first loop: Execute the original task with full HeavySwarm orchestration + 2. For subsequent loops: Combine previous results with original task as context + 3. Maintain conversation history across all loops for context preservation + 4. Return the final synthesized result from the last loop Args: - task (str): The main task to analyze - img (str, optional): Image input if needed + task (str): The main task to analyze and iterate upon + img (str, optional): Image input if needed for visual analysis tasks Returns: - str: Comprehensive final answer from synthesis agent + str: Comprehensive final answer from synthesis agent after all loops complete + + Note: + The max_loops parameter controls how many iterations the swarm will perform. + Each loop builds upon the previous results, enabling iterative refinement. """ + if self.verbose: + logger.info(f"Starting HeavySwarm execution with {self.max_loops} loops") + + current_loop = 0 + last_output = None + if self.show_dashboard: self.console.print( Panel( - f"[bold red]⚡ Completing Task[/bold red]\n" - f"[white]Task: {task}[/white]", + f"[bold red]⚡ HEAVYSWARM MULTI-LOOP EXECUTION[/bold red]\n" + f"[white]Task: {task}[/white]\n" + f"[white]Max Loops: {self.max_loops}[/white]", title="[bold red]Initializing HeavySwarm[/bold red]", border_style="red", ) ) self.console.print() + # Add initial task to conversation self.conversation.add( role="User", content=task, category="input", ) - # Question generation with dashboard - if self.show_dashboard: - with Progress( - SpinnerColumn(), - TextColumn( - "[progress.description]{task.description}" - ), - transient=True, - console=self.console, - ) as progress: - task_gen = progress.add_task( - "[red]⚡ GENERATING SPECIALIZED QUESTIONS...", - total=100, - ) - progress.update(task_gen, advance=30) - questions = self.execute_question_generation(task) - progress.update( - task_gen, - advance=70, - description="[white]✓ QUESTIONS GENERATED SUCCESSFULLY!", - ) - time.sleep(0.5) - else: - questions = self.execute_question_generation(task) - - # if self.show_dashboard: - # # Create questions table - # questions_table = Table( - # title="⚡ GENERATED QUESTIONS FOR SPECIALIZED AGENTS", - # show_header=True, - # header_style="bold red", - # ) - # questions_table.add_column( - # "Agent", style="white", width=20 - # ) - # questions_table.add_column( - # "Specialized Question", style="bright_white", width=60 - # ) - - # questions_table.add_row( - # "Agent 1", - # questions.get("research_question", "N/A"), - # ) - # questions_table.add_row( - # "Agent 2", - # questions.get("analysis_question", "N/A"), - # ) - # questions_table.add_row( - # "Agent 3", - # questions.get("alternatives_question", "N/A"), - # ) - # questions_table.add_row( - # "Agent 4", - # questions.get("verification_question", "N/A"), - # ) - - # self.console.print( - # Panel( - # questions_table, - # title="[bold red]QUESTION GENERATION COMPLETE[/bold red]", - # border_style="red", - # ) - # ) - # self.console.print() - # else: - # formatter.print_panel( - # content=json.dumps(questions, indent=4), - # title="Questions", - # ) + # Main execution loop with comprehensive error handling + try: + while current_loop < self.max_loops: + try: + if self.verbose: + logger.info(f"Executing loop {current_loop + 1}/{self.max_loops}") + + if self.show_dashboard: + self.console.print( + Panel( + f"[bold red]⚡ LOOP {current_loop + 1}/{self.max_loops}[/bold red]\n" + f"[white]Processing task iteration[/white]", + title="[bold red]LOOP EXECUTION[/bold red]", + border_style="red", + ) + ) - self.conversation.add( - role="Question Generator Agent", - content=questions, - category="output", - ) + # Determine the task for this loop + if current_loop == 0: + # First loop: use the original task + loop_task = task + if self.verbose: + logger.info("First loop: Using original task") + else: + # Subsequent loops: combine previous results with original task + loop_task = ( + f"Previous loop results: {last_output}\n\n" + f"Original task: {task}\n\n" + "Based on the previous results and analysis, continue with the next iteration. " + "Refine, improve, or complete any remaining aspects of the analysis. " + "Build upon the insights from the previous loop to provide deeper analysis." + ) + if self.verbose: + logger.info("Subsequent loop: Building upon previous results") - if "error" in questions: - return ( - f"Error in question generation: {questions['error']}" - ) + # Question generation with dashboard + try: + if self.show_dashboard: + with Progress( + SpinnerColumn(), + TextColumn( + "[progress.description]{task.description}" + ), + transient=True, + console=self.console, + ) as progress: + task_gen = progress.add_task( + f"[red]⚡ LOOP {current_loop + 1}: GENERATING SPECIALIZED QUESTIONS...", + total=100, + ) + progress.update(task_gen, advance=30) + questions = self.execute_question_generation(loop_task) + progress.update( + task_gen, + advance=70, + description="[white]✓ QUESTIONS GENERATED SUCCESSFULLY!", + ) + time.sleep(0.5) + else: + questions = self.execute_question_generation(loop_task) + + self.conversation.add( + role="Question Generator Agent", + content=questions, + category="output", + ) - if self.show_dashboard: - self.console.print( - Panel( - "[bold red]⚡ LAUNCHING SPECIALIZED AGENTS[/bold red]\n" - "[white]Executing 4 agents in parallel for comprehensive analysis[/white]", - title="[bold red]AGENT EXECUTION PHASE[/bold red]", - border_style="red", - ) - ) + if "error" in questions: + error_msg = f"Error in question generation: {questions['error']}" + logger.error(error_msg) + return error_msg - agent_results = self._execute_agents_parallel( - questions=questions, agents=self.agents, img=img - ) + except Exception as e: + error_msg = f"Failed to generate questions in loop {current_loop + 1}: {str(e)}" + logger.error(error_msg) + logger.error(f"Traceback: {traceback.format_exc()}") + return error_msg - # Synthesis with dashboard - if self.show_dashboard: - with Progress( - SpinnerColumn(), - TextColumn( - "[progress.description]{task.description}" - ), - TimeElapsedColumn(), - console=self.console, - ) as progress: - synthesis_task = progress.add_task( - "[red]Agent 5: SYNTHESIZING COMPREHENSIVE ANALYSIS ••••••••••••••••••••••••••••••••", - total=None, - ) + # Agent execution phase + try: + if self.show_dashboard: + self.console.print( + Panel( + f"[bold red]LOOP {current_loop + 1}: LAUNCHING SPECIALIZED AGENTS[/bold red]\n" + "[white]Executing 4 agents in parallel for comprehensive analysis[/white]", + title="[bold red]AGENT EXECUTION PHASE[/bold red]", + border_style="red", + ) + ) + + agent_results = self._execute_agents_parallel( + questions=questions, agents=self.agents, img=img + ) - progress.update( - synthesis_task, - description="[red]Agent 5: INTEGRATING AGENT RESULTS ••••••••••••••••••••••••••••••••", - ) - time.sleep(0.5) + except Exception as e: + error_msg = f"Failed to execute agents in loop {current_loop + 1}: {str(e)}" + logger.error(error_msg) + logger.error(f"Traceback: {traceback.format_exc()}") + return error_msg - progress.update( - synthesis_task, - description="[red]Agent 5: Summarizing Results ••••••••••••••••••••••••••••••••", - ) + # Synthesis phase + try: + if self.show_dashboard: + with Progress( + SpinnerColumn(), + TextColumn( + "[progress.description]{task.description}" + ), + TimeElapsedColumn(), + console=self.console, + ) as progress: + synthesis_task = progress.add_task( + f"[red]LOOP {current_loop + 1} - Agent 5: SYNTHESIZING COMPREHENSIVE ANALYSIS", + total=None, + ) + + progress.update( + synthesis_task, + description=f"[red]LOOP {current_loop + 1} - Agent 5: INTEGRATING AGENT RESULTS", + ) + time.sleep(0.5) + + progress.update( + synthesis_task, + description=f"[red]LOOP {current_loop + 1} - Agent 5: Summarizing Results", + ) + + final_result = self._synthesize_results( + original_task=loop_task, + questions=questions, + agent_results=agent_results, + ) + + progress.update( + synthesis_task, + description=f"[white]LOOP {current_loop + 1} - Agent 5: GENERATING FINAL REPORT", + ) + time.sleep(0.3) + + progress.update( + synthesis_task, + description=f"[bold white]LOOP {current_loop + 1} - Agent 5: COMPLETE!", + ) + time.sleep(0.5) + + self.console.print( + Panel( + f"[bold red]LOOP {current_loop + 1} ANALYSIS COMPLETE![/bold red]\n" + "[white]Comprehensive multi-agent analysis delivered successfully[/white]", + title="[bold red]LOOP COMPLETED[/bold red]", + border_style="red", + ) + ) + self.console.print() + else: + final_result = self._synthesize_results( + original_task=loop_task, + questions=questions, + agent_results=agent_results, + ) + + self.conversation.add( + role="Synthesis Agent", + content=final_result, + category="output", + ) - final_result = self._synthesize_results( - original_task=task, - questions=questions, - agent_results=agent_results, - ) + except Exception as e: + error_msg = f"Failed to synthesize results in loop {current_loop + 1}: {str(e)}" + logger.error(error_msg) + logger.error(f"Traceback: {traceback.format_exc()}") + return error_msg - progress.update( - synthesis_task, - description="[white]Agent 5: GENERATING FINAL REPORT ••••••••••••••••••••••••••••••••", - ) - time.sleep(0.3) + # Store the result for next loop context + last_output = final_result + current_loop += 1 - progress.update( - synthesis_task, - description="[bold white]Agent 5: ✅ COMPLETE! ••••••••••••••••••••••••••••••••", - ) - time.sleep(0.5) + if self.verbose: + logger.success(f"Loop {current_loop} completed successfully") + except Exception as e: + error_msg = f"Failed to execute loop {current_loop + 1}: {str(e)}" + logger.error(error_msg) + logger.error(f"Traceback: {traceback.format_exc()}") + return error_msg + + except Exception as e: + error_msg = f"Critical error in HeavySwarm execution: {str(e)}" + logger.error(error_msg) + logger.error(f"Traceback: {traceback.format_exc()}") + return error_msg + + # Final completion message + if self.show_dashboard: self.console.print( Panel( - "[bold red]⚡ HEAVYSWARM ANALYSIS COMPLETE![/bold red]\n" - "[white]Comprehensive multi-agent analysis delivered successfully[/white]", + "[bold red]ALL LOOPS COMPLETED![/bold red]\n" + "[white]HeavySwarm multi-loop analysis finished successfully[/white]", title="[bold red]MISSION ACCOMPLISHED[/bold red]", border_style="red", ) ) self.console.print() - else: - final_result = self._synthesize_results( - original_task=task, - questions=questions, - agent_results=agent_results, - ) - self.conversation.add( - role="Synthesis Agent", - content=final_result, - category="output", - ) + if self.verbose: + logger.success(f"HeavySwarm execution completed after {self.max_loops} loops") + return history_output_formatter( conversation=self.conversation,