feat: support multi mcp and direct model init

pull/891/head
Pavan Kumar 1 month ago
parent bc50b15ead
commit 5e91a4b08c

@ -0,0 +1,25 @@
# Multi MCP Execution Example
This example demonstrates using a list of MCP servers with an `Agent`.
```python
import os
from swarms import Agent
# Configure multiple MCP URLs
os.environ["MCP_URLS"] = "http://localhost:8000/sse,http://localhost:9001/sse"
agent = Agent(
agent_name="Multi-MCP-Agent",
model_name="gpt-4o-mini",
max_loops=1,
)
# Example payloads produced by your model
payloads = [
{"function_name": "get_weather", "server_url": "http://localhost:8000/sse", "payload": {"city": "London"}},
{"function_name": "get_news", "server_url": "http://localhost:9001/sse", "payload": {"topic": "ai"}},
]
agent.handle_multiple_mcp_tools(agent.mcp_urls, payloads)
```

@ -478,6 +478,41 @@ agent.model_dump_json()
print(agent.to_toml())
```
### Multi-MCP Tool Execution
Execute tools from multiple MCP servers by providing a list of URLs via the
`mcp_urls` parameter or the `MCP_URLS` environment variable.
```python
import os
from swarms import Agent
# Using an environment variable for server configuration
os.environ["MCP_URLS"] = "http://localhost:8000/sse,http://localhost:9001/sse"
agent = Agent(
agent_name="Multi-MCP-Agent",
model_name="gpt-4o-mini",
max_loops=1,
)
# Example MCP payloads returned by your model
mcp_payloads = [
{
"function_name": "get_price",
"server_url": "http://localhost:8000/sse",
"payload": {"symbol": "BTC"},
},
{
"function_name": "market_sentiment",
"server_url": "http://localhost:9001/sse",
"payload": {"symbol": "BTC"},
},
]
agent.handle_multiple_mcp_tools(agent.mcp_urls, mcp_payloads)
```
## Auto Generate Prompt + CPU Execution

@ -62,7 +62,7 @@ The **Model Context Protocol (MCP)** integration enables Swarms agents to dynami
| Feature | Status | Expected |
|---------|--------|----------|
| **MCPConnection Model** | 🚧 Development | Q1 2024 |
| **Multiple Server Support** | 🚧 Planned | Q2 2024 |
| **Multiple Server Support** | ✅ Ready | - |
| **Parallel Function Calling** | 🚧 Research | Q2 2024 |
| **Auto-discovery** | 🚧 Planned | Q3 2024 |
@ -127,6 +127,26 @@ The **Model Context Protocol (MCP)** integration enables Swarms agents to dynami
---
### MCP Payload Format
When the agent generates a function call for an MCP tool it should return a
payload in the following format:
```json
[
{
"function_name": "tool_name",
"server_url": "http://server:8000/sse",
"payload": {"arg": "value"}
}
]
```
Use `handle_multiple_mcp_tools` to execute each payload across the configured
servers.
---
## Integration Flow
The following diagram illustrates the complete MCP integration workflow:
@ -214,7 +234,10 @@ graph TD
agent_name="Crypto-Trading-Agent",
agent_description="Real-time cryptocurrency market analyzer",
max_loops=2,
mcp_url="http://crypto-server:8000/sse",
mcp_urls=[
"http://crypto-server:8000/sse",
"http://backup-server:8001/sse",
],
output_type="json",
temperature=0.1,
)
@ -406,19 +429,17 @@ graph TD
mcp_url = "http://server:8000/sse"
```
### 🚧 Single Server Limitation
Currently supports one server per agent:
### ✅ Multiple Server Support
Agents can now connect to multiple MCP servers. Provide a list of URLs via
the `mcp_urls` parameter or set the environment variable `MCP_URLS`.
```python
# ❌ Multiple servers not supported
mcp_servers = [
mcp_urls = [
"http://server1:8000/sse",
"http://server2:8000/sse"
"http://server2:8000/sse",
]
# ✅ Single server only
mcp_url = "http://primary-server:8000/sse"
agent = Agent(mcp_urls=mcp_urls)
```
### 🚧 Sequential Execution
@ -688,7 +709,7 @@ graph TD
- **Connection Pooling** - Improved performance
=== "2 Week"
- **Multiple Server Support** - Connect to multiple MCPs
- **Multiple Server Support** - *Completed*
- **Parallel Execution** - Concurrent tool calling
- **Load Balancing** - Distribute requests across servers
- **Advanced Monitoring** - Real-time metrics

@ -537,7 +537,19 @@ class Agent:
self.no_print = no_print
self.tools_list_dictionary = tools_list_dictionary
self.mcp_url = mcp_url
self.mcp_urls = mcp_urls
if mcp_urls is None:
env_urls = os.getenv("MCP_URLS")
self.mcp_urls = (
[
url.strip()
for url in env_urls.split(",")
if url.strip()
]
if env_urls
else None
)
else:
self.mcp_urls = mcp_urls
self.react_on = react_on
self.safety_prompt_on = safety_prompt_on
self.random_models_on = random_models_on
@ -1084,6 +1096,25 @@ class Agent:
response, loop_count
)
if exists(self.mcp_urls):
try:
payload = (
json.loads(response)
if isinstance(response, str)
else response
)
if isinstance(payload, list):
self.handle_multiple_mcp_tools(
self.mcp_urls,
payload,
current_loop=loop_count,
)
except Exception as e:
logger.error(
f"Error handling multiple MCP tools: {e}"
)
self.execute_tools(
response=response,
loop_count=loop_count,
@ -2786,6 +2817,45 @@ class Agent:
logger.error(f"Error in MCP tool: {e}")
raise e
def handle_multiple_mcp_tools(
self,
mcp_url_list: List[str],
mcp_payloads: List[Dict[str, Any]],
current_loop: int = 0,
) -> None:
"""Execute a list of MCP tool calls across multiple servers."""
for payload in mcp_payloads:
function_name = payload.get("function_name")
server_url = payload.get("server_url")
arguments = payload.get("payload", {})
if server_url not in mcp_url_list:
logger.warning(
f"Server URL {server_url} not in configured MCP URL list"
)
continue
try:
tool_response = asyncio.run(
execute_mcp_call(
function_name=function_name,
server_url=server_url,
payload=arguments,
)
)
self.short_memory.add(
role="Tool Executor",
content=str(tool_response),
)
self.pretty_print(
str(tool_response), loop_count=current_loop
)
except Exception as e: # noqa: PERF203
logger.error(
f"Error executing {function_name} on {server_url}: {e}"
)
def temp_llm_instance_for_tool_summary(self):
return LiteLLM(
model_name=self.model_name,

@ -30,6 +30,7 @@ from swarms.tools.json_utils import base_model_to_json
from swarms.tools.mcp_client_call import (
execute_tool_call_simple,
_execute_tool_call_simple,
execute_mcp_call,
get_tools_for_multiple_mcp_servers,
get_mcp_tools_sync,
aget_mcp_tools,
@ -59,6 +60,7 @@ __all__ = [
"base_model_to_json",
"execute_tool_call_simple",
"_execute_tool_call_simple",
"execute_mcp_call",
"get_tools_for_multiple_mcp_servers",
"get_mcp_tools_sync",
"aget_mcp_tools",

@ -502,3 +502,74 @@ async def execute_tool_call_simple(
*args,
**kwargs,
)
async def execute_mcp_call(
function_name: str,
server_url: str,
payload: Dict[str, Any],
connection: Optional[MCPConnection] = None,
output_type: Literal["json", "dict", "str"] = "str",
*args,
**kwargs,
) -> Any:
"""Execute a specific MCP tool call on a server.
Parameters
----------
function_name: str
Name of the MCP tool to execute.
server_url: str
URL of the MCP server.
payload: Dict[str, Any]
Arguments to pass to the MCP tool.
connection: Optional[MCPConnection]
Optional connection configuration.
output_type: str
Output formatting type.
"""
if exists(connection):
headers, timeout, _transport, url = connect_to_mcp_server(
connection
)
else:
headers, timeout, _transport, url = None, 5, None, server_url
try:
async with sse_client(
url=url, headers=headers, timeout=timeout, *args, **kwargs
) as (
read,
write,
):
async with ClientSession(read, write) as session:
await session.initialize()
req = MCPCallToolRequestParams(
name=function_name, arguments=payload
)
result = await call_mcp_tool(
session=session, call_tool_request_params=req
)
if output_type == "json":
return result.model_dump_json(indent=4)
if output_type == "dict":
return result.model_dump()
data = result.model_dump()
formatted_lines = []
for key, value in data.items():
if isinstance(value, list):
for item in value:
if isinstance(item, dict):
for k, v in item.items():
formatted_lines.append(
f"{k}: {v}"
)
else:
formatted_lines.append(f"{key}: {value}")
return "\n".join(formatted_lines)
except Exception as e:
logger.error(f"Error executing MCP call: {e}")
raise MCPExecutionError(f"Failed to execute MCP call: {e}")

@ -0,0 +1,38 @@
import asyncio
from swarms.structs.agent import Agent
from swarms.structs.agent import execute_mcp_call
from unittest.mock import patch
def test_handle_multiple_mcp_tools():
agent = Agent(agent_name="Test", llm=None, max_loops=1)
urls = ["http://server1", "http://server2"]
payloads = [
{
"function_name": "tool1",
"server_url": "http://server1",
"payload": {"a": 1},
},
{
"function_name": "tool2",
"server_url": "http://server2",
"payload": {},
},
]
called = []
async def fake_exec(
function_name, server_url, payload, *args, **kwargs
):
called.append((function_name, server_url, payload))
return "ok"
with patch(
"swarms.structs.agent.execute_mcp_call", side_effect=fake_exec
):
agent.handle_multiple_mcp_tools(urls, payloads)
assert called == [
("tool1", "http://server1", {"a": 1}),
("tool2", "http://server2", {}),
]
Loading…
Cancel
Save