From d25b43c9d975a1584fc7c720ad33ba77436ba7b1 Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Thu, 6 Mar 2025 10:54:06 -0800 Subject: [PATCH] swarms api pricing docs --- docs/mkdocs.yml | 10 +- docs/swarms_cloud/api_pricing.md | 159 +++++ swarms/structs/agent.py | 8 +- swarms/structs/conversation.py | 52 +- swarms/structs/various_alt_swarms.py | 895 +++++++++++++++++++++++++++ 5 files changed, 1106 insertions(+), 18 deletions(-) create mode 100644 docs/swarms_cloud/api_pricing.md create mode 100644 swarms/structs/various_alt_swarms.py diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index f7b48b81..9fbaae94 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -270,13 +270,15 @@ nav: - Swarms Cloud API: # - Overview: "swarms_cloud/main.md" # - Overview: "swarms_cloud/vision.md" - - Overview: "swarms_cloud/launch.md" - - Deploying Swarms on Google Cloud Run: "swarms_cloud/cloud_run.md" + # - Overview: "swarms_cloud/launch.md" + - Overview: "swarms_cloud/swarms_api.md" + - Swarms API Pricing: "swarms_cloud/api_pricing.md" # - Swarms Cloud CLI: "swarms_cloud/cli.md" - - Swarm APIs: - - Swarms API: "swarms_cloud/swarms_api.md" + - Swarm Ecosystem APIs: - MCS API: "swarms_cloud/mcs_api.md" - CreateNow API: "swarms_cloud/create_api.md" + - Self Hosted Swarms: + - Deploying Swarms on Google Cloud Run: "swarms_cloud/cloud_run.md" - Swarms Memory: - Overview: "swarms_memory/index.md" - Memory Systems: diff --git a/docs/swarms_cloud/api_pricing.md b/docs/swarms_cloud/api_pricing.md new file mode 100644 index 00000000..849332ba --- /dev/null +++ b/docs/swarms_cloud/api_pricing.md @@ -0,0 +1,159 @@ +# Swarm Agent API Pricing Documentation + + +## Overview + +The Swarm Agent API provides a powerful platform for managing and executing Python agents in the cloud without requiring Docker or Kubernetes. This document outlines the pricing model, how costs are calculated, and how to purchase and manage your credits. + +Our pricing is designed to be transparent and cost-effective, with costs based on: +- Number of agents used +- Input and output token usage +- Execution time + +## Credit System + +The Swarm API operates on a credit-based system: + +- **Credits** are the currency used within the platform +- 1 credit = $1 USD +- Credits can be purchased with USD or $swarms Solana tokens +- Two types of credits: + - **Standard Credits**: Purchased credits that never expire + - **Free Credits**: Promotional credits that may have usage restrictions + +## Pricing Structure + +### Base Costs + +| Cost Component | Price | +|----------------|-------| +| Base cost per agent | $0.01 per agent | + +### Token Usage Costs + +| Token Type | Cost | +|------------|------| +| Input tokens | $2.00 per 1M tokens | +| Output tokens | $4.50 per 1M tokens | + +### Night-Time Discount + +To encourage efficient resource usage during off-peak hours, we offer significant discounts for operations performed during California night-time hours: + +| Time Period (Pacific Time) | Discount | +|----------------------------|----------| +| 8:00 PM to 6:00 AM | 75% off token costs | + +## Cost Calculation + +### Formula + +The total cost for a swarm execution is calculated as follows: + +``` +Total Cost = (Number of Agents × $0.01) + + (Total Input Tokens / 1M × $2.00 × Number of Agents) + + (Total Output Tokens / 1M × $4.50 × Number of Agents) +``` + +With night-time discount applied: +``` +Input Token Cost = Input Token Cost × 0.25 +Output Token Cost = Output Token Cost × 0.25 +``` + +### Example Scenarios + +#### Scenario 1: Basic Workflow (Day-time) +- 3 agents +- 10,000 input tokens total +- 25,000 output tokens total + +**Calculation:** +- Agent cost: 3 × $0.01 = $0.03 +- Input token cost: (10,000 / 1,000,000) × $2.00 × 3 = $0.06 +- Output token cost: (25,000 / 1,000,000) × $4.50 × 3 = $0.3375 +- **Total cost: $0.4275** + +#### Scenario 2: Complex Workflow (Night-time) +- 5 agents +- 50,000 input tokens total +- 125,000 output tokens total + +**Calculation:** +- Agent cost: 5 × $0.01 = $0.05 +- Input token cost: (50,000 / 1,000,000) × $2.00 × 5 × 0.25 = $0.125 +- Output token cost: (125,000 / 1,000,000) × $4.50 × 5 × 0.25 = $0.703125 +- **Total cost: $0.878125** + +## Purchasing Credits + +Credits can be purchased through our platform in two ways: + +1. **USD Payment** + - Available through our [account page](https://swarms.world/platform/account) + - Secure payment processing + - Minimum purchase: $10 + +2. **$swarms Token Payment** + - Use Solana-based $swarms tokens + - Tokens can be purchased on supported exchanges + - Connect your Solana wallet on our [account page](https://swarms.world/platform/account) + +## Free Credits + +We occasionally offer free credits to: +- New users (welcome bonus) +- During promotional periods +- For educational and research purposes + +Notes about free credits: +- Used before standard credits +- May have expiration dates +- May have usage restrictions + +## Billing and Usage Tracking + +Track your credit usage through our comprehensive logging and reporting features: + +1. **API Logs** + - Access detailed logs via the `/v1/swarm/logs` endpoint + - View cost breakdowns for each execution + +2. **Dashboard** + - Real-time credit balance display + - Historical usage graphs + - Detailed cost analysis + - Available at [https://swarms.world/platform/dashboard](https://swarms.world/platform/dashboard) + +## FAQ + +**Q: Is there a minimum credit purchase?** +A: Yes, the minimum credit purchase is $10 USD equivalent. + +**Q: Do credits expire?** +A: Standard credits do not expire. Free promotional credits may have expiration dates. + +**Q: How is the night-time discount applied?** +A: The system automatically detects the execution time based on Pacific Time (America/Los_Angeles) and applies a 75% discount to token costs for executions between 8:00 PM and 6:00 AM. + +**Q: What happens if I run out of credits during execution?** +A: Executions will fail with a 402 Payment Required error if sufficient credits are not available. We recommend maintaining a credit balance appropriate for your usage patterns. + +**Q: Can I get a refund for unused credits?** +A: Please contact our support team for refund requests for unused credits. + +**Q: Are there volume discounts available?** +A: Yes, please contact our sales team for enterprise pricing and volume discounts. + +## References + +- [Swarm API Documentation](https://docs.swarms.world) +- [Account Management Portal](https://swarms.world/platform/account) +- [Swarm Types Reference](https://docs.swarms.world/swarm-types) +- [Token Usage Guide](https://docs.swarms.world/token-usage) +- [API Reference](https://docs.swarms.world/api-reference) + +--- + +For additional questions or custom pricing options, please contact our support team at kye@swarms.world. \ No newline at end of file diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 5a1bfb3d..16cbf0a8 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -341,6 +341,7 @@ class Agent: load_state_path: str = None, role: agent_roles = "worker", no_print: bool = False, + tools_list_dictionary: Optional[List[Dict[str, Any]]] = None, *args, **kwargs, ): @@ -459,7 +460,7 @@ class Agent: self.load_state_path = load_state_path self.role = role self.no_print = no_print - + self.tools_list_dictionary = tools_list_dictionary # Initialize the short term memory self.short_memory = Conversation( system_prompt=system_prompt, @@ -595,6 +596,11 @@ class Agent: ) return llm + + + def prepare_tools_list_dictionary(self): + import json + return json.loads(self.tools_list_dictionary) def check_if_no_prompt_then_autogenerate(self, task: str = None): """ diff --git a/swarms/structs/conversation.py b/swarms/structs/conversation.py index 6880e7d9..de00f209 100644 --- a/swarms/structs/conversation.py +++ b/swarms/structs/conversation.py @@ -1,12 +1,13 @@ import datetime import json -from typing import Any, Optional +from typing import Any, Optional, Union import yaml from swarms.structs.base_structure import BaseStructure from typing import TYPE_CHECKING from swarms.utils.formatter import formatter + if TYPE_CHECKING: from swarms.structs.agent import ( Agent, @@ -110,7 +111,13 @@ class Conversation(BaseStructure): if tokenizer is not None: self.truncate_memory_with_tokenizer() - def add(self, role: str, content: str, *args, **kwargs): + def add( + self, + role: str, + content: Union[str, dict, list], + *args, + **kwargs, + ): """Add a message to the conversation history Args: @@ -118,18 +125,20 @@ class Conversation(BaseStructure): content (str): The content of the message """ - if self.time_enabled: - now = datetime.datetime.now() - timestamp = now.strftime("%Y-%m-%d %H:%M:%S") + now = datetime.datetime.now() + timestamp = now.strftime("%Y-%m-%d %H:%M:%S") + + if isinstance(content, dict) or isinstance(content, list): + message = { "role": role, "content": content, - "timestamp": timestamp, } + else: message = { "role": role, - "content": content, + "content": f"Time: {timestamp} \n{content}", } self.conversation_history.append(message) @@ -425,13 +434,30 @@ class Conversation(BaseStructure): for message in self.conversation_history ] + def add_tool_output_to_agent(self, role: str, tool_output: dict): + """ + Add a tool output to the conversation history + """ + self.add(role, tool_output) + + def return_json(self): + return json.dumps( + self.return_messages_as_dictionary(), indent=4 + ) + -# Example usage +# # Example usage +# # conversation = Conversation() # conversation = Conversation() # conversation.add("user", "Hello, how are you?") -# print(conversation.get_last_message_as_string()) -# print(conversation.return_messages_as_list()) -# conversation.add("assistant", "I am doing well, thanks.") -# # print(conversation.to_json()) -# print(type(conversation.to_dict())) +# conversation.add( +# "assistant", {"name": "tool_1", "output": "Hello, how are you?"} +# ) +# print(conversation.return_json()) + +# # print(conversation.get_last_message_as_string()) +# # print(conversation.return_messages_as_list()) +# # conversation.add("assistant", "I am doing well, thanks.") +# # # print(conversation.to_json()) +# # print(type(conversation.to_dict())) # # print(conversation.to_yaml()) diff --git a/swarms/structs/various_alt_swarms.py b/swarms/structs/various_alt_swarms.py new file mode 100644 index 00000000..ab12b9ad --- /dev/null +++ b/swarms/structs/various_alt_swarms.py @@ -0,0 +1,895 @@ +import math +from typing import List, Union, Dict + +from loguru import logger + +from swarms.structs.agent import Agent +from swarms.structs.omni_agent_types import AgentListType +from swarms.structs.conversation import Conversation + + +# Base Swarm class that all other swarm types will inherit from +class BaseSwarm: + def __init__(self, agents: AgentListType): + # Ensure agents is a flat list of Agent objects + self.agents = ( + [agent for sublist in agents for agent in sublist] + if isinstance(agents[0], list) + else agents + ) + self.conversation = Conversation() + + def run( + self, tasks: List[str], return_type: str = "dict" + ) -> Union[Dict, List, str]: + """ + Run the swarm with the given tasks + + Args: + tasks: List of tasks to be processed + return_type: Type of return value, one of 'dict', 'list', or 'string' + + Returns: + Union[Dict, List, str]: The conversation history in the requested format + """ + if not self.agents or not tasks: + raise ValueError( + "Agents and tasks lists cannot be empty." + ) + + # Implementation will be overridden by child classes + raise NotImplementedError( + "This method should be implemented by child classes" + ) + + def _format_return( + self, return_type: str + ) -> Union[Dict, List, str]: + """Format the return value based on the return_type""" + if return_type.lower() == "dict": + return self.conversation.return_messages_as_dictionary() + elif return_type.lower() == "list": + return self.conversation.return_messages_as_list() + elif return_type.lower() == "string": + return self.conversation.return_history_as_string() + else: + raise ValueError( + "return_type must be one of 'dict', 'list', or 'string'" + ) + + +class CircularSwarm(BaseSwarm): + """ + Implements a circular swarm where agents pass tasks in a circular manner. + """ + + def run( + self, tasks: List[str], return_type: str = "dict" + ) -> Union[Dict, List, str]: + """ + Run the circular swarm with the given tasks + + Args: + tasks: List of tasks to be processed + return_type: Type of return value, one of 'dict', 'list', or 'string' + + Returns: + Union[Dict, List, str]: The conversation history in the requested format + """ + if not self.agents or not tasks: + raise ValueError( + "Agents and tasks lists cannot be empty." + ) + + responses = [] + + for task in tasks: + for agent in self.agents: + response = agent.run(task) + self.conversation.add( + role=agent.agent_name, + content=response, + ) + responses.append(response) + + return self._format_return(return_type) + + +class LinearSwarm(BaseSwarm): + """ + Implements a linear swarm where agents process tasks sequentially. + """ + + def run( + self, tasks: List[str], return_type: str = "dict" + ) -> Union[Dict, List, str]: + """ + Run the linear swarm with the given tasks + + Args: + tasks: List of tasks to be processed + return_type: Type of return value, one of 'dict', 'list', or 'string' + + Returns: + Union[Dict, List, str]: The conversation history in the requested format + """ + if not self.agents or not tasks: + raise ValueError( + "Agents and tasks lists cannot be empty." + ) + + tasks_copy = tasks.copy() + responses = [] + + for agent in self.agents: + if tasks_copy: + task = tasks_copy.pop(0) + response = agent.run(task) + self.conversation.add( + role=agent.agent_name, + content=response, + ) + responses.append(response) + + return self._format_return(return_type) + + +class StarSwarm(BaseSwarm): + """ + Implements a star swarm where a central agent processes all tasks, followed by others. + """ + + def run( + self, tasks: List[str], return_type: str = "dict" + ) -> Union[Dict, List, str]: + """ + Run the star swarm with the given tasks + + Args: + tasks: List of tasks to be processed + return_type: Type of return value, one of 'dict', 'list', or 'string' + + Returns: + Union[Dict, List, str]: The conversation history in the requested format + """ + if not self.agents or not tasks: + raise ValueError( + "Agents and tasks lists cannot be empty." + ) + + responses = [] + center_agent = self.agents[0] # The central agent + + for task in tasks: + # Central agent processes the task + center_response = center_agent.run(task) + self.conversation.add( + role=center_agent.agent_name, + content=center_response, + ) + responses.append(center_response) + + # Other agents process the same task + for agent in self.agents[1:]: + response = agent.run(task) + self.conversation.add( + role=agent.agent_name, + content=response, + ) + responses.append(response) + + return self._format_return(return_type) + + +class MeshSwarm(BaseSwarm): + """ + Implements a mesh swarm where agents work on tasks randomly from a task queue. + """ + + def run( + self, tasks: List[str], return_type: str = "dict" + ) -> Union[Dict, List, str]: + """ + Run the mesh swarm with the given tasks + + Args: + tasks: List of tasks to be processed + return_type: Type of return value, one of 'dict', 'list', or 'string' + + Returns: + Union[Dict, List, str]: The conversation history in the requested format + """ + if not self.agents or not tasks: + raise ValueError( + "Agents and tasks lists cannot be empty." + ) + + task_queue = tasks.copy() + responses = [] + + while task_queue: + for agent in self.agents: + if task_queue: + task = task_queue.pop(0) + response = agent.run(task) + self.conversation.add( + role=agent.agent_name, + content=response, + ) + responses.append(response) + + return self._format_return(return_type) + + +class PyramidSwarm(BaseSwarm): + """ + Implements a pyramid swarm where agents are arranged in a pyramid structure. + """ + + def run( + self, tasks: List[str], return_type: str = "dict" + ) -> Union[Dict, List, str]: + """ + Run the pyramid swarm with the given tasks + + Args: + tasks: List of tasks to be processed + return_type: Type of return value, one of 'dict', 'list', or 'string' + + Returns: + Union[Dict, List, str]: The conversation history in the requested format + """ + if not self.agents or not tasks: + raise ValueError( + "Agents and tasks lists cannot be empty." + ) + + tasks_copy = tasks.copy() + responses = [] + + levels = int( + (-1 + (1 + 8 * len(self.agents)) ** 0.5) / 2 + ) # Number of levels in the pyramid + + for i in range(levels): + for j in range(i + 1): + if tasks_copy: + task = tasks_copy.pop(0) + agent_index = int(i * (i + 1) / 2 + j) + if agent_index < len(self.agents): + response = self.agents[agent_index].run(task) + self.conversation.add( + role=self.agents[agent_index].agent_name, + content=response, + ) + responses.append(response) + + return self._format_return(return_type) + + +class FibonacciSwarm(BaseSwarm): + """ + Implements a Fibonacci swarm where agents are arranged according to the Fibonacci sequence. + """ + + def run( + self, tasks: List[str], return_type: str = "dict" + ) -> Union[Dict, List, str]: + """ + Run the Fibonacci swarm with the given tasks + + Args: + tasks: List of tasks to be processed + return_type: Type of return value, one of 'dict', 'list', or 'string' + + Returns: + Union[Dict, List, str]: The conversation history in the requested format + """ + if not self.agents or not tasks: + raise ValueError( + "Agents and tasks lists cannot be empty." + ) + + tasks_copy = tasks.copy() + responses = [] + + fib = [1, 1] + while len(fib) < len(self.agents): + fib.append(fib[-1] + fib[-2]) + + for i in range(len(fib)): + for j in range(fib[i]): + agent_index = int(sum(fib[:i]) + j) + if agent_index < len(self.agents) and tasks_copy: + task = tasks_copy.pop(0) + response = self.agents[agent_index].run(task) + self.conversation.add( + role=self.agents[agent_index].agent_name, + content=response, + ) + responses.append(response) + + return self._format_return(return_type) + + +class PrimeSwarm(BaseSwarm): + """ + Implements a Prime swarm where agents at prime indices process tasks. + """ + + def run( + self, tasks: List[str], return_type: str = "dict" + ) -> Union[Dict, List, str]: + """ + Run the Prime swarm with the given tasks + + Args: + tasks: List of tasks to be processed + return_type: Type of return value, one of 'dict', 'list', or 'string' + + Returns: + Union[Dict, List, str]: The conversation history in the requested format + """ + if not self.agents or not tasks: + raise ValueError( + "Agents and tasks lists cannot be empty." + ) + + tasks_copy = tasks.copy() + responses = [] + + primes = [ + 2, + 3, + 5, + 7, + 11, + 13, + 17, + 19, + 23, + 29, + 31, + 37, + 41, + 43, + 47, + 53, + 59, + 61, + 67, + 71, + 73, + 79, + 83, + 89, + 97, + ] # First 25 prime numbers + + for prime in primes: + if prime < len(self.agents) and tasks_copy: + task = tasks_copy.pop(0) + response = self.agents[prime].run(task) + self.conversation.add( + role=self.agents[prime].agent_name, + content=response, + ) + responses.append(response) + + return self._format_return(return_type) + + +class PowerSwarm(BaseSwarm): + """ + Implements a Power swarm where agents at power-of-2 indices process tasks. + """ + + def run( + self, tasks: List[str], return_type: str = "dict" + ) -> Union[Dict, List, str]: + """ + Run the Power swarm with the given tasks + + Args: + tasks: List of tasks to be processed + return_type: Type of return value, one of 'dict', 'list', or 'string' + + Returns: + Union[Dict, List, str]: The conversation history in the requested format + """ + if not self.agents or not tasks: + raise ValueError( + "Agents and tasks lists cannot be empty." + ) + + tasks_copy = tasks.copy() + responses = [] + + powers = [2**i for i in range(int(len(self.agents) ** 0.5))] + + for power in powers: + if power < len(self.agents) and tasks_copy: + task = tasks_copy.pop(0) + response = self.agents[power].run(task) + self.conversation.add( + role=self.agents[power].agent_name, + content=response, + ) + responses.append(response) + + return self._format_return(return_type) + + +class LogSwarm(BaseSwarm): + """ + Implements a Log swarm where agents at logarithmic indices process tasks. + """ + + def run( + self, tasks: List[str], return_type: str = "dict" + ) -> Union[Dict, List, str]: + """ + Run the Log swarm with the given tasks + + Args: + tasks: List of tasks to be processed + return_type: Type of return value, one of 'dict', 'list', or 'string' + + Returns: + Union[Dict, List, str]: The conversation history in the requested format + """ + if not self.agents or not tasks: + raise ValueError( + "Agents and tasks lists cannot be empty." + ) + + tasks_copy = tasks.copy() + responses = [] + + for i in range(len(self.agents)): + index = 2**i + if index < len(self.agents) and tasks_copy: + task = tasks_copy.pop(0) + response = self.agents[index].run(task) + self.conversation.add( + role=self.agents[index].agent_name, + content=response, + ) + responses.append(response) + + return self._format_return(return_type) + + +class ExponentialSwarm(BaseSwarm): + """ + Implements an Exponential swarm where agents at exponential indices process tasks. + """ + + def run( + self, tasks: List[str], return_type: str = "dict" + ) -> Union[Dict, List, str]: + """ + Run the Exponential swarm with the given tasks + + Args: + tasks: List of tasks to be processed + return_type: Type of return value, one of 'dict', 'list', or 'string' + + Returns: + Union[Dict, List, str]: The conversation history in the requested format + """ + if not self.agents or not tasks: + raise ValueError( + "Agents and tasks lists cannot be empty." + ) + + tasks_copy = tasks.copy() + responses = [] + + for i in range(len(self.agents)): + index = min(int(2**i), len(self.agents) - 1) + if tasks_copy: + task = tasks_copy.pop(0) + response = self.agents[index].run(task) + self.conversation.add( + role=self.agents[index].agent_name, + content=response, + ) + responses.append(response) + + return self._format_return(return_type) + + +class GeometricSwarm(BaseSwarm): + """ + Implements a Geometric swarm where agents at geometrically increasing indices process tasks. + """ + + def run( + self, tasks: List[str], return_type: str = "dict" + ) -> Union[Dict, List, str]: + """ + Run the Geometric swarm with the given tasks + + Args: + tasks: List of tasks to be processed + return_type: Type of return value, one of 'dict', 'list', or 'string' + + Returns: + Union[Dict, List, str]: The conversation history in the requested format + """ + if not self.agents or not tasks: + raise ValueError( + "Agents and tasks lists cannot be empty." + ) + + tasks_copy = tasks.copy() + responses = [] + ratio = 2 + + for i in range(len(self.agents)): + index = min(int(ratio**i), len(self.agents) - 1) + if tasks_copy: + task = tasks_copy.pop(0) + response = self.agents[index].run(task) + self.conversation.add( + role=self.agents[index].agent_name, + content=response, + ) + responses.append(response) + + return self._format_return(return_type) + + +class HarmonicSwarm(BaseSwarm): + """ + Implements a Harmonic swarm where agents at harmonically spaced indices process tasks. + """ + + def run( + self, tasks: List[str], return_type: str = "dict" + ) -> Union[Dict, List, str]: + """ + Run the Harmonic swarm with the given tasks + + Args: + tasks: List of tasks to be processed + return_type: Type of return value, one of 'dict', 'list', or 'string' + + Returns: + Union[Dict, List, str]: The conversation history in the requested format + """ + if not self.agents or not tasks: + raise ValueError( + "Agents and tasks lists cannot be empty." + ) + + tasks_copy = tasks.copy() + responses = [] + + for i in range(1, len(self.agents) + 1): + index = min( + int(len(self.agents) / i), len(self.agents) - 1 + ) + if tasks_copy: + task = tasks_copy.pop(0) + response = self.agents[index].run(task) + self.conversation.add( + role=self.agents[index].agent_name, + content=response, + ) + responses.append(response) + + return self._format_return(return_type) + + +class StaircaseSwarm(BaseSwarm): + """ + Implements a Staircase swarm where agents at staircase-patterned indices process a task. + """ + + def run( + self, task: str, return_type: str = "dict" + ) -> Union[Dict, List, str]: + """ + Run the Staircase swarm with the given task + + Args: + task: Task to be processed + return_type: Type of return value, one of 'dict', 'list', or 'string' + + Returns: + Union[Dict, List, str]: The conversation history in the requested format + """ + if not self.agents or not task: + raise ValueError("Agents and task cannot be empty.") + + responses = [] + step = len(self.agents) // 5 + + for i in range(len(self.agents)): + index = (i // step) * step + if index < len(self.agents): + response = self.agents[index].run(task) + self.conversation.add( + role=self.agents[index].agent_name, + content=response, + ) + responses.append(response) + + return self._format_return(return_type) + + +class SigmoidSwarm(BaseSwarm): + """ + Implements a Sigmoid swarm where agents at sigmoid-distributed indices process a task. + """ + + def run( + self, task: str, return_type: str = "dict" + ) -> Union[Dict, List, str]: + """ + Run the Sigmoid swarm with the given task + + Args: + task: Task to be processed + return_type: Type of return value, one of 'dict', 'list', or 'string' + + Returns: + Union[Dict, List, str]: The conversation history in the requested format + """ + if not self.agents or not task: + raise ValueError("Agents and task cannot be empty.") + + responses = [] + + for i in range(len(self.agents)): + index = int(len(self.agents) / (1 + math.exp(-i))) + if index < len(self.agents): + response = self.agents[index].run(task) + self.conversation.add( + role=self.agents[index].agent_name, + content=response, + ) + responses.append(response) + + return self._format_return(return_type) + + +class SinusoidalSwarm(BaseSwarm): + """ + Implements a Sinusoidal swarm where agents at sinusoidally-distributed indices process a task. + """ + + def run( + self, task: str, return_type: str = "dict" + ) -> Union[Dict, List, str]: + """ + Run the Sinusoidal swarm with the given task + + Args: + task: Task to be processed + return_type: Type of return value, one of 'dict', 'list', or 'string' + + Returns: + Union[Dict, List, str]: The conversation history in the requested format + """ + if not self.agents or not task: + raise ValueError("Agents and task cannot be empty.") + + responses = [] + + for i in range(len(self.agents)): + index = int((math.sin(i) + 1) / 2 * len(self.agents)) + if index < len(self.agents): + response = self.agents[index].run(task) + self.conversation.add( + role=self.agents[index].agent_name, + content=response, + ) + responses.append(response) + + return self._format_return(return_type) + + +# Communication classes +class OneToOne: + """ + Facilitates one-to-one communication between two agents. + """ + + def __init__(self, sender: Agent, receiver: Agent): + self.sender = sender + self.receiver = receiver + self.conversation = Conversation() + + def run( + self, task: str, max_loops: int = 1, return_type: str = "dict" + ) -> Union[Dict, List, str]: + """ + Run the one-to-one communication with the given task + + Args: + task: Task to be processed + max_loops: Number of exchange iterations + return_type: Type of return value, one of 'dict', 'list', or 'string' + + Returns: + Union[Dict, List, str]: The conversation history in the requested format + """ + if not self.sender or not self.receiver or not task: + raise ValueError( + "Sender, receiver, and task cannot be empty." + ) + + responses = [] + + try: + for loop in range(max_loops): + # Sender processes the task + sender_response = self.sender.run(task) + self.conversation.add( + role=self.sender.agent_name, + content=sender_response, + ) + responses.append(sender_response) + + # Receiver processes the result of the sender + receiver_response = self.receiver.run(sender_response) + self.conversation.add( + role=self.receiver.agent_name, + content=receiver_response, + ) + responses.append(receiver_response) + + # Update task for next loop if needed + if loop < max_loops - 1: + task = receiver_response + + except Exception as error: + logger.error( + f"Error during one_to_one communication: {error}" + ) + raise error + + if return_type.lower() == "dict": + return self.conversation.return_messages_as_dictionary() + elif return_type.lower() == "list": + return self.conversation.return_messages_as_list() + elif return_type.lower() == "string": + return self.conversation.return_history_as_string() + else: + raise ValueError( + "return_type must be one of 'dict', 'list', or 'string'" + ) + + +class Broadcast: + """ + Facilitates broadcasting from one agent to many agents. + """ + + def __init__(self, sender: Agent, receivers: AgentListType): + self.sender = sender + self.receivers = ( + [agent for sublist in receivers for agent in sublist] + if isinstance(receivers[0], list) + else receivers + ) + self.conversation = Conversation() + + def run( + self, task: str, return_type: str = "dict" + ) -> Union[Dict, List, str]: + """ + Run the broadcast communication with the given task + + Args: + task: Task to be processed + return_type: Type of return value, one of 'dict', 'list', or 'string' + + Returns: + Union[Dict, List, str]: The conversation history in the requested format + """ + if not self.sender or not self.receivers or not task: + raise ValueError( + "Sender, receivers, and task cannot be empty." + ) + + try: + # First get the sender's broadcast message + broadcast_message = self.sender.run(task) + self.conversation.add( + role=self.sender.agent_name, + content=broadcast_message, + ) + + # Then have all receivers process it + for agent in self.receivers: + response = agent.run(broadcast_message) + self.conversation.add( + role=agent.agent_name, + content=response, + ) + + if return_type.lower() == "dict": + return ( + self.conversation.return_messages_as_dictionary() + ) + elif return_type.lower() == "list": + return self.conversation.return_messages_as_list() + elif return_type.lower() == "string": + return self.conversation.return_history_as_string() + else: + raise ValueError( + "return_type must be one of 'dict', 'list', or 'string'" + ) + + except Exception as error: + logger.error(f"Error during broadcast: {error}") + raise error + + +class OneToThree: + """ + Facilitates one-to-three communication from one agent to exactly three agents. + """ + + def __init__(self, sender: Agent, receivers: AgentListType): + if len(receivers) != 3: + raise ValueError( + "The number of receivers must be exactly 3." + ) + + self.sender = sender + self.receivers = receivers + self.conversation = Conversation() + + def run( + self, task: str, return_type: str = "dict" + ) -> Union[Dict, List, str]: + """ + Run the one-to-three communication with the given task + + Args: + task: Task to be processed + return_type: Type of return value, one of 'dict', 'list', or 'string' + + Returns: + Union[Dict, List, str]: The conversation history in the requested format + """ + if not self.sender or not task: + raise ValueError("Sender and task cannot be empty.") + + try: + # Get sender's message + sender_message = self.sender.run(task) + self.conversation.add( + role=self.sender.agent_name, + content=sender_message, + ) + + # Have each receiver process the message + for i, agent in enumerate(self.receivers): + response = agent.run(sender_message) + self.conversation.add( + role=agent.agent_name, + content=response, + ) + + if return_type.lower() == "dict": + return ( + self.conversation.return_messages_as_dictionary() + ) + elif return_type.lower() == "list": + return self.conversation.return_messages_as_list() + elif return_type.lower() == "string": + return self.conversation.return_history_as_string() + else: + raise ValueError( + "return_type must be one of 'dict', 'list', or 'string'" + ) + + except Exception as error: + logger.error(f"Error in one_to_three: {error}") + raise error