Update image_batch_processor.py

pull/1034/head
CI-DEV 2 months ago committed by GitHub
parent 505e1ffa0c
commit 7e0d97acd2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -7,6 +7,7 @@ from typing import Any, Callable, Dict, List, Optional, Union
from loguru import logger from loguru import logger
from swarms.structs import Agent from swarms.structs import Agent
from swarms.security import SwarmShieldIntegration, ShieldConfig
class ImageProcessingError(Exception): class ImageProcessingError(Exception):
@ -42,6 +43,9 @@ class ImageAgentBatchProcessor:
agents: Union[Agent, List[Agent], Callable, List[Callable]], agents: Union[Agent, List[Agent], Callable, List[Callable]],
max_workers: int = None, max_workers: int = None,
supported_formats: Optional[List[str]] = None, supported_formats: Optional[List[str]] = None,
shield_config: Optional[ShieldConfig] = None,
enable_security: bool = True,
security_level: str = "standard",
): ):
""" """
Initialize the ImageBatchProcessor. Initialize the ImageBatchProcessor.
@ -50,6 +54,9 @@ class ImageAgentBatchProcessor:
agents: Single agent or list of agents to process images agents: Single agent or list of agents to process images
max_workers: Maximum number of parallel workers (default: 4) max_workers: Maximum number of parallel workers (default: 4)
supported_formats: List of supported image formats (default: ['.jpg', '.jpeg', '.png']) supported_formats: List of supported image formats (default: ['.jpg', '.jpeg', '.png'])
shield_config (ShieldConfig, optional): Security configuration for SwarmShield integration. Defaults to None.
enable_security (bool, optional): Whether to enable SwarmShield security features. Defaults to True.
security_level (str, optional): Pre-defined security level. Options: "basic", "standard", "enhanced", "maximum". Defaults to "standard".
Raises: Raises:
InvalidAgentError: If agents parameter is invalid InvalidAgentError: If agents parameter is invalid
@ -58,6 +65,9 @@ class ImageAgentBatchProcessor:
self.max_workers = max_workers self.max_workers = max_workers
self.supported_formats = supported_formats self.supported_formats = supported_formats
# Initialize SwarmShield integration
self._initialize_swarm_shield(shield_config, enable_security, security_level)
self.agents = ( self.agents = (
[agents] if isinstance(agents, Agent) else agents [agents] if isinstance(agents, Agent) else agents
) )
@ -81,6 +91,86 @@ class ImageAgentBatchProcessor:
level="INFO", level="INFO",
) )
def _initialize_swarm_shield(
self,
shield_config: Optional[ShieldConfig] = None,
enable_security: bool = True,
security_level: str = "standard"
) -> None:
"""Initialize SwarmShield integration for security features."""
self.enable_security = enable_security
self.security_level = security_level
if enable_security:
if shield_config is None:
shield_config = ShieldConfig.get_security_level(security_level)
self.swarm_shield = SwarmShieldIntegration(shield_config)
else:
self.swarm_shield = None
# Security methods
def validate_task_with_shield(self, task: str) -> str:
"""Validate and sanitize task input using SwarmShield."""
if self.swarm_shield:
return self.swarm_shield.validate_and_protect_input(task)
return task
def validate_agent_config_with_shield(self, agent_config: dict) -> dict:
"""Validate agent configuration using SwarmShield."""
if self.swarm_shield:
return self.swarm_shield.validate_and_protect_input(str(agent_config))
return agent_config
def process_agent_communication_with_shield(self, message: str, agent_name: str) -> str:
"""Process agent communication through SwarmShield security."""
if self.swarm_shield:
return self.swarm_shield.process_agent_communication(message, agent_name)
return message
def check_rate_limit_with_shield(self, agent_name: str) -> bool:
"""Check rate limits for an agent using SwarmShield."""
if self.swarm_shield:
return self.swarm_shield.check_rate_limit(agent_name)
return True
def add_secure_message(self, message: str, agent_name: str) -> None:
"""Add a message to secure conversation history."""
if self.swarm_shield:
self.swarm_shield.add_secure_message(message, agent_name)
def get_secure_messages(self) -> List[dict]:
"""Get secure conversation messages."""
if self.swarm_shield:
return self.swarm_shield.get_secure_messages()
return []
def get_security_stats(self) -> dict:
"""Get security statistics and metrics."""
if self.swarm_shield:
return self.swarm_shield.get_security_stats()
return {"security_enabled": False}
def update_shield_config(self, new_config: ShieldConfig) -> None:
"""Update SwarmShield configuration."""
if self.swarm_shield:
self.swarm_shield.update_config(new_config)
def enable_security(self) -> None:
"""Enable SwarmShield security features."""
if not self.swarm_shield:
self._initialize_swarm_shield(enable_security=True, security_level=self.security_level)
def disable_security(self) -> None:
"""Disable SwarmShield security features."""
self.swarm_shield = None
self.enable_security = False
def cleanup_security(self) -> None:
"""Clean up SwarmShield resources."""
if self.swarm_shield:
self.swarm_shield.cleanup()
def _validate_image_path( def _validate_image_path(
self, image_path: Union[str, Path] self, image_path: Union[str, Path]
) -> Path: ) -> Path:

Loading…
Cancel
Save