Update aop.py

pull/1034/head
CI-DEV 2 months ago committed by GitHub
parent 900182c2e0
commit 8ea4310f38
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -2,13 +2,14 @@ import asyncio
import inspect
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import wraps
from typing import Any, Callable, Literal, Optional
from typing import Any, Callable, Literal, Optional, List
from mcp.server.fastmcp import FastMCP
from mcp.client import Client
from loguru import logger
from swarms.utils.any_to_str import any_to_str
from swarms.security import SwarmShieldIntegration, ShieldConfig
class AOP:
@ -30,6 +31,9 @@ class AOP:
description: Optional[str] = None,
url: Optional[str] = "http://localhost:8000/sse",
urls: Optional[list[str]] = None,
shield_config: Optional[ShieldConfig] = None,
enable_security: bool = True,
security_level: str = "standard",
*args,
**kwargs,
):
@ -40,6 +44,10 @@ class AOP:
name (str): The name of the AOP instance
description (str): A description of the AOP instance
url (str): The URL of the MCP instance
urls (list[str]): List of URLs for multiple MCP instances
shield_config (ShieldConfig, optional): Security configuration for SwarmShield integration. Defaults to None.
enable_security (bool, optional): Whether to enable SwarmShield security features. Defaults to True.
security_level (str, optional): Pre-defined security level. Options: "basic", "standard", "enhanced", "maximum". Defaults to "standard".
*args: Additional positional arguments passed to FastMCP
**kwargs: Additional keyword arguments passed to FastMCP
"""
@ -53,10 +61,99 @@ class AOP:
self.mcp = FastMCP(name=name, *args, **kwargs)
# Initialize SwarmShield integration
self._initialize_swarm_shield(shield_config, enable_security, security_level)
logger.success(
f"[AOP] Successfully initialized AOP instance: {name}"
)
def _initialize_swarm_shield(
self,
shield_config: Optional[ShieldConfig] = None,
enable_security: bool = True,
security_level: str = "standard"
) -> None:
"""Initialize SwarmShield integration for security features."""
self.enable_security = enable_security
self.security_level = security_level
if enable_security:
if shield_config is None:
shield_config = ShieldConfig.get_security_level(security_level)
self.swarm_shield = SwarmShieldIntegration(shield_config)
logger.info(f"SwarmShield initialized with {security_level} security level")
else:
self.swarm_shield = None
logger.info("SwarmShield security disabled")
# Security methods
def validate_task_with_shield(self, task: str) -> str:
"""Validate and sanitize task input using SwarmShield."""
if self.swarm_shield:
return self.swarm_shield.validate_and_protect_input(task)
return task
def validate_agent_config_with_shield(self, agent_config: dict) -> dict:
"""Validate agent configuration using SwarmShield."""
if self.swarm_shield:
return self.swarm_shield.validate_and_protect_input(str(agent_config))
return agent_config
def process_agent_communication_with_shield(self, message: str, agent_name: str) -> str:
"""Process agent communication through SwarmShield security."""
if self.swarm_shield:
return self.swarm_shield.process_agent_communication(message, agent_name)
return message
def check_rate_limit_with_shield(self, agent_name: str) -> bool:
"""Check rate limits for an agent using SwarmShield."""
if self.swarm_shield:
return self.swarm_shield.check_rate_limit(agent_name)
return True
def add_secure_message(self, message: str, agent_name: str) -> None:
"""Add a message to secure conversation history."""
if self.swarm_shield:
self.swarm_shield.add_secure_message(message, agent_name)
def get_secure_messages(self) -> List[dict]:
"""Get secure conversation messages."""
if self.swarm_shield:
return self.swarm_shield.get_secure_messages()
return []
def get_security_stats(self) -> dict:
"""Get security statistics and metrics."""
if self.swarm_shield:
return self.swarm_shield.get_security_stats()
return {"security_enabled": False}
def update_shield_config(self, new_config: ShieldConfig) -> None:
"""Update SwarmShield configuration."""
if self.swarm_shield:
self.swarm_shield.update_config(new_config)
logger.info("SwarmShield configuration updated")
def enable_security(self) -> None:
"""Enable SwarmShield security features."""
if not self.swarm_shield:
self._initialize_swarm_shield(enable_security=True, security_level=self.security_level)
logger.info("SwarmShield security enabled")
def disable_security(self) -> None:
"""Disable SwarmShield security features."""
self.swarm_shield = None
self.enable_security = False
logger.info("SwarmShield security disabled")
def cleanup_security(self) -> None:
"""Clean up SwarmShield resources."""
if self.swarm_shield:
self.swarm_shield.cleanup()
logger.info("SwarmShield resources cleaned up")
def tool(
self,
name: Optional[str] = None,

Loading…
Cancel
Save