commit
c5c769306f
@ -1,287 +0,0 @@
|
|||||||
import os
|
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from swarms.structs.base_structure import BaseStructure
|
|
||||||
|
|
||||||
|
|
||||||
class TestBaseStructure:
|
|
||||||
def test_init(self):
|
|
||||||
base_structure = BaseStructure(
|
|
||||||
name="TestStructure",
|
|
||||||
description="Test description",
|
|
||||||
save_metadata=True,
|
|
||||||
save_artifact_path="./test_artifacts",
|
|
||||||
save_metadata_path="./test_metadata",
|
|
||||||
save_error_path="./test_errors",
|
|
||||||
)
|
|
||||||
|
|
||||||
assert base_structure.name == "TestStructure"
|
|
||||||
assert base_structure.description == "Test description"
|
|
||||||
assert base_structure.save_metadata is True
|
|
||||||
assert base_structure.save_artifact_path == "./test_artifacts"
|
|
||||||
assert base_structure.save_metadata_path == "./test_metadata"
|
|
||||||
assert base_structure.save_error_path == "./test_errors"
|
|
||||||
|
|
||||||
def test_save_to_file_and_load_from_file(self, tmpdir):
|
|
||||||
tmp_dir = tmpdir.mkdir("test_dir")
|
|
||||||
file_path = os.path.join(tmp_dir, "test_file.json")
|
|
||||||
|
|
||||||
data_to_save = {"key": "value"}
|
|
||||||
base_structure = BaseStructure()
|
|
||||||
|
|
||||||
base_structure.save_to_file(data_to_save, file_path)
|
|
||||||
loaded_data = base_structure.load_from_file(file_path)
|
|
||||||
|
|
||||||
assert loaded_data == data_to_save
|
|
||||||
|
|
||||||
def test_save_metadata_and_load_metadata(self, tmpdir):
|
|
||||||
tmp_dir = tmpdir.mkdir("test_dir")
|
|
||||||
base_structure = BaseStructure(save_metadata_path=tmp_dir)
|
|
||||||
|
|
||||||
metadata = {"name": "Test", "description": "Test metadata"}
|
|
||||||
base_structure.save_metadata(metadata)
|
|
||||||
loaded_metadata = base_structure.load_metadata()
|
|
||||||
|
|
||||||
assert loaded_metadata == metadata
|
|
||||||
|
|
||||||
def test_log_error(self, tmpdir):
|
|
||||||
tmp_dir = tmpdir.mkdir("test_dir")
|
|
||||||
base_structure = BaseStructure(save_error_path=tmp_dir)
|
|
||||||
|
|
||||||
error_message = "Test error message"
|
|
||||||
base_structure.log_error(error_message)
|
|
||||||
|
|
||||||
log_file = os.path.join(tmp_dir, "TestStructure_errors.log")
|
|
||||||
with open(log_file) as file:
|
|
||||||
lines = file.readlines()
|
|
||||||
assert len(lines) == 1
|
|
||||||
assert lines[0] == f"{error_message}\n"
|
|
||||||
|
|
||||||
def test_save_artifact_and_load_artifact(self, tmpdir):
|
|
||||||
tmp_dir = tmpdir.mkdir("test_dir")
|
|
||||||
base_structure = BaseStructure(save_artifact_path=tmp_dir)
|
|
||||||
|
|
||||||
artifact = {"key": "value"}
|
|
||||||
artifact_name = "test_artifact"
|
|
||||||
base_structure.save_artifact(artifact, artifact_name)
|
|
||||||
loaded_artifact = base_structure.load_artifact(artifact_name)
|
|
||||||
|
|
||||||
assert loaded_artifact == artifact
|
|
||||||
|
|
||||||
def test_current_timestamp(self):
|
|
||||||
base_structure = BaseStructure()
|
|
||||||
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
||||||
timestamp = base_structure._current_timestamp()
|
|
||||||
assert timestamp == current_time
|
|
||||||
|
|
||||||
def test_log_event(self, tmpdir):
|
|
||||||
tmp_dir = tmpdir.mkdir("test_dir")
|
|
||||||
base_structure = BaseStructure(save_metadata_path=tmp_dir)
|
|
||||||
|
|
||||||
event = "Test event"
|
|
||||||
event_type = "INFO"
|
|
||||||
base_structure.log_event(event, event_type)
|
|
||||||
|
|
||||||
log_file = os.path.join(tmp_dir, "TestStructure_events.log")
|
|
||||||
with open(log_file) as file:
|
|
||||||
lines = file.readlines()
|
|
||||||
assert len(lines) == 1
|
|
||||||
assert (
|
|
||||||
lines[0] == f"[{base_structure._current_timestamp()}]"
|
|
||||||
f" [{event_type}] {event}\n"
|
|
||||||
)
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_run_async(self):
|
|
||||||
base_structure = BaseStructure()
|
|
||||||
|
|
||||||
async def async_function():
|
|
||||||
return "Async Test Result"
|
|
||||||
|
|
||||||
result = await base_structure.run_async(async_function)
|
|
||||||
assert result == "Async Test Result"
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_save_metadata_async(self, tmpdir):
|
|
||||||
tmp_dir = tmpdir.mkdir("test_dir")
|
|
||||||
base_structure = BaseStructure(save_metadata_path=tmp_dir)
|
|
||||||
|
|
||||||
metadata = {"name": "Test", "description": "Test metadata"}
|
|
||||||
await base_structure.save_metadata_async(metadata)
|
|
||||||
loaded_metadata = base_structure.load_metadata()
|
|
||||||
|
|
||||||
assert loaded_metadata == metadata
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_log_error_async(self, tmpdir):
|
|
||||||
tmp_dir = tmpdir.mkdir("test_dir")
|
|
||||||
base_structure = BaseStructure(save_error_path=tmp_dir)
|
|
||||||
|
|
||||||
error_message = "Test error message"
|
|
||||||
await base_structure.log_error_async(error_message)
|
|
||||||
|
|
||||||
log_file = os.path.join(tmp_dir, "TestStructure_errors.log")
|
|
||||||
with open(log_file) as file:
|
|
||||||
lines = file.readlines()
|
|
||||||
assert len(lines) == 1
|
|
||||||
assert lines[0] == f"{error_message}\n"
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_save_artifact_async(self, tmpdir):
|
|
||||||
tmp_dir = tmpdir.mkdir("test_dir")
|
|
||||||
base_structure = BaseStructure(save_artifact_path=tmp_dir)
|
|
||||||
|
|
||||||
artifact = {"key": "value"}
|
|
||||||
artifact_name = "test_artifact"
|
|
||||||
await base_structure.save_artifact_async(
|
|
||||||
artifact, artifact_name
|
|
||||||
)
|
|
||||||
loaded_artifact = base_structure.load_artifact(artifact_name)
|
|
||||||
|
|
||||||
assert loaded_artifact == artifact
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_load_artifact_async(self, tmpdir):
|
|
||||||
tmp_dir = tmpdir.mkdir("test_dir")
|
|
||||||
base_structure = BaseStructure(save_artifact_path=tmp_dir)
|
|
||||||
|
|
||||||
artifact = {"key": "value"}
|
|
||||||
artifact_name = "test_artifact"
|
|
||||||
base_structure.save_artifact(artifact, artifact_name)
|
|
||||||
loaded_artifact = await base_structure.load_artifact_async(
|
|
||||||
artifact_name
|
|
||||||
)
|
|
||||||
|
|
||||||
assert loaded_artifact == artifact
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_log_event_async(self, tmpdir):
|
|
||||||
tmp_dir = tmpdir.mkdir("test_dir")
|
|
||||||
base_structure = BaseStructure(save_metadata_path=tmp_dir)
|
|
||||||
|
|
||||||
event = "Test event"
|
|
||||||
event_type = "INFO"
|
|
||||||
await base_structure.log_event_async(event, event_type)
|
|
||||||
|
|
||||||
log_file = os.path.join(tmp_dir, "TestStructure_events.log")
|
|
||||||
with open(log_file) as file:
|
|
||||||
lines = file.readlines()
|
|
||||||
assert len(lines) == 1
|
|
||||||
assert (
|
|
||||||
lines[0] == f"[{base_structure._current_timestamp()}]"
|
|
||||||
f" [{event_type}] {event}\n"
|
|
||||||
)
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_asave_to_file(self, tmpdir):
|
|
||||||
tmp_dir = tmpdir.mkdir("test_dir")
|
|
||||||
file_path = os.path.join(tmp_dir, "test_file.json")
|
|
||||||
data_to_save = {"key": "value"}
|
|
||||||
base_structure = BaseStructure()
|
|
||||||
|
|
||||||
await base_structure.asave_to_file(data_to_save, file_path)
|
|
||||||
loaded_data = base_structure.load_from_file(file_path)
|
|
||||||
|
|
||||||
assert loaded_data == data_to_save
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_aload_from_file(self, tmpdir):
|
|
||||||
tmp_dir = tmpdir.mkdir("test_dir")
|
|
||||||
file_path = os.path.join(tmp_dir, "test_file.json")
|
|
||||||
data_to_save = {"key": "value"}
|
|
||||||
base_structure = BaseStructure()
|
|
||||||
base_structure.save_to_file(data_to_save, file_path)
|
|
||||||
|
|
||||||
loaded_data = await base_structure.aload_from_file(file_path)
|
|
||||||
assert loaded_data == data_to_save
|
|
||||||
|
|
||||||
def test_run_in_thread(self):
|
|
||||||
base_structure = BaseStructure()
|
|
||||||
result = base_structure.run_in_thread(
|
|
||||||
lambda: "Thread Test Result"
|
|
||||||
)
|
|
||||||
assert result.result() == "Thread Test Result"
|
|
||||||
|
|
||||||
def test_save_and_decompress_data(self):
|
|
||||||
base_structure = BaseStructure()
|
|
||||||
data = {"key": "value"}
|
|
||||||
compressed_data = base_structure.compress_data(data)
|
|
||||||
decompressed_data = base_structure.decompres_data(
|
|
||||||
compressed_data
|
|
||||||
)
|
|
||||||
assert decompressed_data == data
|
|
||||||
|
|
||||||
def test_run_batched(self):
|
|
||||||
base_structure = BaseStructure()
|
|
||||||
|
|
||||||
def run_function(data):
|
|
||||||
return f"Processed {data}"
|
|
||||||
|
|
||||||
batched_data = list(range(10))
|
|
||||||
result = base_structure.run_batched(
|
|
||||||
batched_data, batch_size=5, func=run_function
|
|
||||||
)
|
|
||||||
|
|
||||||
expected_result = [
|
|
||||||
f"Processed {data}" for data in batched_data
|
|
||||||
]
|
|
||||||
assert result == expected_result
|
|
||||||
|
|
||||||
def test_load_config(self, tmpdir):
|
|
||||||
tmp_dir = tmpdir.mkdir("test_dir")
|
|
||||||
config_file = os.path.join(tmp_dir, "config.json")
|
|
||||||
config_data = {"key": "value"}
|
|
||||||
base_structure = BaseStructure()
|
|
||||||
|
|
||||||
base_structure.save_to_file(config_data, config_file)
|
|
||||||
loaded_config = base_structure.load_config(config_file)
|
|
||||||
|
|
||||||
assert loaded_config == config_data
|
|
||||||
|
|
||||||
def test_backup_data(self, tmpdir):
|
|
||||||
tmp_dir = tmpdir.mkdir("test_dir")
|
|
||||||
base_structure = BaseStructure()
|
|
||||||
data_to_backup = {"key": "value"}
|
|
||||||
base_structure.backup_data(
|
|
||||||
data_to_backup, backup_path=tmp_dir
|
|
||||||
)
|
|
||||||
backup_files = os.listdir(tmp_dir)
|
|
||||||
|
|
||||||
assert len(backup_files) == 1
|
|
||||||
loaded_data = base_structure.load_from_file(
|
|
||||||
os.path.join(tmp_dir, backup_files[0])
|
|
||||||
)
|
|
||||||
assert loaded_data == data_to_backup
|
|
||||||
|
|
||||||
def test_monitor_resources(self):
|
|
||||||
base_structure = BaseStructure()
|
|
||||||
base_structure.monitor_resources()
|
|
||||||
|
|
||||||
def test_run_with_resources(self):
|
|
||||||
base_structure = BaseStructure()
|
|
||||||
|
|
||||||
def run_function():
|
|
||||||
base_structure.monitor_resources()
|
|
||||||
return "Resource Test Result"
|
|
||||||
|
|
||||||
result = base_structure.run_with_resources(run_function)
|
|
||||||
assert result == "Resource Test Result"
|
|
||||||
|
|
||||||
def test_run_with_resources_batched(self):
|
|
||||||
base_structure = BaseStructure()
|
|
||||||
|
|
||||||
def run_function(data):
|
|
||||||
base_structure.monitor_resources()
|
|
||||||
return f"Processed {data}"
|
|
||||||
|
|
||||||
batched_data = list(range(10))
|
|
||||||
result = base_structure.run_with_resources_batched(
|
|
||||||
batched_data, batch_size=5, func=run_function
|
|
||||||
)
|
|
||||||
|
|
||||||
expected_result = [
|
|
||||||
f"Processed {data}" for data in batched_data
|
|
||||||
]
|
|
||||||
assert result == expected_result
|
|
||||||
@ -1,97 +0,0 @@
|
|||||||
from pydantic import BaseModel
|
|
||||||
from dataclasses import dataclass
|
|
||||||
from swarms import (
|
|
||||||
create_yaml_schema_from_dict,
|
|
||||||
YamlModel,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class TestDataClass:
|
|
||||||
name: str
|
|
||||||
age: int
|
|
||||||
is_active: bool
|
|
||||||
|
|
||||||
|
|
||||||
class TestPydanticModel(BaseModel):
|
|
||||||
name: str
|
|
||||||
age: int
|
|
||||||
is_active: bool
|
|
||||||
|
|
||||||
|
|
||||||
def test_create_yaml_schema_from_dict_dataclass():
|
|
||||||
data = {"name": "Alice", "age": 30, "is_active": True}
|
|
||||||
result = create_yaml_schema_from_dict(data, TestDataClass)
|
|
||||||
expected_result = """
|
|
||||||
name:
|
|
||||||
type: str
|
|
||||||
default: None
|
|
||||||
description: No description provided
|
|
||||||
age:
|
|
||||||
type: int
|
|
||||||
default: None
|
|
||||||
description: No description provided
|
|
||||||
is_active:
|
|
||||||
type: bool
|
|
||||||
default: None
|
|
||||||
description: No description provided
|
|
||||||
"""
|
|
||||||
assert result == expected_result
|
|
||||||
|
|
||||||
|
|
||||||
def test_create_yaml_schema_from_dict_pydantic():
|
|
||||||
data = {"name": "Alice", "age": 30, "is_active": True}
|
|
||||||
result = create_yaml_schema_from_dict(data, TestPydanticModel)
|
|
||||||
expected_result = """
|
|
||||||
name:
|
|
||||||
type: str
|
|
||||||
default: None
|
|
||||||
description: No description provided
|
|
||||||
age:
|
|
||||||
type: int
|
|
||||||
default: None
|
|
||||||
description: No description provided
|
|
||||||
is_active:
|
|
||||||
type: bool
|
|
||||||
default: None
|
|
||||||
description: No description provided
|
|
||||||
"""
|
|
||||||
assert result == expected_result
|
|
||||||
|
|
||||||
|
|
||||||
def test_create_yaml_schema_from_dict_regular_class():
|
|
||||||
class TestRegularClass:
|
|
||||||
def __init__(self, name, age, is_active):
|
|
||||||
self.name = name
|
|
||||||
self.age = age
|
|
||||||
self.is_active = is_active
|
|
||||||
|
|
||||||
data = {"name": "Alice", "age": 30, "is_active": True}
|
|
||||||
result = create_yaml_schema_from_dict(data, TestRegularClass)
|
|
||||||
expected_result = """
|
|
||||||
name:
|
|
||||||
type: str
|
|
||||||
description: No description provided
|
|
||||||
age:
|
|
||||||
type: int
|
|
||||||
description: No description provided
|
|
||||||
is_active:
|
|
||||||
type: bool
|
|
||||||
description: No description provided
|
|
||||||
"""
|
|
||||||
assert result == expected_result
|
|
||||||
|
|
||||||
|
|
||||||
class User(YamlModel):
|
|
||||||
name: str
|
|
||||||
age: int
|
|
||||||
is_active: bool
|
|
||||||
|
|
||||||
|
|
||||||
def test_yaml_model():
|
|
||||||
# Create an instance of the User model
|
|
||||||
user = User(name="Alice", age=30, is_active=True)
|
|
||||||
|
|
||||||
assert user.name == "Alice"
|
|
||||||
assert user.age == 30
|
|
||||||
assert user.is_active is True
|
|
||||||
Loading…
Reference in new issue