import json import os from unittest.mock import MagicMock, patch import pytest from dotenv import load_dotenv from swarms.models import OpenAIChat from swarms.structs.flow import Flow, stop_when_repeats load_dotenv() openai_api_key = os.getenv("OPENAI_API_KEY") # Mocks and Fixtures @pytest.fixture def mocked_llm(): return OpenAIChat( openai_api_key=openai_api_key, ) @pytest.fixture def basic_flow(mocked_llm): return Flow(llm=mocked_llm, max_loops=5) @pytest.fixture def flow_with_condition(mocked_llm): return Flow(llm=mocked_llm, max_loops=5, stopping_condition=stop_when_repeats) # Basic Tests def test_stop_when_repeats(): assert stop_when_repeats("Please Stop now") assert not stop_when_repeats("Continue the process") def test_flow_initialization(basic_flow): assert basic_flow.max_loops == 5 assert basic_flow.stopping_condition is None assert basic_flow.loop_interval == 1 assert basic_flow.retry_attempts == 3 assert basic_flow.retry_interval == 1 assert basic_flow.feedback == [] assert basic_flow.memory == [] assert basic_flow.task is None assert basic_flow.stopping_token == "" assert not basic_flow.interactive def test_provide_feedback(basic_flow): feedback = "Test feedback" basic_flow.provide_feedback(feedback) assert feedback in basic_flow.feedback @patch("time.sleep", return_value=None) # to speed up tests def test_run_without_stopping_condition(mocked_sleep, basic_flow): response = basic_flow.run("Test task") assert response == "Test task" # since our mocked llm doesn't modify the response @patch("time.sleep", return_value=None) # to speed up tests def test_run_with_stopping_condition(mocked_sleep, flow_with_condition): response = flow_with_condition.run("Stop") assert response == "Stop" @patch("time.sleep", return_value=None) # to speed up tests def test_run_with_exception(mocked_sleep, basic_flow): basic_flow.llm.side_effect = Exception("Test Exception") with pytest.raises(Exception, match="Test Exception"): basic_flow.run("Test task") def test_bulk_run(basic_flow): inputs = [{"task": "Test1"}, {"task": "Test2"}] responses = basic_flow.bulk_run(inputs) assert responses == ["Test1", "Test2"] # Tests involving file IO def test_save_and_load(basic_flow, tmp_path): file_path = tmp_path / "memory.json" basic_flow.memory.append(["Test1", "Test2"]) basic_flow.save(file_path) new_flow = Flow(llm=mocked_llm, max_loops=5) new_flow.load(file_path) assert new_flow.memory == [["Test1", "Test2"]] # Environment variable mock test def test_env_variable_handling(monkeypatch): monkeypatch.setenv("API_KEY", "test_key") assert os.getenv("API_KEY") == "test_key" # TODO: Add more tests, especially edge cases and exception cases. Implement parametrized tests for varied inputs. # Test initializing the flow with different stopping conditions def test_flow_with_custom_stopping_condition(mocked_llm): def stopping_condition(x): return "terminate" in x.lower() flow = Flow(llm=mocked_llm, max_loops=5, stopping_condition=stopping_condition) assert flow.stopping_condition("Please terminate now") assert not flow.stopping_condition("Continue the process") # Test calling the flow directly def test_flow_call(basic_flow): response = basic_flow("Test call") assert response == "Test call" # Test formatting the prompt def test_format_prompt(basic_flow): formatted_prompt = basic_flow.format_prompt("Hello {name}", name="John") assert formatted_prompt == "Hello John" # Test with max loops @patch("time.sleep", return_value=None) def test_max_loops(mocked_sleep, basic_flow): basic_flow.max_loops = 3 response = basic_flow.run("Looping") assert response == "Looping" # Test stopping token @patch("time.sleep", return_value=None) def test_stopping_token(mocked_sleep, basic_flow): basic_flow.stopping_token = "Terminate" response = basic_flow.run("Loop until Terminate") assert response == "Loop until Terminate" # Test interactive mode def test_interactive_mode(basic_flow): basic_flow.interactive = True assert basic_flow.interactive # Test bulk run with varied inputs def test_bulk_run_varied_inputs(basic_flow): inputs = [{"task": "Test1"}, {"task": "Test2"}, {"task": "Stop now"}] responses = basic_flow.bulk_run(inputs) assert responses == ["Test1", "Test2", "Stop now"] # Test loading non-existent file def test_load_non_existent_file(basic_flow, tmp_path): file_path = tmp_path / "non_existent.json" with pytest.raises(FileNotFoundError): basic_flow.load(file_path) # Test saving with different memory data def test_save_different_memory(basic_flow, tmp_path): file_path = tmp_path / "memory.json" basic_flow.memory.append(["Task1", "Task2", "Task3"]) basic_flow.save(file_path) with open(file_path, "r") as f: data = json.load(f) assert data == [["Task1", "Task2", "Task3"]] # Test the stopping condition check def test_check_stopping_condition(flow_with_condition): assert flow_with_condition._check_stopping_condition("Stop this process") assert not flow_with_condition._check_stopping_condition("Continue the task") # Test without providing max loops (default value should be 5) def test_default_max_loops(mocked_llm): flow = Flow(llm=mocked_llm) assert flow.max_loops == 5 # Test creating flow from llm and template def test_from_llm_and_template(mocked_llm): flow = Flow.from_llm_and_template(mocked_llm, "Test template") assert isinstance(flow, Flow) # Mocking the OpenAIChat for testing @patch("swarms.models.OpenAIChat", autospec=True) def test_mocked_openai_chat(MockedOpenAIChat): llm = MockedOpenAIChat(openai_api_key=openai_api_key) llm.return_value = MagicMock() flow = Flow(llm=llm, max_loops=5) flow.run("Mocked run") assert MockedOpenAIChat.called # Test retry attempts @patch("time.sleep", return_value=None) def test_retry_attempts(mocked_sleep, basic_flow): basic_flow.retry_attempts = 2 basic_flow.llm.side_effect = [Exception("Test Exception"), "Valid response"] response = basic_flow.run("Test retry") assert response == "Valid response" # Test different loop intervals @patch("time.sleep", return_value=None) def test_different_loop_intervals(mocked_sleep, basic_flow): basic_flow.loop_interval = 2 response = basic_flow.run("Test loop interval") assert response == "Test loop interval" # Test different retry intervals @patch("time.sleep", return_value=None) def test_different_retry_intervals(mocked_sleep, basic_flow): basic_flow.retry_interval = 2 response = basic_flow.run("Test retry interval") assert response == "Test retry interval" # Test invoking the flow with additional kwargs @patch("time.sleep", return_value=None) def test_flow_call_with_kwargs(mocked_sleep, basic_flow): response = basic_flow("Test call", param1="value1", param2="value2") assert response == "Test call" # Test initializing the flow with all parameters def test_flow_initialization_all_params(mocked_llm): flow = Flow( llm=mocked_llm, max_loops=10, stopping_condition=stop_when_repeats, loop_interval=2, retry_attempts=4, retry_interval=2, interactive=True, param1="value1", param2="value2", ) assert flow.max_loops == 10 assert flow.loop_interval == 2 assert flow.retry_attempts == 4 assert flow.retry_interval == 2 assert flow.interactive # Test the stopping token is in the response @patch("time.sleep", return_value=None) def test_stopping_token_in_response(mocked_sleep, basic_flow): response = basic_flow.run("Test stopping token") assert basic_flow.stopping_token in response