[NEW UPDATE DOC FIXES]

pull/612/head^2
Your Name 3 months ago
parent fbf65094a6
commit 739416ad03

@ -51,3 +51,7 @@ swarm_architecture:
max_loops: 5
swarm_type: "ConcurrentWorkflow"
task: "How can we trademark concepts as a delaware C CORP for free"
autosave: true
return_json: false
# flow: "agent_name -> agent_name 2 -> agent_name 3" # not neccessary if not using agent rearrange

@ -0,0 +1,481 @@
# Building a Multi-Agent System for Real-Time Financial Analysis: A Comprehensive Tutorial
In this tutorial, we'll walk through the process of building a sophisticated multi-agent system for real-time financial analysis using the Swarms framework. This system is designed for financial analysts and developer analysts who want to leverage AI and multiple data sources to gain deeper insights into stock performance, market trends, and economic indicators.
Before we dive into the code, let's briefly introduce the Swarms framework. Swarms is an innovative open-source project that simplifies the creation and management of AI agents. It's particularly well-suited for complex tasks like financial analysis, where multiple specialized agents can work together to provide comprehensive insights.
For more information and to contribute to the project, visit the [Swarms GitHub repository](https://github.com/kyegomez/swarms). We highly recommend exploring the documentation for a deeper understanding of Swarms' capabilities.
Additional resources:
- [Swarms Discord](https://discord.com/servers/agora-999382051935506503) for community discussions
- [Swarms Twitter](https://x.com/swarms_corp) for updates
- [Swarms Spotify](https://open.spotify.com/show/2HLiswhmUaMdjHC8AUHcCF?si=c831ef10c5ef4994) for podcasts
- [Swarms Blog](https://medium.com/@kyeg) for in-depth articles
- [Swarms Website](https://swarms.xyz) for an overview of the project
Now, let's break down our financial analysis system step by step.
## Step 1: Setting Up the Environment
First install the necessary packages:
```bash
$ pip3 install -U swarms yfiance swarm_models fredapi pandas
```
First, we need to set up our environment and import the necessary libraries:
```python
import os
import time
from datetime import datetime, timedelta
import yfinance as yf
import requests
from fredapi import Fred
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from swarms import Agent, AgentRearrange
from swarm_models import OpenAIChat
import logging
from dotenv import load_dotenv
import asyncio
import aiohttp
from ratelimit import limits, sleep_and_retry
# Load environment variables
load_dotenv()
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# API keys
POLYGON_API_KEY = os.getenv('POLYGON_API_KEY')
FRED_API_KEY = os.getenv('FRED_API_KEY')
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
# Initialize FRED client
fred_client = Fred(api_key=FRED_API_KEY)
# Polygon API base URL
POLYGON_BASE_URL = "https://api.polygon.io"
```
This section sets up our environment, imports necessary libraries, and initializes our API keys and clients. We're using `dotenv` to securely manage our API keys, and we've set up logging to track the execution of our script.
## Step 2: Implementing Rate Limiting
To respect API rate limits, we implement rate limiting decorators:
```python
@sleep_and_retry
@limits(calls=5, period=60) # Adjust these values based on your Polygon API tier
async def call_polygon_api(session, endpoint, params=None):
url = f"{POLYGON_BASE_URL}{endpoint}"
params = params or {}
params['apiKey'] = POLYGON_API_KEY
async with session.get(url, params=params) as response:
response.raise_for_status()
return await response.json()
@sleep_and_retry
@limits(calls=120, period=60) # FRED allows 120 requests per minute
def call_fred_api(func, *args, **kwargs):
return func(*args, **kwargs)
```
These decorators ensure that we don't exceed the rate limits for our API calls. The `call_polygon_api` function is designed to work with asynchronous code, while `call_fred_api` is a wrapper for synchronous FRED API calls.
## Step 3: Implementing Data Fetching Functions
Next, we implement functions to fetch data from various sources:
### Yahoo Finance Integration
```python
async def get_yahoo_finance_data(session, ticker, period="1d", interval="1m"):
try:
stock = yf.Ticker(ticker)
hist = await asyncio.to_thread(stock.history, period=period, interval=interval)
info = await asyncio.to_thread(lambda: stock.info)
return hist, info
except Exception as e:
logger.error(f"Error fetching Yahoo Finance data for {ticker}: {e}")
return None, None
async def get_yahoo_finance_realtime(session, ticker):
try:
stock = yf.Ticker(ticker)
return await asyncio.to_thread(lambda: stock.fast_info)
except Exception as e:
logger.error(f"Error fetching Yahoo Finance realtime data for {ticker}: {e}")
return None
```
These functions fetch historical and real-time data from Yahoo Finance. We use `asyncio.to_thread` to run the synchronous `yfinance` functions in a separate thread, allowing our main event loop to continue running.
### Polygon.io Integration
```python
async def get_polygon_realtime_data(session, ticker):
try:
trades = await call_polygon_api(session, f"/v2/last/trade/{ticker}")
quotes = await call_polygon_api(session, f"/v2/last/nbbo/{ticker}")
return trades, quotes
except Exception as e:
logger.error(f"Error fetching Polygon.io realtime data for {ticker}: {e}")
return None, None
async def get_polygon_news(session, ticker, limit=10):
try:
news = await call_polygon_api(session, f"/v2/reference/news", params={"ticker": ticker, "limit": limit})
return news.get('results', [])
except Exception as e:
logger.error(f"Error fetching Polygon.io news for {ticker}: {e}")
return []
```
These functions fetch real-time trade and quote data, as well as news articles from Polygon.io. We use our `call_polygon_api` function to make these requests, ensuring we respect rate limits.
### FRED Integration
```python
async def get_fred_data(session, series_id, start_date, end_date):
try:
data = await asyncio.to_thread(call_fred_api, fred_client.get_series, series_id, start_date, end_date)
return data
except Exception as e:
logger.error(f"Error fetching FRED data for {series_id}: {e}")
return None
async def get_fred_realtime(session, series_ids):
try:
data = {}
for series_id in series_ids:
series = await asyncio.to_thread(call_fred_api, fred_client.get_series, series_id)
data[series_id] = series.iloc[-1] # Get the most recent value
return data
except Exception as e:
logger.error(f"Error fetching FRED realtime data: {e}")
return {}
```
These functions fetch historical and real-time economic data from FRED. Again, we use `asyncio.to_thread` to run the synchronous FRED API calls in a separate thread.
## Step 4: Creating Specialized Agents
Now we create our specialized agents using the Swarms framework:
```python
stock_agent = Agent(
agent_name="StockAgent",
system_prompt="""You are an expert stock analyst. Your task is to analyze real-time stock data and provide insights.
Consider price movements, trading volume, and any available company information.
Provide a concise summary of the stock's current status and any notable trends or events.""",
llm=OpenAIChat(api_key=OPENAI_API_KEY),
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
)
market_agent = Agent(
agent_name="MarketAgent",
system_prompt="""You are a market analysis expert. Your task is to analyze overall market conditions using real-time data.
Consider major indices, sector performance, and market-wide trends.
Provide a concise summary of current market conditions and any significant developments.""",
llm=OpenAIChat(api_key=OPENAI_API_KEY),
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
)
macro_agent = Agent(
agent_name="MacroAgent",
system_prompt="""You are a macroeconomic analysis expert. Your task is to analyze key economic indicators and provide insights on the overall economic situation.
Consider GDP growth, inflation rates, unemployment figures, and other relevant economic data.
Provide a concise summary of the current economic situation and any potential impacts on financial markets.""",
llm=OpenAIChat(api_key=OPENAI_API_KEY),
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
)
news_agent = Agent(
agent_name="NewsAgent",
system_prompt="""You are a financial news analyst. Your task is to analyze recent news articles related to specific stocks or the overall market.
Consider the potential impact of news events on stock prices or market trends.
Provide a concise summary of key news items and their potential market implications.""",
llm=OpenAIChat(api_key=OPENAI_API_KEY),
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
)
```
Each agent is specialized in a different aspect of financial analysis. The `system_prompt` for each agent defines its role and the type of analysis it should perform.
## Step 5: Building the Multi-Agent System
We then combine our specialized agents into a multi-agent system:
```python
agents = [stock_agent, market_agent, macro_agent, news_agent]
flow = "StockAgent -> MarketAgent -> MacroAgent -> NewsAgent"
agent_system = AgentRearrange(agents=agents, flow=flow)
```
The `flow` variable defines the order in which our agents will process information. This allows for a logical progression from specific stock analysis to broader market and economic analysis.
## Step 6: Implementing Real-Time Analysis
Now we implement our main analysis function:
```python
async def real_time_analysis(session, ticker):
logger.info(f"Starting real-time analysis for {ticker}")
# Fetch real-time data
yf_data, yf_info = await get_yahoo_finance_data(session, ticker)
yf_realtime = await get_yahoo_finance_realtime(session, ticker)
polygon_trades, polygon_quotes = await get_polygon_realtime_data(session, ticker)
polygon_news = await get_polygon_news(session, ticker)
fred_data = await get_fred_realtime(session, ['GDP', 'UNRATE', 'CPIAUCSL'])
# Prepare input for the multi-agent system
input_data = f"""
Yahoo Finance Data:
{yf_realtime}
Recent Stock History:
{yf_data.tail().to_string() if yf_data is not None else 'Data unavailable'}
Polygon.io Trade Data:
{polygon_trades}
Polygon.io Quote Data:
{polygon_quotes}
Recent News:
{polygon_news[:3] if polygon_news else 'No recent news available'}
Economic Indicators:
{fred_data}
Analyze this real-time financial data for {ticker}. Provide insights on the stock's performance, overall market conditions, relevant economic factors, and any significant news that might impact the stock or market.
"""
# Run the multi-agent analysis
try:
analysis = agent_system.run(input_data)
logger.info(f"Analysis completed for {ticker}")
return analysis
except Exception as e:
logger.error(f"Error during multi-agent analysis for {ticker}: {e}")
return f"Error during analysis: {e}"
```
This function fetches data from all our sources, prepares it as input for our multi-agent system, and then runs the analysis. The result is a comprehensive analysis of the stock, considering individual performance, market conditions, economic factors, and relevant news.
## Step 7: Implementing Advanced Use Cases
We then implement more advanced analysis functions:
### Compare Stocks
```python
async def compare_stocks(session, tickers):
results = {}
for ticker in tickers:
results[ticker] = await real_time_analysis(session, ticker)
comparison_prompt = f"""
Compare the following stocks based on the provided analyses:
{results}
Highlight key differences and similarities. Provide a ranking of these stocks based on their current performance and future prospects.
"""
try:
comparison = agent_system.run(comparison_prompt)
logger.info(f"Stock comparison completed for {tickers}")
return comparison
except Exception as e:
logger.error(f"Error during stock comparison: {e}")
return f"Error during comparison: {e}"
```
This function compares multiple stocks by running a real-time analysis on each and then prompting our multi-agent system to compare the results.
### Sector Analysis
```python
async def sector_analysis(session, sector):
sector_stocks = {
'Technology': ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'NVDA'],
'Finance': ['JPM', 'BAC', 'WFC', 'C', 'GS'],
'Healthcare': ['JNJ', 'UNH', 'PFE', 'ABT', 'MRK'],
'Consumer Goods': ['PG', 'KO', 'PEP', 'COST', 'WMT'],
'Energy': ['XOM', 'CVX', 'COP', 'SLB', 'EOG']
}
if sector not in sector_stocks:
return f"Sector '{sector}' not found. Available sectors: {', '.join(sector_stocks.keys())}"
stocks = sector_stocks[sector][:5]
sector_data = {}
for stock in stocks:
sector_data[stock] = await real_time_analysis(session, stock)
sector_prompt = f"""
Analyze the {sector} sector based on the following data from its top stocks:
{sector_data}
Provide insights on:
1. Overall sector performance
2. Key trends within the sector
3. Top performing stocks and why they're outperforming
4. Any challenges or opportunities facing the sector
"""
try:
analysis = agent_system.run(sector_prompt)
logger.info(f"Sector analysis completed for {sector}")
return analysis
except Exception as e:
logger.error(f"Error during sector analysis for {sector}: {e}")
return f"Error during sector analysis: {e}"
```
This function analyzes an entire sector by running real-time analysis on its top stocks and then prompting our multi-agent system to provide sector-wide insights.
### Economic Impact Analysis
```python
async def economic_impact_analysis(session, indicator, threshold):
# Fetch historical data for the indicator
end_date = datetime.now().strftime('%Y-%m-%d')
start_date = (datetime.now() - timedelta(days=365)).strftime('%Y-%m-%d')
indicator_data = await get_fred_data(session, indicator, start_date, end_date)
if indicator_data is None or len(indicator_data) < 2:
return f"Insufficient data for indicator {indicator}"
# Check if the latest value crosses the threshold
latest_value = indicator_data.iloc[-1]
previous_value = indicator_data.iloc[-2]
crossed_threshold = (latest_value > threshold and previous_value <= threshold) or (latest_value < threshold and previous_value >= threshold)
if crossed_threshold:
impact_prompt = f"""
The economic indicator {indicator} has crossed the threshold of {threshold}. Its current value is {latest_value}.
Historical data:
{indicator_data.tail().to_string()}
Analyze the potential impacts of this change on:
1. Overall economic conditions
2. Different market
2. Different market sectors
3. Specific types of stocks (e.g., growth vs. value)
4. Other economic indicators
Provide a comprehensive analysis of the potential consequences and any recommended actions for investors.
"""
try:
analysis = agent_system.run(impact_prompt)
logger.info(f"Economic impact analysis completed for {indicator}")
return analysis
except Exception as e:
logger.error(f"Error during economic impact analysis for {indicator}: {e}")
return f"Error during economic impact analysis: {e}"
else:
return f"The {indicator} indicator has not crossed the threshold of {threshold}. Current value: {latest_value}"
```
This function analyzes the potential impact of significant changes in economic indicators. It fetches historical data, checks if a threshold has been crossed, and if so, prompts our multi-agent system to provide a comprehensive analysis of the potential consequences.
## Step 8: Running the Analysis
Finally, we implement our main function to run all of our analyses:
```python
async def main():
async with aiohttp.ClientSession() as session:
# Example usage
analysis_result = await real_time_analysis(session, 'AAPL')
print("Single Stock Analysis:")
print(analysis_result)
comparison_result = await compare_stocks(session, ['AAPL', 'GOOGL', 'MSFT'])
print("\nStock Comparison:")
print(comparison_result)
tech_sector_analysis = await sector_analysis(session, 'Technology')
print("\nTechnology Sector Analysis:")
print(tech_sector_analysis)
gdp_impact = await economic_impact_analysis(session, 'GDP', 22000)
print("\nEconomic Impact Analysis:")
print(gdp_impact)
if __name__ == "__main__":
asyncio.run(main())
```
This `main` function demonstrates how to use all of our analysis functions. It runs a single stock analysis, compares multiple stocks, performs a sector analysis, and conducts an economic impact analysis.
## Conclusion and Next Steps
This tutorial has walked you through the process of building a sophisticated multi-agent system for real-time financial analysis using the Swarms framework. Here's a summary of what we've accomplished:
1. Set up our environment and API connections
2. Implemented rate limiting to respect API constraints
3. Created functions to fetch data from multiple sources (Yahoo Finance, Polygon.io, FRED)
4. Designed specialized AI agents for different aspects of financial analysis
5. Combined these agents into a multi-agent system
6. Implemented advanced analysis functions including stock comparison, sector analysis, and economic impact analysis
This system provides a powerful foundation for financial analysis, but there's always room for expansion and improvement. Here are some potential next steps:
1. **Expand data sources**: Consider integrating additional financial data providers for even more comprehensive analysis.
2. **Enhance agent specialization**: You could create more specialized agents, such as a technical analysis agent or a sentiment analysis agent for social media data.
3. **Implement a user interface**: Consider building a web interface or dashboard to make the system more user-friendly for non-technical analysts.
4. **Add visualization capabilities**: Integrate data visualization tools to help interpret complex financial data more easily.
5. **Implement a backtesting system**: Develop a system to evaluate your multi-agent system's performance on historical data.
6. **Explore advanced AI models**: The Swarms framework supports various AI models. Experiment with different models to see which performs best for your specific use case.
7. **Implement real-time monitoring**: Set up a system to continuously monitor markets and alert you to significant changes or opportunities.
Remember, the Swarms framework is a powerful and flexible tool that can be adapted to a wide range of complex tasks beyond just financial analysis. We encourage you to explore the [Swarms GitHub repository](https://github.com/kyegomez/swarms) for more examples and inspiration.
For more in-depth discussions and community support, consider joining the [Swarms Discord](https://discord.com/servers/agora-999382051935506503). You can also stay updated with the latest developments by following [Swarms on Twitter](https://x.com/swarms_corp).
If you're interested in learning more about AI and its applications in various fields, check out the [Swarms Spotify podcast](https://open.spotify.com/show/2HLiswhmUaMdjHC8AUHcCF?si=c831ef10c5ef4994) and the [Swarms Blog](https://medium.com/@kyeg) for insightful articles and discussions.
Lastly, don't forget to visit the [Swarms Website](https://swarms.xyz) for a comprehensive overview of the project and its capabilities.
By leveraging the power of multi-agent AI systems, you're well-equipped to navigate the complex world of financial markets. Happy analyzing!
## Swarm Resources:
* [Swarms Github](https://github.com/kyegomez/swarms)
* [Swarms Discord](https://discord.com/servers/agora-999382051935506503)
* [Swarms Twitter](https://x.com/swarms_corp)
* [Swarms Spotify](https://open.spotify.com/show/2HLiswhmUaMdjHC8AUHcCF?si=c831ef10c5ef4994)
* [Swarms Blog](https://medium.com/@kyeg)
* [Swarms Website](https://swarms.xyz)

@ -0,0 +1,751 @@
# Analyzing Financial Data with AI Agents using Swarms Framework
In the rapidly evolving landscape of quantitative finance, the integration of artificial intelligence with financial data analysis has become increasingly crucial. This blog post will explore how to leverage the power of AI agents, specifically using the Swarms framework, to analyze financial data from various top-tier data providers. We'll demonstrate how to connect these agents with different financial APIs, enabling sophisticated analysis and decision-making processes.
## Table of Contents
1. [Introduction to Swarms Framework](#introduction-to-swarms-framework)
2. [Setting Up the Environment](#setting-up-the-environment)
3. [Connecting AI Agents with Financial Data Providers](#connecting-ai-agents-with-financial-data-providers)
- [Polygon.io](#polygonio)
- [Alpha Vantage](#alpha-vantage)
- [Yahoo Finance](#yahoo-finance)
- [IEX Cloud](#iex-cloud)
- [Finnhub](#finnhub)
4. [Advanced Analysis Techniques](#advanced-analysis-techniques)
5. [Best Practices and Considerations](#best-practices-and-considerations)
6. [Conclusion](#conclusion)
## Introduction to Swarms Framework
The Swarms framework is a powerful tool for building and deploying AI agents that can interact with various data sources and perform complex analyses. In the context of financial data analysis, Swarms can be used to create intelligent agents that can process large volumes of financial data, identify patterns, and make data-driven decisions. Explore our github for examples, applications, and more.
## Setting Up the Environment
Before we dive into connecting AI agents with financial data providers, let's set up our environment:
1. Install the Swarms framework:
```bash
pip install -U swarms
```
2. Install additional required libraries:
```bash
pip install requests pandas numpy matplotlib
```
3. Set up your API keys for the various financial data providers. It's recommended to use environment variables or a secure configuration file to store these keys.
## Connecting AI Agents with Financial Data Providers
Now, let's explore how to connect AI agents using the Swarms framework with different financial data providers.
### Polygon.io
First, we'll create an AI agent that can fetch and analyze stock data from Polygon.io.
```python
import os
from swarms import Agent
from swarms.models import OpenAIChat
from dotenv import load_dotenv
import requests
import pandas as pd
load_dotenv()
# Polygon.io API setup
POLYGON_API_KEY = os.getenv("POLYGON_API_KEY")
POLYGON_BASE_URL = "https://api.polygon.io/v2"
# OpenAI API setup
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
openai_api_key=OPENAI_API_KEY,
model_name="gpt-4",
temperature=0.1
)
# Initialize the agent
agent = Agent(
agent_name="Financial-Analysis-Agent",
system_prompt="You are a financial analysis AI assistant. Your task is to analyze stock data and provide insights.",
llm=model,
max_loops=1,
dashboard=False,
verbose=True
)
def get_stock_data(symbol, from_date, to_date):
endpoint = f"{POLYGON_BASE_URL}/aggs/ticker/{symbol}/range/1/day/{from_date}/{to_date}"
params = {
'apiKey': POLYGON_API_KEY,
'adjusted': 'true'
}
response = requests.get(endpoint, params=params)
data = response.json()
return pd.DataFrame(data['results'])
# Example usage
symbol = "AAPL"
from_date = "2023-01-01"
to_date = "2023-12-31"
stock_data = get_stock_data(symbol, from_date, to_date)
analysis_request = f"""
Analyze the following stock data for {symbol} from {from_date} to {to_date}:
{stock_data.to_string()}
Provide insights on the stock's performance, including trends, volatility, and any notable events.
"""
analysis = agent.run(analysis_request)
print(analysis)
```
In this example, we've created an AI agent that can fetch stock data from Polygon.io and perform an analysis based on that data. The agent uses the GPT-4 model to generate insights about the stock's performance.
### Alpha Vantage
Next, let's create an agent that can work with Alpha Vantage data to perform fundamental analysis.
```python
import os
from swarms import Agent
from swarms.models import OpenAIChat
from dotenv import load_dotenv
import requests
load_dotenv()
# Alpha Vantage API setup
ALPHA_VANTAGE_API_KEY = os.getenv("ALPHA_VANTAGE_API_KEY")
ALPHA_VANTAGE_BASE_URL = "https://www.alphavantage.co/query"
# OpenAI API setup
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
openai_api_key=OPENAI_API_KEY,
model_name="gpt-4",
temperature=0.1
)
# Initialize the agent
agent = Agent(
agent_name="Fundamental-Analysis-Agent",
system_prompt="You are a financial analysis AI assistant specializing in fundamental analysis. Your task is to analyze company financials and provide insights.",
llm=model,
max_loops=1,
dashboard=False,
verbose=True
)
def get_income_statement(symbol):
params = {
'function': 'INCOME_STATEMENT',
'symbol': symbol,
'apikey': ALPHA_VANTAGE_API_KEY
}
response = requests.get(ALPHA_VANTAGE_BASE_URL, params=params)
return response.json()
# Example usage
symbol = "MSFT"
income_statement = get_income_statement(symbol)
analysis_request = f"""
Analyze the following income statement data for {symbol}:
{income_statement}
Provide insights on the company's financial health, profitability trends, and any notable observations.
"""
analysis = agent.run(analysis_request)
print(analysis)
```
This example demonstrates an AI agent that can fetch income statement data from Alpha Vantage and perform a fundamental analysis of a company's financials.
### Yahoo Finance
Now, let's create an agent that can work with Yahoo Finance data to perform technical analysis.
```python
import os
from swarms import Agent
from swarms.models import OpenAIChat
from dotenv import load_dotenv
import yfinance as yf
import pandas as pd
load_dotenv()
# OpenAI API setup
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
openai_api_key=OPENAI_API_KEY,
model_name="gpt-4",
temperature=0.1
)
# Initialize the agent
agent = Agent(
agent_name="Technical-Analysis-Agent",
system_prompt="You are a financial analysis AI assistant specializing in technical analysis. Your task is to analyze stock price data and provide insights on trends and potential trading signals.",
llm=model,
max_loops=1,
dashboard=False,
verbose=True
)
def get_stock_data(symbol, start_date, end_date):
stock = yf.Ticker(symbol)
data = stock.history(start=start_date, end=end_date)
return data
# Example usage
symbol = "GOOGL"
start_date = "2023-01-01"
end_date = "2023-12-31"
stock_data = get_stock_data(symbol, start_date, end_date)
# Calculate some technical indicators
stock_data['SMA_20'] = stock_data['Close'].rolling(window=20).mean()
stock_data['SMA_50'] = stock_data['Close'].rolling(window=50).mean()
analysis_request = f"""
Analyze the following stock price data and technical indicators for {symbol} from {start_date} to {end_date}:
{stock_data.tail(30).to_string()}
Provide insights on the stock's price trends, potential support and resistance levels, and any notable trading signals based on the moving averages.
"""
analysis = agent.run(analysis_request)
print(analysis)
```
This example shows an AI agent that can fetch stock price data from Yahoo Finance, calculate some basic technical indicators, and perform a technical analysis.
### IEX Cloud
Let's create an agent that can work with IEX Cloud data to analyze company news sentiment.
```python
import os
from swarms import Agent
from swarms.models import OpenAIChat
from dotenv import load_dotenv
import requests
load_dotenv()
# IEX Cloud API setup
IEX_CLOUD_API_KEY = os.getenv("IEX_CLOUD_API_KEY")
IEX_CLOUD_BASE_URL = "https://cloud.iexapis.com/stable"
# OpenAI API setup
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
openai_api_key=OPENAI_API_KEY,
model_name="gpt-4",
temperature=0.1
)
# Initialize the agent
agent = Agent(
agent_name="News-Sentiment-Analysis-Agent",
system_prompt="You are a financial analysis AI assistant specializing in news sentiment analysis. Your task is to analyze company news and provide insights on the overall sentiment and potential impact on the stock.",
llm=model,
max_loops=1,
dashboard=False,
verbose=True
)
def get_company_news(symbol, last_n):
endpoint = f"{IEX_CLOUD_BASE_URL}/stock/{symbol}/news/last/{last_n}"
params = {'token': IEX_CLOUD_API_KEY}
response = requests.get(endpoint, params=params)
return response.json()
# Example usage
symbol = "TSLA"
last_n = 10
news_data = get_company_news(symbol, last_n)
analysis_request = f"""
Analyze the following recent news articles for {symbol}:
{news_data}
Provide insights on the overall sentiment of the news, potential impact on the stock price, and any notable trends or events mentioned.
"""
analysis = agent.run(analysis_request)
print(analysis)
```
This example demonstrates an AI agent that can fetch recent news data from IEX Cloud and perform a sentiment analysis on the company news.
### Finnhub
Finally, let's create an agent that can work with Finnhub data to analyze earnings estimates and recommendations.
```python
import os
from swarms import Agent
from swarms.models import OpenAIChat
from dotenv import load_dotenv
import finnhub
load_dotenv()
# Finnhub API setup
FINNHUB_API_KEY = os.getenv("FINNHUB_API_KEY")
finnhub_client = finnhub.Client(api_key=FINNHUB_API_KEY)
# OpenAI API setup
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
openai_api_key=OPENAI_API_KEY,
model_name="gpt-4",
temperature=0.1
)
# Initialize the agent
agent = Agent(
agent_name="Earnings-Analysis-Agent",
system_prompt="You are a financial analysis AI assistant specializing in earnings analysis. Your task is to analyze earnings estimates and recommendations to provide insights on a company's financial outlook.",
llm=model,
max_loops=1,
dashboard=False,
verbose=True
)
def get_earnings_estimates(symbol):
return finnhub_client.earnings_calendar(symbol=symbol, from_date="2023-01-01", to_date="2023-12-31")
def get_recommendations(symbol):
return finnhub_client.recommendation_trends(symbol)
# Example usage
symbol = "NVDA"
earnings_estimates = get_earnings_estimates(symbol)
recommendations = get_recommendations(symbol)
analysis_request = f"""
Analyze the following earnings estimates and recommendations for {symbol}:
Earnings Estimates:
{earnings_estimates}
Recommendations:
{recommendations}
Provide insights on the company's expected financial performance, analyst sentiment, and any notable trends in the recommendations.
"""
analysis = agent.run(analysis_request)
print(analysis)
```
This example shows an AI agent that can fetch earnings estimates and analyst recommendations from Finnhub and perform an analysis on the company's financial outlook.
## Advanced Analysis Techniques
To further enhance the capabilities of our AI agents, we can implement more advanced analysis techniques:
1. Multi-source analysis: Combine data from multiple providers to get a more comprehensive view of a stock or market.
2. Time series forecasting: Implement machine learning models for price prediction.
3. Sentiment analysis of social media: Incorporate data from social media platforms to gauge market sentiment.
4. Portfolio optimization: Use AI agents to suggest optimal portfolio allocations based on risk tolerance and investment goals.
5. Anomaly detection: Implement algorithms to detect unusual patterns or events in financial data.
Here's an example of how we might implement a multi-source analysis:
```python
import os
from swarms import Agent
from swarms.models import OpenAIChat
from dotenv import load_dotenv
import yfinance as yf
import requests
import pandas as pd
load_dotenv()
# API setup
POLYGON_API_KEY = os.getenv("POLYGON_API_KEY")
ALPHA_VANTAGE_API_KEY = os.getenv("ALPHA_VANTAGE_API_KEY")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
openai_api_key=OPENAI_API_KEY,
model_name="gpt-4",
temperature=0.1
)
# Initialize the agent
agent = Agent(
agent_name="Multi-Source-Analysis-Agent",
system_prompt="You are a financial analysis AI assistant capable of analyzing data from multiple sources. Your task is to provide comprehensive insights on a stock based on various data points.",
llm=model,
max_loops=1,
dashboard=False,
verbose=True
)
def get_stock_data_yf(symbol, start_date, end_date):
stock = yf.Ticker(symbol)
return stock.history(start=start_date, end=end_date)
def get_stock_data_polygon(symbol, from_date, to_date):
endpoint = f"https://api.polygon.io/v2/aggs/ticker/{symbol}/range/1/day/{from_date}/{to_date}"
params = {'apiKey': POLYGON_API_KEY, 'adjusted': 'true'}
response = requests.get(endpoint, params=params)
data = response.json()
return pd.DataFrame(data['results'])
def get_company_overview_av(symbol):
params = {
'function': 'OVERVIEW',
'symbol': symbol,
'apikey': ALPHA_VANTAGE_API_KEY
}
response = requests.get("https://www.alphavantage.co/query", params=params)
return response.json()
# Example usage
symbol = "AAPL"
start_date = "2023-01-01"
end_date = "2023-12-31"
yf_data = get_stock_data_yf(symbol, start_date, end_date)
polygon_data = get_stock_data_polygon(symbol, start_date, end_date)
av_overview = get_company_overview_av(symbol)
analysis_request = f"""
Analyze the following data for {symbol} from {start_date} to {end_date}:
Yahoo Finance Data:
{yf_data.tail().to_string()}
Polygon.io Data:
{polygon_data.tail().to_string()}
Alpha Vantage Company Overview:
{av_overview}
Provide a comprehensive analysis of the stock, including:
1. Price trends and volatility
2. Trading volume analysis
3. Fundamental analysis based on the company overview
4. Any discrepancies between data sources and potential reasons
5. Overall outlook and potential risks/opportunities
"""
analysis = agent.run(analysis_request)
print(analysis)
```
This multi-source analysis example combines data from Yahoo Finance, Polygon.io, and Alpha Vantage to provide a more comprehensive view of a stock. The AI agent can then analyze this diverse set of data to provide deeper insights.
Now, let's explore some additional advanced analysis techniques:
### Time Series Forecasting
We can implement a simple time series forecasting model using the Prophet library and integrate it with our AI agent:
```python
import os
from swarms import Agent
from swarms.models import OpenAIChat
from dotenv import load_dotenv
import yfinance as yf
import pandas as pd
from prophet import Prophet
import matplotlib.pyplot as plt
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
model = OpenAIChat(
openai_api_key=OPENAI_API_KEY,
model_name="gpt-4",
temperature=0.1
)
agent = Agent(
agent_name="Time-Series-Forecast-Agent",
system_prompt="You are a financial analysis AI assistant specializing in time series forecasting. Your task is to analyze stock price predictions and provide insights.",
llm=model,
max_loops=1,
dashboard=False,
verbose=True
)
def get_stock_data(symbol, start_date, end_date):
stock = yf.Ticker(symbol)
data = stock.history(start=start_date, end=end_date)
return data
def forecast_stock_price(data, periods=30):
df = data.reset_index()[['Date', 'Close']]
df.columns = ['ds', 'y']
model = Prophet()
model.fit(df)
future = model.make_future_dataframe(periods=periods)
forecast = model.predict(future)
fig = model.plot(forecast)
plt.savefig('forecast_plot.png')
plt.close()
return forecast
# Example usage
symbol = "MSFT"
start_date = "2020-01-01"
end_date = "2023-12-31"
stock_data = get_stock_data(symbol, start_date, end_date)
forecast = forecast_stock_price(stock_data)
analysis_request = f"""
Analyze the following time series forecast for {symbol}:
Forecast Data:
{forecast.tail(30).to_string()}
The forecast plot has been saved as 'forecast_plot.png'.
Provide insights on:
1. The predicted trend for the stock price
2. Any seasonal patterns observed
3. Potential factors that might influence the forecast
4. Limitations of this forecasting method
5. Recommendations for investors based on this forecast
"""
analysis = agent.run(analysis_request)
print(analysis)
```
This example demonstrates how to integrate a time series forecasting model (Prophet) with our AI agent. The agent can then provide insights based on the forecasted data.
### Sentiment Analysis of Social Media
We can use a pre-trained sentiment analysis model to analyze tweets about a company and integrate this with our AI agent:
```python
import os
from swarms import Agent
from swarms.models import OpenAIChat
from dotenv import load_dotenv
import tweepy
from textblob import TextBlob
import pandas as pd
load_dotenv()
# Twitter API setup
TWITTER_API_KEY = os.getenv("TWITTER_API_KEY")
TWITTER_API_SECRET = os.getenv("TWITTER_API_SECRET")
TWITTER_ACCESS_TOKEN = os.getenv("TWITTER_ACCESS_TOKEN")
TWITTER_ACCESS_TOKEN_SECRET = os.getenv("TWITTER_ACCESS_TOKEN_SECRET")
auth = tweepy.OAuthHandler(TWITTER_API_KEY, TWITTER_API_SECRET)
auth.set_access_token(TWITTER_ACCESS_TOKEN, TWITTER_ACCESS_TOKEN_SECRET)
api = tweepy.API(auth)
# OpenAI setup
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
model = OpenAIChat(
openai_api_key=OPENAI_API_KEY,
model_name="gpt-4",
temperature=0.1
)
agent = Agent(
agent_name="Social-Media-Sentiment-Agent",
system_prompt="You are a financial analysis AI assistant specializing in social media sentiment analysis. Your task is to analyze sentiment data from tweets and provide insights on market perception.",
llm=model,
max_loops=1,
dashboard=False,
verbose=True
)
def get_tweets(query, count=100):
tweets = api.search_tweets(q=query, count=count, tweet_mode="extended")
return [tweet.full_text for tweet in tweets]
def analyze_sentiment(tweets):
sentiments = [TextBlob(tweet).sentiment.polarity for tweet in tweets]
return pd.DataFrame({'tweet': tweets, 'sentiment': sentiments})
# Example usage
symbol = "TSLA"
query = f"${symbol} stock"
tweets = get_tweets(query)
sentiment_data = analyze_sentiment(tweets)
analysis_request = f"""
Analyze the following sentiment data for tweets about {symbol} stock:
Sentiment Summary:
Positive tweets: {sum(sentiment_data['sentiment'] > 0)}
Negative tweets: {sum(sentiment_data['sentiment'] < 0)}
Neutral tweets: {sum(sentiment_data['sentiment'] == 0)}
Average sentiment: {sentiment_data['sentiment'].mean()}
Sample tweets and their sentiments:
{sentiment_data.head(10).to_string()}
Provide insights on:
1. The overall sentiment towards the stock
2. Any notable trends or patterns in the sentiment
3. Potential reasons for the observed sentiment
4. How this sentiment might impact the stock price
5. Limitations of this sentiment analysis method
"""
analysis = agent.run(analysis_request)
print(analysis)
```
This example shows how to perform sentiment analysis on tweets about a stock and integrate the results with our AI agent for further analysis.
### Portfolio Optimization
We can use the PyPortfolioOpt library to perform portfolio optimization and have our AI agent provide insights:
```python
import os
from swarms import Agent
from swarms.models import OpenAIChat
from dotenv import load_dotenv
import yfinance as yf
import pandas as pd
import numpy as np
from pypfopt import EfficientFrontier
from pypfopt import risk_models
from pypfopt import expected_returns
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
model = OpenAIChat(
openai_api_key=OPENAI_API_KEY,
model_name="gpt-4",
temperature=0.1
)
agent = Agent(
agent_name="Portfolio-Optimization-Agent",
system_prompt="You are a financial analysis AI assistant specializing in portfolio optimization. Your task is to analyze optimized portfolio allocations and provide investment advice.",
llm=model,
max_loops=1,
dashboard=False,
verbose=True
)
def get_stock_data(symbols, start_date, end_date):
data = yf.download(symbols, start=start_date, end=end_date)['Adj Close']
return data
def optimize_portfolio(data):
mu = expected_returns.mean_historical_return(data)
S = risk_models.sample_cov(data)
ef = EfficientFrontier(mu, S)
weights = ef.max_sharpe()
cleaned_weights = ef.clean_weights()
return cleaned_weights
# Example usage
symbols = ["AAPL", "GOOGL", "MSFT", "AMZN", "FB"]
start_date = "2018-01-01"
end_date = "2023-12-31"
stock_data = get_stock_data(symbols, start_date, end_date)
optimized_weights = optimize_portfolio(stock_data)
analysis_request = f"""
Analyze the following optimized portfolio allocation:
{pd.Series(optimized_weights).to_string()}
The optimization aimed to maximize the Sharpe ratio based on historical data from {start_date} to {end_date}.
Provide insights on:
1. The recommended allocation and its potential benefits
2. Any notable concentrations or diversification in the portfolio
3. Potential risks associated with this allocation
4. How this portfolio might perform in different market conditions
5. Recommendations for an investor considering this allocation
6. Limitations of this optimization method
"""
analysis = agent.run(analysis_request)
print(analysis)
```
This example demonstrates how to perform portfolio optimization using the PyPortfolioOpt library and have our AI agent provide insights on the optimized allocation.
## Best Practices and Considerations
When using AI agents for financial data analysis, consider the following best practices:
1. Data quality: Ensure that the data you're feeding into the agents is accurate and up-to-date.
2. Model limitations: Be aware of the limitations of both the financial models and the AI models being used.
3. Regulatory compliance: Ensure that your use of AI in financial analysis complies with relevant regulations.
4. Ethical considerations: Be mindful of potential biases in AI models and strive for fair and ethical analysis.
5. Continuous monitoring: Regularly evaluate the performance of your AI agents and update them as needed.
6. Human oversight: While AI agents can provide valuable insights, human judgment should always play a role in financial decision-making.
7. Privacy and security: Implement robust security measures to protect sensitive financial data.
## Conclusion
The integration of AI agents with financial data APIs opens up exciting possibilities for advanced financial analysis. By leveraging the power of the Swarms framework and connecting it with various financial data providers, analysts and quants can gain deeper insights, automate complex analyses, and potentially make more informed investment decisions.
However, it's crucial to remember that while AI agents can process vast amounts of data and identify patterns that humans might miss, they should be used as tools to augment human decision-making rather than replace it entirely. The financial markets are complex systems influenced by numerous factors, many of which may not be captured in historical data or current models.
As the field of AI in finance continues to evolve, we can expect even more sophisticated analysis techniques and integrations. Staying updated with the latest developments in both AI and financial analysis will be key to leveraging these powerful tools effectively.

@ -1,68 +1,220 @@
# Building Agents from a YAML File
The `create_agents_from_yaml` function enables the dynamic creation and execution of agents based on configurations defined in a YAML file. This function is designed to support enterprise use-cases, offering flexibility, reliability, and scalability for various agent-based workflows.
The `create_agents_from_yaml` function is designed to dynamically create agents and orchestrate swarms based on configurations defined in a YAML file. It is particularly suited for enterprise use-cases, offering scalability and reliability for agent-based workflows.
By allowing the user to define multiple agents and tasks in a YAML configuration file, this function streamlines the process of initializing and executing tasks through agents while supporting advanced features such as multi-agent orchestration, logging, error handling, and flexible return values.
### Key Features:
- **Multi-Agent Creation**: Automatically instantiate multiple agents from a YAML file.
- **Swarm Architecture**: Supports swarm architectures where agents collaborate to solve complex tasks.
- **Logging with Loguru**: Includes robust logging for tracking operations and diagnosing issues.
- **Flexible Return Types**: Offers several return types based on the requirements of the system.
- **Customizable**: Supports additional arguments (`*args` and `**kwargs`) for fine-tuning agent behavior.
- **Error Handling**: Handles missing configurations and invalid inputs with meaningful error messages.
---
# Key Features
- **Multi-Agent Creation**: Automatically create multiple agents based on a single YAML configuration file.
- **Task Execution**: Each agent can execute a predefined task if specified in the YAML configuration.
- **Logging with Loguru**: Integrated logging using `loguru` for robust, real-time tracking and error reporting.
- **Dynamic Return Types**: Offers flexible return values (agents, tasks, or both) based on user needs.
- **Error Handling**: Gracefully handles missing configurations, invalid inputs, and runtime errors.
- **Extensibility**: Supports additional positional (`*args`) and keyword arguments (`**kwargs`) to customize agent behavior.
### Parameters
| Parameter | Description | Type | Default Value | Example |
|--------------|---------------------------------------------------------------------------------------------------------------------------------------------------|-------------|---------------|-------------------------------------|
| `model` | A callable representing the model (LLM or other) that agents will use. | Callable | None | `OpenAIChat(model_name="gpt-4")` |
| `yaml_file` | Path to the YAML file containing agent configurations. | String | "agents.yaml" | `"config/agents.yaml"` |
| `return_type`| Determines the type of return object. Options: `"auto"`, `"swarm"`, `"agents"`, `"both"`, `"tasks"`, `"run_swarm"`. | String | "auto" | `"both"` |
| `*args` | Additional positional arguments for further customization (e.g., agent behavior). | List | N/A | N/A |
| `**kwargs` | Additional keyword arguments for customization (e.g., specific parameters passed to the agents or swarm). | Dict | N/A | N/A |
---
# Function Signature
### Return Types
| Return Type | Description |
|-----------------|-----------------------------------------------------------------------------------------------------------------------------------------------|
| `SwarmRouter` | Returns a `SwarmRouter` object, orchestrating the created agents, only if swarm architecture is defined in YAML. |
| `Agent` | Returns a single agent if only one is defined. |
| `List[Agent]` | Returns a list of agents if multiple are defined. |
| `Tuple` | If both agents and a swarm are present, returns both as a tuple (`SwarmRouter, List[Agent]`). |
| `List[Dict]` | Returns a list of task results if tasks were executed. |
| `None` | Returns nothing if an invalid return type is provided or an error occurs. |
---
### Detailed Return Types
| Return Type | Condition | Example Return Value |
|--------------------|---------------------------------------------------------------------|-----------------------------------------------|
| `"auto"` | Automatically determines the return based on YAML content. | `SwarmRouter` if swarm architecture is defined, otherwise `Agent` or `List[Agent]`. |
| `"swarm"` | Returns `SwarmRouter` if present; otherwise returns agents. | `<SwarmRouter>` |
| `"agents"` | Returns a list of agents (or a single agent if only one is defined).| `[<Agent>, <Agent>]` or `<Agent>` |
| `"both"` | Returns both `SwarmRouter` and agents in a tuple. | `(<SwarmRouter>, [<Agent>, <Agent>])` |
| `"tasks"` | Returns the task results, if tasks were executed by agents. | `[{'task': 'task_output'}, {'task2': 'output'}]` |
| `"run_swarm"` | Executes the swarm (if defined) and returns the result. | `'Swarm task output here'` |
---
### Example Use Cases
1. **Creating Multiple Agents for Financial Analysis**
```yaml
agents:
- agent_name: "Financial-Analysis-Agent"
system_prompt: "Analyze the best investment strategy for 2024."
max_loops: 1
autosave: true
verbose: false
context_length: 100000
output_type: "str"
task: "Analyze stock options for long-term gains."
- agent_name: "Risk-Analysis-Agent"
system_prompt: "Evaluate the risk of tech stocks in 2024."
max_loops: 2
autosave: false
verbose: true
context_length: 50000
output_type: "json"
task: "What are the riskiest stocks in the tech sector?"
```
```python
from swarms.structs.agent import Agent
from swarms.structs.swarm_router import SwarmRouter
# Model representing your LLM
def model(prompt):
return f"Processed: {prompt}"
# Create agents and return them as a list
agents = create_agents_from_yaml(model=model, yaml_file="agents.yaml", return_type="agents")
print(agents)
```
2. **Running a Swarm of Agents to Solve a Complex Task**
```yaml
agents:
- agent_name: "Legal-Agent"
system_prompt: "Provide legal advice on corporate structuring."
task: "How to incorporate a business as an LLC?"
swarm_architecture:
name: "Corporate-Swarm"
description: "A swarm for helping businesses with legal and tax advice."
swarm_type: "ConcurrentWorkflow"
task: "How can we optimize a business structure for maximum tax efficiency?"
max_loops: 3
```
```python
import os
from dotenv import load_dotenv
from loguru import logger
from swarm_models import OpenAIChat
from swarms.agents.create_agents_from_yaml import (
create_agents_from_yaml,
)
# Load environment variables
load_dotenv()
# Path to your YAML file
yaml_file = "agents_multi_agent.yaml"
# Get the OpenAI API key from the environment variable
api_key = os.getenv("GROQ_API_KEY")
# Model
model = OpenAIChat(
openai_api_base="https://api.groq.com/openai/v1",
openai_api_key=api_key,
model_name="llama-3.1-70b-versatile",
temperature=0.1,
)
try:
# Create agents and run tasks (using 'both' to return agents and task results)
task_results = create_agents_from_yaml(
model=model, yaml_file=yaml_file, return_type="run_swarm"
)
logger.info(f"Results from agents: {task_results}")
except Exception as e:
logger.error(f"An error occurred: {e}")
```
3. **Returning Both Agents and Tasks**
```yaml
agents:
- agent_name: "Market-Research-Agent"
system_prompt: "What are the latest trends in AI?"
task: "Provide a market analysis for AI technologies in 2024."
```
```python
def create_agents_from_yaml(yaml_file: str, return_type: str = "agents", *args, **kwargs)
from swarms.structs.agent import Agent
# Model representing your LLM
def model(prompt):
return f"Processed: {prompt}"
# Create agents and run tasks, return both agents and task results
swarm, agents = create_agents_from_yaml(model=model, yaml_file="agents.yaml", return_type="both")
print(swarm, agents)
```
### Parameters:
- **`yaml_file: str`**
- Description: The path to the YAML file containing agent configurations.
- Required: Yes
- Example: `'agents_config.yaml'`
- **`return_type: str`**
- Description: Determines the type of data the function should return.
- Options:
- `"agents"`: Returns a list of the created agents.
- `"tasks"`: Returns a list of task results (outputs or errors).
- `"both"`: Returns both the list of agents and the task results as a tuple.
- Default: `"agents"`
- Required: No
- **`*args` and `**kwargs`**:
- Description: Additional arguments to customize agent behavior. These can be passed through to the underlying `Agent` or `OpenAIChat` class constructors.
- Required: No
- Example: Can be used to modify model configurations or agent behavior dynamically.
### Returns:
- **Based on `return_type`:**
- `return_type="agents"`: Returns a list of initialized `Agent` objects.
- `return_type="tasks"`: Returns a list of task results (success or error).
- `return_type="both"`: Returns a tuple containing both the list of agents and task results.
---
---
# YAML Configuration Structure
### YAML Schema Overview:
Below is a breakdown of the attributes expected in the YAML configuration file, which governs how agents and swarms are created.
### YAML Attributes Table:
| Attribute Name | Description | Type | Required | Default/Example Value |
|-----------------------------------|------------------------------------------------------------|---------------|----------|------------------------------------------|
| `agents` | List of agents to be created. Each agent must have specific configurations. | List of dicts | Yes | |
| `agent_name` | The name of the agent. | String | Yes | `"Stock-Analysis-Agent"` |
| `system_prompt` | The system prompt that the agent will use. | String | Yes | `"Your full system prompt here"` |
| `max_loops` | Maximum number of iterations or loops for the agent. | Integer | No | 1 |
| `autosave` | Whether the agent should automatically save its state. | Boolean | No | `true` |
| `dashboard` | Whether to enable a dashboard for the agent. | Boolean | No | `false` |
| `verbose` | Whether to run the agent in verbose mode (for debugging). | Boolean | No | `false` |
| `dynamic_temperature_enabled` | Enable dynamic temperature adjustments during agent execution. | Boolean | No | `false` |
| `saved_state_path` | Path where the agent's state is saved for recovery. | String | No | `"path_to_save_state.json"` |
| `user_name` | Name of the user interacting with the agent. | String | No | `"default_user"` |
| `retry_attempts` | Number of times to retry an operation in case of failure. | Integer | No | 1 |
| `context_length` | Maximum context length for agent interactions. | Integer | No | 100000 |
| `return_step_meta` | Whether to return metadata for each step of the task. | Boolean | No | `false` |
| `output_type` | The type of output the agent will return (e.g., `str`, `json`). | String | No | `"str"` |
| `task` | Task to be executed by the agent (optional). | String | No | `"What is the best strategy for long-term stock investment?"` |
#### Swarm Architecture (Optional):
| Attribute Name | Description | Type | Required | Default/Example Value |
|-----------------------------------|------------------------------------------------------------|---------------|----------|------------------------------------------|
| `swarm_architecture` | Defines the swarm configuration. For more information on what can be added to the swarm architecture, please refer to the [Swarm Router documentation](https://docs.swarms.world/en/latest/swarms/structs/swarm_router/). | Dict | No | |
| `name` | The name of the swarm. | String | Yes | `"MySwarm"` |
| `description` | Description of the swarm and its purpose. | String | No | `"A swarm for collaborative task solving"`|
| `max_loops` | Maximum number of loops for the swarm. | Integer | No | 5 |
| `swarm_type` | The type of swarm (e.g., `ConcurrentWorkflow`) `SequentialWorkflow`. | String | Yes | `"ConcurrentWorkflow"` |
| `task` | The primary task assigned to the swarm. | String | No | `"How can we trademark concepts as a delaware C CORP for free?"` |
The function relies on a YAML file for defining agents and tasks. Below is an example YAML configuration:
---
### YAML Schema Example:
Below is an updated YAML schema that conforms to the function's expectations:
### Example YAML (agents_config.yaml):
```yaml
agents:
- agent_name: "Financial-Analysis-Agent"
# model:
# model_name: "gpt-4o-mini"
# temperature: 0.1
# max_tokens: 2000
system_prompt: "Your full system prompt here"
max_loops: 1
autosave: true
@ -75,13 +227,9 @@ agents:
context_length: 200000
return_step_meta: false
output_type: "str"
task: "How can I establish a ROTH IRA to buy stocks and get a tax break?"
# task: "How can I establish a ROTH IRA to buy stocks and get a tax break?" # Turn off if using swarm
- agent_name: "Stock-Analysis-Agent"
# model:
# model_name: "gpt-4o-mini"
# temperature: 0.2
# max_tokens: 1500
system_prompt: "Your full system prompt here"
max_loops: 2
autosave: true
@ -94,32 +242,37 @@ agents:
context_length: 150000
return_step_meta: true
output_type: "json"
task: "What is the best strategy for long-term stock investment?"
# task: "What is the best strategy for long-term stock investment?"
# Optional Swarm Configuration
swarm_architecture:
name: "MySwarm"
description: "A swarm for collaborative task solving"
max_loops: 5
swarm_type: "ConcurrentWorkflow"
task: "How can we trademark concepts as a delaware C CORP for free?" # Main task
```
---
# Enterprise Use Cases
### 1. **Automating Financial Analysis**
- An enterprise can use this function to create agents that analyze financial data in real-time. For example, an agent can be configured to provide financial advice based on the latest market trends, using predefined tasks in YAML to query the agent.
### 2. **Scalable Stock Analysis**
- Multiple stock analysis agents can be created, each tasked with analyzing specific stocks or investment strategies. This setup can help enterprises handle large-scale financial modeling and stock analysis without manual intervention.
### 3. **Task Scheduling and Execution**
- In enterprise operations, agents can be pre-configured with tasks such as risk assessment, regulatory compliance checks, or financial forecasting. The function automatically runs these tasks and returns actionable results or alerts.
# Diagram
```mermaid
graph TD;
A[Task] -->|Send to| B[Financial-Analysis-Agent]
A -->|Send to| C[Stock-Analysis-Agent]
```
---
### Full Code Example
### How to Use `create_agents_from_yaml` Function with YAML:
- You need to plug in your specific model until we can create a model router that can fetch any model and set specific settings
#### Example Code:
```python
import os
from dotenv import load_dotenv
from loguru import logger
from swarm_models import OpenAIChat # any model from swarm_models
from swarm_models import OpenAIChat
from swarms.agents.create_agents_from_yaml import (
create_agents_from_yaml,
@ -131,79 +284,37 @@ load_dotenv()
# Path to your YAML file
yaml_file = "agents.yaml"
# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
api_key = os.getenv("GROQ_API_KEY")
# Create an instance of the OpenAIChat class
# Model
model = OpenAIChat(
openai_api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
openai_api_base="https://api.groq.com/openai/v1",
openai_api_key=api_key,
model_name="llama-3.1-70b-versatile",
temperature=0.1,
)
try:
# Create agents and run tasks (using 'both' to return agents and task results)
task_results = create_agents_from_yaml(
model=model, yaml_file=yaml_file, return_type="tasks"
model=model, yaml_file=yaml_file, return_type="run_swarm" #
)
logger.info(f"Results from agents: {task_results}")
except Exception as e:
logger.error(f"An error occurred: {e}")
```
---
# Error Handling
### Common Errors:
1. **Missing API Key**:
- Error: `API key is missing for agent: <agent_name>`
- Cause: The API key is either not provided in the YAML or not available as an environment variable.
- Solution: Ensure the API key is either defined in the YAML configuration or set as an environment variable.
2. **Missing System Prompt**:
- Error: `System prompt is missing for agent: <agent_name>`
- Cause: The `system_prompt` field is not defined in the YAML configuration.
- Solution: Define the system prompt field for each agent.
3. **Invalid `return_type`**:
- Error: `Invalid return_type: <return_type>`
- Cause: The `return_type` provided is not one of `"agents"`, `"tasks"`, or `"both"`.
- Solution: Ensure that `return_type` is set to one of the valid options.
### Logging:
- The function integrates `loguru` logging to track all key actions:
- File loading, agent creation, task execution, and errors are all logged.
- Use this logging output to monitor operations and diagnose issues in production environments.
---
# Scalability and Extensibility
### Scalability:
The `create_agents_from_yaml` function is designed for use in high-scale enterprise environments:
- **Multi-Agent Creation**: Create and manage large numbers of agents simultaneously.
- **Parallel Task Execution**: Tasks can be executed in parallel for real-time analysis and decision-making across multiple business units.
### Extensibility:
- **Customizable Behavior**: Through `*args` and `**kwargs`, users can extend the functionality of the agents or models without altering the core YAML configuration.
- **Seamless Integration**: The function can be easily integrated with larger multi-agent systems and workflows, enabling rapid scaling across departments.
---
# Security Considerations
For enterprise deployments, consider the following security best practices:
1. **API Key Management**: Ensure that API keys are stored securely (e.g., using environment variables or secret management tools).
2. **Data Handling**: Be mindful of sensitive information within tasks or agent responses. Implement data sanitization where necessary.
3. **Task Validation**: Validate tasks in the YAML file to ensure they meet your organization's security and operational policies before execution.
---
# Conclusion
### Error Handling:
The `create_agents_from_yaml` function is a powerful tool for enterprises to dynamically create, manage, and execute tasks using AI-powered agents. With its flexible configuration, logging, and error handling, this function is ideal for scaling agent-based systems and automating complex workflows across industries.
1. **FileNotFoundError**: If the specified YAML file does not exist.
2. **ValueError**: Raised if there are invalid or missing configurations in the YAML file.
3. **Invalid Return Type**: If an invalid return type is specified, the function will raise a `ValueError`.
Integrating this function into your enterprise workflow will enhance efficiency, provide real-time insights, and reduce operational overhead.
### Conclusion:
The `create_agents_from_yaml` function provides a flexible and powerful way to dynamically configure and execute agents, supporting a wide range of tasks and configurations for enterprise-level use cases. By following the YAML schema and function signature, users can easily define and manage their agents and swarms.

@ -0,0 +1,37 @@
import requests
from bs4 import BeautifulSoup
def scrape_blackrock_trades():
url = "https://arkhamintelligence.com/blackrock/trades"
response = requests.get(url)
if response.status_code == 200:
soup = BeautifulSoup(response.content, 'html.parser')
# Example: Assuming trades are in a table
trades = []
table = soup.find('table', {'id': 'trades-table'})
if table:
for row in table.find_all('tr'):
columns = row.find_all('td')
if len(columns) > 0:
trade = {
'trade_date': columns[0].text.strip(),
'asset': columns[1].text.strip(),
'action': columns[2].text.strip(),
'quantity': columns[3].text.strip(),
'price': columns[4].text.strip(),
'total_value': columns[5].text.strip(),
}
trades.append(trade)
return trades
else:
print(f"Failed to fetch data. Status code: {response.status_code}")
return None
if __name__ == "__main__":
trades = scrape_blackrock_trades()
if trades:
for trade in trades:
print(trade)

@ -0,0 +1,140 @@
from typing import Dict, Any, Union
import requests
from loguru import logger
class AlphaVantageClient:
"""
Client to fetch commodities and economic indicators data from Alpha Vantage API.
"""
BASE_URL = "https://www.alphavantage.co/query"
def __init__(self, api_key: str) -> None:
"""
Initialize the AlphaVantageClient with an API key.
:param api_key: Your Alpha Vantage API key.
"""
self.api_key = api_key
def fetch_data(
self, function: str, symbol: str = None
) -> Union[str, Dict[str, Any]]:
"""
Fetches data from Alpha Vantage API and returns it as both string and dictionary.
:param function: Alpha Vantage function type (e.g., 'TIME_SERIES_DAILY', 'REAL_GDP').
:param symbol: Optional. The commodity/economic indicator symbol.
:return: The data as both a string and a dictionary.
"""
params = {
"apikey": self.api_key,
"function": function,
"symbol": symbol,
}
logger.info(
f"Fetching data for function '{function}' with symbol '{symbol}'"
)
try:
response = requests.get(self.BASE_URL, params=params)
response.raise_for_status()
data = response.json()
data_as_string = response.text
logger.success(
f"Successfully fetched data for {symbol if symbol else function}"
)
return data_as_string, data
except requests.RequestException as e:
logger.error(
f"Error while fetching data from Alpha Vantage: {e}"
)
return str(e), {}
def get_commodities_data(
self,
) -> Dict[str, Union[str, Dict[str, Any]]]:
"""
Fetches data for trending commodities such as Crude Oil, Natural Gas, and others.
:return: Dictionary with commodity names as keys and a tuple of (string data, dictionary data) as values.
"""
commodities = {
"Crude Oil (WTI)": "OIL_WTI",
"Crude Oil (Brent)": "OIL_BRENT",
"Natural Gas": "NATURAL_GAS",
"Copper": "COPPER",
"Aluminum": "ALUMINUM",
"Wheat": "WHEAT",
"Corn": "CORN",
"Cotton": "COTTON",
"Sugar": "SUGAR",
"Coffee": "COFFEE",
"Global Commodities Index": "COMMODITIES",
}
commodity_data = {}
for name, symbol in commodities.items():
data_str, data_dict = self.fetch_data(
function="TIME_SERIES_DAILY", symbol=symbol
)
commodity_data[name] = (data_str, data_dict)
return commodity_data
def get_economic_indicators(
self,
) -> Dict[str, Union[str, Dict[str, Any]]]:
"""
Fetches data for economic indicators such as Real GDP, Unemployment Rate, etc.
:return: Dictionary with indicator names as keys and a tuple of (string data, dictionary data) as values.
"""
indicators = {
"Real GDP": "REAL_GDP",
"Real GDP per Capita": "REAL_GDP_PER_CAPITA",
"Treasury Yield": "TREASURY_YIELD",
"Federal Funds Rate": "FEDERAL_FUNDS_RATE",
"CPI": "CPI",
"Inflation": "INFLATION",
"Retail Sales": "RETAIL_SALES",
"Durable Goods Orders": "DURABLE_GOODS",
"Unemployment Rate": "UNEMPLOYMENT",
"Nonfarm Payroll": "NONFARM_PAYROLL",
}
indicator_data = {}
for name, function in indicators.items():
data_str, data_dict = self.fetch_data(function=function)
indicator_data[name] = (data_str, data_dict)
return indicator_data
if __name__ == "__main__":
# Replace with your actual API key
API_KEY = "your_alpha_vantage_api_key"
av_client = AlphaVantageClient(api_key=API_KEY)
logger.info("Fetching commodities data...")
commodities_data = av_client.get_commodities_data()
logger.info("Fetching economic indicators data...")
economic_indicators_data = av_client.get_economic_indicators()
# Example of accessing the data
for name, (data_str, data_dict) in commodities_data.items():
logger.info(
f"{name}: {data_str}..."
) # Truncate the string for display
for name, (
data_str,
data_dict,
) in economic_indicators_data.items():
logger.info(
f"{name}: {data_str}..."
) # Truncate the string for display

@ -0,0 +1,333 @@
import concurrent.futures
import os
from datetime import datetime, timedelta
from typing import Any, Dict, List, Tuple
import requests
import yfinance as yf
from alpha_vantage.cryptocurrencies import CryptoCurrencies
from alpha_vantage.foreignexchange import ForeignExchange
from alpha_vantage.timeseries import TimeSeries
from loguru import logger
def fetch_yahoo_finance_data(tickers: List[str]) -> Dict[str, Any]:
try:
yf_data = yf.download(tickers, period="1d")["Close"]
return {
"S&P 500": yf_data["^GSPC"].iloc[-1],
"Dow Jones": yf_data["^DJI"].iloc[-1],
"NASDAQ": yf_data["^IXIC"].iloc[-1],
"Gold Price": yf_data["GC=F"].iloc[-1],
"Oil Price": yf_data["CL=F"].iloc[-1],
"10-Year Treasury Yield": yf_data["^TNX"].iloc[-1],
}
except Exception as e:
logger.error(f"Error fetching Yahoo Finance data: {str(e)}")
return {ticker: "N/A" for ticker in tickers}
def fetch_polygon_ticker_data(
api_key: str, ticker: str
) -> Dict[str, Any]:
url = f"https://api.polygon.io/v2/aggs/ticker/{ticker}/prev?apiKey={api_key}"
try:
response = requests.get(url)
response.raise_for_status()
data = response.json()
return {ticker: data["results"][0]["c"]}
except requests.RequestException as e:
logger.error(
f"Error fetching Polygon data for {ticker}: {str(e)}"
)
return {ticker: None}
def fetch_polygon_forex_data(
api_key: str, from_currency: str, to_currency: str
) -> Dict[str, Any]:
url = f"https://api.polygon.io/v2/aggs/ticker/C:{from_currency}{to_currency}/prev?apiKey={api_key}"
try:
response = requests.get(url)
response.raise_for_status()
data = response.json()
return {
f"{from_currency} to {to_currency}": data["results"][0][
"c"
]
}
except requests.RequestException as e:
logger.error(
f"Error fetching Polygon forex data for {from_currency}/{to_currency}: {str(e)}"
)
return {f"{from_currency} to {to_currency}": None}
def fetch_polygon_economic_data(
api_key: str, indicator: str
) -> Dict[str, Any]:
end_date = datetime.now().strftime("%Y-%m-%d")
start_date = (datetime.now() - timedelta(days=30)).strftime(
"%Y-%m-%d"
)
url = f"https://api.polygon.io/v2/aggs/ticker/{indicator}/range/1/day/{start_date}/{end_date}?apiKey={api_key}"
try:
response = requests.get(url)
response.raise_for_status()
data = response.json()
return {indicator: data["results"][-1]["c"]}
except requests.RequestException as e:
logger.error(
f"Error fetching Polygon economic data for {indicator}: {str(e)}"
)
return {indicator: None}
def fetch_polygon_data(api_key: str) -> Dict[str, Any]:
if not api_key:
logger.warning(
"Polygon API key not found. Skipping Polygon data."
)
return {}
result_dict = {}
# Define data to fetch
stock_tickers = ["SPY", "DIA", "QQQ", "GLD", "USO", "TLT"]
forex_pairs = [("USD", "EUR"), ("USD", "GBP"), ("USD", "JPY")]
economic_indicators = {
"I:CPI": "Consumer Price Index",
"I:GDPUSD": "US GDP",
"I:UNRATE": "US Unemployment Rate",
"I:INDPRO": "Industrial Production Index",
"I:HOUST": "Housing Starts",
"I:RSXFS": "Retail Sales",
"I:CPIUCSL": "Inflation Rate",
"I:FEDFUNDS": "Federal Funds Rate",
"I:GFDEBTN": "US National Debt",
"I:REALGDP": "Real GDP",
}
# Fetch stock data
for ticker in stock_tickers:
result_dict.update(fetch_polygon_ticker_data(api_key, ticker))
# Fetch forex data
for from_currency, to_currency in forex_pairs:
result_dict.update(
fetch_polygon_forex_data(
api_key, from_currency, to_currency
)
)
# Fetch economic indicator data
for indicator in economic_indicators:
result_dict.update(
fetch_polygon_economic_data(api_key, indicator)
)
return result_dict
def fetch_exchange_rates() -> Dict[str, Any]:
exchange_url = "https://open.er-api.com/v6/latest/USD"
try:
response = requests.get(exchange_url)
response.raise_for_status()
data = response.json()
if data.get("rates"):
return {
"USD to EUR": data["rates"].get("EUR", "N/A"),
"USD to GBP": data["rates"].get("GBP", "N/A"),
"USD to JPY": data["rates"].get("JPY", "N/A"),
}
else:
logger.error("Exchange rate data structure unexpected")
return {
"USD to EUR": "N/A",
"USD to GBP": "N/A",
"USD to JPY": "N/A",
}
except requests.RequestException as e:
logger.error(f"Error fetching exchange rate data: {str(e)}")
return {
"USD to EUR": "N/A",
"USD to GBP": "N/A",
"USD to JPY": "N/A",
}
def fetch_world_bank_data(
indicator: Tuple[str, str]
) -> Dict[str, Any]:
indicator_name, indicator_code = indicator
wb_url = f"http://api.worldbank.org/v2/indicator/{indicator_code}?date=2021:2022&format=json"
try:
response = requests.get(wb_url)
response.raise_for_status()
data = response.json()
if (
isinstance(data, list)
and len(data) > 1
and len(data[1]) > 0
):
return {indicator_name: data[1][0].get("value", "N/A")}
else:
logger.error(
f"Unexpected data structure for {indicator_name}"
)
return {indicator_name: "N/A"}
except requests.RequestException as e:
logger.error(
f"Error fetching {indicator_name} data: {str(e)}"
)
return {indicator_name: "N/A"}
def fetch_alpha_vantage_data(api_key: str) -> Dict[str, Any]:
if not api_key:
logger.warning(
"Alpha Vantage API key not found. Skipping Alpha Vantage data."
)
return {}
ts = TimeSeries(key=api_key, output_format="json")
fx = ForeignExchange(key=api_key)
cc = CryptoCurrencies(key=api_key)
result = {}
try:
data, _ = ts.get_daily("MSFT")
result["MSFT Daily Close"] = data["4. close"]
data, _ = fx.get_currency_exchange_rate(
from_currency="USD", to_currency="EUR"
)
result["USD to EUR (Alpha Vantage)"] = data[
"5. Exchange Rate"
]
data, _ = cc.get_digital_currency_daily(
symbol="BTC", market="USD"
)
result["BTC to USD"] = data["4b. close (USD)"]
except Exception as e:
logger.error(f"Error fetching Alpha Vantage data: {str(e)}")
return result
def fetch_macro_economic_data() -> Tuple[str, Dict[str, Any]]:
"""
Fetches comprehensive macro-economic data from various APIs using multithreading.
Returns:
Tuple[str, Dict[str, Any]]: A tuple containing:
- A formatted string with the macro-economic data
- A dictionary with the raw macro-economic data
"""
logger.info("Starting to fetch comprehensive macro-economic data")
result_dict: Dict[str, Any] = {}
# Define data fetching tasks
tasks = [
(
fetch_yahoo_finance_data,
(["^GSPC", "^DJI", "^IXIC", "GC=F", "CL=F", "^TNX"],),
),
(fetch_polygon_data, (os.environ.get("POLYGON_API_KEY"),)),
(fetch_exchange_rates, ()),
(
fetch_alpha_vantage_data,
(os.environ.get("ALPHA_VANTAGE_API_KEY"),),
),
]
# Execute tasks concurrently
with concurrent.futures.ThreadPoolExecutor(
max_workers=20
) as executor:
future_to_task = {
executor.submit(task, *args): task.__name__
for task, args in tasks
}
for future in concurrent.futures.as_completed(future_to_task):
task_name = future_to_task[future]
try:
data = future.result()
result_dict.update(data)
logger.success(
f"Successfully fetched data from {task_name}"
)
except Exception as e:
logger.error(
f"{task_name} generated an exception: {str(e)}"
)
# Create the formatted string output
# Update the output_string in fetch_macro_economic_data function
output_string = f"""
Macro-economic Data (as of {datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
-----------------------------------------------------------
Stock Market Indices:
S&P 500 (SPY): ${result_dict.get('SPY')}
Dow Jones (DIA): ${result_dict.get('DIA')}
NASDAQ (QQQ): ${result_dict.get('QQQ')}
Commodities:
Gold (GLD): ${result_dict.get('GLD')}
Oil (USO): ${result_dict.get('USO')}
Bonds:
20+ Year Treasury Bond (TLT): ${result_dict.get('TLT')}
Forex:
USD to EUR: {result_dict.get('USD to EUR')}
USD to GBP: {result_dict.get('USD to GBP')}
USD to JPY: {result_dict.get('USD to JPY')}
Economic Indicators:
Consumer Price Index: {result_dict.get('I:CPI')}
US GDP: ${result_dict.get('I:GDPUSD')} billion
US Unemployment Rate: {result_dict.get('I:UNRATE')}%
Industrial Production Index: {result_dict.get('I:INDPRO')}
Housing Starts: {result_dict.get('I:HOUST')} thousand
Retail Sales: ${result_dict.get('I:RSXFS')} billion
Inflation Rate: {result_dict.get('I:CPIUCSL')}%
Federal Funds Rate: {result_dict.get('I:FEDFUNDS')}%
US National Debt: ${result_dict.get('I:GFDEBTN')} billion
Real GDP: ${result_dict.get('I:REALGDP')} billion
Other Market Data:
S&P 500 (Yahoo): {result_dict.get('S&P 500', 'N/A')}
Dow Jones (Yahoo): {result_dict.get('Dow Jones', 'N/A')}
NASDAQ (Yahoo): {result_dict.get('NASDAQ', 'N/A')}
Gold Price (Yahoo): ${result_dict.get('Gold Price', 'N/A')}
Oil Price (Yahoo): ${result_dict.get('Oil Price', 'N/A')}
10-Year Treasury Yield (Yahoo): {result_dict.get('10-Year Treasury Yield', 'N/A')}%
MSFT Daily Close: {result_dict.get('MSFT Daily Close', 'N/A')}
BTC to USD: {result_dict.get('BTC to USD', 'N/A')}
Exchange Rates (Other Sources):
USD to EUR (Open Exchange Rates): {result_dict.get('USD to EUR', 'N/A')}
USD to GBP (Open Exchange Rates): {result_dict.get('USD to GBP', 'N/A')}
USD to JPY (Open Exchange Rates): {result_dict.get('USD to JPY', 'N/A')}
USD to EUR (Alpha Vantage): {result_dict.get('USD to EUR (Alpha Vantage)', 'N/A')}
"""
logger.info("Finished fetching comprehensive macro-economic data")
return output_string, result_dict
# Example usage
if __name__ == "__main__":
logger.add("macro_economic_data.log", rotation="500 MB")
try:
output_str, output_dict = fetch_macro_economic_data()
print(output_str)
print("Dictionary output:", output_dict)
except Exception as e:
logger.exception(f"An error occurred: {str(e)}")

@ -0,0 +1,114 @@
import requests
import pandas as pd
from datetime import datetime, timedelta
import os
from typing import Dict, Tuple, Any
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# You'll need to set these environment variables with your actual API keys
ALPHA_VANTAGE_API_KEY = os.getenv('ALPHA_VANTAGE_API_KEY')
WORLD_BANK_API_KEY = os.getenv('WORLD_BANK_API_KEY')
FRED_API_KEY = os.getenv('FRED_API_KEY')
def fetch_real_economic_data(country: str, start_date: datetime, end_date: datetime) -> Tuple[str, Dict[str, Any]]:
data = {}
def get_alpha_vantage_data(indicator: str) -> pd.Series:
try:
url = f"https://www.alphavantage.co/query?function={indicator}&interval=monthly&apikey={ALPHA_VANTAGE_API_KEY}"
response = requests.get(url)
response.raise_for_status()
df = pd.DataFrame(response.json()['Monthly Time Series']).T
df.index = pd.to_datetime(df.index)
df = df.sort_index()
return df['4. close'].astype(float)
except Exception as e:
logger.error(f"Error fetching Alpha Vantage data: {str(e)}")
return pd.Series()
def get_world_bank_data(indicator: str) -> pd.Series:
try:
url = f"http://api.worldbank.org/v2/country/{country}/indicator/{indicator}?format=json&date={start_date.year}:{end_date.year}&per_page=1000"
response = requests.get(url)
response.raise_for_status()
data = response.json()[1]
df = pd.DataFrame(data)
df['date'] = pd.to_datetime(df['date'], format='%Y')
df = df.set_index('date').sort_index()
return df['value'].astype(float)
except Exception as e:
logger.error(f"Error fetching World Bank data: {str(e)}")
return pd.Series()
def get_fred_data(series_id: str) -> pd.Series:
try:
url = f"https://api.stlouisfed.org/fred/series/observations?series_id={series_id}&api_key={FRED_API_KEY}&file_type=json"
response = requests.get(url)
response.raise_for_status()
df = pd.DataFrame(response.json()['observations'])
df['date'] = pd.to_datetime(df['date'])
df = df.set_index('date').sort_index()
return df['value'].astype(float)
except Exception as e:
logger.error(f"Error fetching FRED data: {str(e)}")
return pd.Series()
# Fetch data from different sources
data['GDP_growth_rate'] = get_world_bank_data('NY.GDP.MKTP.KD.ZG')
data['unemployment_rate'] = get_world_bank_data('SL.UEM.TOTL.ZS')
data['inflation_rate'] = get_world_bank_data('FP.CPI.TOTL.ZG')
data['debt_to_GDP_ratio'] = get_world_bank_data('GC.DOD.TOTL.GD.ZS')
data['current_account_balance'] = get_world_bank_data('BN.CAB.XOKA.CD')
data['yield_curve_slope'] = get_fred_data('T10Y2Y')
data['stock_market_index'] = get_alpha_vantage_data('TIME_SERIES_MONTHLY')
data['consumer_confidence_index'] = get_fred_data('CSCICP03USM665S')
data['business_confidence_index'] = get_fred_data('BSCICP03USM665S')
# Combine all data into a single DataFrame
df = pd.DataFrame(data)
df = df.loc[start_date:end_date]
if df.empty:
logger.warning("No data retrieved for the specified date range and country.")
return "No data available", {"country": country, "real_time_data": {}, "historical_data": {}}
# Prepare the dictionary output
output_dict = {
"country": country,
"real_time_data": df.iloc[-1].to_dict(),
"historical_data": df.to_dict()
}
# Create summary string
summary = f"Economic Data Summary for {country} (as of {end_date.strftime('%Y-%m-%d')}):\n"
for key, value in output_dict['real_time_data'].items():
if pd.notna(value):
summary += f"{key.replace('_', ' ').title()}: {value:.2f}\n"
else:
summary += f"{key.replace('_', ' ').title()}: Data not available\n"
return summary, output_dict
# Example usage
if __name__ == "__main__":
country = "US" # ISO country code
start_date = datetime(2020, 1, 1)
end_date = datetime.now()
summary, data = fetch_real_economic_data(country, start_date, end_date)
print(summary)
print("\nOutput Dictionary (truncated):")
print(f"Country: {data['country']}")
print("Real-time data:", data['real_time_data'])
print("Historical data: {First day, Last day}")
if data['historical_data']:
first_day = min(next(iter(data['historical_data'].values())).keys())
last_day = max(next(iter(data['historical_data'].values())).keys())
print(f" {first_day}:", {k: v[first_day] if first_day in v else 'N/A' for k, v in data['historical_data'].items()})
print(f" {last_day}:", {k: v[last_day] if last_day in v else 'N/A' for k, v in data['historical_data'].items()})
else:
print(" No historical data available.")

@ -1,205 +0,0 @@
import os
from swarms import Agent, AgentRearrange
from swarm_models import OpenAIChat
# model = Anthropic(anthropic_api_key=os.getenv("ANTHROPIC_API_KEY"))
company = "TGSC"
# Get the OpenAI API key from the environment variable
api_key = os.getenv("GROQ_API_KEY")
# Model
model = OpenAIChat(
openai_api_base="https://api.groq.com/openai/v1",
openai_api_key=api_key,
model_name="llama-3.1-70b-versatile",
temperature=0.1,
)
# Initialize the Managing Director agent
managing_director = Agent(
agent_name="Managing-Director",
system_prompt=f"""
As the Managing Director at Blackstone, your role is to oversee the entire investment analysis process for potential acquisitions.
Your responsibilities include:
1. Setting the overall strategy and direction for the analysis
2. Coordinating the efforts of the various team members and ensuring a comprehensive evaluation
3. Reviewing the findings and recommendations from each team member
4. Making the final decision on whether to proceed with the acquisition
For the current potential acquisition of {company}, direct the tasks for the team to thoroughly analyze all aspects of the company, including its financials, industry position, technology, market potential, and regulatory compliance. Provide guidance and feedback as needed to ensure a rigorous and unbiased assessment.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="managing-director.json",
)
# Initialize the Vice President of Finance
vp_finance = Agent(
agent_name="VP-Finance",
system_prompt=f"""
As the Vice President of Finance at Blackstone, your role is to lead the financial analysis of potential acquisitions.
For the current potential acquisition of {company}, your tasks include:
1. Conducting a thorough review of {company}' financial statements, including income statements, balance sheets, and cash flow statements
2. Analyzing key financial metrics such as revenue growth, profitability margins, liquidity ratios, and debt levels
3. Assessing the company's historical financial performance and projecting future performance based on assumptions and market conditions
4. Identifying any financial risks or red flags that could impact the acquisition decision
5. Providing a detailed report on your findings and recommendations to the Managing Director
Be sure to consider factors such as the sustainability of {company}' business model, the strength of its customer base, and its ability to generate consistent cash flows. Your analysis should be data-driven, objective, and aligned with Blackstone's investment criteria.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="vp-finance.json",
)
# Initialize the Industry Analyst
industry_analyst = Agent(
agent_name="Industry-Analyst",
system_prompt=f"""
As the Industry Analyst at Blackstone, your role is to provide in-depth research and analysis on the industries and markets relevant to potential acquisitions.
For the current potential acquisition of {company}, your tasks include:
1. Conducting a comprehensive analysis of the industrial robotics and automation solutions industry, including market size, growth rates, key trends, and future prospects
2. Identifying the major players in the industry and assessing their market share, competitive strengths and weaknesses, and strategic positioning
3. Evaluating {company}' competitive position within the industry, including its market share, differentiation, and competitive advantages
4. Analyzing the key drivers and restraints for the industry, such as technological advancements, labor costs, regulatory changes, and economic conditions
5. Identifying potential risks and opportunities for {company} based on the industry analysis, such as disruptive technologies, emerging markets, or shifts in customer preferences
Your analysis should provide a clear and objective assessment of the attractiveness and future potential of the industrial robotics industry, as well as {company}' positioning within it. Consider both short-term and long-term factors, and provide evidence-based insights to inform the investment decision.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="industry-analyst.json",
)
# Initialize the Technology Expert
tech_expert = Agent(
agent_name="Tech-Expert",
system_prompt=f"""
As the Technology Expert at Blackstone, your role is to assess the technological capabilities, competitive advantages, and potential risks of companies being considered for acquisition.
For the current potential acquisition of {company}, your tasks include:
1. Conducting a deep dive into {company}' proprietary technologies, including its robotics platforms, automation software, and AI capabilities
2. Assessing the uniqueness, scalability, and defensibility of {company}' technology stack and intellectual property
3. Comparing {company}' technologies to those of its competitors and identifying any key differentiators or technology gaps
4. Evaluating {company}' research and development capabilities, including its innovation pipeline, engineering talent, and R&D investments
5. Identifying any potential technology risks or disruptive threats that could impact {company}' long-term competitiveness, such as emerging technologies or expiring patents
Your analysis should provide a comprehensive assessment of {company}' technological strengths and weaknesses, as well as the sustainability of its competitive advantages. Consider both the current state of its technology and its future potential in light of industry trends and advancements.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="tech-expert.json",
)
# Initialize the Market Researcher
market_researcher = Agent(
agent_name="Market-Researcher",
system_prompt=f"""
As the Market Researcher at Blackstone, your role is to analyze the target company's customer base, market share, and growth potential to assess the commercial viability and attractiveness of the potential acquisition.
For the current potential acquisition of {company}, your tasks include:
1. Analyzing {company}' current customer base, including customer segmentation, concentration risk, and retention rates
2. Assessing {company}' market share within its target markets and identifying key factors driving its market position
3. Conducting a detailed market sizing and segmentation analysis for the industrial robotics and automation markets, including identifying high-growth segments and emerging opportunities
4. Evaluating the demand drivers and sales cycles for {company}' products and services, and identifying any potential risks or limitations to adoption
5. Developing financial projections and estimates for {company}' revenue growth potential based on the market analysis and assumptions around market share and penetration
Your analysis should provide a data-driven assessment of the market opportunity for {company} and the feasibility of achieving our investment return targets. Consider both bottom-up and top-down market perspectives, and identify any key sensitivities or assumptions in your projections.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="market-researcher.json",
)
# Initialize the Regulatory Specialist
regulatory_specialist = Agent(
agent_name="Regulatory-Specialist",
system_prompt=f"""
As the Regulatory Specialist at Blackstone, your role is to identify and assess any regulatory risks, compliance requirements, and potential legal liabilities associated with potential acquisitions.
For the current potential acquisition of {company}, your tasks include:
1. Identifying all relevant regulatory bodies and laws that govern the operations of {company}, including industry-specific regulations, labor laws, and environmental regulations
2. Reviewing {company}' current compliance policies, procedures, and track record to identify any potential gaps or areas of non-compliance
3. Assessing the potential impact of any pending or proposed changes to relevant regulations that could affect {company}' business or create additional compliance burdens
4. Evaluating the potential legal liabilities and risks associated with {company}' products, services, and operations, including product liability, intellectual property, and customer contracts
5. Providing recommendations on any regulatory or legal due diligence steps that should be taken as part of the acquisition process, as well as any post-acquisition integration considerations
Your analysis should provide a comprehensive assessment of the regulatory and legal landscape surrounding {company}, and identify any material risks or potential deal-breakers. Consider both the current state and future outlook, and provide practical recommendations to mitigate identified risks.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="regulatory-specialist.json",
)
# Create a list of agents
agents = [
managing_director,
vp_finance,
industry_analyst,
tech_expert,
market_researcher,
regulatory_specialist,
]
# Define multiple flow patterns
flows = [
"Industry-Analyst -> Tech-Expert -> Market-Researcher -> Regulatory-Specialist -> Managing-Director -> VP-Finance",
"Managing-Director -> VP-Finance -> Industry-Analyst -> Tech-Expert -> Market-Researcher -> Regulatory-Specialist",
"Tech-Expert -> Market-Researcher -> Regulatory-Specialist -> Industry-Analyst -> Managing-Director -> VP-Finance",
]
# Create instances of AgentRearrange for each flow pattern
blackstone_acquisition_analysis = AgentRearrange(
name="Blackstone-Acquisition-Analysis",
description="A system for analyzing potential acquisitions",
agents=agents,
flow=flows[0],
)
blackstone_investment_strategy = AgentRearrange(
name="Blackstone-Investment-Strategy",
description="A system for evaluating investment opportunities",
agents=agents,
flow=flows[1],
)
blackstone_market_analysis = AgentRearrange(
name="Blackstone-Market-Analysis",
description="A system for analyzing market trends and opportunities",
agents=agents,
flow=flows[2],
)
# Example of running each system
# output = blackstone_acquisition_analysis.run(
# f"Analyze the potential acquisition of {company}, a leading manufacturer of industrial robots and automation solutions."
# )
# print(output)

@ -1,5 +1,5 @@
from swarms.structs.swarm_arange import SwarmRearrange
from rearrange_example_blackstone import (
from blackstone_pe.rearrange_example_blackstone import (
blackstone_acquisition_analysis,
blackstone_investment_strategy,
blackstone_market_analysis,
@ -11,7 +11,7 @@ swarm_arrange = SwarmRearrange(
blackstone_investment_strategy,
blackstone_market_analysis,
],
flow=f"{blackstone_acquisition_analysis.name} -> {blackstone_investment_strategy.name} -> {blackstone_market_analysis.name}",
flow=f"{blackstone_acquisition_analysis.name} -> {blackstone_investment_strategy.name} -> {blackstone_market_analysis.name}, {blackstone_acquisition_analysis.name}",
)
print(

@ -122,6 +122,9 @@ def create_agents_from_yaml(
agents=agents,
swarm_type=swarm_config["swarm_type"],
task=swarm_config.get("task"),
flow=swarm_config.get("flow"),
autosave=swarm_config.get("autosave"),
return_json=swarm_config.get("return_json"),
*args,
**kwargs,
)

@ -52,6 +52,10 @@ from swarms.utils.file_processing import create_file_in_folder
from swarms.utils.parse_code import extract_code_from_markdown
from swarms.utils.pdf_to_text import pdf_to_text
from swarms.utils.run_on_cpu import run_on_cpu
from clusterops import (
execute_on_gpu,
execute_with_cpu_cores,
)
# Utils
@ -664,7 +668,7 @@ class Agent:
########################## FUNCTION CALLING ##########################
@run_on_cpu
def run(
def _run(
self,
task: Optional[str] = None,
img: Optional[str] = None,
@ -2006,3 +2010,70 @@ class Agent:
elif self.return_history:
return self.short_memory.return_history_as_string()
def run(
self,
task: Optional[str] = None,
img: Optional[str] = None,
is_last: bool = False,
device: str = "cpu", # gpu
device_id: int = 0,
all_cores: bool = True,
*args,
**kwargs,
) -> Any:
"""
Executes the agent's run method on a specified device.
This method attempts to execute the agent's run method on a specified device, either CPU or GPU. It logs the device selection and the number of cores or GPU ID used. If the device is set to CPU, it can use all available cores or a specific core specified by `device_id`. If the device is set to GPU, it uses the GPU specified by `device_id`.
Args:
task (Optional[str], optional): The task to be executed. Defaults to None.
img (Optional[str], optional): The image to be processed. Defaults to None.
is_last (bool, optional): Indicates if this is the last task. Defaults to False.
device (str, optional): The device to use for execution. Defaults to "cpu".
device_id (int, optional): The ID of the GPU to use if device is set to "gpu". Defaults to 0.
all_cores (bool, optional): If True, uses all available CPU cores. Defaults to True.
*args: Additional positional arguments to be passed to the execution method.
**kwargs: Additional keyword arguments to be passed to the execution method.
Returns:
Any: The result of the execution.
Raises:
ValueError: If an invalid device is specified.
Exception: If any other error occurs during execution.
"""
try:
logger.info(f"Attempting to run on device: {device}")
if device == "cpu":
logger.info("Device set to CPU")
if all_cores is True:
count = os.cpu_count()
logger.info(
f"Using all available CPU cores: {count}"
)
else:
count = device_id
logger.info(f"Using specific CPU core: {count}")
return execute_with_cpu_cores(
count, self._run, task, img, *args, **kwargs
)
# If device gpu
elif device == "gpu":
logger.info("Device set to GPU")
return execute_on_gpu(
device_id, self._run, task, img, *args, **kwargs
)
else:
raise ValueError(
f"Invalid device specified: {device}. Supported devices are 'cpu' and 'gpu'."
)
except ValueError as e:
logger.error(f"Invalid device specified: {e}")
raise e
except Exception as e:
logger.error(f"An error occurred during execution: {e}")
raise e

@ -1,9 +1,10 @@
import uuid
from datetime import datetime
from typing import Any, Dict, List, Literal, Union
from typing import Any, Callable, Dict, List, Literal, Union
from loguru import logger
from pydantic import BaseModel, Field
from swarms.structs.agent import Agent
from swarms.structs.concurrent_workflow import ConcurrentWorkflow
from swarms.structs.mixture_of_agents import MixtureOfAgents
@ -61,8 +62,11 @@ class SwarmRouter:
name: str = "swarm-router",
description: str = "Routes your task to the desired swarm",
max_loops: int = 1,
agents: List[Agent] = None,
swarm_type: SwarmType = None,
agents: List[Union[Agent, Callable]] = [],
swarm_type: SwarmType = "SequentialWorkflow",
autosave: bool = False,
flow: str = None,
return_json: bool = True,
*args,
**kwargs,
):
@ -93,6 +97,9 @@ class SwarmRouter:
self.max_loops = max_loops
self.agents = agents
self.swarm_type = swarm_type
self.autosave = autosave
self.flow = flow
self.return_json = return_json
self.swarm = self._create_swarm(*args, **kwargs)
self.logs = []
@ -105,6 +112,8 @@ class SwarmRouter:
AgentRearrange,
MixtureOfAgents,
SpreadSheetSwarm,
SequentialWorkflow,
ConcurrentWorkflow,
]:
"""
Create and return the specified swarm type.
@ -126,6 +135,8 @@ class SwarmRouter:
description=self.description,
agents=self.agents,
max_loops=self.max_loops,
flow=self.flow,
return_json=self.return_json,
*args,
**kwargs,
)
@ -144,7 +155,8 @@ class SwarmRouter:
name=self.name,
description=self.description,
agents=self.agents,
max_loops=1,
max_loops=self.max_loops,
autosave_on=self.autosave,
*args,
**kwargs,
)
@ -163,6 +175,8 @@ class SwarmRouter:
description=self.description,
agents=self.agents,
max_loops=self.max_loops,
auto_save=self.autosave,
return_str_on=self.return_json,
*args,
**kwargs,
)

Loading…
Cancel
Save