Update corporate_swarm.py

pull/1085/head
CI-DEV 2 weeks ago committed by GitHub
parent 605dba1cec
commit a3a6e95308
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -15,6 +15,7 @@ import os
import time
import traceback
import uuid
import yaml
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
@ -177,7 +178,7 @@ class CorporateConfig:
self._load_from_dict(self.config_data)
except Exception as e:
CORPORATE_LOGGER.error(f"Configuration loading failed: {e}")
raise
raise ValueError(f"Configuration loading failed: {e}") from e
def _load_from_file(self) -> None:
"""Load configuration from YAML file."""
@ -188,7 +189,7 @@ class CorporateConfig:
CORPORATE_LOGGER.info(f"Loaded config from: {self.config_file_path}")
except Exception as e:
CORPORATE_LOGGER.warning(f"File loading failed {self.config_file_path}: {e}")
raise
raise ValueError(f"Configuration file loading failed: {e}") from e
def _load_from_dict(self, config_dict: Dict[str, Any]) -> None:
"""Load configuration from dictionary with validation."""
@ -198,7 +199,7 @@ class CorporateConfig:
setattr(self.config, key, value)
except (ValueError, TypeError) as e:
CORPORATE_LOGGER.warning(f"Config {key} failed: {e}")
raise ValueError(f"Invalid configuration value for {key}: {e}")
raise ValueError(f"Invalid configuration value for {key}: {e}") from e
def get_config(self) -> CorporateConfigModel:
return self.config
@ -208,7 +209,7 @@ class CorporateConfig:
self._load_from_dict(updates)
except ValueError as e:
CORPORATE_LOGGER.error(f"Config update failed: {e}")
raise
raise ValueError(f"Configuration update failed: {e}") from e
def validate_config(self) -> List[str]:
"""Validate configuration and return error list."""
@ -242,21 +243,29 @@ def get_corporate_config(config_file_path: Optional[str] = None) -> CorporateCon
# CORPORATE SWARM DATA MODELS
# ============================================================================
def _generate_uuid() -> str:
"""Generate a new UUID string."""
return str(uuid.uuid4())
def _get_audit_date() -> float:
"""Get next audit date (1 year from now)."""
return time.time() + (365 * 24 * 60 * 60)
class BaseCorporateAgent(ABC):
"""Base class for corporate agents."""
@abstractmethod
def run(self, task: str, **kwargs) -> Any:
def run(self, task: str, **kwargs) -> Union[str, Dict[str, Any]]:
"""Synchronous execution method."""
pass
@abstractmethod
async def arun(self, task: str, **kwargs) -> Any:
async def arun(self, task: str, **kwargs) -> Union[str, Dict[str, Any]]:
"""Asynchronous execution method."""
pass
def __call__(self, task: str, **kwargs) -> Any:
def __call__(self, task: str, **kwargs) -> Union[str, Dict[str, Any]]:
"""Callable interface."""
return self.run(task, **kwargs)
@ -315,7 +324,7 @@ class MeetingType(str, Enum):
@dataclass
class BoardCommittee:
"""Board committee with governance responsibilities."""
committee_id: str = field(default_factory=lambda: str(uuid.uuid4()))
committee_id: str = field(default_factory=_generate_uuid)
name: str = ""
committee_type: BoardCommitteeType = BoardCommitteeType.GOVERNANCE
chair: str = ""
@ -329,7 +338,7 @@ class BoardCommittee:
@dataclass
class BoardMeeting:
"""Board meeting with agenda and minutes."""
meeting_id: str = field(default_factory=lambda: str(uuid.uuid4()))
meeting_id: str = field(default_factory=_generate_uuid)
meeting_type: MeetingType = MeetingType.REGULAR_BOARD
date: float = field(default_factory=time.time)
location: str = ""
@ -344,7 +353,7 @@ class BoardMeeting:
class CorporateMember(BaseModel):
"""Corporate stakeholder with role, expertise, and governance responsibilities."""
member_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
member_id: str = Field(default_factory=_generate_uuid)
name: str = Field(default="")
role: CorporateRole = Field(default=CorporateRole.EMPLOYEE)
department: DepartmentType = Field(default=DepartmentType.OPERATIONS)
@ -355,14 +364,14 @@ class CorporateMember(BaseModel):
term_start: float = Field(default_factory=time.time)
term_end: Optional[float] = Field(default=None)
compensation: Dict[str, Any] = Field(default_factory=dict)
agent: Optional[Agent] = Field(default=None)
agent: Optional[Agent] = Field(default=None, exclude=True)
metadata: Dict[str, Any] = Field(default_factory=dict)
class CorporateProposal(BaseModel):
"""Corporate proposal requiring decision-making with financial impact."""
proposal_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
proposal_id: str = Field(default_factory=_generate_uuid)
title: str = Field(default="")
description: str = Field(default="")
proposal_type: ProposalType = Field(default=ProposalType.STRATEGIC_INITIATIVE)
@ -377,7 +386,7 @@ class CorporateProposal(BaseModel):
@dataclass
class CorporateDepartment:
"""Corporate department with specific functions and budget."""
department_id: str = field(default_factory=lambda: str(uuid.uuid4()))
department_id: str = field(default_factory=_generate_uuid)
name: str = ""
department_type: DepartmentType = DepartmentType.OPERATIONS
head: str = ""
@ -391,7 +400,7 @@ class CorporateDepartment:
class CorporateVote(BaseModel):
"""Corporate voting session with individual votes and analysis."""
vote_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
vote_id: str = Field(default_factory=_generate_uuid)
proposal: CorporateProposal = Field(default_factory=CorporateProposal)
participants: List[str] = Field(default_factory=list)
individual_votes: Dict[str, Dict[str, Any]] = Field(default_factory=dict)
@ -418,7 +427,7 @@ class ESGScore(BaseModel):
class RiskAssessment(BaseModel):
"""Comprehensive risk assessment model for corporate governance."""
risk_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
risk_id: str = Field(default_factory=_generate_uuid)
risk_category: str = Field(default="operational")
risk_level: str = Field(default="medium")
probability: float = Field(default=0.5, ge=0.0, le=1.0)
@ -433,7 +442,7 @@ class RiskAssessment(BaseModel):
class StakeholderEngagement(BaseModel):
"""Stakeholder engagement and management model."""
stakeholder_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
stakeholder_id: str = Field(default_factory=_generate_uuid)
stakeholder_type: str = Field(default="investor")
name: str = Field(default="")
influence_level: str = Field(default="medium")
@ -447,7 +456,7 @@ class StakeholderEngagement(BaseModel):
class ComplianceFramework(BaseModel):
"""Regulatory compliance and audit framework model."""
compliance_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
compliance_id: str = Field(default_factory=_generate_uuid)
regulation_name: str = Field(default="")
regulation_type: str = Field(default="financial")
compliance_status: str = Field(default="compliant")
@ -455,7 +464,7 @@ class ComplianceFramework(BaseModel):
requirements: List[str] = Field(default_factory=list)
controls: List[str] = Field(default_factory=list)
audit_findings: List[str] = Field(default_factory=list)
next_audit_date: float = Field(default_factory=lambda: time.time() + (365 * 24 * 60 * 60))
next_audit_date: float = Field(default_factory=_get_audit_date)
responsible_officer: str = Field(default="")
@ -554,7 +563,7 @@ class CorporateSwarm(BaseCorporateAgent):
except Exception as e:
CORPORATE_LOGGER.error(f"Failed to initialize CorporateSwarm: {str(e)}")
raise
raise RuntimeError(f"Failed to initialize CorporateSwarm: {str(e)}") from e
def _perform_reliability_checks(self) -> None:
"""Validate critical requirements and configuration parameters."""
@ -589,7 +598,7 @@ class CorporateSwarm(BaseCorporateAgent):
except Exception as e:
CORPORATE_LOGGER.error(f"Failed reliability checks: {str(e)}")
raise
raise ValueError(f"Reliability checks failed: {str(e)}") from e
@classmethod
def create_simple_corporation(
@ -603,10 +612,8 @@ class CorporateSwarm(BaseCorporateAgent):
return cls(
name=name,
max_loops=1,
enable_democratic_discussion=True,
enable_departmental_work=True,
enable_financial_oversight=True,
verbose=verbose
verbose=verbose,
corporate_model_name="gpt-4o-mini"
)
def run(
@ -708,7 +715,7 @@ class CorporateSwarm(BaseCorporateAgent):
def _initialize_default_structure(self) -> None:
"""Initialize default corporate structure with key positions."""
if self.verbose:
logger.info("Initializing default corporate structure")
CORPORATE_LOGGER.info("Initializing default corporate structure")
# Create executive team
executives = [
@ -766,7 +773,7 @@ class CorporateSwarm(BaseCorporateAgent):
self._create_board_committees()
if self.verbose:
logger.info(f"Created {len(self.members)} members, {len(self.departments)} departments, {len(self.board_committees)} committees")
CORPORATE_LOGGER.info(f"Created {len(self.members)} members, {len(self.departments)} departments, {len(self.board_committees)} committees")
def _create_departments(self) -> None:
"""Create corporate departments with heads and objectives."""
@ -871,7 +878,7 @@ class CorporateSwarm(BaseCorporateAgent):
def _initialize_democratic_swarm(self) -> None:
"""Initialize democratic decision-making swarm."""
if self.verbose:
logger.info("Initializing democratic decision-making swarm")
CORPORATE_LOGGER.info("Initializing democratic decision-making swarm")
# Create specialized swarms for different corporate functions
governance_swarm = SwarmRouter(
@ -951,7 +958,7 @@ class CorporateSwarm(BaseCorporateAgent):
self.executive_team.append(member.member_id)
if self.verbose:
logger.info(f"Added member: {name} as {role.value} in {department.value}")
CORPORATE_LOGGER.info(f"Added member: {name} as {role.value} in {department.value}")
return member.member_id
@ -1021,7 +1028,7 @@ When participating in corporate decisions, always provide:
self.proposals.append(proposal)
if self.verbose:
logger.info(f"Created proposal: {title} by {self.members[sponsor_id].name}")
CORPORATE_LOGGER.info(f"Created proposal: {title} by {self.members[sponsor_id].name}")
return proposal.proposal_id
@ -1031,31 +1038,60 @@ When participating in corporate decisions, always provide:
participants: List[str] = None
) -> CorporateVote:
"""Conduct a democratic vote on a corporate proposal."""
# Find the proposal
proposal = None
for p in self.proposals:
if p.proposal_id == proposal_id:
proposal = p
break
proposal = self._find_proposal(proposal_id)
participants = participants or self._get_default_participants()
if not proposal:
raise ValueError(f"Proposal {proposal_id} not found")
if not self.cost_tracker.check_budget():
return self._create_failed_vote(proposal)
if not participants:
# Default to board members and executive team
participants = self.board_members + self.executive_team
democratic_result = self._get_democratic_decision(proposal)
individual_votes = self._collect_individual_votes(proposal, participants)
vote_result = self._analyze_vote_results(individual_votes, proposal)
# Check budget before starting
if not self.cost_tracker.check_budget():
return CorporateVote(
proposal=proposal,
result=VoteResult.FAILED
)
vote = self._create_vote_record(proposal, participants, individual_votes, democratic_result, vote_result)
self.votes.append(vote)
# Use democratic swarm for decision-making if available
democratic_result = None
if self.democratic_swarm is not None:
decision_task = f"""
if self.verbose:
CORPORATE_LOGGER.info(f"Vote completed: {proposal.title} - Result: {vote_result.value}")
return vote
def _find_proposal(self, proposal_id: str) -> CorporateProposal:
"""Find a proposal by ID."""
for proposal in self.proposals:
if proposal.proposal_id == proposal_id:
return proposal
raise ValueError(f"Proposal {proposal_id} not found")
def _get_default_participants(self) -> List[str]:
"""Get default voting participants."""
return self.board_members + self.executive_team
def _create_failed_vote(self, proposal: CorporateProposal) -> CorporateVote:
"""Create a failed vote due to budget constraints."""
return CorporateVote(
proposal=proposal,
result=VoteResult.FAILED
)
def _get_democratic_decision(self, proposal: CorporateProposal) -> Optional[Dict[str, Any]]:
"""Get democratic decision from swarm if available."""
if self.democratic_swarm is None:
return None
decision_task = self._create_decision_task(proposal)
try:
democratic_result = self.democratic_swarm.run(decision_task)
return self._parse_democratic_result(democratic_result)
except Exception as e:
if self.verbose:
CORPORATE_LOGGER.warning(f"Democratic swarm encountered issue: {e}")
return {'result': 'error', 'error': str(e), 'type': 'error_response'}
def _create_decision_task(self, proposal: CorporateProposal) -> str:
"""Create decision task for democratic swarm."""
return f"""
Corporate Proposal Vote: {proposal.title}
Proposal Description: {proposal.description}
@ -1074,54 +1110,34 @@ As a corporate decision-making body, please:
This is a critical corporate decision that will impact the organization's future direction.
"""
# Get democratic decision
try:
democratic_result = self.democratic_swarm.run(decision_task)
# Handle case where democratic_swarm returns a list instead of dict
if isinstance(democratic_result, list):
# Extract function call arguments if available
if democratic_result and len(democratic_result) > 0:
first_item = democratic_result[0]
if isinstance(first_item, dict) and 'function' in first_item:
function_data = first_item.get('function', {})
if 'arguments' in function_data:
# Try to parse the arguments as JSON
try:
import json
args_str = function_data['arguments']
if isinstance(args_str, str):
parsed_args = json.loads(args_str)
democratic_result = parsed_args
else:
democratic_result = args_str
except (json.JSONDecodeError, TypeError):
# If parsing fails, use the raw arguments
democratic_result = function_data.get('arguments', {})
else:
democratic_result = function_data
else:
# If it's not a function call, convert to a simple dict
democratic_result = {
'result': 'processed',
'data': democratic_result,
'type': 'list_response'
}
else:
democratic_result = {'result': 'empty_response', 'type': 'list_response'}
except Exception as e:
if self.verbose:
logger.warning(f"Democratic swarm encountered issue: {e}")
democratic_result = {'result': 'error', 'error': str(e), 'type': 'error_response'}
def _parse_democratic_result(self, result: Any) -> Dict[str, Any]:
"""Parse democratic swarm result with robust error handling."""
if not isinstance(result, list) or not result:
return {'result': 'empty_response', 'type': 'list_response'}
first_item = result[0]
if not isinstance(first_item, dict) or 'function' not in first_item:
return {'result': 'processed', 'data': result, 'type': 'list_response'}
# Conduct individual member votes
function_data = first_item.get('function', {})
if 'arguments' in function_data:
try:
args_str = function_data['arguments']
if isinstance(args_str, str):
parsed_args = json.loads(args_str)
return parsed_args
else:
return args_str
except (json.JSONDecodeError, TypeError):
return function_data.get('arguments', {})
else:
return function_data
def _collect_individual_votes(self, proposal: CorporateProposal, participants: List[str]) -> Dict[str, Dict[str, Any]]:
"""Collect individual votes from participants."""
individual_votes = {}
reasoning = {}
total_processed = 0
# Process participants in batches
for i in range(0, len(participants), self.batch_size):
batch = participants[i:i + self.batch_size]
@ -1133,8 +1149,40 @@ This is a critical corporate decision that will impact the organization's future
if not member.agent:
continue
# Create voting prompt
vote_prompt = f"""
vote_data = self._get_member_vote(member, proposal)
if vote_data:
individual_votes[member_id] = vote_data
return individual_votes
def _get_member_vote(self, member: CorporateMember, proposal: CorporateProposal) -> Optional[Dict[str, Any]]:
"""Get vote from a single member."""
vote_prompt = self._create_vote_prompt(member, proposal)
try:
response = member.agent.run(vote_prompt)
if not isinstance(response, str):
response = str(response)
return {
"vote": "APPROVE" if "approve" in response.lower() else
"REJECT" if "reject" in response.lower() else "ABSTAIN",
"reasoning": response,
"member_id": member.member_id,
"member_name": member.name,
"role": member.role.value,
"department": member.department.value,
"voting_weight": member.voting_weight
}
except Exception as e:
if self.verbose:
CORPORATE_LOGGER.error(f"Error getting vote from {member.name}: {e}")
return None
def _create_vote_prompt(self, member: CorporateMember, proposal: CorporateProposal) -> str:
"""Create voting prompt for a member."""
return f"""
Corporate Proposal Vote: {proposal.title}
Proposal Details:
@ -1156,41 +1204,17 @@ Please provide your vote and reasoning:
Consider your expertise in: {', '.join(member.expertise_areas)}
Your voting weight: {member.voting_weight}
"""
try:
# Get member's vote
response = member.agent.run(vote_prompt)
# Ensure response is a string
if not isinstance(response, str):
response = str(response)
# Parse response (simplified parsing)
vote_data = {
"vote": "APPROVE" if "approve" in response.lower() else
"REJECT" if "reject" in response.lower() else "ABSTAIN",
"reasoning": response,
"member_id": member_id,
"member_name": member.name,
"role": member.role.value,
"department": member.department.value,
"voting_weight": member.voting_weight
}
individual_votes[member_id] = vote_data
reasoning[member_id] = response
total_processed += 1
except Exception as e:
if self.verbose:
logger.error(f"Error getting vote from {member.name}: {e}")
continue
# Analyze results
vote_result = self._analyze_vote_results(individual_votes, proposal)
# Create vote record
vote = CorporateVote(
def _create_vote_record(
self,
proposal: CorporateProposal,
participants: List[str],
individual_votes: Dict[str, Dict[str, Any]],
democratic_result: Optional[Dict[str, Any]],
vote_result: VoteResult
) -> CorporateVote:
"""Create vote record with all collected data."""
return CorporateVote(
proposal=proposal,
participants=participants,
individual_votes=individual_votes,
@ -1198,17 +1222,10 @@ Your voting weight: {member.voting_weight}
result=vote_result,
metadata={
"total_participants": len(participants),
"total_processed": total_processed,
"total_processed": len(individual_votes),
"processing_time": time.time()
}
)
self.votes.append(vote)
if self.verbose:
logger.info(f"Vote completed: {proposal.title} - Result: {vote_result.value}")
return vote
def create_board_committee(
self,
@ -1264,7 +1281,7 @@ Your voting weight: {member.voting_weight}
self.members[member_id].board_committees.append(committee.committee_id)
if self.verbose:
logger.info(f"Created board committee: {name} with {len(members)} members")
CORPORATE_LOGGER.info(f"Created board committee: {name} with {len(members)} members")
return committee.committee_id
@ -1307,7 +1324,7 @@ Your voting weight: {member.voting_weight}
self.board_meetings.append(meeting)
if self.verbose:
logger.info(f"Scheduled {meeting_type.value} meeting for {len(attendees)} attendees")
CORPORATE_LOGGER.info(f"Scheduled {meeting_type.value} meeting for {len(attendees)} attendees")
return meeting.meeting_id
@ -1331,7 +1348,7 @@ Your voting weight: {member.voting_weight}
discussion_topics = meeting.agenda
if self.verbose:
logger.info(f"Conducting board meeting: {meeting.meeting_type.value}")
CORPORATE_LOGGER.info(f"Conducting board meeting: {meeting.meeting_type.value}")
# Conduct discussions on each topic
minutes = []
@ -1339,7 +1356,7 @@ Your voting weight: {member.voting_weight}
for topic in discussion_topics:
if self.verbose:
logger.info(f"Discussing topic: {topic}")
CORPORATE_LOGGER.info(f"Discussing topic: {topic}")
# Create a proposal for the topic
proposal_id = self.create_proposal(
@ -1373,7 +1390,7 @@ Your voting weight: {member.voting_weight}
meeting.resolutions = resolutions
if self.verbose:
logger.info(f"Board meeting completed with {len(resolutions)} resolutions")
CORPORATE_LOGGER.info(f"Board meeting completed with {len(resolutions)} resolutions")
return meeting
@ -1390,7 +1407,7 @@ Your voting weight: {member.voting_weight}
committee = self.board_committees[committee_id]
if self.verbose:
logger.info(f"Conducting {committee.name} meeting")
CORPORATE_LOGGER.info(f"Conducting {committee.name} meeting")
try:
# Create a specialized task for committee meeting
@ -1476,7 +1493,7 @@ Your voting weight: {member.voting_weight}
except Exception as e:
if self.verbose:
logger.warning(f"Committee meeting encountered issue: {e}")
CORPORATE_LOGGER.warning(f"Committee meeting encountered issue: {e}")
# Return structured fallback results
return {
@ -1490,7 +1507,7 @@ Your voting weight: {member.voting_weight}
def evaluate_board_performance(self) -> Dict[str, Any]:
"""Evaluate board performance and governance effectiveness."""
if self.verbose:
logger.info("Evaluating board performance")
CORPORATE_LOGGER.info("Evaluating board performance")
# Calculate governance metrics
total_members = len(self.board_members)
@ -1505,9 +1522,17 @@ Your voting weight: {member.voting_weight}
independence_ratio = independent_directors / total_members if total_members > 0 else 0
# Calculate meeting frequency (assuming monthly meetings)
months_operating = (time.time() - min([m.term_start for m in self.members.values()])) / (30 * 24 * 60 * 60)
expected_meetings = max(1, int(months_operating))
meeting_frequency = meetings_held / expected_meetings if expected_meetings > 0 else 0
if not self.members:
meeting_frequency = 0.0
else:
try:
# More efficient: use generator expression instead of list comprehension
earliest_term = min(m.term_start for m in self.members.values())
months_operating = (time.time() - earliest_term) / (30 * 24 * 60 * 60)
expected_meetings = max(1, int(months_operating))
meeting_frequency = meetings_held / expected_meetings if expected_meetings > 0 else 0
except (ValueError, ZeroDivisionError):
meeting_frequency = 0.0
# Calculate decision efficiency
approved_proposals = len([v for v in self.votes if v.result in [VoteResult.APPROVED, VoteResult.UNANIMOUS]])
@ -1624,15 +1649,16 @@ Your voting weight: {member.voting_weight}
if not self.board_members:
return 0.0
# Simple diversity calculation based on different expertise areas
# More efficient: use set comprehension and generator
unique_expertise = set()
for member_id in self.board_members:
if member_id in self.members:
unique_expertise.update(self.members[member_id].expertise_areas)
member = self.members.get(member_id)
if member:
unique_expertise.update(member.expertise_areas)
# Normalize to 0-1 scale
max_possible_diversity = 10 # Assume max 10 different expertise areas
diversity_index = min(1.0, len(unique_expertise) / max_possible_diversity)
# Normalize to 0-1 scale with configurable max diversity
MAX_POSSIBLE_DIVERSITY = 10 # Configurable constant
diversity_index = min(1.0, len(unique_expertise) / MAX_POSSIBLE_DIVERSITY)
return diversity_index
@ -1641,8 +1667,9 @@ Your voting weight: {member.voting_weight}
if not self.stakeholder_engagements:
return 75.0 # Default score
avg_satisfaction = sum(s.satisfaction_score for s in self.stakeholder_engagements.values()) / len(self.stakeholder_engagements)
return avg_satisfaction
# More efficient: avoid creating intermediate list
total_satisfaction = sum(s.satisfaction_score for s in self.stakeholder_engagements.values())
return total_satisfaction / len(self.stakeholder_engagements)
def _calculate_carbon_footprint(self) -> float:
"""Calculate corporate carbon footprint."""
@ -1853,7 +1880,7 @@ Your voting weight: {member.voting_weight}
Dict[str, Any]: Proposal processing results
"""
if self.verbose:
logger.info("Processing proposal task with democratic voting")
CORPORATE_LOGGER.info("Processing proposal task with democratic voting")
try:
# Create a proposal from the task
@ -1879,7 +1906,7 @@ Your voting weight: {member.voting_weight}
}
except Exception as e:
logger.error(f"Error processing proposal task: {str(e)}")
CORPORATE_LOGGER.error(f"Error processing proposal task: {str(e)}")
return {
"status": "error",
"error": str(e),
@ -1889,7 +1916,7 @@ Your voting weight: {member.voting_weight}
def _process_meeting_task(self, task: str, **kwargs) -> Dict[str, Any]:
"""Process meeting-related tasks with board governance."""
if self.verbose:
logger.info("Processing meeting task with board governance")
CORPORATE_LOGGER.info("Processing meeting task with board governance")
try:
# Schedule and conduct a board meeting
@ -1914,7 +1941,7 @@ Your voting weight: {member.voting_weight}
}
except Exception as e:
logger.error(f"Error processing meeting task: {str(e)}")
CORPORATE_LOGGER.error(f"Error processing meeting task: {str(e)}")
return {
"status": "error",
"error": str(e),
@ -1933,7 +1960,7 @@ Your voting weight: {member.voting_weight}
Dict[str, Any]: Strategic planning results
"""
if self.verbose:
logger.info("Processing strategic task with comprehensive analysis")
CORPORATE_LOGGER.info("Processing strategic task with comprehensive analysis")
try:
# Run a corporate session for strategic planning
@ -1952,7 +1979,7 @@ Your voting weight: {member.voting_weight}
}
except Exception as e:
logger.error(f"Error processing strategic task: {str(e)}")
CORPORATE_LOGGER.error(f"Error processing strategic task: {str(e)}")
return {
"status": "error",
"error": str(e),
@ -1962,14 +1989,14 @@ Your voting weight: {member.voting_weight}
def _process_general_task(self, task: str, **kwargs) -> Union[str, Dict[str, Any]]:
"""Process general corporate tasks using democratic swarm with performance optimization."""
if self.verbose:
logger.info("Processing general task through democratic swarm")
CORPORATE_LOGGER.info("Processing general task through democratic swarm")
if self.democratic_swarm is not None:
try:
result = self.democratic_swarm.run(task)
# Handle different result types with robust parsing
parsed_result = self._parse_swarm_result(result)
parsed_result = self._parse_democratic_result(result)
return {
"status": "completed",
@ -1979,7 +2006,7 @@ Your voting weight: {member.voting_weight}
}
except Exception as e:
logger.error(f"Democratic swarm encountered issue: {e}")
CORPORATE_LOGGER.error(f"Democratic swarm encountered issue: {e}")
return {
"status": "error",
"error": str(e),
@ -1987,7 +2014,7 @@ Your voting weight: {member.voting_weight}
}
else:
# Fallback to simple task processing
logger.warning("Democratic swarm not available, using fallback processing")
CORPORATE_LOGGER.warning("Democratic swarm not available, using fallback processing")
return {
"status": "completed",
"result": f"Task processed: {task}",
@ -1995,49 +2022,6 @@ Your voting weight: {member.voting_weight}
"timestamp": time.time()
}
def _parse_swarm_result(self, result: Any) -> Dict[str, Any]:
"""Parse swarm result with robust error handling."""
try:
if isinstance(result, list):
if result and len(result) > 0:
first_item = result[0]
if isinstance(first_item, dict) and 'function' in first_item:
function_data = first_item.get('function', {})
if 'arguments' in function_data:
try:
args_str = function_data['arguments']
if isinstance(args_str, str):
parsed_args = json.loads(args_str)
return parsed_args
else:
return args_str
except (json.JSONDecodeError, TypeError):
return function_data.get('arguments', {})
else:
return function_data
else:
return {
'result': 'processed',
'data': result,
'type': 'list_response'
}
else:
return {'result': 'empty_response', 'type': 'list_response'}
elif isinstance(result, dict):
return result
else:
return {
'result': 'processed',
'data': str(result),
'type': 'string_response'
}
except Exception as e:
logger.warning(f"Error parsing swarm result: {e}")
return {
'result': 'parse_error',
'error': str(e),
'raw_data': str(result)
}
def _analyze_vote_results(
self,
@ -2087,7 +2071,7 @@ Your voting weight: {member.voting_weight}
agenda_items = ["Strategic planning", "Budget review", "Operational updates"]
if self.verbose:
logger.info(f"Starting corporate session: {session_type}")
CORPORATE_LOGGER.info(f"Starting corporate session: {session_type}")
session_results = {
"session_type": session_type,
@ -2100,7 +2084,7 @@ Your voting weight: {member.voting_weight}
# Process each agenda item
for item in agenda_items:
if self.verbose:
logger.info(f"Processing agenda item: {item}")
CORPORATE_LOGGER.info(f"Processing agenda item: {item}")
# Create a proposal for the agenda item
proposal_id = self.create_proposal(
@ -2122,7 +2106,7 @@ Your voting weight: {member.voting_weight}
})
if self.verbose:
logger.info(f"Corporate session completed: {len(session_results['decisions'])} decisions made")
CORPORATE_LOGGER.info(f"Corporate session completed: {len(session_results['decisions'])} decisions made")
return session_results

Loading…
Cancel
Save