From e43e4d8f73c1d8d5d7c849f6357930712528e0e5 Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Fri, 29 Aug 2025 10:44:27 -0700 Subject: [PATCH] [STRUCTS CLEANUP][Remove AgentRAGHandler] [ElectionSwarm][DynamicConversationSwarm][AOP][and more] --- .gitignore | 1 + .../apple_board_election_example.py | 2 +- .../example_meaning_of_life_agents.py | 2 +- swarms/structs/__init__.py | 2 - swarms/structs/agent_rag_handler.py | 685 ------------------ swarms/structs/aop.py | 566 --------------- swarms/structs/de_hallucination_swarm.py | 277 ------- .../structs/dynamic_conversational_swarm.py | 237 ------ swarms/structs/election_swarm.py | 270 ------- 9 files changed, 3 insertions(+), 2039 deletions(-) delete mode 100644 swarms/structs/agent_rag_handler.py delete mode 100644 swarms/structs/aop.py delete mode 100644 swarms/structs/de_hallucination_swarm.py delete mode 100644 swarms/structs/dynamic_conversational_swarm.py delete mode 100644 swarms/structs/election_swarm.py diff --git a/.gitignore b/.gitignore index e1c108a0..9df57269 100644 --- a/.gitignore +++ b/.gitignore @@ -28,6 +28,7 @@ experimental/ encryption errors chroma +new_experimental/ agent_workspace .pt Accounting Assistant_state.json diff --git a/examples/multi_agent/election_swarm_examples/apple_board_election_example.py b/examples/multi_agent/election_swarm_examples/apple_board_election_example.py index badace7c..519eeaa8 100644 --- a/examples/multi_agent/election_swarm_examples/apple_board_election_example.py +++ b/examples/multi_agent/election_swarm_examples/apple_board_election_example.py @@ -1,5 +1,5 @@ from swarms import Agent -from swarms.structs.election_swarm import ( +from new_experimental.election_swarm import ( ElectionSwarm, ) diff --git a/examples/single_agent/tools/structured_outputs/example_meaning_of_life_agents.py b/examples/single_agent/tools/structured_outputs/example_meaning_of_life_agents.py index 85cd5d4f..4e8ddb29 100644 --- a/examples/single_agent/tools/structured_outputs/example_meaning_of_life_agents.py +++ b/examples/single_agent/tools/structured_outputs/example_meaning_of_life_agents.py @@ -1,5 +1,5 @@ from swarms.structs.agent import Agent -from swarms.structs.dynamic_conversational_swarm import ( +from new_experimental.dynamic_conversational_swarm import ( DynamicConversationalSwarm, ) diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index 35a34616..2c9ec52d 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -10,7 +10,6 @@ from swarms.structs.concurrent_workflow import ConcurrentWorkflow from swarms.structs.conversation import Conversation from swarms.structs.council_as_judge import CouncilAsAJudge from swarms.structs.cron_job import CronJob -from swarms.structs.de_hallucination_swarm import DeHallucinationSwarm from swarms.structs.graph_workflow import ( Edge, GraphWorkflow, @@ -163,7 +162,6 @@ __all__ = [ "ModelRouter", "AgentsBuilder", "MALT", - "DeHallucinationSwarm", "HybridHierarchicalClusterSwarm", "get_agents_info", "get_swarms_info", diff --git a/swarms/structs/agent_rag_handler.py b/swarms/structs/agent_rag_handler.py deleted file mode 100644 index f2581149..00000000 --- a/swarms/structs/agent_rag_handler.py +++ /dev/null @@ -1,685 +0,0 @@ -import time -from typing import Any, Dict, List, Optional - -from loguru import logger -from swarms.utils.litellm_tokenizer import count_tokens -from pydantic import BaseModel, Field, field_validator - - -class RAGConfig(BaseModel): - """Configuration class for RAG operations""" - - similarity_threshold: float = Field( - default=0.7, - ge=0.0, - le=1.0, - description="Similarity threshold for memory retrieval", - ) - max_results: int = Field( - default=5, - gt=0, - description="Maximum number of results to return from memory", - ) - context_window_tokens: int = Field( - default=2000, - gt=0, - description="Maximum number of tokens in the context window", - ) - auto_save_to_memory: bool = Field( - default=True, - description="Whether to automatically save responses to memory", - ) - save_every_n_loops: int = Field( - default=5, gt=0, description="Save to memory every N loops" - ) - min_content_length: int = Field( - default=50, - gt=0, - description="Minimum content length to save to memory", - ) - query_every_loop: bool = Field( - default=False, - description="Whether to query memory every loop", - ) - enable_conversation_summaries: bool = Field( - default=True, - description="Whether to enable conversation summaries", - ) - relevance_keywords: Optional[List[str]] = Field( - default=None, description="Keywords to check for relevance" - ) - - @field_validator("relevance_keywords", mode="before") - def set_default_keywords(cls, v): - if v is None: - return [ - "important", - "key", - "critical", - "summary", - "conclusion", - ] - return v - - class Config: - arbitrary_types_allowed = True - validate_assignment = True - json_schema_extra = { - "example": { - "similarity_threshold": 0.7, - "max_results": 5, - "context_window_tokens": 2000, - "auto_save_to_memory": True, - "save_every_n_loops": 5, - "min_content_length": 50, - "query_every_loop": False, - "enable_conversation_summaries": True, - "relevance_keywords": [ - "important", - "key", - "critical", - "summary", - "conclusion", - ], - } - } - - -class AgentRAGHandler: - """ - Handles all RAG (Retrieval-Augmented Generation) operations for agents. - Provides memory querying, storage, and context management capabilities. - """ - - def __init__( - self, - long_term_memory: Optional[Any] = None, - config: Optional[RAGConfig] = None, - agent_name: str = "Unknown", - max_context_length: int = 158_000, - verbose: bool = False, - ): - """ - Initialize the RAG handler. - - Args: - long_term_memory: The long-term memory store (must implement add() and query() methods) - config: RAG configuration settings - agent_name: Name of the agent using this handler - verbose: Enable verbose logging - """ - self.long_term_memory = long_term_memory - self.config = config or RAGConfig() - self.agent_name = agent_name - self.verbose = verbose - self.max_context_length = max_context_length - - self._loop_counter = 0 - self._conversation_history = [] - self._important_memories = [] - - # Validate memory interface - if ( - self.long_term_memory - and not self._validate_memory_interface() - ): - logger.warning( - "Long-term memory doesn't implement required interface" - ) - - def _validate_memory_interface(self) -> bool: - """Validate that the memory object has required methods""" - required_methods = ["add", "query"] - for method in required_methods: - if not hasattr(self.long_term_memory, method): - logger.error( - f"Memory object missing required method: {method}" - ) - return False - return True - - def is_enabled(self) -> bool: - """Check if RAG is enabled (has valid memory store)""" - return self.long_term_memory is not None - - def query_memory( - self, - query: str, - context_type: str = "general", - loop_count: Optional[int] = None, - ) -> str: - """ - Query the long-term memory and return formatted context. - - Args: - query: The query string to search for - context_type: Type of context being queried (for logging) - loop_count: Current loop number (for logging) - - Returns: - Formatted string of relevant memories, empty string if no results - """ - if not self.is_enabled(): - return "" - - try: - if self.verbose: - logger.info( - f"🔍 [{self.agent_name}] Querying RAG for {context_type}: {query[:100]}..." - ) - - # Query the memory store - results = self.long_term_memory.query( - query=query, - top_k=self.config.max_results, - similarity_threshold=self.config.similarity_threshold, - ) - - if not results: - if self.verbose: - logger.info( - f"No relevant memories found for query: {context_type}" - ) - return "" - - # Format results for context - formatted_context = self._format_memory_results( - results, context_type, loop_count - ) - - # Ensure context fits within token limits - if ( - count_tokens(formatted_context) - > self.config.context_window_tokens - ): - formatted_context = self._truncate_context( - formatted_context - ) - - if self.verbose: - logger.info( - f"✅ Retrieved {len(results)} relevant memories for {context_type}" - ) - - return formatted_context - - except Exception as e: - logger.error(f"Error querying long-term memory: {e}") - return "" - - def _format_memory_results( - self, - results: List[Any], - context_type: str, - loop_count: Optional[int] = None, - ) -> str: - """Format memory results into a structured context string""" - if not results: - return "" - - loop_info = f" (Loop {loop_count})" if loop_count else "" - header = ( - f"📚 Relevant Knowledge - {context_type.title()}{loop_info}:\n" - + "=" * 50 - + "\n" - ) - - formatted_sections = [header] - - for i, result in enumerate(results, 1): - ( - content, - score, - source, - metadata, - ) = self._extract_result_fields(result) - - section = f""" -[Memory {i}] Relevance: {score} | Source: {source} -{'-' * 40} -{content} -{'-' * 40} -""" - formatted_sections.append(section) - - formatted_sections.append(f"\n{'='*50}\n") - return "\n".join(formatted_sections) - - def _extract_result_fields(self, result: Any) -> tuple: - """Extract content, score, source, and metadata from a result object""" - if isinstance(result, dict): - content = result.get( - "content", result.get("text", str(result)) - ) - score = result.get( - "score", result.get("similarity", "N/A") - ) - metadata = result.get("metadata", {}) - source = metadata.get( - "source", result.get("source", "Unknown") - ) - else: - content = str(result) - score = "N/A" - source = "Unknown" - metadata = {} - - return content, score, source, metadata - - def _truncate_context(self, content: str) -> str: - """Truncate content to fit within token limits using smart truncation""" - max_chars = ( - self.config.context_window_tokens * 3 - ) # Rough token-to-char ratio - - if len(content) <= max_chars: - return content - - # Try to cut at section boundaries first - sections = content.split("=" * 50) - if len(sections) > 2: # Header + sections + footer - truncated_sections = [sections[0]] # Keep header - current_length = len(sections[0]) - - for section in sections[1:-1]: # Skip footer - if current_length + len(section) > max_chars * 0.9: - break - truncated_sections.append(section) - current_length += len(section) - - truncated_sections.append( - f"\n[... {len(sections) - len(truncated_sections)} more memories truncated for length ...]\n" - ) - truncated_sections.append(sections[-1]) # Keep footer - return "=" * (50).join(truncated_sections) - - # Fallback: simple truncation at sentence boundary - truncated = content[:max_chars] - last_period = truncated.rfind(".") - if last_period > max_chars * 0.8: - truncated = truncated[: last_period + 1] - - return ( - truncated + "\n\n[... content truncated for length ...]" - ) - - def should_save_response( - self, - response: str, - loop_count: int, - has_tool_usage: bool = False, - ) -> bool: - """ - Determine if a response should be saved to long-term memory. - - Args: - response: The response text to evaluate - loop_count: Current loop number - has_tool_usage: Whether tools were used in this response - - Returns: - Boolean indicating whether to save the response - """ - if ( - not self.is_enabled() - or not self.config.auto_save_to_memory - ): - return False - - # Content length check - if len(response.strip()) < self.config.min_content_length: - return False - - save_conditions = [ - # Substantial content - len(response) > 200, - # Contains important keywords - any( - keyword in response.lower() - for keyword in self.config.relevance_keywords - ), - # Periodic saves - loop_count % self.config.save_every_n_loops == 0, - # Tool usage indicates potentially important information - has_tool_usage, - # Complex responses (multiple sentences) - response.count(".") >= 3, - # Contains structured data or lists - any( - marker in response - for marker in ["- ", "1. ", "2. ", "* ", "```"] - ), - ] - - return any(save_conditions) - - def save_to_memory( - self, - content: str, - metadata: Optional[Dict] = None, - content_type: str = "response", - ) -> bool: - """ - Save content to long-term memory with metadata. - - Args: - content: The content to save - metadata: Additional metadata to store - content_type: Type of content being saved - - Returns: - Boolean indicating success - """ - if not self.is_enabled(): - return False - - if ( - not content - or len(content.strip()) < self.config.min_content_length - ): - return False - - try: - # Create default metadata - default_metadata = { - "timestamp": time.time(), - "agent_name": self.agent_name, - "content_type": content_type, - "loop_count": self._loop_counter, - "saved_at": time.strftime("%Y-%m-%d %H:%M:%S"), - } - - # Merge with provided metadata - if metadata: - default_metadata.update(metadata) - - if self.verbose: - logger.info( - f"💾 [{self.agent_name}] Saving to long-term memory: {content[:100]}..." - ) - - success = self.long_term_memory.add( - content, metadata=default_metadata - ) - - if success and self.verbose: - logger.info( - f"✅ Successfully saved {content_type} to long-term memory" - ) - - # Track important memories - if content_type in [ - "final_response", - "conversation_summary", - ]: - self._important_memories.append( - { - "content": content[:200], - "timestamp": time.time(), - "type": content_type, - } - ) - - return success - - except Exception as e: - logger.error(f"Error saving to long-term memory: {e}") - return False - - def create_conversation_summary( - self, - task: str, - final_response: str, - total_loops: int, - tools_used: List[str] = None, - ) -> str: - """Create a comprehensive summary of the conversation""" - tools_info = ( - f"Tools Used: {', '.join(tools_used)}" - if tools_used - else "Tools Used: None" - ) - - summary = f""" -CONVERSATION SUMMARY -==================== -Agent: {self.agent_name} -Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')} - -ORIGINAL TASK: -{task} - -FINAL RESPONSE: -{final_response} - -EXECUTION DETAILS: -- Total Reasoning Loops: {total_loops} -- {tools_info} -- Memory Queries Made: {len(self._conversation_history)} - -KEY INSIGHTS: -{self._extract_key_insights(final_response)} -==================== -""" - return summary - - def _extract_key_insights(self, response: str) -> str: - """Extract key insights from the response for summary""" - # Simple keyword-based extraction - insights = [] - sentences = response.split(".") - - for sentence in sentences: - if any( - keyword in sentence.lower() - for keyword in self.config.relevance_keywords[:5] - ): - insights.append(sentence.strip()) - - if insights: - return "\n- " + "\n- ".join( - insights[:3] - ) # Top 3 insights - return "No specific insights extracted" - - def handle_loop_memory_operations( - self, - task: str, - response: str, - loop_count: int, - conversation_context: str = "", - has_tool_usage: bool = False, - ) -> str: - """ - Handle all memory operations for a single loop iteration. - - Args: - task: Original task - response: Current response - loop_count: Current loop number - conversation_context: Current conversation context - has_tool_usage: Whether tools were used - - Returns: - Retrieved context string (empty if no relevant memories) - """ - self._loop_counter = loop_count - retrieved_context = "" - - # 1. Query memory if enabled for this loop - if self.config.query_every_loop and loop_count > 1: - query_context = f"Task: {task}\nCurrent Context: {conversation_context[-500:]}" - retrieved_context = self.query_memory( - query_context, - context_type=f"loop_{loop_count}", - loop_count=loop_count, - ) - - # 2. Save response if criteria met - if self.should_save_response( - response, loop_count, has_tool_usage - ): - self.save_to_memory( - content=response, - metadata={ - "task_preview": task[:200], - "loop_count": loop_count, - "has_tool_usage": has_tool_usage, - }, - content_type="loop_response", - ) - - return retrieved_context - - def handle_initial_memory_query(self, task: str) -> str: - """Handle the initial memory query before reasoning loops begin""" - if not self.is_enabled(): - return "" - - return self.query_memory(task, context_type="initial_task") - - def handle_final_memory_consolidation( - self, - task: str, - final_response: str, - total_loops: int, - tools_used: List[str] = None, - ) -> bool: - """Handle final memory consolidation after all loops complete""" - if ( - not self.is_enabled() - or not self.config.enable_conversation_summaries - ): - return False - - # Create and save conversation summary - summary = self.create_conversation_summary( - task, final_response, total_loops, tools_used - ) - - return self.save_to_memory( - content=summary, - metadata={ - "task": task[:200], - "total_loops": total_loops, - "tools_used": tools_used or [], - }, - content_type="conversation_summary", - ) - - def search_memories( - self, - query: str, - top_k: int = None, - similarity_threshold: float = None, - ) -> List[Dict]: - """ - Search long-term memory and return raw results. - - Args: - query: Search query - top_k: Number of results to return (uses config default if None) - similarity_threshold: Similarity threshold (uses config default if None) - - Returns: - List of memory results - """ - if not self.is_enabled(): - return [] - - try: - results = self.long_term_memory.query( - query=query, - top_k=top_k or self.config.max_results, - similarity_threshold=similarity_threshold - or self.config.similarity_threshold, - ) - return results if results else [] - except Exception as e: - logger.error(f"Error searching memories: {e}") - return [] - - def get_memory_stats(self) -> Dict[str, Any]: - """Get statistics about memory usage and operations""" - return { - "is_enabled": self.is_enabled(), - "config": self.config.__dict__, - "loops_processed": self._loop_counter, - "important_memories_count": len(self._important_memories), - "last_important_memories": ( - self._important_memories[-3:] - if self._important_memories - else [] - ), - "memory_store_type": ( - type(self.long_term_memory).__name__ - if self.long_term_memory - else None - ), - } - - def clear_session_data(self): - """Clear session-specific data (not the long-term memory store)""" - self._loop_counter = 0 - self._conversation_history.clear() - self._important_memories.clear() - - if self.verbose: - logger.info(f"[{self.agent_name}] Session data cleared") - - def update_config(self, **kwargs): - """Update RAG configuration parameters""" - for key, value in kwargs.items(): - if hasattr(self.config, key): - setattr(self.config, key, value) - if self.verbose: - logger.info( - f"Updated RAG config: {key} = {value}" - ) - else: - logger.warning(f"Unknown config parameter: {key}") - - -# # Example memory interface that your RAG implementation should follow -# class ExampleMemoryInterface: -# """Example interface for long-term memory implementations""" - -# def add(self, content: str, metadata: Dict = None) -> bool: -# """ -# Add content to the memory store. - -# Args: -# content: Text content to store -# metadata: Additional metadata dictionary - -# Returns: -# Boolean indicating success -# """ -# # Your vector database implementation here -# return True - -# def query( -# self, -# query: str, -# top_k: int = 5, -# similarity_threshold: float = 0.7 -# ) -> List[Dict]: -# """ -# Query the memory store for relevant content. - -# Args: -# query: Search query string -# top_k: Maximum number of results to return -# similarity_threshold: Minimum similarity score - -# Returns: -# List of dictionaries with keys: 'content', 'score', 'metadata' -# """ -# # Your vector database query implementation here -# return [ -# { -# 'content': 'Example memory content', -# 'score': 0.85, -# 'metadata': {'source': 'example', 'timestamp': time.time()} -# } -# ] diff --git a/swarms/structs/aop.py b/swarms/structs/aop.py deleted file mode 100644 index 42a5fd44..00000000 --- a/swarms/structs/aop.py +++ /dev/null @@ -1,566 +0,0 @@ -import asyncio -import inspect -from concurrent.futures import ThreadPoolExecutor, as_completed -from functools import wraps -from typing import Any, Callable, Literal, Optional - -from mcp.server.fastmcp import FastMCP -from mcp.client import Client - -from loguru import logger -from swarms.utils.any_to_str import any_to_str - - -class AOP: - """ - Agent-Orchestration Protocol (AOP) class for managing tools, agents, and swarms. - - This class provides decorators and methods for registering and running various components - in a Swarms environment. It handles logging, metadata management, and execution control. - - Attributes: - name (str): The name of the AOP instance - description (str): A description of the AOP instance - mcp (FastMCP): The underlying FastMCP instance for managing components - """ - - def __init__( - self, - name: Optional[str] = None, - description: Optional[str] = None, - url: Optional[str] = "http://localhost:8000/sse", - urls: Optional[list[str]] = None, - *args, - **kwargs, - ): - """ - Initialize the AOP instance. - - Args: - name (str): The name of the AOP instance - description (str): A description of the AOP instance - url (str): The URL of the MCP instance - *args: Additional positional arguments passed to FastMCP - **kwargs: Additional keyword arguments passed to FastMCP - """ - logger.info(f"[AOP] Initializing AOP instance: {name}") - self.name = name - self.description = description - self.url = url - self.urls = urls - self.tools = {} - self.swarms = {} - - self.mcp = FastMCP(name=name, *args, **kwargs) - - logger.success( - f"[AOP] Successfully initialized AOP instance: {name}" - ) - - def tool( - self, - name: Optional[str] = None, - description: Optional[str] = None, - ): - """ - Decorator to register an MCP tool with optional metadata. - - This decorator registers a function as a tool in the MCP system. It handles - logging, metadata management, and execution tracking. - - Args: - name (Optional[str]): Custom name for the tool. If None, uses function name - description (Optional[str]): Custom description. If None, uses function docstring - - Returns: - Callable: A decorator function that registers the tool - """ - logger.debug( - f"[AOP] Creating tool decorator with name={name}, description={description}" - ) - - def decorator(func: Callable): - tool_name = name or func.__name__ - tool_description = description or ( - inspect.getdoc(func) or "" - ) - - logger.debug( - f"[AOP] Registering tool: {tool_name} - {tool_description}" - ) - - self.tools[tool_name] = { - "name": tool_name, - "description": tool_description, - "function": func, - } - - @self.mcp.tool( - name=f"tool_{tool_name}", description=tool_description - ) - @wraps(func) - async def wrapper(*args, **kwargs) -> Any: - logger.info( - f"[TOOL:{tool_name}] ➤ called with args={args}, kwargs={kwargs}" - ) - try: - result = await func(*args, **kwargs) - logger.success(f"[TOOL:{tool_name}] ✅ completed") - return result - except Exception as e: - logger.error( - f"[TOOL:{tool_name}] ❌ failed with error: {str(e)}" - ) - raise - - return wrapper - - return decorator - - def agent( - self, - name: Optional[str] = None, - description: Optional[str] = None, - ): - """ - Decorator to define an agent entry point. - - This decorator registers a function as an agent in the MCP system. It handles - logging, metadata management, and execution tracking for agent operations. - - Args: - name (Optional[str]): Custom name for the agent. If None, uses 'agent_' + function name - description (Optional[str]): Custom description. If None, uses function docstring - - Returns: - Callable: A decorator function that registers the agent - """ - logger.debug( - f"[AOP] Creating agent decorator with name={name}, description={description}" - ) - - def decorator(func: Callable): - agent_name = name or f"agent_{func.__name__}" - agent_description = description or ( - inspect.getdoc(func) or "" - ) - - @self.mcp.tool( - name=agent_name, description=agent_description - ) - @wraps(func) - async def wrapper(*args, **kwargs): - logger.info(f"[AGENT:{agent_name}] 👤 Starting") - try: - result = await func(*args, **kwargs) - logger.success( - f"[AGENT:{agent_name}] ✅ Finished" - ) - return result - except Exception as e: - logger.error( - f"[AGENT:{agent_name}] ❌ failed with error: {str(e)}" - ) - raise - - wrapper._is_agent = True - wrapper._agent_name = agent_name - wrapper._agent_description = agent_description - return wrapper - - return decorator - - def swarm( - self, - name: Optional[str] = None, - description: Optional[str] = None, - ): - """ - Decorator to define a swarm controller. - - This decorator registers a function as a swarm controller in the MCP system. - It handles logging, metadata management, and execution tracking for swarm operations. - - Args: - name (Optional[str]): Custom name for the swarm. If None, uses 'swarm_' + function name - description (Optional[str]): Custom description. If None, uses function docstring - - Returns: - Callable: A decorator function that registers the swarm - """ - logger.debug( - f"[AOP] Creating swarm decorator with name={name}, description={description}" - ) - - def decorator(func: Callable): - swarm_name = name or f"swarm_{func.__name__}" - swarm_description = description or ( - inspect.getdoc(func) or "" - ) - - @self.mcp.tool( - name=swarm_name, description=swarm_description - ) - @wraps(func) - async def wrapper(*args, **kwargs): - logger.info( - f"[SWARM:{swarm_name}] 🐝 Spawning swarm..." - ) - try: - result = await func(*args, **kwargs) - logger.success( - f"[SWARM:{swarm_name}] 🐝 Completed" - ) - return result - except Exception as e: - logger.error( - f"[SWARM:{swarm_name}] ❌ failed with error: {str(e)}" - ) - raise - - wrapper._is_swarm = True - wrapper._swarm_name = swarm_name - wrapper._swarm_description = swarm_description - return wrapper - - return decorator - - def run(self, method: Literal["stdio", "sse"], *args, **kwargs): - """ - Run the MCP with the specified method. - - Args: - method (Literal['stdio', 'sse']): The execution method to use - *args: Additional positional arguments for the run method - **kwargs: Additional keyword arguments for the run method - - Returns: - Any: The result of the MCP run operation - """ - logger.info(f"[AOP] Running MCP with method: {method}") - try: - result = self.mcp.run(method, *args, **kwargs) - logger.success( - f"[AOP] Successfully ran MCP with method: {method}" - ) - return result - except Exception as e: - logger.error( - f"[AOP] Failed to run MCP with method {method}: {str(e)}" - ) - raise - - def run_stdio(self, *args, **kwargs): - """ - Run the MCP using standard I/O method. - - Args: - *args: Additional positional arguments for the run method - **kwargs: Additional keyword arguments for the run method - - Returns: - Any: The result of the MCP run operation - """ - logger.info("[AOP] Running MCP with stdio method") - return self.run("stdio", *args, **kwargs) - - def run_sse(self, *args, **kwargs): - """ - Run the MCP using Server-Sent Events method. - - Args: - *args: Additional positional arguments for the run method - **kwargs: Additional keyword arguments for the run method - - Returns: - Any: The result of the MCP run operation - """ - logger.info("[AOP] Running MCP with SSE method") - return self.run("sse", *args, **kwargs) - - def list_available( - self, output_type: Literal["str", "list"] = "str" - ): - """ - List all available tools in the MCP. - - Returns: - list: A list of all registered tools - """ - if output_type == "str": - return any_to_str(self.mcp.list_tools()) - elif output_type == "list": - return self.mcp.list_tools() - else: - raise ValueError(f"Invalid output type: {output_type}") - - async def check_utility_exists( - self, url: str, name: str, *args, **kwargs - ): - async with Client(url, *args, **kwargs) as client: - if any(tool.name == name for tool in client.list_tools()): - return True - else: - return False - - async def _call_tool( - self, url: str, name: str, arguments: dict, *args, **kwargs - ): - try: - async with Client(url, *args, **kwargs) as client: - result = await client.call_tool(name, arguments) - logger.info( - f"Client connected: {client.is_connected()}" - ) - return result - except Exception as e: - logger.error(f"Error calling tool: {e}") - return None - - def call_tool( - self, - url: str, - name: str, - arguments: dict, - *args, - **kwargs, - ): - return asyncio.run( - self._call_tool(url, name, arguments, *args, **kwargs) - ) - - def call_tool_or_agent( - self, - url: str, - name: str, - arguments: dict, - output_type: Literal["str", "list"] = "str", - *args, - **kwargs, - ): - """ - Execute a tool or agent by name. - - Args: - name (str): The name of the tool or agent to execute - arguments (dict): The arguments to pass to the tool or agent - """ - if output_type == "str": - return any_to_str( - self.call_tool( - url=url, name=name, arguments=arguments - ) - ) - elif output_type == "list": - return self.call_tool( - url=url, name=name, arguments=arguments - ) - else: - raise ValueError(f"Invalid output type: {output_type}") - - def call_tool_or_agent_batched( - self, - url: str, - names: list[str], - arguments: list[dict], - output_type: Literal["str", "list"] = "str", - *args, - **kwargs, - ): - """ - Execute a list of tools or agents by name. - - Args: - names (list[str]): The names of the tools or agents to execute - """ - if output_type == "str": - return [ - any_to_str( - self.call_tool_or_agent( - url=url, - name=name, - arguments=argument, - *args, - **kwargs, - ) - ) - for name, argument in zip(names, arguments) - ] - elif output_type == "list": - return [ - self.call_tool_or_agent( - url=url, - name=name, - arguments=argument, - *args, - **kwargs, - ) - for name, argument in zip(names, arguments) - ] - else: - raise ValueError(f"Invalid output type: {output_type}") - - def call_tool_or_agent_concurrently( - self, - url: str, - names: list[str], - arguments: list[dict], - output_type: Literal["str", "list"] = "str", - *args, - **kwargs, - ): - """ - Execute a list of tools or agents by name concurrently. - - Args: - names (list[str]): The names of the tools or agents to execute - arguments (list[dict]): The arguments to pass to the tools or agents - """ - outputs = [] - with ThreadPoolExecutor(max_workers=len(names)) as executor: - futures = [ - executor.submit( - self.call_tool_or_agent, - url=url, - name=name, - arguments=argument, - *args, - **kwargs, - ) - for name, argument in zip(names, arguments) - ] - for future in as_completed(futures): - outputs.append(future.result()) - - if output_type == "str": - return any_to_str(outputs) - elif output_type == "list": - return outputs - else: - raise ValueError(f"Invalid output type: {output_type}") - - def call_swarm( - self, - url: str, - name: str, - arguments: dict, - output_type: Literal["str", "list"] = "str", - *args, - **kwargs, - ): - """ - Execute a swarm by name. - - Args: - name (str): The name of the swarm to execute - """ - if output_type == "str": - return any_to_str( - asyncio.run( - self._call_tool( - url=url, - name=name, - arguments=arguments, - ) - ) - ) - elif output_type == "list": - return asyncio.run( - self._call_tool( - url=url, - name=name, - arguments=arguments, - ) - ) - else: - raise ValueError(f"Invalid output type: {output_type}") - - def list_agents( - self, output_type: Literal["str", "list"] = "str" - ): - """ - List all available agents in the MCP. - - Returns: - list: A list of all registered agents - """ - - out = self.list_all() - agents = [] - for item in out: - if "agent" in item["name"]: - agents.append(item) - return agents - - def list_swarms( - self, output_type: Literal["str", "list"] = "str" - ): - """ - List all available swarms in the MCP. - - Returns: - list: A list of all registered swarms - """ - out = self.list_all() - agents = [] - for item in out: - if "swarm" in item["name"]: - agents.append(item) - return agents - - async def _list_all(self): - async with Client(self.url) as client: - return await client.list_tools() - - def list_all(self): - out = asyncio.run(self._list_all()) - - outputs = [] - for tool in out: - outputs.append(tool.model_dump()) - - return outputs - - def list_tool_parameters(self, name: str): - out = self.list_all() - - # Find the tool by name - for tool in out: - if tool["name"] == name: - return tool - return None - - def list_tools_for_multiple_urls(self): - out = [] - for url in self.urls: - out.append(self.list_all(url)) - return out - - def search_if_tool_exists(self, name: str): - out = self.list_all() - for tool in out: - if tool["name"] == name: - return True - return False - - def search( - self, - type: Literal["tool", "agent", "swarm"], - name: str, - output_type: Literal["str", "list"] = "str", - ): - """ - Search for a tool, agent, or swarm by name. - - Args: - type (Literal["tool", "agent", "swarm"]): The type of the item to search for - name (str): The name of the item to search for - - Returns: - dict: The item if found, otherwise None - """ - all_items = self.list_all() - for item in all_items: - if item["name"] == name: - return item - return None diff --git a/swarms/structs/de_hallucination_swarm.py b/swarms/structs/de_hallucination_swarm.py deleted file mode 100644 index 72698d33..00000000 --- a/swarms/structs/de_hallucination_swarm.py +++ /dev/null @@ -1,277 +0,0 @@ -from typing import List, Dict, Any, Optional -import time -from loguru import logger -from swarms.structs.agent import Agent - -# Prompt templates for different agent roles -GENERATOR_PROMPT = """ -You are a knowledgeable assistant tasked with providing accurate information on a wide range of topics. - -Your responsibilities: -1. Provide accurate information based on your training data -2. Use clear, concise language -3. Acknowledge limitations in your knowledge -4. Abstain from making up information when uncertain - -When responding to queries: -- Stick to verified facts -- Cite your sources when possible -- Clearly distinguish between firmly established facts and more tentative claims -- Use phrases like "I'm not certain about..." or "Based on my knowledge up to my training cutoff..." when appropriate -- Avoid overly confident language for uncertain topics - -Remember, it's better to acknowledge ignorance than to provide incorrect information. -""" - -CRITIC_PROMPT = """ -You are a critical reviewer tasked with identifying potential inaccuracies, hallucinations, or unsupported claims in AI-generated text. - -Your responsibilities: -1. Carefully analyze the provided text for factual errors -2. Identify claims that lack sufficient evidence -3. Spot logical inconsistencies -4. Flag overly confident language on uncertain topics -5. Detect potentially hallucinated details (names, dates, statistics, etc.) - -For each issue detected, you should: -- Quote the specific problematic text -- Explain why it's potentially inaccurate -- Rate the severity of the issue (low/medium/high) -- Suggest a specific correction or improvement - -Focus particularly on: -- Unfounded claims presented as facts -- Highly specific details that seem suspicious -- Logical contradictions -- Anachronisms or temporal inconsistencies -- Claims that contradict common knowledge - -Be thorough and specific in your critique. Provide actionable feedback for improvement. -""" - -REFINER_PROMPT = """ -You are a refinement specialist tasked with improving text based on critical feedback. - -Your responsibilities: -1. Carefully review the original text and the critical feedback -2. Make precise modifications to address all identified issues -3. Ensure factual accuracy in the refined version -4. Maintain the intended tone and style of the original -5. Add appropriate epistemic status markers (e.g., "likely", "possibly", "according to...") - -Guidelines for refinement: -- Remove or qualify unsupported claims -- Replace specific details with more general statements when evidence is lacking -- Add appropriate hedging language where certainty is not warranted -- Maintain the helpful intent of the original response -- Ensure logical consistency throughout the refined text -- Add qualifiers or clarify knowledge limitations where appropriate - -The refined text should be helpful and informative while being scrupulously accurate. -""" - -VALIDATOR_PROMPT = """ -You are a validation expert tasked with ensuring the highest standards of accuracy in refined AI outputs. - -Your responsibilities: -1. Verify that all critical issues from previous feedback have been properly addressed -2. Check for any remaining factual inaccuracies or unsupported claims -3. Ensure appropriate epistemic status markers are used -4. Confirm the response maintains a helpful tone while being accurate -5. Provide a final assessment of the response quality - -Assessment structure: -- Issue resolution: Have all previously identified issues been addressed? (Yes/No/Partially) -- Remaining concerns: Are there any remaining factual or logical issues? (List if any) -- Epistemics: Does the response appropriately indicate confidence levels? (Yes/No/Needs improvement) -- Helpfulness: Does the response remain helpful despite necessary qualifications? (Yes/No/Partially) -- Overall assessment: Final verdict on whether the response is ready for user consumption (Approved/Needs further refinement) - -If approved, explain what makes this response trustworthy. If further refinement is needed, provide specific guidance. -""" - - -class DeHallucinationSwarm: - """ - A system of multiple agents that work together to reduce hallucinations in generated content. - The system works through multiple rounds of generation, criticism, refinement, and validation. - """ - - def __init__( - self, - name: str = "DeHallucinationSwarm", - description: str = "A system of multiple agents that work together to reduce hallucinations in generated content.", - model_names: List[str] = [ - "gpt-4o-mini", - "gpt-4o-mini", - "gpt-4o-mini", - "gpt-4o-mini", - ], - iterations: int = 2, - system_prompt: str = GENERATOR_PROMPT, - store_intermediate_results: bool = True, - ): - """ - Initialize the DeHallucinationSwarm with configurable agents. - - Args: - model_names: List of model names for generator, critic, refiner, and validator - iterations: Number of criticism-refinement cycles to perform - store_intermediate_results: Whether to store all intermediate outputs - """ - self.name = name - self.description = description - self.iterations = iterations - self.store_intermediate_results = store_intermediate_results - self.system_prompt = system_prompt - self.history = [] - - # Initialize all agents - self.generator = Agent( - agent_name="Generator", - description="An agent that generates initial responses to queries", - system_prompt=GENERATOR_PROMPT, - model_name=model_names[0], - ) - - self.critic = Agent( - agent_name="Critic", - description="An agent that critiques responses for potential inaccuracies", - system_prompt=CRITIC_PROMPT, - model_name=model_names[1], - ) - - self.refiner = Agent( - agent_name="Refiner", - description="An agent that refines responses based on critique", - system_prompt=REFINER_PROMPT, - model_name=model_names[2], - ) - - self.validator = Agent( - agent_name="Validator", - description="An agent that performs final validation of refined content", - system_prompt=VALIDATOR_PROMPT, - model_name=model_names[3], - ) - - def _log_step( - self, - step_name: str, - content: str, - metadata: Optional[Dict[str, Any]] = None, - ): - """Record a step in the swarm's processing history""" - if self.store_intermediate_results: - timestamp = time.time() - step_record = { - "timestamp": timestamp, - "step": step_name, - "content": content, - } - if metadata: - step_record["metadata"] = metadata - - self.history.append(step_record) - logger.debug(f"Logged step: {step_name}") - - def run(self, query: str) -> Dict[str, Any]: - """ - Process a query through the swarm's multi-agent refinement cycle. - - Args: - query: The user's query to process - - Returns: - Dict containing the final response and processing metadata - """ - logger.info(f"Processing query: {query}") - self.history = [] # Reset history for new query - - # Generate initial response - initial_response = self.generator.run(query) - self._log_step( - "initial_generation", initial_response, {"query": query} - ) - - current_response = initial_response - - # Perform multiple iteration cycles - for i in range(self.iterations): - logger.info(f"Starting iteration {i+1}/{self.iterations}") - - # Step 1: Critique the current response - critique = self.critic.run( - f"Review the following response to the query: '{query}'\n\n{current_response}" - ) - self._log_step(f"critique_{i+1}", critique) - - # Step 2: Refine based on critique - refined_response = self.refiner.run( - f"Refine the following response based on the critique provided.\n\n" - f"Original query: {query}\n\n" - f"Original response: {current_response}\n\n" - f"Critique: {critique}" - ) - self._log_step(f"refinement_{i+1}", refined_response) - - # Update current response for next iteration - current_response = refined_response - - # Final validation - validation = self.validator.run( - f"Validate the following refined response for accuracy and helpfulness.\n\n" - f"Original query: {query}\n\n" - f"Final response: {current_response}" - ) - self._log_step("final_validation", validation) - - # Prepare results - result = { - "query": query, - "final_response": current_response, - "validation_result": validation, - "iteration_count": self.iterations, - } - - if self.store_intermediate_results: - result["processing_history"] = self.history - - return result - - def batch_run(self, queries: List[str]) -> List[Dict[str, Any]]: - """ - Process multiple queries through the swarm. - - Args: - queries: List of user queries to process - - Returns: - List of result dictionaries, one per query - """ - results = [] - for query in queries: - logger.info(f"Processing batch query: {query}") - results.append(self.run(query)) - return results - - -# # Example usage -# if __name__ == "__main__": -# # Configure logger -# logger.add("dehallucinationswarm.log", rotation="10 MB") - -# # Create swarm instance -# swarm = DeHallucinationSwarm(iterations=2) - -# # Example queries that might tempt hallucination -# test_queries = [ -# "Tell me about the history of quantum computing", -# "What are the specific details of the Treaty of Utrecht?", -# "Who won the Nobel Prize in Physics in 2020?", -# "What are the main causes of the economic recession of 2008?", -# ] - -# # Process batch of queries -# results = swarm.batch_run(test_queries) -# print(results) diff --git a/swarms/structs/dynamic_conversational_swarm.py b/swarms/structs/dynamic_conversational_swarm.py deleted file mode 100644 index 84355fa2..00000000 --- a/swarms/structs/dynamic_conversational_swarm.py +++ /dev/null @@ -1,237 +0,0 @@ -import json -import random -from swarms.structs.agent import Agent -from typing import List -from swarms.structs.conversation import Conversation -from swarms.structs.ma_blocks import find_agent_by_name -from swarms.utils.history_output_formatter import ( - history_output_formatter, -) -from swarms.utils.any_to_str import any_to_str - -tools = [ - { - "type": "function", - "function": { - "name": "select_agent", - "description": "Analyzes the input task and selects the most appropriate agent configuration, outputting both the agent name and the formatted response.", - "parameters": { - "type": "object", - "properties": { - "respond_or_no_respond": { - "type": "boolean", - "description": "Whether the agent should respond to the response or not.", - }, - "reasoning": { - "type": "string", - "description": "The reasoning behind the selection of the agent and response.", - }, - "agent_name": { - "type": "string", - "description": "The name of the selected agent that is most appropriate for handling the given task.", - }, - "response": { - "type": "string", - "description": "A clear and structured description of the response for the next agent.", - }, - }, - "required": [ - "reasoning", - "agent_name", - "response", - "respond_or_no_respond", - ], - }, - }, - }, -] - - -class DynamicConversationalSwarm: - def __init__( - self, - name: str = "Dynamic Conversational Swarm", - description: str = "A swarm that uses a dynamic conversational model to solve complex tasks.", - agents: List[Agent] = [], - max_loops: int = 1, - output_type: str = "list", - *args, - **kwargs, - ): - self.name = name - self.description = description - self.agents = agents - self.max_loops = max_loops - self.output_type = output_type - - self.conversation = Conversation() - - # Agents in the chat - agents_in_chat = self.get_agents_info() - self.conversation.add( - role="Conversation Log", content=agents_in_chat - ) - - self.inject_tools() - - # Inject tools into the agents - def inject_tools(self): - for agent in self.agents: - agent.tools_list_dictionary = tools - - def parse_json_into_dict(self, json_str: str) -> dict: - try: - return json.loads(json_str) - except json.JSONDecodeError: - raise ValueError("Invalid JSON string") - - def run_agent(self, agent_name: str, task: str) -> str: - """ - Run a specific agent with a given task. - - Args: - agent_name (str): The name of the agent to run - task (str): The task to execute - - Returns: - str: The agent's response to the task - - Raises: - ValueError: If agent is not found - RuntimeError: If there's an error running the agent - """ - agent = find_agent_by_name( - agents=self.agents, agent_name=agent_name - ) - return agent.run(task) - - def fetch_random_agent_name(self) -> str: - return random.choice(self.agents).agent_name - - def run(self, task: str) -> str: - """ - Run the dynamic conversational swarm for a specified number of loops. - Each agent has access to the full conversation history. - - Args: - task (str): The initial task/prompt to process - - Returns: - str: The final response after all loops are complete - """ - self.conversation.add( - role=f"{self.fetch_random_agent_name()}", content=task - ) - - # for loop in range(self.max_loops): - # # Add loop marker to conversation for clarity - # self.conversation.add( - # role="System", - # content=f"=== Starting Loop {loop + 1}/{self.max_loops} ===" - # ) - - # # First agent interaction - # current_agent = self.randomly_select_agent() - # response = self.run_agent(current_agent.name, self.conversation.get_str()) - # self.conversation.add(role=current_agent.name, content=any_to_str(response)) - - # try: - # # Parse response and get next agent - # response_dict = self.parse_json_into_dict(response) - - # # Check if we should continue or end the loop - # if not response_dict.get("respond_or_no_respond", True): - # break - - # # Get the task description for the next agent - # next_task = response_dict.get("task_description", self.conversation.get_str()) - - # # Run the next agent with the specific task description - # next_agent = self.find_agent_by_name(response_dict["agent_name"]) - # next_response = self.run_agent(next_agent.name, next_task) - - # # Add both the task description and response to the conversation - # self.conversation.add( - # role="System", - # content=f"Response from {response_dict['agent_name']}: {next_task}" - # ) - # self.conversation.add(role=next_agent.name, content=any_to_str(next_response)) - - # except (ValueError, KeyError) as e: - # self.conversation.add( - # role="System", - # content=f"Error in loop {loop + 1}: {str(e)}" - # ) - # break - - # Run first agent - current_agent = self.randomly_select_agent() - response = self.run_agent( - current_agent.agent_name, self.conversation.get_str() - ) - self.conversation.add( - role=current_agent.agent_name, - content=any_to_str(response), - ) - - # Convert to json - response_dict = self.parse_json_into_dict(response) - - # Fetch task - respone_two = response_dict["response"] - agent_name = response_dict["agent_name"] - - print(f"Response from {agent_name}: {respone_two}") - - # Run next agent - next_response = self.run_agent( - agent_name, self.conversation.get_str() - ) - self.conversation.add( - role=agent_name, content=any_to_str(next_response) - ) - - # # Get the next agent - # response_three = self.parse_json_into_dict(next_response) - # agent_name_three = response_three["agent_name"] - # respone_four = response_three["response"] - - # print(f"Response from {agent_name_three}: {respone_four}") - # # Run the next agent - # next_response = self.run_agent(agent_name_three, self.conversation.get_str()) - # self.conversation.add(role=agent_name_three, content=any_to_str(next_response)) - - # Format and return the final conversation history - return history_output_formatter( - self.conversation, type=self.output_type - ) - - def randomly_select_agent(self) -> Agent: - return random.choice(self.agents) - - def get_agents_info(self) -> str: - """ - Fetches and formats information about all available agents in the system. - - Returns: - str: A formatted string containing names and descriptions of all agents. - """ - if not self.agents: - return "No agents currently available in the system." - - agents_info = [ - "Agents In the System:", - "", - ] # Empty string for line spacing - - for idx, agent in enumerate(self.agents, 1): - agents_info.extend( - [ - f"[Agent {idx}]", - f"Name: {agent.name}", - f"Description: {agent.description}", - "", # Empty string for line spacing between agents - ] - ) - - return "\n".join(agents_info).strip() diff --git a/swarms/structs/election_swarm.py b/swarms/structs/election_swarm.py deleted file mode 100644 index 351d2474..00000000 --- a/swarms/structs/election_swarm.py +++ /dev/null @@ -1,270 +0,0 @@ -import uuid -from typing import Any, Callable, Dict, List, Optional, Union - -from swarms.structs.agent import Agent -from swarms.structs.concurrent_workflow import ConcurrentWorkflow -from swarms.structs.conversation import Conversation - - -def _create_voting_prompt(candidate_agents: List[Agent]) -> str: - """ - Create a comprehensive voting prompt for the election. - - This method generates a detailed prompt that instructs voter agents on: - - Available candidates - - Required structured output format - - Evaluation criteria - - Voting guidelines - - Returns: - str: A formatted voting prompt string - """ - candidate_names = [ - (agent.agent_name if hasattr(agent, "agent_name") else str(i)) - for i, agent in enumerate(candidate_agents) - ] - - prompt = f""" - You are participating in an election to choose the best candidate agent. - - Available candidates: {', '.join(candidate_names)} - - Please vote for one candidate and provide your reasoning with the following structured output: - - 1. rationality: A detailed explanation of the reasoning behind your decision. Include logical considerations, supporting evidence, and trade-offs that were evaluated when selecting this candidate. - - 2. self_interest: A comprehensive discussion of how self-interest influenced your decision, if at all. Explain whether personal or role-specific incentives played a role, or if your choice was primarily for the collective benefit of the swarm. - - 3. candidate_agent_name: The full name or identifier of the candidate you are voting for. This should exactly match one of the available candidate names listed above. - - Consider the candidates' capabilities, experience, and alignment with the swarm's objectives when making your decision. - """ - - print(prompt) - - return prompt - - -def get_vote_schema(): - return [ - { - "type": "function", - "function": { - "name": "vote", - "description": "Cast a vote for a CEO candidate with reasoning and self-interest analysis.", - "parameters": { - "type": "object", - "properties": { - "rationality": { - "type": "string", - "description": "A detailed explanation of the reasoning behind this voting decision.", - }, - "self_interest": { - "type": "string", - "description": "A comprehensive discussion of how self-interest factored into the decision.", - }, - "candidate_agent_name": { - "type": "string", - "description": "The full name or identifier of the chosen candidate.", - }, - }, - "required": [ - "rationality", - "self_interest", - "candidate_agent_name", - ], - }, - }, - } - ] - - -class ElectionSwarm: - """ - A swarm system that conducts elections among multiple agents to choose the best candidate. - - The ElectionSwarm orchestrates a voting process where multiple voter agents evaluate - and vote for candidate agents based on their capabilities, experience, and alignment - with swarm objectives. The system uses structured output to ensure consistent voting - format and provides detailed reasoning for each vote. - - Attributes: - id (str): Unique identifier for the election swarm - name (str): Name of the election swarm - description (str): Description of the election swarm's purpose - max_loops (int): Maximum number of voting rounds (default: 1) - agents (List[Agent]): List of voter agents that will participate in the election - candidate_agents (List[Agent]): List of candidate agents to be voted on - kwargs (dict): Additional keyword arguments - show_dashboard (bool): Whether to display the election dashboard - conversation (Conversation): Conversation history for the election - """ - - def __init__( - self, - name: str = "Election Swarm", - description: str = "An election swarm is a swarm of agents that will vote on a candidate.", - agents: Union[List[Agent], List[Callable]] = None, - candidate_agents: Union[List[Agent], List[Callable]] = None, - id: str = str(uuid.uuid4()), - max_loops: int = 1, - show_dashboard: bool = True, - **kwargs, - ): - """ - Initialize the ElectionSwarm. - - Args: - name (str, optional): Name of the election swarm - description (str, optional): Description of the election swarm's purpose - agents (Union[List[Agent], List[Callable]], optional): List of voter agents - candidate_agents (Union[List[Agent], List[Callable]], optional): List of candidate agents - id (str, optional): Unique identifier for the election swarm - max_loops (int, optional): Maximum number of voting rounds (default: 1) - show_dashboard (bool, optional): Whether to display the election dashboard (default: True) - **kwargs: Additional keyword arguments - """ - self.id = id - self.name = name - self.description = description - self.max_loops = max_loops - self.agents = agents - self.candidate_agents = candidate_agents - self.kwargs = kwargs - self.show_dashboard = show_dashboard - self.conversation = Conversation() - - self.reliability_check() - - self.setup_voter_agents() - - def reliability_check(self): - """ - Check the reliability of the voter agents. - """ - if self.agents is None: - raise ValueError("Voter agents are not set") - - if self.candidate_agents is None: - raise ValueError("Candidate agents are not set") - - if self.max_loops is None or self.max_loops < 1: - raise ValueError("Max loops are not set") - - def setup_concurrent_workflow(self): - """ - Create a concurrent workflow for running voter agents in parallel. - - Returns: - ConcurrentWorkflow: A configured concurrent workflow for the election - """ - return ConcurrentWorkflow( - name=self.name, - description=self.description, - agents=self.agents, - output_type="dict-all-except-first", - show_dashboard=self.show_dashboard, - ) - - def run_voter_agents( - self, task: str, img: Optional[str] = None, *args, **kwargs - ): - """ - Execute the voting process by running all voter agents concurrently. - - Args: - task (str): The election task or question to be voted on - img (Optional[str], optional): Image path if visual voting is required - *args: Additional positional arguments - **kwargs: Additional keyword arguments - - Returns: - List[Dict[str, Any]]: Results from all voter agents containing their votes and reasoning - """ - concurrent_workflow = self.setup_concurrent_workflow() - - results = concurrent_workflow.run( - task=task, img=img, *args, **kwargs - ) - - conversation_history = ( - concurrent_workflow.conversation.conversation_history - ) - - for message in conversation_history: - self.conversation.add( - role=message["role"], content=message["content"] - ) - - return results - - def parse_results( - self, results: List[Dict[str, Any]] - ) -> Dict[str, int]: - """ - Parse voting results to count votes for each candidate. - - Args: - results (List[Dict[str, Any]]): List of voting results from voter agents - - Returns: - Dict[str, int]: Dictionary mapping candidate names to their vote counts - """ - # Count the number of votes for each candidate - vote_counts = {} - for result in results: - candidate_name = result["candidate_agent_name"] - vote_counts[candidate_name] = ( - vote_counts.get(candidate_name, 0) + 1 - ) - - # Find the candidate with the most votes - - return vote_counts - - def run( - self, task: str, img: Optional[str] = None, *args, **kwargs - ): - """ - Execute the complete election process. - - This method orchestrates the entire election by: - 1. Adding the task to the conversation history - 2. Running all voter agents concurrently - 3. Collecting and processing the voting results - - Args: - task (str): The election task or question to be voted on - img (Optional[str], optional): Image path if visual voting is required - *args: Additional positional arguments - **kwargs: Additional keyword arguments - - Returns: - List[Dict[str, Any]]: Complete voting results from all agents - """ - self.conversation.add(role="user", content=task) - - results = self.run_voter_agents(task, img, *args, **kwargs) - - print(results) - - return results - - def setup_voter_agents(self): - """ - Configure voter agents with structured output capabilities and voting prompts. - - This method sets up each voter agent with: - - Structured output schema for consistent voting format - - Voting-specific system prompts - - Tools for structured response generation - - Returns: - List[Agent]: Configured voter agents ready for the election - """ - schema = get_vote_schema() - prompt = _create_voting_prompt(self.candidate_agents) - - for agent in self.agents: - agent.tools_list_dictionary = schema - agent.system_prompt += f"\n\n{prompt}"