diff --git a/docs/swarms/structs/graph_workflow.md b/docs/swarms/structs/graph_workflow.md
index ef48d8d0..490dadb7 100644
--- a/docs/swarms/structs/graph_workflow.md
+++ b/docs/swarms/structs/graph_workflow.md
@@ -1,802 +1,345 @@
-# GraphWorkflow
+# `GraphWorkflow`
-A powerful workflow orchestration system that creates directed graphs of agents for complex multi-agent collaboration and task execution.
+GraphWorkflow orchestrates tasks as a directed acyclic graph (DAG), letting you express complex dependencies in which some tasks must wait for others to complete. On top of graph execution, it provides state management, error handling, and performance optimization for building sophisticated pipelines.
## Overview
-The `GraphWorkflow` class is a sophisticated workflow management system that enables the creation and execution of complex multi-agent workflows. It represents workflows as directed graphs where nodes are agents and edges represent data flow and dependencies between agents. The system supports parallel execution, automatic compilation optimization, and comprehensive visualization capabilities.
+The `GraphWorkflow` class implements a graph-based workflow system in which nodes represent tasks, agents, conditions, or data processors, and edges define the flow and dependencies between them. Its features include:
+
-Key features:
-
-| Feature | Description |
-|------------------------|-----------------------------------------------------------------------------------------------|
-| **Agent-based nodes** | Each node represents an agent that can process tasks |
-| **Directed graph structure** | Edges define the flow of data between agents |
-| **Parallel execution** | Multiple agents can run simultaneously within layers |
-| **Automatic compilation** | Optimizes workflow structure for efficient execution |
-| **Rich visualization** | Generate visual representations using Graphviz |
-| **Serialization** | Save and load workflows as JSON |
-| **Pattern detection** | Automatically identifies parallel processing patterns |
+1. **Multiple Node Types**: Support for agents, tasks, conditions, data processors, and more
+2. **Flexible Edge Types**: Sequential, conditional, parallel, and error handling edges
+3. **State Management**: Multiple storage backends with checkpointing and recovery
+4. **Asynchronous Execution**: Full async/await support for efficient resource utilization
+5. **Error Handling**: Comprehensive retry logic and error recovery mechanisms
+6. **Performance Optimization**: Parallel execution and bottleneck detection
+7. **Visualization**: Mermaid diagrams and real-time dashboard monitoring
+8. **Serialization**: Support for JSON, YAML, and custom DSL formats
+9. **Plugin System**: Extensible architecture for custom components
+10. **AI-Augmented Features**: Workflow description, optimization, and generation
## Architecture
```mermaid
-graph TB
- subgraph "GraphWorkflow Architecture"
- A[GraphWorkflow] --> B[Node Collection]
- A --> C[Edge Collection]
- A --> D[NetworkX Graph]
- A --> E[Execution Engine]
-
- B --> F[Agent Nodes]
- C --> G[Directed Edges]
- D --> H[Topological Sort]
- E --> I[Parallel Execution]
- E --> J[Layer Processing]
-
- subgraph "Node Types"
- F --> K[Agent Node]
- K --> L[Agent Instance]
- K --> M[Node Metadata]
- end
-
- subgraph "Edge Types"
- G --> N[Simple Edge]
- G --> O[Fan-out Edge]
- G --> P[Fan-in Edge]
- G --> Q[Parallel Chain]
- end
-
- subgraph "Execution Patterns"
- I --> R[Thread Pool]
- I --> S[Concurrent Futures]
- J --> T[Layer-by-layer]
- J --> U[Dependency Resolution]
- end
- end
-```
-
-## Class Reference
-
-| Parameter | Type | Description | Default |
-|-----------|------|-------------|---------|
-| `id` | `Optional[str]` | Unique identifier for the workflow | Auto-generated UUID |
-| `name` | `Optional[str]` | Human-readable name for the workflow | "Graph-Workflow-01" |
-| `description` | `Optional[str]` | Detailed description of the workflow | Generic description |
-| `nodes` | `Optional[Dict[str, Node]]` | Initial collection of nodes | `{}` |
-| `edges` | `Optional[List[Edge]]` | Initial collection of edges | `[]` |
-| `entry_points` | `Optional[List[str]]` | Node IDs that serve as starting points | `[]` |
-| `end_points` | `Optional[List[str]]` | Node IDs that serve as ending points | `[]` |
-| `max_loops` | `int` | Maximum number of execution loops | `1` |
-| `task` | `Optional[str]` | The task to be executed by the workflow | `None` |
-| `auto_compile` | `bool` | Whether to automatically compile the workflow | `True` |
-| `verbose` | `bool` | Whether to enable detailed logging | `False` |
-
-### Core Methods
-
-#### `add_node(agent: Agent, **kwargs)`
-
-Adds an agent node to the workflow graph.
-
-| Parameter | Type | Description |
-|-----------|------|-------------|
-| `agent` | `Agent` | The agent to add as a node |
-| `**kwargs` | `Any` | Additional keyword arguments for the node |
-
-**Raises:**
-
-- `ValueError`: If a node with the same ID already exists
-
-**Example:**
-
-```python
-workflow = GraphWorkflow()
-agent = Agent(agent_name="ResearchAgent", model_name="gpt-4")
-workflow.add_node(agent, metadata={"priority": "high"})
-```
-
-#### `add_edge(edge_or_source, target=None, **kwargs)`
-
-Adds an edge to connect nodes in the workflow.
-
-| Parameter | Type | Description |
-|-----------|------|-------------|
-| `edge_or_source` | `Edge` or `str` | Either an Edge object or source node ID |
-| `target` | `str` | Target node ID (required if edge_or_source is not an Edge) |
-| `**kwargs` | `Any` | Additional keyword arguments for the edge |
-
-**Raises:**
-
-- `ValueError`: If source or target nodes don't exist
-
-**Example:**
-
-```python
-# Using Edge object
-edge = Edge(source="agent1", target="agent2")
-workflow.add_edge(edge)
-
-# Using node IDs
-workflow.add_edge("agent1", "agent2", metadata={"priority": "high"})
-```
-
-#### `add_edges_from_source(source, targets, **kwargs)`
-
-Creates a fan-out pattern where one source connects to multiple targets.
-
-| Parameter | Type | Description |
-|-----------|------|-------------|
-| `source` | `str` | Source node ID |
-| `targets` | `List[str]` | List of target node IDs |
-| `**kwargs` | `Any` | Additional keyword arguments for all edges |
-
-**Returns:**
-
-- `List[Edge]`: List of created Edge objects
-
-**Example:**
-
-```python
-workflow.add_edges_from_source(
- "DataCollector",
- ["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst"]
-)
-```
-
-#### `add_edges_to_target(sources, target, **kwargs)`
-
-Creates a fan-in pattern where multiple sources connect to one target.
-
-| Parameter | Type | Description |
-|-----------|------|-------------|
-| `sources` | `List[str]` | List of source node IDs |
-| `target` | `str` | Target node ID |
-| `**kwargs` | `Any` | Additional keyword arguments for all edges |
-
-**Returns:**
-
-- `List[Edge]`: List of created Edge objects
-
-**Example:**
-
-```python
-workflow.add_edges_to_target(
- ["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst"],
- "SynthesisAgent"
-)
-```
-
-#### `add_parallel_chain(sources, targets, **kwargs)`
-
-Creates a full mesh connection between multiple sources and targets.
-
-| Parameter | Type | Description |
-|-----------|------|-------------|
-| `sources` | `List[str]` | List of source node IDs |
-| `targets` | `List[str]` | List of target node IDs |
-| `**kwargs` | `Any` | Additional keyword arguments for all edges |
-
-**Returns:**
-
-- `List[Edge]`: List of created Edge objects
-
-
-**Example:**
-
-```python
-workflow.add_parallel_chain(
- ["DataCollector1", "DataCollector2"],
- ["Analyst1", "Analyst2", "Analyst3"]
-)
-```
-
-### Execution Methods
-
-#### `run(task: str = None, img: Optional[str] = None, *args, **kwargs) -> Dict[str, Any]`
-
-Executes the workflow with optimized parallel agent execution.
-
-| Parameter | Type | Description |
-|-----------|------|-------------|
-| `task` | `str` | Task to execute (uses self.task if not provided) |
-| `img` | `Optional[str]` | Image path for vision-enabled agents |
-| `*args` | `Any` | Additional positional arguments |
-| `**kwargs` | `Any` | Additional keyword arguments |
-
-**Returns:**
-
-- `Dict[str, Any]`: Execution results from all nodes
-
-**Example:**
-
-```python
-results = workflow.run(
- task="Analyze market trends for cryptocurrency",
- max_loops=2
-)
-```
-
-#### `arun(task: str = None, *args, **kwargs) -> Dict[str, Any]`
-
-Async version of run for better performance with I/O bound operations.
-
-| Parameter | Type | Description |
-|-----------|------|-------------|
-| `task` | `str` | Task to execute |
-| `*args` | `Any` | Additional positional arguments |
-| `**kwargs` | `Any` | Additional keyword arguments |
-
-**Returns:**
-
-- `Dict[str, Any]`: Execution results from all nodes
-
-**Example:**
-
-```python
-import asyncio
-results = await workflow.arun("Process large dataset")
-```
-
-### Compilation and Optimization
-
-#### `compile()`
-
-Pre-computes expensive operations for faster execution.
-
-**Example:**
-
-```python
-workflow.compile()
-status = workflow.get_compilation_status()
-print(f"Compiled: {status['is_compiled']}")
-```
-
-#### `get_compilation_status() -> Dict[str, Any]`
-
-Returns detailed compilation status information.
-
-**Returns:**
-
-- `Dict[str, Any]`: Compilation status including cache state and performance metrics
-
-**Example:**
-
-```python
-status = workflow.get_compilation_status()
-print(f"Layers: {status['cached_layers_count']}")
-print(f"Max workers: {status['max_workers']}")
-```
-
-### Visualization Methods
-
-#### `visualize(format: str = "png", view: bool = True, engine: str = "dot", show_summary: bool = False) -> str`
-
-Generates a visual representation of the workflow using Graphviz.
-
-| Parameter | Type | Description | Default |
-|-----------|------|-------------|---------|
-| `format` | `str` | Output format ('png', 'svg', 'pdf', 'dot') | `"png"` |
-| `view` | `bool` | Whether to open the visualization | `True` |
-| `engine` | `str` | Graphviz layout engine | `"dot"` |
-| `show_summary` | `bool` | Whether to print parallel processing summary | `False` |
-
-**Returns:**
-
-- `str`: Path to the generated visualization file
-
-**Example:**
-
-```python
-output_file = workflow.visualize(
- format="svg",
- show_summary=True
-)
-print(f"Visualization saved to: {output_file}")
-```
-
-#### `visualize_simple() -> str`
-
-Generates a simple text-based visualization.
-
-**Returns:**
-
-- `str`: Text representation of the workflow
-
-**Example:**
-
-```python
-text_viz = workflow.visualize_simple()
-print(text_viz)
+graph TD
+ A[Workflow Initiation] -->|Creates Graph| B[Node Addition]
+ B -->|Adds Nodes| C[Edge Configuration]
+ C -->|Defines Flow| D[Execution Planning]
+ D -->|Topological Sort| E[Node Execution]
+ E -->|Agent Nodes| F[AI Agent Tasks]
+ E -->|Task Nodes| G[Custom Functions]
+ E -->|Condition Nodes| H[Decision Logic]
+ E -->|Data Nodes| I[Data Processing]
+ F -->|Results| J[State Management]
+ G -->|Results| J
+ H -->|Results| J
+ I -->|Results| J
+ J -->|Checkpointing| K[Storage Backend]
+ K -->|Memory/SQLite/Redis| L[State Persistence]
+ J -->|Next Node| E
+ E -->|All Complete| M[Workflow Complete]
```
-### Serialization Methods
-
-#### `to_json(fast: bool = True, include_conversation: bool = False, include_runtime_state: bool = False) -> str`
-
-Serializes the workflow to JSON format.
-
-| Parameter | Type | Description | Default |
-|-----------|------|-------------|---------|
-| `fast` | `bool` | Whether to use fast JSON serialization | `True` |
-| `include_conversation` | `bool` | Whether to include conversation history | `False` |
-| `include_runtime_state` | `bool` | Whether to include runtime state | `False` |
-
-**Returns:**
-
-- `str`: JSON representation of the workflow
-
-**Example:**
-
-```python
-json_data = workflow.to_json(
- include_conversation=True,
- include_runtime_state=True
-)
+## `GraphWorkflow` Attributes
+
+| Attribute | Description |
+|-----------|-------------|
+| `name` | Name of the workflow instance |
+| `description` | Human-readable description of the workflow |
+| `nodes` | Dictionary of nodes in the graph |
+| `edges` | List of edges connecting nodes |
+| `entry_points` | Node IDs that serve as entry points |
+| `end_points` | Node IDs that serve as end points |
+| `graph` | NetworkX or RustWorkX graph representation |
+| `max_loops` | Maximum execution loops |
+| `timeout` | Overall workflow timeout in seconds |
+| `auto_save` | Whether to auto-save workflow state |
+| `show_dashboard` | Whether to show real-time dashboard |
+| `priority` | Workflow priority level |
+| `distributed` | Whether workflow supports distributed execution |
+| `graph_engine` | Graph engine type (NetworkX or RustWorkX) |
+| `state_backend` | Storage backend for state management |
+| `auto_checkpointing` | Enable automatic checkpointing |
+| `checkpoint_interval` | Checkpoint frequency in seconds |
+
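+Most of these attributes can be configured directly on the constructor. A minimal sketch (the values are illustrative, and passing every attribute as a constructor keyword is an assumption based on the table above):
+
+```python
+from swarms.structs.graph_workflow import GraphWorkflow
+
+workflow = GraphWorkflow(
+    name="ReportPipeline",
+    description="Collects data and produces a report",
+    max_loops=1,
+    timeout=300.0,           # overall workflow timeout in seconds
+    auto_save=True,
+    show_dashboard=False,
+    state_backend="sqlite",  # see Storage Backends below
+    auto_checkpointing=True,
+    checkpoint_interval=60,  # seconds between checkpoints
+)
+```
+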
+## Node Types
+
+| Node Type | Description | Use Case |
+|-----------|-------------|----------|
+| `AGENT` | Execute AI agents with task delegation | AI-powered tasks and decision making |
+| `TASK` | Run custom functions and callables | Data processing and business logic |
+| `CONDITION` | Implement conditional logic and branching | Decision points and flow control |
+| `DATA_PROCESSOR` | Transform and process data | Data manipulation and transformation |
+| `GATEWAY` | Control flow routing and decision points | Complex routing logic |
+| `SUBWORKFLOW` | Embed nested workflows | Modular workflow design |
+| `PARALLEL` | Execute tasks concurrently | Performance optimization |
+| `MERGE` | Combine results from parallel executions | Result aggregation |
+
+## Edge Types
+
+| Edge Type | Description | Use Case |
+|-----------|-------------|----------|
+| `SEQUENTIAL` | Standard linear execution flow | Simple task dependencies |
+| `CONDITIONAL` | Branch based on conditions | Decision-based routing |
+| `PARALLEL` | Enable concurrent execution | Performance optimization |
+| `ERROR` | Handle error conditions and recovery | Error handling and fallbacks |
+
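+Only `SEQUENTIAL` edges appear in the examples on this page; the sketch below shows how a conditional branch with an error fallback might look. It assumes an `EdgeType` enum and an `edge_type` keyword on `Edge`, mirroring the `type` keyword on `Node`:
+
+```python
+from swarms.structs.graph_workflow import Edge, EdgeType  # EdgeType assumed
+
+# Take the "approve" branch only when the condition node passes;
+# route to "handle_error" if the "validate" node fails.
+workflow.add_edge(Edge(source="validate", target="approve",
+                       edge_type=EdgeType.CONDITIONAL))
+workflow.add_edge(Edge(source="validate", target="handle_error",
+                       edge_type=EdgeType.ERROR))
+```
+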
+## Storage Backends
+
+| Backend | Description | Use Case |
+|---------|-------------|----------|
+| `MEMORY` | Fast in-memory storage | Development and testing |
+| `SQLITE` | Persistent local storage | Single-machine production |
+| `REDIS` | Distributed storage | Multi-machine production |
+| `FILE` | Simple file-based storage | Basic persistence |
+| `ENCRYPTED_FILE` | Secure encrypted storage | Sensitive data handling |
+
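+The backend is chosen with the `state_backend` constructor argument, as in the state-management example later on this page. A short sketch, assuming the backend names are passed as lowercase strings:
+
+```python
+from swarms.structs.graph_workflow import GraphWorkflow
+
+# Development: fast, non-persistent state
+dev_flow = GraphWorkflow(name="Dev", state_backend="memory")
+
+# Single-machine production: persistent local state
+prod_flow = GraphWorkflow(name="Prod", state_backend="sqlite")
+
+# Multi-machine production: shared state (assumes a reachable Redis;
+# connection settings are configured separately)
+cluster_flow = GraphWorkflow(name="Cluster", state_backend="redis")
+```
+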
+## Core Methods
+
+| Method | Description | Inputs | Usage Example |
+|--------|-------------|--------|----------------|
+| `add_node(node)` | Adds a node to the workflow graph | `node` (Node): Node to add | `workflow.add_node(node)` |
+| `add_edge(edge)` | Adds an edge to the workflow graph | `edge` (Edge): Edge to add | `workflow.add_edge(edge)` |
+| `set_entry_points(entry_points)` | Sets the entry points for workflow execution | `entry_points` (List[str]): Entry point node IDs | `workflow.set_entry_points(["start"])` |
+| `set_end_points(end_points)` | Sets the end points for workflow completion | `end_points` (List[str]): End point node IDs | `workflow.set_end_points(["end"])` |
+| `run(task, initial_data)` | Executes the workflow asynchronously | `task` (str): Task description<br>`initial_data` (dict): Initial data | `await workflow.run("Process data")` |
+| `validate_workflow()` | Validates the workflow structure | None | `errors = workflow.validate_workflow()` |
+| `get_execution_order()` | Gets the topological order of nodes | None | `order = workflow.get_execution_order()` |
+| `visualize()` | Generates a Mermaid diagram | None | `diagram = workflow.visualize()` |
+| `save_state(key)` | Saves workflow state | `key` (str): State identifier | `await workflow.save_state("checkpoint")` |
+| `load_state(key)` | Loads workflow state | `key` (str): State identifier | `await workflow.load_state("checkpoint")` |
+| `create_checkpoint(description)` | Creates a workflow checkpoint | `description` (str): Checkpoint description | `await workflow.create_checkpoint("milestone")` |
+| `to_dict()` | Converts workflow to dictionary | None | `data = workflow.to_dict()` |
+| `from_dict(data)` | Creates workflow from dictionary | `data` (dict): Workflow data | `workflow = GraphWorkflow.from_dict(data)` |
+| `to_yaml()` | Converts workflow to YAML | None | `yaml_str = workflow.to_yaml()` |
+| `from_yaml(yaml_str)` | Creates workflow from YAML | `yaml_str` (str): YAML string | `workflow = GraphWorkflow.from_yaml(yaml_str)` |
+| `save_to_file(filepath, format)` | Saves workflow to file | `filepath` (str): File path<br>`format` (str): File format | `workflow.save_to_file("workflow.json")` |
+| `load_from_file(filepath)` | Loads workflow from file | `filepath` (str): File path | `workflow = GraphWorkflow.load_from_file("workflow.json")` |
+
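+Given a constructed `workflow`, `validate_workflow()` and `get_execution_order()` make useful pre-flight checks before running a graph. A minimal sketch (the return shapes, a list of error strings and a list of node IDs, are inferred from the table above):
+
+```python
+# Pre-flight checks before execution
+errors = workflow.validate_workflow()
+if errors:
+    raise ValueError(f"Workflow is invalid: {errors}")
+
+# Topological order in which the nodes will run
+order = workflow.get_execution_order()
+print(f"Execution order: {order}")
+```
+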
+## Getting Started
+
+To use GraphWorkflow, first install the required dependencies:
+
+```bash
+pip3 install -U swarms
```
-#### `from_json(json_str: str, restore_runtime_state: bool = False) -> GraphWorkflow`
-
-Deserializes a workflow from JSON format.
-
-| Parameter | Type | Description | Default |
-|-----------|------|-------------|---------|
-| `json_str` | `str` | JSON string representation | Required |
-| `restore_runtime_state` | `bool` | Whether to restore runtime state | `False` |
-
-**Returns:**
-
-- `GraphWorkflow`: A new GraphWorkflow instance
-
-**Example:**
+Then, you can initialize and use the workflow as follows:
```python
-workflow = GraphWorkflow.from_json(json_data, restore_runtime_state=True)
-```
-
-#### `save_to_file(filepath: str, include_conversation: bool = False, include_runtime_state: bool = False, overwrite: bool = False) -> str`
-
-Saves the workflow to a JSON file.
-
-| Parameter | Type | Description | Default |
-|-----------|------|-------------|---------|
-| `filepath` | `str` | Path to save the JSON file | Required |
-| `include_conversation` | `bool` | Whether to include conversation history | `False` |
-| `include_runtime_state` | `bool` | Whether to include runtime state | `False` |
-| `overwrite` | `bool` | Whether to overwrite existing files | `False` |
-
-**Returns:**
-
-- `str`: Path to the saved file
+from swarms import Agent, GraphWorkflow, Node, Edge, NodeType
-**Example:**
-
-```python
-filepath = workflow.save_to_file(
- "my_workflow.json",
- include_conversation=True
+# Define agents
+code_generator = Agent(
+ agent_name="CodeGenerator",
+ system_prompt="Write Python code for the given task.",
+ model_name="gpt-4o-mini"
)
-```
-
-#### `load_from_file(filepath: str, restore_runtime_state: bool = False) -> GraphWorkflow`
-
-Loads a workflow from a JSON file.
-
-| Parameter | Type | Description | Default |
-|-----------|------|-------------|---------|
-| `filepath` | `str` | Path to the JSON file | Required |
-| `restore_runtime_state` | `bool` | Whether to restore runtime state | `False` |
-
-**Returns:**
-
-- `GraphWorkflow`: Loaded workflow instance
-
-**Example:**
-
-```python
-workflow = GraphWorkflow.load_from_file("my_workflow.json")
-```
-
-### Utility Methods
-
-#### `export_summary() -> Dict[str, Any]`
-
-Generates a human-readable summary of the workflow.
-
-**Returns:**
-
-- `Dict[str, Any]`: Comprehensive workflow summary
-
-**Example:**
-
-```python
-summary = workflow.export_summary()
-print(f"Workflow has {summary['structure']['nodes']} nodes")
-print(f"Compilation status: {summary['compilation_status']['is_compiled']}")
-```
-
-#### `set_entry_points(entry_points: List[str])`
-
-Sets the entry points for the workflow.
-
-| Parameter | Type | Description |
-|-----------|------|-------------|
-| `entry_points` | `List[str]` | List of node IDs to serve as entry points |
-
-**Example:**
-
-```python
-workflow.set_entry_points(["DataCollector", "ResearchAgent"])
-```
-
-#### `set_end_points(end_points: List[str])`
-
-Sets the end points for the workflow.
-
-| Parameter | Type | Description |
-|-----------|------|-------------|
-| `end_points` | `List[str]` | List of node IDs to serve as end points |
-
-**Example:**
-
-```python
-workflow.set_end_points(["SynthesisAgent", "ReportGenerator"])
-```
-
-### Class Methods
-
-#### `from_spec(agents, edges, entry_points=None, end_points=None, task=None, **kwargs) -> GraphWorkflow`
-
-Constructs a workflow from a list of agents and connections.
-
-| Parameter | Type | Description | Default |
-|-----------|------|-------------|---------|
-| `agents` | `List` | List of agents or Node objects | Required |
-| `edges` | `List` | List of edges or edge tuples | Required |
-| `entry_points` | `List[str]` | List of entry point node IDs | `None` |
-| `end_points` | `List[str]` | List of end point node IDs | `None` |
-| `task` | `str` | Task to be executed by the workflow | `None` |
-| `**kwargs` | `Any` | Additional keyword arguments | `{}` |
-
-**Returns:**
-
-- `GraphWorkflow`: A new GraphWorkflow instance
-
-**Example:**
-
-```python
-workflow = GraphWorkflow.from_spec(
- agents=[agent1, agent2, agent3],
- edges=[
- ("agent1", "agent2"),
- ("agent2", "agent3"),
- ("agent1", ["agent2", "agent3"]) # Fan-out
- ],
- task="Analyze market data"
+code_tester = Agent(
+ agent_name="CodeTester",
+ system_prompt="Test the given Python code and find bugs.",
+ model_name="gpt-4o-mini"
)
-```
-## Examples
+# Create nodes for the graph
+node1 = Node(id="generator", agent=code_generator)
+node2 = Node(id="tester", agent=code_tester)
-### Basic Sequential Workflow
+# Create the graph and define the dependency
+graph = GraphWorkflow()
+graph.add_nodes([node1, node2])
+graph.add_edge(Edge(source="generator", target="tester"))
-```python
-from swarms import Agent, GraphWorkflow
-from swarms.prompts.multi_agent_collab_prompt import MULTI_AGENT_COLLAB_PROMPT_TWO
-
-# Create agents
-research_agent = Agent(
- agent_name="ResearchAgent",
- model_name="gpt-4",
- system_prompt=MULTI_AGENT_COLLAB_PROMPT_TWO,
- max_loops=1
-)
-
-analysis_agent = Agent(
- agent_name="AnalysisAgent",
- model_name="gpt-4",
- system_prompt=MULTI_AGENT_COLLAB_PROMPT_TWO,
- max_loops=1
-)
+# Set entry and end points
+graph.set_entry_points(["generator"])
+graph.set_end_points(["tester"])
-# Build workflow
-workflow = GraphWorkflow(name="Research-Analysis-Workflow")
-workflow.add_node(research_agent)
-workflow.add_node(analysis_agent)
-workflow.add_edge("ResearchAgent", "AnalysisAgent")
-
-# Execute
-results = workflow.run("What are the latest trends in AI?")
+# Run the graph workflow
+results = graph.run("Create a function that calculates the factorial of a number.")
print(results)
```
-### Parallel Processing Workflow
+## Advanced Usage
+
+### State Management
```python
-from swarms import Agent, GraphWorkflow
-
-# Create specialized agents
-data_collector = Agent(agent_name="DataCollector", model_name="gpt-4")
-technical_analyst = Agent(agent_name="TechnicalAnalyst", model_name="gpt-4")
-fundamental_analyst = Agent(agent_name="FundamentalAnalyst", model_name="gpt-4")
-sentiment_analyst = Agent(agent_name="SentimentAnalyst", model_name="gpt-4")
-synthesis_agent = Agent(agent_name="SynthesisAgent", model_name="gpt-4")
-
-# Build parallel workflow
-workflow = GraphWorkflow(name="Market-Analysis-Workflow")
-
-# Add all agents
-for agent in [data_collector, technical_analyst, fundamental_analyst,
- sentiment_analyst, synthesis_agent]:
- workflow.add_node(agent)
-
-# Create fan-out pattern: data collector feeds all analysts
-workflow.add_edges_from_source(
- "DataCollector",
- ["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst"]
-)
+from swarms.structs.graph_workflow import GraphWorkflow, Node, Edge, NodeType
-# Create fan-in pattern: all analysts feed synthesis agent
-workflow.add_edges_to_target(
- ["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst"],
- "SynthesisAgent"
+# Create workflow with state management
+workflow = GraphWorkflow(
+ name="AdvancedWorkflow",
+ state_backend="sqlite",
+ auto_checkpointing=True
)
-# Execute
-results = workflow.run("Analyze Bitcoin market trends")
-print(results)
-```
+# Add nodes
+node1 = Node(id="start", type=NodeType.TASK, callable=lambda: "Hello")
+node2 = Node(id="end", type=NodeType.TASK, callable=lambda x: f"{x} World")
-### Complex Multi-Layer Workflow
+workflow.add_node(node1)
+workflow.add_node(node2)
-```python
-from swarms import Agent, GraphWorkflow
-
-# Create agents for different stages
-data_collectors = [
- Agent(agent_name=f"DataCollector{i}", model_name="gpt-4")
- for i in range(1, 4)
-]
-
-analysts = [
- Agent(agent_name=f"Analyst{i}", model_name="gpt-4")
- for i in range(1, 4)
-]
-
-validators = [
- Agent(agent_name=f"Validator{i}", model_name="gpt-4")
- for i in range(1, 3)
-]
-
-synthesis_agent = Agent(agent_name="SynthesisAgent", model_name="gpt-4")
-
-# Build complex workflow
-workflow = GraphWorkflow(name="Complex-Research-Workflow")
-
-# Add all agents
-all_agents = data_collectors + analysts + validators + [synthesis_agent]
-for agent in all_agents:
- workflow.add_node(agent)
-
-# Layer 1: Data collectors feed all analysts in parallel
-workflow.add_parallel_chain(
- [agent.agent_name for agent in data_collectors],
- [agent.agent_name for agent in analysts]
-)
+# Add edge
+edge = Edge(source="start", target="end")
+workflow.add_edge(edge)
-# Layer 2: Analysts feed validators
-workflow.add_parallel_chain(
- [agent.agent_name for agent in analysts],
- [agent.agent_name for agent in validators]
-)
+# Save state before execution. Note: the awaits in this example must
+# run inside an async function (e.g. driven by asyncio.run).
+await workflow.save_state("pre_execution")
-# Layer 3: Validators feed synthesis agent
-workflow.add_edges_to_target(
- [agent.agent_name for agent in validators],
- "SynthesisAgent"
-)
+# Execute workflow
+result = await workflow.run("Execute workflow")
-# Visualize and execute
-workflow.visualize(show_summary=True)
-results = workflow.run("Comprehensive analysis of renewable energy markets")
+# Create checkpoint
+checkpoint_id = await workflow.create_checkpoint("Execution completed")
```
-### Workflow with Custom Metadata
+### Complex Workflow with Multiple Node Types
```python
-from swarms import Agent, GraphWorkflow, Edge
-
-# Create agents with specific roles
-research_agent = Agent(agent_name="ResearchAgent", model_name="gpt-4")
-analysis_agent = Agent(agent_name="AnalysisAgent", model_name="gpt-4")
+from swarms.structs.graph_workflow import GraphWorkflow, Node, Edge, NodeType
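+
+from swarms import Agent
+
+# Assumed helpers so this snippet is self-contained: a research agent
+# and a simple data-processing callable (neither is defined elsewhere
+# on this page).
+research_agent = Agent(
+    agent_name="ResearchAgent",
+    system_prompt="Research the given topic and summarize the findings.",
+    model_name="gpt-4o-mini",
+)
+
+def process_data(research_results: str) -> str:
+    # Toy transform; replace with real processing logic
+    return research_results.strip().upper()
+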
-# Build workflow with metadata
-workflow = GraphWorkflow(
- name="Metadata-Workflow",
- description="Workflow demonstrating metadata usage"
+# Create workflow
+workflow = GraphWorkflow(name="ComplexWorkflow")
+
+# Research agent node
+research_node = Node(
+ id="research",
+ type=NodeType.AGENT,
+ agent=research_agent,
+ output_keys=["research_results"],
+ timeout=120.0,
+ retry_count=2,
+ parallel=True,
)
-workflow.add_node(research_agent, metadata={"priority": "high", "timeout": 300})
-workflow.add_node(analysis_agent, metadata={"priority": "medium", "timeout": 600})
-
-# Add edge with metadata
-edge = Edge(
- source="ResearchAgent",
- target="AnalysisAgent",
- metadata={"data_type": "research_findings", "priority": "high"}
+# Data processing node
+process_node = Node(
+ id="process",
+ type=NodeType.DATA_PROCESSOR,
+ callable=process_data,
+ required_inputs=["research_results"],
+ output_keys=["processed_data"],
)
-workflow.add_edge(edge)
-# Execute with custom parameters
-results = workflow.run(
- "Analyze the impact of climate change on agriculture",
- max_loops=2
+# Condition node
+validation_node = Node(
+ id="validate",
+ type=NodeType.CONDITION,
+ condition=lambda data: len(data.get("processed_data", "")) > 100,
+ required_inputs=["processed_data"],
+ output_keys=["validation_passed"],
)
-```
-
-### Workflow Serialization and Persistence
-```python
-from swarms import Agent, GraphWorkflow
+# Add nodes
+workflow.add_node(research_node)
+workflow.add_node(process_node)
+workflow.add_node(validation_node)
-# Create workflow
-research_agent = Agent(agent_name="ResearchAgent", model_name="gpt-4")
-analysis_agent = Agent(agent_name="AnalysisAgent", model_name="gpt-4")
-
-workflow = GraphWorkflow(name="Persistent-Workflow")
-workflow.add_node(research_agent)
-workflow.add_node(analysis_agent)
-workflow.add_edge("ResearchAgent", "AnalysisAgent")
-
-# Execute and get conversation
-results = workflow.run("Research quantum computing applications")
-
-# Save workflow with conversation history
-filepath = workflow.save_to_file(
- "quantum_research_workflow.json",
- include_conversation=True,
- include_runtime_state=True
-)
+# Add edges
+workflow.add_edge(Edge(source="research", target="process"))
+workflow.add_edge(Edge(source="process", target="validate"))
-# Load workflow later
-loaded_workflow = GraphWorkflow.load_from_file(
- filepath,
- restore_runtime_state=True
-)
+# Set entry and end points
+workflow.set_entry_points(["research"])
+workflow.set_end_points(["validate"])
-# Continue execution
-new_results = loaded_workflow.run("Continue with quantum cryptography analysis")
+# Execute with the real-time dashboard enabled
+# (run inside an async function)
+workflow.show_dashboard = True
+result = await workflow.run("Research and analyze AI trends")
```
-### Advanced Pattern Detection
+### Workflow Serialization
```python
-from swarms import Agent, GraphWorkflow
-
-# Create a complex workflow with multiple patterns
-workflow = GraphWorkflow(name="Pattern-Detection-Workflow", verbose=True)
-
-# Create agents
-agents = {
- "collector": Agent(agent_name="DataCollector", model_name="gpt-4"),
- "tech_analyst": Agent(agent_name="TechnicalAnalyst", model_name="gpt-4"),
- "fund_analyst": Agent(agent_name="FundamentalAnalyst", model_name="gpt-4"),
- "sentiment_analyst": Agent(agent_name="SentimentAnalyst", model_name="gpt-4"),
- "risk_analyst": Agent(agent_name="RiskAnalyst", model_name="gpt-4"),
- "synthesis": Agent(agent_name="SynthesisAgent", model_name="gpt-4"),
- "validator": Agent(agent_name="Validator", model_name="gpt-4")
-}
-
-# Add all agents
-for agent in agents.values():
- workflow.add_node(agent)
-
-# Create complex patterns
-# Fan-out from collector
-workflow.add_edges_from_source(
- "DataCollector",
- ["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst", "RiskAnalyst"]
-)
-
-# Fan-in to synthesis
-workflow.add_edges_to_target(
- ["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst", "RiskAnalyst"],
- "SynthesisAgent"
-)
+# Save workflow to JSON
+workflow.save_to_file("workflow.json", format="json")
-# Final validation step
-workflow.add_edge("SynthesisAgent", "Validator")
+# Load workflow from JSON
+loaded_workflow = GraphWorkflow.load_from_file("workflow.json")
-# Compile and get status
-workflow.compile()
-status = workflow.get_compilation_status()
+# Export to YAML
+yaml_str = workflow.to_yaml()
+print(yaml_str)
-print(f"Compilation status: {status}")
-print(f"Layers: {status['cached_layers_count']}")
-print(f"Max workers: {status['max_workers']}")
-
-# Visualize with pattern detection
-workflow.visualize(show_summary=True, format="png")
+# Create from YAML
+new_workflow = GraphWorkflow.from_yaml(yaml_str)
```
-### Error Handling and Recovery
+### Visualization and Analytics
```python
-from swarms import Agent, GraphWorkflow
-import logging
+# Generate Mermaid diagram
+mermaid_diagram = workflow.visualize()
+print(mermaid_diagram)
-# Set up logging
-logging.basicConfig(level=logging.INFO)
+# Export visualization
+workflow.export_visualization("workflow.png", format="png")
-# Create workflow with error handling
-workflow = GraphWorkflow(
- name="Error-Handling-Workflow",
- verbose=True,
- max_loops=1
-)
+# Get performance report
+report = workflow.generate_performance_report()
+print(f"Success rate: {report['success_rate']}")
-# Create agents
-try:
- research_agent = Agent(agent_name="ResearchAgent", model_name="gpt-4")
- analysis_agent = Agent(agent_name="AnalysisAgent", model_name="gpt-4")
-
- workflow.add_node(research_agent)
- workflow.add_node(analysis_agent)
- workflow.add_edge("ResearchAgent", "AnalysisAgent")
-
- # Execute with error handling
- try:
- results = workflow.run("Analyze market trends")
- print("Workflow completed successfully")
- print(results)
-
- except Exception as e:
- print(f"Workflow execution failed: {e}")
-
- # Get workflow summary for debugging
- summary = workflow.export_summary()
- print(f"Workflow state: {summary['structure']}")
-
-except Exception as e:
- print(f"Workflow setup failed: {e}")
+# Get workflow statistics
+stats = workflow.get_workflow_statistics()
+print(f"Total nodes: {stats['node_count']}")
```
-## Conclusion
-
-The `GraphWorkflow` class provides a powerful and flexible framework for orchestrating complex multi-agent workflows. Its key benefits include:
-
-### Benefits
-
-| Benefit | Description |
-|-----------------|--------------------------------------------------------------------------------------------------|
-| **Scalability** | Supports workflows with hundreds of agents through efficient parallel execution |
-| **Flexibility** | Multiple connection patterns (sequential, fan-out, fan-in, parallel chains) |
-| **Performance** | Automatic compilation and optimization for faster execution |
-| **Visualization** | Rich visual representations for workflow understanding and debugging |
-| **Persistence** | Complete serialization and deserialization capabilities |
-| **Error Handling** | Comprehensive error handling and recovery mechanisms |
-| **Monitoring** | Detailed logging and status reporting |
-
-### Use Cases
-
-| Use Case | Description |
-|-------------------------|--------------------------------------------------------------------|
-| **Research Workflows** | Multi-stage research with data collection, analysis, and synthesis |
-| **Content Generation** | Parallel content creation with validation and refinement |
-| **Data Processing** | Complex ETL pipelines with multiple processing stages |
-| **Decision Making** | Multi-agent decision systems with voting and consensus |
-| **Quality Assurance** | Multi-stage validation and verification processes |
-| **Automated Testing** | Complex test orchestration with parallel execution |
-
-### Best Practices
-
-| Best Practice | Description |
-|---------------------------------------|------------------------------------------------------------------|
-| **Use meaningful agent names** | Helps with debugging and visualization |
-| **Leverage parallel patterns** | Use fan-out and fan-in for better performance |
-| **Compile workflows** | Always compile before execution for optimal performance |
-| **Monitor execution** | Use verbose mode and status reporting for debugging |
-| **Save important workflows** | Use serialization for workflow persistence |
-| **Handle errors gracefully** | Implement proper error handling and recovery |
-| **Visualize complex workflows** | Use visualization to understand and debug workflows |
-
-The GraphWorkflow system represents a significant advancement in multi-agent orchestration, providing the tools needed to build complex, scalable, and maintainable AI workflows.
\ No newline at end of file
+## Best Practices
+
+### Performance Optimization
+1. Use appropriate graph engines for your use case
+2. Implement parallel execution for independent tasks
+3. Monitor and optimize bottleneck nodes
+4. Use state management for long-running workflows
+5. Enable performance analytics for optimization insights
+
+### Error Handling
+1. Implement comprehensive retry logic
+2. Use error edges for graceful failure handling (see the sketch after this list)
+3. Monitor execution metrics for reliability
+4. Create checkpoints at critical points
+5. Set appropriate timeouts for each node
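+
+The sketch below combines several of these practices: the documented `retry_count` and `timeout` node options, a fallback route over the `EdgeType.ERROR` edge assumed in the Edge Types section, and a checkpoint before the risky step (`fetch_remote_data` is a hypothetical callable, and the await must run inside an async function):
+
+```python
+# Retry a flaky node up to 3 times, cap it at 60 seconds, and route
+# failures to a fallback node via an (assumed) error edge.
+flaky = Node(id="fetch", type=NodeType.TASK,
+             callable=fetch_remote_data,  # hypothetical callable
+             retry_count=3, timeout=60.0)
+fallback = Node(id="fallback", type=NodeType.TASK,
+                callable=lambda: "cached result")
+
+workflow.add_node(flaky)
+workflow.add_node(fallback)
+workflow.add_edge(Edge(source="fetch", target="fallback",
+                       edge_type=EdgeType.ERROR))  # assumed keyword
+
+# Checkpoint before the risky step (inside an async function)
+await workflow.create_checkpoint("before_fetch")
+```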
+
+### State Management
+1. Choose appropriate storage backends for your environment
+2. Implement regular cleanup to manage storage
+3. Use encryption for sensitive workflow data
+4. Create checkpoints before major operations
+5. Monitor state storage usage
+
+### Workflow Design
+1. Plan your workflow structure before implementation
+2. Use meaningful node names and descriptions
+3. Validate workflows before execution
+4. Set appropriate timeouts and retry counts
+5. Test workflows with various input scenarios
+
+## Integration
+
+GraphWorkflow integrates with the Swarms framework, providing:
+
+- **Agent Integration**: Direct support for Swarms agents
+- **Tool Integration**: Compatibility with Swarms tools
+- **Memory Integration**: Support for Swarms memory systems
+- **API Integration**: REST API support for external systems
+
+## Use Cases
+
+### Software Development
+- **Build Pipelines**: Compile, test, and deploy software
+- **Code Review**: Automated code analysis and testing
+- **Release Management**: Coordinate release processes
+
+### Data Processing
+- **ETL Pipelines**: Extract, transform, and load data
+- **Data Validation**: Verify data quality and integrity
+- **Report Generation**: Create automated reports
+
+### AI/ML Workflows
+- **Model Training**: Orchestrate machine learning pipelines
+- **Data Preprocessing**: Prepare data for model training
+- **Model Evaluation**: Test and validate AI models
+
+### Business Processes
+- **Approval Workflows**: Manage approval processes
+- **Customer Onboarding**: Automate customer setup
+- **Order Processing**: Handle order fulfillment