Update hybrid_hiearchical_peer_swarm.py

pull/1034/head
CI-DEV 2 months ago committed by GitHub
parent 35bac5da23
commit 505e1ffa0c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,5 +1,5 @@
import os import os
from typing import List from typing import List, Optional
from swarms.structs.agent import Agent from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation from swarms.structs.conversation import Conversation
from swarms.structs.multi_agent_exec import get_swarms_info from swarms.structs.multi_agent_exec import get_swarms_info
@ -10,6 +10,7 @@ from swarms.utils.history_output_formatter import (
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Union, Callable from typing import Union, Callable
from swarms.utils.history_output_formatter import HistoryOutputType from swarms.utils.history_output_formatter import HistoryOutputType
from swarms.security import SwarmShieldIntegration, ShieldConfig
tools = [ tools = [
{ {
@ -108,6 +109,9 @@ class HybridHierarchicalClusterSwarm:
max_loops: int = 1, max_loops: int = 1,
output_type: HistoryOutputType = "list", output_type: HistoryOutputType = "list",
router_agent_model_name: str = "gpt-4o-mini", router_agent_model_name: str = "gpt-4o-mini",
shield_config: Optional[ShieldConfig] = None,
enable_security: bool = True,
security_level: str = "standard",
*args, *args,
**kwargs, **kwargs,
): ):
@ -117,6 +121,9 @@ class HybridHierarchicalClusterSwarm:
self.max_loops = max_loops self.max_loops = max_loops
self.output_type = output_type self.output_type = output_type
# Initialize SwarmShield integration
self._initialize_swarm_shield(shield_config, enable_security, security_level)
self.conversation = Conversation() self.conversation = Conversation()
self.router_agent = Agent( self.router_agent = Agent(
@ -129,6 +136,86 @@ class HybridHierarchicalClusterSwarm:
output_type="final", output_type="final",
) )
def _initialize_swarm_shield(
self,
shield_config: Optional[ShieldConfig] = None,
enable_security: bool = True,
security_level: str = "standard"
) -> None:
"""Initialize SwarmShield integration for security features."""
self.enable_security = enable_security
self.security_level = security_level
if enable_security:
if shield_config is None:
shield_config = ShieldConfig.get_security_level(security_level)
self.swarm_shield = SwarmShieldIntegration(shield_config)
else:
self.swarm_shield = None
# Security methods
def validate_task_with_shield(self, task: str) -> str:
"""Validate and sanitize task input using SwarmShield."""
if self.swarm_shield:
return self.swarm_shield.validate_and_protect_input(task)
return task
def validate_agent_config_with_shield(self, agent_config: dict) -> dict:
"""Validate agent configuration using SwarmShield."""
if self.swarm_shield:
return self.swarm_shield.validate_and_protect_input(str(agent_config))
return agent_config
def process_agent_communication_with_shield(self, message: str, agent_name: str) -> str:
"""Process agent communication through SwarmShield security."""
if self.swarm_shield:
return self.swarm_shield.process_agent_communication(message, agent_name)
return message
def check_rate_limit_with_shield(self, agent_name: str) -> bool:
"""Check rate limits for an agent using SwarmShield."""
if self.swarm_shield:
return self.swarm_shield.check_rate_limit(agent_name)
return True
def add_secure_message(self, message: str, agent_name: str) -> None:
"""Add a message to secure conversation history."""
if self.swarm_shield:
self.swarm_shield.add_secure_message(message, agent_name)
def get_secure_messages(self) -> List[dict]:
"""Get secure conversation messages."""
if self.swarm_shield:
return self.swarm_shield.get_secure_messages()
return []
def get_security_stats(self) -> dict:
"""Get security statistics and metrics."""
if self.swarm_shield:
return self.swarm_shield.get_security_stats()
return {"security_enabled": False}
def update_shield_config(self, new_config: ShieldConfig) -> None:
"""Update SwarmShield configuration."""
if self.swarm_shield:
self.swarm_shield.update_config(new_config)
def enable_security(self) -> None:
"""Enable SwarmShield security features."""
if not self.swarm_shield:
self._initialize_swarm_shield(enable_security=True, security_level=self.security_level)
def disable_security(self) -> None:
"""Disable SwarmShield security features."""
self.swarm_shield = None
self.enable_security = False
def cleanup_security(self) -> None:
"""Clean up SwarmShield resources."""
if self.swarm_shield:
self.swarm_shield.cleanup()
def convert_str_to_dict(self, response: str): def convert_str_to_dict(self, response: str):
# Handle response whether it's a string or dictionary # Handle response whether it's a string or dictionary
if isinstance(response, str): if isinstance(response, str):

Loading…
Cancel
Save