diff --git a/examples/multi_agent/graphworkflow_examples/test_graphworlfolw_validation.py b/examples/multi_agent/graphworkflow_examples/test_graphworlfolw_validation.py new file mode 100644 index 00000000..70e00ae4 --- /dev/null +++ b/examples/multi_agent/graphworkflow_examples/test_graphworlfolw_validation.py @@ -0,0 +1,68 @@ +# demo_validation.py + +from swarms.structs.agent import Agent +from swarms.structs.graph_workflow import GraphWorkflow +from dotenv import load_dotenv + + +# Load environment variables from .env file +load_dotenv() + +# Create simple workflow +print("Creating simple workflow...") +wf = GraphWorkflow(name="Demo-Workflow", verbose=True) + + +agent1 = Agent(agent_name="DataCollector", model_name="claude-3-7-sonnet-20250219") +agent2 = Agent(agent_name="Analyzer", model_name="claude-3-7-sonnet-20250219") +agent3 = Agent(agent_name="Reporter", model_name="claude-3-7-sonnet-20250219") +agent4 = Agent(agent_name="Isolated", model_name="claude-3-7-sonnet-20250219") # Isolated node + + +wf.add_node(agent1) +wf.add_node(agent2) +wf.add_node(agent3) +wf.add_node(agent4) # Add isolated node + + +# Add edges +wf.add_edge("DataCollector", "Analyzer") +wf.add_edge("Analyzer", "Reporter") + + +print("\nValidate workflow (without auto-fix):") +result = wf.validate() +print(f"Workflow is valid: {result['is_valid']}") +print(f"Warnings: {result['warnings']}") +print(f"Errors: {result['errors']}") + + +print("\nValidate workflow (with auto-fix enabled):") +result = wf.validate(auto_fix=True) +print(f"Workflow is valid: {result['is_valid']}") +print(f"Warnings: {result['warnings']}") +print(f"Errors: {result['errors']}") +print(f"Fixed: {result['fixed']}") + + +# Create workflow with cycles +print("\n\nCreating workflow with cycles...") +wf2 = GraphWorkflow(name="Cyclic-Workflow", verbose=True) + + +wf2.add_node(Agent(agent_name="A", model_name="claude-3-7-sonnet-20250219")) +wf2.add_node(Agent(agent_name="B", model_name="claude-3-7-sonnet-20250219")) +wf2.add_node(Agent(agent_name="C", model_name="claude-3-7-sonnet-20250219")) + + +wf2.add_edge("A", "B") +wf2.add_edge("B", "C") +wf2.add_edge("C", "A") # Create cycle + + +print("\nValidate workflow with cycles:") +result = wf2.validate() +print(f"Workflow is valid: {result['is_valid']}") +print(f"Warnings: {result['warnings']}") +if "cycles" in result: + print(f"Detected cycles: {result['cycles']}") \ No newline at end of file diff --git a/swarms/structs/cron_job.py b/swarms/structs/cron_job.py index b6cdf441..79bd1090 100644 --- a/swarms/structs/cron_job.py +++ b/swarms/structs/cron_job.py @@ -6,9 +6,11 @@ from typing import Any, Callable, List, Optional, Union import schedule from loguru import logger + # from swarms import Agent + class CronJobError(Exception): """Base exception class for CronJob errors.""" diff --git a/swarms/structs/graph_workflow.py b/swarms/structs/graph_workflow.py index 667e7a1e..6cdccfaa 100644 --- a/swarms/structs/graph_workflow.py +++ b/swarms/structs/graph_workflow.py @@ -2176,6 +2176,136 @@ class GraphWorkflow: f"Failed to load GraphWorkflow from {filepath}: {e}" ) raise e + def validate(self, auto_fix=False) -> Dict[str, Any]: + """ + Validate the workflow structure, checking for potential issues such as isolated nodes, + cyclic dependencies, etc. + + Args: + auto_fix (bool): Whether to automatically fix some simple issues (like auto-setting entry/exit points) + + Returns: + Dict[str, Any]: Dictionary containing validation results, including validity, warnings and errors + """ + if self.verbose: + logger.debug(f"Validating GraphWorkflow structure (auto_fix={auto_fix})") + + result = { + "is_valid": True, + "warnings": [], + "errors": [], + "fixed": [] + } + + try: + # Check for empty graph + if not self.nodes: + result["errors"].append("Workflow has no nodes") + result["is_valid"] = False + return result + + if not self.edges: + result["warnings"].append("Workflow has no edges between nodes") + + # Check for node agent instance validity + invalid_agents = [] + for node_id, node in self.nodes.items(): + if node.agent is None: + invalid_agents.append(node_id) + + if invalid_agents: + result["errors"].append(f"Found {len(invalid_agents)} nodes with invalid agent instances: {invalid_agents}") + result["is_valid"] = False + + # Check for isolated nodes (no incoming or outgoing edges) + isolated = [n for n in self.nodes if self.graph.in_degree(n) == 0 and self.graph.out_degree(n) == 0] + if isolated: + result["warnings"].append(f"Found {len(isolated)} isolated nodes: {isolated}") + + # Check for cyclic dependencies + try: + cycles = list(nx.simple_cycles(self.graph)) + if cycles: + result["warnings"].append(f"Found {len(cycles)} cycles in workflow") + result["cycles"] = cycles + except Exception as e: + result["warnings"].append(f"Could not check for cycles: {e}") + + # Check entry points + if not self.entry_points: + result["warnings"].append("No entry points defined") + if auto_fix: + self.auto_set_entry_points() + result["fixed"].append("Auto-set entry points") + + # Check exit points + if not self.end_points: + result["warnings"].append("No end points defined") + if auto_fix: + self.auto_set_end_points() + result["fixed"].append("Auto-set end points") + + # Check for unreachable nodes (not reachable from entry points) + if self.entry_points: + reachable = set() + for entry in self.entry_points: + reachable.update(nx.descendants(self.graph, entry)) + reachable.add(entry) + + unreachable = set(self.nodes.keys()) - reachable + if unreachable: + result["warnings"].append(f"Found {len(unreachable)} nodes unreachable from entry points: {unreachable}") + if auto_fix and unreachable: + # Add unreachable nodes as entry points + updated_entries = self.entry_points + list(unreachable) + self.set_entry_points(updated_entries) + result["fixed"].append(f"Added {len(unreachable)} unreachable nodes to entry points") + + # Check for dead-end nodes (cannot reach any exit point) + if self.end_points: + reverse_graph = self.graph.reverse() + reachable_to_exit = set() + for exit_point in self.end_points: + reachable_to_exit.update(nx.descendants(reverse_graph, exit_point)) + reachable_to_exit.add(exit_point) + + dead_ends = set(self.nodes.keys()) - reachable_to_exit + if dead_ends: + result["warnings"].append(f"Found {len(dead_ends)} nodes that cannot reach any exit point: {dead_ends}") + if auto_fix and dead_ends: + # Add dead-end nodes as exit points + updated_exits = self.end_points + list(dead_ends) + self.set_end_points(updated_exits) + result["fixed"].append(f"Added {len(dead_ends)} dead-end nodes to exit points") + + # Check for serious warnings + has_serious_warnings = any( + "cycle" in warning.lower() or "unreachable" in warning.lower() + for warning in result["warnings"] + ) + + # If there are errors or serious warnings without fixes, the workflow is invalid + if result["errors"] or (has_serious_warnings and not auto_fix): + result["is_valid"] = False + + if self.verbose: + if result["is_valid"]: + if result["warnings"]: + logger.warning(f"Validation found {len(result['warnings'])} warnings but workflow is still valid") + else: + logger.success("Workflow validation completed with no issues") + else: + logger.error(f"Validation found workflow to be invalid with {len(result['errors'])} errors and {len(result['warnings'])} warnings") + + if result["fixed"]: + logger.info(f"Auto-fixed {len(result['fixed'])} issues: {', '.join(result['fixed'])}") + + return result + except Exception as e: + result["is_valid"] = False + result["errors"].append(str(e)) + logger.exception(f"Error during workflow validation: {e}") + return result def export_summary(self) -> Dict[str, Any]: """