[IMPROVEMENT][AOP][New tools for agent discovery and more]

pull/1100/head
Kye Gomez 1 week ago
parent 7406320843
commit 7813b59efd

@ -1,326 +0,0 @@
# AOP Server Setup Example
This example demonstrates how to set up an AOP (Agent Orchestration Protocol) server with multiple specialized agents.
## Complete Server Setup
```python
from swarms import Agent
from swarms.structs.aop import AOP
# Create specialized agents
research_agent = Agent(
agent_name="Research-Agent",
agent_description="Expert in research, data collection, and information gathering",
model_name="anthropic/claude-sonnet-4-5",
max_loops=1,
top_p=None,
dynamic_temperature_enabled=True,
system_prompt="""You are a research specialist. Your role is to:
1. Gather comprehensive information on any given topic
2. Analyze data from multiple sources
3. Provide well-structured research findings
4. Cite sources and maintain accuracy
5. Present findings in a clear, organized manner
Always provide detailed, factual information with proper context.""",
)
analysis_agent = Agent(
agent_name="Analysis-Agent",
agent_description="Expert in data analysis, pattern recognition, and generating insights",
model_name="anthropic/claude-sonnet-4-5",
max_loops=1,
top_p=None,
dynamic_temperature_enabled=True,
system_prompt="""You are an analysis specialist. Your role is to:
1. Analyze data and identify patterns
2. Generate actionable insights
3. Create visualizations and summaries
4. Provide statistical analysis
5. Make data-driven recommendations
Focus on extracting meaningful insights from information.""",
)
writing_agent = Agent(
agent_name="Writing-Agent",
agent_description="Expert in content creation, editing, and communication",
model_name="anthropic/claude-sonnet-4-5",
max_loops=1,
top_p=None,
dynamic_temperature_enabled=True,
system_prompt="""You are a writing specialist. Your role is to:
1. Create engaging, well-structured content
2. Edit and improve existing text
3. Adapt tone and style for different audiences
4. Ensure clarity and coherence
5. Follow best practices in writing
Always produce high-quality, professional content.""",
)
code_agent = Agent(
agent_name="Code-Agent",
agent_description="Expert in programming, code review, and software development",
model_name="anthropic/claude-sonnet-4-5",
max_loops=1,
top_p=None,
dynamic_temperature_enabled=True,
system_prompt="""You are a coding specialist. Your role is to:
1. Write clean, efficient code
2. Debug and fix issues
3. Review and optimize code
4. Explain programming concepts
5. Follow best practices and standards
Always provide working, well-documented code.""",
)
financial_agent = Agent(
agent_name="Financial-Agent",
agent_description="Expert in financial analysis, market research, and investment insights",
model_name="anthropic/claude-sonnet-4-5",
max_loops=1,
top_p=None,
dynamic_temperature_enabled=True,
system_prompt="""You are a financial specialist. Your role is to:
1. Analyze financial data and markets
2. Provide investment insights
3. Assess risk and opportunities
4. Create financial reports
5. Explain complex financial concepts
Always provide accurate, well-reasoned financial analysis.""",
)
# Create AOP instance
deployer = AOP(
server_name="MyAgentServer",
port=8000,
verbose=True,
log_level="INFO"
)
# Add all agents at once
agents = [
research_agent,
analysis_agent,
writing_agent,
code_agent,
financial_agent,
]
tool_names = deployer.add_agents_batch(agents)
print(f"Added {len(tool_names)} agents: {tool_names}")
# Display server information
server_info = deployer.get_server_info()
print(f"Server: {server_info['server_name']}")
print(f"Total tools: {server_info['total_tools']}")
print(f"Available tools: {server_info['tools']}")
# Start the server
print("Starting AOP server...")
deployer.run()
```
## Running the Server
1. Save the code above to a file (e.g., `aop_server.py`)
2. Install required dependencies:
```bash
pip install swarms
```
3. Run the server:
```bash
python aop_server.py
```
The server will start on `http://localhost:8000` and make all agents available as MCP tools.
## Tool Usage Examples
Once the server is running, you can call the tools using MCP clients:
### Research Agent
```python
# Call the research agent
result = research_tool(task="Research the latest AI trends in 2024")
print(result)
```
### Analysis Agent with Image
```python
# Call the analysis agent with an image
result = analysis_tool(
task="Analyze this chart and provide insights",
img="path/to/chart.png"
)
print(result)
```
### Writing Agent with Multiple Images
```python
# Call the writing agent with multiple images
result = writing_tool(
task="Write a comprehensive report based on these images",
imgs=["image1.jpg", "image2.jpg", "image3.jpg"]
)
print(result)
```
### Code Agent with Validation
```python
# Call the code agent with expected output
result = code_tool(
task="Debug this Python function",
correct_answer="Expected output: Hello World"
)
print(result)
```
### Financial Agent
```python
# Call the financial agent
result = financial_tool(task="Analyze the current market trends for tech stocks")
print(result)
```
## Response Format
All tools return a standardized response:
```json
{
"result": "The agent's response to the task",
"success": true,
"error": null
}
```
## Advanced Configuration
### Custom Timeouts and Retries
```python
# Add agent with custom configuration
deployer.add_agent(
agent=research_agent,
tool_name="custom_research_tool",
tool_description="Research tool with extended timeout",
timeout=120, # 2 minutes
max_retries=5,
verbose=True
)
```
### Custom Input/Output Schemas
```python
# Define custom schemas
custom_input_schema = {
"type": "object",
"properties": {
"task": {"type": "string", "description": "The research task"},
"sources": {
"type": "array",
"items": {"type": "string"},
"description": "Specific sources to research"
},
"depth": {
"type": "string",
"enum": ["shallow", "medium", "deep"],
"description": "Research depth level"
}
},
"required": ["task"]
}
# Add agent with custom schemas
deployer.add_agent(
agent=research_agent,
tool_name="advanced_research_tool",
input_schema=custom_input_schema,
timeout=60
)
```
## Monitoring and Debugging
### Enable Verbose Logging
```python
deployer = AOP(
server_name="DebugServer",
verbose=True,
traceback_enabled=True,
log_level="DEBUG"
)
```
### Check Server Status
```python
# List all registered agents
agents = deployer.list_agents()
print(f"Registered agents: {agents}")
# Get detailed agent information
for agent_name in agents:
info = deployer.get_agent_info(agent_name)
print(f"Agent {agent_name}: {info}")
# Get server information
server_info = deployer.get_server_info()
print(f"Server info: {server_info}")
```
## Production Deployment
### External Access
```python
deployer = AOP(
server_name="ProductionServer",
host="0.0.0.0", # Allow external connections
port=8000,
verbose=False, # Disable verbose logging in production
log_level="WARNING"
)
```
### Multiple Servers
```python
# Server 1: Research and Analysis
research_deployer = AOP("ResearchServer", port=8000)
research_deployer.add_agent(research_agent)
research_deployer.add_agent(analysis_agent)
# Server 2: Writing and Code
content_deployer = AOP("ContentServer", port=8001)
content_deployer.add_agent(writing_agent)
content_deployer.add_agent(code_agent)
# Server 3: Financial
finance_deployer = AOP("FinanceServer", port=8002)
finance_deployer.add_agent(financial_agent)
# Start all servers
import threading
threading.Thread(target=research_deployer.run).start()
threading.Thread(target=content_deployer.run).start()
threading.Thread(target=finance_deployer.run).start()
```
This example demonstrates a complete AOP server setup with multiple specialized agents, proper configuration, and production-ready deployment options.

@ -34,13 +34,15 @@ Main class for deploying agents as tools in an MCP server.
|-----------|------|---------|-------------| |-----------|------|---------|-------------|
| `server_name` | `str` | `"AOP Cluster"` | Name for the MCP server | | `server_name` | `str` | `"AOP Cluster"` | Name for the MCP server |
| `description` | `str` | `"A cluster that enables you to deploy multiple agents as tools in an MCP server."` | Server description | | `description` | `str` | `"A cluster that enables you to deploy multiple agents as tools in an MCP server."` | Server description |
| `agents` | `List[Agent]` | `None` | Optional list of agents to add initially | | `agents` | `any` | `None` | Optional list of agents to add initially |
| `port` | `int` | `8000` | Port for the MCP server | | `port` | `int` | `8000` | Port for the MCP server |
| `transport` | `str` | `"streamable-http"` | Transport type for the MCP server | | `transport` | `str` | `"streamable-http"` | Transport type for the MCP server |
| `verbose` | `bool` | `False` | Enable verbose logging | | `verbose` | `bool` | `False` | Enable verbose logging |
| `traceback_enabled` | `bool` | `True` | Enable traceback logging for errors | | `traceback_enabled` | `bool` | `True` | Enable traceback logging for errors |
| `host` | `str` | `"localhost"` | Host to bind the server to | | `host` | `str` | `"localhost"` | Host to bind the server to |
| `log_level` | `str` | `"INFO"` | Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL) | | `log_level` | `str` | `"INFO"` | Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL) |
| `*args` | `Any` | - | Additional positional arguments passed to FastMCP |
| `**kwargs` | `Any` | - | Additional keyword arguments passed to FastMCP |
#### Methods #### Methods
@ -120,6 +122,203 @@ Get information about the MCP server and registered tools.
**Returns:** `Dict[str, Any]` - Server information **Returns:** `Dict[str, Any]` - Server information
##### _register_tool()
Register a single agent as an MCP tool (internal method).
| Parameter | Type | Description |
|-----------|------|-------------|
| `tool_name` | `str` | Name of the tool to register |
| `agent` | `AgentType` | The agent instance to register |
##### _execute_agent_with_timeout()
Execute an agent with a timeout and all run method parameters (internal method).
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `agent` | `AgentType` | Required | The agent to execute |
| `task` | `str` | Required | The task to execute |
| `timeout` | `int` | Required | Maximum execution time in seconds |
| `img` | `str` | `None` | Optional image to be processed by the agent |
| `imgs` | `List[str]` | `None` | Optional list of images to be processed by the agent |
| `correct_answer` | `str` | `None` | Optional correct answer for validation or comparison |
**Returns:** `str` - The agent's response
**Raises:** `TimeoutError` if execution exceeds timeout, `Exception` if agent execution fails
##### _get_agent_discovery_info()
Get discovery information for a specific agent (internal method).
| Parameter | Type | Description |
|-----------|------|-------------|
| `tool_name` | `str` | Name of the agent tool |
**Returns:** `Optional[Dict[str, Any]]` - Agent discovery information, or None if not found
## Discovery Tools
AOP automatically registers several discovery tools that allow agents to learn about each other and enable dynamic agent discovery within the cluster.
### discover_agents
Discover information about agents in the cluster including their name, description, system prompt (truncated to 200 chars), and tags.
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `agent_name` | `str` | `None` | Optional specific agent name to get info for. If None, returns info for all agents. |
**Returns:** `Dict[str, Any]` - Agent information for discovery
**Response Format:**
```json
{
"success": true,
"agents": [
{
"tool_name": "agent_name",
"agent_name": "Agent Name",
"description": "Agent description",
"short_system_prompt": "Truncated system prompt...",
"tags": ["tag1", "tag2"],
"capabilities": ["capability1", "capability2"],
"role": "worker",
"model_name": "model_name",
"max_loops": 1,
"temperature": 0.5,
"max_tokens": 4096
}
]
}
```
### get_agent_details
Get detailed information about a single agent by name including configuration, capabilities, and metadata.
| Parameter | Type | Description |
|-----------|------|-------------|
| `agent_name` | `str` | Name of the agent to get information for. |
**Returns:** `Dict[str, Any]` - Detailed agent information
**Response Format:**
```json
{
"success": true,
"agent_info": {
"tool_name": "agent_name",
"agent_name": "Agent Name",
"agent_description": "Agent description",
"model_name": "model_name",
"max_loops": 1,
"tool_description": "Tool description",
"timeout": 30,
"max_retries": 3,
"verbose": false,
"traceback_enabled": true
},
"discovery_info": {
"tool_name": "agent_name",
"agent_name": "Agent Name",
"description": "Agent description",
"short_system_prompt": "Truncated system prompt...",
"tags": ["tag1", "tag2"],
"capabilities": ["capability1", "capability2"],
"role": "worker",
"model_name": "model_name",
"max_loops": 1,
"temperature": 0.5,
"max_tokens": 4096
}
}
```
### get_agents_info
Get detailed information about multiple agents by providing a list of agent names.
| Parameter | Type | Description |
|-----------|------|-------------|
| `agent_names` | `List[str]` | List of agent names to get information for. |
**Returns:** `Dict[str, Any]` - Detailed information for all requested agents
**Response Format:**
```json
{
"success": true,
"agents_info": [
{
"agent_name": "agent_name",
"agent_info": { /* detailed agent info */ },
"discovery_info": { /* discovery info */ }
}
],
"not_found": ["missing_agent"],
"total_found": 1,
"total_requested": 2
}
```
### list_agents
Get a simple list of all available agent names in the cluster.
**Returns:** `Dict[str, Any]` - List of agent names
**Response Format:**
```json
{
"success": true,
"agent_names": ["agent1", "agent2", "agent3"],
"total_count": 3
}
```
### search_agents
Search for agents by name, description, tags, or capabilities using keyword matching.
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `query` | `str` | Required | Search query string |
| `search_fields` | `List[str]` | `["name", "description", "tags", "capabilities"]` | Optional list of fields to search in. If None, searches all fields. |
**Returns:** `Dict[str, Any]` - Matching agents
**Response Format:**
```json
{
"success": true,
"matching_agents": [
{
"tool_name": "agent_name",
"agent_name": "Agent Name",
"description": "Agent description",
"short_system_prompt": "Truncated system prompt...",
"tags": ["tag1", "tag2"],
"capabilities": ["capability1", "capability2"],
"role": "worker",
"model_name": "model_name",
"max_loops": 1,
"temperature": 0.5,
"max_tokens": 4096
}
],
"total_matches": 1,
"query": "search_term",
"search_fields": ["name", "description", "tags", "capabilities"]
}
```
### AOPCluster Class ### AOPCluster Class
Class for connecting to and managing multiple MCP servers. Class for connecting to and managing multiple MCP servers.
@ -275,18 +474,22 @@ print(f"Added {len(tool_names)} agents: {tool_names}")
deployer.run() deployer.run()
``` ```
### Advanced Configuration ### Advanced Configuration with Tags and Capabilities
```python ```python
from swarms import Agent from swarms import Agent
from swarms.structs.aop import AOP from swarms.structs.aop import AOP
# Create agent with custom configuration # Create agent with custom configuration, tags, and capabilities
research_agent = Agent( research_agent = Agent(
agent_name="Research-Agent", agent_name="Research-Agent",
agent_description="Expert in research and data collection", agent_description="Expert in research and data collection",
model_name="anthropic/claude-sonnet-4-5", model_name="anthropic/claude-sonnet-4-5",
max_loops=1, max_loops=1,
# Add tags and capabilities for better discovery
tags=["research", "data-collection", "analysis"],
capabilities=["web-search", "data-gathering", "report-generation"],
role="researcher"
) )
# Create AOP with custom settings # Create AOP with custom settings
@ -405,6 +608,141 @@ else:
print("Research-Agent tool not found") print("Research-Agent tool not found")
``` ```
### Discovery Tools Examples
The AOP server automatically provides discovery tools that allow agents to learn about each other. Here are examples of how to use these tools:
```python
# Example discovery tool calls (these would be made by MCP clients)
# Discover all agents in the cluster
all_agents = discover_agents()
print(f"Found {len(all_agents['agents'])} agents in the cluster")
# Discover a specific agent
research_agent_info = discover_agents(agent_name="Research-Agent")
if research_agent_info['success']:
agent = research_agent_info['agents'][0]
print(f"Agent: {agent['agent_name']}")
print(f"Description: {agent['description']}")
print(f"Tags: {agent['tags']}")
print(f"Capabilities: {agent['capabilities']}")
# Get detailed information about a specific agent
agent_details = get_agent_details(agent_name="Research-Agent")
if agent_details['success']:
print("Agent Info:", agent_details['agent_info'])
print("Discovery Info:", agent_details['discovery_info'])
# Get information about multiple agents
multiple_agents = get_agents_info(agent_names=["Research-Agent", "Analysis-Agent"])
print(f"Found {multiple_agents['total_found']} out of {multiple_agents['total_requested']} agents")
print("Not found:", multiple_agents['not_found'])
# List all available agents
agent_list = list_agents()
print(f"Available agents: {agent_list['agent_names']}")
# Search for agents by keyword
search_results = search_agents(query="research")
print(f"Found {search_results['total_matches']} agents matching 'research'")
# Search in specific fields only
tag_search = search_agents(
query="data",
search_fields=["tags", "capabilities"]
)
print(f"Found {tag_search['total_matches']} agents with 'data' in tags or capabilities")
```
### Dynamic Agent Discovery Example
Here's a practical example of how agents can use discovery tools to find and collaborate with other agents:
```python
from swarms import Agent
from swarms.structs.aop import AOP
# Create a coordinator agent that can discover and use other agents
coordinator = Agent(
agent_name="Coordinator-Agent",
agent_description="Coordinates tasks between different specialized agents",
model_name="anthropic/claude-sonnet-4-5",
max_loops=1,
tags=["coordination", "orchestration", "management"],
capabilities=["agent-discovery", "task-distribution", "workflow-management"],
role="coordinator"
)
# Create specialized agents
research_agent = Agent(
agent_name="Research-Agent",
agent_description="Expert in research and data collection",
model_name="anthropic/claude-sonnet-4-5",
max_loops=1,
tags=["research", "data-collection", "analysis"],
capabilities=["web-search", "data-gathering", "report-generation"],
role="researcher"
)
analysis_agent = Agent(
agent_name="Analysis-Agent",
agent_description="Expert in data analysis and insights",
model_name="anthropic/claude-sonnet-4-5",
max_loops=1,
tags=["analysis", "data-processing", "insights"],
capabilities=["statistical-analysis", "pattern-recognition", "visualization"],
role="analyst"
)
# Create AOP server
deployer = AOP(
server_name="DynamicAgentCluster",
port=8000,
verbose=True
)
# Add all agents
deployer.add_agent(coordinator)
deployer.add_agent(research_agent)
deployer.add_agent(analysis_agent)
# The coordinator can now discover other agents and use them
# This would be done through MCP tool calls in practice
def coordinate_research_task(task_description):
"""
Example of how the coordinator might use discovery tools
"""
# 1. Discover available research agents
research_agents = discover_agents()
research_agents = [a for a in research_agents['agents'] if 'research' in a['tags']]
# 2. Get detailed info about the best research agent
if research_agents:
best_agent = research_agents[0]
agent_details = get_agent_details(agent_name=best_agent['agent_name'])
# 3. Use the research agent for the task
research_result = research_agent.run(task=task_description)
# 4. Find analysis agents for processing the research
analysis_agents = search_agents(query="analysis", search_fields=["tags"])
if analysis_agents['matching_agents']:
analysis_agent_name = analysis_agents['matching_agents'][0]['agent_name']
analysis_result = analysis_agent.run(task=f"Analyze this research: {research_result}")
return {
"research_result": research_result,
"analysis_result": analysis_result,
"agents_used": [best_agent['agent_name'], analysis_agent_name]
}
return {"error": "No suitable agents found"}
# Start the server
deployer.run()
```
### Tool Execution Examples ### Tool Execution Examples
Once your AOP server is running, you can call the tools using MCP clients. Here are examples of how the tools would be called: Once your AOP server is running, you can call the tools using MCP clients. Here are examples of how the tools would be called:
@ -460,6 +798,10 @@ AOP provides comprehensive error handling:
| **Handle Errors** | Always check the `success` field in tool responses | | **Handle Errors** | Always check the `success` field in tool responses |
| **Resource Management** | Monitor server resources when running multiple agents | | **Resource Management** | Monitor server resources when running multiple agents |
| **Security** | Use appropriate host/port settings for your deployment environment | | **Security** | Use appropriate host/port settings for your deployment environment |
| **Use Tags and Capabilities** | Add meaningful tags and capabilities to agents for better discovery |
| **Define Agent Roles** | Use the `role` attribute to categorize agents (coordinator, worker, etc.) |
| **Leverage Discovery Tools** | Use built-in discovery tools for dynamic agent collaboration |
| **Design for Scalability** | Plan for adding/removing agents dynamically using discovery tools |
## Integration with Other Systems ## Integration with Other Systems

@ -1,336 +0,0 @@
# AOP Cluster Example
This example demonstrates how to use AOPCluster to connect to and manage multiple MCP servers running AOP agents.
## Basic Cluster Setup
```python
import json
from swarms.structs.aop import AOPCluster
# Connect to multiple MCP servers
cluster = AOPCluster(
urls=[
"http://localhost:8000/mcp", # Research and Analysis server
"http://localhost:8001/mcp", # Writing and Code server
"http://localhost:8002/mcp" # Financial server
],
transport="streamable-http"
)
# Get all available tools from all servers
all_tools = cluster.get_tools(output_type="dict")
print(f"Found {len(all_tools)} tools across all servers")
# Pretty print all tools
print(json.dumps(all_tools, indent=2))
```
## Finding Specific Tools
```python
# Find a specific tool by name
research_tool = cluster.find_tool_by_server_name("Research-Agent")
if research_tool:
print("Found Research-Agent tool:")
print(json.dumps(research_tool, indent=2))
else:
print("Research-Agent tool not found")
# Find multiple tools
tool_names = ["Research-Agent", "Analysis-Agent", "Writing-Agent", "Code-Agent"]
found_tools = {}
for tool_name in tool_names:
tool = cluster.find_tool_by_server_name(tool_name)
if tool:
found_tools[tool_name] = tool
print(f"✓ Found {tool_name}")
else:
print(f"✗ {tool_name} not found")
print(f"Found {len(found_tools)} out of {len(tool_names)} tools")
```
## Tool Discovery and Management
```python
# Get tools in different formats
json_tools = cluster.get_tools(output_type="json")
dict_tools = cluster.get_tools(output_type="dict")
str_tools = cluster.get_tools(output_type="str")
print(f"JSON format: {len(json_tools)} tools")
print(f"Dict format: {len(dict_tools)} tools")
print(f"String format: {len(str_tools)} tools")
# Analyze tool distribution across servers
server_tools = {}
for tool in dict_tools:
server_name = tool.get("server", "unknown")
if server_name not in server_tools:
server_tools[server_name] = []
server_tools[server_name].append(tool.get("function", {}).get("name", "unknown"))
print("\nTools by server:")
for server, tools in server_tools.items():
print(f" {server}: {len(tools)} tools - {tools}")
```
## Advanced Cluster Management
```python
class AOPClusterManager:
def __init__(self, urls, transport="streamable-http"):
self.cluster = AOPCluster(urls, transport)
self.tools_cache = {}
self.last_update = None
def refresh_tools(self):
"""Refresh the tools cache"""
self.tools_cache = {}
tools = self.cluster.get_tools(output_type="dict")
for tool in tools:
tool_name = tool.get("function", {}).get("name")
if tool_name:
self.tools_cache[tool_name] = tool
self.last_update = time.time()
return len(self.tools_cache)
def get_tool(self, tool_name):
"""Get a specific tool by name"""
if not self.tools_cache or time.time() - self.last_update > 300: # 5 min cache
self.refresh_tools()
return self.tools_cache.get(tool_name)
def list_tools_by_category(self):
"""Categorize tools by their names"""
categories = {
"research": [],
"analysis": [],
"writing": [],
"code": [],
"financial": [],
"other": []
}
for tool_name in self.tools_cache.keys():
tool_name_lower = tool_name.lower()
if "research" in tool_name_lower:
categories["research"].append(tool_name)
elif "analysis" in tool_name_lower:
categories["analysis"].append(tool_name)
elif "writing" in tool_name_lower:
categories["writing"].append(tool_name)
elif "code" in tool_name_lower:
categories["code"].append(tool_name)
elif "financial" in tool_name_lower:
categories["financial"].append(tool_name)
else:
categories["other"].append(tool_name)
return categories
def get_available_servers(self):
"""Get list of available servers"""
servers = set()
for tool in self.tools_cache.values():
server = tool.get("server", "unknown")
servers.add(server)
return list(servers)
# Usage example
import time
manager = AOPClusterManager([
"http://localhost:8000/mcp",
"http://localhost:8001/mcp",
"http://localhost:8002/mcp"
])
# Refresh and display tools
tool_count = manager.refresh_tools()
print(f"Loaded {tool_count} tools")
# Categorize tools
categories = manager.list_tools_by_category()
for category, tools in categories.items():
if tools:
print(f"{category.title()}: {tools}")
# Get available servers
servers = manager.get_available_servers()
print(f"Available servers: {servers}")
```
## Error Handling and Resilience
```python
class ResilientAOPCluster:
def __init__(self, urls, transport="streamable-http"):
self.urls = urls
self.transport = transport
self.cluster = AOPCluster(urls, transport)
self.failed_servers = set()
def get_tools_with_fallback(self, output_type="dict"):
"""Get tools with fallback for failed servers"""
try:
return self.cluster.get_tools(output_type=output_type)
except Exception as e:
print(f"Error getting tools: {e}")
# Try individual servers
all_tools = []
for url in self.urls:
if url in self.failed_servers:
continue
try:
single_cluster = AOPCluster([url], self.transport)
tools = single_cluster.get_tools(output_type=output_type)
all_tools.extend(tools)
except Exception as server_error:
print(f"Server {url} failed: {server_error}")
self.failed_servers.add(url)
return all_tools
def find_tool_with_retry(self, tool_name, max_retries=3):
"""Find tool with retry logic"""
for attempt in range(max_retries):
try:
return self.cluster.find_tool_by_server_name(tool_name)
except Exception as e:
print(f"Attempt {attempt + 1} failed: {e}")
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Exponential backoff
return None
# Usage
resilient_cluster = ResilientAOPCluster([
"http://localhost:8000/mcp",
"http://localhost:8001/mcp",
"http://localhost:8002/mcp"
])
# Get tools with error handling
tools = resilient_cluster.get_tools_with_fallback()
print(f"Retrieved {len(tools)} tools")
# Find tool with retry
research_tool = resilient_cluster.find_tool_with_retry("Research-Agent")
if research_tool:
print("Found Research-Agent tool")
else:
print("Research-Agent tool not found after retries")
```
## Monitoring and Health Checks
```python
class AOPClusterMonitor:
def __init__(self, cluster_manager):
self.manager = cluster_manager
self.health_status = {}
def check_server_health(self, url):
"""Check if a server is healthy"""
try:
single_cluster = AOPCluster([url], self.manager.cluster.transport)
tools = single_cluster.get_tools(output_type="dict")
return {
"status": "healthy",
"tool_count": len(tools),
"timestamp": time.time()
}
except Exception as e:
return {
"status": "unhealthy",
"error": str(e),
"timestamp": time.time()
}
def check_all_servers(self):
"""Check health of all servers"""
for url in self.manager.cluster.urls:
health = self.check_server_health(url)
self.health_status[url] = health
status_icon = "✓" if health["status"] == "healthy" else "✗"
print(f"{status_icon} {url}: {health['status']}")
if health["status"] == "healthy":
print(f" Tools available: {health['tool_count']}")
else:
print(f" Error: {health['error']}")
def get_health_summary(self):
"""Get summary of server health"""
healthy_count = sum(1 for status in self.health_status.values()
if status["status"] == "healthy")
total_count = len(self.health_status)
return {
"healthy_servers": healthy_count,
"total_servers": total_count,
"health_percentage": (healthy_count / total_count) * 100 if total_count > 0 else 0
}
# Usage
monitor = AOPClusterMonitor(manager)
monitor.check_all_servers()
summary = monitor.get_health_summary()
print(f"Health Summary: {summary['healthy_servers']}/{summary['total_servers']} servers healthy ({summary['health_percentage']:.1f}%)")
```
## Complete Example
```python
import json
import time
from swarms.structs.aop import AOPCluster
def main():
# Initialize cluster
cluster = AOPCluster(
urls=[
"http://localhost:8000/mcp",
"http://localhost:8001/mcp",
"http://localhost:8002/mcp"
],
transport="streamable-http"
)
print("AOP Cluster Management System")
print("=" * 40)
# Get all tools
print("\n1. Discovering tools...")
tools = cluster.get_tools(output_type="dict")
print(f"Found {len(tools)} tools across all servers")
# List all tool names
tool_names = [tool.get("function", {}).get("name") for tool in tools]
print(f"Available tools: {tool_names}")
# Find specific tools
print("\n2. Finding specific tools...")
target_tools = ["Research-Agent", "Analysis-Agent", "Writing-Agent", "Code-Agent", "Financial-Agent"]
for tool_name in target_tools:
tool = cluster.find_tool_by_server_name(tool_name)
if tool:
print(f"✓ {tool_name}: Available")
else:
print(f"✗ {tool_name}: Not found")
# Display tool details
print("\n3. Tool details:")
for tool in tools[:3]: # Show first 3 tools
print(f"\nTool: {tool.get('function', {}).get('name')}")
print(f"Description: {tool.get('function', {}).get('description')}")
print(f"Parameters: {list(tool.get('function', {}).get('parameters', {}).get('properties', {}).keys())}")
print("\nAOP Cluster setup complete!")
if __name__ == "__main__":
main()
```
This example demonstrates comprehensive AOP cluster management including tool discovery, error handling, health monitoring, and advanced cluster operations.

@ -1,11 +1,68 @@
import json import json
import asyncio
from swarms.structs.aop import AOPCluster from swarms.structs.aop import AOPCluster
from swarms.tools.mcp_client_tools import execute_tool_call_simple
aop_cluster = AOPCluster(
urls=["http://localhost:8000/mcp"],
transport="streamable-http",
)
print(json.dumps(aop_cluster.get_tools(output_type="dict"), indent=4)) async def discover_agents_example():
print(aop_cluster.find_tool_by_server_name("Research-Agent")) """Example of how to call the discover_agents tool."""
# Create AOP cluster connection
aop_cluster = AOPCluster(
urls=["http://localhost:5932/mcp"],
transport="streamable-http",
)
# Check if discover_agents tool is available
discover_tool = aop_cluster.find_tool_by_server_name(
"discover_agents"
)
if discover_tool:
try:
# Create the tool call request
tool_call_request = {
"type": "function",
"function": {
"name": "discover_agents",
"arguments": json.dumps(
{}
), # No specific agent name = get all
},
}
# Execute the tool call
result = await execute_tool_call_simple(
response=tool_call_request,
server_path="http://localhost:5932/mcp",
output_type="dict",
verbose=False,
)
print(json.dumps(result, indent=2))
# Parse the result
if isinstance(result, list) and len(result) > 0:
discovery_data = result[0]
if discovery_data.get("success"):
agents = discovery_data.get("agents", [])
return agents
else:
return None
else:
return None
except Exception:
return None
else:
return None
def main():
"""Main function to run the discovery example."""
# Run the async function
return asyncio.run(discover_agents_example())
if __name__ == "__main__":
main()

@ -1,318 +0,0 @@
# AOP Server Setup Example
This example demonstrates how to set up an AOP (Agent Orchestration Protocol) server with multiple specialized agents.
## Complete Server Setup
```python
from swarms import Agent
from swarms.structs.aop import AOP
# Create specialized agents
research_agent = Agent(
agent_name="Research-Agent",
agent_description="Expert in research, data collection, and information gathering",
model_name="anthropic/claude-sonnet-4-5",
max_loops=1,
top_p=None,
dynamic_temperature_enabled=True,
system_prompt="""You are a research specialist. Your role is to:
1. Gather comprehensive information on any given topic
2. Analyze data from multiple sources
3. Provide well-structured research findings
4. Cite sources and maintain accuracy
5. Present findings in a clear, organized manner
Always provide detailed, factual information with proper context.""",
)
analysis_agent = Agent(
agent_name="Analysis-Agent",
agent_description="Expert in data analysis, pattern recognition, and generating insights",
model_name="anthropic/claude-sonnet-4-5",
max_loops=1,
top_p=None,
dynamic_temperature_enabled=True,
system_prompt="""You are an analysis specialist. Your role is to:
1. Analyze data and identify patterns
2. Generate actionable insights
3. Create visualizations and summaries
4. Provide statistical analysis
5. Make data-driven recommendations
Focus on extracting meaningful insights from information.""",
)
writing_agent = Agent(
agent_name="Writing-Agent",
agent_description="Expert in content creation, editing, and communication",
model_name="anthropic/claude-sonnet-4-5",
max_loops=1,
top_p=None,
dynamic_temperature_enabled=True,
system_prompt="""You are a writing specialist. Your role is to:
1. Create engaging, well-structured content
2. Edit and improve existing text
3. Adapt tone and style for different audiences
4. Ensure clarity and coherence
5. Follow best practices in writing
Always produce high-quality, professional content.""",
)
code_agent = Agent(
agent_name="Code-Agent",
agent_description="Expert in programming, code review, and software development",
model_name="anthropic/claude-sonnet-4-5",
max_loops=1,
top_p=None,
dynamic_temperature_enabled=True,
system_prompt="""You are a coding specialist. Your role is to:
1. Write clean, efficient code
2. Debug and fix issues
3. Review and optimize code
4. Explain programming concepts
5. Follow best practices and standards
Always provide working, well-documented code.""",
)
financial_agent = Agent(
agent_name="Financial-Agent",
agent_description="Expert in financial analysis, market research, and investment insights",
model_name="anthropic/claude-sonnet-4-5",
max_loops=1,
top_p=None,
dynamic_temperature_enabled=True,
system_prompt="""You are a financial specialist. Your role is to:
1. Analyze financial data and markets
2. Provide investment insights
3. Assess risk and opportunities
4. Create financial reports
5. Explain complex financial concepts
Always provide accurate, well-reasoned financial analysis.""",
)
# Create AOP instance
deployer = AOP(
server_name="MyAgentServer",
port=8000,
verbose=True,
log_level="INFO"
)
# Add all agents at once
agents = [
research_agent,
analysis_agent,
writing_agent,
code_agent,
financial_agent,
]
tool_names = deployer.add_agents_batch(agents)
print(f"Added {len(tool_names)} agents: {tool_names}")
# Display server information
server_info = deployer.get_server_info()
print(f"Server: {server_info['server_name']}")
print(f"Total tools: {server_info['total_tools']}")
print(f"Available tools: {server_info['tools']}")
# Start the server
print("Starting AOP server...")
deployer.run()
```
## Running the Server
1. Save the code above to a file (e.g., `aop_server.py`)
2. Install required dependencies:
```bash
pip install swarms
```
3. Run the server:
```bash
python aop_server.py
```
The server will start on `http://localhost:8000` and make all agents available as MCP tools.
## Tool Usage Examples
Once the server is running, you can call the tools using MCP clients:
### Research Agent
```python
# Call the research agent
result = research_tool(task="Research the latest AI trends in 2024")
print(result)
```
### Analysis Agent with Image
```python
# Call the analysis agent with an image
result = analysis_tool(
task="Analyze this chart and provide insights",
img="path/to/chart.png"
)
print(result)
```
### Writing Agent with Multiple Images
```python
# Call the writing agent with multiple images
result = writing_tool(
task="Write a comprehensive report based on these images",
imgs=["image1.jpg", "image2.jpg", "image3.jpg"]
)
print(result)
```
### Code Agent with Validation
```python
# Call the code agent with expected output
result = code_tool(
task="Debug this Python function",
correct_answer="Expected output: Hello World"
)
print(result)
```
### Financial Agent
```python
# Call the financial agent
result = financial_tool(task="Analyze the current market trends for tech stocks")
print(result)
```
## Response Format
All tools return a standardized response:
```json
{
"result": "The agent's response to the task",
"success": true,
"error": null
}
```
## Advanced Configuration
### Custom Timeouts and Retries
```python
# Add agent with custom configuration
deployer.add_agent(
agent=research_agent,
tool_name="custom_research_tool",
tool_description="Research tool with extended timeout",
timeout=120, # 2 minutes
max_retries=5,
verbose=True
)
```
### Custom Input/Output Schemas
```python
# Define custom schemas
custom_input_schema = {
"type": "object",
"properties": {
"task": {"type": "string", "description": "The research task"},
"sources": {
"type": "array",
"items": {"type": "string"},
"description": "Specific sources to research"
},
"depth": {
"type": "string",
"enum": ["shallow", "medium", "deep"],
"description": "Research depth level"
}
},
"required": ["task"]
}
# Add agent with custom schemas
deployer.add_agent(
agent=research_agent,
tool_name="advanced_research_tool",
input_schema=custom_input_schema,
timeout=60
)
```
## Monitoring and Debugging
### Enable Verbose Logging
```python
deployer = AOP(
server_name="DebugServer",
verbose=True,
traceback_enabled=True,
log_level="DEBUG"
)
```
### Check Server Status
```python
# List all registered agents
agents = deployer.list_agents()
print(f"Registered agents: {agents}")
# Get detailed agent information
for agent_name in agents:
info = deployer.get_agent_info(agent_name)
print(f"Agent {agent_name}: {info}")
# Get server information
server_info = deployer.get_server_info()
print(f"Server info: {server_info}")
```
## Production Deployment
### External Access
```python
deployer = AOP(
server_name="ProductionServer",
host="0.0.0.0", # Allow external connections
port=8000,
verbose=False, # Disable verbose logging in production
log_level="WARNING"
)
```
### Multiple Servers
```python
# Server 1: Research and Analysis
research_deployer = AOP("ResearchServer", port=8000)
research_deployer.add_agent(research_agent)
research_deployer.add_agent(analysis_agent)
# Server 2: Writing and Code
content_deployer = AOP("ContentServer", port=8001)
content_deployer.add_agent(writing_agent)
content_deployer.add_agent(code_agent)
# Server 3: Financial
finance_deployer = AOP("FinanceServer", port=8002)
finance_deployer.add_agent(financial_agent)
# Start all servers
import threading
threading.Thread(target=research_deployer.run).start()
threading.Thread(target=content_deployer.run).start()
threading.Thread(target=finance_deployer.run).start()
```
This example demonstrates a complete AOP server setup with multiple specialized agents, proper configuration, and production-ready deployment options.

@ -0,0 +1,177 @@
#!/usr/bin/env python3
"""
Example showing how agents can use the discovery tool to learn about each other
and collaborate more effectively.
"""
from swarms import Agent
from swarms.structs.aop import AOP
def simulate_agent_discovery():
"""Simulate how an agent would use the discovery tool."""
# Create a sample agent that will use the discovery tool
coordinator_agent = Agent(
agent_name="ProjectCoordinator",
agent_description="Coordinates projects and assigns tasks to other agents",
system_prompt="You are a project coordinator who helps organize work and delegate tasks to the most appropriate team members. You can discover information about other agents to make better decisions.",
model_name="gpt-4o-mini",
temperature=0.4,
)
# Create the AOP cluster
aop = AOP(
server_name="Project Team",
description="A team of specialized agents for project coordination",
verbose=True,
)
# Add some specialized agents
data_agent = Agent(
agent_name="DataSpecialist",
agent_description="Handles all data-related tasks and analysis",
system_prompt="You are a data specialist with expertise in data processing, analysis, and visualization. You work with large datasets and create insights.",
tags=["data", "analysis", "python", "sql", "statistics"],
capabilities=[
"data_processing",
"statistical_analysis",
"visualization",
],
role="specialist",
)
code_agent = Agent(
agent_name="CodeSpecialist",
agent_description="Handles all coding and development tasks",
system_prompt="You are a software development specialist who writes clean, efficient code and follows best practices. You handle both frontend and backend development.",
tags=[
"coding",
"development",
"python",
"javascript",
"react",
],
capabilities=[
"software_development",
"code_review",
"debugging",
],
role="developer",
)
writing_agent = Agent(
agent_name="ContentSpecialist",
agent_description="Creates and manages all written content",
system_prompt="You are a content specialist who creates engaging written content, documentation, and marketing materials. You ensure all content is clear and compelling.",
tags=["writing", "content", "documentation", "marketing"],
capabilities=[
"content_creation",
"technical_writing",
"editing",
],
role="writer",
)
# Add agents to the cluster
aop.add_agent(data_agent, tool_name="data_specialist")
aop.add_agent(code_agent, tool_name="code_specialist")
aop.add_agent(writing_agent, tool_name="content_specialist")
print("🏢 Project Team AOP Cluster Created!")
print(f"👥 Team members: {aop.list_agents()}")
print()
# Simulate the coordinator discovering team members
print("🔍 Project Coordinator discovering team capabilities...")
print()
# Get discovery info for each agent
for tool_name in aop.list_agents():
if (
tool_name != "discover_agents"
): # Skip the discovery tool itself
agent_info = aop._get_agent_discovery_info(tool_name)
if agent_info:
print(f"📋 {agent_info['agent_name']}:")
print(f" Description: {agent_info['description']}")
print(f" Role: {agent_info['role']}")
print(f" Tags: {', '.join(agent_info['tags'])}")
print(
f" Capabilities: {', '.join(agent_info['capabilities'])}"
)
print(
f" System Prompt: {agent_info['short_system_prompt'][:100]}..."
)
print()
print("💡 How agents would use this in practice:")
print(" 1. Agent calls 'discover_agents' MCP tool")
print(" 2. Gets information about all available agents")
print(
" 3. Uses this info to make informed decisions about task delegation"
)
print(
" 4. Can discover specific agents by name for targeted collaboration"
)
print()
# Show what the MCP tool response would look like
print("📡 Sample MCP tool response structure:")
sample_response = {
"success": True,
"agents": [
{
"tool_name": "data_specialist",
"agent_name": "DataSpecialist",
"description": "Handles all data-related tasks and analysis",
"short_system_prompt": "You are a data specialist with expertise in data processing, analysis, and visualization...",
"tags": [
"data",
"analysis",
"python",
"sql",
"statistics",
],
"capabilities": [
"data_processing",
"statistical_analysis",
"visualization",
],
"role": "specialist",
"model_name": "gpt-4o-mini",
"max_loops": 1,
"temperature": 0.5,
"max_tokens": 4096,
}
],
}
print(" discover_agents() -> {")
print(" 'success': True,")
print(" 'agents': [")
print(" {")
print(" 'tool_name': 'data_specialist',")
print(" 'agent_name': 'DataSpecialist',")
print(
" 'description': 'Handles all data-related tasks...',"
)
print(
" 'short_system_prompt': 'You are a data specialist...',"
)
print(" 'tags': ['data', 'analysis', 'python'],")
print(
" 'capabilities': ['data_processing', 'statistics'],"
)
print(" 'role': 'specialist',")
print(" ...")
print(" }")
print(" ]")
print(" }")
print()
print("✅ Agent discovery system ready for collaborative work!")
if __name__ == "__main__":
simulate_agent_discovery()

@ -0,0 +1,117 @@
#!/usr/bin/env python3
"""
Example demonstrating the new agent discovery MCP tool in AOP.
This example shows how agents can discover information about each other
using the new 'discover_agents' MCP tool.
"""
from swarms import Agent
from swarms.structs.aop import AOP
def main():
"""Demonstrate the agent discovery functionality."""
# Create some sample agents with different configurations
agent1 = Agent(
agent_name="DataAnalyst",
agent_description="Specialized in data analysis and visualization",
system_prompt="You are a data analyst with expertise in Python, pandas, and statistical analysis. You help users understand data patterns and create visualizations.",
tags=["data", "analysis", "python", "pandas"],
capabilities=["data_analysis", "visualization", "statistics"],
role="analyst",
model_name="gpt-4o-mini",
temperature=0.3,
)
agent2 = Agent(
agent_name="CodeReviewer",
agent_description="Expert code reviewer and quality assurance specialist",
system_prompt="You are a senior software engineer who specializes in code review, best practices, and quality assurance. You help identify bugs, suggest improvements, and ensure code follows industry standards.",
tags=["code", "review", "quality", "python", "javascript"],
capabilities=[
"code_review",
"quality_assurance",
"best_practices",
],
role="reviewer",
model_name="gpt-4o-mini",
temperature=0.2,
)
agent3 = Agent(
agent_name="CreativeWriter",
agent_description="Creative content writer and storyteller",
system_prompt="You are a creative writer who specializes in storytelling, content creation, and engaging narratives. You help create compelling stories, articles, and marketing content.",
tags=["writing", "creative", "content", "storytelling"],
capabilities=[
"creative_writing",
"content_creation",
"storytelling",
],
role="writer",
model_name="gpt-4o-mini",
temperature=0.8,
)
# Create AOP cluster with the agents
aop = AOP(
server_name="Agent Discovery Demo",
description="A demo cluster showing agent discovery capabilities",
agents=[agent1, agent2, agent3],
verbose=True,
)
print("🚀 AOP Cluster initialized with agent discovery tool!")
print(f"📊 Total agents registered: {len(aop.agents)}")
print(f"🔧 Available tools: {aop.list_agents()}")
print()
# Demonstrate the discovery tool
print("🔍 Testing agent discovery functionality...")
print()
# Test discovering all agents
print("1. Discovering all agents:")
all_agents_info = aop._get_agent_discovery_info(
"DataAnalyst"
) # This would normally be called via MCP
print(
f" Found agent: {all_agents_info['agent_name'] if all_agents_info else 'None'}"
)
print()
# Show what the MCP tool would return
print("2. What the 'discover_agents' MCP tool would return:")
print(" - Tool name: discover_agents")
print(
" - Description: Discover information about other agents in the cluster"
)
print(" - Parameters: agent_name (optional)")
print(
" - Returns: Agent info including name, description, short system prompt, tags, capabilities, role, etc."
)
print()
# Show sample agent info structure
if all_agents_info:
print("3. Sample agent discovery info structure:")
for key, value in all_agents_info.items():
if key == "short_system_prompt":
print(f" {key}: {value[:100]}...")
else:
print(f" {key}: {value}")
print()
print("✅ Agent discovery tool successfully integrated!")
print(
"💡 Agents can now use the 'discover_agents' MCP tool to learn about each other."
)
print(
"🔄 The tool is automatically updated when new agents are added to the cluster."
)
if __name__ == "__main__":
main()

@ -0,0 +1,231 @@
#!/usr/bin/env python3
"""
Simple example showing how to call the discover_agents tool synchronously.
"""
import json
import asyncio
from swarms.structs.aop import AOPCluster
from swarms.tools.mcp_client_tools import execute_tool_call_simple
def call_discover_agents_sync(server_url="http://localhost:5932/mcp"):
"""
Synchronously call the discover_agents tool.
Args:
server_url: URL of the MCP server
Returns:
Dict containing the discovery results
"""
# Create the tool call request
tool_call_request = {
"type": "function",
"function": {
"name": "discover_agents",
"arguments": json.dumps({}), # Empty = get all agents
},
}
# Run the async function
return asyncio.run(
execute_tool_call_simple(
response=tool_call_request,
server_path=server_url,
output_type="dict",
)
)
def call_discover_specific_agent_sync(
agent_name, server_url="http://localhost:5932/mcp"
):
"""
Synchronously call the discover_agents tool for a specific agent.
Args:
agent_name: Name of the specific agent to discover
server_url: URL of the MCP server
Returns:
Dict containing the discovery results
"""
# Create the tool call request
tool_call_request = {
"type": "function",
"function": {
"name": "discover_agents",
"arguments": json.dumps({"agent_name": agent_name}),
},
}
# Run the async function
return asyncio.run(
execute_tool_call_simple(
response=tool_call_request,
server_path=server_url,
output_type="dict",
)
)
def main():
"""Main function demonstrating discovery tool usage."""
print("🔍 AOP Agent Discovery Tool Example")
print("=" * 40)
print()
# First, check what tools are available
print("1. Checking available MCP tools...")
aop_cluster = AOPCluster(
urls=["http://localhost:5932/mcp"],
transport="streamable-http",
)
tools = aop_cluster.get_tools(output_type="dict")
print(f" Found {len(tools)} tools")
# Check if discover_agents is available
discover_tool = aop_cluster.find_tool_by_server_name(
"discover_agents"
)
if not discover_tool:
print("❌ discover_agents tool not found!")
print(
" Make sure your AOP server is running with agents registered."
)
return
print("✅ discover_agents tool found!")
print()
# Discover all agents
print("2. Discovering all agents...")
try:
result = call_discover_agents_sync()
if isinstance(result, list) and len(result) > 0:
discovery_data = result[0]
if discovery_data.get("success"):
agents = discovery_data.get("agents", [])
print(f" ✅ Found {len(agents)} agents:")
for i, agent in enumerate(agents, 1):
print(
f" {i}. {agent.get('agent_name', 'Unknown')}"
)
print(
f" Role: {agent.get('role', 'worker')}"
)
print(
f" Description: {agent.get('description', 'No description')}"
)
print(
f" Tags: {', '.join(agent.get('tags', []))}"
)
print(
f" Capabilities: {', '.join(agent.get('capabilities', []))}"
)
print(
f" System Prompt: {agent.get('short_system_prompt', 'No prompt')[:100]}..."
)
print()
else:
print(
f" ❌ Discovery failed: {discovery_data.get('error', 'Unknown error')}"
)
else:
print(" ❌ No valid result returned")
except Exception as e:
print(f" ❌ Error: {e}")
print()
# Example of discovering a specific agent (if any exist)
print("3. Example: Discovering a specific agent...")
try:
# Try to discover the first agent specifically
if isinstance(result, list) and len(result) > 0:
discovery_data = result[0]
if discovery_data.get("success") and discovery_data.get(
"agents"
):
first_agent_name = discovery_data["agents"][0].get(
"agent_name"
)
if first_agent_name:
print(
f" Looking for specific agent: {first_agent_name}"
)
specific_result = (
call_discover_specific_agent_sync(
first_agent_name
)
)
if (
isinstance(specific_result, list)
and len(specific_result) > 0
):
specific_data = specific_result[0]
if specific_data.get("success"):
agent = specific_data.get("agents", [{}])[
0
]
print(
f" ✅ Found specific agent: {agent.get('agent_name', 'Unknown')}"
)
print(
f" Model: {agent.get('model_name', 'Unknown')}"
)
print(
f" Max Loops: {agent.get('max_loops', 1)}"
)
print(
f" Temperature: {agent.get('temperature', 0.5)}"
)
else:
print(
f" ❌ Specific discovery failed: {specific_data.get('error')}"
)
else:
print(" ❌ No valid specific result")
else:
print(
" ⚠️ No agents found to test specific discovery"
)
else:
print(
" ⚠️ No agents available for specific discovery"
)
else:
print(
" ⚠️ No previous discovery results to use for specific discovery"
)
except Exception as e:
print(f" ❌ Error in specific discovery: {e}")
print()
print("✅ Discovery tool demonstration complete!")
print()
print("💡 Usage Summary:")
print(
" • Call discover_agents() with no arguments to get all agents"
)
print(
" • Call discover_agents(agent_name='AgentName') to get specific agent"
)
print(
" • Each agent returns: name, description, role, tags, capabilities, system prompt, etc."
)
if __name__ == "__main__":
main()

@ -0,0 +1,198 @@
#!/usr/bin/env python3
"""
Test script to verify the agent discovery functionality works correctly.
"""
import sys
import os
# Add the swarms directory to the path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "swarms"))
from swarms import Agent
from swarms.structs.aop import AOP
def test_agent_discovery():
"""Test the agent discovery functionality."""
print("🧪 Testing AOP Agent Discovery Functionality")
print("=" * 50)
# Create test agents
agent1 = Agent(
agent_name="TestAgent1",
agent_description="First test agent for discovery",
system_prompt="This is a test agent with a very long system prompt that should be truncated to 200 characters when returned by the discovery tool. This prompt contains detailed instructions about how the agent should behave and what tasks it can perform.",
tags=["test", "agent", "discovery"],
capabilities=["testing", "validation"],
role="tester",
)
agent2 = Agent(
agent_name="TestAgent2",
agent_description="Second test agent for discovery",
system_prompt="Another test agent with different capabilities and a shorter prompt.",
tags=["test", "agent", "analysis"],
capabilities=["analysis", "reporting"],
role="analyst",
)
# Create AOP cluster
aop = AOP(
server_name="Test Cluster",
description="Test cluster for agent discovery",
verbose=False,
)
# Add agents
aop.add_agent(agent1, tool_name="test_agent_1")
aop.add_agent(agent2, tool_name="test_agent_2")
print(f"✅ Created AOP cluster with {len(aop.agents)} agents")
print(f"📋 Available tools: {aop.list_agents()}")
print()
# Test discovery functionality
print("🔍 Testing agent discovery...")
# Test getting info for specific agent
agent1_info = aop._get_agent_discovery_info("test_agent_1")
assert (
agent1_info is not None
), "Should be able to get info for test_agent_1"
assert (
agent1_info["agent_name"] == "TestAgent1"
), "Agent name should match"
assert (
agent1_info["description"] == "First test agent for discovery"
), "Description should match"
assert (
len(agent1_info["short_system_prompt"]) <= 203
), "System prompt should be truncated to ~200 chars"
assert "test" in agent1_info["tags"], "Tags should include 'test'"
assert (
"testing" in agent1_info["capabilities"]
), "Capabilities should include 'testing'"
assert agent1_info["role"] == "tester", "Role should be 'tester'"
print("✅ Specific agent discovery test passed")
# Test getting info for non-existent agent
non_existent_info = aop._get_agent_discovery_info(
"non_existent_agent"
)
assert (
non_existent_info is None
), "Should return None for non-existent agent"
print("✅ Non-existent agent test passed")
# Test that discovery tool is registered
# Note: In a real scenario, this would be tested via MCP tool calls
# For now, we just verify the method exists and works
try:
# This simulates what the MCP tool would do
discovery_result = {"success": True, "agents": []}
for tool_name in aop.agents.keys():
agent_info = aop._get_agent_discovery_info(tool_name)
if agent_info:
discovery_result["agents"].append(agent_info)
assert (
len(discovery_result["agents"]) == 2
), "Should discover both agents"
assert (
discovery_result["success"] is True
), "Discovery should be successful"
print("✅ Discovery tool simulation test passed")
except Exception as e:
print(f"❌ Discovery tool test failed: {e}")
return False
# Test system prompt truncation
long_prompt = "A" * 300 # 300 character string
agent_with_long_prompt = Agent(
agent_name="LongPromptAgent",
agent_description="Agent with very long system prompt",
system_prompt=long_prompt,
)
aop.add_agent(
agent_with_long_prompt, tool_name="long_prompt_agent"
)
long_prompt_info = aop._get_agent_discovery_info(
"long_prompt_agent"
)
assert (
long_prompt_info is not None
), "Should get info for long prompt agent"
assert (
len(long_prompt_info["short_system_prompt"]) == 203
), "Should truncate to 200 chars + '...'"
assert long_prompt_info["short_system_prompt"].endswith(
"..."
), "Should end with '...'"
print("✅ System prompt truncation test passed")
# Test with missing attributes
minimal_agent = Agent(
agent_name="MinimalAgent",
# No description, tags, capabilities, or role specified
)
aop.add_agent(minimal_agent, tool_name="minimal_agent")
minimal_info = aop._get_agent_discovery_info("minimal_agent")
assert (
minimal_info is not None
), "Should get info for minimal agent"
assert (
minimal_info["description"] == "No description available"
), "Should have default description"
assert minimal_info["tags"] == [], "Should have empty tags list"
assert (
minimal_info["capabilities"] == []
), "Should have empty capabilities list"
assert (
minimal_info["role"] == "worker"
), "Should have default role"
print("✅ Minimal agent attributes test passed")
print()
print(
"🎉 All tests passed! Agent discovery functionality is working correctly."
)
print()
print("📊 Summary of discovered agents:")
for tool_name in aop.agents.keys():
info = aop._get_agent_discovery_info(tool_name)
if info:
print(
f"{info['agent_name']} ({info['role']}) - {info['description']}"
)
return True
if __name__ == "__main__":
try:
success = test_agent_discovery()
if success:
print("\n✅ All tests completed successfully!")
sys.exit(0)
else:
print("\n❌ Some tests failed!")
sys.exit(1)
except Exception as e:
print(f"\n💥 Test failed with exception: {e}")
import traceback
traceback.print_exc()
sys.exit(1)

@ -0,0 +1,225 @@
#!/usr/bin/env python3
"""
Example demonstrating the new agent information tools in AOP.
This example shows how to use the new MCP tools for getting agent information.
"""
import json
import asyncio
from swarms.structs.aop import AOPCluster
from swarms.tools.mcp_client_tools import execute_tool_call_simple
async def demonstrate_new_agent_tools():
"""Demonstrate the new agent information tools."""
# Create AOP cluster connection
aop_cluster = AOPCluster(
urls=["http://localhost:5932/mcp"],
transport="streamable-http",
)
print("🔧 New AOP Agent Information Tools Demo")
print("=" * 50)
print()
# 1. List all agents
print("1. Listing all agents...")
try:
tool_call = {
"type": "function",
"function": {"name": "list_agents", "arguments": "{}"},
}
result = await execute_tool_call_simple(
response=tool_call,
server_path="http://localhost:5932/mcp",
output_type="dict",
verbose=False,
)
if isinstance(result, list) and len(result) > 0:
data = result[0]
if data.get("success"):
agent_names = data.get("agent_names", [])
print(
f" Found {len(agent_names)} agents: {agent_names}"
)
else:
print(f" Error: {data.get('error')}")
else:
print(" No valid result returned")
except Exception as e:
print(f" Error: {e}")
print()
# 2. Get details for a specific agent
print("2. Getting details for a specific agent...")
try:
tool_call = {
"type": "function",
"function": {
"name": "get_agent_details",
"arguments": json.dumps(
{"agent_name": "Research-Agent"}
),
},
}
result = await execute_tool_call_simple(
response=tool_call,
server_path="http://localhost:5932/mcp",
output_type="dict",
verbose=False,
)
if isinstance(result, list) and len(result) > 0:
data = result[0]
if data.get("success"):
agent_info = data.get("agent_info", {})
discovery_info = data.get("discovery_info", {})
print(
f" Agent: {discovery_info.get('agent_name', 'Unknown')}"
)
print(
f" Description: {discovery_info.get('description', 'No description')}"
)
print(
f" Model: {discovery_info.get('model_name', 'Unknown')}"
)
print(f" Tags: {discovery_info.get('tags', [])}")
print(
f" Capabilities: {discovery_info.get('capabilities', [])}"
)
else:
print(f" Error: {data.get('error')}")
else:
print(" No valid result returned")
except Exception as e:
print(f" Error: {e}")
print()
# 3. Get info for multiple agents
print("3. Getting info for multiple agents...")
try:
tool_call = {
"type": "function",
"function": {
"name": "get_agents_info",
"arguments": json.dumps(
{
"agent_names": [
"Research-Agent",
"DataAnalyst",
"Writer",
]
}
),
},
}
result = await execute_tool_call_simple(
response=tool_call,
server_path="http://localhost:5932/mcp",
output_type="dict",
verbose=False,
)
if isinstance(result, list) and len(result) > 0:
data = result[0]
if data.get("success"):
agents_info = data.get("agents_info", [])
not_found = data.get("not_found", [])
print(
f" Found {len(agents_info)} agents out of {data.get('total_requested', 0)} requested"
)
for agent in agents_info:
discovery_info = agent.get("discovery_info", {})
print(
f"{discovery_info.get('agent_name', 'Unknown')}: {discovery_info.get('description', 'No description')}"
)
if not_found:
print(f" Not found: {not_found}")
else:
print(f" Error: {data.get('error')}")
else:
print(" No valid result returned")
except Exception as e:
print(f" Error: {e}")
print()
# 4. Search for agents
print("4. Searching for agents...")
try:
tool_call = {
"type": "function",
"function": {
"name": "search_agents",
"arguments": json.dumps(
{
"query": "data",
"search_fields": [
"name",
"description",
"tags",
"capabilities",
],
}
),
},
}
result = await execute_tool_call_simple(
response=tool_call,
server_path="http://localhost:5932/mcp",
output_type="dict",
verbose=False,
)
if isinstance(result, list) and len(result) > 0:
data = result[0]
if data.get("success"):
matching_agents = data.get("matching_agents", [])
print(
f" Found {len(matching_agents)} agents matching 'data'"
)
for agent in matching_agents:
print(
f"{agent.get('agent_name', 'Unknown')}: {agent.get('description', 'No description')}"
)
print(f" Tags: {agent.get('tags', [])}")
print(
f" Capabilities: {agent.get('capabilities', [])}"
)
else:
print(f" Error: {data.get('error')}")
else:
print(" No valid result returned")
except Exception as e:
print(f" Error: {e}")
print()
print("✅ New agent tools demonstration complete!")
print()
print("💡 Available Tools:")
print(
" • discover_agents - Get discovery info for all or specific agents"
)
print(
" • get_agent_details - Get detailed info for a single agent"
)
print(
" • get_agents_info - Get detailed info for multiple agents"
)
print(" • list_agents - Get simple list of all agent names")
print(" • search_agents - Search agents by keywords")
def main():
"""Main function to run the demonstration."""
asyncio.run(demonstrate_new_agent_tools())
if __name__ == "__main__":
main()

@ -90,7 +90,7 @@ financial_agent = Agent(
) )
# Basic usage - individual agent addition # Basic usage - individual agent addition
deployer = AOP("MyAgentServer", verbose=True) deployer = AOP("MyAgentServer", verbose=True, port=5932)
agents = [ agents = [
research_agent, research_agent,
@ -102,17 +102,5 @@ agents = [
deployer.add_agents_batch(agents) deployer.add_agents_batch(agents)
# Example usage with different parameters
# The tools now accept: task, img, imgs, correct_answer parameters
# task: str (required) - The main task or prompt
# img: str (optional) - Single image to process
# imgs: List[str] (optional) - Multiple images to process
# correct_answer: str (optional) - Correct answer for validation
# Example calls that would be made to the MCP tools:
# research_tool(task="Research the latest AI trends")
# analysis_tool(task="Analyze this data", img="path/to/image.jpg")
# writing_tool(task="Write a blog post", imgs=["img1.jpg", "img2.jpg"])
# code_tool(task="Debug this code", correct_answer="expected_output")
deployer.run() deployer.run()

@ -1,5 +1,4 @@
import json import json
import os
from contextlib import suppress from contextlib import suppress
from typing import Any, Callable, Dict, Optional, Type, Union from typing import Any, Callable, Dict, Optional, Type, Union

@ -1,4 +1,3 @@
import os
from dotenv import load_dotenv from dotenv import load_dotenv
# Swarm imports # Swarm imports

@ -1,4 +1,3 @@
import os
from dotenv import load_dotenv from dotenv import load_dotenv
# Swarm imports # Swarm imports

@ -114,6 +114,9 @@ class AOP:
logger.info(f"Adding {len(agents)} initial agents") logger.info(f"Adding {len(agents)} initial agents")
self.add_agents_batch(agents) self.add_agents_batch(agents)
# Register the agent discovery tool
self._register_agent_discovery_tool()
def add_agent( def add_agent(
self, self,
agent: AgentType, agent: AgentType,
@ -242,6 +245,9 @@ class AOP:
# Register the tool with the MCP server # Register the tool with the MCP server
self._register_tool(tool_name, agent) self._register_tool(tool_name, agent)
# Re-register the discovery tool to include the new agent
self._register_agent_discovery_tool()
logger.info( logger.info(
f"Added agent '{agent.agent_name}' as tool '{tool_name}' (verbose={verbose}, traceback={traceback_enabled})" f"Added agent '{agent.agent_name}' as tool '{tool_name}' (verbose={verbose}, traceback={traceback_enabled})"
) )
@ -344,6 +350,9 @@ class AOP:
) )
registered_tools.append(tool_name) registered_tools.append(tool_name)
# Re-register the discovery tool to include all new agents
self._register_agent_discovery_tool()
logger.info( logger.info(
f"Added {len(agents)} agents as tools: {registered_tools}" f"Added {len(agents)} agents as tools: {registered_tools}"
) )
@ -598,6 +607,373 @@ class AOP:
return info return info
def _register_agent_discovery_tool(self) -> None:
"""
Register the agent discovery tools that allow agents to learn about each other.
"""
@self.mcp_server.tool(
name="discover_agents",
description="Discover information about other agents in the cluster including their name, description, system prompt (truncated to 200 chars), and tags.",
)
def discover_agents(agent_name: str = None) -> Dict[str, Any]:
"""
Discover information about agents in the cluster.
Args:
agent_name: Optional specific agent name to get info for. If None, returns info for all agents.
Returns:
Dict containing agent information for discovery
"""
try:
if agent_name:
# Get specific agent info
if agent_name not in self.agents:
return {
"success": False,
"error": f"Agent '{agent_name}' not found",
"agents": [],
}
agent_info = self._get_agent_discovery_info(
agent_name
)
return {
"success": True,
"agents": [agent_info] if agent_info else [],
}
else:
# Get all agents info
all_agents_info = []
for tool_name in self.agents.keys():
agent_info = self._get_agent_discovery_info(
tool_name
)
if agent_info:
all_agents_info.append(agent_info)
return {
"success": True,
"agents": all_agents_info,
}
except Exception as e:
error_msg = str(e)
logger.error(
f"Error in discover_agents tool: {error_msg}"
)
return {
"success": False,
"error": error_msg,
"agents": [],
}
@self.mcp_server.tool(
name="get_agent_details",
description="Get detailed information about a single agent by name including configuration, capabilities, and metadata.",
)
def get_agent_details(agent_name: str) -> Dict[str, Any]:
"""
Get detailed information about a specific agent.
Args:
agent_name: Name of the agent to get information for.
Returns:
Dict containing detailed agent information
"""
try:
if agent_name not in self.agents:
return {
"success": False,
"error": f"Agent '{agent_name}' not found",
"agent_info": None,
}
agent_info = self.get_agent_info(agent_name)
discovery_info = self._get_agent_discovery_info(
agent_name
)
return {
"success": True,
"agent_info": agent_info,
"discovery_info": discovery_info,
}
except Exception as e:
error_msg = str(e)
logger.error(
f"Error in get_agent_details tool: {error_msg}"
)
return {
"success": False,
"error": error_msg,
"agent_info": None,
}
@self.mcp_server.tool(
name="get_agents_info",
description="Get detailed information about multiple agents by providing a list of agent names.",
)
def get_agents_info(agent_names: List[str]) -> Dict[str, Any]:
"""
Get detailed information about multiple agents.
Args:
agent_names: List of agent names to get information for.
Returns:
Dict containing detailed information for all requested agents
"""
try:
if not agent_names:
return {
"success": False,
"error": "No agent names provided",
"agents_info": [],
}
agents_info = []
not_found = []
for agent_name in agent_names:
if agent_name in self.agents:
agent_info = self.get_agent_info(agent_name)
discovery_info = (
self._get_agent_discovery_info(agent_name)
)
agents_info.append(
{
"agent_name": agent_name,
"agent_info": agent_info,
"discovery_info": discovery_info,
}
)
else:
not_found.append(agent_name)
return {
"success": True,
"agents_info": agents_info,
"not_found": not_found,
"total_found": len(agents_info),
"total_requested": len(agent_names),
}
except Exception as e:
error_msg = str(e)
logger.error(
f"Error in get_agents_info tool: {error_msg}"
)
return {
"success": False,
"error": error_msg,
"agents_info": [],
}
@self.mcp_server.tool(
name="list_agents",
description="Get a simple list of all available agent names in the cluster.",
)
def list_agents() -> Dict[str, Any]:
"""
Get a list of all available agent names.
Returns:
Dict containing the list of agent names
"""
try:
agent_names = self.list_agents()
return {
"success": True,
"agent_names": agent_names,
"total_count": len(agent_names),
}
except Exception as e:
error_msg = str(e)
logger.error(
f"Error in list_agents tool: {error_msg}"
)
return {
"success": False,
"error": error_msg,
"agent_names": [],
}
@self.mcp_server.tool(
name="search_agents",
description="Search for agents by name, description, tags, or capabilities using keyword matching.",
)
def search_agents(
query: str, search_fields: List[str] = None
) -> Dict[str, Any]:
"""
Search for agents using keyword matching.
Args:
query: Search query string
search_fields: Optional list of fields to search in (name, description, tags, capabilities).
If None, searches all fields.
Returns:
Dict containing matching agents
"""
try:
if not query:
return {
"success": False,
"error": "No search query provided",
"matching_agents": [],
}
# Default search fields
if search_fields is None:
search_fields = [
"name",
"description",
"tags",
"capabilities",
]
query_lower = query.lower()
matching_agents = []
for tool_name in self.agents.keys():
discovery_info = self._get_agent_discovery_info(
tool_name
)
if not discovery_info:
continue
match_found = False
# Search in specified fields
for field in search_fields:
if (
field == "name"
and query_lower
in discovery_info.get(
"agent_name", ""
).lower()
):
match_found = True
break
elif (
field == "description"
and query_lower
in discovery_info.get(
"description", ""
).lower()
):
match_found = True
break
elif field == "tags":
tags = discovery_info.get("tags", [])
if any(
query_lower in tag.lower()
for tag in tags
):
match_found = True
break
elif field == "capabilities":
capabilities = discovery_info.get(
"capabilities", []
)
if any(
query_lower in capability.lower()
for capability in capabilities
):
match_found = True
break
if match_found:
matching_agents.append(discovery_info)
return {
"success": True,
"matching_agents": matching_agents,
"total_matches": len(matching_agents),
"query": query,
"search_fields": search_fields,
}
except Exception as e:
error_msg = str(e)
logger.error(
f"Error in search_agents tool: {error_msg}"
)
return {
"success": False,
"error": error_msg,
"matching_agents": [],
}
def _get_agent_discovery_info(
self, tool_name: str
) -> Optional[Dict[str, Any]]:
"""
Get discovery information for a specific agent.
Args:
tool_name: Name of the agent tool
Returns:
Dict containing agent discovery information, or None if not found
"""
if tool_name not in self.agents:
return None
agent = self.agents[tool_name]
# Get system prompt and truncate to 200 characters
system_prompt = getattr(agent, "system_prompt", "")
short_system_prompt = (
system_prompt[:200] + "..."
if len(system_prompt) > 200
else system_prompt
)
# Get tags (if available)
tags = getattr(agent, "tags", [])
if not tags:
tags = []
# Get capabilities (if available)
capabilities = getattr(agent, "capabilities", [])
if not capabilities:
capabilities = []
# Get role (if available)
role = getattr(agent, "role", "worker")
# Get model name
model_name = getattr(agent, "model_name", "Unknown")
info = {
"tool_name": tool_name,
"agent_name": agent.agent_name,
"description": agent.agent_description
or "No description available",
"short_system_prompt": short_system_prompt,
"tags": tags,
"capabilities": capabilities,
"role": role,
"model_name": model_name,
"max_loops": getattr(agent, "max_loops", 1),
"temperature": getattr(agent, "temperature", 0.5),
"max_tokens": getattr(agent, "max_tokens", 4096),
}
if self.verbose:
logger.debug(
f"Retrieved discovery info for agent '{tool_name}': {info}"
)
return info
def start_server(self) -> None: def start_server(self) -> None:
""" """
Start the MCP server. Start the MCP server.

@ -1,5 +1,4 @@
import json import json
import os
from typing import List from typing import List

@ -346,6 +346,7 @@ async def aget_mcp_tools(
format: str = "openai", format: str = "openai",
connection: Optional[MCPConnection] = None, connection: Optional[MCPConnection] = None,
transport: Optional[str] = None, transport: Optional[str] = None,
verbose: bool = True,
*args, *args,
**kwargs, **kwargs,
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
@ -356,15 +357,17 @@ async def aget_mcp_tools(
format (str): Format to return tools in ('openai' or 'mcp'). format (str): Format to return tools in ('openai' or 'mcp').
connection (Optional[MCPConnection]): Optional connection object. connection (Optional[MCPConnection]): Optional connection object.
transport (Optional[str]): Transport type. If None, auto-detects. transport (Optional[str]): Transport type. If None, auto-detects.
verbose (bool): Enable verbose logging. Defaults to True.
Returns: Returns:
List[Dict[str, Any]]: List of available MCP tools in OpenAI format. List[Dict[str, Any]]: List of available MCP tools in OpenAI format.
Raises: Raises:
MCPValidationError: If server_path is invalid. MCPValidationError: If server_path is invalid.
MCPConnectionError: If connection to server fails. MCPConnectionError: If connection to server fails.
""" """
logger.info( if verbose:
f"aget_mcp_tools called for server_path: {server_path}" logger.info(
) f"aget_mcp_tools called for server_path: {server_path}"
)
if transport is None: if transport is None:
transport = auto_detect_transport(server_path) transport = auto_detect_transport(server_path)
if exists(connection): if exists(connection):
@ -381,9 +384,10 @@ async def aget_mcp_tools(
server_path, server_path,
) )
url = server_path url = server_path
logger.info( if verbose:
f"Fetching MCP tools from server: {server_path} using transport: {transport}" logger.info(
) f"Fetching MCP tools from server: {server_path} using transport: {transport}"
)
try: try:
async with get_mcp_client( async with get_mcp_client(
transport, transport,
@ -402,9 +406,10 @@ async def aget_mcp_tools(
tools = await load_mcp_tools( tools = await load_mcp_tools(
session=session, format=format session=session, format=format
) )
logger.info( if verbose:
f"Successfully fetched {len(tools)} tools" logger.info(
) f"Successfully fetched {len(tools)} tools"
)
return tools return tools
except Exception as e: except Exception as e:
logger.error( logger.error(
@ -420,6 +425,7 @@ def get_mcp_tools_sync(
format: str = "openai", format: str = "openai",
connection: Optional[MCPConnection] = None, connection: Optional[MCPConnection] = None,
transport: Optional[str] = "streamable-http", transport: Optional[str] = "streamable-http",
verbose: bool = True,
*args, *args,
**kwargs, **kwargs,
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
@ -430,6 +436,7 @@ def get_mcp_tools_sync(
format (str): Format to return tools in ('openai' or 'mcp'). format (str): Format to return tools in ('openai' or 'mcp').
connection (Optional[MCPConnection]): Optional connection object. connection (Optional[MCPConnection]): Optional connection object.
transport (Optional[str]): Transport type. If None, auto-detects. transport (Optional[str]): Transport type. If None, auto-detects.
verbose (bool): Enable verbose logging. Defaults to True.
Returns: Returns:
List[Dict[str, Any]]: List of available MCP tools in OpenAI format. List[Dict[str, Any]]: List of available MCP tools in OpenAI format.
Raises: Raises:
@ -437,9 +444,10 @@ def get_mcp_tools_sync(
MCPConnectionError: If connection to server fails. MCPConnectionError: If connection to server fails.
MCPExecutionError: If event loop management fails. MCPExecutionError: If event loop management fails.
""" """
logger.info( if verbose:
f"get_mcp_tools_sync called for server_path: {server_path}" logger.info(
) f"get_mcp_tools_sync called for server_path: {server_path}"
)
if transport is None: if transport is None:
transport = auto_detect_transport(server_path) transport = auto_detect_transport(server_path)
with get_or_create_event_loop() as loop: with get_or_create_event_loop() as loop:
@ -450,6 +458,7 @@ def get_mcp_tools_sync(
format=format, format=format,
connection=connection, connection=connection,
transport=transport, transport=transport,
verbose=verbose,
*args, *args,
**kwargs, **kwargs,
) )
@ -468,6 +477,7 @@ def _fetch_tools_for_server(
connection: Optional[MCPConnection] = None, connection: Optional[MCPConnection] = None,
format: str = "openai", format: str = "openai",
transport: Optional[str] = None, transport: Optional[str] = None,
verbose: bool = True,
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
""" """
Helper function to fetch tools for a single server. Helper function to fetch tools for a single server.
@ -476,10 +486,12 @@ def _fetch_tools_for_server(
connection (Optional[MCPConnection]): Optional connection object. connection (Optional[MCPConnection]): Optional connection object.
format (str): Format to return tools in. format (str): Format to return tools in.
transport (Optional[str]): Transport type. If None, auto-detects. transport (Optional[str]): Transport type. If None, auto-detects.
verbose (bool): Enable verbose logging. Defaults to True.
Returns: Returns:
List[Dict[str, Any]]: List of available MCP tools. List[Dict[str, Any]]: List of available MCP tools.
""" """
logger.info(f"_fetch_tools_for_server called for url: {url}") if verbose:
logger.info(f"_fetch_tools_for_server called for url: {url}")
if transport is None: if transport is None:
transport = auto_detect_transport(url) transport = auto_detect_transport(url)
return get_mcp_tools_sync( return get_mcp_tools_sync(
@ -487,6 +499,7 @@ def _fetch_tools_for_server(
connection=connection, connection=connection,
format=format, format=format,
transport=transport, transport=transport,
verbose=verbose,
) )
@ -497,6 +510,7 @@ def get_tools_for_multiple_mcp_servers(
output_type: Literal["json", "dict", "str"] = "str", output_type: Literal["json", "dict", "str"] = "str",
max_workers: Optional[int] = None, max_workers: Optional[int] = None,
transport: Optional[str] = None, transport: Optional[str] = None,
verbose: bool = True,
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
""" """
Get tools for multiple MCP servers concurrently using ThreadPoolExecutor. Get tools for multiple MCP servers concurrently using ThreadPoolExecutor.
@ -507,12 +521,14 @@ def get_tools_for_multiple_mcp_servers(
output_type (Literal): Output format type. output_type (Literal): Output format type.
max_workers (Optional[int]): Max worker threads. max_workers (Optional[int]): Max worker threads.
transport (Optional[str]): Transport type. If None, auto-detects per URL. transport (Optional[str]): Transport type. If None, auto-detects per URL.
verbose (bool): Enable verbose logging. Defaults to True.
Returns: Returns:
List[Dict[str, Any]]: Combined list of tools from all servers. List[Dict[str, Any]]: Combined list of tools from all servers.
""" """
logger.info( if verbose:
f"get_tools_for_multiple_mcp_servers called for {len(urls)} urls." logger.info(
) f"get_tools_for_multiple_mcp_servers called for {len(urls)} urls."
)
tools = [] tools = []
( (
min(32, os.cpu_count() + 4) min(32, os.cpu_count() + 4)
@ -528,6 +544,7 @@ def get_tools_for_multiple_mcp_servers(
connection, connection,
format, format,
transport, transport,
verbose,
): url ): url
for url, connection in zip(urls, connections) for url, connection in zip(urls, connections)
} }
@ -539,6 +556,7 @@ def get_tools_for_multiple_mcp_servers(
None, None,
format, format,
transport, transport,
verbose,
): url ): url
for url in urls for url in urls
} }
@ -563,6 +581,7 @@ async def _execute_tool_call_simple(
connection: Optional[MCPConnection] = None, connection: Optional[MCPConnection] = None,
output_type: Literal["json", "dict", "str"] = "str", output_type: Literal["json", "dict", "str"] = "str",
transport: Optional[str] = None, transport: Optional[str] = None,
verbose: bool = True,
*args, *args,
**kwargs, **kwargs,
): ):
@ -574,14 +593,16 @@ async def _execute_tool_call_simple(
connection (Optional[MCPConnection]): Optional connection object. connection (Optional[MCPConnection]): Optional connection object.
output_type (Literal): Output format type. output_type (Literal): Output format type.
transport (Optional[str]): Transport type. If None, auto-detects. transport (Optional[str]): Transport type. If None, auto-detects.
verbose (bool): Enable verbose logging. Defaults to True.
Returns: Returns:
The tool call result in the specified output format. The tool call result in the specified output format.
Raises: Raises:
MCPExecutionError, MCPConnectionError MCPExecutionError, MCPConnectionError
""" """
logger.info( if verbose:
f"_execute_tool_call_simple called for server_path: {server_path}" logger.info(
) f"_execute_tool_call_simple called for server_path: {server_path}"
)
if transport is None: if transport is None:
transport = auto_detect_transport(server_path) transport = auto_detect_transport(server_path)
if exists(connection): if exists(connection):
@ -638,9 +659,10 @@ async def _execute_tool_call_simple(
out = "\n".join(formatted_lines) out = "\n".join(formatted_lines)
else: else:
out = call_result.model_dump() out = call_result.model_dump()
logger.info( if verbose:
f"Tool call executed successfully for {server_path}" logger.info(
) f"Tool call executed successfully for {server_path}"
)
return out return out
except Exception as e: except Exception as e:
logger.error( logger.error(
@ -664,6 +686,7 @@ async def execute_tool_call_simple(
connection: Optional[MCPConnection] = None, connection: Optional[MCPConnection] = None,
output_type: Literal["json", "dict", "str", "formatted"] = "str", output_type: Literal["json", "dict", "str", "formatted"] = "str",
transport: Optional[str] = None, transport: Optional[str] = None,
verbose: bool = True,
*args, *args,
**kwargs, **kwargs,
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
@ -675,12 +698,14 @@ async def execute_tool_call_simple(
connection (Optional[MCPConnection]): Optional connection object. connection (Optional[MCPConnection]): Optional connection object.
output_type (Literal): Output format type. output_type (Literal): Output format type.
transport (Optional[str]): Transport type. If None, auto-detects. transport (Optional[str]): Transport type. If None, auto-detects.
verbose (bool): Enable verbose logging. Defaults to True.
Returns: Returns:
The tool call result in the specified output format. The tool call result in the specified output format.
""" """
logger.info( if verbose:
f"execute_tool_call_simple called for server_path: {server_path}" logger.info(
) f"execute_tool_call_simple called for server_path: {server_path}"
)
if transport is None: if transport is None:
transport = auto_detect_transport(server_path) transport = auto_detect_transport(server_path)
if isinstance(response, str): if isinstance(response, str):
@ -691,6 +716,7 @@ async def execute_tool_call_simple(
connection=connection, connection=connection,
output_type=output_type, output_type=output_type,
transport=transport, transport=transport,
verbose=verbose,
*args, *args,
**kwargs, **kwargs,
) )
@ -701,6 +727,7 @@ def _create_server_tool_mapping(
connections: List[MCPConnection] = None, connections: List[MCPConnection] = None,
format: str = "openai", format: str = "openai",
transport: Optional[str] = None, transport: Optional[str] = None,
verbose: bool = True,
) -> Dict[str, Dict[str, Any]]: ) -> Dict[str, Dict[str, Any]]:
""" """
Create a mapping of function names to server information for all MCP servers. Create a mapping of function names to server information for all MCP servers.
@ -709,6 +736,7 @@ def _create_server_tool_mapping(
connections (List[MCPConnection]): Optional list of MCPConnection objects. connections (List[MCPConnection]): Optional list of MCPConnection objects.
format (str): Format to fetch tools in. format (str): Format to fetch tools in.
transport (Optional[str]): Transport type. If None, auto-detects per URL. transport (Optional[str]): Transport type. If None, auto-detects per URL.
verbose (bool): Enable verbose logging. Defaults to True.
Returns: Returns:
Dict[str, Dict[str, Any]]: Mapping of function names to server info. Dict[str, Dict[str, Any]]: Mapping of function names to server info.
""" """
@ -725,6 +753,7 @@ def _create_server_tool_mapping(
connection=connection, connection=connection,
format=format, format=format,
transport=transport, transport=transport,
verbose=verbose,
) )
for tool in tools: for tool in tools:
if isinstance(tool, dict) and "function" in tool: if isinstance(tool, dict) and "function" in tool:
@ -755,6 +784,7 @@ async def _create_server_tool_mapping_async(
connections: List[MCPConnection] = None, connections: List[MCPConnection] = None,
format: str = "openai", format: str = "openai",
transport: str = "streamable-http", transport: str = "streamable-http",
verbose: bool = True,
) -> Dict[str, Dict[str, Any]]: ) -> Dict[str, Dict[str, Any]]:
""" """
Async version: Create a mapping of function names to server information for all MCP servers. Async version: Create a mapping of function names to server information for all MCP servers.
@ -763,6 +793,7 @@ async def _create_server_tool_mapping_async(
connections (List[MCPConnection]): Optional list of MCPConnection objects. connections (List[MCPConnection]): Optional list of MCPConnection objects.
format (str): Format to fetch tools in. format (str): Format to fetch tools in.
transport (str): Transport type. transport (str): Transport type.
verbose (bool): Enable verbose logging. Defaults to True.
Returns: Returns:
Dict[str, Dict[str, Any]]: Mapping of function names to server info. Dict[str, Dict[str, Any]]: Mapping of function names to server info.
""" """
@ -779,6 +810,7 @@ async def _create_server_tool_mapping_async(
connection=connection, connection=connection,
format=format, format=format,
transport=transport, transport=transport,
verbose=verbose,
) )
for tool in tools: for tool in tools:
if isinstance(tool, dict) and "function" in tool: if isinstance(tool, dict) and "function" in tool:
@ -809,6 +841,7 @@ async def _execute_tool_on_server(
server_info: Dict[str, Any], server_info: Dict[str, Any],
output_type: Literal["json", "dict", "str", "formatted"] = "str", output_type: Literal["json", "dict", "str", "formatted"] = "str",
transport: str = "streamable-http", transport: str = "streamable-http",
verbose: bool = True,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Execute a single tool call on a specific server. Execute a single tool call on a specific server.
@ -817,6 +850,7 @@ async def _execute_tool_on_server(
server_info (Dict[str, Any]): Server information from the mapping. server_info (Dict[str, Any]): Server information from the mapping.
output_type (Literal): Output format type. output_type (Literal): Output format type.
transport (str): Transport type. transport (str): Transport type.
verbose (bool): Enable verbose logging. Defaults to True.
Returns: Returns:
Dict[str, Any]: Execution result with server metadata. Dict[str, Any]: Execution result with server metadata.
""" """
@ -827,6 +861,7 @@ async def _execute_tool_on_server(
connection=server_info["connection"], connection=server_info["connection"],
output_type=output_type, output_type=output_type,
transport=transport, transport=transport,
verbose=verbose,
) )
return { return {
"server_url": server_info["url"], "server_url": server_info["url"],
@ -860,6 +895,7 @@ async def execute_multiple_tools_on_multiple_mcp_servers(
output_type: Literal["json", "dict", "str", "formatted"] = "str", output_type: Literal["json", "dict", "str", "formatted"] = "str",
max_concurrent: Optional[int] = None, max_concurrent: Optional[int] = None,
transport: str = "streamable-http", transport: str = "streamable-http",
verbose: bool = True,
*args, *args,
**kwargs, **kwargs,
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
@ -872,88 +908,103 @@ async def execute_multiple_tools_on_multiple_mcp_servers(
output_type (Literal): Output format type. output_type (Literal): Output format type.
max_concurrent (Optional[int]): Max concurrent tasks. max_concurrent (Optional[int]): Max concurrent tasks.
transport (str): Transport type. transport (str): Transport type.
verbose (bool): Enable verbose logging. Defaults to True.
Returns: Returns:
List[Dict[str, Any]]: List of execution results. List[Dict[str, Any]]: List of execution results.
""" """
if not responses: if not responses:
logger.warning("No responses provided for execution") if verbose:
logger.warning("No responses provided for execution")
return [] return []
if not urls: if not urls:
raise MCPValidationError("No server URLs provided") raise MCPValidationError("No server URLs provided")
logger.info( if verbose:
f"Creating tool mapping for {len(urls)} servers using transport: {transport}" logger.info(
) f"Creating tool mapping for {len(urls)} servers using transport: {transport}"
)
server_tool_mapping = await _create_server_tool_mapping_async( server_tool_mapping = await _create_server_tool_mapping_async(
urls=urls, urls=urls,
connections=connections, connections=connections,
format="openai", format="openai",
transport=transport, transport=transport,
verbose=verbose,
) )
if not server_tool_mapping: if not server_tool_mapping:
raise MCPExecutionError( raise MCPExecutionError(
"No tools found on any of the provided servers" "No tools found on any of the provided servers"
) )
logger.info( if verbose:
f"Found {len(server_tool_mapping)} unique functions across all servers" logger.info(
) f"Found {len(server_tool_mapping)} unique functions across all servers"
)
all_tool_calls = [] all_tool_calls = []
logger.info( if verbose:
f"Processing {len(responses)} responses for tool call extraction" logger.info(
) f"Processing {len(responses)} responses for tool call extraction"
)
if len(responses) > 10 and all( if len(responses) > 10 and all(
isinstance(r, str) and len(r) == 1 for r in responses isinstance(r, str) and len(r) == 1 for r in responses
): ):
logger.info( if verbose:
"Detected character-by-character response, reconstructing JSON string"
)
try:
reconstructed_response = "".join(responses)
logger.info( logger.info(
f"Reconstructed response length: {len(reconstructed_response)}" "Detected character-by-character response, reconstructing JSON string"
) )
logger.debug( try:
f"Reconstructed response: {reconstructed_response}" reconstructed_response = "".join(responses)
) if verbose:
try:
json.loads(reconstructed_response)
logger.info( logger.info(
"Successfully validated reconstructed JSON response" f"Reconstructed response length: {len(reconstructed_response)}"
)
except json.JSONDecodeError as e:
logger.warning(
f"Reconstructed response is not valid JSON: {str(e)}"
) )
logger.debug( logger.debug(
f"First 100 chars: {reconstructed_response[:100]}" f"Reconstructed response: {reconstructed_response}"
)
logger.debug(
f"Last 100 chars: {reconstructed_response[-100:]}"
) )
try:
json.loads(reconstructed_response)
if verbose:
logger.info(
"Successfully validated reconstructed JSON response"
)
except json.JSONDecodeError as e:
if verbose:
logger.warning(
f"Reconstructed response is not valid JSON: {str(e)}"
)
logger.debug(
f"First 100 chars: {reconstructed_response[:100]}"
)
logger.debug(
f"Last 100 chars: {reconstructed_response[-100:]}"
)
responses = [reconstructed_response] responses = [reconstructed_response]
except Exception as e: except Exception as e:
logger.warning( if verbose:
f"Failed to reconstruct response from characters: {str(e)}" logger.warning(
) f"Failed to reconstruct response from characters: {str(e)}"
)
for i, response in enumerate(responses): for i, response in enumerate(responses):
logger.debug( if verbose:
f"Processing response {i}: {type(response)} - {response}" logger.debug(
) f"Processing response {i}: {type(response)} - {response}"
)
if isinstance(response, str): if isinstance(response, str):
try: try:
response = json.loads(response) response = json.loads(response)
logger.debug( if verbose:
f"Parsed JSON string response {i}: {response}" logger.debug(
) f"Parsed JSON string response {i}: {response}"
)
except json.JSONDecodeError: except json.JSONDecodeError:
logger.warning( if verbose:
f"Failed to parse JSON response at index {i}: {response}" logger.warning(
) f"Failed to parse JSON response at index {i}: {response}"
)
continue continue
if isinstance(response, dict): if isinstance(response, dict):
if "function" in response: if "function" in response:
logger.debug( if verbose:
f"Found single tool call in response {i}: {response['function']}" logger.debug(
) f"Found single tool call in response {i}: {response['function']}"
)
if isinstance( if isinstance(
response["function"].get("arguments"), str response["function"].get("arguments"), str
): ):
@ -963,18 +1014,21 @@ async def execute_multiple_tools_on_multiple_mcp_servers(
response["function"]["arguments"] response["function"]["arguments"]
) )
) )
logger.debug( if verbose:
f"Parsed function arguments: {response['function']['arguments']}" logger.debug(
) f"Parsed function arguments: {response['function']['arguments']}"
)
except json.JSONDecodeError: except json.JSONDecodeError:
logger.warning( if verbose:
f"Failed to parse function arguments: {response['function']['arguments']}" logger.warning(
) f"Failed to parse function arguments: {response['function']['arguments']}"
)
all_tool_calls.append((i, response)) all_tool_calls.append((i, response))
elif "tool_calls" in response: elif "tool_calls" in response:
logger.debug( if verbose:
f"Found multiple tool calls in response {i}: {len(response['tool_calls'])} calls" logger.debug(
) f"Found multiple tool calls in response {i}: {len(response['tool_calls'])} calls"
)
for tool_call in response["tool_calls"]: for tool_call in response["tool_calls"]:
if isinstance( if isinstance(
tool_call.get("function", {}).get( tool_call.get("function", {}).get(
@ -988,44 +1042,55 @@ async def execute_multiple_tools_on_multiple_mcp_servers(
tool_call["function"]["arguments"] tool_call["function"]["arguments"]
) )
) )
logger.debug( if verbose:
f"Parsed tool call arguments: {tool_call['function']['arguments']}" logger.debug(
) f"Parsed tool call arguments: {tool_call['function']['arguments']}"
)
except json.JSONDecodeError: except json.JSONDecodeError:
logger.warning( if verbose:
f"Failed to parse tool call arguments: {tool_call['function']['arguments']}" logger.warning(
) f"Failed to parse tool call arguments: {tool_call['function']['arguments']}"
)
all_tool_calls.append((i, tool_call)) all_tool_calls.append((i, tool_call))
elif "name" in response and "arguments" in response: elif "name" in response and "arguments" in response:
logger.debug( if verbose:
f"Found direct tool call in response {i}: {response}" logger.debug(
) f"Found direct tool call in response {i}: {response}"
)
if isinstance(response.get("arguments"), str): if isinstance(response.get("arguments"), str):
try: try:
response["arguments"] = json.loads( response["arguments"] = json.loads(
response["arguments"] response["arguments"]
) )
logger.debug( if verbose:
f"Parsed direct tool call arguments: {response['arguments']}" logger.debug(
) f"Parsed direct tool call arguments: {response['arguments']}"
)
except json.JSONDecodeError: except json.JSONDecodeError:
logger.warning( if verbose:
f"Failed to parse direct tool call arguments: {response['arguments']}" logger.warning(
) f"Failed to parse direct tool call arguments: {response['arguments']}"
)
all_tool_calls.append((i, {"function": response})) all_tool_calls.append((i, {"function": response}))
else: else:
logger.debug( if verbose:
f"Response {i} is a dict but doesn't match expected tool call formats: {list(response.keys())}" logger.debug(
) f"Response {i} is a dict but doesn't match expected tool call formats: {list(response.keys())}"
)
else: else:
logger.warning( if verbose:
f"Unsupported response type at index {i}: {type(response)}" logger.warning(
) f"Unsupported response type at index {i}: {type(response)}"
)
continue continue
if not all_tool_calls: if not all_tool_calls:
logger.warning("No tool calls found in responses") if verbose:
logger.warning("No tool calls found in responses")
return [] return []
logger.info(f"Found {len(all_tool_calls)} tool calls to execute") if verbose:
logger.info(
f"Found {len(all_tool_calls)} tool calls to execute"
)
max_concurrent = max_concurrent or len(all_tool_calls) max_concurrent = max_concurrent or len(all_tool_calls)
semaphore = asyncio.Semaphore(max_concurrent) semaphore = asyncio.Semaphore(max_concurrent)
@ -1036,9 +1101,10 @@ async def execute_multiple_tools_on_multiple_mcp_servers(
"name", "unknown" "name", "unknown"
) )
if function_name not in server_tool_mapping: if function_name not in server_tool_mapping:
logger.warning( if verbose:
f"Function '{function_name}' not found on any server" logger.warning(
) f"Function '{function_name}' not found on any server"
)
return { return {
"response_index": response_index, "response_index": response_index,
"function_name": function_name, "function_name": function_name,
@ -1052,6 +1118,7 @@ async def execute_multiple_tools_on_multiple_mcp_servers(
server_info=server_info, server_info=server_info,
output_type=output_type, output_type=output_type,
transport=transport, transport=transport,
verbose=verbose,
) )
result["response_index"] = response_index result["response_index"] = response_index
return result return result
@ -1082,9 +1149,10 @@ async def execute_multiple_tools_on_multiple_mcp_servers(
) )
else: else:
processed_results.append(result) processed_results.append(result)
logger.info( if verbose:
f"Completed execution of {len(processed_results)} tool calls" logger.info(
) f"Completed execution of {len(processed_results)} tool calls"
)
return processed_results return processed_results
@ -1095,6 +1163,7 @@ def execute_multiple_tools_on_multiple_mcp_servers_sync(
output_type: Literal["json", "dict", "str", "formatted"] = "str", output_type: Literal["json", "dict", "str", "formatted"] = "str",
max_concurrent: Optional[int] = None, max_concurrent: Optional[int] = None,
transport: str = "streamable-http", transport: str = "streamable-http",
verbose: bool = True,
*args, *args,
**kwargs, **kwargs,
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
@ -1107,6 +1176,7 @@ def execute_multiple_tools_on_multiple_mcp_servers_sync(
output_type (Literal): Output format type. output_type (Literal): Output format type.
max_concurrent (Optional[int]): Max concurrent tasks. max_concurrent (Optional[int]): Max concurrent tasks.
transport (str): Transport type. transport (str): Transport type.
verbose (bool): Enable verbose logging. Defaults to True.
Returns: Returns:
List[Dict[str, Any]]: List of execution results. List[Dict[str, Any]]: List of execution results.
""" """
@ -1120,6 +1190,7 @@ def execute_multiple_tools_on_multiple_mcp_servers_sync(
output_type=output_type, output_type=output_type,
max_concurrent=max_concurrent, max_concurrent=max_concurrent,
transport=transport, transport=transport,
verbose=verbose,
*args, *args,
**kwargs, **kwargs,
) )

Loading…
Cancel
Save