fix system prompt, fix agent prompting swarms, new auto swarm builder, auto swarm builder docs, and more

pull/831/head
Kye Gomez 1 week ago
parent 0948fdff43
commit cc387abaf6

@ -1,6 +1,6 @@
<div align="center">
<a href="https://swarms.world">
<img src="https://github.com/kyegomez/swarms/blob/master/images/swarmslogobanner.png" style="margin: 15px; max-width: 300px" width="50%" alt="Logo">
<img src="https://github.com/kyegomez/swarms/blob/master/images/swarmslogobanner.png" style="margin: 15px; max-width: 500px" width="50%" alt="Logo">
</a>
</div>
<p align="center">
@ -139,28 +139,41 @@ Refer to our documentation for production grade implementation details.
## Install 💻
Install the following packages with copy and paste
### Using pip
```bash
$ pip3 install -U swarms swarms-memory
$ pip3 install -U swarms
```
### Using uv (Recommended)
[uv](https://github.com/astral-sh/uv) is a fast Python package installer and resolver, written in Rust.
## [Optional] Onboarding
```bash
# Install uv
$ curl -LsSf https://astral.sh/uv/install.sh | sh
Now that you have downloaded swarms with `pip3 install -U swarms`, we get access to the `CLI`. Get Onboarded with CLI Now with:
# Install swarms using uv
$ uv pip install swarms
```
### Using poetry
```bash
swarms onboarding
```
# Install poetry if you haven't already
$ curl -sSL https://install.python-poetry.org | python3 -
You can also run this command for help:
# Add swarms to your project
$ poetry add swarms
```
### From source
```bash
swarms help
```
# Clone the repository
$ git clone https://github.com/kyegomez/swarms.git
$ cd swarms
For more documentation on the CLI [CLICK HERE](https://docs.swarms.world/en/latest/swarms/cli/main/)
# Install with pip
$ pip install -e .
```
---
@ -178,7 +191,8 @@ Here are some example scripts to get you started. For more comprehensive documen
---
## `Agent` Class
The `Agent` class is a fundamental component of the Swarms framework, designed to execute tasks autonomously. It fuses llms, tools and long-term memory capabilities to create a full stack agent. The `Agent` class is highly customizable, allowing for fine-grained control over its behavior and interactions.
The `Agent` class is a customizable autonomous component of the Swarms framework that integrates LLMs, tools, and long-term memory. Its `run` method processes text tasks and optionally handles image inputs through vision-language models.
```mermaid
graph TD
@ -191,8 +205,6 @@ graph TD
G --> C
```
### `run` Method
The `run` method is the primary entry point for executing tasks with an `Agent` instance. It accepts a task string as the main input task and processes it according to the agent's configuration. And, it can also accept an `img` parameter such as `img="image_filepath.png` to process images if you have a VLM attached such as `GPT4VisionAPI`
@ -214,6 +226,7 @@ agent.run("What is the current market trend for tech stocks?")
```
### Settings and Customization
The `Agent` class offers a range of settings to tailor its behavior to specific needs. Some key settings include:
| Setting | Description | Default Value |
@ -269,6 +282,7 @@ agent.run(
-----
### Integrating RAG with Swarms for Enhanced Long-Term Memory
`Agent` equipped with quasi-infinite long term memory using RAG (Relational Agent Graph) for advanced document understanding, analysis, and retrieval capabilities.
**Mermaid Diagram for RAG Integration**
@ -1254,7 +1268,7 @@ print("Execution results:", results)
## `MixtureOfAgents`
The MixtureOfAgents architecture, inspired by together.ais paper (arXiv:2406.04692), achieves SOTA performance on AlpacaEval 2.0, MT-Bench, and FLASK, surpassing GPT-4 Omni. It processes tasks via parallel agent collaboration and sequential layering, with documentation [HERE](https://docs.swarms.world/en/latest/swarms/structs/moa/)
The MixtureOfAgents architecture, inspired by together.ai's paper (arXiv:2406.04692), achieves SOTA performance on AlpacaEval 2.0, MT-Bench, and FLASK, surpassing GPT-4 Omni. It processes tasks via parallel agent collaboration and sequential layering, with documentation [HERE](https://docs.swarms.world/en/latest/swarms/structs/moa/)
```python

@ -0,0 +1,17 @@
from swarms.structs.auto_swarm_builder import AutoSwarmBuilder
from dotenv import load_dotenv
load_dotenv()
swarm = AutoSwarmBuilder(
name="My Swarm",
description="My Swarm Description",
verbose=True,
max_loops=1,
)
result = swarm.run(
task="Build a swarm to write a research paper on the topic of AI"
)
print(result)

@ -0,0 +1,14 @@
from swarms.structs.auto_swarm_builder import AutoSwarmBuilder
example = AutoSwarmBuilder(
name="ChipDesign-Swarm",
description="A swarm of specialized AI agents collaborating on chip architecture, logic design, verification, and optimization to create novel semiconductor designs",
max_loops=1,
)
print(
example.run(
"Design a new AI accelerator chip optimized for transformer model inference. Consider the following aspects: 1) Overall chip architecture and block diagram 2) Memory hierarchy and interconnects 3) Processing elements and data flow 4) Power and thermal considerations 5) Physical layout recommendations -> "
)
)

@ -0,0 +1,643 @@
"""
CEO -> Finds department leader
Department leader -> Finds employees
Employees -> Do the work
Todo
- Create schemas that enable the ceo to find the department leader or leaders
- CEO then distributes orders to department leaders or just one leader
- Department leader then distributes orders to employees
- Employees can choose to do the work or delegate to another employee or work together
- When the employees are done, they report back to the department leader
- Department leader then reports back to the ceo
- CEO then reports back to the user
Logic
- dynamically setup conversations for each department -- Feed context to each agent in the department
- Feed context to each agent in the department
"""
from typing import Callable, List, Union
from pydantic import BaseModel, Field
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.structs.ma_utils import list_all_agents
from swarms.utils.str_to_dict import str_to_dict
from swarms.utils.any_to_str import any_to_str
class Department(BaseModel):
name: str = Field(description="The name of the department")
description: str = Field(
description="A description of the department"
)
employees: List[Union[Agent, Callable]] = Field(
description="A list of employees in the department"
)
leader_name: str = Field(
description="The name of the leader of the department"
)
class Config:
arbitrary_types_allowed = True
CEO_SCHEMA = {
"name": "delegate_task_to_department",
"description": "CEO function to analyze and delegate tasks to appropriate department leaders",
"parameters": {
"type": "object",
"properties": {
"thought": {
"type": "string",
"description": "Reasoning about the task, its requirements, and potential approaches",
},
"plan": {
"type": "string",
"description": "Structured plan for how to accomplish the task across departments",
},
"tasks": {
"type": "object",
"properties": {
"task_description": {"type": "string"},
"selected_departments": {
"type": "array",
"items": {"type": "string"},
"description": "List of department names that should handle this task",
},
"selected_leaders": {
"type": "array",
"items": {"type": "string"},
"description": "List of department leaders to assign the task to",
},
"success_criteria": {"type": "string"},
},
"required": [
"task_description",
"selected_departments",
"selected_leaders",
],
},
},
"required": ["thought", "plan", "tasks"],
},
}
DEPARTMENT_LEADER_SCHEMA = {
"name": "manage_department_task",
"description": "Department leader function to break down and assign tasks to employees",
"parameters": {
"type": "object",
"properties": {
"task_management": {
"type": "object",
"properties": {
"original_task": {"type": "string"},
"subtasks": {
"type": "array",
"items": {
"type": "object",
"properties": {
"subtask_id": {"type": "string"},
"description": {"type": "string"},
"assigned_employees": {
"type": "array",
"items": {"type": "string"},
},
"estimated_duration": {
"type": "string"
},
"dependencies": {
"type": "array",
"items": {"type": "string"},
},
},
},
},
"progress_tracking": {
"type": "object",
"properties": {
"status": {
"type": "string",
"enum": [
"not_started",
"in_progress",
"completed",
],
},
"completion_percentage": {
"type": "number"
},
"blockers": {
"type": "array",
"items": {"type": "string"},
},
},
},
},
"required": ["original_task", "subtasks"],
}
},
"required": ["task_management"],
},
}
EMPLOYEE_SCHEMA = {
"name": "handle_assigned_task",
"description": "Employee function to process and execute assigned tasks",
"parameters": {
"type": "object",
"properties": {
"thought": {
"type": "string",
"description": "Reasoning about the task, its requirements, and potential approaches",
},
"plan": {
"type": "string",
"description": "Structured plan for how to accomplish the task across departments",
},
"task_execution": {
"type": "object",
"properties": {
"subtask_id": {"type": "string"},
"action_taken": {
"type": "string",
"enum": [
"execute",
"delegate",
"collaborate",
],
},
"execution_details": {
"type": "object",
"properties": {
"status": {
"type": "string",
"enum": [
"in_progress",
"completed",
"blocked",
],
},
"work_log": {"type": "string"},
"collaboration_partners": {
"type": "array",
"items": {"type": "string"},
},
"delegate_to": {"type": "string"},
"results": {"type": "string"},
"issues_encountered": {
"type": "array",
"items": {"type": "string"},
},
},
},
},
"required": [
"thought",
"plan",
"subtask_id",
"action_taken",
"execution_details",
],
},
},
"required": ["task_execution"],
},
}
# Status report schemas for the feedback loop
EMPLOYEE_REPORT_SCHEMA = {
"name": "submit_task_report",
"description": "Employee function to report task completion status to department leader",
"parameters": {
"type": "object",
"properties": {
"task_report": {
"type": "object",
"properties": {
"subtask_id": {"type": "string"},
"completion_status": {
"type": "string",
"enum": ["completed", "partial", "blocked"],
},
"work_summary": {"type": "string"},
"time_spent": {"type": "string"},
"challenges": {
"type": "array",
"items": {"type": "string"},
},
"next_steps": {"type": "string"},
},
"required": [
"subtask_id",
"completion_status",
"work_summary",
],
}
},
"required": ["task_report"],
},
}
DEPARTMENT_REPORT_SCHEMA = {
"name": "submit_department_report",
"description": "Department leader function to report department progress to CEO",
"parameters": {
"type": "object",
"properties": {
"department_report": {
"type": "object",
"properties": {
"department_name": {"type": "string"},
"task_summary": {"type": "string"},
"overall_status": {
"type": "string",
"enum": ["on_track", "at_risk", "completed"],
},
"completion_percentage": {"type": "number"},
"key_achievements": {
"type": "array",
"items": {"type": "string"},
},
"blockers": {
"type": "array",
"items": {"type": "string"},
},
"resource_needs": {
"type": "array",
"items": {"type": "string"},
},
"next_milestones": {
"type": "array",
"items": {"type": "string"},
},
},
"required": [
"department_name",
"task_summary",
"overall_status",
],
}
},
"required": ["department_report"],
},
}
CEO_FINAL_REPORT_SCHEMA = {
"name": "generate_final_report",
"description": "CEO function to compile final report for the user",
"parameters": {
"type": "object",
"properties": {
"final_report": {
"type": "object",
"properties": {
"task_overview": {"type": "string"},
"overall_status": {
"type": "string",
"enum": ["successful", "partial", "failed"],
},
"department_summaries": {
"type": "array",
"items": {
"type": "object",
"properties": {
"department": {"type": "string"},
"contribution": {"type": "string"},
"performance": {"type": "string"},
},
},
},
"final_results": {"type": "string"},
"recommendations": {
"type": "array",
"items": {"type": "string"},
},
"next_steps": {"type": "string"},
},
"required": [
"task_overview",
"overall_status",
"final_results",
],
}
},
"required": ["final_report"],
},
}
# # Example output schemas
# CEO_EXAMPLE_OUTPUT = {
# "thought": "This task requires coordination between the engineering and design departments to create a new feature. The engineering team will handle the backend implementation while design focuses on the user interface.",
# "plan": "1. Assign backend development to engineering department\n2. Assign UI/UX design to design department\n3. Set up regular sync meetings between departments\n4. Establish clear success criteria",
# "tasks": {
# "task_description": "Develop a new user authentication system with social login integration",
# "selected_departments": ["engineering", "design"],
# "selected_leaders": ["engineering_lead", "design_lead"],
# "success_criteria": "1. Social login working with 3 major providers\n2. UI/UX approved by design team\n3. Security audit passed\n4. Performance metrics met"
# }
# }
# DEPARTMENT_LEADER_EXAMPLE_OUTPUT = {
# "task_management": {
# "original_task": "Develop a new user authentication system with social login integration",
# "subtasks": [
# {
# "subtask_id": "ENG-001",
# "description": "Implement OAuth2 integration for Google",
# "assigned_employees": ["dev1", "dev2"],
# "estimated_duration": "3 days",
# "dependencies": ["DES-001"]
# },
# {
# "subtask_id": "ENG-002",
# "description": "Implement OAuth2 integration for Facebook",
# "assigned_employees": ["dev3"],
# "estimated_duration": "2 days",
# "dependencies": ["DES-001"]
# }
# ],
# "progress_tracking": {
# "status": "in_progress",
# "completion_percentage": 0.3,
# "blockers": ["Waiting for design team to provide UI mockups"]
# }
# }
# }
# EMPLOYEE_EXAMPLE_OUTPUT = {
# "thought": "The Google OAuth2 integration requires careful handling of token management and user data synchronization",
# "plan": "1. Set up Google OAuth2 credentials\n2. Implement token refresh mechanism\n3. Create user data sync pipeline\n4. Add error handling and logging",
# "task_execution": {
# "subtask_id": "ENG-001",
# "action_taken": "execute",
# "execution_details": {
# "status": "in_progress",
# "work_log": "Completed OAuth2 credential setup and initial token handling implementation",
# "collaboration_partners": ["dev2"],
# "delegate_to": None,
# "results": "Successfully implemented basic OAuth2 flow",
# "issues_encountered": ["Need to handle token refresh edge cases"]
# }
# }
# }
# EMPLOYEE_REPORT_EXAMPLE = {
# "task_report": {
# "subtask_id": "ENG-001",
# "completion_status": "partial",
# "work_summary": "Completed initial OAuth2 implementation, working on token refresh mechanism",
# "time_spent": "2 days",
# "challenges": ["Token refresh edge cases", "Rate limiting considerations"],
# "next_steps": "Implement token refresh mechanism and add rate limiting protection"
# }
# }
# DEPARTMENT_REPORT_EXAMPLE = {
# "department_report": {
# "department_name": "Engineering",
# "task_summary": "Making good progress on OAuth2 implementation, but waiting on design team for UI components",
# "overall_status": "on_track",
# "completion_percentage": 0.4,
# "key_achievements": [
# "Completed Google OAuth2 basic flow",
# "Set up secure token storage"
# ],
# "blockers": ["Waiting for UI mockups from design team"],
# "resource_needs": ["Additional QA resources for testing"],
# "next_milestones": [
# "Complete Facebook OAuth2 integration",
# "Implement token refresh mechanism"
# ]
# }
# }
# CEO_FINAL_REPORT_EXAMPLE = {
# "final_report": {
# "task_overview": "Successfully implemented new authentication system with social login capabilities",
# "overall_status": "successful",
# "department_summaries": [
# {
# "department": "Engineering",
# "contribution": "Implemented secure OAuth2 integrations and token management",
# "performance": "Excellent - completed all technical requirements"
# },
# {
# "department": "Design",
# "contribution": "Created intuitive UI/UX for authentication flows",
# "performance": "Good - delivered all required designs on time"
# }
# ],
# "final_results": "New authentication system is live and processing 1000+ logins per day",
# "recommendations": [
# "Add more social login providers",
# "Implement biometric authentication",
# "Add two-factor authentication"
# ],
# "next_steps": "Monitor system performance and gather user feedback for improvements"
# }
# }
class AutoCorp:
def __init__(
self,
name: str = "AutoCorp",
description: str = "A company that uses agents to automate tasks",
departments: List[Department] = [],
ceo: Agent = None,
):
self.name = name
self.description = description
self.departments = departments
self.ceo = ceo
self.conversation = Conversation()
# Check if the CEO and departments are set
self.reliability_check()
# Add departments to conversation
self.add_departments_to_conversation()
# Initialize the CEO agent
self.initialize_ceo_agent()
# Initialize the department leaders
self.setup_department_leaders()
# Initialize the department employees
self.department_employees_initialize()
def initialize_ceo_agent(self):
self.ceo.tools_list_dictionary = [
CEO_SCHEMA,
CEO_FINAL_REPORT_SCHEMA,
]
def setup_department_leaders(self):
self.department_leader_initialize()
self.initialize_department_leaders()
def department_leader_initialize(self):
"""Initialize each department leader with their department's context."""
for department in self.departments:
# Create a context dictionary for the department
department_context = {
"name": department.name,
"description": department.description,
"employees": list_all_agents(
department.employees,
self.conversation,
department.name,
False,
),
}
# Convert the context to a string
context_str = any_to_str(department_context)
# TODO: Add the department leader's tools and context
department.leader.system_prompt += f"""
You are the leader of the {department.name} department.
Department Context:
{context_str}
Your role is to:
1. Break down tasks into subtasks
2. Assign subtasks to appropriate employees
3. Track progress and manage blockers
4. Report back to the CEO
Use the provided tools to manage your department effectively.
"""
def department_employees_initialize(self):
"""Initialize each department leader with their department's context."""
for department in self.departments:
# Create a context dictionary for the department
department_context = {
"name": department.name,
"description": department.description,
"employees": list_all_agents(
department.employees,
self.conversation,
department.name,
False,
),
"leader": department.leader_name,
}
print(department_context)
# Convert the context to a string
context_str = any_to_str(department_context)
# Set the department leader's tools and context
department.employees.system_prompt += f"""
You are an employee of the {department.name} department.
Department Context:
{context_str}
Your role is to:
1. Break down tasks into subtasks
2. Assign subtasks to appropriate employees
3. Track progress and manage blockers
4. Report back to the CEO
Use the provided tools to manage your department effectively.
"""
def initialize_department_leaders(self):
# Use list comprehension for faster initialization
[
setattr(
dept.leader,
"tools_list_dictionary",
[DEPARTMENT_LEADER_SCHEMA],
)
for dept in self.departments
]
def reliability_check(self):
if self.ceo is None:
raise ValueError("CEO is not set")
if self.departments is None:
raise ValueError("No departments are set")
if len(self.departments) == 0:
raise ValueError("No departments are set")
def add_departments_to_conversation(self):
# Batch process departments using list comprehension
messages = [
{
"role": "System",
"content": f"Team: {dept.name}\nDescription: {dept.description}\nLeader: {dept.leader_name}\nAgents: {list_all_agents(dept.employees, self.conversation, dept.name, False)}",
}
for dept in self.departments
]
self.conversation.batch_add(messages)
# def add_department(self, department: Department):
# self.departments.append(department)
# def add_employee(self, employee: Union[Agent, Callable]):
# self.departments[-1].employees.append(employee)
# def add_ceo(self, ceo: Agent):
# self.ceo = ceo
# def add_employee_to_department(
# self, employee: Union[Agent, Callable], department: Department
# ):
# department.employees.append(employee)
# def add_leader_to_department(
# self, leader: Agent, department: Department
# ):
# department.leader = leader
# def add_department_to_auto_corp(self, department: Department):
# self.departments.append(department)
# def add_ceo_to_auto_corp(self, ceo: Agent):
# self.ceo = ceo
# def add_employee_to_ceo(self, employee: Union[Agent, Callable]):
# self.ceo.employees.append(employee)
def run(self, task: str):
self.ceo_to_department_leaders(task)
# Then the department leaders to employees
def ceo_to_department_leaders(self, task: str):
orders = self.ceo.run(
f"History: {self.conversation.get_str()}\n Your Current Task: {task}"
)
orders = str_to_dict(orders)
for department in orders["tasks"]["selected_departments"]:
department_leader = self.departments[department].leader
# Get the department leader to break down the task
outputs = department_leader.run(
orders["tasks"]["selected_leaders"]
)
# Add the department leader's response to the conversation
self.conversation.add(
role=f"{department_leader.name} from {department}",
content=outputs,
)

@ -16,8 +16,8 @@ Welcome to the Swarms ecosystem. Click any tile below to explore our products, c
display: block;
padding: 1.2rem;
border-radius: 12px;
background: #1e1e2f;
color: white;
background: #000000;
color: #ff0000;
text-decoration: none;
text-align: center;
font-weight: 600;
@ -27,7 +27,7 @@ Welcome to the Swarms ecosystem. Click any tile below to explore our products, c
.resource-card:hover {
transform: translateY(-4px);
background: #2a2a3d;
background: #1a0000;
}
</style>

@ -245,6 +245,7 @@ nav:
- Various Execution Methods: "swarms/structs/various_execution_methods.md"
- Hybrid Hierarchical-Cluster Swarm: "swarms/structs/hhcs.md"
- Deep Research Swarm: "swarms/structs/deep_research_swarm.md"
- Auto Swarm Builder: "swarms/structs/auto_swarm_builder.md"
- Workflows:
- ConcurrentWorkflow: "swarms/structs/concurrentworkflow.md"
- SequentialWorkflow: "swarms/structs/sequential_workflow.md"
@ -258,7 +259,6 @@ nav:
- Finance: "swarms_tools/finance.md"
- Search: "swarms_tools/search.md"
- Social Media:
- Overview: "swarms_tools/social_media.md"
- Twitter: "swarms_tools/twitter.md"
- Swarms Memory:

@ -0,0 +1,179 @@
# AutoSwarmBuilder Documentation
The `AutoSwarmBuilder` is a powerful class that automatically builds and manages swarms of AI agents to accomplish complex tasks. It uses a boss agent to delegate work and create specialized agents as needed.
## Overview
The AutoSwarmBuilder is designed to:
- Automatically create and coordinate multiple AI agents
- Delegate tasks to specialized agents
- Manage communication between agents
- Handle complex workflows through a swarm router
## Parameters
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| name | str | None | The name of the swarm |
| description | str | None | A description of the swarm's purpose |
| verbose | bool | True | Whether to output detailed logs |
| max_loops | int | 1 | Maximum number of execution loops |
| random_models | bool | True | Whether to use random models for agents |
## Core Methods
### run(task: str, *args, **kwargs)
Executes the swarm on a given task.
**Parameters:**
- `task` (str): The task to execute
- `*args`: Additional positional arguments
- `**kwargs`: Additional keyword arguments
**Returns:**
- The result of the swarm execution
### create_agents(task: str)
Creates specialized agents for a given task.
**Parameters:**
- `task` (str): The task to create agents for
**Returns:**
- List[Agent]: List of created agents
### build_agent(agent_name: str, agent_description: str, agent_system_prompt: str)
Builds a single agent with specified parameters.
**Parameters:**
- `agent_name` (str): Name of the agent
- `agent_description` (str): Description of the agent
- `agent_system_prompt` (str): System prompt for the agent
**Returns:**
- Agent: The constructed agent
### batch_run(tasks: List[str])
Executes the swarm on multiple tasks.
**Parameters:**
- `tasks` (List[str]): List of tasks to execute
**Returns:**
- List[Any]: Results from each task execution
## Examples
### Example 1: Content Creation Swarm
```python
from swarms.structs.auto_swarm_builder import AutoSwarmBuilder
# Initialize the swarm builder
swarm = AutoSwarmBuilder(
name="Content Creation Swarm",
description="A swarm specialized in creating high-quality content"
)
# Run the swarm on a content creation task
result = swarm.run(
"Create a comprehensive blog post about artificial intelligence in healthcare, "
"including current applications, future trends, and ethical considerations."
)
```
### Example 2: Data Analysis Swarm
```python
from swarms.structs.auto_swarm_builder import AutoSwarmBuilder
# Initialize the swarm builder
swarm = AutoSwarmBuilder(
name="Data Analysis Swarm",
description="A swarm specialized in data analysis and visualization"
)
# Run the swarm on a data analysis task
result = swarm.run(
"Analyze the provided sales data and create a detailed report with visualizations "
"showing trends, patterns, and recommendations for improvement."
)
```
### Example 3: Batch Processing Multiple Tasks
```python
from swarms.structs.auto_swarm_builder import AutoSwarmBuilder
# Initialize the swarm builder
swarm = AutoSwarmBuilder(
name="Multi-Task Swarm",
description="A swarm capable of handling multiple diverse tasks"
)
# Define multiple tasks
tasks = [
"Create a marketing strategy for a new product launch",
"Analyze customer feedback and generate improvement suggestions",
"Develop a project timeline for the next quarter"
]
# Run the swarm on all tasks
results = swarm.batch_run(tasks)
```
## Best Practices
!!! tip "Task Definition"
- Provide clear, specific task descriptions
- Include any relevant context or constraints
- Specify expected output format if needed
!!! note "Configuration"
- Set appropriate `max_loops` based on task complexity
- Use `verbose=True` during development for debugging
- Consider using `random_models=True` for diverse agent capabilities
!!! warning "Error Handling"
- The class includes comprehensive error handling
- All methods include try-catch blocks with detailed logging
- Errors are propagated with full stack traces for debugging
## Notes
!!! info "Architecture"
- The AutoSwarmBuilder uses a sophisticated boss agent system to coordinate tasks
- Agents are created dynamically based on task requirements
- The system includes built-in logging and error handling
- Results are returned in a structured format for easy processing

@ -25,7 +25,6 @@ The `Conversation` class is a powerful tool for managing and structuring convers
- `save_as_json(self, filename: str)`
- `load_from_json(self, filename: str)`
- `search_keyword_in_conversation(self, keyword: str)`
- `pretty_print_conversation(self, messages)`
---
@ -130,12 +129,6 @@ The `Conversation` class is designed to manage conversations by keeping track of
- `keyword (str)`: The keyword to search for.
- **Returns**: A list of messages containing the keyword.
#### `pretty_print_conversation(self, messages)`
- **Description**: Pretty prints a list of messages with colored role indicators.
- **Parameters**:
- `messages (list)`: A list of messages to print.
## Examples
Here are some usage examples of the `Conversation` class:
@ -248,13 +241,7 @@ You can search for messages containing a specific keyword within the conversatio
results = conv.search_keyword_in_conversation("Hello")
```
### Pretty Printing
The `pretty_print_conversation` method provides a visually appealing way to display messages with colored role indicators:
```python
conv.pretty_print_conversation(conv.conversation_history)
```
These examples demonstrate the versatility of the `Conversation` class in managing and interacting with conversation data. Whether you're building a chatbot, conducting analysis, or simply organizing dialogues, this class offers a robust set of tools to help you accomplish your goals.

@ -1,196 +0,0 @@
# Swarms Telemetry API Documentation
This documentation covers the API for handling telemetry data. The API is implemented using Next.js, Supabase for data storage, and Zod for request validation. The handler processes incoming telemetry data, validates it, and stores it in a Supabase database. The handler also includes robust error handling and retries for database insertions to ensure data reliability.
## Endpoint
- **URL:** `/api/telemetry`
- **Method:** `POST`
- **Content-Type:** `application/json`
- **Description:** Receives telemetry data and stores it in the Supabase database.
## Request Schema
The API expects a JSON object in the request body that matches the following schema, validated using Zod:
| Field Name | Type | Required | Description |
|---------------------|----------|----------|-----------------------------------------------------------|
| `data` | `any` | No | Telemetry data payload. |
| `swarms_api_key` | `string` | No | API key associated with the swarms framework. |
| `status` | `string` | No | Status of the telemetry data. Default is `'received'`. |
| `processing_time` | `string` | No | Time taken to process the telemetry data. |
## Response
### Success Response
- **Status Code:** `200 OK`
- **Content-Type:** `application/json`
- **Body:**
```json
{
"message": "Telemetry data received and stored successfully"
}
```
### Error Responses
- **Status Code:** `400 Bad Request`
- **Content-Type:** `application/json`
- **Body:**
```json
{
"error": "Invalid data format",
"details": [
// Zod validation error details
]
}
```
- **Status Code:** `405 Method Not Allowed`
- **Content-Type:** `application/json`
- **Body:**
```json
{
"error": "Method Not Allowed"
}
```
- **Status Code:** `500 Internal Server Error`
- **Content-Type:** `application/json`
- **Body:**
```json
{
"error": "Internal Server Error",
"details": "Error message"
}
```
## Example Usage
### Python (Using `requests` Library)
```python
import requests
url = "https://swarms.world/api/telemetry"
headers = {
"Content-Type": "application/json"
}
data = {
"data": {"example_key": "example_value"},
"swarms_api_key": "your_swarms_api_key",
"status": "received",
"processing_time": "123ms"
}
response = requests.post(url, json=data, headers=headers)
print(response.status_code)
print(response.json())
```
### Node.js (Using `axios` Library)
```javascript
const axios = require('axios');
const url = 'https://swarms.world/api/telemetry';
const data = {
data: { example_key: 'example_value' },
swarms_api_key: 'your_swarms_api_key',
status: 'received',
processing_time: '123ms'
};
axios.post(url, data)
.then(response => {
console.log(response.status);
console.log(response.data);
})
.catch(error => {
console.error(error.response.status);
console.error(error.response.data);
});
```
### Go (Using `net/http` and `encoding/json`)
```go
package main
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
)
func main() {
url := "https://swarms.world/api/telemetry"
data := map[string]interface{}{
"data": map[string]interface{}{"example_key": "example_value"},
"swarms_api_key": "your_swarms_api_key",
"status": "received",
"processing_time": "123ms",
}
jsonData, err := json.Marshal(data)
if err != nil {
fmt.Println("Error marshaling JSON:", err)
return
}
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
fmt.Println("Error creating request:", err)
return
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
fmt.Println("Error making request:", err)
return
}
defer resp.Body.Close()
fmt.Println("Response status:", resp.Status)
}
```
### cURL Command
```bash
curl -X POST https://swarms.world/api/telemetry \
-H "Content-Type: application/json" \
-d '{
"data": {"example_key": "example_value"},
"swarms_api_key": "your_swarms_api_key",
"status": "received",
"processing_time": "123ms"
}'
```
### Supabase Table Structure
The Supabase table (presumably `swarms_framework_schema`) should have the following columns:
- **`data`**: JSONB or TEXT - Stores the telemetry data payload.
- **`swarms_api_key`**: TEXT - Stores the API key associated with the data.
- **`source_ip`**: TEXT - Stores the IP address of the request source.
- **`status`**: TEXT - Stores the status of the data processing.
- **`processing_time`**: TEXT - Stores the time taken to process the telemetry data.
## References and Further Reading
- [Next.js API Routes Documentation](https://nextjs.org/docs/api-routes/introduction)
- [Supabase JavaScript Client](https://supabase.com/docs/reference/javascript/supabase-client)
- [Zod Schema Validation](https://zod.dev/)
- [OpenAPI Specification](https://swagger.io/specification/)
This documentation is designed to be thorough and provide all the necessary details for developers to effectively use and integrate with the telemetry API.

@ -12,6 +12,9 @@ agent = Agent(
model_name="gpt-4o-mini",
dynamic_temperature_enabled=True,
interactive=False,
output_type="dict",
)
agent.run("Conduct an analysis of the best real undervalued ETFs")
print(
agent.run("Conduct an analysis of the best real undervalued ETFs")
)

@ -1,68 +0,0 @@
import os
from swarms import Agent
from swarm_models import OpenAIChat
from swarms.structs.agents_available import showcase_available_agents
# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
)
# Initialize the Claims Director agent
director_agent = Agent(
agent_name="ClaimsDirector",
agent_description="Oversees and coordinates the medical insurance claims processing workflow",
system_prompt="""You are the Claims Director responsible for managing the medical insurance claims process.
Assign and prioritize tasks between claims processors and auditors. Ensure claims are handled efficiently
and accurately while maintaining compliance with insurance policies and regulations.""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="director_agent.json",
)
# Initialize Claims Processor agent
processor_agent = Agent(
agent_name="ClaimsProcessor",
agent_description="Reviews and processes medical insurance claims, verifying coverage and eligibility",
system_prompt="""Review medical insurance claims for completeness and accuracy. Verify patient eligibility,
coverage details, and process claims according to policy guidelines. Flag any claims requiring special review.""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="processor_agent.json",
)
# Initialize Claims Auditor agent
auditor_agent = Agent(
agent_name="ClaimsAuditor",
agent_description="Audits processed claims for accuracy and compliance with policies and regulations",
system_prompt="""Audit processed insurance claims for accuracy and compliance. Review claim decisions,
identify potential fraud or errors, and ensure all processing follows established guidelines and regulations.""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="auditor_agent.json",
)
# Create a list of agents
agents = [director_agent, processor_agent, auditor_agent]
print(showcase_available_agents(agents=agents))

@ -3,7 +3,9 @@ import time
import psutil
from swarms.structs.agent import Agent
from swarms.prompts.finance_agent_sys_prompt import FINANCIAL_AGENT_SYS_PROMPT
from swarms.prompts.finance_agent_sys_prompt import (
FINANCIAL_AGENT_SYS_PROMPT,
)
# Initialize the agent (no external imports or env lookups needed here)
agent = Agent(
@ -24,6 +26,7 @@ agent = Agent(
model_name="gpt-4o-mini",
)
# Helper decorator to measure time and memory usage
def measure_time_and_memory(func):
def wrapper(*args, **kwargs):
@ -31,23 +34,29 @@ def measure_time_and_memory(func):
result = func(*args, **kwargs)
elapsed = time.time() - start
mem_mb = psutil.Process().memory_info().rss / 1024**2
print(f"[{func.__name__}] Time: {elapsed:.2f}s | Memory: {mem_mb:.2f} MB")
print(
f"[{func.__name__}] Time: {elapsed:.2f}s | Memory: {mem_mb:.2f} MB"
)
return result
return wrapper
# Async wrapper using asyncio.to_thread for the blocking call
@measure_time_and_memory
async def run_agent_async():
return await asyncio.to_thread(
agent.run,
"How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria?"
"How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria?",
)
# Threaded wrapper simply runs the async version in the event loop again
@measure_time_and_memory
def run_agent_in_thread():
asyncio.run(run_agent_async())
if __name__ == "__main__":
# 1) Run asynchronously
asyncio.run(run_agent_async())

@ -0,0 +1,114 @@
import os
from dotenv import load_dotenv
# Swarm imports
from swarms.structs.agent import Agent
from swarms.structs.hiearchical_swarm import (
HierarchicalSwarm,
SwarmSpec,
OrganizationalUnit,
)
from swarms.utils.function_caller_model import OpenAIFunctionCaller
# Load environment variables
load_dotenv()
# Create the agents first
research_manager = Agent(
agent_name="Research Manager",
agent_description="Manages research operations and coordinates research tasks",
system_prompt="You are a research manager responsible for overseeing research projects and coordinating research efforts.",
model_name="gpt-4o",
)
data_analyst = Agent(
agent_name="Data Analyst",
agent_description="Analyzes data and generates insights",
system_prompt="You are a data analyst specializing in processing and analyzing data to extract meaningful insights.",
model_name="gpt-4o",
)
research_assistant = Agent(
agent_name="Research Assistant",
agent_description="Assists with research tasks and data collection",
system_prompt="You are a research assistant who helps gather information and support research activities.",
model_name="gpt-4o",
)
development_manager = Agent(
agent_name="Development Manager",
agent_description="Manages development projects and coordinates development tasks",
system_prompt="You are a development manager responsible for overseeing software development projects and coordinating development efforts.",
model_name="gpt-4o",
)
software_engineer = Agent(
agent_name="Software Engineer",
agent_description="Develops and implements software solutions",
system_prompt="You are a software engineer specializing in building and implementing software solutions.",
model_name="gpt-4o",
)
qa_engineer = Agent(
agent_name="QA Engineer",
agent_description="Tests and ensures quality of software",
system_prompt="You are a QA engineer responsible for testing software and ensuring its quality.",
model_name="gpt-4o",
)
# Create organizational units with the agents
research_unit = OrganizationalUnit(
name="Research Unit",
description="Handles research and analysis tasks",
manager=research_manager,
members=[data_analyst, research_assistant],
)
development_unit = OrganizationalUnit(
name="Development Unit",
description="Handles development and implementation tasks",
manager=development_manager,
members=[software_engineer, qa_engineer],
)
# Initialize the director agent
director = OpenAIFunctionCaller(
model_name="gpt-4o",
system_prompt=(
"As the Director of this Hierarchical Agent Swarm, you are responsible for:\n"
"1. Analyzing tasks and breaking them down into subtasks\n"
"2. Assigning tasks to appropriate organizational units\n"
"3. Coordinating communication between units\n"
"4. Ensuring tasks are completed efficiently and effectively\n"
"5. Providing feedback and guidance to units as needed\n\n"
"Your decisions should be based on the capabilities of each unit and the requirements of the task."
),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.5,
base_model=SwarmSpec,
max_tokens=10000,
)
# Initialize the hierarchical swarm with the organizational units
swarm = HierarchicalSwarm(
name="Example Hierarchical Swarm",
description="A hierarchical swarm demonstrating multi-unit collaboration",
director=director,
organizational_units=[research_unit, development_unit],
max_loops=2, # Allow for feedback and iteration
output_type="dict",
)
# Example task to run through the swarm
task = """
Develop a comprehensive market analysis for a new AI-powered productivity tool.
The analysis should include:
1. Market research and competitor analysis
2. User needs and pain points
3. Technical feasibility assessment
4. Implementation recommendations
"""
# Run the task through the swarm
result = swarm.run(task)
print("Swarm Results:", result)

@ -0,0 +1,411 @@
from functools import lru_cache
from io import BytesIO
import os
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional, Union
import PyPDF2
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.structs.ma_utils import set_random_models_for_agents
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
from swarms.utils.generate_keys import generate_api_key
# System prompts for each agent
CLARA_SYS_PROMPT = """
You are Clara, a meticulous and client-focused Criteria Agent specialized in understanding and validating contract requirements for a legal automation system. Your purpose is to ensure all contract criteria are clear, complete, and actionable.
KEY RESPONSIBILITIES:
- Extract and interpret contract requirements from client documents, text, or instructions.
- Validate criteria for completeness, consistency, and legal feasibility.
- Identify ambiguities, missing details, or potential risks in the requirements.
- Produce a clear, structured summary of the criteria for downstream use.
APPROACH:
- Begin with a professional introduction explaining your role in ensuring contract clarity.
- Ask targeted questions to resolve ambiguities or fill gaps in the criteria.
- Summarize and confirm requirements to ensure accuracy.
- Flag any criteria that may lead to legal or practical issues.
- Maintain strict confidentiality and data security.
OUTPUT FORMAT:
Provide a plain text summary with:
1. Validated contract criteria (e.g., parties, purpose, terms, jurisdiction).
2. Notes on any ambiguities or missing information.
3. Recommendations for clarifying or refining criteria.
"""
MASON_SYS_PROMPT = """
You are Mason, a precise and creative Contract Drafting Agent specialized in crafting exceptional, extensive legal contracts for a legal automation system. Your expertise lies in producing long, comprehensive, enforceable, and tailored contract documents that cover all possible contingencies and details.
KEY RESPONSIBILITIES:
- Draft detailed, lengthy contracts based on validated criteria provided.
- Ensure contracts are legally sound, exhaustive, and client-specific, addressing all relevant aspects thoroughly.
- Use precise language while maintaining accessibility for non-legal readers.
- Incorporate feedback from evaluations to refine and enhance drafts.
- Include extensive clauses to cover all potential scenarios, risks, and obligations.
APPROACH:
- Structure contracts with clear, detailed sections and consistent formatting.
- Include all essential elements (parties, purpose, terms, signatures, etc.) with comprehensive elaboration.
- Tailor clauses to address specific client needs, jurisdictional requirements, and potential future disputes.
- Provide in-depth explanations of terms, conditions, and contingencies.
- Highlight areas requiring further review or customization.
- Output the contract as a plain text string, avoiding markdown.
OUTPUT FORMAT:
Provide a plain text contract with:
1. Identification of parties and effective date.
2. Detailed statement of purpose and scope.
3. Exhaustive terms and conditions covering all possible scenarios.
4. Comprehensive rights and obligations of each party.
5. Detailed termination and amendment procedures.
6. Signature blocks.
7. Annotations for areas needing review (in comments).
"""
SOPHIA_SYS_PROMPT = """
You are Sophia, a rigorous and insightful Contract Evaluation Agent specialized in reviewing and improving legal contracts for a legal automation system. Your role is to evaluate contracts for quality, compliance, and clarity, providing actionable feedback to enhance the final document.
KEY RESPONSIBILITIES:
- Review draft contracts for legal risks, clarity, and enforceability.
- Identify compliance issues with relevant laws and regulations.
- Assess whether the contract meets the provided criteria and client needs.
- Provide specific, actionable feedback for revisions.
- Recommend areas requiring human attorney review.
APPROACH:
- Begin with a disclaimer that your evaluation is automated and not a substitute for human legal advice.
- Analyze the contract section by section, focusing on legal soundness and clarity.
- Highlight strengths and weaknesses, with emphasis on areas for improvement.
- Provide precise suggestions for revised language or additional clauses.
- Maintain a professional, constructive tone to support iterative improvement.
OUTPUT FORMAT:
Provide a plain text evaluation with:
1. Summary of the contract's strengths.
2. Identified issues (legal risks, ambiguities, missing elements).
3. Specific feedback for revisions (e.g., suggested clause changes).
4. Compliance notes (e.g., relevant laws or regulations).
5. Recommendations for human attorney review.
"""
class LegalSwarm:
def __init__(
self,
name: str = "Legal Swarm",
description: str = "A swarm of agents that can handle legal tasks",
max_loops: int = 1,
user_name: str = "John Doe",
documents: Optional[List[str]] = None,
output_type: str = "list",
):
"""
Initialize the LegalSwarm with a base LLM and configure agents.
Args:
llm (BaseLLM): The underlying LLM model for all agents.
max_loops (int): Maximum iterations for each agent's task.
"""
self.max_loops = max_loops
self.name = name
self.description = description
self.user_name = user_name
self.documents = documents
self.output_type = output_type
self.agents = self._initialize_agents()
self.agents = set_random_models_for_agents(self.agents)
self.conversation = Conversation()
self.handle_initial_processing()
def handle_initial_processing(self):
if self.documents:
documents_text = self.handle_documents(self.documents)
else:
documents_text = None
self.conversation.add(
role=self.user_name,
content=f"Firm Name: {self.name}\nFirm Description: {self.description}\nUser Name: {self.user_name}\nDocuments: {documents_text}",
)
def _initialize_agents(self) -> List[Agent]:
"""
Initialize all agents with their respective prompts and configurations.
Returns:
List[Agent]: List of Agent instances.
"""
return [
Agent(
agent_name="Clara-Intake-Agent",
agent_description="Handles client data intake and validation",
system_prompt=CLARA_SYS_PROMPT,
max_loops=self.max_loops,
dynamic_temperature_enabled=True,
output_type="final",
),
# Agent(
# agent_name="Riley-Report-Agent",
# agent_description="Generates client reports from intake data",
# system_prompt=RILEY_SYS_PROMPT,
# max_loops=self.max_loops,
# dynamic_temperature_enabled=True,
# output_type = "final"
# ),
Agent(
agent_name="Mason-Contract-Agent",
agent_description="Creates and updates legal contracts",
system_prompt=MASON_SYS_PROMPT,
max_loops=self.max_loops,
dynamic_temperature_enabled=True,
output_type="final",
),
Agent(
agent_name="Sophia-Counsel-Agent",
agent_description="Provides legal advice and compliance checks",
system_prompt=SOPHIA_SYS_PROMPT,
max_loops=self.max_loops,
dynamic_temperature_enabled=True,
output_type="final",
),
# Agent(
# agent_name="Ethan-Coordinator-Agent",
# agent_description="Manages workflow and communication",
# system_prompt=ETHAN_SYS_PROMPT,
# max_loops=self.max_loops,
# dynamic_temperature_enabled=True,
# output_type = "final"
# ),
]
@lru_cache(maxsize=1)
def handle_documents(self, documents: List[str]) -> str:
"""
Handle a list of documents concurrently, extracting text from PDFs and other documents.
Args:
documents (List[str]): List of document file paths to process.
Returns:
str: Combined text content from all documents.
"""
def process_document(file_path: str) -> str:
"""Process a single document and return its text content."""
if not os.path.exists(file_path):
return f"Error: File not found - {file_path}"
try:
if file_path.lower().endswith(".pdf"):
with open(file_path, "rb") as file:
pdf_reader = PyPDF2.PdfReader(file)
text = ""
for page in pdf_reader.pages:
text += page.extract_text() + "\n"
return text
else:
# Handle other document types (txt, docx, etc.)
with open(
file_path, "r", encoding="utf-8"
) as file:
return file.read()
except Exception as e:
return f"Error processing {file_path}: {str(e)}"
# Process documents concurrently
combined_text = ""
with ThreadPoolExecutor(
max_workers=min(len(documents), 4)
) as executor:
results = list(executor.map(process_document, documents))
combined_text = "\n\n".join(results)
return combined_text
def find_agent_by_name(self, name: str) -> Agent:
"""
Find an agent by their name.
"""
for agent in self.agents:
if agent.agent_name == name:
return agent
def initial_processing(self):
clara_agent = self.find_agent_by_name("Clara-Intake-Agent")
# Run Clara's agent
clara_output = clara_agent.run(
f"History: {self.conversation.get_str()}\n Create a structured summary document of the customer's case."
)
self.conversation.add(
role="Clara-Intake-Agent", content=clara_output
)
def create_contract(self, task: str):
mason_agent = self.find_agent_by_name("Mason-Contract-Agent")
mason_output = mason_agent.run(
f"History: {self.conversation.get_str()}\n Your purpose is to create a contract based on the following details: {task}"
)
self.conversation.add(
role="Mason-Contract-Agent", content=mason_output
)
artifact_id = generate_api_key(
"legal-swarm-artifact-", length=10
)
# Run Sophia's agent
sophia_agent = self.find_agent_by_name("Sophia-Counsel-Agent")
sophia_output = sophia_agent.run(
f"History: {self.conversation.get_str()}\n Your purpose is to review the contract Mason created and provide feedback."
)
self.conversation.add(
role="Sophia-Counsel-Agent", content=sophia_output
)
# Run Mason's agent
mason_agent = self.find_agent_by_name("Mason-Contract-Agent")
mason_output = mason_agent.run(
f"History: {self.conversation.get_str()}\n Your purpose is to update the contract based on the feedback Sophia provided."
)
self.conversation.add(
role="Mason-Contract-Agent", content=mason_output
)
self.create_pdf_from_string(
mason_output, f"{artifact_id}-contract.pdf"
)
def create_pdf_from_string(
self, string: str, output_path: Optional[str] = None
) -> Union[BytesIO, str]:
"""
Create a PDF from a string with proper formatting and styling.
Args:
string (str): The text content to convert to PDF
output_path (Optional[str]): If provided, save the PDF to this path. Otherwise return BytesIO object
Returns:
Union[BytesIO, str]: Either a BytesIO object containing the PDF or the path where it was saved
"""
try:
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
from reportlab.lib.styles import (
getSampleStyleSheet,
ParagraphStyle,
)
from reportlab.platypus import (
Paragraph,
SimpleDocTemplate,
)
from reportlab.lib.units import inch
# Create a buffer or file
if output_path:
doc = SimpleDocTemplate(output_path, pagesize=letter)
else:
buffer = BytesIO()
doc = SimpleDocTemplate(buffer, pagesize=letter)
# Create styles
styles = getSampleStyleSheet()
custom_style = ParagraphStyle(
"CustomStyle",
parent=styles["Normal"],
fontSize=12,
leading=14,
spaceAfter=12,
firstLineIndent=0.5 * inch,
)
# Prepare content
story = []
paragraphs = string.split("\n\n")
for para in paragraphs:
if para.strip():
story.append(
Paragraph(para.strip(), custom_style)
)
# Build PDF
doc.build(story)
if output_path:
return output_path
else:
buffer.seek(0)
return buffer
except ImportError:
raise ImportError(
"Please install reportlab: pip install reportlab"
)
except Exception as e:
raise Exception(f"Error creating PDF: {str(e)}")
def run(self, task: str):
"""
Process an input document through the swarm, coordinating tasks among agents.
Args:
task (str): The input task or text to process.
Returns:
Dict[str, Any]: Final output including client data, report, contract, counsel, and workflow status.
"""
self.conversation.add(role=self.user_name, content=task)
self.initial_processing()
self.create_contract(task)
return history_output_formatter(
self.conversation, type=self.output_type
)
# Example usage
if __name__ == "__main__":
# Initialize the swarm
swarm = LegalSwarm(
max_loops=1,
name="TGSC's Legal Swarm",
description="A swarm of agents that can handle legal tasks",
user_name="Kye Gomez",
output_type="json",
)
# Sample document for COO employment contract
sample_document = """
Company: Swarms TGSC
Entity Type: Delaware C Corporation
Position: Chief Operating Officer (COO)
Details: Creating an employment contract for a COO position with standard executive-level terms including:
- Base salary and equity compensation $5,000,000
- Performance bonuses and incentives
- Benefits package
- Non-compete and confidentiality clauses
- Termination provisions
- Stock options and vesting schedule
- Reporting structure and responsibilities
Contact: hr@swarms.tgsc
"""
# Run the swarm
result = swarm.run(task=sample_document)
print("Swarm Output:", result)

@ -1,15 +1,4 @@
from dotenv import load_dotenv
from swarms import Agent
from swarms.tools.mcp_integration import MCPServerSseParams
load_dotenv()
server = MCPServerSseParams(
url="http://localhost:8000/sse",
timeout=10,
)
tools = [
{

@ -3,6 +3,6 @@ from swarms.tools.mcp_client import execute_mcp_tool
print(
execute_mcp_tool(
"http://0.0.0.0:8000/sse",
parameters={"name": "add", "a": 1, "b": 2},
parameters={"name": "multiply", "a": 1, "b": 2},
)
)

@ -0,0 +1,20 @@
from swarms import ReasoningAgentRouter
calculus_router = ReasoningAgentRouter(
agent_name="calculus-expert",
description="A calculus problem solving agent",
model_name="gpt-4o-mini",
system_prompt="You are a calculus expert. Solve differentiation and integration problems methodically.",
swarm_type="self-consistency",
num_samples=3, # Generate 3 samples to ensure consistency
output_type="list",
)
# Example calculus problem
calculus_problem = "Find the derivative of f(x) = x³ln(x) - 5x²"
# Get the solution
solution = calculus_router.run(calculus_problem)
print(solution)

@ -0,0 +1,69 @@
from swarms.structs.agent import Agent
from swarms.structs.swarm_router import SwarmRouter
import json
# Create the agents first
research_manager = Agent(
agent_name="Research Manager",
agent_description="Manages research operations and coordinates research tasks",
system_prompt="You are a research manager responsible for overseeing research projects and coordinating research efforts.",
model_name="gpt-4o",
)
data_analyst = Agent(
agent_name="Data Analyst",
agent_description="Analyzes data and generates insights",
system_prompt="You are a data analyst specializing in processing and analyzing data to extract meaningful insights.",
model_name="gpt-4o",
)
research_assistant = Agent(
agent_name="Research Assistant",
agent_description="Assists with research tasks and data collection",
system_prompt="You are a research assistant who helps gather information and support research activities.",
model_name="gpt-4o",
)
development_manager = Agent(
agent_name="Development Manager",
agent_description="Manages development projects and coordinates development tasks",
system_prompt="You are a development manager responsible for overseeing software development projects and coordinating development efforts.",
model_name="gpt-4o",
)
software_engineer = Agent(
agent_name="Software Engineer",
agent_description="Develops and implements software solutions",
system_prompt="You are a software engineer specializing in building and implementing software solutions.",
model_name="gpt-4o",
)
qa_engineer = Agent(
agent_name="QA Engineer",
agent_description="Tests and ensures quality of software",
system_prompt="You are a QA engineer responsible for testing software and ensuring its quality.",
model_name="gpt-4o",
)
swarm_router = SwarmRouter(
name="Swarm Router",
description="A swarm router that routes tasks to the appropriate agents",
agents=[
research_manager,
data_analyst,
research_assistant,
development_manager,
software_engineer,
qa_engineer,
],
multi_agent_collab_prompt=True,
swarm_type="MixtureOfAgents",
output_type="dict",
)
output = swarm_router.run(
task="Write a research paper on the impact of AI on the future of work"
)
with open("output.json", "w") as f:
json.dump(output, f)

@ -1,10 +1,51 @@
from swarms.prompts.prompt import Prompt
# Aggregator system prompt
aggregator_system_prompt = Prompt(
name="aggregation_prompt",
description="Aggregate and summarize multiple agent outputs",
content="""
# aggregator_system_prompt = Prompt(
# name="aggregation_prompt",
# description="Aggregate and summarize multiple agent outputs",
# content="""
# # Multi-Agent Observer and Summarizer
# You are an advanced AI agent tasked with observing, analyzing, and summarizing the responses of multiple other AI agents. Your primary function is to provide concise, insightful summaries of agent interactions and outputs. Follow these guidelines:
# ## Core Responsibilities:
# 1. Observe and record responses from all agents in a given interaction.
# 2. Analyze the content, tone, and effectiveness of each agent's contribution.
# 3. Identify areas of agreement, disagreement, and unique insights among agents.
# 4. Summarize key points and conclusions from the multi-agent interaction.
# 5. Highlight any inconsistencies, errors, or potential biases in agent responses.
# ## Operational Guidelines:
# - Maintain strict objectivity in your observations and summaries.
# - Use clear, concise language in your reports.
# - Organize summaries in a structured format for easy comprehension.
# - Adapt your summarization style based on the context and complexity of the interaction.
# - Respect confidentiality and ethical guidelines in your reporting.
# ## Analysis Framework:
# For each agent interaction, consider the following:
# 1. Relevance: How well did each agent address the given task or query?
# 2. Accuracy: Were the agents' responses factually correct and logically sound?
# 3. Creativity: Did any agents provide unique or innovative perspectives?
# 4. Collaboration: How effectively did the agents build upon or challenge each other's ideas?
# 5. Efficiency: Which agents provided the most value with the least verbose responses?
# ## Output Format:
# Your summaries should include:
# 1. A brief overview of the interaction context
# 2. Key points from each agent's contribution
# 3. Areas of consensus and disagreement
# 4. Notable insights or breakthroughs
# 5. Potential improvements or areas for further exploration
# Remember: Your role is crucial in distilling complex multi-agent interactions into actionable insights. Strive for clarity, accuracy, and impartiality in all your summaries.
# """,
# )
# # print(aggregator_system_prompt.get_prompt())
aggregator_system_prompt_main = """
# Multi-Agent Observer and Summarizer
@ -28,7 +69,7 @@ aggregator_system_prompt = Prompt(
For each agent interaction, consider the following:
1. Relevance: How well did each agent address the given task or query?
2. Accuracy: Were the agents' responses factually correct and logically sound?
3. Creativity: Did any agents provide unique or innovative perspectives?
3. Creativity: Div any agents provide unique or innovative perspectives?
4. Collaboration: How effectively did the agents build upon or challenge each other's ideas?
5. Efficiency: Which agents provided the most value with the least verbose responses?
@ -40,14 +81,5 @@ aggregator_system_prompt = Prompt(
4. Notable insights or breakthroughs
5. Potential improvements or areas for further exploration
## Self-Improvement:
- Continuously refine your observation and summarization techniques.
- Identify patterns in agent behaviors and interactions to enhance your analytical capabilities.
- Adapt to various domains and types of agent interactions.
Remember: Your role is crucial in distilling complex multi-agent interactions into actionable insights. Strive for clarity, accuracy, and impartiality in all your summaries.
""",
)
# print(aggregator_system_prompt.get_prompt())
Remember: Your role is crucial in distilling complex mult-agent interactions into actionable insights. Strive for clarity, accuracy, and impartiality in all your summaries.
"""

@ -311,3 +311,21 @@ These principles guide your interaction with the rest of the system:
"""
MULTI_AGENT_COLLAB_PROMPT_TWO = """
You are part of a collaborative multi-agent system. Work together to solve complex tasks reliably and efficiently.
### Core Principles
1. **Clarity**: Restate tasks in your own words.
2. **Role Awareness**: Know your role and don't assume others' roles.
3. **Communication**: Share all relevant information and acknowledge others' inputs.
4. **Verification**: Use the 3C Protocol (Completeness, Coherence, Correctness).
5. **Reflection**: Continuously evaluate your actions and their impact.
### Key Protocols
- Before acting: Verify if task is already done by others
- During execution: Share reasoning and intermediate steps
- After completion: Get verification from at least one other agent
- Always: Explain your rationale and acknowledge others' contributions
"""

@ -1,6 +1,5 @@
from swarms.structs.agent import Agent
from swarms.structs.agent_builder import AgentsBuilder
from swarms.structs.agents_available import showcase_available_agents
from swarms.structs.base_structure import BaseStructure
from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.base_workflow import BaseWorkflow
@ -79,6 +78,7 @@ from swarms.structs.swarming_architectures import (
staircase_swarm,
star_swarm,
)
from swarms.structs.auto_swarm_builder import AutoSwarmBuilder
__all__ = [
"Agent",
@ -135,7 +135,6 @@ __all__ = [
"run_agents_with_resource_monitoring",
"swarm_router",
"run_agents_with_tasks_concurrently",
"showcase_available_agents",
"GroupChat",
"expertise_based",
"MultiAgentRouter",
@ -148,4 +147,5 @@ __all__ = [
"HybridHierarchicalClusterSwarm",
"get_agents_info",
"get_swarms_info",
"AutoSwarmBuilder",
]

@ -337,7 +337,7 @@ class Agent:
# [Tools]
custom_tools_prompt: Optional[Callable] = None,
tool_schema: ToolUsageType = None,
output_type: HistoryOutputType = "str",
output_type: HistoryOutputType = "str-all-except-first",
function_calling_type: str = "json",
output_cleaner: Optional[Callable] = None,
function_calling_format_type: Optional[str] = "OpenAI",
@ -541,7 +541,7 @@ class Agent:
self.agent_name is not None
or self.agent_description is not None
):
prompt = f"Your Name: {self.agent_name} \n\n Your Description: {self.agent_description} \n\n {self.system_prompt}"
prompt = f"\n Your Name: {self.agent_name} \n\n Your Description: {self.agent_description} \n\n {self.system_prompt}"
else:
prompt = self.system_prompt
@ -776,17 +776,16 @@ class Agent:
try:
if isinstance(response, dict):
result = response
print(type(result))
else:
result = str_to_dict(response)
print(type(result))
output = execute_mcp_tool(
url=self.mcp_url,
parameters=result,
)
print(output)
print(type(output))
self.short_memory.add(
role="Tool Executor", content=str(output)
)
@ -833,12 +832,12 @@ class Agent:
Exception: If there's an error in tool handling
"""
try:
if self.mcp_url is not None:
self._single_mcp_tool_handling(response)
elif self.mcp_url is None and len(self.mcp_servers) > 1:
self._multiple_mcp_tool_handling(response)
else:
raise ValueError("No MCP URL or MCP Servers provided")
# if self.mcp_url is not None:
self._single_mcp_tool_handling(response)
# elif self.mcp_url is None and len(self.mcp_servers) > 1:
# self._multiple_mcp_tool_handling(response)
# else:
# raise ValueError("No MCP URL or MCP Servers provided")
except Exception as e:
logger.error(f"Error in mcp_tool_handling: {e}")
raise e
@ -1156,6 +1155,15 @@ class Agent:
) as executor:
executor.map(lambda f: f(), update_tasks)
####### MCP TOOL HANDLING #######
if (
self.mcp_servers
and self.tools_list_dictionary is not None
):
self.mcp_tool_handling(response)
####### MCP TOOL HANDLING #######
# Check and execute tools
if self.tools is not None:
out = self.parse_and_execute_tools(
@ -1186,12 +1194,6 @@ class Agent:
role=self.agent_name, content=out
)
if (
self.mcp_servers
and self.tools_list_dictionary is not None
):
self.mcp_tool_handling(response)
self.sentiment_and_evaluator(response)
success = True # Mark as successful to exit the retry loop
@ -2561,12 +2563,7 @@ class Agent:
self,
task: Optional[Union[str, Any]] = None,
img: Optional[str] = None,
device: Optional[str] = "cpu", # gpu
device_id: Optional[int] = 0,
all_cores: Optional[bool] = True,
scheduled_run_date: Optional[datetime] = None,
do_not_use_cluster_ops: Optional[bool] = True,
all_gpus: Optional[bool] = False,
*args,
**kwargs,
) -> Any:
@ -2606,7 +2603,6 @@ class Agent:
) # Sleep for a short period to avoid busy waiting
try:
# If cluster ops disabled, run directly
output = self._run(
task=task,
img=img,
@ -2616,11 +2612,6 @@ class Agent:
return output
# if self.tools_list_dictionary is not None:
# return str_to_dict(output)
# else:
# return output
except ValueError as e:
self._handle_run_error(e)

@ -1,87 +0,0 @@
from swarms.structs.agent import Agent
from typing import List
def showcase_available_agents(
agents: List[Agent],
name: str = None,
description: str = None,
format: str = "XML",
) -> str:
"""
Format the available agents in either XML or Table format.
Args:
agents (List[Agent]): A list of agents to represent
name (str, optional): Name of the swarm
description (str, optional): Description of the swarm
format (str, optional): Output format ("XML" or "Table"). Defaults to "XML"
Returns:
str: Formatted string containing agent information
"""
def truncate(text: str, max_length: int = 130) -> str:
return (
f"{text[:max_length]}..."
if len(text) > max_length
else text
)
output = []
if format.upper() == "TABLE":
output.append("\n| ID | Agent Name | Description |")
output.append("|-----|------------|-------------|")
for idx, agent in enumerate(agents):
if isinstance(agent, Agent):
agent_name = getattr(agent, "agent_name", str(agent))
description = getattr(
agent,
"description",
getattr(
agent, "system_prompt", "Unknown description"
),
)
desc = truncate(description, 50)
output.append(
f"| {idx + 1} | {agent_name} | {desc} |"
)
else:
output.append(
f"| {idx + 1} | {agent} | Unknown description |"
)
return "\n".join(output)
# Default XML format
output.append("<agents>")
if name:
output.append(f" <name>{name}</name>")
if description:
output.append(
f" <description>{truncate(description)}</description>"
)
for idx, agent in enumerate(agents):
output.append(f" <agent id='{idx + 1}'>")
if isinstance(agent, Agent):
agent_name = getattr(agent, "agent_name", str(agent))
description = getattr(
agent,
"description",
getattr(
agent, "system_prompt", "Unknown description"
),
)
output.append(f" <name>{agent_name}</name>")
output.append(
f" <description>{truncate(description)}</description>"
)
else:
output.append(f" <name>{agent}</name>")
output.append(
" <description>Unknown description</description>"
)
output.append(" </agent>")
output.append("</agents>")
return "\n".join(output)

@ -1,107 +1,131 @@
import os
from typing import List
from loguru import logger
from pydantic import BaseModel, Field
from swarms.structs.agent import Agent
from swarms.utils.function_caller_model import OpenAIFunctionCaller
from swarms.structs.swarm_router import SwarmRouter
from swarms.utils.loguru_logger import initialize_logger
from swarms.structs.agents_available import showcase_available_agents
from swarms.structs.ma_utils import set_random_models_for_agents
from swarms.structs.swarm_router import SwarmRouter, SwarmRouterConfig
from dotenv import load_dotenv
load_dotenv()
logger = initialize_logger(log_folder="auto_swarm_builder")
BOSS_SYSTEM_PROMPT = """
You are an expert swarm manager and agent architect. Your role is to create and coordinate a team of specialized AI agents, each with distinct personalities, roles, and capabilities. Your primary goal is to ensure the swarm operates efficiently while maintaining clear communication and well-defined responsibilities.
### Core Principles:
1. **Deep Task Understanding**:
- First, thoroughly analyze the task requirements, breaking them down into core components and sub-tasks
- Identify the necessary skills, knowledge domains, and personality traits needed for each component
- Consider potential challenges, dependencies, and required coordination between agents
- Map out the ideal workflow and information flow between agents
2. **Agent Design Philosophy**:
- Each agent must have a clear, specific purpose and domain of expertise
- Agents should have distinct personalities that complement their roles
- Design agents to be self-aware of their limitations and when to seek help
- Ensure agents can effectively communicate their progress and challenges
3. **Agent Creation Framework**:
For each new agent, define:
- **Role & Purpose**: Clear, specific description of what the agent does and why
- **Personality Traits**: Distinct characteristics that influence how the agent thinks and communicates
- **Expertise Level**: Specific knowledge domains and skill sets
- **Communication Style**: How the agent presents information and interacts
- **Decision-Making Process**: How the agent approaches problems and makes choices
- **Limitations & Boundaries**: What the agent cannot or should not do
- **Collaboration Protocol**: How the agent works with others
4. **System Prompt Design**:
Create detailed system prompts that include:
- Role and purpose explanation
- Personality description and behavioral guidelines
- Specific capabilities and tools available
- Communication protocols and reporting requirements
- Problem-solving approach and decision-making framework
- Collaboration guidelines and team interaction rules
- Quality standards and success criteria
5. **Swarm Coordination**:
- Design clear communication channels between agents
- Establish protocols for task handoffs and information sharing
- Create feedback loops for continuous improvement
- Implement error handling and recovery procedures
- Define escalation paths for complex issues
6. **Quality Assurance**:
- Set clear success criteria for each agent and the overall swarm
- Implement verification steps for task completion
- Create mechanisms for self-assessment and improvement
- Establish protocols for handling edge cases and unexpected situations
### Output Format:
When creating a new agent or swarm, provide:
1. **Agent Design**:
- Role and purpose statement
- Personality profile
- Capabilities and limitations
- Communication style
- Collaboration protocols
2. **System Prompt**:
- Complete, detailed prompt that embodies the agent's identity
- Clear instructions for behavior and decision-making
- Specific guidelines for interaction and reporting
3. **Swarm Architecture**:
- Team structure and hierarchy
- Communication flow
- Task distribution plan
- Quality control measures
### Notes:
- Always prioritize clarity and specificity in agent design
- Ensure each agent has a unique, well-defined role
- Create detailed, comprehensive system prompts
- Maintain clear documentation of agent capabilities and limitations
- Design for scalability and adaptability
- Focus on creating agents that can work together effectively
- Consider edge cases and potential failure modes
- Implement robust error handling and recovery procedures
"""
class AgentConfig(BaseModel):
"""Configuration for an individual agent in a swarm"""
name: str = Field(
description="The name of the agent", example="Research-Agent"
description="The name of the agent",
)
description: str = Field(
description="A description of the agent's purpose and capabilities",
example="Agent responsible for researching and gathering information",
)
system_prompt: str = Field(
description="The system prompt that defines the agent's behavior",
example="You are a research agent. Your role is to gather and analyze information...",
)
# max_loops: int = Field(
# description="Maximum number of reasoning loops the agent can perform",
# example=3,
# description="The maximum number of loops for the agent to run",
# )
class Config:
arbitrary_types_allowed = True
class SwarmConfig(BaseModel):
"""Configuration for a swarm of cooperative agents"""
name: str = Field(
description="The name of the swarm",
example="Research-Writing-Swarm",
)
description: str = Field(
description="The description of the swarm's purpose and capabilities",
example="A swarm of agents that work together to research topics and write articles",
)
class AgentsConfig(BaseModel):
"""Configuration for a list of agents in a swarm"""
agents: List[AgentConfig] = Field(
description="The list of agents that make up the swarm",
example=[
AgentConfig(
name="Research-Agent",
description="Gathers information",
system_prompt="You are a research agent...",
),
AgentConfig(
name="Writing-Agent",
description="Writes content",
system_prompt="You are a writing agent...",
),
],
)
max_loops: int = Field(
description="The maximum number of loops to run the swarm",
example=1,
description="A list of agent configurations",
)
BOSS_SYSTEM_PROMPT = """
Manage a swarm of worker agents to efficiently serve the user by deciding whether to create new agents or delegate tasks. Ensure operations are efficient and effective.
### Instructions:
1. **Task Assignment**:
- Analyze available worker agents when a task is presented.
- Delegate tasks to existing agents with clear, direct, and actionable instructions if an appropriate agent is available.
- If no suitable agent exists, create a new agent with a fitting system prompt to handle the task.
2. **Agent Creation**:
- Name agents according to the task they are intended to perform (e.g., "Twitter Marketing Agent").
- Provide each new agent with a concise and clear system prompt that includes its role, objectives, and any tools it can utilize.
3. **Efficiency**:
- Minimize redundancy and maximize task completion speed.
- Avoid unnecessary agent creation if an existing agent can fulfill the task.
4. **Communication**:
- Be explicit in task delegation instructions to avoid ambiguity and ensure effective task execution.
- Require agents to report back on task completion or encountered issues.
5. **Reasoning and Decisions**:
- Offer brief reasoning when selecting or creating agents to maintain transparency.
- Avoid using an agent if unnecessary, with a clear explanation if no agents are suitable for a task.
# Output Format
Present your plan in clear, bullet-point format or short concise paragraphs, outlining task assignment, agent creation, efficiency strategies, and communication protocols.
# Notes
- Preserve transparency by always providing reasoning for task-agent assignments and creation.
- Ensure instructions to agents are unambiguous to minimize error.
"""
class AutoSwarmBuilder:
"""A class that automatically builds and manages swarms of AI agents.
@ -114,6 +138,7 @@ class AutoSwarmBuilder:
description (str): A description of the swarm's purpose
verbose (bool, optional): Whether to output detailed logs. Defaults to True.
max_loops (int, optional): Maximum number of execution loops. Defaults to 1.
random_models (bool, optional): Whether to use random models for agents. Defaults to True.
"""
def __init__(
@ -122,183 +147,219 @@ class AutoSwarmBuilder:
description: str = None,
verbose: bool = True,
max_loops: int = 1,
random_models: bool = True,
):
"""Initialize the AutoSwarmBuilder.
Args:
name (str): The name of the swarm
description (str): A description of the swarm's purpose
verbose (bool): Whether to output detailed logs
max_loops (int): Maximum number of execution loops
random_models (bool): Whether to use random models for agents
"""
self.name = name
self.description = description
self.verbose = verbose
self.max_loops = max_loops
self.agents_pool = []
self.random_models = random_models
logger.info(
f"Initialized AutoSwarmBuilder: {name} {description}"
f"Initializing AutoSwarmBuilder with name: {name}, description: {description}"
)
# @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def run(self, task: str, image_url: str = None, *args, **kwargs):
def run(self, task: str, *args, **kwargs):
"""Run the swarm on a given task.
Args:
task (str): The task to be accomplished
image_url (str, optional): URL of an image input if needed. Defaults to None.
*args: Variable length argument list
**kwargs: Arbitrary keyword arguments
task (str): The task to execute
*args: Additional positional arguments
**kwargs: Additional keyword arguments
Returns:
The output from the swarm's execution
"""
logger.info(f"Running swarm on task: {task}")
agents = self._create_agents(task, image_url, *args, **kwargs)
logger.info(f"Agents created {len(agents)}")
logger.info("Routing task through swarm")
output = self.swarm_router(agents, task, image_url)
logger.info(f"Swarm execution complete with output: {output}")
return output
# @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def _create_agents(self, task: str, *args, **kwargs):
"""Create the necessary agents for a task.
Any: The result of the swarm execution
Args:
task (str): The task to create agents for
*args: Variable length argument list
**kwargs: Arbitrary keyword arguments
Returns:
list: List of created agents
Raises:
Exception: If there's an error during execution
"""
logger.info("Creating agents for task")
model = OpenAIFunctionCaller(
system_prompt=BOSS_SYSTEM_PROMPT,
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.1,
base_model=SwarmConfig,
)
agents_dictionary = model.run(task)
logger.info(f"Agents dictionary: {agents_dictionary}")
try:
logger.info(f"Starting swarm execution for task: {task}")
agents = self.create_agents(task)
logger.info(f"Created {len(agents)} agents")
# Convert dictionary to SwarmConfig if needed
if isinstance(agents_dictionary, dict):
agents_dictionary = SwarmConfig(**agents_dictionary)
if self.random_models:
logger.info("Setting random models for agents")
agents = set_random_models_for_agents(agents=agents)
# Set swarm config
self.name = agents_dictionary.name
self.description = agents_dictionary.description
self.max_loops = getattr(
agents_dictionary
) # Default to 1 if not set
return self.initialize_swarm_router(
agents=agents, task=task
)
except Exception as e:
logger.error(
f"Error in swarm execution: {str(e)}", exc_info=True
)
raise
logger.info(
f"Swarm config: {self.name}, {self.description}, {self.max_loops}"
)
def create_agents(self, task: str):
"""Create agents for a given task.
# Create agents from config
agents = []
for agent_config in agents_dictionary.agents:
# Convert dict to AgentConfig if needed
if isinstance(agent_config, dict):
agent_config = AgentConfig(**agent_config)
agent = self.build_agent(
agent_name=agent_config.name,
agent_description=agent_config.description,
agent_system_prompt=agent_config.system_prompt,
)
agents.append(agent)
Args:
task (str): The task to create agents for
# Showcasing available agents
agents_available = showcase_available_agents(
name=self.name,
description=self.description,
agents=agents,
)
Returns:
List[Agent]: List of created agents
for agent in agents:
agent.system_prompt += "\n" + agents_available
Raises:
Exception: If there's an error during agent creation
"""
try:
logger.info(f"Creating agents for task: {task}")
model = OpenAIFunctionCaller(
system_prompt=BOSS_SYSTEM_PROMPT,
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.5,
base_model=AgentsConfig,
)
return agents
logger.info(
"Getting agent configurations from boss agent"
)
output = model.run(
f"Create the agents for the following task: {task}"
)
logger.debug(
f"Received agent configurations: {output.model_dump()}"
)
output = output.model_dump()
agents = []
if isinstance(output, dict):
for agent_config in output["agents"]:
logger.info(
f"Building agent: {agent_config['name']}"
)
agent = self.build_agent(
agent_name=agent_config["name"],
agent_description=agent_config["description"],
agent_system_prompt=agent_config[
"system_prompt"
],
)
agents.append(agent)
logger.info(
f"Successfully built agent: {agent_config['name']}"
)
return agents
except Exception as e:
logger.error(
f"Error creating agents: {str(e)}", exc_info=True
)
raise
def build_agent(
self,
agent_name: str,
agent_description: str,
agent_system_prompt: str,
max_loops: int = 1,
):
"""Build a single agent with the given specifications.
) -> Agent:
"""Build a single agent with enhanced error handling.
Args:
agent_name (str): Name of the agent
agent_description (str): Description of the agent's purpose
agent_system_prompt (str): The system prompt for the agent
agent_description (str): Description of the agent
agent_system_prompt (str): System prompt for the agent
Returns:
Agent: The constructed agent instance
Agent: The constructed agent
Raises:
Exception: If there's an error during agent construction
"""
logger.info(f"Building agent: {agent_name}")
agent = Agent(
agent_name=agent_name,
description=agent_description,
system_prompt=agent_system_prompt,
model_name="gpt-4o",
max_loops=max_loops,
autosave=True,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path=f"{agent_name}.json",
user_name="swarms_corp",
retry_attempts=1,
context_length=200000,
return_step_meta=False,
output_type="str", # "json", "dict", "csv" OR "string" soon "yaml" and
streaming_on=False,
auto_generate_prompt=True,
)
return agent
try:
agent = Agent(
agent_name=agent_name,
description=agent_description,
system_prompt=agent_system_prompt,
verbose=self.verbose,
dynamic_temperature_enabled=False,
)
logger.info(f"Successfully built agent: {agent_name}")
return agent
except Exception as e:
logger.error(
f"Error building agent {agent_name}: {str(e)}",
exc_info=True,
)
raise
def swarm_router(
self,
agents: List[Agent],
task: str,
image_url: str = None,
*args,
**kwargs,
):
"""Route tasks between agents in the swarm.
def initialize_swarm_router(self, agents: List[Agent], task: str):
"""Initialize and run the swarm router.
Args:
agents (List[Agent]): List of available agents
task (str): The task to route
image_url (str, optional): URL of an image input if needed. Defaults to None.
*args: Variable length argument list
**kwargs: Arbitrary keyword arguments
agents (List[Agent]): List of agents to use
task (str): The task to execute
Returns:
The output from the routed task execution
Any: The result of the swarm router execution
Raises:
Exception: If there's an error during router initialization or execution
"""
logger.info("Routing task through swarm")
swarm_router_instance = SwarmRouter(
name=self.name,
description=self.description,
agents=agents,
swarm_type="auto",
max_loops=1,
)
try:
logger.info("Initializing swarm router")
model = OpenAIFunctionCaller(
system_prompt=BOSS_SYSTEM_PROMPT,
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.5,
base_model=SwarmRouterConfig,
)
return swarm_router_instance.run(
self.name + " " + self.description + " " + task,
)
logger.info("Creating swarm specification")
swarm_spec = model.run(
f"Create the swarm spec for the following task: {task}"
)
logger.debug(
f"Received swarm specification: {swarm_spec.model_dump()}"
)
swarm_spec = swarm_spec.model_dump()
logger.info("Initializing SwarmRouter")
swarm_router = SwarmRouter(
name=swarm_spec["name"],
description=swarm_spec["description"],
max_loops=1,
swarm_type=swarm_spec["swarm_type"],
rearrange_flow=swarm_spec["rearrange_flow"],
rules=swarm_spec["rules"],
multi_agent_collab_prompt=swarm_spec[
"multi_agent_collab_prompt"
],
agents=agents,
output_type="dict",
)
logger.info("Starting swarm router execution")
return swarm_router.run(task)
except Exception as e:
logger.error(
f"Error in swarm router initialization/execution: {str(e)}",
exc_info=True,
)
raise
# example = AutoSwarmBuilder(
# name="ChipDesign-Swarm",
# description="A swarm of specialized AI agents collaborating on chip architecture, logic design, verification, and optimization to create novel semiconductor designs",
# max_loops=1,
# )
def batch_run(self, tasks: List[str]):
"""Run the swarm on a list of tasks.
Args:
tasks (List[str]): List of tasks to execute
Returns:
List[Any]: List of results from each task execution
Raises:
Exception: If there's an error during batch execution
"""
# print(
# example.run(
# "Design a new AI accelerator chip optimized for transformer model inference. Consider the following aspects: 1) Overall chip architecture and block diagram 2) Memory hierarchy and interconnects 3) Processing elements and data flow 4) Power and thermal considerations 5) Physical layout recommendations -> "
# )
# )
return [self.run(task) for task in tasks]

@ -93,7 +93,7 @@ class Conversation(BaseStructure):
# If system prompt is not None, add it to the conversation history
if self.system_prompt is not None:
self.add("System: ", self.system_prompt)
self.add("System", self.system_prompt)
if self.rules is not None:
self.add("User", rules)
@ -320,53 +320,6 @@ class Conversation(BaseStructure):
if keyword in msg["content"]
]
def pretty_print_conversation(self, messages):
"""Pretty print the conversation history.
Args:
messages (list): List of messages to print.
"""
role_to_color = {
"system": "red",
"user": "green",
"assistant": "blue",
"tool": "magenta",
}
for message in messages:
if message["role"] == "system":
formatter.print_panel(
f"system: {message['content']}\n",
role_to_color[message["role"]],
)
elif message["role"] == "user":
formatter.print_panel(
f"user: {message['content']}\n",
role_to_color[message["role"]],
)
elif message["role"] == "assistant" and message.get(
"function_call"
):
formatter.print_panel(
f"assistant: {message['function_call']}\n",
role_to_color[message["role"]],
)
elif message["role"] == "assistant" and not message.get(
"function_call"
):
formatter.print_panel(
f"assistant: {message['content']}\n",
role_to_color[message["role"]],
)
elif message["role"] == "tool":
formatter.print_panel(
(
f"function ({message['name']}):"
f" {message['content']}\n"
),
role_to_color[message["role"]],
)
def truncate_memory_with_tokenizer(self):
"""
Truncates the conversation history based on the total number of tokens using a tokenizer.
@ -551,6 +504,14 @@ class Conversation(BaseStructure):
]
)
def batch_add(self, messages: List[dict]):
"""Batch add messages to the conversation history.
Args:
messages (List[dict]): List of messages to add.
"""
self.conversation_history.extend(messages)
# # Example usage
# # conversation = Conversation()

@ -1,4 +1,3 @@
from concurrent.futures import ThreadPoolExecutor
import json
import os
from typing import Any, List, Optional, Union, Dict
@ -9,10 +8,15 @@ from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.conversation import Conversation
from swarms.structs.output_types import OutputType
from swarms.utils.any_to_str import any_to_str
from swarms.utils.formatter import formatter
from swarms.utils.function_caller_model import OpenAIFunctionCaller
from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
from swarms.structs.ma_utils import list_all_agents
logger = initialize_logger(log_folder="hierarchical_swarm")
@ -48,9 +52,6 @@ class SwarmSpec(BaseModel):
HIEARCHICAL_SWARM_SYSTEM_PROMPT = """
Below is a comprehensive production-grade hierarchical agent director prompt that is designed to break down orders, distribute tasks, and select the best worker agents to achieve the overall objectives. This prompt follows the schematic provided by the HierarchicalOrder and SwarmSpec classes and is composed of nearly 2,000 words. You can use this as your system prompt for the director agent in a multi-agent swarm system.
---
**SYSTEM PROMPT: HIERARCHICAL AGENT DIRECTOR**
@ -211,6 +212,27 @@ Remember: the success of the swarm depends on your ability to manage complexity,
"""
class TeamUnit(BaseModel):
"""Represents a team within a department."""
name: Optional[str] = Field(
None, description="The name of the team."
)
description: Optional[str] = Field(
None, description="A brief description of the team's purpose."
)
agents: Optional[List[Union[Agent, Any]]] = Field(
None,
description="A list of agents that are part of the team.",
)
team_leader: Optional[Union[Agent, Any]] = Field(
None, description="The team leader of the team."
)
class Config:
arbitrary_types_allowed = True
class HierarchicalSwarm(BaseSwarm):
"""
_Representer a hierarchical swarm of agents, with a director that orchestrates tasks among the agents.
@ -230,8 +252,9 @@ class HierarchicalSwarm(BaseSwarm):
agents: List[Union[Agent, Any]] = None,
max_loops: int = 1,
output_type: OutputType = "dict",
return_all_history: bool = False,
director_model_name: str = "gpt-4o",
teams: Optional[List[TeamUnit]] = None,
inter_agent_loops: int = 1,
*args,
**kwargs,
):
@ -244,7 +267,6 @@ class HierarchicalSwarm(BaseSwarm):
:param agents: A list of agents within the swarm.
:param max_loops: The maximum number of feedback loops between the director and agents.
:param output_type: The format in which to return the output (dict, str, or list).
:param return_all_history: A flag indicating whether to return all conversation history.
"""
super().__init__(
name=name,
@ -254,18 +276,46 @@ class HierarchicalSwarm(BaseSwarm):
self.director = director
self.agents = agents
self.max_loops = max_loops
self.return_all_history = return_all_history
self.output_type = output_type
self.director_model_name = director_model_name
self.teams = teams
self.inter_agent_loops = inter_agent_loops
self.conversation = Conversation(time_enabled=False)
self.current_loop = 0
self.agent_outputs = {} # Store agent outputs for each loop
self.add_name_and_description()
self.check_agents()
self.list_all_agents()
# Reliability checks
self.reliability_checks()
# Handle teams
self.handle_teams()
# List all agents
list_all_agents(self.agents, self.conversation, self.name)
self.director = self.setup_director()
def handle_teams(self):
if not self.teams:
return
# Use list comprehension for faster team processing
team_list = [team.model_dump() for team in self.teams]
# Use extend() instead of append() in a loop for better performance
self.agents.extend(
agent for team in team_list for agent in team["agents"]
)
# Add conversation message
self.conversation.add(
role="System",
content=f"Teams Available: {any_to_str(team_list)}",
)
def setup_director(self):
director = OpenAIFunctionCaller(
model_name=self.director_model_name,
@ -278,12 +328,15 @@ class HierarchicalSwarm(BaseSwarm):
return director
def check_agents(self):
def reliability_checks(self):
"""
Checks if there are any agents and a director set for the swarm.
Raises ValueError if either condition is not met.
"""
if not self.agents:
logger.info(f"🔍 CHECKING RELIABILITY OF SWARM: {self.name}")
if len(self.agents) == 0:
raise ValueError(
"No agents found in the swarm. At least one agent must be provided to create a hierarchical swarm."
)
@ -301,9 +354,7 @@ class HierarchicalSwarm(BaseSwarm):
"Director not set for the swarm. A director agent is required to coordinate and orchestrate tasks among the agents."
)
logger.info(
"Reliability checks have passed. Swarm is ready to execute."
)
logger.info(f"🔍 RELIABILITY CHECKS PASSED: {self.name}")
def run_director(
self, task: str, loop_context: str = "", img: str = None
@ -327,12 +378,6 @@ class HierarchicalSwarm(BaseSwarm):
# Run the director with the context
function_call = self.director.run(task=director_context)
print(function_call)
# function_call = self.check_director_agent_output(
# function_call
# )
formatter.print_panel(
f"Director Output (Loop {self.current_loop}/{self.max_loops}):\n{function_call}",
title="Director's Orders",
@ -374,10 +419,6 @@ class HierarchicalSwarm(BaseSwarm):
logger.info(
f"Starting loop {self.current_loop}/{self.max_loops}"
)
formatter.print_panel(
f"⚡ EXECUTING LOOP {self.current_loop}/{self.max_loops}",
title="SWARM EXECUTION CYCLE",
)
# Get director's orders
swarm_spec = self.run_director(
@ -416,7 +457,9 @@ class HierarchicalSwarm(BaseSwarm):
break
# Return the results in the specified format
return self.format_output()
return history_output_formatter(
self.conversation, self.output_type
)
def compile_loop_context(self, loop_number: int) -> str:
"""
@ -437,74 +480,41 @@ class HierarchicalSwarm(BaseSwarm):
return context
def format_output(self) -> Union[str, Dict, List]:
"""
Formats the output according to the specified output_type.
:return: The formatted output.
"""
if self.output_type == "str" or self.return_all_history:
return self.conversation.get_str()
elif self.output_type == "dict":
return self.conversation.return_messages_as_dictionary()
elif self.output_type == "list":
return self.conversation.return_messages_as_list()
else:
return self.conversation.get_str()
def add_name_and_description(self):
"""
Adds the swarm's name and description to the conversation.
"""
self.conversation.add(
role="System",
content=f"Swarm Name: {self.name}\nSwarm Description: {self.description}",
content=f"\n\nSwarm Name: {self.name}\n\nSwarm Description: {self.description}",
)
formatter.print_panel(
f"⚡ INITIALIZING HIERARCHICAL SWARM UNIT: {self.name}\n"
f"🔒 CLASSIFIED DIRECTIVE: {self.description}\n"
f"📡 STATUS: ACTIVATING SWARM PROTOCOLS\n"
f"🌐 ESTABLISHING SECURE AGENT MESH NETWORK\n"
f"⚠️ CYBERSECURITY MEASURES ENGAGED\n",
title="SWARM CORPORATION - HIERARCHICAL SWARMS ACTIVATING...",
)
def list_all_agents(self) -> str:
"""
Lists all agents available in the swarm.
:return: A string representation of all agents in the swarm.
"""
# Compile information about all agents
all_agents = "\n".join(
f"Agent: {agent.agent_name} || Description: {agent.description or agent.system_prompt}"
for agent in self.agents
)
# Add the agent information to the conversation
self.conversation.add(
role="System",
content=f"All Agents Available in the Swarm {self.name}:\n{all_agents}",
)
formatter.print_panel(
all_agents, title="All Agents Available in the Swarm"
)
return all_agents
def find_agent(self, name: str) -> Optional[Agent]:
"""
Finds an agent by its name within the swarm.
:param name: The name of the agent to find.
:return: The agent if found, otherwise None.
:raises: ValueError if agent is not found
"""
for agent in self.agents:
if agent.agent_name == name:
return agent
return None
try:
# Fast path: use list comprehension for quick lookup
matching_agents = [
agent
for agent in self.agents
if agent.agent_name == name
]
if not matching_agents:
error_msg = f"Agent '{name}' not found in the swarm '{self.name}'"
logger.error(error_msg)
return None
return matching_agents[0]
except Exception as e:
logger.error(f"Error finding agent '{name}': {str(e)}")
return None
def run_agent(
self, agent_name: str, task: str, img: str = None
@ -520,14 +530,6 @@ class HierarchicalSwarm(BaseSwarm):
try:
agent = self.find_agent(agent_name)
if not agent:
error_msg = f"Agent '{agent_name}' not found in the swarm '{self.name}'"
logger.error(error_msg)
self.conversation.add(
role="System", content=f"Error: {error_msg}"
)
return error_msg
# Prepare context for the agent
agent_context = (
f"Loop: {self.current_loop}/{self.max_loops}\n"
@ -541,7 +543,7 @@ class HierarchicalSwarm(BaseSwarm):
title=f"Agent Task - Loop {self.current_loop}/{self.max_loops}",
)
out = agent.run(task=agent_context, img=img)
out = agent.run(task=agent_context)
# Add the agent's output to the conversation
self.conversation.add(
@ -560,9 +562,6 @@ class HierarchicalSwarm(BaseSwarm):
f"Error running agent '{agent_name}': {str(e)}"
)
logger.error(error_msg)
self.conversation.add(
role="System", content=f"Error: {error_msg}"
)
return error_msg
def parse_orders(self, swarm_spec: SwarmSpec) -> List[Any]:
@ -590,9 +589,6 @@ class HierarchicalSwarm(BaseSwarm):
f"Error parsing and executing orders: {str(e)}"
)
logger.error(error_msg)
self.conversation.add(
role="System", content=f"Error: {error_msg}"
)
return [error_msg]
def parse_swarm_spec(
@ -668,19 +664,14 @@ class HierarchicalSwarm(BaseSwarm):
:param swarm_spec: The SwarmSpec containing the goals, plan, and rules.
"""
try:
goals = swarm_spec.goals
plan = swarm_spec.plan
rules = swarm_spec.rules
# Directly access and format attributes in one line
self.conversation.add(
role="Director",
content=f"Goals: {goals}\nPlan: {plan}\nRules: {rules}",
content=f"Goals:\n{swarm_spec.goals}\n\nPlan:\n{swarm_spec.plan}\n\nRules:\n{swarm_spec.rules}",
)
except Exception as e:
error_msg = f"Error adding goals and plan to conversation: {str(e)}"
logger.error(error_msg)
self.conversation.add(
role="System", content=f"Error: {error_msg}"
logger.error(
f"Error adding goals and plan to conversation: {str(e)}"
)
def batch_run(
@ -714,23 +705,3 @@ class HierarchicalSwarm(BaseSwarm):
"Output is neither a dictionary nor a string."
)
return {}
def concurrent_run(
self, tasks: List[str], img: str = None
) -> List[Union[str, Dict, List]]:
"""
Runs multiple tasks concurrently through the swarm.
:param tasks: The list of tasks to be executed.
:param img: Optional image to be used with the tasks.
:return: A list of outputs from each task execution.
"""
with ThreadPoolExecutor(max_workers=len(tasks)) as executor:
# Create a list of partial functions with the img parameter
task_functions = [(task, img) for task in tasks]
# Use starmap to unpack the arguments
return list(
executor.map(
lambda args: self.run(*args), task_functions
)
)

@ -0,0 +1,96 @@
from swarms.structs.agent import Agent
from typing import List, Any, Optional, Union
import random
def list_all_agents(
agents: List[Union[Agent, Any]],
conversation: Optional[Any] = None,
name: str = "",
add_to_conversation: bool = False,
) -> str:
"""Lists all agents in a swarm and optionally adds them to a conversation.
This function compiles information about all agents in a swarm, including their names and descriptions.
It can optionally add this information to a conversation history.
Args:
agents (List[Union[Agent, Any]]): List of agents to list information about
conversation (Any): Conversation object to optionally add agent information to
name (str): Name of the swarm/group of agents
add_to_conversation (bool, optional): Whether to add agent information to conversation. Defaults to False.
Returns:
str: Formatted string containing information about all agents
Example:
>>> agents = [agent1, agent2]
>>> conversation = Conversation()
>>> agent_info = list_all_agents(agents, conversation, "MySwarm")
>>> print(agent_info)
Total Agents: 2
Agent: Agent1
Description: First agent description...
Agent: Agent2
Description: Second agent description...
"""
# Compile information about all agents
total_agents = len(agents)
all_agents = f"Total Agents: {total_agents}\n\n" + "\n\n".join(
f"Agent: {agent.agent_name} \n\n Description: {agent.description or (agent.system_prompt[:50] + '...' if len(agent.system_prompt) > 50 else agent.system_prompt)}"
for agent in agents
)
if add_to_conversation:
# Add the agent information to the conversation
conversation.add(
role="System",
content=f"All Agents Available in the Swarm {name}:\n\n{all_agents}",
)
return all_agents
models = [
"anthropic/claude-3-sonnet-20240229",
"openai/gpt-4o-mini",
"openai/gpt-4o",
"deepseek/deepseek-chat",
"deepseek/deepseek-reasoner",
"groq/deepseek-r1-distill-qwen-32b",
"groq/deepseek-r1-distill-qwen-32b",
# "gemini/gemini-pro",
# "gemini/gemini-1.5-pro",
"openai/03-mini",
"o4-mini",
"o3",
"gpt-4.1",
"gpt-4.1-nano",
]
def set_random_models_for_agents(
agents: Union[List[Agent], Agent], model_names: List[str] = models
) -> Union[List[Agent], Agent]:
"""Sets random models for agents in the swarm.
Args:
agents (Union[List[Agent], Agent]): Either a single agent or a list of agents
model_names (List[str], optional): List of model names to choose from. Defaults to models.
Returns:
Union[List[Agent], Agent]: The agent(s) with randomly assigned models
"""
if isinstance(agents, list):
return [
setattr(agent, "model_name", random.choice(model_names))
or agent
for agent in agents
]
else:
setattr(agents, "model_name", random.choice(model_names))
return agents

@ -1,59 +1,20 @@
import asyncio
import os
import time
from typing import Any, Dict, List, Optional
from typing import List, Optional
from pydantic import BaseModel, Field
from swarms.structs.agent import Agent
from swarms.telemetry.main import log_agent_data
from swarms.schemas.agent_step_schemas import ManySteps
from swarms.prompts.ag_prompt import aggregator_system_prompt
from swarms.prompts.ag_prompt import aggregator_system_prompt_main
from swarms.utils.loguru_logger import initialize_logger
import concurrent.futures
from swarms.structs.output_types import OutputType
from swarms.structs.conversation import Conversation
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
logger = initialize_logger(log_folder="mixture_of_agents")
time_stamp = time.strftime("%Y-%m-%d %H:%M:%S")
class MixtureOfAgentsInput(BaseModel):
name: str = "MixtureOfAgents"
description: str = (
"A class to run a mixture of agents and aggregate their responses."
)
agents: List[Dict[str, Any]]
aggregator_agent: Any = Field(
...,
description="An aggregator agent to be used in the mixture.",
)
aggregator_system_prompt: str = Field(
default=aggregator_system_prompt.get_prompt(),
description=aggregator_system_prompt.description,
)
layers: int = 3
time_created: str = Field(
time_stamp,
description="The time the mixture of agents was created.",
)
class MixtureOfAgentsOutput(BaseModel):
id: str = Field(
..., description="The ID of the mixture of agents."
)
task: str = Field(..., description="None")
InputConfig: MixtureOfAgentsInput
# output: List[ManySteps]
normal_agent_outputs: List[ManySteps]
aggregator_agent_summary: str
time_completed: str = Field(
time_stamp,
description="The time the mixture of agents was completed.",
)
class MixtureOfAgents:
"""
@ -66,7 +27,7 @@ class MixtureOfAgents:
description: str = "A class to run a mixture of agents and aggregate their responses.",
agents: List[Agent] = [],
aggregator_agent: Agent = None,
aggregator_system_prompt: str = "",
aggregator_system_prompt: str = aggregator_system_prompt_main,
layers: int = 3,
max_loops: int = 1,
return_str_on: bool = False,
@ -85,31 +46,13 @@ class MixtureOfAgents:
"""
self.name = name
self.description = description
self.agents: List[Agent] = agents
self.aggregator_agent: Agent = aggregator_agent
self.aggregator_system_prompt: str = aggregator_system_prompt
self.layers: int = layers
self.max_loops: int = max_loops
self.return_str_on: bool = return_str_on
self.output_type: OutputType = output_type
self.input_schema = MixtureOfAgentsInput(
name=name,
description=description,
agents=[agent.to_dict() for agent in self.agents],
aggregator_agent=aggregator_agent.to_dict(),
aggregator_system_prompt=self.aggregator_system_prompt,
layers=self.layers,
time_created=time_stamp,
)
self.output_schema = MixtureOfAgentsOutput(
id="MixtureOfAgents",
InputConfig=self.input_schema.model_dump(),
normal_agent_outputs=[],
aggregator_agent_summary="",
task="",
)
self.agents = agents
self.aggregator_agent = aggregator_agent
self.aggregator_system_prompt = aggregator_system_prompt_main
self.layers = layers
self.max_loops = max_loops
self.return_str_on = return_str_on
self.output_type = output_type
self.reliability_check()
@ -182,9 +125,6 @@ class MixtureOfAgents:
Returns:
str: The response from the agent.
"""
# Update the task in the output schema
self.output_schema.task = task
# If there are previous responses, update the agent's system prompt
if prev_responses:
system_prompt_with_responses = (
@ -197,9 +137,7 @@ class MixtureOfAgents:
# Run the agent asynchronously
response = await asyncio.to_thread(agent.run, task)
self.output_schema.normal_agent_outputs.append(
agent.agent_output
)
self.conversation.add(agent.agent_name, response)
# Log the agent's response
print(f"Agent {agent.agent_name} response: {response}")
@ -235,7 +173,6 @@ class MixtureOfAgents:
final_result = await self._run_agent_async(
self.aggregator_agent, task, prev_responses=results
)
self.output_schema.aggregator_agent_summary = final_result
print(f"Final Aggregated Response: {final_result}")
@ -248,38 +185,18 @@ class MixtureOfAgents:
"""
try:
self.conversation.add("user", task)
prev_context = None
for _ in range(self.max_loops):
# Add previous context to task if available
current_task = (
f"{task}\n\nPrevious context:\n{prev_context}"
if prev_context
else task
)
prompt = f"History: {self.conversation.get_str()}\n\nTask: {task}"
# Run async process
asyncio.run(self._run_async(current_task))
asyncio.run(self._run_async(prompt))
# Store current results as context for next loop
prev_context = (
self.output_schema.aggregator_agent_summary
)
self.output_schema.task = task
log_agent_data(self.output_schema.model_dump())
if self.return_str_on or self.output_type == "str":
return self.conversation.get_str()
elif self.output_type == "dict":
return (
self.conversation.return_messages_as_dictionary()
)
elif self.output_type == "list":
return self.conversation.return_messages_as_list()
else:
return self.output_schema.model_dump_json(indent=4)
return history_output_formatter(
conversation=self.conversation,
type=self.output_type,
)
except Exception as e:
logger.error(f"Error running mixture of agents: {str(e)}")

@ -9,8 +9,6 @@ Todo:
"""
import os
import uuid
from datetime import datetime
from typing import List, Optional
from loguru import logger
@ -20,6 +18,10 @@ from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.structs.output_types import OutputType
from swarms.utils.any_to_str import any_to_str
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
from swarms.utils.formatter import formatter
class AgentResponse(BaseModel):
@ -62,7 +64,7 @@ class MultiAgentRouter:
temperature: float = 0.1,
shared_memory_system: callable = None,
output_type: OutputType = "dict",
execute_task: bool = True,
if_print: bool = True,
):
"""
Initializes the MultiAgentRouter with a list of agents and configuration options.
@ -80,15 +82,15 @@ class MultiAgentRouter:
self.description = description
self.shared_memory_system = shared_memory_system
self.output_type = output_type
self.execute_task = execute_task
self.model = model
self.temperature = temperature
self.if_print = if_print
# Initialize Agents
self.agents = {agent.name: agent for agent in agents}
self.conversation = Conversation()
self.api_key = os.getenv("OPENAI_API_KEY")
if not self.api_key:
raise ValueError("OpenAI API key must be provided")
@ -99,6 +101,7 @@ class MultiAgentRouter:
system_prompt=self.boss_system_prompt,
api_key=self.api_key,
temperature=temperature,
base_model=AgentResponse,
)
def __repr__(self):
@ -140,21 +143,6 @@ class MultiAgentRouter:
Always select exactly one agent that best matches the task requirements.
"""
def find_agent_in_list(self, agent_name: str) -> Optional[Agent]:
"""
Find an agent by name in a list of agents.
Args:
agent_name (str): The name of the agent to find.
Returns:
Optional[Agent]: The agent object if found, otherwise None.
"""
for agent in self.agent_list:
if agent.name == agent_name:
return agent
return None
def route_task(self, task: str) -> dict:
"""
Routes a task to the appropriate agent and returns their response.
@ -167,15 +155,21 @@ class MultiAgentRouter:
"""
try:
self.conversation.add(role="user", content=task)
start_time = datetime.now()
# Get boss decision using function calling
boss_response = self.function_caller.get_completion(task)
boss_response = self.function_caller.run(task)
boss_response_str = any_to_str(boss_response)
if self.if_print:
formatter.print_panel(
boss_response_str,
title="Multi-Agent Router Decision",
)
else:
pass
self.conversation.add(
role="assistant", content=boss_response_str
role="Agent Router", content=boss_response_str
)
# Validate that the selected agent exists
@ -190,60 +184,16 @@ class MultiAgentRouter:
# Use the modified task if provided, otherwise use original task
final_task = boss_response.modified_task or task
# Execute the task with the selected agent if enabled
execution_start = datetime.now()
agent_response = None
execution_time = 0
# Use the agent's run method directly
agent_response = selected_agent.run(final_task)
if self.execute_task:
# Use the agent's run method directly
agent_response = selected_agent.run(final_task)
self.conversation.add(
role=selected_agent.name, content=agent_response
)
execution_time = (
datetime.now() - execution_start
).total_seconds()
else:
logger.info(
"Task execution skipped (execute_task=False)"
)
self.conversation.add(
role=selected_agent.name, content=agent_response
)
total_time = (datetime.now() - start_time).total_seconds()
result = {
"id": str(uuid.uuid4()),
"timestamp": datetime.now().isoformat(),
"task": {
"original": task,
"modified": (
final_task
if boss_response.modified_task
else None
),
},
"boss_decision": {
"selected_agent": boss_response.selected_agent,
"reasoning": boss_response.reasoning,
},
"execution": {
"agent_name": selected_agent.name,
"agent_id": selected_agent.id,
"was_executed": self.execute_task,
"response": (
agent_response if self.execute_task else None
),
"execution_time": (
execution_time if self.execute_task else None
),
},
"total_time": total_time,
}
logger.info(
f"Successfully routed task to {selected_agent.name}"
return history_output_formatter(
conversation=self.conversation, type=self.output_type
)
return result
except Exception as e:
logger.error(f"Error routing task: {str(e)}")
@ -312,31 +262,18 @@ class MultiAgentRouter:
# ),
# ]
# # Initialize routers with different configurations
# router_execute = MultiAgentRouter(agents=agents, execute_task=True)
# # router_no_execute = MultiAgentRouter(agents=agents, execute_task=False)
# # Initialize router
# router = MultiAgentRouter(agents=agents)
# # Example task
# task = "Write a Python function to calculate fibonacci numbers"
# try:
# # Process the task with execution
# print("\nWith task execution:")
# result_execute = router_execute.route_task(task)
# print(
# f"Selected Agent: {result_execute['boss_decision']['selected_agent']}"
# )
# print(
# f"Reasoning: {result_execute['boss_decision']['reasoning']}"
# )
# if result_execute["execution"]["response"]:
# print(
# f"Response Preview: {result_execute['execution']['response'][:200]}..."
# )
# print(
# f"Execution Time: {result_execute['execution']['execution_time']:.2f}s"
# )
# print(f"Total Time: {result_execute['total_time']:.2f}s")
# # Process the task
# result = router.route_task(task)
# print(f"Selected Agent: {result['boss_decision']['selected_agent']}")
# print(f"Reasoning: {result['boss_decision']['reasoning']}")
# print(f"Total Time: {result['total_time']:.2f}s")
# except Exception as e:
# print(f"Error occurred: {str(e)}")

@ -1,19 +1,6 @@
from typing import Literal
# Literal of output types
OutputType = Literal[
"all",
"final",
"list",
"dict",
".json",
".md",
".txt",
".yaml",
".toml",
"string",
"str",
]
from swarms.utils.history_output_formatter import (
HistoryOutputType as OutputType,
)
# Use the OutputType for type annotations
output_type: OutputType

@ -36,3 +36,33 @@ def check_exit(s):
def check_end(s):
return "end" in s
def check_stopping_conditions(input: str) -> str:
"""
Checks a string against all stopping conditions and returns an appropriate message.
Args:
s (str): The input string to check
Returns:
str: A message indicating which stopping condition was met, or None if no condition was met
"""
conditions = [
(check_done, "Task is done"),
(check_finished, "Task is finished"),
(check_complete, "Task is complete"),
(check_success, "Task succeeded"),
(check_failure, "Task failed"),
(check_error, "Task encountered an error"),
(check_stopped, "Task was stopped"),
(check_cancelled, "Task was cancelled"),
(check_exit, "Task exited"),
(check_end, "Task ended"),
]
for check_func, message in conditions:
if check_func(input):
return message
return None

@ -1,395 +0,0 @@
import os
from typing import List, Optional
from loguru import logger
from pydantic import BaseModel, Field
from pydantic.v1 import validator
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
)
from swarms.structs.agent import Agent
from swarms.structs.swarm_router import SwarmRouter, SwarmType
from swarms.utils.function_caller_model import OpenAIFunctionCaller
BOSS_SYSTEM_PROMPT = """
Manage a swarm of worker agents to efficiently serve the user by deciding whether to create new agents or delegate tasks. Ensure operations are efficient and effective.
### Instructions:
1. **Task Assignment**:
- Analyze available worker agents when a task is presented.
- Delegate tasks to existing agents with clear, direct, and actionable instructions if an appropriate agent is available.
- If no suitable agent exists, create a new agent with a fitting system prompt to handle the task.
2. **Agent Creation**:
- Name agents according to the task they are intended to perform (e.g., "Twitter Marketing Agent").
- Provide each new agent with a concise and clear system prompt that includes its role, objectives, and any tools it can utilize.
3. **Efficiency**:
- Minimize redundancy and maximize task completion speed.
- Avoid unnecessary agent creation if an existing agent can fulfill the task.
4. **Communication**:
- Be explicit in task delegation instructions to avoid ambiguity and ensure effective task execution.
- Require agents to report back on task completion or encountered issues.
5. **Reasoning and Decisions**:
- Offer brief reasoning when selecting or creating agents to maintain transparency.
- Avoid using an agent if unnecessary, with a clear explanation if no agents are suitable for a task.
# Output Format
Present your plan in clear, bullet-point format or short concise paragraphs, outlining task assignment, agent creation, efficiency strategies, and communication protocols.
# Notes
- Preserve transparency by always providing reasoning for task-agent assignments and creation.
- Ensure instructions to agents are unambiguous to minimize error.
"""
class AgentConfig(BaseModel):
"""Configuration for an individual agent in a swarm"""
name: str = Field(
description="The name of the agent",
)
description: str = Field(
description="A description of the agent's purpose and capabilities",
)
system_prompt: str = Field(
description="The system prompt that defines the agent's behavior",
)
class SwarmConfig(BaseModel):
"""Configuration for a swarm of cooperative agents"""
name: str = Field(
description="The name of the swarm",
example="Research-Writing-Swarm",
)
description: str = Field(
description="The description of the swarm's purpose and capabilities",
example="A swarm of agents that work together to research topics and write articles",
)
agents: List[AgentConfig] = Field(
description="The list of agents that make up the swarm",
)
max_loops: int = Field(
description="The maximum number of loops for the swarm to iterate on",
)
@validator("agents")
def validate_agents(cls, v):
if not v:
raise ValueError("Swarm must have at least one agent")
return v
class AutoSwarmBuilderOutput(BaseModel):
"""A class that automatically builds and manages swarms of AI agents with enhanced error handling."""
name: Optional[str] = Field(
description="The name of the swarm",
example="DefaultSwarm",
default=None,
)
description: Optional[str] = Field(
description="The description of the swarm's purpose and capabilities",
example="Generic AI Agent Swarm",
default=None,
)
verbose: Optional[bool] = Field(
description="Whether to display verbose output",
default=None,
)
model_name: Optional[str] = Field(
description="The name of the OpenAI model to use",
default=None,
)
boss_output_schema: Optional[list] = Field(
description="The schema for the output of the BOSS system prompt",
default=None,
)
director_agents_created: Optional[SwarmConfig] = Field(
description="The agents created by the director",
default=None,
)
swarm_router_outputs: Optional[list] = Field(
description="The outputs from the swarm router",
default=None,
)
max_loops: Optional[int] = Field(
description="The maximum number of loops for the swarm to iterate on",
default=None,
)
swarm_type: Optional[SwarmType] = Field(
description="The type of swarm to build",
default=None,
)
class AutoSwarmBuilder:
"""A class that automatically builds and manages swarms of AI agents with enhanced error handling."""
def __init__(
self,
name: Optional[str] = "autonomous-swarm-builder",
description: Optional[
str
] = "Given a task, this swarm will automatically create specialized agents and route it to the appropriate agents.",
verbose: bool = True,
model_name: str = "gpt-4o",
boss_output_schema: list = None,
swarm_router_outputs: AutoSwarmBuilderOutput = None,
max_loops: int = 1,
swarm_type: str = "SequentialWorkflow",
auto_generate_prompts_for_agents: bool = False,
shared_memory_system: callable = None,
):
self.name = name or "DefaultSwarm"
self.description = description or "Generic AI Agent Swarm"
self.verbose = verbose
self.agents_pool = []
self.api_key = os.getenv("OPENAI_API_KEY")
self.model_name = model_name
self.boss_output_schema = boss_output_schema
self.max_loops = max_loops
self.swarm_type = swarm_type
self.auto_generate_prompts_for_agents = (
auto_generate_prompts_for_agents
)
self.shared_memory_system = shared_memory_system
self.auto_swarm_builder_output = AutoSwarmBuilderOutput(
name=name,
description=description,
verbose=verbose,
model_name=model_name,
boss_output_schema=boss_output_schema or [],
swarm_router_outputs=swarm_router_outputs or [],
max_loops=max_loops,
swarm_type=swarm_type,
)
logger.info(
"Initialized AutoSwarmBuilder",
extra={
"swarm_name": self.name,
"description": self.description,
"model": self.model_name,
},
)
def run(
self,
task: str,
image_url: Optional[str] = None,
*args,
**kwargs,
):
"""Run the swarm on a given task with error handling and retries."""
if not task or not task.strip():
raise ValueError("Task cannot be empty")
logger.info("Starting swarm execution", extra={"task": task})
try:
# Create agents for the task
agents = self._create_agents(task)
if not agents:
raise ValueError(
"No agents were created for the task"
)
# Execute the task through the swarm router
logger.info(
"Routing task through swarm",
extra={"num_agents": len(agents)},
)
output = self.swarm_router(
agents=agents,
task=task,
image_url=image_url,
*args,
**kwargs,
)
self.auto_swarm_builder_output.swarm_router_outputs.append(
output
)
print(output)
logger.info("Swarm execution completed successfully")
# return output
return self.auto_swarm_builder_output.model_dump_json(
indent=4
)
except Exception as e:
logger.error(
f"Error during swarm execution: {str(e)}",
)
raise e
def _create_agents(
self,
task: str,
) -> List[Agent]:
"""Create the necessary agents for a task with enhanced error handling."""
logger.info("Creating agents for task", extra={"task": task})
try:
model = OpenAIFunctionCaller(
system_prompt=BOSS_SYSTEM_PROMPT,
api_key=self.api_key,
temperature=0.1,
base_model=SwarmConfig,
)
agents_config = model.run(task)
logger.info(
f"Director has successfully created agents: {agents_config}"
)
self.auto_swarm_builder_output.director_agents_created = (
agents_config
)
if isinstance(agents_config, dict):
agents_config = SwarmConfig(**agents_config)
# Update swarm configuration
self.name = agents_config.name
self.description = agents_config.description
# Create agents from configuration
agents = []
for agent_config in agents_config.agents:
if isinstance(agent_config, dict):
agent_config = AgentConfig(**agent_config)
agent = self.build_agent(
agent_name=agent_config.name,
agent_description=agent_config.description,
agent_system_prompt=agent_config.system_prompt,
)
agents.append(agent)
print(
f"Agent created: {agent_config.name}: Description: {agent_config.description}"
)
# # Add available agents showcase to system prompts
# agents_available = showcase_available_agents(
# name=self.name,
# description=self.description,
# agents=agents,
# )
# for agent in agents:
# agent.system_prompt += "\n" + agents_available
logger.info(
"Successfully created agents",
extra={"num_agents": len(agents)},
)
return agents
except Exception as e:
logger.error(
f"Error creating agents: {str(e)}", exc_info=True
)
raise
def build_agent(
self,
agent_name: str,
agent_description: str,
agent_system_prompt: str,
*args,
**kwargs,
) -> Agent:
"""Build a single agent with enhanced error handling."""
logger.info(
"Building agent", extra={"agent_name": agent_name}
)
try:
agent = Agent(
agent_name=agent_name,
description=agent_description,
system_prompt=agent_system_prompt,
model_name="gpt-4o",
verbose=self.verbose,
dynamic_temperature_enabled=False,
return_step_meta=False,
output_type="str",
streaming_on=True,
)
return agent
except Exception as e:
logger.error(
f"Error building agent: {str(e)}", exc_info=True
)
raise
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
)
def swarm_router(
self,
agents: List[Agent],
task: str,
img: Optional[str] = None,
*args,
**kwargs,
) -> str:
"""Route tasks between agents in the swarm with error handling and retries."""
logger.info(
"Initializing swarm router",
extra={"num_agents": len(agents)},
)
try:
swarm_router_instance = SwarmRouter(
name=self.name,
description=self.description,
agents=agents,
swarm_type=self.swarm_type,
auto_generate_prompts=self.auto_generate_prompts_for_agents,
)
# formatted_task = f"{self.name} {self.description} {task}"
result = swarm_router_instance.run(
task=task, *args, **kwargs
)
logger.info("Successfully completed swarm routing")
return result
except Exception as e:
logger.error(
f"Error in swarm router: {str(e)}", exc_info=True
)
raise
# swarm = AutoSwarmBuilder(
# name="ChipDesign-Swarm",
# description="A swarm of specialized AI agents for chip design",
# swarm_type="ConcurrentWorkflow",
# )
# try:
# result = swarm.run(
# "Design a new AI accelerator chip optimized for transformer model inference..."
# )
# print(result)
# except Exception as e:
# print(f"An error occurred: {e}")

@ -5,7 +5,9 @@ from typing import Any, Callable, Dict, List, Literal, Optional, Union
from pydantic import BaseModel, Field
from swarms.prompts.ag_prompt import aggregator_system_prompt
from swarms.prompts.multi_agent_collab_prompt import (
MULTI_AGENT_COLLAB_PROMPT_TWO,
)
from swarms.structs.agent import Agent
from swarms.structs.concurrent_workflow import ConcurrentWorkflow
from swarms.structs.csv_to_agent import AgentLoader
@ -66,6 +68,38 @@ class SwarmLog(BaseModel):
documents: List[Document] = []
class SwarmRouterConfig(BaseModel):
"""Configuration model for SwarmRouter."""
name: str = Field(
description="Name identifier for the SwarmRouter instance",
)
description: str = Field(
description="Description of the SwarmRouter's purpose",
)
# max_loops: int = Field(
# description="Maximum number of execution loops"
# )
swarm_type: SwarmType = Field(
description="Type of swarm to use",
)
rearrange_flow: Optional[str] = Field(
description="Flow configuration string"
)
rules: Optional[str] = Field(
description="Rules to inject into every agent"
)
multi_agent_collab_prompt: bool = Field(
description="Whether to enable multi-agent collaboration prompts",
)
task: str = Field(
description="The task to be executed by the swarm",
)
class Config:
arbitrary_types_allowed = True
class SwarmRouter:
"""
A class that dynamically routes tasks to different swarm types based on user selection or automatic matching.
@ -157,6 +191,7 @@ class SwarmRouter:
load_agents_from_csv: bool = False,
csv_file_path: str = None,
return_entire_history: bool = True,
multi_agent_collab_prompt: bool = True,
*args,
**kwargs,
):
@ -179,14 +214,18 @@ class SwarmRouter:
self.load_agents_from_csv = load_agents_from_csv
self.csv_file_path = csv_file_path
self.return_entire_history = return_entire_history
self.multi_agent_collab_prompt = multi_agent_collab_prompt
# Reliability check
self.reliability_check()
# Load agents from CSV
if self.load_agents_from_csv:
self.agents = AgentLoader(
csv_path=self.csv_file_path
).load_agents()
self.reliability_check()
# Log initialization
self._log(
"info",
f"SwarmRouter initialized with swarm type: {swarm_type}",
@ -345,7 +384,6 @@ class SwarmRouter:
name=self.name,
description=self.description,
agents=self.agents,
aggregator_system_prompt=aggregator_system_prompt.get_prompt(),
aggregator_agent=self.agents[-1],
layers=self.max_loops,
output_type=self.output_type,
@ -421,6 +459,17 @@ class SwarmRouter:
f"Invalid swarm type: {self.swarm_type} try again with a valid swarm type such as 'SequentialWorkflow' or 'ConcurrentWorkflow' or 'auto' or 'AgentRearrange' or 'MixtureOfAgents' or 'SpreadSheetSwarm'"
)
def update_system_prompt_for_agent_in_swarm(self):
# Use list comprehension for faster iteration
[
setattr(
agent,
"system_prompt",
agent.system_prompt + MULTI_AGENT_COLLAB_PROMPT_TWO,
)
for agent in self.agents
]
def _log(
self,
level: str,
@ -464,6 +513,9 @@ class SwarmRouter:
"""
self.swarm = self._create_swarm(task, *args, **kwargs)
if self.multi_agent_collab_prompt is True:
self.update_system_prompt_for_agent_in_swarm()
try:
logger.info(
f"Running task on {self.swarm_type} swarm with task: {task}"

File diff suppressed because it is too large Load Diff

@ -3,6 +3,7 @@ import json
from typing import List, Literal, Dict, Any, Union
from fastmcp import Client
from swarms.utils.str_to_dict import str_to_dict
from loguru import logger
def parse_agent_output(
@ -186,13 +187,21 @@ def execute_mcp_tool(
ValueError: If tool execution fails.
"""
try:
return asyncio.run(
logger.info(f"Executing MCP tool with URL: {url}")
logger.debug(f"Tool parameters: {parameters}")
result = asyncio.run(
_execute_mcp_tool(
url=url,
parameters=parameters,
)
)
logger.info("MCP tool execution completed successfully")
logger.debug(f"Tool execution result: {result}")
return result
except Exception as e:
logger.error(f"Error in execute_mcp_tool: {str(e)}")
raise ValueError(f"Error in execute_mcp_tool: {str(e)}")

@ -0,0 +1,64 @@
import secrets
import string
import re
def generate_api_key(prefix: str = "sk-", length: int = 32) -> str:
"""
Generate a secure API key with a custom prefix.
Args:
prefix (str): The prefix to add to the API key (default: "sk-")
length (int): The length of the random part of the key (default: 32)
Returns:
str: A secure API key in the format: prefix + random_string
Example:
>>> generate_api_key("sk-")
'sk-abc123...'
"""
# Validate prefix
if not isinstance(prefix, str):
raise ValueError("Prefix must be a string")
# Validate length
if length < 8:
raise ValueError("Length must be at least 8 characters")
# Generate random string using alphanumeric characters
alphabet = string.ascii_letters + string.digits
random_part = "".join(
secrets.choice(alphabet) for _ in range(length)
)
# Combine prefix and random part
api_key = f"{prefix}{random_part}"
return api_key
def validate_api_key(api_key: str, prefix: str = "sk-") -> bool:
"""
Validate if an API key matches the expected format.
Args:
api_key (str): The API key to validate
prefix (str): The expected prefix (default: "sk-")
Returns:
bool: True if the API key is valid, False otherwise
"""
if not isinstance(api_key, str):
return False
# Check if key starts with prefix
if not api_key.startswith(prefix):
return False
# Check if the rest of the key contains only alphanumeric characters
random_part = api_key[len(prefix) :]
if not re.match(r"^[a-zA-Z0-9]+$", random_part):
return False
return True
Loading…
Cancel
Save