parent
77eee8eb29
commit
3a836ec6f5
@ -0,0 +1,211 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Test Core MCP Streaming Functionality
|
||||||
|
This script tests the basic MCP streaming integration to ensure everything works.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
# Add project root to path
|
||||||
|
sys.path.insert(0, str(Path(__file__).parent))
|
||||||
|
|
||||||
|
def test_imports():
|
||||||
|
"""Test that all required imports work."""
|
||||||
|
print("Testing imports...")
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Test basic swarms imports
|
||||||
|
from swarms.structs import Agent
|
||||||
|
print("Agent import successful")
|
||||||
|
|
||||||
|
# Test MCP streaming imports
|
||||||
|
from swarms.tools.mcp_unified_client import (
|
||||||
|
MCPUnifiedClient,
|
||||||
|
UnifiedTransportConfig,
|
||||||
|
call_tool_streaming_sync,
|
||||||
|
MCP_STREAMING_AVAILABLE
|
||||||
|
)
|
||||||
|
print("MCP unified client imports successful")
|
||||||
|
print(f" MCP Streaming Available: {MCP_STREAMING_AVAILABLE}")
|
||||||
|
|
||||||
|
# Test MCP schemas
|
||||||
|
from swarms.schemas.mcp_schemas import MCPConnection
|
||||||
|
print("MCP schemas import successful")
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
except ImportError as e:
|
||||||
|
print(f"Import error: {e}")
|
||||||
|
return False
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Unexpected error: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def test_agent_creation():
|
||||||
|
"""Test that Agent can be created with MCP streaming parameters."""
|
||||||
|
print("\nTesting Agent creation with MCP streaming...")
|
||||||
|
|
||||||
|
try:
|
||||||
|
from swarms.structs import Agent
|
||||||
|
|
||||||
|
# Test basic agent creation
|
||||||
|
agent = Agent(
|
||||||
|
model_name="gpt-4o-mini",
|
||||||
|
mcp_streaming_enabled=True,
|
||||||
|
mcp_streaming_timeout=30,
|
||||||
|
verbose=True
|
||||||
|
)
|
||||||
|
print("Basic agent creation successful")
|
||||||
|
|
||||||
|
# Test agent with MCP URL
|
||||||
|
agent_with_mcp = Agent(
|
||||||
|
model_name="gpt-4o-mini",
|
||||||
|
mcp_url="http://localhost:8000/mcp",
|
||||||
|
mcp_streaming_enabled=True,
|
||||||
|
verbose=True
|
||||||
|
)
|
||||||
|
print("Agent with MCP URL creation successful")
|
||||||
|
|
||||||
|
# Test streaming status
|
||||||
|
status = agent_with_mcp.get_mcp_streaming_status()
|
||||||
|
print(f" Streaming status: {status}")
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Agent creation error: {e}")
|
||||||
|
import traceback
|
||||||
|
print(f"Full traceback: {traceback.format_exc()}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def test_mcp_client():
|
||||||
|
"""Test MCP unified client functionality."""
|
||||||
|
print("\nTesting MCP unified client...")
|
||||||
|
|
||||||
|
try:
|
||||||
|
from swarms.tools.mcp_unified_client import (
|
||||||
|
MCPUnifiedClient,
|
||||||
|
UnifiedTransportConfig,
|
||||||
|
create_auto_config
|
||||||
|
)
|
||||||
|
|
||||||
|
# Test config creation
|
||||||
|
config = create_auto_config("http://localhost:8000/mcp")
|
||||||
|
print("Auto config creation successful")
|
||||||
|
|
||||||
|
# Test client creation
|
||||||
|
client = MCPUnifiedClient(config)
|
||||||
|
print("MCP client creation successful")
|
||||||
|
|
||||||
|
# Test config validation
|
||||||
|
print(f" Transport type: {client._get_effective_transport()}")
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"MCP client error: {e}")
|
||||||
|
import traceback
|
||||||
|
print(f"Full traceback: {traceback.format_exc()}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def test_streaming_functions():
|
||||||
|
"""Test streaming function availability."""
|
||||||
|
print("\nTesting streaming functions...")
|
||||||
|
|
||||||
|
try:
|
||||||
|
from swarms.tools.mcp_unified_client import (
|
||||||
|
call_tool_streaming_sync,
|
||||||
|
execute_tool_call_streaming_unified
|
||||||
|
)
|
||||||
|
print("Streaming functions import successful")
|
||||||
|
|
||||||
|
# Test function signatures
|
||||||
|
import inspect
|
||||||
|
sig = inspect.signature(call_tool_streaming_sync)
|
||||||
|
print(f" call_tool_streaming_sync signature: {sig}")
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Streaming functions error: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def test_schemas():
|
||||||
|
"""Test MCP schemas functionality."""
|
||||||
|
print("\nTesting MCP schemas...")
|
||||||
|
|
||||||
|
try:
|
||||||
|
from swarms.schemas.mcp_schemas import (
|
||||||
|
MCPConnection,
|
||||||
|
MCPStreamingConfig,
|
||||||
|
UnifiedTransportConfig
|
||||||
|
)
|
||||||
|
print("MCP schemas import successful")
|
||||||
|
|
||||||
|
# Test schema creation
|
||||||
|
connection = MCPConnection(
|
||||||
|
url="http://localhost:8000/mcp",
|
||||||
|
transport="streamable_http",
|
||||||
|
enable_streaming=True
|
||||||
|
)
|
||||||
|
print("MCP connection schema creation successful")
|
||||||
|
|
||||||
|
streaming_config = MCPStreamingConfig(
|
||||||
|
enable_streaming=True,
|
||||||
|
streaming_timeout=30
|
||||||
|
)
|
||||||
|
print("MCP streaming config creation successful")
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"MCP schemas error: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""Run all tests."""
|
||||||
|
print("Testing Core MCP Streaming Functionality")
|
||||||
|
print("=" * 60)
|
||||||
|
|
||||||
|
tests = [
|
||||||
|
test_imports,
|
||||||
|
test_agent_creation,
|
||||||
|
test_mcp_client,
|
||||||
|
test_streaming_functions,
|
||||||
|
test_schemas
|
||||||
|
]
|
||||||
|
|
||||||
|
passed = 0
|
||||||
|
total = len(tests)
|
||||||
|
|
||||||
|
for test in tests:
|
||||||
|
if test():
|
||||||
|
passed += 1
|
||||||
|
print()
|
||||||
|
|
||||||
|
print("=" * 60)
|
||||||
|
print(f"Test Results: {passed}/{total} tests passed")
|
||||||
|
|
||||||
|
if passed == total:
|
||||||
|
print("All tests passed! Core functionality is working correctly.")
|
||||||
|
print("\nWhat's working:")
|
||||||
|
print(" - MCP streaming imports")
|
||||||
|
print(" - Agent creation with MCP parameters")
|
||||||
|
print(" - MCP unified client")
|
||||||
|
print(" - Streaming functions")
|
||||||
|
print(" - MCP schemas")
|
||||||
|
print("\nYou can now use MCP streaming functionality!")
|
||||||
|
else:
|
||||||
|
print("Some tests failed. Please check the errors above.")
|
||||||
|
print("\nCommon fixes:")
|
||||||
|
print(" - Install required dependencies: pip install mcp mcp[streamable-http] httpx")
|
||||||
|
print(" - Check that all files are in the correct locations")
|
||||||
|
print(" - Verify that imports are working correctly")
|
||||||
|
|
||||||
|
return passed == total
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
success = main()
|
||||||
|
sys.exit(0 if success else 1)
|
Loading…
Reference in new issue