Merge pull request #1215 from Steve-Dusty/sequential-workflow

Add async execution support to AgentRearrange and fix SequentialWorkflow    initialization tests
pull/1224/head
Kye Gomez 5 days ago committed by GitHub
commit 3e6d23474a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,7 +1,7 @@
import json
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Optional, Union
import asyncio
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.structs.multi_agent_exec import run_agents_concurrently
@ -908,6 +908,45 @@ class AgentRearrange:
except Exception as e:
self._catch_error(e)
async def run_async(
self,
task: str,
img: Optional[str] = None,
*args,
**kwargs,
) -> Any:
"""
Asynchronously executes a task through the agent workflow.
This method enables asynchronous execution of tasks by running the
synchronous run method in a separate thread using asyncio.to_thread.
This is ideal for integrating the agent workflow into async applications
or when you want non-blocking execution.
Args:
task (str): The task to be executed through the agent workflow.
img (Optional[str]): Optional image input for the task. Defaults to None.
*args: Additional positional arguments passed to the run method.
**kwargs: Additional keyword arguments passed to the run method.
Returns:
Any: The result of the task execution, format depends on output_type setting.
Raises:
Exception: If an error occurs during task execution.
Note:
This method uses asyncio.to_thread to run the synchronous run method
asynchronously, allowing integration with async/await patterns.
"""
try:
return await asyncio.to_thread(
self.run, task=task, img=img, *args, **kwargs
)
except Exception as e:
self._catch_error(e)
def _serialize_callable(
self, attr_value: Callable
) -> Dict[str, Any]:

@ -3,20 +3,6 @@ import pytest
from swarms import Agent, SequentialWorkflow
# Test SequentialWorkflow class
def test_sequential_workflow_initialization():
workflow = SequentialWorkflow()
assert isinstance(workflow, SequentialWorkflow)
assert len(workflow.tasks) == 0
assert workflow.max_loops == 1
assert workflow.autosave is False
assert (
workflow.saved_state_filepath
== "sequential_workflow_state.json"
)
assert workflow.restore_state_filepath is None
assert workflow.dashboard is False
def test_sequential_workflow_initialization_with_agents():
"""Test SequentialWorkflow initialization with agents"""

Loading…
Cancel
Save