pull/698/head
Kye Gomez 2 days ago
parent f1f113352d
commit 48140c58c6

@ -0,0 +1,48 @@
# Example usage
from pathlib import Path
from swarms.structs.csv_to_agent import AgentLoader, AgentValidationError
if __name__ == "__main__":
# Example agent configurations
agent_configs = [
{
"agent_name": "Financial-Analysis-Agent",
"system_prompt": "You are a financial expert...",
"model_name": "gpt-4o-mini", # Updated to correct model name
"max_loops": 1,
"autosave": True,
"dashboard": False,
"verbose": True,
"dynamic_temperature": True,
"saved_state_path": "finance_agent.json",
"user_name": "swarms_corp",
"retry_attempts": 3,
"context_length": 200000,
"return_step_meta": False,
"output_type": "string",
"streaming": False,
}
]
try:
# Initialize CSV manager
csv_manager = AgentLoader(Path("agents.csv"))
# Create CSV with initial agents
csv_manager.create_agent_csv(agent_configs)
# Load agents from CSV
agents = csv_manager.load_agents()
# Use an agent
if agents:
financial_agent = agents[0]
response = financial_agent.run(
"How can I establish a ROTH IRA to buy stocks and get a tax break?"
)
print(response)
except AgentValidationError as e:
print(f"Validation error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")

@ -0,0 +1,108 @@
import os
import subprocess
import logging
import time
import psutil
# Configure logging
logging.basicConfig(
filename="test_runner.log",
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
def run_tests_in_subfolders(
base_folders: list,
file_extension=".py",
python_interpreter="python",
):
report_file = "test_report.txt"
with open(report_file, "w") as report:
for base_folder in base_folders:
if not os.path.exists(base_folder):
logging.warning(
f"Base folder does not exist: {base_folder}"
)
continue
for root, dirs, files in os.walk(base_folder):
for file in files:
if file.endswith(file_extension):
file_path = os.path.join(root, file)
try:
logging.info(f"Running {file_path}...")
# Start time measurement
start_time = time.time()
# Get initial memory usage
process = psutil.Process(os.getpid())
initial_memory = (
process.memory_info().rss
) # Resident Set Size
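# Note: psutil.Process(os.getpid()) tracks the test runner's own RSS, so the
# memory delta reported below reflects this script, not the child test process.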
result = subprocess.run(
[python_interpreter, file_path],
capture_output=True,
text=True,
)
# End time measurement
end_time = time.time()
# Get final memory usage
final_memory = process.memory_info().rss
# Calculate metrics
execution_time = end_time - start_time
memory_used = (
final_memory - initial_memory
)
report.write(f"Running {file_path}:\n")
report.write(result.stdout)
report.write(result.stderr)
report.write(
f"\nExecution Time: {execution_time:.2f} seconds\n"
)
report.write(
f"Memory Used: {memory_used / (1024 ** 2):.2f} MB\n"
) # Convert to MB
report.write("\n" + "-" * 40 + "\n")
logging.info(
f"Completed {file_path} with return code {result.returncode}"
)
logging.info(
f"Execution Time: {execution_time:.2f} seconds, Memory Used: {memory_used / (1024 ** 2):.2f} MB"
)
except FileNotFoundError:
logging.error(
f"File not found: {file_path}"
)
report.write(
f"File not found: {file_path}\n"
)
except Exception as e:
logging.error(
f"Error running {file_path}: {e}"
)
report.write(
f"Error running {file_path}: {e}\n"
)
# Example usage
base_folders = [
"folder1",
"folder2",
] # Replace with your actual folder names
file_extension = ".py" # Specify the file extension to run
python_interpreter = "python" # Specify the Python interpreter to use
run_tests_in_subfolders(
base_folders, file_extension, python_interpreter
)
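# Running this script writes per-file output plus timing and memory metrics to
# test_report.txt, and logs progress to test_runner.log.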

@ -1,226 +0,0 @@
"""
SkyServe API: Production-grade FastAPI server for SimpleSkyServe.
This module provides a REST API interface for managing SkyPilot services with
proper error handling, validation, and production configurations.
"""
import multiprocessing
import os
from typing import List, Optional
from fastapi import FastAPI, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from loguru import logger
from pydantic import BaseModel, Field
from pydantic import field_validator
from swarm_cloud_code import ServiceConfig, SimpleSkyServe, UpdateMode
# Calculate optimal number of workers
CPU_COUNT = multiprocessing.cpu_count()
WORKERS = CPU_COUNT * 2
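# A common rule of thumb for worker count is (2 * CPU cores) + 1; tune for your workload.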
# Configure logging
logger.add(
"logs/skyserve-api.log",
rotation="500 MB",
retention="10 days",
level="INFO",
)
# Initialize FastAPI app
app = FastAPI(
title="SkyServe API",
description="REST API for managing SkyPilot services",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc",
)
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=os.getenv("ALLOWED_ORIGINS", "*").split(","),
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Pydantic models for request/response validation
class ServiceConfigRequest(BaseModel):
"""Request model for service configuration."""
code: str = Field(
..., description="Python code to run as a service"
)
requirements: Optional[List[str]] = Field(
default=None, description="List of pip packages"
)
envs: Optional[dict] = Field(
default=None, description="Environment variables"
)
name: Optional[str] = Field(
default=None, description="Service name"
)
num_cpus: int = Field(
default=2, ge=1, description="Number of CPUs"
)
memory: int = Field(default=4, ge=1, description="Memory in GB")
use_spot: bool = Field(
default=False, description="Use spot instances"
)
num_nodes: int = Field(
default=1, ge=1, description="Number of nodes"
)
@field_validator("name")
@classmethod
def validate_name(cls, v):
if v and not v.isalnum():
raise ValueError("Service name must be alphanumeric")
return v
class DeploymentResponse(BaseModel):
"""Response model for deployment information."""
service_name: str
endpoint: str
class ServiceStatusResponse(BaseModel):
"""Response model for service status."""
name: str
status: str
versions: List[int]
replicas: int
resources: str
uptime: int
endpoint: Optional[str]
@app.post(
"/services/",
response_model=DeploymentResponse,
status_code=status.HTTP_201_CREATED,
tags=["services"],
)
async def create_service(config: ServiceConfigRequest):
"""Deploy a new service."""
try:
service_config = ServiceConfig(
code=config.code,
requirements=config.requirements,
envs=config.envs,
name=config.name,
num_cpus=config.num_cpus,
memory=config.memory,
use_spot=config.use_spot,
num_nodes=config.num_nodes,
)
name, endpoint = SimpleSkyServe.deploy(service_config)
return {"service_name": name, "endpoint": endpoint}
except Exception as e:
logger.error(f"Failed to create service: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e),
)
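# Example request (assuming the server is running locally on port 8000):
#   curl -X POST http://localhost:8000/services/ \
#        -H "Content-Type: application/json" \
#        -d '{"code": "print(\"hello\")", "num_cpus": 2, "memory": 4}'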
@app.get(
"/services/",
response_model=List[ServiceStatusResponse],
tags=["services"],
)
async def list_services(name: Optional[str] = None):
"""Get status of all services or a specific service."""
try:
deployments = SimpleSkyServe.get_deployments(name)
return deployments
except Exception as e:
logger.error(f"Failed to list services: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e),
)
@app.put(
"/services/{service_name}",
status_code=status.HTTP_200_OK,
tags=["services"],
)
async def update_service(
service_name: str,
config: ServiceConfigRequest,
mode: UpdateMode = UpdateMode.GRADUAL,
):
"""Update an existing service."""
try:
service_config = ServiceConfig(
code=config.code,
requirements=config.requirements,
envs=config.envs,
name=config.name,
num_cpus=config.num_cpus,
memory=config.memory,
use_spot=config.use_spot,
num_nodes=config.num_nodes,
)
SimpleSkyServe.update(service_name, service_config, mode)
return {
"message": f"Service {service_name} updated successfully"
}
except Exception as e:
logger.error(f"Failed to update service: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e),
)
@app.delete(
"/services/{service_name}",
status_code=status.HTTP_204_NO_CONTENT,
tags=["services"],
)
async def delete_service(service_name: str, purge: bool = False):
"""Delete a service."""
try:
SimpleSkyServe.delete(service_name, purge)
except Exception as e:
logger.error(f"Failed to delete service: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e),
)
@app.exception_handler(Exception)
async def general_exception_handler(request, exc):
"""Global exception handler."""
logger.error(f"Unhandled exception: {str(exc)}")
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={"detail": "Internal server error"},
)
# Entry point for uvicorn
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"api:app",
host="0.0.0.0",
port=8000,
workers=WORKERS,
log_level="info",
reload=False, # Disable in production
proxy_headers=True,
forwarded_allow_ips="*",
access_log=True,
)
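# Alternatively, since gunicorn is listed in the requirements, the app could be
# served with uvicorn workers, e.g.:
#   gunicorn api:app -k uvicorn.workers.UvicornWorker -w 4 -b 0.0.0.0:8000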

@ -1,10 +0,0 @@
fastapi
uvicorn[standard]
pydantic
loguru
python-multipart
python-jose[cryptography]
passlib[bcrypt]
gunicorn
prometheus-fastapi-instrumentator
httpx

@ -1,369 +0,0 @@
"""
SimpleSkyServe: A simplified interface for SkyPilot's serve functionality.
This module provides an easy-to-use interface for deploying, managing, updating and monitoring
services using SkyPilot's serve functionality. It supports the full lifecycle of services
including deployment, updates, status monitoring, and cleanup.
Key Features:
- Simple deployment with code and requirements
- Service updates with different update modes
- Status monitoring and deployment fetching
- Service cleanup and deletion
"""
from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple, Union
import tempfile
import sky  # SkyPilot SDK; provides sky.Task, sky.Resources, and sky.serve used below
from loguru import logger
class UpdateMode(Enum):
"""Update modes for service updates.
IMMEDIATE: Update all replicas immediately
GRADUAL: Update replicas gradually with zero downtime
"""
IMMEDIATE = "immediate"
GRADUAL = "gradual"
@dataclass
class ServiceConfig:
"""Configuration for a SkyPilot service.
Attributes:
code: Python code to run as a service
requirements: List of pip packages required by the service
envs: Environment variables to set for the service
name: Optional name for the service (auto-generated if not provided)
num_cpus: Number of CPUs to request (default: 2)
memory: Memory in GB to request (default: 4)
use_spot: Whether to use spot instances (default: False)
num_nodes: Number of nodes to run the service on (default: 1)
"""
code: str
requirements: Optional[List[str]] = None
envs: Optional[Dict[str, str]] = None
name: Optional[str] = None
num_cpus: int = 2
memory: int = 4
use_spot: bool = False
num_nodes: int = 1
class SimpleSkyServe:
"""Simple interface for SkyPilot serve functionality."""
@staticmethod
def deploy(config: ServiceConfig) -> Tuple[str, str]:
"""Deploy a new service using the provided configuration.
Args:
config: ServiceConfig object containing service configuration
Returns:
Tuple of (service_name: str, endpoint: str)
Raises:
ValueError: If the configuration is invalid
RuntimeError: If deployment fails
"""
logger.info("Deploying new service...")
# Create temporary files for setup and service code
with tempfile.NamedTemporaryFile(
mode="w", suffix=".txt"
) as req_file, tempfile.NamedTemporaryFile(
mode="w", suffix=".py"
) as code_file:
# Write requirements if provided
setup_cmd = ""
if config.requirements:
req_file.write("\n".join(config.requirements))
req_file.flush()
setup_cmd = f"pip install -r {req_file.name}"
# Write service code
code_file.write(config.code)
code_file.flush()
# Create SkyPilot task
task = sky.Task(
name=config.name,
setup=setup_cmd,
run=f"python {code_file.name}",
envs=config.envs,
num_nodes=config.num_nodes,
)
# Set resource requirements
resources = sky.Resources(
cpus=config.num_cpus,
memory=config.memory,
use_spot=config.use_spot,
)
task.set_resources(resources)
try:
# Deploy the service
service_name, endpoint = sky.serve.up(
task, service_name=config.name
)
logger.success(
f"Service deployed successfully at {endpoint}"
)
return service_name, endpoint
except Exception as e:
logger.error(f"Failed to deploy service: {str(e)}")
raise RuntimeError(
f"Service deployment failed: {str(e)}"
) from e
@staticmethod
def status(service_name: Optional[str] = None) -> List[Dict]:
"""Get status of services.
Args:
service_name: Optional name of specific service to get status for
If None, returns status of all services
Returns:
List of service status dictionaries containing:
- name: Service name
- status: Current status
- endpoint: Service endpoint
- uptime: Service uptime in seconds
...and other service metadata
"""
logger.info(
f"Getting status for service: {service_name or 'all'}"
)
try:
status_list = sky.serve.status(service_name)
logger.debug(
f"Retrieved status for {len(status_list)} services"
)
return status_list
except Exception as e:
logger.error(f"Failed to get service status: {str(e)}")
raise RuntimeError(
f"Failed to get service status: {str(e)}"
) from e
@staticmethod
def update(
service_name: str,
config: ServiceConfig,
mode: UpdateMode = UpdateMode.GRADUAL,
) -> None:
"""Update an existing service with new configuration.
Args:
service_name: Name of service to update
config: New service configuration
mode: Update mode (IMMEDIATE or GRADUAL)
Raises:
ValueError: If service doesn't exist or config is invalid
RuntimeError: If update fails
"""
logger.info(
f"Updating service {service_name} with mode {mode.value}"
)
# Create temporary files for setup and service code
with tempfile.NamedTemporaryFile(
mode="w", suffix=".txt"
) as req_file, tempfile.NamedTemporaryFile(
mode="w", suffix=".py"
) as code_file:
# Write requirements if provided
setup_cmd = ""
if config.requirements:
req_file.write("\n".join(config.requirements))
req_file.flush()
setup_cmd = f"pip install -r {req_file.name}"
# Write service code
code_file.write(config.code)
code_file.flush()
# Create SkyPilot task for update
task = sky.Task(
name=config.name or service_name,
setup=setup_cmd,
run=f"python {code_file.name}",
envs=config.envs,
)
# Set resource requirements
resources = sky.Resources(
cpus=config.num_cpus,
memory=config.memory,
use_spot=config.use_spot,
)
task.set_resources(resources)
try:
# Update the service
sky.serve.update(
task=task,
service_name=service_name,
mode=sky.serve.UpdateMode(mode.value),
)
logger.success(
f"Service {service_name} updated successfully"
)
except Exception as e:
logger.error(f"Failed to update service: {str(e)}")
raise RuntimeError(
f"Service update failed: {str(e)}"
) from e
@staticmethod
def get_deployments(
service_name: Optional[str] = None,
) -> List[Dict]:
"""Get detailed information about service deployments.
Args:
service_name: Optional name of specific service to get deployments for
If None, returns deployments for all services
Returns:
List of deployment dictionaries containing:
- name: Service name
- versions: List of deployed versions
- active_version: Currently active version
- replicas: Number of replicas
- resources: Resource usage
- status: Deployment status
"""
logger.info(
f"Fetching deployments for: {service_name or 'all services'}"
)
try:
status_list = sky.serve.status(service_name)
deployments = []
for status in status_list:
deployment = {
"name": status["name"],
"versions": status["active_versions"],
"status": status["status"],
"replicas": len(status.get("replica_info", [])),
"resources": status.get(
"requested_resources_str", ""
),
"uptime": status.get("uptime", 0),
"endpoint": None,
}
# Extract endpoint if available
if status.get("load_balancer_port"):
deployment["endpoint"] = (
f"http://{status.get('controller_addr')}:{status['load_balancer_port']}"
)
deployments.append(deployment)
logger.debug(f"Retrieved {len(deployments)} deployments")
return deployments
except Exception as e:
logger.error(f"Failed to fetch deployments: {str(e)}")
raise RuntimeError(
f"Failed to fetch deployments: {str(e)}"
) from e
@staticmethod
def delete(
service_name: Union[str, List[str]], purge: bool = False
) -> None:
"""Delete one or more services.
Args:
service_name: Name of service(s) to delete
purge: Whether to purge services in failed status
Raises:
RuntimeError: If deletion fails
"""
names = (
[service_name]
if isinstance(service_name, str)
else service_name
)
logger.info(f"Deleting services: {names}")
try:
sky.serve.down(service_names=names, purge=purge)
logger.success(f"Successfully deleted services: {names}")
except Exception as e:
logger.error(f"Failed to delete services: {str(e)}")
raise RuntimeError(
f"Service deletion failed: {str(e)}"
) from e
# # Example usage:
# if __name__ == "__main__":
# from time import sleep
# # Configuration for a simple FastAPI service
# config = ServiceConfig(
# code="""
# from fastapi import FastAPI
# app = FastAPI()
# @app.get("/")
# def read_root():
# return {"Hello": "World"}
# """,
# requirements=["fastapi", "uvicorn"],
# envs={"PORT": "8000"},
# name="fastapi-demo"
# )
# # Deploy the service
# name, endpoint = SimpleSkyServe.deploy(config)
# print(f"Service deployed at: {endpoint}")
# # Get service status
# status = SimpleSkyServe.status(name)
# print(f"Service status: {status}")
# # Get deployment information
# deployments = SimpleSkyServe.get_deployments(name)
# print(f"Deployment info: {deployments}")
# # Update the service with new code
# new_config = ServiceConfig(
# code="""
# from fastapi import FastAPI
# app = FastAPI()
# @app.get("/")
# def read_root():
# return {"Hello": "Updated World"}
# """,
# requirements=["fastapi", "uvicorn"],
# envs={"PORT": "8000"}
# )
# SimpleSkyServe.update(name, new_config, mode=UpdateMode.GRADUAL)
# print("Service updated")
# # Wait for update to complete
# sleep(30)
# # Check status after update
# status = SimpleSkyServe.status(name)
# print(f"Updated service status: {status}")
# # Delete the service
# SimpleSkyServe.delete(name)

@ -1,160 +0,0 @@
"""
Simple test script for SkyServe API using requests.
No test framework dependencies - just pure requests and assertions.
"""
import time
import requests
from typing import Any
# API Configuration
BASE_URL = "http://localhost:8000"
HEADERS = {"Content-Type": "application/json"}
def assert_equals(actual: Any, expected: Any, message: str = ""):
"""Simple assertion helper."""
if actual != expected:
raise AssertionError(
f"{message}\nExpected: {expected}\nGot: {actual}"
)
def test_create_service() -> str:
"""Test service creation and return the service name."""
print("\n🧪 Testing service creation...")
payload = {
"code": """
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
def read_root():
return {"Hello": "World"}
""",
"requirements": ["fastapi", "uvicorn"],
"name": "test_service",
"num_cpus": 2,
"memory": 4,
}
response = requests.post(
f"{BASE_URL}/services/", json=payload, headers=HEADERS
)
assert_equals(
response.status_code, 201, "Service creation failed"
)
data = response.json()
assert "service_name" in data, "Response missing service_name"
assert "endpoint" in data, "Response missing endpoint"
print("✅ Service created successfully!")
return data["service_name"]
def test_list_services(expected_service_name: str):
"""Test listing services."""
print("\n🧪 Testing service listing...")
response = requests.get(f"{BASE_URL}/services/")
assert_equals(response.status_code, 200, "Service listing failed")
services = response.json()
assert isinstance(services, list), "Expected list of services"
# Find our service in the list
service_found = False
for service in services:
if service["name"] == expected_service_name:
service_found = True
break
assert (
service_found
), f"Created service {expected_service_name} not found in list"
print("✅ Services listed successfully!")
def test_update_service(service_name: str):
"""Test service update."""
print("\n🧪 Testing service update...")
update_payload = {
"code": """
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
def read_root():
return {"Hello": "Updated World"}
""",
"requirements": ["fastapi", "uvicorn"],
"name": service_name,
"num_cpus": 2,
"memory": 4,
}
response = requests.put(
f"{BASE_URL}/services/{service_name}",
json=update_payload,
headers=HEADERS,
params={"mode": "gradual"},
)
assert_equals(response.status_code, 200, "Service update failed")
print("✅ Service updated successfully!")
def test_delete_service(service_name: str):
"""Test service deletion."""
print("\n🧪 Testing service deletion...")
response = requests.delete(f"{BASE_URL}/services/{service_name}")
assert_equals(
response.status_code, 204, "Service deletion failed"
)
# Verify service is gone
list_response = requests.get(f"{BASE_URL}/services/")
services = list_response.json()
for service in services:
if service["name"] == service_name:
raise AssertionError(
f"Service {service_name} still exists after deletion"
)
print("✅ Service deleted successfully!")
def run_tests():
"""Run all tests in sequence."""
try:
print("🚀 Starting API tests...")
# Run tests in sequence
service_name = test_create_service()
# Wait a bit for service to be fully ready
print("⏳ Waiting for service to be ready...")
time.sleep(5)
test_list_services(service_name)
test_update_service(service_name)
test_delete_service(service_name)
print("\n✨ All tests passed successfully! ✨")
except AssertionError as e:
print(f"\n❌ Test failed: {str(e)}")
raise
except Exception as e:
print(f"\n❌ Unexpected error: {str(e)}")
raise
finally:
print("\n🏁 Tests completed")
if __name__ == "__main__":
run_tests()

@ -7,14 +7,9 @@ from typing import (
from dataclasses import dataclass
import csv
from pathlib import Path
import logging
from enum import Enum
from swarms import Agent
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ModelName(str, Enum):
"""Valid model names for swarms agents"""
@ -141,9 +136,7 @@ class AgentValidator:
str(e), str(e.__class__.__name__), str(config)
)
@dataclass
class AgentCSV:
class AgentLoader:
"""Class to manage agents through CSV with type safety"""
csv_path: Path
@ -184,7 +177,7 @@ class AgentCSV:
)
validated_agents.append(validated_config)
except AgentValidationError as e:
logger.error(
print(
f"Validation error for agent {agent.get('agent_name', 'unknown')}: {e}"
)
raise
@ -194,17 +187,25 @@ class AgentCSV:
writer.writeheader()
writer.writerows(validated_agents)
logger.info(
print(
f"Created CSV with {len(validated_agents)} agents at {self.csv_path}"
)
def load_agents(self) -> List[Agent]:
"""Load and create agents from CSV with validation"""
if not self.csv_path.exists():
raise FileNotFoundError(
f"CSV file not found at {self.csv_path}"
)
def load_agents(self, file_type: str = "csv") -> List[Agent]:
"""Load and create agents from CSV or JSON with validation"""
if file_type == "csv":
if not self.csv_path.exists():
raise FileNotFoundError(
f"CSV file not found at {self.csv_path}"
)
return self._load_agents_from_csv()
elif file_type == "json":
return self._load_agents_from_json()
else:
raise ValueError("Unsupported file type. Use 'csv' or 'json'.")
def _load_agents_from_csv(self) -> List[Agent]:
"""Load agents from a CSV file"""
agents: List[Agent] = []
with open(self.csv_path, "r") as f:
reader = csv.DictReader(f)
@ -213,105 +214,61 @@ class AgentCSV:
validated_config = AgentValidator.validate_config(
row
)
agent = Agent(
agent_name=validated_config["agent_name"],
system_prompt=validated_config[
"system_prompt"
],
model_name=validated_config["model_name"],
max_loops=validated_config["max_loops"],
autosave=validated_config["autosave"],
dashboard=validated_config["dashboard"],
verbose=validated_config["verbose"],
dynamic_temperature_enabled=validated_config[
"dynamic_temperature"
],
saved_state_path=validated_config[
"saved_state_path"
],
user_name=validated_config["user_name"],
retry_attempts=validated_config[
"retry_attempts"
],
context_length=validated_config[
"context_length"
],
return_step_meta=validated_config[
"return_step_meta"
],
output_type=validated_config["output_type"],
streaming_on=validated_config["streaming"],
)
agent = self._create_agent(validated_config)
agents.append(agent)
except AgentValidationError as e:
logger.error(
print(
f"Skipping invalid agent configuration: {e}"
)
continue
logger.info(
f"Loaded {len(agents)} agents from {self.csv_path}"
)
print(f"Loaded {len(agents)} agents from {self.csv_path}")
return agents
def add_agent(self, agent_config: Dict[str, Any]) -> None:
"""Add a new validated agent configuration to CSV"""
validated_config = AgentValidator.validate_config(
agent_config
)
def _load_agents_from_json(self) -> List[Agent]:
"""Load agents from a JSON file"""
import json
with open(self.csv_path, "a", newline="") as f:
writer = csv.DictWriter(f, fieldnames=self.headers)
writer.writerow(validated_config)
logger.info(
f"Added new agent {validated_config['agent_name']} to {self.csv_path}"
)
if not self.csv_path.with_suffix('.json').exists():
raise FileNotFoundError(
f"JSON file not found at {self.csv_path.with_suffix('.json')}"
)
agents: List[Agent] = []
with open(self.csv_path.with_suffix('.json'), "r") as f:
agents_data = json.load(f)
for agent in agents_data:
try:
validated_config = AgentValidator.validate_config(
agent
)
agent = self._create_agent(validated_config)
agents.append(agent)
except AgentValidationError as e:
print(
f"Skipping invalid agent configuration: {e}"
)
continue
# Example usage
if __name__ == "__main__":
# Example agent configurations
agent_configs = [
{
"agent_name": "Financial-Analysis-Agent",
"system_prompt": "You are a financial expert...",
"model_name": "gpt-4o-mini", # Updated to correct model name
"max_loops": 1,
"autosave": True,
"dashboard": False,
"verbose": True,
"dynamic_temperature": True,
"saved_state_path": "finance_agent.json",
"user_name": "swarms_corp",
"retry_attempts": 3,
"context_length": 200000,
"return_step_meta": False,
"output_type": "string",
"streaming": False,
}
]
try:
# Initialize CSV manager
csv_manager = AgentCSV(Path("agents.csv"))
# Create CSV with initial agents
csv_manager.create_agent_csv(agent_configs)
# Load agents from CSV
agents = csv_manager.load_agents()
# Use an agent
if agents:
financial_agent = agents[0]
response = financial_agent.run(
"How can I establish a ROTH IRA to buy stocks and get a tax break?"
)
print(response)
print(f"Loaded {len(agents)} agents from {self.csv_path.with_suffix('.json')}")
return agents
except AgentValidationError as e:
logger.error(f"Validation error: {e}")
except Exception as e:
logger.error(f"Unexpected error: {e}")
def _create_agent(self, validated_config: AgentConfigDict) -> Agent:
"""Create an Agent instance from validated configuration"""
return Agent(
agent_name=validated_config["agent_name"],
system_prompt=validated_config["system_prompt"],
model_name=validated_config["model_name"],
max_loops=validated_config["max_loops"],
autosave=validated_config["autosave"],
dashboard=validated_config["dashboard"],
verbose=validated_config["verbose"],
dynamic_temperature_enabled=validated_config["dynamic_temperature"],
saved_state_path=validated_config["saved_state_path"],
user_name=validated_config["user_name"],
retry_attempts=validated_config["retry_attempts"],
context_length=validated_config["context_length"],
return_step_meta=validated_config["return_step_meta"],
output_type=validated_config["output_type"],
streaming_on=validated_config["streaming"],
)

@ -3,15 +3,16 @@ import json
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import networkx as nx
from loguru import logger
from pydantic import BaseModel, Field
from swarms.structs.agent import Agent
from swarms.utils.auto_download_check_packages import (
auto_check_and_download_package,
)
from swarms.structs.agent import Agent
# Configure logging
logger.add(
@ -188,14 +189,20 @@ class GraphSwarm:
def __init__(
self,
name: str = "graph-swarm-01",
description: str = "Graph swarm: build your own graph of agents",
agents: Union[
List[Agent], List[Tuple[Agent, List[str]]], None
List[Agent], List[Tuple[Agent, List[str]]], List[Callable]
] = None,
max_workers: Optional[int] = None,
swarm_name: str = "Collaborative Agent Swarm",
memory_collection: str = "swarm_memory",
*args,
**kwargs,
):
"""Initialize GraphSwarm."""
self.name = name
self.description = description
self.graph = nx.DiGraph()
self.agents: Dict[str, Agent] = {}
self.dependencies: Dict[str, List[str]] = {}

@ -1,7 +1,7 @@
# TESTING
# -==================
# Use an official Python runtime as a parent image
FROM python:3.9-slim
FROM python:3.11-slim
# Set environment variables to make Python output unbuffered and disable the PIP cache
ENV PYTHONDONTWRITEBYTECODE 1
@ -10,6 +10,10 @@ ENV PIP_NO_CACHE_DIR off
ENV PIP_DISABLE_PIP_VERSION_CHECK on
ENV PIP_DEFAULT_TIMEOUT 100
# Set environment variables for OpenAI API key and workspace directory
ENV OPENAI_API_KEY your_api_key_here
ENV WORKSPACE_DIR /path/to/your/workspace
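# Note: avoid baking real secrets into the image; prefer supplying them at runtime,
# e.g. docker run -e OPENAI_API_KEY=... -e WORKSPACE_DIR=/workspace <image>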
# Set the working directory in the container
WORKDIR /usr/src/app
@ -22,11 +26,8 @@ RUN pip install poetry
# Disable virtualenv creation by poetry and install dependencies
RUN poetry config virtualenvs.create false
# Install the 'swarms' package if it's not included in the poetry.lock
RUN pip install swarms
# Assuming tests require pytest to run
RUN pip install pytest
# Install the 'swarms' package and any additional packages
RUN pip install swarms swarm-models swarms-memory pytest
# Run pytest on all tests in the tests directory
CMD pytest
# Run all tests in the tests directory
CMD python3 -m unittest discover -s tests

@ -1,62 +0,0 @@
from unittest.mock import Mock
import pytest
from swarms.telemetry.posthog_utils import (
log_activity_posthog,
posthog,
)
# Mock Posthog client
@pytest.fixture
def mock_posthog():
return Mock()
# Mock environment variables
@pytest.fixture
def mock_env(monkeypatch):
monkeypatch.setenv("POSTHOG_API_KEY", "test_api_key")
monkeypatch.setenv("POSTHOG_HOST", "test_host")
# Test the log_activity_posthog decorator
def test_log_activity_posthog(mock_posthog, mock_env):
event_name = "test_event"
event_properties = {"test_property": "test_value"}
# Create a test function with the decorator
@log_activity_posthog(event_name, **event_properties)
def test_function():
pass
# Call the test function
test_function()
# Check if the Posthog capture method was called with the expected arguments
mock_posthog.capture.assert_called_once_with(
"test_user_id", event_name, event_properties
)
# Test a scenario where environment variables are not set
def test_missing_env_variables(monkeypatch):
# Unset environment variables
monkeypatch.delenv("POSTHOG_API_KEY", raising=False)
monkeypatch.delenv("POSTHOG_HOST", raising=False)
# Create a test function with the decorator
@log_activity_posthog("test_event", test_property="test_value")
def test_function():
pass
# Ensure that calling the test function does not raise errors
test_function()
# Test the Posthog client initialization
def test_posthog_client_initialization(mock_env):
assert posthog.api_key == "test_api_key"
assert posthog.host == "test_host"
assert posthog.debug is True

@ -85,3 +85,17 @@ def test_generate_unique_identifier_edge_case():
unique_id = generate_unique_identifier()
unique_ids.add(unique_id)
assert len(unique_ids) == 100 # Ensure generated IDs are unique
def test_all():
test_generate_user_id()
test_get_machine_id()
test_get_system_info()
test_generate_unique_identifier()
test_generate_user_id_edge_case()
test_get_machine_id_edge_case()
test_get_system_info_edge_case()
test_generate_unique_identifier_edge_case()
test_all()
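# Note: calling test_all() at module level runs every check on import;
# wrap it in `if __name__ == "__main__":` if that is not intended.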

@ -1,66 +0,0 @@
import logging
import torch
from swarms.utils import check_device
# For the purposes of this test, we assume that the `memory_allocated`
# and `memory_reserved` functions behave the same as `torch.cuda.memory_allocated`
# and `torch.cuda.memory_reserved`.
def test_check_device_no_cuda(monkeypatch):
# Mock torch.cuda.is_available to always return False
monkeypatch.setattr(torch.cuda, "is_available", lambda: False)
result = check_device(log_level=logging.DEBUG)
assert result.type == "cpu"
def test_check_device_cuda_exception(monkeypatch):
# Mock torch.cuda.is_available to raise an exception
monkeypatch.setattr(
torch.cuda, "is_available", lambda: 1 / 0
) # Raises ZeroDivisionError
result = check_device(log_level=logging.DEBUG)
assert result.type == "cpu"
def test_check_device_one_cuda(monkeypatch):
# Mock torch.cuda.is_available to return True
monkeypatch.setattr(torch.cuda, "is_available", lambda: True)
# Mock torch.cuda.device_count to return 1
monkeypatch.setattr(torch.cuda, "device_count", lambda: 1)
# Mock torch.cuda.memory_allocated and torch.cuda.memory_reserved to return 0
monkeypatch.setattr(
torch.cuda, "memory_allocated", lambda device: 0
)
monkeypatch.setattr(
torch.cuda, "memory_reserved", lambda device: 0
)
result = check_device(log_level=logging.DEBUG)
assert len(result) == 1
assert result[0].type == "cuda"
assert result[0].index == 0
def test_check_device_multiple_cuda(monkeypatch):
# Mock torch.cuda.is_available to return True
monkeypatch.setattr(torch.cuda, "is_available", lambda: True)
# Mock torch.cuda.device_count to return 4
monkeypatch.setattr(torch.cuda, "device_count", lambda: 4)
# Mock torch.cuda.memory_allocated and torch.cuda.memory_reserved to return 0
monkeypatch.setattr(
torch.cuda, "memory_allocated", lambda device: 0
)
monkeypatch.setattr(
torch.cuda, "memory_reserved", lambda device: 0
)
result = check_device(log_level=logging.DEBUG)
assert len(result) == 4
for i in range(4):
assert result[i].type == "cuda"
assert result[i].index == i