import os try: import pytest except ImportError: pytest = None from loguru import logger try: from swarms.structs.agent_registry import AgentRegistry from swarms.structs.agent import Agent except (ImportError, ModuleNotFoundError) as e: import importlib.util _current_dir = os.path.dirname(os.path.abspath(__file__)) agent_registry_path = os.path.join(_current_dir, "..", "..", "swarms", "structs", "agent_registry.py") agent_path = os.path.join(_current_dir, "..", "..", "swarms", "structs", "agent.py") if os.path.exists(agent_registry_path): spec = importlib.util.spec_from_file_location("agent_registry", agent_registry_path) agent_registry_module = importlib.util.module_from_spec(spec) spec.loader.exec_module(agent_registry_module) AgentRegistry = agent_registry_module.AgentRegistry if os.path.exists(agent_path): spec = importlib.util.spec_from_file_location("agent", agent_path) agent_module = importlib.util.module_from_spec(spec) spec.loader.exec_module(agent_module) Agent = agent_module.Agent else: raise ImportError(f"Could not find required modules") from e logger.remove() logger.add(lambda msg: None, level="ERROR") def test_agent_registry_initialization(): """Test AgentRegistry initialization.""" try: registry = AgentRegistry() assert registry is not None, "AgentRegistry should not be None" assert registry.name == "Agent Registry", "Default name should be set" assert registry.description == "A registry for managing agents.", "Default description should be set" assert isinstance(registry.agents, dict), "Agents should be a dictionary" assert len(registry.agents) == 0, "Initial registry should be empty" registry2 = AgentRegistry( name="Test Registry", description="Test description", return_json=False, auto_save=True ) assert registry2.name == "Test Registry", "Custom name should be set" assert registry2.description == "Test description", "Custom description should be set" assert registry2.return_json is False, "return_json should be False" assert registry2.auto_save is True, "auto_save should be True" logger.info("✓ AgentRegistry initialization test passed") except Exception as e: logger.error(f"Error in test_agent_registry_initialization: {str(e)}") raise def test_agent_registry_add_single_agent(): """Test adding a single agent to the registry.""" try: registry = AgentRegistry() agent = Agent( agent_name="Test-Agent-1", agent_description="Test agent for registry", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) registry.add(agent) assert len(registry.agents) == 1, "Registry should have one agent" assert "Test-Agent-1" in registry.agents, "Agent should be in registry" assert registry.agents["Test-Agent-1"] is not None, "Agent object should not be None" assert registry.agents["Test-Agent-1"].agent_name == "Test-Agent-1", "Agent name should match" logger.info("✓ Add single agent test passed") except Exception as e: logger.error(f"Error in test_agent_registry_add_single_agent: {str(e)}") raise def test_agent_registry_add_multiple_agents(): """Test adding multiple agents to the registry.""" try: registry = AgentRegistry() agent1 = Agent( agent_name="Test-Agent-1", agent_description="First test agent", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) agent2 = Agent( agent_name="Test-Agent-2", agent_description="Second test agent", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) agent3 = Agent( agent_name="Test-Agent-3", agent_description="Third test agent", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) registry.add_many([agent1, agent2, agent3]) assert len(registry.agents) == 3, "Registry should have three agents" assert "Test-Agent-1" in registry.agents, "Agent 1 should be in registry" assert "Test-Agent-2" in registry.agents, "Agent 2 should be in registry" assert "Test-Agent-3" in registry.agents, "Agent 3 should be in registry" for agent_name in ["Test-Agent-1", "Test-Agent-2", "Test-Agent-3"]: assert registry.agents[agent_name] is not None, f"{agent_name} should not be None" logger.info("✓ Add multiple agents test passed") except Exception as e: logger.error(f"Error in test_agent_registry_add_multiple_agents: {str(e)}") raise def test_agent_registry_get_agent(): """Test retrieving an agent from the registry.""" try: registry = AgentRegistry() agent = Agent( agent_name="Retrievable-Agent", agent_description="Agent for retrieval testing", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) registry.add(agent) retrieved_agent = registry.get("Retrievable-Agent") assert retrieved_agent is not None, "Retrieved agent should not be None" assert retrieved_agent.agent_name == "Retrievable-Agent", "Agent name should match" assert hasattr(retrieved_agent, "run"), "Agent should have run method" assert retrieved_agent is agent, "Should return the same agent object" logger.info("✓ Get agent test passed") except Exception as e: logger.error(f"Error in test_agent_registry_get_agent: {str(e)}") raise def test_agent_registry_delete_agent(): """Test deleting an agent from the registry.""" try: registry = AgentRegistry() agent1 = Agent( agent_name="Agent-To-Delete", agent_description="Agent that will be deleted", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) agent2 = Agent( agent_name="Agent-To-Keep", agent_description="Agent that will remain", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) registry.add(agent1) registry.add(agent2) assert len(registry.agents) == 2, "Registry should have two agents" registry.delete("Agent-To-Delete") assert len(registry.agents) == 1, "Registry should have one agent after deletion" assert "Agent-To-Delete" not in registry.agents, "Deleted agent should not be in registry" assert "Agent-To-Keep" in registry.agents, "Other agent should still be in registry" logger.info("✓ Delete agent test passed") except Exception as e: logger.error(f"Error in test_agent_registry_delete_agent: {str(e)}") raise def test_agent_registry_update_agent(): """Test updating an agent in the registry.""" try: registry = AgentRegistry() original_agent = Agent( agent_name="Agent-To-Update", agent_description="Original description", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) registry.add(original_agent) updated_agent = Agent( agent_name="Agent-To-Update", agent_description="Updated description", model_name="gpt-4o-mini", max_loops=2, verbose=True, print_on=False, streaming_on=True, ) registry.update_agent("Agent-To-Update", updated_agent) retrieved_agent = registry.get("Agent-To-Update") assert retrieved_agent is not None, "Updated agent should not be None" assert retrieved_agent is updated_agent, "Should return the updated agent" assert retrieved_agent.max_loops == 2, "Max loops should be updated" assert retrieved_agent.verbose is True, "Verbose should be updated" logger.info("✓ Update agent test passed") except Exception as e: logger.error(f"Error in test_agent_registry_update_agent: {str(e)}") raise def test_agent_registry_list_agents(): """Test listing all agent names in the registry.""" try: registry = AgentRegistry() agent1 = Agent( agent_name="List-Agent-1", agent_description="First agent", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) agent2 = Agent( agent_name="List-Agent-2", agent_description="Second agent", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) registry.add(agent1) registry.add(agent2) agent_names = registry.list_agents() assert agent_names is not None, "Agent names list should not be None" assert isinstance(agent_names, list), "Should return a list" assert len(agent_names) == 2, "Should have two agent names" assert "List-Agent-1" in agent_names, "First agent name should be in list" assert "List-Agent-2" in agent_names, "Second agent name should be in list" logger.info("✓ List agents test passed") except Exception as e: logger.error(f"Error in test_agent_registry_list_agents: {str(e)}") raise def test_agent_registry_return_all_agents(): """Test returning all agents from the registry.""" try: registry = AgentRegistry() agent1 = Agent( agent_name="Return-Agent-1", agent_description="First agent", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) agent2 = Agent( agent_name="Return-Agent-2", agent_description="Second agent", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) registry.add(agent1) registry.add(agent2) all_agents = registry.return_all_agents() assert all_agents is not None, "All agents list should not be None" assert isinstance(all_agents, list), "Should return a list" assert len(all_agents) == 2, "Should have two agents" for agent in all_agents: assert agent is not None, "Each agent should not be None" assert hasattr(agent, "agent_name"), "Agent should have agent_name" assert hasattr(agent, "run"), "Agent should have run method" logger.info("✓ Return all agents test passed") except Exception as e: logger.error(f"Error in test_agent_registry_return_all_agents: {str(e)}") raise def test_agent_registry_query_with_condition(): """Test querying agents with a condition.""" try: registry = AgentRegistry() agent1 = Agent( agent_name="Query-Agent-1", agent_description="Agent with max_loops=1", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) agent2 = Agent( agent_name="Query-Agent-2", agent_description="Agent with max_loops=2", model_name="gpt-4o-mini", max_loops=2, verbose=False, print_on=False, streaming_on=True, ) agent3 = Agent( agent_name="Query-Agent-3", agent_description="Agent with max_loops=1", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) registry.add(agent1) registry.add(agent2) registry.add(agent3) def condition_max_loops_1(agent): return agent.max_loops == 1 filtered_agents = registry.query(condition_max_loops_1) assert filtered_agents is not None, "Filtered agents should not be None" assert isinstance(filtered_agents, list), "Should return a list" assert len(filtered_agents) == 2, "Should have two agents with max_loops=1" for agent in filtered_agents: assert agent.max_loops == 1, "All filtered agents should have max_loops=1" logger.info("✓ Query with condition test passed") except Exception as e: logger.error(f"Error in test_agent_registry_query_with_condition: {str(e)}") raise def test_agent_registry_query_without_condition(): """Test querying all agents without a condition.""" try: registry = AgentRegistry() agent1 = Agent( agent_name="Query-All-Agent-1", agent_description="First agent", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) agent2 = Agent( agent_name="Query-All-Agent-2", agent_description="Second agent", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) registry.add(agent1) registry.add(agent2) all_agents = registry.query() assert all_agents is not None, "All agents should not be None" assert isinstance(all_agents, list), "Should return a list" assert len(all_agents) == 2, "Should return all agents" logger.info("✓ Query without condition test passed") except Exception as e: logger.error(f"Error in test_agent_registry_query_without_condition: {str(e)}") raise def test_agent_registry_find_agent_by_name(): """Test finding an agent by name.""" try: registry = AgentRegistry() agent = Agent( agent_name="Findable-Agent", agent_description="Agent to find", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) registry.add(agent) found_agent = registry.find_agent_by_name("Findable-Agent") assert found_agent is not None, "Found agent should not be None" assert found_agent.agent_name == "Findable-Agent", "Agent name should match" assert hasattr(found_agent, "run"), "Agent should have run method" logger.info("✓ Find agent by name test passed") except Exception as e: logger.error(f"Error in test_agent_registry_find_agent_by_name: {str(e)}") raise def test_agent_registry_find_agent_by_id(): """Test finding an agent by ID.""" try: registry = AgentRegistry() agent = Agent( agent_name="ID-Agent", agent_description="Agent with ID", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) registry.add(agent) agent_id = agent.id found_agent = registry.find_agent_by_id(agent.agent_name) assert found_agent is not None, "Found agent should not be None" assert found_agent.agent_name == "ID-Agent", "Agent name should match" logger.info("✓ Find agent by ID test passed") except Exception as e: logger.error(f"Error in test_agent_registry_find_agent_by_id: {str(e)}") raise def test_agent_registry_agents_to_json(): """Test converting agents to JSON.""" try: registry = AgentRegistry() agent1 = Agent( agent_name="JSON-Agent-1", agent_description="First agent for JSON", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) agent2 = Agent( agent_name="JSON-Agent-2", agent_description="Second agent for JSON", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) registry.add(agent1) registry.add(agent2) json_output = registry.agents_to_json() assert json_output is not None, "JSON output should not be None" assert isinstance(json_output, str), "Should return a string" assert len(json_output) > 0, "JSON should not be empty" assert "JSON-Agent-1" in json_output, "First agent should be in JSON" assert "JSON-Agent-2" in json_output, "Second agent should be in JSON" import json parsed_json = json.loads(json_output) assert isinstance(parsed_json, dict), "Should be valid JSON dict" assert len(parsed_json) == 2, "Should have two agents in JSON" logger.info("✓ Agents to JSON test passed") except Exception as e: logger.error(f"Error in test_agent_registry_agents_to_json: {str(e)}") raise def test_agent_registry_initialization_with_agents(): """Test initializing registry with agents.""" try: agent1 = Agent( agent_name="Init-Agent-1", agent_description="First initial agent", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) agent2 = Agent( agent_name="Init-Agent-2", agent_description="Second initial agent", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) registry = AgentRegistry(agents=[agent1, agent2]) assert registry is not None, "Registry should not be None" assert len(registry.agents) == 2, "Registry should have two agents" assert "Init-Agent-1" in registry.agents, "First agent should be in registry" assert "Init-Agent-2" in registry.agents, "Second agent should be in registry" logger.info("✓ Initialize with agents test passed") except Exception as e: logger.error(f"Error in test_agent_registry_initialization_with_agents: {str(e)}") raise def test_agent_registry_error_duplicate_agent(): """Test error handling for duplicate agent names.""" try: registry = AgentRegistry() agent1 = Agent( agent_name="Duplicate-Agent", agent_description="First agent", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) agent2 = Agent( agent_name="Duplicate-Agent", agent_description="Duplicate agent", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) registry.add(agent1) try: registry.add(agent2) assert False, "Should have raised ValueError for duplicate agent" except ValueError as e: assert "already exists" in str(e).lower(), "Error message should mention duplicate" assert len(registry.agents) == 1, "Registry should still have only one agent" logger.info("✓ Error handling for duplicate agent test passed") except Exception as e: logger.error(f"Error in test_agent_registry_error_duplicate_agent: {str(e)}") raise def test_agent_registry_error_nonexistent_agent(): """Test error handling for nonexistent agent.""" try: registry = AgentRegistry() try: registry.get("Nonexistent-Agent") assert False, "Should have raised KeyError for nonexistent agent" except KeyError as e: assert e is not None, "Should raise KeyError" try: registry.delete("Nonexistent-Agent") assert False, "Should have raised KeyError for nonexistent agent" except KeyError as e: assert e is not None, "Should raise KeyError" logger.info("✓ Error handling for nonexistent agent test passed") except Exception as e: logger.error(f"Error in test_agent_registry_error_nonexistent_agent: {str(e)}") raise def test_agent_registry_retrieved_agents_can_run(): """Test that retrieved agents can actually run tasks.""" try: registry = AgentRegistry() agent = Agent( agent_name="Runnable-Registry-Agent", agent_description="Agent for running tasks", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) registry.add(agent) retrieved_agent = registry.get("Runnable-Registry-Agent") assert retrieved_agent is not None, "Retrieved agent should not be None" result = retrieved_agent.run("What is 2 + 2? Answer briefly.") assert result is not None, "Agent run result should not be None" assert isinstance(result, str), "Result should be a string" assert len(result) > 0, "Result should not be empty" logger.info("✓ Retrieved agents can run test passed") except Exception as e: logger.error(f"Error in test_agent_registry_retrieved_agents_can_run: {str(e)}") raise def test_agent_registry_thread_safety(): """Test thread safety of registry operations.""" try: registry = AgentRegistry() agent1 = Agent( agent_name="Thread-Agent-1", agent_description="First thread agent", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) agent2 = Agent( agent_name="Thread-Agent-2", agent_description="Second thread agent", model_name="gpt-4o-mini", max_loops=1, verbose=False, print_on=False, streaming_on=True, ) registry.add(agent1) registry.add(agent2) agent_names = registry.list_agents() all_agents = registry.return_all_agents() assert agent_names is not None, "Agent names should not be None" assert all_agents is not None, "All agents should not be None" assert len(agent_names) == 2, "Should have two agent names" assert len(all_agents) == 2, "Should have two agents" logger.info("✓ Thread safety test passed") except Exception as e: logger.error(f"Error in test_agent_registry_thread_safety: {str(e)}") raise if __name__ == "__main__": import sys test_dict = { "test_agent_registry_initialization": test_agent_registry_initialization, "test_agent_registry_add_single_agent": test_agent_registry_add_single_agent, "test_agent_registry_add_multiple_agents": test_agent_registry_add_multiple_agents, "test_agent_registry_get_agent": test_agent_registry_get_agent, "test_agent_registry_delete_agent": test_agent_registry_delete_agent, "test_agent_registry_update_agent": test_agent_registry_update_agent, "test_agent_registry_list_agents": test_agent_registry_list_agents, "test_agent_registry_return_all_agents": test_agent_registry_return_all_agents, "test_agent_registry_query_with_condition": test_agent_registry_query_with_condition, "test_agent_registry_query_without_condition": test_agent_registry_query_without_condition, "test_agent_registry_find_agent_by_name": test_agent_registry_find_agent_by_name, "test_agent_registry_find_agent_by_id": test_agent_registry_find_agent_by_id, "test_agent_registry_agents_to_json": test_agent_registry_agents_to_json, "test_agent_registry_initialization_with_agents": test_agent_registry_initialization_with_agents, "test_agent_registry_error_duplicate_agent": test_agent_registry_error_duplicate_agent, "test_agent_registry_error_nonexistent_agent": test_agent_registry_error_nonexistent_agent, "test_agent_registry_retrieved_agents_can_run": test_agent_registry_retrieved_agents_can_run, "test_agent_registry_thread_safety": test_agent_registry_thread_safety, } if len(sys.argv) > 1: requested_tests = [] for test_name in sys.argv[1:]: if test_name in test_dict: requested_tests.append(test_dict[test_name]) elif test_name == "all" or test_name == "--all": requested_tests = list(test_dict.values()) break else: print(f"⚠ Warning: Test '{test_name}' not found.") print(f"Available tests: {', '.join(test_dict.keys())}") sys.exit(1) tests_to_run = requested_tests else: tests_to_run = list(test_dict.values()) if len(tests_to_run) == 1: print(f"Running: {tests_to_run[0].__name__}") else: print(f"Running {len(tests_to_run)} test(s)...") passed = 0 failed = 0 for test_func in tests_to_run: try: print(f"\n{'='*60}") print(f"Running: {test_func.__name__}") print(f"{'='*60}") test_func() print(f"✓ PASSED: {test_func.__name__}") passed += 1 except Exception as e: print(f"✗ FAILED: {test_func.__name__}") print(f" Error: {str(e)}") import traceback traceback.print_exc() failed += 1 print(f"\n{'='*60}") print(f"Test Summary: {passed} passed, {failed} failed") print(f"{'='*60}") if len(sys.argv) == 1: print("\n💡 Tip: Run a specific test with:") print(" python test_agent_registry.py test_agent_registry_initialization") print("\n Or use pytest:") print(" pytest test_agent_registry.py") print(" pytest test_agent_registry.py::test_agent_registry_initialization")