fix: update MCP server and client integration based on FastMCP and Swarms guidelines

pull/819/head
Pavan Kumar 2 days ago committed by ascender1729
parent d71030859d
commit 2e2ebc40c8

@ -1,77 +1,25 @@
from fastmcp import FastMCP from fastmcp import FastMCP
from typing import Dict, Any, Optional from typing import Dict, Any, Optional
# Initialize MCP server # Initialize MCP server
mcp = FastMCP("Math-Server") mcp = FastMCP("Math-Server")
# Add tool documentation and type hints
@mcp.tool() @mcp.tool()
def add(a: int, b: int) -> Dict[str, Any]: def add(a: int, b: int) -> int:
"""Add two numbers together """Add two numbers together"""
Args:
a (int): First number to add
b (int): Second number to add
Returns:
Dict[str, Any]: Result dictionary containing sum and metadata
"""
try: try:
result = a + b return a + b
return {
"status": "success",
"result": result,
"message": f"Successfully added {a} and {b}"
}
except Exception as e: except Exception as e:
return { return {"error": str(e)}
"status": "error",
"error": str(e),
"message": "Failed to perform addition"
}
@mcp.tool() @mcp.tool()
def multiply(a: int, b: int) -> Dict[str, Any]: def multiply(a: int, b: int) -> int:
"""Multiply two numbers together """Multiply two numbers together"""
Args:
a (int): First number to multiply
b (int): Second number to multiply
Returns:
Dict[str, Any]: Result dictionary containing product and metadata
"""
try: try:
result = a * b return a * b
return {
"status": "success",
"result": result,
"message": f"Successfully multiplied {a} and {b}"
}
except Exception as e: except Exception as e:
return { return {"error": str(e)}
"status": "error",
"error": str(e),
"message": "Failed to perform multiplication"
}
@mcp.tool()
def get_available_operations() -> Dict[str, Any]:
"""Get list of available mathematical operations
Returns:
Dict[str, Any]: Dictionary containing available operations and their descriptions
"""
return {
"status": "success",
"operations": {
"add": "Add two numbers together",
"multiply": "Multiply two numbers together"
}
}
if __name__ == "__main__": if __name__ == "__main__":
print("Starting Math Server...") print("Starting Math Server...")
print("Available operations:", get_available_operations()) mcp.run(transport="sse")
mcp.run(host="0.0.0.0", port=6274, transport="sse")

@ -1,66 +1,43 @@
from swarms import Agent from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import FINANCIAL_AGENT_SYS_PROMPT from swarms.prompts.finance_agent_sys_prompt import FINANCIAL_AGENT_SYS_PROMPT
from swarms.tools.mcp_integration import MCPServerSseParams from swarms.tools.mcp_integration import MCPServerSseParams
import logging import logging
from typing import Dict, Any, Optional
def handle_mcp_response(response: Dict[str, Any]) -> str:
"""Handle MCP response and extract meaningful output"""
if response.get("status") == "error":
return f"Error: {response.get('message', 'Unknown error occurred')}"
return str(response.get("result", response))
def setup_mcp_agent(name: str, description: str) -> Agent: # Configure MCP server connection
"""Setup an MCP-enabled agent with proper configuration""" server = MCPServerSseParams(
try:
server = MCPServerSseParams(
url="http://0.0.0.0:6274", url="http://0.0.0.0:6274",
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
timeout=10.0, timeout=10.0,
sse_read_timeout=300.0 sse_read_timeout=300.0
) )
return Agent( # Initialize agent with MCP capabilities
agent_name=name, agent = Agent(
agent_description=description, agent_name="Math-Agent",
agent_description="Agent that performs math operations",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT, system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
max_loops=1, max_loops=1,
mcp_servers=[server], mcp_servers=[server],
streaming_on=True streaming_on=True
) )
except Exception as e:
logging.error(f"Failed to setup agent: {e}")
raise
def test_mcp_operations(): def test_mcp_operations():
"""Test basic MCP operations with error handling""" """Test basic MCP operations with error handling"""
try: try:
# Initialize agent
agent = setup_mcp_agent(
"Math-Agent",
"Agent that performs math operations"
)
# Get available operations
print("\nQuerying available operations...")
result = agent.run("What operations are available?")
print("Available operations:", result)
# Test addition # Test addition
print("\nTesting addition...") print("\nTesting addition...")
add_result = agent.run("Use the add tool to add 5 and 3") add_result = agent.run("Use the add tool to add 5 and 3")
print("Addition result:", handle_mcp_response(add_result)) print("Addition result:", add_result)
# Test multiplication # Test multiplication
print("\nTesting multiplication...") print("\nTesting multiplication...")
mult_result = agent.run("Use the multiply tool to multiply 4 and 6") mult_result = agent.run("Use the multiply tool to multiply 4 and 6")
print("Multiplication result:", handle_mcp_response(mult_result)) print("Multiplication result:", mult_result)
# Test error case # Test error case
print("\nTesting error handling...") print("\nTesting error handling...")
error_result = agent.run("Use the add tool with invalid inputs") error_result = agent.run("Use the add tool with invalid inputs")
print("Error handling result:", handle_mcp_response(error_result)) print("Error handling result:", error_result)
except Exception as e: except Exception as e:
logging.error(f"Test failed: {e}") logging.error(f"Test failed: {e}")

Loading…
Cancel
Save