diff --git a/examples/multi_agent/full_hierarchical_structured_communication_test.py b/examples/multi_agent/full_hierarchical_structured_communication_test.py deleted file mode 100644 index 4f6a33ea..00000000 --- a/examples/multi_agent/full_hierarchical_structured_communication_test.py +++ /dev/null @@ -1,718 +0,0 @@ -""" -Full Hierarchical Structured Communication Framework Test with Ollama - -This script demonstrates the complete Hierarchical Structured Communication framework -using Ollama for local model inference. It showcases all components: -- Structured Communication Protocol -- Hierarchical Evaluation System -- Graph-based Agent Orchestration -- Iterative Refinement Process -""" - -import requests -import json -import time -import argparse -import sys -from typing import List, Dict, Any -from dataclasses import dataclass -from enum import Enum - -# Color support -try: - from colorama import init, Fore, Back, Style - init(autoreset=True) - COLORS_AVAILABLE = True -except ImportError: - # Fallback for systems without colorama - class Fore: - RED = GREEN = BLUE = YELLOW = MAGENTA = CYAN = WHITE = "" - class Back: - BLACK = RED = GREEN = BLUE = YELLOW = MAGENTA = CYAN = WHITE = "" - class Style: - BRIGHT = DIM = NORMAL = RESET_ALL = "" - COLORS_AVAILABLE = False - -class CommunicationType(str, Enum): - """Types of communication in the structured protocol""" - MESSAGE = "message" # M_ij: Specific task instructions - BACKGROUND = "background" # B_ij: Context and problem background - INTERMEDIATE_OUTPUT = "intermediate_output" # I_ij: Intermediate results - -@dataclass -class StructuredMessage: - """Structured communication message following HierarchicalStructuredComm protocol""" - message: str - background: str - intermediate_output: str - sender: str - recipient: str - timestamp: str = None - -@dataclass -class EvaluationResult: - """Result from evaluation team member""" - evaluator_name: str - criterion: str - score: float - feedback: str - confidence: float - -class HierarchicalStructuredCommunicationFramework: - """ - Full implementation of Hierarchical Structured Communication framework - using direct Ollama API calls - """ - - def __init__(self, model_name: str = "llama3:latest", verbose: bool = True, max_display_length: int = None): - self.model_name = model_name - self.verbose = verbose - self.max_display_length = max_display_length - self.conversation_history: List[StructuredMessage] = [] - self.intermediate_outputs: Dict[str, str] = {} - self.evaluation_results: List[EvaluationResult] = [] - - # Check Ollama availability - self._check_ollama() - - def _print_colored(self, text: str, color: str = Fore.WHITE, style: str = Style.NORMAL): - """Print colored text if colors are available""" - if COLORS_AVAILABLE: - print(f"{color}{style}{text}{Style.RESET_ALL}") - else: - print(text) - - def _print_header(self, text: str): - """Print a header with styling""" - self._print_colored(f"\n{text}", Fore.CYAN, Style.BRIGHT) - self._print_colored("=" * len(text), Fore.CYAN) - - def _print_subheader(self, text: str): - """Print a subheader with styling""" - self._print_colored(f"\n{text}", Fore.YELLOW, Style.BRIGHT) - self._print_colored("-" * len(text), Fore.YELLOW) - - def _print_success(self, text: str): - """Print success message""" - self._print_colored(text, Fore.GREEN, Style.BRIGHT) - - def _print_error(self, text: str): - """Print error message""" - self._print_colored(text, Fore.RED, Style.BRIGHT) - - def _print_info(self, text: str): - """Print info message""" - self._print_colored(text, Fore.BLUE) - - def _print_warning(self, text: str): - """Print warning message""" - self._print_colored(text, Fore.YELLOW) - - def _truncate_text(self, text: str, max_length: int = None) -> str: - """Truncate text for display if needed""" - if max_length is None: - max_length = self.max_display_length - - if max_length and len(text) > max_length: - return text[:max_length] + "..." - return text - - def _check_ollama(self): - """Check if Ollama is running and get available models""" - try: - response = requests.get("http://localhost:11434/api/tags", timeout=5) - if response.status_code == 200: - models = response.json().get('models', []) - model_names = [model.get('name') for model in models] - - if self.verbose: - self._print_success("Ollama is running") - self._print_info(f"Available models: {', '.join(model_names)}") - - # Verify our model is available - if not any(self.model_name in name for name in model_names): - self._print_warning(f"Model {self.model_name} not found, using first available") - self.model_name = model_names[0] if model_names else "llama3:latest" - - self._print_info(f"Using model: {self.model_name}") - else: - raise Exception("Ollama not responding properly") - except Exception as e: - self._print_error(f"Cannot connect to Ollama: {e}") - self._print_info("Please ensure Ollama is running: ollama serve") - raise - - def _call_ollama(self, prompt: str, temperature: float = 0.7, max_tokens: int = 1000) -> str: - """Make a call to Ollama API with infinite timeout""" - try: - payload = { - "model": self.model_name, - "prompt": prompt, - "stream": False, - "options": { - "temperature": temperature, - "num_predict": max_tokens - } - } - - # Set timeout to None for infinite timeout - response = requests.post( - "http://localhost:11434/api/generate", - json=payload, - timeout=None - ) - - if response.status_code == 200: - return response.json().get('response', '') - else: - raise Exception(f"Ollama API error: {response.status_code}") - - except Exception as e: - self._print_error(f"Error calling Ollama: {e}") - return f"Error: {e}" - - def send_structured_message( - self, - sender: str, - recipient: str, - message: str, - background: str = "", - intermediate_output: str = "" - ) -> StructuredMessage: - """Send a structured message following the HierarchicalStructuredComm protocol""" - structured_msg = StructuredMessage( - message=message, - background=background, - intermediate_output=intermediate_output, - sender=sender, - recipient=recipient, - timestamp=time.strftime("%Y-%m-%d %H:%M:%S") - ) - - self.conversation_history.append(structured_msg) - - if self.verbose: - display_message = self._truncate_text(message, 100) - self._print_info(f"{sender} -> {recipient}: {display_message}") - - return structured_msg - - def generate_content(self, task: str, context: str = "") -> str: - """Generate initial content using generator agent""" - if self.verbose: - self._print_subheader("Step 1: Generating initial content") - - # Create structured message - message = f"Generate comprehensive content for: {task}" - background = f"Task: {task}\nContext: {context}\n\nProvide detailed, well-structured content." - - self.send_structured_message( - sender="Supervisor", - recipient="Generator", - message=message, - background=background - ) - - # Generate content - prompt = f"""You are a Content Generator in a Hierarchical Structured Communication framework. - -Task: {task} -Context: {context} - -Generate comprehensive, well-structured content that addresses the task thoroughly. -Provide detailed explanations, examples, and insights. - -Content:""" - - result = self._call_ollama(prompt, temperature=0.7, max_tokens=1500) - self.intermediate_outputs["generator"] = result - - if self.verbose: - self._print_info("Generated content:") - print(result) # Print full content without truncation - - return result - - def evaluate_content(self, content: str, criteria: List[str] = None) -> List[EvaluationResult]: - """Evaluate content using hierarchical evaluation system""" - if criteria is None: - criteria = ["accuracy", "completeness", "clarity", "relevance"] - - if self.verbose: - self._print_subheader("Step 2: Hierarchical evaluation") - - results = [] - - for criterion in criteria: - if self.verbose: - self._print_info(f" Evaluating {criterion}...") - - # Create structured message for evaluator - message = f"Evaluate content for {criterion} criterion" - background = f"Content to evaluate: {content[:500]}...\nCriterion: {criterion}" - - self.send_structured_message( - sender="EvaluationSupervisor", - recipient=f"{criterion.capitalize()}Evaluator", - message=message, - background=background, - intermediate_output=content - ) - - # Evaluate with specific criterion - prompt = f"""You are a {criterion.capitalize()} Evaluator in a hierarchical evaluation system. - -Content to evaluate: -{content} - -Evaluation criterion: {criterion} - -Please provide: -1. Score (0-10) -2. Detailed feedback -3. Confidence level (0-1) -4. Specific suggestions for improvement - -Evaluation:""" - - evaluation_response = self._call_ollama(prompt, temperature=0.3, max_tokens=800) - - # Parse evaluation (simplified parsing) - score = 7.0 # Default score - feedback = evaluation_response - confidence = 0.8 # Default confidence - - # Try to extract score from response - if "score" in evaluation_response.lower(): - try: - # Look for patterns like "score: 8" or "8/10" - import re - score_match = re.search(r'(\d+(?:\.\d+)?)/10|score[:\s]*(\d+(?:\.\d+)?)', evaluation_response.lower()) - if score_match: - score = float(score_match.group(1) or score_match.group(2)) - except: - pass - - result = EvaluationResult( - evaluator_name=f"{criterion.capitalize()}Evaluator", - criterion=criterion, - score=score, - feedback=feedback, - confidence=confidence - ) - - results.append(result) - - if self.verbose: - self._print_info(f" Score: {score}/10") - print(f" Feedback: {feedback}") # Print full feedback - - self.evaluation_results.extend(results) - return results - - def refine_content(self, original_content: str, evaluation_results: List[EvaluationResult]) -> str: - """Refine content based on evaluation feedback""" - if self.verbose: - self._print_subheader("Step 3: Refining content") - - # Create feedback summary - feedback_summary = "\n\n".join([ - f"{result.criterion.capitalize()} (Score: {result.score}/10):\n{result.feedback}" - for result in evaluation_results - ]) - - # Create structured message for refinement - message = "Refine content based on evaluation feedback" - background = f"Original content: {original_content[:500]}...\n\nEvaluation feedback:\n{feedback_summary}" - - self.send_structured_message( - sender="Supervisor", - recipient="Refiner", - message=message, - background=background, - intermediate_output=original_content - ) - - # Refine content - prompt = f"""You are a Content Refiner in a Hierarchical Structured Communication framework. - -Original Content: -{original_content} - -Evaluation Feedback: -{feedback_summary} - -Please refine the content to address the feedback while maintaining its core strengths. -Focus on the specific issues mentioned in the evaluation and provide improvements. - -Refined Content:""" - - refined_result = self._call_ollama(prompt, temperature=0.5, max_tokens=1500) - self.intermediate_outputs["refiner"] = refined_result - - if self.verbose: - self._print_info("Refined content:") - print(refined_result) # Print full content without truncation - - return refined_result - - def run_hierarchical_workflow(self, task: str, max_iterations: int = 3, quality_threshold: float = 8.0) -> Dict[str, Any]: - """Run the complete Hierarchical Structured Communication workflow""" - self._print_header("Starting Hierarchical Structured Communication Workflow") - self._print_info(f"Task: {task}") - self._print_info(f"Max iterations: {max_iterations}") - self._print_info(f"Quality threshold: {quality_threshold}") - - start_time = time.time() - current_content = None - iteration = 0 - - for iteration in range(max_iterations): - self._print_subheader(f"Iteration {iteration + 1}/{max_iterations}") - - # Step 1: Generate/Refine content - if iteration == 0: - current_content = self.generate_content(task) - else: - current_content = self.refine_content(current_content, evaluation_results) - - # Step 2: Evaluate content - evaluation_results = self.evaluate_content(current_content) - - # Step 3: Check quality threshold - avg_score = sum(result.score for result in evaluation_results) / len(evaluation_results) - self._print_info(f"Average evaluation score: {avg_score:.2f}/10") - - if avg_score >= quality_threshold: - self._print_success("Quality threshold met! Stopping refinement.") - break - - if iteration < max_iterations - 1: - self._print_info("Continuing refinement...") - - total_time = time.time() - start_time - - return { - "final_content": current_content, - "total_iterations": iteration + 1, - "average_score": avg_score, - "evaluation_results": evaluation_results, - "conversation_history": self.conversation_history, - "intermediate_outputs": self.intermediate_outputs, - "total_time": total_time - } - - def print_workflow_summary(self, result: Dict[str, Any]): - """Print a comprehensive summary of the workflow results""" - self._print_header("TALK HIERARCHICAL WORKFLOW COMPLETED") - - self._print_subheader("PERFORMANCE SUMMARY") - self._print_info(f" Total iterations: {result['total_iterations']}") - self._print_info(f" Final average score: {result['average_score']:.2f}/10") - self._print_info(f" Total time: {result['total_time']:.2f} seconds") - self._print_info(f" Messages exchanged: {len(result['conversation_history'])}") - - self._print_subheader("FINAL CONTENT") - print(result['final_content']) - - self._print_subheader("EVALUATION RESULTS") - for eval_result in result['evaluation_results']: - self._print_info(f" {eval_result.criterion.capitalize()}: {eval_result.score}/10") - print(f" Feedback: {eval_result.feedback}") - - self._print_subheader("COMMUNICATION HISTORY") - for i, msg in enumerate(result['conversation_history']): - self._print_info(f" {i+1}. {msg.sender} -> {msg.recipient}") - print(f" Message: {msg.message}") - print(f" Background: {msg.background}") - print(f" Intermediate Output: {msg.intermediate_output}") - print(f" Time: {msg.timestamp}") - -def test_basic_workflow(): - """Test basic Hierarchical Structured Communication workflow""" - print("Test 1: Basic Workflow") - print("=" * 50) - - framework = HierarchicalStructuredCommunicationFramework(model_name="llama3:latest", verbose=True) - - task = "Explain the concept of neural networks and their applications in modern AI" - - result = framework.run_hierarchical_workflow( - task=task, - max_iterations=2, - quality_threshold=7.5 - ) - - framework.print_workflow_summary(result) - return result - -def test_complex_workflow(): - """Test complex workflow with multiple iterations""" - print("Test 2: Complex Workflow") - print("=" * 50) - - framework = HierarchicalStructuredCommunicationFramework(model_name="llama3:latest", verbose=True) - - task = """Create a comprehensive guide on machine learning that covers: -1. Basic concepts and definitions -2. Types of machine learning (supervised, unsupervised, reinforcement) -3. Common algorithms and their use cases -4. Real-world applications and examples -5. Future trends and challenges - -Make it suitable for both beginners and intermediate learners.""" - - result = framework.run_hierarchical_workflow( - task=task, - max_iterations=3, - quality_threshold=8.0 - ) - - framework.print_workflow_summary(result) - return result - -def test_structured_communication(): - """Test structured communication protocol in isolation""" - print("Test 3: Structured Communication Protocol") - print("=" * 50) - - framework = HierarchicalStructuredCommunicationFramework(model_name="llama3:latest", verbose=True) - - # Test structured message exchange - msg1 = framework.send_structured_message( - sender="Supervisor", - recipient="Generator", - message="Generate content about renewable energy", - background="Focus on solar and wind power", - intermediate_output="Previous discussion covered climate change" - ) - - msg2 = framework.send_structured_message( - sender="Generator", - recipient="Evaluator", - message="Content ready for evaluation", - background="Generated comprehensive guide on renewable energy", - intermediate_output="Detailed explanation of solar and wind technologies" - ) - - print("Structured Messages:") - for i, msg in enumerate(framework.conversation_history): - print(f"Message {i+1}:") - print(f" From: {msg.sender}") - print(f" To: {msg.recipient}") - print(f" Message (M_ij): {msg.message}") - print(f" Background (B_ij): {msg.background}") - print(f" Intermediate Output (I_ij): {msg.intermediate_output}") - print(f" Timestamp: {msg.timestamp}") - -def test_quick_demo(): - """Quick demonstration with smaller model and shorter prompts""" - print("Test 4: Quick Demo") - print("=" * 50) - - # Use a smaller model for faster response - framework = HierarchicalStructuredCommunicationFramework(model_name="llama3.2:3b", verbose=True) - - task = "Explain what artificial intelligence is in simple terms" - - result = framework.run_hierarchical_workflow( - task=task, - max_iterations=1, - quality_threshold=6.0 - ) - - framework.print_workflow_summary(result) - return result - -def interactive_mode(): - """Interactive mode for custom tasks""" - print("Interactive Hierarchical Structured Communication Framework") - print("=" * 50) - - # Get user input - model_name = input("Enter model name (default: llama3:latest): ").strip() or "llama3:latest" - task = input("Enter your task: ").strip() - - if not task: - print("No task provided. Exiting.") - return - - max_iterations = input("Enter max iterations (default: 2): ").strip() - max_iterations = int(max_iterations) if max_iterations.isdigit() else 2 - - quality_threshold = input("Enter quality threshold (default: 7.0): ").strip() - quality_threshold = float(quality_threshold) if quality_threshold.replace('.', '').isdigit() else 7.0 - - # Create framework and run - framework = HierarchicalStructuredCommunicationFramework(model_name=model_name, verbose=True) - - result = framework.run_hierarchical_workflow( - task=task, - max_iterations=max_iterations, - quality_threshold=quality_threshold - ) - - framework.print_workflow_summary(result) - return result - -def main(): - """Main CLI entry point""" - parser = argparse.ArgumentParser( - description="Hierarchical Structured Communication Framework Test Suite", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Examples: - python full_hierarchical_structured_communication_test.py --quick - python full_hierarchical_structured_communication_test.py --interactive - python full_hierarchical_structured_communication_test.py --model llama3.2:3b --task "Explain AI" - python full_hierarchical_structured_communication_test.py --all - """ - ) - - parser.add_argument( - "--quick", - action="store_true", - help="Run quick demo test" - ) - - parser.add_argument( - "--basic", - action="store_true", - help="Run basic workflow test" - ) - - parser.add_argument( - "--complex", - action="store_true", - help="Run complex workflow test" - ) - - parser.add_argument( - "--communication", - action="store_true", - help="Run structured communication test" - ) - - parser.add_argument( - "--all", - action="store_true", - help="Run all tests" - ) - - parser.add_argument( - "--interactive", - action="store_true", - help="Run in interactive mode" - ) - - parser.add_argument( - "--model", - type=str, - default="llama3:latest", - help="Ollama model to use (default: llama3:latest)" - ) - - parser.add_argument( - "--task", - type=str, - help="Custom task for single run" - ) - - parser.add_argument( - "--iterations", - type=int, - default=2, - help="Maximum iterations (default: 2)" - ) - - parser.add_argument( - "--threshold", - type=float, - default=7.0, - help="Quality threshold (default: 7.0)" - ) - - parser.add_argument( - "--no-color", - action="store_true", - help="Disable colored output" - ) - - parser.add_argument( - "--quiet", - action="store_true", - help="Disable verbose output (verbose is enabled by default)" - ) - - args = parser.parse_args() - - # Disable colors if requested - global COLORS_AVAILABLE - if args.no_color: - COLORS_AVAILABLE = False - - # Print header - if COLORS_AVAILABLE: - print(f"{Fore.CYAN}{Style.BRIGHT}") - print("=" * 80) - print("TALK HIERARCHICAL FRAMEWORK TEST SUITE") - print("=" * 80) - print(f"{Style.RESET_ALL}") - else: - print("=" * 80) - print("TALK HIERARCHICAL FRAMEWORK TEST SUITE") - print("=" * 80) - - print("Testing Hierarchical Structured Communication framework with Ollama") - print("=" * 80) - - try: - if args.interactive: - interactive_mode() - elif args.task: - # Single task run - framework = HierarchicalStructuredCommunicationFramework( - model_name=args.model, - verbose=not args.quiet - ) - result = framework.run_hierarchical_workflow( - task=args.task, - max_iterations=args.iterations, - quality_threshold=args.threshold - ) - framework.print_workflow_summary(result) - elif args.quick: - test_quick_demo() - elif args.basic: - test_basic_workflow() - elif args.complex: - test_complex_workflow() - elif args.communication: - test_structured_communication() - elif args.all: - # Run all tests - test_quick_demo() - test_basic_workflow() - test_complex_workflow() - test_structured_communication() - else: - # Default: run quick demo - test_quick_demo() - - print("All tests completed successfully!") - print("Framework Features Demonstrated:") - print(" Structured Communication Protocol (M_ij, B_ij, I_ij)") - print(" Hierarchical Evaluation System") - print(" Iterative Refinement Process") - print(" Graph-based Agent Orchestration") - print(" Local Ollama Integration") - - except KeyboardInterrupt: - print("\nInterrupted by user") - except Exception as e: - print(f"Error during testing: {e}") - import traceback - traceback.print_exc() - -if __name__ == "__main__": - main() \ No newline at end of file