diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py
index 5275f25d..6444933b 100644
--- a/swarms/structs/agent.py
+++ b/swarms/structs/agent.py
@@ -30,6 +30,7 @@ from swarms.prompts.agent_system_prompts import AGENT_SYSTEM_PROMPT_3
 from swarms.prompts.multi_modal_autonomous_instruction_prompt import (
     MULTI_MODAL_AUTO_AGENT_SYSTEM_PROMPT_1,
 )
+from swarms.tools.mcp_client_call import aget_mcp_tools
 from swarms.prompts.tools import tool_sop_prompt
 from swarms.schemas.agent_mcp_errors import (
     AgentMCPConnectionError,
@@ -433,6 +434,7 @@ class Agent:
         summarize_multiple_images: bool = False,
         tool_retry_attempts: int = 3,
         speed_mode: str = None,
+        lazy_init_mcp: bool = False,
         *args,
         **kwargs,
     ):
@@ -621,6 +623,37 @@ class Agent:
             self.print_dashboard()
 
         self.reliability_check()
 
+        self.lazy_init_mcp = lazy_init_mcp
+        self._mcp_tools_loaded = False
+
+    @classmethod
+    async def create(cls, **kwargs):
+        """
+        Asynchronously creates an Agent instance.
+
+        This is the preferred way to create an Agent that uses MCP tools
+        when running in an async context (e.g., inside FastAPI or Quart).
+
+        Args:
+            **kwargs: All parameters accepted by Agent.__init__
+
+        Returns:
+            An initialized Agent instance with MCP tools loaded
+        """
+        # Create the instance with the lazy-initialization flag set
+        instance = cls(lazy_init_mcp=True, **kwargs)
+
+        # Asynchronously load MCP tools (if any are configured)
+        if exists(instance.mcp_url) or exists(instance.mcp_urls) or exists(instance.mcp_config):
+            await instance.async_init_mcp_tools()
+
+        # Finish initializing the LLM
+        if instance.llm is None:
+            # Run the synchronous initializer in a worker thread
+            instance.llm = await asyncio.to_thread(instance.llm_handling)
+
+        return instance
+
     def rag_setup_handling(self):
         return AgentRAGHandler(
@@ -774,22 +807,21 @@ class Agent:
         This function checks for either a single MCP URL or multiple MCP URLs
         and adds the available tools to the agent's memory. The tools are
         listed in JSON format.
-
-        Raises:
-            Exception: If there's an error accessing the MCP tools
         """
+        # If the tools were already loaded in lazy-init mode, return the cached tools
+        if hasattr(self, '_mcp_tools_loaded') and self._mcp_tools_loaded and self.tools_list_dictionary is not None:
+            return self.tools_list_dictionary
+
         try:
             if exists(self.mcp_url):
                 tools = get_mcp_tools_sync(server_path=self.mcp_url)
             elif exists(self.mcp_config):
                 tools = get_mcp_tools_sync(connection=self.mcp_config)
-                # logger.info(f"Tools: {tools}")
             elif exists(self.mcp_urls):
                 tools = get_tools_for_multiple_mcp_servers(
                     urls=self.mcp_urls,
                     output_type="str",
                 )
-                # print(f"Tools: {tools} for {self.mcp_urls}")
             else:
                 raise AgentMCPConnectionError(
                     "mcp_url must be either a string URL or MCPConnection object"
                 )
@@ -799,18 +831,71 @@ class Agent:
                 exists(self.mcp_url)
                 or exists(self.mcp_urls)
                 or exists(self.mcp_config)
-            ):
-                if self.print_on is True:
-                    self.pretty_print(
-                        f"✨ [SYSTEM] Successfully integrated {len(tools)} MCP tools into agent: {self.agent_name} | Status: ONLINE | Time: {time.strftime('%H:%M:%S')} ✨",
-                        loop_count=0,
-                    )
+            ) and self.print_on is True:
+                self.pretty_print(
+                    f"✨ [SYSTEM] Successfully integrated {len(tools)} MCP tools into agent: {self.agent_name} | Status: ONLINE | Time: {time.strftime('%H:%M:%S')} ✨",
+                    loop_count=0,
+                )
 
+            # Mark the tools as loaded and cache them
+            self._mcp_tools_loaded = True
+            self.tools_list_dictionary = tools
             return tools
         except AgentMCPConnectionError as e:
             logger.error(f"Error in MCP connection: {e}")
             raise e
 
+    async def async_init_mcp_tools(self):
+        """
+        Asynchronously initialize MCP tools.
+
+        This method should be used when the agent is created in an async context
+        to avoid event loop conflicts.
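+
+        Example (illustrative sketch; the URL is an assumed placeholder):
+            >>> agent = Agent(
+            ...     agent_name="demo",
+            ...     mcp_url="http://localhost:8000/sse",
+            ...     lazy_init_mcp=True,
+            ... )
+            >>> tools = await agent.async_init_mcp_tools()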
+
+        Returns:
+            The list of MCP tools
+        """
+        # If the tools are already loaded, return the cached list immediately
+        if hasattr(self, '_mcp_tools_loaded') and self._mcp_tools_loaded and self.tools_list_dictionary is not None:
+            return self.tools_list_dictionary
+
+        try:
+            if exists(self.mcp_url):
+                tools = await aget_mcp_tools(server_path=self.mcp_url, format="openai")
+            elif exists(self.mcp_config):
+                tools = await aget_mcp_tools(connection=self.mcp_config, format="openai")
+            elif exists(self.mcp_urls):
+                # Run the synchronous helper in a worker thread
+                tools = await asyncio.to_thread(
+                    get_tools_for_multiple_mcp_servers,
+                    urls=self.mcp_urls,
+                    output_type="str"
+                )
+            else:
+                raise AgentMCPConnectionError(
+                    "mcp_url must be either a string URL or MCPConnection object"
+                )
+
+            if (
+                exists(self.mcp_url)
+                or exists(self.mcp_urls)
+                or exists(self.mcp_config)
+            ) and self.print_on is True:
+                # Run the synchronous printer in a worker thread
+                await asyncio.to_thread(
+                    self.pretty_print,
+                    f"✨ [SYSTEM] Successfully integrated {len(tools)} MCP tools into agent: {self.agent_name} | Status: ONLINE | Time: {time.strftime('%H:%M:%S')} ✨",
+                    loop_count=0
+                )
+
+            # Mark the tools as loaded and cache them
+            self._mcp_tools_loaded = True
+            self.tools_list_dictionary = tools
+            return tools
+        except Exception as e:
+            logger.error(f"Error in async MCP tools initialization: {e}")
+            raise AgentMCPConnectionError(f"Failed to initialize MCP tools: {str(e)}")
+
     def setup_config(self):
         # The max_loops will be set dynamically if the dynamic_loop
         if self.dynamic_loops is True:
@@ -996,6 +1081,7 @@ class Agent:
         self,
         task: Optional[Union[str, Any]] = None,
         img: Optional[str] = None,
+        streaming_callback: Optional[Callable[[str], None]] = None,
         *args,
         **kwargs,
     ) -> Any:
@@ -1077,6 +1163,7 @@ class Agent:
                         task=task_prompt,
                         img=img,
                         current_loop=loop_count,
+                        streaming_callback=streaming_callback,
                         *args,
                         **kwargs,
                     )
@@ -1084,6 +1171,7 @@ class Agent:
                     response = self.call_llm(
                         task=task_prompt,
                         current_loop=loop_count,
+                        streaming_callback=streaming_callback,
                         *args,
                         **kwargs,
                     )
@@ -1110,6 +1198,8 @@ class Agent:
                             f"Structured Output - Attempting Function Call Execution [{time.strftime('%H:%M:%S')}] \n\n Output: {format_data_structure(response)} ",
                             loop_count,
                         )
+                    elif self.streaming_on:
+                        pass
                     else:
                         self.pretty_print(
                             response, loop_count
@@ -1239,12 +1329,13 @@ class Agent:
             traceback_info = traceback.format_exc()
 
             logger.error(
-                f"Error detected running your agent {self.agent_name}\n"
+                f"An error occurred while running your agent {self.agent_name}.\n"
                 f"Error Type: {error_type}\n"
                 f"Error Message: {error_message}\n"
                 f"Traceback:\n{traceback_info}\n"
                 f"Agent State: {self.to_dict()}\n"
-                f"Optimize your input parameters and or add an issue on the swarms github and contact our team on discord for support ;)"
+                f"Please optimize your input parameters, or create an issue on the Swarms GitHub and contact our team on Discord for support. "
+                f"For technical support, refer to this document: https://docs.swarms.world/en/latest/swarms/support/"
             )
 
             raise error
@@ -1269,26 +1360,19 @@ class Agent:
     ) -> Any:
         """
         Asynchronously runs the agent with the specified parameters.
-
-        Args:
-            task (Optional[str]): The task to be performed. Defaults to None.
-            img (Optional[str]): The image to be processed. Defaults to None.
-            is_last (bool): Indicates if this is the last task. Defaults to False.
-            device (str): The device to use for execution. Defaults to "cpu".
-            device_id (int): The ID of the GPU to use if device is set to "gpu". Defaults to 1.
-            all_cores (bool): If True, uses all available CPU cores. Defaults to True.
-            do_not_use_cluster_ops (bool): If True, does not use cluster operations. Defaults to True.
-            all_gpus (bool): If True, uses all available GPUs. Defaults to False.
-            *args: Additional positional arguments.
-            **kwargs: Additional keyword arguments.
-
-        Returns:
-            Any: The result of the asynchronous operation.
-
-        Raises:
-            Exception: If an error occurs during the asynchronous operation.
+
+        Enhanced to support proper async initialization of MCP tools if needed.
         """
         try:
+            # Asynchronously initialize the MCP tools first if they are needed and not yet loaded
+            if (exists(self.mcp_url) or exists(self.mcp_urls) or exists(self.mcp_config)) and \
+               not (hasattr(self, '_mcp_tools_loaded') and self._mcp_tools_loaded):
+                await self.async_init_mcp_tools()
+                # Make sure the LLM is initialized with the loaded tools
+                if self.llm is None:
+                    self.llm = await asyncio.to_thread(self.llm_handling)
+
+            # Delegate to the original synchronous run method
             return await asyncio.to_thread(
                 self.run,
                 task=task,
@@ -1297,9 +1381,7 @@ class Agent:
                 **kwargs,
             )
         except Exception as error:
-            await self._handle_run_error(
-                error
-            )  # Ensure this is also async if needed
+            await self._handle_run_error(error)
 
     def __call__(
         self,
@@ -1334,37 +1416,6 @@ class Agent:
         )
         return self.run(task=improved_prompt, *args, **kwargs)
 
-    # def parse_and_execute_tools(self, response: str, *args, **kwargs):
-    #     max_retries = 3  # Maximum number of retries
-    #     retries = 0
-    #     while retries < max_retries:
-    #         try:
-    #             logger.info("Executing tool...")
-
-    #             # try to Execute the tool and return a string
-    #             out = parse_and_execute_json(
-    #                 functions=self.tools,
-    #                 json_string=response,
-    #                 parse_md=True,
-    #                 *args,
-    #                 **kwargs,
-    #             )
-    #             logger.info(f"Tool Output: {out}")
-    #             # Add the output to the memory
-    #             # self.short_memory.add(
-    #             #     role="Tool Executor",
-    #             #     content=out,
-    #             # )
-    #             return out
-    #         except Exception as error:
-    #             retries += 1
-    #             logger.error(
-    #                 f"Attempt {retries}: Error executing tool: {error}"
-    #             )
-    #             if retries == max_retries:
-    #                 raise error
-    #             time.sleep(1)  # Wait for a bit before retrying
-
     def add_memory(self, message: str):
         """Add a memory to the agent
 
@@ -1539,15 +1590,16 @@ class Agent:
         if self.tools_list_dictionary is not None:
             if not supports_function_calling(self.model_name):
-                raise AgentInitializationError(
+                logger.warning(
                     f"The model '{self.model_name}' does not support function calling. Please use a model that supports function calling."
                 )
 
         try:
             if self.max_tokens > get_max_tokens(self.model_name):
-                raise AgentInitializationError(
+                logger.warning(
                     f"Max tokens is set to {self.max_tokens}, but the model '{self.model_name}' only supports {get_max_tokens(self.model_name)} tokens. Please set max tokens to {get_max_tokens(self.model_name)} or less."
                 )
+
         except Exception:
             pass
 
@@ -2503,6 +2555,7 @@ class Agent:
         task: str,
         img: Optional[str] = None,
         current_loop: int = 0,
+        streaming_callback: Optional[Callable[[str], None]] = None,
         *args,
         **kwargs,
     ) -> str:
@@ -2513,6 +2566,7 @@ class Agent:
            task (str): The task to be performed by the `llm` object.
            img (str, optional): Path or URL to an image file.
            audio (str, optional): Path or URL to an audio file.
+           streaming_callback (Optional[Callable[[str], None]]): Callback function to receive streaming tokens in real-time.
            *args: Variable length argument list.
            **kwargs: Arbitrary keyword arguments.
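+
+           Example (illustrative sketch; assumes an already-initialized agent instance):
+               >>> def on_token(token: str) -> None:
+               ...     print(token, end="")
+               >>> response = agent.call_llm(task="Hello", streaming_callback=on_token)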
@@ -2548,8 +2602,24 @@ class Agent:
                 if hasattr(
                     streaming_response, "__iter__"
                 ) and not isinstance(streaming_response, str):
+                    # Check if streaming_callback is provided (for ConcurrentWorkflow dashboard integration)
+                    if streaming_callback is not None:
+                        # Real-time callback streaming for dashboard integration
+                        chunks = []
+                        for chunk in streaming_response:
+                            if (
+                                hasattr(chunk, "choices")
+                                and chunk.choices[0].delta.content
+                            ):
+                                content = chunk.choices[
+                                    0
+                                ].delta.content
+                                chunks.append(content)
+                                # Call the streaming callback with the new chunk
+                                streaming_callback(content)
+                        complete_response = "".join(chunks)
                     # Check print_on parameter for different streaming behaviors
-                    if self.print_on is False:
+                    elif self.print_on is False:
                         # Silent streaming - no printing, just collect chunks
                         chunks = []
                         for chunk in streaming_response:
@@ -2632,6 +2702,7 @@ class Agent:
         img: Optional[str] = None,
         imgs: Optional[List[str]] = None,
         correct_answer: Optional[str] = None,
+        streaming_callback: Optional[Callable[[str], None]] = None,
         *args,
         **kwargs,
     ) -> Any:
@@ -2646,6 +2717,7 @@ class Agent:
             task (Optional[str], optional): The task to be executed. Defaults to None.
             img (Optional[str], optional): The image to be processed. Defaults to None.
             imgs (Optional[List[str]], optional): The list of images to be processed. Defaults to None.
+            streaming_callback (Optional[Callable[[str], None]], optional): Callback function to receive streaming tokens in real-time. Defaults to None.
             *args: Additional positional arguments to be passed to the execution method.
             **kwargs: Additional keyword arguments to be passed to the execution method.
 
@@ -2677,15 +2749,25 @@ class Agent:
             output = self._run(
                 task=task,
                 img=img,
+                streaming_callback=streaming_callback,
                 *args,
                 **kwargs,
             )
 
             return output
 
-        except ValueError as e:
+        except AgentRunError as e:
             self._handle_run_error(e)
 
+        except KeyboardInterrupt:
+            logger.warning(
+                f"Keyboard interrupt detected for agent '{self.agent_name}'. "
+                "If autosave is enabled, the agent's state will be saved to the workspace directory. "
+                "To enable autosave, please initialize the agent with Agent(autosave=True). "
+                "For technical support, refer to this document: https://docs.swarms.world/en/latest/swarms/support/"
+            )
+            raise KeyboardInterrupt
+
     def handle_artifacts(
         self, text: str, file_output_path: str, file_extension: str
     ) -> None:
@@ -2824,6 +2906,9 @@ class Agent:
         if response is None:
             response = "No response generated"
 
+        if self.streaming_on:
+            pass
+
         if self.print_on:
             formatter.print_panel(
                 response,
@@ -3225,19 +3310,9 @@ class Agent:
                     f"Agent '{self.agent_name}' received None response from LLM in loop {loop_count}. "
                     f"This may indicate an issue with the model or prompt. Skipping tool execution."
                 )
-        except Exception as e:
+        except AgentToolExecutionError as e:
             logger.error(
                 f"Agent '{self.agent_name}' encountered error during tool execution in loop {loop_count}: {str(e)}. "
                 f"Full traceback: {traceback.format_exc()}. "
                 f"Attempting to retry tool execution with 3 attempts"
-            )
-
-    def add_tool_schema(self, tool_schema: dict):
-        self.tools_list_dictionary = [tool_schema]
-
-        self.output_type = "dict-all-except-first"
-
-    def add_multiple_tool_schemas(self, tool_schemas: list[dict]):
-        self.tools_list_dictionary = tool_schemas
-
-        self.output_type = "dict-all-except-first"
+            )
\ No newline at end of file
diff --git a/swarms/tools/mcp_client_call.py b/swarms/tools/mcp_client_call.py
index 3fa3a9fa..1867c264 100644
--- a/swarms/tools/mcp_client_call.py
+++ b/swarms/tools/mcp_client_call.py
@@ -1,16 +1,35 @@
-import os
 import asyncio
 import contextlib
 import json
+import os
 import random
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from functools import wraps
 from typing import Any, Dict, List, Literal, Optional, Union
-from concurrent.futures import ThreadPoolExecutor, as_completed
 
 from litellm.types.utils import ChatCompletionMessageToolCall
 from loguru import logger
 from mcp import ClientSession
 from mcp.client.sse import sse_client
+
+# Try to import nest_asyncio if available
+try:
+    import nest_asyncio
+    HAS_NEST_ASYNCIO = True
+    logger.debug("nest_asyncio is available and will be used for nested event loops")
+except ImportError:
+    HAS_NEST_ASYNCIO = False
+    logger.debug("nest_asyncio is not available, will use alternative methods for nested event loops")
+
+try:
+    from mcp.client.streamable_http import streamablehttp_client
+    HAS_STREAMABLE_HTTP = True
+except ImportError:
+    HAS_STREAMABLE_HTTP = False
+    logger.error(
+        "streamablehttp_client is not available. Please ensure the MCP SDK is up to date with pip3 install -U mcp"
+    )
+
 from mcp.types import (
     CallToolRequestParams as MCPCallToolRequestParams,
 )
@@ -20,50 +39,55 @@ from openai.types.chat import ChatCompletionToolParam
 from openai.types.shared_params.function_definition import (
     FunctionDefinition,
 )
-
 from swarms.schemas.mcp_schemas import (
     MCPConnection,
 )
 from swarms.utils.index import exists
+from urllib.parse import urlparse
 
 
 class MCPError(Exception):
     """Base exception for MCP related errors."""
 
-    pass
 
 
 class MCPConnectionError(MCPError):
     """Raised when there are issues connecting to the MCP server."""
 
-    pass
 
 
 class MCPToolError(MCPError):
     """Raised when there are issues with MCP tool operations."""
 
-    pass
 
 
 class MCPValidationError(MCPError):
     """Raised when there are validation issues with MCP operations."""
 
-    pass
 
 
 class MCPExecutionError(MCPError):
     """Raised when there are issues executing MCP operations."""
 
-    pass
 
 
 ########################################################
 # List MCP Tool functions
 ########################################################
 
+
 def transform_mcp_tool_to_openai_tool(
     mcp_tool: MCPTool,
 ) -> ChatCompletionToolParam:
-    """Convert an MCP tool to an OpenAI tool."""
+    """
+    Convert an MCP tool to an OpenAI tool.
+    Args:
+        mcp_tool (MCPTool): The MCP tool object.
+    Returns:
+        ChatCompletionToolParam: The OpenAI-compatible tool parameter.
+    """
+    logger.info(
+        f"Transforming MCP tool '{mcp_tool.name}' to OpenAI tool format."
+    )
     return ChatCompletionToolParam(
         type="function",
         function=FunctionDefinition(
@@ -79,15 +103,14 @@ async def load_mcp_tools(
     session: ClientSession, format: Literal["mcp", "openai"] = "mcp"
 ) -> Union[List[MCPTool], List[ChatCompletionToolParam]]:
     """
-    Load all available MCP tools
-
+    Load all available MCP tools from the session.
     Args:
-        session: The MCP session to use
-        format: The format to convert the tools to
-        By default, the tools are returned in MCP format.
-
-        If format is set to "openai", the tools are converted to OpenAI API compatible tools.
+        session (ClientSession): The MCP session to use.
+        format (Literal["mcp", "openai"]): The format to convert the tools to.
+    Returns:
+        List of tools in the specified format.
     """
+    logger.info(f"Loading MCP tools with format '{format}'.")
     tools = await session.list_tools()
     if format == "openai":
         return [
@@ -101,21 +124,32 @@ async def load_mcp_tools(
 # Call MCP Tool functions
 ########################################################
 
-
 async def call_mcp_tool(
     session: ClientSession,
     call_tool_request_params: MCPCallToolRequestParams,
 ) -> MCPCallToolResult:
-    """Call an MCP tool."""
-    tool_result = await session.call_tool(
+    """
+    Call an MCP tool using the provided session and request parameters.
+    Args:
+        session (ClientSession): The MCP session to use.
+        call_tool_request_params (MCPCallToolRequestParams): The tool call request params.
+    Returns:
+        MCPCallToolResult: The result of the tool call.
+    """
+    return await session.call_tool(
         name=call_tool_request_params.name,
         arguments=call_tool_request_params.arguments,
     )
-    return tool_result
 
 
 def _get_function_arguments(function: FunctionDefinition) -> dict:
-    """Helper to safely get and parse function arguments."""
+    """
+    Helper to safely get and parse function arguments from a function definition.
+    Args:
+        function (FunctionDefinition): The function definition.
+    Returns:
+        dict: Parsed arguments as a dictionary.
+    """
     arguments = function.get("arguments", {})
     if isinstance(arguments, str):
         try:
@@ -128,7 +162,13 @@ def _get_function_arguments(function: FunctionDefinition) -> dict:
 def transform_openai_tool_call_request_to_mcp_tool_call_request(
     openai_tool: Union[ChatCompletionMessageToolCall, Dict],
 ) -> MCPCallToolRequestParams:
-    """Convert an OpenAI ChatCompletionMessageToolCall to an MCP CallToolRequestParams."""
+    """
+    Convert an OpenAI ChatCompletionMessageToolCall to an MCP CallToolRequestParams.
+    Args:
+        openai_tool (Union[ChatCompletionMessageToolCall, Dict]): The OpenAI tool call request.
+    Returns:
+        MCPCallToolRequestParams: The MCP tool call request params.
+    """
     function = openai_tool["function"]
     return MCPCallToolRequestParams(
         name=function["name"],
@@ -142,12 +182,11 @@ async def call_openai_tool(
 ) -> MCPCallToolResult:
     """
     Call an OpenAI tool using MCP client.
-
     Args:
-        session: The MCP session to use
-        openai_tool: The OpenAI tool to call. You can get this from the `choices[0].message.tool_calls[0]` of the response from the OpenAI API.
+        session (ClientSession): The MCP session to use.
+        openai_tool (dict): The OpenAI tool to call.
     Returns:
-        The result of the MCP tool call.
+        MCPCallToolResult: The result of the MCP tool call.
     """
     mcp_tool_call_request_params = (
         transform_openai_tool_call_request_to_mcp_tool_call_request(
@@ -161,8 +200,14 @@ async def call_openai_tool(
 
 def retry_with_backoff(retries=3, backoff_in_seconds=1):
-    """Decorator for retrying functions with exponential backoff."""
-
+    """
+    Decorator for retrying async functions with exponential backoff.
+    Args:
+        retries (int): Number of retry attempts.
+        backoff_in_seconds (int): Initial backoff time in seconds.
+    Returns:
+        Decorated async function with retry logic.
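+    Example (illustrative):
+        >>> @retry_with_backoff(retries=2, backoff_in_seconds=1)
+        ... async def fetch_tools():
+        ...     return await aget_mcp_tools("http://localhost:8000/mcp")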
+    """
     def decorator(func):
         @wraps(func)
         async def wrapper(*args, **kwargs):
@@ -185,52 +230,86 @@ def retry_with_backoff(retries=3, backoff_in_seconds=1):
                     )
                     await asyncio.sleep(sleep_time)
                     x += 1
-
         return wrapper
-
     return decorator
 
 
+def _run_in_new_thread(func, *args, **kwargs):
+    """Run a coroutine function in a new thread with its own event loop."""
+    import concurrent.futures
+    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
+        future = executor.submit(_run_in_new_loop, func, *args, **kwargs)
+        return future.result()
+
+
+def _run_in_new_loop(func, *args, **kwargs):
+    """Run a coroutine function in a new event loop."""
+    loop = asyncio.new_event_loop()
+    try:
+        asyncio.set_event_loop(loop)
+        return loop.run_until_complete(func(*args, **kwargs))
+    finally:
+        loop.close()
+
+
 @contextlib.contextmanager
 def get_or_create_event_loop():
-    """Context manager to handle event loop creation and cleanup."""
+    """Context manager to handle event loop creation and cleanup with better handling of running loops."""
     try:
         loop = asyncio.get_event_loop()
+        loop_was_running = loop.is_running()
+
+        # If the loop is running and nest_asyncio is available, apply it
+        if loop_was_running and HAS_NEST_ASYNCIO:
+            nest_asyncio.apply(loop)
+            logger.debug("Applied nest_asyncio to running event loop")
+            created_new = False
+        # If the loop is running and nest_asyncio is not available, create a new loop
+        elif loop_was_running:
+            logger.debug("Event loop is already running, creating new loop")
+            loop = asyncio.new_event_loop()
+            created_new = True
+        else:
+            created_new = False
     except RuntimeError:
+        logger.debug("No event loop found, creating new one")
         loop = asyncio.new_event_loop()
         asyncio.set_event_loop(loop)
-
+        created_new = True
+        loop_was_running = False
+
     try:
         yield loop
     finally:
-        # Only close the loop if we created it and it's not the main event loop
-        if loop != asyncio.get_event_loop() and not loop.is_running():
+        # Only close the loop if we created a new one and it's not running
+        if created_new and not loop.is_running():
             if not loop.is_closed():
                 loop.close()
 
 
 def connect_to_mcp_server(connection: MCPConnection = None):
-    """Connect to an MCP server.
-
+    """
+    Connect to an MCP server using the provided connection configuration.
     Args:
-        connection (MCPConnection): The connection configuration object
-
+        connection (MCPConnection): The connection configuration object.
     Returns:
-        tuple: A tuple containing (headers, timeout, transport, url)
-
+        tuple: (headers, timeout, transport, url)
     Raises:
-        MCPValidationError: If the connection object is invalid
+        MCPValidationError: If the connection object is invalid.
     """
+    logger.info(
+        "Connecting to MCP server using MCPConnection object."
+    )
     if not isinstance(connection, MCPConnection):
+        logger.error(
+            "Invalid connection type provided to connect_to_mcp_server."
+        )
         raise MCPValidationError("Invalid connection type")
-
-    # Direct attribute access is faster than property access
     headers = dict(connection.headers or {})
     if connection.authorization_token:
         headers["Authorization"] = (
             f"Bearer {connection.authorization_token}"
         )
-
     return (
         headers,
         connection.timeout or 5,
@@ -239,31 +318,104 @@ def connect_to_mcp_server(connection: MCPConnection = None):
     )
 
 
+def get_mcp_client(transport, url, headers=None, timeout=5, **kwargs):
+    """
+    Helper to select the correct MCP client context manager based on transport.
+    Supports 'sse' (default) and 'streamable_http'.
+    Args:
+        transport (str): The transport type ('sse' or 'streamable_http').
+        url (str): The server URL.
+        headers (dict): Optional headers.
+        timeout (int): Timeout in seconds.
+        **kwargs: Additional arguments.
+    Returns:
+        Context manager for the selected client.
+    Raises:
+        ImportError: If streamablehttp_client is not available when requested.
+    """
+    logger.info(
+        f"Getting MCP client for transport '{transport}' and url '{url}'."
+    )
+    if transport == "streamable_http":
+        if not HAS_STREAMABLE_HTTP:
+            logger.error("streamablehttp_client is not available.")
+            raise ImportError(
+                "streamablehttp_client is not available. Please ensure the MCP SDK is up to date."
+            )
+        return streamablehttp_client(
+            url, headers=headers, timeout=timeout, **kwargs
+        )
+    else:
+        return sse_client(
+            url, headers=headers, timeout=timeout, **kwargs
+        )
+
+
+def auto_detect_transport(url: str) -> str:
+    """
+    Guess the MCP transport based on the URL scheme and path.
+    Does not make any network requests.
+    Returns one of: 'streamable_http', 'sse', or 'stdio'.
+    Args:
+        url (str): The server URL.
+    Returns:
+        str: The detected transport type.
+    """
+    parsed = urlparse(url)
+    scheme = parsed.scheme.lower()
+    if scheme in ("http", "https"):
+        logger.info(
+            f"Automatically selected 'streamable_http' transport for {url}"
+        )
+        return "streamable_http"
+    elif scheme in ("ws", "wss"):
+        logger.info(
+            f"Automatically selected 'sse' transport for {url}"
+        )
+        return "sse"  # or 'websocket' if that transport is supported
+    elif "stdio" in url or scheme == "":
+        logger.info(
+            f"Automatically selected 'stdio' transport for {url}"
+        )
+        return "stdio"
+    else:
+        logger.info(f"Defaulting to 'sse' transport for {url}")
+        return "sse"
+
+
 @retry_with_backoff(retries=3)
 async def aget_mcp_tools(
     server_path: Optional[str] = None,
     format: str = "openai",
     connection: Optional[MCPConnection] = None,
+    transport: Optional[str] = None,
     *args,
     **kwargs,
 ) -> List[Dict[str, Any]]:
     """
     Fetch available MCP tools from the server with retry logic.
-
     Args:
-        server_path (str): Path to the MCP server script
-
+        server_path (str): Path to the MCP server script.
+        format (str): Format to return tools in ('openai' or 'mcp').
+        connection (Optional[MCPConnection]): Optional connection object.
+        transport (Optional[str]): Transport type. If None, auto-detects.
     Returns:
-        List[Dict[str, Any]]: List of available MCP tools in OpenAI format
-
+        List[Dict[str, Any]]: List of available MCP tools in OpenAI format.
     Raises:
-        MCPValidationError: If server_path is invalid
-        MCPConnectionError: If connection to server fails
+        MCPValidationError: If server_path is invalid.
+        MCPConnectionError: If connection to server fails.
     """
+    logger.info(
+        f"aget_mcp_tools called for server_path: {server_path}"
+    )
     if exists(connection):
-        headers, timeout, transport, url = connect_to_mcp_server(
-            connection
+        headers, timeout, transport_from_conn, url = (
+            connect_to_mcp_server(connection)
         )
+        if transport_from_conn:
+            transport = transport_from_conn
     else:
         headers, timeout, _transport, _url = (
             None,
@@ -271,20 +423,25 @@ async def aget_mcp_tools(
             None,
             server_path,
         )
+        url = server_path
+    # Auto-detect the transport from the resolved URL if none was specified
+    if transport is None:
+        transport = auto_detect_transport(url)
 
-    logger.info(f"Fetching MCP tools from server: {server_path}")
+    logger.info(
+        f"Fetching MCP tools from server: {server_path} using transport: {transport}"
+    )
 
     try:
-        async with sse_client(
-            url=server_path,
+        async with get_mcp_client(
+            transport,
+            url=url,
             headers=headers,
             timeout=timeout,
             *args,
             **kwargs,
-        ) as (
-            read,
-            write,
-        ):
+        ) as ctx:
+            # Both 2-tuple (sse) and 3-tuple (streamable_http) contexts are supported
+            if len(ctx) == 2:
+                read, write = ctx
+            else:
+                read, write, *_ = ctx
             async with ClientSession(read, write) as session:
                 await session.initialize()
                 tools = await load_mcp_tools(
@@ -305,51 +462,112 @@ def get_mcp_tools_sync(
     server_path: Optional[str] = None,
     format: str = "openai",
     connection: Optional[MCPConnection] = None,
+    transport: Optional[str] = None,
     *args,
     **kwargs,
 ) -> List[Dict[str, Any]]:
     """
     Synchronous version of get_mcp_tools that handles event loop management.
-
+    Improved to handle cases where the event loop is already running.
     Args:
-        server_path (str): Path to the MCP server script
-
+        server_path (str): Path to the MCP server script.
+        format (str): Format to return tools in ('openai' or 'mcp').
+        connection (Optional[MCPConnection]): Optional connection object.
+        transport (Optional[str]): Transport type. If None, auto-detects.
     Returns:
-        List[Dict[str, Any]]: List of available MCP tools in OpenAI format
-
+        List[Dict[str, Any]]: List of available MCP tools in requested format.
     Raises:
-        MCPValidationError: If server_path is invalid
-        MCPConnectionError: If connection to server fails
-        MCPExecutionError: If event loop management fails
+        MCPValidationError: If server_path is invalid.
+        MCPConnectionError: If connection to server fails.
+        MCPExecutionError: If event loop management fails.
     """
-    with get_or_create_event_loop() as loop:
+    logger.info(
+        f"get_mcp_tools_sync called for server_path: {server_path}"
+    )
+
+    if transport is None and exists(server_path):
+        transport = auto_detect_transport(server_path)
+
+    try:
+        # Check whether we are inside a running event loop
         try:
+            loop = asyncio.get_event_loop()
+            loop_is_running = loop.is_running()
+        except RuntimeError:
+            loop_is_running = False
+            loop = None
+
+        # If the loop is already running and nest_asyncio is available, use it
+        if loop_is_running and HAS_NEST_ASYNCIO:
+            logger.debug("Using nest_asyncio with running event loop")
+            nest_asyncio.apply(loop)
             return loop.run_until_complete(
                 aget_mcp_tools(
                     server_path=server_path,
                     format=format,
                     connection=connection,
+                    transport=transport,
                     *args,
                     **kwargs,
                 )
             )
-        except Exception as e:
-            logger.error(f"Error in get_mcp_tools_sync: {str(e)}")
-            raise MCPExecutionError(
-                f"Failed to execute MCP tools sync: {str(e)}"
+        # If the loop is running but nest_asyncio is not available, use a thread
+        elif loop_is_running:
+            logger.debug("Event loop is running, executing in separate thread")
+            return _run_in_new_thread(
+                aget_mcp_tools,
+                server_path=server_path,
+                format=format,
+                connection=connection,
+                transport=transport,
+                *args,
+                **kwargs,
             )
+        # Standard case: not inside a running event loop
+        else:
+            logger.debug("Using standard event loop management")
+            with get_or_create_event_loop() as loop:
+                return loop.run_until_complete(
+                    aget_mcp_tools(
+                        server_path=server_path,
+                        format=format,
+                        connection=connection,
+                        transport=transport,
+                        *args,
+                        **kwargs,
+                    )
+                )
+    except Exception as e:
+        logger.error(f"Error in get_mcp_tools_sync: {str(e)}")
+        raise MCPExecutionError(
+            f"Failed to execute MCP tools sync: {str(e)}"
+        )
 
 
 def _fetch_tools_for_server(
     url: str,
     connection: Optional[MCPConnection] = None,
     format: str = "openai",
+    transport: Optional[str] = None,
 ) -> List[Dict[str, Any]]:
-    """Helper function to fetch tools for a single server."""
+    """
+    Helper function to fetch tools for a single server.
+    Args:
+        url (str): The server URL.
+        connection (Optional[MCPConnection]): Optional connection object.
+        format (str): Format to return tools in.
+        transport (Optional[str]): Transport type. If None, auto-detects.
+    Returns:
+        List[Dict[str, Any]]: List of available MCP tools.
+    """
+    logger.info(f"_fetch_tools_for_server called for url: {url}")
+    if transport is None:
+        transport = auto_detect_transport(url)
     return get_mcp_tools_sync(
         server_path=url,
        connection=connection,
         format=format,
+        transport=transport,
     )
 
 
@@ -359,44 +577,55 @@ def get_tools_for_multiple_mcp_servers(
     format: str = "openai",
     output_type: Literal["json", "dict", "str"] = "str",
     max_workers: Optional[int] = None,
+    transport: Optional[str] = None,
 ) -> List[Dict[str, Any]]:
-    """Get tools for multiple MCP servers concurrently using ThreadPoolExecutor.
-
+    """
+    Get tools for multiple MCP servers concurrently using ThreadPoolExecutor.
     Args:
-        urls: List of server URLs to fetch tools from
-        connections: Optional list of MCPConnection objects corresponding to each URL
-        format: Format to return tools in (default: "openai")
-        output_type: Type of output format (default: "str")
-        max_workers: Maximum number of worker threads (default: None, uses min(32, os.cpu_count() + 4))
-
+        urls (List[str]): List of server URLs to fetch tools from.
+        connections (List[MCPConnection]): Optional list of MCPConnection objects.
+        format (str): Format to return tools in.
+        output_type (Literal): Output format type.
+        max_workers (Optional[int]): Max worker threads.
+        transport (Optional[str]): Transport type. If None, auto-detects per URL.
     Returns:
-        List[Dict[str, Any]]: Combined list of tools from all servers
+        List[Dict[str, Any]]: Combined list of tools from all servers.
     """
+    logger.info(
+        f"get_tools_for_multiple_mcp_servers called for {len(urls)} urls."
+    )
     tools = []
-    (
+
+    max_workers = (
         min(32, os.cpu_count() + 4)
         if max_workers is None
         else max_workers
     )
+
     with ThreadPoolExecutor(max_workers=max_workers) as executor:
         if exists(connections):
-            # Create future tasks for each URL-connection pair
             future_to_url = {
                 executor.submit(
-                    _fetch_tools_for_server, url, connection, format
+                    _fetch_tools_for_server,
+                    url,
+                    connection,
+                    format,
+                    transport,
                 ): url
                 for url, connection in zip(urls, connections)
             }
         else:
-            # Create future tasks for each URL without connections
             future_to_url = {
                 executor.submit(
                    _fetch_tools_for_server,
+                    url,
+                    None,
+                    format,
+                    transport,
                 ): url
                 for url in urls
            }
 
-        # Process completed futures as they come in
         for future in as_completed(future_to_url):
             url = future_to_url[future]
             try:
@@ -418,14 +647,34 @@ async def _execute_tool_call_simple(
     server_path: str = None,
     connection: Optional[MCPConnection] = None,
     output_type: Literal["json", "dict", "str"] = "str",
+    transport: Optional[str] = None,
     *args,
     **kwargs,
 ):
-    """Execute a tool call using the MCP client."""
+    """
+    Execute a tool call using the MCP client, supporting both SSE and streamable HTTP.
+    Args:
+        response (any): The tool call request.
+        server_path (str): The server URL.
+        connection (Optional[MCPConnection]): Optional connection object.
+        output_type (Literal): Output format type.
+        transport (Optional[str]): Transport type. If None, auto-detects.
+    Returns:
+        The tool call result in the specified output format.
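+    Example (illustrative sketch; typically invoked via execute_tool_call_simple, URL is a placeholder):
+        >>> out = await _execute_tool_call_simple(
+        ...     response={"function": {"name": "add", "arguments": {"a": 1, "b": 2}}},
+        ...     server_path="http://localhost:8000/mcp",
+        ... )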
+    Raises:
+        MCPExecutionError, MCPConnectionError
+    """
+    logger.info(
+        f"_execute_tool_call_simple called for server_path: {server_path}"
+    )
     if exists(connection):
-        headers, timeout, transport, url = connect_to_mcp_server(
-            connection
+        headers, timeout, transport_from_conn, url = (
+            connect_to_mcp_server(connection)
         )
+        if transport_from_conn:
+            transport = transport_from_conn
     else:
         headers, timeout, _transport, url = (
             None,
             5,
             None,
             server_path,
         )
+    # Auto-detect the transport from the resolved URL if none was specified
+    if transport is None:
+        transport = auto_detect_transport(url)
 
     try:
-        async with sse_client(
-            url=url, headers=headers, timeout=timeout, *args, **kwargs
-        ) as (
-            read,
-            write,
-        ):
+        async with get_mcp_client(
+            transport,
+            url=url,
+            headers=headers,
+            timeout=timeout,
+            *args,
+            **kwargs,
+        ) as ctx:
+            # Both 2-tuple (sse) and 3-tuple (streamable_http) contexts are supported
+            if len(ctx) == 2:
+                read, write = ctx
+            else:
+                read, write, *_ = ctx
             async with ClientSession(read, write) as session:
                 try:
                     await session.initialize()
-
                     call_result = await call_openai_tool(
-                        session=session,
-                        openai_tool=response,
+                        session=session, openai_tool=response
                     )
-
                     if output_type == "json":
                         out = call_result.model_dump_json(indent=4)
                     elif output_type == "dict":
@@ -470,19 +722,21 @@ async def _execute_tool_call_simple(
                                     f"{key}: {value}"
                                 )
                         out = "\n".join(formatted_lines)
-
+                    else:
+                        out = call_result.model_dump()
+                    logger.info(
+                        f"Tool call executed successfully for {server_path}"
+                    )
                     return out
-
                 except Exception as e:
                     logger.error(f"Error in tool execution: {str(e)}")
+                    tool_name = (
+                        response.get("function", {}).get("name", "unknown")
+                        if isinstance(response, dict)
+                        else getattr(getattr(response, "function", None), "name", "unknown")
+                    )
                     raise MCPExecutionError(
-                        f"Tool execution failed: {str(e)}"
+                        f"Tool execution failed for tool '{tool_name}' on server '{url}': {str(e)}"
                     )
-
     except Exception as e:
-        logger.error(f"Error in SSE client connection: {str(e)}")
+        logger.error(f"Error in MCP client connection: {str(e)}")
         raise MCPConnectionError(
-            f"Failed to connect to MCP server: {str(e)}"
+            f"Failed to connect to MCP server '{url}' using transport '{transport}': {str(e)}"
         )
 
 
@@ -491,17 +745,34 @@ async def execute_tool_call_simple(
     server_path: str = None,
     connection: Optional[MCPConnection] = None,
     output_type: Literal["json", "dict", "str", "formatted"] = "str",
+    transport: Optional[str] = None,
     *args,
     **kwargs,
 ) -> List[Dict[str, Any]]:
+    """
+    High-level async function to execute a tool call on an MCP server.
+    Args:
+        response (any): The tool call request.
+        server_path (str): The server URL.
+        connection (Optional[MCPConnection]): Optional connection object.
+        output_type (Literal): Output format type.
+        transport (Optional[str]): Transport type. If None, auto-detects.
+    Returns:
+        The tool call result in the specified output format.
+    """
+    logger.info(
+        f"execute_tool_call_simple called for server_path: {server_path}"
+    )
+    if transport is None and exists(server_path):
+        transport = auto_detect_transport(server_path)
     if isinstance(response, str):
         response = json.loads(response)
-
     return await _execute_tool_call_simple(
         response=response,
         server_path=server_path,
         connection=connection,
         output_type=output_type,
+        transport=transport,
         *args,
         **kwargs,
     )
 
 
@@ -511,36 +782,32 @@ def _create_server_tool_mapping(
     urls: List[str],
     connections: List[MCPConnection] = None,
     format: str = "openai",
+    transport: Optional[str] = None,
 ) -> Dict[str, Dict[str, Any]]:
     """
     Create a mapping of function names to server information for all MCP servers.
-
     Args:
-        urls: List of server URLs
-        connections: Optional list of MCPConnection objects
-        format: Format to fetch tools in
-
+        urls (List[str]): List of server URLs.
+        connections (List[MCPConnection]): Optional list of MCPConnection objects.
+        format (str): Format to fetch tools in.
+        transport (Optional[str]): Transport type. If None, auto-detects per URL.
     Returns:
-        Dict mapping function names to server info (url, connection, tool)
+        Dict[str, Dict[str, Any]]: Mapping of function names to server info.
     """
     server_tool_mapping = {}
-
     for i, url in enumerate(urls):
         connection = (
             connections[i]
             if connections and i < len(connections)
             else None
         )
-
         try:
-            # Get tools for this server
             tools = get_mcp_tools_sync(
                 server_path=url,
                 connection=connection,
                 format=format,
+                transport=transport,
             )
-
-            # Create mapping for each tool
             for tool in tools:
                 if isinstance(tool, dict) and "function" in tool:
                     function_name = tool["function"]["name"]
@@ -551,20 +818,17 @@ def _create_server_tool_mapping(
                         "server_index": i,
                     }
                 elif hasattr(tool, "name"):
-                    # Handle MCPTool objects
                     server_tool_mapping[tool.name] = {
                         "url": url,
                         "connection": connection,
                         "tool": tool,
                         "server_index": i,
                     }
-
         except Exception as e:
             logger.warning(
                 f"Failed to fetch tools from server {url}: {str(e)}"
             )
             continue
-
     return server_tool_mapping
 
 
@@ -572,36 +836,37 @@ async def _create_server_tool_mapping_async(
     urls: List[str],
     connections: List[MCPConnection] = None,
     format: str = "openai",
+    transport: Optional[str] = None,
 ) -> Dict[str, Dict[str, Any]]:
     """
     Async version: Create a mapping of function names to server information for all MCP servers.
-
     Args:
-        urls: List of server URLs
-        connections: Optional list of MCPConnection objects
-        format: Format to fetch tools in
-
+        urls (List[str]): List of server URLs.
+        connections (List[MCPConnection]): Optional list of MCPConnection objects.
+        format (str): Format to fetch tools in.
+        transport (Optional[str]): Transport type. If None, auto-detects per URL.
     Returns:
-        Dict mapping function names to server info (url, connection, tool)
+        Dict[str, Dict[str, Any]]: Mapping of function names to server info.
     """
     server_tool_mapping = {}
-
     for i, url in enumerate(urls):
         connection = (
             connections[i]
             if connections and i < len(connections)
            else None
         )
-
         try:
-            # Get tools for this server using async function
+            if transport is None:
+                transport_to_use = auto_detect_transport(url)
+            else:
+                transport_to_use = transport
+
             tools = await aget_mcp_tools(
                 server_path=url,
                 connection=connection,
                 format=format,
+                transport=transport_to_use,
             )
-
-            # Create mapping for each tool
             for tool in tools:
                 if isinstance(tool, dict) and "function" in tool:
                     function_name = tool["function"]["name"]
@@ -612,20 +877,17 @@ async def _create_server_tool_mapping_async(
                         "server_index": i,
                     }
                 elif hasattr(tool, "name"):
-                    # Handle MCPTool objects
                     server_tool_mapping[tool.name] = {
                         "url": url,
                         "connection": connection,
                         "tool": tool,
                         "server_index": i,
                     }
-
         except Exception as e:
             logger.warning(
                 f"Failed to fetch tools from server {url}: {str(e)}"
             )
             continue
-
     return server_tool_mapping
 
 
@@ -633,17 +895,17 @@ async def _execute_tool_on_server(
     tool_call: Dict[str, Any],
     server_info: Dict[str, Any],
     output_type: Literal["json", "dict", "str", "formatted"] = "str",
+    transport: Optional[str] = None,
 ) -> Dict[str, Any]:
     """
     Execute a single tool call on a specific server.
-
     Args:
-        tool_call: The tool call to execute
-        server_info: Server information from the mapping
-        output_type: Output format type
-
+        tool_call (Dict[str, Any]): The tool call to execute.
+        server_info (Dict[str, Any]): Server information from the mapping.
+        output_type (Literal): Output format type.
+        transport (Optional[str]): Transport type. If None, auto-detects.
     Returns:
-        Execution result with server metadata
+        Dict[str, Any]: Execution result with server metadata.
     """
     try:
         result = await _execute_tool_call_simple(
@@ -651,8 +913,8 @@ async def _execute_tool_on_server(
             server_path=server_info["url"],
             connection=server_info["connection"],
             output_type=output_type,
+            transport=transport,
         )
-
         return {
             "server_url": server_info["url"],
             "server_index": server_info["server_index"],
@@ -662,7 +924,6 @@ async def _execute_tool_on_server(
             "result": result,
             "status": "success",
         }
-
     except Exception as e:
         logger.error(
             f"Failed to execute tool on server {server_info['url']}: {str(e)}"
@@ -674,7 +935,7 @@ async def _execute_tool_on_server(
                 "name", "unknown"
             ),
             "result": None,
-            "error": str(e),
+            "error": f"Failed to execute tool '{tool_call.get('function', {}).get('name', 'unknown')}' on server '{server_info['url']}': {str(e)}",
             "status": "error",
         }
 
 
@@ -685,49 +946,21 @@ async def execute_multiple_tools_on_multiple_mcp_servers(
     connections: List[MCPConnection] = None,
     output_type: Literal["json", "dict", "str", "formatted"] = "str",
     max_concurrent: Optional[int] = None,
+    transport: Optional[str] = None,
     *args,
     **kwargs,
 ) -> List[Dict[str, Any]]:
     """
     Execute multiple tool calls across multiple MCP servers.
-
-    This function creates a mapping of function names to servers, then for each response
-    that contains tool calls, it finds the appropriate server for each function and
-    executes the calls concurrently.
-
     Args:
-        responses: List of responses containing tool calls (OpenAI format)
-        urls: List of MCP server URLs
-        connections: Optional list of MCPConnection objects corresponding to each URL
-        output_type: Output format type for results
-        max_concurrent: Maximum number of concurrent executions (default: len(responses))
-
+        responses (List[Dict[str, Any]]): List of tool call requests.
+        urls (List[str]): List of server URLs.
+        connections (List[MCPConnection]): Optional list of MCPConnection objects.
+        output_type (Literal): Output format type.
+        max_concurrent (Optional[int]): Max concurrent tasks.
+        transport (Optional[str]): Transport type. If None, auto-detects per URL.
     Returns:
-        List of execution results with server metadata
-
-    Example:
-        # Example responses format:
-        responses = [
-            {
-                "function": {
-                    "name": "search_web",
-                    "arguments": {"query": "python programming"}
-                }
-            },
-            {
-                "function": {
-                    "name": "search_database",
-                    "arguments": {"table": "users", "id": 123}
-                }
-            }
-        ]
-
-        urls = ["http://server1:8000", "http://server2:8000"]
-
-        results = await execute_multiple_tools_on_multiple_mcp_servers(
-            responses=responses,
-            urls=urls
-        )
+        List[Dict[str, Any]]: List of execution results.
+
+    Example (condensed from the original docstring; URLs are illustrative):
+        responses = [
+            {
+                "function": {
+                    "name": "search_web",
+                    "arguments": {"query": "python programming"}
+                }
+            }
+        ]
+        urls = ["http://server1:8000", "http://server2:8000"]
+        results = await execute_multiple_tools_on_multiple_mcp_servers(
+            responses=responses, urls=urls
+        )
     """
     if not responses:
         logger.warning("No responses provided for execution")
         return []
 
     if not urls:
         raise MCPValidationError("No server URLs provided")
 
-    # Create mapping of function names to servers using async version
-    logger.info(f"Creating tool mapping for {len(urls)} servers")
+    logger.info(
+        f"Creating tool mapping for {len(urls)} servers"
+    )
     server_tool_mapping = await _create_server_tool_mapping_async(
-        urls=urls, connections=connections, format="openai"
+        urls=urls,
+        connections=connections,
+        format="openai",
+        transport=transport,
     )
 
     if not server_tool_mapping:
        raise MCPValidationError(
            "No tools found on any of the provided servers"
        )

    logger.info(
         f"Found {len(server_tool_mapping)} unique functions across all servers"
     )
 
-    # Extract all tool calls from responses
     all_tool_calls = []
     logger.info(
         f"Processing {len(responses)} responses for tool call extraction"
     )
 
-    # Check if responses are individual characters that need to be reconstructed
     if len(responses) > 10 and all(
         isinstance(r, str) and len(r) == 1 for r in responses
     ):
        logger.info(
            "Detected character-split response, attempting to reconstruct"
        )
        try:
            reconstructed_response = "".join(responses)
            logger.debug(
                f"Reconstructed response: {reconstructed_response}"
            )
-
-        # Try to parse the reconstructed response to validate it
            try:
                json.loads(reconstructed_response)
                logger.info(
                    "Reconstructed response is valid JSON"
                )
            except json.JSONDecodeError as e:
                logger.warning(
                    f"Reconstructed response is not valid JSON: {str(e)}"
                )
                logger.debug(
                    f"Last 100 chars: {reconstructed_response[-100:]}"
                )
-
            responses = [reconstructed_response]
        except Exception as e:
            logger.warning(
                f"Failed to reconstruct response from characters: {str(e)}"
            )
 
    for i, response in enumerate(responses):
        logger.debug(
            f"Processing response {i}: {type(response)} - {response}"
        )
-
-        # Handle JSON string responses
        if isinstance(response, str):
            try:
                response = json.loads(response)
            except json.JSONDecodeError:
                logger.warning(
                    f"Failed to parse response {i} as JSON, skipping"
                )
                continue
 
        if isinstance(response, dict):
-            # Single tool call
            if "function" in response:
                logger.debug(
                    f"Found single tool call in response {i}: {response['function']}"
                )
-                # Parse arguments if they're a JSON string
                if isinstance(
                    response["function"].get("arguments"), str
                ):
                    try:
                        response["function"]["arguments"] = json.loads(
                            response["function"]["arguments"]
                        )
                    except json.JSONDecodeError:
                        logger.warning(
                            f"Failed to parse function arguments: {response['function']['arguments']}"
                        )
-
                all_tool_calls.append((i, response))
-            # Multiple tool calls
            elif "tool_calls" in response:
                logger.debug(
                    f"Found multiple tool calls in response {i}: {len(response['tool_calls'])} calls"
                )
                for tool_call in response["tool_calls"]:
-                    # Parse arguments if they're a JSON string
                    if isinstance(
                        tool_call.get("function", {}).get(
                            "arguments"
                        ),
                        str,
                    ):
                        try:
                            tool_call["function"]["arguments"] = json.loads(
                                tool_call["function"]["arguments"]
                            )
                        except json.JSONDecodeError:
                            logger.warning(
                                f"Failed to parse tool call arguments: {tool_call['function']['arguments']}"
                            )
-
                    all_tool_calls.append((i, tool_call))
-            # Direct tool call
            elif "name" in response and "arguments" in response:
                logger.debug(
                    f"Found direct tool call in response {i}: {response}"
                )
-                # Parse arguments if they're a JSON string
                if isinstance(response.get("arguments"), str):
                    try:
                        response["arguments"] = json.loads(
                            response["arguments"]
                        )
                    except json.JSONDecodeError:
                        logger.warning(
                            f"Failed to parse direct tool call arguments: {response['arguments']}"
                        )
-
                all_tool_calls.append((i, {"function": response}))
            else:
                logger.debug(
                    f"Response {i} does not contain a recognizable tool call format"
                )
     logger.info(f"Found {len(all_tool_calls)} tool calls to execute")
 
-    # Execute tool calls concurrently
     max_concurrent = max_concurrent or len(all_tool_calls)
     semaphore = asyncio.Semaphore(max_concurrent)
 
     async def execute_with_semaphore(tool_call_info):
         async with semaphore:
             response_index, tool_call = tool_call_info
             function_name = tool_call.get("function", {}).get(
                 "name", "unknown"
             )
-
             if function_name not in server_tool_mapping:
                 logger.warning(
                     f"Function '{function_name}' not found on any server"
                 )
                return {
                    "response_index": response_index,
                    "function_name": function_name,
                    "result": None,
                    "error": f"Function '{function_name}' not found on any server",
                    "status": "not_found",
                }

            server_info = server_tool_mapping[function_name]
             result = await _execute_tool_on_server(
                 tool_call=tool_call,
                 server_info=server_info,
                 output_type=output_type,
+                transport=transport,
             )
             result["response_index"] = response_index
             return result
 
-    # Execute all tool calls concurrently
     tasks = [
         execute_with_semaphore(tool_call_info)
         for tool_call_info in all_tool_calls
     ]
+
     results = await asyncio.gather(*tasks, return_exceptions=True)
 
-    # Process results and handle exceptions
     processed_results = []
     for i, result in enumerate(results):
         if isinstance(result, Exception):
@@ -977,24 +1196,36 @@ def execute_multiple_tools_on_multiple_mcp_servers_sync(
     connections: List[MCPConnection] = None,
     output_type: Literal["json", "dict", "str", "formatted"] = "str",
     max_concurrent: Optional[int] = None,
+    transport: Optional[str] = None,
     *args,
     **kwargs,
 ) -> List[Dict[str, Any]]:
     """
     Synchronous version of execute_multiple_tools_on_multiple_mcp_servers.
-
+    Modified to handle running event loops better.
     Args:
-        responses: List of responses containing tool calls (OpenAI format)
-        urls: List of MCP server URLs
-        connections: Optional list of MCPConnection objects corresponding to each URL
-        output_type: Output format type for results
-        max_concurrent: Maximum number of concurrent executions
-
+        responses (List[Dict[str, Any]]): List of tool call requests.
+        urls (List[str]): List of server URLs.
+        connections (List[MCPConnection]): Optional list of MCPConnection objects.
+        output_type (Literal): Output format type.
+        max_concurrent (Optional[int]): Max concurrent tasks.
+        transport (Optional[str]): Transport type. If None, auto-detects per URL.
     Returns:
-        List of execution results with server metadata
+        List[Dict[str, Any]]: List of execution results.
     """
-    with get_or_create_event_loop() as loop:
+    try:
+        # Check whether we are inside a running event loop
         try:
+            loop = asyncio.get_event_loop()
+            loop_is_running = loop.is_running()
+        except RuntimeError:
+            loop_is_running = False
+            loop = None
+
+        # If the loop is already running and nest_asyncio is available, use it
+        if loop_is_running and HAS_NEST_ASYNCIO:
+            logger.debug("Using nest_asyncio with running event loop for multiple tools")
+            nest_asyncio.apply(loop)
             return loop.run_until_complete(
                 execute_multiple_tools_on_multiple_mcp_servers(
                     responses=responses,
                     urls=urls,
                     connections=connections,
                     output_type=output_type,
                     max_concurrent=max_concurrent,
+                    transport=transport,
                     *args,
                     **kwargs,
                 )
             )
-        except Exception as e:
-            logger.error(
-                f"Error in execute_multiple_tools_on_multiple_mcp_servers_sync: {str(e)}"
-            )
-            raise MCPExecutionError(
-                f"Failed to execute multiple tools sync: {str(e)}"
+        # If the loop is running but nest_asyncio is not available, use a thread
+        elif loop_is_running:
+            logger.debug("Event loop is running, executing multiple tools in separate thread")
+            return _run_in_new_thread(
+                execute_multiple_tools_on_multiple_mcp_servers,
+                responses=responses,
+                urls=urls,
+                connections=connections,
+                output_type=output_type,
+                max_concurrent=max_concurrent,
+                transport=transport,
+                *args,
+                **kwargs,
             )
+        # Standard case: not inside a running event loop
+        else:
+            logger.debug("Using standard event loop management for multiple tools")
+            with get_or_create_event_loop() as loop:
+                return loop.run_until_complete(
+                    execute_multiple_tools_on_multiple_mcp_servers(
+                        responses=responses,
+                        urls=urls,
+                        connections=connections,
+                        output_type=output_type,
+                        max_concurrent=max_concurrent,
+                        transport=transport,
+                        *args,
+                        **kwargs,
+                    )
+                )
+    except Exception as e:
+        logger.error(
+            f"Error in execute_multiple_tools_on_multiple_mcp_servers_sync: {str(e)}"
+        )
+        raise MCPExecutionError(
+            f"Failed to execute multiple tools sync: {str(e)}"
+        )
\ No newline at end of file