test: enhance MCP integration test with tool discovery and user guidance

pull/819/head
Pavan Kumar 2 days ago committed by ascender1729
parent 2e2ebc40c8
commit 10e056a05e

@ -41,3 +41,20 @@ mode = "sequential"
[[workflows.workflow.tasks]] [[workflows.workflow.tasks]]
task = "shell.exec" task = "shell.exec"
args = "python examples/mcp_example/test_integration.py" args = "python examples/mcp_example/test_integration.py"
[[workflows.workflow]]
name = "Run MCP Test"
author = 13983571
mode = "sequential"
[[workflows.workflow.tasks]]
task = "shell.exec"
args = "python examples/mcp_example/math_server.py & "
[[workflows.workflow.tasks]]
task = "shell.exec"
args = "sleep 2"
[[workflows.workflow.tasks]]
task = "shell.exec"
args = "python examples/mcp_example/test_integration.py"

@ -1,8 +1,10 @@
from swarms import Agent from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import FINANCIAL_AGENT_SYS_PROMPT from swarms.prompts.finance_agent_sys_prompt import FINANCIAL_AGENT_SYS_PROMPT
from swarms.tools.mcp_integration import MCPServerSseParams from swarms.tools.mcp_integration import MCPServerSseParams
import logging import logging
def main():
# Configure MCP server connection # Configure MCP server connection
server = MCPServerSseParams( server = MCPServerSseParams(
url="http://0.0.0.0:6274", url="http://0.0.0.0:6274",
@ -21,23 +23,25 @@ agent = Agent(
streaming_on=True streaming_on=True
) )
def test_mcp_operations():
"""Test basic MCP operations with error handling"""
try: try:
# Test addition # First get available tools from server
print("\nTesting addition...") print("\nDiscovering available tools from MCP server...")
add_result = agent.run("Use the add tool to add 5 and 3") tools = agent.mcp_tool_handling()
print("Addition result:", add_result) print("\nAvailable tools:", tools)
while True:
# Get user input
user_input = input("\nEnter a math operation (or 'exit' to quit): ")
# Test multiplication if user_input.lower() == 'exit':
print("\nTesting multiplication...") break
mult_result = agent.run("Use the multiply tool to multiply 4 and 6")
print("Multiplication result:", mult_result)
# Test error case # Process user input through agent
print("\nTesting error handling...") try:
error_result = agent.run("Use the add tool with invalid inputs") result = agent.run(user_input)
print("Error handling result:", error_result) print("\nResult:", result)
except Exception as e:
print(f"Error processing request: {e}")
except Exception as e: except Exception as e:
logging.error(f"Test failed: {e}") logging.error(f"Test failed: {e}")
@ -45,4 +49,4 @@ def test_mcp_operations():
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
test_mcp_operations() main()

Loading…
Cancel
Save