test: rewrite MCP integration tests from user perspective

pull/819/head
Pavan Kumar 2 days ago committed by ascender1729
parent 757ad598db
commit 9e62c12662

@ -1,75 +1,136 @@
import pytest
from swarms.tools.mcp_integration import MCPServerSseParams
from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import FINANCIAL_AGENT_SYS_PROMPT
def test_interactive_multi_agent_mcp():
# Configure two MCP servers
server_one = MCPServerSseParams(
url="http://0.0.0.0:6274",
headers={"Content-Type": "application/json"}
)
server_two = MCPServerSseParams(
url="http://0.0.0.0:6275",
headers={"Content-Type": "application/json"}
)
# Create two agents with different roles
finance_agent = Agent(
agent_name="Finance-Agent",
agent_description="Financial analysis expert",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
max_loops=1,
mcp_servers=[server_one],
interactive=True,
streaming_on=True
)
research_agent = Agent(
agent_name="Research-Agent",
agent_description="Market research specialist",
system_prompt="You are a market research specialist. Analyze market trends and provide insights.",
max_loops=1,
mcp_servers=[server_two],
interactive=True,
streaming_on=True
)
try:
# Interactive loop
while True:
# Get user input for which agent to use
print("\nWhich agent would you like to interact with?")
print("1. Finance Agent")
print("2. Research Agent")
print("3. Exit")
choice = input("Enter your choice (1-3): ")
if choice == "3":
break
# Get the task from user
task = input("\nEnter your task for the agent: ")
# Route to appropriate agent
if choice == "1":
response = finance_agent.run(task)
print(f"\nFinance Agent Response:\n{response}")
elif choice == "2":
response = research_agent.run(task)
print(f"\nResearch Agent Response:\n{response}")
else:
print("Invalid choice, please try again")
except Exception as e:
pytest.fail(f"Interactive multi-agent test failed: {e}")
def test_mcp_invalid_params():
import asyncio
from swarms.tools.mcp_integration import (
MCPServer,
MCPServerStdio,
MCPServerSse,
mcp_flow,
mcp_flow_get_tool_schema,
batch_mcp_flow
)
# Test basic server connectivity
def test_server_connection():
"""
Test that a user can connect to the MCP server successfully
"""
params = {"url": "http://localhost:8000"}
server = MCPServerSse(params, cache_tools_list=True)
# Connect should work
asyncio.run(server.connect())
assert server.session is not None
# Cleanup should work
asyncio.run(server.cleanup())
assert server.session is None
# Test tool listing functionality
def test_list_tools():
"""
Test that a user can retrieve available tools from the server
"""
params = {"url": "http://localhost:8000"}
server = MCPServerSse(params)
asyncio.run(server.connect())
tools = asyncio.run(server.list_tools())
assert isinstance(tools, list)
assert len(tools) > 0
asyncio.run(server.cleanup())
# Test tool execution
def test_tool_execution():
"""
Test that a user can execute a tool successfully
"""
params = {"url": "http://localhost:8000"}
function_call = {
"tool_name": "add",
"arguments": {"a": 5, "b": 3}
}
result = mcp_flow(params, function_call)
assert result is not None
# Test batch operations
def test_batch_execution():
"""
Test that a user can execute multiple tools in batch
"""
params_list = [
{"url": "http://localhost:8000"},
{"url": "http://localhost:8000"}
]
function_calls = [
{"tool_name": "add", "arguments": {"a": 1, "b": 2}},
{"tool_name": "subtract", "arguments": {"a": 5, "b": 3}}
]
results = batch_mcp_flow(params_list, function_calls)
assert len(results) == 2
assert all(result is not None for result in results)
# Test error handling
def test_error_handling():
"""
Test that users receive proper error messages for invalid operations
"""
params = {"url": "http://localhost:8000"}
invalid_function = {
"tool_name": "nonexistent_tool",
"arguments": {}
}
with pytest.raises(Exception):
mcp_flow(None, {})
mcp_flow(params, invalid_function)
# Test tool schema retrieval
def test_get_tool_schema():
"""
Test that users can retrieve tool schemas correctly
"""
params = {"url": "http://localhost:8000"}
schema = mcp_flow_get_tool_schema(params)
assert isinstance(schema, dict)
assert "tools" in schema or "functions" in schema
# Test server reconnection
def test_server_reconnection():
"""
Test that users can reconnect to the server after disconnection
"""
params = {"url": "http://localhost:8000"}
server = MCPServerSse(params)
# First connection
asyncio.run(server.connect())
asyncio.run(server.cleanup())
# Second connection should work
asyncio.run(server.connect())
assert server.session is not None
asyncio.run(server.cleanup())
# Test cache functionality
def test_cache_behavior():
"""
Test that tool caching works as expected for users
"""
params = {"url": "http://localhost:8000"}
server = MCPServerSse(params, cache_tools_list=True)
asyncio.run(server.connect())
# First call should cache
tools1 = asyncio.run(server.list_tools())
# Second call should use cache
tools2 = asyncio.run(server.list_tools())
assert tools1 == tools2
if __name__ == "__main__":
test_interactive_multi_agent_mcp()
asyncio.run(server.cleanup())

Loading…
Cancel
Save