You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
swarms/tests/structs/test_agent_registry.py

998 lines
29 KiB

import os
try:
import pytest
except ImportError:
pytest = None
from loguru import logger
try:
    from swarms.structs.agent_registry import AgentRegistry
    from swarms.structs.agent import Agent
except (ImportError, ModuleNotFoundError) as e:
    # The package import failed; fall back to loading the two modules
    # directly from their source files, located relative to this test file.
    import importlib.util

    _current_dir = os.path.dirname(os.path.abspath(__file__))
    agent_registry_path = os.path.join(
        _current_dir,
        "..",
        "..",
        "swarms",
        "structs",
        "agent_registry.py",
    )
    agent_path = os.path.join(
        _current_dir, "..", "..", "swarms", "structs", "agent.py"
    )

    # Every test in this module needs BOTH classes. Fail right here with
    # an ImportError (chained to the original failure) if either source
    # file is missing, instead of leaving ``Agent`` or ``AgentRegistry``
    # unbound and surfacing a NameError later inside a test.
    if not (
        os.path.exists(agent_registry_path) and os.path.exists(agent_path)
    ):
        raise ImportError("Could not find required modules") from e

    # Load AgentRegistry from its source file.
    spec = importlib.util.spec_from_file_location(
        "agent_registry", agent_registry_path
    )
    agent_registry_module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(agent_registry_module)
    AgentRegistry = agent_registry_module.AgentRegistry

    # Load Agent from its source file.
    spec = importlib.util.spec_from_file_location("agent", agent_path)
    agent_module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(agent_module)
    Agent = agent_module.Agent
# Detach the handlers currently registered on the loguru logger, then
# register a sink that discards whatever it receives, so the logger calls
# made by the tests below produce no output.
logger.remove()
logger.add(lambda _record: None, level="ERROR")
def test_agent_registry_initialization():
    """Check the attributes of a default and of a customised AgentRegistry.

    A registry built without arguments must expose the default name and
    description and an empty ``agents`` dict; a registry built with explicit
    keyword arguments must expose exactly those values.
    """
    try:
        # Defaults.
        default_registry = AgentRegistry()
        assert default_registry is not None, "AgentRegistry should not be None"
        assert (
            default_registry.name == "Agent Registry"
        ), "Default name should be set"
        assert (
            default_registry.description == "A registry for managing agents."
        ), "Default description should be set"
        stored = default_registry.agents
        assert isinstance(stored, dict), "Agents should be a dictionary"
        assert len(stored) == 0, "Initial registry should be empty"

        # Explicit configuration.
        custom_options = {
            "name": "Test Registry",
            "description": "Test description",
            "return_json": False,
            "auto_save": True,
        }
        custom_registry = AgentRegistry(**custom_options)
        assert (
            custom_registry.name == "Test Registry"
        ), "Custom name should be set"
        assert (
            custom_registry.description == "Test description"
        ), "Custom description should be set"
        assert (
            custom_registry.return_json is False
        ), "return_json should be False"
        assert custom_registry.auto_save is True, "auto_save should be True"

        logger.info("✓ AgentRegistry initialization test passed")
    except Exception as err:
        # Log the failure, then let it propagate to the test runner.
        logger.error(
            f"Error in test_agent_registry_initialization: {str(err)}"
        )
        raise
def test_agent_registry_add_single_agent():
    """Register one agent and confirm it is stored under its agent name."""
    try:
        expected_name = "Test-Agent-1"
        registry = AgentRegistry()
        new_agent = Agent(
            agent_name=expected_name,
            agent_description="Test agent for registry",
            model_name="gpt-4o-mini",
            max_loops=1,
            verbose=False,
            print_on=False,
            streaming_on=True,
        )

        registry.add(new_agent)

        stored = registry.agents
        assert len(stored) == 1, "Registry should have one agent"
        assert expected_name in stored, "Agent should be in registry"
        entry = stored[expected_name]
        assert entry is not None, "Agent object should not be None"
        assert entry.agent_name == expected_name, "Agent name should match"

        logger.info("✓ Add single agent test passed")
    except Exception as err:
        # Log the failure, then let it propagate to the test runner.
        logger.error(
            f"Error in test_agent_registry_add_single_agent: {str(err)}"
        )
        raise
def test_agent_registry_add_multiple_agents():
    """Bulk-register three agents with add_many and confirm all are stored."""
    try:
        registry = AgentRegistry()

        # (agent name, agent description) for each agent to create.
        blueprints = [
            ("Test-Agent-1", "First test agent"),
            ("Test-Agent-2", "Second test agent"),
            ("Test-Agent-3", "Third test agent"),
        ]
        batch = [
            Agent(
                agent_name=name,
                agent_description=description,
                model_name="gpt-4o-mini",
                max_loops=1,
                verbose=False,
                print_on=False,
                streaming_on=True,
            )
            for name, description in blueprints
        ]

        registry.add_many(batch)

        assert len(registry.agents) == 3, "Registry should have three agents"
        for position, (name, _) in enumerate(blueprints, start=1):
            assert (
                name in registry.agents
            ), f"Agent {position} should be in registry"
        for name, _ in blueprints:
            assert (
                registry.agents[name] is not None
            ), f"{name} should not be None"

        logger.info("✓ Add multiple agents test passed")
    except Exception as err:
        # Log the failure, then let it propagate to the test runner.
        logger.error(
            f"Error in test_agent_registry_add_multiple_agents: {str(err)}"
        )
        raise
def test_agent_registry_get_agent():
    """Fetch a registered agent with get() and check it is the same object."""
    try:
        lookup_name = "Retrievable-Agent"
        registry = AgentRegistry()
        original = Agent(
            agent_name=lookup_name,
            agent_description="Agent for retrieval testing",
            model_name="gpt-4o-mini",
            max_loops=1,
            verbose=False,
            print_on=False,
            streaming_on=True,
        )
        registry.add(original)

        fetched = registry.get(lookup_name)

        assert fetched is not None, "Retrieved agent should not be None"
        assert fetched.agent_name == lookup_name, "Agent name should match"
        assert hasattr(fetched, "run"), "Agent should have run method"
        # Identity, not just equality: get() must hand back the stored object.
        assert fetched is original, "Should return the same agent object"

        logger.info("✓ Get agent test passed")
    except Exception as err:
        # Log the failure, then let it propagate to the test runner.
        logger.error(f"Error in test_agent_registry_get_agent: {str(err)}")
        raise
def test_agent_registry_delete_agent():
    """Delete one of two registered agents and confirm only it is removed."""
    try:
        doomed_name = "Agent-To-Delete"
        survivor_name = "Agent-To-Keep"
        common = {
            "model_name": "gpt-4o-mini",
            "max_loops": 1,
            "verbose": False,
            "print_on": False,
            "streaming_on": True,
        }

        registry = AgentRegistry()
        doomed = Agent(
            agent_name=doomed_name,
            agent_description="Agent that will be deleted",
            **common,
        )
        survivor = Agent(
            agent_name=survivor_name,
            agent_description="Agent that will remain",
            **common,
        )
        for member in (doomed, survivor):
            registry.add(member)
        assert len(registry.agents) == 2, "Registry should have two agents"

        registry.delete(doomed_name)

        assert (
            len(registry.agents) == 1
        ), "Registry should have one agent after deletion"
        assert (
            doomed_name not in registry.agents
        ), "Deleted agent should not be in registry"
        assert (
            survivor_name in registry.agents
        ), "Other agent should still be in registry"

        logger.info("✓ Delete agent test passed")
    except Exception as err:
        # Log the failure, then let it propagate to the test runner.
        logger.error(
            f"Error in test_agent_registry_delete_agent: {str(err)}"
        )
        raise
def test_agent_registry_update_agent():
    """Replace a registered agent via update_agent and read it back.

    The replacement shares the original's name but has ``max_loops=2`` and
    ``verbose=True``; get() must afterwards return the replacement object.
    """
    try:
        target_name = "Agent-To-Update"
        registry = AgentRegistry()
        registry.add(
            Agent(
                agent_name=target_name,
                agent_description="Original description",
                model_name="gpt-4o-mini",
                max_loops=1,
                verbose=False,
                print_on=False,
                streaming_on=True,
            )
        )

        replacement = Agent(
            agent_name=target_name,
            agent_description="Updated description",
            model_name="gpt-4o-mini",
            max_loops=2,
            verbose=True,
            print_on=False,
            streaming_on=True,
        )
        registry.update_agent(target_name, replacement)

        current = registry.get(target_name)
        assert current is not None, "Updated agent should not be None"
        assert current is replacement, "Should return the updated agent"
        assert current.max_loops == 2, "Max loops should be updated"
        assert current.verbose is True, "Verbose should be updated"

        logger.info("✓ Update agent test passed")
    except Exception as err:
        # Log the failure, then let it propagate to the test runner.
        logger.error(
            f"Error in test_agent_registry_update_agent: {str(err)}"
        )
        raise
def test_agent_registry_list_agents():
    """Confirm list_agents() returns a list holding both registered names."""
    try:
        registry = AgentRegistry()
        for name, description in (
            ("List-Agent-1", "First agent"),
            ("List-Agent-2", "Second agent"),
        ):
            registry.add(
                Agent(
                    agent_name=name,
                    agent_description=description,
                    model_name="gpt-4o-mini",
                    max_loops=1,
                    verbose=False,
                    print_on=False,
                    streaming_on=True,
                )
            )

        listed = registry.list_agents()

        assert listed is not None, "Agent names list should not be None"
        assert isinstance(listed, list), "Should return a list"
        assert len(listed) == 2, "Should have two agent names"
        assert "List-Agent-1" in listed, "First agent name should be in list"
        assert (
            "List-Agent-2" in listed
        ), "Second agent name should be in list"

        logger.info("✓ List agents test passed")
    except Exception as err:
        # Log the failure, then let it propagate to the test runner.
        logger.error(f"Error in test_agent_registry_list_agents: {str(err)}")
        raise
def test_agent_registry_return_all_agents():
    """Confirm return_all_agents() yields a list of both registered agents.

    Each returned item must be non-None and expose ``agent_name`` and ``run``.
    """
    try:
        common = {
            "model_name": "gpt-4o-mini",
            "max_loops": 1,
            "verbose": False,
            "print_on": False,
            "streaming_on": True,
        }
        registry = AgentRegistry()
        first = Agent(
            agent_name="Return-Agent-1",
            agent_description="First agent",
            **common,
        )
        second = Agent(
            agent_name="Return-Agent-2",
            agent_description="Second agent",
            **common,
        )
        registry.add(first)
        registry.add(second)

        returned = registry.return_all_agents()

        assert returned is not None, "All agents list should not be None"
        assert isinstance(returned, list), "Should return a list"
        assert len(returned) == 2, "Should have two agents"
        for item in returned:
            assert item is not None, "Each agent should not be None"
            assert hasattr(item, "agent_name"), "Agent should have agent_name"
            assert hasattr(item, "run"), "Agent should have run method"

        logger.info("✓ Return all agents test passed")
    except Exception as err:
        # Log the failure, then let it propagate to the test runner.
        logger.error(
            f"Error in test_agent_registry_return_all_agents: {str(err)}"
        )
        raise
def test_agent_registry_query_with_condition():
    """Filter three agents with query() using a ``max_loops == 1`` predicate.

    Two of the three registered agents have ``max_loops=1``, so exactly two
    must be returned.
    """
    try:
        registry = AgentRegistry()

        # (agent name, agent description, max_loops) for each agent.
        blueprints = (
            ("Query-Agent-1", "Agent with max_loops=1", 1),
            ("Query-Agent-2", "Agent with max_loops=2", 2),
            ("Query-Agent-3", "Agent with max_loops=1", 1),
        )
        for name, description, loops in blueprints:
            registry.add(
                Agent(
                    agent_name=name,
                    agent_description=description,
                    model_name="gpt-4o-mini",
                    max_loops=loops,
                    verbose=False,
                    print_on=False,
                    streaming_on=True,
                )
            )

        matches = registry.query(lambda candidate: candidate.max_loops == 1)

        assert matches is not None, "Filtered agents should not be None"
        assert isinstance(matches, list), "Should return a list"
        assert len(matches) == 2, "Should have two agents with max_loops=1"
        for match in matches:
            assert (
                match.max_loops == 1
            ), "All filtered agents should have max_loops=1"

        logger.info("✓ Query with condition test passed")
    except Exception as err:
        # Log the failure, then let it propagate to the test runner.
        logger.error(
            f"Error in test_agent_registry_query_with_condition: {str(err)}"
        )
        raise
def test_agent_registry_query_without_condition():
    """Call query() with no predicate and expect both registered agents."""
    try:
        registry = AgentRegistry()
        for name, description in (
            ("Query-All-Agent-1", "First agent"),
            ("Query-All-Agent-2", "Second agent"),
        ):
            registry.add(
                Agent(
                    agent_name=name,
                    agent_description=description,
                    model_name="gpt-4o-mini",
                    max_loops=1,
                    verbose=False,
                    print_on=False,
                    streaming_on=True,
                )
            )

        everything = registry.query()

        assert everything is not None, "All agents should not be None"
        assert isinstance(everything, list), "Should return a list"
        assert len(everything) == 2, "Should return all agents"

        logger.info("✓ Query without condition test passed")
    except Exception as err:
        # Log the failure, then let it propagate to the test runner.
        logger.error(
            f"Error in test_agent_registry_query_without_condition: {str(err)}"
        )
        raise
def test_agent_registry_find_agent_by_name():
    """Look a registered agent up with find_agent_by_name()."""
    try:
        wanted_name = "Findable-Agent"
        registry = AgentRegistry()
        registry.add(
            Agent(
                agent_name=wanted_name,
                agent_description="Agent to find",
                model_name="gpt-4o-mini",
                max_loops=1,
                verbose=False,
                print_on=False,
                streaming_on=True,
            )
        )

        match = registry.find_agent_by_name(wanted_name)

        assert match is not None, "Found agent should not be None"
        assert match.agent_name == wanted_name, "Agent name should match"
        assert hasattr(match, "run"), "Agent should have run method"

        logger.info("✓ Find agent by name test passed")
    except Exception as err:
        # Log the failure, then let it propagate to the test runner.
        logger.error(
            f"Error in test_agent_registry_find_agent_by_name: {str(err)}"
        )
        raise
def test_agent_registry_find_agent_by_id():
    """Look an agent up through find_agent_by_id() and verify what comes back.

    NOTE(review): the key passed to ``find_agent_by_id`` below is the agent's
    ``agent_name``, not ``agent.id`` — presumably because the registry
    indexes agents by name. Confirm against the registry implementation.
    """
    try:
        registry = AgentRegistry()
        agent = Agent(
            agent_name="ID-Agent",
            agent_description="Agent with ID",
            model_name="gpt-4o-mini",
            max_loops=1,
            verbose=False,
            print_on=False,
            streaming_on=True,
        )
        registry.add(agent)
        agent_id = agent.id

        found_agent = registry.find_agent_by_id(agent.agent_name)

        assert found_agent is not None, "Found agent should not be None"
        assert (
            found_agent.agent_name == "ID-Agent"
        ), "Agent name should match"
        # Compare identifiers so the test actually checks the ID of the
        # agent that was returned, not only its name.
        assert found_agent.id == agent_id, "Agent ID should match"

        logger.info("✓ Find agent by ID test passed")
    except Exception as e:
        # Log the failure, then let it propagate to the test runner.
        logger.error(
            f"Error in test_agent_registry_find_agent_by_id: {str(e)}"
        )
        raise
def test_agent_registry_agents_to_json():
    """Serialise two agents with agents_to_json() and parse the result.

    The output must be a non-empty string mentioning both agent names, and
    must decode to a JSON object with two entries.
    """
    import json

    try:
        registry = AgentRegistry()
        for name, description in (
            ("JSON-Agent-1", "First agent for JSON"),
            ("JSON-Agent-2", "Second agent for JSON"),
        ):
            registry.add(
                Agent(
                    agent_name=name,
                    agent_description=description,
                    model_name="gpt-4o-mini",
                    max_loops=1,
                    verbose=False,
                    print_on=False,
                    streaming_on=True,
                )
            )

        serialized = registry.agents_to_json()

        # Raw text checks.
        assert serialized is not None, "JSON output should not be None"
        assert isinstance(serialized, str), "Should return a string"
        assert len(serialized) > 0, "JSON should not be empty"
        assert "JSON-Agent-1" in serialized, "First agent should be in JSON"
        assert "JSON-Agent-2" in serialized, "Second agent should be in JSON"

        # Structural checks on the decoded document.
        decoded = json.loads(serialized)
        assert isinstance(decoded, dict), "Should be valid JSON dict"
        assert len(decoded) == 2, "Should have two agents in JSON"

        logger.info("✓ Agents to JSON test passed")
    except Exception as err:
        # Log the failure, then let it propagate to the test runner.
        logger.error(
            f"Error in test_agent_registry_agents_to_json: {str(err)}"
        )
        raise
def test_agent_registry_initialization_with_agents():
    """Build a registry from an ``agents=[...]`` list and check its contents."""
    try:
        common = {
            "model_name": "gpt-4o-mini",
            "max_loops": 1,
            "verbose": False,
            "print_on": False,
            "streaming_on": True,
        }
        first = Agent(
            agent_name="Init-Agent-1",
            agent_description="First initial agent",
            **common,
        )
        second = Agent(
            agent_name="Init-Agent-2",
            agent_description="Second initial agent",
            **common,
        )

        registry = AgentRegistry(agents=[first, second])

        assert registry is not None, "Registry should not be None"
        stored = registry.agents
        assert len(stored) == 2, "Registry should have two agents"
        assert "Init-Agent-1" in stored, "First agent should be in registry"
        assert "Init-Agent-2" in stored, "Second agent should be in registry"

        logger.info("✓ Initialize with agents test passed")
    except Exception as err:
        # Log the failure, then let it propagate to the test runner.
        logger.error(
            f"Error in test_agent_registry_initialization_with_agents: {str(err)}"
        )
        raise
def test_agent_registry_error_duplicate_agent():
    """Adding a second agent with an already-registered name must fail.

    The second ``add`` is expected to raise ``ValueError`` whose message
    contains "already exists" (case-insensitive), and the registry must
    still hold exactly one agent afterwards.
    """
    try:
        registry = AgentRegistry()
        agent1 = Agent(
            agent_name="Duplicate-Agent",
            agent_description="First agent",
            model_name="gpt-4o-mini",
            max_loops=1,
            verbose=False,
            print_on=False,
            streaming_on=True,
        )
        agent2 = Agent(
            agent_name="Duplicate-Agent",
            agent_description="Duplicate agent",
            model_name="gpt-4o-mini",
            max_loops=1,
            verbose=False,
            print_on=False,
            streaming_on=True,
        )
        registry.add(agent1)

        try:
            registry.add(agent2)
        except ValueError as duplicate_error:
            assert (
                "already exists" in str(duplicate_error).lower()
            ), "Error message should mention duplicate"
        else:
            # Raise explicitly instead of ``assert False``: assert statements
            # are removed under ``python -O``, which would let a missing
            # exception go unnoticed.
            raise AssertionError(
                "Should have raised ValueError for duplicate agent"
            )

        assert (
            len(registry.agents) == 1
        ), "Registry should still have only one agent"

        logger.info("✓ Error handling for duplicate agent test passed")
    except Exception as e:
        # Log the failure, then let it propagate to the test runner.
        logger.error(
            f"Error in test_agent_registry_error_duplicate_agent: {str(e)}"
        )
        raise
def test_agent_registry_error_nonexistent_agent():
    """get() and delete() must raise KeyError for an unregistered name."""
    try:
        registry = AgentRegistry()

        # Same expectation for both operations, checked in the original
        # order: get first, then delete.
        for operation in (registry.get, registry.delete):
            try:
                operation("Nonexistent-Agent")
            except KeyError:
                # Expected outcome; move on to the next operation.
                continue
            else:
                # Raise explicitly instead of ``assert False``: assert
                # statements are removed under ``python -O``, which would
                # let a missing exception go unnoticed.
                raise AssertionError(
                    "Should have raised KeyError for nonexistent agent"
                )

        logger.info("✓ Error handling for nonexistent agent test passed")
    except Exception as e:
        # Log the failure, then let it propagate to the test runner.
        logger.error(
            f"Error in test_agent_registry_error_nonexistent_agent: {str(e)}"
        )
        raise
def test_agent_registry_retrieved_agents_can_run():
    """Run a task on an agent fetched from the registry.

    The result of ``run`` must be a non-empty string.

    NOTE(review): this calls ``Agent.run`` with ``model_name="gpt-4o-mini"``,
    so it presumably needs a reachable model backend and credentials —
    confirm before running in an offline environment.
    """
    try:
        runner_name = "Runnable-Registry-Agent"
        registry = AgentRegistry()
        registry.add(
            Agent(
                agent_name=runner_name,
                agent_description="Agent for running tasks",
                model_name="gpt-4o-mini",
                max_loops=1,
                verbose=False,
                print_on=False,
                streaming_on=True,
            )
        )

        fetched = registry.get(runner_name)
        assert fetched is not None, "Retrieved agent should not be None"

        answer = fetched.run("What is 2 + 2? Answer briefly.")
        assert answer is not None, "Agent run result should not be None"
        assert isinstance(answer, str), "Result should be a string"
        assert len(answer) > 0, "Result should not be empty"

        logger.info("✓ Retrieved agents can run test passed")
    except Exception as err:
        # Log the failure, then let it propagate to the test runner.
        logger.error(
            f"Error in test_agent_registry_retrieved_agents_can_run: {str(err)}"
        )
        raise
def test_agent_registry_thread_safety():
    """Check list_agents() and return_all_agents() after two adds.

    NOTE(review): despite its name, this test performs every registry
    operation sequentially on the calling thread; no threads are started
    here, so concurrent access is not actually exercised.
    """
    try:
        common = {
            "model_name": "gpt-4o-mini",
            "max_loops": 1,
            "verbose": False,
            "print_on": False,
            "streaming_on": True,
        }
        registry = AgentRegistry()
        first = Agent(
            agent_name="Thread-Agent-1",
            agent_description="First thread agent",
            **common,
        )
        second = Agent(
            agent_name="Thread-Agent-2",
            agent_description="Second thread agent",
            **common,
        )
        registry.add(first)
        registry.add(second)

        names = registry.list_agents()
        members = registry.return_all_agents()

        assert names is not None, "Agent names should not be None"
        assert members is not None, "All agents should not be None"
        assert len(names) == 2, "Should have two agent names"
        assert len(members) == 2, "Should have two agents"

        logger.info("✓ Thread safety test passed")
    except Exception as err:
        # Log the failure, then let it propagate to the test runner.
        logger.error(
            f"Error in test_agent_registry_thread_safety: {str(err)}"
        )
        raise
if __name__ == "__main__":
    # Stand-alone runner: execute the tests named on the command line (or
    # every test when none are named), print a pass/fail summary, and exit
    # with status 1 if any test failed so shells and CI can detect it.
    import sys
    import traceback

    # Map each test function's name to the function itself so tests can be
    # selected by name from the command line.
    test_dict = {
        test_func.__name__: test_func
        for test_func in (
            test_agent_registry_initialization,
            test_agent_registry_add_single_agent,
            test_agent_registry_add_multiple_agents,
            test_agent_registry_get_agent,
            test_agent_registry_delete_agent,
            test_agent_registry_update_agent,
            test_agent_registry_list_agents,
            test_agent_registry_return_all_agents,
            test_agent_registry_query_with_condition,
            test_agent_registry_query_without_condition,
            test_agent_registry_find_agent_by_name,
            test_agent_registry_find_agent_by_id,
            test_agent_registry_agents_to_json,
            test_agent_registry_initialization_with_agents,
            test_agent_registry_error_duplicate_agent,
            test_agent_registry_error_nonexistent_agent,
            test_agent_registry_retrieved_agents_can_run,
            test_agent_registry_thread_safety,
        )
    }

    if len(sys.argv) > 1:
        requested_tests = []
        for test_name in sys.argv[1:]:
            if test_name in test_dict:
                requested_tests.append(test_dict[test_name])
            elif test_name == "all" or test_name == "--all":
                # "all" overrides any individually named tests.
                requested_tests = list(test_dict.values())
                break
            else:
                print(f"⚠ Warning: Test '{test_name}' not found.")
                print(f"Available tests: {', '.join(test_dict.keys())}")
                sys.exit(1)
        tests_to_run = requested_tests
    else:
        tests_to_run = list(test_dict.values())

    if len(tests_to_run) == 1:
        print(f"Running: {tests_to_run[0].__name__}")
    else:
        print(f"Running {len(tests_to_run)} test(s)...")

    passed = 0
    failed = 0
    for test_func in tests_to_run:
        try:
            print(f"\n{'='*60}")
            print(f"Running: {test_func.__name__}")
            print(f"{'='*60}")
            test_func()
            print(f"✓ PASSED: {test_func.__name__}")
            passed += 1
        except Exception as e:
            # Keep going after a failure so the summary covers every test.
            print(f"✗ FAILED: {test_func.__name__}")
            print(f"  Error: {str(e)}")
            traceback.print_exc()
            failed += 1

    print(f"\n{'='*60}")
    print(f"Test Summary: {passed} passed, {failed} failed")
    print(f"{'='*60}")

    if len(sys.argv) == 1:
        print("\n💡 Tip: Run a specific test with:")
        print(
            "   python test_agent_registry.py test_agent_registry_initialization"
        )
        print("\n   Or use pytest:")
        print("   pytest test_agent_registry.py")
        print(
            "   pytest test_agent_registry.py::test_agent_registry_initialization"
        )

    # Report failure through the process exit status; without this the
    # script would exit 0 even when tests failed.
    if failed:
        sys.exit(1)