hotfix using datetime.now().strftime() to format the time on spreadsheet_swarm.py

adding unit test for spreadsheet_swarm

fixing issues with linting tests ruff

more linting tests

linting fixes and QA on spreadsheet_swarm.py

spreadsheet_swarm.py confirmed working on windows in tests with Harshal

multi platform spreadsheet_swarm, deps build clusterops from github source

spreadsheet and syntax fix to auto_test_eval.py

spreadsheet and finance tests confirmed to work

align repo with upstream
pull/710/head
Patrick Devaney 2 weeks ago committed by mike dupont
parent 5365915ddd
commit 8258483017

@ -958,7 +958,6 @@ def create_app() -> FastAPI:
logger.info("FastAPI application created successfully")
return app
def run_server():
"""Run the API server"""
try:
@ -970,8 +969,7 @@ def run_server():
asyncio.run(server.startup())
except Exception as e:
logger.error(f"Failed to start API: {str(e)}")
print(f"Error starting server: {str(e)}")
print(f"Error starting server: {str(e)}") # <-- Fixed here
if __name__ == "__main__":
run_server()

@ -420,4 +420,4 @@ agent = ToolAgent(
result = agent.run(
"Calculate returns for $10000 invested at 7% for 10 years"
)
)

@ -74,10 +74,12 @@ docstring_parser = "0.16" # TODO:
tiktoken = "*"
networkx = "*"
aiofiles = "*"
clusterops = "*"
# chromadb = "*"
rich = "*"
# sentence-transformers = "*"
swarm-models = "*"
termcolor = "*"
clusterops = "*"
# [tool.poetry.extras]

@ -23,4 +23,7 @@ mypy-protobuf>=3.0.0
pytest>=8.1.1
networkx
aiofiles
clusterops
clusterops
reportlab
doc-master
termcolor

@ -4,26 +4,30 @@ import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from typing import Any, Dict, List, Optional, Union
import concurrent
from pydantic import BaseModel, Field
from tenacity import retry, stop_after_attempt, wait_exponential
from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
from swarms.utils.file_processing import create_file_in_folder
import concurrent
from clusterops import (
execute_on_gpu,
execute_with_cpu_cores,
execute_on_multiple_gpus,
list_available_gpus,
)
from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
from swarms.utils.file_processing import create_file_in_folder
from swarms.utils.loguru_logger import initialize_logger
from swarms.structs.swarm_id_generator import generate_swarm_id
logger = initialize_logger(log_folder="concurrent_workflow")
class AgentOutputSchema(BaseModel):
run_id: Optional[str] = Field(
..., description="Unique ID for the run"

@ -1,6 +1,6 @@
import asyncio
import csv
import datetime
from datetime import datetime
import os
import uuid
from typing import Dict, List, Union
@ -16,13 +16,8 @@ from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger(log_folder="spreadsheet_swarm")
time = datetime.datetime.now().isoformat()
uuid_hex = uuid.uuid4().hex
# --------------- NEW CHANGE START ---------------
# Format time variable to be compatible across operating systems
formatted_time = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
# --------------- NEW CHANGE END ---------------
# Replace timestamp-based time with a UUID for file naming
run_id = uuid.uuid4().hex # Unique identifier for each run
class AgentConfig(BaseModel):
@ -43,13 +38,13 @@ class AgentOutput(BaseModel):
class SwarmRunMetadata(BaseModel):
run_id: str = Field(
default_factory=lambda: f"spreadsheet_swarm_run_{uuid_hex}"
default_factory=lambda: f"spreadsheet_swarm_run_{run_id}"
)
name: str
description: str
agents: List[str]
start_time: str = Field(
default_factory=lambda: time,
default_factory=lambda: str(datetime.now().timestamp()), # Numeric timestamp
description="The start time of the swarm run.",
)
end_time: str
@ -80,7 +75,7 @@ class SpreadSheetSwarm(BaseSwarm):
def __init__(
self,
name: str = "Spreadsheet-Swarm",
description: str = "A swarm that that processes tasks concurrently using multiple agents and saves the metadata to a CSV file.",
description: str = "A swarm that processes tasks concurrently using multiple agents and saves the metadata to a CSV file.",
agents: Union[Agent, List[Agent]] = [],
autosave_on: bool = True,
save_file_path: str = None,
@ -106,17 +101,18 @@ class SpreadSheetSwarm(BaseSwarm):
self.load_path = load_path
self.agent_configs: Dict[str, AgentConfig] = {}
# --------------- NEW CHANGE START ---------------
# The save_file_path now uses the formatted_time and uuid_hex
self.save_file_path = f"spreadsheet_swarm_{formatted_time}_run_id_{uuid_hex}.csv"
# --------------- NEW CHANGE END ---------------
# Create a timestamp without colons or periods
timestamp = datetime.now().isoformat().replace(":", "_").replace(".", "_")
# Use this timestamp in the CSV filename
self.save_file_path = f"spreadsheet_swarm_{timestamp}_run_id_{run_id}.csv"
self.metadata = SwarmRunMetadata(
run_id=f"spreadsheet_swarm_run_{time}",
run_id=f"spreadsheet_swarm_run_{run_id}",
name=name,
description=description,
agents=[agent.name for agent in agents],
start_time=time,
start_time=str(datetime.now().timestamp()), # Numeric timestamp
end_time="",
tasks_completed=0,
outputs=[],
@ -283,11 +279,30 @@ class SpreadSheetSwarm(BaseSwarm):
str: The JSON representation of the swarm metadata.
"""
try:
return asyncio.run(self._run(task, *args, **kwargs))
except Exception as e:
logger.error(f"Error running swarm: {e}")
raise e
logger.info(f"Running the swarm with task: {task}")
self.metadata.start_time = str(datetime.now().timestamp()) # Numeric timestamp
# Check if we're already in an event loop
if asyncio.get_event_loop().is_running():
# If so, create and run tasks directly using `create_task` without `asyncio.run`
task_future = asyncio.create_task(self._run_tasks(task, *args, **kwargs))
asyncio.get_event_loop().run_until_complete(task_future)
else:
# If no event loop is running, run using `asyncio.run`
asyncio.run(self._run_tasks(task, *args, **kwargs))
self.metadata.end_time = str(datetime.now().timestamp()) # Numeric timestamp
# Synchronously save metadata
logger.info("Saving metadata to CSV and JSON...")
asyncio.run(self._save_metadata())
if self.autosave_on:
self.data_to_json_file()
print(log_agent_data(self.metadata.model_dump()))
return self.metadata.model_dump_json(indent=4)
async def _run_tasks(self, task: str, *args, **kwargs):
"""
@ -357,7 +372,7 @@ class SpreadSheetSwarm(BaseSwarm):
agent_name=agent_name,
task=task,
result=result,
timestamp=time,
timestamp=str(datetime.now().timestamp()), # Numeric timestamp
)
)
@ -393,38 +408,19 @@ class SpreadSheetSwarm(BaseSwarm):
"""
Save the swarm metadata to a CSV file.
"""
logger.info(
f"Saving swarm metadata to: {self.save_file_path}"
)
logger.info(f"Saving swarm metadata to: {self.save_file_path}")
run_id = uuid.uuid4()
# Check if file exists before opening it
file_exists = os.path.exists(self.save_file_path)
async with aiofiles.open(
self.save_file_path, mode="a"
) as file:
writer = csv.writer(file)
async with aiofiles.open(self.save_file_path, mode="a") as file:
# Write header if file doesn't exist
if not file_exists:
await writer.writerow(
[
"Run ID",
"Agent Name",
"Task",
"Result",
"Timestamp",
]
)
header = "Run ID,Agent Name,Task,Result,Timestamp\n"
await file.write(header)
# Write each output as a new row
for output in self.metadata.outputs:
await writer.writerow(
[
str(run_id),
output.agent_name,
output.task,
output.result,
output.timestamp,
]
)
row = f"{run_id},{output.agent_name},{output.task},{output.result},{output.timestamp}\n"
await file.write(row)

@ -122,9 +122,9 @@ def prepare_output_for_output_model(
"""
if output_type == BaseModel:
return str_to_pydantic_model(output, output_type)
elif output_type == dict:
elif output_type is dict:
return dict_to_json_str(output)
elif output_type == str:
elif output_type is str:
return output
else:
return output

@ -20,7 +20,61 @@ COPY . .
RUN pip install poetry
# Disable virtualenv creation by poetry and install dependencies
RUN poetry config virtualenvs.create false
RUN poetry config vir# Use Python 3.11 slim-bullseye for smaller base image
FROM python:3.11-slim-bullseye AS builder
# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PIP_NO_CACHE_DIR=1 \
PIP_DISABLE_PIP_VERSION_CHECK=1
# Set the working directory
WORKDIR /build
# Install only essential build dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
gcc \
g++ \
gfortran \
&& rm -rf /var/lib/apt/lists/*
# Install swarms packages
RUN pip install --no-cache-dir swarm-models swarms
# Production stage
FROM python:3.11-slim-bullseye
# Set secure environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
WORKSPACE_DIR="agent_workspace" \
PATH="/app:${PATH}" \
PYTHONPATH="/app:${PYTHONPATH}" \
USER=swarms
# Create non-root user
RUN useradd -m -s /bin/bash -U $USER && \
mkdir -p /app && \
chown -R $USER:$USER /app
# Set working directory
WORKDIR /app
# Copy only necessary files from builder
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
# Copy application with correct permissions
COPY --chown=$USER:$USER . .
# Switch to non-root user
USER $USER
# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD python -c "import swarms; print('Health check passed')" || exit 1tualenvs.create false
# Install the 'swarms' package if it's not included in the poetry.lock
RUN pip install swarms

@ -14,7 +14,6 @@ from swarm_models import OpenAIChat
from swarms.structs.agent import Agent
@dataclass
class SwarmSystemInfo:
"""System information for Swarms issue reports."""
@ -111,7 +110,14 @@ class SwarmsIssueReporter:
gpu_info = torch.cuda.get_device_name(0)
return cuda_available, gpu_info
return False, None
except:
except ModuleNotFoundError as e:
print(f"Error: {e}")
return False, None
except RuntimeError as e:
print(f"Error: {e}")
return False, None
except Exception as e:
print(f"Unexpected error: {e}")
return False, None
def _get_system_info(self) -> SwarmSystemInfo:
@ -130,23 +136,15 @@ class SwarmsIssueReporter:
gpu_info=gpu_info,
)
def _categorize_error(
self, error: Exception, context: Dict
) -> List[str]:
def _categorize_error(self, error: Exception, context: Dict) -> List[str]:
"""Categorize the error and return appropriate labels."""
error_str = str(error).lower()
type(error).__name__
labels = ["bug", "automated"]
# Check error message and context for category keywords
for (
category,
category_labels,
) in self.ISSUE_CATEGORIES.items():
if any(
keyword in error_str for keyword in category_labels
):
for category, category_labels in self.ISSUE_CATEGORIES.items():
if any(keyword in error_str for keyword in category_labels):
labels.extend(category_labels)
break
@ -161,10 +159,7 @@ class SwarmsIssueReporter:
return list(set(labels)) # Remove duplicates
def _format_swarms_issue_body(
self,
error: Exception,
system_info: SwarmSystemInfo,
context: Dict,
self, error: Exception, system_info: SwarmSystemInfo, context: Dict
) -> str:
"""Format the issue body with Swarms-specific information."""
return f"""
@ -207,27 +202,25 @@ class SwarmsIssueReporter:
for dist in pkg_resources.working_set:
deps.append(f"- {dist.key} {dist.version}")
return "\n".join(deps)
except:
except ImportError as e:
print(f"Error: {e}")
return "Unable to fetch dependency information"
except Exception as e:
print(f"Unexpected error: {e}")
return "Unable to fetch dependency information"
# First, add this method to your SwarmsIssueReporter class
def _check_rate_limit(self) -> bool:
"""Check if we're within rate limits."""
now = datetime.now()
time_diff = (now - self.last_issue_time).total_seconds()
if (
len(self.issues_created) >= self.rate_limit
and time_diff < self.rate_period
):
if len(self.issues_created) >= self.rate_limit and time_diff < self.rate_period:
logger.warning("Rate limit exceeded for issue creation")
return False
# Clean up old issues from tracking
self.issues_created = [
time
for time in self.issues_created
if (now - time).total_seconds() < self.rate_period
time for time in self.issues_created if (now - time).total_seconds() < self.rate_period
]
return True
@ -253,9 +246,7 @@ class SwarmsIssueReporter:
"""
try:
if not self._check_rate_limit():
logger.warning(
"Skipping issue creation due to rate limit"
)
logger.warning("Skipping issue creation due to rate limit")
return None
# Collect system information
@ -286,25 +277,19 @@ class SwarmsIssueReporter:
url = f"https://api.github.com/repos/{self.REPO_OWNER}/{self.REPO_NAME}/issues"
data = {
"title": title,
"body": self._format_swarms_issue_body(
error, system_info, full_context
),
"body": self._format_swarms_issue_body(error, system_info, full_context),
"labels": labels,
}
response = requests.post(
url,
headers={
"Authorization": f"token {self.github_token}"
},
headers={"Authorization": f"token {self.github_token}"},
json=data,
)
response.raise_for_status()
issue_number = response.json()["number"]
logger.info(
f"Successfully created Swarms issue #{issue_number}"
)
logger.info(f"Successfully created Swarms issue #{issue_number}")
return issue_number
@ -314,15 +299,11 @@ class SwarmsIssueReporter:
# Setup the reporter with your GitHub token
reporter = SwarmsIssueReporter(
github_token=os.getenv("GITHUB_API_KEY")
)
reporter = SwarmsIssueReporter(github_token=os.getenv("GITHUB_API_KEY"))
# Force an error to test the reporter
try:
# This will raise an error since the input isn't valid
# Create an agent that might have issues
model = OpenAIChat(model_name="gpt-4o")
agent = Agent(agent_name="Test-Agent", max_loops=1)

@ -1,7 +1,4 @@
import time
start_time = time.time()
import os
import uuid
from swarms import Agent
@ -9,7 +6,7 @@ from swarm_models import OpenAIChat
from swarms.prompts.finance_agent_sys_prompt import (
FINANCIAL_AGENT_SYS_PROMPT,
)
start_time = time.time()
# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")

@ -0,0 +1,253 @@
import os
import json
import asyncio
from typing import Optional, Dict, Any, List, Tuple
from datetime import datetime
from loguru import logger
from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import FINANCIAL_AGENT_SYS_PROMPT
# Configure Loguru logger
logger.remove() # Remove default handler
logger.add(
"financial_agent_tests_{time}.log",
rotation="1 day",
retention="7 days",
level="DEBUG",
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
)
class FinancialAgentTestSuite:
"""
Production-grade test suite for Financial Analysis Agent.
This test suite provides comprehensive testing of the Financial Analysis Agent's
functionality, including initialization, configuration, and response validation.
Attributes:
test_data_path (str): Path to store test data and outputs
agent_config (Dict[str, Any]): Default configuration for test agents
"""
def __init__(self, test_data_path: str = "./test_data"):
"""
Initialize the test suite with configuration and setup.
Args:
test_data_path (str): Directory to store test data and outputs
"""
self.test_data_path = test_data_path
self.agent_config = {
"agent_name": "Test-Financial-Analysis-Agent",
"system_prompt": FINANCIAL_AGENT_SYS_PROMPT,
"model_name": "gpt-4o-mini",
"max_loops": 1,
"autosave": True,
"dashboard": False,
"verbose": True,
"dynamic_temperature_enabled": True,
"saved_state_path": "test_finance_agent.json",
"user_name": "test_user",
"retry_attempts": 1,
"context_length": 200000,
"return_step_meta": False,
"output_type": "string",
"streaming_on": False,
}
self._setup_test_environment()
def _setup_test_environment(self) -> None:
"""Create necessary directories and files for testing."""
try:
os.makedirs(self.test_data_path, exist_ok=True)
logger.info(f"Test environment setup completed at {self.test_data_path}")
except Exception as e:
logger.error(f"Failed to setup test environment: {str(e)}")
raise
async def _create_test_agent(self, config_override: Optional[Dict[str, Any]] = None) -> Agent:
"""
Create a test agent with specified or default configuration.
Args:
config_override (Optional[Dict[str, Any]]): Override default config values
Returns:
Agent: Configured test agent instance
"""
try:
test_config = self.agent_config.copy()
if config_override:
test_config.update(config_override)
agent = Agent(**test_config)
logger.debug(f"Created test agent with config: {test_config}")
return agent
except Exception as e:
logger.error(f"Failed to create test agent: {str(e)}")
raise
async def test_agent_initialization(self) -> Tuple[bool, str]:
"""
Test agent initialization with various configurations.
Returns:
Tuple[bool, str]: Success status and result message
"""
try:
logger.info("Starting agent initialization test")
# Test default initialization
agent = await self._create_test_agent()
assert isinstance(agent, Agent), "Agent initialization failed"
# Test with modified configuration
custom_config = {"max_loops": 2, "context_length": 150000}
agent_custom = await self._create_test_agent(custom_config)
assert agent_custom.max_loops == 2, "Custom configuration not applied"
logger.info("Agent initialization test passed")
return True, "Agent initialization successful"
except Exception as e:
logger.error(f"Agent initialization test failed: {str(e)}")
return False, f"Agent initialization failed: {str(e)}"
async def test_agent_response(self) -> Tuple[bool, str]:
"""
Test agent's response functionality with various queries.
Returns:
Tuple[bool, str]: Success status and result message
"""
try:
logger.info("Starting agent response test")
agent = await self._create_test_agent()
test_queries = [
"How can I establish a ROTH IRA?",
"What are the tax implications of stock trading?",
"Explain mutual fund investment strategies"
]
for query in test_queries:
response = agent.run(query)
assert isinstance(response, str), "Response type mismatch"
assert len(response) > 0, "Empty response received"
logger.debug(f"Query: {query[:50]}... | Response length: {len(response)}")
logger.info("Agent response test passed")
return True, "Agent response test successful"
except Exception as e:
logger.error(f"Agent response test failed: {str(e)}")
return False, f"Agent response test failed: {str(e)}"
async def test_agent_persistence(self) -> Tuple[bool, str]:
"""
Test agent's state persistence and recovery.
Returns:
Tuple[bool, str]: Success status and result message
"""
try:
logger.info("Starting agent persistence test")
# Test state saving
save_path = os.path.join(self.test_data_path, "test_state.json")
agent = await self._create_test_agent({"saved_state_path": save_path})
test_query = "What is a 401k plan?"
agent.run(test_query)
assert os.path.exists(save_path), "State file not created"
# Verify state content
with open(save_path, 'r') as f:
saved_state = json.load(f)
assert "agent_name" in saved_state, "Invalid state file content"
logger.info("Agent persistence test passed")
return True, "Agent persistence test successful"
except Exception as e:
logger.error(f"Agent persistence test failed: {str(e)}")
return False, f"Agent persistence test failed: {str(e)}"
async def run_all_tests(self) -> Dict[str, Any]:
"""
Run all test cases and generate a comprehensive report.
Returns:
Dict[str, Any]: Test results and statistics
"""
start_time = datetime.now()
results = []
test_cases = [
("Agent Initialization", self.test_agent_initialization),
("Agent Response", self.test_agent_response),
("Agent Persistence", self.test_agent_persistence)
]
for test_name, test_func in test_cases:
try:
success, message = await test_func()
results.append({
"test_name": test_name,
"success": success,
"message": message,
"timestamp": datetime.now().isoformat()
})
except Exception as e:
logger.error(f"Test {test_name} failed with unexpected error: {str(e)}")
results.append({
"test_name": test_name,
"success": False,
"message": f"Unexpected error: {str(e)}",
"timestamp": datetime.now().isoformat()
})
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
# Generate report
total_tests = len(results)
passed_tests = sum(1 for r in results if r["success"])
report = {
"summary": {
"total_tests": total_tests,
"passed_tests": passed_tests,
"failed_tests": total_tests - passed_tests,
"success_rate": f"{(passed_tests/total_tests)*100:.2f}%",
"duration_seconds": duration
},
"test_results": results,
"timestamp": datetime.now().isoformat()
}
# Save report
report_path = os.path.join(self.test_data_path, f"test_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
with open(report_path, 'w') as f:
json.dump(report, f, indent=2)
logger.info(f"Test suite completed. Report saved to {report_path}")
return report
async def main():
"""Main entry point for running the test suite."""
logger.info("Starting Financial Agent Test Suite")
test_suite = FinancialAgentTestSuite()
report = await test_suite.run_all_tests()
# Print summary to console
print("\n" + "="*50)
print("Financial Agent Test Suite Results")
print("="*50)
print(f"Total Tests: {report['summary']['total_tests']}")
print(f"Passed Tests: {report['summary']['passed_tests']}")
print(f"Failed Tests: {report['summary']['failed_tests']}")
print(f"Success Rate: {report['summary']['success_rate']}")
print(f"Duration: {report['summary']['duration_seconds']:.2f} seconds")
print("="*50)
if __name__ == "__main__":
asyncio.run(main())

@ -0,0 +1,65 @@
import os
from datetime import datetime
from uuid import uuid4
# Import necessary classes from your swarm module
from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
from swarms.telemetry.capture_sys_data import log_agent_data
from swarms.utils.file_processing import create_file_in_folder
from swarms import SpreadSheetSwarm
# Ensure you have an environment variable or default workspace dir
workspace_dir = os.getenv("WORKSPACE_DIR", "./workspace")
def create_agents(num_agents: int):
"""
Create a list of agent instances.
Args:
num_agents (int): The number of agents to create.
Returns:
List[Agent]: List of created Agent objects.
"""
agents = []
for i in range(num_agents):
agent_name = f"Agent-{i + 1}"
agents.append(Agent(agent_name=agent_name))
return agents
def main():
# Number of agents to create
num_agents = 5
# Create the agents
agents = create_agents(num_agents)
# Initialize the swarm with agents and other configurations
swarm = SpreadSheetSwarm(
name="Test-Swarm",
description="A swarm for testing purposes.",
agents=agents,
autosave_on=True,
max_loops=2,
workspace_dir=workspace_dir
)
# Run a sample task in the swarm (synchronously)
task = "process_data"
# Ensure the run method is synchronous
swarm_metadata = swarm.run(task) # Assuming this is made synchronous
# Print swarm metadata after task completion
print("Swarm Metadata:")
print(swarm_metadata)
# Check if CSV file has been created and saved
if os.path.exists(swarm.save_file_path):
print(f"Metadata saved to: {swarm.save_file_path}")
else:
print(f"Metadata not saved correctly. Check the save path.")
# Test saving metadata to JSON file
swarm.data_to_json_file()
# Test exporting metadata to JSON
swarm_json = swarm.export_to_json()
print("Exported JSON metadata:")
print(swarm_json)
# Log agent data
print("Logging agent data:")
print(log_agent_data(swarm.metadata.model_dump()))
# Run the synchronous main function
if __name__ == "__main__":
main()

@ -0,0 +1,296 @@
import os
import json
import asyncio
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime
from loguru import logger
import aiofiles
from swarms.structs.agent import Agent
from swarms.structs.spreadsheet_swarm import (
SpreadSheetSwarm,
AgentOutput,
SwarmRunMetadata
)
# Configure Loguru logger
logger.remove() # Remove default handler
logger.add(
"spreadsheet_swarm_{time}.log",
rotation="1 MB",
retention="7 days",
level="DEBUG",
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
backtrace=True,
diagnose=True
)
class SpreadSheetSwarmTestSuite:
"""
Enhanced test suite for SpreadSheetSwarm functionality.
Provides comprehensive testing of swarm initialization, CSV operations,
task execution, and data persistence with detailed logging and error tracking.
"""
def __init__(self, test_data_path: str = "./test_data"):
"""
Initialize test suite with configuration.
Args:
test_data_path (str): Directory for test data and outputs
"""
self.test_data_path = test_data_path
self._setup_test_environment()
def _setup_test_environment(self) -> None:
"""Setup required test directories and resources."""
try:
os.makedirs(self.test_data_path, exist_ok=True)
logger.info(f"Test environment initialized at {self.test_data_path}")
except Exception as e:
logger.error(f"Failed to setup test environment: {e}")
raise
async def create_test_csv(self) -> str:
"""
Create a test CSV file with agent configurations.
Returns:
str: Path to created CSV file
"""
try:
csv_content = """agent_name,description,system_prompt,task
test_agent_1,Test Agent 1,System prompt 1,Task 1
test_agent_2,Test Agent 2,System prompt 2,Task 2
test_agent_3,Test Agent 3,System prompt 3,Task 3"""
file_path = os.path.join(self.test_data_path, "test_agents.csv")
async with aiofiles.open(file_path, 'w') as f:
await f.write(csv_content)
logger.debug(f"Created test CSV at {file_path} with content:\n{csv_content}")
return file_path
except Exception as e:
logger.error(f"Failed to create test CSV: {e}")
raise
def create_test_agent(self, name: str, **kwargs) -> Agent:
"""
Create a test agent with specified configuration.
Args:
name (str): Agent name
**kwargs: Additional agent configuration
Returns:
Agent: Configured test agent
"""
try:
config = {
"agent_name": name,
"system_prompt": f"Test prompt for {name}",
"model_name": "gpt-4o-mini",
"max_loops": 1,
"autosave": True,
"verbose": True,
**kwargs
}
agent = Agent(**config)
logger.debug(f"Created test agent: {name}")
return agent
except Exception as e:
logger.error(f"Failed to create agent {name}: {e}")
raise
async def test_swarm_initialization(self) -> Tuple[bool, str]:
"""
Test swarm initialization with various configurations.
Returns:
Tuple[bool, str]: Success status and message
"""
try:
logger.info("Starting swarm initialization test")
# Test basic initialization
agents = [
self.create_test_agent("agent1"),
self.create_test_agent("agent2", max_loops=2)
]
swarm = SpreadSheetSwarm(
name="Test Swarm",
description="Test Description",
agents=agents,
max_loops=2
)
# Verify configuration
assert swarm.name == "Test Swarm"
assert swarm.description == "Test Description"
assert len(swarm.agents) == 2
assert swarm.max_loops == 2
# Test empty initialization
empty_swarm = SpreadSheetSwarm()
assert len(empty_swarm.agents) == 0
logger.info("Swarm initialization test passed")
return True, "Initialization successful"
except Exception as e:
logger.error(f"Swarm initialization test failed: {e}")
return False, str(e)
async def test_csv_operations(self) -> Tuple[bool, str]:
"""
Test CSV loading and saving operations.
Returns:
Tuple[bool, str]: Success status and message
"""
try:
logger.info("Starting CSV operations test")
# Test CSV loading
csv_path = await self.create_test_csv()
swarm = SpreadSheetSwarm(load_path=csv_path)
await swarm._load_from_csv()
assert len(swarm.agents) == 3
assert len(swarm.agent_configs) == 3
# Test CSV saving
output_path = os.path.join(self.test_data_path, "test_output.csv")
swarm.save_file_path = output_path
swarm._track_output("test_agent_1", "Test task", "Test result")
await swarm._save_to_csv()
assert os.path.exists(output_path)
# Cleanup
os.remove(csv_path)
os.remove(output_path)
logger.info("CSV operations test passed")
return True, "CSV operations successful"
except Exception as e:
logger.error(f"CSV operations test failed: {e}")
return False, str(e)
async def test_task_execution(self) -> Tuple[bool, str]:
"""
Test task execution and output tracking.
Returns:
Tuple[bool, str]: Success status and message
"""
try:
logger.info("Starting task execution test")
agents = [
self.create_test_agent("agent1"),
self.create_test_agent("agent2")
]
swarm = SpreadSheetSwarm(agents=agents, max_loops=1)
# Run test tasks
test_tasks = ["Task 1", "Task 2"]
for task in test_tasks:
await swarm._run_tasks(task)
# Verify execution
assert swarm.metadata.tasks_completed == 4 # 2 agents × 2 tasks
assert len(swarm.metadata.outputs) == 4
# Test output tracking
assert all(output.agent_name in ["agent1", "agent2"]
for output in swarm.metadata.outputs)
logger.info("Task execution test passed")
return True, "Task execution successful"
except Exception as e:
logger.error(f"Task execution test failed: {e}")
return False, str(e)
async def run_all_tests(self) -> Dict[str, Any]:
"""
Execute all test cases and generate report.
Returns:
Dict[str, Any]: Comprehensive test results and metrics
"""
start_time = datetime.now()
results = []
test_cases = [
("Swarm Initialization", self.test_swarm_initialization),
("CSV Operations", self.test_csv_operations),
("Task Execution", self.test_task_execution)
]
for test_name, test_func in test_cases:
try:
success, message = await test_func()
results.append({
"test_name": test_name,
"success": success,
"message": message,
"timestamp": datetime.now().isoformat()
})
except Exception as e:
logger.error(f"Unexpected error in {test_name}: {e}")
results.append({
"test_name": test_name,
"success": False,
"message": f"Unexpected error: {e}",
"timestamp": datetime.now().isoformat()
})
# Generate report
duration = (datetime.now() - start_time).total_seconds()
total_tests = len(results)
passed_tests = sum(1 for r in results if r["success"])
report = {
"summary": {
"total_tests": total_tests,
"passed_tests": passed_tests,
"failed_tests": total_tests - passed_tests,
"success_rate": f"{(passed_tests/total_tests)*100:.2f}%",
"duration_seconds": duration
},
"test_results": results,
"timestamp": datetime.now().isoformat()
}
# Save report
report_path = os.path.join(
self.test_data_path,
f"test_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
)
async with aiofiles.open(report_path, 'w') as f:
await f.write(json.dumps(report, indent=2))
logger.info(f"Test suite completed. Report saved to {report_path}")
return report
async def main():
"""Entry point for test execution."""
logger.info("Starting SpreadSheetSwarm Test Suite")
test_suite = SpreadSheetSwarmTestSuite()
report = await test_suite.run_all_tests()
# Print summary
print("\n" + "="*50)
print("SpreadSheetSwarm Test Results")
print("="*50)
print(f"Total Tests: {report['summary']['total_tests']}")
print(f"Passed Tests: {report['summary']['passed_tests']}")
print(f"Failed Tests: {report['summary']['failed_tests']}")
print(f"Success Rate: {report['summary']['success_rate']}")
print(f"Duration: {report['summary']['duration_seconds']:.2f} seconds")
print("="*50)
if __name__ == "__main__":
asyncio.run(main())
Loading…
Cancel
Save