[FIX][Alternative swarm architectures]

pull/584/head
Your Name 4 months ago
parent 83763c372e
commit 79c88e3740

@ -156,7 +156,7 @@ graph TD
# Mixture of Agents Architecture ### Mixture of Agents Architecture
```mermaid ```mermaid
@ -176,4 +176,415 @@ graph TD
J --> K[Final Output] J --> K[Final Output]
```
## Alternative Experimental Architectures
### **1. Circular Swarm**
#### Input Arguments:
- **name** (str): Name of the swarm.
- **description** (str): Description of the swarm.
- **goal** (str): Goal of the swarm.
- **agents** (AgentListType): List of agents involved.
- **tasks** (List[str]): List of tasks for the agents.
- **return_full_history** (bool): Whether to return the full conversation history.
#### Functionality:
Agents pass tasks in a circular manner, where each agent works on the next task in the list.
```mermaid
graph TD
Task1 --> Agent1
Agent1 --> Agent2
Agent2 --> Agent3
Agent3 --> Task2
Task2 --> Agent1
```
---
### **2. Linear Swarm**
#### Input Arguments:
- **name** (str): Name of the swarm.
- **description** (str): Description of the swarm.
- **agents** (AgentListType): List of agents involved.
- **tasks** (List[str]): List of tasks for the agents.
- **conversation** (Conversation): Conversation object.
- **return_full_history** (bool): Whether to return the full conversation history.
#### Functionality:
Agents pass tasks in a linear fashion, each agent working on one task sequentially.
```mermaid
graph LR
Task1 --> Agent1
Agent1 --> Agent2
Agent2 --> Agent3
Agent3 --> Task2
```
---
### **3. Star Swarm**
#### Input Arguments:
- **agents** (AgentListType): List of agents involved.
- **tasks** (List[str]): List of tasks for the agents.
#### Functionality:
A central agent (Agent 1) executes the tasks first, followed by the other agents working in parallel.
```mermaid
graph TD
Task1 --> Agent1
Agent1 --> Agent2
Agent1 --> Agent3
Agent1 --> Agent4
```
---
### **4. Mesh Swarm**
#### Input Arguments:
- **agents** (AgentListType): List of agents involved.
- **tasks** (List[str]): List of tasks for the agents.
#### Functionality:
Each agent works on tasks randomly from a task queue, until the task queue is empty.
```mermaid
graph TD
Task1 --> Agent1
Task2 --> Agent2
Task3 --> Agent3
Task4 --> Agent4
Task5 --> Agent1
Task6 --> Agent2
```
---
### **5. Grid Swarm**
#### Input Arguments:
- **agents** (AgentListType): List of agents involved.
- **tasks** (List[str]): List of tasks for the agents.
#### Functionality:
Agents are structured in a grid, and tasks are distributed accordingly.
```mermaid
graph TD
Task1 --> Agent1
Task2 --> Agent2
Task3 --> Agent3
Task4 --> Agent4
```
---
### **6. Pyramid Swarm**
#### Input Arguments:
- **agents** (AgentListType): List of agents involved.
- **tasks** (List[str]): List of tasks for the agents.
#### Functionality:
Agents are arranged in a pyramid structure. Each level of agents works in sequence.
```mermaid
graph TD
Task1 --> Agent1
Agent1 --> Agent2
Agent2 --> Agent3
Agent3 --> Task2
```
---
### **7. Fibonacci Swarm**
#### Input Arguments:
- **agents** (AgentListType): List of agents involved.
- **tasks** (List[str]): List of tasks for the agents.
#### Functionality:
Agents work according to the Fibonacci sequence, where the number of agents working on tasks follows this progression.
```mermaid
graph TD
Task1 --> Agent1
Agent1 --> Agent2
Agent2 --> Agent3
Task2 --> Agent5
Agent5 --> Agent8
```
---
### **8. Prime Swarm**
#### Input Arguments:
- **agents** (AgentListType): List of agents involved.
- **tasks** (List[str]): List of tasks for the agents.
#### Functionality:
Agents are assigned tasks based on prime number indices in the list of agents.
```mermaid
graph TD
Task1 --> Agent2
Task2 --> Agent3
Task3 --> Agent5
Task4 --> Agent7
```
---
### **9. Power Swarm**
#### Input Arguments:
- **agents** (AgentListType): List of agents involved.
- **tasks** (List[str]): List of tasks for the agents.
#### Functionality:
Agents work on tasks following powers of two.
```mermaid
graph TD
Task1 --> Agent1
Task2 --> Agent2
Task3 --> Agent4
Task4 --> Agent8
```
---
### **10. Sigmoid Swarm**
#### Input Arguments:
- **agents** (AgentListType): List of agents involved.
- **tasks** (List[str]): List of tasks for the agents.
#### Functionality:
Agents are selected based on the sigmoid function, with higher-indexed agents handling more complex tasks.
```mermaid
graph TD
Task1 --> Agent1
Task2 --> Agent2
Task3 --> Agent3
Task4 --> Agent4
```
---
### **11. Sinusoidal Swarm**
#### Input Arguments:
- **agents** (AgentListType): List of agents involved.
- **task** (str): Task for the agents to work on.
#### Functionality:
Agents are assigned tasks based on a sinusoidal pattern.
```mermaid
graph TD
Task --> Agent1
Agent1 --> Agent2
Agent2 --> Agent3
Agent3 --> Task2
```
---
Each of these swarm architectures enables different task distribution and agent coordination strategies, making it possible to select the right architecture for specific types of agent-based problem-solving scenarios.
## Examples
```python
import asyncio
import os
from dotenv import load_dotenv
from loguru import logger
from swarm_models import OpenAIChat
from tickr_agent.main import TickrAgent
from swarms.structs.swarming_architectures import (
circular_swarm,
linear_swarm,
mesh_swarm,
pyramid_swarm,
star_swarm,
)
# Load environment variables (API keys)
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")
# Initialize the OpenAI model
model = OpenAIChat(
openai_api_key=api_key, model_name="gpt-4", temperature=0.1
)
# Custom Financial Agent System Prompts
STOCK_ANALYSIS_PROMPT = """
You are an expert financial analyst. Your task is to analyze stock market data for a company
and provide insights on whether to buy, hold, or sell. Analyze trends, financial ratios, and market conditions.
"""
NEWS_SUMMARIZATION_PROMPT = """
You are a financial news expert. Summarize the latest news related to a company and provide insights on
how it could impact its stock price. Be concise and focus on the key takeaways.
"""
RATIO_CALCULATION_PROMPT = """
You are a financial ratio analyst. Your task is to calculate key financial ratios for a company
based on the available data, such as P/E ratio, debt-to-equity ratio, and return on equity.
Explain what each ratio means for investors.
"""
# Example Usage
# Define stock tickers
stocks = ["AAPL", "TSLA"]
# Initialize Financial Analysis Agents
stock_analysis_agent = TickrAgent(
agent_name="Stock-Analysis-Agent",
system_prompt=STOCK_ANALYSIS_PROMPT,
stocks=stocks,
)
news_summarization_agent = TickrAgent(
agent_name="News-Summarization-Agent",
system_prompt=NEWS_SUMMARIZATION_PROMPT,
stocks=stocks,
)
ratio_calculation_agent = TickrAgent(
agent_name="Ratio-Calculation-Agent",
system_prompt=RATIO_CALCULATION_PROMPT,
stocks=stocks,
)
# Create a list of agents for swarming
agents = [
stock_analysis_agent,
news_summarization_agent,
ratio_calculation_agent,
]
# Define financial analysis tasks
tasks = [
"Analyze the stock performance of Apple (AAPL) in the last 6 months.",
"Summarize the latest financial news on Tesla (TSLA).",
"Calculate the P/E ratio and debt-to-equity ratio for Amazon (AMZN).",
]
# -------------------------------# Showcase Circular Swarm
# -------------------------------
logger.info("Starting Circular Swarm for financial analysis.")
circular_result = circular_swarm(agents, tasks)
logger.info(f"Circular Swarm Result:\n{circular_result}\n")
# -------------------------------
# Showcase Linear Swarm
# -------------------------------
logger.info("Starting Linear Swarm for financial analysis.")
linear_result = linear_swarm(agents, tasks)
logger.info(f"Linear Swarm Result:\n{linear_result}\n")
# -------------------------------
# Showcase Star Swarm
# -------------------------------
logger.info("Starting Star Swarm for financial analysis.")
star_result = star_swarm(agents, tasks)
logger.info(f"Star Swarm Result:\n{star_result}\n")
# -------------------------------
# Showcase Mesh Swarm
# -------------------------------
logger.info("Starting Mesh Swarm for financial analysis.")
mesh_result = mesh_swarm(agents, tasks)
logger.info(f"Mesh Swarm Result:\n{mesh_result}\n")
# -------------------------------
# Showcase Pyramid Swarm
# -------------------------------
logger.info("Starting Pyramid Swarm for financial analysis.")
pyramid_result = pyramid_swarm(agents, tasks)
logger.info(f"Pyramid Swarm Result:\n{pyramid_result}\n")
# -------------------------------
# Example: One-to-One Communication between Agents
# -------------------------------
logger.info(
"Starting One-to-One communication between Stock and News agents."
)
one_to_one_result = stock_analysis_agent.run(
"Analyze Apple stock performance, and then send the result to the News Summarization Agent"
)
news_summary_result = news_summarization_agent.run(one_to_one_result)
logger.info(
f"One-to-One Communication Result:\n{news_summary_result}\n"
)
# -------------------------------
# Example: Broadcasting to all agents
# -------------------------------
async def broadcast_task():
logger.info("Broadcasting task to all agents.")
task = "Summarize the overall stock market performance today."
await asyncio.gather(*[agent.run(task) for agent in agents])
asyncio.run(broadcast_task())
# -------------------------------
# Deep Comments & Explanations
# -------------------------------
"""
Explanation of Key Components:
1. **Agents**:
- We created three specialized agents for financial analysis: Stock Analysis, News Summarization, and Ratio Calculation.
- Each agent is provided with a custom system prompt that defines their unique task in analyzing stock data.
2. **Swarm Examples**:
- **Circular Swarm**: Agents take turns processing tasks in a circular manner.
- **Linear Swarm**: Tasks are processed sequentially by each agent.
- **Star Swarm**: The first agent (Stock Analysis) processes all tasks before distributing them to other agents.
- **Mesh Swarm**: Agents work on random tasks from the task queue.
- **Pyramid Swarm**: Agents are arranged in a pyramid structure, processing tasks layer by layer.
3. **One-to-One Communication**:
- This showcases how one agent can pass its result to another agent for further processing, useful for complex workflows where agents depend on each other.
4. **Broadcasting**:
- The broadcasting function demonstrates how a single task can be sent to all agents simultaneously. This can be useful for situations like summarizing daily stock market performance across multiple agents.
5. **Logging with Loguru**:
- We use `loguru` for detailed logging throughout the swarms. This helps to track the flow of information and responses from each agent.
"""
``` ```

@ -0,0 +1,175 @@
import asyncio
import os
from dotenv import load_dotenv
from loguru import logger
from swarm_models import OpenAIChat
from tickr_agent.main import TickrAgent
from swarms.structs.swarming_architectures import (
circular_swarm,
linear_swarm,
mesh_swarm,
pyramid_swarm,
star_swarm,
)
# Load environment variables (API keys)
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")
# Initialize the OpenAI model
model = OpenAIChat(
openai_api_key=api_key, model_name="gpt-4", temperature=0.1
)
# Custom Financial Agent System Prompts
STOCK_ANALYSIS_PROMPT = """
You are an expert financial analyst. Your task is to analyze stock market data for a company
and provide insights on whether to buy, hold, or sell. Analyze trends, financial ratios, and market conditions.
"""
NEWS_SUMMARIZATION_PROMPT = """
You are a financial news expert. Summarize the latest news related to a company and provide insights on
how it could impact its stock price. Be concise and focus on the key takeaways.
"""
RATIO_CALCULATION_PROMPT = """
You are a financial ratio analyst. Your task is to calculate key financial ratios for a company
based on the available data, such as P/E ratio, debt-to-equity ratio, and return on equity.
Explain what each ratio means for investors.
"""
# Example Usage
# Define stock tickers
stocks = ["AAPL", "TSLA"]
# Initialize Financial Analysis Agents
stock_analysis_agent = TickrAgent(
agent_name="Stock-Analysis-Agent",
system_prompt=STOCK_ANALYSIS_PROMPT,
stocks=stocks,
)
news_summarization_agent = TickrAgent(
agent_name="News-Summarization-Agent",
system_prompt=NEWS_SUMMARIZATION_PROMPT,
stocks=stocks,
)
ratio_calculation_agent = TickrAgent(
agent_name="Ratio-Calculation-Agent",
system_prompt=RATIO_CALCULATION_PROMPT,
stocks=stocks,
)
# Create a list of agents for swarming
agents = [
stock_analysis_agent,
news_summarization_agent,
ratio_calculation_agent,
]
# Define financial analysis tasks
tasks = [
"Analyze the stock performance of Apple (AAPL) in the last 6 months.",
"Summarize the latest financial news on Tesla (TSLA).",
"Calculate the P/E ratio and debt-to-equity ratio for Amazon (AMZN).",
]
# -------------------------------# Showcase Circular Swarm
# -------------------------------
logger.info("Starting Circular Swarm for financial analysis.")
circular_result = circular_swarm(agents, tasks)
logger.info(f"Circular Swarm Result:\n{circular_result}\n")
# -------------------------------
# Showcase Linear Swarm
# -------------------------------
logger.info("Starting Linear Swarm for financial analysis.")
linear_result = linear_swarm(agents, tasks)
logger.info(f"Linear Swarm Result:\n{linear_result}\n")
# -------------------------------
# Showcase Star Swarm
# -------------------------------
logger.info("Starting Star Swarm for financial analysis.")
star_result = star_swarm(agents, tasks)
logger.info(f"Star Swarm Result:\n{star_result}\n")
# -------------------------------
# Showcase Mesh Swarm
# -------------------------------
logger.info("Starting Mesh Swarm for financial analysis.")
mesh_result = mesh_swarm(agents, tasks)
logger.info(f"Mesh Swarm Result:\n{mesh_result}\n")
# -------------------------------
# Showcase Pyramid Swarm
# -------------------------------
logger.info("Starting Pyramid Swarm for financial analysis.")
pyramid_result = pyramid_swarm(agents, tasks)
logger.info(f"Pyramid Swarm Result:\n{pyramid_result}\n")
# -------------------------------
# Example: One-to-One Communication between Agents
# -------------------------------
logger.info(
"Starting One-to-One communication between Stock and News agents."
)
one_to_one_result = stock_analysis_agent.run(
"Analyze Apple stock performance, and then send the result to the News Summarization Agent"
)
news_summary_result = news_summarization_agent.run(one_to_one_result)
logger.info(
f"One-to-One Communication Result:\n{news_summary_result}\n"
)
# -------------------------------
# Example: Broadcasting to all agents
# -------------------------------
async def broadcast_task():
logger.info("Broadcasting task to all agents.")
task = "Summarize the overall stock market performance today."
await asyncio.gather(*[agent.run(task) for agent in agents])
asyncio.run(broadcast_task())
# -------------------------------
# Deep Comments & Explanations
# -------------------------------
"""
Explanation of Key Components:
1. **Agents**:
- We created three specialized agents for financial analysis: Stock Analysis, News Summarization, and Ratio Calculation.
- Each agent is provided with a custom system prompt that defines their unique task in analyzing stock data.
2. **Swarm Examples**:
- **Circular Swarm**: Agents take turns processing tasks in a circular manner.
- **Linear Swarm**: Tasks are processed sequentially by each agent.
- **Star Swarm**: The first agent (Stock Analysis) processes all tasks before distributing them to other agents.
- **Mesh Swarm**: Agents work on random tasks from the task queue.
- **Pyramid Swarm**: Agents are arranged in a pyramid structure, processing tasks layer by layer.
3. **One-to-One Communication**:
- This showcases how one agent can pass its result to another agent for further processing, useful for complex workflows where agents depend on each other.
4. **Broadcasting**:
- The broadcasting function demonstrates how a single task can be sent to all agents simultaneously. This can be useful for situations like summarizing daily stock market performance across multiple agents.
5. **Logging with Loguru**:
- We use `loguru` for detailed logging throughout the swarms. This helps to track the flow of information and responses from each agent.
"""

@ -1,176 +1,224 @@
import asyncio import asyncio
import math import math
from typing import List from typing import List, Union
from loguru import logger
from pydantic import BaseModel
from swarms.structs.agent import Agent from swarms.structs.agent import Agent
from swarms.utils.loguru_logger import logger
from swarms.structs.conversation import Conversation
from swarms.structs.concat import concat_strings
from swarms.structs.omni_agent_types import AgentListType from swarms.structs.omni_agent_types import AgentListType
# from swarms.structs.swarm_registry import swarm_registry, SwarmRegistry
# Define Pydantic schema for logging agent responses
class AgentLog(BaseModel):
agent_name: str
task: str
response: str
# @swarm_registry
def circular_swarm(
name: str = "Circular Swarm",
description: str = "A circular swarm is a type of swarm where agents pass tasks in a circular manner.",
goal: str = None,
agents: AgentListType = None,
tasks: List[str] = None,
return_full_history: bool = True,
):
if not agents:
raise ValueError("Agents list cannot be empty.")
if not tasks: class Conversation(BaseModel):
raise ValueError("Tasks list cannot be empty.") logs: List[AgentLog] = []
conversation = Conversation( def add_log(
time_enabled=True, self, agent_name: str, task: str, response: str
) ) -> None:
log_entry = AgentLog(
agent_name=agent_name, task=task, response=response
)
self.logs.append(log_entry)
logger.info(
f"Agent: {agent_name} | Task: {task} | Response: {response}"
)
def return_history(self) -> dict:
return {
"history": [
{
"agent_name": log.agent_name,
"task": log.task,
"response": log.response,
}
for log in self.logs
]
}
# Circular Swarm: Agents pass tasks in a circular manner
def circular_swarm(
agents: AgentListType,
tasks: List[str],
return_full_history: bool = True,
) -> Union[str, List[str]]:
if not agents or not tasks:
raise ValueError("Agents and tasks lists cannot be empty.")
conversation = Conversation()
responses = [] responses = []
for task in tasks: for task in tasks:
for agent in agents: for agent in agents:
# Log the task response = agent.run(task)
out = agent.run(task) conversation.add_log(
# print(f"Task: {task}, Response {out}") agent_name=agent.agent_name,
# prompt = f"Task: {task}, Response {out}" task=task,
logger.info(f"Agent: {agent.agent_name} Response {out}") response=response,
conversation.add(
role=agent.agent_name,
content=out,
) )
responses.append(response)
# Response list return (
responses.append(out) conversation.return_history()
if return_full_history
if return_full_history: else responses
return conversation.return_history_as_string() )
else:
return responses
# @swarm_registry() def grid_swarm(agents: AgentListType, tasks: List[str]):
grid_size = int(
len(agents) ** 0.5
) # Assuming agents can form a perfect square grid
for i in range(grid_size):
for j in range(grid_size):
if tasks:
task = tasks.pop(0)
agents[i * grid_size + j].run(task)
# Linear Swarm: Agents process tasks in a sequential linear manner
def linear_swarm( def linear_swarm(
name: str = "Linear Swarm", agents: AgentListType,
description: str = "A linear swarm is a type of swarm where agents pass tasks in a linear manner.", tasks: List[str],
agents: AgentListType = None,
tasks: List[str] = None,
conversation: Conversation = None,
return_full_history: bool = True, return_full_history: bool = True,
): ) -> Union[str, List[str]]:
if not agents: if not agents or not tasks:
raise ValueError("Agents list cannot be empty.") raise ValueError("Agents and tasks lists cannot be empty.")
if not tasks:
raise ValueError("Tasks list cannot be empty.")
if not conversation:
conversation = Conversation(
time_enabled=True,
)
conversation = Conversation()
responses = [] responses = []
for i in range(len(agents)): for agent in agents:
if tasks: if tasks:
task = tasks.pop(0) task = tasks.pop(0)
out = agents[i].run(task) response = agent.run(task)
conversation.add_log(
conversation.add( agent_name=agent.agent_name,
role=agents[i].agent_name, task=task,
content=f"Task: {task}, Response {out}", response=response,
) )
responses.append(response)
responses.append(out) return (
conversation.return_history()
if return_full_history: if return_full_history
return conversation.return_history_as_string() else responses
else:
return responses
# print(SwarmRegistry().list_swarms())
# def linear_swarm(agents: AgentListType, tasks: List[str]):
# logger.info(f"Running linear swarm with {len(agents)} agents")
# for i in range(len(agents)):
# if tasks:
# task = tasks.pop(0)
# agents[i].run(task)
def star_swarm(agents: AgentListType, tasks: List[str]) -> str:
logger.info(
f"Running star swarm with {len(agents)} agents and {len(tasks)} tasks"
) )
if not agents:
raise ValueError("Agents list cannot be empty.")
if not tasks:
raise ValueError("Tasks list cannot be empty.")
conversation = Conversation(time_enabled=True) # Star Swarm: A central agent first processes all tasks, followed by others
center_agent = agents[0] def star_swarm(
agents: AgentListType,
tasks: List[str],
return_full_history: bool = True,
) -> Union[str, List[str]]:
if not agents or not tasks:
raise ValueError("Agents and tasks lists cannot be empty.")
conversation = Conversation()
center_agent = agents[0] # The central agent
responses = [] responses = []
for task in tasks: for task in tasks:
# Central agent processes the task
center_response = center_agent.run(task)
conversation.add_log(
agent_name=center_agent.agent_name,
task=task,
response=center_response,
)
responses.append(center_response)
out = center_agent.run(task) # Other agents process the same task
log = f"Agent: {center_agent.agent_name} Response {out}"
logger.info(log)
conversation.add(center_agent.agent_name, out)
responses.append(out)
for agent in agents[1:]: for agent in agents[1:]:
response = agent.run(task)
conversation.add_log(
agent_name=agent.agent_name,
task=task,
response=response,
)
responses.append(response)
output = agent.run(task) return (
log_two = f"Agent: {agent.agent_name} Response {output}" conversation.return_history()
logger.info(log_two) if return_full_history
conversation.add(agent.agent_name, output) else responses
responses.append(out) )
out = concat_strings(responses)
print(out)
return out
# Mesh Swarm: Agents work on tasks randomly from a task queue until all tasks are processed
def mesh_swarm(
agents: AgentListType,
tasks: List[str],
return_full_history: bool = True,
) -> Union[str, List[str]]:
if not agents or not tasks:
raise ValueError("Agents and tasks lists cannot be empty.")
def mesh_swarm(agents: AgentListType, tasks: List[str]): conversation = Conversation()
task_queue = tasks.copy() task_queue = tasks.copy()
responses = []
while task_queue: while task_queue:
for agent in agents: for agent in agents:
if task_queue: if task_queue:
task = task_queue.pop(0) task = task_queue.pop(0)
agent.run(task) response = agent.run(task)
conversation.add_log(
agent_name=agent.agent_name,
task=task,
response=response,
)
responses.append(response)
return (
conversation.return_history()
if return_full_history
else responses
)
def grid_swarm(agents: AgentListType, tasks: List[str]): # Pyramid Swarm: Agents are arranged in a pyramid structure
grid_size = int( def pyramid_swarm(
len(agents) ** 0.5 agents: AgentListType,
) # Assuming agents can form a perfect square grid tasks: List[str],
for i in range(grid_size): return_full_history: bool = True,
for j in range(grid_size): ) -> Union[str, List[str]]:
if tasks: if not agents or not tasks:
task = tasks.pop(0) raise ValueError("Agents and tasks lists cannot be empty.")
agents[i * grid_size + j].run(task)
conversation = Conversation()
responses = []
def pyramid_swarm(agents: AgentListType, tasks: List[str]):
levels = int( levels = int(
(-1 + (1 + 8 * len(agents)) ** 0.5) / 2 (-1 + (1 + 8 * len(agents)) ** 0.5) / 2
) # Assuming agents can form a perfect pyramid ) # Number of levels in the pyramid
for i in range(levels): for i in range(levels):
for j in range(i + 1): for j in range(i + 1):
if tasks: if tasks:
task = tasks.pop(0) task = tasks.pop(0)
agents[int(i * (i + 1) / 2 + j)].run(task) agent_index = int(i * (i + 1) / 2 + j)
response = agents[agent_index].run(task)
conversation.add_log(
agent_name=agents[agent_index].agent_name,
task=task,
response=response,
)
responses.append(response)
return (
conversation.return_history()
if return_full_history
else responses
)
def fibonacci_swarm(agents: AgentListType, tasks: List[str]): def fibonacci_swarm(agents: AgentListType, tasks: List[str]):
@ -318,86 +366,92 @@ async def one_to_three(
raise error raise error
async def broadcast( """
sender: Agent, This module contains functions for facilitating communication between agents in a swarm. It includes methods for one-to-one communication, broadcasting, and other swarm architectures.
agents: AgentListType, """
task: str,
):
# One-to-One Communication between two agents
def one_to_one(
sender: Agent, receiver: Agent, task: str, max_loops: int = 1
) -> str:
""" """
Broadcasts a message from the sender agent to a list of agents. Facilitates one-to-one communication between two agents. The sender and receiver agents exchange messages for a specified number of loops.
Args: Args:
sender (Agent): The agent sending the message. sender (Agent): The agent sending the message.
agents (AgentListType): The list of agents to receive the message. receiver (Agent): The agent receiving the message.
task (str): The message to be broadcasted. task (str): The message to be sent.
max_loops (int, optional): The number of times the sender and receiver exchange messages. Defaults to 1.
Raises:
Exception: If an error occurs during the broadcast.
Returns: Returns:
None str: The conversation history between the sender and receiver.
"""
if not sender:
raise ValueError("The sender cannot be empty.")
if not agents: Raises:
raise ValueError("The agents list cannot be empty.") Exception: If there is an error during the communication process.
"""
if not task: conversation = Conversation()
raise ValueError("The task cannot be empty.") responses = []
try: try:
receive_tasks = [] for _ in range(max_loops):
for agent in agents: # Sender processes the task
receive_tasks.append( sender_response = sender.run(task)
agent.receive_message(sender.agent_name, task) conversation.add_log(
agent_name=sender.agent_name,
task=task,
response=sender_response,
) )
responses.append(sender_response)
# Receiver processes the result of the sender
receiver_response = receiver.run(sender_response)
conversation.add_log(
agent_name=receiver.agent_name,
task=task,
response=receiver_response,
)
responses.append(receiver_response)
await asyncio.gather(*receive_tasks)
except Exception as error: except Exception as error:
logger.error( logger.error(
f"[ERROR][CLASS: Agent][METHOD: broadcast] {error}" f"Error during one_to_one communication: {error}"
) )
raise error raise error
return conversation.return_history()
def one_to_one(
sender: Agent, # Broadcasting: A message from one agent to many
receiver: Agent, async def broadcast(
task: str, sender: Agent, agents: AgentListType, task: str
max_loops: int = 1, ) -> None:
):
""" """
Sends a message from the sender agent to the receiver agent. Facilitates broadcasting of a message from one agent to multiple agents.
Args: Args:
sender (Agent): The agent sending the message. sender (Agent): The agent sending the message.
receiver (Agent): The agent to receive the message. agents (AgentListType): The list of agents to receive the message.
task (str): The message to be sent. task (str): The message to be sent.
Raises: Raises:
Exception: If an error occurs during the message sending. ValueError: If the sender, agents, or task is empty.
Exception: If there is an error during the broadcasting process.
Returns:
None
""" """
try: conversation = Conversation()
responses = []
responses.append(task)
for i in range(max_loops):
# Run the agent on the task then pass the response to the receiver if not sender or not agents or not task:
response = sender.run(task) raise ValueError("Sender, agents, and task cannot be empty.")
log = f"Agent {sender.agent_name} Response: {response}"
responses.append(log)
# Send the response to the receiver try:
out = receiver.run(concat_strings(responses)) receive_tasks = []
responses.append(out) for agent in agents:
receive_tasks.append(agent.run(task))
conversation.add_log(
agent_name=agent.agent_name, task=task, response=task
)
return concat_strings(responses) await asyncio.gather(*receive_tasks)
except Exception as error: except Exception as error:
logger.error( logger.error(f"Error during broadcast: {error}")
f"[ERROR][CLASS: Agent][METHOD: one_to_one] {error}"
)
raise error raise error

Loading…
Cancel
Save