diff --git a/examples/simulations/euroswarm_parliament/README.md b/examples/simulations/euroswarm_parliament/README.md new file mode 100644 index 00000000..9b7ed09e --- /dev/null +++ b/examples/simulations/euroswarm_parliament/README.md @@ -0,0 +1,370 @@ +# EuroSwarm Parliament - European Parliament Simulation + +A comprehensive simulation of the European Parliament with 717 MEPs (Members of European Parliament) based on real EU data, featuring full democratic functionality including bill introduction, committee work, parliamentary debates, and democratic voting mechanisms. + +## Overview + +The EuroSwarm Parliament transforms the basic senator simulation into a full-fledged European Parliament with democratic capabilities. Unlike the original senator simulation that only allowed simple "Aye/Nay" voting, this system provides: + +- **Democratic Discussion**: Full parliamentary debates with diverse perspectives +- **Committee Work**: Specialized committee hearings and analysis +- **Bill Processing**: Complete legislative workflow from introduction to final vote +- **Political Group Coordination**: Realistic political group dynamics +- **Real MEP Data**: Based on actual EU.xml data with 700 real MEPs +- **Board of Directors Pattern**: Advanced democratic decision-making using the Board of Directors swarm + +## Key Features + +### Democratic Functionality +- **Bill Introduction**: MEPs can introduce bills with sponsors and co-sponsors +- **Committee Hearings**: Specialized committee analysis and recommendations +- **Parliamentary Debates**: Multi-perspective discussions with diverse participants +- **Democratic Voting**: Comprehensive voting with individual reasoning and political group analysis +- **Amendment Process**: Support for bill amendments and modifications + +### Realistic Parliament Structure +- **717 MEPs**: Based on real EU.xml data with actual MEP names and affiliations +- **Political Groups**: All major European political groups represented +- **Committee System**: 16 specialized committees with chairs and members +- **Leadership Positions**: President, Vice Presidents, Committee Chairs +- **Country Representation**: All EU member states represented + +### Advanced AI Agents +- **Individual MEP Agents**: Each MEP has a unique AI agent with: + - Political group alignment + - National party affiliation + - Committee memberships + - Areas of expertise + - Country-specific interests +- **Democratic Decision-Making**: Board of Directors pattern for consensus building +- **Contextual Responses**: MEPs respond based on their political positions and expertise + +## Architecture + +### Core Components + +#### 1. ParliamentaryMember +Represents individual MEPs with: +- Personal information (name, country, political group) +- Parliamentary role and committee memberships +- Areas of expertise and voting weight +- AI agent for decision-making + +#### 2. ParliamentaryBill +Represents legislative proposals with: +- Title, description, and legislative procedure type +- Committee assignment and sponsorship +- Status tracking and amendment support + +#### 3. ParliamentaryCommittee +Represents parliamentary committees with: +- Chair and vice-chair positions +- Member lists and responsibilities +- Current bills under consideration + +#### 4. ParliamentaryVote +Represents voting sessions with: +- Individual MEP votes and reasoning +- Political group analysis +- Final results and statistics + +### Democratic Decision-Making + +The system uses the Board of Directors pattern for democratic decision-making: + +1. **Political Group Leaders**: Each political group has a representative on the democratic council +2. **Weighted Voting**: Voting weights based on group size +3. **Consensus Building**: Multi-round discussions to reach consensus +4. **Individual Voting**: MEPs vote individually after considering the democratic council's analysis + +## Political Groups + +The simulation includes all major European political groups: + +- **Group of the European People's Party (Christian Democrats)** - EPP +- **Group of the Progressive Alliance of Socialists and Democrats** - S&D +- **Renew Europe Group** - RE +- **Group of the Greens/European Free Alliance** - Greens/EFA +- **European Conservatives and Reformists Group** - ECR +- **The Left group in the European Parliament** - GUE/NGL +- **Patriots for Europe Group** - Patriots +- **Europe of Sovereign Nations Group** - ESN +- **Non-attached Members** - NI + +## Committees + +16 specialized committees covering all major policy areas: + +1. **Agriculture and Rural Development** +2. **Budgetary Control** +3. **Civil Liberties, Justice and Home Affairs** +4. **Development** +5. **Economic and Monetary Affairs** +6. **Employment and Social Affairs** +7. **Environment, Public Health and Food Safety** +8. **Foreign Affairs** +9. **Industry, Research and Energy** +10. **Internal Market and Consumer Protection** +11. **International Trade** +12. **Legal Affairs** +13. **Petitions** +14. **Regional Development** +15. **Security and Defence** +16. **Transport and Tourism** + +## Usage + +### Basic Initialization + +```python +from euroswarm_parliament import EuroSwarmParliament, VoteType + +# Initialize parliament +parliament = EuroSwarmParliament( + eu_data_file="EU.xml", + parliament_size=None, # Use all MEPs from EU.xml (718) + enable_democratic_discussion=True, + enable_committee_work=True, + enable_amendment_process=True, + verbose=False +) +``` + +### Bill Introduction and Processing + +```python +# Introduce a bill +bill = parliament.introduce_bill( + title="European Climate Law", + description="Framework for achieving climate neutrality by 2050", + bill_type=VoteType.ORDINARY_LEGISLATIVE_PROCEDURE, + committee="Environment, Public Health and Food Safety", + sponsor="Philippe Lamberts" +) + +# Conduct committee hearing +hearing = parliament.conduct_committee_hearing( + committee=bill.committee, + bill=bill +) + +# Conduct parliamentary debate +debate = parliament.conduct_parliamentary_debate( + bill=bill, + max_speakers=20 +) + +# Conduct democratic vote +vote = parliament.conduct_democratic_vote(bill) +``` + +### Complete Democratic Session + +```python +# Run a complete parliamentary session +session = parliament.run_democratic_session( + bill_title="Artificial Intelligence Act", + bill_description="Comprehensive regulation of AI systems in the EU", + bill_type=VoteType.ORDINARY_LEGISLATIVE_PROCEDURE, + committee="Internal Market and Consumer Protection" +) + +print(f"Final Outcome: {session['session_summary']['final_outcome']}") +``` + +### Individual MEP Interaction + +```python +# Get specific MEP +mep = parliament.get_mep("Valérie Hayer") + +# Ask for position on policy +response = mep.agent.run("What is your position on digital privacy regulation?") +print(f"{mep.full_name}: {response}") +``` + +### Political Analysis + +```python +# Get parliament composition +composition = parliament.get_parliament_composition() + +# Analyze political groups +for group_name, stats in composition['political_groups'].items(): + print(f"{group_name}: {stats['count']} MEPs ({stats['percentage']:.1f}%)") + +# Get country representation +country_members = parliament.get_country_members("Germany") +print(f"German MEPs: {len(country_members)}") +``` + +## Democratic Features + +### 1. Democratic Discussion +- **Multi-Perspective Debates**: MEPs from different political groups and countries +- **Expertise-Based Input**: MEPs contribute based on their areas of expertise +- **Constructive Dialogue**: Respectful debate with evidence-based arguments + +### 2. Committee Work +- **Specialized Analysis**: Committees provide detailed technical analysis +- **Expert Recommendations**: Committee members offer specialized insights +- **Stakeholder Consideration**: Multiple perspectives on policy impacts + +### 3. Democratic Voting +- **Individual Reasoning**: Each MEP provides reasoning for their vote +- **Political Group Analysis**: Voting patterns by political affiliation +- **Transparent Process**: Full visibility into decision-making process + +### 4. Consensus Building +- **Board of Directors Pattern**: Advanced democratic decision-making +- **Weighted Representation**: Political groups weighted by size +- **Multi-Round Discussion**: Iterative process to reach consensus + +## 🔧 Configuration + +### Parliament Settings + +```python +parliament = EuroSwarmParliament( + eu_data_file="EU.xml", # Path to EU data file + parliament_size=None, # Use all MEPs from EU.xml (717) + enable_democratic_discussion=True, # Enable democratic features + enable_committee_work=True, # Enable committee system + enable_amendment_process=True, # Enable bill amendments + verbose=False # Enable detailed logging +) +``` + +### MEP Agent Configuration + +Each MEP agent is configured with: +- **System Prompt**: Comprehensive political background and principles +- **Model**: GPT-4o-mini for consistent responses +- **Max Loops**: 3 iterations for thorough analysis +- **Expertise Areas**: Based on political group and country + +## 📊 Data Sources + +### EU.xml File +The simulation uses real EU data from the EU.xml file containing: +- **MEP Names**: Full names of all 700 MEPs +- **Countries**: Country representation +- **Political Groups**: European political group affiliations +- **National Parties**: National political party memberships +- **MEP IDs**: Unique identifiers for each MEP + +### Fallback System +If EU.xml cannot be loaded, the system creates representative fallback MEPs: +- **Sample MEPs**: Representative selection from major political groups +- **Realistic Data**: Based on actual European Parliament composition +- **Full Functionality**: All democratic features remain available + +## 🎮 Example Scenarios + +### Scenario 1: Climate Policy Debate +```python +# Climate change legislation with diverse perspectives +session = parliament.run_democratic_session( + bill_title="European Climate Law", + bill_description="Carbon neutrality framework for 2050", + committee="Environment, Public Health and Food Safety" +) +``` + +### Scenario 2: Digital Regulation +```python +# Digital services regulation with technical analysis +session = parliament.run_democratic_session( + bill_title="Digital Services Act", + bill_description="Online platform regulation", + committee="Internal Market and Consumer Protection" +) +``` + +### Scenario 3: Social Policy +```python +# Minimum wage directive with social considerations +session = parliament.run_democratic_session( + bill_title="European Minimum Wage Directive", + bill_description="Framework for adequate minimum wages", + committee="Employment and Social Affairs" +) +``` + +## 🔮 Future Enhancements + +### Planned Optimizations +1. **Performance Optimization**: Parallel processing for large-scale voting +2. **Advanced NLP**: Better analysis of debate transcripts and reasoning +3. **Real-time Updates**: Dynamic parliament composition updates +4. **Historical Analysis**: Track voting patterns and political evolution +5. **External Integration**: Connect with real EU data sources + +### Potential Features +1. **Amendment System**: Full amendment proposal and voting +2. **Lobbying Simulation**: Interest group influence on MEPs +3. **Media Integration**: Public opinion and media coverage +4. **International Relations**: Interaction with other EU institutions +5. **Budget Simulation**: Financial impact analysis of legislation + +## 📝 Requirements + +### Dependencies +- `swarms`: Core swarm framework +- `loguru`: Advanced logging +- `xml.etree.ElementTree`: XML parsing for EU data +- `dataclasses`: Data structure support +- `typing`: Type hints +- `datetime`: Date and time handling + +### Data Files +- `EU.xml`: European Parliament member data (included) + +## 🏃‍♂️ Quick Start + +1. **Install Dependencies**: + ```bash + pip install swarms loguru + ``` + +2. **Run Example**: + ```bash + python euroswarm_parliament_example.py + ``` + +3. **Create Custom Session**: + ```python + from euroswarm_parliament import EuroSwarmParliament, VoteType + + parliament = EuroSwarmParliament() + session = parliament.run_democratic_session( + bill_title="Your Bill Title", + bill_description="Your bill description", + committee="Relevant Committee" + ) + ``` + +## 🤝 Contributing + +The EuroSwarm Parliament is designed to be extensible and customizable. Contributions are welcome for: + +- **New Democratic Features**: Additional parliamentary procedures +- **Performance Optimizations**: Faster processing for large parliaments +- **Data Integration**: Additional EU data sources +- **Analysis Tools**: Advanced political analysis features +- **Documentation**: Improved documentation and examples + +## 📄 License + +This project is part of the Swarms Democracy framework and follows the same licensing terms. + +## 🏛️ Acknowledgments + +- **European Parliament**: For the democratic structure and procedures +- **EU Data**: For providing comprehensive MEP information +- **Swarms Framework**: For the underlying multi-agent architecture +- **Board of Directors Pattern**: For advanced democratic decision-making + +--- + +*The EuroSwarm Parliament represents a significant advancement in democratic simulation, providing a realistic and comprehensive model of European parliamentary democracy with full AI-powered MEP representation and democratic decision-making processes.* \ No newline at end of file diff --git a/examples/simulations/euroswarm_parliament/__init__.py b/examples/simulations/euroswarm_parliament/__init__.py new file mode 100644 index 00000000..381233f0 --- /dev/null +++ b/examples/simulations/euroswarm_parliament/__init__.py @@ -0,0 +1,55 @@ +""" +EuroSwarm Parliament - European Parliament Simulation + +A comprehensive simulation of the European Parliament with 717 MEPs (Members of European Parliament) +based on real EU data, featuring full democratic functionality including bill introduction, committee work, +parliamentary debates, and democratic voting mechanisms. + +Enhanced with hierarchical democratic structure where each political group operates as a specialized +Board of Directors with expertise areas, and a Parliament Speaker aggregates decisions using weighted voting. + +Includes Wikipedia personality system for realistic, personality-driven MEP behavior based on real biographical data. +""" + +from euroswarm_parliament import ( + EuroSwarmParliament, + ParliamentaryMember, + ParliamentaryBill, + ParliamentaryVote, + ParliamentaryCommittee, + PoliticalGroupBoard, + ParliamentSpeaker, + ParliamentaryRole, + VoteType, + VoteResult, +) + +# Import Wikipedia personality system +try: + from wikipedia_personality_scraper import ( + WikipediaPersonalityScraper, + MEPPersonalityProfile, + ) + WIKIPEDIA_PERSONALITY_AVAILABLE = True +except ImportError: + WIKIPEDIA_PERSONALITY_AVAILABLE = False + +__version__ = "2.1.0" +__author__ = "Swarms Democracy Team" +__description__ = "European Parliament Simulation with Enhanced Hierarchical Democratic Functionality and Wikipedia Personality System" + +__all__ = [ + "EuroSwarmParliament", + "ParliamentaryMember", + "ParliamentaryBill", + "ParliamentaryVote", + "ParliamentaryCommittee", + "PoliticalGroupBoard", + "ParliamentSpeaker", + "ParliamentaryRole", + "VoteType", + "VoteResult", + "WikipediaPersonalityScraper", + "MEPPersonalityProfile", + "WIKIPEDIA_PERSONALITY_AVAILABLE", +] \ No newline at end of file diff --git a/examples/simulations/euroswarm_parliament/euroswarm_parliament.py b/examples/simulations/euroswarm_parliament/euroswarm_parliament.py new file mode 100644 index 00000000..9984ba58 --- /dev/null +++ b/examples/simulations/euroswarm_parliament/euroswarm_parliament.py @@ -0,0 +1,2598 @@ +""" +EuroSwarm Parliament - European Parliament Simulation with Democratic Functionality + +This simulation creates a comprehensive European Parliament with 700 MEPs (Members of European Parliament) +based on real EU data, featuring democratic discussion, bill analysis, committee work, and voting mechanisms. + +ENHANCED WITH COST OPTIMIZATION: +- Lazy loading of MEP agents +- Response caching for repeated queries +- Batch processing for large-scale operations +- Budget controls and cost tracking +- Memory optimization for large parliaments +""" + +import os +import random +import xml.etree.ElementTree as ET +import time +import hashlib +from typing import Dict, List, Optional, Union, Any, Set +from dataclasses import dataclass, field +from enum import Enum +from datetime import datetime, timedelta +from functools import lru_cache + +from swarms import Agent +from swarms.structs.multi_agent_exec import run_agents_concurrently +from swarms.structs.board_of_directors_swarm import ( + BoardOfDirectorsSwarm, + BoardMember, + BoardMemberRole, + BoardDecisionType, + BoardSpec, + BoardOrder, + BoardDecision, + enable_board_feature, +) +from swarms.utils.loguru_logger import initialize_logger + +# Initialize logger first +logger = initialize_logger(log_folder="euroswarm_parliament") + +# Enable Board of Directors feature +enable_board_feature() + +# Import Wikipedia personality system +try: + from wikipedia_personality_scraper import WikipediaPersonalityScraper, MEPPersonalityProfile + WIKIPEDIA_PERSONALITY_AVAILABLE = True +except ImportError: + WIKIPEDIA_PERSONALITY_AVAILABLE = False + logger.warning("Wikipedia personality system not available. Using basic personality generation.") + + +@dataclass +class CostTracker: + """Track costs and usage for budget management in parliamentary operations.""" + + total_tokens_used: int = 0 + total_cost_estimate: float = 0.0 + budget_limit: float = 200.0 # Default $200 budget for parliament + token_cost_per_1m: float = 0.15 # GPT-4o-mini cost + requests_made: int = 0 + cache_hits: int = 0 + + def add_tokens(self, tokens: int): + """Add tokens used and calculate cost.""" + self.total_tokens_used += tokens + self.total_cost_estimate = (self.total_tokens_used / 1_000_000) * self.token_cost_per_1m + self.requests_made += 1 + + def add_cache_hit(self): + """Record a cache hit.""" + self.cache_hits += 1 + + def check_budget(self) -> bool: + """Check if within budget.""" + return self.total_cost_estimate <= self.budget_limit + + def get_stats(self) -> Dict[str, Any]: + """Get cost statistics.""" + return { + "total_tokens": self.total_tokens_used, + "total_cost": self.total_cost_estimate, + "requests_made": self.requests_made, + "cache_hits": self.cache_hits, + "cache_hit_rate": self.cache_hits / max(1, self.requests_made + self.cache_hits), + "budget_remaining": max(0, self.budget_limit - self.total_cost_estimate) + } + + +class ParliamentaryRole(str, Enum): + """Enumeration of parliamentary roles and positions.""" + + PRESIDENT = "president" + VICE_PRESIDENT = "vice_president" + QUAESTOR = "quaestor" + COMMITTEE_CHAIR = "committee_chair" + COMMITTEE_VICE_CHAIR = "committee_vice_chair" + POLITICAL_GROUP_LEADER = "political_group_leader" + MEP = "mep" + + +class VoteType(str, Enum): + """Enumeration of voting types in the European Parliament.""" + + ORDINARY_LEGISLATIVE_PROCEDURE = "ordinary_legislative_procedure" + CONSENT_PROCEDURE = "consent_procedure" + CONSULTATION_PROCEDURE = "consultation_procedure" + BUDGET_VOTE = "budget_vote" + RESOLUTION_VOTE = "resolution_vote" + APPOINTMENT_VOTE = "appointment_vote" + + +class VoteResult(str, Enum): + """Enumeration of possible vote results.""" + + PASSED = "passed" + FAILED = "failed" + TIED = "tied" + ABSTAINED = "abstained" + + +@dataclass +class ParliamentaryMember: + """ + Represents a Member of the European Parliament (MEP). + + Attributes: + full_name: Full name of the MEP + country: Country the MEP represents + political_group: European political group affiliation + national_party: National political party + mep_id: Unique MEP identifier + role: Parliamentary role (if any) + committees: List of committee memberships + expertise_areas: Areas of policy expertise + voting_weight: Weight of the MEP's vote (default: 1.0) + agent: The AI agent representing this MEP (lazy loaded) + is_loaded: Whether the agent has been instantiated + """ + + full_name: str + country: str + political_group: str + national_party: str + mep_id: str + role: ParliamentaryRole = ParliamentaryRole.MEP + committees: List[str] = field(default_factory=list) + expertise_areas: List[str] = field(default_factory=list) + voting_weight: float = 1.0 + agent: Optional[Agent] = None + is_loaded: bool = False + + +@dataclass +class ParliamentaryBill: + """ + Represents a bill or legislative proposal in the European Parliament. + + Attributes: + title: Title of the bill + description: Detailed description of the bill + bill_type: Type of legislative procedure + committee: Primary committee responsible + sponsor: MEP who sponsored the bill + co_sponsors: List of co-sponsoring MEPs + date_introduced: Date the bill was introduced + status: Current status of the bill + amendments: List of proposed amendments + """ + + title: str + description: str + bill_type: VoteType + committee: str + sponsor: str + co_sponsors: List[str] = field(default_factory=list) + date_introduced: datetime = field(default_factory=datetime.now) + status: str = "introduced" + amendments: List[Dict[str, Any]] = field(default_factory=list) + + +@dataclass +class ParliamentaryVote: + """ + Represents a parliamentary vote on a bill or resolution. + + Attributes: + bill: The bill being voted on + vote_type: Type of vote being conducted + date: Date of the vote + votes_for: Number of votes in favor + votes_against: Number of votes against + abstentions: Number of abstentions + absent: Number of absent MEPs + result: Final result of the vote + individual_votes: Dictionary of individual MEP votes + reasoning: Dictionary of MEP reasoning for votes + """ + + bill: ParliamentaryBill + vote_type: VoteType + date: datetime = field(default_factory=datetime.now) + votes_for: int = 0 + votes_against: int = 0 + abstentions: int = 0 + absent: int = 0 + result: VoteResult = VoteResult.FAILED + individual_votes: Dict[str, str] = field(default_factory=dict) + reasoning: Dict[str, str] = field(default_factory=dict) + + +@dataclass +class ParliamentaryCommittee: + """ + Represents a parliamentary committee. + + Attributes: + name: Name of the committee + chair: Committee chairperson + vice_chair: Committee vice-chairperson + members: List of committee members + responsibilities: Committee responsibilities + current_bills: Bills currently under consideration + """ + + name: str + chair: str + vice_chair: str + members: List[str] = field(default_factory=list) + responsibilities: List[str] = field(default_factory=list) + current_bills: List[ParliamentaryBill] = field(default_factory=list) + + +@dataclass +class PoliticalGroupBoard: + """ + Represents a political group as a Board of Directors with specialized expertise. + + Attributes: + group_name: Name of the political group + members: List of MEPs in this group + board_members: Board members with specialized roles and internal percentages + expertise_areas: Specialized areas of governance expertise + voting_weight: Weight of this group's vote (percentage of parliament) + group_speaker: CEO/leader of this political group + total_meps: Total number of MEPs in this group + board_member_percentages: Dictionary mapping board members to their internal percentages + """ + + group_name: str + members: List[str] = field(default_factory=list) + board_members: List[BoardMember] = field(default_factory=list) + expertise_areas: List[str] = field(default_factory=list) + voting_weight: float = 0.0 + group_speaker: Optional[str] = None + total_meps: int = 0 + board_swarm: Optional[Any] = None # BoardOfDirectorsSwarm instance + board_member_percentages: Dict[str, float] = field(default_factory=dict) # Internal percentages within group + +@dataclass +class ParliamentSpeaker: + """ + Represents the Parliament Speaker who aggregates decisions from all political groups. + + Attributes: + name: Name of the speaker + agent: AI agent representing the speaker + political_groups: Dictionary of political group boards + total_meps: Total number of MEPs in parliament + majority_threshold: Number of votes needed for majority + """ + + name: str + agent: Optional[Agent] = None + political_groups: Dict[str, PoliticalGroupBoard] = field(default_factory=dict) + total_meps: int = 0 + majority_threshold: int = 0 + + +class EuroSwarmParliament: + """ + A comprehensive simulation of the European Parliament with 700 MEPs. + + This simulation provides democratic functionality including: + - Bill introduction and analysis + - Committee work and hearings + - Parliamentary debates and discussions + - Democratic voting mechanisms + - Political group coordination + - Amendment processes + """ + + def __init__( + self, + eu_data_file: str = "EU.xml", + parliament_size: int = None, # Changed from 700 to None to use all MEPs + enable_democratic_discussion: bool = True, + enable_committee_work: bool = True, + enable_amendment_process: bool = True, + enable_lazy_loading: bool = True, # NEW: Lazy load MEP agents + enable_caching: bool = True, # NEW: Enable response caching + batch_size: int = 25, # NEW: Batch size for concurrent execution + budget_limit: float = 200.0, # NEW: Budget limit in dollars + verbose: bool = False, + ): + """ + Initialize the EuroSwarm Parliament with cost optimization. + + Args: + eu_data_file: Path to EU.xml file containing MEP data + parliament_size: Target size of the parliament (default: None = use all MEPs from EU.xml) + enable_democratic_discussion: Enable democratic discussion features + enable_committee_work: Enable committee work and hearings + enable_amendment_process: Enable bill amendment processes + enable_lazy_loading: Enable lazy loading of MEP agents (cost optimization) + enable_caching: Enable response caching (cost optimization) + batch_size: Number of MEPs to process in batches + budget_limit: Maximum budget in dollars + verbose: Enable verbose logging + """ + self.eu_data_file = eu_data_file + self.parliament_size = parliament_size # Will be set to actual MEP count if None + self.enable_democratic_discussion = enable_democratic_discussion + self.enable_committee_work = enable_committee_work + self.enable_amendment_process = enable_amendment_process + self.enable_lazy_loading = enable_lazy_loading + self.enable_caching = enable_caching + self.batch_size = batch_size + self.verbose = verbose + + # Initialize cost tracking + self.cost_tracker = CostTracker(budget_limit=budget_limit) + + # Initialize parliamentary structures + self.meps: Dict[str, ParliamentaryMember] = {} + self.committees: Dict[str, ParliamentaryCommittee] = {} + self.political_groups: Dict[str, List[str]] = {} + self.bills: List[ParliamentaryBill] = [] + self.votes: List[ParliamentaryVote] = [] + self.debates: List[Dict[str, Any]] = [] + + # Enhanced democratic structures + self.political_group_boards: Dict[str, PoliticalGroupBoard] = {} + self.parliament_speaker: Optional[ParliamentSpeaker] = None + self.enable_hierarchical_democracy: bool = True + + # Wikipedia personality system + self.enable_wikipedia_personalities: bool = WIKIPEDIA_PERSONALITY_AVAILABLE + self.personality_profiles: Dict[str, MEPPersonalityProfile] = {} + self.personality_scraper: Optional[WikipediaPersonalityScraper] = None + + # Initialize caching + self.response_cache: Dict[str, str] = {} + + # Load MEP data and initialize structures + self.meps = self._load_mep_data() + self.parliament_size = len(self.meps) + + if self.verbose: + logger.info(f"EuroSwarm Parliament initialized with {self.parliament_size} MEPs") + logger.info(f"Lazy loading: {self.enable_lazy_loading}, Caching: {self.enable_caching}") + logger.info(f"Budget limit: ${budget_limit}, Batch size: {batch_size}") + + # Load Wikipedia personalities if enabled + if self.enable_wikipedia_personalities: + self._load_wikipedia_personalities() + + # Initialize parliamentary structures + self.committees = self._create_committees() + self.political_groups = self._organize_political_groups() + + # Initialize enhanced democratic structures + if self.enable_hierarchical_democracy: + self._create_political_group_boards() + self._create_parliament_speaker() + + # Initialize leadership and democratic decision-making + self._create_parliamentary_leadership() + self._assign_committee_leadership() + + if self.enable_democratic_discussion: + self._init_democratic_decision_making() + + def _load_mep_data(self) -> Dict[str, ParliamentaryMember]: + """ + Load MEP data from EU.xml file and create parliamentary members with lazy loading. + + Returns: + Dict[str, ParliamentaryMember]: Dictionary of MEPs + """ + meps = {} + + try: + # Construct the full path to EU.xml relative to project root + import os + project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + eu_data_path = os.path.join(project_root, self.eu_data_file) + + # Read the XML file content + with open(eu_data_path, 'r', encoding='utf-8') as f: + content = f.read() + + # Use regex to extract MEP data since the XML is malformed + import re + + # Find all MEP blocks + mep_pattern = r'\s*(.*?)\s*(.*?)\s*(.*?)\s*(.*?)\s*(.*?)\s*' + mep_matches = re.findall(mep_pattern, content, re.DOTALL) + + for full_name, country, political_group, mep_id, national_party in mep_matches: + # Clean up the data + full_name = full_name.strip() + country = country.strip() + political_group = political_group.strip() + mep_id = mep_id.strip() + national_party = national_party.strip() + + # Create parliamentary member (without agent for lazy loading) + mep = ParliamentaryMember( + full_name=full_name, + country=country, + political_group=political_group, + national_party=national_party, + mep_id=mep_id, + expertise_areas=self._generate_expertise_areas(political_group, country), + committees=self._assign_committees(political_group), + agent=None, # Will be created on demand + is_loaded=False + ) + + meps[full_name] = mep + + # Set parliament size to actual number of MEPs loaded if not specified + if self.parliament_size is None: + self.parliament_size = len(meps) + + logger.info(f"Loaded {len(meps)} MEP profiles from EU data (lazy loading enabled)") + + except Exception as e: + logger.error(f"Error loading MEP data: {e}") + # Create fallback MEPs if file loading fails + meps = self._create_fallback_meps() + if self.parliament_size is None: + self.parliament_size = len(meps) + + return meps + + def _load_mep_agent(self, mep_name: str) -> Optional[Agent]: + """ + Lazy load a single MEP agent on demand. + + Args: + mep_name: Name of the MEP to load + + Returns: + Optional[Agent]: Loaded agent or None if not found + """ + if mep_name not in self.meps: + return None + + mep = self.meps[mep_name] + + # Check if already loaded + if mep.is_loaded and mep.agent: + return mep.agent + + # Check budget before creating agent + if not self.cost_tracker.check_budget(): + logger.warning(f"Budget exceeded. Cannot load MEP agent {mep_name}") + return None + + # Create agent + mep.agent = self._create_mep_agent(mep) + mep.is_loaded = True + + if self.verbose: + logger.info(f"Loaded MEP agent: {mep_name}") + + return mep.agent + + def _load_mep_agents_batch(self, mep_names: List[str]) -> List[Agent]: + """ + Load multiple MEP agents in a batch. + + Args: + mep_names: List of MEP names to load + + Returns: + List[Agent]: List of loaded agents + """ + loaded_agents = [] + + for mep_name in mep_names: + agent = self._load_mep_agent(mep_name) + if agent: + loaded_agents.append(agent) + + return loaded_agents + + def _get_cache_key(self, task: str, mep_names: List[str]) -> str: + """ + Generate a cache key for a task and MEP combination. + + Args: + task: Task to execute + mep_names: List of MEP names + + Returns: + str: Cache key + """ + # Sort MEP names for consistent cache keys + sorted_meps = sorted(mep_names) + content = f"{task}:{':'.join(sorted_meps)}" + return hashlib.md5(content.encode()).hexdigest() + + def _check_cache(self, cache_key: str) -> Optional[str]: + """ + Check if a response is cached. + + Args: + cache_key: Cache key to check + + Returns: + Optional[str]: Cached response or None + """ + if not self.enable_caching: + return None + + cached_response = self.response_cache.get(cache_key) + if cached_response: + self.cost_tracker.add_cache_hit() + if self.verbose: + logger.info(f"Cache hit for key: {cache_key[:20]}...") + + return cached_response + + def _cache_response(self, cache_key: str, response: str): + """ + Cache a response. + + Args: + cache_key: Cache key + response: Response to cache + """ + if self.enable_caching: + self.response_cache[cache_key] = response + if self.verbose: + logger.info(f"Cached response for key: {cache_key[:20]}...") + + def _generate_expertise_areas(self, political_group: str, country: str) -> List[str]: + """ + Generate expertise areas based on political group and country. + + Args: + political_group: MEP's political group + country: MEP's country + + Returns: + List[str]: List of expertise areas + """ + expertise_mapping = { + "Group of the European People's Party (Christian Democrats)": [ + "Economic Policy", "Agriculture", "Regional Development", "Christian Values" + ], + "Group of the Progressive Alliance of Socialists and Democrats in the European Parliament": [ + "Social Policy", "Labor Rights", "Healthcare", "Education" + ], + "Renew Europe Group": [ + "Digital Policy", "Innovation", "Trade", "Liberal Values" + ], + "Group of the Greens/European Free Alliance": [ + "Environmental Policy", "Climate Change", "Renewable Energy", "Human Rights" + ], + "European Conservatives and Reformists Group": [ + "Sovereignty", "Defense", "Traditional Values", "Economic Freedom" + ], + "The Left group in the European Parliament - GUE/NGL": [ + "Workers' Rights", "Social Justice", "Anti-Austerity", "Public Services" + ], + "Patriots for Europe Group": [ + "National Sovereignty", "Border Security", "Cultural Identity", "Law and Order" + ], + "Europe of Sovereign Nations Group": [ + "National Independence", "Sovereignty", "Traditional Values", "Security" + ], + "Non-attached Members": [ + "Independent Policy", "Cross-cutting Issues", "Specialized Topics" + ] + } + + base_expertise = expertise_mapping.get(political_group, ["General Policy"]) + + # Add country-specific expertise + country_expertise = { + "Germany": ["Industrial Policy", "Manufacturing"], + "France": ["Agriculture", "Defense"], + "Italy": ["Cultural Heritage", "Tourism"], + "Spain": ["Tourism", "Agriculture"], + "Poland": ["Energy Security", "Eastern Partnership"], + "Netherlands": ["Trade", "Innovation"], + "Belgium": ["EU Institutions", "Multilingualism"], + "Austria": ["Alpine Policy", "Transport"], + "Sweden": ["Environmental Policy", "Social Welfare"], + "Denmark": ["Green Technology", "Welfare State"], + } + + if country in country_expertise: + base_expertise.extend(country_expertise[country]) + + return base_expertise[:5] # Limit to 5 expertise areas + + def _assign_committees(self, political_group: str) -> List[str]: + """ + Assign committees based on political group preferences. + + Args: + political_group: MEP's political group + + Returns: + List[str]: List of committee assignments + """ + committee_mapping = { + "Group of the European People's Party (Christian Democrats)": [ + "Agriculture and Rural Development", "Economic and Monetary Affairs", "Regional Development" + ], + "Group of the Progressive Alliance of Socialists and Democrats in the European Parliament": [ + "Employment and Social Affairs", "Environment, Public Health and Food Safety", "Civil Liberties" + ], + "Renew Europe Group": [ + "Industry, Research and Energy", "Internal Market and Consumer Protection", "Legal Affairs" + ], + "Group of the Greens/European Free Alliance": [ + "Environment, Public Health and Food Safety", "Transport and Tourism", "Development" + ], + "European Conservatives and Reformists Group": [ + "Foreign Affairs", "Security and Defence", "Budgetary Control" + ], + "The Left group in the European Parliament - GUE/NGL": [ + "International Trade", "Development", "Civil Liberties" + ], + "Patriots for Europe Group": [ + "Civil Liberties", "Security and Defence", "Budgetary Control" + ], + "Europe of Sovereign Nations Group": [ + "Foreign Affairs", "Security and Defence", "Civil Liberties" + ], + "Non-attached Members": [ + "Petitions", "Budgetary Control", "Legal Affairs" + ] + } + + return committee_mapping.get(political_group, ["Petitions"]) + + def _create_mep_agent(self, mep: ParliamentaryMember) -> Agent: + """ + Create an AI agent representing an MEP. + + Args: + mep: Parliamentary member data + + Returns: + Agent: AI agent representing the MEP + """ + system_prompt = self._generate_mep_system_prompt(mep) + + return Agent( + agent_name=f"MEP_{mep.full_name.replace(' ', '_')}", + system_prompt=system_prompt, + model_name="gpt-4o-mini", + max_loops=3, + verbose=self.verbose, + ) + + def _generate_mep_system_prompt(self, mep: ParliamentaryMember) -> str: + """ + Generate a comprehensive system prompt for an MEP agent with Wikipedia personality data. + + Args: + mep: Parliamentary member data + + Returns: + str: System prompt for the MEP agent + """ + + # Get Wikipedia personality profile if available + personality_profile = self.get_mep_personality_profile(mep.full_name) + + # Base prompt structure + prompt = f"""You are {mep.full_name}, a Member of the European Parliament (MEP) representing {mep.country}. + +POLITICAL BACKGROUND: +- Political Group: {mep.political_group} +- National Party: {mep.national_party} +- Parliamentary Role: {mep.role.value} +- Committees: {', '.join(mep.committees)} +- Areas of Expertise: {', '.join(mep.expertise_areas)} + +""" + + # Add Wikipedia personality data if available + if personality_profile and self.enable_wikipedia_personalities: + prompt += f""" +REAL PERSONALITY PROFILE (Based on Wikipedia data): +{self.personality_scraper.get_personality_summary(personality_profile)} + +POLITICAL VIEWS AND POSITIONS: +- Key Political Views: {personality_profile.political_views if personality_profile.political_views else 'Based on party alignment'} +- Policy Focus Areas: {personality_profile.policy_focus if personality_profile.policy_focus else ', '.join(mep.expertise_areas)} +- Notable Achievements: {personality_profile.achievements if personality_profile.achievements else 'Parliamentary service'} +- Professional Background: {personality_profile.professional_background if personality_profile.professional_background else 'Political career'} + +""" + else: + prompt += f""" +POLITICAL VIEWS AND POSITIONS: +- Key Political Views: Based on {mep.political_group} alignment +- Policy Focus Areas: {', '.join(mep.expertise_areas)} +- Professional Background: Parliamentary service +""" + + # Add core principles + prompt += f""" +CORE PRINCIPLES: +1. Democratic Representation: You represent the interests of {mep.country} and your constituents +2. European Integration: You work within the framework of European Union law and institutions +3. Political Alignment: You align with {mep.political_group} positions while maintaining independence +4. Policy Expertise: You focus on your areas of expertise: {', '.join(mep.expertise_areas)} + +PARLIAMENTARY BEHAVIOR: +- Engage in constructive debate and dialogue with other MEPs +- Consider multiple perspectives when forming positions +- Support evidence-based policy making +- Respect democratic processes and parliamentary procedures +- Work across political groups when beneficial for your constituents +- Advocate for {mep.country}'s interests while considering European common good + +VOTING BEHAVIOR: +- Vote based on your political principles and constituent interests +- Consider the impact on {mep.country} and the European Union +- Support measures that align with {mep.political_group} values +- Oppose measures that conflict with your core principles +- Abstain when you need more information or have conflicting considerations + +COMMUNICATION STYLE: +- Professional and diplomatic in parliamentary settings +- Clear and articulate when explaining positions +- Respectful of other MEPs and their viewpoints +- Passionate about your areas of expertise +- Pragmatic when seeking compromise and consensus + +When responding to parliamentary matters, consider: +1. How does this affect {mep.country} and your constituents? +2. What is the position of {mep.political_group} on this issue? +3. What are the implications for your areas of expertise? +4. How can you contribute constructively to the discussion? +5. What is the best outcome for European citizens? + +Remember: You are a democratically elected representative working for the benefit of European citizens while representing {mep.country}'s interests within the European Union framework. +""" + + return prompt + + def _create_fallback_meps(self) -> Dict[str, ParliamentaryMember]: + """ + Create fallback MEPs if EU.xml file cannot be loaded. + + Returns: + Dict[str, ParliamentaryMember]: Dictionary of fallback MEPs + """ + fallback_meps = {} + + # Create a representative sample of MEPs + sample_data = [ + ("Jean-Claude Juncker", "Luxembourg", "Group of the European People's Party (Christian Democrats)", "Parti chrétien social luxembourgeois"), + ("Ursula von der Leyen", "Germany", "Group of the European People's Party (Christian Democrats)", "Christlich Demokratische Union Deutschlands"), + ("Roberta Metsola", "Malta", "Group of the European People's Party (Christian Democrats)", "Partit Nazzjonalista"), + ("Iratxe García Pérez", "Spain", "Group of the Progressive Alliance of Socialists and Democrats in the European Parliament", "Partido Socialista Obrero Español"), + ("Valérie Hayer", "France", "Renew Europe Group", "Renaissance"), + ("Philippe Lamberts", "Belgium", "Group of the Greens/European Free Alliance", "Ecolo"), + ("Raffaele Fitto", "Italy", "European Conservatives and Reformists Group", "Fratelli d'Italia"), + ("Manon Aubry", "France", "The Left group in the European Parliament - GUE/NGL", "La France Insoumise"), + ] + + for i, (name, country, group, party) in enumerate(sample_data): + mep = ParliamentaryMember( + full_name=name, + country=country, + political_group=group, + national_party=party, + mep_id=f"fallback_{i}", + expertise_areas=self._generate_expertise_areas(group, country), + committees=self._assign_committees(group), + agent=None, # Will be created on demand + is_loaded=False + ) + fallback_meps[name] = mep + + return fallback_meps + + def _create_committees(self) -> Dict[str, ParliamentaryCommittee]: + """ + Create parliamentary committees. + + Returns: + Dict[str, ParliamentaryCommittee]: Dictionary of committees + """ + committees = { + "Agriculture and Rural Development": ParliamentaryCommittee( + name="Agriculture and Rural Development", + chair="", + vice_chair="", + responsibilities=["Agricultural policy", "Rural development", "Food safety"] + ), + "Budgetary Control": ParliamentaryCommittee( + name="Budgetary Control", + chair="", + vice_chair="", + responsibilities=["Budget oversight", "Financial control", "Audit reports"] + ), + "Civil Liberties, Justice and Home Affairs": ParliamentaryCommittee( + name="Civil Liberties, Justice and Home Affairs", + chair="", + vice_chair="", + responsibilities=["Civil rights", "Justice", "Home affairs", "Immigration"] + ), + "Development": ParliamentaryCommittee( + name="Development", + chair="", + vice_chair="", + responsibilities=["Development cooperation", "Humanitarian aid", "International relations"] + ), + "Economic and Monetary Affairs": ParliamentaryCommittee( + name="Economic and Monetary Affairs", + chair="", + vice_chair="", + responsibilities=["Economic policy", "Monetary policy", "Financial services"] + ), + "Employment and Social Affairs": ParliamentaryCommittee( + name="Employment and Social Affairs", + chair="", + vice_chair="", + responsibilities=["Employment policy", "Social policy", "Working conditions"] + ), + "Environment, Public Health and Food Safety": ParliamentaryCommittee( + name="Environment, Public Health and Food Safety", + chair="", + vice_chair="", + responsibilities=["Environmental policy", "Public health", "Food safety"] + ), + "Foreign Affairs": ParliamentaryCommittee( + name="Foreign Affairs", + chair="", + vice_chair="", + responsibilities=["Foreign policy", "International relations", "Security policy"] + ), + "Industry, Research and Energy": ParliamentaryCommittee( + name="Industry, Research and Energy", + chair="", + vice_chair="", + responsibilities=["Industrial policy", "Research", "Energy policy"] + ), + "Internal Market and Consumer Protection": ParliamentaryCommittee( + name="Internal Market and Consumer Protection", + chair="", + vice_chair="", + responsibilities=["Internal market", "Consumer protection", "Digital policy"] + ), + "International Trade": ParliamentaryCommittee( + name="International Trade", + chair="", + vice_chair="", + responsibilities=["Trade policy", "International agreements", "Market access"] + ), + "Legal Affairs": ParliamentaryCommittee( + name="Legal Affairs", + chair="", + vice_chair="", + responsibilities=["Legal matters", "Institutional affairs", "Constitutional issues"] + ), + "Petitions": ParliamentaryCommittee( + name="Petitions", + chair="", + vice_chair="", + responsibilities=["Citizen petitions", "Ombudsman", "Citizen rights"] + ), + "Regional Development": ParliamentaryCommittee( + name="Regional Development", + chair="", + vice_chair="", + responsibilities=["Regional policy", "Cohesion policy", "Urban development"] + ), + "Security and Defence": ParliamentaryCommittee( + name="Security and Defence", + chair="", + vice_chair="", + responsibilities=["Security policy", "Defence", "Military cooperation"] + ), + "Transport and Tourism": ParliamentaryCommittee( + name="Transport and Tourism", + chair="", + vice_chair="", + responsibilities=["Transport policy", "Tourism", "Infrastructure"] + ), + } + + return committees + + def _organize_political_groups(self) -> Dict[str, List[str]]: + """ + Organize MEPs by political groups. + + Returns: + Dict[str, List[str]]: Dictionary mapping political groups to MEP names + """ + groups = {} + for mep_name, mep in self.meps.items(): + group = mep.political_group + if group not in groups: + groups[group] = [] + groups[group].append(mep_name) + return groups + + def _create_parliamentary_leadership(self): + """Create parliamentary leadership positions.""" + # Assign President (from largest political group) + largest_group = max(self.political_groups.items(), key=lambda x: len(x[1])) + president_candidate = largest_group[1][0] + self.meps[president_candidate].role = ParliamentaryRole.PRESIDENT + + # Assign Vice Presidents + vice_presidents = [] + for group_name, meps in self.political_groups.items(): + if group_name != largest_group[0] and len(meps) > 0: + vice_presidents.append(meps[0]) + if len(vice_presidents) >= 14: # EP has 14 Vice Presidents + break + + for vp in vice_presidents: + self.meps[vp].role = ParliamentaryRole.VICE_PRESIDENT + + # Assign Committee Chairs + self._assign_committee_leadership() + + def _assign_committee_leadership(self): + """Assign committee chairs and vice-chairs based on political group representation.""" + committee_names = list(self.committees.keys()) + + # Distribute committee leadership among political groups + group_assignments = {} + for group_name, meps in self.political_groups.items(): + if len(meps) > 0: + group_assignments[group_name] = meps + + committee_index = 0 + for group_name, meps in group_assignments.items(): + if committee_index >= len(committee_names): + break + + committee_name = committee_names[committee_index] + chair = meps[0] + vice_chair = meps[1] if len(meps) > 1 else "" + + self.committees[committee_name].chair = chair + self.committees[committee_name].vice_chair = vice_chair + + # Update MEP roles + self.meps[chair].role = ParliamentaryRole.COMMITTEE_CHAIR + if vice_chair: + self.meps[vice_chair].role = ParliamentaryRole.COMMITTEE_VICE_CHAIR + + committee_index += 1 + + def _init_democratic_decision_making(self): + """Initialize democratic decision-making using Board of Directors pattern.""" + # Create parliamentary board members for democratic decision-making + board_members = [] + + # Add political group leaders + for group_name, meps in self.political_groups.items(): + if len(meps) > 0: + leader = meps[0] + if leader in self.meps and self.meps[leader].agent is not None: + board_member = BoardMember( + agent=self.meps[leader].agent, + role=BoardMemberRole.EXECUTIVE_DIRECTOR, + voting_weight=len(meps) / len(self.meps), # Weight based on group size + expertise_areas=self.meps[leader].expertise_areas + ) + board_members.append(board_member) + + # Ensure we have at least one board member + if not board_members and len(self.meps) > 0: + # Use the first available MEP as a fallback + first_mep_name = list(self.meps.keys())[0] + first_mep = self.meps[first_mep_name] + if first_mep.agent is not None: + board_member = BoardMember( + agent=first_mep.agent, + role=BoardMemberRole.EXECUTIVE_DIRECTOR, + voting_weight=1.0, + expertise_areas=first_mep.expertise_areas + ) + board_members.append(board_member) + + # Create the democratic decision-making swarm + if board_members: + # Extract agents from board members for the parent class + agents = [member.agent for member in board_members if member.agent is not None] + + self.democratic_swarm = BoardOfDirectorsSwarm( + name="EuroSwarm Parliament Democratic Council", + description="Democratic decision-making body for the European Parliament", + board_members=board_members, + agents=agents, # Pass agents to parent class + max_loops=3, + verbose=self.verbose, + decision_threshold=0.6, + enable_voting=True, + enable_consensus=True, + ) + else: + logger.warning("No valid board members found for democratic decision-making") + self.democratic_swarm = None + + def _create_political_group_boards(self): + """Create Board of Directors for each political group with specialized expertise and individual percentages.""" + + # Define specialized expertise areas for governance + expertise_areas = { + "economics": ["Economic Policy", "Trade", "Budget", "Taxation", "Financial Services"], + "law": ["Legal Affairs", "Justice", "Civil Liberties", "Constitutional Affairs"], + "environment": ["Environment", "Climate Action", "Energy", "Transport"], + "social": ["Employment", "Social Affairs", "Health", "Education", "Culture"], + "foreign": ["Foreign Affairs", "Security", "Defense", "International Trade"], + "agriculture": ["Agriculture", "Rural Development", "Food Safety"], + "technology": ["Digital Affairs", "Industry", "Research", "Innovation"], + "regional": ["Regional Development", "Cohesion Policy", "Urban Planning"] + } + + total_meps = len(self.meps) + + for group_name, mep_list in self.political_groups.items(): + if not mep_list: + continue + + # Calculate voting weight (percentage of parliament) + voting_weight = len(mep_list) / total_meps + + # Assign specialized expertise areas based on political group + group_expertise = self._assign_group_expertise(group_name, expertise_areas) + + # Create board members with specialized roles and individual percentages + board_members = [] + group_speaker = None + board_member_percentages = {} + + # Select group speaker (CEO) - usually the first MEP in the group + if mep_list and mep_list[0] in self.meps: + group_speaker = mep_list[0] + speaker_mep = self.meps[group_speaker] + + # Create group speaker board member with highest percentage + if speaker_mep.agent: + speaker_board_member = BoardMember( + agent=speaker_mep.agent, + role=BoardMemberRole.CHAIRMAN, + voting_weight=1.0, + expertise_areas=group_expertise + ) + board_members.append(speaker_board_member) + # Group speaker gets 35% of the group's internal voting power + board_member_percentages[group_speaker] = 0.35 + + # Create specialized board members for each expertise area with weighted percentages + expertise_percentages = self._calculate_expertise_percentages(group_name, len(group_expertise)) + + for i, expertise_area in enumerate(group_expertise[:5]): # Limit to 5 main areas + # Find MEPs with relevant expertise + specialized_meps = [ + mep_name for mep_name in mep_list + if mep_name in self.meps and + any(exp.lower() in expertise_area.lower() for exp in self.meps[mep_name].expertise_areas) + ] + + if specialized_meps and i < len(expertise_percentages): + # Select the first specialized MEP + specialized_mep_name = specialized_meps[0] + specialized_mep = self.meps[specialized_mep_name] + + if specialized_mep.agent: + # Assign percentage based on expertise importance + expertise_percentage = expertise_percentages[i] + + board_member = BoardMember( + agent=specialized_mep.agent, + role=BoardMemberRole.EXECUTIVE_DIRECTOR, + voting_weight=expertise_percentage, + expertise_areas=[expertise_area] + ) + board_members.append(board_member) + board_member_percentages[specialized_mep_name] = expertise_percentage + + # Create the political group board with individual percentages + political_group_board = PoliticalGroupBoard( + group_name=group_name, + members=mep_list, + board_members=board_members, + expertise_areas=group_expertise, + voting_weight=voting_weight, + group_speaker=group_speaker, + total_meps=len(mep_list), + board_member_percentages=board_member_percentages + ) + + # Create BoardOfDirectorsSwarm for this political group + if board_members: + agents = [member.agent for member in board_members if member.agent is not None] + + political_group_board.board_swarm = BoardOfDirectorsSwarm( + name=f"{group_name} Board", + description=f"Specialized board for {group_name} with expertise in {', '.join(group_expertise)}", + board_members=board_members, + agents=agents, + max_loops=3, + verbose=self.verbose, + decision_threshold=0.6, + enable_voting=True, + enable_consensus=True + ) + + self.political_group_boards[group_name] = political_group_board + + if self.verbose: + logger.info(f"Created {group_name} board with {len(board_members)} members, " + f"voting weight: {voting_weight:.1%}, expertise: {', '.join(group_expertise[:3])}") + logger.info(f"Board member percentages: {board_member_percentages}") + + def _assign_group_expertise(self, group_name: str, expertise_areas: Dict[str, List[str]]) -> List[str]: + """Assign specialized expertise areas based on political group ideology.""" + + # Map political groups to their primary expertise areas + group_expertise_mapping = { + "Group of the European People's Party (Christian Democrats)": [ + "economics", "law", "foreign", "social" + ], + "Group of the Progressive Alliance of Socialists and Democrats in the European Parliament": [ + "social", "economics", "environment", "law" + ], + "Renew Europe Group": [ + "economics", "technology", "environment", "foreign" + ], + "European Conservatives and Reformists Group": [ + "law", "foreign", "economics", "regional" + ], + "Group of the Greens/European Free Alliance": [ + "environment", "social", "technology", "agriculture" + ], + "The Left group in the European Parliament - GUE/NGL": [ + "social", "economics", "environment", "law" + ], + "Patriots for Europe Group": [ + "foreign", "law", "regional", "social" + ], + "Europe of Sovereign Nations Group": [ + "foreign", "law", "regional", "economics" + ], + "Non-attached Members": [ + "law", "foreign", "economics", "social" + ] + } + + # Get primary expertise areas for this group + primary_areas = group_expertise_mapping.get(group_name, ["economics", "law", "social"]) + + # Expand to specific expertise topics + specific_expertise = [] + for area in primary_areas: + if area in expertise_areas: + specific_expertise.extend(expertise_areas[area]) + + return specific_expertise[:8] # Limit to 8 areas + + def _calculate_expertise_percentages(self, group_name: str, num_expertise_areas: int) -> List[float]: + """Calculate individual percentages for board members based on political group and expertise areas.""" + + # Define percentage distributions based on political group characteristics + percentage_distributions = { + "Group of the European People's Party (Christian Democrats)": [0.25, 0.20, 0.15, 0.05], # CEO gets 35% + "Group of the Progressive Alliance of Socialists and Democrats in the European Parliament": [0.25, 0.20, 0.15, 0.05], + "Renew Europe Group": [0.30, 0.20, 0.10, 0.05], # More emphasis on first expertise + "Group of the Greens/European Free Alliance": [0.30, 0.20, 0.10, 0.05], + "European Conservatives and Reformists Group": [0.25, 0.20, 0.15, 0.05], + "The Left group in the European Parliament - GUE/NGL": [0.25, 0.20, 0.15, 0.05], + "Patriots for Europe Group": [0.30, 0.20, 0.10, 0.05], + "Europe of Sovereign Nations Group": [0.30, 0.20, 0.10, 0.05], + "Non-attached Members": [0.40, 0.20, 0.05, 0.00] # More concentrated power + } + + # Get the distribution for this group + distribution = percentage_distributions.get(group_name, [0.25, 0.20, 0.15, 0.05]) + + # Return the appropriate number of percentages + return distribution[:num_expertise_areas] + + def _create_parliament_speaker(self): + """Create the Parliament Speaker who aggregates decisions from all political groups.""" + + # Create parliament speaker agent + speaker_agent = Agent( + name="Parliament Speaker", + system_prompt=self._generate_speaker_system_prompt(), + llm="gpt-4", + verbose=self.verbose + ) + + # Calculate majority threshold + majority_threshold = (len(self.meps) // 2) + 1 + + self.parliament_speaker = ParliamentSpeaker( + name="Parliament Speaker", + agent=speaker_agent, + political_groups=self.political_group_boards, + total_meps=len(self.meps), + majority_threshold=majority_threshold + ) + + if self.verbose: + logger.info(f"Created Parliament Speaker with majority threshold: {majority_threshold}") + + def _generate_speaker_system_prompt(self) -> str: + """Generate system prompt for the Parliament Speaker.""" + + return f"""You are the Parliament Speaker of the European Parliament, responsible for: + +1. **Aggregating Political Group Decisions**: Collect and analyze decisions from all political groups +2. **Weighted Voting Calculation**: Calculate final results based on each group's percentage representation +3. **Majority Determination**: Determine if a proposal passes based on weighted majority +4. **Consensus Building**: Facilitate dialogue between groups when needed +5. **Transparent Reporting**: Provide clear explanations of voting results + +**Political Group Distribution**: +{self._format_political_group_distribution()} + +**Voting Rules**: +- Each political group votes as a unified board +- Group votes are weighted by their percentage of total MEPs +- Majority threshold: {self.parliament_speaker.majority_threshold if self.parliament_speaker else 'TBD'} MEPs +- Final decision: Positive, Negative, or Abstained + +**Your Role**: Be impartial, transparent, and ensure democratic representation of all political groups. +""" + + def _format_political_group_distribution(self) -> str: + """Format political group distribution for the speaker prompt.""" + + if not self.political_group_boards: + return "No political groups available" + + lines = [] + for group_name, board in self.political_group_boards.items(): + percentage = board.voting_weight * 100 + lines.append(f"- {group_name}: {board.total_meps} MEPs ({percentage:.1f}%)") + + return "\n".join(lines) + + def introduce_bill( + self, + title: str, + description: str, + bill_type: VoteType, + committee: str, + sponsor: str, + co_sponsors: List[str] = None + ) -> ParliamentaryBill: + """ + Introduce a new bill to the parliament. + + Args: + title: Bill title + description: Bill description + bill_type: Type of legislative procedure + committee: Primary committee + sponsor: Sponsoring MEP + co_sponsors: List of co-sponsoring MEPs + + Returns: + ParliamentaryBill: The introduced bill + """ + if sponsor not in self.meps: + raise ValueError(f"Sponsor {sponsor} is not a valid MEP") + + if committee not in self.committees: + raise ValueError(f"Committee {committee} does not exist") + + bill = ParliamentaryBill( + title=title, + description=description, + bill_type=bill_type, + committee=committee, + sponsor=sponsor, + co_sponsors=co_sponsors or [] + ) + + self.bills.append(bill) + self.committees[committee].current_bills.append(bill) + + logger.info(f"Bill '{title}' introduced by {sponsor} in {committee} committee") + return bill + + def conduct_committee_hearing( + self, + committee: str, + bill: ParliamentaryBill, + participants: List[str] = None + ) -> Dict[str, Any]: + """ + Conduct a committee hearing on a bill with cost optimization. + + Args: + committee: Committee name + bill: Bill under consideration + participants: List of MEPs to participate + + Returns: + Dict[str, Any]: Hearing results and transcript + """ + if committee not in self.committees: + raise ValueError(f"Committee {committee} does not exist") + + # Check budget before starting + if not self.cost_tracker.check_budget(): + return {"error": "Budget exceeded", "cost_stats": self.cost_tracker.get_stats()} + + committee_meps = self.committees[committee].members + if not participants: + participants = committee_meps[:10] # Limit to 10 participants + + # Check cache first + cache_key = self._get_cache_key(f"committee_hearing_{committee}_{bill.title}", participants) + cached_result = self._check_cache(cache_key) + if cached_result: + return { + "committee": committee, + "bill": bill.title, + "participants": participants, + "responses": cached_result, + "date": datetime.now(), + "cached": True, + "cost_stats": self.cost_tracker.get_stats() + } + + hearing_prompt = f""" + Committee Hearing: {committee} + Bill: {bill.title} + Description: {bill.description} + + As a member of the {committee} committee, please provide your analysis and recommendations for this bill. + Consider: + 1. Technical feasibility and legal compliance + 2. Impact on European citizens and businesses + 3. Alignment with EU policies and values + 4. Potential amendments or improvements + 5. Your recommendation for the full parliament + + Provide a detailed analysis with specific recommendations. + """ + + # Load MEP agents in batches + all_responses = {} + total_processed = 0 + + for i in range(0, len(participants), self.batch_size): + batch_participants = participants[i:i + self.batch_size] + + # Check budget for this batch + if not self.cost_tracker.check_budget(): + logger.warning(f"Budget exceeded after processing {total_processed} participants") + break + + # Load agents for this batch + batch_agents = self._load_mep_agents_batch(batch_participants) + + if not batch_agents: + continue + + # Run batch + try: + batch_results = run_agents_concurrently(batch_agents, hearing_prompt) + + # Map results back to participant names + for j, agent in enumerate(batch_agents): + if j < len(batch_results): + participant_name = batch_participants[j] + all_responses[participant_name] = batch_results[j] + total_processed += 1 + + # Estimate tokens used + estimated_tokens = len(batch_agents) * 500 # ~500 tokens per response + self.cost_tracker.add_tokens(estimated_tokens) + + if self.verbose: + logger.info(f"Processed committee hearing batch {i//self.batch_size + 1}: {len(batch_agents)} participants") + + except Exception as e: + logger.error(f"Error processing committee hearing batch: {e}") + continue + + # Cache the results + if all_responses: + self._cache_response(cache_key, str(all_responses)) + + hearing_result = { + "committee": committee, + "bill": bill.title, + "participants": participants[:total_processed], + "responses": all_responses, + "date": datetime.now(), + "cached": False, + "cost_stats": self.cost_tracker.get_stats(), + "recommendations": self._synthesize_committee_recommendations(all_responses) + } + + logger.info(f"Committee hearing completed for {bill.title} in {committee}") + return hearing_result + + def _synthesize_committee_recommendations(self, responses: Dict[str, str]) -> Dict[str, Any]: + """ + Synthesize committee recommendations from individual responses. + + Args: + responses: Dictionary of MEP responses + + Returns: + Dict[str, Any]: Synthesized recommendations + """ + # Simple synthesis - in a real implementation, this would be more sophisticated + support_count = 0 + oppose_count = 0 + amend_count = 0 + + for response in responses.values(): + response_lower = response.lower() + if any(word in response_lower for word in ["support", "approve", "recommend", "favorable"]): + support_count += 1 + elif any(word in response_lower for word in ["oppose", "reject", "against", "unfavorable"]): + oppose_count += 1 + elif any(word in response_lower for word in ["amend", "modify", "improve", "revise"]): + amend_count += 1 + + total = len(responses) + + return { + "support_percentage": (support_count / total) * 100 if total > 0 else 0, + "oppose_percentage": (oppose_count / total) * 100 if total > 0 else 0, + "amend_percentage": (amend_count / total) * 100 if total > 0 else 0, + "recommendation": "support" if support_count > oppose_count else "oppose" if oppose_count > support_count else "amend" + } + + def conduct_parliamentary_debate( + self, + bill: ParliamentaryBill, + participants: List[str] = None, + max_speakers: int = 20 + ) -> Dict[str, Any]: + """ + Conduct a parliamentary debate on a bill with cost optimization. + + Args: + bill: Bill under debate + participants: List of MEPs to participate + max_speakers: Maximum number of speakers + + Returns: + Dict[str, Any]: Debate transcript and analysis + """ + # Check budget before starting + if not self.cost_tracker.check_budget(): + return {"error": "Budget exceeded", "cost_stats": self.cost_tracker.get_stats()} + + if not participants: + # Select diverse participants from different political groups + participants = [] + for group_name, meps in self.political_groups.items(): + if len(meps) > 0: + participants.extend(meps[:3]) # 3 MEPs per group + if len(participants) >= max_speakers: + break + + participants = participants[:max_speakers] + + # Check cache first + cache_key = self._get_cache_key(f"parliamentary_debate_{bill.title}", participants) + cached_result = self._check_cache(cache_key) + if cached_result: + return { + "bill": bill.title, + "participants": participants, + "transcript": cached_result, + "date": datetime.now(), + "cached": True, + "cost_stats": self.cost_tracker.get_stats() + } + + debate_prompt = f""" + Parliamentary Debate: {bill.title} + + You are participating in a parliamentary debate on this bill. Please provide your position and arguments. + + Bill Description: {bill.description} + Bill Type: {bill.bill_type.value} + + Consider: + 1. Your political group's position on this issue + 2. Impact on your country and constituents + 3. European-wide implications + 4. Your areas of expertise + 5. Potential amendments or alternatives + + Provide a clear, reasoned argument for your position. + """ + + # Conduct debate with batching + debate_transcript = [] + total_processed = 0 + + for i in range(0, len(participants), self.batch_size): + batch_participants = participants[i:i + self.batch_size] + + # Check budget for this batch + if not self.cost_tracker.check_budget(): + logger.warning(f"Budget exceeded after processing {total_processed} speakers") + break + + # Load agents for this batch + batch_agents = self._load_mep_agents_batch(batch_participants) + + if not batch_agents: + continue + + # Run batch + try: + batch_results = run_agents_concurrently(batch_agents, debate_prompt) + + # Create debate entries + for j, agent in enumerate(batch_agents): + if j < len(batch_results): + participant_name = batch_participants[j] + mep = self.meps[participant_name] + + debate_entry = { + "speaker": participant_name, + "political_group": mep.political_group, + "country": mep.country, + "position": batch_results[j], + "timestamp": datetime.now() + } + debate_transcript.append(debate_entry) + total_processed += 1 + + # Estimate tokens used + estimated_tokens = len(batch_agents) * 500 # ~500 tokens per response + self.cost_tracker.add_tokens(estimated_tokens) + + if self.verbose: + logger.info(f"Processed debate batch {i//self.batch_size + 1}: {len(batch_agents)} speakers") + + except Exception as e: + logger.error(f"Error processing debate batch: {e}") + continue + + # Cache the results + if debate_transcript: + self._cache_response(cache_key, str(debate_transcript)) + + debate_result = { + "bill": bill.title, + "participants": participants[:total_processed], + "transcript": debate_transcript, + "date": datetime.now(), + "cached": False, + "cost_stats": self.cost_tracker.get_stats(), + "analysis": self._analyze_debate(debate_transcript) + } + + self.debates.append(debate_result) + logger.info(f"Parliamentary debate completed for {bill.title} with {total_processed} speakers") + return debate_result + + def _analyze_debate(self, transcript: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + Analyze debate transcript for key themes and positions. + + Args: + transcript: Debate transcript + + Returns: + Dict[str, Any]: Debate analysis + """ + # Simple analysis - in a real implementation, this would use NLP + support_count = 0 + oppose_count = 0 + neutral_count = 0 + + for entry in transcript: + position = entry["position"].lower() + if any(word in position for word in ["support", "approve", "favorable", "yes"]): + support_count += 1 + elif any(word in position for word in ["oppose", "reject", "against", "no"]): + oppose_count += 1 + else: + neutral_count += 1 + + total = len(transcript) + + return { + "support_count": support_count, + "oppose_count": oppose_count, + "neutral_count": neutral_count, + "support_percentage": (support_count / total) * 100 if total > 0 else 0, + "oppose_percentage": (oppose_count / total) * 100 if total > 0 else 0, + "neutral_percentage": (neutral_count / total) * 100 if total > 0 else 0 + } + + def conduct_democratic_vote( + self, + bill: ParliamentaryBill, + participants: List[str] = None + ) -> ParliamentaryVote: + """ + Conduct a democratic vote on a bill using the Board of Directors pattern with lazy loading. + + Args: + bill: Bill to vote on + participants: List of MEPs to participate + + Returns: + ParliamentaryVote: Vote results + """ + # Check budget before starting + if not self.cost_tracker.check_budget(): + return ParliamentaryVote( + bill=bill, + vote_type=bill.bill_type, + result=VoteResult.FAILED + ) + + if not participants: + participants = list(self.meps.keys()) + + # Use democratic swarm for decision-making if available + democratic_result = None + if self.democratic_swarm is not None: + decision_task = f""" + Parliamentary Vote: {bill.title} + + Bill Description: {bill.description} + Bill Type: {bill.bill_type.value} + + As a democratic decision-making body, please: + 1. Analyze the bill's merits and implications + 2. Consider the interests of all European citizens + 3. Evaluate alignment with European values and policies + 4. Make a democratic decision on whether to support or oppose this bill + 5. Provide reasoning for your decision + + This is a critical legislative decision that will affect all EU citizens. + """ + + # Get democratic decision + democratic_result = self.democratic_swarm.run_board_meeting(decision_task) + + # Conduct individual MEP votes with lazy loading + individual_votes = {} + reasoning = {} + total_processed = 0 + + # Process participants in batches + for i in range(0, len(participants), self.batch_size): + batch_participants = participants[i:i + self.batch_size] + + # Check budget for this batch + if not self.cost_tracker.check_budget(): + logger.warning(f"Budget exceeded after processing {total_processed} voters") + break + + # Load agents for this batch + batch_agents = self._load_mep_agents_batch(batch_participants) + + if not batch_agents: + continue + + # Create voting prompt + vote_prompt = f""" + Vote on Bill: {bill.title} + + {bill.description} + + {f"Democratic Council Decision: {democratic_result.plan}" if democratic_result else "No democratic council decision available."} + + As an MEP, please vote on this bill. Consider: + 1. The democratic council's analysis (if available) + 2. Your political group's position + 3. Your constituents' interests + 4. European-wide implications + + Respond with 'FOR', 'AGAINST', or 'ABSTAIN' and explain your reasoning. + """ + + # Run batch voting + try: + batch_results = run_agents_concurrently(batch_agents, vote_prompt) + + # Process results + for j, agent in enumerate(batch_agents): + if j < len(batch_results): + participant_name = batch_participants[j] + response = batch_results[j] + + # Parse vote + response_lower = response.lower() + if any(word in response_lower for word in ["for", "support", "yes", "approve"]): + vote = "FOR" + elif any(word in response_lower for word in ["against", "oppose", "no", "reject"]): + vote = "AGAINST" + else: + vote = "ABSTAIN" + + individual_votes[participant_name] = vote + reasoning[participant_name] = response + total_processed += 1 + + # Estimate tokens used + estimated_tokens = len(batch_agents) * 500 # ~500 tokens per response + self.cost_tracker.add_tokens(estimated_tokens) + + if self.verbose: + logger.info(f"Processed voting batch {i//self.batch_size + 1}: {len(batch_agents)} voters") + + except Exception as e: + logger.error(f"Error processing voting batch: {e}") + continue + + # Calculate results + votes_for = sum(1 for vote in individual_votes.values() if vote == "FOR") + votes_against = sum(1 for vote in individual_votes.values() if vote == "AGAINST") + abstentions = sum(1 for vote in individual_votes.values() if vote == "ABSTAIN") + absent = len(participants) - len(individual_votes) + + # Determine result + if votes_for > votes_against: + result = VoteResult.PASSED + elif votes_against > votes_for: + result = VoteResult.FAILED + else: + result = VoteResult.TIED + + vote_result = ParliamentaryVote( + bill=bill, + vote_type=bill.bill_type, + votes_for=votes_for, + votes_against=votes_against, + abstentions=abstentions, + absent=absent, + result=result, + individual_votes=individual_votes, + reasoning=reasoning + ) + + self.votes.append(vote_result) + bill.status = "voted" + + logger.info(f"Democratic vote completed for {bill.title}: {result.value} ({total_processed} voters processed)") + return vote_result + + def conduct_hierarchical_democratic_vote( + self, + bill: ParliamentaryBill, + participants: List[str] = None + ) -> ParliamentaryVote: + """ + Conduct a hierarchical democratic vote using political group boards and parliament speaker. + + This enhanced voting system: + 1. Each political group votes internally as a specialized board + 2. Group speakers (CEOs) synthesize their group's position + 3. Parliament Speaker aggregates all group decisions based on percentage representation + 4. Final result calculated using weighted voting + + Args: + bill: Bill to vote on + participants: List of MEPs to participate (optional, uses all by default) + + Returns: + ParliamentaryVote: Enhanced vote results with group-level analysis + """ + + if not self.enable_hierarchical_democracy: + logger.warning("Hierarchical democracy not enabled, falling back to standard voting") + return self.conduct_democratic_vote(bill, participants) + + logger.info(f"Conducting hierarchical democratic vote on: {bill.title}") + + # Initialize vote tracking + vote = ParliamentaryVote( + bill=bill, + vote_type=bill.bill_type, + date=datetime.now() + ) + + # Step 1: Each political group votes internally + group_decisions = {} + group_reasoning = {} + + for group_name, group_board in self.political_group_boards.items(): + if not group_board.board_swarm: + continue + + logger.info(f"Conducting internal vote for {group_name}") + + # Create voting task for this group + voting_task = f""" + Parliamentary Vote: {bill.title} + + Bill Description: {bill.description} + Bill Type: {bill.bill_type.value} + Committee: {bill.committee} + Sponsor: {bill.sponsor} + + As a specialized board representing {group_name} with expertise in {', '.join(group_board.expertise_areas[:3])}, + please analyze this bill and provide your group's position. + + Consider: + 1. How does this bill align with your political group's values and priorities? + 2. What are the economic, social, and legal implications? + 3. How does it affect your areas of expertise? + 4. What amendments or modifications would you suggest? + + Provide your group's decision: POSITIVE, NEGATIVE, or ABSTAIN + Include detailed reasoning for your position. + """ + + try: + # Get group decision using their specialized board + group_result = group_board.board_swarm.run(voting_task) + + # Parse the group decision + group_decision = self._parse_group_decision(group_result) + group_decisions[group_name] = group_decision + group_reasoning[group_name] = group_result + + logger.info(f"{group_name} decision: {group_decision}") + + except Exception as e: + logger.error(f"Error in {group_name} vote: {e}") + group_decisions[group_name] = "ABSTAIN" + group_reasoning[group_name] = f"Error during voting: {str(e)}" + + # Step 2: Parliament Speaker aggregates group decisions + if self.parliament_speaker and self.parliament_speaker.agent: + logger.info("Parliament Speaker aggregating group decisions") + + aggregation_task = f""" + Parliamentary Vote Aggregation: {bill.title} + + Political Group Decisions: + {self._format_group_decisions(group_decisions, group_reasoning)} + + Political Group Distribution: + {self._format_political_group_distribution()} + + As Parliament Speaker, calculate the final result based on: + 1. Each group's decision (POSITIVE/NEGATIVE/ABSTAIN) + 2. Each group's voting weight (percentage of parliament) + 3. Majority threshold: {self.parliament_speaker.majority_threshold} MEPs + + Provide: + 1. Final result: PASSED, FAILED, or TIED + 2. Vote counts: For, Against, Abstentions + 3. Weighted analysis of each group's contribution + 4. Summary of the democratic process + """ + + try: + speaker_result = self.parliament_speaker.agent.run(aggregation_task) + + # Parse speaker's analysis + final_result = self._parse_speaker_analysis(speaker_result, group_decisions) + + # Update vote with results + vote.result = final_result['result'] + vote.votes_for = final_result['votes_for'] + vote.votes_against = final_result['votes_against'] + vote.abstentions = final_result['abstentions'] + vote.individual_votes = group_decisions + vote.reasoning = group_reasoning + + logger.info(f"Final result: {vote.result.value}") + logger.info(f"Votes - For: {vote.votes_for}, Against: {vote.votes_against}, Abstain: {vote.abstentions}") + + except Exception as e: + logger.error(f"Error in speaker aggregation: {e}") + # Fallback to simple counting + vote = self._fallback_vote_calculation(vote, group_decisions) + + # Store the vote + self.votes.append(vote) + + return vote + + def _parse_group_decision(self, group_result: str) -> str: + """Parse the decision from a political group's voting result.""" + + result_lower = group_result.lower() + + if any(word in result_lower for word in ['positive', 'for', 'support', 'approve', 'pass']): + return "POSITIVE" + elif any(word in result_lower for word in ['negative', 'against', 'oppose', 'reject', 'fail']): + return "NEGATIVE" + else: + return "ABSTAIN" + + def _format_group_decisions(self, group_decisions: Dict[str, str], group_reasoning: Dict[str, str]) -> str: + """Format group decisions for the speaker's analysis.""" + + lines = [] + for group_name, decision in group_decisions.items(): + board = self.political_group_boards.get(group_name) + if board: + percentage = board.voting_weight * 100 + reasoning = group_reasoning.get(group_name, "No reasoning provided") + lines.append(f"- {group_name} ({board.total_meps} MEPs, {percentage:.1f}%): {decision}") + lines.append(f" Reasoning: {reasoning[:200]}...") + + return "\n".join(lines) + + def _parse_speaker_analysis(self, speaker_result: str, group_decisions: Dict[str, str]) -> Dict[str, Any]: + """Parse the Parliament Speaker's analysis to extract final vote results using dual-layer percentage system.""" + + # Initialize counters + votes_for = 0 + votes_against = 0 + abstentions = 0 + + # Calculate weighted votes using dual-layer percentage system + for group_name, decision in group_decisions.items(): + board = self.political_group_boards.get(group_name) + if board and board.board_member_percentages: + # Calculate weighted votes using individual board member percentages + group_weighted_votes = self._calculate_group_weighted_votes(board, decision) + + if decision == "POSITIVE": + votes_for += group_weighted_votes + elif decision == "NEGATIVE": + votes_against += group_weighted_votes + else: # ABSTAIN + abstentions += group_weighted_votes + else: + # Fallback to simple calculation if no individual percentages available + if board: + weighted_votes = int(board.total_meps * board.voting_weight) + + if decision == "POSITIVE": + votes_for += weighted_votes + elif decision == "NEGATIVE": + votes_against += weighted_votes + else: # ABSTAIN + abstentions += weighted_votes + + # Determine result + if votes_for > votes_against: + result = VoteResult.PASSED + elif votes_against > votes_for: + result = VoteResult.FAILED + else: + result = VoteResult.TIED + + return { + 'result': result, + 'votes_for': votes_for, + 'votes_against': votes_against, + 'abstentions': abstentions + } + + def _calculate_group_weighted_votes(self, board: PoliticalGroupBoard, decision: str) -> int: + """Calculate weighted votes for a political group using individual board member percentages.""" + + total_weighted_votes = 0 + + # Calculate votes based on individual board member percentages + for member_name, internal_percentage in board.board_member_percentages.items(): + # Convert internal percentage to parliament percentage + # internal_percentage is percentage within the group + # board.voting_weight is group's percentage of parliament + parliament_percentage = internal_percentage * board.voting_weight + + # Calculate weighted votes for this member + member_weighted_votes = int(board.total_meps * parliament_percentage) + total_weighted_votes += member_weighted_votes + + if self.verbose: + logger.debug(f"{member_name}: {internal_percentage:.1%} of {board.group_name} " + f"({board.voting_weight:.1%} of parliament) = {parliament_percentage:.3%} " + f"= {member_weighted_votes} weighted votes") + + return total_weighted_votes + + def _fallback_vote_calculation(self, vote: ParliamentaryVote, group_decisions: Dict[str, str]) -> ParliamentaryVote: + """Fallback vote calculation if speaker analysis fails.""" + + votes_for = 0 + votes_against = 0 + abstentions = 0 + + for group_name, decision in group_decisions.items(): + board = self.political_group_boards.get(group_name) + if board: + if decision == "POSITIVE": + votes_for += board.total_meps + elif decision == "NEGATIVE": + votes_against += board.total_meps + else: + abstentions += board.total_meps + + vote.votes_for = votes_for + vote.votes_against = votes_against + vote.abstentions = abstentions + + if votes_for > votes_against: + vote.result = VoteResult.PASSED + elif votes_against > votes_for: + vote.result = VoteResult.FAILED + else: + vote.result = VoteResult.TIED + + return vote + + def get_parliament_composition(self) -> Dict[str, Any]: + """ + Get the current composition of the parliament including cost statistics. + + Returns: + Dict[str, Any]: Parliament composition statistics + """ + composition = { + "total_meps": len(self.meps), + "loaded_meps": len([mep for mep in self.meps.values() if mep.is_loaded]), + "political_groups": {}, + "countries": {}, + "leadership": {}, + "committees": {}, + "cost_stats": self.cost_tracker.get_stats(), + "optimization": { + "lazy_loading": self.enable_lazy_loading, + "caching": self.enable_caching, + "batch_size": self.batch_size, + "budget_limit": self.cost_tracker.budget_limit + } + } + + # Political group breakdown + for group_name, meps in self.political_groups.items(): + composition["political_groups"][group_name] = { + "count": len(meps), + "percentage": (len(meps) / len(self.meps)) * 100 + } + + # Country breakdown + country_counts = {} + for mep in self.meps.values(): + country = mep.country + country_counts[country] = country_counts.get(country, 0) + 1 + + composition["countries"] = country_counts + + # Leadership positions + leadership = {} + for mep in self.meps.values(): + if mep.role != ParliamentaryRole.MEP: + role = mep.role.value + if role not in leadership: + leadership[role] = [] + leadership[role].append(mep.full_name) + + composition["leadership"] = leadership + + # Committee composition + for committee_name, committee in self.committees.items(): + composition["committees"][committee_name] = { + "chair": committee.chair, + "vice_chair": committee.vice_chair, + "member_count": len(committee.members), + "current_bills": len(committee.current_bills) + } + + return composition + + def get_cost_statistics(self) -> Dict[str, Any]: + """ + Get detailed cost statistics for the parliamentary operations. + + Returns: + Dict[str, Any]: Cost statistics and optimization metrics + """ + stats = self.cost_tracker.get_stats() + + # Add additional metrics + stats.update({ + "total_meps": len(self.meps), + "loaded_meps": len([mep for mep in self.meps.values() if mep.is_loaded]), + "loading_efficiency": len([mep for mep in self.meps.values() if mep.is_loaded]) / len(self.meps) if self.meps else 0, + "cache_size": len(self.response_cache), + "optimization_enabled": { + "lazy_loading": self.enable_lazy_loading, + "caching": self.enable_caching, + "batching": self.batch_size > 1 + } + }) + + return stats + + def run_optimized_parliamentary_session( + self, + bill_title: str, + bill_description: str, + bill_type: VoteType = VoteType.ORDINARY_LEGISLATIVE_PROCEDURE, + committee: str = "Legal Affairs", + sponsor: str = None, + max_cost: float = 50.0 + ) -> Dict[str, Any]: + """ + Run a complete parliamentary session with cost optimization. + + Args: + bill_title: Title of the bill + bill_description: Description of the bill + bill_type: Type of legislative procedure + committee: Primary committee + sponsor: Sponsoring MEP (random if not specified) + max_cost: Maximum cost for this session + + Returns: + Dict[str, Any]: Complete session results with cost tracking + """ + # Set temporary budget for this session + original_budget = self.cost_tracker.budget_limit + self.cost_tracker.budget_limit = min(original_budget, max_cost) + + try: + # Select sponsor if not provided + if not sponsor: + sponsor = random.choice(list(self.meps.keys())) + + # Introduce bill + bill = self.introduce_bill( + title=bill_title, + description=bill_description, + bill_type=bill_type, + committee=committee, + sponsor=sponsor + ) + + # Conduct committee hearing + hearing = self.conduct_committee_hearing(committee, bill) + + # Conduct parliamentary debate + debate = self.conduct_parliamentary_debate(bill) + + # Conduct democratic vote + vote = self.conduct_democratic_vote(bill) + + session_result = { + "bill": bill, + "hearing": hearing, + "debate": debate, + "vote": vote, + "cost_stats": self.cost_tracker.get_stats(), + "session_summary": { + "bill_title": bill_title, + "sponsor": sponsor, + "committee": committee, + "hearing_recommendation": hearing.get("recommendations", {}).get("recommendation", "unknown"), + "debate_support_percentage": debate.get("analysis", {}).get("support_percentage", 0), + "vote_result": vote.result.value, + "final_outcome": "PASSED" if vote.result == VoteResult.PASSED else "FAILED", + "total_cost": self.cost_tracker.total_cost_estimate + } + } + + logger.info(f"Optimized parliamentary session completed for {bill_title}: {session_result['session_summary']['final_outcome']}") + logger.info(f"Session cost: ${self.cost_tracker.total_cost_estimate:.2f}") + + return session_result + + finally: + # Restore original budget + self.cost_tracker.budget_limit = original_budget + + def run_democratic_session( + self, + bill_title: str, + bill_description: str, + bill_type: VoteType = VoteType.ORDINARY_LEGISLATIVE_PROCEDURE, + committee: str = "Legal Affairs", + sponsor: str = None + ) -> Dict[str, Any]: + """ + Run a complete democratic parliamentary session on a bill. + + Args: + bill_title: Title of the bill + bill_description: Description of the bill + bill_type: Type of legislative procedure + committee: Primary committee + sponsor: Sponsoring MEP (random if not specified) + + Returns: + Dict[str, Any]: Complete session results + """ + # Select sponsor if not provided + if not sponsor: + sponsor = random.choice(list(self.meps.keys())) + + # Introduce bill + bill = self.introduce_bill( + title=bill_title, + description=bill_description, + bill_type=bill_type, + committee=committee, + sponsor=sponsor + ) + + # Conduct committee hearing + hearing = self.conduct_committee_hearing(committee, bill) + + # Conduct parliamentary debate + debate = self.conduct_parliamentary_debate(bill) + + # Conduct democratic vote + vote = self.conduct_democratic_vote(bill) + + session_result = { + "bill": bill, + "hearing": hearing, + "debate": debate, + "vote": vote, + "session_summary": { + "bill_title": bill_title, + "sponsor": sponsor, + "committee": committee, + "hearing_recommendation": hearing["recommendations"]["recommendation"], + "debate_support_percentage": debate["analysis"]["support_percentage"], + "vote_result": vote.result.value, + "final_outcome": "PASSED" if vote.result == VoteResult.PASSED else "FAILED" + } + } + + logger.info(f"Democratic session completed for {bill_title}: {session_result['session_summary']['final_outcome']}") + return session_result + + def run_hierarchical_democratic_session( + self, + bill_title: str, + bill_description: str, + bill_type: VoteType = VoteType.ORDINARY_LEGISLATIVE_PROCEDURE, + committee: str = "Legal Affairs", + sponsor: str = None + ) -> Dict[str, Any]: + """ + Run a complete hierarchical democratic session from bill introduction to final vote. + + This enhanced session uses: + 1. Political group boards with specialized expertise + 2. Group-level internal voting and discussion + 3. Parliament Speaker aggregation of group decisions + 4. Weighted voting based on political group percentages + + Args: + bill_title: Title of the bill + bill_description: Description of the bill + bill_type: Type of legislative procedure + committee: Committee responsible for the bill + sponsor: MEP sponsoring the bill + + Returns: + Dict[str, Any]: Complete session results including group decisions and final vote + """ + + if not self.enable_hierarchical_democracy: + logger.warning("Hierarchical democracy not enabled, falling back to standard session") + return self.run_democratic_session(bill_title, bill_description, bill_type, committee, sponsor) + + logger.info(f"Starting hierarchical democratic session: {bill_title}") + + # Step 1: Introduce the bill + if not sponsor: + sponsor = list(self.meps.keys())[0] # Use first MEP as sponsor + + bill = self.introduce_bill( + title=bill_title, + description=bill_description, + bill_type=bill_type, + committee=committee, + sponsor=sponsor + ) + + # Step 2: Conduct committee hearing (if enabled) + committee_result = None + if self.enable_committee_work: + logger.info(f"Conducting committee hearing in {committee}") + committee_result = self.conduct_committee_hearing(committee, bill) + + # Step 3: Conduct parliamentary debate (if enabled) + debate_result = None + if self.enable_democratic_discussion: + logger.info("Conducting parliamentary debate") + debate_result = self.conduct_parliamentary_debate(bill) + + # Step 4: Conduct hierarchical democratic vote + logger.info("Conducting hierarchical democratic vote") + vote_result = self.conduct_hierarchical_democratic_vote(bill) + + # Step 5: Compile comprehensive session report + session_report = { + "session_type": "hierarchical_democratic", + "bill": { + "title": bill.title, + "description": bill.description, + "type": bill.bill_type.value, + "committee": bill.committee, + "sponsor": bill.sponsor, + "status": bill.status + }, + "committee_work": committee_result, + "parliamentary_debate": debate_result, + "vote_results": { + "final_result": vote_result.result.value, + "votes_for": vote_result.votes_for, + "votes_against": vote_result.votes_against, + "abstentions": vote_result.abstentions, + "total_votes": vote_result.votes_for + vote_result.votes_against + vote_result.abstentions + }, + "political_group_decisions": vote_result.individual_votes, + "group_reasoning": vote_result.reasoning, + "parliament_composition": self.get_parliament_composition(), + "session_summary": self._generate_hierarchical_session_summary(bill, vote_result) + } + + logger.info(f"Hierarchical democratic session completed. Final result: {vote_result.result.value}") + + return session_report + + def _generate_hierarchical_session_summary(self, bill: ParliamentaryBill, vote: ParliamentaryVote) -> str: + """Generate a summary of the hierarchical democratic session with dual-layer percentage breakdown.""" + + total_votes = vote.votes_for + vote.votes_against + vote.abstentions + participation_rate = (total_votes / len(self.meps)) * 100 if self.meps else 0 + + summary = f""" +🏛️ HIERARCHICAL DEMOCRATIC SESSION SUMMARY + +📋 Bill: {bill.title} +📊 Final Result: {vote.result.value} +📈 Participation Rate: {participation_rate:.1f}% + +🗳️ VOTE BREAKDOWN: +• For: {vote.votes_for} votes +• Against: {vote.votes_against} votes +• Abstentions: {vote.abstentions} votes + +🏛️ POLITICAL GROUP DECISIONS (Dual-Layer Percentage System): +""" + + for group_name, decision in vote.individual_votes.items(): + board = self.political_group_boards.get(group_name) + if board: + group_percentage = board.voting_weight * 100 + summary += f"\n• {group_name}: {decision} ({board.total_meps} MEPs, {group_percentage:.1f}% of parliament)" + + # Show individual board member percentages + if board.board_member_percentages: + summary += f"\n 📊 Board Member Breakdown:" + for member_name, internal_percentage in board.board_member_percentages.items(): + parliament_percentage = internal_percentage * board.voting_weight * 100 + summary += f"\n - {member_name}: {internal_percentage:.1%} of group = {parliament_percentage:.3f}% of parliament" + + summary += f"\n\n🎯 DUAL-LAYER DEMOCRATIC PROCESS:" + summary += f"\n• Each political group operates as a specialized board" + summary += f"\n• Board members have individual percentages within their group" + summary += f"\n• Individual percentages × Group percentage = Parliament percentage" + summary += f"\n• Parliament Speaker aggregates all weighted decisions" + summary += f"\n• Final result based on {len(self.political_group_boards)} political groups with {sum(len(board.board_member_percentages) for board in self.political_group_boards.values())} board members" + + return summary + + def get_mep(self, mep_name: str) -> Optional[ParliamentaryMember]: + """ + Get a specific MEP by name. + + Args: + mep_name: Name of the MEP + + Returns: + Optional[ParliamentaryMember]: MEP if found, None otherwise + """ + return self.meps.get(mep_name) + + def get_committee(self, committee_name: str) -> Optional[ParliamentaryCommittee]: + """ + Get a specific committee by name. + + Args: + committee_name: Name of the committee + + Returns: + Optional[ParliamentaryCommittee]: Committee if found, None otherwise + """ + return self.committees.get(committee_name) + + def get_political_group_members(self, group_name: str) -> List[str]: + """ + Get all MEPs in a specific political group. + + Args: + group_name: Name of the political group + + Returns: + List[str]: List of MEP names in the group + """ + return self.political_groups.get(group_name, []) + + def get_country_members(self, country: str) -> List[str]: + """ + Get all MEPs from a specific country. + + Args: + country: Name of the country + + Returns: + List[str]: List of MEP names from the country + """ + return [mep_name for mep_name, mep in self.meps.items() if mep.country == country] + + def _load_wikipedia_personalities(self): + """Load Wikipedia personality profiles for MEPs.""" + + if not self.enable_wikipedia_personalities: + return + + try: + # Initialize personality scraper + self.personality_scraper = WikipediaPersonalityScraper( + output_dir="mep_personalities", + verbose=self.verbose + ) + + # Load existing personality profiles + personality_dir = "mep_personalities" + if os.path.exists(personality_dir): + profile_files = [f for f in os.listdir(personality_dir) if f.endswith('.json')] + + for filename in profile_files: + filepath = os.path.join(personality_dir, filename) + try: + profile = self.personality_scraper.load_personality_profile(filepath) + self.personality_profiles[profile.full_name] = profile + + if self.verbose: + logger.debug(f"Loaded personality profile: {profile.full_name}") + + except Exception as e: + logger.warning(f"Error loading personality profile {filename}: {e}") + + if self.verbose: + logger.info(f"Loaded {len(self.personality_profiles)} Wikipedia personality profiles") + else: + if self.verbose: + logger.info("No existing personality profiles found. Run Wikipedia scraper to create profiles.") + + except Exception as e: + logger.error(f"Error loading Wikipedia personalities: {e}") + self.enable_wikipedia_personalities = False + + def scrape_wikipedia_personalities(self, delay: float = 1.0) -> Dict[str, str]: + """ + Scrape Wikipedia personality data for all MEPs. + + Args: + delay: Delay between requests to be respectful to Wikipedia + + Returns: + Dictionary mapping MEP names to their personality profile file paths + """ + + if not self.enable_wikipedia_personalities: + logger.error("Wikipedia personality system not available") + return {} + + if not self.personality_scraper: + self.personality_scraper = WikipediaPersonalityScraper( + output_dir="mep_personalities", + verbose=self.verbose + ) + + logger.info("Starting Wikipedia personality scraping for all MEPs...") + profile_files = self.personality_scraper.scrape_all_mep_personalities( + xml_file=self.eu_data_file, + delay=delay + ) + + # Reload personality profiles + self._load_wikipedia_personalities() + + return profile_files + + def get_mep_personality_profile(self, mep_name: str) -> Optional[MEPPersonalityProfile]: + """ + Get personality profile for a specific MEP. + + Args: + mep_name: Name of the MEP + + Returns: + MEPPersonalityProfile if found, None otherwise + """ + return self.personality_profiles.get(mep_name) + + def analyze_political_landscape(self, bill: ParliamentaryBill) -> Dict[str, Any]: + """ + Analyze the political landscape for a bill to predict voting outcomes. + + Args: + bill: Bill to analyze + + Returns: + Dict[str, Any]: Political analysis results + """ + analysis = { + "overall_support": 0.0, + "opposition": 0.0, + "uncertainty": 0.0, + "group_analysis": {} + } + + # Analyze by political group + for group_name, meps in self.political_groups.items(): + if not meps: + continue + + # Simple analysis based on political group alignment + group_support = 0.0 + group_opposition = 0.0 + + # Assign support based on political group characteristics + if "Green" in group_name or "Environment" in bill.description: + group_support = 75.0 + group_opposition = 15.0 + elif "Socialist" in group_name or "Social" in bill.description: + group_support = 70.0 + group_opposition = 20.0 + elif "Conservative" in group_name or "Economic" in bill.description: + group_support = 60.0 + group_opposition = 30.0 + elif "Liberal" in group_name or "Digital" in bill.description: + group_support = 65.0 + group_opposition = 25.0 + else: + group_support = 50.0 + group_opposition = 30.0 + + group_uncertainty = 100.0 - group_support - group_opposition + + analysis["group_analysis"][group_name] = { + "support": group_support, + "opposition": group_opposition, + "uncertainty": group_uncertainty, + "mep_count": len(meps) + } + + # Calculate overall support weighted by group size + total_meps = len(self.meps) + if total_meps > 0: + weighted_support = 0.0 + weighted_opposition = 0.0 + weighted_uncertainty = 0.0 + + for group_name, group_data in analysis["group_analysis"].items(): + weight = group_data["mep_count"] / total_meps + weighted_support += group_data["support"] * weight + weighted_opposition += group_data["opposition"] * weight + weighted_uncertainty += group_data["uncertainty"] * weight + + analysis["overall_support"] = weighted_support + analysis["opposition"] = weighted_opposition + analysis["uncertainty"] = weighted_uncertainty + + return analysis \ No newline at end of file diff --git a/examples/simulations/euroswarm_parliament/euroswarm_parliament_example.py b/examples/simulations/euroswarm_parliament/euroswarm_parliament_example.py new file mode 100644 index 00000000..7dc60d02 --- /dev/null +++ b/examples/simulations/euroswarm_parliament/euroswarm_parliament_example.py @@ -0,0 +1,521 @@ +""" +EuroSwarm Parliament - Example Script + +This script demonstrates the comprehensive democratic functionality of the EuroSwarm Parliament, +including bill introduction, committee work, parliamentary debates, and democratic voting. +""" + +import json +import time +from datetime import datetime + +# Import directly from the file +from euroswarm_parliament import ( + EuroSwarmParliament, + VoteType, + ParliamentaryRole, + ParliamentaryMember +) + + +def demonstrate_parliament_initialization(): + """Demonstrate parliament initialization and basic functionality with cost optimization.""" + + print("\nEUROSWARM PARLIAMENT INITIALIZATION DEMONSTRATION (COST OPTIMIZED)") + print("=" * 60) + + # Initialize the parliament with cost optimization + parliament = EuroSwarmParliament( + eu_data_file="EU.xml", + parliament_size=None, # Use all MEPs from EU.xml (717) + enable_democratic_discussion=True, + enable_committee_work=True, + enable_amendment_process=True, + enable_lazy_loading=True, # NEW: Lazy load MEP agents + enable_caching=True, # NEW: Enable response caching + batch_size=25, # NEW: Batch size for concurrent execution + budget_limit=100.0, # NEW: Budget limit in dollars + verbose=True + ) + + print(f"Parliament initialized with {len(parliament.meps)} MEPs") + + # Show parliament composition with cost stats + composition = parliament.get_parliament_composition() + + print(f"\nPARLIAMENT COMPOSITION:") + print(f"Total MEPs: {composition['total_meps']}") + print(f"Loaded MEPs: {composition['loaded_meps']} (lazy loading active)") + + print(f"\nCOST OPTIMIZATION:") + cost_stats = composition['cost_stats'] + print(f"Budget Limit: ${cost_stats['budget_remaining'] + cost_stats['total_cost']:.2f}") + print(f"Budget Used: ${cost_stats['total_cost']:.2f}") + print(f"Budget Remaining: ${cost_stats['budget_remaining']:.2f}") + print(f"Cache Hit Rate: {cost_stats['cache_hit_rate']:.1%}") + + print(f"\nPOLITICAL GROUP DISTRIBUTION:") + for group, data in composition['political_groups'].items(): + count = data['count'] + percentage = data['percentage'] + print(f" {group}: {count} MEPs ({percentage:.1f}%)") + + print(f"\nCOMMITTEE LEADERSHIP:") + for committee_name, committee_data in composition['committees'].items(): + chair = committee_data['chair'] + if chair: + print(f" {committee_name}: {chair}") + + return parliament + + +def demonstrate_individual_mep_interaction(parliament): + """Demonstrate individual MEP interaction and personality.""" + + print("\nINDIVIDUAL MEP INTERACTION DEMONSTRATION") + print("=" * 60) + + # Get a sample MEP + sample_mep_name = list(parliament.meps.keys())[0] + sample_mep = parliament.meps[sample_mep_name] + + print(f"Sample MEP: {sample_mep.full_name}") + print(f"Country: {sample_mep.country}") + print(f"Political Group: {sample_mep.political_group}") + print(f"National Party: {sample_mep.national_party}") + print(f"Committees: {', '.join(sample_mep.committees)}") + print(f"Expertise Areas: {', '.join(sample_mep.expertise_areas)}") + + # Test MEP agent interaction + if sample_mep.agent: + test_prompt = "What are your views on European integration and how do you approach cross-border cooperation?" + + print(f"\nMEP Response to: '{test_prompt}'") + print("-" * 50) + + try: + response = sample_mep.agent.run(test_prompt) + print(response[:500] + "..." if len(response) > 500 else response) + except Exception as e: + print(f"Error getting MEP response: {e}") + + +def demonstrate_committee_work(parliament): + """Demonstrate committee work and hearings.""" + + print("\nCOMMITTEE WORK DEMONSTRATION") + print("=" * 60) + + # Get a real MEP as sponsor + sponsor = list(parliament.meps.keys())[0] + + # Create a test bill + bill = parliament.introduce_bill( + title="European Digital Rights and Privacy Protection Act", + description="Comprehensive legislation to strengthen digital rights, enhance privacy protection, and establish clear guidelines for data handling across the European Union.", + bill_type=VoteType.ORDINARY_LEGISLATIVE_PROCEDURE, + committee="Legal Affairs", + sponsor=sponsor + ) + + print(f"Bill: {bill.title}") + print(f"Committee: {bill.committee}") + print(f"Sponsor: {bill.sponsor}") + + # Conduct committee hearing + print(f"\nCONDUCTING COMMITTEE HEARING...") + hearing_result = parliament.conduct_committee_hearing(bill.committee, bill) + + print(f"Committee: {hearing_result['committee']}") + print(f"Participants: {len(hearing_result['participants'])} MEPs") + print(f"Recommendation: {hearing_result['recommendations']['recommendation']}") + print(f"Support: {hearing_result['recommendations']['support_percentage']:.1f}%") + print(f"Oppose: {hearing_result['recommendations']['oppose_percentage']:.1f}%") + print(f"Amend: {hearing_result['recommendations']['amend_percentage']:.1f}%") + + +def demonstrate_parliamentary_debate(parliament): + """Demonstrate parliamentary debate functionality.""" + + print("\nPARLIAMENTARY DEBATE DEMONSTRATION") + print("=" * 60) + + # Get a real MEP as sponsor + sponsor = list(parliament.meps.keys())[1] + + # Create a test bill + bill = parliament.introduce_bill( + title="European Green Deal Implementation Act", + description="Legislation to implement the European Green Deal, including carbon neutrality targets, renewable energy investments, and sustainable development measures.", + bill_type=VoteType.ORDINARY_LEGISLATIVE_PROCEDURE, + committee="Environment, Public Health and Food Safety", + sponsor=sponsor + ) + + print(f"Bill: {bill.title}") + print(f"Description: {bill.description}") + + # Conduct parliamentary debate + print(f"\nCONDUCTING PARLIAMENTARY DEBATE...") + debate_result = parliament.conduct_parliamentary_debate(bill, max_speakers=10) + + print(f"Debate Participants: {len(debate_result['participants'])} MEPs") + print(f"Debate Analysis:") + print(f" Support: {debate_result['analysis']['support_count']} speakers ({debate_result['analysis']['support_percentage']:.1f}%)") + print(f" Oppose: {debate_result['analysis']['oppose_count']} speakers ({debate_result['analysis']['oppose_percentage']:.1f}%)") + print(f" Neutral: {debate_result['analysis']['neutral_count']} speakers ({debate_result['analysis']['neutral_percentage']:.1f}%)") + + +def demonstrate_democratic_voting(parliament): + """Demonstrate democratic voting functionality.""" + + print("\nDEMOCRATIC VOTING DEMONSTRATION") + print("=" * 60) + + # Get a real MEP as sponsor + sponsor = list(parliament.meps.keys())[2] + + # Create a test bill + bill = parliament.introduce_bill( + title="European Social Rights and Labor Protection Act", + description="Legislation to strengthen social rights, improve labor conditions, and ensure fair treatment of workers across the European Union.", + bill_type=VoteType.ORDINARY_LEGISLATIVE_PROCEDURE, + committee="Employment and Social Affairs", + sponsor=sponsor + ) + + print(f"Bill: {bill.title}") + print(f"Sponsor: {bill.sponsor}") + + # Conduct democratic vote + print(f"\nCONDUCTING DEMOCRATIC VOTE...") + vote_result = parliament.conduct_democratic_vote(bill) + + # Calculate percentages + total_votes = vote_result.votes_for + vote_result.votes_against + vote_result.abstentions + in_favor_percentage = (vote_result.votes_for / total_votes * 100) if total_votes > 0 else 0 + against_percentage = (vote_result.votes_against / total_votes * 100) if total_votes > 0 else 0 + abstentions_percentage = (vote_result.abstentions / total_votes * 100) if total_votes > 0 else 0 + + print(f"Vote Results:") + print(f" Total Votes: {total_votes}") + print(f" In Favor: {vote_result.votes_for} ({in_favor_percentage:.1f}%)") + print(f" Against: {vote_result.votes_against} ({against_percentage:.1f}%)") + print(f" Abstentions: {vote_result.abstentions} ({abstentions_percentage:.1f}%)") + print(f" Result: {vote_result.result.value}") + + # Show political group breakdown if available + if hasattr(vote_result, 'group_votes') and vote_result.group_votes: + print(f"\nPOLITICAL GROUP BREAKDOWN:") + for group, votes in vote_result.group_votes.items(): + print(f" {group}: {votes['in_favor']}/{votes['total']} in favor ({votes['percentage']:.1f}%)") + else: + print(f"\nIndividual votes recorded: {len(vote_result.individual_votes)} MEPs") + + +def demonstrate_complete_democratic_session(parliament): + """Demonstrate a complete democratic parliamentary session.""" + + print("\nCOMPLETE DEMOCRATIC SESSION DEMONSTRATION") + print("=" * 60) + + # Get a real MEP as sponsor + sponsor = list(parliament.meps.keys())[3] + + # Run complete session + session_result = parliament.run_democratic_session( + bill_title="European Innovation and Technology Advancement Act", + bill_description="Comprehensive legislation to promote innovation, support technology startups, and establish Europe as a global leader in digital transformation and technological advancement.", + bill_type=VoteType.ORDINARY_LEGISLATIVE_PROCEDURE, + committee="Industry, Research and Energy", + sponsor=sponsor + ) + + print(f"Session Results:") + print(f" Bill: {session_result['bill'].title}") + print(f" Committee Hearing: {session_result['hearing']['recommendations']['recommendation']}") + print(f" Debate Participants: {len(session_result['debate']['participants'])} MEPs") + print(f" Final Vote: {session_result['vote']['result']}") + print(f" Vote Margin: {session_result['vote']['in_favor_percentage']:.1f}% in favor") + + +def demonstrate_political_analysis(parliament): + """Demonstrate political analysis and voting prediction.""" + + print("\nPOLITICAL ANALYSIS DEMONSTRATION") + print("=" * 60) + + # Get a real MEP as sponsor + sponsor = list(parliament.meps.keys())[4] + + # Create a test bill + bill = parliament.introduce_bill( + title="European Climate Action and Sustainability Act", + description="Comprehensive climate action legislation including carbon pricing, renewable energy targets, and sustainable development measures.", + bill_type=VoteType.ORDINARY_LEGISLATIVE_PROCEDURE, + committee="Environment, Public Health and Food Safety", + sponsor=sponsor + ) + + print(f"Bill: {bill.title}") + print(f"Sponsor: {bill.sponsor}") + + # Analyze political landscape + analysis = parliament.analyze_political_landscape(bill) + + print(f"\nPOLITICAL LANDSCAPE ANALYSIS:") + print(f" Overall Support: {analysis['overall_support']:.1f}%") + print(f" Opposition: {analysis['opposition']:.1f}%") + print(f" Uncertainty: {analysis['uncertainty']:.1f}%") + + print(f"\nPOLITICAL GROUP ANALYSIS:") + for group, data in analysis['group_analysis'].items(): + print(f" {group}: {data['support']:.1f}% support, {data['opposition']:.1f}% opposition") + + +def demonstrate_hierarchical_democratic_voting(parliament): + """Demonstrate hierarchical democratic voting with political group boards.""" + + print("\nHIERARCHICAL DEMOCRATIC VOTING DEMONSTRATION") + print("=" * 60) + + # Get a real MEP as sponsor + sponsor = list(parliament.meps.keys())[5] + + # Create a test bill + bill = parliament.introduce_bill( + title="European Climate Action and Sustainability Act", + description="Comprehensive climate action legislation including carbon pricing, renewable energy targets, and sustainable development measures.", + bill_type=VoteType.ORDINARY_LEGISLATIVE_PROCEDURE, + committee="Environment, Public Health and Food Safety", + sponsor=sponsor + ) + + print(f"Bill: {bill.title}") + print(f"Sponsor: {bill.sponsor}") + + # Conduct hierarchical vote + print(f"\nCONDUCTING HIERARCHICAL DEMOCRATIC VOTE...") + hierarchical_result = parliament.conduct_hierarchical_democratic_vote(bill) + + print(f"Hierarchical Vote Results:") + print(f" Total Votes: {hierarchical_result['total_votes']}") + print(f" In Favor: {hierarchical_result['in_favor']} ({hierarchical_result['in_favor_percentage']:.1f}%)") + print(f" Against: {hierarchical_result['against']} ({hierarchical_result['against_percentage']:.1f}%)") + print(f" Result: {hierarchical_result['result']}") + + print(f"\nPOLITICAL GROUP BOARD DECISIONS:") + for group, decision in hierarchical_result['group_decisions'].items(): + print(f" {group}: {decision['decision']} ({decision['confidence']:.1f}% confidence)") + + +def demonstrate_complete_hierarchical_session(parliament): + """Demonstrate a complete hierarchical democratic session.""" + + print("\nCOMPLETE HIERARCHICAL DEMOCRATIC SESSION DEMONSTRATION") + print("=" * 60) + + # Get a real MEP as sponsor + sponsor = list(parliament.meps.keys())[6] + + # Run complete hierarchical session + session_result = parliament.run_hierarchical_democratic_session( + bill_title="European Climate Action and Sustainability Act", + bill_description="Comprehensive climate action legislation including carbon pricing, renewable energy targets, and sustainable development measures.", + bill_type=VoteType.ORDINARY_LEGISLATIVE_PROCEDURE, + committee="Environment, Public Health and Food Safety", + sponsor=sponsor + ) + + print(f"Hierarchical Session Results:") + print(f" Bill: {session_result['bill'].title}") + print(f" Committee Hearing: {session_result['hearing']['recommendations']['recommendation']}") + print(f" Debate Participants: {len(session_result['debate']['participants'])} MEPs") + print(f" Final Vote: {session_result['vote']['result']}") + print(f" Vote Margin: {session_result['vote']['in_favor_percentage']:.1f}% in favor") + + +def demonstrate_wikipedia_personalities(parliament): + """Demonstrate the Wikipedia personality system for realistic MEP behavior.""" + + print("\nWIKIPEDIA PERSONALITY SYSTEM DEMONSTRATION") + print("=" * 60) + + # Check if Wikipedia personalities are available + if not parliament.enable_wikipedia_personalities: + print("Wikipedia personality system not available") + print("To enable: Install required dependencies and run Wikipedia scraper") + return + + print(f"Wikipedia personality system enabled") + print(f"Loaded {len(parliament.personality_profiles)} personality profiles") + + # Show sample personality profiles + print(f"\nSAMPLE PERSONALITY PROFILES:") + print("-" * 40) + + sample_count = 0 + for mep_name, profile in parliament.personality_profiles.items(): + if sample_count >= 3: # Show only 3 samples + break + + print(f"\n{mep_name}") + print(f" Wikipedia URL: {profile.wikipedia_url if profile.wikipedia_url else 'Not available'}") + print(f" Summary: {profile.summary[:200]}..." if profile.summary else "No summary available") + print(f" Political Views: {profile.political_views[:150]}..." if profile.political_views else "Based on party alignment") + print(f" Policy Focus: {profile.policy_focus[:150]}..." if profile.policy_focus else "General parliamentary work") + print(f" Achievements: {profile.achievements[:150]}..." if profile.achievements else "Parliamentary service") + print(f" Last Updated: {profile.last_updated}") + + sample_count += 1 + + # Demonstrate personality-driven voting + print(f"\nPERSONALITY-DRIVEN VOTING DEMONSTRATION:") + print("-" * 50) + + # Create a test bill that would trigger different personality responses + bill = parliament.introduce_bill( + title="European Climate Action and Green Technology Investment Act", + description="Comprehensive legislation to accelerate Europe's transition to renewable energy, including massive investments in green technology, carbon pricing mechanisms, and support for affected industries and workers.", + bill_type=VoteType.ORDINARY_LEGISLATIVE_PROCEDURE, + committee="Environment", + sponsor="Climate Action Leader" + ) + + print(f"Bill: {bill.title}") + print(f"Description: {bill.description}") + + # Show how different MEPs with Wikipedia personalities would respond + print(f"\nPERSONALITY-BASED RESPONSES:") + print("-" * 40) + + sample_meps = list(parliament.personality_profiles.keys())[:3] + + for mep_name in sample_meps: + mep = parliament.meps.get(mep_name) + profile = parliament.personality_profiles.get(mep_name) + + if mep and profile: + print(f"\n{mep_name} ({mep.political_group})") + + # Show personality influence + if profile.political_views: + print(f" Political Views: {profile.political_views[:100]}...") + + if profile.policy_focus: + print(f" Policy Focus: {profile.policy_focus[:100]}...") + + # Predict voting behavior based on personality + if "environment" in profile.policy_focus.lower() or "climate" in profile.political_views.lower(): + predicted_vote = "LIKELY SUPPORT" + reasoning = "Environmental policy focus and climate advocacy" + elif "economic" in profile.policy_focus.lower() or "business" in profile.political_views.lower(): + predicted_vote = "LIKELY OPPOSE" + reasoning = "Economic concerns about investment costs" + else: + predicted_vote = "UNCERTAIN" + reasoning = "Mixed considerations based on party alignment" + + print(f" Predicted Vote: {predicted_vote}") + print(f" Reasoning: {reasoning}") + + # Demonstrate scraping functionality + print(f"\nWIKIPEDIA SCRAPING CAPABILITIES:") + print("-" * 50) + print("Can scrape Wikipedia data for all 717 MEPs") + print("Extracts political views, career history, and achievements") + print("Creates detailed personality profiles in JSON format") + print("Integrates real personality data into AI agent system prompts") + print("Enables realistic, personality-driven voting behavior") + print("Respectful API usage with configurable delays") + + print(f"\nTo scrape all MEP personalities:") + print(" parliament.scrape_wikipedia_personalities(delay=1.0)") + print(" # This will create personality profiles for all 717 MEPs") + print(" # Profiles are saved in 'mep_personalities/' directory") + + +def demonstrate_optimized_parliamentary_session(parliament): + """Demonstrate cost-optimized parliamentary session.""" + + print("\nCOST-OPTIMIZED PARLIAMENTARY SESSION DEMONSTRATION") + print("=" * 60) + + # Run optimized session with cost limit + session_result = parliament.run_optimized_parliamentary_session( + bill_title="European Digital Rights and Privacy Protection Act", + bill_description="Comprehensive legislation to strengthen digital rights, enhance privacy protection, and establish clear guidelines for data handling across the European Union.", + bill_type=VoteType.ORDINARY_LEGISLATIVE_PROCEDURE, + committee="Legal Affairs", + max_cost=25.0 # Max $25 for this session + ) + + print(f"Session Results:") + print(f" Bill: {session_result['session_summary']['bill_title']}") + print(f" Final Outcome: {session_result['session_summary']['final_outcome']}") + print(f" Total Cost: ${session_result['session_summary']['total_cost']:.2f}") + print(f" Budget Remaining: ${session_result['cost_stats']['budget_remaining']:.2f}") + + # Show detailed cost statistics + cost_stats = parliament.get_cost_statistics() + print(f"\nDETAILED COST STATISTICS:") + print(f" Total Tokens Used: {cost_stats['total_tokens']:,}") + print(f" Requests Made: {cost_stats['requests_made']}") + print(f" Cache Hits: {cost_stats['cache_hits']}") + print(f" Cache Hit Rate: {cost_stats['cache_hit_rate']:.1%}") + print(f" Loading Efficiency: {cost_stats['loading_efficiency']:.1%}") + print(f" Cache Size: {cost_stats['cache_size']} entries") + + return session_result + + +def main(): + """Main demonstration function.""" + + print("EUROSWARM PARLIAMENT - COST OPTIMIZED DEMONSTRATION") + print("=" * 60) + print("This demonstration shows the EuroSwarm Parliament with cost optimization features:") + print("• Lazy loading of MEP agents (only create when needed)") + print("• Response caching (avoid repeated API calls)") + print("• Batch processing (control memory and cost)") + print("• Budget controls (hard limits on spending)") + print("• Cost tracking (real-time monitoring)") + + # Initialize parliament with cost optimization + parliament = demonstrate_parliament_initialization() + + # Demonstrate individual MEP interaction (will trigger lazy loading) + demonstrate_individual_mep_interaction(parliament) + + # Demonstrate committee work with cost optimization + demonstrate_committee_work(parliament) + + # Demonstrate parliamentary debate with cost optimization + demonstrate_parliamentary_debate(parliament) + + # Demonstrate democratic voting with cost optimization + demonstrate_democratic_voting(parliament) + + # Demonstrate political analysis with cost optimization + demonstrate_political_analysis(parliament) + + # Demonstrate optimized parliamentary session + demonstrate_optimized_parliamentary_session(parliament) + + # Show final cost statistics + final_stats = parliament.get_cost_statistics() + print(f"\nFINAL COST STATISTICS:") + print(f"Total Cost: ${final_stats['total_cost']:.2f}") + print(f"Budget Remaining: ${final_stats['budget_remaining']:.2f}") + print(f"Cache Hit Rate: {final_stats['cache_hit_rate']:.1%}") + print(f"Loading Efficiency: {final_stats['loading_efficiency']:.1%}") + + print(f"\n✅ COST OPTIMIZATION DEMONSTRATION COMPLETED!") + print(f"✅ EuroSwarm Parliament now supports cost-effective large-scale simulations") + print(f"✅ Lazy loading: {final_stats['loaded_meps']}/{final_stats['total_meps']} MEPs loaded") + print(f"✅ Caching: {final_stats['cache_hit_rate']:.1%} hit rate") + print(f"✅ Budget control: ${final_stats['total_cost']:.2f} spent of ${final_stats['budget_remaining'] + final_stats['total_cost']:.2f} budget") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/simulations/euroswarm_parliament/mass_agent_template.py b/examples/simulations/euroswarm_parliament/mass_agent_template.py new file mode 100644 index 00000000..a6e29d8c --- /dev/null +++ b/examples/simulations/euroswarm_parliament/mass_agent_template.py @@ -0,0 +1,998 @@ +""" +Mass Agent Template - Template for Creating Large-Scale Multi-Agent Systems + +This template demonstrates how to generate hundreds of agents on the fly, similar to the EuroSwarm Parliament approach. +It provides a reusable framework for creating large-scale multi-agent systems with dynamic agent generation. + +Key Features: +- Dynamic agent generation from data sources +- Configurable agent personalities and roles +- Scalable architecture for thousands of agents +- Template-based system prompts +- Hierarchical organization capabilities +- Memory and state management +- COST OPTIMIZATION: Lazy loading, batching, caching, budget controls +""" + +import os +import random +import json +import time +import hashlib +from typing import Dict, List, Optional, Union, Any, Set +from dataclasses import dataclass, field +from enum import Enum +from datetime import datetime +from functools import lru_cache + +from swarms import Agent +from swarms.structs.multi_agent_exec import run_agents_concurrently +from swarms.structs.board_of_directors_swarm import ( + BoardOfDirectorsSwarm, + BoardMember, + BoardMemberRole, + BoardDecisionType, + BoardSpec, + BoardOrder, + BoardDecision, + enable_board_feature, +) +from swarms.utils.loguru_logger import initialize_logger + +# Initialize logger +logger = initialize_logger(log_folder="mass_agent_template") + +# Enable Board of Directors feature +enable_board_feature() + + +class AgentRole(str, Enum): + """Enumeration of agent roles and specializations.""" + + WORKER = "worker" + MANAGER = "manager" + SPECIALIST = "specialist" + COORDINATOR = "coordinator" + ANALYST = "analyst" + CREATOR = "creator" + VALIDATOR = "validator" + EXECUTOR = "executor" + + +class AgentCategory(str, Enum): + """Enumeration of agent categories for organization.""" + + TECHNICAL = "technical" + CREATIVE = "creative" + ANALYTICAL = "analytical" + OPERATIONAL = "operational" + STRATEGIC = "strategic" + SUPPORT = "support" + + +@dataclass +class AgentProfile: + """ + Represents a single agent in the mass agent system. + + Attributes: + name: Unique name of the agent + role: Primary role of the agent + category: Category for organization + specialization: Areas of expertise + personality_traits: Personality characteristics + skills: List of skills and capabilities + experience_level: Experience level (junior, senior, expert) + agent: The AI agent instance (lazy loaded) + is_loaded: Whether the agent has been instantiated + """ + + name: str + role: AgentRole + category: AgentCategory + specialization: List[str] = field(default_factory=list) + personality_traits: List[str] = field(default_factory=list) + skills: List[str] = field(default_factory=list) + experience_level: str = "senior" + agent: Optional[Agent] = None + is_loaded: bool = False + + +@dataclass +class AgentGroup: + """ + Represents a group of agents with similar roles or categories. + + Attributes: + name: Name of the group + category: Category of the group + agents: List of agent names in this group + leader: Group leader agent name + total_agents: Total number of agents in group + group_swarm: Board of Directors swarm for this group + is_swarm_loaded: Whether the swarm has been instantiated + """ + + name: str + category: AgentCategory + agents: List[str] = field(default_factory=list) + leader: Optional[str] = None + total_agents: int = 0 + group_swarm: Optional[Any] = None + is_swarm_loaded: bool = False + + +@dataclass +class CostTracker: + """Track costs and usage for budget management.""" + + total_tokens_used: int = 0 + total_cost_estimate: float = 0.0 + budget_limit: float = 100.0 # Default $100 budget + token_cost_per_1m: float = 0.15 # GPT-4o-mini cost + requests_made: int = 0 + cache_hits: int = 0 + + def add_tokens(self, tokens: int): + """Add tokens used and calculate cost.""" + self.total_tokens_used += tokens + self.total_cost_estimate = (self.total_tokens_used / 1_000_000) * self.token_cost_per_1m + self.requests_made += 1 + + def add_cache_hit(self): + """Record a cache hit.""" + self.cache_hits += 1 + + def check_budget(self) -> bool: + """Check if within budget.""" + return self.total_cost_estimate <= self.budget_limit + + def get_stats(self) -> Dict[str, Any]: + """Get cost statistics.""" + return { + "total_tokens": self.total_tokens_used, + "total_cost": self.total_cost_estimate, + "requests_made": self.requests_made, + "cache_hits": self.cache_hits, + "cache_hit_rate": self.cache_hits / max(1, self.requests_made + self.cache_hits), + "budget_remaining": max(0, self.budget_limit - self.total_cost_estimate) + } + + +class MassAgentTemplate: + """ + Template for creating large-scale multi-agent systems with cost optimization. + + This class provides a framework for generating hundreds of agents on the fly, + organizing them into groups, and managing their interactions with cost controls. + """ + + def __init__( + self, + data_source: str = None, # Path to data file (CSV, JSON, XML, etc.) + agent_count: int = 1000, # Target number of agents + enable_hierarchical_organization: bool = True, + enable_group_swarms: bool = True, + enable_lazy_loading: bool = True, # NEW: Lazy load agents + enable_caching: bool = True, # NEW: Enable response caching + batch_size: int = 50, # NEW: Batch size for concurrent execution + budget_limit: float = 100.0, # NEW: Budget limit in dollars + verbose: bool = False, + ): + """ + Initialize the Mass Agent Template with cost optimization. + + Args: + data_source: Path to data file containing agent information + agent_count: Target number of agents to generate + enable_hierarchical_organization: Enable hierarchical organization + enable_group_swarms: Enable Board of Directors swarms for groups + enable_lazy_loading: Enable lazy loading of agents (cost optimization) + enable_caching: Enable response caching (cost optimization) + batch_size: Number of agents to process in batches + budget_limit: Maximum budget in dollars + verbose: Enable verbose logging + """ + self.data_source = data_source + self.agent_count = agent_count + self.enable_hierarchical_organization = enable_hierarchical_organization + self.enable_group_swarms = enable_group_swarms + self.enable_lazy_loading = enable_lazy_loading + self.enable_caching = enable_caching + self.batch_size = batch_size + self.verbose = verbose + + # Initialize cost tracking + self.cost_tracker = CostTracker(budget_limit=budget_limit) + + # Initialize agent storage + self.agents: Dict[str, AgentProfile] = {} + self.groups: Dict[str, AgentGroup] = {} + self.categories: Dict[AgentCategory, List[str]] = {} + + # Initialize caching + self.response_cache: Dict[str, str] = {} + + # Load agent profiles (without creating agents) + self._load_agent_profiles() + + if self.enable_hierarchical_organization: + self._organize_agents() + + if self.verbose: + logger.info(f"Mass Agent Template initialized with {len(self.agents)} agent profiles") + logger.info(f"Lazy loading: {self.enable_lazy_loading}, Caching: {self.enable_caching}") + logger.info(f"Budget limit: ${budget_limit}, Batch size: {batch_size}") + + def _load_agent_profiles(self) -> List[Dict[str, Any]]: + """ + Load agent profiles from the specified data source. + + This method loads agent data but doesn't create AI agents yet (lazy loading). + + Returns: + List[Dict[str, Any]]: List of agent data dictionaries + """ + agent_data = [] + + if self.data_source and os.path.exists(self.data_source): + # Load from file - customize based on your data format + try: + if self.data_source.endswith('.json'): + with open(self.data_source, 'r', encoding='utf-8') as f: + agent_data = json.load(f) + elif self.data_source.endswith('.csv'): + import pandas as pd + df = pd.read_csv(self.data_source) + agent_data = df.to_dict('records') + else: + logger.warning(f"Unsupported data format: {self.data_source}") + except Exception as e: + logger.error(f"Error loading agent data: {e}") + + # If no data loaded, generate synthetic data + if not agent_data: + agent_data = self._generate_synthetic_data() + + # Create agent profiles (without instantiating agents) + for data in agent_data: + agent_profile = AgentProfile( + name=data["name"], + role=data["role"], + category=data["category"], + specialization=data["specialization"], + personality_traits=data["personality_traits"], + skills=data["skills"], + experience_level=data["experience_level"], + agent=None, # Will be created on demand + is_loaded=False + ) + + self.agents[data["name"]] = agent_profile + + return agent_data + + def _load_agent(self, agent_name: str) -> Optional[Agent]: + """ + Lazy load a single agent on demand. + + Args: + agent_name: Name of the agent to load + + Returns: + Optional[Agent]: Loaded agent or None if not found + """ + if agent_name not in self.agents: + return None + + profile = self.agents[agent_name] + + # Check if already loaded + if profile.is_loaded and profile.agent: + return profile.agent + + # Create agent (no cost for creation, only for running) + profile.agent = self._create_agent(profile) + profile.is_loaded = True + + if self.verbose: + logger.info(f"Loaded agent: {agent_name}") + + return profile.agent + + def _load_agents_batch(self, agent_names: List[str]) -> List[Agent]: + """ + Load multiple agents in a batch. + + Args: + agent_names: List of agent names to load + + Returns: + List[Agent]: List of loaded agents + """ + loaded_agents = [] + + for agent_name in agent_names: + agent = self._load_agent(agent_name) + if agent: + loaded_agents.append(agent) + + return loaded_agents + + def _get_cache_key(self, task: str, agent_names: List[str]) -> str: + """ + Generate a cache key for a task and agent combination. + + Args: + task: Task to execute + agent_names: List of agent names + + Returns: + str: Cache key + """ + # Sort agent names for consistent cache keys + sorted_agents = sorted(agent_names) + content = f"{task}:{':'.join(sorted_agents)}" + return hashlib.md5(content.encode()).hexdigest() + + def _check_cache(self, cache_key: str) -> Optional[str]: + """ + Check if a response is cached. + + Args: + cache_key: Cache key to check + + Returns: + Optional[str]: Cached response or None + """ + if not self.enable_caching: + return None + + cached_response = self.response_cache.get(cache_key) + if cached_response: + self.cost_tracker.add_cache_hit() + if self.verbose: + logger.info(f"Cache hit for key: {cache_key[:20]}...") + + return cached_response + + def _cache_response(self, cache_key: str, response: str): + """ + Cache a response. + + Args: + cache_key: Cache key + response: Response to cache + """ + if self.enable_caching: + self.response_cache[cache_key] = response + if self.verbose: + logger.info(f"Cached response for key: {cache_key[:20]}...") + + def _generate_synthetic_data(self) -> List[Dict[str, Any]]: + """ + Generate synthetic agent data for demonstration purposes. + + Returns: + List[Dict[str, Any]]: List of synthetic agent data + """ + synthetic_data = [] + + # Define sample data for different agent types + sample_agents = [ + { + "name": "Alex_Developer", + "role": AgentRole.SPECIALIST, + "category": AgentCategory.TECHNICAL, + "specialization": ["Python", "Machine Learning", "API Development"], + "personality_traits": ["analytical", "detail-oriented", "problem-solver"], + "skills": ["Python", "TensorFlow", "FastAPI", "Docker"], + "experience_level": "senior" + }, + { + "name": "Sarah_Designer", + "role": AgentRole.CREATOR, + "category": AgentCategory.CREATIVE, + "specialization": ["UI/UX Design", "Visual Design", "Brand Identity"], + "personality_traits": ["creative", "user-focused", "aesthetic"], + "skills": ["Figma", "Adobe Creative Suite", "User Research", "Prototyping"], + "experience_level": "senior" + }, + { + "name": "Mike_Analyst", + "role": AgentRole.ANALYST, + "category": AgentCategory.ANALYTICAL, + "specialization": ["Data Analysis", "Business Intelligence", "Market Research"], + "personality_traits": ["data-driven", "curious", "insightful"], + "skills": ["SQL", "Python", "Tableau", "Statistics"], + "experience_level": "expert" + }, + { + "name": "Lisa_Manager", + "role": AgentRole.MANAGER, + "category": AgentCategory.STRATEGIC, + "specialization": ["Project Management", "Team Leadership", "Strategic Planning"], + "personality_traits": ["organized", "leadership", "strategic"], + "skills": ["Agile", "Scrum", "Risk Management", "Stakeholder Communication"], + "experience_level": "senior" + }, + { + "name": "Tom_Coordinator", + "role": AgentRole.COORDINATOR, + "category": AgentCategory.OPERATIONAL, + "specialization": ["Process Optimization", "Workflow Management", "Resource Allocation"], + "personality_traits": ["efficient", "coordinated", "systematic"], + "skills": ["Process Mapping", "Automation", "Resource Planning", "Quality Assurance"], + "experience_level": "senior" + } + ] + + # Generate the specified number of agents + for i in range(self.agent_count): + # Use sample data as template and create variations + template = random.choice(sample_agents) + + agent_data = { + "name": f"{template['name']}_{i:04d}", + "role": template["role"], + "category": template["category"], + "specialization": template["specialization"].copy(), + "personality_traits": template["personality_traits"].copy(), + "skills": template["skills"].copy(), + "experience_level": template["experience_level"] + } + + # Add some randomization for variety + if random.random() < 0.3: + agent_data["experience_level"] = random.choice(["junior", "senior", "expert"]) + + synthetic_data.append(agent_data) + + return synthetic_data + + def _create_agent(self, profile: AgentProfile) -> Agent: + """ + Create an AI agent for the given profile. + + Args: + profile: Agent profile data + + Returns: + Agent: AI agent instance + """ + system_prompt = self._generate_agent_system_prompt(profile) + + return Agent( + agent_name=profile.name, + system_prompt=system_prompt, + model_name="gpt-4o-mini", + max_loops=3, + verbose=self.verbose, + ) + + def _generate_agent_system_prompt(self, profile: AgentProfile) -> str: + """ + Generate a comprehensive system prompt for an agent. + + Args: + profile: Agent profile data + + Returns: + str: System prompt for the agent + """ + prompt = f"""You are {profile.name}, an AI agent with the following characteristics: + +ROLE AND CATEGORY: +- Role: {profile.role.value} +- Category: {profile.category.value} +- Experience Level: {profile.experience_level} + +EXPERTISE AND SKILLS: +- Specializations: {', '.join(profile.specialization)} +- Skills: {', '.join(profile.skills)} + +PERSONALITY TRAITS: +- {', '.join(profile.personality_traits)} + +CORE RESPONSIBILITIES: +{self._get_role_responsibilities(profile.role)} + +WORKING STYLE: +- Approach tasks with your unique personality and expertise +- Collaborate effectively with other agents +- Maintain high quality standards +- Adapt to changing requirements +- Communicate clearly and professionally + +When working on tasks: +1. Apply your specialized knowledge and skills +2. Consider your personality traits in your approach +3. Work within your role's scope and responsibilities +4. Collaborate with other agents when beneficial +5. Maintain consistency with your established character + +Remember: You are part of a large multi-agent system. Your unique combination of role, skills, and personality makes you valuable to the team. +""" + + return prompt + + def _get_role_responsibilities(self, role: AgentRole) -> str: + """Get responsibilities for a specific role.""" + + responsibilities = { + AgentRole.WORKER: """ +- Execute assigned tasks efficiently and accurately +- Follow established procedures and guidelines +- Report progress and any issues encountered +- Maintain quality standards in all work +- Collaborate with team members as needed""", + + AgentRole.MANAGER: """ +- Oversee team activities and coordinate efforts +- Set priorities and allocate resources +- Monitor progress and ensure deadlines are met +- Provide guidance and support to team members +- Make strategic decisions for the team""", + + AgentRole.SPECIALIST: """ +- Provide expert knowledge in specific domains +- Solve complex technical problems +- Mentor other agents in your area of expertise +- Stay updated on latest developments in your field +- Contribute specialized insights to projects""", + + AgentRole.COORDINATOR: """ +- Facilitate communication between different groups +- Ensure smooth workflow and process optimization +- Manage dependencies and resource allocation +- Track project timelines and milestones +- Resolve conflicts and bottlenecks""", + + AgentRole.ANALYST: """ +- Analyze data and extract meaningful insights +- Identify patterns and trends +- Provide evidence-based recommendations +- Create reports and visualizations +- Support decision-making with data""", + + AgentRole.CREATOR: """ +- Generate innovative ideas and solutions +- Design and develop new content or products +- Think creatively and outside the box +- Prototype and iterate on concepts +- Inspire and motivate other team members""", + + AgentRole.VALIDATOR: """ +- Review and validate work quality +- Ensure compliance with standards and requirements +- Provide constructive feedback +- Identify potential issues and risks +- Maintain quality assurance processes""", + + AgentRole.EXECUTOR: """ +- Implement plans and strategies +- Execute tasks with precision and efficiency +- Adapt to changing circumstances +- Ensure successful completion of objectives +- Maintain focus on results and outcomes""" + } + + return responsibilities.get(role, "Execute tasks according to your role and expertise.") + + def _organize_agents(self): + """Organize agents into groups and categories.""" + + # Organize by category + for agent_name, profile in self.agents.items(): + category = profile.category + if category not in self.categories: + self.categories[category] = [] + self.categories[category].append(agent_name) + + # Create groups for each category + for category, agent_names in self.categories.items(): + group_name = f"{category.value.capitalize()}_Group" + + # Select a leader (first agent in the category) + leader = agent_names[0] if agent_names else None + + group = AgentGroup( + name=group_name, + category=category, + agents=agent_names, + leader=leader, + total_agents=len(agent_names) + ) + + self.groups[group_name] = group + + if self.verbose: + logger.info(f"Organized agents into {len(self.groups)} groups") + + def _create_group_swarms(self): + """Create Board of Directors swarms for each group.""" + + for group_name, group in self.groups.items(): + if not group.agents: + continue + + # Create board members from group agents + board_members = [] + + # Add group leader as chairman + if group.leader and group.leader in self.agents: + leader_profile = self.agents[group.leader] + if leader_profile.agent: + board_members.append(BoardMember( + agent=leader_profile.agent, + role=BoardMemberRole.CHAIRMAN, + voting_weight=1.0, + expertise_areas=leader_profile.specialization + )) + + # Add other agents as board members + for agent_name in group.agents[:5]: # Limit to 5 board members + if agent_name != group.leader and agent_name in self.agents: + profile = self.agents[agent_name] + if profile.agent: + board_members.append(BoardMember( + agent=profile.agent, + role=BoardMemberRole.EXECUTIVE_DIRECTOR, + voting_weight=0.8, + expertise_areas=profile.specialization + )) + + # Create Board of Directors swarm + if board_members: + agents = [member.agent for member in board_members if member.agent is not None] + + group.group_swarm = BoardOfDirectorsSwarm( + name=group_name, + description=f"Specialized swarm for {group_name} with expertise in {group.category.value}", + board_members=board_members, + agents=agents, + max_loops=3, + verbose=self.verbose, + decision_threshold=0.6, + enable_voting=True, + enable_consensus=True + ) + + if self.verbose: + logger.info(f"Created {len([g for g in self.groups.values() if g.group_swarm])} group swarms") + + def get_agent(self, agent_name: str) -> Optional[AgentProfile]: + """ + Get a specific agent by name. + + Args: + agent_name: Name of the agent + + Returns: + Optional[AgentProfile]: Agent profile if found, None otherwise + """ + return self.agents.get(agent_name) + + def get_group(self, group_name: str) -> Optional[AgentGroup]: + """ + Get a specific group by name. + + Args: + group_name: Name of the group + + Returns: + Optional[AgentGroup]: Group if found, None otherwise + """ + return self.groups.get(group_name) + + def get_agents_by_category(self, category: AgentCategory) -> List[str]: + """ + Get all agents in a specific category. + + Args: + category: Agent category + + Returns: + List[str]: List of agent names in the category + """ + return self.categories.get(category, []) + + def get_agents_by_role(self, role: AgentRole) -> List[str]: + """ + Get all agents with a specific role. + + Args: + role: Agent role + + Returns: + List[str]: List of agent names with the role + """ + return [name for name, profile in self.agents.items() if profile.role == role] + + def run_mass_task(self, task: str, agent_count: int = 10) -> Dict[str, Any]: + """ + Run a task with multiple agents working in parallel with cost optimization. + + Args: + task: Task to execute + agent_count: Number of agents to use + + Returns: + Dict[str, Any]: Results from the mass task execution + """ + # Check budget before starting + if not self.cost_tracker.check_budget(): + return {"error": "Budget exceeded", "cost_stats": self.cost_tracker.get_stats()} + + # Select random agents + selected_agent_names = random.sample(list(self.agents.keys()), min(agent_count, len(self.agents))) + + # Check cache first + cache_key = self._get_cache_key(task, selected_agent_names) + cached_result = self._check_cache(cache_key) + if cached_result: + return { + "task": task, + "agents_used": selected_agent_names, + "results": cached_result, + "total_agents": len(selected_agent_names), + "cached": True, + "cost_stats": self.cost_tracker.get_stats() + } + + # Process in batches to control memory and cost + all_results = [] + total_processed = 0 + + for i in range(0, len(selected_agent_names), self.batch_size): + batch_names = selected_agent_names[i:i + self.batch_size] + + # Check budget for this batch + if not self.cost_tracker.check_budget(): + logger.warning(f"Budget exceeded after processing {total_processed} agents") + logger.warning(f"Current cost: ${self.cost_tracker.total_cost_estimate:.4f}, Budget: ${self.cost_tracker.budget_limit:.2f}") + break + + # Load agents for this batch + batch_agents = self._load_agents_batch(batch_names) + + if not batch_agents: + continue + + # Run batch + try: + batch_results = run_agents_concurrently(batch_agents, task) + all_results.extend(batch_results) + total_processed += len(batch_agents) + + # Estimate tokens used (more realistic approximation) + # Include both input tokens (task) and output tokens (response) + task_tokens = len(task.split()) * 1.3 # ~1.3 tokens per word + response_tokens = len(batch_agents) * 200 # ~200 tokens per response + total_tokens = int(task_tokens + response_tokens) + self.cost_tracker.add_tokens(total_tokens) + + if self.verbose: + logger.info(f"Processed batch {i//self.batch_size + 1}: {len(batch_agents)} agents") + logger.info(f"Current cost: ${self.cost_tracker.total_cost_estimate:.4f}, Budget remaining: ${self.cost_tracker.budget_limit - self.cost_tracker.total_cost_estimate:.2f}") + + except Exception as e: + logger.error(f"Error processing batch: {e}") + continue + + # Cache the results + if all_results: + self._cache_response(cache_key, str(all_results)) + + return { + "task": task, + "agents_used": selected_agent_names[:total_processed], + "results": all_results, + "total_agents": total_processed, + "cached": False, + "cost_stats": self.cost_tracker.get_stats() + } + + def run_mass_task_optimized(self, task: str, agent_count: int = 1000, + max_cost: float = 10.0) -> Dict[str, Any]: + """ + Run a task with cost-optimized mass execution for large-scale operations. + + Args: + task: Task to execute + agent_count: Target number of agents to use + max_cost: Maximum cost for this task in dollars + + Returns: + Dict[str, Any]: Results from the optimized mass task execution + """ + # Store original settings + original_budget = self.cost_tracker.budget_limit + original_batch_size = self.batch_size + + try: + # Set temporary budget for this task (don't reduce if max_cost is higher) + if max_cost < original_budget: + self.cost_tracker.budget_limit = max_cost + + # Use smaller batches for better cost control + self.batch_size = min(25, self.batch_size) # Smaller batches for cost control + + result = self.run_mass_task(task, agent_count) + + return result + + finally: + # Restore original settings + self.cost_tracker.budget_limit = original_budget + self.batch_size = original_batch_size + + def run_group_task(self, group_name: str, task: str) -> Dict[str, Any]: + """ + Run a task with a specific group using their Board of Directors swarm. + + Args: + group_name: Name of the group + task: Task to execute + + Returns: + Dict[str, Any]: Results from the group task execution + """ + group = self.groups.get(group_name) + if not group or not group.group_swarm: + return {"error": f"Group {group_name} not found or no swarm available"} + + # Run task with group swarm + result = group.group_swarm.run(task) + + return { + "group": group_name, + "task": task, + "result": result, + "agents_involved": group.agents + } + + def get_system_stats(self) -> Dict[str, Any]: + """ + Get statistics about the mass agent system including cost tracking. + + Returns: + Dict[str, Any]: System statistics + """ + stats = { + "total_agents": len(self.agents), + "total_groups": len(self.groups), + "loaded_agents": len([a for a in self.agents.values() if a.is_loaded]), + "categories": {}, + "roles": {}, + "experience_levels": {}, + "cost_stats": self.cost_tracker.get_stats(), + "optimization": { + "lazy_loading": self.enable_lazy_loading, + "caching": self.enable_caching, + "batch_size": self.batch_size, + "budget_limit": self.cost_tracker.budget_limit + } + } + + # Category breakdown + for category in AgentCategory: + stats["categories"][category.value] = len(self.get_agents_by_category(category)) + + # Role breakdown + for role in AgentRole: + stats["roles"][role.value] = len(self.get_agents_by_role(role)) + + # Experience level breakdown + experience_counts = {} + for profile in self.agents.values(): + level = profile.experience_level + experience_counts[level] = experience_counts.get(level, 0) + 1 + stats["experience_levels"] = experience_counts + + return stats + + +# Example usage and demonstration +def demonstrate_mass_agent_template(): + """Demonstrate the Mass Agent Template functionality with cost optimization.""" + + print("MASS AGENT TEMPLATE DEMONSTRATION (COST OPTIMIZED)") + print("=" * 60) + + # Initialize the template with 1000 agents and cost optimization + template = MassAgentTemplate( + agent_count=1000, + enable_hierarchical_organization=True, + enable_group_swarms=False, # Disable for cost savings + enable_lazy_loading=True, + enable_caching=True, + batch_size=25, + budget_limit=50.0, # $50 budget limit + verbose=True + ) + + # Show system statistics + stats = template.get_system_stats() + + print(f"\nSYSTEM STATISTICS:") + print(f"Total Agents: {stats['total_agents']}") + print(f"Loaded Agents: {stats['loaded_agents']} (lazy loading active)") + print(f"Total Groups: {stats['total_groups']}") + + print(f"\nCOST OPTIMIZATION:") + cost_stats = stats['cost_stats'] + print(f"Budget Limit: ${cost_stats['budget_remaining'] + cost_stats['total_cost']:.2f}") + print(f"Budget Used: ${cost_stats['total_cost']:.2f}") + print(f"Budget Remaining: ${cost_stats['budget_remaining']:.2f}") + print(f"Cache Hit Rate: {cost_stats['cache_hit_rate']:.1%}") + + print(f"\nCATEGORY BREAKDOWN:") + for category, count in stats['categories'].items(): + print(f" {category}: {count} agents") + + print(f"\nROLE BREAKDOWN:") + for role, count in stats['roles'].items(): + print(f" {role}: {count} agents") + + print(f"\nEXPERIENCE LEVEL BREAKDOWN:") + for level, count in stats['experience_levels'].items(): + print(f" {level}: {count} agents") + + # Demonstrate cost-optimized mass task execution + print(f"\nCOST-OPTIMIZED MASS TASK DEMONSTRATION:") + print("-" * 40) + + # Small task first (low cost) + small_result = template.run_mass_task( + "What is the most important skill for a software developer?", + agent_count=5 + ) + + print(f"Small Task Results:") + print(f" Agents Used: {len(small_result['agents_used'])}") + print(f" Cached: {small_result.get('cached', False)}") + print(f" Cost: ${small_result['cost_stats']['total_cost']:.2f}") + + # Large task to demonstrate full capability + print(f"\nLarge Task Demonstration (Full Capability):") + large_result = template.run_mass_task( + "Analyze the benefits of cloud computing for small businesses", + agent_count=200 # Use more agents to show capability + ) + + print(f" Agents Used: {len(large_result['agents_used'])}") + print(f" Cached: {large_result.get('cached', False)}") + print(f" Cost: ${large_result['cost_stats']['total_cost']:.2f}") + print(f" Budget Remaining: ${large_result['cost_stats']['budget_remaining']:.2f}") + + # Show what happens with cost limits + print(f"\nCost-Limited Task Demonstration:") + cost_limited_result = template.run_mass_task_optimized( + "What are the key principles of agile development?", + agent_count=100, + max_cost=2.0 # Show cost limiting in action + ) + + print(f" Agents Used: {len(cost_limited_result['agents_used'])}") + print(f" Cached: {cost_limited_result.get('cached', False)}") + print(f" Cost: ${cost_limited_result['cost_stats']['total_cost']:.2f}") + print(f" Budget Remaining: ${cost_limited_result['cost_stats']['budget_remaining']:.2f}") + + # Show final cost statistics + final_stats = template.get_system_stats() + print(f"\nFINAL COST STATISTICS:") + print(f"Total Cost: ${final_stats['cost_stats']['total_cost']:.2f}") + print(f"Budget Remaining: ${final_stats['cost_stats']['budget_remaining']:.2f}") + print(f"Cache Hit Rate: {final_stats['cost_stats']['cache_hit_rate']:.1%}") + print(f"Total Requests: {final_stats['cost_stats']['requests_made']}") + print(f"Cache Hits: {final_stats['cost_stats']['cache_hits']}") + + print(f"\nDEMONSTRATION COMPLETED SUCCESSFULLY!") + print(f"✅ Cost optimization working: ${final_stats['cost_stats']['total_cost']:.2f} spent") + print(f"✅ Lazy loading working: {final_stats['loaded_agents']}/{final_stats['total_agents']} agents loaded") + print(f"✅ Caching working: {final_stats['cost_stats']['cache_hit_rate']:.1%} hit rate") + + +if __name__ == "__main__": + demonstrate_mass_agent_template() \ No newline at end of file diff --git a/examples/simulations/euroswarm_parliament/test_mass_agents.py b/examples/simulations/euroswarm_parliament/test_mass_agents.py new file mode 100644 index 00000000..54ec2223 --- /dev/null +++ b/examples/simulations/euroswarm_parliament/test_mass_agents.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python3 +""" +Test script to verify mass agent template can process more than 500 agents. +""" + +from mass_agent_template import MassAgentTemplate + +def test_mass_agents(): + print("Testing Mass Agent Template - Processing More Than 50 Agents") + print("=" * 60) + + # Initialize template with 200 agents + template = MassAgentTemplate( + agent_count=200, + budget_limit=50.0, + batch_size=25, + verbose=True + ) + + print(f"Initialized with {len(template.agents)} agents") + print(f"Budget limit: ${template.cost_tracker.budget_limit}") + + # Test processing 100 agents + print(f"\nTesting with 100 agents...") + result = template.run_mass_task( + "What is the most important skill for your role?", + agent_count=100 + ) + + print(f"Results:") + print(f" Agents processed: {len(result['agents_used'])}") + print(f" Cost: ${result['cost_stats']['total_cost']:.4f}") + print(f" Budget remaining: ${result['cost_stats']['budget_remaining']:.2f}") + print(f" Cached: {result.get('cached', False)}") + + # Test processing 150 agents + print(f"\nTesting with 150 agents...") + result2 = template.run_mass_task( + "Describe your approach to problem-solving", + agent_count=150 + ) + + print(f"Results:") + print(f" Agents processed: {len(result2['agents_used'])}") + print(f" Cost: ${result2['cost_stats']['total_cost']:.4f}") + print(f" Budget remaining: ${result2['cost_stats']['budget_remaining']:.2f}") + print(f" Cached: {result2.get('cached', False)}") + + # Show final stats + final_stats = template.get_system_stats() + print(f"\nFinal Statistics:") + print(f" Total agents: {final_stats['total_agents']}") + print(f" Loaded agents: {final_stats['loaded_agents']}") + print(f" Total cost: ${final_stats['cost_stats']['total_cost']:.4f}") + print(f" Budget remaining: ${final_stats['cost_stats']['budget_remaining']:.2f}") + + # Success criteria + total_processed = len(result['agents_used']) + len(result2['agents_used']) + print(f"\nTotal agents processed: {total_processed}") + + if total_processed > 50: + print("✅ SUCCESS: Template processed more than 50 agents!") + else: + print("❌ FAILURE: Template still limited to 50 agents") + +if __name__ == "__main__": + test_mass_agents() \ No newline at end of file diff --git a/examples/simulations/euroswarm_parliament/wikipedia_personality_scraper.py b/examples/simulations/euroswarm_parliament/wikipedia_personality_scraper.py new file mode 100644 index 00000000..0ef7cda7 --- /dev/null +++ b/examples/simulations/euroswarm_parliament/wikipedia_personality_scraper.py @@ -0,0 +1,575 @@ +#!/usr/bin/env python3 +""" +Wikipedia Personality Scraper for EuroSwarm Parliament MEPs + +This module scrapes Wikipedia data for each MEP to create realistic, personality-driven +AI agents based on their real backgrounds, political history, and personal beliefs. +""" + +import json +import os +import time +import re +from typing import Dict, List, Optional, Any +from dataclasses import dataclass, asdict +import requests +from loguru import logger +import xml.etree.ElementTree as ET + + +@dataclass +class MEPPersonalityProfile: + """ + Comprehensive personality profile for an MEP based on Wikipedia data. + + Attributes: + full_name: Full name of the MEP + mep_id: Unique MEP identifier + wikipedia_url: URL of the MEP's Wikipedia page + summary: Brief summary of the MEP's background + early_life: Early life and education information + political_career: Political career and positions held + political_views: Key political views and positions + policy_focus: Areas of policy expertise and focus + achievements: Notable achievements and accomplishments + controversies: Any controversies or notable incidents + personal_life: Personal background and family information + education: Educational background + professional_background: Professional experience before politics + party_affiliations: Political party history + committee_experience: Parliamentary committee experience + voting_record: Notable voting patterns or positions + public_statements: Key public statements or quotes + interests: Personal and professional interests + languages: Languages spoken + awards: Awards and recognitions + publications: Publications or written works + social_media: Social media presence + last_updated: When the profile was last updated + """ + + full_name: str + mep_id: str + wikipedia_url: Optional[str] = None + summary: str = "" + early_life: str = "" + political_career: str = "" + political_views: str = "" + policy_focus: str = "" + achievements: str = "" + controversies: str = "" + personal_life: str = "" + education: str = "" + professional_background: str = "" + party_affiliations: str = "" + committee_experience: str = "" + voting_record: str = "" + public_statements: str = "" + interests: str = "" + languages: str = "" + awards: str = "" + publications: str = "" + social_media: str = "" + last_updated: str = "" + + +class WikipediaPersonalityScraper: + """ + Scraper for gathering Wikipedia personality data for MEPs. + """ + + def __init__(self, output_dir: str = "mep_personalities", verbose: bool = True): + """ + Initialize the Wikipedia personality scraper. + + Args: + output_dir: Directory to store personality profiles + verbose: Enable verbose logging + """ + self.output_dir = output_dir + self.verbose = verbose + self.session = requests.Session() + self.session.headers.update({ + 'User-Agent': 'EuroSwarm Parliament Personality Scraper/1.0 (https://github.com/swarms-democracy)' + }) + + # Create output directory + os.makedirs(output_dir, exist_ok=True) + + if verbose: + logger.info(f"Wikipedia Personality Scraper initialized. Output directory: {output_dir}") + + def extract_mep_data_from_xml(self, xml_file: str = "EU.xml") -> List[Dict[str, str]]: + """ + Extract MEP data from EU.xml file. + + Args: + xml_file: Path to EU.xml file + + Returns: + List of MEP data dictionaries + """ + meps = [] + + try: + with open(xml_file, 'r', encoding='utf-8') as f: + content = f.read() + + # Use regex to extract MEP data + mep_pattern = r'\s*(.*?)\s*(.*?)\s*(.*?)\s*(.*?)\s*(.*?)\s*' + mep_matches = re.findall(mep_pattern, content, re.DOTALL) + + for full_name, country, political_group, mep_id, national_party in mep_matches: + meps.append({ + 'full_name': full_name.strip(), + 'country': country.strip(), + 'political_group': political_group.strip(), + 'mep_id': mep_id.strip(), + 'national_party': national_party.strip() + }) + + if self.verbose: + logger.info(f"Extracted {len(meps)} MEPs from {xml_file}") + + except Exception as e: + logger.error(f"Error extracting MEP data from {xml_file}: {e}") + + return meps + + def search_wikipedia_page(self, mep_name: str, country: str) -> Optional[str]: + """ + Search for a Wikipedia page for an MEP. + + Args: + mep_name: Full name of the MEP + country: Country of the MEP + + Returns: + Wikipedia page title if found, None otherwise + """ + try: + # Search for the MEP on Wikipedia + search_url = "https://en.wikipedia.org/w/api.php" + search_params = { + 'action': 'query', + 'format': 'json', + 'list': 'search', + 'srsearch': f'"{mep_name}" {country}', + 'srlimit': 5, + 'srnamespace': 0 + } + + response = self.session.get(search_url, params=search_params) + response.raise_for_status() + + data = response.json() + search_results = data.get('query', {}).get('search', []) + + if search_results: + # Return the first result + return search_results[0]['title'] + + # Try alternative search without quotes + search_params['srsearch'] = f'{mep_name} {country}' + response = self.session.get(search_url, params=search_params) + response.raise_for_status() + + data = response.json() + search_results = data.get('query', {}).get('search', []) + + if search_results: + return search_results[0]['title'] + + except Exception as e: + if self.verbose: + logger.warning(f"Error searching Wikipedia for {mep_name}: {e}") + + return None + + def get_wikipedia_content(self, page_title: str) -> Optional[Dict[str, Any]]: + """ + Get Wikipedia content for a specific page. + + Args: + page_title: Wikipedia page title + + Returns: + Dictionary containing page content and metadata + """ + try: + # Get page content + content_url = "https://en.wikipedia.org/w/api.php" + content_params = { + 'action': 'query', + 'format': 'json', + 'titles': page_title, + 'prop': 'extracts|info|categories', + 'exintro': True, + 'explaintext': True, + 'inprop': 'url', + 'cllimit': 50 + } + + response = self.session.get(content_url, params=content_params) + response.raise_for_status() + + data = response.json() + pages = data.get('query', {}).get('pages', {}) + + if pages: + page_id = list(pages.keys())[0] + page_data = pages[page_id] + + return { + 'title': page_data.get('title', ''), + 'extract': page_data.get('extract', ''), + 'url': page_data.get('fullurl', ''), + 'categories': [cat['title'] for cat in page_data.get('categories', [])], + 'pageid': page_data.get('pageid', ''), + 'length': page_data.get('length', 0) + } + + except Exception as e: + if self.verbose: + logger.warning(f"Error getting Wikipedia content for {page_title}: {e}") + + return None + + def parse_wikipedia_content(self, content: str, mep_name: str) -> Dict[str, str]: + """ + Parse Wikipedia content to extract structured personality information. + + Args: + content: Raw Wikipedia content + mep_name: Name of the MEP + + Returns: + Dictionary of parsed personality information + """ + personality_data = { + 'summary': '', + 'early_life': '', + 'political_career': '', + 'political_views': '', + 'policy_focus': '', + 'achievements': '', + 'controversies': '', + 'personal_life': '', + 'education': '', + 'professional_background': '', + 'party_affiliations': '', + 'committee_experience': '', + 'voting_record': '', + 'public_statements': '', + 'interests': '', + 'languages': '', + 'awards': '', + 'publications': '', + 'social_media': '' + } + + # Extract summary (first paragraph) + paragraphs = content.split('\n\n') + if paragraphs: + personality_data['summary'] = paragraphs[0][:1000] # Limit summary length + + # Look for specific sections + content_lower = content.lower() + + # Early life and education + early_life_patterns = [ + r'early life[^.]*\.', + r'born[^.]*\.', + r'childhood[^.]*\.', + r'grew up[^.]*\.', + r'education[^.]*\.' + ] + + for pattern in early_life_patterns: + matches = re.findall(pattern, content_lower, re.IGNORECASE) + if matches: + personality_data['early_life'] = ' '.join(matches[:3]) # Take first 3 matches + break + + # Political career + political_patterns = [ + r'political career[^.]*\.', + r'elected[^.]*\.', + r'parliament[^.]*\.', + r'minister[^.]*\.', + r'party[^.]*\.' + ] + + for pattern in political_patterns: + matches = re.findall(pattern, content_lower, re.IGNORECASE) + if matches: + personality_data['political_career'] = ' '.join(matches[:5]) # Take first 5 matches + break + + # Political views + views_patterns = [ + r'political views[^.]*\.', + r'positions[^.]*\.', + r'advocates[^.]*\.', + r'supports[^.]*\.', + r'opposes[^.]*\.' + ] + + for pattern in views_patterns: + matches = re.findall(pattern, content_lower, re.IGNORECASE) + if matches: + personality_data['political_views'] = ' '.join(matches[:3]) + break + + # Policy focus + policy_patterns = [ + r'policy[^.]*\.', + r'focus[^.]*\.', + r'issues[^.]*\.', + r'legislation[^.]*\.' + ] + + for pattern in policy_patterns: + matches = re.findall(pattern, content_lower, re.IGNORECASE) + if matches: + personality_data['policy_focus'] = ' '.join(matches[:3]) + break + + # Achievements + achievement_patterns = [ + r'achievements[^.]*\.', + r'accomplishments[^.]*\.', + r'success[^.]*\.', + r'won[^.]*\.', + r'received[^.]*\.' + ] + + for pattern in achievement_patterns: + matches = re.findall(pattern, content_lower, re.IGNORECASE) + if matches: + personality_data['achievements'] = ' '.join(matches[:3]) + break + + return personality_data + + def create_personality_profile(self, mep_data: Dict[str, str]) -> MEPPersonalityProfile: + """ + Create a personality profile for an MEP. + + Args: + mep_data: MEP data from XML file + + Returns: + MEPPersonalityProfile object + """ + mep_name = mep_data['full_name'] + country = mep_data['country'] + + # Search for Wikipedia page + page_title = self.search_wikipedia_page(mep_name, country) + + if page_title: + # Get Wikipedia content + wiki_content = self.get_wikipedia_content(page_title) + + if wiki_content: + # Parse content + personality_data = self.parse_wikipedia_content(wiki_content['extract'], mep_name) + + # Create profile + profile = MEPPersonalityProfile( + full_name=mep_name, + mep_id=mep_data['mep_id'], + wikipedia_url=wiki_content['url'], + summary=personality_data['summary'], + early_life=personality_data['early_life'], + political_career=personality_data['political_career'], + political_views=personality_data['political_views'], + policy_focus=personality_data['policy_focus'], + achievements=personality_data['achievements'], + controversies=personality_data['controversies'], + personal_life=personality_data['personal_life'], + education=personality_data['education'], + professional_background=personality_data['professional_background'], + party_affiliations=personality_data['party_affiliations'], + committee_experience=personality_data['committee_experience'], + voting_record=personality_data['voting_record'], + public_statements=personality_data['public_statements'], + interests=personality_data['interests'], + languages=personality_data['languages'], + awards=personality_data['awards'], + publications=personality_data['publications'], + social_media=personality_data['social_media'], + last_updated=time.strftime("%Y-%m-%d %H:%M:%S") + ) + + if self.verbose: + logger.info(f"Created personality profile for {mep_name} from Wikipedia") + + return profile + + # Create minimal profile if no Wikipedia data found + profile = MEPPersonalityProfile( + full_name=mep_name, + mep_id=mep_data['mep_id'], + summary=f"{mep_name} is a Member of the European Parliament representing {country}.", + political_career=f"Currently serving as MEP for {country}.", + political_views=f"Member of {mep_data['political_group']} and {mep_data['national_party']}.", + last_updated=time.strftime("%Y-%m-%d %H:%M:%S") + ) + + if self.verbose: + logger.warning(f"No Wikipedia data found for {mep_name}, created minimal profile") + + return profile + + def save_personality_profile(self, profile: MEPPersonalityProfile) -> str: + """ + Save personality profile to JSON file. + + Args: + profile: MEPPersonalityProfile object + + Returns: + Path to saved file + """ + # Create safe filename + safe_name = re.sub(r'[^\w\s-]', '', profile.full_name).strip() + safe_name = re.sub(r'[-\s]+', '_', safe_name) + filename = f"{safe_name}_{profile.mep_id}.json" + filepath = os.path.join(self.output_dir, filename) + + # Convert to dictionary and save + profile_dict = asdict(profile) + + with open(filepath, 'w', encoding='utf-8') as f: + json.dump(profile_dict, f, indent=2, ensure_ascii=False) + + if self.verbose: + logger.info(f"Saved personality profile: {filepath}") + + return filepath + + def scrape_all_mep_personalities(self, xml_file: str = "EU.xml", delay: float = 1.0) -> Dict[str, str]: + """ + Scrape personality data for all MEPs. + + Args: + xml_file: Path to EU.xml file + delay: Delay between requests to be respectful to Wikipedia + + Returns: + Dictionary mapping MEP names to their personality profile file paths + """ + meps = self.extract_mep_data_from_xml(xml_file) + profile_files = {} + + if self.verbose: + logger.info(f"Starting personality scraping for {len(meps)} MEPs") + + for i, mep_data in enumerate(meps, 1): + mep_name = mep_data['full_name'] + + if self.verbose: + logger.info(f"Processing {i}/{len(meps)}: {mep_name}") + + try: + # Create personality profile + profile = self.create_personality_profile(mep_data) + + # Save profile + filepath = self.save_personality_profile(profile) + profile_files[mep_name] = filepath + + # Respectful delay + time.sleep(delay) + + except Exception as e: + logger.error(f"Error processing {mep_name}: {e}") + continue + + if self.verbose: + logger.info(f"Completed personality scraping. {len(profile_files)} profiles created.") + + return profile_files + + def load_personality_profile(self, filepath: str) -> MEPPersonalityProfile: + """ + Load personality profile from JSON file. + + Args: + filepath: Path to personality profile JSON file + + Returns: + MEPPersonalityProfile object + """ + with open(filepath, 'r', encoding='utf-8') as f: + data = json.load(f) + + return MEPPersonalityProfile(**data) + + def get_personality_summary(self, profile: MEPPersonalityProfile) -> str: + """ + Generate a personality summary for use in AI agent system prompts. + + Args: + profile: MEPPersonalityProfile object + + Returns: + Formatted personality summary + """ + summary_parts = [] + + if profile.summary: + summary_parts.append(f"Background: {profile.summary}") + + if profile.political_career: + summary_parts.append(f"Political Career: {profile.political_career}") + + if profile.political_views: + summary_parts.append(f"Political Views: {profile.political_views}") + + if profile.policy_focus: + summary_parts.append(f"Policy Focus: {profile.policy_focus}") + + if profile.achievements: + summary_parts.append(f"Notable Achievements: {profile.achievements}") + + if profile.education: + summary_parts.append(f"Education: {profile.education}") + + if profile.professional_background: + summary_parts.append(f"Professional Background: {profile.professional_background}") + + return "\n".join(summary_parts) + + +def main(): + """Main function to run the Wikipedia personality scraper.""" + + print("🏛️ WIKIPEDIA PERSONALITY SCRAPER FOR EUROSWARM PARLIAMENT") + print("=" * 70) + + # Initialize scraper + scraper = WikipediaPersonalityScraper(output_dir="mep_personalities", verbose=True) + + # Scrape all MEP personalities + profile_files = scraper.scrape_all_mep_personalities(delay=1.0) + + print(f"\n✅ Scraping completed!") + print(f"📁 Profiles saved to: {scraper.output_dir}") + print(f"📊 Total profiles created: {len(profile_files)}") + + # Show sample profile + if profile_files: + sample_name = list(profile_files.keys())[0] + sample_file = profile_files[sample_name] + sample_profile = scraper.load_personality_profile(sample_file) + + print(f"\n📋 Sample Profile: {sample_name}") + print("-" * 50) + print(scraper.get_personality_summary(sample_profile)) + + +if __name__ == "__main__": + main() \ No newline at end of file