[Add workflow validation method to GraphWorkflow] Merge pull request #993 from Wxysnx/2025072902

Add workflow validation method to GraphWorkflow
pull/1001/head
Kye Gomez 1 month ago committed by GitHub
commit d26570902c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,68 @@
# demo_validation.py
from swarms.structs.agent import Agent
from swarms.structs.graph_workflow import GraphWorkflow
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# Create simple workflow
print("Creating simple workflow...")
wf = GraphWorkflow(name="Demo-Workflow", verbose=True)
agent1 = Agent(agent_name="DataCollector", model_name="claude-3-7-sonnet-20250219")
agent2 = Agent(agent_name="Analyzer", model_name="claude-3-7-sonnet-20250219")
agent3 = Agent(agent_name="Reporter", model_name="claude-3-7-sonnet-20250219")
agent4 = Agent(agent_name="Isolated", model_name="claude-3-7-sonnet-20250219") # Isolated node
wf.add_node(agent1)
wf.add_node(agent2)
wf.add_node(agent3)
wf.add_node(agent4) # Add isolated node
# Add edges
wf.add_edge("DataCollector", "Analyzer")
wf.add_edge("Analyzer", "Reporter")
print("\nValidate workflow (without auto-fix):")
result = wf.validate()
print(f"Workflow is valid: {result['is_valid']}")
print(f"Warnings: {result['warnings']}")
print(f"Errors: {result['errors']}")
print("\nValidate workflow (with auto-fix enabled):")
result = wf.validate(auto_fix=True)
print(f"Workflow is valid: {result['is_valid']}")
print(f"Warnings: {result['warnings']}")
print(f"Errors: {result['errors']}")
print(f"Fixed: {result['fixed']}")
# Create workflow with cycles
print("\n\nCreating workflow with cycles...")
wf2 = GraphWorkflow(name="Cyclic-Workflow", verbose=True)
wf2.add_node(Agent(agent_name="A", model_name="claude-3-7-sonnet-20250219"))
wf2.add_node(Agent(agent_name="B", model_name="claude-3-7-sonnet-20250219"))
wf2.add_node(Agent(agent_name="C", model_name="claude-3-7-sonnet-20250219"))
wf2.add_edge("A", "B")
wf2.add_edge("B", "C")
wf2.add_edge("C", "A") # Create cycle
print("\nValidate workflow with cycles:")
result = wf2.validate()
print(f"Workflow is valid: {result['is_valid']}")
print(f"Warnings: {result['warnings']}")
if "cycles" in result:
print(f"Detected cycles: {result['cycles']}")

@ -6,9 +6,11 @@ from typing import Any, Callable, List, Optional, Union
import schedule
from loguru import logger
# from swarms import Agent
class CronJobError(Exception):
"""Base exception class for CronJob errors."""

@ -2176,6 +2176,136 @@ class GraphWorkflow:
f"Failed to load GraphWorkflow from {filepath}: {e}"
)
raise e
def validate(self, auto_fix=False) -> Dict[str, Any]:
"""
Validate the workflow structure, checking for potential issues such as isolated nodes,
cyclic dependencies, etc.
Args:
auto_fix (bool): Whether to automatically fix some simple issues (like auto-setting entry/exit points)
Returns:
Dict[str, Any]: Dictionary containing validation results, including validity, warnings and errors
"""
if self.verbose:
logger.debug(f"Validating GraphWorkflow structure (auto_fix={auto_fix})")
result = {
"is_valid": True,
"warnings": [],
"errors": [],
"fixed": []
}
try:
# Check for empty graph
if not self.nodes:
result["errors"].append("Workflow has no nodes")
result["is_valid"] = False
return result
if not self.edges:
result["warnings"].append("Workflow has no edges between nodes")
# Check for node agent instance validity
invalid_agents = []
for node_id, node in self.nodes.items():
if node.agent is None:
invalid_agents.append(node_id)
if invalid_agents:
result["errors"].append(f"Found {len(invalid_agents)} nodes with invalid agent instances: {invalid_agents}")
result["is_valid"] = False
# Check for isolated nodes (no incoming or outgoing edges)
isolated = [n for n in self.nodes if self.graph.in_degree(n) == 0 and self.graph.out_degree(n) == 0]
if isolated:
result["warnings"].append(f"Found {len(isolated)} isolated nodes: {isolated}")
# Check for cyclic dependencies
try:
cycles = list(nx.simple_cycles(self.graph))
if cycles:
result["warnings"].append(f"Found {len(cycles)} cycles in workflow")
result["cycles"] = cycles
except Exception as e:
result["warnings"].append(f"Could not check for cycles: {e}")
# Check entry points
if not self.entry_points:
result["warnings"].append("No entry points defined")
if auto_fix:
self.auto_set_entry_points()
result["fixed"].append("Auto-set entry points")
# Check exit points
if not self.end_points:
result["warnings"].append("No end points defined")
if auto_fix:
self.auto_set_end_points()
result["fixed"].append("Auto-set end points")
# Check for unreachable nodes (not reachable from entry points)
if self.entry_points:
reachable = set()
for entry in self.entry_points:
reachable.update(nx.descendants(self.graph, entry))
reachable.add(entry)
unreachable = set(self.nodes.keys()) - reachable
if unreachable:
result["warnings"].append(f"Found {len(unreachable)} nodes unreachable from entry points: {unreachable}")
if auto_fix and unreachable:
# Add unreachable nodes as entry points
updated_entries = self.entry_points + list(unreachable)
self.set_entry_points(updated_entries)
result["fixed"].append(f"Added {len(unreachable)} unreachable nodes to entry points")
# Check for dead-end nodes (cannot reach any exit point)
if self.end_points:
reverse_graph = self.graph.reverse()
reachable_to_exit = set()
for exit_point in self.end_points:
reachable_to_exit.update(nx.descendants(reverse_graph, exit_point))
reachable_to_exit.add(exit_point)
dead_ends = set(self.nodes.keys()) - reachable_to_exit
if dead_ends:
result["warnings"].append(f"Found {len(dead_ends)} nodes that cannot reach any exit point: {dead_ends}")
if auto_fix and dead_ends:
# Add dead-end nodes as exit points
updated_exits = self.end_points + list(dead_ends)
self.set_end_points(updated_exits)
result["fixed"].append(f"Added {len(dead_ends)} dead-end nodes to exit points")
# Check for serious warnings
has_serious_warnings = any(
"cycle" in warning.lower() or "unreachable" in warning.lower()
for warning in result["warnings"]
)
# If there are errors or serious warnings without fixes, the workflow is invalid
if result["errors"] or (has_serious_warnings and not auto_fix):
result["is_valid"] = False
if self.verbose:
if result["is_valid"]:
if result["warnings"]:
logger.warning(f"Validation found {len(result['warnings'])} warnings but workflow is still valid")
else:
logger.success("Workflow validation completed with no issues")
else:
logger.error(f"Validation found workflow to be invalid with {len(result['errors'])} errors and {len(result['warnings'])} warnings")
if result["fixed"]:
logger.info(f"Auto-fixed {len(result['fixed'])} issues: {', '.join(result['fixed'])}")
return result
except Exception as e:
result["is_valid"] = False
result["errors"].append(str(e))
logger.exception(f"Error during workflow validation: {e}")
return result
def export_summary(self) -> Dict[str, Any]:
"""

Loading…
Cancel
Save