fix(agent): resolve communication bug by correcting async operations and context management

pull/819/head
DP37 3 months ago committed by ascender1729
parent ff19580da5
commit 9eeb6256bd

@@ -0,0 +1,114 @@
You've fixed the *RuntimeWarning* by wrapping the coroutine, but now the call never actually reaches the mock server.
Two things are still wrong:
| symptom | real cause |
|---------|------------|
| **`__aenter__`** error from `process_param` | `MCPServerSse` *is* an async context manager; you must use **`async with`** (or call its `connect`/`cleanup` explicitly; see the sketch below the table). |
| **“No result returned from MCP server”** | we never awaited `call_tool`, so the request was never sent; the wrapper returned `None`. |
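If you prefer not to use `async with`, here is a minimal sketch of the explicit lifecycle, using the `connect`/`cleanup` methods the table refers to (the exact signatures are an assumption based on what the context manager wraps):

```python
# Sketch only: manual lifecycle management instead of `async with`.
# Assumes connect()/cleanup() are the awaitables MCPServerSse's context manager calls.
async def call_without_async_with(param: MCPServerSseParams, payload: dict) -> dict:
    srv = MCPServerSse(param, cache_tools_list=True)
    await srv.connect()                         # open the SSE session
    try:
        return await srv.call_tool(payload)     # actually send the request
    finally:
        await srv.cleanup()                     # always close the session
```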
Below is a minimal, fully-working patch set.
Apply it and the *MathMockServer* will answer correctly (e.g. `add 4 and 19834 → 19838`).
---
## 1  `tools/mcp_integration.py`
```python
# keep the import you already added
import asyncio
from typing import Any   # needed for the type hints below

# ────────────────────────────────────────────────────────────
# 1. async helper that really talks to one MCP server
async def _call_one_server(param: MCPServerSseParams,
                           payload: dict[str, Any]) -> Any:
    async with MCPServerSse(param, cache_tools_list=True) as srv:
        # fastmcp returns a pydantic model; make it JSON-serialisable
        res = await srv.call_tool(payload)
        try:
            return res.model_dump()   # fastmcp ≥ 0.2
        except AttributeError:
            return res                # plain dict / str

# 2. public sync wrapper: what the Agent imports & uses
def batch_mcp_flow(params: list[MCPServerSseParams],
                   payload: dict[str, Any]) -> list[Any]:
    """Blocking helper: fan the payload out to every MCP server in *params*."""
    return asyncio.run(_batch(params, payload))

# 3. small async fan-out
async def _batch(params: list[MCPServerSseParams],
                 payload: dict[str, Any]) -> list[Any]:
    coros = [_call_one_server(p, payload) for p in params]
    return await asyncio.gather(*coros)
```
---
## 2  `structs/agent.py`
Replace **only** the `mcp_execution_flow` method with the synchronous
wrapper below, which now returns the real value (note: the LLM `response` arrives as a *string* and is parsed into a dict payload here):
```python
# inside class Agent
def mcp_execution_flow(self, response: str) -> str:
    """
    Forward the JSON tool-call coming from the LLM to all MCP servers
    listed in self.mcp_servers.
    """
    try:
        payload = json.loads(response)              # {"tool_name": …}
        results = batch_mcp_flow(self.mcp_servers, payload)
        # `batch_mcp_flow` already blocks, so `results` is a plain list
        return any_to_str(results[0] if len(results) == 1 else results)
    except Exception as err:
        logger.error(f"MCP flow failed: {err}")
        return f"[MCP-error] {err}"
```
*(you may keep the rest of your big `Agent` class unchanged)*
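For illustration only, here is a hypothetical tool-call string the LLM might emit for the math example; the exact field names depend on your prompt/tool schema and are an assumption here:

```python
# Hypothetical LLM output; the field names are illustrative, not a fixed schema.
response = '{"tool_name": "add", "arguments": {"a": 4, "b": 19834}}'

# agent.mcp_execution_flow(response) parses it with json.loads and fans it
# out to every server in agent.mcp_servers, returning the result as a string.
```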
---
## 3  Tiny client-side polish
When you started the mock server you exposed SSE at `http://0.0.0.0:8000`.
That's fine: the fastmcp helper discovers the SSE endpoint automatically,
so the URL in `MCPServerSseParams` is correct.
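As a sketch only, the client-side configuration could look like the snippet below; the `url` field name is an assumption about `MCPServerSseParams`, so check its actual definition in `tools/mcp_integration.py`:

```python
# Assumption: MCPServerSseParams takes the server's base URL; fastmcp then
# discovers the SSE endpoint from it automatically.
math_server = MCPServerSseParams(url="http://0.0.0.0:8000")

agent = Agent(mcp_servers=[math_server])  # plus your usual Agent kwargs
```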
---
## 4  Quick smoke test
1. **Start the mock server** (in one shell):
```bash
python math_mock_server.py # prints “Starting Mock Math Server…”
```
2. **Run the client** (your `mcp_client.py`) in another shell:
```
Enter your query (or 'exit' to quit): add 4 and 19834
Math Agent Response: 19838
```
No warning, no `[MCP-error]`, and you get the right answer.
---
### Why this works
* `async with MCPServerSse(...) as srv:` guarantees `connect()` and `cleanup()` are
called and awaited.
* `await srv.call_tool(payload)` actually sends the JSON-RPC request.
* `asyncio.run()` in the wrapper lets the rest of your Agent stay 100 % synchronous.
* Any exception bubbles up, gets logged, and your Agent sees
`[MCP-error] …` instead of a mysterious `None`.

You can now add more servers to `self.mcp_servers`; they'll all be invoked in
parallel, and the first element of the returned list will always hold the
result from the first server (adapt as needed).
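A brief sketch of that fan-out (the second server and the payload fields are hypothetical):

```python
# Both servers are called in parallel by batch_mcp_flow.
agent.mcp_servers = [math_server, another_server]   # another_server is hypothetical
results = batch_mcp_flow(agent.mcp_servers,
                         {"tool_name": "add", "arguments": {"a": 1, "b": 2}})
first = results[0]   # result from math_server, the first entry in the list
```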
Happy calculating!
