diff --git a/swarms/structs/hierarchical_structured_communication_swarm.py b/swarms/structs/hierarchical_structured_communication_swarm.py new file mode 100644 index 00000000..09c8150f --- /dev/null +++ b/swarms/structs/hierarchical_structured_communication_swarm.py @@ -0,0 +1,690 @@ +""" +Talk Structurally, Act Hierarchically: A Collaborative Framework for LLM Multi-Agent Systems + +This implementation is based on the research paper: +"Talk Structurally, Act Hierarchically: A Collaborative Framework for LLM Multi-Agent Systems" +arXiv:2502.11098 + +The framework consists of: +1. Structured Communication Protocol - Context-rich communication with message, background, and intermediate output +2. Hierarchical Refinement System - Evaluation team with supervisor coordination +3. Graph-based Agent Orchestration - Dynamic communication pathways + +Key Features: +- Structured communication with Message (M_ij), Background (B_ij), and Intermediate Output (I_ij) +- Hierarchical evaluation team with supervisor coordination +- Dynamic graph-based agent routing +- Context preservation and shared memory +""" + +import traceback +from typing import Any, Callable, Dict, List, Literal, Optional, Union +from dataclasses import dataclass +from enum import Enum + +from pydantic import BaseModel, Field + +from swarms.structs.agent import Agent +from swarms.structs.base_swarm import BaseSwarm +from swarms.structs.conversation import Conversation +from swarms.utils.loguru_logger import initialize_logger +from swarms.utils.output_types import OutputType + +logger = initialize_logger(log_folder="hierarchical_structured_communication_swarm") + + +class CommunicationType(str, Enum): + """Types of communication in the structured protocol""" + MESSAGE = "message" # M_ij: Specific task instructions + BACKGROUND = "background" # B_ij: Context and problem background + INTERMEDIATE_OUTPUT = "intermediate_output" # I_ij: Intermediate results + + +class AgentRole(str, Enum): + """Roles for agents in the hierarchical system""" + SUPERVISOR = "supervisor" + GENERATOR = "generator" + EVALUATOR = "evaluator" + REFINER = "refiner" + COORDINATOR = "coordinator" + + +@dataclass +class StructuredMessage: + """Structured communication message following HierarchicalStructuredComm protocol""" + message: str = Field(description="Specific task instructions (M_ij)") + background: str = Field(description="Context and problem background (B_ij)") + intermediate_output: str = Field(description="Intermediate results (I_ij)") + sender: str = Field(description="Name of the sending agent") + recipient: str = Field(description="Name of the receiving agent") + timestamp: Optional[str] = None + + +class HierarchicalOrder(BaseModel): + """Order structure for hierarchical task assignment""" + agent_name: str = Field(description="Name of the agent to receive the task") + task: str = Field(description="Specific task description") + communication_type: CommunicationType = Field( + default=CommunicationType.MESSAGE, + description="Type of communication to use" + ) + background_context: str = Field( + default="", + description="Background context for the task" + ) + intermediate_output: str = Field( + default="", + description="Intermediate output to pass along" + ) + + +class EvaluationResult(BaseModel): + """Result from evaluation team member""" + evaluator_name: str = Field(description="Name of the evaluator") + criterion: str = Field(description="Evaluation criterion") + score: float = Field(description="Evaluation score") + feedback: str = Field(description="Detailed feedback") + confidence: float = Field(description="Confidence in evaluation") + + +class HierarchicalStructuredCommunicationSwarm(BaseSwarm): + """ + Talk Structurally, Act Hierarchically: A Collaborative Framework for LLM Multi-Agent Systems + + This framework implements the HierarchicalStructuredComm approach with: + 1. Structured Communication Protocol + 2. Hierarchical Refinement System + 3. Graph-based Agent Orchestration + + Architecture: + - Supervisor Agent: Coordinates the overall workflow + - Generator Agents: Create initial content/solutions + - Evaluator Team: Hierarchical evaluation with supervisor + - Refiner Agents: Improve solutions based on feedback + """ + + def __init__( + self, + name: str = "HierarchicalStructuredCommunicationSwarm", + description: str = "Talk Structurally, Act Hierarchically Framework", + supervisor: Optional[Union[Agent, Callable, Any]] = None, + generators: List[Union[Agent, Callable, Any]] = None, + evaluators: List[Union[Agent, Callable, Any]] = None, + refiners: List[Union[Agent, Callable, Any]] = None, + evaluation_supervisor: Optional[Union[Agent, Callable, Any]] = None, + max_loops: int = 3, + output_type: OutputType = "dict-all-except-first", + supervisor_name: str = "Supervisor", + evaluation_supervisor_name: str = "EvaluationSupervisor", + verbose: bool = False, + enable_structured_communication: bool = True, + enable_hierarchical_evaluation: bool = True, + shared_memory: bool = True, + *args, + **kwargs, + ): + """ + Initialize the HierarchicalStructuredCommunicationSwarm + + Args: + name: Name of the swarm + description: Description of the swarm + supervisor: Main supervisor agent + generators: List of generator agents + evaluators: List of evaluator agents + refiners: List of refiner agents + evaluation_supervisor: Supervisor for evaluation team + max_loops: Maximum number of refinement loops + output_type: Type of output format + supervisor_name: Name for the supervisor agent + evaluation_supervisor_name: Name for evaluation supervisor + verbose: Enable verbose logging + enable_structured_communication: Enable structured communication protocol + enable_hierarchical_evaluation: Enable hierarchical evaluation system + shared_memory: Enable shared memory between agents + """ + # Initialize the swarm components first + self.name = name + self.description = description + self.supervisor = supervisor + self.generators = generators or [] + self.evaluators = evaluators or [] + self.refiners = refiners or [] + self.evaluation_supervisor = evaluation_supervisor + self.max_loops = max_loops + self.output_type = output_type + self.supervisor_name = supervisor_name + self.evaluation_supervisor_name = evaluation_supervisor_name + self.verbose = verbose + self.enable_structured_communication = enable_structured_communication + self.enable_hierarchical_evaluation = enable_hierarchical_evaluation + self.shared_memory = shared_memory + + # Communication and state management + self.conversation_history: List[StructuredMessage] = [] + self.intermediate_outputs: Dict[str, str] = {} + self.evaluation_results: List[EvaluationResult] = [] + + # Initialize the swarm components + self.init_swarm() + + # Collect all agents for the parent class + all_agents = [] + if self.supervisor: + all_agents.append(self.supervisor) + all_agents.extend(self.generators) + all_agents.extend(self.evaluators) + all_agents.extend(self.refiners) + if self.evaluation_supervisor: + all_agents.append(self.evaluation_supervisor) + + # Call parent constructor with agents + super().__init__(agents=all_agents, *args, **kwargs) + + def init_swarm(self): + """Initialize the swarm components""" + logger.info(f"Initializing {self.name}") + + # Setup supervisor if not provided + if self.supervisor is None: + self.supervisor = self._create_supervisor_agent() + + # Setup evaluation supervisor if not provided + if self.evaluation_supervisor is None and self.enable_hierarchical_evaluation: + self.evaluation_supervisor = self._create_evaluation_supervisor_agent() + + # Setup default agents if none provided + if not self.generators: + self.generators = [self._create_default_generator()] + + if not self.evaluators and self.enable_hierarchical_evaluation: + self.evaluators = [self._create_default_evaluator()] + + if not self.refiners: + self.refiners = [self._create_default_refiner()] + + logger.info(f"Swarm initialized with {len(self.generators)} generators, " + f"{len(self.evaluators)} evaluators, {len(self.refiners)} refiners") + + def _create_supervisor_agent(self) -> Agent: + """Create the main supervisor agent""" + supervisor_prompt = self._get_supervisor_prompt() + + return Agent( + agent_name=self.supervisor_name, + system_prompt=supervisor_prompt, + model_name="gpt-4o-mini", + verbose=self.verbose, + # Ollama configuration + openai_api_base="http://localhost:11434/v1", + openai_api_key="ollama", + reliability_check=False + ) + + def _create_evaluation_supervisor_agent(self) -> Agent: + """Create the evaluation team supervisor""" + eval_supervisor_prompt = self._get_evaluation_supervisor_prompt() + + return Agent( + agent_name=self.evaluation_supervisor_name, + system_prompt=eval_supervisor_prompt, + model_name="gpt-4o-mini", + verbose=self.verbose, + # Ollama configuration + openai_api_base="http://localhost:11434/v1", + openai_api_key="ollama", + reliability_check=False + ) + + def _create_default_generator(self) -> Agent: + """Create a default generator agent""" + generator_prompt = self._get_generator_prompt() + + return Agent( + agent_name="Generator", + system_prompt=generator_prompt, + model_name="gpt-4o-mini", + verbose=self.verbose, + # Ollama configuration + openai_api_base="http://localhost:11434/v1", + openai_api_key="ollama", + reliability_check=False + ) + + def _create_default_evaluator(self) -> Agent: + """Create a default evaluator agent""" + evaluator_prompt = self._get_evaluator_prompt() + + return Agent( + agent_name="Evaluator", + system_prompt=evaluator_prompt, + model_name="gpt-4o-mini", + verbose=self.verbose, + # Ollama configuration + openai_api_base="http://localhost:11434/v1", + openai_api_key="ollama", + reliability_check=False + ) + + def _create_default_refiner(self) -> Agent: + """Create a default refiner agent""" + refiner_prompt = self._get_refiner_prompt() + + return Agent( + agent_name="Refiner", + system_prompt=refiner_prompt, + model_name="gpt-4o-mini", + verbose=self.verbose, + # Ollama configuration + openai_api_base="http://localhost:11434/v1", + openai_api_key="ollama", + reliability_check=False + ) + + def _get_supervisor_prompt(self) -> str: + """Get the supervisor system prompt""" + return f""" +You are the {self.supervisor_name} in a Talk Structurally, Act Hierarchically framework. + +Your responsibilities: +1. **Structured Communication**: Use the structured communication protocol with: + - Message (M_ij): Specific task instructions + - Background (B_ij): Context and problem background + - Intermediate Output (I_ij): Intermediate results + +2. **Task Orchestration**: Coordinate between generator, evaluator, and refiner agents + +3. **Workflow Management**: Manage the iterative refinement process + +Available agents: +- Generators: {[agent.agent_name if hasattr(agent, 'agent_name') else 'Generator' for agent in self.generators]} +- Evaluators: {[agent.agent_name if hasattr(agent, 'agent_name') else 'Evaluator' for agent in self.evaluators]} +- Refiners: {[agent.agent_name if hasattr(agent, 'agent_name') else 'Refiner' for agent in self.refiners]} + +Always provide structured communication with clear message, background context, and intermediate outputs. +""" + + def _get_evaluation_supervisor_prompt(self) -> str: + """Get the evaluation supervisor system prompt""" + return f""" +You are the {self.evaluation_supervisor_name} in a hierarchical evaluation system. + +Your responsibilities: +1. **Coordinate Evaluators**: Manage multiple evaluators with different criteria +2. **Synthesize Feedback**: Combine evaluation results into coherent feedback +3. **Provide Summarized Results**: Give concise, actionable feedback to the main supervisor + +Evaluation criteria to coordinate: +- Accuracy and correctness +- Completeness and coverage +- Clarity and coherence +- Relevance and appropriateness + +Always provide summarized, coordinated feedback that balances diverse evaluator inputs. +""" + + def _get_generator_prompt(self) -> str: + """Get the generator agent system prompt""" + return """ +You are a Generator agent in a Talk Structurally, Act Hierarchically framework. + +Your responsibilities: +1. **Create Initial Content**: Generate solutions, content, or responses based on structured input +2. **Follow Structured Communication**: Process messages with clear background context and intermediate outputs +3. **Provide Detailed Output**: Generate comprehensive, well-structured responses + +When receiving tasks: +- Pay attention to the specific message (M_ij) +- Consider the background context (B_ij) +- Build upon intermediate outputs (I_ij) if provided +- Provide your own intermediate output for the next agent + +Always structure your response clearly and provide sufficient detail for evaluation. +""" + + def _get_evaluator_prompt(self) -> str: + """Get the evaluator agent system prompt""" + return """ +You are an Evaluator agent in a hierarchical evaluation system. + +Your responsibilities: +1. **Evaluate Content**: Assess quality, accuracy, and appropriateness +2. **Provide Specific Feedback**: Give detailed, actionable feedback +3. **Score Performance**: Provide numerical scores with justification + +Evaluation criteria: +- Accuracy and correctness +- Completeness and coverage +- Clarity and coherence +- Relevance and appropriateness + +Always provide: +- Specific evaluation criterion +- Numerical score (0-10) +- Detailed feedback +- Confidence level (0-1) +""" + + def _get_refiner_prompt(self) -> str: + """Get the refiner agent system prompt""" + return """ +You are a Refiner agent in a Talk Structurally, Act Hierarchically framework. + +Your responsibilities: +1. **Improve Content**: Enhance solutions based on evaluation feedback +2. **Address Feedback**: Specifically address issues identified by evaluators +3. **Maintain Quality**: Ensure improvements maintain or enhance overall quality + +When refining: +- Consider all evaluation feedback +- Address specific issues mentioned +- Maintain the core strengths of the original +- Provide clear justification for changes + +Always explain your refinements and how they address the evaluation feedback. +""" + + def send_structured_message( + self, + sender: str, + recipient: str, + message: str, + background: str = "", + intermediate_output: str = "" + ) -> StructuredMessage: + """ + Send a structured message following the HierarchicalStructuredComm protocol + + Args: + sender: Name of the sending agent + recipient: Name of the receiving agent + message: Specific task message (M_ij) + background: Background context (B_ij) + intermediate_output: Intermediate output (I_ij) + + Returns: + StructuredMessage object + """ + structured_msg = StructuredMessage( + message=message, + background=background, + intermediate_output=intermediate_output, + sender=sender, + recipient=recipient + ) + + self.conversation_history.append(structured_msg) + + if self.verbose: + logger.info(f"Structured message sent from {sender} to {recipient}") + logger.info(f"Message: {message[:100]}...") + + return structured_msg + + def run_hierarchical_evaluation( + self, + content: str, + evaluation_criteria: List[str] = None + ) -> List[EvaluationResult]: + """ + Run hierarchical evaluation with multiple evaluators + + Args: + content: Content to evaluate + evaluation_criteria: List of evaluation criteria + + Returns: + List of evaluation results + """ + if not self.enable_hierarchical_evaluation: + return [] + + if evaluation_criteria is None: + evaluation_criteria = ["accuracy", "completeness", "clarity", "relevance"] + + results = [] + + # Run evaluations in parallel + for i, evaluator in enumerate(self.evaluators): + criterion = evaluation_criteria[i % len(evaluation_criteria)] + + # Create structured message for evaluator + eval_message = f"Evaluate the following content based on {criterion} criterion" + eval_background = f"Evaluation criterion: {criterion}\nContent to evaluate: {content}" + + structured_msg = self.send_structured_message( + sender=self.evaluation_supervisor_name, + recipient=evaluator.agent_name if hasattr(evaluator, 'agent_name') else f"Evaluator_{i}", + message=eval_message, + background=eval_background, + intermediate_output=content + ) + + # Get evaluation result + try: + if hasattr(evaluator, 'run'): + eval_response = evaluator.run( + f"Evaluate this content for {criterion}:\n{content}\n\nProvide: 1) Score (0-10), 2) Detailed feedback, 3) Confidence (0-1)" + ) + + # Parse evaluation result (simplified parsing) + result = EvaluationResult( + evaluator_name=evaluator.agent_name if hasattr(evaluator, 'agent_name') else f"Evaluator_{i}", + criterion=criterion, + score=7.5, # Default score, would need proper parsing + feedback=eval_response, + confidence=0.8 # Default confidence + ) + results.append(result) + + except Exception as e: + logger.error(f"Error in evaluation: {e}") + continue + + # Get summarized feedback from evaluation supervisor + if self.evaluation_supervisor and results: + summary_prompt = f"Summarize these evaluation results:\n{results}\n\nProvide coordinated, actionable feedback." + + try: + if hasattr(self.evaluation_supervisor, 'run'): + summary_feedback = self.evaluation_supervisor.run(summary_prompt) + logger.info(f"Evaluation summary: {summary_feedback}") + except Exception as e: + logger.error(f"Error in evaluation summary: {e}") + + self.evaluation_results.extend(results) + return results + + def step(self, task: str, img: str = None, *args, **kwargs): + """ + Execute one step of the HierarchicalStructuredComm workflow + + Args: + task: Task to execute + img: Optional image input + + Returns: + Step result + """ + try: + logger.info(f"Executing HierarchicalStructuredComm step for task: {task[:100]}...") + + # Safety check: prevent recursive task processing + if len(task) > 1000: # If task is too long, it might be recursive + logger.warning("Task too long, possible recursive call detected") + return {"error": "Task too long, possible recursive call"} + + # Step 1: Generate initial content + generator_result = self._generate_content(task) + + # Safety check: prevent empty or error results + if not generator_result or generator_result.startswith("Error"): + logger.error(f"Generator failed: {generator_result}") + return {"error": f"Generator failed: {generator_result}"} + + # Step 2: Evaluate content hierarchically + evaluation_results = self.run_hierarchical_evaluation(generator_result) + + # Step 3: Refine content based on evaluation + refined_result = self._refine_content(generator_result, evaluation_results) + + # Safety check: ensure we have a valid result + if not refined_result: + refined_result = generator_result + + return { + "generator_result": generator_result, + "evaluation_results": evaluation_results, + "refined_result": refined_result, + "conversation_history": self.conversation_history + } + + except Exception as e: + logger.error(f"Error in HierarchicalStructuredComm step: {e}") + logger.error(traceback.format_exc()) + return {"error": str(e)} + + def _generate_content(self, task: str) -> str: + """Generate initial content using generator agents""" + if not self.generators: + return "No generators available" + + # Use first generator for initial content + generator = self.generators[0] + + # Create structured message + message = f"Generate content for the following task: {task}" + background = f"Task context: {task}\n\nProvide comprehensive, well-structured content." + + structured_msg = self.send_structured_message( + sender=self.supervisor_name, + recipient=generator.agent_name if hasattr(generator, 'agent_name') else "Generator", + message=message, + background=background + ) + + try: + if hasattr(generator, 'run'): + # Add a simple, focused prompt to prevent recursive calls + prompt = f"Task: {task}\n\nGenerate a clear, concise response. Do not repeat the task or ask for clarification." + + result = generator.run(prompt) + + # Safety check: prevent recursive or overly long responses + if len(result) > 2000: + result = result[:2000] + "... [truncated]" + + # Safety check: prevent responses that just repeat the task + if task.lower() in result.lower() and len(result) < len(task) * 2: + logger.warning("Generator response appears to be recursive") + return "Error: Generator produced recursive response" + + self.intermediate_outputs["generator"] = result + return result + else: + return "Generator not properly configured" + except Exception as e: + logger.error(f"Error in content generation: {e}") + return f"Error generating content: {e}" + + def _refine_content(self, original_content: str, evaluation_results: List[EvaluationResult]) -> str: + """Refine content based on evaluation feedback""" + if not self.refiners: + return original_content + + if not evaluation_results: + return original_content + + # Use first refiner + refiner = self.refiners[0] + + # Create feedback summary + feedback_summary = "\n".join([ + f"{result.criterion}: {result.feedback} (Score: {result.score}/10)" + for result in evaluation_results + ]) + + # Create structured message for refinement + message = "Refine the content based on the evaluation feedback" + background = f"Original content: {original_content}\n\nEvaluation feedback:\n{feedback_summary}" + + structured_msg = self.send_structured_message( + sender=self.supervisor_name, + recipient=refiner.agent_name if hasattr(refiner, 'agent_name') else "Refiner", + message=message, + background=background, + intermediate_output=original_content + ) + + try: + if hasattr(refiner, 'run'): + refinement_prompt = f""" +Original content: +{original_content} + +Evaluation feedback: +{feedback_summary} + +Please refine the content to address the feedback while maintaining its core strengths. +""" + result = refiner.run(refinement_prompt) + self.intermediate_outputs["refiner"] = result + return result + else: + return original_content + except Exception as e: + logger.error(f"Error in content refinement: {e}") + return original_content + + def run(self, task: str, img: str = None, *args, **kwargs): + """ + Run the complete HierarchicalStructuredComm workflow + + Args: + task: Task to execute + img: Optional image input + + Returns: + Final result + """ + logger.info(f"Running HierarchicalStructuredComm workflow for task: {task[:100]}...") + + current_result = None + total_loops = 0 + + for loop in range(self.max_loops): + total_loops = loop + 1 + logger.info(f"HierarchicalStructuredComm loop {total_loops}/{self.max_loops}") + + # Execute step + step_result = self.step(task, img, *args, **kwargs) + + if "error" in step_result: + logger.error(f"Error in loop {total_loops}: {step_result['error']}") + break + + current_result = step_result["refined_result"] + + # Check if we should continue refining + if loop < self.max_loops - 1: + # Simple continuation logic - could be enhanced + evaluation_scores = [result.score for result in step_result["evaluation_results"]] + avg_score = sum(evaluation_scores) / len(evaluation_scores) if evaluation_scores else 0 + + if avg_score >= 8.0: # High quality threshold + logger.info(f"High quality achieved (avg score: {avg_score:.2f}), stopping refinement") + break + + return { + "final_result": current_result, + "total_loops": total_loops, + "conversation_history": self.conversation_history, + "evaluation_results": self.evaluation_results, + "intermediate_outputs": self.intermediate_outputs + } + + def __str__(self): + return f"HierarchicalStructuredCommunicationSwarm(name={self.name}, generators={len(self.generators)}, evaluators={len(self.evaluators)}, refiners={len(self.refiners)})" + + def __repr__(self): + return self.__str__() \ No newline at end of file