diff --git a/api/main.py b/api/main.py index b210a29c..fc498369 100644 --- a/api/main.py +++ b/api/main.py @@ -958,7 +958,6 @@ def create_app() -> FastAPI: logger.info("FastAPI application created successfully") return app - def run_server(): """Run the API server""" try: @@ -970,8 +969,7 @@ def run_server(): asyncio.run(server.startup()) except Exception as e: logger.error(f"Failed to start API: {str(e)}") - print(f"Error starting server: {str(e)}") - + print(f"Error starting server: {str(e)}") # <-- Fixed here if __name__ == "__main__": run_server() diff --git a/new_features_examples/multi_tool_usage_agent.py b/new_features_examples/multi_tool_usage_agent.py index c51596ad..58fae9cb 100644 --- a/new_features_examples/multi_tool_usage_agent.py +++ b/new_features_examples/multi_tool_usage_agent.py @@ -420,4 +420,4 @@ agent = ToolAgent( result = agent.run( "Calculate returns for $10000 invested at 7% for 10 years" -) +) \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 6ef6f1f2..52980a31 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -74,10 +74,12 @@ docstring_parser = "0.16" # TODO: tiktoken = "*" networkx = "*" aiofiles = "*" -clusterops = "*" # chromadb = "*" rich = "*" # sentence-transformers = "*" +swarm-models = "*" +termcolor = "*" +clusterops = "*" # [tool.poetry.extras] diff --git a/requirements.txt b/requirements.txt index 22fe7382..0cacd211 100644 --- a/requirements.txt +++ b/requirements.txt @@ -23,4 +23,7 @@ mypy-protobuf>=3.0.0 pytest>=8.1.1 networkx aiofiles -clusterops \ No newline at end of file +clusterops +reportlab +doc-master +termcolor diff --git a/swarms/structs/concurrent_workflow.py b/swarms/structs/concurrent_workflow.py index 9df994c3..9b671448 100644 --- a/swarms/structs/concurrent_workflow.py +++ b/swarms/structs/concurrent_workflow.py @@ -4,26 +4,30 @@ import uuid from concurrent.futures import ThreadPoolExecutor from datetime import datetime from typing import Any, Dict, List, Optional, Union +import concurrent from pydantic import BaseModel, Field from tenacity import retry, stop_after_attempt, wait_exponential -from swarms.structs.agent import Agent -from swarms.structs.base_swarm import BaseSwarm -from swarms.utils.file_processing import create_file_in_folder -import concurrent from clusterops import ( execute_on_gpu, execute_with_cpu_cores, execute_on_multiple_gpus, list_available_gpus, ) + +from swarms.structs.agent import Agent +from swarms.structs.base_swarm import BaseSwarm +from swarms.utils.file_processing import create_file_in_folder + from swarms.utils.loguru_logger import initialize_logger from swarms.structs.swarm_id_generator import generate_swarm_id logger = initialize_logger(log_folder="concurrent_workflow") + + class AgentOutputSchema(BaseModel): run_id: Optional[str] = Field( ..., description="Unique ID for the run" diff --git a/swarms/structs/spreadsheet_swarm.py b/swarms/structs/spreadsheet_swarm.py index 6fd2044d..8f014cef 100644 --- a/swarms/structs/spreadsheet_swarm.py +++ b/swarms/structs/spreadsheet_swarm.py @@ -1,6 +1,6 @@ import asyncio import csv -import datetime +from datetime import datetime import os import uuid from typing import Dict, List, Union @@ -16,13 +16,8 @@ from swarms.utils.loguru_logger import initialize_logger logger = initialize_logger(log_folder="spreadsheet_swarm") -time = datetime.datetime.now().isoformat() -uuid_hex = uuid.uuid4().hex - -# --------------- NEW CHANGE START --------------- -# Format time variable to be compatible across operating systems -formatted_time = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S") -# --------------- NEW CHANGE END --------------- +# Replace timestamp-based time with a UUID for file naming +run_id = uuid.uuid4().hex # Unique identifier for each run class AgentConfig(BaseModel): @@ -43,13 +38,13 @@ class AgentOutput(BaseModel): class SwarmRunMetadata(BaseModel): run_id: str = Field( - default_factory=lambda: f"spreadsheet_swarm_run_{uuid_hex}" + default_factory=lambda: f"spreadsheet_swarm_run_{run_id}" ) name: str description: str agents: List[str] start_time: str = Field( - default_factory=lambda: time, + default_factory=lambda: str(datetime.now().timestamp()), # Numeric timestamp description="The start time of the swarm run.", ) end_time: str @@ -80,7 +75,7 @@ class SpreadSheetSwarm(BaseSwarm): def __init__( self, name: str = "Spreadsheet-Swarm", - description: str = "A swarm that that processes tasks concurrently using multiple agents and saves the metadata to a CSV file.", + description: str = "A swarm that processes tasks concurrently using multiple agents and saves the metadata to a CSV file.", agents: Union[Agent, List[Agent]] = [], autosave_on: bool = True, save_file_path: str = None, @@ -106,17 +101,18 @@ class SpreadSheetSwarm(BaseSwarm): self.load_path = load_path self.agent_configs: Dict[str, AgentConfig] = {} - # --------------- NEW CHANGE START --------------- - # The save_file_path now uses the formatted_time and uuid_hex - self.save_file_path = f"spreadsheet_swarm_{formatted_time}_run_id_{uuid_hex}.csv" - # --------------- NEW CHANGE END --------------- + # Create a timestamp without colons or periods + timestamp = datetime.now().isoformat().replace(":", "_").replace(".", "_") + + # Use this timestamp in the CSV filename + self.save_file_path = f"spreadsheet_swarm_{timestamp}_run_id_{run_id}.csv" self.metadata = SwarmRunMetadata( - run_id=f"spreadsheet_swarm_run_{time}", + run_id=f"spreadsheet_swarm_run_{run_id}", name=name, description=description, agents=[agent.name for agent in agents], - start_time=time, + start_time=str(datetime.now().timestamp()), # Numeric timestamp end_time="", tasks_completed=0, outputs=[], @@ -283,11 +279,30 @@ class SpreadSheetSwarm(BaseSwarm): str: The JSON representation of the swarm metadata. """ - try: - return asyncio.run(self._run(task, *args, **kwargs)) - except Exception as e: - logger.error(f"Error running swarm: {e}") - raise e + logger.info(f"Running the swarm with task: {task}") + self.metadata.start_time = str(datetime.now().timestamp()) # Numeric timestamp + + # Check if we're already in an event loop + if asyncio.get_event_loop().is_running(): + # If so, create and run tasks directly using `create_task` without `asyncio.run` + task_future = asyncio.create_task(self._run_tasks(task, *args, **kwargs)) + asyncio.get_event_loop().run_until_complete(task_future) + else: + # If no event loop is running, run using `asyncio.run` + asyncio.run(self._run_tasks(task, *args, **kwargs)) + + self.metadata.end_time = str(datetime.now().timestamp()) # Numeric timestamp + + # Synchronously save metadata + logger.info("Saving metadata to CSV and JSON...") + asyncio.run(self._save_metadata()) + + if self.autosave_on: + self.data_to_json_file() + + print(log_agent_data(self.metadata.model_dump())) + + return self.metadata.model_dump_json(indent=4) async def _run_tasks(self, task: str, *args, **kwargs): """ @@ -357,7 +372,7 @@ class SpreadSheetSwarm(BaseSwarm): agent_name=agent_name, task=task, result=result, - timestamp=time, + timestamp=str(datetime.now().timestamp()), # Numeric timestamp ) ) @@ -393,38 +408,19 @@ class SpreadSheetSwarm(BaseSwarm): """ Save the swarm metadata to a CSV file. """ - logger.info( - f"Saving swarm metadata to: {self.save_file_path}" - ) + logger.info(f"Saving swarm metadata to: {self.save_file_path}") run_id = uuid.uuid4() # Check if file exists before opening it file_exists = os.path.exists(self.save_file_path) - async with aiofiles.open( - self.save_file_path, mode="a" - ) as file: - writer = csv.writer(file) - + async with aiofiles.open(self.save_file_path, mode="a") as file: # Write header if file doesn't exist if not file_exists: - await writer.writerow( - [ - "Run ID", - "Agent Name", - "Task", - "Result", - "Timestamp", - ] - ) + header = "Run ID,Agent Name,Task,Result,Timestamp\n" + await file.write(header) + # Write each output as a new row for output in self.metadata.outputs: - await writer.writerow( - [ - str(run_id), - output.agent_name, - output.task, - output.result, - output.timestamp, - ] - ) + row = f"{run_id},{output.agent_name},{output.task},{output.result},{output.timestamp}\n" + await file.write(row) \ No newline at end of file diff --git a/swarms/tools/func_calling_utils.py b/swarms/tools/func_calling_utils.py index c5a5bd83..b02a95ec 100644 --- a/swarms/tools/func_calling_utils.py +++ b/swarms/tools/func_calling_utils.py @@ -122,9 +122,9 @@ def prepare_output_for_output_model( """ if output_type == BaseModel: return str_to_pydantic_model(output, output_type) - elif output_type == dict: + elif output_type is dict: return dict_to_json_str(output) - elif output_type == str: + elif output_type is str: return output else: return output diff --git a/tests/Dockerfile b/tests/Dockerfile index f6e46515..1643b231 100644 --- a/tests/Dockerfile +++ b/tests/Dockerfile @@ -20,7 +20,61 @@ COPY . . RUN pip install poetry # Disable virtualenv creation by poetry and install dependencies -RUN poetry config virtualenvs.create false +RUN poetry config vir# Use Python 3.11 slim-bullseye for smaller base image +FROM python:3.11-slim-bullseye AS builder + +# Set environment variables +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 \ + PIP_NO_CACHE_DIR=1 \ + PIP_DISABLE_PIP_VERSION_CHECK=1 + +# Set the working directory +WORKDIR /build + +# Install only essential build dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential \ + gcc \ + g++ \ + gfortran \ + && rm -rf /var/lib/apt/lists/* + +# Install swarms packages +RUN pip install --no-cache-dir swarm-models swarms + +# Production stage +FROM python:3.11-slim-bullseye + +# Set secure environment variables +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 \ + WORKSPACE_DIR="agent_workspace" \ + PATH="/app:${PATH}" \ + PYTHONPATH="/app:${PYTHONPATH}" \ + USER=swarms + +# Create non-root user +RUN useradd -m -s /bin/bash -U $USER && \ + mkdir -p /app && \ + chown -R $USER:$USER /app + +# Set working directory +WORKDIR /app + +# Copy only necessary files from builder +COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages +COPY --from=builder /usr/local/bin /usr/local/bin + +# Copy application with correct permissions +COPY --chown=$USER:$USER . . + +# Switch to non-root user +USER $USER + +# Health check +HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \ + CMD python -c "import swarms; print('Health check passed')" || exit 1tualenvs.create false # Install the 'swarms' package if it's not included in the poetry.lock RUN pip install swarms diff --git a/tests/agent_evals/auto_test_eval.py b/tests/agent_evals/auto_test_eval.py index b9c770fa..fd529a75 100644 --- a/tests/agent_evals/auto_test_eval.py +++ b/tests/agent_evals/auto_test_eval.py @@ -14,7 +14,6 @@ from swarm_models import OpenAIChat from swarms.structs.agent import Agent - @dataclass class SwarmSystemInfo: """System information for Swarms issue reports.""" @@ -111,7 +110,14 @@ class SwarmsIssueReporter: gpu_info = torch.cuda.get_device_name(0) return cuda_available, gpu_info return False, None - except: + except ModuleNotFoundError as e: + print(f"Error: {e}") + return False, None + except RuntimeError as e: + print(f"Error: {e}") + return False, None + except Exception as e: + print(f"Unexpected error: {e}") return False, None def _get_system_info(self) -> SwarmSystemInfo: @@ -130,23 +136,15 @@ class SwarmsIssueReporter: gpu_info=gpu_info, ) - def _categorize_error( - self, error: Exception, context: Dict - ) -> List[str]: + def _categorize_error(self, error: Exception, context: Dict) -> List[str]: """Categorize the error and return appropriate labels.""" error_str = str(error).lower() - type(error).__name__ labels = ["bug", "automated"] # Check error message and context for category keywords - for ( - category, - category_labels, - ) in self.ISSUE_CATEGORIES.items(): - if any( - keyword in error_str for keyword in category_labels - ): + for category, category_labels in self.ISSUE_CATEGORIES.items(): + if any(keyword in error_str for keyword in category_labels): labels.extend(category_labels) break @@ -161,10 +159,7 @@ class SwarmsIssueReporter: return list(set(labels)) # Remove duplicates def _format_swarms_issue_body( - self, - error: Exception, - system_info: SwarmSystemInfo, - context: Dict, + self, error: Exception, system_info: SwarmSystemInfo, context: Dict ) -> str: """Format the issue body with Swarms-specific information.""" return f""" @@ -207,27 +202,25 @@ class SwarmsIssueReporter: for dist in pkg_resources.working_set: deps.append(f"- {dist.key} {dist.version}") return "\n".join(deps) - except: + except ImportError as e: + print(f"Error: {e}") + return "Unable to fetch dependency information" + except Exception as e: + print(f"Unexpected error: {e}") return "Unable to fetch dependency information" - # First, add this method to your SwarmsIssueReporter class def _check_rate_limit(self) -> bool: """Check if we're within rate limits.""" now = datetime.now() time_diff = (now - self.last_issue_time).total_seconds() - if ( - len(self.issues_created) >= self.rate_limit - and time_diff < self.rate_period - ): + if len(self.issues_created) >= self.rate_limit and time_diff < self.rate_period: logger.warning("Rate limit exceeded for issue creation") return False # Clean up old issues from tracking self.issues_created = [ - time - for time in self.issues_created - if (now - time).total_seconds() < self.rate_period + time for time in self.issues_created if (now - time).total_seconds() < self.rate_period ] return True @@ -253,9 +246,7 @@ class SwarmsIssueReporter: """ try: if not self._check_rate_limit(): - logger.warning( - "Skipping issue creation due to rate limit" - ) + logger.warning("Skipping issue creation due to rate limit") return None # Collect system information @@ -286,25 +277,19 @@ class SwarmsIssueReporter: url = f"https://api.github.com/repos/{self.REPO_OWNER}/{self.REPO_NAME}/issues" data = { "title": title, - "body": self._format_swarms_issue_body( - error, system_info, full_context - ), + "body": self._format_swarms_issue_body(error, system_info, full_context), "labels": labels, } response = requests.post( url, - headers={ - "Authorization": f"token {self.github_token}" - }, + headers={"Authorization": f"token {self.github_token}"}, json=data, ) response.raise_for_status() issue_number = response.json()["number"] - logger.info( - f"Successfully created Swarms issue #{issue_number}" - ) + logger.info(f"Successfully created Swarms issue #{issue_number}") return issue_number @@ -314,15 +299,11 @@ class SwarmsIssueReporter: # Setup the reporter with your GitHub token -reporter = SwarmsIssueReporter( - github_token=os.getenv("GITHUB_API_KEY") -) - +reporter = SwarmsIssueReporter(github_token=os.getenv("GITHUB_API_KEY")) # Force an error to test the reporter try: # This will raise an error since the input isn't valid - # Create an agent that might have issues model = OpenAIChat(model_name="gpt-4o") agent = Agent(agent_name="Test-Agent", max_loops=1) diff --git a/tests/profiling_agent.py b/tests/profiling_agent.py index 8f1b0220..cb04552c 100644 --- a/tests/profiling_agent.py +++ b/tests/profiling_agent.py @@ -1,7 +1,4 @@ import time - -start_time = time.time() - import os import uuid from swarms import Agent @@ -9,7 +6,7 @@ from swarm_models import OpenAIChat from swarms.prompts.finance_agent_sys_prompt import ( FINANCIAL_AGENT_SYS_PROMPT, ) - +start_time = time.time() # Get the OpenAI API key from the environment variable api_key = os.getenv("OPENAI_API_KEY") diff --git a/tests/structs/test_financial_agent.py b/tests/structs/test_financial_agent.py new file mode 100644 index 00000000..fb96c999 --- /dev/null +++ b/tests/structs/test_financial_agent.py @@ -0,0 +1,253 @@ +import os +import json +import asyncio +from typing import Optional, Dict, Any, List, Tuple +from datetime import datetime +from loguru import logger +from swarms import Agent +from swarms.prompts.finance_agent_sys_prompt import FINANCIAL_AGENT_SYS_PROMPT + +# Configure Loguru logger +logger.remove() # Remove default handler +logger.add( + "financial_agent_tests_{time}.log", + rotation="1 day", + retention="7 days", + level="DEBUG", + format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}", +) + +class FinancialAgentTestSuite: + """ + Production-grade test suite for Financial Analysis Agent. + + This test suite provides comprehensive testing of the Financial Analysis Agent's + functionality, including initialization, configuration, and response validation. + + Attributes: + test_data_path (str): Path to store test data and outputs + agent_config (Dict[str, Any]): Default configuration for test agents + """ + + def __init__(self, test_data_path: str = "./test_data"): + """ + Initialize the test suite with configuration and setup. + + Args: + test_data_path (str): Directory to store test data and outputs + """ + self.test_data_path = test_data_path + self.agent_config = { + "agent_name": "Test-Financial-Analysis-Agent", + "system_prompt": FINANCIAL_AGENT_SYS_PROMPT, + "model_name": "gpt-4o-mini", + "max_loops": 1, + "autosave": True, + "dashboard": False, + "verbose": True, + "dynamic_temperature_enabled": True, + "saved_state_path": "test_finance_agent.json", + "user_name": "test_user", + "retry_attempts": 1, + "context_length": 200000, + "return_step_meta": False, + "output_type": "string", + "streaming_on": False, + } + self._setup_test_environment() + + def _setup_test_environment(self) -> None: + """Create necessary directories and files for testing.""" + try: + os.makedirs(self.test_data_path, exist_ok=True) + logger.info(f"Test environment setup completed at {self.test_data_path}") + except Exception as e: + logger.error(f"Failed to setup test environment: {str(e)}") + raise + + async def _create_test_agent(self, config_override: Optional[Dict[str, Any]] = None) -> Agent: + """ + Create a test agent with specified or default configuration. + + Args: + config_override (Optional[Dict[str, Any]]): Override default config values + + Returns: + Agent: Configured test agent instance + """ + try: + test_config = self.agent_config.copy() + if config_override: + test_config.update(config_override) + + agent = Agent(**test_config) + logger.debug(f"Created test agent with config: {test_config}") + return agent + except Exception as e: + logger.error(f"Failed to create test agent: {str(e)}") + raise + + async def test_agent_initialization(self) -> Tuple[bool, str]: + """ + Test agent initialization with various configurations. + + Returns: + Tuple[bool, str]: Success status and result message + """ + try: + logger.info("Starting agent initialization test") + + # Test default initialization + agent = await self._create_test_agent() + assert isinstance(agent, Agent), "Agent initialization failed" + + # Test with modified configuration + custom_config = {"max_loops": 2, "context_length": 150000} + agent_custom = await self._create_test_agent(custom_config) + assert agent_custom.max_loops == 2, "Custom configuration not applied" + + logger.info("Agent initialization test passed") + return True, "Agent initialization successful" + except Exception as e: + logger.error(f"Agent initialization test failed: {str(e)}") + return False, f"Agent initialization failed: {str(e)}" + + async def test_agent_response(self) -> Tuple[bool, str]: + """ + Test agent's response functionality with various queries. + + Returns: + Tuple[bool, str]: Success status and result message + """ + try: + logger.info("Starting agent response test") + agent = await self._create_test_agent() + + test_queries = [ + "How can I establish a ROTH IRA?", + "What are the tax implications of stock trading?", + "Explain mutual fund investment strategies" + ] + + for query in test_queries: + response = agent.run(query) + assert isinstance(response, str), "Response type mismatch" + assert len(response) > 0, "Empty response received" + logger.debug(f"Query: {query[:50]}... | Response length: {len(response)}") + + logger.info("Agent response test passed") + return True, "Agent response test successful" + except Exception as e: + logger.error(f"Agent response test failed: {str(e)}") + return False, f"Agent response test failed: {str(e)}" + + async def test_agent_persistence(self) -> Tuple[bool, str]: + """ + Test agent's state persistence and recovery. + + Returns: + Tuple[bool, str]: Success status and result message + """ + try: + logger.info("Starting agent persistence test") + + # Test state saving + save_path = os.path.join(self.test_data_path, "test_state.json") + agent = await self._create_test_agent({"saved_state_path": save_path}) + + test_query = "What is a 401k plan?" + agent.run(test_query) + + assert os.path.exists(save_path), "State file not created" + + # Verify state content + with open(save_path, 'r') as f: + saved_state = json.load(f) + assert "agent_name" in saved_state, "Invalid state file content" + + logger.info("Agent persistence test passed") + return True, "Agent persistence test successful" + except Exception as e: + logger.error(f"Agent persistence test failed: {str(e)}") + return False, f"Agent persistence test failed: {str(e)}" + + async def run_all_tests(self) -> Dict[str, Any]: + """ + Run all test cases and generate a comprehensive report. + + Returns: + Dict[str, Any]: Test results and statistics + """ + start_time = datetime.now() + results = [] + + test_cases = [ + ("Agent Initialization", self.test_agent_initialization), + ("Agent Response", self.test_agent_response), + ("Agent Persistence", self.test_agent_persistence) + ] + + for test_name, test_func in test_cases: + try: + success, message = await test_func() + results.append({ + "test_name": test_name, + "success": success, + "message": message, + "timestamp": datetime.now().isoformat() + }) + except Exception as e: + logger.error(f"Test {test_name} failed with unexpected error: {str(e)}") + results.append({ + "test_name": test_name, + "success": False, + "message": f"Unexpected error: {str(e)}", + "timestamp": datetime.now().isoformat() + }) + + end_time = datetime.now() + duration = (end_time - start_time).total_seconds() + + # Generate report + total_tests = len(results) + passed_tests = sum(1 for r in results if r["success"]) + + report = { + "summary": { + "total_tests": total_tests, + "passed_tests": passed_tests, + "failed_tests": total_tests - passed_tests, + "success_rate": f"{(passed_tests/total_tests)*100:.2f}%", + "duration_seconds": duration + }, + "test_results": results, + "timestamp": datetime.now().isoformat() + } + + # Save report + report_path = os.path.join(self.test_data_path, f"test_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json") + with open(report_path, 'w') as f: + json.dump(report, f, indent=2) + + logger.info(f"Test suite completed. Report saved to {report_path}") + return report + +async def main(): + """Main entry point for running the test suite.""" + logger.info("Starting Financial Agent Test Suite") + test_suite = FinancialAgentTestSuite() + report = await test_suite.run_all_tests() + + # Print summary to console + print("\n" + "="*50) + print("Financial Agent Test Suite Results") + print("="*50) + print(f"Total Tests: {report['summary']['total_tests']}") + print(f"Passed Tests: {report['summary']['passed_tests']}") + print(f"Failed Tests: {report['summary']['failed_tests']}") + print(f"Success Rate: {report['summary']['success_rate']}") + print(f"Duration: {report['summary']['duration_seconds']:.2f} seconds") + print("="*50) + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/tests/structs/test_spreadsheet_swarm.py b/tests/structs/test_spreadsheet_swarm.py new file mode 100644 index 00000000..4f1346c1 --- /dev/null +++ b/tests/structs/test_spreadsheet_swarm.py @@ -0,0 +1,65 @@ +import os +from datetime import datetime +from uuid import uuid4 +# Import necessary classes from your swarm module +from swarms.structs.agent import Agent +from swarms.structs.base_swarm import BaseSwarm +from swarms.telemetry.capture_sys_data import log_agent_data +from swarms.utils.file_processing import create_file_in_folder +from swarms import SpreadSheetSwarm +# Ensure you have an environment variable or default workspace dir +workspace_dir = os.getenv("WORKSPACE_DIR", "./workspace") +def create_agents(num_agents: int): + """ + Create a list of agent instances. + + Args: + num_agents (int): The number of agents to create. + + Returns: + List[Agent]: List of created Agent objects. + """ + agents = [] + for i in range(num_agents): + agent_name = f"Agent-{i + 1}" + agents.append(Agent(agent_name=agent_name)) + return agents +def main(): + # Number of agents to create + num_agents = 5 + # Create the agents + agents = create_agents(num_agents) + # Initialize the swarm with agents and other configurations + swarm = SpreadSheetSwarm( + name="Test-Swarm", + description="A swarm for testing purposes.", + agents=agents, + autosave_on=True, + max_loops=2, + workspace_dir=workspace_dir + ) + # Run a sample task in the swarm (synchronously) + task = "process_data" + + # Ensure the run method is synchronous + swarm_metadata = swarm.run(task) # Assuming this is made synchronous + # Print swarm metadata after task completion + print("Swarm Metadata:") + print(swarm_metadata) + # Check if CSV file has been created and saved + if os.path.exists(swarm.save_file_path): + print(f"Metadata saved to: {swarm.save_file_path}") + else: + print(f"Metadata not saved correctly. Check the save path.") + # Test saving metadata to JSON file + swarm.data_to_json_file() + # Test exporting metadata to JSON + swarm_json = swarm.export_to_json() + print("Exported JSON metadata:") + print(swarm_json) + # Log agent data + print("Logging agent data:") + print(log_agent_data(swarm.metadata.model_dump())) +# Run the synchronous main function +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/tests/structs/test_spreadsheet_swarm_v2.py b/tests/structs/test_spreadsheet_swarm_v2.py new file mode 100644 index 00000000..5f3ff325 --- /dev/null +++ b/tests/structs/test_spreadsheet_swarm_v2.py @@ -0,0 +1,296 @@ +import os +import json +import asyncio +from typing import List, Dict, Any, Optional, Tuple +from datetime import datetime +from loguru import logger +import aiofiles +from swarms.structs.agent import Agent +from swarms.structs.spreadsheet_swarm import ( + SpreadSheetSwarm, + AgentOutput, + SwarmRunMetadata +) + +# Configure Loguru logger +logger.remove() # Remove default handler +logger.add( + "spreadsheet_swarm_{time}.log", + rotation="1 MB", + retention="7 days", + level="DEBUG", + format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}", + backtrace=True, + diagnose=True +) + +class SpreadSheetSwarmTestSuite: + """ + Enhanced test suite for SpreadSheetSwarm functionality. + + Provides comprehensive testing of swarm initialization, CSV operations, + task execution, and data persistence with detailed logging and error tracking. + """ + + def __init__(self, test_data_path: str = "./test_data"): + """ + Initialize test suite with configuration. + + Args: + test_data_path (str): Directory for test data and outputs + """ + self.test_data_path = test_data_path + self._setup_test_environment() + + def _setup_test_environment(self) -> None: + """Setup required test directories and resources.""" + try: + os.makedirs(self.test_data_path, exist_ok=True) + logger.info(f"Test environment initialized at {self.test_data_path}") + except Exception as e: + logger.error(f"Failed to setup test environment: {e}") + raise + + async def create_test_csv(self) -> str: + """ + Create a test CSV file with agent configurations. + + Returns: + str: Path to created CSV file + """ + try: + csv_content = """agent_name,description,system_prompt,task +test_agent_1,Test Agent 1,System prompt 1,Task 1 +test_agent_2,Test Agent 2,System prompt 2,Task 2 +test_agent_3,Test Agent 3,System prompt 3,Task 3""" + + file_path = os.path.join(self.test_data_path, "test_agents.csv") + async with aiofiles.open(file_path, 'w') as f: + await f.write(csv_content) + + logger.debug(f"Created test CSV at {file_path} with content:\n{csv_content}") + return file_path + except Exception as e: + logger.error(f"Failed to create test CSV: {e}") + raise + + def create_test_agent(self, name: str, **kwargs) -> Agent: + """ + Create a test agent with specified configuration. + + Args: + name (str): Agent name + **kwargs: Additional agent configuration + + Returns: + Agent: Configured test agent + """ + try: + config = { + "agent_name": name, + "system_prompt": f"Test prompt for {name}", + "model_name": "gpt-4o-mini", + "max_loops": 1, + "autosave": True, + "verbose": True, + **kwargs + } + agent = Agent(**config) + logger.debug(f"Created test agent: {name}") + return agent + except Exception as e: + logger.error(f"Failed to create agent {name}: {e}") + raise + + async def test_swarm_initialization(self) -> Tuple[bool, str]: + """ + Test swarm initialization with various configurations. + + Returns: + Tuple[bool, str]: Success status and message + """ + try: + logger.info("Starting swarm initialization test") + + # Test basic initialization + agents = [ + self.create_test_agent("agent1"), + self.create_test_agent("agent2", max_loops=2) + ] + + swarm = SpreadSheetSwarm( + name="Test Swarm", + description="Test Description", + agents=agents, + max_loops=2 + ) + + # Verify configuration + assert swarm.name == "Test Swarm" + assert swarm.description == "Test Description" + assert len(swarm.agents) == 2 + assert swarm.max_loops == 2 + + # Test empty initialization + empty_swarm = SpreadSheetSwarm() + assert len(empty_swarm.agents) == 0 + + logger.info("Swarm initialization test passed") + return True, "Initialization successful" + except Exception as e: + logger.error(f"Swarm initialization test failed: {e}") + return False, str(e) + + async def test_csv_operations(self) -> Tuple[bool, str]: + """ + Test CSV loading and saving operations. + + Returns: + Tuple[bool, str]: Success status and message + """ + try: + logger.info("Starting CSV operations test") + + # Test CSV loading + csv_path = await self.create_test_csv() + swarm = SpreadSheetSwarm(load_path=csv_path) + await swarm._load_from_csv() + + assert len(swarm.agents) == 3 + assert len(swarm.agent_configs) == 3 + + # Test CSV saving + output_path = os.path.join(self.test_data_path, "test_output.csv") + swarm.save_file_path = output_path + swarm._track_output("test_agent_1", "Test task", "Test result") + await swarm._save_to_csv() + + assert os.path.exists(output_path) + + # Cleanup + os.remove(csv_path) + os.remove(output_path) + + logger.info("CSV operations test passed") + return True, "CSV operations successful" + except Exception as e: + logger.error(f"CSV operations test failed: {e}") + return False, str(e) + + async def test_task_execution(self) -> Tuple[bool, str]: + """ + Test task execution and output tracking. + + Returns: + Tuple[bool, str]: Success status and message + """ + try: + logger.info("Starting task execution test") + + agents = [ + self.create_test_agent("agent1"), + self.create_test_agent("agent2") + ] + swarm = SpreadSheetSwarm(agents=agents, max_loops=1) + + # Run test tasks + test_tasks = ["Task 1", "Task 2"] + for task in test_tasks: + await swarm._run_tasks(task) + + # Verify execution + assert swarm.metadata.tasks_completed == 4 # 2 agents × 2 tasks + assert len(swarm.metadata.outputs) == 4 + + # Test output tracking + assert all(output.agent_name in ["agent1", "agent2"] + for output in swarm.metadata.outputs) + + logger.info("Task execution test passed") + return True, "Task execution successful" + except Exception as e: + logger.error(f"Task execution test failed: {e}") + return False, str(e) + + async def run_all_tests(self) -> Dict[str, Any]: + """ + Execute all test cases and generate report. + + Returns: + Dict[str, Any]: Comprehensive test results and metrics + """ + start_time = datetime.now() + results = [] + + test_cases = [ + ("Swarm Initialization", self.test_swarm_initialization), + ("CSV Operations", self.test_csv_operations), + ("Task Execution", self.test_task_execution) + ] + + for test_name, test_func in test_cases: + try: + success, message = await test_func() + results.append({ + "test_name": test_name, + "success": success, + "message": message, + "timestamp": datetime.now().isoformat() + }) + except Exception as e: + logger.error(f"Unexpected error in {test_name}: {e}") + results.append({ + "test_name": test_name, + "success": False, + "message": f"Unexpected error: {e}", + "timestamp": datetime.now().isoformat() + }) + + # Generate report + duration = (datetime.now() - start_time).total_seconds() + total_tests = len(results) + passed_tests = sum(1 for r in results if r["success"]) + + report = { + "summary": { + "total_tests": total_tests, + "passed_tests": passed_tests, + "failed_tests": total_tests - passed_tests, + "success_rate": f"{(passed_tests/total_tests)*100:.2f}%", + "duration_seconds": duration + }, + "test_results": results, + "timestamp": datetime.now().isoformat() + } + + # Save report + report_path = os.path.join( + self.test_data_path, + f"test_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" + ) + async with aiofiles.open(report_path, 'w') as f: + await f.write(json.dumps(report, indent=2)) + + logger.info(f"Test suite completed. Report saved to {report_path}") + return report + +async def main(): + """Entry point for test execution.""" + logger.info("Starting SpreadSheetSwarm Test Suite") + + test_suite = SpreadSheetSwarmTestSuite() + report = await test_suite.run_all_tests() + + # Print summary + print("\n" + "="*50) + print("SpreadSheetSwarm Test Results") + print("="*50) + print(f"Total Tests: {report['summary']['total_tests']}") + print(f"Passed Tests: {report['summary']['passed_tests']}") + print(f"Failed Tests: {report['summary']['failed_tests']}") + print(f"Success Rate: {report['summary']['success_rate']}") + print(f"Duration: {report['summary']['duration_seconds']:.2f} seconds") + print("="*50) + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file