From 48140c58c6606e98e82c4eabb16f58648f881c68 Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Sun, 5 Jan 2025 16:03:48 -0500 Subject: [PATCH] [WRAP UP] --- csvagent_example.py | 48 ++++ run_all_tests.py | 108 ++++++++ swarm_cloud_api/api.py | 226 ---------------- swarm_cloud_api/requirements.txt | 10 - swarm_cloud_api/swarm_cloud_code.py | 369 -------------------------- swarm_cloud_api/tests.py | 160 ----------- swarms/structs/csv_to_agent.py | 171 +++++------- swarms/structs/graph_swarm.py | 13 +- tests/Dockerfile | 17 +- tests/telemetry/test_posthog_utils.py | 62 ----- tests/telemetry/test_user_utils.py | 14 + tests/utils/test_check_device.py | 66 ----- 12 files changed, 253 insertions(+), 1011 deletions(-) create mode 100644 csvagent_example.py create mode 100644 run_all_tests.py delete mode 100644 swarm_cloud_api/api.py delete mode 100644 swarm_cloud_api/requirements.txt delete mode 100644 swarm_cloud_api/swarm_cloud_code.py delete mode 100644 swarm_cloud_api/tests.py delete mode 100644 tests/telemetry/test_posthog_utils.py delete mode 100644 tests/utils/test_check_device.py diff --git a/csvagent_example.py b/csvagent_example.py new file mode 100644 index 00000000..e781335a --- /dev/null +++ b/csvagent_example.py @@ -0,0 +1,48 @@ +# Example usage +from pathlib import Path +from swarms.structs.csv_to_agent import AgentLoader, AgentValidationError + + +if __name__ == "__main__": + # Example agent configurations + agent_configs = [ + { + "agent_name": "Financial-Analysis-Agent", + "system_prompt": "You are a financial expert...", + "model_name": "gpt-4o-mini", # Updated to correct model name + "max_loops": 1, + "autosave": True, + "dashboard": False, + "verbose": True, + "dynamic_temperature": True, + "saved_state_path": "finance_agent.json", + "user_name": "swarms_corp", + "retry_attempts": 3, + "context_length": 200000, + "return_step_meta": False, + "output_type": "string", + "streaming": False, + } + ] + + try: + # Initialize CSV manager + csv_manager = AgentLoader(Path("agents.csv")) + + # Create CSV with initial agents + csv_manager.create_agent_csv(agent_configs) + + # Load agents from CSV + agents = csv_manager.load_agents() + + # Use an agent + if agents: + financial_agent = agents[0] + financial_agent.run( + "How can I establish a ROTH IRA to buy stocks and get a tax break?" + ) + + except AgentValidationError as e: + print(f"Validation error: {e}") + except Exception as e: + print(f"Unexpected error: {e}") diff --git a/run_all_tests.py b/run_all_tests.py new file mode 100644 index 00000000..b7ba90e2 --- /dev/null +++ b/run_all_tests.py @@ -0,0 +1,108 @@ +import os +import subprocess +import logging +import time +import psutil + +# Configure logging +logging.basicConfig( + filename="test_runner.log", + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", +) + + +def run_tests_in_subfolders( + base_folders: list, + file_extension=".py", + python_interpreter="python", +): + report_file = "test_report.txt" + + with open(report_file, "w") as report: + for base_folder in base_folders: + if not os.path.exists(base_folder): + logging.warning( + f"Base folder does not exist: {base_folder}" + ) + continue + + for root, dirs, files in os.walk(base_folder): + for file in files: + if file.endswith(file_extension): + file_path = os.path.join(root, file) + try: + logging.info(f"Running {file_path}...") + + # Start time measurement + start_time = time.time() + + # Get initial memory usage + process = psutil.Process(os.getpid()) + initial_memory = ( + process.memory_info().rss + ) # Resident Set Size + + result = subprocess.run( + [python_interpreter, file_path], + capture_output=True, + text=True, + ) + + # End time measurement + end_time = time.time() + + # Get final memory usage + final_memory = process.memory_info().rss + + # Calculate metrics + execution_time = end_time - start_time + memory_used = ( + final_memory - initial_memory + ) + + report.write(f"Running {file_path}:\n") + report.write(result.stdout) + report.write(result.stderr) + report.write( + f"\nExecution Time: {execution_time:.2f} seconds\n" + ) + report.write( + f"Memory Used: {memory_used / (1024 ** 2):.2f} MB\n" + ) # Convert to MB + report.write("\n" + "-" * 40 + "\n") + + logging.info( + f"Completed {file_path} with return code {result.returncode}" + ) + logging.info( + f"Execution Time: {execution_time:.2f} seconds, Memory Used: {memory_used / (1024 ** 2):.2f} MB" + ) + + except FileNotFoundError: + logging.error( + f"File not found: {file_path}" + ) + report.write( + f"File not found: {file_path}\n" + ) + except Exception as e: + logging.error( + f"Error running {file_path}: {e}" + ) + report.write( + f"Error running {file_path}: {e}\n" + ) + + +# Example usage +base_folders = [ + "folder1", + "folder2", +] # Replace with your actual folder names +file_extension = ".py" # Specify the file extension to run +python_interpreter = "python" # Specify the Python interpreter to use + +run_tests_in_subfolders( + base_folders, file_extension, python_interpreter +) diff --git a/swarm_cloud_api/api.py b/swarm_cloud_api/api.py deleted file mode 100644 index 94c06a39..00000000 --- a/swarm_cloud_api/api.py +++ /dev/null @@ -1,226 +0,0 @@ -""" -SkyServe API: Production-grade FastAPI server for SimpleSkyServe. - -This module provides a REST API interface for managing SkyPilot services with -proper error handling, validation, and production configurations. -""" - -import multiprocessing -import os -from typing import List, Optional - -from fastapi import FastAPI, HTTPException, status -from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import JSONResponse -from loguru import logger -from pydantic import BaseModel, Field -from pydantic.v1 import validator -from swarm_cloud_code import ServiceConfig, SimpleSkyServe, UpdateMode - -# Calculate optimal number of workers -CPU_COUNT = multiprocessing.cpu_count() -WORKERS = CPU_COUNT * 2 - -# Configure logging -logger.add( - "logs/skyserve-api.log", - rotation="500 MB", - retention="10 days", - level="INFO", -) - -# Initialize FastAPI app -app = FastAPI( - title="SkyServe API", - description="REST API for managing SkyPilot services", - version="1.0.0", - docs_url="/docs", - redoc_url="/redoc", -) - -# Configure CORS -app.add_middleware( - CORSMiddleware, - allow_origins=os.getenv("ALLOWED_ORIGINS", "*").split(","), - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) - - -# Pydantic models for request/response validation -class ServiceConfigRequest(BaseModel): - """Request model for service configuration.""" - - code: str = Field( - ..., description="Python code to run as a service" - ) - requirements: Optional[List[str]] = Field( - default=None, description="List of pip packages" - ) - envs: Optional[dict] = Field( - default=None, description="Environment variables" - ) - name: Optional[str] = Field( - default=None, description="Service name" - ) - num_cpus: int = Field( - default=2, ge=1, description="Number of CPUs" - ) - memory: int = Field(default=4, ge=1, description="Memory in GB") - use_spot: bool = Field( - default=False, description="Use spot instances" - ) - num_nodes: int = Field( - default=1, ge=1, description="Number of nodes" - ) - - @validator("name") - def validate_name(cls, v): - if v and not v.isalnum(): - raise ValueError("Service name must be alphanumeric") - return v - - -class DeploymentResponse(BaseModel): - """Response model for deployment information.""" - - service_name: str - endpoint: str - - -class ServiceStatusResponse(BaseModel): - """Response model for service status.""" - - name: str - status: str - versions: List[int] - replicas: int - resources: str - uptime: int - endpoint: Optional[str] - - -@app.post( - "/services/", - response_model=DeploymentResponse, - status_code=status.HTTP_201_CREATED, - tags=["services"], -) -async def create_service(config: ServiceConfigRequest): - """Deploy a new service.""" - try: - service_config = ServiceConfig( - code=config.code, - requirements=config.requirements, - envs=config.envs, - name=config.name, - num_cpus=config.num_cpus, - memory=config.memory, - use_spot=config.use_spot, - num_nodes=config.num_nodes, - ) - name, endpoint = SimpleSkyServe.deploy(service_config) - return {"service_name": name, "endpoint": endpoint} - except Exception as e: - logger.error(f"Failed to create service: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=str(e), - ) - - -@app.get( - "/services/", - response_model=List[ServiceStatusResponse], - tags=["services"], -) -async def list_services(name: Optional[str] = None): - """Get status of all services or a specific service.""" - try: - deployments = SimpleSkyServe.get_deployments(name) - return deployments - except Exception as e: - logger.error(f"Failed to list services: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=str(e), - ) - - -@app.put( - "/services/{service_name}", - status_code=status.HTTP_200_OK, - tags=["services"], -) -async def update_service( - service_name: str, - config: ServiceConfigRequest, - mode: UpdateMode = UpdateMode.GRADUAL, -): - """Update an existing service.""" - try: - service_config = ServiceConfig( - code=config.code, - requirements=config.requirements, - envs=config.envs, - name=config.name, - num_cpus=config.num_cpus, - memory=config.memory, - use_spot=config.use_spot, - num_nodes=config.num_nodes, - ) - SimpleSkyServe.update(service_name, service_config, mode) - return { - "message": f"Service {service_name} updated successfully" - } - except Exception as e: - logger.error(f"Failed to update service: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=str(e), - ) - - -@app.delete( - "/services/{service_name}", - status_code=status.HTTP_204_NO_CONTENT, - tags=["services"], -) -async def delete_service(service_name: str, purge: bool = False): - """Delete a service.""" - try: - SimpleSkyServe.delete(service_name, purge) - except Exception as e: - logger.error(f"Failed to delete service: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=str(e), - ) - - -@app.exception_handler(Exception) -async def general_exception_handler(request, exc): - """Global exception handler.""" - logger.error(f"Unhandled exception: {str(exc)}") - return JSONResponse( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - content={"detail": "Internal server error"}, - ) - - -# Entry point for uvicorn -if __name__ == "__main__": - import uvicorn - - uvicorn.run( - "api:app", - host="0.0.0.0", - port=8000, - workers=WORKERS, - log_level="info", - reload=False, # Disable in production - proxy_headers=True, - forwarded_allow_ips="*", - access_log=True, - ) diff --git a/swarm_cloud_api/requirements.txt b/swarm_cloud_api/requirements.txt deleted file mode 100644 index dd34a4e7..00000000 --- a/swarm_cloud_api/requirements.txt +++ /dev/null @@ -1,10 +0,0 @@ -fastapi -uvicorn[standard] -pydantic -loguru -python-multipart -python-jose[cryptography] -passlib[bcrypt] -gunicorn -prometheus-fastapi-instrumentator -httpx \ No newline at end of file diff --git a/swarm_cloud_api/swarm_cloud_code.py b/swarm_cloud_api/swarm_cloud_code.py deleted file mode 100644 index 1216644a..00000000 --- a/swarm_cloud_api/swarm_cloud_code.py +++ /dev/null @@ -1,369 +0,0 @@ -""" -SimpleSkyServe: A simplified interface for SkyPilot's serve functionality. - -This module provides an easy-to-use interface for deploying, managing, updating and monitoring -services using SkyPilot's serve functionality. It supports the full lifecycle of services -including deployment, updates, status monitoring, and cleanup. - -Key Features: -- Simple deployment with code and requirements -- Service updates with different update modes -- Status monitoring and deployment fetching -- Service cleanup and deletion -""" - -from enum import Enum -from dataclasses import dataclass -from typing import Dict, List, Optional, Tuple, Union -import tempfile - -from loguru import logger - - -class UpdateMode(Enum): - """Update modes for service updates. - - IMMEDIATE: Update all replicas immediately - GRADUAL: Update replicas gradually with zero downtime - """ - - IMMEDIATE = "immediate" - GRADUAL = "gradual" - - -@dataclass -class ServiceConfig: - """Configuration for a SkyPilot service. - - Attributes: - code: Python code to run as a service - requirements: List of pip packages required by the service - envs: Environment variables to set for the service - name: Optional name for the service (auto-generated if not provided) - num_cpus: Number of CPUs to request (default: 2) - memory: Memory in GB to request (default: 4) - use_spot: Whether to use spot instances (default: False) - """ - - code: str - requirements: Optional[List[str]] = None - envs: Optional[Dict[str, str]] = None - name: Optional[str] = None - num_cpus: int = 2 - memory: int = 4 - use_spot: bool = False - num_nodes: int = 1 - - -class SimpleSkyServe: - """Simple interface for SkyPilot serve functionality.""" - - @staticmethod - def deploy(config: ServiceConfig) -> Tuple[str, str]: - """Deploy a new service using the provided configuration. - - Args: - config: ServiceConfig object containing service configuration - - Returns: - Tuple of (service_name: str, endpoint: str) - - Raises: - ValueError: If the configuration is invalid - RuntimeError: If deployment fails - """ - logger.info("Deploying new service...") - - # Create temporary files for setup and service code - with tempfile.NamedTemporaryFile( - mode="w", suffix=".txt" - ) as req_file, tempfile.NamedTemporaryFile( - mode="w", suffix=".py" - ) as code_file: - - # Write requirements if provided - setup_cmd = "" - if config.requirements: - req_file.write("\n".join(config.requirements)) - req_file.flush() - setup_cmd = f"pip install -r {req_file.name}" - - # Write service code - code_file.write(config.code) - code_file.flush() - - # Create SkyPilot task - task = sky.Task( - name=config.name, - setup=setup_cmd, - run=f"python {code_file.name}", - envs=config.envs, - num_nodes=config.num_nodes, - ) - - # Set resource requirements - resources = sky.Resources( - cpus=config.num_cpus, - memory=config.memory, - use_spot=config.use_spot, - ) - task.set_resources(resources) - - try: - # Deploy the service - service_name, endpoint = sky.serve.up( - task, service_name=config.name - ) - logger.success( - f"Service deployed successfully at {endpoint}" - ) - return service_name, endpoint - except Exception as e: - logger.error(f"Failed to deploy service: {str(e)}") - raise RuntimeError( - f"Service deployment failed: {str(e)}" - ) from e - - @staticmethod - def status(service_name: Optional[str] = None) -> List[Dict]: - """Get status of services. - - Args: - service_name: Optional name of specific service to get status for - If None, returns status of all services - - Returns: - List of service status dictionaries containing: - - name: Service name - - status: Current status - - endpoint: Service endpoint - - uptime: Service uptime in seconds - ...and other service metadata - """ - logger.info( - f"Getting status for service: {service_name or 'all'}" - ) - try: - status_list = sky.serve.status(service_name) - logger.debug( - f"Retrieved status for {len(status_list)} services" - ) - return status_list - except Exception as e: - logger.error(f"Failed to get service status: {str(e)}") - raise RuntimeError( - f"Failed to get service status: {str(e)}" - ) from e - - @staticmethod - def update( - service_name: str, - config: ServiceConfig, - mode: UpdateMode = UpdateMode.GRADUAL, - ) -> None: - """Update an existing service with new configuration. - - Args: - service_name: Name of service to update - config: New service configuration - mode: Update mode (IMMEDIATE or GRADUAL) - - Raises: - ValueError: If service doesn't exist or config is invalid - RuntimeError: If update fails - """ - logger.info( - f"Updating service {service_name} with mode {mode.value}" - ) - - # Create temporary files for setup and service code - with tempfile.NamedTemporaryFile( - mode="w", suffix=".txt" - ) as req_file, tempfile.NamedTemporaryFile( - mode="w", suffix=".py" - ) as code_file: - - # Write requirements if provided - setup_cmd = "" - if config.requirements: - req_file.write("\n".join(config.requirements)) - req_file.flush() - setup_cmd = f"pip install -r {req_file.name}" - - # Write service code - code_file.write(config.code) - code_file.flush() - - # Create SkyPilot task for update - task = sky.Task( - name=config.name or service_name, - setup=setup_cmd, - run=f"python {code_file.name}", - envs=config.envs, - ) - - # Set resource requirements - resources = sky.Resources( - cpus=config.num_cpus, - memory=config.memory, - use_spot=config.use_spot, - ) - task.set_resources(resources) - - try: - # Update the service - sky.serve.update( - task=task, - service_name=service_name, - mode=sky.serve.UpdateMode(mode.value), - ) - logger.success( - f"Service {service_name} updated successfully" - ) - except Exception as e: - logger.error(f"Failed to update service: {str(e)}") - raise RuntimeError( - f"Service update failed: {str(e)}" - ) from e - - @staticmethod - def get_deployments( - service_name: Optional[str] = None, - ) -> List[Dict]: - """Get detailed information about service deployments. - - Args: - service_name: Optional name of specific service to get deployments for - If None, returns deployments for all services - - Returns: - List of deployment dictionaries containing: - - name: Service name - - versions: List of deployed versions - - active_version: Currently active version - - replicas: Number of replicas - - resources: Resource usage - - status: Deployment status - """ - logger.info( - f"Fetching deployments for: {service_name or 'all services'}" - ) - try: - status_list = sky.serve.status(service_name) - deployments = [] - - for status in status_list: - deployment = { - "name": status["name"], - "versions": status["active_versions"], - "status": status["status"], - "replicas": len(status.get("replica_info", [])), - "resources": status.get( - "requested_resources_str", "" - ), - "uptime": status.get("uptime", 0), - "endpoint": None, - } - - # Extract endpoint if available - if status.get("load_balancer_port"): - deployment["endpoint"] = ( - f"http://{status.get('controller_addr')}:{status['load_balancer_port']}" - ) - - deployments.append(deployment) - - logger.debug(f"Retrieved {len(deployments)} deployments") - return deployments - - except Exception as e: - logger.error(f"Failed to fetch deployments: {str(e)}") - raise RuntimeError( - f"Failed to fetch deployments: {str(e)}" - ) from e - - @staticmethod - def delete( - service_name: Union[str, List[str]], purge: bool = False - ) -> None: - """Delete one or more services. - - Args: - service_name: Name of service(s) to delete - purge: Whether to purge services in failed status - - Raises: - RuntimeError: If deletion fails - """ - names = ( - [service_name] - if isinstance(service_name, str) - else service_name - ) - logger.info(f"Deleting services: {names}") - try: - sky.serve.down(service_names=names, purge=purge) - logger.success(f"Successfully deleted services: {names}") - except Exception as e: - logger.error(f"Failed to delete services: {str(e)}") - raise RuntimeError( - f"Service deletion failed: {str(e)}" - ) from e - - -# # Example usage: -# if __name__ == "__main__": -# from time import sleep -# # Configuration for a simple FastAPI service -# config = ServiceConfig( -# code=""" -# from fastapi import FastAPI -# app = FastAPI() - -# @app.get("/") -# def read_root(): -# return {"Hello": "World"} -# """, -# requirements=["fastapi", "uvicorn"], -# envs={"PORT": "8000"}, -# name="fastapi-demo" -# ) - -# # Deploy the service -# name, endpoint = SimpleSkyServe.deploy(config) -# print(f"Service deployed at: {endpoint}") - -# # Get service status -# status = SimpleSkyServe.status(name) -# print(f"Service status: {status}") - -# # Get deployment information -# deployments = SimpleSkyServe.get_deployments(name) -# print(f"Deployment info: {deployments}") - -# # Update the service with new code -# new_config = ServiceConfig( -# code=""" -# from fastapi import FastAPI -# app = FastAPI() - -# @app.get("/") -# def read_root(): -# return {"Hello": "Updated World"} -# """, -# requirements=["fastapi", "uvicorn"], -# envs={"PORT": "8000"} -# ) - -# SimpleSkyServe.update(name, new_config, mode=UpdateMode.GRADUAL) -# print("Service updated") - -# # Wait for update to complete -# sleep(30) - -# # Check status after update -# status = SimpleSkyServe.status(name) -# print(f"Updated service status: {status}") - -# # Delete the service -# SimpleSkyServe.delete(name) diff --git a/swarm_cloud_api/tests.py b/swarm_cloud_api/tests.py deleted file mode 100644 index 569d9ed9..00000000 --- a/swarm_cloud_api/tests.py +++ /dev/null @@ -1,160 +0,0 @@ -""" -Simple test script for SkyServe API using requests. -No test framework dependencies - just pure requests and assertions. -""" - -import time -import requests -from typing import Any - -# API Configuration -BASE_URL = "http://localhost:8000" -HEADERS = {"Content-Type": "application/json"} - - -def assert_equals(actual: Any, expected: Any, message: str = ""): - """Simple assertion helper.""" - if actual != expected: - raise AssertionError( - f"{message}\nExpected: {expected}\nGot: {actual}" - ) - - -def test_create_service() -> str: - """Test service creation and return the service name.""" - print("\n🧪 Testing service creation...") - - payload = { - "code": """ -from fastapi import FastAPI -app = FastAPI() - -@app.get("/") -def read_root(): - return {"Hello": "World"} - """, - "requirements": ["fastapi", "uvicorn"], - "name": "test_service", - "num_cpus": 2, - "memory": 4, - } - - response = requests.post( - f"{BASE_URL}/services/", json=payload, headers=HEADERS - ) - - assert_equals( - response.status_code, 201, "Service creation failed" - ) - data = response.json() - assert "service_name" in data, "Response missing service_name" - assert "endpoint" in data, "Response missing endpoint" - - print("✅ Service created successfully!") - return data["service_name"] - - -def test_list_services(expected_service_name: str): - """Test listing services.""" - print("\n🧪 Testing service listing...") - - response = requests.get(f"{BASE_URL}/services/") - assert_equals(response.status_code, 200, "Service listing failed") - - services = response.json() - assert isinstance(services, list), "Expected list of services" - - # Find our service in the list - service_found = False - for service in services: - if service["name"] == expected_service_name: - service_found = True - break - - assert ( - service_found - ), f"Created service {expected_service_name} not found in list" - print("✅ Services listed successfully!") - - -def test_update_service(service_name: str): - """Test service update.""" - print("\n🧪 Testing service update...") - - update_payload = { - "code": """ -from fastapi import FastAPI -app = FastAPI() - -@app.get("/") -def read_root(): - return {"Hello": "Updated World"} - """, - "requirements": ["fastapi", "uvicorn"], - "name": service_name, - "num_cpus": 2, - "memory": 4, - } - - response = requests.put( - f"{BASE_URL}/services/{service_name}", - json=update_payload, - headers=HEADERS, - params={"mode": "gradual"}, - ) - - assert_equals(response.status_code, 200, "Service update failed") - print("✅ Service updated successfully!") - - -def test_delete_service(service_name: str): - """Test service deletion.""" - print("\n🧪 Testing service deletion...") - - response = requests.delete(f"{BASE_URL}/services/{service_name}") - assert_equals( - response.status_code, 204, "Service deletion failed" - ) - - # Verify service is gone - list_response = requests.get(f"{BASE_URL}/services/") - services = list_response.json() - for service in services: - if service["name"] == service_name: - raise AssertionError( - f"Service {service_name} still exists after deletion" - ) - - print("✅ Service deleted successfully!") - - -def run_tests(): - """Run all tests in sequence.""" - try: - print("🚀 Starting API tests...") - - # Run tests in sequence - service_name = test_create_service() - - # Wait a bit for service to be fully ready - print("⏳ Waiting for service to be ready...") - time.sleep(5) - - test_list_services(service_name) - test_update_service(service_name) - test_delete_service(service_name) - - print("\n✨ All tests passed successfully! ✨") - - except AssertionError as e: - print(f"\n❌ Test failed: {str(e)}") - raise - except Exception as e: - print(f"\n❌ Unexpected error: {str(e)}") - raise - finally: - print("\n🏁 Tests completed") - - -if __name__ == "__main__": - run_tests() diff --git a/swarms/structs/csv_to_agent.py b/swarms/structs/csv_to_agent.py index 4488a2d6..aa6fdf73 100644 --- a/swarms/structs/csv_to_agent.py +++ b/swarms/structs/csv_to_agent.py @@ -7,14 +7,9 @@ from typing import ( from dataclasses import dataclass import csv from pathlib import Path -import logging from enum import Enum from swarms import Agent -# Set up logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - class ModelName(str, Enum): """Valid model names for swarms agents""" @@ -141,9 +136,7 @@ class AgentValidator: str(e), str(e.__class__.__name__), str(config) ) - -@dataclass -class AgentCSV: +class AgentLoader: """Class to manage agents through CSV with type safety""" csv_path: Path @@ -184,7 +177,7 @@ class AgentCSV: ) validated_agents.append(validated_config) except AgentValidationError as e: - logger.error( + print( f"Validation error for agent {agent.get('agent_name', 'unknown')}: {e}" ) raise @@ -194,17 +187,25 @@ class AgentCSV: writer.writeheader() writer.writerows(validated_agents) - logger.info( + print( f"Created CSV with {len(validated_agents)} agents at {self.csv_path}" ) - def load_agents(self) -> List[Agent]: - """Load and create agents from CSV with validation""" - if not self.csv_path.exists(): - raise FileNotFoundError( - f"CSV file not found at {self.csv_path}" - ) - + def load_agents(self, file_type: str = "csv") -> List[Agent]: + """Load and create agents from CSV or JSON with validation""" + if file_type == "csv": + if not self.csv_path.exists(): + raise FileNotFoundError( + f"CSV file not found at {self.csv_path}" + ) + return self._load_agents_from_csv() + elif file_type == "json": + return self._load_agents_from_json() + else: + raise ValueError("Unsupported file type. Use 'csv' or 'json'.") + + def _load_agents_from_csv(self) -> List[Agent]: + """Load agents from a CSV file""" agents: List[Agent] = [] with open(self.csv_path, "r") as f: reader = csv.DictReader(f) @@ -213,105 +214,61 @@ class AgentCSV: validated_config = AgentValidator.validate_config( row ) - - agent = Agent( - agent_name=validated_config["agent_name"], - system_prompt=validated_config[ - "system_prompt" - ], - model_name=validated_config["model_name"], - max_loops=validated_config["max_loops"], - autosave=validated_config["autosave"], - dashboard=validated_config["dashboard"], - verbose=validated_config["verbose"], - dynamic_temperature_enabled=validated_config[ - "dynamic_temperature" - ], - saved_state_path=validated_config[ - "saved_state_path" - ], - user_name=validated_config["user_name"], - retry_attempts=validated_config[ - "retry_attempts" - ], - context_length=validated_config[ - "context_length" - ], - return_step_meta=validated_config[ - "return_step_meta" - ], - output_type=validated_config["output_type"], - streaming_on=validated_config["streaming"], - ) + agent = self._create_agent(validated_config) agents.append(agent) except AgentValidationError as e: - logger.error( + print( f"Skipping invalid agent configuration: {e}" ) continue - logger.info( - f"Loaded {len(agents)} agents from {self.csv_path}" - ) + print(f"Loaded {len(agents)} agents from {self.csv_path}") return agents - def add_agent(self, agent_config: Dict[str, Any]) -> None: - """Add a new validated agent configuration to CSV""" - validated_config = AgentValidator.validate_config( - agent_config - ) + def _load_agents_from_json(self) -> List[Agent]: + """Load agents from a JSON file""" + import json - with open(self.csv_path, "a", newline="") as f: - writer = csv.DictWriter(f, fieldnames=self.headers) - writer.writerow(validated_config) - - logger.info( - f"Added new agent {validated_config['agent_name']} to {self.csv_path}" - ) + if not self.csv_path.with_suffix('.json').exists(): + raise FileNotFoundError( + f"JSON file not found at {self.csv_path.with_suffix('.json')}" + ) + agents: List[Agent] = [] + with open(self.csv_path.with_suffix('.json'), "r") as f: + agents_data = json.load(f) + for agent in agents_data: + try: + validated_config = AgentValidator.validate_config( + agent + ) + agent = self._create_agent(validated_config) + agents.append(agent) + except AgentValidationError as e: + print( + f"Skipping invalid agent configuration: {e}" + ) + continue -# Example usage -if __name__ == "__main__": - # Example agent configurations - agent_configs = [ - { - "agent_name": "Financial-Analysis-Agent", - "system_prompt": "You are a financial expert...", - "model_name": "gpt-4o-mini", # Updated to correct model name - "max_loops": 1, - "autosave": True, - "dashboard": False, - "verbose": True, - "dynamic_temperature": True, - "saved_state_path": "finance_agent.json", - "user_name": "swarms_corp", - "retry_attempts": 3, - "context_length": 200000, - "return_step_meta": False, - "output_type": "string", - "streaming": False, - } - ] - - try: - # Initialize CSV manager - csv_manager = AgentCSV(Path("agents.csv")) - - # Create CSV with initial agents - csv_manager.create_agent_csv(agent_configs) - - # Load agents from CSV - agents = csv_manager.load_agents() - - # Use an agent - if agents: - financial_agent = agents[0] - response = financial_agent.run( - "How can I establish a ROTH IRA to buy stocks and get a tax break?" - ) - print(response) + print(f"Loaded {len(agents)} agents from {self.csv_path.with_suffix('.json')}") + return agents - except AgentValidationError as e: - logger.error(f"Validation error: {e}") - except Exception as e: - logger.error(f"Unexpected error: {e}") + def _create_agent(self, validated_config: AgentConfigDict) -> Agent: + """Create an Agent instance from validated configuration""" + return Agent( + agent_name=validated_config["agent_name"], + system_prompt=validated_config["system_prompt"], + model_name=validated_config["model_name"], + max_loops=validated_config["max_loops"], + autosave=validated_config["autosave"], + dashboard=validated_config["dashboard"], + verbose=validated_config["verbose"], + dynamic_temperature_enabled=validated_config["dynamic_temperature"], + saved_state_path=validated_config["saved_state_path"], + user_name=validated_config["user_name"], + retry_attempts=validated_config["retry_attempts"], + context_length=validated_config["context_length"], + return_step_meta=validated_config["return_step_meta"], + output_type=validated_config["output_type"], + streaming_on=validated_config["streaming"], + ) \ No newline at end of file diff --git a/swarms/structs/graph_swarm.py b/swarms/structs/graph_swarm.py index 91054316..70f2323e 100644 --- a/swarms/structs/graph_swarm.py +++ b/swarms/structs/graph_swarm.py @@ -3,15 +3,16 @@ import json import time from concurrent.futures import ThreadPoolExecutor from datetime import datetime -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Union import networkx as nx from loguru import logger from pydantic import BaseModel, Field + +from swarms.structs.agent import Agent from swarms.utils.auto_download_check_packages import ( auto_check_and_download_package, ) -from swarms.structs.agent import Agent # Configure logging logger.add( @@ -188,14 +189,20 @@ class GraphSwarm: def __init__( self, + name: str = "graph-swarm-01", + description: str = "Graph swarm : build your own graph of agents", agents: Union[ - List[Agent], List[Tuple[Agent, List[str]]], None + List[Agent], List[Tuple[Agent, List[str]]], List[Callable] ] = None, max_workers: Optional[int] = None, swarm_name: str = "Collaborative Agent Swarm", memory_collection: str = "swarm_memory", + *args, + **kwargs, ): """Initialize GraphSwarm.""" + self.name = name + self.description = description self.graph = nx.DiGraph() self.agents: Dict[str, Agent] = {} self.dependencies: Dict[str, List[str]] = {} diff --git a/tests/Dockerfile b/tests/Dockerfile index f6e46515..89e578d7 100644 --- a/tests/Dockerfile +++ b/tests/Dockerfile @@ -1,7 +1,7 @@ # TESTING # -================== # Use an official Python runtime as a parent image -FROM python:3.9-slim +FROM python:3.11-slim # Set environment variables to make Python output unbuffered and disable the PIP cache ENV PYTHONDONTWRITEBYTECODE 1 @@ -10,6 +10,10 @@ ENV PIP_NO_CACHE_DIR off ENV PIP_DISABLE_PIP_VERSION_CHECK on ENV PIP_DEFAULT_TIMEOUT 100 +# Set environment variables for OpenAI API key and workspace directory +ENV OPENAI_API_KEY your_api_key_here +ENV WORKSPACE_DIR /path/to/your/workspace + # Set the working directory in the container WORKDIR /usr/src/app @@ -22,11 +26,8 @@ RUN pip install poetry # Disable virtualenv creation by poetry and install dependencies RUN poetry config virtualenvs.create false -# Install the 'swarms' package if it's not included in the poetry.lock -RUN pip install swarms - -# Assuming tests require pytest to run -RUN pip install pytest +# Install the 'swarms' package and any additional packages +RUN pip install swarms swarm-models swarms-memory pytest -# Run pytest on all tests in the tests directory -CMD pytest +# Run all tests in the tests directory +CMD python3 -m unittest discover -s tests \ No newline at end of file diff --git a/tests/telemetry/test_posthog_utils.py b/tests/telemetry/test_posthog_utils.py deleted file mode 100644 index 0364cb3a..00000000 --- a/tests/telemetry/test_posthog_utils.py +++ /dev/null @@ -1,62 +0,0 @@ -from unittest.mock import Mock - -import pytest - -from swarms.telemetry.posthog_utils import ( - log_activity_posthog, - posthog, -) - - -# Mock Posthog client -@pytest.fixture -def mock_posthog(): - return Mock() - - -# Mock environment variables -@pytest.fixture -def mock_env(monkeypatch): - monkeypatch.setenv("POSTHOG_API_KEY", "test_api_key") - monkeypatch.setenv("POSTHOG_HOST", "test_host") - - -# Test the log_activity_posthog decorator -def test_log_activity_posthog(mock_posthog, mock_env): - event_name = "test_event" - event_properties = {"test_property": "test_value"} - - # Create a test function with the decorator - @log_activity_posthog(event_name, **event_properties) - def test_function(): - pass - - # Call the test function - test_function() - - # Check if the Posthog capture method was called with the expected arguments - mock_posthog.capture.assert_called_once_with( - "test_user_id", event_name, event_properties - ) - - -# Test a scenario where environment variables are not set -def test_missing_env_variables(monkeypatch): - # Unset environment variables - monkeypatch.delenv("POSTHOG_API_KEY", raising=False) - monkeypatch.delenv("POSTHOG_HOST", raising=False) - - # Create a test function with the decorator - @log_activity_posthog("test_event", test_property="test_value") - def test_function(): - pass - - # Ensure that calling the test function does not raise errors - test_function() - - -# Test the Posthog client initialization -def test_posthog_client_initialization(mock_env): - assert posthog.api_key == "test_api_key" - assert posthog.host == "test_host" - assert posthog.debug is True diff --git a/tests/telemetry/test_user_utils.py b/tests/telemetry/test_user_utils.py index c7b5962c..96f32378 100644 --- a/tests/telemetry/test_user_utils.py +++ b/tests/telemetry/test_user_utils.py @@ -85,3 +85,17 @@ def test_generate_unique_identifier_edge_case(): unique_id = generate_unique_identifier() unique_ids.add(unique_id) assert len(unique_ids) == 100 # Ensure generated IDs are unique + + +def test_all(): + test_generate_user_id() + test_get_machine_id() + test_get_system_info() + test_generate_unique_identifier() + test_generate_user_id_edge_case() + test_get_machine_id_edge_case() + test_get_system_info_edge_case() + test_generate_unique_identifier_edge_case() + + +test_all() diff --git a/tests/utils/test_check_device.py b/tests/utils/test_check_device.py deleted file mode 100644 index 503a3774..00000000 --- a/tests/utils/test_check_device.py +++ /dev/null @@ -1,66 +0,0 @@ -import logging - -import torch - -from swarms.utils import check_device - -# For the purpose of the test, we're assuming that the `memory_allocated` -# and `memory_reserved` function behave the same as `torch.cuda.memory_allocated` -# and `torch.cuda.memory_reserved` - - -def test_check_device_no_cuda(monkeypatch): - # Mock torch.cuda.is_available to always return False - monkeypatch.setattr(torch.cuda, "is_available", lambda: False) - - result = check_device(log_level=logging.DEBUG) - assert result.type == "cpu" - - -def test_check_device_cuda_exception(monkeypatch): - # Mock torch.cuda.is_available to raise an exception - monkeypatch.setattr( - torch.cuda, "is_available", lambda: 1 / 0 - ) # Raises ZeroDivisionError - - result = check_device(log_level=logging.DEBUG) - assert result.type == "cpu" - - -def test_check_device_one_cuda(monkeypatch): - # Mock torch.cuda.is_available to return True - monkeypatch.setattr(torch.cuda, "is_available", lambda: True) - # Mock torch.cuda.device_count to return 1 - monkeypatch.setattr(torch.cuda, "device_count", lambda: 1) - # Mock torch.cuda.memory_allocated and torch.cuda.memory_reserved to return 0 - monkeypatch.setattr( - torch.cuda, "memory_allocated", lambda device: 0 - ) - monkeypatch.setattr( - torch.cuda, "memory_reserved", lambda device: 0 - ) - - result = check_device(log_level=logging.DEBUG) - assert len(result) == 1 - assert result[0].type == "cuda" - assert result[0].index == 0 - - -def test_check_device_multiple_cuda(monkeypatch): - # Mock torch.cuda.is_available to return True - monkeypatch.setattr(torch.cuda, "is_available", lambda: True) - # Mock torch.cuda.device_count to return 4 - monkeypatch.setattr(torch.cuda, "device_count", lambda: 4) - # Mock torch.cuda.memory_allocated and torch.cuda.memory_reserved to return 0 - monkeypatch.setattr( - torch.cuda, "memory_allocated", lambda device: 0 - ) - monkeypatch.setattr( - torch.cuda, "memory_reserved", lambda device: 0 - ) - - result = check_device(log_level=logging.DEBUG) - assert len(result) == 4 - for i in range(4): - assert result[i].type == "cuda" - assert result[i].index == i