added run_async to agent rearrange and handeled test sequantil workflow init

pull/1215/head
Steve-Dusty 1 week ago
parent 8c7670121c
commit d817ee7913

@ -908,6 +908,46 @@ class AgentRearrange:
except Exception as e: except Exception as e:
self._catch_error(e) self._catch_error(e)
async def run_async(
self,
task: str,
img: Optional[str] = None,
*args,
**kwargs,
) -> Any:
"""
Asynchronously executes a task through the agent workflow.
This method enables asynchronous execution of tasks by running the
synchronous run method in a separate thread using asyncio.to_thread.
This is ideal for integrating the agent workflow into async applications
or when you want non-blocking execution.
Args:
task (str): The task to be executed through the agent workflow.
img (Optional[str]): Optional image input for the task. Defaults to None.
*args: Additional positional arguments passed to the run method.
**kwargs: Additional keyword arguments passed to the run method.
Returns:
Any: The result of the task execution, format depends on output_type setting.
Raises:
Exception: If an error occurs during task execution.
Note:
This method uses asyncio.to_thread to run the synchronous run method
asynchronously, allowing integration with async/await patterns.
"""
import asyncio
try:
return await asyncio.to_thread(
self.run, task=task, img=img, *args, **kwargs
)
except Exception as e:
self._catch_error(e)
def _serialize_callable( def _serialize_callable(
self, attr_value: Callable self, attr_value: Callable
) -> Dict[str, Any]: ) -> Dict[str, Any]:

@ -5,17 +5,9 @@ from swarms import Agent, SequentialWorkflow
# Test SequentialWorkflow class # Test SequentialWorkflow class
def test_sequential_workflow_initialization(): def test_sequential_workflow_initialization():
# SequentialWorkflow requires agents, so expect ValueError
with pytest.raises(ValueError, match="Agents list cannot be None or empty"):
workflow = SequentialWorkflow() workflow = SequentialWorkflow()
assert isinstance(workflow, SequentialWorkflow)
assert len(workflow.tasks) == 0
assert workflow.max_loops == 1
assert workflow.autosave is False
assert (
workflow.saved_state_filepath
== "sequential_workflow_state.json"
)
assert workflow.restore_state_filepath is None
assert workflow.dashboard is False
def test_sequential_workflow_initialization_with_agents(): def test_sequential_workflow_initialization_with_agents():

Loading…
Cancel
Save