Update matrix_swarm.py

pull/1034/head
CI-DEV 2 months ago committed by GitHub
parent cbbc01bcb6
commit 7e4eba1e01
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,10 +1,11 @@
import json import json
from typing import Any, List from typing import Any, List, Optional
from loguru import logger from loguru import logger
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from swarms import Agent from swarms import Agent
from swarms.security import SwarmShieldIntegration, ShieldConfig
class AgentOutput(BaseModel): class AgentOutput(BaseModel):
@ -29,11 +30,20 @@ class MatrixSwarm:
A class to manage a matrix of agents and perform matrix operations similar to linear algebra. A class to manage a matrix of agents and perform matrix operations similar to linear algebra.
""" """
def __init__(self, agents: List[List[Agent]]): def __init__(
self,
agents: List[List[Agent]],
shield_config: Optional[ShieldConfig] = None,
enable_security: bool = True,
security_level: str = "standard",
):
""" """
Initializes the MatrixSwarm with a 2D list of agents. Initializes the MatrixSwarm with a 2D list of agents.
Args: Args:
agents (List[List[Agent]]): 2D list of agents representing the matrix. agents (List[List[Agent]]): 2D list of agents representing the matrix.
shield_config (ShieldConfig, optional): Security configuration for SwarmShield integration. Defaults to None.
enable_security (bool, optional): Whether to enable SwarmShield security features. Defaults to True.
security_level (str, optional): Pre-defined security level. Options: "basic", "standard", "enhanced", "maximum". Defaults to "standard".
""" """
if not agents or not all( if not agents or not all(
isinstance(row, list) for row in agents isinstance(row, list) for row in agents
@ -50,6 +60,89 @@ class MatrixSwarm:
self.agents = agents self.agents = agents
self.outputs = [] # List to store outputs as AgentOutput self.outputs = [] # List to store outputs as AgentOutput
# Initialize SwarmShield integration
self._initialize_swarm_shield(shield_config, enable_security, security_level)
def _initialize_swarm_shield(
self,
shield_config: Optional[ShieldConfig] = None,
enable_security: bool = True,
security_level: str = "standard"
) -> None:
"""Initialize SwarmShield integration for security features."""
self.enable_security = enable_security
self.security_level = security_level
if enable_security:
if shield_config is None:
shield_config = ShieldConfig.get_security_level(security_level)
self.swarm_shield = SwarmShieldIntegration(shield_config)
else:
self.swarm_shield = None
# Security methods
def validate_task_with_shield(self, task: str) -> str:
"""Validate and sanitize task input using SwarmShield."""
if self.swarm_shield:
return self.swarm_shield.validate_and_protect_input(task)
return task
def validate_agent_config_with_shield(self, agent_config: dict) -> dict:
"""Validate agent configuration using SwarmShield."""
if self.swarm_shield:
return self.swarm_shield.validate_and_protect_input(str(agent_config))
return agent_config
def process_agent_communication_with_shield(self, message: str, agent_name: str) -> str:
"""Process agent communication through SwarmShield security."""
if self.swarm_shield:
return self.swarm_shield.process_agent_communication(message, agent_name)
return message
def check_rate_limit_with_shield(self, agent_name: str) -> bool:
"""Check rate limits for an agent using SwarmShield."""
if self.swarm_shield:
return self.swarm_shield.check_rate_limit(agent_name)
return True
def add_secure_message(self, message: str, agent_name: str) -> None:
"""Add a message to secure conversation history."""
if self.swarm_shield:
self.swarm_shield.add_secure_message(message, agent_name)
def get_secure_messages(self) -> List[dict]:
"""Get secure conversation messages."""
if self.swarm_shield:
return self.swarm_shield.get_secure_messages()
return []
def get_security_stats(self) -> dict:
"""Get security statistics and metrics."""
if self.swarm_shield:
return self.swarm_shield.get_security_stats()
return {"security_enabled": False}
def update_shield_config(self, new_config: ShieldConfig) -> None:
"""Update SwarmShield configuration."""
if self.swarm_shield:
self.swarm_shield.update_config(new_config)
def enable_security(self) -> None:
"""Enable SwarmShield security features."""
if not self.swarm_shield:
self._initialize_swarm_shield(enable_security=True, security_level=self.security_level)
def disable_security(self) -> None:
"""Disable SwarmShield security features."""
self.swarm_shield = None
self.enable_security = False
def cleanup_security(self) -> None:
"""Clean up SwarmShield resources."""
if self.swarm_shield:
self.swarm_shield.cleanup()
def validate_dimensions(self, other: "MatrixSwarm") -> None: def validate_dimensions(self, other: "MatrixSwarm") -> None:
""" """
Validates that two matrices have compatible dimensions for operations. Validates that two matrices have compatible dimensions for operations.

Loading…
Cancel
Save