From 9e62c126620b6088b012e16393290807012c9dcf Mon Sep 17 00:00:00 2001 From: Pavan Kumar <66913595+ascender1729@users.noreply.github.com> Date: Thu, 17 Apr 2025 17:50:35 +0000 Subject: [PATCH] test: rewrite MCP integration tests from user perspective --- tests/tools/test_mcp_integration.py | 191 ++++++++++++++++++---------- 1 file changed, 126 insertions(+), 65 deletions(-) diff --git a/tests/tools/test_mcp_integration.py b/tests/tools/test_mcp_integration.py index 012eb042..41db7ebe 100644 --- a/tests/tools/test_mcp_integration.py +++ b/tests/tools/test_mcp_integration.py @@ -1,75 +1,136 @@ import pytest -from swarms.tools.mcp_integration import MCPServerSseParams -from swarms import Agent -from swarms.prompts.finance_agent_sys_prompt import FINANCIAL_AGENT_SYS_PROMPT +import asyncio +from swarms.tools.mcp_integration import ( + MCPServer, + MCPServerStdio, + MCPServerSse, + mcp_flow, + mcp_flow_get_tool_schema, + batch_mcp_flow +) -def test_interactive_multi_agent_mcp(): - # Configure two MCP servers - server_one = MCPServerSseParams( - url="http://0.0.0.0:6274", - headers={"Content-Type": "application/json"} - ) +# Test basic server connectivity +def test_server_connection(): + """ + Test that a user can connect to the MCP server successfully + """ + params = {"url": "http://localhost:8000"} + server = MCPServerSse(params, cache_tools_list=True) - server_two = MCPServerSseParams( - url="http://0.0.0.0:6275", - headers={"Content-Type": "application/json"} - ) - - # Create two agents with different roles - finance_agent = Agent( - agent_name="Finance-Agent", - agent_description="Financial analysis expert", - system_prompt=FINANCIAL_AGENT_SYS_PROMPT, - max_loops=1, - mcp_servers=[server_one], - interactive=True, - streaming_on=True - ) + # Connect should work + asyncio.run(server.connect()) + assert server.session is not None + + # Cleanup should work + asyncio.run(server.cleanup()) + assert server.session is None - research_agent = Agent( - agent_name="Research-Agent", - agent_description="Market research specialist", - system_prompt="You are a market research specialist. Analyze market trends and provide insights.", - max_loops=1, - mcp_servers=[server_two], - interactive=True, - streaming_on=True - ) +# Test tool listing functionality +def test_list_tools(): + """ + Test that a user can retrieve available tools from the server + """ + params = {"url": "http://localhost:8000"} + server = MCPServerSse(params) + + asyncio.run(server.connect()) + tools = asyncio.run(server.list_tools()) + + assert isinstance(tools, list) + assert len(tools) > 0 + + asyncio.run(server.cleanup()) - try: - # Interactive loop - while True: - # Get user input for which agent to use - print("\nWhich agent would you like to interact with?") - print("1. Finance Agent") - print("2. Research Agent") - print("3. Exit") - - choice = input("Enter your choice (1-3): ") - - if choice == "3": - break - - # Get the task from user - task = input("\nEnter your task for the agent: ") - - # Route to appropriate agent - if choice == "1": - response = finance_agent.run(task) - print(f"\nFinance Agent Response:\n{response}") - elif choice == "2": - response = research_agent.run(task) - print(f"\nResearch Agent Response:\n{response}") - else: - print("Invalid choice, please try again") +# Test tool execution +def test_tool_execution(): + """ + Test that a user can execute a tool successfully + """ + params = {"url": "http://localhost:8000"} + function_call = { + "tool_name": "add", + "arguments": {"a": 5, "b": 3} + } + + result = mcp_flow(params, function_call) + assert result is not None - except Exception as e: - pytest.fail(f"Interactive multi-agent test failed: {e}") +# Test batch operations +def test_batch_execution(): + """ + Test that a user can execute multiple tools in batch + """ + params_list = [ + {"url": "http://localhost:8000"}, + {"url": "http://localhost:8000"} + ] + function_calls = [ + {"tool_name": "add", "arguments": {"a": 1, "b": 2}}, + {"tool_name": "subtract", "arguments": {"a": 5, "b": 3}} + ] + + results = batch_mcp_flow(params_list, function_calls) + assert len(results) == 2 + assert all(result is not None for result in results) -def test_mcp_invalid_params(): +# Test error handling +def test_error_handling(): + """ + Test that users receive proper error messages for invalid operations + """ + params = {"url": "http://localhost:8000"} + invalid_function = { + "tool_name": "nonexistent_tool", + "arguments": {} + } + with pytest.raises(Exception): - mcp_flow(None, {}) + mcp_flow(params, invalid_function) -if __name__ == "__main__": - test_interactive_multi_agent_mcp() +# Test tool schema retrieval +def test_get_tool_schema(): + """ + Test that users can retrieve tool schemas correctly + """ + params = {"url": "http://localhost:8000"} + schema = mcp_flow_get_tool_schema(params) + + assert isinstance(schema, dict) + assert "tools" in schema or "functions" in schema + +# Test server reconnection +def test_server_reconnection(): + """ + Test that users can reconnect to the server after disconnection + """ + params = {"url": "http://localhost:8000"} + server = MCPServerSse(params) + + # First connection + asyncio.run(server.connect()) + asyncio.run(server.cleanup()) + + # Second connection should work + asyncio.run(server.connect()) + assert server.session is not None + asyncio.run(server.cleanup()) + +# Test cache functionality +def test_cache_behavior(): + """ + Test that tool caching works as expected for users + """ + params = {"url": "http://localhost:8000"} + server = MCPServerSse(params, cache_tools_list=True) + + asyncio.run(server.connect()) + + # First call should cache + tools1 = asyncio.run(server.list_tools()) + # Second call should use cache + tools2 = asyncio.run(server.list_tools()) + + assert tools1 == tools2 + + asyncio.run(server.cleanup())