Merge pull request #1122 from IlumCI/aop-health

[EXAMPLE-AOP][Created AOP example with Medical Agents]
pull/1107/head
Kye Gomez 2 months ago committed by GitHub
commit 24b4288943
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,171 @@
# Medical AOP Example
A real-world demonstration of the Agent Orchestration Protocol (AOP) using medical agents deployed as MCP tools.
## Overview
This example showcases how to:
- Deploy multiple medical agents as MCP tools via AOP
- Use discovery tools for dynamic agent collaboration
- Execute real tool calls with structured schemas
- Integrate with keyless APIs for enhanced context
## Architecture
```mermaid
graph LR
A[Medical Agents] --> B[AOP MCP Server<br/>Port 8000]
B --> C[Client<br/>Cursor/Python]
B --> D[Discovery Tools]
B --> E[Tool Execution]
subgraph "Medical Agents"
F[Chief Medical Officer]
G[Virologist]
H[Internist]
I[Medical Coder]
J[Diagnostic Synthesizer]
end
A --> F
A --> G
A --> H
A --> I
A --> J
```
### Medical Agents
- **Chief Medical Officer**: Coordination, diagnosis, triage
- **Virologist**: Viral disease analysis and ICD-10 coding
- **Internist**: Internal medicine evaluation and HCC tagging
- **Medical Coder**: ICD-10 code assignment and compliance
- **Diagnostic Synthesizer**: Final report synthesis with confidence levels
## Files
| File | Description |
|------|-------------|
| `medical_aop/server.py` | AOP server exposing medical agents as MCP tools |
| `medical_aop/client.py` | Discovery client with real tool execution |
| `README.md` | This documentation |
## Usage
### 1. Start the AOP Server
```bash
python -m examples.aop_examples.medical_aop.server
```
### 2. Configure Cursor MCP Integration
Add to `~/.cursor/mcp.json`:
```json
{
"mcpServers": {
"Medical AOP": {
"type": "http",
"url": "http://localhost:8000/mcp"
}
}
}
```
### 3. Use in Cursor
Enable "Medical AOP" in Cursor's MCP settings, then:
#### Discover agents:
```
Call tool discover_agents with: {}
```
#### Execute medical coding:
```
Call tool Medical Coder with: {"task":"Patient: 45M, egfr 59 ml/min/1.73; non-African American. Provide ICD-10 suggestions and coding notes.","priority":"normal","include_images":false}
```
#### Review infection control:
```
Call tool Chief Medical Officer with: {"task":"Review current hospital infection control protocols in light of recent MRSA outbreak in ICU. Provide executive summary, policy adjustment recommendations, and estimated implementation costs.","priority":"high"}
```
### 4. Run Python Client
```bash
python -m examples.aop_examples.medical_aop.client
```
## Features
### Structured Schemas
- Custom input/output schemas with validation
- Priority levels (low/normal/high)
- Image processing support
- Confidence scoring
### Discovery Tools
| Tool | Description |
|------|-------------|
| `discover_agents` | List all available agents |
| `get_agent_details` | Detailed agent information |
| `search_agents` | Keyword-based agent search |
| `list_agents` | Simple agent name list |
### Real-world Integration
- Keyless API integration (disease.sh for epidemiology data)
- Structured medical coding workflows
- Executive-level policy recommendations
- Cost estimation and implementation timelines
## Response Format
All tools return consistent JSON:
```json
{
"result": "Agent response text",
"success": true,
"error": null,
"confidence": 0.95,
"codes": ["N18.3", "Z51.11"]
}
```
## Configuration
### Server Settings
| Setting | Value |
|---------|-------|
| Port | 8000 |
| Transport | streamable-http |
| Timeouts | 40-50 seconds per agent |
| Logging | INFO level with traceback enabled |
### Agent Metadata
Each agent includes:
- Tags for categorization
- Capabilities for matching
- Role classification
- Model configuration
## Best Practices
1. **Use structured inputs**: Leverage the custom schemas for better results
2. **Chain agents**: Pass results between agents for comprehensive analysis
3. **Monitor timeouts**: Adjust based on task complexity
4. **Validate responses**: Check the `success` field in all responses
5. **Use discovery**: Query available agents before hardcoding tool names
## Troubleshooting
| Issue | Solution |
|-------|----------|
| Connection refused | Ensure server is running on port 8000 |
| Tool not found | Use `discover_agents` to verify available tools |
| Timeout errors | Increase timeout values for complex tasks |
| Schema validation | Ensure input matches the defined JSON schema |
## References
- [AOP Reference](https://docs.swarms.world/en/latest/swarms/structs/aop/)
- [MCP Integration](https://docs.swarms.ai/examples/mcp-integration)
- [Protocol Overview](https://docs.swarms.world/en/latest/protocol/overview/)

@ -428,6 +428,9 @@ nav:
- Web Scraper Agents: "developer_guides/web_scraper.md" - Web Scraper Agents: "developer_guides/web_scraper.md"
- Smart Database: "examples/smart_database.md" - Smart Database: "examples/smart_database.md"
- AOP:
- Medical AOP Example: "examples/aop_medical.md"
- Swarms Cloud API: - Swarms Cloud API:
- Overview: "swarms_cloud/migration.md" - Overview: "swarms_cloud/migration.md"

@ -0,0 +1,113 @@
import asyncio
import json
from typing import Dict
import requests
from swarms.structs.aop import AOPCluster
from swarms.tools.mcp_client_tools import execute_tool_call_simple
def _select_tools_by_keyword(tools: list, keyword: str) -> list:
"""
Return tools whose name or description contains the keyword
(case-insensitive).
"""
kw = keyword.lower()
selected = []
for t in tools:
name = t.get("function", {}).get("name", "")
desc = t.get("function", {}).get("description", "")
if kw in name.lower() or kw in desc.lower():
selected.append(t)
return selected
def _example_payload_from_schema(tools: list, tool_name: str) -> dict:
"""
Construct a minimal example payload for a given tool using its JSON schema.
Falls back to a generic 'task' if schema not present.
"""
for t in tools:
fn = t.get("function", {})
if fn.get("name") == tool_name:
schema = fn.get("parameters", {})
required = schema.get("required", [])
props = schema.get("properties", {})
payload = {}
for r in required:
if r in props:
if props[r].get("type") == "string":
payload[r] = (
"Example patient case: 45M, egfr 59 ml/min/1.73"
)
elif props[r].get("type") == "boolean":
payload[r] = False
else:
payload[r] = None
if not payload:
payload = {
"task": "Provide ICD-10 suggestions for the case above"
}
return payload
return {"task": "Provide ICD-10 suggestions for the case above"}
def main() -> None:
cluster = AOPCluster(
urls=["http://localhost:8000/mcp"],
transport="streamable-http",
)
tools = cluster.get_tools(output_type="dict")
print(f"Tools: {len(tools)}")
coding_tools = _select_tools_by_keyword(tools, "coder")
names = [t.get("function", {}).get("name") for t in coding_tools]
print(f"Coding-related tools: {names}")
# Build a real payload for "Medical Coder" and execute the tool call
tool_name = "Medical Coder"
payload: Dict[str, object] = _example_payload_from_schema(tools, tool_name)
# Enrich with public keyless data (epidemiology context via disease.sh)
try:
epi = requests.get(
"https://disease.sh/v3/covid-19/countries/USA?strict=true",
timeout=5,
)
if epi.ok:
data = epi.json()
epi_summary = (
f"US COVID-19 context: cases={data.get('cases')}, "
f"todayCases={data.get('todayCases')}, deaths={data.get('deaths')}"
)
base_task = payload.get("task") or ""
payload["task"] = (
f"{base_task}\n\nEpidemiology context (no key API): {epi_summary}"
)
except Exception:
pass
print("Calling tool:", tool_name)
request = {
"function": {
"name": tool_name,
"arguments": payload,
}
}
result = asyncio.run(
execute_tool_call_simple(
response=request,
server_path="http://localhost:8000/mcp",
output_type="json",
transport="streamable-http",
verbose=False,
)
)
print("Response:")
print(result)
if __name__ == "__main__":
main()

@ -0,0 +1,166 @@
# Import medical agents defined in the demo module
from examples.demos.medical.medical_coder_agent import (chief_medical_officer,
internist,
medical_coder,
synthesizer,
virologist)
from swarms.structs.aop import AOP
def _enrich_agents_metadata() -> None:
"""
Add lightweight tags/capabilities/roles to imported agents for
better discovery results.
"""
chief_medical_officer.tags = [
"coordination",
"diagnosis",
"triage",
]
chief_medical_officer.capabilities = [
"case-intake",
"differential",
"planning",
]
chief_medical_officer.role = "coordinator"
virologist.tags = ["virology", "infectious-disease"]
virologist.capabilities = ["viral-analysis", "icd10-suggestion"]
virologist.role = "specialist"
internist.tags = ["internal-medicine", "evaluation"]
internist.capabilities = [
"system-review",
"hcc-tagging",
"risk-stratification",
]
internist.role = "specialist"
medical_coder.tags = ["coding", "icd10", "compliance"]
medical_coder.capabilities = [
"code-assignment",
"documentation-review",
]
medical_coder.role = "coder"
synthesizer.tags = ["synthesis", "reporting"]
synthesizer.capabilities = [
"evidence-reconciliation",
"final-report",
]
synthesizer.role = "synthesizer"
def _medical_input_schema() -> dict:
return {
"type": "object",
"properties": {
"task": {
"type": "string",
"description": "Patient case or instruction for the agent",
},
"priority": {
"type": "string",
"enum": ["low", "normal", "high"],
"description": "Processing priority",
},
"include_images": {
"type": "boolean",
"description": "Whether to consider linked images if provided",
"default": False,
},
"img": {
"type": "string",
"description": "Optional image path/URL",
},
"imgs": {
"type": "array",
"items": {"type": "string"},
"description": "Optional list of images",
},
},
"required": ["task"],
"additionalProperties": False,
}
def _medical_output_schema() -> dict:
return {
"type": "object",
"properties": {
"result": {"type": "string"},
"success": {"type": "boolean"},
"error": {"type": "string"},
"confidence": {
"type": "number",
"minimum": 0,
"maximum": 1,
"description": "Optional confidence in the assessment",
},
"codes": {
"type": "array",
"items": {"type": "string"},
"description": "Optional list of suggested ICD-10 codes",
},
},
"required": ["result", "success"],
"additionalProperties": True,
}
def main() -> None:
"""
Start an AOP MCP server that exposes the medical agents as tools with
structured schemas and per-agent settings.
"""
_enrich_agents_metadata()
deployer = AOP(
server_name="Medical-AOP-Server",
port=8000,
verbose=False,
traceback_enabled=True,
log_level="INFO",
transport="streamable-http",
)
input_schema = _medical_input_schema()
output_schema = _medical_output_schema()
# Register each agent with a modest, role-appropriate timeout
deployer.add_agent(
chief_medical_officer,
timeout=45,
input_schema=input_schema,
output_schema=output_schema,
)
deployer.add_agent(
virologist,
timeout=40,
input_schema=input_schema,
output_schema=output_schema,
)
deployer.add_agent(
internist,
timeout=40,
input_schema=input_schema,
output_schema=output_schema,
)
deployer.add_agent(
medical_coder,
timeout=50,
input_schema=input_schema,
output_schema=output_schema,
)
deployer.add_agent(
synthesizer,
timeout=45,
input_schema=input_schema,
output_schema=output_schema,
)
deployer.run()
if __name__ == "__main__":
main()
Loading…
Cancel
Save