You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
swarms/examples/aop_examples/discovery/simple_discovery_example.py

232 lines
7.1 KiB

#!/usr/bin/env python3
"""
Simple example showing how to call the discover_agents tool synchronously.
"""
import json
import asyncio
from swarms.structs.aop import AOPCluster
from swarms.tools.mcp_client_tools import execute_tool_call_simple
def call_discover_agents_sync(server_url="http://localhost:5932/mcp"):
"""
Synchronously call the discover_agents tool.
Args:
server_url: URL of the MCP server
Returns:
Dict containing the discovery results
"""
# Create the tool call request
tool_call_request = {
"type": "function",
"function": {
"name": "discover_agents",
"arguments": json.dumps({}), # Empty = get all agents
},
}
# Run the async function
return asyncio.run(
execute_tool_call_simple(
response=tool_call_request,
server_path=server_url,
output_type="dict",
)
)
def call_discover_specific_agent_sync(
agent_name, server_url="http://localhost:5932/mcp"
):
"""
Synchronously call the discover_agents tool for a specific agent.
Args:
agent_name: Name of the specific agent to discover
server_url: URL of the MCP server
Returns:
Dict containing the discovery results
"""
# Create the tool call request
tool_call_request = {
"type": "function",
"function": {
"name": "discover_agents",
"arguments": json.dumps({"agent_name": agent_name}),
},
}
# Run the async function
return asyncio.run(
execute_tool_call_simple(
response=tool_call_request,
server_path=server_url,
output_type="dict",
)
)
def main():
"""Main function demonstrating discovery tool usage."""
print("🔍 AOP Agent Discovery Tool Example")
print("=" * 40)
print()
# First, check what tools are available
print("1. Checking available MCP tools...")
aop_cluster = AOPCluster(
urls=["http://localhost:5932/mcp"],
transport="streamable-http",
)
tools = aop_cluster.get_tools(output_type="dict")
print(f" Found {len(tools)} tools")
# Check if discover_agents is available
discover_tool = aop_cluster.find_tool_by_server_name(
"discover_agents"
)
if not discover_tool:
print("❌ discover_agents tool not found!")
print(
" Make sure your AOP server is running with agents registered."
)
return
print("✅ discover_agents tool found!")
print()
# Discover all agents
print("2. Discovering all agents...")
try:
result = call_discover_agents_sync()
if isinstance(result, list) and len(result) > 0:
discovery_data = result[0]
if discovery_data.get("success"):
agents = discovery_data.get("agents", [])
print(f" ✅ Found {len(agents)} agents:")
for i, agent in enumerate(agents, 1):
print(
f" {i}. {agent.get('agent_name', 'Unknown')}"
)
print(
f" Role: {agent.get('role', 'worker')}"
)
print(
f" Description: {agent.get('description', 'No description')}"
)
print(
f" Tags: {', '.join(agent.get('tags', []))}"
)
print(
f" Capabilities: {', '.join(agent.get('capabilities', []))}"
)
print(
f" System Prompt: {agent.get('short_system_prompt', 'No prompt')[:100]}..."
)
print()
else:
print(
f" ❌ Discovery failed: {discovery_data.get('error', 'Unknown error')}"
)
else:
print(" ❌ No valid result returned")
except Exception as e:
print(f" ❌ Error: {e}")
print()
# Example of discovering a specific agent (if any exist)
print("3. Example: Discovering a specific agent...")
try:
# Try to discover the first agent specifically
if isinstance(result, list) and len(result) > 0:
discovery_data = result[0]
if discovery_data.get("success") and discovery_data.get(
"agents"
):
first_agent_name = discovery_data["agents"][0].get(
"agent_name"
)
if first_agent_name:
print(
f" Looking for specific agent: {first_agent_name}"
)
specific_result = (
call_discover_specific_agent_sync(
first_agent_name
)
)
if (
isinstance(specific_result, list)
and len(specific_result) > 0
):
specific_data = specific_result[0]
if specific_data.get("success"):
agent = specific_data.get("agents", [{}])[
0
]
print(
f" ✅ Found specific agent: {agent.get('agent_name', 'Unknown')}"
)
print(
f" Model: {agent.get('model_name', 'Unknown')}"
)
print(
f" Max Loops: {agent.get('max_loops', 1)}"
)
print(
f" Temperature: {agent.get('temperature', 0.5)}"
)
else:
print(
f" ❌ Specific discovery failed: {specific_data.get('error')}"
)
else:
print(" ❌ No valid specific result")
else:
print(
" ⚠️ No agents found to test specific discovery"
)
else:
print(
" ⚠️ No agents available for specific discovery"
)
else:
print(
" ⚠️ No previous discovery results to use for specific discovery"
)
except Exception as e:
print(f" ❌ Error in specific discovery: {e}")
print()
print("✅ Discovery tool demonstration complete!")
print()
print("💡 Usage Summary:")
print(
" • Call discover_agents() with no arguments to get all agents"
)
print(
" • Call discover_agents(agent_name='AgentName') to get specific agent"
)
print(
" • Each agent returns: name, description, role, tags, capabilities, system prompt, etc."
)
if __name__ == "__main__":
main()