You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
swarms/examples/tools/mcp_examples/servers/okx_crypto_server.py

121 lines
3.8 KiB

from mcp.server.fastmcp import FastMCP
import requests
mcp = FastMCP("OKXCryptoPrice")
mcp.settings.port = 8001
@mcp.tool(
name="get_okx_crypto_price",
description="Get the current price and basic information for a given cryptocurrency from OKX exchange.",
)
def get_okx_crypto_price(symbol: str) -> str:
"""
Get the current price and basic information for a given cryptocurrency using OKX API.
Args:
symbol (str): The cryptocurrency trading pair (e.g., 'BTC-USDT', 'ETH-USDT')
Returns:
str: A formatted string containing the cryptocurrency information
Example:
>>> get_okx_crypto_price('BTC-USDT')
'Current price of BTC/USDT: $45,000'
"""
try:
if not symbol:
return "Please provide a valid trading pair (e.g., 'BTC-USDT')"
# Convert to uppercase and ensure proper format
symbol = symbol.upper()
if not symbol.endswith("-USDT"):
symbol = f"{symbol}-USDT"
# OKX API endpoint for ticker information
url = f"https://www.okx.com/api/v5/market/ticker?instId={symbol}"
# Make the API request
response = requests.get(url)
response.raise_for_status()
data = response.json()
if data.get("code") != "0":
return f"Error: {data.get('msg', 'Unknown error')}"
ticker_data = data.get("data", [{}])[0]
if not ticker_data:
return f"Could not find data for {symbol}. Please check the trading pair."
price = float(ticker_data.get("last", 0))
float(ticker_data.get("last24h", 0))
change_percent = float(ticker_data.get("change24h", 0))
base_currency = symbol.split("-")[0]
return f"Current price of {base_currency}/USDT: ${price:,.2f}\n24h Change: {change_percent:.2f}%"
except requests.exceptions.RequestException as e:
return f"Error fetching OKX data: {str(e)}"
except Exception as e:
return f"Error: {str(e)}"
@mcp.tool(
name="get_okx_crypto_volume",
description="Get the 24-hour trading volume for a given cryptocurrency from OKX exchange.",
)
def get_okx_crypto_volume(symbol: str) -> str:
"""
Get the 24-hour trading volume for a given cryptocurrency using OKX API.
Args:
symbol (str): The cryptocurrency trading pair (e.g., 'BTC-USDT', 'ETH-USDT')
Returns:
str: A formatted string containing the trading volume information
Example:
>>> get_okx_crypto_volume('BTC-USDT')
'24h Trading Volume for BTC/USDT: $1,234,567'
"""
try:
if not symbol:
return "Please provide a valid trading pair (e.g., 'BTC-USDT')"
# Convert to uppercase and ensure proper format
symbol = symbol.upper()
if not symbol.endswith("-USDT"):
symbol = f"{symbol}-USDT"
# OKX API endpoint for ticker information
url = f"https://www.okx.com/api/v5/market/ticker?instId={symbol}"
# Make the API request
response = requests.get(url)
response.raise_for_status()
data = response.json()
if data.get("code") != "0":
return f"Error: {data.get('msg', 'Unknown error')}"
ticker_data = data.get("data", [{}])[0]
if not ticker_data:
return f"Could not find data for {symbol}. Please check the trading pair."
volume_24h = float(ticker_data.get("vol24h", 0))
base_currency = symbol.split("-")[0]
return f"24h Trading Volume for {base_currency}/USDT: ${volume_24h:,.2f}"
except requests.exceptions.RequestException as e:
return f"Error fetching OKX data: {str(e)}"
except Exception as e:
return f"Error: {str(e)}"
if __name__ == "__main__":
# Run the server on port 8000 (you can change this to any available port)
mcp.run(transport="sse")