You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
swarms/tests/structs/test_graph_workflow.py

553 lines
17 KiB

import pytest
from swarms.structs.graph_workflow import (
GraphWorkflow,
Node,
NodeType,
)
from swarms.structs.agent import Agent
try:
import rustworkx as rx
RUSTWORKX_AVAILABLE = True
except ImportError:
RUSTWORKX_AVAILABLE = False
def create_test_agent(name: str, description: str = None) -> Agent:
"""Create a real agent for testing"""
if description is None:
description = f"Test agent for {name} operations"
return Agent(
agent_name=name,
agent_description=description,
model_name="gpt-4o-mini",
verbose=False,
print_on=False,
max_loops=1,
)
def test_graph_workflow_basic_node_creation():
"""Test basic GraphWorkflow node creation with real agents"""
# Test basic node creation
agent = create_test_agent(
"TestAgent", "Test agent for node creation"
)
node = Node.from_agent(agent)
assert node.id == "TestAgent"
assert node.type == NodeType.AGENT
assert node.agent == agent
# Test node with custom id
node2 = Node(id="CustomID", type=NodeType.AGENT, agent=agent)
assert node2.id == "CustomID"
def test_graph_workflow_multi_agent_collaboration():
"""Test GraphWorkflow with multiple agents in a collaboration scenario"""
# Create specialized agents for a business analysis workflow
market_researcher = create_test_agent(
"Market-Researcher",
"Specialist in market analysis and trend identification",
)
data_analyst = create_test_agent(
"Data-Analyst",
"Expert in data processing and statistical analysis",
)
strategy_consultant = create_test_agent(
"Strategy-Consultant",
"Senior consultant for strategic planning and recommendations",
)
# Create workflow with linear execution path
workflow = GraphWorkflow(name="Business-Analysis-Workflow")
workflow.add_node(market_researcher)
workflow.add_node(data_analyst)
workflow.add_node(strategy_consultant)
# Add edges to define execution order
workflow.add_edge("Market-Researcher", "Data-Analyst")
workflow.add_edge("Data-Analyst", "Strategy-Consultant")
# Test workflow execution
result = workflow.run(
"Analyze market opportunities for AI in healthcare"
)
assert result is not None
def test_graph_workflow_parallel_execution():
"""Test GraphWorkflow with parallel execution paths"""
# Create agents for parallel analysis
technical_analyst = create_test_agent(
"Technical-Analyst",
"Technical feasibility and implementation analysis",
)
market_analyst = create_test_agent(
"Market-Analyst",
"Market positioning and competitive analysis",
)
financial_analyst = create_test_agent(
"Financial-Analyst", "Financial modeling and ROI analysis"
)
risk_assessor = create_test_agent(
"Risk-Assessor", "Risk assessment and mitigation planning"
)
# Create workflow with parallel execution
workflow = GraphWorkflow(name="Parallel-Analysis-Workflow")
workflow.add_node(technical_analyst)
workflow.add_node(market_analyst)
workflow.add_node(financial_analyst)
workflow.add_node(risk_assessor)
# Add edges for fan-out execution (one to many)
workflow.add_edges_from_source(
"Technical-Analyst",
["Market-Analyst", "Financial-Analyst", "Risk-Assessor"],
)
# Test parallel execution
result = workflow.run(
"Evaluate feasibility of launching a new fintech platform"
)
assert result is not None
def test_graph_workflow_complex_topology():
"""Test GraphWorkflow with complex node topology"""
# Create agents for a comprehensive product development workflow
product_manager = create_test_agent(
"Product-Manager", "Product strategy and roadmap management"
)
ux_designer = create_test_agent(
"UX-Designer", "User experience design and research"
)
backend_developer = create_test_agent(
"Backend-Developer",
"Backend system architecture and development",
)
frontend_developer = create_test_agent(
"Frontend-Developer",
"Frontend interface and user interaction development",
)
qa_engineer = create_test_agent(
"QA-Engineer", "Quality assurance and testing specialist"
)
devops_engineer = create_test_agent(
"DevOps-Engineer", "Deployment and infrastructure management"
)
# Create workflow with complex dependencies
workflow = GraphWorkflow(name="Product-Development-Workflow")
workflow.add_node(product_manager)
workflow.add_node(ux_designer)
workflow.add_node(backend_developer)
workflow.add_node(frontend_developer)
workflow.add_node(qa_engineer)
workflow.add_node(devops_engineer)
# Define complex execution topology
workflow.add_edge("Product-Manager", "UX-Designer")
workflow.add_edge("UX-Designer", "Frontend-Developer")
workflow.add_edge("Product-Manager", "Backend-Developer")
workflow.add_edge("Backend-Developer", "QA-Engineer")
workflow.add_edge("Frontend-Developer", "QA-Engineer")
workflow.add_edge("QA-Engineer", "DevOps-Engineer")
# Test complex workflow execution
result = workflow.run(
"Develop a comprehensive e-commerce platform with AI recommendations"
)
assert result is not None
def test_graph_workflow_error_handling():
"""Test GraphWorkflow error handling and validation"""
# Test with empty workflow
workflow = GraphWorkflow()
result = workflow.run("Test task")
# Empty workflow should handle gracefully
assert result is not None
# Test workflow compilation and caching
researcher = create_test_agent(
"Researcher", "Research specialist"
)
workflow.add_node(researcher)
# First run should compile
result1 = workflow.run("Research task")
assert result1 is not None
# Second run should use cached compilation
result2 = workflow.run("Another research task")
assert result2 is not None
def test_graph_workflow_node_metadata():
"""Test GraphWorkflow with node metadata"""
# Create agents with different priorities and requirements
high_priority_agent = create_test_agent(
"High-Priority-Analyst", "High priority analysis specialist"
)
standard_agent = create_test_agent(
"Standard-Analyst", "Standard analysis agent"
)
# Create workflow and add nodes with metadata
workflow = GraphWorkflow(name="Metadata-Workflow")
workflow.add_node(
high_priority_agent,
metadata={"priority": "high", "timeout": 60},
)
workflow.add_node(
standard_agent, metadata={"priority": "normal", "timeout": 30}
)
# Add execution dependency
workflow.add_edge("High-Priority-Analyst", "Standard-Analyst")
# Test execution with metadata
result = workflow.run(
"Analyze business requirements with different priorities"
)
assert result is not None
@pytest.mark.parametrize("backend", ["networkx", "rustworkx"])
def test_graph_workflow_backend_basic(backend):
"""Test GraphWorkflow basic functionality with both backends"""
if backend == "rustworkx" and not RUSTWORKX_AVAILABLE:
pytest.skip("rustworkx not available")
agent1 = create_test_agent("Agent1", "First agent")
agent2 = create_test_agent("Agent2", "Second agent")
workflow = GraphWorkflow(
name=f"Backend-Test-{backend}", backend=backend
)
workflow.add_node(agent1)
workflow.add_node(agent2)
workflow.add_edge(agent1, agent2)
assert len(workflow.nodes) == 2
assert len(workflow.edges) == 1
result = workflow.run("Test task")
assert result is not None
assert "Agent1" in result
assert "Agent2" in result
@pytest.mark.parametrize("backend", ["networkx", "rustworkx"])
def test_graph_workflow_backend_parallel_execution(backend):
"""Test parallel execution with both backends"""
if backend == "rustworkx" and not RUSTWORKX_AVAILABLE:
pytest.skip("rustworkx not available")
coordinator = create_test_agent(
"Coordinator", "Coordinates tasks"
)
analyst1 = create_test_agent("Analyst1", "First analyst")
analyst2 = create_test_agent("Analyst2", "Second analyst")
analyst3 = create_test_agent("Analyst3", "Third analyst")
workflow = GraphWorkflow(
name=f"Parallel-Test-{backend}", backend=backend
)
workflow.add_node(coordinator)
workflow.add_node(analyst1)
workflow.add_node(analyst2)
workflow.add_node(analyst3)
workflow.add_edges_from_source(
coordinator, [analyst1, analyst2, analyst3]
)
workflow.compile()
assert len(workflow._sorted_layers) >= 1
assert (
len(workflow._sorted_layers[0]) == 1
) # Coordinator in first layer
result = workflow.run("Analyze data in parallel")
assert result is not None
@pytest.mark.parametrize("backend", ["networkx", "rustworkx"])
def test_graph_workflow_backend_fan_in_pattern(backend):
"""Test fan-in pattern with both backends"""
if backend == "rustworkx" and not RUSTWORKX_AVAILABLE:
pytest.skip("rustworkx not available")
analyst1 = create_test_agent("Analyst1", "First analyst")
analyst2 = create_test_agent("Analyst2", "Second analyst")
analyst3 = create_test_agent("Analyst3", "Third analyst")
synthesizer = create_test_agent(
"Synthesizer", "Synthesizes results"
)
workflow = GraphWorkflow(
name=f"FanIn-Test-{backend}", backend=backend
)
workflow.add_node(analyst1)
workflow.add_node(analyst2)
workflow.add_node(analyst3)
workflow.add_node(synthesizer)
workflow.add_edges_to_target(
[analyst1, analyst2, analyst3], synthesizer
)
workflow.compile()
assert len(workflow._sorted_layers) >= 2
assert synthesizer.agent_name in workflow.end_points
result = workflow.run("Synthesize multiple analyses")
assert result is not None
@pytest.mark.parametrize("backend", ["networkx", "rustworkx"])
def test_graph_workflow_backend_parallel_chain(backend):
"""Test parallel chain pattern with both backends"""
if backend == "rustworkx" and not RUSTWORKX_AVAILABLE:
pytest.skip("rustworkx not available")
collector1 = create_test_agent("Collector1", "First collector")
collector2 = create_test_agent("Collector2", "Second collector")
processor1 = create_test_agent("Processor1", "First processor")
processor2 = create_test_agent("Processor2", "Second processor")
workflow = GraphWorkflow(
name=f"ParallelChain-Test-{backend}", backend=backend
)
workflow.add_node(collector1)
workflow.add_node(collector2)
workflow.add_node(processor1)
workflow.add_node(processor2)
workflow.add_parallel_chain(
[collector1, collector2], [processor1, processor2]
)
workflow.compile()
assert len(workflow.edges) == 4 # 2x2 = 4 edges
result = workflow.run("Process data from multiple collectors")
assert result is not None
@pytest.mark.parametrize("backend", ["networkx", "rustworkx"])
def test_graph_workflow_backend_complex_topology(backend):
"""Test complex topology with both backends"""
if backend == "rustworkx" and not RUSTWORKX_AVAILABLE:
pytest.skip("rustworkx not available")
agents = [
create_test_agent(f"Agent{i}", f"Agent {i}") for i in range(5)
]
workflow = GraphWorkflow(
name=f"Complex-Topology-{backend}", backend=backend
)
for agent in agents:
workflow.add_node(agent)
workflow.add_edge(agents[0], agents[1])
workflow.add_edge(agents[0], agents[2])
workflow.add_edge(agents[1], agents[3])
workflow.add_edge(agents[2], agents[3])
workflow.add_edge(agents[3], agents[4])
workflow.compile()
assert len(workflow._sorted_layers) >= 3
result = workflow.run("Execute complex workflow")
assert result is not None
@pytest.mark.parametrize("backend", ["networkx", "rustworkx"])
def test_graph_workflow_backend_validation(backend):
"""Test workflow validation with both backends"""
if backend == "rustworkx" and not RUSTWORKX_AVAILABLE:
pytest.skip("rustworkx not available")
agent1 = create_test_agent("Agent1", "First agent")
agent2 = create_test_agent("Agent2", "Second agent")
isolated = create_test_agent("Isolated", "Isolated agent")
workflow = GraphWorkflow(
name=f"Validation-Test-{backend}", backend=backend
)
workflow.add_node(agent1)
workflow.add_node(agent2)
workflow.add_node(isolated)
workflow.add_edge(agent1, agent2)
validation = workflow.validate(auto_fix=False)
assert isinstance(validation, dict)
assert "is_valid" in validation
validation_fixed = workflow.validate(auto_fix=True)
assert isinstance(validation_fixed, dict)
@pytest.mark.parametrize("backend", ["networkx", "rustworkx"])
def test_graph_workflow_backend_entry_end_points(backend):
"""Test entry and end points with both backends"""
if backend == "rustworkx" and not RUSTWORKX_AVAILABLE:
pytest.skip("rustworkx not available")
agent1 = create_test_agent("Agent1", "Entry agent")
agent2 = create_test_agent("Agent2", "Middle agent")
agent3 = create_test_agent("Agent3", "End agent")
workflow = GraphWorkflow(
name=f"EntryEnd-Test-{backend}", backend=backend
)
workflow.add_node(agent1)
workflow.add_node(agent2)
workflow.add_node(agent3)
workflow.add_edge(agent1, agent2)
workflow.add_edge(agent2, agent3)
workflow.auto_set_entry_points()
workflow.auto_set_end_points()
assert agent1.agent_name in workflow.entry_points
assert agent3.agent_name in workflow.end_points
def test_graph_workflow_rustworkx_specific():
"""Test rustworkx-specific features"""
if not RUSTWORKX_AVAILABLE:
pytest.skip("rustworkx not available")
agent1 = create_test_agent("Agent1", "First agent")
agent2 = create_test_agent("Agent2", "Second agent")
agent3 = create_test_agent("Agent3", "Third agent")
workflow = GraphWorkflow(
name="Rustworkx-Specific-Test", backend="rustworkx"
)
workflow.add_node(agent1)
workflow.add_node(agent2)
workflow.add_node(agent3)
workflow.add_edge(agent1, agent2)
workflow.add_edge(agent2, agent3)
assert (
workflow.graph_backend.__class__.__name__
== "RustworkxBackend"
)
assert hasattr(workflow.graph_backend, "_node_id_to_index")
assert hasattr(workflow.graph_backend, "_index_to_node_id")
workflow.compile()
assert len(workflow._sorted_layers) == 3
predecessors = list(
workflow.graph_backend.predecessors(agent2.agent_name)
)
assert agent1.agent_name in predecessors
descendants = workflow.graph_backend.descendants(
agent1.agent_name
)
assert agent2.agent_name in descendants
assert agent3.agent_name in descendants
result = workflow.run("Test rustworkx backend")
assert result is not None
def test_graph_workflow_rustworkx_large_scale():
"""Test rustworkx with larger workflow"""
if not RUSTWORKX_AVAILABLE:
pytest.skip("rustworkx not available")
agents = [
create_test_agent(f"Agent{i}", f"Agent {i}")
for i in range(10)
]
workflow = GraphWorkflow(
name="Rustworkx-Large-Scale", backend="rustworkx"
)
for agent in agents:
workflow.add_node(agent)
for i in range(len(agents) - 1):
workflow.add_edge(agents[i], agents[i + 1])
workflow.compile()
assert len(workflow._sorted_layers) == 10
result = workflow.run("Test large scale workflow")
assert result is not None
assert len(result) == 10
def test_graph_workflow_rustworkx_agent_objects():
"""Test rustworkx with Agent objects directly in edges"""
if not RUSTWORKX_AVAILABLE:
pytest.skip("rustworkx not available")
agent1 = create_test_agent("Agent1", "First agent")
agent2 = create_test_agent("Agent2", "Second agent")
agent3 = create_test_agent("Agent3", "Third agent")
workflow = GraphWorkflow(
name="Rustworkx-Agent-Objects", backend="rustworkx"
)
workflow.add_node(agent1)
workflow.add_node(agent2)
workflow.add_node(agent3)
workflow.add_edges_from_source(agent1, [agent2, agent3])
workflow.add_edges_to_target([agent2, agent3], agent1)
workflow.compile()
assert len(workflow.edges) == 4
result = workflow.run("Test agent objects in edges")
assert result is not None
def test_graph_workflow_backend_fallback():
"""Test backend fallback when rustworkx unavailable"""
workflow = GraphWorkflow(
name="Fallback-Test", backend="rustworkx"
)
agent = create_test_agent("Agent", "Test agent")
workflow.add_node(agent)
if not RUSTWORKX_AVAILABLE:
assert (
workflow.graph_backend.__class__.__name__
== "NetworkXBackend"
)
else:
assert (
workflow.graph_backend.__class__.__name__
== "RustworkxBackend"
)
if __name__ == "__main__":
pytest.main([__file__, "-v"])