You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
swarms/new/auto_corp.py

644 lines
23 KiB

"""
Hierarchy:
- CEO -> finds the department leader(s)
- Department leader -> finds employees
- Employees -> do the work

Todo:
- Create schemas that enable the CEO to find the department leader or leaders
- The CEO then distributes orders to the department leaders, or to just one leader
- Each department leader then distributes orders to its employees
- Employees can choose to do the work, delegate it to another employee, or work together
- When the employees are done, they report back to the department leader
- The department leader then reports back to the CEO
- The CEO then reports back to the user

Logic:
- Dynamically set up a conversation for each department
- Feed context to each agent in the department
"""
from typing import Callable, List, Union
from pydantic import BaseModel, Field
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.structs.ma_utils import list_all_agents
from swarms.utils.str_to_dict import str_to_dict
from swarms.utils.any_to_str import any_to_str
class Department(BaseModel):
    """A named group of employees together with the leader that manages them.

    Attributes:
        name: The name of the department.
        description: A description of the department.
        employees: The agents (or plain callables) that belong to the
            department.
        leader_name: The name of the department's leader.
        leader: The agent that leads the department. Optional, so existing
            callers that only pass ``leader_name`` keep working. AutoCorp
            reads ``department.leader``, which previously was not a declared
            field and therefore could never be set on the model.
    """

    name: str = Field(description="The name of the department")
    description: str = Field(
        description="A description of the department"
    )
    employees: List[Union[Agent, Callable]] = Field(
        description="A list of employees in the department"
    )
    leader_name: str = Field(
        description="The name of the leader of the department"
    )
    leader: Union[Agent, Callable, None] = Field(
        default=None,
        description="The agent that leads the department",
    )

    class Config:
        # Agent is not a pydantic type, so arbitrary classes must be allowed.
        arbitrary_types_allowed = True
# Function schema for the CEO's delegation step: the CEO records its reasoning
# and plan, then names the departments and leaders that should take the task.
CEO_SCHEMA = {
    "name": "delegate_task_to_department",
    "description": "CEO function to analyze and delegate tasks to appropriate department leaders",
    "parameters": {
        "type": "object",
        "properties": {
            # Free-form reasoning and planning.
            "thought": {
                "type": "string",
                "description": "Reasoning about the task, its requirements, and potential approaches",
            },
            "plan": {
                "type": "string",
                "description": "Structured plan for how to accomplish the task across departments",
            },
            # The delegation itself.
            "tasks": {
                "type": "object",
                "properties": {
                    "task_description": {
                        "type": "string",
                    },
                    "selected_departments": {
                        "type": "array",
                        "items": {
                            "type": "string",
                        },
                        "description": "List of department names that should handle this task",
                    },
                    "selected_leaders": {
                        "type": "array",
                        "items": {
                            "type": "string",
                        },
                        "description": "List of department leaders to assign the task to",
                    },
                    "success_criteria": {
                        "type": "string",
                    },
                },
                # success_criteria is optional.
                "required": [
                    "task_description",
                    "selected_departments",
                    "selected_leaders",
                ],
            },
        },
        "required": ["thought", "plan", "tasks"],
    },
}
# Function schema for a department leader: split the incoming task into
# subtasks assigned to employees, with optional progress tracking.
DEPARTMENT_LEADER_SCHEMA = {
    "name": "manage_department_task",
    "description": "Department leader function to break down and assign tasks to employees",
    "parameters": {
        "type": "object",
        "properties": {
            "task_management": {
                "type": "object",
                "properties": {
                    "original_task": {"type": "string"},
                    # One entry per unit of work handed to employees.
                    "subtasks": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "subtask_id": {"type": "string"},
                                "description": {"type": "string"},
                                "assigned_employees": {
                                    "type": "array",
                                    "items": {"type": "string"},
                                },
                                "estimated_duration": {"type": "string"},
                                "dependencies": {
                                    "type": "array",
                                    "items": {"type": "string"},
                                },
                            },
                        },
                    },
                    # Optional status snapshot for the whole task.
                    "progress_tracking": {
                        "type": "object",
                        "properties": {
                            "status": {
                                "type": "string",
                                "enum": [
                                    "not_started",
                                    "in_progress",
                                    "completed",
                                ],
                            },
                            "completion_percentage": {"type": "number"},
                            "blockers": {
                                "type": "array",
                                "items": {"type": "string"},
                            },
                        },
                    },
                },
                "required": ["original_task", "subtasks"],
            }
        },
        "required": ["task_management"],
    },
}
# Function schema for an employee handling a subtask: the employee records its
# reasoning and plan, then reports whether it executed, delegated, or
# collaborated on the subtask, with the details of that work.
EMPLOYEE_SCHEMA = {
    "name": "handle_assigned_task",
    "description": "Employee function to process and execute assigned tasks",
    "parameters": {
        "type": "object",
        "properties": {
            "thought": {
                "type": "string",
                "description": "Reasoning about the task, its requirements, and potential approaches",
            },
            "plan": {
                "type": "string",
                "description": "Structured plan for how to accomplish the task across departments",
            },
            "task_execution": {
                "type": "object",
                "properties": {
                    "subtask_id": {"type": "string"},
                    "action_taken": {
                        "type": "string",
                        "enum": [
                            "execute",
                            "delegate",
                            "collaborate",
                        ],
                    },
                    "execution_details": {
                        "type": "object",
                        "properties": {
                            "status": {
                                "type": "string",
                                "enum": [
                                    "in_progress",
                                    "completed",
                                    "blocked",
                                ],
                            },
                            "work_log": {"type": "string"},
                            "collaboration_partners": {
                                "type": "array",
                                "items": {"type": "string"},
                            },
                            "delegate_to": {"type": "string"},
                            "results": {"type": "string"},
                            "issues_encountered": {
                                "type": "array",
                                "items": {"type": "string"},
                            },
                        },
                    },
                },
                # Only keys that are properties of task_execution belong here.
                # "thought" and "plan" used to be listed at this level even
                # though they are top-level properties, so they are required
                # at the top level instead.
                "required": [
                    "subtask_id",
                    "action_taken",
                    "execution_details",
                ],
            },
        },
        "required": ["thought", "plan", "task_execution"],
    },
}
# Status report schemas for the feedback loop
# Function schema for an employee's status report on one subtask, addressed to
# the department leader.
EMPLOYEE_REPORT_SCHEMA = {
    "name": "submit_task_report",
    "description": "Employee function to report task completion status to department leader",
    "parameters": {
        "type": "object",
        "properties": {
            "task_report": {
                "type": "object",
                "properties": {
                    "subtask_id": {
                        "type": "string",
                    },
                    "completion_status": {
                        "type": "string",
                        "enum": [
                            "completed",
                            "partial",
                            "blocked",
                        ],
                    },
                    "work_summary": {
                        "type": "string",
                    },
                    "time_spent": {
                        "type": "string",
                    },
                    "challenges": {
                        "type": "array",
                        "items": {
                            "type": "string",
                        },
                    },
                    "next_steps": {
                        "type": "string",
                    },
                },
                # time_spent, challenges and next_steps are optional.
                "required": [
                    "subtask_id",
                    "completion_status",
                    "work_summary",
                ],
            }
        },
        "required": ["task_report"],
    },
}
# Function schema for a department leader's progress report to the CEO.
DEPARTMENT_REPORT_SCHEMA = {
    "name": "submit_department_report",
    "description": "Department leader function to report department progress to CEO",
    "parameters": {
        "type": "object",
        "properties": {
            "department_report": {
                "type": "object",
                "properties": {
                    "department_name": {
                        "type": "string",
                    },
                    "task_summary": {
                        "type": "string",
                    },
                    "overall_status": {
                        "type": "string",
                        "enum": [
                            "on_track",
                            "at_risk",
                            "completed",
                        ],
                    },
                    "completion_percentage": {
                        "type": "number",
                    },
                    # The remaining fields are all lists of free-form strings.
                    "key_achievements": {
                        "type": "array",
                        "items": {"type": "string"},
                    },
                    "blockers": {
                        "type": "array",
                        "items": {"type": "string"},
                    },
                    "resource_needs": {
                        "type": "array",
                        "items": {"type": "string"},
                    },
                    "next_milestones": {
                        "type": "array",
                        "items": {"type": "string"},
                    },
                },
                "required": [
                    "department_name",
                    "task_summary",
                    "overall_status",
                ],
            }
        },
        "required": ["department_report"],
    },
}
# Function schema for the CEO's final report to the user, including an
# optional per-department summary.
CEO_FINAL_REPORT_SCHEMA = {
    "name": "generate_final_report",
    "description": "CEO function to compile final report for the user",
    "parameters": {
        "type": "object",
        "properties": {
            "final_report": {
                "type": "object",
                "properties": {
                    "task_overview": {
                        "type": "string",
                    },
                    "overall_status": {
                        "type": "string",
                        "enum": [
                            "successful",
                            "partial",
                            "failed",
                        ],
                    },
                    # One entry per department that took part.
                    "department_summaries": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "department": {
                                    "type": "string",
                                },
                                "contribution": {
                                    "type": "string",
                                },
                                "performance": {
                                    "type": "string",
                                },
                            },
                        },
                    },
                    "final_results": {
                        "type": "string",
                    },
                    "recommendations": {
                        "type": "array",
                        "items": {
                            "type": "string",
                        },
                    },
                    "next_steps": {
                        "type": "string",
                    },
                },
                "required": [
                    "task_overview",
                    "overall_status",
                    "final_results",
                ],
            }
        },
        "required": ["final_report"],
    },
}
# # Example output schemas
# CEO_EXAMPLE_OUTPUT = {
# "thought": "This task requires coordination between the engineering and design departments to create a new feature. The engineering team will handle the backend implementation while design focuses on the user interface.",
# "plan": "1. Assign backend development to engineering department\n2. Assign UI/UX design to design department\n3. Set up regular sync meetings between departments\n4. Establish clear success criteria",
# "tasks": {
# "task_description": "Develop a new user authentication system with social login integration",
# "selected_departments": ["engineering", "design"],
# "selected_leaders": ["engineering_lead", "design_lead"],
# "success_criteria": "1. Social login working with 3 major providers\n2. UI/UX approved by design team\n3. Security audit passed\n4. Performance metrics met"
# }
# }
# DEPARTMENT_LEADER_EXAMPLE_OUTPUT = {
# "task_management": {
# "original_task": "Develop a new user authentication system with social login integration",
# "subtasks": [
# {
# "subtask_id": "ENG-001",
# "description": "Implement OAuth2 integration for Google",
# "assigned_employees": ["dev1", "dev2"],
# "estimated_duration": "3 days",
# "dependencies": ["DES-001"]
# },
# {
# "subtask_id": "ENG-002",
# "description": "Implement OAuth2 integration for Facebook",
# "assigned_employees": ["dev3"],
# "estimated_duration": "2 days",
# "dependencies": ["DES-001"]
# }
# ],
# "progress_tracking": {
# "status": "in_progress",
# "completion_percentage": 0.3,
# "blockers": ["Waiting for design team to provide UI mockups"]
# }
# }
# }
# EMPLOYEE_EXAMPLE_OUTPUT = {
# "thought": "The Google OAuth2 integration requires careful handling of token management and user data synchronization",
# "plan": "1. Set up Google OAuth2 credentials\n2. Implement token refresh mechanism\n3. Create user data sync pipeline\n4. Add error handling and logging",
# "task_execution": {
# "subtask_id": "ENG-001",
# "action_taken": "execute",
# "execution_details": {
# "status": "in_progress",
# "work_log": "Completed OAuth2 credential setup and initial token handling implementation",
# "collaboration_partners": ["dev2"],
# "delegate_to": None,
# "results": "Successfully implemented basic OAuth2 flow",
# "issues_encountered": ["Need to handle token refresh edge cases"]
# }
# }
# }
# EMPLOYEE_REPORT_EXAMPLE = {
# "task_report": {
# "subtask_id": "ENG-001",
# "completion_status": "partial",
# "work_summary": "Completed initial OAuth2 implementation, working on token refresh mechanism",
# "time_spent": "2 days",
# "challenges": ["Token refresh edge cases", "Rate limiting considerations"],
# "next_steps": "Implement token refresh mechanism and add rate limiting protection"
# }
# }
# DEPARTMENT_REPORT_EXAMPLE = {
# "department_report": {
# "department_name": "Engineering",
# "task_summary": "Making good progress on OAuth2 implementation, but waiting on design team for UI components",
# "overall_status": "on_track",
# "completion_percentage": 0.4,
# "key_achievements": [
# "Completed Google OAuth2 basic flow",
# "Set up secure token storage"
# ],
# "blockers": ["Waiting for UI mockups from design team"],
# "resource_needs": ["Additional QA resources for testing"],
# "next_milestones": [
# "Complete Facebook OAuth2 integration",
# "Implement token refresh mechanism"
# ]
# }
# }
# CEO_FINAL_REPORT_EXAMPLE = {
# "final_report": {
# "task_overview": "Successfully implemented new authentication system with social login capabilities",
# "overall_status": "successful",
# "department_summaries": [
# {
# "department": "Engineering",
# "contribution": "Implemented secure OAuth2 integrations and token management",
# "performance": "Excellent - completed all technical requirements"
# },
# {
# "department": "Design",
# "contribution": "Created intuitive UI/UX for authentication flows",
# "performance": "Good - delivered all required designs on time"
# }
# ],
# "final_results": "New authentication system is live and processing 1000+ logins per day",
# "recommendations": [
# "Add more social login providers",
# "Implement biometric authentication",
# "Add two-factor authentication"
# ],
# "next_steps": "Monitor system performance and gather user feedback for improvements"
# }
# }
class AutoCorp:
    """A company of agents in which a CEO delegates tasks to department leaders.

    On construction the company validates its configuration, records every
    department in a shared conversation, attaches the function schemas to the
    CEO and the department leaders, and extends the system prompts of leaders
    and employees with their department's context.

    Args:
        name: Name of the company.
        description: Short description of the company.
        departments: Departments that make up the company. At least one is
            required.
        ceo: The agent acting as CEO. Required.

    Raises:
        ValueError: If no CEO or no departments are given, or if the leader
            of a department cannot be resolved.
    """

    def __init__(
        self,
        name: str = "AutoCorp",
        description: str = "A company that uses agents to automate tasks",
        departments: List[Department] = None,
        ceo: Agent = None,
    ):
        self.name = name
        self.description = description
        # A None default (instead of a literal []) keeps instances from
        # sharing one mutable default list.
        self.departments = departments if departments is not None else []
        self.ceo = ceo
        self.conversation = Conversation()

        # Check that the CEO and departments are set
        self.reliability_check()

        # Add departments to conversation
        self.add_departments_to_conversation()

        # Initialize the CEO agent
        self.initialize_ceo_agent()

        # Initialize the department leaders
        self.setup_department_leaders()

        # Initialize the department employees
        self.department_employees_initialize()

    @staticmethod
    def _agent_names(agent):
        """Return the non-empty name-like attributes found on *agent*."""
        candidates = (
            getattr(agent, attribute, None)
            for attribute in ("name", "agent_name", "__name__")
        )
        return [candidate for candidate in candidates if candidate]

    def _resolve_leader(self, department):
        """Return the agent that leads *department*.

        An explicit ``department.leader`` wins when the department object
        provides one; otherwise the leader is the employee whose name equals
        ``department.leader_name``.

        Raises:
            ValueError: If neither lookup yields a leader.
        """
        leader = getattr(department, "leader", None)
        if leader is not None:
            return leader
        for employee in department.employees:
            if department.leader_name in self._agent_names(employee):
                return employee
        raise ValueError(
            f"Department '{department.name}' has no leader: no employee "
            f"is named '{department.leader_name}'"
        )

    def _department_roster(self, department):
        """Return the employee listing produced by ``list_all_agents``."""
        # NOTE(review): the trailing False presumably keeps list_all_agents
        # from writing to the conversation itself; confirm against its
        # signature.
        return list_all_agents(
            department.employees,
            self.conversation,
            department.name,
            False,
        )

    def _department_context(self, department, include_leader=False):
        """Return the department's context rendered as a string."""
        context = {
            "name": department.name,
            "description": department.description,
            "employees": self._department_roster(department),
        }
        if include_leader:
            context["leader"] = department.leader_name
        return any_to_str(context)

    @staticmethod
    def _extend_system_prompt(agent, addition):
        """Append *addition* to the agent's system prompt (None counts as empty)."""
        current = getattr(agent, "system_prompt", None) or ""
        agent.system_prompt = current + addition

    def initialize_ceo_agent(self):
        """Give the CEO its delegation and final-report function schemas."""
        self.ceo.tools_list_dictionary = [
            CEO_SCHEMA,
            CEO_FINAL_REPORT_SCHEMA,
        ]

    def setup_department_leaders(self):
        """Extend every leader's prompt, then attach the leader schema."""
        self.department_leader_initialize()
        self.initialize_department_leaders()

    def department_leader_initialize(self):
        """Initialize each department leader with their department's context."""
        for department in self.departments:
            context_str = self._department_context(department)
            prompt = "\n".join(
                [
                    "",
                    f"You are the leader of the {department.name} department.",
                    "Department Context:",
                    f"{context_str}",
                    "Your role is to:",
                    "1. Break down tasks into subtasks",
                    "2. Assign subtasks to appropriate employees",
                    "3. Track progress and manage blockers",
                    "4. Report back to the CEO",
                    "Use the provided tools to manage your department effectively.",
                    "",
                ]
            )
            self._extend_system_prompt(
                self._resolve_leader(department), prompt
            )

    def department_employees_initialize(self):
        """Initialize each employee with their department's context.

        The leader already received a leader prompt and is skipped, as are
        employees without a ``system_prompt`` attribute (e.g. plain
        callables).
        """
        for department in self.departments:
            context_str = self._department_context(
                department, include_leader=True
            )
            # The previous text was a copy of the leader prompt; this one
            # describes the employee role from the module docstring.
            prompt = "\n".join(
                [
                    "",
                    f"You are an employee of the {department.name} department.",
                    "Department Context:",
                    f"{context_str}",
                    "Your role is to:",
                    "1. Carry out the subtasks assigned to you",
                    "2. Delegate to or collaborate with other employees when that helps",
                    "3. Track your progress and flag blockers",
                    "4. Report back to your department leader",
                    "Use the provided tools to complete your work effectively.",
                    "",
                ]
            )
            leader = self._resolve_leader(department)
            # employees is a list, so the prompt is set on each member
            # rather than on the list itself.
            for employee in department.employees:
                if employee is leader:
                    continue
                if not hasattr(employee, "system_prompt"):
                    continue
                self._extend_system_prompt(employee, prompt)

    def initialize_department_leaders(self):
        """Attach the department-leader function schema to every leader."""
        for department in self.departments:
            leader = self._resolve_leader(department)
            leader.tools_list_dictionary = [DEPARTMENT_LEADER_SCHEMA]

    def reliability_check(self):
        """Validate that a CEO and at least one department are configured.

        Raises:
            ValueError: If the CEO is missing or there are no departments.
        """
        if self.ceo is None:
            raise ValueError("CEO is not set")
        if not self.departments:
            raise ValueError("No departments are set")

    def add_departments_to_conversation(self):
        """Add one "System" message per department to the conversation."""
        messages = [
            {
                "role": "System",
                "content": f"Team: {dept.name}\nDescription: {dept.description}\nLeader: {dept.leader_name}\nAgents: {self._department_roster(dept)}",
            }
            for dept in self.departments
        ]
        self.conversation.batch_add(messages)

    # TODO: mutation helpers (add_department, add_employee_to_department,
    # add_leader_to_department, add_ceo, ...) are not implemented yet.

    def run(self, task: str):
        """Run *task* through the company.

        Currently only the CEO -> department leader step is implemented.
        """
        self.ceo_to_department_leaders(task)
        # TODO: then the department leaders to employees

    def ceo_to_department_leaders(self, task: str):
        """Let the CEO delegate *task* and run each selected department leader.

        The CEO's orders are expected to follow ``CEO_SCHEMA``. Each selected
        leader is run once with the delegated task, and its output is added
        to the conversation.

        Raises:
            ValueError: If the CEO selects a department that does not exist,
                or if a selected department has no resolvable leader.
        """
        orders = self.ceo.run(
            f"History: {self.conversation.get_str()}\n Your Current Task: {task}"
        )
        # NOTE(review): only string output is parsed; output that is already
        # structured is used as-is. Confirm what Agent.run returns when
        # tools_list_dictionary is set.
        if isinstance(orders, str):
            orders = str_to_dict(orders)

        assignment = orders["tasks"]
        selected_departments = assignment["selected_departments"]

        # self.departments is a list, so departments are looked up by name
        # rather than by indexing the list with the name.
        departments_by_name = {
            department.name: department
            for department in self.departments
        }
        unknown = [
            department_name
            for department_name in selected_departments
            if department_name not in departments_by_name
        ]
        if unknown:
            raise ValueError(
                f"CEO selected unknown departments {unknown}; "
                f"known departments: {sorted(departments_by_name)}"
            )

        # Leaders receive the task itself (the list of selected leader names
        # was passed before, which carries no instructions).
        details = [
            f"Task: {assignment.get('task_description', task)}"
        ]
        if orders.get("plan"):
            details.append(f"CEO plan: {orders['plan']}")
        if assignment.get("success_criteria"):
            details.append(
                f"Success criteria: {assignment['success_criteria']}"
            )
        leader_task = "\n".join(details)

        for department_name in selected_departments:
            department = departments_by_name[department_name]
            department_leader = self._resolve_leader(department)

            # Get the department leader to break down the task
            outputs = department_leader.run(leader_task)

            # Add the department leader's response to the conversation
            names = self._agent_names(department_leader)
            speaker = names[0] if names else department.leader_name
            self.conversation.add(
                role=f"{speaker} from {department_name}",
                content=outputs,
            )