From 79c88e374058c1462ad38b6125928d49984ef0fc Mon Sep 17 00:00:00 2001 From: Your Name Date: Sat, 21 Sep 2024 14:10:21 -0400 Subject: [PATCH] [FIX][Alternative swarm architectures] --- docs/swarms/concept/swarm_architectures.md | 413 +++++++++++++++++- .../structs/various_swarm_architectures.md | 0 swarm_architecture_examples.py | 175 ++++++++ swarms/structs/swarming_architectures.py | 394 ++++++++++------- 4 files changed, 811 insertions(+), 171 deletions(-) create mode 100644 docs/swarms/structs/various_swarm_architectures.md create mode 100644 swarm_architecture_examples.py diff --git a/docs/swarms/concept/swarm_architectures.md b/docs/swarms/concept/swarm_architectures.md index e18a1d23..6395ece9 100644 --- a/docs/swarms/concept/swarm_architectures.md +++ b/docs/swarms/concept/swarm_architectures.md @@ -156,7 +156,7 @@ graph TD -# Mixture of Agents Architecture +### Mixture of Agents Architecture ```mermaid @@ -176,4 +176,415 @@ graph TD J --> K[Final Output] +``` + + +## Alternative Experimental Architectures + +### **1. Circular Swarm** + +#### Input Arguments: +- **name** (str): Name of the swarm. +- **description** (str): Description of the swarm. +- **goal** (str): Goal of the swarm. +- **agents** (AgentListType): List of agents involved. +- **tasks** (List[str]): List of tasks for the agents. +- **return_full_history** (bool): Whether to return the full conversation history. + +#### Functionality: +Agents pass tasks in a circular manner, where each agent works on the next task in the list. + +```mermaid +graph TD + Task1 --> Agent1 + Agent1 --> Agent2 + Agent2 --> Agent3 + Agent3 --> Task2 + Task2 --> Agent1 +``` + +--- + +### **2. Linear Swarm** + +#### Input Arguments: +- **name** (str): Name of the swarm. +- **description** (str): Description of the swarm. +- **agents** (AgentListType): List of agents involved. +- **tasks** (List[str]): List of tasks for the agents. +- **conversation** (Conversation): Conversation object. +- **return_full_history** (bool): Whether to return the full conversation history. + +#### Functionality: +Agents pass tasks in a linear fashion, each agent working on one task sequentially. + +```mermaid +graph LR + Task1 --> Agent1 + Agent1 --> Agent2 + Agent2 --> Agent3 + Agent3 --> Task2 +``` + +--- + +### **3. Star Swarm** + +#### Input Arguments: +- **agents** (AgentListType): List of agents involved. +- **tasks** (List[str]): List of tasks for the agents. + +#### Functionality: +A central agent (Agent 1) executes the tasks first, followed by the other agents working in parallel. + +```mermaid +graph TD + Task1 --> Agent1 + Agent1 --> Agent2 + Agent1 --> Agent3 + Agent1 --> Agent4 +``` + +--- + +### **4. Mesh Swarm** + +#### Input Arguments: +- **agents** (AgentListType): List of agents involved. +- **tasks** (List[str]): List of tasks for the agents. + +#### Functionality: +Each agent works on tasks randomly from a task queue, until the task queue is empty. + +```mermaid +graph TD + Task1 --> Agent1 + Task2 --> Agent2 + Task3 --> Agent3 + Task4 --> Agent4 + Task5 --> Agent1 + Task6 --> Agent2 +``` + +--- + +### **5. Grid Swarm** + +#### Input Arguments: +- **agents** (AgentListType): List of agents involved. +- **tasks** (List[str]): List of tasks for the agents. + +#### Functionality: +Agents are structured in a grid, and tasks are distributed accordingly. + +```mermaid +graph TD + Task1 --> Agent1 + Task2 --> Agent2 + Task3 --> Agent3 + Task4 --> Agent4 +``` + +--- + +### **6. Pyramid Swarm** + +#### Input Arguments: +- **agents** (AgentListType): List of agents involved. +- **tasks** (List[str]): List of tasks for the agents. + +#### Functionality: +Agents are arranged in a pyramid structure. Each level of agents works in sequence. + +```mermaid +graph TD + Task1 --> Agent1 + Agent1 --> Agent2 + Agent2 --> Agent3 + Agent3 --> Task2 +``` + +--- + +### **7. Fibonacci Swarm** + +#### Input Arguments: +- **agents** (AgentListType): List of agents involved. +- **tasks** (List[str]): List of tasks for the agents. + +#### Functionality: +Agents work according to the Fibonacci sequence, where the number of agents working on tasks follows this progression. + +```mermaid +graph TD + Task1 --> Agent1 + Agent1 --> Agent2 + Agent2 --> Agent3 + Task2 --> Agent5 + Agent5 --> Agent8 +``` + +--- + +### **8. Prime Swarm** + +#### Input Arguments: +- **agents** (AgentListType): List of agents involved. +- **tasks** (List[str]): List of tasks for the agents. + +#### Functionality: +Agents are assigned tasks based on prime number indices in the list of agents. + +```mermaid +graph TD + Task1 --> Agent2 + Task2 --> Agent3 + Task3 --> Agent5 + Task4 --> Agent7 +``` + +--- + +### **9. Power Swarm** + +#### Input Arguments: +- **agents** (AgentListType): List of agents involved. +- **tasks** (List[str]): List of tasks for the agents. + +#### Functionality: +Agents work on tasks following powers of two. + +```mermaid +graph TD + Task1 --> Agent1 + Task2 --> Agent2 + Task3 --> Agent4 + Task4 --> Agent8 +``` + +--- + +### **10. Sigmoid Swarm** + +#### Input Arguments: +- **agents** (AgentListType): List of agents involved. +- **tasks** (List[str]): List of tasks for the agents. + +#### Functionality: +Agents are selected based on the sigmoid function, with higher-indexed agents handling more complex tasks. + +```mermaid +graph TD + Task1 --> Agent1 + Task2 --> Agent2 + Task3 --> Agent3 + Task4 --> Agent4 +``` + +--- + +### **11. Sinusoidal Swarm** + +#### Input Arguments: +- **agents** (AgentListType): List of agents involved. +- **task** (str): Task for the agents to work on. + +#### Functionality: +Agents are assigned tasks based on a sinusoidal pattern. + +```mermaid +graph TD + Task --> Agent1 + Agent1 --> Agent2 + Agent2 --> Agent3 + Agent3 --> Task2 +``` + +--- + +Each of these swarm architectures enables different task distribution and agent coordination strategies, making it possible to select the right architecture for specific types of agent-based problem-solving scenarios. + + + +## Examples + +```python + +import asyncio +import os + +from dotenv import load_dotenv +from loguru import logger +from swarm_models import OpenAIChat +from tickr_agent.main import TickrAgent + +from swarms.structs.swarming_architectures import ( + circular_swarm, + linear_swarm, + mesh_swarm, + pyramid_swarm, + star_swarm, +) + +# Load environment variables (API keys) +load_dotenv() +api_key = os.getenv("OPENAI_API_KEY") + +# Initialize the OpenAI model +model = OpenAIChat( + openai_api_key=api_key, model_name="gpt-4", temperature=0.1 +) + +# Custom Financial Agent System Prompts +STOCK_ANALYSIS_PROMPT = """ +You are an expert financial analyst. Your task is to analyze stock market data for a company +and provide insights on whether to buy, hold, or sell. Analyze trends, financial ratios, and market conditions. +""" + +NEWS_SUMMARIZATION_PROMPT = """ +You are a financial news expert. Summarize the latest news related to a company and provide insights on +how it could impact its stock price. Be concise and focus on the key takeaways. +""" + +RATIO_CALCULATION_PROMPT = """ +You are a financial ratio analyst. Your task is to calculate key financial ratios for a company +based on the available data, such as P/E ratio, debt-to-equity ratio, and return on equity. +Explain what each ratio means for investors. +""" + +# Example Usage +# Define stock tickers +stocks = ["AAPL", "TSLA"] + + +# Initialize Financial Analysis Agents +stock_analysis_agent = TickrAgent( + agent_name="Stock-Analysis-Agent", + system_prompt=STOCK_ANALYSIS_PROMPT, + stocks=stocks, +) + +news_summarization_agent = TickrAgent( + agent_name="News-Summarization-Agent", + system_prompt=NEWS_SUMMARIZATION_PROMPT, + stocks=stocks, + +) + +ratio_calculation_agent = TickrAgent( + agent_name="Ratio-Calculation-Agent", + system_prompt=RATIO_CALCULATION_PROMPT, + stocks=stocks, + +) +# Create a list of agents for swarming +agents = [ + stock_analysis_agent, + news_summarization_agent, + ratio_calculation_agent, +] + +# Define financial analysis tasks +tasks = [ + "Analyze the stock performance of Apple (AAPL) in the last 6 months.", + "Summarize the latest financial news on Tesla (TSLA).", + "Calculate the P/E ratio and debt-to-equity ratio for Amazon (AMZN).", +] + +# -------------------------------# Showcase Circular Swarm +# ------------------------------- +logger.info("Starting Circular Swarm for financial analysis.") +circular_result = circular_swarm(agents, tasks) +logger.info(f"Circular Swarm Result:\n{circular_result}\n") + + +# ------------------------------- +# Showcase Linear Swarm +# ------------------------------- +logger.info("Starting Linear Swarm for financial analysis.") +linear_result = linear_swarm(agents, tasks) +logger.info(f"Linear Swarm Result:\n{linear_result}\n") + + +# ------------------------------- +# Showcase Star Swarm +# ------------------------------- +logger.info("Starting Star Swarm for financial analysis.") +star_result = star_swarm(agents, tasks) +logger.info(f"Star Swarm Result:\n{star_result}\n") + + +# ------------------------------- +# Showcase Mesh Swarm +# ------------------------------- +logger.info("Starting Mesh Swarm for financial analysis.") +mesh_result = mesh_swarm(agents, tasks) +logger.info(f"Mesh Swarm Result:\n{mesh_result}\n") + + +# ------------------------------- +# Showcase Pyramid Swarm +# ------------------------------- +logger.info("Starting Pyramid Swarm for financial analysis.") +pyramid_result = pyramid_swarm(agents, tasks) +logger.info(f"Pyramid Swarm Result:\n{pyramid_result}\n") + + +# ------------------------------- +# Example: One-to-One Communication between Agents +# ------------------------------- +logger.info( + "Starting One-to-One communication between Stock and News agents." +) +one_to_one_result = stock_analysis_agent.run( + "Analyze Apple stock performance, and then send the result to the News Summarization Agent" +) +news_summary_result = news_summarization_agent.run(one_to_one_result) +logger.info( + f"One-to-One Communication Result:\n{news_summary_result}\n" +) + + +# ------------------------------- +# Example: Broadcasting to all agents +# ------------------------------- +async def broadcast_task(): + logger.info("Broadcasting task to all agents.") + task = "Summarize the overall stock market performance today." + await asyncio.gather(*[agent.run(task) for agent in agents]) + + +asyncio.run(broadcast_task()) + + +# ------------------------------- +# Deep Comments & Explanations +# ------------------------------- + +""" +Explanation of Key Components: + +1. **Agents**: + - We created three specialized agents for financial analysis: Stock Analysis, News Summarization, and Ratio Calculation. + - Each agent is provided with a custom system prompt that defines their unique task in analyzing stock data. + +2. **Swarm Examples**: + - **Circular Swarm**: Agents take turns processing tasks in a circular manner. + - **Linear Swarm**: Tasks are processed sequentially by each agent. + - **Star Swarm**: The first agent (Stock Analysis) processes all tasks before distributing them to other agents. + - **Mesh Swarm**: Agents work on random tasks from the task queue. + - **Pyramid Swarm**: Agents are arranged in a pyramid structure, processing tasks layer by layer. + +3. **One-to-One Communication**: + - This showcases how one agent can pass its result to another agent for further processing, useful for complex workflows where agents depend on each other. + +4. **Broadcasting**: + - The broadcasting function demonstrates how a single task can be sent to all agents simultaneously. This can be useful for situations like summarizing daily stock market performance across multiple agents. + +5. **Logging with Loguru**: + - We use `loguru` for detailed logging throughout the swarms. This helps to track the flow of information and responses from each agent. +""" + + + ``` \ No newline at end of file diff --git a/docs/swarms/structs/various_swarm_architectures.md b/docs/swarms/structs/various_swarm_architectures.md new file mode 100644 index 00000000..e69de29b diff --git a/swarm_architecture_examples.py b/swarm_architecture_examples.py new file mode 100644 index 00000000..db7016db --- /dev/null +++ b/swarm_architecture_examples.py @@ -0,0 +1,175 @@ + +import asyncio +import os + +from dotenv import load_dotenv +from loguru import logger +from swarm_models import OpenAIChat +from tickr_agent.main import TickrAgent + +from swarms.structs.swarming_architectures import ( + circular_swarm, + linear_swarm, + mesh_swarm, + pyramid_swarm, + star_swarm, +) + +# Load environment variables (API keys) +load_dotenv() +api_key = os.getenv("OPENAI_API_KEY") + +# Initialize the OpenAI model +model = OpenAIChat( + openai_api_key=api_key, model_name="gpt-4", temperature=0.1 +) + +# Custom Financial Agent System Prompts +STOCK_ANALYSIS_PROMPT = """ +You are an expert financial analyst. Your task is to analyze stock market data for a company +and provide insights on whether to buy, hold, or sell. Analyze trends, financial ratios, and market conditions. +""" + +NEWS_SUMMARIZATION_PROMPT = """ +You are a financial news expert. Summarize the latest news related to a company and provide insights on +how it could impact its stock price. Be concise and focus on the key takeaways. +""" + +RATIO_CALCULATION_PROMPT = """ +You are a financial ratio analyst. Your task is to calculate key financial ratios for a company +based on the available data, such as P/E ratio, debt-to-equity ratio, and return on equity. +Explain what each ratio means for investors. +""" + +# Example Usage +# Define stock tickers +stocks = ["AAPL", "TSLA"] + + +# Initialize Financial Analysis Agents +stock_analysis_agent = TickrAgent( + agent_name="Stock-Analysis-Agent", + system_prompt=STOCK_ANALYSIS_PROMPT, + stocks=stocks, +) + +news_summarization_agent = TickrAgent( + agent_name="News-Summarization-Agent", + system_prompt=NEWS_SUMMARIZATION_PROMPT, + stocks=stocks, + +) + +ratio_calculation_agent = TickrAgent( + agent_name="Ratio-Calculation-Agent", + system_prompt=RATIO_CALCULATION_PROMPT, + stocks=stocks, + +) +# Create a list of agents for swarming +agents = [ + stock_analysis_agent, + news_summarization_agent, + ratio_calculation_agent, +] + +# Define financial analysis tasks +tasks = [ + "Analyze the stock performance of Apple (AAPL) in the last 6 months.", + "Summarize the latest financial news on Tesla (TSLA).", + "Calculate the P/E ratio and debt-to-equity ratio for Amazon (AMZN).", +] + +# -------------------------------# Showcase Circular Swarm +# ------------------------------- +logger.info("Starting Circular Swarm for financial analysis.") +circular_result = circular_swarm(agents, tasks) +logger.info(f"Circular Swarm Result:\n{circular_result}\n") + + +# ------------------------------- +# Showcase Linear Swarm +# ------------------------------- +logger.info("Starting Linear Swarm for financial analysis.") +linear_result = linear_swarm(agents, tasks) +logger.info(f"Linear Swarm Result:\n{linear_result}\n") + + +# ------------------------------- +# Showcase Star Swarm +# ------------------------------- +logger.info("Starting Star Swarm for financial analysis.") +star_result = star_swarm(agents, tasks) +logger.info(f"Star Swarm Result:\n{star_result}\n") + + +# ------------------------------- +# Showcase Mesh Swarm +# ------------------------------- +logger.info("Starting Mesh Swarm for financial analysis.") +mesh_result = mesh_swarm(agents, tasks) +logger.info(f"Mesh Swarm Result:\n{mesh_result}\n") + + +# ------------------------------- +# Showcase Pyramid Swarm +# ------------------------------- +logger.info("Starting Pyramid Swarm for financial analysis.") +pyramid_result = pyramid_swarm(agents, tasks) +logger.info(f"Pyramid Swarm Result:\n{pyramid_result}\n") + + +# ------------------------------- +# Example: One-to-One Communication between Agents +# ------------------------------- +logger.info( + "Starting One-to-One communication between Stock and News agents." +) +one_to_one_result = stock_analysis_agent.run( + "Analyze Apple stock performance, and then send the result to the News Summarization Agent" +) +news_summary_result = news_summarization_agent.run(one_to_one_result) +logger.info( + f"One-to-One Communication Result:\n{news_summary_result}\n" +) + + +# ------------------------------- +# Example: Broadcasting to all agents +# ------------------------------- +async def broadcast_task(): + logger.info("Broadcasting task to all agents.") + task = "Summarize the overall stock market performance today." + await asyncio.gather(*[agent.run(task) for agent in agents]) + + +asyncio.run(broadcast_task()) + + +# ------------------------------- +# Deep Comments & Explanations +# ------------------------------- + +""" +Explanation of Key Components: + +1. **Agents**: + - We created three specialized agents for financial analysis: Stock Analysis, News Summarization, and Ratio Calculation. + - Each agent is provided with a custom system prompt that defines their unique task in analyzing stock data. + +2. **Swarm Examples**: + - **Circular Swarm**: Agents take turns processing tasks in a circular manner. + - **Linear Swarm**: Tasks are processed sequentially by each agent. + - **Star Swarm**: The first agent (Stock Analysis) processes all tasks before distributing them to other agents. + - **Mesh Swarm**: Agents work on random tasks from the task queue. + - **Pyramid Swarm**: Agents are arranged in a pyramid structure, processing tasks layer by layer. + +3. **One-to-One Communication**: + - This showcases how one agent can pass its result to another agent for further processing, useful for complex workflows where agents depend on each other. + +4. **Broadcasting**: + - The broadcasting function demonstrates how a single task can be sent to all agents simultaneously. This can be useful for situations like summarizing daily stock market performance across multiple agents. + +5. **Logging with Loguru**: + - We use `loguru` for detailed logging throughout the swarms. This helps to track the flow of information and responses from each agent. +""" diff --git a/swarms/structs/swarming_architectures.py b/swarms/structs/swarming_architectures.py index 76d76724..abf1ae37 100644 --- a/swarms/structs/swarming_architectures.py +++ b/swarms/structs/swarming_architectures.py @@ -1,176 +1,224 @@ import asyncio import math -from typing import List +from typing import List, Union + +from loguru import logger +from pydantic import BaseModel from swarms.structs.agent import Agent -from swarms.utils.loguru_logger import logger -from swarms.structs.conversation import Conversation -from swarms.structs.concat import concat_strings from swarms.structs.omni_agent_types import AgentListType -# from swarms.structs.swarm_registry import swarm_registry, SwarmRegistry +# Define Pydantic schema for logging agent responses +class AgentLog(BaseModel): + agent_name: str + task: str + response: str -# @swarm_registry -def circular_swarm( - name: str = "Circular Swarm", - description: str = "A circular swarm is a type of swarm where agents pass tasks in a circular manner.", - goal: str = None, - agents: AgentListType = None, - tasks: List[str] = None, - return_full_history: bool = True, -): - if not agents: - raise ValueError("Agents list cannot be empty.") - if not tasks: - raise ValueError("Tasks list cannot be empty.") +class Conversation(BaseModel): + logs: List[AgentLog] = [] - conversation = Conversation( - time_enabled=True, - ) + def add_log( + self, agent_name: str, task: str, response: str + ) -> None: + log_entry = AgentLog( + agent_name=agent_name, task=task, response=response + ) + self.logs.append(log_entry) + logger.info( + f"Agent: {agent_name} | Task: {task} | Response: {response}" + ) + def return_history(self) -> dict: + return { + "history": [ + { + "agent_name": log.agent_name, + "task": log.task, + "response": log.response, + } + for log in self.logs + ] + } + + +# Circular Swarm: Agents pass tasks in a circular manner +def circular_swarm( + agents: AgentListType, + tasks: List[str], + return_full_history: bool = True, +) -> Union[str, List[str]]: + if not agents or not tasks: + raise ValueError("Agents and tasks lists cannot be empty.") + + conversation = Conversation() responses = [] for task in tasks: for agent in agents: - # Log the task - out = agent.run(task) - # print(f"Task: {task}, Response {out}") - # prompt = f"Task: {task}, Response {out}" - logger.info(f"Agent: {agent.agent_name} Response {out}") - - conversation.add( - role=agent.agent_name, - content=out, + response = agent.run(task) + conversation.add_log( + agent_name=agent.agent_name, + task=task, + response=response, ) + responses.append(response) - # Response list - responses.append(out) - - if return_full_history: - return conversation.return_history_as_string() - else: - return responses + return ( + conversation.return_history() + if return_full_history + else responses + ) -# @swarm_registry() +def grid_swarm(agents: AgentListType, tasks: List[str]): + grid_size = int( + len(agents) ** 0.5 + ) # Assuming agents can form a perfect square grid + for i in range(grid_size): + for j in range(grid_size): + if tasks: + task = tasks.pop(0) + agents[i * grid_size + j].run(task) + + +# Linear Swarm: Agents process tasks in a sequential linear manner def linear_swarm( - name: str = "Linear Swarm", - description: str = "A linear swarm is a type of swarm where agents pass tasks in a linear manner.", - agents: AgentListType = None, - tasks: List[str] = None, - conversation: Conversation = None, + agents: AgentListType, + tasks: List[str], return_full_history: bool = True, -): - if not agents: - raise ValueError("Agents list cannot be empty.") - - if not tasks: - raise ValueError("Tasks list cannot be empty.") - - if not conversation: - conversation = Conversation( - time_enabled=True, - ) +) -> Union[str, List[str]]: + if not agents or not tasks: + raise ValueError("Agents and tasks lists cannot be empty.") + conversation = Conversation() responses = [] - for i in range(len(agents)): + for agent in agents: if tasks: task = tasks.pop(0) - out = agents[i].run(task) - - conversation.add( - role=agents[i].agent_name, - content=f"Task: {task}, Response {out}", + response = agent.run(task) + conversation.add_log( + agent_name=agent.agent_name, + task=task, + response=response, ) + responses.append(response) - responses.append(out) - - if return_full_history: - return conversation.return_history_as_string() - else: - return responses - - -# print(SwarmRegistry().list_swarms()) - -# def linear_swarm(agents: AgentListType, tasks: List[str]): -# logger.info(f"Running linear swarm with {len(agents)} agents") -# for i in range(len(agents)): -# if tasks: -# task = tasks.pop(0) -# agents[i].run(task) - - -def star_swarm(agents: AgentListType, tasks: List[str]) -> str: - logger.info( - f"Running star swarm with {len(agents)} agents and {len(tasks)} tasks" + return ( + conversation.return_history() + if return_full_history + else responses ) - if not agents: - raise ValueError("Agents list cannot be empty.") - - if not tasks: - raise ValueError("Tasks list cannot be empty.") - conversation = Conversation(time_enabled=True) - center_agent = agents[0] +# Star Swarm: A central agent first processes all tasks, followed by others +def star_swarm( + agents: AgentListType, + tasks: List[str], + return_full_history: bool = True, +) -> Union[str, List[str]]: + if not agents or not tasks: + raise ValueError("Agents and tasks lists cannot be empty.") + conversation = Conversation() + center_agent = agents[0] # The central agent responses = [] for task in tasks: + # Central agent processes the task + center_response = center_agent.run(task) + conversation.add_log( + agent_name=center_agent.agent_name, + task=task, + response=center_response, + ) + responses.append(center_response) - out = center_agent.run(task) - log = f"Agent: {center_agent.agent_name} Response {out}" - logger.info(log) - conversation.add(center_agent.agent_name, out) - responses.append(out) - + # Other agents process the same task for agent in agents[1:]: + response = agent.run(task) + conversation.add_log( + agent_name=agent.agent_name, + task=task, + response=response, + ) + responses.append(response) - output = agent.run(task) - log_two = f"Agent: {agent.agent_name} Response {output}" - logger.info(log_two) - conversation.add(agent.agent_name, output) - responses.append(out) - - out = concat_strings(responses) - print(out) + return ( + conversation.return_history() + if return_full_history + else responses + ) - return out +# Mesh Swarm: Agents work on tasks randomly from a task queue until all tasks are processed +def mesh_swarm( + agents: AgentListType, + tasks: List[str], + return_full_history: bool = True, +) -> Union[str, List[str]]: + if not agents or not tasks: + raise ValueError("Agents and tasks lists cannot be empty.") -def mesh_swarm(agents: AgentListType, tasks: List[str]): + conversation = Conversation() task_queue = tasks.copy() + responses = [] + while task_queue: for agent in agents: if task_queue: task = task_queue.pop(0) - agent.run(task) + response = agent.run(task) + conversation.add_log( + agent_name=agent.agent_name, + task=task, + response=response, + ) + responses.append(response) + + return ( + conversation.return_history() + if return_full_history + else responses + ) -def grid_swarm(agents: AgentListType, tasks: List[str]): - grid_size = int( - len(agents) ** 0.5 - ) # Assuming agents can form a perfect square grid - for i in range(grid_size): - for j in range(grid_size): - if tasks: - task = tasks.pop(0) - agents[i * grid_size + j].run(task) +# Pyramid Swarm: Agents are arranged in a pyramid structure +def pyramid_swarm( + agents: AgentListType, + tasks: List[str], + return_full_history: bool = True, +) -> Union[str, List[str]]: + if not agents or not tasks: + raise ValueError("Agents and tasks lists cannot be empty.") + conversation = Conversation() + responses = [] -def pyramid_swarm(agents: AgentListType, tasks: List[str]): levels = int( (-1 + (1 + 8 * len(agents)) ** 0.5) / 2 - ) # Assuming agents can form a perfect pyramid + ) # Number of levels in the pyramid + for i in range(levels): for j in range(i + 1): if tasks: task = tasks.pop(0) - agents[int(i * (i + 1) / 2 + j)].run(task) + agent_index = int(i * (i + 1) / 2 + j) + response = agents[agent_index].run(task) + conversation.add_log( + agent_name=agents[agent_index].agent_name, + task=task, + response=response, + ) + responses.append(response) + + return ( + conversation.return_history() + if return_full_history + else responses + ) def fibonacci_swarm(agents: AgentListType, tasks: List[str]): @@ -318,86 +366,92 @@ async def one_to_three( raise error -async def broadcast( - sender: Agent, - agents: AgentListType, - task: str, -): +""" +This module contains functions for facilitating communication between agents in a swarm. It includes methods for one-to-one communication, broadcasting, and other swarm architectures. +""" + + +# One-to-One Communication between two agents +def one_to_one( + sender: Agent, receiver: Agent, task: str, max_loops: int = 1 +) -> str: """ - Broadcasts a message from the sender agent to a list of agents. + Facilitates one-to-one communication between two agents. The sender and receiver agents exchange messages for a specified number of loops. Args: sender (Agent): The agent sending the message. - agents (AgentListType): The list of agents to receive the message. - task (str): The message to be broadcasted. - - Raises: - Exception: If an error occurs during the broadcast. + receiver (Agent): The agent receiving the message. + task (str): The message to be sent. + max_loops (int, optional): The number of times the sender and receiver exchange messages. Defaults to 1. Returns: - None - """ - if not sender: - raise ValueError("The sender cannot be empty.") + str: The conversation history between the sender and receiver. - if not agents: - raise ValueError("The agents list cannot be empty.") - - if not task: - raise ValueError("The task cannot be empty.") + Raises: + Exception: If there is an error during the communication process. + """ + conversation = Conversation() + responses = [] try: - receive_tasks = [] - for agent in agents: - receive_tasks.append( - agent.receive_message(sender.agent_name, task) + for _ in range(max_loops): + # Sender processes the task + sender_response = sender.run(task) + conversation.add_log( + agent_name=sender.agent_name, + task=task, + response=sender_response, ) + responses.append(sender_response) + + # Receiver processes the result of the sender + receiver_response = receiver.run(sender_response) + conversation.add_log( + agent_name=receiver.agent_name, + task=task, + response=receiver_response, + ) + responses.append(receiver_response) - await asyncio.gather(*receive_tasks) except Exception as error: logger.error( - f"[ERROR][CLASS: Agent][METHOD: broadcast] {error}" + f"Error during one_to_one communication: {error}" ) raise error + return conversation.return_history() -def one_to_one( - sender: Agent, - receiver: Agent, - task: str, - max_loops: int = 1, -): + +# Broadcasting: A message from one agent to many +async def broadcast( + sender: Agent, agents: AgentListType, task: str +) -> None: """ - Sends a message from the sender agent to the receiver agent. + Facilitates broadcasting of a message from one agent to multiple agents. Args: sender (Agent): The agent sending the message. - receiver (Agent): The agent to receive the message. + agents (AgentListType): The list of agents to receive the message. task (str): The message to be sent. Raises: - Exception: If an error occurs during the message sending. - - Returns: - None + ValueError: If the sender, agents, or task is empty. + Exception: If there is an error during the broadcasting process. """ - try: - responses = [] - responses.append(task) - for i in range(max_loops): + conversation = Conversation() - # Run the agent on the task then pass the response to the receiver - response = sender.run(task) - log = f"Agent {sender.agent_name} Response: {response}" - responses.append(log) + if not sender or not agents or not task: + raise ValueError("Sender, agents, and task cannot be empty.") - # Send the response to the receiver - out = receiver.run(concat_strings(responses)) - responses.append(out) + try: + receive_tasks = [] + for agent in agents: + receive_tasks.append(agent.run(task)) + conversation.add_log( + agent_name=agent.agent_name, task=task, response=task + ) - return concat_strings(responses) + await asyncio.gather(*receive_tasks) except Exception as error: - logger.error( - f"[ERROR][CLASS: Agent][METHOD: one_to_one] {error}" - ) + logger.error(f"Error during broadcast: {error}") raise error