diff --git a/docs/examples/multi_mcp_execution.md b/docs/examples/multi_mcp_execution.md new file mode 100644 index 00000000..31997a09 --- /dev/null +++ b/docs/examples/multi_mcp_execution.md @@ -0,0 +1,34 @@ +# Multi MCP Execution Example + +This example demonstrates using a list of MCP servers with an `Agent`. + +Start the example servers in separate terminals: + +```bash +python examples/tools/mcp_examples/servers/weather_server.py +python examples/tools/mcp_examples/servers/news_server.py +``` + +```python +import os +import json +from swarms import Agent + +# Configure multiple MCP URLs +os.environ["MCP_URLS"] = "http://localhost:8000/sse,http://localhost:9001/sse" + +agent = Agent( + agent_name="Multi-MCP-Agent", + model_name="gpt-4o-mini", + max_loops=1, +) + +# Example JSON payloads produced by your model +response = json.dumps([ + {"function_name": "get_weather", "server_url": "http://localhost:8000/sse", "payload": {"city": "London"}}, + {"function_name": "get_news", "server_url": "http://localhost:9001/sse", "payload": {"topic": "ai"}}, +]) + +agent.handle_multiple_mcp_tools(agent.mcp_urls, response) + +``` diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 9b1a95e8..b0500964 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -361,6 +361,7 @@ nav: - Medical Swarm: "swarms/examples/swarms_api_medical.md" - Finance Swarm: "swarms/examples/swarms_api_finance.md" - ML Model Code Generation Swarm: "swarms/examples/swarms_api_ml_model.md" + - Multi-MCP Execution Example: "examples/multi_mcp_execution.md" # - Swarm Models: # - Overview: "swarms/models/index.md" diff --git a/docs/swarms/structs/agent.md b/docs/swarms/structs/agent.md index 6b533ab4..b024d4ad 100644 --- a/docs/swarms/structs/agent.md +++ b/docs/swarms/structs/agent.md @@ -478,6 +478,51 @@ agent.model_dump_json() print(agent.to_toml()) ``` +### Multi-MCP Tool Execution + +Execute tools from multiple MCP servers by providing a list of URLs via the +`mcp_urls` parameter or the `MCP_URLS` environment variable. + +Start the example servers: + +```bash +python examples/tools/mcp_examples/servers/weather_server.py +python examples/tools/mcp_examples/servers/news_server.py +``` + +```python +import os +import json +from swarms import Agent + +# Using an environment variable for server configuration +os.environ["MCP_URLS"] = "http://localhost:8000/sse,http://localhost:9001/sse" + +agent = Agent( + agent_name="Multi-MCP-Agent", + model_name="gpt-4o-mini", + max_loops=1, +) + +# Example MCP payloads returned by your model +mcp_response = json.dumps([ + + { + "function_name": "get_price", + "server_url": "http://localhost:8000/sse", + "payload": {"symbol": "BTC"}, + }, + { + "function_name": "market_sentiment", + "server_url": "http://localhost:9001/sse", + "payload": {"symbol": "BTC"}, + }, +]) + +agent.handle_multiple_mcp_tools(agent.mcp_urls, mcp_response) + +``` + ## Auto Generate Prompt + CPU Execution diff --git a/docs/swarms/structs/agent_mcp.md b/docs/swarms/structs/agent_mcp.md index a7c0a2c6..e6c691b8 100644 --- a/docs/swarms/structs/agent_mcp.md +++ b/docs/swarms/structs/agent_mcp.md @@ -62,7 +62,7 @@ The **Model Context Protocol (MCP)** integration enables Swarms agents to dynami | Feature | Status | Expected | |---------|--------|----------| | **MCPConnection Model** | 🚧 Development | Q1 2024 | - | **Multiple Server Support** | 🚧 Planned | Q2 2024 | + | **Multiple Server Support** | ✅ Ready | - | | **Parallel Function Calling** | 🚧 Research | Q2 2024 | | **Auto-discovery** | 🚧 Planned | Q3 2024 | @@ -125,6 +125,34 @@ The **Model Context Protocol (MCP)** integration enables Swarms agents to dynami ) ``` +--- + +### MCP Payload Format + +When the agent generates a function call for an MCP tool it should return a +payload in the following format: + +```json +[ + { + "function_name": "tool_name", + "server_url": "http://server:8000/sse", + "payload": {"arg": "value"} + } +] +``` + +Use `handle_multiple_mcp_tools` to execute each payload across the configured +servers. + +Example servers can be started with: + +```bash +python examples/tools/mcp_examples/servers/weather_server.py +python examples/tools/mcp_examples/servers/news_server.py +``` + + --- ## Integration Flow @@ -214,7 +242,10 @@ graph TD agent_name="Crypto-Trading-Agent", agent_description="Real-time cryptocurrency market analyzer", max_loops=2, - mcp_url="http://crypto-server:8000/sse", + mcp_urls=[ + "http://crypto-server:8000/sse", + "http://backup-server:8001/sse", + ], output_type="json", temperature=0.1, ) @@ -406,19 +437,17 @@ graph TD mcp_url = "http://server:8000/sse" ``` - ### 🚧 Single Server Limitation - - Currently supports one server per agent: - + ### ✅ Multiple Server Support + + Agents can now connect to multiple MCP servers. Provide a list of URLs via + the `mcp_urls` parameter or set the environment variable `MCP_URLS`. + ```python - # ❌ Multiple servers not supported - mcp_servers = [ + mcp_urls = [ "http://server1:8000/sse", - "http://server2:8000/sse" + "http://server2:8000/sse", ] - - # ✅ Single server only - mcp_url = "http://primary-server:8000/sse" + agent = Agent(mcp_urls=mcp_urls) ``` ### 🚧 Sequential Execution @@ -688,7 +717,7 @@ graph TD - **Connection Pooling** - Improved performance === "2 Week" - - **Multiple Server Support** - Connect to multiple MCPs + - **Multiple Server Support** - *Completed* - **Parallel Execution** - Concurrent tool calling - **Load Balancing** - Distribute requests across servers - **Advanced Monitoring** - Real-time metrics diff --git a/examples/tools/mcp_examples/servers/news_server.py b/examples/tools/mcp_examples/servers/news_server.py new file mode 100644 index 00000000..584f25cc --- /dev/null +++ b/examples/tools/mcp_examples/servers/news_server.py @@ -0,0 +1,12 @@ +from mcp.server.fastmcp import FastMCP + +mcp = FastMCP("NewsServer") + +mcp.settings.port = 9001 + +@mcp.tool(name="get_news", description="Return simple news headline") +def get_news(topic: str) -> str: + return f"Latest {topic} news headline" + +if __name__ == "__main__": + mcp.run(transport="sse") diff --git a/examples/tools/mcp_examples/servers/weather_server.py b/examples/tools/mcp_examples/servers/weather_server.py new file mode 100644 index 00000000..93825785 --- /dev/null +++ b/examples/tools/mcp_examples/servers/weather_server.py @@ -0,0 +1,12 @@ +from mcp.server.fastmcp import FastMCP + +mcp = FastMCP("WeatherServer") + +mcp.settings.port = 8000 + +@mcp.tool(name="get_weather", description="Return simple weather info") +def get_weather(city: str) -> str: + return f"Weather in {city}: Sunny 22°C" + +if __name__ == "__main__": + mcp.run(transport="sse") diff --git a/examples/tools/mcp_examples/tests/test_multi_mcp_demo.py b/examples/tools/mcp_examples/tests/test_multi_mcp_demo.py new file mode 100644 index 00000000..c6a66a72 --- /dev/null +++ b/examples/tools/mcp_examples/tests/test_multi_mcp_demo.py @@ -0,0 +1,73 @@ +import os +import json +from swarms import Agent +import io +import sys +from contextlib import redirect_stdout + +print("\n=== Testing Multiple MCP Tool Execution ===\n") + +# Configure multiple MCP URLs +os.environ["MCP_URLS"] = "http://localhost:8000/sse,http://localhost:9001/sse" + +def capture_output(func): + """Capture printed output from a function""" + f = io.StringIO() + with redirect_stdout(f): + func() + return f.getvalue() + +def test_direct_tool_execution(): + """Test directly executing tools on different MCP servers""" + print("Testing direct tool execution...\n") + + agent = Agent( + agent_name="Multi-MCP-Agent", + model_name="gpt-4o-mini", + max_loops=1 + ) + + # Create JSON payloads for multiple tools + payloads = [ + { + "function_name": "get_weather", + "server_url": "http://localhost:8000/sse", + "payload": {"city": "Paris"} + }, + { + "function_name": "get_news", + "server_url": "http://localhost:9001/sse", + "payload": {"topic": "science"} + } + ] + + # Execute the tools and capture output + print("Executing tools on multiple MCP servers...") + output = capture_output( + lambda: agent.handle_multiple_mcp_tools(agent.mcp_urls, json.dumps(payloads)) + ) + + # Extract and display results + print("\nResults from MCP tools:") + print(output) + + print("\nTest complete - Multiple MCP execution successful!") + +def test_agent_configuration(): + """Test different ways to configure agents with multiple MCP URLs""" + print("\n=== Testing Agent MCP Configuration Methods ===\n") + + # Method 1: Configure via environment variables (already set above) + agent1 = Agent(agent_name="Env-Config-Agent") + print(f"Agent1 MCP URLs (from env): {agent1.mcp_urls}") + + # Method 2: Configure via direct parameter + agent2 = Agent( + agent_name="Direct-Config-Agent", + mcp_urls=["http://localhost:8000/sse", "http://localhost:9001/sse"] + ) + print(f"Agent2 MCP URLs (from param): {agent2.mcp_urls}") + +if __name__ == "__main__": + test_agent_configuration() + test_direct_tool_execution() diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index dce3c2c2..b0dbc90e 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -1,5 +1,6 @@ import asyncio import json +import re import logging import os import random @@ -74,6 +75,7 @@ from swarms.structs.ma_utils import set_random_models_for_agents from swarms.tools.mcp_client_call import ( execute_tool_call_simple, get_mcp_tools_sync, + execute_mcp_call, ) from swarms.schemas.mcp_schemas import ( MCPConnection, @@ -98,6 +100,25 @@ def parse_done_token(response: str) -> bool: return "" in response +def extract_json_from_response(response: str) -> List[Dict[str, Any]]: + """Extract a JSON list from a model response string.""" + if not response: + return [] + if not isinstance(response, str): + return response if isinstance(response, list) else [] + try: + return json.loads(response) + except json.JSONDecodeError: + match = re.search(r"\[[\s\S]*?\]", response) + if match: + try: + return json.loads(match.group(0)) + except json.JSONDecodeError: + pass + logger.error("Failed to parse MCP payloads from response") + return [] + + # Agent ID generator def agent_id(): """Generate an agent id""" @@ -442,6 +463,8 @@ class Agent: self.sop = sop self.sop_list = sop_list self.tools = tools + # Ensure tool_struct exists even when no tools are provided + self.tool_struct = {} self.system_prompt = system_prompt self.agent_name = agent_name self.agent_description = agent_description @@ -537,7 +560,19 @@ class Agent: self.no_print = no_print self.tools_list_dictionary = tools_list_dictionary self.mcp_url = mcp_url - self.mcp_urls = mcp_urls + if mcp_urls is None: + env_urls = os.getenv("MCP_URLS") + self.mcp_urls = ( + [ + url.strip() + for url in env_urls.split(",") + if url.strip() + ] + if env_urls + else None + ) + else: + self.mcp_urls = mcp_urls self.react_on = react_on self.safety_prompt_on = safety_prompt_on self.random_models_on = random_models_on @@ -1084,6 +1119,19 @@ class Agent: response, loop_count ) + if exists(self.mcp_urls): + try: + self.handle_multiple_mcp_tools( + self.mcp_urls, + response, + current_loop=loop_count, + ) + + except Exception as e: + logger.error( + f"Error handling multiple MCP tools: {e}" + ) + self.execute_tools( response=response, loop_count=loop_count, @@ -2786,6 +2834,53 @@ class Agent: logger.error(f"Error in MCP tool: {e}") raise e + def handle_multiple_mcp_tools( + self, + mcp_url_list: List[str], + response: Union[str, List[Dict[str, Any]]], + current_loop: int = 0, + ) -> None: + """Execute multiple MCP tool calls across configured servers.""" + + payloads = extract_json_from_response(response) + for payload in payloads: + + function_name = payload.get("function_name") + server_url = payload.get("server_url") + arguments = payload.get("payload", {}) + + if server_url not in mcp_url_list: + logger.warning( + f"Server URL {server_url} not in configured MCP URL list" + ) + continue + + attempt = 0 + while attempt < 3: + try: + tool_response = asyncio.run( + execute_mcp_call( + function_name=function_name, + server_url=server_url, + payload=arguments, + ) + ) + self.short_memory.add( + role="Tool Executor", + content=str(tool_response), + ) + self.pretty_print( + str(tool_response), + loop_count=current_loop, + ) + break + except Exception as e: # noqa: PERF203 + attempt += 1 + logger.error( + f"Error executing {function_name} on {server_url}: {e}" + ) + time.sleep(1) + def temp_llm_instance_for_tool_summary(self): return LiteLLM( model_name=self.model_name, diff --git a/swarms/tools/__init__.py b/swarms/tools/__init__.py index e6b8032f..e80d4a0d 100644 --- a/swarms/tools/__init__.py +++ b/swarms/tools/__init__.py @@ -30,6 +30,7 @@ from swarms.tools.json_utils import base_model_to_json from swarms.tools.mcp_client_call import ( execute_tool_call_simple, _execute_tool_call_simple, + execute_mcp_call, get_tools_for_multiple_mcp_servers, get_mcp_tools_sync, aget_mcp_tools, @@ -59,6 +60,7 @@ __all__ = [ "base_model_to_json", "execute_tool_call_simple", "_execute_tool_call_simple", + "execute_mcp_call", "get_tools_for_multiple_mcp_servers", "get_mcp_tools_sync", "aget_mcp_tools", diff --git a/swarms/tools/mcp_client_call.py b/swarms/tools/mcp_client_call.py index 25302c78..e79affc4 100644 --- a/swarms/tools/mcp_client_call.py +++ b/swarms/tools/mcp_client_call.py @@ -502,3 +502,74 @@ async def execute_tool_call_simple( *args, **kwargs, ) + + +async def execute_mcp_call( + function_name: str, + server_url: str, + payload: Dict[str, Any], + connection: Optional[MCPConnection] = None, + output_type: Literal["json", "dict", "str"] = "str", + *args, + **kwargs, +) -> Any: + """Execute a specific MCP tool call on a server. + + Parameters + ---------- + function_name: str + Name of the MCP tool to execute. + server_url: str + URL of the MCP server. + payload: Dict[str, Any] + Arguments to pass to the MCP tool. + connection: Optional[MCPConnection] + Optional connection configuration. + output_type: str + Output formatting type. + """ + + if exists(connection): + headers, timeout, _transport, url = connect_to_mcp_server( + connection + ) + else: + headers, timeout, _transport, url = None, 5, None, server_url + + try: + async with sse_client( + url=url, headers=headers, timeout=timeout, *args, **kwargs + ) as ( + read, + write, + ): + async with ClientSession(read, write) as session: + await session.initialize() + req = MCPCallToolRequestParams( + name=function_name, arguments=payload + ) + result = await call_mcp_tool( + session=session, call_tool_request_params=req + ) + + if output_type == "json": + return result.model_dump_json(indent=4) + if output_type == "dict": + return result.model_dump() + + data = result.model_dump() + formatted_lines = [] + for key, value in data.items(): + if isinstance(value, list): + for item in value: + if isinstance(item, dict): + for k, v in item.items(): + formatted_lines.append( + f"{k}: {v}" + ) + else: + formatted_lines.append(f"{key}: {value}") + return "\n".join(formatted_lines) + except Exception as e: + logger.error(f"Error executing MCP call: {e}") + raise MCPExecutionError(f"Failed to execute MCP call: {e}") diff --git a/tests/structs/test_multi_mcp.py b/tests/structs/test_multi_mcp.py new file mode 100644 index 00000000..df9272be --- /dev/null +++ b/tests/structs/test_multi_mcp.py @@ -0,0 +1,50 @@ +import asyncio +import json +from swarms.structs.agent import Agent, extract_json_from_response + +from swarms.structs.agent import execute_mcp_call +from unittest.mock import patch + + +def test_handle_multiple_mcp_tools(): + agent = Agent(agent_name="Test", llm=None, max_loops=1) + urls = ["http://server1", "http://server2"] + payloads = [ + { + "function_name": "tool1", + "server_url": "http://server1", + "payload": {"a": 1}, + }, + { + "function_name": "tool2", + "server_url": "http://server2", + "payload": {}, + }, + ] + called = [] + + async def fake_exec( + function_name, server_url, payload, *args, **kwargs + ): + called.append((function_name, server_url, payload)) + return "ok" + + with patch( + "swarms.structs.agent.execute_mcp_call", side_effect=fake_exec + ): + agent.handle_multiple_mcp_tools(urls, json.dumps(payloads)) + + assert called == [ + ("tool1", "http://server1", {"a": 1}), + ("tool2", "http://server2", {}), + ] + + +def test_extract_json_from_response(): + payloads = [ + {"function_name": "foo", "server_url": "http://x", "payload": {"x": 1}} + ] + text = "Random text" + json.dumps(payloads) + " end" + result = extract_json_from_response(text) + assert result == payloads +