updated docs for swarms api

pull/839/head
Kye Gomez 4 weeks ago
parent e9a7c7994c
commit ed46063dcc

@ -45,6 +45,16 @@ This comprehensive guide outlines production-grade best practices for using the
| Agent Optimization | Use minimum required agents | 15-25% cost reduction |
| Smart Routing | Route to specialized agents | 10-15% cost reduction |
| Prompt Engineering | Optimize input tokens | 15-20% cost reduction |
| Flex Processing | Use flex tier for non-urgent tasks | 75% cost reduction |
=== "Service Tiers"
!!! tip "Choosing the Right Service Tier"
| Tier | Best For | Benefits | Considerations |
|------|----------|----------|----------------|
| Standard | - Real-time processing<br>- Time-sensitive tasks<br>- Critical workflows | - Immediate execution<br>- Higher priority<br>- Predictable timing | - Higher cost<br>- 5-min timeout |
| Flex | - Batch processing<br>- Non-urgent tasks<br>- Cost-sensitive workloads | - 75% cost reduction<br>- Longer timeouts<br>- Auto-retries | - Variable timing<br>- Resource contention |
=== "Industry Solutions" === "Industry Solutions"
@ -127,6 +137,8 @@ Use this framework to select the optimal swarm architecture for your use case:
- Monitor and log executions
- Cache repeated results
- Rotate API keys regularly
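"Cache repeated results" can be as simple as keying results on a stable hash of the request payload. A minimal sketch (the helper names and MD5-of-sorted-JSON scheme are illustrative, not a documented Swarms API feature):

```python
import hashlib
import json


def cache_key(payload) -> str:
    """Stable key for a request payload: identical payloads map to
    the same digest regardless of dict key insertion order."""
    if isinstance(payload, (dict, list)):
        payload = json.dumps(payload, sort_keys=True)
    return hashlib.md5(payload.encode()).hexdigest()


_cache = {}


def cached_call(payload, run):
    """Return a cached result for a repeated payload, else compute it."""
    key = cache_key(payload)
    if key not in _cache:
        _cache[key] = run(payload)
    return _cache[key]
```

Because `sort_keys=True` normalizes key order, `{"task": "a", "n": 1}` and `{"n": 1, "task": "a"}` hit the same cache entry.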
- Choose appropriate service tier based on task urgency
- Use flex processing for batch and non-urgent tasks
!!! danger "Anti-patterns to Avoid"

- Hardcoding API keys
@ -134,6 +146,8 @@ Use this framework to select the optimal swarm architecture for your use case:
- Missing error handling
- Excessive agent count
- Inadequate monitoring
- Using standard tier for non-urgent tasks
- Not implementing retry logic for flex tier
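The flex-tier retry anti-pattern above can be avoided with a small exponential-backoff wrapper. A minimal sketch (function name and delay values are illustrative, not part of the Swarms SDK):

```python
import random
import time


def with_backoff(call, max_attempts=3, base_delay=1.0):
    """Retry `call` with exponential backoff and jitter.

    Mirrors the flex tier's behavior of up to 3 attempts; the delay
    doubles on each failure (1s, 2s, 4s, ...) plus a little jitter
    to avoid synchronized retries under resource contention.
    """
    for attempt in range(max_attempts):
        try:
            return call()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.1))
```

Wrap each flex-tier request in `with_backoff` so transient contention errors are absorbed instead of surfacing to callers.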
### Performance Benchmarks
@ -141,11 +155,12 @@ Use this framework to select the optimal swarm architecture for your use case:
| Metric | Target Range | Warning Threshold |
|--------|--------------|-------------------|
| Response Time | < 2s (standard)<br>< 15s (flex) | > 5s (standard)<br>> 30s (flex) |
| Success Rate | > 99% | < 95% |
| Cost per Task | < $0.05 (standard)<br>< $0.0125 (flex) | > $0.10 (standard)<br>> $0.025 (flex) |
| Cache Hit Rate | > 80% | < 60% |
| Error Rate | < 1% | > 5% |
| Retry Rate (flex) | < 10% | > 30% |
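The flex cost figures in the table are simply the standard figures with the 75% discount applied; a quick arithmetic check:

```python
# Flex thresholds follow from the 75% discount on the standard figures
standard_target, standard_warning = 0.05, 0.10
discount = 0.75

flex_target = standard_target * (1 - discount)    # 0.0125
flex_warning = standard_warning * (1 - discount)  # 0.025
```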
### Additional Resources

@ -46,6 +46,7 @@ API keys can be obtained and managed at [https://swarms.world/platform/api-keys]
| `/v1/swarm/logs` | GET | Retrieve API request logs |
| `/v1/swarms/available` | GET | Get all available swarms as a list of strings |
| `/v1/models/available` | GET | Get all available models as a list of strings |
| `/v1/agent/completions` | POST | Run a single agent with specified configuration |
@ -88,6 +89,7 @@ The `SwarmSpec` model defines the configuration of a swarm.
| return_history | boolean | Whether to return execution history | No |
| rules | string | Guidelines for swarm behavior | No |
| schedule | ScheduleSpec | Scheduling information | No |
| service_tier | string | Service tier for processing ("standard" or "flex") | No |
### AgentSpec
@ -341,11 +343,16 @@ curl -X POST "https://api.swarms.world/v1/swarm/batch/completions" \
]
```
-------

### Run Single Agent

Run a single agent with the specified configuration.

**Endpoint**: `/v1/agent/completions`

**Method**: POST

**Rate Limit**: 100 requests per 60 seconds

@ -353,104 +360,46 @@ Schedule a swarm to run at a specific time.

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| agent_config | AgentSpec | Configuration for the agent | Yes |
| task | string | The task to be completed by the agent | Yes |

**Example Request**:

```bash
curl -X POST "https://api.swarms.world/v1/agent/completions" \
  -H "x-api-key: your_api_key_here" \
  -H "Content-Type: application/json" \
  -d '{
    "agent_config": {
      "agent_name": "Research Assistant",
      "description": "Helps with research tasks",
      "system_prompt": "You are a research assistant expert.",
      "model_name": "gpt-4o",
      "max_loops": 1,
      "max_tokens": 8192,
      "temperature": 0.5
    },
    "task": "Research the latest developments in quantum computing."
  }'
```

**Example Response**:

```json
{
  "id": "agent-abc123",
  "success": true,
  "name": "Research Assistant",
  "description": "Helps with research tasks",
  "temperature": 0.5,
  "outputs": {},
  "usage": {
    "input_tokens": 150,
    "output_tokens": 450,
    "total_tokens": 600
  },
  "timestamp": "2024-03-05T12:34:56.789Z"
}
```
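The same single-agent request can be issued from Python. A minimal sketch that builds the headers and JSON body to match the curl example (the actual POST, which requires a valid key, is left commented out; `build_agent_request` is an illustrative helper, not part of the Swarms SDK):

```python
import json

API_URL = "https://api.swarms.world/v1/agent/completions"


def build_agent_request(api_key: str, task: str):
    """Build headers and JSON body for /v1/agent/completions."""
    headers = {
        "x-api-key": api_key,
        "Content-Type": "application/json",
    }
    payload = {
        "agent_config": {
            "agent_name": "Research Assistant",
            "description": "Helps with research tasks",
            "system_prompt": "You are a research assistant expert.",
            "model_name": "gpt-4o",
            "max_loops": 1,
            "max_tokens": 8192,
            "temperature": 0.5,
        },
        "task": task,
    }
    return headers, json.dumps(payload)


# To actually send it (requires a valid key):
# import urllib.request
# headers, body = build_agent_request("your_api_key_here", "Research quantum computing.")
# req = urllib.request.Request(API_URL, data=body.encode(), headers=headers, method="POST")
# print(urllib.request.urlopen(req).read())
```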
### Get Models
@ -1004,3 +953,35 @@ For technical assistance with the Swarms API, please contact:
- Community Discord: [https://discord.gg/swarms](https://discord.gg/swarms)
- Swarms Marketplace: [https://swarms.world](https://swarms.world)
- Swarms AI Website: [https://swarms.ai](https://swarms.ai)
## Service Tiers
The API offers two service tiers to accommodate different processing needs:
### Standard Tier
- Default processing tier
- Immediate execution
- Higher priority processing
- Standard pricing
- 5-minute timeout limit
### Flex Tier
- Lower cost processing
- Automatic retries (up to 3 attempts)
- Longer timeout (15 minutes)
- 75% discount on token costs
- Best for non-urgent tasks
- Exponential backoff on resource contention
To use the flex tier, set `service_tier: "flex"` in your SwarmSpec configuration.
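As a sketch, a SwarmSpec payload opting into flex might look like this (the field values are illustrative; `service_tier` is the only field this example is about):

```python
import json

swarm_spec = {
    "name": "overnight-batch-analysis",
    "task": "Summarize yesterday's support tickets",
    "max_loops": 1,
    # "flex" opts into the discounted tier (75% off, 15-minute timeout,
    # auto-retries); omit it or use "standard" for time-sensitive work.
    "service_tier": "flex",
}

print(json.dumps(swarm_spec, indent=2))
```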

@ -39,12 +39,19 @@ from swarms.schemas.base_schemas import (
)
from swarms.structs.agent_roles import agent_roles
from swarms.structs.conversation import Conversation
from swarms.structs.output_types import OutputType
from swarms.structs.safe_loading import (
    SafeLoaderUtils,
    SafeStateManager,
)
from swarms.telemetry.main import log_agent_data
from swarms.tools.base_tool import BaseTool
from swarms.tools.mcp_client import (
    execute_mcp_tool,
    find_and_execute_tool,
    list_all,
    list_tools_for_multiple_urls,
)
from swarms.tools.mcp_integration import MCPServerSseParams
from swarms.tools.tool_parse_exec import parse_and_execute_json
from swarms.utils.any_to_str import any_to_str
@ -57,14 +64,7 @@ from swarms.utils.history_output_formatter import (
from swarms.utils.litellm_tokenizer import count_tokens
from swarms.utils.litellm_wrapper import LiteLLM
from swarms.utils.pdf_to_text import pdf_to_text
from swarms.utils.str_to_dict import str_to_dict
# Utils

@ -1,6 +1,8 @@
import datetime
import json
from typing import Any, List, Optional, Union, Dict
import threading
import hashlib
import yaml
from swarms.structs.base_structure import BaseStructure
@ -8,7 +10,6 @@ from typing import TYPE_CHECKING
from swarms.utils.any_to_str import any_to_str
from swarms.utils.formatter import formatter
from swarms.utils.litellm_tokenizer import count_tokens
if TYPE_CHECKING:
    from swarms.structs.agent import (
@ -37,6 +38,9 @@ class Conversation(BaseStructure):
        save_as_json_bool (bool): Flag to save conversation history as JSON.
        token_count (bool): Flag to enable token counting for messages.
        conversation_history (list): List to store the history of messages.
        cache_enabled (bool): Flag to enable prompt caching.
        cache_stats (dict): Statistics about cache usage.
        cache_lock (threading.Lock): Lock for thread-safe cache operations.
    """

    def __init__(
@ -54,6 +58,7 @@ class Conversation(BaseStructure):
        save_as_yaml: bool = True,
        save_as_json_bool: bool = False,
        token_count: bool = True,
        cache_enabled: bool = True,
        *args,
        **kwargs,
    ):
@ -74,6 +79,7 @@ class Conversation(BaseStructure):
            save_as_yaml (bool): Flag to save conversation history as YAML.
            save_as_json_bool (bool): Flag to save conversation history as JSON.
            token_count (bool): Flag to enable token counting for messages.
            cache_enabled (bool): Flag to enable prompt caching.
        """
        super().__init__()
        self.system_prompt = system_prompt
@ -90,6 +96,14 @@ class Conversation(BaseStructure):
        self.save_as_yaml = save_as_yaml
        self.save_as_json_bool = save_as_json_bool
        self.token_count = token_count
        self.cache_enabled = cache_enabled
        self.cache_stats = {
            "hits": 0,
            "misses": 0,
            "cached_tokens": 0,
            "total_tokens": 0,
        }
        self.cache_lock = threading.Lock()
        # If system prompt is not None, add it to the conversation history
        if self.system_prompt is not None:
@ -105,6 +119,61 @@ class Conversation(BaseStructure):
        if tokenizer is not None:
            self.truncate_memory_with_tokenizer()
    def _generate_cache_key(
        self, content: Union[str, dict, list]
    ) -> str:
        """Generate a cache key for the given content.

        Args:
            content (Union[str, dict, list]): The content to generate a cache key for.

        Returns:
            str: The cache key.
        """
        if isinstance(content, (dict, list)):
            content = json.dumps(content, sort_keys=True)
        return hashlib.md5(content.encode()).hexdigest()

    def _get_cached_tokens(
        self, content: Union[str, dict, list]
    ) -> Optional[int]:
        """Get the number of cached tokens for the given content.

        Args:
            content (Union[str, dict, list]): The content to check.

        Returns:
            Optional[int]: The number of cached tokens, or None if not cached.
        """
        if not self.cache_enabled:
            return None
        with self.cache_lock:
            cache_key = self._generate_cache_key(content)
            if cache_key in self.cache_stats:
                self.cache_stats["hits"] += 1
                return self.cache_stats[cache_key]
            self.cache_stats["misses"] += 1
            return None

    def _update_cache_stats(
        self, content: Union[str, dict, list], token_count: int
    ):
        """Update cache statistics for the given content.

        Args:
            content (Union[str, dict, list]): The content to update stats for.
            token_count (int): The number of tokens in the content.
        """
        if not self.cache_enabled:
            return
        with self.cache_lock:
            cache_key = self._generate_cache_key(content)
            self.cache_stats[cache_key] = token_count
            self.cache_stats["cached_tokens"] += token_count
            self.cache_stats["total_tokens"] += token_count
def add( def add(
        self,
        role: str,
@ -118,7 +187,6 @@ class Conversation(BaseStructure):
            role (str): The role of the speaker (e.g., 'User', 'System').
            content (Union[str, dict, list]): The content of the message to be added.
        """
        # Base message with role
        message = {
            "role": role,
@ -134,10 +202,20 @@ class Conversation(BaseStructure):
        else:
            message["content"] = content

        # Check cache for token count
        cached_tokens = self._get_cached_tokens(content)
        if cached_tokens is not None:
            message["token_count"] = cached_tokens
            message["cached"] = True
        else:
            message["cached"] = False

        # Add the message to history immediately without waiting for token count
        self.conversation_history.append(message)

        if self.token_count is True and not message.get(
            "cached", False
        ):
            self._count_tokens(content, message)
    def add_multiple_messages(
@ -155,6 +233,8 @@ class Conversation(BaseStructure):
                tokens = count_tokens(any_to_str(content))
                # Update the message that's already in the conversation history
                message["token_count"] = int(tokens)
                # Update cache stats
                self._update_cache_stats(content, int(tokens))

                # If autosave is enabled, save after token count is updated
                if self.autosave:
@ -277,13 +357,23 @@ class Conversation(BaseStructure):
            ]
        )
    def get_str(self) -> str:
        """Get the conversation history as a string.

        Returns:
            str: The conversation history.
        """
        messages = []
        for message in self.conversation_history:
            content = message["content"]
            if isinstance(content, (dict, list)):
                content = json.dumps(content)
            messages.append(f"{message['role']}: {content}")
            if "token_count" in message:
                messages[-1] += f" (tokens: {message['token_count']})"
            if message.get("cached", False):
                messages[-1] += " [cached]"
        return "\n".join(messages)
    def save_as_json(self, filename: str = None):
        """Save the conversation history as a JSON file.
@ -512,6 +602,33 @@ class Conversation(BaseStructure):
""" """
self.conversation_history.extend(messages) self.conversation_history.extend(messages)
    def get_cache_stats(self) -> Dict[str, int]:
        """Get statistics about cache usage.

        Returns:
            Dict[str, int]: Statistics about cache usage.
        """
        with self.cache_lock:
            return {
                "hits": self.cache_stats["hits"],
                "misses": self.cache_stats["misses"],
                "cached_tokens": self.cache_stats["cached_tokens"],
                "total_tokens": self.cache_stats["total_tokens"],
                "hit_rate": (
                    self.cache_stats["hits"]
                    / (
                        self.cache_stats["hits"]
                        + self.cache_stats["misses"]
                    )
                    if (
                        self.cache_stats["hits"]
                        + self.cache_stats["misses"]
                    )
                    > 0
                    else 0
                ),
            }
# # Example usage
# # conversation = Conversation()

@ -0,0 +1,241 @@
from swarms.structs.conversation import Conversation
import time
import threading
import random
from typing import List


def test_conversation_cache():
    """
    Test the caching functionality of the Conversation class.

    This test demonstrates:
    1. Cache hits and misses
    2. Token counting with caching
    3. Cache statistics
    4. Thread safety
    5. Different content types
    6. Edge cases
    7. Performance metrics
    """
    print("\n=== Testing Conversation Cache ===")

    # Create a conversation with caching enabled
    conv = Conversation(cache_enabled=True)

    # Test 1: Basic caching with repeated messages
    print("\nTest 1: Basic caching with repeated messages")
    message = "This is a test message that should be cached"

    # First add (should be a cache miss)
    print("\nAdding first message...")
    conv.add("user", message)
    time.sleep(0.1)  # Wait for token counting thread

    # Second add (should be a cache hit)
    print("\nAdding same message again...")
    conv.add("user", message)
    time.sleep(0.1)  # Wait for token counting thread

    # Check cache stats
    stats = conv.get_cache_stats()
    print("\nCache stats after repeated message:")
    print(f"Hits: {stats['hits']}")
    print(f"Misses: {stats['misses']}")
    print(f"Cached tokens: {stats['cached_tokens']}")
    print(f"Hit rate: {stats['hit_rate']:.2%}")

    # Test 2: Different content types
    print("\nTest 2: Different content types")

    # Test with dictionary
    dict_content = {"key": "value", "nested": {"inner": "data"}}
    print("\nAdding dictionary content...")
    conv.add("user", dict_content)
    time.sleep(0.1)

    # Test with list
    list_content = ["item1", "item2", {"nested": "data"}]
    print("\nAdding list content...")
    conv.add("user", list_content)
    time.sleep(0.1)

    # Test 3: Thread safety
    print("\nTest 3: Thread safety with concurrent adds")

    def add_message(msg):
        conv.add("user", msg)

    # Add multiple messages concurrently
    messages = [f"Concurrent message {i}" for i in range(5)]
    for msg in messages:
        add_message(msg)
    time.sleep(0.5)  # Wait for all token counting threads

    # Test 4: Cache with different message lengths
    print("\nTest 4: Cache with different message lengths")

    # Short message
    short_msg = "Short"
    conv.add("user", short_msg)
    time.sleep(0.1)

    # Long message
    long_msg = "This is a much longer message that should have more tokens and might be cached differently"
    conv.add("user", long_msg)
    time.sleep(0.1)

    # Test 5: Cache statistics after all tests
    print("\nTest 5: Final cache statistics")
    final_stats = conv.get_cache_stats()
    print("\nFinal cache stats:")
    print(f"Total hits: {final_stats['hits']}")
    print(f"Total misses: {final_stats['misses']}")
    print(f"Total cached tokens: {final_stats['cached_tokens']}")
    print(f"Total tokens: {final_stats['total_tokens']}")
    print(f"Overall hit rate: {final_stats['hit_rate']:.2%}")

    # Test 6: Display conversation with cache status
    print("\nTest 6: Display conversation with cache status")
    print("\nConversation history:")
    print(conv.get_str())

    # Test 7: Cache disabled
    print("\nTest 7: Cache disabled")
    conv_disabled = Conversation(cache_enabled=False)
    conv_disabled.add("user", message)
    time.sleep(0.1)
    conv_disabled.add("user", message)
    time.sleep(0.1)

    disabled_stats = conv_disabled.get_cache_stats()
    print("\nCache stats with caching disabled:")
    print(f"Hits: {disabled_stats['hits']}")
    print(f"Misses: {disabled_stats['misses']}")
    print(f"Cached tokens: {disabled_stats['cached_tokens']}")

    # Test 8: High concurrency stress test
    print("\nTest 8: High concurrency stress test")
    conv_stress = Conversation(cache_enabled=True)

    def stress_test_worker(messages: List[str]):
        for msg in messages:
            conv_stress.add("user", msg)
            time.sleep(random.uniform(0.01, 0.05))

    # Create multiple threads with different messages
    threads = []
    for i in range(5):
        thread_messages = [
            f"Stress test message {i}_{j}" for j in range(10)
        ]
        t = threading.Thread(
            target=stress_test_worker, args=(thread_messages,)
        )
        threads.append(t)
        t.start()

    # Wait for all threads to complete
    for t in threads:
        t.join()
    time.sleep(0.5)  # Wait for token counting

    stress_stats = conv_stress.get_cache_stats()
    print("\nStress test stats:")
    print(
        f"Total messages: {stress_stats['hits'] + stress_stats['misses']}"
    )
    print(f"Cache hits: {stress_stats['hits']}")
    print(f"Cache misses: {stress_stats['misses']}")

    # Test 9: Complex nested structures
    print("\nTest 9: Complex nested structures")
    complex_content = {
        "nested": {
            "array": [1, 2, 3, {"deep": "value"}],
            "object": {
                "key": "value",
                "nested_array": ["a", "b", "c"],
            },
        },
        "simple": "value",
    }

    # Add complex content multiple times
    for _ in range(3):
        conv.add("user", complex_content)
        time.sleep(0.1)

    # Test 10: Large message test
    print("\nTest 10: Large message test")
    large_message = "x" * 10000  # 10KB message
    conv.add("user", large_message)
    time.sleep(0.1)

    # Test 11: Mixed content types in sequence
    print("\nTest 11: Mixed content types in sequence")
    mixed_sequence = [
        "Simple string",
        {"key": "value"},
        ["array", "items"],
        "Simple string",  # Should be cached
        {"key": "value"},  # Should be cached
        ["array", "items"],  # Should be cached
    ]
    for content in mixed_sequence:
        conv.add("user", content)
        time.sleep(0.1)

    # Test 12: Cache performance metrics
    print("\nTest 12: Cache performance metrics")
    start_time = time.time()

    # Add 100 messages quickly
    for i in range(100):
        conv.add("user", f"Performance test message {i}")
    end_time = time.time()

    performance_stats = conv.get_cache_stats()
    print("\nPerformance metrics:")
    print(f"Time taken: {end_time - start_time:.2f} seconds")
    print(f"Messages per second: {100 / (end_time - start_time):.2f}")
    print(f"Cache hit rate: {performance_stats['hit_rate']:.2%}")

    # Test 13: Cache with special characters
    print("\nTest 13: Cache with special characters")
    special_chars = [
        "Hello! @#$%^&*()",
        "Unicode: 你好世界",
        "Emoji: 😀🎉🌟",
        "Hello! @#$%^&*()",  # Should be cached
        "Unicode: 你好世界",  # Should be cached
        "Emoji: 😀🎉🌟",  # Should be cached
    ]
    for content in special_chars:
        conv.add("user", content)
        time.sleep(0.1)

    # Test 14: Cache with different roles
    print("\nTest 14: Cache with different roles")
    roles = ["user", "assistant", "system", "function"]
    for role in roles:
        conv.add(role, "Same message different role")
        time.sleep(0.1)

    # Final statistics
    print("\n=== Final Cache Statistics ===")
    final_stats = conv.get_cache_stats()
    print(f"Total hits: {final_stats['hits']}")
    print(f"Total misses: {final_stats['misses']}")
    print(f"Total cached tokens: {final_stats['cached_tokens']}")
    print(f"Total tokens: {final_stats['total_tokens']}")
    print(f"Overall hit rate: {final_stats['hit_rate']:.2%}")

    print("\n=== Cache Testing Complete ===")


if __name__ == "__main__":
    test_conversation_cache()