Update heavy_swarm.py

pull/1089/head
CI-DEV 2 weeks ago committed by GitHub
parent f14b282a27
commit 2285bd9aaa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -435,17 +435,27 @@ class HeavySwarm:
- Comprehensive synthesis of multi-perspective results
- Real-time progress monitoring with rich dashboard displays
- Reliability checks and validation systems
- Multi-loop iterative refinement with context preservation
The HeavySwarm follows a structured workflow:
1. Task decomposition into specialized questions
2. Parallel execution by specialized agents
3. Result synthesis and integration
4. Comprehensive final report generation
5. Optional iterative refinement through multiple loops
Key Features:
- **Multi-loop Execution**: The max_loops parameter enables iterative refinement where
each subsequent loop builds upon the context and results from previous loops
- **Context Preservation**: Conversation history is maintained across all loops,
allowing for deeper analysis and refinement
- **Iterative Refinement**: Each loop can refine, improve, or complete aspects
of the analysis based on previous results
Attributes:
name (str): Name identifier for the swarm instance
description (str): Description of the swarm's purpose
agents (List[Agent]): List of agent instances (currently unused, agents are created internally)
agents (Dict[str, Agent]): Dictionary of specialized agent instances (created internally)
timeout (int): Maximum execution time per agent in seconds
aggregation_strategy (str): Strategy for result aggregation (currently 'synthesis')
loops_per_agent (int): Number of execution loops per agent
@ -455,6 +465,7 @@ class HeavySwarm:
max_workers (int): Maximum number of concurrent worker threads
show_dashboard (bool): Enable rich dashboard with progress visualization
agent_prints_on (bool): Enable individual agent output printing
max_loops (int): Maximum number of execution loops for iterative refinement
conversation (Conversation): Conversation history tracker
console (Console): Rich console for dashboard output
@ -464,9 +475,11 @@ class HeavySwarm:
... description="Market analysis swarm",
... question_agent_model_name="gpt-4o-mini",
... worker_model_name="gpt-4o-mini",
... show_dashboard=True
... show_dashboard=True,
... max_loops=3
... )
>>> result = swarm.run("Analyze the current cryptocurrency market trends")
>>> # The swarm will run 3 iterations, each building upon the previous results
"""
def __init__(
@ -483,10 +496,10 @@ class HeavySwarm:
show_dashboard: bool = False,
agent_prints_on: bool = False,
output_type: str = "dict-all-except-first",
worker_tools: tool_type = None,
worker_tools: Optional[tool_type] = None,
random_loops_per_agent: bool = False,
max_loops: int = 1,
):
) -> None:
"""
Initialize the HeavySwarm with configuration parameters.
@ -512,6 +525,14 @@ class HeavySwarm:
Defaults to False.
agent_prints_on (bool, optional): Enable individual agent output printing.
Defaults to False.
output_type (str, optional): Output format type for conversation history.
Defaults to "dict-all-except-first".
worker_tools (tool_type, optional): Tools available to worker agents for enhanced functionality.
Defaults to None.
random_loops_per_agent (bool, optional): Enable random number of loops per agent (1-10 range).
Defaults to False.
max_loops (int, optional): Maximum number of execution loops for the entire swarm.
Each loop builds upon previous results for iterative refinement. Defaults to 1.
Raises:
ValueError: If loops_per_agent is 0 or negative
@ -519,7 +540,10 @@ class HeavySwarm:
Note:
The swarm automatically performs reliability checks during initialization
to ensure all required parameters are properly configured.
to ensure all required parameters are properly configured. The max_loops
parameter enables iterative refinement by allowing the swarm to process
the same task multiple times, with each subsequent loop building upon
the context and results from previous loops.
"""
self.name = name
self.description = description
@ -732,194 +756,264 @@ class HeavySwarm:
title="Reliability Check",
)
def run(self, task: str, img: str = None):
def run(self, task: str, img: Optional[str] = None) -> str:
"""
Execute the complete HeavySwarm orchestration flow.
Execute the complete HeavySwarm orchestration flow with multi-loop functionality.
This method implements the max_loops feature, allowing the HeavySwarm to iterate
multiple times on the same task, with each subsequent loop building upon the
context and results from previous loops. This enables iterative refinement and
deeper analysis of complex tasks.
The method follows this workflow:
1. For the first loop: Execute the original task with full HeavySwarm orchestration
2. For subsequent loops: Combine previous results with original task as context
3. Maintain conversation history across all loops for context preservation
4. Return the final synthesized result from the last loop
Args:
task (str): The main task to analyze
img (str, optional): Image input if needed
task (str): The main task to analyze and iterate upon
img (str, optional): Image input if needed for visual analysis tasks
Returns:
str: Comprehensive final answer from synthesis agent
str: Comprehensive final answer from synthesis agent after all loops complete
Note:
The max_loops parameter controls how many iterations the swarm will perform.
Each loop builds upon the previous results, enabling iterative refinement.
"""
if self.verbose:
logger.info(f"Starting HeavySwarm execution with {self.max_loops} loops")
current_loop = 0
last_output = None
if self.show_dashboard:
self.console.print(
Panel(
f"[bold red]⚡ Completing Task[/bold red]\n"
f"[white]Task: {task}[/white]",
f"[bold red]⚡ HEAVYSWARM MULTI-LOOP EXECUTION[/bold red]\n"
f"[white]Task: {task}[/white]\n"
f"[white]Max Loops: {self.max_loops}[/white]",
title="[bold red]Initializing HeavySwarm[/bold red]",
border_style="red",
)
)
self.console.print()
# Add initial task to conversation
self.conversation.add(
role="User",
content=task,
category="input",
)
# Question generation with dashboard
if self.show_dashboard:
with Progress(
SpinnerColumn(),
TextColumn(
"[progress.description]{task.description}"
),
transient=True,
console=self.console,
) as progress:
task_gen = progress.add_task(
"[red]⚡ GENERATING SPECIALIZED QUESTIONS...",
total=100,
)
progress.update(task_gen, advance=30)
questions = self.execute_question_generation(task)
progress.update(
task_gen,
advance=70,
description="[white]✓ QUESTIONS GENERATED SUCCESSFULLY!",
)
time.sleep(0.5)
else:
questions = self.execute_question_generation(task)
# if self.show_dashboard:
# # Create questions table
# questions_table = Table(
# title="⚡ GENERATED QUESTIONS FOR SPECIALIZED AGENTS",
# show_header=True,
# header_style="bold red",
# )
# questions_table.add_column(
# "Agent", style="white", width=20
# )
# questions_table.add_column(
# "Specialized Question", style="bright_white", width=60
# )
# questions_table.add_row(
# "Agent 1",
# questions.get("research_question", "N/A"),
# )
# questions_table.add_row(
# "Agent 2",
# questions.get("analysis_question", "N/A"),
# )
# questions_table.add_row(
# "Agent 3",
# questions.get("alternatives_question", "N/A"),
# )
# questions_table.add_row(
# "Agent 4",
# questions.get("verification_question", "N/A"),
# )
# self.console.print(
# Panel(
# questions_table,
# title="[bold red]QUESTION GENERATION COMPLETE[/bold red]",
# border_style="red",
# )
# )
# self.console.print()
# else:
# formatter.print_panel(
# content=json.dumps(questions, indent=4),
# title="Questions",
# )
# Main execution loop with comprehensive error handling
try:
while current_loop < self.max_loops:
try:
if self.verbose:
logger.info(f"Executing loop {current_loop + 1}/{self.max_loops}")
if self.show_dashboard:
self.console.print(
Panel(
f"[bold red]⚡ LOOP {current_loop + 1}/{self.max_loops}[/bold red]\n"
f"[white]Processing task iteration[/white]",
title="[bold red]LOOP EXECUTION[/bold red]",
border_style="red",
)
)
self.conversation.add(
role="Question Generator Agent",
content=questions,
category="output",
)
# Determine the task for this loop
if current_loop == 0:
# First loop: use the original task
loop_task = task
if self.verbose:
logger.info("First loop: Using original task")
else:
# Subsequent loops: combine previous results with original task
loop_task = (
f"Previous loop results: {last_output}\n\n"
f"Original task: {task}\n\n"
"Based on the previous results and analysis, continue with the next iteration. "
"Refine, improve, or complete any remaining aspects of the analysis. "
"Build upon the insights from the previous loop to provide deeper analysis."
)
if self.verbose:
logger.info("Subsequent loop: Building upon previous results")
if "error" in questions:
return (
f"Error in question generation: {questions['error']}"
)
# Question generation with dashboard
try:
if self.show_dashboard:
with Progress(
SpinnerColumn(),
TextColumn(
"[progress.description]{task.description}"
),
transient=True,
console=self.console,
) as progress:
task_gen = progress.add_task(
f"[red]⚡ LOOP {current_loop + 1}: GENERATING SPECIALIZED QUESTIONS...",
total=100,
)
progress.update(task_gen, advance=30)
questions = self.execute_question_generation(loop_task)
progress.update(
task_gen,
advance=70,
description="[white]✓ QUESTIONS GENERATED SUCCESSFULLY!",
)
time.sleep(0.5)
else:
questions = self.execute_question_generation(loop_task)
self.conversation.add(
role="Question Generator Agent",
content=questions,
category="output",
)
if self.show_dashboard:
self.console.print(
Panel(
"[bold red]⚡ LAUNCHING SPECIALIZED AGENTS[/bold red]\n"
"[white]Executing 4 agents in parallel for comprehensive analysis[/white]",
title="[bold red]AGENT EXECUTION PHASE[/bold red]",
border_style="red",
)
)
if "error" in questions:
error_msg = f"Error in question generation: {questions['error']}"
logger.error(error_msg)
return error_msg
agent_results = self._execute_agents_parallel(
questions=questions, agents=self.agents, img=img
)
except Exception as e:
error_msg = f"Failed to generate questions in loop {current_loop + 1}: {str(e)}"
logger.error(error_msg)
logger.error(f"Traceback: {traceback.format_exc()}")
return error_msg
# Synthesis with dashboard
if self.show_dashboard:
with Progress(
SpinnerColumn(),
TextColumn(
"[progress.description]{task.description}"
),
TimeElapsedColumn(),
console=self.console,
) as progress:
synthesis_task = progress.add_task(
"[red]Agent 5: SYNTHESIZING COMPREHENSIVE ANALYSIS ••••••••••••••••••••••••••••••••",
total=None,
)
# Agent execution phase
try:
if self.show_dashboard:
self.console.print(
Panel(
f"[bold red]LOOP {current_loop + 1}: LAUNCHING SPECIALIZED AGENTS[/bold red]\n"
"[white]Executing 4 agents in parallel for comprehensive analysis[/white]",
title="[bold red]AGENT EXECUTION PHASE[/bold red]",
border_style="red",
)
)
agent_results = self._execute_agents_parallel(
questions=questions, agents=self.agents, img=img
)
progress.update(
synthesis_task,
description="[red]Agent 5: INTEGRATING AGENT RESULTS ••••••••••••••••••••••••••••••••",
)
time.sleep(0.5)
except Exception as e:
error_msg = f"Failed to execute agents in loop {current_loop + 1}: {str(e)}"
logger.error(error_msg)
logger.error(f"Traceback: {traceback.format_exc()}")
return error_msg
progress.update(
synthesis_task,
description="[red]Agent 5: Summarizing Results ••••••••••••••••••••••••••••••••",
)
# Synthesis phase
try:
if self.show_dashboard:
with Progress(
SpinnerColumn(),
TextColumn(
"[progress.description]{task.description}"
),
TimeElapsedColumn(),
console=self.console,
) as progress:
synthesis_task = progress.add_task(
f"[red]LOOP {current_loop + 1} - Agent 5: SYNTHESIZING COMPREHENSIVE ANALYSIS",
total=None,
)
progress.update(
synthesis_task,
description=f"[red]LOOP {current_loop + 1} - Agent 5: INTEGRATING AGENT RESULTS",
)
time.sleep(0.5)
progress.update(
synthesis_task,
description=f"[red]LOOP {current_loop + 1} - Agent 5: Summarizing Results",
)
final_result = self._synthesize_results(
original_task=loop_task,
questions=questions,
agent_results=agent_results,
)
progress.update(
synthesis_task,
description=f"[white]LOOP {current_loop + 1} - Agent 5: GENERATING FINAL REPORT",
)
time.sleep(0.3)
progress.update(
synthesis_task,
description=f"[bold white]LOOP {current_loop + 1} - Agent 5: COMPLETE!",
)
time.sleep(0.5)
self.console.print(
Panel(
f"[bold red]LOOP {current_loop + 1} ANALYSIS COMPLETE![/bold red]\n"
"[white]Comprehensive multi-agent analysis delivered successfully[/white]",
title="[bold red]LOOP COMPLETED[/bold red]",
border_style="red",
)
)
self.console.print()
else:
final_result = self._synthesize_results(
original_task=loop_task,
questions=questions,
agent_results=agent_results,
)
self.conversation.add(
role="Synthesis Agent",
content=final_result,
category="output",
)
final_result = self._synthesize_results(
original_task=task,
questions=questions,
agent_results=agent_results,
)
except Exception as e:
error_msg = f"Failed to synthesize results in loop {current_loop + 1}: {str(e)}"
logger.error(error_msg)
logger.error(f"Traceback: {traceback.format_exc()}")
return error_msg
progress.update(
synthesis_task,
description="[white]Agent 5: GENERATING FINAL REPORT ••••••••••••••••••••••••••••••••",
)
time.sleep(0.3)
# Store the result for next loop context
last_output = final_result
current_loop += 1
progress.update(
synthesis_task,
description="[bold white]Agent 5: ✅ COMPLETE! ••••••••••••••••••••••••••••••••",
)
time.sleep(0.5)
if self.verbose:
logger.success(f"Loop {current_loop} completed successfully")
except Exception as e:
error_msg = f"Failed to execute loop {current_loop + 1}: {str(e)}"
logger.error(error_msg)
logger.error(f"Traceback: {traceback.format_exc()}")
return error_msg
except Exception as e:
error_msg = f"Critical error in HeavySwarm execution: {str(e)}"
logger.error(error_msg)
logger.error(f"Traceback: {traceback.format_exc()}")
return error_msg
# Final completion message
if self.show_dashboard:
self.console.print(
Panel(
"[bold red]⚡ HEAVYSWARM ANALYSIS COMPLETE![/bold red]\n"
"[white]Comprehensive multi-agent analysis delivered successfully[/white]",
"[bold red]ALL LOOPS COMPLETED![/bold red]\n"
"[white]HeavySwarm multi-loop analysis finished successfully[/white]",
title="[bold red]MISSION ACCOMPLISHED[/bold red]",
border_style="red",
)
)
self.console.print()
else:
final_result = self._synthesize_results(
original_task=task,
questions=questions,
agent_results=agent_results,
)
self.conversation.add(
role="Synthesis Agent",
content=final_result,
category="output",
)
if self.verbose:
logger.success(f"HeavySwarm execution completed after {self.max_loops} loops")
return history_output_formatter(
conversation=self.conversation,

Loading…
Cancel
Save