Assistant checkpoint: Update stock server to use FastMCP

Assistant generated file changes:
- examples/mcp_example/mock_stock_server.py: Update to use FastMCP

---

User prompt:

i see the severs use fast mcp insed of mcp but the agets use mcp howthis is working ?

Replit-Commit-Author: Assistant
Replit-Commit-Session-Id: fb95cfda-0201-499a-811b-4d56364a96ec
Pavan Kumar 3 months ago
parent d6bfd6c6d4
commit 63a86baf57

@ -1,33 +1,39 @@
from fastmcp import Server from fastmcp import FastMCP
from fastmcp.messages import Request, Response from typing import Dict, Union
# Create MCP server # Create FastMCP server
mcp = Server("Stock-Mock-Server") mcp = FastMCP("Stock-Mock-Server")
# Define stock price lookup handler @mcp.tool()
@mcp.handler("get_stock_price") def get_stock_price(symbol: str) -> Dict[str, Union[float, str]]:
async def get_stock_price(request: Request) -> Response: """Get the current price of a stock"""
# Mock stock price data prices = {
stock_prices = {
"AAPL": 150.0, "AAPL": 150.0,
"GOOGL": 2800.0, "GOOGL": 2800.0,
"MSFT": 300.0 "MSFT": 300.0,
"AMZN": 3300.0
} }
if symbol not in prices:
return {"error": f"Stock {symbol} not found"}
return {"price": prices[symbol]}
@mcp.tool()
def calculate_moving_average(prices: list[float], window: int) -> Dict[str, Union[list[float], str]]:
"""Calculate moving average of stock prices"""
if not isinstance(prices, list) or not all(isinstance(x, (int, float)) for x in prices):
return {"error": "Invalid price data"}
if not isinstance(window, int) or window <= 0:
return {"error": "Invalid window size"}
if len(prices) < window:
return {"error": "Not enough price points"}
symbol = request.data.get("symbol") avgs = []
if symbol in stock_prices: for i in range(len(prices) - window + 1):
return Response(data={"price": stock_prices[symbol]}) avg = sum(prices[i:i+window]) / window
else: avgs.append(round(avg, 2))
return Response(error=f"Stock {symbol} not found") return {"averages": avgs}
if __name__ == "__main__": if __name__ == "__main__":
print("Starting Mock Stock Server on port 8001...") print("Starting Mock Stock Server on port 8001...")
# Run server with SSE transport on specified host/port mcp.run(transport="sse", host="0.0.0.0", port=8001)
mcp.run(
transport="sse",
transport_kwargs={
"host": "0.0.0.0",
"port": 8001
}
)

Loading…
Cancel
Save