From 7b5ef1e63c5d7137abf4aacefd9db59fb159ea7e Mon Sep 17 00:00:00 2001 From: Aksh Parekh Date: Tue, 4 Nov 2025 19:27:10 -0800 Subject: [PATCH] [DOCS][IMPROVEMENT][EXAMPLES] Custom Auth Callback for AOP --- docs/swarms/examples/aop_server_example.md | 116 ++++++++++++ docs/swarms/structs/aop.md | 169 ++++++++++++++++++ .../aop_examples/auth/simple_auth_client.py | 61 +++++++ .../aop_examples/auth/simple_auth_server.py | 56 ++++++ swarms/structs/aop.py | 65 ++++++- 5 files changed, 464 insertions(+), 3 deletions(-) create mode 100644 examples/aop_examples/auth/simple_auth_client.py create mode 100644 examples/aop_examples/auth/simple_auth_server.py diff --git a/docs/swarms/examples/aop_server_example.md b/docs/swarms/examples/aop_server_example.md index e8ebeca9..20978262 100644 --- a/docs/swarms/examples/aop_server_example.md +++ b/docs/swarms/examples/aop_server_example.md @@ -156,6 +156,122 @@ Each agent is created with: - **Port**: Change the port number as needed - **Verbose**: Set to False for reduced logging - **Server Name**: Use a descriptive name for your server +- **Authentication**: Add `auth_callback` to enable security (see below) + +## Adding Authentication + +You can secure your AOP server by adding a custom authentication callback: + +### Simple API Key Authentication + +```python +from swarms import Agent +from swarms.structs.aop import AOP + +# Define authentication callback +def my_auth(token: str) -> bool: + """Validate API keys.""" + valid_keys = {"api-key-1", "api-key-2", "api-key-3"} + return token in valid_keys + +# Create agents (same as above) +research_agent = Agent( + agent_name="Research-Agent", + model_name="claude-sonnet-4-5-20250929", + max_loops=1, + system_prompt="You are a research specialist.", + temperature=0.7, + top_p=None, +) + +# Create AOP with authentication +deployer = AOP( + server_name="SecureAgentServer", + port=5932, + verbose=True, + auth_callback=my_auth, # Enable authentication +) + +deployer.add_agent(research_agent) +deployer.run() +``` + +### JWT Token Authentication + +```python +import jwt + +def jwt_auth(token: str) -> bool: + """Validate JWT tokens.""" + try: + payload = jwt.decode(token, "your-secret-key", algorithms=["HS256"]) + return payload.get("authorized", False) + except: + return False + +deployer = AOP( + server_name="JWT-SecureServer", + port=5932, + auth_callback=jwt_auth, +) +``` + +### Environment-Based Authentication + +```python +import os + +def env_auth(token: str) -> bool: + """Validate tokens from environment.""" + valid_tokens = set(os.getenv("VALID_API_KEYS", "").split(",")) + return token in valid_tokens + +deployer = AOP( + server_name="Env-AuthServer", + port=5932, + auth_callback=env_auth, +) +``` + +### Client Usage with Authentication + +When calling tools on an authenticated server: + +```python +import asyncio +from mcp import ClientSession +from mcp.client.streamable_http import streamablehttp_client + +async def call_tool(): + url = "http://localhost:5932/mcp" + + async with streamablehttp_client(url) as ctx: + read, write = ctx if len(ctx) == 2 else (ctx[0], ctx[1]) + + async with ClientSession(read, write) as session: + await session.initialize() + + # Include auth_token parameter + result = await session.call_tool( + name="Research-Agent", + arguments={ + "task": "Research AI trends", + "auth_token": "api-key-1" # Required! + }, + ) + + print(result) + +asyncio.run(call_tool()) +``` + +### Authentication Rules + +- If `auth_callback` is provided → authentication is enabled +- If `auth_callback` is None → no authentication required +- The callback function determines ALL security logic +- Return `True` to allow access, `False` to deny +- Failed authentication returns: `{"success": false, "error": "Authentication failed"}` ## Next Steps diff --git a/docs/swarms/structs/aop.md b/docs/swarms/structs/aop.md index 8062503a..f93cda11 100644 --- a/docs/swarms/structs/aop.md +++ b/docs/swarms/structs/aop.md @@ -122,6 +122,7 @@ Main class for deploying agents as tools in an MCP server. | `processing_timeout` | `int` | `30` | Timeout for processing tasks in seconds | | `retry_delay` | `float` | `1.0` | Delay between retries in seconds | | `log_level` | `str` | `"INFO"` | Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL) | +| `auth_callback` | `Optional[Callable[[str], bool]]` | `None` | Custom authentication callback function. If provided, authentication is enabled. | | `*args` | `Any` | - | Additional positional arguments passed to FastMCP | | `**kwargs` | `Any` | - | Additional keyword arguments passed to FastMCP | @@ -585,6 +586,7 @@ All agent tools accept the following parameters: | `img` | `str` | No | Single image to be processed by the agent | | `imgs` | `List[str]` | No | Multiple images to be processed by the agent | | `correct_answer` | `str` | No | Correct answer for validation or comparison | +| `auth_token` | `str` | Conditional | Authentication token (required if `auth_callback` is configured on server) | ## Output Format @@ -598,6 +600,173 @@ All agent tools return a standardized response format: } ``` +## Authentication for AOP Servers + +AOP supports optional authentication to secure your agent deployments. Authentication is enabled by providing a custom `auth_callback` function when creating the AOP instance. + +### How Authentication Works + +1. **Server Side**: Provide an `auth_callback` function that validates tokens +2. **Client Side**: Include `auth_token` parameter when calling tools +3. **The callback determines ALL security logic** - token format, validation, etc. + +### Authentication Callback + +The `auth_callback` function should: +- Accept a single parameter: `token` (str) +- Return `True` to allow access, `False` to deny +- Handle all validation logic (API keys, JWT, database lookups, etc.) + +### Authentication Examples + +#### Simple API Key Authentication + +```python +from swarms import Agent +from swarms.structs.aop import AOP + +# Define authentication callback +def simple_auth(token: str) -> bool: + """Validate token against a list of valid API keys.""" + valid_tokens = {"secret-key-123", "secret-key-456", "secret-key-789"} + return token in valid_tokens + +# Create agent +agent = Agent( + agent_name="Research-Agent", + model_name="claude-sonnet-4-5-20250929", + max_loops=1, + system_prompt="You are a helpful research assistant.", + temperature=0.7, + top_p=None, +) + +# Create AOP with authentication +server = AOP( + server_name="SecureServer", + port=5932, + auth_callback=simple_auth, # This enables and governs authentication +) + +server.add_agent(agent) +server.run() +``` + +#### JWT Token Authentication + +```python +import jwt + +def jwt_auth(token: str) -> bool: + """Validate JWT tokens.""" + try: + secret = "your-secret-key" + payload = jwt.decode(token, secret, algorithms=["HS256"]) + return payload.get("authorized", False) + except jwt.InvalidTokenError: + return False + +server = AOP( + server_name="JWT-Server", + port=5932, + auth_callback=jwt_auth, +) +``` + +#### Database Authentication + +```python +def database_auth(token: str) -> bool: + """Validate tokens against database.""" + from your_db import session, Token + + token_record = session.query(Token).filter_by( + token=token, + is_active=True + ).first() + + return token_record is not None + +server = AOP( + server_name="DB-Auth-Server", + port=5932, + auth_callback=database_auth, +) +``` + +#### Environment-Based Authentication + +```python +import os + +def env_auth(token: str) -> bool: + """Validate tokens from environment variables.""" + valid_tokens = set(os.getenv("VALID_API_KEYS", "").split(",")) + return token in valid_tokens + +server = AOP( + server_name="Env-Auth-Server", + port=5932, + auth_callback=env_auth, +) +``` + +### Client-Side Authentication + +When calling tools on an authenticated server, include the `auth_token` parameter: + +```python +import asyncio +from mcp import ClientSession +from mcp.client.streamable_http import streamablehttp_client + +async def call_authenticated_tool(): + url = "http://localhost:5932/mcp" + + async with streamablehttp_client(url) as ctx: + read, write = ctx if len(ctx) == 2 else (ctx[0], ctx[1]) + + async with ClientSession(read, write) as session: + await session.initialize() + + # Include auth_token in tool call + result = await session.call_tool( + name="Research-Agent", + arguments={ + "task": "Research AI trends", + "auth_token": "secret-key-123" # Required! + }, + ) + + print(result) + +asyncio.run(call_authenticated_tool()) +``` + +### Authentication Error Handling + +When authentication fails, tools return: + +```json +{ + "result": "", + "success": false, + "error": "Authentication failed" +} +``` + +### No Authentication + +To run without authentication, simply don't provide `auth_callback`: + +```python +server = AOP( + server_name="PublicServer", + port=5932, + # No auth_callback = no authentication required +) +``` + ## Complete Examples ### Basic Server Setup diff --git a/examples/aop_examples/auth/simple_auth_client.py b/examples/aop_examples/auth/simple_auth_client.py new file mode 100644 index 00000000..41ebc49a --- /dev/null +++ b/examples/aop_examples/auth/simple_auth_client.py @@ -0,0 +1,61 @@ +""" +Simple AOP Client with Authentication + +Just pass your token when calling tools. That's it. +The server's auth_callback determines if it's valid. +""" + +import json +import asyncio +from mcp import ClientSession +from mcp.client.streamable_http import streamablehttp_client + + +async def call_server(): + """Call the AOP server with authentication.""" + + url = "http://localhost:5932/mcp" + + async with streamablehttp_client(url, timeout=10) as ctx: + if len(ctx) == 2: + read, write = ctx + else: + read, write, *_ = ctx + + async with ClientSession(read, write) as session: + await session.initialize() + + print("\n" + "=" * 60) + print("Calling discover_agents...") + print("=" * 60 + "\n") + + # Just pass auth_token in arguments + result = await session.call_tool( + name="discover_agents", + arguments={ + "auth_token": "mytoken123" # That's it! + }, + ) + + print(json.dumps(result.model_dump(), indent=2)) + + print("\n" + "=" * 60) + print("Calling Research-Agent...") + print("=" * 60 + "\n") + + # Same for any tool + result = await session.call_tool( + name="Research-Agent", + arguments={ + "task": "What is Python?", + "auth_token": "mytoken123" # Just include it + }, + ) + + print(json.dumps(result.model_dump(), indent=2)) + + +if __name__ == "__main__": + print("\nšŸ” Simple Auth Client") + print("Token: mytoken123\n") + asyncio.run(call_server()) diff --git a/examples/aop_examples/auth/simple_auth_server.py b/examples/aop_examples/auth/simple_auth_server.py new file mode 100644 index 00000000..a00fbeea --- /dev/null +++ b/examples/aop_examples/auth/simple_auth_server.py @@ -0,0 +1,56 @@ +""" +Simple AOP Server with Custom Authentication Callback + +The auth_callback function determines ALL authentication logic. +If you provide auth_callback, authentication is enabled. +If you don't provide it, no authentication is required. +""" + +from swarms import Agent +from swarms.structs.aop import AOP + + +# This function governs ALL security +def custom_auth(token: str) -> bool: + """ + Your custom authentication logic goes here. + Return True to allow access, False to deny. + + This function determines everything: + - What tokens are valid + - Token format (API key, JWT, whatever) + - Any additional validation logic + """ + # Simple example: check against valid tokens + valid_tokens = { + "mytoken123", + "anothertoken456", + } + return token in valid_tokens + + +# Create agents +agent = Agent( + agent_name="Research-Agent", + model_name="claude-sonnet-4-5-20250929", + max_loops=1, + system_prompt="You are a helpful research assistant.", + temperature=0.7, + top_p=None, # Can't use both temperature and top_p with Claude +) + +# Create server with auth callback +# If auth_callback is provided, auth is automatically enabled +server = AOP( + server_name="SimpleAuthServer", + port=5932, + auth_callback=custom_auth, # This enables and governs auth +) + +server.add_agent(agent) + +print("\nšŸš€ Server starting on port 5932") +print("šŸ” Authentication: ENABLED") +print("āœ… Valid tokens: mytoken123, anothertoken456\n") + +server.run() diff --git a/swarms/structs/aop.py b/swarms/structs/aop.py index b95acb77..7bd849a7 100644 --- a/swarms/structs/aop.py +++ b/swarms/structs/aop.py @@ -7,7 +7,7 @@ import traceback from collections import deque from dataclasses import dataclass, field from enum import Enum -from typing import Any, Dict, List, Literal, Optional +from typing import Any, Callable, Dict, List, Literal, Optional from uuid import uuid4 from loguru import logger @@ -573,6 +573,7 @@ class AOP: max_network_retries: Maximum number of network reconnection attempts network_retry_delay: Delay between network retry attempts in seconds network_timeout: Network connection timeout in seconds + auth_callback: Custom authentication callback function (if provided, auth is enabled) """ def __init__( @@ -600,6 +601,7 @@ class AOP: log_level: Literal[ "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL" ] = "INFO", + auth_callback: Optional[Callable[[str], bool]] = None, *args, **kwargs, ): @@ -628,6 +630,7 @@ class AOP: max_network_retries: Maximum number of network reconnection attempts network_retry_delay: Delay between network retry attempts in seconds network_timeout: Network connection timeout in seconds + auth_callback: Custom authentication callback function that takes a token (str) and returns bool. If provided, authentication is enabled. """ self.server_name = server_name self.description = description @@ -648,6 +651,7 @@ class AOP: self.max_network_retries = max_network_retries self.network_retry_delay = network_retry_delay self.network_timeout = network_timeout + self.auth_callback = auth_callback # Persistence state tracking self._restart_count = 0 @@ -681,7 +685,7 @@ class AOP: ) logger.info( - f"Initialized AOP with server name: {server_name}, verbose: {verbose}, traceback: {traceback_enabled}, persistence: {persistence}, network_monitoring: {network_monitoring}" + f"Initialized AOP with server name: {server_name}, verbose: {verbose}, traceback: {traceback_enabled}, persistence: {persistence}, network_monitoring: {network_monitoring}, auth: {'enabled' if auth_callback else 'disabled'}" ) # Add initial agents if provided @@ -696,6 +700,36 @@ class AOP: if self.queue_enabled: self._register_queue_management_tools() + def _validate_auth(self, token: Optional[str] = None) -> bool: + """ + Validate authentication token using the configured auth callback. + + Args: + token: The authentication token to validate + + Returns: + bool: True if no auth callback configured or token is valid, False otherwise + """ + # If no auth callback configured, allow all requests + if self.auth_callback is None: + return True + + # If auth callback exists, validate the token + if token is None: + logger.warning("Authentication required but no token provided") + return False + + try: + is_valid = self.auth_callback(token) + if not is_valid: + logger.warning(f"Authentication failed for token: {token[:10]}...") + return is_valid + except Exception as e: + logger.error(f"Error during authentication: {str(e)}") + if self.traceback_enabled: + logger.error(traceback.format_exc()) + return False + def add_agent( self, agent: AgentType, @@ -972,6 +1006,7 @@ class AOP: imgs: List[str] = None, correct_answer: str = None, max_retries: int = None, + auth_token: str = None, ) -> Dict[str, Any]: """ Execute the agent with the provided parameters. @@ -982,10 +1017,23 @@ class AOP: imgs: Optional list of images to be processed by the agent correct_answer: Optional correct answer for validation or comparison max_retries: Maximum number of retries (uses config default if None) + auth_token: Optional authentication token (required if auth is enabled) Returns: Dict containing the agent's response and execution status """ + # Validate authentication first + if not self._validate_auth(auth_token): + error_msg = "Authentication failed" + logger.warning( + f"Tool '{tool_name}' authentication failed" + ) + return { + "result": "", + "success": False, + "error": error_msg, + } + start_time = None if config.verbose: start_time = ( @@ -1685,16 +1733,27 @@ class AOP: name="discover_agents", description="Discover information about other agents in the cluster including their name, description, system prompt (truncated to 200 chars), and tags.", ) - def discover_agents(agent_name: str = None) -> Dict[str, Any]: + def discover_agents( + agent_name: str = None, auth_token: str = None + ) -> Dict[str, Any]: """ Discover information about agents in the cluster. Args: agent_name: Optional specific agent name to get info for. If None, returns info for all agents. + auth_token: Optional authentication token (required if auth is enabled) Returns: Dict containing agent information for discovery """ + # Validate authentication + if not self._validate_auth(auth_token): + return { + "success": False, + "error": "Authentication failed", + "agents": [], + } + try: if agent_name: # Get specific agent info