import asyncio import os from unittest.mock import patch import pytest from swarms.models import OpenAIChat from swarms.structs.agent import Agent from swarms.structs.sequential_workflow import SequentialWorkflow, Task # Mock the OpenAI API key using environment variables os.environ["OPENAI_API_KEY"] = "mocked_api_key" # Mock OpenAIChat class for testing class MockOpenAIChat: def __init__(self, *args, **kwargs): pass def run(self, *args, **kwargs): return "Mocked result" # Mock Agent class for testing class MockAgent: def __init__(self, *args, **kwargs): pass def run(self, *args, **kwargs): return "Mocked result" # Mock SequentialWorkflow class for testing class MockSequentialWorkflow: def __init__(self, *args, **kwargs): pass def add(self, *args, **kwargs): pass def run(self): pass # Test Task class def test_task_initialization(): description = "Sample Task" agent = MockOpenAIChat() task = Task(description=description, agent=agent) assert task.description == description assert task.agent == agent def test_task_execute(): description = "Sample Task" agent = MockOpenAIChat() task = Task(description=description, agent=agent) task.execute() assert task.result == "Mocked result" # Test SequentialWorkflow class def test_sequential_workflow_initialization(): workflow = SequentialWorkflow() assert isinstance(workflow, SequentialWorkflow) assert len(workflow.tasks) == 0 assert workflow.max_loops == 1 assert workflow.autosave is False assert workflow.saved_state_filepath == "sequential_workflow_state.json" assert workflow.restore_state_filepath is None assert workflow.dashboard is False def test_sequential_workflow_add_task(): workflow = SequentialWorkflow() task_description = "Sample Task" task_flow = MockOpenAIChat() workflow.add(task_description, task_flow) assert len(workflow.tasks) == 1 assert workflow.tasks[0].description == task_description assert workflow.tasks[0].agent == task_flow def test_sequential_workflow_reset_workflow(): workflow = SequentialWorkflow() task_description = "Sample Task" task_flow = MockOpenAIChat() workflow.add(task_description, task_flow) workflow.reset_workflow() assert workflow.tasks[0].result is None def test_sequential_workflow_get_task_results(): workflow = SequentialWorkflow() task_description = "Sample Task" task_flow = MockOpenAIChat() workflow.add(task_description, task_flow) workflow.run() results = workflow.get_task_results() assert len(results) == 1 assert task_description in results assert results[task_description] == "Mocked result" def test_sequential_workflow_remove_task(): workflow = SequentialWorkflow() task1_description = "Task 1" task2_description = "Task 2" task1_flow = MockOpenAIChat() task2_flow = MockOpenAIChat() workflow.add(task1_description, task1_flow) workflow.add(task2_description, task2_flow) workflow.remove_task(task1_description) assert len(workflow.tasks) == 1 assert workflow.tasks[0].description == task2_description def test_sequential_workflow_update_task(): workflow = SequentialWorkflow() task_description = "Sample Task" task_flow = MockOpenAIChat() workflow.add(task_description, task_flow) workflow.update_task(task_description, max_tokens=1000) assert workflow.tasks[0].kwargs["max_tokens"] == 1000 def test_sequential_workflow_save_workflow_state(): workflow = SequentialWorkflow() task_description = "Sample Task" task_flow = MockOpenAIChat() workflow.add(task_description, task_flow) workflow.save_workflow_state("test_state.json") assert os.path.exists("test_state.json") os.remove("test_state.json") def test_sequential_workflow_load_workflow_state(): workflow = SequentialWorkflow() task_description = "Sample Task" task_flow = MockOpenAIChat() workflow.add(task_description, task_flow) workflow.save_workflow_state("test_state.json") workflow.load_workflow_state("test_state.json") assert len(workflow.tasks) == 1 assert workflow.tasks[0].description == task_description os.remove("test_state.json") def test_sequential_workflow_run(): workflow = SequentialWorkflow() task_description = "Sample Task" task_flow = MockOpenAIChat() workflow.add(task_description, task_flow) workflow.run() assert workflow.tasks[0].result == "Mocked result" def test_sequential_workflow_workflow_bootup(capfd): workflow = SequentialWorkflow() workflow.workflow_bootup() out, _ = capfd.readouterr() assert "Sequential Workflow Initializing..." in out def test_sequential_workflow_workflow_dashboard(capfd): workflow = SequentialWorkflow() workflow.workflow_dashboard() out, _ = capfd.readouterr() assert "Sequential Workflow Dashboard" in out # Mock Agent class for async testing class MockAsyncAgent: def __init__(self, *args, **kwargs): pass async def arun(self, *args, **kwargs): return "Mocked result" # Test async execution in SequentialWorkflow @pytest.mark.asyncio async def test_sequential_workflow_arun(): workflow = SequentialWorkflow() task_description = "Sample Task" task_flow = MockAsyncAgent() workflow.add(task_description, task_flow) await workflow.arun() assert workflow.tasks[0].result == "Mocked result" def test_real_world_usage_with_openai_key(): # Initialize the language model llm = OpenAIChat() assert isinstance(llm, OpenAIChat) def test_real_world_usage_with_flow_and_openai_key(): # Initialize a agent with the language model agent = Agent(llm=OpenAIChat()) assert isinstance(agent, Agent) def test_real_world_usage_with_sequential_workflow(): # Initialize a sequential workflow workflow = SequentialWorkflow() assert isinstance(workflow, SequentialWorkflow) def test_real_world_usage_add_tasks(): # Create a sequential workflow and add tasks workflow = SequentialWorkflow() task1_description = "Task 1" task2_description = "Task 2" task1_flow = OpenAIChat() task2_flow = OpenAIChat() workflow.add(task1_description, task1_flow) workflow.add(task2_description, task2_flow) assert len(workflow.tasks) == 2 assert workflow.tasks[0].description == task1_description assert workflow.tasks[1].description == task2_description def test_real_world_usage_run_workflow(): # Create a sequential workflow, add a task, and run the workflow workflow = SequentialWorkflow() task_description = "Sample Task" task_flow = OpenAIChat() workflow.add(task_description, task_flow) workflow.run() assert workflow.tasks[0].result is not None def test_real_world_usage_dashboard_display(): # Create a sequential workflow, add tasks, and display the dashboard workflow = SequentialWorkflow() task1_description = "Task 1" task2_description = "Task 2" task1_flow = OpenAIChat() task2_flow = OpenAIChat() workflow.add(task1_description, task1_flow) workflow.add(task2_description, task2_flow) with patch("builtins.print") as mock_print: workflow.workflow_dashboard() mock_print.assert_called() def test_real_world_usage_async_execution(): # Create a sequential workflow, add an async task, and run the workflow asynchronously workflow = SequentialWorkflow() task_description = "Sample Task" async_task_flow = OpenAIChat() async def async_run_workflow(): await workflow.arun() workflow.add(task_description, async_task_flow) asyncio.run(async_run_workflow()) assert workflow.tasks[0].result is not None def test_real_world_usage_multiple_loops(): # Create a sequential workflow with multiple loops, add a task, and run the workflow workflow = SequentialWorkflow(max_loops=3) task_description = "Sample Task" task_flow = OpenAIChat() workflow.add(task_description, task_flow) workflow.run() assert workflow.tasks[0].result is not None def test_real_world_usage_autosave_state(): # Create a sequential workflow with autosave, add a task, run the workflow, and check if state is saved workflow = SequentialWorkflow(autosave=True) task_description = "Sample Task" task_flow = OpenAIChat() workflow.add(task_description, task_flow) workflow.run() assert workflow.tasks[0].result is not None assert os.path.exists("sequential_workflow_state.json") os.remove("sequential_workflow_state.json") def test_real_world_usage_load_state(): # Create a sequential workflow, add a task, save state, load state, and run the workflow workflow = SequentialWorkflow() task_description = "Sample Task" task_flow = OpenAIChat() workflow.add(task_description, task_flow) workflow.run() workflow.save_workflow_state("test_state.json") workflow.load_workflow_state("test_state.json") workflow.run() assert workflow.tasks[0].result is not None os.remove("test_state.json") def test_real_world_usage_update_task_args(): # Create a sequential workflow, add a task, and update task arguments workflow = SequentialWorkflow() task_description = "Sample Task" task_flow = OpenAIChat() workflow.add(task_description, task_flow) workflow.update_task(task_description, max_tokens=1000) assert workflow.tasks[0].kwargs["max_tokens"] == 1000 def test_real_world_usage_remove_task(): # Create a sequential workflow, add tasks, remove a task, and run the workflow workflow = SequentialWorkflow() task1_description = "Task 1" task2_description = "Task 2" task1_flow = OpenAIChat() task2_flow = OpenAIChat() workflow.add(task1_description, task1_flow) workflow.add(task2_description, task2_flow) workflow.remove_task(task1_description) workflow.run() assert len(workflow.tasks) == 1 assert workflow.tasks[0].description == task2_description def test_real_world_usage_with_environment_variables(): # Ensure that the OpenAI API key is set using environment variables assert "OPENAI_API_KEY" in os.environ assert os.environ["OPENAI_API_KEY"] == "mocked_api_key" del os.environ["OPENAI_API_KEY"] # Clean up after the test def test_real_world_usage_no_openai_key(): # Ensure that an exception is raised when the OpenAI API key is not set with pytest.raises(ValueError): OpenAIChat() # API key not provided, should raise an exception