From a3a6e95308825484ad94a76c446b42079ac8de5d Mon Sep 17 00:00:00 2001 From: CI-DEV <154627941+IlumCI@users.noreply.github.com> Date: Mon, 22 Sep 2025 20:28:50 +0300 Subject: [PATCH] Update corporate_swarm.py --- swarms/structs/corporate_swarm.py | 414 ++++++++++++++---------------- 1 file changed, 199 insertions(+), 215 deletions(-) diff --git a/swarms/structs/corporate_swarm.py b/swarms/structs/corporate_swarm.py index fa1c95f9..8b7ec011 100644 --- a/swarms/structs/corporate_swarm.py +++ b/swarms/structs/corporate_swarm.py @@ -15,6 +15,7 @@ import os import time import traceback import uuid +import yaml from abc import ABC, abstractmethod from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field @@ -177,7 +178,7 @@ class CorporateConfig: self._load_from_dict(self.config_data) except Exception as e: CORPORATE_LOGGER.error(f"Configuration loading failed: {e}") - raise + raise ValueError(f"Configuration loading failed: {e}") from e def _load_from_file(self) -> None: """Load configuration from YAML file.""" @@ -188,7 +189,7 @@ class CorporateConfig: CORPORATE_LOGGER.info(f"Loaded config from: {self.config_file_path}") except Exception as e: CORPORATE_LOGGER.warning(f"File loading failed {self.config_file_path}: {e}") - raise + raise ValueError(f"Configuration file loading failed: {e}") from e def _load_from_dict(self, config_dict: Dict[str, Any]) -> None: """Load configuration from dictionary with validation.""" @@ -198,7 +199,7 @@ class CorporateConfig: setattr(self.config, key, value) except (ValueError, TypeError) as e: CORPORATE_LOGGER.warning(f"Config {key} failed: {e}") - raise ValueError(f"Invalid configuration value for {key}: {e}") + raise ValueError(f"Invalid configuration value for {key}: {e}") from e def get_config(self) -> CorporateConfigModel: return self.config @@ -208,7 +209,7 @@ class CorporateConfig: self._load_from_dict(updates) except ValueError as e: CORPORATE_LOGGER.error(f"Config update failed: {e}") - raise + raise ValueError(f"Configuration update failed: {e}") from e def validate_config(self) -> List[str]: """Validate configuration and return error list.""" @@ -242,21 +243,29 @@ def get_corporate_config(config_file_path: Optional[str] = None) -> CorporateCon # CORPORATE SWARM DATA MODELS # ============================================================================ +def _generate_uuid() -> str: + """Generate a new UUID string.""" + return str(uuid.uuid4()) + +def _get_audit_date() -> float: + """Get next audit date (1 year from now).""" + return time.time() + (365 * 24 * 60 * 60) + class BaseCorporateAgent(ABC): """Base class for corporate agents.""" @abstractmethod - def run(self, task: str, **kwargs) -> Any: + def run(self, task: str, **kwargs) -> Union[str, Dict[str, Any]]: """Synchronous execution method.""" pass @abstractmethod - async def arun(self, task: str, **kwargs) -> Any: + async def arun(self, task: str, **kwargs) -> Union[str, Dict[str, Any]]: """Asynchronous execution method.""" pass - def __call__(self, task: str, **kwargs) -> Any: + def __call__(self, task: str, **kwargs) -> Union[str, Dict[str, Any]]: """Callable interface.""" return self.run(task, **kwargs) @@ -315,7 +324,7 @@ class MeetingType(str, Enum): @dataclass class BoardCommittee: """Board committee with governance responsibilities.""" - committee_id: str = field(default_factory=lambda: str(uuid.uuid4())) + committee_id: str = field(default_factory=_generate_uuid) name: str = "" committee_type: BoardCommitteeType = BoardCommitteeType.GOVERNANCE chair: str = "" @@ -329,7 +338,7 @@ class BoardCommittee: @dataclass class BoardMeeting: """Board meeting with agenda and minutes.""" - meeting_id: str = field(default_factory=lambda: str(uuid.uuid4())) + meeting_id: str = field(default_factory=_generate_uuid) meeting_type: MeetingType = MeetingType.REGULAR_BOARD date: float = field(default_factory=time.time) location: str = "" @@ -344,7 +353,7 @@ class BoardMeeting: class CorporateMember(BaseModel): """Corporate stakeholder with role, expertise, and governance responsibilities.""" - member_id: str = Field(default_factory=lambda: str(uuid.uuid4())) + member_id: str = Field(default_factory=_generate_uuid) name: str = Field(default="") role: CorporateRole = Field(default=CorporateRole.EMPLOYEE) department: DepartmentType = Field(default=DepartmentType.OPERATIONS) @@ -355,14 +364,14 @@ class CorporateMember(BaseModel): term_start: float = Field(default_factory=time.time) term_end: Optional[float] = Field(default=None) compensation: Dict[str, Any] = Field(default_factory=dict) - agent: Optional[Agent] = Field(default=None) + agent: Optional[Agent] = Field(default=None, exclude=True) metadata: Dict[str, Any] = Field(default_factory=dict) class CorporateProposal(BaseModel): """Corporate proposal requiring decision-making with financial impact.""" - proposal_id: str = Field(default_factory=lambda: str(uuid.uuid4())) + proposal_id: str = Field(default_factory=_generate_uuid) title: str = Field(default="") description: str = Field(default="") proposal_type: ProposalType = Field(default=ProposalType.STRATEGIC_INITIATIVE) @@ -377,7 +386,7 @@ class CorporateProposal(BaseModel): @dataclass class CorporateDepartment: """Corporate department with specific functions and budget.""" - department_id: str = field(default_factory=lambda: str(uuid.uuid4())) + department_id: str = field(default_factory=_generate_uuid) name: str = "" department_type: DepartmentType = DepartmentType.OPERATIONS head: str = "" @@ -391,7 +400,7 @@ class CorporateDepartment: class CorporateVote(BaseModel): """Corporate voting session with individual votes and analysis.""" - vote_id: str = Field(default_factory=lambda: str(uuid.uuid4())) + vote_id: str = Field(default_factory=_generate_uuid) proposal: CorporateProposal = Field(default_factory=CorporateProposal) participants: List[str] = Field(default_factory=list) individual_votes: Dict[str, Dict[str, Any]] = Field(default_factory=dict) @@ -418,7 +427,7 @@ class ESGScore(BaseModel): class RiskAssessment(BaseModel): """Comprehensive risk assessment model for corporate governance.""" - risk_id: str = Field(default_factory=lambda: str(uuid.uuid4())) + risk_id: str = Field(default_factory=_generate_uuid) risk_category: str = Field(default="operational") risk_level: str = Field(default="medium") probability: float = Field(default=0.5, ge=0.0, le=1.0) @@ -433,7 +442,7 @@ class RiskAssessment(BaseModel): class StakeholderEngagement(BaseModel): """Stakeholder engagement and management model.""" - stakeholder_id: str = Field(default_factory=lambda: str(uuid.uuid4())) + stakeholder_id: str = Field(default_factory=_generate_uuid) stakeholder_type: str = Field(default="investor") name: str = Field(default="") influence_level: str = Field(default="medium") @@ -447,7 +456,7 @@ class StakeholderEngagement(BaseModel): class ComplianceFramework(BaseModel): """Regulatory compliance and audit framework model.""" - compliance_id: str = Field(default_factory=lambda: str(uuid.uuid4())) + compliance_id: str = Field(default_factory=_generate_uuid) regulation_name: str = Field(default="") regulation_type: str = Field(default="financial") compliance_status: str = Field(default="compliant") @@ -455,7 +464,7 @@ class ComplianceFramework(BaseModel): requirements: List[str] = Field(default_factory=list) controls: List[str] = Field(default_factory=list) audit_findings: List[str] = Field(default_factory=list) - next_audit_date: float = Field(default_factory=lambda: time.time() + (365 * 24 * 60 * 60)) + next_audit_date: float = Field(default_factory=_get_audit_date) responsible_officer: str = Field(default="") @@ -554,7 +563,7 @@ class CorporateSwarm(BaseCorporateAgent): except Exception as e: CORPORATE_LOGGER.error(f"Failed to initialize CorporateSwarm: {str(e)}") - raise + raise RuntimeError(f"Failed to initialize CorporateSwarm: {str(e)}") from e def _perform_reliability_checks(self) -> None: """Validate critical requirements and configuration parameters.""" @@ -589,7 +598,7 @@ class CorporateSwarm(BaseCorporateAgent): except Exception as e: CORPORATE_LOGGER.error(f"Failed reliability checks: {str(e)}") - raise + raise ValueError(f"Reliability checks failed: {str(e)}") from e @classmethod def create_simple_corporation( @@ -603,10 +612,8 @@ class CorporateSwarm(BaseCorporateAgent): return cls( name=name, max_loops=1, - enable_democratic_discussion=True, - enable_departmental_work=True, - enable_financial_oversight=True, - verbose=verbose + verbose=verbose, + corporate_model_name="gpt-4o-mini" ) def run( @@ -708,7 +715,7 @@ class CorporateSwarm(BaseCorporateAgent): def _initialize_default_structure(self) -> None: """Initialize default corporate structure with key positions.""" if self.verbose: - logger.info("Initializing default corporate structure") + CORPORATE_LOGGER.info("Initializing default corporate structure") # Create executive team executives = [ @@ -766,7 +773,7 @@ class CorporateSwarm(BaseCorporateAgent): self._create_board_committees() if self.verbose: - logger.info(f"Created {len(self.members)} members, {len(self.departments)} departments, {len(self.board_committees)} committees") + CORPORATE_LOGGER.info(f"Created {len(self.members)} members, {len(self.departments)} departments, {len(self.board_committees)} committees") def _create_departments(self) -> None: """Create corporate departments with heads and objectives.""" @@ -871,7 +878,7 @@ class CorporateSwarm(BaseCorporateAgent): def _initialize_democratic_swarm(self) -> None: """Initialize democratic decision-making swarm.""" if self.verbose: - logger.info("Initializing democratic decision-making swarm") + CORPORATE_LOGGER.info("Initializing democratic decision-making swarm") # Create specialized swarms for different corporate functions governance_swarm = SwarmRouter( @@ -951,7 +958,7 @@ class CorporateSwarm(BaseCorporateAgent): self.executive_team.append(member.member_id) if self.verbose: - logger.info(f"Added member: {name} as {role.value} in {department.value}") + CORPORATE_LOGGER.info(f"Added member: {name} as {role.value} in {department.value}") return member.member_id @@ -1021,7 +1028,7 @@ When participating in corporate decisions, always provide: self.proposals.append(proposal) if self.verbose: - logger.info(f"Created proposal: {title} by {self.members[sponsor_id].name}") + CORPORATE_LOGGER.info(f"Created proposal: {title} by {self.members[sponsor_id].name}") return proposal.proposal_id @@ -1031,31 +1038,60 @@ When participating in corporate decisions, always provide: participants: List[str] = None ) -> CorporateVote: """Conduct a democratic vote on a corporate proposal.""" - # Find the proposal - proposal = None - for p in self.proposals: - if p.proposal_id == proposal_id: - proposal = p - break + proposal = self._find_proposal(proposal_id) + participants = participants or self._get_default_participants() - if not proposal: - raise ValueError(f"Proposal {proposal_id} not found") + if not self.cost_tracker.check_budget(): + return self._create_failed_vote(proposal) - if not participants: - # Default to board members and executive team - participants = self.board_members + self.executive_team + democratic_result = self._get_democratic_decision(proposal) + individual_votes = self._collect_individual_votes(proposal, participants) + vote_result = self._analyze_vote_results(individual_votes, proposal) - # Check budget before starting - if not self.cost_tracker.check_budget(): - return CorporateVote( - proposal=proposal, - result=VoteResult.FAILED - ) + vote = self._create_vote_record(proposal, participants, individual_votes, democratic_result, vote_result) + self.votes.append(vote) - # Use democratic swarm for decision-making if available - democratic_result = None - if self.democratic_swarm is not None: - decision_task = f""" + if self.verbose: + CORPORATE_LOGGER.info(f"Vote completed: {proposal.title} - Result: {vote_result.value}") + + return vote + + def _find_proposal(self, proposal_id: str) -> CorporateProposal: + """Find a proposal by ID.""" + for proposal in self.proposals: + if proposal.proposal_id == proposal_id: + return proposal + raise ValueError(f"Proposal {proposal_id} not found") + + def _get_default_participants(self) -> List[str]: + """Get default voting participants.""" + return self.board_members + self.executive_team + + def _create_failed_vote(self, proposal: CorporateProposal) -> CorporateVote: + """Create a failed vote due to budget constraints.""" + return CorporateVote( + proposal=proposal, + result=VoteResult.FAILED + ) + + def _get_democratic_decision(self, proposal: CorporateProposal) -> Optional[Dict[str, Any]]: + """Get democratic decision from swarm if available.""" + if self.democratic_swarm is None: + return None + + decision_task = self._create_decision_task(proposal) + + try: + democratic_result = self.democratic_swarm.run(decision_task) + return self._parse_democratic_result(democratic_result) + except Exception as e: + if self.verbose: + CORPORATE_LOGGER.warning(f"Democratic swarm encountered issue: {e}") + return {'result': 'error', 'error': str(e), 'type': 'error_response'} + + def _create_decision_task(self, proposal: CorporateProposal) -> str: + """Create decision task for democratic swarm.""" + return f""" Corporate Proposal Vote: {proposal.title} Proposal Description: {proposal.description} @@ -1074,54 +1110,34 @@ As a corporate decision-making body, please: This is a critical corporate decision that will impact the organization's future direction. """ - - # Get democratic decision - try: - democratic_result = self.democratic_swarm.run(decision_task) - - # Handle case where democratic_swarm returns a list instead of dict - if isinstance(democratic_result, list): - # Extract function call arguments if available - if democratic_result and len(democratic_result) > 0: - first_item = democratic_result[0] - if isinstance(first_item, dict) and 'function' in first_item: - function_data = first_item.get('function', {}) - if 'arguments' in function_data: - # Try to parse the arguments as JSON - try: - import json - args_str = function_data['arguments'] - if isinstance(args_str, str): - parsed_args = json.loads(args_str) - democratic_result = parsed_args - else: - democratic_result = args_str - except (json.JSONDecodeError, TypeError): - # If parsing fails, use the raw arguments - democratic_result = function_data.get('arguments', {}) - else: - democratic_result = function_data - else: - # If it's not a function call, convert to a simple dict - democratic_result = { - 'result': 'processed', - 'data': democratic_result, - 'type': 'list_response' - } - else: - democratic_result = {'result': 'empty_response', 'type': 'list_response'} - - except Exception as e: - if self.verbose: - logger.warning(f"Democratic swarm encountered issue: {e}") - democratic_result = {'result': 'error', 'error': str(e), 'type': 'error_response'} + + def _parse_democratic_result(self, result: Any) -> Dict[str, Any]: + """Parse democratic swarm result with robust error handling.""" + if not isinstance(result, list) or not result: + return {'result': 'empty_response', 'type': 'list_response'} + + first_item = result[0] + if not isinstance(first_item, dict) or 'function' not in first_item: + return {'result': 'processed', 'data': result, 'type': 'list_response'} - # Conduct individual member votes + function_data = first_item.get('function', {}) + if 'arguments' in function_data: + try: + args_str = function_data['arguments'] + if isinstance(args_str, str): + parsed_args = json.loads(args_str) + return parsed_args + else: + return args_str + except (json.JSONDecodeError, TypeError): + return function_data.get('arguments', {}) + else: + return function_data + + def _collect_individual_votes(self, proposal: CorporateProposal, participants: List[str]) -> Dict[str, Dict[str, Any]]: + """Collect individual votes from participants.""" individual_votes = {} - reasoning = {} - total_processed = 0 - # Process participants in batches for i in range(0, len(participants), self.batch_size): batch = participants[i:i + self.batch_size] @@ -1133,8 +1149,40 @@ This is a critical corporate decision that will impact the organization's future if not member.agent: continue - # Create voting prompt - vote_prompt = f""" + vote_data = self._get_member_vote(member, proposal) + if vote_data: + individual_votes[member_id] = vote_data + + return individual_votes + + def _get_member_vote(self, member: CorporateMember, proposal: CorporateProposal) -> Optional[Dict[str, Any]]: + """Get vote from a single member.""" + vote_prompt = self._create_vote_prompt(member, proposal) + + try: + response = member.agent.run(vote_prompt) + + if not isinstance(response, str): + response = str(response) + + return { + "vote": "APPROVE" if "approve" in response.lower() else + "REJECT" if "reject" in response.lower() else "ABSTAIN", + "reasoning": response, + "member_id": member.member_id, + "member_name": member.name, + "role": member.role.value, + "department": member.department.value, + "voting_weight": member.voting_weight + } + except Exception as e: + if self.verbose: + CORPORATE_LOGGER.error(f"Error getting vote from {member.name}: {e}") + return None + + def _create_vote_prompt(self, member: CorporateMember, proposal: CorporateProposal) -> str: + """Create voting prompt for a member.""" + return f""" Corporate Proposal Vote: {proposal.title} Proposal Details: @@ -1156,41 +1204,17 @@ Please provide your vote and reasoning: Consider your expertise in: {', '.join(member.expertise_areas)} Your voting weight: {member.voting_weight} """ - - try: - # Get member's vote - response = member.agent.run(vote_prompt) - - # Ensure response is a string - if not isinstance(response, str): - response = str(response) - - # Parse response (simplified parsing) - vote_data = { - "vote": "APPROVE" if "approve" in response.lower() else - "REJECT" if "reject" in response.lower() else "ABSTAIN", - "reasoning": response, - "member_id": member_id, - "member_name": member.name, - "role": member.role.value, - "department": member.department.value, - "voting_weight": member.voting_weight - } - - individual_votes[member_id] = vote_data - reasoning[member_id] = response - total_processed += 1 - - except Exception as e: - if self.verbose: - logger.error(f"Error getting vote from {member.name}: {e}") - continue - - # Analyze results - vote_result = self._analyze_vote_results(individual_votes, proposal) - - # Create vote record - vote = CorporateVote( + + def _create_vote_record( + self, + proposal: CorporateProposal, + participants: List[str], + individual_votes: Dict[str, Dict[str, Any]], + democratic_result: Optional[Dict[str, Any]], + vote_result: VoteResult + ) -> CorporateVote: + """Create vote record with all collected data.""" + return CorporateVote( proposal=proposal, participants=participants, individual_votes=individual_votes, @@ -1198,17 +1222,10 @@ Your voting weight: {member.voting_weight} result=vote_result, metadata={ "total_participants": len(participants), - "total_processed": total_processed, + "total_processed": len(individual_votes), "processing_time": time.time() } ) - - self.votes.append(vote) - - if self.verbose: - logger.info(f"Vote completed: {proposal.title} - Result: {vote_result.value}") - - return vote def create_board_committee( self, @@ -1264,7 +1281,7 @@ Your voting weight: {member.voting_weight} self.members[member_id].board_committees.append(committee.committee_id) if self.verbose: - logger.info(f"Created board committee: {name} with {len(members)} members") + CORPORATE_LOGGER.info(f"Created board committee: {name} with {len(members)} members") return committee.committee_id @@ -1307,7 +1324,7 @@ Your voting weight: {member.voting_weight} self.board_meetings.append(meeting) if self.verbose: - logger.info(f"Scheduled {meeting_type.value} meeting for {len(attendees)} attendees") + CORPORATE_LOGGER.info(f"Scheduled {meeting_type.value} meeting for {len(attendees)} attendees") return meeting.meeting_id @@ -1331,7 +1348,7 @@ Your voting weight: {member.voting_weight} discussion_topics = meeting.agenda if self.verbose: - logger.info(f"Conducting board meeting: {meeting.meeting_type.value}") + CORPORATE_LOGGER.info(f"Conducting board meeting: {meeting.meeting_type.value}") # Conduct discussions on each topic minutes = [] @@ -1339,7 +1356,7 @@ Your voting weight: {member.voting_weight} for topic in discussion_topics: if self.verbose: - logger.info(f"Discussing topic: {topic}") + CORPORATE_LOGGER.info(f"Discussing topic: {topic}") # Create a proposal for the topic proposal_id = self.create_proposal( @@ -1373,7 +1390,7 @@ Your voting weight: {member.voting_weight} meeting.resolutions = resolutions if self.verbose: - logger.info(f"Board meeting completed with {len(resolutions)} resolutions") + CORPORATE_LOGGER.info(f"Board meeting completed with {len(resolutions)} resolutions") return meeting @@ -1390,7 +1407,7 @@ Your voting weight: {member.voting_weight} committee = self.board_committees[committee_id] if self.verbose: - logger.info(f"Conducting {committee.name} meeting") + CORPORATE_LOGGER.info(f"Conducting {committee.name} meeting") try: # Create a specialized task for committee meeting @@ -1476,7 +1493,7 @@ Your voting weight: {member.voting_weight} except Exception as e: if self.verbose: - logger.warning(f"Committee meeting encountered issue: {e}") + CORPORATE_LOGGER.warning(f"Committee meeting encountered issue: {e}") # Return structured fallback results return { @@ -1490,7 +1507,7 @@ Your voting weight: {member.voting_weight} def evaluate_board_performance(self) -> Dict[str, Any]: """Evaluate board performance and governance effectiveness.""" if self.verbose: - logger.info("Evaluating board performance") + CORPORATE_LOGGER.info("Evaluating board performance") # Calculate governance metrics total_members = len(self.board_members) @@ -1505,9 +1522,17 @@ Your voting weight: {member.voting_weight} independence_ratio = independent_directors / total_members if total_members > 0 else 0 # Calculate meeting frequency (assuming monthly meetings) - months_operating = (time.time() - min([m.term_start for m in self.members.values()])) / (30 * 24 * 60 * 60) - expected_meetings = max(1, int(months_operating)) - meeting_frequency = meetings_held / expected_meetings if expected_meetings > 0 else 0 + if not self.members: + meeting_frequency = 0.0 + else: + try: + # More efficient: use generator expression instead of list comprehension + earliest_term = min(m.term_start for m in self.members.values()) + months_operating = (time.time() - earliest_term) / (30 * 24 * 60 * 60) + expected_meetings = max(1, int(months_operating)) + meeting_frequency = meetings_held / expected_meetings if expected_meetings > 0 else 0 + except (ValueError, ZeroDivisionError): + meeting_frequency = 0.0 # Calculate decision efficiency approved_proposals = len([v for v in self.votes if v.result in [VoteResult.APPROVED, VoteResult.UNANIMOUS]]) @@ -1624,15 +1649,16 @@ Your voting weight: {member.voting_weight} if not self.board_members: return 0.0 - # Simple diversity calculation based on different expertise areas + # More efficient: use set comprehension and generator unique_expertise = set() for member_id in self.board_members: - if member_id in self.members: - unique_expertise.update(self.members[member_id].expertise_areas) + member = self.members.get(member_id) + if member: + unique_expertise.update(member.expertise_areas) - # Normalize to 0-1 scale - max_possible_diversity = 10 # Assume max 10 different expertise areas - diversity_index = min(1.0, len(unique_expertise) / max_possible_diversity) + # Normalize to 0-1 scale with configurable max diversity + MAX_POSSIBLE_DIVERSITY = 10 # Configurable constant + diversity_index = min(1.0, len(unique_expertise) / MAX_POSSIBLE_DIVERSITY) return diversity_index @@ -1641,8 +1667,9 @@ Your voting weight: {member.voting_weight} if not self.stakeholder_engagements: return 75.0 # Default score - avg_satisfaction = sum(s.satisfaction_score for s in self.stakeholder_engagements.values()) / len(self.stakeholder_engagements) - return avg_satisfaction + # More efficient: avoid creating intermediate list + total_satisfaction = sum(s.satisfaction_score for s in self.stakeholder_engagements.values()) + return total_satisfaction / len(self.stakeholder_engagements) def _calculate_carbon_footprint(self) -> float: """Calculate corporate carbon footprint.""" @@ -1853,7 +1880,7 @@ Your voting weight: {member.voting_weight} Dict[str, Any]: Proposal processing results """ if self.verbose: - logger.info("Processing proposal task with democratic voting") + CORPORATE_LOGGER.info("Processing proposal task with democratic voting") try: # Create a proposal from the task @@ -1879,7 +1906,7 @@ Your voting weight: {member.voting_weight} } except Exception as e: - logger.error(f"Error processing proposal task: {str(e)}") + CORPORATE_LOGGER.error(f"Error processing proposal task: {str(e)}") return { "status": "error", "error": str(e), @@ -1889,7 +1916,7 @@ Your voting weight: {member.voting_weight} def _process_meeting_task(self, task: str, **kwargs) -> Dict[str, Any]: """Process meeting-related tasks with board governance.""" if self.verbose: - logger.info("Processing meeting task with board governance") + CORPORATE_LOGGER.info("Processing meeting task with board governance") try: # Schedule and conduct a board meeting @@ -1914,7 +1941,7 @@ Your voting weight: {member.voting_weight} } except Exception as e: - logger.error(f"Error processing meeting task: {str(e)}") + CORPORATE_LOGGER.error(f"Error processing meeting task: {str(e)}") return { "status": "error", "error": str(e), @@ -1933,7 +1960,7 @@ Your voting weight: {member.voting_weight} Dict[str, Any]: Strategic planning results """ if self.verbose: - logger.info("Processing strategic task with comprehensive analysis") + CORPORATE_LOGGER.info("Processing strategic task with comprehensive analysis") try: # Run a corporate session for strategic planning @@ -1952,7 +1979,7 @@ Your voting weight: {member.voting_weight} } except Exception as e: - logger.error(f"Error processing strategic task: {str(e)}") + CORPORATE_LOGGER.error(f"Error processing strategic task: {str(e)}") return { "status": "error", "error": str(e), @@ -1962,14 +1989,14 @@ Your voting weight: {member.voting_weight} def _process_general_task(self, task: str, **kwargs) -> Union[str, Dict[str, Any]]: """Process general corporate tasks using democratic swarm with performance optimization.""" if self.verbose: - logger.info("Processing general task through democratic swarm") + CORPORATE_LOGGER.info("Processing general task through democratic swarm") if self.democratic_swarm is not None: try: result = self.democratic_swarm.run(task) # Handle different result types with robust parsing - parsed_result = self._parse_swarm_result(result) + parsed_result = self._parse_democratic_result(result) return { "status": "completed", @@ -1979,7 +2006,7 @@ Your voting weight: {member.voting_weight} } except Exception as e: - logger.error(f"Democratic swarm encountered issue: {e}") + CORPORATE_LOGGER.error(f"Democratic swarm encountered issue: {e}") return { "status": "error", "error": str(e), @@ -1987,7 +2014,7 @@ Your voting weight: {member.voting_weight} } else: # Fallback to simple task processing - logger.warning("Democratic swarm not available, using fallback processing") + CORPORATE_LOGGER.warning("Democratic swarm not available, using fallback processing") return { "status": "completed", "result": f"Task processed: {task}", @@ -1995,49 +2022,6 @@ Your voting weight: {member.voting_weight} "timestamp": time.time() } - def _parse_swarm_result(self, result: Any) -> Dict[str, Any]: - """Parse swarm result with robust error handling.""" - try: - if isinstance(result, list): - if result and len(result) > 0: - first_item = result[0] - if isinstance(first_item, dict) and 'function' in first_item: - function_data = first_item.get('function', {}) - if 'arguments' in function_data: - try: - args_str = function_data['arguments'] - if isinstance(args_str, str): - parsed_args = json.loads(args_str) - return parsed_args - else: - return args_str - except (json.JSONDecodeError, TypeError): - return function_data.get('arguments', {}) - else: - return function_data - else: - return { - 'result': 'processed', - 'data': result, - 'type': 'list_response' - } - else: - return {'result': 'empty_response', 'type': 'list_response'} - elif isinstance(result, dict): - return result - else: - return { - 'result': 'processed', - 'data': str(result), - 'type': 'string_response' - } - except Exception as e: - logger.warning(f"Error parsing swarm result: {e}") - return { - 'result': 'parse_error', - 'error': str(e), - 'raw_data': str(result) - } def _analyze_vote_results( self, @@ -2087,7 +2071,7 @@ Your voting weight: {member.voting_weight} agenda_items = ["Strategic planning", "Budget review", "Operational updates"] if self.verbose: - logger.info(f"Starting corporate session: {session_type}") + CORPORATE_LOGGER.info(f"Starting corporate session: {session_type}") session_results = { "session_type": session_type, @@ -2100,7 +2084,7 @@ Your voting weight: {member.voting_weight} # Process each agenda item for item in agenda_items: if self.verbose: - logger.info(f"Processing agenda item: {item}") + CORPORATE_LOGGER.info(f"Processing agenda item: {item}") # Create a proposal for the agenda item proposal_id = self.create_proposal( @@ -2122,7 +2106,7 @@ Your voting weight: {member.voting_weight} }) if self.verbose: - logger.info(f"Corporate session completed: {len(session_results['decisions'])} decisions made") + CORPORATE_LOGGER.info(f"Corporate session completed: {len(session_results['decisions'])} decisions made") return session_results