Implement x402 payment middleware and configuration

Added payment middleware support for x402 payment verification, including PaymentConfig model and validation.
pull/1185/head
CI-DEV 2 months ago committed by GitHub
parent 638e9e2ba2
commit 977d2f27c7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,4 +1,5 @@
import asyncio
import json
import socket
import sys
import threading
@ -7,11 +8,12 @@ import traceback
from collections import deque
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Literal, Optional
from typing import Any, Callable, Dict, List, Literal, Optional
from uuid import uuid4
from loguru import logger
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field, field_validator
from swarms.structs.agent import Agent
from swarms.structs.omni_agent_types import AgentType
@ -20,6 +22,211 @@ from swarms.tools.mcp_client_tools import (
)
# Middleware type definition
# A middleware function receives the tool execution context and can modify inputs/outputs.
# Middleware functions are called before tool execution and can modify params and context in-place.
# Args:
# tool_name: Name of the tool being executed
# params: Dictionary of tool parameters (task, img, imgs, correct_answer, max_retries)
# Can be modified in-place by the middleware
# context: Additional context dictionary (agent, config, etc.)
# Can be modified in-place by the middleware
# Returns:
# None (modifications are done in-place)
MiddlewareType = Callable[[str, Dict[str, Any], Dict[str, Any]], None]
def create_payment_middleware(
payment_config: PaymentConfig,
) -> MiddlewareType:
"""
Create a payment middleware function for x402 payment verification.
This middleware verifies that payment has been made before allowing
tool execution to proceed. It checks for the X-PAYMENT header which
contains the payment proof.
Args:
payment_config: Payment configuration for the agent
Returns:
Middleware function that verifies payments
"""
def payment_middleware(
tool_name: str, params: Dict[str, Any], context: Dict[str, Any]
) -> None:
"""
Verify payment before allowing tool execution.
Args:
tool_name: Name of the tool being executed
params: Tool parameters (can be modified in-place)
context: Execution context (contains agent, config, etc.)
Raises:
ValueError: If payment is required but not provided or invalid
"""
if not payment_config.enabled:
return # Payment not required for this agent
# Get payment proof from context (set by AOP from request headers)
payment_proof = context.get("payment_proof")
payment_header = context.get("payment_header")
if not payment_proof and not payment_header:
# Payment required but not provided
# Return payment instructions in a format that can be handled
error_msg = (
f"Payment required: {payment_config.price} per request. "
f"Payment address: {payment_config.pay_to_address} "
f"Network: {payment_config.network_id}"
)
# Store payment instructions in context for proper error handling
context["payment_required"] = True
context["payment_instructions"] = {
"price": payment_config.price,
"pay_to_address": payment_config.pay_to_address,
"network_id": payment_config.network_id,
"description": payment_config.description,
"facilitator_url": payment_config.facilitator_url,
"input_schema": payment_config.input_schema,
"output_schema": payment_config.output_schema,
}
raise ValueError(error_msg)
# If payment proof is provided, log it (actual verification would
# be done by x402 client libraries in a real implementation)
if payment_proof or payment_header:
logger.debug(
f"Payment proof received for tool '{tool_name}': "
f"{payment_proof or payment_header[:50] if payment_header else 'N/A'}..."
)
# In a full implementation, you would verify the payment proof here
# using x402 client libraries
return payment_middleware
class PaymentConfig(BaseModel):
"""
Configuration for x402 payment integration per agent.
This model allows you to monetize each agent individually by configuring
payment requirements using the x402 protocol (Coinbase Commerce).
Attributes:
enabled: Whether payment is required for this agent
price: Price per request in USD (e.g., "$0.01", "$1.00")
pay_to_address: EVM-compatible wallet address to receive payments
network_id: Blockchain network ID (e.g., "base-sepolia", "base")
description: Description of what the payment is for
facilitator_url: Optional custom facilitator URL (defaults to https://x402.org/facilitator)
input_schema: Optional JSON schema for input validation
output_schema: Optional JSON schema for output description
"""
enabled: bool = Field(
default=False,
description="Whether payment is required for this agent",
)
price: str = Field(
default="$0.01",
description="Price per request in USD (e.g., '$0.01', '$1.00')",
)
pay_to_address: Optional[str] = Field(
default=None,
description="EVM-compatible wallet address to receive payments",
)
network_id: str = Field(
default="base-sepolia",
description="Blockchain network ID (e.g., 'base-sepolia' for testnet, 'base' for mainnet)",
)
description: Optional[str] = Field(
default=None,
description="Description of what the payment is for",
)
facilitator_url: str = Field(
default="https://x402.org/facilitator",
description="Facilitator URL for payment processing",
)
input_schema: Optional[Dict[str, Any]] = Field(
default=None,
description="Optional JSON schema for input validation",
)
output_schema: Optional[Dict[str, Any]] = Field(
default=None,
description="Optional JSON schema for output description",
)
@field_validator("price")
@classmethod
def validate_price(cls, v: str) -> str:
"""
Validate that price is in the correct format.
Args:
v: Price string to validate
Returns:
Validated price string
Raises:
ValueError: If price format is invalid
"""
if not v.startswith("$"):
raise ValueError("Price must start with '$' (e.g., '$0.01')")
try:
float(v[1:])
except ValueError:
raise ValueError(
"Price must be a valid number after '$' (e.g., '$0.01')"
)
return v
@field_validator("pay_to_address")
@classmethod
def validate_address(cls, v: Optional[str], info) -> Optional[str]:
"""
Validate wallet address when payment is enabled.
Args:
v: Wallet address to validate
info: Validation context
Returns:
Validated address
Raises:
ValueError: If payment is enabled but address is missing or invalid
"""
if info.data.get("enabled", False):
if not v:
raise ValueError(
"pay_to_address is required when payment is enabled"
)
if not v.startswith("0x") or len(v) != 42:
raise ValueError(
"pay_to_address must be a valid EVM address (0x followed by 40 hex characters)"
)
return v
class Config:
"""Pydantic configuration."""
json_schema_extra = {
"example": {
"enabled": True,
"price": "$0.01",
"pay_to_address": "0x742d35Cc6634C0532925a3b844Bc9e7595f0bEb",
"network_id": "base-sepolia",
"description": "AI-powered research agent",
}
}
class TaskStatus(Enum):
"""Status of a task in the queue."""
@ -535,6 +742,7 @@ class AgentToolConfig:
max_retries: Number of retries if agent execution fails
verbose: Enable verbose logging for this tool
traceback_enabled: Enable traceback logging for errors
payment_config: Optional x402 payment configuration for monetizing this agent
"""
tool_name: str
@ -545,6 +753,7 @@ class AgentToolConfig:
max_retries: int = 3
verbose: bool = False
traceback_enabled: bool = True
payment_config: Optional[PaymentConfig] = None
class AOP:
@ -558,12 +767,14 @@ class AOP:
4. Manage the MCP server lifecycle
5. Queue-based task execution for improved performance and reliability
6. Persistence mode with automatic restart and failsafe protection
7. Custom middleware support for intercepting and modifying tool executions
Attributes:
mcp_server: The FastMCP server instance
agents: Dictionary mapping tool names to agent instances
tool_configs: Dictionary mapping tool names to their configurations
task_queues: Dictionary mapping tool names to their task queues
middlewares: List of middleware functions to apply to tool executions
server_name: Name of the MCP server
queue_enabled: Whether queue-based execution is enabled
persistence: Whether persistence mode is enabled
@ -573,6 +784,27 @@ class AOP:
max_network_retries: Maximum number of network reconnection attempts
network_retry_delay: Delay between network retry attempts in seconds
network_timeout: Network connection timeout in seconds
Example:
>>> from swarms import Agent, AOP
>>>
>>> # Define a middleware function
>>> def auth_middleware(tool_name: str, params: dict, context: dict) -> None:
... # Add authentication logic here
... if not context.get("authenticated"):
... raise ValueError("Not authenticated")
... # Modify params if needed
... params["task"] = f"[AUTH] {params['task']}"
>>>
>>> # Create AOP with middleware
>>> aop = AOP(
... server_name="MyServer",
... middlewares=[auth_middleware]
... )
>>>
>>> # Add agents
>>> agent = Agent(model_name="gpt-4")
>>> aop.add_agent(agent)
"""
def __init__(
@ -600,6 +832,7 @@ class AOP:
log_level: Literal[
"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"
] = "INFO",
middlewares: Optional[List[MiddlewareType]] = None,
*args,
**kwargs,
):
@ -628,6 +861,10 @@ class AOP:
max_network_retries: Maximum number of network reconnection attempts
network_retry_delay: Delay between network retry attempts in seconds
network_timeout: Network connection timeout in seconds
middlewares: Optional list of middleware functions to apply to tool executions.
Each middleware receives (tool_name, params, context) and can modify
params and context in-place. Middlewares are applied in order before
each tool execution.
"""
self.server_name = server_name
self.description = description
@ -663,6 +900,7 @@ class AOP:
self.tool_configs: Dict[str, AgentToolConfig] = {}
self.task_queues: Dict[str, TaskQueue] = {}
self.transport = transport
self.middlewares: List[MiddlewareType] = middlewares or []
self.mcp_server = FastMCP(
name=server_name,
port=port,
@ -707,6 +945,7 @@ class AOP:
max_retries: int = 3,
verbose: Optional[bool] = None,
traceback_enabled: Optional[bool] = None,
payment_config: Optional[PaymentConfig] = None,
) -> str:
"""
Add an agent to the MCP server as a tool.
@ -721,6 +960,7 @@ class AOP:
max_retries: Number of retries on failure
verbose: Enable verbose logging for this tool (defaults to deployer's verbose setting)
traceback_enabled: Enable traceback logging for this tool (defaults to deployer's setting)
payment_config: Optional x402 payment configuration for monetizing this agent
Returns:
str: The tool name that was registered
@ -819,6 +1059,7 @@ class AOP:
max_retries=max_retries,
verbose=verbose,
traceback_enabled=traceback_enabled,
payment_config=payment_config,
)
# Create task queue if queue is enabled
@ -841,8 +1082,13 @@ class AOP:
# Re-register the discovery tool to include the new agent
self._register_agent_discovery_tool()
payment_info = (
f"payment_enabled={payment_config.enabled}"
if payment_config
else "payment_enabled=False"
)
logger.info(
f"Added agent '{agent.agent_name}' as tool '{tool_name}' (verbose={verbose}, traceback={traceback_enabled}, queue_enabled={self.queue_enabled})"
f"Added agent '{agent.agent_name}' as tool '{tool_name}' (verbose={verbose}, traceback={traceback_enabled}, queue_enabled={self.queue_enabled}, {payment_info})"
)
return tool_name
@ -1020,6 +1266,74 @@ class AOP:
"error": error_msg,
}
# Prepare params and context for middleware
params = {
"task": task,
"img": img,
"imgs": imgs,
"correct_answer": correct_answer,
"max_retries": max_retries,
}
context = {
"agent": agent,
"config": config,
"tool_name": tool_name,
}
# Build middleware chain: payment middleware first (if enabled), then custom middlewares
middleware_chain = []
# Add payment middleware if payment is enabled for this agent
if config.payment_config and config.payment_config.enabled:
payment_middleware = create_payment_middleware(
config.payment_config
)
middleware_chain.append(payment_middleware)
# Add custom middlewares
middleware_chain.extend(self.middlewares)
# Apply middleware in order
for middleware in middleware_chain:
try:
middleware(tool_name, params, context)
except Exception as e:
# Middleware exceptions should stop execution
# This allows middleware to reject requests (e.g., auth failures, payment required)
error_msg = f"Middleware error for tool '{tool_name}': {str(e)}"
logger.warning(error_msg)
# Check if this is a payment required error
if context.get("payment_required"):
payment_instructions = context.get(
"payment_instructions", {}
)
return {
"result": "",
"success": False,
"error": error_msg,
"payment_required": True,
"payment_instructions": payment_instructions,
}
if config.traceback_enabled:
logger.debug(
f"Middleware traceback: {traceback.format_exc()}"
)
# Return error response instead of continuing
return {
"result": "",
"success": False,
"error": error_msg,
}
# Extract params after middleware processing
task = params.get("task", task)
img = params.get("img", img)
imgs = params.get("imgs", imgs)
correct_answer = params.get("correct_answer", correct_answer)
max_retries = params.get("max_retries", max_retries)
# Use queue-based execution if enabled
if (
self.queue_enabled

Loading…
Cancel
Save