You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
802 lines
24 KiB
802 lines
24 KiB
# GraphWorkflow
|
|
|
|
A powerful workflow orchestration system that creates directed graphs of agents for complex multi-agent collaboration and task execution.
|
|
|
|
## Overview
|
|
|
|
The `GraphWorkflow` class is a sophisticated workflow management system that enables the creation and execution of complex multi-agent workflows. It represents workflows as directed graphs where nodes are agents and edges represent data flow and dependencies between agents. The system supports parallel execution, automatic compilation optimization, and comprehensive visualization capabilities.
|
|
|
|
Key features:
|
|
|
|
| Feature | Description |
|
|
|------------------------|-----------------------------------------------------------------------------------------------|
|
|
| **Agent-based nodes** | Each node represents an agent that can process tasks |
|
|
| **Directed graph structure** | Edges define the flow of data between agents |
|
|
| **Parallel execution** | Multiple agents can run simultaneously within layers |
|
|
| **Automatic compilation** | Optimizes workflow structure for efficient execution |
|
|
| **Rich visualization** | Generate visual representations using Graphviz |
|
|
| **Serialization** | Save and load workflows as JSON |
|
|
| **Pattern detection** | Automatically identifies parallel processing patterns |
|
|
|
|
## Architecture
|
|
|
|
```mermaid
|
|
graph TB
|
|
subgraph "GraphWorkflow Architecture"
|
|
A[GraphWorkflow] --> B[Node Collection]
|
|
A --> C[Edge Collection]
|
|
A --> D[NetworkX Graph]
|
|
A --> E[Execution Engine]
|
|
|
|
B --> F[Agent Nodes]
|
|
C --> G[Directed Edges]
|
|
D --> H[Topological Sort]
|
|
E --> I[Parallel Execution]
|
|
E --> J[Layer Processing]
|
|
|
|
subgraph "Node Types"
|
|
F --> K[Agent Node]
|
|
K --> L[Agent Instance]
|
|
K --> M[Node Metadata]
|
|
end
|
|
|
|
subgraph "Edge Types"
|
|
G --> N[Simple Edge]
|
|
G --> O[Fan-out Edge]
|
|
G --> P[Fan-in Edge]
|
|
G --> Q[Parallel Chain]
|
|
end
|
|
|
|
subgraph "Execution Patterns"
|
|
I --> R[Thread Pool]
|
|
I --> S[Concurrent Futures]
|
|
J --> T[Layer-by-layer]
|
|
J --> U[Dependency Resolution]
|
|
end
|
|
end
|
|
```
|
|
|
|
## Class Reference
|
|
|
|
| Parameter | Type | Description | Default |
|
|
|-----------|------|-------------|---------|
|
|
| `id` | `Optional[str]` | Unique identifier for the workflow | Auto-generated UUID |
|
|
| `name` | `Optional[str]` | Human-readable name for the workflow | "Graph-Workflow-01" |
|
|
| `description` | `Optional[str]` | Detailed description of the workflow | Generic description |
|
|
| `nodes` | `Optional[Dict[str, Node]]` | Initial collection of nodes | `{}` |
|
|
| `edges` | `Optional[List[Edge]]` | Initial collection of edges | `[]` |
|
|
| `entry_points` | `Optional[List[str]]` | Node IDs that serve as starting points | `[]` |
|
|
| `end_points` | `Optional[List[str]]` | Node IDs that serve as ending points | `[]` |
|
|
| `max_loops` | `int` | Maximum number of execution loops | `1` |
|
|
| `task` | `Optional[str]` | The task to be executed by the workflow | `None` |
|
|
| `auto_compile` | `bool` | Whether to automatically compile the workflow | `True` |
|
|
| `verbose` | `bool` | Whether to enable detailed logging | `False` |
|
|
|
|
### Core Methods
|
|
|
|
#### `add_node(agent: Agent, **kwargs)`
|
|
|
|
Adds an agent node to the workflow graph.
|
|
|
|
| Parameter | Type | Description |
|
|
|-----------|------|-------------|
|
|
| `agent` | `Agent` | The agent to add as a node |
|
|
| `**kwargs` | `Any` | Additional keyword arguments for the node |
|
|
|
|
**Raises:**
|
|
|
|
- `ValueError`: If a node with the same ID already exists
|
|
|
|
**Example:**
|
|
|
|
```python
|
|
workflow = GraphWorkflow()
|
|
agent = Agent(agent_name="ResearchAgent", model_name="gpt-4")
|
|
workflow.add_node(agent, metadata={"priority": "high"})
|
|
```
|
|
|
|
#### `add_edge(edge_or_source, target=None, **kwargs)`
|
|
|
|
Adds an edge to connect nodes in the workflow.
|
|
|
|
| Parameter | Type | Description |
|
|
|-----------|------|-------------|
|
|
| `edge_or_source` | `Edge` or `str` | Either an Edge object or source node ID |
|
|
| `target` | `str` | Target node ID (required if edge_or_source is not an Edge) |
|
|
| `**kwargs` | `Any` | Additional keyword arguments for the edge |
|
|
|
|
**Raises:**
|
|
|
|
- `ValueError`: If source or target nodes don't exist
|
|
|
|
**Example:**
|
|
|
|
```python
|
|
# Using Edge object
|
|
edge = Edge(source="agent1", target="agent2")
|
|
workflow.add_edge(edge)
|
|
|
|
# Using node IDs
|
|
workflow.add_edge("agent1", "agent2", metadata={"priority": "high"})
|
|
```
|
|
|
|
#### `add_edges_from_source(source, targets, **kwargs)`
|
|
|
|
Creates a fan-out pattern where one source connects to multiple targets.
|
|
|
|
| Parameter | Type | Description |
|
|
|-----------|------|-------------|
|
|
| `source` | `str` | Source node ID |
|
|
| `targets` | `List[str]` | List of target node IDs |
|
|
| `**kwargs` | `Any` | Additional keyword arguments for all edges |
|
|
|
|
**Returns:**
|
|
|
|
- `List[Edge]`: List of created Edge objects
|
|
|
|
**Example:**
|
|
|
|
```python
|
|
workflow.add_edges_from_source(
|
|
"DataCollector",
|
|
["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst"]
|
|
)
|
|
```
|
|
|
|
#### `add_edges_to_target(sources, target, **kwargs)`
|
|
|
|
Creates a fan-in pattern where multiple sources connect to one target.
|
|
|
|
| Parameter | Type | Description |
|
|
|-----------|------|-------------|
|
|
| `sources` | `List[str]` | List of source node IDs |
|
|
| `target` | `str` | Target node ID |
|
|
| `**kwargs` | `Any` | Additional keyword arguments for all edges |
|
|
|
|
**Returns:**
|
|
|
|
- `List[Edge]`: List of created Edge objects
|
|
|
|
**Example:**
|
|
|
|
```python
|
|
workflow.add_edges_to_target(
|
|
["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst"],
|
|
"SynthesisAgent"
|
|
)
|
|
```
|
|
|
|
#### `add_parallel_chain(sources, targets, **kwargs)`
|
|
|
|
Creates a full mesh connection between multiple sources and targets.
|
|
|
|
| Parameter | Type | Description |
|
|
|-----------|------|-------------|
|
|
| `sources` | `List[str]` | List of source node IDs |
|
|
| `targets` | `List[str]` | List of target node IDs |
|
|
| `**kwargs` | `Any` | Additional keyword arguments for all edges |
|
|
|
|
**Returns:**
|
|
|
|
- `List[Edge]`: List of created Edge objects
|
|
|
|
|
|
**Example:**
|
|
|
|
```python
|
|
workflow.add_parallel_chain(
|
|
["DataCollector1", "DataCollector2"],
|
|
["Analyst1", "Analyst2", "Analyst3"]
|
|
)
|
|
```
|
|
|
|
### Execution Methods
|
|
|
|
#### `run(task: str = None, img: Optional[str] = None, *args, **kwargs) -> Dict[str, Any]`
|
|
|
|
Executes the workflow with optimized parallel agent execution.
|
|
|
|
| Parameter | Type | Description |
|
|
|-----------|------|-------------|
|
|
| `task` | `str` | Task to execute (uses self.task if not provided) |
|
|
| `img` | `Optional[str]` | Image path for vision-enabled agents |
|
|
| `*args` | `Any` | Additional positional arguments |
|
|
| `**kwargs` | `Any` | Additional keyword arguments |
|
|
|
|
**Returns:**
|
|
|
|
- `Dict[str, Any]`: Execution results from all nodes
|
|
|
|
**Example:**
|
|
|
|
```python
|
|
results = workflow.run(
|
|
task="Analyze market trends for cryptocurrency",
|
|
max_loops=2
|
|
)
|
|
```
|
|
|
|
#### `arun(task: str = None, *args, **kwargs) -> Dict[str, Any]`
|
|
|
|
Async version of run for better performance with I/O bound operations.
|
|
|
|
| Parameter | Type | Description |
|
|
|-----------|------|-------------|
|
|
| `task` | `str` | Task to execute |
|
|
| `*args` | `Any` | Additional positional arguments |
|
|
| `**kwargs` | `Any` | Additional keyword arguments |
|
|
|
|
**Returns:**
|
|
|
|
- `Dict[str, Any]`: Execution results from all nodes
|
|
|
|
**Example:**
|
|
|
|
```python
|
|
import asyncio
|
|
results = await workflow.arun("Process large dataset")
|
|
```
|
|
|
|
### Compilation and Optimization
|
|
|
|
#### `compile()`
|
|
|
|
Pre-computes expensive operations for faster execution.
|
|
|
|
**Example:**
|
|
|
|
```python
|
|
workflow.compile()
|
|
status = workflow.get_compilation_status()
|
|
print(f"Compiled: {status['is_compiled']}")
|
|
```
|
|
|
|
#### `get_compilation_status() -> Dict[str, Any]`
|
|
|
|
Returns detailed compilation status information.
|
|
|
|
**Returns:**
|
|
|
|
- `Dict[str, Any]`: Compilation status including cache state and performance metrics
|
|
|
|
**Example:**
|
|
|
|
```python
|
|
status = workflow.get_compilation_status()
|
|
print(f"Layers: {status['cached_layers_count']}")
|
|
print(f"Max workers: {status['max_workers']}")
|
|
```
|
|
|
|
### Visualization Methods
|
|
|
|
#### `visualize(format: str = "png", view: bool = True, engine: str = "dot", show_summary: bool = False) -> str`
|
|
|
|
Generates a visual representation of the workflow using Graphviz.
|
|
|
|
| Parameter | Type | Description | Default |
|
|
|-----------|------|-------------|---------|
|
|
| `format` | `str` | Output format ('png', 'svg', 'pdf', 'dot') | `"png"` |
|
|
| `view` | `bool` | Whether to open the visualization | `True` |
|
|
| `engine` | `str` | Graphviz layout engine | `"dot"` |
|
|
| `show_summary` | `bool` | Whether to print parallel processing summary | `False` |
|
|
|
|
**Returns:**
|
|
|
|
- `str`: Path to the generated visualization file
|
|
|
|
**Example:**
|
|
|
|
```python
|
|
output_file = workflow.visualize(
|
|
format="svg",
|
|
show_summary=True
|
|
)
|
|
print(f"Visualization saved to: {output_file}")
|
|
```
|
|
|
|
#### `visualize_simple() -> str`
|
|
|
|
Generates a simple text-based visualization.
|
|
|
|
**Returns:**
|
|
|
|
- `str`: Text representation of the workflow
|
|
|
|
**Example:**
|
|
|
|
```python
|
|
text_viz = workflow.visualize_simple()
|
|
print(text_viz)
|
|
```
|
|
|
|
### Serialization Methods
|
|
|
|
#### `to_json(fast: bool = True, include_conversation: bool = False, include_runtime_state: bool = False) -> str`
|
|
|
|
Serializes the workflow to JSON format.
|
|
|
|
| Parameter | Type | Description | Default |
|
|
|-----------|------|-------------|---------|
|
|
| `fast` | `bool` | Whether to use fast JSON serialization | `True` |
|
|
| `include_conversation` | `bool` | Whether to include conversation history | `False` |
|
|
| `include_runtime_state` | `bool` | Whether to include runtime state | `False` |
|
|
|
|
**Returns:**
|
|
|
|
- `str`: JSON representation of the workflow
|
|
|
|
**Example:**
|
|
|
|
```python
|
|
json_data = workflow.to_json(
|
|
include_conversation=True,
|
|
include_runtime_state=True
|
|
)
|
|
```
|
|
|
|
#### `from_json(json_str: str, restore_runtime_state: bool = False) -> GraphWorkflow`
|
|
|
|
Deserializes a workflow from JSON format.
|
|
|
|
| Parameter | Type | Description | Default |
|
|
|-----------|------|-------------|---------|
|
|
| `json_str` | `str` | JSON string representation | Required |
|
|
| `restore_runtime_state` | `bool` | Whether to restore runtime state | `False` |
|
|
|
|
**Returns:**
|
|
|
|
- `GraphWorkflow`: A new GraphWorkflow instance
|
|
|
|
**Example:**
|
|
|
|
```python
|
|
workflow = GraphWorkflow.from_json(json_data, restore_runtime_state=True)
|
|
```
|
|
|
|
#### `save_to_file(filepath: str, include_conversation: bool = False, include_runtime_state: bool = False, overwrite: bool = False) -> str`
|
|
|
|
Saves the workflow to a JSON file.
|
|
|
|
| Parameter | Type | Description | Default |
|
|
|-----------|------|-------------|---------|
|
|
| `filepath` | `str` | Path to save the JSON file | Required |
|
|
| `include_conversation` | `bool` | Whether to include conversation history | `False` |
|
|
| `include_runtime_state` | `bool` | Whether to include runtime state | `False` |
|
|
| `overwrite` | `bool` | Whether to overwrite existing files | `False` |
|
|
|
|
**Returns:**
|
|
|
|
- `str`: Path to the saved file
|
|
|
|
**Example:**
|
|
|
|
```python
|
|
filepath = workflow.save_to_file(
|
|
"my_workflow.json",
|
|
include_conversation=True
|
|
)
|
|
```
|
|
|
|
#### `load_from_file(filepath: str, restore_runtime_state: bool = False) -> GraphWorkflow`
|
|
|
|
Loads a workflow from a JSON file.
|
|
|
|
| Parameter | Type | Description | Default |
|
|
|-----------|------|-------------|---------|
|
|
| `filepath` | `str` | Path to the JSON file | Required |
|
|
| `restore_runtime_state` | `bool` | Whether to restore runtime state | `False` |
|
|
|
|
**Returns:**
|
|
|
|
- `GraphWorkflow`: Loaded workflow instance
|
|
|
|
**Example:**
|
|
|
|
```python
|
|
workflow = GraphWorkflow.load_from_file("my_workflow.json")
|
|
```
|
|
|
|
### Utility Methods
|
|
|
|
#### `export_summary() -> Dict[str, Any]`
|
|
|
|
Generates a human-readable summary of the workflow.
|
|
|
|
**Returns:**
|
|
|
|
- `Dict[str, Any]`: Comprehensive workflow summary
|
|
|
|
**Example:**
|
|
|
|
```python
|
|
summary = workflow.export_summary()
|
|
print(f"Workflow has {summary['structure']['nodes']} nodes")
|
|
print(f"Compilation status: {summary['compilation_status']['is_compiled']}")
|
|
```
|
|
|
|
#### `set_entry_points(entry_points: List[str])`
|
|
|
|
Sets the entry points for the workflow.
|
|
|
|
| Parameter | Type | Description |
|
|
|-----------|------|-------------|
|
|
| `entry_points` | `List[str]` | List of node IDs to serve as entry points |
|
|
|
|
**Example:**
|
|
|
|
```python
|
|
workflow.set_entry_points(["DataCollector", "ResearchAgent"])
|
|
```
|
|
|
|
#### `set_end_points(end_points: List[str])`
|
|
|
|
Sets the end points for the workflow.
|
|
|
|
| Parameter | Type | Description |
|
|
|-----------|------|-------------|
|
|
| `end_points` | `List[str]` | List of node IDs to serve as end points |
|
|
|
|
**Example:**
|
|
|
|
```python
|
|
workflow.set_end_points(["SynthesisAgent", "ReportGenerator"])
|
|
```
|
|
|
|
### Class Methods
|
|
|
|
#### `from_spec(agents, edges, entry_points=None, end_points=None, task=None, **kwargs) -> GraphWorkflow`
|
|
|
|
Constructs a workflow from a list of agents and connections.
|
|
|
|
| Parameter | Type | Description | Default |
|
|
|-----------|------|-------------|---------|
|
|
| `agents` | `List` | List of agents or Node objects | Required |
|
|
| `edges` | `List` | List of edges or edge tuples | Required |
|
|
| `entry_points` | `List[str]` | List of entry point node IDs | `None` |
|
|
| `end_points` | `List[str]` | List of end point node IDs | `None` |
|
|
| `task` | `str` | Task to be executed by the workflow | `None` |
|
|
| `**kwargs` | `Any` | Additional keyword arguments | `{}` |
|
|
|
|
**Returns:**
|
|
|
|
- `GraphWorkflow`: A new GraphWorkflow instance
|
|
|
|
**Example:**
|
|
|
|
```python
|
|
workflow = GraphWorkflow.from_spec(
|
|
agents=[agent1, agent2, agent3],
|
|
edges=[
|
|
("agent1", "agent2"),
|
|
("agent2", "agent3"),
|
|
("agent1", ["agent2", "agent3"]) # Fan-out
|
|
],
|
|
task="Analyze market data"
|
|
)
|
|
```
|
|
|
|
## Examples
|
|
|
|
### Basic Sequential Workflow
|
|
|
|
```python
|
|
from swarms import Agent, GraphWorkflow
|
|
from swarms.prompts.multi_agent_collab_prompt import MULTI_AGENT_COLLAB_PROMPT_TWO
|
|
|
|
# Create agents
|
|
research_agent = Agent(
|
|
agent_name="ResearchAgent",
|
|
model_name="gpt-4",
|
|
system_prompt=MULTI_AGENT_COLLAB_PROMPT_TWO,
|
|
max_loops=1
|
|
)
|
|
|
|
analysis_agent = Agent(
|
|
agent_name="AnalysisAgent",
|
|
model_name="gpt-4",
|
|
system_prompt=MULTI_AGENT_COLLAB_PROMPT_TWO,
|
|
max_loops=1
|
|
)
|
|
|
|
# Build workflow
|
|
workflow = GraphWorkflow(name="Research-Analysis-Workflow")
|
|
workflow.add_node(research_agent)
|
|
workflow.add_node(analysis_agent)
|
|
workflow.add_edge("ResearchAgent", "AnalysisAgent")
|
|
|
|
# Execute
|
|
results = workflow.run("What are the latest trends in AI?")
|
|
print(results)
|
|
```
|
|
|
|
### Parallel Processing Workflow
|
|
|
|
```python
|
|
from swarms import Agent, GraphWorkflow
|
|
|
|
# Create specialized agents
|
|
data_collector = Agent(agent_name="DataCollector", model_name="gpt-4")
|
|
technical_analyst = Agent(agent_name="TechnicalAnalyst", model_name="gpt-4")
|
|
fundamental_analyst = Agent(agent_name="FundamentalAnalyst", model_name="gpt-4")
|
|
sentiment_analyst = Agent(agent_name="SentimentAnalyst", model_name="gpt-4")
|
|
synthesis_agent = Agent(agent_name="SynthesisAgent", model_name="gpt-4")
|
|
|
|
# Build parallel workflow
|
|
workflow = GraphWorkflow(name="Market-Analysis-Workflow")
|
|
|
|
# Add all agents
|
|
for agent in [data_collector, technical_analyst, fundamental_analyst,
|
|
sentiment_analyst, synthesis_agent]:
|
|
workflow.add_node(agent)
|
|
|
|
# Create fan-out pattern: data collector feeds all analysts
|
|
workflow.add_edges_from_source(
|
|
"DataCollector",
|
|
["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst"]
|
|
)
|
|
|
|
# Create fan-in pattern: all analysts feed synthesis agent
|
|
workflow.add_edges_to_target(
|
|
["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst"],
|
|
"SynthesisAgent"
|
|
)
|
|
|
|
# Execute
|
|
results = workflow.run("Analyze Bitcoin market trends")
|
|
print(results)
|
|
```
|
|
|
|
### Complex Multi-Layer Workflow
|
|
|
|
```python
|
|
from swarms import Agent, GraphWorkflow
|
|
|
|
# Create agents for different stages
|
|
data_collectors = [
|
|
Agent(agent_name=f"DataCollector{i}", model_name="gpt-4")
|
|
for i in range(1, 4)
|
|
]
|
|
|
|
analysts = [
|
|
Agent(agent_name=f"Analyst{i}", model_name="gpt-4")
|
|
for i in range(1, 4)
|
|
]
|
|
|
|
validators = [
|
|
Agent(agent_name=f"Validator{i}", model_name="gpt-4")
|
|
for i in range(1, 3)
|
|
]
|
|
|
|
synthesis_agent = Agent(agent_name="SynthesisAgent", model_name="gpt-4")
|
|
|
|
# Build complex workflow
|
|
workflow = GraphWorkflow(name="Complex-Research-Workflow")
|
|
|
|
# Add all agents
|
|
all_agents = data_collectors + analysts + validators + [synthesis_agent]
|
|
for agent in all_agents:
|
|
workflow.add_node(agent)
|
|
|
|
# Layer 1: Data collectors feed all analysts in parallel
|
|
workflow.add_parallel_chain(
|
|
[agent.agent_name for agent in data_collectors],
|
|
[agent.agent_name for agent in analysts]
|
|
)
|
|
|
|
# Layer 2: Analysts feed validators
|
|
workflow.add_parallel_chain(
|
|
[agent.agent_name for agent in analysts],
|
|
[agent.agent_name for agent in validators]
|
|
)
|
|
|
|
# Layer 3: Validators feed synthesis agent
|
|
workflow.add_edges_to_target(
|
|
[agent.agent_name for agent in validators],
|
|
"SynthesisAgent"
|
|
)
|
|
|
|
# Visualize and execute
|
|
workflow.visualize(show_summary=True)
|
|
results = workflow.run("Comprehensive analysis of renewable energy markets")
|
|
```
|
|
|
|
### Workflow with Custom Metadata
|
|
|
|
```python
|
|
from swarms import Agent, GraphWorkflow, Edge
|
|
|
|
# Create agents with specific roles
|
|
research_agent = Agent(agent_name="ResearchAgent", model_name="gpt-4")
|
|
analysis_agent = Agent(agent_name="AnalysisAgent", model_name="gpt-4")
|
|
|
|
# Build workflow with metadata
|
|
workflow = GraphWorkflow(
|
|
name="Metadata-Workflow",
|
|
description="Workflow demonstrating metadata usage"
|
|
)
|
|
|
|
workflow.add_node(research_agent, metadata={"priority": "high", "timeout": 300})
|
|
workflow.add_node(analysis_agent, metadata={"priority": "medium", "timeout": 600})
|
|
|
|
# Add edge with metadata
|
|
edge = Edge(
|
|
source="ResearchAgent",
|
|
target="AnalysisAgent",
|
|
metadata={"data_type": "research_findings", "priority": "high"}
|
|
)
|
|
workflow.add_edge(edge)
|
|
|
|
# Execute with custom parameters
|
|
results = workflow.run(
|
|
"Analyze the impact of climate change on agriculture",
|
|
max_loops=2
|
|
)
|
|
```
|
|
|
|
### Workflow Serialization and Persistence
|
|
|
|
```python
|
|
from swarms import Agent, GraphWorkflow
|
|
|
|
# Create workflow
|
|
research_agent = Agent(agent_name="ResearchAgent", model_name="gpt-4")
|
|
analysis_agent = Agent(agent_name="AnalysisAgent", model_name="gpt-4")
|
|
|
|
workflow = GraphWorkflow(name="Persistent-Workflow")
|
|
workflow.add_node(research_agent)
|
|
workflow.add_node(analysis_agent)
|
|
workflow.add_edge("ResearchAgent", "AnalysisAgent")
|
|
|
|
# Execute and get conversation
|
|
results = workflow.run("Research quantum computing applications")
|
|
|
|
# Save workflow with conversation history
|
|
filepath = workflow.save_to_file(
|
|
"quantum_research_workflow.json",
|
|
include_conversation=True,
|
|
include_runtime_state=True
|
|
)
|
|
|
|
# Load workflow later
|
|
loaded_workflow = GraphWorkflow.load_from_file(
|
|
filepath,
|
|
restore_runtime_state=True
|
|
)
|
|
|
|
# Continue execution
|
|
new_results = loaded_workflow.run("Continue with quantum cryptography analysis")
|
|
```
|
|
|
|
### Advanced Pattern Detection
|
|
|
|
```python
|
|
from swarms import Agent, GraphWorkflow
|
|
|
|
# Create a complex workflow with multiple patterns
|
|
workflow = GraphWorkflow(name="Pattern-Detection-Workflow", verbose=True)
|
|
|
|
# Create agents
|
|
agents = {
|
|
"collector": Agent(agent_name="DataCollector", model_name="gpt-4"),
|
|
"tech_analyst": Agent(agent_name="TechnicalAnalyst", model_name="gpt-4"),
|
|
"fund_analyst": Agent(agent_name="FundamentalAnalyst", model_name="gpt-4"),
|
|
"sentiment_analyst": Agent(agent_name="SentimentAnalyst", model_name="gpt-4"),
|
|
"risk_analyst": Agent(agent_name="RiskAnalyst", model_name="gpt-4"),
|
|
"synthesis": Agent(agent_name="SynthesisAgent", model_name="gpt-4"),
|
|
"validator": Agent(agent_name="Validator", model_name="gpt-4")
|
|
}
|
|
|
|
# Add all agents
|
|
for agent in agents.values():
|
|
workflow.add_node(agent)
|
|
|
|
# Create complex patterns
|
|
# Fan-out from collector
|
|
workflow.add_edges_from_source(
|
|
"DataCollector",
|
|
["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst", "RiskAnalyst"]
|
|
)
|
|
|
|
# Fan-in to synthesis
|
|
workflow.add_edges_to_target(
|
|
["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst", "RiskAnalyst"],
|
|
"SynthesisAgent"
|
|
)
|
|
|
|
# Final validation step
|
|
workflow.add_edge("SynthesisAgent", "Validator")
|
|
|
|
# Compile and get status
|
|
workflow.compile()
|
|
status = workflow.get_compilation_status()
|
|
|
|
print(f"Compilation status: {status}")
|
|
print(f"Layers: {status['cached_layers_count']}")
|
|
print(f"Max workers: {status['max_workers']}")
|
|
|
|
# Visualize with pattern detection
|
|
workflow.visualize(show_summary=True, format="png")
|
|
```
|
|
|
|
### Error Handling and Recovery
|
|
|
|
```python
|
|
from swarms import Agent, GraphWorkflow
|
|
import logging
|
|
|
|
# Set up logging
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
# Create workflow with error handling
|
|
workflow = GraphWorkflow(
|
|
name="Error-Handling-Workflow",
|
|
verbose=True,
|
|
max_loops=1
|
|
)
|
|
|
|
# Create agents
|
|
try:
|
|
research_agent = Agent(agent_name="ResearchAgent", model_name="gpt-4")
|
|
analysis_agent = Agent(agent_name="AnalysisAgent", model_name="gpt-4")
|
|
|
|
workflow.add_node(research_agent)
|
|
workflow.add_node(analysis_agent)
|
|
workflow.add_edge("ResearchAgent", "AnalysisAgent")
|
|
|
|
# Execute with error handling
|
|
try:
|
|
results = workflow.run("Analyze market trends")
|
|
print("Workflow completed successfully")
|
|
print(results)
|
|
|
|
except Exception as e:
|
|
print(f"Workflow execution failed: {e}")
|
|
|
|
# Get workflow summary for debugging
|
|
summary = workflow.export_summary()
|
|
print(f"Workflow state: {summary['structure']}")
|
|
|
|
except Exception as e:
|
|
print(f"Workflow setup failed: {e}")
|
|
```
|
|
|
|
## Conclusion
|
|
|
|
The `GraphWorkflow` class provides a powerful and flexible framework for orchestrating complex multi-agent workflows. Its key benefits include:
|
|
|
|
### Benefits
|
|
|
|
| Benefit | Description |
|
|
|-----------------|--------------------------------------------------------------------------------------------------|
|
|
| **Scalability** | Supports workflows with hundreds of agents through efficient parallel execution |
|
|
| **Flexibility** | Multiple connection patterns (sequential, fan-out, fan-in, parallel chains) |
|
|
| **Performance** | Automatic compilation and optimization for faster execution |
|
|
| **Visualization** | Rich visual representations for workflow understanding and debugging |
|
|
| **Persistence** | Complete serialization and deserialization capabilities |
|
|
| **Error Handling** | Comprehensive error handling and recovery mechanisms |
|
|
| **Monitoring** | Detailed logging and status reporting |
|
|
|
|
### Use Cases
|
|
|
|
| Use Case | Description |
|
|
|-------------------------|--------------------------------------------------------------------|
|
|
| **Research Workflows** | Multi-stage research with data collection, analysis, and synthesis |
|
|
| **Content Generation** | Parallel content creation with validation and refinement |
|
|
| **Data Processing** | Complex ETL pipelines with multiple processing stages |
|
|
| **Decision Making** | Multi-agent decision systems with voting and consensus |
|
|
| **Quality Assurance** | Multi-stage validation and verification processes |
|
|
| **Automated Testing** | Complex test orchestration with parallel execution |
|
|
|
|
### Best Practices
|
|
|
|
| Best Practice | Description |
|
|
|---------------------------------------|------------------------------------------------------------------|
|
|
| **Use meaningful agent names** | Helps with debugging and visualization |
|
|
| **Leverage parallel patterns** | Use fan-out and fan-in for better performance |
|
|
| **Compile workflows** | Always compile before execution for optimal performance |
|
|
| **Monitor execution** | Use verbose mode and status reporting for debugging |
|
|
| **Save important workflows** | Use serialization for workflow persistence |
|
|
| **Handle errors gracefully** | Implement proper error handling and recovery |
|
|
| **Visualize complex workflows** | Use visualization to understand and debug workflows |
|
|
|
|
The GraphWorkflow system represents a significant advancement in multi-agent orchestration, providing the tools needed to build complex, scalable, and maintainable AI workflows. |