Merge pull request #1197 from hughiwnl/test_base_structure

Added tests for base structure
pull/1190/merge
Kye Gomez 2 days ago committed by GitHub
commit 4db7e45908
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,813 @@
import os
try:
import pytest
except ImportError:
pytest = None
from loguru import logger
try:
from swarms.structs.agent_registry import AgentRegistry
from swarms.structs.agent import Agent
except (ImportError, ModuleNotFoundError) as e:
import importlib.util
_current_dir = os.path.dirname(os.path.abspath(__file__))
agent_registry_path = os.path.join(_current_dir, "..", "..", "swarms", "structs", "agent_registry.py")
agent_path = os.path.join(_current_dir, "..", "..", "swarms", "structs", "agent.py")
if os.path.exists(agent_registry_path):
spec = importlib.util.spec_from_file_location("agent_registry", agent_registry_path)
agent_registry_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(agent_registry_module)
AgentRegistry = agent_registry_module.AgentRegistry
if os.path.exists(agent_path):
spec = importlib.util.spec_from_file_location("agent", agent_path)
agent_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(agent_module)
Agent = agent_module.Agent
else:
raise ImportError(f"Could not find required modules") from e
logger.remove()
logger.add(lambda msg: None, level="ERROR")
def test_agent_registry_initialization():
"""Test AgentRegistry initialization."""
try:
registry = AgentRegistry()
assert registry is not None, "AgentRegistry should not be None"
assert registry.name == "Agent Registry", "Default name should be set"
assert registry.description == "A registry for managing agents.", "Default description should be set"
assert isinstance(registry.agents, dict), "Agents should be a dictionary"
assert len(registry.agents) == 0, "Initial registry should be empty"
registry2 = AgentRegistry(
name="Test Registry",
description="Test description",
return_json=False,
auto_save=True
)
assert registry2.name == "Test Registry", "Custom name should be set"
assert registry2.description == "Test description", "Custom description should be set"
assert registry2.return_json is False, "return_json should be False"
assert registry2.auto_save is True, "auto_save should be True"
logger.info("✓ AgentRegistry initialization test passed")
except Exception as e:
logger.error(f"Error in test_agent_registry_initialization: {str(e)}")
raise
def test_agent_registry_add_single_agent():
"""Test adding a single agent to the registry."""
try:
registry = AgentRegistry()
agent = Agent(
agent_name="Test-Agent-1",
agent_description="Test agent for registry",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
registry.add(agent)
assert len(registry.agents) == 1, "Registry should have one agent"
assert "Test-Agent-1" in registry.agents, "Agent should be in registry"
assert registry.agents["Test-Agent-1"] is not None, "Agent object should not be None"
assert registry.agents["Test-Agent-1"].agent_name == "Test-Agent-1", "Agent name should match"
logger.info("✓ Add single agent test passed")
except Exception as e:
logger.error(f"Error in test_agent_registry_add_single_agent: {str(e)}")
raise
def test_agent_registry_add_multiple_agents():
"""Test adding multiple agents to the registry."""
try:
registry = AgentRegistry()
agent1 = Agent(
agent_name="Test-Agent-1",
agent_description="First test agent",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
agent2 = Agent(
agent_name="Test-Agent-2",
agent_description="Second test agent",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
agent3 = Agent(
agent_name="Test-Agent-3",
agent_description="Third test agent",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
registry.add_many([agent1, agent2, agent3])
assert len(registry.agents) == 3, "Registry should have three agents"
assert "Test-Agent-1" in registry.agents, "Agent 1 should be in registry"
assert "Test-Agent-2" in registry.agents, "Agent 2 should be in registry"
assert "Test-Agent-3" in registry.agents, "Agent 3 should be in registry"
for agent_name in ["Test-Agent-1", "Test-Agent-2", "Test-Agent-3"]:
assert registry.agents[agent_name] is not None, f"{agent_name} should not be None"
logger.info("✓ Add multiple agents test passed")
except Exception as e:
logger.error(f"Error in test_agent_registry_add_multiple_agents: {str(e)}")
raise
def test_agent_registry_get_agent():
"""Test retrieving an agent from the registry."""
try:
registry = AgentRegistry()
agent = Agent(
agent_name="Retrievable-Agent",
agent_description="Agent for retrieval testing",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
registry.add(agent)
retrieved_agent = registry.get("Retrievable-Agent")
assert retrieved_agent is not None, "Retrieved agent should not be None"
assert retrieved_agent.agent_name == "Retrievable-Agent", "Agent name should match"
assert hasattr(retrieved_agent, "run"), "Agent should have run method"
assert retrieved_agent is agent, "Should return the same agent object"
logger.info("✓ Get agent test passed")
except Exception as e:
logger.error(f"Error in test_agent_registry_get_agent: {str(e)}")
raise
def test_agent_registry_delete_agent():
"""Test deleting an agent from the registry."""
try:
registry = AgentRegistry()
agent1 = Agent(
agent_name="Agent-To-Delete",
agent_description="Agent that will be deleted",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
agent2 = Agent(
agent_name="Agent-To-Keep",
agent_description="Agent that will remain",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
registry.add(agent1)
registry.add(agent2)
assert len(registry.agents) == 2, "Registry should have two agents"
registry.delete("Agent-To-Delete")
assert len(registry.agents) == 1, "Registry should have one agent after deletion"
assert "Agent-To-Delete" not in registry.agents, "Deleted agent should not be in registry"
assert "Agent-To-Keep" in registry.agents, "Other agent should still be in registry"
logger.info("✓ Delete agent test passed")
except Exception as e:
logger.error(f"Error in test_agent_registry_delete_agent: {str(e)}")
raise
def test_agent_registry_update_agent():
"""Test updating an agent in the registry."""
try:
registry = AgentRegistry()
original_agent = Agent(
agent_name="Agent-To-Update",
agent_description="Original description",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
registry.add(original_agent)
updated_agent = Agent(
agent_name="Agent-To-Update",
agent_description="Updated description",
model_name="gpt-4o-mini",
max_loops=2,
verbose=True,
print_on=False,
streaming_on=True,
)
registry.update_agent("Agent-To-Update", updated_agent)
retrieved_agent = registry.get("Agent-To-Update")
assert retrieved_agent is not None, "Updated agent should not be None"
assert retrieved_agent is updated_agent, "Should return the updated agent"
assert retrieved_agent.max_loops == 2, "Max loops should be updated"
assert retrieved_agent.verbose is True, "Verbose should be updated"
logger.info("✓ Update agent test passed")
except Exception as e:
logger.error(f"Error in test_agent_registry_update_agent: {str(e)}")
raise
def test_agent_registry_list_agents():
"""Test listing all agent names in the registry."""
try:
registry = AgentRegistry()
agent1 = Agent(
agent_name="List-Agent-1",
agent_description="First agent",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
agent2 = Agent(
agent_name="List-Agent-2",
agent_description="Second agent",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
registry.add(agent1)
registry.add(agent2)
agent_names = registry.list_agents()
assert agent_names is not None, "Agent names list should not be None"
assert isinstance(agent_names, list), "Should return a list"
assert len(agent_names) == 2, "Should have two agent names"
assert "List-Agent-1" in agent_names, "First agent name should be in list"
assert "List-Agent-2" in agent_names, "Second agent name should be in list"
logger.info("✓ List agents test passed")
except Exception as e:
logger.error(f"Error in test_agent_registry_list_agents: {str(e)}")
raise
def test_agent_registry_return_all_agents():
"""Test returning all agents from the registry."""
try:
registry = AgentRegistry()
agent1 = Agent(
agent_name="Return-Agent-1",
agent_description="First agent",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
agent2 = Agent(
agent_name="Return-Agent-2",
agent_description="Second agent",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
registry.add(agent1)
registry.add(agent2)
all_agents = registry.return_all_agents()
assert all_agents is not None, "All agents list should not be None"
assert isinstance(all_agents, list), "Should return a list"
assert len(all_agents) == 2, "Should have two agents"
for agent in all_agents:
assert agent is not None, "Each agent should not be None"
assert hasattr(agent, "agent_name"), "Agent should have agent_name"
assert hasattr(agent, "run"), "Agent should have run method"
logger.info("✓ Return all agents test passed")
except Exception as e:
logger.error(f"Error in test_agent_registry_return_all_agents: {str(e)}")
raise
def test_agent_registry_query_with_condition():
"""Test querying agents with a condition."""
try:
registry = AgentRegistry()
agent1 = Agent(
agent_name="Query-Agent-1",
agent_description="Agent with max_loops=1",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
agent2 = Agent(
agent_name="Query-Agent-2",
agent_description="Agent with max_loops=2",
model_name="gpt-4o-mini",
max_loops=2,
verbose=False,
print_on=False,
streaming_on=True,
)
agent3 = Agent(
agent_name="Query-Agent-3",
agent_description="Agent with max_loops=1",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
registry.add(agent1)
registry.add(agent2)
registry.add(agent3)
def condition_max_loops_1(agent):
return agent.max_loops == 1
filtered_agents = registry.query(condition_max_loops_1)
assert filtered_agents is not None, "Filtered agents should not be None"
assert isinstance(filtered_agents, list), "Should return a list"
assert len(filtered_agents) == 2, "Should have two agents with max_loops=1"
for agent in filtered_agents:
assert agent.max_loops == 1, "All filtered agents should have max_loops=1"
logger.info("✓ Query with condition test passed")
except Exception as e:
logger.error(f"Error in test_agent_registry_query_with_condition: {str(e)}")
raise
def test_agent_registry_query_without_condition():
"""Test querying all agents without a condition."""
try:
registry = AgentRegistry()
agent1 = Agent(
agent_name="Query-All-Agent-1",
agent_description="First agent",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
agent2 = Agent(
agent_name="Query-All-Agent-2",
agent_description="Second agent",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
registry.add(agent1)
registry.add(agent2)
all_agents = registry.query()
assert all_agents is not None, "All agents should not be None"
assert isinstance(all_agents, list), "Should return a list"
assert len(all_agents) == 2, "Should return all agents"
logger.info("✓ Query without condition test passed")
except Exception as e:
logger.error(f"Error in test_agent_registry_query_without_condition: {str(e)}")
raise
def test_agent_registry_find_agent_by_name():
"""Test finding an agent by name."""
try:
registry = AgentRegistry()
agent = Agent(
agent_name="Findable-Agent",
agent_description="Agent to find",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
registry.add(agent)
found_agent = registry.find_agent_by_name("Findable-Agent")
assert found_agent is not None, "Found agent should not be None"
assert found_agent.agent_name == "Findable-Agent", "Agent name should match"
assert hasattr(found_agent, "run"), "Agent should have run method"
logger.info("✓ Find agent by name test passed")
except Exception as e:
logger.error(f"Error in test_agent_registry_find_agent_by_name: {str(e)}")
raise
def test_agent_registry_find_agent_by_id():
"""Test finding an agent by ID."""
try:
registry = AgentRegistry()
agent = Agent(
agent_name="ID-Agent",
agent_description="Agent with ID",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
registry.add(agent)
agent_id = agent.id
found_agent = registry.find_agent_by_id(agent.agent_name)
assert found_agent is not None, "Found agent should not be None"
assert found_agent.agent_name == "ID-Agent", "Agent name should match"
logger.info("✓ Find agent by ID test passed")
except Exception as e:
logger.error(f"Error in test_agent_registry_find_agent_by_id: {str(e)}")
raise
def test_agent_registry_agents_to_json():
"""Test converting agents to JSON."""
try:
registry = AgentRegistry()
agent1 = Agent(
agent_name="JSON-Agent-1",
agent_description="First agent for JSON",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
agent2 = Agent(
agent_name="JSON-Agent-2",
agent_description="Second agent for JSON",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
registry.add(agent1)
registry.add(agent2)
json_output = registry.agents_to_json()
assert json_output is not None, "JSON output should not be None"
assert isinstance(json_output, str), "Should return a string"
assert len(json_output) > 0, "JSON should not be empty"
assert "JSON-Agent-1" in json_output, "First agent should be in JSON"
assert "JSON-Agent-2" in json_output, "Second agent should be in JSON"
import json
parsed_json = json.loads(json_output)
assert isinstance(parsed_json, dict), "Should be valid JSON dict"
assert len(parsed_json) == 2, "Should have two agents in JSON"
logger.info("✓ Agents to JSON test passed")
except Exception as e:
logger.error(f"Error in test_agent_registry_agents_to_json: {str(e)}")
raise
def test_agent_registry_initialization_with_agents():
"""Test initializing registry with agents."""
try:
agent1 = Agent(
agent_name="Init-Agent-1",
agent_description="First initial agent",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
agent2 = Agent(
agent_name="Init-Agent-2",
agent_description="Second initial agent",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
registry = AgentRegistry(agents=[agent1, agent2])
assert registry is not None, "Registry should not be None"
assert len(registry.agents) == 2, "Registry should have two agents"
assert "Init-Agent-1" in registry.agents, "First agent should be in registry"
assert "Init-Agent-2" in registry.agents, "Second agent should be in registry"
logger.info("✓ Initialize with agents test passed")
except Exception as e:
logger.error(f"Error in test_agent_registry_initialization_with_agents: {str(e)}")
raise
def test_agent_registry_error_duplicate_agent():
"""Test error handling for duplicate agent names."""
try:
registry = AgentRegistry()
agent1 = Agent(
agent_name="Duplicate-Agent",
agent_description="First agent",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
agent2 = Agent(
agent_name="Duplicate-Agent",
agent_description="Duplicate agent",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
registry.add(agent1)
try:
registry.add(agent2)
assert False, "Should have raised ValueError for duplicate agent"
except ValueError as e:
assert "already exists" in str(e).lower(), "Error message should mention duplicate"
assert len(registry.agents) == 1, "Registry should still have only one agent"
logger.info("✓ Error handling for duplicate agent test passed")
except Exception as e:
logger.error(f"Error in test_agent_registry_error_duplicate_agent: {str(e)}")
raise
def test_agent_registry_error_nonexistent_agent():
"""Test error handling for nonexistent agent."""
try:
registry = AgentRegistry()
try:
registry.get("Nonexistent-Agent")
assert False, "Should have raised KeyError for nonexistent agent"
except KeyError as e:
assert e is not None, "Should raise KeyError"
try:
registry.delete("Nonexistent-Agent")
assert False, "Should have raised KeyError for nonexistent agent"
except KeyError as e:
assert e is not None, "Should raise KeyError"
logger.info("✓ Error handling for nonexistent agent test passed")
except Exception as e:
logger.error(f"Error in test_agent_registry_error_nonexistent_agent: {str(e)}")
raise
def test_agent_registry_retrieved_agents_can_run():
"""Test that retrieved agents can actually run tasks."""
try:
registry = AgentRegistry()
agent = Agent(
agent_name="Runnable-Registry-Agent",
agent_description="Agent for running tasks",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
registry.add(agent)
retrieved_agent = registry.get("Runnable-Registry-Agent")
assert retrieved_agent is not None, "Retrieved agent should not be None"
result = retrieved_agent.run("What is 2 + 2? Answer briefly.")
assert result is not None, "Agent run result should not be None"
assert isinstance(result, str), "Result should be a string"
assert len(result) > 0, "Result should not be empty"
logger.info("✓ Retrieved agents can run test passed")
except Exception as e:
logger.error(f"Error in test_agent_registry_retrieved_agents_can_run: {str(e)}")
raise
def test_agent_registry_thread_safety():
"""Test thread safety of registry operations."""
try:
registry = AgentRegistry()
agent1 = Agent(
agent_name="Thread-Agent-1",
agent_description="First thread agent",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
agent2 = Agent(
agent_name="Thread-Agent-2",
agent_description="Second thread agent",
model_name="gpt-4o-mini",
max_loops=1,
verbose=False,
print_on=False,
streaming_on=True,
)
registry.add(agent1)
registry.add(agent2)
agent_names = registry.list_agents()
all_agents = registry.return_all_agents()
assert agent_names is not None, "Agent names should not be None"
assert all_agents is not None, "All agents should not be None"
assert len(agent_names) == 2, "Should have two agent names"
assert len(all_agents) == 2, "Should have two agents"
logger.info("✓ Thread safety test passed")
except Exception as e:
logger.error(f"Error in test_agent_registry_thread_safety: {str(e)}")
raise
if __name__ == "__main__":
import sys
test_dict = {
"test_agent_registry_initialization": test_agent_registry_initialization,
"test_agent_registry_add_single_agent": test_agent_registry_add_single_agent,
"test_agent_registry_add_multiple_agents": test_agent_registry_add_multiple_agents,
"test_agent_registry_get_agent": test_agent_registry_get_agent,
"test_agent_registry_delete_agent": test_agent_registry_delete_agent,
"test_agent_registry_update_agent": test_agent_registry_update_agent,
"test_agent_registry_list_agents": test_agent_registry_list_agents,
"test_agent_registry_return_all_agents": test_agent_registry_return_all_agents,
"test_agent_registry_query_with_condition": test_agent_registry_query_with_condition,
"test_agent_registry_query_without_condition": test_agent_registry_query_without_condition,
"test_agent_registry_find_agent_by_name": test_agent_registry_find_agent_by_name,
"test_agent_registry_find_agent_by_id": test_agent_registry_find_agent_by_id,
"test_agent_registry_agents_to_json": test_agent_registry_agents_to_json,
"test_agent_registry_initialization_with_agents": test_agent_registry_initialization_with_agents,
"test_agent_registry_error_duplicate_agent": test_agent_registry_error_duplicate_agent,
"test_agent_registry_error_nonexistent_agent": test_agent_registry_error_nonexistent_agent,
"test_agent_registry_retrieved_agents_can_run": test_agent_registry_retrieved_agents_can_run,
"test_agent_registry_thread_safety": test_agent_registry_thread_safety,
}
if len(sys.argv) > 1:
requested_tests = []
for test_name in sys.argv[1:]:
if test_name in test_dict:
requested_tests.append(test_dict[test_name])
elif test_name == "all" or test_name == "--all":
requested_tests = list(test_dict.values())
break
else:
print(f"⚠ Warning: Test '{test_name}' not found.")
print(f"Available tests: {', '.join(test_dict.keys())}")
sys.exit(1)
tests_to_run = requested_tests
else:
tests_to_run = list(test_dict.values())
if len(tests_to_run) == 1:
print(f"Running: {tests_to_run[0].__name__}")
else:
print(f"Running {len(tests_to_run)} test(s)...")
passed = 0
failed = 0
for test_func in tests_to_run:
try:
print(f"\n{'='*60}")
print(f"Running: {test_func.__name__}")
print(f"{'='*60}")
test_func()
print(f"✓ PASSED: {test_func.__name__}")
passed += 1
except Exception as e:
print(f"✗ FAILED: {test_func.__name__}")
print(f" Error: {str(e)}")
import traceback
traceback.print_exc()
failed += 1
print(f"\n{'='*60}")
print(f"Test Summary: {passed} passed, {failed} failed")
print(f"{'='*60}")
if len(sys.argv) == 1:
print("\n💡 Tip: Run a specific test with:")
print(" python test_agent_registry.py test_agent_registry_initialization")
print("\n Or use pytest:")
print(" pytest test_agent_registry.py")
print(" pytest test_agent_registry.py::test_agent_registry_initialization")

@ -0,0 +1,810 @@
import os
import tempfile
import asyncio
import json
try:
import pytest
except ImportError:
pytest = None
from loguru import logger
try:
from swarms.structs.base_structure import BaseStructure
except (ImportError, ModuleNotFoundError) as e:
import importlib.util
_current_dir = os.path.dirname(os.path.abspath(__file__))
base_structure_path = os.path.join(_current_dir, "..", "..", "swarms", "structs", "base_structure.py")
if os.path.exists(base_structure_path):
spec = importlib.util.spec_from_file_location("base_structure", base_structure_path)
base_structure_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(base_structure_module)
BaseStructure = base_structure_module.BaseStructure
else:
raise ImportError(f"Could not find base_structure.py at {base_structure_path}") from e
logger.remove()
logger.add(lambda msg: None, level="ERROR")
class TestStructure(BaseStructure):
def run(self, task: str = "test"):
return f"Processed: {task}"
def test_base_structure_initialization():
"""Test BaseStructure initialization."""
try:
structure = BaseStructure()
assert structure is not None, "BaseStructure should not be None"
assert structure.name is None, "Default name should be None"
assert structure.description is None, "Default description should be None"
assert structure.save_metadata_on is True, "save_metadata_on should default to True"
assert structure.save_artifact_path == "./artifacts", "Default artifact path should be set"
assert structure.save_metadata_path == "./metadata", "Default metadata path should be set"
assert structure.save_error_path == "./errors", "Default error path should be set"
assert structure.workspace_dir == "./workspace", "Default workspace dir should be set"
structure2 = BaseStructure(
name="TestStructure",
description="Test description",
save_metadata_on=False,
save_artifact_path="/tmp/artifacts",
save_metadata_path="/tmp/metadata",
save_error_path="/tmp/errors",
workspace_dir="/tmp/workspace"
)
assert structure2.name == "TestStructure", "Custom name should be set"
assert structure2.description == "Test description", "Custom description should be set"
assert structure2.save_metadata_on is False, "save_metadata_on should be False"
assert structure2.save_artifact_path == "/tmp/artifacts", "Custom artifact path should be set"
logger.info("✓ BaseStructure initialization test passed")
except Exception as e:
logger.error(f"Error in test_base_structure_initialization: {str(e)}")
raise
def test_save_and_load_file():
"""Test saving and loading files."""
try:
with tempfile.TemporaryDirectory() as tmpdir:
structure = BaseStructure(name="TestFileOps")
test_file = os.path.join(tmpdir, "test_data.json")
test_data = {"key": "value", "number": 42, "list": [1, 2, 3]}
structure.save_to_file(test_data, test_file)
assert os.path.exists(test_file), "File should be created"
loaded_data = structure.load_from_file(test_file)
assert loaded_data is not None, "Loaded data should not be None"
assert isinstance(loaded_data, dict), "Loaded data should be a dict"
assert loaded_data["key"] == "value", "Data should match"
assert loaded_data["number"] == 42, "Number should match"
assert loaded_data["list"] == [1, 2, 3], "List should match"
logger.info("✓ Save and load file test passed")
except Exception as e:
logger.error(f"Error in test_save_and_load_file: {str(e)}")
raise
def test_save_and_load_metadata():
"""Test saving and loading metadata."""
try:
with tempfile.TemporaryDirectory() as tmpdir:
structure = BaseStructure(
name="TestMetadata",
save_metadata_path=tmpdir
)
metadata = {"timestamp": "2024-01-01", "status": "active", "count": 5}
structure.save_metadata(metadata)
metadata_file = os.path.join(tmpdir, "TestMetadata_metadata.json")
assert os.path.exists(metadata_file), "Metadata file should be created"
loaded_metadata = structure.load_metadata()
assert loaded_metadata is not None, "Loaded metadata should not be None"
assert isinstance(loaded_metadata, dict), "Metadata should be a dict"
assert loaded_metadata["status"] == "active", "Metadata should match"
assert loaded_metadata["count"] == 5, "Count should match"
logger.info("✓ Save and load metadata test passed")
except Exception as e:
logger.error(f"Error in test_save_and_load_metadata: {str(e)}")
raise
def test_save_and_load_artifact():
"""Test saving and loading artifacts."""
try:
with tempfile.TemporaryDirectory() as tmpdir:
structure = BaseStructure(
name="TestArtifact",
save_artifact_path=tmpdir
)
artifact = {"result": "success", "data": [1, 2, 3, 4, 5]}
structure.save_artifact(artifact, "test_artifact")
artifact_file = os.path.join(tmpdir, "test_artifact.json")
assert os.path.exists(artifact_file), "Artifact file should be created"
loaded_artifact = structure.load_artifact("test_artifact")
assert loaded_artifact is not None, "Loaded artifact should not be None"
assert isinstance(loaded_artifact, dict), "Artifact should be a dict"
assert loaded_artifact["result"] == "success", "Artifact result should match"
assert len(loaded_artifact["data"]) == 5, "Artifact data should match"
logger.info("✓ Save and load artifact test passed")
except Exception as e:
logger.error(f"Error in test_save_and_load_artifact: {str(e)}")
raise
def test_log_error():
"""Test error logging."""
try:
with tempfile.TemporaryDirectory() as tmpdir:
structure = BaseStructure(
name="TestErrorLog",
save_error_path=tmpdir
)
error_message = "Test error message"
structure.log_error(error_message)
error_file = os.path.join(tmpdir, "TestErrorLog_errors.log")
assert os.path.exists(error_file), "Error log file should be created"
with open(error_file, "r") as f:
content = f.read()
assert error_message in content, "Error message should be in log"
structure.log_error("Another error")
with open(error_file, "r") as f:
content = f.read()
assert "Another error" in content, "Second error should be in log"
logger.info("✓ Log error test passed")
except Exception as e:
logger.error(f"Error in test_log_error: {str(e)}")
raise
def test_log_event():
"""Test event logging."""
try:
with tempfile.TemporaryDirectory() as tmpdir:
structure = BaseStructure(
name="TestEventLog",
save_metadata_path=tmpdir
)
event_message = "Test event occurred"
structure.log_event(event_message, "INFO")
event_file = os.path.join(tmpdir, "TestEventLog_events.log")
assert os.path.exists(event_file), "Event log file should be created"
with open(event_file, "r") as f:
content = f.read()
assert event_message in content, "Event message should be in log"
assert "INFO" in content, "Event type should be in log"
structure.log_event("Warning event", "WARNING")
with open(event_file, "r") as f:
content = f.read()
assert "WARNING" in content, "Warning type should be in log"
logger.info("✓ Log event test passed")
except Exception as e:
logger.error(f"Error in test_log_event: {str(e)}")
raise
def test_compress_and_decompress_data():
"""Test data compression and decompression."""
try:
structure = BaseStructure()
test_data = {"key": "value", "large_data": "x" * 1000}
compressed = structure.compress_data(test_data)
assert compressed is not None, "Compressed data should not be None"
assert isinstance(compressed, bytes), "Compressed data should be bytes"
assert len(compressed) < len(json.dumps(test_data).encode()), "Compressed should be smaller"
decompressed = structure.decompres_data(compressed)
assert decompressed is not None, "Decompressed data should not be None"
assert isinstance(decompressed, dict), "Decompressed data should be a dict"
assert decompressed["key"] == "value", "Decompressed data should match"
assert len(decompressed["large_data"]) == 1000, "Large data should match"
logger.info("✓ Compress and decompress data test passed")
except Exception as e:
logger.error(f"Error in test_compress_and_decompress_data: {str(e)}")
raise
def test_to_dict():
"""Test converting structure to dictionary."""
try:
structure = BaseStructure(
name="TestDict",
description="Test description"
)
structure_dict = structure.to_dict()
assert structure_dict is not None, "Dictionary should not be None"
assert isinstance(structure_dict, dict), "Should return a dict"
assert structure_dict["name"] == "TestDict", "Name should be in dict"
assert structure_dict["description"] == "Test description", "Description should be in dict"
logger.info("✓ To dict test passed")
except Exception as e:
logger.error(f"Error in test_to_dict: {str(e)}")
raise
def test_to_json():
"""Test converting structure to JSON."""
try:
structure = BaseStructure(
name="TestJSON",
description="Test JSON description"
)
json_output = structure.to_json()
assert json_output is not None, "JSON output should not be None"
assert isinstance(json_output, str), "Should return a string"
assert "TestJSON" in json_output, "Name should be in JSON"
assert "Test JSON description" in json_output, "Description should be in JSON"
parsed = json.loads(json_output)
assert isinstance(parsed, dict), "Should be valid JSON dict"
logger.info("✓ To JSON test passed")
except Exception as e:
logger.error(f"Error in test_to_json: {str(e)}")
raise
def test_to_yaml():
"""Test converting structure to YAML."""
try:
structure = BaseStructure(
name="TestYAML",
description="Test YAML description"
)
yaml_output = structure.to_yaml()
assert yaml_output is not None, "YAML output should not be None"
assert isinstance(yaml_output, str), "Should return a string"
assert "TestYAML" in yaml_output, "Name should be in YAML"
logger.info("✓ To YAML test passed")
except Exception as e:
logger.error(f"Error in test_to_yaml: {str(e)}")
raise
def test_to_toml():
"""Test converting structure to TOML."""
try:
structure = BaseStructure(
name="TestTOML",
description="Test TOML description"
)
toml_output = structure.to_toml()
assert toml_output is not None, "TOML output should not be None"
assert isinstance(toml_output, str), "Should return a string"
logger.info("✓ To TOML test passed")
except Exception as e:
logger.error(f"Error in test_to_toml: {str(e)}")
raise
def test_run_async():
"""Test async run method."""
try:
structure = TestStructure(name="TestAsync")
async def run_test():
result = await structure.run_async("test_task")
return result
result = asyncio.run(run_test())
assert result is not None, "Async result should not be None"
logger.info("✓ Run async test passed")
except Exception as e:
logger.error(f"Error in test_run_async: {str(e)}")
raise
def test_save_metadata_async():
"""Test async save metadata."""
try:
with tempfile.TemporaryDirectory() as tmpdir:
structure = BaseStructure(
name="TestAsyncMetadata",
save_metadata_path=tmpdir
)
metadata = {"async": "test", "value": 123}
async def save_test():
await structure.save_metadata_async(metadata)
asyncio.run(save_test())
loaded = structure.load_metadata()
assert loaded is not None, "Loaded metadata should not be None"
assert loaded["async"] == "test", "Metadata should match"
logger.info("✓ Save metadata async test passed")
except Exception as e:
logger.error(f"Error in test_save_metadata_async: {str(e)}")
raise
def test_load_metadata_async():
"""Test async load metadata."""
try:
with tempfile.TemporaryDirectory() as tmpdir:
structure = BaseStructure(
name="TestAsyncLoad",
save_metadata_path=tmpdir
)
metadata = {"load": "async", "number": 456}
structure.save_metadata(metadata)
async def load_test():
return await structure.load_metadata_async()
loaded = asyncio.run(load_test())
assert loaded is not None, "Loaded metadata should not be None"
assert loaded["load"] == "async", "Metadata should match"
logger.info("✓ Load metadata async test passed")
except Exception as e:
logger.error(f"Error in test_load_metadata_async: {str(e)}")
raise
def test_save_artifact_async():
"""Test async save artifact."""
try:
with tempfile.TemporaryDirectory() as tmpdir:
structure = BaseStructure(
name="TestAsyncArtifact",
save_artifact_path=tmpdir
)
artifact = {"async_artifact": True, "data": [1, 2, 3]}
async def save_test():
await structure.save_artifact_async(artifact, "async_artifact")
asyncio.run(save_test())
loaded = structure.load_artifact("async_artifact")
assert loaded is not None, "Loaded artifact should not be None"
assert loaded["async_artifact"] is True, "Artifact should match"
logger.info("✓ Save artifact async test passed")
except Exception as e:
logger.error(f"Error in test_save_artifact_async: {str(e)}")
raise
def test_load_artifact_async():
"""Test async load artifact."""
try:
with tempfile.TemporaryDirectory() as tmpdir:
structure = BaseStructure(
name="TestAsyncLoadArtifact",
save_artifact_path=tmpdir
)
artifact = {"load_async": True, "items": ["a", "b", "c"]}
structure.save_artifact(artifact, "load_async_artifact")
async def load_test():
return await structure.load_artifact_async("load_async_artifact")
loaded = asyncio.run(load_test())
assert loaded is not None, "Loaded artifact should not be None"
assert loaded["load_async"] is True, "Artifact should match"
logger.info("✓ Load artifact async test passed")
except Exception as e:
logger.error(f"Error in test_load_artifact_async: {str(e)}")
raise
def test_asave_and_aload_from_file():
"""Test async save and load from file."""
try:
with tempfile.TemporaryDirectory() as tmpdir:
structure = BaseStructure()
test_file = os.path.join(tmpdir, "async_test.json")
test_data = {"async": "file", "test": True}
async def save_and_load():
await structure.asave_to_file(test_data, test_file)
return await structure.aload_from_file(test_file)
loaded = asyncio.run(save_and_load())
assert loaded is not None, "Loaded data should not be None"
assert loaded["async"] == "file", "Data should match"
assert loaded["test"] is True, "Boolean should match"
logger.info("✓ Async save and load from file test passed")
except Exception as e:
logger.error(f"Error in test_asave_and_aload_from_file: {str(e)}")
raise
def test_run_in_thread():
"""Test running in thread."""
try:
structure = TestStructure(name="TestThread")
future = structure.run_in_thread("thread_task")
result = future.result()
assert result is not None, "Thread result should not be None"
logger.info("✓ Run in thread test passed")
except Exception as e:
logger.error(f"Error in test_run_in_thread: {str(e)}")
raise
def test_save_metadata_in_thread():
"""Test saving metadata in thread."""
try:
with tempfile.TemporaryDirectory() as tmpdir:
structure = BaseStructure(
name="TestThreadMetadata",
save_metadata_path=tmpdir
)
metadata = {"thread": "test", "value": 789}
future = structure.save_metadata_in_thread(metadata)
future.result()
loaded = structure.load_metadata()
assert loaded is not None, "Loaded metadata should not be None"
assert loaded["thread"] == "test", "Metadata should match"
logger.info("✓ Save metadata in thread test passed")
except Exception as e:
logger.error(f"Error in test_save_metadata_in_thread: {str(e)}")
raise
def test_run_batched():
"""Test batched execution."""
try:
structure = TestStructure(name="TestBatched")
batched_data = ["task1", "task2", "task3", "task4", "task5"]
results = structure.run_batched(batched_data, batch_size=3)
assert results is not None, "Results should not be None"
assert isinstance(results, list), "Results should be a list"
assert len(results) == 5, "Should have 5 results"
for result in results:
assert result is not None, "Each result should not be None"
assert "Processed:" in result, "Result should contain processed message"
logger.info("✓ Run batched test passed")
except Exception as e:
logger.error(f"Error in test_run_batched: {str(e)}")
raise
def test_load_config():
"""Test loading configuration."""
try:
with tempfile.TemporaryDirectory() as tmpdir:
structure = BaseStructure()
config_file = os.path.join(tmpdir, "config.json")
config_data = {"setting1": "value1", "setting2": 42}
structure.save_to_file(config_data, config_file)
loaded_config = structure.load_config(config_file)
assert loaded_config is not None, "Loaded config should not be None"
assert isinstance(loaded_config, dict), "Config should be a dict"
assert loaded_config["setting1"] == "value1", "Config should match"
assert loaded_config["setting2"] == 42, "Config number should match"
logger.info("✓ Load config test passed")
except Exception as e:
logger.error(f"Error in test_load_config: {str(e)}")
raise
def test_backup_data():
"""Test backing up data."""
try:
with tempfile.TemporaryDirectory() as tmpdir:
structure = BaseStructure()
backup_path = os.path.join(tmpdir, "backups")
os.makedirs(backup_path, exist_ok=True)
backup_data = {"backup": "test", "items": [1, 2, 3]}
structure.backup_data(backup_data, backup_path)
backup_files = os.listdir(backup_path)
assert len(backup_files) > 0, "Backup file should be created"
backup_file = os.path.join(backup_path, backup_files[0])
loaded_backup = structure.load_from_file(backup_file)
assert loaded_backup is not None, "Loaded backup should not be None"
assert loaded_backup["backup"] == "test", "Backup data should match"
logger.info("✓ Backup data test passed")
except Exception as e:
logger.error(f"Error in test_backup_data: {str(e)}")
raise
def test_monitor_resources():
"""Test resource monitoring."""
try:
with tempfile.TemporaryDirectory() as tmpdir:
structure = BaseStructure(
name="TestResources",
save_metadata_path=tmpdir
)
structure.monitor_resources()
event_file = os.path.join(tmpdir, "TestResources_events.log")
assert os.path.exists(event_file), "Event log should be created"
with open(event_file, "r") as f:
content = f.read()
assert "Resource usage" in content, "Resource usage should be logged"
assert "Memory" in content, "Memory should be logged"
assert "CPU" in content, "CPU should be logged"
logger.info("✓ Monitor resources test passed")
except Exception as e:
logger.error(f"Error in test_monitor_resources: {str(e)}")
raise
def test_run_with_resources():
"""Test running with resource monitoring."""
try:
with tempfile.TemporaryDirectory() as tmpdir:
structure = TestStructure(
name="TestRunResources",
save_metadata_path=tmpdir
)
result = structure.run_with_resources("monitored_task")
assert result is not None, "Result should not be None"
event_file = os.path.join(tmpdir, "TestRunResources_events.log")
assert os.path.exists(event_file), "Event log should be created"
logger.info("✓ Run with resources test passed")
except Exception as e:
logger.error(f"Error in test_run_with_resources: {str(e)}")
raise
def test_run_with_resources_batched():
"""Test batched execution with resource monitoring."""
try:
with tempfile.TemporaryDirectory() as tmpdir:
structure = TestStructure(
name="TestBatchedResources",
save_metadata_path=tmpdir
)
batched_data = ["task1", "task2", "task3"]
results = structure.run_with_resources_batched(batched_data, batch_size=2)
assert results is not None, "Results should not be None"
assert isinstance(results, list), "Results should be a list"
assert len(results) == 3, "Should have 3 results"
event_file = os.path.join(tmpdir, "TestBatchedResources_events.log")
assert os.path.exists(event_file), "Event log should be created"
logger.info("✓ Run with resources batched test passed")
except Exception as e:
logger.error(f"Error in test_run_with_resources_batched: {str(e)}")
raise
def test_serialize_callable():
"""Test serializing callable attributes."""
try:
def test_function():
"""Test function docstring."""
pass
structure = BaseStructure()
serialized = structure._serialize_callable(test_function)
assert serialized is not None, "Serialized callable should not be None"
assert isinstance(serialized, dict), "Should return a dict"
assert "name" in serialized, "Should have name"
assert "doc" in serialized, "Should have doc"
assert serialized["name"] == "test_function", "Name should match"
logger.info("✓ Serialize callable test passed")
except Exception as e:
logger.error(f"Error in test_serialize_callable: {str(e)}")
raise
def test_serialize_attr():
"""Test serializing attributes."""
try:
structure = BaseStructure()
serialized_str = structure._serialize_attr("test_attr", "test_value")
assert serialized_str == "test_value", "String should serialize correctly"
serialized_dict = structure._serialize_attr("test_attr", {"key": "value"})
assert serialized_dict == {"key": "value"}, "Dict should serialize correctly"
def test_func():
pass
serialized_func = structure._serialize_attr("test_func", test_func)
assert isinstance(serialized_func, dict), "Function should serialize to dict"
logger.info("✓ Serialize attr test passed")
except Exception as e:
logger.error(f"Error in test_serialize_attr: {str(e)}")
raise
if __name__ == "__main__":
import sys
test_dict = {
"test_base_structure_initialization": test_base_structure_initialization,
"test_save_and_load_file": test_save_and_load_file,
"test_save_and_load_metadata": test_save_and_load_metadata,
"test_save_and_load_artifact": test_save_and_load_artifact,
"test_log_error": test_log_error,
"test_log_event": test_log_event,
"test_compress_and_decompress_data": test_compress_and_decompress_data,
"test_to_dict": test_to_dict,
"test_to_json": test_to_json,
"test_to_yaml": test_to_yaml,
"test_to_toml": test_to_toml,
"test_run_async": test_run_async,
"test_save_metadata_async": test_save_metadata_async,
"test_load_metadata_async": test_load_metadata_async,
"test_save_artifact_async": test_save_artifact_async,
"test_load_artifact_async": test_load_artifact_async,
"test_asave_and_aload_from_file": test_asave_and_aload_from_file,
"test_run_in_thread": test_run_in_thread,
"test_save_metadata_in_thread": test_save_metadata_in_thread,
"test_run_batched": test_run_batched,
"test_load_config": test_load_config,
"test_backup_data": test_backup_data,
"test_monitor_resources": test_monitor_resources,
"test_run_with_resources": test_run_with_resources,
"test_run_with_resources_batched": test_run_with_resources_batched,
"test_serialize_callable": test_serialize_callable,
"test_serialize_attr": test_serialize_attr,
}
if len(sys.argv) > 1:
requested_tests = []
for test_name in sys.argv[1:]:
if test_name in test_dict:
requested_tests.append(test_dict[test_name])
elif test_name == "all" or test_name == "--all":
requested_tests = list(test_dict.values())
break
else:
print(f"⚠ Warning: Test '{test_name}' not found.")
print(f"Available tests: {', '.join(test_dict.keys())}")
sys.exit(1)
tests_to_run = requested_tests
else:
tests_to_run = list(test_dict.values())
if len(tests_to_run) == 1:
print(f"Running: {tests_to_run[0].__name__}")
else:
print(f"Running {len(tests_to_run)} test(s)...")
passed = 0
failed = 0
for test_func in tests_to_run:
try:
print(f"\n{'='*60}")
print(f"Running: {test_func.__name__}")
print(f"{'='*60}")
test_func()
print(f"✓ PASSED: {test_func.__name__}")
passed += 1
except Exception as e:
print(f"✗ FAILED: {test_func.__name__}")
print(f" Error: {str(e)}")
import traceback
traceback.print_exc()
failed += 1
print(f"\n{'='*60}")
print(f"Test Summary: {passed} passed, {failed} failed")
print(f"{'='*60}")
if len(sys.argv) == 1:
print("\n💡 Tip: Run a specific test with:")
print(" python test_base_structure.py test_base_structure_initialization")
print("\n Or use pytest:")
print(" pytest test_base_structure.py")
print(" pytest test_base_structure.py::test_base_structure_initialization")
Loading…
Cancel
Save