fix(mcp): resolve async execution flow issues in client and agent

pull/819/head
Pavan Kumar 3 months ago committed by ascender1729
parent d9f1144d36
commit ea66e78154

@ -255,7 +255,7 @@ class Agent:
run_async_concurrent: Run the agent asynchronously and concurrently
run_async_concurrent: Run the agent asynchronously and concurrently
construct_dynamic_prompt: Construct the dynamic prompt
handle_artifacts: Handle artifacts
handle_artifacts
Examples:
@ -841,6 +841,8 @@ class Agent:
f"Initializing Agent: {self.agent_name}"
)
)
data = self.to_dict()
# Beautify the data
@ -2645,88 +2647,18 @@ class Agent:
else:
return str(response)
def mcp_execution_flow(self, response: str) -> str:
"""Forward tool calls to MCP servers with support for various input formats.
Args:
response (str): The response from the LLM containing tool calls or natural language.
Returns:
str: The result of executing the tool calls with preserved formatting.
"""
async def mcp_execution_flow(self, tool_call):
"""Execute MCP tool call flow"""
try:
# Try to parse as JSON first
try:
tool_calls = json.loads(response)
is_json = True
except json.JSONDecodeError:
# If not JSON, treat as natural language
tool_calls = [response]
is_json = False
# Execute tool calls against MCP servers
results = []
errors = []
# Handle both single tool call and array of tool calls
if isinstance(tool_calls, dict):
tool_calls = [tool_calls]
for tool_call in tool_calls:
try:
# Execute the tool call against all MCP servers
result = batch_mcp_flow(self.mcp_servers, tool_call)
if result:
results.extend(result)
# Add successful result to memory with context
self.short_memory.add(
role="assistant",
content=f"Tool execution result: {result}"
)
else:
error_msg = "No result from tool execution"
errors.append(error_msg)
self.short_memory.add(
role="error",
content=error_msg
)
except Exception as e:
error_msg = f"Error executing tool call: {str(e)}"
errors.append(error_msg)
logger.error(error_msg)
self.short_memory.add(
role="error",
content=error_msg
)
# Format the final response
if results:
if len(results) == 1:
# For single results, return as is to preserve formatting
return results[0]
else:
# For multiple results, combine with context
formatted_results = []
for i, result in enumerate(results, 1):
formatted_results.append(f"Result {i}: {result}")
return "\n".join(formatted_results)
elif errors:
if len(errors) == 1:
return errors[0]
else:
return "Multiple errors occurred:\n" + "\n".join(f"- {err}" for err in errors)
else:
return "No results or errors returned"
except Exception as e:
error_msg = f"Error in MCP execution flow: {str(e)}"
logger.error(error_msg)
self.short_memory.add(
role="error",
content=error_msg
result = await execute_mcp_tool(
url=self.mcp_servers[0]["url"],
parameters=tool_call,
output_type="str",
)
return error_msg
return result
except Exception as e:
logger.error(f"Error executing tool call: {e}")
return f"Error executing tool call: {e}"
def sentiment_and_evaluator(self, response: str):
if self.evaluator:

@ -73,19 +73,16 @@ async def _execute_mcp_tool(
raise ValueError(f"Invalid output type: {output_type}")
def execute_mcp_tool(
async def execute_mcp_tool(
url: str,
tool_name: str = None,
method: Literal["stdio", "sse"] = "sse",
parameters: Dict[Any, Any] = None,
output_type: Literal["str", "dict"] = "str",
) -> Dict[Any, Any]:
return asyncio.run(
_execute_mcp_tool(
url=url,
tool_name=tool_name,
method=method,
parameters=parameters,
output_type=output_type,
)
return await _execute_mcp_tool(
url=url,
method=method,
parameters=parameters,
output_type=output_type,
)

Loading…
Cancel
Save