Merge pull request #1089 from IlumCI/heavyswarm

[IMPRV-HEAVYSWARM][Added Max_loops param to `Heavy_swarm`]
pull/1102/head
Kye Gomez 2 weeks ago committed by GitHub
commit 8679543c6b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -10,22 +10,15 @@ from typing import Dict, List, Optional
from loguru import logger
from rich.console import Console
from rich.panel import Panel
from rich.progress import (
Progress,
SpinnerColumn,
TextColumn,
TimeElapsedColumn,
)
from rich.progress import (Progress, SpinnerColumn, TextColumn,
TimeElapsedColumn)
from rich.table import Table
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.tools.tool_type import tool_type
from swarms.utils.formatter import formatter
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
from swarms.utils.history_output_formatter import history_output_formatter
from swarms.utils.litellm_wrapper import LiteLLM
from swarms.tools.tool_type import tool_type
RESEARCH_AGENT_PROMPT = """
You are an expert Research Agent with exceptional capabilities in:
@ -144,7 +137,8 @@ You approach analysis with:
- Actionable insights
- Clear communication
Provide precise, data-driven analysis with clear implications and recommendations."""
Provide precise, data-driven analysis with clear implications and
recommendations."""
ALTERNATIVES_AGENT_PROMPT = """
You are an expert Alternatives Agent with exceptional capabilities in:
@ -218,7 +212,8 @@ You approach alternatives generation with:
- Implementation focus
- Value optimization
Provide innovative, practical, and well-evaluated alternative approaches and solutions.
Provide innovative, practical, and well-evaluated alternative approaches
and solutions.
"""
@ -304,7 +299,8 @@ You approach verification with:
- Quality focus
- Practical realism
Provide thorough, objective verification with clear feasibility assessments and risk evaluations."""
Provide thorough, objective verification with clear feasibility
assessments and risk evaluations."""
SYNTHESIS_AGENT_PROMPT = """
You are an expert Synthesis Agent with advanced capabilities in:
@ -378,35 +374,54 @@ You approach synthesis with:
- Value optimization
- Implementation focus
Provide comprehensive, integrated analysis with clear, actionable recommendations and detailed implementation guidance."""
Provide comprehensive, integrated analysis with clear, actionable
recommendations and detailed implementation guidance."""
schema = {
"type": "function",
"function": {
"name": "generate_specialized_questions",
"description": "Generate 4 specialized questions for different agent roles to comprehensively analyze a given task",
"description": (
"Generate 4 specialized questions for different agent roles to "
"comprehensively analyze a given task"
),
"parameters": {
"type": "object",
"properties": {
"thinking": {
"type": "string",
"description": "Your reasoning process for how to break down this task into 4 specialized questions for different agent roles",
"description": (
"Your reasoning process for how to break down this task "
"into 4 specialized questions for different agent roles"
),
},
"research_question": {
"type": "string",
"description": "A detailed research question for the Research Agent to gather comprehensive background information and data",
"description": (
"A detailed research question for the Research Agent to "
"gather comprehensive background information and data"
),
},
"analysis_question": {
"type": "string",
"description": "An analytical question for the Analysis Agent to examine patterns, trends, and insights",
"description": (
"An analytical question for the Analysis Agent to examine "
"patterns, trends, and insights"
),
},
"alternatives_question": {
"type": "string",
"description": "A strategic question for the Alternatives Agent to explore different approaches, options, and solutions",
"description": (
"A strategic question for the Alternatives Agent to explore "
"different approaches, options, and solutions"
),
},
"verification_question": {
"type": "string",
"description": "A verification question for the Verification Agent to validate findings, check accuracy, and assess feasibility",
"description": (
"A verification question for the Verification Agent to "
"validate findings, check accuracy, and assess feasibility"
),
},
},
"required": [
@ -425,9 +440,11 @@ schema = [schema]
class HeavySwarm:
"""
HeavySwarm is a sophisticated multi-agent orchestration system that decomposes complex tasks
into specialized questions and executes them using four specialized agents: Research, Analysis,
Alternatives, and Verification. The results are then synthesized into a comprehensive response.
HeavySwarm is a sophisticated multi-agent orchestration system that
decomposes complex tasks into specialized questions and executes them
using four specialized agents: Research, Analysis, Alternatives, and
Verification. The results are then synthesized into a comprehensive
response.
This swarm architecture provides robust task analysis through:
- Intelligent question generation for specialized agent roles
@ -435,17 +452,28 @@ class HeavySwarm:
- Comprehensive synthesis of multi-perspective results
- Real-time progress monitoring with rich dashboard displays
- Reliability checks and validation systems
- Multi-loop iterative refinement with context preservation
The HeavySwarm follows a structured workflow:
1. Task decomposition into specialized questions
2. Parallel execution by specialized agents
3. Result synthesis and integration
4. Comprehensive final report generation
5. Optional iterative refinement through multiple loops
Key Features:
- **Multi-loop Execution**: The max_loops parameter enables iterative
refinement where each subsequent loop builds upon the context and
results from previous loops
- **Context Preservation**: Conversation history is maintained across
all loops, allowing for deeper analysis and refinement
- **Iterative Refinement**: Each loop can refine, improve, or complete
aspects of the analysis based on previous results
Attributes:
name (str): Name identifier for the swarm instance
description (str): Description of the swarm's purpose
agents (List[Agent]): List of agent instances (currently unused, agents are created internally)
agents (Dict[str, Agent]): Dictionary of specialized agent instances (created internally)
timeout (int): Maximum execution time per agent in seconds
aggregation_strategy (str): Strategy for result aggregation (currently 'synthesis')
loops_per_agent (int): Number of execution loops per agent
@ -455,6 +483,7 @@ class HeavySwarm:
max_workers (int): Maximum number of concurrent worker threads
show_dashboard (bool): Enable rich dashboard with progress visualization
agent_prints_on (bool): Enable individual agent output printing
max_loops (int): Maximum number of execution loops for iterative refinement
conversation (Conversation): Conversation history tracker
console (Console): Rich console for dashboard output
@ -464,15 +493,20 @@ class HeavySwarm:
... description="Market analysis swarm",
... question_agent_model_name="gpt-4o-mini",
... worker_model_name="gpt-4o-mini",
... show_dashboard=True
... show_dashboard=True,
... max_loops=3
... )
>>> result = swarm.run("Analyze the current cryptocurrency market trends")
>>> # The swarm will run 3 iterations, each building upon the previous results
"""
def __init__(
self,
name: str = "HeavySwarm",
description: str = "A swarm of agents that can analyze a task and generate specialized questions for each agent role",
description: str = (
"A swarm of agents that can analyze a task and generate "
"specialized questions for each agent role"
),
timeout: int = 300,
aggregation_strategy: str = "synthesis",
loops_per_agent: int = 1,
@ -483,10 +517,10 @@ class HeavySwarm:
show_dashboard: bool = False,
agent_prints_on: bool = False,
output_type: str = "dict-all-except-first",
worker_tools: tool_type = None,
worker_tools: Optional[tool_type] = None,
random_loops_per_agent: bool = False,
max_loops: int = 1,
):
) -> None:
"""
Initialize the HeavySwarm with configuration parameters.
@ -499,19 +533,28 @@ class HeavySwarm:
timeout (int, optional): Maximum execution time per agent in seconds. Defaults to 300.
aggregation_strategy (str, optional): Strategy for aggregating results. Currently only
'synthesis' is supported. Defaults to "synthesis".
loops_per_agent (int, optional): Number of execution loops each agent should perform.
Must be greater than 0. Defaults to 1.
question_agent_model_name (str, optional): Language model for question generation.
Defaults to "gpt-4o-mini".
worker_model_name (str, optional): Language model for specialized worker agents.
Defaults to "gpt-4o-mini".
loops_per_agent (int, optional): Number of execution loops each
agent should perform. Must be greater than 0. Defaults to 1.
question_agent_model_name (str, optional): Language model for
question generation. Defaults to "gpt-4o-mini".
worker_model_name (str, optional): Language model for specialized
worker agents. Defaults to "gpt-4o-mini".
verbose (bool, optional): Enable detailed logging and debug output. Defaults to False.
max_workers (int, optional): Maximum concurrent workers for parallel execution.
Defaults to 90% of CPU count.
show_dashboard (bool, optional): Enable rich dashboard with progress visualization.
Defaults to False.
agent_prints_on (bool, optional): Enable individual agent output printing.
Defaults to False.
max_workers (int, optional): Maximum concurrent workers for
parallel execution. Defaults to 90% of CPU count.
show_dashboard (bool, optional): Enable rich dashboard with
progress visualization. Defaults to False.
agent_prints_on (bool, optional): Enable individual agent
output printing. Defaults to False.
output_type (str, optional): Output format type for conversation
history. Defaults to "dict-all-except-first".
worker_tools (tool_type, optional): Tools available to worker
agents for enhanced functionality. Defaults to None.
random_loops_per_agent (bool, optional): Enable random number of
loops per agent (1-10 range). Defaults to False.
max_loops (int, optional): Maximum number of execution loops for
the entire swarm. Each loop builds upon previous results for
iterative refinement. Defaults to 1.
Raises:
ValueError: If loops_per_agent is 0 or negative
@ -519,7 +562,10 @@ class HeavySwarm:
Note:
The swarm automatically performs reliability checks during initialization
to ensure all required parameters are properly configured.
to ensure all required parameters are properly configured. The max_loops
parameter enables iterative refinement by allowing the swarm to process
the same task multiple times, with each subsequent loop building upon
the context and results from previous loops.
"""
self.name = name
self.description = description
@ -595,17 +641,11 @@ class HeavySwarm:
info_table.add_row("Swarm Name", self.name)
info_table.add_row("Description", self.description)
info_table.add_row("Timeout", f"{self.timeout}s")
info_table.add_row(
"Loops per Agent", str(self.loops_per_agent)
)
info_table.add_row(
"Question Model", self.question_agent_model_name
)
info_table.add_row("Loops per Agent", str(self.loops_per_agent))
info_table.add_row("Question Model", self.question_agent_model_name)
info_table.add_row("Worker Model", self.worker_model_name)
info_table.add_row("Max Workers", str(self.max_workers))
info_table.add_row(
"Aggregation Strategy", self.aggregation_strategy
)
info_table.add_row("Aggregation Strategy", self.aggregation_strategy)
# Display dashboard with professional Arasaka styling
self.console.print(
@ -646,9 +686,7 @@ class HeavySwarm:
if self.show_dashboard:
with Progress(
SpinnerColumn(),
TextColumn(
"[progress.description]{task.description}"
),
TextColumn("[progress.description]{task.description}"),
transient=True,
console=self.console,
) as progress:
@ -732,17 +770,38 @@ class HeavySwarm:
title="Reliability Check",
)
def run(self, task: str, img: str = None):
def run(self, task: str, img: Optional[str] = None) -> str:
"""
Execute the complete HeavySwarm orchestration flow.
Execute the complete HeavySwarm orchestration flow with multi-loop functionality.
This method implements the max_loops feature, allowing the HeavySwarm to iterate
multiple times on the same task, with each subsequent loop building upon the
context and results from previous loops. This enables iterative refinement and
deeper analysis of complex tasks.
The method follows this workflow:
1. For the first loop: Execute the original task with full HeavySwarm orchestration
2. For subsequent loops: Combine previous results with original task as context
3. Maintain conversation history across all loops for context preservation
4. Return the final synthesized result from the last loop
Args:
task (str): The main task to analyze
img (str, optional): Image input if needed
task (str): The main task to analyze and iterate upon
img (str, optional): Image input if needed for visual analysis tasks
Returns:
str: Comprehensive final answer from synthesis agent
str: Comprehensive final answer from synthesis agent after all loops complete
Note:
The max_loops parameter controls how many iterations the swarm will perform.
Each loop builds upon the previous results, enabling iterative refinement.
"""
if self.verbose:
logger.info("Starting HeavySwarm execution")
current_loop = 0
last_output = None
if self.show_dashboard:
self.console.print(
Panel(
@ -754,151 +813,209 @@ class HeavySwarm:
)
self.console.print()
# Add initial task to conversation
self.conversation.add(
role="User",
content=task,
category="input",
)
# Question generation with dashboard
if self.show_dashboard:
with Progress(
SpinnerColumn(),
TextColumn(
"[progress.description]{task.description}"
),
transient=True,
console=self.console,
) as progress:
task_gen = progress.add_task(
"[red]⚡ GENERATING SPECIALIZED QUESTIONS...",
total=100,
)
progress.update(task_gen, advance=30)
questions = self.execute_question_generation(task)
progress.update(
task_gen,
advance=70,
description="[white]✓ QUESTIONS GENERATED SUCCESSFULLY!",
)
time.sleep(0.5)
else:
questions = self.execute_question_generation(task)
# if self.show_dashboard:
# # Create questions table
# questions_table = Table(
# title="⚡ GENERATED QUESTIONS FOR SPECIALIZED AGENTS",
# show_header=True,
# header_style="bold red",
# )
# questions_table.add_column(
# "Agent", style="white", width=20
# )
# questions_table.add_column(
# "Specialized Question", style="bright_white", width=60
# )
# questions_table.add_row(
# "Agent 1",
# questions.get("research_question", "N/A"),
# )
# questions_table.add_row(
# "Agent 2",
# questions.get("analysis_question", "N/A"),
# )
# questions_table.add_row(
# "Agent 3",
# questions.get("alternatives_question", "N/A"),
# )
# questions_table.add_row(
# "Agent 4",
# questions.get("verification_question", "N/A"),
# )
# self.console.print(
# Panel(
# questions_table,
# title="[bold red]QUESTION GENERATION COMPLETE[/bold red]",
# border_style="red",
# )
# )
# self.console.print()
# else:
# formatter.print_panel(
# content=json.dumps(questions, indent=4),
# title="Questions",
# )
# Main execution loop with comprehensive error handling
try:
while current_loop < self.max_loops:
try:
if self.verbose:
logger.info("Processing task iteration")
# No additional per-loop panels; keep dashboard output minimal and original-style
# Determine the task for this loop
if current_loop == 0:
# First loop: use the original task
loop_task = task
if self.verbose:
logger.info("First loop: Using original task")
else:
# Subsequent loops: combine previous results with original task
loop_task = (
f"Previous loop results: {last_output}\n\n"
f"Original task: {task}\n\n"
"Based on the previous results and analysis, continue with the next iteration. "
"Refine, improve, or complete any remaining aspects of the analysis. "
"Build upon the insights from the previous loop to provide deeper analysis."
)
if self.verbose:
logger.info(
"Subsequent loop: Building upon previous results"
)
self.conversation.add(
role="Question Generator Agent",
content=questions,
category="output",
)
# Question generation with dashboard
try:
if self.show_dashboard:
with Progress(
SpinnerColumn(),
TextColumn(
"[progress.description]{task.description}"
),
transient=True,
console=self.console,
) as progress:
task_gen = progress.add_task(
"[red]⚡ GENERATING SPECIALIZED QUESTIONS...",
total=100,
)
progress.update(task_gen, advance=30)
questions = self.execute_question_generation(
loop_task
)
progress.update(
task_gen,
advance=70,
description="[white]✓ QUESTIONS GENERATED SUCCESSFULLY!",
)
time.sleep(0.5)
else:
questions = self.execute_question_generation(
loop_task
)
self.conversation.add(
role="Question Generator Agent",
content=questions,
category="output",
)
if "error" in questions:
return (
f"Error in question generation: {questions['error']}"
)
if "error" in questions:
error_msg = f"Error in question generation: {questions['error']}"
logger.error(error_msg)
return error_msg
if self.show_dashboard:
self.console.print(
Panel(
"[bold red]⚡ LAUNCHING SPECIALIZED AGENTS[/bold red]\n"
"[white]Executing 4 agents in parallel for comprehensive analysis[/white]",
title="[bold red]AGENT EXECUTION PHASE[/bold red]",
border_style="red",
)
)
except Exception as e:
error_msg = f"Failed to generate questions in loop {current_loop + 1}: {str(e)}"
logger.error(error_msg)
logger.error(f"Traceback: {traceback.format_exc()}")
return error_msg
agent_results = self._execute_agents_parallel(
questions=questions, agents=self.agents, img=img
)
# Agent execution phase
try:
if self.show_dashboard:
self.console.print(
Panel(
"[bold red]⚡ LAUNCHING SPECIALIZED AGENTS[/bold red]\n"
"[white]Executing 4 agents in parallel for comprehensive analysis[/white]",
title="[bold red]AGENT EXECUTION PHASE[/bold red]",
border_style="red",
)
)
agent_results = self._execute_agents_parallel(
questions=questions,
agents=self.agents,
img=img,
)
# Synthesis with dashboard
if self.show_dashboard:
with Progress(
SpinnerColumn(),
TextColumn(
"[progress.description]{task.description}"
),
TimeElapsedColumn(),
console=self.console,
) as progress:
synthesis_task = progress.add_task(
"[red]Agent 5: SYNTHESIZING COMPREHENSIVE ANALYSIS ••••••••••••••••••••••••••••••••",
total=None,
)
except Exception as e:
error_msg = f"Failed to execute agents in loop {current_loop + 1}: {str(e)}"
logger.error(error_msg)
logger.error(f"Traceback: {traceback.format_exc()}")
return error_msg
progress.update(
synthesis_task,
description="[red]Agent 5: INTEGRATING AGENT RESULTS ••••••••••••••••••••••••••••••••",
)
time.sleep(0.5)
# Synthesis phase
try:
if self.show_dashboard:
with Progress(
SpinnerColumn(),
TextColumn(
"[progress.description]{task.description}"
),
TimeElapsedColumn(),
console=self.console,
) as progress:
synthesis_task = progress.add_task(
"[red]Agent 5: SYNTHESIZING COMPREHENSIVE ANALYSIS ••••••••••••••••••••••••••••••••",
total=None,
)
progress.update(
synthesis_task,
description="[red]Agent 5: INTEGRATING AGENT RESULTS ••••••••••••••••••••••••••••••••",
)
time.sleep(0.5)
progress.update(
synthesis_task,
description="[red]Agent 5: Summarizing Results ••••••••••••••••••••••••••••••••",
)
final_result = self._synthesize_results(
original_task=loop_task,
questions=questions,
agent_results=agent_results,
)
progress.update(
synthesis_task,
description="[white]Agent 5: GENERATING FINAL REPORT ••••••••••••••••••••••••••••••••",
)
time.sleep(0.3)
progress.update(
synthesis_task,
description="[bold white]Agent 5: COMPLETE! ••••••••••••••••••••••••••••••••",
)
time.sleep(0.5)
self.console.print(
Panel(
"[bold red]⚡ HEAVYSWARM ANALYSIS COMPLETE![/bold red]\n"
"[white]Comprehensive multi-agent analysis delivered successfully[/white]",
title="[bold red]MISSION ACCOMPLISHED[/bold red]",
border_style="red",
)
)
self.console.print()
else:
final_result = self._synthesize_results(
original_task=loop_task,
questions=questions,
agent_results=agent_results,
)
self.conversation.add(
role="Synthesis Agent",
content=final_result,
category="output",
)
progress.update(
synthesis_task,
description="[red]Agent 5: Summarizing Results ••••••••••••••••••••••••••••••••",
)
except Exception as e:
error_msg = f"Failed to synthesize results in loop {current_loop + 1}: {str(e)}"
logger.error(error_msg)
logger.error(f"Traceback: {traceback.format_exc()}")
return error_msg
final_result = self._synthesize_results(
original_task=task,
questions=questions,
agent_results=agent_results,
)
# Store the result for next loop context
last_output = final_result
current_loop += 1
progress.update(
synthesis_task,
description="[white]Agent 5: GENERATING FINAL REPORT ••••••••••••••••••••••••••••••••",
)
time.sleep(0.3)
if self.verbose:
logger.success("Task iteration completed successfully")
progress.update(
synthesis_task,
description="[bold white]Agent 5: ✅ COMPLETE! ••••••••••••••••••••••••••••••••",
)
time.sleep(0.5)
except Exception as e:
error_msg = (
f"Failed to execute loop {current_loop + 1}: {str(e)}"
)
logger.error(error_msg)
logger.error(f"Traceback: {traceback.format_exc()}")
return error_msg
except Exception as e:
error_msg = f"Critical error in HeavySwarm execution: {str(e)}"
logger.error(error_msg)
logger.error(f"Traceback: {traceback.format_exc()}")
return error_msg
# Final completion message
if self.show_dashboard:
self.console.print(
Panel(
"[bold red]⚡ HEAVYSWARM ANALYSIS COMPLETE![/bold red]\n"
@ -908,18 +1025,9 @@ class HeavySwarm:
)
)
self.console.print()
else:
final_result = self._synthesize_results(
original_task=task,
questions=questions,
agent_results=agent_results,
)
self.conversation.add(
role="Synthesis Agent",
content=final_result,
category="output",
)
if self.verbose:
logger.success("HeavySwarm execution completed")
return history_output_formatter(
conversation=self.conversation,
@ -1080,9 +1188,7 @@ class HeavySwarm:
"""
if self.show_dashboard:
return self._execute_agents_with_dashboard(
questions, agents, img
)
return self._execute_agents_with_dashboard(questions, agents, img)
else:
return self._execute_agents_basic(questions, agents, img)
@ -1180,14 +1286,10 @@ class HeavySwarm:
}
# Collect results as they complete
for future in concurrent.futures.as_completed(
future_to_agent
):
for future in concurrent.futures.as_completed(future_to_agent):
agent_type = future_to_agent[future]
try:
agent_name, result = future.result(
timeout=self.timeout
)
agent_name, result = future.result(timeout=self.timeout)
results[agent_name.lower()] = result
except concurrent.futures.TimeoutError:
logger.error(
@ -1200,9 +1302,7 @@ class HeavySwarm:
logger.error(
f"❌ Exception in {agent_type} Agent: {str(e)}"
)
results[agent_type.lower()] = (
f"Exception: {str(e)}"
)
results[agent_type.lower()] = f"Exception: {str(e)}"
return results
@ -1393,16 +1493,12 @@ class HeavySwarm:
) as executor:
# Submit all agent tasks
future_to_agent = {
executor.submit(
execute_agent_with_progress, task
): task[1]
executor.submit(execute_agent_with_progress, task): task[1]
for task in agent_tasks
}
# Collect results as they complete
for future in concurrent.futures.as_completed(
future_to_agent
):
for future in concurrent.futures.as_completed(future_to_agent):
agent_key = future_to_agent[future]
try:
agent_name, result = future.result(
@ -1485,11 +1581,11 @@ class HeavySwarm:
- Use bullet points, numbered lists, and section headings where appropriate for clarity and readability.
You may reference the conversation history for additional context:
\n\n
{self.conversation.return_history_as_string()}
\n\n
Please present your synthesis in the following structure:
@ -1555,12 +1651,8 @@ class HeavySwarm:
return {
"thinking": arguments.get("thinking", ""),
"research_question": arguments.get(
"research_question", ""
),
"analysis_question": arguments.get(
"analysis_question", ""
),
"research_question": arguments.get("research_question", ""),
"analysis_question": arguments.get("analysis_question", ""),
"alternatives_question": arguments.get(
"alternatives_question", ""
),
@ -1579,9 +1671,7 @@ class HeavySwarm:
"function_name": tool_call.function.name,
}
def execute_question_generation(
self, task: str
) -> Dict[str, str]:
def execute_question_generation(self, task: str) -> Dict[str, str]:
"""
Execute the question generation using the schema with a language model.
@ -1597,7 +1687,7 @@ class HeavySwarm:
You are an expert task analyzer. Your job is to break down the following task into 4 specialized questions for different agent roles:
1. Research Agent: Focuses on gathering information, data, and background context
2. Analysis Agent: Focuses on examining patterns, trends, and deriving insights
2. Analysis Agent: Focuses on examining patterns, trends, and deriving insights
3. Alternatives Agent: Focuses on exploring different approaches and solutions
4. Verification Agent: Focuses on validating findings and checking feasibility
@ -1671,12 +1761,8 @@ class HeavySwarm:
return {
"research_question": result.get("research_question", ""),
"analysis_question": result.get("analysis_question", ""),
"alternatives_question": result.get(
"alternatives_question", ""
),
"verification_question": result.get(
"verification_question", ""
),
"alternatives_question": result.get("alternatives_question", ""),
"verification_question": result.get("verification_question", ""),
}
def get_questions_as_list(self, task: str) -> List[str]:

Loading…
Cancel
Save