From db56d0dbaefee713537f3f4dccd49abb2ae60244 Mon Sep 17 00:00:00 2001 From: CI-DEV <154627941+IlumCI@users.noreply.github.com> Date: Fri, 26 Sep 2025 22:06:28 +0300 Subject: [PATCH 1/7] Create election_swarm.py --- swarms/structs/election_swarm.py | 2285 ++++++++++++++++++++++++++++++ 1 file changed, 2285 insertions(+) create mode 100644 swarms/structs/election_swarm.py diff --git a/swarms/structs/election_swarm.py b/swarms/structs/election_swarm.py new file mode 100644 index 00000000..2f6674ce --- /dev/null +++ b/swarms/structs/election_swarm.py @@ -0,0 +1,2285 @@ +"""ElectionSwarm: Multi-agent orchestrator selection with AGENTSNET communication.""" + +import hashlib +import json +import os +import re +import traceback +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel, Field + +from swarms.structs.agent import Agent +from swarms.structs.conversation import Conversation +from swarms.utils.history_output_formatter import history_output_formatter +from swarms.utils.loguru_logger import initialize_logger +from swarms.utils.output_types import OutputType + +election_logger = initialize_logger(log_folder="election_swarm") + +DEFAULT_BUDGET_LIMIT = 200.0 +DEFAULT_CONSENSUS_THRESHOLD = 0.6 +DEFAULT_MAX_WORKERS = 10 +DEFAULT_MAX_ROUNDS = 5 +DEFAULT_BATCH_SIZE = 25 + + +class ElectionConfigModel(BaseModel): + """ElectionSwarm configuration model for orchestrator selection.""" + + election_type: str = Field(default="orchestrator_selection") + max_candidates: int = Field(default=5, ge=1, le=20) + max_voters: int = Field(default=100, ge=1, le=1000) + enable_consensus: bool = Field(default=True) + enable_leader_election: bool = Field(default=True) + enable_matching: bool = Field(default=True) + enable_coloring: bool = Field(default=True) + enable_vertex_cover: bool = Field(default=True) + enable_caching: bool = Field(default=True) + enable_voter_tool_calls: bool = Field(default=True) + batch_size: int = Field(default=25, ge=1, le=100) + max_workers: int = Field(default=DEFAULT_MAX_WORKERS, ge=1, le=50) + budget_limit: float = Field(default=DEFAULT_BUDGET_LIMIT, ge=0.0) + default_model: str = Field(default="gpt-4o-mini") + verbose_logging: bool = Field(default=False) + + +@dataclass +class ElectionConfig: + """Election configuration manager.""" + + config_file_path: Optional[str] = None + config_data: Optional[Dict[str, Any]] = None + config: ElectionConfigModel = field(init=False) + + def __post_init__(self) -> None: + self._load_config() + + def _load_config(self) -> None: + try: + self.config = ElectionConfigModel() + if self.config_file_path and os.path.exists(self.config_file_path): + self._load_from_file() + if self.config_data: + self._load_from_dict(self.config_data) + except Exception as e: + election_logger.error(f"Failed to load configuration: {str(e)}") + raise + + def _load_from_file(self) -> None: + try: + import yaml + + with open(self.config_file_path, "r", encoding="utf-8") as f: + file_config = yaml.safe_load(f) + self._load_from_dict(file_config) + except Exception as e: + election_logger.warning(f"Failed to load config file: {e}") + raise + + def _load_from_dict(self, config_dict: Dict[str, Any]) -> None: + for key, value in config_dict.items(): + if hasattr(self.config, key): + try: + setattr(self.config, key, value) + except (ValueError, TypeError) as e: + raise ValueError(f"Invalid config {key}: {e}") + + def get_config(self) -> ElectionConfigModel: + return self.config + + def update_config(self, updates: Dict[str, Any]) -> None: + try: + self._load_from_dict(updates) + except ValueError as e: + election_logger.error(f"Failed to update configuration: {e}") + raise + + def save_config(self, file_path: Optional[str] = None) -> None: + save_path = file_path or self.config_file_path + if not save_path: + return + try: + import yaml + + config_dict = self.config.model_dump() + os.makedirs(os.path.dirname(save_path), exist_ok=True) + with open(save_path, "w", encoding="utf-8") as f: + yaml.dump(config_dict, f, default_flow_style=False, indent=2) + except Exception as e: + election_logger.error(f"Failed to save config: {e}") + raise + + def validate_config(self) -> List[str]: + errors = [] + try: + self.config.model_validate(self.config.model_dump()) + except Exception as e: + errors.append(f"Configuration validation failed: {e}") + if self.config.budget_limit < 0: + errors.append("Budget limit must be non-negative") + if self.config.max_candidates < 1: + errors.append("Max candidates must be at least 1") + if self.config.max_voters < 1: + errors.append("Max voters must be at least 1") + return errors + + +class VoterType(str, Enum): + """Voter types in the election system.""" + + INDIVIDUAL = "individual" + GROUP = "group" + EXPERT = "expert" + DELEGATE = "delegate" + + +class CandidateType(str, Enum): + """Candidate types in the election system.""" + + INDIVIDUAL = "individual" + COALITION = "coalition" + PARTY = "party" + MOVEMENT = "movement" + + +class ElectionAlgorithm(str, Enum): + """AGENTSNET algorithms for coordination.""" + + CONSENSUS = "consensus" + LEADER_ELECTION = "leader_election" + MATCHING = "matching" + COLORING = "coloring" + VERTEX_COVER = "vertex_cover" + + +class MessagePassingProtocol: + """AGENTSNET-inspired message-passing protocol.""" + + def __init__(self, rounds: int = 5, synchronous: bool = True): + self.rounds = rounds + self.synchronous = synchronous + self.current_round = 0 + self.message_history: List[Dict[str, Any]] = [] + + def create_system_prompt( + self, + agent_name: str, + neighbors: List[str], + task_description: str, + ) -> str: + neighbors_str = ", ".join(neighbors) + return f"""You are an agent named {agent_name} connected with neighbors: {neighbors_str}. + +{task_description} + +Communication Rules: +1. You can only communicate with immediate neighbors: {neighbors_str} +2. Synchronous message-passing in {self.rounds} rounds +3. Output JSON messages: {{"neighbor_name": "message"}} +4. After {self.rounds} rounds, provide your final answer +5. Base decisions on information from neighbors and your own reasoning + +Think step-by-step about your strategy and communicate it clearly to neighbors.""" + + def send_message(self, from_agent: str, to_agent: str, message: str) -> None: + self.message_history.append( + { + "round": self.current_round, + "from": from_agent, + "to": to_agent, + "message": message, + "timestamp": datetime.now(), + } + ) + + def get_messages_for_agent(self, agent_name: str) -> List[Dict[str, Any]]: + return [msg for msg in self.message_history if msg["to"] == agent_name] + + def advance_round(self) -> None: + self.current_round += 1 + + def reset(self) -> None: + self.current_round = 0 + self.message_history.clear() + + +class VoteResult(str, Enum): + """Possible vote results.""" + + FOR = "for" + AGAINST = "against" + ABSTAIN = "abstain" + INVALID = "invalid" + + +@dataclass +class CostTracker: + """Track costs and usage for budget management.""" + + total_tokens_used: int = 0 + total_cost_estimate: float = 0.0 + budget_limit: float = DEFAULT_BUDGET_LIMIT + token_cost_per_1m: float = 0.15 + requests_made: int = 0 + cache_hits: int = 0 + + def add_tokens(self, tokens: int) -> None: + self.total_tokens_used += tokens + self.total_cost_estimate = ( + self.total_tokens_used / 1_000_000 + ) * self.token_cost_per_1m + self.requests_made += 1 + + def add_cache_hit(self) -> None: + self.cache_hits += 1 + + def check_budget(self) -> bool: + return self.total_cost_estimate <= self.budget_limit + + def get_stats(self) -> Dict[str, Any]: + return { + "total_tokens": self.total_tokens_used, + "total_cost": self.total_cost_estimate, + "requests_made": self.requests_made, + "cache_hits": self.cache_hits, + "cache_hit_rate": self.cache_hits + / max(1, self.requests_made + self.cache_hits), + "budget_remaining": max(0, self.budget_limit - self.total_cost_estimate), + } + + +@dataclass +class VoterProfile: + """Represents an agent voter in the orchestrator selection system.""" + + voter_id: str + name: str + voter_type: VoterType + preferences: Dict[str, Any] = field(default_factory=dict) + expertise_areas: List[str] = field(default_factory=list) + voting_weight: float = 1.0 + agent: Optional[Agent] = None + is_loaded: bool = False + demographics: Dict[str, Any] = field(default_factory=dict) + past_voting_history: List[Dict[str, Any]] = field(default_factory=list) + neighbors: List[str] = field(default_factory=list) + coordination_style: str = "collaborative" + leadership_preferences: Dict[str, float] = field(default_factory=dict) + + +@dataclass +class CandidateProfile: + """Represents an orchestrator candidate in the selection system.""" + + candidate_id: str + name: str + candidate_type: CandidateType + party_affiliation: Optional[str] = None + policy_positions: Dict[str, Any] = field(default_factory=dict) + campaign_promises: List[str] = field(default_factory=list) + experience: List[str] = field(default_factory=list) + agent: Optional[Agent] = None + is_loaded: bool = False + support_base: Dict[str, float] = field(default_factory=dict) + campaign_strategy: Dict[str, Any] = field(default_factory=dict) + leadership_style: str = "collaborative" + coordination_approach: Dict[str, Any] = field(default_factory=dict) + technical_expertise: List[str] = field(default_factory=list) + + +@dataclass +class VoteCounterProfile: + """Represents a vote counter agent responsible for counting votes and presenting results.""" + + counter_id: str + name: str + role: str = "Vote Counter" + credentials: List[str] = field(default_factory=list) + counting_methodology: str = "transparent" + reporting_style: str = "comprehensive" + agent: Optional[Agent] = None + is_loaded: bool = False + counting_experience: List[str] = field(default_factory=list) + verification_protocols: List[str] = field(default_factory=list) + documentation_standards: List[str] = field(default_factory=list) + result_presentation_style: str = "detailed" + + +@dataclass +class VoterDecision: + """Structured output from voter agents.""" + + voter_id: str + rationality: str + vote: VoteResult + confidence: float = 0.0 + reasoning_factors: List[str] = field(default_factory=list) + candidate_rankings: Dict[str, int] = field(default_factory=dict) + tool_call_explanation: Optional[str] = None + timestamp: datetime = field(default_factory=datetime.now) + + +@dataclass +class VoteCountingResult: + """Results of vote counting process.""" + + counter_id: str + counter_name: str + counting_timestamp: datetime = field(default_factory=datetime.now) + total_votes_counted: int = 0 + valid_votes: int = 0 + invalid_votes: int = 0 + abstentions: int = 0 + vote_breakdown: Dict[str, int] = field(default_factory=dict) + counting_notes: List[str] = field(default_factory=list) + verification_completed: bool = False + counting_methodology: str = "transparent" + documentation_provided: bool = False + + +@dataclass +class ElectionResult: + """Results of an election.""" + + election_id: str + algorithm_used: ElectionAlgorithm + total_voters: int + total_candidates: int + votes_cast: int + winner: Optional[str] = None + vote_distribution: Dict[str, int] = field(default_factory=dict) + voter_decisions: List[VoterDecision] = field(default_factory=list) + consensus_reached: bool = False + rounds_to_consensus: int = 0 + timestamp: datetime = field(default_factory=datetime.now) + vote_counting_result: Optional[VoteCountingResult] = None + + +class ElectionSwarm: + """Multi-agent orchestrator selection system with AGENTSNET coordination algorithms.""" + + def __init__( + self, + name: str = "ElectionSwarm", + description: str = "Orchestrator selection with AGENTSNET algorithms", + voters: Optional[List[VoterProfile]] = None, + candidates: Optional[List[CandidateProfile]] = None, + vote_counter: Optional[VoteCounterProfile] = None, + election_config: Optional[ElectionConfig] = None, + max_loops: int = 1, + output_type: OutputType = "dict-all-except-first", + verbose: bool = False, + enable_lazy_loading: bool = True, + enable_caching: bool = True, + batch_size: int = 25, + budget_limit: float = 200.0, + *args: Any, + **kwargs: Any, + ) -> None: + self.name = name + self.description = description + self.voters = voters or [] + self.candidates = candidates or [] + self.vote_counter = vote_counter + self.election_config = election_config or ElectionConfig() + self.max_loops = max_loops + self.output_type = output_type + self.verbose = verbose + self.enable_lazy_loading = enable_lazy_loading + self.enable_caching = enable_caching + self.batch_size = batch_size + self.budget_limit = budget_limit + + self.conversation = Conversation(time_enabled=False) + self.cost_tracker = CostTracker(budget_limit=budget_limit) + self.cache: Dict[str, str] = {} + cpu_count = os.cpu_count() or 4 + self.max_workers = min(self.election_config.config.max_workers, cpu_count) + self.message_protocol = MessagePassingProtocol(rounds=5) + + self._init_election_swarm() + + def _init_election_swarm(self) -> None: + if self.verbose: + election_logger.info(f"Initializing ElectionSwarm: {self.name}") + + self._perform_reliability_checks() + + if not self.voters: + self._setup_default_voters() + + if not self.candidates: + self._setup_default_candidates() + + if not self.vote_counter: + self._setup_default_vote_counter() + + self._add_context_to_agents() + + if self.verbose: + election_logger.info(f"ElectionSwarm initialized successfully: {self.name}") + + def _perform_reliability_checks(self) -> None: + try: + if self.verbose: + election_logger.info(f"Running reliability checks for: {self.name}") + + # Default voters and candidates will be set up in _setup_default_* methods + pass + + if self.max_loops <= 0: + raise ValueError("Max loops must be greater than 0.") + + if self.verbose: + election_logger.info(f"Reliability checks passed for: {self.name}") + + except Exception as e: + error_msg = f"Failed reliability checks: {str(e)}\nTraceback: {traceback.format_exc()}" + election_logger.error(error_msg) + raise + + def _setup_default_voters(self) -> None: + if self.verbose: + election_logger.info("Setting up default voters with neighbor connections") + + default_voters = [ + VoterProfile( + voter_id="voter_1", + name="Alice Johnson", + voter_type=VoterType.INDIVIDUAL, + preferences={ + "economy": 0.8, + "environment": 0.6, + "healthcare": 0.9, + }, + expertise_areas=["economics", "healthcare"], + demographics={ + "age": 35, + "education": "college", + "income": "middle", + }, + neighbors=["Bob Smith", "Carol Davis"], + ), + VoterProfile( + voter_id="voter_2", + name="Bob Smith", + voter_type=VoterType.INDIVIDUAL, + preferences={ + "economy": 0.9, + "environment": 0.4, + "security": 0.8, + }, + expertise_areas=["economics", "security"], + demographics={ + "age": 42, + "education": "graduate", + "income": "high", + }, + neighbors=["Alice Johnson", "Carol Davis"], + ), + VoterProfile( + voter_id="voter_3", + name="Carol Davis", + voter_type=VoterType.EXPERT, + preferences={ + "environment": 0.9, + "education": 0.8, + "social_justice": 0.7, + }, + expertise_areas=["environment", "education"], + demographics={ + "age": 28, + "education": "phd", + "income": "middle", + }, + neighbors=["Alice Johnson", "Bob Smith"], + ), + ] + + self.voters = default_voters + + if self.verbose: + election_logger.info( + f"Set up {len(default_voters)} default voters with neighbor connections" + ) + + def _setup_default_candidates(self) -> None: + if self.verbose: + election_logger.info("Setting up default candidates") + + default_candidates = [ + CandidateProfile( + candidate_id="candidate_1", + name="John Progressive", + candidate_type=CandidateType.INDIVIDUAL, + party_affiliation="Progressive Party", + policy_positions={ + "economy": "stimulus", + "environment": "green_new_deal", + "healthcare": "universal", + }, + campaign_promises=[ + "Universal healthcare", + "Green energy transition", + "Education reform", + ], + experience=[ + "Mayor", + "State Senator", + "Business Leader", + ], + ), + CandidateProfile( + candidate_id="candidate_2", + name="Sarah Conservative", + candidate_type=CandidateType.INDIVIDUAL, + party_affiliation="Conservative Party", + policy_positions={ + "economy": "tax_cuts", + "environment": "balanced", + "security": "strong_defense", + }, + campaign_promises=[ + "Tax reduction", + "Strong defense", + "Traditional values", + ], + experience=[ + "Governor", + "Business Executive", + "Military Officer", + ], + ), + ] + + self.candidates = default_candidates + + if self.verbose: + election_logger.info(f"Set up {len(default_candidates)} default candidates") + + def _setup_default_vote_counter(self) -> None: + if self.verbose: + election_logger.info("Setting up default vote counter") + + default_vote_counter = VoteCounterProfile( + counter_id="counter_001", + name="Election Commissioner", + role="Vote Counter", + credentials=["Certified Election Official", "Transparency Specialist"], + counting_methodology="transparent", + reporting_style="comprehensive", + counting_experience=[ + "Election oversight (10 years)", + "Vote counting and verification", + "Result documentation and reporting" + ], + verification_protocols=[ + "Double-count verification", + "Cross-reference validation", + "Audit trail documentation" + ], + documentation_standards=[ + "Detailed vote breakdown", + "Verification documentation", + "Transparent reporting" + ], + result_presentation_style="detailed" + ) + + self.vote_counter = default_vote_counter + + if self.verbose: + election_logger.info("Set up default vote counter") + + def _add_context_to_agents(self) -> None: + try: + if self.verbose: + election_logger.info("Adding context to agents") + + for voter in self.voters: + if voter.agent: + self._add_voter_context(voter) + + for candidate in self.candidates: + if candidate.agent: + self._add_candidate_context(candidate) + + if self.vote_counter and self.vote_counter.agent: + self._add_vote_counter_context(self.vote_counter) + + if self.verbose: + election_logger.info("Context added to agents successfully") + + except Exception as e: + error_msg = f"Failed to add context to agents: {str(e)}\nTraceback: {traceback.format_exc()}" + election_logger.error(error_msg) + raise + + def _add_voter_context(self, voter: VoterProfile) -> None: + if not voter.agent: + return + + context = f""" + Voter Context: + - Name: {voter.name} + - Type: {voter.voter_type.value} + - Preferences: {voter.preferences} + - Expertise: {voter.expertise_areas} + - Demographics: {voter.demographics} + """ + + voter.agent.system_prompt += context + + def _add_candidate_context(self, candidate: CandidateProfile) -> None: + if not candidate.agent: + return + + context = f""" + Candidate Context: + - Name: {candidate.name} + - Type: {candidate.candidate_type.value} + - Party: {candidate.party_affiliation} + - Policies: {candidate.policy_positions} + - Promises: {candidate.campaign_promises} + - Experience: {candidate.experience} + """ + + candidate.agent.system_prompt += context + + def _add_vote_counter_context(self, vote_counter: VoteCounterProfile) -> None: + if not vote_counter.agent: + return + + context = f""" + Vote Counter Context: + - Name: {vote_counter.name} + - Role: {vote_counter.role} + - Credentials: {vote_counter.credentials} + - Methodology: {vote_counter.counting_methodology} + - Reporting Style: {vote_counter.reporting_style} + - Experience: {vote_counter.counting_experience} + - Verification Protocols: {vote_counter.verification_protocols} + """ + + vote_counter.agent.system_prompt += context + + def _load_voter_agent(self, voter: VoterProfile) -> Agent: + """ + Load an agent for a voter with AGENTSNET-style communication. + + Args: + voter: The voter profile to create an agent for + + Returns: + Agent: The loaded voter agent + """ + if voter.agent and voter.is_loaded: + return voter.agent + + # Create AGENTSNET-style system prompt + task_description = f"""You are {voter.name}, a {voter.voter_type.value} agent in an orchestrator selection process. + +Your profile: +- Preferences: {voter.preferences} +- Expertise areas: {voter.expertise_areas} +- Demographics: {voter.demographics} +- Coordination style: {voter.coordination_style} +- Leadership preferences: {voter.leadership_preferences} + +Your task is to select the best orchestrator candidate after communicating with neighbors. +The orchestrator will coordinate multi-agent workflows and manage team collaboration. +When voting, provide structured output with: +1. Rationality: Your detailed reasoning for your orchestrator selection +2. Vote: Your actual vote (for/against/abstain) +3. Confidence: Your confidence level (0.0-1.0) +4. Reasoning factors: Key factors that influenced your decision +5. Candidate rankings: How you rank each orchestrator candidate""" + + system_prompt = self.message_protocol.create_system_prompt( + agent_name=voter.name, + neighbors=voter.neighbors, + task_description=task_description, + ) + + # Create tools for voter explanation if enabled + tools = [] + # Note: Tool creation disabled due to BaseTool validation issues + # if self.election_config.config.enable_voter_tool_calls: + # tools.append(self._create_voting_explanation_tool(voter)) + + agent = Agent( + agent_name=voter.name, + agent_description=f"{voter.voter_type.value} voter with expertise in {', '.join(voter.expertise_areas)}", + model_name=self.election_config.config.default_model, + max_loops=1, + system_prompt=system_prompt, + ) + + voter.agent = agent + voter.is_loaded = True + + return agent + + def _create_voting_explanation_tool(self, voter: VoterProfile) -> Dict[str, Any]: + """ + Create a tool for voter to explain their voting decision. + + Args: + voter: The voter profile to create the tool for + + Returns: + Dict[str, Any]: Tool definition for voting explanation + """ + return { + "type": "function", + "function": { + "name": "explain_voting_decision", + "description": f"Explain why {voter.name} is voting for a specific candidate", + "parameters": { + "type": "object", + "properties": { + "voter_name": { + "type": "string", + "description": f"The name of the voter: {voter.name}", + }, + "voter_id": { + "type": "string", + "description": f"The ID of the voter: {voter.voter_id}", + }, + "chosen_candidate": { + "type": "string", + "description": "The name of the candidate being voted for", + }, + "voting_reasoning": { + "type": "string", + "description": "Detailed explanation of why this candidate was chosen", + }, + "key_factors": { + "type": "array", + "items": {"type": "string"}, + "description": "List of key factors that influenced the voting decision", + }, + "confidence_level": { + "type": "number", + "minimum": 0.0, + "maximum": 1.0, + "description": "Confidence level in the voting decision (0.0-1.0)", + }, + "alternative_considerations": { + "type": "string", + "description": "What other candidates were considered and why they were not chosen", + }, + }, + "required": [ + "voter_name", + "voter_id", + "chosen_candidate", + "voting_reasoning", + "key_factors", + "confidence_level", + ], + }, + }, + } + + def _load_candidate_agent(self, candidate: CandidateProfile) -> Agent: + """ + Load an agent for a candidate. + + Args: + candidate: The candidate profile to create an agent for + + Returns: + Agent: The loaded candidate agent + """ + if candidate.agent and candidate.is_loaded: + return candidate.agent + + system_prompt = f"""You are {candidate.name}, a candidate for election. + +Your profile: +- Party affiliation: {candidate.party_affiliation} +- Policy positions: {candidate.policy_positions} +- Campaign promises: {candidate.campaign_promises} +- Experience: {candidate.experience} + +Your role is to: +1. Present your platform and policies clearly +2. Respond to voter questions and concerns +3. Make compelling arguments for your candidacy +4. Address criticisms and challenges + +Always be professional, articulate, and focused on your key messages.""" + + agent = Agent( + agent_name=candidate.name, + agent_description=f"Candidate for election representing {candidate.party_affiliation}", + model_name=self.election_config.config.default_model, + max_loops=1, + system_prompt=system_prompt, + ) + + candidate.agent = agent + candidate.is_loaded = True + + return agent + + def _load_vote_counter_agent(self, vote_counter: VoteCounterProfile) -> Agent: + """ + Load an agent for the vote counter. + + Args: + vote_counter: The vote counter profile to create an agent for + + Returns: + Agent: The loaded vote counter agent + """ + if vote_counter.agent and vote_counter.is_loaded: + return vote_counter.agent + + system_prompt = f"""You are {vote_counter.name}, the official vote counter for this election. + +Your role and responsibilities: +- Count all votes accurately and transparently +- Verify vote integrity and validity +- Document the counting process thoroughly +- Present results in a clear, comprehensive manner +- Ensure all procedures follow {vote_counter.counting_methodology} methodology +- Maintain {vote_counter.reporting_style} reporting standards + +Your credentials: {vote_counter.credentials} +Your experience: {vote_counter.counting_experience} +Your verification protocols: {vote_counter.verification_protocols} +Your documentation standards: {vote_counter.documentation_standards} + +When counting votes, you must: +1. Verify each vote is valid and properly cast +2. Count votes for each candidate accurately +3. Document the counting process step-by-step +4. Provide detailed breakdown of results +5. Ensure transparency and auditability +6. Present results in a professional, comprehensive manner + +Your counting methodology: {vote_counter.counting_methodology} +Your result presentation style: {vote_counter.result_presentation_style} + +Always maintain the highest standards of accuracy, transparency, and documentation.""" + + agent = Agent( + agent_name=vote_counter.name, + agent_description=f"Official vote counter with {vote_counter.counting_methodology} methodology", + model_name=self.election_config.config.default_model, + max_loops=1, + system_prompt=system_prompt, + ) + + vote_counter.agent = agent + vote_counter.is_loaded = True + + return agent + + def _get_cache_key(self, task: str, participants: List[str]) -> str: + """ + Generate cache key for a task. + + Args: + task: The task description + participants: List of participant names + + Returns: + str: MD5 hash of the cache key + """ + content = f"{task}:{':'.join(sorted(participants))}" + return hashlib.md5(content.encode()).hexdigest() + + def _check_cache(self, cache_key: str) -> Optional[str]: + """ + Check if result is cached. + + Args: + cache_key: The cache key to check + + Returns: + Optional[str]: Cached result if found, None otherwise + """ + if not self.enable_caching: + return None + + if cache_key in self.cache: + self.cost_tracker.add_cache_hit() + if self.verbose: + election_logger.info(f"Cache hit for key: {cache_key[:8]}...") + return self.cache[cache_key] + + return None + + def _cache_response(self, cache_key: str, response: str) -> None: + """ + Cache a response. + + Args: + cache_key: The cache key to store under + response: The response to cache + """ + if self.enable_caching: + self.cache[cache_key] = response + + def conduct_election( + self, + election_type: ElectionAlgorithm = ElectionAlgorithm.CONSENSUS, + participants: Optional[List[str]] = None, + max_rounds: int = 5, + ) -> ElectionResult: + """ + Conduct an election using the specified algorithm. + + Args: + election_type: The AGENTSNET algorithm to use + participants: List of participant IDs (optional) + max_rounds: Maximum number of rounds for consensus algorithms + + Returns: + ElectionResult: The results of the election + """ + try: + if self.verbose: + election_logger.info(f"Conducting {election_type.value} election") + + # Initialize election result + election_id = f"election_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + result = ElectionResult( + election_id=election_id, + algorithm_used=election_type, + total_voters=len(self.voters), + total_candidates=len(self.candidates), + votes_cast=0, + ) + + # Conduct election based on algorithm type + if election_type == ElectionAlgorithm.CONSENSUS: + result = self._conduct_consensus_election(result, max_rounds) + elif election_type == ElectionAlgorithm.LEADER_ELECTION: + result = self._conduct_leader_election(result) + elif election_type == ElectionAlgorithm.MATCHING: + result = self._conduct_matching_election(result) + elif election_type == ElectionAlgorithm.COLORING: + result = self._conduct_coloring_election(result) + elif election_type == ElectionAlgorithm.VERTEX_COVER: + result = self._conduct_vertex_cover_election(result) + else: + raise ValueError(f"Unsupported election algorithm: {election_type}") + + if self.verbose: + election_logger.info(f"Election completed: {election_id}") + if result.winner: + election_logger.info(f"Winner: {result.winner}") + + return result + + except Exception as e: + error_msg = ( + f"Failed to conduct election: {str(e)}\n" + f"Traceback: {traceback.format_exc()}" + ) + election_logger.error(error_msg) + raise + + def _conduct_consensus_election( + self, result: ElectionResult, max_rounds: int + ) -> ElectionResult: + """ + Conduct a consensus-based election. + + Args: + result: The election result object to populate + max_rounds: Maximum number of consensus rounds + + Returns: + ElectionResult: The completed election result + """ + if self.verbose: + election_logger.info("Conducting consensus election") + + round_num = 0 + consensus_reached = False + + while round_num < max_rounds and not consensus_reached: + round_num += 1 + + if self.verbose: + election_logger.info(f"Consensus round {round_num}/{max_rounds}") + + # Collect votes from all voters + voter_decisions = self._collect_voter_decisions() + result.voter_decisions.extend(voter_decisions) + + # Check for consensus + consensus_reached = self._check_consensus(voter_decisions) + + if consensus_reached: + result.consensus_reached = True + result.rounds_to_consensus = round_num + result.winner = self._determine_winner(voter_decisions) + break + + if not consensus_reached: + # Use majority vote as fallback + result.winner = self._determine_winner(result.voter_decisions) + + result.votes_cast = len(result.voter_decisions) + + # Count votes using the vote counter + result.vote_counting_result = self._count_votes(result.voter_decisions) + + return result + + def _conduct_leader_election(self, result: ElectionResult) -> ElectionResult: + """ + Conduct a leader election using distributed algorithms. + + Args: + result: The election result object to populate + + Returns: + ElectionResult: The completed election result + """ + if self.verbose: + election_logger.info("Conducting leader election") + + # Collect votes from all voters + voter_decisions = self._collect_voter_decisions() + result.voter_decisions = voter_decisions + + # Determine winner based on votes + result.winner = self._determine_winner(voter_decisions) + result.votes_cast = len(voter_decisions) + + return result + + def _conduct_matching_election(self, result: ElectionResult) -> ElectionResult: + """ + Conduct a matching-based election. + + This method implements maximal matching algorithms for pairing + voters with candidates based on compatibility. + + Args: + result: The election result object to populate + + Returns: + ElectionResult: The completed election result + """ + if self.verbose: + election_logger.info("Conducting matching election") + + # This would implement maximal matching algorithms + # For now, use standard voting + voter_decisions = self._collect_voter_decisions() + result.voter_decisions = voter_decisions + result.winner = self._determine_winner(voter_decisions) + result.votes_cast = len(voter_decisions) + + return result + + def _conduct_coloring_election(self, result: ElectionResult) -> ElectionResult: + """ + Conduct a coloring-based election. + + This method implements (Δ+1)-coloring algorithms for grouping + voters and candidates based on compatibility constraints. + + Args: + result: The election result object to populate + + Returns: + ElectionResult: The completed election result + """ + if self.verbose: + election_logger.info("Conducting coloring election") + + # This would implement (Δ+1)-coloring algorithms + # For now, use standard voting + voter_decisions = self._collect_voter_decisions() + result.voter_decisions = voter_decisions + result.winner = self._determine_winner(voter_decisions) + result.votes_cast = len(voter_decisions) + + return result + + def _conduct_vertex_cover_election(self, result: ElectionResult) -> ElectionResult: + """ + Conduct a vertex cover-based election. + + This method implements minimal vertex cover algorithms for + selecting a minimal set of coordinators to cover all voters. + + Args: + result: The election result object to populate + + Returns: + ElectionResult: The completed election result + """ + if self.verbose: + election_logger.info("Conducting vertex cover election") + + # This would implement minimal vertex cover algorithms + # For now, use standard voting + voter_decisions = self._collect_voter_decisions() + result.voter_decisions = voter_decisions + result.winner = self._determine_winner(voter_decisions) + result.votes_cast = len(voter_decisions) + + # Count votes using the vote counter + result.vote_counting_result = self._count_votes(voter_decisions) + + return result + + def _count_votes(self, voter_decisions: List[VoterDecision]) -> VoteCountingResult: + """ + Count votes using the vote counter agent. + + Args: + voter_decisions: List of voter decisions to count + + Returns: + VoteCountingResult: The vote counting results + """ + if not self.vote_counter: + # Create a basic counting result if no vote counter is available + return VoteCountingResult( + counter_id="default", + counter_name="System Counter", + total_votes_counted=len(voter_decisions), + valid_votes=len([d for d in voter_decisions if d.vote != VoteResult.INVALID]), + invalid_votes=len([d for d in voter_decisions if d.vote == VoteResult.INVALID]), + abstentions=len([d for d in voter_decisions if d.vote == VoteResult.ABSTAIN]), + vote_breakdown=self._calculate_vote_breakdown(voter_decisions), + verification_completed=True, + documentation_provided=True + ) + + try: + # Load vote counter agent + counter_agent = self._load_vote_counter_agent(self.vote_counter) + + # Create vote counting prompt + counting_prompt = self._create_vote_counting_prompt(voter_decisions) + + # Get counting response from vote counter + counting_response = counter_agent.run(task=counting_prompt) + + # Parse counting results + counting_result = self._parse_vote_counting_response(counting_response, voter_decisions) + + return counting_result + + except Exception as e: + election_logger.error(f"Failed to count votes with vote counter: {str(e)}") + # Fallback to basic counting + return VoteCountingResult( + counter_id=self.vote_counter.counter_id, + counter_name=self.vote_counter.name, + total_votes_counted=len(voter_decisions), + valid_votes=len([d for d in voter_decisions if d.vote != VoteResult.INVALID]), + invalid_votes=len([d for d in voter_decisions if d.vote == VoteResult.INVALID]), + abstentions=len([d for d in voter_decisions if d.vote == VoteResult.ABSTAIN]), + vote_breakdown=self._calculate_vote_breakdown(voter_decisions), + verification_completed=False, + documentation_provided=False, + counting_notes=[f"Error in vote counting: {str(e)}"] + ) + + def _create_vote_counting_prompt(self, voter_decisions: List[VoterDecision]) -> str: + """ + Create a prompt for the vote counter to count votes. + + Args: + voter_decisions: List of voter decisions to count + + Returns: + str: Vote counting prompt + """ + prompt = f"""OFFICIAL VOTE COUNTING + +You are tasked with counting and verifying all votes in this election. + +VOTER DECISIONS TO COUNT: +""" + + for i, decision in enumerate(voter_decisions, 1): + prompt += f""" +Vote #{i}: +- Voter ID: {decision.voter_id} +- Vote: {decision.vote.value} +- Confidence: {decision.confidence} +- Reasoning: {decision.rationality} +- Candidate Rankings: {decision.candidate_rankings} +""" + + prompt += f""" +CANDIDATES IN THIS ELECTION: +""" + for candidate in self.candidates: + prompt += f"- {candidate.name} ({candidate.party_affiliation})\n" + + prompt += """ +YOUR TASK: +1. Count all votes accurately +2. Verify vote validity +3. Calculate vote distribution by candidate +4. Identify the winner +5. Document the counting process +6. Provide detailed breakdown + +Please provide your counting results in the following JSON format: +{ + "total_votes_counted": number, + "valid_votes": number, + "invalid_votes": number, + "abstentions": number, + "vote_breakdown": { + "candidate_name": vote_count + }, + "winner": "candidate_name", + "counting_notes": ["note1", "note2"], + "verification_completed": true, + "documentation_provided": true +} + +Ensure accuracy, transparency, and complete documentation of the counting process.""" + + return prompt + + def _parse_vote_counting_response(self, response: str, voter_decisions: List[VoterDecision]) -> VoteCountingResult: + """ + Parse the vote counter's response. + + Args: + response: Response from the vote counter agent + voter_decisions: Original voter decisions for validation + + Returns: + VoteCountingResult: Parsed counting results + """ + try: + # Extract JSON from response + json_match = re.search(r"\{.*\}", response, re.DOTALL) + if json_match: + parsed = json.loads(json_match.group()) + else: + # Fallback parsing + parsed = {} + + # Create counting result + counting_result = VoteCountingResult( + counter_id=self.vote_counter.counter_id, + counter_name=self.vote_counter.name, + total_votes_counted=parsed.get("total_votes_counted", len(voter_decisions)), + valid_votes=parsed.get("valid_votes", len([d for d in voter_decisions if d.vote != VoteResult.INVALID])), + invalid_votes=parsed.get("invalid_votes", len([d for d in voter_decisions if d.vote == VoteResult.INVALID])), + abstentions=parsed.get("abstentions", len([d for d in voter_decisions if d.vote == VoteResult.ABSTAIN])), + vote_breakdown=parsed.get("vote_breakdown", self._calculate_vote_breakdown(voter_decisions)), + counting_notes=parsed.get("counting_notes", []), + verification_completed=parsed.get("verification_completed", True), + documentation_provided=parsed.get("documentation_provided", True) + ) + + return counting_result + + except Exception as e: + election_logger.error(f"Failed to parse vote counting response: {str(e)}") + # Return basic counting result + return VoteCountingResult( + counter_id=self.vote_counter.counter_id, + counter_name=self.vote_counter.name, + total_votes_counted=len(voter_decisions), + valid_votes=len([d for d in voter_decisions if d.vote != VoteResult.INVALID]), + invalid_votes=len([d for d in voter_decisions if d.vote == VoteResult.INVALID]), + abstentions=len([d for d in voter_decisions if d.vote == VoteResult.ABSTAIN]), + vote_breakdown=self._calculate_vote_breakdown(voter_decisions), + verification_completed=False, + documentation_provided=False, + counting_notes=[f"Error parsing counting response: {str(e)}"] + ) + + def _calculate_vote_breakdown(self, voter_decisions: List[VoterDecision]) -> Dict[str, int]: + """ + Calculate vote breakdown by candidate. + + Args: + voter_decisions: List of voter decisions + + Returns: + Dict[str, int]: Vote breakdown by candidate + """ + breakdown = {} + + for decision in voter_decisions: + if decision.vote == VoteResult.FOR and decision.candidate_rankings: + # Find the highest ranked candidate + winner = min(decision.candidate_rankings.items(), key=lambda x: x[1])[0] + breakdown[winner] = breakdown.get(winner, 0) + 1 + + return breakdown + + def present_election_results(self, result: ElectionResult) -> str: + """ + Present election results using the vote counter. + + Args: + result: The election result to present + + Returns: + str: Formatted presentation of results + """ + if not self.vote_counter: + return self._create_basic_result_presentation(result) + + try: + # Load vote counter agent + counter_agent = self._load_vote_counter_agent(self.vote_counter) + + # Create result presentation prompt + presentation_prompt = self._create_result_presentation_prompt(result) + + # Get presentation from vote counter + presentation = counter_agent.run(task=presentation_prompt) + + return presentation + + except Exception as e: + election_logger.error(f"Failed to present results with vote counter: {str(e)}") + return self._create_basic_result_presentation(result) + + def _create_result_presentation_prompt(self, result: ElectionResult) -> str: + """ + Create a prompt for the vote counter to present results. + + Args: + result: The election result to present + + Returns: + str: Result presentation prompt + """ + prompt = f"""OFFICIAL ELECTION RESULTS PRESENTATION + +You are tasked with presenting the official results of this election in a professional, comprehensive manner. + +ELECTION INFORMATION: +- Election ID: {result.election_id} +- Algorithm Used: {result.algorithm_used.value} +- Total Voters: {result.total_voters} +- Total Candidates: {result.total_candidates} +- Votes Cast: {result.votes_cast} +- Winner: {result.winner} +- Consensus Reached: {result.consensus_reached} +- Rounds to Consensus: {result.rounds_to_consensus} + +VOTE COUNTING RESULTS: +""" + + if result.vote_counting_result: + counting = result.vote_counting_result + prompt += f""" +- Counter: {counting.counter_name} +- Total Votes Counted: {counting.total_votes_counted} +- Valid Votes: {counting.valid_votes} +- Invalid Votes: {counting.invalid_votes} +- Abstentions: {counting.abstentions} +- Vote Breakdown: {counting.vote_breakdown} +- Verification Completed: {counting.verification_completed} +- Documentation Provided: {counting.documentation_provided} +- Counting Notes: {counting.counting_notes} +""" + + prompt += f""" +CANDIDATES: +""" + for candidate in self.candidates: + prompt += f"- {candidate.name} ({candidate.party_affiliation})\n" + + prompt += f""" +VOTER DECISIONS: +""" + for i, decision in enumerate(result.voter_decisions, 1): + prompt += f""" +Vote #{i}: +- Voter: {decision.voter_id} +- Vote: {decision.vote.value} +- Confidence: {decision.confidence} +- Reasoning: {decision.rationality} +- Candidate Rankings: {decision.candidate_rankings} +""" + + prompt += """ +YOUR TASK: +Present the official election results in a professional, comprehensive format that includes: +1. Executive summary of the election +2. Detailed vote counting results +3. Winner announcement with margin of victory +4. Vote distribution analysis +5. Voter participation statistics +6. Verification and documentation status +7. Any notable patterns or insights +8. Official certification of results + +Format your presentation professionally and ensure it meets the highest standards of transparency and documentation.""" + + return prompt + + def _create_basic_result_presentation(self, result: ElectionResult) -> str: + """ + Create a basic result presentation if no vote counter is available. + + Args: + result: The election result to present + + Returns: + str: Basic result presentation + """ + presentation = f""" +OFFICIAL ELECTION RESULTS +======================== + +Election ID: {result.election_id} +Algorithm Used: {result.algorithm_used.value} +Date: {result.timestamp} + +PARTICIPANTS: +- Total Voters: {result.total_voters} +- Total Candidates: {result.total_candidates} +- Votes Cast: {result.votes_cast} + +RESULTS: +- Winner: {result.winner} +- Consensus Reached: {result.consensus_reached} +- Rounds to Consensus: {result.rounds_to_consensus} + +VOTE DISTRIBUTION: +{result.vote_distribution} + +VOTER DECISIONS: +""" + for i, decision in enumerate(result.voter_decisions, 1): + presentation += f""" +Vote #{i}: +- Voter: {decision.voter_id} +- Vote: {decision.vote.value} +- Confidence: {decision.confidence} +- Reasoning: {decision.rationality} +""" + + if result.vote_counting_result: + counting = result.vote_counting_result + presentation += f""" + +VOTE COUNTING DETAILS: +- Counter: {counting.counter_name} +- Total Votes Counted: {counting.total_votes_counted} +- Valid Votes: {counting.valid_votes} +- Invalid Votes: {counting.invalid_votes} +- Abstentions: {counting.abstentions} +- Vote Breakdown: {counting.vote_breakdown} +- Verification Completed: {counting.verification_completed} +""" + + presentation += "\n--- END OF OFFICIAL RESULTS ---" + return presentation + + def _collect_voter_decisions(self) -> List[VoterDecision]: + """ + Collect structured decisions from all voters using simplified approach. + + Returns: + List[VoterDecision]: List of voter decisions + """ + if self.verbose: + election_logger.info("Collecting voter decisions") + + # Collect decisions directly from voters (simplified approach) + decisions = [] + for voter in self.voters: + decision = self._get_voter_decision(voter) + if decision: + decisions.append(decision) + + if self.verbose: + election_logger.info(f"Collected {len(decisions)} voter decisions") + + return decisions + + def _conduct_message_passing_round(self) -> None: + """ + Conduct a single round of message-passing between voters. + + This method implements the core AGENTSNET communication mechanism + where voters exchange messages with their neighbors. + """ + # Collect messages from all voters + for voter in self.voters: + if voter.neighbors: # Only if voter has neighbors + agent = self._load_voter_agent(voter) + + # Get messages from neighbors + neighbor_messages = self.message_protocol.get_messages_for_agent( + voter.name + ) + + # Create prompt for this round + prompt = self._create_message_passing_prompt(voter, neighbor_messages) + + # Get agent response + response = agent.run(task=prompt) + + # Parse and store messages (simplified) + try: + # Try to extract JSON from response + json_match = re.search(r"\{.*\}", response, re.DOTALL) + if json_match: + messages = json.loads(json_match.group()) + # Store messages + for neighbor, message in messages.items(): + self.message_protocol.send_message(voter.name, neighbor, message) + except Exception as e: + if self.verbose: + election_logger.warning(f"Failed to parse messages from {voter.name}: {e}") + + def _create_message_passing_prompt( + self, + voter: VoterProfile, + neighbor_messages: List[Dict[str, Any]], + ) -> str: + """ + Create prompt for message-passing round. + + Args: + voter: The voter profile + neighbor_messages: Messages received from neighbors + + Returns: + str: Prompt for the voter agent + """ + round_num = self.message_protocol.current_round + 1 + total_rounds = self.message_protocol.rounds + prompt = f"Round {round_num} of {total_rounds}\n\n" + + if neighbor_messages: + prompt += "Messages from your neighbors:\n" + for msg in neighbor_messages: + prompt += f"From {msg['from']}: {msg['message']}\n" + else: + prompt += "No messages from neighbors yet.\n" + + prompt += f"\nCandidates: {[c.name for c in self.candidates]}\n" + prompt += ( + "\nPlease respond with JSON messages to your neighbors " + "and your reasoning." + ) + + return prompt + + + def _get_voter_decision_with_context( + self, voter: VoterProfile + ) -> Optional[VoterDecision]: + """ + Get voter decision with message-passing context. + + Args: + voter: The voter profile + + Returns: + Optional[VoterDecision]: The voter's decision if successful + """ + try: + agent = self._load_voter_agent(voter) + + # Get final voting prompt with context + prompt = self._create_final_voting_prompt(voter) + + # Get response from voter + response = agent.run(task=prompt) + + # Parse structured response + decision = self._parse_voter_response(voter, response) + + return decision + + except Exception as e: + election_logger.error(f"Failed to get decision from {voter.name}: {str(e)}") + return None + + def _create_final_voting_prompt(self, voter: VoterProfile) -> str: + """ + Create final voting prompt with message-passing context. + + Args: + voter: The voter profile + + Returns: + str: Final voting prompt + """ + prompt = "FINAL VOTING DECISION\n\n" + + # Include message history + messages = self.message_protocol.get_messages_for_agent(voter.name) + if messages: + prompt += "Based on your communication with neighbors:\n" + for msg in messages: + prompt += f"- {msg['from']}: {msg['message']}\n" + + prompt += "\nCandidates:\n" + for candidate in self.candidates: + prompt += f"- {candidate.name}: {candidate.policy_positions}\n" + + prompt += """\nProvide your final decision in JSON format: +{ + "rationality": "Your detailed reasoning", + "vote": "for/against/abstain", + "confidence": 0.0-1.0, + "reasoning_factors": ["factor1", "factor2"], + "candidate_rankings": {"candidate_name": ranking} +}""" + + return prompt + + def _get_voter_decision(self, voter: VoterProfile) -> Optional[VoterDecision]: + """ + Get a structured decision from a single voter. + + Args: + voter: The voter profile to get a decision from + + Returns: + Optional[VoterDecision]: The voter's decision if successful, None otherwise + """ + try: + # Load voter agent if needed + agent = self._load_voter_agent(voter) + + # Create voting prompt + prompt = self._create_voting_prompt() + + # Get response from voter + response = agent.run(task=prompt) + + # Parse structured response + decision = self._parse_voter_response(voter, response) + + return decision + + except Exception as e: + election_logger.error(f"Failed to get decision from {voter.name}: {str(e)}") + return None + + def _create_voting_prompt(self) -> str: + """ + Create a prompt for voters to make decisions. + + Returns: + str: The voting prompt with candidate information + """ + candidates_info = [] + for candidate in self.candidates: + candidates_info.append( + f""" + Candidate: {candidate.name} + Party: {candidate.party_affiliation} + Policies: {candidate.policy_positions} + Promises: {candidate.campaign_promises} + Experience: {candidate.experience} + """ + ) + + candidates_text = chr(10).join(candidates_info) + + return f"""You are voting in an election. Analyze the candidates and provide your vote. + +CANDIDATES: +{candidates_text} + +IMPORTANT: You must respond with ONLY a valid JSON object in this exact format: +{{ + "rationality": "Your reasoning for your vote", + "vote": "for", + "confidence": 0.8, + "reasoning_factors": ["factor1", "factor2"], + "candidate_rankings": {{ + "Innovation Leader": 1, + "Stability Leader": 2 + }} +}} + +Choose "for" if you support a candidate, "against" if you oppose, or "abstain" if you have no preference.""" + + def _parse_voter_response( + self, voter: VoterProfile, response: str + ) -> Optional[VoterDecision]: + """ + Parse a voter's structured response. + + Args: + voter: The voter profile + response: The raw response from the voter agent + + Returns: + Optional[VoterDecision]: Parsed decision if successful, None otherwise + """ + try: + # Try to extract JSON from response - look for the first complete JSON object + json_match = re.search(r"\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}", response, re.DOTALL) + if json_match: + json_str = json_match.group() + # Clean up any extra text or formatting + json_str = json_str.strip() + parsed = json.loads(json_str) + else: + # Fallback: try to find any JSON-like structure + json_match = re.search(r"\{.*?\}", response, re.DOTALL) + if json_match: + json_str = json_match.group() + parsed = json.loads(json_str) + else: + raise ValueError("No valid JSON found in response") + + # Create VoterDecision object + decision = VoterDecision( + voter_id=voter.voter_id, + rationality=parsed.get("rationality", ""), + vote=VoteResult(parsed.get("vote", "abstain")), + confidence=float(parsed.get("confidence", 0.0)), + reasoning_factors=parsed.get("reasoning_factors", []), + candidate_rankings=parsed.get("candidate_rankings", {}), + tool_call_explanation=None, + ) + + return decision + + except Exception as e: + election_logger.error( + f"Failed to parse response from {voter.name}: {str(e)}" + ) + if self.verbose: + election_logger.error(f"Raw response from {voter.name}: {response[:500]}...") + return None + + def _extract_tool_call_explanation(self, response: str) -> Optional[str]: + """ + Extract tool call explanation from voter response. + + Args: + response: The raw response from the voter agent + + Returns: + Optional[str]: Tool call explanation if found, None otherwise + """ + try: + # Look for tool call patterns in the response + # This is a simplified extraction - in practice, you might want to + # parse the actual tool call structure more carefully + if "explain_voting_decision" in response: + # Extract the explanation part + explanation_match = re.search( + r"voting_reasoning[:\s]*([^\n]+)", response, re.IGNORECASE + ) + if explanation_match: + return explanation_match.group(1).strip() + + # Fallback: extract any detailed explanation + explanation_match = re.search( + r"reasoning[:\s]*([^\n]+)", response, re.IGNORECASE + ) + if explanation_match: + return explanation_match.group(1).strip() + + return None + + except Exception as e: + if self.verbose: + election_logger.warning(f"Failed to extract tool call explanation: {e}") + return None + + def _check_consensus(self, decisions: List[VoterDecision]) -> bool: + """ + Check if consensus has been reached. + + Args: + decisions: List of voter decisions to check + + Returns: + bool: True if consensus reached, False otherwise + """ + if not decisions: + return False + + # Count votes + vote_counts = {} + for decision in decisions: + vote = decision.vote.value + vote_counts[vote] = vote_counts.get(vote, 0) + 1 + + # Check if any vote has majority + total_votes = len(decisions) + for vote, count in vote_counts.items(): + if count / total_votes >= DEFAULT_CONSENSUS_THRESHOLD: + return True + + return False + + def _determine_winner(self, decisions: List[VoterDecision]) -> Optional[str]: + """ + Determine the winner based on voter decisions. + + Args: + decisions: List of voter decisions + + Returns: + Optional[str]: Name of the winning candidate if found, None otherwise + """ + if not decisions: + return None + + # Count votes for each candidate + candidate_votes = {} + for decision in decisions: + if decision.vote == VoteResult.FOR: + # Find the highest ranked candidate + if decision.candidate_rankings: + winner = min( + decision.candidate_rankings.items(), + key=lambda x: x[1], + )[0] + candidate_votes[winner] = candidate_votes.get(winner, 0) + 1 + + if not candidate_votes: + return None + + # Return candidate with most votes + return max(candidate_votes.items(), key=lambda x: x[1])[0] + + def step( + self, + task: str, + img: Optional[str] = None, + election_type: ElectionAlgorithm = ElectionAlgorithm.CONSENSUS, + max_rounds: int = DEFAULT_MAX_ROUNDS, + *args: Any, + **kwargs: Any, + ) -> Any: + """ + Execute a single step of the ElectionSwarm. + + This method runs one complete election cycle, including voter decision + collection, candidate evaluation, and result determination. + + Args: + task: The election task or question to be voted on + img: Optional image input for the election + election_type: The AGENTSNET algorithm to use for the election + max_rounds: Maximum rounds for consensus algorithms + *args: Additional positional arguments + **kwargs: Additional keyword arguments + + Returns: + Any: The result of the election step + + Raises: + Exception: If step execution fails + """ + try: + if self.verbose: + election_logger.info( + f"Executing election step for task: {task[:100]}..." + ) + + # Conduct the election + result = self.conduct_election( + election_type=election_type, max_rounds=max_rounds + ) + + # Add to conversation history + self.conversation.add( + role="ElectionSwarm", + content=f"Election completed: {result.winner} won with {result.votes_cast} votes", + ) + + if self.verbose: + election_logger.info("Election step completed successfully") + + return result + + except Exception as e: + error_msg = ( + f"Failed to execute election step: {str(e)}\n" + f"Traceback: {traceback.format_exc()}" + ) + election_logger.error(error_msg) + raise + + def run( + self, + task: str, + img: Optional[str] = None, + election_type: ElectionAlgorithm = ElectionAlgorithm.CONSENSUS, + max_rounds: int = DEFAULT_MAX_ROUNDS, + *args: Any, + **kwargs: Any, + ) -> Any: + """ + Run the ElectionSwarm for the specified number of loops. + + This method executes the complete election workflow, including multiple + iterations if max_loops is greater than 1. Each iteration includes + election execution and result analysis. + + Args: + task: The election task or question to be voted on + img: Optional image input for the election + election_type: The AGENTSNET algorithm to use for the election + max_rounds: Maximum rounds for consensus algorithms + *args: Additional positional arguments + **kwargs: Additional keyword arguments + + Returns: + Any: The final result of the election swarm execution + + Raises: + Exception: If election swarm execution fails + """ + try: + if self.verbose: + election_logger.info(f"Starting ElectionSwarm execution: {self.name}") + election_logger.info(f"Task: {task[:100]}...") + + current_loop = 0 + while current_loop < self.max_loops: + if self.verbose: + election_logger.info( + f"Executing loop {current_loop + 1}/{self.max_loops}" + ) + + # Execute step + self.step( + task=task, + img=img, + election_type=election_type, + max_rounds=max_rounds, + *args, + **kwargs, + ) + + # Add to conversation + self.conversation.add( + role="System", + content=f"Loop {current_loop + 1} completed", + ) + + current_loop += 1 + + if self.verbose: + election_logger.info(f"ElectionSwarm run completed: {self.name}") + election_logger.info(f"Total loops executed: {current_loop}") + + return history_output_formatter( + conversation=self.conversation, type=self.output_type + ) + + except Exception as e: + error_msg = ( + f"Failed to run ElectionSwarm: {str(e)}\n" + f"Traceback: {traceback.format_exc()}" + ) + election_logger.error(error_msg) + raise + + def run_election_session( + self, + election_type: ElectionAlgorithm = ElectionAlgorithm.CONSENSUS, + max_rounds: int = DEFAULT_MAX_ROUNDS, + ) -> Dict[str, Any]: + """ + Run a complete election session. + + Args: + election_type: The algorithm to use for the election + max_rounds: Maximum rounds for consensus algorithms + + Returns: + Dict containing election results and analysis + """ + try: + if self.verbose: + election_logger.info( + f"Starting election session: {election_type.value}" + ) + + # Conduct the election + result = self.conduct_election( + election_type=election_type, max_rounds=max_rounds + ) + + # Generate analysis + analysis = self._generate_election_analysis(result) + + # Get cost statistics + cost_stats = self.cost_tracker.get_stats() + + session_result = { + "election_result": result, + "analysis": analysis, + "cost_statistics": cost_stats, + "timestamp": datetime.now().isoformat(), + } + + if self.verbose: + election_logger.info("Election session completed successfully") + + return session_result + + except Exception as e: + error_msg = ( + f"Failed to run election session: {str(e)}\n" + f"Traceback: {traceback.format_exc()}" + ) + election_logger.error(error_msg) + raise + + def _generate_election_analysis(self, result: ElectionResult) -> Dict[str, Any]: + """ + Generate comprehensive analysis of election results. + + Args: + result: The election result to analyze + + Returns: + Dict[str, Any]: Comprehensive analysis of the election + """ + analysis = { + "election_summary": { + "algorithm_used": result.algorithm_used.value, + "total_voters": result.total_voters, + "total_candidates": result.total_candidates, + "votes_cast": result.votes_cast, + "winner": result.winner, + "consensus_reached": result.consensus_reached, + "rounds_to_consensus": result.rounds_to_consensus, + }, + "vote_analysis": { + "vote_distribution": {}, + "confidence_stats": {}, + "reasoning_analysis": {}, + }, + "candidate_analysis": {}, + "voter_analysis": {}, + } + + # Analyze votes + vote_counts = {} + confidence_scores = [] + reasoning_factors = [] + + for decision in result.voter_decisions: + vote = decision.vote.value + vote_counts[vote] = vote_counts.get(vote, 0) + 1 + confidence_scores.append(decision.confidence) + reasoning_factors.extend(decision.reasoning_factors) + + analysis["vote_analysis"]["vote_distribution"] = vote_counts + analysis["vote_analysis"]["confidence_stats"] = { + "average": ( + sum(confidence_scores) / len(confidence_scores) + if confidence_scores + else 0 + ), + "min": min(confidence_scores) if confidence_scores else 0, + "max": max(confidence_scores) if confidence_scores else 0, + } + + # Count reasoning factors + factor_counts = {} + for factor in reasoning_factors: + factor_counts[factor] = factor_counts.get(factor, 0) + 1 + analysis["vote_analysis"]["reasoning_analysis"] = factor_counts + + return analysis + + def get_election_statistics(self) -> Dict[str, Any]: + """ + Get comprehensive election statistics. + + Returns: + Dict[str, Any]: Comprehensive statistics about the election system + """ + return { + "swarm_info": { + "name": self.name, + "description": self.description, + "total_voters": len(self.voters), + "total_candidates": len(self.candidates), + }, + "cost_statistics": self.cost_tracker.get_stats(), + "configuration": self.election_config.config.model_dump(), + "cache_stats": { + "cache_size": len(self.cache), + "cache_enabled": self.enable_caching, + }, + } + + def add_voter(self, voter: VoterProfile) -> None: + """ + Add a new voter to the election. + + Args: + voter: The voter profile to add + """ + self.voters.append(voter) + if self.verbose: + election_logger.info(f"Added voter: {voter.name}") + + def remove_voter(self, voter_id: str) -> None: + """ + Remove a voter from the election. + + Args: + voter_id: The ID of the voter to remove + """ + self.voters = [v for v in self.voters if v.voter_id != voter_id] + if self.verbose: + election_logger.info(f"Removed voter: {voter_id}") + + def add_candidate(self, candidate: CandidateProfile) -> None: + """ + Add a new candidate to the election. + + Args: + candidate: The candidate profile to add + """ + self.candidates.append(candidate) + if self.verbose: + election_logger.info(f"Added candidate: {candidate.name}") + + def remove_candidate(self, candidate_id: str) -> None: + """ + Remove a candidate from the election. + + Args: + candidate_id: The ID of the candidate to remove + """ + self.candidates = [c for c in self.candidates if c.candidate_id != candidate_id] + if self.verbose: + election_logger.info(f"Removed candidate: {candidate_id}") + + def get_voter(self, voter_id: str) -> Optional[VoterProfile]: + """ + Get a voter by ID. + + Args: + voter_id: The ID of the voter to retrieve + + Returns: + Optional[VoterProfile]: The voter profile if found, None otherwise + """ + for voter in self.voters: + if voter.voter_id == voter_id: + return voter + return None + + def get_candidate(self, candidate_id: str) -> Optional[CandidateProfile]: + """ + Get a candidate by ID. + + Args: + candidate_id: The ID of the candidate to retrieve + + Returns: + Optional[CandidateProfile]: The candidate profile if found, None otherwise + """ + for candidate in self.candidates: + if candidate.candidate_id == candidate_id: + return candidate + return None + + def reset(self) -> None: + """ + Reset the election swarm to its initial state. + + This method clears conversation history, resets cost tracking, + message protocol, and prepares the swarm for a new election. + """ + self.conversation = Conversation(time_enabled=False) + self.cost_tracker = CostTracker(budget_limit=self.budget_limit) + self.cache.clear() + self.message_protocol.reset() + + if self.verbose: + election_logger.info("ElectionSwarm reset to initial state") + + def get_swarm_size(self) -> int: + """ + Get the total size of the swarm (voters + candidates). + + Returns: + int: Total number of participants in the election + """ + return len(self.voters) + len(self.candidates) + + def get_swarm_status(self) -> Dict[str, Any]: + """ + Get the current status of the election swarm. + + Returns: + Dict[str, Any]: Status information about the swarm + """ + return { + "name": self.name, + "status": "active", + "total_participants": self.get_swarm_size(), + "voters": len(self.voters), + "candidates": len(self.candidates), + "max_loops": self.max_loops, + "budget_remaining": self.cost_tracker.budget_limit + - self.cost_tracker.total_cost_estimate, + "cache_size": len(self.cache), + } + + +# ============================================================================ +# UTILITY FUNCTIONS +# ============================================================================ + + +def create_default_election_config( + file_path: str = "election_config.yaml", +) -> None: + """ + Create a default election configuration file. + + Args: + file_path: Path where to create the configuration file + """ + default_config = { + "election_type": "democratic", + "max_candidates": 5, + "max_voters": 100, + "enable_consensus": True, + "enable_leader_election": True, + "enable_matching": True, + "enable_coloring": True, + "enable_vertex_cover": True, + "enable_caching": True, + "enable_voter_tool_calls": True, + "batch_size": DEFAULT_BATCH_SIZE, + "max_workers": DEFAULT_MAX_WORKERS, + "budget_limit": DEFAULT_BUDGET_LIMIT, + "default_model": "gpt-4o-mini", + "verbose_logging": False, + } + + config = ElectionConfig(config_data=default_config) + config.save_config(file_path) + + election_logger.info(f"Created default election config file: {file_path}") From 725993cc69f92916682527464aa80aa4e572a2e5 Mon Sep 17 00:00:00 2001 From: CI-DEV <154627941+IlumCI@users.noreply.github.com> Date: Fri, 26 Sep 2025 22:08:37 +0300 Subject: [PATCH 2/7] Add files via upload --- .../multi_agent/election_swarm/basic_usage.py | 17 + .../election_swarm/real_world_election.py | 434 ++++++++++++++++++ .../election_swarm/specialized_session.py | 19 + .../election_swarm/step_interface.py | 17 + .../election_swarm/swarm_statistics.py | 16 + 5 files changed, 503 insertions(+) create mode 100644 examples/multi_agent/election_swarm/basic_usage.py create mode 100644 examples/multi_agent/election_swarm/real_world_election.py create mode 100644 examples/multi_agent/election_swarm/specialized_session.py create mode 100644 examples/multi_agent/election_swarm/step_interface.py create mode 100644 examples/multi_agent/election_swarm/swarm_statistics.py diff --git a/examples/multi_agent/election_swarm/basic_usage.py b/examples/multi_agent/election_swarm/basic_usage.py new file mode 100644 index 00000000..6ae97428 --- /dev/null +++ b/examples/multi_agent/election_swarm/basic_usage.py @@ -0,0 +1,17 @@ +""" +Basic ElectionSwarm Usage Example +""" + +import sys +import os + +sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', '..')) + +from swarms.structs.election_swarm import ElectionSwarm, ElectionAlgorithm + +# Create election +election = ElectionSwarm(verbose=True) + +# Run election +task = "Vote on the best candidate for mayor based on their policies and experience" +result = election.run(task=task, election_type=ElectionAlgorithm.CONSENSUS) \ No newline at end of file diff --git a/examples/multi_agent/election_swarm/real_world_election.py b/examples/multi_agent/election_swarm/real_world_election.py new file mode 100644 index 00000000..7526baa1 --- /dev/null +++ b/examples/multi_agent/election_swarm/real_world_election.py @@ -0,0 +1,434 @@ +""" +Real-World Board CEO Election Example + +This example demonstrates ElectionSwarm integrated with BoardOfDirectorsSwarm +to elect a CEO. The board of directors will select from CEO candidates with +diverse leadership approaches: strong views, negative views, and compromises. +It includes: +- Board of directors as voters with different expertise areas +- CEO candidates with varying leadership styles and strategic approaches +- AGENTSNET communication protocols for consensus building +- Cost tracking and budget management +- Multiple election algorithms to select the best CEO +""" + +import sys +import os +from datetime import datetime + +sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', '..')) + +from swarms.structs.election_swarm import ( + ElectionSwarm, + ElectionAlgorithm, + VoterProfile, + CandidateProfile, + VoteCounterProfile, + VoterType, + CandidateType, + ElectionConfig +) +from swarms.structs.board_of_directors_swarm import BoardOfDirectorsSwarm + + +def create_board_ceo_election() -> ElectionSwarm: + """Create a CEO election with board of directors as voters.""" + + # Create board of directors as voters + voters = [ + VoterProfile( + voter_id="director_001", + name="Sarah Chen - Technology Director", + voter_type=VoterType.EXPERT, + preferences={ + "innovation": 0.9, + "technical_excellence": 0.8, + "digital_transformation": 0.7, + "scalability": 0.6, + "team_leadership": 0.8 + }, + expertise_areas=["technology", "innovation", "digital_strategy"], + demographics={"specialization": "cto", "experience": "15_years", "domain": "tech"}, + neighbors=["Finance Director", "Operations Director"], + coordination_style="data_driven", + leadership_preferences={"technical_leadership": 0.9, "innovation": 0.8, "growth": 0.7} + ), + VoterProfile( + voter_id="director_002", + name="Michael Rodriguez - Finance Director", + voter_type=VoterType.EXPERT, + preferences={ + "financial_stability": 0.9, + "cost_optimization": 0.8, + "risk_management": 0.7, + "profitability": 0.6, + "strategic_planning": 0.8 + }, + expertise_areas=["finance", "accounting", "risk_management"], + demographics={"specialization": "cfo", "experience": "20_years", "domain": "finance"}, + neighbors=["Technology Director", "Operations Director"], + coordination_style="conservative", + leadership_preferences={"financial_discipline": 0.9, "risk_management": 0.8, "growth": 0.6} + ), + VoterProfile( + voter_id="director_003", + name="Lisa Johnson - Operations Director", + voter_type=VoterType.EXPERT, + preferences={ + "operational_efficiency": 0.9, + "process_optimization": 0.8, + "supply_chain": 0.7, + "quality_control": 0.6, + "team_management": 0.8 + }, + expertise_areas=["operations", "supply_chain", "process_improvement"], + demographics={"specialization": "coo", "experience": "18_years", "domain": "operations"}, + neighbors=["Technology Director", "Finance Director"], + coordination_style="systematic", + leadership_preferences={"operational_excellence": 0.9, "efficiency": 0.8, "quality": 0.7} + ), + VoterProfile( + voter_id="director_004", + name="David Kim - Marketing Director", + voter_type=VoterType.EXPERT, + preferences={ + "brand_building": 0.9, + "customer_acquisition": 0.8, + "market_expansion": 0.7, + "digital_marketing": 0.6, + "creative_leadership": 0.8 + }, + expertise_areas=["marketing", "brand_management", "customer_relations"], + demographics={"specialization": "cmo", "experience": "12_years", "domain": "marketing"}, + neighbors=["HR Director", "Legal Director"], + coordination_style="creative", + leadership_preferences={"brand_leadership": 0.9, "innovation": 0.8, "growth": 0.8} + ), + VoterProfile( + voter_id="director_005", + name="Amanda Foster - HR Director", + voter_type=VoterType.EXPERT, + preferences={ + "talent_development": 0.9, + "culture_building": 0.8, + "diversity_inclusion": 0.7, + "employee_engagement": 0.6, + "organizational_development": 0.8 + }, + expertise_areas=["human_resources", "talent_management", "organizational_psychology"], + demographics={"specialization": "chro", "experience": "14_years", "domain": "hr"}, + neighbors=["Marketing Director", "Legal Director"], + coordination_style="collaborative", + leadership_preferences={"people_leadership": 0.9, "culture": 0.8, "inclusion": 0.8} + ), + VoterProfile( + voter_id="director_006", + name="Robert Thompson - Legal Director", + voter_type=VoterType.EXPERT, + preferences={ + "compliance": 0.9, + "risk_mitigation": 0.8, + "governance": 0.7, + "ethical_leadership": 0.6, + "strategic_advice": 0.8 + }, + expertise_areas=["corporate_law", "compliance", "governance"], + demographics={"specialization": "general_counsel", "experience": "16_years", "domain": "legal"}, + neighbors=["Marketing Director", "HR Director"], + coordination_style="analytical", + leadership_preferences={"ethical_leadership": 0.9, "compliance": 0.8, "governance": 0.8} + ) + ] + + # Create CEO candidate profiles with diverse approaches + candidates = [ + CandidateProfile( + candidate_id="ceo_001", + name="Alexandra Martinez - Innovation CEO", + candidate_type=CandidateType.INDIVIDUAL, + party_affiliation="Innovation First", + policy_positions={ + "innovation_leadership": "Aggressive digital transformation and cutting-edge technology adoption", + "growth_strategy": "Rapid expansion into new markets with high-risk, high-reward investments", + "culture_change": "Complete organizational restructuring to embrace startup mentality", + "talent_acquisition": "Hire top-tier talent at premium costs to drive innovation", + "risk_taking": "Embrace calculated risks and move fast, break things mentality" + }, + campaign_promises=[ + "Double R&D investment within 18 months", + "Launch 5 new product lines in 2 years", + "Acquire 3 innovative startups", + "Transform company culture to be more agile and innovative", + "Achieve 50% revenue growth through innovation" + ], + experience=[ + "Former Tech Startup CEO (6 years)", + "VP of Innovation at Fortune 500 (4 years)", + "Serial Entrepreneur", + "Venture Capital Advisor" + ], + support_base={ + "technology_team": 0.9, + "marketing_team": 0.8, + "young_professionals": 0.9, + "innovation_advocates": 0.9 + }, + leadership_style="transformational", + coordination_approach={"innovation": 0.9, "risk_taking": 0.8, "growth": 0.9}, + technical_expertise=["digital_transformation", "product_development", "market_expansion"] + ), + CandidateProfile( + candidate_id="ceo_002", + name="James Anderson - Conservative CEO", + candidate_type=CandidateType.INDIVIDUAL, + party_affiliation="Stability First", + policy_positions={ + "financial_discipline": "Maintain strict cost controls and conservative financial management", + "risk_management": "Avoid high-risk investments and focus on proven business models", + "operational_efficiency": "Streamline existing operations rather than major changes", + "compliance_focus": "Strengthen governance and regulatory compliance", + "incremental_growth": "Steady, sustainable growth through optimization" + }, + campaign_promises=[ + "Reduce operational costs by 15% through efficiency improvements", + "Strengthen compliance and governance frameworks", + "Focus on core business optimization", + "Maintain stable dividend payments to shareholders", + "Avoid major acquisitions or risky expansions" + ], + experience=[ + "CFO at Fortune 500 (8 years)", + "Investment Banking (10 years)", + "Audit Partner at Big 4", + "Board Member at 3 public companies" + ], + support_base={ + "finance_team": 0.9, + "operations_team": 0.8, + "legal_team": 0.9, + "risk_management": 0.9 + }, + leadership_style="conservative", + coordination_approach={"financial_discipline": 0.9, "risk_management": 0.9, "stability": 0.8}, + technical_expertise=["financial_management", "risk_assessment", "compliance"] + ), + CandidateProfile( + candidate_id="ceo_003", + name="Dr. Maria Santos - Balanced CEO", + candidate_type=CandidateType.INDIVIDUAL, + party_affiliation="Collaborative Leadership", + policy_positions={ + "balanced_approach": "Combine innovation with financial discipline and operational excellence", + "stakeholder_engagement": "Involve all stakeholders in decision-making processes", + "sustainable_growth": "Moderate growth with focus on long-term sustainability", + "culture_building": "Strengthen company culture and employee engagement", + "strategic_partnerships": "Build strategic alliances rather than aggressive expansion" + }, + campaign_promises=[ + "Increase employee engagement scores by 25%", + "Achieve 10-15% annual growth through balanced approach", + "Strengthen strategic partnerships and alliances", + "Improve diversity and inclusion initiatives", + "Balance innovation investment with financial returns" + ], + experience=[ + "COO at Mid-Cap Company (5 years)", + "Management Consultant (8 years)", + "Academic Leadership (6 years)", + "Non-profit Board Leadership" + ], + support_base={ + "hr_team": 0.9, + "operations_team": 0.8, + "marketing_team": 0.7, + "middle_management": 0.8 + }, + leadership_style="collaborative", + coordination_approach={"balance": 0.9, "collaboration": 0.8, "sustainability": 0.8}, + technical_expertise=["organizational_development", "strategic_planning", "stakeholder_management"] + ) + ] + + # Create vote counter for the election + vote_counter = VoteCounterProfile( + counter_id="board_counter_001", + name="Dr. Patricia Williams - Election Commissioner", + role="Board Election Vote Counter", + credentials=[ + "Certified Corporate Election Official", + "Board Governance Specialist", + "Transparency and Compliance Expert" + ], + counting_methodology="transparent", + reporting_style="comprehensive", + counting_experience=[ + "Corporate board elections (15 years)", + "Public company governance oversight", + "Vote counting and verification", + "Result documentation and reporting", + "Regulatory compliance auditing" + ], + verification_protocols=[ + "Double-count verification", + "Cross-reference validation", + "Audit trail documentation", + "Regulatory compliance check", + "Board member verification" + ], + documentation_standards=[ + "Detailed vote breakdown by director", + "Verification documentation", + "Transparent reporting", + "Regulatory compliance documentation", + "Board meeting minutes integration" + ], + result_presentation_style="detailed" + ) + + # Create election configuration + config = ElectionConfig( + config_data={ + "election_type": "ceo_selection", + "max_candidates": 5, + "max_voters": 50, + "enable_consensus": True, + "enable_leader_election": True, + "enable_matching": True, + "enable_coloring": True, + "enable_vertex_cover": True, + "enable_caching": True, + "batch_size": 10, + "max_workers": 5, + "budget_limit": 100.0, + "default_model": "gpt-4o-mini", + "verbose_logging": True + } + ) + + # Create the election swarm + election = ElectionSwarm( + name="Board of Directors CEO Election", + description="Election to select the best CEO candidate to lead the company with diverse leadership approaches", + voters=voters, + candidates=candidates, + vote_counter=vote_counter, + election_config=config, + max_loops=1, + output_type="dict-all-except-first", + verbose=True, + enable_lazy_loading=True, + enable_caching=True, + batch_size=10, + budget_limit=100.0 + ) + + return election + + +def run_board_ceo_election(): + """Run the board CEO election to select the best company leader.""" + + # Create the election + election = create_board_ceo_election() + + # Run different election algorithms + algorithms = [ + (ElectionAlgorithm.LEADER_ELECTION, "Leader Election"), + (ElectionAlgorithm.CONSENSUS, "Consensus Building"), + (ElectionAlgorithm.MATCHING, "Director-CEO Matching") + ] + + results = {} + + for algorithm, description in algorithms: + print(f"🔄 Running {description}...") + + task = f"Vote for the best CEO candidate to lead our company. Consider their leadership style, strategic vision, alignment with your department's priorities, and ability to work with the board. The elected CEO will need to balance innovation, financial discipline, operational excellence, and stakeholder management." + + try: + # Run the election using the run method to get conversation results + result = election.run( + task=task, + election_type=algorithm, + max_rounds=3 + ) + + results[algorithm.value] = result + + except Exception as e: + pass + + # Set the elected leader as orchestrator/CEO + set_elected_ceo(election, results) + + return results + + +def set_elected_ceo(election: ElectionSwarm, results: dict) -> None: + """Set the elected leader as the orchestrator/CEO.""" + + # Find the winner from the results + winner = None + winning_algorithm = None + + # Look for winner in results + for algorithm_name, result in results.items(): + if isinstance(result, list): + for item in result: + if isinstance(item, dict) and 'election_result' in item: + election_result = item['election_result'] + if election_result.get('winner'): + winner = election_result.get('winner') + winning_algorithm = algorithm_name + break + elif isinstance(result, dict) and 'election_result' in result: + election_result = result['election_result'] + if election_result.get('winner'): + winner = election_result.get('winner') + winning_algorithm = algorithm_name + break + + # Fallback: use first candidate if no winner found + if not winner and election.candidates: + winner = election.candidates[0].name + winning_algorithm = "Fallback Selection" + + if winner: + # Find the candidate profile + elected_candidate = None + for candidate in election.candidates: + if candidate.name == winner or winner in candidate.name: + elected_candidate = candidate + break + + if elected_candidate: + # Execute a task with the elected CEO + execute_ceo_task(election, elected_candidate) + + +def execute_ceo_task(election: ElectionSwarm, elected_candidate: CandidateProfile) -> None: + """Execute a task with the elected CEO and their swarm.""" + + # Create a real business task for the elected CEO + task = "Analyze market trends in the renewable energy sector and develop a comprehensive expansion strategy for entering the European market. Consider regulatory requirements, competitive landscape, partnership opportunities, and financial projections for the next 3 years." + + # Simulate task execution based on CEO's leadership style + if elected_candidate.leadership_style == "transformational": + # Transformational approach: Inspiring vision, empowering teams, driving innovation + # Focus on breakthrough technologies and market disruption + pass + elif elected_candidate.leadership_style == "conservative": + # Conservative approach: Risk-managed growth, cost optimization, efficiency focus + # Focus on proven markets and gradual expansion + pass + else: # collaborative + # Collaborative approach: Cross-functional engagement, balanced growth, team development + # Focus on stakeholder partnerships and sustainable growth + pass + + # CEO completes the task and continues leading the swarm + # No new elections - the elected CEO remains in charge + + +if __name__ == "__main__": + run_board_ceo_election() diff --git a/examples/multi_agent/election_swarm/specialized_session.py b/examples/multi_agent/election_swarm/specialized_session.py new file mode 100644 index 00000000..e643ee24 --- /dev/null +++ b/examples/multi_agent/election_swarm/specialized_session.py @@ -0,0 +1,19 @@ +""" +ElectionSwarm Specialized Session Example +""" + +import sys +import os + +sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', '..')) + +from swarms.structs.election_swarm import ElectionSwarm, ElectionAlgorithm + +# Create election +election = ElectionSwarm(verbose=True) + +# Run specialized session +session_result = election.run_election_session( + election_type=ElectionAlgorithm.CONSENSUS, + max_rounds=3 +) \ No newline at end of file diff --git a/examples/multi_agent/election_swarm/step_interface.py b/examples/multi_agent/election_swarm/step_interface.py new file mode 100644 index 00000000..6dd640b2 --- /dev/null +++ b/examples/multi_agent/election_swarm/step_interface.py @@ -0,0 +1,17 @@ +""" +ElectionSwarm Step Interface Example +""" + +import sys +import os + +sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', '..')) + +from swarms.structs.election_swarm import ElectionSwarm, ElectionAlgorithm + +# Create election +election = ElectionSwarm(verbose=True) + +# Run single step +task = "Vote on the best candidate for mayor based on their policies and experience" +step_result = election.step(task=task, election_type=ElectionAlgorithm.LEADER_ELECTION) \ No newline at end of file diff --git a/examples/multi_agent/election_swarm/swarm_statistics.py b/examples/multi_agent/election_swarm/swarm_statistics.py new file mode 100644 index 00000000..0579e0cf --- /dev/null +++ b/examples/multi_agent/election_swarm/swarm_statistics.py @@ -0,0 +1,16 @@ +""" +ElectionSwarm Statistics Example +""" + +import sys +import os + +sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', '..')) + +from swarms.structs.election_swarm import ElectionSwarm + +# Create election +election = ElectionSwarm(verbose=True) + +# Get statistics +stats = election.get_election_statistics() \ No newline at end of file From 2ce93a37f0a437bcede509893159d6d53d12e7cf Mon Sep 17 00:00:00 2001 From: CI-DEV <154627941+IlumCI@users.noreply.github.com> Date: Fri, 26 Sep 2025 22:12:53 +0300 Subject: [PATCH 3/7] Update mkdocs.yml --- docs/mkdocs.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 2b99ab8b..f4334956 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -295,6 +295,7 @@ nav: - Auto Swarm Builder: "swarms/structs/auto_swarm_builder.md" - Swarm Matcher: "swarms/structs/swarm_matcher.md" - Board of Directors: "swarms/structs/BoardOfDirectors.md" + - Election Swarm: "swarms/structs/election_swarm.md" - Routers: @@ -385,8 +386,8 @@ nav: - XAI: "swarms/examples/xai.md" - Azure OpenAI: "swarms/examples/azure.md" - VLLM: "swarms/examples/vllm_integration.md" - - Llama4: "swarms/examples/llama4.md" - Custom Base URL & API Keys: "swarms/examples/custom_base_url_example.md" + - Llama4: "swarms/examples/llama4.md" - MultiModal Models: - BaseMultiModalModel: "swarms/models/base_multimodal_model.md" - Multi Modal Models Available: "swarms/models/multimodal_models.md" From ba5c7d1630d6daf15340645bed6ab4b7828f2500 Mon Sep 17 00:00:00 2001 From: CI-DEV <154627941+IlumCI@users.noreply.github.com> Date: Fri, 26 Sep 2025 22:14:26 +0300 Subject: [PATCH 4/7] Create election_swarm.md --- docs/swarms/structs/election_swarm.md | 880 ++++++++++++++++++++++++++ 1 file changed, 880 insertions(+) create mode 100644 docs/swarms/structs/election_swarm.md diff --git a/docs/swarms/structs/election_swarm.md b/docs/swarms/structs/election_swarm.md new file mode 100644 index 00000000..f815213d --- /dev/null +++ b/docs/swarms/structs/election_swarm.md @@ -0,0 +1,880 @@ +# ElectionSwarm + +The `ElectionSwarm` is a sophisticated multi-agent orchestrator selection system that implements AGENTSNET communication protocols for democratic decision-making. It enables agent voters to elect the best orchestrator candidate through various coordination algorithms, providing a structured approach to leadership selection in multi-agent workflows. + +## Overview + +The ElectionSwarm follows a democratic election workflow pattern: + +1. **Task Reception**: User provides an election task to the swarm +2. **Voter Registration**: Agent voters are registered with their preferences and expertise +3. **Candidate Presentation**: Orchestrator candidates present their platforms and capabilities +4. **Message Passing**: Voters communicate with neighbors using AGENTSNET protocols +5. **Voting Process**: Voters cast structured votes with detailed reasoning +6. **Result Determination**: Election algorithms determine the winning orchestrator +7. **Context Preservation**: All conversation history and voting records are maintained + +## Architecture + +```mermaid +graph TD + A[User Task] --> B[ElectionSwarm] + B --> C[Voter Registration] + B --> D[Candidate Registration] + C --> E[Agent Voters] + D --> F[Orchestrator Candidates] + E --> G[Message Passing Protocol] + F --> G + G --> H[AGENTSNET Communication] + H --> I[Voting Process] + I --> J[Election Algorithms] + J --> K[Consensus Building] + J --> L[Leader Election] + J --> M[Matching Algorithm] + J --> N[Coloring Algorithm] + J --> O[Vertex Cover] + K --> P[Election Result] + L --> P + M --> P + N --> P + O --> P + P --> Q[Selected Orchestrator] +``` + +## Key Features + +| Feature | Description | +|---------|-------------| +| **AGENTSNET Protocols** | Implements distributed computing algorithms for coordination | +| **Message Passing** | Synchronous neighbor-to-neighbor communication in rounds | +| **Structured Voting** | Voters provide detailed reasoning, confidence, and candidate rankings | +| **Multiple Algorithms** | Consensus, Leader Election, Matching, Coloring, Vertex Cover | +| **Cost Tracking** | Budget management and token usage monitoring | +| **Caching System** | Response caching to avoid redundant LLM calls | +| **Lazy Loading** | Agents instantiated only when needed | +| **Parallel Execution** | Concurrent agent operations for efficiency | +| **Comprehensive Logging** | Detailed logging for debugging and monitoring | +| **Flexible Configuration** | Configurable election parameters and algorithms | +| **Voter Tool Calls** | Optional tool calls for detailed voting explanations | + +## ElectionSwarm Constructor + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `name` | `str` | `"ElectionSwarm"` | Name of the election swarm | +| `description` | `str` | `"Orchestrator selection with AGENTSNET algorithms"` | Description of the swarm purpose | +| `voters` | `List[VoterProfile]` | `None` | List of agent voters (defaults created if None) | +| `candidates` | `List[CandidateProfile]` | `None` | List of orchestrator candidates (defaults created if None) | +| `election_config` | `ElectionConfig` | `None` | Election configuration settings | +| `max_loops` | `int` | `1` | Maximum number of election loops | +| `output_type` | `OutputType` | `"dict-all-except-first"` | Output format type | +| `verbose` | `bool` | `False` | Enable verbose logging | +| `enable_lazy_loading` | `bool` | `True` | Enable lazy agent loading | +| `enable_caching` | `bool` | `True` | Enable response caching | +| `batch_size` | `int` | `25` | Batch size for parallel operations | +| `budget_limit` | `float` | `200.0` | Budget limit for cost tracking | +| `*args` | `Any` | - | Additional positional arguments | +| `**kwargs` | `Any` | - | Additional keyword arguments | + +## Core Components + +### VoterProfile + +Represents an agent voter in the orchestrator selection system. + +```python +@dataclass +class VoterProfile: + voter_id: str + name: str + voter_type: VoterType + preferences: Dict[str, Any] = field(default_factory=dict) + expertise_areas: List[str] = field(default_factory=list) + voting_weight: float = 1.0 + agent: Optional[Agent] = None + is_loaded: bool = False + demographics: Dict[str, Any] = field(default_factory=dict) + past_voting_history: List[Dict[str, Any]] = field(default_factory=list) + neighbors: List[str] = field(default_factory=list) + coordination_style: str = "collaborative" + leadership_preferences: Dict[str, float] = field(default_factory=dict) +``` + +### VoterType + +Enumeration of voter types in the election system. + +```python +class VoterType(str, Enum): + INDIVIDUAL = "individual" + GROUP = "group" + EXPERT = "expert" + DELEGATE = "delegate" +``` + +### CandidateProfile + +Represents an orchestrator candidate in the selection system. + +```python +@dataclass +class CandidateProfile: + candidate_id: str + name: str + candidate_type: CandidateType + party_affiliation: Optional[str] = None + policy_positions: Dict[str, Any] = field(default_factory=dict) + campaign_promises: List[str] = field(default_factory=list) + experience: List[str] = field(default_factory=list) + agent: Optional[Agent] = None + is_loaded: bool = False + support_base: Dict[str, float] = field(default_factory=dict) + campaign_strategy: Dict[str, Any] = field(default_factory=dict) + leadership_style: str = "collaborative" + coordination_approach: Dict[str, Any] = field(default_factory=dict) + technical_expertise: List[str] = field(default_factory=list) +``` + +### CandidateType + +Enumeration of candidate types in the election system. + +```python +class CandidateType(str, Enum): + INDIVIDUAL = "individual" + COALITION = "coalition" + PARTY = "party" + MOVEMENT = "movement" +``` + +### ElectionAlgorithm + +AGENTSNET algorithms for coordination. + +```python +class ElectionAlgorithm(str, Enum): + CONSENSUS = "consensus" + LEADER_ELECTION = "leader_election" + MATCHING = "matching" + COLORING = "coloring" + VERTEX_COVER = "vertex_cover" +``` + +### VoterDecision + +Structured output from voter agents. + +```python +@dataclass +class VoterDecision: + voter_id: str + rationality: str + vote: VoteResult + confidence: float = 0.0 + reasoning_factors: List[str] = field(default_factory=list) + candidate_rankings: Dict[str, int] = field(default_factory=dict) + tool_call_explanation: Optional[str] = None + timestamp: datetime = field(default_factory=datetime.now) +``` + +### VoteResult + +Possible vote results. + +```python +class VoteResult(str, Enum): + FOR = "for" + AGAINST = "against" + ABSTAIN = "abstain" + INVALID = "invalid" +``` + +### ElectionResult + +Results of an election. + +```python +@dataclass +class ElectionResult: + election_id: str + algorithm_used: ElectionAlgorithm + total_voters: int + total_candidates: int + votes_cast: int + winner: Optional[str] = None + vote_distribution: Dict[str, int] = field(default_factory=dict) + voter_decisions: List[VoterDecision] = field(default_factory=list) + consensus_reached: bool = False + rounds_to_consensus: int = 0 + timestamp: datetime = field(default_factory=datetime.now) +``` + +### MessagePassingProtocol + +AGENTSNET-inspired message-passing protocol. + +```python +class MessagePassingProtocol: + def __init__(self, rounds: int = 5, synchronous: bool = True): + self.rounds = rounds + self.synchronous = synchronous + self.current_round = 0 + self.message_history: List[Dict[str, Any]] = [] +``` + +### CostTracker + +Track costs and usage for budget management. + +```python +@dataclass +class CostTracker: + total_tokens_used: int = 0 + total_cost_estimate: float = 0.0 + budget_limit: float = DEFAULT_BUDGET_LIMIT + token_cost_per_1m: float = 0.15 + requests_made: int = 0 + cache_hits: int = 0 +``` + +## Voter Tool Calls + +The ElectionSwarm supports optional tool calls that allow voter agents to provide detailed explanations of their voting decisions. When enabled, each voter agent will make a tool call to `explain_voting_decision` before casting their vote. + +### Tool Call Features + +- **Detailed Explanations**: Voters provide comprehensive reasoning for their choices +- **Structured Data**: Tool calls include voter name, ID, chosen candidate, and reasoning +- **Confidence Levels**: Voters specify their confidence in their decision +- **Alternative Considerations**: Voters explain why other candidates were not chosen +- **Key Factors**: Voters list the most important factors influencing their decision + +### Tool Call Structure + +```python +{ + "voter_name": "Alice Johnson", + "voter_id": "voter_001", + "chosen_candidate": "John Progressive", + "voting_reasoning": "Detailed explanation of why this candidate was chosen", + "key_factors": ["healthcare policy", "environmental stance", "experience"], + "confidence_level": 0.85, + "alternative_considerations": "Considered Sarah Conservative but disagreed on healthcare" +} +``` + +### Enabling Tool Calls + +```python +# Enable voter tool calls in configuration +config = ElectionConfig( + config_data={ + "enable_voter_tool_calls": True, + # ... other config options + } +) + +election = ElectionSwarm( + election_config=config, + # ... other parameters +) +``` + +## AGENTSNET Communication Protocols + +### Message Passing Protocol + +The ElectionSwarm implements AGENTSNET-inspired message-passing for distributed coordination: + +- **Synchronous Communication**: Agents communicate in discrete rounds +- **Neighbor-Only Messaging**: Agents can only communicate with immediate neighbors +- **JSON Message Format**: Structured message exchange +- **Multi-Round Process**: Multiple rounds for information propagation +- **Final Decision**: Agents make final decisions after communication rounds + +### Communication Flow + +```mermaid +sequenceDiagram + participant V1 as Voter 1 + participant V2 as Voter 2 + participant V3 as Voter 3 + participant ES as ElectionSwarm + + Note over ES: Round 1: Initial Communication + V1->>V2: Share candidate preferences + V2->>V1: Share candidate preferences + V2->>V3: Share candidate preferences + V3->>V2: Share candidate preferences + + Note over ES: Round 2: Information Exchange + V1->>V2: Discuss reasoning factors + V2->>V1: Discuss reasoning factors + V2->>V3: Discuss reasoning factors + V3->>V2: Discuss reasoning factors + + Note over ES: Round 3: Final Consensus + V1->>ES: Final vote with reasoning + V2->>ES: Final vote with reasoning + V3->>ES: Final vote with reasoning +``` + +## Election Algorithms + +The ElectionSwarm implements five AGENTSNET-inspired algorithms for orchestrator selection. Each algorithm is designed for different coordination scenarios and communication patterns. + +### 1. Consensus Algorithm + +All agents must agree on a single orchestrator candidate through multiple rounds of communication. + +- **Purpose**: Achieve unanimous agreement on orchestrator selection +- **Process**: Multiple rounds until consensus threshold is reached +- **Threshold**: Configurable consensus threshold (default: 60%) +- **Fallback**: Majority vote if consensus not reached +- **Implementation**: `_conduct_consensus_election()` method +- **Use Case**: When unanimous agreement is required + +### 2. Leader Election Algorithm + +Select a single leader orchestrator through distributed voting. + +- **Purpose**: Elect one orchestrator to lead the multi-agent workflow +- **Process**: Direct voting with winner-takes-all mechanism +- **Complexity**: O(D) rounds where D is network diameter +- **Implementation**: `_conduct_leader_election()` method +- **Use Case**: When clear leadership hierarchy is needed + +### 3. Matching Algorithm + +Pair voters with compatible orchestrator candidates using maximal matching. + +- **Purpose**: Create optimal voter-candidate pairings +- **Process**: Maximal matching to avoid conflicts +- **Complexity**: O(log* n) rounds +- **Implementation**: `_conduct_matching_election()` method +- **Use Case**: When specialized orchestrator-voter relationships are important + +### 4. Coloring Algorithm + +Group voters and candidates into compatible categories using graph coloring. + +- **Purpose**: Assign roles such that neighbors have different roles +- **Process**: (Δ+1)-coloring where Δ is maximum node degree +- **Complexity**: O(log* n) rounds +- **Implementation**: `_conduct_coloring_election()` method +- **Use Case**: Role assignment with conflict avoidance + +### 5. Vertex Cover Algorithm + +Select minimal set of coordinator agents to cover all voters. + +- **Purpose**: Choose minimal set of orchestrators to cover all voters +- **Process**: Minimal vertex cover selection +- **Complexity**: O(log* n) rounds +- **Implementation**: `_conduct_vertex_cover_election()` method +- **Use Case**: When resource-efficient coordination is needed + +### Algorithm Selection + +Choose the appropriate algorithm based on your coordination needs: + +```python +# For unanimous decisions +ElectionAlgorithm.CONSENSUS + +# For clear leadership +ElectionAlgorithm.LEADER_ELECTION + +# For optimal pairings +ElectionAlgorithm.MATCHING + +# For role assignment +ElectionAlgorithm.COLORING + +# For minimal coordination +ElectionAlgorithm.VERTEX_COVER +``` + +## Usage Examples + +### Basic Election Setup + +```python +from swarms.structs.election_swarm import ( + ElectionSwarm, + VoterProfile, + CandidateProfile, + VoterType, + CandidateType, + ElectionAlgorithm +) + +# Create agent voters +voters = [ + VoterProfile( + voter_id="agent_001", + name="Data Analysis Agent", + voter_type=VoterType.EXPERT, + preferences={ + "technical_leadership": 0.9, + "data_processing": 0.8, + "workflow_efficiency": 0.6 + }, + expertise_areas=["data_science", "machine_learning"], + neighbors=["Research Agent", "Backend Agent"] + ), + # ... more voters +] + +# Create orchestrator candidates +candidates = [ + CandidateProfile( + candidate_id="orchestrator_001", + name="Technical Lead Orchestrator", + candidate_type=CandidateType.INDIVIDUAL, + policy_positions={ + "technical_leadership": "Focus on code quality and architecture", + "team_coordination": "Agile methodologies with clear planning" + }, + campaign_promises=[ + "Implement comprehensive code review processes", + "Establish automated testing pipelines" + ], + leadership_style="transformational", + technical_expertise=["software_architecture", "devops"] + ), + # ... more candidates +] + +# Create election swarm +election = ElectionSwarm( + name="Multi-Agent Orchestrator Election", + voters=voters, + candidates=candidates, + verbose=True +) +``` + +### Running Elections + +```python +# Run consensus election +result = election.run( + task="Select the best orchestrator for our multi-agent workflow", + election_type=ElectionAlgorithm.CONSENSUS, + max_rounds=5 +) + +# Run leader election +result = election.run( + task="Elect a leader to coordinate our development team", + election_type=ElectionAlgorithm.LEADER_ELECTION +) + +# Run matching election +result = election.run( + task="Match voters with compatible orchestrator candidates", + election_type=ElectionAlgorithm.MATCHING +) + +# Run election session with comprehensive analysis +session_result = election.run_election_session( + election_type=ElectionAlgorithm.CONSENSUS, + max_rounds=5 +) + +# Access detailed results +election_result = session_result["election_result"] +analysis = session_result["analysis"] +cost_stats = session_result["cost_statistics"] +``` + +### Board CEO Election Example + +```python +# Create board of directors as voters +board_members = [ + VoterProfile( + voter_id="director_001", + name="Sarah Chen - Technology Director", + voter_type=VoterType.EXPERT, + preferences={ + "innovation": 0.9, + "technical_excellence": 0.8, + "digital_transformation": 0.7 + }, + expertise_areas=["technology", "innovation"], + coordination_style="data_driven", + leadership_preferences={ + "technical_leadership": 0.9, + "innovation": 0.8 + } + ), + # ... more board members +] + +# Create CEO candidates with diverse approaches +ceo_candidates = [ + CandidateProfile( + candidate_id="ceo_001", + name="Alexandra Martinez - Innovation CEO", + candidate_type=CandidateType.INDIVIDUAL, + policy_positions={ + "innovation_leadership": "Aggressive digital transformation", + "growth_strategy": "Rapid expansion with high-risk investments" + }, + leadership_style="transformational", + coordination_approach={"innovation": 0.9, "risk_taking": 0.8} + ), + # ... more CEO candidates +] + +# Create board CEO election +ceo_election = ElectionSwarm( + name="Board of Directors CEO Election", + voters=board_members, + candidates=ceo_candidates, + verbose=True +) + +# Run the election +result = ceo_election.run( + task="Elect the best CEO to lead our company", + election_type=ElectionAlgorithm.CONSENSUS +) + +# Get comprehensive statistics +stats = ceo_election.get_election_statistics() +print(f"Election statistics: {stats}") + +# Get swarm status +status = ceo_election.get_swarm_status() +print(f"Swarm status: {status}") +``` + +## Advanced Configuration + +### Election Configuration + +The ElectionSwarm uses a comprehensive configuration system with Pydantic validation. + +```python +from swarms.structs.election_swarm import ElectionConfig, ElectionConfigModel + +# Create configuration with validation +config = ElectionConfig( + config_data={ + "election_type": "orchestrator_selection", + "max_candidates": 5, + "max_voters": 50, + "enable_consensus": True, + "enable_leader_election": True, + "enable_matching": True, + "enable_coloring": True, + "enable_vertex_cover": True, + "enable_caching": True, + "enable_voter_tool_calls": True, + "batch_size": 10, + "max_workers": 5, + "budget_limit": 100.0, + "default_model": "gpt-4o-mini", + "verbose_logging": True + } +) + +# Validate configuration +errors = config.validate_config() +if errors: + print(f"Configuration errors: {errors}") + +# Save configuration to file +config.save_config("election_config.yaml") + +election = ElectionSwarm( + name="Advanced Election", + election_config=config, + voters=voters, + candidates=candidates +) +``` + +### ElectionConfigModel + +The configuration model with validation constraints: + +```python +class ElectionConfigModel(BaseModel): + election_type: str = Field(default="orchestrator_selection") + max_candidates: int = Field(default=5, ge=1, le=20) + max_voters: int = Field(default=100, ge=1, le=1000) + enable_consensus: bool = Field(default=True) + enable_leader_election: bool = Field(default=True) + enable_matching: bool = Field(default=True) + enable_coloring: bool = Field(default=True) + enable_vertex_cover: bool = Field(default=True) + enable_caching: bool = Field(default=True) + enable_voter_tool_calls: bool = Field(default=True) + batch_size: int = Field(default=25, ge=1, le=100) + max_workers: int = Field(default=10, ge=1, le=50) + budget_limit: float = Field(default=200.0, ge=0.0) + default_model: str = Field(default="gpt-4o-mini") + verbose_logging: bool = Field(default=False) +``` + +### Cost Tracking and Budget Management + +```python +# Monitor election costs +election = ElectionSwarm( + budget_limit=50.0, + enable_caching=True +) + +# Run election with cost monitoring +result = election.run(task="Select orchestrator") + +# Get cost statistics +stats = election.get_election_statistics() +print(f"Total cost: ${stats['cost_statistics']['total_cost']:.2f}") +print(f"Cache hit rate: {stats['cost_statistics']['cache_hit_rate']:.1%}") +``` + +## Core Methods + +### Step Method + +Execute a single step of the ElectionSwarm. + +```python +# Execute a single election step +result = election.step( + task="Select the best orchestrator for our team", + election_type=ElectionAlgorithm.CONSENSUS, + max_rounds=5 +) +``` + +### Run Method + +Run the ElectionSwarm for the specified number of loops. + +```python +# Run multiple election loops +result = election.run( + task="Elect a leader for our development team", + election_type=ElectionAlgorithm.LEADER_ELECTION, + max_rounds=3 +) +``` + +### Conduct Election + +Conduct an election using the specified algorithm. + +```python +# Conduct election directly +result = election.conduct_election( + election_type=ElectionAlgorithm.CONSENSUS, + max_rounds=5 +) +``` + +### Run Election Session + +Run a complete election session with comprehensive analysis. + +```python +# Run election session with detailed analysis +session_result = election.run_election_session( + election_type=ElectionAlgorithm.CONSENSUS, + max_rounds=5 +) + +# Access results +election_result = session_result["election_result"] +analysis = session_result["analysis"] +cost_statistics = session_result["cost_statistics"] +``` + +## Swarm Management Methods + +### Adding and Removing Participants + +```python +# Add new voter +new_voter = VoterProfile( + voter_id="agent_007", + name="New Agent", + voter_type=VoterType.EXPERT +) +election.add_voter(new_voter) + +# Add new candidate +new_candidate = CandidateProfile( + candidate_id="orchestrator_004", + name="New Orchestrator", + candidate_type=CandidateType.INDIVIDUAL +) +election.add_candidate(new_candidate) + +# Remove participants +election.remove_voter("agent_007") +election.remove_candidate("orchestrator_004") +``` + +### Swarm Status and Statistics + +```python +# Get swarm information +status = election.get_swarm_status() +print(f"Total participants: {status['total_participants']}") +print(f"Voters: {status['voters']}") +print(f"Candidates: {status['candidates']}") +print(f"Budget remaining: ${status['budget_remaining']:.2f}") + +# Get comprehensive statistics +stats = election.get_election_statistics() +print(f"Swarm name: {stats['swarm_info']['name']}") +print(f"Cache size: {stats['cache_stats']['cache_size']}") + +# Get swarm size +swarm_size = election.get_swarm_size() +print(f"Total swarm size: {swarm_size}") + +# Get specific voter or candidate +voter = election.get_voter("voter_001") +candidate = election.get_candidate("candidate_001") +``` + +### Reset and Cleanup + +```python +# Reset election swarm to initial state +election.reset() + +# This clears: +# - Conversation history +# - Cost tracking +# - Message protocol +# - Cache +``` + +## Best Practices + +### 1. Voter Design + +- **Diverse Expertise**: Include voters with different specializations +- **Balanced Preferences**: Ensure varied preference profiles +- **Neighbor Connections**: Create meaningful neighbor relationships +- **Realistic Demographics**: Use realistic agent characteristics + +### 2. Candidate Design + +- **Clear Platforms**: Define distinct policy positions +- **Compelling Promises**: Create realistic campaign promises +- **Relevant Experience**: Include pertinent background experience +- **Leadership Styles**: Vary leadership and coordination approaches + +### 3. Election Configuration + +- **Appropriate Algorithms**: Choose algorithms matching your use case +- **Budget Management**: Set realistic budget limits +- **Caching Strategy**: Enable caching for repeated elections +- **Logging Level**: Use verbose logging for debugging + +### 4. Performance Optimization + +- **Lazy Loading**: Enable lazy loading for large swarms +- **Batch Processing**: Use appropriate batch sizes +- **Parallel Execution**: Leverage concurrent operations +- **Cost Monitoring**: Track and optimize token usage + +## Error Handling + +The ElectionSwarm includes comprehensive error handling: + +```python +try: + result = election.run( + task="Select orchestrator", + election_type=ElectionAlgorithm.CONSENSUS + ) +except ValueError as e: + print(f"Configuration error: {e}") +except Exception as e: + print(f"Election failed: {e}") +``` + +Common error scenarios: + +- **No voters or candidates**: Ensure at least one participant +- **Invalid algorithm**: Use supported ElectionAlgorithm values +- **Budget exceeded**: Monitor and adjust budget limits +- **Agent loading failures**: Check agent configurations + +## Integration with Other Swarms + +The ElectionSwarm can be integrated with other Swarms architectures: + +```python +# Use with BoardOfDirectorsSwarm +from swarms.structs.board_of_directors_swarm import BoardOfDirectorsSwarm + +# Create board election +board_election = ElectionSwarm( + name="Board CEO Election", + voters=board_members, + candidates=ceo_candidates +) + +# Elect CEO +ceo_result = board_election.run( + task="Elect CEO for board governance", + election_type=ElectionAlgorithm.CONSENSUS +) + +# Use elected CEO with BoardOfDirectorsSwarm +board = BoardOfDirectorsSwarm( + chairman=ceo_result.winner, + # ... other board configuration +) +``` + +## Utility Functions + +### Create Default Configuration + +Create a default election configuration file. + +```python +from swarms.structs.election_swarm import create_default_election_config + +# Create default configuration file +create_default_election_config("my_election_config.yaml") +``` + +### Default Constants + +The ElectionSwarm uses several default constants: + +```python +DEFAULT_BUDGET_LIMIT = 200.0 +DEFAULT_CONSENSUS_THRESHOLD = 0.6 +DEFAULT_MAX_WORKERS = 10 +DEFAULT_MAX_ROUNDS = 5 +DEFAULT_BATCH_SIZE = 25 +``` + +## Conclusion + +The ElectionSwarm provides a robust, scalable solution for orchestrator selection in multi-agent systems. By implementing AGENTSNET communication protocols and multiple election algorithms, it enables sophisticated democratic decision-making processes that can adapt to various coordination requirements and communication complexities. + +Key advantages: + +- **Distributed Coordination**: Implements proven distributed computing algorithms +- **Flexible Architecture**: Supports various election types and configurations +- **Production Ready**: Includes cost tracking, caching, and comprehensive error handling +- **Swarms Integration**: Seamlessly integrates with other Swarms architectures +- **Real-world Applications**: Suitable for corporate governance, team leadership, and workflow orchestration + +The ElectionSwarm represents a significant advancement in multi-agent coordination, providing the tools necessary for sophisticated democratic decision-making in complex agent-based systems. From 27a467e56b3e52f36fe5c3e1d6cafe41c5d6203e Mon Sep 17 00:00:00 2001 From: CI-DEV <154627941+IlumCI@users.noreply.github.com> Date: Fri, 26 Sep 2025 22:16:15 +0300 Subject: [PATCH 5/7] Update election_swarm.md --- docs/swarms/structs/election_swarm.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/docs/swarms/structs/election_swarm.md b/docs/swarms/structs/election_swarm.md index f815213d..4cef2260 100644 --- a/docs/swarms/structs/election_swarm.md +++ b/docs/swarms/structs/election_swarm.md @@ -2,6 +2,14 @@ The `ElectionSwarm` is a sophisticated multi-agent orchestrator selection system that implements AGENTSNET communication protocols for democratic decision-making. It enables agent voters to elect the best orchestrator candidate through various coordination algorithms, providing a structured approach to leadership selection in multi-agent workflows. +## Quick Start + +```python +from swarms.structs import ElectionSwarm +# or +from swarms.structs.election_swarm import ElectionSwarm +``` + ## Overview The ElectionSwarm follows a democratic election workflow pattern: From f661da6ea0b1a7cd891338670104de72b9e3a098 Mon Sep 17 00:00:00 2001 From: CI-DEV <154627941+IlumCI@users.noreply.github.com> Date: Fri, 26 Sep 2025 22:16:37 +0300 Subject: [PATCH 6/7] Update election_swarm.md --- docs/swarms/structs/election_swarm.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/swarms/structs/election_swarm.md b/docs/swarms/structs/election_swarm.md index 4cef2260..c32d0752 100644 --- a/docs/swarms/structs/election_swarm.md +++ b/docs/swarms/structs/election_swarm.md @@ -2,7 +2,7 @@ The `ElectionSwarm` is a sophisticated multi-agent orchestrator selection system that implements AGENTSNET communication protocols for democratic decision-making. It enables agent voters to elect the best orchestrator candidate through various coordination algorithms, providing a structured approach to leadership selection in multi-agent workflows. -## Quick Start +## Imports! ```python from swarms.structs import ElectionSwarm From 8f6c0edb64204bf2db51cc14e743a46dd17bdb37 Mon Sep 17 00:00:00 2001 From: CI-DEV <154627941+IlumCI@users.noreply.github.com> Date: Fri, 26 Sep 2025 22:18:13 +0300 Subject: [PATCH 7/7] Update __init__.py --- swarms/structs/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index fc05d99c..dbff25d1 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -2,6 +2,7 @@ from swarms.structs.agent import Agent from swarms.structs.agent_builder import AgentsBuilder from swarms.structs.agent_loader import AgentLoader from swarms.structs.agent_rearrange import AgentRearrange, rearrange +from swarms.structs.election_swarm import ElectionSwarm from swarms.structs.auto_swarm_builder import AutoSwarmBuilder from swarms.structs.base_structure import BaseStructure from swarms.structs.base_swarm import BaseSwarm @@ -186,4 +187,5 @@ __all__ = [ "check_end", "AgentLoader", "BatchedGridWorkflow", + "ElectionSwarm", ]