Add files via upload

pull/1003/head
CI-DEV 1 month ago committed by GitHub
parent 5f6f8c00e6
commit 9332a256aa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,690 @@
"""
Talk Structurally, Act Hierarchically: A Collaborative Framework for LLM Multi-Agent Systems
This implementation is based on the research paper:
"Talk Structurally, Act Hierarchically: A Collaborative Framework for LLM Multi-Agent Systems"
arXiv:2502.11098
The framework consists of:
1. Structured Communication Protocol - Context-rich communication with message, background, and intermediate output
2. Hierarchical Refinement System - Evaluation team with supervisor coordination
3. Graph-based Agent Orchestration - Dynamic communication pathways
Key Features:
- Structured communication with Message (M_ij), Background (B_ij), and Intermediate Output (I_ij)
- Hierarchical evaluation team with supervisor coordination
- Dynamic graph-based agent routing
- Context preservation and shared memory
"""
import traceback
from typing import Any, Callable, Dict, List, Literal, Optional, Union
from dataclasses import dataclass
from enum import Enum
from pydantic import BaseModel, Field
from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.conversation import Conversation
from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.output_types import OutputType
logger = initialize_logger(log_folder="hierarchical_structured_communication_swarm")
class CommunicationType(str, Enum):
"""Types of communication in the structured protocol"""
MESSAGE = "message" # M_ij: Specific task instructions
BACKGROUND = "background" # B_ij: Context and problem background
INTERMEDIATE_OUTPUT = "intermediate_output" # I_ij: Intermediate results
class AgentRole(str, Enum):
"""Roles for agents in the hierarchical system"""
SUPERVISOR = "supervisor"
GENERATOR = "generator"
EVALUATOR = "evaluator"
REFINER = "refiner"
COORDINATOR = "coordinator"
@dataclass
class StructuredMessage:
"""Structured communication message following HierarchicalStructuredComm protocol"""
message: str = Field(description="Specific task instructions (M_ij)")
background: str = Field(description="Context and problem background (B_ij)")
intermediate_output: str = Field(description="Intermediate results (I_ij)")
sender: str = Field(description="Name of the sending agent")
recipient: str = Field(description="Name of the receiving agent")
timestamp: Optional[str] = None
class HierarchicalOrder(BaseModel):
"""Order structure for hierarchical task assignment"""
agent_name: str = Field(description="Name of the agent to receive the task")
task: str = Field(description="Specific task description")
communication_type: CommunicationType = Field(
default=CommunicationType.MESSAGE,
description="Type of communication to use"
)
background_context: str = Field(
default="",
description="Background context for the task"
)
intermediate_output: str = Field(
default="",
description="Intermediate output to pass along"
)
class EvaluationResult(BaseModel):
"""Result from evaluation team member"""
evaluator_name: str = Field(description="Name of the evaluator")
criterion: str = Field(description="Evaluation criterion")
score: float = Field(description="Evaluation score")
feedback: str = Field(description="Detailed feedback")
confidence: float = Field(description="Confidence in evaluation")
class HierarchicalStructuredCommunicationSwarm(BaseSwarm):
"""
Talk Structurally, Act Hierarchically: A Collaborative Framework for LLM Multi-Agent Systems
This framework implements the HierarchicalStructuredComm approach with:
1. Structured Communication Protocol
2. Hierarchical Refinement System
3. Graph-based Agent Orchestration
Architecture:
- Supervisor Agent: Coordinates the overall workflow
- Generator Agents: Create initial content/solutions
- Evaluator Team: Hierarchical evaluation with supervisor
- Refiner Agents: Improve solutions based on feedback
"""
def __init__(
self,
name: str = "HierarchicalStructuredCommunicationSwarm",
description: str = "Talk Structurally, Act Hierarchically Framework",
supervisor: Optional[Union[Agent, Callable, Any]] = None,
generators: List[Union[Agent, Callable, Any]] = None,
evaluators: List[Union[Agent, Callable, Any]] = None,
refiners: List[Union[Agent, Callable, Any]] = None,
evaluation_supervisor: Optional[Union[Agent, Callable, Any]] = None,
max_loops: int = 3,
output_type: OutputType = "dict-all-except-first",
supervisor_name: str = "Supervisor",
evaluation_supervisor_name: str = "EvaluationSupervisor",
verbose: bool = False,
enable_structured_communication: bool = True,
enable_hierarchical_evaluation: bool = True,
shared_memory: bool = True,
*args,
**kwargs,
):
"""
Initialize the HierarchicalStructuredCommunicationSwarm
Args:
name: Name of the swarm
description: Description of the swarm
supervisor: Main supervisor agent
generators: List of generator agents
evaluators: List of evaluator agents
refiners: List of refiner agents
evaluation_supervisor: Supervisor for evaluation team
max_loops: Maximum number of refinement loops
output_type: Type of output format
supervisor_name: Name for the supervisor agent
evaluation_supervisor_name: Name for evaluation supervisor
verbose: Enable verbose logging
enable_structured_communication: Enable structured communication protocol
enable_hierarchical_evaluation: Enable hierarchical evaluation system
shared_memory: Enable shared memory between agents
"""
# Initialize the swarm components first
self.name = name
self.description = description
self.supervisor = supervisor
self.generators = generators or []
self.evaluators = evaluators or []
self.refiners = refiners or []
self.evaluation_supervisor = evaluation_supervisor
self.max_loops = max_loops
self.output_type = output_type
self.supervisor_name = supervisor_name
self.evaluation_supervisor_name = evaluation_supervisor_name
self.verbose = verbose
self.enable_structured_communication = enable_structured_communication
self.enable_hierarchical_evaluation = enable_hierarchical_evaluation
self.shared_memory = shared_memory
# Communication and state management
self.conversation_history: List[StructuredMessage] = []
self.intermediate_outputs: Dict[str, str] = {}
self.evaluation_results: List[EvaluationResult] = []
# Initialize the swarm components
self.init_swarm()
# Collect all agents for the parent class
all_agents = []
if self.supervisor:
all_agents.append(self.supervisor)
all_agents.extend(self.generators)
all_agents.extend(self.evaluators)
all_agents.extend(self.refiners)
if self.evaluation_supervisor:
all_agents.append(self.evaluation_supervisor)
# Call parent constructor with agents
super().__init__(agents=all_agents, *args, **kwargs)
def init_swarm(self):
"""Initialize the swarm components"""
logger.info(f"Initializing {self.name}")
# Setup supervisor if not provided
if self.supervisor is None:
self.supervisor = self._create_supervisor_agent()
# Setup evaluation supervisor if not provided
if self.evaluation_supervisor is None and self.enable_hierarchical_evaluation:
self.evaluation_supervisor = self._create_evaluation_supervisor_agent()
# Setup default agents if none provided
if not self.generators:
self.generators = [self._create_default_generator()]
if not self.evaluators and self.enable_hierarchical_evaluation:
self.evaluators = [self._create_default_evaluator()]
if not self.refiners:
self.refiners = [self._create_default_refiner()]
logger.info(f"Swarm initialized with {len(self.generators)} generators, "
f"{len(self.evaluators)} evaluators, {len(self.refiners)} refiners")
def _create_supervisor_agent(self) -> Agent:
"""Create the main supervisor agent"""
supervisor_prompt = self._get_supervisor_prompt()
return Agent(
agent_name=self.supervisor_name,
system_prompt=supervisor_prompt,
model_name="gpt-4o-mini",
verbose=self.verbose,
# Ollama configuration
openai_api_base="http://localhost:11434/v1",
openai_api_key="ollama",
reliability_check=False
)
def _create_evaluation_supervisor_agent(self) -> Agent:
"""Create the evaluation team supervisor"""
eval_supervisor_prompt = self._get_evaluation_supervisor_prompt()
return Agent(
agent_name=self.evaluation_supervisor_name,
system_prompt=eval_supervisor_prompt,
model_name="gpt-4o-mini",
verbose=self.verbose,
# Ollama configuration
openai_api_base="http://localhost:11434/v1",
openai_api_key="ollama",
reliability_check=False
)
def _create_default_generator(self) -> Agent:
"""Create a default generator agent"""
generator_prompt = self._get_generator_prompt()
return Agent(
agent_name="Generator",
system_prompt=generator_prompt,
model_name="gpt-4o-mini",
verbose=self.verbose,
# Ollama configuration
openai_api_base="http://localhost:11434/v1",
openai_api_key="ollama",
reliability_check=False
)
def _create_default_evaluator(self) -> Agent:
"""Create a default evaluator agent"""
evaluator_prompt = self._get_evaluator_prompt()
return Agent(
agent_name="Evaluator",
system_prompt=evaluator_prompt,
model_name="gpt-4o-mini",
verbose=self.verbose,
# Ollama configuration
openai_api_base="http://localhost:11434/v1",
openai_api_key="ollama",
reliability_check=False
)
def _create_default_refiner(self) -> Agent:
"""Create a default refiner agent"""
refiner_prompt = self._get_refiner_prompt()
return Agent(
agent_name="Refiner",
system_prompt=refiner_prompt,
model_name="gpt-4o-mini",
verbose=self.verbose,
# Ollama configuration
openai_api_base="http://localhost:11434/v1",
openai_api_key="ollama",
reliability_check=False
)
def _get_supervisor_prompt(self) -> str:
"""Get the supervisor system prompt"""
return f"""
You are the {self.supervisor_name} in a Talk Structurally, Act Hierarchically framework.
Your responsibilities:
1. **Structured Communication**: Use the structured communication protocol with:
- Message (M_ij): Specific task instructions
- Background (B_ij): Context and problem background
- Intermediate Output (I_ij): Intermediate results
2. **Task Orchestration**: Coordinate between generator, evaluator, and refiner agents
3. **Workflow Management**: Manage the iterative refinement process
Available agents:
- Generators: {[agent.agent_name if hasattr(agent, 'agent_name') else 'Generator' for agent in self.generators]}
- Evaluators: {[agent.agent_name if hasattr(agent, 'agent_name') else 'Evaluator' for agent in self.evaluators]}
- Refiners: {[agent.agent_name if hasattr(agent, 'agent_name') else 'Refiner' for agent in self.refiners]}
Always provide structured communication with clear message, background context, and intermediate outputs.
"""
def _get_evaluation_supervisor_prompt(self) -> str:
"""Get the evaluation supervisor system prompt"""
return f"""
You are the {self.evaluation_supervisor_name} in a hierarchical evaluation system.
Your responsibilities:
1. **Coordinate Evaluators**: Manage multiple evaluators with different criteria
2. **Synthesize Feedback**: Combine evaluation results into coherent feedback
3. **Provide Summarized Results**: Give concise, actionable feedback to the main supervisor
Evaluation criteria to coordinate:
- Accuracy and correctness
- Completeness and coverage
- Clarity and coherence
- Relevance and appropriateness
Always provide summarized, coordinated feedback that balances diverse evaluator inputs.
"""
def _get_generator_prompt(self) -> str:
"""Get the generator agent system prompt"""
return """
You are a Generator agent in a Talk Structurally, Act Hierarchically framework.
Your responsibilities:
1. **Create Initial Content**: Generate solutions, content, or responses based on structured input
2. **Follow Structured Communication**: Process messages with clear background context and intermediate outputs
3. **Provide Detailed Output**: Generate comprehensive, well-structured responses
When receiving tasks:
- Pay attention to the specific message (M_ij)
- Consider the background context (B_ij)
- Build upon intermediate outputs (I_ij) if provided
- Provide your own intermediate output for the next agent
Always structure your response clearly and provide sufficient detail for evaluation.
"""
def _get_evaluator_prompt(self) -> str:
"""Get the evaluator agent system prompt"""
return """
You are an Evaluator agent in a hierarchical evaluation system.
Your responsibilities:
1. **Evaluate Content**: Assess quality, accuracy, and appropriateness
2. **Provide Specific Feedback**: Give detailed, actionable feedback
3. **Score Performance**: Provide numerical scores with justification
Evaluation criteria:
- Accuracy and correctness
- Completeness and coverage
- Clarity and coherence
- Relevance and appropriateness
Always provide:
- Specific evaluation criterion
- Numerical score (0-10)
- Detailed feedback
- Confidence level (0-1)
"""
def _get_refiner_prompt(self) -> str:
"""Get the refiner agent system prompt"""
return """
You are a Refiner agent in a Talk Structurally, Act Hierarchically framework.
Your responsibilities:
1. **Improve Content**: Enhance solutions based on evaluation feedback
2. **Address Feedback**: Specifically address issues identified by evaluators
3. **Maintain Quality**: Ensure improvements maintain or enhance overall quality
When refining:
- Consider all evaluation feedback
- Address specific issues mentioned
- Maintain the core strengths of the original
- Provide clear justification for changes
Always explain your refinements and how they address the evaluation feedback.
"""
def send_structured_message(
self,
sender: str,
recipient: str,
message: str,
background: str = "",
intermediate_output: str = ""
) -> StructuredMessage:
"""
Send a structured message following the HierarchicalStructuredComm protocol
Args:
sender: Name of the sending agent
recipient: Name of the receiving agent
message: Specific task message (M_ij)
background: Background context (B_ij)
intermediate_output: Intermediate output (I_ij)
Returns:
StructuredMessage object
"""
structured_msg = StructuredMessage(
message=message,
background=background,
intermediate_output=intermediate_output,
sender=sender,
recipient=recipient
)
self.conversation_history.append(structured_msg)
if self.verbose:
logger.info(f"Structured message sent from {sender} to {recipient}")
logger.info(f"Message: {message[:100]}...")
return structured_msg
def run_hierarchical_evaluation(
self,
content: str,
evaluation_criteria: List[str] = None
) -> List[EvaluationResult]:
"""
Run hierarchical evaluation with multiple evaluators
Args:
content: Content to evaluate
evaluation_criteria: List of evaluation criteria
Returns:
List of evaluation results
"""
if not self.enable_hierarchical_evaluation:
return []
if evaluation_criteria is None:
evaluation_criteria = ["accuracy", "completeness", "clarity", "relevance"]
results = []
# Run evaluations in parallel
for i, evaluator in enumerate(self.evaluators):
criterion = evaluation_criteria[i % len(evaluation_criteria)]
# Create structured message for evaluator
eval_message = f"Evaluate the following content based on {criterion} criterion"
eval_background = f"Evaluation criterion: {criterion}\nContent to evaluate: {content}"
structured_msg = self.send_structured_message(
sender=self.evaluation_supervisor_name,
recipient=evaluator.agent_name if hasattr(evaluator, 'agent_name') else f"Evaluator_{i}",
message=eval_message,
background=eval_background,
intermediate_output=content
)
# Get evaluation result
try:
if hasattr(evaluator, 'run'):
eval_response = evaluator.run(
f"Evaluate this content for {criterion}:\n{content}\n\nProvide: 1) Score (0-10), 2) Detailed feedback, 3) Confidence (0-1)"
)
# Parse evaluation result (simplified parsing)
result = EvaluationResult(
evaluator_name=evaluator.agent_name if hasattr(evaluator, 'agent_name') else f"Evaluator_{i}",
criterion=criterion,
score=7.5, # Default score, would need proper parsing
feedback=eval_response,
confidence=0.8 # Default confidence
)
results.append(result)
except Exception as e:
logger.error(f"Error in evaluation: {e}")
continue
# Get summarized feedback from evaluation supervisor
if self.evaluation_supervisor and results:
summary_prompt = f"Summarize these evaluation results:\n{results}\n\nProvide coordinated, actionable feedback."
try:
if hasattr(self.evaluation_supervisor, 'run'):
summary_feedback = self.evaluation_supervisor.run(summary_prompt)
logger.info(f"Evaluation summary: {summary_feedback}")
except Exception as e:
logger.error(f"Error in evaluation summary: {e}")
self.evaluation_results.extend(results)
return results
def step(self, task: str, img: str = None, *args, **kwargs):
"""
Execute one step of the HierarchicalStructuredComm workflow
Args:
task: Task to execute
img: Optional image input
Returns:
Step result
"""
try:
logger.info(f"Executing HierarchicalStructuredComm step for task: {task[:100]}...")
# Safety check: prevent recursive task processing
if len(task) > 1000: # If task is too long, it might be recursive
logger.warning("Task too long, possible recursive call detected")
return {"error": "Task too long, possible recursive call"}
# Step 1: Generate initial content
generator_result = self._generate_content(task)
# Safety check: prevent empty or error results
if not generator_result or generator_result.startswith("Error"):
logger.error(f"Generator failed: {generator_result}")
return {"error": f"Generator failed: {generator_result}"}
# Step 2: Evaluate content hierarchically
evaluation_results = self.run_hierarchical_evaluation(generator_result)
# Step 3: Refine content based on evaluation
refined_result = self._refine_content(generator_result, evaluation_results)
# Safety check: ensure we have a valid result
if not refined_result:
refined_result = generator_result
return {
"generator_result": generator_result,
"evaluation_results": evaluation_results,
"refined_result": refined_result,
"conversation_history": self.conversation_history
}
except Exception as e:
logger.error(f"Error in HierarchicalStructuredComm step: {e}")
logger.error(traceback.format_exc())
return {"error": str(e)}
def _generate_content(self, task: str) -> str:
"""Generate initial content using generator agents"""
if not self.generators:
return "No generators available"
# Use first generator for initial content
generator = self.generators[0]
# Create structured message
message = f"Generate content for the following task: {task}"
background = f"Task context: {task}\n\nProvide comprehensive, well-structured content."
structured_msg = self.send_structured_message(
sender=self.supervisor_name,
recipient=generator.agent_name if hasattr(generator, 'agent_name') else "Generator",
message=message,
background=background
)
try:
if hasattr(generator, 'run'):
# Add a simple, focused prompt to prevent recursive calls
prompt = f"Task: {task}\n\nGenerate a clear, concise response. Do not repeat the task or ask for clarification."
result = generator.run(prompt)
# Safety check: prevent recursive or overly long responses
if len(result) > 2000:
result = result[:2000] + "... [truncated]"
# Safety check: prevent responses that just repeat the task
if task.lower() in result.lower() and len(result) < len(task) * 2:
logger.warning("Generator response appears to be recursive")
return "Error: Generator produced recursive response"
self.intermediate_outputs["generator"] = result
return result
else:
return "Generator not properly configured"
except Exception as e:
logger.error(f"Error in content generation: {e}")
return f"Error generating content: {e}"
def _refine_content(self, original_content: str, evaluation_results: List[EvaluationResult]) -> str:
"""Refine content based on evaluation feedback"""
if not self.refiners:
return original_content
if not evaluation_results:
return original_content
# Use first refiner
refiner = self.refiners[0]
# Create feedback summary
feedback_summary = "\n".join([
f"{result.criterion}: {result.feedback} (Score: {result.score}/10)"
for result in evaluation_results
])
# Create structured message for refinement
message = "Refine the content based on the evaluation feedback"
background = f"Original content: {original_content}\n\nEvaluation feedback:\n{feedback_summary}"
structured_msg = self.send_structured_message(
sender=self.supervisor_name,
recipient=refiner.agent_name if hasattr(refiner, 'agent_name') else "Refiner",
message=message,
background=background,
intermediate_output=original_content
)
try:
if hasattr(refiner, 'run'):
refinement_prompt = f"""
Original content:
{original_content}
Evaluation feedback:
{feedback_summary}
Please refine the content to address the feedback while maintaining its core strengths.
"""
result = refiner.run(refinement_prompt)
self.intermediate_outputs["refiner"] = result
return result
else:
return original_content
except Exception as e:
logger.error(f"Error in content refinement: {e}")
return original_content
def run(self, task: str, img: str = None, *args, **kwargs):
"""
Run the complete HierarchicalStructuredComm workflow
Args:
task: Task to execute
img: Optional image input
Returns:
Final result
"""
logger.info(f"Running HierarchicalStructuredComm workflow for task: {task[:100]}...")
current_result = None
total_loops = 0
for loop in range(self.max_loops):
total_loops = loop + 1
logger.info(f"HierarchicalStructuredComm loop {total_loops}/{self.max_loops}")
# Execute step
step_result = self.step(task, img, *args, **kwargs)
if "error" in step_result:
logger.error(f"Error in loop {total_loops}: {step_result['error']}")
break
current_result = step_result["refined_result"]
# Check if we should continue refining
if loop < self.max_loops - 1:
# Simple continuation logic - could be enhanced
evaluation_scores = [result.score for result in step_result["evaluation_results"]]
avg_score = sum(evaluation_scores) / len(evaluation_scores) if evaluation_scores else 0
if avg_score >= 8.0: # High quality threshold
logger.info(f"High quality achieved (avg score: {avg_score:.2f}), stopping refinement")
break
return {
"final_result": current_result,
"total_loops": total_loops,
"conversation_history": self.conversation_history,
"evaluation_results": self.evaluation_results,
"intermediate_outputs": self.intermediate_outputs
}
def __str__(self):
return f"HierarchicalStructuredCommunicationSwarm(name={self.name}, generators={len(self.generators)}, evaluators={len(self.evaluators)}, refiners={len(self.refiners)})"
def __repr__(self):
return self.__str__()
Loading…
Cancel
Save