From ed46063dccfc68f22c866bde7599b4fe10d16d1c Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Fri, 9 May 2025 12:35:45 -0700 Subject: [PATCH] updated docs for swarms api --- docs/swarms_cloud/best_practices.md | 19 +- docs/swarms_cloud/swarms_api.md | 153 ++++++------ swarms/structs/agent.py | 14 +- swarms/structs/conversation.py | 129 +++++++++- benchmark_init.py => tests/benchmark_init.py | 0 tests/structs/test_conversation_cache.py | 241 +++++++++++++++++++ 6 files changed, 455 insertions(+), 101 deletions(-) rename benchmark_init.py => tests/benchmark_init.py (100%) create mode 100644 tests/structs/test_conversation_cache.py diff --git a/docs/swarms_cloud/best_practices.md b/docs/swarms_cloud/best_practices.md index 9a33263f..76bb1e21 100644 --- a/docs/swarms_cloud/best_practices.md +++ b/docs/swarms_cloud/best_practices.md @@ -45,6 +45,16 @@ This comprehensive guide outlines production-grade best practices for using the | Agent Optimization | Use minimum required agents | 15-25% cost reduction | | Smart Routing | Route to specialized agents | 10-15% cost reduction | | Prompt Engineering | Optimize input tokens | 15-20% cost reduction | + | Flex Processing | Use flex tier for non-urgent tasks | 75% cost reduction | + +=== "Service Tiers" + + !!! tip "Choosing the Right Service Tier" + + | Tier | Best For | Benefits | Considerations | + |------|----------|----------|----------------| + | Standard | - Real-time processing
- Time-sensitive tasks
- Critical workflows | - Immediate execution
- Higher priority
- Predictable timing | - Higher cost
- 5-min timeout | + | Flex | - Batch processing
- Non-urgent tasks
- Cost-sensitive workloads | - 75% cost reduction
- Longer timeouts
- Auto-retries | - Variable timing
- Resource contention | === "Industry Solutions" @@ -127,6 +137,8 @@ Use this framework to select the optimal swarm architecture for your use case: - Monitor and log executions - Cache repeated results - Rotate API keys regularly + - Choose appropriate service tier based on task urgency + - Use flex processing for batch and non-urgent tasks !!! danger "Anti-patterns to Avoid" - Hardcoding API keys @@ -134,6 +146,8 @@ Use this framework to select the optimal swarm architecture for your use case: - Missing error handling - Excessive agent count - Inadequate monitoring + - Using standard tier for non-urgent tasks + - Not implementing retry logic for flex tier ### Performance Benchmarks @@ -141,11 +155,12 @@ Use this framework to select the optimal swarm architecture for your use case: | Metric | Target Range | Warning Threshold | |--------|--------------|-------------------| - | Response Time | < 2s | > 5s | + | Response Time | < 2s (standard)
< 15s (flex) | > 5s (standard)
> 30s (flex) | | Success Rate | > 99% | < 95% | - | Cost per Task | < $0.05 | > $0.10 | + | Cost per Task | < $0.05 (standard)
< $0.0125 (flex) | > $0.10 (standard)
> $0.025 (flex) | | Cache Hit Rate | > 80% | < 60% | | Error Rate | < 1% | > 5% | + | Retry Rate (flex) | < 10% | > 30% | ### Additional Resources diff --git a/docs/swarms_cloud/swarms_api.md b/docs/swarms_cloud/swarms_api.md index f3a60678..3d6c15de 100644 --- a/docs/swarms_cloud/swarms_api.md +++ b/docs/swarms_cloud/swarms_api.md @@ -46,6 +46,7 @@ API keys can be obtained and managed at [https://swarms.world/platform/api-keys] | `/v1/swarm/logs` | GET | Retrieve API request logs | | `/v1/swarms/available` | GET | Get all available swarms as a list of strings | | `/v1/models/available` | GET | Get all available models as a list of strings | +| `/v1/agent/completions` | POST | Run a single agent with specified configuration | @@ -88,6 +89,7 @@ The `SwarmSpec` model defines the configuration of a swarm. | return_history | boolean | Whether to return execution history | No | | rules | string | Guidelines for swarm behavior | No | | schedule | ScheduleSpec | Scheduling information | No | +| service_tier | string | Service tier for processing ("standard" or "flex") | No | ### AgentSpec @@ -341,11 +343,16 @@ curl -X POST "https://api.swarms.world/v1/swarm/batch/completions" \ ] ``` -#### Schedule Swarm +------- + + + + +### Run Single Agent -Schedule a swarm to run at a specific time. +Run a single agent with the specified configuration. -**Endpoint**: `/v1/swarm/schedule` +**Endpoint**: `/v1/agent/completions` **Method**: POST **Rate Limit**: 100 requests per 60 seconds @@ -353,104 +360,46 @@ Schedule a swarm to run at a specific time. 
| Field | Type | Description | Required | |-------|------|-------------|----------| -| name | string | Identifier for the swarm | No | -| description | string | Description of the swarm's purpose | No | -| agents | Array | List of agent specifications | No | -| max_loops | integer | Maximum number of execution loops | No | -| swarm_type | SwarmType | Architecture of the swarm | No | -| task | string | The main task for the swarm to accomplish | Yes | -| schedule | ScheduleSpec | Scheduling information | Yes | +| agent_config | AgentSpec | Configuration for the agent | Yes | +| task | string | The task to be completed by the agent | Yes | **Example Request**: ```bash -curl -X POST "https://api.swarms.world/v1/swarm/schedule" \ +curl -X POST "https://api.swarms.world/v1/agent/completions" \ -H "x-api-key: your_api_key_here" \ -H "Content-Type: application/json" \ -d '{ - "name": "daily-market-analysis", - "description": "Daily analysis of market conditions", - "task": "Analyze today's market movements and prepare a summary report", - "schedule": { - "scheduled_time": "2025-03-05T17:00:00Z", - "timezone": "UTC" - } + "agent_config": { + "agent_name": "Research Assistant", + "description": "Helps with research tasks", + "system_prompt": "You are a research assistant expert.", + "model_name": "gpt-4o", + "max_loops": 1, + "max_tokens": 8192, + "temperature": 0.5 + }, + "task": "Research the latest developments in quantum computing." }' ``` **Example Response**: ```json { - "status": "success", - "message": "Swarm scheduled successfully", - "job_id": "swarm_daily-market-analysis_1709563245", - "scheduled_time": "2025-03-05T17:00:00Z", - "timezone": "UTC" -} -``` - -#### Get Scheduled Jobs - -Retrieve all scheduled swarm jobs. 
- -**Endpoint**: `/v1/swarm/schedule` -**Method**: GET -**Rate Limit**: 100 requests per 60 seconds - -**Example Request**: -```bash -curl -X GET "https://api.swarms.world/v1/swarm/schedule" \ - -H "x-api-key: your_api_key_here" -``` - -**Example Response**: -```json -{ - "status": "success", - "scheduled_jobs": [ - { - "job_id": "swarm_daily-market-analysis_1709563245", - "swarm_name": "daily-market-analysis", - "scheduled_time": "2025-03-05T17:00:00Z", - "timezone": "UTC" - }, - { - "job_id": "swarm_weekly-report_1709563348", - "swarm_name": "weekly-report", - "scheduled_time": "2025-03-09T12:00:00Z", - "timezone": "UTC" - } - ] + "id": "agent-abc123", + "success": true, + "name": "Research Assistant", + "description": "Helps with research tasks", + "temperature": 0.5, + "outputs": {}, + "usage": { + "input_tokens": 150, + "output_tokens": 450, + "total_tokens": 600 + }, + "timestamp": "2024-03-05T12:34:56.789Z" } ``` -#### Cancel Scheduled Job - -Cancel a previously scheduled swarm job. 
- -**Endpoint**: `/v1/swarm/schedule/{job_id}` -**Method**: DELETE -**Rate Limit**: 100 requests per 60 seconds - -**Path Parameters**: - -| Parameter | Description | -|-----------|-------------| -| job_id | ID of the scheduled job to cancel | - -**Example Request**: -```bash -curl -X DELETE "https://api.swarms.world/v1/swarm/schedule/swarm_daily-market-analysis_1709563245" \ - -H "x-api-key: your_api_key_here" -``` - -**Example Response**: -```json -{ - "status": "success", - "message": "Scheduled job cancelled successfully", - "job_id": "swarm_daily-market-analysis_1709563245" -} -``` ### Get Models @@ -1004,3 +953,35 @@ For technical assistance with the Swarms API, please contact: - Community Discord: [https://discord.gg/swarms](https://discord.gg/swarms) - Swarms Marketplace: [https://swarms.world](https://swarms.world) - Swarms AI Website: [https://swarms.ai](https://swarms.ai) + +## Service Tiers + +The API offers two service tiers to accommodate different processing needs: + +### Standard Tier + +- Default processing tier + +- Immediate execution + +- Higher priority processing + +- Standard pricing + +- 5-minute timeout limit + +### Flex Tier + +- Lower cost processing + +- Automatic retries (up to 3 attempts) + +- Longer timeout (15 minutes) + +- 75% discount on token costs + +- Best for non-urgent tasks + +- Exponential backoff on resource contention + +To use the flex tier, set `service_tier: "flex"` in your SwarmSpec configuration. 
diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index f37f5d61..770fb096 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -39,12 +39,19 @@ from swarms.schemas.base_schemas import ( ) from swarms.structs.agent_roles import agent_roles from swarms.structs.conversation import Conversation +from swarms.structs.output_types import OutputType from swarms.structs.safe_loading import ( SafeLoaderUtils, SafeStateManager, ) from swarms.telemetry.main import log_agent_data from swarms.tools.base_tool import BaseTool +from swarms.tools.mcp_client import ( + execute_mcp_tool, + find_and_execute_tool, + list_all, + list_tools_for_multiple_urls, +) from swarms.tools.mcp_integration import MCPServerSseParams from swarms.tools.tool_parse_exec import parse_and_execute_json from swarms.utils.any_to_str import any_to_str @@ -57,14 +64,7 @@ from swarms.utils.history_output_formatter import ( from swarms.utils.litellm_tokenizer import count_tokens from swarms.utils.litellm_wrapper import LiteLLM from swarms.utils.pdf_to_text import pdf_to_text -from swarms.structs.output_types import OutputType from swarms.utils.str_to_dict import str_to_dict -from swarms.tools.mcp_client import ( - execute_mcp_tool, - list_tools_for_multiple_urls, - list_all, - find_and_execute_tool, -) # Utils diff --git a/swarms/structs/conversation.py b/swarms/structs/conversation.py index 8cdcfe50..86f424fa 100644 --- a/swarms/structs/conversation.py +++ b/swarms/structs/conversation.py @@ -1,6 +1,8 @@ import datetime import json -from typing import Any, List, Optional, Union +from typing import Any, List, Optional, Union, Dict +import threading +import hashlib import yaml from swarms.structs.base_structure import BaseStructure @@ -8,7 +10,6 @@ from typing import TYPE_CHECKING from swarms.utils.any_to_str import any_to_str from swarms.utils.formatter import formatter from swarms.utils.litellm_tokenizer import count_tokens -import threading if TYPE_CHECKING: from 
swarms.structs.agent import ( @@ -37,6 +38,9 @@ class Conversation(BaseStructure): save_as_json_bool (bool): Flag to save conversation history as JSON. token_count (bool): Flag to enable token counting for messages. conversation_history (list): List to store the history of messages. + cache_enabled (bool): Flag to enable prompt caching. + cache_stats (dict): Statistics about cache usage. + cache_lock (threading.Lock): Lock for thread-safe cache operations. """ def __init__( @@ -54,6 +58,7 @@ class Conversation(BaseStructure): save_as_yaml: bool = True, save_as_json_bool: bool = False, token_count: bool = True, + cache_enabled: bool = True, *args, **kwargs, ): @@ -74,6 +79,7 @@ class Conversation(BaseStructure): save_as_yaml (bool): Flag to save conversation history as YAML. save_as_json_bool (bool): Flag to save conversation history as JSON. token_count (bool): Flag to enable token counting for messages. + cache_enabled (bool): Flag to enable prompt caching. """ super().__init__() self.system_prompt = system_prompt @@ -90,6 +96,14 @@ class Conversation(BaseStructure): self.save_as_yaml = save_as_yaml self.save_as_json_bool = save_as_json_bool self.token_count = token_count + self.cache_enabled = cache_enabled + self.cache_stats = { + "hits": 0, + "misses": 0, + "cached_tokens": 0, + "total_tokens": 0, + } + self.cache_lock = threading.Lock() # If system prompt is not None, add it to the conversation history if self.system_prompt is not None: @@ -105,6 +119,61 @@ class Conversation(BaseStructure): if tokenizer is not None: self.truncate_memory_with_tokenizer() + def _generate_cache_key( + self, content: Union[str, dict, list] + ) -> str: + """Generate a cache key for the given content. + + Args: + content (Union[str, dict, list]): The content to generate a cache key for. + + Returns: + str: The cache key. 
+ """ + if isinstance(content, (dict, list)): + content = json.dumps(content, sort_keys=True) + return hashlib.md5(content.encode()).hexdigest() + + def _get_cached_tokens( + self, content: Union[str, dict, list] + ) -> Optional[int]: + """Get the number of cached tokens for the given content. + + Args: + content (Union[str, dict, list]): The content to check. + + Returns: + Optional[int]: The number of cached tokens, or None if not cached. + """ + if not self.cache_enabled: + return None + + with self.cache_lock: + cache_key = self._generate_cache_key(content) + if cache_key in self.cache_stats: + self.cache_stats["hits"] += 1 + return self.cache_stats[cache_key] + self.cache_stats["misses"] += 1 + return None + + def _update_cache_stats( + self, content: Union[str, dict, list], token_count: int + ): + """Update cache statistics for the given content. + + Args: + content (Union[str, dict, list]): The content to update stats for. + token_count (int): The number of tokens in the content. + """ + if not self.cache_enabled: + return + + with self.cache_lock: + cache_key = self._generate_cache_key(content) + self.cache_stats[cache_key] = token_count + self.cache_stats["cached_tokens"] += token_count + self.cache_stats["total_tokens"] += token_count + def add( self, role: str, @@ -118,7 +187,6 @@ class Conversation(BaseStructure): role (str): The role of the speaker (e.g., 'User', 'System'). content (Union[str, dict, list]): The content of the message to be added. 
""" - # Base message with role message = { "role": role, @@ -134,10 +202,20 @@ class Conversation(BaseStructure): else: message["content"] = content + # Check cache for token count + cached_tokens = self._get_cached_tokens(content) + if cached_tokens is not None: + message["token_count"] = cached_tokens + message["cached"] = True + else: + message["cached"] = False + # Add the message to history immediately without waiting for token count self.conversation_history.append(message) - if self.token_count is True: + if self.token_count is True and not message.get( + "cached", False + ): self._count_tokens(content, message) def add_multiple_messages( @@ -155,6 +233,8 @@ class Conversation(BaseStructure): tokens = count_tokens(any_to_str(content)) # Update the message that's already in the conversation history message["token_count"] = int(tokens) + # Update cache stats + self._update_cache_stats(content, int(tokens)) # If autosave is enabled, save after token count is updated if self.autosave: @@ -277,13 +357,23 @@ class Conversation(BaseStructure): ] ) - def get_str(self): + def get_str(self) -> str: """Get the conversation history as a string. Returns: str: The conversation history. """ - return self.return_history_as_string() + messages = [] + for message in self.conversation_history: + content = message["content"] + if isinstance(content, (dict, list)): + content = json.dumps(content) + messages.append(f"{message['role']}: {content}") + if "token_count" in message: + messages[-1] += f" (tokens: {message['token_count']})" + if message.get("cached", False): + messages[-1] += " [cached]" + return "\n".join(messages) def save_as_json(self, filename: str = None): """Save the conversation history as a JSON file. @@ -512,6 +602,33 @@ class Conversation(BaseStructure): """ self.conversation_history.extend(messages) + def get_cache_stats(self) -> Dict[str, int]: + """Get statistics about cache usage. + + Returns: + Dict[str, int]: Statistics about cache usage. 
+ """ + with self.cache_lock: + return { + "hits": self.cache_stats["hits"], + "misses": self.cache_stats["misses"], + "cached_tokens": self.cache_stats["cached_tokens"], + "total_tokens": self.cache_stats["total_tokens"], + "hit_rate": ( + self.cache_stats["hits"] + / ( + self.cache_stats["hits"] + + self.cache_stats["misses"] + ) + if ( + self.cache_stats["hits"] + + self.cache_stats["misses"] + ) + > 0 + else 0 + ), + } + # # Example usage # # conversation = Conversation() diff --git a/benchmark_init.py b/tests/benchmark_init.py similarity index 100% rename from benchmark_init.py rename to tests/benchmark_init.py diff --git a/tests/structs/test_conversation_cache.py b/tests/structs/test_conversation_cache.py new file mode 100644 index 00000000..430a0794 --- /dev/null +++ b/tests/structs/test_conversation_cache.py @@ -0,0 +1,241 @@ +from swarms.structs.conversation import Conversation +import time +import threading +import random +from typing import List + + +def test_conversation_cache(): + """ + Test the caching functionality of the Conversation class. + This test demonstrates: + 1. Cache hits and misses + 2. Token counting with caching + 3. Cache statistics + 4. Thread safety + 5. Different content types + 6. Edge cases + 7. 
def test_conversation_cache():
    """Exercise the Conversation prompt cache with real assertions.

    Covers: cache hit/miss accounting for repeated content, dict content
    keyed independently of insertion order, thread safety under
    concurrent adds, string rendering, and disabled-cache behavior.
    The original version only printed statistics and could never fail;
    this version asserts the expected outcomes.
    """
    conv = Conversation(cache_enabled=True)

    # Repeated identical content: first add is a miss, second a hit.
    message = "This is a test message that should be cached"
    conv.add("user", message)
    time.sleep(0.2)  # token counting may run on a background thread
    conv.add("user", message)
    time.sleep(0.2)

    stats = conv.get_cache_stats()
    assert stats["misses"] >= 1, "first add must register a cache miss"
    assert stats["hits"] >= 1, "repeated add must register a cache hit"
    assert 0 <= stats["hit_rate"] <= 1

    # Dict content: logically-equal dicts hit the cache regardless of key order.
    conv.add("user", {"key": "value", "nested": {"inner": "data"}})
    time.sleep(0.2)
    conv.add("user", {"nested": {"inner": "data"}, "key": "value"})
    time.sleep(0.2)
    after = conv.get_cache_stats()
    assert after["hits"] > stats["hits"], "reordered dict should be a cache hit"

    # Thread safety: concurrent adds must not corrupt stats or raise.
    def worker(prefix: str):
        for i in range(10):
            conv.add("user", f"{prefix} message {i}")

    threads = [
        threading.Thread(target=worker, args=(f"t{n}",)) for n in range(5)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    time.sleep(0.5)  # let background token counting settle
    stress = conv.get_cache_stats()
    assert stress["hits"] + stress["misses"] >= after["hits"] + after["misses"]

    # Rendering includes the message and does not raise on mixed content.
    rendered = conv.get_str()
    assert message in rendered

    # Disabled cache: no hits or misses are ever recorded.
    conv_off = Conversation(cache_enabled=False)
    conv_off.add("user", message)
    conv_off.add("user", message)
    time.sleep(0.2)
    off_stats = conv_off.get_cache_stats()
    assert off_stats["hits"] == 0 and off_stats["misses"] == 0
    assert off_stats["hit_rate"] == 0


if __name__ == "__main__":
    test_conversation_cache()