import asyncio import traceback from datetime import datetime from loguru import logger # Import the functions to test (assuming they're in a module called mcp_client) # from mcp_client import * # Replace with actual import from swarms.tools.mcp_client_call import ( MCPConnectionError, MCPValidationError, _create_server_tool_mapping_async, _fetch_tools_for_server, _get_function_arguments, aget_mcp_tools, auto_detect_transport, connect_to_mcp_server, execute_multiple_tools_on_multiple_mcp_servers, execute_multiple_tools_on_multiple_mcp_servers_sync, execute_tool_call_simple, get_mcp_tools_sync, get_tools_for_multiple_mcp_servers, transform_mcp_tool_to_openai_tool, transform_openai_tool_call_request_to_mcp_tool_call_request, ) # Configure logging logger.add("test_results.log", rotation="10 MB", level="DEBUG") # Test configuration TEST_CONFIG = { "server_url": "http://localhost:8080/mcp", "transport": "streamable_http", "timeout": 10 } # Test results storage test_results = [] def log_test_result(test_name: str, status: str, message: str = "", error: str = ""): """Log test result and add to results list""" result = { "test_name": test_name, "status": status, "message": message, "error": error, "timestamp": datetime.now().isoformat() } test_results.append(result) if status == "PASS": logger.success(f"✓ {test_name}: {message}") elif status == "FAIL": logger.error(f"✗ {test_name}: {error}") else: logger.info(f"~ {test_name}: {message}") def test_transform_mcp_tool_to_openai_tool(): """Test MCP tool to OpenAI tool transformation""" test_name = "test_transform_mcp_tool_to_openai_tool" try: # Create mock MCP tool class MockMCPTool: def __init__(self, name, description, input_schema): self.name = name self.description = description self.inputSchema = input_schema mock_tool = MockMCPTool( name="test_function", description="Test function description", input_schema={"type": "object", "properties": {"param1": {"type": "string"}}} ) result = transform_mcp_tool_to_openai_tool(mock_tool) # Validate result structure assert result["type"] == "function" assert result["function"]["name"] == "test_function" assert result["function"]["description"] == "Test function description" assert result["function"]["parameters"]["type"] == "object" log_test_result(test_name, "PASS", "Successfully transformed MCP tool to OpenAI format") except Exception as e: log_test_result(test_name, "FAIL", error=f"Failed to transform tool: {str(e)}") def test_get_function_arguments(): """Test function argument extraction""" test_name = "test_get_function_arguments" try: # Test with dict arguments function_def = {"arguments": {"param1": "value1", "param2": "value2"}} result = _get_function_arguments(function_def) assert isinstance(result, dict) assert result["param1"] == "value1" # Test with string arguments function_def_str = {"arguments": '{"param1": "value1", "param2": "value2"}'} result_str = _get_function_arguments(function_def_str) assert isinstance(result_str, dict) assert result_str["param1"] == "value1" # Test with empty arguments function_def_empty = {} result_empty = _get_function_arguments(function_def_empty) assert result_empty == {} log_test_result(test_name, "PASS", "Successfully extracted function arguments in all formats") except Exception as e: log_test_result(test_name, "FAIL", error=f"Failed to extract arguments: {str(e)}") def test_transform_openai_tool_call_request_to_mcp_tool_call_request(): """Test OpenAI tool call to MCP tool call transformation""" test_name = "test_transform_openai_tool_call_request_to_mcp_tool_call_request" try: openai_tool = { "function": { "name": "test_function", "arguments": {"param1": "value1", "param2": "value2"} } } result = transform_openai_tool_call_request_to_mcp_tool_call_request(openai_tool) assert result.name == "test_function" assert result.arguments["param1"] == "value1" assert result.arguments["param2"] == "value2" log_test_result(test_name, "PASS", "Successfully transformed OpenAI tool call to MCP format") except Exception as e: log_test_result(test_name, "FAIL", error=f"Failed to transform tool call: {str(e)}") def test_auto_detect_transport(): """Test transport auto-detection""" test_name = "test_auto_detect_transport" try: # Test HTTP URL http_url = "http://localhost:8080/mcp" transport = auto_detect_transport(http_url) assert transport == "streamable_http" # Test HTTPS URL https_url = "https://example.com/mcp" transport = auto_detect_transport(https_url) assert transport == "streamable_http" # Test WebSocket URL ws_url = "ws://localhost:8080/mcp" transport = auto_detect_transport(ws_url) assert transport == "sse" # Test stdio stdio_url = "stdio://local" transport = auto_detect_transport(stdio_url) assert transport == "stdio" # Test unknown scheme unknown_url = "unknown://test" transport = auto_detect_transport(unknown_url) assert transport == "sse" # Default log_test_result(test_name, "PASS", "Successfully auto-detected all transport types") except Exception as e: log_test_result(test_name, "FAIL", error=f"Failed to auto-detect transport: {str(e)}") def test_connect_to_mcp_server(): """Test MCP server connection configuration""" test_name = "test_connect_to_mcp_server" try: from swarms.schemas.mcp_schemas import MCPConnection # Create connection object connection = MCPConnection( url="http://localhost:8080/mcp", transport="streamable_http", timeout=10, headers={"Content-Type": "application/json"}, authorization_token="test_token" ) headers, timeout, transport, url = connect_to_mcp_server(connection) assert url == "http://localhost:8080/mcp" assert transport == "streamable_http" assert timeout == 10 assert "Authorization" in headers assert headers["Authorization"] == "Bearer test_token" log_test_result(test_name, "PASS", "Successfully configured MCP server connection") except Exception as e: log_test_result(test_name, "FAIL", error=f"Failed to configure connection: {str(e)}") async def test_aget_mcp_tools(): """Test async MCP tools fetching""" test_name = "test_aget_mcp_tools" try: # This will attempt to connect to the actual server tools = await aget_mcp_tools( server_path=TEST_CONFIG["server_url"], format="openai", transport=TEST_CONFIG["transport"] ) assert isinstance(tools, list) log_test_result(test_name, "PASS", f"Successfully fetched {len(tools)} tools from server") except MCPConnectionError as e: log_test_result(test_name, "SKIP", f"Server not available: {str(e)}") except Exception as e: log_test_result(test_name, "FAIL", error=f"Failed to fetch tools: {str(e)}") def test_get_mcp_tools_sync(): """Test synchronous MCP tools fetching""" test_name = "test_get_mcp_tools_sync" try: tools = get_mcp_tools_sync( server_path=TEST_CONFIG["server_url"], format="openai", transport=TEST_CONFIG["transport"] ) assert isinstance(tools, list) log_test_result(test_name, "PASS", f"Successfully fetched {len(tools)} tools synchronously") except MCPConnectionError as e: log_test_result(test_name, "SKIP", f"Server not available: {str(e)}") except Exception as e: log_test_result(test_name, "FAIL", error=f"Failed to fetch tools sync: {str(e)}") def test_fetch_tools_for_server(): """Test fetching tools for a single server""" test_name = "test_fetch_tools_for_server" try: tools = _fetch_tools_for_server( url=TEST_CONFIG["server_url"], format="openai", transport=TEST_CONFIG["transport"] ) assert isinstance(tools, list) log_test_result(test_name, "PASS", f"Successfully fetched tools for single server: {len(tools)} tools") except MCPConnectionError as e: log_test_result(test_name, "SKIP", f"Server not available: {str(e)}") except Exception as e: log_test_result(test_name, "FAIL", error=f"Failed to fetch tools for server: {str(e)}") def test_get_tools_for_multiple_mcp_servers(): """Test fetching tools from multiple servers""" test_name = "test_get_tools_for_multiple_mcp_servers" try: urls = [TEST_CONFIG["server_url"]] # Using single server for testing tools = get_tools_for_multiple_mcp_servers( urls=urls, format="openai", transport=TEST_CONFIG["transport"], max_workers=2 ) assert isinstance(tools, list) log_test_result(test_name, "PASS", f"Successfully fetched tools from multiple servers: {len(tools)} tools") except MCPConnectionError as e: log_test_result(test_name, "SKIP", f"Server not available: {str(e)}") except Exception as e: log_test_result(test_name, "FAIL", error=f"Failed to fetch tools from multiple servers: {str(e)}") async def test_execute_tool_call_simple(): """Test simple tool execution""" test_name = "test_execute_tool_call_simple" try: # First try to get available tools try: tools = await aget_mcp_tools( server_path=TEST_CONFIG["server_url"], format="openai", transport=TEST_CONFIG["transport"] ) if not tools: log_test_result(test_name, "SKIP", "No tools available for testing") return # Use the first available tool for testing first_tool = tools[0] tool_name = first_tool["function"]["name"] # Create a basic tool call request tool_call_request = { "function": { "name": tool_name, "arguments": {} # Basic empty arguments } } result = await execute_tool_call_simple( response=tool_call_request, server_path=TEST_CONFIG["server_url"], transport=TEST_CONFIG["transport"], output_type="str" ) assert result is not None log_test_result(test_name, "PASS", f"Successfully executed tool call for {tool_name}") except MCPConnectionError: log_test_result(test_name, "SKIP", "Server not available for tool execution test") except Exception as e: log_test_result(test_name, "FAIL", error=f"Failed to execute tool call: {str(e)}") async def test_create_server_tool_mapping(): """Test server tool mapping creation""" test_name = "test_create_server_tool_mapping" try: urls = [TEST_CONFIG["server_url"]] mapping = await _create_server_tool_mapping_async( urls=urls, format="openai", transport=TEST_CONFIG["transport"] ) assert isinstance(mapping, dict) log_test_result(test_name, "PASS", f"Successfully created server tool mapping with {len(mapping)} functions") except MCPConnectionError as e: log_test_result(test_name, "SKIP", f"Server not available: {str(e)}") except Exception as e: log_test_result(test_name, "FAIL", error=f"Failed to create server tool mapping: {str(e)}") async def test_execute_multiple_tools_on_multiple_servers(): """Test executing multiple tools across servers""" test_name = "test_execute_multiple_tools_on_multiple_servers" try: urls = [TEST_CONFIG["server_url"]] # First get available tools try: tools = await aget_mcp_tools( server_path=TEST_CONFIG["server_url"], format="openai", transport=TEST_CONFIG["transport"] ) if not tools: log_test_result(test_name, "SKIP", "No tools available for testing") return # Create test requests using available tools responses = [] for tool in tools[:2]: # Test with first 2 tools tool_call = { "function": { "name": tool["function"]["name"], "arguments": {} } } responses.append(tool_call) if not responses: log_test_result(test_name, "SKIP", "No suitable tools found for testing") return results = await execute_multiple_tools_on_multiple_mcp_servers( responses=responses, urls=urls, transport=TEST_CONFIG["transport"], max_concurrent=2 ) assert isinstance(results, list) log_test_result(test_name, "PASS", f"Successfully executed {len(results)} tool calls") except MCPConnectionError: log_test_result(test_name, "SKIP", "Server not available for multiple tool execution test") except Exception as e: log_test_result(test_name, "FAIL", error=f"Failed to execute multiple tools: {str(e)}") def test_execute_multiple_tools_sync(): """Test synchronous multiple tool execution""" test_name = "test_execute_multiple_tools_sync" try: urls = [TEST_CONFIG["server_url"]] # Create minimal test requests responses = [ { "function": { "name": "test_function", # This will likely fail but tests the sync wrapper "arguments": {} } } ] results = execute_multiple_tools_on_multiple_mcp_servers_sync( responses=responses, urls=urls, transport=TEST_CONFIG["transport"], max_concurrent=1 ) assert isinstance(results, list) log_test_result(test_name, "PASS", f"Successfully ran sync multiple tools execution (got {len(results)} results)") except MCPConnectionError as e: log_test_result(test_name, "SKIP", f"Server not available: {str(e)}") except Exception as e: log_test_result(test_name, "FAIL", error=f"Failed sync multiple tools execution: {str(e)}") def test_error_handling(): """Test error handling for various scenarios""" test_name = "test_error_handling" try: # Test invalid server URL try: get_mcp_tools_sync( server_path="http://invalid-url:99999/mcp", transport="streamable_http" ) assert False, "Should have raised an exception" except MCPConnectionError: pass # Expected # Test invalid connection object try: connect_to_mcp_server("invalid_connection") assert False, "Should have raised an exception" except MCPValidationError: pass # Expected # Test invalid transport detection transport = auto_detect_transport("") assert transport == "sse" # Should default to sse log_test_result(test_name, "PASS", "All error handling tests passed") except Exception as e: log_test_result(test_name, "FAIL", error=f"Error handling test failed: {str(e)}") async def run_all_tests(): """Run all test functions""" logger.info("Starting MCP unit tests...") # Synchronous tests test_transform_mcp_tool_to_openai_tool() test_get_function_arguments() test_transform_openai_tool_call_request_to_mcp_tool_call_request() test_auto_detect_transport() test_connect_to_mcp_server() test_get_mcp_tools_sync() test_fetch_tools_for_server() test_get_tools_for_multiple_mcp_servers() test_execute_multiple_tools_sync() test_error_handling() # Asynchronous tests await test_aget_mcp_tools() await test_execute_tool_call_simple() await test_create_server_tool_mapping() await test_execute_multiple_tools_on_multiple_servers() logger.info(f"Completed all tests. Total tests run: {len(test_results)}") def generate_markdown_report(): """Generate markdown report of test results""" passed_tests = [r for r in test_results if r["status"] == "PASS"] failed_tests = [r for r in test_results if r["status"] == "FAIL"] skipped_tests = [r for r in test_results if r["status"] == "SKIP"] markdown_content = f"""# MCP Unit Test Results ## Summary - **Total Tests**: {len(test_results)} - **Passed**: {len(passed_tests)} - **Failed**: {len(failed_tests)} - **Skipped**: {len(skipped_tests)} - **Success Rate**: {(len(passed_tests)/len(test_results)*100):.1f}% ## Test Configuration - **Server URL**: {TEST_CONFIG["server_url"]} - **Transport**: {TEST_CONFIG["transport"]} - **Timeout**: {TEST_CONFIG["timeout"]}s ## Test Results ### ✅ Passed Tests ({len(passed_tests)}) """ for test in passed_tests: markdown_content += f"- **{test['test_name']}**: {test['message']}\n" if failed_tests: markdown_content += f"\n### ❌ Failed Tests ({len(failed_tests)})\n" for test in failed_tests: markdown_content += f"- **{test['test_name']}**: {test['error']}\n" if skipped_tests: markdown_content += f"\n### ⏭️ Skipped Tests ({len(skipped_tests)})\n" for test in skipped_tests: markdown_content += f"- **{test['test_name']}**: {test['message']}\n" markdown_content += """ ## Detailed Results | Test Name | Status | Message/Error | Timestamp | |-----------|---------|---------------|-----------| """ for test in test_results: status_emoji = {"PASS": "✅", "FAIL": "❌", "SKIP": "⏭️"}.get(test["status"], "❓") message = test.get("message") or test.get("error", "") markdown_content += f"| {test['test_name']} | {status_emoji} {test['status']} | {message} | {test['timestamp']} |\n" markdown_content += f""" ## Notes - Tests marked as SKIP typically indicate the MCP server was not available at {TEST_CONFIG["server_url"]} - Connection tests may fail if the server is not running or configured differently - Tool execution tests depend on the specific tools available on the server Generated at: {datetime.now().isoformat()} """ return markdown_content async def main(): """Main test runner""" try: await run_all_tests() # Generate and save markdown report markdown_report = generate_markdown_report() with open("mcp_test_results.md", "w") as f: f.write(markdown_report) logger.info("Test results saved to mcp_test_results.md") # Print summary passed = len([r for r in test_results if r["status"] == "PASS"]) failed = len([r for r in test_results if r["status"] == "FAIL"]) skipped = len([r for r in test_results if r["status"] == "SKIP"]) print(f"\n{'='*50}") print("TEST SUMMARY") print(f"{'='*50}") print(f"Total Tests: {len(test_results)}") print(f"Passed: {passed}") print(f"Failed: {failed}") print(f"Skipped: {skipped}") print(f"Success Rate: {(passed/len(test_results)*100):.1f}%") print(f"{'='*50}") except Exception as e: logger.error(f"Error running tests: {str(e)}") logger.error(f"Traceback: {traceback.format_exc()}") if __name__ == "__main__": asyncio.run(main())