|
|
|
# Unit tests for TaskQueueBase
|
|
|
|
|
|
|
|
import threading
|
|
|
|
from unittest.mock import Mock
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
|
|
|
|
from swarms.tokenizers import Agent, Task, TaskQueueBase
|
|
|
|
|
|
|
|
|
|
|
|
# Create mocked instances of dependencies
|
|
|
|
@pytest.fixture()
def task():
    """Provide a mock object constrained to the ``Task`` interface."""
    mocked_task = Mock(spec=Task)
    return mocked_task
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture()
def agent():
    """Provide a mock object constrained to the ``Agent`` interface."""
    mocked_agent = Mock(spec=Agent)
    return mocked_agent
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture()
def concrete_task_queue():
    """Provide a minimal in-memory ``TaskQueueBase`` subclass instance.

    The previous overrides were bare ``pass`` statements, so ``add_task`` and
    ``get_task`` always returned ``None`` and the tests in this module that
    assert on their results could never pass. This implementation gives the
    overrides just enough behaviour for those assertions to be meaningful.
    """

    class ConcreteTaskQueue(TaskQueueBase):
        """FIFO queue backed by a plain list, for test purposes only."""

        def __init__(self):
            # Run the base initializer first so any state it sets up is kept.
            super().__init__()
            self._pending = []

        def add_task(self, task):
            """Queue ``task``.

            Returns:
                True if the task was queued, False if this exact object is
                already waiting in the queue.

            Raises:
                TypeError: if ``task`` is not a ``Task`` instance.
            """
            if not isinstance(task, Task):
                raise TypeError(
                    f"task must be a Task instance, got {type(task).__name__}"
                )
            # Compare by identity: mock tasks carry no meaningful equality.
            if any(queued is task for queued in self._pending):
                return False
            self._pending.append(task)
            return True

        def get_task(self, agent):
            """Remove and return the oldest queued task, or None if empty.

            ``agent`` is accepted to satisfy the interface but is not used to
            select a task.
            """
            if not self._pending:
                return None
            return self._pending.pop(0)

        def complete_task(self, task_id):
            """No-op placeholder; returns None."""
            return None

        def reset_task(self, task_id):
            """No-op placeholder; returns None."""
            return None

    return ConcreteTaskQueue()
|
|
|
|
|
|
|
|
|
|
|
|
def test_task_queue_initialization(concrete_task_queue):
    """Check the queue is a ``TaskQueueBase`` and exposes a lock as ``lock``."""
    assert isinstance(concrete_task_queue, TaskQueueBase)
    # Before Python 3.13 ``threading.Lock`` is a factory function, not a
    # class, so passing it to isinstance() raises TypeError. Comparing against
    # the type of a freshly created lock works on every version.
    lock_type = type(threading.Lock())
    assert isinstance(concrete_task_queue.lock, lock_type)
|
|
|
|
|
|
|
|
|
|
|
|
def test_add_task_success(concrete_task_queue, task):
    """Adding a task is expected to report success by returning True."""
    outcome = concrete_task_queue.add_task(task)
    assert outcome is True
|
|
|
|
|
|
|
|
|
|
|
|
def test_add_task_failure(concrete_task_queue, task):
    """Adding the same task a second time is expected to return False.

    What counts as an invalid task is up to the concrete implementation;
    this test uses a repeated add of the same object as the failing case.
    """
    # First add: result intentionally ignored.
    concrete_task_queue.add_task(task)

    # Second add of the same task object.
    second_outcome = concrete_task_queue.add_task(task)
    assert second_outcome is False
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize("invalid_task", [None, "", {}, []])
def test_add_task_invalid_input(concrete_task_queue, invalid_task):
    """Each non-Task value is expected to make add_task raise TypeError."""
    expect_type_error = pytest.raises(TypeError)
    with expect_type_error:
        concrete_task_queue.add_task(invalid_task)
|
|
|
|
|
|
|
|
|
|
|
|
def test_get_task_success(concrete_task_queue, agent):
    """A task that was added is expected to come back from get_task."""
    # The queue has to hold a task before one can be fetched.
    queued_task = Mock(spec=Task)
    concrete_task_queue.add_task(queued_task)

    fetched = concrete_task_queue.get_task(agent)
    assert fetched == queued_task
|
|
|
|
|
|
|
|
|
|
|
|
# Disabled test, kept for reference:
# def test_get_task_no_tasks_available(concrete_task_queue, agent):
#     with pytest.raises(
#         EmptyQueueError
#     ):  # Assuming such an exception exists
#         concrete_task_queue.get_task(agent)
|
|
|
|
|
|
|
|
|
|
|
|
def test_complete_task_success(concrete_task_queue):
    """complete_task is expected to return None for the given task id."""
    # No task with this id is queued beforehand; only the return value
    # of the call is checked.
    task_id = "test_task_123"
    outcome = concrete_task_queue.complete_task(task_id)
    assert outcome is None
|
|
|
|
|
|
|
|
|
|
|
|
# Disabled test, kept for reference:
# def test_complete_task_with_invalid_id(concrete_task_queue):
#     invalid_task_id = "invalid_id"
#     with pytest.raises(
#         TaskNotFoundError
#     ):  # Assuming such an exception exists
#         concrete_task_queue.complete_task(invalid_task_id)
|
|
|
|
|
|
|
|
|
|
|
|
def test_reset_task_success(concrete_task_queue):
    """reset_task is expected to return None for the given task id."""
    # No task with this id is queued beforehand; only the return value
    # of the call is checked.
    task_id = "test_task_123"
    outcome = concrete_task_queue.reset_task(task_id)
    assert outcome is None
|
|
|
|
|
|
|
|
|
|
|
|
# Disabled test, kept for reference:
# def test_reset_task_with_invalid_id(concrete_task_queue):
#     invalid_task_id = "invalid_id"
#     with pytest.raises(
#         TaskNotFoundError
#     ):  # Assuming such an exception exists
#         concrete_task_queue.reset_task(invalid_task_id)
|