[DOCS CLEANUP]

pull/595/head
Your Name 3 months ago
parent edc293cb6f
commit 6be667bba4

@@ -157,6 +157,7 @@ nav:
- Why MultiAgent Collaboration is Necessary: "swarms/concept/why.md"
- Swarm Architectures: "swarms/concept/swarm_architectures.md"
- Choosing the right Swarm Architecture: "swarms/concept/how_to_choose_swarms.md"
- Building Custom Swarms: "swarms/structs/custom_swarm.md"
- Architectures Available:
- MajorityVoting: "swarms/structs/majorityvoting.md"
- AgentRearrange: "swarms/structs/agent_rearrange.md"
@@ -170,8 +171,8 @@ nav:
- Workflows:
- ConcurrentWorkflow: "swarms/structs/concurrentworkflow.md"
- SequentialWorkflow: "swarms/structs/sequential_workflow.md"
# - Structs:
# - Conversation: "swarms/structs/conversation.md"
- Structs:
- Conversation: "swarms/structs/conversation.md"
# - Task: "swarms/structs/task.md"
- Full API Reference: "swarms/framework/reference.md"
- Contributing:
@@ -193,12 +194,12 @@ nav:
- BaseMultiModalModel: "swarms/models/base_multimodal_model.md"
- Multi Modal Models Available: "swarms/models/multimodal_models.md"
- GPT4VisionAPI: "swarms/models/gpt4v.md"
- Swarms Cloud API:
- Overview: "swarms_cloud/main.md"
- Available Models: "swarms_cloud/available_models.md"
- Agent API: "swarms_cloud/agent_api.md"
- Migrate from OpenAI to Swarms in 3 lines of code: "swarms_cloud/migrate_openai.md"
- Getting Started with SOTA Vision Language Models VLM: "swarms_cloud/getting_started.md"
# - Swarms Cloud API:
# - Overview: "swarms_cloud/main.md"
# - Available Models: "swarms_cloud/available_models.md"
# - Agent API: "swarms_cloud/agent_api.md"
# - Migrate from OpenAI to Swarms in 3 lines of code: "swarms_cloud/migrate_openai.md"
# - Getting Started with SOTA Vision Language Models VLM: "swarms_cloud/getting_started.md"
- Swarms Memory:
- Overview: "swarms_memory/index.md"
- Memory Systems:

@@ -0,0 +1,249 @@
### Building Custom Swarms with Multiple Agents: A Comprehensive Guide for Swarm Engineers
#### Introduction
As artificial intelligence and machine learning continue to grow in complexity and applicability, building systems that can harness multiple agents to solve complex tasks becomes increasingly important. Swarm engineering enables AI agents to collaborate and solve problems autonomously in diverse fields such as finance, marketing, operations, and even creative industries. In this guide, we'll focus on how to build a custom swarm that integrates multiple agents into a cohesive system capable of solving tasks collaboratively.

The swarm we'll design will leverage Python, use type hints for better code structure, and feature logging with the powerful **loguru** logging library. We'll break down how to define and initialize swarms, make them scalable, and create methods like `run(task: str)` to trigger their execution.
By the end of this article, you will have a complete understanding of:
- What swarms are and how they can be built.
- How to intake multiple agents using a flexible class.
- How to run tasks across agents and capture their outputs.
- Best practices for error handling, logging, and optimization.
---
### 1. Understanding the Concept of a Swarm
A **swarm** refers to a collection of agents that collaborate to solve a problem. Each agent in the swarm performs part of the task, either independently or by communicating with other agents. Swarms are ideal for:
- **Scalability**: You can add or remove agents dynamically based on the task's complexity.
- **Flexibility**: Each agent can be designed to specialize in different parts of the problem, offering modularity.
- **Autonomy**: Agents in a swarm can operate autonomously, reducing the need for constant supervision.
We'll be using Python as the primary programming language and will structure the swarm class using clean, reusable code principles.
---
### 2. Designing the Swarm Class: Intake Multiple Agents
We'll begin by creating a base class for our swarm. This class will intake multiple agents and define a `run` method, which is the core method for executing tasks across the swarm. Each agent is defined by its specific behavior or "intelligence" to complete part of the task.
#### 2.1 Importing the Required Libraries and Dependencies
We'll rely on the **loguru** logging library and standard Python typing.
```python
from typing import List

from loguru import logger

from swarms.structs.base_swarm import BaseSwarm


class SwarmExecutionError(Exception):
    """Custom exception for handling swarm execution errors."""

    pass
```
#### 2.2 Defining the Swarm Class
The class `CustomSwarm` will take in a list of agents. The agents will be instances of `BaseSwarm` (or callable functions). The `run(task: str)` method will delegate tasks to each agent in the swarm and handle any errors or retries.
```python
class CustomSwarm:
    def __init__(self, agents: List[BaseSwarm]):
        """
        Initializes the CustomSwarm with a list of agents.

        Args:
            agents (List[BaseSwarm]): A list of agent objects that inherit from BaseSwarm.
        """
        self.agents = agents
        self.validate_agents()

    def validate_agents(self):
        """Validates that each agent has a 'run' method."""
        for agent in self.agents:
            if not hasattr(agent, 'run'):
                raise AttributeError(f"Agent {agent} does not have a 'run' method.")
            logger.info(f"Agent {agent} validated successfully.")

    def run(self, task: str):
        """
        Runs the task across all agents in the swarm.

        Args:
            task (str): The task to pass to each agent.
        """
        logger.info(f"Running task '{task}' across all agents in the swarm.")
        for agent in self.agents:
            try:
                agent.run(task)
                logger.info(f"Agent {agent} successfully completed the task.")
            except Exception as e:
                logger.error(f"Agent {agent} failed to run task: {e}")
                raise SwarmExecutionError(f"Execution failed for {agent}. Task: {task}") from e
```
### 3. Adding Logging and Error Handling with `loguru`
Logging is crucial for production-grade systems, especially when managing complex tasks that involve multiple agents. **Loguru** is a simple and efficient logging library that allows us to log everything from information messages to errors.
```python
from loguru import logger

class CustomSwarm:
    def __init__(self, agents: List[BaseSwarm]):
        self.agents = agents
        logger.info("CustomSwarm initialized with agents.")
        self.validate_agents()

    def run(self, task: str):
        logger.info(f"Task received: {task}")
        for agent in self.agents:
            try:
                agent.run(task)
                logger.success(f"Agent {agent} completed task successfully.")
            except Exception as e:
                logger.error(f"Error while running task '{task}' for {agent}: {e}")
                raise SwarmExecutionError(f"Execution failed for {agent}") from e
```
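By default, loguru writes to stderr. For production runs it is often useful to also persist logs to a rotating file; here is a minimal sketch (the file name, rotation size, and retention period below are illustrative choices, not requirements of the swarm class):

```python
from loguru import logger

# Persist logs to a rotating file in addition to the default stderr sink.
# "swarm.log", the 10 MB rotation, and 7-day retention are example values.
logger.add("swarm.log", rotation="10 MB", retention="7 days", level="INFO")
```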
### 4. Running Tasks Across Multiple Agents
The `run(task: str)` method will handle distributing the task to each agent in the swarm. Each agent's `run` method is expected to take a task as input and perform its specific logic. We can add further customization by allowing each agent to return output, which can be collected for later analysis, as in the sketch below.
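For instance, a variant that gathers outputs might look like this (a sketch; the `CollectingSwarm` name is ours, and it assumes each agent's `run` returns a value, as the examples below do):

```python
class CollectingSwarm(CustomSwarm):
    def run(self, task: str) -> dict:
        """Runs the task across all agents and collects their outputs."""
        outputs = {}
        for agent in self.agents:
            # Key results by agent class name; assumes run() returns a value.
            outputs[type(agent).__name__] = agent.run(task)
        return outputs
```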
#### 4.1 Example of Integrating Agents
Let's take a look at how we can define agents using the `BaseSwarm` class and integrate them into the swarm.
```python
class FinancialAgent(BaseSwarm):
    def run(self, task: str):
        logger.info(f"FinancialAgent processing task: {task}")
        # Custom logic for financial analysis
        return f"FinancialAgent response to task: {task}"


class MarketingAgent(BaseSwarm):
    def run(self, task: str):
        logger.info(f"MarketingAgent processing task: {task}")
        # Custom logic for marketing analysis
        return f"MarketingAgent response to task: {task}"
```
Now, we initialize the swarm with these agents:
```python
if __name__ == "__main__":
    agents = [FinancialAgent(), MarketingAgent()]
    swarm = CustomSwarm(agents)
    swarm.run("Analyze Q3 financial report and marketing impact.")
```
### 5. Enhancing the Swarm with Concurrent Execution
When dealing with large or time-consuming tasks, running agents concurrently (in parallel) can significantly improve performance. We can achieve this with Python's standard **concurrent.futures** or **threading** modules.
#### 5.1 Running Swarms Concurrently
```python
from concurrent.futures import ThreadPoolExecutor, as_completed

class CustomSwarm:
    def __init__(self, agents: List[BaseSwarm], max_workers: int = 4):
        self.agents = agents
        self.thread_pool = ThreadPoolExecutor(max_workers=max_workers)
        logger.info("CustomSwarm initialized with concurrent execution.")

    def run(self, task: str):
        futures = []
        for agent in self.agents:
            futures.append(self.thread_pool.submit(agent.run, task))
        for future in as_completed(futures):
            result = future.result()
            logger.info(f"Agent result: {result}")
```
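The class above keeps a long-lived pool, which the caller must eventually shut down. An alternative sketch (the `run_concurrently` helper is our own naming, not part of the swarms API) uses a context-managed pool so threads are always released:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List

def run_concurrently(agents: List[BaseSwarm], task: str, max_workers: int = 4) -> list:
    """Runs agents in a transient thread pool and returns collected results."""
    results = []
    # The `with` block guarantees the pool is shut down when we're done.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        future_to_agent = {pool.submit(agent.run, task): agent for agent in agents}
        for future in as_completed(future_to_agent):
            agent = future_to_agent[future]
            try:
                results.append(future.result())
            except Exception as e:
                logger.error(f"Agent {agent} failed: {e}")
    return results
```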
### 6. Advanced Error Handling and Retries
In a production system, agents might fail for a wide range of reasons (network errors, API rate limits, etc.). To ensure resilience, we can add retry mechanisms and even fallback agents that attempt to recover the failed task.
```python
class CustomSwarm:
    def run_with_retries(self, task: str, retries: int = 3):
        """
        Runs the task across all agents with retry logic.

        Args:
            task (str): The task to run.
            retries (int): Number of retries allowed for failed agents.
        """
        for agent in self.agents:
            attempt = 0
            while attempt <= retries:
                try:
                    agent.run(task)
                    logger.success(f"Agent {agent} completed task.")
                    break
                except Exception as e:
                    logger.error(f"Agent {agent} failed on attempt {attempt + 1}. Error: {e}")
                    attempt += 1
                    if attempt > retries:
                        logger.error(f"Agent {agent} exhausted retries. Task failed.")
```
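When failures are transient (rate limits, brief network outages), spacing retries apart tends to work better than retrying immediately. Here is a minimal exponential-backoff helper (our own sketch, not part of the swarms API; the `run_with_backoff` name and delay values are illustrative):

```python
import time

def run_with_backoff(agent, task: str, retries: int = 3, base_delay: float = 1.0):
    """Retries a single agent with exponentially increasing delays."""
    for attempt in range(retries + 1):
        try:
            return agent.run(task)
        except Exception as e:
            if attempt == retries:
                logger.error(f"Agent {agent} exhausted retries: {e}")
                raise
            delay = base_delay * (2 ** attempt)  # 1s, 2s, 4s, ...
            logger.warning(f"Agent {agent} failed (attempt {attempt + 1}); retrying in {delay:.1f}s")
            time.sleep(delay)
```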
### 7. Adding Documentation with Docstrings
Clear and concise documentation is critical, especially for engineers maintaining and scaling the system. Using Python's docstrings, we can document each class and method, describing what they do and their expected inputs/outputs.
```python
class CustomSwarm:
    """
    A class to manage and execute tasks using a swarm of agents.

    Attributes:
        agents (List[BaseSwarm]): A list of agent instances.

    Methods:
        run(task: str): Runs a task across all agents in the swarm.
        validate_agents(): Validates that each agent has a run method.
        run_with_retries(task: str, retries: int): Runs the task with retry logic.
    """

    def __init__(self, agents: List[BaseSwarm]):
        """
        Initializes the CustomSwarm with a list of agents.

        Args:
            agents (List[BaseSwarm]): A list of agent objects that inherit from BaseSwarm.
        """
        self.agents = agents

    def run(self, task: str):
        """
        Runs the task across all agents in the swarm.

        Args:
            task (str): The task to pass to each agent.
        """
        pass

    def validate_agents(self):
        """Validates that each agent has a 'run' method."""
        pass
```
### Conclusion
Building custom swarms that intake multiple agents can drastically improve the scalability, efficiency, and flexibility of AI-driven systems. By designing a robust swarm class that manages agents, distributes tasks, and ensures error resilience, you can handle complex, multi-agent workloads efficiently.
In this guide, we've covered:
- Designing a basic swarm class.
- Running tasks across multiple agents.
- Leveraging logging, error handling, retries, and concurrency.
- Documenting your class for future-proofing.
This approach sets the foundation for building more advanced and domain-specific swarms in areas like finance, marketing, operations, and beyond. Swarm engineers can now explore more complex, multi-agent systems and push the boundaries of AI collaboration.
Stay tuned for future updates on more advanced swarm functionalities!

@@ -7,6 +7,7 @@ llm = Anthropic(
temperature=0.1,
)
# Tools
def terminal(
code: str,

@@ -451,7 +451,6 @@
"import os\n",
"from typing import List\n",
"\n",
"from swarms import tool\n",
"\n",
"os.environ['TAVILY_API_KEY'] = os.getenv('TAVILY_API_KEY')\n",
"os.environ[\"KAY_API_KEY\"] = os.getenv('KAY_API_KEY')\n",
@@ -1364,7 +1363,7 @@
"metadata": {},
"outputs": [],
"source": [
"from swarms import Agent, OpenAIChat, tool\n",
"from swarms import Agent, OpenAIChat\n",
"\n",
"agent = Agent(\n",
" agent_name=\"Writer Agent\",\n",

@@ -0,0 +1,393 @@
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import PriorityQueue
from typing import Callable, List, Optional, Tuple, Union

from loguru import logger
from pydantic import BaseModel, Field

from swarms.structs.base_swarm import BaseSwarm
class SwarmRunData(BaseModel):
    """
    Pydantic model to capture metadata about each swarm's execution.
    """

    swarm_name: str
    task: str
    priority: int
    start_time: Optional[float] = None
    end_time: Optional[float] = None
    duration: Optional[float] = None
    status: str = "Pending"
    retries: int = 0
    result: Optional[str] = None
    exception: Optional[str] = None
class FederatedSwarmModel(BaseModel):
    """
    Pydantic base model to capture and log data for the FederatedSwarm system.
    """

    task: str
    swarms_data: List[SwarmRunData] = Field(default_factory=list)

    def add_swarm(self, swarm_name: str, task: str, priority: int):
        swarm_data = SwarmRunData(
            swarm_name=swarm_name, task=task, priority=priority
        )
        self.swarms_data.append(swarm_data)

    def update_swarm_status(
        self,
        swarm_name: str,
        status: str,
        start_time: Optional[float] = None,
        end_time: Optional[float] = None,
        retries: int = 0,
        result: Optional[str] = None,
        exception: Optional[str] = None,
    ):
        for swarm in self.swarms_data:
            # Match on the model's swarm_name field.
            if swarm.swarm_name == swarm_name:
                swarm.status = status
                if start_time:
                    swarm.start_time = start_time
                if end_time:
                    swarm.end_time = end_time
                    if swarm.start_time is not None:
                        swarm.duration = end_time - swarm.start_time
                swarm.retries = retries
                swarm.result = result
                swarm.exception = exception
                break
class FederatedSwarm:
    def __init__(
        self,
        swarms: List[Tuple[Union[BaseSwarm, Callable], int]],
        max_workers: int = 4,
    ):
        """
        Initializes the FederatedSwarm with a list of (swarm, priority) pairs and
        sets up a priority queue and thread pool for concurrency.

        Args:
            swarms (List[Tuple[Union[BaseSwarm, Callable], int]]): A list of
                (swarm or callable, priority) pairs.
            max_workers (int): The maximum number of concurrent workers (threads) to run swarms in parallel.
        """
        self.swarms = PriorityQueue()
        self.max_workers = max_workers
        self.thread_pool = ThreadPoolExecutor(
            max_workers=self.max_workers
        )
        self.task_queue = []
        self.future_to_swarm = {}
        self.results = {}
        self.validate_swarms(swarms)
    def init_metadata(self, task: str):
        """
        Initializes the Pydantic base model to capture metadata about the current task and swarms.
        """
        self.metadata = FederatedSwarmModel(task=task)
        for priority, swarm in list(self.swarms.queue):
            swarm_name = (
                swarm.__class__.__name__
                if hasattr(swarm, "__class__")
                else str(swarm)
            )
            self.metadata.add_swarm(
                swarm_name=swarm_name, task=task, priority=priority
            )
        logger.info(f"Metadata initialized for task '{task}'.")
    def validate_swarms(
        self, swarms: List[Tuple[Union[BaseSwarm, Callable], int]]
    ):
        """
        Validates and adds swarms to the priority queue, ensuring each swarm
        either has a `run(task)` method or is callable.

        Args:
            swarms (List[Tuple[Union[BaseSwarm, Callable], int]]): List of
                (swarm, priority) pairs.
        """
        for swarm, priority in swarms:
            if hasattr(swarm, "run"):
                logger.info(f"{swarm} has a 'run' method.")
            elif callable(swarm):
                logger.info(f"{swarm} is callable.")
            else:
                raise AttributeError(
                    f"{swarm} is neither callable nor has a 'run(task)' method."
                )
            self.swarms.put((priority, swarm))
            logger.info(
                f"Swarm {swarm} added with priority {priority}."
            )
    def run_parallel(
        self,
        task: str,
        timeout: Optional[float] = None,
        retries: int = 0,
    ):
        """
        Runs all swarms in parallel with prioritization and optional timeout.

        Args:
            task (str): The task to be passed to the `run` method of each swarm.
            timeout (Optional[float]): Maximum time allowed for each swarm to run.
            retries (int): Number of retries allowed for failed swarms.
        """
        logger.info(
            f"Running task '{task}' in parallel with timeout: {timeout}, retries: {retries}"
        )
        self.init_metadata(task)
        while not self.swarms.empty():
            priority, swarm = self.swarms.get()
            swarm_name = (
                swarm.__class__.__name__
                if hasattr(swarm, "__class__")
                else str(swarm)
            )
            future = self.thread_pool.submit(
                self._run_with_retry,
                swarm,
                task,
                retries,
                timeout,
                swarm_name,
            )
            self.future_to_swarm[future] = swarm
        for future in as_completed(self.future_to_swarm):
            swarm = self.future_to_swarm[future]
            swarm_name = (
                swarm.__class__.__name__
                if hasattr(swarm, "__class__")
                else str(swarm)
            )
            try:
                result = future.result()
                self.metadata.update_swarm_status(
                    swarm_name=swarm_name,
                    status="Completed",
                    result=result,
                )
                logger.info(
                    f"Swarm {swarm_name} completed successfully."
                )
            except Exception as e:
                self.metadata.update_swarm_status(
                    swarm_name=swarm_name,
                    status="Failed",
                    exception=str(e),
                )
                logger.error(f"Swarm {swarm_name} failed: {e}")
                self.results[swarm] = "Failed"
    def run_sequentially(
        self,
        task: str,
        retries: int = 0,
        timeout: Optional[float] = None,
    ):
        """
        Runs all swarms sequentially in order of priority.

        Args:
            task (str): The task to pass to the `run` method of each swarm.
            retries (int): Number of retries for failed swarms.
            timeout (Optional[float]): Optional time limit for each swarm.
        """
        logger.info(f"Running task '{task}' sequentially.")
        self.init_metadata(task)
        while not self.swarms.empty():
            priority, swarm = self.swarms.get()
            swarm_name = (
                swarm.__class__.__name__
                if hasattr(swarm, "__class__")
                else str(swarm)
            )
            try:
                logger.info(
                    f"Running swarm {swarm} with priority {priority}."
                )
                self._run_with_retry(
                    swarm, task, retries, timeout, swarm_name
                )
                logger.info(f"Swarm {swarm} completed successfully.")
            except Exception as e:
                logger.error(f"Swarm {swarm} failed with error: {e}")
    def _run_with_retry(
        self,
        swarm: Union[BaseSwarm, Callable],
        task: str,
        retries: int,
        timeout: Optional[float],
        swarm_name: str,
    ):
        """
        Helper function to run a swarm with a retry mechanism and an optional
        (post-hoc) timeout check.

        Args:
            swarm (Union[BaseSwarm, Callable]): The swarm to run.
            task (str): The task to pass to the swarm.
            retries (int): The number of retries allowed for the swarm in case of failure.
            timeout (Optional[float]): Maximum time allowed for the swarm to run.
            swarm_name (str): Name of the swarm (used for metadata).
        """
        attempts = 0
        start_time = time.time()
        while attempts <= retries:
            try:
                logger.info(
                    f"Running swarm {swarm}. Attempt: {attempts + 1}"
                )
                self.metadata.update_swarm_status(
                    swarm_name=swarm_name,
                    status="Running",
                    start_time=start_time,
                )
                if hasattr(swarm, "run"):
                    if timeout:
                        # Note: this is a post-hoc check; the swarm is not
                        # interrupted, we only flag runs that exceeded the limit.
                        attempt_start = time.time()
                        swarm.run(task)
                        duration = time.time() - attempt_start
                        if duration > timeout:
                            raise TimeoutError(
                                f"Swarm {swarm} timed out after {duration:.2f}s."
                            )
                    else:
                        swarm.run(task)
                else:
                    swarm(task)
                end_time = time.time()
                self.metadata.update_swarm_status(
                    swarm_name=swarm_name,
                    status="Completed",
                    end_time=end_time,
                    retries=attempts,
                )
                return "Success"
            except Exception as e:
                logger.error(f"Swarm {swarm} failed: {e}")
                attempts += 1
                if attempts > retries:
                    end_time = time.time()
                    self.metadata.update_swarm_status(
                        swarm_name=swarm_name,
                        status="Failed",
                        end_time=end_time,
                        retries=attempts,
                        exception=str(e),
                    )
                    logger.error(f"Swarm {swarm} exhausted retries.")
                    raise
    def add_swarm(
        self, swarm: Union[BaseSwarm, Callable], priority: int
    ):
        """
        Adds a new swarm to the FederatedSwarm at runtime.

        Args:
            swarm (Union[BaseSwarm, Callable]): The swarm to add.
            priority (int): The priority level for the swarm.
        """
        self.swarms.put((priority, swarm))
        logger.info(
            f"Swarm {swarm} added dynamically with priority {priority}."
        )

    def queue_task(self, task: str):
        """
        Adds a task to the internal task queue for batch processing.

        Args:
            task (str): The task to queue.
        """
        self.task_queue.append(task)
        logger.info(f"Task '{task}' added to the queue.")

    def process_task_queue(self):
        """
        Processes all tasks in the task queue.
        """
        for task in self.task_queue:
            logger.info(f"Processing task: {task}")
            self.run_parallel(task)
        self.task_queue = []

    def log_swarm_results(self):
        """
        Logs the results of all swarms after execution.
        """
        logger.info("Logging swarm results...")
        for swarm, result in self.results.items():
            logger.info(f"Swarm {swarm}: {result}")

    def get_swarm_status(self) -> dict:
        """
        Retrieves the status of each swarm (completed, running, or pending).

        Returns:
            dict: Dictionary containing swarm statuses.
        """
        status = {}
        for future, swarm in self.future_to_swarm.items():
            if future.done():
                status[swarm] = "Completed"
            elif future.running():
                status[swarm] = "Running"
            else:
                # Not done and not running means the future hasn't started yet.
                status[swarm] = "Pending"
        return status

    def cancel_running_swarms(self):
        """
        Cancels all currently running swarms by shutting down the thread pool.
        """
        logger.warning("Cancelling all running swarms...")
        self.thread_pool.shutdown(wait=False)
        logger.info("All running swarms cancelled.")
# Example Usage:
#
# class ExampleSwarm(BaseSwarm):
#     def run(self, task: str):
#         logger.info(f"ExampleSwarm is processing task: {task}")
#
# def example_callable(task: str):
#     logger.info(f"Callable is processing task: {task}")
#
# if __name__ == "__main__":
#     swarms = [(ExampleSwarm(), 1), (example_callable, 2)]
#     federated_swarm = FederatedSwarm(swarms)
#
#     # Run in parallel
#     federated_swarm.run_parallel(
#         "Process data", timeout=10, retries=3
#     )
#
#     # Run sequentially
#     federated_swarm.run_sequentially("Process data sequentially")
#
#     # Log results
#     federated_swarm.log_swarm_results()
#
#     # Get status of swarms
#     status = federated_swarm.get_swarm_status()
#     logger.info(f"Swarm statuses: {status}")
#
#     # Cancel running swarms (if needed)
#     # federated_swarm.cancel_running_swarms()