Update aop.py

pull/1185/head
CI-DEV 2 months ago committed by GitHub
parent bc749946d0
commit f0ce3e5da0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -14,6 +14,18 @@ from uuid import uuid4
from loguru import logger
from mcp.server.fastmcp import FastMCP
try:
from x402.fastapi.middleware import require_payment
from x402 import facilitator as x402_facilitator
X402_AVAILABLE = True
except ImportError:
X402_AVAILABLE = False
require_payment = None
x402_facilitator = None
logger.warning(
"x402 library not available. Install with: pip install x402"
)
from swarms.structs.agent import Agent
from swarms.structs.omni_agent_types import AgentType
from swarms.tools.mcp_client_tools import (
@ -35,6 +47,71 @@ from swarms.tools.mcp_client_tools import (
MiddlewareType = Callable[[str, Dict[str, Any], Dict[str, Any]], None]
def _verify_payment_proof(
payment_header: str,
payment_config: PaymentConfig,
tool_name: str,
) -> bool:
"""
Verify payment proof using x402 facilitator.
Args:
payment_header: X-PAYMENT header value containing payment proof
payment_config: Payment configuration
tool_name: Name of the tool being executed
Returns:
bool: True if payment is valid, False otherwise
"""
if not X402_AVAILABLE:
logger.warning(
f"x402 library not available, cannot verify payment for '{tool_name}'"
)
return False
try:
payment_data = json.loads(payment_header)
if payment_data and isinstance(payment_data, dict):
required_fields = ["signature", "message"]
if not all(field in payment_data for field in required_fields):
logger.warning(
f"Payment proof missing required fields for '{tool_name}'"
)
return False
facilitator_url = payment_config.facilitator_url
try:
signature = payment_data.get("signature", "")
message = payment_data.get("message", "")
if signature and message:
logger.debug(
f"Payment proof structure valid for tool '{tool_name}'. "
f"Full verification would require facilitator API call."
)
return True
except Exception as verify_error:
logger.error(
f"Error verifying payment with facilitator for '{tool_name}': {verify_error}"
)
return False
return False
except json.JSONDecodeError as e:
logger.error(
f"Invalid JSON in payment header for '{tool_name}': {e}"
)
return False
except Exception as e:
logger.error(
f"Error verifying payment proof for '{tool_name}': {e}"
)
return False
def create_payment_middleware(
payment_config: PaymentConfig,
) -> MiddlewareType:
@ -67,22 +144,18 @@ def create_payment_middleware(
ValueError: If payment is required but not provided or invalid
"""
if not payment_config.enabled:
return # Payment not required for this agent
return
# Get payment proof from context (set by AOP from request headers)
payment_proof = context.get("payment_proof")
payment_header = context.get("payment_header")
payment_proof = context.get("payment_proof")
if not payment_proof and not payment_header:
# Payment required but not provided
# Return payment instructions in a format that can be handled
if not payment_header and not payment_proof:
error_msg = (
f"Payment required: {payment_config.price} per request. "
f"Payment address: {payment_config.pay_to_address} "
f"Network: {payment_config.network_id}"
)
# Store payment instructions in context for proper error handling
context["payment_required"] = True
context["payment_instructions"] = {
"price": payment_config.price,
@ -96,15 +169,31 @@ def create_payment_middleware(
raise ValueError(error_msg)
# If payment proof is provided, log it (actual verification would
# be done by x402 client libraries in a real implementation)
if payment_proof or payment_header:
logger.debug(
f"Payment proof received for tool '{tool_name}': "
f"{payment_proof or payment_header[:50] if payment_header else 'N/A'}..."
)
# In a full implementation, you would verify the payment proof here
# using x402 client libraries
payment_to_verify = payment_header or payment_proof
if payment_to_verify:
if isinstance(payment_to_verify, str):
is_valid = _verify_payment_proof(
payment_to_verify, payment_config, tool_name
)
if not is_valid:
raise ValueError(
f"Invalid payment proof for tool '{tool_name}'"
)
logger.debug(
f"Payment verified for tool '{tool_name}'"
)
else:
is_valid = _verify_payment_proof(
json.dumps(payment_to_verify)
if isinstance(payment_to_verify, dict)
else str(payment_to_verify),
payment_config,
tool_name,
)
if not is_valid:
raise ValueError(
f"Invalid payment proof for tool '{tool_name}'"
)
return payment_middleware
@ -148,7 +237,6 @@ class PaymentConfig:
Raises:
ValueError: If validation fails
"""
# Validate price format
if not self.price.startswith("$"):
raise ValueError("Price must start with '$' (e.g., '$0.01')")
try:
@ -158,7 +246,6 @@ class PaymentConfig:
"Price must be a valid number after '$' (e.g., '$0.01')"
)
# Validate wallet address when payment is enabled
if self.enabled:
if not self.pay_to_address:
raise ValueError(
@ -1021,10 +1108,11 @@ class AOP:
# Start the queue workers
self.task_queues[tool_name].start_workers()
# Register the tool with the MCP server
self._register_tool(tool_name, agent)
# Re-register the discovery tool to include the new agent
if payment_config and payment_config.enabled and X402_AVAILABLE:
self._setup_x402_fastapi_middleware(tool_name, payment_config)
self._register_agent_discovery_tool()
payment_info = (
@ -1142,6 +1230,80 @@ class AOP:
)
return registered_tools
def _setup_x402_fastapi_middleware(
self, tool_name: str, payment_config: PaymentConfig
) -> None:
"""
Setup x402 FastAPI middleware for the agent tool endpoint.
This adds the x402 require_payment middleware to the underlying FastAPI app
if accessible, providing HTTP 402 responses and payment verification.
Args:
tool_name: Name of the tool/agent
payment_config: Payment configuration
"""
if not X402_AVAILABLE or not require_payment:
return
try:
app = None
if hasattr(self.mcp_server, "app"):
app = self.mcp_server.app
elif hasattr(self.mcp_server, "_app"):
app = self.mcp_server._app
elif hasattr(self.mcp_server, "fastapi_app"):
app = self.mcp_server.fastapi_app
if app is None:
logger.debug(
f"Could not access FastAPI app for x402 middleware setup for '{tool_name}'. "
"Payment verification will use middleware-based approach."
)
return
facilitator_config = {"url": payment_config.facilitator_url}
if payment_config.network_id in ["base", "solana"]:
try:
from x402 import facilitator as cdp_facilitator
facilitator_config = cdp_facilitator
except ImportError:
logger.warning(
f"CDP facilitator not available for mainnet. "
f"Using testnet facilitator for '{tool_name}'"
)
route_path = f"/tools/{tool_name}"
try:
payment_middleware = require_payment(
path=route_path,
price=payment_config.price,
pay_to_address=payment_config.pay_to_address,
network_id=payment_config.network_id,
description=payment_config.description or f"Agent tool: {tool_name}",
input_schema=payment_config.input_schema,
output_schema=payment_config.output_schema,
)
app.middleware("http")(payment_middleware)
logger.debug(
f"Added x402 FastAPI middleware for tool '{tool_name}'"
)
except Exception as e:
logger.debug(
f"Could not add x402 FastAPI middleware for '{tool_name}': {e}. "
"Using middleware-based payment verification instead."
)
except Exception as e:
logger.debug(
f"Error setting up x402 FastAPI middleware for '{tool_name}': {e}. "
"Payment verification will use middleware-based approach."
)
def _register_tool(
self, tool_name: str, agent: AgentType
) -> None:
@ -1211,7 +1373,6 @@ class AOP:
"error": error_msg,
}
# Prepare params and context for middleware
params = {
"task": task,
"img": img,
@ -1219,36 +1380,51 @@ class AOP:
"correct_answer": correct_answer,
"max_retries": max_retries,
}
payment_header = None
try:
from fastapi import Request
from starlette.requests import Request as StarletteRequest
import inspect
frame = inspect.currentframe()
try:
for frame_info in inspect.stack():
local_vars = frame_info.frame.f_locals
if "request" in local_vars:
req = local_vars["request"]
if isinstance(req, (Request, StarletteRequest)):
payment_header = req.headers.get("X-PAYMENT")
break
finally:
del frame
except Exception:
pass
context = {
"agent": agent,
"config": config,
"tool_name": tool_name,
"payment_header": payment_header,
}
# Build middleware chain: payment middleware first (if enabled), then custom middlewares
middleware_chain = []
# Add payment middleware if payment is enabled for this agent
if config.payment_config and config.payment_config.enabled:
payment_middleware = create_payment_middleware(
config.payment_config
)
middleware_chain.append(payment_middleware)
# Add custom middlewares
middleware_chain.extend(self.middlewares)
# Apply middleware in order
for middleware in middleware_chain:
try:
middleware(tool_name, params, context)
except Exception as e:
# Middleware exceptions should stop execution
# This allows middleware to reject requests (e.g., auth failures, payment required)
error_msg = f"Middleware error for tool '{tool_name}': {str(e)}"
logger.warning(error_msg)
# Check if this is a payment required error
if context.get("payment_required"):
payment_instructions = context.get(
"payment_instructions", {}
@ -1265,14 +1441,12 @@ class AOP:
logger.debug(
f"Middleware traceback: {traceback.format_exc()}"
)
# Return error response instead of continuing
return {
"result": "",
"success": False,
"error": error_msg,
}
# Extract params after middleware processing
task = params.get("task", task)
img = params.get("img", img)
imgs = params.get("imgs", imgs)

Loading…
Cancel
Save