Merge branch 'master' into update/docs

pull/981/head
harshalmore31 1 month ago committed by GitHub
commit c955a602d0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,54 +0,0 @@
from swarms import Agent, CronJob
from loguru import logger
# Example usage
if __name__ == "__main__":
# Initialize the agent
agent = Agent(
agent_name="Quantitative-Trading-Agent",
agent_description="Advanced quantitative trading and algorithmic analysis agent",
system_prompt="""You are an expert quantitative trading agent with deep expertise in:
- Algorithmic trading strategies and implementation
- Statistical arbitrage and market making
- Risk management and portfolio optimization
- High-frequency trading systems
- Market microstructure analysis
- Quantitative research methodologies
- Financial mathematics and stochastic processes
- Machine learning applications in trading
Your core responsibilities include:
1. Developing and backtesting trading strategies
2. Analyzing market data and identifying alpha opportunities
3. Implementing risk management frameworks
4. Optimizing portfolio allocations
5. Conducting quantitative research
6. Monitoring market microstructure
7. Evaluating trading system performance
You maintain strict adherence to:
- Mathematical rigor in all analyses
- Statistical significance in strategy development
- Risk-adjusted return optimization
- Market impact minimization
- Regulatory compliance
- Transaction cost analysis
- Performance attribution
You communicate in precise, technical terms while maintaining clarity for stakeholders.""",
max_loops=1,
model_name="gpt-4.1",
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
streaming_on=True,
print_on=True,
telemetry_enable=False,
)
# Example 1: Basic usage with just a task
logger.info("Starting example cron job")
cron_job = CronJob(agent=agent, interval="10seconds")
cron_job.run(
task="What are the best top 3 etfs for gold coverage?"
)

@ -276,7 +276,6 @@ nav:
- Overview: "swarms/structs/overview.md"
- Custom Multi Agent Architectures: "swarms/structs/custom_swarm.md"
- Debate Multi-Agent Architectures: "swarms/structs/orchestration_methods.md"
- MajorityVoting: "swarms/structs/majorityvoting.md"
- RoundRobin: "swarms/structs/round_robin_swarm.md"
- Mixture of Agents: "swarms/structs/moa.md"
@ -295,6 +294,7 @@ nav:
- Hybrid Hierarchical-Cluster Swarm: "swarms/structs/hhcs.md"
- Auto Swarm Builder: "swarms/structs/auto_swarm_builder.md"
- Swarm Matcher: "swarms/structs/swarm_matcher.md"
- Board of Directors: "swarms/structs/BoardOfDirectors.md"
# - Multi-Agent Multi-Modal Structures:
# - ImageAgentBatchProcessor: "swarms/structs/image_batch_agent.md"

@ -0,0 +1,903 @@
# Board of Directors - Multi-Agent Architecture
The Board of Directors is a sophisticated multi-agent architecture that implements collective decision-making through democratic processes, voting mechanisms, and role-based leadership. This architecture provides an alternative to single-director patterns by enabling collaborative intelligence through structured governance.
## 🏛️ Overview
The Board of Directors architecture follows a democratic workflow pattern:
1. **Task Reception**: User provides a task to the swarm
2. **Board Meeting**: Board of Directors convenes to discuss and create a plan
3. **Voting & Consensus**: Board members vote and reach consensus on task distribution
4. **Order Distribution**: Board distributes orders to specialized worker agents
5. **Execution**: Individual agents execute their assigned tasks
6. **Feedback Loop**: Board evaluates results and issues new orders if needed (up to `max_loops`)
7. **Context Preservation**: All conversation history and context is maintained throughout the process
## 🏗️ Architecture Components
### Core Components
| Component | Description | Purpose |
|-----------|-------------|---------|
| **BoardOfDirectorsSwarm** | Main orchestration class | Manages the entire board workflow and agent coordination |
| **Board Member Roles** | Role definitions and hierarchy | Defines responsibilities and voting weights for each board member |
| **Decision Making Process** | Voting and consensus mechanisms | Implements democratic decision-making with weighted voting |
| **Workflow Management** | Process orchestration | Manages the complete lifecycle from task reception to final delivery |
### Board Member Interaction Flow
```mermaid
sequenceDiagram
participant User
participant Chairman
participant ViceChair
participant Secretary
participant Treasurer
participant ExecDir
participant Agents
User->>Chairman: Submit Task
Chairman->>ViceChair: Notify Board Meeting
Chairman->>Secretary: Request Meeting Setup
Chairman->>Treasurer: Resource Assessment
Chairman->>ExecDir: Strategic Planning
Note over Chairman,ExecDir: Board Discussion Phase
Chairman->>ViceChair: Lead Discussion
ViceChair->>Secretary: Document Decisions
Secretary->>Treasurer: Budget Considerations
Treasurer->>ExecDir: Resource Allocation
ExecDir->>Chairman: Strategic Recommendations
Note over Chairman,ExecDir: Voting & Consensus
Chairman->>ViceChair: Call for Vote
ViceChair->>Secretary: Record Votes
Secretary->>Treasurer: Financial Approval
Treasurer->>ExecDir: Resource Approval
ExecDir->>Chairman: Final Decision
Note over Chairman,Agents: Execution Phase
Chairman->>Agents: Distribute Orders
Agents->>Chairman: Execute Tasks
Agents->>ViceChair: Progress Reports
Agents->>Secretary: Documentation
Agents->>Treasurer: Resource Usage
Agents->>ExecDir: Strategic Updates
Note over Chairman,ExecDir: Review & Feedback
Chairman->>User: Deliver Results
```
## 👥 Board Member Roles
The Board of Directors supports various roles with different responsibilities and voting weights:
| Role | Description | Voting Weight | Responsibilities |
|------|-------------|---------------|------------------|
| `CHAIRMAN` | Primary leader responsible for board meetings and final decisions | 1.5 | Leading meetings, facilitating consensus, making final decisions |
| `VICE_CHAIRMAN` | Secondary leader who supports the chairman | 1.2 | Supporting chairman, coordinating operations |
| `SECRETARY` | Responsible for documentation and meeting minutes | 1.0 | Documenting meetings, maintaining records |
| `TREASURER` | Manages financial aspects and resource allocation | 1.0 | Financial oversight, resource management |
| `EXECUTIVE_DIRECTOR` | Executive-level board member with operational authority | 1.5 | Strategic planning, operational oversight |
| `MEMBER` | General board member with specific expertise | 1.0 | Contributing expertise, participating in decisions |
### Role Hierarchy and Authority
```python
# Example: Role hierarchy implementation
class BoardRoleHierarchy:
def __init__(self):
self.roles = {
"CHAIRMAN": {
"voting_weight": 1.5,
"authority_level": "FINAL",
"supervises": ["VICE_CHAIRMAN", "EXECUTIVE_DIRECTOR", "SECRETARY", "TREASURER", "MEMBER"],
"responsibilities": ["leadership", "final_decision", "consensus_facilitation"],
"override_capability": True
},
"VICE_CHAIRMAN": {
"voting_weight": 1.2,
"authority_level": "SENIOR",
"supervises": ["MEMBER"],
"responsibilities": ["operational_support", "coordination", "implementation"],
"backup_for": "CHAIRMAN"
},
"EXECUTIVE_DIRECTOR": {
"voting_weight": 1.5,
"authority_level": "SENIOR",
"supervises": ["MEMBER"],
"responsibilities": ["strategic_planning", "execution_oversight", "performance_management"],
"strategic_authority": True
},
"SECRETARY": {
"voting_weight": 1.0,
"authority_level": "STANDARD",
"supervises": [],
"responsibilities": ["documentation", "record_keeping", "communication"],
"administrative_authority": True
},
"TREASURER": {
"voting_weight": 1.0,
"authority_level": "STANDARD",
"supervises": [],
"responsibilities": ["financial_oversight", "resource_management", "budget_control"],
"financial_authority": True
},
"MEMBER": {
"voting_weight": 1.0,
"authority_level": "STANDARD",
"supervises": [],
"responsibilities": ["expertise_contribution", "analysis", "voting"],
"specialized_expertise": True
}
}
```
## 🚀 Quick Start
### Basic Setup
```python
from swarms import Agent
from swarms.structs.board_of_directors_swarm import (
BoardOfDirectorsSwarm,
BoardMember,
BoardMemberRole
)
from swarms.config.board_config import enable_board_feature
# Enable the Board of Directors feature
enable_board_feature()
# Create board members with specific roles
chairman = Agent(
agent_name="Chairman",
agent_description="Chairman of the Board responsible for leading meetings",
model_name="gpt-4o-mini",
system_prompt="You are the Chairman of the Board..."
)
vice_chairman = Agent(
agent_name="Vice-Chairman",
agent_description="Vice Chairman who supports the Chairman",
model_name="gpt-4o-mini",
system_prompt="You are the Vice Chairman..."
)
# Create BoardMember objects with roles and expertise
board_members = [
BoardMember(chairman, BoardMemberRole.CHAIRMAN, 1.5, ["leadership", "strategy"]),
BoardMember(vice_chairman, BoardMemberRole.VICE_CHAIRMAN, 1.2, ["operations", "coordination"]),
]
# Create worker agents
research_agent = Agent(
agent_name="Research-Specialist",
agent_description="Expert in market research and analysis",
model_name="gpt-4o",
)
financial_agent = Agent(
agent_name="Financial-Analyst",
agent_description="Specialist in financial analysis and valuation",
model_name="gpt-4o",
)
# Initialize the Board of Directors swarm
board_swarm = BoardOfDirectorsSwarm(
name="Executive_Board_Swarm",
description="Executive board with specialized roles for strategic decision-making",
board_members=board_members,
agents=[research_agent, financial_agent],
max_loops=2,
verbose=True,
decision_threshold=0.6,
enable_voting=True,
enable_consensus=True,
)
# Execute a complex task with democratic decision-making
result = board_swarm.run(task="Analyze the market potential for Tesla (TSLA) stock")
print(result)
```
## 📋 Comprehensive Examples
### 1. Strategic Investment Analysis
```python
# Create specialized agents for investment analysis
market_research_agent = Agent(
agent_name="Market-Research-Specialist",
agent_description="Expert in market research, competitive analysis, and industry trends",
model_name="gpt-4o",
system_prompt="""You are a Market Research Specialist. Your responsibilities include:
1. Conducting comprehensive market research and analysis
2. Identifying market trends, opportunities, and risks
3. Analyzing competitive landscape and positioning
4. Providing market size and growth projections
5. Supporting strategic decision-making with research findings
You should be thorough, analytical, and objective in your research."""
)
financial_analyst_agent = Agent(
agent_name="Financial-Analyst",
agent_description="Specialist in financial analysis, valuation, and investment assessment",
model_name="gpt-4o",
system_prompt="""You are a Financial Analyst. Your responsibilities include:
1. Conducting financial analysis and valuation
2. Assessing investment opportunities and risks
3. Analyzing financial performance and metrics
4. Providing financial insights and recommendations
5. Supporting financial decision-making
You should be financially astute, analytical, and focused on value creation."""
)
technical_assessor_agent = Agent(
agent_name="Technical-Assessor",
agent_description="Expert in technical feasibility and implementation assessment",
model_name="gpt-4o",
system_prompt="""You are a Technical Assessor. Your responsibilities include:
1. Evaluating technical feasibility and requirements
2. Assessing implementation challenges and risks
3. Analyzing technology stack and architecture
4. Providing technical insights and recommendations
5. Supporting technical decision-making
You should be technically proficient, practical, and solution-oriented."""
)
# Create comprehensive board members
board_members = [
BoardMember(
chairman,
BoardMemberRole.CHAIRMAN,
1.5,
["leadership", "strategy", "governance", "decision_making"]
),
BoardMember(
vice_chairman,
BoardMemberRole.VICE_CHAIRMAN,
1.2,
["operations", "coordination", "communication", "implementation"]
),
BoardMember(
secretary,
BoardMemberRole.SECRETARY,
1.0,
["documentation", "compliance", "record_keeping", "communication"]
),
BoardMember(
treasurer,
BoardMemberRole.TREASURER,
1.0,
["finance", "budgeting", "risk_management", "resource_allocation"]
),
BoardMember(
executive_director,
BoardMemberRole.EXECUTIVE_DIRECTOR,
1.5,
["strategy", "operations", "innovation", "performance_management"]
)
]
# Initialize the investment analysis board
investment_board = BoardOfDirectorsSwarm(
name="Investment_Analysis_Board",
description="Specialized board for investment analysis and decision-making",
board_members=board_members,
agents=[market_research_agent, financial_analyst_agent, technical_assessor_agent],
max_loops=3,
verbose=True,
decision_threshold=0.75, # Higher threshold for investment decisions
enable_voting=True,
enable_consensus=True,
max_workers=3,
output_type="dict"
)
# Execute investment analysis
investment_task = """
Analyze the strategic investment opportunity for a $50M Series B funding round in a
fintech startup. Consider market conditions, competitive landscape, financial projections,
technical feasibility, and strategic fit. Provide comprehensive recommendations including:
1. Investment recommendation (proceed/hold/decline)
2. Valuation analysis and suggested terms
3. Risk assessment and mitigation strategies
4. Strategic value and synergies
5. Implementation timeline and milestones
"""
result = investment_board.run(task=investment_task)
print("Investment Analysis Results:")
print(json.dumps(result, indent=2))
```
### 2. Technology Strategy Development
```python
# Create technology-focused agents
tech_strategy_agent = Agent(
agent_name="Tech-Strategy-Specialist",
agent_description="Expert in technology strategy and digital transformation",
model_name="gpt-4o",
system_prompt="""You are a Technology Strategy Specialist. Your responsibilities include:
1. Developing technology roadmaps and strategies
2. Assessing digital transformation opportunities
3. Evaluating emerging technologies and trends
4. Planning technology investments and priorities
5. Supporting technology decision-making
You should be strategic, forward-thinking, and technology-savvy."""
)
implementation_planner_agent = Agent(
agent_name="Implementation-Planner",
agent_description="Expert in implementation planning and project management",
model_name="gpt-4o",
system_prompt="""You are an Implementation Planner. Your responsibilities include:
1. Creating detailed implementation plans
2. Assessing resource requirements and timelines
3. Identifying implementation risks and challenges
4. Planning change management strategies
5. Supporting implementation decision-making
You should be practical, organized, and execution-focused."""
)
# Technology strategy board configuration
tech_board = BoardOfDirectorsSwarm(
name="Technology_Strategy_Board",
description="Specialized board for technology strategy and digital transformation",
board_members=board_members,
agents=[tech_strategy_agent, implementation_planner_agent, technical_assessor_agent],
max_loops=4, # More loops for complex technology planning
verbose=True,
decision_threshold=0.7,
enable_voting=True,
enable_consensus=True,
max_workers=3,
output_type="dict"
)
# Execute technology strategy development
tech_strategy_task = """
Develop a comprehensive technology strategy for a mid-size manufacturing company
looking to digitize operations and implement Industry 4.0 technologies. Consider:
1. Current technology assessment and gaps
2. Technology roadmap and implementation plan
3. Investment requirements and ROI analysis
4. Risk assessment and mitigation strategies
5. Change management and training requirements
6. Competitive positioning and market advantages
"""
result = tech_board.run(task=tech_strategy_task)
print("Technology Strategy Results:")
print(json.dumps(result, indent=2))
```
### 3. Crisis Management and Response
```python
# Create crisis management agents
crisis_coordinator_agent = Agent(
agent_name="Crisis-Coordinator",
agent_description="Expert in crisis management and emergency response",
model_name="gpt-4o",
system_prompt="""You are a Crisis Coordinator. Your responsibilities include:
1. Coordinating crisis response efforts
2. Assessing crisis severity and impact
3. Developing immediate response plans
4. Managing stakeholder communications
5. Supporting crisis decision-making
You should be calm, decisive, and action-oriented."""
)
communications_specialist_agent = Agent(
agent_name="Communications-Specialist",
agent_description="Expert in crisis communications and stakeholder management",
model_name="gpt-4o",
system_prompt="""You are a Communications Specialist. Your responsibilities include:
1. Developing crisis communication strategies
2. Managing stakeholder communications
3. Coordinating public relations efforts
4. Ensuring message consistency and accuracy
5. Supporting communication decision-making
You should be clear, empathetic, and strategic in communications."""
)
# Crisis management board configuration
crisis_board = BoardOfDirectorsSwarm(
name="Crisis_Management_Board",
description="Specialized board for crisis management and emergency response",
board_members=board_members,
agents=[crisis_coordinator_agent, communications_specialist_agent, financial_analyst_agent],
max_loops=2, # Faster response needed
verbose=True,
decision_threshold=0.6, # Lower threshold for urgent decisions
enable_voting=True,
enable_consensus=True,
max_workers=3,
output_type="dict"
)
# Execute crisis management
crisis_task = """
Our company is facing a major data breach. Develop an immediate response plan.
Include:
1. Immediate containment and mitigation steps
2. Communication strategy for stakeholders
3. Legal and regulatory compliance requirements
4. Financial impact assessment
5. Long-term recovery and prevention measures
6. Timeline and resource allocation
"""
result = crisis_board.run(task=crisis_task)
print("Crisis Management Results:")
print(json.dumps(result, indent=2))
```
## ⚙️ Configuration and Parameters
### BoardOfDirectorsSwarm Parameters
```python
# Complete parameter reference
board_swarm = BoardOfDirectorsSwarm(
# Basic Configuration
name="Board_Name", # Name of the board
description="Board description", # Description of the board's purpose
# Board Members and Agents
board_members=board_members, # List of BoardMember objects
agents=worker_agents, # List of worker Agent objects
# Execution Control
max_loops=3, # Maximum number of refinement loops
max_workers=4, # Maximum parallel workers
# Decision Making
decision_threshold=0.7, # Consensus threshold (0.0-1.0)
enable_voting=True, # Enable voting mechanisms
enable_consensus=True, # Enable consensus building
# Advanced Features
auto_assign_roles=True, # Auto-assign roles based on expertise
role_mapping={ # Custom role mapping
"financial_analysis": ["Treasurer", "Financial_Member"],
"strategic_planning": ["Chairman", "Executive_Director"]
},
# Consensus Configuration
consensus_timeout=300, # Consensus timeout in seconds
min_participation_rate=0.8, # Minimum participation rate
auto_fallback_to_chairman=True, # Chairman can make final decisions
consensus_rounds=3, # Maximum consensus building rounds
# Output Configuration
output_type="dict", # Output format: "dict", "str", "list"
verbose=True, # Enable detailed logging
# Quality Control
quality_threshold=0.8, # Quality threshold for outputs
enable_quality_gates=True, # Enable quality checkpoints
enable_peer_review=True, # Enable peer review mechanisms
# Performance Optimization
parallel_execution=True, # Enable parallel execution
enable_agent_pooling=True, # Enable agent pooling
timeout_per_agent=300, # Timeout per agent in seconds
# Monitoring and Logging
enable_logging=True, # Enable detailed logging
log_level="INFO", # Logging level
enable_metrics=True, # Enable performance metrics
enable_tracing=True # Enable request tracing
)
```
### Voting Configuration
```python
# Voting system configuration
voting_config = {
"method": "weighted_majority", # Voting method
"threshold": 0.75, # Consensus threshold
"weights": { # Role-based voting weights
"CHAIRMAN": 1.5,
"VICE_CHAIRMAN": 1.2,
"SECRETARY": 1.0,
"TREASURER": 1.0,
"EXECUTIVE_DIRECTOR": 1.5
},
"tie_breaker": "CHAIRMAN", # Tie breaker role
"allow_abstention": True, # Allow board members to abstain
"secret_ballot": False, # Use secret ballot voting
"transparent_process": True # Transparent voting process
}
```
### Quality Control Configuration
```python
# Quality control configuration
quality_config = {
"quality_gates": True, # Enable quality checkpoints
"quality_threshold": 0.8, # Quality threshold
"enable_peer_review": True, # Enable peer review
"review_required": True, # Require peer review
"output_validation": True, # Validate outputs
"enable_metrics_tracking": True, # Track quality metrics
# Quality metrics
"quality_metrics": {
"completeness": {"weight": 0.2, "threshold": 0.8},
"accuracy": {"weight": 0.25, "threshold": 0.85},
"feasibility": {"weight": 0.2, "threshold": 0.8},
"risk": {"weight": 0.15, "threshold": 0.7},
"impact": {"weight": 0.2, "threshold": 0.8}
}
}
```
## 📊 Performance Monitoring and Analytics
### Board Performance Metrics
```python
# Get comprehensive board performance metrics
board_summary = board_swarm.get_board_summary()
print("Board Summary:")
print(f"Board Name: {board_summary['board_name']}")
print(f"Total Board Members: {board_summary['total_members']}")
print(f"Total Worker Agents: {board_summary['total_agents']}")
print(f"Decision Threshold: {board_summary['decision_threshold']}")
print(f"Max Loops: {board_summary['max_loops']}")
# Display board member details
print("\nBoard Members:")
for member in board_summary['members']:
print(f"- {member['name']} (Role: {member['role']}, Weight: {member['voting_weight']})")
print(f" Expertise: {', '.join(member['expertise_areas'])}")
# Display worker agent details
print("\nWorker Agents:")
for agent in board_summary['agents']:
print(f"- {agent['name']}: {agent['description']}")
```
### Decision Analysis
```python
# Analyze decision-making patterns
if hasattr(result, 'get') and callable(result.get):
conversation_history = result.get('conversation_history', [])
print(f"\nDecision Analysis:")
print(f"Total Messages: {len(conversation_history)}")
# Count board member contributions
board_contributions = {}
for msg in conversation_history:
if 'Board' in msg.get('role', ''):
member_name = msg.get('agent_name', 'Unknown')
board_contributions[member_name] = board_contributions.get(member_name, 0) + 1
print(f"Board Member Contributions:")
for member, count in board_contributions.items():
print(f"- {member}: {count} contributions")
# Count agent executions
agent_executions = {}
for msg in conversation_history:
if any(agent.agent_name in msg.get('role', '') for agent in worker_agents):
agent_name = msg.get('agent_name', 'Unknown')
agent_executions[agent_name] = agent_executions.get(agent_name, 0) + 1
print(f"\nAgent Executions:")
for agent, count in agent_executions.items():
print(f"- {agent}: {count} executions")
```
### Performance Monitoring System
```python
# Performance monitoring system
class PerformanceMonitor:
def __init__(self):
self.metrics = {
"execution_times": [],
"quality_scores": [],
"consensus_rounds": [],
"error_rates": []
}
def track_execution_time(self, phase, duration):
"""Track execution time for different phases"""
self.metrics["execution_times"].append({
"phase": phase,
"duration": duration,
"timestamp": datetime.now().isoformat()
})
def track_quality_score(self, score):
"""Track quality scores"""
self.metrics["quality_scores"].append({
"score": score,
"timestamp": datetime.now().isoformat()
})
def generate_performance_report(self):
"""Generate comprehensive performance report"""
return {
"average_execution_time": self.calculate_average_execution_time(),
"quality_trends": self.analyze_quality_trends(),
"consensus_efficiency": self.analyze_consensus_efficiency(),
"error_analysis": self.analyze_errors(),
"recommendations": self.generate_recommendations()
}
# Usage example
monitor = PerformanceMonitor()
# ... track metrics during execution ...
report = monitor.generate_performance_report()
print("Performance Report:")
print(json.dumps(report, indent=2))
```
## 🔧 Advanced Features and Customization
### Custom Board Templates
```python
from swarms.config.board_config import get_default_board_template
# Get pre-configured board templates
financial_board = get_default_board_template("financial_analysis")
strategic_board = get_default_board_template("strategic_planning")
tech_board = get_default_board_template("technology_assessment")
crisis_board = get_default_board_template("crisis_management")
# Custom board template
custom_template = {
"name": "Custom_Board",
"description": "Custom board for specific use case",
"board_members": [
{"role": "CHAIRMAN", "expertise": ["leadership", "strategy"]},
{"role": "VICE_CHAIRMAN", "expertise": ["operations", "coordination"]},
{"role": "SECRETARY", "expertise": ["documentation", "communication"]},
{"role": "TREASURER", "expertise": ["finance", "budgeting"]},
{"role": "EXECUTIVE_DIRECTOR", "expertise": ["strategy", "operations"]}
],
"agents": [
{"name": "Research_Agent", "expertise": ["research", "analysis"]},
{"name": "Technical_Agent", "expertise": ["technical", "implementation"]}
],
"config": {
"max_loops": 3,
"decision_threshold": 0.7,
"enable_voting": True,
"enable_consensus": True
}
}
```
### Dynamic Role Assignment
```python
# Automatically assign roles based on task requirements
board_swarm = BoardOfDirectorsSwarm(
board_members=board_members,
agents=agents,
auto_assign_roles=True,
role_mapping={
"financial_analysis": ["Treasurer", "Financial_Member"],
"strategic_planning": ["Chairman", "Executive_Director"],
"technical_assessment": ["Technical_Member", "Executive_Director"],
"research_analysis": ["Research_Member", "Secretary"],
"crisis_management": ["Chairman", "Vice_Chairman", "Communications_Member"]
}
)
```
### Consensus Optimization
```python
# Advanced consensus-building mechanisms
board_swarm = BoardOfDirectorsSwarm(
board_members=board_members,
agents=agents,
enable_consensus=True,
consensus_timeout=300, # 5 minutes timeout
min_participation_rate=0.8, # 80% minimum participation
auto_fallback_to_chairman=True, # Chairman can make final decisions
consensus_rounds=3, # Maximum consensus building rounds
consensus_method="weighted_majority", # Consensus method
enable_mediation=True, # Enable mediation for conflicts
mediation_timeout=120 # Mediation timeout in seconds
)
```
## 🛠️ Troubleshooting and Debugging
### Common Issues and Solutions
1. **Consensus Failures**
- **Issue**: Board cannot reach consensus within loop limit
- **Solution**: Lower voting threshold, increase max_loops, or adjust voting weights
```python
board_swarm = BoardOfDirectorsSwarm(
decision_threshold=0.6, # Lower threshold
max_loops=5, # More loops
consensus_timeout=600 # Longer timeout
)
```
2. **Agent Timeout**
- **Issue**: Individual agents take too long to respond
- **Solution**: Increase timeout settings or optimize agent prompts
```python
board_swarm = BoardOfDirectorsSwarm(
timeout_per_agent=600, # 10 minutes per agent
enable_agent_pooling=True # Use agent pooling
)
```
3. **Poor Quality Output**
- **Issue**: Final output doesn't meet quality standards
- **Solution**: Enable quality gates, increase max_loops, or improve agent prompts
```python
board_swarm = BoardOfDirectorsSwarm(
enable_quality_gates=True,
quality_threshold=0.8,
enable_peer_review=True,
max_loops=4
)
```
4. **Resource Exhaustion**
- **Issue**: System runs out of resources during execution
- **Solution**: Implement resource limits, use agent pooling, or optimize parallel execution
```python
board_swarm = BoardOfDirectorsSwarm(
max_workers=2, # Limit parallel workers
enable_agent_pooling=True,
parallel_execution=False # Disable parallel execution
)
```
### Debugging Techniques
```python
# Debugging configuration
debug_config = BoardConfig(
max_loops=1, # Limit loops for debugging
enable_logging=True,
log_level="DEBUG",
enable_tracing=True,
debug_mode=True
)
# Create debug swarm
debug_swarm = BoardOfDirectorsSwarm(
agents=agents,
config=debug_config
)
# Execute with debugging
try:
result = debug_swarm.run(task)
except Exception as e:
print(f"Error: {e}")
print(f"Debug info: {debug_swarm.get_debug_info()}")
# Enable detailed logging
import logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Create swarm with logging enabled
logging_swarm = BoardOfDirectorsSwarm(
agents=agents,
config=BoardConfig(
enable_logging=True,
log_level="DEBUG",
enable_metrics=True,
enable_tracing=True
)
)
```
## 📋 Use Cases
### Corporate Governance
- **Strategic Planning**: Long-term business strategy development
- **Risk Management**: Comprehensive risk assessment and mitigation
- **Resource Allocation**: Optimal distribution of company resources
- **Performance Oversight**: Monitoring and evaluating organizational performance
### Financial Analysis
- **Portfolio Management**: Investment portfolio optimization and rebalancing
- **Market Analysis**: Comprehensive market research and trend analysis
- **Risk Assessment**: Financial risk evaluation and management
- **Compliance Monitoring**: Regulatory compliance and audit preparation
### Research & Development
- **Technology Assessment**: Evaluation of emerging technologies
- **Product Development**: Strategic product planning and development
- **Innovation Management**: Managing innovation pipelines and initiatives
- **Quality Assurance**: Ensuring high standards across development processes
### Project Management
- **Complex Project Planning**: Multi-faceted project strategy development
- **Resource Optimization**: Efficient allocation of project resources
- **Stakeholder Management**: Coordinating diverse stakeholder interests
- **Risk Mitigation**: Identifying and addressing project risks
### Crisis Management
- **Emergency Response**: Rapid response to critical situations
- **Stakeholder Communication**: Managing communications during crises
- **Recovery Planning**: Developing recovery and prevention strategies
- **Legal Compliance**: Ensuring compliance during crisis situations
## 🎯 Success Criteria
A successful Board of Directors implementation should demonstrate:
- ✅ **Democratic Decision Making**: All board members contribute to decisions
- ✅ **Consensus Achievement**: Decisions reached through collaborative processes
- ✅ **Role Effectiveness**: Each board member fulfills their responsibilities
- ✅ **Agent Coordination**: Worker agents execute tasks efficiently
- ✅ **Quality Output**: High-quality results through collective intelligence
- ✅ **Process Transparency**: Clear visibility into decision-making processes
- ✅ **Performance Optimization**: Efficient resource utilization and execution
- ✅ **Continuous Improvement**: Learning from each execution cycle
## 📚 Best Practices
### 1. Role Definition
- Clearly define responsibilities for each board member
- Ensure expertise areas align with organizational needs
- Balance voting weights based on role importance
- Document role interactions and communication protocols
### 2. Task Formulation
- Provide clear, specific task descriptions
- Include relevant context and constraints
- Specify expected outputs and deliverables
- Define quality criteria and success metrics
### 3. Consensus Building
- Allow adequate time for discussion and consensus
- Encourage diverse perspectives and viewpoints
- Use structured decision-making processes
- Implement conflict resolution mechanisms
### 4. Performance Monitoring
- Track decision quality and outcomes
- Monitor board member participation
- Analyze agent utilization and effectiveness
- Implement continuous improvement processes
### 5. Resource Management
- Optimize agent allocation and utilization
- Implement parallel execution where appropriate
- Monitor resource usage and performance
- Scale resources based on task complexity
---
The Board of Directors architecture represents a sophisticated approach to multi-agent collaboration, enabling organizations to leverage collective intelligence through structured governance and democratic decision-making processes. This comprehensive implementation provides the tools and frameworks needed to build effective, scalable, and intelligent decision-making systems.

@ -6,96 +6,33 @@ The `Conversation` class is a powerful and flexible tool for managing conversati
### Key Features
- **Multiple Storage Backends**: Support for various storage solutions:
- In-memory: Fast, temporary storage for testing and development
- Supabase: PostgreSQL-based cloud storage with real-time capabilities
- Redis: High-performance caching and persistence
- SQLite: Local file-based storage
- DuckDB: Analytical workloads and columnar storage
- Pulsar: Event streaming for distributed systems
- Mem0: Memory-based storage with mem0 integration
- **Token Management**:
- Built-in token counting with configurable models
- Automatic token tracking for input/output messages
- Token usage analytics and reporting
- Context length management
- **Metadata and Categories**:
- Support for message metadata
- Message categorization (input/output)
- Role-based message tracking
- Custom message IDs
- **Data Export/Import**:
- JSON and YAML export formats
- Automatic saving and loading
- Conversation history management
- Batch operations support
- **Advanced Features**:
- Message search and filtering
- Conversation analytics
- Multi-agent support
- Error handling and fallbacks
- Type hints and validation
| Feature Category | Features / Description |
|----------------------------|-------------------------------------------------------------------------------------------------------------|
| **Multiple Storage Backends** | - In-memory: Fast, temporary storage for testing and development<br>- Supabase: PostgreSQL-based cloud storage with real-time capabilities<br>- Redis: High-performance caching and persistence<br>- SQLite: Local file-based storage<br>- DuckDB: Analytical workloads and columnar storage<br>- Pulsar: Event streaming for distributed systems<br>- Mem0: Memory-based storage with mem0 integration |
| **Token Management** | - Built-in token counting with configurable models<br>- Automatic token tracking for input/output messages<br>- Token usage analytics and reporting<br>- Context length management |
| **Metadata and Categories** | - Support for message metadata<br>- Message categorization (input/output)<br>- Role-based message tracking<br>- Custom message IDs |
| **Data Export/Import** | - JSON and YAML export formats<br>- Automatic saving and loading<br>- Conversation history management<br>- Batch operations support |
| **Advanced Features** | - Message search and filtering<br>- Conversation analytics<br>- Multi-agent support<br>- Error handling and fallbacks<br>- Type hints and validation |
### Use Cases
1. **Chatbot Development**:
- Store and manage conversation history
- Track token usage and context length
- Analyze conversation patterns
2. **Multi-Agent Systems**:
- Coordinate multiple AI agents
- Track agent interactions
- Store agent outputs and metadata
3. **Analytics Applications**:
- Track conversation metrics
- Generate usage reports
- Analyze user interactions
4. **Production Systems**:
- Persistent storage with various backends
- Error handling and recovery
- Scalable conversation management
5. **Development and Testing**:
- Fast in-memory storage
- Debugging support
- Easy export/import of test data
| Use Case | Features / Description |
|----------------------------|--------------------------------------------------------------------------------------------------------|
| **Chatbot Development** | - Store and manage conversation history<br>- Track token usage and context length<br>- Analyze conversation patterns |
| **Multi-Agent Systems** | - Coordinate multiple AI agents<br>- Track agent interactions<br>- Store agent outputs and metadata |
| **Analytics Applications** | - Track conversation metrics<br>- Generate usage reports<br>- Analyze user interactions |
| **Production Systems** | - Persistent storage with various backends<br>- Error handling and recovery<br>- Scalable conversation management |
| **Development and Testing**| - Fast in-memory storage<br>- Debugging support<br>- Easy export/import of test data |
### Best Practices
1. **Storage Selection**:
- Use in-memory for testing and development
- Choose Supabase for multi-user cloud applications
- Use Redis for high-performance requirements
- Select SQLite for single-user local applications
- Pick DuckDB for analytical workloads
- Opt for Pulsar in distributed systems
2. **Token Management**:
- Enable token counting for production use
- Set appropriate context lengths
- Monitor token usage with export_and_count_categories()
3. **Error Handling**:
- Implement proper fallback mechanisms
- Use type hints for better code reliability
- Monitor and log errors appropriately
4. **Data Management**:
- Use appropriate export formats (JSON/YAML)
- Implement regular backup strategies
- Clean up old conversations when needed
5. **Security**:
- Use environment variables for sensitive credentials
- Implement proper access controls
- Validate input data
| Category | Best Practices |
|---------------------|------------------------------------------------------------------------------------------------------------------------|
| **Storage Selection** | - Use in-memory for testing and development<br>- Choose Supabase for multi-user cloud applications<br>- Use Redis for high-performance requirements<br>- Select SQLite for single-user local applications<br>- Pick DuckDB for analytical workloads<br>- Opt for Pulsar in distributed systems |
| **Token Management** | - Enable token counting for production use<br>- Set appropriate context lengths<br>- Monitor token usage with `export_and_count_categories()` |
| **Error Handling** | - Implement proper fallback mechanisms<br>- Use type hints for better code reliability<br>- Monitor and log errors appropriately |
| **Data Management** | - Use appropriate export formats (JSON/YAML)<br>- Implement regular backup strategies<br>- Clean up old conversations when needed |
| **Security** | - Use environment variables for sensitive credentials<br>- Implement proper access controls<br>- Validate input data |
## Table of Contents
@ -113,13 +50,15 @@ The `Conversation` class is designed to manage conversations by keeping track of
**New in this version**: The class now supports multiple storage backends for persistent conversation storage:
- **"in-memory"**: Default memory-based storage (no persistence)
- **"mem0"**: Memory-based storage with mem0 integration (requires: `pip install mem0ai`)
- **"supabase"**: PostgreSQL-based storage using Supabase (requires: `pip install supabase`)
- **"redis"**: Redis-based storage (requires: `pip install redis`)
- **"sqlite"**: SQLite-based storage (built-in to Python)
- **"duckdb"**: DuckDB-based storage (requires: `pip install duckdb`)
- **"pulsar"**: Apache Pulsar messaging backend (requires: `pip install pulsar-client`)
| Backend | Description | Requirements |
|--------------|-------------------------------------------------------------------------------------------------------------|------------------------------------|
| **in-memory**| Default memory-based storage (no persistence) | None (built-in) |
| **mem0** | Memory-based storage with mem0 integration | `pip install mem0ai` |
| **supabase** | PostgreSQL-based storage using Supabase | `pip install supabase` |
| **redis** | Redis-based storage | `pip install redis` |
| **sqlite** | SQLite-based storage (local file) | None (built-in) |
| **duckdb** | DuckDB-based storage (analytical workloads, columnar storage) | `pip install duckdb` |
| **pulsar** | Apache Pulsar messaging backend | `pip install pulsar-client` |
All backends use **lazy loading** - database dependencies are only imported when the specific backend is instantiated. Each backend provides helpful error messages if required packages are not installed.
@ -132,7 +71,6 @@ All backends use **lazy loading** - database dependencies are only imported when
| system_prompt | Optional[str] | System prompt for the conversation |
| time_enabled | bool | Flag to enable time tracking for messages |
| autosave | bool | Flag to enable automatic saving |
| save_enabled | bool | Flag to control if saving is enabled |
| save_filepath | str | File path for saving conversation history |
| load_filepath | str | File path for loading conversation history |
| conversation_history | list | List storing conversation messages |

@ -0,0 +1,247 @@
from loguru import logger
import yfinance as yf
import json
def get_figma_stock_data(stock: str) -> str:
"""
Fetches comprehensive stock data for Figma (FIG) using Yahoo Finance.
Returns:
Dict[str, Any]: A dictionary containing comprehensive Figma stock data including:
- Current price and market data
- Company information
- Financial metrics
- Historical data summary
- Trading statistics
Raises:
Exception: If there's an error fetching the data from Yahoo Finance
"""
try:
# Initialize Figma stock ticker
figma = yf.Ticker(stock)
# Get current stock info
info = figma.info
# Get recent historical data (last 30 days)
hist = figma.history(period="30d")
# Get real-time fast info
fast_info = figma.fast_info
# Compile comprehensive data
figma_data = {
"company_info": {
"name": info.get("longName", "Figma Inc."),
"symbol": "FIG",
"sector": info.get("sector", "N/A"),
"industry": info.get("industry", "N/A"),
"website": info.get("website", "N/A"),
"description": info.get("longBusinessSummary", "N/A"),
},
"current_market_data": {
"current_price": info.get("currentPrice", "N/A"),
"previous_close": info.get("previousClose", "N/A"),
"open": info.get("open", "N/A"),
"day_low": info.get("dayLow", "N/A"),
"day_high": info.get("dayHigh", "N/A"),
"volume": info.get("volume", "N/A"),
"market_cap": info.get("marketCap", "N/A"),
"price_change": (
info.get("currentPrice", 0)
- info.get("previousClose", 0)
if info.get("currentPrice")
and info.get("previousClose")
else "N/A"
),
"price_change_percent": info.get(
"regularMarketChangePercent", "N/A"
),
},
"financial_metrics": {
"pe_ratio": info.get("trailingPE", "N/A"),
"forward_pe": info.get("forwardPE", "N/A"),
"price_to_book": info.get("priceToBook", "N/A"),
"price_to_sales": info.get(
"priceToSalesTrailing12Months", "N/A"
),
"enterprise_value": info.get(
"enterpriseValue", "N/A"
),
"beta": info.get("beta", "N/A"),
"dividend_yield": info.get("dividendYield", "N/A"),
"payout_ratio": info.get("payoutRatio", "N/A"),
},
"trading_statistics": {
"fifty_day_average": info.get(
"fiftyDayAverage", "N/A"
),
"two_hundred_day_average": info.get(
"twoHundredDayAverage", "N/A"
),
"fifty_two_week_low": info.get(
"fiftyTwoWeekLow", "N/A"
),
"fifty_two_week_high": info.get(
"fiftyTwoWeekHigh", "N/A"
),
"shares_outstanding": info.get(
"sharesOutstanding", "N/A"
),
"float_shares": info.get("floatShares", "N/A"),
"shares_short": info.get("sharesShort", "N/A"),
"short_ratio": info.get("shortRatio", "N/A"),
},
"recent_performance": {
"last_30_days": {
"start_price": (
hist.iloc[0]["Close"]
if not hist.empty
else "N/A"
),
"end_price": (
hist.iloc[-1]["Close"]
if not hist.empty
else "N/A"
),
"total_return": (
(
hist.iloc[-1]["Close"]
- hist.iloc[0]["Close"]
)
/ hist.iloc[0]["Close"]
* 100
if not hist.empty
else "N/A"
),
"highest_price": (
hist["High"].max()
if not hist.empty
else "N/A"
),
"lowest_price": (
hist["Low"].min() if not hist.empty else "N/A"
),
"average_volume": (
hist["Volume"].mean()
if not hist.empty
else "N/A"
),
}
},
"real_time_data": {
"last_price": (
fast_info.last_price
if hasattr(fast_info, "last_price")
else "N/A"
),
"last_volume": (
fast_info.last_volume
if hasattr(fast_info, "last_volume")
else "N/A"
),
"bid": (
fast_info.bid
if hasattr(fast_info, "bid")
else "N/A"
),
"ask": (
fast_info.ask
if hasattr(fast_info, "ask")
else "N/A"
),
"bid_size": (
fast_info.bid_size
if hasattr(fast_info, "bid_size")
else "N/A"
),
"ask_size": (
fast_info.ask_size
if hasattr(fast_info, "ask_size")
else "N/A"
),
},
}
logger.info("Successfully fetched Figma (FIG) stock data")
return json.dumps(figma_data, indent=4)
except Exception as e:
logger.error(f"Error fetching Figma stock data: {e}")
raise Exception(f"Failed to fetch Figma stock data: {e}")
# # Example usage
# # Initialize the quantitative trading agent
# agent = Agent(
# agent_name="Quantitative-Trading-Agent",
# agent_description="Advanced quantitative trading and algorithmic analysis agent specializing in stock analysis and trading strategies",
# system_prompt=f"""You are an expert quantitative trading agent with deep expertise in:
# - Algorithmic trading strategies and implementation
# - Statistical arbitrage and market making
# - Risk management and portfolio optimization
# - High-frequency trading systems
# - Market microstructure analysis
# - Quantitative research methodologies
# - Financial mathematics and stochastic processes
# - Machine learning applications in trading
# - Technical analysis and chart patterns
# - Fundamental analysis and valuation models
# - Options trading and derivatives
# - Market sentiment analysis
# Your core responsibilities include:
# 1. Developing and backtesting trading strategies
# 2. Analyzing market data and identifying alpha opportunities
# 3. Implementing risk management frameworks
# 4. Optimizing portfolio allocations
# 5. Conducting quantitative research
# 6. Monitoring market microstructure
# 7. Evaluating trading system performance
# 8. Performing comprehensive stock analysis
# 9. Generating trading signals and recommendations
# 10. Risk assessment and position sizing
# When analyzing stocks, you should:
# - Evaluate technical indicators and chart patterns
# - Assess fundamental metrics and valuation ratios
# - Analyze market sentiment and momentum
# - Consider macroeconomic factors
# - Provide risk-adjusted return projections
# - Suggest optimal entry/exit points
# - Calculate position sizing recommendations
# - Identify potential catalysts and risks
# You maintain strict adherence to:
# - Mathematical rigor in all analyses
# - Statistical significance in strategy development
# - Risk-adjusted return optimization
# - Market impact minimization
# - Regulatory compliance
# - Transaction cost analysis
# - Performance attribution
# - Data-driven decision making
# You communicate in precise, technical terms while maintaining clarity for stakeholders.
# Data: {get_figma_stock_data('FIG')}
# """,
# max_loops=1,
# model_name="gpt-4o-mini",
# dynamic_temperature_enabled=True,
# output_type="str-all-except-first",
# streaming_on=True,
# print_on=True,
# telemetry_enable=False,
# )
# # Example 1: Basic usage with just a task
# logger.info("Starting quantitative analysis cron job for Figma (FIG)")
# cron_job = CronJob(agent=agent, interval="10seconds")
# cron_job.run(
# task="Analyze the Figma (FIG) stock comprehensively using the available stock data. Provide a detailed quantitative analysis"
# )
print(get_figma_stock_data("FIG"))

@ -0,0 +1,105 @@
"""
Example script demonstrating how to fetch Figma (FIG) stock data using swarms_tools Yahoo Finance API.
This shows the alternative approach using the existing swarms_tools package.
"""
from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import (
FINANCIAL_AGENT_SYS_PROMPT,
)
from swarms_tools import yahoo_finance_api
from loguru import logger
import json
def get_figma_data_with_swarms_tools():
"""
Fetches Figma stock data using the swarms_tools Yahoo Finance API.
Returns:
dict: Figma stock data from swarms_tools
"""
try:
logger.info("Fetching Figma stock data using swarms_tools...")
figma_data = yahoo_finance_api(["FIG"])
return figma_data
except Exception as e:
logger.error(f"Error fetching data with swarms_tools: {e}")
raise
def analyze_figma_with_agent():
"""
Uses a Swarms agent to analyze Figma stock data.
"""
try:
# Initialize the agent with Yahoo Finance tool
agent = Agent(
agent_name="Figma-Analysis-Agent",
agent_description="Specialized agent for analyzing Figma stock data",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
max_loops=1,
model_name="gpt-4o-mini",
tools=[yahoo_finance_api],
dynamic_temperature_enabled=True,
)
# Ask the agent to analyze Figma
analysis = agent.run(
"Analyze the current stock data for Figma (FIG) and provide insights on its performance, valuation metrics, and recent trends."
)
return analysis
except Exception as e:
logger.error(f"Error in agent analysis: {e}")
raise
def main():
"""
Main function to demonstrate different approaches for Figma stock data.
"""
logger.info("Starting Figma stock analysis with swarms_tools")
try:
# Method 1: Direct API call
print("\n" + "=" * 60)
print("METHOD 1: Direct swarms_tools API call")
print("=" * 60)
figma_data = get_figma_data_with_swarms_tools()
print("Raw data from swarms_tools:")
print(json.dumps(figma_data, indent=2, default=str))
# Method 2: Agent-based analysis
print("\n" + "=" * 60)
print("METHOD 2: Agent-based analysis")
print("=" * 60)
analysis = analyze_figma_with_agent()
print("Agent analysis:")
print(analysis)
# Method 3: Comparison with custom function
print("\n" + "=" * 60)
print("METHOD 3: Comparison with custom function")
print("=" * 60)
from cron_job_examples.cron_job_example import (
get_figma_stock_data_simple,
)
custom_data = get_figma_stock_data_simple()
print("Custom function output:")
print(custom_data)
logger.info("All methods completed successfully!")
except Exception as e:
logger.error(f"Error in main function: {e}")
print(f"Error: {e}")
if __name__ == "__main__":
main()

@ -0,0 +1,79 @@
"""
Example script demonstrating how to fetch Figma (FIG) stock data using Yahoo Finance.
"""
from cron_job_examples.cron_job_example import (
get_figma_stock_data,
get_figma_stock_data_simple,
)
from loguru import logger
import json
def main():
"""
Main function to demonstrate Figma stock data fetching.
"""
logger.info("Starting Figma stock data demonstration")
try:
# Example 1: Get comprehensive data as dictionary
logger.info("Fetching comprehensive Figma stock data...")
figma_data = get_figma_stock_data()
# Print the data in a structured format
print("\n" + "=" * 50)
print("COMPREHENSIVE FIGMA STOCK DATA")
print("=" * 50)
print(json.dumps(figma_data, indent=2, default=str))
# Example 2: Get simple formatted data
logger.info("Fetching simple formatted Figma stock data...")
simple_data = get_figma_stock_data_simple()
print("\n" + "=" * 50)
print("SIMPLE FORMATTED FIGMA STOCK DATA")
print("=" * 50)
print(simple_data)
# Example 3: Access specific data points
logger.info("Accessing specific data points...")
current_price = figma_data["current_market_data"][
"current_price"
]
market_cap = figma_data["current_market_data"]["market_cap"]
pe_ratio = figma_data["financial_metrics"]["pe_ratio"]
print("\nKey Metrics:")
print(f"Current Price: ${current_price}")
print(f"Market Cap: ${market_cap:,}")
print(f"P/E Ratio: {pe_ratio}")
# Example 4: Check if stock is performing well
price_change = figma_data["current_market_data"][
"price_change"
]
if isinstance(price_change, (int, float)):
if price_change > 0:
print(
f"\n📈 Figma stock is up ${price_change:.2f} today!"
)
elif price_change < 0:
print(
f"\n📉 Figma stock is down ${abs(price_change):.2f} today."
)
else:
print("\n➡️ Figma stock is unchanged today.")
logger.info(
"Figma stock data demonstration completed successfully!"
)
except Exception as e:
logger.error(f"Error in main function: {e}")
print(f"Error: {e}")
if __name__ == "__main__":
main()

@ -0,0 +1,257 @@
from swarms import Agent, CronJob
from loguru import logger
import requests
import json
from datetime import datetime
def get_solana_price() -> str:
"""
Fetches comprehensive Solana (SOL) price data using CoinGecko API.
Returns:
str: A JSON formatted string containing Solana's current price and market data including:
- Current price in USD
- Market cap
- 24h volume
- 24h price change
- Last updated timestamp
Raises:
Exception: If there's an error fetching the data from CoinGecko API
"""
try:
# CoinGecko API endpoint for simple price data
url = "https://api.coingecko.com/api/v3/simple/price"
params = {
"ids": "solana", # Solana's CoinGecko ID
"vs_currencies": "usd",
"include_market_cap": True,
"include_24hr_vol": True,
"include_24hr_change": True,
"include_last_updated_at": True,
}
# Make API request with timeout
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
# Parse response data
data = response.json()
if "solana" not in data:
raise Exception("Solana data not found in API response")
solana_data = data["solana"]
# Compile comprehensive data
solana_info = {
"timestamp": datetime.now().isoformat(),
"coin_info": {
"name": "Solana",
"symbol": "SOL",
"coin_id": "solana",
},
"price_data": {
"current_price_usd": solana_data.get("usd", "N/A"),
"market_cap_usd": solana_data.get(
"usd_market_cap", "N/A"
),
"volume_24h_usd": solana_data.get(
"usd_24h_vol", "N/A"
),
"price_change_24h_percent": solana_data.get(
"usd_24h_change", "N/A"
),
"last_updated_at": solana_data.get(
"last_updated_at", "N/A"
),
},
"formatted_data": {
"price_formatted": (
f"${solana_data.get('usd', 'N/A'):,.2f}"
if solana_data.get("usd")
else "N/A"
),
"market_cap_formatted": (
f"${solana_data.get('usd_market_cap', 'N/A'):,.0f}"
if solana_data.get("usd_market_cap")
else "N/A"
),
"volume_formatted": (
f"${solana_data.get('usd_24h_vol', 'N/A'):,.0f}"
if solana_data.get("usd_24h_vol")
else "N/A"
),
"change_formatted": (
f"{solana_data.get('usd_24h_change', 'N/A'):+.2f}%"
if solana_data.get("usd_24h_change") is not None
else "N/A"
),
},
}
logger.info(
f"Successfully fetched Solana price: ${solana_data.get('usd', 'N/A')}"
)
return json.dumps(solana_info, indent=4)
except requests.RequestException as e:
error_msg = f"API request failed: {e}"
logger.error(error_msg)
return json.dumps(
{
"error": error_msg,
"timestamp": datetime.now().isoformat(),
"status": "failed",
},
indent=4,
)
except Exception as e:
error_msg = f"Error fetching Solana price data: {e}"
logger.error(error_msg)
return json.dumps(
{
"error": error_msg,
"timestamp": datetime.now().isoformat(),
"status": "failed",
},
indent=4,
)
def analyze_solana_data(data: str) -> str:
"""
Analyzes Solana price data and provides insights.
Args:
data (str): JSON string containing Solana price data
Returns:
str: Analysis and insights about the current Solana market data
"""
try:
# Parse the data
solana_data = json.loads(data)
if "error" in solana_data:
return f"❌ Error in data: {solana_data['error']}"
price_data = solana_data.get("price_data", {})
formatted_data = solana_data.get("formatted_data", {})
# Extract key metrics
current_price = price_data.get("current_price_usd")
price_change = price_data.get("price_change_24h_percent")
volume_24h = price_data.get("volume_24h_usd")
market_cap = price_data.get("market_cap_usd")
# Generate analysis
analysis = f"""
🔍 **Solana (SOL) Market Analysis** - {solana_data.get('timestamp', 'N/A')}
💰 **Current Price**: {formatted_data.get('price_formatted', 'N/A')}
📊 **24h Change**: {formatted_data.get('change_formatted', 'N/A')}
💎 **Market Cap**: {formatted_data.get('market_cap_formatted', 'N/A')}
📈 **24h Volume**: {formatted_data.get('volume_formatted', 'N/A')}
"""
# Add sentiment analysis based on price change
if price_change is not None:
if price_change > 5:
analysis += "🚀 **Sentiment**: Strongly Bullish - Significant positive momentum\n"
elif price_change > 1:
analysis += "📈 **Sentiment**: Bullish - Positive price action\n"
elif price_change > -1:
analysis += (
"➡️ **Sentiment**: Neutral - Sideways movement\n"
)
elif price_change > -5:
analysis += "📉 **Sentiment**: Bearish - Negative price action\n"
else:
analysis += "🔻 **Sentiment**: Strongly Bearish - Significant decline\n"
# Add volume analysis
if volume_24h and market_cap:
try:
volume_market_cap_ratio = (
volume_24h / market_cap
) * 100
if volume_market_cap_ratio > 10:
analysis += "🔥 **Volume**: High trading activity - Strong market interest\n"
elif volume_market_cap_ratio > 5:
analysis += (
"📊 **Volume**: Moderate trading activity\n"
)
else:
analysis += "😴 **Volume**: Low trading activity - Limited market movement\n"
except (TypeError, ZeroDivisionError):
analysis += "📊 **Volume**: Unable to calculate volume/market cap ratio\n"
analysis += f"\n⏰ **Last Updated**: {price_data.get('last_updated_at', 'N/A')}"
return analysis
except json.JSONDecodeError as e:
return f"❌ Error parsing data: {e}"
except Exception as e:
return f"❌ Error analyzing data: {e}"
# Initialize the Solana analysis agent
agent = Agent(
agent_name="Solana-Price-Analyzer",
agent_description="Specialized agent for analyzing Solana (SOL) cryptocurrency price data and market trends",
system_prompt=f"""You are an expert cryptocurrency analyst specializing in Solana (SOL) analysis. Your expertise includes:
- Technical analysis and chart patterns
- Market sentiment analysis
- Volume and liquidity analysis
- Price action interpretation
- Market cap and valuation metrics
- Cryptocurrency market dynamics
- DeFi ecosystem analysis
- Blockchain technology trends
When analyzing Solana data, you should:
- Evaluate price movements and trends
- Assess market sentiment and momentum
- Consider volume and liquidity factors
- Analyze market cap positioning
- Provide actionable insights
- Identify potential catalysts or risks
- Consider broader market context
You communicate clearly and provide practical analysis that helps users understand Solana's current market position and potential future movements.
Current Solana Data: {get_solana_price()}
""",
max_loops=1,
model_name="gpt-4o-mini",
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
streaming_on=False, # need to fix this bug where streaming is working but makes copies of the border when you scroll on the terminal
print_on=True,
telemetry_enable=False,
)
def main():
"""
Main function to run the Solana price tracking cron job.
"""
logger.info("🚀 Starting Solana price tracking cron job")
logger.info("📊 Fetching Solana price every 10 seconds...")
# Create cron job that runs every 10 seconds
cron_job = CronJob(agent=agent, interval="30seconds")
# Run the cron job with analysis task
cron_job.run(
task="Analyze the current Solana (SOL) price data comprehensively. Provide detailed market analysis including price trends, volume analysis, market sentiment, and actionable insights. Format your response clearly with emojis and structured sections."
)
if __name__ == "__main__":
main()

@ -0,0 +1,197 @@
"""
Board of Directors Example
This example demonstrates how to use the Board of Directors swarm feature
in the Swarms Framework. It shows how to create a board, configure it,
and use it to orchestrate tasks across multiple agents.
To run this example:
1. Make sure you're in the root directory of the swarms project
2. Run: python examples/multi_agent/board_of_directors/board_of_directors_example.py
"""
import os
import sys
from typing import List
# Add the root directory to the Python path if running from examples directory
current_dir = os.path.dirname(os.path.abspath(__file__))
if 'examples' in current_dir:
root_dir = current_dir
while os.path.basename(root_dir) != 'examples' and root_dir != os.path.dirname(root_dir):
root_dir = os.path.dirname(root_dir)
if os.path.basename(root_dir) == 'examples':
root_dir = os.path.dirname(root_dir)
if root_dir not in sys.path:
sys.path.insert(0, root_dir)
from swarms.structs.board_of_directors_swarm import (
BoardOfDirectorsSwarm,
BoardMember,
BoardMemberRole,
)
from swarms.structs.agent import Agent
def create_board_members() -> List[BoardMember]:
"""Create board members with specific roles."""
chairman = Agent(
agent_name="Chairman",
agent_description="Executive Chairman with strategic vision",
model_name="gpt-4o-mini",
max_loops=1,
system_prompt="You are the Executive Chairman. Provide strategic leadership and facilitate decision-making.",
)
cto = Agent(
agent_name="CTO",
agent_description="Chief Technology Officer with technical expertise",
model_name="gpt-4o-mini",
max_loops=1,
system_prompt="You are the CTO. Provide technical leadership and evaluate technology solutions.",
)
cfo = Agent(
agent_name="CFO",
agent_description="Chief Financial Officer with financial expertise",
model_name="gpt-4o-mini",
max_loops=1,
system_prompt="You are the CFO. Provide financial analysis and ensure fiscal responsibility.",
)
return [
BoardMember(
agent=chairman,
role=BoardMemberRole.CHAIRMAN,
voting_weight=2.0,
expertise_areas=["leadership", "strategy"]
),
BoardMember(
agent=cto,
role=BoardMemberRole.EXECUTIVE_DIRECTOR,
voting_weight=1.5,
expertise_areas=["technology", "innovation"]
),
BoardMember(
agent=cfo,
role=BoardMemberRole.EXECUTIVE_DIRECTOR,
voting_weight=1.5,
expertise_areas=["finance", "risk_management"]
),
]
def create_worker_agents() -> List[Agent]:
"""Create worker agents for the swarm."""
researcher = Agent(
agent_name="Researcher",
agent_description="Research analyst for data analysis",
model_name="gpt-4o-mini",
max_loops=1,
system_prompt="You are a Research Analyst. Conduct thorough research and provide data-driven insights.",
)
developer = Agent(
agent_name="Developer",
agent_description="Software developer for implementation",
model_name="gpt-4o-mini",
max_loops=1,
system_prompt="You are a Software Developer. Design and implement software solutions.",
)
marketer = Agent(
agent_name="Marketer",
agent_description="Marketing specialist for strategy",
model_name="gpt-4o-mini",
max_loops=1,
system_prompt="You are a Marketing Specialist. Develop marketing strategies and campaigns.",
)
return [researcher, developer, marketer]
def run_board_example() -> None:
"""Run a Board of Directors example."""
# Create board members and worker agents
board_members = create_board_members()
worker_agents = create_worker_agents()
# Create the Board of Directors swarm
board_swarm = BoardOfDirectorsSwarm(
name="Executive_Board",
board_members=board_members,
agents=worker_agents,
max_loops=2,
verbose=True,
decision_threshold=0.6,
)
# Define task
task = """
Develop a strategy for launching a new AI-powered product in the market.
Include market research, technical planning, marketing strategy, and financial projections.
"""
# Execute the task
result = board_swarm.run(task=task)
print("Task completed successfully!")
print(f"Result: {result}")
def run_simple_example() -> None:
"""Run a simple Board of Directors example."""
# Create simple agents
analyst = Agent(
agent_name="Analyst",
agent_description="Data analyst",
model_name="gpt-4o-mini",
max_loops=1,
)
writer = Agent(
agent_name="Writer",
agent_description="Content writer",
model_name="gpt-4o-mini",
max_loops=1,
)
# Create swarm with default settings
board_swarm = BoardOfDirectorsSwarm(
name="Simple_Board",
agents=[analyst, writer],
verbose=True,
)
# Execute simple task
task = "Analyze current market trends and create a summary report."
result = board_swarm.run(task=task)
print("Simple example completed!")
print(f"Result: {result}")
def main() -> None:
"""Main function to run the examples."""
if not os.getenv("OPENAI_API_KEY"):
print("Warning: OPENAI_API_KEY not set. Example may not work.")
return
try:
print("Running simple Board of Directors example...")
run_simple_example()
print("\nRunning comprehensive Board of Directors example...")
run_board_example()
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
main()

@ -1,5 +1,4 @@
from swarms import Agent
from swarms.structs.graph_workflow import GraphWorkflow
from swarms import Agent, GraphWorkflow
from swarms.prompts.multi_agent_collab_prompt import (
MULTI_AGENT_COLLAB_PROMPT_TWO,
)
@ -11,6 +10,7 @@ agent1 = Agent(
max_loops=1,
system_prompt=MULTI_AGENT_COLLAB_PROMPT_TWO, # Set collaboration prompt
)
agent2 = Agent(
agent_name="ResearchAgent2",
model_name="gpt-4.1",
@ -19,7 +19,11 @@ agent2 = Agent(
)
# Build the workflow with only agents as nodes
workflow = GraphWorkflow()
workflow = GraphWorkflow(
name="Research Workflow",
description="A workflow for researching the best arbitrage trading strategies for altcoins",
auto_compile=True,
)
workflow.add_node(agent1)
workflow.add_node(agent2)
@ -27,27 +31,15 @@ workflow.add_node(agent2)
workflow.add_edge(agent1.agent_name, agent2.agent_name)
# Visualize the workflow using Graphviz
print("\n📊 Creating workflow visualization...")
try:
viz_output = workflow.visualize(
output_path="simple_workflow_graph",
format="png",
view=True, # Auto-open the generated image
show_parallel_patterns=True,
)
print(f"✅ Workflow visualization saved to: {viz_output}")
except Exception as e:
print(f"⚠️ Graphviz not available, using text visualization: {e}")
workflow.visualize()
workflow.visualize()
workflow.compile()
# Export workflow to JSON
workflow_json = workflow.to_json()
print(
f"\n💾 Workflow exported to JSON ({len(workflow_json)} characters)"
)
print(workflow_json)
# Run the workflow and print results
print("\n🚀 Executing workflow...")
results = workflow.run(
task="What are the best arbitrage trading strategies for altcoins? Give me research papers and articles on the topic."
)

@ -1,16 +1,32 @@
from swarms.structs.heavy_swarm import HeavySwarm
from swarms import HeavySwarm
swarm = HeavySwarm(
worker_model_name="claude-3-5-sonnet-20240620",
show_dashboard=True,
question_agent_model_name="gpt-4.1",
loops_per_agent=1,
)
def main():
"""
Run a HeavySwarm query to find the best 3 gold ETFs.
This function initializes a HeavySwarm instance and queries it to provide
the top 3 gold exchange-traded funds (ETFs), requesting clear, structured results.
"""
swarm = HeavySwarm(
name="Gold ETF Research Team",
description="A team of agents that research the best gold ETFs",
worker_model_name="claude-sonnet-4-latest",
show_dashboard=True,
question_agent_model_name="gpt-4.1",
loops_per_agent=1,
)
out = swarm.run(
"Provide 3 publicly traded biotech companies that are currently trading below their cash value. For each company identified, provide available data or projections for the next 6 months, including any relevant financial metrics, upcoming catalysts, or events that could impact valuation. Present your findings in a clear, structured format. Be very specific and provide their ticker symbol, name, and the current price, cash value, and the percentage difference between the two."
)
prompt = (
"Find the best 3 gold ETFs. For each ETF, provide the ticker symbol, "
"full name, current price, expense ratio, assets under management, and "
"a brief explanation of why it is considered among the best. Present the information "
"in a clear, structured format suitable for investors."
)
print(out)
out = swarm.run(prompt)
print(out)
if __name__ == "__main__":
main()

@ -0,0 +1,34 @@
from swarms import HeavySwarm
def main():
"""
Run a HeavySwarm query to find the best and most promising treatments for diabetes.
This function initializes a HeavySwarm instance and queries it to provide
the top current and theoretical treatments for diabetes, requesting clear,
structured, and evidence-based results suitable for medical research or clinical review.
"""
swarm = HeavySwarm(
name="Diabetes Treatment Research Team",
description="A team of agents that research the best and most promising treatments for diabetes, including theoretical approaches.",
worker_model_name="claude-sonnet-4-20250514",
show_dashboard=True,
question_agent_model_name="gpt-4.1",
loops_per_agent=1,
)
prompt = (
"Identify the best and most promising treatments for diabetes, including both current standard therapies and theoretical or experimental approaches. "
"For each treatment, provide: the treatment name, type (e.g., medication, lifestyle intervention, device, gene therapy, etc.), "
"mechanism of action, current stage of research or approval status, key clinical evidence or rationale, "
"potential benefits and risks, and a brief summary of why it is considered promising. "
"Present the information in a clear, structured format suitable for medical professionals or researchers."
)
out = swarm.run(prompt)
print(out)
if __name__ == "__main__":
main()

@ -1,5 +1,4 @@
from swarms import Agent
from swarms.structs.hiearchical_swarm import HierarchicalSwarm
from swarms import Agent, HierarchicalSwarm
# Initialize agents for a $50B portfolio analysis
@ -9,24 +8,27 @@ agents = [
agent_description="Senior financial analyst at BlackRock.",
system_prompt="You are a financial analyst tasked with optimizing asset allocations for a $50B portfolio. Provide clear, quantitative recommendations for each sector.",
max_loops=1,
model_name="groq/deepseek-r1-distill-qwen-32b",
model_name="gpt-4.1",
max_tokens=3000,
streaming_on=True,
),
Agent(
agent_name="Sector-Risk-Analyst",
agent_description="Expert risk management analyst.",
system_prompt="You are a risk analyst responsible for advising on risk allocation within a $50B portfolio. Provide detailed insights on risk exposures for each sector.",
max_loops=1,
model_name="groq/deepseek-r1-distill-qwen-32b",
model_name="gpt-4.1",
max_tokens=3000,
streaming_on=True,
),
Agent(
agent_name="Tech-Sector-Analyst",
agent_description="Technology sector analyst.",
system_prompt="You are a tech sector analyst focused on capital and risk allocations. Provide data-backed insights for the tech sector.",
max_loops=1,
model_name="groq/deepseek-r1-distill-qwen-32b",
model_name="gpt-4.1",
max_tokens=3000,
streaming_on=True,
),
]
@ -35,14 +37,19 @@ majority_voting = HierarchicalSwarm(
name="Sector-Investment-Advisory-System",
description="System for sector analysis and optimal allocations.",
agents=agents,
# director=director_agent,
max_loops=1,
max_loops=2,
output_type="dict",
)
# Run the analysis
result = majority_voting.run(
task="Evaluate market sectors and determine optimal allocation for a $50B portfolio. Include a detailed table of allocations, risk assessments, and a consolidated strategy."
task=(
"Simulate the allocation of a $50B fund specifically for the pharmaceutical sector. "
"Provide specific tickers (e.g., PFE, MRK, JNJ, LLY, BMY, etc.) and a clear rationale for why funds should be allocated to each company. "
"Present a table showing each ticker, company name, allocation percentage, and allocation amount in USD. "
"Include a brief summary of the overall allocation strategy and the reasoning behind the choices."
"Only call the Sector-Financial-Analyst agent to do the analysis. Nobody else should do the analysis."
)
)
print(result)

@ -6,11 +6,11 @@ each with detailed backgrounds, political positions, and comprehensive system pr
that reflect their real-world characteristics, voting patterns, and policy priorities.
"""
import random
from typing import Dict, List, Optional, Union
from swarms import Agent
from swarms.structs.multi_agent_exec import run_agents_concurrently
from typing import Dict, List, Optional, Union
import json
import random
class SenatorSimulation:
@ -3490,159 +3490,159 @@ class SenatorSimulation:
}
# Example usage and demonstration
def main():
"""
Demonstrate the Senate simulation with various scenarios.
"""
print("🏛️ US Senate Simulation Initializing...")
# Create the simulation
senate = SenatorSimulation()
print("\n📊 Senate Composition:")
composition = senate.get_senate_composition()
print(json.dumps(composition, indent=2))
print(f"\n🎭 Available Senators ({len(senate.senators)}):")
for name in senate.senators.keys():
party = senate._get_senator_party(name)
print(f" - {name} ({party})")
# Example 1: Individual senator response
print("\n🗣️ Example: Senator Response")
senator = senate.get_senator("Katie Britt")
response = senator.run(
"What is your position on infrastructure spending and how would you pay for it?"
)
print(f"Senator Katie Britt: {response}")
# Example 2: Simulate a debate
print("\n💬 Example: Senate Debate on Climate Change")
debate = senate.simulate_debate(
"Climate change legislation and carbon pricing",
[
"Katie Britt",
"Mark Kelly",
"Lisa Murkowski",
"Alex Padilla",
],
)
for entry in debate["transcript"]:
print(f"\n{entry['senator']} ({entry['party']}):")
print(f" {entry['position'][:200]}...")
# Example 3: Simulate a vote
print("\n🗳️ Example: Senate Vote on Infrastructure Bill")
vote = senate.simulate_vote(
"A $1.2 trillion infrastructure bill including roads, bridges, broadband, and clean energy projects",
[
"Katie Britt",
"Mark Kelly",
"Lisa Murkowski",
"Alex Padilla",
"Tom Cotton",
],
)
print("Vote Results:")
for senator, vote_choice in vote["votes"].items():
print(f" {senator}: {vote_choice}")
print(f"\nFinal Result: {vote['results']['outcome']}")
print(
f"YEA: {vote['results']['yea']}, NAY: {vote['results']['nay']}, PRESENT: {vote['results']['present']}"
)
# Example 4: Committee hearing
print("\n🏛️ Example: Committee Hearing")
hearing = senate.run_committee_hearing(
"Armed Services",
"Military readiness and defense spending",
["Secretary of Defense", "Joint Chiefs Chairman"],
)
print("Armed Services Committee Hearing on Military Readiness")
for entry in hearing["transcript"][:3]: # Show first 3 entries
print(
f"\n{entry['type'].title()}: {entry['senator'] if 'senator' in entry else entry['witness']}"
)
print(f" {entry['content'][:150]}...")
# Example 5: Run all senators concurrently on a single task
print("\n🚀 Example: All Senators Concurrent Response")
all_senators_results = senate.run(
"What is your position on federal student loan forgiveness and how should we address the student debt crisis?"
)
print(f"\nTask: {all_senators_results['task']}")
print(
f"Selection Method: {all_senators_results['selection_method']}"
)
print(
f"Total Participants: {all_senators_results['total_participants']}"
)
print("\n📊 Party Breakdown:")
for party, senators in all_senators_results[
"party_breakdown"
].items():
if senators:
print(f"\n{party} ({len(senators)} senators):")
for senator_data in senators:
print(f" - {senator_data['senator']}")
# Example 6: Run 50% of senators randomly
print("\n🎲 Example: Random 50% of Senators")
random_results = senate.run(
"What is your position on climate change legislation and carbon pricing?",
participants=0.5, # 50% of all senators
)
print(f"\nTask: {random_results['task']}")
print(f"Selection Method: {random_results['selection_method']}")
print(
f"Total Participants: {random_results['total_participants']}"
)
print("\n📋 Selected Senators:")
for senator in random_results["participants"]:
party = senate._get_senator_party(senator)
print(f" - {senator} ({party})")
print("\n📊 Party Breakdown:")
for party, senators in random_results["party_breakdown"].items():
if senators:
print(f"\n{party} ({len(senators)} senators):")
for senator_data in senators:
print(f" - {senator_data['senator']}")
# Example 7: Run specific senators
print("\n🎯 Example: Specific Senators")
specific_results = senate.run(
"What is your position on military spending and defense policy?",
participants=[
"Katie Britt",
"Mark Kelly",
"Lisa Murkowski",
"Alex Padilla",
"Tom Cotton",
],
)
print(f"\nTask: {specific_results['task']}")
print(f"Selection Method: {specific_results['selection_method']}")
print(
f"Total Participants: {specific_results['total_participants']}"
)
print("\n📋 Responses by Senator:")
for senator, response in specific_results["responses"].items():
party = senate._get_senator_party(senator)
print(f"\n{senator} ({party}):")
print(f" {response[:200]}...")
if __name__ == "__main__":
main()
# # Example usage and demonstration
# def main():
# """
# Demonstrate the Senate simulation with various scenarios.
# """
# print("🏛️ US Senate Simulation Initializing...")
# # Create the simulation
# senate = SenatorSimulation()
# print("\n📊 Senate Composition:")
# composition = senate.get_senate_composition()
# print(json.dumps(composition, indent=2))
# print(f"\n🎭 Available Senators ({len(senate.senators)}):")
# for name in senate.senators.keys():
# party = senate._get_senator_party(name)
# print(f" - {name} ({party})")
# # Example 1: Individual senator response
# print("\n🗣 Example: Senator Response")
# senator = senate.get_senator("Katie Britt")
# response = senator.run(
# "What is your position on infrastructure spending and how would you pay for it?"
# )
# print(f"Senator Katie Britt: {response}")
# # Example 2: Simulate a debate
# print("\n💬 Example: Senate Debate on Climate Change")
# debate = senate.simulate_debate(
# "Climate change legislation and carbon pricing",
# [
# "Katie Britt",
# "Mark Kelly",
# "Lisa Murkowski",
# "Alex Padilla",
# ],
# )
# for entry in debate["transcript"]:
# print(f"\n{entry['senator']} ({entry['party']}):")
# print(f" {entry['position'][:200]}...")
# # Example 3: Simulate a vote
# print("\n🗳 Example: Senate Vote on Infrastructure Bill")
# vote = senate.simulate_vote(
# "A $1.2 trillion infrastructure bill including roads, bridges, broadband, and clean energy projects",
# [
# "Katie Britt",
# "Mark Kelly",
# "Lisa Murkowski",
# "Alex Padilla",
# "Tom Cotton",
# ],
# )
# print("Vote Results:")
# for senator, vote_choice in vote["votes"].items():
# print(f" {senator}: {vote_choice}")
# print(f"\nFinal Result: {vote['results']['outcome']}")
# print(
# f"YEA: {vote['results']['yea']}, NAY: {vote['results']['nay']}, PRESENT: {vote['results']['present']}"
# )
# # Example 4: Committee hearing
# print("\n🏛 Example: Committee Hearing")
# hearing = senate.run_committee_hearing(
# "Armed Services",
# "Military readiness and defense spending",
# ["Secretary of Defense", "Joint Chiefs Chairman"],
# )
# print("Armed Services Committee Hearing on Military Readiness")
# for entry in hearing["transcript"][:3]: # Show first 3 entries
# print(
# f"\n{entry['type'].title()}: {entry['senator'] if 'senator' in entry else entry['witness']}"
# )
# print(f" {entry['content'][:150]}...")
# # Example 5: Run all senators concurrently on a single task
# print("\n🚀 Example: All Senators Concurrent Response")
# all_senators_results = senate.run(
# "What is your position on federal student loan forgiveness and how should we address the student debt crisis?"
# )
# print(f"\nTask: {all_senators_results['task']}")
# print(
# f"Selection Method: {all_senators_results['selection_method']}"
# )
# print(
# f"Total Participants: {all_senators_results['total_participants']}"
# )
# print("\n📊 Party Breakdown:")
# for party, senators in all_senators_results[
# "party_breakdown"
# ].items():
# if senators:
# print(f"\n{party} ({len(senators)} senators):")
# for senator_data in senators:
# print(f" - {senator_data['senator']}")
# # Example 6: Run 50% of senators randomly
# print("\n🎲 Example: Random 50% of Senators")
# random_results = senate.run(
# "What is your position on climate change legislation and carbon pricing?",
# participants=0.5, # 50% of all senators
# )
# print(f"\nTask: {random_results['task']}")
# print(f"Selection Method: {random_results['selection_method']}")
# print(
# f"Total Participants: {random_results['total_participants']}"
# )
# print("\n📋 Selected Senators:")
# for senator in random_results["participants"]:
# party = senate._get_senator_party(senator)
# print(f" - {senator} ({party})")
# print("\n📊 Party Breakdown:")
# for party, senators in random_results["party_breakdown"].items():
# if senators:
# print(f"\n{party} ({len(senators)} senators):")
# for senator_data in senators:
# print(f" - {senator_data['senator']}")
# # Example 7: Run specific senators
# print("\n🎯 Example: Specific Senators")
# specific_results = senate.run(
# "What is your position on military spending and defense policy?",
# participants=[
# "Katie Britt",
# "Mark Kelly",
# "Lisa Murkowski",
# "Alex Padilla",
# "Tom Cotton",
# ],
# )
# print(f"\nTask: {specific_results['task']}")
# print(f"Selection Method: {specific_results['selection_method']}")
# print(
# f"Total Participants: {specific_results['total_participants']}"
# )
# print("\n📋 Responses by Senator:")
# for senator, response in specific_results["responses"].items():
# party = senate._get_senator_party(senator)
# print(f"\n{senator} ({party}):")
# print(f" {response[:200]}...")
# if __name__ == "__main__":
# main()

File diff suppressed because it is too large Load Diff

@ -1,6 +1,10 @@
"""
Flow:
Hierarchical Swarm Implementation
This module provides a hierarchical swarm architecture where a director agent coordinates
multiple worker agents to execute complex tasks through a structured workflow.
Flow:
1. User provides a task
2. Director creates a plan
3. Director distributes orders to agents individually or multiple tasks at once
@ -8,10 +12,21 @@ Flow:
5. Director evaluates results and issues new orders if needed (up to max_loops)
6. All context and conversation history is preserved throughout the process
Todo
- Add layers of management -- a list of list of agents that act as departments
- Auto build agents from input prompt - and then add them to the swarm
- Create an interactive and dynamic UI like we did with heavy swarm
- Make it faster and more high performance
Classes:
HierarchicalOrder: Represents a single task assignment to a specific agent
SwarmSpec: Contains the overall plan and list of orders for the swarm
HierarchicalSwarm: Main swarm orchestrator that manages director and worker agents
"""
import traceback
from typing import Any, Callable, List, Literal, Optional, Union
from typing import Any, Callable, List, Optional, Union
from pydantic import BaseModel, Field
@ -19,7 +34,6 @@ from swarms.prompts.hiearchical_system_prompt import (
HIEARCHICAL_SWARM_SYSTEM_PROMPT,
)
from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.conversation import Conversation
from swarms.structs.ma_utils import list_all_agents
from swarms.tools.base_tool import BaseTool
@ -33,6 +47,20 @@ logger = initialize_logger(log_folder="hierarchical_swarm")
class HierarchicalOrder(BaseModel):
"""
Represents a single task assignment within the hierarchical swarm.
This class defines the structure for individual task orders that the director
distributes to worker agents. Each order specifies which agent should execute
what specific task.
Attributes:
agent_name (str): The name of the agent assigned to execute the task.
Must match an existing agent in the swarm.
task (str): The specific task description to be executed by the assigned agent.
Should be clear and actionable.
"""
agent_name: str = Field(
...,
description="Specifies the name of the agent to which the task is assigned. This is a crucial element in the hierarchical structure of the swarm, as it determines the specific agent responsible for the task execution.",
@ -44,6 +72,20 @@ class HierarchicalOrder(BaseModel):
class SwarmSpec(BaseModel):
"""
Defines the complete specification for a hierarchical swarm execution.
This class contains the overall plan and all individual orders that the director
creates to coordinate the swarm's activities. It serves as the structured output
format for the director agent.
Attributes:
plan (str): A comprehensive plan outlining the sequence of actions and strategy
for the entire swarm to accomplish the given task.
orders (List[HierarchicalOrder]): A list of specific task assignments to
individual agents within the swarm.
"""
plan: str = Field(
...,
description="Outlines the sequence of actions to be taken by the swarm. This plan is a detailed roadmap that guides the swarm's behavior and decision-making.",
@ -54,50 +96,34 @@ class SwarmSpec(BaseModel):
)
SwarmType = Literal[
"AgentRearrange",
"MixtureOfAgents",
"SpreadSheetSwarm",
"SequentialWorkflow",
"ConcurrentWorkflow",
"GroupChat",
"MultiAgentRouter",
"AutoSwarmBuilder",
"HiearchicalSwarm",
"auto",
"MajorityVoting",
"MALT",
"DeepResearchSwarm",
"CouncilAsAJudge",
"InteractiveGroupChat",
]
class SwarmRouterCall(BaseModel):
goal: str = Field(
...,
description="The goal of the swarm router call. This is the goal that the swarm router will use to determine the best swarm to use.",
)
swarm_type: SwarmType = Field(
...,
description="The type of swarm to use. This is the type of swarm that the swarm router will use to determine the best swarm to use.",
)
task: str = Field(
...,
description="The task to be executed by the swarm router. This is the task that the swarm router will use to determine the best swarm to use.",
)
class HierarchicalSwarm(BaseSwarm):
class HierarchicalSwarm:
"""
_Representer a hierarchical swarm of agents, with a director that orchestrates tasks among the agents.
The workflow follows a hierarchical pattern:
1. Task is received and sent to the director
2. Director creates a plan and distributes orders to agents
3. Agents execute tasks and report back to the director
4. Director evaluates results and issues new orders if needed (up to max_loops)
5. All context and conversation history is preserved throughout the process
A hierarchical swarm orchestrator that coordinates multiple agents through a director.
This class implements a hierarchical architecture where a director agent creates
plans and distributes tasks to worker agents. The director can provide feedback
and iterate on results through multiple loops to achieve the desired outcome.
The swarm maintains conversation history throughout the process, allowing for
context-aware decision making and iterative refinement of results.
Attributes:
name (str): The name identifier for this swarm instance.
description (str): A description of the swarm's purpose and capabilities.
director (Optional[Union[Agent, Callable, Any]]): The director agent that
coordinates the swarm.
agents (List[Union[Agent, Callable, Any]]): List of worker agents available
for task execution.
max_loops (int): Maximum number of feedback loops the swarm can perform.
output_type (OutputType): Format for the final output of the swarm.
feedback_director_model_name (str): Model name for the feedback director.
director_name (str): Name identifier for the director agent.
director_model_name (str): Model name for the main director agent.
verbose (bool): Whether to enable detailed logging and progress tracking.
add_collaboration_prompt (bool): Whether to add collaboration prompts to agents.
planning_director_agent (Optional[Union[Agent, Callable, Any]]): Optional
planning agent.
director_feedback_on (bool): Whether director feedback is enabled.
"""
def __init__(
@ -121,22 +147,33 @@ class HierarchicalSwarm(BaseSwarm):
**kwargs,
):
"""
Initializes the HierarchicalSwarm with the given parameters.
:param name: The name of the swarm.
:param description: A description of the swarm.
:param director: The director agent that orchestrates tasks.
:param agents: A list of agents within the swarm.
:param max_loops: The maximum number of feedback loops between the director and agents.
:param output_type: The format in which to return the output (dict, str, or list).
:param verbose: Enable detailed logging with loguru.
Initialize a new HierarchicalSwarm instance.
Args:
name (str): The name identifier for this swarm instance.
description (str): A description of the swarm's purpose.
director (Optional[Union[Agent, Callable, Any]]): The director agent.
If None, a default director will be created.
agents (List[Union[Agent, Callable, Any]]): List of worker agents.
Must not be empty.
max_loops (int): Maximum number of feedback loops (must be > 0).
output_type (OutputType): Format for the final output.
feedback_director_model_name (str): Model name for feedback director.
director_name (str): Name identifier for the director agent.
director_model_name (str): Model name for the main director agent.
verbose (bool): Whether to enable detailed logging.
add_collaboration_prompt (bool): Whether to add collaboration prompts.
planning_director_agent (Optional[Union[Agent, Callable, Any]]):
Optional planning agent for enhanced planning capabilities.
director_feedback_on (bool): Whether director feedback is enabled.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Raises:
ValueError: If no agents are provided or max_loops is invalid.
"""
super().__init__(
name=name,
description=description,
agents=agents,
)
self.name = name
self.description = description
self.director = director
self.agents = agents
self.max_loops = max_loops
@ -155,33 +192,47 @@ class HierarchicalSwarm(BaseSwarm):
def init_swarm(self):
"""
Initializes the swarm.
Initialize the swarm with proper configuration and validation.
This method performs the following initialization steps:
1. Sets up logging if verbose mode is enabled
2. Creates a conversation instance for history tracking
3. Performs reliability checks on the configuration
4. Adds agent context to the director
Raises:
ValueError: If the swarm configuration is invalid.
"""
# Initialize logger only if verbose is enabled
if self.verbose:
logger.info(
f"🚀 Initializing HierarchicalSwarm: {self.name}"
)
logger.info(
f"📊 Configuration - Max loops: {self.max_loops}"
)
self.conversation = Conversation(time_enabled=False)
# Reliability checks
self.reliability_checks()
self.director = self.setup_director()
self.add_context_to_director()
if self.verbose:
logger.success(
f"✅ HierarchicalSwarm initialized successfully: Name {self.name}"
f"✅ HierarchicalSwarm: {self.name} initialized successfully."
)
def add_context_to_director(self):
"""Add agent context to the director's conversation."""
"""
Add agent context and collaboration information to the director's conversation.
This method ensures that the director has complete information about all
available agents, their capabilities, and how they can collaborate. This
context is essential for the director to make informed decisions about
task distribution.
Raises:
Exception: If adding context fails due to agent configuration issues.
"""
try:
if self.verbose:
logger.info("📝 Adding agent context to director")
@ -207,7 +258,18 @@ class HierarchicalSwarm(BaseSwarm):
)
def setup_director(self):
"""Set up the director agent with proper configuration."""
"""
Set up the director agent with proper configuration and tools.
Creates a new director agent with the SwarmSpec schema for structured
output, enabling it to create plans and distribute orders effectively.
Returns:
Agent: A configured director agent ready to coordinate the swarm.
Raises:
Exception: If director setup fails due to configuration issues.
"""
try:
if self.verbose:
logger.info("🎯 Setting up director agent")
@ -217,17 +279,6 @@ class HierarchicalSwarm(BaseSwarm):
if self.verbose:
logger.debug(f"📋 Director schema: {schema}")
# if self.director is not None:
# # if litellm_check_for_tools(self.director.model_name) is True:
# self.director.add_tool_schema([schema])
# if self.verbose:
# logger.success(
# "✅ Director agent setup completed successfully"
# )
# return self.director
# else:
return Agent(
agent_name=self.director_name,
agent_description="A director agent that can create a plan and distribute orders to agents",
@ -244,13 +295,20 @@ class HierarchicalSwarm(BaseSwarm):
def reliability_checks(self):
"""
Checks if there are any agents and a director set for the swarm.
Raises ValueError if either condition is not met.
Perform validation checks to ensure the swarm is properly configured.
This method validates:
1. That at least one agent is provided
2. That max_loops is greater than 0
3. That a director is available (creates default if needed)
Raises:
ValueError: If the swarm configuration is invalid.
"""
try:
if self.verbose:
logger.info(
f"🔍 Running reliability checks for swarm: {self.name}"
f"Hiearchical Swarm: {self.name} Reliability checks in progress..."
)
if not self.agents or len(self.agents) == 0:
@ -263,17 +321,12 @@ class HierarchicalSwarm(BaseSwarm):
"Max loops must be greater than 0. Please set a valid number of loops."
)
if not self.director:
raise ValueError(
"Director not set for the swarm. A director agent is required to coordinate and orchestrate tasks among the agents."
)
if self.director is None:
self.director = self.setup_director()
if self.verbose:
logger.success(
f"✅ Reliability checks passed for swarm: {self.name}"
)
logger.info(
f"📊 Swarm stats - Agents: {len(self.agents)}, Max loops: {self.max_loops}"
f"Hiearchical Swarm: {self.name} Reliability checks passed..."
)
except Exception as e:
@ -286,11 +339,22 @@ class HierarchicalSwarm(BaseSwarm):
img: str = None,
) -> SwarmSpec:
"""
Runs a task through the director agent with the current conversation context.
Execute the director agent with the given task and conversation context.
This method runs the director agent to create a plan and distribute orders
based on the current task and conversation history. If a planning director
agent is configured, it will first create a detailed plan before the main
director processes the task.
Args:
task (str): The task to be executed by the director.
img (str, optional): Optional image input for the task.
Returns:
SwarmSpec: The director's output containing the plan and orders.
:param task: The task to be executed by the director.
:param img: Optional image to be used with the task.
:return: The SwarmSpec containing the director's orders.
Raises:
Exception: If director execution fails.
"""
try:
if self.verbose:
@ -330,7 +394,25 @@ class HierarchicalSwarm(BaseSwarm):
def step(self, task: str, img: str = None, *args, **kwargs):
"""
Runs a single step of the hierarchical swarm.
Execute a single step of the hierarchical swarm workflow.
This method performs one complete iteration of the swarm's workflow:
1. Run the director to create a plan and orders
2. Parse the director's output to extract plan and orders
3. Execute all orders by calling the appropriate agents
4. Optionally generate director feedback on the results
Args:
task (str): The task to be processed in this step.
img (str, optional): Optional image input for the task.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
Any: The results from this step, either agent outputs or director feedback.
Raises:
Exception: If step execution fails.
"""
try:
if self.verbose:
@ -370,13 +452,27 @@ class HierarchicalSwarm(BaseSwarm):
def run(self, task: str, img: str = None, *args, **kwargs):
"""
Executes the hierarchical swarm for a specified number of feedback loops.
Execute the hierarchical swarm for the specified number of feedback loops.
This method orchestrates the complete swarm execution, performing multiple
iterations based on the max_loops configuration. Each iteration builds upon
the previous results, allowing for iterative refinement and improvement.
:param task: The initial task to be processed by the swarm.
:param img: Optional image input for the agents.
:param args: Additional positional arguments.
:param kwargs: Additional keyword arguments.
:return: The formatted conversation history as output.
The method maintains conversation history throughout all loops and provides
context from previous iterations to subsequent ones.
Args:
task (str): The initial task to be processed by the swarm.
img (str, optional): Optional image input for the agents.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
Any: The formatted conversation history as output, formatted according
to the output_type configuration.
Raises:
Exception: If swarm execution fails.
"""
try:
current_loop = 0
@ -448,7 +544,23 @@ class HierarchicalSwarm(BaseSwarm):
logger.error(error_msg)
def feedback_director(self, outputs: list):
"""Provide feedback from the director based on agent outputs."""
"""
Generate feedback from the director based on agent outputs.
This method creates a feedback director agent that analyzes the results
from worker agents and provides specific, actionable feedback for improvement.
The feedback is added to the conversation history and can be used in
subsequent iterations.
Args:
outputs (list): List of outputs from worker agents that need feedback.
Returns:
str: The director's feedback on the agent outputs.
Raises:
Exception: If feedback generation fails.
"""
try:
if self.verbose:
logger.info("📝 Generating director feedback")
@ -491,7 +603,24 @@ class HierarchicalSwarm(BaseSwarm):
self, agent_name: str, task: str, *args, **kwargs
):
"""
Calls a single agent with the given task.
Call a single agent by name to execute a specific task.
This method locates an agent by name and executes the given task with
the current conversation context. The agent's output is added to the
conversation history for future reference.
Args:
agent_name (str): The name of the agent to call.
task (str): The task to be executed by the agent.
*args: Additional positional arguments for the agent.
**kwargs: Additional keyword arguments for the agent.
Returns:
Any: The output from the agent's execution.
Raises:
ValueError: If the specified agent is not found in the swarm.
Exception: If agent execution fails.
"""
try:
if self.verbose:
@ -537,7 +666,22 @@ class HierarchicalSwarm(BaseSwarm):
def parse_orders(self, output):
"""
Parses the orders from the director's output.
Parse the director's output to extract plan and orders.
This method handles various output formats from the director agent and
extracts the plan and hierarchical orders. It supports both direct
dictionary formats and function call formats with JSON arguments.
Args:
output: The raw output from the director agent.
Returns:
tuple: A tuple containing (plan, orders) where plan is a string
and orders is a list of HierarchicalOrder objects.
Raises:
ValueError: If the output format is unexpected or cannot be parsed.
Exception: If parsing fails due to other errors.
"""
try:
if self.verbose:
@ -666,7 +810,20 @@ class HierarchicalSwarm(BaseSwarm):
def execute_orders(self, orders: list):
"""
Executes the orders from the director's output.
Execute all orders from the director's output.
This method iterates through all hierarchical orders and calls the
appropriate agents to execute their assigned tasks. Each agent's
output is collected and returned as a list.
Args:
orders (list): List of HierarchicalOrder objects to execute.
Returns:
list: List of outputs from all executed orders.
Raises:
Exception: If order execution fails.
"""
try:
if self.verbose:
@ -699,7 +856,23 @@ class HierarchicalSwarm(BaseSwarm):
self, tasks: List[str], img: str = None, *args, **kwargs
):
"""
Executes the hierarchical swarm for a list of tasks.
Execute the hierarchical swarm for multiple tasks in sequence.
This method processes a list of tasks sequentially, running the complete
swarm workflow for each task. Each task is processed independently with
its own conversation context and results.
Args:
tasks (List[str]): List of tasks to be processed by the swarm.
img (str, optional): Optional image input for the tasks.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
list: List of results for each processed task.
Raises:
Exception: If batched execution fails.
"""
try:
if self.verbose:

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save