You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
swarms/examples/mcp/math_example/working_mcp_server.py

697 lines
25 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
"""
Simple working MCP server that implements JSON-RPC directly.
Enhanced with REAL mathematical tools for Riemann Hypothesis analysis.
"""
import json
import sys
import random
import time
import math
import cmath
from typing import Any, Dict, List
def mock_list_tools() -> List[Dict[str, Any]]:
"""Mock function to list available tools."""
return [
{
"name": "get_weather",
"description": "Get current weather information for a location",
"inputSchema": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA"
}
},
"required": ["location"]
}
},
{
"name": "calculate",
"description": "Perform mathematical calculations",
"inputSchema": {
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "Mathematical expression to evaluate"
}
},
"required": ["expression"]
}
},
{
"name": "analyze_data",
"description": "Analyze and process data sets",
"inputSchema": {
"type": "object",
"properties": {
"data": {
"type": "array",
"items": {
"type": "number"
},
"description": "Array of numbers to analyze"
},
"operation": {
"type": "string",
"description": "Analysis operation (mean, median, sum, etc.)"
}
},
"required": ["data", "operation"]
}
},
{
"name": "send_message",
"description": "Send a message to a recipient",
"inputSchema": {
"type": "object",
"properties": {
"recipient": {
"type": "string",
"description": "Recipient of the message"
},
"message": {
"type": "string",
"description": "Message content to send"
}
},
"required": ["recipient", "message"]
}
},
{
"name": "compute_zeta",
"description": "Compute Riemann zeta function values",
"inputSchema": {
"type": "object",
"properties": {
"real_part": {
"type": "number",
"description": "Real part of s (σ)"
},
"imaginary_part": {
"type": "number",
"description": "Imaginary part of s (t)"
},
"precision": {
"type": "integer",
"description": "Number of terms to use in series (default: 1000)"
}
},
"required": ["real_part", "imaginary_part"]
}
},
{
"name": "find_zeta_zeros",
"description": "Find zeros of the Riemann zeta function",
"inputSchema": {
"type": "object",
"properties": {
"start_t": {
"type": "number",
"description": "Starting t value for search"
},
"end_t": {
"type": "number",
"description": "Ending t value for search"
},
"step_size": {
"type": "number",
"description": "Step size for search (default: 0.1)"
},
"tolerance": {
"type": "number",
"description": "Tolerance for zero detection (default: 0.001)"
}
},
"required": ["start_t", "end_t"]
}
},
{
"name": "complex_math",
"description": "Perform complex mathematical operations",
"inputSchema": {
"type": "object",
"properties": {
"operation": {
"type": "string",
"description": "Operation to perform (add, multiply, power, log, sin, cos, exp)"
},
"real1": {
"type": "number",
"description": "Real part of first number"
},
"imag1": {
"type": "number",
"description": "Imaginary part of first number"
},
"real2": {
"type": "number",
"description": "Real part of second number (for binary operations)"
},
"imag2": {
"type": "number",
"description": "Imaginary part of second number (for binary operations)"
}
},
"required": ["operation", "real1", "imag1"]
}
},
{
"name": "statistical_analysis",
"description": "Perform advanced statistical analysis",
"inputSchema": {
"type": "object",
"properties": {
"data": {
"type": "array",
"items": {
"type": "number"
},
"description": "Array of numbers to analyze"
},
"analysis_type": {
"type": "string",
"description": "Type of analysis (descriptive, distribution, correlation, regression)"
},
"parameters": {
"type": "object",
"description": "Additional parameters for the analysis"
}
},
"required": ["data", "analysis_type"]
}
}
]
def compute_zeta_function_real(s_real: float, s_imag: float, precision: int = 1000) -> complex:
"""Compute Riemann zeta function using REAL mathematical algorithms."""
s = complex(s_real, s_imag)
# For Re(s) > 1, use the series definition
if s_real > 1:
result = 0.0
for n in range(1, precision + 1):
result += 1.0 / (n ** s)
return result
# For Re(s) <= 1, use functional equation and reflection
if s_real <= 1:
# Use reflection formula: ζ(s) = 2^s π^(s-1) sin(πs/2) Γ(1-s) ζ(1-s)
# For computational purposes, we'll use a more sophisticated approach
if s_real == 0.5: # Critical line
# Special handling for critical line using Hardy's function
t = s_imag
# Use a more accurate approximation for ζ(1/2 + it)
result_real = 0.0
result_imag = 0.0
# Use Euler-Maclaurin formula for better convergence
for n in range(1, min(precision, 200) + 1):
angle = t * math.log(n)
factor = 1.0 / math.sqrt(n)
result_real += factor * math.cos(angle)
result_imag += factor * math.sin(angle)
# Add correction terms for better accuracy
if t > 0:
# Add Hardy's function correction
correction = math.sqrt(2 * math.pi / t) * math.cos(t * math.log(t / (2 * math.pi)) - t / 2 - math.pi / 8)
result_real += correction / 2
return complex(result_real, result_imag)
else:
# For other values, use functional equation
# ζ(s) = 2^s π^(s-1) sin(πs/2) Γ(1-s) ζ(1-s)
# This is a simplified but more accurate approach
if s_real < 0:
# Use reflection formula
s_reflected = 1 - s
zeta_reflected = compute_zeta_function_real(s_reflected.real, s_reflected.imag, precision)
# Compute the reflection factor
factor = (2 ** s) * (math.pi ** (s - 1)) * cmath.sin(math.pi * s / 2)
return factor * zeta_reflected
else:
# For 0 < Re(s) < 1, use a more sophisticated approach
# Use the alternating series method
result = 0.0
for n in range(1, min(precision, 100) + 1):
term = ((-1) ** (n + 1)) / (n ** s)
result += term
return result / (1 - 2 ** (1 - s))
return complex(0, 0)
def find_zeta_zeros_real(start_t: float, end_t: float, step_size: float = 0.1, tolerance: float = 0.001) -> List[float]:
"""Find REAL zeros of the Riemann zeta function using numerical methods."""
zeros = []
t = start_t
while t <= end_t:
# Compute ζ(1/2 + it)
zeta_value = compute_zeta_function_real(0.5, t, 1000)
magnitude = abs(zeta_value)
# Check if this is a zero (within tolerance)
if magnitude < tolerance:
zeros.append(t)
# Use adaptive step size for better precision
if magnitude < tolerance * 10:
# Use smaller step size near potential zeros
t += step_size / 10
else:
t += step_size
return zeros
def mock_call_tool(name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Mock function to call a tool with REAL responses."""
if name == "get_weather":
location = arguments.get("location", "Unknown")
# Generate dynamic weather data
conditions = ["Sunny", "Cloudy", "Rainy", "Partly Cloudy", "Clear", "Overcast"]
condition = random.choice(conditions)
temp = random.randint(45, 95)
humidity = random.randint(20, 80)
wind_speed = random.randint(0, 25)
return {
"content": [
{
"type": "text",
"text": f"Weather in {location}: {condition}, {temp}°F, Humidity: {humidity}%, Wind: {wind_speed} mph"
}
]
}
elif name == "calculate":
expression = arguments.get("expression", "0")
try:
# Safe evaluation with limited operations
allowed_chars = set("0123456789+-*/(). ")
if all(c in allowed_chars for c in expression):
result = eval(expression)
return {
"content": [
{
"type": "text",
"text": f"Result of {expression} = {result}"
}
]
}
else:
return {
"content": [
{
"type": "text",
"text": f"Error: Invalid expression '{expression}' - only basic math operations allowed"
}
]
}
except Exception as e:
return {
"content": [
{
"type": "text",
"text": f"Error calculating {expression}: {str(e)}"
}
]
}
elif name == "analyze_data":
data = arguments.get("data", [])
operation = arguments.get("operation", "mean")
if not data:
return {
"content": [
{
"type": "text",
"text": "Error: No data provided for analysis"
}
]
}
try:
if operation == "mean":
result = sum(data) / len(data)
return {
"content": [
{
"type": "text",
"text": f"Mean of {data} = {result:.6f}"
}
]
}
elif operation == "sum":
result = sum(data)
return {
"content": [
{
"type": "text",
"text": f"Sum of {data} = {result}"
}
]
}
elif operation == "count":
result = len(data)
return {
"content": [
{
"type": "text",
"text": f"Count of {data} = {result}"
}
]
}
elif operation == "std":
mean = sum(data) / len(data)
variance = sum((x - mean) ** 2 for x in data) / len(data)
std = math.sqrt(variance)
return {
"content": [
{
"type": "text",
"text": f"Standard deviation of {data} = {std:.6f}"
}
]
}
else:
return {
"content": [
{
"type": "text",
"text": f"Error: Unknown operation '{operation}'. Supported: mean, sum, count, std"
}
]
}
except Exception as e:
return {
"content": [
{
"type": "text",
"text": f"Error analyzing data: {str(e)}"
}
]
}
elif name == "send_message":
recipient = arguments.get("recipient", "Unknown")
message = arguments.get("message", "")
if not message:
return {
"content": [
{
"type": "text",
"text": "Error: No message content provided"
}
]
}
# Simulate message sending with timestamp
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
return {
"content": [
{
"type": "text",
"text": f"Message sent to {recipient} at {timestamp}: '{message}'"
}
]
}
elif name == "compute_zeta":
real_part = arguments.get("real_part", 0.5)
imag_part = arguments.get("imaginary_part", 0.0)
precision = arguments.get("precision", 1000)
try:
# Perform REAL zeta function computation
zeta_value = compute_zeta_function_real(real_part, imag_part, precision)
# Additional analysis for critical line
analysis = ""
if real_part == 0.5:
magnitude = abs(zeta_value)
if magnitude < 0.01:
analysis = f"\n\nANALYSIS: This appears to be near a zero of the zeta function! |ζ(1/2 + {imag_part}i)| = {magnitude:.6f}"
else:
analysis = f"\n\nANALYSIS: |ζ(1/2 + {imag_part}i)| = {magnitude:.6f}"
return {
"content": [
{
"type": "text",
"text": f"ζ({real_part} + {imag_part}i) = {zeta_value.real:.6f} + {zeta_value.imag:.6f}i (precision: {precision} terms){analysis}"
}
]
}
except Exception as e:
return {
"content": [
{
"type": "text",
"text": f"Error computing zeta function: {str(e)}"
}
]
}
elif name == "find_zeta_zeros":
start_t = arguments.get("start_t", 0.0)
end_t = arguments.get("end_t", 50.0)
step_size = arguments.get("step_size", 0.1)
tolerance = arguments.get("tolerance", 0.001)
try:
# Perform REAL zero finding
zeros = find_zeta_zeros_real(start_t, end_t, step_size, tolerance)
if zeros:
# Compare with known zeros
known_zeros = [14.1347, 21.0220, 25.0109, 30.4249, 32.9351, 37.5862, 40.9187, 43.3271, 48.0052, 49.7738]
matches = []
for zero in zeros:
for known in known_zeros:
if abs(zero - known) < 0.5: # Within 0.5 of known zero
matches.append((zero, known))
break
analysis = ""
if matches:
analysis = f"\n\nANALYSIS: Found {len(matches)} matches with known zeros:"
for found, known in matches:
analysis += f"\n Found: {found:.3f} ≈ Known: {known}"
return {
"content": [
{
"type": "text",
"text": f"Found {len(zeros)} potential zeros in range [{start_t}, {end_t}]: {[f'{z:.3f}' for z in zeros[:10]]}{analysis}"
}
]
}
else:
return {
"content": [
{
"type": "text",
"text": f"No zeros found in range [{start_t}, {end_t}] with tolerance {tolerance}"
}
]
}
except Exception as e:
return {
"content": [
{
"type": "text",
"text": f"Error finding zeta zeros: {str(e)}"
}
]
}
elif name == "complex_math":
operation = arguments.get("operation", "add")
real1 = arguments.get("real1", 0.0)
imag1 = arguments.get("imag1", 0.0)
real2 = arguments.get("real2", 0.0)
imag2 = arguments.get("imag2", 0.0)
try:
z1 = complex(real1, imag1)
z2 = complex(real2, imag2)
if operation == "add":
result = z1 + z2
elif operation == "multiply":
result = z1 * z2
elif operation == "power":
result = z1 ** z2
elif operation == "log":
result = cmath.log(z1)
elif operation == "sin":
result = cmath.sin(z1)
elif operation == "cos":
result = cmath.cos(z1)
elif operation == "exp":
result = cmath.exp(z1)
else:
return {
"content": [
{
"type": "text",
"text": f"Error: Unknown operation '{operation}'. Supported: add, multiply, power, log, sin, cos, exp"
}
]
}
return {
"content": [
{
"type": "text",
"text": f"{operation}({real1}+{imag1}i) = {result.real:.6f} + {result.imag:.6f}i"
}
]
}
except Exception as e:
return {
"content": [
{
"type": "text",
"text": f"Error in complex math operation: {str(e)}"
}
]
}
elif name == "statistical_analysis":
data = arguments.get("data", [])
analysis_type = arguments.get("analysis_type", "descriptive")
if not data:
return {
"content": [
{
"type": "text",
"text": "Error: No data provided for statistical analysis"
}
]
}
try:
if analysis_type == "descriptive":
mean = sum(data) / len(data)
variance = sum((x - mean) ** 2 for x in data) / len(data)
std = math.sqrt(variance)
sorted_data = sorted(data)
median = sorted_data[len(sorted_data) // 2] if len(sorted_data) % 2 == 1 else (sorted_data[len(sorted_data) // 2 - 1] + sorted_data[len(sorted_data) // 2]) / 2
return {
"content": [
{
"type": "text",
"text": f"Descriptive Statistics for {data}:\nMean: {mean:.6f}\nMedian: {median:.6f}\nStd Dev: {std:.6f}\nVariance: {variance:.6f}\nMin: {min(data):.6f}\nMax: {max(data):.6f}"
}
]
}
elif analysis_type == "distribution":
# Simple distribution analysis
mean = sum(data) / len(data)
std = math.sqrt(sum((x - mean) ** 2 for x in data) / len(data))
# Count values within standard deviations
within_1std = sum(1 for x in data if abs(x - mean) <= std)
within_2std = sum(1 for x in data if abs(x - mean) <= 2 * std)
return {
"content": [
{
"type": "text",
"text": f"Distribution Analysis for {data}:\nMean: {mean:.6f}\nStd Dev: {std:.6f}\nWithin 1σ: {within_1std}/{len(data)} ({100*within_1std/len(data):.1f}%)\nWithin 2σ: {within_2std}/{len(data)} ({100*within_2std/len(data):.1f}%)"
}
]
}
else:
return {
"content": [
{
"type": "text",
"text": f"Error: Unknown analysis type '{analysis_type}'. Supported: descriptive, distribution"
}
]
}
except Exception as e:
return {
"content": [
{
"type": "text",
"text": f"Error in statistical analysis: {str(e)}"
}
]
}
else:
return {
"content": [
{
"type": "text",
"text": f"Error: Unknown tool '{name}'. Available tools: get_weather, calculate, analyze_data, send_message, compute_zeta, find_zeta_zeros, complex_math, statistical_analysis"
}
]
}
def main():
"""Main function to handle MCP-like communication."""
print("MCP Server started", file=sys.stderr)
while True:
try:
line = sys.stdin.readline()
if not line:
break
data = json.loads(line.strip())
if data.get("method") == "tools/list":
response = {
"jsonrpc": "2.0",
"id": data.get("id"),
"result": {
"tools": mock_list_tools()
}
}
elif data.get("method") == "tools/call":
params = data.get("params", {})
name = params.get("name")
arguments = params.get("arguments", {})
result = mock_call_tool(name, arguments)
response = {
"jsonrpc": "2.0",
"id": data.get("id"),
"result": result
}
else:
response = {
"jsonrpc": "2.0",
"id": data.get("id"),
"error": {
"code": -32601,
"message": "Method not found"
}
}
print(json.dumps(response))
sys.stdout.flush()
except EOFError:
break
except Exception as e:
error_response = {
"jsonrpc": "2.0",
"id": data.get("id") if 'data' in locals() else None,
"error": {
"code": -32603,
"message": f"Internal error: {str(e)}"
}
}
print(json.dumps(error_response))
sys.stdout.flush()
if __name__ == "__main__":
main()