diff --git a/.gitignore b/.gitignore
index 0c1030a9..69630b70 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,6 +16,7 @@ databases
 static/generated
 conversations/
 next_swarms_update.txt
+.pytest_cache
 infra.md
 runs
 Financial-Analysis-Agent_state.json
diff --git a/example.py b/example.py
index 586d36b4..0a27c20c 100644
--- a/example.py
+++ b/example.py
@@ -19,4 +19,5 @@ out = agent.run(
     task="What are the top five best energy stocks across nuclear, solar, gas, and other energy sources?",
     n=1,
 )
+
 print(json.dumps(out, indent=4))
diff --git a/examples/utils/litellm_connect_issue.py b/examples/utils/litellm_connect_issue.py
new file mode 100644
index 00000000..fd4a0788
--- /dev/null
+++ b/examples/utils/litellm_connect_issue.py
@@ -0,0 +1,10 @@
+from swarms.utils import LiteLLM, NetworkConnectionError
+
+model = LiteLLM(model_name="gpt-4o-mini")
+
+try:
+    response = model.run(task="Your task here")
+    print(response)
+except NetworkConnectionError as e:
+    print(f"Network issue: {e}")
+    print("Consider falling back to a local model such as ollama/llama2")
diff --git a/examples/utils/litellm_network_error_handling.py b/examples/utils/litellm_network_error_handling.py
new file mode 100644
index 00000000..594bc43d
--- /dev/null
+++ b/examples/utils/litellm_network_error_handling.py
@@ -0,0 +1,149 @@
+"""
+Example demonstrating network error handling in the LiteLLM wrapper.
+
+This example shows how the LiteLLM wrapper handles network connectivity issues
+and provides helpful error messages that guide users toward local models such as
+Ollama when no internet connection is available.
+"""
+
+from swarms.utils import LiteLLM, NetworkConnectionError
+
+
+def example_with_network_handling():
+    """
+    Example of using LiteLLM with proper network error handling.
+
+    This function demonstrates how to catch NetworkConnectionError
+    and handle it appropriately.
+    """
+    # Initialize LiteLLM with a cloud model
+    model = LiteLLM(
+        model_name="gpt-4o-mini",
+        temperature=0.7,
+        max_tokens=1000,
+    )
+
+    try:
+        # Try to run the model
+        response = model.run(
+            task="Explain the concept of quantum entanglement in simple terms."
+        )
+        print(f"Response: {response}")
+
+    except NetworkConnectionError as e:
+        # Handle network connectivity issues
+        print(f"Network error detected: {e}")
+        print("\nFalling back to local model...")
+
+        # Fallback to a local Ollama model
+        local_model = LiteLLM(
+            model_name="ollama/llama2",
+            temperature=0.7,
+            max_tokens=1000,
+        )
+
+        try:
+            response = local_model.run(
+                task="Explain the concept of quantum entanglement in simple terms."
+            )
+            print(f"Local model response: {response}")
+        except Exception as local_error:
+            print(f"Local model error: {local_error}")
+            print("\nMake sure Ollama is installed and running:")
+            print("1. Install: https://ollama.ai")
+            print("2. Run: ollama pull llama2")
+            print("3. Start the server if not running")
+
+
+def example_check_internet_connection():
+    """
+    Example of manually checking internet connectivity.
+
+    This function demonstrates how to use the static method
+    to check for an internet connection before attempting API calls.
+ """ + # Check if internet is available + has_internet = LiteLLM.check_internet_connection() + + if has_internet: + print("✓ Internet connection available") + model = LiteLLM(model_name="gpt-4o-mini") + else: + print("✗ No internet connection detected") + print("Using local Ollama model instead...") + model = LiteLLM(model_name="ollama/llama2") + + # Use the model + try: + response = model.run(task="What is the meaning of life?") + print(f"Response: {response}") + except NetworkConnectionError as e: + print(f"Error: {e}") + + +def example_is_local_model(): + """ + Example of checking if a model is a local model. + + This function demonstrates how to determine if a model + is local or requires internet connectivity. + """ + # Check various model names + models = [ + "gpt-4o-mini", + "ollama/llama2", + "anthropic/claude-3", + "ollama/mistral", + "local/custom-model", + ] + + for model_name in models: + is_local = LiteLLM.is_local_model(model_name) + status = "Local" if is_local else "Cloud" + print(f"{model_name}: {status}") + + +def example_with_custom_base_url(): + """ + Example of using LiteLLM with a custom base URL. + + This demonstrates using a local model server with custom base URL. + """ + # Using Ollama with custom base URL + model = LiteLLM( + model_name="ollama/llama2", + base_url="http://localhost:11434", + temperature=0.7, + ) + + try: + response = model.run(task="Write a haiku about programming.") + print(f"Response: {response}") + except NetworkConnectionError as e: + print(f"Connection error: {e}") + print("\nTroubleshooting:") + print("- Ensure Ollama is running on localhost:11434") + print("- Check if the model is loaded: ollama list") + print("- Try: ollama serve") + + +if __name__ == "__main__": + print("=" * 70) + print("Example 1: Network Error Handling with Fallback") + print("=" * 70) + example_with_network_handling() + + print("\n" + "=" * 70) + print("Example 2: Manual Internet Connection Check") + print("=" * 70) + example_check_internet_connection() + + print("\n" + "=" * 70) + print("Example 3: Check if Model is Local") + print("=" * 70) + example_is_local_model() + + print("\n" + "=" * 70) + print("Example 4: Custom Base URL") + print("=" * 70) + example_with_custom_base_url() diff --git a/pyproject.toml b/pyproject.toml index 19af22a2..fcb9a14a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "8.5.3" +version = "8.5.4" description = "Swarms - TGSC" license = "MIT" authors = ["Kye Gomez "] diff --git a/swarms/agents/gkp_agent.py b/swarms/agents/gkp_agent.py index 29360bab..558e7671 100644 --- a/swarms/agents/gkp_agent.py +++ b/swarms/agents/gkp_agent.py @@ -553,7 +553,7 @@ class GKPAgent: ) return results - + def run(self, task: str) -> str: """ Run the GKP agent on a single task. @@ -565,7 +565,7 @@ class GKPAgent: str: The final answer """ return self._run([task])[0] - + def __call__(self, task: str) -> str: """ Run the GKP agent on a single task. 
diff --git a/swarms/utils/__init__.py b/swarms/utils/__init__.py index 05d147ef..835431e2 100644 --- a/swarms/utils/__init__.py +++ b/swarms/utils/__init__.py @@ -26,6 +26,11 @@ from swarms.utils.history_output_formatter import ( history_output_formatter, ) from swarms.utils.litellm_tokenizer import count_tokens +from swarms.utils.litellm_wrapper import ( + LiteLLM, + NetworkConnectionError, + LiteLLMException, +) from swarms.utils.output_types import HistoryOutputType from swarms.utils.parse_code import extract_code_from_markdown from swarms.utils.pdf_to_text import pdf_to_text @@ -52,4 +57,7 @@ __all__ = [ "load_agents_from_markdown", "dynamic_auto_chunking", "MarkdownAgentLoader", + "LiteLLM", + "NetworkConnectionError", + "LiteLLMException", ] diff --git a/swarms/utils/litellm_wrapper.py b/swarms/utils/litellm_wrapper.py index edf8a4f1..be97f5e7 100644 --- a/swarms/utils/litellm_wrapper.py +++ b/swarms/utils/litellm_wrapper.py @@ -4,6 +4,7 @@ import traceback import uuid from pathlib import Path from typing import List, Optional +import socket import litellm from pydantic import BaseModel @@ -18,6 +19,12 @@ class LiteLLMException(Exception): """ +class NetworkConnectionError(Exception): + """ + Exception raised when network connectivity issues are detected. + """ + + def get_audio_base64(audio_source: str) -> str: """ Convert audio data from a URL or local file path to a base64-encoded string. @@ -875,6 +882,69 @@ class LiteLLM: else: return False + @staticmethod + def check_internet_connection( + host: str = "8.8.8.8", port: int = 53, timeout: int = 3 + ) -> bool: + """ + Check if there is an active internet connection. + + This method attempts to establish a socket connection to a DNS server + (default is Google's DNS at 8.8.8.8) to verify internet connectivity. + + Args: + host (str, optional): The host to connect to for checking connectivity. + Defaults to "8.8.8.8" (Google DNS). + port (int, optional): The port to use for the connection. Defaults to 53 (DNS). + timeout (int, optional): Connection timeout in seconds. Defaults to 3. + + Returns: + bool: True if internet connection is available, False otherwise. + """ + try: + socket.setdefaulttimeout(timeout) + socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect( + (host, port) + ) + return True + except (socket.error, socket.timeout): + return False + + @staticmethod + def is_local_model( + model_name: str, base_url: Optional[str] = None + ) -> bool: + """ + Determine if the model is a local model (e.g., Ollama, LlamaCPP). + + Args: + model_name (str): The name of the model to check. + base_url (str, optional): The base URL if specified. Defaults to None. + + Returns: + bool: True if the model is a local model, False otherwise. 
+ """ + local_indicators = [ + "ollama", + "llama-cpp", + "local", + "localhost", + "127.0.0.1", + "custom", + ] + + model_lower = model_name.lower() + is_local_model = any( + indicator in model_lower for indicator in local_indicators + ) + + is_local_url = base_url is not None and any( + indicator in base_url.lower() + for indicator in local_indicators + ) + + return is_local_model or is_local_url + def run( self, task: str, @@ -1024,10 +1094,74 @@ class LiteLLM: else: return response.choices[0].message.content + except ( + requests.exceptions.ConnectionError, + requests.exceptions.Timeout, + requests.exceptions.RequestException, + ConnectionError, + TimeoutError, + ) as network_error: + # Check if this is a local model + if self.is_local_model(self.model_name, self.base_url): + error_msg = ( + f"Network error connecting to local model '{self.model_name}': {str(network_error)}\n\n" + "Troubleshooting steps:\n" + "1. Ensure your local model server (e.g., Ollama, LlamaCPP) is running\n" + "2. Verify the base_url is correct and accessible\n" + "3. Check that the model is properly loaded and available\n" + ) + logger.error(error_msg) + raise NetworkConnectionError( + error_msg + ) from network_error + + # Check internet connectivity + has_internet = self.check_internet_connection() + + if not has_internet: + error_msg = ( + f"No internet connection detected while trying to use model '{self.model_name}'.\n\n" + "Possible solutions:\n" + "1. Check your internet connection and try again\n" + "2. Reconnect to your network\n" + "3. Use a local model instead (e.g., Ollama):\n" + " - Install Ollama from https://ollama.ai\n" + " - Run: ollama pull llama2\n" + " - Use model_name='ollama/llama2' in your LiteLLM configuration\n" + "\nExample:\n" + " model = LiteLLM(model_name='ollama/llama2')\n" + ) + logger.error(error_msg) + raise NetworkConnectionError( + error_msg + ) from network_error + else: + # Internet is available but request failed + error_msg = ( + f"Network error occurred while connecting to '{self.model_name}': {str(network_error)}\n\n" + "Possible causes:\n" + "1. The API endpoint may be temporarily unavailable\n" + "2. Connection timeout or slow network\n" + "3. 
Firewall or proxy blocking the connection\n" + "\nConsider using a local model as a fallback:\n" + " model = LiteLLM(model_name='ollama/llama2')\n" + ) + logger.error(error_msg) + raise NetworkConnectionError( + error_msg + ) from network_error + except LiteLLMException as error: logger.error( f"Error in LiteLLM run: {str(error)} Traceback: {traceback.format_exc()}" ) + raise + + except Exception as error: + logger.error( + f"Unexpected error in LiteLLM run: {str(error)} Traceback: {traceback.format_exc()}" + ) + raise def __call__(self, task: str, *args, **kwargs): """ diff --git a/tests/structs/test_board_of_directors_swarm.py b/tests/structs/test_board_of_directors_swarm.py index 7ad2ecdb..3001a296 100644 --- a/tests/structs/test_board_of_directors_swarm.py +++ b/tests/structs/test_board_of_directors_swarm.py @@ -137,7 +137,7 @@ def test_board_of_directors_swarm_error_handling(): """Test BoardOfDirectorsSwarm error handling and validation""" # Test with empty agents list try: - board_swarm = BoardOfDirectorsSwarm(agents=[]) + BoardOfDirectorsSwarm(agents=[]) assert ( False ), "Should have raised ValueError for empty agents list" @@ -153,7 +153,7 @@ def test_board_of_directors_swarm_error_handling(): ) try: - board_swarm = BoardOfDirectorsSwarm( + BoardOfDirectorsSwarm( agents=[analyst], max_loops=0 ) assert ( diff --git a/tests/structs/test_concurrent_workflow.py b/tests/structs/test_concurrent_workflow.py index e071efa4..1c75203b 100644 --- a/tests/structs/test_concurrent_workflow.py +++ b/tests/structs/test_concurrent_workflow.py @@ -139,7 +139,7 @@ def test_concurrent_workflow_error_handling(): """Test ConcurrentWorkflow error handling and validation""" # Test with empty agents list try: - workflow = ConcurrentWorkflow(agents=[]) + ConcurrentWorkflow(agents=[]) assert ( False ), "Should have raised ValueError for empty agents list" @@ -148,7 +148,7 @@ def test_concurrent_workflow_error_handling(): # Test with None agents try: - workflow = ConcurrentWorkflow(agents=None) + ConcurrentWorkflow(agents=None) assert False, "Should have raised ValueError for None agents" except ValueError as e: assert "No agents provided" in str(e) diff --git a/tests/structs/test_hierarchical_swarm.py b/tests/structs/test_hierarchical_swarm.py index a0206c06..acc78cf5 100644 --- a/tests/structs/test_hierarchical_swarm.py +++ b/tests/structs/test_hierarchical_swarm.py @@ -184,7 +184,7 @@ def test_hierarchical_swarm_error_handling(): """Test HierarchicalSwarm error handling""" # Test with empty agents list try: - swarm = HierarchicalSwarm(agents=[]) + HierarchicalSwarm(agents=[]) assert ( False ), "Should have raised ValueError for empty agents list" @@ -200,7 +200,7 @@ def test_hierarchical_swarm_error_handling(): ) try: - swarm = HierarchicalSwarm(agents=[researcher], max_loops=0) + HierarchicalSwarm(agents=[researcher], max_loops=0) assert ( False ), "Should have raised ValueError for invalid max_loops" diff --git a/tests/structs/test_majority_voting.py b/tests/structs/test_majority_voting.py index f1fa6c47..580028d5 100644 --- a/tests/structs/test_majority_voting.py +++ b/tests/structs/test_majority_voting.py @@ -151,7 +151,7 @@ def test_majority_voting_error_handling(): """Test MajorityVoting error handling and validation""" # Test with empty agents list try: - mv = MajorityVoting(agents=[]) + MajorityVoting(agents=[]) assert ( False ), "Should have raised ValueError for empty agents list" @@ -167,7 +167,7 @@ def test_majority_voting_error_handling(): ) try: - mv = MajorityVoting(agents=[analyst], 
max_loops=0) + MajorityVoting(agents=[analyst], max_loops=0) assert ( False ), "Should have raised ValueError for invalid max_loops" diff --git a/tests/structs/test_moa.py b/tests/structs/test_moa.py index 1d46d9b2..490e8f65 100644 --- a/tests/structs/test_moa.py +++ b/tests/structs/test_moa.py @@ -170,7 +170,7 @@ def test_mixture_of_agents_error_handling(): """Test MixtureOfAgents error handling and validation""" # Test with empty agents list try: - moa = MixtureOfAgents(agents=[]) + MixtureOfAgents(agents=[]) assert ( False ), "Should have raised ValueError for empty agents list" @@ -186,7 +186,7 @@ def test_mixture_of_agents_error_handling(): ) try: - moa = MixtureOfAgents( + MixtureOfAgents( agents=[analyst], aggregator_system_prompt="" ) assert ( diff --git a/tests/structs/test_reasoning_agent_router.py b/tests/structs/test_reasoning_agent_router.py index cc1d496a..cf5a8782 100644 --- a/tests/structs/test_reasoning_agent_router.py +++ b/tests/structs/test_reasoning_agent_router.py @@ -11,27 +11,33 @@ from swarms.agents.reasoning_agents import ( def test_router_initialization(): """ Test ReasoningAgentRouter initialization with various configurations. - + Tests: - Default initialization - Custom parameter initialization - All agent types initialization """ logger.info("Starting router initialization tests...") - + # Test 1: Default initialization logger.info("Test 1: Default initialization") try: router = ReasoningAgentRouter() assert router is not None, "Default router should not be None" - assert router.agent_name == "reasoning_agent", f"Expected 'reasoning_agent', got {router.agent_name}" - assert router.swarm_type == "reasoning-duo", f"Expected 'reasoning-duo', got {router.swarm_type}" - assert router.model_name == "gpt-4o-mini", f"Expected 'gpt-4o-mini', got {router.model_name}" + assert ( + router.agent_name == "reasoning_agent" + ), f"Expected 'reasoning_agent', got {router.agent_name}" + assert ( + router.swarm_type == "reasoning-duo" + ), f"Expected 'reasoning-duo', got {router.swarm_type}" + assert ( + router.model_name == "gpt-4o-mini" + ), f"Expected 'gpt-4o-mini', got {router.model_name}" logger.success("✓ Default initialization test passed") except Exception as e: logger.error(f"✗ Default initialization test failed: {e}") raise - + # Test 2: Custom parameters initialization logger.info("Test 2: Custom parameters initialization") try: @@ -49,49 +55,67 @@ def test_router_initialization(): eval=True, random_models_on=True, majority_voting_prompt="Custom voting prompt", - reasoning_model_name="claude-3-5-sonnet-20240620" + reasoning_model_name="claude-3-5-sonnet-20240620", + ) + assert ( + custom_router is not None + ), "Custom router should not be None" + assert ( + custom_router.agent_name == "test_agent" + ), f"Expected 'test_agent', got {custom_router.agent_name}" + assert ( + custom_router.swarm_type == "self-consistency" + ), f"Expected 'self-consistency', got {custom_router.swarm_type}" + assert ( + custom_router.max_loops == 5 + ), f"Expected 5, got {custom_router.max_loops}" + assert ( + custom_router.num_samples == 3 + ), f"Expected 3, got {custom_router.num_samples}" + logger.success( + "✓ Custom parameters initialization test passed" ) - assert custom_router is not None, "Custom router should not be None" - assert custom_router.agent_name == "test_agent", f"Expected 'test_agent', got {custom_router.agent_name}" - assert custom_router.swarm_type == "self-consistency", f"Expected 'self-consistency', got {custom_router.swarm_type}" - assert custom_router.max_loops == 
5, f"Expected 5, got {custom_router.max_loops}" - assert custom_router.num_samples == 3, f"Expected 3, got {custom_router.num_samples}" - logger.success("✓ Custom parameters initialization test passed") except Exception as e: - logger.error(f"✗ Custom parameters initialization test failed: {e}") + logger.error( + f"✗ Custom parameters initialization test failed: {e}" + ) raise - + # Test 3: All agent types initialization logger.info("Test 3: All agent types initialization") agent_types = [ "reasoning-duo", - "reasoning-agent", + "reasoning-agent", "self-consistency", "consistency-agent", "ire", "ire-agent", "ReflexionAgent", "GKPAgent", - "AgentJudge" + "AgentJudge", ] - + for agent_type in agent_types: try: router = ReasoningAgentRouter(swarm_type=agent_type) - assert router is not None, f"Router for {agent_type} should not be None" - assert router.swarm_type == agent_type, f"Expected {agent_type}, got {router.swarm_type}" + assert ( + router is not None + ), f"Router for {agent_type} should not be None" + assert ( + router.swarm_type == agent_type + ), f"Expected {agent_type}, got {router.swarm_type}" logger.info(f"✓ {agent_type} initialization successful") except Exception as e: logger.error(f"✗ {agent_type} initialization failed: {e}") raise - + logger.success("✓ All router initialization tests passed") def test_reliability_check(): """ Test reliability_check method with various invalid configurations. - + Tests: - Zero max_loops - Empty model_name @@ -100,74 +124,108 @@ def test_reliability_check(): - None swarm_type """ logger.info("Starting reliability check tests...") - + # Test 1: Zero max_loops logger.info("Test 1: Zero max_loops should raise error") try: ReasoningAgentRouter(max_loops=0) - assert False, "Should have raised ReasoningAgentInitializationError" + assert ( + False + ), "Should have raised ReasoningAgentInitializationError" except ReasoningAgentInitializationError as e: - assert "Max loops must be greater than 0" in str(e), f"Expected max loops error, got: {e}" + assert "Max loops must be greater than 0" in str( + e + ), f"Expected max loops error, got: {e}" logger.success("✓ Zero max_loops error handling test passed") except Exception as e: - logger.error(f"✗ Zero max_loops test failed with unexpected error: {e}") + logger.error( + f"✗ Zero max_loops test failed with unexpected error: {e}" + ) raise - + # Test 2: Empty model_name logger.info("Test 2: Empty model_name should raise error") try: ReasoningAgentRouter(model_name="") - assert False, "Should have raised ReasoningAgentInitializationError" + assert ( + False + ), "Should have raised ReasoningAgentInitializationError" except ReasoningAgentInitializationError as e: - assert "Model name must be provided" in str(e), f"Expected model name error, got: {e}" - logger.success("✓ Empty model_name error handling test passed") + assert "Model name must be provided" in str( + e + ), f"Expected model name error, got: {e}" + logger.success( + "✓ Empty model_name error handling test passed" + ) except Exception as e: - logger.error(f"✗ Empty model_name test failed with unexpected error: {e}") + logger.error( + f"✗ Empty model_name test failed with unexpected error: {e}" + ) raise - + # Test 3: None model_name logger.info("Test 3: None model_name should raise error") try: ReasoningAgentRouter(model_name=None) - assert False, "Should have raised ReasoningAgentInitializationError" + assert ( + False + ), "Should have raised ReasoningAgentInitializationError" except ReasoningAgentInitializationError as e: - assert "Model name 
must be provided" in str(e), f"Expected model name error, got: {e}" + assert "Model name must be provided" in str( + e + ), f"Expected model name error, got: {e}" logger.success("✓ None model_name error handling test passed") except Exception as e: - logger.error(f"✗ None model_name test failed with unexpected error: {e}") + logger.error( + f"✗ None model_name test failed with unexpected error: {e}" + ) raise - + # Test 4: Empty swarm_type logger.info("Test 4: Empty swarm_type should raise error") try: ReasoningAgentRouter(swarm_type="") - assert False, "Should have raised ReasoningAgentInitializationError" + assert ( + False + ), "Should have raised ReasoningAgentInitializationError" except ReasoningAgentInitializationError as e: - assert "Swarm type must be provided" in str(e), f"Expected swarm type error, got: {e}" - logger.success("✓ Empty swarm_type error handling test passed") + assert "Swarm type must be provided" in str( + e + ), f"Expected swarm type error, got: {e}" + logger.success( + "✓ Empty swarm_type error handling test passed" + ) except Exception as e: - logger.error(f"✗ Empty swarm_type test failed with unexpected error: {e}") + logger.error( + f"✗ Empty swarm_type test failed with unexpected error: {e}" + ) raise - + # Test 5: None swarm_type logger.info("Test 5: None swarm_type should raise error") try: ReasoningAgentRouter(swarm_type=None) - assert False, "Should have raised ReasoningAgentInitializationError" + assert ( + False + ), "Should have raised ReasoningAgentInitializationError" except ReasoningAgentInitializationError as e: - assert "Swarm type must be provided" in str(e), f"Expected swarm type error, got: {e}" + assert "Swarm type must be provided" in str( + e + ), f"Expected swarm type error, got: {e}" logger.success("✓ None swarm_type error handling test passed") except Exception as e: - logger.error(f"✗ None swarm_type test failed with unexpected error: {e}") + logger.error( + f"✗ None swarm_type test failed with unexpected error: {e}" + ) raise - + logger.success("✓ All reliability check tests passed") def test_agent_factories(): """ Test all agent factory methods for each agent type. 
- + Tests: - _create_reasoning_duo - _create_consistency_agent @@ -177,7 +235,7 @@ def test_agent_factories(): - _create_gkp_agent """ logger.info("Starting agent factory tests...") - + # Test configuration test_config = { "agent_name": "test_agent", @@ -192,31 +250,39 @@ def test_agent_factories(): "eval": False, "random_models_on": False, "majority_voting_prompt": None, - "reasoning_model_name": "claude-3-5-sonnet-20240620" + "reasoning_model_name": "claude-3-5-sonnet-20240620", } - + # Test 1: Reasoning Duo factory logger.info("Test 1: _create_reasoning_duo") try: - router = ReasoningAgentRouter(swarm_type="reasoning-duo", **test_config) + router = ReasoningAgentRouter( + swarm_type="reasoning-duo", **test_config + ) agent = router._create_reasoning_duo() - assert agent is not None, "Reasoning duo agent should not be None" + assert ( + agent is not None + ), "Reasoning duo agent should not be None" logger.success("✓ _create_reasoning_duo test passed") except Exception as e: logger.error(f"✗ _create_reasoning_duo test failed: {e}") raise - + # Test 2: Consistency Agent factory logger.info("Test 2: _create_consistency_agent") try: - router = ReasoningAgentRouter(swarm_type="self-consistency", **test_config) + router = ReasoningAgentRouter( + swarm_type="self-consistency", **test_config + ) agent = router._create_consistency_agent() - assert agent is not None, "Consistency agent should not be None" + assert ( + agent is not None + ), "Consistency agent should not be None" logger.success("✓ _create_consistency_agent test passed") except Exception as e: logger.error(f"✗ _create_consistency_agent test failed: {e}") raise - + # Test 3: IRE Agent factory logger.info("Test 3: _create_ire_agent") try: @@ -227,97 +293,117 @@ def test_agent_factories(): except Exception as e: logger.error(f"✗ _create_ire_agent test failed: {e}") raise - + # Test 4: Agent Judge factory logger.info("Test 4: _create_agent_judge") try: - router = ReasoningAgentRouter(swarm_type="AgentJudge", **test_config) + router = ReasoningAgentRouter( + swarm_type="AgentJudge", **test_config + ) agent = router._create_agent_judge() assert agent is not None, "Agent judge should not be None" logger.success("✓ _create_agent_judge test passed") except Exception as e: logger.error(f"✗ _create_agent_judge test failed: {e}") raise - + # Test 5: Reflexion Agent factory logger.info("Test 5: _create_reflexion_agent") try: - router = ReasoningAgentRouter(swarm_type="ReflexionAgent", **test_config) + router = ReasoningAgentRouter( + swarm_type="ReflexionAgent", **test_config + ) agent = router._create_reflexion_agent() assert agent is not None, "Reflexion agent should not be None" logger.success("✓ _create_reflexion_agent test passed") except Exception as e: logger.error(f"✗ _create_reflexion_agent test failed: {e}") raise - + # Test 6: GKP Agent factory logger.info("Test 6: _create_gkp_agent") try: - router = ReasoningAgentRouter(swarm_type="GKPAgent", **test_config) + router = ReasoningAgentRouter( + swarm_type="GKPAgent", **test_config + ) agent = router._create_gkp_agent() assert agent is not None, "GKP agent should not be None" logger.success("✓ _create_gkp_agent test passed") except Exception as e: logger.error(f"✗ _create_gkp_agent test failed: {e}") raise - + logger.success("✓ All agent factory tests passed") def test_select_swarm(): """ Test select_swarm method for all supported agent types. 
- + Tests: - All valid agent types - Invalid agent type """ logger.info("Starting select_swarm tests...") - + agent_types = [ "reasoning-duo", - "reasoning-agent", + "reasoning-agent", "self-consistency", "consistency-agent", "ire", "ire-agent", "ReflexionAgent", "GKPAgent", - "AgentJudge" + "AgentJudge", ] - + # Test all valid agent types for agent_type in agent_types: logger.info(f"Test: select_swarm for {agent_type}") try: router = ReasoningAgentRouter(swarm_type=agent_type) swarm = router.select_swarm() - assert swarm is not None, f"Swarm for {agent_type} should not be None" - logger.success(f"✓ select_swarm for {agent_type} test passed") + assert ( + swarm is not None + ), f"Swarm for {agent_type} should not be None" + logger.success( + f"✓ select_swarm for {agent_type} test passed" + ) except Exception as e: - logger.error(f"✗ select_swarm for {agent_type} test failed: {e}") + logger.error( + f"✗ select_swarm for {agent_type} test failed: {e}" + ) raise - + # Test invalid agent type logger.info("Test: Invalid agent type should raise error") try: router = ReasoningAgentRouter(swarm_type="invalid_type") swarm = router.select_swarm() - assert False, "Should have raised ReasoningAgentInitializationError" + assert ( + False + ), "Should have raised ReasoningAgentInitializationError" except ReasoningAgentInitializationError as e: - assert "Invalid swarm type" in str(e), f"Expected invalid swarm type error, got: {e}" - logger.success("✓ Invalid agent type error handling test passed") + assert "Invalid swarm type" in str( + e + ), f"Expected invalid swarm type error, got: {e}" + logger.success( + "✓ Invalid agent type error handling test passed" + ) except Exception as e: - logger.error(f"✗ Invalid agent type test failed with unexpected error: {e}") + logger.error( + f"✗ Invalid agent type test failed with unexpected error: {e}" + ) raise - + logger.success("✓ All select_swarm tests passed") def test_run_method(): """ Test run method with different agent types and tasks. - + Tests: - Method structure and signature - Actual execution with mock tasks @@ -325,7 +411,7 @@ def test_run_method(): - Error handling for invalid inputs """ logger.info("Starting run method tests...") - + # Test configuration for different agent types test_configs = [ {"swarm_type": "reasoning-duo", "max_loops": 1}, @@ -333,32 +419,41 @@ def test_run_method(): {"swarm_type": "ire", "max_loops": 1}, {"swarm_type": "ReflexionAgent", "max_loops": 1}, {"swarm_type": "GKPAgent"}, - {"swarm_type": "AgentJudge", "max_loops": 1} + {"swarm_type": "AgentJudge", "max_loops": 1}, ] - + test_tasks = [ "What is 2+2?", "Explain the concept of recursion in programming.", - "List three benefits of renewable energy." 
+ "List three benefits of renewable energy.", ] - + for config in test_configs: agent_type = config["swarm_type"] logger.info(f"Test: run method for {agent_type}") try: router = ReasoningAgentRouter(**config) - + # Test 1: Method structure logger.info(f"Test 1: Method structure for {agent_type}") - assert hasattr(router, 'run'), "Router should have run method" - assert callable(router.run), "run method should be callable" - + assert hasattr( + router, "run" + ), "Router should have run method" + assert callable( + router.run + ), "run method should be callable" + # Test method signature import inspect + sig = inspect.signature(router.run) - assert 'task' in sig.parameters, "run method should have 'task' parameter" - logger.success(f"✓ Method structure for {agent_type} test passed") - + assert ( + "task" in sig.parameters + ), "run method should have 'task' parameter" + logger.success( + f"✓ Method structure for {agent_type} test passed" + ) + # Test 2: Actual execution with mock tasks logger.info(f"Test 2: Actual execution for {agent_type}") for i, task in enumerate(test_tasks): @@ -367,17 +462,35 @@ def test_run_method(): # and catch the expected error to verify the method is working result = router.run(task) # If we get here (unlikely without API keys), verify result is not None - assert result is not None, f"Result for task {i+1} should not be None" - logger.info(f"✓ Task {i+1} execution successful for {agent_type}") + assert ( + result is not None + ), f"Result for task {i+1} should not be None" + logger.info( + f"✓ Task {i+1} execution successful for {agent_type}" + ) except Exception as run_error: # Expected to fail without API keys, but verify it's a reasonable error error_msg = str(run_error).lower() - if any(keyword in error_msg for keyword in ['api', 'key', 'auth', 'token', 'openai', 'anthropic']): - logger.info(f"✓ Task {i+1} failed as expected (no API key) for {agent_type}") + if any( + keyword in error_msg + for keyword in [ + "api", + "key", + "auth", + "token", + "openai", + "anthropic", + ] + ): + logger.info( + f"✓ Task {i+1} failed as expected (no API key) for {agent_type}" + ) else: # If it's not an API key error, it might be a real issue - logger.warning(f"Task {i+1} failed with unexpected error for {agent_type}: {run_error}") - + logger.warning( + f"Task {i+1} failed with unexpected error for {agent_type}: {run_error}" + ) + # Test 3: Error handling for invalid inputs logger.info(f"Test 3: Error handling for {agent_type}") try: @@ -387,8 +500,10 @@ def test_run_method(): logger.info(f"✓ Empty task handling for {agent_type}") except Exception: # This is also acceptable - empty task might be rejected - logger.info(f"✓ Empty task properly rejected for {agent_type}") - + logger.info( + f"✓ Empty task properly rejected for {agent_type}" + ) + try: # Test with None task result = router.run(None) @@ -396,21 +511,27 @@ def test_run_method(): logger.info(f"✓ None task handling for {agent_type}") except Exception: # This is also acceptable - None task might be rejected - logger.info(f"✓ None task properly rejected for {agent_type}") - - logger.success(f"✓ All run method tests for {agent_type} passed") - + logger.info( + f"✓ None task properly rejected for {agent_type}" + ) + + logger.success( + f"✓ All run method tests for {agent_type} passed" + ) + except Exception as e: - logger.error(f"✗ run method for {agent_type} test failed: {e}") + logger.error( + f"✗ run method for {agent_type} test failed: {e}" + ) raise - + logger.success("✓ All run method tests passed") def 
test_batched_run_method(): """ Test batched_run method with multiple tasks. - + Tests: - Method existence and callability - Parameter validation @@ -418,107 +539,148 @@ def test_batched_run_method(): - Return value validation (list of non-None results) """ logger.info("Starting batched_run method tests...") - + # Test configuration router = ReasoningAgentRouter(swarm_type="reasoning-duo") - + # Test 1: Method existence and callability logger.info("Test 1: Method existence and callability") try: - assert hasattr(router, 'batched_run'), "Router should have batched_run method" - assert callable(router.batched_run), "batched_run method should be callable" - logger.success("✓ Method existence and callability test passed") + assert hasattr( + router, "batched_run" + ), "Router should have batched_run method" + assert callable( + router.batched_run + ), "batched_run method should be callable" + logger.success( + "✓ Method existence and callability test passed" + ) except Exception as e: logger.error(f"✗ Method existence test failed: {e}") raise - + # Test 2: Parameter validation logger.info("Test 2: Parameter validation") try: import inspect + sig = inspect.signature(router.batched_run) - assert 'tasks' in sig.parameters, "batched_run method should have 'tasks' parameter" + assert ( + "tasks" in sig.parameters + ), "batched_run method should have 'tasks' parameter" logger.success("✓ Parameter validation test passed") except Exception as e: logger.error(f"✗ Parameter validation test failed: {e}") raise - + # Test 3: Actual execution with multiple tasks logger.info("Test 3: Actual execution with multiple tasks") test_tasks = [ "What is 2+2?", "What is the capital of France?", - "Explain photosynthesis briefly." + "Explain photosynthesis briefly.", ] - + try: # This will likely fail without API keys, but we test the method call structure results = router.batched_run(test_tasks) - + # If we get here (unlikely without API keys), verify results - assert isinstance(results, list), "batched_run should return a list" - assert len(results) == len(test_tasks), f"Expected {len(test_tasks)} results, got {len(results)}" - + assert isinstance( + results, list + ), "batched_run should return a list" + assert len(results) == len( + test_tasks + ), f"Expected {len(test_tasks)} results, got {len(results)}" + for i, result in enumerate(results): - assert result is not None, f"Result {i+1} should not be None" + assert ( + result is not None + ), f"Result {i+1} should not be None" logger.info(f"✓ Task {i+1} result validation passed") - + logger.success("✓ Actual execution test passed") - + except Exception as run_error: # Expected to fail without API keys, but verify it's a reasonable error error_msg = str(run_error).lower() - if any(keyword in error_msg for keyword in ['api', 'key', 'auth', 'token', 'openai', 'anthropic']): - logger.info("✓ Batched execution failed as expected (no API key)") + if any( + keyword in error_msg + for keyword in [ + "api", + "key", + "auth", + "token", + "openai", + "anthropic", + ] + ): + logger.info( + "✓ Batched execution failed as expected (no API key)" + ) else: # If it's not an API key error, it might be a real issue - logger.warning(f"Batched execution failed with unexpected error: {run_error}") - + logger.warning( + f"Batched execution failed with unexpected error: {run_error}" + ) + # Test 4: Error handling for invalid inputs logger.info("Test 4: Error handling for invalid inputs") - + # Test with empty task list try: results = router.batched_run([]) - assert isinstance(results, 
list), "Should return empty list for empty input" - assert len(results) == 0, "Empty input should return empty results" + assert isinstance( + results, list + ), "Should return empty list for empty input" + assert ( + len(results) == 0 + ), "Empty input should return empty results" logger.info("✓ Empty task list handling") except Exception as empty_error: - logger.info(f"✓ Empty task list properly handled: {empty_error}") - + logger.info( + f"✓ Empty task list properly handled: {empty_error}" + ) + # Test with None tasks try: results = router.batched_run(None) logger.info("✓ None tasks handling") except Exception as none_error: logger.info(f"✓ None tasks properly rejected: {none_error}") - + logger.success("✓ All batched_run method tests passed") def test_error_handling(): """ Test error handling for various error conditions. - + Tests: - Initialization errors - Execution errors - Invalid configurations """ logger.info("Starting error handling tests...") - + # Test 1: Invalid swarm type in select_swarm logger.info("Test 1: Invalid swarm type error handling") try: router = ReasoningAgentRouter(swarm_type="invalid_type") router.select_swarm() - assert False, "Should have raised ReasoningAgentInitializationError" + assert ( + False + ), "Should have raised ReasoningAgentInitializationError" except ReasoningAgentInitializationError: - logger.success("✓ Invalid swarm type error handling test passed") + logger.success( + "✓ Invalid swarm type error handling test passed" + ) except Exception as e: - logger.error(f"✗ Invalid swarm type error handling test failed: {e}") + logger.error( + f"✗ Invalid swarm type error handling test failed: {e}" + ) raise - + # Test 2: Agent factory error handling logger.info("Test 2: Agent factory error handling") try: @@ -526,52 +688,54 @@ def test_error_handling(): router = ReasoningAgentRouter(swarm_type="reasoning-duo") # This should work without errors agent = router._create_reasoning_duo() - assert agent is not None, "Agent should be created successfully" + assert ( + agent is not None + ), "Agent should be created successfully" logger.success("✓ Agent factory error handling test passed") except Exception as e: - logger.error(f"✗ Agent factory error handling test failed: {e}") + logger.error( + f"✗ Agent factory error handling test failed: {e}" + ) raise - + logger.success("✓ All error handling tests passed") def test_output_types(): """ Test different output types configuration. - + Tests: - Various OutputType configurations - Output type validation """ logger.info("Starting output types tests...") - - output_types = [ - "dict-all-except-first", - "dict", - "string", - "list" - ] - + + output_types = ["dict-all-except-first", "dict", "string", "list"] + for output_type in output_types: logger.info(f"Test: Output type {output_type}") try: router = ReasoningAgentRouter( - swarm_type="reasoning-duo", - output_type=output_type + swarm_type="reasoning-duo", output_type=output_type ) - assert router.output_type == output_type, f"Expected {output_type}, got {router.output_type}" + assert ( + router.output_type == output_type + ), f"Expected {output_type}, got {router.output_type}" logger.success(f"✓ Output type {output_type} test passed") except Exception as e: - logger.error(f"✗ Output type {output_type} test failed: {e}") + logger.error( + f"✗ Output type {output_type} test failed: {e}" + ) raise - + logger.success("✓ All output types tests passed") def test_agent_configurations(): """ Test various agent-specific configurations. 
- + Tests: - Different num_samples values - Different max_loops values @@ -579,66 +743,76 @@ def test_agent_configurations(): - Different num_knowledge_items values """ logger.info("Starting agent configurations tests...") - + # Test 1: num_samples configuration logger.info("Test 1: num_samples configuration") try: router = ReasoningAgentRouter( - swarm_type="self-consistency", - num_samples=5 + swarm_type="self-consistency", num_samples=5 ) - assert router.num_samples == 5, f"Expected 5, got {router.num_samples}" + assert ( + router.num_samples == 5 + ), f"Expected 5, got {router.num_samples}" logger.success("✓ num_samples configuration test passed") except Exception as e: logger.error(f"✗ num_samples configuration test failed: {e}") raise - + # Test 2: max_loops configuration logger.info("Test 2: max_loops configuration") try: router = ReasoningAgentRouter( - swarm_type="reasoning-duo", - max_loops=10 + swarm_type="reasoning-duo", max_loops=10 ) - assert router.max_loops == 10, f"Expected 10, got {router.max_loops}" + assert ( + router.max_loops == 10 + ), f"Expected 10, got {router.max_loops}" logger.success("✓ max_loops configuration test passed") except Exception as e: logger.error(f"✗ max_loops configuration test failed: {e}") raise - + # Test 3: memory_capacity configuration logger.info("Test 3: memory_capacity configuration") try: router = ReasoningAgentRouter( - swarm_type="ReflexionAgent", - memory_capacity=50 + swarm_type="ReflexionAgent", memory_capacity=50 ) - assert router.memory_capacity == 50, f"Expected 50, got {router.memory_capacity}" + assert ( + router.memory_capacity == 50 + ), f"Expected 50, got {router.memory_capacity}" logger.success("✓ memory_capacity configuration test passed") except Exception as e: - logger.error(f"✗ memory_capacity configuration test failed: {e}") + logger.error( + f"✗ memory_capacity configuration test failed: {e}" + ) raise - + # Test 4: num_knowledge_items configuration logger.info("Test 4: num_knowledge_items configuration") try: router = ReasoningAgentRouter( - swarm_type="GKPAgent", - num_knowledge_items=15 + swarm_type="GKPAgent", num_knowledge_items=15 + ) + assert ( + router.num_knowledge_items == 15 + ), f"Expected 15, got {router.num_knowledge_items}" + logger.success( + "✓ num_knowledge_items configuration test passed" ) - assert router.num_knowledge_items == 15, f"Expected 15, got {router.num_knowledge_items}" - logger.success("✓ num_knowledge_items configuration test passed") except Exception as e: - logger.error(f"✗ num_knowledge_items configuration test failed: {e}") + logger.error( + f"✗ num_knowledge_items configuration test failed: {e}" + ) raise - + logger.success("✓ All agent configurations tests passed") def test_run_method_execution(): """ Comprehensive test for the run method - the core functionality of ReasoningAgentRouter. - + This test focuses specifically on testing the run(self, task) method with: - Actual method execution - Return value validation (non-None) @@ -647,21 +821,23 @@ def test_run_method_execution(): - Error handling - Method signature validation """ - logger.info("Starting comprehensive run method execution tests...") - + logger.info( + "Starting comprehensive run method execution tests..." 
+ ) + # Test all supported agent types agent_types = [ "reasoning-duo", - "reasoning-agent", + "reasoning-agent", "self-consistency", "consistency-agent", "ire", "ire-agent", "ReflexionAgent", "GKPAgent", - "AgentJudge" + "AgentJudge", ] - + # Test tasks of different types and complexities test_tasks = [ "What is 2+2?", @@ -669,138 +845,238 @@ def test_run_method_execution(): "List three benefits of renewable energy.", "What is the capital of France?", "Solve: 15 * 8 = ?", - "Define artificial intelligence briefly." + "Define artificial intelligence briefly.", ] - + for agent_type in agent_types: logger.info(f"\n{'='*50}") logger.info(f"Testing run method for: {agent_type}") logger.info(f"{'='*50}") - + try: # Create router with appropriate configuration router = ReasoningAgentRouter( swarm_type=agent_type, max_loops=1, - num_samples=2 if agent_type in ["self-consistency", "consistency-agent"] else 1 + num_samples=( + 2 + if agent_type + in ["self-consistency", "consistency-agent"] + else 1 + ), ) - + # Test 1: Method existence and callability - logger.info(f"Test 1: Method existence and callability for {agent_type}") - assert hasattr(router, 'run'), f"Router should have run method for {agent_type}" - assert callable(router.run), f"run method should be callable for {agent_type}" - logger.success(f"✓ Method exists and is callable for {agent_type}") - + logger.info( + f"Test 1: Method existence and callability for {agent_type}" + ) + assert hasattr( + router, "run" + ), f"Router should have run method for {agent_type}" + assert callable( + router.run + ), f"run method should be callable for {agent_type}" + logger.success( + f"✓ Method exists and is callable for {agent_type}" + ) + # Test 2: Method signature validation - logger.info(f"Test 2: Method signature validation for {agent_type}") + logger.info( + f"Test 2: Method signature validation for {agent_type}" + ) import inspect + sig = inspect.signature(router.run) params = list(sig.parameters.keys()) - assert 'task' in params, f"run method should have 'task' parameter for {agent_type}" - assert len(params) >= 1, f"run method should have at least one parameter for {agent_type}" - logger.success(f"✓ Method signature valid for {agent_type}: {params}") - + assert ( + "task" in params + ), f"run method should have 'task' parameter for {agent_type}" + assert ( + len(params) >= 1 + ), f"run method should have at least one parameter for {agent_type}" + logger.success( + f"✓ Method signature valid for {agent_type}: {params}" + ) + # Test 3: Actual execution with multiple tasks - logger.info(f"Test 3: Actual execution with multiple tasks for {agent_type}") + logger.info( + f"Test 3: Actual execution with multiple tasks for {agent_type}" + ) successful_executions = 0 total_executions = 0 - + for i, task in enumerate(test_tasks): total_executions += 1 - logger.info(f" Executing task {i+1}/{len(test_tasks)}: '{task[:50]}{'...' if len(task) > 50 else ''}'") - + logger.info( + f" Executing task {i+1}/{len(test_tasks)}: '{task[:50]}{'...' 
if len(task) > 50 else ''}'" + ) + try: # Execute the run method result = router.run(task) - + # Validate the result if result is not None: - assert result is not None, f"Result should not be None for task {i+1} with {agent_type}" - logger.success(f" ✓ Task {i+1} executed successfully - Result type: {type(result)}") + assert ( + result is not None + ), f"Result should not be None for task {i+1} with {agent_type}" + logger.success( + f" ✓ Task {i+1} executed successfully - Result type: {type(result)}" + ) successful_executions += 1 - + # Additional validation based on result type if isinstance(result, str): - assert len(result) > 0, f"String result should not be empty for task {i+1}" - logger.info(f" ✓ String result length: {len(result)} characters") + assert ( + len(result) > 0 + ), f"String result should not be empty for task {i+1}" + logger.info( + f" ✓ String result length: {len(result)} characters" + ) elif isinstance(result, dict): - assert len(result) > 0, f"Dict result should not be empty for task {i+1}" - logger.info(f" ✓ Dict result keys: {list(result.keys())}") + assert ( + len(result) > 0 + ), f"Dict result should not be empty for task {i+1}" + logger.info( + f" ✓ Dict result keys: {list(result.keys())}" + ) elif isinstance(result, list): - logger.info(f" ✓ List result length: {len(result)}") + logger.info( + f" ✓ List result length: {len(result)}" + ) else: - logger.info(f" ✓ Result type: {type(result)}") + logger.info( + f" ✓ Result type: {type(result)}" + ) else: - logger.warning(f" ⚠ Task {i+1} returned None (might be expected without API keys)") - + logger.warning( + f" ⚠ Task {i+1} returned None (might be expected without API keys)" + ) + except Exception as exec_error: # Analyze the error to determine if it's expected error_msg = str(exec_error).lower() - expected_keywords = ['api', 'key', 'auth', 'token', 'openai', 'anthropic', 'rate', 'limit', 'quota', 'billing'] - - if any(keyword in error_msg for keyword in expected_keywords): - logger.info(f" ✓ Task {i+1} failed as expected (no API key) for {agent_type}") + expected_keywords = [ + "api", + "key", + "auth", + "token", + "openai", + "anthropic", + "rate", + "limit", + "quota", + "billing", + ] + + if any( + keyword in error_msg + for keyword in expected_keywords + ): + logger.info( + f" ✓ Task {i+1} failed as expected (no API key) for {agent_type}" + ) else: # Log unexpected errors for investigation - logger.warning(f" ⚠ Task {i+1} failed with unexpected error for {agent_type}: {exec_error}") - + logger.warning( + f" ⚠ Task {i+1} failed with unexpected error for {agent_type}: {exec_error}" + ) + # Test 4: Execution statistics - logger.info(f"Test 4: Execution statistics for {agent_type}") - success_rate = (successful_executions / total_executions) * 100 if total_executions > 0 else 0 - logger.info(f" Execution success rate: {success_rate:.1f}% ({successful_executions}/{total_executions})") - + logger.info( + f"Test 4: Execution statistics for {agent_type}" + ) + success_rate = ( + (successful_executions / total_executions) * 100 + if total_executions > 0 + else 0 + ) + logger.info( + f" Execution success rate: {success_rate:.1f}% ({successful_executions}/{total_executions})" + ) + if successful_executions > 0: - logger.success(f"✓ {successful_executions} tasks executed successfully for {agent_type}") + logger.success( + f"✓ {successful_executions} tasks executed successfully for {agent_type}" + ) else: - logger.info(f"ℹ No tasks executed successfully for {agent_type} (expected without API keys)") - + logger.info( + f"ℹ 
No tasks executed successfully for {agent_type} (expected without API keys)" + ) + # Test 5: Error handling for edge cases - logger.info(f"Test 5: Error handling for edge cases with {agent_type}") - + logger.info( + f"Test 5: Error handling for edge cases with {agent_type}" + ) + # Test with empty string try: result = router.run("") if result is not None: - logger.info(f" ✓ Empty string handled gracefully for {agent_type}") + logger.info( + f" ✓ Empty string handled gracefully for {agent_type}" + ) else: - logger.info(f" ✓ Empty string returned None (acceptable) for {agent_type}") + logger.info( + f" ✓ Empty string returned None (acceptable) for {agent_type}" + ) except Exception: - logger.info(f" ✓ Empty string properly rejected for {agent_type}") - + logger.info( + f" ✓ Empty string properly rejected for {agent_type}" + ) + # Test with None try: result = router.run(None) if result is not None: - logger.info(f" ✓ None handled gracefully for {agent_type}") + logger.info( + f" ✓ None handled gracefully for {agent_type}" + ) else: - logger.info(f" ✓ None returned None (acceptable) for {agent_type}") + logger.info( + f" ✓ None returned None (acceptable) for {agent_type}" + ) except Exception: - logger.info(f" ✓ None properly rejected for {agent_type}") - + logger.info( + f" ✓ None properly rejected for {agent_type}" + ) + # Test with very long task long_task = "Explain " + "artificial intelligence " * 100 try: result = router.run(long_task) if result is not None: - logger.info(f" ✓ Long task handled for {agent_type}") + logger.info( + f" ✓ Long task handled for {agent_type}" + ) else: - logger.info(f" ✓ Long task returned None (acceptable) for {agent_type}") + logger.info( + f" ✓ Long task returned None (acceptable) for {agent_type}" + ) except Exception: - logger.info(f" ✓ Long task properly handled for {agent_type}") - - logger.success(f"✓ All run method tests completed for {agent_type}") - + logger.info( + f" ✓ Long task properly handled for {agent_type}" + ) + + logger.success( + f"✓ All run method tests completed for {agent_type}" + ) + except Exception as e: - logger.error(f"✗ Run method test failed for {agent_type}: {e}") + logger.error( + f"✗ Run method test failed for {agent_type}: {e}" + ) raise - - logger.success("✓ All comprehensive run method execution tests passed") + + logger.success( + "✓ All comprehensive run method execution tests passed" + ) def test_run_method_core_functionality(): """ Core functionality test for the run method - the most important test. - + This test specifically focuses on: 1. Testing run(self, task) with actual execution 2. Validating that results are not None @@ -809,108 +1085,190 @@ def test_run_method_core_functionality(): 5. 
Return value type validation
     """
     logger.info("Starting CORE run method functionality tests...")
-    logger.info("This is the most important test - validating run(self, task) execution")
-
+    logger.info(
+        "This is the most important test - validating run(self, task) execution"
+    )
+
     # Test configurations for different agent types
     test_configs = [
-        {"swarm_type": "reasoning-duo", "max_loops": 1, "description": "Dual agent collaboration"},
-        {"swarm_type": "self-consistency", "num_samples": 3, "description": "Multiple independent solutions"},
-        {"swarm_type": "ire", "max_loops": 1, "description": "Iterative reflective expansion"},
-        {"swarm_type": "ReflexionAgent", "max_loops": 1, "description": "Self-reflection agent"},
-        {"swarm_type": "GKPAgent", "description": "Generated knowledge prompting"},
-        {"swarm_type": "AgentJudge", "max_loops": 1, "description": "Agent evaluation"}
+        {
+            "swarm_type": "reasoning-duo",
+            "max_loops": 1,
+            "description": "Dual agent collaboration",
+        },
+        {
+            "swarm_type": "self-consistency",
+            "num_samples": 3,
+            "description": "Multiple independent solutions",
+        },
+        {
+            "swarm_type": "ire",
+            "max_loops": 1,
+            "description": "Iterative reflective expansion",
+        },
+        {
+            "swarm_type": "ReflexionAgent",
+            "max_loops": 1,
+            "description": "Self-reflection agent",
+        },
+        {
+            "swarm_type": "GKPAgent",
+            "description": "Generated knowledge prompting",
+        },
+        {
+            "swarm_type": "AgentJudge",
+            "max_loops": 1,
+            "description": "Agent evaluation",
+        },
     ]
-
+
     # Core test tasks
     core_tasks = [
         "What is 2+2?",
         "Explain the water cycle in one sentence.",
         "What is the capital of Japan?",
         "List two benefits of exercise.",
-        "Solve: 12 * 7 = ?"
+        "Solve: 12 * 7 = ?",
     ]
-
+
     total_tests = 0
     successful_tests = 0
     failed_tests = 0
-
+
     for config in test_configs:
         agent_type = config["swarm_type"]
         description = config["description"]
-
+
         logger.info(f"\n{'='*60}")
         logger.info(f"Testing {agent_type} - {description}")
         logger.info(f"{'='*60}")
-
+
         try:
             # Create router
             router = ReasoningAgentRouter(**config)
-
+
             # Test each core task
             for i, task in enumerate(core_tasks):
                 total_tests += 1
-                logger.info(f"\nTask {i+1}/{len(core_tasks)}: '{task}'")
+                logger.info(
+                    f"\nTask {i+1}/{len(core_tasks)}: '{task}'"
+                )
                 logger.info(f"Agent: {agent_type}")
-
+
                 try:
                     # Execute the run method - THIS IS THE CORE TEST
                     result = router.run(task)
-
+
                     # CRITICAL VALIDATION: Result must not be None
                     if result is not None:
                         successful_tests += 1
-                        logger.success("✓ SUCCESS: Task executed and returned non-None result")
+                        logger.success(
+                            "✓ SUCCESS: Task executed and returned non-None result"
+                        )
                         logger.info(f" Result type: {type(result)}")
-
+
                         # Validate result content based on type
                         if isinstance(result, str):
-                            assert len(result) > 0, "String result should not be empty"
-                            logger.info(f" String length: {len(result)} characters")
-                            logger.info(f" First 100 chars: {result[:100]}{'...' if len(result) > 100 else ''}")
+                            assert (
+                                len(result) > 0
+                            ), "String result should not be empty"
+                            logger.info(
+                                f" String length: {len(result)} characters"
+                            )
+                            logger.info(
+                                f" First 100 chars: {result[:100]}{'...' if len(result) > 100 else ''}"
+                            )
                         elif isinstance(result, dict):
-                            assert len(result) > 0, "Dict result should not be empty"
-                            logger.info(f" Dict keys: {list(result.keys())}")
-                            logger.info(f" Dict size: {len(result)} items")
+                            assert (
+                                len(result) > 0
+                            ), "Dict result should not be empty"
+                            logger.info(
+                                f" Dict keys: {list(result.keys())}"
+                            )
+                            logger.info(
+                                f" Dict size: {len(result)} items"
+                            )
                         elif isinstance(result, list):
-                            logger.info(f" List length: {len(result)} items")
+                            logger.info(
+                                f" List length: {len(result)} items"
+                            )
                         else:
-                            logger.info(f" Result value: {str(result)[:100]}{'...' if len(str(result)) > 100 else ''}")
-
+                            logger.info(
+                                f" Result value: {str(result)[:100]}{'...' if len(str(result)) > 100 else ''}"
+                            )
+
                         # Additional validation: result should be meaningful
-                        if isinstance(result, str) and len(result.strip()) == 0:
-                            logger.warning(" ⚠ Result is empty string")
-                        elif isinstance(result, dict) and len(result) == 0:
-                            logger.warning(" ⚠ Result is empty dictionary")
-                        elif isinstance(result, list) and len(result) == 0:
+                        if (
+                            isinstance(result, str)
+                            and len(result.strip()) == 0
+                        ):
+                            logger.warning(
+                                " ⚠ Result is empty string"
+                            )
+                        elif (
+                            isinstance(result, dict)
+                            and len(result) == 0
+                        ):
+                            logger.warning(
+                                " ⚠ Result is empty dictionary"
+                            )
+                        elif (
+                            isinstance(result, list)
+                            and len(result) == 0
+                        ):
                             logger.warning(" ⚠ Result is empty list")
                         else:
-                            logger.success(" ✓ Result appears to be meaningful content")
-
+                            logger.success(
+                                " ✓ Result appears to be meaningful content"
+                            )
+
                     else:
                         failed_tests += 1
-                        logger.error("✗ FAILURE: Task returned None result")
-                        logger.error(" This indicates the run method is not working properly")
-
+                        logger.error(
+                            "✗ FAILURE: Task returned None result"
+                        )
+                        logger.error(
+                            " This indicates the run method is not working properly"
+                        )
+
                 except Exception as exec_error:
                     failed_tests += 1
                     error_msg = str(exec_error)
-                    logger.error("✗ FAILURE: Task execution failed with error")
+                    logger.error(
+                        "✗ FAILURE: Task execution failed with error"
+                    )
                     logger.error(f" Error: {error_msg}")
-
+
                     # Check if it's an expected API key error
-                    if any(keyword in error_msg.lower() for keyword in ['api', 'key', 'auth', 'token', 'openai', 'anthropic']):
-                        logger.info(" ℹ This appears to be an API key error (expected without credentials)")
+                    if any(
+                        keyword in error_msg.lower()
+                        for keyword in [
+                            "api",
+                            "key",
+                            "auth",
+                            "token",
+                            "openai",
+                            "anthropic",
+                        ]
+                    ):
+                        logger.info(
+                            " ℹ This appears to be an API key error (expected without credentials)"
+                        )
                     else:
-                        logger.warning(" ⚠ This might be an unexpected error that needs investigation")
-
+                        logger.warning(
+                            " ⚠ This might be an unexpected error that needs investigation"
+                        )
+
             logger.info(f"\n{agent_type} Summary:")
             logger.info(f" Total tasks tested: {len(core_tasks)}")
-
+
         except Exception as e:
-            logger.error(f"✗ FAILURE: Router creation failed for {agent_type}: {e}")
+            logger.error(
+                f"✗ FAILURE: Router creation failed for {agent_type}: {e}"
+            )
             failed_tests += len(core_tasks)
             total_tests += len(core_tasks)
-
+
     # Final summary
     logger.info(f"\n{'='*60}")
     logger.info("CORE RUN METHOD TEST SUMMARY")
@@ -918,22 +1276,28 @@ def test_run_method_core_functionality():
     logger.info(f"Total tests executed: {total_tests}")
     logger.info(f"Successful executions: {successful_tests}")
     logger.info(f"Failed executions: {failed_tests}")
-
+
     if total_tests > 0:
         success_rate = (successful_tests / total_tests) * 100
         logger.info(f"Success rate: {success_rate:.1f}%")
-
+
         if success_rate >= 50:
-            logger.success(f"✓ CORE TEST PASSED: {success_rate:.1f}% success rate is acceptable")
+            logger.success(
+                f"✓ CORE TEST PASSED: {success_rate:.1f}% success rate is acceptable"
+            )
         elif success_rate > 0:
-            logger.warning(f"⚠ CORE TEST PARTIAL: {success_rate:.1f}% success rate - some functionality working")
+            logger.warning(
+                f"⚠ CORE TEST PARTIAL: {success_rate:.1f}% success rate - some functionality working"
+            )
         else:
-            logger.error("✗ CORE TEST FAILED: 0% success rate - run method not working")
+            logger.error(
+                "✗ CORE TEST FAILED: 0% success rate - run method not working"
+            )
     else:
         logger.error("✗ CORE TEST FAILED: No tests were executed")
-
+
     logger.info(f"{'='*60}")
-
+
     # The test passes if we have some successful executions or if failures are due to API key issues
     if successful_tests > 0:
         logger.success("✓ Core run method functionality test PASSED")
@@ -946,17 +1310,17 @@ def test_run_method_core_functionality():
 def run_all_tests():
     """
     Run all unit tests for ReasoningAgentRouter.
-
+
     This function executes all test functions and provides a summary.
     """
     logger.info("=" * 60)
     logger.info("Starting ReasoningAgentRouter Unit Tests")
     logger.info("=" * 60)
-
+
     test_functions = [
         test_run_method_core_functionality,  # Most important test - run method execution
-        test_run_method_execution, # Comprehensive run method tests
-        test_run_method, # Basic run method structure tests
+        test_run_method_execution,  # Comprehensive run method tests
+        test_run_method,  # Basic run method structure tests
         test_router_initialization,
         test_reliability_check,
         test_agent_factories,
@@ -964,26 +1328,30 @@ def run_all_tests():
         test_batched_run_method,
         test_error_handling,
         test_output_types,
-        test_agent_configurations
+        test_agent_configurations,
     ]
-
+
     passed_tests = 0
     total_tests = len(test_functions)
-
+
     for test_func in test_functions:
         try:
             logger.info(f"\nRunning {test_func.__name__}...")
             test_func()
            passed_tests += 1
-            logger.success(f"✓ {test_func.__name__} completed successfully")
+            logger.success(
+                f"✓ {test_func.__name__} completed successfully"
+            )
         except Exception as e:
             logger.error(f"✗ {test_func.__name__} failed: {e}")
             raise
-
+
     logger.info("\n" + "=" * 60)
-    logger.info(f"Test Summary: {passed_tests}/{total_tests} tests passed")
+    logger.info(
+        f"Test Summary: {passed_tests}/{total_tests} tests passed"
+    )
     logger.info("=" * 60)
-
+
     if passed_tests == total_tests:
         logger.success("🎉 All tests passed successfully!")
         return True
@@ -995,60 +1363,70 @@ def run_all_tests():
 def run_core_tests_only():
     """
     Run only the core run method tests - the most important functionality.
-
+
     This function focuses specifically on testing the run(self, task) method
     which is the core functionality of ReasoningAgentRouter.
     """
     logger.info("=" * 60)
     logger.info("Running CORE RUN METHOD TESTS ONLY")
     logger.info("=" * 60)
-
+
     core_test_functions = [
         test_run_method_core_functionality,  # Most important test
-        test_run_method_execution, # Comprehensive run method tests
-        test_run_method, # Basic run method structure tests
+        test_run_method_execution,  # Comprehensive run method tests
+        test_run_method,  # Basic run method structure tests
     ]
-
+
     passed_tests = 0
     total_tests = len(core_test_functions)
-
+
     for test_func in core_test_functions:
         try:
             logger.info(f"\nRunning {test_func.__name__}...")
             result = test_func()
             if result is not False:  # Allow True or None
                 passed_tests += 1
-                logger.success(f"✓ {test_func.__name__} completed successfully")
+                logger.success(
+                    f"✓ {test_func.__name__} completed successfully"
+                )
             else:
                 logger.error(f"✗ {test_func.__name__} failed")
         except Exception as e:
             logger.error(f"✗ {test_func.__name__} failed: {e}")
-
+
     logger.info("\n" + "=" * 60)
-    logger.info(f"CORE TEST SUMMARY: {passed_tests}/{total_tests} tests passed")
+    logger.info(
+        f"CORE TEST SUMMARY: {passed_tests}/{total_tests} tests passed"
+    )
     logger.info("=" * 60)
-
+
     if passed_tests == total_tests:
-        logger.success("🎉 All core run method tests passed successfully!")
+        logger.success(
+            "🎉 All core run method tests passed successfully!"
+        )
         return True
     else:
-        logger.error(f"❌ {total_tests - passed_tests} core tests failed")
+        logger.error(
+            f"❌ {total_tests - passed_tests} core tests failed"
+        )
        return False


 if __name__ == "__main__":
     """
     Main execution block for running the unit tests.
-
+
     This block runs all tests when the script is executed directly.
     Use run_core_tests_only() for focused testing of the run method.
     """
     import sys
-
+
     try:
         success = run_all_tests()
         if success:
-            logger.info("All ReasoningAgentRouter unit tests completed successfully!")
+            logger.info(
+                "All ReasoningAgentRouter unit tests completed successfully!"
+            )
             sys.exit(0)
         else:
             logger.error("Some tests failed!")
diff --git a/tests/structs/test_swarm_router.py b/tests/structs/test_swarm_router.py
index 2df7687d..098ce3d5 100644
--- a/tests/structs/test_swarm_router.py
+++ b/tests/structs/test_swarm_router.py
@@ -127,7 +127,6 @@ def test_initialization_with_agent_rearrange_flow(sample_agents):
     assert router.rearrange_flow == flow


-
 def test_invalid_swarm_type():
     """Test error when invalid swarm type is provided."""
     with pytest.raises(ValueError):
@@ -639,7 +638,6 @@ def test_handle_rules(sample_agents):
     )


-
 def test_update_system_prompt_for_agent_in_swarm(sample_agents):
     """Test update_system_prompt_for_agent_in_swarm method."""
     router = SwarmRouter(
@@ -893,6 +891,5 @@ def test_swarm_router_config_model():
     assert config.multi_agent_collab_prompt is True


-
 if __name__ == "__main__":
     pytest.main([__file__, "-v"])