diff --git a/tests/structs/test_agent_registry.py b/tests/structs/test_agent_registry.py new file mode 100644 index 00000000..f4877d21 --- /dev/null +++ b/tests/structs/test_agent_registry.py @@ -0,0 +1,813 @@ +import os + +try: + import pytest +except ImportError: + pytest = None + +from loguru import logger + +try: + from swarms.structs.agent_registry import AgentRegistry + from swarms.structs.agent import Agent +except (ImportError, ModuleNotFoundError) as e: + import importlib.util + _current_dir = os.path.dirname(os.path.abspath(__file__)) + + agent_registry_path = os.path.join(_current_dir, "..", "..", "swarms", "structs", "agent_registry.py") + agent_path = os.path.join(_current_dir, "..", "..", "swarms", "structs", "agent.py") + + if os.path.exists(agent_registry_path): + spec = importlib.util.spec_from_file_location("agent_registry", agent_registry_path) + agent_registry_module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(agent_registry_module) + AgentRegistry = agent_registry_module.AgentRegistry + + if os.path.exists(agent_path): + spec = importlib.util.spec_from_file_location("agent", agent_path) + agent_module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(agent_module) + Agent = agent_module.Agent + else: + raise ImportError(f"Could not find required modules") from e + +logger.remove() +logger.add(lambda msg: None, level="ERROR") + + +def test_agent_registry_initialization(): + """Test AgentRegistry initialization.""" + try: + registry = AgentRegistry() + assert registry is not None, "AgentRegistry should not be None" + assert registry.name == "Agent Registry", "Default name should be set" + assert registry.description == "A registry for managing agents.", "Default description should be set" + assert isinstance(registry.agents, dict), "Agents should be a dictionary" + assert len(registry.agents) == 0, "Initial registry should be empty" + + registry2 = AgentRegistry( + name="Test Registry", + description="Test description", + return_json=False, + auto_save=True + ) + assert registry2.name == "Test Registry", "Custom name should be set" + assert registry2.description == "Test description", "Custom description should be set" + assert registry2.return_json is False, "return_json should be False" + assert registry2.auto_save is True, "auto_save should be True" + + logger.info("✓ AgentRegistry initialization test passed") + + except Exception as e: + logger.error(f"Error in test_agent_registry_initialization: {str(e)}") + raise + + +def test_agent_registry_add_single_agent(): + """Test adding a single agent to the registry.""" + try: + registry = AgentRegistry() + + agent = Agent( + agent_name="Test-Agent-1", + agent_description="Test agent for registry", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + registry.add(agent) + + assert len(registry.agents) == 1, "Registry should have one agent" + assert "Test-Agent-1" in registry.agents, "Agent should be in registry" + assert registry.agents["Test-Agent-1"] is not None, "Agent object should not be None" + assert registry.agents["Test-Agent-1"].agent_name == "Test-Agent-1", "Agent name should match" + + logger.info("✓ Add single agent test passed") + + except Exception as e: + logger.error(f"Error in test_agent_registry_add_single_agent: {str(e)}") + raise + + +def test_agent_registry_add_multiple_agents(): + """Test adding multiple agents to the registry.""" + try: + registry = AgentRegistry() + + agent1 = Agent( + agent_name="Test-Agent-1", + agent_description="First test agent", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + agent2 = Agent( + agent_name="Test-Agent-2", + agent_description="Second test agent", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + agent3 = Agent( + agent_name="Test-Agent-3", + agent_description="Third test agent", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + registry.add_many([agent1, agent2, agent3]) + + assert len(registry.agents) == 3, "Registry should have three agents" + assert "Test-Agent-1" in registry.agents, "Agent 1 should be in registry" + assert "Test-Agent-2" in registry.agents, "Agent 2 should be in registry" + assert "Test-Agent-3" in registry.agents, "Agent 3 should be in registry" + + for agent_name in ["Test-Agent-1", "Test-Agent-2", "Test-Agent-3"]: + assert registry.agents[agent_name] is not None, f"{agent_name} should not be None" + + logger.info("✓ Add multiple agents test passed") + + except Exception as e: + logger.error(f"Error in test_agent_registry_add_multiple_agents: {str(e)}") + raise + + +def test_agent_registry_get_agent(): + """Test retrieving an agent from the registry.""" + try: + registry = AgentRegistry() + + agent = Agent( + agent_name="Retrievable-Agent", + agent_description="Agent for retrieval testing", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + registry.add(agent) + + retrieved_agent = registry.get("Retrievable-Agent") + + assert retrieved_agent is not None, "Retrieved agent should not be None" + assert retrieved_agent.agent_name == "Retrievable-Agent", "Agent name should match" + assert hasattr(retrieved_agent, "run"), "Agent should have run method" + assert retrieved_agent is agent, "Should return the same agent object" + + logger.info("✓ Get agent test passed") + + except Exception as e: + logger.error(f"Error in test_agent_registry_get_agent: {str(e)}") + raise + + +def test_agent_registry_delete_agent(): + """Test deleting an agent from the registry.""" + try: + registry = AgentRegistry() + + agent1 = Agent( + agent_name="Agent-To-Delete", + agent_description="Agent that will be deleted", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + agent2 = Agent( + agent_name="Agent-To-Keep", + agent_description="Agent that will remain", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + registry.add(agent1) + registry.add(agent2) + + assert len(registry.agents) == 2, "Registry should have two agents" + + registry.delete("Agent-To-Delete") + + assert len(registry.agents) == 1, "Registry should have one agent after deletion" + assert "Agent-To-Delete" not in registry.agents, "Deleted agent should not be in registry" + assert "Agent-To-Keep" in registry.agents, "Other agent should still be in registry" + + logger.info("✓ Delete agent test passed") + + except Exception as e: + logger.error(f"Error in test_agent_registry_delete_agent: {str(e)}") + raise + + +def test_agent_registry_update_agent(): + """Test updating an agent in the registry.""" + try: + registry = AgentRegistry() + + original_agent = Agent( + agent_name="Agent-To-Update", + agent_description="Original description", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + registry.add(original_agent) + + updated_agent = Agent( + agent_name="Agent-To-Update", + agent_description="Updated description", + model_name="gpt-4o-mini", + max_loops=2, + verbose=True, + print_on=False, + streaming_on=True, + ) + + registry.update_agent("Agent-To-Update", updated_agent) + + retrieved_agent = registry.get("Agent-To-Update") + + assert retrieved_agent is not None, "Updated agent should not be None" + assert retrieved_agent is updated_agent, "Should return the updated agent" + assert retrieved_agent.max_loops == 2, "Max loops should be updated" + assert retrieved_agent.verbose is True, "Verbose should be updated" + + logger.info("✓ Update agent test passed") + + except Exception as e: + logger.error(f"Error in test_agent_registry_update_agent: {str(e)}") + raise + + +def test_agent_registry_list_agents(): + """Test listing all agent names in the registry.""" + try: + registry = AgentRegistry() + + agent1 = Agent( + agent_name="List-Agent-1", + agent_description="First agent", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + agent2 = Agent( + agent_name="List-Agent-2", + agent_description="Second agent", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + registry.add(agent1) + registry.add(agent2) + + agent_names = registry.list_agents() + + assert agent_names is not None, "Agent names list should not be None" + assert isinstance(agent_names, list), "Should return a list" + assert len(agent_names) == 2, "Should have two agent names" + assert "List-Agent-1" in agent_names, "First agent name should be in list" + assert "List-Agent-2" in agent_names, "Second agent name should be in list" + + logger.info("✓ List agents test passed") + + except Exception as e: + logger.error(f"Error in test_agent_registry_list_agents: {str(e)}") + raise + + +def test_agent_registry_return_all_agents(): + """Test returning all agents from the registry.""" + try: + registry = AgentRegistry() + + agent1 = Agent( + agent_name="Return-Agent-1", + agent_description="First agent", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + agent2 = Agent( + agent_name="Return-Agent-2", + agent_description="Second agent", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + registry.add(agent1) + registry.add(agent2) + + all_agents = registry.return_all_agents() + + assert all_agents is not None, "All agents list should not be None" + assert isinstance(all_agents, list), "Should return a list" + assert len(all_agents) == 2, "Should have two agents" + + for agent in all_agents: + assert agent is not None, "Each agent should not be None" + assert hasattr(agent, "agent_name"), "Agent should have agent_name" + assert hasattr(agent, "run"), "Agent should have run method" + + logger.info("✓ Return all agents test passed") + + except Exception as e: + logger.error(f"Error in test_agent_registry_return_all_agents: {str(e)}") + raise + + +def test_agent_registry_query_with_condition(): + """Test querying agents with a condition.""" + try: + registry = AgentRegistry() + + agent1 = Agent( + agent_name="Query-Agent-1", + agent_description="Agent with max_loops=1", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + agent2 = Agent( + agent_name="Query-Agent-2", + agent_description="Agent with max_loops=2", + model_name="gpt-4o-mini", + max_loops=2, + verbose=False, + print_on=False, + streaming_on=True, + ) + + agent3 = Agent( + agent_name="Query-Agent-3", + agent_description="Agent with max_loops=1", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + registry.add(agent1) + registry.add(agent2) + registry.add(agent3) + + def condition_max_loops_1(agent): + return agent.max_loops == 1 + + filtered_agents = registry.query(condition_max_loops_1) + + assert filtered_agents is not None, "Filtered agents should not be None" + assert isinstance(filtered_agents, list), "Should return a list" + assert len(filtered_agents) == 2, "Should have two agents with max_loops=1" + + for agent in filtered_agents: + assert agent.max_loops == 1, "All filtered agents should have max_loops=1" + + logger.info("✓ Query with condition test passed") + + except Exception as e: + logger.error(f"Error in test_agent_registry_query_with_condition: {str(e)}") + raise + + +def test_agent_registry_query_without_condition(): + """Test querying all agents without a condition.""" + try: + registry = AgentRegistry() + + agent1 = Agent( + agent_name="Query-All-Agent-1", + agent_description="First agent", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + agent2 = Agent( + agent_name="Query-All-Agent-2", + agent_description="Second agent", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + registry.add(agent1) + registry.add(agent2) + + all_agents = registry.query() + + assert all_agents is not None, "All agents should not be None" + assert isinstance(all_agents, list), "Should return a list" + assert len(all_agents) == 2, "Should return all agents" + + logger.info("✓ Query without condition test passed") + + except Exception as e: + logger.error(f"Error in test_agent_registry_query_without_condition: {str(e)}") + raise + + +def test_agent_registry_find_agent_by_name(): + """Test finding an agent by name.""" + try: + registry = AgentRegistry() + + agent = Agent( + agent_name="Findable-Agent", + agent_description="Agent to find", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + registry.add(agent) + + found_agent = registry.find_agent_by_name("Findable-Agent") + + assert found_agent is not None, "Found agent should not be None" + assert found_agent.agent_name == "Findable-Agent", "Agent name should match" + assert hasattr(found_agent, "run"), "Agent should have run method" + + logger.info("✓ Find agent by name test passed") + + except Exception as e: + logger.error(f"Error in test_agent_registry_find_agent_by_name: {str(e)}") + raise + + +def test_agent_registry_find_agent_by_id(): + """Test finding an agent by ID.""" + try: + registry = AgentRegistry() + + agent = Agent( + agent_name="ID-Agent", + agent_description="Agent with ID", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + registry.add(agent) + + agent_id = agent.id + found_agent = registry.find_agent_by_id(agent.agent_name) + + assert found_agent is not None, "Found agent should not be None" + assert found_agent.agent_name == "ID-Agent", "Agent name should match" + + logger.info("✓ Find agent by ID test passed") + + except Exception as e: + logger.error(f"Error in test_agent_registry_find_agent_by_id: {str(e)}") + raise + + +def test_agent_registry_agents_to_json(): + """Test converting agents to JSON.""" + try: + registry = AgentRegistry() + + agent1 = Agent( + agent_name="JSON-Agent-1", + agent_description="First agent for JSON", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + agent2 = Agent( + agent_name="JSON-Agent-2", + agent_description="Second agent for JSON", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + registry.add(agent1) + registry.add(agent2) + + json_output = registry.agents_to_json() + + assert json_output is not None, "JSON output should not be None" + assert isinstance(json_output, str), "Should return a string" + assert len(json_output) > 0, "JSON should not be empty" + assert "JSON-Agent-1" in json_output, "First agent should be in JSON" + assert "JSON-Agent-2" in json_output, "Second agent should be in JSON" + + import json + parsed_json = json.loads(json_output) + assert isinstance(parsed_json, dict), "Should be valid JSON dict" + assert len(parsed_json) == 2, "Should have two agents in JSON" + + logger.info("✓ Agents to JSON test passed") + + except Exception as e: + logger.error(f"Error in test_agent_registry_agents_to_json: {str(e)}") + raise + + +def test_agent_registry_initialization_with_agents(): + """Test initializing registry with agents.""" + try: + agent1 = Agent( + agent_name="Init-Agent-1", + agent_description="First initial agent", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + agent2 = Agent( + agent_name="Init-Agent-2", + agent_description="Second initial agent", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + registry = AgentRegistry(agents=[agent1, agent2]) + + assert registry is not None, "Registry should not be None" + assert len(registry.agents) == 2, "Registry should have two agents" + assert "Init-Agent-1" in registry.agents, "First agent should be in registry" + assert "Init-Agent-2" in registry.agents, "Second agent should be in registry" + + logger.info("✓ Initialize with agents test passed") + + except Exception as e: + logger.error(f"Error in test_agent_registry_initialization_with_agents: {str(e)}") + raise + + +def test_agent_registry_error_duplicate_agent(): + """Test error handling for duplicate agent names.""" + try: + registry = AgentRegistry() + + agent1 = Agent( + agent_name="Duplicate-Agent", + agent_description="First agent", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + agent2 = Agent( + agent_name="Duplicate-Agent", + agent_description="Duplicate agent", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + registry.add(agent1) + + try: + registry.add(agent2) + assert False, "Should have raised ValueError for duplicate agent" + except ValueError as e: + assert "already exists" in str(e).lower(), "Error message should mention duplicate" + assert len(registry.agents) == 1, "Registry should still have only one agent" + + logger.info("✓ Error handling for duplicate agent test passed") + + except Exception as e: + logger.error(f"Error in test_agent_registry_error_duplicate_agent: {str(e)}") + raise + + +def test_agent_registry_error_nonexistent_agent(): + """Test error handling for nonexistent agent.""" + try: + registry = AgentRegistry() + + try: + registry.get("Nonexistent-Agent") + assert False, "Should have raised KeyError for nonexistent agent" + except KeyError as e: + assert e is not None, "Should raise KeyError" + + try: + registry.delete("Nonexistent-Agent") + assert False, "Should have raised KeyError for nonexistent agent" + except KeyError as e: + assert e is not None, "Should raise KeyError" + + logger.info("✓ Error handling for nonexistent agent test passed") + + except Exception as e: + logger.error(f"Error in test_agent_registry_error_nonexistent_agent: {str(e)}") + raise + + +def test_agent_registry_retrieved_agents_can_run(): + """Test that retrieved agents can actually run tasks.""" + try: + registry = AgentRegistry() + + agent = Agent( + agent_name="Runnable-Registry-Agent", + agent_description="Agent for running tasks", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + registry.add(agent) + + retrieved_agent = registry.get("Runnable-Registry-Agent") + + assert retrieved_agent is not None, "Retrieved agent should not be None" + + result = retrieved_agent.run("What is 2 + 2? Answer briefly.") + + assert result is not None, "Agent run result should not be None" + assert isinstance(result, str), "Result should be a string" + assert len(result) > 0, "Result should not be empty" + + logger.info("✓ Retrieved agents can run test passed") + + except Exception as e: + logger.error(f"Error in test_agent_registry_retrieved_agents_can_run: {str(e)}") + raise + + +def test_agent_registry_thread_safety(): + """Test thread safety of registry operations.""" + try: + registry = AgentRegistry() + + agent1 = Agent( + agent_name="Thread-Agent-1", + agent_description="First thread agent", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + agent2 = Agent( + agent_name="Thread-Agent-2", + agent_description="Second thread agent", + model_name="gpt-4o-mini", + max_loops=1, + verbose=False, + print_on=False, + streaming_on=True, + ) + + registry.add(agent1) + registry.add(agent2) + + agent_names = registry.list_agents() + all_agents = registry.return_all_agents() + + assert agent_names is not None, "Agent names should not be None" + assert all_agents is not None, "All agents should not be None" + assert len(agent_names) == 2, "Should have two agent names" + assert len(all_agents) == 2, "Should have two agents" + + logger.info("✓ Thread safety test passed") + + except Exception as e: + logger.error(f"Error in test_agent_registry_thread_safety: {str(e)}") + raise + + +if __name__ == "__main__": + import sys + + test_dict = { + "test_agent_registry_initialization": test_agent_registry_initialization, + "test_agent_registry_add_single_agent": test_agent_registry_add_single_agent, + "test_agent_registry_add_multiple_agents": test_agent_registry_add_multiple_agents, + "test_agent_registry_get_agent": test_agent_registry_get_agent, + "test_agent_registry_delete_agent": test_agent_registry_delete_agent, + "test_agent_registry_update_agent": test_agent_registry_update_agent, + "test_agent_registry_list_agents": test_agent_registry_list_agents, + "test_agent_registry_return_all_agents": test_agent_registry_return_all_agents, + "test_agent_registry_query_with_condition": test_agent_registry_query_with_condition, + "test_agent_registry_query_without_condition": test_agent_registry_query_without_condition, + "test_agent_registry_find_agent_by_name": test_agent_registry_find_agent_by_name, + "test_agent_registry_find_agent_by_id": test_agent_registry_find_agent_by_id, + "test_agent_registry_agents_to_json": test_agent_registry_agents_to_json, + "test_agent_registry_initialization_with_agents": test_agent_registry_initialization_with_agents, + "test_agent_registry_error_duplicate_agent": test_agent_registry_error_duplicate_agent, + "test_agent_registry_error_nonexistent_agent": test_agent_registry_error_nonexistent_agent, + "test_agent_registry_retrieved_agents_can_run": test_agent_registry_retrieved_agents_can_run, + "test_agent_registry_thread_safety": test_agent_registry_thread_safety, + } + + if len(sys.argv) > 1: + requested_tests = [] + for test_name in sys.argv[1:]: + if test_name in test_dict: + requested_tests.append(test_dict[test_name]) + elif test_name == "all" or test_name == "--all": + requested_tests = list(test_dict.values()) + break + else: + print(f"⚠ Warning: Test '{test_name}' not found.") + print(f"Available tests: {', '.join(test_dict.keys())}") + sys.exit(1) + + tests_to_run = requested_tests + else: + tests_to_run = list(test_dict.values()) + + if len(tests_to_run) == 1: + print(f"Running: {tests_to_run[0].__name__}") + else: + print(f"Running {len(tests_to_run)} test(s)...") + + passed = 0 + failed = 0 + + for test_func in tests_to_run: + try: + print(f"\n{'='*60}") + print(f"Running: {test_func.__name__}") + print(f"{'='*60}") + test_func() + print(f"✓ PASSED: {test_func.__name__}") + passed += 1 + except Exception as e: + print(f"✗ FAILED: {test_func.__name__}") + print(f" Error: {str(e)}") + import traceback + traceback.print_exc() + failed += 1 + + print(f"\n{'='*60}") + print(f"Test Summary: {passed} passed, {failed} failed") + print(f"{'='*60}") + + if len(sys.argv) == 1: + print("\n💡 Tip: Run a specific test with:") + print(" python test_agent_registry.py test_agent_registry_initialization") + print("\n Or use pytest:") + print(" pytest test_agent_registry.py") + print(" pytest test_agent_registry.py::test_agent_registry_initialization") + diff --git a/tests/structs/test_base_structure.py b/tests/structs/test_base_structure.py new file mode 100644 index 00000000..8a7ce1bf --- /dev/null +++ b/tests/structs/test_base_structure.py @@ -0,0 +1,810 @@ +import os +import tempfile +import asyncio +import json + +try: + import pytest +except ImportError: + pytest = None + +from loguru import logger + +try: + from swarms.structs.base_structure import BaseStructure +except (ImportError, ModuleNotFoundError) as e: + import importlib.util + _current_dir = os.path.dirname(os.path.abspath(__file__)) + + base_structure_path = os.path.join(_current_dir, "..", "..", "swarms", "structs", "base_structure.py") + + if os.path.exists(base_structure_path): + spec = importlib.util.spec_from_file_location("base_structure", base_structure_path) + base_structure_module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(base_structure_module) + BaseStructure = base_structure_module.BaseStructure + else: + raise ImportError(f"Could not find base_structure.py at {base_structure_path}") from e + +logger.remove() +logger.add(lambda msg: None, level="ERROR") + + +class TestStructure(BaseStructure): + def run(self, task: str = "test"): + return f"Processed: {task}" + + +def test_base_structure_initialization(): + """Test BaseStructure initialization.""" + try: + structure = BaseStructure() + assert structure is not None, "BaseStructure should not be None" + assert structure.name is None, "Default name should be None" + assert structure.description is None, "Default description should be None" + assert structure.save_metadata_on is True, "save_metadata_on should default to True" + assert structure.save_artifact_path == "./artifacts", "Default artifact path should be set" + assert structure.save_metadata_path == "./metadata", "Default metadata path should be set" + assert structure.save_error_path == "./errors", "Default error path should be set" + assert structure.workspace_dir == "./workspace", "Default workspace dir should be set" + + structure2 = BaseStructure( + name="TestStructure", + description="Test description", + save_metadata_on=False, + save_artifact_path="/tmp/artifacts", + save_metadata_path="/tmp/metadata", + save_error_path="/tmp/errors", + workspace_dir="/tmp/workspace" + ) + assert structure2.name == "TestStructure", "Custom name should be set" + assert structure2.description == "Test description", "Custom description should be set" + assert structure2.save_metadata_on is False, "save_metadata_on should be False" + assert structure2.save_artifact_path == "/tmp/artifacts", "Custom artifact path should be set" + + logger.info("✓ BaseStructure initialization test passed") + + except Exception as e: + logger.error(f"Error in test_base_structure_initialization: {str(e)}") + raise + + +def test_save_and_load_file(): + """Test saving and loading files.""" + try: + with tempfile.TemporaryDirectory() as tmpdir: + structure = BaseStructure(name="TestFileOps") + test_file = os.path.join(tmpdir, "test_data.json") + test_data = {"key": "value", "number": 42, "list": [1, 2, 3]} + + structure.save_to_file(test_data, test_file) + + assert os.path.exists(test_file), "File should be created" + + loaded_data = structure.load_from_file(test_file) + + assert loaded_data is not None, "Loaded data should not be None" + assert isinstance(loaded_data, dict), "Loaded data should be a dict" + assert loaded_data["key"] == "value", "Data should match" + assert loaded_data["number"] == 42, "Number should match" + assert loaded_data["list"] == [1, 2, 3], "List should match" + + logger.info("✓ Save and load file test passed") + + except Exception as e: + logger.error(f"Error in test_save_and_load_file: {str(e)}") + raise + + +def test_save_and_load_metadata(): + """Test saving and loading metadata.""" + try: + with tempfile.TemporaryDirectory() as tmpdir: + structure = BaseStructure( + name="TestMetadata", + save_metadata_path=tmpdir + ) + metadata = {"timestamp": "2024-01-01", "status": "active", "count": 5} + + structure.save_metadata(metadata) + + metadata_file = os.path.join(tmpdir, "TestMetadata_metadata.json") + assert os.path.exists(metadata_file), "Metadata file should be created" + + loaded_metadata = structure.load_metadata() + + assert loaded_metadata is not None, "Loaded metadata should not be None" + assert isinstance(loaded_metadata, dict), "Metadata should be a dict" + assert loaded_metadata["status"] == "active", "Metadata should match" + assert loaded_metadata["count"] == 5, "Count should match" + + logger.info("✓ Save and load metadata test passed") + + except Exception as e: + logger.error(f"Error in test_save_and_load_metadata: {str(e)}") + raise + + +def test_save_and_load_artifact(): + """Test saving and loading artifacts.""" + try: + with tempfile.TemporaryDirectory() as tmpdir: + structure = BaseStructure( + name="TestArtifact", + save_artifact_path=tmpdir + ) + artifact = {"result": "success", "data": [1, 2, 3, 4, 5]} + + structure.save_artifact(artifact, "test_artifact") + + artifact_file = os.path.join(tmpdir, "test_artifact.json") + assert os.path.exists(artifact_file), "Artifact file should be created" + + loaded_artifact = structure.load_artifact("test_artifact") + + assert loaded_artifact is not None, "Loaded artifact should not be None" + assert isinstance(loaded_artifact, dict), "Artifact should be a dict" + assert loaded_artifact["result"] == "success", "Artifact result should match" + assert len(loaded_artifact["data"]) == 5, "Artifact data should match" + + logger.info("✓ Save and load artifact test passed") + + except Exception as e: + logger.error(f"Error in test_save_and_load_artifact: {str(e)}") + raise + + +def test_log_error(): + """Test error logging.""" + try: + with tempfile.TemporaryDirectory() as tmpdir: + structure = BaseStructure( + name="TestErrorLog", + save_error_path=tmpdir + ) + error_message = "Test error message" + + structure.log_error(error_message) + + error_file = os.path.join(tmpdir, "TestErrorLog_errors.log") + assert os.path.exists(error_file), "Error log file should be created" + + with open(error_file, "r") as f: + content = f.read() + assert error_message in content, "Error message should be in log" + + structure.log_error("Another error") + + with open(error_file, "r") as f: + content = f.read() + assert "Another error" in content, "Second error should be in log" + + logger.info("✓ Log error test passed") + + except Exception as e: + logger.error(f"Error in test_log_error: {str(e)}") + raise + + +def test_log_event(): + """Test event logging.""" + try: + with tempfile.TemporaryDirectory() as tmpdir: + structure = BaseStructure( + name="TestEventLog", + save_metadata_path=tmpdir + ) + event_message = "Test event occurred" + + structure.log_event(event_message, "INFO") + + event_file = os.path.join(tmpdir, "TestEventLog_events.log") + assert os.path.exists(event_file), "Event log file should be created" + + with open(event_file, "r") as f: + content = f.read() + assert event_message in content, "Event message should be in log" + assert "INFO" in content, "Event type should be in log" + + structure.log_event("Warning event", "WARNING") + + with open(event_file, "r") as f: + content = f.read() + assert "WARNING" in content, "Warning type should be in log" + + logger.info("✓ Log event test passed") + + except Exception as e: + logger.error(f"Error in test_log_event: {str(e)}") + raise + + +def test_compress_and_decompress_data(): + """Test data compression and decompression.""" + try: + structure = BaseStructure() + test_data = {"key": "value", "large_data": "x" * 1000} + + compressed = structure.compress_data(test_data) + + assert compressed is not None, "Compressed data should not be None" + assert isinstance(compressed, bytes), "Compressed data should be bytes" + assert len(compressed) < len(json.dumps(test_data).encode()), "Compressed should be smaller" + + decompressed = structure.decompres_data(compressed) + + assert decompressed is not None, "Decompressed data should not be None" + assert isinstance(decompressed, dict), "Decompressed data should be a dict" + assert decompressed["key"] == "value", "Decompressed data should match" + assert len(decompressed["large_data"]) == 1000, "Large data should match" + + logger.info("✓ Compress and decompress data test passed") + + except Exception as e: + logger.error(f"Error in test_compress_and_decompress_data: {str(e)}") + raise + + +def test_to_dict(): + """Test converting structure to dictionary.""" + try: + structure = BaseStructure( + name="TestDict", + description="Test description" + ) + + structure_dict = structure.to_dict() + + assert structure_dict is not None, "Dictionary should not be None" + assert isinstance(structure_dict, dict), "Should return a dict" + assert structure_dict["name"] == "TestDict", "Name should be in dict" + assert structure_dict["description"] == "Test description", "Description should be in dict" + + logger.info("✓ To dict test passed") + + except Exception as e: + logger.error(f"Error in test_to_dict: {str(e)}") + raise + + +def test_to_json(): + """Test converting structure to JSON.""" + try: + structure = BaseStructure( + name="TestJSON", + description="Test JSON description" + ) + + json_output = structure.to_json() + + assert json_output is not None, "JSON output should not be None" + assert isinstance(json_output, str), "Should return a string" + assert "TestJSON" in json_output, "Name should be in JSON" + assert "Test JSON description" in json_output, "Description should be in JSON" + + parsed = json.loads(json_output) + assert isinstance(parsed, dict), "Should be valid JSON dict" + + logger.info("✓ To JSON test passed") + + except Exception as e: + logger.error(f"Error in test_to_json: {str(e)}") + raise + + +def test_to_yaml(): + """Test converting structure to YAML.""" + try: + structure = BaseStructure( + name="TestYAML", + description="Test YAML description" + ) + + yaml_output = structure.to_yaml() + + assert yaml_output is not None, "YAML output should not be None" + assert isinstance(yaml_output, str), "Should return a string" + assert "TestYAML" in yaml_output, "Name should be in YAML" + + logger.info("✓ To YAML test passed") + + except Exception as e: + logger.error(f"Error in test_to_yaml: {str(e)}") + raise + + +def test_to_toml(): + """Test converting structure to TOML.""" + try: + structure = BaseStructure( + name="TestTOML", + description="Test TOML description" + ) + + toml_output = structure.to_toml() + + assert toml_output is not None, "TOML output should not be None" + assert isinstance(toml_output, str), "Should return a string" + + logger.info("✓ To TOML test passed") + + except Exception as e: + logger.error(f"Error in test_to_toml: {str(e)}") + raise + + +def test_run_async(): + """Test async run method.""" + try: + structure = TestStructure(name="TestAsync") + + async def run_test(): + result = await structure.run_async("test_task") + return result + + result = asyncio.run(run_test()) + + assert result is not None, "Async result should not be None" + + logger.info("✓ Run async test passed") + + except Exception as e: + logger.error(f"Error in test_run_async: {str(e)}") + raise + + +def test_save_metadata_async(): + """Test async save metadata.""" + try: + with tempfile.TemporaryDirectory() as tmpdir: + structure = BaseStructure( + name="TestAsyncMetadata", + save_metadata_path=tmpdir + ) + metadata = {"async": "test", "value": 123} + + async def save_test(): + await structure.save_metadata_async(metadata) + + asyncio.run(save_test()) + + loaded = structure.load_metadata() + + assert loaded is not None, "Loaded metadata should not be None" + assert loaded["async"] == "test", "Metadata should match" + + logger.info("✓ Save metadata async test passed") + + except Exception as e: + logger.error(f"Error in test_save_metadata_async: {str(e)}") + raise + + +def test_load_metadata_async(): + """Test async load metadata.""" + try: + with tempfile.TemporaryDirectory() as tmpdir: + structure = BaseStructure( + name="TestAsyncLoad", + save_metadata_path=tmpdir + ) + metadata = {"load": "async", "number": 456} + structure.save_metadata(metadata) + + async def load_test(): + return await structure.load_metadata_async() + + loaded = asyncio.run(load_test()) + + assert loaded is not None, "Loaded metadata should not be None" + assert loaded["load"] == "async", "Metadata should match" + + logger.info("✓ Load metadata async test passed") + + except Exception as e: + logger.error(f"Error in test_load_metadata_async: {str(e)}") + raise + + +def test_save_artifact_async(): + """Test async save artifact.""" + try: + with tempfile.TemporaryDirectory() as tmpdir: + structure = BaseStructure( + name="TestAsyncArtifact", + save_artifact_path=tmpdir + ) + artifact = {"async_artifact": True, "data": [1, 2, 3]} + + async def save_test(): + await structure.save_artifact_async(artifact, "async_artifact") + + asyncio.run(save_test()) + + loaded = structure.load_artifact("async_artifact") + + assert loaded is not None, "Loaded artifact should not be None" + assert loaded["async_artifact"] is True, "Artifact should match" + + logger.info("✓ Save artifact async test passed") + + except Exception as e: + logger.error(f"Error in test_save_artifact_async: {str(e)}") + raise + + +def test_load_artifact_async(): + """Test async load artifact.""" + try: + with tempfile.TemporaryDirectory() as tmpdir: + structure = BaseStructure( + name="TestAsyncLoadArtifact", + save_artifact_path=tmpdir + ) + artifact = {"load_async": True, "items": ["a", "b", "c"]} + structure.save_artifact(artifact, "load_async_artifact") + + async def load_test(): + return await structure.load_artifact_async("load_async_artifact") + + loaded = asyncio.run(load_test()) + + assert loaded is not None, "Loaded artifact should not be None" + assert loaded["load_async"] is True, "Artifact should match" + + logger.info("✓ Load artifact async test passed") + + except Exception as e: + logger.error(f"Error in test_load_artifact_async: {str(e)}") + raise + + +def test_asave_and_aload_from_file(): + """Test async save and load from file.""" + try: + with tempfile.TemporaryDirectory() as tmpdir: + structure = BaseStructure() + test_file = os.path.join(tmpdir, "async_test.json") + test_data = {"async": "file", "test": True} + + async def save_and_load(): + await structure.asave_to_file(test_data, test_file) + return await structure.aload_from_file(test_file) + + loaded = asyncio.run(save_and_load()) + + assert loaded is not None, "Loaded data should not be None" + assert loaded["async"] == "file", "Data should match" + assert loaded["test"] is True, "Boolean should match" + + logger.info("✓ Async save and load from file test passed") + + except Exception as e: + logger.error(f"Error in test_asave_and_aload_from_file: {str(e)}") + raise + + +def test_run_in_thread(): + """Test running in thread.""" + try: + structure = TestStructure(name="TestThread") + + future = structure.run_in_thread("thread_task") + result = future.result() + + assert result is not None, "Thread result should not be None" + + logger.info("✓ Run in thread test passed") + + except Exception as e: + logger.error(f"Error in test_run_in_thread: {str(e)}") + raise + + +def test_save_metadata_in_thread(): + """Test saving metadata in thread.""" + try: + with tempfile.TemporaryDirectory() as tmpdir: + structure = BaseStructure( + name="TestThreadMetadata", + save_metadata_path=tmpdir + ) + metadata = {"thread": "test", "value": 789} + + future = structure.save_metadata_in_thread(metadata) + future.result() + + loaded = structure.load_metadata() + + assert loaded is not None, "Loaded metadata should not be None" + assert loaded["thread"] == "test", "Metadata should match" + + logger.info("✓ Save metadata in thread test passed") + + except Exception as e: + logger.error(f"Error in test_save_metadata_in_thread: {str(e)}") + raise + + +def test_run_batched(): + """Test batched execution.""" + try: + structure = TestStructure(name="TestBatched") + batched_data = ["task1", "task2", "task3", "task4", "task5"] + + results = structure.run_batched(batched_data, batch_size=3) + + assert results is not None, "Results should not be None" + assert isinstance(results, list), "Results should be a list" + assert len(results) == 5, "Should have 5 results" + + for result in results: + assert result is not None, "Each result should not be None" + assert "Processed:" in result, "Result should contain processed message" + + logger.info("✓ Run batched test passed") + + except Exception as e: + logger.error(f"Error in test_run_batched: {str(e)}") + raise + + +def test_load_config(): + """Test loading configuration.""" + try: + with tempfile.TemporaryDirectory() as tmpdir: + structure = BaseStructure() + config_file = os.path.join(tmpdir, "config.json") + config_data = {"setting1": "value1", "setting2": 42} + + structure.save_to_file(config_data, config_file) + + loaded_config = structure.load_config(config_file) + + assert loaded_config is not None, "Loaded config should not be None" + assert isinstance(loaded_config, dict), "Config should be a dict" + assert loaded_config["setting1"] == "value1", "Config should match" + assert loaded_config["setting2"] == 42, "Config number should match" + + logger.info("✓ Load config test passed") + + except Exception as e: + logger.error(f"Error in test_load_config: {str(e)}") + raise + + +def test_backup_data(): + """Test backing up data.""" + try: + with tempfile.TemporaryDirectory() as tmpdir: + structure = BaseStructure() + backup_path = os.path.join(tmpdir, "backups") + os.makedirs(backup_path, exist_ok=True) + + backup_data = {"backup": "test", "items": [1, 2, 3]} + + structure.backup_data(backup_data, backup_path) + + backup_files = os.listdir(backup_path) + assert len(backup_files) > 0, "Backup file should be created" + + backup_file = os.path.join(backup_path, backup_files[0]) + loaded_backup = structure.load_from_file(backup_file) + + assert loaded_backup is not None, "Loaded backup should not be None" + assert loaded_backup["backup"] == "test", "Backup data should match" + + logger.info("✓ Backup data test passed") + + except Exception as e: + logger.error(f"Error in test_backup_data: {str(e)}") + raise + + +def test_monitor_resources(): + """Test resource monitoring.""" + try: + with tempfile.TemporaryDirectory() as tmpdir: + structure = BaseStructure( + name="TestResources", + save_metadata_path=tmpdir + ) + + structure.monitor_resources() + + event_file = os.path.join(tmpdir, "TestResources_events.log") + assert os.path.exists(event_file), "Event log should be created" + + with open(event_file, "r") as f: + content = f.read() + assert "Resource usage" in content, "Resource usage should be logged" + assert "Memory" in content, "Memory should be logged" + assert "CPU" in content, "CPU should be logged" + + logger.info("✓ Monitor resources test passed") + + except Exception as e: + logger.error(f"Error in test_monitor_resources: {str(e)}") + raise + + +def test_run_with_resources(): + """Test running with resource monitoring.""" + try: + with tempfile.TemporaryDirectory() as tmpdir: + structure = TestStructure( + name="TestRunResources", + save_metadata_path=tmpdir + ) + + result = structure.run_with_resources("monitored_task") + + assert result is not None, "Result should not be None" + + event_file = os.path.join(tmpdir, "TestRunResources_events.log") + assert os.path.exists(event_file), "Event log should be created" + + logger.info("✓ Run with resources test passed") + + except Exception as e: + logger.error(f"Error in test_run_with_resources: {str(e)}") + raise + + +def test_run_with_resources_batched(): + """Test batched execution with resource monitoring.""" + try: + with tempfile.TemporaryDirectory() as tmpdir: + structure = TestStructure( + name="TestBatchedResources", + save_metadata_path=tmpdir + ) + batched_data = ["task1", "task2", "task3"] + + results = structure.run_with_resources_batched(batched_data, batch_size=2) + + assert results is not None, "Results should not be None" + assert isinstance(results, list), "Results should be a list" + assert len(results) == 3, "Should have 3 results" + + event_file = os.path.join(tmpdir, "TestBatchedResources_events.log") + assert os.path.exists(event_file), "Event log should be created" + + logger.info("✓ Run with resources batched test passed") + + except Exception as e: + logger.error(f"Error in test_run_with_resources_batched: {str(e)}") + raise + + +def test_serialize_callable(): + """Test serializing callable attributes.""" + try: + def test_function(): + """Test function docstring.""" + pass + + structure = BaseStructure() + serialized = structure._serialize_callable(test_function) + + assert serialized is not None, "Serialized callable should not be None" + assert isinstance(serialized, dict), "Should return a dict" + assert "name" in serialized, "Should have name" + assert "doc" in serialized, "Should have doc" + assert serialized["name"] == "test_function", "Name should match" + + logger.info("✓ Serialize callable test passed") + + except Exception as e: + logger.error(f"Error in test_serialize_callable: {str(e)}") + raise + + +def test_serialize_attr(): + """Test serializing attributes.""" + try: + structure = BaseStructure() + + serialized_str = structure._serialize_attr("test_attr", "test_value") + assert serialized_str == "test_value", "String should serialize correctly" + + serialized_dict = structure._serialize_attr("test_attr", {"key": "value"}) + assert serialized_dict == {"key": "value"}, "Dict should serialize correctly" + + def test_func(): + pass + + serialized_func = structure._serialize_attr("test_func", test_func) + assert isinstance(serialized_func, dict), "Function should serialize to dict" + + logger.info("✓ Serialize attr test passed") + + except Exception as e: + logger.error(f"Error in test_serialize_attr: {str(e)}") + raise + + +if __name__ == "__main__": + import sys + + test_dict = { + "test_base_structure_initialization": test_base_structure_initialization, + "test_save_and_load_file": test_save_and_load_file, + "test_save_and_load_metadata": test_save_and_load_metadata, + "test_save_and_load_artifact": test_save_and_load_artifact, + "test_log_error": test_log_error, + "test_log_event": test_log_event, + "test_compress_and_decompress_data": test_compress_and_decompress_data, + "test_to_dict": test_to_dict, + "test_to_json": test_to_json, + "test_to_yaml": test_to_yaml, + "test_to_toml": test_to_toml, + "test_run_async": test_run_async, + "test_save_metadata_async": test_save_metadata_async, + "test_load_metadata_async": test_load_metadata_async, + "test_save_artifact_async": test_save_artifact_async, + "test_load_artifact_async": test_load_artifact_async, + "test_asave_and_aload_from_file": test_asave_and_aload_from_file, + "test_run_in_thread": test_run_in_thread, + "test_save_metadata_in_thread": test_save_metadata_in_thread, + "test_run_batched": test_run_batched, + "test_load_config": test_load_config, + "test_backup_data": test_backup_data, + "test_monitor_resources": test_monitor_resources, + "test_run_with_resources": test_run_with_resources, + "test_run_with_resources_batched": test_run_with_resources_batched, + "test_serialize_callable": test_serialize_callable, + "test_serialize_attr": test_serialize_attr, + } + + if len(sys.argv) > 1: + requested_tests = [] + for test_name in sys.argv[1:]: + if test_name in test_dict: + requested_tests.append(test_dict[test_name]) + elif test_name == "all" or test_name == "--all": + requested_tests = list(test_dict.values()) + break + else: + print(f"⚠ Warning: Test '{test_name}' not found.") + print(f"Available tests: {', '.join(test_dict.keys())}") + sys.exit(1) + + tests_to_run = requested_tests + else: + tests_to_run = list(test_dict.values()) + + if len(tests_to_run) == 1: + print(f"Running: {tests_to_run[0].__name__}") + else: + print(f"Running {len(tests_to_run)} test(s)...") + + passed = 0 + failed = 0 + + for test_func in tests_to_run: + try: + print(f"\n{'='*60}") + print(f"Running: {test_func.__name__}") + print(f"{'='*60}") + test_func() + print(f"✓ PASSED: {test_func.__name__}") + passed += 1 + except Exception as e: + print(f"✗ FAILED: {test_func.__name__}") + print(f" Error: {str(e)}") + import traceback + traceback.print_exc() + failed += 1 + + print(f"\n{'='*60}") + print(f"Test Summary: {passed} passed, {failed} failed") + print(f"{'='*60}") + + if len(sys.argv) == 1: + print("\n💡 Tip: Run a specific test with:") + print(" python test_base_structure.py test_base_structure_initialization") + print("\n Or use pytest:") + print(" pytest test_base_structure.py") + print(" pytest test_base_structure.py::test_base_structure_initialization") +