diff --git a/tests/structs/test_support_mcp.py b/tests/structs/test_support_mcp.py new file mode 100644 index 00000000..4d46baff --- /dev/null +++ b/tests/structs/test_support_mcp.py @@ -0,0 +1,581 @@ +import asyncio +import traceback +from datetime import datetime + +from loguru import logger + +# Import the functions to test (assuming they're in a module called mcp_client) +# from mcp_client import * # Replace with actual import +from swarms.tools.mcp_client_call import ( + MCPConnectionError, + MCPValidationError, + _create_server_tool_mapping_async, + _fetch_tools_for_server, + _get_function_arguments, + aget_mcp_tools, + auto_detect_transport, + connect_to_mcp_server, + execute_multiple_tools_on_multiple_mcp_servers, + execute_multiple_tools_on_multiple_mcp_servers_sync, + execute_tool_call_simple, + get_mcp_tools_sync, + get_tools_for_multiple_mcp_servers, + transform_mcp_tool_to_openai_tool, + transform_openai_tool_call_request_to_mcp_tool_call_request, +) + +# Configure logging +logger.add("test_results.log", rotation="10 MB", level="DEBUG") + +# Test configuration +TEST_CONFIG = { + "server_url": "http://localhost:8080/mcp", + "transport": "streamable_http", + "timeout": 10 +} + +# Test results storage +test_results = [] + +def log_test_result(test_name: str, status: str, message: str = "", error: str = ""): + """Log test result and add to results list""" + result = { + "test_name": test_name, + "status": status, + "message": message, + "error": error, + "timestamp": datetime.now().isoformat() + } + test_results.append(result) + + if status == "PASS": + logger.success(f"✓ {test_name}: {message}") + elif status == "FAIL": + logger.error(f"✗ {test_name}: {error}") + else: + logger.info(f"~ {test_name}: {message}") + +def test_transform_mcp_tool_to_openai_tool(): + """Test MCP tool to OpenAI tool transformation""" + test_name = "test_transform_mcp_tool_to_openai_tool" + + try: + # Create mock MCP tool + class MockMCPTool: + def __init__(self, name, description, input_schema): + self.name = name + self.description = description + self.inputSchema = input_schema + + mock_tool = MockMCPTool( + name="test_function", + description="Test function description", + input_schema={"type": "object", "properties": {"param1": {"type": "string"}}} + ) + + result = transform_mcp_tool_to_openai_tool(mock_tool) + + # Validate result structure + assert result["type"] == "function" + assert result["function"]["name"] == "test_function" + assert result["function"]["description"] == "Test function description" + assert result["function"]["parameters"]["type"] == "object" + + log_test_result(test_name, "PASS", "Successfully transformed MCP tool to OpenAI format") + + except Exception as e: + log_test_result(test_name, "FAIL", error=f"Failed to transform tool: {str(e)}") + +def test_get_function_arguments(): + """Test function argument extraction""" + test_name = "test_get_function_arguments" + + try: + # Test with dict arguments + function_def = {"arguments": {"param1": "value1", "param2": "value2"}} + result = _get_function_arguments(function_def) + assert isinstance(result, dict) + assert result["param1"] == "value1" + + # Test with string arguments + function_def_str = {"arguments": '{"param1": "value1", "param2": "value2"}'} + result_str = _get_function_arguments(function_def_str) + assert isinstance(result_str, dict) + assert result_str["param1"] == "value1" + + # Test with empty arguments + function_def_empty = {} + result_empty = _get_function_arguments(function_def_empty) + assert result_empty == {} + + log_test_result(test_name, "PASS", "Successfully extracted function arguments in all formats") + + except Exception as e: + log_test_result(test_name, "FAIL", error=f"Failed to extract arguments: {str(e)}") + +def test_transform_openai_tool_call_request_to_mcp_tool_call_request(): + """Test OpenAI tool call to MCP tool call transformation""" + test_name = "test_transform_openai_tool_call_request_to_mcp_tool_call_request" + + try: + openai_tool = { + "function": { + "name": "test_function", + "arguments": {"param1": "value1", "param2": "value2"} + } + } + + result = transform_openai_tool_call_request_to_mcp_tool_call_request(openai_tool) + + assert result.name == "test_function" + assert result.arguments["param1"] == "value1" + assert result.arguments["param2"] == "value2" + + log_test_result(test_name, "PASS", "Successfully transformed OpenAI tool call to MCP format") + + except Exception as e: + log_test_result(test_name, "FAIL", error=f"Failed to transform tool call: {str(e)}") + +def test_auto_detect_transport(): + """Test transport auto-detection""" + test_name = "test_auto_detect_transport" + + try: + # Test HTTP URL + http_url = "http://localhost:8080/mcp" + transport = auto_detect_transport(http_url) + assert transport == "streamable_http" + + # Test HTTPS URL + https_url = "https://example.com/mcp" + transport = auto_detect_transport(https_url) + assert transport == "streamable_http" + + # Test WebSocket URL + ws_url = "ws://localhost:8080/mcp" + transport = auto_detect_transport(ws_url) + assert transport == "sse" + + # Test stdio + stdio_url = "stdio://local" + transport = auto_detect_transport(stdio_url) + assert transport == "stdio" + + # Test unknown scheme + unknown_url = "unknown://test" + transport = auto_detect_transport(unknown_url) + assert transport == "sse" # Default + + log_test_result(test_name, "PASS", "Successfully auto-detected all transport types") + + except Exception as e: + log_test_result(test_name, "FAIL", error=f"Failed to auto-detect transport: {str(e)}") + +def test_connect_to_mcp_server(): + """Test MCP server connection configuration""" + test_name = "test_connect_to_mcp_server" + + try: + from swarms.schemas.mcp_schemas import MCPConnection + + # Create connection object + connection = MCPConnection( + url="http://localhost:8080/mcp", + transport="streamable_http", + timeout=10, + headers={"Content-Type": "application/json"}, + authorization_token="test_token" + ) + + headers, timeout, transport, url = connect_to_mcp_server(connection) + + assert url == "http://localhost:8080/mcp" + assert transport == "streamable_http" + assert timeout == 10 + assert "Authorization" in headers + assert headers["Authorization"] == "Bearer test_token" + + log_test_result(test_name, "PASS", "Successfully configured MCP server connection") + + except Exception as e: + log_test_result(test_name, "FAIL", error=f"Failed to configure connection: {str(e)}") + +async def test_aget_mcp_tools(): + """Test async MCP tools fetching""" + test_name = "test_aget_mcp_tools" + + try: + # This will attempt to connect to the actual server + tools = await aget_mcp_tools( + server_path=TEST_CONFIG["server_url"], + format="openai", + transport=TEST_CONFIG["transport"] + ) + + assert isinstance(tools, list) + log_test_result(test_name, "PASS", f"Successfully fetched {len(tools)} tools from server") + + except MCPConnectionError as e: + log_test_result(test_name, "SKIP", f"Server not available: {str(e)}") + except Exception as e: + log_test_result(test_name, "FAIL", error=f"Failed to fetch tools: {str(e)}") + +def test_get_mcp_tools_sync(): + """Test synchronous MCP tools fetching""" + test_name = "test_get_mcp_tools_sync" + + try: + tools = get_mcp_tools_sync( + server_path=TEST_CONFIG["server_url"], + format="openai", + transport=TEST_CONFIG["transport"] + ) + + assert isinstance(tools, list) + log_test_result(test_name, "PASS", f"Successfully fetched {len(tools)} tools synchronously") + + except MCPConnectionError as e: + log_test_result(test_name, "SKIP", f"Server not available: {str(e)}") + except Exception as e: + log_test_result(test_name, "FAIL", error=f"Failed to fetch tools sync: {str(e)}") + +def test_fetch_tools_for_server(): + """Test fetching tools for a single server""" + test_name = "test_fetch_tools_for_server" + + try: + tools = _fetch_tools_for_server( + url=TEST_CONFIG["server_url"], + format="openai", + transport=TEST_CONFIG["transport"] + ) + + assert isinstance(tools, list) + log_test_result(test_name, "PASS", f"Successfully fetched tools for single server: {len(tools)} tools") + + except MCPConnectionError as e: + log_test_result(test_name, "SKIP", f"Server not available: {str(e)}") + except Exception as e: + log_test_result(test_name, "FAIL", error=f"Failed to fetch tools for server: {str(e)}") + +def test_get_tools_for_multiple_mcp_servers(): + """Test fetching tools from multiple servers""" + test_name = "test_get_tools_for_multiple_mcp_servers" + + try: + urls = [TEST_CONFIG["server_url"]] # Using single server for testing + + tools = get_tools_for_multiple_mcp_servers( + urls=urls, + format="openai", + transport=TEST_CONFIG["transport"], + max_workers=2 + ) + + assert isinstance(tools, list) + log_test_result(test_name, "PASS", f"Successfully fetched tools from multiple servers: {len(tools)} tools") + + except MCPConnectionError as e: + log_test_result(test_name, "SKIP", f"Server not available: {str(e)}") + except Exception as e: + log_test_result(test_name, "FAIL", error=f"Failed to fetch tools from multiple servers: {str(e)}") + +async def test_execute_tool_call_simple(): + """Test simple tool execution""" + test_name = "test_execute_tool_call_simple" + + try: + # First try to get available tools + try: + tools = await aget_mcp_tools( + server_path=TEST_CONFIG["server_url"], + format="openai", + transport=TEST_CONFIG["transport"] + ) + + if not tools: + log_test_result(test_name, "SKIP", "No tools available for testing") + return + + # Use the first available tool for testing + first_tool = tools[0] + tool_name = first_tool["function"]["name"] + + # Create a basic tool call request + tool_call_request = { + "function": { + "name": tool_name, + "arguments": {} # Basic empty arguments + } + } + + result = await execute_tool_call_simple( + response=tool_call_request, + server_path=TEST_CONFIG["server_url"], + transport=TEST_CONFIG["transport"], + output_type="str" + ) + + assert result is not None + log_test_result(test_name, "PASS", f"Successfully executed tool call for {tool_name}") + + except MCPConnectionError: + log_test_result(test_name, "SKIP", "Server not available for tool execution test") + + except Exception as e: + log_test_result(test_name, "FAIL", error=f"Failed to execute tool call: {str(e)}") + +async def test_create_server_tool_mapping(): + """Test server tool mapping creation""" + test_name = "test_create_server_tool_mapping" + + try: + urls = [TEST_CONFIG["server_url"]] + + mapping = await _create_server_tool_mapping_async( + urls=urls, + format="openai", + transport=TEST_CONFIG["transport"] + ) + + assert isinstance(mapping, dict) + log_test_result(test_name, "PASS", f"Successfully created server tool mapping with {len(mapping)} functions") + + except MCPConnectionError as e: + log_test_result(test_name, "SKIP", f"Server not available: {str(e)}") + except Exception as e: + log_test_result(test_name, "FAIL", error=f"Failed to create server tool mapping: {str(e)}") + +async def test_execute_multiple_tools_on_multiple_servers(): + """Test executing multiple tools across servers""" + test_name = "test_execute_multiple_tools_on_multiple_servers" + + try: + urls = [TEST_CONFIG["server_url"]] + + # First get available tools + try: + tools = await aget_mcp_tools( + server_path=TEST_CONFIG["server_url"], + format="openai", + transport=TEST_CONFIG["transport"] + ) + + if not tools: + log_test_result(test_name, "SKIP", "No tools available for testing") + return + + # Create test requests using available tools + responses = [] + for tool in tools[:2]: # Test with first 2 tools + tool_call = { + "function": { + "name": tool["function"]["name"], + "arguments": {} + } + } + responses.append(tool_call) + + if not responses: + log_test_result(test_name, "SKIP", "No suitable tools found for testing") + return + + results = await execute_multiple_tools_on_multiple_mcp_servers( + responses=responses, + urls=urls, + transport=TEST_CONFIG["transport"], + max_concurrent=2 + ) + + assert isinstance(results, list) + log_test_result(test_name, "PASS", f"Successfully executed {len(results)} tool calls") + + except MCPConnectionError: + log_test_result(test_name, "SKIP", "Server not available for multiple tool execution test") + + except Exception as e: + log_test_result(test_name, "FAIL", error=f"Failed to execute multiple tools: {str(e)}") + +def test_execute_multiple_tools_sync(): + """Test synchronous multiple tool execution""" + test_name = "test_execute_multiple_tools_sync" + + try: + urls = [TEST_CONFIG["server_url"]] + + # Create minimal test requests + responses = [ + { + "function": { + "name": "test_function", # This will likely fail but tests the sync wrapper + "arguments": {} + } + } + ] + + results = execute_multiple_tools_on_multiple_mcp_servers_sync( + responses=responses, + urls=urls, + transport=TEST_CONFIG["transport"], + max_concurrent=1 + ) + + assert isinstance(results, list) + log_test_result(test_name, "PASS", f"Successfully ran sync multiple tools execution (got {len(results)} results)") + + except MCPConnectionError as e: + log_test_result(test_name, "SKIP", f"Server not available: {str(e)}") + except Exception as e: + log_test_result(test_name, "FAIL", error=f"Failed sync multiple tools execution: {str(e)}") + +def test_error_handling(): + """Test error handling for various scenarios""" + test_name = "test_error_handling" + + try: + # Test invalid server URL + try: + get_mcp_tools_sync( + server_path="http://invalid-url:99999/mcp", + transport="streamable_http" + ) + assert False, "Should have raised an exception" + except MCPConnectionError: + pass # Expected + + # Test invalid connection object + try: + connect_to_mcp_server("invalid_connection") + assert False, "Should have raised an exception" + except MCPValidationError: + pass # Expected + + # Test invalid transport detection + transport = auto_detect_transport("") + assert transport == "sse" # Should default to sse + + log_test_result(test_name, "PASS", "All error handling tests passed") + + except Exception as e: + log_test_result(test_name, "FAIL", error=f"Error handling test failed: {str(e)}") + +async def run_all_tests(): + """Run all test functions""" + logger.info("Starting MCP unit tests...") + + # Synchronous tests + test_transform_mcp_tool_to_openai_tool() + test_get_function_arguments() + test_transform_openai_tool_call_request_to_mcp_tool_call_request() + test_auto_detect_transport() + test_connect_to_mcp_server() + test_get_mcp_tools_sync() + test_fetch_tools_for_server() + test_get_tools_for_multiple_mcp_servers() + test_execute_multiple_tools_sync() + test_error_handling() + + # Asynchronous tests + await test_aget_mcp_tools() + await test_execute_tool_call_simple() + await test_create_server_tool_mapping() + await test_execute_multiple_tools_on_multiple_servers() + + logger.info(f"Completed all tests. Total tests run: {len(test_results)}") + +def generate_markdown_report(): + """Generate markdown report of test results""" + + passed_tests = [r for r in test_results if r["status"] == "PASS"] + failed_tests = [r for r in test_results if r["status"] == "FAIL"] + skipped_tests = [r for r in test_results if r["status"] == "SKIP"] + + markdown_content = f"""# MCP Unit Test Results + +## Summary +- **Total Tests**: {len(test_results)} +- **Passed**: {len(passed_tests)} +- **Failed**: {len(failed_tests)} +- **Skipped**: {len(skipped_tests)} +- **Success Rate**: {(len(passed_tests)/len(test_results)*100):.1f}% + +## Test Configuration +- **Server URL**: {TEST_CONFIG["server_url"]} +- **Transport**: {TEST_CONFIG["transport"]} +- **Timeout**: {TEST_CONFIG["timeout"]}s + +## Test Results + +### ✅ Passed Tests ({len(passed_tests)}) +""" + + for test in passed_tests: + markdown_content += f"- **{test['test_name']}**: {test['message']}\n" + + if failed_tests: + markdown_content += f"\n### ❌ Failed Tests ({len(failed_tests)})\n" + for test in failed_tests: + markdown_content += f"- **{test['test_name']}**: {test['error']}\n" + + if skipped_tests: + markdown_content += f"\n### ⏭️ Skipped Tests ({len(skipped_tests)})\n" + for test in skipped_tests: + markdown_content += f"- **{test['test_name']}**: {test['message']}\n" + + markdown_content += """ +## Detailed Results + +| Test Name | Status | Message/Error | Timestamp | +|-----------|---------|---------------|-----------| +""" + + for test in test_results: + status_emoji = {"PASS": "✅", "FAIL": "❌", "SKIP": "⏭️"}.get(test["status"], "❓") + message = test.get("message") or test.get("error", "") + markdown_content += f"| {test['test_name']} | {status_emoji} {test['status']} | {message} | {test['timestamp']} |\n" + + markdown_content += f""" +## Notes +- Tests marked as SKIP typically indicate the MCP server was not available at {TEST_CONFIG["server_url"]} +- Connection tests may fail if the server is not running or configured differently +- Tool execution tests depend on the specific tools available on the server + +Generated at: {datetime.now().isoformat()} +""" + + return markdown_content + +async def main(): + """Main test runner""" + try: + await run_all_tests() + + # Generate and save markdown report + markdown_report = generate_markdown_report() + + with open("mcp_test_results.md", "w") as f: + f.write(markdown_report) + + logger.info("Test results saved to mcp_test_results.md") + + # Print summary + passed = len([r for r in test_results if r["status"] == "PASS"]) + failed = len([r for r in test_results if r["status"] == "FAIL"]) + skipped = len([r for r in test_results if r["status"] == "SKIP"]) + + print(f"\n{'='*50}") + print("TEST SUMMARY") + print(f"{'='*50}") + print(f"Total Tests: {len(test_results)}") + print(f"Passed: {passed}") + print(f"Failed: {failed}") + print(f"Skipped: {skipped}") + print(f"Success Rate: {(passed/len(test_results)*100):.1f}%") + print(f"{'='*50}") + + except Exception as e: + logger.error(f"Error running tests: {str(e)}") + logger.error(f"Traceback: {traceback.format_exc()}") + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file