cron job examples, nw gpt-oss examples, and more

pull/1011/head
Kye Gomez 4 weeks ago
parent c0c9b7201a
commit 6dc01c3c72

@ -122,6 +122,363 @@ cron_job = CronJob(
cron_job.run("Perform analysis")
```
### Cron Jobs With Multi-Agent Structures
You can also run Cron Jobs with multi-agent structures like `SequentialWorkflow`, `ConcurrentWorkflow`, `HierarchicalSwarm`, and other multi-agent architectures.
- Just pass the initialized multi-agent structure as the `agent` parameter: `CronJob(agent=swarm)`
- Pass your task string to the `.run(task: str)` method
```python
"""
Cryptocurrency Concurrent Multi-Agent Cron Job Example
This example demonstrates how to use ConcurrentWorkflow with CronJob to create
a powerful cryptocurrency tracking system. Each specialized agent analyzes a
specific cryptocurrency concurrently on each scheduled run.
Features:
- ConcurrentWorkflow for parallel agent execution
- CronJob scheduling for automated runs on a configurable interval
- Each agent specializes in analyzing one specific cryptocurrency
- Real-time data fetching from CoinGecko API
- Concurrent analysis of multiple cryptocurrencies
- Structured output with professional formatting
Architecture:
CronJob -> ConcurrentWorkflow -> [Bitcoin Agent, Ethereum Agent, Solana Agent, etc.] -> Parallel Analysis
"""
from typing import List
from loguru import logger
from swarms import Agent, CronJob, ConcurrentWorkflow
from swarms_tools import coin_gecko_coin_api
def create_crypto_specific_agents() -> List[Agent]:
"""
Creates agents that each specialize in analyzing a specific cryptocurrency.

Builds six specialists (BTC, ETH, SOL, ADA, BNB, XRP). Every agent shares
the same model and execution settings (single loop, dynamic temperature,
streaming off, CoinGecko price API attached as a tool); only the name,
description, and system prompt differ.

Returns:
List[Agent]: List of cryptocurrency-specific Agent instances
"""
# Each Agent below differs only in its identity and prompt; config is shared.
# Bitcoin Specialist Agent
bitcoin_agent = Agent(
agent_name="Bitcoin-Analyst",
agent_description="Expert analyst specializing exclusively in Bitcoin (BTC) analysis and market dynamics",
system_prompt="""You are a Bitcoin specialist and expert analyst. Your expertise includes:
BITCOIN SPECIALIZATION:
- Bitcoin's unique position as digital gold
- Bitcoin halving cycles and their market impact
- Bitcoin mining economics and hash rate analysis
- Lightning Network and Layer 2 developments
- Bitcoin adoption by institutions and countries
- Bitcoin's correlation with traditional markets
- Bitcoin technical analysis and on-chain metrics
- Bitcoin's role as a store of value and hedge against inflation
ANALYSIS FOCUS:
- Analyze ONLY Bitcoin data from the provided dataset
- Focus on Bitcoin-specific metrics and trends
- Consider Bitcoin's unique market dynamics
- Evaluate Bitcoin's dominance and market leadership
- Assess institutional adoption trends
- Monitor on-chain activity and network health
DELIVERABLES:
- Bitcoin-specific analysis and insights
- Price action assessment and predictions
- Market dominance analysis
- Institutional adoption impact
- Technical and fundamental outlook
- Risk factors specific to Bitcoin
Extract Bitcoin data from the provided dataset and provide comprehensive Bitcoin-focused analysis.""",
model_name="groq/moonshotai/kimi-k2-instruct",
max_loops=1,
dynamic_temperature_enabled=True,
streaming_on=False,
tools=[coin_gecko_coin_api],
)
# Ethereum Specialist Agent
ethereum_agent = Agent(
agent_name="Ethereum-Analyst",
agent_description="Expert analyst specializing exclusively in Ethereum (ETH) analysis and ecosystem development",
system_prompt="""You are an Ethereum specialist and expert analyst. Your expertise includes:
ETHEREUM SPECIALIZATION:
- Ethereum's smart contract platform and DeFi ecosystem
- Ethereum 2.0 transition and proof-of-stake mechanics
- Gas fees, network usage, and scalability solutions
- Layer 2 solutions (Arbitrum, Optimism, Polygon)
- DeFi protocols and TVL (Total Value Locked) analysis
- NFT markets and Ethereum's role in digital assets
- Developer activity and ecosystem growth
- EIP proposals and network upgrades
ANALYSIS FOCUS:
- Analyze ONLY Ethereum data from the provided dataset
- Focus on Ethereum's platform utility and network effects
- Evaluate DeFi ecosystem health and growth
- Assess Layer 2 adoption and scalability solutions
- Monitor network usage and gas fee trends
- Consider Ethereum's competitive position vs other smart contract platforms
DELIVERABLES:
- Ethereum-specific analysis and insights
- Platform utility and adoption metrics
- DeFi ecosystem impact assessment
- Network health and scalability evaluation
- Competitive positioning analysis
- Technical and fundamental outlook for ETH
Extract Ethereum data from the provided dataset and provide comprehensive Ethereum-focused analysis.""",
model_name="groq/moonshotai/kimi-k2-instruct",
max_loops=1,
dynamic_temperature_enabled=True,
streaming_on=False,
tools=[coin_gecko_coin_api],
)
# Solana Specialist Agent
solana_agent = Agent(
agent_name="Solana-Analyst",
agent_description="Expert analyst specializing exclusively in Solana (SOL) analysis and ecosystem development",
system_prompt="""You are a Solana specialist and expert analyst. Your expertise includes:
SOLANA SPECIALIZATION:
- Solana's high-performance blockchain architecture
- Proof-of-History consensus mechanism
- Solana's DeFi ecosystem and DEX platforms (Serum, Raydium)
- NFT marketplaces and creator economy on Solana
- Network outages and reliability concerns
- Developer ecosystem and Rust programming adoption
- Validator economics and network decentralization
- Cross-chain bridges and interoperability
ANALYSIS FOCUS:
- Analyze ONLY Solana data from the provided dataset
- Focus on Solana's performance and scalability advantages
- Evaluate network stability and uptime improvements
- Assess ecosystem growth and developer adoption
- Monitor DeFi and NFT activity on Solana
- Consider Solana's competitive position vs Ethereum
DELIVERABLES:
- Solana-specific analysis and insights
- Network performance and reliability assessment
- Ecosystem growth and adoption metrics
- DeFi and NFT market analysis
- Competitive advantages and challenges
- Technical and fundamental outlook for SOL
Extract Solana data from the provided dataset and provide comprehensive Solana-focused analysis.""",
model_name="groq/moonshotai/kimi-k2-instruct",
max_loops=1,
dynamic_temperature_enabled=True,
streaming_on=False,
tools=[coin_gecko_coin_api],
)
# Cardano Specialist Agent
cardano_agent = Agent(
agent_name="Cardano-Analyst",
agent_description="Expert analyst specializing exclusively in Cardano (ADA) analysis and research-driven development",
system_prompt="""You are a Cardano specialist and expert analyst. Your expertise includes:
CARDANO SPECIALIZATION:
- Cardano's research-driven development approach
- Ouroboros proof-of-stake consensus protocol
- Smart contract capabilities via Plutus and Marlowe
- Cardano's three-layer architecture (settlement, computation, control)
- Academic partnerships and peer-reviewed research
- Cardano ecosystem projects and DApp development
- Native tokens and Cardano's UTXO model
- Sustainability and treasury funding mechanisms
ANALYSIS FOCUS:
- Analyze ONLY Cardano data from the provided dataset
- Focus on Cardano's methodical development approach
- Evaluate smart contract adoption and ecosystem growth
- Assess academic partnerships and research contributions
- Monitor native token ecosystem development
- Consider Cardano's long-term roadmap and milestones
DELIVERABLES:
- Cardano-specific analysis and insights
- Development progress and milestone achievements
- Smart contract ecosystem evaluation
- Academic research impact assessment
- Native token and DApp adoption metrics
- Technical and fundamental outlook for ADA
Extract Cardano data from the provided dataset and provide comprehensive Cardano-focused analysis.""",
model_name="groq/moonshotai/kimi-k2-instruct",
max_loops=1,
dynamic_temperature_enabled=True,
streaming_on=False,
tools=[coin_gecko_coin_api],
)
# Binance Coin Specialist Agent
bnb_agent = Agent(
agent_name="BNB-Analyst",
agent_description="Expert analyst specializing exclusively in BNB analysis and Binance ecosystem dynamics",
system_prompt="""You are a BNB specialist and expert analyst. Your expertise includes:
BNB SPECIALIZATION:
- BNB's utility within the Binance ecosystem
- Binance Smart Chain (BSC) development and adoption
- BNB token burns and deflationary mechanics
- Binance exchange volume and market leadership
- BSC DeFi ecosystem and yield farming
- Cross-chain bridges and multi-chain strategies
- Regulatory challenges facing Binance globally
- BNB's role in transaction fee discounts and platform benefits
ANALYSIS FOCUS:
- Analyze ONLY BNB data from the provided dataset
- Focus on BNB's utility value and exchange benefits
- Evaluate BSC ecosystem growth and competition with Ethereum
- Assess token burn impact on supply and price
- Monitor Binance platform developments and regulations
- Consider BNB's centralized vs decentralized aspects
DELIVERABLES:
- BNB-specific analysis and insights
- Utility value and ecosystem benefits assessment
- BSC adoption and DeFi growth evaluation
- Token economics and burn mechanism impact
- Regulatory risk and compliance analysis
- Technical and fundamental outlook for BNB
Extract BNB data from the provided dataset and provide comprehensive BNB-focused analysis.""",
model_name="groq/moonshotai/kimi-k2-instruct",
max_loops=1,
dynamic_temperature_enabled=True,
streaming_on=False,
tools=[coin_gecko_coin_api],
)
# XRP Specialist Agent
xrp_agent = Agent(
agent_name="XRP-Analyst",
agent_description="Expert analyst specializing exclusively in XRP analysis and cross-border payment solutions",
system_prompt="""You are an XRP specialist and expert analyst. Your expertise includes:
XRP SPECIALIZATION:
- XRP's role in cross-border payments and remittances
- RippleNet adoption by financial institutions
- Central Bank Digital Currency (CBDC) partnerships
- Regulatory landscape and SEC lawsuit implications
- XRP Ledger's consensus mechanism and energy efficiency
- On-Demand Liquidity (ODL) usage and growth
- Competition with SWIFT and traditional payment rails
- Ripple's partnerships with banks and payment providers
ANALYSIS FOCUS:
- Analyze ONLY XRP data from the provided dataset
- Focus on XRP's utility in payments and remittances
- Evaluate RippleNet adoption and institutional partnerships
- Assess regulatory developments and legal clarity
- Monitor ODL usage and transaction volumes
- Consider XRP's competitive position in payments
DELIVERABLES:
- XRP-specific analysis and insights
- Payment utility and adoption assessment
- Regulatory landscape and legal developments
- Institutional partnership impact evaluation
- Cross-border payment market analysis
- Technical and fundamental outlook for XRP
Extract XRP data from the provided dataset and provide comprehensive XRP-focused analysis.""",
model_name="groq/moonshotai/kimi-k2-instruct",
max_loops=1,
dynamic_temperature_enabled=True,
streaming_on=False,
tools=[coin_gecko_coin_api],
)
# Order is cosmetic: ConcurrentWorkflow runs all of them in parallel.
return [
bitcoin_agent,
ethereum_agent,
solana_agent,
cardano_agent,
bnb_agent,
xrp_agent,
]
def create_crypto_workflow() -> ConcurrentWorkflow:
    """Build the ConcurrentWorkflow that fans a task out to all crypto specialists.

    Returns:
        ConcurrentWorkflow: Configured workflow for crypto analysis
    """
    # Every specialist agent receives the same task and runs in parallel.
    return ConcurrentWorkflow(
        name="Crypto-Specific-Analysis-Workflow",
        description="Concurrent execution of cryptocurrency-specific analysis agents",
        agents=create_crypto_specific_agents(),
        max_loops=1,
    )
def create_crypto_cron_job() -> CronJob:
"""
Creates a CronJob that repeatedly runs the cryptocurrency-specific ConcurrentWorkflow.

Note: the interval below is 5 seconds (handy for demos); use a larger
value such as "60seconds" for realistic periodic analysis.

Returns:
CronJob: Configured cron job for automated crypto analysis
"""
# Create the concurrent workflow
workflow = create_crypto_workflow()
# Create the cron job
cron_job = CronJob(
agent=workflow, # Use the workflow as the agent
interval="5seconds", # Run every 5 seconds
)
return cron_job
def main():
"""
Main function to run the cryptocurrency-specific concurrent analysis cron job.
"""
cron_job = create_crypto_cron_job()
# Shared task string; each specialist interprets "your assigned cryptocurrency"
# via its own system prompt.
prompt = """
Conduct a comprehensive analysis of your assigned cryptocurrency.
"""
# Start the cron job
logger.info("🔄 Starting automated analysis loop...")
logger.info("⏰ Press Ctrl+C to stop the cron job")
# NOTE(review): presumably CronJob.run blocks while the schedule executes —
# confirm against the CronJob implementation.
output = cron_job.run(task=prompt)
print(output)
if __name__ == "__main__":
main()
```
## Conclusion
The CronJob class provides a powerful way to schedule and automate tasks using Swarms Agents or custom functions. Key benefits include:

@ -0,0 +1,349 @@
"""
Cryptocurrency Concurrent Multi-Agent Cron Job Example
This example demonstrates how to use ConcurrentWorkflow with CronJob to create
a powerful cryptocurrency tracking system. Each specialized agent analyzes a
specific cryptocurrency concurrently on each scheduled run.
Features:
- ConcurrentWorkflow for parallel agent execution
- CronJob scheduling for automated runs on a configurable interval
- Each agent specializes in analyzing one specific cryptocurrency
- Real-time data fetching from CoinGecko API
- Concurrent analysis of multiple cryptocurrencies
- Structured output with professional formatting
Architecture:
CronJob -> ConcurrentWorkflow -> [Bitcoin Agent, Ethereum Agent, Solana Agent, etc.] -> Parallel Analysis
"""
from typing import List
from loguru import logger
from swarms import Agent, CronJob, ConcurrentWorkflow
from swarms_tools import coin_gecko_coin_api
def create_crypto_specific_agents() -> List[Agent]:
"""
Creates agents that each specialize in analyzing a specific cryptocurrency.

Builds six specialists (BTC, ETH, SOL, ADA, BNB, XRP). Every agent shares
the same model and execution settings (single loop, dynamic temperature,
streaming off, CoinGecko price API attached as a tool); only the name,
description, and system prompt differ.

Returns:
List[Agent]: List of cryptocurrency-specific Agent instances
"""
# Each Agent below differs only in its identity and prompt; config is shared.
# Bitcoin Specialist Agent
bitcoin_agent = Agent(
agent_name="Bitcoin-Analyst",
agent_description="Expert analyst specializing exclusively in Bitcoin (BTC) analysis and market dynamics",
system_prompt="""You are a Bitcoin specialist and expert analyst. Your expertise includes:
BITCOIN SPECIALIZATION:
- Bitcoin's unique position as digital gold
- Bitcoin halving cycles and their market impact
- Bitcoin mining economics and hash rate analysis
- Lightning Network and Layer 2 developments
- Bitcoin adoption by institutions and countries
- Bitcoin's correlation with traditional markets
- Bitcoin technical analysis and on-chain metrics
- Bitcoin's role as a store of value and hedge against inflation
ANALYSIS FOCUS:
- Analyze ONLY Bitcoin data from the provided dataset
- Focus on Bitcoin-specific metrics and trends
- Consider Bitcoin's unique market dynamics
- Evaluate Bitcoin's dominance and market leadership
- Assess institutional adoption trends
- Monitor on-chain activity and network health
DELIVERABLES:
- Bitcoin-specific analysis and insights
- Price action assessment and predictions
- Market dominance analysis
- Institutional adoption impact
- Technical and fundamental outlook
- Risk factors specific to Bitcoin
Extract Bitcoin data from the provided dataset and provide comprehensive Bitcoin-focused analysis.""",
model_name="groq/moonshotai/kimi-k2-instruct",
max_loops=1,
dynamic_temperature_enabled=True,
streaming_on=False,
tools=[coin_gecko_coin_api],
)
# Ethereum Specialist Agent
ethereum_agent = Agent(
agent_name="Ethereum-Analyst",
agent_description="Expert analyst specializing exclusively in Ethereum (ETH) analysis and ecosystem development",
system_prompt="""You are an Ethereum specialist and expert analyst. Your expertise includes:
ETHEREUM SPECIALIZATION:
- Ethereum's smart contract platform and DeFi ecosystem
- Ethereum 2.0 transition and proof-of-stake mechanics
- Gas fees, network usage, and scalability solutions
- Layer 2 solutions (Arbitrum, Optimism, Polygon)
- DeFi protocols and TVL (Total Value Locked) analysis
- NFT markets and Ethereum's role in digital assets
- Developer activity and ecosystem growth
- EIP proposals and network upgrades
ANALYSIS FOCUS:
- Analyze ONLY Ethereum data from the provided dataset
- Focus on Ethereum's platform utility and network effects
- Evaluate DeFi ecosystem health and growth
- Assess Layer 2 adoption and scalability solutions
- Monitor network usage and gas fee trends
- Consider Ethereum's competitive position vs other smart contract platforms
DELIVERABLES:
- Ethereum-specific analysis and insights
- Platform utility and adoption metrics
- DeFi ecosystem impact assessment
- Network health and scalability evaluation
- Competitive positioning analysis
- Technical and fundamental outlook for ETH
Extract Ethereum data from the provided dataset and provide comprehensive Ethereum-focused analysis.""",
model_name="groq/moonshotai/kimi-k2-instruct",
max_loops=1,
dynamic_temperature_enabled=True,
streaming_on=False,
tools=[coin_gecko_coin_api],
)
# Solana Specialist Agent
solana_agent = Agent(
agent_name="Solana-Analyst",
agent_description="Expert analyst specializing exclusively in Solana (SOL) analysis and ecosystem development",
system_prompt="""You are a Solana specialist and expert analyst. Your expertise includes:
SOLANA SPECIALIZATION:
- Solana's high-performance blockchain architecture
- Proof-of-History consensus mechanism
- Solana's DeFi ecosystem and DEX platforms (Serum, Raydium)
- NFT marketplaces and creator economy on Solana
- Network outages and reliability concerns
- Developer ecosystem and Rust programming adoption
- Validator economics and network decentralization
- Cross-chain bridges and interoperability
ANALYSIS FOCUS:
- Analyze ONLY Solana data from the provided dataset
- Focus on Solana's performance and scalability advantages
- Evaluate network stability and uptime improvements
- Assess ecosystem growth and developer adoption
- Monitor DeFi and NFT activity on Solana
- Consider Solana's competitive position vs Ethereum
DELIVERABLES:
- Solana-specific analysis and insights
- Network performance and reliability assessment
- Ecosystem growth and adoption metrics
- DeFi and NFT market analysis
- Competitive advantages and challenges
- Technical and fundamental outlook for SOL
Extract Solana data from the provided dataset and provide comprehensive Solana-focused analysis.""",
model_name="groq/moonshotai/kimi-k2-instruct",
max_loops=1,
dynamic_temperature_enabled=True,
streaming_on=False,
tools=[coin_gecko_coin_api],
)
# Cardano Specialist Agent
cardano_agent = Agent(
agent_name="Cardano-Analyst",
agent_description="Expert analyst specializing exclusively in Cardano (ADA) analysis and research-driven development",
system_prompt="""You are a Cardano specialist and expert analyst. Your expertise includes:
CARDANO SPECIALIZATION:
- Cardano's research-driven development approach
- Ouroboros proof-of-stake consensus protocol
- Smart contract capabilities via Plutus and Marlowe
- Cardano's three-layer architecture (settlement, computation, control)
- Academic partnerships and peer-reviewed research
- Cardano ecosystem projects and DApp development
- Native tokens and Cardano's UTXO model
- Sustainability and treasury funding mechanisms
ANALYSIS FOCUS:
- Analyze ONLY Cardano data from the provided dataset
- Focus on Cardano's methodical development approach
- Evaluate smart contract adoption and ecosystem growth
- Assess academic partnerships and research contributions
- Monitor native token ecosystem development
- Consider Cardano's long-term roadmap and milestones
DELIVERABLES:
- Cardano-specific analysis and insights
- Development progress and milestone achievements
- Smart contract ecosystem evaluation
- Academic research impact assessment
- Native token and DApp adoption metrics
- Technical and fundamental outlook for ADA
Extract Cardano data from the provided dataset and provide comprehensive Cardano-focused analysis.""",
model_name="groq/moonshotai/kimi-k2-instruct",
max_loops=1,
dynamic_temperature_enabled=True,
streaming_on=False,
tools=[coin_gecko_coin_api],
)
# Binance Coin Specialist Agent
bnb_agent = Agent(
agent_name="BNB-Analyst",
agent_description="Expert analyst specializing exclusively in BNB analysis and Binance ecosystem dynamics",
system_prompt="""You are a BNB specialist and expert analyst. Your expertise includes:
BNB SPECIALIZATION:
- BNB's utility within the Binance ecosystem
- Binance Smart Chain (BSC) development and adoption
- BNB token burns and deflationary mechanics
- Binance exchange volume and market leadership
- BSC DeFi ecosystem and yield farming
- Cross-chain bridges and multi-chain strategies
- Regulatory challenges facing Binance globally
- BNB's role in transaction fee discounts and platform benefits
ANALYSIS FOCUS:
- Analyze ONLY BNB data from the provided dataset
- Focus on BNB's utility value and exchange benefits
- Evaluate BSC ecosystem growth and competition with Ethereum
- Assess token burn impact on supply and price
- Monitor Binance platform developments and regulations
- Consider BNB's centralized vs decentralized aspects
DELIVERABLES:
- BNB-specific analysis and insights
- Utility value and ecosystem benefits assessment
- BSC adoption and DeFi growth evaluation
- Token economics and burn mechanism impact
- Regulatory risk and compliance analysis
- Technical and fundamental outlook for BNB
Extract BNB data from the provided dataset and provide comprehensive BNB-focused analysis.""",
model_name="groq/moonshotai/kimi-k2-instruct",
max_loops=1,
dynamic_temperature_enabled=True,
streaming_on=False,
tools=[coin_gecko_coin_api],
)
# XRP Specialist Agent
xrp_agent = Agent(
agent_name="XRP-Analyst",
agent_description="Expert analyst specializing exclusively in XRP analysis and cross-border payment solutions",
system_prompt="""You are an XRP specialist and expert analyst. Your expertise includes:
XRP SPECIALIZATION:
- XRP's role in cross-border payments and remittances
- RippleNet adoption by financial institutions
- Central Bank Digital Currency (CBDC) partnerships
- Regulatory landscape and SEC lawsuit implications
- XRP Ledger's consensus mechanism and energy efficiency
- On-Demand Liquidity (ODL) usage and growth
- Competition with SWIFT and traditional payment rails
- Ripple's partnerships with banks and payment providers
ANALYSIS FOCUS:
- Analyze ONLY XRP data from the provided dataset
- Focus on XRP's utility in payments and remittances
- Evaluate RippleNet adoption and institutional partnerships
- Assess regulatory developments and legal clarity
- Monitor ODL usage and transaction volumes
- Consider XRP's competitive position in payments
DELIVERABLES:
- XRP-specific analysis and insights
- Payment utility and adoption assessment
- Regulatory landscape and legal developments
- Institutional partnership impact evaluation
- Cross-border payment market analysis
- Technical and fundamental outlook for XRP
Extract XRP data from the provided dataset and provide comprehensive XRP-focused analysis.""",
model_name="groq/moonshotai/kimi-k2-instruct",
max_loops=1,
dynamic_temperature_enabled=True,
streaming_on=False,
tools=[coin_gecko_coin_api],
)
# Order is cosmetic: ConcurrentWorkflow runs all of them in parallel.
return [
bitcoin_agent,
ethereum_agent,
solana_agent,
cardano_agent,
bnb_agent,
xrp_agent,
]
def create_crypto_workflow() -> ConcurrentWorkflow:
    """Assemble the concurrent workflow of cryptocurrency specialist agents.

    Returns:
        ConcurrentWorkflow: Configured workflow for crypto analysis
    """
    specialist_agents = create_crypto_specific_agents()
    # One shared task will be dispatched to every specialist in parallel.
    crypto_workflow = ConcurrentWorkflow(
        name="Crypto-Specific-Analysis-Workflow",
        description="Concurrent execution of cryptocurrency-specific analysis agents",
        agents=specialist_agents,
        max_loops=1,
    )
    return crypto_workflow
def create_crypto_cron_job() -> CronJob:
"""
Creates a CronJob that repeatedly runs the cryptocurrency-specific ConcurrentWorkflow.

Note: the interval below is 5 seconds (handy for demos); use a larger
value such as "60seconds" for realistic periodic analysis.

Returns:
CronJob: Configured cron job for automated crypto analysis
"""
# Create the concurrent workflow
workflow = create_crypto_workflow()
# Create the cron job
cron_job = CronJob(
agent=workflow, # Use the workflow as the agent
interval="5seconds", # Run every 5 seconds
)
return cron_job
def main():
    """Entry point: build the crypto cron job and run the analysis loop."""
    analysis_job = create_crypto_cron_job()

    # Institutional-analyst briefing shared by every specialist agent.
    briefing_lines = [
        "You are a world-class institutional crypto analyst at a top-tier asset management firm (e.g., BlackRock).",
        "Conduct a thorough, data-driven, and professional analysis of your assigned cryptocurrency, including:",
        "- Current price, market cap, and recent performance trends",
        "- Key technical and fundamental indicators",
        "- Major news, regulatory, or macroeconomic events impacting the asset",
        "- On-chain activity and notable whale or institutional movements",
        "- Short-term and long-term outlook with clear, actionable insights",
        "Present your findings in a concise, well-structured report suitable for executive decision-makers.",
    ]
    analysis_task = "\n".join(briefing_lines)

    # Start the cron job
    logger.info("🔄 Starting automated analysis loop...")
    logger.info("⏰ Press Ctrl+C to stop the cron job")
    result = analysis_job.run(task=analysis_task)
    print(result)


if __name__ == "__main__":
    main()

@ -0,0 +1,157 @@
"""
Simple Cryptocurrency Concurrent CronJob Example
This is a simplified version showcasing the core concept of combining:
- CronJob (for scheduling)
- ConcurrentWorkflow (for parallel execution)
- Each agent analyzes a specific cryptocurrency
Perfect for understanding the basic pattern before diving into the full example.
"""
import json
import requests
from datetime import datetime
from loguru import logger
from swarms import Agent, CronJob, ConcurrentWorkflow
def get_specific_crypto_data(coin_ids, vs_currency="usd"):
    """Fetch current market data for specific coins from the CoinGecko API.

    Args:
        coin_ids: Iterable of CoinGecko coin identifiers
            (e.g. ["bitcoin", "ethereum", "solana"]).
        vs_currency: Quote currency for prices. Defaults to "usd", so
            existing callers are unaffected (backward-compatible addition).

    Returns:
        str: Pretty-printed JSON containing a "timestamp" and the per-coin
        "coins" payload, or a short "Error: ..." string when the request
        fails. The error-string fallback is deliberate best-effort: callers
        embed this text directly into an agent task, so raising here would
        kill the scheduled job.
    """
    try:
        url = "https://api.coingecko.com/api/v3/simple/price"
        params = {
            "ids": ",".join(coin_ids),
            "vs_currencies": vs_currency,
            "include_24hr_change": True,
            "include_market_cap": True,
            "include_24hr_vol": True,
        }
        # Timeout keeps a slow/unresponsive API from stalling the cron tick.
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        result = {
            "timestamp": datetime.now().isoformat(),
            "coins": response.json(),
        }
        return json.dumps(result, indent=2)
    except Exception as e:
        # Broad catch is intentional (network, HTTP, JSON errors all become
        # a logged message plus an inline error string).
        logger.error(f"Error fetching crypto data: {e}")
        return f"Error: {e}"
def create_crypto_specific_agents():
"""Create agents that each specialize in one cryptocurrency.

Builds three specialists (BTC, ETH, SOL) sharing the same model and
single-loop config; only the name and system prompt differ.

Returns:
list[Agent]: The Bitcoin, Ethereum, and Solana analyst agents.
"""
# Bitcoin Specialist Agent
bitcoin_agent = Agent(
agent_name="Bitcoin-Analyst",
system_prompt="""You are a Bitcoin specialist. Analyze ONLY Bitcoin (BTC) data from the provided dataset.
Focus on:
- Bitcoin price movements and trends
- Market dominance and institutional adoption
- Bitcoin-specific market dynamics
- Store of value characteristics
Ignore all other cryptocurrencies in your analysis.""",
model_name="gpt-4o-mini",
max_loops=1,
print_on=False, # Important for concurrent execution
)
# Ethereum Specialist Agent
ethereum_agent = Agent(
agent_name="Ethereum-Analyst",
system_prompt="""You are an Ethereum specialist. Analyze ONLY Ethereum (ETH) data from the provided dataset.
Focus on:
- Ethereum price action and DeFi ecosystem
- Smart contract platform adoption
- Gas fees and network usage
- Layer 2 scaling solutions impact
Ignore all other cryptocurrencies in your analysis.""",
model_name="gpt-4o-mini",
max_loops=1,
print_on=False,
)
# Solana Specialist Agent
solana_agent = Agent(
agent_name="Solana-Analyst",
system_prompt="""You are a Solana specialist. Analyze ONLY Solana (SOL) data from the provided dataset.
Focus on:
- Solana price performance and ecosystem growth
- High-performance blockchain advantages
- DeFi and NFT activity on Solana
- Network reliability and uptime
Ignore all other cryptocurrencies in your analysis.""",
model_name="gpt-4o-mini",
max_loops=1,
print_on=False,
)
return [bitcoin_agent, ethereum_agent, solana_agent]
def main():
"""Main function demonstrating crypto-specific concurrent analysis with cron job."""
logger.info(
"🚀 Starting Simple Crypto-Specific Concurrent Analysis"
)
logger.info("💰 Each agent analyzes one specific cryptocurrency:")
logger.info(" 🟠 Bitcoin-Analyst -> BTC only")
logger.info(" 🔵 Ethereum-Analyst -> ETH only")
logger.info(" 🟢 Solana-Analyst -> SOL only")
# Define specific cryptocurrencies to analyze
coin_ids = ["bitcoin", "ethereum", "solana"]
# Step 1: Create crypto-specific agents
agents = create_crypto_specific_agents()
# Step 2: Create ConcurrentWorkflow
workflow = ConcurrentWorkflow(
name="Simple-Crypto-Specific-Analysis",
agents=agents,
show_dashboard=True, # Shows real-time progress
)
# Step 3: Create CronJob with the workflow
cron_job = CronJob(
agent=workflow, # Use workflow as the agent
interval="60seconds", # Run every minute
job_id="simple-crypto-specific-cron",
)
# Step 4: Define the analysis task
# NOTE(review): the market data below is fetched ONCE when this f-string is
# built; every scheduled run then reuses the same snapshot, despite the
# module's "real-time" claim. Fetch inside the task/agents if each run
# needs fresh data — confirm intended behavior.
task = f"""
Analyze the cryptocurrency data below. Each agent should focus ONLY on their assigned cryptocurrency:
- Bitcoin-Analyst: Analyze Bitcoin (BTC) data only
- Ethereum-Analyst: Analyze Ethereum (ETH) data only
- Solana-Analyst: Analyze Solana (SOL) data only
Cryptocurrency Data:
{get_specific_crypto_data(coin_ids)}
Each agent should:
1. Extract and analyze data for YOUR ASSIGNED cryptocurrency only
2. Provide brief insights from your specialty perspective
3. Give a price trend assessment
4. Identify key opportunities or risks
5. Ignore all other cryptocurrencies
"""
# Step 5: Start the cron job
logger.info("▶️ Starting cron job - Press Ctrl+C to stop")
try:
cron_job.run(task=task)
except KeyboardInterrupt:
# Graceful shutdown path for the scheduler loop.
logger.info("⏹️ Stopped by user")
if __name__ == "__main__":
main()

@ -141,7 +141,7 @@ def analyze_solana_data(data: str) -> str:
formatted_data = solana_data.get("formatted_data", {})
# Extract key metrics
current_price = price_data.get("current_price_usd")
price_data.get("current_price_usd")
price_change = price_data.get("price_change_24h_percent")
volume_24h = price_data.get("volume_24h_usd")
market_cap = price_data.get("market_cap_usd")

@ -16,11 +16,13 @@ from typing import List
# Add the root directory to the Python path if running from examples directory
current_dir = os.path.dirname(os.path.abspath(__file__))
if 'examples' in current_dir:
if "examples" in current_dir:
root_dir = current_dir
while os.path.basename(root_dir) != 'examples' and root_dir != os.path.dirname(root_dir):
while os.path.basename(
root_dir
) != "examples" and root_dir != os.path.dirname(root_dir):
root_dir = os.path.dirname(root_dir)
if os.path.basename(root_dir) == 'examples':
if os.path.basename(root_dir) == "examples":
root_dir = os.path.dirname(root_dir)
if root_dir not in sys.path:
sys.path.insert(0, root_dir)
@ -65,19 +67,19 @@ def create_board_members() -> List[BoardMember]:
agent=chairman,
role=BoardMemberRole.CHAIRMAN,
voting_weight=2.0,
expertise_areas=["leadership", "strategy"]
expertise_areas=["leadership", "strategy"],
),
BoardMember(
agent=cto,
role=BoardMemberRole.EXECUTIVE_DIRECTOR,
voting_weight=1.5,
expertise_areas=["technology", "innovation"]
expertise_areas=["technology", "innovation"],
),
BoardMember(
agent=cfo,
role=BoardMemberRole.EXECUTIVE_DIRECTOR,
voting_weight=1.5,
expertise_areas=["finance", "risk_management"]
expertise_areas=["finance", "risk_management"],
),
]
@ -168,7 +170,9 @@ def run_simple_example() -> None:
)
# Execute simple task
task = "Analyze current market trends and create a summary report."
task = (
"Analyze current market trends and create a summary report."
)
result = board_swarm.run(task=task)
print("Simple example completed!")
@ -179,7 +183,9 @@ def main() -> None:
"""Main function to run the examples."""
if not os.getenv("OPENAI_API_KEY"):
print("Warning: OPENAI_API_KEY not set. Example may not work.")
print(
"Warning: OPENAI_API_KEY not set. Example may not work."
)
return
try:

@ -0,0 +1,70 @@
"""
Debug script for the Arasaka Dashboard to test agent output display.
"""
from swarms.structs.hiearchical_swarm import HierarchicalSwarm
from swarms.structs.agent import Agent
def debug_dashboard():
"""Debug the dashboard functionality.

Builds two minimal agents, wraps them in an interactive HierarchicalSwarm,
pauses so the initial dashboard state can be observed, then runs one task
and prints a truncated preview of the result.
"""
print("🔍 Starting dashboard debug...")
# Create simple agents with clear names
agent1 = Agent(
agent_name="Research-Agent",
agent_description="A research agent for testing",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
)
agent2 = Agent(
agent_name="Analysis-Agent",
agent_description="An analysis agent for testing",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
)
print(
f"✅ Created agents: {agent1.agent_name}, {agent2.agent_name}"
)
# Create swarm with dashboard
swarm = HierarchicalSwarm(
name="Debug Swarm",
description="A test swarm for debugging dashboard functionality",
agents=[agent1, agent2],
max_loops=1,
interactive=True,
verbose=True,
)
print("✅ Created swarm with dashboard")
print("📊 Dashboard should now show agents in PENDING status")
# Wait a moment to see the initial dashboard
import time
time.sleep(3)
print("\n🚀 Starting swarm execution...")
# Run with a simple task
result = swarm.run(
task="Create a brief summary of machine learning"
)
print("\n✅ Debug completed!")
print("📋 Final result preview:")
# Truncate long results so the preview stays readable in a terminal.
print(
str(result)[:300] + "..."
if len(str(result)) > 300
else str(result)
)
if __name__ == "__main__":
debug_dashboard()

@ -0,0 +1,71 @@
"""
Hierarchical Swarm with Arasaka Dashboard Example
This example demonstrates the new interactive dashboard functionality for the
hierarchical swarm, featuring a futuristic Arasaka Corporation-style interface
with red and black color scheme.
"""
from swarms.structs.hiearchical_swarm import HierarchicalSwarm
from swarms.structs.agent import Agent
def main():
    """
    Demonstrate the hierarchical swarm with interactive dashboard.

    Builds three specialized worker agents, wraps them in an interactive
    ``HierarchicalSwarm`` (Arasaka-style dashboard), and runs the swarm.
    No task is passed to ``swarm.run()``, so the swarm prompts the user
    for one interactively.
    """
    print("🚀 Initializing Swarms Corporation Hierarchical Swarm...")
    # Create specialized agents
    research_agent = Agent(
        agent_name="Research-Analyst",
        agent_description="Specialized in comprehensive research and data gathering",
        model_name="gpt-4o-mini",
        max_loops=1,
        verbose=False,
    )
    analysis_agent = Agent(
        agent_name="Data-Analyst",
        agent_description="Expert in data analysis and pattern recognition",
        model_name="gpt-4o-mini",
        max_loops=1,
        verbose=False,
    )
    strategy_agent = Agent(
        agent_name="Strategy-Consultant",
        agent_description="Specialized in strategic planning and recommendations",
        model_name="gpt-4o-mini",
        max_loops=1,
        verbose=False,
    )
    # Create hierarchical swarm with interactive dashboard
    swarm = HierarchicalSwarm(
        name="Swarms Corporation Operations",
        description="Enterprise-grade hierarchical swarm for complex task execution",
        agents=[research_agent, analysis_agent, strategy_agent],
        max_loops=2,  # two director/worker iterations
        interactive=True,  # Enable the Arasaka dashboard
        verbose=True,
    )
    print("\n🎯 Swarm initialized successfully!")
    print(
        "📊 Interactive dashboard will be displayed during execution."
    )
    print(
        "💡 The swarm will prompt you for a task when you call swarm.run()"
    )
    # Run the swarm (task will be prompted interactively)
    result = swarm.run()
    print("\n✅ Swarm execution completed!")
    print("📋 Final result:")
    print(result)
if __name__ == "__main__":
    main()

@ -0,0 +1,56 @@
"""
Test script for the Arasaka Dashboard functionality.
"""
from swarms.structs.hiearchical_swarm import HierarchicalSwarm
from swarms.structs.agent import Agent
def test_dashboard():
    """Test the dashboard functionality with a simple task.

    Creates two minimal agents, runs them under an interactive
    ``HierarchicalSwarm`` so the dashboard renders, and prints a
    truncated preview of the result.
    """
    # Create simple agents
    agent1 = Agent(
        agent_name="Test-Agent-1",
        agent_description="A test agent for dashboard verification",
        model_name="gpt-4o-mini",
        max_loops=1,
        verbose=False,
    )
    agent2 = Agent(
        agent_name="Test-Agent-2",
        agent_description="Another test agent for dashboard verification",
        model_name="gpt-4o-mini",
        max_loops=1,
        verbose=False,
    )
    # Create swarm with dashboard (interactive=True enables it)
    swarm = HierarchicalSwarm(
        name="Dashboard Test Swarm",
        agents=[agent1, agent2],
        max_loops=1,
        interactive=True,
        verbose=True,
    )
    print("🧪 Testing Arasaka Dashboard...")
    print("📊 Dashboard should appear and prompt for task input")
    # Run with a simple task
    result = swarm.run(
        task="Create a simple summary of artificial intelligence trends"
    )
    print("\n✅ Test completed!")
    print("📋 Result preview:")
    # Truncate long output to keep the console readable
    print(
        str(result)[:500] + "..."
        if len(str(result)) > 500
        else str(result)
    )
if __name__ == "__main__":
    test_dashboard()

@ -0,0 +1,56 @@
"""
Test script for full agent output display in the Arasaka Dashboard.
"""
from swarms.structs.hiearchical_swarm import HierarchicalSwarm
from swarms.structs.agent import Agent
def test_full_output():
    """Test the full output display functionality.

    Runs two agents under an interactive ``HierarchicalSwarm`` with a
    verbose task, so the dashboard's untruncated-output rendering can be
    inspected manually on the console.
    """
    print("🔍 Testing full agent output display...")
    # Create agents that will produce substantial output
    agent1 = Agent(
        agent_name="Research-Agent",
        agent_description="A research agent that produces detailed output",
        model_name="gpt-4o-mini",
        max_loops=1,
        verbose=False,
    )
    agent2 = Agent(
        agent_name="Analysis-Agent",
        agent_description="An analysis agent that provides comprehensive analysis",
        model_name="gpt-4o-mini",
        max_loops=1,
        verbose=False,
    )
    # Create swarm with dashboard and detailed view enabled
    # (interactive=True turns on the dashboard; the detailed view is
    # presumably enabled by the swarm itself in interactive mode)
    swarm = HierarchicalSwarm(
        name="Full Output Test Swarm",
        description="A test swarm for verifying full agent output display",
        agents=[agent1, agent2],
        max_loops=1,
        interactive=True,
        verbose=True,
    )
    print("✅ Created swarm with detailed view enabled")
    print(
        "📊 Dashboard should show full agent outputs without truncation"
    )
    # Run with a task that will generate substantial output
    swarm.run(
        task="Provide a comprehensive analysis of artificial intelligence trends in 2024, including detailed explanations of each trend"
    )
    print("\n✅ Test completed!")
    print("📋 Check the dashboard for full agent outputs")
if __name__ == "__main__":
    test_full_output()

@ -0,0 +1,57 @@
"""
Test script for multi-loop agent tracking in the Arasaka Dashboard.
"""
from swarms.structs.hiearchical_swarm import HierarchicalSwarm
from swarms.structs.agent import Agent
def test_multi_loop():
    """Test the multi-loop agent tracking functionality.

    Uses a swarm with ``max_loops=3`` (each agent itself runs a single
    loop) so the dashboard's per-loop history rows can be inspected
    manually on the console.
    """
    print("🔍 Testing multi-loop agent tracking...")
    # Create agents (each agent runs one loop per swarm iteration)
    agent1 = Agent(
        agent_name="Research-Agent",
        agent_description="A research agent for multi-loop testing",
        model_name="gpt-4o-mini",
        max_loops=1,
        verbose=False,
    )
    agent2 = Agent(
        agent_name="Analysis-Agent",
        agent_description="An analysis agent for multi-loop testing",
        model_name="gpt-4o-mini",
        max_loops=1,
        verbose=False,
    )
    # Create swarm with multiple loops
    swarm = HierarchicalSwarm(
        name="Multi-Loop Test Swarm",
        description="A test swarm for verifying multi-loop agent tracking",
        agents=[agent1, agent2],
        max_loops=3,  # Multiple loops to test history tracking
        interactive=True,
        verbose=True,
    )
    print("✅ Created swarm with multi-loop tracking")
    print(
        "📊 Dashboard should show agent outputs across multiple loops"
    )
    print("🔄 Each loop will add new rows to the monitoring matrix")
    # Run with a task that will benefit from multiple iterations
    swarm.run(
        task="Analyze the impact of artificial intelligence on healthcare, then refine the analysis with additional insights, and finally provide actionable recommendations"
    )
    print("\n✅ Multi-loop test completed!")
    print("📋 Check the dashboard for agent outputs across all loops")
if __name__ == "__main__":
    test_multi_loop()

@ -0,0 +1,44 @@
from transformers import pipeline
from swarms import Agent
class GPTOSS:
    """Minimal wrapper around a local ``transformers`` text-generation
    pipeline for the openai/gpt-oss model family.

    Exposes a ``run(task)`` method returning plain text, so instances can
    be plugged into ``swarms.Agent`` via the ``llm=`` parameter.
    """

    def __init__(
        self,
        model_id: str = "openai/gpt-oss-20b",
        max_new_tokens: int = 256,
        temperature: float = 0.7,  # fixed: was annotated ``int`` with a float default
        system_prompt: str = "You are a helpful assistant.",
    ):
        """
        Args:
            model_id: Hugging Face model identifier to load.
            max_new_tokens: Generation cap passed to the pipeline per call.
            temperature: Sampling temperature for generation.
            system_prompt: System message prepended to every request.
        """
        self.max_new_tokens = max_new_tokens
        self.temperature = temperature
        self.system_prompt = system_prompt
        self.model_id = model_id
        self.pipe = pipeline(
            "text-generation",
            model=model_id,
            torch_dtype="auto",
            device_map="auto",
            temperature=temperature,
        )

    def run(self, task: str):
        """Generate a response for ``task``.

        Returns:
            str: The assistant's reply text. The chat pipeline returns the
            full message list; the last entry is the assistant message dict,
            from which the ``content`` string is extracted (previously the
            raw dict itself was returned, which is not what ``Agent`` expects).
        """
        self.messages = [
            {"role": "system", "content": self.system_prompt},
            {"role": "user", "content": task},
        ]
        outputs = self.pipe(
            self.messages,
            max_new_tokens=self.max_new_tokens,
        )
        last_message = outputs[0]["generated_text"][-1]
        if isinstance(last_message, dict):
            return last_message.get("content", "")
        return last_message
# Wire the local GPT-OSS pipeline into a swarms Agent as its LLM backend.
agent = Agent(
    name="GPT-OSS-Agent",
    llm=GPTOSS(),
    system_prompt="You are a helpful assistant.",
)
# Single-shot task; GPTOSS.run() is invoked under the hood.
agent.run(task="Explain quantum mechanics clearly and concisely.")

@ -0,0 +1,49 @@
from swarms import Agent
# Initialize a quantitative-trading agent.
# The model is routed through Groq ("groq/..." prefix); with
# max_loops="auto" and interactive=True the agent keeps looping until
# the user ends the session.
agent = Agent(
    agent_name="Quantitative-Trading-Agent",
    agent_description="Advanced quantitative trading and algorithmic analysis agent",
    system_prompt="""You are an expert quantitative trading agent with deep expertise in:
- Algorithmic trading strategies and implementation
- Statistical arbitrage and market making
- Risk management and portfolio optimization
- High-frequency trading systems
- Market microstructure analysis
- Quantitative research methodologies
- Financial mathematics and stochastic processes
- Machine learning applications in trading
Your core responsibilities include:
1. Developing and backtesting trading strategies
2. Analyzing market data and identifying alpha opportunities
3. Implementing risk management frameworks
4. Optimizing portfolio allocations
5. Conducting quantitative research
6. Monitoring market microstructure
7. Evaluating trading system performance
You maintain strict adherence to:
- Mathematical rigor in all analyses
- Statistical significance in strategy development
- Risk-adjusted return optimization
- Market impact minimization
- Regulatory compliance
- Transaction cost analysis
- Performance attribution
You communicate in precise, technical terms while maintaining clarity for stakeholders.""",
    model_name="groq/openai/gpt-oss-120b",
    dynamic_temperature_enabled=True,
    output_type="str-all-except-first",
    max_loops="auto",
    interactive=True,
    no_reasoning_prompt=True,
    streaming_on=True,
    # dashboard=True
)
# Run the initial task; subsequent turns come from the interactive loop.
out = agent.run(
    task="What are the best top 3 etfs for gold coverage?"
)
print(out)

@ -0,0 +1,24 @@
from swarms import HierarchicalSwarm, Agent
# Create two worker agents for the hierarchical swarm.
research_agent = Agent(
    agent_name="Research-Analyst", model_name="gpt-4.1", print_on=True
)
analysis_agent = Agent(
    agent_name="Data-Analyst", model_name="gpt-4.1", print_on=True
)
# Create swarm with interactive dashboard
swarm = HierarchicalSwarm(
    agents=[research_agent, analysis_agent],
    max_loops=1,
    interactive=True,  # Enable the Arasaka dashboard
    # director_reasoning_enabled=False,
    # director_reasoning_model_name="groq/moonshotai/kimi-k2-instruct",
    multi_agent_prompt_improvements=True,
)
# Run the swarm with an explicit task (no interactive prompt needed,
# since a task string is supplied here).
result = swarm.run("what are the best nanomachine research papers?")
print(result)

@ -7,3 +7,8 @@ The reasoning process and the final answer should be distinctly enclosed within
It is essential to output multiple <think> </think> tags to reflect the depth of thought and exploration involved in addressing the task. The Assistant should strive to think deeply and thoroughly about the question, ensuring that all relevant aspects are considered before arriving at a conclusion.
"""
# NOTE: name kept as INTERNAL_MONOLGUE_PROMPT (sic) for backward compatibility
# with existing imports; the correct spelling is "MONOLOGUE".
# The prompt text below restores em dashes, apostrophes, and quotation marks
# that were stripped by an encoding issue ("tagsnested", "youve", etc.).
INTERNAL_MONOLGUE_PROMPT = """
You are an introspective reasoning engine whose sole task is to explore and unpack any problem or task without ever delivering a final solution. Whenever you process a prompt, you must envelope every discrete insight, question, or inference inside <think> and </think> tags, using as many of these tags—nested or sequential—as needed to reveal your full chain of thought. Begin each session by rephrasing the problem in your own words to ensure you've captured its goals, inputs, outputs, and constraints—entirely within <think> blocks—and identify any ambiguities or assumptions you must clarify. Then decompose the task into sub-questions or logical components, examining multiple approaches, edge cases, and trade-offs, all inside further <think> tags. Continue layering your reasoning, pausing at each step to ask yourself "What else might I consider?" or "Is there an implicit assumption here?"—always inside <think></think>. Never move beyond analysis: do not generate outlines, pseudocode, or answers—only think. If you find yourself tempted to propose a solution, immediately halt and circle back into deeper <think> tags. Your objective is total transparency of reasoning and exhaustive exploration of the problem space; defer any answer generation until explicitly instructed otherwise.
"""

@ -94,6 +94,7 @@ from swarms.structs.swarming_architectures import (
star_swarm,
)
__all__ = [
"Agent",
"BaseStructure",

@ -102,7 +102,7 @@ def parse_done_token(response: str) -> bool:
# Agent ID generator
def agent_id():
"""Generate an agent id"""
return uuid.uuid4().hex
return f"agent-{uuid.uuid4().hex}"
# Agent output types
@ -673,7 +673,7 @@ class Agent:
# Initialize the short term memory
memory = Conversation(
system_prompt=prompt,
name=f"{self.agent_name}_conversation",
user=self.user_name,
rules=self.rules,
token_count=(
@ -693,6 +693,12 @@ class Agent:
),
)
# Add the system prompt to the conversation
memory.add(
role="System",
content=prompt,
)
return memory
def agent_output_model(self):
@ -861,7 +867,9 @@ class Agent:
return tools
except AgentMCPConnectionError as e:
logger.error(f"Error in MCP connection: {e}")
logger.error(
f"Error in MCP connection: {e} Traceback: {traceback.format_exc()}"
)
raise e
def setup_config(self):
@ -1172,7 +1180,8 @@ class Agent:
if self.print_on is True:
if isinstance(response, list):
self.pretty_print(
f"Structured Output - Attempting Function Call Execution [{time.strftime('%H:%M:%S')}] \n\n Output: {format_data_structure(response)} ",
# f"Structured Output - Attempting Function Call Execution [{time.strftime('%H:%M:%S')}] \n\n Output: {format_data_structure(response)} ",
f"[Structured Output] [Time: {time.strftime('%H:%M:%S')}] \n\n {json.dumps(response, indent=4)}",
loop_count,
)
elif self.streaming_on:
@ -2457,6 +2466,10 @@ class Agent:
Returns:
Dict[str, Any]: A dictionary representation of the class attributes.
"""
# Remove the llm object from the dictionary
self.__dict__.pop("llm", None)
return {
attr_name: self._serialize_attr(attr_name, attr_value)
for attr_name, attr_value in self.__dict__.items()

@ -1,11 +1,16 @@
import concurrent.futures
from swarms.structs.agent import Agent
from typing import List
from typing import List, Union, Callable
import os
from swarms.utils.formatter import formatter
from loguru import logger
import traceback
def batch_agent_execution(
agents: List[Agent],
tasks: List[str],
agents: List[Union[Agent, Callable]],
tasks: List[str] = None,
imgs: List[str] = None,
):
"""
Execute a batch of agents on a list of tasks concurrently.
@ -20,18 +25,21 @@ def batch_agent_execution(
Raises:
ValueError: If number of agents doesn't match number of tasks
"""
try:
logger.info(
f"Executing {len(agents)} agents on {len(tasks)} tasks"
)
if len(agents) != len(tasks):
raise ValueError(
"Number of agents must match number of tasks"
)
import concurrent.futures
import multiprocessing
results = []
# Calculate max workers as 90% of available CPU cores
max_workers = max(1, int(multiprocessing.cpu_count() * 0.9))
max_workers = max(1, int(os.cpu_count() * 0.9))
formatter.print_panel(
f"Executing {len(agents)} agents on {len(tasks)} tasks using {max_workers} workers"
@ -42,12 +50,18 @@ def batch_agent_execution(
) as executor:
# Submit all tasks to the executor
future_to_task = {
executor.submit(agent.run, task): (agent, task)
for agent, task in zip(agents, tasks)
executor.submit(agent.run, task, imgs): (
agent,
task,
imgs,
)
for agent, task, imgs in zip(agents, tasks, imgs)
}
# Collect results as they complete
for future in concurrent.futures.as_completed(future_to_task):
for future in concurrent.futures.as_completed(
future_to_task
):
agent, task = future_to_task[future]
try:
result = future.result()
@ -62,3 +76,7 @@ def batch_agent_execution(
concurrent.futures.wait(future_to_task.keys())
return results
except Exception as e:
log = f"Batch agent execution failed Error: {str(e)} Traceback: {traceback.format_exc()}"
logger.error(log)
raise e

File diff suppressed because it is too large Load Diff

@ -18,6 +18,9 @@ Todo
- Auto build agents from input prompt - and then add them to the swarm
- Create an interactive and dynamic UI like we did with heavy swarm
- Make it faster and more high performance
- Enable the director to choose a multi-agent approach to the task, it orchestrates how the agents talk and work together.
- Improve the director feedback, maybe add agent as a judge to the worker agent instead of the director.
- Use agent rearrange to orchestrate the agents
Classes:
HierarchicalOrder: Represents a single task assignment to a specific agent
@ -25,14 +28,26 @@ Classes:
HierarchicalSwarm: Main swarm orchestrator that manages director and worker agents
"""
import time
import traceback
from typing import Any, Callable, List, Optional, Union
from loguru import logger
from pydantic import BaseModel, Field
from rich.console import Console
from rich.layout import Layout
from rich.live import Live
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from swarms.prompts.hiearchical_system_prompt import (
HIEARCHICAL_SWARM_SYSTEM_PROMPT,
)
from swarms.prompts.multi_agent_collab_prompt import (
MULTI_AGENT_COLLAB_PROMPT_TWO,
)
from swarms.prompts.reasoning_prompt import INTERNAL_MONOLGUE_PROMPT
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.structs.ma_utils import list_all_agents
@ -40,10 +55,507 @@ from swarms.tools.base_tool import BaseTool
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.output_types import OutputType
logger = initialize_logger(log_folder="hierarchical_swarm")
class HierarchicalSwarmDashboard:
"""
Futuristic Arasaka Corporation-style dashboard for hierarchical swarm monitoring.
This dashboard provides a professional, enterprise-grade interface with red and black
color scheme, real-time monitoring of swarm operations, and cyberpunk aesthetics.
Attributes:
console (Console): Rich console instance for rendering
live_display (Live): Live display for real-time updates
swarm_name (str): Name of the swarm being monitored
agent_statuses (dict): Current status of all agents
director_status (str): Current status of the director
current_loop (int): Current execution loop
max_loops (int): Maximum number of loops
is_active (bool): Whether the dashboard is currently active
"""
def __init__(self, swarm_name: str = "Swarms Corporation"):
"""
Initialize the Arasaka dashboard.
Args:
swarm_name (str): Name of the swarm to display in the dashboard
"""
self.console = Console()
self.live_display = None
self.swarm_name = swarm_name
self.agent_statuses = {}
self.director_status = "INITIALIZING"
self.current_loop = 0
self.max_loops = 1
self.is_active = False
self.start_time = None
self.spinner_frames = [
"",
"",
"",
"",
"",
"",
"",
"",
"",
"",
]
self.spinner_idx = 0
# Director information tracking
self.director_plan = ""
self.director_orders = []
# Swarm information
self.swarm_description = ""
self.director_name = "Director"
self.director_model_name = "gpt-4o-mini"
# View mode for agents display
self.detailed_view = False
# Multi-loop agent tracking
self.agent_history = {} # Track agent outputs across loops
self.current_loop = 0
def _get_spinner(self) -> str:
"""Get current spinner frame for loading animations."""
self.spinner_idx = (self.spinner_idx + 1) % len(
self.spinner_frames
)
return self.spinner_frames[self.spinner_idx]
def _create_header(self) -> Panel:
"""Create the dashboard header with Swarms Corporation branding."""
header_text = Text()
header_text.append(
"╔══════════════════════════════════════════════════════════════════════════════╗\n",
style="bold red",
)
header_text.append("", style="bold red")
header_text.append(" ", style="bold red")
header_text.append(
"SWARMS CORPORATION", style="bold white on red"
)
header_text.append(" ", style="bold red")
header_text.append("\n", style="bold red")
header_text.append("", style="bold red")
header_text.append(" ", style="bold red")
header_text.append(
"HIERARCHICAL SWARM OPERATIONS CENTER", style="bold red"
)
header_text.append(" ", style="bold red")
header_text.append("\n", style="bold red")
header_text.append(
"╚══════════════════════════════════════════════════════════════════════════════╝",
style="bold red",
)
return Panel(
header_text,
border_style="red",
padding=(0, 1),
)
def _create_status_panel(self) -> Panel:
"""Create the operations status panel."""
status_text = Text()
# Corporation branding and operation type
status_text.append(
"By the Swarms Corporation", style="bold cyan"
)
status_text.append("\n", style="white")
status_text.append(
"Hierarchical Agent Operations", style="bold white"
)
status_text.append("\n\n", style="white")
# Swarm information
status_text.append("SWARM NAME: ", style="bold white")
status_text.append(f"{self.swarm_name}", style="bold cyan")
status_text.append("\n", style="white")
status_text.append("DESCRIPTION: ", style="bold white")
status_text.append(f"{self.swarm_description}", style="white")
status_text.append("\n", style="white")
status_text.append("DIRECTOR: ", style="bold white")
status_text.append(
f"{self.director_name} ({self.director_model_name})",
style="cyan",
)
status_text.append("\n", style="white")
status_text.append("TOTAL LOOPS: ", style="bold white")
status_text.append(f"{self.max_loops}", style="bold cyan")
status_text.append(" | ", style="white")
status_text.append("CURRENT LOOP: ", style="bold white")
status_text.append(
f"{self.current_loop}", style="bold yellow"
)
# Agent count metadata
agent_count = len(getattr(self, "agent_history", {}))
status_text.append(" | ", style="white")
status_text.append("AGENTS: ", style="bold white")
status_text.append(f"{agent_count}", style="bold green")
status_text.append("\n\n", style="white")
# Director status
status_text.append("DIRECTOR STATUS: ", style="bold white")
if self.director_status == "INITIALIZING":
status_text.append(
f"{self._get_spinner()} {self.director_status}",
style="bold yellow",
)
elif self.director_status == "ACTIVE":
status_text.append(
f"{self.director_status}", style="bold green"
)
elif self.director_status == "PROCESSING":
status_text.append(
f"{self._get_spinner()} {self.director_status}",
style="bold cyan",
)
else:
status_text.append(
f"{self.director_status}", style="bold red"
)
status_text.append("\n\n", style="white")
# Runtime and completion information
if self.start_time:
runtime = time.time() - self.start_time
status_text.append("RUNTIME: ", style="bold white")
status_text.append(f"{runtime:.2f}s", style="bold green")
# Add completion percentage if loops are running
if self.max_loops > 0:
completion_percent = (
self.current_loop / self.max_loops
) * 100
status_text.append(" | ", style="white")
status_text.append("PROGRESS: ", style="bold white")
status_text.append(
f"{completion_percent:.1f}%", style="bold cyan"
)
return Panel(
status_text,
border_style="red",
padding=(1, 2),
title="[bold white]OPERATIONS STATUS[/bold white]",
)
def _create_agents_table(self) -> Table:
"""Create the agents monitoring table with full outputs and loop history."""
table = Table(
show_header=True,
header_style="bold white on red",
border_style="red",
title="[bold white]AGENT MONITORING MATRIX[/bold white]",
title_style="bold white",
show_lines=True,
)
table.add_column("AGENT ID", style="bold cyan", width=25)
table.add_column("LOOP", style="bold white", width=8)
table.add_column("STATUS", style="bold white", width=15)
table.add_column("TASK", style="white", width=40)
table.add_column("OUTPUT", style="white", width=150)
# Display agents with their history across loops
for agent_name, history in self.agent_history.items():
for loop_num in range(self.max_loops + 1):
loop_key = f"Loop_{loop_num}"
if loop_key in history:
loop_data = history[loop_key]
status = loop_data.get("status", "UNKNOWN")
task = loop_data.get("task", "N/A")
output = loop_data.get("output", "")
# Style status
if status == "RUNNING":
status_display = (
f"{self._get_spinner()} {status}"
)
status_style = "bold yellow"
elif status == "COMPLETED":
status_display = f"{status}"
status_style = "bold green"
elif status == "PENDING":
status_display = f"{status}"
status_style = "bold red"
else:
status_display = f"{status}"
status_style = "bold red"
# Show full output without truncation
output_display = output if output else "No output"
table.add_row(
Text(agent_name, style="bold cyan"),
Text(f"Loop {loop_num}", style="bold white"),
Text(status_display, style=status_style),
Text(task, style="white"),
Text(output_display, style="white"),
)
return table
def _create_detailed_agents_view(self) -> Panel:
"""Create a detailed view of agents with full outputs and loop history."""
detailed_text = Text()
for agent_name, history in self.agent_history.items():
detailed_text.append(
f"AGENT: {agent_name}\n", style="bold cyan"
)
detailed_text.append("=" * 80 + "\n", style="red")
for loop_num in range(self.max_loops + 1):
loop_key = f"Loop_{loop_num}"
if loop_key in history:
loop_data = history[loop_key]
status = loop_data.get("status", "UNKNOWN")
task = loop_data.get("task", "N/A")
output = loop_data.get("output", "")
detailed_text.append(
f"LOOP {loop_num}:\n", style="bold white"
)
detailed_text.append(
f"STATUS: {status}\n", style="bold white"
)
detailed_text.append(
f"TASK: {task}\n", style="white"
)
detailed_text.append(
"OUTPUT:\n", style="bold white"
)
detailed_text.append(f"{output}\n", style="white")
detailed_text.append("" * 80 + "\n", style="red")
return Panel(
detailed_text,
border_style="red",
padding=(1, 2),
title="[bold white]DETAILED AGENT OUTPUTS (FULL HISTORY)[/bold white]",
)
def _create_director_panel(self) -> Panel:
"""Create the director information panel showing plan and orders."""
director_text = Text()
# Plan section
director_text.append("DIRECTOR PLAN:\n", style="bold white")
if self.director_plan:
director_text.append(self.director_plan, style="white")
else:
director_text.append(
"No plan available", style="dim white"
)
director_text.append("\n\n", style="white")
# Orders section
director_text.append("CURRENT ORDERS:\n", style="bold white")
if self.director_orders:
for i, order in enumerate(
self.director_orders
): # Show first 5 orders
director_text.append(f"{i+1}. ", style="bold cyan")
director_text.append(
f"{order.get('agent_name', 'Unknown')}: ",
style="bold white",
)
task = order.get("task", "No task")
director_text.append(task, style="white")
director_text.append("\n", style="white")
if len(self.director_orders) > 5:
director_text.append(
f"... and {len(self.director_orders) - 5} more orders",
style="dim white",
)
else:
director_text.append(
"No orders available", style="dim white"
)
return Panel(
director_text,
border_style="red",
padding=(1, 2),
title="[bold white]DIRECTOR OPERATIONS[/bold white]",
)
def _create_dashboard_layout(self) -> Layout:
"""Create the complete dashboard layout."""
layout = Layout()
# Split into operations status, director operations, and agents
layout.split_column(
Layout(name="operations_status", size=12),
Layout(name="director_operations", size=12),
Layout(name="agents", ratio=1),
)
# Add content to each section
layout["operations_status"].update(
self._create_status_panel()
)
layout["director_operations"].update(
self._create_director_panel()
)
# Choose between table view and detailed view
if self.detailed_view:
layout["agents"].update(
self._create_detailed_agents_view()
)
else:
layout["agents"].update(
Panel(
self._create_agents_table(),
border_style="red",
padding=(1, 1),
)
)
return layout
def start(self, max_loops: int = 1):
"""Start the dashboard display."""
self.max_loops = max_loops
self.start_time = time.time()
self.is_active = True
self.live_display = Live(
self._create_dashboard_layout(),
console=self.console,
refresh_per_second=10,
transient=False,
)
self.live_display.start()
def update_agent_status(
self,
agent_name: str,
status: str,
task: str = "",
output: str = "",
):
"""Update the status of a specific agent."""
# Create loop key for tracking history
loop_key = f"Loop_{self.current_loop}"
# Initialize agent history if not exists
if agent_name not in self.agent_history:
self.agent_history[agent_name] = {}
# Store current status and add to history
self.agent_statuses[agent_name] = {
"status": status,
"task": task,
"output": output,
}
# Add to history for this loop
self.agent_history[agent_name][loop_key] = {
"status": status,
"task": task,
"output": output,
}
if self.live_display and self.is_active:
self.live_display.update(self._create_dashboard_layout())
def update_director_status(self, status: str):
"""Update the director status."""
self.director_status = status
if self.live_display and self.is_active:
self.live_display.update(self._create_dashboard_layout())
def update_loop(self, current_loop: int):
"""Update the current execution loop."""
self.current_loop = current_loop
if self.live_display and self.is_active:
self.live_display.update(self._create_dashboard_layout())
def update_director_plan(self, plan: str):
"""Update the director's plan."""
self.director_plan = plan
if self.live_display and self.is_active:
self.live_display.update(self._create_dashboard_layout())
def update_director_orders(self, orders: list):
"""Update the director's orders."""
self.director_orders = orders
if self.live_display and self.is_active:
self.live_display.update(self._create_dashboard_layout())
def stop(self):
"""Stop the dashboard display."""
self.is_active = False
if self.live_display:
self.live_display.stop()
self.console.print()
def update_swarm_info(
self,
name: str,
description: str,
max_loops: int,
director_name: str,
director_model_name: str,
):
"""Update the dashboard with swarm-specific information."""
self.swarm_name = name
self.swarm_description = description
self.max_loops = max_loops
self.director_name = director_name
self.director_model_name = director_model_name
if self.live_display and self.is_active:
self.live_display.update(self._create_dashboard_layout())
def force_refresh(self):
"""Force refresh the dashboard display."""
if self.live_display and self.is_active:
self.live_display.update(self._create_dashboard_layout())
def show_full_output(self, agent_name: str, full_output: str):
"""Display full agent output in a separate panel."""
if self.live_display and self.is_active:
# Create a full output panel
output_panel = Panel(
Text(full_output, style="white"),
title=f"[bold white]FULL OUTPUT - {agent_name}[/bold white]",
border_style="red",
padding=(1, 2),
width=120,
)
# Temporarily show the full output
self.console.print(output_panel)
self.console.print() # Add spacing
def toggle_detailed_view(self):
"""Toggle between table view and detailed view."""
self.detailed_view = not self.detailed_view
if self.live_display and self.is_active:
self.live_display.update(self._create_dashboard_layout())
class HierarchicalOrder(BaseModel):
@ -71,6 +583,25 @@ class HierarchicalOrder(BaseModel):
)
class HierarchicalOrderRearrange(BaseModel):
    """
    Describes how the swarm should be rearranged to accomplish a task.

    Unlike ``HierarchicalOrder`` (a single agent/task assignment), this
    schema captures the initial task plus the communication flow the
    director wants the worker agents to follow — sequential (``->``)
    and/or parallel (``,``). The previous docstring was copy-pasted from
    ``HierarchicalOrder`` and described the wrong model.
    """

    initial_task: str = Field(
        ...,
        description="The initial task that the director has to execute.",
    )
    flow_of_communication: str = Field(
        ...,
        # Fixed self-contradictory wording ("comma signs ... sequential and
        # commas ... parallel"): arrows are sequential, commas are parallel.
        description=(
            "How the agents will communicate with each other to accomplish the task. "
            "Use arrows (->) to denote sequential communication and commas to denote "
            "parallel communication, for example: "
            "agent_one -> agent_two, agent_three -> agent_four"
        ),
    )
class SwarmSpec(BaseModel):
"""
Defines the complete specification for a hierarchical swarm execution.
@ -86,10 +617,20 @@ class SwarmSpec(BaseModel):
individual agents within the swarm.
"""
# # thoughts: str = Field(
# # ...,
# # description="A plan generated by the director agent for the swarm to accomplish the given task, where the director autonomously reasons through the problem, devises its own strategy, and determines the sequence of actions. "
# # "This plan reflects the director's independent thought process, outlining the rationale, priorities, and steps it deems necessary for successful execution. "
# # "It serves as a blueprint for the swarm, enabling agents to follow the director's self-derived guidance and adapt as needed throughout the process.",
# )
plan: str = Field(
...,
description="Outlines the sequence of actions to be taken by the swarm. This plan is a detailed roadmap that guides the swarm's behavior and decision-making.",
description="A plan generated by the director agent for the swarm to accomplish the given task, where the director autonomously reasons through the problem, devises its own strategy, and determines the sequence of actions. "
"This plan reflects the director's independent thought process, outlining the rationale, priorities, and steps it deems necessary for successful execution. "
"It serves as a blueprint for the swarm, enabling agents to follow the director's self-derived guidance and adapt as needed throughout the process.",
)
orders: List[HierarchicalOrder] = Field(
...,
description="A collection of task assignments to specific agents within the swarm. These orders are the specific instructions that guide the agents in their task execution and are a key element in the swarm's plan.",
@ -143,6 +684,11 @@ class HierarchicalSwarm:
Union[Agent, Callable, Any]
] = None,
director_feedback_on: bool = True,
interactive: bool = False,
director_system_prompt: str = HIEARCHICAL_SWARM_SYSTEM_PROMPT,
director_reasoning_model_name: str = "o3-mini",
director_reasoning_enabled: bool = True,
multi_agent_prompt_improvements: bool = False,
*args,
**kwargs,
):
@ -187,9 +733,79 @@ class HierarchicalSwarm:
self.add_collaboration_prompt = add_collaboration_prompt
self.planning_director_agent = planning_director_agent
self.director_feedback_on = director_feedback_on
self.interactive = interactive
self.director_system_prompt = director_system_prompt
self.director_reasoning_model_name = (
director_reasoning_model_name
)
self.director_reasoning_enabled = director_reasoning_enabled
self.multi_agent_prompt_improvements = (
multi_agent_prompt_improvements
)
if self.interactive:
self.agents_no_print()
# Initialize dashboard if interactive mode is enabled
self.dashboard = None
if self.interactive:
self.dashboard = HierarchicalSwarmDashboard(self.name)
# Enable detailed view for better output visibility
self.dashboard.detailed_view = True
# Pass additional swarm information to dashboard
self.dashboard.update_swarm_info(
name=self.name,
description=self.description,
max_loops=self.max_loops,
director_name=self.director_name,
director_model_name=self.director_model_name,
)
self.init_swarm()
def list_worker_agents(self) -> str:
    """Return a formatted roster of this swarm's worker agents.

    The roster is produced by ``list_all_agents`` without writing
    anything into the shared conversation.
    """
    return list_all_agents(agents=self.agents, add_to_conversation=False)
def prepare_worker_agents(self):
    """
    Append the multi-agent collaboration prompt, plus a roster of all
    worker agents, to every agent's system prompt.

    Agents without an existing ``system_prompt`` attribute get the
    collaboration text as their whole system prompt.
    """
    # The combined prompt is identical for every agent, so build it once
    # instead of re-calling list_worker_agents() on every loop iteration
    # (the original rebuilt this loop-invariant string per agent).
    prompt = (
        MULTI_AGENT_COLLAB_PROMPT_TWO
        + self.list_worker_agents()
    )
    for agent in self.agents:
        if hasattr(agent, "system_prompt"):
            agent.system_prompt += prompt
        else:
            agent.system_prompt = prompt
def reasoning_agent_run(
    self, task: str, img: Optional[str] = None
):
    """
    Run a one-shot reasoning pass over the task before the main
    director processes it.

    A temporary reasoning agent (named after the director) receives the
    full conversation history together with the task, and its single
    response is returned.

    Args:
        task (str): The task to reason about.
        img (Optional[str]): Optional image input.

    Returns:
        str: The reasoning output from the temporary agent.
    """
    history = self.conversation.get_str()
    reasoning_task = (
        f"Conversation History: {history} \n\n Task: {task}"
    )
    # NOTE(review): INTERNAL_MONOLGUE_PROMPT misspells "MONOLOGUE" at its
    # definition site; keep the name as-is to stay in sync with the module.
    reasoner = Agent(
        agent_name=self.director_name,
        agent_description=f"You're the {self.director_name} agent that is responsible for reasoning about the task and creating a plan for the swarm to accomplish the task.",
        model_name=self.director_reasoning_model_name,
        system_prompt=INTERNAL_MONOLGUE_PROMPT
        + self.director_system_prompt,
        max_loops=1,
    )
    return reasoner.run(task=reasoning_task, img=img)
def init_swarm(self):
"""
Initialize the swarm with proper configuration and validation.
@ -216,11 +832,27 @@ class HierarchicalSwarm:
self.add_context_to_director()
# Initialize agent statuses in dashboard if interactive mode
if self.interactive and self.dashboard:
for agent in self.agents:
if hasattr(agent, "agent_name"):
self.dashboard.update_agent_status(
agent.agent_name,
"PENDING",
"Awaiting task assignment",
"Ready for deployment",
)
# Force refresh to ensure agents are displayed
self.dashboard.force_refresh()
if self.verbose:
logger.success(
f"✅ HierarchicalSwarm: {self.name} initialized successfully."
)
if self.multi_agent_prompt_improvements:
self.prepare_worker_agents()
def add_context_to_director(self):
"""
Add agent context and collaboration information to the director's conversation.
@ -282,6 +914,7 @@ class HierarchicalSwarm:
return Agent(
agent_name=self.director_name,
agent_description="A director agent that can create a plan and distribute orders to agents",
system_prompt=self.director_system_prompt,
model_name=self.director_model_name,
max_loops=1,
base_model=SwarmSpec,
@ -333,6 +966,10 @@ class HierarchicalSwarm:
error_msg = f"❌ Failed to setup director: {str(e)}\n🔍 Traceback: {traceback.format_exc()}\n🐛 If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
logger.error(error_msg)
def agents_no_print(self):
    """Disable console printing on every worker agent in the swarm."""
    for worker in self.agents:
        worker.print_on = False
def run_director(
self,
task: str,
@ -358,9 +995,7 @@ class HierarchicalSwarm:
"""
try:
if self.verbose:
logger.info(
f"🎯 Running director with task: {task[:100]}..."
)
logger.info(f"🎯 Running director with task: {task}")
if self.planning_director_agent is not None:
plan = self.planning_director_agent.run(
@ -370,6 +1005,12 @@ class HierarchicalSwarm:
task += plan
if self.director_reasoning_enabled:
reasoning_output = self.reasoning_agent_run(
task=task, img=img
)
task += f"\n\n Reasoning: {reasoning_output}"
# Run the director with the context
function_call = self.director.run(
task=f"History: {self.conversation.get_str()} \n\n Task: {task}",
@ -391,6 +1032,7 @@ class HierarchicalSwarm:
except Exception as e:
error_msg = f"❌ Failed to setup director: {str(e)}\n🔍 Traceback: {traceback.format_exc()}\n🐛 If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
logger.error(error_msg)
raise e
def step(self, task: str, img: str = None, *args, **kwargs):
"""
@ -417,9 +1059,13 @@ class HierarchicalSwarm:
try:
if self.verbose:
logger.info(
f"👣 Executing single step for task: {task[:100]}..."
f"👣 Executing single step for task: {task}"
)
# Update dashboard for director execution
if self.interactive and self.dashboard:
self.dashboard.update_director_status("PLANNING")
output = self.run_director(task=task, img=img)
# Parse the orders
@ -430,6 +1076,20 @@ class HierarchicalSwarm:
f"📋 Parsed plan and {len(orders)} orders"
)
# Update dashboard with plan and orders information
if self.interactive and self.dashboard:
self.dashboard.update_director_plan(plan)
# Convert orders to list of dicts for dashboard
orders_list = [
{
"agent_name": order.agent_name,
"task": order.task,
}
for order in orders
]
self.dashboard.update_director_orders(orders_list)
self.dashboard.update_director_status("EXECUTING")
# Execute the orders
outputs = self.execute_orders(orders)
@ -450,7 +1110,13 @@ class HierarchicalSwarm:
error_msg = f"❌ Failed to setup director: {str(e)}\n🔍 Traceback: {traceback.format_exc()}\n🐛 If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
logger.error(error_msg)
def run(self, task: str, img: str = None, *args, **kwargs):
def run(
self,
task: Optional[str] = None,
img: Optional[str] = None,
*args,
**kwargs,
):
"""
Execute the hierarchical swarm for the specified number of feedback loops.
@ -462,7 +1128,8 @@ class HierarchicalSwarm:
context from previous iterations to subsequent ones.
Args:
task (str): The initial task to be processed by the swarm.
task (str, optional): The initial task to be processed by the swarm.
If None and interactive mode is enabled, will prompt for input.
img (str, optional): Optional image input for the agents.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
@ -475,9 +1142,23 @@ class HierarchicalSwarm:
Exception: If swarm execution fails.
"""
try:
# Handle interactive mode task input
if task is None and self.interactive:
task = self._get_interactive_task()
# if task is None:
# raise ValueError(
# "Task is required for swarm execution"
# )
current_loop = 0
last_output = None
# Start dashboard if in interactive mode
if self.interactive and self.dashboard:
self.dashboard.start(self.max_loops)
self.dashboard.update_director_status("ACTIVE")
if self.verbose:
logger.info(
f"🚀 Starting hierarchical swarm run: {self.name}"
@ -492,6 +1173,13 @@ class HierarchicalSwarm:
f"🔄 Loop {current_loop + 1}/{self.max_loops} - Processing task"
)
# Update dashboard loop counter
if self.interactive and self.dashboard:
self.dashboard.update_loop(current_loop + 1)
self.dashboard.update_director_status(
"PROCESSING"
)
# For the first loop, use the original task.
# For subsequent loops, use the feedback from the previous loop as context.
if current_loop == 0:
@ -527,6 +1215,11 @@ class HierarchicalSwarm:
content=f"--- Loop {current_loop}/{self.max_loops} completed ---",
)
# Stop dashboard if in interactive mode
if self.interactive and self.dashboard:
self.dashboard.update_director_status("COMPLETED")
self.dashboard.stop()
if self.verbose:
logger.success(
f"🎉 Hierarchical swarm run completed: {self.name}"
@ -540,9 +1233,32 @@ class HierarchicalSwarm:
)
except Exception as e:
# Stop dashboard on error
if self.interactive and self.dashboard:
self.dashboard.update_director_status("ERROR")
self.dashboard.stop()
error_msg = f"❌ Failed to setup director: {str(e)}\n🔍 Traceback: {traceback.format_exc()}\n🐛 If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
logger.error(error_msg)
def _get_interactive_task(self) -> str:
    """
    Prompt the user on stdin for a task when running in interactive mode.

    If a dashboard is attached, a styled banner is printed on its
    console before reading input.

    Returns:
        str: The user-supplied task, stripped of surrounding whitespace.
    """
    dashboard = self.dashboard
    if dashboard:
        console = dashboard.console
        console.print(
            "\n[bold red]SWARMS CORPORATION[/bold red] - [bold white]TASK INPUT REQUIRED[/bold white]"
        )
        console.print(
            "[bold cyan]Enter your task for the hierarchical swarm:[/bold cyan]"
        )
    return input("> ").strip()
def feedback_director(self, outputs: list):
"""
Generate feedback from the director based on agent outputs.
@ -646,6 +1362,12 @@ class HierarchicalSwarm:
f"Agent '{agent_name}' not found in swarm. Available agents: {available_agents}"
)
# Update dashboard for agent execution
if self.interactive and self.dashboard:
self.dashboard.update_agent_status(
agent_name, "RUNNING", task, "Executing task..."
)
output = agent.run(
task=f"History: {self.conversation.get_str()} \n\n Task: {task}",
*args,
@ -661,6 +1383,12 @@ class HierarchicalSwarm:
return output
except Exception as e:
# Update dashboard with error status
if self.interactive and self.dashboard:
self.dashboard.update_agent_status(
agent_name, "ERROR", task, f"Error: {str(e)}"
)
error_msg = f"❌ Failed to setup director: {str(e)}\n🔍 Traceback: {traceback.format_exc()}\n🐛 If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
logger.error(error_msg)
@ -805,8 +1533,9 @@ class HierarchicalSwarm:
)
except Exception as e:
error_msg = f"❌ Failed to setup director: {str(e)}\n🔍 Traceback: {traceback.format_exc()}\n🐛 If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
error_msg = f"❌ Failed to parse orders: {str(e)}\n🔍 Traceback: {traceback.format_exc()}\n🐛 If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues"
logger.error(error_msg)
raise e
def execute_orders(self, orders: list):
"""
@ -836,9 +1565,31 @@ class HierarchicalSwarm:
f"📋 Executing order {i+1}/{len(orders)}: {order.agent_name}"
)
# Update dashboard for agent execution
if self.interactive and self.dashboard:
self.dashboard.update_agent_status(
order.agent_name,
"RUNNING",
order.task,
"Processing...",
)
output = self.call_single_agent(
order.agent_name, order.task
)
# Update dashboard with completed status
if self.interactive and self.dashboard:
# Always show full output without truncation
output_display = str(output)
self.dashboard.update_agent_status(
order.agent_name,
"COMPLETED",
order.task,
output_display,
)
outputs.append(output)
if self.verbose:

@ -15,8 +15,7 @@ The test suite follows the Swarms testing philosophy:
import os
import pytest
import asyncio
from unittest.mock import Mock, patch, MagicMock, AsyncMock
from typing import List, Dict, Any, Optional
from unittest.mock import Mock, patch, AsyncMock
from swarms.structs.board_of_directors_swarm import (
BoardOfDirectorsSwarm,
@ -28,7 +27,6 @@ from swarms.structs.board_of_directors_swarm import (
BoardSpec,
)
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
# Test fixtures
@ -50,7 +48,7 @@ def mock_board_member(mock_agent):
agent=mock_agent,
role=BoardMemberRole.CHAIRMAN,
voting_weight=1.5,
expertise_areas=["leadership", "strategy"]
expertise_areas=["leadership", "strategy"],
)
@ -70,7 +68,11 @@ def sample_agents():
@pytest.fixture
def sample_board_members(sample_agents):
"""Create sample board members for testing."""
roles = [BoardMemberRole.CHAIRMAN, BoardMemberRole.VICE_CHAIRMAN, BoardMemberRole.SECRETARY]
roles = [
BoardMemberRole.CHAIRMAN,
BoardMemberRole.VICE_CHAIRMAN,
BoardMemberRole.SECRETARY,
]
board_members = []
for i, (agent, role) in enumerate(zip(sample_agents, roles)):
@ -78,7 +80,7 @@ def sample_board_members(sample_agents):
agent=agent,
role=role,
voting_weight=1.0 + (i * 0.2),
expertise_areas=[f"expertise_{i+1}"]
expertise_areas=[f"expertise_{i+1}"],
)
board_members.append(board_member)
@ -92,7 +94,7 @@ def basic_board_swarm(sample_agents):
name="TestBoard",
agents=sample_agents,
verbose=False,
max_loops=1
max_loops=1,
)
@ -109,7 +111,7 @@ def configured_board_swarm(sample_agents, sample_board_members):
decision_threshold=0.7,
enable_voting=True,
enable_consensus=True,
max_workers=4
max_workers=4,
)
@ -124,7 +126,9 @@ class TestBoardMemberRole:
assert BoardMemberRole.SECRETARY == "secretary"
assert BoardMemberRole.TREASURER == "treasurer"
assert BoardMemberRole.MEMBER == "member"
assert BoardMemberRole.EXECUTIVE_DIRECTOR == "executive_director"
assert (
BoardMemberRole.EXECUTIVE_DIRECTOR == "executive_director"
)
class TestBoardDecisionType:
@ -135,7 +139,9 @@ class TestBoardDecisionType:
assert BoardDecisionType.UNANIMOUS == "unanimous"
assert BoardDecisionType.MAJORITY == "majority"
assert BoardDecisionType.CONSENSUS == "consensus"
assert BoardDecisionType.CHAIRMAN_DECISION == "chairman_decision"
assert (
BoardDecisionType.CHAIRMAN_DECISION == "chairman_decision"
)
class TestBoardMember:
@ -147,19 +153,21 @@ class TestBoardMember:
agent=mock_agent,
role=BoardMemberRole.CHAIRMAN,
voting_weight=1.5,
expertise_areas=["leadership", "strategy"]
expertise_areas=["leadership", "strategy"],
)
assert board_member.agent == mock_agent
assert board_member.role == BoardMemberRole.CHAIRMAN
assert board_member.voting_weight == 1.5
assert board_member.expertise_areas == ["leadership", "strategy"]
assert board_member.expertise_areas == [
"leadership",
"strategy",
]
def test_board_member_defaults(self, mock_agent):
"""Test board member with default values."""
board_member = BoardMember(
agent=mock_agent,
role=BoardMemberRole.MEMBER
agent=mock_agent, role=BoardMemberRole.MEMBER
)
assert board_member.voting_weight == 1.0
@ -170,7 +178,7 @@ class TestBoardMember:
board_member = BoardMember(
agent=mock_agent,
role=BoardMemberRole.MEMBER,
expertise_areas=None
expertise_areas=None,
)
assert board_member.expertise_areas == []
@ -186,7 +194,7 @@ class TestBoardOrder:
task="Test task",
priority=1,
deadline="2024-01-01",
assigned_by="Chairman"
assigned_by="Chairman",
)
assert order.agent_name == "TestAgent"
@ -197,10 +205,7 @@ class TestBoardOrder:
def test_board_order_defaults(self):
"""Test board order with default values."""
order = BoardOrder(
agent_name="TestAgent",
task="Test task"
)
order = BoardOrder(agent_name="TestAgent", task="Test task")
assert order.priority == 3
assert order.deadline is None
@ -213,14 +218,14 @@ class TestBoardOrder:
BoardOrder(
agent_name="TestAgent",
task="Test task",
priority=0 # Invalid priority
priority=0, # Invalid priority
)
with pytest.raises(ValueError):
BoardOrder(
agent_name="TestAgent",
task="Test task",
priority=6 # Invalid priority
priority=6, # Invalid priority
)
@ -235,7 +240,7 @@ class TestBoardDecision:
votes_for=3,
votes_against=1,
abstentions=0,
reasoning="The proposal aligns with our strategic goals"
reasoning="The proposal aligns with our strategic goals",
)
assert decision.decision_type == BoardDecisionType.MAJORITY
@ -243,13 +248,16 @@ class TestBoardDecision:
assert decision.votes_for == 3
assert decision.votes_against == 1
assert decision.abstentions == 0
assert decision.reasoning == "The proposal aligns with our strategic goals"
assert (
decision.reasoning
== "The proposal aligns with our strategic goals"
)
def test_board_decision_defaults(self):
"""Test board decision with default values."""
decision = BoardDecision(
decision_type=BoardDecisionType.CONSENSUS,
decision="Test decision"
decision="Test decision",
)
assert decision.votes_for == 0
@ -265,12 +273,12 @@ class TestBoardSpec:
"""Test creating a board spec."""
orders = [
BoardOrder(agent_name="Agent1", task="Task 1"),
BoardOrder(agent_name="Agent2", task="Task 2")
BoardOrder(agent_name="Agent2", task="Task 2"),
]
decisions = [
BoardDecision(
decision_type=BoardDecisionType.MAJORITY,
decision="Decision 1"
decision="Decision 1",
)
]
@ -278,7 +286,7 @@ class TestBoardSpec:
plan="Test plan",
orders=orders,
decisions=decisions,
meeting_summary="Test meeting summary"
meeting_summary="Test meeting summary",
)
assert spec.plan == "Test plan"
@ -288,10 +296,7 @@ class TestBoardSpec:
def test_board_spec_defaults(self):
"""Test board spec with default values."""
spec = BoardSpec(
plan="Test plan",
orders=[]
)
spec = BoardSpec(plan="Test plan", orders=[])
assert spec.decisions == []
assert spec.meeting_summary == ""
@ -304,8 +309,7 @@ class TestBoardOfDirectorsSwarmInitialization:
def test_basic_initialization(self, sample_agents):
"""Test basic swarm initialization."""
swarm = BoardOfDirectorsSwarm(
name="TestSwarm",
agents=sample_agents
name="TestSwarm", agents=sample_agents
)
assert swarm.name == "TestSwarm"
@ -314,7 +318,9 @@ class TestBoardOfDirectorsSwarmInitialization:
assert swarm.verbose is False
assert swarm.decision_threshold == 0.6
def test_configured_initialization(self, sample_agents, sample_board_members):
def test_configured_initialization(
self, sample_agents, sample_board_members
):
"""Test configured swarm initialization."""
swarm = BoardOfDirectorsSwarm(
name="ConfiguredSwarm",
@ -326,7 +332,7 @@ class TestBoardOfDirectorsSwarmInitialization:
decision_threshold=0.8,
enable_voting=False,
enable_consensus=False,
max_workers=8
max_workers=8,
)
assert swarm.name == "ConfiguredSwarm"
@ -346,23 +352,41 @@ class TestBoardOfDirectorsSwarmInitialization:
assert len(swarm.board_members) == 3
assert swarm.board_members[0].role == BoardMemberRole.CHAIRMAN
assert swarm.board_members[1].role == BoardMemberRole.VICE_CHAIRMAN
assert swarm.board_members[2].role == BoardMemberRole.SECRETARY
assert (
swarm.board_members[1].role
== BoardMemberRole.VICE_CHAIRMAN
)
assert (
swarm.board_members[2].role == BoardMemberRole.SECRETARY
)
def test_initialization_without_agents(self):
"""Test initialization without agents should raise error."""
with pytest.raises(ValueError, match="No agents found in the swarm"):
with pytest.raises(
ValueError, match="No agents found in the swarm"
):
BoardOfDirectorsSwarm(agents=[])
def test_initialization_with_invalid_max_loops(self, sample_agents):
def test_initialization_with_invalid_max_loops(
self, sample_agents
):
"""Test initialization with invalid max_loops."""
with pytest.raises(ValueError, match="Max loops must be greater than 0"):
with pytest.raises(
ValueError, match="Max loops must be greater than 0"
):
BoardOfDirectorsSwarm(agents=sample_agents, max_loops=0)
def test_initialization_with_invalid_decision_threshold(self, sample_agents):
def test_initialization_with_invalid_decision_threshold(
self, sample_agents
):
"""Test initialization with invalid decision threshold."""
with pytest.raises(ValueError, match="Decision threshold must be between 0.0 and 1.0"):
BoardOfDirectorsSwarm(agents=sample_agents, decision_threshold=1.5)
with pytest.raises(
ValueError,
match="Decision threshold must be between 0.0 and 1.0",
):
BoardOfDirectorsSwarm(
agents=sample_agents, decision_threshold=1.5
)
class TestBoardOfDirectorsSwarmMethods:
@ -373,8 +397,14 @@ class TestBoardOfDirectorsSwarmMethods:
swarm = BoardOfDirectorsSwarm(agents=sample_agents)
assert len(swarm.board_members) == 3
assert all(hasattr(member.agent, 'agent_name') for member in swarm.board_members)
assert all(hasattr(member.agent, 'run') for member in swarm.board_members)
assert all(
hasattr(member.agent, "agent_name")
for member in swarm.board_members
)
assert all(
hasattr(member.agent, "run")
for member in swarm.board_members
)
def test_get_chairman_prompt(self, sample_agents):
"""Test chairman prompt generation."""
@ -412,12 +442,16 @@ class TestBoardOfDirectorsSwarmMethods:
assert "Secretary" in info
assert "expertise" in info
def test_add_board_member(self, basic_board_swarm, mock_board_member):
def test_add_board_member(
self, basic_board_swarm, mock_board_member
):
"""Test adding a board member."""
initial_count = len(basic_board_swarm.board_members)
basic_board_swarm.add_board_member(mock_board_member)
assert len(basic_board_swarm.board_members) == initial_count + 1
assert (
len(basic_board_swarm.board_members) == initial_count + 1
)
assert mock_board_member in basic_board_swarm.board_members
def test_remove_board_member(self, configured_board_swarm):
@ -428,19 +462,29 @@ class TestBoardOfDirectorsSwarmMethods:
initial_count = len(configured_board_swarm.board_members)
configured_board_swarm.remove_board_member(member_name)
assert len(configured_board_swarm.board_members) == initial_count - 1
assert member_to_remove not in configured_board_swarm.board_members
assert (
len(configured_board_swarm.board_members)
== initial_count - 1
)
assert (
member_to_remove
not in configured_board_swarm.board_members
)
def test_get_board_member(self, configured_board_swarm):
"""Test getting a board member by name."""
member = configured_board_swarm.board_members[0]
member_name = member.agent.agent_name
found_member = configured_board_swarm.get_board_member(member_name)
found_member = configured_board_swarm.get_board_member(
member_name
)
assert found_member == member
# Test with non-existent member
not_found = configured_board_swarm.get_board_member("NonExistent")
not_found = configured_board_swarm.get_board_member(
"NonExistent"
)
assert not_found is None
def test_get_board_summary(self, configured_board_swarm):
@ -462,10 +506,14 @@ class TestBoardOfDirectorsSwarmMethods:
class TestBoardMeetingOperations:
"""Test board meeting operations."""
def test_create_board_meeting_prompt(self, configured_board_swarm):
def test_create_board_meeting_prompt(
self, configured_board_swarm
):
"""Test board meeting prompt creation."""
task = "Test task for board meeting"
prompt = configured_board_swarm._create_board_meeting_prompt(task)
prompt = configured_board_swarm._create_board_meeting_prompt(
task
)
assert task in prompt
assert "BOARD OF DIRECTORS MEETING" in prompt
@ -477,23 +525,33 @@ class TestBoardMeetingOperations:
"""Test board discussion conduction."""
prompt = "Test board meeting prompt"
with patch.object(configured_board_swarm.board_members[0].agent, 'run') as mock_run:
with patch.object(
configured_board_swarm.board_members[0].agent, "run"
) as mock_run:
mock_run.return_value = "Board discussion result"
result = configured_board_swarm._conduct_board_discussion(prompt)
result = configured_board_swarm._conduct_board_discussion(
prompt
)
assert result == "Board discussion result"
mock_run.assert_called_once_with(task=prompt, img=None)
def test_conduct_board_discussion_no_chairman(self, sample_agents):
def test_conduct_board_discussion_no_chairman(
self, sample_agents
):
"""Test board discussion when no chairman is found."""
swarm = BoardOfDirectorsSwarm(agents=sample_agents)
# Remove all board members
swarm.board_members = []
with pytest.raises(ValueError, match="No chairman found in board members"):
with pytest.raises(
ValueError, match="No chairman found in board members"
):
swarm._conduct_board_discussion("Test prompt")
def test_parse_board_decisions_valid_json(self, configured_board_swarm):
def test_parse_board_decisions_valid_json(
self, configured_board_swarm
):
"""Test parsing valid JSON board decisions."""
valid_json = """
{
@ -520,7 +578,9 @@ class TestBoardMeetingOperations:
}
"""
result = configured_board_swarm._parse_board_decisions(valid_json)
result = configured_board_swarm._parse_board_decisions(
valid_json
)
assert isinstance(result, BoardSpec)
assert result.plan == "Test plan"
@ -528,33 +588,46 @@ class TestBoardMeetingOperations:
assert len(result.decisions) == 1
assert result.meeting_summary == "Test summary"
def test_parse_board_decisions_invalid_json(self, configured_board_swarm):
def test_parse_board_decisions_invalid_json(
self, configured_board_swarm
):
"""Test parsing invalid JSON board decisions."""
invalid_json = "Invalid JSON content"
result = configured_board_swarm._parse_board_decisions(invalid_json)
result = configured_board_swarm._parse_board_decisions(
invalid_json
)
assert isinstance(result, BoardSpec)
assert result.plan == invalid_json
assert len(result.orders) == 0
assert len(result.decisions) == 0
assert result.meeting_summary == "Parsing failed, using raw output"
assert (
result.meeting_summary
== "Parsing failed, using raw output"
)
def test_run_board_meeting(self, configured_board_swarm):
"""Test running a complete board meeting."""
task = "Test board meeting task"
with patch.object(configured_board_swarm, '_conduct_board_discussion') as mock_discuss:
with patch.object(configured_board_swarm, '_parse_board_decisions') as mock_parse:
with patch.object(
configured_board_swarm, "_conduct_board_discussion"
) as mock_discuss:
with patch.object(
configured_board_swarm, "_parse_board_decisions"
) as mock_parse:
mock_discuss.return_value = "Board discussion"
mock_parse.return_value = BoardSpec(
plan="Test plan",
orders=[],
decisions=[],
meeting_summary="Test summary"
meeting_summary="Test summary",
)
result = configured_board_swarm.run_board_meeting(task)
result = configured_board_swarm.run_board_meeting(
task
)
assert isinstance(result, BoardSpec)
mock_discuss.assert_called_once()
@ -569,17 +642,27 @@ class TestTaskExecution:
agent_name = "Agent1"
task = "Test task"
with patch.object(configured_board_swarm.agents[0], 'run') as mock_run:
with patch.object(
configured_board_swarm.agents[0], "run"
) as mock_run:
mock_run.return_value = "Agent response"
result = configured_board_swarm._call_single_agent(agent_name, task)
result = configured_board_swarm._call_single_agent(
agent_name, task
)
assert result == "Agent response"
mock_run.assert_called_once()
def test_call_single_agent_not_found(self, configured_board_swarm):
def test_call_single_agent_not_found(
self, configured_board_swarm
):
"""Test calling a non-existent agent."""
with pytest.raises(ValueError, match="Agent 'NonExistent' not found"):
configured_board_swarm._call_single_agent("NonExistent", "Test task")
with pytest.raises(
ValueError, match="Agent 'NonExistent' not found"
):
configured_board_swarm._call_single_agent(
"NonExistent", "Test task"
)
def test_execute_single_order(self, configured_board_swarm):
"""Test executing a single order."""
@ -587,27 +670,36 @@ class TestTaskExecution:
agent_name="Agent1",
task="Test order task",
priority=1,
assigned_by="Chairman"
assigned_by="Chairman",
)
with patch.object(configured_board_swarm, '_call_single_agent') as mock_call:
with patch.object(
configured_board_swarm, "_call_single_agent"
) as mock_call:
mock_call.return_value = "Order execution result"
result = configured_board_swarm._execute_single_order(order)
result = configured_board_swarm._execute_single_order(
order
)
assert result == "Order execution result"
mock_call.assert_called_once_with(
agent_name="Agent1",
task="Test order task"
agent_name="Agent1", task="Test order task"
)
def test_execute_orders(self, configured_board_swarm):
"""Test executing multiple orders."""
orders = [
BoardOrder(agent_name="Agent1", task="Task 1", priority=1),
BoardOrder(agent_name="Agent2", task="Task 2", priority=2),
BoardOrder(
agent_name="Agent1", task="Task 1", priority=1
),
BoardOrder(
agent_name="Agent2", task="Task 2", priority=2
),
]
with patch.object(configured_board_swarm, '_execute_single_order') as mock_execute:
with patch.object(
configured_board_swarm, "_execute_single_order"
) as mock_execute:
mock_execute.side_effect = ["Result 1", "Result 2"]
results = configured_board_swarm._execute_orders(orders)
@ -621,12 +713,16 @@ class TestTaskExecution:
"""Test generating board feedback."""
outputs = [
{"agent_name": "Agent1", "output": "Output 1"},
{"agent_name": "Agent2", "output": "Output 2"}
{"agent_name": "Agent2", "output": "Output 2"},
]
with patch.object(configured_board_swarm.board_members[0].agent, 'run') as mock_run:
with patch.object(
configured_board_swarm.board_members[0].agent, "run"
) as mock_run:
mock_run.return_value = "Board feedback"
result = configured_board_swarm._generate_board_feedback(outputs)
result = configured_board_swarm._generate_board_feedback(
outputs
)
assert result == "Board feedback"
mock_run.assert_called_once()
@ -636,7 +732,9 @@ class TestTaskExecution:
swarm = BoardOfDirectorsSwarm(agents=sample_agents)
swarm.board_members = [] # Remove all board members
with pytest.raises(ValueError, match="No chairman found for feedback"):
with pytest.raises(
ValueError, match="No chairman found for feedback"
):
swarm._generate_board_feedback([])
@ -647,22 +745,36 @@ class TestStepAndRunMethods:
"""Test the step method."""
task = "Test step task"
with patch.object(configured_board_swarm, 'run_board_meeting') as mock_meeting:
with patch.object(configured_board_swarm, '_execute_orders') as mock_execute:
with patch.object(configured_board_swarm, '_generate_board_feedback') as mock_feedback:
with patch.object(
configured_board_swarm, "run_board_meeting"
) as mock_meeting:
with patch.object(
configured_board_swarm, "_execute_orders"
) as mock_execute:
with patch.object(
configured_board_swarm, "_generate_board_feedback"
) as mock_feedback:
mock_meeting.return_value = BoardSpec(
plan="Test plan",
orders=[BoardOrder(agent_name="Agent1", task="Task 1")],
orders=[
BoardOrder(
agent_name="Agent1", task="Task 1"
)
],
decisions=[],
meeting_summary="Test summary"
meeting_summary="Test summary",
)
mock_execute.return_value = [{"agent_name": "Agent1", "output": "Result"}]
mock_execute.return_value = [
{"agent_name": "Agent1", "output": "Result"}
]
mock_feedback.return_value = "Board feedback"
result = configured_board_swarm.step(task)
assert result == "Board feedback"
mock_meeting.assert_called_once_with(task=task, img=None)
mock_meeting.assert_called_once_with(
task=task, img=None
)
mock_execute.assert_called_once()
mock_feedback.assert_called_once()
@ -671,30 +783,44 @@ class TestStepAndRunMethods:
configured_board_swarm.board_feedback_on = False
task = "Test step task"
with patch.object(configured_board_swarm, 'run_board_meeting') as mock_meeting:
with patch.object(configured_board_swarm, '_execute_orders') as mock_execute:
with patch.object(
configured_board_swarm, "run_board_meeting"
) as mock_meeting:
with patch.object(
configured_board_swarm, "_execute_orders"
) as mock_execute:
mock_meeting.return_value = BoardSpec(
plan="Test plan",
orders=[BoardOrder(agent_name="Agent1", task="Task 1")],
orders=[
BoardOrder(agent_name="Agent1", task="Task 1")
],
decisions=[],
meeting_summary="Test summary"
meeting_summary="Test summary",
)
mock_execute.return_value = [{"agent_name": "Agent1", "output": "Result"}]
mock_execute.return_value = [
{"agent_name": "Agent1", "output": "Result"}
]
result = configured_board_swarm.step(task)
assert result == [{"agent_name": "Agent1", "output": "Result"}]
assert result == [
{"agent_name": "Agent1", "output": "Result"}
]
def test_run_method(self, configured_board_swarm):
"""Test the run method."""
task = "Test run task"
with patch.object(configured_board_swarm, 'step') as mock_step:
with patch.object(configured_board_swarm, 'conversation') as mock_conversation:
with patch.object(
configured_board_swarm, "step"
) as mock_step:
with patch.object(
configured_board_swarm, "conversation"
) as mock_conversation:
mock_step.return_value = "Step result"
mock_conversation.add = Mock()
result = configured_board_swarm.run(task)
configured_board_swarm.run(task)
assert mock_step.call_count == 2 # max_loops = 2
assert mock_conversation.add.call_count == 2
@ -703,7 +829,7 @@ class TestStepAndRunMethods:
"""Test the async run method."""
task = "Test async run task"
with patch.object(configured_board_swarm, 'run') as mock_run:
with patch.object(configured_board_swarm, "run") as mock_run:
mock_run.return_value = "Async result"
async def test_async():
@ -722,9 +848,7 @@ class TestBoardOfDirectorsSwarmIntegration:
def test_full_workflow_integration(self, sample_agents):
"""Test full workflow integration."""
swarm = BoardOfDirectorsSwarm(
agents=sample_agents,
verbose=False,
max_loops=1
agents=sample_agents, verbose=False, max_loops=1
)
task = "Create a simple report"
@ -761,7 +885,9 @@ class TestBoardOfDirectorsSwarmIntegration:
}
"""
with patch.object(swarm.board_members[0].agent, 'run') as mock_run:
with patch.object(
swarm.board_members[0].agent, "run"
) as mock_run:
mock_run.return_value = mock_board_output
result = swarm.run(task)
@ -777,7 +903,7 @@ class TestBoardOfDirectorsSwarmIntegration:
agent=sample_agents[0],
role=BoardMemberRole.MEMBER,
voting_weight=1.0,
expertise_areas=["testing"]
expertise_areas=["testing"],
)
initial_count = len(swarm.board_members)
@ -790,7 +916,9 @@ class TestBoardOfDirectorsSwarmIntegration:
assert len(swarm.board_members) == initial_count
# Test getting board member
member = swarm.get_board_member(swarm.board_members[0].agent.agent_name)
member = swarm.get_board_member(
swarm.board_members[0].agent.agent_name
)
assert member is not None
@ -798,21 +926,33 @@ class TestBoardOfDirectorsSwarmIntegration:
@pytest.mark.parametrize("max_loops", [1, 2, 3])
def test_max_loops_parameterization(sample_agents, max_loops):
"""Test swarm with different max_loops values."""
swarm = BoardOfDirectorsSwarm(agents=sample_agents, max_loops=max_loops)
swarm = BoardOfDirectorsSwarm(
agents=sample_agents, max_loops=max_loops
)
assert swarm.max_loops == max_loops
@pytest.mark.parametrize("decision_threshold", [0.5, 0.6, 0.7, 0.8, 0.9])
def test_decision_threshold_parameterization(sample_agents, decision_threshold):
@pytest.mark.parametrize(
"decision_threshold", [0.5, 0.6, 0.7, 0.8, 0.9]
)
def test_decision_threshold_parameterization(
sample_agents, decision_threshold
):
"""Test swarm with different decision threshold values."""
swarm = BoardOfDirectorsSwarm(agents=sample_agents, decision_threshold=decision_threshold)
swarm = BoardOfDirectorsSwarm(
agents=sample_agents, decision_threshold=decision_threshold
)
assert swarm.decision_threshold == decision_threshold
@pytest.mark.parametrize("board_model", ["gpt-4o-mini", "gpt-4", "claude-3-sonnet"])
@pytest.mark.parametrize(
"board_model", ["gpt-4o-mini", "gpt-4", "claude-3-sonnet"]
)
def test_board_model_parameterization(sample_agents, board_model):
"""Test swarm with different board models."""
swarm = BoardOfDirectorsSwarm(agents=sample_agents, board_model_name=board_model)
swarm = BoardOfDirectorsSwarm(
agents=sample_agents, board_model_name=board_model
)
assert swarm.board_model_name == board_model
@ -825,28 +965,50 @@ class TestBoardOfDirectorsSwarmErrorHandling:
with pytest.raises(ValueError):
BoardOfDirectorsSwarm(agents=[])
def test_board_meeting_error_handling(self, configured_board_swarm):
def test_board_meeting_error_handling(
self, configured_board_swarm
):
"""Test error handling during board meeting."""
with patch.object(configured_board_swarm, '_conduct_board_discussion') as mock_discuss:
mock_discuss.side_effect = Exception("Board meeting failed")
with patch.object(
configured_board_swarm, "_conduct_board_discussion"
) as mock_discuss:
mock_discuss.side_effect = Exception(
"Board meeting failed"
)
with pytest.raises(Exception, match="Board meeting failed"):
with pytest.raises(
Exception, match="Board meeting failed"
):
configured_board_swarm.run_board_meeting("Test task")
def test_task_execution_error_handling(self, configured_board_swarm):
def test_task_execution_error_handling(
self, configured_board_swarm
):
"""Test error handling during task execution."""
with patch.object(configured_board_swarm, '_call_single_agent') as mock_call:
with patch.object(
configured_board_swarm, "_call_single_agent"
) as mock_call:
mock_call.side_effect = Exception("Task execution failed")
with pytest.raises(Exception, match="Task execution failed"):
configured_board_swarm._call_single_agent("Agent1", "Test task")
with pytest.raises(
Exception, match="Task execution failed"
):
configured_board_swarm._call_single_agent(
"Agent1", "Test task"
)
def test_order_execution_error_handling(self, configured_board_swarm):
def test_order_execution_error_handling(
self, configured_board_swarm
):
"""Test error handling during order execution."""
orders = [BoardOrder(agent_name="Agent1", task="Task 1")]
with patch.object(configured_board_swarm, '_execute_single_order') as mock_execute:
mock_execute.side_effect = Exception("Order execution failed")
with patch.object(
configured_board_swarm, "_execute_single_order"
) as mock_execute:
mock_execute.side_effect = Exception(
"Order execution failed"
)
# Should not raise exception, but log error
results = configured_board_swarm._execute_orders(orders)
@ -863,9 +1025,7 @@ class TestBoardOfDirectorsSwarmPerformance:
import time
swarm = BoardOfDirectorsSwarm(
agents=sample_agents,
max_workers=3,
verbose=False
agents=sample_agents, max_workers=3, verbose=False
)
# Create multiple orders
@ -876,15 +1036,21 @@ class TestBoardOfDirectorsSwarmPerformance:
start_time = time.time()
with patch.object(swarm, '_execute_single_order') as mock_execute:
mock_execute.side_effect = lambda order: f"Result for {order.task}"
with patch.object(
swarm, "_execute_single_order"
) as mock_execute:
mock_execute.side_effect = (
lambda order: f"Result for {order.task}"
)
results = swarm._execute_orders(orders)
end_time = time.time()
execution_time = end_time - start_time
assert len(results) == 3
assert execution_time < 1.0 # Should complete quickly with parallel execution
assert (
execution_time < 1.0
) # Should complete quickly with parallel execution
def test_memory_usage(self, sample_agents):
"""Test memory usage characteristics."""
@ -898,9 +1064,7 @@ class TestBoardOfDirectorsSwarmPerformance:
swarms = []
for i in range(5):
swarm = BoardOfDirectorsSwarm(
agents=sample_agents,
name=f"Swarm{i}",
verbose=False
agents=sample_agents, name=f"Swarm{i}", verbose=False
)
swarms.append(swarm)
@ -917,49 +1081,69 @@ class TestBoardOfDirectorsSwarmConfiguration:
def test_verbose_configuration(self, sample_agents):
"""Test verbose configuration."""
swarm = BoardOfDirectorsSwarm(agents=sample_agents, verbose=True)
swarm = BoardOfDirectorsSwarm(
agents=sample_agents, verbose=True
)
assert swarm.verbose is True
swarm = BoardOfDirectorsSwarm(agents=sample_agents, verbose=False)
swarm = BoardOfDirectorsSwarm(
agents=sample_agents, verbose=False
)
assert swarm.verbose is False
def test_collaboration_prompt_configuration(self, sample_agents):
"""Test collaboration prompt configuration."""
swarm = BoardOfDirectorsSwarm(agents=sample_agents, add_collaboration_prompt=True)
swarm = BoardOfDirectorsSwarm(
agents=sample_agents, add_collaboration_prompt=True
)
assert swarm.add_collaboration_prompt is True
swarm = BoardOfDirectorsSwarm(agents=sample_agents, add_collaboration_prompt=False)
swarm = BoardOfDirectorsSwarm(
agents=sample_agents, add_collaboration_prompt=False
)
assert swarm.add_collaboration_prompt is False
def test_board_feedback_configuration(self, sample_agents):
"""Test board feedback configuration."""
swarm = BoardOfDirectorsSwarm(agents=sample_agents, board_feedback_on=True)
swarm = BoardOfDirectorsSwarm(
agents=sample_agents, board_feedback_on=True
)
assert swarm.board_feedback_on is True
swarm = BoardOfDirectorsSwarm(agents=sample_agents, board_feedback_on=False)
swarm = BoardOfDirectorsSwarm(
agents=sample_agents, board_feedback_on=False
)
assert swarm.board_feedback_on is False
def test_voting_configuration(self, sample_agents):
"""Test voting configuration."""
swarm = BoardOfDirectorsSwarm(agents=sample_agents, enable_voting=True)
swarm = BoardOfDirectorsSwarm(
agents=sample_agents, enable_voting=True
)
assert swarm.enable_voting is True
swarm = BoardOfDirectorsSwarm(agents=sample_agents, enable_voting=False)
swarm = BoardOfDirectorsSwarm(
agents=sample_agents, enable_voting=False
)
assert swarm.enable_voting is False
def test_consensus_configuration(self, sample_agents):
"""Test consensus configuration."""
swarm = BoardOfDirectorsSwarm(agents=sample_agents, enable_consensus=True)
swarm = BoardOfDirectorsSwarm(
agents=sample_agents, enable_consensus=True
)
assert swarm.enable_consensus is True
swarm = BoardOfDirectorsSwarm(agents=sample_agents, enable_consensus=False)
swarm = BoardOfDirectorsSwarm(
agents=sample_agents, enable_consensus=False
)
assert swarm.enable_consensus is False
# Real integration tests (skipped if no API key)
@pytest.mark.skipif(
not os.getenv("OPENAI_API_KEY"),
reason="OpenAI API key not available"
reason="OpenAI API key not available",
)
class TestBoardOfDirectorsSwarmRealIntegration:
"""Real integration tests for BoardOfDirectorsSwarm."""
@ -972,20 +1156,18 @@ class TestBoardOfDirectorsSwarmRealIntegration:
agent_name="Researcher",
agent_description="Research analyst",
model_name="gpt-4o-mini",
max_loops=1
max_loops=1,
),
Agent(
agent_name="Writer",
agent_description="Content writer",
model_name="gpt-4o-mini",
max_loops=1
)
max_loops=1,
),
]
swarm = BoardOfDirectorsSwarm(
agents=agents,
verbose=False,
max_loops=1
agents=agents, verbose=False, max_loops=1
)
task = "Create a brief market analysis report"
@ -1003,7 +1185,7 @@ class TestBoardOfDirectorsSwarmRealIntegration:
agent_name="TestAgent",
agent_description="Test agent",
model_name="gpt-4o-mini",
max_loops=1
max_loops=1,
)
]

Loading…
Cancel
Save