[FIX][AOP Docs]

pull/1104/head
Kye Gomez 1 week ago
parent 301a201f40
commit b580980c71

@ -301,6 +301,7 @@ nav:
- MultiAgentRouter: "swarms/structs/multi_agent_router.md" - MultiAgentRouter: "swarms/structs/multi_agent_router.md"
- ModelRouter: "swarms/structs/model_router.md" - ModelRouter: "swarms/structs/model_router.md"
- Rearrangers: - Rearrangers:
- SwarmRearrange: "swarms/structs/swarm_rearrange.md" - SwarmRearrange: "swarms/structs/swarm_rearrange.md"
- AgentRearrange: "swarms/structs/agent_rearrange.md" - AgentRearrange: "swarms/structs/agent_rearrange.md"
@ -327,7 +328,6 @@ nav:
- BaseTool Reference: "swarms/tools/base_tool.md" - BaseTool Reference: "swarms/tools/base_tool.md"
- MCP Client Utils: "swarms/tools/mcp_client_call.md" - MCP Client Utils: "swarms/tools/mcp_client_call.md"
- MCP Agent Tool: "swarms/tools/mcp_agent_tool.md" - MCP Agent Tool: "swarms/tools/mcp_agent_tool.md"
- Agent Orchestration Protocol (AOP): "swarms/structs/aop.md"
- Vertical Tools: - Vertical Tools:
- Finance: "swarms_tools/finance.md" - Finance: "swarms_tools/finance.md"

@ -0,0 +1,336 @@
# AOP Cluster Example
This example demonstrates how to use AOPCluster to connect to and manage multiple MCP servers running AOP agents.
## Basic Cluster Setup
```python
import json
from swarms.structs.aop import AOPCluster
# Connect to multiple MCP servers
cluster = AOPCluster(
urls=[
"http://localhost:8000/mcp", # Research and Analysis server
"http://localhost:8001/mcp", # Writing and Code server
"http://localhost:8002/mcp" # Financial server
],
transport="streamable-http"
)
# Get all available tools from all servers
all_tools = cluster.get_tools(output_type="dict")
print(f"Found {len(all_tools)} tools across all servers")
# Pretty print all tools
print(json.dumps(all_tools, indent=2))
```
## Finding Specific Tools
```python
# Find a specific tool by name
research_tool = cluster.find_tool_by_server_name("Research-Agent")
if research_tool:
print("Found Research-Agent tool:")
print(json.dumps(research_tool, indent=2))
else:
print("Research-Agent tool not found")
# Find multiple tools
tool_names = ["Research-Agent", "Analysis-Agent", "Writing-Agent", "Code-Agent"]
found_tools = {}
for tool_name in tool_names:
tool = cluster.find_tool_by_server_name(tool_name)
if tool:
found_tools[tool_name] = tool
print(f"✓ Found {tool_name}")
else:
print(f"✗ {tool_name} not found")
print(f"Found {len(found_tools)} out of {len(tool_names)} tools")
```
## Tool Discovery and Management
```python
# Get tools in different formats
json_tools = cluster.get_tools(output_type="json")
dict_tools = cluster.get_tools(output_type="dict")
str_tools = cluster.get_tools(output_type="str")
print(f"JSON format: {len(json_tools)} tools")
print(f"Dict format: {len(dict_tools)} tools")
print(f"String format: {len(str_tools)} tools")
# Analyze tool distribution across servers
server_tools = {}
for tool in dict_tools:
server_name = tool.get("server", "unknown")
if server_name not in server_tools:
server_tools[server_name] = []
server_tools[server_name].append(tool.get("function", {}).get("name", "unknown"))
print("\nTools by server:")
for server, tools in server_tools.items():
print(f" {server}: {len(tools)} tools - {tools}")
```
## Advanced Cluster Management
```python
class AOPClusterManager:
def __init__(self, urls, transport="streamable-http"):
self.cluster = AOPCluster(urls, transport)
self.tools_cache = {}
self.last_update = None
def refresh_tools(self):
"""Refresh the tools cache"""
self.tools_cache = {}
tools = self.cluster.get_tools(output_type="dict")
for tool in tools:
tool_name = tool.get("function", {}).get("name")
if tool_name:
self.tools_cache[tool_name] = tool
self.last_update = time.time()
return len(self.tools_cache)
def get_tool(self, tool_name):
"""Get a specific tool by name"""
if not self.tools_cache or time.time() - self.last_update > 300: # 5 min cache
self.refresh_tools()
return self.tools_cache.get(tool_name)
def list_tools_by_category(self):
"""Categorize tools by their names"""
categories = {
"research": [],
"analysis": [],
"writing": [],
"code": [],
"financial": [],
"other": []
}
for tool_name in self.tools_cache.keys():
tool_name_lower = tool_name.lower()
if "research" in tool_name_lower:
categories["research"].append(tool_name)
elif "analysis" in tool_name_lower:
categories["analysis"].append(tool_name)
elif "writing" in tool_name_lower:
categories["writing"].append(tool_name)
elif "code" in tool_name_lower:
categories["code"].append(tool_name)
elif "financial" in tool_name_lower:
categories["financial"].append(tool_name)
else:
categories["other"].append(tool_name)
return categories
def get_available_servers(self):
"""Get list of available servers"""
servers = set()
for tool in self.tools_cache.values():
server = tool.get("server", "unknown")
servers.add(server)
return list(servers)
# Usage example
import time
manager = AOPClusterManager([
"http://localhost:8000/mcp",
"http://localhost:8001/mcp",
"http://localhost:8002/mcp"
])
# Refresh and display tools
tool_count = manager.refresh_tools()
print(f"Loaded {tool_count} tools")
# Categorize tools
categories = manager.list_tools_by_category()
for category, tools in categories.items():
if tools:
print(f"{category.title()}: {tools}")
# Get available servers
servers = manager.get_available_servers()
print(f"Available servers: {servers}")
```
## Error Handling and Resilience
```python
class ResilientAOPCluster:
def __init__(self, urls, transport="streamable-http"):
self.urls = urls
self.transport = transport
self.cluster = AOPCluster(urls, transport)
self.failed_servers = set()
def get_tools_with_fallback(self, output_type="dict"):
"""Get tools with fallback for failed servers"""
try:
return self.cluster.get_tools(output_type=output_type)
except Exception as e:
print(f"Error getting tools: {e}")
# Try individual servers
all_tools = []
for url in self.urls:
if url in self.failed_servers:
continue
try:
single_cluster = AOPCluster([url], self.transport)
tools = single_cluster.get_tools(output_type=output_type)
all_tools.extend(tools)
except Exception as server_error:
print(f"Server {url} failed: {server_error}")
self.failed_servers.add(url)
return all_tools
def find_tool_with_retry(self, tool_name, max_retries=3):
"""Find tool with retry logic"""
for attempt in range(max_retries):
try:
return self.cluster.find_tool_by_server_name(tool_name)
except Exception as e:
print(f"Attempt {attempt + 1} failed: {e}")
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Exponential backoff
return None
# Usage
resilient_cluster = ResilientAOPCluster([
"http://localhost:8000/mcp",
"http://localhost:8001/mcp",
"http://localhost:8002/mcp"
])
# Get tools with error handling
tools = resilient_cluster.get_tools_with_fallback()
print(f"Retrieved {len(tools)} tools")
# Find tool with retry
research_tool = resilient_cluster.find_tool_with_retry("Research-Agent")
if research_tool:
print("Found Research-Agent tool")
else:
print("Research-Agent tool not found after retries")
```
## Monitoring and Health Checks
```python
class AOPClusterMonitor:
def __init__(self, cluster_manager):
self.manager = cluster_manager
self.health_status = {}
def check_server_health(self, url):
"""Check if a server is healthy"""
try:
single_cluster = AOPCluster([url], self.manager.cluster.transport)
tools = single_cluster.get_tools(output_type="dict")
return {
"status": "healthy",
"tool_count": len(tools),
"timestamp": time.time()
}
except Exception as e:
return {
"status": "unhealthy",
"error": str(e),
"timestamp": time.time()
}
def check_all_servers(self):
"""Check health of all servers"""
for url in self.manager.cluster.urls:
health = self.check_server_health(url)
self.health_status[url] = health
status_icon = "✓" if health["status"] == "healthy" else "✗"
print(f"{status_icon} {url}: {health['status']}")
if health["status"] == "healthy":
print(f" Tools available: {health['tool_count']}")
else:
print(f" Error: {health['error']}")
def get_health_summary(self):
"""Get summary of server health"""
healthy_count = sum(1 for status in self.health_status.values()
if status["status"] == "healthy")
total_count = len(self.health_status)
return {
"healthy_servers": healthy_count,
"total_servers": total_count,
"health_percentage": (healthy_count / total_count) * 100 if total_count > 0 else 0
}
# Usage
monitor = AOPClusterMonitor(manager)
monitor.check_all_servers()
summary = monitor.get_health_summary()
print(f"Health Summary: {summary['healthy_servers']}/{summary['total_servers']} servers healthy ({summary['health_percentage']:.1f}%)")
```
## Complete Example
```python
import json
import time
from swarms.structs.aop import AOPCluster
def main():
# Initialize cluster
cluster = AOPCluster(
urls=[
"http://localhost:8000/mcp",
"http://localhost:8001/mcp",
"http://localhost:8002/mcp"
],
transport="streamable-http"
)
print("AOP Cluster Management System")
print("=" * 40)
# Get all tools
print("\n1. Discovering tools...")
tools = cluster.get_tools(output_type="dict")
print(f"Found {len(tools)} tools across all servers")
# List all tool names
tool_names = [tool.get("function", {}).get("name") for tool in tools]
print(f"Available tools: {tool_names}")
# Find specific tools
print("\n2. Finding specific tools...")
target_tools = ["Research-Agent", "Analysis-Agent", "Writing-Agent", "Code-Agent", "Financial-Agent"]
for tool_name in target_tools:
tool = cluster.find_tool_by_server_name(tool_name)
if tool:
print(f"✓ {tool_name}: Available")
else:
print(f"✗ {tool_name}: Not found")
# Display tool details
print("\n3. Tool details:")
for tool in tools[:3]: # Show first 3 tools
print(f"\nTool: {tool.get('function', {}).get('name')}")
print(f"Description: {tool.get('function', {}).get('description')}")
print(f"Parameters: {list(tool.get('function', {}).get('parameters', {}).get('properties', {}).keys())}")
print("\nAOP Cluster setup complete!")
if __name__ == "__main__":
main()
```
This example demonstrates comprehensive AOP cluster management including tool discovery, error handling, health monitoring, and advanced cluster operations.

@ -0,0 +1,326 @@
# AOP Server Setup Example
This example demonstrates how to set up an AOP (Agent Orchestration Protocol) server with multiple specialized agents.
## Complete Server Setup
```python
from swarms import Agent
from swarms.structs.aop import AOP
# Create specialized agents
research_agent = Agent(
agent_name="Research-Agent",
agent_description="Expert in research, data collection, and information gathering",
model_name="anthropic/claude-sonnet-4-5",
max_loops=1,
top_p=None,
dynamic_temperature_enabled=True,
system_prompt="""You are a research specialist. Your role is to:
1. Gather comprehensive information on any given topic
2. Analyze data from multiple sources
3. Provide well-structured research findings
4. Cite sources and maintain accuracy
5. Present findings in a clear, organized manner
Always provide detailed, factual information with proper context.""",
)
analysis_agent = Agent(
agent_name="Analysis-Agent",
agent_description="Expert in data analysis, pattern recognition, and generating insights",
model_name="anthropic/claude-sonnet-4-5",
max_loops=1,
top_p=None,
dynamic_temperature_enabled=True,
system_prompt="""You are an analysis specialist. Your role is to:
1. Analyze data and identify patterns
2. Generate actionable insights
3. Create visualizations and summaries
4. Provide statistical analysis
5. Make data-driven recommendations
Focus on extracting meaningful insights from information.""",
)
writing_agent = Agent(
agent_name="Writing-Agent",
agent_description="Expert in content creation, editing, and communication",
model_name="anthropic/claude-sonnet-4-5",
max_loops=1,
top_p=None,
dynamic_temperature_enabled=True,
system_prompt="""You are a writing specialist. Your role is to:
1. Create engaging, well-structured content
2. Edit and improve existing text
3. Adapt tone and style for different audiences
4. Ensure clarity and coherence
5. Follow best practices in writing
Always produce high-quality, professional content.""",
)
code_agent = Agent(
agent_name="Code-Agent",
agent_description="Expert in programming, code review, and software development",
model_name="anthropic/claude-sonnet-4-5",
max_loops=1,
top_p=None,
dynamic_temperature_enabled=True,
system_prompt="""You are a coding specialist. Your role is to:
1. Write clean, efficient code
2. Debug and fix issues
3. Review and optimize code
4. Explain programming concepts
5. Follow best practices and standards
Always provide working, well-documented code.""",
)
financial_agent = Agent(
agent_name="Financial-Agent",
agent_description="Expert in financial analysis, market research, and investment insights",
model_name="anthropic/claude-sonnet-4-5",
max_loops=1,
top_p=None,
dynamic_temperature_enabled=True,
system_prompt="""You are a financial specialist. Your role is to:
1. Analyze financial data and markets
2. Provide investment insights
3. Assess risk and opportunities
4. Create financial reports
5. Explain complex financial concepts
Always provide accurate, well-reasoned financial analysis.""",
)
# Create AOP instance
deployer = AOP(
server_name="MyAgentServer",
port=8000,
verbose=True,
log_level="INFO"
)
# Add all agents at once
agents = [
research_agent,
analysis_agent,
writing_agent,
code_agent,
financial_agent,
]
tool_names = deployer.add_agents_batch(agents)
print(f"Added {len(tool_names)} agents: {tool_names}")
# Display server information
server_info = deployer.get_server_info()
print(f"Server: {server_info['server_name']}")
print(f"Total tools: {server_info['total_tools']}")
print(f"Available tools: {server_info['tools']}")
# Start the server
print("Starting AOP server...")
deployer.run()
```
## Running the Server
1. Save the code above to a file (e.g., `aop_server.py`)
2. Install required dependencies:
```bash
pip install swarms
```
3. Run the server:
```bash
python aop_server.py
```
The server will start on `http://localhost:8000` and make all agents available as MCP tools.
## Tool Usage Examples
Once the server is running, you can call the tools using MCP clients:
### Research Agent
```python
# Call the research agent
result = research_tool(task="Research the latest AI trends in 2024")
print(result)
```
### Analysis Agent with Image
```python
# Call the analysis agent with an image
result = analysis_tool(
task="Analyze this chart and provide insights",
img="path/to/chart.png"
)
print(result)
```
### Writing Agent with Multiple Images
```python
# Call the writing agent with multiple images
result = writing_tool(
task="Write a comprehensive report based on these images",
imgs=["image1.jpg", "image2.jpg", "image3.jpg"]
)
print(result)
```
### Code Agent with Validation
```python
# Call the code agent with expected output
result = code_tool(
task="Debug this Python function",
correct_answer="Expected output: Hello World"
)
print(result)
```
### Financial Agent
```python
# Call the financial agent
result = financial_tool(task="Analyze the current market trends for tech stocks")
print(result)
```
## Response Format
All tools return a standardized response:
```json
{
"result": "The agent's response to the task",
"success": true,
"error": null
}
```
## Advanced Configuration
### Custom Timeouts and Retries
```python
# Add agent with custom configuration
deployer.add_agent(
agent=research_agent,
tool_name="custom_research_tool",
tool_description="Research tool with extended timeout",
timeout=120, # 2 minutes
max_retries=5,
verbose=True
)
```
### Custom Input/Output Schemas
```python
# Define custom schemas
custom_input_schema = {
"type": "object",
"properties": {
"task": {"type": "string", "description": "The research task"},
"sources": {
"type": "array",
"items": {"type": "string"},
"description": "Specific sources to research"
},
"depth": {
"type": "string",
"enum": ["shallow", "medium", "deep"],
"description": "Research depth level"
}
},
"required": ["task"]
}
# Add agent with custom schemas
deployer.add_agent(
agent=research_agent,
tool_name="advanced_research_tool",
input_schema=custom_input_schema,
timeout=60
)
```
## Monitoring and Debugging
### Enable Verbose Logging
```python
deployer = AOP(
server_name="DebugServer",
verbose=True,
traceback_enabled=True,
log_level="DEBUG"
)
```
### Check Server Status
```python
# List all registered agents
agents = deployer.list_agents()
print(f"Registered agents: {agents}")
# Get detailed agent information
for agent_name in agents:
info = deployer.get_agent_info(agent_name)
print(f"Agent {agent_name}: {info}")
# Get server information
server_info = deployer.get_server_info()
print(f"Server info: {server_info}")
```
## Production Deployment
### External Access
```python
deployer = AOP(
server_name="ProductionServer",
host="0.0.0.0", # Allow external connections
port=8000,
verbose=False, # Disable verbose logging in production
log_level="WARNING"
)
```
### Multiple Servers
```python
# Server 1: Research and Analysis
research_deployer = AOP("ResearchServer", port=8000)
research_deployer.add_agent(research_agent)
research_deployer.add_agent(analysis_agent)
# Server 2: Writing and Code
content_deployer = AOP("ContentServer", port=8001)
content_deployer.add_agent(writing_agent)
content_deployer.add_agent(code_agent)
# Server 3: Financial
finance_deployer = AOP("FinanceServer", port=8002)
finance_deployer.add_agent(financial_agent)
# Start all servers
import threading
threading.Thread(target=research_deployer.run).start()
threading.Thread(target=content_deployer.run).start()
threading.Thread(target=finance_deployer.run).start()
```
This example demonstrates a complete AOP server setup with multiple specialized agents, proper configuration, and production-ready deployment options.
Loading…
Cancel
Save