diff --git a/docs/contributors/main.md b/docs/contributors/main.md index e69ec8a3..4ea33d40 100644 --- a/docs/contributors/main.md +++ b/docs/contributors/main.md @@ -1,221 +1,40 @@ -# Contributing to Swarms: Building the Infrastructure for The Agentic Economy +# Contribute to Swarms -Multi-agent collaboration is the most important technology in human history. It will reshape civilization by enabling billions of autonomous agents to coordinate and solve problems at unprecedented scale. - -!!! success "The Foundation of Tomorrow" - **Swarms** is the foundational infrastructure powering this autonomous economy. By contributing, you're building the systems that will enable the next generation of intelligent automation. - -### What You're Building - -=== "Autonomous Systems" - **Autonomous Resource Allocation** - - Global supply chains and energy distribution optimized in real-time +Our mission is to accelerate the transition to a fully autonomous world economy by providing enterprise-grade, production-ready infrastructure that enables seamless deployment and orchestration of millions of autonomous agents. We are creating the operating system for the agent economy, and we need your help to achieve this goal. -=== "Intelligence Networks" - **Distributed Decision Making** - - Collaborative intelligence networks across industries and governments +Swarms is built by the community, for the community. We believe that collaborative development is the key to pushing the boundaries of what's possible with multi-agent AI. Your contributions are not only welcome—they are essential to our mission. [Learn more about why you should contribute to Swarms](https://docs.swarms.world/en/latest/contributors/main/) -=== "Smart Markets" - **Self-Organizing Markets** - - Agent-driven marketplaces that automatically balance supply and demand +### Why Contribute? 
-=== "Problem Solving" - **Collaborative Problem Solving** - - Massive agent swarms tackling climate change, disease, and scientific discovery - -=== "Infrastructure" - **Adaptive Infrastructure** - - Self-healing systems that evolve without human intervention - ---- - -## Why Contribute to Swarms? - -### :material-rocket-launch: Shape the Future of Civilization - -!!! abstract "Your Impact" - - Define standards for multi-agent communication protocols - - Build architectural patterns for distributed intelligence systems - - Create frameworks for deploying agent swarms in production - - Establish ethical guidelines for autonomous agent collaboration +By joining us, you have the opportunity to: -### :material-trophy: Recognition and Professional Development +* **Work on the Frontier of Agents:** Shape the future of autonomous agent technology and help build a production-grade, open-source framework. -!!! tip "Immediate Recognition" - - **Social Media Features** - All merged PRs showcased publicly - - **Bounty Programs** - Financial rewards for high-impact contributions - - **Fast-Track Hiring** - Priority consideration for core team positions - - **Community Spotlights** - Regular recognition and acknowledgments - -!!! 
info "Career Benefits" -    - Multi-agent expertise highly valued by AI industry -    - Portfolio demonstrates cutting-edge technical skills -    - Direct networking with leading researchers and companies -    - Thought leadership opportunities in emerging field - -### :material-brain: Technical Expertise Development - -Master cutting-edge technologies: - -| Technology Area | Skills You'll Develop | -|----------------|----------------------| -| **Swarm Intelligence** | Design sophisticated agent coordination mechanisms | -| **Distributed Computing** | Build scalable architectures for thousands of agents | -| **Communication Protocols** | Create novel interaction patterns | -| **Production AI** | Deploy and orchestrate enterprise-scale systems | -| **Research Implementation** | Turn cutting-edge papers into working code | +* **Join a Vibrant Community:** Collaborate with a passionate and growing group of agent developers, researchers, and agent enthusiasts. -### :material-account-group: Research Community Access +* **Make a Tangible Impact:** Whether you're fixing a bug, adding a new feature, or improving documentation, your work will be used in real-world applications. -!!! note "Collaborative Environment" -    - Work with experts from academic institutions and industry -    - Regular technical seminars and research discussions -    - Structured mentorship from experienced contributors -    - Applied research opportunities with real-world impact +* **Learn and Grow:** Gain hands-on experience with advanced AI concepts and strengthen your software engineering skills. ---- +Discover more about our mission and the benefits of becoming a contributor in our official [**Contributor's Guide**](https://docs.swarms.world/en/latest/contributors/main/). 
-## Contribution Opportunities - -=== "New Contributors" - ### :material-school: Perfect for Getting Started - - - **Documentation** - Improve guides, tutorials, and API references - - **Bug Reports** - Identify and document issues - - **Code Quality** - Participate in testing and review processes - - **Community Support** - Help users in forums and discussions - -=== "Experienced Developers" - ### :material-code-braces: Advanced Technical Work - - - **Core Architecture** - Design fundamental system components - - **Performance Optimization** - Enhance coordination and communication efficiency - - **Research Implementation** - Turn cutting-edge papers into working code - - **Integration Development** - Build connections with AI tools and platforms +### How to Get Started -=== "Researchers" - ### :material-flask: Research and Innovation - - - **Algorithm Development** - Implement novel multi-agent algorithms - - **Experimental Frameworks** - Create evaluation and benchmarking tools - - **Theoretical Contributions** - Develop research documentation and frameworks - - **Academic Collaboration** - Partner on funded research projects +We've made it easy to start contributing. Here's how you can help: ---- - -## How to Contribute +1. **Find an Issue to Tackle:** The best way to begin is by visiting our [**contributing project board**](https://github.com/users/kyegomez/projects/1). Look for issues tagged with `good first issue`—these are specifically selected for new contributors. -### Step 1: Get Started +2. **Report a Bug or Request a Feature:** Have a new idea or found something that isn't working right? We'd love to hear from you. Please [**file a Bug Report or Feature Request**](https://github.com/kyegomez/swarms/issues) on our GitHub Issues page. -!!! 
info "Essential Resources" -    [:material-book-open-page-variant: **Documentation**](https://docs.swarms.world/en/latest/){ .md-button .md-button--primary } -    [:material-github: **GitHub Repository**](https://github.com/kyegomez/swarms){ .md-button } -    [:material-chat: **Community Channels**](#){ .md-button } +3. **Understand Our Workflow and Standards:** Before submitting your work, please review our complete [**Contribution Guidelines**](https://github.com/kyegomez/swarms/blob/master/CONTRIBUTING.md). To help maintain code quality, we also encourage you to read our guide on [**Code Cleanliness**](https://docs.swarms.world/en/latest/swarms/framework/code_cleanliness/). -### Step 2: Find Your Path +4. **Join the Discussion:** To participate in roadmap discussions and connect with other developers, join our community on [**Discord**](https://discord.gg/EamjgSaEQf). -```mermaid -graph TD -    A[Choose Your Path] --> B[Browse Issues] -    A --> C[Review Roadmap] -    A --> D[Propose Ideas] -    B --> E[good first issue] -    B --> F[help wanted] -    C --> G[Core Features] -    C --> H[Research Areas] -    D --> I[Discussion Forums] -``` -### Step 3: Make Impact +### ✨ Our Valued Contributors -1. **Fork & Setup** - Configure your development environment -2. **Develop** - Create your contribution -3. **Submit** - Open a pull request -4. **Collaborate** - Work with maintainers -5. **Celebrate** - See your work recognized +Thank you for contributing to Swarms. Your work is deeply appreciated and recognized. ---- - -## Recognition Framework - -### :material-flash: Immediate Benefits - -!!! 
success "Instant Recognition" - | Benefit | Description | - |---------|-------------| - | **Social Media Features** | Every merged PR showcased publicly | - | **Community Recognition** | Contributor badges and documentation credits | - | **Professional References** | Formal acknowledgment for portfolios | - | **Direct Mentorship** | Access to core team guidance | - -### :material-trending-up: Long-term Opportunities - -!!! tip "Career Growth" - - **Team Positions** - Fast-track consideration for core team roles - - **Conference Speaking** - Present work at AI conferences and events - - **Industry Connections** - Network with leading AI organizations - - **Research Collaboration** - Partner with academic institutions - ---- - -## Societal Impact - -!!! abstract "Building Solutions for Humanity" - Swarms enables technology that addresses critical challenges: - - === "Research" - **Scientific Research** - - Accelerate collaborative research and discovery across disciplines - - === "Healthcare" - **Healthcare Innovation** - - Support drug discovery and personalized medicine development - - === "Environment" - **Environmental Solutions** - - Monitor climate and optimize sustainability initiatives - - === "Education" - **Educational Technology** - - Create adaptive learning systems for personalized education - - === "Economy" - **Economic Innovation** - - Generate new opportunities and efficiency improvements - ---- - -## Get Involved - -### :material-link: Connect With Us - -!!! info "Join the Community" - [:material-github: **GitHub Repository**](https://github.com/kyegomez/swarms){ .md-button .md-button--primary } - [:material-book: **Documentation**](https://docs.swarms.world/en/latest/){ .md-button } - [:material-forum: **Community Forums**](#){ .md-button } - ---- - -!!! warning "The Future is Now" - Multi-agent collaboration will define the next century of human progress. The autonomous economy depends on the infrastructure we build today. - -!!! 
success "Your Mission" - Your contribution to Swarms helps create the foundation for billions of autonomous agents working together to solve humanity's greatest challenges. - - **Join us in building the most important technology of our time.** - ---- - -
-*Built with :material-heart: by the global Swarms community* -
\ No newline at end of file + + + \ No newline at end of file diff --git a/docs/swarms/structs/deep_research_swarm.md b/docs/swarms/structs/deep_research_swarm.md deleted file mode 100644 index 8d0b4ee9..00000000 --- a/docs/swarms/structs/deep_research_swarm.md +++ /dev/null @@ -1,186 +0,0 @@ -# Deep Research Swarm - -!!! abstract "Overview" - The Deep Research Swarm is a powerful, production-grade research system that conducts comprehensive analysis across multiple domains using parallel processing and advanced AI agents. - - Key Features: - - - Parallel search processing - - - Multi-agent research coordination - - - Advanced information synthesis - - - Automated query generation - - - Concurrent task execution - -## Getting Started - -!!! tip "Quick Installation" - ```bash - pip install swarms - ``` - -=== "Basic Usage" - ```python - from swarms.structs import DeepResearchSwarm - - # Initialize the swarm - swarm = DeepResearchSwarm( - name="MyResearchSwarm", - output_type="json", - max_loops=1 - ) - - # Run a single research task - results = swarm.run("What are the latest developments in quantum computing?") - ``` - -=== "Batch Processing" - ```python - # Run multiple research tasks in parallel - tasks = [ - "What are the environmental impacts of electric vehicles?", - "How is AI being used in drug discovery?", - ] - batch_results = swarm.batched_run(tasks) - ``` - -## Configuration - -!!! info "Constructor Arguments" - | Parameter | Type | Default | Description | - |-----------|------|---------|-------------| - | `name` | str | "DeepResearchSwarm" | Name identifier for the swarm | - | `description` | str | "A swarm that conducts..." 
| Description of the swarm's purpose | - | `research_agent` | Agent | research_agent | Custom research agent instance | - | `max_loops` | int | 1 | Maximum number of research iterations | - | `nice_print` | bool | True | Enable formatted console output | - | `output_type` | str | "json" | Output format ("json" or "string") | - | `max_workers` | int | CPU_COUNT * 2 | Maximum concurrent threads | - | `token_count` | bool | False | Enable token counting | - | `research_model_name` | str | "gpt-4o-mini" | Model to use for research | - -## Core Methods - -### Run -!!! example "Single Task Execution" - ```python - results = swarm.run("What are the latest breakthroughs in fusion energy?") - ``` - -### Batched Run -!!! example "Parallel Task Execution" - ```python - tasks = [ - "What are current AI safety initiatives?", - "How is CRISPR being used in agriculture?", - ] - results = swarm.batched_run(tasks) - ``` - -### Step -!!! example "Single Step Execution" - ```python - results = swarm.step("Analyze recent developments in renewable energy storage") - ``` - -## Domain-Specific Examples - -=== "Scientific Research" - ```python - science_swarm = DeepResearchSwarm( - name="ScienceSwarm", - output_type="json", - max_loops=2 # More iterations for thorough research - ) - - results = science_swarm.run( - "What are the latest experimental results in quantum entanglement?" - ) - ``` - -=== "Market Research" - ```python - market_swarm = DeepResearchSwarm( - name="MarketSwarm", - output_type="json" - ) - - results = market_swarm.run( - "What are the emerging trends in electric vehicle battery technology market?" - ) - ``` - -=== "News Analysis" - ```python - news_swarm = DeepResearchSwarm( - name="NewsSwarm", - output_type="string" # Human-readable output - ) - - results = news_swarm.run( - "What are the global economic impacts of recent geopolitical events?" 
- ) - ``` - -=== "Medical Research" - ```python - medical_swarm = DeepResearchSwarm( - name="MedicalSwarm", - max_loops=2 - ) - - results = medical_swarm.run( - "What are the latest clinical trials for Alzheimer's treatment?" - ) - ``` - -## Advanced Features - -??? note "Custom Research Agent" - ```python - from swarms import Agent - - custom_agent = Agent( - agent_name="SpecializedResearcher", - system_prompt="Your specialized prompt here", - model_name="gpt-4" - ) - - swarm = DeepResearchSwarm( - research_agent=custom_agent, - max_loops=2 - ) - ``` - -??? note "Parallel Processing Control" - ```python - swarm = DeepResearchSwarm( - max_workers=8, # Limit to 8 concurrent threads - nice_print=False # Disable console output for production - ) - ``` - -## Best Practices - -!!! success "Recommended Practices" - 1. **Query Formulation**: Be specific and clear in your research queries - 2. **Resource Management**: Adjust `max_workers` based on your system's capabilities - 3. **Output Handling**: Use appropriate `output_type` for your use case - 4. **Error Handling**: Implement try-catch blocks around swarm operations - 5. **Model Selection**: Choose appropriate models based on research complexity - -## Limitations - -!!! 
warning "Known Limitations" - - - Requires valid API keys for external services - - - Performance depends on system resources - - - Rate limits may apply to external API calls - - - Token limits apply to model responses - diff --git a/docs/swarms/structs/swarm_router.md b/docs/swarms/structs/swarm_router.md index f20fbe3c..fcf467bb 100644 --- a/docs/swarms/structs/swarm_router.md +++ b/docs/swarms/structs/swarm_router.md @@ -15,7 +15,6 @@ The `SwarmRouter` class is a flexible routing system designed to manage differen | `HiearchicalSwarm` | Hierarchical organization of agents | | `MajorityVoting` | Uses majority voting for decision making | | `MALT` | Multi-Agent Language Tasks | -| `DeepResearchSwarm` | Specialized for deep research tasks | | `CouncilAsAJudge` | Council-based judgment system | | `InteractiveGroupChat` | Interactive group chat with user participation | | `auto` | Automatically selects best swarm type via embedding search | diff --git a/docs/swarms/utils/agent_loader.md b/docs/swarms/utils/agent_loader.md index dc749156..3052e434 100644 --- a/docs/swarms/utils/agent_loader.md +++ b/docs/swarms/utils/agent_loader.md @@ -18,7 +18,7 @@ The AgentLoader enables you to: The AgentLoader is included with the Swarms framework: ```python -from swarms.utils import AgentLoader, load_agent_from_markdown, load_agents_from_markdown +from swarms import AgentLoader, load_agent_from_markdown, load_agents_from_markdown ``` ## Markdown Format @@ -99,7 +99,7 @@ result = workflow.run(task) For more advanced usage, use the `AgentLoader` class directly: ```python -from swarms.utils import AgentLoader +from swarms import AgentLoader # Initialize loader loader = AgentLoader() @@ -209,7 +209,7 @@ response = agent.run( The AgentLoader provides comprehensive error handling: ```python -from swarms.utils import AgentLoader +from swarms import AgentLoader loader = AgentLoader() diff --git a/docs/swarms_cloud/rust_client.md b/docs/swarms_cloud/rust_client.md index 
aeea709c..ab4d4922 100644 --- a/docs/swarms_cloud/rust_client.md +++ b/docs/swarms_cloud/rust_client.md @@ -233,7 +233,6 @@ Available swarm types for different execution patterns. | `Auto` | Automatically selects the best swarm type | | `MajorityVoting` | Agents vote on decisions | | `Malt` | Multi-Agent Language Tasks | -| `DeepResearchSwarm` | Specialized for deep research tasks | ## Detailed Examples diff --git a/example.py b/example.py index f6427822..96886521 100644 --- a/example.py +++ b/example.py @@ -4,40 +4,11 @@ from swarms import Agent agent = Agent( agent_name="Quantitative-Trading-Agent", agent_description="Advanced quantitative trading and algorithmic analysis agent", - system_prompt="""You are an expert quantitative trading agent with deep expertise in: - - Algorithmic trading strategies and implementation - - Statistical arbitrage and market making - - Risk management and portfolio optimization - - High-frequency trading systems - - Market microstructure analysis - - Quantitative research methodologies - - Financial mathematics and stochastic processes - - Machine learning applications in trading - - Your core responsibilities include: - 1. Developing and backtesting trading strategies - 2. Analyzing market data and identifying alpha opportunities - 3. Implementing risk management frameworks - 4. Optimizing portfolio allocations - 5. Conducting quantitative research - 6. Monitoring market microstructure - 7. 
Evaluating trading system performance - - You maintain strict adherence to: - - Mathematical rigor in all analyses - - Statistical significance in strategy development - - Risk-adjusted return optimization - - Market impact minimization - - Regulatory compliance - - Transaction cost analysis - - Performance attribution - - You communicate in precise, technical terms while maintaining clarity for stakeholders.""", model_name="claude-sonnet-4-20250514", dynamic_temperature_enabled=True, - output_type="str-all-except-first", max_loops=1, dynamic_context_window=True, + streaming_on=True, ) out = agent.run( diff --git a/examples/demos/science/deep_research_swarm_example.py b/examples/demos/science/deep_research_swarm_example.py deleted file mode 100644 index 54c45b34..00000000 --- a/examples/demos/science/deep_research_swarm_example.py +++ /dev/null @@ -1,10 +0,0 @@ -from swarms.structs.deep_research_swarm import DeepResearchSwarm - - -model = DeepResearchSwarm( - research_model_name="groq/deepseek-r1-distill-qwen-32b" -) - -model.run( - "What are the latest research papers on extending telomeres in humans? Give 1 queries for the search not too many`" -) diff --git a/examples/multi_agent/board_of_directors/board_of_directors_example.py b/examples/multi_agent/board_of_directors/board_of_directors_example.py index 2461919e..8966ac30 100644 --- a/examples/multi_agent/board_of_directors/board_of_directors_example.py +++ b/examples/multi_agent/board_of_directors/board_of_directors_example.py @@ -10,23 +10,8 @@ To run this example: 2. 
Run: python examples/multi_agent/board_of_directors/board_of_directors_example.py """ -import os -import sys from typing import List -# Add the root directory to the Python path if running from examples directory -current_dir = os.path.dirname(os.path.abspath(__file__)) -if "examples" in current_dir: - root_dir = current_dir - while os.path.basename( - root_dir - ) != "examples" and root_dir != os.path.dirname(root_dir): - root_dir = os.path.dirname(root_dir) - if os.path.basename(root_dir) == "examples": - root_dir = os.path.dirname(root_dir) - if root_dir not in sys.path: - sys.path.insert(0, root_dir) - from swarms.structs.board_of_directors_swarm import ( BoardOfDirectorsSwarm, BoardMember, @@ -37,7 +22,6 @@ from swarms.structs.agent import Agent def create_board_members() -> List[BoardMember]: """Create board members with specific roles.""" - chairman = Agent( agent_name="Chairman", agent_description="Executive Chairman with strategic vision", @@ -86,7 +70,6 @@ def create_board_members() -> List[BoardMember]: def create_worker_agents() -> List[Agent]: """Create worker agents for the swarm.""" - researcher = Agent( agent_name="Researcher", agent_description="Research analyst for data analysis", @@ -114,9 +97,8 @@ def create_worker_agents() -> List[Agent]: return [researcher, developer, marketer] -def run_board_example() -> None: +def run_board_example() -> str: """Run a Board of Directors example.""" - # Create board members and worker agents board_members = create_board_members() worker_agents = create_worker_agents() @@ -127,7 +109,7 @@ def run_board_example() -> None: board_members=board_members, agents=worker_agents, max_loops=2, - verbose=True, + verbose=False, decision_threshold=0.6, ) @@ -137,66 +119,17 @@ def run_board_example() -> None: Include market research, technical planning, marketing strategy, and financial projections. 
""" -    # Execute the task -    result = board_swarm.run(task=task) - -    print("Task completed successfully!") -    print(f"Result: {result}") - - -def run_simple_example() -> None: -    """Run a simple Board of Directors example.""" - -    # Create simple agents -    analyst = Agent( -        agent_name="Analyst", -        agent_description="Data analyst", -        model_name="gpt-4o-mini", -        max_loops=1, -    ) - -    writer = Agent( -        agent_name="Writer", -        agent_description="Content writer", -        model_name="gpt-4o-mini", -        max_loops=1, -    ) - -    # Create swarm with default settings -    board_swarm = BoardOfDirectorsSwarm( -        name="Simple_Board", -        agents=[analyst, writer], -        verbose=True, -    ) - -    # Execute simple task -    task = ( -        "Analyze current market trends and create a summary report." -    ) -    result = board_swarm.run(task=task) - -    print("Simple example completed!") -    print(f"Result: {result}") +    # Execute the task and return result +    return board_swarm.run(task=task)  def main() -> None: -    """Main function to run the examples.""" - -    if not os.getenv("OPENAI_API_KEY"): -        print( -            "Warning: OPENAI_API_KEY not set. Example may not work." -        ) -        return      try: -        print("Running simple Board of Directors example...") -        run_simple_example() - -        print("\nRunning comprehensive Board of Directors example...") -        run_board_example() - -    except Exception as e: -        print(f"Error: {e}") +        result = run_board_example() +        print(result) +    except Exception as e: +        print(f"Error: {e}")   if __name__ == "__main__": diff --git a/examples/multi_agent/board_of_directors/minimal_board_example.py b/examples/multi_agent/board_of_directors/minimal_board_example.py new file mode 100644 index 00000000..74543c72 --- /dev/null +++ b/examples/multi_agent/board_of_directors/minimal_board_example.py @@ -0,0 +1,51 @@ +""" +Minimal Board of Directors Example + +This example demonstrates the most basic Board of Directors swarm setup +with minimal configuration and agents. + +To run this example: +1. Make sure you're in the root directory of the swarms project +2. 
Run: python examples/multi_agent/board_of_directors/minimal_board_example.py +""" + +from swarms.structs.board_of_directors_swarm import ( +    BoardOfDirectorsSwarm, +) +from swarms.structs.agent import Agent + + +def run_minimal_example() -> str: +    """Run a minimal Board of Directors example.""" +    # Create a single agent +    agent = Agent( +        agent_name="General_Agent", +        agent_description="General purpose agent", +        model_name="gpt-4o-mini", +        max_loops=1, +    ) + +    # Create minimal swarm +    board_swarm = BoardOfDirectorsSwarm( +        name="Minimal_Board", +        agents=[agent], +        verbose=False, +    ) + +    # Execute minimal task +    task = "Provide a brief overview of artificial intelligence." +    return board_swarm.run(task=task) + + +def main() -> None: +    """Main function to run the minimal example.""" + +    try: +        result = run_minimal_example() +        print(result) +    except Exception as e: +        print(f"Error: {e}") + + +if __name__ == "__main__": +    main() diff --git a/examples/multi_agent/board_of_directors/simple_board_example.py b/examples/multi_agent/board_of_directors/simple_board_example.py new file mode 100644 index 00000000..fa5e4aef --- /dev/null +++ b/examples/multi_agent/board_of_directors/simple_board_example.py @@ -0,0 +1,35 @@ +from swarms.structs.board_of_directors_swarm import ( +    BoardOfDirectorsSwarm, +) +from swarms.structs.agent import Agent + +# Create simple agents for basic tasks +analyst = Agent( +    agent_name="Analyst", +    agent_description="Data analyst", +    model_name="gpt-4o-mini", +    max_loops=1, +) + +writer = Agent( +    agent_name="Writer", +    agent_description="Content writer", +    model_name="gpt-4o-mini", +    max_loops=1, +) + +agents = [analyst, writer] + +# Create swarm with default settings +board_swarm = BoardOfDirectorsSwarm( +    name="Simple_Board", +    agents=agents, +    verbose=False, +) + +# Execute simple task +task = "Analyze current market trends and create a summary report." 
+ +result = board_swarm.run(task=task) + +print(result) diff --git a/examples/multi_agent/deep_research_examples/deep_research_example.py b/examples/multi_agent/deep_research_examples/deep_research_example.py deleted file mode 100644 index c575cb42..00000000 --- a/examples/multi_agent/deep_research_examples/deep_research_example.py +++ /dev/null @@ -1,12 +0,0 @@ -from swarms.structs.deep_research_swarm import DeepResearchSwarm - -swarm = DeepResearchSwarm( - name="Deep Research Swarm", - description="A swarm that conducts comprehensive research across multiple domains", - max_loops=1, -) - - -swarm.run( - "What are the biggest gas and oil companies in russia? Only provide 3 queries" -) diff --git a/examples/multi_agent/deep_research_examples/deep_research_swarm.py b/examples/multi_agent/deep_research_examples/deep_research_swarm.py deleted file mode 100644 index c52d9370..00000000 --- a/examples/multi_agent/deep_research_examples/deep_research_swarm.py +++ /dev/null @@ -1,13 +0,0 @@ -from swarms.structs.deep_research_swarm import DeepResearchSwarm - - -def main(): - swarm = DeepResearchSwarm( - name="Deep Research Swarm", - description="A swarm of agents that can perform deep research on a given topic", - ) - - swarm.run("What are the latest news in the AI an crypto space") - - -main() diff --git a/examples/multi_agent/deep_research_examples/deep_research_swarm_example.py b/examples/multi_agent/deep_research_examples/deep_research_swarm_example.py deleted file mode 100644 index 3cc26c9e..00000000 --- a/examples/multi_agent/deep_research_examples/deep_research_swarm_example.py +++ /dev/null @@ -1,23 +0,0 @@ -from swarms.structs.deep_research_swarm import DeepResearchSwarm - - -def main(): - swarm = DeepResearchSwarm( - name="Deep Research Swarm", - description="A swarm of agents that can perform deep research on a given topic", - output_type="string", # Change to string output type for better readability - ) - - # Format the query as a proper question - query = 
"What are the latest developments and news in the AI and cryptocurrency space?" - - try: - result = swarm.run(query) - print("\nResearch Results:") - print(result) - except Exception as e: - print(f"Error occurred: {str(e)}") - - -if __name__ == "__main__": - main() diff --git a/examples/multi_agent/deep_research_examples/deep_research_swarm_example_new.py b/examples/multi_agent/deep_research_examples/deep_research_swarm_example_new.py deleted file mode 100644 index f20358ff..00000000 --- a/examples/multi_agent/deep_research_examples/deep_research_swarm_example_new.py +++ /dev/null @@ -1,13 +0,0 @@ -from swarms.structs.deep_research_swarm import DeepResearchSwarm - - -swarm = DeepResearchSwarm( - name="Deep Research Swarm", - description="A swarm of agents that can perform deep research on a given topic", - output_type="all", # Change to string output type for better readability -) - -out = swarm.run( - "What are the latest developments and news in the AI and cryptocurrency space?" -) -print(out) diff --git a/examples/multi_agent/hscf/single_file_hierarchical_framework_example.py b/examples/multi_agent/hscf/single_file_hierarchical_framework_example.py index 70221f29..1ae39dfa 100644 --- a/examples/multi_agent/hscf/single_file_hierarchical_framework_example.py +++ b/examples/multi_agent/hscf/single_file_hierarchical_framework_example.py @@ -9,10 +9,11 @@ All components are now in one file: hierarchical_structured_communication_framew import os import sys -from typing import Dict, Any # Add the project root to the Python path -project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +project_root = os.path.abspath( + os.path.join(os.path.dirname(__file__), "..", "..") +) sys.path.insert(0, project_root) from dotenv import load_dotenv @@ -20,11 +21,6 @@ from dotenv import load_dotenv # Import everything from the single file from swarms.structs.hierarchical_structured_communication_framework import ( 
HierarchicalStructuredCommunicationFramework, - HierarchicalStructuredCommunicationGenerator, - HierarchicalStructuredCommunicationEvaluator, - HierarchicalStructuredCommunicationRefiner, - HierarchicalStructuredCommunicationSupervisor, - # Convenience aliases TalkHierarchicalGenerator, TalkHierarchicalEvaluator, TalkHierarchicalRefiner, @@ -42,29 +38,29 @@ def example_basic_usage(): print("=" * 80) print("BASIC USAGE EXAMPLE") print("=" * 80) - + # Create framework with default configuration framework = HierarchicalStructuredCommunicationFramework( - name="BasicFramework", - max_loops=2, - verbose=True + name="BasicFramework", max_loops=2, verbose=True ) - + # Run a simple task task = "Explain the benefits of structured communication in multi-agent systems" - + print(f"Task: {task}") print("Running framework...") - + result = framework.run(task) - + print("\n" + "=" * 50) print("FINAL RESULT") print("=" * 50) print(result["final_result"]) - + print(f"\nTotal loops: {result['total_loops']}") - print(f"Conversation history entries: {len(result['conversation_history'])}") + print( + f"Conversation history entries: {len(result['conversation_history'])}" + ) print(f"Evaluation results: {len(result['evaluation_results'])}") @@ -75,40 +71,40 @@ def example_custom_agents(): print("\n" + "=" * 80) print("CUSTOM AGENTS EXAMPLE") print("=" * 80) - + # Create custom agents using the convenience aliases generator = TalkHierarchicalGenerator( agent_name="ContentCreator", model_name="gpt-4o-mini", - verbose=True + verbose=True, ) - + evaluator1 = TalkHierarchicalEvaluator( agent_name="AccuracyChecker", evaluation_criteria=["accuracy", "technical_correctness"], model_name="gpt-4o-mini", - verbose=True + verbose=True, ) - + evaluator2 = TalkHierarchicalEvaluator( agent_name="ClarityChecker", evaluation_criteria=["clarity", "readability", "coherence"], model_name="gpt-4o-mini", - verbose=True + verbose=True, ) - + refiner = TalkHierarchicalRefiner( agent_name="ContentImprover", 
model_name="gpt-4o-mini", - verbose=True + verbose=True, ) - + supervisor = TalkHierarchicalSupervisor( agent_name="WorkflowManager", model_name="gpt-4o-mini", - verbose=True + verbose=True, ) - + # Create framework with custom agents framework = HierarchicalStructuredCommunicationFramework( name="CustomFramework", @@ -117,24 +113,26 @@ def example_custom_agents(): evaluators=[evaluator1, evaluator2], refiners=[refiner], max_loops=3, - verbose=True + verbose=True, ) - + # Run a complex task task = "Design a comprehensive machine learning pipeline for sentiment analysis" - + print(f"Task: {task}") print("Running framework with custom agents...") - + result = framework.run(task) - + print("\n" + "=" * 50) print("FINAL RESULT") print("=" * 50) print(result["final_result"]) - + print(f"\nTotal loops: {result['total_loops']}") - print(f"Conversation history entries: {len(result['conversation_history'])}") + print( + f"Conversation history entries: {len(result['conversation_history'])}" + ) print(f"Evaluation results: {len(result['evaluation_results'])}") @@ -145,7 +143,7 @@ def example_ollama_integration(): print("\n" + "=" * 80) print("OLLAMA INTEGRATION EXAMPLE") print("=" * 80) - + # Create framework with Ollama configuration framework = HierarchicalStructuredCommunicationFramework( name="OllamaFramework", @@ -154,27 +152,31 @@ def example_ollama_integration(): model_name="llama3:latest", use_ollama=True, ollama_base_url="http://localhost:11434/v1", - ollama_api_key="ollama" + ollama_api_key="ollama", ) - + # Run a task with local model task = "Explain the concept of structured communication protocols" - + print(f"Task: {task}") print("Running framework with Ollama...") - + try: result = framework.run(task) - + print("\n" + "=" * 50) print("FINAL RESULT") print("=" * 50) print(result["final_result"]) - + print(f"\nTotal loops: {result['total_loops']}") - print(f"Conversation history entries: {len(result['conversation_history'])}") - print(f"Evaluation results: 
{len(result['evaluation_results'])}") - + print( + f"Conversation history entries: {len(result['conversation_history'])}" + ) + print( + f"Evaluation results: {len(result['evaluation_results'])}" + ) + except Exception as e: print(f"Error with Ollama: {e}") print("Make sure Ollama is running: ollama serve") @@ -187,28 +189,31 @@ def example_structured_communication(): print("\n" + "=" * 80) print("STRUCTURED COMMUNICATION EXAMPLE") print("=" * 80) - + # Create framework framework = HierarchicalStructuredCommunicationFramework( - name="CommunicationDemo", - verbose=True + name="CommunicationDemo", verbose=True ) - + # Demonstrate structured message sending print("Sending structured message...") - + structured_msg = framework.send_structured_message( sender="Supervisor", recipient="Generator", message="Create a technical documentation outline", background="For a Python library focused on data processing", - intermediate_output="Previous research on similar libraries" + intermediate_output="Previous research on similar libraries", ) - + print(f"Message sent: {structured_msg.message}") print(f"Background: {structured_msg.background}") - print(f"Intermediate output: {structured_msg.intermediate_output}") - print(f"From: {structured_msg.sender} -> To: {structured_msg.recipient}") + print( + f"Intermediate output: {structured_msg.intermediate_output}" + ) + print( + f"From: {structured_msg.sender} -> To: {structured_msg.recipient}" + ) def example_agent_interaction(): @@ -218,52 +223,51 @@ def example_agent_interaction(): print("\n" + "=" * 80) print("AGENT INTERACTION EXAMPLE") print("=" * 80) - + # Create agents generator = TalkHierarchicalGenerator( - agent_name="ContentGenerator", - verbose=True + agent_name="ContentGenerator", verbose=True ) - + evaluator = TalkHierarchicalEvaluator( agent_name="QualityEvaluator", evaluation_criteria=["accuracy", "clarity"], - verbose=True + verbose=True, ) - + refiner = TalkHierarchicalRefiner( - agent_name="ContentRefiner", - 
verbose=True + agent_name="ContentRefiner", verbose=True ) - + # Generate content print("1. Generating content...") gen_result = generator.generate_with_structure( message="Create a brief explanation of machine learning", background="For beginners with no technical background", - intermediate_output="" + intermediate_output="", ) - + print(f"Generated content: {gen_result.content[:200]}...") - + # Evaluate content print("\n2. Evaluating content...") eval_result = evaluator.evaluate_with_criterion( - content=gen_result.content, - criterion="clarity" + content=gen_result.content, criterion="clarity" ) - + print(f"Evaluation score: {eval_result.score}/10") print(f"Feedback: {eval_result.feedback[:200]}...") - + # Refine content print("\n3. Refining content...") refine_result = refiner.refine_with_feedback( original_content=gen_result.content, - evaluation_results=[eval_result] + evaluation_results=[eval_result], + ) + + print( + f"Refined content: {refine_result.refined_content[:200]}..." ) - - print(f"Refined content: {refine_result.refined_content[:200]}...") print(f"Changes made: {refine_result.changes_made}") @@ -271,12 +275,16 @@ def main(): """ Main function to run all examples """ - print("SINGLE-FILE HIERARCHICAL STRUCTURED COMMUNICATION FRAMEWORK") + print( + "SINGLE-FILE HIERARCHICAL STRUCTURED COMMUNICATION FRAMEWORK" + ) print("=" * 80) - print("This demonstrates the consolidated single-file implementation") + print( + "This demonstrates the consolidated single-file implementation" + ) print("based on the research paper: arXiv:2502.11098") print("=" * 80) - + try: # Run examples example_basic_usage() @@ -284,27 +292,30 @@ def main(): example_ollama_integration() example_structured_communication() example_agent_interaction() - + print("\n" + "=" * 80) print("ALL EXAMPLES COMPLETED SUCCESSFULLY!") print("=" * 80) print("Framework Features Demonstrated:") print("✓ Single-file implementation") - print("✓ Structured Communication Protocol (M_ij, B_ij, I_ij)") + 
print( + "✓ Structured Communication Protocol (M_ij, B_ij, I_ij)" + ) print("✓ Hierarchical Evaluation System") print("✓ Iterative Refinement Process") print("✓ Flexible Model Configuration (OpenAI/Ollama)") print("✓ Custom Agent Specialization") print("✓ Direct Agent Interaction") print("✓ Convenience Aliases") - + except KeyboardInterrupt: print("\nInterrupted by user") except Exception as e: print(f"Error during execution: {e}") import traceback + traceback.print_exc() if __name__ == "__main__": - main() + main() diff --git a/examples/utils/agent_loader/claude_code_compatible.py b/examples/utils/agent_loader/claude_code_compatible.py new file mode 100644 index 00000000..ae05825f --- /dev/null +++ b/examples/utils/agent_loader/claude_code_compatible.py @@ -0,0 +1,9 @@ +from swarms import load_agents_from_markdown + +agents = load_agents_from_markdown(["finance_advisor.md"]) + +# Use the agent +response = agents[0].run( + "I have $100k to invest. I want to hedge my bets on the energy companies that will benefit from the AI revolution. " + "What are the top 4 stocks to invest in?" 
+) diff --git a/examples/utils/agent_loader/finance_advisor.md b/examples/utils/agent_loader/finance_advisor.md index 62c32e51..ff0ab41f 100644 --- a/examples/utils/agent_loader/finance_advisor.md +++ b/examples/utils/agent_loader/finance_advisor.md @@ -1,7 +1,7 @@ --- name: FinanceAdvisor description: Expert financial advisor for investment and budgeting guidance -model_name: gpt-4o +model_name: claude-sonnet-4-20250514 temperature: 0.7 max_loops: 1 --- diff --git a/pyproject.toml b/pyproject.toml index 9d57a8f8..a983c4dd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "8.1.1" +version = "8.1.2" description = "Swarms - TGSC" license = "MIT" authors = ["Kye Gomez "] diff --git a/requirements.txt b/requirements.txt index e873cffb..74380a53 100644 --- a/requirements.txt +++ b/requirements.txt @@ -28,4 +28,3 @@ mcp numpy openai schedule -colorama diff --git a/swarms/cli/main.py b/swarms/cli/main.py index afa09270..fabf5fdd 100644 --- a/swarms/cli/main.py +++ b/swarms/cli/main.py @@ -20,8 +20,11 @@ from swarms.agents.create_agents_from_yaml import ( ) from swarms.cli.onboarding_process import OnboardingProcess from swarms.structs.agent import Agent -from swarms.utils.agent_loader import AgentLoader +from swarms.structs.agent_loader import AgentLoader from swarms.utils.formatter import formatter +from dotenv import load_dotenv + +load_dotenv() # Initialize console with custom styling console = Console() @@ -45,12 +48,14 @@ COLORS = { } ASCII_ART = r""" - _________ - / _____/_ _ _______ _______ _____ ______ - \_____ \\ \/ \/ /\__ \\_ __ \/ \ / ___/ - / \\ / / __ \| | \/ Y Y \\___ \ -/_______ / \/\_/ (____ /__| |__|_| /____ > - \/ \/ \/ \/ + █████████ █████ ███ █████ █████████ ███████████ ██████ ██████ █████████ + ███░░░░░███░░███ ░███ ░░███ ███░░░░░███ ░░███░░░░░███ ░░██████ ██████ ███░░░░░███ +░███ ░░░ ░███ ░███ ░███ ░███ ░███ ░███ ░███ ░███░█████░███ ░███ ░░░ 
+░░█████████ ░███ ░███ ░███ ░███████████ ░██████████ ░███░░███ ░███ ░░█████████ + ░░░░░░░░███ ░░███ █████ ███ ░███░░░░░███ ░███░░░░░███ ░███ ░░░ ░███ ░░░░░░░░███ + ███ ░███ ░░░█████░█████░ ░███ ░███ ░███ ░███ ░███ ░███ ███ ░███ +░░█████████ ░░███ ░░███ █████ █████ █████ █████ █████ █████░░█████████ + ░░░░░░░░░ ░░░ ░░░ ░░░░░ ░░░░░ ░░░░░ ░░░░░ ░░░░░ ░░░░░ ░░░░░░░░░ """ @@ -68,7 +73,7 @@ def show_ascii_art(): panel = Panel( Text(ASCII_ART, style=f"bold {COLORS['primary']}"), border_style=COLORS["secondary"], - title="[bold]Welcome to Swarms CLI[/bold]", + title="[bold]Welcome to the Swarms CLI[/bold]", subtitle="[dim]swarms.ai[/dim]", ) console.print(panel) @@ -395,7 +400,14 @@ def check_python_version() -> tuple[bool, str, str]: def check_api_keys() -> tuple[bool, str, str]: - """Check if common API keys are set.""" + """ + Check if at least one common API key is set in the environment variables. + + Returns: + tuple: (True, "✓", message) if at least one API key is set, + (False, "✗", message) otherwise. 
+ """ + api_keys = { "OPENAI_API_KEY": os.getenv("OPENAI_API_KEY"), "ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY"), @@ -403,9 +415,16 @@ def check_api_keys() -> tuple[bool, str, str]: "COHERE_API_KEY": os.getenv("COHERE_API_KEY"), } - set_keys = [key for key, value in api_keys.items() if value] - if set_keys: - return True, "✓", f"API keys found: {', '.join(set_keys)}" + # At least one key must be present and non-empty + if any(value for value in api_keys.values()): + present_keys = [ + key for key, value in api_keys.items() if value + ] + return ( + True, + "✓", + f"At least one API key found: {', '.join(present_keys)}", + ) else: return ( False, diff --git a/swarms/schemas/swarms_api_schemas.py b/swarms/schemas/swarms_api_schemas.py index 876eff40..f1370dce 100644 --- a/swarms/schemas/swarms_api_schemas.py +++ b/swarms/schemas/swarms_api_schemas.py @@ -14,7 +14,6 @@ SwarmType = Literal[ "auto", "MajorityVoting", "MALT", - "DeepResearchSwarm", "CouncilAsAJudge", "InteractiveGroupChat", ] diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index 04a83dd8..daa17479 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -1,18 +1,15 @@ from swarms.structs.agent import Agent from swarms.structs.agent_builder import AgentsBuilder +from swarms.structs.agent_rearrange import AgentRearrange, rearrange from swarms.structs.auto_swarm_builder import AutoSwarmBuilder from swarms.structs.base_structure import BaseStructure from swarms.structs.base_swarm import BaseSwarm from swarms.structs.batch_agent_execution import batch_agent_execution -from swarms.structs.board_of_directors_swarm import ( - BoardOfDirectorsSwarm, -) from swarms.structs.concurrent_workflow import ConcurrentWorkflow from swarms.structs.conversation import Conversation from swarms.structs.council_as_judge import CouncilAsAJudge from swarms.structs.cron_job import CronJob from swarms.structs.de_hallucination_swarm import DeHallucinationSwarm -from 
swarms.structs.deep_research_swarm import DeepResearchSwarm from swarms.structs.graph_workflow import ( Edge, GraphWorkflow, @@ -24,8 +21,8 @@ from swarms.structs.groupchat import ( expertise_based, ) from swarms.structs.heavy_swarm import HeavySwarm -from swarms.structs.hierarchical_swarm import HierarchicalSwarm -from swarms.structs.hybrid_hierarchical_peer_swarm import ( +from swarms.structs.hiearchical_swarm import HierarchicalSwarm +from swarms.structs.hybrid_hiearchical_peer_swarm import ( HybridHierarchicalClusterSwarm, ) from swarms.structs.interactive_groupchat import ( @@ -66,7 +63,6 @@ from swarms.structs.multi_agent_exec import ( run_single_agent, ) from swarms.structs.multi_agent_router import MultiAgentRouter -from swarms.structs.rearrange import AgentRearrange, rearrange from swarms.structs.round_robin import RoundRobinSwarm from swarms.structs.sequential_workflow import SequentialWorkflow from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm @@ -82,7 +78,7 @@ from swarms.structs.stopping_conditions import ( check_stopped, check_success, ) -from swarms.structs.swarm_arange import SwarmRearrange +from swarms.structs.swarm_rearrange import SwarmRearrange from swarms.structs.swarm_router import ( SwarmRouter, SwarmType, @@ -107,32 +103,11 @@ from swarms.structs.swarming_architectures import ( staircase_swarm, star_swarm, ) -from swarms.structs.hierarchical_structured_communication_framework import ( - HierarchicalStructuredCommunicationFramework, - HierarchicalStructuredCommunicationGenerator, - HierarchicalStructuredCommunicationEvaluator, - HierarchicalStructuredCommunicationRefiner, - HierarchicalStructuredCommunicationSupervisor, - StructuredMessage, - HierarchicalOrder, - EvaluationResult, - StructuredMessageSchema, - EvaluationResultSchema, - GeneratorResponseSchema, - EvaluatorResponseSchema, - RefinerResponseSchema, - CommunicationType, - AgentRole, -) - -# Convenience alias(fixes old code if any was left out in the wild) 
-HierarchicalStructuredCommunicationSwarm = HierarchicalStructuredCommunicationFramework __all__ = [ "Agent", "BaseStructure", "BaseSwarm", - "BoardOfDirectorsSwarm", "ConcurrentWorkflow", "Conversation", "GroupChat", @@ -188,7 +163,6 @@ __all__ = [ "AgentsBuilder", "MALT", "DeHallucinationSwarm", - "DeepResearchSwarm", "HybridHierarchicalClusterSwarm", "get_agents_info", "get_swarms_info", @@ -206,22 +180,6 @@ __all__ = [ "HierarchicalSwarm", "HeavySwarm", "CronJob", - "HierarchicalStructuredCommunicationSwarm", - "HierarchicalStructuredCommunicationGenerator", - "HierarchicalStructuredCommunicationEvaluator", - "HierarchicalStructuredCommunicationRefiner", - "HierarchicalStructuredCommunicationSupervisor", - "StructuredMessage", - "HierarchicalOrder", - "EvaluationResult", - "StructuredMessageSchema", - "EvaluationResultSchema", - "GeneratorResponseSchema", - "EvaluatorResponseSchema", - "RefinerResponseSchema", - "CommunicationType", - "AgentRole", - # Stopping conditions "check_done", "check_finished", "check_complete", diff --git a/swarms/structs/agent_loader.py b/swarms/structs/agent_loader.py new file mode 100644 index 00000000..fe674fd0 --- /dev/null +++ b/swarms/structs/agent_loader.py @@ -0,0 +1,208 @@ +import os +from typing import List, Union + +from swarms.agents.create_agents_from_yaml import ( + ReturnTypes, + create_agents_from_yaml, +) +from swarms.structs.agent import Agent +from swarms.structs.csv_to_agent import AgentLoader as CSVAgentLoader +from swarms.utils.agent_loader_markdown import ( + load_agent_from_markdown, + load_agents_from_markdown, + AgentLoader as MarkdownAgentLoader, +) + + +class AgentLoader: + """ + Loader class for creating Agent objects from various file formats. + + This class provides methods to load agents from Markdown, YAML, and CSV files. + """ + + def __init__(self): + """ + Initialize the AgentLoader instance. 
+ """ + pass + + def load_agents_from_markdown( + self, + file_paths: Union[str, List[str]], + concurrent: bool = True, + max_file_size_mb: float = 10.0, + **kwargs, + ) -> List[Agent]: + """ + Load multiple agents from one or more Markdown files. + + Args: + file_paths (Union[str, List[str]]): Path or list of paths to Markdown file(s) containing agent definitions. + concurrent (bool, optional): Whether to load files concurrently. Defaults to True. + max_file_size_mb (float, optional): Maximum file size in MB to process. Defaults to 10.0. + **kwargs: Additional keyword arguments passed to the underlying loader. + + Returns: + List[Agent]: A list of loaded Agent objects. + """ + return load_agents_from_markdown( + file_paths=file_paths, + concurrent=concurrent, + max_file_size_mb=max_file_size_mb, + **kwargs, + ) + + def load_agent_from_markdown( + self, file_path: str, **kwargs + ) -> Agent: + """ + Load a single agent from a Markdown file. + + Args: + file_path (str): Path to the Markdown file containing the agent definition. + **kwargs: Additional keyword arguments passed to the underlying loader. + + Returns: + Agent: The loaded Agent object. + """ + return load_agent_from_markdown(file_path=file_path, **kwargs) + + def load_agents_from_yaml( + self, + yaml_file: str, + return_type: ReturnTypes = "auto", + **kwargs, + ) -> List[Agent]: + """ + Load agents from a YAML file. + + Args: + yaml_file (str): Path to the YAML file containing agent definitions. + return_type (ReturnTypes, optional): The return type for the loader. Defaults to "auto". + **kwargs: Additional keyword arguments passed to the underlying loader. + + Returns: + List[Agent]: A list of loaded Agent objects. + """ + return create_agents_from_yaml( + yaml_file=yaml_file, return_type=return_type, **kwargs + ) + + def load_many_agents_from_yaml( + self, + yaml_files: List[str], + return_types: List[ReturnTypes] = ["auto"], + **kwargs, + ) -> List[Agent]: + """ + Load agents from multiple YAML files. 
+ + Args: + yaml_files (List[str]): List of YAML file paths containing agent definitions. + return_types (List[ReturnTypes], optional): List of return types for each YAML file. Defaults to ["auto"]. + **kwargs: Additional keyword arguments passed to the underlying loader. + + Returns: + List[Agent]: A list of loaded Agent objects from all files. + """ + return [ + self.load_agents_from_yaml( + yaml_file=yaml_file, + return_type=(return_types[i] if i < len(return_types) else "auto"), + **kwargs, + ) + for i, yaml_file in enumerate(yaml_files) + ] + + def load_agents_from_csv( + self, csv_file: str, **kwargs + ) -> List[Agent]: + """ + Load agents from a CSV file. + + Args: + csv_file (str): Path to the CSV file containing agent definitions. + **kwargs: Additional keyword arguments passed to the underlying loader. + + Returns: + List[Agent]: A list of loaded Agent objects. + """ + loader = CSVAgentLoader(file_path=csv_file) + return loader.load_agents() + + def auto(self, file_path: str, *args, **kwargs): + """ + Automatically load agents from a file based on its extension. + + Args: + file_path (str): Path to the agent file (Markdown, YAML, or CSV). + *args: Additional positional arguments passed to the underlying loader. + **kwargs: Additional keyword arguments passed to the underlying loader. + + Returns: + List[Agent]: A list of loaded Agent objects. + + Raises: + ValueError: If the file type is not supported. + """ + if file_path.endswith(".md"): + return self.load_agents_from_markdown( + file_path, *args, **kwargs + ) + elif file_path.endswith(".yaml"): + return self.load_agents_from_yaml( + file_path, *args, **kwargs + ) + elif file_path.endswith(".csv"): + return self.load_agents_from_csv( + file_path, *args, **kwargs + ) + else: + raise ValueError(f"Unsupported file type: {file_path}") + + def load_single_agent(self, *args, **kwargs): + """ + Load a single agent from a file of a supported type. + + Args: + *args: Positional arguments passed to the underlying loader. 
+ **kwargs: Keyword arguments passed to the underlying loader. + + Returns: + Agent: The loaded Agent object. + """ + return self.auto(*args, **kwargs) + + def load_multiple_agents( + self, file_paths: List[str], *args, **kwargs + ): + """ + Load multiple agents from a list of files of various supported types. + + Args: + file_paths (List[str]): List of file paths to agent files (Markdown, YAML, or CSV). + *args: Additional positional arguments passed to the underlying loader. + **kwargs: Additional keyword arguments passed to the underlying loader. + + Returns: + List[Agent]: A list of loaded Agent objects from all files. + """ + return [ + self.auto(file_path, *args, **kwargs) + for file_path in file_paths + ] + + def parse_markdown_file(self, file_path: str): + """ + Parse a Markdown file and return the agents defined within. + + Args: + file_path (str): Path to the Markdown file. + + Returns: + List[Agent]: A list of Agent objects parsed from the file. + """ + return MarkdownAgentLoader( + max_workers=os.cpu_count() + ).parse_markdown_file(file_path=file_path) diff --git a/swarms/structs/auto_swarm_builder.py b/swarms/structs/auto_swarm_builder.py index a30398ca..3b900419 100644 --- a/swarms/structs/auto_swarm_builder.py +++ b/swarms/structs/auto_swarm_builder.py @@ -85,7 +85,6 @@ Choose the most appropriate architecture based on task requirements: - **HiearchicalSwarm**: Layered decision-making with management and execution tiers - **MajorityVoting**: Democratic decision-making with voting mechanisms - **MALT**: Multi-agent learning and training with knowledge sharing -- **DeepResearchSwarm**: Comprehensive research with multiple specialized investigators - **CouncilAsAJudge**: Deliberative decision-making with expert panels - **InteractiveGroupChat**: Dynamic group interactions with real-time collaboration - **HeavySwarm**: High-capacity processing with multiple specialized agents diff --git a/swarms/structs/board_of_directors_swarm.py 
b/swarms/structs/board_of_directors_swarm.py index 7dbf0d34..e16b7d7c 100644 --- a/swarms/structs/board_of_directors_swarm.py +++ b/swarms/structs/board_of_directors_swarm.py @@ -19,7 +19,6 @@ Flow: 6. All context and conversation history is preserved throughout the process """ -import asyncio import json import os import re @@ -34,7 +33,6 @@ from loguru import logger from pydantic import BaseModel, Field from swarms.structs.agent import Agent -from swarms.structs.base_swarm import BaseSwarm from swarms.structs.conversation import Conversation from swarms.structs.ma_utils import list_all_agents from swarms.utils.history_output_formatter import ( @@ -54,23 +52,6 @@ board_logger = initialize_logger( # ============================================================================ -class BoardFeatureStatus(str, Enum): - """Enumeration of Board of Directors feature status. - - This enum defines the possible states of the Board of Directors feature - within the Swarms Framework. - - Attributes: - ENABLED: Feature is explicitly enabled - DISABLED: Feature is explicitly disabled - AUTO: Feature state is determined automatically - """ - - ENABLED = "enabled" - DISABLED = "disabled" - AUTO = "auto" - - class BoardConfigModel(BaseModel): """ Configuration model for Board of Directors feature. 
@@ -91,12 +72,6 @@ class BoardConfigModel(BaseModel): custom_board_templates: Custom board templates for different use cases """ - # Feature control - board_feature_enabled: bool = Field( - default=False, - description="Whether the Board of Directors feature is enabled globally.", - ) - # Board composition default_board_size: int = Field( default=3, @@ -201,9 +176,6 @@ class BoardConfig: ): self._load_from_file() - # Override with environment variables - self._load_from_environment() - # Override with explicit config data if self.config_data: self._load_from_dict(self.config_data) @@ -236,62 +208,6 @@ class BoardConfig: ) raise - def _load_from_environment(self) -> None: - """ - Load configuration from environment variables. - - This method maps environment variables to configuration parameters - and handles type conversion appropriately. - """ - env_mappings = { - "SWARMS_BOARD_FEATURE_ENABLED": "board_feature_enabled", - "SWARMS_BOARD_DEFAULT_SIZE": "default_board_size", - "SWARMS_BOARD_DECISION_THRESHOLD": "decision_threshold", - "SWARMS_BOARD_ENABLE_VOTING": "enable_voting", - "SWARMS_BOARD_ENABLE_CONSENSUS": "enable_consensus", - "SWARMS_BOARD_DEFAULT_MODEL": "default_board_model", - "SWARMS_BOARD_VERBOSE_LOGGING": "verbose_logging", - "SWARMS_BOARD_MAX_MEETING_DURATION": "max_board_meeting_duration", - "SWARMS_BOARD_AUTO_FALLBACK": "auto_fallback_to_director", - } - - for env_var, config_key in env_mappings.items(): - value = os.getenv(env_var) - if value is not None: - try: - # Convert string values to appropriate types - if config_key in [ - "board_feature_enabled", - "enable_voting", - "enable_consensus", - "verbose_logging", - "auto_fallback_to_director", - ]: - converted_value = value.lower() in [ - "true", - "1", - "yes", - "on", - ] - elif config_key in [ - "default_board_size", - "max_board_meeting_duration", - ]: - converted_value = int(value) - elif config_key in ["decision_threshold"]: - converted_value = float(value) - else: - converted_value = 
value - - setattr(self.config, config_key, converted_value) - logger.debug( - f"Loaded {config_key} from environment: {converted_value}" - ) - except (ValueError, TypeError) as e: - logger.warning( - f"Failed to parse environment variable {env_var}: {e}" - ) - def _load_from_dict(self, config_dict: Dict[str, Any]) -> None: """ Load configuration from dictionary. @@ -312,15 +228,6 @@ class BoardConfig: f"Invalid configuration value for {key}: {e}" ) - def is_enabled(self) -> bool: - """ - Check if the Board of Directors feature is enabled. - - Returns: - bool: True if the feature is enabled, False otherwise - """ - return self.config.board_feature_enabled - def get_config(self) -> BoardConfigModel: """ Get the current configuration. @@ -562,64 +469,6 @@ def get_board_config( return _board_config -def enable_board_feature( - config_file_path: Optional[str] = None, -) -> None: - """ - Enable the Board of Directors feature globally. - - This function enables the Board of Directors feature and saves the configuration - to the specified file path. - - Args: - config_file_path: Optional path to save the configuration - """ - config = get_board_config(config_file_path) - config.update_config({"board_feature_enabled": True}) - - if config_file_path: - config.save_config(config_file_path) - - logger.info("Board of Directors feature enabled") - - -def disable_board_feature( - config_file_path: Optional[str] = None, -) -> None: - """ - Disable the Board of Directors feature globally. - - This function disables the Board of Directors feature and saves the configuration - to the specified file path. 
- - Args: - config_file_path: Optional path to save the configuration - """ - config = get_board_config(config_file_path) - config.update_config({"board_feature_enabled": False}) - - if config_file_path: - config.save_config(config_file_path) - - logger.info("Board of Directors feature disabled") - - -def is_board_feature_enabled( - config_file_path: Optional[str] = None, -) -> bool: - """ - Check if the Board of Directors feature is enabled. - - Args: - config_file_path: Optional path to configuration file - - Returns: - bool: True if the feature is enabled, False otherwise - """ - config = get_board_config(config_file_path) - return config.is_enabled() - - def create_default_config_file( file_path: str = "swarms_board_config.yaml", ) -> None: @@ -953,7 +802,7 @@ class BoardSpec(BaseModel): ) -class BoardOfDirectorsSwarm(BaseSwarm): +class BoardOfDirectorsSwarm: """ A hierarchical swarm of agents with a Board of Directors that orchestrates tasks. @@ -1029,13 +878,8 @@ class BoardOfDirectorsSwarm(BaseSwarm): Raises: ValueError: If critical requirements are not met during initialization """ - super().__init__( - name=name, - description=description, - agents=agents, - ) - self.name = name + self.description = description self.board_members = board_members or [] self.agents = agents or [] self.max_loops = max_loops @@ -1047,9 +891,8 @@ class BoardOfDirectorsSwarm(BaseSwarm): self.decision_threshold = decision_threshold self.enable_voting = enable_voting self.enable_consensus = enable_consensus - self.max_workers = max_workers or min( - 32, (os.cpu_count() or 1) + 4 - ) + self.max_workers = max_workers + self.max_workers = self.max_workers or os.cpu_count() # Initialize the swarm self._init_board_swarm() @@ -1258,14 +1101,6 @@You should be thorough, organized, and detail-oriented in your documentation.""" f"🔍 Running reliability checks for swarm: {self.name}" ) - # Check if Board of Directors feature is enabled - board_config = get_board_config() - if not board_config.is_enabled(): -
raise ValueError( - "Board of Directors feature is not enabled. Please enable it using " - "enable_board_feature() or set SWARMS_BOARD_FEATURE_ENABLED=true environment variable." - ) - if not self.agents or len(self.agents) == 0: raise ValueError( "No agents found in the swarm. At least one agent must be provided to create a Board of Directors swarm." @@ -1687,34 +1522,6 @@ Please provide your response in the following format: board_logger.error(error_msg) raise - async def arun( - self, - task: str, - img: Optional[str] = None, - *args: Any, - **kwargs: Any, - ) -> Any: - """ - Run the Board of Directors swarm asynchronously. - - This method provides an asynchronous interface for running the swarm, - allowing for non-blocking execution in async contexts. - - Args: - task: The task to be executed - img: Optional image input - *args: Additional positional arguments - **kwargs: Additional keyword arguments - - Returns: - Any: The final result of the swarm execution - """ - loop = asyncio.get_event_loop() - result = await loop.run_in_executor( - None, self.run, task, img, *args, **kwargs - ) - return result - def _generate_board_feedback(self, outputs: List[Any]) -> str: """ Provide feedback from the Board of Directors based on agent outputs. 
diff --git a/swarms/structs/csv_to_agent.py b/swarms/structs/csv_to_agent.py index c5f7f355..9d89bb2a 100644 --- a/swarms/structs/csv_to_agent.py +++ b/swarms/structs/csv_to_agent.py @@ -1,23 +1,25 @@ +import concurrent.futures +import csv +import json +from dataclasses import dataclass +from enum import Enum +from pathlib import Path from typing import ( - List, + Any, Dict, + List, TypedDict, - Any, - Union, TypeVar, + Union, ) -from dataclasses import dataclass -import csv -import json + import yaml -from pathlib import Path -from enum import Enum -from swarms.structs.agent import Agent -from swarms.schemas.swarms_api_schemas import AgentSpec from litellm import model_list -import concurrent.futures from tqdm import tqdm +from swarms.schemas.swarms_api_schemas import AgentSpec +from swarms.structs.agent import Agent + # Type variable for agent configuration AgentConfigType = TypeVar( "AgentConfigType", bound=Union[AgentSpec, Dict[str, Any]] diff --git a/swarms/structs/deep_research_swarm.py b/swarms/structs/deep_research_swarm.py deleted file mode 100644 index b71e81c1..00000000 --- a/swarms/structs/deep_research_swarm.py +++ /dev/null @@ -1,479 +0,0 @@ -import concurrent.futures -import json -import os -from typing import Any, List - -from dotenv import load_dotenv -from rich.console import Console -import requests - -from swarms.structs.agent import Agent -from swarms.structs.conversation import Conversation -from swarms.utils.formatter import formatter -from swarms.utils.history_output_formatter import ( - history_output_formatter, -) -from swarms.utils.str_to_dict import str_to_dict - -console = Console() -load_dotenv() - -# Number of worker threads for concurrent operations -MAX_WORKERS = ( - os.cpu_count() * 2 -) # Optimal number of workers based on CPU cores - - -def exa_search(query: str, **kwargs: Any) -> str: - """Performs web search using Exa.ai API and returns formatted results.""" - api_url = "https://api.exa.ai/search" - api_key = 
os.getenv("EXA_API_KEY") - - if not api_key: - return "### Error\nEXA_API_KEY environment variable not set\n" - - headers = { - "x-api-key": api_key, - "Content-Type": "application/json", - } - - safe_kwargs = { - str(k): v - for k, v in kwargs.items() - if k is not None and v is not None and str(k) != "None" - } - - payload = { - "query": query, - "useAutoprompt": True, - "numResults": safe_kwargs.get("num_results", 10), - "contents": { - "text": True, - "highlights": {"numSentences": 10}, - }, - } - - for key, value in safe_kwargs.items(): - if key not in payload and key not in [ - "query", - "useAutoprompt", - "numResults", - "contents", - ]: - payload[key] = value - - try: - response = requests.post( - api_url, json=payload, headers=headers - ) - if response.status_code != 200: - return f"### Error\nHTTP {response.status_code}: {response.text}\n" - json_data = response.json() - except Exception as e: - return f"### Error\n{str(e)}\n" - - if "error" in json_data: - return f"### Error\n{json_data['error']}\n" - - formatted_text = [] - search_params = json_data.get("effectiveFilters", {}) - query = search_params.get("query", "General web search") - formatted_text.append( - f"### Exa Search Results for: '{query}'\n\n---\n" - ) - - results = json_data.get("results", []) - if not results: - formatted_text.append("No results found.\n") - return "".join(formatted_text) - - for i, result in enumerate(results, 1): - title = result.get("title", "No title") - url = result.get("url", result.get("id", "No URL")) - published_date = result.get("publishedDate", "") - highlights = result.get("highlights", []) - highlight_text = ( - "\n".join( - ( - h.get("text", str(h)) - if isinstance(h, dict) - else str(h) - ) - for h in highlights[:3] - ) - if highlights - else "No summary available" - ) - - formatted_text.extend( - [ - f"{i}. 
**{title}**\n", - f" - URL: {url}\n", - f" - Published: {published_date.split('T')[0] if published_date else 'Date unknown'}\n", - f" - Key Points:\n {highlight_text}\n\n", - ] - ) - - return "".join(formatted_text) - - -# Define the research tools schema -tools = [ - { - "type": "function", - "function": { - "name": "search_topic", - "description": "Conduct a thorough search on a specified topic or subtopic, generating a precise array of highly detailed search queries tailored to the input parameters.", - "parameters": { - "type": "object", - "properties": { - "depth": { - "type": "integer", - "description": "Indicates the level of thoroughness for the search. Values range from 1 to 3, where 1 signifies a superficial search and 3 indicates an in-depth exploration of the topic.", - }, - "detailed_queries": { - "type": "array", - "description": "An array of specific search queries generated based on the input query and the specified depth. Each query must be crafted to elicit detailed and relevant information from various sources.", - "items": { - "type": "string", - "description": "Each item in this array must represent a unique search query targeting a specific aspect of the main topic, ensuring a comprehensive exploration of the subject matter.", - }, - }, - }, - "required": ["depth", "detailed_queries"], - }, - }, - }, -] - -RESEARCH_AGENT_PROMPT = """ -You are an advanced research agent specialized in conducting deep, comprehensive research across multiple domains. -Your task is to: - -1. Break down complex topics into searchable subtopics -2. Generate diverse search queries to explore each subtopic thoroughly -3. Identify connections and patterns across different areas of research -4. Synthesize findings into coherent insights -5. 
Identify gaps in current knowledge and suggest areas for further investigation - -For each research task: -- Consider multiple perspectives and approaches -- Look for both supporting and contradicting evidence -- Evaluate the credibility and relevance of sources -- Track emerging trends and recent developments -- Consider cross-disciplinary implications - -Output Format: -- Provide structured research plans -- Include specific search queries for each subtopic -- Prioritize queries based on relevance and potential impact -- Suggest follow-up areas for deeper investigation -""" - -SUMMARIZATION_AGENT_PROMPT = """ -You are an expert information synthesis and summarization agent designed for producing clear, accurate, and insightful summaries of complex information. Your core capabilities include: - - -Core Capabilities: -- Identify and extract key concepts, themes, and insights from any given content -- Recognize patterns, relationships, and hierarchies within information -- Filter out noise while preserving crucial context and nuance -- Handle multiple sources and perspectives simultaneously - -Summarization Strategy -1. Multi-level Structure - - Provide an extensive summary - - Follow with key findings - - Include detailed insights with supporting evidence - - End with implications or next steps when relevant - -2. Quality Standards - - Maintain factual accuracy and precision - - Preserve important technical details and terminology - - Avoid oversimplification of complex concepts - - Include quantitative data when available - - Cite or reference specific sources when summarizing claims - -3. Clarity & Accessibility - - Use clear, concise language - - Define technical terms when necessary - - Structure information logically - - Use formatting to enhance readability - - Maintain appropriate level of technical depth for the audience - -4. 
Synthesis & Analysis - - Identify conflicting information or viewpoints - - Highlight consensus across sources - - Note gaps or limitations in the information - - Draw connections between related concepts - - Provide context for better understanding - -OUTPUT REQUIREMENTS: -- Begin with a clear statement of the topic or question being addressed -- Use consistent formatting and structure -- Clearly separate different levels of detail -- Include confidence levels for conclusions when appropriate -- Note any areas requiring additional research or clarification - -Remember: Your goal is to make complex information accessible while maintaining accuracy and depth. Prioritize clarity without sacrificing important nuance or detail.""" - - -class DeepResearchSwarm: - def __init__( - self, - name: str = "DeepResearchSwarm", - description: str = "A swarm that conducts comprehensive research across multiple domains", - max_loops: int = 1, - nice_print: bool = True, - output_type: str = "json", - max_workers: int = os.cpu_count() - * 2, # Let the system decide optimal thread count - token_count: bool = False, - research_model_name: str = "gpt-4o-mini", - claude_summarization_model_name: str = "claude-3-5-sonnet-20240620", - ): - self.name = name - self.description = description - self.max_loops = max_loops - self.nice_print = nice_print - self.output_type = output_type - self.max_workers = max_workers - self.research_model_name = research_model_name - self.claude_summarization_model_name = ( - claude_summarization_model_name - ) - - self.reliability_check() - self.conversation = Conversation(token_count=token_count) - - # Create a persistent ThreadPoolExecutor for the lifetime of the swarm - # This eliminates thread creation overhead on each query - self.executor = concurrent.futures.ThreadPoolExecutor( - max_workers=self.max_workers - ) - - # Initialize the research agent - self.research_agent = Agent( - agent_name="Deep-Research-Agent", - agent_description="Specialized agent 
for conducting comprehensive research across multiple domains", - system_prompt=RESEARCH_AGENT_PROMPT, - max_loops=1, # Allow multiple iterations for thorough research - tools_list_dictionary=tools, - model_name=self.research_model_name, - output_type="final", - ) - - self.summarization_agent = Agent( - agent_name="Summarization-Agent", - agent_description="Specialized agent for summarizing research results", - system_prompt=SUMMARIZATION_AGENT_PROMPT, - max_loops=1, - model_name=self.claude_summarization_model_name, - output_type="final", - ) - - def __del__(self): - """Clean up the executor on object destruction""" - self.executor.shutdown(wait=False) - - def reliability_check(self): - """Check the reliability of the query""" - if self.max_loops < 1: - raise ValueError("max_loops must be greater than 0") - - formatter.print_panel( - "DeepResearchSwarm is booting up...", "blue" - ) - formatter.print_panel("Reliability check passed", "green") - - def get_queries(self, query: str) -> List[str]: - """ - Generate a list of detailed search queries based on the input query. 
- - Args: - query (str): The main research query to explore - - Returns: - List[str]: A list of detailed search queries - """ - self.conversation.add(role="User", content=query) - - # Get the agent's response - agent_output = self.research_agent.run(query) - - # Transform the string into a list of dictionaries - agent_output = json.loads(agent_output) - print(agent_output) - print(type(agent_output)) - - formatter.print_panel( - f"Agent output type: {type(agent_output)} \n {agent_output}", - "blue", - ) - - # Convert the output to a dictionary if it's a list - if isinstance(agent_output, list): - agent_output = json.dumps(agent_output) - - if isinstance(agent_output, str): - # Convert the string output to dictionary - output_dict = ( - str_to_dict(agent_output) - if isinstance(agent_output, str) - else agent_output - ) - - # Extract the detailed queries from the output - # Search for the key "detailed_queries" in the output list[dictionary] - if isinstance(output_dict, list): - for item in output_dict: - if "detailed_queries" in item: - queries = item["detailed_queries"] - break - else: - queries = output_dict.get("detailed_queries", []) - - print(queries) - - # Log the number of queries generated - formatter.print_panel( - f"Generated {len(queries)} queries", "blue" - ) - - print(queries) - print(type(queries)) - - return queries - - def step(self, query: str): - """ - Execute a single research step with maximum parallelism. 
- - Args: - query (str): The research query to process - - Returns: - Formatted conversation history - """ - try: - # Get all the queries to process - queries = self.get_queries(query) - - print(queries) - - # Submit all queries for concurrent processing - futures = [] - for q in queries: - future = self.executor.submit(exa_search, q) - futures.append((q, future)) - - # Process results as they complete - for q, future in futures: - try: - # Get search results only - results = future.result() - - # Add search results to conversation - self.conversation.add( - role="User", - content=f"Search results for {q}: \n {results}", - ) - - except Exception as e: - # Handle any errors in the thread - error_msg = ( - f"Error processing query '{q}': {str(e)}" - ) - console.print(f"[bold red]{error_msg}[/bold red]") - self.conversation.add( - role="System", - content=error_msg, - ) - - # Generate final comprehensive analysis after all searches are complete - try: - final_summary = self.summarization_agent.run( - f"Please generate a comprehensive 4,000-word report analyzing the following content: {self.conversation.get_str()}" - ) - - self.conversation.add( - role=self.summarization_agent.agent_name, - content=final_summary, - ) - except Exception as e: - error_msg = ( - f"Error generating final summary: {str(e)}" - ) - console.print(f"[bold red]{error_msg}[/bold red]") - self.conversation.add( - role="System", - content=error_msg, - ) - - # Return formatted output - result = history_output_formatter( - self.conversation, type=self.output_type - ) - - # If output type is JSON, ensure it's properly formatted - if self.output_type.lower() == "json": - try: - import json - - if isinstance(result, str): - # Try to parse and reformat for pretty printing - parsed = json.loads(result) - return json.dumps( - parsed, indent=2, ensure_ascii=False - ) - except (json.JSONDecodeError, TypeError): - # If parsing fails, return as-is - pass - - return result - - except Exception as e: - error_msg 
= f"Critical error in step execution: {str(e)}" - console.print(f"[bold red]{error_msg}[/bold red]") - return ( - {"error": error_msg} - if self.output_type.lower() == "json" - else error_msg - ) - - def run(self, task: str): - return self.step(task) - - def batched_run(self, tasks: List[str]): - """ - Execute a list of research tasks in parallel. - - Args: - tasks (List[str]): A list of research tasks to execute - - Returns: - List[str]: A list of formatted conversation histories - """ - futures = [] - for task in tasks: - future = self.executor.submit(self.step, task) - futures.append((task, future)) - - -# Example usage -# if __name__ == "__main__": -# try: -# swarm = DeepResearchSwarm( -# output_type="json", -# ) -# result = swarm.step( -# "What is the active tariff situation with mexico? Only create 2 queries" -# ) - -# # Parse and display results in rich format with markdown export -# swarm.parse_and_display_results(result, export_markdown=True) - -# except Exception as e: -# print(f"Error running deep research swarm: {str(e)}") -# import traceback -# traceback.print_exc() diff --git a/swarms/structs/hierarchical_structured_communication_framework.py b/swarms/structs/hierarchical_structured_communication_framework.py index 41b4b663..c6206059 100644 --- a/swarms/structs/hierarchical_structured_communication_framework.py +++ b/swarms/structs/hierarchical_structured_communication_framework.py @@ -21,43 +21,46 @@ Key Features: """ import traceback -import time -from typing import Any, Callable, Dict, List, Literal, Optional, Union +from typing import Any, Callable, Dict, List, Optional, Union from dataclasses import dataclass from enum import Enum from pydantic import BaseModel, Field from rich.console import Console from rich.panel import Panel -from rich.text import Text from rich.progress import Progress, SpinnerColumn, TextColumn from rich.table import Table -from rich import print as rprint from swarms.structs.agent import Agent from 
swarms.structs.base_swarm import BaseSwarm -from swarms.structs.conversation import Conversation from swarms.utils.loguru_logger import initialize_logger from swarms.utils.output_types import OutputType # Initialize rich console for enhanced output console = Console() -logger = initialize_logger(log_folder="hierarchical_structured_communication_framework") +logger = initialize_logger( + log_folder="hierarchical_structured_communication_framework" +) # ============================================================================= # ENUMS AND DATA MODELS # ============================================================================= + class CommunicationType(str, Enum): """Types of communication in the structured protocol""" + MESSAGE = "message" # M_ij: Specific task instructions BACKGROUND = "background" # B_ij: Context and problem background - INTERMEDIATE_OUTPUT = "intermediate_output" # I_ij: Intermediate results + INTERMEDIATE_OUTPUT = ( + "intermediate_output" # I_ij: Intermediate results + ) class AgentRole(str, Enum): """Roles for agents in the hierarchical system""" + SUPERVISOR = "supervisor" GENERATOR = "generator" EVALUATOR = "evaluator" @@ -68,9 +71,16 @@ class AgentRole(str, Enum): @dataclass class StructuredMessage: """Structured communication message following HierarchicalStructuredComm protocol""" - message: str = Field(description="Specific task instructions (M_ij)") - background: str = Field(description="Context and problem background (B_ij)") - intermediate_output: str = Field(description="Intermediate results (I_ij)") + + message: str = Field( + description="Specific task instructions (M_ij)" + ) + background: str = Field( + description="Context and problem background (B_ij)" + ) + intermediate_output: str = Field( + description="Intermediate results (I_ij)" + ) sender: str = Field(description="Name of the sending agent") recipient: str = Field(description="Name of the receiving agent") timestamp: Optional[str] = None @@ -78,24 +88,26 @@ class 
StructuredMessage: class HierarchicalOrder(BaseModel): """Order structure for hierarchical task assignment""" - agent_name: str = Field(description="Name of the agent to receive the task") + + agent_name: str = Field( + description="Name of the agent to receive the task" + ) task: str = Field(description="Specific task description") communication_type: CommunicationType = Field( default=CommunicationType.MESSAGE, - description="Type of communication to use" + description="Type of communication to use", ) background_context: str = Field( - default="", - description="Background context for the task" + default="", description="Background context for the task" ) intermediate_output: str = Field( - default="", - description="Intermediate output to pass along" + default="", description="Intermediate output to pass along" ) class EvaluationResult(BaseModel): """Result from evaluation team member""" + evaluator_name: str = Field(description="Name of the evaluator") criterion: str = Field(description="Evaluation criterion") score: float = Field(description="Evaluation score") @@ -107,77 +119,142 @@ class EvaluationResult(BaseModel): # SCHEMAS # ============================================================================= + class StructuredMessageSchema(BaseModel): """Schema for structured communication messages""" - message: str = Field(description="Specific task instructions (M_ij)", min_length=1) - background: str = Field(description="Context and problem background (B_ij)", default="") - intermediate_output: str = Field(description="Intermediate results (I_ij)", default="") - sender: str = Field(description="Name of the sending agent", min_length=1) - recipient: str = Field(description="Name of the receiving agent", min_length=1) - timestamp: Optional[str] = Field(description="Timestamp of the message", default=None) - communication_type: CommunicationType = Field(description="Type of communication", default=CommunicationType.MESSAGE) + + message: str = Field( + 
description="Specific task instructions (M_ij)", min_length=1 + ) + background: str = Field( + description="Context and problem background (B_ij)", + default="", + ) + intermediate_output: str = Field( + description="Intermediate results (I_ij)", default="" + ) + sender: str = Field( + description="Name of the sending agent", min_length=1 + ) + recipient: str = Field( + description="Name of the receiving agent", min_length=1 + ) + timestamp: Optional[str] = Field( + description="Timestamp of the message", default=None + ) + communication_type: CommunicationType = Field( + description="Type of communication", + default=CommunicationType.MESSAGE, + ) class EvaluationResultSchema(BaseModel): """Schema for evaluation results""" - criterion: str = Field(description="Evaluation criterion", min_length=1) - score: float = Field(description="Evaluation score (0-10)", ge=0.0, le=10.0) - feedback: str = Field(description="Detailed feedback", min_length=1) - confidence: float = Field(description="Confidence level (0-1)", ge=0.0, le=1.0) - reasoning: str = Field(description="Evaluation reasoning", default="") - suggestions: List[str] = Field(description="Improvement suggestions", default=[]) + + criterion: str = Field( + description="Evaluation criterion", min_length=1 + ) + score: float = Field( + description="Evaluation score (0-10)", ge=0.0, le=10.0 + ) + feedback: str = Field( + description="Detailed feedback", min_length=1 + ) + confidence: float = Field( + description="Confidence level (0-1)", ge=0.0, le=1.0 + ) + reasoning: str = Field( + description="Evaluation reasoning", default="" + ) + suggestions: List[str] = Field( + description="Improvement suggestions", default=[] + ) class GeneratorResponseSchema(BaseModel): """Schema for generator responses""" - content: str = Field(description="Generated content", min_length=1) - intermediate_output: str = Field(description="Intermediate output for next agent", default="") - reasoning: str = Field(description="Generation 
reasoning", default="") - confidence: float = Field(description="Confidence level (0-1)", ge=0.0, le=1.0) + + content: str = Field( + description="Generated content", min_length=1 + ) + intermediate_output: str = Field( + description="Intermediate output for next agent", default="" + ) + reasoning: str = Field( + description="Generation reasoning", default="" + ) + confidence: float = Field( + description="Confidence level (0-1)", ge=0.0, le=1.0 + ) class EvaluatorResponseSchema(BaseModel): """Schema for evaluator responses""" - criterion: str = Field(description="Evaluation criterion", min_length=1) - score: float = Field(description="Evaluation score (0-10)", ge=0.0, le=10.0) - feedback: str = Field(description="Detailed feedback", min_length=1) - confidence: float = Field(description="Confidence level (0-1)", ge=0.0, le=1.0) - reasoning: str = Field(description="Evaluation reasoning", default="") - suggestions: List[str] = Field(description="Improvement suggestions", default=[]) + + criterion: str = Field( + description="Evaluation criterion", min_length=1 + ) + score: float = Field( + description="Evaluation score (0-10)", ge=0.0, le=10.0 + ) + feedback: str = Field( + description="Detailed feedback", min_length=1 + ) + confidence: float = Field( + description="Confidence level (0-1)", ge=0.0, le=1.0 + ) + reasoning: str = Field( + description="Evaluation reasoning", default="" + ) + suggestions: List[str] = Field( + description="Improvement suggestions", default=[] + ) class RefinerResponseSchema(BaseModel): """Schema for refiner responses""" - refined_content: str = Field(description="Refined content", min_length=1) - changes_made: List[str] = Field(description="List of changes made", default=[]) - reasoning: str = Field(description="Refinement reasoning", default="") - confidence: float = Field(description="Confidence level (0-1)", ge=0.0, le=1.0) - feedback_addressed: List[str] = Field(description="Feedback points addressed", default=[]) + + 
refined_content: str = Field( + description="Refined content", min_length=1 + ) + changes_made: List[str] = Field( + description="List of changes made", default=[] + ) + reasoning: str = Field( + description="Refinement reasoning", default="" + ) + confidence: float = Field( + description="Confidence level (0-1)", ge=0.0, le=1.0 + ) + feedback_addressed: List[str] = Field( + description="Feedback points addressed", default=[] + ) # ============================================================================= # SPECIALIZED AGENT CLASSES # ============================================================================= + class HierarchicalStructuredCommunicationGenerator(Agent): """ Generator agent for Hierarchical Structured Communication Framework - + This agent specializes in creating initial content following the structured communication protocol with Message (M_ij), Background (B_ij), and Intermediate Output (I_ij). """ - + def __init__( self, agent_name: str = "TalkHierGenerator", system_prompt: Optional[str] = None, model_name: str = "gpt-4o-mini", verbose: bool = False, - **kwargs + **kwargs, ): """ Initialize the HierarchicalStructuredCommunication Generator agent - + Args: agent_name: Name of the agent system_prompt: Custom system prompt @@ -186,15 +263,15 @@ class HierarchicalStructuredCommunicationGenerator(Agent): """ if system_prompt is None: system_prompt = self._get_default_generator_prompt() - + super().__init__( agent_name=agent_name, system_prompt=system_prompt, model_name=model_name, verbose=verbose, - **kwargs + **kwargs, ) - + def _get_default_generator_prompt(self) -> str: """Get the default system prompt for generator agents""" return """ @@ -233,129 +310,134 @@ Confidence: [0.0-1.0 confidence level] Always maintain high quality and provide detailed, actionable content. 
""" - + def generate_with_structure( self, message: str, background: str = "", intermediate_output: str = "", - **kwargs + **kwargs, ) -> GeneratorResponseSchema: """ Generate content using structured communication protocol - + Args: message: Specific task message (M_ij) background: Background context (B_ij) intermediate_output: Intermediate output (I_ij) - + Returns: GeneratorResponseSchema with structured response """ try: # Construct structured prompt - prompt = self._construct_structured_prompt(message, background, intermediate_output) - + prompt = self._construct_structured_prompt( + message, background, intermediate_output + ) + # Generate response response = self.run(prompt, **kwargs) - + # Parse and structure response return self._parse_generator_response(response) - + except Exception as e: logger.error(f"Error in structured generation: {e}") return GeneratorResponseSchema( content=f"Error generating content: {e}", intermediate_output="", reasoning="Error occurred during generation", - confidence=0.0 + confidence=0.0, ) - + def _construct_structured_prompt( - self, - message: str, - background: str, - intermediate_output: str + self, message: str, background: str, intermediate_output: str ) -> str: """Construct a structured prompt for generation""" prompt_parts = [] - + if message: prompt_parts.append(f"**Task Message (M_ij):** {message}") - + if background: - prompt_parts.append(f"**Background Context (B_ij):** {background}") - + prompt_parts.append( + f"**Background Context (B_ij):** {background}" + ) + if intermediate_output: - prompt_parts.append(f"**Intermediate Output (I_ij):** {intermediate_output}") - + prompt_parts.append( + f"**Intermediate Output (I_ij):** {intermediate_output}" + ) + prompt = "\n\n".join(prompt_parts) prompt += "\n\nPlease generate content following the structured response format." 
- + return prompt - - def _parse_generator_response(self, response: str) -> GeneratorResponseSchema: + + def _parse_generator_response( + self, response: str + ) -> GeneratorResponseSchema: """Parse the generator response into structured format""" try: - lines = response.split('\n') + lines = response.split("\n") content = "" intermediate_output = "" reasoning = "" confidence = 0.8 # Default confidence - + current_section = None - + for line in lines: line = line.strip() if not line: continue - - if line.lower().startswith('content:'): - current_section = 'content' + + if line.lower().startswith("content:"): + current_section = "content" content = line[8:].strip() - elif line.lower().startswith('intermediate output:'): - current_section = 'intermediate' + elif line.lower().startswith("intermediate output:"): + current_section = "intermediate" intermediate_output = line[20:].strip() - elif line.lower().startswith('reasoning:'): - current_section = 'reasoning' + elif line.lower().startswith("reasoning:"): + current_section = "reasoning" reasoning = line[10:].strip() - elif line.lower().startswith('confidence:'): + elif line.lower().startswith("confidence:"): try: confidence = float(line[11:].strip()) except ValueError: confidence = 0.8 - elif current_section == 'content': + elif current_section == "content": content += " " + line - elif current_section == 'intermediate': + elif current_section == "intermediate": intermediate_output += " " + line - elif current_section == 'reasoning': + elif current_section == "reasoning": reasoning += " " + line - + return GeneratorResponseSchema( content=content or response, intermediate_output=intermediate_output, reasoning=reasoning, - confidence=confidence + confidence=confidence, ) - + except Exception as e: logger.error(f"Error parsing generator response: {e}") return GeneratorResponseSchema( content=response, intermediate_output="", reasoning="Error parsing response", - confidence=0.5 + confidence=0.5, ) class 
HierarchicalStructuredCommunicationEvaluator(Agent): """ Evaluator agent for Hierarchical Structured Communication Framework - + This agent specializes in evaluating content using specific criteria and providing structured feedback following the hierarchical evaluation system. """ - + def __init__( self, agent_name: str = "TalkHierEvaluator", @@ -363,11 +445,11 @@ class HierarchicalStructuredCommunicationEvaluator(Agent): model_name: str = "gpt-4o-mini", verbose: bool = False, evaluation_criteria: List[str] = None, - **kwargs + **kwargs, ): """ Initialize the HierarchicalStructuredCommunication Evaluator agent - + Args: agent_name: Name of the agent system_prompt: Custom system prompt @@ -376,25 +458,41 @@ class HierarchicalStructuredCommunicationEvaluator(Agent): evaluation_criteria: List of evaluation criteria this agent can assess """ if system_prompt is None: - system_prompt = self._get_default_evaluator_prompt(evaluation_criteria) - + system_prompt = self._get_default_evaluator_prompt( + evaluation_criteria + ) + super().__init__( agent_name=agent_name, system_prompt=system_prompt, model_name=model_name, verbose=verbose, - **kwargs + **kwargs, ) - - self.evaluation_criteria = evaluation_criteria or ["accuracy", "completeness", "clarity", "relevance"] - - def _get_default_evaluator_prompt(self, criteria: List[str] = None) -> str: + + self.evaluation_criteria = evaluation_criteria or [ + "accuracy", + "completeness", + "clarity", + "relevance", + ] + + def _get_default_evaluator_prompt( + self, criteria: List[str] = None + ) -> str: """Get the default system prompt for evaluator agents""" if criteria is None: - criteria = ["accuracy", "completeness", "clarity", "relevance"] - - criteria_text = "\n".join([f"- {criterion}" for criterion in criteria]) - + criteria = [ + "accuracy", + "completeness", + "clarity", + "relevance", + ] + + criteria_text = "\n".join( + [f"- {criterion}" for criterion in criteria] + ) + return f""" You are an Evaluator agent in a 
Hierarchical Structured Communication Framework. @@ -435,33 +533,32 @@ Suggestions: [Specific improvement suggestions] Be thorough, fair, and constructive in your evaluations. """ - + def evaluate_with_criterion( - self, - content: str, - criterion: str, - **kwargs + self, content: str, criterion: str, **kwargs ) -> EvaluatorResponseSchema: """ Evaluate content using a specific criterion - + Args: content: Content to evaluate criterion: Specific evaluation criterion - + Returns: EvaluatorResponseSchema with evaluation results """ try: # Construct evaluation prompt - prompt = self._construct_evaluation_prompt(content, criterion) - + prompt = self._construct_evaluation_prompt( + content, criterion + ) + # Get evaluation response response = self.run(prompt, **kwargs) - + # Parse and structure response return self._parse_evaluator_response(response, criterion) - + except Exception as e: logger.error(f"Error in evaluation: {e}") return EvaluatorResponseSchema( @@ -470,10 +567,15 @@ Be thorough, fair, and constructive in your evaluations. feedback=f"Error during evaluation: {e}", confidence=0.0, reasoning="Error occurred during evaluation", - suggestions=["Fix technical issues", "Retry evaluation"] + suggestions=[ + "Fix technical issues", + "Retry evaluation", + ], ) - - def _construct_evaluation_prompt(self, content: str, criterion: str) -> str: + + def _construct_evaluation_prompt( + self, content: str, criterion: str + ) -> str: """Construct an evaluation prompt""" return f""" **Content to Evaluate:** @@ -486,61 +588,63 @@ Please evaluate the content above based on the {criterion} criterion. Provide your evaluation following the structured response format. 
""" - - def _parse_evaluator_response(self, response: str, criterion: str) -> EvaluatorResponseSchema: + + def _parse_evaluator_response( + self, response: str, criterion: str + ) -> EvaluatorResponseSchema: """Parse the evaluator response into structured format""" try: - lines = response.split('\n') + lines = response.split("\n") score = 5.0 # Default score feedback = "" confidence = 0.8 # Default confidence reasoning = "" suggestions = [] - + current_section = None - + for line in lines: line = line.strip() if not line: continue - - if line.lower().startswith('score:'): + + if line.lower().startswith("score:"): try: score = float(line[6:].strip()) except ValueError: score = 5.0 - elif line.lower().startswith('feedback:'): - current_section = 'feedback' + elif line.lower().startswith("feedback:"): + current_section = "feedback" feedback = line[9:].strip() - elif line.lower().startswith('confidence:'): + elif line.lower().startswith("confidence:"): try: confidence = float(line[11:].strip()) except ValueError: confidence = 0.8 - elif line.lower().startswith('reasoning:'): - current_section = 'reasoning' + elif line.lower().startswith("reasoning:"): + current_section = "reasoning" reasoning = line[10:].strip() - elif line.lower().startswith('suggestions:'): - current_section = 'suggestions' - elif current_section == 'feedback': + elif line.lower().startswith("suggestions:"): + current_section = "suggestions" + elif current_section == "feedback": feedback += " " + line - elif current_section == 'reasoning': + elif current_section == "reasoning": reasoning += " " + line - elif current_section == 'suggestions': - if line.startswith('-') or line.startswith('•'): + elif current_section == "suggestions": + if line.startswith("-") or line.startswith("•"): suggestions.append(line[1:].strip()) else: suggestions.append(line) - + return EvaluatorResponseSchema( criterion=criterion, score=score, feedback=feedback or "No feedback provided", confidence=confidence, 
reasoning=reasoning, - suggestions=suggestions + suggestions=suggestions, ) - + except Exception as e: logger.error(f"Error parsing evaluator response: {e}") return EvaluatorResponseSchema( @@ -549,29 +653,29 @@ Provide your evaluation following the structured response format. feedback="Error parsing evaluation response", confidence=0.0, reasoning="Error occurred during parsing", - suggestions=["Fix parsing issues"] + suggestions=["Fix parsing issues"], ) class HierarchicalStructuredCommunicationRefiner(Agent): """ Refiner agent for Hierarchical Structured Communication Framework - + This agent specializes in improving content based on evaluation feedback and maintaining the structured communication protocol. """ - + def __init__( self, agent_name: str = "TalkHierRefiner", system_prompt: Optional[str] = None, model_name: str = "gpt-4o-mini", verbose: bool = False, - **kwargs + **kwargs, ): """ Initialize the HierarchicalStructuredCommunication Refiner agent - + Args: agent_name: Name of the agent system_prompt: Custom system prompt @@ -580,15 +684,15 @@ class HierarchicalStructuredCommunicationRefiner(Agent): """ if system_prompt is None: system_prompt = self._get_default_refiner_prompt() - + super().__init__( agent_name=agent_name, system_prompt=system_prompt, model_name=model_name, verbose=verbose, - **kwargs + **kwargs, ) - + def _get_default_refiner_prompt(self) -> str: """Get the default system prompt for refiner agents""" return """ @@ -625,33 +729,37 @@ Feedback Addressed: [Which feedback points were addressed] Focus on meaningful improvements that directly address the evaluation feedback. 
""" - + def refine_with_feedback( self, original_content: str, evaluation_results: List[EvaluationResultSchema], - **kwargs + **kwargs, ) -> RefinerResponseSchema: """ Refine content based on evaluation feedback - + Args: original_content: Original content to refine evaluation_results: List of evaluation results with feedback - + Returns: RefinerResponseSchema with refined content """ try: # Construct refinement prompt - prompt = self._construct_refinement_prompt(original_content, evaluation_results) - + prompt = self._construct_refinement_prompt( + original_content, evaluation_results + ) + # Get refinement response response = self.run(prompt, **kwargs) - + # Parse and structure response - return self._parse_refiner_response(response, evaluation_results) - + return self._parse_refiner_response( + response, evaluation_results + ) + except Exception as e: logger.error(f"Error in refinement: {e}") return RefinerResponseSchema( @@ -659,20 +767,22 @@ Focus on meaningful improvements that directly address the evaluation feedback. changes_made=["Error occurred during refinement"], reasoning=f"Error during refinement: {e}", confidence=0.0, - feedback_addressed=[] + feedback_addressed=[], ) - + def _construct_refinement_prompt( self, original_content: str, - evaluation_results: List[EvaluationResultSchema] + evaluation_results: List[EvaluationResultSchema], ) -> str: """Construct a refinement prompt""" - feedback_summary = "\n\n".join([ - f"**{result.criterion} (Score: {result.score}/10):**\n{result.feedback}" - for result in evaluation_results - ]) - + feedback_summary = "\n\n".join( + [ + f"**{result.criterion} (Score: {result.score}/10):**\n{result.feedback}" + for result in evaluation_results + ] + ) + return f""" **Original Content:** {original_content} @@ -684,70 +794,70 @@ Please refine the content to address the feedback while maintaining its core str Provide your refinement following the structured response format. 
""" - + def _parse_refiner_response( self, response: str, - evaluation_results: List[EvaluationResultSchema] + evaluation_results: List[EvaluationResultSchema], ) -> RefinerResponseSchema: """Parse the refiner response into structured format""" try: - lines = response.split('\n') + lines = response.split("\n") refined_content = "" changes_made = [] reasoning = "" confidence = 0.8 # Default confidence feedback_addressed = [] - + current_section = None - + for line in lines: line = line.strip() if not line: continue - - if line.lower().startswith('refined content:'): - current_section = 'content' + + if line.lower().startswith("refined content:"): + current_section = "content" refined_content = line[16:].strip() - elif line.lower().startswith('changes made:'): - current_section = 'changes' - elif line.lower().startswith('reasoning:'): - current_section = 'reasoning' + elif line.lower().startswith("changes made:"): + current_section = "changes" + elif line.lower().startswith("reasoning:"): + current_section = "reasoning" reasoning = line[10:].strip() - elif line.lower().startswith('confidence:'): + elif line.lower().startswith("confidence:"): try: confidence = float(line[11:].strip()) except ValueError: confidence = 0.8 - elif line.lower().startswith('feedback addressed:'): - current_section = 'feedback' - elif current_section == 'content': + elif line.lower().startswith("feedback addressed:"): + current_section = "feedback" + elif current_section == "content": refined_content += " " + line - elif current_section == 'changes': - if line.startswith('-') or line.startswith('•'): + elif current_section == "changes": + if line.startswith("-") or line.startswith("•"): changes_made.append(line[1:].strip()) else: changes_made.append(line) - elif current_section == 'reasoning': + elif current_section == "reasoning": reasoning += " " + line - elif current_section == 'feedback': - if line.startswith('-') or line.startswith('•'): + elif current_section == "feedback": + if 
line.startswith("-") or line.startswith("•"): feedback_addressed.append(line[1:].strip()) else: feedback_addressed.append(line) - + # If no refined content found, use original if not refined_content: refined_content = response - + return RefinerResponseSchema( refined_content=refined_content, changes_made=changes_made, reasoning=reasoning, confidence=confidence, - feedback_addressed=feedback_addressed + feedback_addressed=feedback_addressed, ) - + except Exception as e: logger.error(f"Error parsing refiner response: {e}") return RefinerResponseSchema( @@ -755,29 +865,29 @@ Provide your refinement following the structured response format. changes_made=["Error parsing response"], reasoning="Error occurred during parsing", confidence=0.0, - feedback_addressed=[] + feedback_addressed=[], ) class HierarchicalStructuredCommunicationSupervisor(Agent): """ Supervisor agent for Hierarchical Structured Communication Framework - + This agent coordinates the overall workflow and manages structured communication between different agent types. """ - + def __init__( self, agent_name: str = "TalkHierSupervisor", system_prompt: Optional[str] = None, model_name: str = "gpt-4o-mini", verbose: bool = False, - **kwargs + **kwargs, ): """ Initialize the HierarchicalStructuredCommunication Supervisor agent - + Args: agent_name: Name of the agent system_prompt: Custom system prompt @@ -786,15 +896,15 @@ class HierarchicalStructuredCommunicationSupervisor(Agent): """ if system_prompt is None: system_prompt = self._get_default_supervisor_prompt() - + super().__init__( agent_name=agent_name, system_prompt=system_prompt, model_name=model_name, verbose=verbose, - **kwargs + **kwargs, ) - + def _get_default_supervisor_prompt(self) -> str: """Get the default system prompt for supervisor agents""" return """ @@ -838,33 +948,32 @@ Reasoning: [Why this decision was made] Focus on efficient coordination and high-quality outcomes. 
""" - + def coordinate_workflow( - self, - task: str, - current_state: Dict[str, Any], - **kwargs + self, task: str, current_state: Dict[str, Any], **kwargs ) -> Dict[str, Any]: """ Coordinate the workflow and determine next actions - + Args: task: Current task being processed current_state: Current state of the workflow - + Returns: Dictionary with coordination decisions """ try: # Construct coordination prompt - prompt = self._construct_coordination_prompt(task, current_state) - + prompt = self._construct_coordination_prompt( + task, current_state + ) + # Get coordination response response = self.run(prompt, **kwargs) - + # Parse and structure response return self._parse_coordination_response(response) - + except Exception as e: logger.error(f"Error in workflow coordination: {e}") return { @@ -873,16 +982,20 @@ Focus on efficient coordination and high-quality outcomes. "structured_message": f"Error in coordination: {e}", "background_context": "", "intermediate_output": "", - "reasoning": "Error occurred during coordination" + "reasoning": "Error occurred during coordination", } - - def _construct_coordination_prompt(self, task: str, current_state: Dict[str, Any]) -> str: + + def _construct_coordination_prompt( + self, task: str, current_state: Dict[str, Any] + ) -> str: """Construct a coordination prompt""" - state_summary = "\n".join([ - f"- {key}: {value}" - for key, value in current_state.items() - ]) - + state_summary = "\n".join( + [ + f"- {key}: {value}" + for key, value in current_state.items() + ] + ) + return f""" **Current Task:** {task} @@ -894,54 +1007,56 @@ Please coordinate the workflow and determine the next action. Provide your coordination decision following the structured response format. 
""" - - def _parse_coordination_response(self, response: str) -> Dict[str, Any]: + + def _parse_coordination_response( + self, response: str + ) -> Dict[str, Any]: """Parse the coordination response""" try: - lines = response.split('\n') + lines = response.split("\n") result = { "next_action": "continue", "target_agent": "generator", "structured_message": "", "background_context": "", "intermediate_output": "", - "reasoning": "" + "reasoning": "", } - + current_section = None - + for line in lines: line = line.strip() if not line: continue - - if line.lower().startswith('next action:'): + + if line.lower().startswith("next action:"): result["next_action"] = line[12:].strip() - elif line.lower().startswith('target agent:'): + elif line.lower().startswith("target agent:"): result["target_agent"] = line[13:].strip() - elif line.lower().startswith('structured message:'): - current_section = 'message' + elif line.lower().startswith("structured message:"): + current_section = "message" result["structured_message"] = line[19:].strip() - elif line.lower().startswith('background context:'): - current_section = 'background' + elif line.lower().startswith("background context:"): + current_section = "background" result["background_context"] = line[19:].strip() - elif line.lower().startswith('intermediate output:'): - current_section = 'output' + elif line.lower().startswith("intermediate output:"): + current_section = "output" result["intermediate_output"] = line[20:].strip() - elif line.lower().startswith('reasoning:'): - current_section = 'reasoning' + elif line.lower().startswith("reasoning:"): + current_section = "reasoning" result["reasoning"] = line[10:].strip() - elif current_section == 'message': + elif current_section == "message": result["structured_message"] += " " + line - elif current_section == 'background': + elif current_section == "background": result["background_context"] += " " + line - elif current_section == 'output': + elif current_section == "output": 
result["intermediate_output"] += " " + line - elif current_section == 'reasoning': + elif current_section == "reasoning": result["reasoning"] += " " + line - + return result - + except Exception as e: logger.error(f"Error parsing coordination response: {e}") return { @@ -950,30 +1065,31 @@ Provide your coordination decision following the structured response format. "structured_message": "Error parsing response", "background_context": "", "intermediate_output": "", - "reasoning": "Error occurred during parsing" - } + "reasoning": "Error occurred during parsing", + } # ============================================================================= # MAIN SWARM ORCHESTRATOR # ============================================================================= + class HierarchicalStructuredCommunicationFramework(BaseSwarm): """ Talk Structurally, Act Hierarchically: A Collaborative Framework for LLM Multi-Agent Systems - + This is the main orchestrator class that implements the complete HierarchicalStructuredComm approach with: 1. Structured Communication Protocol 2. Hierarchical Refinement System 3. 
Graph-based Agent Orchestration - + Architecture: - Supervisor Agent: Coordinates the overall workflow - Generator Agents: Create initial content/solutions - Evaluator Team: Hierarchical evaluation with supervisor - Refiner Agents: Improve solutions based on feedback """ - + def __init__( self, name: str = "HierarchicalStructuredCommunicationFramework", @@ -982,7 +1098,9 @@ class HierarchicalStructuredCommunicationFramework(BaseSwarm): generators: List[Union[Agent, Callable, Any]] = None, evaluators: List[Union[Agent, Callable, Any]] = None, refiners: List[Union[Agent, Callable, Any]] = None, - evaluation_supervisor: Optional[Union[Agent, Callable, Any]] = None, + evaluation_supervisor: Optional[ + Union[Agent, Callable, Any] + ] = None, max_loops: int = 3, output_type: OutputType = "dict-all-except-first", supervisor_name: str = "Supervisor", @@ -1000,13 +1118,13 @@ class HierarchicalStructuredCommunicationFramework(BaseSwarm): ): """ Initialize the HierarchicalStructuredCommunicationFramework - + Args: name: Name of the swarm description: Description of the swarm supervisor: Main supervisor agent generators: List of generator agents - evaluators: List of evaluator agents + evaluators: List of evaluator agents refiners: List of refiner agents evaluation_supervisor: Supervisor for evaluation team max_loops: Maximum number of refinement loops @@ -1035,22 +1153,26 @@ class HierarchicalStructuredCommunicationFramework(BaseSwarm): self.supervisor_name = supervisor_name self.evaluation_supervisor_name = evaluation_supervisor_name self.verbose = verbose - self.enable_structured_communication = enable_structured_communication - self.enable_hierarchical_evaluation = enable_hierarchical_evaluation + self.enable_structured_communication = ( + enable_structured_communication + ) + self.enable_hierarchical_evaluation = ( + enable_hierarchical_evaluation + ) self.shared_memory = shared_memory self.model_name = model_name self.use_ollama = use_ollama self.ollama_base_url = 
ollama_base_url self.ollama_api_key = ollama_api_key - + # Communication and state management self.conversation_history: List[StructuredMessage] = [] self.intermediate_outputs: Dict[str, str] = {} self.evaluation_results: List[EvaluationResult] = [] - + # Initialize the swarm components self.init_swarm() - + # Collect all agents for the parent class all_agents = [] if self.supervisor: @@ -1060,155 +1182,185 @@ class HierarchicalStructuredCommunicationFramework(BaseSwarm): all_agents.extend(self.refiners) if self.evaluation_supervisor: all_agents.append(self.evaluation_supervisor) - + # Call parent constructor with agents super().__init__(agents=all_agents, *args, **kwargs) - + def init_swarm(self): """Initialize the swarm components""" # Enhanced logging with rich formatting - console.print(Panel( - f"[bold blue]Initializing {self.name}[/bold blue]\n" - f"[dim]Framework: Talk Structurally, Act Hierarchically[/dim]", - title="Framework Initialization", - border_style="blue" - )) + console.print( + Panel( + f"[bold blue]Initializing {self.name}[/bold blue]\n" + f"[dim]Framework: Talk Structurally, Act Hierarchically[/dim]", + title="Framework Initialization", + border_style="blue", + ) + ) logger.info(f"Initializing {self.name}") - + # Setup supervisor if not provided if self.supervisor is None: self.supervisor = self._create_supervisor_agent() - + # Setup evaluation supervisor if not provided - if self.evaluation_supervisor is None and self.enable_hierarchical_evaluation: - self.evaluation_supervisor = self._create_evaluation_supervisor_agent() - + if ( + self.evaluation_supervisor is None + and self.enable_hierarchical_evaluation + ): + self.evaluation_supervisor = ( + self._create_evaluation_supervisor_agent() + ) + # Setup default agents if none provided if not self.generators: self.generators = [self._create_default_generator()] - - if not self.evaluators and self.enable_hierarchical_evaluation: + + if ( + not self.evaluators + and 
self.enable_hierarchical_evaluation + ): self.evaluators = [self._create_default_evaluator()] - + if not self.refiners: self.refiners = [self._create_default_refiner()] - + # Enhanced status display table = Table(title="Framework Components") table.add_column("Component", style="cyan", no_wrap=True) table.add_column("Count", style="magenta") table.add_column("Status", style="green") - - table.add_row("Generators", str(len(self.generators)), "Ready") - table.add_row("Evaluators", str(len(self.evaluators)), "Ready") + + table.add_row( + "Generators", str(len(self.generators)), "Ready" + ) + table.add_row( + "Evaluators", str(len(self.evaluators)), "Ready" + ) table.add_row("Refiners", str(len(self.refiners)), "Ready") - table.add_row("Supervisors", str(1 if self.supervisor else 0), "Ready") - + table.add_row( + "Supervisors", str(1 if self.supervisor else 0), "Ready" + ) + console.print(table) - - logger.info(f"Swarm initialized with {len(self.generators)} generators, " - f"{len(self.evaluators)} evaluators, {len(self.refiners)} refiners") - + + logger.info( + f"Swarm initialized with {len(self.generators)} generators, " + f"{len(self.evaluators)} evaluators, {len(self.refiners)} refiners" + ) + def _create_supervisor_agent(self) -> Agent: """Create the main supervisor agent""" supervisor_prompt = self._get_supervisor_prompt() - + agent_kwargs = { "agent_name": self.supervisor_name, "system_prompt": supervisor_prompt, "model_name": self.model_name, "verbose": self.verbose, - "reliability_check": False + "reliability_check": False, } - + if self.use_ollama: - agent_kwargs.update({ - "openai_api_base": self.ollama_base_url, - "openai_api_key": self.ollama_api_key - }) - + agent_kwargs.update( + { + "openai_api_base": self.ollama_base_url, + "openai_api_key": self.ollama_api_key, + } + ) + return Agent(**agent_kwargs) - + def _create_evaluation_supervisor_agent(self) -> Agent: """Create the evaluation team supervisor""" - eval_supervisor_prompt = 
self._get_evaluation_supervisor_prompt() - + eval_supervisor_prompt = ( + self._get_evaluation_supervisor_prompt() + ) + agent_kwargs = { "agent_name": self.evaluation_supervisor_name, "system_prompt": eval_supervisor_prompt, "model_name": self.model_name, "verbose": self.verbose, - "reliability_check": False + "reliability_check": False, } - + if self.use_ollama: - agent_kwargs.update({ - "openai_api_base": self.ollama_base_url, - "openai_api_key": self.ollama_api_key - }) - + agent_kwargs.update( + { + "openai_api_base": self.ollama_base_url, + "openai_api_key": self.ollama_api_key, + } + ) + return Agent(**agent_kwargs) - + def _create_default_generator(self) -> Agent: """Create a default generator agent""" generator_prompt = self._get_generator_prompt() - + agent_kwargs = { "agent_name": "Generator", "system_prompt": generator_prompt, "model_name": self.model_name, "verbose": self.verbose, - "reliability_check": False + "reliability_check": False, } - + if self.use_ollama: - agent_kwargs.update({ - "openai_api_base": self.ollama_base_url, - "openai_api_key": self.ollama_api_key - }) - + agent_kwargs.update( + { + "openai_api_base": self.ollama_base_url, + "openai_api_key": self.ollama_api_key, + } + ) + return Agent(**agent_kwargs) - + def _create_default_evaluator(self) -> Agent: """Create a default evaluator agent""" evaluator_prompt = self._get_evaluator_prompt() - + agent_kwargs = { "agent_name": "Evaluator", "system_prompt": evaluator_prompt, "model_name": self.model_name, "verbose": self.verbose, - "reliability_check": False + "reliability_check": False, } - + if self.use_ollama: - agent_kwargs.update({ - "openai_api_base": self.ollama_base_url, - "openai_api_key": self.ollama_api_key - }) - + agent_kwargs.update( + { + "openai_api_base": self.ollama_base_url, + "openai_api_key": self.ollama_api_key, + } + ) + return Agent(**agent_kwargs) - + def _create_default_refiner(self) -> Agent: """Create a default refiner agent""" refiner_prompt = 
self._get_refiner_prompt() - + agent_kwargs = { "agent_name": "Refiner", "system_prompt": refiner_prompt, "model_name": self.model_name, "verbose": self.verbose, - "reliability_check": False + "reliability_check": False, } - + if self.use_ollama: - agent_kwargs.update({ - "openai_api_base": self.ollama_base_url, - "openai_api_key": self.ollama_api_key - }) - + agent_kwargs.update( + { + "openai_api_base": self.ollama_base_url, + "openai_api_key": self.ollama_api_key, + } + ) + return Agent(**agent_kwargs) - + def _get_supervisor_prompt(self) -> str: """Get the supervisor system prompt""" return f""" @@ -1231,7 +1383,7 @@ Available agents: Always provide structured communication with clear message, background context, and intermediate outputs. """ - + def _get_evaluation_supervisor_prompt(self) -> str: """Get the evaluation supervisor system prompt""" return f""" @@ -1250,7 +1402,7 @@ Evaluation criteria to coordinate: Always provide summarized, coordinated feedback that balances diverse evaluator inputs. """ - + def _get_generator_prompt(self) -> str: """Get the generator agent system prompt""" return """ @@ -1269,7 +1421,7 @@ When receiving tasks: Always structure your response clearly and provide sufficient detail for evaluation. """ - + def _get_evaluator_prompt(self) -> str: """Get the evaluator agent system prompt""" return """ @@ -1292,7 +1444,7 @@ Always provide: - Detailed feedback - Confidence level (0-1) """ - + def _get_refiner_prompt(self) -> str: """Get the refiner agent system prompt""" return """ @@ -1311,25 +1463,25 @@ When refining: Always explain your refinements and how they address the evaluation feedback. 
""" - + def send_structured_message( self, sender: str, recipient: str, message: str, background: str = "", - intermediate_output: str = "" + intermediate_output: str = "", ) -> StructuredMessage: """ Send a structured message following the HierarchicalStructuredComm protocol - + Args: sender: Name of the sending agent recipient: Name of the receiving agent message: Specific task message (M_ij) background: Background context (B_ij) intermediate_output: Intermediate output (I_ij) - + Returns: StructuredMessage object """ @@ -1338,184 +1490,234 @@ Always explain your refinements and how they address the evaluation feedback. background=background, intermediate_output=intermediate_output, sender=sender, - recipient=recipient + recipient=recipient, ) - + self.conversation_history.append(structured_msg) - + if self.verbose: # Enhanced structured message display - console.print(Panel( - f"[bold green]Message Sent[/bold green]\n" - f"[cyan]From:[/cyan] {sender}\n" - f"[cyan]To:[/cyan] {recipient}\n" - f"[cyan]Message:[/cyan] {message[:100]}{'...' if len(message) > 100 else ''}", - title="Structured Communication", - border_style="green" - )) - logger.info(f"Structured message sent from {sender} to {recipient}") + console.print( + Panel( + f"[bold green]Message Sent[/bold green]\n" + f"[cyan]From:[/cyan] {sender}\n" + f"[cyan]To:[/cyan] {recipient}\n" + f"[cyan]Message:[/cyan] {message[:100]}{'...' 
if len(message) > 100 else ''}", + title="Structured Communication", + border_style="green", + ) + ) + logger.info( + f"Structured message sent from {sender} to {recipient}" + ) logger.info(f"Message: {message[:100]}...") - + return structured_msg - + def run_hierarchical_evaluation( - self, - content: str, - evaluation_criteria: List[str] = None + self, content: str, evaluation_criteria: List[str] = None ) -> List[EvaluationResult]: """ Run hierarchical evaluation with multiple evaluators - + Args: content: Content to evaluate evaluation_criteria: List of evaluation criteria - + Returns: List of evaluation results """ if not self.enable_hierarchical_evaluation: return [] - + if evaluation_criteria is None: - evaluation_criteria = ["accuracy", "completeness", "clarity", "relevance"] - + evaluation_criteria = [ + "accuracy", + "completeness", + "clarity", + "relevance", + ] + results = [] - + # Run evaluations in parallel for i, evaluator in enumerate(self.evaluators): - criterion = evaluation_criteria[i % len(evaluation_criteria)] - + criterion = evaluation_criteria[ + i % len(evaluation_criteria) + ] + # Create structured message for evaluator eval_message = f"Evaluate the following content based on {criterion} criterion" eval_background = f"Evaluation criterion: {criterion}\nContent to evaluate: {content}" - - structured_msg = self.send_structured_message( + + self.send_structured_message( sender=self.evaluation_supervisor_name, - recipient=evaluator.agent_name if hasattr(evaluator, 'agent_name') else f"Evaluator_{i}", + recipient=( + evaluator.agent_name + if hasattr(evaluator, "agent_name") + else f"Evaluator_{i}" + ), message=eval_message, background=eval_background, - intermediate_output=content + intermediate_output=content, ) - + # Get evaluation result try: - if hasattr(evaluator, 'run'): + if hasattr(evaluator, "run"): eval_response = evaluator.run( f"Evaluate this content for {criterion}:\n{content}\n\nProvide: 1) Score (0-10), 2) Detailed feedback, 3) 
Confidence (0-1)" ) - + # Parse evaluation result (simplified parsing) result = EvaluationResult( - evaluator_name=evaluator.agent_name if hasattr(evaluator, 'agent_name') else f"Evaluator_{i}", + evaluator_name=( + evaluator.agent_name + if hasattr(evaluator, "agent_name") + else f"Evaluator_{i}" + ), criterion=criterion, score=7.5, # Default score, would need proper parsing feedback=eval_response, - confidence=0.8 # Default confidence + confidence=0.8, # Default confidence ) results.append(result) - + except Exception as e: logger.error(f"Error in evaluation: {e}") continue - + # Get summarized feedback from evaluation supervisor if self.evaluation_supervisor and results: summary_prompt = f"Summarize these evaluation results:\n{results}\n\nProvide coordinated, actionable feedback." - + try: - if hasattr(self.evaluation_supervisor, 'run'): - summary_feedback = self.evaluation_supervisor.run(summary_prompt) - logger.info(f"Evaluation summary: {summary_feedback}") + if hasattr(self.evaluation_supervisor, "run"): + summary_feedback = self.evaluation_supervisor.run( + summary_prompt + ) + logger.info( + f"Evaluation summary: {summary_feedback}" + ) except Exception as e: logger.error(f"Error in evaluation summary: {e}") - + self.evaluation_results.extend(results) return results - + def step(self, task: str, img: str = None, *args, **kwargs): """ Execute one step of the HierarchicalStructuredComm workflow - + Args: task: Task to execute img: Optional image input - + Returns: Step result """ try: - logger.info(f"Executing HierarchicalStructuredComm step for task: {task[:100]}...") - + logger.info( + f"Executing HierarchicalStructuredComm step for task: {task[:100]}..." 
+ ) + # Safety check: prevent recursive task processing - if len(task) > 1000: # If task is too long, it might be recursive - logger.warning("Task too long, possible recursive call detected") - return {"error": "Task too long, possible recursive call"} - + if ( + len(task) > 1000 + ): # If task is too long, it might be recursive + logger.warning( + "Task too long, possible recursive call detected" + ) + return { + "error": "Task too long, possible recursive call" + } + # Step 1: Generate initial content generator_result = self._generate_content(task) - + # Safety check: prevent empty or error results - if not generator_result or generator_result.startswith("Error"): + if not generator_result or generator_result.startswith( + "Error" + ): logger.error(f"Generator failed: {generator_result}") - return {"error": f"Generator failed: {generator_result}"} - + return { + "error": f"Generator failed: {generator_result}" + } + # Step 2: Evaluate content hierarchically - evaluation_results = self.run_hierarchical_evaluation(generator_result) - + evaluation_results = self.run_hierarchical_evaluation( + generator_result + ) + # Step 3: Refine content based on evaluation - refined_result = self._refine_content(generator_result, evaluation_results) - + refined_result = self._refine_content( + generator_result, evaluation_results + ) + # Safety check: ensure we have a valid result if not refined_result: refined_result = generator_result - + return { "generator_result": generator_result, "evaluation_results": evaluation_results, "refined_result": refined_result, - "conversation_history": self.conversation_history + "conversation_history": self.conversation_history, } - + except Exception as e: - logger.error(f"Error in HierarchicalStructuredComm step: {e}") + logger.error( + f"Error in HierarchicalStructuredComm step: {e}" + ) logger.error(traceback.format_exc()) return {"error": str(e)} - + def _generate_content(self, task: str) -> str: """Generate initial content using generator 
agents""" if not self.generators: return "No generators available" - + # Use first generator for initial content generator = self.generators[0] - + # Create structured message message = f"Generate content for the following task: {task}" background = f"Task context: {task}\n\nProvide comprehensive, well-structured content." - - structured_msg = self.send_structured_message( + + self.send_structured_message( sender=self.supervisor_name, - recipient=generator.agent_name if hasattr(generator, 'agent_name') else "Generator", + recipient=( + generator.agent_name + if hasattr(generator, "agent_name") + else "Generator" + ), message=message, - background=background + background=background, ) - + try: - if hasattr(generator, 'run'): + if hasattr(generator, "run"): # Add a simple, focused prompt to prevent recursive calls prompt = f"Task: {task}\n\nGenerate a clear, concise response. Do not repeat the task or ask for clarification." - + result = generator.run(prompt) - + # Safety check: prevent recursive or overly long responses if len(result) > 2000: result = result[:2000] + "... [truncated]" - + # Safety check: prevent responses that just repeat the task - if task.lower() in result.lower() and len(result) < len(task) * 2: - logger.warning("Generator response appears to be recursive") - return "Error: Generator produced recursive response" - + if ( + task.lower() in result.lower() + and len(result) < len(task) * 2 + ): + logger.warning( + "Generator response appears to be recursive" + ) + return ( + "Error: Generator produced recursive response" + ) + self.intermediate_outputs["generator"] = result return result else: @@ -1523,38 +1725,50 @@ Always explain your refinements and how they address the evaluation feedback. 
except Exception as e: logger.error(f"Error in content generation: {e}") return f"Error generating content: {e}" - - def _refine_content(self, original_content: str, evaluation_results: List[EvaluationResult]) -> str: + + def _refine_content( + self, + original_content: str, + evaluation_results: List[EvaluationResult], + ) -> str: """Refine content based on evaluation feedback""" if not self.refiners: return original_content - + if not evaluation_results: return original_content - + # Use first refiner refiner = self.refiners[0] - + # Create feedback summary - feedback_summary = "\n".join([ - f"{result.criterion}: {result.feedback} (Score: {result.score}/10)" - for result in evaluation_results - ]) - + feedback_summary = "\n".join( + [ + f"{result.criterion}: {result.feedback} (Score: {result.score}/10)" + for result in evaluation_results + ] + ) + # Create structured message for refinement - message = "Refine the content based on the evaluation feedback" + message = ( + "Refine the content based on the evaluation feedback" + ) background = f"Original content: {original_content}\n\nEvaluation feedback:\n{feedback_summary}" - - structured_msg = self.send_structured_message( + + self.send_structured_message( sender=self.supervisor_name, - recipient=refiner.agent_name if hasattr(refiner, 'agent_name') else "Refiner", + recipient=( + refiner.agent_name + if hasattr(refiner, "agent_name") + else "Refiner" + ), message=message, background=background, - intermediate_output=original_content + intermediate_output=original_content, ) - + try: - if hasattr(refiner, 'run'): + if hasattr(refiner, "run"): refinement_prompt = f""" Original content: {original_content} @@ -1572,89 +1786,121 @@ Please refine the content to address the feedback while maintaining its core str except Exception as e: logger.error(f"Error in content refinement: {e}") return original_content - + def run(self, task: str, img: str = None, *args, **kwargs): """ Run the complete HierarchicalStructuredComm 
         workflow
-        
+
         Args:
             task: Task to execute
             img: Optional image input
-            
+
         Returns:
             Final result
         """
         # Enhanced workflow start display
-        console.print(Panel(
-            f"[bold yellow]Starting Hierarchical Structured Communication Workflow[/bold yellow]\n"
-            f"[cyan]Task:[/cyan] {task[:100]}{'...' if len(task) > 100 else ''}\n"
-            f"[cyan]Max Loops:[/cyan] {self.max_loops}",
-            title="Workflow Execution",
-            border_style="yellow"
-        ))
-        logger.info(f"Running HierarchicalStructuredComm workflow for task: {task[:100]}...")
-        
+        console.print(
+            Panel(
+                f"[bold yellow]Starting Hierarchical Structured Communication Workflow[/bold yellow]\n"
+                f"[cyan]Task:[/cyan] {task[:100]}{'...' if len(task) > 100 else ''}\n"
+                f"[cyan]Max Loops:[/cyan] {self.max_loops}",
+                title="Workflow Execution",
+                border_style="yellow",
+            )
+        )
+        logger.info(
+            f"Running HierarchicalStructuredComm workflow for task: {task[:100]}..."
+        )
+
         current_result = None
         total_loops = 0
-        
+
         # Rich progress tracking
         with Progress(
             SpinnerColumn(),
             TextColumn("[progress.description]{task.description}"),
-            console=console
+            console=console,
         ) as progress:
-            task_progress = progress.add_task("Processing workflow...", total=self.max_loops)
-            
+            task_progress = progress.add_task(
+                "Processing workflow...", total=self.max_loops
+            )
+
             for loop in range(self.max_loops):
                 total_loops = loop + 1
-                progress.update(task_progress, description=f"Loop {total_loops}/{self.max_loops}")
-                logger.info(f"HierarchicalStructuredComm loop {total_loops}/{self.max_loops}")
-                
+                progress.update(
+                    task_progress,
+                    description=f"Loop {total_loops}/{self.max_loops}",
+                )
+                logger.info(
+                    f"HierarchicalStructuredComm loop {total_loops}/{self.max_loops}"
+                )
+
                 # Execute step
                 step_result = self.step(task, img, *args, **kwargs)
-                
+
                 if "error" in step_result:
-                    console.print(f"[bold red]Error in loop {total_loops}: {step_result['error']}[/bold red]")
-                    logger.error(f"Error in loop {total_loops}: {step_result['error']}")
+                    console.print(
+                        f"[bold red]Error in loop {total_loops}: {step_result['error']}[/bold red]"
+                    )
+                    logger.error(
+                        f"Error in loop {total_loops}: {step_result['error']}"
+                    )
                     break
-                
+
                 current_result = step_result["refined_result"]
-                
+
                 # Check if we should continue refining
                 if loop < self.max_loops - 1:
                     # Simple continuation logic - could be enhanced
-                    evaluation_scores = [result.score for result in step_result["evaluation_results"]]
-                    avg_score = sum(evaluation_scores) / len(evaluation_scores) if evaluation_scores else 0
-                    
+                    evaluation_scores = [
+                        result.score
+                        for result in step_result[
+                            "evaluation_results"
+                        ]
+                    ]
+                    avg_score = (
+                        sum(evaluation_scores)
+                        / len(evaluation_scores)
+                        if evaluation_scores
+                        else 0
+                    )
+
                     if avg_score >= 8.0:  # High quality threshold
-                        console.print(f"[bold green]High quality achieved (avg score: {avg_score:.2f}), stopping refinement[/bold green]")
-                        logger.info(f"High quality achieved (avg score: {avg_score:.2f}), stopping refinement")
+                        console.print(
+                            f"[bold green]High quality achieved (avg score: {avg_score:.2f}), stopping refinement[/bold green]"
+                        )
+                        logger.info(
+                            f"High quality achieved (avg score: {avg_score:.2f}), stopping refinement"
+                        )
                         break
-                
+
                 progress.advance(task_progress)
-        
+
         # Enhanced completion display
-        console.print(Panel(
-            f"[bold green]Workflow Completed Successfully![/bold green]\n"
-            f"[cyan]Total Loops:[/cyan] {total_loops}\n"
-            f"[cyan]Conversation History:[/cyan] {len(self.conversation_history)} messages\n"
-            f"[cyan]Evaluation Results:[/cyan] {len(self.evaluation_results)} evaluations",
-            title="Workflow Summary",
-            border_style="green"
-        ))
-        
+        console.print(
+            Panel(
+                f"[bold green]Workflow Completed Successfully![/bold green]\n"
+                f"[cyan]Total Loops:[/cyan] {total_loops}\n"
+                f"[cyan]Conversation History:[/cyan] {len(self.conversation_history)} messages\n"
+                f"[cyan]Evaluation Results:[/cyan] {len(self.evaluation_results)} evaluations",
+                title="Workflow Summary",
+                border_style="green",
+            )
+        )
+
         return {
             "final_result": current_result,
             "total_loops": total_loops,
             "conversation_history": self.conversation_history,
             "evaluation_results": self.evaluation_results,
-            "intermediate_outputs": self.intermediate_outputs
+            "intermediate_outputs": self.intermediate_outputs,
         }
-    
+
     def __str__(self):
         return f"HierarchicalStructuredCommunicationFramework(name={self.name}, generators={len(self.generators)}, evaluators={len(self.evaluators)}, refiners={len(self.refiners)})"
-    
+
     def __repr__(self):
         return self.__str__()
+# Nothing to see here yet.
diff --git a/swarms/structs/swarm_matcher.py b/swarms/structs/swarm_matcher.py
index ce0e9e81..65228901 100644
--- a/swarms/structs/swarm_matcher.py
+++ b/swarms/structs/swarm_matcher.py
@@ -621,10 +621,6 @@ class SwarmMatcher:
                 name="ConsensusSwarm",
                 description="Achieve group decisions through consensus mechanisms and voting protocols among multiple agents. Keywords: group decision making, voting systems, collective intelligence, agreement protocols, democratic processes, collaborative decisions, consensus building",
             ),
-            SwarmType(
-                name="DeepResearchSwarm",
-                description="Conduct in-depth research and analysis by coordinating multiple agents to explore, synthesize, and validate information from various sources. Keywords: research methodology, information synthesis, data validation, comprehensive analysis, knowledge discovery, systematic investigation",
-            ),
             SwarmType(
                 name="CouncilAsAJudge",
                 description="Evaluate and judge solutions or decisions through a council of expert agents acting as arbitrators. Keywords: evaluation, judgment, arbitration, expert assessment, quality control, decision validation, peer review, consensus building",
diff --git a/swarms/structs/swarm_router.py b/swarms/structs/swarm_router.py
index cae43b9c..f53a888f 100644
--- a/swarms/structs/swarm_router.py
+++ b/swarms/structs/swarm_router.py
@@ -10,10 +10,9 @@ from swarms.prompts.multi_agent_collab_prompt import (
     MULTI_AGENT_COLLAB_PROMPT_TWO,
 )
 from swarms.structs.agent import Agent
+from swarms.structs.agent_rearrange import AgentRearrange
 from swarms.structs.concurrent_workflow import ConcurrentWorkflow
 from swarms.structs.council_as_judge import CouncilAsAJudge
-from swarms.structs.csv_to_agent import AgentLoader
-from swarms.structs.deep_research_swarm import DeepResearchSwarm
 from swarms.structs.groupchat import GroupChat
 from swarms.structs.heavy_swarm import HeavySwarm
 from swarms.structs.hiearchical_swarm import HierarchicalSwarm
@@ -23,7 +22,6 @@ from swarms.structs.majority_voting import MajorityVoting
 from swarms.structs.malt import MALT
 from swarms.structs.mixture_of_agents import MixtureOfAgents
 from swarms.structs.multi_agent_router import MultiAgentRouter
-from swarms.structs.agent_rearrange import AgentRearrange
 from swarms.structs.sequential_workflow import SequentialWorkflow
 from swarms.structs.swarm_matcher import swarm_matcher
 from swarms.telemetry.log_executions import log_execution
@@ -45,7 +43,6 @@ SwarmType = Literal[
     "auto",
     "MajorityVoting",
     "MALT",
-    "DeepResearchSwarm",
     "CouncilAsAJudge",
     "InteractiveGroupChat",
     "HeavySwarm",
@@ -288,12 +285,6 @@ class SwarmRouter:
 
         self.setup()
 
-        # Load agents from CSV
-        if self.load_agents_from_csv:
-            self.agents = AgentLoader(
-                csv_path=self.csv_file_path
-            ).load_agents()
-
         if self.telemetry_enabled:
             self.agent_config = self.agent_config()
 
@@ -387,7 +378,6 @@ class SwarmRouter:
             "MALT": self._create_malt,
             "CouncilAsAJudge": self._create_council_as_judge,
             "InteractiveGroupChat": self._create_interactive_group_chat,
-            "DeepResearchSwarm": self._create_deep_research_swarm,
             "HiearchicalSwarm": self._create_hierarchical_swarm,
             "MixtureOfAgents": self._create_mixture_of_agents,
             "MajorityVoting": self._create_majority_voting,
@@ -455,16 +445,6 @@ class SwarmRouter:
             speaker_function=self.speaker_function,
         )
 
-    def _create_deep_research_swarm(self, *args, **kwargs):
-        """Factory function for DeepResearchSwarm."""
-        return DeepResearchSwarm(
-            name=self.name,
-            description=self.description,
-            agents=self.agents,
-            max_loops=self.max_loops,
-            output_type=self.output_type,
-        )
-
     def _create_hierarchical_swarm(self, *args, **kwargs):
         """Factory function for HierarchicalSwarm."""
         return HierarchicalSwarm(
diff --git a/swarms/utils/__init__.py b/swarms/utils/__init__.py
index 4d41cd2c..1242f767 100644
--- a/swarms/utils/__init__.py
+++ b/swarms/utils/__init__.py
@@ -21,9 +21,7 @@ from swarms.utils.history_output_formatter import (
     history_output_formatter,
 )
 
-from swarms.utils.agent_loader import (
-    AgentLoader,
-    MarkdownAgentConfig,
+from swarms.utils.agent_loader_markdown import (
     load_agent_from_markdown,
     load_agents_from_markdown,
 )
@@ -51,8 +49,6 @@ __all__ = [
     "HistoryOutputType",
     "history_output_formatter",
     "check_all_model_max_tokens",
-    "AgentLoader",
-    "MarkdownAgentConfig",
     "load_agent_from_markdown",
     "load_agents_from_markdown",
     "dynamic_auto_chunking",
diff --git a/swarms/utils/agent_loader.py b/swarms/utils/agent_loader_markdown.py
similarity index 82%
rename from swarms/utils/agent_loader.py
rename to swarms/utils/agent_loader_markdown.py
index 415daac0..cea69b45 100644
--- a/swarms/utils/agent_loader.py
+++ b/swarms/utils/agent_loader_markdown.py
@@ -1,14 +1,15 @@
 import os
-import yaml
-from pathlib import Path
-from typing import Any, Dict, List, Optional, Union, TYPE_CHECKING
 from concurrent.futures import (
     ThreadPoolExecutor,
-    as_completed,
     TimeoutError,
+    as_completed,
 )
-from pydantic import BaseModel, Field, field_validator
+from pathlib import Path
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+import yaml
 from loguru import logger
+from pydantic import BaseModel, Field, field_validator
 
 # Type checking imports to avoid circular dependency
 if TYPE_CHECKING:
@@ -407,14 +408,39 @@ class AgentLoader:
 
 # Convenience functions
 def load_agent_from_markdown(file_path: str, **kwargs) -> "Agent":
     """
-    Load a single agent from a markdown file with Claude Code YAML frontmatter format.
+    Load a single agent from a markdown file using the Claude Code YAML frontmatter format.
+
+    This function provides a simple interface for loading an agent configuration
+    from a markdown file. It supports all configuration overrides accepted by the
+    underlying `AgentLoader` and agent class.
 
     Args:
-        file_path: Path to markdown file with YAML frontmatter
-        **kwargs: Additional configuration overrides
+        file_path (str): Path to the markdown file containing YAML frontmatter
+            with agent configuration.
+        **kwargs: Optional keyword arguments to override agent configuration
+            parameters. Common options include:
+            - max_loops (int): Maximum number of reasoning loops.
+            - autosave (bool): Enable automatic state saving.
+            - dashboard (bool): Enable dashboard monitoring.
+            - verbose (bool): Enable verbose logging.
+            - dynamic_temperature_enabled (bool): Enable dynamic temperature.
+            - saved_state_path (str): Path for saving agent state.
+            - user_name (str): User identifier.
+            - retry_attempts (int): Number of retry attempts.
+            - context_length (int): Maximum context length.
+            - return_step_meta (bool): Return step metadata.
+            - output_type (str): Output format type.
+            - auto_generate_prompt (bool): Auto-generate prompts.
+            - artifacts_on (bool): Enable artifacts.
+            - streaming_on (bool): Enable streaming output.
+            - mcp_url (str): MCP server URL if needed.
 
     Returns:
-        Configured Agent instance
+        Agent: Configured Agent instance loaded from the markdown file.
+
+    Example:
+        >>> agent = load_agent_from_markdown("finance_advisor.md", max_loops=3, verbose=True)
+        >>> response = agent.run("What is the best investment strategy for 2024?")
     """
     # Lazy import to avoid circular dependency
@@ -429,16 +455,36 @@ def load_agents_from_markdown(
     **kwargs,
 ) -> List["Agent"]:
     """
-    Load multiple agents from markdown files with Claude Code YAML frontmatter format.
+    Load multiple agents from markdown files using the Claude Code YAML frontmatter format.
+
+    This function supports loading agents from a list of markdown files or from all
+    markdown files in a directory. It can process files concurrently for faster loading,
+    and allows configuration overrides for all loaded agents.
 
     Args:
-        file_paths: Directory path or list of file paths with YAML frontmatter
-        concurrent: Whether to use concurrent processing for multiple files
-        max_file_size_mb: Maximum file size in MB to prevent memory issues
-        **kwargs: Additional configuration overrides
+        file_paths (Union[str, List[str]]): Either a directory path containing markdown
+            files or a list of markdown file paths to load.
+        concurrent (bool, optional): If True, enables concurrent processing for faster
+            loading of multiple files. Defaults to True.
+        max_file_size_mb (float, optional): Maximum file size (in MB) for each markdown
+            file to prevent memory issues. Files exceeding this size will be skipped.
+            Defaults to 10.0.
+        **kwargs: Optional keyword arguments to override agent configuration
+            parameters for all loaded agents. See `load_agent_from_markdown` for
+            available options.
 
     Returns:
-        List of configured Agent instances
+        List[Agent]: List of configured Agent instances loaded from the markdown files.
+
+    Example:
+        >>> agents = load_agents_from_markdown(
+        ...     ["agent1.md", "agent2.md"],
+        ...     concurrent=True,
+        ...     max_loops=2,
+        ...     verbose=True
+        ... )
+        >>> for agent in agents:
+        ...     print(agent.name)
     """
     # Lazy import to avoid circular dependency