Refactor RedisConversationTester: streamline test methods, enhance export/import functionality, and improve error handling for Redis availability

pull/866/head
harshalmore31 4 weeks ago
parent 9e6f254b13
commit a16cee4114

@ -8,6 +8,7 @@ from pathlib import Path
# Add the project root to Python path to allow imports
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))
from swarms.communication.redis_wrap import (
RedisConversation,
REDIS_AVAILABLE,
@ -189,18 +190,11 @@ class RedisConversationTester:
"""Test message update."""
# Add initial message
self.conversation.add("user", "original message")
# Get all messages to find the last message ID
all_messages = self.conversation.return_messages_as_list()
if len(all_messages) > 0:
# Update the last message (index 0 in this case means the first message)
# Note: This test may need adjustment based on how Redis stores messages
self.conversation.update(0, "user", "updated message")
# Get the message directly using query
updated_message = self.conversation.query(0)
# Since Redis might store content differently, just check that update didn't crash
assert True, "Update method executed successfully"
def test_clear(self):
@ -212,28 +206,14 @@ class RedisConversationTester:
def test_export_import(self):
"""Test export and import functionality."""
try:
self.conversation.add("user", "export test")
self.conversation.export_conversation("test_export.txt")
# Clear conversation
self.conversation.clear()
# Import back
self.conversation.import_conversation("test_export.txt")
messages = self.conversation.return_messages_as_list()
assert (
len(messages) > 0
), "Failed to export/import conversation"
# Cleanup test file
import os
if os.path.exists("test_export.txt"):
os.remove("test_export.txt")
except Exception as e:
logger.warning(f"Export/import test failed: {e}")
# Don't fail the test entirely, just log the warning
assert True, "Export/import test completed with warnings"
self.conversation.add("user", "export test")
self.conversation.export_conversation("test_export.txt")
self.conversation.clear()
self.conversation.import_conversation("test_export.txt")
messages = self.conversation.return_messages_as_list()
assert (
len(messages) > 0
), "Failed to export/import conversation"
def test_json_operations(self):
"""Test JSON operations."""
@ -254,7 +234,6 @@ class RedisConversationTester:
self.conversation.add("user", "token test message")
time.sleep(1) # Wait for async token counting
messages = self.conversation.to_dict()
# Token counting may not be implemented in Redis version, so just check it doesn't crash
assert isinstance(messages, list), "Token counting test completed"
def test_cache_operations(self):
@ -275,9 +254,9 @@ class RedisConversationTester:
"""Run all tests and generate report."""
if not REDIS_AVAILABLE:
logger.error(
"Redis is not available. The auto-installation should have handled this."
"Redis is not available. Please install redis package."
)
return "# Redis Tests Failed\n\nRedis package could not be loaded even after auto-installation."
return "# Redis Tests Failed\n\nRedis package is not installed."
try:
if not self.setup():
@ -326,8 +305,6 @@ class RedisConversationTester:
def main():
"""Main function to run tests and save results."""
logger.info(f"Starting Redis tests. REDIS_AVAILABLE: {REDIS_AVAILABLE}")
tester = RedisConversationTester()
markdown_results = tester.run_all_tests()

Loading…
Cancel
Save