From 977d2f27c750fec41188b11165d629d000a7e1f7 Mon Sep 17 00:00:00 2001 From: CI-DEV <154627941+IlumCI@users.noreply.github.com> Date: Wed, 5 Nov 2025 15:38:27 +0200 Subject: [PATCH] Implement x402 payment middleware and configuration Added payment middleware support for x402 payment verification, including PaymentConfig model and validation. --- swarms/structs/aop.py | 318 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 316 insertions(+), 2 deletions(-) diff --git a/swarms/structs/aop.py b/swarms/structs/aop.py index b95acb77..2a64fa11 100644 --- a/swarms/structs/aop.py +++ b/swarms/structs/aop.py @@ -1,4 +1,5 @@ import asyncio +import json import socket import sys import threading @@ -7,11 +8,12 @@ import traceback from collections import deque from dataclasses import dataclass, field from enum import Enum -from typing import Any, Dict, List, Literal, Optional +from typing import Any, Callable, Dict, List, Literal, Optional from uuid import uuid4 from loguru import logger from mcp.server.fastmcp import FastMCP +from pydantic import BaseModel, Field, field_validator from swarms.structs.agent import Agent from swarms.structs.omni_agent_types import AgentType @@ -20,6 +22,211 @@ from swarms.tools.mcp_client_tools import ( ) +# Middleware type definition +# A middleware function receives the tool execution context and can modify inputs/outputs. +# Middleware functions are called before tool execution and can modify params and context in-place. +# Args: +# tool_name: Name of the tool being executed +# params: Dictionary of tool parameters (task, img, imgs, correct_answer, max_retries) +# Can be modified in-place by the middleware +# context: Additional context dictionary (agent, config, etc.) +# Can be modified in-place by the middleware +# Returns: +# None (modifications are done in-place) +MiddlewareType = Callable[[str, Dict[str, Any], Dict[str, Any]], None] + + +def create_payment_middleware( + payment_config: PaymentConfig, +) -> MiddlewareType: + """ + Create a payment middleware function for x402 payment verification. + + This middleware verifies that payment has been made before allowing + tool execution to proceed. It checks for the X-PAYMENT header which + contains the payment proof. + + Args: + payment_config: Payment configuration for the agent + + Returns: + Middleware function that verifies payments + """ + + def payment_middleware( + tool_name: str, params: Dict[str, Any], context: Dict[str, Any] + ) -> None: + """ + Verify payment before allowing tool execution. + + Args: + tool_name: Name of the tool being executed + params: Tool parameters (can be modified in-place) + context: Execution context (contains agent, config, etc.) + + Raises: + ValueError: If payment is required but not provided or invalid + """ + if not payment_config.enabled: + return # Payment not required for this agent + + # Get payment proof from context (set by AOP from request headers) + payment_proof = context.get("payment_proof") + payment_header = context.get("payment_header") + + if not payment_proof and not payment_header: + # Payment required but not provided + # Return payment instructions in a format that can be handled + error_msg = ( + f"Payment required: {payment_config.price} per request. " + f"Payment address: {payment_config.pay_to_address} " + f"Network: {payment_config.network_id}" + ) + + # Store payment instructions in context for proper error handling + context["payment_required"] = True + context["payment_instructions"] = { + "price": payment_config.price, + "pay_to_address": payment_config.pay_to_address, + "network_id": payment_config.network_id, + "description": payment_config.description, + "facilitator_url": payment_config.facilitator_url, + "input_schema": payment_config.input_schema, + "output_schema": payment_config.output_schema, + } + + raise ValueError(error_msg) + + # If payment proof is provided, log it (actual verification would + # be done by x402 client libraries in a real implementation) + if payment_proof or payment_header: + logger.debug( + f"Payment proof received for tool '{tool_name}': " + f"{payment_proof or payment_header[:50] if payment_header else 'N/A'}..." + ) + # In a full implementation, you would verify the payment proof here + # using x402 client libraries + + return payment_middleware + + +class PaymentConfig(BaseModel): + """ + Configuration for x402 payment integration per agent. + + This model allows you to monetize each agent individually by configuring + payment requirements using the x402 protocol (Coinbase Commerce). + + Attributes: + enabled: Whether payment is required for this agent + price: Price per request in USD (e.g., "$0.01", "$1.00") + pay_to_address: EVM-compatible wallet address to receive payments + network_id: Blockchain network ID (e.g., "base-sepolia", "base") + description: Description of what the payment is for + facilitator_url: Optional custom facilitator URL (defaults to https://x402.org/facilitator) + input_schema: Optional JSON schema for input validation + output_schema: Optional JSON schema for output description + """ + + enabled: bool = Field( + default=False, + description="Whether payment is required for this agent", + ) + price: str = Field( + default="$0.01", + description="Price per request in USD (e.g., '$0.01', '$1.00')", + ) + pay_to_address: Optional[str] = Field( + default=None, + description="EVM-compatible wallet address to receive payments", + ) + network_id: str = Field( + default="base-sepolia", + description="Blockchain network ID (e.g., 'base-sepolia' for testnet, 'base' for mainnet)", + ) + description: Optional[str] = Field( + default=None, + description="Description of what the payment is for", + ) + facilitator_url: str = Field( + default="https://x402.org/facilitator", + description="Facilitator URL for payment processing", + ) + input_schema: Optional[Dict[str, Any]] = Field( + default=None, + description="Optional JSON schema for input validation", + ) + output_schema: Optional[Dict[str, Any]] = Field( + default=None, + description="Optional JSON schema for output description", + ) + + @field_validator("price") + @classmethod + def validate_price(cls, v: str) -> str: + """ + Validate that price is in the correct format. + + Args: + v: Price string to validate + + Returns: + Validated price string + + Raises: + ValueError: If price format is invalid + """ + if not v.startswith("$"): + raise ValueError("Price must start with '$' (e.g., '$0.01')") + try: + float(v[1:]) + except ValueError: + raise ValueError( + "Price must be a valid number after '$' (e.g., '$0.01')" + ) + return v + + @field_validator("pay_to_address") + @classmethod + def validate_address(cls, v: Optional[str], info) -> Optional[str]: + """ + Validate wallet address when payment is enabled. + + Args: + v: Wallet address to validate + info: Validation context + + Returns: + Validated address + + Raises: + ValueError: If payment is enabled but address is missing or invalid + """ + if info.data.get("enabled", False): + if not v: + raise ValueError( + "pay_to_address is required when payment is enabled" + ) + if not v.startswith("0x") or len(v) != 42: + raise ValueError( + "pay_to_address must be a valid EVM address (0x followed by 40 hex characters)" + ) + return v + + class Config: + """Pydantic configuration.""" + + json_schema_extra = { + "example": { + "enabled": True, + "price": "$0.01", + "pay_to_address": "0x742d35Cc6634C0532925a3b844Bc9e7595f0bEb", + "network_id": "base-sepolia", + "description": "AI-powered research agent", + } + } + + class TaskStatus(Enum): """Status of a task in the queue.""" @@ -535,6 +742,7 @@ class AgentToolConfig: max_retries: Number of retries if agent execution fails verbose: Enable verbose logging for this tool traceback_enabled: Enable traceback logging for errors + payment_config: Optional x402 payment configuration for monetizing this agent """ tool_name: str @@ -545,6 +753,7 @@ class AgentToolConfig: max_retries: int = 3 verbose: bool = False traceback_enabled: bool = True + payment_config: Optional[PaymentConfig] = None class AOP: @@ -558,12 +767,14 @@ class AOP: 4. Manage the MCP server lifecycle 5. Queue-based task execution for improved performance and reliability 6. Persistence mode with automatic restart and failsafe protection + 7. Custom middleware support for intercepting and modifying tool executions Attributes: mcp_server: The FastMCP server instance agents: Dictionary mapping tool names to agent instances tool_configs: Dictionary mapping tool names to their configurations task_queues: Dictionary mapping tool names to their task queues + middlewares: List of middleware functions to apply to tool executions server_name: Name of the MCP server queue_enabled: Whether queue-based execution is enabled persistence: Whether persistence mode is enabled @@ -573,6 +784,27 @@ class AOP: max_network_retries: Maximum number of network reconnection attempts network_retry_delay: Delay between network retry attempts in seconds network_timeout: Network connection timeout in seconds + + Example: + >>> from swarms import Agent, AOP + >>> + >>> # Define a middleware function + >>> def auth_middleware(tool_name: str, params: dict, context: dict) -> None: + ... # Add authentication logic here + ... if not context.get("authenticated"): + ... raise ValueError("Not authenticated") + ... # Modify params if needed + ... params["task"] = f"[AUTH] {params['task']}" + >>> + >>> # Create AOP with middleware + >>> aop = AOP( + ... server_name="MyServer", + ... middlewares=[auth_middleware] + ... ) + >>> + >>> # Add agents + >>> agent = Agent(model_name="gpt-4") + >>> aop.add_agent(agent) """ def __init__( @@ -600,6 +832,7 @@ class AOP: log_level: Literal[ "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL" ] = "INFO", + middlewares: Optional[List[MiddlewareType]] = None, *args, **kwargs, ): @@ -628,6 +861,10 @@ class AOP: max_network_retries: Maximum number of network reconnection attempts network_retry_delay: Delay between network retry attempts in seconds network_timeout: Network connection timeout in seconds + middlewares: Optional list of middleware functions to apply to tool executions. + Each middleware receives (tool_name, params, context) and can modify + params and context in-place. Middlewares are applied in order before + each tool execution. """ self.server_name = server_name self.description = description @@ -663,6 +900,7 @@ class AOP: self.tool_configs: Dict[str, AgentToolConfig] = {} self.task_queues: Dict[str, TaskQueue] = {} self.transport = transport + self.middlewares: List[MiddlewareType] = middlewares or [] self.mcp_server = FastMCP( name=server_name, port=port, @@ -707,6 +945,7 @@ class AOP: max_retries: int = 3, verbose: Optional[bool] = None, traceback_enabled: Optional[bool] = None, + payment_config: Optional[PaymentConfig] = None, ) -> str: """ Add an agent to the MCP server as a tool. @@ -721,6 +960,7 @@ class AOP: max_retries: Number of retries on failure verbose: Enable verbose logging for this tool (defaults to deployer's verbose setting) traceback_enabled: Enable traceback logging for this tool (defaults to deployer's setting) + payment_config: Optional x402 payment configuration for monetizing this agent Returns: str: The tool name that was registered @@ -819,6 +1059,7 @@ class AOP: max_retries=max_retries, verbose=verbose, traceback_enabled=traceback_enabled, + payment_config=payment_config, ) # Create task queue if queue is enabled @@ -841,8 +1082,13 @@ class AOP: # Re-register the discovery tool to include the new agent self._register_agent_discovery_tool() + payment_info = ( + f"payment_enabled={payment_config.enabled}" + if payment_config + else "payment_enabled=False" + ) logger.info( - f"Added agent '{agent.agent_name}' as tool '{tool_name}' (verbose={verbose}, traceback={traceback_enabled}, queue_enabled={self.queue_enabled})" + f"Added agent '{agent.agent_name}' as tool '{tool_name}' (verbose={verbose}, traceback={traceback_enabled}, queue_enabled={self.queue_enabled}, {payment_info})" ) return tool_name @@ -1020,6 +1266,74 @@ class AOP: "error": error_msg, } + # Prepare params and context for middleware + params = { + "task": task, + "img": img, + "imgs": imgs, + "correct_answer": correct_answer, + "max_retries": max_retries, + } + context = { + "agent": agent, + "config": config, + "tool_name": tool_name, + } + + # Build middleware chain: payment middleware first (if enabled), then custom middlewares + middleware_chain = [] + + # Add payment middleware if payment is enabled for this agent + if config.payment_config and config.payment_config.enabled: + payment_middleware = create_payment_middleware( + config.payment_config + ) + middleware_chain.append(payment_middleware) + + # Add custom middlewares + middleware_chain.extend(self.middlewares) + + # Apply middleware in order + for middleware in middleware_chain: + try: + middleware(tool_name, params, context) + except Exception as e: + # Middleware exceptions should stop execution + # This allows middleware to reject requests (e.g., auth failures, payment required) + error_msg = f"Middleware error for tool '{tool_name}': {str(e)}" + logger.warning(error_msg) + + # Check if this is a payment required error + if context.get("payment_required"): + payment_instructions = context.get( + "payment_instructions", {} + ) + return { + "result": "", + "success": False, + "error": error_msg, + "payment_required": True, + "payment_instructions": payment_instructions, + } + + if config.traceback_enabled: + logger.debug( + f"Middleware traceback: {traceback.format_exc()}" + ) + # Return error response instead of continuing + return { + "result": "", + "success": False, + "error": error_msg, + } + + # Extract params after middleware processing + task = params.get("task", task) + img = params.get("img", img) + imgs = params.get("imgs", imgs) + correct_answer = params.get("correct_answer", correct_answer) + max_retries = params.get("max_retries", max_retries) + # Use queue-based execution if enabled if ( self.queue_enabled