diff --git a/cron_job_example.py b/cron_job_example.py deleted file mode 100644 index 855a9f31..00000000 --- a/cron_job_example.py +++ /dev/null @@ -1,54 +0,0 @@ -from swarms import Agent, CronJob -from loguru import logger - - -# Example usage -if __name__ == "__main__": - # Initialize the agent - agent = Agent( - agent_name="Quantitative-Trading-Agent", - agent_description="Advanced quantitative trading and algorithmic analysis agent", - system_prompt="""You are an expert quantitative trading agent with deep expertise in: - - Algorithmic trading strategies and implementation - - Statistical arbitrage and market making - - Risk management and portfolio optimization - - High-frequency trading systems - - Market microstructure analysis - - Quantitative research methodologies - - Financial mathematics and stochastic processes - - Machine learning applications in trading - - Your core responsibilities include: - 1. Developing and backtesting trading strategies - 2. Analyzing market data and identifying alpha opportunities - 3. Implementing risk management frameworks - 4. Optimizing portfolio allocations - 5. Conducting quantitative research - 6. Monitoring market microstructure - 7. Evaluating trading system performance - - You maintain strict adherence to: - - Mathematical rigor in all analyses - - Statistical significance in strategy development - - Risk-adjusted return optimization - - Market impact minimization - - Regulatory compliance - - Transaction cost analysis - - Performance attribution - - You communicate in precise, technical terms while maintaining clarity for stakeholders.""", - max_loops=1, - model_name="gpt-4.1", - dynamic_temperature_enabled=True, - output_type="str-all-except-first", - streaming_on=True, - print_on=True, - telemetry_enable=False, - ) - - # Example 1: Basic usage with just a task - logger.info("Starting example cron job") - cron_job = CronJob(agent=agent, interval="10seconds") - cron_job.run( - task="What are the best top 3 etfs for gold coverage?" - ) diff --git a/docs/swarms/structs/conversation.md b/docs/swarms/structs/conversation.md index 0e3b9bf0..e1fd7c7a 100644 --- a/docs/swarms/structs/conversation.md +++ b/docs/swarms/structs/conversation.md @@ -6,96 +6,33 @@ The `Conversation` class is a powerful and flexible tool for managing conversati ### Key Features -- **Multiple Storage Backends**: Support for various storage solutions: - - In-memory: Fast, temporary storage for testing and development - - Supabase: PostgreSQL-based cloud storage with real-time capabilities - - Redis: High-performance caching and persistence - - SQLite: Local file-based storage - - DuckDB: Analytical workloads and columnar storage - - Pulsar: Event streaming for distributed systems - - Mem0: Memory-based storage with mem0 integration - -- **Token Management**: - - Built-in token counting with configurable models - - Automatic token tracking for input/output messages - - Token usage analytics and reporting - - Context length management - -- **Metadata and Categories**: - - Support for message metadata - - Message categorization (input/output) - - Role-based message tracking - - Custom message IDs - -- **Data Export/Import**: - - JSON and YAML export formats - - Automatic saving and loading - - Conversation history management - - Batch operations support - -- **Advanced Features**: - - Message search and filtering - - Conversation analytics - - Multi-agent support - - Error handling and fallbacks - - Type hints and validation +| Feature Category | Features / Description | +|----------------------------|-------------------------------------------------------------------------------------------------------------| +| **Multiple Storage Backends** | - In-memory: Fast, temporary storage for testing and development
- Supabase: PostgreSQL-based cloud storage with real-time capabilities
- Redis: High-performance caching and persistence
- SQLite: Local file-based storage
- DuckDB: Analytical workloads and columnar storage
- Pulsar: Event streaming for distributed systems
- Mem0: Memory-based storage with mem0 integration | +| **Token Management** | - Built-in token counting with configurable models
- Automatic token tracking for input/output messages
- Token usage analytics and reporting
- Context length management | +| **Metadata and Categories** | - Support for message metadata
- Message categorization (input/output)
- Role-based message tracking
- Custom message IDs | +| **Data Export/Import** | - JSON and YAML export formats
- Automatic saving and loading
- Conversation history management
- Batch operations support | +| **Advanced Features** | - Message search and filtering
- Conversation analytics
- Multi-agent support
- Error handling and fallbacks
- Type hints and validation | ### Use Cases -1. **Chatbot Development**: - - Store and manage conversation history - - Track token usage and context length - - Analyze conversation patterns - -2. **Multi-Agent Systems**: - - Coordinate multiple AI agents - - Track agent interactions - - Store agent outputs and metadata - -3. **Analytics Applications**: - - Track conversation metrics - - Generate usage reports - - Analyze user interactions - -4. **Production Systems**: - - Persistent storage with various backends - - Error handling and recovery - - Scalable conversation management - -5. **Development and Testing**: - - Fast in-memory storage - - Debugging support - - Easy export/import of test data +| Use Case | Features / Description | +|----------------------------|--------------------------------------------------------------------------------------------------------| +| **Chatbot Development** | - Store and manage conversation history
- Track token usage and context length
- Analyze conversation patterns | +| **Multi-Agent Systems** | - Coordinate multiple AI agents
- Track agent interactions
- Store agent outputs and metadata | +| **Analytics Applications** | - Track conversation metrics
- Generate usage reports
- Analyze user interactions | +| **Production Systems** | - Persistent storage with various backends
- Error handling and recovery
- Scalable conversation management | +| **Development and Testing**| - Fast in-memory storage
- Debugging support
- Easy export/import of test data | ### Best Practices -1. **Storage Selection**: - - Use in-memory for testing and development - - Choose Supabase for multi-user cloud applications - - Use Redis for high-performance requirements - - Select SQLite for single-user local applications - - Pick DuckDB for analytical workloads - - Opt for Pulsar in distributed systems - -2. **Token Management**: - - Enable token counting for production use - - Set appropriate context lengths - - Monitor token usage with export_and_count_categories() - -3. **Error Handling**: - - Implement proper fallback mechanisms - - Use type hints for better code reliability - - Monitor and log errors appropriately - -4. **Data Management**: - - Use appropriate export formats (JSON/YAML) - - Implement regular backup strategies - - Clean up old conversations when needed - -5. **Security**: - - Use environment variables for sensitive credentials - - Implement proper access controls - - Validate input data +| Category | Best Practices | +|---------------------|------------------------------------------------------------------------------------------------------------------------| +| **Storage Selection** | - Use in-memory for testing and development
- Choose Supabase for multi-user cloud applications
- Use Redis for high-performance requirements
- Select SQLite for single-user local applications
- Pick DuckDB for analytical workloads
- Opt for Pulsar in distributed systems | +| **Token Management** | - Enable token counting for production use
- Set appropriate context lengths
- Monitor token usage with `export_and_count_categories()` | +| **Error Handling** | - Implement proper fallback mechanisms
- Use type hints for better code reliability
- Monitor and log errors appropriately | +| **Data Management** | - Use appropriate export formats (JSON/YAML)
- Implement regular backup strategies
- Clean up old conversations when needed | +| **Security** | - Use environment variables for sensitive credentials
- Implement proper access controls
- Validate input data | ## Table of Contents @@ -113,13 +50,15 @@ The `Conversation` class is designed to manage conversations by keeping track of **New in this version**: The class now supports multiple storage backends for persistent conversation storage: -- **"in-memory"**: Default memory-based storage (no persistence) -- **"mem0"**: Memory-based storage with mem0 integration (requires: `pip install mem0ai`) -- **"supabase"**: PostgreSQL-based storage using Supabase (requires: `pip install supabase`) -- **"redis"**: Redis-based storage (requires: `pip install redis`) -- **"sqlite"**: SQLite-based storage (built-in to Python) -- **"duckdb"**: DuckDB-based storage (requires: `pip install duckdb`) -- **"pulsar"**: Apache Pulsar messaging backend (requires: `pip install pulsar-client`) +| Backend | Description | Requirements | +|--------------|-------------------------------------------------------------------------------------------------------------|------------------------------------| +| **in-memory**| Default memory-based storage (no persistence) | None (built-in) | +| **mem0** | Memory-based storage with mem0 integration | `pip install mem0ai` | +| **supabase** | PostgreSQL-based storage using Supabase | `pip install supabase` | +| **redis** | Redis-based storage | `pip install redis` | +| **sqlite** | SQLite-based storage (local file) | None (built-in) | +| **duckdb** | DuckDB-based storage (analytical workloads, columnar storage) | `pip install duckdb` | +| **pulsar** | Apache Pulsar messaging backend | `pip install pulsar-client` | All backends use **lazy loading** - database dependencies are only imported when the specific backend is instantiated. Each backend provides helpful error messages if required packages are not installed. @@ -132,7 +71,6 @@ All backends use **lazy loading** - database dependencies are only imported when | system_prompt | Optional[str] | System prompt for the conversation | | time_enabled | bool | Flag to enable time tracking for messages | | autosave | bool | Flag to enable automatic saving | -| save_enabled | bool | Flag to control if saving is enabled | | save_filepath | str | File path for saving conversation history | | load_filepath | str | File path for loading conversation history | | conversation_history | list | List storing conversation messages | diff --git a/examples/cron_job_examples/cron_job_example.py b/examples/cron_job_examples/cron_job_example.py new file mode 100644 index 00000000..b7c0d501 --- /dev/null +++ b/examples/cron_job_examples/cron_job_example.py @@ -0,0 +1,247 @@ +from loguru import logger +import yfinance as yf +import json + + +def get_figma_stock_data(stock: str) -> str: + """ + Fetches comprehensive stock data for Figma (FIG) using Yahoo Finance. + + Returns: + Dict[str, Any]: A dictionary containing comprehensive Figma stock data including: + - Current price and market data + - Company information + - Financial metrics + - Historical data summary + - Trading statistics + + Raises: + Exception: If there's an error fetching the data from Yahoo Finance + """ + try: + # Initialize Figma stock ticker + figma = yf.Ticker(stock) + + # Get current stock info + info = figma.info + + # Get recent historical data (last 30 days) + hist = figma.history(period="30d") + + # Get real-time fast info + fast_info = figma.fast_info + + # Compile comprehensive data + figma_data = { + "company_info": { + "name": info.get("longName", "Figma Inc."), + "symbol": "FIG", + "sector": info.get("sector", "N/A"), + "industry": info.get("industry", "N/A"), + "website": info.get("website", "N/A"), + "description": info.get("longBusinessSummary", "N/A"), + }, + "current_market_data": { + "current_price": info.get("currentPrice", "N/A"), + "previous_close": info.get("previousClose", "N/A"), + "open": info.get("open", "N/A"), + "day_low": info.get("dayLow", "N/A"), + "day_high": info.get("dayHigh", "N/A"), + "volume": info.get("volume", "N/A"), + "market_cap": info.get("marketCap", "N/A"), + "price_change": ( + info.get("currentPrice", 0) + - info.get("previousClose", 0) + if info.get("currentPrice") + and info.get("previousClose") + else "N/A" + ), + "price_change_percent": info.get( + "regularMarketChangePercent", "N/A" + ), + }, + "financial_metrics": { + "pe_ratio": info.get("trailingPE", "N/A"), + "forward_pe": info.get("forwardPE", "N/A"), + "price_to_book": info.get("priceToBook", "N/A"), + "price_to_sales": info.get( + "priceToSalesTrailing12Months", "N/A" + ), + "enterprise_value": info.get( + "enterpriseValue", "N/A" + ), + "beta": info.get("beta", "N/A"), + "dividend_yield": info.get("dividendYield", "N/A"), + "payout_ratio": info.get("payoutRatio", "N/A"), + }, + "trading_statistics": { + "fifty_day_average": info.get( + "fiftyDayAverage", "N/A" + ), + "two_hundred_day_average": info.get( + "twoHundredDayAverage", "N/A" + ), + "fifty_two_week_low": info.get( + "fiftyTwoWeekLow", "N/A" + ), + "fifty_two_week_high": info.get( + "fiftyTwoWeekHigh", "N/A" + ), + "shares_outstanding": info.get( + "sharesOutstanding", "N/A" + ), + "float_shares": info.get("floatShares", "N/A"), + "shares_short": info.get("sharesShort", "N/A"), + "short_ratio": info.get("shortRatio", "N/A"), + }, + "recent_performance": { + "last_30_days": { + "start_price": ( + hist.iloc[0]["Close"] + if not hist.empty + else "N/A" + ), + "end_price": ( + hist.iloc[-1]["Close"] + if not hist.empty + else "N/A" + ), + "total_return": ( + ( + hist.iloc[-1]["Close"] + - hist.iloc[0]["Close"] + ) + / hist.iloc[0]["Close"] + * 100 + if not hist.empty + else "N/A" + ), + "highest_price": ( + hist["High"].max() + if not hist.empty + else "N/A" + ), + "lowest_price": ( + hist["Low"].min() if not hist.empty else "N/A" + ), + "average_volume": ( + hist["Volume"].mean() + if not hist.empty + else "N/A" + ), + } + }, + "real_time_data": { + "last_price": ( + fast_info.last_price + if hasattr(fast_info, "last_price") + else "N/A" + ), + "last_volume": ( + fast_info.last_volume + if hasattr(fast_info, "last_volume") + else "N/A" + ), + "bid": ( + fast_info.bid + if hasattr(fast_info, "bid") + else "N/A" + ), + "ask": ( + fast_info.ask + if hasattr(fast_info, "ask") + else "N/A" + ), + "bid_size": ( + fast_info.bid_size + if hasattr(fast_info, "bid_size") + else "N/A" + ), + "ask_size": ( + fast_info.ask_size + if hasattr(fast_info, "ask_size") + else "N/A" + ), + }, + } + + logger.info("Successfully fetched Figma (FIG) stock data") + return json.dumps(figma_data, indent=4) + + except Exception as e: + logger.error(f"Error fetching Figma stock data: {e}") + raise Exception(f"Failed to fetch Figma stock data: {e}") + + +# # Example usage +# # Initialize the quantitative trading agent +# agent = Agent( +# agent_name="Quantitative-Trading-Agent", +# agent_description="Advanced quantitative trading and algorithmic analysis agent specializing in stock analysis and trading strategies", +# system_prompt=f"""You are an expert quantitative trading agent with deep expertise in: +# - Algorithmic trading strategies and implementation +# - Statistical arbitrage and market making +# - Risk management and portfolio optimization +# - High-frequency trading systems +# - Market microstructure analysis +# - Quantitative research methodologies +# - Financial mathematics and stochastic processes +# - Machine learning applications in trading +# - Technical analysis and chart patterns +# - Fundamental analysis and valuation models +# - Options trading and derivatives +# - Market sentiment analysis + +# Your core responsibilities include: +# 1. Developing and backtesting trading strategies +# 2. Analyzing market data and identifying alpha opportunities +# 3. Implementing risk management frameworks +# 4. Optimizing portfolio allocations +# 5. Conducting quantitative research +# 6. Monitoring market microstructure +# 7. Evaluating trading system performance +# 8. Performing comprehensive stock analysis +# 9. Generating trading signals and recommendations +# 10. Risk assessment and position sizing + +# When analyzing stocks, you should: +# - Evaluate technical indicators and chart patterns +# - Assess fundamental metrics and valuation ratios +# - Analyze market sentiment and momentum +# - Consider macroeconomic factors +# - Provide risk-adjusted return projections +# - Suggest optimal entry/exit points +# - Calculate position sizing recommendations +# - Identify potential catalysts and risks + +# You maintain strict adherence to: +# - Mathematical rigor in all analyses +# - Statistical significance in strategy development +# - Risk-adjusted return optimization +# - Market impact minimization +# - Regulatory compliance +# - Transaction cost analysis +# - Performance attribution +# - Data-driven decision making + +# You communicate in precise, technical terms while maintaining clarity for stakeholders. +# Data: {get_figma_stock_data('FIG')} + +# """, +# max_loops=1, +# model_name="gpt-4o-mini", +# dynamic_temperature_enabled=True, +# output_type="str-all-except-first", +# streaming_on=True, +# print_on=True, +# telemetry_enable=False, +# ) + +# # Example 1: Basic usage with just a task +# logger.info("Starting quantitative analysis cron job for Figma (FIG)") +# cron_job = CronJob(agent=agent, interval="10seconds") +# cron_job.run( +# task="Analyze the Figma (FIG) stock comprehensively using the available stock data. Provide a detailed quantitative analysis" +# ) + +print(get_figma_stock_data("FIG")) diff --git a/examples/cron_job_examples/cron_job_figma_stock_swarms_tools_example.py b/examples/cron_job_examples/cron_job_figma_stock_swarms_tools_example.py new file mode 100644 index 00000000..da914c63 --- /dev/null +++ b/examples/cron_job_examples/cron_job_figma_stock_swarms_tools_example.py @@ -0,0 +1,105 @@ +""" +Example script demonstrating how to fetch Figma (FIG) stock data using swarms_tools Yahoo Finance API. +This shows the alternative approach using the existing swarms_tools package. +""" + +from swarms import Agent +from swarms.prompts.finance_agent_sys_prompt import ( + FINANCIAL_AGENT_SYS_PROMPT, +) +from swarms_tools import yahoo_finance_api +from loguru import logger +import json + + +def get_figma_data_with_swarms_tools(): + """ + Fetches Figma stock data using the swarms_tools Yahoo Finance API. + + Returns: + dict: Figma stock data from swarms_tools + """ + try: + logger.info("Fetching Figma stock data using swarms_tools...") + figma_data = yahoo_finance_api(["FIG"]) + return figma_data + except Exception as e: + logger.error(f"Error fetching data with swarms_tools: {e}") + raise + + +def analyze_figma_with_agent(): + """ + Uses a Swarms agent to analyze Figma stock data. + """ + try: + # Initialize the agent with Yahoo Finance tool + agent = Agent( + agent_name="Figma-Analysis-Agent", + agent_description="Specialized agent for analyzing Figma stock data", + system_prompt=FINANCIAL_AGENT_SYS_PROMPT, + max_loops=1, + model_name="gpt-4o-mini", + tools=[yahoo_finance_api], + dynamic_temperature_enabled=True, + ) + + # Ask the agent to analyze Figma + analysis = agent.run( + "Analyze the current stock data for Figma (FIG) and provide insights on its performance, valuation metrics, and recent trends." + ) + + return analysis + + except Exception as e: + logger.error(f"Error in agent analysis: {e}") + raise + + +def main(): + """ + Main function to demonstrate different approaches for Figma stock data. + """ + logger.info("Starting Figma stock analysis with swarms_tools") + + try: + # Method 1: Direct API call + print("\n" + "=" * 60) + print("METHOD 1: Direct swarms_tools API call") + print("=" * 60) + + figma_data = get_figma_data_with_swarms_tools() + print("Raw data from swarms_tools:") + print(json.dumps(figma_data, indent=2, default=str)) + + # Method 2: Agent-based analysis + print("\n" + "=" * 60) + print("METHOD 2: Agent-based analysis") + print("=" * 60) + + analysis = analyze_figma_with_agent() + print("Agent analysis:") + print(analysis) + + # Method 3: Comparison with custom function + print("\n" + "=" * 60) + print("METHOD 3: Comparison with custom function") + print("=" * 60) + + from cron_job_examples.cron_job_example import ( + get_figma_stock_data_simple, + ) + + custom_data = get_figma_stock_data_simple() + print("Custom function output:") + print(custom_data) + + logger.info("All methods completed successfully!") + + except Exception as e: + logger.error(f"Error in main function: {e}") + print(f"Error: {e}") + + +if __name__ == "__main__": + main() diff --git a/examples/cron_job_examples/figma_stock_example.py b/examples/cron_job_examples/figma_stock_example.py new file mode 100644 index 00000000..f9760462 --- /dev/null +++ b/examples/cron_job_examples/figma_stock_example.py @@ -0,0 +1,79 @@ +""" +Example script demonstrating how to fetch Figma (FIG) stock data using Yahoo Finance. +""" + +from cron_job_examples.cron_job_example import ( + get_figma_stock_data, + get_figma_stock_data_simple, +) +from loguru import logger +import json + + +def main(): + """ + Main function to demonstrate Figma stock data fetching. + """ + logger.info("Starting Figma stock data demonstration") + + try: + # Example 1: Get comprehensive data as dictionary + logger.info("Fetching comprehensive Figma stock data...") + figma_data = get_figma_stock_data() + + # Print the data in a structured format + print("\n" + "=" * 50) + print("COMPREHENSIVE FIGMA STOCK DATA") + print("=" * 50) + print(json.dumps(figma_data, indent=2, default=str)) + + # Example 2: Get simple formatted data + logger.info("Fetching simple formatted Figma stock data...") + simple_data = get_figma_stock_data_simple() + + print("\n" + "=" * 50) + print("SIMPLE FORMATTED FIGMA STOCK DATA") + print("=" * 50) + print(simple_data) + + # Example 3: Access specific data points + logger.info("Accessing specific data points...") + + current_price = figma_data["current_market_data"][ + "current_price" + ] + market_cap = figma_data["current_market_data"]["market_cap"] + pe_ratio = figma_data["financial_metrics"]["pe_ratio"] + + print("\nKey Metrics:") + print(f"Current Price: ${current_price}") + print(f"Market Cap: ${market_cap:,}") + print(f"P/E Ratio: {pe_ratio}") + + # Example 4: Check if stock is performing well + price_change = figma_data["current_market_data"][ + "price_change" + ] + if isinstance(price_change, (int, float)): + if price_change > 0: + print( + f"\nšŸ“ˆ Figma stock is up ${price_change:.2f} today!" + ) + elif price_change < 0: + print( + f"\nšŸ“‰ Figma stock is down ${abs(price_change):.2f} today." + ) + else: + print("\nāž”ļø Figma stock is unchanged today.") + + logger.info( + "Figma stock data demonstration completed successfully!" + ) + + except Exception as e: + logger.error(f"Error in main function: {e}") + print(f"Error: {e}") + + +if __name__ == "__main__": + main() diff --git a/examples/cron_job_examples/solana_price_tracker.py b/examples/cron_job_examples/solana_price_tracker.py new file mode 100644 index 00000000..0ae048ab --- /dev/null +++ b/examples/cron_job_examples/solana_price_tracker.py @@ -0,0 +1,257 @@ +from swarms import Agent, CronJob +from loguru import logger +import requests +import json +from datetime import datetime + + +def get_solana_price() -> str: + """ + Fetches comprehensive Solana (SOL) price data using CoinGecko API. + + Returns: + str: A JSON formatted string containing Solana's current price and market data including: + - Current price in USD + - Market cap + - 24h volume + - 24h price change + - Last updated timestamp + + Raises: + Exception: If there's an error fetching the data from CoinGecko API + """ + try: + # CoinGecko API endpoint for simple price data + url = "https://api.coingecko.com/api/v3/simple/price" + params = { + "ids": "solana", # Solana's CoinGecko ID + "vs_currencies": "usd", + "include_market_cap": True, + "include_24hr_vol": True, + "include_24hr_change": True, + "include_last_updated_at": True, + } + + # Make API request with timeout + response = requests.get(url, params=params, timeout=10) + response.raise_for_status() + + # Parse response data + data = response.json() + + if "solana" not in data: + raise Exception("Solana data not found in API response") + + solana_data = data["solana"] + + # Compile comprehensive data + solana_info = { + "timestamp": datetime.now().isoformat(), + "coin_info": { + "name": "Solana", + "symbol": "SOL", + "coin_id": "solana", + }, + "price_data": { + "current_price_usd": solana_data.get("usd", "N/A"), + "market_cap_usd": solana_data.get( + "usd_market_cap", "N/A" + ), + "volume_24h_usd": solana_data.get( + "usd_24h_vol", "N/A" + ), + "price_change_24h_percent": solana_data.get( + "usd_24h_change", "N/A" + ), + "last_updated_at": solana_data.get( + "last_updated_at", "N/A" + ), + }, + "formatted_data": { + "price_formatted": ( + f"${solana_data.get('usd', 'N/A'):,.2f}" + if solana_data.get("usd") + else "N/A" + ), + "market_cap_formatted": ( + f"${solana_data.get('usd_market_cap', 'N/A'):,.0f}" + if solana_data.get("usd_market_cap") + else "N/A" + ), + "volume_formatted": ( + f"${solana_data.get('usd_24h_vol', 'N/A'):,.0f}" + if solana_data.get("usd_24h_vol") + else "N/A" + ), + "change_formatted": ( + f"{solana_data.get('usd_24h_change', 'N/A'):+.2f}%" + if solana_data.get("usd_24h_change") is not None + else "N/A" + ), + }, + } + + logger.info( + f"Successfully fetched Solana price: ${solana_data.get('usd', 'N/A')}" + ) + return json.dumps(solana_info, indent=4) + + except requests.RequestException as e: + error_msg = f"API request failed: {e}" + logger.error(error_msg) + return json.dumps( + { + "error": error_msg, + "timestamp": datetime.now().isoformat(), + "status": "failed", + }, + indent=4, + ) + except Exception as e: + error_msg = f"Error fetching Solana price data: {e}" + logger.error(error_msg) + return json.dumps( + { + "error": error_msg, + "timestamp": datetime.now().isoformat(), + "status": "failed", + }, + indent=4, + ) + + +def analyze_solana_data(data: str) -> str: + """ + Analyzes Solana price data and provides insights. + + Args: + data (str): JSON string containing Solana price data + + Returns: + str: Analysis and insights about the current Solana market data + """ + try: + # Parse the data + solana_data = json.loads(data) + + if "error" in solana_data: + return f"āŒ Error in data: {solana_data['error']}" + + price_data = solana_data.get("price_data", {}) + formatted_data = solana_data.get("formatted_data", {}) + + # Extract key metrics + current_price = price_data.get("current_price_usd") + price_change = price_data.get("price_change_24h_percent") + volume_24h = price_data.get("volume_24h_usd") + market_cap = price_data.get("market_cap_usd") + + # Generate analysis + analysis = f""" +šŸ” **Solana (SOL) Market Analysis** - {solana_data.get('timestamp', 'N/A')} + +šŸ’° **Current Price**: {formatted_data.get('price_formatted', 'N/A')} +šŸ“Š **24h Change**: {formatted_data.get('change_formatted', 'N/A')} +šŸ’Ž **Market Cap**: {formatted_data.get('market_cap_formatted', 'N/A')} +šŸ“ˆ **24h Volume**: {formatted_data.get('volume_formatted', 'N/A')} + +""" + + # Add sentiment analysis based on price change + if price_change is not None: + if price_change > 5: + analysis += "šŸš€ **Sentiment**: Strongly Bullish - Significant positive momentum\n" + elif price_change > 1: + analysis += "šŸ“ˆ **Sentiment**: Bullish - Positive price action\n" + elif price_change > -1: + analysis += ( + "āž”ļø **Sentiment**: Neutral - Sideways movement\n" + ) + elif price_change > -5: + analysis += "šŸ“‰ **Sentiment**: Bearish - Negative price action\n" + else: + analysis += "šŸ”» **Sentiment**: Strongly Bearish - Significant decline\n" + + # Add volume analysis + if volume_24h and market_cap: + try: + volume_market_cap_ratio = ( + volume_24h / market_cap + ) * 100 + if volume_market_cap_ratio > 10: + analysis += "šŸ”„ **Volume**: High trading activity - Strong market interest\n" + elif volume_market_cap_ratio > 5: + analysis += ( + "šŸ“Š **Volume**: Moderate trading activity\n" + ) + else: + analysis += "😓 **Volume**: Low trading activity - Limited market movement\n" + except (TypeError, ZeroDivisionError): + analysis += "šŸ“Š **Volume**: Unable to calculate volume/market cap ratio\n" + + analysis += f"\nā° **Last Updated**: {price_data.get('last_updated_at', 'N/A')}" + + return analysis + + except json.JSONDecodeError as e: + return f"āŒ Error parsing data: {e}" + except Exception as e: + return f"āŒ Error analyzing data: {e}" + + +# Initialize the Solana analysis agent +agent = Agent( + agent_name="Solana-Price-Analyzer", + agent_description="Specialized agent for analyzing Solana (SOL) cryptocurrency price data and market trends", + system_prompt=f"""You are an expert cryptocurrency analyst specializing in Solana (SOL) analysis. Your expertise includes: + +- Technical analysis and chart patterns +- Market sentiment analysis +- Volume and liquidity analysis +- Price action interpretation +- Market cap and valuation metrics +- Cryptocurrency market dynamics +- DeFi ecosystem analysis +- Blockchain technology trends + +When analyzing Solana data, you should: +- Evaluate price movements and trends +- Assess market sentiment and momentum +- Consider volume and liquidity factors +- Analyze market cap positioning +- Provide actionable insights +- Identify potential catalysts or risks +- Consider broader market context + +You communicate clearly and provide practical analysis that helps users understand Solana's current market position and potential future movements. + +Current Solana Data: {get_solana_price()} +""", + max_loops=1, + model_name="gpt-4o-mini", + dynamic_temperature_enabled=True, + output_type="str-all-except-first", + streaming_on=False, # need to fix this bug where streaming is working but makes copies of the border when you scroll on the terminal + print_on=True, + telemetry_enable=False, +) + + +def main(): + """ + Main function to run the Solana price tracking cron job. + """ + logger.info("šŸš€ Starting Solana price tracking cron job") + logger.info("šŸ“Š Fetching Solana price every 10 seconds...") + + # Create cron job that runs every 10 seconds + cron_job = CronJob(agent=agent, interval="30seconds") + + # Run the cron job with analysis task + cron_job.run( + task="Analyze the current Solana (SOL) price data comprehensively. Provide detailed market analysis including price trends, volume analysis, market sentiment, and actionable insights. Format your response clearly with emojis and structured sections." + ) + + +if __name__ == "__main__": + main() diff --git a/examples/multi_agent/graphworkflow_examples/graph_workflow_example.py b/examples/multi_agent/graphworkflow_examples/graph_workflow_example.py index 75aa8b4d..22e4b0a9 100644 --- a/examples/multi_agent/graphworkflow_examples/graph_workflow_example.py +++ b/examples/multi_agent/graphworkflow_examples/graph_workflow_example.py @@ -1,5 +1,4 @@ -from swarms import Agent -from swarms.structs.graph_workflow import GraphWorkflow +from swarms import Agent, GraphWorkflow from swarms.prompts.multi_agent_collab_prompt import ( MULTI_AGENT_COLLAB_PROMPT_TWO, ) @@ -11,6 +10,7 @@ agent1 = Agent( max_loops=1, system_prompt=MULTI_AGENT_COLLAB_PROMPT_TWO, # Set collaboration prompt ) + agent2 = Agent( agent_name="ResearchAgent2", model_name="gpt-4.1", @@ -19,7 +19,11 @@ agent2 = Agent( ) # Build the workflow with only agents as nodes -workflow = GraphWorkflow() +workflow = GraphWorkflow( + name="Research Workflow", + description="A workflow for researching the best arbitrage trading strategies for altcoins", + auto_compile=True, +) workflow.add_node(agent1) workflow.add_node(agent2) @@ -27,27 +31,15 @@ workflow.add_node(agent2) workflow.add_edge(agent1.agent_name, agent2.agent_name) # Visualize the workflow using Graphviz -print("\nšŸ“Š Creating workflow visualization...") -try: - viz_output = workflow.visualize( - output_path="simple_workflow_graph", - format="png", - view=True, # Auto-open the generated image - show_parallel_patterns=True, - ) - print(f"āœ… Workflow visualization saved to: {viz_output}") -except Exception as e: - print(f"āš ļø Graphviz not available, using text visualization: {e}") - workflow.visualize() +workflow.visualize() + +workflow.compile() # Export workflow to JSON workflow_json = workflow.to_json() -print( - f"\nšŸ’¾ Workflow exported to JSON ({len(workflow_json)} characters)" -) +print(workflow_json) # Run the workflow and print results -print("\nšŸš€ Executing workflow...") results = workflow.run( task="What are the best arbitrage trading strategies for altcoins? Give me research papers and articles on the topic." ) diff --git a/examples/multi_agent/heavy_swarm_examples/heavy_swarm_example.py b/examples/multi_agent/heavy_swarm_examples/heavy_swarm_example.py index 57715ff7..588e3f3e 100644 --- a/examples/multi_agent/heavy_swarm_examples/heavy_swarm_example.py +++ b/examples/multi_agent/heavy_swarm_examples/heavy_swarm_example.py @@ -1,16 +1,32 @@ -from swarms.structs.heavy_swarm import HeavySwarm +from swarms import HeavySwarm -swarm = HeavySwarm( - worker_model_name="claude-3-5-sonnet-20240620", - show_dashboard=True, - question_agent_model_name="gpt-4.1", - loops_per_agent=1, -) +def main(): + """ + Run a HeavySwarm query to find the best 3 gold ETFs. + This function initializes a HeavySwarm instance and queries it to provide + the top 3 gold exchange-traded funds (ETFs), requesting clear, structured results. + """ + swarm = HeavySwarm( + name="Gold ETF Research Team", + description="A team of agents that research the best gold ETFs", + worker_model_name="claude-sonnet-4-latest", + show_dashboard=True, + question_agent_model_name="gpt-4.1", + loops_per_agent=1, + ) -out = swarm.run( - "Provide 3 publicly traded biotech companies that are currently trading below their cash value. For each company identified, provide available data or projections for the next 6 months, including any relevant financial metrics, upcoming catalysts, or events that could impact valuation. Present your findings in a clear, structured format. Be very specific and provide their ticker symbol, name, and the current price, cash value, and the percentage difference between the two." -) + prompt = ( + "Find the best 3 gold ETFs. For each ETF, provide the ticker symbol, " + "full name, current price, expense ratio, assets under management, and " + "a brief explanation of why it is considered among the best. Present the information " + "in a clear, structured format suitable for investors." + ) -print(out) + out = swarm.run(prompt) + print(out) + + +if __name__ == "__main__": + main() diff --git a/examples/multi_agent/heavy_swarm_examples/medical_heavy_swarm_example.py b/examples/multi_agent/heavy_swarm_examples/medical_heavy_swarm_example.py new file mode 100644 index 00000000..ad460310 --- /dev/null +++ b/examples/multi_agent/heavy_swarm_examples/medical_heavy_swarm_example.py @@ -0,0 +1,34 @@ +from swarms import HeavySwarm + + +def main(): + """ + Run a HeavySwarm query to find the best and most promising treatments for diabetes. + + This function initializes a HeavySwarm instance and queries it to provide + the top current and theoretical treatments for diabetes, requesting clear, + structured, and evidence-based results suitable for medical research or clinical review. + """ + swarm = HeavySwarm( + name="Diabetes Treatment Research Team", + description="A team of agents that research the best and most promising treatments for diabetes, including theoretical approaches.", + worker_model_name="claude-sonnet-4-20250514", + show_dashboard=True, + question_agent_model_name="gpt-4.1", + loops_per_agent=1, + ) + + prompt = ( + "Identify the best and most promising treatments for diabetes, including both current standard therapies and theoretical or experimental approaches. " + "For each treatment, provide: the treatment name, type (e.g., medication, lifestyle intervention, device, gene therapy, etc.), " + "mechanism of action, current stage of research or approval status, key clinical evidence or rationale, " + "potential benefits and risks, and a brief summary of why it is considered promising. " + "Present the information in a clear, structured format suitable for medical professionals or researchers." + ) + + out = swarm.run(prompt) + print(out) + + +if __name__ == "__main__": + main() diff --git a/examples/multi_agent/hiearchical_swarm/hiearchical_swarm.py b/examples/multi_agent/hiearchical_swarm/sector_analysis_hiearchical_swarm.py similarity index 62% rename from examples/multi_agent/hiearchical_swarm/hiearchical_swarm.py rename to examples/multi_agent/hiearchical_swarm/sector_analysis_hiearchical_swarm.py index ee4d1d60..7312d90b 100644 --- a/examples/multi_agent/hiearchical_swarm/hiearchical_swarm.py +++ b/examples/multi_agent/hiearchical_swarm/sector_analysis_hiearchical_swarm.py @@ -1,5 +1,4 @@ -from swarms import Agent -from swarms.structs.hiearchical_swarm import HierarchicalSwarm +from swarms import Agent, HierarchicalSwarm # Initialize agents for a $50B portfolio analysis @@ -9,24 +8,27 @@ agents = [ agent_description="Senior financial analyst at BlackRock.", system_prompt="You are a financial analyst tasked with optimizing asset allocations for a $50B portfolio. Provide clear, quantitative recommendations for each sector.", max_loops=1, - model_name="groq/deepseek-r1-distill-qwen-32b", + model_name="gpt-4.1", max_tokens=3000, + streaming_on=True, ), Agent( agent_name="Sector-Risk-Analyst", agent_description="Expert risk management analyst.", system_prompt="You are a risk analyst responsible for advising on risk allocation within a $50B portfolio. Provide detailed insights on risk exposures for each sector.", max_loops=1, - model_name="groq/deepseek-r1-distill-qwen-32b", + model_name="gpt-4.1", max_tokens=3000, + streaming_on=True, ), Agent( agent_name="Tech-Sector-Analyst", agent_description="Technology sector analyst.", system_prompt="You are a tech sector analyst focused on capital and risk allocations. Provide data-backed insights for the tech sector.", max_loops=1, - model_name="groq/deepseek-r1-distill-qwen-32b", + model_name="gpt-4.1", max_tokens=3000, + streaming_on=True, ), ] @@ -35,14 +37,19 @@ majority_voting = HierarchicalSwarm( name="Sector-Investment-Advisory-System", description="System for sector analysis and optimal allocations.", agents=agents, - # director=director_agent, - max_loops=1, + max_loops=2, output_type="dict", ) -# Run the analysis + result = majority_voting.run( - task="Evaluate market sectors and determine optimal allocation for a $50B portfolio. Include a detailed table of allocations, risk assessments, and a consolidated strategy." + task=( + "Simulate the allocation of a $50B fund specifically for the pharmaceutical sector. " + "Provide specific tickers (e.g., PFE, MRK, JNJ, LLY, BMY, etc.) and a clear rationale for why funds should be allocated to each company. " + "Present a table showing each ticker, company name, allocation percentage, and allocation amount in USD. " + "Include a brief summary of the overall allocation strategy and the reasoning behind the choices." + "Only call the Sector-Financial-Analyst agent to do the analysis. Nobody else should do the analysis." + ) ) print(result) diff --git a/simulations/senator_assembly/senator_simulation.py b/simulations/senator_assembly/senator_simulation.py index b03a7762..75a2ee61 100644 --- a/simulations/senator_assembly/senator_simulation.py +++ b/simulations/senator_assembly/senator_simulation.py @@ -6,11 +6,11 @@ each with detailed backgrounds, political positions, and comprehensive system pr that reflect their real-world characteristics, voting patterns, and policy priorities. """ +import random +from typing import Dict, List, Optional, Union + from swarms import Agent from swarms.structs.multi_agent_exec import run_agents_concurrently -from typing import Dict, List, Optional, Union -import json -import random class SenatorSimulation: @@ -3490,159 +3490,159 @@ class SenatorSimulation: } -# Example usage and demonstration -def main(): - """ - Demonstrate the Senate simulation with various scenarios. - """ - print("šŸ›ļø US Senate Simulation Initializing...") - - # Create the simulation - senate = SenatorSimulation() - - print("\nšŸ“Š Senate Composition:") - composition = senate.get_senate_composition() - print(json.dumps(composition, indent=2)) - - print(f"\nšŸŽ­ Available Senators ({len(senate.senators)}):") - for name in senate.senators.keys(): - party = senate._get_senator_party(name) - print(f" - {name} ({party})") - - # Example 1: Individual senator response - print("\nšŸ—£ļø Example: Senator Response") - senator = senate.get_senator("Katie Britt") - response = senator.run( - "What is your position on infrastructure spending and how would you pay for it?" - ) - print(f"Senator Katie Britt: {response}") - - # Example 2: Simulate a debate - print("\nšŸ’¬ Example: Senate Debate on Climate Change") - debate = senate.simulate_debate( - "Climate change legislation and carbon pricing", - [ - "Katie Britt", - "Mark Kelly", - "Lisa Murkowski", - "Alex Padilla", - ], - ) - - for entry in debate["transcript"]: - print(f"\n{entry['senator']} ({entry['party']}):") - print(f" {entry['position'][:200]}...") - - # Example 3: Simulate a vote - print("\nšŸ—³ļø Example: Senate Vote on Infrastructure Bill") - vote = senate.simulate_vote( - "A $1.2 trillion infrastructure bill including roads, bridges, broadband, and clean energy projects", - [ - "Katie Britt", - "Mark Kelly", - "Lisa Murkowski", - "Alex Padilla", - "Tom Cotton", - ], - ) - - print("Vote Results:") - for senator, vote_choice in vote["votes"].items(): - print(f" {senator}: {vote_choice}") - - print(f"\nFinal Result: {vote['results']['outcome']}") - print( - f"YEA: {vote['results']['yea']}, NAY: {vote['results']['nay']}, PRESENT: {vote['results']['present']}" - ) - - # Example 4: Committee hearing - print("\nšŸ›ļø Example: Committee Hearing") - hearing = senate.run_committee_hearing( - "Armed Services", - "Military readiness and defense spending", - ["Secretary of Defense", "Joint Chiefs Chairman"], - ) - - print("Armed Services Committee Hearing on Military Readiness") - for entry in hearing["transcript"][:3]: # Show first 3 entries - print( - f"\n{entry['type'].title()}: {entry['senator'] if 'senator' in entry else entry['witness']}" - ) - print(f" {entry['content'][:150]}...") - - # Example 5: Run all senators concurrently on a single task - print("\nšŸš€ Example: All Senators Concurrent Response") - all_senators_results = senate.run( - "What is your position on federal student loan forgiveness and how should we address the student debt crisis?" - ) - - print(f"\nTask: {all_senators_results['task']}") - print( - f"Selection Method: {all_senators_results['selection_method']}" - ) - print( - f"Total Participants: {all_senators_results['total_participants']}" - ) - - print("\nšŸ“Š Party Breakdown:") - for party, senators in all_senators_results[ - "party_breakdown" - ].items(): - if senators: - print(f"\n{party} ({len(senators)} senators):") - for senator_data in senators: - print(f" - {senator_data['senator']}") - - # Example 6: Run 50% of senators randomly - print("\nšŸŽ² Example: Random 50% of Senators") - random_results = senate.run( - "What is your position on climate change legislation and carbon pricing?", - participants=0.5, # 50% of all senators - ) - - print(f"\nTask: {random_results['task']}") - print(f"Selection Method: {random_results['selection_method']}") - print( - f"Total Participants: {random_results['total_participants']}" - ) - - print("\nšŸ“‹ Selected Senators:") - for senator in random_results["participants"]: - party = senate._get_senator_party(senator) - print(f" - {senator} ({party})") - - print("\nšŸ“Š Party Breakdown:") - for party, senators in random_results["party_breakdown"].items(): - if senators: - print(f"\n{party} ({len(senators)} senators):") - for senator_data in senators: - print(f" - {senator_data['senator']}") - - # Example 7: Run specific senators - print("\nšŸŽÆ Example: Specific Senators") - specific_results = senate.run( - "What is your position on military spending and defense policy?", - participants=[ - "Katie Britt", - "Mark Kelly", - "Lisa Murkowski", - "Alex Padilla", - "Tom Cotton", - ], - ) - - print(f"\nTask: {specific_results['task']}") - print(f"Selection Method: {specific_results['selection_method']}") - print( - f"Total Participants: {specific_results['total_participants']}" - ) - - print("\nšŸ“‹ Responses by Senator:") - for senator, response in specific_results["responses"].items(): - party = senate._get_senator_party(senator) - print(f"\n{senator} ({party}):") - print(f" {response[:200]}...") - - -if __name__ == "__main__": - main() +# # Example usage and demonstration +# def main(): +# """ +# Demonstrate the Senate simulation with various scenarios. +# """ +# print("šŸ›ļø US Senate Simulation Initializing...") + +# # Create the simulation +# senate = SenatorSimulation() + +# print("\nšŸ“Š Senate Composition:") +# composition = senate.get_senate_composition() +# print(json.dumps(composition, indent=2)) + +# print(f"\nšŸŽ­ Available Senators ({len(senate.senators)}):") +# for name in senate.senators.keys(): +# party = senate._get_senator_party(name) +# print(f" - {name} ({party})") + +# # Example 1: Individual senator response +# print("\nšŸ—£ļø Example: Senator Response") +# senator = senate.get_senator("Katie Britt") +# response = senator.run( +# "What is your position on infrastructure spending and how would you pay for it?" +# ) +# print(f"Senator Katie Britt: {response}") + +# # Example 2: Simulate a debate +# print("\nšŸ’¬ Example: Senate Debate on Climate Change") +# debate = senate.simulate_debate( +# "Climate change legislation and carbon pricing", +# [ +# "Katie Britt", +# "Mark Kelly", +# "Lisa Murkowski", +# "Alex Padilla", +# ], +# ) + +# for entry in debate["transcript"]: +# print(f"\n{entry['senator']} ({entry['party']}):") +# print(f" {entry['position'][:200]}...") + +# # Example 3: Simulate a vote +# print("\nšŸ—³ļø Example: Senate Vote on Infrastructure Bill") +# vote = senate.simulate_vote( +# "A $1.2 trillion infrastructure bill including roads, bridges, broadband, and clean energy projects", +# [ +# "Katie Britt", +# "Mark Kelly", +# "Lisa Murkowski", +# "Alex Padilla", +# "Tom Cotton", +# ], +# ) + +# print("Vote Results:") +# for senator, vote_choice in vote["votes"].items(): +# print(f" {senator}: {vote_choice}") + +# print(f"\nFinal Result: {vote['results']['outcome']}") +# print( +# f"YEA: {vote['results']['yea']}, NAY: {vote['results']['nay']}, PRESENT: {vote['results']['present']}" +# ) + +# # Example 4: Committee hearing +# print("\nšŸ›ļø Example: Committee Hearing") +# hearing = senate.run_committee_hearing( +# "Armed Services", +# "Military readiness and defense spending", +# ["Secretary of Defense", "Joint Chiefs Chairman"], +# ) + +# print("Armed Services Committee Hearing on Military Readiness") +# for entry in hearing["transcript"][:3]: # Show first 3 entries +# print( +# f"\n{entry['type'].title()}: {entry['senator'] if 'senator' in entry else entry['witness']}" +# ) +# print(f" {entry['content'][:150]}...") + +# # Example 5: Run all senators concurrently on a single task +# print("\nšŸš€ Example: All Senators Concurrent Response") +# all_senators_results = senate.run( +# "What is your position on federal student loan forgiveness and how should we address the student debt crisis?" +# ) + +# print(f"\nTask: {all_senators_results['task']}") +# print( +# f"Selection Method: {all_senators_results['selection_method']}" +# ) +# print( +# f"Total Participants: {all_senators_results['total_participants']}" +# ) + +# print("\nšŸ“Š Party Breakdown:") +# for party, senators in all_senators_results[ +# "party_breakdown" +# ].items(): +# if senators: +# print(f"\n{party} ({len(senators)} senators):") +# for senator_data in senators: +# print(f" - {senator_data['senator']}") + +# # Example 6: Run 50% of senators randomly +# print("\nšŸŽ² Example: Random 50% of Senators") +# random_results = senate.run( +# "What is your position on climate change legislation and carbon pricing?", +# participants=0.5, # 50% of all senators +# ) + +# print(f"\nTask: {random_results['task']}") +# print(f"Selection Method: {random_results['selection_method']}") +# print( +# f"Total Participants: {random_results['total_participants']}" +# ) + +# print("\nšŸ“‹ Selected Senators:") +# for senator in random_results["participants"]: +# party = senate._get_senator_party(senator) +# print(f" - {senator} ({party})") + +# print("\nšŸ“Š Party Breakdown:") +# for party, senators in random_results["party_breakdown"].items(): +# if senators: +# print(f"\n{party} ({len(senators)} senators):") +# for senator_data in senators: +# print(f" - {senator_data['senator']}") + +# # Example 7: Run specific senators +# print("\nšŸŽÆ Example: Specific Senators") +# specific_results = senate.run( +# "What is your position on military spending and defense policy?", +# participants=[ +# "Katie Britt", +# "Mark Kelly", +# "Lisa Murkowski", +# "Alex Padilla", +# "Tom Cotton", +# ], +# ) + +# print(f"\nTask: {specific_results['task']}") +# print(f"Selection Method: {specific_results['selection_method']}") +# print( +# f"Total Participants: {specific_results['total_participants']}" +# ) + +# print("\nšŸ“‹ Responses by Senator:") +# for senator, response in specific_results["responses"].items(): +# party = senate._get_senator_party(senator) +# print(f"\n{senator} ({party}):") +# print(f" {response[:200]}...") + + +# if __name__ == "__main__": +# main() diff --git a/swarms/structs/hiearchical_swarm.py b/swarms/structs/hiearchical_swarm.py index 2ff30a06..85a720cf 100644 --- a/swarms/structs/hiearchical_swarm.py +++ b/swarms/structs/hiearchical_swarm.py @@ -1,6 +1,10 @@ """ -Flow: +Hierarchical Swarm Implementation + +This module provides a hierarchical swarm architecture where a director agent coordinates +multiple worker agents to execute complex tasks through a structured workflow. +Flow: 1. User provides a task 2. Director creates a plan 3. Director distributes orders to agents individually or multiple tasks at once @@ -8,10 +12,21 @@ Flow: 5. Director evaluates results and issues new orders if needed (up to max_loops) 6. All context and conversation history is preserved throughout the process +Todo + +- Add layers of management -- a list of list of agents that act as departments +- Auto build agents from input prompt - and then add them to the swarm +- Create an interactive and dynamic UI like we did with heavy swarm +- Make it faster and more high performance + +Classes: + HierarchicalOrder: Represents a single task assignment to a specific agent + SwarmSpec: Contains the overall plan and list of orders for the swarm + HierarchicalSwarm: Main swarm orchestrator that manages director and worker agents """ import traceback -from typing import Any, Callable, List, Literal, Optional, Union +from typing import Any, Callable, List, Optional, Union from pydantic import BaseModel, Field @@ -19,7 +34,6 @@ from swarms.prompts.hiearchical_system_prompt import ( HIEARCHICAL_SWARM_SYSTEM_PROMPT, ) from swarms.structs.agent import Agent -from swarms.structs.base_swarm import BaseSwarm from swarms.structs.conversation import Conversation from swarms.structs.ma_utils import list_all_agents from swarms.tools.base_tool import BaseTool @@ -33,6 +47,20 @@ logger = initialize_logger(log_folder="hierarchical_swarm") class HierarchicalOrder(BaseModel): + """ + Represents a single task assignment within the hierarchical swarm. + + This class defines the structure for individual task orders that the director + distributes to worker agents. Each order specifies which agent should execute + what specific task. + + Attributes: + agent_name (str): The name of the agent assigned to execute the task. + Must match an existing agent in the swarm. + task (str): The specific task description to be executed by the assigned agent. + Should be clear and actionable. + """ + agent_name: str = Field( ..., description="Specifies the name of the agent to which the task is assigned. This is a crucial element in the hierarchical structure of the swarm, as it determines the specific agent responsible for the task execution.", @@ -44,6 +72,20 @@ class HierarchicalOrder(BaseModel): class SwarmSpec(BaseModel): + """ + Defines the complete specification for a hierarchical swarm execution. + + This class contains the overall plan and all individual orders that the director + creates to coordinate the swarm's activities. It serves as the structured output + format for the director agent. + + Attributes: + plan (str): A comprehensive plan outlining the sequence of actions and strategy + for the entire swarm to accomplish the given task. + orders (List[HierarchicalOrder]): A list of specific task assignments to + individual agents within the swarm. + """ + plan: str = Field( ..., description="Outlines the sequence of actions to be taken by the swarm. This plan is a detailed roadmap that guides the swarm's behavior and decision-making.", @@ -54,50 +96,34 @@ class SwarmSpec(BaseModel): ) -SwarmType = Literal[ - "AgentRearrange", - "MixtureOfAgents", - "SpreadSheetSwarm", - "SequentialWorkflow", - "ConcurrentWorkflow", - "GroupChat", - "MultiAgentRouter", - "AutoSwarmBuilder", - "HiearchicalSwarm", - "auto", - "MajorityVoting", - "MALT", - "DeepResearchSwarm", - "CouncilAsAJudge", - "InteractiveGroupChat", -] - - -class SwarmRouterCall(BaseModel): - goal: str = Field( - ..., - description="The goal of the swarm router call. This is the goal that the swarm router will use to determine the best swarm to use.", - ) - swarm_type: SwarmType = Field( - ..., - description="The type of swarm to use. This is the type of swarm that the swarm router will use to determine the best swarm to use.", - ) - - task: str = Field( - ..., - description="The task to be executed by the swarm router. This is the task that the swarm router will use to determine the best swarm to use.", - ) - - -class HierarchicalSwarm(BaseSwarm): +class HierarchicalSwarm: """ - _Representer a hierarchical swarm of agents, with a director that orchestrates tasks among the agents. - The workflow follows a hierarchical pattern: - 1. Task is received and sent to the director - 2. Director creates a plan and distributes orders to agents - 3. Agents execute tasks and report back to the director - 4. Director evaluates results and issues new orders if needed (up to max_loops) - 5. All context and conversation history is preserved throughout the process + A hierarchical swarm orchestrator that coordinates multiple agents through a director. + + This class implements a hierarchical architecture where a director agent creates + plans and distributes tasks to worker agents. The director can provide feedback + and iterate on results through multiple loops to achieve the desired outcome. + + The swarm maintains conversation history throughout the process, allowing for + context-aware decision making and iterative refinement of results. + + Attributes: + name (str): The name identifier for this swarm instance. + description (str): A description of the swarm's purpose and capabilities. + director (Optional[Union[Agent, Callable, Any]]): The director agent that + coordinates the swarm. + agents (List[Union[Agent, Callable, Any]]): List of worker agents available + for task execution. + max_loops (int): Maximum number of feedback loops the swarm can perform. + output_type (OutputType): Format for the final output of the swarm. + feedback_director_model_name (str): Model name for the feedback director. + director_name (str): Name identifier for the director agent. + director_model_name (str): Model name for the main director agent. + verbose (bool): Whether to enable detailed logging and progress tracking. + add_collaboration_prompt (bool): Whether to add collaboration prompts to agents. + planning_director_agent (Optional[Union[Agent, Callable, Any]]): Optional + planning agent. + director_feedback_on (bool): Whether director feedback is enabled. """ def __init__( @@ -121,22 +147,33 @@ class HierarchicalSwarm(BaseSwarm): **kwargs, ): """ - Initializes the HierarchicalSwarm with the given parameters. - - :param name: The name of the swarm. - :param description: A description of the swarm. - :param director: The director agent that orchestrates tasks. - :param agents: A list of agents within the swarm. - :param max_loops: The maximum number of feedback loops between the director and agents. - :param output_type: The format in which to return the output (dict, str, or list). - :param verbose: Enable detailed logging with loguru. + Initialize a new HierarchicalSwarm instance. + + Args: + name (str): The name identifier for this swarm instance. + description (str): A description of the swarm's purpose. + director (Optional[Union[Agent, Callable, Any]]): The director agent. + If None, a default director will be created. + agents (List[Union[Agent, Callable, Any]]): List of worker agents. + Must not be empty. + max_loops (int): Maximum number of feedback loops (must be > 0). + output_type (OutputType): Format for the final output. + feedback_director_model_name (str): Model name for feedback director. + director_name (str): Name identifier for the director agent. + director_model_name (str): Model name for the main director agent. + verbose (bool): Whether to enable detailed logging. + add_collaboration_prompt (bool): Whether to add collaboration prompts. + planning_director_agent (Optional[Union[Agent, Callable, Any]]): + Optional planning agent for enhanced planning capabilities. + director_feedback_on (bool): Whether director feedback is enabled. + *args: Additional positional arguments. + **kwargs: Additional keyword arguments. + + Raises: + ValueError: If no agents are provided or max_loops is invalid. """ - super().__init__( - name=name, - description=description, - agents=agents, - ) self.name = name + self.description = description self.director = director self.agents = agents self.max_loops = max_loops @@ -155,33 +192,47 @@ class HierarchicalSwarm(BaseSwarm): def init_swarm(self): """ - Initializes the swarm. + Initialize the swarm with proper configuration and validation. + + This method performs the following initialization steps: + 1. Sets up logging if verbose mode is enabled + 2. Creates a conversation instance for history tracking + 3. Performs reliability checks on the configuration + 4. Adds agent context to the director + + Raises: + ValueError: If the swarm configuration is invalid. """ # Initialize logger only if verbose is enabled if self.verbose: logger.info( f"šŸš€ Initializing HierarchicalSwarm: {self.name}" ) - logger.info( - f"šŸ“Š Configuration - Max loops: {self.max_loops}" - ) self.conversation = Conversation(time_enabled=False) # Reliability checks self.reliability_checks() - self.director = self.setup_director() - self.add_context_to_director() if self.verbose: logger.success( - f"āœ… HierarchicalSwarm initialized successfully: Name {self.name}" + f"āœ… HierarchicalSwarm: {self.name} initialized successfully." ) def add_context_to_director(self): - """Add agent context to the director's conversation.""" + """ + Add agent context and collaboration information to the director's conversation. + + This method ensures that the director has complete information about all + available agents, their capabilities, and how they can collaborate. This + context is essential for the director to make informed decisions about + task distribution. + + Raises: + Exception: If adding context fails due to agent configuration issues. + """ try: if self.verbose: logger.info("šŸ“ Adding agent context to director") @@ -207,7 +258,18 @@ class HierarchicalSwarm(BaseSwarm): ) def setup_director(self): - """Set up the director agent with proper configuration.""" + """ + Set up the director agent with proper configuration and tools. + + Creates a new director agent with the SwarmSpec schema for structured + output, enabling it to create plans and distribute orders effectively. + + Returns: + Agent: A configured director agent ready to coordinate the swarm. + + Raises: + Exception: If director setup fails due to configuration issues. + """ try: if self.verbose: logger.info("šŸŽÆ Setting up director agent") @@ -217,17 +279,6 @@ class HierarchicalSwarm(BaseSwarm): if self.verbose: logger.debug(f"šŸ“‹ Director schema: {schema}") - # if self.director is not None: - # # if litellm_check_for_tools(self.director.model_name) is True: - # self.director.add_tool_schema([schema]) - - # if self.verbose: - # logger.success( - # "āœ… Director agent setup completed successfully" - # ) - - # return self.director - # else: return Agent( agent_name=self.director_name, agent_description="A director agent that can create a plan and distribute orders to agents", @@ -244,13 +295,20 @@ class HierarchicalSwarm(BaseSwarm): def reliability_checks(self): """ - Checks if there are any agents and a director set for the swarm. - Raises ValueError if either condition is not met. + Perform validation checks to ensure the swarm is properly configured. + + This method validates: + 1. That at least one agent is provided + 2. That max_loops is greater than 0 + 3. That a director is available (creates default if needed) + + Raises: + ValueError: If the swarm configuration is invalid. """ try: if self.verbose: logger.info( - f"šŸ” Running reliability checks for swarm: {self.name}" + f"Hiearchical Swarm: {self.name} Reliability checks in progress..." ) if not self.agents or len(self.agents) == 0: @@ -263,17 +321,12 @@ class HierarchicalSwarm(BaseSwarm): "Max loops must be greater than 0. Please set a valid number of loops." ) - if not self.director: - raise ValueError( - "Director not set for the swarm. A director agent is required to coordinate and orchestrate tasks among the agents." - ) + if self.director is None: + self.director = self.setup_director() if self.verbose: logger.success( - f"āœ… Reliability checks passed for swarm: {self.name}" - ) - logger.info( - f"šŸ“Š Swarm stats - Agents: {len(self.agents)}, Max loops: {self.max_loops}" + f"Hiearchical Swarm: {self.name} Reliability checks passed..." ) except Exception as e: @@ -286,11 +339,22 @@ class HierarchicalSwarm(BaseSwarm): img: str = None, ) -> SwarmSpec: """ - Runs a task through the director agent with the current conversation context. + Execute the director agent with the given task and conversation context. + + This method runs the director agent to create a plan and distribute orders + based on the current task and conversation history. If a planning director + agent is configured, it will first create a detailed plan before the main + director processes the task. + + Args: + task (str): The task to be executed by the director. + img (str, optional): Optional image input for the task. + + Returns: + SwarmSpec: The director's output containing the plan and orders. - :param task: The task to be executed by the director. - :param img: Optional image to be used with the task. - :return: The SwarmSpec containing the director's orders. + Raises: + Exception: If director execution fails. """ try: if self.verbose: @@ -330,7 +394,25 @@ class HierarchicalSwarm(BaseSwarm): def step(self, task: str, img: str = None, *args, **kwargs): """ - Runs a single step of the hierarchical swarm. + Execute a single step of the hierarchical swarm workflow. + + This method performs one complete iteration of the swarm's workflow: + 1. Run the director to create a plan and orders + 2. Parse the director's output to extract plan and orders + 3. Execute all orders by calling the appropriate agents + 4. Optionally generate director feedback on the results + + Args: + task (str): The task to be processed in this step. + img (str, optional): Optional image input for the task. + *args: Additional positional arguments. + **kwargs: Additional keyword arguments. + + Returns: + Any: The results from this step, either agent outputs or director feedback. + + Raises: + Exception: If step execution fails. """ try: if self.verbose: @@ -370,13 +452,27 @@ class HierarchicalSwarm(BaseSwarm): def run(self, task: str, img: str = None, *args, **kwargs): """ - Executes the hierarchical swarm for a specified number of feedback loops. + Execute the hierarchical swarm for the specified number of feedback loops. + + This method orchestrates the complete swarm execution, performing multiple + iterations based on the max_loops configuration. Each iteration builds upon + the previous results, allowing for iterative refinement and improvement. - :param task: The initial task to be processed by the swarm. - :param img: Optional image input for the agents. - :param args: Additional positional arguments. - :param kwargs: Additional keyword arguments. - :return: The formatted conversation history as output. + The method maintains conversation history throughout all loops and provides + context from previous iterations to subsequent ones. + + Args: + task (str): The initial task to be processed by the swarm. + img (str, optional): Optional image input for the agents. + *args: Additional positional arguments. + **kwargs: Additional keyword arguments. + + Returns: + Any: The formatted conversation history as output, formatted according + to the output_type configuration. + + Raises: + Exception: If swarm execution fails. """ try: current_loop = 0 @@ -448,7 +544,23 @@ class HierarchicalSwarm(BaseSwarm): logger.error(error_msg) def feedback_director(self, outputs: list): - """Provide feedback from the director based on agent outputs.""" + """ + Generate feedback from the director based on agent outputs. + + This method creates a feedback director agent that analyzes the results + from worker agents and provides specific, actionable feedback for improvement. + The feedback is added to the conversation history and can be used in + subsequent iterations. + + Args: + outputs (list): List of outputs from worker agents that need feedback. + + Returns: + str: The director's feedback on the agent outputs. + + Raises: + Exception: If feedback generation fails. + """ try: if self.verbose: logger.info("šŸ“ Generating director feedback") @@ -491,7 +603,24 @@ class HierarchicalSwarm(BaseSwarm): self, agent_name: str, task: str, *args, **kwargs ): """ - Calls a single agent with the given task. + Call a single agent by name to execute a specific task. + + This method locates an agent by name and executes the given task with + the current conversation context. The agent's output is added to the + conversation history for future reference. + + Args: + agent_name (str): The name of the agent to call. + task (str): The task to be executed by the agent. + *args: Additional positional arguments for the agent. + **kwargs: Additional keyword arguments for the agent. + + Returns: + Any: The output from the agent's execution. + + Raises: + ValueError: If the specified agent is not found in the swarm. + Exception: If agent execution fails. """ try: if self.verbose: @@ -537,7 +666,22 @@ class HierarchicalSwarm(BaseSwarm): def parse_orders(self, output): """ - Parses the orders from the director's output. + Parse the director's output to extract plan and orders. + + This method handles various output formats from the director agent and + extracts the plan and hierarchical orders. It supports both direct + dictionary formats and function call formats with JSON arguments. + + Args: + output: The raw output from the director agent. + + Returns: + tuple: A tuple containing (plan, orders) where plan is a string + and orders is a list of HierarchicalOrder objects. + + Raises: + ValueError: If the output format is unexpected or cannot be parsed. + Exception: If parsing fails due to other errors. """ try: if self.verbose: @@ -666,7 +810,20 @@ class HierarchicalSwarm(BaseSwarm): def execute_orders(self, orders: list): """ - Executes the orders from the director's output. + Execute all orders from the director's output. + + This method iterates through all hierarchical orders and calls the + appropriate agents to execute their assigned tasks. Each agent's + output is collected and returned as a list. + + Args: + orders (list): List of HierarchicalOrder objects to execute. + + Returns: + list: List of outputs from all executed orders. + + Raises: + Exception: If order execution fails. """ try: if self.verbose: @@ -699,7 +856,23 @@ class HierarchicalSwarm(BaseSwarm): self, tasks: List[str], img: str = None, *args, **kwargs ): """ - Executes the hierarchical swarm for a list of tasks. + Execute the hierarchical swarm for multiple tasks in sequence. + + This method processes a list of tasks sequentially, running the complete + swarm workflow for each task. Each task is processed independently with + its own conversation context and results. + + Args: + tasks (List[str]): List of tasks to be processed by the swarm. + img (str, optional): Optional image input for the tasks. + *args: Additional positional arguments. + **kwargs: Additional keyword arguments. + + Returns: + list: List of results for each processed task. + + Raises: + Exception: If batched execution fails. """ try: if self.verbose: