pull/891/merge
Pavan Kumar 1 month ago committed by GitHub
commit 409543a1f5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,34 @@
# Multi MCP Execution Example
This example demonstrates using a list of MCP servers with an `Agent`.
Start the example servers in separate terminals:
```bash
python examples/tools/mcp_examples/servers/weather_server.py
python examples/tools/mcp_examples/servers/news_server.py
```
```python
import os
import json
from swarms import Agent
# Configure multiple MCP URLs
os.environ["MCP_URLS"] = "http://localhost:8000/sse,http://localhost:9001/sse"
agent = Agent(
agent_name="Multi-MCP-Agent",
model_name="gpt-4o-mini",
max_loops=1,
)
# Example JSON payloads produced by your model
response = json.dumps([
{"function_name": "get_weather", "server_url": "http://localhost:8000/sse", "payload": {"city": "London"}},
{"function_name": "get_news", "server_url": "http://localhost:9001/sse", "payload": {"topic": "ai"}},
])
agent.handle_multiple_mcp_tools(agent.mcp_urls, response)
```

@ -361,6 +361,7 @@ nav:
- Medical Swarm: "swarms/examples/swarms_api_medical.md" - Medical Swarm: "swarms/examples/swarms_api_medical.md"
- Finance Swarm: "swarms/examples/swarms_api_finance.md" - Finance Swarm: "swarms/examples/swarms_api_finance.md"
- ML Model Code Generation Swarm: "swarms/examples/swarms_api_ml_model.md" - ML Model Code Generation Swarm: "swarms/examples/swarms_api_ml_model.md"
- Multi-MCP Execution Example: "examples/multi_mcp_execution.md"
# - Swarm Models: # - Swarm Models:
# - Overview: "swarms/models/index.md" # - Overview: "swarms/models/index.md"

@ -478,6 +478,51 @@ agent.model_dump_json()
print(agent.to_toml()) print(agent.to_toml())
``` ```
### Multi-MCP Tool Execution
Execute tools from multiple MCP servers by providing a list of URLs via the
`mcp_urls` parameter or the `MCP_URLS` environment variable.
Start the example servers:
```bash
python examples/tools/mcp_examples/servers/weather_server.py
python examples/tools/mcp_examples/servers/news_server.py
```
```python
import os
import json
from swarms import Agent
# Using an environment variable for server configuration
os.environ["MCP_URLS"] = "http://localhost:8000/sse,http://localhost:9001/sse"
agent = Agent(
agent_name="Multi-MCP-Agent",
model_name="gpt-4o-mini",
max_loops=1,
)
# Example MCP payloads returned by your model
mcp_response = json.dumps([
{
"function_name": "get_price",
"server_url": "http://localhost:8000/sse",
"payload": {"symbol": "BTC"},
},
{
"function_name": "market_sentiment",
"server_url": "http://localhost:9001/sse",
"payload": {"symbol": "BTC"},
},
])
agent.handle_multiple_mcp_tools(agent.mcp_urls, mcp_response)
```
## Auto Generate Prompt + CPU Execution ## Auto Generate Prompt + CPU Execution

@ -62,7 +62,7 @@ The **Model Context Protocol (MCP)** integration enables Swarms agents to dynami
| Feature | Status | Expected | | Feature | Status | Expected |
|---------|--------|----------| |---------|--------|----------|
| **MCPConnection Model** | 🚧 Development | Q1 2024 | | **MCPConnection Model** | 🚧 Development | Q1 2024 |
| **Multiple Server Support** | 🚧 Planned | Q2 2024 | | **Multiple Server Support** | ✅ Ready | - |
| **Parallel Function Calling** | 🚧 Research | Q2 2024 | | **Parallel Function Calling** | 🚧 Research | Q2 2024 |
| **Auto-discovery** | 🚧 Planned | Q3 2024 | | **Auto-discovery** | 🚧 Planned | Q3 2024 |
@ -125,6 +125,34 @@ The **Model Context Protocol (MCP)** integration enables Swarms agents to dynami
) )
``` ```
---
### MCP Payload Format
When the agent generates a function call for an MCP tool it should return a
payload in the following format:
```json
[
{
"function_name": "tool_name",
"server_url": "http://server:8000/sse",
"payload": {"arg": "value"}
}
]
```
Use `handle_multiple_mcp_tools` to execute each payload across the configured
servers.
Example servers can be started with:
```bash
python examples/tools/mcp_examples/servers/weather_server.py
python examples/tools/mcp_examples/servers/news_server.py
```
--- ---
## Integration Flow ## Integration Flow
@ -214,7 +242,10 @@ graph TD
agent_name="Crypto-Trading-Agent", agent_name="Crypto-Trading-Agent",
agent_description="Real-time cryptocurrency market analyzer", agent_description="Real-time cryptocurrency market analyzer",
max_loops=2, max_loops=2,
mcp_url="http://crypto-server:8000/sse", mcp_urls=[
"http://crypto-server:8000/sse",
"http://backup-server:8001/sse",
],
output_type="json", output_type="json",
temperature=0.1, temperature=0.1,
) )
@ -406,19 +437,17 @@ graph TD
mcp_url = "http://server:8000/sse" mcp_url = "http://server:8000/sse"
``` ```
### 🚧 Single Server Limitation ### ✅ Multiple Server Support
Currently supports one server per agent: Agents can now connect to multiple MCP servers. Provide a list of URLs via
the `mcp_urls` parameter or set the environment variable `MCP_URLS`.
```python ```python
# ❌ Multiple servers not supported mcp_urls = [
mcp_servers = [
"http://server1:8000/sse", "http://server1:8000/sse",
"http://server2:8000/sse" "http://server2:8000/sse",
] ]
agent = Agent(mcp_urls=mcp_urls)
# ✅ Single server only
mcp_url = "http://primary-server:8000/sse"
``` ```
### 🚧 Sequential Execution ### 🚧 Sequential Execution
@ -688,7 +717,7 @@ graph TD
- **Connection Pooling** - Improved performance - **Connection Pooling** - Improved performance
=== "2 Week" === "2 Week"
- **Multiple Server Support** - Connect to multiple MCPs - **Multiple Server Support** - *Completed*
- **Parallel Execution** - Concurrent tool calling - **Parallel Execution** - Concurrent tool calling
- **Load Balancing** - Distribute requests across servers - **Load Balancing** - Distribute requests across servers
- **Advanced Monitoring** - Real-time metrics - **Advanced Monitoring** - Real-time metrics

@ -0,0 +1,12 @@
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("NewsServer")
mcp.settings.port = 9001
@mcp.tool(name="get_news", description="Return simple news headline")
def get_news(topic: str) -> str:
return f"Latest {topic} news headline"
if __name__ == "__main__":
mcp.run(transport="sse")

@ -0,0 +1,12 @@
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("WeatherServer")
mcp.settings.port = 8000
@mcp.tool(name="get_weather", description="Return simple weather info")
def get_weather(city: str) -> str:
return f"Weather in {city}: Sunny 22°C"
if __name__ == "__main__":
mcp.run(transport="sse")

@ -0,0 +1,73 @@
import os
import json
from swarms import Agent
import io
import sys
from contextlib import redirect_stdout
print("\n=== Testing Multiple MCP Tool Execution ===\n")
# Configure multiple MCP URLs
os.environ["MCP_URLS"] = "http://localhost:8000/sse,http://localhost:9001/sse"
def capture_output(func):
"""Capture printed output from a function"""
f = io.StringIO()
with redirect_stdout(f):
func()
return f.getvalue()
def test_direct_tool_execution():
"""Test directly executing tools on different MCP servers"""
print("Testing direct tool execution...\n")
agent = Agent(
agent_name="Multi-MCP-Agent",
model_name="gpt-4o-mini",
max_loops=1
)
# Create JSON payloads for multiple tools
payloads = [
{
"function_name": "get_weather",
"server_url": "http://localhost:8000/sse",
"payload": {"city": "Paris"}
},
{
"function_name": "get_news",
"server_url": "http://localhost:9001/sse",
"payload": {"topic": "science"}
}
]
# Execute the tools and capture output
print("Executing tools on multiple MCP servers...")
output = capture_output(
lambda: agent.handle_multiple_mcp_tools(agent.mcp_urls, json.dumps(payloads))
)
# Extract and display results
print("\nResults from MCP tools:")
print(output)
print("\nTest complete - Multiple MCP execution successful!")
def test_agent_configuration():
"""Test different ways to configure agents with multiple MCP URLs"""
print("\n=== Testing Agent MCP Configuration Methods ===\n")
# Method 1: Configure via environment variables (already set above)
agent1 = Agent(agent_name="Env-Config-Agent")
print(f"Agent1 MCP URLs (from env): {agent1.mcp_urls}")
# Method 2: Configure via direct parameter
agent2 = Agent(
agent_name="Direct-Config-Agent",
mcp_urls=["http://localhost:8000/sse", "http://localhost:9001/sse"]
)
print(f"Agent2 MCP URLs (from param): {agent2.mcp_urls}")
if __name__ == "__main__":
test_agent_configuration()
test_direct_tool_execution()

@ -1,5 +1,6 @@
import asyncio import asyncio
import json import json
import re
import logging import logging
import os import os
import random import random
@ -74,6 +75,7 @@ from swarms.structs.ma_utils import set_random_models_for_agents
from swarms.tools.mcp_client_call import ( from swarms.tools.mcp_client_call import (
execute_tool_call_simple, execute_tool_call_simple,
get_mcp_tools_sync, get_mcp_tools_sync,
execute_mcp_call,
) )
from swarms.schemas.mcp_schemas import ( from swarms.schemas.mcp_schemas import (
MCPConnection, MCPConnection,
@ -98,6 +100,25 @@ def parse_done_token(response: str) -> bool:
return "<DONE>" in response return "<DONE>" in response
def extract_json_from_response(response: str) -> List[Dict[str, Any]]:
"""Extract a JSON list from a model response string."""
if not response:
return []
if not isinstance(response, str):
return response if isinstance(response, list) else []
try:
return json.loads(response)
except json.JSONDecodeError:
match = re.search(r"\[[\s\S]*?\]", response)
if match:
try:
return json.loads(match.group(0))
except json.JSONDecodeError:
pass
logger.error("Failed to parse MCP payloads from response")
return []
# Agent ID generator # Agent ID generator
def agent_id(): def agent_id():
"""Generate an agent id""" """Generate an agent id"""
@ -442,6 +463,8 @@ class Agent:
self.sop = sop self.sop = sop
self.sop_list = sop_list self.sop_list = sop_list
self.tools = tools self.tools = tools
# Ensure tool_struct exists even when no tools are provided
self.tool_struct = {}
self.system_prompt = system_prompt self.system_prompt = system_prompt
self.agent_name = agent_name self.agent_name = agent_name
self.agent_description = agent_description self.agent_description = agent_description
@ -537,7 +560,19 @@ class Agent:
self.no_print = no_print self.no_print = no_print
self.tools_list_dictionary = tools_list_dictionary self.tools_list_dictionary = tools_list_dictionary
self.mcp_url = mcp_url self.mcp_url = mcp_url
self.mcp_urls = mcp_urls if mcp_urls is None:
env_urls = os.getenv("MCP_URLS")
self.mcp_urls = (
[
url.strip()
for url in env_urls.split(",")
if url.strip()
]
if env_urls
else None
)
else:
self.mcp_urls = mcp_urls
self.react_on = react_on self.react_on = react_on
self.safety_prompt_on = safety_prompt_on self.safety_prompt_on = safety_prompt_on
self.random_models_on = random_models_on self.random_models_on = random_models_on
@ -1084,6 +1119,19 @@ class Agent:
response, loop_count response, loop_count
) )
if exists(self.mcp_urls):
try:
self.handle_multiple_mcp_tools(
self.mcp_urls,
response,
current_loop=loop_count,
)
except Exception as e:
logger.error(
f"Error handling multiple MCP tools: {e}"
)
self.execute_tools( self.execute_tools(
response=response, response=response,
loop_count=loop_count, loop_count=loop_count,
@ -2786,6 +2834,53 @@ class Agent:
logger.error(f"Error in MCP tool: {e}") logger.error(f"Error in MCP tool: {e}")
raise e raise e
def handle_multiple_mcp_tools(
self,
mcp_url_list: List[str],
response: Union[str, List[Dict[str, Any]]],
current_loop: int = 0,
) -> None:
"""Execute multiple MCP tool calls across configured servers."""
payloads = extract_json_from_response(response)
for payload in payloads:
function_name = payload.get("function_name")
server_url = payload.get("server_url")
arguments = payload.get("payload", {})
if server_url not in mcp_url_list:
logger.warning(
f"Server URL {server_url} not in configured MCP URL list"
)
continue
attempt = 0
while attempt < 3:
try:
tool_response = asyncio.run(
execute_mcp_call(
function_name=function_name,
server_url=server_url,
payload=arguments,
)
)
self.short_memory.add(
role="Tool Executor",
content=str(tool_response),
)
self.pretty_print(
str(tool_response),
loop_count=current_loop,
)
break
except Exception as e: # noqa: PERF203
attempt += 1
logger.error(
f"Error executing {function_name} on {server_url}: {e}"
)
time.sleep(1)
def temp_llm_instance_for_tool_summary(self): def temp_llm_instance_for_tool_summary(self):
return LiteLLM( return LiteLLM(
model_name=self.model_name, model_name=self.model_name,

@ -30,6 +30,7 @@ from swarms.tools.json_utils import base_model_to_json
from swarms.tools.mcp_client_call import ( from swarms.tools.mcp_client_call import (
execute_tool_call_simple, execute_tool_call_simple,
_execute_tool_call_simple, _execute_tool_call_simple,
execute_mcp_call,
get_tools_for_multiple_mcp_servers, get_tools_for_multiple_mcp_servers,
get_mcp_tools_sync, get_mcp_tools_sync,
aget_mcp_tools, aget_mcp_tools,
@ -59,6 +60,7 @@ __all__ = [
"base_model_to_json", "base_model_to_json",
"execute_tool_call_simple", "execute_tool_call_simple",
"_execute_tool_call_simple", "_execute_tool_call_simple",
"execute_mcp_call",
"get_tools_for_multiple_mcp_servers", "get_tools_for_multiple_mcp_servers",
"get_mcp_tools_sync", "get_mcp_tools_sync",
"aget_mcp_tools", "aget_mcp_tools",

@ -502,3 +502,74 @@ async def execute_tool_call_simple(
*args, *args,
**kwargs, **kwargs,
) )
async def execute_mcp_call(
function_name: str,
server_url: str,
payload: Dict[str, Any],
connection: Optional[MCPConnection] = None,
output_type: Literal["json", "dict", "str"] = "str",
*args,
**kwargs,
) -> Any:
"""Execute a specific MCP tool call on a server.
Parameters
----------
function_name: str
Name of the MCP tool to execute.
server_url: str
URL of the MCP server.
payload: Dict[str, Any]
Arguments to pass to the MCP tool.
connection: Optional[MCPConnection]
Optional connection configuration.
output_type: str
Output formatting type.
"""
if exists(connection):
headers, timeout, _transport, url = connect_to_mcp_server(
connection
)
else:
headers, timeout, _transport, url = None, 5, None, server_url
try:
async with sse_client(
url=url, headers=headers, timeout=timeout, *args, **kwargs
) as (
read,
write,
):
async with ClientSession(read, write) as session:
await session.initialize()
req = MCPCallToolRequestParams(
name=function_name, arguments=payload
)
result = await call_mcp_tool(
session=session, call_tool_request_params=req
)
if output_type == "json":
return result.model_dump_json(indent=4)
if output_type == "dict":
return result.model_dump()
data = result.model_dump()
formatted_lines = []
for key, value in data.items():
if isinstance(value, list):
for item in value:
if isinstance(item, dict):
for k, v in item.items():
formatted_lines.append(
f"{k}: {v}"
)
else:
formatted_lines.append(f"{key}: {value}")
return "\n".join(formatted_lines)
except Exception as e:
logger.error(f"Error executing MCP call: {e}")
raise MCPExecutionError(f"Failed to execute MCP call: {e}")

@ -0,0 +1,50 @@
import asyncio
import json
from swarms.structs.agent import Agent, extract_json_from_response
from swarms.structs.agent import execute_mcp_call
from unittest.mock import patch
def test_handle_multiple_mcp_tools():
agent = Agent(agent_name="Test", llm=None, max_loops=1)
urls = ["http://server1", "http://server2"]
payloads = [
{
"function_name": "tool1",
"server_url": "http://server1",
"payload": {"a": 1},
},
{
"function_name": "tool2",
"server_url": "http://server2",
"payload": {},
},
]
called = []
async def fake_exec(
function_name, server_url, payload, *args, **kwargs
):
called.append((function_name, server_url, payload))
return "ok"
with patch(
"swarms.structs.agent.execute_mcp_call", side_effect=fake_exec
):
agent.handle_multiple_mcp_tools(urls, json.dumps(payloads))
assert called == [
("tool1", "http://server1", {"a": 1}),
("tool2", "http://server2", {}),
]
def test_extract_json_from_response():
payloads = [
{"function_name": "foo", "server_url": "http://x", "payload": {"x": 1}}
]
text = "Random text" + json.dumps(payloads) + " end"
result = extract_json_from_response(text)
assert result == payloads
Loading…
Cancel
Save