swarms api pricing docs

pull/791/merge
Kye Gomez 1 month ago
parent fdb2be5ee5
commit d25b43c9d9

@ -270,13 +270,15 @@ nav:
- Swarms Cloud API: - Swarms Cloud API:
# - Overview: "swarms_cloud/main.md" # - Overview: "swarms_cloud/main.md"
# - Overview: "swarms_cloud/vision.md" # - Overview: "swarms_cloud/vision.md"
- Overview: "swarms_cloud/launch.md" # - Overview: "swarms_cloud/launch.md"
- Deploying Swarms on Google Cloud Run: "swarms_cloud/cloud_run.md" - Overview: "swarms_cloud/swarms_api.md"
- Swarms API Pricing: "swarms_cloud/api_pricing.md"
# - Swarms Cloud CLI: "swarms_cloud/cli.md" # - Swarms Cloud CLI: "swarms_cloud/cli.md"
- Swarm APIs: - Swarm Ecosystem APIs:
- Swarms API: "swarms_cloud/swarms_api.md"
- MCS API: "swarms_cloud/mcs_api.md" - MCS API: "swarms_cloud/mcs_api.md"
- CreateNow API: "swarms_cloud/create_api.md" - CreateNow API: "swarms_cloud/create_api.md"
- Self Hosted Swarms:
- Deploying Swarms on Google Cloud Run: "swarms_cloud/cloud_run.md"
- Swarms Memory: - Swarms Memory:
- Overview: "swarms_memory/index.md" - Overview: "swarms_memory/index.md"
- Memory Systems: - Memory Systems:

@ -0,0 +1,159 @@
# Swarm Agent API Pricing Documentation
## Overview
The Swarm Agent API provides a powerful platform for managing and executing Python agents in the cloud without requiring Docker or Kubernetes. This document outlines the pricing model, how costs are calculated, and how to purchase and manage your credits.
Our pricing is designed to be transparent and cost-effective, with costs based on:
- Number of agents used
- Input and output token usage
- Execution time
## Credit System
The Swarm API operates on a credit-based system:
- **Credits** are the currency used within the platform
- 1 credit = $1 USD
- Credits can be purchased with USD or $swarms Solana tokens
- Two types of credits:
- **Standard Credits**: Purchased credits that never expire
- **Free Credits**: Promotional credits that may have usage restrictions
## Pricing Structure
### Base Costs
| Cost Component | Price |
|----------------|-------|
| Base cost per agent | $0.01 per agent |
### Token Usage Costs
| Token Type | Cost |
|------------|------|
| Input tokens | $2.00 per 1M tokens |
| Output tokens | $4.50 per 1M tokens |
### Night-Time Discount
To encourage efficient resource usage during off-peak hours, we offer significant discounts for operations performed during California night-time hours:
| Time Period (Pacific Time) | Discount |
|----------------------------|----------|
| 8:00 PM to 6:00 AM | 75% off token costs |
## Cost Calculation
### Formula
The total cost for a swarm execution is calculated as follows:
```
Total Cost = (Number of Agents × $0.01) +
(Total Input Tokens / 1M × $2.00 × Number of Agents) +
(Total Output Tokens / 1M × $4.50 × Number of Agents)
```
With night-time discount applied:
```
Input Token Cost = Input Token Cost × 0.25
Output Token Cost = Output Token Cost × 0.25
```
### Example Scenarios
#### Scenario 1: Basic Workflow (Day-time)
- 3 agents
- 10,000 input tokens total
- 25,000 output tokens total
**Calculation:**
- Agent cost: 3 × $0.01 = $0.03
- Input token cost: (10,000 / 1,000,000) × $2.00 × 3 = $0.06
- Output token cost: (25,000 / 1,000,000) × $4.50 × 3 = $0.3375
- **Total cost: $0.4275**
#### Scenario 2: Complex Workflow (Night-time)
- 5 agents
- 50,000 input tokens total
- 125,000 output tokens total
**Calculation:**
- Agent cost: 5 × $0.01 = $0.05
- Input token cost: (50,000 / 1,000,000) × $2.00 × 5 × 0.25 = $0.125
- Output token cost: (125,000 / 1,000,000) × $4.50 × 5 × 0.25 = $0.703125
- **Total cost: $0.878125**
## Purchasing Credits
Credits can be purchased through our platform in two ways:
1. **USD Payment**
- Available through our [account page](https://swarms.world/platform/account)
- Secure payment processing
- Minimum purchase: $10
2. **$swarms Token Payment**
- Use Solana-based $swarms tokens
- Tokens can be purchased on supported exchanges
- Connect your Solana wallet on our [account page](https://swarms.world/platform/account)
## Free Credits
We occasionally offer free credits to:
- New users (welcome bonus)
- During promotional periods
- For educational and research purposes
Notes about free credits:
- Used before standard credits
- May have expiration dates
- May have usage restrictions
## Billing and Usage Tracking
Track your credit usage through our comprehensive logging and reporting features:
1. **API Logs**
- Access detailed logs via the `/v1/swarm/logs` endpoint
- View cost breakdowns for each execution
2. **Dashboard**
- Real-time credit balance display
- Historical usage graphs
- Detailed cost analysis
- Available at [https://swarms.world/platform/dashboard](https://swarms.world/platform/dashboard)
## FAQ
**Q: Is there a minimum credit purchase?**
A: Yes, the minimum credit purchase is $10 USD equivalent.
**Q: Do credits expire?**
A: Standard credits do not expire. Free promotional credits may have expiration dates.
**Q: How is the night-time discount applied?**
A: The system automatically detects the execution time based on Pacific Time (America/Los_Angeles) and applies a 75% discount to token costs for executions between 8:00 PM and 6:00 AM.
**Q: What happens if I run out of credits during execution?**
A: Executions will fail with a 402 Payment Required error if sufficient credits are not available. We recommend maintaining a credit balance appropriate for your usage patterns.
**Q: Can I get a refund for unused credits?**
A: Please contact our support team for refund requests for unused credits.
**Q: Are there volume discounts available?**
A: Yes, please contact our sales team for enterprise pricing and volume discounts.
## References
- [Swarm API Documentation](https://docs.swarms.world)
- [Account Management Portal](https://swarms.world/platform/account)
- [Swarm Types Reference](https://docs.swarms.world/swarm-types)
- [Token Usage Guide](https://docs.swarms.world/token-usage)
- [API Reference](https://docs.swarms.world/api-reference)
---
For additional questions or custom pricing options, please contact our support team at kye@swarms.world.

@ -341,6 +341,7 @@ class Agent:
load_state_path: str = None, load_state_path: str = None,
role: agent_roles = "worker", role: agent_roles = "worker",
no_print: bool = False, no_print: bool = False,
tools_list_dictionary: Optional[List[Dict[str, Any]]] = None,
*args, *args,
**kwargs, **kwargs,
): ):
@ -459,7 +460,7 @@ class Agent:
self.load_state_path = load_state_path self.load_state_path = load_state_path
self.role = role self.role = role
self.no_print = no_print self.no_print = no_print
self.tools_list_dictionary = tools_list_dictionary
# Initialize the short term memory # Initialize the short term memory
self.short_memory = Conversation( self.short_memory = Conversation(
system_prompt=system_prompt, system_prompt=system_prompt,
@ -596,6 +597,11 @@ class Agent:
return llm return llm
def prepare_tools_list_dictionary(self):
import json
return json.loads(self.tools_list_dictionary)
def check_if_no_prompt_then_autogenerate(self, task: str = None): def check_if_no_prompt_then_autogenerate(self, task: str = None):
""" """
Checks if auto_generate_prompt is enabled and generates a prompt by combining agent name, description and system prompt if available. Checks if auto_generate_prompt is enabled and generates a prompt by combining agent name, description and system prompt if available.

@ -1,12 +1,13 @@
import datetime import datetime
import json import json
from typing import Any, Optional from typing import Any, Optional, Union
import yaml import yaml
from swarms.structs.base_structure import BaseStructure from swarms.structs.base_structure import BaseStructure
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from swarms.utils.formatter import formatter from swarms.utils.formatter import formatter
if TYPE_CHECKING: if TYPE_CHECKING:
from swarms.structs.agent import ( from swarms.structs.agent import (
Agent, Agent,
@ -110,7 +111,13 @@ class Conversation(BaseStructure):
if tokenizer is not None: if tokenizer is not None:
self.truncate_memory_with_tokenizer() self.truncate_memory_with_tokenizer()
def add(self, role: str, content: str, *args, **kwargs): def add(
self,
role: str,
content: Union[str, dict, list],
*args,
**kwargs,
):
"""Add a message to the conversation history """Add a message to the conversation history
Args: Args:
@ -118,18 +125,20 @@ class Conversation(BaseStructure):
content (str): The content of the message content (str): The content of the message
""" """
if self.time_enabled: now = datetime.datetime.now()
now = datetime.datetime.now() timestamp = now.strftime("%Y-%m-%d %H:%M:%S")
timestamp = now.strftime("%Y-%m-%d %H:%M:%S")
if isinstance(content, dict) or isinstance(content, list):
message = { message = {
"role": role, "role": role,
"content": content, "content": content,
"timestamp": timestamp,
} }
else: else:
message = { message = {
"role": role, "role": role,
"content": content, "content": f"Time: {timestamp} \n{content}",
} }
self.conversation_history.append(message) self.conversation_history.append(message)
@ -425,13 +434,30 @@ class Conversation(BaseStructure):
for message in self.conversation_history for message in self.conversation_history
] ]
def add_tool_output_to_agent(self, role: str, tool_output: dict):
"""
Add a tool output to the conversation history
"""
self.add(role, tool_output)
def return_json(self):
return json.dumps(
self.return_messages_as_dictionary(), indent=4
)
# Example usage # # Example usage
# # conversation = Conversation()
# conversation = Conversation() # conversation = Conversation()
# conversation.add("user", "Hello, how are you?") # conversation.add("user", "Hello, how are you?")
# print(conversation.get_last_message_as_string()) # conversation.add(
# print(conversation.return_messages_as_list()) # "assistant", {"name": "tool_1", "output": "Hello, how are you?"}
# conversation.add("assistant", "I am doing well, thanks.") # )
# # print(conversation.to_json()) # print(conversation.return_json())
# print(type(conversation.to_dict()))
# # print(conversation.get_last_message_as_string())
# # print(conversation.return_messages_as_list())
# # conversation.add("assistant", "I am doing well, thanks.")
# # # print(conversation.to_json())
# # print(type(conversation.to_dict()))
# # print(conversation.to_yaml()) # # print(conversation.to_yaml())

@ -0,0 +1,895 @@
import math
from typing import List, Union, Dict
from loguru import logger
from swarms.structs.agent import Agent
from swarms.structs.omni_agent_types import AgentListType
from swarms.structs.conversation import Conversation
# Base Swarm class that all other swarm types will inherit from
class BaseSwarm:
def __init__(self, agents: AgentListType):
# Ensure agents is a flat list of Agent objects
self.agents = (
[agent for sublist in agents for agent in sublist]
if isinstance(agents[0], list)
else agents
)
self.conversation = Conversation()
def run(
self, tasks: List[str], return_type: str = "dict"
) -> Union[Dict, List, str]:
"""
Run the swarm with the given tasks
Args:
tasks: List of tasks to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns:
Union[Dict, List, str]: The conversation history in the requested format
"""
if not self.agents or not tasks:
raise ValueError(
"Agents and tasks lists cannot be empty."
)
# Implementation will be overridden by child classes
raise NotImplementedError(
"This method should be implemented by child classes"
)
def _format_return(
self, return_type: str
) -> Union[Dict, List, str]:
"""Format the return value based on the return_type"""
if return_type.lower() == "dict":
return self.conversation.return_messages_as_dictionary()
elif return_type.lower() == "list":
return self.conversation.return_messages_as_list()
elif return_type.lower() == "string":
return self.conversation.return_history_as_string()
else:
raise ValueError(
"return_type must be one of 'dict', 'list', or 'string'"
)
class CircularSwarm(BaseSwarm):
"""
Implements a circular swarm where agents pass tasks in a circular manner.
"""
def run(
self, tasks: List[str], return_type: str = "dict"
) -> Union[Dict, List, str]:
"""
Run the circular swarm with the given tasks
Args:
tasks: List of tasks to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns:
Union[Dict, List, str]: The conversation history in the requested format
"""
if not self.agents or not tasks:
raise ValueError(
"Agents and tasks lists cannot be empty."
)
responses = []
for task in tasks:
for agent in self.agents:
response = agent.run(task)
self.conversation.add(
role=agent.agent_name,
content=response,
)
responses.append(response)
return self._format_return(return_type)
class LinearSwarm(BaseSwarm):
"""
Implements a linear swarm where agents process tasks sequentially.
"""
def run(
self, tasks: List[str], return_type: str = "dict"
) -> Union[Dict, List, str]:
"""
Run the linear swarm with the given tasks
Args:
tasks: List of tasks to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns:
Union[Dict, List, str]: The conversation history in the requested format
"""
if not self.agents or not tasks:
raise ValueError(
"Agents and tasks lists cannot be empty."
)
tasks_copy = tasks.copy()
responses = []
for agent in self.agents:
if tasks_copy:
task = tasks_copy.pop(0)
response = agent.run(task)
self.conversation.add(
role=agent.agent_name,
content=response,
)
responses.append(response)
return self._format_return(return_type)
class StarSwarm(BaseSwarm):
"""
Implements a star swarm where a central agent processes all tasks, followed by others.
"""
def run(
self, tasks: List[str], return_type: str = "dict"
) -> Union[Dict, List, str]:
"""
Run the star swarm with the given tasks
Args:
tasks: List of tasks to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns:
Union[Dict, List, str]: The conversation history in the requested format
"""
if not self.agents or not tasks:
raise ValueError(
"Agents and tasks lists cannot be empty."
)
responses = []
center_agent = self.agents[0] # The central agent
for task in tasks:
# Central agent processes the task
center_response = center_agent.run(task)
self.conversation.add(
role=center_agent.agent_name,
content=center_response,
)
responses.append(center_response)
# Other agents process the same task
for agent in self.agents[1:]:
response = agent.run(task)
self.conversation.add(
role=agent.agent_name,
content=response,
)
responses.append(response)
return self._format_return(return_type)
class MeshSwarm(BaseSwarm):
"""
Implements a mesh swarm where agents work on tasks randomly from a task queue.
"""
def run(
self, tasks: List[str], return_type: str = "dict"
) -> Union[Dict, List, str]:
"""
Run the mesh swarm with the given tasks
Args:
tasks: List of tasks to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns:
Union[Dict, List, str]: The conversation history in the requested format
"""
if not self.agents or not tasks:
raise ValueError(
"Agents and tasks lists cannot be empty."
)
task_queue = tasks.copy()
responses = []
while task_queue:
for agent in self.agents:
if task_queue:
task = task_queue.pop(0)
response = agent.run(task)
self.conversation.add(
role=agent.agent_name,
content=response,
)
responses.append(response)
return self._format_return(return_type)
class PyramidSwarm(BaseSwarm):
"""
Implements a pyramid swarm where agents are arranged in a pyramid structure.
"""
def run(
self, tasks: List[str], return_type: str = "dict"
) -> Union[Dict, List, str]:
"""
Run the pyramid swarm with the given tasks
Args:
tasks: List of tasks to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns:
Union[Dict, List, str]: The conversation history in the requested format
"""
if not self.agents or not tasks:
raise ValueError(
"Agents and tasks lists cannot be empty."
)
tasks_copy = tasks.copy()
responses = []
levels = int(
(-1 + (1 + 8 * len(self.agents)) ** 0.5) / 2
) # Number of levels in the pyramid
for i in range(levels):
for j in range(i + 1):
if tasks_copy:
task = tasks_copy.pop(0)
agent_index = int(i * (i + 1) / 2 + j)
if agent_index < len(self.agents):
response = self.agents[agent_index].run(task)
self.conversation.add(
role=self.agents[agent_index].agent_name,
content=response,
)
responses.append(response)
return self._format_return(return_type)
class FibonacciSwarm(BaseSwarm):
"""
Implements a Fibonacci swarm where agents are arranged according to the Fibonacci sequence.
"""
def run(
self, tasks: List[str], return_type: str = "dict"
) -> Union[Dict, List, str]:
"""
Run the Fibonacci swarm with the given tasks
Args:
tasks: List of tasks to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns:
Union[Dict, List, str]: The conversation history in the requested format
"""
if not self.agents or not tasks:
raise ValueError(
"Agents and tasks lists cannot be empty."
)
tasks_copy = tasks.copy()
responses = []
fib = [1, 1]
while len(fib) < len(self.agents):
fib.append(fib[-1] + fib[-2])
for i in range(len(fib)):
for j in range(fib[i]):
agent_index = int(sum(fib[:i]) + j)
if agent_index < len(self.agents) and tasks_copy:
task = tasks_copy.pop(0)
response = self.agents[agent_index].run(task)
self.conversation.add(
role=self.agents[agent_index].agent_name,
content=response,
)
responses.append(response)
return self._format_return(return_type)
class PrimeSwarm(BaseSwarm):
"""
Implements a Prime swarm where agents at prime indices process tasks.
"""
def run(
self, tasks: List[str], return_type: str = "dict"
) -> Union[Dict, List, str]:
"""
Run the Prime swarm with the given tasks
Args:
tasks: List of tasks to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns:
Union[Dict, List, str]: The conversation history in the requested format
"""
if not self.agents or not tasks:
raise ValueError(
"Agents and tasks lists cannot be empty."
)
tasks_copy = tasks.copy()
responses = []
primes = [
2,
3,
5,
7,
11,
13,
17,
19,
23,
29,
31,
37,
41,
43,
47,
53,
59,
61,
67,
71,
73,
79,
83,
89,
97,
] # First 25 prime numbers
for prime in primes:
if prime < len(self.agents) and tasks_copy:
task = tasks_copy.pop(0)
response = self.agents[prime].run(task)
self.conversation.add(
role=self.agents[prime].agent_name,
content=response,
)
responses.append(response)
return self._format_return(return_type)
class PowerSwarm(BaseSwarm):
"""
Implements a Power swarm where agents at power-of-2 indices process tasks.
"""
def run(
self, tasks: List[str], return_type: str = "dict"
) -> Union[Dict, List, str]:
"""
Run the Power swarm with the given tasks
Args:
tasks: List of tasks to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns:
Union[Dict, List, str]: The conversation history in the requested format
"""
if not self.agents or not tasks:
raise ValueError(
"Agents and tasks lists cannot be empty."
)
tasks_copy = tasks.copy()
responses = []
powers = [2**i for i in range(int(len(self.agents) ** 0.5))]
for power in powers:
if power < len(self.agents) and tasks_copy:
task = tasks_copy.pop(0)
response = self.agents[power].run(task)
self.conversation.add(
role=self.agents[power].agent_name,
content=response,
)
responses.append(response)
return self._format_return(return_type)
class LogSwarm(BaseSwarm):
"""
Implements a Log swarm where agents at logarithmic indices process tasks.
"""
def run(
self, tasks: List[str], return_type: str = "dict"
) -> Union[Dict, List, str]:
"""
Run the Log swarm with the given tasks
Args:
tasks: List of tasks to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns:
Union[Dict, List, str]: The conversation history in the requested format
"""
if not self.agents or not tasks:
raise ValueError(
"Agents and tasks lists cannot be empty."
)
tasks_copy = tasks.copy()
responses = []
for i in range(len(self.agents)):
index = 2**i
if index < len(self.agents) and tasks_copy:
task = tasks_copy.pop(0)
response = self.agents[index].run(task)
self.conversation.add(
role=self.agents[index].agent_name,
content=response,
)
responses.append(response)
return self._format_return(return_type)
class ExponentialSwarm(BaseSwarm):
"""
Implements an Exponential swarm where agents at exponential indices process tasks.
"""
def run(
self, tasks: List[str], return_type: str = "dict"
) -> Union[Dict, List, str]:
"""
Run the Exponential swarm with the given tasks
Args:
tasks: List of tasks to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns:
Union[Dict, List, str]: The conversation history in the requested format
"""
if not self.agents or not tasks:
raise ValueError(
"Agents and tasks lists cannot be empty."
)
tasks_copy = tasks.copy()
responses = []
for i in range(len(self.agents)):
index = min(int(2**i), len(self.agents) - 1)
if tasks_copy:
task = tasks_copy.pop(0)
response = self.agents[index].run(task)
self.conversation.add(
role=self.agents[index].agent_name,
content=response,
)
responses.append(response)
return self._format_return(return_type)
class GeometricSwarm(BaseSwarm):
"""
Implements a Geometric swarm where agents at geometrically increasing indices process tasks.
"""
def run(
self, tasks: List[str], return_type: str = "dict"
) -> Union[Dict, List, str]:
"""
Run the Geometric swarm with the given tasks
Args:
tasks: List of tasks to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns:
Union[Dict, List, str]: The conversation history in the requested format
"""
if not self.agents or not tasks:
raise ValueError(
"Agents and tasks lists cannot be empty."
)
tasks_copy = tasks.copy()
responses = []
ratio = 2
for i in range(len(self.agents)):
index = min(int(ratio**i), len(self.agents) - 1)
if tasks_copy:
task = tasks_copy.pop(0)
response = self.agents[index].run(task)
self.conversation.add(
role=self.agents[index].agent_name,
content=response,
)
responses.append(response)
return self._format_return(return_type)
class HarmonicSwarm(BaseSwarm):
"""
Implements a Harmonic swarm where agents at harmonically spaced indices process tasks.
"""
def run(
self, tasks: List[str], return_type: str = "dict"
) -> Union[Dict, List, str]:
"""
Run the Harmonic swarm with the given tasks
Args:
tasks: List of tasks to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns:
Union[Dict, List, str]: The conversation history in the requested format
"""
if not self.agents or not tasks:
raise ValueError(
"Agents and tasks lists cannot be empty."
)
tasks_copy = tasks.copy()
responses = []
for i in range(1, len(self.agents) + 1):
index = min(
int(len(self.agents) / i), len(self.agents) - 1
)
if tasks_copy:
task = tasks_copy.pop(0)
response = self.agents[index].run(task)
self.conversation.add(
role=self.agents[index].agent_name,
content=response,
)
responses.append(response)
return self._format_return(return_type)
class StaircaseSwarm(BaseSwarm):
"""
Implements a Staircase swarm where agents at staircase-patterned indices process a task.
"""
def run(
self, task: str, return_type: str = "dict"
) -> Union[Dict, List, str]:
"""
Run the Staircase swarm with the given task
Args:
task: Task to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns:
Union[Dict, List, str]: The conversation history in the requested format
"""
if not self.agents or not task:
raise ValueError("Agents and task cannot be empty.")
responses = []
step = len(self.agents) // 5
for i in range(len(self.agents)):
index = (i // step) * step
if index < len(self.agents):
response = self.agents[index].run(task)
self.conversation.add(
role=self.agents[index].agent_name,
content=response,
)
responses.append(response)
return self._format_return(return_type)
class SigmoidSwarm(BaseSwarm):
"""
Implements a Sigmoid swarm where agents at sigmoid-distributed indices process a task.
"""
def run(
self, task: str, return_type: str = "dict"
) -> Union[Dict, List, str]:
"""
Run the Sigmoid swarm with the given task
Args:
task: Task to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns:
Union[Dict, List, str]: The conversation history in the requested format
"""
if not self.agents or not task:
raise ValueError("Agents and task cannot be empty.")
responses = []
for i in range(len(self.agents)):
index = int(len(self.agents) / (1 + math.exp(-i)))
if index < len(self.agents):
response = self.agents[index].run(task)
self.conversation.add(
role=self.agents[index].agent_name,
content=response,
)
responses.append(response)
return self._format_return(return_type)
class SinusoidalSwarm(BaseSwarm):
"""
Implements a Sinusoidal swarm where agents at sinusoidally-distributed indices process a task.
"""
def run(
self, task: str, return_type: str = "dict"
) -> Union[Dict, List, str]:
"""
Run the Sinusoidal swarm with the given task
Args:
task: Task to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns:
Union[Dict, List, str]: The conversation history in the requested format
"""
if not self.agents or not task:
raise ValueError("Agents and task cannot be empty.")
responses = []
for i in range(len(self.agents)):
index = int((math.sin(i) + 1) / 2 * len(self.agents))
if index < len(self.agents):
response = self.agents[index].run(task)
self.conversation.add(
role=self.agents[index].agent_name,
content=response,
)
responses.append(response)
return self._format_return(return_type)
# Communication classes
class OneToOne:
"""
Facilitates one-to-one communication between two agents.
"""
def __init__(self, sender: Agent, receiver: Agent):
self.sender = sender
self.receiver = receiver
self.conversation = Conversation()
def run(
self, task: str, max_loops: int = 1, return_type: str = "dict"
) -> Union[Dict, List, str]:
"""
Run the one-to-one communication with the given task
Args:
task: Task to be processed
max_loops: Number of exchange iterations
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns:
Union[Dict, List, str]: The conversation history in the requested format
"""
if not self.sender or not self.receiver or not task:
raise ValueError(
"Sender, receiver, and task cannot be empty."
)
responses = []
try:
for loop in range(max_loops):
# Sender processes the task
sender_response = self.sender.run(task)
self.conversation.add(
role=self.sender.agent_name,
content=sender_response,
)
responses.append(sender_response)
# Receiver processes the result of the sender
receiver_response = self.receiver.run(sender_response)
self.conversation.add(
role=self.receiver.agent_name,
content=receiver_response,
)
responses.append(receiver_response)
# Update task for next loop if needed
if loop < max_loops - 1:
task = receiver_response
except Exception as error:
logger.error(
f"Error during one_to_one communication: {error}"
)
raise error
if return_type.lower() == "dict":
return self.conversation.return_messages_as_dictionary()
elif return_type.lower() == "list":
return self.conversation.return_messages_as_list()
elif return_type.lower() == "string":
return self.conversation.return_history_as_string()
else:
raise ValueError(
"return_type must be one of 'dict', 'list', or 'string'"
)
class Broadcast:
"""
Facilitates broadcasting from one agent to many agents.
"""
def __init__(self, sender: Agent, receivers: AgentListType):
self.sender = sender
self.receivers = (
[agent for sublist in receivers for agent in sublist]
if isinstance(receivers[0], list)
else receivers
)
self.conversation = Conversation()
def run(
self, task: str, return_type: str = "dict"
) -> Union[Dict, List, str]:
"""
Run the broadcast communication with the given task
Args:
task: Task to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns:
Union[Dict, List, str]: The conversation history in the requested format
"""
if not self.sender or not self.receivers or not task:
raise ValueError(
"Sender, receivers, and task cannot be empty."
)
try:
# First get the sender's broadcast message
broadcast_message = self.sender.run(task)
self.conversation.add(
role=self.sender.agent_name,
content=broadcast_message,
)
# Then have all receivers process it
for agent in self.receivers:
response = agent.run(broadcast_message)
self.conversation.add(
role=agent.agent_name,
content=response,
)
if return_type.lower() == "dict":
return (
self.conversation.return_messages_as_dictionary()
)
elif return_type.lower() == "list":
return self.conversation.return_messages_as_list()
elif return_type.lower() == "string":
return self.conversation.return_history_as_string()
else:
raise ValueError(
"return_type must be one of 'dict', 'list', or 'string'"
)
except Exception as error:
logger.error(f"Error during broadcast: {error}")
raise error
class OneToThree:
"""
Facilitates one-to-three communication from one agent to exactly three agents.
"""
def __init__(self, sender: Agent, receivers: AgentListType):
if len(receivers) != 3:
raise ValueError(
"The number of receivers must be exactly 3."
)
self.sender = sender
self.receivers = receivers
self.conversation = Conversation()
def run(
self, task: str, return_type: str = "dict"
) -> Union[Dict, List, str]:
"""
Run the one-to-three communication with the given task
Args:
task: Task to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns:
Union[Dict, List, str]: The conversation history in the requested format
"""
if not self.sender or not task:
raise ValueError("Sender and task cannot be empty.")
try:
# Get sender's message
sender_message = self.sender.run(task)
self.conversation.add(
role=self.sender.agent_name,
content=sender_message,
)
# Have each receiver process the message
for i, agent in enumerate(self.receivers):
response = agent.run(sender_message)
self.conversation.add(
role=agent.agent_name,
content=response,
)
if return_type.lower() == "dict":
return (
self.conversation.return_messages_as_dictionary()
)
elif return_type.lower() == "list":
return self.conversation.return_messages_as_list()
elif return_type.lower() == "string":
return self.conversation.return_history_as_string()
else:
raise ValueError(
"return_type must be one of 'dict', 'list', or 'string'"
)
except Exception as error:
logger.error(f"Error in one_to_three: {error}")
raise error
Loading…
Cancel
Save