Implement SupabaseConversation functionality and comprehensive test suite

- Added main script (test.py) to demonstrate SupabaseConversation features including message addition, retrieval, updating, and deletion.
- Integrated environment variable loading for Supabase configuration.
- Developed a detailed test suite (test_supabase_conversation.py) covering various functionalities such as message handling, error handling, and conversation management.
- Included tests for JSON and YAML operations, concurrent operations, and enhanced error handling.
- Utilized rich console output for improved test reporting and visualization.
- Ensured robust cleanup of test data after each test execution.
pull/861/head
harshalmore31 6 days ago
parent 764961c1a0
commit 9e24767b8a

@ -0,0 +1,212 @@
"""
Example usage of the SupabaseConversation class for the Swarms Framework.
This example demonstrates how to:
1. Initialize a SupabaseConversation with automatic table creation
2. Add messages of different types
3. Query and search messages
4. Export/import conversations
5. Get conversation statistics
Prerequisites:
1. Install supabase-py: pip install supabase
2. Set up a Supabase project with valid URL and API key
3. Set environment variables (table will be created automatically)
Automatic Table Creation:
The SupabaseConversation will automatically create the required table if it doesn't exist.
For optimal results, you can optionally create this RPC function in your Supabase SQL Editor:
CREATE OR REPLACE FUNCTION exec_sql(sql TEXT)
RETURNS TEXT AS $$
BEGIN
EXECUTE sql;
RETURN 'SUCCESS';
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
Environment Variables:
- SUPABASE_URL: Your Supabase project URL
- SUPABASE_KEY: Your Supabase anon/service key
"""
import os
import json
from swarms.communication.supabase_wrap import (
SupabaseConversation,
MessageType,
SupabaseOperationError,
SupabaseConnectionError
)
from swarms.communication.base_communication import Message
def main():
# Load environment variables
supabase_url = os.getenv("SUPABASE_URL")
supabase_key = os.getenv("SUPABASE_KEY")
if not supabase_url or not supabase_key:
print("Error: SUPABASE_URL and SUPABASE_KEY environment variables must be set.")
print("Please create a .env file with these values or set them in your environment.")
return
try:
# Initialize SupabaseConversation
print("🚀 Initializing SupabaseConversation with automatic table creation...")
conversation = SupabaseConversation(
supabase_url=supabase_url,
supabase_key=supabase_key,
system_prompt="You are a helpful AI assistant.",
time_enabled=True,
enable_logging=True,
table_name="conversations",
)
print(f"✅ Successfully initialized! Conversation ID: {conversation.get_conversation_id()}")
print("📋 Table created automatically if it didn't exist!")
# Add various types of messages
print("\n📝 Adding messages...")
# Add user message
user_msg_id = conversation.add(
role="user",
content="Hello! Can you help me understand Supabase?",
message_type=MessageType.USER,
metadata={"source": "example_script", "priority": "high"}
)
print(f"Added user message (ID: {user_msg_id})")
# Add assistant message with complex content
assistant_content = {
"response": "Of course! Supabase is an open-source Firebase alternative with a PostgreSQL database.",
"confidence": 0.95,
"topics": ["database", "backend", "realtime"]
}
assistant_msg_id = conversation.add(
role="assistant",
content=assistant_content,
message_type=MessageType.ASSISTANT,
metadata={"model": "gpt-4", "tokens_used": 150}
)
print(f"Added assistant message (ID: {assistant_msg_id})")
# Add system message
system_msg_id = conversation.add(
role="system",
content="User is asking about Supabase features.",
message_type=MessageType.SYSTEM
)
print(f"Added system message (ID: {system_msg_id})")
# Batch add multiple messages
print("\n📦 Batch adding messages...")
batch_messages = [
Message(
role="user",
content="What are the main features of Supabase?",
message_type=MessageType.USER,
metadata={"follow_up": True}
),
Message(
role="assistant",
content="Supabase provides: database, auth, realtime subscriptions, edge functions, and storage.",
message_type=MessageType.ASSISTANT,
metadata={"comprehensive": True}
)
]
batch_ids = conversation.batch_add(batch_messages)
print(f"Batch added {len(batch_ids)} messages: {batch_ids}")
# Get conversation as string
print("\n💬 Current conversation:")
print(conversation.get_str())
# Search for messages
print("\n🔍 Searching for messages containing 'Supabase':")
search_results = conversation.search("Supabase")
for result in search_results:
print(f" - ID {result['id']}: {result['role']} - {result['content'][:50]}...")
# Get conversation statistics
print("\n📊 Conversation statistics:")
stats = conversation.get_conversation_summary()
print(json.dumps(stats, indent=2, default=str))
# Get messages by role
print("\n👤 User messages:")
user_messages = conversation.get_messages_by_role("user")
for msg in user_messages:
print(f" - {msg['content']}")
# Update a message
print(f"\n✏️ Updating message {user_msg_id}...")
conversation.update(
index=str(user_msg_id),
role="user",
content="Hello! Can you help me understand Supabase and its key features?"
)
print("Message updated successfully!")
# Query a specific message
print(f"\n🔎 Querying message {assistant_msg_id}:")
queried_msg = conversation.query(str(assistant_msg_id))
if queried_msg:
print(f" Role: {queried_msg['role']}")
print(f" Content: {queried_msg['content']}")
print(f" Timestamp: {queried_msg['timestamp']}")
# Export conversation
print("\n💾 Exporting conversation...")
conversation.export_conversation("supabase_conversation_export.yaml")
print("Conversation exported to supabase_conversation_export.yaml")
# Get conversation organized by role
print("\n📋 Messages organized by role:")
by_role = conversation.get_conversation_by_role_dict()
for role, messages in by_role.items():
print(f" {role}: {len(messages)} messages")
# Get timeline
print("\n📅 Conversation timeline:")
timeline = conversation.get_conversation_timeline_dict()
for date, messages in timeline.items():
print(f" {date}: {len(messages)} messages")
# Test delete (be careful with this in production!)
print(f"\n🗑️ Deleting system message {system_msg_id}...")
conversation.delete(str(system_msg_id))
print("System message deleted successfully!")
# Final message count
final_stats = conversation.get_conversation_summary()
print(f"\n📈 Final conversation has {final_stats['total_messages']} messages")
# Start a new conversation
print("\n🆕 Starting a new conversation...")
new_conv_id = conversation.start_new_conversation()
print(f"New conversation started with ID: {new_conv_id}")
# Add a message to the new conversation
conversation.add(
role="user",
content="This is a new conversation!",
message_type=MessageType.USER
)
print("Added message to new conversation")
print("\n✅ Example completed successfully!")
except SupabaseConnectionError as e:
print(f"❌ Connection error: {e}")
print("Please check your Supabase URL and key.")
except SupabaseOperationError as e:
print(f"❌ Operation error: {e}")
print("Please check your database schema and permissions.")
except Exception as e:
print(f"❌ Unexpected error: {e}")
if __name__ == "__main__":
main()

@ -0,0 +1,42 @@
from swarms.communication.base_communication import (
BaseCommunication,
Message,
MessageType,
)
from swarms.communication.sqlite_wrap import SQLiteConversation
from swarms.communication.duckdb_wrap import DuckDBConversation
try:
from swarms.communication.supabase_wrap import (
SupabaseConversation,
SupabaseConnectionError,
SupabaseOperationError,
)
except ImportError:
# Supabase dependencies might not be installed
SupabaseConversation = None
SupabaseConnectionError = None
SupabaseOperationError = None
try:
from swarms.communication.redis_wrap import RedisConversation
except ImportError:
RedisConversation = None
try:
from swarms.communication.pulsar_struct import PulsarConversation
except ImportError:
PulsarConversation = None
__all__ = [
"BaseCommunication",
"Message",
"MessageType",
"SQLiteConversation",
"DuckDBConversation",
"SupabaseConversation",
"SupabaseConnectionError",
"SupabaseOperationError",
"RedisConversation",
"PulsarConversation",
]

File diff suppressed because it is too large Load Diff

@ -0,0 +1,151 @@
import os
from swarms.communication.supabase_wrap import SupabaseConversation, MessageType, SupabaseOperationError
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# --- Configuration ---
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
TABLE_NAME = "conversations" # Make sure this table exists in your Supabase DB
def main():
if not SUPABASE_URL or not SUPABASE_KEY:
print("Error: SUPABASE_URL and SUPABASE_KEY environment variables must be set.")
print("Please create a .env file with these values or set them in your environment.")
return
print(f"Attempting to connect to Supabase URL: {SUPABASE_URL[:20]}...") # Print partial URL for security
try:
# Initialize SupabaseConversation
print(f"\n--- Initializing SupabaseConversation for table '{TABLE_NAME}' ---")
convo = SupabaseConversation(
supabase_url=SUPABASE_URL,
supabase_key=SUPABASE_KEY,
table_name=TABLE_NAME,
time_enabled=True, # DB schema handles timestamps by default
enable_logging=True,
)
print(f"Initialized. Current Conversation ID: {convo.get_conversation_id()}")
# --- Add messages ---
print("\n--- Adding messages ---")
user_msg_id = convo.add("user", "Hello, Supabase!", message_type=MessageType.USER, metadata={"source": "test_script"})
print(f"Added user message. ID: {user_msg_id}")
assistant_msg_content = {"response": "Hi there! How can I help you today?", "confidence": 0.95}
assistant_msg_id = convo.add("assistant", assistant_msg_content, message_type=MessageType.ASSISTANT)
print(f"Added assistant message. ID: {assistant_msg_id}")
system_msg_id = convo.add("system", "Conversation started.", message_type=MessageType.SYSTEM)
print(f"Added system message. ID: {system_msg_id}")
# --- Display conversation ---
print("\n--- Displaying conversation ---")
convo.display_conversation()
# --- Get all messages for current conversation ---
print("\n--- Retrieving all messages for current conversation ---")
all_messages = convo.get_messages()
if all_messages:
print(f"Retrieved {len(all_messages)} messages:")
for msg in all_messages:
print(f" ID: {msg.get('id')}, Role: {msg.get('role')}, Content: {str(msg.get('content'))[:50]}...")
else:
print("No messages found.")
# --- Query a specific message ---
if user_msg_id:
print(f"\n--- Querying message with ID: {user_msg_id} ---")
queried_msg = convo.query(str(user_msg_id)) # Query expects string ID
if queried_msg:
print(f"Queried message: {queried_msg}")
else:
print(f"Message with ID {user_msg_id} not found.")
# --- Search messages ---
print("\n--- Searching for messages containing 'Supabase' ---")
search_results = convo.search("Supabase")
if search_results:
print(f"Found {len(search_results)} matching messages:")
for msg in search_results:
print(f" ID: {msg.get('id')}, Content: {str(msg.get('content'))[:50]}...")
else:
print("No messages found matching 'Supabase'.")
# --- Update a message ---
if assistant_msg_id:
print(f"\n--- Updating message with ID: {assistant_msg_id} ---")
new_content = {"response": "I am an updated assistant!", "confidence": 0.99}
convo.update(index_or_id=str(assistant_msg_id), content=new_content, metadata={"updated_by": "test_script"})
updated_msg = convo.query(str(assistant_msg_id))
print(f"Updated message: {updated_msg}")
# --- Get last message ---
print("\n--- Getting last message ---")
last_msg = convo.get_last_message_as_string()
print(f"Last message: {last_msg}")
# --- Export and Import (example) ---
# Create a dummy export file name based on conversation ID
export_filename_json = f"convo_{convo.get_conversation_id()}.json"
export_filename_yaml = f"convo_{convo.get_conversation_id()}.yaml"
print(f"\n--- Exporting conversation to {export_filename_json} and {export_filename_yaml} ---")
convo.save_as_json_on_export = True # Test JSON export
convo.export_conversation(export_filename_json)
convo.save_as_json_on_export = False # Switch to YAML for next export
convo.save_as_yaml_on_export = True
convo.export_conversation(export_filename_yaml)
print("\n--- Starting a new conversation and importing from JSON ---")
new_convo_id_before_import = convo.start_new_conversation()
print(f"New conversation started with ID: {new_convo_id_before_import}")
convo.import_conversation(export_filename_json) # This will start another new convo internally
print(f"Conversation imported from {export_filename_json}. Current ID: {convo.get_conversation_id()}")
convo.display_conversation()
# --- Delete a message ---
if system_msg_id: # Using system_msg_id from the *original* conversation for this demo
print(f"\n--- Attempting to delete message with ID: {system_msg_id} from a *previous* conversation (might not exist in current) ---")
# Note: After import, system_msg_id refers to an ID from a *previous* conversation.
# To robustly test delete, you'd query a message from the *current* imported conversation.
# For this example, we'll just show the call.
# Let's add a message to the *current* conversation and delete that one.
temp_msg_id_to_delete = convo.add("system", "This message will be deleted.")
print(f"Added temporary message with ID: {temp_msg_id_to_delete}")
convo.delete(str(temp_msg_id_to_delete))
print(f"Message with ID {temp_msg_id_to_delete} deleted (if it existed in current convo).")
if convo.query(str(temp_msg_id_to_delete)) is None:
print("Verified: Message no longer exists.")
else:
print("Warning: Message still exists or query failed.")
# --- Clear current conversation ---
print("\n--- Clearing current conversation ---")
convo.clear()
print(f"Conversation {convo.get_conversation_id()} cleared.")
if not convo.get_messages():
print("Verified: No messages in current conversation after clearing.")
print("\n--- Example Finished ---")
except SupabaseOperationError as e:
print(f"Supabase Connection Error: {e}")
except SupabaseOperationError as e:
print(f"Supabase Operation Error: {e}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()

@ -1,7 +1,13 @@
import os import os
import sys
from pathlib import Path from pathlib import Path
import tempfile import tempfile
import threading import threading
# Add the project root to Python path to allow imports
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))
from swarms.communication.duckdb_wrap import ( from swarms.communication.duckdb_wrap import (
DuckDBConversation, DuckDBConversation,
Message, Message,

@ -1,7 +1,14 @@
import json import json
import datetime import datetime
import os import os
import sys
from pathlib import Path
from typing import Dict, List, Any, Tuple from typing import Dict, List, Any, Tuple
# Add the project root to Python path to allow imports
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))
from loguru import logger from loguru import logger
from swarms.communication.sqlite_wrap import ( from swarms.communication.sqlite_wrap import (
SQLiteConversation, SQLiteConversation,
@ -12,9 +19,9 @@ from rich.console import Console
from rich.table import Table from rich.table import Table
from rich.panel import Panel from rich.panel import Panel
# Initialize logger
console = Console() console = Console()
def print_test_header(test_name: str) -> None: def print_test_header(test_name: str) -> None:
"""Print a formatted test header.""" """Print a formatted test header."""
console.print( console.print(

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save