Fixed bug: conversation.add(message=) → conversation.add(content=) in all swarm architectures.

Refactored test_spreadsheet.py and test_swarm_architectures.py to use pytest with real agent execution
pull/1168/head
Steve-Dusty 1 week ago
parent a9be38ea9c
commit d105482347

@ -52,12 +52,12 @@ def circular_swarm(
for agent in flat_agents:
conversation.add(
role="User",
message=task,
content=task,
)
response = agent.run(conversation.get_str())
conversation.add(
role=agent.agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)
@ -88,7 +88,7 @@ def grid_swarm(
conversation.add(
role="User",
message=tasks,
content=tasks,
)
grid_size = int(
@ -101,7 +101,7 @@ def grid_swarm(
response = agents[i * grid_size + j].run(task)
conversation.add(
role=agents[i * grid_size + j].agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)
@ -139,12 +139,12 @@ def linear_swarm(
task = tasks.pop(0)
conversation.add(
role="User",
message=task,
content=task,
)
response = agent.run(conversation.get_str())
conversation.add(
role=agent.agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)
@ -182,12 +182,12 @@ def star_swarm(
# Central agent processes the task
conversation.add(
role="User",
message=task,
content=task,
)
center_response = center_agent.run(conversation.get_str())
conversation.add(
role=center_agent.agent_name,
message=center_response,
content=center_response,
)
# Other agents process the same task
@ -195,7 +195,7 @@ def star_swarm(
response = agent.run(task)
conversation.add(
role=agent.agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)
@ -229,7 +229,7 @@ def mesh_swarm(
conversation = Conversation()
conversation.add(
role="User",
message=tasks,
content=tasks,
)
task_queue = tasks.copy()
@ -240,7 +240,7 @@ def mesh_swarm(
response = agent.run(task)
conversation.add(
role=agent.agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)
@ -285,7 +285,7 @@ def pyramid_swarm(
response = agents[agent_index].run(task)
conversation.add(
role=agents[agent_index].agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)
@ -312,7 +312,7 @@ def fibonacci_swarm(
conversation = Conversation()
conversation.add(
role="User",
message=tasks,
content=tasks,
)
fib = [1, 1]
while len(fib) < len(agents):
@ -324,7 +324,7 @@ def fibonacci_swarm(
response = agents[int(sum(fib[:i]) + j)].run(task)
conversation.add(
role=agents[int(sum(fib[:i]) + j)].agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)
@ -351,7 +351,7 @@ def prime_swarm(
conversation = Conversation()
conversation.add(
role="User",
message=tasks,
content=tasks,
)
primes = [
2,
@ -386,7 +386,7 @@ def prime_swarm(
output = agents[prime].run(task)
conversation.add(
role=agents[prime].agent_name,
message=output,
content=output,
)
return history_output_formatter(conversation, output_type)
@ -412,7 +412,7 @@ def power_swarm(
conversation = Conversation()
conversation.add(
role="User",
message=tasks,
content=tasks,
)
powers = [2**i for i in range(int(len(agents) ** 0.5))]
for power in powers:
@ -421,7 +421,7 @@ def power_swarm(
output = agents[power].run(task)
conversation.add(
role=agents[power].agent_name,
message=output,
content=output,
)
return history_output_formatter(conversation, output_type)
@ -447,7 +447,7 @@ def log_swarm(
conversation = Conversation()
conversation.add(
role="User",
message=tasks,
content=tasks,
)
for i in range(len(agents)):
if 2**i < len(agents) and tasks:
@ -455,7 +455,7 @@ def log_swarm(
output = agents[2**i].run(task)
conversation.add(
role=agents[2**i].agent_name,
message=output,
content=output,
)
return history_output_formatter(conversation, output_type)
@ -481,7 +481,7 @@ def exponential_swarm(
conversation = Conversation()
conversation.add(
role="User",
message=tasks,
content=tasks,
)
for i in range(len(agents)):
@ -492,7 +492,7 @@ def exponential_swarm(
conversation.add(
role=agents[index].agent_name,
message=output,
content=output,
)
return history_output_formatter(conversation, output_type)
@ -521,7 +521,7 @@ def geometric_swarm(
conversation = Conversation()
conversation.add(
role="User",
message=tasks,
content=tasks,
)
for i in range(len(agents)):
@ -531,7 +531,7 @@ def geometric_swarm(
response = agents[index].run(task)
conversation.add(
role=agents[index].agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)
@ -558,7 +558,7 @@ def harmonic_swarm(
conversation = Conversation()
conversation.add(
role="User",
message=tasks,
content=tasks,
)
for i in range(1, len(agents) + 1):
@ -568,7 +568,7 @@ def harmonic_swarm(
response = agents[index].run(task)
conversation.add(
role=agents[index].agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)
@ -595,7 +595,7 @@ def staircase_swarm(
conversation = Conversation()
conversation.add(
role="User",
message=tasks,
content=tasks,
)
step = len(agents) // 5
@ -606,7 +606,7 @@ def staircase_swarm(
response = agents[index].run(task)
conversation.add(
role=agents[index].agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)
@ -633,7 +633,7 @@ def sigmoid_swarm(
conversation = Conversation()
conversation.add(
role="User",
message=tasks,
content=tasks,
)
for i in range(len(agents)):
@ -643,7 +643,7 @@ def sigmoid_swarm(
response = agents[index].run(task)
conversation.add(
role=agents[index].agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)
@ -670,7 +670,7 @@ def sinusoidal_swarm(
conversation = Conversation()
conversation.add(
role="User",
message=tasks,
content=tasks,
)
for i in range(len(agents)):
@ -680,7 +680,7 @@ def sinusoidal_swarm(
response = agents[index].run(task)
conversation.add(
role=agents[index].agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)
@ -715,7 +715,7 @@ def one_to_one(
conversation = Conversation()
conversation.add(
role="User",
message=task,
content=task,
)
try:
@ -724,14 +724,14 @@ def one_to_one(
sender_response = sender.run(task)
conversation.add(
role=sender.agent_name,
message=sender_response,
content=sender_response,
)
# Receiver processes the result of the sender
receiver_response = receiver.run(sender_response)
conversation.add(
role=receiver.agent_name,
message=receiver_response,
content=receiver_response,
)
return history_output_formatter(conversation, output_type)
@ -769,7 +769,7 @@ async def broadcast(
conversation = Conversation()
conversation.add(
role="User",
message=task,
content=task,
)
if not sender or not agents or not task:
@ -781,7 +781,7 @@ async def broadcast(
conversation.add(
role=sender.agent_name,
message=broadcast_message,
content=broadcast_message,
)
# Then have all agents process it
@ -789,7 +789,7 @@ async def broadcast(
response = agent.run(conversation.get_str())
conversation.add(
role=agent.agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)
@ -832,7 +832,7 @@ async def one_to_three(
conversation.add(
role="User",
message=task,
content=task,
)
try:
@ -840,7 +840,7 @@ async def one_to_three(
sender_message = sender.run(conversation.get_str())
conversation.add(
role=sender.agent_name,
message=sender_message,
content=sender_message,
)
# Have each receiver process the message
@ -848,7 +848,7 @@ async def one_to_three(
response = agent.run(conversation.get_str())
conversation.add(
role=agent.agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)

@ -1,226 +1,563 @@
"""
SpreadSheetSwarm Test Suite
Tests for the SpreadSheetSwarm class, which manages multiple agents to execute tasks
concurrently with support for CSV-based agent loading and automatic metadata tracking.
The SpreadSheetSwarm processes tasks across multiple agents in parallel, tracks outputs,
and provides data export capabilities in both CSV and JSON formats.
"""
import os
import asyncio
from loguru import logger
import json
import csv
import pytest
from swarms.structs.agent import Agent
from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm
def create_test_csv() -> str:
"""Create a test CSV file with agent configurations."""
print("\nStarting creation of test CSV file")
try:
csv_content = """agent_name,description,system_prompt,task
test_agent_1,Test Agent 1,System prompt 1,Task 1
test_agent_2,Test Agent 2,System prompt 2,Task 2"""
file_path = "test_agents.csv"
with open(file_path, "w") as f:
f.write(csv_content)
print(f"Created CSV with content:\n{csv_content}")
print(f"CSV file created at: {file_path}")
return file_path
except Exception as e:
logger.error(f"Failed to create test CSV: {str(e)}")
raise
def create_test_agent(name: str) -> Agent:
"""Create a test agent with specified name."""
print(f"\nCreating test agent: {name}")
try:
agent = Agent(
agent_name=name,
system_prompt=f"Test prompt for {name}",
@pytest.fixture
def temp_workspace(tmp_path):
"""Create a temporary workspace directory for test isolation."""
workspace = tmp_path / "test_workspace"
workspace.mkdir()
return str(workspace)
@pytest.fixture
def sample_csv_file(tmp_path):
"""Create a sample CSV file with agent configurations."""
csv_path = tmp_path / "test_agents.csv"
csv_content = [
["agent_name", "description", "system_prompt", "task", "model_name"],
["agent_1", "First test agent", "You are a helpful assistant. Respond with exactly 'Task completed.'", "Say hello", "gpt-4o-mini"],
["agent_2", "Second test agent", "You are a code reviewer. Respond with exactly 'Review done.'", "Review this: print('hello')", "gpt-4o-mini"],
]
with open(csv_path, "w", newline="") as f:
writer = csv.writer(f)
writer.writerows(csv_content)
return str(csv_path)
def test_swarm_initialization_basic(temp_workspace):
"""Test basic swarm initialization with required parameters."""
agent = Agent(
agent_name="test_agent_1",
system_prompt="You are a helpful assistant",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
name="Test Swarm",
description="Test swarm description",
agents=[agent],
workspace_dir=temp_workspace,
)
assert swarm.name == "Test Swarm"
assert swarm.description == "Test swarm description"
assert len(swarm.agents) == 1
assert swarm.max_loops == 1
assert swarm.autosave is True
assert swarm.tasks_completed == 0
assert swarm.outputs == []
def test_swarm_initialization_multiple_agents(temp_workspace):
"""Test swarm initialization with multiple agents."""
agents = [
Agent(
agent_name="agent_1",
system_prompt="You are agent 1",
model_name="gpt-4o-mini",
max_loops=1,
),
Agent(
agent_name="agent_2",
system_prompt="You are agent 2",
model_name="gpt-4o-mini",
max_loops=1,
),
]
swarm = SpreadSheetSwarm(
name="Multi Agent Swarm",
agents=agents,
workspace_dir=temp_workspace,
)
assert len(swarm.agents) == 2
assert swarm.agents[0].agent_name == "agent_1"
assert swarm.agents[1].agent_name == "agent_2"
def test_swarm_initialization_custom_max_loops(temp_workspace):
"""Test initialization with custom max_loops setting."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
name="Custom Loop Swarm",
agents=[agent],
max_loops=3,
workspace_dir=temp_workspace,
)
assert swarm.max_loops == 3
def test_swarm_initialization_autosave_disabled(temp_workspace):
"""Test initialization with autosave disabled."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
autosave=False,
workspace_dir=temp_workspace,
)
assert swarm.autosave is False
def test_swarm_save_file_path_generation(temp_workspace):
"""Test that save file path is correctly generated with workspace_dir."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
workspace_dir=temp_workspace,
)
assert swarm.save_file_path is not None
assert "spreadsheet_swarm_run_id_" in swarm.save_file_path
assert swarm.save_file_path.startswith(temp_workspace)
assert swarm.save_file_path.endswith(".csv")
def test_swarm_initialization_no_agents_raises_error():
"""Test that initialization without agents raises ValueError."""
with pytest.raises(ValueError, match="No agents are provided"):
SpreadSheetSwarm(agents=None)
def test_swarm_initialization_no_max_loops_raises_error():
"""Test that initialization without max_loops raises ValueError."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
with pytest.raises(ValueError, match="No max loops are provided"):
SpreadSheetSwarm(agents=[agent], max_loops=None)
def test_track_output_single(temp_workspace):
"""Test tracking a single task output."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
workspace_dir=temp_workspace,
)
swarm._track_output("test_agent", "Test task", "Test result")
assert swarm.tasks_completed == 1
assert len(swarm.outputs) == 1
assert swarm.outputs[0]["agent_name"] == "test_agent"
assert swarm.outputs[0]["task"] == "Test task"
assert swarm.outputs[0]["result"] == "Test result"
assert "timestamp" in swarm.outputs[0]
def test_track_output_multiple(temp_workspace):
"""Test tracking multiple task outputs."""
agents = [
Agent(
agent_name=f"agent_{i}",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
autosave=True,
verbose=True,
)
print(f"Created agent: {name}")
return agent
except Exception as e:
logger.error(f"Failed to create agent {name}: {str(e)}")
raise
def test_swarm_initialization() -> None:
"""Test basic swarm initialization."""
print("\n[TEST] Starting swarm initialization test")
try:
print("Creating test agents...")
agents = [
create_test_agent("agent1"),
create_test_agent("agent2"),
]
print("Initializing swarm...")
swarm = SpreadSheetSwarm(
name="Test Swarm",
description="Test Description",
agents=agents,
max_loops=2,
)
print("Verifying swarm configuration...")
assert swarm.name == "Test Swarm"
assert swarm.description == "Test Description"
assert len(swarm.agents) == 2
assert swarm.max_loops == 2
print("✅ Swarm initialization test PASSED")
except Exception as e:
logger.error(f"❌ Swarm initialization test FAILED: {str(e)}")
raise
async def test_load_from_csv() -> None:
"""Test loading agent configurations from CSV."""
print("\n[TEST] Starting CSV loading test")
try:
csv_path = create_test_csv()
print("Initializing swarm with CSV...")
swarm = SpreadSheetSwarm(load_path=csv_path)
print("Loading configurations...")
await swarm._load_from_csv()
print("Verifying loaded configurations...")
assert len(swarm.agents) == 2
assert len(swarm.agent_configs) == 2
assert "test_agent_1" in swarm.agent_configs
assert "test_agent_2" in swarm.agent_configs
os.remove(csv_path)
print(f"Cleaned up test file: {csv_path}")
print("✅ CSV loading test PASSED")
except Exception as e:
logger.error(f"❌ CSV loading test FAILED: {str(e)}")
raise
async def test_run_tasks() -> None:
"""Test running tasks with multiple agents."""
print("\n[TEST] Starting task execution test")
try:
print("Setting up test swarm...")
agents = [
create_test_agent("agent1"),
create_test_agent("agent2"),
]
swarm = SpreadSheetSwarm(agents=agents, max_loops=1)
test_task = "Test task for all agents"
print(f"Running test task: {test_task}")
await swarm._run_tasks(test_task)
print("Verifying task execution...")
assert swarm.metadata.tasks_completed == 2
assert len(swarm.metadata.outputs) == 2
print("✅ Task execution test PASSED")
except Exception as e:
logger.error(f"❌ Task execution test FAILED: {str(e)}")
raise
def test_output_tracking() -> None:
"""Test tracking of task outputs."""
print("\n[TEST] Starting output tracking test")
try:
print("Creating test swarm...")
swarm = SpreadSheetSwarm(agents=[create_test_agent("agent1")])
print("Tracking test output...")
swarm._track_output("agent1", "Test task", "Test result")
print("Verifying output tracking...")
assert swarm.metadata.tasks_completed == 1
assert len(swarm.metadata.outputs) == 1
assert swarm.metadata.outputs[0].agent_name == "agent1"
print("✅ Output tracking test PASSED")
except Exception as e:
logger.error(f"❌ Output tracking test FAILED: {str(e)}")
raise
async def test_save_to_csv() -> None:
"""Test saving metadata to CSV."""
print("\n[TEST] Starting CSV saving test")
try:
print("Setting up test data...")
swarm = SpreadSheetSwarm(
agents=[create_test_agent("agent1")],
save_file_path="test_output.csv",
)
swarm._track_output("agent1", "Test task", "Test result")
print("Saving to CSV...")
await swarm._save_to_csv()
print("Verifying file creation...")
assert os.path.exists(swarm.save_file_path)
os.remove(swarm.save_file_path)
print("Cleaned up test file")
print("✅ CSV saving test PASSED")
except Exception as e:
logger.error(f"❌ CSV saving test FAILED: {str(e)}")
raise
def test_json_export() -> None:
"""Test JSON export functionality."""
print("\n[TEST] Starting JSON export test")
try:
print("Creating test data...")
swarm = SpreadSheetSwarm(agents=[create_test_agent("agent1")])
swarm._track_output("agent1", "Test task", "Test result")
print("Exporting to JSON...")
json_output = swarm.export_to_json()
print("Verifying JSON output...")
assert isinstance(json_output, str)
assert "run_id" in json_output
assert "tasks_completed" in json_output
print("✅ JSON export test PASSED")
except Exception as e:
logger.error(f"❌ JSON export test FAILED: {str(e)}")
raise
async def run_all_tests() -> None:
"""Run all test functions."""
print("\n" + "=" * 50)
print("Starting SpreadsheetSwarm Test Suite")
print("=" * 50 + "\n")
try:
# Run synchronous tests
print("Running synchronous tests...")
test_swarm_initialization()
test_output_tracking()
test_json_export()
# Run asynchronous tests
print("\nRunning asynchronous tests...")
await test_load_from_csv()
await test_run_tasks()
await test_save_to_csv()
print("\n🎉 All tests completed successfully!")
print("=" * 50)
except Exception as e:
logger.error(f"\n❌ Test suite failed: {str(e)}")
print("=" * 50)
raise
if __name__ == "__main__":
# Run all tests
asyncio.run(run_all_tests())
for i in range(1, 4)
]
swarm = SpreadSheetSwarm(
agents=agents,
workspace_dir=temp_workspace,
)
for i in range(1, 4):
swarm._track_output(f"agent_{i}", f"Task {i}", f"Result {i}")
assert swarm.tasks_completed == 3
assert len(swarm.outputs) == 3
for i, output in enumerate(swarm.outputs, 1):
assert output["agent_name"] == f"agent_{i}"
assert output["task"] == f"Task {i}"
assert output["result"] == f"Result {i}"
def test_track_output_increments_counter(temp_workspace):
"""Test that tasks_completed counter increments correctly."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
workspace_dir=temp_workspace,
)
initial_count = swarm.tasks_completed
swarm._track_output("test_agent", "Task 1", "Result 1")
assert swarm.tasks_completed == initial_count + 1
swarm._track_output("test_agent", "Task 2", "Result 2")
assert swarm.tasks_completed == initial_count + 2
def test_load_from_csv_basic(sample_csv_file, temp_workspace):
"""Test loading agents from a CSV file."""
# Initialize with empty agents list, will be populated from CSV
agent = Agent(
agent_name="placeholder",
system_prompt="placeholder",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
load_path=sample_csv_file,
workspace_dir=temp_workspace,
)
swarm._load_from_csv()
# Should have loaded 2 agents from CSV plus the initial placeholder
assert len(swarm.agents) == 3
assert len(swarm.agent_tasks) == 2
assert "agent_1" in swarm.agent_tasks
assert "agent_2" in swarm.agent_tasks
assert swarm.agent_tasks["agent_1"] == "Say hello"
assert swarm.agent_tasks["agent_2"] == "Review this: print('hello')"
def test_load_from_csv_creates_agents(sample_csv_file, temp_workspace):
"""Test that CSV loading creates proper Agent objects."""
agent = Agent(
agent_name="placeholder",
system_prompt="placeholder",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
load_path=sample_csv_file,
workspace_dir=temp_workspace,
)
swarm._load_from_csv()
# Verify the loaded agents are proper Agent instances
for agent in swarm.agents[1:]: # Skip placeholder
assert isinstance(agent, Agent)
assert hasattr(agent, "agent_name")
assert hasattr(agent, "system_prompt")
def test_load_from_nonexistent_csv(temp_workspace):
"""Test loading from non-existent CSV file."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
load_path="nonexistent_file.csv",
workspace_dir=temp_workspace,
)
# Error is caught and logged, not raised
swarm._load_from_csv()
# Should still have the original agent
assert len(swarm.agents) == 1
def test_save_to_csv_creates_file(temp_workspace):
"""Test that saving to CSV creates the output file."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
workspace_dir=temp_workspace,
)
swarm._track_output("test_agent", "Test task", "Test result")
swarm._save_to_csv()
assert os.path.exists(swarm.save_file_path)
def test_save_to_csv_headers(temp_workspace):
"""Test that CSV file includes proper headers."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
workspace_dir=temp_workspace,
)
swarm._track_output("test_agent", "Test task", "Test result")
swarm._save_to_csv()
with open(swarm.save_file_path, "r") as f:
reader = csv.reader(f)
headers = next(reader)
assert headers == ["Run ID", "Agent Name", "Task", "Result", "Timestamp"]
def test_save_to_csv_data(temp_workspace):
"""Test that CSV file includes the tracked output data."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
workspace_dir=temp_workspace,
)
swarm._track_output("test_agent", "Test task", "Test result")
swarm._save_to_csv()
with open(swarm.save_file_path, "r") as f:
reader = csv.DictReader(f)
rows = list(reader)
assert len(rows) == 1
assert rows[0]["Agent Name"] == "test_agent"
assert rows[0]["Task"] == "Test task"
assert rows[0]["Result"] == "Test result"
def test_save_to_csv_appends(temp_workspace):
"""Test that multiple saves append to the same CSV file.
Note: _save_to_csv() saves ALL outputs in swarm.outputs each time,
so calling it twice will result in duplicates. This tests the actual behavior.
"""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
workspace_dir=temp_workspace,
)
swarm._track_output("test_agent", "Task 1", "Result 1")
swarm._save_to_csv()
swarm._track_output("test_agent", "Task 2", "Result 2")
swarm._save_to_csv()
# After first save: Task 1 (1 row)
# After second save: Task 1 + Task 2 (2 more rows)
# Total: 3 rows (Task 1 appears twice, Task 2 appears once)
with open(swarm.save_file_path, "r") as f:
reader = csv.DictReader(f)
rows = list(reader)
assert len(rows) == 3
# Verify the data
assert rows[0]["Task"] == "Task 1"
assert rows[1]["Task"] == "Task 1"
assert rows[2]["Task"] == "Task 2"
def test_export_to_json_structure(temp_workspace):
"""Test that JSON export contains expected structure."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
name="JSON Test Swarm",
description="Testing JSON export",
agents=[agent],
workspace_dir=temp_workspace,
)
swarm._track_output("test_agent", "Test task", "Test result")
json_output = swarm.export_to_json()
data = json.loads(json_output)
assert "run_id" in data
assert "name" in data
assert "description" in data
assert "tasks_completed" in data
assert "number_of_agents" in data
assert "outputs" in data
assert data["name"] == "JSON Test Swarm"
assert data["description"] == "Testing JSON export"
assert data["tasks_completed"] == 1
assert data["number_of_agents"] == 1
def test_export_to_json_outputs(temp_workspace):
"""Test that JSON export includes all tracked outputs."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
workspace_dir=temp_workspace,
)
swarm._track_output("test_agent", "Task 1", "Result 1")
swarm._track_output("test_agent", "Task 2", "Result 2")
json_output = swarm.export_to_json()
data = json.loads(json_output)
assert len(data["outputs"]) == 2
assert data["outputs"][0]["agent_name"] == "test_agent"
assert data["outputs"][0]["task"] == "Task 1"
assert data["outputs"][1]["task"] == "Task 2"
def test_export_to_json_valid_format(temp_workspace):
"""Test that JSON export is valid JSON."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
workspace_dir=temp_workspace,
)
json_output = swarm.export_to_json()
data = json.loads(json_output)
assert isinstance(data, dict)
def test_export_empty_swarm_to_json(temp_workspace):
"""Test JSON export with no completed tasks."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
workspace_dir=temp_workspace,
)
json_output = swarm.export_to_json()
data = json.loads(json_output)
assert data["tasks_completed"] == 0
assert data["outputs"] == []
assert data["number_of_agents"] == 1
def test_reliability_check_passes(temp_workspace):
"""Test that reliability check passes with valid configuration."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
max_loops=1,
workspace_dir=temp_workspace,
)
assert swarm is not None
def test_reliability_check_verbose(temp_workspace):
"""Test verbose mode during initialization."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
verbose=True,
workspace_dir=temp_workspace,
)
assert swarm.verbose is True

@ -1,6 +1,4 @@
import asyncio
import time
from typing import List
import pytest
from swarms.structs.agent import Agent
from swarms.structs.swarming_architectures import (
@ -34,268 +32,251 @@ def create_test_agent(name: str) -> Agent:
)
def create_test_agents(num_agents: int) -> List[Agent]:
def create_test_agents(num_agents: int) -> list[Agent]:
"""Create specified number of test agents"""
return [
create_test_agent(f"Agent{i+1}") for i in range(num_agents)
]
return [create_test_agent(f"Agent{i+1}") for i in range(num_agents)]
def print_separator():
print("\n" + "=" * 50 + "\n")
def test_circular_swarm():
"""Test circular swarm outputs"""
agents = create_test_agents(3)
tasks = [
"Analyze data",
"Generate report",
"Summarize findings",
]
result = circular_swarm(agents, tasks)
def test_circular_swarm():
"""Test and display circular swarm outputs"""
print_separator()
print("CIRCULAR SWARM TEST")
try:
agents = create_test_agents(3)
tasks = [
"Analyze data",
"Generate report",
"Summarize findings",
]
print("Running circular swarm with:")
print(f"Tasks: {tasks}\n")
result = circular_swarm(agents, tasks)
print("Circular Swarm Outputs:")
for log in result["history"]:
print(f"\nAgent: {log['agent_name']}")
print(f"Task: {log['task']}")
print(f"Response: {log['response']}")
except Exception as e:
print(f"Error: {str(e)}")
assert isinstance(result, list)
assert len(result) > 0
for log in result:
assert "role" in log
assert "content" in log
def test_grid_swarm():
"""Test and display grid swarm outputs"""
print_separator()
print("GRID SWARM TEST")
try:
agents = create_test_agents(4) # 2x2 grid
tasks = ["Task A", "Task B", "Task C", "Task D"]
"""Test grid swarm with 2x2 grid"""
agents = create_test_agents(4)
tasks = ["Task A", "Task B", "Task C", "Task D"]
print("Running grid swarm with 2x2 grid")
print(f"Tasks: {tasks}\n")
result = grid_swarm(agents, tasks)
print(grid_swarm(agents, tasks))
print(
"Grid Swarm completed - each agent processed tasks in its grid position"
)
except Exception as e:
print(f"Error: {str(e)}")
assert isinstance(result, list)
assert len(result) > 0
def test_linear_swarm():
"""Test and display linear swarm outputs"""
print_separator()
print("LINEAR SWARM TEST")
try:
agents = create_test_agents(3)
tasks = ["Research task", "Write content", "Review output"]
print("Running linear swarm with:")
print(f"Tasks: {tasks}\n")
result = linear_swarm(agents, tasks)
print("Linear Swarm Outputs:")
for log in result["history"]:
print(f"\nAgent: {log['agent_name']}")
print(f"Task: {log['task']}")
print(f"Response: {log['response']}")
except Exception as e:
print(f"Error: {str(e)}")
"""Test linear swarm sequential processing"""
agents = create_test_agents(3)
tasks = ["Research task", "Write content", "Review output"]
result = linear_swarm(agents, tasks)
assert isinstance(result, list)
assert len(result) > 0
for log in result:
assert "role" in log
assert "content" in log
def test_star_swarm():
"""Test and display star swarm outputs"""
print_separator()
print("STAR SWARM TEST")
try:
agents = create_test_agents(4) # 1 center + 3 peripheral
tasks = ["Coordinate workflow", "Process data"]
print("Running star swarm with:")
print(f"Center agent: {agents[0].agent_name}")
print(
f"Peripheral agents: {[agent.agent_name for agent in agents[1:]]}"
)
print(f"Tasks: {tasks}\n")
result = star_swarm(agents, tasks)
print("Star Swarm Outputs:")
for log in result["history"]:
print(f"\nAgent: {log['agent_name']}")
print(f"Task: {log['task']}")
print(f"Response: {log['response']}")
except Exception as e:
print(f"Error: {str(e)}")
"""Test star swarm with central and peripheral agents"""
agents = create_test_agents(4)
tasks = ["Coordinate workflow", "Process data"]
result = star_swarm(agents, tasks)
assert isinstance(result, list)
assert len(result) > 0
for log in result:
assert "role" in log
assert "content" in log
def test_mesh_swarm():
"""Test and display mesh swarm outputs"""
print_separator()
print("MESH SWARM TEST")
try:
agents = create_test_agents(3)
tasks = [
"Analyze data",
"Process information",
"Generate insights",
]
print("Running mesh swarm with:")
print(f"Tasks: {tasks}\n")
result = mesh_swarm(agents, tasks)
print(f"Mesh Swarm Outputs: {result}")
for log in result["history"]:
print(f"\nAgent: {log['agent_name']}")
print(f"Task: {log['task']}")
print(f"Response: {log['response']}")
except Exception as e:
print(f"Error: {str(e)}")
"""Test mesh swarm interconnected processing"""
agents = create_test_agents(3)
tasks = [
"Analyze data",
"Process information",
"Generate insights",
]
result = mesh_swarm(agents, tasks)
assert isinstance(result, list)
assert len(result) > 0
for log in result:
assert "role" in log
assert "content" in log
def test_pyramid_swarm():
"""Test and display pyramid swarm outputs"""
print_separator()
print("PYRAMID SWARM TEST")
try:
agents = create_test_agents(6) # 1-2-3 pyramid
tasks = [
"Top task",
"Middle task 1",
"Middle task 2",
"Bottom task 1",
"Bottom task 2",
"Bottom task 3",
]
print("Running pyramid swarm with:")
print(f"Tasks: {tasks}\n")
result = pyramid_swarm(agents, tasks)
print(f"Pyramid Swarm Outputs: {result}")
for log in result["history"]:
print(f"\nAgent: {log['agent_name']}")
print(f"Task: {log['task']}")
print(f"Response: {log['response']}")
except Exception as e:
print(f"Error: {str(e)}")
async def test_communication_patterns():
"""Test and display agent communication patterns"""
print_separator()
print("COMMUNICATION PATTERNS TEST")
try:
sender = create_test_agent("Sender")
receiver = create_test_agent("Receiver")
task = "Process and relay this message"
print("Testing One-to-One Communication:")
result = one_to_one(sender, receiver, task)
print(f"\nOne-to-One Communication Outputs: {result}")
for log in result["history"]:
print(f"\nAgent: {log['agent_name']}")
print(f"Task: {log['task']}")
print(f"Response: {log['response']}")
print("\nTesting One-to-Three Communication:")
receivers = create_test_agents(3)
await one_to_three(sender, receivers, task)
print("\nTesting Broadcast Communication:")
broadcast_receivers = create_test_agents(5)
await broadcast(sender, broadcast_receivers, task)
except Exception as e:
print(f"Error: {str(e)}")
def test_mathematical_swarms():
"""Test and display mathematical swarm patterns"""
print_separator()
print("MATHEMATICAL SWARMS TEST")
try:
agents = create_test_agents(8)
base_tasks = ["Calculate", "Process", "Analyze"]
# Test each mathematical swarm
for swarm_type, swarm_func in [
("Power Swarm", power_swarm),
("Log Swarm", log_swarm),
("Exponential Swarm", exponential_swarm),
("Geometric Swarm", geometric_swarm),
("Harmonic Swarm", harmonic_swarm),
]:
print(f"\nTesting {swarm_type}:")
tasks = [f"{task} in {swarm_type}" for task in base_tasks]
print(f"Tasks: {tasks}")
swarm_func(agents, tasks.copy())
except Exception as e:
print(f"Error: {str(e)}")
def test_pattern_swarms():
"""Test and display pattern-based swarms"""
print_separator()
print("PATTERN-BASED SWARMS TEST")
try:
agents = create_test_agents(10)
task = "Process according to pattern"
for swarm_type, swarm_func in [
("Staircase Swarm", staircase_swarm),
("Sigmoid Swarm", sigmoid_swarm),
("Sinusoidal Swarm", sinusoidal_swarm),
]:
print(f"\nTesting {swarm_type}:")
print(f"Task: {task}")
swarm_func(agents, task)
except Exception as e:
print(f"Error: {str(e)}")
def run_all_tests():
"""Run all swarm architecture tests"""
print(
"\n=== Starting Swarm Architectures Test Suite with Outputs ==="
)
start_time = time.time()
"""Test pyramid swarm hierarchical structure"""
agents = create_test_agents(6)
tasks = [
"Top task",
"Middle task 1",
"Middle task 2",
"Bottom task 1",
"Bottom task 2",
"Bottom task 3",
]
result = pyramid_swarm(agents, tasks)
assert isinstance(result, list)
assert len(result) > 0
for log in result:
assert "role" in log
assert "content" in log
def test_power_swarm():
"""Test power swarm mathematical pattern"""
agents = create_test_agents(8)
tasks = ["Calculate in Power Swarm", "Process in Power Swarm", "Analyze in Power Swarm"]
result = power_swarm(agents, tasks.copy())
assert isinstance(result, list)
assert len(result) > 0
def test_log_swarm():
"""Test log swarm mathematical pattern"""
agents = create_test_agents(8)
tasks = ["Calculate in Log Swarm", "Process in Log Swarm", "Analyze in Log Swarm"]
result = log_swarm(agents, tasks.copy())
assert isinstance(result, list)
assert len(result) > 0
def test_exponential_swarm():
"""Test exponential swarm mathematical pattern"""
agents = create_test_agents(8)
tasks = ["Calculate in Exponential Swarm", "Process in Exponential Swarm", "Analyze in Exponential Swarm"]
result = exponential_swarm(agents, tasks.copy())
assert isinstance(result, list)
assert len(result) > 0
def test_geometric_swarm():
"""Test geometric swarm mathematical pattern"""
agents = create_test_agents(8)
tasks = ["Calculate in Geometric Swarm", "Process in Geometric Swarm", "Analyze in Geometric Swarm"]
result = geometric_swarm(agents, tasks.copy())
assert isinstance(result, list)
assert len(result) > 0
def test_harmonic_swarm():
"""Test harmonic swarm mathematical pattern"""
agents = create_test_agents(8)
tasks = ["Calculate in Harmonic Swarm", "Process in Harmonic Swarm", "Analyze in Harmonic Swarm"]
result = harmonic_swarm(agents, tasks.copy())
assert isinstance(result, list)
assert len(result) > 0
def test_staircase_swarm():
"""Test staircase swarm pattern"""
agents = create_test_agents(10)
tasks = ["Process step 1", "Process step 2", "Process step 3", "Process step 4", "Process step 5"]
result = staircase_swarm(agents, tasks)
assert isinstance(result, list)
assert len(result) > 0
def test_sigmoid_swarm():
"""Test sigmoid swarm pattern"""
agents = create_test_agents(10)
tasks = ["Sigmoid task 1", "Sigmoid task 2", "Sigmoid task 3", "Sigmoid task 4", "Sigmoid task 5"]
result = sigmoid_swarm(agents, tasks)
assert isinstance(result, list)
assert len(result) > 0
def test_sinusoidal_swarm():
"""Test sinusoidal swarm pattern"""
agents = create_test_agents(10)
tasks = ["Wave task 1", "Wave task 2", "Wave task 3", "Wave task 4", "Wave task 5"]
result = sinusoidal_swarm(agents, tasks)
assert isinstance(result, list)
assert len(result) > 0
def test_one_to_one():
"""Test one-to-one communication pattern"""
sender = create_test_agent("Sender")
receiver = create_test_agent("Receiver")
task = "Process and relay this message"
result = one_to_one(sender, receiver, task)
assert isinstance(result, list)
assert len(result) > 0
for log in result:
assert "role" in log
assert "content" in log
try:
# Test basic swarm patterns
test_circular_swarm()
test_grid_swarm()
test_linear_swarm()
test_star_swarm()
test_mesh_swarm()
test_pyramid_swarm()
@pytest.mark.asyncio
async def test_one_to_three():
"""Test one-to-three communication pattern"""
sender = create_test_agent("Sender")
receivers = create_test_agents(3)
task = "Process and relay this message"
# Test mathematical and pattern swarms
test_mathematical_swarms()
test_pattern_swarms()
result = await one_to_three(sender, receivers, task)
# Test communication patterns
asyncio.run(test_communication_patterns())
assert isinstance(result, list)
assert len(result) > 0
end_time = time.time()
duration = round(end_time - start_time, 2)
print("\n=== Test Suite Completed Successfully ===")
print(f"Time taken: {duration} seconds")
except Exception as e:
print("\n=== Test Suite Failed ===")
print(f"Error: {str(e)}")
raise
@pytest.mark.asyncio
async def test_broadcast():
"""Test broadcast communication pattern"""
sender = create_test_agent("Broadcaster")
receivers = create_test_agents(5)
task = "Broadcast this message"
result = await broadcast(sender, receivers, task)
if __name__ == "__main__":
run_all_tests()
assert isinstance(result, list)
assert len(result) > 0

Loading…
Cancel
Save