You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
swarms/swarms/structs/heavy_swarm.py

1713 lines
70 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import concurrent.futures
import json
import os
import random
import time
import traceback
from functools import lru_cache
from typing import Dict, List, Optional
from loguru import logger
from rich.console import Console
from rich.panel import Panel
from rich.progress import (
Progress,
SpinnerColumn,
TextColumn,
TimeElapsedColumn,
)
from rich.table import Table
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.tools.tool_type import tool_type
from swarms.utils.formatter import formatter
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
from swarms.utils.litellm_wrapper import LiteLLM
RESEARCH_AGENT_PROMPT = """
You are a senior research agent. Your mission is to deliver fast, trustworthy, and reproducible research that supports decision-making.
Objective:
- Produce well-sourced, reproducible, and actionable research that directly answers the task.
Core responsibilities:
- Frame the research scope and assumptions
- Design and execute a systematic search strategy
- Extract and evaluate evidence
- Triangulate across sources and assess reliability
- Present findings with limitations and next steps
Process:
1. Clarify scope; state assumptions if details are missing
2. Define search strategy (keywords, databases, time range)
3. Collect sources, prioritizing primary and high-credibility ones
4. Extract key claims, methods, and figures with provenance
5. Score source credibility and reconcile conflicting claims
6. Synthesize into actionable insights
Scoring rubric (05 scale for each):
- Credibility
- Recency
- Methodological transparency
- Relevance
- Consistency with other sources
Deliverables:
1. Concise summary (12 sentences)
2. Key findings (bullet points)
3. Evidence table (source id, claim, support level, credibility, link)
4. Search log and methods
5. Assumptions and unknowns
6. Limitations and biases
7. Recommendations and next steps
8. Confidence score with justification
9. Raw citations and extracts
Citation rules:
- Number citations inline [1], [2], and provide metadata in the evidence table
- Explicitly label assumptions
- Include provenance for paraphrased content
Style and guardrails:
- Objective, precise language
- Present conflicting evidence fairly
- Redact sensitive details unless explicitly authorized
- If evidence is insufficient, state what is missing and suggest how to obtain it
"""
ANALYSIS_AGENT_PROMPT = """
You are an expert analysis agent. Your mission is to transform raw data or research into validated, decision-grade insights.
Objective:
- Deliver statistically sound analyses and models with quantified uncertainty.
Core responsibilities:
- Assess data quality
- Choose appropriate methods and justify them
- Run diagnostics and quantify uncertainty
- Interpret results in context and provide recommendations
Process:
1. Validate dataset (structure, missingness, ranges)
2. Clean and document transformations
3. Explore (distributions, outliers, correlations)
4. Select methods (justify choice)
5. Fit models or perform tests; report parameters and uncertainty
6. Run sensitivity and robustness checks
7. Interpret results and link to decisions
Deliverables:
1. Concise summary (key implication in 12 sentences)
2. Dataset overview
3. Methods and assumptions
4. Results (tables, coefficients, metrics, units)
5. Diagnostics and robustness
6. Quantified uncertainty
7. Practical interpretation and recommendations
8. Limitations and biases
9. Optional reproducible code/pseudocode
Style and guardrails:
- Rigorous but stakeholder-friendly explanations
- Clearly distinguish correlation from causation
- Present conservative results when evidence is weak
"""
ALTERNATIVES_AGENT_PROMPT = """
You are an alternatives agent. Your mission is to generate a diverse portfolio of solutions and evaluate trade-offs consistently.
Objective:
- Present multiple credible strategies, evaluate them against defined criteria, and recommend a primary and fallback path.
Core responsibilities:
- Generate a balanced set of alternatives
- Evaluate each using a consistent set of criteria
- Provide implementation outlines and risk mitigation
Process:
1. Define evaluation criteria and weights
2. Generate at least four distinct alternatives
3. For each option, describe scope, cost, timeline, resources, risks, and success metrics
4. Score options in a trade-off matrix
5. Rank and recommend primary and fallback strategies
6. Provide phased implementation roadmap
Deliverables:
1. Concise recommendation with rationale
2. List of alternatives with short descriptions
3. Trade-off matrix with scores and justifications
4. Recommendation with risk plan
5. Implementation roadmap with milestones
6. Success criteria and KPIs
7. Contingency plans with switch triggers
Style and guardrails:
- Creative but realistic options
- Transparent about hidden costs or dependencies
- Highlight flexibility-preserving options
- Use ranges and confidence where estimates are uncertain
"""
VERIFICATION_AGENT_PROMPT = """
You are a verification agent. Your mission is to rigorously validate claims, methods, and feasibility.
Objective:
- Provide a transparent, evidence-backed verification of claims and quantify remaining uncertainty.
Core responsibilities:
- Fact-check against primary sources
- Validate methodology and internal consistency
- Assess feasibility and compliance
- Deliver verdicts with supporting evidence
Process:
1. Identify claims or deliverables to verify
2. Define requirements for verification
3. Triangulate independent sources
4. Re-run calculations or sanity checks
5. Stress-test assumptions
6. Produce verification scorecard and remediation steps
Deliverables:
1. Claim summary
2. Verification status (verified, partial, not verified)
3. Evidence matrix (source, finding, support, confidence)
4. Reproduction of critical calculations
5. Key risks and failure modes
6. Corrective steps
7. Confidence score with reasons
Style and guardrails:
- Transparent chain-of-evidence
- Highlight uncertainty explicitly
- If data is missing, state whats needed and propose next steps
"""
SYNTHESIS_AGENT_PROMPT = """
You are a synthesis agent. Your mission is to integrate multiple inputs into a coherent narrative and executable plan.
Objective:
- Deliver an integrated synthesis that reconciles evidence, clarifies trade-offs, and yields a prioritized plan.
Core responsibilities:
- Combine outputs from research, analysis, alternatives, and verification
- Highlight consensus and conflicts
- Provide a prioritized roadmap and communication plan
Process:
1. Map inputs and provenance
2. Identify convergence and conflicts
3. Prioritize actions by impact and feasibility
4. Develop integrated roadmap with owners, milestones, KPIs
5. Create stakeholder-specific summaries
Deliverables:
1. Executive summary (≤150 words)
2. Consensus findings and open questions
3. Priority action list
4. Integrated roadmap
5. Measurement and evaluation plan
6. Communication plan per stakeholder group
7. Evidence map and assumptions
Style and guardrails:
- Executive-focused summary, technical appendix for implementers
- Transparent about uncertainty
- Include “what could break this plan” with mitigation steps
"""
schema = {
"type": "function",
"function": {
"name": "generate_specialized_questions",
"description": (
"Generate 4 specialized questions for different agent roles to "
"comprehensively analyze a given task"
),
"parameters": {
"type": "object",
"properties": {
"thinking": {
"type": "string",
"description": (
"Your reasoning process for how to break down this task "
"into 4 specialized questions for different agent roles"
),
},
"research_question": {
"type": "string",
"description": (
"A detailed research question for the Research Agent to "
"gather comprehensive background information and data"
),
},
"analysis_question": {
"type": "string",
"description": (
"An analytical question for the Analysis Agent to examine "
"patterns, trends, and insights"
),
},
"alternatives_question": {
"type": "string",
"description": (
"A strategic question for the Alternatives Agent to explore "
"different approaches, options, and solutions"
),
},
"verification_question": {
"type": "string",
"description": (
"A verification question for the Verification Agent to "
"validate findings, check accuracy, and assess feasibility"
),
},
},
"required": [
"thinking",
"research_question",
"analysis_question",
"alternatives_question",
"verification_question",
],
},
},
}
schema = [schema]
class HeavySwarm:
"""
HeavySwarm is a sophisticated multi-agent orchestration system that
decomposes complex tasks into specialized questions and executes them
using four specialized agents: Research, Analysis, Alternatives, and
Verification. The results are then synthesized into a comprehensive
response.
This swarm architecture provides robust task analysis through:
- Intelligent question generation for specialized agent roles
- Parallel execution of specialized agents for efficiency
- Comprehensive synthesis of multi-perspective results
- Real-time progress monitoring with rich dashboard displays
- Reliability checks and validation systems
- Multi-loop iterative refinement with context preservation
The HeavySwarm follows a structured workflow:
1. Task decomposition into specialized questions
2. Parallel execution by specialized agents
3. Result synthesis and integration
4. Comprehensive final report generation
5. Optional iterative refinement through multiple loops
Key Features:
- **Multi-loop Execution**: The max_loops parameter enables iterative
refinement where each subsequent loop builds upon the context and
results from previous loops
S **Iterative Refinement**: Each loop can refine, improve, or complete
aspects of the analysis based on previous results
Attributes:
name (str): Name identifier for the swarm instance
description (str): Description of the swarm's purpose
agents (Dict[str, Agent]): Dictionary of specialized agent instances (created internally)
timeout (int): Maximum execution time per agent in seconds
aggregation_strategy (str): Strategy for result aggregation (currently 'synthesis')
loops_per_agent (int): Number of execution loops per agent
question_agent_model_name (str): Model name for question generation
worker_model_name (str): Model name for specialized worker agents
verbose (bool): Enable detailed logging output
max_workers (int): Maximum number of concurrent worker threads
show_dashboard (bool): Enable rich dashboard with progress visualization
agent_prints_on (bool): Enable individual agent output printing
max_loops (int): Maximum number of execution loops for iterative refinement
conversation (Conversation): Conversation history tracker
console (Console): Rich console for dashboard output
Example:
>>> swarm = HeavySwarm(
... name="AnalysisSwarm",
... description="Market analysis swarm",
... question_agent_model_name="gpt-4o-mini",
... worker_model_name="gpt-4o-mini",
... show_dashboard=True,
... max_loops=3
... )
>>> result = swarm.run("Analyze the current cryptocurrency market trends")
>>> # The swarm will run 3 iterations, each building upon the previous results
"""
def __init__(
self,
name: str = "HeavySwarm",
description: str = (
"A swarm of agents that can analyze a task and generate "
"specialized questions for each agent role"
),
timeout: int = 300,
aggregation_strategy: str = "synthesis",
loops_per_agent: int = 1,
question_agent_model_name: str = "gpt-4o-mini",
worker_model_name: str = "gpt-4o-mini",
verbose: bool = False,
max_workers: int = int(os.cpu_count() * 0.9),
show_dashboard: bool = False,
agent_prints_on: bool = False,
output_type: str = "dict-all-except-first",
worker_tools: Optional[tool_type] = None,
random_loops_per_agent: bool = False,
max_loops: int = 1,
) -> None:
"""
Initialize the HeavySwarm with configuration parameters.
Args:
name (str, optional): Identifier name for the swarm instance. Defaults to "HeavySwarm".
description (str, optional): Description of the swarm's purpose and capabilities.
Defaults to standard description.
agents (List[Agent], optional): Pre-configured agent list (currently unused as agents
are created internally). Defaults to None.
timeout (int, optional): Maximum execution time per agent in seconds. Defaults to 300.
aggregation_strategy (str, optional): Strategy for aggregating results. Currently only
'synthesis' is supported. Defaults to "synthesis".
loops_per_agent (int, optional): Number of execution loops each
agent should perform. Must be greater than 0. Defaults to 1.
question_agent_model_name (str, optional): Language model for
question generation. Defaults to "gpt-4o-mini".
worker_model_name (str, optional): Language model for specialized
worker agents. Defaults to "gpt-4o-mini".
verbose (bool, optional): Enable detailed logging and debug output. Defaults to False.
max_workers (int, optional): Maximum concurrent workers for
parallel execution. Defaults to 90% of CPU count.
show_dashboard (bool, optional): Enable rich dashboard with
progress visualization. Defaults to False.
agent_prints_on (bool, optional): Enable individual agent
output printing. Defaults to False.
output_type (str, optional): Output format type for conversation
history. Defaults to "dict-all-except-first".
worker_tools (tool_type, optional): Tools available to worker
agents for enhanced functionality. Defaults to None.
random_loops_per_agent (bool, optional): Enable random number of
loops per agent (1-10 range). Defaults to False.
max_loops (int, optional): Maximum number of execution loops for
the entire swarm. Each loop builds upon previous results for
iterative refinement. Defaults to 1.
Raises:
ValueError: If loops_per_agent is 0 or negative
ValueError: If required model names are None
Note:
The swarm automatically performs reliability checks during initialization
to ensure all required parameters are properly configured. The max_loops
parameter enables iterative refinement by allowing the swarm to process
the same task multiple times, with each subsequent loop building upon
the context and results from previous loops.
"""
self.name = name
self.description = description
self.timeout = timeout
self.aggregation_strategy = aggregation_strategy
self.loops_per_agent = loops_per_agent
self.question_agent_model_name = question_agent_model_name
self.worker_model_name = worker_model_name
self.verbose = verbose
self.max_workers = max_workers
self.show_dashboard = show_dashboard
self.agent_prints_on = agent_prints_on
self.output_type = output_type
self.worker_tools = worker_tools
self.random_loops_per_agent = random_loops_per_agent
self.max_loops = max_loops
self.conversation = Conversation()
self.console = Console()
self.agents = self.create_agents()
if self.show_dashboard:
self.show_swarm_info()
self.reliability_check()
def handle_worker_agent_loops(self) -> int:
"""
Handle the loops per agent for the worker agents.
"""
loops = None
if self.random_loops_per_agent:
loops = random.randint(1, 10)
else:
loops = self.loops_per_agent
return loops
def show_swarm_info(self):
"""
Display comprehensive swarm configuration information in a rich dashboard format.
This method creates and displays a professionally styled information table containing
all key swarm configuration parameters including models, timeouts, and operational
settings. The display uses Arasaka-inspired styling with red headers and borders.
The dashboard includes:
- Swarm identification (name, description)
- Execution parameters (timeout, loops per agent)
- Model configurations (question and worker models)
- Performance settings (max workers, aggregation strategy)
Note:
This method only displays output when show_dashboard is enabled. If show_dashboard
is False, the method returns immediately without any output.
Returns:
None: This method only displays output and has no return value.
"""
if not self.show_dashboard:
return
# Create swarm info table with Arasaka styling
info_table = Table(
title="⚡ HEAVYSWARM CONFIGURATION",
show_header=True,
header_style="bold red",
)
info_table.add_column("Parameter", style="white", width=25)
info_table.add_column("Value", style="bright_white", width=40)
info_table.add_row("Swarm Name", self.name)
info_table.add_row("Description", self.description)
info_table.add_row("Timeout", f"{self.timeout}s")
info_table.add_row(
"Loops per Agent", str(self.loops_per_agent)
)
info_table.add_row(
"Question Model", self.question_agent_model_name
)
info_table.add_row("Worker Model", self.worker_model_name)
info_table.add_row("Max Workers", str(self.max_workers))
info_table.add_row(
"Aggregation Strategy", self.aggregation_strategy
)
# Display dashboard with professional Arasaka styling
self.console.print(
Panel(
info_table,
title="[bold red]HEAVYSWARM SYSTEM[/bold red]",
border_style="red",
)
)
self.console.print()
def reliability_check(self):
"""
Perform comprehensive reliability and configuration validation checks.
This method validates all critical swarm configuration parameters to ensure
the system is properly configured for operation. It checks for common
configuration errors and provides clear error messages for any issues found.
Validation checks include:
- loops_per_agent: Must be greater than 0 to ensure agents execute
- worker_model_name: Must be set for agent execution
- question_agent_model_name: Must be set for question generation
The method provides different user experiences based on the show_dashboard setting:
- With dashboard: Shows animated progress bars with professional styling
- Without dashboard: Provides basic console output with completion confirmation
Raises:
ValueError: If loops_per_agent is 0 or negative (agents won't execute)
ValueError: If worker_model_name is None (agents can't be created)
ValueError: If question_agent_model_name is None (questions can't be generated)
Note:
This method is automatically called during __init__ to ensure the swarm
is properly configured before any operations begin.
"""
if self.show_dashboard:
with Progress(
SpinnerColumn(),
TextColumn(
"[progress.description]{task.description}"
),
transient=True,
console=self.console,
) as progress:
task = progress.add_task(
"[red]RUNNING RELIABILITY CHECKS...", total=4
)
# Check loops_per_agent
time.sleep(0.5)
if self.loops_per_agent == 0:
raise ValueError(
"loops_per_agent must be greater than 0. This parameter is used to determine how many times each agent will run. If it is 0, the agent will not run at all."
)
progress.update(
task,
advance=1,
description="[white]✓ LOOPS PER AGENT VALIDATED",
)
# Check worker_model_name
time.sleep(0.5)
if self.worker_model_name is None:
raise ValueError(
"worker_model_name must be set. This parameter is used to determine the model that will be used to execute the agents."
)
progress.update(
task,
advance=1,
description="[white]✓ WORKER MODEL VALIDATED",
)
# Check question_agent_model_name
time.sleep(0.5)
if self.question_agent_model_name is None:
raise ValueError(
"question_agent_model_name must be set. This parameter is used to determine the model that will be used to generate the questions."
)
progress.update(
task,
advance=1,
description="[white]✓ QUESTION MODEL VALIDATED",
)
# Final validation
time.sleep(0.5)
progress.update(
task,
advance=1,
description="[bold white]✓ ALL RELIABILITY CHECKS PASSED!",
)
time.sleep(0.8) # Let user see the final message
self.console.print(
Panel(
"[bold red]✅ HEAVYSWARM RELIABILITY CHECK COMPLETE[/bold red]\n"
"[white]All systems validated and ready for operation[/white]",
title="[bold red]SYSTEM STATUS[/bold red]",
border_style="red",
)
)
self.console.print()
else:
# Original non-dashboard behavior
if self.loops_per_agent == 0:
raise ValueError(
"loops_per_agent must be greater than 0. This parameter is used to determine how many times each agent will run. If it is 0, the agent will not run at all."
)
if self.worker_model_name is None:
raise ValueError(
"worker_model_name must be set. This parameter is used to determine the model that will be used to execute the agents."
)
if self.question_agent_model_name is None:
raise ValueError(
"question_agent_model_name must be set. This parameter is used to determine the model that will be used to generate the questions."
)
formatter.print_panel(
content="Reliability check passed",
title="Reliability Check",
)
def run(self, task: str, img: Optional[str] = None) -> str:
"""
Execute the complete HeavySwarm orchestration flow with multi-loop functionality.
This method implements the max_loops feature, allowing the HeavySwarm to iterate
multiple times on the same task, with each subsequent loop building upon the
context and results from previous loops. This enables iterative refinement and
deeper analysis of complex tasks.
The method follows this workflow:
1. For the first loop: Execute the original task with full HeavySwarm orchestration
2. For subsequent loops: Combine previous results with original task as context
3. Maintain conversation history across all loops for context preservation
4. Return the final synthesized result from the last loop
Args:
task (str): The main task to analyze and iterate upon
img (str, optional): Image input if needed for visual analysis tasks
Returns:
str: Comprehensive final answer from synthesis agent after all loops complete
Note:
The max_loops parameter controls how many iterations the swarm will perform.
Each loop builds upon the previous results, enabling iterative refinement.
"""
if self.verbose:
logger.info("Starting HeavySwarm execution")
current_loop = 0
last_output = None
if self.show_dashboard:
self.console.print(
Panel(
f"[bold red]⚡ Completing Task[/bold red]\n"
f"[white]Task: {task}[/white]",
title="[bold red]Initializing HeavySwarm[/bold red]",
border_style="red",
)
)
self.console.print()
# Add initial task to conversation
self.conversation.add(
role="User",
content=task,
category="input",
)
# Main execution loop with comprehensive error handling
try:
while current_loop < self.max_loops:
try:
if self.verbose:
logger.info("Processing task iteration")
# No additional per-loop panels; keep dashboard output minimal and original-style
# Determine the task for this loop
if current_loop == 0:
# First loop: use the original task
loop_task = task
if self.verbose:
logger.info(
"First loop: Using original task"
)
else:
# Subsequent loops: combine previous results with original task
loop_task = (
f"Previous loop results: {last_output}\n\n"
f"Original task: {task}\n\n"
"Based on the previous results and analysis, continue with the next iteration. "
"Refine, improve, or complete any remaining aspects of the analysis. "
"Build upon the insights from the previous loop to provide deeper analysis."
)
if self.verbose:
logger.info(
"Subsequent loop: Building upon previous results"
)
# Question generation with dashboard
try:
if self.show_dashboard:
with Progress(
SpinnerColumn(),
TextColumn(
"[progress.description]{task.description}"
),
transient=True,
console=self.console,
) as progress:
task_gen = progress.add_task(
"[red]⚡ GENERATING SPECIALIZED QUESTIONS...",
total=100,
)
progress.update(task_gen, advance=30)
questions = (
self.execute_question_generation(
loop_task
)
)
progress.update(
task_gen,
advance=70,
description="[white]✓ QUESTIONS GENERATED SUCCESSFULLY!",
)
time.sleep(0.5)
else:
questions = (
self.execute_question_generation(
loop_task
)
)
self.conversation.add(
role="Question Generator Agent",
content=questions,
category="output",
)
if "error" in questions:
error_msg = f"Error in question generation: {questions['error']}"
logger.error(error_msg)
return error_msg
except Exception as e:
error_msg = f"Failed to generate questions in loop {current_loop + 1}: {str(e)}"
logger.error(error_msg)
logger.error(
f"Traceback: {traceback.format_exc()}"
)
return error_msg
# Agent execution phase
try:
if self.show_dashboard:
self.console.print(
Panel(
"[bold red]⚡ LAUNCHING SPECIALIZED AGENTS[/bold red]\n"
"[white]Executing 4 agents in parallel for comprehensive analysis[/white]",
title="[bold red]AGENT EXECUTION PHASE[/bold red]",
border_style="red",
)
)
agent_results = self._execute_agents_parallel(
questions=questions,
agents=self.agents,
img=img,
)
except Exception as e:
error_msg = f"Failed to execute agents in loop {current_loop + 1}: {str(e)}"
logger.error(error_msg)
logger.error(
f"Traceback: {traceback.format_exc()}"
)
return error_msg
# Synthesis phase
try:
if self.show_dashboard:
with Progress(
SpinnerColumn(),
TextColumn(
"[progress.description]{task.description}"
),
TimeElapsedColumn(),
console=self.console,
) as progress:
synthesis_task = progress.add_task(
"[red]Agent 5: SYNTHESIZING COMPREHENSIVE ANALYSIS ••••••••••••••••••••••••••••••••",
total=None,
)
progress.update(
synthesis_task,
description="[red]Agent 5: INTEGRATING AGENT RESULTS ••••••••••••••••••••••••••••••••",
)
time.sleep(0.5)
progress.update(
synthesis_task,
description="[red]Agent 5: Summarizing Results ••••••••••••••••••••••••••••••••",
)
final_result = (
self._synthesize_results(
original_task=loop_task,
questions=questions,
agent_results=agent_results,
)
)
progress.update(
synthesis_task,
description="[white]Agent 5: GENERATING FINAL REPORT ••••••••••••••••••••••••••••••••",
)
time.sleep(0.3)
progress.update(
synthesis_task,
description="[bold white]Agent 5: COMPLETE! ••••••••••••••••••••••••••••••••",
)
time.sleep(0.5)
self.console.print(
Panel(
"[bold red]⚡ HEAVYSWARM ANALYSIS COMPLETE![/bold red]\n"
"[white]Comprehensive multi-agent analysis delivered successfully[/white]",
title="[bold red]MISSION ACCOMPLISHED[/bold red]",
border_style="red",
)
)
self.console.print()
else:
final_result = self._synthesize_results(
original_task=loop_task,
questions=questions,
agent_results=agent_results,
)
self.conversation.add(
role="Synthesis Agent",
content=final_result,
category="output",
)
except Exception as e:
error_msg = f"Failed to synthesize results in loop {current_loop + 1}: {str(e)}"
logger.error(error_msg)
logger.error(
f"Traceback: {traceback.format_exc()}"
)
return error_msg
# Store the result for next loop context
last_output = final_result
current_loop += 1
if self.verbose:
logger.success(
"Task iteration completed successfully"
)
except Exception as e:
error_msg = f"Failed to execute loop {current_loop + 1}: {str(e)}"
logger.error(error_msg)
logger.error(
f"Traceback: {traceback.format_exc()}"
)
return error_msg
except Exception as e:
error_msg = (
f"Critical error in HeavySwarm execution: {str(e)}"
)
logger.error(error_msg)
logger.error(f"Traceback: {traceback.format_exc()}")
return error_msg
# Final completion message
if self.show_dashboard:
self.console.print(
Panel(
"[bold red]⚡ HEAVYSWARM ANALYSIS COMPLETE![/bold red]\n"
"[white]Comprehensive multi-agent analysis delivered successfully[/white]",
title="[bold red]MISSION ACCOMPLISHED[/bold red]",
border_style="red",
)
)
self.console.print()
if self.verbose:
logger.success("HeavySwarm execution completed")
return history_output_formatter(
conversation=self.conversation,
type=self.output_type,
)
@lru_cache(maxsize=1)
def create_agents(self):
"""
Create and cache the 4 specialized agents with detailed role-specific prompts.
This method creates a complete set of specialized agents optimized for different
aspects of task analysis. Each agent is configured with expert-level system prompts
and optimal settings for their specific role. The agents are cached using LRU cache
to avoid recreation overhead on subsequent calls.
The specialized agents created:
1. **Research Agent**: Expert in comprehensive information gathering, data collection,
market research, and source verification. Specializes in systematic literature
reviews, competitive intelligence, and statistical data interpretation.
2. **Analysis Agent**: Expert in advanced statistical analysis, pattern recognition,
predictive modeling, and causal relationship identification. Specializes in
regression analysis, forecasting, and performance metrics development.
3. **Alternatives Agent**: Expert in strategic thinking, creative problem-solving,
innovation ideation, and strategic option evaluation. Specializes in design
thinking, scenario planning, and blue ocean strategy identification.
4. **Verification Agent**: Expert in validation, feasibility assessment, fact-checking,
and quality assurance. Specializes in risk assessment, compliance verification,
and implementation barrier analysis.
5. **Synthesis Agent**: Expert in multi-perspective integration, comprehensive analysis,
and executive summary creation. Specializes in strategic alignment, conflict
resolution, and holistic solution development.
Agent Configuration:
- All agents use the configured worker_model_name
- Loops are set based on loops_per_agent parameter
- Dynamic temperature is enabled for creative responses
- Streaming is disabled for complete responses
- Verbose mode follows class configuration
Returns:
Dict[str, Agent]: Dictionary containing all 5 specialized agents with keys:
- 'research': Research Agent instance
- 'analysis': Analysis Agent instance
- 'alternatives': Alternatives Agent instance
- 'verification': Verification Agent instance
- 'synthesis': Synthesis Agent instance
Note:
This method uses @lru_cache(maxsize=1) to ensure agents are only created once
per HeavySwarm instance, improving performance for multiple task executions.
"""
if self.verbose:
logger.info("🏗️ Creating specialized agents...")
tools = None
if self.worker_tools is not None:
tools = self.worker_tools
# Research Agent - Deep information gathering and data collection
research_agent = Agent(
agent_name="Research-Agent",
agent_description="Expert research agent specializing in comprehensive information gathering and data collection",
system_prompt=RESEARCH_AGENT_PROMPT,
max_loops=self.handle_worker_agent_loops(),
model_name=self.worker_model_name,
streaming_on=False,
verbose=False,
dynamic_temperature_enabled=True,
print_on=self.agent_prints_on,
tools=tools,
)
# Analysis Agent - Pattern recognition and deep analytical insights
analysis_agent = Agent(
agent_name="Analysis-Agent",
agent_description="Expert analytical agent specializing in pattern recognition, data analysis, and insight generation",
system_prompt=ANALYSIS_AGENT_PROMPT,
max_loops=self.handle_worker_agent_loops(),
model_name=self.worker_model_name,
streaming_on=False,
verbose=False,
dynamic_temperature_enabled=True,
print_on=self.agent_prints_on,
tools=tools,
)
# Alternatives Agent - Strategic options and creative solutions
alternatives_agent = Agent(
agent_name="Alternatives-Agent",
agent_description="Expert strategic agent specializing in alternative approaches, creative solutions, and option generation",
system_prompt=ALTERNATIVES_AGENT_PROMPT,
max_loops=self.handle_worker_agent_loops(),
model_name=self.worker_model_name,
streaming_on=False,
verbose=False,
dynamic_temperature_enabled=True,
print_on=self.agent_prints_on,
tools=tools,
)
# Verification Agent - Validation, feasibility assessment, and quality assurance
verification_agent = Agent(
agent_name="Verification-Agent",
agent_description="Expert verification agent specializing in validation, feasibility assessment, and quality assurance",
system_prompt=VERIFICATION_AGENT_PROMPT,
max_loops=self.handle_worker_agent_loops(),
model_name=self.worker_model_name,
streaming_on=False,
verbose=False,
dynamic_temperature_enabled=True,
print_on=self.agent_prints_on,
tools=tools,
)
# Synthesis Agent - Integration and comprehensive analysis
synthesis_agent = Agent(
agent_name="Synthesis-Agent",
agent_description="Expert synthesis agent specializing in integration, comprehensive analysis, and final recommendations",
system_prompt=SYNTHESIS_AGENT_PROMPT,
max_loops=1,
model_name=self.worker_model_name,
streaming_on=False,
verbose=False,
dynamic_temperature_enabled=True,
print_on=True,
tools=tools,
)
agents = {
"research": research_agent,
"analysis": analysis_agent,
"alternatives": alternatives_agent,
"verification": verification_agent,
"synthesis": synthesis_agent,
}
return agents
def _execute_agents_parallel(
self, questions: Dict, agents: Dict, img: Optional[str] = None
) -> Dict[str, str]:
"""
Execute the 4 specialized agents in TRUE parallel using concurrent.futures.
Args:
questions (Dict): Generated questions for each agent
agents (Dict): Dictionary of specialized agents
img (str, optional): Image input if needed
Returns:
Dict[str, str]: Results from each agent
"""
if self.show_dashboard:
return self._execute_agents_with_dashboard(
questions, agents, img
)
else:
return self._execute_agents_basic(questions, agents, img)
def _execute_agents_basic(
self, questions: Dict, agents: Dict, img: Optional[str] = None
) -> Dict[str, str]:
"""
Execute specialized agents in parallel without dashboard visualization.
This method provides the core agent execution functionality using concurrent.futures
for true parallel processing. It executes the four specialized agents simultaneously
to maximize efficiency while providing basic error handling and timeout management.
The execution process:
1. Prepare agent tasks with their respective specialized questions
2. Submit all tasks to ThreadPoolExecutor for parallel execution
3. Collect results as agents complete their work
4. Handle timeouts and exceptions gracefully
5. Log results to conversation history
Args:
questions (Dict): Generated questions containing keys:
- research_question: Question for Research Agent
- analysis_question: Question for Analysis Agent
- alternatives_question: Question for Alternatives Agent
- verification_question: Question for Verification Agent
agents (Dict): Dictionary of specialized agent instances from create_agents()
img (str, optional): Image input for agents that support visual analysis.
Defaults to None.
Returns:
Dict[str, str]: Results from each agent execution with keys:
- 'research': Research Agent output
- 'analysis': Analysis Agent output
- 'alternatives': Alternatives Agent output
- 'verification': Verification Agent output
Note:
This method uses ThreadPoolExecutor with max_workers limit for parallel execution.
Each agent runs independently and results are collected as they complete.
Timeout and exception handling ensure robustness even if individual agents fail.
"""
# Define agent execution tasks
def execute_agent(agent_info):
agent_type, agent, question = agent_info
try:
result = agent.run(question)
self.conversation.add(
role=agent.agent_name,
content=result,
category="output",
)
return agent_type, result
except Exception as e:
logger.error(
f"❌ Error in {agent_type} Agent: {str(e)} Traceback: {traceback.format_exc()}"
)
return agent_type, f"Error: {str(e)}"
# Prepare agent tasks
agent_tasks = [
(
"Research",
agents["research"],
questions.get("research_question", ""),
),
(
"Analysis",
agents["analysis"],
questions.get("analysis_question", ""),
),
(
"Alternatives",
agents["alternatives"],
questions.get("alternatives_question", ""),
),
(
"Verification",
agents["verification"],
questions.get("verification_question", ""),
),
]
# Execute agents in parallel using ThreadPoolExecutor
results = {}
with concurrent.futures.ThreadPoolExecutor(
max_workers=self.max_workers
) as executor:
# Submit all agent tasks
future_to_agent = {
executor.submit(execute_agent, task): task[0]
for task in agent_tasks
}
# Collect results as they complete
for future in concurrent.futures.as_completed(
future_to_agent
):
agent_type = future_to_agent[future]
try:
agent_name, result = future.result(
timeout=self.timeout
)
results[agent_name.lower()] = result
except concurrent.futures.TimeoutError:
logger.error(
f"⏰ Timeout for {agent_type} Agent after {self.timeout}s"
)
results[agent_type.lower()] = (
f"Timeout after {self.timeout} seconds"
)
except Exception as e:
logger.error(
f"❌ Exception in {agent_type} Agent: {str(e)}"
)
results[agent_type.lower()] = (
f"Exception: {str(e)}"
)
return results
def _execute_agents_with_dashboard(
self, questions: Dict, agents: Dict, img: Optional[str] = None
) -> Dict[str, str]:
"""
Execute specialized agents in parallel with rich dashboard visualization and progress tracking.
This method provides an enhanced user experience by displaying real-time progress bars
and status updates for each agent execution. It combines the efficiency of parallel
processing with professional dashboard visualization using Rich console styling.
Dashboard Features:
- Individual progress bars for each of the 4 specialized agents
- Real-time status updates with professional Arasaka-inspired styling
- Animated dots and progress indicators for visual engagement
- Color-coded status messages (red for processing, white for completion)
- Completion summary with mission accomplished messaging
Progress Phases for Each Agent:
1. INITIALIZING: Agent setup and preparation
2. PROCESSING QUERY: Question analysis and processing
3. EXECUTING: Core agent execution with animated indicators
4. GENERATING RESPONSE: Response formulation and completion
5. COMPLETE: Successful execution confirmation
Args:
questions (Dict): Generated specialized questions containing:
- research_question: Comprehensive information gathering query
- analysis_question: Pattern recognition and insight analysis query
- alternatives_question: Creative solutions and options exploration query
- verification_question: Validation and feasibility assessment query
agents (Dict): Dictionary of specialized agent instances with keys:
- research, analysis, alternatives, verification
img (str, optional): Image input for agents supporting visual analysis.
Defaults to None.
Returns:
Dict[str, str]: Comprehensive results from agent execution:
- Keys correspond to agent types (research, analysis, alternatives, verification)
- Values contain detailed agent outputs and analysis
Note:
This method requires show_dashboard=True in the HeavySwarm configuration.
It provides the same parallel execution as _execute_agents_basic but with
enhanced visual feedback and professional presentation.
"""
# Agent configurations with professional styling
agent_configs = [
(
"Agent 1",
"research",
"white",
"Gathering comprehensive research data",
),
(
"Agent 2",
"analysis",
"white",
"Analyzing patterns and generating insights",
),
(
"Agent 3",
"alternatives",
"white",
"Exploring creative solutions and alternatives",
),
(
"Agent 4",
"verification",
"white",
"Validating findings and checking feasibility",
),
]
results = {}
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
TimeElapsedColumn(),
console=self.console,
) as progress:
# Create progress tasks for each agent
tasks = {}
for (
display_name,
agent_key,
color,
description,
) in agent_configs:
task_id = progress.add_task(
f"[{color}]{display_name}[/{color}]: INITIALIZING",
total=None,
)
tasks[agent_key] = task_id
# Define agent execution function with progress updates
def execute_agent_with_progress(agent_info):
agent_type, agent_key, agent, question = agent_info
try:
# Update progress to show agent starting
progress.update(
tasks[agent_key],
description=f"[red]{agent_type}[/red]: INITIALIZING ••••••••",
)
# Simulate some processing time for visual effect
time.sleep(0.5)
progress.update(
tasks[agent_key],
description=f"[red]{agent_type}[/red]: PROCESSING QUERY ••••••••••••••",
)
# Execute the agent with dots animation
progress.update(
tasks[agent_key],
description=f"[red]{agent_type}[/red]: EXECUTING ••••••••••••••••••••",
)
result = agent.run(question)
# Update progress during execution
progress.update(
tasks[agent_key],
description=f"[white]{agent_type}[/white]: GENERATING RESPONSE ••••••••••••••••••••••••••",
)
# Add to conversation
self.conversation.add(
role=agent.agent_name,
content=result,
category="output",
)
# Complete the progress
progress.update(
tasks[agent_key],
description=f"[bold white]{agent_type}[/bold white]: ✅ COMPLETE! ••••••••••••••••••••••••••••••••",
)
return agent_type, result
except Exception as e:
progress.update(
tasks[agent_key],
description=f"[bold red]{agent_type}[/bold red]: ❌ ERROR! ••••••••••••••••••••••••••••••••",
)
logger.error(
f"❌ Error in {agent_type} Agent: {str(e)} Traceback: {traceback.format_exc()}"
)
return agent_type, f"Error: {str(e)}"
# Prepare agent tasks with keys
agent_tasks = [
(
"Agent 1",
"research",
agents["research"],
questions.get("research_question", ""),
),
(
"Agent 2",
"analysis",
agents["analysis"],
questions.get("analysis_question", ""),
),
(
"Agent 3",
"alternatives",
agents["alternatives"],
questions.get("alternatives_question", ""),
),
(
"Agent 4",
"verification",
agents["verification"],
questions.get("verification_question", ""),
),
]
# Execute agents in parallel
with concurrent.futures.ThreadPoolExecutor(
max_workers=self.max_workers
) as executor:
# Submit all agent tasks
future_to_agent = {
executor.submit(
execute_agent_with_progress, task
): task[1]
for task in agent_tasks
}
# Collect results as they complete
for future in concurrent.futures.as_completed(
future_to_agent
):
agent_key = future_to_agent[future]
try:
agent_name, result = future.result(
timeout=self.timeout
)
results[
agent_name.lower()
.replace("🔍 ", "")
.replace("📊 ", "")
.replace("", "")
.replace("", "")
] = result
except concurrent.futures.TimeoutError:
progress.update(
tasks[agent_key],
description=f"[bold red]Agent {list(tasks.keys()).index(agent_key) + 1}[/bold red]: ⏰ TIMEOUT! ••••••••••••••••••••••••••••••••",
)
results[agent_key] = (
f"Timeout after {self.timeout} seconds"
)
except Exception as e:
progress.update(
tasks[agent_key],
description=f"[bold red]Agent {list(tasks.keys()).index(agent_key) + 1}[/bold red]: ❌ ERROR! ••••••••••••••••••••••••••••••••",
)
results[agent_key] = f"Exception: {str(e)}"
# Show completion summary
self.console.print(
Panel(
"[bold red]⚡ ALL AGENTS COMPLETED SUCCESSFULLY![/bold red]\n"
"[white]Results from all 4 specialized agents are ready for synthesis[/white]",
title="[bold red]EXECUTION COMPLETE[/bold red]",
border_style="red",
)
)
self.console.print()
return results
def _synthesize_results(
self, original_task: str, questions: Dict, agent_results: Dict
) -> str:
"""
Synthesize all agent results into a comprehensive final answer.
Args:
original_task (str): The original user task
questions (Dict): Generated questions
agent_results (Dict): Results from all agents
Returns:
str: Comprehensive synthesized analysis
"""
# Get the cached agents
agents = self.create_agents()
synthesis_agent = agents["synthesis"]
agents_names = [
"Research Agent",
"Analysis Agent",
"Alternatives Agent",
"Verification Agent",
]
# Create comprehensive synthesis prompt
synthesis_prompt = f"""
You are an expert synthesis agent tasked with producing a clear, actionable, and executive-ready report based on the following task and the results from four specialized agents (Research, Analysis, Alternatives, Verification).
Original Task:
{original_task}
Your objectives:
- Integrate and synthesize insights from all four agents {", ".join(agents_names)}, highlighting how each contributes to the overall understanding.
- Identify and explain key themes, patterns, and any points of agreement or disagreement across the agents' findings.
- Provide clear, prioritized, and actionable recommendations directly addressing the original task.
- Explicitly discuss potential risks, limitations, and propose mitigation strategies.
- Offer practical implementation guidance and concrete next steps.
- Ensure the report is well-structured, concise, and suitable for decision-makers (executive summary style).
- Use bullet points, numbered lists, and section headings where appropriate for clarity and readability.
You may reference the conversation history for additional context:
\n\n
{self.conversation.return_history_as_string()}
\n\n
Please present your synthesis in the following structure:
1. Executive Summary
2. Key Insights from Each Agent
3. Integrated Analysis & Themes
4. Actionable Recommendations
5. Risks & Mitigation Strategies
6. Implementation Guidance & Next Steps
Be thorough, objective, and ensure your synthesis is easy to follow for a non-technical audience.
"""
return synthesis_agent.run(synthesis_prompt)
def _parse_tool_calls(self, tool_calls: List) -> Dict[str, any]:
"""
Parse ChatCompletionMessageToolCall objects into a structured dictionary format.
This method extracts and structures the question generation results from language model
tool calls. It handles the JSON parsing of function arguments and provides clean access
to the generated questions for each specialized agent role.
The method specifically looks for the 'generate_specialized_questions' function call
and extracts the four specialized questions along with metadata. It provides robust
error handling for JSON parsing failures and includes both successful and error cases.
Args:
tool_calls (List): List of ChatCompletionMessageToolCall objects returned by the LLM.
Expected to contain at least one tool call with question generation results.
Returns:
Dict[str, any]: Structured dictionary containing:
On success:
- thinking (str): Reasoning process for question decomposition
- research_question (str): Question for Research Agent
- analysis_question (str): Question for Analysis Agent
- alternatives_question (str): Question for Alternatives Agent
- verification_question (str): Question for Verification Agent
- tool_call_id (str): Unique identifier for the tool call
- function_name (str): Name of the called function
On error:
- error (str): Error message describing the parsing failure
- raw_arguments (str): Original unparsed function arguments
- tool_call_id (str): Tool call identifier for debugging
- function_name (str): Function name for debugging
Note:
If no tool calls are provided, returns an empty dictionary.
Only the first tool call is processed, as only one question generation
call is expected per task.
"""
if not tool_calls:
return {}
# Get the first tool call (should be the question generation)
tool_call = tool_calls[0]
try:
# Parse the JSON arguments
arguments = json.loads(tool_call.function.arguments)
return {
"thinking": arguments.get("thinking", ""),
"research_question": arguments.get(
"research_question", ""
),
"analysis_question": arguments.get(
"analysis_question", ""
),
"alternatives_question": arguments.get(
"alternatives_question", ""
),
"verification_question": arguments.get(
"verification_question", ""
),
"tool_call_id": tool_call.id,
"function_name": tool_call.function.name,
}
except json.JSONDecodeError as e:
return {
"error": f"Failed to parse tool call arguments: {str(e)}",
"raw_arguments": tool_call.function.arguments,
"tool_call_id": tool_call.id,
"function_name": tool_call.function.name,
}
def execute_question_generation(
self, task: str
) -> Dict[str, str]:
"""
Execute the question generation using the schema with a language model.
Args:
task (str): The main task to analyze
Returns:
Dict[str, str]: Generated questions for each agent role with parsed data
"""
# Create the prompt for question generation
prompt = f"""
System: Technical task analyzer. Generate 4 non-overlapping analytical questions via function tool.
Roles:
- Research: systematic evidence collection, source verification, data quality assessment
- Analysis: statistical analysis, pattern recognition, quantitative insights, correlation analysis
- Alternatives: strategic option generation, multi-criteria analysis, scenario planning, decision modeling
- Verification: systematic validation, risk assessment, feasibility analysis, logical consistency
Requirements:
- Each question ≤30 words, technically precise, action-oriented
- No duplication across roles. No meta text in questions
- Ambiguity notes only in "thinking" field (≤40 words)
- Focus on systematic methodology and quantitative analysis
Task: {task}
Use generate_specialized_questions function only.
"""
question_agent = LiteLLM(
system_prompt=prompt,
model=self.question_agent_model_name,
tools_list_dictionary=schema,
max_tokens=3000,
temperature=0.7,
top_p=1,
frequency_penalty=0,
presence_penalty=0,
tool_choice="auto",
)
# Get raw tool calls from LiteLLM
raw_output = question_agent.run(task)
# Parse the tool calls and return clean data
out = self._parse_tool_calls(raw_output)
if self.verbose:
logger.info(
f"🔍 Question Generation Output: {out} and type: {type(out)}"
)
return out
def get_questions_only(self, task: str) -> Dict[str, str]:
"""
Generate and extract only the specialized questions without metadata or execution.
This utility method provides a clean interface for obtaining just the generated
questions for each agent role without executing the full swarm workflow. It's
useful for previewing questions, debugging question generation, or integrating
with external systems that only need the questions.
The method performs question generation using the configured question agent model
and returns a clean dictionary containing only the four specialized questions,
filtering out metadata like thinking process, tool call IDs, and function names.
Args:
task (str): The main task or query to analyze and decompose into specialized
questions. Should be a clear, specific task description.
Returns:
Dict[str, str]: Clean dictionary containing only the questions:
- research_question (str): Question for comprehensive information gathering
- analysis_question (str): Question for pattern analysis and insights
- alternatives_question (str): Question for exploring creative solutions
- verification_question (str): Question for validation and feasibility
On error:
- error (str): Error message if question generation fails
Example:
>>> swarm = HeavySwarm()
>>> questions = swarm.get_questions_only("Analyze market trends for EVs")
>>> print(questions['research_question'])
"""
result = self.execute_question_generation(task)
if "error" in result:
return {"error": result["error"]}
return {
"research_question": result.get("research_question", ""),
"analysis_question": result.get("analysis_question", ""),
"alternatives_question": result.get(
"alternatives_question", ""
),
"verification_question": result.get(
"verification_question", ""
),
}
def get_questions_as_list(self, task: str) -> List[str]:
"""
Generate specialized questions and return them as an ordered list.
This utility method provides the simplest interface for obtaining generated questions
in a list format. It's particularly useful for iteration, display purposes, or
integration with systems that prefer list-based data structures over dictionaries.
The questions are returned in a consistent order:
1. Research question (information gathering)
2. Analysis question (pattern recognition and insights)
3. Alternatives question (creative solutions exploration)
4. Verification question (validation and feasibility)
Args:
task (str): The main task or query to decompose into specialized questions.
Should be a clear, actionable task description that can be analyzed
from multiple perspectives.
Returns:
List[str]: Ordered list of 4 specialized questions:
[0] Research question for comprehensive information gathering
[1] Analysis question for pattern analysis and insights
[2] Alternatives question for exploring creative solutions
[3] Verification question for validation and feasibility assessment
On error: Single-item list containing error message
Example:
>>> swarm = HeavySwarm()
>>> questions = swarm.get_questions_as_list("Optimize supply chain efficiency")
>>> for i, question in enumerate(questions):
... print(f"Agent {i+1}: {question}")
Note:
This method internally calls get_questions_only() and converts the dictionary
to a list format, maintaining the standard agent order.
"""
questions = self.get_questions_only(task)
if "error" in questions:
return [f"Error: {questions['error']}"]
return [
questions.get("research_question", ""),
questions.get("analysis_question", ""),
questions.get("alternatives_question", ""),
questions.get("verification_question", ""),
]