Remove Supabase example usage and related test files; update SupabaseConversation imports and error handling in communication module.

pull/861/head
harshalmore31 4 months ago
parent baaddca45f
commit a7ca3b2d4e

@ -1,212 +0,0 @@
"""
Example usage of the SupabaseConversation class for the Swarms Framework.
This example demonstrates how to:
1. Initialize a SupabaseConversation with automatic table creation
2. Add messages of different types
3. Query and search messages
4. Export/import conversations
5. Get conversation statistics
Prerequisites:
1. Install supabase-py: pip install supabase
2. Set up a Supabase project with valid URL and API key
3. Set environment variables (table will be created automatically)
Automatic Table Creation:
The SupabaseConversation will automatically create the required table if it doesn't exist.
For optimal results, you can optionally create this RPC function in your Supabase SQL Editor:
CREATE OR REPLACE FUNCTION exec_sql(sql TEXT)
RETURNS TEXT AS $$
BEGIN
EXECUTE sql;
RETURN 'SUCCESS';
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
Environment Variables:
- SUPABASE_URL: Your Supabase project URL
- SUPABASE_KEY: Your Supabase anon/service key
"""
import os
import json
from swarms.communication.supabase_wrap import (
SupabaseConversation,
MessageType,
SupabaseOperationError,
SupabaseConnectionError
)
from swarms.communication.base_communication import Message
def main():
# Load environment variables
supabase_url = os.getenv("SUPABASE_URL")
supabase_key = os.getenv("SUPABASE_KEY")
if not supabase_url or not supabase_key:
print("Error: SUPABASE_URL and SUPABASE_KEY environment variables must be set.")
print("Please create a .env file with these values or set them in your environment.")
return
try:
# Initialize SupabaseConversation
print("🚀 Initializing SupabaseConversation with automatic table creation...")
conversation = SupabaseConversation(
supabase_url=supabase_url,
supabase_key=supabase_key,
system_prompt="You are a helpful AI assistant.",
time_enabled=True,
enable_logging=True,
table_name="conversations",
)
print(f"✅ Successfully initialized! Conversation ID: {conversation.get_conversation_id()}")
print("📋 Table created automatically if it didn't exist!")
# Add various types of messages
print("\n📝 Adding messages...")
# Add user message
user_msg_id = conversation.add(
role="user",
content="Hello! Can you help me understand Supabase?",
message_type=MessageType.USER,
metadata={"source": "example_script", "priority": "high"}
)
print(f"Added user message (ID: {user_msg_id})")
# Add assistant message with complex content
assistant_content = {
"response": "Of course! Supabase is an open-source Firebase alternative with a PostgreSQL database.",
"confidence": 0.95,
"topics": ["database", "backend", "realtime"]
}
assistant_msg_id = conversation.add(
role="assistant",
content=assistant_content,
message_type=MessageType.ASSISTANT,
metadata={"model": "gpt-4", "tokens_used": 150}
)
print(f"Added assistant message (ID: {assistant_msg_id})")
# Add system message
system_msg_id = conversation.add(
role="system",
content="User is asking about Supabase features.",
message_type=MessageType.SYSTEM
)
print(f"Added system message (ID: {system_msg_id})")
# Batch add multiple messages
print("\n📦 Batch adding messages...")
batch_messages = [
Message(
role="user",
content="What are the main features of Supabase?",
message_type=MessageType.USER,
metadata={"follow_up": True}
),
Message(
role="assistant",
content="Supabase provides: database, auth, realtime subscriptions, edge functions, and storage.",
message_type=MessageType.ASSISTANT,
metadata={"comprehensive": True}
)
]
batch_ids = conversation.batch_add(batch_messages)
print(f"Batch added {len(batch_ids)} messages: {batch_ids}")
# Get conversation as string
print("\n💬 Current conversation:")
print(conversation.get_str())
# Search for messages
print("\n🔍 Searching for messages containing 'Supabase':")
search_results = conversation.search("Supabase")
for result in search_results:
print(f" - ID {result['id']}: {result['role']} - {result['content'][:50]}...")
# Get conversation statistics
print("\n📊 Conversation statistics:")
stats = conversation.get_conversation_summary()
print(json.dumps(stats, indent=2, default=str))
# Get messages by role
print("\n👤 User messages:")
user_messages = conversation.get_messages_by_role("user")
for msg in user_messages:
print(f" - {msg['content']}")
# Update a message
print(f"\n✏️ Updating message {user_msg_id}...")
conversation.update(
index=str(user_msg_id),
role="user",
content="Hello! Can you help me understand Supabase and its key features?"
)
print("Message updated successfully!")
# Query a specific message
print(f"\n🔎 Querying message {assistant_msg_id}:")
queried_msg = conversation.query(str(assistant_msg_id))
if queried_msg:
print(f" Role: {queried_msg['role']}")
print(f" Content: {queried_msg['content']}")
print(f" Timestamp: {queried_msg['timestamp']}")
# Export conversation
print("\n💾 Exporting conversation...")
conversation.export_conversation("supabase_conversation_export.yaml")
print("Conversation exported to supabase_conversation_export.yaml")
# Get conversation organized by role
print("\n📋 Messages organized by role:")
by_role = conversation.get_conversation_by_role_dict()
for role, messages in by_role.items():
print(f" {role}: {len(messages)} messages")
# Get timeline
print("\n📅 Conversation timeline:")
timeline = conversation.get_conversation_timeline_dict()
for date, messages in timeline.items():
print(f" {date}: {len(messages)} messages")
# Test delete (be careful with this in production!)
print(f"\n🗑️ Deleting system message {system_msg_id}...")
conversation.delete(str(system_msg_id))
print("System message deleted successfully!")
# Final message count
final_stats = conversation.get_conversation_summary()
print(f"\n📈 Final conversation has {final_stats['total_messages']} messages")
# Start a new conversation
print("\n🆕 Starting a new conversation...")
new_conv_id = conversation.start_new_conversation()
print(f"New conversation started with ID: {new_conv_id}")
# Add a message to the new conversation
conversation.add(
role="user",
content="This is a new conversation!",
message_type=MessageType.USER
)
print("Added message to new conversation")
print("\n✅ Example completed successfully!")
except SupabaseConnectionError as e:
print(f"❌ Connection error: {e}")
print("Please check your Supabase URL and key.")
except SupabaseOperationError as e:
print(f"❌ Operation error: {e}")
print("Please check your database schema and permissions.")
except Exception as e:
print(f"❌ Unexpected error: {e}")
if __name__ == "__main__":
main()

@ -0,0 +1,142 @@
{
"summary": {
"total_tests": 21,
"passed_tests": 21,
"failed_tests": 0,
"success_rate": 100.0,
"total_execution_time": 0.009189000000000001,
"average_execution_time": 0.0004375714285714286,
"timestamp": "2025-06-02T22:26:15.591748",
"supabase_available": false,
"environment_configured": true
},
"test_results": [
{
"test_name": "Import Availability",
"success": true,
"message": "True",
"execution_time": 0.0
},
{
"test_name": "Fallback Functionality",
"success": true,
"message": "True",
"execution_time": 0.001339
},
{
"test_name": "Initialization",
"success": true,
"message": "True",
"execution_time": 0.000999
},
{
"test_name": "Logging Configuration",
"success": true,
"message": "True",
"execution_time": 0.001114
},
{
"test_name": "Add Message",
"success": true,
"message": "True",
"execution_time": 0.001002
},
{
"test_name": "Add Complex Message",
"success": true,
"message": "True",
"execution_time": 0.0
},
{
"test_name": "Batch Add",
"success": true,
"message": "True",
"execution_time": 0.00048
},
{
"test_name": "Get String",
"success": true,
"message": "True",
"execution_time": 0.0
},
{
"test_name": "Get Messages",
"success": true,
"message": "True",
"execution_time": 0.0
},
{
"test_name": "Search Messages",
"success": true,
"message": "True",
"execution_time": 0.0
},
{
"test_name": "Update and Delete",
"success": true,
"message": "True",
"execution_time": 0.003416
},
{
"test_name": "Update Message Method",
"success": true,
"message": "True",
"execution_time": 0.0
},
{
"test_name": "Conversation Statistics",
"success": true,
"message": "True",
"execution_time": 0.00036
},
{
"test_name": "JSON Operations",
"success": true,
"message": "True",
"execution_time": 0.0
},
{
"test_name": "YAML Operations",
"success": true,
"message": "True",
"execution_time": 0.0
},
{
"test_name": "Message Types",
"success": true,
"message": "True",
"execution_time": 0.000479
},
{
"test_name": "Conversation Management",
"success": true,
"message": "True",
"execution_time": 0.0
},
{
"test_name": "Get Messages by Role",
"success": true,
"message": "True",
"execution_time": 0.0
},
{
"test_name": "Timeline and Organization",
"success": true,
"message": "True",
"execution_time": 0.0
},
{
"test_name": "Concurrent Operations",
"success": true,
"message": "True",
"execution_time": 0.0
},
{
"test_name": "Enhanced Error Handling",
"success": true,
"message": "True",
"execution_time": 0.0
}
],
"failed_tests": []
}

@ -0,0 +1,142 @@
{
"summary": {
"total_tests": 21,
"passed_tests": 21,
"failed_tests": 0,
"success_rate": 100.0,
"total_execution_time": 0.026432,
"average_execution_time": 0.0012586666666666666,
"timestamp": "2025-06-02T22:28:42.681034",
"supabase_available": false,
"environment_configured": true
},
"test_results": [
{
"test_name": "Import Availability",
"success": true,
"message": "True",
"execution_time": 0.0
},
{
"test_name": "Fallback Functionality",
"success": true,
"message": "True",
"execution_time": 0.0
},
{
"test_name": "Initialization",
"success": true,
"message": "True",
"execution_time": 0.008308
},
{
"test_name": "Logging Configuration",
"success": true,
"message": "True",
"execution_time": 0.000999
},
{
"test_name": "Add Message",
"success": true,
"message": "True",
"execution_time": 0.001
},
{
"test_name": "Add Complex Message",
"success": true,
"message": "True",
"execution_time": 0.001009
},
{
"test_name": "Batch Add",
"success": true,
"message": "True",
"execution_time": 0.0
},
{
"test_name": "Get String",
"success": true,
"message": "True",
"execution_time": 0.004745
},
{
"test_name": "Get Messages",
"success": true,
"message": "True",
"execution_time": 0.0
},
{
"test_name": "Search Messages",
"success": true,
"message": "True",
"execution_time": 0.0
},
{
"test_name": "Update and Delete",
"success": true,
"message": "True",
"execution_time": 0.0
},
{
"test_name": "Update Message Method",
"success": true,
"message": "True",
"execution_time": 0.010371
},
{
"test_name": "Conversation Statistics",
"success": true,
"message": "True",
"execution_time": 0.0
},
{
"test_name": "JSON Operations",
"success": true,
"message": "True",
"execution_time": 0.0
},
{
"test_name": "YAML Operations",
"success": true,
"message": "True",
"execution_time": 0.0
},
{
"test_name": "Message Types",
"success": true,
"message": "True",
"execution_time": 0.0
},
{
"test_name": "Conversation Management",
"success": true,
"message": "True",
"execution_time": 0.0
},
{
"test_name": "Get Messages by Role",
"success": true,
"message": "True",
"execution_time": 0.0
},
{
"test_name": "Timeline and Organization",
"success": true,
"message": "True",
"execution_time": 0.0
},
{
"test_name": "Concurrent Operations",
"success": true,
"message": "True",
"execution_time": 0.0
},
{
"test_name": "Enhanced Error Handling",
"success": true,
"message": "True",
"execution_time": 0.0
}
],
"failed_tests": []
}

@ -6,6 +6,7 @@ from swarms.communication.base_communication import (
from swarms.communication.sqlite_wrap import SQLiteConversation from swarms.communication.sqlite_wrap import SQLiteConversation
from swarms.communication.duckdb_wrap import DuckDBConversation from swarms.communication.duckdb_wrap import DuckDBConversation
# Optional dependencies with graceful fallbacks
try: try:
from swarms.communication.supabase_wrap import ( from swarms.communication.supabase_wrap import (
SupabaseConversation, SupabaseConversation,
@ -13,7 +14,6 @@ try:
SupabaseOperationError, SupabaseOperationError,
) )
except ImportError: except ImportError:
# Supabase dependencies might not be installed
SupabaseConversation = None SupabaseConversation = None
SupabaseConnectionError = None SupabaseConnectionError = None
SupabaseOperationError = None SupabaseOperationError = None
@ -35,7 +35,7 @@ __all__ = [
"SQLiteConversation", "SQLiteConversation",
"DuckDBConversation", "DuckDBConversation",
"SupabaseConversation", "SupabaseConversation",
"SupabaseConnectionError", "SupabaseConnectionError",
"SupabaseOperationError", "SupabaseOperationError",
"RedisConversation", "RedisConversation",
"PulsarConversation", "PulsarConversation",

@ -79,14 +79,13 @@ class SupabaseConversation(BaseCommunication):
supabase_key: str, supabase_key: str,
system_prompt: Optional[str] = None, system_prompt: Optional[str] = None,
time_enabled: bool = False, time_enabled: bool = False,
autosave: bool = False, # Less relevant for DB-backed, but kept for interface autosave: bool = False, # Standardized parameter name - less relevant for DB-backed, but kept for interface
save_filepath: str = None, # Used for export/import save_filepath: str = None, # Used for export/import
tokenizer: Any = None, tokenizer: Any = None,
context_length: int = 8192, context_length: int = 8192,
rules: str = None, rules: str = None,
custom_rules_prompt: str = None, custom_rules_prompt: str = None,
user: str = "User:", user: str = "User:",
auto_save: bool = True, # Less relevant
save_as_yaml: bool = True, # Default export format save_as_yaml: bool = True, # Default export format
save_as_json_bool: bool = False, # Alternative export format save_as_json_bool: bool = False, # Alternative export format
token_count: bool = True, token_count: bool = True,
@ -114,7 +113,6 @@ class SupabaseConversation(BaseCommunication):
self.rules = rules self.rules = rules
self.custom_rules_prompt = custom_rules_prompt self.custom_rules_prompt = custom_rules_prompt
self.user = user self.user = user
self.auto_save = auto_save # Actual auto-saving to file is less relevant
self.save_as_yaml_on_export = save_as_yaml self.save_as_yaml_on_export = save_as_yaml
self.save_as_json_on_export = save_as_json_bool self.save_as_json_on_export = save_as_json_bool
self.calculate_token_count = token_count self.calculate_token_count = token_count
@ -346,30 +344,38 @@ CREATE POLICY "Users can manage their own conversations" ON {self.table_name}
return str(content) return str(content)
def _deserialize_content(self, content_str: str) -> Union[str, dict, list]: def _deserialize_content(self, content_str: str) -> Union[str, dict, list]:
"""Deserializes content from JSON string if it looks like JSON.""" """Deserializes content from JSON string if it looks like JSON. More robust approach."""
if not content_str:
return content_str
# Always try to parse as JSON first, fall back to string
try: try:
# Try to parse if it looks like a JSON object or array return json.loads(content_str)
if content_str.strip().startswith(("{", "[")): except (json.JSONDecodeError, TypeError):
return json.loads(content_str) # Not valid JSON, return as string
except json.JSONDecodeError: return content_str
pass # Not a valid JSON, return as string
return content_str
def _serialize_metadata(self, metadata: Optional[Dict]) -> Optional[str]: def _serialize_metadata(self, metadata: Optional[Dict]) -> Optional[str]:
"""Serializes metadata dict to JSON string.""" """Serializes metadata dict to JSON string using simplified encoder."""
if metadata is None: if metadata is None:
return None return None
return json.dumps(metadata, cls=DateTimeEncoder) try:
return json.dumps(metadata, default=str, ensure_ascii=False)
except (TypeError, ValueError) as e:
if self.enable_logging:
self.logger.warning(f"Failed to serialize metadata: {e}")
return None
def _deserialize_metadata(self, metadata_str: Optional[str]) -> Optional[Dict]: def _deserialize_metadata(self, metadata_str: Optional[str]) -> Optional[Dict]:
"""Deserializes metadata from JSON string.""" """Deserializes metadata from JSON string with better error handling."""
if metadata_str is None: if metadata_str is None:
return None return None
try: try:
return json.loads(metadata_str) return json.loads(metadata_str)
except json.JSONDecodeError: except (json.JSONDecodeError, TypeError) as e:
self.logger.warning(f"Failed to deserialize metadata: {metadata_str}") if self.enable_logging:
return None # Or return the string itself if preferred self.logger.warning(f"Failed to deserialize metadata: {metadata_str[:50]}... Error: {e}")
return None
def _generate_conversation_id(self) -> str: def _generate_conversation_id(self) -> str:
"""Generate a unique conversation ID using UUID and timestamp.""" """Generate a unique conversation ID using UUID and timestamp."""
@ -572,14 +578,19 @@ CREATE POLICY "Users can manage their own conversations" ON {self.table_name}
self.logger.error(f"Error deleting message ID {index} from Supabase: {e}") self.logger.error(f"Error deleting message ID {index} from Supabase: {e}")
raise SupabaseOperationError(f"Error deleting message ID {index}: {e}") raise SupabaseOperationError(f"Error deleting message ID {index}: {e}")
def update( def update(self, index: str, role: str, content: Union[str, dict]):
"""Update a message in the conversation history. Matches BaseCommunication signature exactly."""
# Use the flexible internal method
return self._update_flexible(index=index, role=role, content=content)
def _update_flexible(
self, self,
index: Union[str, int], index: Union[str, int],
role: Optional[str] = None, role: Optional[str] = None,
content: Optional[Union[str, dict]] = None, content: Optional[Union[str, dict]] = None,
metadata: Optional[Dict] = None metadata: Optional[Dict] = None
) -> bool: ) -> bool:
"""Update a message in the conversation history. Returns True if successful, False otherwise.""" """Internal flexible update method. Returns True if successful, False otherwise."""
if self.current_conversation_id is None: if self.current_conversation_id is None:
if self.enable_logging: if self.enable_logging:
self.logger.warning("Cannot update message: No current conversation.") self.logger.warning("Cannot update message: No current conversation.")
@ -638,10 +649,10 @@ CREATE POLICY "Users can manage their own conversations" ON {self.table_name}
self.logger.error(f"Error updating message ID {message_id} in Supabase: {e}") self.logger.error(f"Error updating message ID {message_id} in Supabase: {e}")
return False return False
def query(self, index: str) -> Optional[Dict]: def query(self, index: str) -> Dict:
"""Query a message in the conversation history by its primary key 'id'.""" """Query a message in the conversation history by its primary key 'id'. Returns empty dict if not found to match BaseCommunication signature."""
if self.current_conversation_id is None: if self.current_conversation_id is None:
return None return {}
try: try:
# Handle both string and int message IDs # Handle both string and int message IDs
try: try:
@ -649,7 +660,7 @@ CREATE POLICY "Users can manage their own conversations" ON {self.table_name}
except ValueError: except ValueError:
if self.enable_logging: if self.enable_logging:
self.logger.warning(f"Invalid message ID for query: {index}. Must be an integer.") self.logger.warning(f"Invalid message ID for query: {index}. Must be an integer.")
return None return {}
response = self.client.table(self.table_name).select("*") \ response = self.client.table(self.table_name).select("*") \
.eq("id", message_id) \ .eq("id", message_id) \
@ -660,11 +671,16 @@ CREATE POLICY "Users can manage their own conversations" ON {self.table_name}
data = self._handle_api_response(response, f"query_message (id: {message_id})") data = self._handle_api_response(response, f"query_message (id: {message_id})")
if data: if data:
return self._format_row_to_dict(data) return self._format_row_to_dict(data)
return None return {}
except Exception as e: except Exception as e:
if self.enable_logging: if self.enable_logging:
self.logger.error(f"Error querying message ID {index} from Supabase: {e}") self.logger.error(f"Error querying message ID {index} from Supabase: {e}")
return None return {}
def query_optional(self, index: str) -> Optional[Dict]:
"""Query a message and return None if not found. More precise return type."""
result = self.query(index)
return result if result else None
def search(self, keyword: str) -> List[Dict]: def search(self, keyword: str) -> List[Dict]:
"""Search for messages containing a keyword in their content.""" """Search for messages containing a keyword in their content."""
@ -1011,53 +1027,77 @@ CREATE POLICY "Users can manage their own conversations" ON {self.table_name}
} }
def truncate_memory_with_tokenizer(self): def truncate_memory_with_tokenizer(self):
"""Truncate the conversation history based on token count if a tokenizer is provided.""" """Truncate the conversation history based on token count if a tokenizer is provided. Optimized for better performance."""
if not self.tokenizer or self.current_conversation_id is None: if not self.tokenizer or self.current_conversation_id is None:
self.logger.info("Tokenizer not available or no current conversation, skipping truncation.") if self.enable_logging:
self.logger.info("Tokenizer not available or no current conversation, skipping truncation.")
return return
try: try:
messages = self.get_messages() # Fetches ordered by timestamp ASC # Fetch messages with only necessary fields for efficiency
response = self.client.table(self.table_name).select("id, content, token_count") \
# Calculate cumulative tokens from newest to oldest to decide cutoff .eq("conversation_id", self.current_conversation_id) \
# Or, from oldest to newest to keep newest messages within context_length .order("timestamp", desc=False) \
.execute()
# Let's keep newest messages: iterate backwards, then delete earlier ones. messages = self._handle_api_response(response, "fetch_messages_for_truncation")
# This is complex with current `delete` by ID. if not messages:
# A simpler approach: calculate total tokens, if > context_length, delete oldest ones. return
current_total_tokens = sum(
m.get("token_count", 0) if m.get("token_count") is not None
else (self.tokenizer.count_tokens(self._serialize_content(m["content"])) if self.calculate_token_count else 0)
for m in messages
)
tokens_to_remove = current_total_tokens - self.context_length # Calculate tokens and determine which messages to delete
total_tokens = 0
message_tokens = []
for msg in messages:
token_count = msg.get("token_count")
if token_count is None and self.calculate_token_count:
# Recalculate if missing
content = self._deserialize_content(msg.get("content", ""))
token_count = self.tokenizer.count_tokens(str(content))
message_tokens.append({
"id": msg["id"],
"tokens": token_count or 0
})
total_tokens += token_count or 0
tokens_to_remove = total_tokens - self.context_length
if tokens_to_remove <= 0: if tokens_to_remove <= 0:
return # No truncation needed return # No truncation needed
deleted_count = 0 # Collect IDs to delete (oldest first)
for msg in messages: # Oldest messages first ids_to_delete = []
for msg_info in message_tokens:
if tokens_to_remove <= 0: if tokens_to_remove <= 0:
break break
ids_to_delete.append(msg_info["id"])
msg_id = msg.get("id") tokens_to_remove -= msg_info["tokens"]
if not msg_id:
continue
msg_tokens = msg.get("token_count", 0) if not ids_to_delete:
if msg_tokens == 0 and self.calculate_token_count: # Recalculate if zero and enabled return
msg_tokens = self.tokenizer.count_tokens(self._serialize_content(msg["content"]))
# Batch delete for better performance
self.delete(msg_id) # Delete by primary key if len(ids_to_delete) == 1:
tokens_to_remove -= msg_tokens # Single delete
deleted_count +=1 response = self.client.table(self.table_name).delete() \
.eq("id", ids_to_delete[0]) \
.eq("conversation_id", self.current_conversation_id) \
.execute()
else:
# Batch delete using 'in' operator
response = self.client.table(self.table_name).delete() \
.in_("id", ids_to_delete) \
.eq("conversation_id", self.current_conversation_id) \
.execute()
self.logger.info(f"Truncated conversation {self.current_conversation_id}, removed {deleted_count} oldest messages.") self._handle_api_response(response, "truncate_conversation_batch_delete")
if self.enable_logging:
self.logger.info(f"Truncated conversation {self.current_conversation_id}, removed {len(ids_to_delete)} oldest messages.")
except Exception as e: except Exception as e:
self.logger.error(f"Error during memory truncation for conversation {self.current_conversation_id}: {e}") if self.enable_logging:
self.logger.error(f"Error during memory truncation for conversation {self.current_conversation_id}: {e}")
# Don't re-raise, truncation is best-effort # Don't re-raise, truncation is best-effort
# Methods from duckdb_wrap.py that seem generally useful and can be adapted # Methods from duckdb_wrap.py that seem generally useful and can be adapted
@ -1178,6 +1218,6 @@ CREATE POLICY "Users can manage their own conversations" ON {self.table_name}
content: Union[str, dict, list], content: Union[str, dict, list],
metadata: Optional[Dict] = None, metadata: Optional[Dict] = None,
) -> bool: ) -> bool:
"""Update an existing message.""" """Update an existing message. Matches BaseCommunication.update_message signature exactly."""
# Use the unified update method which now returns a boolean # Use the flexible internal method
return self.update(index=message_id, content=content, metadata=metadata) return self._update_flexible(index=message_id, content=content, metadata=metadata)

@ -1,151 +0,0 @@
import os
from swarms.communication.supabase_wrap import SupabaseConversation, MessageType, SupabaseOperationError
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# --- Configuration ---
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
TABLE_NAME = "conversations" # Make sure this table exists in your Supabase DB
def main():
if not SUPABASE_URL or not SUPABASE_KEY:
print("Error: SUPABASE_URL and SUPABASE_KEY environment variables must be set.")
print("Please create a .env file with these values or set them in your environment.")
return
print(f"Attempting to connect to Supabase URL: {SUPABASE_URL[:20]}...") # Print partial URL for security
try:
# Initialize SupabaseConversation
print(f"\n--- Initializing SupabaseConversation for table '{TABLE_NAME}' ---")
convo = SupabaseConversation(
supabase_url=SUPABASE_URL,
supabase_key=SUPABASE_KEY,
table_name=TABLE_NAME,
time_enabled=True, # DB schema handles timestamps by default
enable_logging=True,
)
print(f"Initialized. Current Conversation ID: {convo.get_conversation_id()}")
# --- Add messages ---
print("\n--- Adding messages ---")
user_msg_id = convo.add("user", "Hello, Supabase!", message_type=MessageType.USER, metadata={"source": "test_script"})
print(f"Added user message. ID: {user_msg_id}")
assistant_msg_content = {"response": "Hi there! How can I help you today?", "confidence": 0.95}
assistant_msg_id = convo.add("assistant", assistant_msg_content, message_type=MessageType.ASSISTANT)
print(f"Added assistant message. ID: {assistant_msg_id}")
system_msg_id = convo.add("system", "Conversation started.", message_type=MessageType.SYSTEM)
print(f"Added system message. ID: {system_msg_id}")
# --- Display conversation ---
print("\n--- Displaying conversation ---")
convo.display_conversation()
# --- Get all messages for current conversation ---
print("\n--- Retrieving all messages for current conversation ---")
all_messages = convo.get_messages()
if all_messages:
print(f"Retrieved {len(all_messages)} messages:")
for msg in all_messages:
print(f" ID: {msg.get('id')}, Role: {msg.get('role')}, Content: {str(msg.get('content'))[:50]}...")
else:
print("No messages found.")
# --- Query a specific message ---
if user_msg_id:
print(f"\n--- Querying message with ID: {user_msg_id} ---")
queried_msg = convo.query(str(user_msg_id)) # Query expects string ID
if queried_msg:
print(f"Queried message: {queried_msg}")
else:
print(f"Message with ID {user_msg_id} not found.")
# --- Search messages ---
print("\n--- Searching for messages containing 'Supabase' ---")
search_results = convo.search("Supabase")
if search_results:
print(f"Found {len(search_results)} matching messages:")
for msg in search_results:
print(f" ID: {msg.get('id')}, Content: {str(msg.get('content'))[:50]}...")
else:
print("No messages found matching 'Supabase'.")
# --- Update a message ---
if assistant_msg_id:
print(f"\n--- Updating message with ID: {assistant_msg_id} ---")
new_content = {"response": "I am an updated assistant!", "confidence": 0.99}
convo.update(index_or_id=str(assistant_msg_id), content=new_content, metadata={"updated_by": "test_script"})
updated_msg = convo.query(str(assistant_msg_id))
print(f"Updated message: {updated_msg}")
# --- Get last message ---
print("\n--- Getting last message ---")
last_msg = convo.get_last_message_as_string()
print(f"Last message: {last_msg}")
# --- Export and Import (example) ---
# Create a dummy export file name based on conversation ID
export_filename_json = f"convo_{convo.get_conversation_id()}.json"
export_filename_yaml = f"convo_{convo.get_conversation_id()}.yaml"
print(f"\n--- Exporting conversation to {export_filename_json} and {export_filename_yaml} ---")
convo.save_as_json_on_export = True # Test JSON export
convo.export_conversation(export_filename_json)
convo.save_as_json_on_export = False # Switch to YAML for next export
convo.save_as_yaml_on_export = True
convo.export_conversation(export_filename_yaml)
print("\n--- Starting a new conversation and importing from JSON ---")
new_convo_id_before_import = convo.start_new_conversation()
print(f"New conversation started with ID: {new_convo_id_before_import}")
convo.import_conversation(export_filename_json) # This will start another new convo internally
print(f"Conversation imported from {export_filename_json}. Current ID: {convo.get_conversation_id()}")
convo.display_conversation()
# --- Delete a message ---
if system_msg_id: # Using system_msg_id from the *original* conversation for this demo
print(f"\n--- Attempting to delete message with ID: {system_msg_id} from a *previous* conversation (might not exist in current) ---")
# Note: After import, system_msg_id refers to an ID from a *previous* conversation.
# To robustly test delete, you'd query a message from the *current* imported conversation.
# For this example, we'll just show the call.
# Let's add a message to the *current* conversation and delete that one.
temp_msg_id_to_delete = convo.add("system", "This message will be deleted.")
print(f"Added temporary message with ID: {temp_msg_id_to_delete}")
convo.delete(str(temp_msg_id_to_delete))
print(f"Message with ID {temp_msg_id_to_delete} deleted (if it existed in current convo).")
if convo.query(str(temp_msg_id_to_delete)) is None:
print("Verified: Message no longer exists.")
else:
print("Warning: Message still exists or query failed.")
# --- Clear current conversation ---
print("\n--- Clearing current conversation ---")
convo.clear()
print(f"Conversation {convo.get_conversation_id()} cleared.")
if not convo.get_messages():
print("Verified: No messages in current conversation after clearing.")
print("\n--- Example Finished ---")
except SupabaseOperationError as e:
print(f"Supabase Connection Error: {e}")
except SupabaseOperationError as e:
print(f"Supabase Operation Error: {e}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()

@ -1,13 +1,7 @@
import os import os
import sys
from pathlib import Path from pathlib import Path
import tempfile import tempfile
import threading import threading
# Add the project root to Python path to allow imports
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))
from swarms.communication.duckdb_wrap import ( from swarms.communication.duckdb_wrap import (
DuckDBConversation, DuckDBConversation,
Message, Message,

@ -1,14 +1,7 @@
import json import json
import datetime import datetime
import os import os
import sys
from pathlib import Path
from typing import Dict, List, Any, Tuple from typing import Dict, List, Any, Tuple
# Add the project root to Python path to allow imports
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))
from loguru import logger from loguru import logger
from swarms.communication.sqlite_wrap import ( from swarms.communication.sqlite_wrap import (
SQLiteConversation, SQLiteConversation,
@ -19,9 +12,9 @@ from rich.console import Console
from rich.table import Table from rich.table import Table
from rich.panel import Panel from rich.panel import Panel
# Initialize logger
console = Console() console = Console()
def print_test_header(test_name: str) -> None: def print_test_header(test_name: str) -> None:
"""Print a formatted test header.""" """Print a formatted test header."""
console.print( console.print(

@ -478,20 +478,21 @@ def test_update_and_delete() -> bool:
# Add a message to update/delete # Add a message to update/delete
msg_id = conversation.add("user", "Original message") msg_id = conversation.add("user", "Original message")
# Test update method # Test update method (BaseCommunication signature)
conversation.update( conversation.update(
index=str(msg_id), index=str(msg_id),
role="user", role="user",
content="Updated message" content="Updated message"
) )
updated_msg = conversation.query(str(msg_id)) updated_msg = conversation.query_optional(str(msg_id))
assert updated_msg is not None, "Message should exist after update"
assert updated_msg["content"] == "Updated message", "Message should be updated" assert updated_msg["content"] == "Updated message", "Message should be updated"
# Test delete # Test delete
conversation.delete(str(msg_id)) conversation.delete(str(msg_id))
deleted_msg = conversation.query(str(msg_id)) deleted_msg = conversation.query_optional(str(msg_id))
assert deleted_msg is None, "Message should be deleted" assert deleted_msg is None, "Message should be deleted"
print("✓ Update and delete test passed") print("✓ Update and delete test passed")
@ -893,15 +894,20 @@ def test_enhanced_error_handling() -> bool:
# Test with valid conversation # Test with valid conversation
conversation = setup_test_conversation() conversation = setup_test_conversation()
try: try:
# Test querying non-existent message # Test querying non-existent message with query (should return empty dict)
non_existent = conversation.query("999999") non_existent = conversation.query("999999")
assert non_existent is None, "Non-existent message should return None" assert non_existent == {}, "Non-existent message should return empty dict"
# Test querying non-existent message with query_optional (should return None)
non_existent_opt = conversation.query_optional("999999")
assert non_existent_opt is None, "Non-existent message should return None with query_optional"
# Test deleting non-existent message (should not raise exception) # Test deleting non-existent message (should not raise exception)
conversation.delete("999999") # Should handle gracefully conversation.delete("999999") # Should handle gracefully
# Test updating non-existent message (should not raise exception) # Test updating non-existent message (should return False)
conversation.update("999999", "user", "content") # Should handle gracefully update_result = conversation._update_flexible("999999", "user", "content")
assert update_result == False, "_update_flexible should return False for invalid ID"
# Test update_message with invalid ID # Test update_message with invalid ID
result = conversation.update_message(999999, "invalid content") result = conversation.update_message(999999, "invalid content")
@ -911,24 +917,16 @@ def test_enhanced_error_handling() -> bool:
empty_results = conversation.search("") empty_results = conversation.search("")
assert isinstance(empty_results, list), "Empty search should return list" assert isinstance(empty_results, list), "Empty search should return list"
# Test invalid message ID formats # Test invalid message ID formats (should return empty dict now)
try: invalid_query = conversation.query("not_a_number")
conversation.query("not_a_number") assert invalid_query == {}, "Invalid ID should return empty dict"
assert False, "Should raise ValueError for non-numeric ID"
except ValueError:
pass # Expected
try: invalid_query_opt = conversation.query_optional("not_a_number")
conversation.update("not_a_number", "user", "content") assert invalid_query_opt is None, "Invalid ID should return None with query_optional"
assert False, "Should raise ValueError for non-numeric ID"
except ValueError:
pass # Expected
try: # Test update with invalid ID (should return False)
conversation.delete("not_a_number") invalid_update = conversation._update_flexible("not_a_number", "user", "content")
assert False, "Should raise ValueError for non-numeric ID" assert invalid_update == False, "Invalid ID should return False for update"
except ValueError:
pass # Expected
print("✓ Enhanced error handling test passed") print("✓ Enhanced error handling test passed")
return True return True

Loading…
Cancel
Save