from concurrent.futures import Future from unittest.mock import Mock, create_autospec, patch from swarms.structs import Agent, ConcurrentWorkflow, Task def test_add(): workflow = ConcurrentWorkflow(max_workers=2) task = Mock(spec=Task) workflow.add(task) assert task in workflow.tasks def test_run(): workflow = ConcurrentWorkflow(max_workers=2) task1 = create_autospec(Task) task2 = create_autospec(Task) workflow.add(task1) workflow.add(task2) with patch( "concurrent.futures.ThreadPoolExecutor" ) as mock_executor: future1 = Future() future1.set_result(None) future2 = Future() future2.set_result(None) mock_executor.return_value.__enter__.return_value.submit.side_effect = [ future1, future2, ] mock_executor.return_value.__enter__.return_value.as_completed.return_value = [ future1, future2, ] workflow.run() task1.execute.assert_called_once() task2.execute.assert_called_once() def test_execute_task(): workflow = ConcurrentWorkflow(max_workers=2) task = create_autospec(Task) workflow._execute_task(task) task.execute.assert_called_once() def test_agent_execution(): workflow = ConcurrentWorkflow(max_workers=2) agent = create_autospec(Agent) task = Task(agent) workflow.add(task) workflow._execute_task(task) agent.execute.assert_called_once()