You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
998 lines
29 KiB
998 lines
29 KiB
import os
|
|
|
|
try:
|
|
import pytest
|
|
except ImportError:
|
|
pytest = None
|
|
|
|
from loguru import logger
|
|
|
|
try:
|
|
from swarms.structs.agent_registry import AgentRegistry
|
|
from swarms.structs.agent import Agent
|
|
except (ImportError, ModuleNotFoundError) as e:
|
|
import importlib.util
|
|
|
|
_current_dir = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
agent_registry_path = os.path.join(
|
|
_current_dir,
|
|
"..",
|
|
"..",
|
|
"swarms",
|
|
"structs",
|
|
"agent_registry.py",
|
|
)
|
|
agent_path = os.path.join(
|
|
_current_dir, "..", "..", "swarms", "structs", "agent.py"
|
|
)
|
|
|
|
if os.path.exists(agent_registry_path):
|
|
spec = importlib.util.spec_from_file_location(
|
|
"agent_registry", agent_registry_path
|
|
)
|
|
agent_registry_module = importlib.util.module_from_spec(spec)
|
|
spec.loader.exec_module(agent_registry_module)
|
|
AgentRegistry = agent_registry_module.AgentRegistry
|
|
|
|
if os.path.exists(agent_path):
|
|
spec = importlib.util.spec_from_file_location(
|
|
"agent", agent_path
|
|
)
|
|
agent_module = importlib.util.module_from_spec(spec)
|
|
spec.loader.exec_module(agent_module)
|
|
Agent = agent_module.Agent
|
|
else:
|
|
raise ImportError("Could not find required modules") from e
|
|
|
|
logger.remove()
|
|
logger.add(lambda msg: None, level="ERROR")
|
|
|
|
|
|
def test_agent_registry_initialization():
|
|
"""Test AgentRegistry initialization."""
|
|
try:
|
|
registry = AgentRegistry()
|
|
assert (
|
|
registry is not None
|
|
), "AgentRegistry should not be None"
|
|
assert (
|
|
registry.name == "Agent Registry"
|
|
), "Default name should be set"
|
|
assert (
|
|
registry.description == "A registry for managing agents."
|
|
), "Default description should be set"
|
|
assert isinstance(
|
|
registry.agents, dict
|
|
), "Agents should be a dictionary"
|
|
assert (
|
|
len(registry.agents) == 0
|
|
), "Initial registry should be empty"
|
|
|
|
registry2 = AgentRegistry(
|
|
name="Test Registry",
|
|
description="Test description",
|
|
return_json=False,
|
|
auto_save=True,
|
|
)
|
|
assert (
|
|
registry2.name == "Test Registry"
|
|
), "Custom name should be set"
|
|
assert (
|
|
registry2.description == "Test description"
|
|
), "Custom description should be set"
|
|
assert (
|
|
registry2.return_json is False
|
|
), "return_json should be False"
|
|
assert registry2.auto_save is True, "auto_save should be True"
|
|
|
|
logger.info("✓ AgentRegistry initialization test passed")
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error in test_agent_registry_initialization: {str(e)}"
|
|
)
|
|
raise
|
|
|
|
|
|
def test_agent_registry_add_single_agent():
|
|
"""Test adding a single agent to the registry."""
|
|
try:
|
|
registry = AgentRegistry()
|
|
|
|
agent = Agent(
|
|
agent_name="Test-Agent-1",
|
|
agent_description="Test agent for registry",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
registry.add(agent)
|
|
|
|
assert (
|
|
len(registry.agents) == 1
|
|
), "Registry should have one agent"
|
|
assert (
|
|
"Test-Agent-1" in registry.agents
|
|
), "Agent should be in registry"
|
|
assert (
|
|
registry.agents["Test-Agent-1"] is not None
|
|
), "Agent object should not be None"
|
|
assert (
|
|
registry.agents["Test-Agent-1"].agent_name
|
|
== "Test-Agent-1"
|
|
), "Agent name should match"
|
|
|
|
logger.info("✓ Add single agent test passed")
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error in test_agent_registry_add_single_agent: {str(e)}"
|
|
)
|
|
raise
|
|
|
|
|
|
def test_agent_registry_add_multiple_agents():
|
|
"""Test adding multiple agents to the registry."""
|
|
try:
|
|
registry = AgentRegistry()
|
|
|
|
agent1 = Agent(
|
|
agent_name="Test-Agent-1",
|
|
agent_description="First test agent",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
agent2 = Agent(
|
|
agent_name="Test-Agent-2",
|
|
agent_description="Second test agent",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
agent3 = Agent(
|
|
agent_name="Test-Agent-3",
|
|
agent_description="Third test agent",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
registry.add_many([agent1, agent2, agent3])
|
|
|
|
assert (
|
|
len(registry.agents) == 3
|
|
), "Registry should have three agents"
|
|
assert (
|
|
"Test-Agent-1" in registry.agents
|
|
), "Agent 1 should be in registry"
|
|
assert (
|
|
"Test-Agent-2" in registry.agents
|
|
), "Agent 2 should be in registry"
|
|
assert (
|
|
"Test-Agent-3" in registry.agents
|
|
), "Agent 3 should be in registry"
|
|
|
|
for agent_name in [
|
|
"Test-Agent-1",
|
|
"Test-Agent-2",
|
|
"Test-Agent-3",
|
|
]:
|
|
assert (
|
|
registry.agents[agent_name] is not None
|
|
), f"{agent_name} should not be None"
|
|
|
|
logger.info("✓ Add multiple agents test passed")
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error in test_agent_registry_add_multiple_agents: {str(e)}"
|
|
)
|
|
raise
|
|
|
|
|
|
def test_agent_registry_get_agent():
|
|
"""Test retrieving an agent from the registry."""
|
|
try:
|
|
registry = AgentRegistry()
|
|
|
|
agent = Agent(
|
|
agent_name="Retrievable-Agent",
|
|
agent_description="Agent for retrieval testing",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
registry.add(agent)
|
|
|
|
retrieved_agent = registry.get("Retrievable-Agent")
|
|
|
|
assert (
|
|
retrieved_agent is not None
|
|
), "Retrieved agent should not be None"
|
|
assert (
|
|
retrieved_agent.agent_name == "Retrievable-Agent"
|
|
), "Agent name should match"
|
|
assert hasattr(
|
|
retrieved_agent, "run"
|
|
), "Agent should have run method"
|
|
assert (
|
|
retrieved_agent is agent
|
|
), "Should return the same agent object"
|
|
|
|
logger.info("✓ Get agent test passed")
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error in test_agent_registry_get_agent: {str(e)}"
|
|
)
|
|
raise
|
|
|
|
|
|
def test_agent_registry_delete_agent():
|
|
"""Test deleting an agent from the registry."""
|
|
try:
|
|
registry = AgentRegistry()
|
|
|
|
agent1 = Agent(
|
|
agent_name="Agent-To-Delete",
|
|
agent_description="Agent that will be deleted",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
agent2 = Agent(
|
|
agent_name="Agent-To-Keep",
|
|
agent_description="Agent that will remain",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
registry.add(agent1)
|
|
registry.add(agent2)
|
|
|
|
assert (
|
|
len(registry.agents) == 2
|
|
), "Registry should have two agents"
|
|
|
|
registry.delete("Agent-To-Delete")
|
|
|
|
assert (
|
|
len(registry.agents) == 1
|
|
), "Registry should have one agent after deletion"
|
|
assert (
|
|
"Agent-To-Delete" not in registry.agents
|
|
), "Deleted agent should not be in registry"
|
|
assert (
|
|
"Agent-To-Keep" in registry.agents
|
|
), "Other agent should still be in registry"
|
|
|
|
logger.info("✓ Delete agent test passed")
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error in test_agent_registry_delete_agent: {str(e)}"
|
|
)
|
|
raise
|
|
|
|
|
|
def test_agent_registry_update_agent():
|
|
"""Test updating an agent in the registry."""
|
|
try:
|
|
registry = AgentRegistry()
|
|
|
|
original_agent = Agent(
|
|
agent_name="Agent-To-Update",
|
|
agent_description="Original description",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
registry.add(original_agent)
|
|
|
|
updated_agent = Agent(
|
|
agent_name="Agent-To-Update",
|
|
agent_description="Updated description",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=2,
|
|
verbose=True,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
registry.update_agent("Agent-To-Update", updated_agent)
|
|
|
|
retrieved_agent = registry.get("Agent-To-Update")
|
|
|
|
assert (
|
|
retrieved_agent is not None
|
|
), "Updated agent should not be None"
|
|
assert (
|
|
retrieved_agent is updated_agent
|
|
), "Should return the updated agent"
|
|
assert (
|
|
retrieved_agent.max_loops == 2
|
|
), "Max loops should be updated"
|
|
assert (
|
|
retrieved_agent.verbose is True
|
|
), "Verbose should be updated"
|
|
|
|
logger.info("✓ Update agent test passed")
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error in test_agent_registry_update_agent: {str(e)}"
|
|
)
|
|
raise
|
|
|
|
|
|
def test_agent_registry_list_agents():
|
|
"""Test listing all agent names in the registry."""
|
|
try:
|
|
registry = AgentRegistry()
|
|
|
|
agent1 = Agent(
|
|
agent_name="List-Agent-1",
|
|
agent_description="First agent",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
agent2 = Agent(
|
|
agent_name="List-Agent-2",
|
|
agent_description="Second agent",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
registry.add(agent1)
|
|
registry.add(agent2)
|
|
|
|
agent_names = registry.list_agents()
|
|
|
|
assert (
|
|
agent_names is not None
|
|
), "Agent names list should not be None"
|
|
assert isinstance(agent_names, list), "Should return a list"
|
|
assert len(agent_names) == 2, "Should have two agent names"
|
|
assert (
|
|
"List-Agent-1" in agent_names
|
|
), "First agent name should be in list"
|
|
assert (
|
|
"List-Agent-2" in agent_names
|
|
), "Second agent name should be in list"
|
|
|
|
logger.info("✓ List agents test passed")
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error in test_agent_registry_list_agents: {str(e)}"
|
|
)
|
|
raise
|
|
|
|
|
|
def test_agent_registry_return_all_agents():
|
|
"""Test returning all agents from the registry."""
|
|
try:
|
|
registry = AgentRegistry()
|
|
|
|
agent1 = Agent(
|
|
agent_name="Return-Agent-1",
|
|
agent_description="First agent",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
agent2 = Agent(
|
|
agent_name="Return-Agent-2",
|
|
agent_description="Second agent",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
registry.add(agent1)
|
|
registry.add(agent2)
|
|
|
|
all_agents = registry.return_all_agents()
|
|
|
|
assert (
|
|
all_agents is not None
|
|
), "All agents list should not be None"
|
|
assert isinstance(all_agents, list), "Should return a list"
|
|
assert len(all_agents) == 2, "Should have two agents"
|
|
|
|
for agent in all_agents:
|
|
assert agent is not None, "Each agent should not be None"
|
|
assert hasattr(
|
|
agent, "agent_name"
|
|
), "Agent should have agent_name"
|
|
assert hasattr(
|
|
agent, "run"
|
|
), "Agent should have run method"
|
|
|
|
logger.info("✓ Return all agents test passed")
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error in test_agent_registry_return_all_agents: {str(e)}"
|
|
)
|
|
raise
|
|
|
|
|
|
def test_agent_registry_query_with_condition():
|
|
"""Test querying agents with a condition."""
|
|
try:
|
|
registry = AgentRegistry()
|
|
|
|
agent1 = Agent(
|
|
agent_name="Query-Agent-1",
|
|
agent_description="Agent with max_loops=1",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
agent2 = Agent(
|
|
agent_name="Query-Agent-2",
|
|
agent_description="Agent with max_loops=2",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=2,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
agent3 = Agent(
|
|
agent_name="Query-Agent-3",
|
|
agent_description="Agent with max_loops=1",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
registry.add(agent1)
|
|
registry.add(agent2)
|
|
registry.add(agent3)
|
|
|
|
def condition_max_loops_1(agent):
|
|
return agent.max_loops == 1
|
|
|
|
filtered_agents = registry.query(condition_max_loops_1)
|
|
|
|
assert (
|
|
filtered_agents is not None
|
|
), "Filtered agents should not be None"
|
|
assert isinstance(
|
|
filtered_agents, list
|
|
), "Should return a list"
|
|
assert (
|
|
len(filtered_agents) == 2
|
|
), "Should have two agents with max_loops=1"
|
|
|
|
for agent in filtered_agents:
|
|
assert (
|
|
agent.max_loops == 1
|
|
), "All filtered agents should have max_loops=1"
|
|
|
|
logger.info("✓ Query with condition test passed")
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error in test_agent_registry_query_with_condition: {str(e)}"
|
|
)
|
|
raise
|
|
|
|
|
|
def test_agent_registry_query_without_condition():
|
|
"""Test querying all agents without a condition."""
|
|
try:
|
|
registry = AgentRegistry()
|
|
|
|
agent1 = Agent(
|
|
agent_name="Query-All-Agent-1",
|
|
agent_description="First agent",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
agent2 = Agent(
|
|
agent_name="Query-All-Agent-2",
|
|
agent_description="Second agent",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
registry.add(agent1)
|
|
registry.add(agent2)
|
|
|
|
all_agents = registry.query()
|
|
|
|
assert all_agents is not None, "All agents should not be None"
|
|
assert isinstance(all_agents, list), "Should return a list"
|
|
assert len(all_agents) == 2, "Should return all agents"
|
|
|
|
logger.info("✓ Query without condition test passed")
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error in test_agent_registry_query_without_condition: {str(e)}"
|
|
)
|
|
raise
|
|
|
|
|
|
def test_agent_registry_find_agent_by_name():
|
|
"""Test finding an agent by name."""
|
|
try:
|
|
registry = AgentRegistry()
|
|
|
|
agent = Agent(
|
|
agent_name="Findable-Agent",
|
|
agent_description="Agent to find",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
registry.add(agent)
|
|
|
|
found_agent = registry.find_agent_by_name("Findable-Agent")
|
|
|
|
assert (
|
|
found_agent is not None
|
|
), "Found agent should not be None"
|
|
assert (
|
|
found_agent.agent_name == "Findable-Agent"
|
|
), "Agent name should match"
|
|
assert hasattr(
|
|
found_agent, "run"
|
|
), "Agent should have run method"
|
|
|
|
logger.info("✓ Find agent by name test passed")
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error in test_agent_registry_find_agent_by_name: {str(e)}"
|
|
)
|
|
raise
|
|
|
|
|
|
def test_agent_registry_find_agent_by_id():
|
|
"""Test finding an agent by ID."""
|
|
try:
|
|
registry = AgentRegistry()
|
|
|
|
agent = Agent(
|
|
agent_name="ID-Agent",
|
|
agent_description="Agent with ID",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
registry.add(agent)
|
|
|
|
agent_id = agent.id
|
|
found_agent = registry.find_agent_by_id(agent.agent_name)
|
|
|
|
assert (
|
|
found_agent is not None
|
|
), "Found agent should not be None"
|
|
assert (
|
|
found_agent.agent_name == "ID-Agent"
|
|
), "Agent name should match"
|
|
|
|
logger.info("✓ Find agent by ID test passed")
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error in test_agent_registry_find_agent_by_id: {str(e)}"
|
|
)
|
|
raise
|
|
|
|
|
|
def test_agent_registry_agents_to_json():
|
|
"""Test converting agents to JSON."""
|
|
try:
|
|
registry = AgentRegistry()
|
|
|
|
agent1 = Agent(
|
|
agent_name="JSON-Agent-1",
|
|
agent_description="First agent for JSON",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
agent2 = Agent(
|
|
agent_name="JSON-Agent-2",
|
|
agent_description="Second agent for JSON",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
registry.add(agent1)
|
|
registry.add(agent2)
|
|
|
|
json_output = registry.agents_to_json()
|
|
|
|
assert (
|
|
json_output is not None
|
|
), "JSON output should not be None"
|
|
assert isinstance(json_output, str), "Should return a string"
|
|
assert len(json_output) > 0, "JSON should not be empty"
|
|
assert (
|
|
"JSON-Agent-1" in json_output
|
|
), "First agent should be in JSON"
|
|
assert (
|
|
"JSON-Agent-2" in json_output
|
|
), "Second agent should be in JSON"
|
|
|
|
import json
|
|
|
|
parsed_json = json.loads(json_output)
|
|
assert isinstance(
|
|
parsed_json, dict
|
|
), "Should be valid JSON dict"
|
|
assert len(parsed_json) == 2, "Should have two agents in JSON"
|
|
|
|
logger.info("✓ Agents to JSON test passed")
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error in test_agent_registry_agents_to_json: {str(e)}"
|
|
)
|
|
raise
|
|
|
|
|
|
def test_agent_registry_initialization_with_agents():
|
|
"""Test initializing registry with agents."""
|
|
try:
|
|
agent1 = Agent(
|
|
agent_name="Init-Agent-1",
|
|
agent_description="First initial agent",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
agent2 = Agent(
|
|
agent_name="Init-Agent-2",
|
|
agent_description="Second initial agent",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
registry = AgentRegistry(agents=[agent1, agent2])
|
|
|
|
assert registry is not None, "Registry should not be None"
|
|
assert (
|
|
len(registry.agents) == 2
|
|
), "Registry should have two agents"
|
|
assert (
|
|
"Init-Agent-1" in registry.agents
|
|
), "First agent should be in registry"
|
|
assert (
|
|
"Init-Agent-2" in registry.agents
|
|
), "Second agent should be in registry"
|
|
|
|
logger.info("✓ Initialize with agents test passed")
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error in test_agent_registry_initialization_with_agents: {str(e)}"
|
|
)
|
|
raise
|
|
|
|
|
|
def test_agent_registry_error_duplicate_agent():
|
|
"""Test error handling for duplicate agent names."""
|
|
try:
|
|
registry = AgentRegistry()
|
|
|
|
agent1 = Agent(
|
|
agent_name="Duplicate-Agent",
|
|
agent_description="First agent",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
agent2 = Agent(
|
|
agent_name="Duplicate-Agent",
|
|
agent_description="Duplicate agent",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
registry.add(agent1)
|
|
|
|
try:
|
|
registry.add(agent2)
|
|
assert (
|
|
False
|
|
), "Should have raised ValueError for duplicate agent"
|
|
except ValueError as e:
|
|
assert (
|
|
"already exists" in str(e).lower()
|
|
), "Error message should mention duplicate"
|
|
assert (
|
|
len(registry.agents) == 1
|
|
), "Registry should still have only one agent"
|
|
|
|
logger.info(
|
|
"✓ Error handling for duplicate agent test passed"
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error in test_agent_registry_error_duplicate_agent: {str(e)}"
|
|
)
|
|
raise
|
|
|
|
|
|
def test_agent_registry_error_nonexistent_agent():
|
|
"""Test error handling for nonexistent agent."""
|
|
try:
|
|
registry = AgentRegistry()
|
|
|
|
try:
|
|
registry.get("Nonexistent-Agent")
|
|
assert (
|
|
False
|
|
), "Should have raised KeyError for nonexistent agent"
|
|
except KeyError as e:
|
|
assert e is not None, "Should raise KeyError"
|
|
|
|
try:
|
|
registry.delete("Nonexistent-Agent")
|
|
assert (
|
|
False
|
|
), "Should have raised KeyError for nonexistent agent"
|
|
except KeyError as e:
|
|
assert e is not None, "Should raise KeyError"
|
|
|
|
logger.info(
|
|
"✓ Error handling for nonexistent agent test passed"
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error in test_agent_registry_error_nonexistent_agent: {str(e)}"
|
|
)
|
|
raise
|
|
|
|
|
|
def test_agent_registry_retrieved_agents_can_run():
|
|
"""Test that retrieved agents can actually run tasks."""
|
|
try:
|
|
registry = AgentRegistry()
|
|
|
|
agent = Agent(
|
|
agent_name="Runnable-Registry-Agent",
|
|
agent_description="Agent for running tasks",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
registry.add(agent)
|
|
|
|
retrieved_agent = registry.get("Runnable-Registry-Agent")
|
|
|
|
assert (
|
|
retrieved_agent is not None
|
|
), "Retrieved agent should not be None"
|
|
|
|
result = retrieved_agent.run("What is 2 + 2? Answer briefly.")
|
|
|
|
assert (
|
|
result is not None
|
|
), "Agent run result should not be None"
|
|
assert isinstance(result, str), "Result should be a string"
|
|
assert len(result) > 0, "Result should not be empty"
|
|
|
|
logger.info("✓ Retrieved agents can run test passed")
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error in test_agent_registry_retrieved_agents_can_run: {str(e)}"
|
|
)
|
|
raise
|
|
|
|
|
|
def test_agent_registry_thread_safety():
|
|
"""Test thread safety of registry operations."""
|
|
try:
|
|
registry = AgentRegistry()
|
|
|
|
agent1 = Agent(
|
|
agent_name="Thread-Agent-1",
|
|
agent_description="First thread agent",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
agent2 = Agent(
|
|
agent_name="Thread-Agent-2",
|
|
agent_description="Second thread agent",
|
|
model_name="gpt-4o-mini",
|
|
max_loops=1,
|
|
verbose=False,
|
|
print_on=False,
|
|
streaming_on=True,
|
|
)
|
|
|
|
registry.add(agent1)
|
|
registry.add(agent2)
|
|
|
|
agent_names = registry.list_agents()
|
|
all_agents = registry.return_all_agents()
|
|
|
|
assert (
|
|
agent_names is not None
|
|
), "Agent names should not be None"
|
|
assert all_agents is not None, "All agents should not be None"
|
|
assert len(agent_names) == 2, "Should have two agent names"
|
|
assert len(all_agents) == 2, "Should have two agents"
|
|
|
|
logger.info("✓ Thread safety test passed")
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error in test_agent_registry_thread_safety: {str(e)}"
|
|
)
|
|
raise
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import sys
|
|
|
|
test_dict = {
|
|
"test_agent_registry_initialization": test_agent_registry_initialization,
|
|
"test_agent_registry_add_single_agent": test_agent_registry_add_single_agent,
|
|
"test_agent_registry_add_multiple_agents": test_agent_registry_add_multiple_agents,
|
|
"test_agent_registry_get_agent": test_agent_registry_get_agent,
|
|
"test_agent_registry_delete_agent": test_agent_registry_delete_agent,
|
|
"test_agent_registry_update_agent": test_agent_registry_update_agent,
|
|
"test_agent_registry_list_agents": test_agent_registry_list_agents,
|
|
"test_agent_registry_return_all_agents": test_agent_registry_return_all_agents,
|
|
"test_agent_registry_query_with_condition": test_agent_registry_query_with_condition,
|
|
"test_agent_registry_query_without_condition": test_agent_registry_query_without_condition,
|
|
"test_agent_registry_find_agent_by_name": test_agent_registry_find_agent_by_name,
|
|
"test_agent_registry_find_agent_by_id": test_agent_registry_find_agent_by_id,
|
|
"test_agent_registry_agents_to_json": test_agent_registry_agents_to_json,
|
|
"test_agent_registry_initialization_with_agents": test_agent_registry_initialization_with_agents,
|
|
"test_agent_registry_error_duplicate_agent": test_agent_registry_error_duplicate_agent,
|
|
"test_agent_registry_error_nonexistent_agent": test_agent_registry_error_nonexistent_agent,
|
|
"test_agent_registry_retrieved_agents_can_run": test_agent_registry_retrieved_agents_can_run,
|
|
"test_agent_registry_thread_safety": test_agent_registry_thread_safety,
|
|
}
|
|
|
|
if len(sys.argv) > 1:
|
|
requested_tests = []
|
|
for test_name in sys.argv[1:]:
|
|
if test_name in test_dict:
|
|
requested_tests.append(test_dict[test_name])
|
|
elif test_name == "all" or test_name == "--all":
|
|
requested_tests = list(test_dict.values())
|
|
break
|
|
else:
|
|
print(f"⚠ Warning: Test '{test_name}' not found.")
|
|
print(
|
|
f"Available tests: {', '.join(test_dict.keys())}"
|
|
)
|
|
sys.exit(1)
|
|
|
|
tests_to_run = requested_tests
|
|
else:
|
|
tests_to_run = list(test_dict.values())
|
|
|
|
if len(tests_to_run) == 1:
|
|
print(f"Running: {tests_to_run[0].__name__}")
|
|
else:
|
|
print(f"Running {len(tests_to_run)} test(s)...")
|
|
|
|
passed = 0
|
|
failed = 0
|
|
|
|
for test_func in tests_to_run:
|
|
try:
|
|
print(f"\n{'='*60}")
|
|
print(f"Running: {test_func.__name__}")
|
|
print(f"{'='*60}")
|
|
test_func()
|
|
print(f"✓ PASSED: {test_func.__name__}")
|
|
passed += 1
|
|
except Exception as e:
|
|
print(f"✗ FAILED: {test_func.__name__}")
|
|
print(f" Error: {str(e)}")
|
|
import traceback
|
|
|
|
traceback.print_exc()
|
|
failed += 1
|
|
|
|
print(f"\n{'='*60}")
|
|
print(f"Test Summary: {passed} passed, {failed} failed")
|
|
print(f"{'='*60}")
|
|
|
|
if len(sys.argv) == 1:
|
|
print("\n💡 Tip: Run a specific test with:")
|
|
print(
|
|
" python test_agent_registry.py test_agent_registry_initialization"
|
|
)
|
|
print("\n Or use pytest:")
|
|
print(" pytest test_agent_registry.py")
|
|
print(
|
|
" pytest test_agent_registry.py::test_agent_registry_initialization"
|
|
)
|