[DOCS][IMPROVEMENT][EXAMPLES] Custom Auth Callback for AOP

pull/1184/head
Aksh Parekh 2 months ago
parent 638e9e2ba2
commit 7b5ef1e63c

@ -156,6 +156,122 @@ Each agent is created with:
- **Port**: Change the port number as needed
- **Verbose**: Set to False for reduced logging
- **Server Name**: Use a descriptive name for your server
- **Authentication**: Add `auth_callback` to enable security (see below)
## Adding Authentication
You can secure your AOP server by adding a custom authentication callback:
### Simple API Key Authentication
```python
from swarms import Agent
from swarms.structs.aop import AOP
# Define authentication callback
def my_auth(token: str) -> bool:
"""Validate API keys."""
valid_keys = {"api-key-1", "api-key-2", "api-key-3"}
return token in valid_keys
# Create agents (same as above)
research_agent = Agent(
agent_name="Research-Agent",
model_name="claude-sonnet-4-5-20250929",
max_loops=1,
system_prompt="You are a research specialist.",
temperature=0.7,
top_p=None,
)
# Create AOP with authentication
deployer = AOP(
server_name="SecureAgentServer",
port=5932,
verbose=True,
auth_callback=my_auth, # Enable authentication
)
deployer.add_agent(research_agent)
deployer.run()
```
### JWT Token Authentication
```python
import jwt
def jwt_auth(token: str) -> bool:
"""Validate JWT tokens."""
try:
payload = jwt.decode(token, "your-secret-key", algorithms=["HS256"])
return payload.get("authorized", False)
except:
return False
deployer = AOP(
server_name="JWT-SecureServer",
port=5932,
auth_callback=jwt_auth,
)
```
### Environment-Based Authentication
```python
import os
def env_auth(token: str) -> bool:
"""Validate tokens from environment."""
valid_tokens = set(os.getenv("VALID_API_KEYS", "").split(","))
return token in valid_tokens
deployer = AOP(
server_name="Env-AuthServer",
port=5932,
auth_callback=env_auth,
)
```
### Client Usage with Authentication
When calling tools on an authenticated server:
```python
import asyncio
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
async def call_tool():
url = "http://localhost:5932/mcp"
async with streamablehttp_client(url) as ctx:
read, write = ctx if len(ctx) == 2 else (ctx[0], ctx[1])
async with ClientSession(read, write) as session:
await session.initialize()
# Include auth_token parameter
result = await session.call_tool(
name="Research-Agent",
arguments={
"task": "Research AI trends",
"auth_token": "api-key-1" # Required!
},
)
print(result)
asyncio.run(call_tool())
```
### Authentication Rules
- If `auth_callback` is provided → authentication is enabled
- If `auth_callback` is None → no authentication required
- The callback function determines ALL security logic
- Return `True` to allow access, `False` to deny
- Failed authentication returns: `{"success": false, "error": "Authentication failed"}`
## Next Steps

@ -122,6 +122,7 @@ Main class for deploying agents as tools in an MCP server.
| `processing_timeout` | `int` | `30` | Timeout for processing tasks in seconds |
| `retry_delay` | `float` | `1.0` | Delay between retries in seconds |
| `log_level` | `str` | `"INFO"` | Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL) |
| `auth_callback` | `Optional[Callable[[str], bool]]` | `None` | Custom authentication callback function. If provided, authentication is enabled. |
| `*args` | `Any` | - | Additional positional arguments passed to FastMCP |
| `**kwargs` | `Any` | - | Additional keyword arguments passed to FastMCP |
@ -585,6 +586,7 @@ All agent tools accept the following parameters:
| `img` | `str` | No | Single image to be processed by the agent |
| `imgs` | `List[str]` | No | Multiple images to be processed by the agent |
| `correct_answer` | `str` | No | Correct answer for validation or comparison |
| `auth_token` | `str` | Conditional | Authentication token (required if `auth_callback` is configured on server) |
## Output Format
@ -598,6 +600,173 @@ All agent tools return a standardized response format:
}
```
## Authentication for AOP Servers
AOP supports optional authentication to secure your agent deployments. Authentication is enabled by providing a custom `auth_callback` function when creating the AOP instance.
### How Authentication Works
1. **Server Side**: Provide an `auth_callback` function that validates tokens
2. **Client Side**: Include `auth_token` parameter when calling tools
3. **The callback determines ALL security logic** - token format, validation, etc.
### Authentication Callback
The `auth_callback` function should:
- Accept a single parameter: `token` (str)
- Return `True` to allow access, `False` to deny
- Handle all validation logic (API keys, JWT, database lookups, etc.)
### Authentication Examples
#### Simple API Key Authentication
```python
from swarms import Agent
from swarms.structs.aop import AOP
# Define authentication callback
def simple_auth(token: str) -> bool:
"""Validate token against a list of valid API keys."""
valid_tokens = {"secret-key-123", "secret-key-456", "secret-key-789"}
return token in valid_tokens
# Create agent
agent = Agent(
agent_name="Research-Agent",
model_name="claude-sonnet-4-5-20250929",
max_loops=1,
system_prompt="You are a helpful research assistant.",
temperature=0.7,
top_p=None,
)
# Create AOP with authentication
server = AOP(
server_name="SecureServer",
port=5932,
auth_callback=simple_auth, # This enables and governs authentication
)
server.add_agent(agent)
server.run()
```
#### JWT Token Authentication
```python
import jwt
def jwt_auth(token: str) -> bool:
"""Validate JWT tokens."""
try:
secret = "your-secret-key"
payload = jwt.decode(token, secret, algorithms=["HS256"])
return payload.get("authorized", False)
except jwt.InvalidTokenError:
return False
server = AOP(
server_name="JWT-Server",
port=5932,
auth_callback=jwt_auth,
)
```
#### Database Authentication
```python
def database_auth(token: str) -> bool:
"""Validate tokens against database."""
from your_db import session, Token
token_record = session.query(Token).filter_by(
token=token,
is_active=True
).first()
return token_record is not None
server = AOP(
server_name="DB-Auth-Server",
port=5932,
auth_callback=database_auth,
)
```
#### Environment-Based Authentication
```python
import os
def env_auth(token: str) -> bool:
"""Validate tokens from environment variables."""
valid_tokens = set(os.getenv("VALID_API_KEYS", "").split(","))
return token in valid_tokens
server = AOP(
server_name="Env-Auth-Server",
port=5932,
auth_callback=env_auth,
)
```
### Client-Side Authentication
When calling tools on an authenticated server, include the `auth_token` parameter:
```python
import asyncio
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
async def call_authenticated_tool():
url = "http://localhost:5932/mcp"
async with streamablehttp_client(url) as ctx:
read, write = ctx if len(ctx) == 2 else (ctx[0], ctx[1])
async with ClientSession(read, write) as session:
await session.initialize()
# Include auth_token in tool call
result = await session.call_tool(
name="Research-Agent",
arguments={
"task": "Research AI trends",
"auth_token": "secret-key-123" # Required!
},
)
print(result)
asyncio.run(call_authenticated_tool())
```
### Authentication Error Handling
When authentication fails, tools return:
```json
{
"result": "",
"success": false,
"error": "Authentication failed"
}
```
### No Authentication
To run without authentication, simply don't provide `auth_callback`:
```python
server = AOP(
server_name="PublicServer",
port=5932,
# No auth_callback = no authentication required
)
```
## Complete Examples
### Basic Server Setup

@ -0,0 +1,61 @@
"""
Simple AOP Client with Authentication
Just pass your token when calling tools. That's it.
The server's auth_callback determines if it's valid.
"""
import json
import asyncio
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
async def call_server():
"""Call the AOP server with authentication."""
url = "http://localhost:5932/mcp"
async with streamablehttp_client(url, timeout=10) as ctx:
if len(ctx) == 2:
read, write = ctx
else:
read, write, *_ = ctx
async with ClientSession(read, write) as session:
await session.initialize()
print("\n" + "=" * 60)
print("Calling discover_agents...")
print("=" * 60 + "\n")
# Just pass auth_token in arguments
result = await session.call_tool(
name="discover_agents",
arguments={
"auth_token": "mytoken123" # That's it!
},
)
print(json.dumps(result.model_dump(), indent=2))
print("\n" + "=" * 60)
print("Calling Research-Agent...")
print("=" * 60 + "\n")
# Same for any tool
result = await session.call_tool(
name="Research-Agent",
arguments={
"task": "What is Python?",
"auth_token": "mytoken123" # Just include it
},
)
print(json.dumps(result.model_dump(), indent=2))
if __name__ == "__main__":
print("\n🔐 Simple Auth Client")
print("Token: mytoken123\n")
asyncio.run(call_server())

@ -0,0 +1,56 @@
"""
Simple AOP Server with Custom Authentication Callback
The auth_callback function determines ALL authentication logic.
If you provide auth_callback, authentication is enabled.
If you don't provide it, no authentication is required.
"""
from swarms import Agent
from swarms.structs.aop import AOP
# This function governs ALL security
def custom_auth(token: str) -> bool:
"""
Your custom authentication logic goes here.
Return True to allow access, False to deny.
This function determines everything:
- What tokens are valid
- Token format (API key, JWT, whatever)
- Any additional validation logic
"""
# Simple example: check against valid tokens
valid_tokens = {
"mytoken123",
"anothertoken456",
}
return token in valid_tokens
# Create agents
agent = Agent(
agent_name="Research-Agent",
model_name="claude-sonnet-4-5-20250929",
max_loops=1,
system_prompt="You are a helpful research assistant.",
temperature=0.7,
top_p=None, # Can't use both temperature and top_p with Claude
)
# Create server with auth callback
# If auth_callback is provided, auth is automatically enabled
server = AOP(
server_name="SimpleAuthServer",
port=5932,
auth_callback=custom_auth, # This enables and governs auth
)
server.add_agent(agent)
print("\n🚀 Server starting on port 5932")
print("🔐 Authentication: ENABLED")
print("✅ Valid tokens: mytoken123, anothertoken456\n")
server.run()

@ -7,7 +7,7 @@ import traceback
from collections import deque
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Literal, Optional
from typing import Any, Callable, Dict, List, Literal, Optional
from uuid import uuid4
from loguru import logger
@ -573,6 +573,7 @@ class AOP:
max_network_retries: Maximum number of network reconnection attempts
network_retry_delay: Delay between network retry attempts in seconds
network_timeout: Network connection timeout in seconds
auth_callback: Custom authentication callback function (if provided, auth is enabled)
"""
def __init__(
@ -600,6 +601,7 @@ class AOP:
log_level: Literal[
"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"
] = "INFO",
auth_callback: Optional[Callable[[str], bool]] = None,
*args,
**kwargs,
):
@ -628,6 +630,7 @@ class AOP:
max_network_retries: Maximum number of network reconnection attempts
network_retry_delay: Delay between network retry attempts in seconds
network_timeout: Network connection timeout in seconds
auth_callback: Custom authentication callback function that takes a token (str) and returns bool. If provided, authentication is enabled.
"""
self.server_name = server_name
self.description = description
@ -648,6 +651,7 @@ class AOP:
self.max_network_retries = max_network_retries
self.network_retry_delay = network_retry_delay
self.network_timeout = network_timeout
self.auth_callback = auth_callback
# Persistence state tracking
self._restart_count = 0
@ -681,7 +685,7 @@ class AOP:
)
logger.info(
f"Initialized AOP with server name: {server_name}, verbose: {verbose}, traceback: {traceback_enabled}, persistence: {persistence}, network_monitoring: {network_monitoring}"
f"Initialized AOP with server name: {server_name}, verbose: {verbose}, traceback: {traceback_enabled}, persistence: {persistence}, network_monitoring: {network_monitoring}, auth: {'enabled' if auth_callback else 'disabled'}"
)
# Add initial agents if provided
@ -696,6 +700,36 @@ class AOP:
if self.queue_enabled:
self._register_queue_management_tools()
def _validate_auth(self, token: Optional[str] = None) -> bool:
"""
Validate authentication token using the configured auth callback.
Args:
token: The authentication token to validate
Returns:
bool: True if no auth callback configured or token is valid, False otherwise
"""
# If no auth callback configured, allow all requests
if self.auth_callback is None:
return True
# If auth callback exists, validate the token
if token is None:
logger.warning("Authentication required but no token provided")
return False
try:
is_valid = self.auth_callback(token)
if not is_valid:
logger.warning(f"Authentication failed for token: {token[:10]}...")
return is_valid
except Exception as e:
logger.error(f"Error during authentication: {str(e)}")
if self.traceback_enabled:
logger.error(traceback.format_exc())
return False
def add_agent(
self,
agent: AgentType,
@ -972,6 +1006,7 @@ class AOP:
imgs: List[str] = None,
correct_answer: str = None,
max_retries: int = None,
auth_token: str = None,
) -> Dict[str, Any]:
"""
Execute the agent with the provided parameters.
@ -982,10 +1017,23 @@ class AOP:
imgs: Optional list of images to be processed by the agent
correct_answer: Optional correct answer for validation or comparison
max_retries: Maximum number of retries (uses config default if None)
auth_token: Optional authentication token (required if auth is enabled)
Returns:
Dict containing the agent's response and execution status
"""
# Validate authentication first
if not self._validate_auth(auth_token):
error_msg = "Authentication failed"
logger.warning(
f"Tool '{tool_name}' authentication failed"
)
return {
"result": "",
"success": False,
"error": error_msg,
}
start_time = None
if config.verbose:
start_time = (
@ -1685,16 +1733,27 @@ class AOP:
name="discover_agents",
description="Discover information about other agents in the cluster including their name, description, system prompt (truncated to 200 chars), and tags.",
)
def discover_agents(agent_name: str = None) -> Dict[str, Any]:
def discover_agents(
agent_name: str = None, auth_token: str = None
) -> Dict[str, Any]:
"""
Discover information about agents in the cluster.
Args:
agent_name: Optional specific agent name to get info for. If None, returns info for all agents.
auth_token: Optional authentication token (required if auth is enabled)
Returns:
Dict containing agent information for discovery
"""
# Validate authentication
if not self._validate_auth(auth_token):
return {
"success": False,
"error": "Authentication failed",
"agents": [],
}
try:
if agent_name:
# Get specific agent info

Loading…
Cancel
Save