diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 0317bebf..51c3d1d4 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -189,17 +189,15 @@ nav: - Reasoning Agent Router: "swarms/agents/reasoning_agent_router.md" - Multi-Agent Architectures: - - Introduction to Multi-Agent Collaboration: "swarms/concept/why.md" - - - Concepts: - - Introduction to Multi Agent Architectures: "swarms/concept/swarm_architectures.md" - - How to Choose the Right Multi Agent Architecture: "swarms/concept/how_to_choose_swarms.md" - - How to Build Custom Swarms: "swarms/structs/custom_swarm.md" - - How to Create New Multi Agent Architectures: "swarms/structs/create_new_swarm.md" - - Introduction to Hiearchical Multi Agent Architectures: "swarms/structs/multi_swarm_orchestration.md" - - - Multi-Agent Architectures Documentation: + - Overview: "swarms/concept/swarm_architectures.md" + - Benefits: "swarms/concept/why.md" + - Choosing Multi Agent Architecture: "swarms/concept/how_to_choose_swarms.md" + + - Documentation: - Overview: "swarms/structs/overview.md" + - Custom Multi Agent Architectures: "swarms/structs/custom_swarm.md" + + - MajorityVoting: "swarms/structs/majorityvoting.md" - RoundRobin: "swarms/structs/round_robin_swarm.md" - Mixture of Agents: "swarms/structs/moa.md" @@ -213,6 +211,7 @@ nav: - Hiearchical Architectures: + - Overview: "swarms/structs/multi_swarm_orchestration.md" - HierarchicalSwarm: "swarms/structs/hierarchical_swarm.md" - Auto Agent Builder: "swarms/structs/auto_agent_builder.md" - Hybrid Hierarchical-Cluster Swarm: "swarms/structs/hhcs.md" @@ -222,9 +221,6 @@ nav: - Multi-Agent Multi-Modal Structures: - ImageAgentBatchProcessor: "swarms/structs/image_batch_agent.md" - - Storage: - - AgentRegistry: "swarms/structs/agent_registry.md" - - Routers: - SwarmRouter: "swarms/structs/swarm_router.md" - MultiAgentRouter: "swarms/structs/multi_agent_router.md" @@ -243,6 +239,9 @@ nav: - SequentialWorkflow: "swarms/structs/sequential_workflow.md" - GraphWorkflow: "swarms/structs/graph_workflow.md" + - Storage: + - AgentRegistry: "swarms/structs/agent_registry.md" + - Communication Structure: "swarms/structs/conversation.md" - Tools: @@ -256,17 +255,17 @@ nav: - Social Media: - Twitter: "swarms_tools/twitter.md" - - Memory: - - Overview: "swarms_memory/index.md" - - Memory Systems: - - ChromaDB: "swarms_memory/chromadb.md" - - Pinecone: "swarms_memory/pinecone.md" - - Faiss: "swarms_memory/faiss.md" + # - Memory: + # - Overview: "swarms_memory/index.md" + # - Memory Systems: + # - ChromaDB: "swarms_memory/chromadb.md" + # - Pinecone: "swarms_memory/pinecone.md" + # - Faiss: "swarms_memory/faiss.md" - Deployment Solutions: - - Deploy your agents on Google Cloud Run: "swarms_cloud/cloud_run.md" - - Deploy your agents on Phala: "swarms_cloud/phala_deploy.md" - # - Deploy your agents on FastAPI: + - Deploy on Google Cloud Run: "swarms_cloud/cloud_run.md" + - Deploy on Phala: "swarms_cloud/phala_deploy.md" + # - Deploy on FastAPI: "swarms_cloud/fastapi_deploy.md" - More About Us: - Swarms Ecosystem: "swarms/ecosystem.md" diff --git a/docs/swarms/structs/custom_swarm.md b/docs/swarms/structs/custom_swarm.md index 531e2e2d..467e713e 100644 --- a/docs/swarms/structs/custom_swarm.md +++ b/docs/swarms/structs/custom_swarm.md @@ -1,259 +1,801 @@ -### Title: Building Custom Swarms with Multiple Agents: A Comprehensive Guide for Swarm Engineers +# Building Custom Swarms: A Comprehensive Guide for Swarm Engineers -#### Introductio -As artificial intelligence and machine learning continue to grow in complexity and applicability, building systems that can harness multiple agents to solve complex tasks becomes more critical. Swarm engineering enables AI agents to collaborate and solve problems autonomously in diverse fields such as finance, marketing, operations, and even creative industries. In this guide, we'll focus on how to build a custom swarm system that integrates multiple agents into a cohesive system capable of solving tasks collaboratively. +## Introduction -The swarm we'll design will leverage Python, use types for better code structure, and feature logging with the powerful **loguru** logging library. We'll break down how to define and initialize swarms, make them scalable, and create methods like `run(task: str)` to trigger their execution. +As artificial intelligence and machine learning continue to grow in complexity and applicability, building systems that can harness multiple agents to solve complex tasks becomes more critical. Swarm engineering enables AI agents to collaborate and solve problems autonomously in diverse fields such as finance, marketing, operations, and even creative industries. -By the end of this article, you will have a complete understanding of: +This comprehensive guide covers how to build a custom swarm system that integrates multiple agents into a cohesive system capable of solving tasks collaboratively. We'll cover everything from basic swarm structure to advanced features like conversation management, logging, error handling, and scalability. -- What swarms are and how they can be built. +By the end of this guide, you will have a complete understanding of: -- How to intake multiple agents using a flexible class. +- What swarms are and how they can be built -- How to run tasks across agents and capture their outputs. +- How to create agents and integrate them into swarms + +- How to implement proper conversation management for message storage + +- Best practices for error handling, logging, and optimization + +- How to make swarms scalable and production-ready -- Best practices for error handling, logging, and optimization. --- -### 1. Understanding the Concept of a Swarm +## Overview of Swarm Architecture + +A **Swarm** refers to a collection of agents that collaborate to solve a problem. Each agent in the swarm performs part of the task, either independently or by communicating with other agents. Swarms are ideal for: -A **swarm** refers to a collection of agents that collaborate to solve a problem. Each agent in the swarm performs part of the task, either independently or by communicating with other agents. Swarms are ideal for: +- **Scalability**: You can add or remove agents dynamically based on the task's complexity -- **Scalability**: You can add or remove agents dynamically based on the task's complexity. +- **Flexibility**: Each agent can be designed to specialize in different parts of the problem, offering modularity -- **Flexibility**: Each agent can be designed to specialize in different parts of the problem, offering modularity. +- **Autonomy**: Agents in a swarm can operate autonomously, reducing the need for constant supervision -- **Autonomy**: Agents in a swarm can operate autonomously, reducing the need for constant supervision. +- **Conversation Management**: All interactions are tracked and stored for analysis and continuity -We'll be using Python as the primary programming language and will structure the swarm class using clean, reusable code principles. --- -### 2. Designing the Swarm Class: Intake Multiple Agents +## Core Requirements for Swarm Classes + +Every Swarm class must adhere to these fundamental requirements: + +### Required Methods and Attributes + +- **`run(task: str, img: str, *args, **kwargs)` method**: The primary execution method for tasks + +- **`name`**: A descriptive name for the swarm + +- **`description`**: A clear description of the swarm's purpose + +- **`agents`**: A list of callables representing the agents + +- **`conversation`**: A conversation structure for message storage and history management + + +### Required Agent Structure -We'll begin by creating a base class for our swarm. This class will intake multiple agents and define a `run` method, which is the core method for executing tasks across the swarm. Each agent is defined by its specific behavior or "intelligence" to complete part of the task. +Each Agent within the swarm must contain: -#### 2.1 Importing the Required Libraries and Dependencies +- **`agent_name`**: Unique identifier for the agent -We'll rely on the **loguru** logging library, Pydantic for metadata handling, and standard Python typing. +- **`system_prompt`**: Instructions that guide the agent's behavior + +- **`run` method**: Method to execute tasks assigned to the agent + + +--- + +## Setting Up the Foundation + +### Required Dependencies ```python -from typing import List, Union +from typing import List, Union, Any, Optional, Callable from loguru import logger from swarms.structs.base_swarm import BaseSwarm +from swarms.structs.conversation import Conversation +from swarms.structs.agent import Agent +import concurrent.futures +import os +``` +### Custom Exception Handling + +```python class SwarmExecutionError(Exception): """Custom exception for handling swarm execution errors.""" pass + +class AgentValidationError(Exception): + """Custom exception for agent validation errors.""" + pass ``` -#### 2.2 Defining the Swarm Class +--- + +## Building the Custom Swarm Class -The class `CustomSwarm` will take in a list of agents. The agents will be instances of `BaseSwarm` (or callable functions). The `run(task: str)` method will delegate tasks to each agent in the swarm and handle any errors or retries. +### Basic Swarm Structure ```python -class CustomSwarm: - def __init__(self, agents: List[BaseSwarm]): +class CustomSwarm(BaseSwarm): + """ + A custom swarm class to manage and execute tasks with multiple agents. + + This swarm integrates conversation management for tracking all agent interactions, + provides error handling, and supports both sequential and concurrent execution. + + Attributes: + name (str): The name of the swarm. + description (str): A brief description of the swarm's purpose. + agents (List[Callable]): A list of callables representing the agents. + conversation (Conversation): Conversation management for message storage. + max_workers (int): Maximum number of concurrent workers for parallel execution. + autosave_conversation (bool): Whether to automatically save conversation history. + """ + + def __init__( + self, + name: str, + description: str, + agents: List[Callable], + max_workers: int = 4, + autosave_conversation: bool = True, + conversation_config: Optional[dict] = None, + ): """ - Initializes the CustomSwarm with a list of agents. + Initialize the CustomSwarm with its name, description, and agents. Args: - agents (List[BaseSwarm]): A list of agent objects that inherit from BaseSwarm. + name (str): The name of the swarm. + description (str): A description of the swarm. + agents (List[Callable]): A list of callables that provide the agents for the swarm. + max_workers (int): Maximum number of concurrent workers. + autosave_conversation (bool): Whether to automatically save conversations. + conversation_config (dict): Configuration for conversation management. """ + super().__init__(name=name, description=description, agents=agents) + self.name = name + self.description = description self.agents = agents + self.max_workers = max_workers + self.autosave_conversation = autosave_conversation + + # Initialize conversation management + # See: https://docs.swarms.world/swarms/structs/conversation/ + conversation_config = conversation_config or {} + self.conversation = Conversation( + id=f"swarm_{name}_{int(time.time())}", + name=f"{name}_conversation", + autosave=autosave_conversation, + save_enabled=True, + time_enabled=True, + **conversation_config + ) + + # Validate agents and log initialization self.validate_agents() + logger.info(f"πŸš€ CustomSwarm '{self.name}' initialized with {len(self.agents)} agents") + + # Add swarm initialization to conversation history + self.conversation.add( + role="System", + content=f"Swarm '{self.name}' initialized with {len(self.agents)} agents: {[getattr(agent, 'agent_name', 'Unknown') for agent in self.agents]}" + ) def validate_agents(self): - """Validates that each agent has a 'run' method.""" - for agent in self.agents: + """ + Validates that each agent has the required methods and attributes. + + Raises: + AgentValidationError: If any agent fails validation. + """ + for i, agent in enumerate(self.agents): + # Check for required run method if not hasattr(agent, 'run'): - raise AttributeError(f"Agent {agent} does not have a 'run' method.") - logger.info(f"Agent {agent} validated successfully.") - - def run(self, task: str): + raise AgentValidationError(f"Agent at index {i} does not have a 'run' method.") + + # Check for agent_name attribute + if not hasattr(agent, 'agent_name'): + logger.warning(f"Agent at index {i} does not have 'agent_name' attribute. Using 'Agent_{i}'") + agent.agent_name = f"Agent_{i}" + + logger.info(f"βœ… Agent '{agent.agent_name}' validated successfully.") + + def run(self, task: str, img: str = None, *args: Any, **kwargs: Any) -> Any: """ - Runs the task across all agents in the swarm. + Execute a task using the swarm and its agents with conversation tracking. + + Args: + task (str): The task description. + img (str): The image input (optional). + *args: Additional positional arguments for customization. + **kwargs: Additional keyword arguments for fine-tuning behavior. + Returns: + Any: The result of the task execution, aggregated from all agents. + """ + logger.info(f"🎯 Running task '{task}' across {len(self.agents)} agents in swarm '{self.name}'") + + # Add task to conversation history + self.conversation.add( + role="User", + content=f"Task: {task}" + (f" | Image: {img}" if img else ""), + category="input" + ) + + try: + # Execute task across all agents + results = self._execute_agents(task, img, *args, **kwargs) + + # Add results to conversation + self.conversation.add( + role="Swarm", + content=f"Task completed successfully. Processed by {len(results)} agents.", + category="output" + ) + + logger.success(f"βœ… Task completed successfully by swarm '{self.name}'") + return results + + except Exception as e: + error_msg = f"❌ Task execution failed in swarm '{self.name}': {str(e)}" + logger.error(error_msg) + + # Add error to conversation + self.conversation.add( + role="System", + content=f"Error: {error_msg}", + category="error" + ) + + raise SwarmExecutionError(error_msg) + + def _execute_agents(self, task: str, img: str = None, *args, **kwargs) -> List[Any]: + """ + Execute the task across all agents with proper conversation tracking. + Args: - task (str): The task to pass to each agent. + task (str): The task to execute. + img (str): Optional image input. + + Returns: + List[Any]: Results from all agents. """ - logger.info(f"Running task '{task}' across all agents in the swarm.") + results = [] + for agent in self.agents: try: - agent.run(task) - logger.info(f"Agent {agent} successfully completed the task.") + # Execute agent task + result = agent.run(task, img, *args, **kwargs) + results.append(result) + + # Add agent response to conversation + self.conversation.add( + role=agent.agent_name, + content=result, + category="agent_output" + ) + + logger.info(f"βœ… Agent '{agent.agent_name}' completed task successfully") + except Exception as e: - logger.error(f"Agent {agent} failed to run task: {e}") - raise SwarmExecutionError(f"Execution failed for {agent}. Task: {task}") + error_msg = f"Agent '{agent.agent_name}' failed: {str(e)}" + logger.error(error_msg) + + # Add agent error to conversation + self.conversation.add( + role=agent.agent_name, + content=f"Error: {error_msg}", + category="agent_error" + ) + + # Continue with other agents but log the failure + results.append(f"FAILED: {error_msg}") + + return results ``` -### 3. Adding Logging and Error Handling with `loguru` - -Logging is crucial for production-grade systems, especially when managing complex tasks that involve multiple agents. **Loguru** is a simple and efficient logging library that allows us to log everything from information messages to errors. +### Enhanced Swarm with Concurrent Execution ```python -from loguru import logger + def run_concurrent(self, task: str, img: str = None, *args: Any, **kwargs: Any) -> List[Any]: + """ + Execute a task using concurrent execution for better performance. + + Args: + task (str): The task description. + img (str): The image input (optional). + *args: Additional positional arguments. + **kwargs: Additional keyword arguments. + + Returns: + List[Any]: Results from all agents executed concurrently. + """ + logger.info(f"πŸš€ Running task concurrently across {len(self.agents)} agents") + + # Add task to conversation + self.conversation.add( + role="User", + content=f"Concurrent Task: {task}" + (f" | Image: {img}" if img else ""), + category="input" + ) + + results = [] + + with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: + # Submit all agent tasks + future_to_agent = { + executor.submit(self._run_single_agent, agent, task, img, *args, **kwargs): agent + for agent in self.agents + } + + # Collect results as they complete + for future in concurrent.futures.as_completed(future_to_agent): + agent = future_to_agent[future] + try: + result = future.result() + results.append(result) + + # Add to conversation + self.conversation.add( + role=agent.agent_name, + content=result, + category="agent_output" + ) + + except Exception as e: + error_msg = f"Concurrent execution failed for agent '{agent.agent_name}': {str(e)}" + logger.error(error_msg) + results.append(f"FAILED: {error_msg}") + + # Add error to conversation + self.conversation.add( + role=agent.agent_name, + content=f"Error: {error_msg}", + category="agent_error" + ) + + # Add completion summary + self.conversation.add( + role="Swarm", + content=f"Concurrent task completed. {len(results)} agents processed.", + category="output" + ) + + return results -class CustomSwarm: - def __init__(self, agents: List[BaseSwarm]): - self.agents = agents - logger.info("CustomSwarm initialized with agents.") - self.validate_agents() + def _run_single_agent(self, agent: Callable, task: str, img: str = None, *args, **kwargs) -> Any: + """ + Execute a single agent with error handling. + + Args: + agent: The agent to execute. + task (str): The task to execute. + img (str): Optional image input. + + Returns: + Any: The agent's result. + """ + try: + return agent.run(task, img, *args, **kwargs) + except Exception as e: + logger.error(f"Agent '{getattr(agent, 'agent_name', 'Unknown')}' execution failed: {str(e)}") + raise +``` + +### Advanced Features - def run(self, task: str): - logger.info(f"Task received: {task}") +```python + def run_with_retries(self, task: str, img: str = None, retries: int = 3, *args, **kwargs) -> List[Any]: + """ + Execute a task with retry logic for failed agents. + + Args: + task (str): The task to execute. + img (str): Optional image input. + retries (int): Number of retries for failed agents. + + Returns: + List[Any]: Results from all agents with retry attempts. + """ + logger.info(f"πŸ”„ Running task with {retries} retries per agent") + + # Add task to conversation + self.conversation.add( + role="User", + content=f"Task with retries ({retries}): {task}", + category="input" + ) + + results = [] + for agent in self.agents: - try: - agent.run(task) - logger.success(f"Agent {agent} completed task successfully.") - except Exception as e: - logger.error(f"Error while running task '{task}' for {agent}: {e}") - raise SwarmExecutionError(f"Execution failed for {agent}") -``` + attempt = 0 + success = False + + while attempt <= retries and not success: + try: + result = agent.run(task, img, *args, **kwargs) + results.append(result) + success = True + + # Add successful result to conversation + self.conversation.add( + role=agent.agent_name, + content=result, + category="agent_output" + ) + + if attempt > 0: + logger.success(f"βœ… Agent '{agent.agent_name}' succeeded on attempt {attempt + 1}") + + except Exception as e: + attempt += 1 + error_msg = f"Agent '{agent.agent_name}' failed on attempt {attempt}: {str(e)}" + logger.warning(error_msg) + + # Add retry attempt to conversation + self.conversation.add( + role=agent.agent_name, + content=f"Retry attempt {attempt}: {error_msg}", + category="agent_retry" + ) + + if attempt > retries: + final_error = f"Agent '{agent.agent_name}' exhausted all {retries} retries" + logger.error(final_error) + results.append(f"FAILED: {final_error}") + + # Add final failure to conversation + self.conversation.add( + role=agent.agent_name, + content=final_error, + category="agent_error" + ) + + return results -### 4. Running Tasks Across Multiple Agents + def get_conversation_summary(self) -> dict: + """ + Get a summary of the conversation history and agent performance. + + Returns: + dict: Summary of conversation statistics and agent performance. + """ + # Get conversation statistics + message_counts = self.conversation.count_messages_by_role() + + # Count categories + category_counts = {} + for message in self.conversation.conversation_history: + category = message.get("category", "uncategorized") + category_counts[category] = category_counts.get(category, 0) + 1 + + # Get token counts if available + token_summary = self.conversation.export_and_count_categories() + + return { + "swarm_name": self.name, + "total_messages": len(self.conversation.conversation_history), + "messages_by_role": message_counts, + "messages_by_category": category_counts, + "token_summary": token_summary, + "conversation_id": self.conversation.id, + } + + def export_conversation(self, filepath: str = None) -> str: + """ + Export the conversation history to a file. + + Args: + filepath (str): Optional custom filepath for export. + + Returns: + str: The filepath where the conversation was saved. + """ + if filepath is None: + filepath = f"conversations/{self.name}_{self.conversation.id}.json" + + self.conversation.export_conversation(filepath) + logger.info(f"πŸ“„ Conversation exported to: {filepath}") + return filepath -The `run(task: str)` method will handle distributing the task to each agent in the swarm. Each agent’s `run` method is expected to take a task as input and perform its specific logic. We can add further customization by allowing each agent to return output, which can be collected for later analysis. + def display_conversation(self, detailed: bool = True): + """ + Display the conversation history in a formatted way. + + Args: + detailed (bool): Whether to show detailed information. + """ + logger.info(f"πŸ’¬ Displaying conversation for swarm: {self.name}") + self.conversation.display_conversation(detailed=detailed) +``` + +--- -#### 4.1 Example of Integrating Agents +## Creating Agents for Your Swarm -Let's take a look at how we can define agents using the `BaseSwarm` class and integrate them into the swarm. +### Basic Agent Structure ```python -class FinancialAgent(BaseSwarm): - def run(self, task: str): - logger.info(f"FinancialAgent processing task: {task}") - # Custom logic for financial analysis - return f"FinancialAgent response to task: {task}" - -class MarketingAgent(BaseSwarm): - def run(self, task: str): - logger.info(f"MarketingAgent processing task: {task}") - # Custom logic for marketing analysis - return f"MarketingAgent response to task: {task}" +class CustomAgent: + """ + A custom agent class that integrates with the swarm conversation system. + + Attributes: + agent_name (str): The name of the agent. + system_prompt (str): The system prompt guiding the agent's behavior. + conversation (Optional[Conversation]): Shared conversation for context. + """ + + def __init__( + self, + agent_name: str, + system_prompt: str, + conversation: Optional[Conversation] = None + ): + """ + Initialize the agent with its name and system prompt. + + Args: + agent_name (str): The name of the agent. + system_prompt (str): The guiding prompt for the agent. + conversation (Optional[Conversation]): Shared conversation context. + """ + self.agent_name = agent_name + self.system_prompt = system_prompt + self.conversation = conversation + + def run(self, task: str, img: str = None, *args: Any, **kwargs: Any) -> Any: + """ + Execute a specific task assigned to the agent. + + Args: + task (str): The task description. + img (str): The image input for processing. + *args: Additional positional arguments. + **kwargs: Additional keyword arguments. + + Returns: + Any: The result of the task execution. + """ + # Add context from shared conversation if available + context = "" + if self.conversation: + context = f"Previous context: {self.conversation.get_last_message_as_string()}\n\n" + + # Process the task (implement your custom logic here) + result = f"Agent {self.agent_name} processed: {context}{task}" + + logger.info(f"πŸ€– Agent '{self.agent_name}' completed task") + return result ``` -Now, we initialize the swarm with these agents: +### Using Swarms Framework Agents + +You can also use the built-in Agent class from the Swarms framework: ```python -if __name__ == "__main__": - agents = [FinancialAgent(), MarketingAgent()] - swarm = CustomSwarm(agents) - swarm.run("Analyze Q3 financial report and marketing impact.") +from swarms.structs.agent import Agent + +def create_financial_agent() -> Agent: + """Create a financial analysis agent.""" + return Agent( + agent_name="FinancialAnalyst", + system_prompt="You are a financial analyst specializing in market analysis and risk assessment.", + model_name="gpt-4o-mini", + max_loops=1, + ) + +def create_marketing_agent() -> Agent: + """Create a marketing analysis agent.""" + return Agent( + agent_name="MarketingSpecialist", + system_prompt="You are a marketing specialist focused on campaign analysis and customer insights.", + model_name="gpt-4o-mini", + max_loops=1, + ) ``` -### 5. Enhancing the Swarm with Concurrent Execution +--- -When dealing with large or time-consuming tasks, running agents concurrently (in parallel) can significantly improve performance. We can achieve this by utilizing Python’s **concurrent.futures** or **threading** libraries. +## Complete Implementation Example -#### 5.1 Running Swarms Concurrently +### Setting Up Your Swarm ```python -from concurrent.futures import ThreadPoolExecutor, as_completed +import time +from typing import List -class CustomSwarm: - def __init__(self, agents: List[BaseSwarm], max_workers: int = 4): - self.agents = agents - self.thread_pool = ThreadPoolExecutor(max_workers=max_workers) - logger.info("CustomSwarm initialized with concurrent execution.") +def create_multi_domain_swarm() -> CustomSwarm: + """ + Create a comprehensive multi-domain analysis swarm. + + Returns: + CustomSwarm: A configured swarm with multiple specialized agents. + """ + # Create agents + agents = [ + create_financial_agent(), + create_marketing_agent(), + Agent( + agent_name="OperationsAnalyst", + system_prompt="You are an operations analyst specializing in process optimization and efficiency.", + model_name="gpt-4o-mini", + max_loops=1, + ), + ] + + # Configure conversation settings + conversation_config = { + "backend": "sqlite", # Use SQLite for persistent storage + "db_path": f"conversations/swarm_conversations.db", + "time_enabled": True, + "token_count": True, + } + + # Create the swarm + swarm = CustomSwarm( + name="MultiDomainAnalysisSwarm", + description="A comprehensive swarm for financial, marketing, and operations analysis", + agents=agents, + max_workers=3, + autosave_conversation=True, + conversation_config=conversation_config, + ) + + return swarm - def run(self, task: str): - futures = [] - for agent in self.agents: - futures.append(self.thread_pool.submit(agent.run, task)) - - for future in as_completed(futures): - result = future.result() - logger.info(f"Agent result: {result}") +# Usage example +if __name__ == "__main__": + # Create and initialize the swarm + swarm = create_multi_domain_swarm() + + # Execute a complex analysis task + task = """ + Analyze the Q3 2024 performance data for our company: + - Revenue: $2.5M (up 15% from Q2) + + - Customer acquisition: 1,200 new customers + + - Marketing spend: $150K + + - Operational costs: $800K + + + Provide insights from financial, marketing, and operations perspectives. + """ + + # Run the analysis + results = swarm.run(task) + + # Display results + print("\n" + "="*50) + print("SWARM ANALYSIS RESULTS") + print("="*50) + + for i, result in enumerate(results): + agent_name = swarm.agents[i].agent_name + print(f"\nπŸ€– {agent_name}:") + print(f"πŸ“Š {result}") + + # Get conversation summary + summary = swarm.get_conversation_summary() + print(f"\nπŸ“ˆ Conversation Summary:") + print(f" Total messages: {summary['total_messages']}") + print(f" Total tokens: {summary['token_summary']['total_tokens']}") + + # Export conversation for later analysis + export_path = swarm.export_conversation() + print(f"πŸ’Ύ Conversation saved to: {export_path}") ``` -### 6. Advanced Error Handling and Retries - -In a production system, agents might fail due to a wide range of reasons (network errors, API rate limits, etc.). To ensure resilience, we can add retry mechanisms and even fallback agents that attempt to recover the failed task. +### Advanced Usage with Concurrent Execution ```python -class CustomSwarm: - def run_with_retries(self, task: str, retries: int = 3): - """ - Runs the task across all agents with retry logic. - - Args: - task (str): The task to run. - retries (int): Number of retries allowed for failed agents. - """ - for agent in self.agents: - attempt = 0 - while attempt <= retries: - try: - agent.run(task) - logger.success(f"Agent {agent} completed task.") - break - except Exception as e: - logger.error(f"Agent {agent} failed on attempt {attempt + 1}. Error: {e}") - attempt += 1 - if attempt > retries: - logger.error(f"Agent {agent} exhausted retries. Task failed.") +def run_batch_analysis(): + """Example of running multiple tasks concurrently.""" + swarm = create_multi_domain_swarm() + + tasks = [ + "Analyze Q1 financial performance", + "Evaluate marketing campaign effectiveness", + "Review operational efficiency metrics", + "Assess customer satisfaction trends", + ] + + # Process all tasks concurrently + all_results = [] + for task in tasks: + results = swarm.run_concurrent(task) + all_results.append({"task": task, "results": results}) + + return all_results ``` -### 7. Adding Documentation with Docstrings +--- -Clear and concise documentation is critical, especially for engineers maintaining and scaling the system. Using Python’s docstrings, we can document each class and method, describing what they do and their expected inputs/outputs. +## Conversation Management Integration -```python -class CustomSwarm: - """ - A class to manage and execute tasks using a swarm of agents. +The swarm uses the Swarms framework's [Conversation structure](../conversation/) for comprehensive message storage and management. This provides: - Attributes: - agents (List[BaseSwarm]): A list of agent instances. +### Key Features + +- **Persistent Storage**: Multiple backend options (SQLite, Redis, Supabase, etc.) + +- **Message Categorization**: Organize messages by type (input, output, error, etc.) + +- **Token Tracking**: Monitor token usage across conversations + +- **Export/Import**: Save and load conversation histories + +- **Search Capabilities**: Find specific messages or content + + +### Conversation Configuration Options + +```python +conversation_config = { + # Backend storage options + "backend": "sqlite", # or "redis", "supabase", "duckdb", "in-memory" - Methods: - run(task: str): Runs a task across all agents in the swarm. - validate_agents(): Validates that each agent has a run method. - run_with_retries(task: str, retries: int): Runs the task with retry logic. - """ + # File-based storage + "db_path": "conversations/swarm_data.db", + + # Redis configuration (if using Redis backend) + "redis_host": "localhost", + "redis_port": 6379, + + # Features + "time_enabled": True, # Add timestamps to messages + "token_count": True, # Track token usage + "autosave": True, # Automatically save conversations + "save_enabled": True, # Enable saving functionality +} +``` - def __init__(self, agents: List[BaseSwarm]): - """ - Initializes the CustomSwarm with a list of agents. +### Accessing Conversation Data - Args: - agents (List[BaseSwarm]): A list of agent objects that inherit from BaseSwarm. - """ - self.agents = agents +```python +# Get conversation history +history = swarm.conversation.return_history_as_string() - def run(self, task: str): - """ - Runs the task across all agents in the swarm. +# Search for specific content +financial_messages = swarm.conversation.search("financial") - Args: - task (str): The task to pass to each agent. - """ - pass +# Export conversation data +swarm.conversation.export_conversation("analysis_session.json") - def validate_agents(self): - """Validates that each agent has a 'run' method.""" - pass +# Get conversation statistics +stats = swarm.conversation.count_messages_by_role() +token_usage = swarm.conversation.export_and_count_categories() ``` -` +For complete documentation on conversation management, see the [Conversation Structure Documentation](../conversation/). + -### Conclusion +--- -Building custom swarms that intake multiple agents can drastically improve the scalability, efficiency, and flexibility of AI-driven systems. By designing a robust swarm class that manages agents, distributes tasks, and ensures error resilience, you can handle complex, multi-agent workloads efficiently. +## Conclusion -In this Guide, we've covered: +Building custom swarms with proper conversation management enables you to create powerful, scalable, and maintainable multi-agent systems. The integration with the Swarms framework's conversation structure provides: + +- **Complete audit trail** of all agent interactions + +- **Persistent storage** options for different deployment scenarios + +- **Performance monitoring** through token and message tracking + +- **Easy debugging** with searchable conversation history + +- **Scalable architecture** that grows with your needs + + +By following the patterns and best practices outlined in this guide, you can create robust swarms that handle complex tasks efficiently while maintaining full visibility into their operations. + +### Key Takeaways + +1. **Always implement conversation management** for tracking and auditing +2. **Use proper error handling and retries** for production resilience +3. **Implement monitoring and logging** for observability +4. **Design for scalability** with concurrent execution patterns +5. **Test thoroughly** with unit tests and integration tests +6. **Configure appropriately** for your deployment environment + +For more advanced patterns and examples, explore the [Swarms Examples](../../examples/) and consider contributing your custom swarms back to the community by submitting a pull request to the [Swarms repository](https://github.com/kyegomez/swarms). + +--- -- Designing a basic swarm class. +## Additional Resources -- Running tasks across multiple agents. +- [Conversation Structure Documentation](../conversation/) - Complete guide to conversation management -- Leveraging logging, error handling, retries, and concurrency. +- [Agent Documentation](../../agents/) - Learn about creating and configuring agents -- Documenting your class for future-proofing. +- [Multi-Agent Architectures](../overview/) - Explore other swarm patterns and architectures -This approach sets the foundation for building more advanced and domain-specific swarms in areas like finance, marketing, operations, and beyond. Swarm engineers can now explore more complex, multi-agent systems and push the boundaries of AI collaboration. +- [Examples Repository](../../examples/) - Real-world swarm implementations -Stay tuned for future updates on more advanced swarm functionalities! \ No newline at end of file +- [Swarms Framework GitHub](https://github.com/kyegomez/swarms) - Source code and contributions