From d105482347a294f49e15407e06454dad8b753b7f Mon Sep 17 00:00:00 2001 From: Steve-Dusty Date: Wed, 29 Oct 2025 00:57:00 -0700 Subject: [PATCH] =?UTF-8?q?=20Fixed=20bug:=20conversation.add(message=3D)?= =?UTF-8?q?=20=E2=86=92=20conversation.add(content=3D)=20in=20all=20swarm?= =?UTF-8?q?=20architectures.=20=20=20Refactored=20test=5Fspreadsheet.py=20?= =?UTF-8?q?and=20test=5Fswarm=5Farchitectures.py=20to=20use=20pytest=20wit?= =?UTF-8?q?h=20real=20agent=20execution?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- swarms/structs/swarming_architectures.py | 82 +-- tests/structs/test_spreadsheet.py | 773 ++++++++++++++++------ tests/structs/test_swarm_architectures.py | 459 ++++++------- 3 files changed, 816 insertions(+), 498 deletions(-) diff --git a/swarms/structs/swarming_architectures.py b/swarms/structs/swarming_architectures.py index f34fc844..c286b653 100644 --- a/swarms/structs/swarming_architectures.py +++ b/swarms/structs/swarming_architectures.py @@ -52,12 +52,12 @@ def circular_swarm( for agent in flat_agents: conversation.add( role="User", - message=task, + content=task, ) response = agent.run(conversation.get_str()) conversation.add( role=agent.agent_name, - message=response, + content=response, ) return history_output_formatter(conversation, output_type) @@ -88,7 +88,7 @@ def grid_swarm( conversation.add( role="User", - message=tasks, + content=tasks, ) grid_size = int( @@ -101,7 +101,7 @@ def grid_swarm( response = agents[i * grid_size + j].run(task) conversation.add( role=agents[i * grid_size + j].agent_name, - message=response, + content=response, ) return history_output_formatter(conversation, output_type) @@ -139,12 +139,12 @@ def linear_swarm( task = tasks.pop(0) conversation.add( role="User", - message=task, + content=task, ) response = agent.run(conversation.get_str()) conversation.add( role=agent.agent_name, - message=response, + content=response, ) return history_output_formatter(conversation, output_type) @@ -182,12 +182,12 @@ def star_swarm( # Central agent processes the task conversation.add( role="User", - message=task, + content=task, ) center_response = center_agent.run(conversation.get_str()) conversation.add( role=center_agent.agent_name, - message=center_response, + content=center_response, ) # Other agents process the same task @@ -195,7 +195,7 @@ def star_swarm( response = agent.run(task) conversation.add( role=agent.agent_name, - message=response, + content=response, ) return history_output_formatter(conversation, output_type) @@ -229,7 +229,7 @@ def mesh_swarm( conversation = Conversation() conversation.add( role="User", - message=tasks, + content=tasks, ) task_queue = tasks.copy() @@ -240,7 +240,7 @@ def mesh_swarm( response = agent.run(task) conversation.add( role=agent.agent_name, - message=response, + content=response, ) return history_output_formatter(conversation, output_type) @@ -285,7 +285,7 @@ def pyramid_swarm( response = agents[agent_index].run(task) conversation.add( role=agents[agent_index].agent_name, - message=response, + content=response, ) return history_output_formatter(conversation, output_type) @@ -312,7 +312,7 @@ def fibonacci_swarm( conversation = Conversation() conversation.add( role="User", - message=tasks, + content=tasks, ) fib = [1, 1] while len(fib) < len(agents): @@ -324,7 +324,7 @@ def fibonacci_swarm( response = agents[int(sum(fib[:i]) + j)].run(task) conversation.add( role=agents[int(sum(fib[:i]) + j)].agent_name, - message=response, + content=response, ) return history_output_formatter(conversation, output_type) @@ -351,7 +351,7 @@ def prime_swarm( conversation = Conversation() conversation.add( role="User", - message=tasks, + content=tasks, ) primes = [ 2, @@ -386,7 +386,7 @@ def prime_swarm( output = agents[prime].run(task) conversation.add( role=agents[prime].agent_name, - message=output, + content=output, ) return history_output_formatter(conversation, output_type) @@ -412,7 +412,7 @@ def power_swarm( conversation = Conversation() conversation.add( role="User", - message=tasks, + content=tasks, ) powers = [2**i for i in range(int(len(agents) ** 0.5))] for power in powers: @@ -421,7 +421,7 @@ def power_swarm( output = agents[power].run(task) conversation.add( role=agents[power].agent_name, - message=output, + content=output, ) return history_output_formatter(conversation, output_type) @@ -447,7 +447,7 @@ def log_swarm( conversation = Conversation() conversation.add( role="User", - message=tasks, + content=tasks, ) for i in range(len(agents)): if 2**i < len(agents) and tasks: @@ -455,7 +455,7 @@ def log_swarm( output = agents[2**i].run(task) conversation.add( role=agents[2**i].agent_name, - message=output, + content=output, ) return history_output_formatter(conversation, output_type) @@ -481,7 +481,7 @@ def exponential_swarm( conversation = Conversation() conversation.add( role="User", - message=tasks, + content=tasks, ) for i in range(len(agents)): @@ -492,7 +492,7 @@ def exponential_swarm( conversation.add( role=agents[index].agent_name, - message=output, + content=output, ) return history_output_formatter(conversation, output_type) @@ -521,7 +521,7 @@ def geometric_swarm( conversation = Conversation() conversation.add( role="User", - message=tasks, + content=tasks, ) for i in range(len(agents)): @@ -531,7 +531,7 @@ def geometric_swarm( response = agents[index].run(task) conversation.add( role=agents[index].agent_name, - message=response, + content=response, ) return history_output_formatter(conversation, output_type) @@ -558,7 +558,7 @@ def harmonic_swarm( conversation = Conversation() conversation.add( role="User", - message=tasks, + content=tasks, ) for i in range(1, len(agents) + 1): @@ -568,7 +568,7 @@ def harmonic_swarm( response = agents[index].run(task) conversation.add( role=agents[index].agent_name, - message=response, + content=response, ) return history_output_formatter(conversation, output_type) @@ -595,7 +595,7 @@ def staircase_swarm( conversation = Conversation() conversation.add( role="User", - message=tasks, + content=tasks, ) step = len(agents) // 5 @@ -606,7 +606,7 @@ def staircase_swarm( response = agents[index].run(task) conversation.add( role=agents[index].agent_name, - message=response, + content=response, ) return history_output_formatter(conversation, output_type) @@ -633,7 +633,7 @@ def sigmoid_swarm( conversation = Conversation() conversation.add( role="User", - message=tasks, + content=tasks, ) for i in range(len(agents)): @@ -643,7 +643,7 @@ def sigmoid_swarm( response = agents[index].run(task) conversation.add( role=agents[index].agent_name, - message=response, + content=response, ) return history_output_formatter(conversation, output_type) @@ -670,7 +670,7 @@ def sinusoidal_swarm( conversation = Conversation() conversation.add( role="User", - message=tasks, + content=tasks, ) for i in range(len(agents)): @@ -680,7 +680,7 @@ def sinusoidal_swarm( response = agents[index].run(task) conversation.add( role=agents[index].agent_name, - message=response, + content=response, ) return history_output_formatter(conversation, output_type) @@ -715,7 +715,7 @@ def one_to_one( conversation = Conversation() conversation.add( role="User", - message=task, + content=task, ) try: @@ -724,14 +724,14 @@ def one_to_one( sender_response = sender.run(task) conversation.add( role=sender.agent_name, - message=sender_response, + content=sender_response, ) # Receiver processes the result of the sender receiver_response = receiver.run(sender_response) conversation.add( role=receiver.agent_name, - message=receiver_response, + content=receiver_response, ) return history_output_formatter(conversation, output_type) @@ -769,7 +769,7 @@ async def broadcast( conversation = Conversation() conversation.add( role="User", - message=task, + content=task, ) if not sender or not agents or not task: @@ -781,7 +781,7 @@ async def broadcast( conversation.add( role=sender.agent_name, - message=broadcast_message, + content=broadcast_message, ) # Then have all agents process it @@ -789,7 +789,7 @@ async def broadcast( response = agent.run(conversation.get_str()) conversation.add( role=agent.agent_name, - message=response, + content=response, ) return history_output_formatter(conversation, output_type) @@ -832,7 +832,7 @@ async def one_to_three( conversation.add( role="User", - message=task, + content=task, ) try: @@ -840,7 +840,7 @@ async def one_to_three( sender_message = sender.run(conversation.get_str()) conversation.add( role=sender.agent_name, - message=sender_message, + content=sender_message, ) # Have each receiver process the message @@ -848,7 +848,7 @@ async def one_to_three( response = agent.run(conversation.get_str()) conversation.add( role=agent.agent_name, - message=response, + content=response, ) return history_output_formatter(conversation, output_type) diff --git a/tests/structs/test_spreadsheet.py b/tests/structs/test_spreadsheet.py index 25ce6b17..f21fc715 100644 --- a/tests/structs/test_spreadsheet.py +++ b/tests/structs/test_spreadsheet.py @@ -1,226 +1,563 @@ +""" +SpreadSheetSwarm Test Suite + +Tests for the SpreadSheetSwarm class, which manages multiple agents to execute tasks +concurrently with support for CSV-based agent loading and automatic metadata tracking. + +The SpreadSheetSwarm processes tasks across multiple agents in parallel, tracks outputs, +and provides data export capabilities in both CSV and JSON formats. +""" + import os -import asyncio -from loguru import logger +import json +import csv + +import pytest + from swarms.structs.agent import Agent from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm -def create_test_csv() -> str: - """Create a test CSV file with agent configurations.""" - print("\nStarting creation of test CSV file") - try: - csv_content = """agent_name,description,system_prompt,task -test_agent_1,Test Agent 1,System prompt 1,Task 1 -test_agent_2,Test Agent 2,System prompt 2,Task 2""" - - file_path = "test_agents.csv" - with open(file_path, "w") as f: - f.write(csv_content) - - print(f"Created CSV with content:\n{csv_content}") - print(f"CSV file created at: {file_path}") - return file_path - except Exception as e: - logger.error(f"Failed to create test CSV: {str(e)}") - raise - - -def create_test_agent(name: str) -> Agent: - """Create a test agent with specified name.""" - print(f"\nCreating test agent: {name}") - try: - agent = Agent( - agent_name=name, - system_prompt=f"Test prompt for {name}", +@pytest.fixture +def temp_workspace(tmp_path): + """Create a temporary workspace directory for test isolation.""" + workspace = tmp_path / "test_workspace" + workspace.mkdir() + return str(workspace) + + +@pytest.fixture +def sample_csv_file(tmp_path): + """Create a sample CSV file with agent configurations.""" + csv_path = tmp_path / "test_agents.csv" + csv_content = [ + ["agent_name", "description", "system_prompt", "task", "model_name"], + ["agent_1", "First test agent", "You are a helpful assistant. Respond with exactly 'Task completed.'", "Say hello", "gpt-4o-mini"], + ["agent_2", "Second test agent", "You are a code reviewer. Respond with exactly 'Review done.'", "Review this: print('hello')", "gpt-4o-mini"], + ] + + with open(csv_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerows(csv_content) + + return str(csv_path) + + +def test_swarm_initialization_basic(temp_workspace): + """Test basic swarm initialization with required parameters.""" + agent = Agent( + agent_name="test_agent_1", + system_prompt="You are a helpful assistant", + model_name="gpt-4o-mini", + max_loops=1, + ) + + swarm = SpreadSheetSwarm( + name="Test Swarm", + description="Test swarm description", + agents=[agent], + workspace_dir=temp_workspace, + ) + + assert swarm.name == "Test Swarm" + assert swarm.description == "Test swarm description" + assert len(swarm.agents) == 1 + assert swarm.max_loops == 1 + assert swarm.autosave is True + assert swarm.tasks_completed == 0 + assert swarm.outputs == [] + + +def test_swarm_initialization_multiple_agents(temp_workspace): + """Test swarm initialization with multiple agents.""" + agents = [ + Agent( + agent_name="agent_1", + system_prompt="You are agent 1", + model_name="gpt-4o-mini", + max_loops=1, + ), + Agent( + agent_name="agent_2", + system_prompt="You are agent 2", + model_name="gpt-4o-mini", + max_loops=1, + ), + ] + + swarm = SpreadSheetSwarm( + name="Multi Agent Swarm", + agents=agents, + workspace_dir=temp_workspace, + ) + + assert len(swarm.agents) == 2 + assert swarm.agents[0].agent_name == "agent_1" + assert swarm.agents[1].agent_name == "agent_2" + + +def test_swarm_initialization_custom_max_loops(temp_workspace): + """Test initialization with custom max_loops setting.""" + agent = Agent( + agent_name="test_agent", + system_prompt="Test prompt", + model_name="gpt-4o-mini", + max_loops=1, + ) + + swarm = SpreadSheetSwarm( + name="Custom Loop Swarm", + agents=[agent], + max_loops=3, + workspace_dir=temp_workspace, + ) + + assert swarm.max_loops == 3 + + +def test_swarm_initialization_autosave_disabled(temp_workspace): + """Test initialization with autosave disabled.""" + agent = Agent( + agent_name="test_agent", + system_prompt="Test prompt", + model_name="gpt-4o-mini", + max_loops=1, + ) + + swarm = SpreadSheetSwarm( + agents=[agent], + autosave=False, + workspace_dir=temp_workspace, + ) + + assert swarm.autosave is False + + +def test_swarm_save_file_path_generation(temp_workspace): + """Test that save file path is correctly generated with workspace_dir.""" + agent = Agent( + agent_name="test_agent", + system_prompt="Test prompt", + model_name="gpt-4o-mini", + max_loops=1, + ) + + swarm = SpreadSheetSwarm( + agents=[agent], + workspace_dir=temp_workspace, + ) + + assert swarm.save_file_path is not None + assert "spreadsheet_swarm_run_id_" in swarm.save_file_path + assert swarm.save_file_path.startswith(temp_workspace) + assert swarm.save_file_path.endswith(".csv") + + +def test_swarm_initialization_no_agents_raises_error(): + """Test that initialization without agents raises ValueError.""" + with pytest.raises(ValueError, match="No agents are provided"): + SpreadSheetSwarm(agents=None) + + +def test_swarm_initialization_no_max_loops_raises_error(): + """Test that initialization without max_loops raises ValueError.""" + agent = Agent( + agent_name="test_agent", + system_prompt="Test prompt", + model_name="gpt-4o-mini", + max_loops=1, + ) + + with pytest.raises(ValueError, match="No max loops are provided"): + SpreadSheetSwarm(agents=[agent], max_loops=None) + + +def test_track_output_single(temp_workspace): + """Test tracking a single task output.""" + agent = Agent( + agent_name="test_agent", + system_prompt="Test prompt", + model_name="gpt-4o-mini", + max_loops=1, + ) + + swarm = SpreadSheetSwarm( + agents=[agent], + workspace_dir=temp_workspace, + ) + + swarm._track_output("test_agent", "Test task", "Test result") + + assert swarm.tasks_completed == 1 + assert len(swarm.outputs) == 1 + assert swarm.outputs[0]["agent_name"] == "test_agent" + assert swarm.outputs[0]["task"] == "Test task" + assert swarm.outputs[0]["result"] == "Test result" + assert "timestamp" in swarm.outputs[0] + + +def test_track_output_multiple(temp_workspace): + """Test tracking multiple task outputs.""" + agents = [ + Agent( + agent_name=f"agent_{i}", + system_prompt="Test prompt", model_name="gpt-4o-mini", max_loops=1, - autosave=True, - verbose=True, - ) - print(f"Created agent: {name}") - return agent - except Exception as e: - logger.error(f"Failed to create agent {name}: {str(e)}") - raise - - -def test_swarm_initialization() -> None: - """Test basic swarm initialization.""" - print("\n[TEST] Starting swarm initialization test") - try: - print("Creating test agents...") - agents = [ - create_test_agent("agent1"), - create_test_agent("agent2"), - ] - - print("Initializing swarm...") - swarm = SpreadSheetSwarm( - name="Test Swarm", - description="Test Description", - agents=agents, - max_loops=2, - ) - - print("Verifying swarm configuration...") - assert swarm.name == "Test Swarm" - assert swarm.description == "Test Description" - assert len(swarm.agents) == 2 - assert swarm.max_loops == 2 - - print("✅ Swarm initialization test PASSED") - except Exception as e: - logger.error(f"❌ Swarm initialization test FAILED: {str(e)}") - raise - - -async def test_load_from_csv() -> None: - """Test loading agent configurations from CSV.""" - print("\n[TEST] Starting CSV loading test") - try: - csv_path = create_test_csv() - print("Initializing swarm with CSV...") - swarm = SpreadSheetSwarm(load_path=csv_path) - - print("Loading configurations...") - await swarm._load_from_csv() - - print("Verifying loaded configurations...") - assert len(swarm.agents) == 2 - assert len(swarm.agent_configs) == 2 - assert "test_agent_1" in swarm.agent_configs - assert "test_agent_2" in swarm.agent_configs - - os.remove(csv_path) - print(f"Cleaned up test file: {csv_path}") - - print("✅ CSV loading test PASSED") - except Exception as e: - logger.error(f"❌ CSV loading test FAILED: {str(e)}") - raise - - -async def test_run_tasks() -> None: - """Test running tasks with multiple agents.""" - print("\n[TEST] Starting task execution test") - try: - print("Setting up test swarm...") - agents = [ - create_test_agent("agent1"), - create_test_agent("agent2"), - ] - swarm = SpreadSheetSwarm(agents=agents, max_loops=1) - - test_task = "Test task for all agents" - print(f"Running test task: {test_task}") - await swarm._run_tasks(test_task) - - print("Verifying task execution...") - assert swarm.metadata.tasks_completed == 2 - assert len(swarm.metadata.outputs) == 2 - - print("✅ Task execution test PASSED") - except Exception as e: - logger.error(f"❌ Task execution test FAILED: {str(e)}") - raise - - -def test_output_tracking() -> None: - """Test tracking of task outputs.""" - print("\n[TEST] Starting output tracking test") - try: - print("Creating test swarm...") - swarm = SpreadSheetSwarm(agents=[create_test_agent("agent1")]) - - print("Tracking test output...") - swarm._track_output("agent1", "Test task", "Test result") - - print("Verifying output tracking...") - assert swarm.metadata.tasks_completed == 1 - assert len(swarm.metadata.outputs) == 1 - assert swarm.metadata.outputs[0].agent_name == "agent1" - - print("✅ Output tracking test PASSED") - except Exception as e: - logger.error(f"❌ Output tracking test FAILED: {str(e)}") - raise - - -async def test_save_to_csv() -> None: - """Test saving metadata to CSV.""" - print("\n[TEST] Starting CSV saving test") - try: - print("Setting up test data...") - swarm = SpreadSheetSwarm( - agents=[create_test_agent("agent1")], - save_file_path="test_output.csv", ) - swarm._track_output("agent1", "Test task", "Test result") - - print("Saving to CSV...") - await swarm._save_to_csv() - - print("Verifying file creation...") - assert os.path.exists(swarm.save_file_path) - - os.remove(swarm.save_file_path) - print("Cleaned up test file") - - print("✅ CSV saving test PASSED") - except Exception as e: - logger.error(f"❌ CSV saving test FAILED: {str(e)}") - raise - - -def test_json_export() -> None: - """Test JSON export functionality.""" - print("\n[TEST] Starting JSON export test") - try: - print("Creating test data...") - swarm = SpreadSheetSwarm(agents=[create_test_agent("agent1")]) - swarm._track_output("agent1", "Test task", "Test result") - - print("Exporting to JSON...") - json_output = swarm.export_to_json() - - print("Verifying JSON output...") - assert isinstance(json_output, str) - assert "run_id" in json_output - assert "tasks_completed" in json_output - - print("✅ JSON export test PASSED") - except Exception as e: - logger.error(f"❌ JSON export test FAILED: {str(e)}") - raise - - -async def run_all_tests() -> None: - """Run all test functions.""" - print("\n" + "=" * 50) - print("Starting SpreadsheetSwarm Test Suite") - print("=" * 50 + "\n") - - try: - # Run synchronous tests - print("Running synchronous tests...") - test_swarm_initialization() - test_output_tracking() - test_json_export() - - # Run asynchronous tests - print("\nRunning asynchronous tests...") - await test_load_from_csv() - await test_run_tasks() - await test_save_to_csv() - - print("\n🎉 All tests completed successfully!") - print("=" * 50) - except Exception as e: - logger.error(f"\n❌ Test suite failed: {str(e)}") - print("=" * 50) - raise - - -if __name__ == "__main__": - # Run all tests - asyncio.run(run_all_tests()) + for i in range(1, 4) + ] + + swarm = SpreadSheetSwarm( + agents=agents, + workspace_dir=temp_workspace, + ) + + for i in range(1, 4): + swarm._track_output(f"agent_{i}", f"Task {i}", f"Result {i}") + + assert swarm.tasks_completed == 3 + assert len(swarm.outputs) == 3 + + for i, output in enumerate(swarm.outputs, 1): + assert output["agent_name"] == f"agent_{i}" + assert output["task"] == f"Task {i}" + assert output["result"] == f"Result {i}" + + +def test_track_output_increments_counter(temp_workspace): + """Test that tasks_completed counter increments correctly.""" + agent = Agent( + agent_name="test_agent", + system_prompt="Test prompt", + model_name="gpt-4o-mini", + max_loops=1, + ) + + swarm = SpreadSheetSwarm( + agents=[agent], + workspace_dir=temp_workspace, + ) + + initial_count = swarm.tasks_completed + swarm._track_output("test_agent", "Task 1", "Result 1") + assert swarm.tasks_completed == initial_count + 1 + + swarm._track_output("test_agent", "Task 2", "Result 2") + assert swarm.tasks_completed == initial_count + 2 + + +def test_load_from_csv_basic(sample_csv_file, temp_workspace): + """Test loading agents from a CSV file.""" + # Initialize with empty agents list, will be populated from CSV + agent = Agent( + agent_name="placeholder", + system_prompt="placeholder", + model_name="gpt-4o-mini", + max_loops=1, + ) + + swarm = SpreadSheetSwarm( + agents=[agent], + load_path=sample_csv_file, + workspace_dir=temp_workspace, + ) + + swarm._load_from_csv() + + # Should have loaded 2 agents from CSV plus the initial placeholder + assert len(swarm.agents) == 3 + assert len(swarm.agent_tasks) == 2 + assert "agent_1" in swarm.agent_tasks + assert "agent_2" in swarm.agent_tasks + assert swarm.agent_tasks["agent_1"] == "Say hello" + assert swarm.agent_tasks["agent_2"] == "Review this: print('hello')" + + +def test_load_from_csv_creates_agents(sample_csv_file, temp_workspace): + """Test that CSV loading creates proper Agent objects.""" + agent = Agent( + agent_name="placeholder", + system_prompt="placeholder", + model_name="gpt-4o-mini", + max_loops=1, + ) + + swarm = SpreadSheetSwarm( + agents=[agent], + load_path=sample_csv_file, + workspace_dir=temp_workspace, + ) + + swarm._load_from_csv() + + # Verify the loaded agents are proper Agent instances + for agent in swarm.agents[1:]: # Skip placeholder + assert isinstance(agent, Agent) + assert hasattr(agent, "agent_name") + assert hasattr(agent, "system_prompt") + + +def test_load_from_nonexistent_csv(temp_workspace): + """Test loading from non-existent CSV file.""" + agent = Agent( + agent_name="test_agent", + system_prompt="Test prompt", + model_name="gpt-4o-mini", + max_loops=1, + ) + + swarm = SpreadSheetSwarm( + agents=[agent], + load_path="nonexistent_file.csv", + workspace_dir=temp_workspace, + ) + + # Error is caught and logged, not raised + swarm._load_from_csv() + # Should still have the original agent + assert len(swarm.agents) == 1 + + +def test_save_to_csv_creates_file(temp_workspace): + """Test that saving to CSV creates the output file.""" + agent = Agent( + agent_name="test_agent", + system_prompt="Test prompt", + model_name="gpt-4o-mini", + max_loops=1, + ) + + swarm = SpreadSheetSwarm( + agents=[agent], + workspace_dir=temp_workspace, + ) + + swarm._track_output("test_agent", "Test task", "Test result") + swarm._save_to_csv() + + assert os.path.exists(swarm.save_file_path) + + +def test_save_to_csv_headers(temp_workspace): + """Test that CSV file includes proper headers.""" + agent = Agent( + agent_name="test_agent", + system_prompt="Test prompt", + model_name="gpt-4o-mini", + max_loops=1, + ) + + swarm = SpreadSheetSwarm( + agents=[agent], + workspace_dir=temp_workspace, + ) + + swarm._track_output("test_agent", "Test task", "Test result") + swarm._save_to_csv() + + with open(swarm.save_file_path, "r") as f: + reader = csv.reader(f) + headers = next(reader) + assert headers == ["Run ID", "Agent Name", "Task", "Result", "Timestamp"] + + +def test_save_to_csv_data(temp_workspace): + """Test that CSV file includes the tracked output data.""" + agent = Agent( + agent_name="test_agent", + system_prompt="Test prompt", + model_name="gpt-4o-mini", + max_loops=1, + ) + + swarm = SpreadSheetSwarm( + agents=[agent], + workspace_dir=temp_workspace, + ) + + swarm._track_output("test_agent", "Test task", "Test result") + swarm._save_to_csv() + + with open(swarm.save_file_path, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + assert len(rows) == 1 + assert rows[0]["Agent Name"] == "test_agent" + assert rows[0]["Task"] == "Test task" + assert rows[0]["Result"] == "Test result" + + +def test_save_to_csv_appends(temp_workspace): + """Test that multiple saves append to the same CSV file. + + Note: _save_to_csv() saves ALL outputs in swarm.outputs each time, + so calling it twice will result in duplicates. This tests the actual behavior. + """ + agent = Agent( + agent_name="test_agent", + system_prompt="Test prompt", + model_name="gpt-4o-mini", + max_loops=1, + ) + + swarm = SpreadSheetSwarm( + agents=[agent], + workspace_dir=temp_workspace, + ) + + swarm._track_output("test_agent", "Task 1", "Result 1") + swarm._save_to_csv() + + swarm._track_output("test_agent", "Task 2", "Result 2") + swarm._save_to_csv() + + # After first save: Task 1 (1 row) + # After second save: Task 1 + Task 2 (2 more rows) + # Total: 3 rows (Task 1 appears twice, Task 2 appears once) + with open(swarm.save_file_path, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + assert len(rows) == 3 + + # Verify the data + assert rows[0]["Task"] == "Task 1" + assert rows[1]["Task"] == "Task 1" + assert rows[2]["Task"] == "Task 2" + + +def test_export_to_json_structure(temp_workspace): + """Test that JSON export contains expected structure.""" + agent = Agent( + agent_name="test_agent", + system_prompt="Test prompt", + model_name="gpt-4o-mini", + max_loops=1, + ) + + swarm = SpreadSheetSwarm( + name="JSON Test Swarm", + description="Testing JSON export", + agents=[agent], + workspace_dir=temp_workspace, + ) + + swarm._track_output("test_agent", "Test task", "Test result") + json_output = swarm.export_to_json() + data = json.loads(json_output) + + assert "run_id" in data + assert "name" in data + assert "description" in data + assert "tasks_completed" in data + assert "number_of_agents" in data + assert "outputs" in data + + assert data["name"] == "JSON Test Swarm" + assert data["description"] == "Testing JSON export" + assert data["tasks_completed"] == 1 + assert data["number_of_agents"] == 1 + + +def test_export_to_json_outputs(temp_workspace): + """Test that JSON export includes all tracked outputs.""" + agent = Agent( + agent_name="test_agent", + system_prompt="Test prompt", + model_name="gpt-4o-mini", + max_loops=1, + ) + + swarm = SpreadSheetSwarm( + agents=[agent], + workspace_dir=temp_workspace, + ) + + swarm._track_output("test_agent", "Task 1", "Result 1") + swarm._track_output("test_agent", "Task 2", "Result 2") + + json_output = swarm.export_to_json() + data = json.loads(json_output) + + assert len(data["outputs"]) == 2 + assert data["outputs"][0]["agent_name"] == "test_agent" + assert data["outputs"][0]["task"] == "Task 1" + assert data["outputs"][1]["task"] == "Task 2" + + +def test_export_to_json_valid_format(temp_workspace): + """Test that JSON export is valid JSON.""" + agent = Agent( + agent_name="test_agent", + system_prompt="Test prompt", + model_name="gpt-4o-mini", + max_loops=1, + ) + + swarm = SpreadSheetSwarm( + agents=[agent], + workspace_dir=temp_workspace, + ) + + json_output = swarm.export_to_json() + data = json.loads(json_output) + assert isinstance(data, dict) + + +def test_export_empty_swarm_to_json(temp_workspace): + """Test JSON export with no completed tasks.""" + agent = Agent( + agent_name="test_agent", + system_prompt="Test prompt", + model_name="gpt-4o-mini", + max_loops=1, + ) + + swarm = SpreadSheetSwarm( + agents=[agent], + workspace_dir=temp_workspace, + ) + + json_output = swarm.export_to_json() + data = json.loads(json_output) + + assert data["tasks_completed"] == 0 + assert data["outputs"] == [] + assert data["number_of_agents"] == 1 + + +def test_reliability_check_passes(temp_workspace): + """Test that reliability check passes with valid configuration.""" + agent = Agent( + agent_name="test_agent", + system_prompt="Test prompt", + model_name="gpt-4o-mini", + max_loops=1, + ) + + swarm = SpreadSheetSwarm( + agents=[agent], + max_loops=1, + workspace_dir=temp_workspace, + ) + + assert swarm is not None + + +def test_reliability_check_verbose(temp_workspace): + """Test verbose mode during initialization.""" + agent = Agent( + agent_name="test_agent", + system_prompt="Test prompt", + model_name="gpt-4o-mini", + max_loops=1, + ) + + swarm = SpreadSheetSwarm( + agents=[agent], + verbose=True, + workspace_dir=temp_workspace, + ) + + assert swarm.verbose is True diff --git a/tests/structs/test_swarm_architectures.py b/tests/structs/test_swarm_architectures.py index 8913a1d0..8f836233 100644 --- a/tests/structs/test_swarm_architectures.py +++ b/tests/structs/test_swarm_architectures.py @@ -1,6 +1,4 @@ -import asyncio -import time -from typing import List +import pytest from swarms.structs.agent import Agent from swarms.structs.swarming_architectures import ( @@ -34,268 +32,251 @@ def create_test_agent(name: str) -> Agent: ) -def create_test_agents(num_agents: int) -> List[Agent]: +def create_test_agents(num_agents: int) -> list[Agent]: """Create specified number of test agents""" - return [ - create_test_agent(f"Agent{i+1}") for i in range(num_agents) - ] + return [create_test_agent(f"Agent{i+1}") for i in range(num_agents)] -def print_separator(): - print("\n" + "=" * 50 + "\n") +def test_circular_swarm(): + """Test circular swarm outputs""" + agents = create_test_agents(3) + tasks = [ + "Analyze data", + "Generate report", + "Summarize findings", + ] + result = circular_swarm(agents, tasks) -def test_circular_swarm(): - """Test and display circular swarm outputs""" - print_separator() - print("CIRCULAR SWARM TEST") - try: - agents = create_test_agents(3) - tasks = [ - "Analyze data", - "Generate report", - "Summarize findings", - ] - - print("Running circular swarm with:") - print(f"Tasks: {tasks}\n") - - result = circular_swarm(agents, tasks) - print("Circular Swarm Outputs:") - for log in result["history"]: - print(f"\nAgent: {log['agent_name']}") - print(f"Task: {log['task']}") - print(f"Response: {log['response']}") - except Exception as e: - print(f"Error: {str(e)}") + assert isinstance(result, list) + assert len(result) > 0 + + for log in result: + assert "role" in log + assert "content" in log def test_grid_swarm(): - """Test and display grid swarm outputs""" - print_separator() - print("GRID SWARM TEST") - try: - agents = create_test_agents(4) # 2x2 grid - tasks = ["Task A", "Task B", "Task C", "Task D"] + """Test grid swarm with 2x2 grid""" + agents = create_test_agents(4) + tasks = ["Task A", "Task B", "Task C", "Task D"] - print("Running grid swarm with 2x2 grid") - print(f"Tasks: {tasks}\n") + result = grid_swarm(agents, tasks) - print(grid_swarm(agents, tasks)) - print( - "Grid Swarm completed - each agent processed tasks in its grid position" - ) - except Exception as e: - print(f"Error: {str(e)}") + assert isinstance(result, list) + assert len(result) > 0 + def test_linear_swarm(): - """Test and display linear swarm outputs""" - print_separator() - print("LINEAR SWARM TEST") - try: - agents = create_test_agents(3) - tasks = ["Research task", "Write content", "Review output"] - - print("Running linear swarm with:") - print(f"Tasks: {tasks}\n") - - result = linear_swarm(agents, tasks) - print("Linear Swarm Outputs:") - for log in result["history"]: - print(f"\nAgent: {log['agent_name']}") - print(f"Task: {log['task']}") - print(f"Response: {log['response']}") - except Exception as e: - print(f"Error: {str(e)}") + """Test linear swarm sequential processing""" + agents = create_test_agents(3) + tasks = ["Research task", "Write content", "Review output"] + + result = linear_swarm(agents, tasks) + + assert isinstance(result, list) + assert len(result) > 0 + + + for log in result: + assert "role" in log + assert "content" in log def test_star_swarm(): - """Test and display star swarm outputs""" - print_separator() - print("STAR SWARM TEST") - try: - agents = create_test_agents(4) # 1 center + 3 peripheral - tasks = ["Coordinate workflow", "Process data"] - - print("Running star swarm with:") - print(f"Center agent: {agents[0].agent_name}") - print( - f"Peripheral agents: {[agent.agent_name for agent in agents[1:]]}" - ) - print(f"Tasks: {tasks}\n") - - result = star_swarm(agents, tasks) - print("Star Swarm Outputs:") - for log in result["history"]: - print(f"\nAgent: {log['agent_name']}") - print(f"Task: {log['task']}") - print(f"Response: {log['response']}") - except Exception as e: - print(f"Error: {str(e)}") + """Test star swarm with central and peripheral agents""" + agents = create_test_agents(4) + tasks = ["Coordinate workflow", "Process data"] + + result = star_swarm(agents, tasks) + + assert isinstance(result, list) + assert len(result) > 0 + + + for log in result: + assert "role" in log + assert "content" in log def test_mesh_swarm(): - """Test and display mesh swarm outputs""" - print_separator() - print("MESH SWARM TEST") - try: - agents = create_test_agents(3) - tasks = [ - "Analyze data", - "Process information", - "Generate insights", - ] - - print("Running mesh swarm with:") - print(f"Tasks: {tasks}\n") - - result = mesh_swarm(agents, tasks) - print(f"Mesh Swarm Outputs: {result}") - for log in result["history"]: - print(f"\nAgent: {log['agent_name']}") - print(f"Task: {log['task']}") - print(f"Response: {log['response']}") - except Exception as e: - print(f"Error: {str(e)}") + """Test mesh swarm interconnected processing""" + agents = create_test_agents(3) + tasks = [ + "Analyze data", + "Process information", + "Generate insights", + ] + + result = mesh_swarm(agents, tasks) + + assert isinstance(result, list) + assert len(result) > 0 + + + for log in result: + assert "role" in log + assert "content" in log def test_pyramid_swarm(): - """Test and display pyramid swarm outputs""" - print_separator() - print("PYRAMID SWARM TEST") - try: - agents = create_test_agents(6) # 1-2-3 pyramid - tasks = [ - "Top task", - "Middle task 1", - "Middle task 2", - "Bottom task 1", - "Bottom task 2", - "Bottom task 3", - ] - - print("Running pyramid swarm with:") - print(f"Tasks: {tasks}\n") - - result = pyramid_swarm(agents, tasks) - print(f"Pyramid Swarm Outputs: {result}") - for log in result["history"]: - print(f"\nAgent: {log['agent_name']}") - print(f"Task: {log['task']}") - print(f"Response: {log['response']}") - except Exception as e: - print(f"Error: {str(e)}") - - -async def test_communication_patterns(): - """Test and display agent communication patterns""" - print_separator() - print("COMMUNICATION PATTERNS TEST") - try: - sender = create_test_agent("Sender") - receiver = create_test_agent("Receiver") - task = "Process and relay this message" - - print("Testing One-to-One Communication:") - result = one_to_one(sender, receiver, task) - print(f"\nOne-to-One Communication Outputs: {result}") - for log in result["history"]: - print(f"\nAgent: {log['agent_name']}") - print(f"Task: {log['task']}") - print(f"Response: {log['response']}") - - print("\nTesting One-to-Three Communication:") - receivers = create_test_agents(3) - await one_to_three(sender, receivers, task) - - print("\nTesting Broadcast Communication:") - broadcast_receivers = create_test_agents(5) - await broadcast(sender, broadcast_receivers, task) - - except Exception as e: - print(f"Error: {str(e)}") - - -def test_mathematical_swarms(): - """Test and display mathematical swarm patterns""" - print_separator() - print("MATHEMATICAL SWARMS TEST") - try: - agents = create_test_agents(8) - base_tasks = ["Calculate", "Process", "Analyze"] - - # Test each mathematical swarm - for swarm_type, swarm_func in [ - ("Power Swarm", power_swarm), - ("Log Swarm", log_swarm), - ("Exponential Swarm", exponential_swarm), - ("Geometric Swarm", geometric_swarm), - ("Harmonic Swarm", harmonic_swarm), - ]: - print(f"\nTesting {swarm_type}:") - tasks = [f"{task} in {swarm_type}" for task in base_tasks] - print(f"Tasks: {tasks}") - swarm_func(agents, tasks.copy()) - - except Exception as e: - print(f"Error: {str(e)}") - - -def test_pattern_swarms(): - """Test and display pattern-based swarms""" - print_separator() - print("PATTERN-BASED SWARMS TEST") - try: - agents = create_test_agents(10) - task = "Process according to pattern" - - for swarm_type, swarm_func in [ - ("Staircase Swarm", staircase_swarm), - ("Sigmoid Swarm", sigmoid_swarm), - ("Sinusoidal Swarm", sinusoidal_swarm), - ]: - print(f"\nTesting {swarm_type}:") - print(f"Task: {task}") - swarm_func(agents, task) - - except Exception as e: - print(f"Error: {str(e)}") - - -def run_all_tests(): - """Run all swarm architecture tests""" - print( - "\n=== Starting Swarm Architectures Test Suite with Outputs ===" - ) - start_time = time.time() + """Test pyramid swarm hierarchical structure""" + agents = create_test_agents(6) + tasks = [ + "Top task", + "Middle task 1", + "Middle task 2", + "Bottom task 1", + "Bottom task 2", + "Bottom task 3", + ] + + result = pyramid_swarm(agents, tasks) + + assert isinstance(result, list) + assert len(result) > 0 + + + for log in result: + assert "role" in log + assert "content" in log + + +def test_power_swarm(): + """Test power swarm mathematical pattern""" + agents = create_test_agents(8) + tasks = ["Calculate in Power Swarm", "Process in Power Swarm", "Analyze in Power Swarm"] + + result = power_swarm(agents, tasks.copy()) + + assert isinstance(result, list) + assert len(result) > 0 + + + +def test_log_swarm(): + """Test log swarm mathematical pattern""" + agents = create_test_agents(8) + tasks = ["Calculate in Log Swarm", "Process in Log Swarm", "Analyze in Log Swarm"] + + result = log_swarm(agents, tasks.copy()) + + assert isinstance(result, list) + assert len(result) > 0 + + + +def test_exponential_swarm(): + """Test exponential swarm mathematical pattern""" + agents = create_test_agents(8) + tasks = ["Calculate in Exponential Swarm", "Process in Exponential Swarm", "Analyze in Exponential Swarm"] + + result = exponential_swarm(agents, tasks.copy()) + + assert isinstance(result, list) + assert len(result) > 0 + + + +def test_geometric_swarm(): + """Test geometric swarm mathematical pattern""" + agents = create_test_agents(8) + tasks = ["Calculate in Geometric Swarm", "Process in Geometric Swarm", "Analyze in Geometric Swarm"] + + result = geometric_swarm(agents, tasks.copy()) + + assert isinstance(result, list) + assert len(result) > 0 + + + +def test_harmonic_swarm(): + """Test harmonic swarm mathematical pattern""" + agents = create_test_agents(8) + tasks = ["Calculate in Harmonic Swarm", "Process in Harmonic Swarm", "Analyze in Harmonic Swarm"] + + result = harmonic_swarm(agents, tasks.copy()) + + assert isinstance(result, list) + assert len(result) > 0 + + + +def test_staircase_swarm(): + """Test staircase swarm pattern""" + agents = create_test_agents(10) + tasks = ["Process step 1", "Process step 2", "Process step 3", "Process step 4", "Process step 5"] + + result = staircase_swarm(agents, tasks) + + assert isinstance(result, list) + assert len(result) > 0 + + +def test_sigmoid_swarm(): + """Test sigmoid swarm pattern""" + agents = create_test_agents(10) + tasks = ["Sigmoid task 1", "Sigmoid task 2", "Sigmoid task 3", "Sigmoid task 4", "Sigmoid task 5"] + + result = sigmoid_swarm(agents, tasks) + + assert isinstance(result, list) + assert len(result) > 0 + + +def test_sinusoidal_swarm(): + """Test sinusoidal swarm pattern""" + agents = create_test_agents(10) + tasks = ["Wave task 1", "Wave task 2", "Wave task 3", "Wave task 4", "Wave task 5"] + + result = sinusoidal_swarm(agents, tasks) + + assert isinstance(result, list) + assert len(result) > 0 + + + +def test_one_to_one(): + """Test one-to-one communication pattern""" + sender = create_test_agent("Sender") + receiver = create_test_agent("Receiver") + task = "Process and relay this message" + + result = one_to_one(sender, receiver, task) + + assert isinstance(result, list) + assert len(result) > 0 + + + for log in result: + assert "role" in log + assert "content" in log + - try: - # Test basic swarm patterns - test_circular_swarm() - test_grid_swarm() - test_linear_swarm() - test_star_swarm() - test_mesh_swarm() - test_pyramid_swarm() +@pytest.mark.asyncio +async def test_one_to_three(): + """Test one-to-three communication pattern""" + sender = create_test_agent("Sender") + receivers = create_test_agents(3) + task = "Process and relay this message" - # Test mathematical and pattern swarms - test_mathematical_swarms() - test_pattern_swarms() + result = await one_to_three(sender, receivers, task) - # Test communication patterns - asyncio.run(test_communication_patterns()) + assert isinstance(result, list) + assert len(result) > 0 + - end_time = time.time() - duration = round(end_time - start_time, 2) - print("\n=== Test Suite Completed Successfully ===") - print(f"Time taken: {duration} seconds") - except Exception as e: - print("\n=== Test Suite Failed ===") - print(f"Error: {str(e)}") - raise +@pytest.mark.asyncio +async def test_broadcast(): + """Test broadcast communication pattern""" + sender = create_test_agent("Broadcaster") + receivers = create_test_agents(5) + task = "Broadcast this message" + result = await broadcast(sender, receivers, task) -if __name__ == "__main__": - run_all_tests() + assert isinstance(result, list) + assert len(result) > 0 +