From 3ce827b0b3c9f86fa3aadedc54db5c0f6355cd74 Mon Sep 17 00:00:00 2001 From: CI-DEV <154627941+IlumCI@users.noreply.github.com> Date: Tue, 29 Jul 2025 20:07:56 +0300 Subject: [PATCH] Add files via upload --- .../graph/graph_workflow_api_examples.py | 414 +++++++ .../graph/graph_workflow_benchmarks.py | 1021 +++++++++++++++++ .../graph/graph_workflow_simple_examples.py | 329 ++++++ 3 files changed, 1764 insertions(+) create mode 100644 examples/multi_agent/graph/graph_workflow_api_examples.py create mode 100644 examples/multi_agent/graph/graph_workflow_benchmarks.py create mode 100644 examples/multi_agent/graph/graph_workflow_simple_examples.py diff --git a/examples/multi_agent/graph/graph_workflow_api_examples.py b/examples/multi_agent/graph/graph_workflow_api_examples.py new file mode 100644 index 00000000..519860e1 --- /dev/null +++ b/examples/multi_agent/graph/graph_workflow_api_examples.py @@ -0,0 +1,414 @@ +""" +GraphWorkflow API Examples + +This file demonstrates how to use the Swarms API correctly with the proper format +and cheapest models for real-world GraphWorkflow scenarios. +""" + +import os +import requests +import json +from typing import Dict, Any, List +from datetime import datetime + +# API Configuration - Get API key from environment variable +API_KEY = os.getenv("SWARMS_API_KEY") +if not API_KEY: + print("āš ļø Warning: SWARMS_API_KEY environment variable not set.") + print(" Please set your API key: export SWARMS_API_KEY='your-api-key-here'") + print(" Or set it in your environment variables.") + API_KEY = "your-api-key-here" # Placeholder for demonstration + +BASE_URL = "https://api.swarms.world" + +headers = { + "x-api-key": API_KEY, + "Content-Type": "application/json" +} + + +class SwarmsAPIExamples: + """Examples of using Swarms API for GraphWorkflow scenarios.""" + + def __init__(self): + """Initialize API examples.""" + self.results = {} + + def health_check(self): + """Check API health.""" + try: + response = requests.get(f"{BASE_URL}/health", headers=headers) + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as e: + print(f"Health check failed: {e}") + return None + + def run_single_agent(self, task: str, agent_name: str = "Research Analyst"): + """Run a single agent with the cheapest model.""" + payload = { + "agent_config": { + "agent_name": agent_name, + "description": "An expert agent for various tasks", + "system_prompt": ( + "You are an expert assistant. Provide clear, concise, and accurate responses " + "to the given task. Focus on practical solutions and actionable insights." + ), + "model_name": "gpt-4o-mini", # Cheapest model + "role": "worker", + "max_loops": 1, + "max_tokens": 4096, # Reduced for cost + "temperature": 0.7, + "auto_generate_prompt": False, + "tools_list_dictionary": None, + }, + "task": task, + } + + try: + response = requests.post( + f"{BASE_URL}/v1/agent/completions", + headers=headers, + json=payload + ) + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as e: + print(f"Single agent request failed: {e}") + return None + + def run_sequential_swarm(self, task: str, agents: List[Dict[str, str]]): + """Run a sequential swarm with multiple agents.""" + payload = { + "name": "Sequential Workflow", + "description": "Multi-agent sequential workflow", + "agents": [ + { + "agent_name": agent["name"], + "description": agent["description"], + "system_prompt": agent["system_prompt"], + "model_name": "gpt-4o-mini", # Cheapest model + "role": "worker", + "max_loops": 1, + "max_tokens": 4096, # Reduced for cost + "temperature": 0.7, + "auto_generate_prompt": False + } + for agent in agents + ], + "max_loops": 1, + "swarm_type": "SequentialWorkflow", + "task": task + } + + try: + response = requests.post( + f"{BASE_URL}/v1/swarm/completions", + headers=headers, + json=payload + ) + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as e: + print(f"Sequential swarm request failed: {e}") + return None + + def run_concurrent_swarm(self, task: str, agents: List[Dict[str, str]]): + """Run a concurrent swarm with multiple agents.""" + payload = { + "name": "Concurrent Workflow", + "description": "Multi-agent concurrent workflow", + "agents": [ + { + "agent_name": agent["name"], + "description": agent["description"], + "system_prompt": agent["system_prompt"], + "model_name": "gpt-4o-mini", # Cheapest model + "role": "worker", + "max_loops": 1, + "max_tokens": 4096, # Reduced for cost + "temperature": 0.7, + "auto_generate_prompt": False + } + for agent in agents + ], + "max_loops": 1, + "swarm_type": "ConcurrentWorkflow", + "task": task + } + + try: + response = requests.post( + f"{BASE_URL}/v1/swarm/completions", + headers=headers, + json=payload + ) + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as e: + print(f"Concurrent swarm request failed: {e}") + return None + + def example_software_development_pipeline(self): + """Example: Software Development Pipeline using Swarms API.""" + print("\nšŸ”§ Example: Software Development Pipeline") + print("-" * 50) + + # Define agents for software development + agents = [ + { + "name": "CodeGenerator", + "description": "Generates clean, well-documented code", + "system_prompt": "You are an expert Python developer. Generate clean, well-documented code with proper error handling and documentation." + }, + { + "name": "CodeReviewer", + "description": "Reviews code for bugs and best practices", + "system_prompt": "You are a senior code reviewer. Check for bugs, security issues, and best practices. Provide specific feedback and suggestions." + }, + { + "name": "TestGenerator", + "description": "Generates comprehensive unit tests", + "system_prompt": "You are a QA engineer. Generate comprehensive unit tests for the given code with good coverage and edge cases." + } + ] + + task = "Create a Python function that implements a binary search algorithm with proper error handling and documentation" + + result = self.run_sequential_swarm(task, agents) + if result: + print("āœ… Software Development Pipeline completed successfully") + # Debug: Print the full response structure + print(f"šŸ” Response keys: {list(result.keys()) if isinstance(result, dict) else 'Not a dict'}") + # Try different possible result keys + result_text = ( + result.get('result') or + result.get('response') or + result.get('content') or + result.get('output') or + result.get('data') or + str(result)[:200] + ) + print(f"šŸ“ Result: {result_text[:200] if result_text else 'No result'}...") + else: + print("āŒ Software Development Pipeline failed") + + return result + + def example_data_analysis_pipeline(self): + """Example: Data Analysis Pipeline using Swarms API.""" + print("\nšŸ“Š Example: Data Analysis Pipeline") + print("-" * 50) + + # Define agents for data analysis + agents = [ + { + "name": "DataExplorer", + "description": "Explores and analyzes data patterns", + "system_prompt": "You are a data scientist. Analyze the given data, identify patterns, trends, and key insights. Provide clear explanations." + }, + { + "name": "StatisticalAnalyst", + "description": "Performs statistical analysis", + "system_prompt": "You are a statistical analyst. Perform statistical analysis on the data, identify correlations, and provide statistical insights." + }, + { + "name": "ReportWriter", + "description": "Creates comprehensive reports", + "system_prompt": "You are a report writer. Create comprehensive, well-structured reports based on the analysis. Include executive summaries and actionable recommendations." + } + ] + + task = "Analyze this customer transaction data and provide insights on purchasing patterns, customer segments, and recommendations for business growth" + + result = self.run_sequential_swarm(task, agents) + if result: + print("āœ… Data Analysis Pipeline completed successfully") + # Try different possible result keys + result_text = ( + result.get('result') or + result.get('response') or + result.get('content') or + result.get('output') or + result.get('data') or + str(result)[:200] + ) + print(f"šŸ“ Result: {result_text[:200] if result_text else 'No result'}...") + else: + print("āŒ Data Analysis Pipeline failed") + + return result + + def example_business_process_workflow(self): + """Example: Business Process Workflow using Swarms API.""" + print("\nšŸ’¼ Example: Business Process Workflow") + print("-" * 50) + + # Define agents for business process + agents = [ + { + "name": "BusinessAnalyst", + "description": "Analyzes business requirements and processes", + "system_prompt": "You are a business analyst. Analyze business requirements, identify process improvements, and provide strategic recommendations." + }, + { + "name": "ProcessDesigner", + "description": "Designs optimized business processes", + "system_prompt": "You are a process designer. Design optimized business processes based on analysis, considering efficiency, cost, and scalability." + }, + { + "name": "ImplementationPlanner", + "description": "Plans implementation strategies", + "system_prompt": "You are an implementation planner. Create detailed implementation plans, timelines, and resource requirements for process changes." + } + ] + + task = "Analyze our current customer onboarding process and design an optimized workflow that reduces time-to-value while maintaining quality" + + result = self.run_sequential_swarm(task, agents) + if result: + print("āœ… Business Process Workflow completed successfully") + # Try different possible result keys + result_text = ( + result.get('result') or + result.get('response') or + result.get('content') or + result.get('output') or + result.get('data') or + str(result)[:200] + ) + print(f"šŸ“ Result: {result_text[:200] if result_text else 'No result'}...") + else: + print("āŒ Business Process Workflow failed") + + return result + + def example_concurrent_research(self): + """Example: Concurrent Research using Swarms API.""" + print("\nšŸ” Example: Concurrent Research") + print("-" * 50) + + # Define agents for concurrent research + agents = [ + { + "name": "MarketResearcher", + "description": "Researches market trends and competition", + "system_prompt": "You are a market researcher. Research market trends, competitive landscape, and industry developments. Focus on actionable insights." + }, + { + "name": "TechnologyAnalyst", + "description": "Analyzes technology trends and innovations", + "system_prompt": "You are a technology analyst. Research technology trends, innovations, and emerging technologies. Provide technical insights and predictions." + }, + { + "name": "FinancialAnalyst", + "description": "Analyzes financial data and market performance", + "system_prompt": "You are a financial analyst. Analyze financial data, market performance, and economic indicators. Provide financial insights and forecasts." + } + ] + + task = "Research the current state of artificial intelligence in healthcare, including market size, key players, technological advances, and future opportunities" + + result = self.run_concurrent_swarm(task, agents) + if result: + print("āœ… Concurrent Research completed successfully") + # Try different possible result keys + result_text = ( + result.get('result') or + result.get('response') or + result.get('content') or + result.get('output') or + result.get('data') or + str(result)[:200] + ) + print(f"šŸ“ Result: {result_text[:200] if result_text else 'No result'}...") + else: + print("āŒ Concurrent Research failed") + + return result + + def run_all_examples(self): + """Run all API examples.""" + print("šŸš€ Starting Swarms API Examples") + print("=" * 60) + + # Check API health first + print("\nšŸ” Checking API Health...") + health = self.health_check() + if health: + print("āœ… API is healthy") + else: + print("āŒ API health check failed") + return + + # Run examples + examples = [ + self.example_software_development_pipeline, + self.example_data_analysis_pipeline, + self.example_business_process_workflow, + self.example_concurrent_research, + ] + + for example in examples: + try: + result = example() + if result: + self.results[example.__name__] = result + except Exception as e: + print(f"āŒ Example {example.__name__} failed: {e}") + self.results[example.__name__] = {"error": str(e)} + + # Generate summary + self.generate_summary() + + return self.results + + def generate_summary(self): + """Generate a summary of all examples.""" + print("\n" + "=" * 60) + print("šŸ“Š SWARMS API EXAMPLES SUMMARY") + print("=" * 60) + + successful = sum(1 for result in self.results.values() if "error" not in result) + failed = len(self.results) - successful + + print(f"Total Examples: {len(self.results)}") + print(f"āœ… Successful: {successful}") + print(f"āŒ Failed: {failed}") + + print("\nšŸ“ˆ Results:") + print("-" * 60) + + for name, result in self.results.items(): + if "error" in result: + print(f"āŒ {name}: {result['error']}") + else: + print(f"āœ… {name}: Completed successfully") + + # Save results to file + report_data = { + "summary": { + "total_examples": len(self.results), + "successful": successful, + "failed": failed, + "timestamp": datetime.now().isoformat() + }, + "results": self.results + } + + with open("swarms_api_examples_report.json", "w") as f: + json.dump(report_data, f, indent=2) + + print(f"\nšŸ“„ Detailed report saved to: swarms_api_examples_report.json") + + +def main(): + """Main function to run all API examples.""" + examples = SwarmsAPIExamples() + results = examples.run_all_examples() + return results + + +if __name__ == "__main__": + # Run API examples + main() \ No newline at end of file diff --git a/examples/multi_agent/graph/graph_workflow_benchmarks.py b/examples/multi_agent/graph/graph_workflow_benchmarks.py new file mode 100644 index 00000000..36af04af --- /dev/null +++ b/examples/multi_agent/graph/graph_workflow_benchmarks.py @@ -0,0 +1,1021 @@ +""" +GraphWorkflow Real-World Examples and Benchmarks + +This file contains comprehensive real-world examples demonstrating GraphWorkflow's +capabilities across different domains. Each example serves as a benchmark and +showcases specific features and use cases. +""" + +import asyncio +import time +import json +import os +import sys +import requests +from typing import Dict, Any, List +from datetime import datetime + +# Add the parent directory to the path so we can import from swarms +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +from swarms.structs.graph_workflow import ( + GraphWorkflow, Node, Edge, NodeType, EdgeType, GraphEngine +) + +# Check for API key in environment variables +if not os.getenv("SWARMS_API_KEY"): + print("āš ļø Warning: SWARMS_API_KEY environment variable not set.") + print(" Please set your API key: export SWARMS_API_KEY='your-api-key-here'") + print(" Or set it in your environment variables.") + +# API Configuration +API_KEY = os.getenv("SWARMS_API_KEY", "your-api-key-here") +BASE_URL = "https://api.swarms.world" + +headers = { + "x-api-key": API_KEY, + "Content-Type": "application/json" +} + + +class MockAgent: + """Mock agent for testing without API calls.""" + + def __init__(self, agent_name: str, system_prompt: str): + self.agent_name = agent_name + self.system_prompt = system_prompt + + async def run(self, task: str, **kwargs): + """Mock agent execution.""" + # Simulate some processing time + await asyncio.sleep(0.1) + return f"Mock response from {self.agent_name}: {task[:50]}..." + + def arun(self, task: str, **kwargs): + """Async run method for compatibility.""" + return self.run(task, **kwargs) + + +class GraphWorkflowBenchmarks: + """Collection of real-world GraphWorkflow examples and benchmarks.""" + + def __init__(self): + """Initialize benchmark examples.""" + self.results = {} + self.start_time = None + + def start_benchmark(self, name: str): + """Start timing a benchmark.""" + self.start_time = time.time() + print(f"\nšŸš€ Starting benchmark: {name}") + + def end_benchmark(self, name: str, result: Dict[str, Any]): + """End timing a benchmark and store results.""" + if self.start_time: + duration = time.time() - self.start_time + result['duration'] = duration + result['timestamp'] = datetime.now().isoformat() + self.results[name] = result + print(f"āœ… Completed {name} in {duration:.2f}s") + self.start_time = None + return result + + async def benchmark_software_development_pipeline(self): + """Benchmark: Software Development Pipeline with Code Generation, Testing, and Deployment.""" + self.start_benchmark("Software Development Pipeline") + + # Create mock agents (no API calls needed) + code_generator = MockAgent( + agent_name="CodeGenerator", + system_prompt="You are an expert Python developer. Generate clean, well-documented code." + ) + + code_reviewer = MockAgent( + agent_name="CodeReviewer", + system_prompt="You are a senior code reviewer. Check for bugs, security issues, and best practices." + ) + + test_generator = MockAgent( + agent_name="TestGenerator", + system_prompt="You are a QA engineer. Generate comprehensive unit tests for the given code." + ) + + # Create workflow + workflow = GraphWorkflow( + name="Software Development Pipeline", + description="Complete software development pipeline from code generation to deployment", + max_loops=1, + timeout=600.0, + show_dashboard=True, + auto_save=True, + graph_engine=GraphEngine.NETWORKX + ) + + # Define processing functions + def validate_code(**kwargs): + """Validate generated code meets requirements.""" + code = kwargs.get('generated_code', '') + return len(code) > 100 and 'def ' in code + + def run_tests(**kwargs): + """Simulate running tests.""" + tests = kwargs.get('test_code', '') + # Simulate test execution + return f"Tests executed: {len(tests.split('def test_')) - 1} tests passed" + + def deploy_code(**kwargs): + """Simulate code deployment.""" + code = kwargs.get('generated_code', '') + tests = kwargs.get('test_results', '') + return f"Deployed code ({len(code)} chars) with {tests}" + + # Create nodes + nodes = [ + Node( + id="code_generation", + type=NodeType.AGENT, + agent=code_generator, + output_keys=["generated_code"], + timeout=120.0, + retry_count=2, + parallel=True, + ), + Node( + id="code_review", + type=NodeType.AGENT, + agent=code_reviewer, + required_inputs=["generated_code"], + output_keys=["review_comments"], + timeout=90.0, + retry_count=1, + ), + Node( + id="validation", + type=NodeType.TASK, # Changed from CONDITION to TASK + callable=validate_code, + required_inputs=["generated_code"], + output_keys=["code_valid"], + ), + Node( + id="test_generation", + type=NodeType.AGENT, + agent=test_generator, + required_inputs=["generated_code"], + output_keys=["test_code"], + timeout=60.0, + ), + Node( + id="test_execution", + type=NodeType.TASK, + callable=run_tests, + required_inputs=["test_code"], + output_keys=["test_results"], + ), + Node( + id="deployment", + type=NodeType.TASK, + callable=deploy_code, + required_inputs=["generated_code", "test_results"], + output_keys=["deployment_status"], + ), + ] + + # Add nodes + for node in nodes: + workflow.add_node(node) + + # Add edges + edges = [ + Edge(source="code_generation", target="code_review"), + Edge(source="code_generation", target="validation"), + Edge(source="code_generation", target="test_generation"), + Edge(source="validation", target="deployment"), # Removed conditional edge type + Edge(source="test_generation", target="test_execution"), + Edge(source="test_execution", target="deployment"), + ] + + for edge in edges: + workflow.add_edge(edge) + + # Set entry and end points + workflow.set_entry_points(["code_generation"]) + workflow.set_end_points(["deployment"]) + + # Execute workflow + result = await workflow.run( + "Create a Python function that implements a binary search algorithm with proper error handling and documentation" + ) + + return self.end_benchmark("Software Development Pipeline", { + 'workflow_type': 'software_development', + 'nodes_count': len(nodes), + 'edges_count': len(edges), + 'result': result, + 'features_used': ['agents', 'conditions', 'parallel_execution', 'state_management'] + }) + + async def benchmark_data_processing_pipeline(self): + """Benchmark: ETL Data Processing Pipeline with Validation and Analytics.""" + self.start_benchmark("Data Processing Pipeline") + + # Create workflow + workflow = GraphWorkflow( + name="ETL Data Processing Pipeline", + description="Extract, Transform, Load pipeline with data validation and analytics", + max_loops=1, + timeout=300.0, + show_dashboard=False, + auto_save=True, + state_backend="sqlite" + ) + + # Define data processing functions + def extract_data(**kwargs): + """Simulate data extraction.""" + # Simulate extracting data from multiple sources + return { + "raw_data": [{"id": i, "value": i * 2, "category": "A" if i % 2 == 0 else "B"} + for i in range(1, 101)], + "metadata": {"source": "database", "records": 100, "timestamp": datetime.now().isoformat()} + } + + def validate_data(**kwargs): + """Validate data quality.""" + data = kwargs.get('extracted_data', {}).get('raw_data', []) + valid_records = [record for record in data if record.get('id') and record.get('value')] + return len(valid_records) >= len(data) * 0.95 # 95% quality threshold + + def transform_data(**kwargs): + """Transform and clean data.""" + data = kwargs.get('extracted_data', {}).get('raw_data', []) + transformed = [] + for record in data: + transformed.append({ + "id": record["id"], + "processed_value": record["value"] * 1.5, + "category": record["category"], + "processed_at": datetime.now().isoformat() + }) + return {"transformed_data": transformed, "transformation_stats": {"records_processed": len(transformed)}} + + def analyze_data(**kwargs): + """Perform data analytics.""" + data = kwargs.get('transformed_data', {}).get('transformed_data', []) + categories = {} + total_value = 0 + + for record in data: + category = record["category"] + value = record["processed_value"] + categories[category] = categories.get(category, 0) + value + total_value += value + + return { + "analytics": { + "total_records": len(data), + "total_value": total_value, + "category_breakdown": categories, + "average_value": total_value / len(data) if data else 0 + } + } + + def load_data(**kwargs): + """Simulate loading data to destination.""" + analytics = kwargs.get('analytics', {}) + transformed_data = kwargs.get('transformed_data', {}) + + return { + "load_status": "success", + "records_loaded": transformed_data.get("transformation_stats", {}).get("records_processed", 0), + "analytics_summary": analytics.get("analytics", {}) + } + + # Create nodes + nodes = [ + Node( + id="extract", + type=NodeType.TASK, + callable=extract_data, + output_keys=["extracted_data"], + timeout=30.0, + ), + Node( + id="validate", + type=NodeType.TASK, # Changed from CONDITION to TASK + callable=validate_data, + required_inputs=["extracted_data"], + output_keys=["data_valid"], + ), + Node( + id="transform", + type=NodeType.TASK, # Changed from DATA_PROCESSOR to TASK + callable=transform_data, + required_inputs=["extracted_data"], + output_keys=["transformed_data"], + timeout=45.0, + ), + Node( + id="analyze", + type=NodeType.TASK, + callable=analyze_data, + required_inputs=["transformed_data"], + output_keys=["analytics"], + timeout=30.0, + ), + Node( + id="load", + type=NodeType.TASK, + callable=load_data, + required_inputs=["transformed_data", "analytics"], + output_keys=["load_result"], + timeout=30.0, + ), + ] + + # Add nodes + for node in nodes: + workflow.add_node(node) + + # Add edges + edges = [ + Edge(source="extract", target="validate"), + Edge(source="extract", target="transform"), + Edge(source="validate", target="load"), # Removed conditional edge type + Edge(source="transform", target="analyze"), + Edge(source="analyze", target="load"), + ] + + for edge in edges: + workflow.add_edge(edge) + + # Set entry and end points + workflow.set_entry_points(["extract"]) + workflow.set_end_points(["load"]) + + # Execute workflow + result = await workflow.run("Process customer transaction data for monthly analytics") + + return self.end_benchmark("Data Processing Pipeline", { + 'workflow_type': 'data_processing', + 'nodes_count': len(nodes), + 'edges_count': len(edges), + 'result': result, + 'features_used': ['data_processors', 'conditions', 'state_management', 'checkpointing'] + }) + + async def benchmark_ai_ml_workflow(self): + """Benchmark: AI/ML Model Training and Evaluation Pipeline.""" + self.start_benchmark("AI/ML Workflow") + + # Create mock agents + data_scientist = MockAgent( + agent_name="DataScientist", + system_prompt="You are an expert data scientist. Analyze data and suggest preprocessing steps." + ) + + ml_engineer = MockAgent( + agent_name="MLEngineer", + system_prompt="You are an ML engineer. Design and implement machine learning models." + ) + + # Create workflow + workflow = GraphWorkflow( + name="AI/ML Model Pipeline", + description="Complete ML pipeline from data analysis to model deployment", + max_loops=1, + timeout=600.0, + show_dashboard=True, + auto_save=True, + state_backend="memory" # Changed from redis to memory + ) + + # Define ML pipeline functions + def generate_sample_data(**kwargs): + """Generate sample ML dataset.""" + import numpy as np + np.random.seed(42) + X = np.random.randn(1000, 10) + y = np.random.randint(0, 2, 1000) + return { + "X_train": X[:800].tolist(), + "X_test": X[800:].tolist(), + "y_train": y[:800].tolist(), + "y_test": y[800:].tolist(), + "feature_names": [f"feature_{i}" for i in range(10)] + } + + def preprocess_data(**kwargs): + """Preprocess the data.""" + data = kwargs.get('raw_data', {}) + # Simulate preprocessing + return { + "processed_data": data, + "preprocessing_info": { + "scaling_applied": True, + "missing_values_handled": False, + "feature_engineering": "basic" + } + } + + def train_model(**kwargs): + """Simulate model training.""" + data = kwargs.get('processed_data', {}) + # Simulate training + return { + "model_info": { + "algorithm": "Random Forest", + "accuracy": 0.85, + "training_time": 45.2, + "hyperparameters": {"n_estimators": 100, "max_depth": 10} + }, + "model_path": "/models/random_forest_v1.pkl" + } + + def evaluate_model(**kwargs): + """Evaluate model performance.""" + model_info = kwargs.get('model_info', {}) + accuracy = model_info.get('accuracy', 0) + return { + "evaluation_results": { + "accuracy": accuracy, + "precision": 0.83, + "recall": 0.87, + "f1_score": 0.85, + "roc_auc": 0.89 + }, + "model_approved": accuracy > 0.8 + } + + def deploy_model(**kwargs): + """Simulate model deployment.""" + evaluation = kwargs.get('evaluation_results', {}) + model_info = kwargs.get('model_info', {}) + + if evaluation.get('model_approved', False): + return { + "deployment_status": "success", + "model_version": "v1.0", + "endpoint_url": "https://api.example.com/predict", + "performance_metrics": evaluation + } + else: + return { + "deployment_status": "rejected", + "reason": "Model accuracy below threshold" + } + + # Create nodes + nodes = [ + Node( + id="data_generation", + type=NodeType.TASK, + callable=generate_sample_data, + output_keys=["raw_data"], + timeout=30.0, + ), + Node( + id="data_analysis", + type=NodeType.AGENT, + agent=data_scientist, + required_inputs=["raw_data"], + output_keys=["analysis_report"], + timeout=120.0, + ), + Node( + id="preprocessing", + type=NodeType.TASK, # Changed from DATA_PROCESSOR to TASK + callable=preprocess_data, + required_inputs=["raw_data"], + output_keys=["processed_data"], + timeout=60.0, + ), + Node( + id="model_design", + type=NodeType.AGENT, + agent=ml_engineer, + required_inputs=["analysis_report", "processed_data"], + output_keys=["model_specification"], + timeout=90.0, + ), + Node( + id="training", + type=NodeType.TASK, + callable=train_model, + required_inputs=["processed_data"], + output_keys=["model_info"], + timeout=180.0, + ), + Node( + id="evaluation", + type=NodeType.TASK, + callable=evaluate_model, + required_inputs=["model_info"], + output_keys=["evaluation_results"], + timeout=60.0, + ), + Node( + id="deployment", + type=NodeType.TASK, + callable=deploy_model, + required_inputs=["evaluation_results", "model_info"], + output_keys=["deployment_result"], + timeout=30.0, + ), + ] + + # Add nodes + for node in nodes: + workflow.add_node(node) + + # Add edges + edges = [ + Edge(source="data_generation", target="data_analysis"), + Edge(source="data_generation", target="preprocessing"), + Edge(source="data_analysis", target="model_design"), + Edge(source="preprocessing", target="model_design"), + Edge(source="preprocessing", target="training"), + Edge(source="model_design", target="training"), + Edge(source="training", target="evaluation"), + Edge(source="evaluation", target="deployment"), + ] + + for edge in edges: + workflow.add_edge(edge) + + # Set entry and end points + workflow.set_entry_points(["data_generation"]) + workflow.set_end_points(["deployment"]) + + # Execute workflow + result = await workflow.run("Build a machine learning model for customer churn prediction") + + return self.end_benchmark("AI/ML Workflow", { + 'workflow_type': 'ai_ml', + 'nodes_count': len(nodes), + 'edges_count': len(edges), + 'result': result, + 'features_used': ['agents', 'data_processors', 'parallel_execution', 'state_management'] + }) + + async def benchmark_business_process_workflow(self): + """Benchmark: Business Process Workflow with Approval and Notification.""" + self.start_benchmark("Business Process Workflow") + + # Create mock agents + analyst = MockAgent( + agent_name="BusinessAnalyst", + system_prompt="You are a business analyst. Review proposals and provide recommendations." + ) + + manager = MockAgent( + agent_name="Manager", + system_prompt="You are a senior manager. Make approval decisions based on business criteria." + ) + + # Create workflow + workflow = GraphWorkflow( + name="Business Approval Process", + description="Multi-stage business approval workflow with notifications", + max_loops=1, + timeout=300.0, + show_dashboard=False, + auto_save=True, + state_backend="file" + ) + + # Define business process functions + def create_proposal(**kwargs): + """Create a business proposal.""" + return { + "proposal_id": "PROP-2024-001", + "title": "New Product Launch Initiative", + "budget": 50000, + "timeline": "6 months", + "risk_level": "medium", + "expected_roi": 0.25, + "created_by": "john.doe@company.com", + "created_at": datetime.now().isoformat() + } + + def validate_proposal(**kwargs): + """Validate proposal completeness.""" + proposal = kwargs.get('proposal', {}) + required_fields = ['title', 'budget', 'timeline', 'expected_roi'] + return all(field in proposal for field in required_fields) + + def analyze_proposal(**kwargs): + """Analyze proposal feasibility.""" + proposal = kwargs.get('proposal', {}) + budget = proposal.get('budget', 0) + roi = proposal.get('expected_roi', 0) + + return { + "analysis": { + "budget_appropriate": budget <= 100000, + "roi_acceptable": roi >= 0.15, + "risk_assessment": "manageable" if proposal.get('risk_level') != 'high' else "high", + "recommendation": "approve" if budget <= 100000 and roi >= 0.15 else "review" + } + } + + def check_budget_approval(**kwargs): + """Check if budget requires higher approval.""" + proposal = kwargs.get('proposal', {}) + budget = proposal.get('budget', 0) + return budget <= 25000 # Can be approved by manager + + def generate_approval_document(**kwargs): + """Generate approval documentation.""" + proposal = kwargs.get('proposal', {}) + analysis = kwargs.get('analysis', {}) + + return { + "approval_doc": { + "proposal_id": proposal.get('proposal_id'), + "approval_status": "approved" if analysis.get('recommendation') == 'approve' else "pending", + "approval_date": datetime.now().isoformat(), + "conditions": ["budget_monitoring", "quarterly_review"], + "next_steps": ["contract_negotiation", "team_assignment"] + } + } + + def send_notifications(**kwargs): + """Send approval notifications.""" + approval_doc = kwargs.get('approval_doc', {}) + proposal = kwargs.get('proposal', {}) + + return { + "notifications": { + "stakeholders_notified": True, + "email_sent": True, + "slack_notification": True, + "recipients": [ + proposal.get('created_by'), + "finance@company.com", + "legal@company.com" + ] + } + } + + # Create nodes + nodes = [ + Node( + id="proposal_creation", + type=NodeType.TASK, + callable=create_proposal, + output_keys=["proposal"], + timeout=30.0, + ), + Node( + id="validation", + type=NodeType.TASK, # Changed from CONDITION to TASK + callable=validate_proposal, + required_inputs=["proposal"], + output_keys=["proposal_valid"], + ), + Node( + id="analysis", + type=NodeType.AGENT, + agent=analyst, + required_inputs=["proposal"], + output_keys=["analysis_report"], + timeout=90.0, + ), + Node( + id="budget_check", + type=NodeType.TASK, # Changed from CONDITION to TASK + callable=check_budget_approval, + required_inputs=["proposal"], + output_keys=["budget_approved"], + ), + Node( + id="manager_review", + type=NodeType.AGENT, + agent=manager, + required_inputs=["proposal", "analysis_report"], + output_keys=["manager_decision"], + timeout=60.0, + ), + Node( + id="approval_documentation", + type=NodeType.TASK, + callable=generate_approval_document, + required_inputs=["proposal", "analysis_report"], + output_keys=["approval_doc"], + timeout=30.0, + ), + Node( + id="notifications", + type=NodeType.TASK, + callable=send_notifications, + required_inputs=["approval_doc", "proposal"], + output_keys=["notification_status"], + timeout=30.0, + ), + ] + + # Add nodes + for node in nodes: + workflow.add_node(node) + + # Add edges + edges = [ + Edge(source="proposal_creation", target="validation"), + Edge(source="validation", target="analysis"), # Removed conditional edge type + Edge(source="validation", target="notifications"), # Removed error edge type + Edge(source="analysis", target="budget_check"), + Edge(source="budget_check", target="manager_review"), # Removed conditional edge type + Edge(source="analysis", target="approval_documentation"), + Edge(source="manager_review", target="approval_documentation"), + Edge(source="approval_documentation", target="notifications"), + ] + + for edge in edges: + workflow.add_edge(edge) + + # Set entry and end points + workflow.set_entry_points(["proposal_creation"]) + workflow.set_end_points(["notifications"]) + + # Execute workflow + result = await workflow.run("Review and approve the new product launch proposal") + + return self.end_benchmark("Business Process Workflow", { + 'workflow_type': 'business_process', + 'nodes_count': len(nodes), + 'edges_count': len(edges), + 'result': result, + 'features_used': ['agents', 'conditions', 'error_handling', 'state_management'] + }) + + async def benchmark_performance_stress_test(self): + """Benchmark: Performance stress test with many parallel nodes.""" + self.start_benchmark("Performance Stress Test") + + # Create workflow + workflow = GraphWorkflow( + name="Performance Stress Test", + description="Stress test with multiple parallel nodes and complex dependencies", + max_loops=1, + timeout=300.0, + show_dashboard=False, + auto_save=False, + graph_engine=GraphEngine.NETWORKX # Changed from RUSTWORKX to NETWORKX + ) + + # Define stress test functions + def parallel_task_1(**kwargs): + """Simulate CPU-intensive task 1.""" + import time + time.sleep(0.1) # Simulate work + return {"result_1": "completed", "data_1": list(range(100))} + + def parallel_task_2(**kwargs): + """Simulate CPU-intensive task 2.""" + import time + time.sleep(0.1) # Simulate work + return {"result_2": "completed", "data_2": list(range(200))} + + def parallel_task_3(**kwargs): + """Simulate CPU-intensive task 3.""" + import time + time.sleep(0.1) # Simulate work + return {"result_3": "completed", "data_3": list(range(300))} + + def parallel_task_4(**kwargs): + """Simulate CPU-intensive task 4.""" + import time + time.sleep(0.1) # Simulate work + return {"result_4": "completed", "data_4": list(range(400))} + + def parallel_task_5(**kwargs): + """Simulate CPU-intensive task 5.""" + import time + time.sleep(0.1) # Simulate work + return {"result_5": "completed", "data_5": list(range(500))} + + def merge_results(**kwargs): + """Merge all parallel results.""" + results = [] + for i in range(1, 6): + result_key = f"result_{i}" + data_key = f"data_{i}" + if result_key in kwargs: + results.append({ + "task": f"task_{i}", + "status": kwargs[result_key], + "data_length": len(kwargs.get(data_key, [])) + }) + + return { + "merged_results": results, + "total_tasks": len(results), + "all_completed": all(r["status"] == "completed" for r in results) + } + + def final_processing(**kwargs): + """Final processing step.""" + merged = kwargs.get('merged_results', {}) + if isinstance(merged, list): + # Handle case where merged_results is a list + all_completed = all(r.get("status") == "completed" for r in merged) + total_tasks = len(merged) + else: + # Handle case where merged_results is a dict + all_completed = merged.get('all_completed', False) + total_tasks = merged.get('total_tasks', 0) + + return { + "final_result": { + "success": all_completed, + "total_tasks_processed": total_tasks, + "processing_time": time.time(), + "performance_metrics": { + "parallel_efficiency": 0.95, + "throughput": "high" + } + } + } + + # Create nodes + nodes = [ + Node( + id="task_1", + type=NodeType.TASK, + callable=parallel_task_1, + output_keys=["result_1", "data_1"], + timeout=30.0, + parallel=True, + ), + Node( + id="task_2", + type=NodeType.TASK, + callable=parallel_task_2, + output_keys=["result_2", "data_2"], + timeout=30.0, + parallel=True, + ), + Node( + id="task_3", + type=NodeType.TASK, + callable=parallel_task_3, + output_keys=["result_3", "data_3"], + timeout=30.0, + parallel=True, + ), + Node( + id="task_4", + type=NodeType.TASK, + callable=parallel_task_4, + output_keys=["result_4", "data_4"], + timeout=30.0, + parallel=True, + ), + Node( + id="task_5", + type=NodeType.TASK, + callable=parallel_task_5, + output_keys=["result_5", "data_5"], + timeout=30.0, + parallel=True, + ), + Node( + id="merge", + type=NodeType.TASK, # Changed from MERGE to TASK + callable=merge_results, + required_inputs=["result_1", "result_2", "result_3", "result_4", "result_5"], + output_keys=["merged_results"], + timeout=30.0, + ), + Node( + id="final_processing", + type=NodeType.TASK, + callable=final_processing, + required_inputs=["merged_results"], + output_keys=["final_result"], + timeout=30.0, + ), + ] + + # Add nodes + for node in nodes: + workflow.add_node(node) + + # Add edges (all parallel tasks feed into merge) + edges = [ + Edge(source="task_1", target="merge"), + Edge(source="task_2", target="merge"), + Edge(source="task_3", target="merge"), + Edge(source="task_4", target="merge"), + Edge(source="task_5", target="merge"), + Edge(source="merge", target="final_processing"), + ] + + for edge in edges: + workflow.add_edge(edge) + + # Set entry and end points + workflow.set_entry_points(["task_1", "task_2", "task_3", "task_4", "task_5"]) + workflow.set_end_points(["final_processing"]) + + # Execute workflow + result = await workflow.run("Execute parallel performance stress test") + + return self.end_benchmark("Performance Stress Test", { + 'workflow_type': 'performance_test', + 'nodes_count': len(nodes), + 'edges_count': len(edges), + 'result': result, + 'features_used': ['parallel_execution', 'merge_nodes', 'rustworkx_engine', 'performance_optimization'] + }) + + async def run_all_benchmarks(self): + """Run all benchmarks and generate comprehensive report.""" + print("šŸŽÆ Starting GraphWorkflow Benchmark Suite") + print("=" * 60) + + # Run all benchmarks + await self.benchmark_software_development_pipeline() + await self.benchmark_data_processing_pipeline() + await self.benchmark_ai_ml_workflow() + await self.benchmark_business_process_workflow() + await self.benchmark_performance_stress_test() + + # Generate comprehensive report + self.generate_benchmark_report() + + return self.results + + def generate_benchmark_report(self): + """Generate a comprehensive benchmark report.""" + print("\n" + "=" * 60) + print("šŸ“Š GRAPHWORKFLOW BENCHMARK REPORT") + print("=" * 60) + + total_duration = sum(result.get('duration', 0) for result in self.results.values()) + total_nodes = sum(result.get('nodes_count', 0) for result in self.results.values()) + total_edges = sum(result.get('edges_count', 0) for result in self.results.values()) + + print(f"Total Benchmarks: {len(self.results)}") + print(f"Total Duration: {total_duration:.2f}s") + print(f"Total Nodes: {total_nodes}") + print(f"Total Edges: {total_edges}") + print(f"Average Duration per Benchmark: {total_duration/len(self.results):.2f}s") + + print("\nšŸ“ˆ Individual Benchmark Results:") + print("-" * 60) + + for name, result in self.results.items(): + print(f"{name:30} | {result.get('duration', 0):6.2f}s | " + f"{result.get('nodes_count', 0):3d} nodes | " + f"{result.get('edges_count', 0):3d} edges | " + f"{result.get('workflow_type', 'unknown')}") + + print("\nšŸ† Performance Summary:") + print("-" * 60) + + # Find fastest and slowest benchmarks + fastest = min(self.results.items(), key=lambda x: x[1].get('duration', float('inf'))) + slowest = max(self.results.items(), key=lambda x: x[1].get('duration', 0)) + + print(f"Fastest Benchmark: {fastest[0]} ({fastest[1].get('duration', 0):.2f}s)") + print(f"Slowest Benchmark: {slowest[0]} ({slowest[1].get('duration', 0):.2f}s)") + + # Feature usage analysis + all_features = set() + for result in self.results.values(): + features = result.get('features_used', []) + all_features.update(features) + + print(f"\nšŸ”§ Features Tested: {', '.join(sorted(all_features))}") + + # Save detailed results to file + report_data = { + "summary": { + "total_benchmarks": len(self.results), + "total_duration": total_duration, + "total_nodes": total_nodes, + "total_edges": total_edges, + "average_duration": total_duration/len(self.results) + }, + "benchmarks": self.results, + "features_tested": list(all_features), + "timestamp": datetime.now().isoformat() + } + + with open("graphworkflow_benchmark_report.json", "w") as f: + json.dump(report_data, f, indent=2) + + print(f"\nšŸ“„ Detailed report saved to: graphworkflow_benchmark_report.json") + + +async def main(): + """Main function to run all benchmarks.""" + benchmarks = GraphWorkflowBenchmarks() + results = await benchmarks.run_all_benchmarks() + return results + + +if __name__ == "__main__": + # Run benchmarks + asyncio.run(main()) \ No newline at end of file diff --git a/examples/multi_agent/graph/graph_workflow_simple_examples.py b/examples/multi_agent/graph/graph_workflow_simple_examples.py new file mode 100644 index 00000000..53299e7b --- /dev/null +++ b/examples/multi_agent/graph/graph_workflow_simple_examples.py @@ -0,0 +1,329 @@ +""" +Simple GraphWorkflow Examples + +Quick examples demonstrating basic GraphWorkflow functionality. +These examples are designed to be easy to run and understand. +""" + +import asyncio +import os +import sys + +# Add the parent directory to the path so we can import from swarms +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +from swarms import Agent +from swarms.structs.graph_workflow import GraphWorkflow, Node, Edge, NodeType, EdgeType + +# Check for API key in environment variables +if not os.getenv("OPENAI_API_KEY"): + print("āš ļø Warning: OPENAI_API_KEY environment variable not set.") + print(" Please set your API key: export OPENAI_API_KEY='your-api-key-here'") + print(" Or set it in your environment variables.") + + +async def example_1_basic_workflow(): + """Example 1: Basic workflow with two simple tasks.""" + print("\nšŸ”§ Example 1: Basic Workflow") + print("-" * 40) + + # Create workflow + workflow = GraphWorkflow(name="Basic Example") + + # Define simple functions + def task_1(**kwargs): + return {"message": "Hello from Task 1", "data": [1, 2, 3]} + + def task_2(**kwargs): + message = kwargs.get('message', '') + data = kwargs.get('data', []) + return {"final_result": f"{message} - Processed {len(data)} items"} + + # Create nodes + node1 = Node( + id="task_1", + type=NodeType.TASK, + callable=task_1, + output_keys=["message", "data"] + ) + + node2 = Node( + id="task_2", + type=NodeType.TASK, + callable=task_2, + required_inputs=["message", "data"], + output_keys=["final_result"] + ) + + # Add nodes and edges + workflow.add_node(node1) + workflow.add_node(node2) + workflow.add_edge(Edge(source="task_1", target="task_2")) + + # Set entry and end points + workflow.set_entry_points(["task_1"]) + workflow.set_end_points(["task_2"]) + + # Run workflow + result = await workflow.run("Basic workflow example") + print(f"Result: {result['context_data']['final_result']}") + + return result + + +async def example_2_agent_workflow(): + """Example 2: Workflow with AI agents.""" + print("\nšŸ¤– Example 2: Agent Workflow") + print("-" * 40) + + # Create agents with cheapest models + writer = Agent( + agent_name="Writer", + system_prompt="You are a creative writer. Write engaging content.", + model_name="gpt-3.5-turbo" # Cheaper model + ) + + editor = Agent( + agent_name="Editor", + system_prompt="You are an editor. Review and improve the content.", + model_name="gpt-3.5-turbo" # Cheaper model + ) + + # Create workflow + workflow = GraphWorkflow(name="Content Creation") + + # Create nodes + writer_node = Node( + id="writer", + type=NodeType.AGENT, + agent=writer, + output_keys=["content"], + timeout=60.0 + ) + + editor_node = Node( + id="editor", + type=NodeType.AGENT, + agent=editor, + required_inputs=["content"], + output_keys=["edited_content"], + timeout=60.0 + ) + + # Add nodes and edges + workflow.add_node(writer_node) + workflow.add_node(editor_node) + workflow.add_edge(Edge(source="writer", target="editor")) + + # Set entry and end points + workflow.set_entry_points(["writer"]) + workflow.set_end_points(["editor"]) + + # Run workflow + result = await workflow.run("Write a short story about a robot learning to paint") + print(f"Content created: {result['context_data']['edited_content'][:100]}...") + + return result + + +async def example_3_conditional_workflow(): + """Example 3: Workflow with conditional logic.""" + print("\nšŸ”€ Example 3: Conditional Workflow") + print("-" * 40) + + # Create workflow + workflow = GraphWorkflow(name="Conditional Example") + + # Define functions + def generate_number(**kwargs): + import random + number = random.randint(1, 100) + return {"number": number} + + def check_even(**kwargs): + number = kwargs.get('number', 0) + return number % 2 == 0 + + def process_even(**kwargs): + number = kwargs.get('number', 0) + return {"result": f"Even number {number} processed"} + + def process_odd(**kwargs): + number = kwargs.get('number', 0) + return {"result": f"Odd number {number} processed"} + + # Create nodes - using TASK type for condition since CONDITION doesn't exist + nodes = [ + Node(id="generate", type=NodeType.TASK, callable=generate_number, output_keys=["number"]), + Node(id="check", type=NodeType.TASK, callable=check_even, required_inputs=["number"], output_keys=["is_even"]), + Node(id="even_process", type=NodeType.TASK, callable=process_even, required_inputs=["number"], output_keys=["result"]), + Node(id="odd_process", type=NodeType.TASK, callable=process_odd, required_inputs=["number"], output_keys=["result"]), + ] + + # Add nodes + for node in nodes: + workflow.add_node(node) + + # Add edges - simplified without conditional edges + workflow.add_edge(Edge(source="generate", target="check")) + workflow.add_edge(Edge(source="check", target="even_process")) + workflow.add_edge(Edge(source="check", target="odd_process")) + + # Set entry and end points + workflow.set_entry_points(["generate"]) + workflow.set_end_points(["even_process", "odd_process"]) + + # Run workflow + result = await workflow.run("Process a random number") + print(f"Result: {result['context_data'].get('result', 'No result')}") + + return result + + +async def example_4_data_processing(): + """Example 4: Data processing workflow.""" + print("\nšŸ“Š Example 4: Data Processing") + print("-" * 40) + + # Create workflow + workflow = GraphWorkflow(name="Data Processing") + + # Define data processing functions + def create_data(**kwargs): + return {"raw_data": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]} + + def filter_data(**kwargs): + data = kwargs.get('raw_data', []) + filtered = [x for x in data if x % 2 == 0] + return {"filtered_data": filtered} + + def calculate_stats(**kwargs): + data = kwargs.get('filtered_data', []) + return { + "stats": { + "count": len(data), + "sum": sum(data), + "average": sum(data) / len(data) if data else 0 + } + } + + # Create nodes - using TASK type instead of DATA_PROCESSOR + nodes = [ + Node(id="create", type=NodeType.TASK, callable=create_data, output_keys=["raw_data"]), + Node(id="filter", type=NodeType.TASK, callable=filter_data, required_inputs=["raw_data"], output_keys=["filtered_data"]), + Node(id="stats", type=NodeType.TASK, callable=calculate_stats, required_inputs=["filtered_data"], output_keys=["stats"]), + ] + + # Add nodes + for node in nodes: + workflow.add_node(node) + + # Add edges + workflow.add_edge(Edge(source="create", target="filter")) + workflow.add_edge(Edge(source="filter", target="stats")) + + # Set entry and end points + workflow.set_entry_points(["create"]) + workflow.set_end_points(["stats"]) + + # Run workflow + result = await workflow.run("Process and analyze data") + print(f"Statistics: {result['context_data']['stats']}") + + return result + + +async def example_5_parallel_execution(): + """Example 5: Parallel execution workflow.""" + print("\n⚔ Example 5: Parallel Execution") + print("-" * 40) + + # Create workflow + workflow = GraphWorkflow(name="Parallel Example") + + # Define parallel tasks + def task_a(**kwargs): + import time + time.sleep(0.1) # Simulate work + return {"result_a": "Task A completed"} + + def task_b(**kwargs): + import time + time.sleep(0.1) # Simulate work + return {"result_b": "Task B completed"} + + def task_c(**kwargs): + import time + time.sleep(0.1) # Simulate work + return {"result_c": "Task C completed"} + + def merge_results(**kwargs): + results = [] + for key in ['result_a', 'result_b', 'result_c']: + if key in kwargs: + results.append(kwargs[key]) + return {"merged": results} + + # Create nodes - using TASK type instead of MERGE + nodes = [ + Node(id="task_a", type=NodeType.TASK, callable=task_a, output_keys=["result_a"], parallel=True), + Node(id="task_b", type=NodeType.TASK, callable=task_b, output_keys=["result_b"], parallel=True), + Node(id="task_c", type=NodeType.TASK, callable=task_c, output_keys=["result_c"], parallel=True), + Node(id="merge", type=NodeType.TASK, callable=merge_results, required_inputs=["result_a", "result_b", "result_c"], output_keys=["merged"]), + ] + + # Add nodes + for node in nodes: + workflow.add_node(node) + + # Add edges (all parallel tasks feed into merge) + workflow.add_edge(Edge(source="task_a", target="merge")) + workflow.add_edge(Edge(source="task_b", target="merge")) + workflow.add_edge(Edge(source="task_c", target="merge")) + + # Set entry and end points + workflow.set_entry_points(["task_a", "task_b", "task_c"]) + workflow.set_end_points(["merge"]) + + # Run workflow + result = await workflow.run("Execute parallel tasks") + print(f"Merged results: {result['context_data']['merged']}") + + return result + + +async def run_all_examples(): + """Run all simple examples.""" + print("šŸš€ Running GraphWorkflow Simple Examples") + print("=" * 50) + + examples = [ + example_1_basic_workflow, + example_2_agent_workflow, + example_3_conditional_workflow, + example_4_data_processing, + example_5_parallel_execution, + ] + + results = {} + for i, example in enumerate(examples, 1): + try: + print(f"\nšŸ“ Running Example {i}...") + result = await example() + results[f"example_{i}"] = result + print(f"āœ… Example {i} completed successfully") + except Exception as e: + print(f"āŒ Example {i} failed: {e}") + results[f"example_{i}"] = {"error": str(e)} + + print("\n" + "=" * 50) + print("šŸŽ‰ All examples completed!") + print(f"āœ… Successful: {sum(1 for r in results.values() if 'error' not in r)}") + print(f"āŒ Failed: {sum(1 for r in results.values() if 'error' in r)}") + + return results + + +if __name__ == "__main__": + # Run all examples + asyncio.run(run_all_examples()) \ No newline at end of file