diff --git a/api/agent_api.py b/api/agent_api.py deleted file mode 100644 index 83d05101..00000000 --- a/api/agent_api.py +++ /dev/null @@ -1,632 +0,0 @@ -import os -from fastapi import ( - FastAPI, - HTTPException, - status, - Query, - BackgroundTasks, -) -from fastapi.middleware.cors import CORSMiddleware -from pydantic import BaseModel, Field -from typing import Optional, Dict, Any, List -from loguru import logger -import uvicorn -from datetime import datetime, timedelta -from uuid import UUID, uuid4 -from enum import Enum -from pathlib import Path -from concurrent.futures import ThreadPoolExecutor -import traceback - -from swarms import Agent -from dotenv import load_dotenv - -# Load environment variables -load_dotenv() - -# Configure Loguru -logger.add( - "logs/api_{time}.log", - rotation="500 MB", - retention="10 days", - level="INFO", - format="{time} {level} {message}", - backtrace=True, - diagnose=True, -) - - -class AgentStatus(str, Enum): - """Enum for agent status.""" - - IDLE = "idle" - PROCESSING = "processing" - ERROR = "error" - MAINTENANCE = "maintenance" - - -class AgentConfig(BaseModel): - """Configuration model for creating a new agent.""" - - agent_name: str = Field(..., description="Name of the agent") - model_name: str = Field( - ..., - description="Name of the llm you want to use provided by litellm", - ) - description: str = Field( - default="", description="Description of the agent's purpose" - ) - system_prompt: str = Field( - ..., description="System prompt for the agent" - ) - model_name: str = Field( - default="gpt-4", description="Model name to use" - ) - temperature: float = Field( - default=0.1, - ge=0.0, - le=2.0, - description="Temperature for the model", - ) - max_loops: int = Field( - default=1, ge=1, description="Maximum number of loops" - ) - autosave: bool = Field( - default=True, description="Enable autosave" - ) - dashboard: bool = Field( - default=False, description="Enable dashboard" - ) - verbose: bool = Field( - default=True, description="Enable verbose output" - ) - dynamic_temperature_enabled: bool = Field( - default=True, description="Enable dynamic temperature" - ) - user_name: str = Field( - default="default_user", description="Username for the agent" - ) - retry_attempts: int = Field( - default=1, ge=1, description="Number of retry attempts" - ) - context_length: int = Field( - default=200000, ge=1000, description="Context length" - ) - output_type: str = Field( - default="string", description="Output type (string or json)" - ) - streaming_on: bool = Field( - default=False, description="Enable streaming" - ) - tags: List[str] = Field( - default_factory=list, - description="Tags for categorizing the agent", - ) - - -class AgentUpdate(BaseModel): - """Model for updating agent configuration.""" - - description: Optional[str] = None - system_prompt: Optional[str] = None - temperature: Optional[float] = None - max_loops: Optional[int] = None - tags: Optional[List[str]] = None - status: Optional[AgentStatus] = None - - -class AgentSummary(BaseModel): - """Summary model for agent listing.""" - - agent_id: UUID - agent_name: str - description: str - created_at: datetime - last_used: datetime - total_completions: int - tags: List[str] - status: AgentStatus - - -class AgentMetrics(BaseModel): - """Model for agent performance metrics.""" - - total_completions: int - average_response_time: float - error_rate: float - last_24h_completions: int - total_tokens_used: int - uptime_percentage: float - success_rate: float - peak_tokens_per_minute: int - - -class CompletionRequest(BaseModel): - """Model for completion requests.""" - - prompt: str = Field(..., description="The prompt to process") - agent_id: UUID = Field(..., description="ID of the agent to use") - max_tokens: Optional[int] = Field( - None, description="Maximum tokens to generate" - ) - temperature_override: Optional[float] = None - stream: bool = Field( - default=False, description="Enable streaming response" - ) - - -class CompletionResponse(BaseModel): - """Model for completion responses.""" - - agent_id: UUID - response: str - metadata: Dict[str, Any] - timestamp: datetime - processing_time: float - token_usage: Dict[str, int] - - -class AgentStore: - """Enhanced store for managing agents.""" - - def __init__(self): - self.agents: Dict[UUID, Agent] = {} - self.agent_metadata: Dict[UUID, Dict[str, Any]] = {} - self.executor = ThreadPoolExecutor(max_workers=4) - self._ensure_directories() - - def _ensure_directories(self): - """Ensure required directories exist.""" - Path("logs").mkdir(exist_ok=True) - Path("states").mkdir(exist_ok=True) - - async def create_agent(self, config: AgentConfig) -> UUID: - """Create a new agent with the given configuration.""" - try: - - agent = Agent( - agent_name=config.agent_name, - system_prompt=config.system_prompt, - model_name=config.model_name, - max_loops=config.max_loops, - autosave=config.autosave, - dashboard=config.dashboard, - verbose=config.verbose, - dynamic_temperature_enabled=config.dynamic_temperature_enabled, - saved_state_path=f"states/{config.agent_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", - user_name=config.user_name, - retry_attempts=config.retry_attempts, - context_length=config.context_length, - return_step_meta=True, - output_type="str", - streaming_on=config.streaming_on, - ) - - agent_id = uuid4() - self.agents[agent_id] = agent - self.agent_metadata[agent_id] = { - "description": config.description, - "created_at": datetime.utcnow(), - "last_used": datetime.utcnow(), - "total_completions": 0, - "tags": config.tags, - "total_tokens": 0, - "error_count": 0, - "response_times": [], - "status": AgentStatus.IDLE, - "start_time": datetime.utcnow(), - "downtime": timedelta(), - "successful_completions": 0, - } - - logger.info(f"Created agent with ID: {agent_id}") - return agent_id - - except Exception as e: - logger.error(f"Error creating agent: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Failed to create agent: {str(e)}", - ) - - async def get_agent(self, agent_id: UUID) -> Agent: - """Retrieve an agent by ID.""" - agent = self.agents.get(agent_id) - if not agent: - logger.error(f"Agent not found: {agent_id}") - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Agent {agent_id} not found", - ) - return agent - - async def update_agent( - self, agent_id: UUID, update: AgentUpdate - ) -> None: - """Update agent configuration.""" - agent = await self.get_agent(agent_id) - metadata = self.agent_metadata[agent_id] - - if update.system_prompt: - agent.system_prompt = update.system_prompt - if update.temperature is not None: - agent.llm.temperature = update.temperature - if update.max_loops is not None: - agent.max_loops = update.max_loops - if update.tags is not None: - metadata["tags"] = update.tags - if update.description is not None: - metadata["description"] = update.description - if update.status is not None: - metadata["status"] = update.status - if update.status == AgentStatus.MAINTENANCE: - metadata["downtime"] += ( - datetime.utcnow() - metadata["last_used"] - ) - - logger.info(f"Updated agent {agent_id}") - - async def list_agents( - self, - tags: Optional[List[str]] = None, - status: Optional[AgentStatus] = None, - ) -> List[AgentSummary]: - """List all agents, optionally filtered by tags and status.""" - summaries = [] - for agent_id, agent in self.agents.items(): - metadata = self.agent_metadata[agent_id] - - # Apply filters - if tags and not any( - tag in metadata["tags"] for tag in tags - ): - continue - if status and metadata["status"] != status: - continue - - summaries.append( - AgentSummary( - agent_id=agent_id, - agent_name=agent.agent_name, - description=metadata["description"], - created_at=metadata["created_at"], - last_used=metadata["last_used"], - total_completions=metadata["total_completions"], - tags=metadata["tags"], - status=metadata["status"], - ) - ) - return summaries - - async def get_agent_metrics(self, agent_id: UUID) -> AgentMetrics: - """Get performance metrics for an agent.""" - metadata = self.agent_metadata[agent_id] - response_times = metadata["response_times"] - - # Calculate metrics - total_time = datetime.utcnow() - metadata["start_time"] - uptime = total_time - metadata["downtime"] - uptime_percentage = ( - uptime.total_seconds() / total_time.total_seconds() - ) * 100 - - success_rate = ( - metadata["successful_completions"] - / metadata["total_completions"] - * 100 - if metadata["total_completions"] > 0 - else 0 - ) - - return AgentMetrics( - total_completions=metadata["total_completions"], - average_response_time=( - sum(response_times) / len(response_times) - if response_times - else 0 - ), - error_rate=( - metadata["error_count"] - / metadata["total_completions"] - if metadata["total_completions"] > 0 - else 0 - ), - last_24h_completions=sum( - 1 - for t in response_times - if (datetime.utcnow() - t).days < 1 - ), - total_tokens_used=metadata["total_tokens"], - uptime_percentage=uptime_percentage, - success_rate=success_rate, - peak_tokens_per_minute=max( - metadata.get("tokens_per_minute", [0]) - ), - ) - - async def clone_agent( - self, agent_id: UUID, new_name: str - ) -> UUID: - """Clone an existing agent with a new name.""" - original_agent = await self.get_agent(agent_id) - original_metadata = self.agent_metadata[agent_id] - - config = AgentConfig( - agent_name=new_name, - description=f"Clone of {original_agent.agent_name}", - system_prompt=original_agent.system_prompt, - model_name=original_agent.llm.model_name, - temperature=original_agent.llm.temperature, - max_loops=original_agent.max_loops, - tags=original_metadata["tags"], - ) - - return await self.create_agent(config) - - async def delete_agent(self, agent_id: UUID) -> None: - """Delete an agent.""" - if agent_id not in self.agents: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Agent {agent_id} not found", - ) - - # Clean up any resources - agent = self.agents[agent_id] - if agent.autosave and os.path.exists(agent.saved_state_path): - os.remove(agent.saved_state_path) - - del self.agents[agent_id] - del self.agent_metadata[agent_id] - logger.info(f"Deleted agent {agent_id}") - - async def process_completion( - self, - agent: Agent, - prompt: str, - agent_id: UUID, - max_tokens: Optional[int] = None, - temperature_override: Optional[float] = None, - ) -> CompletionResponse: - """Process a completion request using the specified agent.""" - start_time = datetime.utcnow() - metadata = self.agent_metadata[agent_id] - - try: - # Update agent status - metadata["status"] = AgentStatus.PROCESSING - metadata["last_used"] = start_time - - # Apply temporary overrides if specified - original_temp = agent.llm.temperature - if temperature_override is not None: - agent.llm.temperature = temperature_override - - # Process the completion - response = agent.run(prompt) - - # Reset overrides - if temperature_override is not None: - agent.llm.temperature = original_temp - - # Update metrics - processing_time = ( - datetime.utcnow() - start_time - ).total_seconds() - metadata["response_times"].append(processing_time) - metadata["total_completions"] += 1 - metadata["successful_completions"] += 1 - - # Estimate token usage (this is a rough estimate) - prompt_tokens = len(prompt.split()) * 1.3 - completion_tokens = len(response.split()) * 1.3 - total_tokens = int(prompt_tokens + completion_tokens) - metadata["total_tokens"] += total_tokens - - # Update tokens per minute tracking - current_minute = datetime.utcnow().replace( - second=0, microsecond=0 - ) - if "tokens_per_minute" not in metadata: - metadata["tokens_per_minute"] = {} - metadata["tokens_per_minute"][current_minute] = ( - metadata["tokens_per_minute"].get(current_minute, 0) - + total_tokens - ) - - return CompletionResponse( - agent_id=agent_id, - response=response, - metadata={ - "agent_name": agent.agent_name, - "model_name": agent.llm.model_name, - "temperature": agent.llm.temperature, - }, - timestamp=datetime.utcnow(), - processing_time=processing_time, - token_usage={ - "prompt_tokens": int(prompt_tokens), - "completion_tokens": int(completion_tokens), - "total_tokens": total_tokens, - }, - ) - - except Exception as e: - metadata["error_count"] += 1 - metadata["status"] = AgentStatus.ERROR - logger.error( - f"Error in completion processing: {str(e)}\n{traceback.format_exc()}" - ) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Error processing completion: {str(e)}", - ) - finally: - metadata["status"] = AgentStatus.IDLE - - -class SwarmsAPI: - """Enhanced API class for Swarms agent integration.""" - - def __init__(self): - self.app = FastAPI( - title="Swarms Agent API", - description="Production-grade API for Swarms agent interaction", - version="1.0.0", - docs_url="/v1/docs", - redoc_url="/v1/redoc", - ) - self.store = AgentStore() - # Configure CORS - self.app.add_middleware( - CORSMiddleware, - allow_origins=[ - "*" - ], # Configure appropriately for production - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], - ) - - self._setup_routes() - - def _setup_routes(self): - """Set up API routes.""" - - @self.app.post("/v1/agent", response_model=Dict[str, UUID]) - async def create_agent(config: AgentConfig): - """Create a new agent with the specified configuration.""" - agent_id = await self.store.create_agent(config) - return {"agent_id": agent_id} - - @self.app.get("/v1/agents", response_model=List[AgentSummary]) - async def list_agents( - tags: Optional[List[str]] = Query(None), - status: Optional[AgentStatus] = None, - ): - """List all agents, optionally filtered by tags and status.""" - return await self.store.list_agents(tags, status) - - @self.app.patch( - "/v1/agent/{agent_id}", response_model=Dict[str, str] - ) - async def update_agent(agent_id: UUID, update: AgentUpdate): - """Update an existing agent's configuration.""" - await self.store.update_agent(agent_id, update) - return {"status": "updated"} - - @self.app.get( - "/v1/agent/{agent_id}/metrics", - response_model=AgentMetrics, - ) - async def get_agent_metrics(agent_id: UUID): - """Get performance metrics for a specific agent.""" - return await self.store.get_agent_metrics(agent_id) - - @self.app.post( - "/v1/agent/{agent_id}/clone", - response_model=Dict[str, UUID], - ) - async def clone_agent(agent_id: UUID, new_name: str): - """Clone an existing agent with a new name.""" - new_id = await self.store.clone_agent(agent_id, new_name) - return {"agent_id": new_id} - - @self.app.delete("/v1/agent/{agent_id}") - async def delete_agent(agent_id: UUID): - """Delete an agent.""" - await self.store.delete_agent(agent_id) - return {"status": "deleted"} - - @self.app.post( - "/v1/agent/completions", response_model=CompletionResponse - ) - async def create_completion( - request: CompletionRequest, - background_tasks: BackgroundTasks, - ): - """Process a completion request with the specified agent.""" - try: - agent = await self.store.get_agent(request.agent_id) - - # Process completion - response = await self.store.process_completion( - agent, - request.prompt, - request.agent_id, - request.max_tokens, - request.temperature_override, - ) - - # Schedule background cleanup - background_tasks.add_task( - self._cleanup_old_metrics, request.agent_id - ) - - return response - - except Exception as e: - logger.error(f"Error processing completion: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Error processing completion: {str(e)}", - ) - - @self.app.get("/v1/agent/{agent_id}/status") - async def get_agent_status(agent_id: UUID): - """Get the current status of an agent.""" - metadata = self.store.agent_metadata.get(agent_id) - if not metadata: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Agent {agent_id} not found", - ) - return { - "agent_id": agent_id, - "status": metadata["status"], - "last_used": metadata["last_used"], - "total_completions": metadata["total_completions"], - "error_count": metadata["error_count"], - } - - async def _cleanup_old_metrics(self, agent_id: UUID): - """Clean up old metrics data to prevent memory bloat.""" - metadata = self.store.agent_metadata.get(agent_id) - if metadata: - # Keep only last 24 hours of response times - cutoff = datetime.utcnow() - timedelta(days=1) - metadata["response_times"] = [ - t - for t in metadata["response_times"] - if isinstance(t, (int, float)) - and t > cutoff.timestamp() - ] - - # Clean up old tokens per minute data - if "tokens_per_minute" in metadata: - metadata["tokens_per_minute"] = { - k: v - for k, v in metadata["tokens_per_minute"].items() - if k > cutoff - } - - -def create_app() -> FastAPI: - """Create and configure the FastAPI application.""" - api = SwarmsAPI() - return api.app - - -if __name__ == "__main__": - # Configure uvicorn logging - print("in main") - logger.info("API Starting") - uvicorn.run( - "main:create_app", - host="0.0.0.0", - port=8000, - reload=True, - workers=4, - ) -else: - print("not in main")