parent
65d5631c37
commit
c74c433015
@ -0,0 +1,207 @@
|
||||
# How to Add a New Swarm Class
|
||||
|
||||
This guide provides comprehensive step-by-step instructions for developers to create and add a new swarm. It emphasizes the importance of adhering to best practices, using proper type hints, and documenting code thoroughly to ensure maintainability, scalability, and clarity in your implementations.
|
||||
|
||||
## Overview
|
||||
|
||||
A Swarm class enables developers to manage and coordinate multiple agents working together to accomplish complex tasks efficiently. Each Swarm must:
|
||||
|
||||
- Contain a `run(task: str, img: str, *args, **kwargs)` method, which serves as the primary execution method for tasks.
|
||||
- Include `name`, `description`, and `agents` parameters.
|
||||
- Ensure `agents` is a callable function that adheres to specific requirements for dynamic agent behavior.
|
||||
- Follow type-hinting and documentation best practices to maintain code clarity and reliability.
|
||||
|
||||
Each Agent within the swarm must:
|
||||
|
||||
- Contain `agent_name`, `system_prompt`, and a `run` method.
|
||||
- Follow similar type hinting and documentation standards to ensure consistency and readability.
|
||||
|
||||
By adhering to these requirements, you can create robust, reusable, and modular swarms that streamline task management and enhance collaborative functionality.
|
||||
|
||||
---
|
||||
|
||||
## Creating a Swarm Class
|
||||
|
||||
Below is a detailed template for creating a Swarm class. Ensure that all elements are documented and clearly defined:
|
||||
|
||||
```python
|
||||
from typing import Callable, Any
|
||||
|
||||
class MySwarm:
|
||||
"""
|
||||
A custom swarm class to manage and execute tasks with multiple agents.
|
||||
|
||||
Attributes:
|
||||
name (str): The name of the swarm.
|
||||
description (str): A brief description of the swarm's purpose.
|
||||
agents (Callable): A callable that returns the list of agents to be utilized.
|
||||
"""
|
||||
|
||||
def __init__(self, name: str, description: str, agents: Callable):
|
||||
"""
|
||||
Initialize the Swarm with its name, description, and agents.
|
||||
|
||||
Args:
|
||||
name (str): The name of the swarm.
|
||||
description (str): A description of the swarm.
|
||||
agents (Callable): A callable that provides the agents for the swarm.
|
||||
"""
|
||||
self.name = name
|
||||
self.description = description
|
||||
self.agents = agents
|
||||
|
||||
def run(self, task: str, img: str, *args: Any, **kwargs: Any) -> Any:
|
||||
"""
|
||||
Execute a task using the swarm and its agents.
|
||||
|
||||
Args:
|
||||
task (str): The task description.
|
||||
img (str): The image input.
|
||||
*args: Additional positional arguments for customization.
|
||||
**kwargs: Additional keyword arguments for fine-tuning behavior.
|
||||
|
||||
Returns:
|
||||
Any: The result of the task execution, aggregated from all agents.
|
||||
"""
|
||||
results = []
|
||||
for agent in self.agents():
|
||||
result = agent.run(task, img, *args, **kwargs)
|
||||
results.append(result)
|
||||
return results
|
||||
```
|
||||
|
||||
This Swarm class serves as the main orchestrator for coordinating agents and running tasks dynamically and flexibly.
|
||||
|
||||
---
|
||||
|
||||
## Creating an Agent Class
|
||||
|
||||
Each agent must follow a well-defined structure to ensure compatibility with the swarm. Below is an example of an agent class:
|
||||
|
||||
```python
|
||||
class Agent:
|
||||
"""
|
||||
A single agent class to handle specific tasks assigned by the swarm.
|
||||
|
||||
Attributes:
|
||||
agent_name (str): The name of the agent.
|
||||
system_prompt (str): The system prompt guiding the agent's behavior and purpose.
|
||||
"""
|
||||
|
||||
def __init__(self, agent_name: str, system_prompt: str):
|
||||
"""
|
||||
Initialize the agent with its name and system prompt.
|
||||
|
||||
Args:
|
||||
agent_name (str): The name of the agent.
|
||||
system_prompt (str): The guiding prompt for the agent.
|
||||
"""
|
||||
self.agent_name = agent_name
|
||||
self.system_prompt = system_prompt
|
||||
|
||||
def run(self, task: str, img: str, *args: Any, **kwargs: Any) -> Any:
|
||||
"""
|
||||
Execute a specific task assigned to the agent.
|
||||
|
||||
Args:
|
||||
task (str): The task description.
|
||||
img (str): The image input for processing.
|
||||
*args: Additional positional arguments for task details.
|
||||
**kwargs: Additional keyword arguments for extended functionality.
|
||||
|
||||
Returns:
|
||||
Any: The result of the task execution, which can be customized.
|
||||
"""
|
||||
# Example implementation (to be customized by developer)
|
||||
return f"Agent {self.agent_name} executed task: {task}"
|
||||
```
|
||||
|
||||
This structure ensures that each agent can independently handle tasks and integrate seamlessly into a swarm.
|
||||
|
||||
---
|
||||
|
||||
## Adding Your Swarm to a Project
|
||||
|
||||
### Step 1: Define Your Agents
|
||||
Create one or more instances of the `Agent` class to serve as components of your swarm. For example:
|
||||
|
||||
```python
|
||||
def create_agents():
|
||||
return [
|
||||
Agent(agent_name="Agent1", system_prompt="Analyze the image and summarize results."),
|
||||
Agent(agent_name="Agent2", system_prompt="Detect objects and highlight key features."),
|
||||
]
|
||||
```
|
||||
|
||||
### Step 2: Implement Your Swarm
|
||||
Create an instance of your Swarm class, defining its name, description, and associated agents:
|
||||
|
||||
```python
|
||||
my_swarm = MySwarm(
|
||||
name="Image Analysis Swarm",
|
||||
description="A swarm designed to analyze images and perform a range of related tasks.",
|
||||
agents=create_agents
|
||||
)
|
||||
```
|
||||
|
||||
### Step 3: Execute Tasks
|
||||
Call the `run` method of your swarm, passing in the required parameters for execution:
|
||||
|
||||
```python
|
||||
results = my_swarm.run(task="Analyze image content", img="path/to/image.jpg")
|
||||
print(results)
|
||||
```
|
||||
|
||||
This simple flow allows you to dynamically utilize agents for diverse operations and ensures efficient task execution.
|
||||
|
||||
---
|
||||
|
||||
## Best Practices
|
||||
|
||||
To ensure your swarm implementation is efficient and maintainable, follow these best practices:
|
||||
|
||||
1. **Type Annotations:**
|
||||
Use precise type hints for parameters and return types to improve code readability and support static analysis tools.
|
||||
|
||||
2. **Comprehensive Documentation:**
|
||||
Include clear and detailed docstrings for all classes, methods, and attributes to ensure your code is understandable.
|
||||
|
||||
3. **Thorough Testing:**
|
||||
Test your swarm and agents with various tasks to verify correctness and identify potential edge cases.
|
||||
|
||||
4. **Modular Design:**
|
||||
Keep your swarm and agent logic modular, enabling reuse and easy extensions for future enhancements.
|
||||
|
||||
5. **Error Handling:**
|
||||
Implement robust error handling in the `run` methods to gracefully manage unexpected inputs or issues during execution.
|
||||
|
||||
6. **Code Review:**
|
||||
Regularly review and refactor your code to align with the latest best practices and maintain high quality.
|
||||
|
||||
7. **Scalability:**
|
||||
Design your swarm with scalability in mind, ensuring it can handle a large number of agents and complex tasks.
|
||||
|
||||
8. **Logging and Monitoring:**
|
||||
Include comprehensive logging to track task execution and monitor performance, enabling easier debugging and optimization.
|
||||
|
||||
---
|
||||
|
||||
## Example Output
|
||||
|
||||
Given the implementation above, executing a task might produce output such as:
|
||||
|
||||
```plaintext
|
||||
[
|
||||
"Agent Agent1 executed task: Analyze image content",
|
||||
"Agent Agent2 executed task: Analyze image content"
|
||||
]
|
||||
```
|
||||
|
||||
The modular design ensures that each agent contributes to the overall functionality of the swarm, allowing seamless scalability and dynamic task management.
|
||||
|
||||
---
|
||||
|
||||
## Conclusion
|
||||
|
||||
By following these guidelines, you can create swarms that are powerful, flexible, and maintainable. Leveraging the provided templates and best practices enables you to build efficient multi-agent systems capable of handling diverse and complex tasks. Proper structuring, thorough testing, and adherence to best practices will ensure your swarm integrates effectively into any project, delivering robust and reliable performance. Furthermore, maintaining clear documentation and emphasizing modularity will help your implementation adapt to future needs and use cases. Empower your projects with a well-designed swarm architecture today.
|
||||
|
@ -0,0 +1,226 @@
|
||||
"""
|
||||
SkyServe API: Production-grade FastAPI server for SimpleSkyServe.
|
||||
|
||||
This module provides a REST API interface for managing SkyPilot services with
|
||||
proper error handling, validation, and production configurations.
|
||||
"""
|
||||
|
||||
import multiprocessing
|
||||
import os
|
||||
from typing import List, Optional
|
||||
|
||||
from fastapi import FastAPI, HTTPException, status
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import JSONResponse
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel, Field
|
||||
from pydantic.v1 import validator
|
||||
from swarm_cloud_code import ServiceConfig, SimpleSkyServe, UpdateMode
|
||||
|
||||
# Calculate optimal number of workers
|
||||
CPU_COUNT = multiprocessing.cpu_count()
|
||||
WORKERS = CPU_COUNT * 2
|
||||
|
||||
# Configure logging
|
||||
logger.add(
|
||||
"logs/skyserve-api.log",
|
||||
rotation="500 MB",
|
||||
retention="10 days",
|
||||
level="INFO",
|
||||
)
|
||||
|
||||
# Initialize FastAPI app
|
||||
app = FastAPI(
|
||||
title="SkyServe API",
|
||||
description="REST API for managing SkyPilot services",
|
||||
version="1.0.0",
|
||||
docs_url="/docs",
|
||||
redoc_url="/redoc",
|
||||
)
|
||||
|
||||
# Configure CORS
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=os.getenv("ALLOWED_ORIGINS", "*").split(","),
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
|
||||
# Pydantic models for request/response validation
|
||||
class ServiceConfigRequest(BaseModel):
|
||||
"""Request model for service configuration."""
|
||||
|
||||
code: str = Field(
|
||||
..., description="Python code to run as a service"
|
||||
)
|
||||
requirements: Optional[List[str]] = Field(
|
||||
default=None, description="List of pip packages"
|
||||
)
|
||||
envs: Optional[dict] = Field(
|
||||
default=None, description="Environment variables"
|
||||
)
|
||||
name: Optional[str] = Field(
|
||||
default=None, description="Service name"
|
||||
)
|
||||
num_cpus: int = Field(
|
||||
default=2, ge=1, description="Number of CPUs"
|
||||
)
|
||||
memory: int = Field(default=4, ge=1, description="Memory in GB")
|
||||
use_spot: bool = Field(
|
||||
default=False, description="Use spot instances"
|
||||
)
|
||||
num_nodes: int = Field(
|
||||
default=1, ge=1, description="Number of nodes"
|
||||
)
|
||||
|
||||
@validator("name")
|
||||
def validate_name(cls, v):
|
||||
if v and not v.isalnum():
|
||||
raise ValueError("Service name must be alphanumeric")
|
||||
return v
|
||||
|
||||
|
||||
class DeploymentResponse(BaseModel):
|
||||
"""Response model for deployment information."""
|
||||
|
||||
service_name: str
|
||||
endpoint: str
|
||||
|
||||
|
||||
class ServiceStatusResponse(BaseModel):
|
||||
"""Response model for service status."""
|
||||
|
||||
name: str
|
||||
status: str
|
||||
versions: List[int]
|
||||
replicas: int
|
||||
resources: str
|
||||
uptime: int
|
||||
endpoint: Optional[str]
|
||||
|
||||
|
||||
@app.post(
|
||||
"/services/",
|
||||
response_model=DeploymentResponse,
|
||||
status_code=status.HTTP_201_CREATED,
|
||||
tags=["services"],
|
||||
)
|
||||
async def create_service(config: ServiceConfigRequest):
|
||||
"""Deploy a new service."""
|
||||
try:
|
||||
service_config = ServiceConfig(
|
||||
code=config.code,
|
||||
requirements=config.requirements,
|
||||
envs=config.envs,
|
||||
name=config.name,
|
||||
num_cpus=config.num_cpus,
|
||||
memory=config.memory,
|
||||
use_spot=config.use_spot,
|
||||
num_nodes=config.num_nodes,
|
||||
)
|
||||
name, endpoint = SimpleSkyServe.deploy(service_config)
|
||||
return {"service_name": name, "endpoint": endpoint}
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create service: {str(e)}")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=str(e),
|
||||
)
|
||||
|
||||
|
||||
@app.get(
|
||||
"/services/",
|
||||
response_model=List[ServiceStatusResponse],
|
||||
tags=["services"],
|
||||
)
|
||||
async def list_services(name: Optional[str] = None):
|
||||
"""Get status of all services or a specific service."""
|
||||
try:
|
||||
deployments = SimpleSkyServe.get_deployments(name)
|
||||
return deployments
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to list services: {str(e)}")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=str(e),
|
||||
)
|
||||
|
||||
|
||||
@app.put(
|
||||
"/services/{service_name}",
|
||||
status_code=status.HTTP_200_OK,
|
||||
tags=["services"],
|
||||
)
|
||||
async def update_service(
|
||||
service_name: str,
|
||||
config: ServiceConfigRequest,
|
||||
mode: UpdateMode = UpdateMode.GRADUAL,
|
||||
):
|
||||
"""Update an existing service."""
|
||||
try:
|
||||
service_config = ServiceConfig(
|
||||
code=config.code,
|
||||
requirements=config.requirements,
|
||||
envs=config.envs,
|
||||
name=config.name,
|
||||
num_cpus=config.num_cpus,
|
||||
memory=config.memory,
|
||||
use_spot=config.use_spot,
|
||||
num_nodes=config.num_nodes,
|
||||
)
|
||||
SimpleSkyServe.update(service_name, service_config, mode)
|
||||
return {
|
||||
"message": f"Service {service_name} updated successfully"
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to update service: {str(e)}")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=str(e),
|
||||
)
|
||||
|
||||
|
||||
@app.delete(
|
||||
"/services/{service_name}",
|
||||
status_code=status.HTTP_204_NO_CONTENT,
|
||||
tags=["services"],
|
||||
)
|
||||
async def delete_service(service_name: str, purge: bool = False):
|
||||
"""Delete a service."""
|
||||
try:
|
||||
SimpleSkyServe.delete(service_name, purge)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete service: {str(e)}")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=str(e),
|
||||
)
|
||||
|
||||
|
||||
@app.exception_handler(Exception)
|
||||
async def general_exception_handler(request, exc):
|
||||
"""Global exception handler."""
|
||||
logger.error(f"Unhandled exception: {str(exc)}")
|
||||
return JSONResponse(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
content={"detail": "Internal server error"},
|
||||
)
|
||||
|
||||
|
||||
# Entry point for uvicorn
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
|
||||
uvicorn.run(
|
||||
"api:app",
|
||||
host="0.0.0.0",
|
||||
port=8000,
|
||||
workers=WORKERS,
|
||||
log_level="info",
|
||||
reload=False, # Disable in production
|
||||
proxy_headers=True,
|
||||
forwarded_allow_ips="*",
|
||||
access_log=True,
|
||||
)
|
@ -0,0 +1,10 @@
|
||||
fastapi
|
||||
uvicorn[standard]
|
||||
pydantic
|
||||
loguru
|
||||
python-multipart
|
||||
python-jose[cryptography]
|
||||
passlib[bcrypt]
|
||||
gunicorn
|
||||
prometheus-fastapi-instrumentator
|
||||
httpx
|
@ -0,0 +1,369 @@
|
||||
"""
|
||||
SimpleSkyServe: A simplified interface for SkyPilot's serve functionality.
|
||||
|
||||
This module provides an easy-to-use interface for deploying, managing, updating and monitoring
|
||||
services using SkyPilot's serve functionality. It supports the full lifecycle of services
|
||||
including deployment, updates, status monitoring, and cleanup.
|
||||
|
||||
Key Features:
|
||||
- Simple deployment with code and requirements
|
||||
- Service updates with different update modes
|
||||
- Status monitoring and deployment fetching
|
||||
- Service cleanup and deletion
|
||||
"""
|
||||
|
||||
from enum import Enum
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, List, Optional, Tuple, Union
|
||||
import tempfile
|
||||
|
||||
from loguru import logger
|
||||
|
||||
|
||||
class UpdateMode(Enum):
|
||||
"""Update modes for service updates.
|
||||
|
||||
IMMEDIATE: Update all replicas immediately
|
||||
GRADUAL: Update replicas gradually with zero downtime
|
||||
"""
|
||||
|
||||
IMMEDIATE = "immediate"
|
||||
GRADUAL = "gradual"
|
||||
|
||||
|
||||
@dataclass
|
||||
class ServiceConfig:
|
||||
"""Configuration for a SkyPilot service.
|
||||
|
||||
Attributes:
|
||||
code: Python code to run as a service
|
||||
requirements: List of pip packages required by the service
|
||||
envs: Environment variables to set for the service
|
||||
name: Optional name for the service (auto-generated if not provided)
|
||||
num_cpus: Number of CPUs to request (default: 2)
|
||||
memory: Memory in GB to request (default: 4)
|
||||
use_spot: Whether to use spot instances (default: False)
|
||||
"""
|
||||
|
||||
code: str
|
||||
requirements: Optional[List[str]] = None
|
||||
envs: Optional[Dict[str, str]] = None
|
||||
name: Optional[str] = None
|
||||
num_cpus: int = 2
|
||||
memory: int = 4
|
||||
use_spot: bool = False
|
||||
num_nodes: int = 1
|
||||
|
||||
|
||||
class SimpleSkyServe:
|
||||
"""Simple interface for SkyPilot serve functionality."""
|
||||
|
||||
@staticmethod
|
||||
def deploy(config: ServiceConfig) -> Tuple[str, str]:
|
||||
"""Deploy a new service using the provided configuration.
|
||||
|
||||
Args:
|
||||
config: ServiceConfig object containing service configuration
|
||||
|
||||
Returns:
|
||||
Tuple of (service_name: str, endpoint: str)
|
||||
|
||||
Raises:
|
||||
ValueError: If the configuration is invalid
|
||||
RuntimeError: If deployment fails
|
||||
"""
|
||||
logger.info("Deploying new service...")
|
||||
|
||||
# Create temporary files for setup and service code
|
||||
with tempfile.NamedTemporaryFile(
|
||||
mode="w", suffix=".txt"
|
||||
) as req_file, tempfile.NamedTemporaryFile(
|
||||
mode="w", suffix=".py"
|
||||
) as code_file:
|
||||
|
||||
# Write requirements if provided
|
||||
setup_cmd = ""
|
||||
if config.requirements:
|
||||
req_file.write("\n".join(config.requirements))
|
||||
req_file.flush()
|
||||
setup_cmd = f"pip install -r {req_file.name}"
|
||||
|
||||
# Write service code
|
||||
code_file.write(config.code)
|
||||
code_file.flush()
|
||||
|
||||
# Create SkyPilot task
|
||||
task = sky.Task(
|
||||
name=config.name,
|
||||
setup=setup_cmd,
|
||||
run=f"python {code_file.name}",
|
||||
envs=config.envs,
|
||||
num_nodes=config.num_nodes,
|
||||
)
|
||||
|
||||
# Set resource requirements
|
||||
resources = sky.Resources(
|
||||
cpus=config.num_cpus,
|
||||
memory=config.memory,
|
||||
use_spot=config.use_spot,
|
||||
)
|
||||
task.set_resources(resources)
|
||||
|
||||
try:
|
||||
# Deploy the service
|
||||
service_name, endpoint = sky.serve.up(
|
||||
task, service_name=config.name
|
||||
)
|
||||
logger.success(
|
||||
f"Service deployed successfully at {endpoint}"
|
||||
)
|
||||
return service_name, endpoint
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to deploy service: {str(e)}")
|
||||
raise RuntimeError(
|
||||
f"Service deployment failed: {str(e)}"
|
||||
) from e
|
||||
|
||||
@staticmethod
|
||||
def status(service_name: Optional[str] = None) -> List[Dict]:
|
||||
"""Get status of services.
|
||||
|
||||
Args:
|
||||
service_name: Optional name of specific service to get status for
|
||||
If None, returns status of all services
|
||||
|
||||
Returns:
|
||||
List of service status dictionaries containing:
|
||||
- name: Service name
|
||||
- status: Current status
|
||||
- endpoint: Service endpoint
|
||||
- uptime: Service uptime in seconds
|
||||
...and other service metadata
|
||||
"""
|
||||
logger.info(
|
||||
f"Getting status for service: {service_name or 'all'}"
|
||||
)
|
||||
try:
|
||||
status_list = sky.serve.status(service_name)
|
||||
logger.debug(
|
||||
f"Retrieved status for {len(status_list)} services"
|
||||
)
|
||||
return status_list
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get service status: {str(e)}")
|
||||
raise RuntimeError(
|
||||
f"Failed to get service status: {str(e)}"
|
||||
) from e
|
||||
|
||||
@staticmethod
|
||||
def update(
|
||||
service_name: str,
|
||||
config: ServiceConfig,
|
||||
mode: UpdateMode = UpdateMode.GRADUAL,
|
||||
) -> None:
|
||||
"""Update an existing service with new configuration.
|
||||
|
||||
Args:
|
||||
service_name: Name of service to update
|
||||
config: New service configuration
|
||||
mode: Update mode (IMMEDIATE or GRADUAL)
|
||||
|
||||
Raises:
|
||||
ValueError: If service doesn't exist or config is invalid
|
||||
RuntimeError: If update fails
|
||||
"""
|
||||
logger.info(
|
||||
f"Updating service {service_name} with mode {mode.value}"
|
||||
)
|
||||
|
||||
# Create temporary files for setup and service code
|
||||
with tempfile.NamedTemporaryFile(
|
||||
mode="w", suffix=".txt"
|
||||
) as req_file, tempfile.NamedTemporaryFile(
|
||||
mode="w", suffix=".py"
|
||||
) as code_file:
|
||||
|
||||
# Write requirements if provided
|
||||
setup_cmd = ""
|
||||
if config.requirements:
|
||||
req_file.write("\n".join(config.requirements))
|
||||
req_file.flush()
|
||||
setup_cmd = f"pip install -r {req_file.name}"
|
||||
|
||||
# Write service code
|
||||
code_file.write(config.code)
|
||||
code_file.flush()
|
||||
|
||||
# Create SkyPilot task for update
|
||||
task = sky.Task(
|
||||
name=config.name or service_name,
|
||||
setup=setup_cmd,
|
||||
run=f"python {code_file.name}",
|
||||
envs=config.envs,
|
||||
)
|
||||
|
||||
# Set resource requirements
|
||||
resources = sky.Resources(
|
||||
cpus=config.num_cpus,
|
||||
memory=config.memory,
|
||||
use_spot=config.use_spot,
|
||||
)
|
||||
task.set_resources(resources)
|
||||
|
||||
try:
|
||||
# Update the service
|
||||
sky.serve.update(
|
||||
task=task,
|
||||
service_name=service_name,
|
||||
mode=sky.serve.UpdateMode(mode.value),
|
||||
)
|
||||
logger.success(
|
||||
f"Service {service_name} updated successfully"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to update service: {str(e)}")
|
||||
raise RuntimeError(
|
||||
f"Service update failed: {str(e)}"
|
||||
) from e
|
||||
|
||||
@staticmethod
|
||||
def get_deployments(
|
||||
service_name: Optional[str] = None,
|
||||
) -> List[Dict]:
|
||||
"""Get detailed information about service deployments.
|
||||
|
||||
Args:
|
||||
service_name: Optional name of specific service to get deployments for
|
||||
If None, returns deployments for all services
|
||||
|
||||
Returns:
|
||||
List of deployment dictionaries containing:
|
||||
- name: Service name
|
||||
- versions: List of deployed versions
|
||||
- active_version: Currently active version
|
||||
- replicas: Number of replicas
|
||||
- resources: Resource usage
|
||||
- status: Deployment status
|
||||
"""
|
||||
logger.info(
|
||||
f"Fetching deployments for: {service_name or 'all services'}"
|
||||
)
|
||||
try:
|
||||
status_list = sky.serve.status(service_name)
|
||||
deployments = []
|
||||
|
||||
for status in status_list:
|
||||
deployment = {
|
||||
"name": status["name"],
|
||||
"versions": status["active_versions"],
|
||||
"status": status["status"],
|
||||
"replicas": len(status.get("replica_info", [])),
|
||||
"resources": status.get(
|
||||
"requested_resources_str", ""
|
||||
),
|
||||
"uptime": status.get("uptime", 0),
|
||||
"endpoint": None,
|
||||
}
|
||||
|
||||
# Extract endpoint if available
|
||||
if status.get("load_balancer_port"):
|
||||
deployment["endpoint"] = (
|
||||
f"http://{status.get('controller_addr')}:{status['load_balancer_port']}"
|
||||
)
|
||||
|
||||
deployments.append(deployment)
|
||||
|
||||
logger.debug(f"Retrieved {len(deployments)} deployments")
|
||||
return deployments
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to fetch deployments: {str(e)}")
|
||||
raise RuntimeError(
|
||||
f"Failed to fetch deployments: {str(e)}"
|
||||
) from e
|
||||
|
||||
@staticmethod
|
||||
def delete(
|
||||
service_name: Union[str, List[str]], purge: bool = False
|
||||
) -> None:
|
||||
"""Delete one or more services.
|
||||
|
||||
Args:
|
||||
service_name: Name of service(s) to delete
|
||||
purge: Whether to purge services in failed status
|
||||
|
||||
Raises:
|
||||
RuntimeError: If deletion fails
|
||||
"""
|
||||
names = (
|
||||
[service_name]
|
||||
if isinstance(service_name, str)
|
||||
else service_name
|
||||
)
|
||||
logger.info(f"Deleting services: {names}")
|
||||
try:
|
||||
sky.serve.down(service_names=names, purge=purge)
|
||||
logger.success(f"Successfully deleted services: {names}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete services: {str(e)}")
|
||||
raise RuntimeError(
|
||||
f"Service deletion failed: {str(e)}"
|
||||
) from e
|
||||
|
||||
|
||||
# # Example usage:
|
||||
# if __name__ == "__main__":
|
||||
# from time import sleep
|
||||
# # Configuration for a simple FastAPI service
|
||||
# config = ServiceConfig(
|
||||
# code="""
|
||||
# from fastapi import FastAPI
|
||||
# app = FastAPI()
|
||||
|
||||
# @app.get("/")
|
||||
# def read_root():
|
||||
# return {"Hello": "World"}
|
||||
# """,
|
||||
# requirements=["fastapi", "uvicorn"],
|
||||
# envs={"PORT": "8000"},
|
||||
# name="fastapi-demo"
|
||||
# )
|
||||
|
||||
# # Deploy the service
|
||||
# name, endpoint = SimpleSkyServe.deploy(config)
|
||||
# print(f"Service deployed at: {endpoint}")
|
||||
|
||||
# # Get service status
|
||||
# status = SimpleSkyServe.status(name)
|
||||
# print(f"Service status: {status}")
|
||||
|
||||
# # Get deployment information
|
||||
# deployments = SimpleSkyServe.get_deployments(name)
|
||||
# print(f"Deployment info: {deployments}")
|
||||
|
||||
# # Update the service with new code
|
||||
# new_config = ServiceConfig(
|
||||
# code="""
|
||||
# from fastapi import FastAPI
|
||||
# app = FastAPI()
|
||||
|
||||
# @app.get("/")
|
||||
# def read_root():
|
||||
# return {"Hello": "Updated World"}
|
||||
# """,
|
||||
# requirements=["fastapi", "uvicorn"],
|
||||
# envs={"PORT": "8000"}
|
||||
# )
|
||||
|
||||
# SimpleSkyServe.update(name, new_config, mode=UpdateMode.GRADUAL)
|
||||
# print("Service updated")
|
||||
|
||||
# # Wait for update to complete
|
||||
# sleep(30)
|
||||
|
||||
# # Check status after update
|
||||
# status = SimpleSkyServe.status(name)
|
||||
# print(f"Updated service status: {status}")
|
||||
|
||||
# # Delete the service
|
||||
# SimpleSkyServe.delete(name)
|
@ -0,0 +1,160 @@
|
||||
"""
|
||||
Simple test script for SkyServe API using requests.
|
||||
No test framework dependencies - just pure requests and assertions.
|
||||
"""
|
||||
|
||||
import time
|
||||
import requests
|
||||
from typing import Any
|
||||
|
||||
# API Configuration
|
||||
BASE_URL = "http://localhost:8000"
|
||||
HEADERS = {"Content-Type": "application/json"}
|
||||
|
||||
|
||||
def assert_equals(actual: Any, expected: Any, message: str = ""):
|
||||
"""Simple assertion helper."""
|
||||
if actual != expected:
|
||||
raise AssertionError(
|
||||
f"{message}\nExpected: {expected}\nGot: {actual}"
|
||||
)
|
||||
|
||||
|
||||
def test_create_service() -> str:
|
||||
"""Test service creation and return the service name."""
|
||||
print("\n🧪 Testing service creation...")
|
||||
|
||||
payload = {
|
||||
"code": """
|
||||
from fastapi import FastAPI
|
||||
app = FastAPI()
|
||||
|
||||
@app.get("/")
|
||||
def read_root():
|
||||
return {"Hello": "World"}
|
||||
""",
|
||||
"requirements": ["fastapi", "uvicorn"],
|
||||
"name": "test_service",
|
||||
"num_cpus": 2,
|
||||
"memory": 4,
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
f"{BASE_URL}/services/", json=payload, headers=HEADERS
|
||||
)
|
||||
|
||||
assert_equals(
|
||||
response.status_code, 201, "Service creation failed"
|
||||
)
|
||||
data = response.json()
|
||||
assert "service_name" in data, "Response missing service_name"
|
||||
assert "endpoint" in data, "Response missing endpoint"
|
||||
|
||||
print("✅ Service created successfully!")
|
||||
return data["service_name"]
|
||||
|
||||
|
||||
def test_list_services(expected_service_name: str):
|
||||
"""Test listing services."""
|
||||
print("\n🧪 Testing service listing...")
|
||||
|
||||
response = requests.get(f"{BASE_URL}/services/")
|
||||
assert_equals(response.status_code, 200, "Service listing failed")
|
||||
|
||||
services = response.json()
|
||||
assert isinstance(services, list), "Expected list of services"
|
||||
|
||||
# Find our service in the list
|
||||
service_found = False
|
||||
for service in services:
|
||||
if service["name"] == expected_service_name:
|
||||
service_found = True
|
||||
break
|
||||
|
||||
assert (
|
||||
service_found
|
||||
), f"Created service {expected_service_name} not found in list"
|
||||
print("✅ Services listed successfully!")
|
||||
|
||||
|
||||
def test_update_service(service_name: str):
|
||||
"""Test service update."""
|
||||
print("\n🧪 Testing service update...")
|
||||
|
||||
update_payload = {
|
||||
"code": """
|
||||
from fastapi import FastAPI
|
||||
app = FastAPI()
|
||||
|
||||
@app.get("/")
|
||||
def read_root():
|
||||
return {"Hello": "Updated World"}
|
||||
""",
|
||||
"requirements": ["fastapi", "uvicorn"],
|
||||
"name": service_name,
|
||||
"num_cpus": 2,
|
||||
"memory": 4,
|
||||
}
|
||||
|
||||
response = requests.put(
|
||||
f"{BASE_URL}/services/{service_name}",
|
||||
json=update_payload,
|
||||
headers=HEADERS,
|
||||
params={"mode": "gradual"},
|
||||
)
|
||||
|
||||
assert_equals(response.status_code, 200, "Service update failed")
|
||||
print("✅ Service updated successfully!")
|
||||
|
||||
|
||||
def test_delete_service(service_name: str):
|
||||
"""Test service deletion."""
|
||||
print("\n🧪 Testing service deletion...")
|
||||
|
||||
response = requests.delete(f"{BASE_URL}/services/{service_name}")
|
||||
assert_equals(
|
||||
response.status_code, 204, "Service deletion failed"
|
||||
)
|
||||
|
||||
# Verify service is gone
|
||||
list_response = requests.get(f"{BASE_URL}/services/")
|
||||
services = list_response.json()
|
||||
for service in services:
|
||||
if service["name"] == service_name:
|
||||
raise AssertionError(
|
||||
f"Service {service_name} still exists after deletion"
|
||||
)
|
||||
|
||||
print("✅ Service deleted successfully!")
|
||||
|
||||
|
||||
def run_tests():
|
||||
"""Run all tests in sequence."""
|
||||
try:
|
||||
print("🚀 Starting API tests...")
|
||||
|
||||
# Run tests in sequence
|
||||
service_name = test_create_service()
|
||||
|
||||
# Wait a bit for service to be fully ready
|
||||
print("⏳ Waiting for service to be ready...")
|
||||
time.sleep(5)
|
||||
|
||||
test_list_services(service_name)
|
||||
test_update_service(service_name)
|
||||
test_delete_service(service_name)
|
||||
|
||||
print("\n✨ All tests passed successfully! ✨")
|
||||
|
||||
except AssertionError as e:
|
||||
print(f"\n❌ Test failed: {str(e)}")
|
||||
raise
|
||||
except Exception as e:
|
||||
print(f"\n❌ Unexpected error: {str(e)}")
|
||||
raise
|
||||
finally:
|
||||
print("\n🏁 Tests completed")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
run_tests()
|
Loading…
Reference in new issue