Merge pull request #1168 from Steve-Dusty/master

fix: correct Conversation.add() API calls and refactor swarm architecture tests + refactored swarms spreadsheet test
pull/1172/head
Kye Gomez 1 week ago committed by GitHub
commit a18717299f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -52,12 +52,12 @@ def circular_swarm(
for agent in flat_agents:
conversation.add(
role="User",
message=task,
content=task,
)
response = agent.run(conversation.get_str())
conversation.add(
role=agent.agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)
@ -88,7 +88,7 @@ def grid_swarm(
conversation.add(
role="User",
message=tasks,
content=tasks,
)
grid_size = int(
@ -101,7 +101,7 @@ def grid_swarm(
response = agents[i * grid_size + j].run(task)
conversation.add(
role=agents[i * grid_size + j].agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)
@ -139,12 +139,12 @@ def linear_swarm(
task = tasks.pop(0)
conversation.add(
role="User",
message=task,
content=task,
)
response = agent.run(conversation.get_str())
conversation.add(
role=agent.agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)
@ -182,12 +182,12 @@ def star_swarm(
# Central agent processes the task
conversation.add(
role="User",
message=task,
content=task,
)
center_response = center_agent.run(conversation.get_str())
conversation.add(
role=center_agent.agent_name,
message=center_response,
content=center_response,
)
# Other agents process the same task
@ -195,7 +195,7 @@ def star_swarm(
response = agent.run(task)
conversation.add(
role=agent.agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)
@ -229,7 +229,7 @@ def mesh_swarm(
conversation = Conversation()
conversation.add(
role="User",
message=tasks,
content=tasks,
)
task_queue = tasks.copy()
@ -240,7 +240,7 @@ def mesh_swarm(
response = agent.run(task)
conversation.add(
role=agent.agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)
@ -285,7 +285,7 @@ def pyramid_swarm(
response = agents[agent_index].run(task)
conversation.add(
role=agents[agent_index].agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)
@ -312,7 +312,7 @@ def fibonacci_swarm(
conversation = Conversation()
conversation.add(
role="User",
message=tasks,
content=tasks,
)
fib = [1, 1]
while len(fib) < len(agents):
@ -324,7 +324,7 @@ def fibonacci_swarm(
response = agents[int(sum(fib[:i]) + j)].run(task)
conversation.add(
role=agents[int(sum(fib[:i]) + j)].agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)
@ -351,7 +351,7 @@ def prime_swarm(
conversation = Conversation()
conversation.add(
role="User",
message=tasks,
content=tasks,
)
primes = [
2,
@ -386,7 +386,7 @@ def prime_swarm(
output = agents[prime].run(task)
conversation.add(
role=agents[prime].agent_name,
message=output,
content=output,
)
return history_output_formatter(conversation, output_type)
@ -412,7 +412,7 @@ def power_swarm(
conversation = Conversation()
conversation.add(
role="User",
message=tasks,
content=tasks,
)
powers = [2**i for i in range(int(len(agents) ** 0.5))]
for power in powers:
@ -421,7 +421,7 @@ def power_swarm(
output = agents[power].run(task)
conversation.add(
role=agents[power].agent_name,
message=output,
content=output,
)
return history_output_formatter(conversation, output_type)
@ -447,7 +447,7 @@ def log_swarm(
conversation = Conversation()
conversation.add(
role="User",
message=tasks,
content=tasks,
)
for i in range(len(agents)):
if 2**i < len(agents) and tasks:
@ -455,7 +455,7 @@ def log_swarm(
output = agents[2**i].run(task)
conversation.add(
role=agents[2**i].agent_name,
message=output,
content=output,
)
return history_output_formatter(conversation, output_type)
@ -481,7 +481,7 @@ def exponential_swarm(
conversation = Conversation()
conversation.add(
role="User",
message=tasks,
content=tasks,
)
for i in range(len(agents)):
@ -492,7 +492,7 @@ def exponential_swarm(
conversation.add(
role=agents[index].agent_name,
message=output,
content=output,
)
return history_output_formatter(conversation, output_type)
@ -521,7 +521,7 @@ def geometric_swarm(
conversation = Conversation()
conversation.add(
role="User",
message=tasks,
content=tasks,
)
for i in range(len(agents)):
@ -531,7 +531,7 @@ def geometric_swarm(
response = agents[index].run(task)
conversation.add(
role=agents[index].agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)
@ -558,7 +558,7 @@ def harmonic_swarm(
conversation = Conversation()
conversation.add(
role="User",
message=tasks,
content=tasks,
)
for i in range(1, len(agents) + 1):
@ -568,7 +568,7 @@ def harmonic_swarm(
response = agents[index].run(task)
conversation.add(
role=agents[index].agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)
@ -595,7 +595,7 @@ def staircase_swarm(
conversation = Conversation()
conversation.add(
role="User",
message=tasks,
content=tasks,
)
step = len(agents) // 5
@ -606,7 +606,7 @@ def staircase_swarm(
response = agents[index].run(task)
conversation.add(
role=agents[index].agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)
@ -633,7 +633,7 @@ def sigmoid_swarm(
conversation = Conversation()
conversation.add(
role="User",
message=tasks,
content=tasks,
)
for i in range(len(agents)):
@ -643,7 +643,7 @@ def sigmoid_swarm(
response = agents[index].run(task)
conversation.add(
role=agents[index].agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)
@ -670,7 +670,7 @@ def sinusoidal_swarm(
conversation = Conversation()
conversation.add(
role="User",
message=tasks,
content=tasks,
)
for i in range(len(agents)):
@ -680,7 +680,7 @@ def sinusoidal_swarm(
response = agents[index].run(task)
conversation.add(
role=agents[index].agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)
@ -715,7 +715,7 @@ def one_to_one(
conversation = Conversation()
conversation.add(
role="User",
message=task,
content=task,
)
try:
@ -724,14 +724,14 @@ def one_to_one(
sender_response = sender.run(task)
conversation.add(
role=sender.agent_name,
message=sender_response,
content=sender_response,
)
# Receiver processes the result of the sender
receiver_response = receiver.run(sender_response)
conversation.add(
role=receiver.agent_name,
message=receiver_response,
content=receiver_response,
)
return history_output_formatter(conversation, output_type)
@ -769,7 +769,7 @@ async def broadcast(
conversation = Conversation()
conversation.add(
role="User",
message=task,
content=task,
)
if not sender or not agents or not task:
@ -781,7 +781,7 @@ async def broadcast(
conversation.add(
role=sender.agent_name,
message=broadcast_message,
content=broadcast_message,
)
# Then have all agents process it
@ -789,7 +789,7 @@ async def broadcast(
response = agent.run(conversation.get_str())
conversation.add(
role=agent.agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)
@ -832,7 +832,7 @@ async def one_to_three(
conversation.add(
role="User",
message=task,
content=task,
)
try:
@ -840,7 +840,7 @@ async def one_to_three(
sender_message = sender.run(conversation.get_str())
conversation.add(
role=sender.agent_name,
message=sender_message,
content=sender_message,
)
# Have each receiver process the message
@ -848,7 +848,7 @@ async def one_to_three(
response = agent.run(conversation.get_str())
conversation.add(
role=agent.agent_name,
message=response,
content=response,
)
return history_output_formatter(conversation, output_type)

@ -1,226 +1,553 @@
import os
import asyncio
from loguru import logger
import json
import csv
import pytest
from swarms.structs.agent import Agent
from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm
def create_test_csv() -> str:
"""Create a test CSV file with agent configurations."""
print("\nStarting creation of test CSV file")
try:
csv_content = """agent_name,description,system_prompt,task
test_agent_1,Test Agent 1,System prompt 1,Task 1
test_agent_2,Test Agent 2,System prompt 2,Task 2"""
file_path = "test_agents.csv"
with open(file_path, "w") as f:
f.write(csv_content)
print(f"Created CSV with content:\n{csv_content}")
print(f"CSV file created at: {file_path}")
return file_path
except Exception as e:
logger.error(f"Failed to create test CSV: {str(e)}")
raise
def create_test_agent(name: str) -> Agent:
"""Create a test agent with specified name."""
print(f"\nCreating test agent: {name}")
try:
agent = Agent(
agent_name=name,
system_prompt=f"Test prompt for {name}",
@pytest.fixture
def temp_workspace(tmp_path):
"""Create a temporary workspace directory for test isolation."""
workspace = tmp_path / "test_workspace"
workspace.mkdir()
return str(workspace)
@pytest.fixture
def sample_csv_file(tmp_path):
"""Create a sample CSV file with agent configurations."""
csv_path = tmp_path / "test_agents.csv"
csv_content = [
["agent_name", "description", "system_prompt", "task", "model_name"],
["agent_1", "First test agent", "You are a helpful assistant. Respond with exactly 'Task completed.'", "Say hello", "gpt-4o-mini"],
["agent_2", "Second test agent", "You are a code reviewer. Respond with exactly 'Review done.'", "Review this: print('hello')", "gpt-4o-mini"],
]
with open(csv_path, "w", newline="") as f:
writer = csv.writer(f)
writer.writerows(csv_content)
return str(csv_path)
def test_swarm_initialization_basic(temp_workspace):
"""Test basic swarm initialization with required parameters."""
agent = Agent(
agent_name="test_agent_1",
system_prompt="You are a helpful assistant",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
name="Test Swarm",
description="Test swarm description",
agents=[agent],
workspace_dir=temp_workspace,
)
assert swarm.name == "Test Swarm"
assert swarm.description == "Test swarm description"
assert len(swarm.agents) == 1
assert swarm.max_loops == 1
assert swarm.autosave is True
assert swarm.tasks_completed == 0
assert swarm.outputs == []
def test_swarm_initialization_multiple_agents(temp_workspace):
"""Test swarm initialization with multiple agents."""
agents = [
Agent(
agent_name="agent_1",
system_prompt="You are agent 1",
model_name="gpt-4o-mini",
max_loops=1,
),
Agent(
agent_name="agent_2",
system_prompt="You are agent 2",
model_name="gpt-4o-mini",
max_loops=1,
),
]
swarm = SpreadSheetSwarm(
name="Multi Agent Swarm",
agents=agents,
workspace_dir=temp_workspace,
)
assert len(swarm.agents) == 2
assert swarm.agents[0].agent_name == "agent_1"
assert swarm.agents[1].agent_name == "agent_2"
def test_swarm_initialization_custom_max_loops(temp_workspace):
"""Test initialization with custom max_loops setting."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
name="Custom Loop Swarm",
agents=[agent],
max_loops=3,
workspace_dir=temp_workspace,
)
assert swarm.max_loops == 3
def test_swarm_initialization_autosave_disabled(temp_workspace):
"""Test initialization with autosave disabled."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
autosave=False,
workspace_dir=temp_workspace,
)
assert swarm.autosave is False
def test_swarm_save_file_path_generation(temp_workspace):
"""Test that save file path is correctly generated with workspace_dir."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
workspace_dir=temp_workspace,
)
assert swarm.save_file_path is not None
assert "spreadsheet_swarm_run_id_" in swarm.save_file_path
assert swarm.save_file_path.startswith(temp_workspace)
assert swarm.save_file_path.endswith(".csv")
def test_swarm_initialization_no_agents_raises_error():
"""Test that initialization without agents raises ValueError."""
with pytest.raises(ValueError, match="No agents are provided"):
SpreadSheetSwarm(agents=None)
def test_swarm_initialization_no_max_loops_raises_error():
"""Test that initialization without max_loops raises ValueError."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
with pytest.raises(ValueError, match="No max loops are provided"):
SpreadSheetSwarm(agents=[agent], max_loops=None)
def test_track_output_single(temp_workspace):
"""Test tracking a single task output."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
workspace_dir=temp_workspace,
)
swarm._track_output("test_agent", "Test task", "Test result")
assert swarm.tasks_completed == 1
assert len(swarm.outputs) == 1
assert swarm.outputs[0]["agent_name"] == "test_agent"
assert swarm.outputs[0]["task"] == "Test task"
assert swarm.outputs[0]["result"] == "Test result"
assert "timestamp" in swarm.outputs[0]
def test_track_output_multiple(temp_workspace):
"""Test tracking multiple task outputs."""
agents = [
Agent(
agent_name=f"agent_{i}",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
autosave=True,
verbose=True,
)
print(f"Created agent: {name}")
return agent
except Exception as e:
logger.error(f"Failed to create agent {name}: {str(e)}")
raise
def test_swarm_initialization() -> None:
"""Test basic swarm initialization."""
print("\n[TEST] Starting swarm initialization test")
try:
print("Creating test agents...")
agents = [
create_test_agent("agent1"),
create_test_agent("agent2"),
]
print("Initializing swarm...")
swarm = SpreadSheetSwarm(
name="Test Swarm",
description="Test Description",
agents=agents,
max_loops=2,
)
print("Verifying swarm configuration...")
assert swarm.name == "Test Swarm"
assert swarm.description == "Test Description"
assert len(swarm.agents) == 2
assert swarm.max_loops == 2
print("✅ Swarm initialization test PASSED")
except Exception as e:
logger.error(f"❌ Swarm initialization test FAILED: {str(e)}")
raise
async def test_load_from_csv() -> None:
"""Test loading agent configurations from CSV."""
print("\n[TEST] Starting CSV loading test")
try:
csv_path = create_test_csv()
print("Initializing swarm with CSV...")
swarm = SpreadSheetSwarm(load_path=csv_path)
print("Loading configurations...")
await swarm._load_from_csv()
print("Verifying loaded configurations...")
assert len(swarm.agents) == 2
assert len(swarm.agent_configs) == 2
assert "test_agent_1" in swarm.agent_configs
assert "test_agent_2" in swarm.agent_configs
os.remove(csv_path)
print(f"Cleaned up test file: {csv_path}")
print("✅ CSV loading test PASSED")
except Exception as e:
logger.error(f"❌ CSV loading test FAILED: {str(e)}")
raise
async def test_run_tasks() -> None:
"""Test running tasks with multiple agents."""
print("\n[TEST] Starting task execution test")
try:
print("Setting up test swarm...")
agents = [
create_test_agent("agent1"),
create_test_agent("agent2"),
]
swarm = SpreadSheetSwarm(agents=agents, max_loops=1)
test_task = "Test task for all agents"
print(f"Running test task: {test_task}")
await swarm._run_tasks(test_task)
print("Verifying task execution...")
assert swarm.metadata.tasks_completed == 2
assert len(swarm.metadata.outputs) == 2
print("✅ Task execution test PASSED")
except Exception as e:
logger.error(f"❌ Task execution test FAILED: {str(e)}")
raise
def test_output_tracking() -> None:
"""Test tracking of task outputs."""
print("\n[TEST] Starting output tracking test")
try:
print("Creating test swarm...")
swarm = SpreadSheetSwarm(agents=[create_test_agent("agent1")])
print("Tracking test output...")
swarm._track_output("agent1", "Test task", "Test result")
print("Verifying output tracking...")
assert swarm.metadata.tasks_completed == 1
assert len(swarm.metadata.outputs) == 1
assert swarm.metadata.outputs[0].agent_name == "agent1"
print("✅ Output tracking test PASSED")
except Exception as e:
logger.error(f"❌ Output tracking test FAILED: {str(e)}")
raise
async def test_save_to_csv() -> None:
"""Test saving metadata to CSV."""
print("\n[TEST] Starting CSV saving test")
try:
print("Setting up test data...")
swarm = SpreadSheetSwarm(
agents=[create_test_agent("agent1")],
save_file_path="test_output.csv",
)
swarm._track_output("agent1", "Test task", "Test result")
print("Saving to CSV...")
await swarm._save_to_csv()
print("Verifying file creation...")
assert os.path.exists(swarm.save_file_path)
os.remove(swarm.save_file_path)
print("Cleaned up test file")
print("✅ CSV saving test PASSED")
except Exception as e:
logger.error(f"❌ CSV saving test FAILED: {str(e)}")
raise
def test_json_export() -> None:
"""Test JSON export functionality."""
print("\n[TEST] Starting JSON export test")
try:
print("Creating test data...")
swarm = SpreadSheetSwarm(agents=[create_test_agent("agent1")])
swarm._track_output("agent1", "Test task", "Test result")
print("Exporting to JSON...")
json_output = swarm.export_to_json()
print("Verifying JSON output...")
assert isinstance(json_output, str)
assert "run_id" in json_output
assert "tasks_completed" in json_output
print("✅ JSON export test PASSED")
except Exception as e:
logger.error(f"❌ JSON export test FAILED: {str(e)}")
raise
async def run_all_tests() -> None:
"""Run all test functions."""
print("\n" + "=" * 50)
print("Starting SpreadsheetSwarm Test Suite")
print("=" * 50 + "\n")
try:
# Run synchronous tests
print("Running synchronous tests...")
test_swarm_initialization()
test_output_tracking()
test_json_export()
# Run asynchronous tests
print("\nRunning asynchronous tests...")
await test_load_from_csv()
await test_run_tasks()
await test_save_to_csv()
print("\n🎉 All tests completed successfully!")
print("=" * 50)
except Exception as e:
logger.error(f"\n❌ Test suite failed: {str(e)}")
print("=" * 50)
raise
if __name__ == "__main__":
# Run all tests
asyncio.run(run_all_tests())
for i in range(1, 4)
]
swarm = SpreadSheetSwarm(
agents=agents,
workspace_dir=temp_workspace,
)
for i in range(1, 4):
swarm._track_output(f"agent_{i}", f"Task {i}", f"Result {i}")
assert swarm.tasks_completed == 3
assert len(swarm.outputs) == 3
for i, output in enumerate(swarm.outputs, 1):
assert output["agent_name"] == f"agent_{i}"
assert output["task"] == f"Task {i}"
assert output["result"] == f"Result {i}"
def test_track_output_increments_counter(temp_workspace):
"""Test that tasks_completed counter increments correctly."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
workspace_dir=temp_workspace,
)
initial_count = swarm.tasks_completed
swarm._track_output("test_agent", "Task 1", "Result 1")
assert swarm.tasks_completed == initial_count + 1
swarm._track_output("test_agent", "Task 2", "Result 2")
assert swarm.tasks_completed == initial_count + 2
def test_load_from_csv_basic(sample_csv_file, temp_workspace):
"""Test loading agents from a CSV file."""
# Initialize with empty agents list, will be populated from CSV
agent = Agent(
agent_name="placeholder",
system_prompt="placeholder",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
load_path=sample_csv_file,
workspace_dir=temp_workspace,
)
swarm._load_from_csv()
# Should have loaded 2 agents from CSV plus the initial placeholder
assert len(swarm.agents) == 3
assert len(swarm.agent_tasks) == 2
assert "agent_1" in swarm.agent_tasks
assert "agent_2" in swarm.agent_tasks
assert swarm.agent_tasks["agent_1"] == "Say hello"
assert swarm.agent_tasks["agent_2"] == "Review this: print('hello')"
def test_load_from_csv_creates_agents(sample_csv_file, temp_workspace):
"""Test that CSV loading creates proper Agent objects."""
agent = Agent(
agent_name="placeholder",
system_prompt="placeholder",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
load_path=sample_csv_file,
workspace_dir=temp_workspace,
)
swarm._load_from_csv()
# Verify the loaded agents are proper Agent instances
for agent in swarm.agents[1:]: # Skip placeholder
assert isinstance(agent, Agent)
assert hasattr(agent, "agent_name")
assert hasattr(agent, "system_prompt")
def test_load_from_nonexistent_csv(temp_workspace):
"""Test loading from non-existent CSV file."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
load_path="nonexistent_file.csv",
workspace_dir=temp_workspace,
)
# Error is caught and logged, not raised
swarm._load_from_csv()
# Should still have the original agent
assert len(swarm.agents) == 1
def test_save_to_csv_creates_file(temp_workspace):
"""Test that saving to CSV creates the output file."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
workspace_dir=temp_workspace,
)
swarm._track_output("test_agent", "Test task", "Test result")
swarm._save_to_csv()
assert os.path.exists(swarm.save_file_path)
def test_save_to_csv_headers(temp_workspace):
"""Test that CSV file includes proper headers."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
workspace_dir=temp_workspace,
)
swarm._track_output("test_agent", "Test task", "Test result")
swarm._save_to_csv()
with open(swarm.save_file_path, "r") as f:
reader = csv.reader(f)
headers = next(reader)
assert headers == ["Run ID", "Agent Name", "Task", "Result", "Timestamp"]
def test_save_to_csv_data(temp_workspace):
"""Test that CSV file includes the tracked output data."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
workspace_dir=temp_workspace,
)
swarm._track_output("test_agent", "Test task", "Test result")
swarm._save_to_csv()
with open(swarm.save_file_path, "r") as f:
reader = csv.DictReader(f)
rows = list(reader)
assert len(rows) == 1
assert rows[0]["Agent Name"] == "test_agent"
assert rows[0]["Task"] == "Test task"
assert rows[0]["Result"] == "Test result"
def test_save_to_csv_appends(temp_workspace):
"""Test that multiple saves append to the same CSV file.
Note: _save_to_csv() saves ALL outputs in swarm.outputs each time,
so calling it twice will result in duplicates. This tests the actual behavior.
"""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
workspace_dir=temp_workspace,
)
swarm._track_output("test_agent", "Task 1", "Result 1")
swarm._save_to_csv()
swarm._track_output("test_agent", "Task 2", "Result 2")
swarm._save_to_csv()
# After first save: Task 1 (1 row)
# After second save: Task 1 + Task 2 (2 more rows)
# Total: 3 rows (Task 1 appears twice, Task 2 appears once)
with open(swarm.save_file_path, "r") as f:
reader = csv.DictReader(f)
rows = list(reader)
assert len(rows) == 3
# Verify the data
assert rows[0]["Task"] == "Task 1"
assert rows[1]["Task"] == "Task 1"
assert rows[2]["Task"] == "Task 2"
def test_export_to_json_structure(temp_workspace):
"""Test that JSON export contains expected structure."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
name="JSON Test Swarm",
description="Testing JSON export",
agents=[agent],
workspace_dir=temp_workspace,
)
swarm._track_output("test_agent", "Test task", "Test result")
json_output = swarm.export_to_json()
data = json.loads(json_output)
assert "run_id" in data
assert "name" in data
assert "description" in data
assert "tasks_completed" in data
assert "number_of_agents" in data
assert "outputs" in data
assert data["name"] == "JSON Test Swarm"
assert data["description"] == "Testing JSON export"
assert data["tasks_completed"] == 1
assert data["number_of_agents"] == 1
def test_export_to_json_outputs(temp_workspace):
"""Test that JSON export includes all tracked outputs."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
workspace_dir=temp_workspace,
)
swarm._track_output("test_agent", "Task 1", "Result 1")
swarm._track_output("test_agent", "Task 2", "Result 2")
json_output = swarm.export_to_json()
data = json.loads(json_output)
assert len(data["outputs"]) == 2
assert data["outputs"][0]["agent_name"] == "test_agent"
assert data["outputs"][0]["task"] == "Task 1"
assert data["outputs"][1]["task"] == "Task 2"
def test_export_to_json_valid_format(temp_workspace):
"""Test that JSON export is valid JSON."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
workspace_dir=temp_workspace,
)
json_output = swarm.export_to_json()
data = json.loads(json_output)
assert isinstance(data, dict)
def test_export_empty_swarm_to_json(temp_workspace):
"""Test JSON export with no completed tasks."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
workspace_dir=temp_workspace,
)
json_output = swarm.export_to_json()
data = json.loads(json_output)
assert data["tasks_completed"] == 0
assert data["outputs"] == []
assert data["number_of_agents"] == 1
def test_reliability_check_passes(temp_workspace):
"""Test that reliability check passes with valid configuration."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
max_loops=1,
workspace_dir=temp_workspace,
)
assert swarm is not None
def test_reliability_check_verbose(temp_workspace):
"""Test verbose mode during initialization."""
agent = Agent(
agent_name="test_agent",
system_prompt="Test prompt",
model_name="gpt-4o-mini",
max_loops=1,
)
swarm = SpreadSheetSwarm(
agents=[agent],
verbose=True,
workspace_dir=temp_workspace,
)
assert swarm.verbose is True

@ -1,6 +1,4 @@
import asyncio
import time
from typing import List
import pytest
from swarms.structs.agent import Agent
from swarms.structs.swarming_architectures import (
@ -34,268 +32,251 @@ def create_test_agent(name: str) -> Agent:
)
def create_test_agents(num_agents: int) -> List[Agent]:
def create_test_agents(num_agents: int) -> list[Agent]:
"""Create specified number of test agents"""
return [
create_test_agent(f"Agent{i+1}") for i in range(num_agents)
]
return [create_test_agent(f"Agent{i+1}") for i in range(num_agents)]
def print_separator():
print("\n" + "=" * 50 + "\n")
def test_circular_swarm():
"""Test circular swarm outputs"""
agents = create_test_agents(3)
tasks = [
"Analyze data",
"Generate report",
"Summarize findings",
]
result = circular_swarm(agents, tasks)
def test_circular_swarm():
"""Test and display circular swarm outputs"""
print_separator()
print("CIRCULAR SWARM TEST")
try:
agents = create_test_agents(3)
tasks = [
"Analyze data",
"Generate report",
"Summarize findings",
]
print("Running circular swarm with:")
print(f"Tasks: {tasks}\n")
result = circular_swarm(agents, tasks)
print("Circular Swarm Outputs:")
for log in result["history"]:
print(f"\nAgent: {log['agent_name']}")
print(f"Task: {log['task']}")
print(f"Response: {log['response']}")
except Exception as e:
print(f"Error: {str(e)}")
assert isinstance(result, list)
assert len(result) > 0
for log in result:
assert "role" in log
assert "content" in log
def test_grid_swarm():
"""Test and display grid swarm outputs"""
print_separator()
print("GRID SWARM TEST")
try:
agents = create_test_agents(4) # 2x2 grid
tasks = ["Task A", "Task B", "Task C", "Task D"]
"""Test grid swarm with 2x2 grid"""
agents = create_test_agents(4)
tasks = ["Task A", "Task B", "Task C", "Task D"]
print("Running grid swarm with 2x2 grid")
print(f"Tasks: {tasks}\n")
result = grid_swarm(agents, tasks)
print(grid_swarm(agents, tasks))
print(
"Grid Swarm completed - each agent processed tasks in its grid position"
)
except Exception as e:
print(f"Error: {str(e)}")
assert isinstance(result, list)
assert len(result) > 0
def test_linear_swarm():
"""Test and display linear swarm outputs"""
print_separator()
print("LINEAR SWARM TEST")
try:
agents = create_test_agents(3)
tasks = ["Research task", "Write content", "Review output"]
print("Running linear swarm with:")
print(f"Tasks: {tasks}\n")
result = linear_swarm(agents, tasks)
print("Linear Swarm Outputs:")
for log in result["history"]:
print(f"\nAgent: {log['agent_name']}")
print(f"Task: {log['task']}")
print(f"Response: {log['response']}")
except Exception as e:
print(f"Error: {str(e)}")
"""Test linear swarm sequential processing"""
agents = create_test_agents(3)
tasks = ["Research task", "Write content", "Review output"]
result = linear_swarm(agents, tasks)
assert isinstance(result, list)
assert len(result) > 0
for log in result:
assert "role" in log
assert "content" in log
def test_star_swarm():
"""Test and display star swarm outputs"""
print_separator()
print("STAR SWARM TEST")
try:
agents = create_test_agents(4) # 1 center + 3 peripheral
tasks = ["Coordinate workflow", "Process data"]
print("Running star swarm with:")
print(f"Center agent: {agents[0].agent_name}")
print(
f"Peripheral agents: {[agent.agent_name for agent in agents[1:]]}"
)
print(f"Tasks: {tasks}\n")
result = star_swarm(agents, tasks)
print("Star Swarm Outputs:")
for log in result["history"]:
print(f"\nAgent: {log['agent_name']}")
print(f"Task: {log['task']}")
print(f"Response: {log['response']}")
except Exception as e:
print(f"Error: {str(e)}")
"""Test star swarm with central and peripheral agents"""
agents = create_test_agents(4)
tasks = ["Coordinate workflow", "Process data"]
result = star_swarm(agents, tasks)
assert isinstance(result, list)
assert len(result) > 0
for log in result:
assert "role" in log
assert "content" in log
def test_mesh_swarm():
"""Test and display mesh swarm outputs"""
print_separator()
print("MESH SWARM TEST")
try:
agents = create_test_agents(3)
tasks = [
"Analyze data",
"Process information",
"Generate insights",
]
print("Running mesh swarm with:")
print(f"Tasks: {tasks}\n")
result = mesh_swarm(agents, tasks)
print(f"Mesh Swarm Outputs: {result}")
for log in result["history"]:
print(f"\nAgent: {log['agent_name']}")
print(f"Task: {log['task']}")
print(f"Response: {log['response']}")
except Exception as e:
print(f"Error: {str(e)}")
"""Test mesh swarm interconnected processing"""
agents = create_test_agents(3)
tasks = [
"Analyze data",
"Process information",
"Generate insights",
]
result = mesh_swarm(agents, tasks)
assert isinstance(result, list)
assert len(result) > 0
for log in result:
assert "role" in log
assert "content" in log
def test_pyramid_swarm():
"""Test and display pyramid swarm outputs"""
print_separator()
print("PYRAMID SWARM TEST")
try:
agents = create_test_agents(6) # 1-2-3 pyramid
tasks = [
"Top task",
"Middle task 1",
"Middle task 2",
"Bottom task 1",
"Bottom task 2",
"Bottom task 3",
]
print("Running pyramid swarm with:")
print(f"Tasks: {tasks}\n")
result = pyramid_swarm(agents, tasks)
print(f"Pyramid Swarm Outputs: {result}")
for log in result["history"]:
print(f"\nAgent: {log['agent_name']}")
print(f"Task: {log['task']}")
print(f"Response: {log['response']}")
except Exception as e:
print(f"Error: {str(e)}")
async def test_communication_patterns():
"""Test and display agent communication patterns"""
print_separator()
print("COMMUNICATION PATTERNS TEST")
try:
sender = create_test_agent("Sender")
receiver = create_test_agent("Receiver")
task = "Process and relay this message"
print("Testing One-to-One Communication:")
result = one_to_one(sender, receiver, task)
print(f"\nOne-to-One Communication Outputs: {result}")
for log in result["history"]:
print(f"\nAgent: {log['agent_name']}")
print(f"Task: {log['task']}")
print(f"Response: {log['response']}")
print("\nTesting One-to-Three Communication:")
receivers = create_test_agents(3)
await one_to_three(sender, receivers, task)
print("\nTesting Broadcast Communication:")
broadcast_receivers = create_test_agents(5)
await broadcast(sender, broadcast_receivers, task)
except Exception as e:
print(f"Error: {str(e)}")
def test_mathematical_swarms():
"""Test and display mathematical swarm patterns"""
print_separator()
print("MATHEMATICAL SWARMS TEST")
try:
agents = create_test_agents(8)
base_tasks = ["Calculate", "Process", "Analyze"]
# Test each mathematical swarm
for swarm_type, swarm_func in [
("Power Swarm", power_swarm),
("Log Swarm", log_swarm),
("Exponential Swarm", exponential_swarm),
("Geometric Swarm", geometric_swarm),
("Harmonic Swarm", harmonic_swarm),
]:
print(f"\nTesting {swarm_type}:")
tasks = [f"{task} in {swarm_type}" for task in base_tasks]
print(f"Tasks: {tasks}")
swarm_func(agents, tasks.copy())
except Exception as e:
print(f"Error: {str(e)}")
def test_pattern_swarms():
"""Test and display pattern-based swarms"""
print_separator()
print("PATTERN-BASED SWARMS TEST")
try:
agents = create_test_agents(10)
task = "Process according to pattern"
for swarm_type, swarm_func in [
("Staircase Swarm", staircase_swarm),
("Sigmoid Swarm", sigmoid_swarm),
("Sinusoidal Swarm", sinusoidal_swarm),
]:
print(f"\nTesting {swarm_type}:")
print(f"Task: {task}")
swarm_func(agents, task)
except Exception as e:
print(f"Error: {str(e)}")
def run_all_tests():
"""Run all swarm architecture tests"""
print(
"\n=== Starting Swarm Architectures Test Suite with Outputs ==="
)
start_time = time.time()
"""Test pyramid swarm hierarchical structure"""
agents = create_test_agents(6)
tasks = [
"Top task",
"Middle task 1",
"Middle task 2",
"Bottom task 1",
"Bottom task 2",
"Bottom task 3",
]
result = pyramid_swarm(agents, tasks)
assert isinstance(result, list)
assert len(result) > 0
for log in result:
assert "role" in log
assert "content" in log
def test_power_swarm():
"""Test power swarm mathematical pattern"""
agents = create_test_agents(8)
tasks = ["Calculate in Power Swarm", "Process in Power Swarm", "Analyze in Power Swarm"]
result = power_swarm(agents, tasks.copy())
assert isinstance(result, list)
assert len(result) > 0
def test_log_swarm():
"""Test log swarm mathematical pattern"""
agents = create_test_agents(8)
tasks = ["Calculate in Log Swarm", "Process in Log Swarm", "Analyze in Log Swarm"]
result = log_swarm(agents, tasks.copy())
assert isinstance(result, list)
assert len(result) > 0
def test_exponential_swarm():
"""Test exponential swarm mathematical pattern"""
agents = create_test_agents(8)
tasks = ["Calculate in Exponential Swarm", "Process in Exponential Swarm", "Analyze in Exponential Swarm"]
result = exponential_swarm(agents, tasks.copy())
assert isinstance(result, list)
assert len(result) > 0
def test_geometric_swarm():
"""Test geometric swarm mathematical pattern"""
agents = create_test_agents(8)
tasks = ["Calculate in Geometric Swarm", "Process in Geometric Swarm", "Analyze in Geometric Swarm"]
result = geometric_swarm(agents, tasks.copy())
assert isinstance(result, list)
assert len(result) > 0
def test_harmonic_swarm():
"""Test harmonic swarm mathematical pattern"""
agents = create_test_agents(8)
tasks = ["Calculate in Harmonic Swarm", "Process in Harmonic Swarm", "Analyze in Harmonic Swarm"]
result = harmonic_swarm(agents, tasks.copy())
assert isinstance(result, list)
assert len(result) > 0
def test_staircase_swarm():
"""Test staircase swarm pattern"""
agents = create_test_agents(10)
tasks = ["Process step 1", "Process step 2", "Process step 3", "Process step 4", "Process step 5"]
result = staircase_swarm(agents, tasks)
assert isinstance(result, list)
assert len(result) > 0
def test_sigmoid_swarm():
"""Test sigmoid swarm pattern"""
agents = create_test_agents(10)
tasks = ["Sigmoid task 1", "Sigmoid task 2", "Sigmoid task 3", "Sigmoid task 4", "Sigmoid task 5"]
result = sigmoid_swarm(agents, tasks)
assert isinstance(result, list)
assert len(result) > 0
def test_sinusoidal_swarm():
"""Test sinusoidal swarm pattern"""
agents = create_test_agents(10)
tasks = ["Wave task 1", "Wave task 2", "Wave task 3", "Wave task 4", "Wave task 5"]
result = sinusoidal_swarm(agents, tasks)
assert isinstance(result, list)
assert len(result) > 0
def test_one_to_one():
"""Test one-to-one communication pattern"""
sender = create_test_agent("Sender")
receiver = create_test_agent("Receiver")
task = "Process and relay this message"
result = one_to_one(sender, receiver, task)
assert isinstance(result, list)
assert len(result) > 0
for log in result:
assert "role" in log
assert "content" in log
try:
# Test basic swarm patterns
test_circular_swarm()
test_grid_swarm()
test_linear_swarm()
test_star_swarm()
test_mesh_swarm()
test_pyramid_swarm()
@pytest.mark.asyncio
async def test_one_to_three():
"""Test one-to-three communication pattern"""
sender = create_test_agent("Sender")
receivers = create_test_agents(3)
task = "Process and relay this message"
# Test mathematical and pattern swarms
test_mathematical_swarms()
test_pattern_swarms()
result = await one_to_three(sender, receivers, task)
# Test communication patterns
asyncio.run(test_communication_patterns())
assert isinstance(result, list)
assert len(result) > 0
end_time = time.time()
duration = round(end_time - start_time, 2)
print("\n=== Test Suite Completed Successfully ===")
print(f"Time taken: {duration} seconds")
except Exception as e:
print("\n=== Test Suite Failed ===")
print(f"Error: {str(e)}")
raise
@pytest.mark.asyncio
async def test_broadcast():
"""Test broadcast communication pattern"""
sender = create_test_agent("Broadcaster")
receivers = create_test_agents(5)
task = "Broadcast this message"
result = await broadcast(sender, receivers, task)
if __name__ == "__main__":
run_all_tests()
assert isinstance(result, list)
assert len(result) > 0

Loading…
Cancel
Save