You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
swarms/swarm_cloud_api/swarm_cloud_code.py

370 lines
12 KiB

"""
SimpleSkyServe: A simplified interface for SkyPilot's serve functionality.
This module provides an easy-to-use interface for deploying, managing, updating and monitoring
services using SkyPilot's serve functionality. It supports the full lifecycle of services
including deployment, updates, status monitoring, and cleanup.
Key Features:
- Simple deployment with code and requirements
- Service updates with different update modes
- Status monitoring and deployment fetching
- Service cleanup and deletion
"""
from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple, Union
import tempfile
from loguru import logger
class UpdateMode(Enum):
"""Update modes for service updates.
IMMEDIATE: Update all replicas immediately
GRADUAL: Update replicas gradually with zero downtime
"""
IMMEDIATE = "immediate"
GRADUAL = "gradual"
@dataclass
class ServiceConfig:
"""Configuration for a SkyPilot service.
Attributes:
code: Python code to run as a service
requirements: List of pip packages required by the service
envs: Environment variables to set for the service
name: Optional name for the service (auto-generated if not provided)
num_cpus: Number of CPUs to request (default: 2)
memory: Memory in GB to request (default: 4)
use_spot: Whether to use spot instances (default: False)
"""
code: str
requirements: Optional[List[str]] = None
envs: Optional[Dict[str, str]] = None
name: Optional[str] = None
num_cpus: int = 2
memory: int = 4
use_spot: bool = False
num_nodes: int = 1
class SimpleSkyServe:
"""Simple interface for SkyPilot serve functionality."""
@staticmethod
def deploy(config: ServiceConfig) -> Tuple[str, str]:
"""Deploy a new service using the provided configuration.
Args:
config: ServiceConfig object containing service configuration
Returns:
Tuple of (service_name: str, endpoint: str)
Raises:
ValueError: If the configuration is invalid
RuntimeError: If deployment fails
"""
logger.info("Deploying new service...")
# Create temporary files for setup and service code
with tempfile.NamedTemporaryFile(
mode="w", suffix=".txt"
) as req_file, tempfile.NamedTemporaryFile(
mode="w", suffix=".py"
) as code_file:
# Write requirements if provided
setup_cmd = ""
if config.requirements:
req_file.write("\n".join(config.requirements))
req_file.flush()
setup_cmd = f"pip install -r {req_file.name}"
# Write service code
code_file.write(config.code)
code_file.flush()
# Create SkyPilot task
task = sky.Task(
name=config.name,
setup=setup_cmd,
run=f"python {code_file.name}",
envs=config.envs,
num_nodes=config.num_nodes,
)
# Set resource requirements
resources = sky.Resources(
cpus=config.num_cpus,
memory=config.memory,
use_spot=config.use_spot,
)
task.set_resources(resources)
try:
# Deploy the service
service_name, endpoint = sky.serve.up(
task, service_name=config.name
)
logger.success(
f"Service deployed successfully at {endpoint}"
)
return service_name, endpoint
except Exception as e:
logger.error(f"Failed to deploy service: {str(e)}")
raise RuntimeError(
f"Service deployment failed: {str(e)}"
) from e
@staticmethod
def status(service_name: Optional[str] = None) -> List[Dict]:
"""Get status of services.
Args:
service_name: Optional name of specific service to get status for
If None, returns status of all services
Returns:
List of service status dictionaries containing:
- name: Service name
- status: Current status
- endpoint: Service endpoint
- uptime: Service uptime in seconds
...and other service metadata
"""
logger.info(
f"Getting status for service: {service_name or 'all'}"
)
try:
status_list = sky.serve.status(service_name)
logger.debug(
f"Retrieved status for {len(status_list)} services"
)
return status_list
except Exception as e:
logger.error(f"Failed to get service status: {str(e)}")
raise RuntimeError(
f"Failed to get service status: {str(e)}"
) from e
@staticmethod
def update(
service_name: str,
config: ServiceConfig,
mode: UpdateMode = UpdateMode.GRADUAL,
) -> None:
"""Update an existing service with new configuration.
Args:
service_name: Name of service to update
config: New service configuration
mode: Update mode (IMMEDIATE or GRADUAL)
Raises:
ValueError: If service doesn't exist or config is invalid
RuntimeError: If update fails
"""
logger.info(
f"Updating service {service_name} with mode {mode.value}"
)
# Create temporary files for setup and service code
with tempfile.NamedTemporaryFile(
mode="w", suffix=".txt"
) as req_file, tempfile.NamedTemporaryFile(
mode="w", suffix=".py"
) as code_file:
# Write requirements if provided
setup_cmd = ""
if config.requirements:
req_file.write("\n".join(config.requirements))
req_file.flush()
setup_cmd = f"pip install -r {req_file.name}"
# Write service code
code_file.write(config.code)
code_file.flush()
# Create SkyPilot task for update
task = sky.Task(
name=config.name or service_name,
setup=setup_cmd,
run=f"python {code_file.name}",
envs=config.envs,
)
# Set resource requirements
resources = sky.Resources(
cpus=config.num_cpus,
memory=config.memory,
use_spot=config.use_spot,
)
task.set_resources(resources)
try:
# Update the service
sky.serve.update(
task=task,
service_name=service_name,
mode=sky.serve.UpdateMode(mode.value),
)
logger.success(
f"Service {service_name} updated successfully"
)
except Exception as e:
logger.error(f"Failed to update service: {str(e)}")
raise RuntimeError(
f"Service update failed: {str(e)}"
) from e
@staticmethod
def get_deployments(
service_name: Optional[str] = None,
) -> List[Dict]:
"""Get detailed information about service deployments.
Args:
service_name: Optional name of specific service to get deployments for
If None, returns deployments for all services
Returns:
List of deployment dictionaries containing:
- name: Service name
- versions: List of deployed versions
- active_version: Currently active version
- replicas: Number of replicas
- resources: Resource usage
- status: Deployment status
"""
logger.info(
f"Fetching deployments for: {service_name or 'all services'}"
)
try:
status_list = sky.serve.status(service_name)
deployments = []
for status in status_list:
deployment = {
"name": status["name"],
"versions": status["active_versions"],
"status": status["status"],
"replicas": len(status.get("replica_info", [])),
"resources": status.get(
"requested_resources_str", ""
),
"uptime": status.get("uptime", 0),
"endpoint": None,
}
# Extract endpoint if available
if status.get("load_balancer_port"):
deployment["endpoint"] = (
f"http://{status.get('controller_addr')}:{status['load_balancer_port']}"
)
deployments.append(deployment)
logger.debug(f"Retrieved {len(deployments)} deployments")
return deployments
except Exception as e:
logger.error(f"Failed to fetch deployments: {str(e)}")
raise RuntimeError(
f"Failed to fetch deployments: {str(e)}"
) from e
@staticmethod
def delete(
service_name: Union[str, List[str]], purge: bool = False
) -> None:
"""Delete one or more services.
Args:
service_name: Name of service(s) to delete
purge: Whether to purge services in failed status
Raises:
RuntimeError: If deletion fails
"""
names = (
[service_name]
if isinstance(service_name, str)
else service_name
)
logger.info(f"Deleting services: {names}")
try:
sky.serve.down(service_names=names, purge=purge)
logger.success(f"Successfully deleted services: {names}")
except Exception as e:
logger.error(f"Failed to delete services: {str(e)}")
raise RuntimeError(
f"Service deletion failed: {str(e)}"
) from e
# # Example usage:
# if __name__ == "__main__":
# from time import sleep
# # Configuration for a simple FastAPI service
# config = ServiceConfig(
# code="""
# from fastapi import FastAPI
# app = FastAPI()
# @app.get("/")
# def read_root():
# return {"Hello": "World"}
# """,
# requirements=["fastapi", "uvicorn"],
# envs={"PORT": "8000"},
# name="fastapi-demo"
# )
# # Deploy the service
# name, endpoint = SimpleSkyServe.deploy(config)
# print(f"Service deployed at: {endpoint}")
# # Get service status
# status = SimpleSkyServe.status(name)
# print(f"Service status: {status}")
# # Get deployment information
# deployments = SimpleSkyServe.get_deployments(name)
# print(f"Deployment info: {deployments}")
# # Update the service with new code
# new_config = ServiceConfig(
# code="""
# from fastapi import FastAPI
# app = FastAPI()
# @app.get("/")
# def read_root():
# return {"Hello": "Updated World"}
# """,
# requirements=["fastapi", "uvicorn"],
# envs={"PORT": "8000"}
# )
# SimpleSkyServe.update(name, new_config, mode=UpdateMode.GRADUAL)
# print("Service updated")
# # Wait for update to complete
# sleep(30)
# # Check status after update
# status = SimpleSkyServe.status(name)
# print(f"Updated service status: {status}")
# # Delete the service
# SimpleSkyServe.delete(name)