From ea66e78154f5ef7e70a5e90ff68ce1e5225ae5b8 Mon Sep 17 00:00:00 2001 From: Pavan Kumar <66913595+ascender1729@users.noreply.github.com> Date: Sun, 20 Apr 2025 16:26:05 +0000 Subject: [PATCH] fix(mcp): resolve async execution flow issues in client and agent --- swarms/structs/agent.py | 94 ++++++-------------------------------- swarms/tools/mcp_client.py | 15 +++--- 2 files changed, 19 insertions(+), 90 deletions(-) diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index fc4af738..5cceabc7 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -255,7 +255,7 @@ class Agent: run_async_concurrent: Run the agent asynchronously and concurrently run_async_concurrent: Run the agent asynchronously and concurrently construct_dynamic_prompt: Construct the dynamic prompt - handle_artifacts: Handle artifacts + handle_artifacts Examples: @@ -841,6 +841,8 @@ class Agent: f"Initializing Agent: {self.agent_name}" ) + ) + data = self.to_dict() # Beautify the data @@ -2645,88 +2647,18 @@ class Agent: else: return str(response) - def mcp_execution_flow(self, response: str) -> str: - """Forward tool calls to MCP servers with support for various input formats. - - Args: - response (str): The response from the LLM containing tool calls or natural language. - - Returns: - str: The result of executing the tool calls with preserved formatting. - """ + async def mcp_execution_flow(self, tool_call): + """Execute MCP tool call flow""" try: - # Try to parse as JSON first - try: - tool_calls = json.loads(response) - is_json = True - except json.JSONDecodeError: - # If not JSON, treat as natural language - tool_calls = [response] - is_json = False - - # Execute tool calls against MCP servers - results = [] - errors = [] - - # Handle both single tool call and array of tool calls - if isinstance(tool_calls, dict): - tool_calls = [tool_calls] - - for tool_call in tool_calls: - try: - # Execute the tool call against all MCP servers - result = batch_mcp_flow(self.mcp_servers, tool_call) - if result: - results.extend(result) - # Add successful result to memory with context - self.short_memory.add( - role="assistant", - content=f"Tool execution result: {result}" - ) - else: - error_msg = "No result from tool execution" - errors.append(error_msg) - self.short_memory.add( - role="error", - content=error_msg - ) - - except Exception as e: - error_msg = f"Error executing tool call: {str(e)}" - errors.append(error_msg) - logger.error(error_msg) - self.short_memory.add( - role="error", - content=error_msg - ) - - # Format the final response - if results: - if len(results) == 1: - # For single results, return as is to preserve formatting - return results[0] - else: - # For multiple results, combine with context - formatted_results = [] - for i, result in enumerate(results, 1): - formatted_results.append(f"Result {i}: {result}") - return "\n".join(formatted_results) - elif errors: - if len(errors) == 1: - return errors[0] - else: - return "Multiple errors occurred:\n" + "\n".join(f"- {err}" for err in errors) - else: - return "No results or errors returned" - - except Exception as e: - error_msg = f"Error in MCP execution flow: {str(e)}" - logger.error(error_msg) - self.short_memory.add( - role="error", - content=error_msg + result = await execute_mcp_tool( + url=self.mcp_servers[0]["url"], + parameters=tool_call, + output_type="str", ) - return error_msg + return result + except Exception as e: + logger.error(f"Error executing tool call: {e}") + return f"Error executing tool call: {e}" def sentiment_and_evaluator(self, response: str): if self.evaluator: diff --git a/swarms/tools/mcp_client.py b/swarms/tools/mcp_client.py index 5d25b33d..c424b925 100644 --- a/swarms/tools/mcp_client.py +++ b/swarms/tools/mcp_client.py @@ -73,19 +73,16 @@ async def _execute_mcp_tool( raise ValueError(f"Invalid output type: {output_type}") -def execute_mcp_tool( +async def execute_mcp_tool( url: str, tool_name: str = None, method: Literal["stdio", "sse"] = "sse", parameters: Dict[Any, Any] = None, output_type: Literal["str", "dict"] = "str", ) -> Dict[Any, Any]: - return asyncio.run( - _execute_mcp_tool( - url=url, - tool_name=tool_name, - method=method, - parameters=parameters, - output_type=output_type, - ) + return await _execute_mcp_tool( + url=url, + method=method, + parameters=parameters, + output_type=output_type, )