Update heavy_swarm.py

pull/1089/head
CI-DEV 2 weeks ago committed by GitHub
parent 145443a5e0
commit ab4391c103
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -10,22 +10,15 @@ from typing import Dict, List, Optional
from loguru import logger
from rich.console import Console
from rich.panel import Panel
from rich.progress import (
Progress,
SpinnerColumn,
TextColumn,
TimeElapsedColumn,
)
from rich.progress import (Progress, SpinnerColumn, TextColumn,
TimeElapsedColumn)
from rich.table import Table
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.tools.tool_type import tool_type
from swarms.utils.formatter import formatter
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
from swarms.utils.history_output_formatter import history_output_formatter
from swarms.utils.litellm_wrapper import LiteLLM
from swarms.tools.tool_type import tool_type
RESEARCH_AGENT_PROMPT = """
You are an expert Research Agent with exceptional capabilities in:
@ -144,7 +137,8 @@ You approach analysis with:
- Actionable insights
- Clear communication
Provide precise, data-driven analysis with clear implications and recommendations."""
Provide precise, data-driven analysis with clear implications and
recommendations."""
ALTERNATIVES_AGENT_PROMPT = """
You are an expert Alternatives Agent with exceptional capabilities in:
@ -218,7 +212,8 @@ You approach alternatives generation with:
- Implementation focus
- Value optimization
Provide innovative, practical, and well-evaluated alternative approaches and solutions.
Provide innovative, practical, and well-evaluated alternative approaches
and solutions.
"""
@ -304,7 +299,8 @@ You approach verification with:
- Quality focus
- Practical realism
Provide thorough, objective verification with clear feasibility assessments and risk evaluations."""
Provide thorough, objective verification with clear feasibility
assessments and risk evaluations."""
SYNTHESIS_AGENT_PROMPT = """
You are an expert Synthesis Agent with advanced capabilities in:
@ -378,35 +374,54 @@ You approach synthesis with:
- Value optimization
- Implementation focus
Provide comprehensive, integrated analysis with clear, actionable recommendations and detailed implementation guidance."""
Provide comprehensive, integrated analysis with clear, actionable
recommendations and detailed implementation guidance."""
schema = {
"type": "function",
"function": {
"name": "generate_specialized_questions",
"description": "Generate 4 specialized questions for different agent roles to comprehensively analyze a given task",
"description": (
"Generate 4 specialized questions for different agent roles to "
"comprehensively analyze a given task"
),
"parameters": {
"type": "object",
"properties": {
"thinking": {
"type": "string",
"description": "Your reasoning process for how to break down this task into 4 specialized questions for different agent roles",
"description": (
"Your reasoning process for how to break down this task "
"into 4 specialized questions for different agent roles"
),
},
"research_question": {
"type": "string",
"description": "A detailed research question for the Research Agent to gather comprehensive background information and data",
"description": (
"A detailed research question for the Research Agent to "
"gather comprehensive background information and data"
),
},
"analysis_question": {
"type": "string",
"description": "An analytical question for the Analysis Agent to examine patterns, trends, and insights",
"description": (
"An analytical question for the Analysis Agent to examine "
"patterns, trends, and insights"
),
},
"alternatives_question": {
"type": "string",
"description": "A strategic question for the Alternatives Agent to explore different approaches, options, and solutions",
"description": (
"A strategic question for the Alternatives Agent to explore "
"different approaches, options, and solutions"
),
},
"verification_question": {
"type": "string",
"description": "A verification question for the Verification Agent to validate findings, check accuracy, and assess feasibility",
"description": (
"A verification question for the Verification Agent to "
"validate findings, check accuracy, and assess feasibility"
),
},
},
"required": [
@ -425,9 +440,11 @@ schema = [schema]
class HeavySwarm:
"""
HeavySwarm is a sophisticated multi-agent orchestration system that decomposes complex tasks
into specialized questions and executes them using four specialized agents: Research, Analysis,
Alternatives, and Verification. The results are then synthesized into a comprehensive response.
HeavySwarm is a sophisticated multi-agent orchestration system that
decomposes complex tasks into specialized questions and executes them
using four specialized agents: Research, Analysis, Alternatives, and
Verification. The results are then synthesized into a comprehensive
response.
This swarm architecture provides robust task analysis through:
- Intelligent question generation for specialized agent roles
@ -445,12 +462,13 @@ class HeavySwarm:
5. Optional iterative refinement through multiple loops
Key Features:
- **Multi-loop Execution**: The max_loops parameter enables iterative refinement where
each subsequent loop builds upon the context and results from previous loops
- **Context Preservation**: Conversation history is maintained across all loops,
allowing for deeper analysis and refinement
- **Iterative Refinement**: Each loop can refine, improve, or complete aspects
of the analysis based on previous results
- **Multi-loop Execution**: The max_loops parameter enables iterative
refinement where each subsequent loop builds upon the context and
results from previous loops
- **Context Preservation**: Conversation history is maintained across
all loops, allowing for deeper analysis and refinement
- **Iterative Refinement**: Each loop can refine, improve, or complete
aspects of the analysis based on previous results
Attributes:
name (str): Name identifier for the swarm instance
@ -485,7 +503,10 @@ class HeavySwarm:
def __init__(
self,
name: str = "HeavySwarm",
description: str = "A swarm of agents that can analyze a task and generate specialized questions for each agent role",
description: str = (
"A swarm of agents that can analyze a task and generate "
"specialized questions for each agent role"
),
timeout: int = 300,
aggregation_strategy: str = "synthesis",
loops_per_agent: int = 1,
@ -512,27 +533,28 @@ class HeavySwarm:
timeout (int, optional): Maximum execution time per agent in seconds. Defaults to 300.
aggregation_strategy (str, optional): Strategy for aggregating results. Currently only
'synthesis' is supported. Defaults to "synthesis".
loops_per_agent (int, optional): Number of execution loops each agent should perform.
Must be greater than 0. Defaults to 1.
question_agent_model_name (str, optional): Language model for question generation.
Defaults to "gpt-4o-mini".
worker_model_name (str, optional): Language model for specialized worker agents.
Defaults to "gpt-4o-mini".
loops_per_agent (int, optional): Number of execution loops each
agent should perform. Must be greater than 0. Defaults to 1.
question_agent_model_name (str, optional): Language model for
question generation. Defaults to "gpt-4o-mini".
worker_model_name (str, optional): Language model for specialized
worker agents. Defaults to "gpt-4o-mini".
verbose (bool, optional): Enable detailed logging and debug output. Defaults to False.
max_workers (int, optional): Maximum concurrent workers for parallel execution.
Defaults to 90% of CPU count.
show_dashboard (bool, optional): Enable rich dashboard with progress visualization.
Defaults to False.
agent_prints_on (bool, optional): Enable individual agent output printing.
Defaults to False.
output_type (str, optional): Output format type for conversation history.
Defaults to "dict-all-except-first".
worker_tools (tool_type, optional): Tools available to worker agents for enhanced functionality.
Defaults to None.
random_loops_per_agent (bool, optional): Enable random number of loops per agent (1-10 range).
Defaults to False.
max_loops (int, optional): Maximum number of execution loops for the entire swarm.
Each loop builds upon previous results for iterative refinement. Defaults to 1.
max_workers (int, optional): Maximum concurrent workers for
parallel execution. Defaults to 90% of CPU count.
show_dashboard (bool, optional): Enable rich dashboard with
progress visualization. Defaults to False.
agent_prints_on (bool, optional): Enable individual agent
output printing. Defaults to False.
output_type (str, optional): Output format type for conversation
history. Defaults to "dict-all-except-first".
worker_tools (tool_type, optional): Tools available to worker
agents for enhanced functionality. Defaults to None.
random_loops_per_agent (bool, optional): Enable random number of
loops per agent (1-10 range). Defaults to False.
max_loops (int, optional): Maximum number of execution loops for
the entire swarm. Each loop builds upon previous results for
iterative refinement. Defaults to 1.
Raises:
ValueError: If loops_per_agent is 0 or negative
@ -619,17 +641,11 @@ class HeavySwarm:
info_table.add_row("Swarm Name", self.name)
info_table.add_row("Description", self.description)
info_table.add_row("Timeout", f"{self.timeout}s")
info_table.add_row(
"Loops per Agent", str(self.loops_per_agent)
)
info_table.add_row(
"Question Model", self.question_agent_model_name
)
info_table.add_row("Loops per Agent", str(self.loops_per_agent))
info_table.add_row("Question Model", self.question_agent_model_name)
info_table.add_row("Worker Model", self.worker_model_name)
info_table.add_row("Max Workers", str(self.max_workers))
info_table.add_row(
"Aggregation Strategy", self.aggregation_strategy
)
info_table.add_row("Aggregation Strategy", self.aggregation_strategy)
# Display dashboard with professional Arasaka styling
self.console.print(
@ -670,9 +686,7 @@ class HeavySwarm:
if self.show_dashboard:
with Progress(
SpinnerColumn(),
TextColumn(
"[progress.description]{task.description}"
),
TextColumn("[progress.description]{task.description}"),
transient=True,
console=self.console,
) as progress:
@ -831,7 +845,9 @@ class HeavySwarm:
"Build upon the insights from the previous loop to provide deeper analysis."
)
if self.verbose:
logger.info("Subsequent loop: Building upon previous results")
logger.info(
"Subsequent loop: Building upon previous results"
)
# Question generation with dashboard
try:
@ -849,7 +865,9 @@ class HeavySwarm:
total=100,
)
progress.update(task_gen, advance=30)
questions = self.execute_question_generation(loop_task)
questions = self.execute_question_generation(
loop_task
)
progress.update(
task_gen,
advance=70,
@ -857,7 +875,9 @@ class HeavySwarm:
)
time.sleep(0.5)
else:
questions = self.execute_question_generation(loop_task)
questions = self.execute_question_generation(
loop_task
)
self.conversation.add(
role="Question Generator Agent",
@ -889,7 +909,9 @@ class HeavySwarm:
)
agent_results = self._execute_agents_parallel(
questions=questions, agents=self.agents, img=img
questions=questions,
agents=self.agents,
img=img,
)
except Exception as e:
@ -979,7 +1001,9 @@ class HeavySwarm:
logger.success("Task iteration completed successfully")
except Exception as e:
error_msg = f"Failed to execute loop {current_loop + 1}: {str(e)}"
error_msg = (
f"Failed to execute loop {current_loop + 1}: {str(e)}"
)
logger.error(error_msg)
logger.error(f"Traceback: {traceback.format_exc()}")
return error_msg
@ -1005,7 +1029,6 @@ class HeavySwarm:
if self.verbose:
logger.success("HeavySwarm execution completed")
return history_output_formatter(
conversation=self.conversation,
type=self.output_type,
@ -1165,9 +1188,7 @@ class HeavySwarm:
"""
if self.show_dashboard:
return self._execute_agents_with_dashboard(
questions, agents, img
)
return self._execute_agents_with_dashboard(questions, agents, img)
else:
return self._execute_agents_basic(questions, agents, img)
@ -1265,14 +1286,10 @@ class HeavySwarm:
}
# Collect results as they complete
for future in concurrent.futures.as_completed(
future_to_agent
):
for future in concurrent.futures.as_completed(future_to_agent):
agent_type = future_to_agent[future]
try:
agent_name, result = future.result(
timeout=self.timeout
)
agent_name, result = future.result(timeout=self.timeout)
results[agent_name.lower()] = result
except concurrent.futures.TimeoutError:
logger.error(
@ -1285,9 +1302,7 @@ class HeavySwarm:
logger.error(
f"❌ Exception in {agent_type} Agent: {str(e)}"
)
results[agent_type.lower()] = (
f"Exception: {str(e)}"
)
results[agent_type.lower()] = f"Exception: {str(e)}"
return results
@ -1478,16 +1493,12 @@ class HeavySwarm:
) as executor:
# Submit all agent tasks
future_to_agent = {
executor.submit(
execute_agent_with_progress, task
): task[1]
executor.submit(execute_agent_with_progress, task): task[1]
for task in agent_tasks
}
# Collect results as they complete
for future in concurrent.futures.as_completed(
future_to_agent
):
for future in concurrent.futures.as_completed(future_to_agent):
agent_key = future_to_agent[future]
try:
agent_name, result = future.result(
@ -1640,12 +1651,8 @@ class HeavySwarm:
return {
"thinking": arguments.get("thinking", ""),
"research_question": arguments.get(
"research_question", ""
),
"analysis_question": arguments.get(
"analysis_question", ""
),
"research_question": arguments.get("research_question", ""),
"analysis_question": arguments.get("analysis_question", ""),
"alternatives_question": arguments.get(
"alternatives_question", ""
),
@ -1664,9 +1671,7 @@ class HeavySwarm:
"function_name": tool_call.function.name,
}
def execute_question_generation(
self, task: str
) -> Dict[str, str]:
def execute_question_generation(self, task: str) -> Dict[str, str]:
"""
Execute the question generation using the schema with a language model.
@ -1756,12 +1761,8 @@ class HeavySwarm:
return {
"research_question": result.get("research_question", ""),
"analysis_question": result.get("analysis_question", ""),
"alternatives_question": result.get(
"alternatives_question", ""
),
"verification_question": result.get(
"verification_question", ""
),
"alternatives_question": result.get("alternatives_question", ""),
"verification_question": result.get("verification_question", ""),
}
def get_questions_as_list(self, task: str) -> List[str]:

Loading…
Cancel
Save