parent
55018c636a
commit
449b2db79e
@ -0,0 +1,638 @@
|
||||
import os
|
||||
from fastapi import (
|
||||
FastAPI,
|
||||
HTTPException,
|
||||
status,
|
||||
Query,
|
||||
BackgroundTasks,
|
||||
)
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from pydantic import BaseModel, Field
|
||||
from typing import Optional, Dict, Any, List
|
||||
from loguru import logger
|
||||
import uvicorn
|
||||
from datetime import datetime, timedelta
|
||||
from uuid import UUID, uuid4
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import traceback
|
||||
|
||||
from swarms import Agent
|
||||
from dotenv import load_dotenv
|
||||
|
||||
print ("starting")
|
||||
# Load environment variables
|
||||
load_dotenv()
|
||||
|
||||
# Configure Loguru
|
||||
logger.add(
|
||||
"logs/api_{time}.log",
|
||||
rotation="500 MB",
|
||||
retention="10 days",
|
||||
level="INFO",
|
||||
format="{time} {level} {message}",
|
||||
backtrace=True,
|
||||
diagnose=True,
|
||||
)
|
||||
|
||||
|
||||
class AgentStatus(str, Enum):
|
||||
"""Enum for agent status."""
|
||||
|
||||
IDLE = "idle"
|
||||
PROCESSING = "processing"
|
||||
ERROR = "error"
|
||||
MAINTENANCE = "maintenance"
|
||||
|
||||
|
||||
class AgentConfig(BaseModel):
|
||||
"""Configuration model for creating a new agent."""
|
||||
|
||||
agent_name: str = Field(..., description="Name of the agent")
|
||||
model_name: str = Field(
|
||||
...,
|
||||
description="Name of the llm you want to use provided by litellm",
|
||||
)
|
||||
description: str = Field(
|
||||
default="", description="Description of the agent's purpose"
|
||||
)
|
||||
system_prompt: str = Field(
|
||||
..., description="System prompt for the agent"
|
||||
)
|
||||
model_name: str = Field(
|
||||
default="gpt-4", description="Model name to use"
|
||||
)
|
||||
temperature: float = Field(
|
||||
default=0.1,
|
||||
ge=0.0,
|
||||
le=2.0,
|
||||
description="Temperature for the model",
|
||||
)
|
||||
max_loops: int = Field(
|
||||
default=1, ge=1, description="Maximum number of loops"
|
||||
)
|
||||
autosave: bool = Field(
|
||||
default=True, description="Enable autosave"
|
||||
)
|
||||
dashboard: bool = Field(
|
||||
default=False, description="Enable dashboard"
|
||||
)
|
||||
verbose: bool = Field(
|
||||
default=True, description="Enable verbose output"
|
||||
)
|
||||
dynamic_temperature_enabled: bool = Field(
|
||||
default=True, description="Enable dynamic temperature"
|
||||
)
|
||||
user_name: str = Field(
|
||||
default="default_user", description="Username for the agent"
|
||||
)
|
||||
retry_attempts: int = Field(
|
||||
default=1, ge=1, description="Number of retry attempts"
|
||||
)
|
||||
context_length: int = Field(
|
||||
default=200000, ge=1000, description="Context length"
|
||||
)
|
||||
output_type: str = Field(
|
||||
default="string", description="Output type (string or json)"
|
||||
)
|
||||
streaming_on: bool = Field(
|
||||
default=False, description="Enable streaming"
|
||||
)
|
||||
tags: List[str] = Field(
|
||||
default_factory=list,
|
||||
description="Tags for categorizing the agent",
|
||||
)
|
||||
|
||||
|
||||
class AgentUpdate(BaseModel):
|
||||
"""Model for updating agent configuration."""
|
||||
|
||||
description: Optional[str] = None
|
||||
system_prompt: Optional[str] = None
|
||||
temperature: Optional[float] = None
|
||||
max_loops: Optional[int] = None
|
||||
tags: Optional[List[str]] = None
|
||||
status: Optional[AgentStatus] = None
|
||||
|
||||
|
||||
class AgentSummary(BaseModel):
|
||||
"""Summary model for agent listing."""
|
||||
|
||||
agent_id: UUID
|
||||
agent_name: str
|
||||
description: str
|
||||
created_at: datetime
|
||||
last_used: datetime
|
||||
total_completions: int
|
||||
tags: List[str]
|
||||
status: AgentStatus
|
||||
|
||||
|
||||
class AgentMetrics(BaseModel):
|
||||
"""Model for agent performance metrics."""
|
||||
|
||||
total_completions: int
|
||||
average_response_time: float
|
||||
error_rate: float
|
||||
last_24h_completions: int
|
||||
total_tokens_used: int
|
||||
uptime_percentage: float
|
||||
success_rate: float
|
||||
peak_tokens_per_minute: int
|
||||
|
||||
|
||||
class CompletionRequest(BaseModel):
|
||||
"""Model for completion requests."""
|
||||
|
||||
prompt: str = Field(..., description="The prompt to process")
|
||||
agent_id: UUID = Field(..., description="ID of the agent to use")
|
||||
max_tokens: Optional[int] = Field(
|
||||
None, description="Maximum tokens to generate"
|
||||
)
|
||||
temperature_override: Optional[float] = None
|
||||
stream: bool = Field(
|
||||
default=False, description="Enable streaming response"
|
||||
)
|
||||
|
||||
|
||||
class CompletionResponse(BaseModel):
|
||||
"""Model for completion responses."""
|
||||
|
||||
agent_id: UUID
|
||||
response: str
|
||||
metadata: Dict[str, Any]
|
||||
timestamp: datetime
|
||||
processing_time: float
|
||||
token_usage: Dict[str, int]
|
||||
|
||||
|
||||
class AgentStore:
|
||||
"""Enhanced store for managing agents."""
|
||||
|
||||
def __init__(self):
|
||||
self.agents: Dict[UUID, Agent] = {}
|
||||
self.agent_metadata: Dict[UUID, Dict[str, Any]] = {}
|
||||
self.executor = ThreadPoolExecutor(max_workers=4)
|
||||
self._ensure_directories()
|
||||
|
||||
def _ensure_directories(self):
|
||||
"""Ensure required directories exist."""
|
||||
Path("logs").mkdir(exist_ok=True)
|
||||
Path("states").mkdir(exist_ok=True)
|
||||
|
||||
async def create_agent(self, config: AgentConfig) -> UUID:
|
||||
"""Create a new agent with the given configuration."""
|
||||
try:
|
||||
|
||||
agent = Agent(
|
||||
agent_name=config.agent_name,
|
||||
system_prompt=config.system_prompt,
|
||||
model_name=config.model_name,
|
||||
max_loops=config.max_loops,
|
||||
autosave=config.autosave,
|
||||
dashboard=config.dashboard,
|
||||
verbose=config.verbose,
|
||||
dynamic_temperature_enabled=config.dynamic_temperature_enabled,
|
||||
saved_state_path=f"states/{config.agent_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
|
||||
user_name=config.user_name,
|
||||
retry_attempts=config.retry_attempts,
|
||||
context_length=config.context_length,
|
||||
return_step_meta=True,
|
||||
output_type="str",
|
||||
streaming_on=config.streaming_on,
|
||||
)
|
||||
|
||||
agent_id = uuid4()
|
||||
self.agents[agent_id] = agent
|
||||
self.agent_metadata[agent_id] = {
|
||||
"description": config.description,
|
||||
"created_at": datetime.utcnow(),
|
||||
"last_used": datetime.utcnow(),
|
||||
"total_completions": 0,
|
||||
"tags": config.tags,
|
||||
"total_tokens": 0,
|
||||
"error_count": 0,
|
||||
"response_times": [],
|
||||
"status": AgentStatus.IDLE,
|
||||
"start_time": datetime.utcnow(),
|
||||
"downtime": timedelta(),
|
||||
"successful_completions": 0,
|
||||
}
|
||||
|
||||
logger.info(f"Created agent with ID: {agent_id}")
|
||||
return agent_id
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error creating agent: {str(e)}")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Failed to create agent: {str(e)}",
|
||||
)
|
||||
|
||||
async def get_agent(self, agent_id: UUID) -> Agent:
|
||||
"""Retrieve an agent by ID."""
|
||||
agent = self.agents.get(agent_id)
|
||||
if not agent:
|
||||
logger.error(f"Agent not found: {agent_id}")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail=f"Agent {agent_id} not found",
|
||||
)
|
||||
return agent
|
||||
|
||||
async def update_agent(
|
||||
self, agent_id: UUID, update: AgentUpdate
|
||||
) -> None:
|
||||
"""Update agent configuration."""
|
||||
agent = await self.get_agent(agent_id)
|
||||
metadata = self.agent_metadata[agent_id]
|
||||
|
||||
if update.system_prompt:
|
||||
agent.system_prompt = update.system_prompt
|
||||
if update.temperature is not None:
|
||||
agent.llm.temperature = update.temperature
|
||||
if update.max_loops is not None:
|
||||
agent.max_loops = update.max_loops
|
||||
if update.tags is not None:
|
||||
metadata["tags"] = update.tags
|
||||
if update.description is not None:
|
||||
metadata["description"] = update.description
|
||||
if update.status is not None:
|
||||
metadata["status"] = update.status
|
||||
if update.status == AgentStatus.MAINTENANCE:
|
||||
metadata["downtime"] += (
|
||||
datetime.utcnow() - metadata["last_used"]
|
||||
)
|
||||
|
||||
logger.info(f"Updated agent {agent_id}")
|
||||
|
||||
async def list_agents(
|
||||
self,
|
||||
tags: Optional[List[str]] = None,
|
||||
status: Optional[AgentStatus] = None,
|
||||
) -> List[AgentSummary]:
|
||||
"""List all agents, optionally filtered by tags and status."""
|
||||
summaries = []
|
||||
for agent_id, agent in self.agents.items():
|
||||
metadata = self.agent_metadata[agent_id]
|
||||
|
||||
# Apply filters
|
||||
if tags and not any(
|
||||
tag in metadata["tags"] for tag in tags
|
||||
):
|
||||
continue
|
||||
if status and metadata["status"] != status:
|
||||
continue
|
||||
|
||||
summaries.append(
|
||||
AgentSummary(
|
||||
agent_id=agent_id,
|
||||
agent_name=agent.agent_name,
|
||||
description=metadata["description"],
|
||||
created_at=metadata["created_at"],
|
||||
last_used=metadata["last_used"],
|
||||
total_completions=metadata["total_completions"],
|
||||
tags=metadata["tags"],
|
||||
status=metadata["status"],
|
||||
)
|
||||
)
|
||||
return summaries
|
||||
|
||||
async def get_agent_metrics(self, agent_id: UUID) -> AgentMetrics:
|
||||
"""Get performance metrics for an agent."""
|
||||
metadata = self.agent_metadata[agent_id]
|
||||
response_times = metadata["response_times"]
|
||||
|
||||
# Calculate metrics
|
||||
total_time = datetime.utcnow() - metadata["start_time"]
|
||||
uptime = total_time - metadata["downtime"]
|
||||
uptime_percentage = (
|
||||
uptime.total_seconds() / total_time.total_seconds()
|
||||
) * 100
|
||||
|
||||
success_rate = (
|
||||
metadata["successful_completions"]
|
||||
/ metadata["total_completions"]
|
||||
* 100
|
||||
if metadata["total_completions"] > 0
|
||||
else 0
|
||||
)
|
||||
|
||||
return AgentMetrics(
|
||||
total_completions=metadata["total_completions"],
|
||||
average_response_time=(
|
||||
sum(response_times) / len(response_times)
|
||||
if response_times
|
||||
else 0
|
||||
),
|
||||
error_rate=(
|
||||
metadata["error_count"]
|
||||
/ metadata["total_completions"]
|
||||
if metadata["total_completions"] > 0
|
||||
else 0
|
||||
),
|
||||
last_24h_completions=sum(
|
||||
1
|
||||
for t in response_times
|
||||
if (datetime.utcnow() - t).days < 1
|
||||
),
|
||||
total_tokens_used=metadata["total_tokens"],
|
||||
uptime_percentage=uptime_percentage,
|
||||
success_rate=success_rate,
|
||||
peak_tokens_per_minute=max(
|
||||
metadata.get("tokens_per_minute", [0])
|
||||
),
|
||||
)
|
||||
|
||||
async def clone_agent(
|
||||
self, agent_id: UUID, new_name: str
|
||||
) -> UUID:
|
||||
"""Clone an existing agent with a new name."""
|
||||
original_agent = await self.get_agent(agent_id)
|
||||
original_metadata = self.agent_metadata[agent_id]
|
||||
|
||||
config = AgentConfig(
|
||||
agent_name=new_name,
|
||||
description=f"Clone of {original_agent.agent_name}",
|
||||
system_prompt=original_agent.system_prompt,
|
||||
model_name=original_agent.llm.model_name,
|
||||
temperature=original_agent.llm.temperature,
|
||||
max_loops=original_agent.max_loops,
|
||||
tags=original_metadata["tags"],
|
||||
)
|
||||
|
||||
return await self.create_agent(config)
|
||||
|
||||
async def delete_agent(self, agent_id: UUID) -> None:
|
||||
"""Delete an agent."""
|
||||
if agent_id not in self.agents:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail=f"Agent {agent_id} not found",
|
||||
)
|
||||
|
||||
# Clean up any resources
|
||||
agent = self.agents[agent_id]
|
||||
if agent.autosave and os.path.exists(agent.saved_state_path):
|
||||
os.remove(agent.saved_state_path)
|
||||
|
||||
del self.agents[agent_id]
|
||||
del self.agent_metadata[agent_id]
|
||||
logger.info(f"Deleted agent {agent_id}")
|
||||
|
||||
async def process_completion(
|
||||
self,
|
||||
agent: Agent,
|
||||
prompt: str,
|
||||
agent_id: UUID,
|
||||
max_tokens: Optional[int] = None,
|
||||
temperature_override: Optional[float] = None,
|
||||
) -> CompletionResponse:
|
||||
"""Process a completion request using the specified agent."""
|
||||
start_time = datetime.utcnow()
|
||||
metadata = self.agent_metadata[agent_id]
|
||||
|
||||
try:
|
||||
# Update agent status
|
||||
metadata["status"] = AgentStatus.PROCESSING
|
||||
metadata["last_used"] = start_time
|
||||
|
||||
# Apply temporary overrides if specified
|
||||
original_temp = agent.llm.temperature
|
||||
if temperature_override is not None:
|
||||
agent.llm.temperature = temperature_override
|
||||
|
||||
# Process the completion
|
||||
response = agent.run(prompt)
|
||||
|
||||
# Reset overrides
|
||||
if temperature_override is not None:
|
||||
agent.llm.temperature = original_temp
|
||||
|
||||
# Update metrics
|
||||
processing_time = (
|
||||
datetime.utcnow() - start_time
|
||||
).total_seconds()
|
||||
metadata["response_times"].append(processing_time)
|
||||
metadata["total_completions"] += 1
|
||||
metadata["successful_completions"] += 1
|
||||
|
||||
# Estimate token usage (this is a rough estimate)
|
||||
prompt_tokens = len(prompt.split()) * 1.3
|
||||
completion_tokens = len(response.split()) * 1.3
|
||||
total_tokens = int(prompt_tokens + completion_tokens)
|
||||
metadata["total_tokens"] += total_tokens
|
||||
|
||||
# Update tokens per minute tracking
|
||||
current_minute = datetime.utcnow().replace(
|
||||
second=0, microsecond=0
|
||||
)
|
||||
if "tokens_per_minute" not in metadata:
|
||||
metadata["tokens_per_minute"] = {}
|
||||
metadata["tokens_per_minute"][current_minute] = (
|
||||
metadata["tokens_per_minute"].get(current_minute, 0)
|
||||
+ total_tokens
|
||||
)
|
||||
|
||||
return CompletionResponse(
|
||||
agent_id=agent_id,
|
||||
response=response,
|
||||
metadata={
|
||||
"agent_name": agent.agent_name,
|
||||
"model_name": agent.llm.model_name,
|
||||
"temperature": agent.llm.temperature,
|
||||
},
|
||||
timestamp=datetime.utcnow(),
|
||||
processing_time=processing_time,
|
||||
token_usage={
|
||||
"prompt_tokens": int(prompt_tokens),
|
||||
"completion_tokens": int(completion_tokens),
|
||||
"total_tokens": total_tokens,
|
||||
},
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
metadata["error_count"] += 1
|
||||
metadata["status"] = AgentStatus.ERROR
|
||||
logger.error(
|
||||
f"Error in completion processing: {str(e)}\n{traceback.format_exc()}"
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Error processing completion: {str(e)}",
|
||||
)
|
||||
finally:
|
||||
metadata["status"] = AgentStatus.IDLE
|
||||
|
||||
|
||||
class SwarmsAPI:
|
||||
"""Enhanced API class for Swarms agent integration."""
|
||||
|
||||
def __init__(self):
|
||||
self.app = FastAPI(
|
||||
title="Swarms Agent API",
|
||||
description="Production-grade API for Swarms agent interaction",
|
||||
version="1.0.0",
|
||||
docs_url="/v1/docs",
|
||||
redoc_url="/v1/redoc",
|
||||
)
|
||||
self.store = AgentStore()
|
||||
# Configure CORS
|
||||
self.app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=[
|
||||
"*"
|
||||
], # Configure appropriately for production
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
self._setup_routes()
|
||||
|
||||
def _setup_routes(self):
|
||||
"""Set up API routes."""
|
||||
|
||||
@self.app.post("/v1/agent", response_model=Dict[str, UUID])
|
||||
async def create_agent(config: AgentConfig):
|
||||
"""Create a new agent with the specified configuration."""
|
||||
agent_id = await self.store.create_agent(config)
|
||||
return {"agent_id": agent_id}
|
||||
|
||||
@self.app.get("/v1/agents", response_model=List[AgentSummary])
|
||||
async def list_agents(
|
||||
tags: Optional[List[str]] = Query(None),
|
||||
status: Optional[AgentStatus] = None,
|
||||
):
|
||||
"""List all agents, optionally filtered by tags and status."""
|
||||
return await self.store.list_agents(tags, status)
|
||||
|
||||
@self.app.patch(
|
||||
"/v1/agent/{agent_id}", response_model=Dict[str, str]
|
||||
)
|
||||
async def update_agent(agent_id: UUID, update: AgentUpdate):
|
||||
"""Update an existing agent's configuration."""
|
||||
await self.store.update_agent(agent_id, update)
|
||||
return {"status": "updated"}
|
||||
|
||||
@self.app.get(
|
||||
"/v1/agent/{agent_id}/metrics",
|
||||
response_model=AgentMetrics,
|
||||
)
|
||||
async def get_agent_metrics(agent_id: UUID):
|
||||
"""Get performance metrics for a specific agent."""
|
||||
return await self.store.get_agent_metrics(agent_id)
|
||||
|
||||
@self.app.post(
|
||||
"/v1/agent/{agent_id}/clone",
|
||||
response_model=Dict[str, UUID],
|
||||
)
|
||||
async def clone_agent(agent_id: UUID, new_name: str):
|
||||
"""Clone an existing agent with a new name."""
|
||||
new_id = await self.store.clone_agent(agent_id, new_name)
|
||||
return {"agent_id": new_id}
|
||||
|
||||
@self.app.delete("/v1/agent/{agent_id}")
|
||||
async def delete_agent(agent_id: UUID):
|
||||
"""Delete an agent."""
|
||||
await self.store.delete_agent(agent_id)
|
||||
return {"status": "deleted"}
|
||||
|
||||
@self.app.post(
|
||||
"/v1/agent/completions", response_model=CompletionResponse
|
||||
)
|
||||
async def create_completion(
|
||||
request: CompletionRequest,
|
||||
background_tasks: BackgroundTasks,
|
||||
):
|
||||
"""Process a completion request with the specified agent."""
|
||||
try:
|
||||
agent = await self.store.get_agent(request.agent_id)
|
||||
|
||||
# Process completion
|
||||
response = await self.store.process_completion(
|
||||
agent,
|
||||
request.prompt,
|
||||
request.agent_id,
|
||||
request.max_tokens,
|
||||
request.temperature_override,
|
||||
)
|
||||
|
||||
# Schedule background cleanup
|
||||
background_tasks.add_task(
|
||||
self._cleanup_old_metrics, request.agent_id
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing completion: {str(e)}")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Error processing completion: {str(e)}",
|
||||
)
|
||||
|
||||
@self.app.get("/v1/agent/{agent_id}/status")
|
||||
async def get_agent_status(agent_id: UUID):
|
||||
"""Get the current status of an agent."""
|
||||
metadata = self.store.agent_metadata.get(agent_id)
|
||||
if not metadata:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail=f"Agent {agent_id} not found",
|
||||
)
|
||||
return {
|
||||
"agent_id": agent_id,
|
||||
"status": metadata["status"],
|
||||
"last_used": metadata["last_used"],
|
||||
"total_completions": metadata["total_completions"],
|
||||
"error_count": metadata["error_count"],
|
||||
}
|
||||
|
||||
async def _cleanup_old_metrics(self, agent_id: UUID):
|
||||
"""Clean up old metrics data to prevent memory bloat."""
|
||||
metadata = self.store.agent_metadata.get(agent_id)
|
||||
if metadata:
|
||||
# Keep only last 24 hours of response times
|
||||
cutoff = datetime.utcnow() - timedelta(days=1)
|
||||
metadata["response_times"] = [
|
||||
t
|
||||
for t in metadata["response_times"]
|
||||
if isinstance(t, (int, float))
|
||||
and t > cutoff.timestamp()
|
||||
]
|
||||
|
||||
# Clean up old tokens per minute data
|
||||
if "tokens_per_minute" in metadata:
|
||||
metadata["tokens_per_minute"] = {
|
||||
k: v
|
||||
for k, v in metadata["tokens_per_minute"].items()
|
||||
if k > cutoff
|
||||
}
|
||||
|
||||
|
||||
def create_app() -> FastAPI:
|
||||
"""Create and configure the FastAPI application."""
|
||||
print("create app")
|
||||
api = SwarmsAPI()
|
||||
return api.app
|
||||
|
||||
|
||||
#if __name__ == "__main__":
|
||||
if __name__ == '__main__':
|
||||
#freeze_support()
|
||||
print("yes in main")
|
||||
# Configure uvicorn logging
|
||||
logger.info("API Starting")
|
||||
|
||||
uvicorn.run(
|
||||
"main:create_app",
|
||||
host="0.0.0.0",
|
||||
port=8000,
|
||||
# reload=True,
|
||||
# workers=4,
|
||||
)
|
||||
else:
|
||||
print("not in main")
|
||||
|
Loading…
Reference in new issue