Update sequential_workflow.py

pull/1034/head
CI-DEV 2 months ago committed by GitHub
parent 09c1d7b807
commit 6a99e2ce2c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,10 +1,12 @@
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, List, Optional, Union from typing import Callable, List, Optional, Union, Tuple
from datetime import datetime
from swarms.structs.agent import Agent from swarms.structs.agent import Agent
from swarms.structs.rearrange import AgentRearrange from swarms.structs.rearrange import AgentRearrange
from swarms.utils.loguru_logger import initialize_logger from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.output_types import OutputType from swarms.utils.output_types import OutputType
from swarms.security import SwarmShieldIntegration, ShieldConfig
logger = initialize_logger(log_folder="sequential_workflow") logger = initialize_logger(log_folder="sequential_workflow")
@ -20,6 +22,9 @@ class SequentialWorkflow:
max_loops (int, optional): The maximum number of times to execute the workflow. Defaults to 1. max_loops (int, optional): The maximum number of times to execute the workflow. Defaults to 1.
output_type (OutputType, optional): The format of the output from the workflow. Defaults to "dict". output_type (OutputType, optional): The format of the output from the workflow. Defaults to "dict".
shared_memory_system (callable, optional): A callable for managing shared memory between agents. Defaults to None. shared_memory_system (callable, optional): A callable for managing shared memory between agents. Defaults to None.
shield_config (ShieldConfig, optional): Security configuration for SwarmShield integration. Defaults to None.
enable_security (bool, optional): Whether to enable SwarmShield security features. Defaults to True.
security_level (str, optional): Pre-defined security level. Options: "basic", "standard", "enhanced", "maximum". Defaults to "standard".
*args: Additional positional arguments. *args: Additional positional arguments.
**kwargs: Additional keyword arguments. **kwargs: Additional keyword arguments.
@ -36,6 +41,9 @@ class SequentialWorkflow:
max_loops: int = 1, max_loops: int = 1,
output_type: OutputType = "dict", output_type: OutputType = "dict",
shared_memory_system: callable = None, shared_memory_system: callable = None,
shield_config: Optional[ShieldConfig] = None,
enable_security: bool = True,
security_level: str = "standard",
*args, *args,
**kwargs, **kwargs,
): ):
@ -47,6 +55,9 @@ class SequentialWorkflow:
self.output_type = output_type self.output_type = output_type
self.shared_memory_system = shared_memory_system self.shared_memory_system = shared_memory_system
# Initialize SwarmShield integration
self._initialize_swarm_shield(shield_config, enable_security, security_level)
self.reliability_check() self.reliability_check()
self.flow = self.sequential_flow() self.flow = self.sequential_flow()
@ -59,6 +70,105 @@ class SequentialWorkflow:
output_type=self.output_type, output_type=self.output_type,
) )
def _initialize_swarm_shield(
self,
shield_config: Optional[ShieldConfig] = None,
enable_security: bool = True,
security_level: str = "standard"
) -> None:
"""Initialize SwarmShield integration for security features."""
self.enable_security = enable_security
self.security_level = security_level
if enable_security:
if shield_config is None:
shield_config = ShieldConfig.get_security_level(security_level)
self.swarm_shield = SwarmShieldIntegration(shield_config)
logger.info(f"SwarmShield initialized with {security_level} security level")
else:
self.swarm_shield = None
logger.info("SwarmShield security disabled")
# Security methods
def validate_task_with_shield(self, task: str, agent_name: str = "default") -> Tuple[bool, str, Optional[str]]:
"""Validate and sanitize task input using SwarmShield."""
if self.swarm_shield:
return self.swarm_shield.validate_task(task, agent_name)
return True, task, None
def validate_agent_config_with_shield(self, agent_config: dict, agent_name: str = "default") -> Tuple[bool, dict, Optional[str]]:
"""Validate agent configuration using SwarmShield."""
if self.swarm_shield:
return self.swarm_shield.validate_agent_config(agent_config, agent_name)
return True, agent_config, None
def process_agent_communication_with_shield(self, agent_name: str, message: str, direction: str = "outbound") -> Tuple[bool, str, Optional[str]]:
"""Process agent communication through SwarmShield security."""
if self.swarm_shield:
return self.swarm_shield.process_agent_communication(agent_name, message, direction)
return True, message, None
def check_rate_limit_with_shield(self, agent_name: str, request_size: int = 1) -> Tuple[bool, Optional[str]]:
"""Check rate limits for an agent using SwarmShield."""
if self.swarm_shield:
return self.swarm_shield.check_rate_limit(agent_name, request_size)
return True, None
def add_secure_message(self, conversation_id: str, agent_name: str, message: str) -> bool:
"""Add a message to secure conversation history."""
if self.swarm_shield:
return self.swarm_shield.add_secure_message(conversation_id, agent_name, message)
return False
def get_secure_messages(self, conversation_id: str) -> List[Tuple[str, str, datetime]]:
"""Get secure conversation messages."""
if self.swarm_shield:
return self.swarm_shield.get_secure_messages(conversation_id)
return []
def create_secure_conversation(self, name: str = "") -> Optional[str]:
"""Create a secure conversation."""
if self.swarm_shield:
return self.swarm_shield.create_secure_conversation(name)
return None
def filter_and_protect_output(self, output_data: Union[str, dict, List], agent_name: str, output_type: str = "text") -> Tuple[bool, Union[str, dict, List], Optional[str]]:
"""Filter and protect output data using SwarmShield."""
if self.swarm_shield:
return self.swarm_shield.filter_and_protect_output(output_data, agent_name, output_type)
return True, output_data, None
def get_security_stats(self) -> dict:
"""Get security statistics and metrics."""
if self.swarm_shield:
return self.swarm_shield.get_security_stats()
return {"security_enabled": False}
def update_shield_config(self, new_config: ShieldConfig) -> None:
"""Update SwarmShield configuration."""
if self.swarm_shield:
self.swarm_shield.update_config(new_config)
logger.info("SwarmShield configuration updated")
def enable_security(self) -> None:
"""Enable SwarmShield security features."""
if not self.swarm_shield:
self._initialize_swarm_shield(enable_security=True, security_level=self.security_level)
logger.info("SwarmShield security enabled")
def disable_security(self) -> None:
"""Disable SwarmShield security features."""
self.swarm_shield = None
self.enable_security = False
logger.info("SwarmShield security disabled")
def cleanup_security(self) -> None:
"""Clean up SwarmShield resources."""
if self.swarm_shield:
self.swarm_shield.cleanup()
logger.info("SwarmShield resources cleaned up")
def sequential_flow(self): def sequential_flow(self):
# Only create flow if agents exist # Only create flow if agents exist
if self.agents: if self.agents:

Loading…
Cancel
Save