Merge branch 'master' into Fix/stream-issues

pull/938/head
harshalmore31 committed 4 months ago (via GitHub)
commit f601626d89

@ -1,221 +1,40 @@
# Contributing to Swarms: Building the Infrastructure for The Agentic Economy
# Contribute to Swarms
Multi-agent collaboration is the most important technology in human history. It will reshape civilization by enabling billions of autonomous agents to coordinate and solve problems at unprecedented scale.
!!! success "The Foundation of Tomorrow"
**Swarms** is the foundational infrastructure powering this autonomous economy. By contributing, you're building the systems that will enable the next generation of intelligent automation.
### What You're Building
=== "Autonomous Systems"
**Autonomous Resource Allocation**
Global supply chains and energy distribution optimized in real-time
Our mission is to accelerate the transition to a fully autonomous world economy by providing enterprise-grade, production-ready infrastructure that enables seamless deployment and orchestration of millions of autonomous agents. We are creating the operating system for the agent economy, and we need your help to achieve this goal.
=== "Intelligence Networks"
**Distributed Decision Making**
Collaborative intelligence networks across industries and governments
Swarms is built by the community, for the community. We believe that collaborative development is the key to pushing the boundaries of what's possible with multi-agent AI. Your contributions are not only welcome—they are essential to our mission. [Learn more about why you should contribute to Swarms](https://docs.swarms.world/en/latest/contributors/main/)
=== "Smart Markets"
**Self-Organizing Markets**
Agent-driven marketplaces that automatically balance supply and demand
### Why Contribute?
=== "Problem Solving"
**Collaborative Problem Solving**
Massive agent swarms tackling climate change, disease, and scientific discovery
=== "Infrastructure"
**Adaptive Infrastructure**
Self-healing systems that evolve without human intervention
---
## Why Contribute to Swarms?
### :material-rocket-launch: Shape the Future of Civilization
!!! abstract "Your Impact"
- Define standards for multi-agent communication protocols
- Build architectural patterns for distributed intelligence systems
- Create frameworks for deploying agent swarms in production
- Establish ethical guidelines for autonomous agent collaboration
By joining us, you have the opportunity to:
### :material-trophy: Recognition and Professional Development
* **Work on the Frontier of Agents:** Shape the future of autonomous agent technology and help build a production-grade, open-source framework.
!!! tip "Immediate Recognition"
- **Social Media Features** - All merged PRs showcased publicly
- **Bounty Programs** - Financial rewards for high-impact contributions
- **Fast-Track Hiring** - Priority consideration for core team positions
- **Community Spotlights** - Regular recognition and acknowledgments
!!! info "Career Benefits"
- Multi-agent expertise highly valued by AI industry
- Portfolio demonstrates cutting-edge technical skills
- Direct networking with leading researchers and companies
- Thought leadership opportunities in emerging field
### :material-brain: Technical Expertise Development
Master cutting-edge technologies:
| Technology Area | Skills You'll Develop |
|----------------|----------------------|
| **Swarm Intelligence** | Design sophisticated agent coordination mechanisms |
| **Distributed Computing** | Build scalable architectures for thousands of agents |
| **Communication Protocols** | Create novel interaction patterns |
| **Production AI** | Deploy and orchestrate enterprise-scale systems |
| **Research Implementation** | Turn cutting-edge papers into working code |
* **Join a Vibrant Community:** Collaborate with a passionate and growing group of agent developers, researchers, and agent enthusiasts.
### :material-account-group: Research Community Access
* **Make a Tangible Impact:** Whether you're fixing a bug, adding a new feature, or improving documentation, your work will be used in real-world applications.
!!! note "Collaborative Environment"
- Work with experts from academic institutions and industry
- Regular technical seminars and research discussions
- Structured mentorship from experienced contributors
- Applied research opportunities with real-world impact
* **Learn and Grow:** Gain hands-on experience with advanced AI concepts and strengthen your software engineering skills.
---
Discover more about our mission and the benefits of becoming a contributor in our official [**Contributor's Guide**](https://docs.swarms.world/en/latest/contributors/main/).
## Contribution Opportunities
=== "New Contributors"
### :material-school: Perfect for Getting Started
- **Documentation** - Improve guides, tutorials, and API references
- **Bug Reports** - Identify and document issues
- **Code Quality** - Participate in testing and review processes
- **Community Support** - Help users in forums and discussions
=== "Experienced Developers"
### :material-code-braces: Advanced Technical Work
- **Core Architecture** - Design fundamental system components
- **Performance Optimization** - Enhance coordination and communication efficiency
- **Research Implementation** - Turn cutting-edge papers into working code
- **Integration Development** - Build connections with AI tools and platforms
### How to Get Started
=== "Researchers"
### :material-flask: Research and Innovation
- **Algorithm Development** - Implement novel multi-agent algorithms
- **Experimental Frameworks** - Create evaluation and benchmarking tools
- **Theoretical Contributions** - Develop research documentation and frameworks
- **Academic Collaboration** - Partner on funded research projects
We've made it easy to start contributing. Here's how you can help:
---
## How to Contribute
1. **Find an Issue to Tackle:** The best way to begin is by visiting our [**contributing project board**](https://github.com/users/kyegomez/projects/1). Look for issues tagged with `good first issue`—these are specifically selected for new contributors.
### Step 1: Get Started
2. **Report a Bug or Request a Feature:** Have a new idea or found something that isn't working right? We'd love to hear from you. Please [**file a Bug Report or Feature Request**](https://github.com/kyegomez/swarms/issues) on our GitHub Issues page.
!!! info "Essential Resources"
[:material-book-open-page-variant: **Documentation**](https://docs.swarms.world/en/latest/){ .md-button .md-button--primary }
[:material-github: **GitHub Repository**](https://github.com/kyegomez/swarms){ .md-button }
[:material-chat: **Community Channels**](#){ .md-button }
3. **Understand Our Workflow and Standards:** Before submitting your work, please review our complete [**Contribution Guidelines**](https://github.com/kyegomez/swarms/blob/master/CONTRIBUTING.md). To help maintain code quality, we also encourage you to read our guide on [**Code Cleanliness**](https://docs.swarms.world/en/latest/swarms/framework/code_cleanliness/).
### Step 2: Find Your Path
4. **Join the Discussion:** To participate in roadmap discussions and connect with other developers, join our community on [**Discord**](https://discord.gg/EamjgSaEQf).
```mermaid
graph TD
A[Choose Your Path] --> B[Browse Issues]
A --> C[Review Roadmap]
A --> D[Propose Ideas]
B --> E[good first issue]
B --> F[help wanted]
C --> G[Core Features]
C --> H[Research Areas]
D --> I[Discussion Forums]
```
### Step 3: Make Impact
### ✨ Our Valued Contributors
1. **Fork & Setup** - Configure your development environment
2. **Develop** - Create your contribution
3. **Submit** - Open a pull request
4. **Collaborate** - Work with maintainers
5. **Celebrate** - See your work recognized
Thank you for contributing to Swarms. Your work is deeply appreciated and recognized.
---
## Recognition Framework
### :material-flash: Immediate Benefits
!!! success "Instant Recognition"
| Benefit | Description |
|---------|-------------|
| **Social Media Features** | Every merged PR showcased publicly |
| **Community Recognition** | Contributor badges and documentation credits |
| **Professional References** | Formal acknowledgment for portfolios |
| **Direct Mentorship** | Access to core team guidance |
### :material-trending-up: Long-term Opportunities
!!! tip "Career Growth"
- **Team Positions** - Fast-track consideration for core team roles
- **Conference Speaking** - Present work at AI conferences and events
- **Industry Connections** - Network with leading AI organizations
- **Research Collaboration** - Partner with academic institutions
---
## Societal Impact
!!! abstract "Building Solutions for Humanity"
Swarms enables technology that addresses critical challenges:
=== "Research"
**Scientific Research**
Accelerate collaborative research and discovery across disciplines
=== "Healthcare"
**Healthcare Innovation**
Support drug discovery and personalized medicine development
=== "Environment"
**Environmental Solutions**
Monitor climate and optimize sustainability initiatives
=== "Education"
**Educational Technology**
Create adaptive learning systems for personalized education
=== "Economy"
**Economic Innovation**
Generate new opportunities and efficiency improvements
---
## Get Involved
### :material-link: Connect With Us
!!! info "Join the Community"
[:material-github: **GitHub Repository**](https://github.com/kyegomez/swarms){ .md-button .md-button--primary }
[:material-book: **Documentation**](https://docs.swarms.world/en/latest/){ .md-button }
[:material-forum: **Community Forums**](#){ .md-button }
---
!!! warning "The Future is Now"
Multi-agent collaboration will define the next century of human progress. The autonomous economy depends on the infrastructure we build today.
!!! success "Your Mission"
Your contribution to Swarms helps create the foundation for billions of autonomous agents working together to solve humanity's greatest challenges.
**Join us in building the most important technology of our time.**
---
<div class="result" markdown>
*Built with :material-heart: by the global Swarms community*
</div>
<a href="https://github.com/kyegomez/swarms/graphs/contributors">
<img src="https://contrib.rocks/image?repo=kyegomez/swarms" />
</a>

@ -1,186 +0,0 @@
# Deep Research Swarm
!!! abstract "Overview"
The Deep Research Swarm is a powerful, production-grade research system that conducts comprehensive analysis across multiple domains using parallel processing and advanced AI agents.
Key Features:
- Parallel search processing
- Multi-agent research coordination
- Advanced information synthesis
- Automated query generation
- Concurrent task execution
## Getting Started
!!! tip "Quick Installation"
```bash
pip install swarms
```
=== "Basic Usage"
```python
from swarms.structs import DeepResearchSwarm
# Initialize the swarm
swarm = DeepResearchSwarm(
name="MyResearchSwarm",
output_type="json",
max_loops=1
)
# Run a single research task
results = swarm.run("What are the latest developments in quantum computing?")
```
=== "Batch Processing"
```python
# Run multiple research tasks in parallel
tasks = [
"What are the environmental impacts of electric vehicles?",
"How is AI being used in drug discovery?",
]
batch_results = swarm.batched_run(tasks)
```
## Configuration
!!! info "Constructor Arguments"
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `name` | str | "DeepResearchSwarm" | Name identifier for the swarm |
| `description` | str | "A swarm that conducts..." | Description of the swarm's purpose |
| `research_agent` | Agent | research_agent | Custom research agent instance |
| `max_loops` | int | 1 | Maximum number of research iterations |
| `nice_print` | bool | True | Enable formatted console output |
| `output_type` | str | "json" | Output format ("json" or "string") |
| `max_workers` | int | CPU_COUNT * 2 | Maximum concurrent threads |
| `token_count` | bool | False | Enable token counting |
| `research_model_name` | str | "gpt-4o-mini" | Model to use for research |
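For reference, here is a minimal configuration sketch that combines several of the arguments above; the values are illustrative only, not recommended defaults:
```python
from swarms.structs import DeepResearchSwarm

# Illustrative configuration built from the documented constructor arguments
swarm = DeepResearchSwarm(
    name="ConfiguredResearchSwarm",
    description="Research swarm tuned for longer, human-readable reports",
    max_loops=2,               # more research iterations
    nice_print=False,          # disable formatted console output
    output_type="string",      # human-readable output instead of JSON
    max_workers=4,             # cap concurrent threads
    token_count=True,          # track token usage
    research_model_name="gpt-4o-mini",
)
```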
## Core Methods
### Run
!!! example "Single Task Execution"
```python
results = swarm.run("What are the latest breakthroughs in fusion energy?")
```
### Batched Run
!!! example "Parallel Task Execution"
```python
tasks = [
"What are current AI safety initiatives?",
"How is CRISPR being used in agriculture?",
]
results = swarm.batched_run(tasks)
```
### Step
!!! example "Single Step Execution"
```python
results = swarm.step("Analyze recent developments in renewable energy storage")
```
## Domain-Specific Examples
=== "Scientific Research"
```python
science_swarm = DeepResearchSwarm(
name="ScienceSwarm",
output_type="json",
max_loops=2 # More iterations for thorough research
)
results = science_swarm.run(
"What are the latest experimental results in quantum entanglement?"
)
```
=== "Market Research"
```python
market_swarm = DeepResearchSwarm(
name="MarketSwarm",
output_type="json"
)
results = market_swarm.run(
"What are the emerging trends in electric vehicle battery technology market?"
)
```
=== "News Analysis"
```python
news_swarm = DeepResearchSwarm(
name="NewsSwarm",
output_type="string" # Human-readable output
)
results = news_swarm.run(
"What are the global economic impacts of recent geopolitical events?"
)
```
=== "Medical Research"
```python
medical_swarm = DeepResearchSwarm(
name="MedicalSwarm",
max_loops=2
)
results = medical_swarm.run(
"What are the latest clinical trials for Alzheimer's treatment?"
)
```
## Advanced Features
??? note "Custom Research Agent"
```python
from swarms import Agent
custom_agent = Agent(
agent_name="SpecializedResearcher",
system_prompt="Your specialized prompt here",
model_name="gpt-4"
)
swarm = DeepResearchSwarm(
research_agent=custom_agent,
max_loops=2
)
```
??? note "Parallel Processing Control"
```python
swarm = DeepResearchSwarm(
max_workers=8, # Limit to 8 concurrent threads
nice_print=False # Disable console output for production
)
```
## Best Practices
!!! success "Recommended Practices"
1. **Query Formulation**: Be specific and clear in your research queries
2. **Resource Management**: Adjust `max_workers` based on your system's capabilities
3. **Output Handling**: Use appropriate `output_type` for your use case
4. **Error Handling**: Wrap swarm operations in try/except blocks (see the sketch after this list)
5. **Model Selection**: Choose appropriate models based on research complexity
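Below is a brief sketch of the error-handling practice, assuming a `swarm` instance created as in the earlier examples:
```python
try:
    results = swarm.run(
        "What are the latest developments in solid-state batteries?"
    )
except Exception as e:
    # Surface API, rate-limit, or resource errors instead of crashing
    print(f"Research task failed: {e}")
```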
## Limitations
!!! warning "Known Limitations"
- Requires valid API keys for external services
- Performance depends on system resources
- Rate limits may apply to external API calls
- Token limits apply to model responses

@ -15,7 +15,6 @@ The `SwarmRouter` class is a flexible routing system designed to manage differen
| `HiearchicalSwarm` | Hierarchical organization of agents |
| `MajorityVoting` | Uses majority voting for decision making |
| `MALT` | Multi-Agent Language Tasks |
| `DeepResearchSwarm` | Specialized for deep research tasks |
| `CouncilAsAJudge` | Council-based judgment system |
| `InteractiveGroupChat` | Interactive group chat with user participation |
| `auto` | Automatically selects best swarm type via embedding search |
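As a rough sketch of how one of these swarm types is selected through `SwarmRouter` (the `agents` and `swarm_type` keyword arguments and the agent settings here are assumptions for illustration, not a verified signature):
```python
from swarms import Agent
from swarms.structs.swarm_router import SwarmRouter

# Hypothetical router setup; "auto" is meant to pick a swarm type via embedding search
router = SwarmRouter(
    name="example-router",
    agents=[
        Agent(agent_name="Researcher", model_name="gpt-4o-mini", max_loops=1),
        Agent(agent_name="Writer", model_name="gpt-4o-mini", max_loops=1),
    ],
    swarm_type="auto",
)

result = router.run("Draft a short market overview for solar inverters")
```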

@ -18,7 +18,7 @@ The AgentLoader enables you to:
The AgentLoader is included with the Swarms framework:
```python
from swarms.utils import AgentLoader, load_agent_from_markdown, load_agents_from_markdown
from swarms import AgentLoader, load_agent_from_markdown, load_agents_from_markdown
```
## Markdown Format
@ -99,7 +99,7 @@ result = workflow.run(task)
For more advanced usage, use the `AgentLoader` class directly:
```python
from swarms.utils import AgentLoader
from swarms import AgentLoader
# Initialize loader
loader = AgentLoader()
@ -209,7 +209,7 @@ response = agent.run(
The AgentLoader provides comprehensive error handling:
```python
from swarms.utils import AgentLoader
from swarms import AgentLoader
loader = AgentLoader()

@ -233,7 +233,6 @@ Available swarm types for different execution patterns.
| `Auto` | Automatically selects the best swarm type |
| `MajorityVoting` | Agents vote on decisions |
| `Malt` | Multi-Agent Language Tasks |
| `DeepResearchSwarm` | Specialized for deep research tasks |
## Detailed Examples

@ -4,40 +4,11 @@ from swarms import Agent
agent = Agent(
agent_name="Quantitative-Trading-Agent",
agent_description="Advanced quantitative trading and algorithmic analysis agent",
system_prompt="""You are an expert quantitative trading agent with deep expertise in:
- Algorithmic trading strategies and implementation
- Statistical arbitrage and market making
- Risk management and portfolio optimization
- High-frequency trading systems
- Market microstructure analysis
- Quantitative research methodologies
- Financial mathematics and stochastic processes
- Machine learning applications in trading
Your core responsibilities include:
1. Developing and backtesting trading strategies
2. Analyzing market data and identifying alpha opportunities
3. Implementing risk management frameworks
4. Optimizing portfolio allocations
5. Conducting quantitative research
6. Monitoring market microstructure
7. Evaluating trading system performance
You maintain strict adherence to:
- Mathematical rigor in all analyses
- Statistical significance in strategy development
- Risk-adjusted return optimization
- Market impact minimization
- Regulatory compliance
- Transaction cost analysis
- Performance attribution
You communicate in precise, technical terms while maintaining clarity for stakeholders.""",
model_name="claude-sonnet-4-20250514",
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
max_loops=1,
dynamic_context_window=True,
streaming_on=True,
)
out = agent.run(

@ -1,10 +0,0 @@
from swarms.structs.deep_research_swarm import DeepResearchSwarm
model = DeepResearchSwarm(
research_model_name="groq/deepseek-r1-distill-qwen-32b"
)
model.run(
"What are the latest research papers on extending telomeres in humans? Give 1 queries for the search not too many`"
)

@ -10,23 +10,8 @@ To run this example:
2. Run: python examples/multi_agent/board_of_directors/board_of_directors_example.py
"""
import os
import sys
from typing import List
# Add the root directory to the Python path if running from examples directory
current_dir = os.path.dirname(os.path.abspath(__file__))
if "examples" in current_dir:
root_dir = current_dir
while os.path.basename(
root_dir
) != "examples" and root_dir != os.path.dirname(root_dir):
root_dir = os.path.dirname(root_dir)
if os.path.basename(root_dir) == "examples":
root_dir = os.path.dirname(root_dir)
if root_dir not in sys.path:
sys.path.insert(0, root_dir)
from swarms.structs.board_of_directors_swarm import (
BoardOfDirectorsSwarm,
BoardMember,
@ -37,7 +22,6 @@ from swarms.structs.agent import Agent
def create_board_members() -> List[BoardMember]:
"""Create board members with specific roles."""
chairman = Agent(
agent_name="Chairman",
agent_description="Executive Chairman with strategic vision",
@ -86,7 +70,6 @@ def create_board_members() -> List[BoardMember]:
def create_worker_agents() -> List[Agent]:
"""Create worker agents for the swarm."""
researcher = Agent(
agent_name="Researcher",
agent_description="Research analyst for data analysis",
@ -114,9 +97,8 @@ def create_worker_agents() -> List[Agent]:
return [researcher, developer, marketer]
def run_board_example() -> None:
def run_board_example() -> str:
"""Run a Board of Directors example."""
# Create board members and worker agents
board_members = create_board_members()
worker_agents = create_worker_agents()
@ -127,7 +109,7 @@ def run_board_example() -> None:
board_members=board_members,
agents=worker_agents,
max_loops=2,
verbose=True,
verbose=False,
decision_threshold=0.6,
)
@ -137,66 +119,17 @@ def run_board_example() -> None:
Include market research, technical planning, marketing strategy, and financial projections.
"""
# Execute the task
result = board_swarm.run(task=task)
print("Task completed successfully!")
print(f"Result: {result}")
def run_simple_example() -> None:
"""Run a simple Board of Directors example."""
# Create simple agents
analyst = Agent(
agent_name="Analyst",
agent_description="Data analyst",
model_name="gpt-4o-mini",
max_loops=1,
)
writer = Agent(
agent_name="Writer",
agent_description="Content writer",
model_name="gpt-4o-mini",
max_loops=1,
)
# Create swarm with default settings
board_swarm = BoardOfDirectorsSwarm(
name="Simple_Board",
agents=[analyst, writer],
verbose=True,
)
# Execute simple task
task = (
"Analyze current market trends and create a summary report."
)
result = board_swarm.run(task=task)
print("Simple example completed!")
print(f"Result: {result}")
# Execute the task and return result
return board_swarm.run(task=task)
def main() -> None:
"""Main function to run the examples."""
if not os.getenv("OPENAI_API_KEY"):
print(
"Warning: OPENAI_API_KEY not set. Example may not work."
)
return
try:
print("Running simple Board of Directors example...")
run_simple_example()
print("\nRunning comprehensive Board of Directors example...")
run_board_example()
except Exception as e:
print(f"Error: {e}")
result = run_board_example()
return result
except Exception:
pass
if __name__ == "__main__":

@ -0,0 +1,51 @@
"""
Minimal Board of Directors Example
This example demonstrates the most basic Board of Directors swarm setup
with minimal configuration and agents.
To run this example:
1. Make sure you're in the root directory of the swarms project
2. Run: python examples/multi_agent/board_of_directors/minimal_board_example.py
"""
from swarms.structs.board_of_directors_swarm import (
BoardOfDirectorsSwarm,
)
from swarms.structs.agent import Agent
def run_minimal_example() -> str:
"""Run a minimal Board of Directors example."""
# Create a single agent
agent = Agent(
agent_name="General_Agent",
agent_description="General purpose agent",
model_name="gpt-4o-mini",
max_loops=1,
)
# Create minimal swarm
board_swarm = BoardOfDirectorsSwarm(
name="Minimal_Board",
agents=[agent],
verbose=False,
)
# Execute minimal task
task = "Provide a brief overview of artificial intelligence."
return board_swarm.run(task=task)
def main() -> None:
"""Main function to run the minimal example."""
try:
result = run_minimal_example()
return result
except Exception:
pass
if __name__ == "__main__":
main()

@ -0,0 +1,35 @@
from swarms.structs.board_of_directors_swarm import (
BoardOfDirectorsSwarm,
)
from swarms.structs.agent import Agent
# Create simple agents for basic tasks
analyst = Agent(
agent_name="Analyst",
agent_description="Data analyst",
model_name="gpt-4o-mini",
max_loops=1,
)
writer = Agent(
agent_name="Writer",
agent_description="Content writer",
model_name="gpt-4o-mini",
max_loops=1,
)
agents = [analyst, writer]
# Create swarm with default settings
board_swarm = BoardOfDirectorsSwarm(
name="Simple_Board",
agents=agents,
verbose=False,
)
# Execute simple task
task = "Analyze current market trends and create a summary report."
result = board_swarm.run(task=task)
print(result)

@ -1,12 +0,0 @@
from swarms.structs.deep_research_swarm import DeepResearchSwarm
swarm = DeepResearchSwarm(
name="Deep Research Swarm",
description="A swarm that conducts comprehensive research across multiple domains",
max_loops=1,
)
swarm.run(
"What are the biggest gas and oil companies in russia? Only provide 3 queries"
)

@ -1,13 +0,0 @@
from swarms.structs.deep_research_swarm import DeepResearchSwarm
def main():
swarm = DeepResearchSwarm(
name="Deep Research Swarm",
description="A swarm of agents that can perform deep research on a given topic",
)
swarm.run("What are the latest news in the AI an crypto space")
main()

@ -1,23 +0,0 @@
from swarms.structs.deep_research_swarm import DeepResearchSwarm
def main():
swarm = DeepResearchSwarm(
name="Deep Research Swarm",
description="A swarm of agents that can perform deep research on a given topic",
output_type="string", # Change to string output type for better readability
)
# Format the query as a proper question
query = "What are the latest developments and news in the AI and cryptocurrency space?"
try:
result = swarm.run(query)
print("\nResearch Results:")
print(result)
except Exception as e:
print(f"Error occurred: {str(e)}")
if __name__ == "__main__":
main()

@ -1,13 +0,0 @@
from swarms.structs.deep_research_swarm import DeepResearchSwarm
swarm = DeepResearchSwarm(
name="Deep Research Swarm",
description="A swarm of agents that can perform deep research on a given topic",
output_type="all", # Change to string output type for better readability
)
out = swarm.run(
"What are the latest developments and news in the AI and cryptocurrency space?"
)
print(out)

@ -9,10 +9,11 @@ All components are now in one file: hierarchical_structured_communication_framew
import os
import sys
from typing import Dict, Any
# Add the project root to the Python path
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
project_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..")
)
sys.path.insert(0, project_root)
from dotenv import load_dotenv
@ -20,11 +21,6 @@ from dotenv import load_dotenv
# Import everything from the single file
from swarms.structs.hierarchical_structured_communication_framework import (
HierarchicalStructuredCommunicationFramework,
HierarchicalStructuredCommunicationGenerator,
HierarchicalStructuredCommunicationEvaluator,
HierarchicalStructuredCommunicationRefiner,
HierarchicalStructuredCommunicationSupervisor,
# Convenience aliases
TalkHierarchicalGenerator,
TalkHierarchicalEvaluator,
TalkHierarchicalRefiner,
@ -42,29 +38,29 @@ def example_basic_usage():
print("=" * 80)
print("BASIC USAGE EXAMPLE")
print("=" * 80)
# Create framework with default configuration
framework = HierarchicalStructuredCommunicationFramework(
name="BasicFramework",
max_loops=2,
verbose=True
name="BasicFramework", max_loops=2, verbose=True
)
# Run a simple task
task = "Explain the benefits of structured communication in multi-agent systems"
print(f"Task: {task}")
print("Running framework...")
result = framework.run(task)
print("\n" + "=" * 50)
print("FINAL RESULT")
print("=" * 50)
print(result["final_result"])
print(f"\nTotal loops: {result['total_loops']}")
print(f"Conversation history entries: {len(result['conversation_history'])}")
print(
f"Conversation history entries: {len(result['conversation_history'])}"
)
print(f"Evaluation results: {len(result['evaluation_results'])}")
@ -75,40 +71,40 @@ def example_custom_agents():
print("\n" + "=" * 80)
print("CUSTOM AGENTS EXAMPLE")
print("=" * 80)
# Create custom agents using the convenience aliases
generator = TalkHierarchicalGenerator(
agent_name="ContentCreator",
model_name="gpt-4o-mini",
verbose=True
verbose=True,
)
evaluator1 = TalkHierarchicalEvaluator(
agent_name="AccuracyChecker",
evaluation_criteria=["accuracy", "technical_correctness"],
model_name="gpt-4o-mini",
verbose=True
verbose=True,
)
evaluator2 = TalkHierarchicalEvaluator(
agent_name="ClarityChecker",
evaluation_criteria=["clarity", "readability", "coherence"],
model_name="gpt-4o-mini",
verbose=True
verbose=True,
)
refiner = TalkHierarchicalRefiner(
agent_name="ContentImprover",
model_name="gpt-4o-mini",
verbose=True
verbose=True,
)
supervisor = TalkHierarchicalSupervisor(
agent_name="WorkflowManager",
model_name="gpt-4o-mini",
verbose=True
verbose=True,
)
# Create framework with custom agents
framework = HierarchicalStructuredCommunicationFramework(
name="CustomFramework",
@ -117,24 +113,26 @@ def example_custom_agents():
evaluators=[evaluator1, evaluator2],
refiners=[refiner],
max_loops=3,
verbose=True
verbose=True,
)
# Run a complex task
task = "Design a comprehensive machine learning pipeline for sentiment analysis"
print(f"Task: {task}")
print("Running framework with custom agents...")
result = framework.run(task)
print("\n" + "=" * 50)
print("FINAL RESULT")
print("=" * 50)
print(result["final_result"])
print(f"\nTotal loops: {result['total_loops']}")
print(f"Conversation history entries: {len(result['conversation_history'])}")
print(
f"Conversation history entries: {len(result['conversation_history'])}"
)
print(f"Evaluation results: {len(result['evaluation_results'])}")
@ -145,7 +143,7 @@ def example_ollama_integration():
print("\n" + "=" * 80)
print("OLLAMA INTEGRATION EXAMPLE")
print("=" * 80)
# Create framework with Ollama configuration
framework = HierarchicalStructuredCommunicationFramework(
name="OllamaFramework",
@ -154,27 +152,31 @@ def example_ollama_integration():
model_name="llama3:latest",
use_ollama=True,
ollama_base_url="http://localhost:11434/v1",
ollama_api_key="ollama"
ollama_api_key="ollama",
)
# Run a task with local model
task = "Explain the concept of structured communication protocols"
print(f"Task: {task}")
print("Running framework with Ollama...")
try:
result = framework.run(task)
print("\n" + "=" * 50)
print("FINAL RESULT")
print("=" * 50)
print(result["final_result"])
print(f"\nTotal loops: {result['total_loops']}")
print(f"Conversation history entries: {len(result['conversation_history'])}")
print(f"Evaluation results: {len(result['evaluation_results'])}")
print(
f"Conversation history entries: {len(result['conversation_history'])}"
)
print(
f"Evaluation results: {len(result['evaluation_results'])}"
)
except Exception as e:
print(f"Error with Ollama: {e}")
print("Make sure Ollama is running: ollama serve")
@ -187,28 +189,31 @@ def example_structured_communication():
print("\n" + "=" * 80)
print("STRUCTURED COMMUNICATION EXAMPLE")
print("=" * 80)
# Create framework
framework = HierarchicalStructuredCommunicationFramework(
name="CommunicationDemo",
verbose=True
name="CommunicationDemo", verbose=True
)
# Demonstrate structured message sending
print("Sending structured message...")
structured_msg = framework.send_structured_message(
sender="Supervisor",
recipient="Generator",
message="Create a technical documentation outline",
background="For a Python library focused on data processing",
intermediate_output="Previous research on similar libraries"
intermediate_output="Previous research on similar libraries",
)
print(f"Message sent: {structured_msg.message}")
print(f"Background: {structured_msg.background}")
print(f"Intermediate output: {structured_msg.intermediate_output}")
print(f"From: {structured_msg.sender} -> To: {structured_msg.recipient}")
print(
f"Intermediate output: {structured_msg.intermediate_output}"
)
print(
f"From: {structured_msg.sender} -> To: {structured_msg.recipient}"
)
def example_agent_interaction():
@ -218,52 +223,51 @@ def example_agent_interaction():
print("\n" + "=" * 80)
print("AGENT INTERACTION EXAMPLE")
print("=" * 80)
# Create agents
generator = TalkHierarchicalGenerator(
agent_name="ContentGenerator",
verbose=True
agent_name="ContentGenerator", verbose=True
)
evaluator = TalkHierarchicalEvaluator(
agent_name="QualityEvaluator",
evaluation_criteria=["accuracy", "clarity"],
verbose=True
verbose=True,
)
refiner = TalkHierarchicalRefiner(
agent_name="ContentRefiner",
verbose=True
agent_name="ContentRefiner", verbose=True
)
# Generate content
print("1. Generating content...")
gen_result = generator.generate_with_structure(
message="Create a brief explanation of machine learning",
background="For beginners with no technical background",
intermediate_output=""
intermediate_output="",
)
print(f"Generated content: {gen_result.content[:200]}...")
# Evaluate content
print("\n2. Evaluating content...")
eval_result = evaluator.evaluate_with_criterion(
content=gen_result.content,
criterion="clarity"
content=gen_result.content, criterion="clarity"
)
print(f"Evaluation score: {eval_result.score}/10")
print(f"Feedback: {eval_result.feedback[:200]}...")
# Refine content
print("\n3. Refining content...")
refine_result = refiner.refine_with_feedback(
original_content=gen_result.content,
evaluation_results=[eval_result]
evaluation_results=[eval_result],
)
print(
f"Refined content: {refine_result.refined_content[:200]}..."
)
print(f"Refined content: {refine_result.refined_content[:200]}...")
print(f"Changes made: {refine_result.changes_made}")
@ -271,12 +275,16 @@ def main():
"""
Main function to run all examples
"""
print("SINGLE-FILE HIERARCHICAL STRUCTURED COMMUNICATION FRAMEWORK")
print(
"SINGLE-FILE HIERARCHICAL STRUCTURED COMMUNICATION FRAMEWORK"
)
print("=" * 80)
print("This demonstrates the consolidated single-file implementation")
print(
"This demonstrates the consolidated single-file implementation"
)
print("based on the research paper: arXiv:2502.11098")
print("=" * 80)
try:
# Run examples
example_basic_usage()
@ -284,27 +292,30 @@ def main():
example_ollama_integration()
example_structured_communication()
example_agent_interaction()
print("\n" + "=" * 80)
print("ALL EXAMPLES COMPLETED SUCCESSFULLY!")
print("=" * 80)
print("Framework Features Demonstrated:")
print("✓ Single-file implementation")
print("✓ Structured Communication Protocol (M_ij, B_ij, I_ij)")
print(
"✓ Structured Communication Protocol (M_ij, B_ij, I_ij)"
)
print("✓ Hierarchical Evaluation System")
print("✓ Iterative Refinement Process")
print("✓ Flexible Model Configuration (OpenAI/Ollama)")
print("✓ Custom Agent Specialization")
print("✓ Direct Agent Interaction")
print("✓ Convenience Aliases")
except KeyboardInterrupt:
print("\nInterrupted by user")
except Exception as e:
print(f"Error during execution: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()
main()

@ -0,0 +1,9 @@
from swarms import load_agents_from_markdown
agents = load_agents_from_markdown(["finance_advisor.md"])
# Use the agent
response = agents[0].run(
"I have $100k to invest. I want to hedge my bets on the energy companies that will benefit from the AI revoltion"
"What are the top 4 stocks to invest in?"
)

@ -1,7 +1,7 @@
---
name: FinanceAdvisor
description: Expert financial advisor for investment and budgeting guidance
model_name: gpt-4o
model_name: claude-sonnet-4-20250514
temperature: 0.7
max_loops: 1
---

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "8.1.1"
version = "8.1.2"
description = "Swarms - TGSC"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]

@ -28,4 +28,3 @@ mcp
numpy
openai
schedule
colorama

@ -20,8 +20,11 @@ from swarms.agents.create_agents_from_yaml import (
)
from swarms.cli.onboarding_process import OnboardingProcess
from swarms.structs.agent import Agent
from swarms.utils.agent_loader import AgentLoader
from swarms.structs.agent_loader import AgentLoader
from swarms.utils.formatter import formatter
from dotenv import load_dotenv
load_dotenv()
# Initialize console with custom styling
console = Console()
@ -45,12 +48,14 @@ COLORS = {
}
ASCII_ART = r"""
_________
/ _____/_ _ _______ _______ _____ ______
\_____ \\ \/ \/ /\__ \\_ __ \/ \ / ___/
/ \\ / / __ \| | \/ Y Y \\___ \
/_______ / \/\_/ (____ /__| |__|_| /____ >
\/ \/ \/ \/
"""
@ -68,7 +73,7 @@ def show_ascii_art():
panel = Panel(
Text(ASCII_ART, style=f"bold {COLORS['primary']}"),
border_style=COLORS["secondary"],
title="[bold]Welcome to Swarms CLI[/bold]",
title="[bold]Welcome to the Swarms CLI[/bold]",
subtitle="[dim]swarms.ai[/dim]",
)
console.print(panel)
@ -395,7 +400,14 @@ def check_python_version() -> tuple[bool, str, str]:
def check_api_keys() -> tuple[bool, str, str]:
"""Check if common API keys are set."""
"""
Check if at least one common API key is set in the environment variables.
Returns:
tuple: (True, "", message) if at least one API key is set,
(False, "", message) otherwise.
"""
api_keys = {
"OPENAI_API_KEY": os.getenv("OPENAI_API_KEY"),
"ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY"),
@ -403,9 +415,16 @@ def check_api_keys() -> tuple[bool, str, str]:
"COHERE_API_KEY": os.getenv("COHERE_API_KEY"),
}
set_keys = [key for key, value in api_keys.items() if value]
if set_keys:
return True, "", f"API keys found: {', '.join(set_keys)}"
# At least one key must be present and non-empty
if any(value for value in api_keys.values()):
present_keys = [
key for key, value in api_keys.items() if value
]
return (
True,
"",
f"At least one API key found: {', '.join(present_keys)}",
)
else:
return (
False,

@ -14,7 +14,6 @@ SwarmType = Literal[
"auto",
"MajorityVoting",
"MALT",
"DeepResearchSwarm",
"CouncilAsAJudge",
"InteractiveGroupChat",
]

@ -1,18 +1,15 @@
from swarms.structs.agent import Agent
from swarms.structs.agent_builder import AgentsBuilder
from swarms.structs.agent_rearrange import AgentRearrange, rearrange
from swarms.structs.auto_swarm_builder import AutoSwarmBuilder
from swarms.structs.base_structure import BaseStructure
from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.batch_agent_execution import batch_agent_execution
from swarms.structs.board_of_directors_swarm import (
BoardOfDirectorsSwarm,
)
from swarms.structs.concurrent_workflow import ConcurrentWorkflow
from swarms.structs.conversation import Conversation
from swarms.structs.council_as_judge import CouncilAsAJudge
from swarms.structs.cron_job import CronJob
from swarms.structs.de_hallucination_swarm import DeHallucinationSwarm
from swarms.structs.deep_research_swarm import DeepResearchSwarm
from swarms.structs.graph_workflow import (
Edge,
GraphWorkflow,
@ -24,8 +21,8 @@ from swarms.structs.groupchat import (
expertise_based,
)
from swarms.structs.heavy_swarm import HeavySwarm
from swarms.structs.hierarchical_swarm import HierarchicalSwarm
from swarms.structs.hybrid_hierarchical_peer_swarm import (
from swarms.structs.hiearchical_swarm import HierarchicalSwarm
from swarms.structs.hybrid_hiearchical_peer_swarm import (
HybridHierarchicalClusterSwarm,
)
from swarms.structs.interactive_groupchat import (
@ -66,7 +63,6 @@ from swarms.structs.multi_agent_exec import (
run_single_agent,
)
from swarms.structs.multi_agent_router import MultiAgentRouter
from swarms.structs.rearrange import AgentRearrange, rearrange
from swarms.structs.round_robin import RoundRobinSwarm
from swarms.structs.sequential_workflow import SequentialWorkflow
from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm
@ -82,7 +78,7 @@ from swarms.structs.stopping_conditions import (
check_stopped,
check_success,
)
from swarms.structs.swarm_arange import SwarmRearrange
from swarms.structs.swarm_rearrange import SwarmRearrange
from swarms.structs.swarm_router import (
SwarmRouter,
SwarmType,
@ -107,32 +103,11 @@ from swarms.structs.swarming_architectures import (
staircase_swarm,
star_swarm,
)
from swarms.structs.hierarchical_structured_communication_framework import (
HierarchicalStructuredCommunicationFramework,
HierarchicalStructuredCommunicationGenerator,
HierarchicalStructuredCommunicationEvaluator,
HierarchicalStructuredCommunicationRefiner,
HierarchicalStructuredCommunicationSupervisor,
StructuredMessage,
HierarchicalOrder,
EvaluationResult,
StructuredMessageSchema,
EvaluationResultSchema,
GeneratorResponseSchema,
EvaluatorResponseSchema,
RefinerResponseSchema,
CommunicationType,
AgentRole,
)
# Convenience alias (fixes old code, if any was left out in the wild)
HierarchicalStructuredCommunicationSwarm = HierarchicalStructuredCommunicationFramework
__all__ = [
"Agent",
"BaseStructure",
"BaseSwarm",
"BoardOfDirectorsSwarm",
"ConcurrentWorkflow",
"Conversation",
"GroupChat",
@ -188,7 +163,6 @@ __all__ = [
"AgentsBuilder",
"MALT",
"DeHallucinationSwarm",
"DeepResearchSwarm",
"HybridHierarchicalClusterSwarm",
"get_agents_info",
"get_swarms_info",
@ -206,22 +180,6 @@ __all__ = [
"HierarchicalSwarm",
"HeavySwarm",
"CronJob",
"HierarchicalStructuredCommunicationSwarm",
"HierarchicalStructuredCommunicationGenerator",
"HierarchicalStructuredCommunicationEvaluator",
"HierarchicalStructuredCommunicationRefiner",
"HierarchicalStructuredCommunicationSupervisor",
"StructuredMessage",
"HierarchicalOrder",
"EvaluationResult",
"StructuredMessageSchema",
"EvaluationResultSchema",
"GeneratorResponseSchema",
"EvaluatorResponseSchema",
"RefinerResponseSchema",
"CommunicationType",
"AgentRole",
# Stopping conditions
"check_done",
"check_finished",
"check_complete",

@ -0,0 +1,208 @@
import os
from typing import List, Union
from swarms.agents.create_agents_from_yaml import (
ReturnTypes,
create_agents_from_yaml,
)
from swarms.structs.agent import Agent
from swarms.structs.csv_to_agent import AgentLoader as CSVAgentLoader
from swarms.utils.agent_loader_markdown import (
load_agent_from_markdown,
load_agents_from_markdown,
AgentLoader as MarkdownAgentLoader,
)
class AgentLoader:
"""
Loader class for creating Agent objects from various file formats.
This class provides methods to load agents from Markdown, YAML, and CSV files.
"""
def __init__(self):
"""
Initialize the AgentLoader instance.
"""
pass
def load_agents_from_markdown(
self,
file_paths: Union[str, List[str]],
concurrent: bool = True,
max_file_size_mb: float = 10.0,
**kwargs,
) -> List[Agent]:
"""
Load multiple agents from one or more Markdown files.
Args:
file_paths (Union[str, List[str]]): Path or list of paths to Markdown file(s) containing agent definitions.
concurrent (bool, optional): Whether to load files concurrently. Defaults to True.
max_file_size_mb (float, optional): Maximum file size in MB to process. Defaults to 10.0.
**kwargs: Additional keyword arguments passed to the underlying loader.
Returns:
List[Agent]: A list of loaded Agent objects.
"""
return load_agents_from_markdown(
file_paths=file_paths,
concurrent=concurrent,
max_file_size_mb=max_file_size_mb,
**kwargs,
)
def load_agent_from_markdown(
self, file_path: str, **kwargs
) -> Agent:
"""
Load a single agent from a Markdown file.
Args:
file_path (str): Path to the Markdown file containing the agent definition.
**kwargs: Additional keyword arguments passed to the underlying loader.
Returns:
Agent: The loaded Agent object.
"""
return load_agent_from_markdown(file_path=file_path, **kwargs)
def load_agents_from_yaml(
self,
yaml_file: str,
return_type: ReturnTypes = "auto",
**kwargs,
) -> List[Agent]:
"""
Load agents from a YAML file.
Args:
yaml_file (str): Path to the YAML file containing agent definitions.
return_type (ReturnTypes, optional): The return type for the loader. Defaults to "auto".
**kwargs: Additional keyword arguments passed to the underlying loader.
Returns:
List[Agent]: A list of loaded Agent objects.
"""
return create_agents_from_yaml(
yaml_file=yaml_file, return_type=return_type, **kwargs
)
def load_many_agents_from_yaml(
self,
yaml_files: List[str],
return_types: List[ReturnTypes] = ["auto"],
**kwargs,
) -> List[Agent]:
"""
Load agents from multiple YAML files.
Args:
yaml_files (List[str]): List of YAML file paths containing agent definitions.
return_types (List[ReturnTypes], optional): List of return types for each YAML file. Defaults to ["auto"].
**kwargs: Additional keyword arguments passed to the underlying loader.
Returns:
List[Agent]: A list of loaded Agent objects from all files.
"""
return [
self.load_agents_from_yaml(
yaml_file=yaml_file,
return_type=return_types[i],
**kwargs,
)
for i, yaml_file in enumerate(yaml_files)
]
def load_agents_from_csv(
self, csv_file: str, **kwargs
) -> List[Agent]:
"""
Load agents from a CSV file.
Args:
csv_file (str): Path to the CSV file containing agent definitions.
**kwargs: Additional keyword arguments passed to the underlying loader.
Returns:
List[Agent]: A list of loaded Agent objects.
"""
loader = CSVAgentLoader(file_path=csv_file)
return loader.load_agents()
def auto(self, file_path: str, *args, **kwargs):
"""
Automatically load agents from a file based on its extension.
Args:
file_path (str): Path to the agent file (Markdown, YAML, or CSV).
*args: Additional positional arguments passed to the underlying loader.
**kwargs: Additional keyword arguments passed to the underlying loader.
Returns:
List[Agent]: A list of loaded Agent objects.
Raises:
ValueError: If the file type is not supported.
"""
if file_path.endswith(".md"):
return self.load_agents_from_markdown(
file_path, *args, **kwargs
)
elif file_path.endswith(".yaml"):
return self.load_agents_from_yaml(
file_path, *args, **kwargs
)
elif file_path.endswith(".csv"):
return self.load_agents_from_csv(
file_path, *args, **kwargs
)
else:
raise ValueError(f"Unsupported file type: {file_path}")
def load_single_agent(self, *args, **kwargs):
"""
Load a single agent from a file of a supported type.
Args:
*args: Positional arguments passed to the underlying loader.
**kwargs: Keyword arguments passed to the underlying loader.
Returns:
Agent: The loaded Agent object.
"""
return self.auto(*args, **kwargs)
def load_multiple_agents(
self, file_paths: List[str], *args, **kwargs
):
"""
Load multiple agents from a list of files of various supported types.
Args:
file_paths (List[str]): List of file paths to agent files (Markdown, YAML, or CSV).
*args: Additional positional arguments passed to the underlying loader.
**kwargs: Additional keyword arguments passed to the underlying loader.
Returns:
List[Agent]: A list of loaded Agent objects from all files.
"""
return [
self.auto(file_path, *args, **kwargs)
for file_path in file_paths
]
def parse_markdown_file(self, file_path: str):
"""
Parse a Markdown file and return the agents defined within.
Args:
file_path (str): Path to the Markdown file.
Returns:
List[Agent]: A list of Agent objects parsed from the file.
"""
return MarkdownAgentLoader(
max_workers=os.cpu_count()
).parse_markdown_file(file_path=file_path)
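A short usage sketch for the loader above; `finance_advisor.md` stands in for any agent definition file you already have:
```python
# Hypothetical usage of the AgentLoader defined above
loader = AgentLoader()

# auto() dispatches on the file extension (.md, .yaml, .csv)
agents = loader.auto("finance_advisor.md")

# Or load a single agent explicitly from Markdown
advisor = loader.load_agent_from_markdown("finance_advisor.md")
```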

@ -85,7 +85,6 @@ Choose the most appropriate architecture based on task requirements:
- **HiearchicalSwarm**: Layered decision-making with management and execution tiers
- **MajorityVoting**: Democratic decision-making with voting mechanisms
- **MALT**: Multi-agent learning and training with knowledge sharing
- **DeepResearchSwarm**: Comprehensive research with multiple specialized investigators
- **CouncilAsAJudge**: Deliberative decision-making with expert panels
- **InteractiveGroupChat**: Dynamic group interactions with real-time collaboration
- **HeavySwarm**: High-capacity processing with multiple specialized agents

@ -19,7 +19,6 @@ Flow:
6. All context and conversation history is preserved throughout the process
"""
import asyncio
import json
import os
import re
@ -34,7 +33,6 @@ from loguru import logger
from pydantic import BaseModel, Field
from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.conversation import Conversation
from swarms.structs.ma_utils import list_all_agents
from swarms.utils.history_output_formatter import (
@ -54,23 +52,6 @@ board_logger = initialize_logger(
# ============================================================================
class BoardFeatureStatus(str, Enum):
"""Enumeration of Board of Directors feature status.
This enum defines the possible states of the Board of Directors feature
within the Swarms Framework.
Attributes:
ENABLED: Feature is explicitly enabled
DISABLED: Feature is explicitly disabled
AUTO: Feature state is determined automatically
"""
ENABLED = "enabled"
DISABLED = "disabled"
AUTO = "auto"
class BoardConfigModel(BaseModel):
"""
Configuration model for Board of Directors feature.
@ -91,12 +72,6 @@ class BoardConfigModel(BaseModel):
custom_board_templates: Custom board templates for different use cases
"""
# Feature control
board_feature_enabled: bool = Field(
default=False,
description="Whether the Board of Directors feature is enabled globally.",
)
# Board composition
default_board_size: int = Field(
default=3,
@ -201,9 +176,6 @@ class BoardConfig:
):
self._load_from_file()
# Override with environment variables
self._load_from_environment()
# Override with explicit config data
if self.config_data:
self._load_from_dict(self.config_data)
@ -236,62 +208,6 @@ class BoardConfig:
)
raise
def _load_from_environment(self) -> None:
"""
Load configuration from environment variables.
This method maps environment variables to configuration parameters
and handles type conversion appropriately.
"""
env_mappings = {
"SWARMS_BOARD_FEATURE_ENABLED": "board_feature_enabled",
"SWARMS_BOARD_DEFAULT_SIZE": "default_board_size",
"SWARMS_BOARD_DECISION_THRESHOLD": "decision_threshold",
"SWARMS_BOARD_ENABLE_VOTING": "enable_voting",
"SWARMS_BOARD_ENABLE_CONSENSUS": "enable_consensus",
"SWARMS_BOARD_DEFAULT_MODEL": "default_board_model",
"SWARMS_BOARD_VERBOSE_LOGGING": "verbose_logging",
"SWARMS_BOARD_MAX_MEETING_DURATION": "max_board_meeting_duration",
"SWARMS_BOARD_AUTO_FALLBACK": "auto_fallback_to_director",
}
for env_var, config_key in env_mappings.items():
value = os.getenv(env_var)
if value is not None:
try:
# Convert string values to appropriate types
if config_key in [
"board_feature_enabled",
"enable_voting",
"enable_consensus",
"verbose_logging",
"auto_fallback_to_director",
]:
converted_value = value.lower() in [
"true",
"1",
"yes",
"on",
]
elif config_key in [
"default_board_size",
"max_board_meeting_duration",
]:
converted_value = int(value)
elif config_key in ["decision_threshold"]:
converted_value = float(value)
else:
converted_value = value
setattr(self.config, config_key, converted_value)
logger.debug(
f"Loaded {config_key} from environment: {converted_value}"
)
except (ValueError, TypeError) as e:
logger.warning(
f"Failed to parse environment variable {env_var}: {e}"
)
def _load_from_dict(self, config_dict: Dict[str, Any]) -> None:
"""
Load configuration from dictionary.
@ -312,15 +228,6 @@ class BoardConfig:
f"Invalid configuration value for {key}: {e}"
)
def is_enabled(self) -> bool:
"""
Check if the Board of Directors feature is enabled.
Returns:
bool: True if the feature is enabled, False otherwise
"""
return self.config.board_feature_enabled
def get_config(self) -> BoardConfigModel:
"""
Get the current configuration.
@ -562,64 +469,6 @@ def get_board_config(
return _board_config
def enable_board_feature(
config_file_path: Optional[str] = None,
) -> None:
"""
Enable the Board of Directors feature globally.
This function enables the Board of Directors feature and saves the configuration
to the specified file path.
Args:
config_file_path: Optional path to save the configuration
"""
config = get_board_config(config_file_path)
config.update_config({"board_feature_enabled": True})
if config_file_path:
config.save_config(config_file_path)
logger.info("Board of Directors feature enabled")
def disable_board_feature(
config_file_path: Optional[str] = None,
) -> None:
"""
Disable the Board of Directors feature globally.
This function disables the Board of Directors feature and saves the configuration
to the specified file path.
Args:
config_file_path: Optional path to save the configuration
"""
config = get_board_config(config_file_path)
config.update_config({"board_feature_enabled": False})
if config_file_path:
config.save_config(config_file_path)
logger.info("Board of Directors feature disabled")
def is_board_feature_enabled(
config_file_path: Optional[str] = None,
) -> bool:
"""
Check if the Board of Directors feature is enabled.
Args:
config_file_path: Optional path to configuration file
Returns:
bool: True if the feature is enabled, False otherwise
"""
config = get_board_config(config_file_path)
return config.is_enabled()
def create_default_config_file(
file_path: str = "swarms_board_config.yaml",
) -> None:
@ -953,7 +802,7 @@ class BoardSpec(BaseModel):
)
class BoardOfDirectorsSwarm(BaseSwarm):
class BoardOfDirectorsSwarm:
"""
A hierarchical swarm of agents with a Board of Directors that orchestrates tasks.
@ -1029,13 +878,8 @@ class BoardOfDirectorsSwarm(BaseSwarm):
Raises:
ValueError: If critical requirements are not met during initialization
"""
super().__init__(
name=name,
description=description,
agents=agents,
)
self.name = name
self.description = description
self.board_members = board_members or []
self.agents = agents or []
self.max_loops = max_loops
@ -1047,9 +891,8 @@ class BoardOfDirectorsSwarm(BaseSwarm):
self.decision_threshold = decision_threshold
self.enable_voting = enable_voting
self.enable_consensus = enable_consensus
self.max_workers = max_workers or min(
32, (os.cpu_count() or 1) + 4
)
self.max_workers = max_workers or os.cpu_count()
# Initialize the swarm
self._init_board_swarm()
@ -1258,14 +1101,6 @@ You should be thorough, organized, and detail-oriented in your documentation."""
f"🔍 Running reliability checks for swarm: {self.name}"
)
# Check if Board of Directors feature is enabled
board_config = get_board_config()
if not board_config.is_enabled():
raise ValueError(
"Board of Directors feature is not enabled. Please enable it using "
"enable_board_feature() or set SWARMS_BOARD_FEATURE_ENABLED=true environment variable."
)
if not self.agents or len(self.agents) == 0:
raise ValueError(
"No agents found in the swarm. At least one agent must be provided to create a Board of Directors swarm."
@ -1687,34 +1522,6 @@ Please provide your response in the following format:
board_logger.error(error_msg)
raise
async def arun(
self,
task: str,
img: Optional[str] = None,
*args: Any,
**kwargs: Any,
) -> Any:
"""
Run the Board of Directors swarm asynchronously.
This method provides an asynchronous interface for running the swarm,
allowing for non-blocking execution in async contexts.
Args:
task: The task to be executed
img: Optional image input
*args: Additional positional arguments
**kwargs: Additional keyword arguments
Returns:
Any: The final result of the swarm execution
"""
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None, self.run, task, img, *args, **kwargs
)
return result
def _generate_board_feedback(self, outputs: List[Any]) -> str:
"""
Provide feedback from the Board of Directors based on agent outputs.

@ -1,23 +1,25 @@
import concurrent.futures
import csv
import json
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import (
List,
Any,
Dict,
List,
TypedDict,
Any,
Union,
TypeVar,
Union,
)
from dataclasses import dataclass
import csv
import json
import yaml
from pathlib import Path
from enum import Enum
from swarms.structs.agent import Agent
from swarms.schemas.swarms_api_schemas import AgentSpec
from litellm import model_list
import concurrent.futures
from tqdm import tqdm
from swarms.schemas.swarms_api_schemas import AgentSpec
from swarms.structs.agent import Agent
# Type variable for agent configuration
AgentConfigType = TypeVar(
"AgentConfigType", bound=Union[AgentSpec, Dict[str, Any]]

@ -1,479 +0,0 @@
import concurrent.futures
import json
import os
from typing import Any, List
from dotenv import load_dotenv
from rich.console import Console
import requests
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.utils.formatter import formatter
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
from swarms.utils.str_to_dict import str_to_dict
console = Console()
load_dotenv()
# Number of worker threads for concurrent operations
MAX_WORKERS = (
os.cpu_count() * 2
) # Optimal number of workers based on CPU cores
def exa_search(query: str, **kwargs: Any) -> str:
"""Performs web search using Exa.ai API and returns formatted results."""
api_url = "https://api.exa.ai/search"
api_key = os.getenv("EXA_API_KEY")
if not api_key:
return "### Error\nEXA_API_KEY environment variable not set\n"
headers = {
"x-api-key": api_key,
"Content-Type": "application/json",
}
safe_kwargs = {
str(k): v
for k, v in kwargs.items()
if k is not None and v is not None and str(k) != "None"
}
payload = {
"query": query,
"useAutoprompt": True,
"numResults": safe_kwargs.get("num_results", 10),
"contents": {
"text": True,
"highlights": {"numSentences": 10},
},
}
for key, value in safe_kwargs.items():
if key not in payload and key not in [
"query",
"useAutoprompt",
"numResults",
"contents",
]:
payload[key] = value
try:
response = requests.post(
api_url, json=payload, headers=headers
)
if response.status_code != 200:
return f"### Error\nHTTP {response.status_code}: {response.text}\n"
json_data = response.json()
except Exception as e:
return f"### Error\n{str(e)}\n"
if "error" in json_data:
return f"### Error\n{json_data['error']}\n"
formatted_text = []
search_params = json_data.get("effectiveFilters", {})
query = search_params.get("query", "General web search")
formatted_text.append(
f"### Exa Search Results for: '{query}'\n\n---\n"
)
results = json_data.get("results", [])
if not results:
formatted_text.append("No results found.\n")
return "".join(formatted_text)
for i, result in enumerate(results, 1):
title = result.get("title", "No title")
url = result.get("url", result.get("id", "No URL"))
published_date = result.get("publishedDate", "")
highlights = result.get("highlights", [])
highlight_text = (
"\n".join(
(
h.get("text", str(h))
if isinstance(h, dict)
else str(h)
)
for h in highlights[:3]
)
if highlights
else "No summary available"
)
formatted_text.extend(
[
f"{i}. **{title}**\n",
f" - URL: {url}\n",
f" - Published: {published_date.split('T')[0] if published_date else 'Date unknown'}\n",
f" - Key Points:\n {highlight_text}\n\n",
]
)
return "".join(formatted_text)
# Define the research tools schema
tools = [
{
"type": "function",
"function": {
"name": "search_topic",
"description": "Conduct a thorough search on a specified topic or subtopic, generating a precise array of highly detailed search queries tailored to the input parameters.",
"parameters": {
"type": "object",
"properties": {
"depth": {
"type": "integer",
"description": "Indicates the level of thoroughness for the search. Values range from 1 to 3, where 1 signifies a superficial search and 3 indicates an in-depth exploration of the topic.",
},
"detailed_queries": {
"type": "array",
"description": "An array of specific search queries generated based on the input query and the specified depth. Each query must be crafted to elicit detailed and relevant information from various sources.",
"items": {
"type": "string",
"description": "Each item in this array must represent a unique search query targeting a specific aspect of the main topic, ensuring a comprehensive exploration of the subject matter.",
},
},
},
"required": ["depth", "detailed_queries"],
},
},
},
]
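# --- Editor's illustrative sketch (assumed payload shape, not confirmed by this diff) ---
# Given the "search_topic" schema above, the research agent is expected to emit
# tool-call arguments that parse into roughly:
#
#     {"depth": 2, "detailed_queries": ["query about subtopic A", "query about subtopic B"]}
#
# get_queries() below extracts the "detailed_queries" list from that structure.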
RESEARCH_AGENT_PROMPT = """
You are an advanced research agent specialized in conducting deep, comprehensive research across multiple domains.
Your task is to:
1. Break down complex topics into searchable subtopics
2. Generate diverse search queries to explore each subtopic thoroughly
3. Identify connections and patterns across different areas of research
4. Synthesize findings into coherent insights
5. Identify gaps in current knowledge and suggest areas for further investigation
For each research task:
- Consider multiple perspectives and approaches
- Look for both supporting and contradicting evidence
- Evaluate the credibility and relevance of sources
- Track emerging trends and recent developments
- Consider cross-disciplinary implications
Output Format:
- Provide structured research plans
- Include specific search queries for each subtopic
- Prioritize queries based on relevance and potential impact
- Suggest follow-up areas for deeper investigation
"""
SUMMARIZATION_AGENT_PROMPT = """
You are an expert information synthesis and summarization agent designed for producing clear, accurate, and insightful summaries of complex information. Your core capabilities include:
Core Capabilities:
- Identify and extract key concepts, themes, and insights from any given content
- Recognize patterns, relationships, and hierarchies within information
- Filter out noise while preserving crucial context and nuance
- Handle multiple sources and perspectives simultaneously
Summarization Strategy
1. Multi-level Structure
- Provide an extensive summary
- Follow with key findings
- Include detailed insights with supporting evidence
- End with implications or next steps when relevant
2. Quality Standards
- Maintain factual accuracy and precision
- Preserve important technical details and terminology
- Avoid oversimplification of complex concepts
- Include quantitative data when available
- Cite or reference specific sources when summarizing claims
3. Clarity & Accessibility
- Use clear, concise language
- Define technical terms when necessary
- Structure information logically
- Use formatting to enhance readability
- Maintain appropriate level of technical depth for the audience
4. Synthesis & Analysis
- Identify conflicting information or viewpoints
- Highlight consensus across sources
- Note gaps or limitations in the information
- Draw connections between related concepts
- Provide context for better understanding
OUTPUT REQUIREMENTS:
- Begin with a clear statement of the topic or question being addressed
- Use consistent formatting and structure
- Clearly separate different levels of detail
- Include confidence levels for conclusions when appropriate
- Note any areas requiring additional research or clarification
Remember: Your goal is to make complex information accessible while maintaining accuracy and depth. Prioritize clarity without sacrificing important nuance or detail."""
class DeepResearchSwarm:
def __init__(
self,
name: str = "DeepResearchSwarm",
description: str = "A swarm that conducts comprehensive research across multiple domains",
max_loops: int = 1,
nice_print: bool = True,
output_type: str = "json",
        max_workers: int = (os.cpu_count() or 1)
        * 2,  # Twice the CPU count; guards against cpu_count() returning None
token_count: bool = False,
research_model_name: str = "gpt-4o-mini",
claude_summarization_model_name: str = "claude-3-5-sonnet-20240620",
):
self.name = name
self.description = description
self.max_loops = max_loops
self.nice_print = nice_print
self.output_type = output_type
self.max_workers = max_workers
self.research_model_name = research_model_name
self.claude_summarization_model_name = (
claude_summarization_model_name
)
self.reliability_check()
self.conversation = Conversation(token_count=token_count)
# Create a persistent ThreadPoolExecutor for the lifetime of the swarm
# This eliminates thread creation overhead on each query
self.executor = concurrent.futures.ThreadPoolExecutor(
max_workers=self.max_workers
)
# Initialize the research agent
self.research_agent = Agent(
agent_name="Deep-Research-Agent",
agent_description="Specialized agent for conducting comprehensive research across multiple domains",
system_prompt=RESEARCH_AGENT_PROMPT,
            max_loops=1,  # Single pass per query; research breadth comes from the generated search queries
tools_list_dictionary=tools,
model_name=self.research_model_name,
output_type="final",
)
self.summarization_agent = Agent(
agent_name="Summarization-Agent",
agent_description="Specialized agent for summarizing research results",
system_prompt=SUMMARIZATION_AGENT_PROMPT,
max_loops=1,
model_name=self.claude_summarization_model_name,
output_type="final",
)
def __del__(self):
"""Clean up the executor on object destruction"""
self.executor.shutdown(wait=False)
def reliability_check(self):
"""Check the reliability of the query"""
if self.max_loops < 1:
raise ValueError("max_loops must be greater than 0")
formatter.print_panel(
"DeepResearchSwarm is booting up...", "blue"
)
formatter.print_panel("Reliability check passed", "green")
def get_queries(self, query: str) -> List[str]:
"""
Generate a list of detailed search queries based on the input query.
Args:
query (str): The main research query to explore
Returns:
List[str]: A list of detailed search queries
"""
self.conversation.add(role="User", content=query)
        # Get the agent's response and parse the JSON tool-call payload
        agent_output = json.loads(self.research_agent.run(query))
        formatter.print_panel(
            f"Agent output type: {type(agent_output)} \n {agent_output}",
            "blue",
        )
        # Normalize the output into a dictionary-like structure
        output_dict = (
            str_to_dict(agent_output)
            if isinstance(agent_output, str)
            else agent_output
        )
        # Extract the detailed queries; default to an empty list so a
        # malformed response cannot raise an UnboundLocalError
        queries: List[str] = []
        if isinstance(output_dict, list):
            for item in output_dict:
                if "detailed_queries" in item:
                    queries = item["detailed_queries"]
                    break
        elif isinstance(output_dict, dict):
            queries = output_dict.get("detailed_queries", [])
        # Log the number of queries generated
        formatter.print_panel(
            f"Generated {len(queries)} queries", "blue"
        )
        return queries
def step(self, query: str):
"""
Execute a single research step with maximum parallelism.
Args:
query (str): The research query to process
Returns:
Formatted conversation history
"""
try:
# Get all the queries to process
queries = self.get_queries(query)
# Submit all queries for concurrent processing
futures = []
for q in queries:
future = self.executor.submit(exa_search, q)
futures.append((q, future))
# Process results as they complete
for q, future in futures:
try:
# Get search results only
results = future.result()
# Add search results to conversation
self.conversation.add(
role="User",
content=f"Search results for {q}: \n {results}",
)
except Exception as e:
# Handle any errors in the thread
error_msg = (
f"Error processing query '{q}': {str(e)}"
)
console.print(f"[bold red]{error_msg}[/bold red]")
self.conversation.add(
role="System",
content=error_msg,
)
# Generate final comprehensive analysis after all searches are complete
try:
final_summary = self.summarization_agent.run(
f"Please generate a comprehensive 4,000-word report analyzing the following content: {self.conversation.get_str()}"
)
self.conversation.add(
role=self.summarization_agent.agent_name,
content=final_summary,
)
except Exception as e:
error_msg = (
f"Error generating final summary: {str(e)}"
)
console.print(f"[bold red]{error_msg}[/bold red]")
self.conversation.add(
role="System",
content=error_msg,
)
# Return formatted output
result = history_output_formatter(
self.conversation, type=self.output_type
)
# If output type is JSON, ensure it's properly formatted
if self.output_type.lower() == "json":
try:
if isinstance(result, str):
# Try to parse and reformat for pretty printing
parsed = json.loads(result)
return json.dumps(
parsed, indent=2, ensure_ascii=False
)
except (json.JSONDecodeError, TypeError):
# If parsing fails, return as-is
pass
return result
except Exception as e:
error_msg = f"Critical error in step execution: {str(e)}"
console.print(f"[bold red]{error_msg}[/bold red]")
return (
{"error": error_msg}
if self.output_type.lower() == "json"
else error_msg
)
def run(self, task: str):
return self.step(task)
def batched_run(self, tasks: List[str]):
"""
Execute a list of research tasks in parallel.
Args:
tasks (List[str]): A list of research tasks to execute
Returns:
List[str]: A list of formatted conversation histories
"""
        futures = []
        for task in tasks:
            futures.append(self.executor.submit(self.step, task))
        # Collect results so the method returns the conversation histories its docstring promises
        return [future.result() for future in futures]
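# --- Editor's illustrative sketch (not part of the original module) ---
# Running several research tasks in parallel might look like:
#
#     swarm = DeepResearchSwarm(output_type="json", max_workers=4)
#     histories = swarm.batched_run(["Topic A overview", "Topic B overview"])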
# Example usage
# if __name__ == "__main__":
# try:
# swarm = DeepResearchSwarm(
# output_type="json",
# )
# result = swarm.step(
# "What is the active tariff situation with mexico? Only create 2 queries"
# )
# # Parse and display results in rich format with markdown export
# swarm.parse_and_display_results(result, export_markdown=True)
# except Exception as e:
# print(f"Error running deep research swarm: {str(e)}")
# import traceback
# traceback.print_exc()

@@ -621,10 +621,6 @@ class SwarmMatcher:
name="ConsensusSwarm",
description="Achieve group decisions through consensus mechanisms and voting protocols among multiple agents. Keywords: group decision making, voting systems, collective intelligence, agreement protocols, democratic processes, collaborative decisions, consensus building",
),
SwarmType(
name="DeepResearchSwarm",
description="Conduct in-depth research and analysis by coordinating multiple agents to explore, synthesize, and validate information from various sources. Keywords: research methodology, information synthesis, data validation, comprehensive analysis, knowledge discovery, systematic investigation",
),
SwarmType(
name="CouncilAsAJudge",
description="Evaluate and judge solutions or decisions through a council of expert agents acting as arbitrators. Keywords: evaluation, judgment, arbitration, expert assessment, quality control, decision validation, peer review, consensus building",

@@ -10,10 +10,9 @@ from swarms.prompts.multi_agent_collab_prompt import (
MULTI_AGENT_COLLAB_PROMPT_TWO,
)
from swarms.structs.agent import Agent
from swarms.structs.agent_rearrange import AgentRearrange
from swarms.structs.concurrent_workflow import ConcurrentWorkflow
from swarms.structs.council_as_judge import CouncilAsAJudge
from swarms.structs.csv_to_agent import AgentLoader
from swarms.structs.deep_research_swarm import DeepResearchSwarm
from swarms.structs.groupchat import GroupChat
from swarms.structs.heavy_swarm import HeavySwarm
from swarms.structs.hiearchical_swarm import HierarchicalSwarm
@@ -23,7 +22,6 @@ from swarms.structs.majority_voting import MajorityVoting
from swarms.structs.malt import MALT
from swarms.structs.mixture_of_agents import MixtureOfAgents
from swarms.structs.multi_agent_router import MultiAgentRouter
from swarms.structs.agent_rearrange import AgentRearrange
from swarms.structs.sequential_workflow import SequentialWorkflow
from swarms.structs.swarm_matcher import swarm_matcher
from swarms.telemetry.log_executions import log_execution
@@ -45,7 +43,6 @@ SwarmType = Literal[
"auto",
"MajorityVoting",
"MALT",
"DeepResearchSwarm",
"CouncilAsAJudge",
"InteractiveGroupChat",
"HeavySwarm",
@@ -288,12 +285,6 @@ class SwarmRouter:
self.setup()
# Load agents from CSV
if self.load_agents_from_csv:
self.agents = AgentLoader(
csv_path=self.csv_file_path
).load_agents()
if self.telemetry_enabled:
self.agent_config = self.agent_config()
@@ -387,7 +378,6 @@ class SwarmRouter:
"MALT": self._create_malt,
"CouncilAsAJudge": self._create_council_as_judge,
"InteractiveGroupChat": self._create_interactive_group_chat,
"DeepResearchSwarm": self._create_deep_research_swarm,
"HiearchicalSwarm": self._create_hierarchical_swarm,
"MixtureOfAgents": self._create_mixture_of_agents,
"MajorityVoting": self._create_majority_voting,
@@ -455,16 +445,6 @@ class SwarmRouter:
speaker_function=self.speaker_function,
)
def _create_deep_research_swarm(self, *args, **kwargs):
"""Factory function for DeepResearchSwarm."""
return DeepResearchSwarm(
name=self.name,
description=self.description,
agents=self.agents,
max_loops=self.max_loops,
output_type=self.output_type,
)
def _create_hierarchical_swarm(self, *args, **kwargs):
"""Factory function for HierarchicalSwarm."""
return HierarchicalSwarm(

@@ -21,9 +21,7 @@ from swarms.utils.history_output_formatter import (
history_output_formatter,
)
from swarms.utils.agent_loader import (
AgentLoader,
MarkdownAgentConfig,
from swarms.utils.agent_loader_markdown import (
load_agent_from_markdown,
load_agents_from_markdown,
)
@@ -51,8 +49,6 @@ __all__ = [
"HistoryOutputType",
"history_output_formatter",
"check_all_model_max_tokens",
"AgentLoader",
"MarkdownAgentConfig",
"load_agent_from_markdown",
"load_agents_from_markdown",
"dynamic_auto_chunking",

@@ -1,14 +1,15 @@
import os
from concurrent.futures import (
    ThreadPoolExecutor,
    TimeoutError,
    as_completed,
)
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
import yaml
from loguru import logger
from pydantic import BaseModel, Field, field_validator
# Type checking imports to avoid circular dependency
if TYPE_CHECKING:
@@ -407,14 +408,39 @@ class AgentLoader:
# Convenience functions
def load_agent_from_markdown(file_path: str, **kwargs) -> "Agent":
"""
Load a single agent from a markdown file using the Claude Code YAML frontmatter format.
This function provides a simple interface for loading an agent configuration
from a markdown file. It supports all configuration overrides accepted by the
underlying `AgentLoader` and agent class.
Args:
file_path (str): Path to the markdown file containing YAML frontmatter
with agent configuration.
**kwargs: Optional keyword arguments to override agent configuration
parameters. Common options include:
- max_loops (int): Maximum number of reasoning loops.
- autosave (bool): Enable automatic state saving.
- dashboard (bool): Enable dashboard monitoring.
- verbose (bool): Enable verbose logging.
- dynamic_temperature_enabled (bool): Enable dynamic temperature.
- saved_state_path (str): Path for saving agent state.
- user_name (str): User identifier.
- retry_attempts (int): Number of retry attempts.
- context_length (int): Maximum context length.
- return_step_meta (bool): Return step metadata.
- output_type (str): Output format type.
- auto_generate_prompt (bool): Auto-generate prompts.
- artifacts_on (bool): Enable artifacts.
- streaming_on (bool): Enable streaming output.
- mcp_url (str): MCP server URL if needed.
Returns:
Agent: Configured Agent instance loaded from the markdown file.
Example:
>>> agent = load_agent_from_markdown("finance_advisor.md", max_loops=3, verbose=True)
>>> response = agent.run("What is the best investment strategy for 2024?")
"""
# Lazy import to avoid circular dependency
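# --- Editor's illustrative sketch (field names are assumptions, not confirmed by this diff) ---
# A markdown file consumed by load_agent_from_markdown() is expected to start with
# Claude Code style YAML frontmatter followed by the agent's instructions, roughly:
#
#     ---
#     name: Finance-Advisor
#     description: Provides general investment guidance
#     model_name: gpt-4o-mini
#     ---
#     You are a financial advisor. Answer questions about ...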
@@ -429,16 +455,36 @@ def load_agents_from_markdown(
**kwargs,
) -> List["Agent"]:
"""
Load multiple agents from markdown files using the Claude Code YAML frontmatter format.
This function supports loading agents from a list of markdown files or from all
markdown files in a directory. It can process files concurrently for faster loading,
and allows configuration overrides for all loaded agents.
Args:
file_paths (Union[str, List[str]]): Either a directory path containing markdown
files or a list of markdown file paths to load.
concurrent (bool, optional): If True, enables concurrent processing for faster
loading of multiple files. Defaults to True.
max_file_size_mb (float, optional): Maximum file size (in MB) for each markdown
file to prevent memory issues. Files exceeding this size will be skipped.
Defaults to 10.0.
**kwargs: Optional keyword arguments to override agent configuration
parameters for all loaded agents. See `load_agent_from_markdown` for
available options.
Returns:
List[Agent]: List of configured Agent instances loaded from the markdown files.
Example:
>>> agents = load_agents_from_markdown(
... ["agent1.md", "agent2.md"],
... concurrent=True,
... max_loops=2,
... verbose=True
... )
>>> for agent in agents:
... print(agent.name)
"""
# Lazy import to avoid circular dependency
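# --- Editor's illustrative sketch (not part of the original module) ---
# Loading every markdown agent definition from a directory might look like:
#
#     agents = load_agents_from_markdown("./agents", concurrent=True, max_loops=1)
#     print(f"Loaded {len(agents)} agents")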