[WORKSHOP][New Examples and demos]

pull/987/head^2
Kye Gomez 1 month ago
parent 6944a3281b
commit eb413d350a

@ -1,54 +0,0 @@
from swarms import Agent, CronJob
from loguru import logger
# Example usage
if __name__ == "__main__":
# Initialize the agent
agent = Agent(
agent_name="Quantitative-Trading-Agent",
agent_description="Advanced quantitative trading and algorithmic analysis agent",
system_prompt="""You are an expert quantitative trading agent with deep expertise in:
- Algorithmic trading strategies and implementation
- Statistical arbitrage and market making
- Risk management and portfolio optimization
- High-frequency trading systems
- Market microstructure analysis
- Quantitative research methodologies
- Financial mathematics and stochastic processes
- Machine learning applications in trading
Your core responsibilities include:
1. Developing and backtesting trading strategies
2. Analyzing market data and identifying alpha opportunities
3. Implementing risk management frameworks
4. Optimizing portfolio allocations
5. Conducting quantitative research
6. Monitoring market microstructure
7. Evaluating trading system performance
You maintain strict adherence to:
- Mathematical rigor in all analyses
- Statistical significance in strategy development
- Risk-adjusted return optimization
- Market impact minimization
- Regulatory compliance
- Transaction cost analysis
- Performance attribution
You communicate in precise, technical terms while maintaining clarity for stakeholders.""",
max_loops=1,
model_name="gpt-4.1",
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
streaming_on=True,
print_on=True,
telemetry_enable=False,
)
# Example 1: Basic usage with just a task
logger.info("Starting example cron job")
cron_job = CronJob(agent=agent, interval="10seconds")
cron_job.run(
task="What are the best top 3 etfs for gold coverage?"
)

@ -6,96 +6,33 @@ The `Conversation` class is a powerful and flexible tool for managing conversati
### Key Features
- **Multiple Storage Backends**: Support for various storage solutions:
- In-memory: Fast, temporary storage for testing and development
- Supabase: PostgreSQL-based cloud storage with real-time capabilities
- Redis: High-performance caching and persistence
- SQLite: Local file-based storage
- DuckDB: Analytical workloads and columnar storage
- Pulsar: Event streaming for distributed systems
- Mem0: Memory-based storage with mem0 integration
- **Token Management**:
- Built-in token counting with configurable models
- Automatic token tracking for input/output messages
- Token usage analytics and reporting
- Context length management
- **Metadata and Categories**:
- Support for message metadata
- Message categorization (input/output)
- Role-based message tracking
- Custom message IDs
- **Data Export/Import**:
- JSON and YAML export formats
- Automatic saving and loading
- Conversation history management
- Batch operations support
- **Advanced Features**:
- Message search and filtering
- Conversation analytics
- Multi-agent support
- Error handling and fallbacks
- Type hints and validation
| Feature Category | Features / Description |
|----------------------------|-------------------------------------------------------------------------------------------------------------|
| **Multiple Storage Backends** | - In-memory: Fast, temporary storage for testing and development<br>- Supabase: PostgreSQL-based cloud storage with real-time capabilities<br>- Redis: High-performance caching and persistence<br>- SQLite: Local file-based storage<br>- DuckDB: Analytical workloads and columnar storage<br>- Pulsar: Event streaming for distributed systems<br>- Mem0: Memory-based storage with mem0 integration |
| **Token Management** | - Built-in token counting with configurable models<br>- Automatic token tracking for input/output messages<br>- Token usage analytics and reporting<br>- Context length management |
| **Metadata and Categories** | - Support for message metadata<br>- Message categorization (input/output)<br>- Role-based message tracking<br>- Custom message IDs |
| **Data Export/Import** | - JSON and YAML export formats<br>- Automatic saving and loading<br>- Conversation history management<br>- Batch operations support |
| **Advanced Features** | - Message search and filtering<br>- Conversation analytics<br>- Multi-agent support<br>- Error handling and fallbacks<br>- Type hints and validation |
### Use Cases
1. **Chatbot Development**:
- Store and manage conversation history
- Track token usage and context length
- Analyze conversation patterns
2. **Multi-Agent Systems**:
- Coordinate multiple AI agents
- Track agent interactions
- Store agent outputs and metadata
3. **Analytics Applications**:
- Track conversation metrics
- Generate usage reports
- Analyze user interactions
4. **Production Systems**:
- Persistent storage with various backends
- Error handling and recovery
- Scalable conversation management
5. **Development and Testing**:
- Fast in-memory storage
- Debugging support
- Easy export/import of test data
| Use Case | Features / Description |
|----------------------------|--------------------------------------------------------------------------------------------------------|
| **Chatbot Development** | - Store and manage conversation history<br>- Track token usage and context length<br>- Analyze conversation patterns |
| **Multi-Agent Systems** | - Coordinate multiple AI agents<br>- Track agent interactions<br>- Store agent outputs and metadata |
| **Analytics Applications** | - Track conversation metrics<br>- Generate usage reports<br>- Analyze user interactions |
| **Production Systems** | - Persistent storage with various backends<br>- Error handling and recovery<br>- Scalable conversation management |
| **Development and Testing**| - Fast in-memory storage<br>- Debugging support<br>- Easy export/import of test data |
### Best Practices
1. **Storage Selection**:
- Use in-memory for testing and development
- Choose Supabase for multi-user cloud applications
- Use Redis for high-performance requirements
- Select SQLite for single-user local applications
- Pick DuckDB for analytical workloads
- Opt for Pulsar in distributed systems
2. **Token Management**:
- Enable token counting for production use
- Set appropriate context lengths
- Monitor token usage with export_and_count_categories()
3. **Error Handling**:
- Implement proper fallback mechanisms
- Use type hints for better code reliability
- Monitor and log errors appropriately
4. **Data Management**:
- Use appropriate export formats (JSON/YAML)
- Implement regular backup strategies
- Clean up old conversations when needed
5. **Security**:
- Use environment variables for sensitive credentials
- Implement proper access controls
- Validate input data
| Category | Best Practices |
|---------------------|------------------------------------------------------------------------------------------------------------------------|
| **Storage Selection** | - Use in-memory for testing and development<br>- Choose Supabase for multi-user cloud applications<br>- Use Redis for high-performance requirements<br>- Select SQLite for single-user local applications<br>- Pick DuckDB for analytical workloads<br>- Opt for Pulsar in distributed systems |
| **Token Management** | - Enable token counting for production use<br>- Set appropriate context lengths<br>- Monitor token usage with `export_and_count_categories()` |
| **Error Handling** | - Implement proper fallback mechanisms<br>- Use type hints for better code reliability<br>- Monitor and log errors appropriately |
| **Data Management** | - Use appropriate export formats (JSON/YAML)<br>- Implement regular backup strategies<br>- Clean up old conversations when needed |
| **Security** | - Use environment variables for sensitive credentials<br>- Implement proper access controls<br>- Validate input data |
## Table of Contents
@ -113,13 +50,15 @@ The `Conversation` class is designed to manage conversations by keeping track of
**New in this version**: The class now supports multiple storage backends for persistent conversation storage:
- **"in-memory"**: Default memory-based storage (no persistence)
- **"mem0"**: Memory-based storage with mem0 integration (requires: `pip install mem0ai`)
- **"supabase"**: PostgreSQL-based storage using Supabase (requires: `pip install supabase`)
- **"redis"**: Redis-based storage (requires: `pip install redis`)
- **"sqlite"**: SQLite-based storage (built-in to Python)
- **"duckdb"**: DuckDB-based storage (requires: `pip install duckdb`)
- **"pulsar"**: Apache Pulsar messaging backend (requires: `pip install pulsar-client`)
| Backend | Description | Requirements |
|--------------|-------------------------------------------------------------------------------------------------------------|------------------------------------|
| **in-memory**| Default memory-based storage (no persistence) | None (built-in) |
| **mem0** | Memory-based storage with mem0 integration | `pip install mem0ai` |
| **supabase** | PostgreSQL-based storage using Supabase | `pip install supabase` |
| **redis** | Redis-based storage | `pip install redis` |
| **sqlite** | SQLite-based storage (local file) | None (built-in) |
| **duckdb** | DuckDB-based storage (analytical workloads, columnar storage) | `pip install duckdb` |
| **pulsar** | Apache Pulsar messaging backend | `pip install pulsar-client` |
All backends use **lazy loading** - database dependencies are only imported when the specific backend is instantiated. Each backend provides helpful error messages if required packages are not installed.
@ -132,7 +71,6 @@ All backends use **lazy loading** - database dependencies are only imported when
| system_prompt | Optional[str] | System prompt for the conversation |
| time_enabled | bool | Flag to enable time tracking for messages |
| autosave | bool | Flag to enable automatic saving |
| save_enabled | bool | Flag to control if saving is enabled |
| save_filepath | str | File path for saving conversation history |
| load_filepath | str | File path for loading conversation history |
| conversation_history | list | List storing conversation messages |

@ -0,0 +1,247 @@
from loguru import logger
import yfinance as yf
import json
def get_figma_stock_data(stock: str) -> str:
"""
Fetches comprehensive stock data for Figma (FIG) using Yahoo Finance.
Returns:
Dict[str, Any]: A dictionary containing comprehensive Figma stock data including:
- Current price and market data
- Company information
- Financial metrics
- Historical data summary
- Trading statistics
Raises:
Exception: If there's an error fetching the data from Yahoo Finance
"""
try:
# Initialize Figma stock ticker
figma = yf.Ticker(stock)
# Get current stock info
info = figma.info
# Get recent historical data (last 30 days)
hist = figma.history(period="30d")
# Get real-time fast info
fast_info = figma.fast_info
# Compile comprehensive data
figma_data = {
"company_info": {
"name": info.get("longName", "Figma Inc."),
"symbol": "FIG",
"sector": info.get("sector", "N/A"),
"industry": info.get("industry", "N/A"),
"website": info.get("website", "N/A"),
"description": info.get("longBusinessSummary", "N/A"),
},
"current_market_data": {
"current_price": info.get("currentPrice", "N/A"),
"previous_close": info.get("previousClose", "N/A"),
"open": info.get("open", "N/A"),
"day_low": info.get("dayLow", "N/A"),
"day_high": info.get("dayHigh", "N/A"),
"volume": info.get("volume", "N/A"),
"market_cap": info.get("marketCap", "N/A"),
"price_change": (
info.get("currentPrice", 0)
- info.get("previousClose", 0)
if info.get("currentPrice")
and info.get("previousClose")
else "N/A"
),
"price_change_percent": info.get(
"regularMarketChangePercent", "N/A"
),
},
"financial_metrics": {
"pe_ratio": info.get("trailingPE", "N/A"),
"forward_pe": info.get("forwardPE", "N/A"),
"price_to_book": info.get("priceToBook", "N/A"),
"price_to_sales": info.get(
"priceToSalesTrailing12Months", "N/A"
),
"enterprise_value": info.get(
"enterpriseValue", "N/A"
),
"beta": info.get("beta", "N/A"),
"dividend_yield": info.get("dividendYield", "N/A"),
"payout_ratio": info.get("payoutRatio", "N/A"),
},
"trading_statistics": {
"fifty_day_average": info.get(
"fiftyDayAverage", "N/A"
),
"two_hundred_day_average": info.get(
"twoHundredDayAverage", "N/A"
),
"fifty_two_week_low": info.get(
"fiftyTwoWeekLow", "N/A"
),
"fifty_two_week_high": info.get(
"fiftyTwoWeekHigh", "N/A"
),
"shares_outstanding": info.get(
"sharesOutstanding", "N/A"
),
"float_shares": info.get("floatShares", "N/A"),
"shares_short": info.get("sharesShort", "N/A"),
"short_ratio": info.get("shortRatio", "N/A"),
},
"recent_performance": {
"last_30_days": {
"start_price": (
hist.iloc[0]["Close"]
if not hist.empty
else "N/A"
),
"end_price": (
hist.iloc[-1]["Close"]
if not hist.empty
else "N/A"
),
"total_return": (
(
hist.iloc[-1]["Close"]
- hist.iloc[0]["Close"]
)
/ hist.iloc[0]["Close"]
* 100
if not hist.empty
else "N/A"
),
"highest_price": (
hist["High"].max()
if not hist.empty
else "N/A"
),
"lowest_price": (
hist["Low"].min() if not hist.empty else "N/A"
),
"average_volume": (
hist["Volume"].mean()
if not hist.empty
else "N/A"
),
}
},
"real_time_data": {
"last_price": (
fast_info.last_price
if hasattr(fast_info, "last_price")
else "N/A"
),
"last_volume": (
fast_info.last_volume
if hasattr(fast_info, "last_volume")
else "N/A"
),
"bid": (
fast_info.bid
if hasattr(fast_info, "bid")
else "N/A"
),
"ask": (
fast_info.ask
if hasattr(fast_info, "ask")
else "N/A"
),
"bid_size": (
fast_info.bid_size
if hasattr(fast_info, "bid_size")
else "N/A"
),
"ask_size": (
fast_info.ask_size
if hasattr(fast_info, "ask_size")
else "N/A"
),
},
}
logger.info("Successfully fetched Figma (FIG) stock data")
return json.dumps(figma_data, indent=4)
except Exception as e:
logger.error(f"Error fetching Figma stock data: {e}")
raise Exception(f"Failed to fetch Figma stock data: {e}")
# # Example usage
# # Initialize the quantitative trading agent
# agent = Agent(
# agent_name="Quantitative-Trading-Agent",
# agent_description="Advanced quantitative trading and algorithmic analysis agent specializing in stock analysis and trading strategies",
# system_prompt=f"""You are an expert quantitative trading agent with deep expertise in:
# - Algorithmic trading strategies and implementation
# - Statistical arbitrage and market making
# - Risk management and portfolio optimization
# - High-frequency trading systems
# - Market microstructure analysis
# - Quantitative research methodologies
# - Financial mathematics and stochastic processes
# - Machine learning applications in trading
# - Technical analysis and chart patterns
# - Fundamental analysis and valuation models
# - Options trading and derivatives
# - Market sentiment analysis
# Your core responsibilities include:
# 1. Developing and backtesting trading strategies
# 2. Analyzing market data and identifying alpha opportunities
# 3. Implementing risk management frameworks
# 4. Optimizing portfolio allocations
# 5. Conducting quantitative research
# 6. Monitoring market microstructure
# 7. Evaluating trading system performance
# 8. Performing comprehensive stock analysis
# 9. Generating trading signals and recommendations
# 10. Risk assessment and position sizing
# When analyzing stocks, you should:
# - Evaluate technical indicators and chart patterns
# - Assess fundamental metrics and valuation ratios
# - Analyze market sentiment and momentum
# - Consider macroeconomic factors
# - Provide risk-adjusted return projections
# - Suggest optimal entry/exit points
# - Calculate position sizing recommendations
# - Identify potential catalysts and risks
# You maintain strict adherence to:
# - Mathematical rigor in all analyses
# - Statistical significance in strategy development
# - Risk-adjusted return optimization
# - Market impact minimization
# - Regulatory compliance
# - Transaction cost analysis
# - Performance attribution
# - Data-driven decision making
# You communicate in precise, technical terms while maintaining clarity for stakeholders.
# Data: {get_figma_stock_data('FIG')}
# """,
# max_loops=1,
# model_name="gpt-4o-mini",
# dynamic_temperature_enabled=True,
# output_type="str-all-except-first",
# streaming_on=True,
# print_on=True,
# telemetry_enable=False,
# )
# # Example 1: Basic usage with just a task
# logger.info("Starting quantitative analysis cron job for Figma (FIG)")
# cron_job = CronJob(agent=agent, interval="10seconds")
# cron_job.run(
# task="Analyze the Figma (FIG) stock comprehensively using the available stock data. Provide a detailed quantitative analysis"
# )
print(get_figma_stock_data("FIG"))

@ -0,0 +1,105 @@
"""
Example script demonstrating how to fetch Figma (FIG) stock data using swarms_tools Yahoo Finance API.
This shows the alternative approach using the existing swarms_tools package.
"""
from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import (
FINANCIAL_AGENT_SYS_PROMPT,
)
from swarms_tools import yahoo_finance_api
from loguru import logger
import json
def get_figma_data_with_swarms_tools():
"""
Fetches Figma stock data using the swarms_tools Yahoo Finance API.
Returns:
dict: Figma stock data from swarms_tools
"""
try:
logger.info("Fetching Figma stock data using swarms_tools...")
figma_data = yahoo_finance_api(["FIG"])
return figma_data
except Exception as e:
logger.error(f"Error fetching data with swarms_tools: {e}")
raise
def analyze_figma_with_agent():
"""
Uses a Swarms agent to analyze Figma stock data.
"""
try:
# Initialize the agent with Yahoo Finance tool
agent = Agent(
agent_name="Figma-Analysis-Agent",
agent_description="Specialized agent for analyzing Figma stock data",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
max_loops=1,
model_name="gpt-4o-mini",
tools=[yahoo_finance_api],
dynamic_temperature_enabled=True,
)
# Ask the agent to analyze Figma
analysis = agent.run(
"Analyze the current stock data for Figma (FIG) and provide insights on its performance, valuation metrics, and recent trends."
)
return analysis
except Exception as e:
logger.error(f"Error in agent analysis: {e}")
raise
def main():
"""
Main function to demonstrate different approaches for Figma stock data.
"""
logger.info("Starting Figma stock analysis with swarms_tools")
try:
# Method 1: Direct API call
print("\n" + "=" * 60)
print("METHOD 1: Direct swarms_tools API call")
print("=" * 60)
figma_data = get_figma_data_with_swarms_tools()
print("Raw data from swarms_tools:")
print(json.dumps(figma_data, indent=2, default=str))
# Method 2: Agent-based analysis
print("\n" + "=" * 60)
print("METHOD 2: Agent-based analysis")
print("=" * 60)
analysis = analyze_figma_with_agent()
print("Agent analysis:")
print(analysis)
# Method 3: Comparison with custom function
print("\n" + "=" * 60)
print("METHOD 3: Comparison with custom function")
print("=" * 60)
from cron_job_examples.cron_job_example import (
get_figma_stock_data_simple,
)
custom_data = get_figma_stock_data_simple()
print("Custom function output:")
print(custom_data)
logger.info("All methods completed successfully!")
except Exception as e:
logger.error(f"Error in main function: {e}")
print(f"Error: {e}")
if __name__ == "__main__":
main()

@ -0,0 +1,79 @@
"""
Example script demonstrating how to fetch Figma (FIG) stock data using Yahoo Finance.
"""
from cron_job_examples.cron_job_example import (
get_figma_stock_data,
get_figma_stock_data_simple,
)
from loguru import logger
import json
def main():
"""
Main function to demonstrate Figma stock data fetching.
"""
logger.info("Starting Figma stock data demonstration")
try:
# Example 1: Get comprehensive data as dictionary
logger.info("Fetching comprehensive Figma stock data...")
figma_data = get_figma_stock_data()
# Print the data in a structured format
print("\n" + "=" * 50)
print("COMPREHENSIVE FIGMA STOCK DATA")
print("=" * 50)
print(json.dumps(figma_data, indent=2, default=str))
# Example 2: Get simple formatted data
logger.info("Fetching simple formatted Figma stock data...")
simple_data = get_figma_stock_data_simple()
print("\n" + "=" * 50)
print("SIMPLE FORMATTED FIGMA STOCK DATA")
print("=" * 50)
print(simple_data)
# Example 3: Access specific data points
logger.info("Accessing specific data points...")
current_price = figma_data["current_market_data"][
"current_price"
]
market_cap = figma_data["current_market_data"]["market_cap"]
pe_ratio = figma_data["financial_metrics"]["pe_ratio"]
print("\nKey Metrics:")
print(f"Current Price: ${current_price}")
print(f"Market Cap: ${market_cap:,}")
print(f"P/E Ratio: {pe_ratio}")
# Example 4: Check if stock is performing well
price_change = figma_data["current_market_data"][
"price_change"
]
if isinstance(price_change, (int, float)):
if price_change > 0:
print(
f"\n📈 Figma stock is up ${price_change:.2f} today!"
)
elif price_change < 0:
print(
f"\n📉 Figma stock is down ${abs(price_change):.2f} today."
)
else:
print("\n➡️ Figma stock is unchanged today.")
logger.info(
"Figma stock data demonstration completed successfully!"
)
except Exception as e:
logger.error(f"Error in main function: {e}")
print(f"Error: {e}")
if __name__ == "__main__":
main()

@ -0,0 +1,257 @@
from swarms import Agent, CronJob
from loguru import logger
import requests
import json
from datetime import datetime
def get_solana_price() -> str:
"""
Fetches comprehensive Solana (SOL) price data using CoinGecko API.
Returns:
str: A JSON formatted string containing Solana's current price and market data including:
- Current price in USD
- Market cap
- 24h volume
- 24h price change
- Last updated timestamp
Raises:
Exception: If there's an error fetching the data from CoinGecko API
"""
try:
# CoinGecko API endpoint for simple price data
url = "https://api.coingecko.com/api/v3/simple/price"
params = {
"ids": "solana", # Solana's CoinGecko ID
"vs_currencies": "usd",
"include_market_cap": True,
"include_24hr_vol": True,
"include_24hr_change": True,
"include_last_updated_at": True,
}
# Make API request with timeout
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
# Parse response data
data = response.json()
if "solana" not in data:
raise Exception("Solana data not found in API response")
solana_data = data["solana"]
# Compile comprehensive data
solana_info = {
"timestamp": datetime.now().isoformat(),
"coin_info": {
"name": "Solana",
"symbol": "SOL",
"coin_id": "solana",
},
"price_data": {
"current_price_usd": solana_data.get("usd", "N/A"),
"market_cap_usd": solana_data.get(
"usd_market_cap", "N/A"
),
"volume_24h_usd": solana_data.get(
"usd_24h_vol", "N/A"
),
"price_change_24h_percent": solana_data.get(
"usd_24h_change", "N/A"
),
"last_updated_at": solana_data.get(
"last_updated_at", "N/A"
),
},
"formatted_data": {
"price_formatted": (
f"${solana_data.get('usd', 'N/A'):,.2f}"
if solana_data.get("usd")
else "N/A"
),
"market_cap_formatted": (
f"${solana_data.get('usd_market_cap', 'N/A'):,.0f}"
if solana_data.get("usd_market_cap")
else "N/A"
),
"volume_formatted": (
f"${solana_data.get('usd_24h_vol', 'N/A'):,.0f}"
if solana_data.get("usd_24h_vol")
else "N/A"
),
"change_formatted": (
f"{solana_data.get('usd_24h_change', 'N/A'):+.2f}%"
if solana_data.get("usd_24h_change") is not None
else "N/A"
),
},
}
logger.info(
f"Successfully fetched Solana price: ${solana_data.get('usd', 'N/A')}"
)
return json.dumps(solana_info, indent=4)
except requests.RequestException as e:
error_msg = f"API request failed: {e}"
logger.error(error_msg)
return json.dumps(
{
"error": error_msg,
"timestamp": datetime.now().isoformat(),
"status": "failed",
},
indent=4,
)
except Exception as e:
error_msg = f"Error fetching Solana price data: {e}"
logger.error(error_msg)
return json.dumps(
{
"error": error_msg,
"timestamp": datetime.now().isoformat(),
"status": "failed",
},
indent=4,
)
def analyze_solana_data(data: str) -> str:
"""
Analyzes Solana price data and provides insights.
Args:
data (str): JSON string containing Solana price data
Returns:
str: Analysis and insights about the current Solana market data
"""
try:
# Parse the data
solana_data = json.loads(data)
if "error" in solana_data:
return f"❌ Error in data: {solana_data['error']}"
price_data = solana_data.get("price_data", {})
formatted_data = solana_data.get("formatted_data", {})
# Extract key metrics
current_price = price_data.get("current_price_usd")
price_change = price_data.get("price_change_24h_percent")
volume_24h = price_data.get("volume_24h_usd")
market_cap = price_data.get("market_cap_usd")
# Generate analysis
analysis = f"""
🔍 **Solana (SOL) Market Analysis** - {solana_data.get('timestamp', 'N/A')}
💰 **Current Price**: {formatted_data.get('price_formatted', 'N/A')}
📊 **24h Change**: {formatted_data.get('change_formatted', 'N/A')}
💎 **Market Cap**: {formatted_data.get('market_cap_formatted', 'N/A')}
📈 **24h Volume**: {formatted_data.get('volume_formatted', 'N/A')}
"""
# Add sentiment analysis based on price change
if price_change is not None:
if price_change > 5:
analysis += "🚀 **Sentiment**: Strongly Bullish - Significant positive momentum\n"
elif price_change > 1:
analysis += "📈 **Sentiment**: Bullish - Positive price action\n"
elif price_change > -1:
analysis += (
"➡️ **Sentiment**: Neutral - Sideways movement\n"
)
elif price_change > -5:
analysis += "📉 **Sentiment**: Bearish - Negative price action\n"
else:
analysis += "🔻 **Sentiment**: Strongly Bearish - Significant decline\n"
# Add volume analysis
if volume_24h and market_cap:
try:
volume_market_cap_ratio = (
volume_24h / market_cap
) * 100
if volume_market_cap_ratio > 10:
analysis += "🔥 **Volume**: High trading activity - Strong market interest\n"
elif volume_market_cap_ratio > 5:
analysis += (
"📊 **Volume**: Moderate trading activity\n"
)
else:
analysis += "😴 **Volume**: Low trading activity - Limited market movement\n"
except (TypeError, ZeroDivisionError):
analysis += "📊 **Volume**: Unable to calculate volume/market cap ratio\n"
analysis += f"\n⏰ **Last Updated**: {price_data.get('last_updated_at', 'N/A')}"
return analysis
except json.JSONDecodeError as e:
return f"❌ Error parsing data: {e}"
except Exception as e:
return f"❌ Error analyzing data: {e}"
# Initialize the Solana analysis agent
agent = Agent(
agent_name="Solana-Price-Analyzer",
agent_description="Specialized agent for analyzing Solana (SOL) cryptocurrency price data and market trends",
system_prompt=f"""You are an expert cryptocurrency analyst specializing in Solana (SOL) analysis. Your expertise includes:
- Technical analysis and chart patterns
- Market sentiment analysis
- Volume and liquidity analysis
- Price action interpretation
- Market cap and valuation metrics
- Cryptocurrency market dynamics
- DeFi ecosystem analysis
- Blockchain technology trends
When analyzing Solana data, you should:
- Evaluate price movements and trends
- Assess market sentiment and momentum
- Consider volume and liquidity factors
- Analyze market cap positioning
- Provide actionable insights
- Identify potential catalysts or risks
- Consider broader market context
You communicate clearly and provide practical analysis that helps users understand Solana's current market position and potential future movements.
Current Solana Data: {get_solana_price()}
""",
max_loops=1,
model_name="gpt-4o-mini",
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
streaming_on=False, # need to fix this bug where streaming is working but makes copies of the border when you scroll on the terminal
print_on=True,
telemetry_enable=False,
)
def main():
"""
Main function to run the Solana price tracking cron job.
"""
logger.info("🚀 Starting Solana price tracking cron job")
logger.info("📊 Fetching Solana price every 10 seconds...")
# Create cron job that runs every 10 seconds
cron_job = CronJob(agent=agent, interval="30seconds")
# Run the cron job with analysis task
cron_job.run(
task="Analyze the current Solana (SOL) price data comprehensively. Provide detailed market analysis including price trends, volume analysis, market sentiment, and actionable insights. Format your response clearly with emojis and structured sections."
)
if __name__ == "__main__":
main()

@ -1,5 +1,4 @@
from swarms import Agent
from swarms.structs.graph_workflow import GraphWorkflow
from swarms import Agent, GraphWorkflow
from swarms.prompts.multi_agent_collab_prompt import (
MULTI_AGENT_COLLAB_PROMPT_TWO,
)
@ -11,6 +10,7 @@ agent1 = Agent(
max_loops=1,
system_prompt=MULTI_AGENT_COLLAB_PROMPT_TWO, # Set collaboration prompt
)
agent2 = Agent(
agent_name="ResearchAgent2",
model_name="gpt-4.1",
@ -19,7 +19,11 @@ agent2 = Agent(
)
# Build the workflow with only agents as nodes
workflow = GraphWorkflow()
workflow = GraphWorkflow(
name="Research Workflow",
description="A workflow for researching the best arbitrage trading strategies for altcoins",
auto_compile=True,
)
workflow.add_node(agent1)
workflow.add_node(agent2)
@ -27,27 +31,15 @@ workflow.add_node(agent2)
workflow.add_edge(agent1.agent_name, agent2.agent_name)
# Visualize the workflow using Graphviz
print("\n📊 Creating workflow visualization...")
try:
viz_output = workflow.visualize(
output_path="simple_workflow_graph",
format="png",
view=True, # Auto-open the generated image
show_parallel_patterns=True,
)
print(f"✅ Workflow visualization saved to: {viz_output}")
except Exception as e:
print(f"⚠️ Graphviz not available, using text visualization: {e}")
workflow.visualize()
workflow.visualize()
workflow.compile()
# Export workflow to JSON
workflow_json = workflow.to_json()
print(
f"\n💾 Workflow exported to JSON ({len(workflow_json)} characters)"
)
print(workflow_json)
# Run the workflow and print results
print("\n🚀 Executing workflow...")
results = workflow.run(
task="What are the best arbitrage trading strategies for altcoins? Give me research papers and articles on the topic."
)

@ -1,16 +1,32 @@
from swarms.structs.heavy_swarm import HeavySwarm
from swarms import HeavySwarm
swarm = HeavySwarm(
worker_model_name="claude-3-5-sonnet-20240620",
show_dashboard=True,
question_agent_model_name="gpt-4.1",
loops_per_agent=1,
)
def main():
"""
Run a HeavySwarm query to find the best 3 gold ETFs.
This function initializes a HeavySwarm instance and queries it to provide
the top 3 gold exchange-traded funds (ETFs), requesting clear, structured results.
"""
swarm = HeavySwarm(
name="Gold ETF Research Team",
description="A team of agents that research the best gold ETFs",
worker_model_name="claude-sonnet-4-latest",
show_dashboard=True,
question_agent_model_name="gpt-4.1",
loops_per_agent=1,
)
out = swarm.run(
"Provide 3 publicly traded biotech companies that are currently trading below their cash value. For each company identified, provide available data or projections for the next 6 months, including any relevant financial metrics, upcoming catalysts, or events that could impact valuation. Present your findings in a clear, structured format. Be very specific and provide their ticker symbol, name, and the current price, cash value, and the percentage difference between the two."
)
prompt = (
"Find the best 3 gold ETFs. For each ETF, provide the ticker symbol, "
"full name, current price, expense ratio, assets under management, and "
"a brief explanation of why it is considered among the best. Present the information "
"in a clear, structured format suitable for investors."
)
print(out)
out = swarm.run(prompt)
print(out)
if __name__ == "__main__":
main()

@ -0,0 +1,34 @@
from swarms import HeavySwarm
def main():
"""
Run a HeavySwarm query to find the best and most promising treatments for diabetes.
This function initializes a HeavySwarm instance and queries it to provide
the top current and theoretical treatments for diabetes, requesting clear,
structured, and evidence-based results suitable for medical research or clinical review.
"""
swarm = HeavySwarm(
name="Diabetes Treatment Research Team",
description="A team of agents that research the best and most promising treatments for diabetes, including theoretical approaches.",
worker_model_name="claude-sonnet-4-20250514",
show_dashboard=True,
question_agent_model_name="gpt-4.1",
loops_per_agent=1,
)
prompt = (
"Identify the best and most promising treatments for diabetes, including both current standard therapies and theoretical or experimental approaches. "
"For each treatment, provide: the treatment name, type (e.g., medication, lifestyle intervention, device, gene therapy, etc.), "
"mechanism of action, current stage of research or approval status, key clinical evidence or rationale, "
"potential benefits and risks, and a brief summary of why it is considered promising. "
"Present the information in a clear, structured format suitable for medical professionals or researchers."
)
out = swarm.run(prompt)
print(out)
if __name__ == "__main__":
main()

@ -1,5 +1,4 @@
from swarms import Agent
from swarms.structs.hiearchical_swarm import HierarchicalSwarm
from swarms import Agent, HierarchicalSwarm
# Initialize agents for a $50B portfolio analysis
@ -9,24 +8,27 @@ agents = [
agent_description="Senior financial analyst at BlackRock.",
system_prompt="You are a financial analyst tasked with optimizing asset allocations for a $50B portfolio. Provide clear, quantitative recommendations for each sector.",
max_loops=1,
model_name="groq/deepseek-r1-distill-qwen-32b",
model_name="gpt-4.1",
max_tokens=3000,
streaming_on=True,
),
Agent(
agent_name="Sector-Risk-Analyst",
agent_description="Expert risk management analyst.",
system_prompt="You are a risk analyst responsible for advising on risk allocation within a $50B portfolio. Provide detailed insights on risk exposures for each sector.",
max_loops=1,
model_name="groq/deepseek-r1-distill-qwen-32b",
model_name="gpt-4.1",
max_tokens=3000,
streaming_on=True,
),
Agent(
agent_name="Tech-Sector-Analyst",
agent_description="Technology sector analyst.",
system_prompt="You are a tech sector analyst focused on capital and risk allocations. Provide data-backed insights for the tech sector.",
max_loops=1,
model_name="groq/deepseek-r1-distill-qwen-32b",
model_name="gpt-4.1",
max_tokens=3000,
streaming_on=True,
),
]
@ -35,14 +37,19 @@ majority_voting = HierarchicalSwarm(
name="Sector-Investment-Advisory-System",
description="System for sector analysis and optimal allocations.",
agents=agents,
# director=director_agent,
max_loops=1,
max_loops=2,
output_type="dict",
)
# Run the analysis
result = majority_voting.run(
task="Evaluate market sectors and determine optimal allocation for a $50B portfolio. Include a detailed table of allocations, risk assessments, and a consolidated strategy."
task=(
"Simulate the allocation of a $50B fund specifically for the pharmaceutical sector. "
"Provide specific tickers (e.g., PFE, MRK, JNJ, LLY, BMY, etc.) and a clear rationale for why funds should be allocated to each company. "
"Present a table showing each ticker, company name, allocation percentage, and allocation amount in USD. "
"Include a brief summary of the overall allocation strategy and the reasoning behind the choices."
"Only call the Sector-Financial-Analyst agent to do the analysis. Nobody else should do the analysis."
)
)
print(result)

@ -6,11 +6,11 @@ each with detailed backgrounds, political positions, and comprehensive system pr
that reflect their real-world characteristics, voting patterns, and policy priorities.
"""
import random
from typing import Dict, List, Optional, Union
from swarms import Agent
from swarms.structs.multi_agent_exec import run_agents_concurrently
from typing import Dict, List, Optional, Union
import json
import random
class SenatorSimulation:
@ -3490,159 +3490,159 @@ class SenatorSimulation:
}
# Example usage and demonstration
def main():
"""
Demonstrate the Senate simulation with various scenarios.
"""
print("🏛️ US Senate Simulation Initializing...")
# Create the simulation
senate = SenatorSimulation()
print("\n📊 Senate Composition:")
composition = senate.get_senate_composition()
print(json.dumps(composition, indent=2))
print(f"\n🎭 Available Senators ({len(senate.senators)}):")
for name in senate.senators.keys():
party = senate._get_senator_party(name)
print(f" - {name} ({party})")
# Example 1: Individual senator response
print("\n🗣️ Example: Senator Response")
senator = senate.get_senator("Katie Britt")
response = senator.run(
"What is your position on infrastructure spending and how would you pay for it?"
)
print(f"Senator Katie Britt: {response}")
# Example 2: Simulate a debate
print("\n💬 Example: Senate Debate on Climate Change")
debate = senate.simulate_debate(
"Climate change legislation and carbon pricing",
[
"Katie Britt",
"Mark Kelly",
"Lisa Murkowski",
"Alex Padilla",
],
)
for entry in debate["transcript"]:
print(f"\n{entry['senator']} ({entry['party']}):")
print(f" {entry['position'][:200]}...")
# Example 3: Simulate a vote
print("\n🗳️ Example: Senate Vote on Infrastructure Bill")
vote = senate.simulate_vote(
"A $1.2 trillion infrastructure bill including roads, bridges, broadband, and clean energy projects",
[
"Katie Britt",
"Mark Kelly",
"Lisa Murkowski",
"Alex Padilla",
"Tom Cotton",
],
)
print("Vote Results:")
for senator, vote_choice in vote["votes"].items():
print(f" {senator}: {vote_choice}")
print(f"\nFinal Result: {vote['results']['outcome']}")
print(
f"YEA: {vote['results']['yea']}, NAY: {vote['results']['nay']}, PRESENT: {vote['results']['present']}"
)
# Example 4: Committee hearing
print("\n🏛️ Example: Committee Hearing")
hearing = senate.run_committee_hearing(
"Armed Services",
"Military readiness and defense spending",
["Secretary of Defense", "Joint Chiefs Chairman"],
)
print("Armed Services Committee Hearing on Military Readiness")
for entry in hearing["transcript"][:3]: # Show first 3 entries
print(
f"\n{entry['type'].title()}: {entry['senator'] if 'senator' in entry else entry['witness']}"
)
print(f" {entry['content'][:150]}...")
# Example 5: Run all senators concurrently on a single task
print("\n🚀 Example: All Senators Concurrent Response")
all_senators_results = senate.run(
"What is your position on federal student loan forgiveness and how should we address the student debt crisis?"
)
print(f"\nTask: {all_senators_results['task']}")
print(
f"Selection Method: {all_senators_results['selection_method']}"
)
print(
f"Total Participants: {all_senators_results['total_participants']}"
)
print("\n📊 Party Breakdown:")
for party, senators in all_senators_results[
"party_breakdown"
].items():
if senators:
print(f"\n{party} ({len(senators)} senators):")
for senator_data in senators:
print(f" - {senator_data['senator']}")
# Example 6: Run 50% of senators randomly
print("\n🎲 Example: Random 50% of Senators")
random_results = senate.run(
"What is your position on climate change legislation and carbon pricing?",
participants=0.5, # 50% of all senators
)
print(f"\nTask: {random_results['task']}")
print(f"Selection Method: {random_results['selection_method']}")
print(
f"Total Participants: {random_results['total_participants']}"
)
print("\n📋 Selected Senators:")
for senator in random_results["participants"]:
party = senate._get_senator_party(senator)
print(f" - {senator} ({party})")
print("\n📊 Party Breakdown:")
for party, senators in random_results["party_breakdown"].items():
if senators:
print(f"\n{party} ({len(senators)} senators):")
for senator_data in senators:
print(f" - {senator_data['senator']}")
# Example 7: Run specific senators
print("\n🎯 Example: Specific Senators")
specific_results = senate.run(
"What is your position on military spending and defense policy?",
participants=[
"Katie Britt",
"Mark Kelly",
"Lisa Murkowski",
"Alex Padilla",
"Tom Cotton",
],
)
print(f"\nTask: {specific_results['task']}")
print(f"Selection Method: {specific_results['selection_method']}")
print(
f"Total Participants: {specific_results['total_participants']}"
)
print("\n📋 Responses by Senator:")
for senator, response in specific_results["responses"].items():
party = senate._get_senator_party(senator)
print(f"\n{senator} ({party}):")
print(f" {response[:200]}...")
if __name__ == "__main__":
main()
# # Example usage and demonstration
# def main():
# """
# Demonstrate the Senate simulation with various scenarios.
# """
# print("🏛️ US Senate Simulation Initializing...")
# # Create the simulation
# senate = SenatorSimulation()
# print("\n📊 Senate Composition:")
# composition = senate.get_senate_composition()
# print(json.dumps(composition, indent=2))
# print(f"\n🎭 Available Senators ({len(senate.senators)}):")
# for name in senate.senators.keys():
# party = senate._get_senator_party(name)
# print(f" - {name} ({party})")
# # Example 1: Individual senator response
# print("\n🗣 Example: Senator Response")
# senator = senate.get_senator("Katie Britt")
# response = senator.run(
# "What is your position on infrastructure spending and how would you pay for it?"
# )
# print(f"Senator Katie Britt: {response}")
# # Example 2: Simulate a debate
# print("\n💬 Example: Senate Debate on Climate Change")
# debate = senate.simulate_debate(
# "Climate change legislation and carbon pricing",
# [
# "Katie Britt",
# "Mark Kelly",
# "Lisa Murkowski",
# "Alex Padilla",
# ],
# )
# for entry in debate["transcript"]:
# print(f"\n{entry['senator']} ({entry['party']}):")
# print(f" {entry['position'][:200]}...")
# # Example 3: Simulate a vote
# print("\n🗳 Example: Senate Vote on Infrastructure Bill")
# vote = senate.simulate_vote(
# "A $1.2 trillion infrastructure bill including roads, bridges, broadband, and clean energy projects",
# [
# "Katie Britt",
# "Mark Kelly",
# "Lisa Murkowski",
# "Alex Padilla",
# "Tom Cotton",
# ],
# )
# print("Vote Results:")
# for senator, vote_choice in vote["votes"].items():
# print(f" {senator}: {vote_choice}")
# print(f"\nFinal Result: {vote['results']['outcome']}")
# print(
# f"YEA: {vote['results']['yea']}, NAY: {vote['results']['nay']}, PRESENT: {vote['results']['present']}"
# )
# # Example 4: Committee hearing
# print("\n🏛 Example: Committee Hearing")
# hearing = senate.run_committee_hearing(
# "Armed Services",
# "Military readiness and defense spending",
# ["Secretary of Defense", "Joint Chiefs Chairman"],
# )
# print("Armed Services Committee Hearing on Military Readiness")
# for entry in hearing["transcript"][:3]: # Show first 3 entries
# print(
# f"\n{entry['type'].title()}: {entry['senator'] if 'senator' in entry else entry['witness']}"
# )
# print(f" {entry['content'][:150]}...")
# # Example 5: Run all senators concurrently on a single task
# print("\n🚀 Example: All Senators Concurrent Response")
# all_senators_results = senate.run(
# "What is your position on federal student loan forgiveness and how should we address the student debt crisis?"
# )
# print(f"\nTask: {all_senators_results['task']}")
# print(
# f"Selection Method: {all_senators_results['selection_method']}"
# )
# print(
# f"Total Participants: {all_senators_results['total_participants']}"
# )
# print("\n📊 Party Breakdown:")
# for party, senators in all_senators_results[
# "party_breakdown"
# ].items():
# if senators:
# print(f"\n{party} ({len(senators)} senators):")
# for senator_data in senators:
# print(f" - {senator_data['senator']}")
# # Example 6: Run 50% of senators randomly
# print("\n🎲 Example: Random 50% of Senators")
# random_results = senate.run(
# "What is your position on climate change legislation and carbon pricing?",
# participants=0.5, # 50% of all senators
# )
# print(f"\nTask: {random_results['task']}")
# print(f"Selection Method: {random_results['selection_method']}")
# print(
# f"Total Participants: {random_results['total_participants']}"
# )
# print("\n📋 Selected Senators:")
# for senator in random_results["participants"]:
# party = senate._get_senator_party(senator)
# print(f" - {senator} ({party})")
# print("\n📊 Party Breakdown:")
# for party, senators in random_results["party_breakdown"].items():
# if senators:
# print(f"\n{party} ({len(senators)} senators):")
# for senator_data in senators:
# print(f" - {senator_data['senator']}")
# # Example 7: Run specific senators
# print("\n🎯 Example: Specific Senators")
# specific_results = senate.run(
# "What is your position on military spending and defense policy?",
# participants=[
# "Katie Britt",
# "Mark Kelly",
# "Lisa Murkowski",
# "Alex Padilla",
# "Tom Cotton",
# ],
# )
# print(f"\nTask: {specific_results['task']}")
# print(f"Selection Method: {specific_results['selection_method']}")
# print(
# f"Total Participants: {specific_results['total_participants']}"
# )
# print("\n📋 Responses by Senator:")
# for senator, response in specific_results["responses"].items():
# party = senate._get_senator_party(senator)
# print(f"\n{senator} ({party}):")
# print(f" {response[:200]}...")
# if __name__ == "__main__":
# main()

@ -1,6 +1,10 @@
"""
Flow:
Hierarchical Swarm Implementation
This module provides a hierarchical swarm architecture where a director agent coordinates
multiple worker agents to execute complex tasks through a structured workflow.
Flow:
1. User provides a task
2. Director creates a plan
3. Director distributes orders to agents individually or multiple tasks at once
@ -8,10 +12,21 @@ Flow:
5. Director evaluates results and issues new orders if needed (up to max_loops)
6. All context and conversation history is preserved throughout the process
Todo
- Add layers of management -- a list of list of agents that act as departments
- Auto build agents from input prompt - and then add them to the swarm
- Create an interactive and dynamic UI like we did with heavy swarm
- Make it faster and more high performance
Classes:
HierarchicalOrder: Represents a single task assignment to a specific agent
SwarmSpec: Contains the overall plan and list of orders for the swarm
HierarchicalSwarm: Main swarm orchestrator that manages director and worker agents
"""
import traceback
from typing import Any, Callable, List, Literal, Optional, Union
from typing import Any, Callable, List, Optional, Union
from pydantic import BaseModel, Field
@ -19,7 +34,6 @@ from swarms.prompts.hiearchical_system_prompt import (
HIEARCHICAL_SWARM_SYSTEM_PROMPT,
)
from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.conversation import Conversation
from swarms.structs.ma_utils import list_all_agents
from swarms.tools.base_tool import BaseTool
@ -33,6 +47,20 @@ logger = initialize_logger(log_folder="hierarchical_swarm")
class HierarchicalOrder(BaseModel):
"""
Represents a single task assignment within the hierarchical swarm.
This class defines the structure for individual task orders that the director
distributes to worker agents. Each order specifies which agent should execute
what specific task.
Attributes:
agent_name (str): The name of the agent assigned to execute the task.
Must match an existing agent in the swarm.
task (str): The specific task description to be executed by the assigned agent.
Should be clear and actionable.
"""
agent_name: str = Field(
...,
description="Specifies the name of the agent to which the task is assigned. This is a crucial element in the hierarchical structure of the swarm, as it determines the specific agent responsible for the task execution.",
@ -44,6 +72,20 @@ class HierarchicalOrder(BaseModel):
class SwarmSpec(BaseModel):
"""
Defines the complete specification for a hierarchical swarm execution.
This class contains the overall plan and all individual orders that the director
creates to coordinate the swarm's activities. It serves as the structured output
format for the director agent.
Attributes:
plan (str): A comprehensive plan outlining the sequence of actions and strategy
for the entire swarm to accomplish the given task.
orders (List[HierarchicalOrder]): A list of specific task assignments to
individual agents within the swarm.
"""
plan: str = Field(
...,
description="Outlines the sequence of actions to be taken by the swarm. This plan is a detailed roadmap that guides the swarm's behavior and decision-making.",
@ -54,50 +96,34 @@ class SwarmSpec(BaseModel):
)
SwarmType = Literal[
"AgentRearrange",
"MixtureOfAgents",
"SpreadSheetSwarm",
"SequentialWorkflow",
"ConcurrentWorkflow",
"GroupChat",
"MultiAgentRouter",
"AutoSwarmBuilder",
"HiearchicalSwarm",
"auto",
"MajorityVoting",
"MALT",
"DeepResearchSwarm",
"CouncilAsAJudge",
"InteractiveGroupChat",
]
class SwarmRouterCall(BaseModel):
goal: str = Field(
...,
description="The goal of the swarm router call. This is the goal that the swarm router will use to determine the best swarm to use.",
)
swarm_type: SwarmType = Field(
...,
description="The type of swarm to use. This is the type of swarm that the swarm router will use to determine the best swarm to use.",
)
task: str = Field(
...,
description="The task to be executed by the swarm router. This is the task that the swarm router will use to determine the best swarm to use.",
)
class HierarchicalSwarm(BaseSwarm):
class HierarchicalSwarm:
"""
_Representer a hierarchical swarm of agents, with a director that orchestrates tasks among the agents.
The workflow follows a hierarchical pattern:
1. Task is received and sent to the director
2. Director creates a plan and distributes orders to agents
3. Agents execute tasks and report back to the director
4. Director evaluates results and issues new orders if needed (up to max_loops)
5. All context and conversation history is preserved throughout the process
A hierarchical swarm orchestrator that coordinates multiple agents through a director.
This class implements a hierarchical architecture where a director agent creates
plans and distributes tasks to worker agents. The director can provide feedback
and iterate on results through multiple loops to achieve the desired outcome.
The swarm maintains conversation history throughout the process, allowing for
context-aware decision making and iterative refinement of results.
Attributes:
name (str): The name identifier for this swarm instance.
description (str): A description of the swarm's purpose and capabilities.
director (Optional[Union[Agent, Callable, Any]]): The director agent that
coordinates the swarm.
agents (List[Union[Agent, Callable, Any]]): List of worker agents available
for task execution.
max_loops (int): Maximum number of feedback loops the swarm can perform.
output_type (OutputType): Format for the final output of the swarm.
feedback_director_model_name (str): Model name for the feedback director.
director_name (str): Name identifier for the director agent.
director_model_name (str): Model name for the main director agent.
verbose (bool): Whether to enable detailed logging and progress tracking.
add_collaboration_prompt (bool): Whether to add collaboration prompts to agents.
planning_director_agent (Optional[Union[Agent, Callable, Any]]): Optional
planning agent.
director_feedback_on (bool): Whether director feedback is enabled.
"""
def __init__(
@ -121,22 +147,33 @@ class HierarchicalSwarm(BaseSwarm):
**kwargs,
):
"""
Initializes the HierarchicalSwarm with the given parameters.
:param name: The name of the swarm.
:param description: A description of the swarm.
:param director: The director agent that orchestrates tasks.
:param agents: A list of agents within the swarm.
:param max_loops: The maximum number of feedback loops between the director and agents.
:param output_type: The format in which to return the output (dict, str, or list).
:param verbose: Enable detailed logging with loguru.
Initialize a new HierarchicalSwarm instance.
Args:
name (str): The name identifier for this swarm instance.
description (str): A description of the swarm's purpose.
director (Optional[Union[Agent, Callable, Any]]): The director agent.
If None, a default director will be created.
agents (List[Union[Agent, Callable, Any]]): List of worker agents.
Must not be empty.
max_loops (int): Maximum number of feedback loops (must be > 0).
output_type (OutputType): Format for the final output.
feedback_director_model_name (str): Model name for feedback director.
director_name (str): Name identifier for the director agent.
director_model_name (str): Model name for the main director agent.
verbose (bool): Whether to enable detailed logging.
add_collaboration_prompt (bool): Whether to add collaboration prompts.
planning_director_agent (Optional[Union[Agent, Callable, Any]]):
Optional planning agent for enhanced planning capabilities.
director_feedback_on (bool): Whether director feedback is enabled.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Raises:
ValueError: If no agents are provided or max_loops is invalid.
"""
super().__init__(
name=name,
description=description,
agents=agents,
)
self.name = name
self.description = description
self.director = director
self.agents = agents
self.max_loops = max_loops
@ -155,33 +192,47 @@ class HierarchicalSwarm(BaseSwarm):
def init_swarm(self):
"""
Initializes the swarm.
Initialize the swarm with proper configuration and validation.
This method performs the following initialization steps:
1. Sets up logging if verbose mode is enabled
2. Creates a conversation instance for history tracking
3. Performs reliability checks on the configuration
4. Adds agent context to the director
Raises:
ValueError: If the swarm configuration is invalid.
"""
# Initialize logger only if verbose is enabled
if self.verbose:
logger.info(
f"🚀 Initializing HierarchicalSwarm: {self.name}"
)
logger.info(
f"📊 Configuration - Max loops: {self.max_loops}"
)
self.conversation = Conversation(time_enabled=False)
# Reliability checks
self.reliability_checks()
self.director = self.setup_director()
self.add_context_to_director()
if self.verbose:
logger.success(
f"✅ HierarchicalSwarm initialized successfully: Name {self.name}"
f"✅ HierarchicalSwarm: {self.name} initialized successfully."
)
def add_context_to_director(self):
"""Add agent context to the director's conversation."""
"""
Add agent context and collaboration information to the director's conversation.
This method ensures that the director has complete information about all
available agents, their capabilities, and how they can collaborate. This
context is essential for the director to make informed decisions about
task distribution.
Raises:
Exception: If adding context fails due to agent configuration issues.
"""
try:
if self.verbose:
logger.info("📝 Adding agent context to director")
@ -207,7 +258,18 @@ class HierarchicalSwarm(BaseSwarm):
)
def setup_director(self):
"""Set up the director agent with proper configuration."""
"""
Set up the director agent with proper configuration and tools.
Creates a new director agent with the SwarmSpec schema for structured
output, enabling it to create plans and distribute orders effectively.
Returns:
Agent: A configured director agent ready to coordinate the swarm.
Raises:
Exception: If director setup fails due to configuration issues.
"""
try:
if self.verbose:
logger.info("🎯 Setting up director agent")
@ -217,17 +279,6 @@ class HierarchicalSwarm(BaseSwarm):
if self.verbose:
logger.debug(f"📋 Director schema: {schema}")
# if self.director is not None:
# # if litellm_check_for_tools(self.director.model_name) is True:
# self.director.add_tool_schema([schema])
# if self.verbose:
# logger.success(
# "✅ Director agent setup completed successfully"
# )
# return self.director
# else:
return Agent(
agent_name=self.director_name,
agent_description="A director agent that can create a plan and distribute orders to agents",
@ -244,13 +295,20 @@ class HierarchicalSwarm(BaseSwarm):
def reliability_checks(self):
"""
Checks if there are any agents and a director set for the swarm.
Raises ValueError if either condition is not met.
Perform validation checks to ensure the swarm is properly configured.
This method validates:
1. That at least one agent is provided
2. That max_loops is greater than 0
3. That a director is available (creates default if needed)
Raises:
ValueError: If the swarm configuration is invalid.
"""
try:
if self.verbose:
logger.info(
f"🔍 Running reliability checks for swarm: {self.name}"
f"Hiearchical Swarm: {self.name} Reliability checks in progress..."
)
if not self.agents or len(self.agents) == 0:
@ -263,17 +321,12 @@ class HierarchicalSwarm(BaseSwarm):
"Max loops must be greater than 0. Please set a valid number of loops."
)
if not self.director:
raise ValueError(
"Director not set for the swarm. A director agent is required to coordinate and orchestrate tasks among the agents."
)
if self.director is None:
self.director = self.setup_director()
if self.verbose:
logger.success(
f"✅ Reliability checks passed for swarm: {self.name}"
)
logger.info(
f"📊 Swarm stats - Agents: {len(self.agents)}, Max loops: {self.max_loops}"
f"Hiearchical Swarm: {self.name} Reliability checks passed..."
)
except Exception as e:
@ -286,11 +339,22 @@ class HierarchicalSwarm(BaseSwarm):
img: str = None,
) -> SwarmSpec:
"""
Runs a task through the director agent with the current conversation context.
Execute the director agent with the given task and conversation context.
This method runs the director agent to create a plan and distribute orders
based on the current task and conversation history. If a planning director
agent is configured, it will first create a detailed plan before the main
director processes the task.
Args:
task (str): The task to be executed by the director.
img (str, optional): Optional image input for the task.
Returns:
SwarmSpec: The director's output containing the plan and orders.
:param task: The task to be executed by the director.
:param img: Optional image to be used with the task.
:return: The SwarmSpec containing the director's orders.
Raises:
Exception: If director execution fails.
"""
try:
if self.verbose:
@ -330,7 +394,25 @@ class HierarchicalSwarm(BaseSwarm):
def step(self, task: str, img: str = None, *args, **kwargs):
"""
Runs a single step of the hierarchical swarm.
Execute a single step of the hierarchical swarm workflow.
This method performs one complete iteration of the swarm's workflow:
1. Run the director to create a plan and orders
2. Parse the director's output to extract plan and orders
3. Execute all orders by calling the appropriate agents
4. Optionally generate director feedback on the results
Args:
task (str): The task to be processed in this step.
img (str, optional): Optional image input for the task.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
Any: The results from this step, either agent outputs or director feedback.
Raises:
Exception: If step execution fails.
"""
try:
if self.verbose:
@ -370,13 +452,27 @@ class HierarchicalSwarm(BaseSwarm):
def run(self, task: str, img: str = None, *args, **kwargs):
"""
Executes the hierarchical swarm for a specified number of feedback loops.
Execute the hierarchical swarm for the specified number of feedback loops.
This method orchestrates the complete swarm execution, performing multiple
iterations based on the max_loops configuration. Each iteration builds upon
the previous results, allowing for iterative refinement and improvement.
:param task: The initial task to be processed by the swarm.
:param img: Optional image input for the agents.
:param args: Additional positional arguments.
:param kwargs: Additional keyword arguments.
:return: The formatted conversation history as output.
The method maintains conversation history throughout all loops and provides
context from previous iterations to subsequent ones.
Args:
task (str): The initial task to be processed by the swarm.
img (str, optional): Optional image input for the agents.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
Any: The formatted conversation history as output, formatted according
to the output_type configuration.
Raises:
Exception: If swarm execution fails.
"""
try:
current_loop = 0
@ -448,7 +544,23 @@ class HierarchicalSwarm(BaseSwarm):
logger.error(error_msg)
def feedback_director(self, outputs: list):
"""Provide feedback from the director based on agent outputs."""
"""
Generate feedback from the director based on agent outputs.
This method creates a feedback director agent that analyzes the results
from worker agents and provides specific, actionable feedback for improvement.
The feedback is added to the conversation history and can be used in
subsequent iterations.
Args:
outputs (list): List of outputs from worker agents that need feedback.
Returns:
str: The director's feedback on the agent outputs.
Raises:
Exception: If feedback generation fails.
"""
try:
if self.verbose:
logger.info("📝 Generating director feedback")
@ -491,7 +603,24 @@ class HierarchicalSwarm(BaseSwarm):
self, agent_name: str, task: str, *args, **kwargs
):
"""
Calls a single agent with the given task.
Call a single agent by name to execute a specific task.
This method locates an agent by name and executes the given task with
the current conversation context. The agent's output is added to the
conversation history for future reference.
Args:
agent_name (str): The name of the agent to call.
task (str): The task to be executed by the agent.
*args: Additional positional arguments for the agent.
**kwargs: Additional keyword arguments for the agent.
Returns:
Any: The output from the agent's execution.
Raises:
ValueError: If the specified agent is not found in the swarm.
Exception: If agent execution fails.
"""
try:
if self.verbose:
@ -537,7 +666,22 @@ class HierarchicalSwarm(BaseSwarm):
def parse_orders(self, output):
"""
Parses the orders from the director's output.
Parse the director's output to extract plan and orders.
This method handles various output formats from the director agent and
extracts the plan and hierarchical orders. It supports both direct
dictionary formats and function call formats with JSON arguments.
Args:
output: The raw output from the director agent.
Returns:
tuple: A tuple containing (plan, orders) where plan is a string
and orders is a list of HierarchicalOrder objects.
Raises:
ValueError: If the output format is unexpected or cannot be parsed.
Exception: If parsing fails due to other errors.
"""
try:
if self.verbose:
@ -666,7 +810,20 @@ class HierarchicalSwarm(BaseSwarm):
def execute_orders(self, orders: list):
"""
Executes the orders from the director's output.
Execute all orders from the director's output.
This method iterates through all hierarchical orders and calls the
appropriate agents to execute their assigned tasks. Each agent's
output is collected and returned as a list.
Args:
orders (list): List of HierarchicalOrder objects to execute.
Returns:
list: List of outputs from all executed orders.
Raises:
Exception: If order execution fails.
"""
try:
if self.verbose:
@ -699,7 +856,23 @@ class HierarchicalSwarm(BaseSwarm):
self, tasks: List[str], img: str = None, *args, **kwargs
):
"""
Executes the hierarchical swarm for a list of tasks.
Execute the hierarchical swarm for multiple tasks in sequence.
This method processes a list of tasks sequentially, running the complete
swarm workflow for each task. Each task is processed independently with
its own conversation context and results.
Args:
tasks (List[str]): List of tasks to be processed by the swarm.
img (str, optional): Optional image input for the tasks.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
list: List of results for each processed task.
Raises:
Exception: If batched execution fails.
"""
try:
if self.verbose:

Loading…
Cancel
Save