pull/714/head
Kye Gomez 6 days ago
parent 83f00d95ba
commit 65d5631c37

@ -1,74 +0,0 @@
# Build stage
FROM python:3.11-slim as builder
# Set working directory
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
build-essential \
curl \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements
COPY requirements.txt .
RUN pip install --no-cache-dir wheel && \
pip wheel --no-cache-dir --no-deps --wheel-dir /app/wheels -r requirements.txt
# Final stage
FROM python:3.11-slim
# Set environment variables
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
PATH="/app/venv/bin:$PATH" \
PYTHONPATH=/app \
PORT=8080
# Create app user
RUN useradd -m -s /bin/bash app && \
mkdir -p /app/logs && \
chown -R app:app /app
# Set working directory
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
curl \
&& rm -rf /var/lib/apt/lists/*
# Copy wheels from builder
COPY --from=builder /app/wheels /app/wheels
# Create and activate virtual environment
RUN python -m venv /app/venv && \
/app/venv/bin/pip install --no-cache-dir /app/wheels/*
# Copy application code
COPY --chown=app:app . /app/api
# Switch to app user
USER app
# Create directories for logs
RUN mkdir -p /app/logs
# Required environment variables
ENV SUPABASE_URL="" \
SUPABASE_SERVICE_KEY="" \
ENVIRONMENT="production" \
LOG_LEVEL="info" \
WORKERS=4 \
MAX_REQUESTS_PER_MINUTE=60 \
API_KEY_LENGTH=32
# Expose port
EXPOSE $PORT
# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD curl -f http://localhost:$PORT/health || exit 1
# Start command
CMD ["sh", "-c", "uvicorn api:app --host 0.0.0.0 --port $PORT --workers $WORKERS --log-level $LOG_LEVEL"]

File diff suppressed because it is too large Load Diff

@ -1,936 +0,0 @@
import os
import secrets
import traceback
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional
from uuid import UUID, uuid4
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
#consider if the following imports need to be added to the main swarms requirements.txt:
#opentelemetry-api
#opentelemetry-sdk
#opentelemetry-instrumentation-fastapi
#opentelemetry-instrumentation-requests
#opentelemetry-exporter-otlp-proto-grpc
import uvicorn
from dotenv import load_dotenv
from fastapi import (
BackgroundTasks,
Depends,
FastAPI,
Header,
HTTPException,
Query,
Request,
status,
)
from fastapi.middleware.cors import CORSMiddleware
from loguru import logger
from pydantic import BaseModel, Field
from swarms.structs.agent import Agent
OTEL_SERVICE_NAME = os.getenv("OTEL_SERVICE_NAME", "swarms-api")
OTEL_EXPORTER_OTLP_ENDPOINT = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://aws-otel-collector:4317")
# Load environment variables
load_dotenv()
class AgentStatus(str, Enum):
"""Enum for agent status."""
IDLE = "idle"
PROCESSING = "processing"
ERROR = "error"
MAINTENANCE = "maintenance"
# Security configurations
API_KEY_LENGTH = 32 # Length of generated API keys
class APIKey(BaseModel):
key: str
name: str
created_at: datetime
last_used: datetime
is_active: bool = True
class APIKeyCreate(BaseModel):
name: str # A friendly name for the API key
class User(BaseModel):
id: UUID
username: str
is_active: bool = True
is_admin: bool = False
api_keys: Dict[str, APIKey] = {} # key -> APIKey object
class AgentConfig(BaseModel):
"""Configuration model for creating a new agent."""
agent_name: str = Field(..., description="Name of the agent")
model_name: str = Field(
...,
description="Name of the llm you want to use provided by litellm",
)
description: str = Field(
default="", description="Description of the agent's purpose"
)
system_prompt: str = Field(
..., description="System prompt for the agent"
)
model_name: str = Field(
default="gpt-4", description="Model name to use"
)
temperature: float = Field(
default=0.1,
ge=0.0,
le=2.0,
description="Temperature for the model",
)
max_loops: int = Field(
default=1, ge=1, description="Maximum number of loops"
)
autosave: bool = Field(
default=True, description="Enable autosave"
)
dashboard: bool = Field(
default=False, description="Enable dashboard"
)
verbose: bool = Field(
default=True, description="Enable verbose output"
)
dynamic_temperature_enabled: bool = Field(
default=True, description="Enable dynamic temperature"
)
user_name: str = Field(
default="default_user", description="Username for the agent"
)
retry_attempts: int = Field(
default=1, ge=1, description="Number of retry attempts"
)
context_length: int = Field(
default=200000, ge=1000, description="Context length"
)
output_type: str = Field(
default="string", description="Output type (string or json)"
)
streaming_on: bool = Field(
default=False, description="Enable streaming"
)
tags: List[str] = Field(
default_factory=list,
description="Tags for categorizing the agent",
)
class AgentUpdate(BaseModel):
"""Model for updating agent configuration."""
description: Optional[str] = None
system_prompt: Optional[str] = None
temperature: Optional[float] = 0.5
max_loops: Optional[int] = 1
tags: Optional[List[str]] = None
status: Optional[AgentStatus] = None
class AgentSummary(BaseModel):
"""Summary model for agent listing."""
agent_id: UUID
agent_name: str
description: str
created_at: datetime
last_used: datetime
total_completions: int
tags: List[str]
status: AgentStatus
class AgentMetrics(BaseModel):
"""Model for agent performance metrics."""
total_completions: int
average_response_time: float
error_rate: float
last_24h_completions: int
total_tokens_used: int
uptime_percentage: float
success_rate: float
peak_tokens_per_minute: int
class CompletionRequest(BaseModel):
"""Model for completion requests."""
prompt: str = Field(..., description="The prompt to process")
agent_id: UUID = Field(..., description="ID of the agent to use")
max_tokens: Optional[int] = Field(
None, description="Maximum tokens to generate"
)
temperature_override: Optional[float] = 0.5
stream: bool = Field(
default=False, description="Enable streaming response"
)
class CompletionResponse(BaseModel):
"""Model for completion responses."""
agent_id: UUID
response: str
metadata: Dict[str, Any]
timestamp: datetime
processing_time: float
token_usage: Dict[str, int]
class AgentStore:
"""Enhanced store for managing agents."""
def __init__(self):
self.agents: Dict[UUID, Agent] = {}
self.agent_metadata: Dict[UUID, Dict[str, Any]] = {}
self.users: Dict[UUID, User] = {} # user_id -> User
self.api_keys: Dict[str, UUID] = {} # api_key -> user_id
self.user_agents: Dict[UUID, List[UUID]] = (
{}
) # user_id -> [agent_ids]
self.executor = ThreadPoolExecutor(max_workers=4)
self._ensure_directories()
def _ensure_directories(self):
"""Ensure required directories exist."""
Path("logs").mkdir(exist_ok=True)
Path("states").mkdir(exist_ok=True)
def create_api_key(self, user_id: UUID, key_name: str) -> APIKey:
"""Create a new API key for a user."""
if user_id not in self.users:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
# Generate a secure random API key
api_key = secrets.token_urlsafe(API_KEY_LENGTH)
# Create the API key object
key_object = APIKey(
key=api_key,
name=key_name,
created_at=datetime.utcnow(),
last_used=datetime.utcnow(),
)
# Store the API key
self.users[user_id].api_keys[api_key] = key_object
self.api_keys[api_key] = user_id
return key_object
async def verify_agent_access(
self, agent_id: UUID, user_id: UUID
) -> bool:
"""Verify if a user has access to an agent."""
if agent_id not in self.agents:
return False
return (
self.agent_metadata[agent_id]["owner_id"] == user_id
or self.users[user_id].is_admin
)
def validate_api_key(self, api_key: str) -> Optional[UUID]:
"""Validate an API key and return the associated user ID."""
user_id = self.api_keys.get(api_key)
if not user_id or api_key not in self.users[user_id].api_keys:
return None
key_object = self.users[user_id].api_keys[api_key]
if not key_object.is_active:
return None
# Update last used timestamp
key_object.last_used = datetime.utcnow()
return user_id
async def create_agent(
self, config: AgentConfig, user_id: UUID
) -> UUID:
"""Create a new agent with the given configuration."""
try:
agent = Agent(
agent_name=config.agent_name,
system_prompt=config.system_prompt,
model_name=config.model_name,
max_loops=config.max_loops,
autosave=config.autosave,
dashboard=config.dashboard,
verbose=config.verbose,
dynamic_temperature_enabled=True,
saved_state_path=f"states/{config.agent_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
user_name=config.user_name,
retry_attempts=config.retry_attempts,
context_length=config.context_length,
return_step_meta=True,
output_type="str",
streaming_on=config.streaming_on,
)
agent_id = uuid4()
self.agents[agent_id] = agent
self.agent_metadata[agent_id] = {
"description": config.description,
"created_at": datetime.utcnow(),
"last_used": datetime.utcnow(),
"total_completions": 0,
"tags": config.tags,
"total_tokens": 0,
"error_count": 0,
"response_times": [],
"status": AgentStatus.IDLE,
"start_time": datetime.utcnow(),
"downtime": timedelta(),
"successful_completions": 0,
}
# Add to user's agents list
if user_id not in self.user_agents:
self.user_agents[user_id] = []
self.user_agents[user_id].append(agent_id)
return agent_id
except Exception as e:
logger.error(f"Error creating agent: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to create agent: {str(e)}",
)
async def get_agent(self, agent_id: UUID) -> Agent:
"""Retrieve an agent by ID."""
agent = self.agents.get(agent_id)
if not agent:
logger.error(f"Agent not found: {agent_id}")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Agent {agent_id} not found",
)
return agent
async def update_agent(
self, agent_id: UUID, update: AgentUpdate
) -> None:
"""Update agent configuration."""
agent = await self.get_agent(agent_id)
metadata = self.agent_metadata[agent_id]
if update.system_prompt:
agent.system_prompt = update.system_prompt
if update.max_loops is not None:
agent.max_loops = update.max_loops
if update.tags is not None:
metadata["tags"] = update.tags
if update.description is not None:
metadata["description"] = update.description
if update.status is not None:
metadata["status"] = update.status
if update.status == AgentStatus.MAINTENANCE:
metadata["downtime"] += (
datetime.utcnow() - metadata["last_used"]
)
logger.info(f"Updated agent {agent_id}")
async def list_agents(
self,
tags: Optional[List[str]] = None,
status: Optional[AgentStatus] = None,
) -> List[AgentSummary]:
"""List all agents, optionally filtered by tags and status."""
summaries = []
for agent_id, agent in self.agents.items():
metadata = self.agent_metadata[agent_id]
# Apply filters
if tags and not any(
tag in metadata["tags"] for tag in tags
):
continue
if status and metadata["status"] != status:
continue
summaries.append(
AgentSummary(
agent_id=agent_id,
agent_name=agent.agent_name,
description=metadata["description"],
created_at=metadata["created_at"],
last_used=metadata["last_used"],
total_completions=metadata["total_completions"],
tags=metadata["tags"],
status=metadata["status"],
)
)
return summaries
async def get_agent_metrics(self, agent_id: UUID) -> AgentMetrics:
"""Get performance metrics for an agent."""
metadata = self.agent_metadata[agent_id]
response_times = metadata["response_times"]
# Calculate metrics
total_time = datetime.utcnow() - metadata["start_time"]
uptime = total_time - metadata["downtime"]
uptime_percentage = (
uptime.total_seconds() / total_time.total_seconds()
) * 100
success_rate = (
metadata["successful_completions"]
/ metadata["total_completions"]
* 100
if metadata["total_completions"] > 0
else 0
)
return AgentMetrics(
total_completions=metadata["total_completions"],
average_response_time=(
sum(response_times) / len(response_times)
if response_times
else 0
),
error_rate=(
metadata["error_count"]
/ metadata["total_completions"]
if metadata["total_completions"] > 0
else 0
),
last_24h_completions=sum(
1
for t in response_times
if (datetime.utcnow() - t).days < 1
),
total_tokens_used=metadata["total_tokens"],
uptime_percentage=uptime_percentage,
success_rate=success_rate,
peak_tokens_per_minute=max(
metadata.get("tokens_per_minute", [0])
),
)
async def clone_agent(
self, agent_id: UUID, new_name: str
) -> UUID:
"""Clone an existing agent with a new name."""
original_agent = await self.get_agent(agent_id)
original_metadata = self.agent_metadata[agent_id]
config = AgentConfig(
agent_name=new_name,
description=f"Clone of {original_agent.agent_name}",
system_prompt=original_agent.system_prompt,
model_name=original_agent.model_name,
temperature=0.5,
max_loops=original_agent.max_loops,
tags=original_metadata["tags"],
)
return await self.create_agent(config)
async def delete_agent(self, agent_id: UUID) -> None:
"""Delete an agent."""
if agent_id not in self.agents:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Agent {agent_id} not found",
)
# Clean up any resources
agent = self.agents[agent_id]
if agent.autosave and os.path.exists(agent.saved_state_path):
os.remove(agent.saved_state_path)
del self.agents[agent_id]
del self.agent_metadata[agent_id]
logger.info(f"Deleted agent {agent_id}")
async def process_completion(
self,
agent: Agent,
prompt: str,
agent_id: UUID,
max_tokens: Optional[int] = None,
temperature_override: Optional[float] = None,
) -> CompletionResponse:
"""Process a completion request using the specified agent."""
# TELEMETRY CHANGE 6: Initialize tracer for this module
tracer = trace.get_tracer(__name__)
# TELEMETRY CHANGE 7: Create parent span for entire completion process
with tracer.start_as_current_span("process_completion") as span:
# TELEMETRY CHANGE 8: Add context attributes
span.set_attribute("agent.id", str(agent_id))
span.set_attribute("agent.name", agent.agent_name)
span.set_attribute("prompt.length", len(prompt))
if max_tokens:
span.set_attribute("max_tokens", max_tokens)
start_time = datetime.utcnow()
metadata = self.agent_metadata[agent_id]
try:
with tracer.start_span("update_agent_status") as status_span:
metadata["status"] = AgentStatus.PROCESSING
metadata["last_used"] = start_time
status_span.set_attribute("agent.status", AgentStatus.PROCESSING.value)
with tracer.start_span("process_agent_completion") as completion_span:
response = agent.run(prompt)
completion_span.set_attribute("completion.success", True)
with tracer.start_span("update_metrics") as metrics_span:
processing_time = (datetime.utcnow() - start_time).total_seconds()
metadata["response_times"].append(processing_time)
metadata["total_completions"] += 1
metadata["successful_completions"] += 1
prompt_tokens = len(prompt.split()) * 1.3
completion_tokens = len(response.split()) * 1.3
total_tokens = int(prompt_tokens + completion_tokens)
metadata["total_tokens"] += total_tokens
metrics_span.set_attribute("processing.time", processing_time)
metrics_span.set_attribute("tokens.total", total_tokens)
metrics_span.set_attribute("tokens.prompt", int(prompt_tokens))
metrics_span.set_attribute("tokens.completion", int(completion_tokens))
with tracer.start_span("update_token_tracking") as token_span:
current_minute = datetime.utcnow().replace(second=0, microsecond=0)
if "tokens_per_minute" not in metadata:
metadata["tokens_per_minute"] = {}
metadata["tokens_per_minute"][current_minute] = (
metadata["tokens_per_minute"].get(current_minute, 0) + total_tokens
)
token_span.set_attribute("tokens.per_minute",
metadata["tokens_per_minute"][current_minute])
completion_response = CompletionResponse(
agent_id=agent_id,
response=response,
metadata={
"agent_name": agent.agent_name,
},
timestamp=datetime.utcnow(),
processing_time=processing_time,
token_usage={
"prompt_tokens": int(prompt_tokens),
"completion_tokens": int(completion_tokens),
"total_tokens": total_tokens,
},
)
# TELEMETRY CHANGE 10: Detailed error tracking
span.set_attribute("completion.status", "success")
return completion_response
except Exception as e:
metadata["error_count"] += 1
metadata["status"] = AgentStatus.ERROR
# TELEMETRY CHANGE 11: Detailed error recording
span.set_attribute("completion.status", "error")
span.set_attribute("error.type", e.__class__.__name__)
span.set_attribute("error.message", str(e))
span.record_exception(e)
logger.error(
f"Error in completion processing: {str(e)}\n{traceback.format_exc()}"
)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error processing completion: {str(e)}",
)
finally:
metadata["status"] = AgentStatus.IDLE
span.set_attribute("agent.final_status", AgentStatus.IDLE.value)
class StoreManager:
_instance = None
@classmethod
def get_instance(cls) -> "AgentStore":
if cls._instance is None:
cls._instance = AgentStore()
return cls._instance
# Modify the dependency function
def get_store() -> AgentStore:
"""Dependency to get the AgentStore instance."""
return StoreManager.get_instance()
# Security utility function using the new dependency
async def get_current_user(
api_key: str = Header(
..., description="API key for authentication"
),
store: AgentStore = Depends(get_store),
) -> User:
"""Validate API key and return current user."""
user_id = store.validate_api_key(api_key)
if not user_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired API key",
headers={"WWW-Authenticate": "ApiKey"},
)
return store.users[user_id]
class SwarmsAPI:
"""Enhanced API class for Swarms agent integration."""
def __init__(self):
self.app = FastAPI(
title="Swarms Agent API",
description="Production-grade API for Swarms agent interaction",
version="1.0.0",
docs_url="/v1/docs",
redoc_url="/v1/redoc",
)
# Initialize the store using the singleton manager
self.store = StoreManager.get_instance()
# Configure CORS
self.app.add_middleware(
CORSMiddleware,
allow_origins=[
"*"
], # Configure appropriately for production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
self._setup_routes()
def _setup_routes(self):
"""Set up API routes."""
# In your API code
@self.app.post("/v1/users", response_model=Dict[str, Any])
async def create_user(request: Request):
"""Create a new user and initial API key."""
try:
body = await request.json()
username = body.get("username")
if not username or len(username) < 3:
raise HTTPException(
status_code=400, detail="Invalid username"
)
user_id = uuid4()
user = User(id=user_id, username=username)
self.store.users[user_id] = user
initial_key = self.store.create_api_key(
user_id, "Initial Key"
)
return {
"user_id": user_id,
"api_key": initial_key.key,
}
except Exception as e:
logger.error(f"Error creating user: {str(e)}")
raise HTTPException(status_code=400, detail=str(e))
@self.app.post(
"/v1/users/{user_id}/api-keys", response_model=APIKey
)
async def create_api_key(
user_id: UUID,
key_create: APIKeyCreate,
current_user: User = Depends(get_current_user),
):
"""Create a new API key for a user."""
if (
current_user.id != user_id
and not current_user.is_admin
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not authorized to create API keys for this user",
)
return self.store.create_api_key(user_id, key_create.name)
@self.app.get(
"/v1/users/{user_id}/api-keys",
response_model=List[APIKey],
)
async def list_api_keys(
user_id: UUID,
current_user: User = Depends(get_current_user),
):
"""List all API keys for a user."""
if (
current_user.id != user_id
and not current_user.is_admin
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not authorized to view API keys for this user",
)
return list(self.store.users[user_id].api_keys.values())
@self.app.delete("/v1/users/{user_id}/api-keys/{key}")
async def revoke_api_key(
user_id: UUID,
key: str,
current_user: User = Depends(get_current_user),
):
"""Revoke an API key."""
if (
current_user.id != user_id
and not current_user.is_admin
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not authorized to revoke API keys for this user",
)
if key in self.store.users[user_id].api_keys:
self.store.users[user_id].api_keys[
key
].is_active = False
del self.store.api_keys[key]
return {"status": "API key revoked"}
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="API key not found",
)
@self.app.get(
"/v1/users/me/agents", response_model=List[AgentSummary]
)
async def list_user_agents(
current_user: User = Depends(get_current_user),
tags: Optional[List[str]] = Query(None),
status: Optional[AgentStatus] = None,
):
"""List all agents owned by the current user."""
user_agents = self.store.user_agents.get(
current_user.id, []
)
return [
agent
for agent in await self.store.list_agents(
tags, status
)
if agent.agent_id in user_agents
]
# Modify existing routes to use API key authentication
@self.app.post("/v1/agent", response_model=Dict[str, UUID])
async def create_agent(
config: AgentConfig,
current_user: User = Depends(get_current_user),
):
"""Create a new agent with the specified configuration."""
agent_id = await self.store.create_agent(
config, current_user.id
)
return {"agent_id": agent_id}
@self.app.get("/v1/agents", response_model=List[AgentSummary])
async def list_agents(
tags: Optional[List[str]] = Query(None),
status: Optional[AgentStatus] = None,
):
"""List all agents, optionally filtered by tags and status."""
return await self.store.list_agents(tags, status)
@self.app.patch(
"/v1/agent/{agent_id}", response_model=Dict[str, str]
)
async def update_agent(agent_id: UUID, update: AgentUpdate):
"""Update an existing agent's configuration."""
await self.store.update_agent(agent_id, update)
return {"status": "updated"}
@self.app.get(
"/v1/agent/{agent_id}/metrics",
response_model=AgentMetrics,
)
async def get_agent_metrics(agent_id: UUID):
"""Get performance metrics for a specific agent."""
return await self.store.get_agent_metrics(agent_id)
@self.app.post(
"/v1/agent/{agent_id}/clone",
response_model=Dict[str, UUID],
)
async def clone_agent(agent_id: UUID, new_name: str):
"""Clone an existing agent with a new name."""
new_id = await self.store.clone_agent(agent_id, new_name)
return {"agent_id": new_id}
@self.app.delete("/v1/agent/{agent_id}")
async def delete_agent(agent_id: UUID):
"""Delete an agent."""
await self.store.delete_agent(agent_id)
return {"status": "deleted"}
@self.app.post(
"/v1/agent/completions", response_model=CompletionResponse
)
async def create_completion(
request: CompletionRequest,
background_tasks: BackgroundTasks,
):
"""Process a completion request with the specified agent."""
try:
agent = await self.store.get_agent(request.agent_id)
# Process completion
response = await self.store.process_completion(
agent,
request.prompt,
request.agent_id,
request.max_tokens,
0.5,
)
# Schedule background cleanup
background_tasks.add_task(
self._cleanup_old_metrics, request.agent_id
)
return response
except Exception as e:
logger.error(f"Error processing completion: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error processing completion: {str(e)}",
)
@self.app.get("/v1/agent/{agent_id}/status")
async def get_agent_status(agent_id: UUID):
"""Get the current status of an agent."""
metadata = self.store.agent_metadata.get(agent_id)
if not metadata:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Agent {agent_id} not found",
)
return {
"agent_id": agent_id,
"status": metadata["status"],
"last_used": metadata["last_used"],
"total_completions": metadata["total_completions"],
"error_count": metadata["error_count"],
}
async def _cleanup_old_metrics(self, agent_id: UUID):
"""Clean up old metrics data to prevent memory bloat."""
metadata = self.store.agent_metadata.get(agent_id)
if metadata:
# Keep only last 24 hours of response times
cutoff = datetime.utcnow() - timedelta(days=1)
metadata["response_times"] = [
t
for t in metadata["response_times"]
if isinstance(t, (int, float))
and t > cutoff.timestamp()
]
# Clean up old tokens per minute data
if "tokens_per_minute" in metadata:
metadata["tokens_per_minute"] = {
k: v
for k, v in metadata["tokens_per_minute"].items()
if k > cutoff
}
@app.middleware("http")
async def add_trace_context(request: Request, call_next):
span = trace.get_current_span()
span.set_attribute("http.url", str(request.url))
span.set_attribute("http.method", request.method)
response = await call_next(request)
span.set_attribute("http.status_code", response.status_code)
return response
def create_app() -> FastAPI:
"""Create and configure the FastAPI application."""
logger.info("Creating FastAPI application")
# TELEMETRY CHANGE 1: Configure OpenTelemetry resource with service name
resource = Resource.create({"service.name": "swarms-api"})
trace.set_tracer_provider(TracerProvider(resource=resource))
# TELEMETRY CHANGE 2: Set up OTLP exporter for AWS
otlp_exporter = OTLPSpanExporter(
endpoint="http://aws-otel-collector:4317", # AWS OpenTelemetry Collector endpoint
insecure=True
)
# TELEMETRY CHANGE 3: Configure batch processing of spans
span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)
api = SwarmsAPI()
app = api.app
# TELEMETRY CHANGE 4: Instrument FastAPI framework
FastAPIInstrumentor.instrument_app(app)
# TELEMETRY CHANGE 5: Instrument HTTP client library
RequestsInstrumentor().instrument()
logger.info("FastAPI application created successfully")
return app
app = create_app()
if __name__ == "__main__":
try:
logger.info("Starting API server...")
print("Starting API server on http://0.0.0.0:8000")
uvicorn.run(
app, # Pass the app instance directly
host="0.0.0.0",
port=8000,
log_level="info",
)
except Exception as e:
logger.error(f"Failed to start API: {str(e)}")
print(f"Error starting server: {str(e)}")

File diff suppressed because it is too large Load Diff

@ -1,13 +0,0 @@
fastapi
uvicorn
pydantic
loguru
python-dotenv
swarms # Specify the version or source if it's not on PyPI
opentelemetry-api
opentelemetry-sdk
opentelemetry-instrumentation-fastapi
opentelemetry-instrumentation-requests
opentelemetry-exporter-otlp-proto-grpc
swarms
supabase

@ -1,43 +0,0 @@
name: agentapi
service:
readiness_probe:
path: /docs
initial_delay_seconds: 300
timeout_seconds: 30
replica_policy:
min_replicas: 1
max_replicas: 50
target_qps_per_replica: 5
upscale_delay_seconds: 180
downscale_delay_seconds: 600
envs:
WORKSPACE_DIR: "agent_workspace"
OPENAI_API_KEY: ""
resources:
ports: 8000 # FastAPI default port
cpus: 16
memory: 64
disk_size: 50
use_spot: true
workdir: .
setup: |
git clone https://github.com/kyegomez/swarms.git
cd swarms/api
pip install -r requirements.txt
pip install swarms
run: |
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
# env:
# PYTHONPATH: /app/swarms
# LOG_LEVEL: "INFO"
# # MAX_WORKERS: "4"

@ -1,327 +0,0 @@
import asyncio
import json
import os
import sys
from typing import Any, Dict
import aiohttp
from loguru import logger
# Configure loguru
LOG_PATH = "api_tests.log"
logger.add(
LOG_PATH,
format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
rotation="1 day",
retention="7 days",
level="DEBUG",
)
BASE_URL = (
"https://api.swarms.ai/v1" # Change this to match your server URL
)
async def log_request_details(
method: str, url: str, headers: dict, data: Any = None
):
"""Log request details before sending."""
logger.debug(f"\n{'='*50}")
logger.debug(f"REQUEST: {method} {url}")
logger.debug(f"HEADERS: {json.dumps(headers, indent=2)}")
if data:
logger.debug(f"PAYLOAD: {json.dumps(data, indent=2)}")
async def log_response_details(
response: aiohttp.ClientResponse, data: Any = None
):
"""Log response details after receiving."""
logger.debug(f"\nRESPONSE Status: {response.status}")
logger.debug(
f"RESPONSE Headers: {json.dumps(dict(response.headers), indent=2)}"
)
if data:
logger.debug(f"RESPONSE Body: {json.dumps(data, indent=2)}")
logger.debug(f"{'='*50}\n")
async def test_create_user(
session: aiohttp.ClientSession,
) -> Dict[str, str]:
"""Test user creation endpoint."""
url = f"{BASE_URL}/users"
payload = {"username": "test_user"}
logger.info("Testing user creation...")
await log_request_details("POST", url, {}, payload)
try:
async with session.post(url, json=payload) as response:
data = await response.json()
await log_response_details(response, data)
if response.status != 200:
logger.error(
f"Failed to create user. Status: {response.status}, Response: {data}"
)
sys.exit(1)
logger.success("✓ Created user successfully")
return {
"user_id": data["user_id"],
"api_key": data["api_key"],
}
except Exception as e:
logger.exception(f"Exception in user creation: {str(e)}")
sys.exit(1)
async def test_create_agent(
session: aiohttp.ClientSession, api_key: str
) -> str:
"""Test agent creation endpoint."""
url = f"{BASE_URL}/agent"
config = {
"agent_name": "test_agent",
"system_prompt": "You are a helpful test agent",
"model_name": "gpt-4",
"description": "Test agent for API validation",
"max_loops": 1,
"temperature": 0.5,
"tags": ["test"],
"streaming_on": False,
"user_name": "test_user", # Added required field
"output_type": "string", # Added required field
}
headers = {"api-key": api_key}
logger.info("Testing agent creation...")
await log_request_details("POST", url, headers, config)
try:
async with session.post(
url, headers=headers, json=config
) as response:
data = await response.json()
await log_response_details(response, data)
if response.status != 200:
logger.error(
f"Failed to create agent. Status: {response.status}, Response: {data}"
)
return None
logger.success("✓ Created agent successfully")
return data["agent_id"]
except Exception as e:
logger.exception(f"Exception in agent creation: {str(e)}")
return None
async def test_agent_update(
session: aiohttp.ClientSession, agent_id: str, api_key: str
):
"""Test agent update endpoint."""
url = f"{BASE_URL}/agent/{agent_id}"
update_data = {
"description": "Updated test agent",
"system_prompt": "Updated system prompt",
"temperature": 0.7,
"tags": ["test", "updated"],
}
headers = {"api-key": api_key}
logger.info(f"Testing agent update for agent {agent_id}...")
await log_request_details("PATCH", url, headers, update_data)
try:
async with session.patch(
url, headers=headers, json=update_data
) as response:
data = await response.json()
await log_response_details(response, data)
if response.status != 200:
logger.error(
f"Failed to update agent. Status: {response.status}, Response: {data}"
)
return False
logger.success("✓ Updated agent successfully")
return True
except Exception as e:
logger.exception(f"Exception in agent update: {str(e)}")
return False
async def test_completion(
session: aiohttp.ClientSession, agent_id: str, api_key: str
):
"""Test completion endpoint."""
url = f"{BASE_URL}/agent/completions"
completion_request = {
"prompt": "Hello, how are you?",
"agent_id": agent_id,
"max_tokens": 100,
"stream": False,
}
headers = {"api-key": api_key}
logger.info(f"Testing completion for agent {agent_id}...")
await log_request_details(
"POST", url, headers, completion_request
)
try:
async with session.post(
url, headers=headers, json=completion_request
) as response:
data = await response.json()
await log_response_details(response, data)
if response.status != 200:
logger.error(
f"Failed to process completion. Status: {response.status}, Response: {data}"
)
return False
logger.success("✓ Processed completion successfully")
return True
except Exception as e:
logger.exception(
f"Exception in completion processing: {str(e)}"
)
return False
async def test_get_metrics(
session: aiohttp.ClientSession, agent_id: str, api_key: str
):
"""Test metrics endpoint."""
url = f"{BASE_URL}/agent/{agent_id}/metrics"
headers = {"api-key": api_key}
logger.info(f"Testing metrics retrieval for agent {agent_id}...")
await log_request_details("GET", url, headers)
try:
async with session.get(url, headers=headers) as response:
data = await response.json()
await log_response_details(response, data)
if response.status != 200:
logger.error(
f"Failed to get metrics. Status: {response.status}, Response: {data}"
)
return False
logger.success("✓ Retrieved metrics successfully")
return True
except Exception as e:
logger.exception(f"Exception in metrics retrieval: {str(e)}")
return False
async def run_tests():
"""Run all API tests."""
logger.info("Starting API test suite...")
logger.info(f"Using base URL: {BASE_URL}")
timeout = aiohttp.ClientTimeout(total=30) # 30 second timeout
async with aiohttp.ClientSession(timeout=timeout) as session:
try:
# Create test user
user_data = await test_create_user(session)
if not user_data:
logger.error("User creation failed, stopping tests.")
return
logger.info(
"User created successfully, proceeding with agent tests..."
)
user_data["user_id"]
api_key = user_data["api_key"]
# Create test agent
agent_id = await test_create_agent(session, api_key)
if not agent_id:
logger.error("Agent creation failed, stopping tests.")
return
logger.info(
"Agent created successfully, proceeding with other tests..."
)
# Run remaining tests
test_results = []
# Test metrics retrieval
logger.info("Testing metrics retrieval...")
metrics_result = await test_get_metrics(
session, agent_id, api_key
)
test_results.append(("Metrics", metrics_result))
# Test agent update
logger.info("Testing agent update...")
update_result = await test_agent_update(
session, agent_id, api_key
)
test_results.append(("Agent Update", update_result))
# Test completion
logger.info("Testing completion...")
completion_result = await test_completion(
session, agent_id, api_key
)
test_results.append(("Completion", completion_result))
# Log final results
logger.info("\nTest Results Summary:")
all_passed = True
for test_name, result in test_results:
status = "PASSED" if result else "FAILED"
logger.info(f"{test_name}: {status}")
if not result:
all_passed = False
if all_passed:
logger.success(
"\n🎉 All tests completed successfully!"
)
else:
logger.error(
"\n❌ Some tests failed. Check the logs for details."
)
logger.info(
f"\nDetailed logs available at: {os.path.abspath(LOG_PATH)}"
)
except Exception as e:
logger.exception(
f"Unexpected error during test execution: {str(e)}"
)
raise
finally:
logger.info("Test suite execution completed.")
def main():
logger.info("=" * 50)
logger.info("API TEST SUITE EXECUTION")
logger.info("=" * 50)
try:
asyncio.run(run_tests())
except KeyboardInterrupt:
logger.warning("Test execution interrupted by user.")
except Exception:
logger.exception("Fatal error in test execution:")
finally:
logger.info("Test suite shutdown complete.")
if __name__ == "__main__":
main()
Loading…
Cancel
Save