# AsyncWorkflow Documentation
The `AsyncWorkflow` class represents an asynchronous workflow that executes tasks concurrently using multiple agents. It allows for efficient task management, leveraging Python's `asyncio` for concurrent execution.
## Key Features
- **Concurrent Task Execution**: Distribute tasks across multiple agents asynchronously.
- **Configurable Workers**: Limit the number of concurrent workers (agents) for better resource management.
- **Autosave Results**: Optionally save the task execution results automatically.
- **Verbose Logging**: Enable detailed logging to monitor task execution.
- **Error Handling**: Gracefully handles exceptions raised by agents during task execution.
---
## Attributes
| Attribute | Type | Description |
|-------------------|---------------------|-----------------------------------------------------------------------------|
| `name` | `str` | The name of the workflow. |
| `agents` | `List[Agent]` | A list of agents participating in the workflow. |
| `max_workers` | `int` | The maximum number of concurrent workers (default: 5). |
| `dashboard` | `bool` | Whether to display a dashboard (currently not implemented). |
| `autosave` | `bool` | Whether to autosave task results (default: `False` ). |
| `verbose` | `bool` | Whether to enable detailed logging (default: `False` ). |
| `task_pool` | `List` | A pool of tasks to be executed. |
| `results` | `List` | A list to store results of executed tasks. |
| `loop` | `asyncio.EventLoop` | The event loop for asynchronous execution. |
---
**Description**:
Initializes the `AsyncWorkflow` with specified agents, configuration, and options.
**Parameters**:
- `name` (`str`): Name of the workflow. Default: "AsyncWorkflow".
- `agents` (`List[Agent]`): A list of agents. Default: `None` .
- `max_workers` (`int`): The maximum number of workers. Default: `5` .
- `dashboard` (`bool`): Enable dashboard visualization (placeholder for future implementation).
- `autosave` (`bool`): Enable autosave of task results. Default: `False` .
- `verbose` (`bool`): Enable detailed logging. Default: `False` .
- `**kwargs` : Additional parameters for `BaseWorkflow` .
---
### `_execute_agent_task`
```python
async def _execute_agent_task(self, agent: Agent, task: str) -> Any:
```
**Description**:
Executes a single task asynchronously using a given agent.
**Parameters**:
- `agent` (`Agent`): The agent responsible for executing the task.
- `task` (`str`): The task to be executed.
**Returns**:
- `Any` : The result of the task execution or an error message in case of an exception.
**Example**:
```python
result = await workflow._execute_agent_task(agent, "Sample Task")
```
---
### `run`
```python
async def run(self, task: str) -> List[Any]:
```
**Description**:
Executes the specified task concurrently across all agents.
**Parameters**:
- `task` (`str`): The task to be executed by all agents.
**Returns**:
- `List[Any]` : A list of results or error messages returned by the agents.
**Raises**:
- `ValueError` : If no agents are provided in the workflow.
**Example**:
```python
import asyncio
agents = [Agent("Agent1"), Agent("Agent2")]
workflow = AsyncWorkflow(agents=agents, verbose=True)
results = asyncio.run(workflow.run("Process Data"))
print(results)
```
---
## Production-Grade Financial Example: Multiple Agents
### Example: Stock Analysis and Investment Strategy
```python
import asyncio
from typing import List
from swarm_models import OpenAIChat
from swarms.structs.async_workflow import (
SpeakerConfig,
SpeakerRole,
create_default_workflow,
run_workflow_with_retry,
)
from swarms.prompts.finance_agent_sys_prompt import (
FINANCIAL_AGENT_SYS_PROMPT,
)
from swarms.structs.agent import Agent
async def create_specialized_agents() -> List[Agent]:
"""Create a set of specialized agents for financial analysis"""
# Base model configuration
model = OpenAIChat(model_name="gpt-4o")
# Financial Analysis Agent
financial_agent = Agent(
agent_name="Financial-Analysis-Agent",
agent_description="Personal finance advisor agent",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT
+ "Output the < DONE > token when you're done creating a portfolio of etfs, index, funds, and more for AI",
max_loops=1,
llm=model,
dynamic_temperature_enabled=True,
user_name="Kye",
retry_attempts=3,
context_length=8192,
return_step_meta=False,
output_type="str",
auto_generate_prompt=False,
max_tokens=4000,
stopping_token="< DONE > ",
saved_state_path="financial_agent.json",
interactive=False,
)
# Risk Assessment Agent
risk_agent = Agent(
agent_name="Risk-Assessment-Agent",
agent_description="Investment risk analysis specialist",
system_prompt="Analyze investment risks and provide risk scores. Output < DONE > when analysis is complete.",
max_loops=1,
llm=model,
dynamic_temperature_enabled=True,
user_name="Kye",
retry_attempts=3,
context_length=8192,
output_type="str",
max_tokens=4000,
stopping_token="< DONE > ",
saved_state_path="risk_agent.json",
interactive=False,
)
# Market Research Agent
research_agent = Agent(
agent_name="Market-Research-Agent",
agent_description="AI and tech market research specialist",
system_prompt="Research AI market trends and growth opportunities. Output < DONE > when research is complete.",
max_loops=1,
llm=model,
dynamic_temperature_enabled=True,
user_name="Kye",
retry_attempts=3,
context_length=8192,
output_type="str",
max_tokens=4000,
stopping_token="< DONE > ",
saved_state_path="research_agent.json",
interactive=False,
)
return [financial_agent, risk_agent, research_agent]
async def main():
# Create specialized agents
agents = await create_specialized_agents()
# Create workflow with group chat enabled
workflow = create_default_workflow(
agents=agents,
name="AI-Investment-Analysis-Workflow",
enable_group_chat=True,
)
# Configure speaker roles
workflow.speaker_system.add_speaker(
SpeakerConfig(
role=SpeakerRole.COORDINATOR,
agent=agents[0], # Financial agent as coordinator
priority=1,
concurrent=False,
required=True,
)
)
workflow.speaker_system.add_speaker(
SpeakerConfig(
role=SpeakerRole.CRITIC,
agent=agents[1], # Risk agent as critic
priority=2,
concurrent=True,
)
)
workflow.speaker_system.add_speaker(
SpeakerConfig(
role=SpeakerRole.EXECUTOR,
agent=agents[2], # Research agent as executor
priority=2,
concurrent=True,
)
)
# Investment analysis task
investment_task = """
Create a comprehensive investment analysis for a $40k portfolio focused on AI growth opportunities:
1. Identify high-growth AI ETFs and index funds
2. Analyze risks and potential returns
3. Create a diversified portfolio allocation
4. Provide market trend analysis
Present the results in a structured markdown format.
"""
try:
# Run workflow with retry
result = await run_workflow_with_retry(
workflow=workflow, task=investment_task, max_retries=3
)
print("\nWorkflow Results:")
print("================")
# Process and display agent outputs
for output in result.agent_outputs:
print(f"\nAgent: {output.agent_name}")
print("-" * (len(output.agent_name) + 8))
print(output.output)
# Display group chat history if enabled
if workflow.enable_group_chat:
print("\nGroup Chat Discussion:")
print("=====================")
for msg in workflow.speaker_system.message_history:
print(f"\n{msg.role} ({msg.agent_name}):")
print(msg.content)
# Save detailed results
if result.metadata.get("shared_memory_keys"):
print("\nShared Insights:")
print("===============")
for key in result.metadata["shared_memory_keys"]:
value = workflow.shared_memory.get(key)
if value:
print(f"\n{key}:")
print(value)
except Exception as e:
print(f"Workflow failed: {str(e)}")
finally:
await workflow.cleanup()
if __name__ == "__main__":
# Run the example
asyncio.run(main())
```
---