diff --git a/conversation_example_supabase.py b/conversation_example_supabase.py index a70fed47..1767d833 100644 --- a/conversation_example_supabase.py +++ b/conversation_example_supabase.py @@ -15,7 +15,7 @@ Prerequisites: """ import os -from typing import List, Callable +from typing import List from swarms.structs.agent import Agent from swarms.structs.conversation import Conversation from swarms.structs.multi_agent_exec import run_agents_concurrently @@ -24,6 +24,7 @@ from swarms.utils.history_output_formatter import ( HistoryOutputType, ) + def aggregator_agent_task_prompt( task: str, workers: List[Agent], conversation: Conversation ): @@ -52,7 +53,7 @@ def aggregator_agent_task_prompt( def create_research_agents() -> List[Agent]: """Create a team of specialized research agents.""" - + # Data Analyst Agent data_analyst = Agent( agent_name="DataAnalyst", @@ -70,7 +71,7 @@ def create_research_agents() -> List[Agent]: verbose=True, output_type="string", ) - + # Research Specialist Agent researcher = Agent( agent_name="ResearchSpecialist", @@ -88,10 +89,10 @@ def create_research_agents() -> List[Agent]: verbose=True, output_type="string", ) - + # Strategic Advisor Agent strategist = Agent( - agent_name="StrategicAdvisor", + agent_name="StrategicAdvisor", agent_description="Expert in strategic planning, business strategy, and decision-making", system_prompt="""You are a strategic advisor with expertise in: - Strategic planning and business strategy @@ -106,7 +107,7 @@ def create_research_agents() -> List[Agent]: verbose=True, output_type="string", ) - + return [data_analyst, researcher, strategist] @@ -148,7 +149,7 @@ def aggregate_with_supabase( ): """ Aggregate agent responses using Supabase for conversation storage. - + Args: workers: List of Agent instances task: The task to execute @@ -158,56 +159,62 @@ def aggregate_with_supabase( supabase_url: Supabase project URL supabase_key: Supabase API key """ - + if task is None: raise ValueError("Task is required for agent aggregation") - + if not workers: raise ValueError("At least one worker agent is required") - + if not all(isinstance(worker, Agent) for worker in workers): raise ValueError("All workers must be Agent instances") - + # Set up Supabase conversation storage conversation_kwargs = {} if backend == "supabase": url = supabase_url or os.getenv("SUPABASE_URL") key = supabase_key or os.getenv("SUPABASE_ANON_KEY") - + if not url or not key: raise ValueError( "Supabase backend requires SUPABASE_URL and SUPABASE_ANON_KEY " "environment variables or explicit parameters" ) - - conversation_kwargs.update({ - "supabase_url": url, - "supabase_key": key, - }) + + conversation_kwargs.update( + { + "supabase_url": url, + "supabase_key": key, + } + ) try: # Create conversation with Supabase backend conversation = Conversation( - backend=backend, + backend=backend, **conversation_kwargs, system_prompt="Multi-agent collaboration session with persistent storage", time_enabled=True, ) - print(f"โœ… Successfully initialized {backend} backend for conversation storage") - + print( + f"โœ… Successfully initialized {backend} backend for conversation storage" + ) + # Add initial task to conversation conversation.add("system", f"Task: {task}") - + except ImportError as e: print(f"โŒ Backend initialization failed: {e}") - print(f"๐Ÿ’ก Falling back to in-memory storage") + print("๐Ÿ’ก Falling back to in-memory storage") conversation = Conversation(backend="in-memory") # Create aggregator agent aggregator_agent = create_aggregator_agent() - print(f"๐Ÿš€ Starting multi-agent execution with {len(workers)} agents...") - + print( + f"๐Ÿš€ Starting multi-agent execution with {len(workers)} agents..." + ) + # Run agents concurrently results = run_agents_concurrently(agents=workers, task=task) @@ -217,7 +224,7 @@ def aggregate_with_supabase( print(f"๐Ÿ“ Stored response from {agent.agent_name}") print("๐Ÿ”„ Running aggregation analysis...") - + # Generate aggregated analysis final_result = aggregator_agent.run( task=aggregator_agent_task_prompt(task, workers, conversation) @@ -225,12 +232,11 @@ def aggregate_with_supabase( # Store aggregated result conversation.add( - content=final_result, - role=aggregator_agent.agent_name + content=final_result, role=aggregator_agent.agent_name ) print("โœ… Aggregation complete!") - + # Return formatted history return history_output_formatter( conversation=conversation, type=type @@ -239,41 +245,51 @@ def aggregate_with_supabase( # Example usage with real Swarms agents if __name__ == "__main__": - print("๐Ÿงช Testing Swarms Multi-Agent System with Supabase Backend") - print("="*70) - + print( + "๐Ÿงช Testing Swarms Multi-Agent System with Supabase Backend" + ) + print("=" * 70) + # Check environment setup print("\nโš™๏ธ Environment Setup Check") print("-" * 40) - + supabase_url = os.getenv("SUPABASE_URL") supabase_key = os.getenv("SUPABASE_ANON_KEY") openai_key = os.getenv("OPENAI_API_KEY") - - print(f"SUPABASE_URL: {'โœ… Set' if supabase_url else 'โŒ Not set'}") - print(f"SUPABASE_ANON_KEY: {'โœ… Set' if supabase_key else 'โŒ Not set'}") - print(f"OPENAI_API_KEY: {'โœ… Set' if openai_key else 'โŒ Not set'}") - + + print( + f"SUPABASE_URL: {'โœ… Set' if supabase_url else 'โŒ Not set'}" + ) + print( + f"SUPABASE_ANON_KEY: {'โœ… Set' if supabase_key else 'โŒ Not set'}" + ) + print( + f"OPENAI_API_KEY: {'โœ… Set' if openai_key else 'โŒ Not set'}" + ) + if not (supabase_url and supabase_key): print("\nโš ๏ธ Missing Supabase configuration!") print("Please set the following environment variables:") print("export SUPABASE_URL=https://your-project.supabase.co") print("export SUPABASE_ANON_KEY=your-anon-key") print("\nFalling back to demonstration with mock data...") - + if not openai_key: print("\nโš ๏ธ Missing OpenAI API key!") print("Please set: export OPENAI_API_KEY=your-api-key") - print("You can also use other LLM providers (Anthropic, Google, etc.)") + print( + "You can also use other LLM providers (Anthropic, Google, etc.)" + ) # Example 1: Basic Multi-Agent Research Task print("\n๐Ÿ“ฆ Example 1: Multi-Agent Market Research") print("-" * 50) - + try: # Create research team research_team = create_research_agents() - + # Define research task research_task = """ Analyze the current state and future prospects of artificial intelligence @@ -281,10 +297,12 @@ if __name__ == "__main__": regulatory challenges, and investment opportunities. Provide insights on key players, emerging technologies, and potential risks. """ - + print(f"๐Ÿ“‹ Task: {research_task.strip()}") - print(f"๐Ÿ‘ฅ Team: {[agent.agent_name for agent in research_team]}") - + print( + f"๐Ÿ‘ฅ Team: {[agent.agent_name for agent in research_team]}" + ) + if supabase_url and supabase_key and openai_key: # Run with real agents and Supabase storage result = aggregate_with_supabase( @@ -295,21 +313,23 @@ if __name__ == "__main__": supabase_url=supabase_url, supabase_key=supabase_key, ) - + print("\n๐Ÿ“Š Research Results:") print("=" * 50) print(result) - + else: - print("โŒ Skipping real agent execution due to missing configuration") - + print( + "โŒ Skipping real agent execution due to missing configuration" + ) + except Exception as e: print(f"โŒ Error in multi-agent research: {e}") # Example 2: Simple Conversation Storage Test print("\n๐Ÿ“ฆ Example 2: Direct Conversation Storage Test") print("-" * 50) - + try: if supabase_url and supabase_key: # Test direct conversation with Supabase @@ -319,37 +339,62 @@ if __name__ == "__main__": supabase_key=supabase_key, time_enabled=True, ) - + print("โœ… Supabase conversation created successfully") - + # Add sample conversation - conv.add("user", "What are the latest trends in AI technology?") - conv.add("assistant", "Based on current developments, key AI trends include:") - conv.add("assistant", "1. Large Language Models (LLMs) advancing rapidly") - conv.add("assistant", "2. Multimodal AI combining text, image, and video") - conv.add("assistant", "3. AI agents becoming more autonomous and capable") + conv.add( + "user", "What are the latest trends in AI technology?" + ) + conv.add( + "assistant", + "Based on current developments, key AI trends include:", + ) + conv.add( + "assistant", + "1. Large Language Models (LLMs) advancing rapidly", + ) + conv.add( + "assistant", + "2. Multimodal AI combining text, image, and video", + ) + conv.add( + "assistant", + "3. AI agents becoming more autonomous and capable", + ) conv.add("user", "How do these trends affect businesses?") - conv.add("assistant", "These trends are transforming businesses through automation, enhanced decision-making, and new product capabilities.") - + conv.add( + "assistant", + "These trends are transforming businesses through automation, enhanced decision-making, and new product capabilities.", + ) + # Test conversation operations print(f"๐Ÿ“Š Message count: {len(conv.to_dict())}") - print(f"๐Ÿ” Search results for 'AI': {len(conv.search('AI'))}") - print(f"๐Ÿ“ˆ Role distribution: {conv.count_messages_by_role()}") - + print( + f"๐Ÿ” Search results for 'AI': {len(conv.search('AI'))}" + ) + print( + f"๐Ÿ“ˆ Role distribution: {conv.count_messages_by_role()}" + ) + # Export conversation conv.export_conversation("supabase_ai_conversation.json") - print("๐Ÿ’พ Conversation exported to supabase_ai_conversation.json") - + print( + "๐Ÿ’พ Conversation exported to supabase_ai_conversation.json" + ) + else: - print("โŒ Skipping Supabase test due to missing configuration") - + print( + "โŒ Skipping Supabase test due to missing configuration" + ) + except Exception as e: print(f"โŒ Error in conversation storage test: {e}") # Example 3: Agent Creation and Configuration Demo print("\n๐Ÿ“ฆ Example 3: Agent Configuration Demo") print("-" * 50) - + try: if openai_key: # Create a simple agent for demonstration @@ -361,55 +406,71 @@ if __name__ == "__main__": max_loops=1, verbose=False, ) - + print("โœ… Demo agent created successfully") print(f"Agent: {demo_agent.agent_name}") print(f"Description: {demo_agent.agent_description}") - + # Test simple agent run simple_task = "Explain the benefits of using persistent conversation storage in AI applications." response = demo_agent.run(simple_task) - - print(f"\n๐Ÿ“ Agent Response:") + + print("\n๐Ÿ“ Agent Response:") print("-" * 30) - print(response[:500] + "..." if len(response) > 500 else response) - + print( + response[:500] + "..." + if len(response) > 500 + else response + ) + else: - print("โŒ Skipping agent demo due to missing OpenAI API key") - + print( + "โŒ Skipping agent demo due to missing OpenAI API key" + ) + except Exception as e: print(f"โŒ Error in agent demo: {e}") # Summary and Next Steps - print("\n" + "="*70) + print("\n" + "=" * 70) print("๐Ÿ Demo Summary") - print("="*70) - + print("=" * 70) + print("\nโœจ What was demonstrated:") print("1. ๐Ÿ—๏ธ Real Swarms agent creation with specialized roles") print("2. ๐Ÿ—„๏ธ Supabase backend integration for persistent storage") print("3. ๐Ÿค Multi-agent collaboration and response aggregation") print("4. ๐Ÿ’พ Conversation export and search capabilities") print("5. โš™๏ธ Proper error handling and graceful fallbacks") - + print("\n๐Ÿš€ Next Steps to get started:") print("1. Set up Supabase project: https://supabase.com") print("2. Configure environment variables") print("3. Install dependencies: pip install swarms supabase") print("4. Customize agents for your specific use cases") print("5. Scale to larger agent teams and complex workflows") - + print("\n๐Ÿ”— Resources:") print("- Swarms Documentation: https://docs.swarms.world") - print("- Supabase Python Docs: https://supabase.com/docs/reference/python/") + print( + "- Supabase Python Docs: https://supabase.com/docs/reference/python/" + ) print("- GitHub Repository: https://github.com/kyegomez/swarms") - - print(f"\nโš™๏ธ Final Configuration Status:") - print(f" SUPABASE_URL: {'โœ… Set' if supabase_url else 'โŒ Not set'}") - print(f" SUPABASE_ANON_KEY: {'โœ… Set' if supabase_key else 'โŒ Not set'}") - print(f" OPENAI_API_KEY: {'โœ… Set' if openai_key else 'โŒ Not set'}") - + + print("\nโš™๏ธ Final Configuration Status:") + print( + f" SUPABASE_URL: {'โœ… Set' if supabase_url else 'โŒ Not set'}" + ) + print( + f" SUPABASE_ANON_KEY: {'โœ… Set' if supabase_key else 'โŒ Not set'}" + ) + print( + f" OPENAI_API_KEY: {'โœ… Set' if openai_key else 'โŒ Not set'}" + ) + if supabase_url and supabase_key and openai_key: print("\n๐ŸŽ‰ All systems ready! You can run the full demo.") else: - print("\nโš ๏ธ Set missing environment variables to run the full demo.") \ No newline at end of file + print( + "\nโš ๏ธ Set missing environment variables to run the full demo." + ) diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 9a4a8823..c975434f 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -315,6 +315,7 @@ nav: - Agent Output Types: "swarms/examples/agent_output_types.md" - Agent with Structured Outputs: "swarms/examples/agent_structured_outputs.md" - Agents with Vision: "swarms/examples/vision_processing.md" + - Gradio Chat Interface: "swarms/ui/main.md" - Various Model Providers: - OpenAI: "swarms/examples/openai_example.md" - Anthropic: "swarms/examples/claude.md" @@ -392,6 +393,7 @@ nav: - Swarms API Pricing: "swarms_cloud/api_pricing.md" - Swarms API Pricing in Chinese: "swarms_cloud/chinese_api_pricing.md" - Swarms Cloud Subscription Tiers: "swarms_cloud/subscription_tiers.md" + - Swarm Ecosystem APIs: - MCS API: "swarms_cloud/mcs_api.md" # - CreateNow API: "swarms_cloud/create_api.md" @@ -403,15 +405,14 @@ nav: - Overview: "swarms_platform/index.md" - Swarm Platform API Keys: "swarms_platform/apikeys.md" - Account Management: "swarms_platform/account_management.md" - - Swarms Chat Tutorial: "swarms/ui/main.md" - Swarms Rust: - Overview: "swarms_rs/overview.md" - Agents: "swarms_rs/agents.md" - - Governance: - - Resources: "governance/main.md" - - Tokenomics: "web3/token.md" + - Resources: + - Overview: "governance/main.md" + # - Tokenomics: "web3/token.md" # - Prompts API: diff --git a/swarms/cli/main.py b/swarms/cli/main.py index ef4151eb..48feec94 100644 --- a/swarms/cli/main.py +++ b/swarms/cli/main.py @@ -67,7 +67,7 @@ def show_ascii_art(): Text(ASCII_ART, style=f"bold {COLORS['primary']}"), border_style=COLORS["secondary"], title="[bold]Welcome to Swarms[/bold]", - subtitle="[dim]Power to the Swarms[/dim]", + subtitle="[dim]swarms.ai[/dim]", ) console.print(panel) diff --git a/swarms/communication/duckdb_wrap.py b/swarms/communication/duckdb_wrap.py index eb275dc6..d09f5e8b 100644 --- a/swarms/communication/duckdb_wrap.py +++ b/swarms/communication/duckdb_wrap.py @@ -78,6 +78,7 @@ class DuckDBConversation(BaseCommunication): # Lazy load duckdb with auto-installation try: import duckdb + self.duckdb = duckdb self.duckdb_available = True except ImportError: @@ -86,19 +87,20 @@ class DuckDBConversation(BaseCommunication): try: import subprocess import sys - + # Install duckdb - subprocess.check_call([ - sys.executable, "-m", "pip", "install", "duckdb" - ]) + subprocess.check_call( + [sys.executable, "-m", "pip", "install", "duckdb"] + ) print("โœ… DuckDB installed successfully!") - + # Try importing again import duckdb + self.duckdb = duckdb self.duckdb_available = True print("โœ… DuckDB loaded successfully!") - + except Exception as e: raise ImportError( f"Failed to auto-install DuckDB. Please install manually with 'pip install duckdb': {e}" diff --git a/swarms/communication/pulsar_struct.py b/swarms/communication/pulsar_struct.py index cecac121..cc89c0c6 100644 --- a/swarms/communication/pulsar_struct.py +++ b/swarms/communication/pulsar_struct.py @@ -66,27 +66,37 @@ class PulsarConversation(BaseCommunication): # Lazy load Pulsar with auto-installation try: import pulsar + self.pulsar = pulsar self.pulsar_available = True except ImportError: # Auto-install pulsar-client if not available - print("๐Ÿ“ฆ Pulsar client not found. Installing automatically...") + print( + "๐Ÿ“ฆ Pulsar client not found. Installing automatically..." + ) try: import subprocess import sys - + # Install pulsar-client - subprocess.check_call([ - sys.executable, "-m", "pip", "install", "pulsar-client" - ]) + subprocess.check_call( + [ + sys.executable, + "-m", + "pip", + "install", + "pulsar-client", + ] + ) print("โœ… Pulsar client installed successfully!") - + # Try importing again import pulsar + self.pulsar = pulsar self.pulsar_available = True print("โœ… Pulsar loaded successfully!") - + except Exception as e: self.pulsar_available = False logger.error( @@ -646,6 +656,7 @@ class PulsarConversation(BaseCommunication): """ try: import pulsar + pulsar_available = True except ImportError: logger.error("Pulsar client library is not installed") diff --git a/swarms/communication/redis_wrap.py b/swarms/communication/redis_wrap.py index 40fc1505..575b62af 100644 --- a/swarms/communication/redis_wrap.py +++ b/swarms/communication/redis_wrap.py @@ -31,6 +31,7 @@ try: RedisError, TimeoutError, ) + REDIS_AVAILABLE = True except ImportError: # Auto-install Redis at import time @@ -38,13 +39,13 @@ except ImportError: try: import subprocess import sys - + # Install redis - subprocess.check_call([ - sys.executable, "-m", "pip", "install", "redis" - ]) + subprocess.check_call( + [sys.executable, "-m", "pip", "install", "redis"] + ) print("โœ… Redis installed successfully!") - + # Try importing again import redis from redis.exceptions import ( @@ -54,12 +55,15 @@ except ImportError: RedisError, TimeoutError, ) + REDIS_AVAILABLE = True print("โœ… Redis loaded successfully!") - + except Exception as e: REDIS_AVAILABLE = False - print(f"โŒ Failed to auto-install Redis. Please install manually with 'pip install redis': {e}") + print( + f"โŒ Failed to auto-install Redis. Please install manually with 'pip install redis': {e}" + ) class RedisConnectionError(Exception): @@ -186,7 +190,11 @@ rdbchecksum yes try: if self.process: # Send SAVE and BGSAVE commands before stopping if persistence is enabled - if self.persist and self.auto_persist and REDIS_AVAILABLE: + if ( + self.persist + and self.auto_persist + and REDIS_AVAILABLE + ): try: r = redis.Redis( host="localhost", port=self.port @@ -328,14 +336,14 @@ class RedisConversation(BaseStructure): RedisOperationError: If Redis operations fail. """ global REDIS_AVAILABLE - + # Check if Redis is available (should be True after module import auto-installation) if not REDIS_AVAILABLE: raise ImportError( "Redis is not available. Module-level auto-installation failed. " "Please install manually with 'pip install redis'" ) - + self.redis_available = True super().__init__() diff --git a/swarms/communication/supabase_wrap.py b/swarms/communication/supabase_wrap.py index 098aa6e7..6a2d2c09 100644 --- a/swarms/communication/supabase_wrap.py +++ b/swarms/communication/supabase_wrap.py @@ -96,29 +96,39 @@ class SupabaseConversation(BaseCommunication): # Lazy load Supabase with auto-installation try: from supabase import Client, create_client + self.supabase_client = Client self.create_client = create_client self.supabase_available = True except ImportError: # Auto-install supabase if not available - print("๐Ÿ“ฆ Supabase not found. Installing automatically...") + print( + "๐Ÿ“ฆ Supabase not found. Installing automatically..." + ) try: import subprocess import sys - + # Install supabase - subprocess.check_call([ - sys.executable, "-m", "pip", "install", "supabase" - ]) + subprocess.check_call( + [ + sys.executable, + "-m", + "pip", + "install", + "supabase", + ] + ) print("โœ… Supabase installed successfully!") - + # Try importing again from supabase import Client, create_client + self.supabase_client = Client self.create_client = create_client self.supabase_available = True print("โœ… Supabase loaded successfully!") - + except Exception as e: self.supabase_available = False if logger: @@ -179,7 +189,9 @@ class SupabaseConversation(BaseCommunication): ) # For thread-safe operations if any (e.g. token calculation) try: - self.client = self.create_client(supabase_url, supabase_key) + self.client = self.create_client( + supabase_url, supabase_key + ) if self.enable_logging: self.logger.info( f"Successfully initialized Supabase client for URL: {supabase_url}" diff --git a/swarms/structs/conversation.py b/swarms/structs/conversation.py index aae1acaa..3b4052a1 100644 --- a/swarms/structs/conversation.py +++ b/swarms/structs/conversation.py @@ -53,41 +53,65 @@ def get_conversation_dir(): # Define available providers -providers = Literal["mem0", "in-memory", "supabase", "redis", "sqlite", "duckdb", "pulsar"] +providers = Literal[ + "mem0", + "in-memory", + "supabase", + "redis", + "sqlite", + "duckdb", + "pulsar", +] + def _create_backend_conversation(backend: str, **kwargs): """ Create a backend conversation instance based on the specified backend type. - + This function uses lazy loading to import backend dependencies only when needed. Each backend class handles its own dependency management and error messages. - + Args: backend (str): The backend type to create **kwargs: Arguments to pass to the backend constructor - + Returns: Backend conversation instance - + Raises: ImportError: If required packages for the backend are not installed (raised by lazy loading) ValueError: If backend is not supported """ try: if backend == "supabase": - from swarms.communication.supabase_wrap import SupabaseConversation + from swarms.communication.supabase_wrap import ( + SupabaseConversation, + ) + return SupabaseConversation(**kwargs) elif backend == "redis": - from swarms.communication.redis_wrap import RedisConversation + from swarms.communication.redis_wrap import ( + RedisConversation, + ) + return RedisConversation(**kwargs) elif backend == "sqlite": - from swarms.communication.sqlite_wrap import SQLiteConversation + from swarms.communication.sqlite_wrap import ( + SQLiteConversation, + ) + return SQLiteConversation(**kwargs) elif backend == "duckdb": - from swarms.communication.duckdb_wrap import DuckDBConversation + from swarms.communication.duckdb_wrap import ( + DuckDBConversation, + ) + return DuckDBConversation(**kwargs) elif backend == "pulsar": - from swarms.communication.pulsar_struct import PulsarConversation + from swarms.communication.pulsar_struct import ( + PulsarConversation, + ) + return PulsarConversation(**kwargs) else: raise ValueError( @@ -103,8 +127,10 @@ def _create_backend_conversation(backend: str, **kwargs): "duckdb": "pip install duckdb", "pulsar": "pip install pulsar-client", } - - install_cmd = backend_deps.get(backend, f"Check documentation for {backend}") + + install_cmd = backend_deps.get( + backend, f"Check documentation for {backend}" + ) logger.error( f"Failed to initialize {backend} backend. " f"Missing dependencies. Install with: {install_cmd}" @@ -190,7 +216,6 @@ class Conversation(BaseStructure): auto_persist: bool = True, redis_data_dir: Optional[str] = None, conversations_dir: Optional[str] = None, - *args, **kwargs, ): @@ -202,7 +227,15 @@ class Conversation(BaseStructure): self.backend_instance = None # Validate backend - valid_backends = ["in-memory", "mem0", "supabase", "redis", "sqlite", "duckdb", "pulsar"] + valid_backends = [ + "in-memory", + "mem0", + "supabase", + "redis", + "sqlite", + "duckdb", + "pulsar", + ] if self.backend not in valid_backends: raise ValueError( f"Invalid backend: '{self.backend}'. " @@ -243,7 +276,13 @@ class Conversation(BaseStructure): self.conversations_dir = conversations_dir # Initialize backend if using persistent storage - if self.backend in ["supabase", "redis", "sqlite", "duckdb", "pulsar"]: + if self.backend in [ + "supabase", + "redis", + "sqlite", + "duckdb", + "pulsar", + ]: try: self._initialize_backend( supabase_url=supabase_url, @@ -258,7 +297,7 @@ class Conversation(BaseStructure): persist_redis=persist_redis, auto_persist=auto_persist, redis_data_dir=redis_data_dir, - **kwargs + **kwargs, ) except Exception as e: logger.warning( @@ -275,7 +314,7 @@ class Conversation(BaseStructure): def _initialize_backend(self, **kwargs): """ Initialize the persistent storage backend. - + Args: **kwargs: Backend-specific configuration parameters """ @@ -297,52 +336,78 @@ class Conversation(BaseStructure): # Add backend-specific parameters if self.backend == "supabase": - supabase_url = kwargs.get("supabase_url") or os.getenv("SUPABASE_URL") - supabase_key = kwargs.get("supabase_key") or os.getenv("SUPABASE_ANON_KEY") - + supabase_url = kwargs.get("supabase_url") or os.getenv( + "SUPABASE_URL" + ) + supabase_key = kwargs.get("supabase_key") or os.getenv( + "SUPABASE_ANON_KEY" + ) + if not supabase_url or not supabase_key: raise ValueError( "Supabase backend requires 'supabase_url' and 'supabase_key' parameters " "or SUPABASE_URL and SUPABASE_ANON_KEY environment variables" ) - backend_kwargs.update({ - "supabase_url": supabase_url, - "supabase_key": supabase_key, - "table_name": kwargs.get("table_name", "conversations"), - }) - + backend_kwargs.update( + { + "supabase_url": supabase_url, + "supabase_key": supabase_key, + "table_name": kwargs.get( + "table_name", "conversations" + ), + } + ) + elif self.backend == "redis": - backend_kwargs.update({ - "redis_host": kwargs.get("redis_host", "localhost"), - "redis_port": kwargs.get("redis_port", 6379), - "redis_db": kwargs.get("redis_db", 0), - "redis_password": kwargs.get("redis_password"), - "use_embedded_redis": kwargs.get("use_embedded_redis", True), - "persist_redis": kwargs.get("persist_redis", True), - "auto_persist": kwargs.get("auto_persist", True), - "redis_data_dir": kwargs.get("redis_data_dir"), - "conversation_id": self.id, - "name": self.name, - }) - + backend_kwargs.update( + { + "redis_host": kwargs.get( + "redis_host", "localhost" + ), + "redis_port": kwargs.get("redis_port", 6379), + "redis_db": kwargs.get("redis_db", 0), + "redis_password": kwargs.get("redis_password"), + "use_embedded_redis": kwargs.get( + "use_embedded_redis", True + ), + "persist_redis": kwargs.get( + "persist_redis", True + ), + "auto_persist": kwargs.get("auto_persist", True), + "redis_data_dir": kwargs.get("redis_data_dir"), + "conversation_id": self.id, + "name": self.name, + } + ) + elif self.backend in ["sqlite", "duckdb"]: db_path = kwargs.get("db_path") if db_path: backend_kwargs["db_path"] = db_path - + elif self.backend == "pulsar": # Add pulsar-specific parameters - backend_kwargs.update({ - "pulsar_url": kwargs.get("pulsar_url", "pulsar://localhost:6650"), - "topic": kwargs.get("topic", f"conversation-{self.id}"), - }) + backend_kwargs.update( + { + "pulsar_url": kwargs.get( + "pulsar_url", "pulsar://localhost:6650" + ), + "topic": kwargs.get( + "topic", f"conversation-{self.id}" + ), + } + ) # Create the backend instance logger.info(f"Initializing {self.backend} backend...") - self.backend_instance = _create_backend_conversation(self.backend, **backend_kwargs) - + self.backend_instance = _create_backend_conversation( + self.backend, **backend_kwargs + ) + # Log successful initialization - logger.info(f"Successfully initialized {self.backend} backend for conversation '{self.name}'") + logger.info( + f"Successfully initialized {self.backend} backend for conversation '{self.name}'" + ) def setup(self): # Set up conversations directory @@ -473,7 +538,9 @@ class Conversation(BaseStructure): ) else: # Fallback to in-memory if mem0 is not available - logger.warning("Mem0 provider not available, falling back to in-memory storage") + logger.warning( + "Mem0 provider not available, falling back to in-memory storage" + ) self.add_in_memory(role, content) def add( @@ -486,9 +553,13 @@ class Conversation(BaseStructure): # If using a persistent backend, delegate to it if self.backend_instance: try: - return self.backend_instance.add(role=role, content=content, metadata=metadata) + return self.backend_instance.add( + role=role, content=content, metadata=metadata + ) except Exception as e: - logger.error(f"Backend add failed: {e}. Falling back to in-memory.") + logger.error( + f"Backend add failed: {e}. Falling back to in-memory." + ) return self.add_in_memory(role, content) elif self.provider == "in-memory": return self.add_in_memory(role, content) @@ -570,7 +641,9 @@ class Conversation(BaseStructure): """ if self.backend_instance: try: - return self.backend_instance.update(index, role, content) + return self.backend_instance.update( + index, role, content + ) except Exception as e: logger.error(f"Backend update failed: {e}") raise @@ -615,7 +688,7 @@ class Conversation(BaseStructure): logger.error(f"Backend search failed: {e}") # Fallback to in-memory search pass - + return [ message for message in self.conversation_history @@ -630,12 +703,14 @@ class Conversation(BaseStructure): """ if self.backend_instance: try: - return self.backend_instance.display_conversation(detailed) + return self.backend_instance.display_conversation( + detailed + ) except Exception as e: logger.error(f"Backend display failed: {e}") # Fallback to in-memory display pass - + # In-memory display implementation with proper formatting for message in self.conversation_history: content = message.get("content", "") @@ -668,21 +743,25 @@ class Conversation(BaseStructure): if self.backend_instance: try: - return self.backend_instance.export_conversation(filename, *args, **kwargs) + return self.backend_instance.export_conversation( + filename, *args, **kwargs + ) except Exception as e: logger.error(f"Backend export failed: {e}") # Fallback to in-memory export pass - + # In-memory export implementation # If the filename ends with .json, use save_as_json if filename.endswith(".json"): self.save_as_json(filename) else: # Simple text export for non-JSON files - with open(filename, "w",encoding="utf-8") as f: + with open(filename, "w", encoding="utf-8") as f: for message in self.conversation_history: - f.write(f"{message['role']}: {message['content']}\n") + f.write( + f"{message['role']}: {message['content']}\n" + ) def import_conversation(self, filename: str): """Import a conversation history from a file. @@ -692,7 +771,9 @@ class Conversation(BaseStructure): """ if self.backend_instance: try: - return self.backend_instance.import_conversation(filename) + return self.backend_instance.import_conversation( + filename + ) except Exception as e: logger.error(f"Backend import failed: {e}") # Fallback to in-memory import @@ -710,7 +791,9 @@ class Conversation(BaseStructure): try: return self.backend_instance.count_messages_by_role() except Exception as e: - logger.error(f"Backend count_messages_by_role failed: {e}") + logger.error( + f"Backend count_messages_by_role failed: {e}" + ) # Fallback to local implementation below pass # Initialize counts with expected roles @@ -720,7 +803,7 @@ class Conversation(BaseStructure): "assistant": 0, "function": 0, } - + # Count messages by role for message in self.conversation_history: role = message["role"] @@ -729,8 +812,9 @@ class Conversation(BaseStructure): else: # Handle unexpected roles dynamically counts[role] = counts.get(role, 0) + 1 - + return counts + def return_history_as_string(self): """Return the conversation history as a string. @@ -739,12 +823,16 @@ class Conversation(BaseStructure): """ if self.backend_instance: try: - return self.backend_instance.return_history_as_string() + return ( + self.backend_instance.return_history_as_string() + ) except Exception as e: - logger.error(f"Backend return_history_as_string failed: {e}") + logger.error( + f"Backend return_history_as_string failed: {e}" + ) # Fallback to in-memory implementation pass - + formatted_messages = [] for message in self.conversation_history: formatted_messages.append( @@ -781,7 +869,7 @@ class Conversation(BaseStructure): except Exception as e: logger.error(f"Backend save_as_json failed: {e}") # Fallback to local save implementation below - + # Don't save if saving is disabled if not self.save_enabled: return @@ -1000,9 +1088,13 @@ class Conversation(BaseStructure): """ if self.backend_instance: try: - return self.backend_instance.get_last_message_as_string() + return ( + self.backend_instance.get_last_message_as_string() + ) except Exception as e: - logger.error(f"Backend get_last_message_as_string failed: {e}") + logger.error( + f"Backend get_last_message_as_string failed: {e}" + ) # Fallback to in-memory implementation pass elif self.provider == "mem0": @@ -1025,7 +1117,9 @@ class Conversation(BaseStructure): try: return self.backend_instance.return_messages_as_list() except Exception as e: - logger.error(f"Backend return_messages_as_list failed: {e}") + logger.error( + f"Backend return_messages_as_list failed: {e}" + ) # Fallback to in-memory implementation pass return [ @@ -1041,9 +1135,13 @@ class Conversation(BaseStructure): """ if self.backend_instance: try: - return self.backend_instance.return_messages_as_dictionary() + return ( + self.backend_instance.return_messages_as_dictionary() + ) except Exception as e: - logger.error(f"Backend return_messages_as_dictionary failed: {e}") + logger.error( + f"Backend return_messages_as_dictionary failed: {e}" + ) # Fallback to in-memory implementation pass return [ @@ -1099,9 +1197,13 @@ class Conversation(BaseStructure): """ if self.backend_instance: try: - return self.backend_instance.get_final_message_content() + return ( + self.backend_instance.get_final_message_content() + ) except Exception as e: - logger.error(f"Backend get_final_message_content failed: {e}") + logger.error( + f"Backend get_final_message_content failed: {e}" + ) # Fallback to in-memory implementation pass if self.conversation_history: @@ -1119,7 +1221,9 @@ class Conversation(BaseStructure): try: return self.backend_instance.return_all_except_first() except Exception as e: - logger.error(f"Backend return_all_except_first failed: {e}") + logger.error( + f"Backend return_all_except_first failed: {e}" + ) # Fallback to in-memory implementation pass return self.conversation_history[2:] @@ -1132,9 +1236,13 @@ class Conversation(BaseStructure): """ if self.backend_instance: try: - return self.backend_instance.return_all_except_first_string() + return ( + self.backend_instance.return_all_except_first_string() + ) except Exception as e: - logger.error(f"Backend return_all_except_first_string failed: {e}") + logger.error( + f"Backend return_all_except_first_string failed: {e}" + ) # Fallback to in-memory implementation pass return "\n".join( diff --git a/tests/communication/test_redis.py b/tests/communication/test_redis.py index 194dfe6e..8f8acafd 100644 --- a/tests/communication/test_redis.py +++ b/tests/communication/test_redis.py @@ -86,7 +86,9 @@ class RedisConversationTester: """Initialize Redis server and conversation for testing.""" try: # Try first with external Redis (if available) - logger.info("Trying to connect to external Redis server...") + logger.info( + "Trying to connect to external Redis server..." + ) self.conversation = RedisConversation( system_prompt="Test System Prompt", redis_host="localhost", @@ -94,10 +96,14 @@ class RedisConversationTester: redis_retry_attempts=1, use_embedded_redis=False, # Try external first ) - logger.info("Successfully connected to external Redis server") + logger.info( + "Successfully connected to external Redis server" + ) return True except Exception as external_error: - logger.info(f"External Redis connection failed: {external_error}") + logger.info( + f"External Redis connection failed: {external_error}" + ) logger.info("Trying to start embedded Redis server...") try: @@ -109,10 +115,14 @@ class RedisConversationTester: redis_retry_attempts=3, use_embedded_redis=True, ) - logger.info("Successfully started embedded Redis server") + logger.info( + "Successfully started embedded Redis server" + ) return True except Exception as embedded_error: - logger.error(f"Both external and embedded Redis failed:") + logger.error( + "Both external and embedded Redis failed:" + ) logger.error(f" External: {external_error}") logger.error(f" Embedded: {embedded_error}") return False @@ -122,10 +132,16 @@ class RedisConversationTester: if self.conversation: try: # Check if we have an embedded server to stop - if hasattr(self.conversation, 'embedded_server') and self.conversation.embedded_server is not None: + if ( + hasattr(self.conversation, "embedded_server") + and self.conversation.embedded_server is not None + ): self.conversation.embedded_server.stop() # Close Redis client if it exists - if hasattr(self.conversation, 'redis_client') and self.conversation.redis_client: + if ( + hasattr(self.conversation, "redis_client") + and self.conversation.redis_client + ): self.conversation.redis_client.close() except Exception as e: logger.warning(f"Error during cleanup: {str(e)}") @@ -151,16 +167,22 @@ class RedisConversationTester: json_content = {"key": "value", "nested": {"data": 123}} self.conversation.add("system", json_content) last_message = self.conversation.get_final_message_content() - + # Parse the JSON string back to dict for comparison if isinstance(last_message, str): try: parsed_content = json.loads(last_message) - assert isinstance(parsed_content, dict), "Failed to handle JSON message" + assert isinstance( + parsed_content, dict + ), "Failed to handle JSON message" except json.JSONDecodeError: - assert False, "JSON message was not stored as valid JSON" + assert ( + False + ), "JSON message was not stored as valid JSON" else: - assert isinstance(last_message, dict), "Failed to handle JSON message" + assert isinstance( + last_message, dict + ), "Failed to handle JSON message" def test_search(self): """Test search functionality.""" @@ -175,7 +197,9 @@ class RedisConversationTester: ) if initial_count > 0: self.conversation.delete(0) - new_count = len(self.conversation.return_messages_as_list()) + new_count = len( + self.conversation.return_messages_as_list() + ) assert ( new_count == initial_count - 1 ), "Failed to delete message" @@ -228,7 +252,9 @@ class RedisConversationTester: self.conversation.add("user", "token test message") time.sleep(1) # Wait for async token counting messages = self.conversation.to_dict() - assert isinstance(messages, list), "Token counting test completed" + assert isinstance( + messages, list + ), "Token counting test completed" def test_cache_operations(self): """Test cache operations.""" @@ -254,8 +280,10 @@ class RedisConversationTester: try: if not self.setup(): - logger.warning("Failed to setup Redis connection. This is expected on systems without Redis server.") - + logger.warning( + "Failed to setup Redis connection. This is expected on systems without Redis server." + ) + # Generate a report indicating the limitation setup_failed_md = [ "# Redis Conversation Test Results", @@ -265,9 +293,9 @@ class RedisConversationTester: "## Summary", "โŒ **Redis Server Setup Failed**", "", - "The Redis conversation class will work properly when a Redis server is available." + "The Redis conversation class will work properly when a Redis server is available.", ] - + return "\n".join(setup_failed_md) tests = [ @@ -304,12 +332,16 @@ def main(): # Save results to file try: - with open("redis_test_results.md", "w", encoding="utf-8") as f: + with open( + "redis_test_results.md", "w", encoding="utf-8" + ) as f: f.write(markdown_results) - logger.info("Test results have been saved to redis_test_results.md") + logger.info( + "Test results have been saved to redis_test_results.md" + ) except Exception as e: logger.error(f"Failed to save test results: {e}") - + # Also print results to console print(markdown_results)