You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
swarms/tests/structs/sequential_workflow.py

334 lines
10 KiB

import asyncio
import os
from unittest.mock import patch
import pytest
from swarms.models import OpenAIChat
from swarms.structs.flow import Flow
from swarms.structs.sequential_workflow import SequentialWorkflow, Task
# Mock the OpenAI API key using environment variables
os.environ["OPENAI_API_KEY"] = "mocked_api_key"
# Mock OpenAIChat class for testing
class MockOpenAIChat:
def __init__(self, *args, **kwargs):
pass
def run(self, *args, **kwargs):
return "Mocked result"
# Mock Flow class for testing
class MockFlow:
def __init__(self, *args, **kwargs):
pass
def run(self, *args, **kwargs):
return "Mocked result"
# Mock SequentialWorkflow class for testing
class MockSequentialWorkflow:
def __init__(self, *args, **kwargs):
pass
def add(self, *args, **kwargs):
pass
def run(self):
pass
# Test Task class
def test_task_initialization():
description = "Sample Task"
flow = MockOpenAIChat()
task = Task(description=description, flow=flow)
assert task.description == description
assert task.flow == flow
def test_task_execute():
description = "Sample Task"
flow = MockOpenAIChat()
task = Task(description=description, flow=flow)
task.execute()
assert task.result == "Mocked result"
# Test SequentialWorkflow class
def test_sequential_workflow_initialization():
workflow = SequentialWorkflow()
assert isinstance(workflow, SequentialWorkflow)
assert len(workflow.tasks) == 0
assert workflow.max_loops == 1
assert workflow.autosave is False
assert workflow.saved_state_filepath == "sequential_workflow_state.json"
assert workflow.restore_state_filepath is None
assert workflow.dashboard is False
def test_sequential_workflow_add_task():
workflow = SequentialWorkflow()
task_description = "Sample Task"
task_flow = MockOpenAIChat()
workflow.add(task_description, task_flow)
assert len(workflow.tasks) == 1
assert workflow.tasks[0].description == task_description
assert workflow.tasks[0].flow == task_flow
def test_sequential_workflow_reset_workflow():
workflow = SequentialWorkflow()
task_description = "Sample Task"
task_flow = MockOpenAIChat()
workflow.add(task_description, task_flow)
workflow.reset_workflow()
assert workflow.tasks[0].result is None
def test_sequential_workflow_get_task_results():
workflow = SequentialWorkflow()
task_description = "Sample Task"
task_flow = MockOpenAIChat()
workflow.add(task_description, task_flow)
workflow.run()
results = workflow.get_task_results()
assert len(results) == 1
assert task_description in results
assert results[task_description] == "Mocked result"
def test_sequential_workflow_remove_task():
workflow = SequentialWorkflow()
task1_description = "Task 1"
task2_description = "Task 2"
task1_flow = MockOpenAIChat()
task2_flow = MockOpenAIChat()
workflow.add(task1_description, task1_flow)
workflow.add(task2_description, task2_flow)
workflow.remove_task(task1_description)
assert len(workflow.tasks) == 1
assert workflow.tasks[0].description == task2_description
def test_sequential_workflow_update_task():
workflow = SequentialWorkflow()
task_description = "Sample Task"
task_flow = MockOpenAIChat()
workflow.add(task_description, task_flow)
workflow.update_task(task_description, max_tokens=1000)
assert workflow.tasks[0].kwargs["max_tokens"] == 1000
def test_sequential_workflow_save_workflow_state():
workflow = SequentialWorkflow()
task_description = "Sample Task"
task_flow = MockOpenAIChat()
workflow.add(task_description, task_flow)
workflow.save_workflow_state("test_state.json")
assert os.path.exists("test_state.json")
os.remove("test_state.json")
def test_sequential_workflow_load_workflow_state():
workflow = SequentialWorkflow()
task_description = "Sample Task"
task_flow = MockOpenAIChat()
workflow.add(task_description, task_flow)
workflow.save_workflow_state("test_state.json")
workflow.load_workflow_state("test_state.json")
assert len(workflow.tasks) == 1
assert workflow.tasks[0].description == task_description
os.remove("test_state.json")
def test_sequential_workflow_run():
workflow = SequentialWorkflow()
task_description = "Sample Task"
task_flow = MockOpenAIChat()
workflow.add(task_description, task_flow)
workflow.run()
assert workflow.tasks[0].result == "Mocked result"
def test_sequential_workflow_workflow_bootup(capfd):
workflow = SequentialWorkflow()
workflow.workflow_bootup()
out, _ = capfd.readouterr()
assert "Sequential Workflow Initializing..." in out
def test_sequential_workflow_workflow_dashboard(capfd):
workflow = SequentialWorkflow()
workflow.workflow_dashboard()
out, _ = capfd.readouterr()
assert "Sequential Workflow Dashboard" in out
# Mock Flow class for async testing
class MockAsyncFlow:
def __init__(self, *args, **kwargs):
pass
async def arun(self, *args, **kwargs):
return "Mocked result"
# Test async execution in SequentialWorkflow
@pytest.mark.asyncio
async def test_sequential_workflow_arun():
workflow = SequentialWorkflow()
task_description = "Sample Task"
task_flow = MockAsyncFlow()
workflow.add(task_description, task_flow)
await workflow.arun()
assert workflow.tasks[0].result == "Mocked result"
def test_real_world_usage_with_openai_key():
# Initialize the language model
llm = OpenAIChat()
assert isinstance(llm, OpenAIChat)
def test_real_world_usage_with_flow_and_openai_key():
# Initialize a flow with the language model
flow = Flow(llm=OpenAIChat())
assert isinstance(flow, Flow)
def test_real_world_usage_with_sequential_workflow():
# Initialize a sequential workflow
workflow = SequentialWorkflow()
assert isinstance(workflow, SequentialWorkflow)
def test_real_world_usage_add_tasks():
# Create a sequential workflow and add tasks
workflow = SequentialWorkflow()
task1_description = "Task 1"
task2_description = "Task 2"
task1_flow = OpenAIChat()
task2_flow = OpenAIChat()
workflow.add(task1_description, task1_flow)
workflow.add(task2_description, task2_flow)
assert len(workflow.tasks) == 2
assert workflow.tasks[0].description == task1_description
assert workflow.tasks[1].description == task2_description
def test_real_world_usage_run_workflow():
# Create a sequential workflow, add a task, and run the workflow
workflow = SequentialWorkflow()
task_description = "Sample Task"
task_flow = OpenAIChat()
workflow.add(task_description, task_flow)
workflow.run()
assert workflow.tasks[0].result is not None
def test_real_world_usage_dashboard_display():
# Create a sequential workflow, add tasks, and display the dashboard
workflow = SequentialWorkflow()
task1_description = "Task 1"
task2_description = "Task 2"
task1_flow = OpenAIChat()
task2_flow = OpenAIChat()
workflow.add(task1_description, task1_flow)
workflow.add(task2_description, task2_flow)
with patch("builtins.print") as mock_print:
workflow.workflow_dashboard()
mock_print.assert_called()
def test_real_world_usage_async_execution():
# Create a sequential workflow, add an async task, and run the workflow asynchronously
workflow = SequentialWorkflow()
task_description = "Sample Task"
async_task_flow = OpenAIChat()
async def async_run_workflow():
await workflow.arun()
workflow.add(task_description, async_task_flow)
asyncio.run(async_run_workflow())
assert workflow.tasks[0].result is not None
def test_real_world_usage_multiple_loops():
# Create a sequential workflow with multiple loops, add a task, and run the workflow
workflow = SequentialWorkflow(max_loops=3)
task_description = "Sample Task"
task_flow = OpenAIChat()
workflow.add(task_description, task_flow)
workflow.run()
assert workflow.tasks[0].result is not None
def test_real_world_usage_autosave_state():
# Create a sequential workflow with autosave, add a task, run the workflow, and check if state is saved
workflow = SequentialWorkflow(autosave=True)
task_description = "Sample Task"
task_flow = OpenAIChat()
workflow.add(task_description, task_flow)
workflow.run()
assert workflow.tasks[0].result is not None
assert os.path.exists("sequential_workflow_state.json")
os.remove("sequential_workflow_state.json")
def test_real_world_usage_load_state():
# Create a sequential workflow, add a task, save state, load state, and run the workflow
workflow = SequentialWorkflow()
task_description = "Sample Task"
task_flow = OpenAIChat()
workflow.add(task_description, task_flow)
workflow.run()
workflow.save_workflow_state("test_state.json")
workflow.load_workflow_state("test_state.json")
workflow.run()
assert workflow.tasks[0].result is not None
os.remove("test_state.json")
def test_real_world_usage_update_task_args():
# Create a sequential workflow, add a task, and update task arguments
workflow = SequentialWorkflow()
task_description = "Sample Task"
task_flow = OpenAIChat()
workflow.add(task_description, task_flow)
workflow.update_task(task_description, max_tokens=1000)
assert workflow.tasks[0].kwargs["max_tokens"] == 1000
def test_real_world_usage_remove_task():
# Create a sequential workflow, add tasks, remove a task, and run the workflow
workflow = SequentialWorkflow()
task1_description = "Task 1"
task2_description = "Task 2"
task1_flow = OpenAIChat()
task2_flow = OpenAIChat()
workflow.add(task1_description, task1_flow)
workflow.add(task2_description, task2_flow)
workflow.remove_task(task1_description)
workflow.run()
assert len(workflow.tasks) == 1
assert workflow.tasks[0].description == task2_description
def test_real_world_usage_with_environment_variables():
# Ensure that the OpenAI API key is set using environment variables
assert "OPENAI_API_KEY" in os.environ
assert os.environ["OPENAI_API_KEY"] == "mocked_api_key"
del os.environ["OPENAI_API_KEY"] # Clean up after the test
def test_real_world_usage_no_openai_key():
# Ensure that an exception is raised when the OpenAI API key is not set
with pytest.raises(ValueError):
OpenAIChat() # API key not provided, should raise an exception