pull/993/head
王祥宇 1 month ago
parent 95001c1f4b
commit b2ff8fbedd

@ -6,7 +6,7 @@ from typing import Any, Callable, List, Optional, Union
import schedule
from loguru import logger
from swarms import Agent
from swarms.structs.agent import Agent
class CronJobError(Exception):

@ -2176,6 +2176,136 @@ class GraphWorkflow:
f"Failed to load GraphWorkflow from {filepath}: {e}"
)
raise e
def validate(self, auto_fix=False) -> Dict[str, Any]:
"""
Validate the workflow structure, checking for potential issues such as isolated nodes,
cyclic dependencies, etc.
Args:
auto_fix (bool): Whether to automatically fix some simple issues (like auto-setting entry/exit points)
Returns:
Dict[str, Any]: Dictionary containing validation results, including validity, warnings and errors
"""
if self.verbose:
logger.debug(f"Validating GraphWorkflow structure (auto_fix={auto_fix})")
result = {
"is_valid": True,
"warnings": [],
"errors": [],
"fixed": []
}
try:
# Check for empty graph
if not self.nodes:
result["errors"].append("Workflow has no nodes")
result["is_valid"] = False
return result
if not self.edges:
result["warnings"].append("Workflow has no edges between nodes")
# Check for node agent instance validity
invalid_agents = []
for node_id, node in self.nodes.items():
if node.agent is None:
invalid_agents.append(node_id)
if invalid_agents:
result["errors"].append(f"Found {len(invalid_agents)} nodes with invalid agent instances: {invalid_agents}")
result["is_valid"] = False
# Check for isolated nodes (no incoming or outgoing edges)
isolated = [n for n in self.nodes if self.graph.in_degree(n) == 0 and self.graph.out_degree(n) == 0]
if isolated:
result["warnings"].append(f"Found {len(isolated)} isolated nodes: {isolated}")
# Check for cyclic dependencies
try:
cycles = list(nx.simple_cycles(self.graph))
if cycles:
result["warnings"].append(f"Found {len(cycles)} cycles in workflow")
result["cycles"] = cycles
except Exception as e:
result["warnings"].append(f"Could not check for cycles: {e}")
# Check entry points
if not self.entry_points:
result["warnings"].append("No entry points defined")
if auto_fix:
self.auto_set_entry_points()
result["fixed"].append("Auto-set entry points")
# Check exit points
if not self.end_points:
result["warnings"].append("No end points defined")
if auto_fix:
self.auto_set_end_points()
result["fixed"].append("Auto-set end points")
# Check for unreachable nodes (not reachable from entry points)
if self.entry_points:
reachable = set()
for entry in self.entry_points:
reachable.update(nx.descendants(self.graph, entry))
reachable.add(entry)
unreachable = set(self.nodes.keys()) - reachable
if unreachable:
result["warnings"].append(f"Found {len(unreachable)} nodes unreachable from entry points: {unreachable}")
if auto_fix and unreachable:
# Add unreachable nodes as entry points
updated_entries = self.entry_points + list(unreachable)
self.set_entry_points(updated_entries)
result["fixed"].append(f"Added {len(unreachable)} unreachable nodes to entry points")
# Check for dead-end nodes (cannot reach any exit point)
if self.end_points:
reverse_graph = self.graph.reverse()
reachable_to_exit = set()
for exit_point in self.end_points:
reachable_to_exit.update(nx.descendants(reverse_graph, exit_point))
reachable_to_exit.add(exit_point)
dead_ends = set(self.nodes.keys()) - reachable_to_exit
if dead_ends:
result["warnings"].append(f"Found {len(dead_ends)} nodes that cannot reach any exit point: {dead_ends}")
if auto_fix and dead_ends:
# Add dead-end nodes as exit points
updated_exits = self.end_points + list(dead_ends)
self.set_end_points(updated_exits)
result["fixed"].append(f"Added {len(dead_ends)} dead-end nodes to exit points")
# Check for serious warnings
has_serious_warnings = any(
"cycle" in warning.lower() or "unreachable" in warning.lower()
for warning in result["warnings"]
)
# If there are errors or serious warnings without fixes, the workflow is invalid
if result["errors"] or (has_serious_warnings and not auto_fix):
result["is_valid"] = False
if self.verbose:
if result["is_valid"]:
if result["warnings"]:
logger.warning(f"Validation found {len(result['warnings'])} warnings but workflow is still valid")
else:
logger.success("Workflow validation completed with no issues")
else:
logger.error(f"Validation found workflow to be invalid with {len(result['errors'])} errors and {len(result['warnings'])} warnings")
if result["fixed"]:
logger.info(f"Auto-fixed {len(result['fixed'])} issues: {', '.join(result['fixed'])}")
return result
except Exception as e:
result["is_valid"] = False
result["errors"].append(str(e))
logger.exception(f"Error during workflow validation: {e}")
return result
def export_summary(self) -> Dict[str, Any]:
"""

Loading…
Cancel
Save