[FIX][Async Docs]

pull/692/merge
Kye Gomez 3 weeks ago
parent e98d618bd8
commit 1cc69ba925

@ -187,6 +187,7 @@ nav:
- Various Execution Methods: "swarms/structs/various_execution_methods.md" - Various Execution Methods: "swarms/structs/various_execution_methods.md"
- Workflows: - Workflows:
- ConcurrentWorkflow: "swarms/structs/concurrentworkflow.md" - ConcurrentWorkflow: "swarms/structs/concurrentworkflow.md"
- AsyncWorkflow: "swarms/structs/async_workflow.md"
- SequentialWorkflow: "swarms/structs/sequential_workflow.md" - SequentialWorkflow: "swarms/structs/sequential_workflow.md"
- Structs: - Structs:
- Conversation: "swarms/structs/conversation.md" - Conversation: "swarms/structs/conversation.md"

@ -1,266 +1,181 @@
# AsyncWorkflow Documentation # AsyncWorkflow Documentation
The `AsyncWorkflow` class represents an asynchronous workflow designed to execute tasks concurrently. This class is ideal for scenarios where tasks need to be run asynchronously, leveraging Python's asyncio capabilities to manage multiple tasks efficiently. The `AsyncWorkflow` class represents an asynchronous workflow that executes tasks concurrently using multiple agents. It allows for efficient task management, leveraging Python's `asyncio` for concurrent execution.
### Key Concepts ## Key Features
- **Concurrent Task Execution**: Distribute tasks across multiple agents asynchronously.
- **Configurable Workers**: Limit the number of concurrent workers (agents) for better resource management.
- **Autosave Results**: Optionally save the task execution results automatically.
- **Verbose Logging**: Enable detailed logging to monitor task execution.
- **Error Handling**: Gracefully handles exceptions raised by agents during task execution.
- **Asynchronous Execution**: Tasks are run concurrently using asyncio, allowing for non-blocking operations. ---
- **Task Pool**: A collection of tasks to be executed within the workflow.
- **Event Loop**: The asyncio event loop that manages the execution of asynchronous tasks.
- **Stopping Condition**: A condition that, when met, stops the execution of the workflow.
## Attributes ## Attributes
| Attribute | Type | Description |
### Arguments |-------------------|---------------------|-----------------------------------------------------------------------------|
| `name` | `str` | The name of the workflow. |
| Argument | Type | Default | Description | | `agents` | `List[Agent]` | A list of agents participating in the workflow. |
|----------|------|---------|-------------| | `max_workers` | `int` | The maximum number of concurrent workers (default: 5). |
| `name` | `str` | `"Async Workflow"` | The name of the workflow. | | `dashboard` | `bool` | Whether to display a dashboard (currently not implemented). |
| `description` | `str` | `"A workflow to run asynchronous tasks"` | The description of the workflow. | | `autosave` | `bool` | Whether to autosave task results (default: `False`). |
| `max_loops` | `int` | `1` | The maximum number of loops to run the workflow. | | `verbose` | `bool` | Whether to enable detailed logging (default: `False`). |
| `autosave` | `bool` | `True` | Flag indicating whether to autosave the results. | | `task_pool` | `List` | A pool of tasks to be executed. |
| `dashboard` | `bool` | `False` | Flag indicating whether to display a dashboard. | | `results` | `List` | A list to store results of executed tasks. |
| `task_pool` | `List[Any]` | `[]` | The list of tasks in the workflow. | | `loop` | `asyncio.EventLoop` | The event loop for asynchronous execution. |
| `results` | `List[Any]` | `[]` | The list of results from running the tasks. |
| `loop` | `Optional[asyncio.AbstractEventLoop]` | `None` | The event loop to use. | ---
| `stopping_condition` | `Optional[Callable]` | `None` | The stopping condition for the workflow. |
| `agents` | `List[Agent]` | `None` | A list of agents participating in the workflow. | **Description**:
Initializes the `AsyncWorkflow` with specified agents, configuration, and options.
### Attributes
**Parameters**:
| Attribute | Type | Description | - `name` (`str`): Name of the workflow. Default: "AsyncWorkflow".
|-----------|------|-------------| - `agents` (`List[Agent]`): A list of agents. Default: `None`.
| `name` | `str` | The name of the workflow. | - `max_workers` (`int`): The maximum number of workers. Default: `5`.
| `description` | `str` | The description of the workflow. | - `dashboard` (`bool`): Enable dashboard visualization (placeholder for future implementation).
| `max_loops` | `int` | The maximum number of loops to run the workflow. | - `autosave` (`bool`): Enable autosave of task results. Default: `False`.
| `autosave` | `bool` | Flag indicating whether to autosave the results. | - `verbose` (`bool`): Enable detailed logging. Default: `False`.
| `dashboard` | `bool` | Flag indicating whether to display a dashboard. | - `**kwargs`: Additional parameters for `BaseWorkflow`.
| `task_pool` | `List[Any]` | The list of tasks in the workflow. |
| `results` | `List[Any]` | The list of results from running the tasks. | ---
| `loop` | `Optional[asyncio.AbstractEventLoop]` | The event loop to use. |
| `stopping_condition` | `Optional[Callable]` | The stopping condition for the workflow. | ### `_execute_agent_task`
| `agents` | `List[Agent]` | A list of agents participating in the workflow. |
## Methods
### add
Adds a task or a list of tasks to the task pool.
**Arguments:**
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `task` | `Any` | `None` | A single task to add. |
| `tasks` | `List[Any]` | `None` | A list of tasks to add. |
**Raises:**
- `ValueError`: If neither task nor tasks are provided.
**Examples:**
```python ```python
workflow = AsyncWorkflow() async def _execute_agent_task(self, agent: Agent, task: str) -> Any:
task1 = Task(description="Task 1")
task2 = Task(description="Task 2")
# Adding a single task
await workflow.add(task=task1)
# Adding multiple tasks
await workflow.add(tasks=[task1, task2])
``` ```
**Description**:
Executes a single task asynchronously using a given agent.
### delete **Parameters**:
- `agent` (`Agent`): The agent responsible for executing the task.
- `task` (`str`): The task to be executed.
Deletes a task from the workflow. **Returns**:
- `Any`: The result of the task execution or an error message in case of an exception.
**Arguments:**
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `task` | `Any` | `None` | A single task to delete. |
| `tasks` | `List[Task]` | `None` | A list of tasks to delete. |
**Examples:**
**Example**:
```python ```python
workflow = AsyncWorkflow() result = await workflow._execute_agent_task(agent, "Sample Task")
task1 = Task(description="Task 1")
task2 = Task(description="Task 2")
# Adding tasks to the workflow
await workflow.add(tasks=[task1, task2])
# Deleting a single task
await workflow.delete(task=task1)
# Deleting multiple tasks
await workflow.delete(tasks=[task1, task2])
``` ```
### run ---
Runs the workflow and returns the results.
**Returns:**
| Return Type | Description |
|-------------|-------------|
| `List[Any]` | The results of the executed tasks. |
**Examples:**
### `run`
```python ```python
workflow = AsyncWorkflow() async def run(self, task: str) -> List[Any]:
task1 = Task(description="Task 1", execute=async_function)
task2 = Task(description="Task 2", execute=async_function)
# Adding tasks to the workflow
await workflow.add(tasks=[task1, task2])
# Running the workflow
results = await workflow.run()
``` ```
**Description**:
Executes the specified task concurrently across all agents.
### Additional Examples **Parameters**:
- `task` (`str`): The task to be executed by all agents.
#### Example 1: Simple AsyncWorkflow
```python
import asyncio
from swarms.structs.agent import Agent
from swarms.structs.task import Task
async def simple_task():
await asyncio.sleep(1)
return "Task Completed"
workflow = AsyncWorkflow()
task = Task(description="Simple Task", execute=simple_task)
# Adding a task to the workflow **Returns**:
await workflow.add(task=task) - `List[Any]`: A list of results or error messages returned by the agents.
# Running the workflow **Raises**:
results = await workflow.run() - `ValueError`: If no agents are provided in the workflow.
print(results) # Output: ["Task Completed"]
```
#### Example 2: Workflow with Multiple Tasks
**Example**:
```python ```python
import asyncio import asyncio
from swarms.structs.agent import Agent
from swarms.structs.task import Task
async def task1():
await asyncio.sleep(1)
return "Task 1 Completed"
async def task2():
await asyncio.sleep(2)
return "Task 2 Completed"
workflow = AsyncWorkflow() agents = [Agent("Agent1"), Agent("Agent2")]
task_1 = Task(description="Task 1", execute=task1) workflow = AsyncWorkflow(agents=agents, verbose=True)
task_2 = Task(description="Task 2", execute=task2)
# Adding tasks to the workflow results = asyncio.run(workflow.run("Process Data"))
await workflow.add(tasks=[task_1, task_2]) print(results)
# Running the workflow
results = await workflow.run()
print(results) # Output: ["Task 1 Completed", "Task 2 Completed"]
``` ```
#### Example 3: Workflow with Stopping Condition ---
## Production-Grade Financial Example: Multiple Agents
### Example: Stock Analysis and Investment Strategy
```python ```python
import asyncio import asyncio
from swarms.structs.agent import Agent from swarms import Agent
from swarms.structs.task import Task from async_workflow import AsyncWorkflow
from swarms.prompts.finance_agent_sys_prompt import FINANCIAL_AGENT_SYS_PROMPT
async def task1():
await asyncio.sleep(1) # Initialize multiple Financial Agents
return "Task 1 Completed" portfolio_analysis_agent = Agent(
agent_name="Portfolio-Analysis-Agent",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
model_name="gpt-4o-mini",
autosave=True,
verbose=True,
)
async def task2(): stock_strategy_agent = Agent(
await asyncio.sleep(2) agent_name="Stock-Strategy-Agent",
return "Task 2 Completed" system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
model_name="gpt-4o-mini",
autosave=True,
verbose=True,
)
def stop_condition(results): risk_management_agent = Agent(
return "Task 2 Completed" in results agent_name="Risk-Management-Agent",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
model_name="gpt-4o-mini",
autosave=True,
verbose=True,
)
workflow = AsyncWorkflow(stopping_condition=stop_condition) # Create a workflow with multiple agents
task_1 = Task(description="Task 1", execute=task1) workflow = AsyncWorkflow(
task_2 = Task(description="Task 2", execute=task2) name="Financial-Workflow",
agents=[portfolio_analysis_agent, stock_strategy_agent, risk_management_agent],
verbose=True,
)
# Adding tasks to the workflow # Run the workflow
await workflow.add(tasks=[task_1, task_2]) async def main():
task = "Analyze the current stock market trends and provide an investment strategy with risk assessment."
results = await workflow.run(task)
for agent_result in results:
print(agent_result)
# Running the workflow asyncio.run(main())
results = await workflow.run()
print(results) # Output: ["Task 1 Completed", "Task 2 Completed"]
``` ```
# Async Workflow **Output**:
```
INFO: Agent Portfolio-Analysis-Agent processing task: Analyze the current stock market trends and provide an investment strategy with risk assessment.
INFO: Agent Stock-Strategy-Agent processing task: Analyze the current stock market trends and provide an investment strategy with risk assessment.
INFO: Agent Risk-Management-Agent processing task: Analyze the current stock market trends and provide an investment strategy with risk assessment.
INFO: Agent Portfolio-Analysis-Agent completed task
INFO: Agent Stock-Strategy-Agent completed task
INFO: Agent Risk-Management-Agent completed task
Results:
- Detailed portfolio analysis...
- Stock investment strategies...
- Risk assessment insights...
```
The AsyncWorkflow allows multiple agents to process tasks concurrently using Python's asyncio framework. ---
## Usage Example ## Notes
1. **Autosave**: The autosave functionality is a placeholder. Users can implement custom logic to save `self.results`.
2. **Error Handling**: Exceptions raised by agents are logged and returned as part of the results.
3. **Dashboard**: The `dashboard` feature is currently not implemented but can be extended for visualization.
```python ---
import asyncio
from swarms import Agent, AsyncWorkflow
from swarm_models import OpenAIChat
# Initialize model
model = OpenAIChat(
openai_api_key="your-api-key",
model_name="gpt-4",
temperature=0.7
)
# Create agents ## Dependencies
agents = [ - `asyncio`: Python's asynchronous I/O framework.
Agent( - `loguru`: Logging utility for better log management.
agent_name=f"Analysis-Agent-{i}", - `swarms`: Base components (`BaseWorkflow`, `Agent`).
llm=model,
max_loops=1,
dashboard=False,
verbose=True,
)
for i in range(3)
]
# Initialize workflow
workflow = AsyncWorkflow(
name="Analysis-Workflow",
agents=agents,
max_workers=3,
verbose=True
)
# Run workflow ---
async def main():
task = "Analyze the potential impact of AI on healthcare"
results = await workflow.run(task)
for i, result in enumerate(results):
print(f"Agent {i} result: {result}")
# Execute ## Future Extensions
asyncio.run(main()) - **Dashboard**: Implement a real-time dashboard for monitoring agent performance.
``` - **Autosave**: Add persistent storage support for task results.
- **Task Management**: Extend task pooling and scheduling logic to support dynamic workloads.
## Parameters ---
| Parameter | Type | Default | Description | ## License
|-----------|------|---------|-------------| This class is part of the `swarms` framework and follows the framework's licensing terms.
| `name` | str | "AsyncWorkflow" | Name of the workflow |
| `agents` | List[Agent] | None | List of agents to execute tasks |
| `max_workers` | int | 5 | Maximum number of concurrent workers |
| `dashboard` | bool | False | Enable/disable dashboard |
| `autosave` | bool | False | Enable/disable autosaving results |
| `verbose` | bool | False | Enable/disable verbose logging |

@ -2,10 +2,25 @@ import asyncio
from typing import Any, List from typing import Any, List
from swarms.structs.base_workflow import BaseWorkflow from swarms.structs.base_workflow import BaseWorkflow
from swarms.structs.agent import Agent from swarms.structs.agent import Agent
from swarms.utils.loguru_logger import logger from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger("async_workflow")
class AsyncWorkflow(BaseWorkflow): class AsyncWorkflow(BaseWorkflow):
"""
Represents an asynchronous workflow that can execute tasks concurrently using multiple agents.
Attributes:
- name (str): The name of the workflow.
- agents (List[Agent]): A list of agents participating in the workflow.
- max_workers (int): The maximum number of workers to use for concurrent execution.
- dashboard (bool): Indicates if a dashboard should be displayed.
- autosave (bool): Indicates if the results should be autosaved.
- verbose (bool): Indicates if verbose logging is enabled.
- task_pool (List): A pool of tasks to be executed.
- results (List): The results of the executed tasks.
- loop (asyncio.AbstractEventLoop): The event loop used for asynchronous execution.
"""
def __init__( def __init__(
self, self,
name: str = "AsyncWorkflow", name: str = "AsyncWorkflow",
@ -30,7 +45,16 @@ class AsyncWorkflow(BaseWorkflow):
async def _execute_agent_task( async def _execute_agent_task(
self, agent: Agent, task: str self, agent: Agent, task: str
) -> Any: ) -> Any:
"""Execute a single agent task asynchronously""" """
Executes a single agent task asynchronously.
Args:
- agent (Agent): The agent executing the task.
- task (str): The task to be executed.
Returns:
- Any: The result of the task execution or an error message if an exception occurs.
"""
try: try:
if self.verbose: if self.verbose:
logger.info( logger.info(
@ -49,7 +73,15 @@ class AsyncWorkflow(BaseWorkflow):
return str(e) return str(e)
async def run(self, task: str) -> List[Any]: async def run(self, task: str) -> List[Any]:
"""Run the workflow with all agents processing the task concurrently""" """
Runs the workflow with all agents processing the task concurrently.
Args:
- task (str): The task to be executed by all agents.
Returns:
- List[Any]: A list of results from all agents or error messages if exceptions occur.
"""
if not self.agents: if not self.agents:
raise ValueError("No agents provided to the workflow") raise ValueError("No agents provided to the workflow")

Loading…
Cancel
Save