From 582edf427f346758af9267cb835ccef3d5f48fc6 Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Fri, 22 Aug 2025 16:37:29 -0700 Subject: [PATCH] [FIX][Board of Directors] [CLEANUP __INITS__] [New Examples for livestream] --- example.py | 31 +- .../board_of_directors_example.py | 83 +- .../minimal_board_example.py | 51 + .../simple_board_example.py | 35 + ...gle_file_hierarchical_framework_example.py | 171 +-- .../agent_loader/claude_code_compatible.py | 9 + .../utils/agent_loader/finance_advisor.md | 2 +- requirements.txt | 1 - swarms/cli/main.py | 25 +- swarms/structs/__init__.py | 48 +- swarms/structs/board_of_directors_swarm.py | 198 +-- ...ical_structured_communication_framework.py | 1186 ++++++++++------- 12 files changed, 941 insertions(+), 899 deletions(-) create mode 100644 examples/multi_agent/board_of_directors/minimal_board_example.py create mode 100644 examples/multi_agent/board_of_directors/simple_board_example.py create mode 100644 examples/utils/agent_loader/claude_code_compatible.py diff --git a/example.py b/example.py index f6427822..96886521 100644 --- a/example.py +++ b/example.py @@ -4,40 +4,11 @@ from swarms import Agent agent = Agent( agent_name="Quantitative-Trading-Agent", agent_description="Advanced quantitative trading and algorithmic analysis agent", - system_prompt="""You are an expert quantitative trading agent with deep expertise in: - - Algorithmic trading strategies and implementation - - Statistical arbitrage and market making - - Risk management and portfolio optimization - - High-frequency trading systems - - Market microstructure analysis - - Quantitative research methodologies - - Financial mathematics and stochastic processes - - Machine learning applications in trading - - Your core responsibilities include: - 1. Developing and backtesting trading strategies - 2. Analyzing market data and identifying alpha opportunities - 3. Implementing risk management frameworks - 4. Optimizing portfolio allocations - 5. Conducting quantitative research - 6. Monitoring market microstructure - 7. Evaluating trading system performance - - You maintain strict adherence to: - - Mathematical rigor in all analyses - - Statistical significance in strategy development - - Risk-adjusted return optimization - - Market impact minimization - - Regulatory compliance - - Transaction cost analysis - - Performance attribution - - You communicate in precise, technical terms while maintaining clarity for stakeholders.""", model_name="claude-sonnet-4-20250514", dynamic_temperature_enabled=True, - output_type="str-all-except-first", max_loops=1, dynamic_context_window=True, + streaming_on=True, ) out = agent.run( diff --git a/examples/multi_agent/board_of_directors/board_of_directors_example.py b/examples/multi_agent/board_of_directors/board_of_directors_example.py index 2461919e..8966ac30 100644 --- a/examples/multi_agent/board_of_directors/board_of_directors_example.py +++ b/examples/multi_agent/board_of_directors/board_of_directors_example.py @@ -10,23 +10,8 @@ To run this example: 2. 
Run: python examples/multi_agent/board_of_directors/board_of_directors_example.py """ -import os -import sys from typing import List -# Add the root directory to the Python path if running from examples directory -current_dir = os.path.dirname(os.path.abspath(__file__)) -if "examples" in current_dir: - root_dir = current_dir - while os.path.basename( - root_dir - ) != "examples" and root_dir != os.path.dirname(root_dir): - root_dir = os.path.dirname(root_dir) - if os.path.basename(root_dir) == "examples": - root_dir = os.path.dirname(root_dir) - if root_dir not in sys.path: - sys.path.insert(0, root_dir) - from swarms.structs.board_of_directors_swarm import ( BoardOfDirectorsSwarm, BoardMember, @@ -37,7 +22,6 @@ from swarms.structs.agent import Agent def create_board_members() -> List[BoardMember]: """Create board members with specific roles.""" - chairman = Agent( agent_name="Chairman", agent_description="Executive Chairman with strategic vision", @@ -86,7 +70,6 @@ def create_board_members() -> List[BoardMember]: def create_worker_agents() -> List[Agent]: """Create worker agents for the swarm.""" - researcher = Agent( agent_name="Researcher", agent_description="Research analyst for data analysis", @@ -114,9 +97,8 @@ def create_worker_agents() -> List[Agent]: return [researcher, developer, marketer] -def run_board_example() -> None: +def run_board_example() -> str: """Run a Board of Directors example.""" - # Create board members and worker agents board_members = create_board_members() worker_agents = create_worker_agents() @@ -127,7 +109,7 @@ def run_board_example() -> None: board_members=board_members, agents=worker_agents, max_loops=2, - verbose=True, + verbose=False, decision_threshold=0.6, ) @@ -137,66 +119,17 @@ def run_board_example() -> None: Include market research, technical planning, marketing strategy, and financial projections. """ - # Execute the task - result = board_swarm.run(task=task) - - print("Task completed successfully!") - print(f"Result: {result}") - - -def run_simple_example() -> None: - """Run a simple Board of Directors example.""" - - # Create simple agents - analyst = Agent( - agent_name="Analyst", - agent_description="Data analyst", - model_name="gpt-4o-mini", - max_loops=1, - ) - - writer = Agent( - agent_name="Writer", - agent_description="Content writer", - model_name="gpt-4o-mini", - max_loops=1, - ) - - # Create swarm with default settings - board_swarm = BoardOfDirectorsSwarm( - name="Simple_Board", - agents=[analyst, writer], - verbose=True, - ) - - # Execute simple task - task = ( - "Analyze current market trends and create a summary report." - ) - result = board_swarm.run(task=task) - - print("Simple example completed!") - print(f"Result: {result}") + # Execute the task and return result + return board_swarm.run(task=task) def main() -> None: - """Main function to run the examples.""" - - if not os.getenv("OPENAI_API_KEY"): - print( - "Warning: OPENAI_API_KEY not set. Example may not work." 
-        )
-        return
 
     try:
-        print("Running simple Board of Directors example...")
-        run_simple_example()
-
-        print("\nRunning comprehensive Board of Directors example...")
-        run_board_example()
-
-    except Exception as e:
-        print(f"Error: {e}")
+        result = run_board_example()
+        print(result)
+    except Exception as e:
+        print(f"Error: {e}")
 
 
 if __name__ == "__main__":
diff --git a/examples/multi_agent/board_of_directors/minimal_board_example.py b/examples/multi_agent/board_of_directors/minimal_board_example.py
new file mode 100644
index 00000000..74543c72
--- /dev/null
+++ b/examples/multi_agent/board_of_directors/minimal_board_example.py
@@ -0,0 +1,51 @@
+"""
+Minimal Board of Directors Example
+
+This example demonstrates the most basic Board of Directors swarm setup
+with minimal configuration and agents.
+
+To run this example:
+1. Make sure you're in the root directory of the swarms project
+2. Run: python examples/multi_agent/board_of_directors/minimal_board_example.py
+"""
+
+from swarms.structs.board_of_directors_swarm import (
+    BoardOfDirectorsSwarm,
+)
+from swarms.structs.agent import Agent
+
+
+def run_minimal_example() -> str:
+    """Run a minimal Board of Directors example."""
+    # Create a single agent
+    agent = Agent(
+        agent_name="General_Agent",
+        agent_description="General purpose agent",
+        model_name="gpt-4o-mini",
+        max_loops=1,
+    )
+
+    # Create minimal swarm
+    board_swarm = BoardOfDirectorsSwarm(
+        name="Minimal_Board",
+        agents=[agent],
+        verbose=False,
+    )
+
+    # Execute minimal task
+    task = "Provide a brief overview of artificial intelligence."
+    return board_swarm.run(task=task)
+
+
+def main() -> None:
+    """Main function to run the minimal example."""
+
+    try:
+        result = run_minimal_example()
+        print(result)
+    except Exception as e:
+        print(f"Error: {e}")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/multi_agent/board_of_directors/simple_board_example.py b/examples/multi_agent/board_of_directors/simple_board_example.py
new file mode 100644
index 00000000..fa5e4aef
--- /dev/null
+++ b/examples/multi_agent/board_of_directors/simple_board_example.py
@@ -0,0 +1,35 @@
+from swarms.structs.board_of_directors_swarm import (
+    BoardOfDirectorsSwarm,
+)
+from swarms.structs.agent import Agent
+
+# Create simple agents for basic tasks
+analyst = Agent(
+    agent_name="Analyst",
+    agent_description="Data analyst",
+    model_name="gpt-4o-mini",
+    max_loops=1,
+)
+
+writer = Agent(
+    agent_name="Writer",
+    agent_description="Content writer",
+    model_name="gpt-4o-mini",
+    max_loops=1,
+)
+
+agents = [analyst, writer]
+
+# Create swarm with default settings
+board_swarm = BoardOfDirectorsSwarm(
+    name="Simple_Board",
+    agents=agents,
+    verbose=False,
+)
+
+# Execute simple task
+task = "Analyze current market trends and create a summary report."
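+
+# Note: board_swarm.run() executes the full board workflow (board
+# deliberation, delegation to worker agents, and synthesis of their
+# output) and returns the formatted result; the exact return shape
+# depends on the swarm's output settings (library defaults assumed here).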
+ +result = board_swarm.run(task=task) + +print(result) diff --git a/examples/multi_agent/hscf/single_file_hierarchical_framework_example.py b/examples/multi_agent/hscf/single_file_hierarchical_framework_example.py index 70221f29..1ae39dfa 100644 --- a/examples/multi_agent/hscf/single_file_hierarchical_framework_example.py +++ b/examples/multi_agent/hscf/single_file_hierarchical_framework_example.py @@ -9,10 +9,11 @@ All components are now in one file: hierarchical_structured_communication_framew import os import sys -from typing import Dict, Any # Add the project root to the Python path -project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +project_root = os.path.abspath( + os.path.join(os.path.dirname(__file__), "..", "..") +) sys.path.insert(0, project_root) from dotenv import load_dotenv @@ -20,11 +21,6 @@ from dotenv import load_dotenv # Import everything from the single file from swarms.structs.hierarchical_structured_communication_framework import ( HierarchicalStructuredCommunicationFramework, - HierarchicalStructuredCommunicationGenerator, - HierarchicalStructuredCommunicationEvaluator, - HierarchicalStructuredCommunicationRefiner, - HierarchicalStructuredCommunicationSupervisor, - # Convenience aliases TalkHierarchicalGenerator, TalkHierarchicalEvaluator, TalkHierarchicalRefiner, @@ -42,29 +38,29 @@ def example_basic_usage(): print("=" * 80) print("BASIC USAGE EXAMPLE") print("=" * 80) - + # Create framework with default configuration framework = HierarchicalStructuredCommunicationFramework( - name="BasicFramework", - max_loops=2, - verbose=True + name="BasicFramework", max_loops=2, verbose=True ) - + # Run a simple task task = "Explain the benefits of structured communication in multi-agent systems" - + print(f"Task: {task}") print("Running framework...") - + result = framework.run(task) - + print("\n" + "=" * 50) print("FINAL RESULT") print("=" * 50) print(result["final_result"]) - + print(f"\nTotal loops: {result['total_loops']}") - print(f"Conversation history entries: {len(result['conversation_history'])}") + print( + f"Conversation history entries: {len(result['conversation_history'])}" + ) print(f"Evaluation results: {len(result['evaluation_results'])}") @@ -75,40 +71,40 @@ def example_custom_agents(): print("\n" + "=" * 80) print("CUSTOM AGENTS EXAMPLE") print("=" * 80) - + # Create custom agents using the convenience aliases generator = TalkHierarchicalGenerator( agent_name="ContentCreator", model_name="gpt-4o-mini", - verbose=True + verbose=True, ) - + evaluator1 = TalkHierarchicalEvaluator( agent_name="AccuracyChecker", evaluation_criteria=["accuracy", "technical_correctness"], model_name="gpt-4o-mini", - verbose=True + verbose=True, ) - + evaluator2 = TalkHierarchicalEvaluator( agent_name="ClarityChecker", evaluation_criteria=["clarity", "readability", "coherence"], model_name="gpt-4o-mini", - verbose=True + verbose=True, ) - + refiner = TalkHierarchicalRefiner( agent_name="ContentImprover", model_name="gpt-4o-mini", - verbose=True + verbose=True, ) - + supervisor = TalkHierarchicalSupervisor( agent_name="WorkflowManager", model_name="gpt-4o-mini", - verbose=True + verbose=True, ) - + # Create framework with custom agents framework = HierarchicalStructuredCommunicationFramework( name="CustomFramework", @@ -117,24 +113,26 @@ def example_custom_agents(): evaluators=[evaluator1, evaluator2], refiners=[refiner], max_loops=3, - verbose=True + verbose=True, ) - + # Run a complex task task = "Design a comprehensive machine learning pipeline for 
sentiment analysis" - + print(f"Task: {task}") print("Running framework with custom agents...") - + result = framework.run(task) - + print("\n" + "=" * 50) print("FINAL RESULT") print("=" * 50) print(result["final_result"]) - + print(f"\nTotal loops: {result['total_loops']}") - print(f"Conversation history entries: {len(result['conversation_history'])}") + print( + f"Conversation history entries: {len(result['conversation_history'])}" + ) print(f"Evaluation results: {len(result['evaluation_results'])}") @@ -145,7 +143,7 @@ def example_ollama_integration(): print("\n" + "=" * 80) print("OLLAMA INTEGRATION EXAMPLE") print("=" * 80) - + # Create framework with Ollama configuration framework = HierarchicalStructuredCommunicationFramework( name="OllamaFramework", @@ -154,27 +152,31 @@ def example_ollama_integration(): model_name="llama3:latest", use_ollama=True, ollama_base_url="http://localhost:11434/v1", - ollama_api_key="ollama" + ollama_api_key="ollama", ) - + # Run a task with local model task = "Explain the concept of structured communication protocols" - + print(f"Task: {task}") print("Running framework with Ollama...") - + try: result = framework.run(task) - + print("\n" + "=" * 50) print("FINAL RESULT") print("=" * 50) print(result["final_result"]) - + print(f"\nTotal loops: {result['total_loops']}") - print(f"Conversation history entries: {len(result['conversation_history'])}") - print(f"Evaluation results: {len(result['evaluation_results'])}") - + print( + f"Conversation history entries: {len(result['conversation_history'])}" + ) + print( + f"Evaluation results: {len(result['evaluation_results'])}" + ) + except Exception as e: print(f"Error with Ollama: {e}") print("Make sure Ollama is running: ollama serve") @@ -187,28 +189,31 @@ def example_structured_communication(): print("\n" + "=" * 80) print("STRUCTURED COMMUNICATION EXAMPLE") print("=" * 80) - + # Create framework framework = HierarchicalStructuredCommunicationFramework( - name="CommunicationDemo", - verbose=True + name="CommunicationDemo", verbose=True ) - + # Demonstrate structured message sending print("Sending structured message...") - + structured_msg = framework.send_structured_message( sender="Supervisor", recipient="Generator", message="Create a technical documentation outline", background="For a Python library focused on data processing", - intermediate_output="Previous research on similar libraries" + intermediate_output="Previous research on similar libraries", ) - + print(f"Message sent: {structured_msg.message}") print(f"Background: {structured_msg.background}") - print(f"Intermediate output: {structured_msg.intermediate_output}") - print(f"From: {structured_msg.sender} -> To: {structured_msg.recipient}") + print( + f"Intermediate output: {structured_msg.intermediate_output}" + ) + print( + f"From: {structured_msg.sender} -> To: {structured_msg.recipient}" + ) def example_agent_interaction(): @@ -218,52 +223,51 @@ def example_agent_interaction(): print("\n" + "=" * 80) print("AGENT INTERACTION EXAMPLE") print("=" * 80) - + # Create agents generator = TalkHierarchicalGenerator( - agent_name="ContentGenerator", - verbose=True + agent_name="ContentGenerator", verbose=True ) - + evaluator = TalkHierarchicalEvaluator( agent_name="QualityEvaluator", evaluation_criteria=["accuracy", "clarity"], - verbose=True + verbose=True, ) - + refiner = TalkHierarchicalRefiner( - agent_name="ContentRefiner", - verbose=True + agent_name="ContentRefiner", verbose=True ) - + # Generate content print("1. 
Generating content...")
     gen_result = generator.generate_with_structure(
         message="Create a brief explanation of machine learning",
         background="For beginners with no technical background",
-        intermediate_output=""
+        intermediate_output="",
     )
-    
+
     print(f"Generated content: {gen_result.content[:200]}...")
-    
+
     # Evaluate content
     print("\n2. Evaluating content...")
     eval_result = evaluator.evaluate_with_criterion(
-        content=gen_result.content,
-        criterion="clarity"
+        content=gen_result.content, criterion="clarity"
     )
-    
+
     print(f"Evaluation score: {eval_result.score}/10")
     print(f"Feedback: {eval_result.feedback[:200]}...")
-    
+
     # Refine content
     print("\n3. Refining content...")
     refine_result = refiner.refine_with_feedback(
         original_content=gen_result.content,
-        evaluation_results=[eval_result]
+        evaluation_results=[eval_result],
+    )
+
+    print(
+        f"Refined content: {refine_result.refined_content[:200]}..."
     )
-    
-    print(f"Refined content: {refine_result.refined_content[:200]}...")
     print(f"Changes made: {refine_result.changes_made}")
 
 
@@ -271,12 +275,16 @@ def main():
     """
     Main function to run all examples
     """
-    print("SINGLE-FILE HIERARCHICAL STRUCTURED COMMUNICATION FRAMEWORK")
+    print(
+        "SINGLE-FILE HIERARCHICAL STRUCTURED COMMUNICATION FRAMEWORK"
+    )
     print("=" * 80)
-    print("This demonstrates the consolidated single-file implementation")
+    print(
+        "This demonstrates the consolidated single-file implementation"
+    )
     print("based on the research paper: arXiv:2502.11098")
     print("=" * 80)
-    
+
     try:
         # Run examples
         example_basic_usage()
@@ -284,27 +292,30 @@ def main():
         example_ollama_integration()
         example_structured_communication()
         example_agent_interaction()
-    
+
         print("\n" + "=" * 80)
         print("ALL EXAMPLES COMPLETED SUCCESSFULLY!")
         print("=" * 80)
         print("Framework Features Demonstrated:")
         print("✓ Single-file implementation")
-        print("✓ Structured Communication Protocol (M_ij, B_ij, I_ij)")
+        print(
+            "✓ Structured Communication Protocol (M_ij, B_ij, I_ij)"
+        )
         print("✓ Hierarchical Evaluation System")
         print("✓ Iterative Refinement Process")
        print("✓ Flexible Model Configuration (OpenAI/Ollama)")
         print("✓ Custom Agent Specialization")
         print("✓ Direct Agent Interaction")
         print("✓ Convenience Aliases")
-    
+
     except KeyboardInterrupt:
         print("\nInterrupted by user")
     except Exception as e:
         print(f"Error during execution: {e}")
         import traceback
+
         traceback.print_exc()
 
 
 if __name__ == "__main__":
-    main()
+    main()
diff --git a/examples/utils/agent_loader/claude_code_compatible.py b/examples/utils/agent_loader/claude_code_compatible.py
new file mode 100644
index 00000000..ae05825f
--- /dev/null
+++ b/examples/utils/agent_loader/claude_code_compatible.py
@@ -0,0 +1,9 @@
+from swarms import load_agents_from_markdown
+
+agents = load_agents_from_markdown(["finance_advisor.md"])
+
+# Use the agent
+response = agents[0].run(
+    "I have $100k to invest. I want to hedge my bets on the energy companies that will benefit from the AI revolution. "
+    "What are the top 4 stocks to invest in?"
+) diff --git a/examples/utils/agent_loader/finance_advisor.md b/examples/utils/agent_loader/finance_advisor.md index 62c32e51..ff0ab41f 100644 --- a/examples/utils/agent_loader/finance_advisor.md +++ b/examples/utils/agent_loader/finance_advisor.md @@ -1,7 +1,7 @@ --- name: FinanceAdvisor description: Expert financial advisor for investment and budgeting guidance -model_name: gpt-4o +model_name: claude-sonnet-4-20250514 temperature: 0.7 max_loops: 1 --- diff --git a/requirements.txt b/requirements.txt index e873cffb..74380a53 100644 --- a/requirements.txt +++ b/requirements.txt @@ -28,4 +28,3 @@ mcp numpy openai schedule -colorama diff --git a/swarms/cli/main.py b/swarms/cli/main.py index 3da9af19..2892e06d 100644 --- a/swarms/cli/main.py +++ b/swarms/cli/main.py @@ -22,6 +22,9 @@ from swarms.cli.onboarding_process import OnboardingProcess from swarms.structs.agent import Agent from swarms.utils.agent_loader import AgentLoader from swarms.utils.formatter import formatter +from dotenv import load_dotenv + +load_dotenv() # Initialize console with custom styling console = Console() @@ -397,7 +400,14 @@ def check_python_version() -> tuple[bool, str, str]: def check_api_keys() -> tuple[bool, str, str]: - """Check if common API keys are set.""" + """ + Check if at least one common API key is set in the environment variables. + + Returns: + tuple: (True, "✓", message) if at least one API key is set, + (False, "✗", message) otherwise. + """ + api_keys = { "OPENAI_API_KEY": os.getenv("OPENAI_API_KEY"), "ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY"), @@ -405,9 +415,16 @@ def check_api_keys() -> tuple[bool, str, str]: "COHERE_API_KEY": os.getenv("COHERE_API_KEY"), } - set_keys = [key for key, value in api_keys.items() if value] - if set_keys: - return True, "✓", f"API keys found: {', '.join(set_keys)}" + # At least one key must be present and non-empty + if any(value for value in api_keys.values()): + present_keys = [ + key for key, value in api_keys.items() if value + ] + return ( + True, + "✓", + f"At least one API key found: {', '.join(present_keys)}", + ) else: return ( False, diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index 04a83dd8..1af73d1d 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -1,12 +1,10 @@ from swarms.structs.agent import Agent from swarms.structs.agent_builder import AgentsBuilder +from swarms.structs.agent_rearrange import AgentRearrange, rearrange from swarms.structs.auto_swarm_builder import AutoSwarmBuilder from swarms.structs.base_structure import BaseStructure from swarms.structs.base_swarm import BaseSwarm from swarms.structs.batch_agent_execution import batch_agent_execution -from swarms.structs.board_of_directors_swarm import ( - BoardOfDirectorsSwarm, -) from swarms.structs.concurrent_workflow import ConcurrentWorkflow from swarms.structs.conversation import Conversation from swarms.structs.council_as_judge import CouncilAsAJudge @@ -24,8 +22,8 @@ from swarms.structs.groupchat import ( expertise_based, ) from swarms.structs.heavy_swarm import HeavySwarm -from swarms.structs.hierarchical_swarm import HierarchicalSwarm -from swarms.structs.hybrid_hierarchical_peer_swarm import ( +from swarms.structs.hiearchical_swarm import HierarchicalSwarm +from swarms.structs.hybrid_hiearchical_peer_swarm import ( HybridHierarchicalClusterSwarm, ) from swarms.structs.interactive_groupchat import ( @@ -66,7 +64,6 @@ from swarms.structs.multi_agent_exec import ( run_single_agent, ) from swarms.structs.multi_agent_router import 
MultiAgentRouter -from swarms.structs.rearrange import AgentRearrange, rearrange from swarms.structs.round_robin import RoundRobinSwarm from swarms.structs.sequential_workflow import SequentialWorkflow from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm @@ -82,7 +79,7 @@ from swarms.structs.stopping_conditions import ( check_stopped, check_success, ) -from swarms.structs.swarm_arange import SwarmRearrange +from swarms.structs.swarm_rearrange import SwarmRearrange from swarms.structs.swarm_router import ( SwarmRouter, SwarmType, @@ -107,32 +104,11 @@ from swarms.structs.swarming_architectures import ( staircase_swarm, star_swarm, ) -from swarms.structs.hierarchical_structured_communication_framework import ( - HierarchicalStructuredCommunicationFramework, - HierarchicalStructuredCommunicationGenerator, - HierarchicalStructuredCommunicationEvaluator, - HierarchicalStructuredCommunicationRefiner, - HierarchicalStructuredCommunicationSupervisor, - StructuredMessage, - HierarchicalOrder, - EvaluationResult, - StructuredMessageSchema, - EvaluationResultSchema, - GeneratorResponseSchema, - EvaluatorResponseSchema, - RefinerResponseSchema, - CommunicationType, - AgentRole, -) - -# Convenience alias(fixes old code if any was left out in the wild) -HierarchicalStructuredCommunicationSwarm = HierarchicalStructuredCommunicationFramework __all__ = [ "Agent", "BaseStructure", "BaseSwarm", - "BoardOfDirectorsSwarm", "ConcurrentWorkflow", "Conversation", "GroupChat", @@ -206,22 +182,6 @@ __all__ = [ "HierarchicalSwarm", "HeavySwarm", "CronJob", - "HierarchicalStructuredCommunicationSwarm", - "HierarchicalStructuredCommunicationGenerator", - "HierarchicalStructuredCommunicationEvaluator", - "HierarchicalStructuredCommunicationRefiner", - "HierarchicalStructuredCommunicationSupervisor", - "StructuredMessage", - "HierarchicalOrder", - "EvaluationResult", - "StructuredMessageSchema", - "EvaluationResultSchema", - "GeneratorResponseSchema", - "EvaluatorResponseSchema", - "RefinerResponseSchema", - "CommunicationType", - "AgentRole", - # Stopping conditions "check_done", "check_finished", "check_complete", diff --git a/swarms/structs/board_of_directors_swarm.py b/swarms/structs/board_of_directors_swarm.py index 7dbf0d34..6c99db4e 100644 --- a/swarms/structs/board_of_directors_swarm.py +++ b/swarms/structs/board_of_directors_swarm.py @@ -19,7 +19,6 @@ Flow: 6. All context and conversation history is preserved throughout the process """ -import asyncio import json import os import re @@ -34,7 +33,6 @@ from loguru import logger from pydantic import BaseModel, Field from swarms.structs.agent import Agent -from swarms.structs.base_swarm import BaseSwarm from swarms.structs.conversation import Conversation from swarms.structs.ma_utils import list_all_agents from swarms.utils.history_output_formatter import ( @@ -54,21 +52,6 @@ board_logger = initialize_logger( # ============================================================================ -class BoardFeatureStatus(str, Enum): - """Enumeration of Board of Directors feature status. - - This enum defines the possible states of the Board of Directors feature - within the Swarms Framework. 
- - Attributes: - ENABLED: Feature is explicitly enabled - DISABLED: Feature is explicitly disabled - AUTO: Feature state is determined automatically - """ - - ENABLED = "enabled" - DISABLED = "disabled" - AUTO = "auto" class BoardConfigModel(BaseModel): @@ -91,12 +74,6 @@ class BoardConfigModel(BaseModel): custom_board_templates: Custom board templates for different use cases """ - # Feature control - board_feature_enabled: bool = Field( - default=False, - description="Whether the Board of Directors feature is enabled globally.", - ) - # Board composition default_board_size: int = Field( default=3, @@ -201,9 +178,6 @@ class BoardConfig: ): self._load_from_file() - # Override with environment variables - self._load_from_environment() - # Override with explicit config data if self.config_data: self._load_from_dict(self.config_data) @@ -236,62 +210,6 @@ class BoardConfig: ) raise - def _load_from_environment(self) -> None: - """ - Load configuration from environment variables. - - This method maps environment variables to configuration parameters - and handles type conversion appropriately. - """ - env_mappings = { - "SWARMS_BOARD_FEATURE_ENABLED": "board_feature_enabled", - "SWARMS_BOARD_DEFAULT_SIZE": "default_board_size", - "SWARMS_BOARD_DECISION_THRESHOLD": "decision_threshold", - "SWARMS_BOARD_ENABLE_VOTING": "enable_voting", - "SWARMS_BOARD_ENABLE_CONSENSUS": "enable_consensus", - "SWARMS_BOARD_DEFAULT_MODEL": "default_board_model", - "SWARMS_BOARD_VERBOSE_LOGGING": "verbose_logging", - "SWARMS_BOARD_MAX_MEETING_DURATION": "max_board_meeting_duration", - "SWARMS_BOARD_AUTO_FALLBACK": "auto_fallback_to_director", - } - - for env_var, config_key in env_mappings.items(): - value = os.getenv(env_var) - if value is not None: - try: - # Convert string values to appropriate types - if config_key in [ - "board_feature_enabled", - "enable_voting", - "enable_consensus", - "verbose_logging", - "auto_fallback_to_director", - ]: - converted_value = value.lower() in [ - "true", - "1", - "yes", - "on", - ] - elif config_key in [ - "default_board_size", - "max_board_meeting_duration", - ]: - converted_value = int(value) - elif config_key in ["decision_threshold"]: - converted_value = float(value) - else: - converted_value = value - - setattr(self.config, config_key, converted_value) - logger.debug( - f"Loaded {config_key} from environment: {converted_value}" - ) - except (ValueError, TypeError) as e: - logger.warning( - f"Failed to parse environment variable {env_var}: {e}" - ) - def _load_from_dict(self, config_dict: Dict[str, Any]) -> None: """ Load configuration from dictionary. @@ -312,15 +230,6 @@ class BoardConfig: f"Invalid configuration value for {key}: {e}" ) - def is_enabled(self) -> bool: - """ - Check if the Board of Directors feature is enabled. - - Returns: - bool: True if the feature is enabled, False otherwise - """ - return self.config.board_feature_enabled - def get_config(self) -> BoardConfigModel: """ Get the current configuration. @@ -562,63 +471,6 @@ def get_board_config( return _board_config -def enable_board_feature( - config_file_path: Optional[str] = None, -) -> None: - """ - Enable the Board of Directors feature globally. - - This function enables the Board of Directors feature and saves the configuration - to the specified file path. 
-
-    Args:
-        config_file_path: Optional path to save the configuration
-    """
-    config = get_board_config(config_file_path)
-    config.update_config({"board_feature_enabled": True})
-
-    if config_file_path:
-        config.save_config(config_file_path)
-
-    logger.info("Board of Directors feature enabled")
-
-
-def disable_board_feature(
-    config_file_path: Optional[str] = None,
-) -> None:
-    """
-    Disable the Board of Directors feature globally.
-
-    This function disables the Board of Directors feature and saves the configuration
-    to the specified file path.
-
-    Args:
-        config_file_path: Optional path to save the configuration
-    """
-    config = get_board_config(config_file_path)
-    config.update_config({"board_feature_enabled": False})
-
-    if config_file_path:
-        config.save_config(config_file_path)
-
-    logger.info("Board of Directors feature disabled")
-
-
-def is_board_feature_enabled(
-    config_file_path: Optional[str] = None,
-) -> bool:
-    """
-    Check if the Board of Directors feature is enabled.
-
-    Args:
-        config_file_path: Optional path to configuration file
-
-    Returns:
-        bool: True if the feature is enabled, False otherwise
-    """
-    config = get_board_config(config_file_path)
-    return config.is_enabled()
-
 
 def create_default_config_file(
     file_path: str = "swarms_board_config.yaml",
@@ -953,7 +805,7 @@ class BoardSpec(BaseModel):
     )
 
 
-class BoardOfDirectorsSwarm(BaseSwarm):
+class BoardOfDirectorsSwarm:
     """
     A hierarchical swarm of agents with a Board of Directors that orchestrates tasks.
 
@@ -1029,13 +881,8 @@ class BoardOfDirectorsSwarm(BaseSwarm):
         Raises:
             ValueError: If critical requirements are not met during initialization
         """
-        super().__init__(
-            name=name,
-            description=description,
-            agents=agents,
-        )
-
         self.name = name
+        self.description = description
         self.board_members = board_members or []
         self.agents = agents or []
         self.max_loops = max_loops
@@ -1047,9 +894,8 @@ class BoardOfDirectorsSwarm(BaseSwarm):
         self.decision_threshold = decision_threshold
         self.enable_voting = enable_voting
         self.enable_consensus = enable_consensus
-        self.max_workers = max_workers or min(
-            32, (os.cpu_count() or 1) + 4
-        )
+        # Fall back to the machine's CPU count when max_workers is not given
+        self.max_workers = max_workers or os.cpu_count()
 
         # Initialize the swarm
         self._init_board_swarm()
@@ -1258,14 +1104,6 @@ You should be thorough, organized, and detail-oriented in your documentation."""
             f"🔍 Running reliability checks for swarm: {self.name}"
         )
 
-        # Check if Board of Directors feature is enabled
-        board_config = get_board_config()
-        if not board_config.is_enabled():
-            raise ValueError(
-                "Board of Directors feature is not enabled. Please enable it using "
-                "enable_board_feature() or set SWARMS_BOARD_FEATURE_ENABLED=true environment variable."
-            )
-
        if not self.agents or len(self.agents) == 0:
             raise ValueError(
                 "No agents found in the swarm. At least one agent must be provided to create a Board of Directors swarm."
@@ -1687,34 +1525,6 @@ Please provide your response in the following format:
             board_logger.error(error_msg)
             raise
 
-    async def arun(
-        self,
-        task: str,
-        img: Optional[str] = None,
-        *args: Any,
-        **kwargs: Any,
-    ) -> Any:
-        """
-        Run the Board of Directors swarm asynchronously.
-
-        This method provides an asynchronous interface for running the swarm,
-        allowing for non-blocking execution in async contexts.
- - Args: - task: The task to be executed - img: Optional image input - *args: Additional positional arguments - **kwargs: Additional keyword arguments - - Returns: - Any: The final result of the swarm execution - """ - loop = asyncio.get_event_loop() - result = await loop.run_in_executor( - None, self.run, task, img, *args, **kwargs - ) - return result - def _generate_board_feedback(self, outputs: List[Any]) -> str: """ Provide feedback from the Board of Directors based on agent outputs. diff --git a/swarms/structs/hierarchical_structured_communication_framework.py b/swarms/structs/hierarchical_structured_communication_framework.py index 41b4b663..ba817ca3 100644 --- a/swarms/structs/hierarchical_structured_communication_framework.py +++ b/swarms/structs/hierarchical_structured_communication_framework.py @@ -21,43 +21,46 @@ Key Features: """ import traceback -import time -from typing import Any, Callable, Dict, List, Literal, Optional, Union +from typing import Any, Callable, Dict, List, Optional, Union from dataclasses import dataclass from enum import Enum from pydantic import BaseModel, Field from rich.console import Console from rich.panel import Panel -from rich.text import Text from rich.progress import Progress, SpinnerColumn, TextColumn from rich.table import Table -from rich import print as rprint from swarms.structs.agent import Agent from swarms.structs.base_swarm import BaseSwarm -from swarms.structs.conversation import Conversation from swarms.utils.loguru_logger import initialize_logger from swarms.utils.output_types import OutputType # Initialize rich console for enhanced output console = Console() -logger = initialize_logger(log_folder="hierarchical_structured_communication_framework") +logger = initialize_logger( + log_folder="hierarchical_structured_communication_framework" +) # ============================================================================= # ENUMS AND DATA MODELS # ============================================================================= + class CommunicationType(str, Enum): """Types of communication in the structured protocol""" + MESSAGE = "message" # M_ij: Specific task instructions BACKGROUND = "background" # B_ij: Context and problem background - INTERMEDIATE_OUTPUT = "intermediate_output" # I_ij: Intermediate results + INTERMEDIATE_OUTPUT = ( + "intermediate_output" # I_ij: Intermediate results + ) class AgentRole(str, Enum): """Roles for agents in the hierarchical system""" + SUPERVISOR = "supervisor" GENERATOR = "generator" EVALUATOR = "evaluator" @@ -68,9 +71,16 @@ class AgentRole(str, Enum): @dataclass class StructuredMessage: """Structured communication message following HierarchicalStructuredComm protocol""" - message: str = Field(description="Specific task instructions (M_ij)") - background: str = Field(description="Context and problem background (B_ij)") - intermediate_output: str = Field(description="Intermediate results (I_ij)") + + message: str = Field( + description="Specific task instructions (M_ij)" + ) + background: str = Field( + description="Context and problem background (B_ij)" + ) + intermediate_output: str = Field( + description="Intermediate results (I_ij)" + ) sender: str = Field(description="Name of the sending agent") recipient: str = Field(description="Name of the receiving agent") timestamp: Optional[str] = None @@ -78,24 +88,26 @@ class StructuredMessage: class HierarchicalOrder(BaseModel): """Order structure for hierarchical task assignment""" - agent_name: str = Field(description="Name of the agent to receive the 
task") + + agent_name: str = Field( + description="Name of the agent to receive the task" + ) task: str = Field(description="Specific task description") communication_type: CommunicationType = Field( default=CommunicationType.MESSAGE, - description="Type of communication to use" + description="Type of communication to use", ) background_context: str = Field( - default="", - description="Background context for the task" + default="", description="Background context for the task" ) intermediate_output: str = Field( - default="", - description="Intermediate output to pass along" + default="", description="Intermediate output to pass along" ) class EvaluationResult(BaseModel): """Result from evaluation team member""" + evaluator_name: str = Field(description="Name of the evaluator") criterion: str = Field(description="Evaluation criterion") score: float = Field(description="Evaluation score") @@ -107,77 +119,142 @@ class EvaluationResult(BaseModel): # SCHEMAS # ============================================================================= + class StructuredMessageSchema(BaseModel): """Schema for structured communication messages""" - message: str = Field(description="Specific task instructions (M_ij)", min_length=1) - background: str = Field(description="Context and problem background (B_ij)", default="") - intermediate_output: str = Field(description="Intermediate results (I_ij)", default="") - sender: str = Field(description="Name of the sending agent", min_length=1) - recipient: str = Field(description="Name of the receiving agent", min_length=1) - timestamp: Optional[str] = Field(description="Timestamp of the message", default=None) - communication_type: CommunicationType = Field(description="Type of communication", default=CommunicationType.MESSAGE) + + message: str = Field( + description="Specific task instructions (M_ij)", min_length=1 + ) + background: str = Field( + description="Context and problem background (B_ij)", + default="", + ) + intermediate_output: str = Field( + description="Intermediate results (I_ij)", default="" + ) + sender: str = Field( + description="Name of the sending agent", min_length=1 + ) + recipient: str = Field( + description="Name of the receiving agent", min_length=1 + ) + timestamp: Optional[str] = Field( + description="Timestamp of the message", default=None + ) + communication_type: CommunicationType = Field( + description="Type of communication", + default=CommunicationType.MESSAGE, + ) class EvaluationResultSchema(BaseModel): """Schema for evaluation results""" - criterion: str = Field(description="Evaluation criterion", min_length=1) - score: float = Field(description="Evaluation score (0-10)", ge=0.0, le=10.0) - feedback: str = Field(description="Detailed feedback", min_length=1) - confidence: float = Field(description="Confidence level (0-1)", ge=0.0, le=1.0) - reasoning: str = Field(description="Evaluation reasoning", default="") - suggestions: List[str] = Field(description="Improvement suggestions", default=[]) + + criterion: str = Field( + description="Evaluation criterion", min_length=1 + ) + score: float = Field( + description="Evaluation score (0-10)", ge=0.0, le=10.0 + ) + feedback: str = Field( + description="Detailed feedback", min_length=1 + ) + confidence: float = Field( + description="Confidence level (0-1)", ge=0.0, le=1.0 + ) + reasoning: str = Field( + description="Evaluation reasoning", default="" + ) + suggestions: List[str] = Field( + description="Improvement suggestions", default=[] + ) class GeneratorResponseSchema(BaseModel): 
"""Schema for generator responses""" - content: str = Field(description="Generated content", min_length=1) - intermediate_output: str = Field(description="Intermediate output for next agent", default="") - reasoning: str = Field(description="Generation reasoning", default="") - confidence: float = Field(description="Confidence level (0-1)", ge=0.0, le=1.0) + + content: str = Field( + description="Generated content", min_length=1 + ) + intermediate_output: str = Field( + description="Intermediate output for next agent", default="" + ) + reasoning: str = Field( + description="Generation reasoning", default="" + ) + confidence: float = Field( + description="Confidence level (0-1)", ge=0.0, le=1.0 + ) class EvaluatorResponseSchema(BaseModel): """Schema for evaluator responses""" - criterion: str = Field(description="Evaluation criterion", min_length=1) - score: float = Field(description="Evaluation score (0-10)", ge=0.0, le=10.0) - feedback: str = Field(description="Detailed feedback", min_length=1) - confidence: float = Field(description="Confidence level (0-1)", ge=0.0, le=1.0) - reasoning: str = Field(description="Evaluation reasoning", default="") - suggestions: List[str] = Field(description="Improvement suggestions", default=[]) + + criterion: str = Field( + description="Evaluation criterion", min_length=1 + ) + score: float = Field( + description="Evaluation score (0-10)", ge=0.0, le=10.0 + ) + feedback: str = Field( + description="Detailed feedback", min_length=1 + ) + confidence: float = Field( + description="Confidence level (0-1)", ge=0.0, le=1.0 + ) + reasoning: str = Field( + description="Evaluation reasoning", default="" + ) + suggestions: List[str] = Field( + description="Improvement suggestions", default=[] + ) class RefinerResponseSchema(BaseModel): """Schema for refiner responses""" - refined_content: str = Field(description="Refined content", min_length=1) - changes_made: List[str] = Field(description="List of changes made", default=[]) - reasoning: str = Field(description="Refinement reasoning", default="") - confidence: float = Field(description="Confidence level (0-1)", ge=0.0, le=1.0) - feedback_addressed: List[str] = Field(description="Feedback points addressed", default=[]) + + refined_content: str = Field( + description="Refined content", min_length=1 + ) + changes_made: List[str] = Field( + description="List of changes made", default=[] + ) + reasoning: str = Field( + description="Refinement reasoning", default="" + ) + confidence: float = Field( + description="Confidence level (0-1)", ge=0.0, le=1.0 + ) + feedback_addressed: List[str] = Field( + description="Feedback points addressed", default=[] + ) # ============================================================================= # SPECIALIZED AGENT CLASSES # ============================================================================= + class HierarchicalStructuredCommunicationGenerator(Agent): """ Generator agent for Hierarchical Structured Communication Framework - + This agent specializes in creating initial content following the structured communication protocol with Message (M_ij), Background (B_ij), and Intermediate Output (I_ij). 
""" - + def __init__( self, agent_name: str = "TalkHierGenerator", system_prompt: Optional[str] = None, model_name: str = "gpt-4o-mini", verbose: bool = False, - **kwargs + **kwargs, ): """ Initialize the HierarchicalStructuredCommunication Generator agent - + Args: agent_name: Name of the agent system_prompt: Custom system prompt @@ -186,15 +263,15 @@ class HierarchicalStructuredCommunicationGenerator(Agent): """ if system_prompt is None: system_prompt = self._get_default_generator_prompt() - + super().__init__( agent_name=agent_name, system_prompt=system_prompt, model_name=model_name, verbose=verbose, - **kwargs + **kwargs, ) - + def _get_default_generator_prompt(self) -> str: """Get the default system prompt for generator agents""" return """ @@ -233,129 +310,134 @@ Confidence: [0.0-1.0 confidence level] Always maintain high quality and provide detailed, actionable content. """ - + def generate_with_structure( self, message: str, background: str = "", intermediate_output: str = "", - **kwargs + **kwargs, ) -> GeneratorResponseSchema: """ Generate content using structured communication protocol - + Args: message: Specific task message (M_ij) background: Background context (B_ij) intermediate_output: Intermediate output (I_ij) - + Returns: GeneratorResponseSchema with structured response """ try: # Construct structured prompt - prompt = self._construct_structured_prompt(message, background, intermediate_output) - + prompt = self._construct_structured_prompt( + message, background, intermediate_output + ) + # Generate response response = self.run(prompt, **kwargs) - + # Parse and structure response return self._parse_generator_response(response) - + except Exception as e: logger.error(f"Error in structured generation: {e}") return GeneratorResponseSchema( content=f"Error generating content: {e}", intermediate_output="", reasoning="Error occurred during generation", - confidence=0.0 + confidence=0.0, ) - + def _construct_structured_prompt( - self, - message: str, - background: str, - intermediate_output: str + self, message: str, background: str, intermediate_output: str ) -> str: """Construct a structured prompt for generation""" prompt_parts = [] - + if message: prompt_parts.append(f"**Task Message (M_ij):** {message}") - + if background: - prompt_parts.append(f"**Background Context (B_ij):** {background}") - + prompt_parts.append( + f"**Background Context (B_ij):** {background}" + ) + if intermediate_output: - prompt_parts.append(f"**Intermediate Output (I_ij):** {intermediate_output}") - + prompt_parts.append( + f"**Intermediate Output (I_ij):** {intermediate_output}" + ) + prompt = "\n\n".join(prompt_parts) prompt += "\n\nPlease generate content following the structured response format." 
- + return prompt - - def _parse_generator_response(self, response: str) -> GeneratorResponseSchema: + + def _parse_generator_response( + self, response: str + ) -> GeneratorResponseSchema: """Parse the generator response into structured format""" try: - lines = response.split('\n') + lines = response.split("\n") content = "" intermediate_output = "" reasoning = "" confidence = 0.8 # Default confidence - + current_section = None - + for line in lines: line = line.strip() if not line: continue - - if line.lower().startswith('content:'): - current_section = 'content' + + if line.lower().startswith("content:"): + current_section = "content" content = line[8:].strip() - elif line.lower().startswith('intermediate output:'): - current_section = 'intermediate' + elif line.lower().startswith("intermediate output:"): + current_section = "intermediate" intermediate_output = line[20:].strip() - elif line.lower().startswith('reasoning:'): - current_section = 'reasoning' + elif line.lower().startswith("reasoning:"): + current_section = "reasoning" reasoning = line[10:].strip() - elif line.lower().startswith('confidence:'): + elif line.lower().startswith("confidence:"): try: confidence = float(line[11:].strip()) except ValueError: confidence = 0.8 - elif current_section == 'content': + elif current_section == "content": content += " " + line - elif current_section == 'intermediate': + elif current_section == "intermediate": intermediate_output += " " + line - elif current_section == 'reasoning': + elif current_section == "reasoning": reasoning += " " + line - + return GeneratorResponseSchema( content=content or response, intermediate_output=intermediate_output, reasoning=reasoning, - confidence=confidence + confidence=confidence, ) - + except Exception as e: logger.error(f"Error parsing generator response: {e}") return GeneratorResponseSchema( content=response, intermediate_output="", reasoning="Error parsing response", - confidence=0.5 + confidence=0.5, ) class HierarchicalStructuredCommunicationEvaluator(Agent): """ Evaluator agent for Hierarchical Structured Communication Framework - + This agent specializes in evaluating content using specific criteria and providing structured feedback following the hierarchical evaluation system. 
""" - + def __init__( self, agent_name: str = "TalkHierEvaluator", @@ -363,11 +445,11 @@ class HierarchicalStructuredCommunicationEvaluator(Agent): model_name: str = "gpt-4o-mini", verbose: bool = False, evaluation_criteria: List[str] = None, - **kwargs + **kwargs, ): """ Initialize the HierarchicalStructuredCommunication Evaluator agent - + Args: agent_name: Name of the agent system_prompt: Custom system prompt @@ -376,25 +458,41 @@ class HierarchicalStructuredCommunicationEvaluator(Agent): evaluation_criteria: List of evaluation criteria this agent can assess """ if system_prompt is None: - system_prompt = self._get_default_evaluator_prompt(evaluation_criteria) - + system_prompt = self._get_default_evaluator_prompt( + evaluation_criteria + ) + super().__init__( agent_name=agent_name, system_prompt=system_prompt, model_name=model_name, verbose=verbose, - **kwargs + **kwargs, ) - - self.evaluation_criteria = evaluation_criteria or ["accuracy", "completeness", "clarity", "relevance"] - - def _get_default_evaluator_prompt(self, criteria: List[str] = None) -> str: + + self.evaluation_criteria = evaluation_criteria or [ + "accuracy", + "completeness", + "clarity", + "relevance", + ] + + def _get_default_evaluator_prompt( + self, criteria: List[str] = None + ) -> str: """Get the default system prompt for evaluator agents""" if criteria is None: - criteria = ["accuracy", "completeness", "clarity", "relevance"] - - criteria_text = "\n".join([f"- {criterion}" for criterion in criteria]) - + criteria = [ + "accuracy", + "completeness", + "clarity", + "relevance", + ] + + criteria_text = "\n".join( + [f"- {criterion}" for criterion in criteria] + ) + return f""" You are an Evaluator agent in a Hierarchical Structured Communication Framework. @@ -435,33 +533,32 @@ Suggestions: [Specific improvement suggestions] Be thorough, fair, and constructive in your evaluations. """ - + def evaluate_with_criterion( - self, - content: str, - criterion: str, - **kwargs + self, content: str, criterion: str, **kwargs ) -> EvaluatorResponseSchema: """ Evaluate content using a specific criterion - + Args: content: Content to evaluate criterion: Specific evaluation criterion - + Returns: EvaluatorResponseSchema with evaluation results """ try: # Construct evaluation prompt - prompt = self._construct_evaluation_prompt(content, criterion) - + prompt = self._construct_evaluation_prompt( + content, criterion + ) + # Get evaluation response response = self.run(prompt, **kwargs) - + # Parse and structure response return self._parse_evaluator_response(response, criterion) - + except Exception as e: logger.error(f"Error in evaluation: {e}") return EvaluatorResponseSchema( @@ -470,10 +567,15 @@ Be thorough, fair, and constructive in your evaluations. feedback=f"Error during evaluation: {e}", confidence=0.0, reasoning="Error occurred during evaluation", - suggestions=["Fix technical issues", "Retry evaluation"] + suggestions=[ + "Fix technical issues", + "Retry evaluation", + ], ) - - def _construct_evaluation_prompt(self, content: str, criterion: str) -> str: + + def _construct_evaluation_prompt( + self, content: str, criterion: str + ) -> str: """Construct an evaluation prompt""" return f""" **Content to Evaluate:** @@ -486,61 +588,63 @@ Please evaluate the content above based on the {criterion} criterion. Provide your evaluation following the structured response format. 
""" - - def _parse_evaluator_response(self, response: str, criterion: str) -> EvaluatorResponseSchema: + + def _parse_evaluator_response( + self, response: str, criterion: str + ) -> EvaluatorResponseSchema: """Parse the evaluator response into structured format""" try: - lines = response.split('\n') + lines = response.split("\n") score = 5.0 # Default score feedback = "" confidence = 0.8 # Default confidence reasoning = "" suggestions = [] - + current_section = None - + for line in lines: line = line.strip() if not line: continue - - if line.lower().startswith('score:'): + + if line.lower().startswith("score:"): try: score = float(line[6:].strip()) except ValueError: score = 5.0 - elif line.lower().startswith('feedback:'): - current_section = 'feedback' + elif line.lower().startswith("feedback:"): + current_section = "feedback" feedback = line[9:].strip() - elif line.lower().startswith('confidence:'): + elif line.lower().startswith("confidence:"): try: confidence = float(line[11:].strip()) except ValueError: confidence = 0.8 - elif line.lower().startswith('reasoning:'): - current_section = 'reasoning' + elif line.lower().startswith("reasoning:"): + current_section = "reasoning" reasoning = line[10:].strip() - elif line.lower().startswith('suggestions:'): - current_section = 'suggestions' - elif current_section == 'feedback': + elif line.lower().startswith("suggestions:"): + current_section = "suggestions" + elif current_section == "feedback": feedback += " " + line - elif current_section == 'reasoning': + elif current_section == "reasoning": reasoning += " " + line - elif current_section == 'suggestions': - if line.startswith('-') or line.startswith('•'): + elif current_section == "suggestions": + if line.startswith("-") or line.startswith("•"): suggestions.append(line[1:].strip()) else: suggestions.append(line) - + return EvaluatorResponseSchema( criterion=criterion, score=score, feedback=feedback or "No feedback provided", confidence=confidence, reasoning=reasoning, - suggestions=suggestions + suggestions=suggestions, ) - + except Exception as e: logger.error(f"Error parsing evaluator response: {e}") return EvaluatorResponseSchema( @@ -549,29 +653,29 @@ Provide your evaluation following the structured response format. feedback="Error parsing evaluation response", confidence=0.0, reasoning="Error occurred during parsing", - suggestions=["Fix parsing issues"] + suggestions=["Fix parsing issues"], ) class HierarchicalStructuredCommunicationRefiner(Agent): """ Refiner agent for Hierarchical Structured Communication Framework - + This agent specializes in improving content based on evaluation feedback and maintaining the structured communication protocol. 
""" - + def __init__( self, agent_name: str = "TalkHierRefiner", system_prompt: Optional[str] = None, model_name: str = "gpt-4o-mini", verbose: bool = False, - **kwargs + **kwargs, ): """ Initialize the HierarchicalStructuredCommunication Refiner agent - + Args: agent_name: Name of the agent system_prompt: Custom system prompt @@ -580,15 +684,15 @@ class HierarchicalStructuredCommunicationRefiner(Agent): """ if system_prompt is None: system_prompt = self._get_default_refiner_prompt() - + super().__init__( agent_name=agent_name, system_prompt=system_prompt, model_name=model_name, verbose=verbose, - **kwargs + **kwargs, ) - + def _get_default_refiner_prompt(self) -> str: """Get the default system prompt for refiner agents""" return """ @@ -625,33 +729,37 @@ Feedback Addressed: [Which feedback points were addressed] Focus on meaningful improvements that directly address the evaluation feedback. """ - + def refine_with_feedback( self, original_content: str, evaluation_results: List[EvaluationResultSchema], - **kwargs + **kwargs, ) -> RefinerResponseSchema: """ Refine content based on evaluation feedback - + Args: original_content: Original content to refine evaluation_results: List of evaluation results with feedback - + Returns: RefinerResponseSchema with refined content """ try: # Construct refinement prompt - prompt = self._construct_refinement_prompt(original_content, evaluation_results) - + prompt = self._construct_refinement_prompt( + original_content, evaluation_results + ) + # Get refinement response response = self.run(prompt, **kwargs) - + # Parse and structure response - return self._parse_refiner_response(response, evaluation_results) - + return self._parse_refiner_response( + response, evaluation_results + ) + except Exception as e: logger.error(f"Error in refinement: {e}") return RefinerResponseSchema( @@ -659,20 +767,22 @@ Focus on meaningful improvements that directly address the evaluation feedback. changes_made=["Error occurred during refinement"], reasoning=f"Error during refinement: {e}", confidence=0.0, - feedback_addressed=[] + feedback_addressed=[], ) - + def _construct_refinement_prompt( self, original_content: str, - evaluation_results: List[EvaluationResultSchema] + evaluation_results: List[EvaluationResultSchema], ) -> str: """Construct a refinement prompt""" - feedback_summary = "\n\n".join([ - f"**{result.criterion} (Score: {result.score}/10):**\n{result.feedback}" - for result in evaluation_results - ]) - + feedback_summary = "\n\n".join( + [ + f"**{result.criterion} (Score: {result.score}/10):**\n{result.feedback}" + for result in evaluation_results + ] + ) + return f""" **Original Content:** {original_content} @@ -684,70 +794,70 @@ Please refine the content to address the feedback while maintaining its core str Provide your refinement following the structured response format. 
""" - + def _parse_refiner_response( self, response: str, - evaluation_results: List[EvaluationResultSchema] + evaluation_results: List[EvaluationResultSchema], ) -> RefinerResponseSchema: """Parse the refiner response into structured format""" try: - lines = response.split('\n') + lines = response.split("\n") refined_content = "" changes_made = [] reasoning = "" confidence = 0.8 # Default confidence feedback_addressed = [] - + current_section = None - + for line in lines: line = line.strip() if not line: continue - - if line.lower().startswith('refined content:'): - current_section = 'content' + + if line.lower().startswith("refined content:"): + current_section = "content" refined_content = line[16:].strip() - elif line.lower().startswith('changes made:'): - current_section = 'changes' - elif line.lower().startswith('reasoning:'): - current_section = 'reasoning' + elif line.lower().startswith("changes made:"): + current_section = "changes" + elif line.lower().startswith("reasoning:"): + current_section = "reasoning" reasoning = line[10:].strip() - elif line.lower().startswith('confidence:'): + elif line.lower().startswith("confidence:"): try: confidence = float(line[11:].strip()) except ValueError: confidence = 0.8 - elif line.lower().startswith('feedback addressed:'): - current_section = 'feedback' - elif current_section == 'content': + elif line.lower().startswith("feedback addressed:"): + current_section = "feedback" + elif current_section == "content": refined_content += " " + line - elif current_section == 'changes': - if line.startswith('-') or line.startswith('•'): + elif current_section == "changes": + if line.startswith("-") or line.startswith("•"): changes_made.append(line[1:].strip()) else: changes_made.append(line) - elif current_section == 'reasoning': + elif current_section == "reasoning": reasoning += " " + line - elif current_section == 'feedback': - if line.startswith('-') or line.startswith('•'): + elif current_section == "feedback": + if line.startswith("-") or line.startswith("•"): feedback_addressed.append(line[1:].strip()) else: feedback_addressed.append(line) - + # If no refined content found, use original if not refined_content: refined_content = response - + return RefinerResponseSchema( refined_content=refined_content, changes_made=changes_made, reasoning=reasoning, confidence=confidence, - feedback_addressed=feedback_addressed + feedback_addressed=feedback_addressed, ) - + except Exception as e: logger.error(f"Error parsing refiner response: {e}") return RefinerResponseSchema( @@ -755,29 +865,29 @@ Provide your refinement following the structured response format. changes_made=["Error parsing response"], reasoning="Error occurred during parsing", confidence=0.0, - feedback_addressed=[] + feedback_addressed=[], ) class HierarchicalStructuredCommunicationSupervisor(Agent): """ Supervisor agent for Hierarchical Structured Communication Framework - + This agent coordinates the overall workflow and manages structured communication between different agent types. 
""" - + def __init__( self, agent_name: str = "TalkHierSupervisor", system_prompt: Optional[str] = None, model_name: str = "gpt-4o-mini", verbose: bool = False, - **kwargs + **kwargs, ): """ Initialize the HierarchicalStructuredCommunication Supervisor agent - + Args: agent_name: Name of the agent system_prompt: Custom system prompt @@ -786,15 +896,15 @@ class HierarchicalStructuredCommunicationSupervisor(Agent): """ if system_prompt is None: system_prompt = self._get_default_supervisor_prompt() - + super().__init__( agent_name=agent_name, system_prompt=system_prompt, model_name=model_name, verbose=verbose, - **kwargs + **kwargs, ) - + def _get_default_supervisor_prompt(self) -> str: """Get the default system prompt for supervisor agents""" return """ @@ -838,33 +948,32 @@ Reasoning: [Why this decision was made] Focus on efficient coordination and high-quality outcomes. """ - + def coordinate_workflow( - self, - task: str, - current_state: Dict[str, Any], - **kwargs + self, task: str, current_state: Dict[str, Any], **kwargs ) -> Dict[str, Any]: """ Coordinate the workflow and determine next actions - + Args: task: Current task being processed current_state: Current state of the workflow - + Returns: Dictionary with coordination decisions """ try: # Construct coordination prompt - prompt = self._construct_coordination_prompt(task, current_state) - + prompt = self._construct_coordination_prompt( + task, current_state + ) + # Get coordination response response = self.run(prompt, **kwargs) - + # Parse and structure response return self._parse_coordination_response(response) - + except Exception as e: logger.error(f"Error in workflow coordination: {e}") return { @@ -873,16 +982,20 @@ Focus on efficient coordination and high-quality outcomes. "structured_message": f"Error in coordination: {e}", "background_context": "", "intermediate_output": "", - "reasoning": "Error occurred during coordination" + "reasoning": "Error occurred during coordination", } - - def _construct_coordination_prompt(self, task: str, current_state: Dict[str, Any]) -> str: + + def _construct_coordination_prompt( + self, task: str, current_state: Dict[str, Any] + ) -> str: """Construct a coordination prompt""" - state_summary = "\n".join([ - f"- {key}: {value}" - for key, value in current_state.items() - ]) - + state_summary = "\n".join( + [ + f"- {key}: {value}" + for key, value in current_state.items() + ] + ) + return f""" **Current Task:** {task} @@ -894,54 +1007,56 @@ Please coordinate the workflow and determine the next action. Provide your coordination decision following the structured response format. 
""" - - def _parse_coordination_response(self, response: str) -> Dict[str, Any]: + + def _parse_coordination_response( + self, response: str + ) -> Dict[str, Any]: """Parse the coordination response""" try: - lines = response.split('\n') + lines = response.split("\n") result = { "next_action": "continue", "target_agent": "generator", "structured_message": "", "background_context": "", "intermediate_output": "", - "reasoning": "" + "reasoning": "", } - + current_section = None - + for line in lines: line = line.strip() if not line: continue - - if line.lower().startswith('next action:'): + + if line.lower().startswith("next action:"): result["next_action"] = line[12:].strip() - elif line.lower().startswith('target agent:'): + elif line.lower().startswith("target agent:"): result["target_agent"] = line[13:].strip() - elif line.lower().startswith('structured message:'): - current_section = 'message' + elif line.lower().startswith("structured message:"): + current_section = "message" result["structured_message"] = line[19:].strip() - elif line.lower().startswith('background context:'): - current_section = 'background' + elif line.lower().startswith("background context:"): + current_section = "background" result["background_context"] = line[19:].strip() - elif line.lower().startswith('intermediate output:'): - current_section = 'output' + elif line.lower().startswith("intermediate output:"): + current_section = "output" result["intermediate_output"] = line[20:].strip() - elif line.lower().startswith('reasoning:'): - current_section = 'reasoning' + elif line.lower().startswith("reasoning:"): + current_section = "reasoning" result["reasoning"] = line[10:].strip() - elif current_section == 'message': + elif current_section == "message": result["structured_message"] += " " + line - elif current_section == 'background': + elif current_section == "background": result["background_context"] += " " + line - elif current_section == 'output': + elif current_section == "output": result["intermediate_output"] += " " + line - elif current_section == 'reasoning': + elif current_section == "reasoning": result["reasoning"] += " " + line - + return result - + except Exception as e: logger.error(f"Error parsing coordination response: {e}") return { @@ -950,30 +1065,31 @@ Provide your coordination decision following the structured response format. "structured_message": "Error parsing response", "background_context": "", "intermediate_output": "", - "reasoning": "Error occurred during parsing" - } + "reasoning": "Error occurred during parsing", + } # ============================================================================= # MAIN SWARM ORCHESTRATOR # ============================================================================= + class HierarchicalStructuredCommunicationFramework(BaseSwarm): """ Talk Structurally, Act Hierarchically: A Collaborative Framework for LLM Multi-Agent Systems - + This is the main orchestrator class that implements the complete HierarchicalStructuredComm approach with: 1. Structured Communication Protocol 2. Hierarchical Refinement System 3. 
Graph-based Agent Orchestration - + Architecture: - Supervisor Agent: Coordinates the overall workflow - Generator Agents: Create initial content/solutions - Evaluator Team: Hierarchical evaluation with supervisor - Refiner Agents: Improve solutions based on feedback """ - + def __init__( self, name: str = "HierarchicalStructuredCommunicationFramework", @@ -982,7 +1098,9 @@ class HierarchicalStructuredCommunicationFramework(BaseSwarm): generators: List[Union[Agent, Callable, Any]] = None, evaluators: List[Union[Agent, Callable, Any]] = None, refiners: List[Union[Agent, Callable, Any]] = None, - evaluation_supervisor: Optional[Union[Agent, Callable, Any]] = None, + evaluation_supervisor: Optional[ + Union[Agent, Callable, Any] + ] = None, max_loops: int = 3, output_type: OutputType = "dict-all-except-first", supervisor_name: str = "Supervisor", @@ -1000,13 +1118,13 @@ class HierarchicalStructuredCommunicationFramework(BaseSwarm): ): """ Initialize the HierarchicalStructuredCommunicationFramework - + Args: name: Name of the swarm description: Description of the swarm supervisor: Main supervisor agent generators: List of generator agents - evaluators: List of evaluator agents + evaluators: List of evaluator agents refiners: List of refiner agents evaluation_supervisor: Supervisor for evaluation team max_loops: Maximum number of refinement loops @@ -1035,22 +1153,26 @@ class HierarchicalStructuredCommunicationFramework(BaseSwarm): self.supervisor_name = supervisor_name self.evaluation_supervisor_name = evaluation_supervisor_name self.verbose = verbose - self.enable_structured_communication = enable_structured_communication - self.enable_hierarchical_evaluation = enable_hierarchical_evaluation + self.enable_structured_communication = ( + enable_structured_communication + ) + self.enable_hierarchical_evaluation = ( + enable_hierarchical_evaluation + ) self.shared_memory = shared_memory self.model_name = model_name self.use_ollama = use_ollama self.ollama_base_url = ollama_base_url self.ollama_api_key = ollama_api_key - + # Communication and state management self.conversation_history: List[StructuredMessage] = [] self.intermediate_outputs: Dict[str, str] = {} self.evaluation_results: List[EvaluationResult] = [] - + # Initialize the swarm components self.init_swarm() - + # Collect all agents for the parent class all_agents = [] if self.supervisor: @@ -1060,155 +1182,185 @@ class HierarchicalStructuredCommunicationFramework(BaseSwarm): all_agents.extend(self.refiners) if self.evaluation_supervisor: all_agents.append(self.evaluation_supervisor) - + # Call parent constructor with agents super().__init__(agents=all_agents, *args, **kwargs) - + def init_swarm(self): """Initialize the swarm components""" # Enhanced logging with rich formatting - console.print(Panel( - f"[bold blue]Initializing {self.name}[/bold blue]\n" - f"[dim]Framework: Talk Structurally, Act Hierarchically[/dim]", - title="Framework Initialization", - border_style="blue" - )) + console.print( + Panel( + f"[bold blue]Initializing {self.name}[/bold blue]\n" + f"[dim]Framework: Talk Structurally, Act Hierarchically[/dim]", + title="Framework Initialization", + border_style="blue", + ) + ) logger.info(f"Initializing {self.name}") - + # Setup supervisor if not provided if self.supervisor is None: self.supervisor = self._create_supervisor_agent() - + # Setup evaluation supervisor if not provided - if self.evaluation_supervisor is None and self.enable_hierarchical_evaluation: - self.evaluation_supervisor = 
self._create_evaluation_supervisor_agent() - + if ( + self.evaluation_supervisor is None + and self.enable_hierarchical_evaluation + ): + self.evaluation_supervisor = ( + self._create_evaluation_supervisor_agent() + ) + # Setup default agents if none provided if not self.generators: self.generators = [self._create_default_generator()] - - if not self.evaluators and self.enable_hierarchical_evaluation: + + if ( + not self.evaluators + and self.enable_hierarchical_evaluation + ): self.evaluators = [self._create_default_evaluator()] - + if not self.refiners: self.refiners = [self._create_default_refiner()] - + # Enhanced status display table = Table(title="Framework Components") table.add_column("Component", style="cyan", no_wrap=True) table.add_column("Count", style="magenta") table.add_column("Status", style="green") - - table.add_row("Generators", str(len(self.generators)), "Ready") - table.add_row("Evaluators", str(len(self.evaluators)), "Ready") + + table.add_row( + "Generators", str(len(self.generators)), "Ready" + ) + table.add_row( + "Evaluators", str(len(self.evaluators)), "Ready" + ) table.add_row("Refiners", str(len(self.refiners)), "Ready") - table.add_row("Supervisors", str(1 if self.supervisor else 0), "Ready") - + table.add_row( + "Supervisors", str(1 if self.supervisor else 0), "Ready" + ) + console.print(table) - - logger.info(f"Swarm initialized with {len(self.generators)} generators, " - f"{len(self.evaluators)} evaluators, {len(self.refiners)} refiners") - + + logger.info( + f"Swarm initialized with {len(self.generators)} generators, " + f"{len(self.evaluators)} evaluators, {len(self.refiners)} refiners" + ) + def _create_supervisor_agent(self) -> Agent: """Create the main supervisor agent""" supervisor_prompt = self._get_supervisor_prompt() - + agent_kwargs = { "agent_name": self.supervisor_name, "system_prompt": supervisor_prompt, "model_name": self.model_name, "verbose": self.verbose, - "reliability_check": False + "reliability_check": False, } - + if self.use_ollama: - agent_kwargs.update({ - "openai_api_base": self.ollama_base_url, - "openai_api_key": self.ollama_api_key - }) - + agent_kwargs.update( + { + "openai_api_base": self.ollama_base_url, + "openai_api_key": self.ollama_api_key, + } + ) + return Agent(**agent_kwargs) - + def _create_evaluation_supervisor_agent(self) -> Agent: """Create the evaluation team supervisor""" - eval_supervisor_prompt = self._get_evaluation_supervisor_prompt() - + eval_supervisor_prompt = ( + self._get_evaluation_supervisor_prompt() + ) + agent_kwargs = { "agent_name": self.evaluation_supervisor_name, "system_prompt": eval_supervisor_prompt, "model_name": self.model_name, "verbose": self.verbose, - "reliability_check": False + "reliability_check": False, } - + if self.use_ollama: - agent_kwargs.update({ - "openai_api_base": self.ollama_base_url, - "openai_api_key": self.ollama_api_key - }) - + agent_kwargs.update( + { + "openai_api_base": self.ollama_base_url, + "openai_api_key": self.ollama_api_key, + } + ) + return Agent(**agent_kwargs) - + def _create_default_generator(self) -> Agent: """Create a default generator agent""" generator_prompt = self._get_generator_prompt() - + agent_kwargs = { "agent_name": "Generator", "system_prompt": generator_prompt, "model_name": self.model_name, "verbose": self.verbose, - "reliability_check": False + "reliability_check": False, } - + if self.use_ollama: - agent_kwargs.update({ - "openai_api_base": self.ollama_base_url, - "openai_api_key": self.ollama_api_key - }) - + agent_kwargs.update( + { + 
"openai_api_base": self.ollama_base_url, + "openai_api_key": self.ollama_api_key, + } + ) + return Agent(**agent_kwargs) - + def _create_default_evaluator(self) -> Agent: """Create a default evaluator agent""" evaluator_prompt = self._get_evaluator_prompt() - + agent_kwargs = { "agent_name": "Evaluator", "system_prompt": evaluator_prompt, "model_name": self.model_name, "verbose": self.verbose, - "reliability_check": False + "reliability_check": False, } - + if self.use_ollama: - agent_kwargs.update({ - "openai_api_base": self.ollama_base_url, - "openai_api_key": self.ollama_api_key - }) - + agent_kwargs.update( + { + "openai_api_base": self.ollama_base_url, + "openai_api_key": self.ollama_api_key, + } + ) + return Agent(**agent_kwargs) - + def _create_default_refiner(self) -> Agent: """Create a default refiner agent""" refiner_prompt = self._get_refiner_prompt() - + agent_kwargs = { "agent_name": "Refiner", "system_prompt": refiner_prompt, "model_name": self.model_name, "verbose": self.verbose, - "reliability_check": False + "reliability_check": False, } - + if self.use_ollama: - agent_kwargs.update({ - "openai_api_base": self.ollama_base_url, - "openai_api_key": self.ollama_api_key - }) - + agent_kwargs.update( + { + "openai_api_base": self.ollama_base_url, + "openai_api_key": self.ollama_api_key, + } + ) + return Agent(**agent_kwargs) - + def _get_supervisor_prompt(self) -> str: """Get the supervisor system prompt""" return f""" @@ -1231,7 +1383,7 @@ Available agents: Always provide structured communication with clear message, background context, and intermediate outputs. """ - + def _get_evaluation_supervisor_prompt(self) -> str: """Get the evaluation supervisor system prompt""" return f""" @@ -1250,7 +1402,7 @@ Evaluation criteria to coordinate: Always provide summarized, coordinated feedback that balances diverse evaluator inputs. """ - + def _get_generator_prompt(self) -> str: """Get the generator agent system prompt""" return """ @@ -1269,7 +1421,7 @@ When receiving tasks: Always structure your response clearly and provide sufficient detail for evaluation. """ - + def _get_evaluator_prompt(self) -> str: """Get the evaluator agent system prompt""" return """ @@ -1292,7 +1444,7 @@ Always provide: - Detailed feedback - Confidence level (0-1) """ - + def _get_refiner_prompt(self) -> str: """Get the refiner agent system prompt""" return """ @@ -1311,25 +1463,25 @@ When refining: Always explain your refinements and how they address the evaluation feedback. """ - + def send_structured_message( self, sender: str, recipient: str, message: str, background: str = "", - intermediate_output: str = "" + intermediate_output: str = "", ) -> StructuredMessage: """ Send a structured message following the HierarchicalStructuredComm protocol - + Args: sender: Name of the sending agent recipient: Name of the receiving agent message: Specific task message (M_ij) background: Background context (B_ij) intermediate_output: Intermediate output (I_ij) - + Returns: StructuredMessage object """ @@ -1338,184 +1490,234 @@ Always explain your refinements and how they address the evaluation feedback. 
background=background, intermediate_output=intermediate_output, sender=sender, - recipient=recipient + recipient=recipient, ) - + self.conversation_history.append(structured_msg) - + if self.verbose: # Enhanced structured message display - console.print(Panel( - f"[bold green]Message Sent[/bold green]\n" - f"[cyan]From:[/cyan] {sender}\n" - f"[cyan]To:[/cyan] {recipient}\n" - f"[cyan]Message:[/cyan] {message[:100]}{'...' if len(message) > 100 else ''}", - title="Structured Communication", - border_style="green" - )) - logger.info(f"Structured message sent from {sender} to {recipient}") + console.print( + Panel( + f"[bold green]Message Sent[/bold green]\n" + f"[cyan]From:[/cyan] {sender}\n" + f"[cyan]To:[/cyan] {recipient}\n" + f"[cyan]Message:[/cyan] {message[:100]}{'...' if len(message) > 100 else ''}", + title="Structured Communication", + border_style="green", + ) + ) + logger.info( + f"Structured message sent from {sender} to {recipient}" + ) logger.info(f"Message: {message[:100]}...") - + return structured_msg - + def run_hierarchical_evaluation( - self, - content: str, - evaluation_criteria: List[str] = None + self, content: str, evaluation_criteria: List[str] = None ) -> List[EvaluationResult]: """ Run hierarchical evaluation with multiple evaluators - + Args: content: Content to evaluate evaluation_criteria: List of evaluation criteria - + Returns: List of evaluation results """ if not self.enable_hierarchical_evaluation: return [] - + if evaluation_criteria is None: - evaluation_criteria = ["accuracy", "completeness", "clarity", "relevance"] - + evaluation_criteria = [ + "accuracy", + "completeness", + "clarity", + "relevance", + ] + results = [] - + # Run evaluations in parallel for i, evaluator in enumerate(self.evaluators): - criterion = evaluation_criteria[i % len(evaluation_criteria)] - + criterion = evaluation_criteria[ + i % len(evaluation_criteria) + ] + # Create structured message for evaluator eval_message = f"Evaluate the following content based on {criterion} criterion" eval_background = f"Evaluation criterion: {criterion}\nContent to evaluate: {content}" - + structured_msg = self.send_structured_message( sender=self.evaluation_supervisor_name, - recipient=evaluator.agent_name if hasattr(evaluator, 'agent_name') else f"Evaluator_{i}", + recipient=( + evaluator.agent_name + if hasattr(evaluator, "agent_name") + else f"Evaluator_{i}" + ), message=eval_message, background=eval_background, - intermediate_output=content + intermediate_output=content, ) - + # Get evaluation result try: - if hasattr(evaluator, 'run'): + if hasattr(evaluator, "run"): eval_response = evaluator.run( f"Evaluate this content for {criterion}:\n{content}\n\nProvide: 1) Score (0-10), 2) Detailed feedback, 3) Confidence (0-1)" ) - + # Parse evaluation result (simplified parsing) result = EvaluationResult( - evaluator_name=evaluator.agent_name if hasattr(evaluator, 'agent_name') else f"Evaluator_{i}", + evaluator_name=( + evaluator.agent_name + if hasattr(evaluator, "agent_name") + else f"Evaluator_{i}" + ), criterion=criterion, score=7.5, # Default score, would need proper parsing feedback=eval_response, - confidence=0.8 # Default confidence + confidence=0.8, # Default confidence ) results.append(result) - + except Exception as e: logger.error(f"Error in evaluation: {e}") continue - + # Get summarized feedback from evaluation supervisor if self.evaluation_supervisor and results: summary_prompt = f"Summarize these evaluation results:\n{results}\n\nProvide coordinated, actionable feedback." 
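+            # Ask the evaluation supervisor to condense the per-criterion
+            # results into a single coordinated feedback summary.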
- + try: - if hasattr(self.evaluation_supervisor, 'run'): - summary_feedback = self.evaluation_supervisor.run(summary_prompt) - logger.info(f"Evaluation summary: {summary_feedback}") + if hasattr(self.evaluation_supervisor, "run"): + summary_feedback = self.evaluation_supervisor.run( + summary_prompt + ) + logger.info( + f"Evaluation summary: {summary_feedback}" + ) except Exception as e: logger.error(f"Error in evaluation summary: {e}") - + self.evaluation_results.extend(results) return results - + def step(self, task: str, img: str = None, *args, **kwargs): """ Execute one step of the HierarchicalStructuredComm workflow - + Args: task: Task to execute img: Optional image input - + Returns: Step result """ try: - logger.info(f"Executing HierarchicalStructuredComm step for task: {task[:100]}...") - + logger.info( + f"Executing HierarchicalStructuredComm step for task: {task[:100]}..." + ) + # Safety check: prevent recursive task processing - if len(task) > 1000: # If task is too long, it might be recursive - logger.warning("Task too long, possible recursive call detected") - return {"error": "Task too long, possible recursive call"} - + if ( + len(task) > 1000 + ): # If task is too long, it might be recursive + logger.warning( + "Task too long, possible recursive call detected" + ) + return { + "error": "Task too long, possible recursive call" + } + # Step 1: Generate initial content generator_result = self._generate_content(task) - + # Safety check: prevent empty or error results - if not generator_result or generator_result.startswith("Error"): + if not generator_result or generator_result.startswith( + "Error" + ): logger.error(f"Generator failed: {generator_result}") - return {"error": f"Generator failed: {generator_result}"} - + return { + "error": f"Generator failed: {generator_result}" + } + # Step 2: Evaluate content hierarchically - evaluation_results = self.run_hierarchical_evaluation(generator_result) - + evaluation_results = self.run_hierarchical_evaluation( + generator_result + ) + # Step 3: Refine content based on evaluation - refined_result = self._refine_content(generator_result, evaluation_results) - + refined_result = self._refine_content( + generator_result, evaluation_results + ) + # Safety check: ensure we have a valid result if not refined_result: refined_result = generator_result - + return { "generator_result": generator_result, "evaluation_results": evaluation_results, "refined_result": refined_result, - "conversation_history": self.conversation_history + "conversation_history": self.conversation_history, } - + except Exception as e: - logger.error(f"Error in HierarchicalStructuredComm step: {e}") + logger.error( + f"Error in HierarchicalStructuredComm step: {e}" + ) logger.error(traceback.format_exc()) return {"error": str(e)} - + def _generate_content(self, task: str) -> str: """Generate initial content using generator agents""" if not self.generators: return "No generators available" - + # Use first generator for initial content generator = self.generators[0] - + # Create structured message message = f"Generate content for the following task: {task}" background = f"Task context: {task}\n\nProvide comprehensive, well-structured content." 
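+        # Route the request through the structured communication protocol:
+        # a specific task message plus shared background context.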
- + structured_msg = self.send_structured_message( sender=self.supervisor_name, - recipient=generator.agent_name if hasattr(generator, 'agent_name') else "Generator", + recipient=( + generator.agent_name + if hasattr(generator, "agent_name") + else "Generator" + ), message=message, - background=background + background=background, ) - + try: - if hasattr(generator, 'run'): + if hasattr(generator, "run"): # Add a simple, focused prompt to prevent recursive calls prompt = f"Task: {task}\n\nGenerate a clear, concise response. Do not repeat the task or ask for clarification." - + result = generator.run(prompt) - + # Safety check: prevent recursive or overly long responses if len(result) > 2000: result = result[:2000] + "... [truncated]" - + # Safety check: prevent responses that just repeat the task - if task.lower() in result.lower() and len(result) < len(task) * 2: - logger.warning("Generator response appears to be recursive") - return "Error: Generator produced recursive response" - + if ( + task.lower() in result.lower() + and len(result) < len(task) * 2 + ): + logger.warning( + "Generator response appears to be recursive" + ) + return ( + "Error: Generator produced recursive response" + ) + self.intermediate_outputs["generator"] = result return result else: @@ -1523,38 +1725,50 @@ Always explain your refinements and how they address the evaluation feedback. except Exception as e: logger.error(f"Error in content generation: {e}") return f"Error generating content: {e}" - - def _refine_content(self, original_content: str, evaluation_results: List[EvaluationResult]) -> str: + + def _refine_content( + self, + original_content: str, + evaluation_results: List[EvaluationResult], + ) -> str: """Refine content based on evaluation feedback""" if not self.refiners: return original_content - + if not evaluation_results: return original_content - + # Use first refiner refiner = self.refiners[0] - + # Create feedback summary - feedback_summary = "\n".join([ - f"{result.criterion}: {result.feedback} (Score: {result.score}/10)" - for result in evaluation_results - ]) - + feedback_summary = "\n".join( + [ + f"{result.criterion}: {result.feedback} (Score: {result.score}/10)" + for result in evaluation_results + ] + ) + # Create structured message for refinement - message = "Refine the content based on the evaluation feedback" + message = ( + "Refine the content based on the evaluation feedback" + ) background = f"Original content: {original_content}\n\nEvaluation feedback:\n{feedback_summary}" - + structured_msg = self.send_structured_message( sender=self.supervisor_name, - recipient=refiner.agent_name if hasattr(refiner, 'agent_name') else "Refiner", + recipient=( + refiner.agent_name + if hasattr(refiner, "agent_name") + else "Refiner" + ), message=message, background=background, - intermediate_output=original_content + intermediate_output=original_content, ) - + try: - if hasattr(refiner, 'run'): + if hasattr(refiner, "run"): refinement_prompt = f""" Original content: {original_content} @@ -1572,89 +1786,121 @@ Please refine the content to address the feedback while maintaining its core str except Exception as e: logger.error(f"Error in content refinement: {e}") return original_content - + def run(self, task: str, img: str = None, *args, **kwargs): """ Run the complete HierarchicalStructuredComm workflow - + Args: task: Task to execute img: Optional image input - + Returns: Final result """ # Enhanced workflow start display - console.print(Panel( - f"[bold yellow]Starting Hierarchical Structured 
Communication Workflow[/bold yellow]\n" - f"[cyan]Task:[/cyan] {task[:100]}{'...' if len(task) > 100 else ''}\n" - f"[cyan]Max Loops:[/cyan] {self.max_loops}", - title="Workflow Execution", - border_style="yellow" - )) - logger.info(f"Running HierarchicalStructuredComm workflow for task: {task[:100]}...") - + console.print( + Panel( + f"[bold yellow]Starting Hierarchical Structured Communication Workflow[/bold yellow]\n" + f"[cyan]Task:[/cyan] {task[:100]}{'...' if len(task) > 100 else ''}\n" + f"[cyan]Max Loops:[/cyan] {self.max_loops}", + title="Workflow Execution", + border_style="yellow", + ) + ) + logger.info( + f"Running HierarchicalStructuredComm workflow for task: {task[:100]}..." + ) + current_result = None total_loops = 0 - + # Rich progress tracking with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), - console=console + console=console, ) as progress: - task_progress = progress.add_task("Processing workflow...", total=self.max_loops) - + task_progress = progress.add_task( + "Processing workflow...", total=self.max_loops + ) + for loop in range(self.max_loops): total_loops = loop + 1 - progress.update(task_progress, description=f"Loop {total_loops}/{self.max_loops}") - logger.info(f"HierarchicalStructuredComm loop {total_loops}/{self.max_loops}") - + progress.update( + task_progress, + description=f"Loop {total_loops}/{self.max_loops}", + ) + logger.info( + f"HierarchicalStructuredComm loop {total_loops}/{self.max_loops}" + ) + # Execute step step_result = self.step(task, img, *args, **kwargs) - + if "error" in step_result: - console.print(f"[bold red]Error in loop {total_loops}: {step_result['error']}[/bold red]") - logger.error(f"Error in loop {total_loops}: {step_result['error']}") + console.print( + f"[bold red]Error in loop {total_loops}: {step_result['error']}[/bold red]" + ) + logger.error( + f"Error in loop {total_loops}: {step_result['error']}" + ) break - + current_result = step_result["refined_result"] - + # Check if we should continue refining if loop < self.max_loops - 1: # Simple continuation logic - could be enhanced - evaluation_scores = [result.score for result in step_result["evaluation_results"]] - avg_score = sum(evaluation_scores) / len(evaluation_scores) if evaluation_scores else 0 - + evaluation_scores = [ + result.score + for result in step_result[ + "evaluation_results" + ] + ] + avg_score = ( + sum(evaluation_scores) + / len(evaluation_scores) + if evaluation_scores + else 0 + ) + if avg_score >= 8.0: # High quality threshold - console.print(f"[bold green]High quality achieved (avg score: {avg_score:.2f}), stopping refinement[/bold green]") - logger.info(f"High quality achieved (avg score: {avg_score:.2f}), stopping refinement") + console.print( + f"[bold green]High quality achieved (avg score: {avg_score:.2f}), stopping refinement[/bold green]" + ) + logger.info( + f"High quality achieved (avg score: {avg_score:.2f}), stopping refinement" + ) break - + progress.advance(task_progress) - + # Enhanced completion display - console.print(Panel( - f"[bold green]Workflow Completed Successfully![/bold green]\n" - f"[cyan]Total Loops:[/cyan] {total_loops}\n" - f"[cyan]Conversation History:[/cyan] {len(self.conversation_history)} messages\n" - f"[cyan]Evaluation Results:[/cyan] {len(self.evaluation_results)} evaluations", - title="Workflow Summary", - border_style="green" - )) - + console.print( + Panel( + f"[bold green]Workflow Completed Successfully![/bold green]\n" + f"[cyan]Total Loops:[/cyan] {total_loops}\n" + 
f"[cyan]Conversation History:[/cyan] {len(self.conversation_history)} messages\n" + f"[cyan]Evaluation Results:[/cyan] {len(self.evaluation_results)} evaluations", + title="Workflow Summary", + border_style="green", + ) + ) + return { "final_result": current_result, "total_loops": total_loops, "conversation_history": self.conversation_history, "evaluation_results": self.evaluation_results, - "intermediate_outputs": self.intermediate_outputs + "intermediate_outputs": self.intermediate_outputs, } - + def __str__(self): return f"HierarchicalStructuredCommunicationFramework(name={self.name}, generators={len(self.generators)}, evaluators={len(self.evaluators)}, refiners={len(self.refiners)})" - + def __repr__(self): return self.__str__() + # Nothing to see here yet.