From 449b2db79ed82532a3e2c915510079ebdf24fd6d Mon Sep 17 00:00:00 2001 From: mike dupont Date: Sat, 7 Dec 2024 16:00:03 -0500 Subject: [PATCH] adding main --- api/agent_api.py | 3 + api/main.py | 638 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 641 insertions(+) create mode 100644 api/main.py diff --git a/api/agent_api.py b/api/agent_api.py index d1968d9d..83d05101 100644 --- a/api/agent_api.py +++ b/api/agent_api.py @@ -619,6 +619,7 @@ def create_app() -> FastAPI: if __name__ == "__main__": # Configure uvicorn logging + print("in main") logger.info("API Starting") uvicorn.run( "main:create_app", @@ -627,3 +628,5 @@ if __name__ == "__main__": reload=True, workers=4, ) +else: + print("not in main") diff --git a/api/main.py b/api/main.py new file mode 100644 index 00000000..768e8d96 --- /dev/null +++ b/api/main.py @@ -0,0 +1,638 @@ +import os +from fastapi import ( + FastAPI, + HTTPException, + status, + Query, + BackgroundTasks, +) +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel, Field +from typing import Optional, Dict, Any, List +from loguru import logger +import uvicorn +from datetime import datetime, timedelta +from uuid import UUID, uuid4 +from enum import Enum +from pathlib import Path +from concurrent.futures import ThreadPoolExecutor +import traceback + +from swarms import Agent +from dotenv import load_dotenv + +print ("starting") +# Load environment variables +load_dotenv() + +# Configure Loguru +logger.add( + "logs/api_{time}.log", + rotation="500 MB", + retention="10 days", + level="INFO", + format="{time} {level} {message}", + backtrace=True, + diagnose=True, +) + + +class AgentStatus(str, Enum): + """Enum for agent status.""" + + IDLE = "idle" + PROCESSING = "processing" + ERROR = "error" + MAINTENANCE = "maintenance" + + +class AgentConfig(BaseModel): + """Configuration model for creating a new agent.""" + + agent_name: str = Field(..., description="Name of the agent") + model_name: str = Field( + ..., + description="Name of the llm you want to use provided by litellm", + ) + description: str = Field( + default="", description="Description of the agent's purpose" + ) + system_prompt: str = Field( + ..., description="System prompt for the agent" + ) + model_name: str = Field( + default="gpt-4", description="Model name to use" + ) + temperature: float = Field( + default=0.1, + ge=0.0, + le=2.0, + description="Temperature for the model", + ) + max_loops: int = Field( + default=1, ge=1, description="Maximum number of loops" + ) + autosave: bool = Field( + default=True, description="Enable autosave" + ) + dashboard: bool = Field( + default=False, description="Enable dashboard" + ) + verbose: bool = Field( + default=True, description="Enable verbose output" + ) + dynamic_temperature_enabled: bool = Field( + default=True, description="Enable dynamic temperature" + ) + user_name: str = Field( + default="default_user", description="Username for the agent" + ) + retry_attempts: int = Field( + default=1, ge=1, description="Number of retry attempts" + ) + context_length: int = Field( + default=200000, ge=1000, description="Context length" + ) + output_type: str = Field( + default="string", description="Output type (string or json)" + ) + streaming_on: bool = Field( + default=False, description="Enable streaming" + ) + tags: List[str] = Field( + default_factory=list, + description="Tags for categorizing the agent", + ) + + +class AgentUpdate(BaseModel): + """Model for updating agent configuration.""" + + description: Optional[str] = None + system_prompt: Optional[str] = None + temperature: Optional[float] = None + max_loops: Optional[int] = None + tags: Optional[List[str]] = None + status: Optional[AgentStatus] = None + + +class AgentSummary(BaseModel): + """Summary model for agent listing.""" + + agent_id: UUID + agent_name: str + description: str + created_at: datetime + last_used: datetime + total_completions: int + tags: List[str] + status: AgentStatus + + +class AgentMetrics(BaseModel): + """Model for agent performance metrics.""" + + total_completions: int + average_response_time: float + error_rate: float + last_24h_completions: int + total_tokens_used: int + uptime_percentage: float + success_rate: float + peak_tokens_per_minute: int + + +class CompletionRequest(BaseModel): + """Model for completion requests.""" + + prompt: str = Field(..., description="The prompt to process") + agent_id: UUID = Field(..., description="ID of the agent to use") + max_tokens: Optional[int] = Field( + None, description="Maximum tokens to generate" + ) + temperature_override: Optional[float] = None + stream: bool = Field( + default=False, description="Enable streaming response" + ) + + +class CompletionResponse(BaseModel): + """Model for completion responses.""" + + agent_id: UUID + response: str + metadata: Dict[str, Any] + timestamp: datetime + processing_time: float + token_usage: Dict[str, int] + + +class AgentStore: + """Enhanced store for managing agents.""" + + def __init__(self): + self.agents: Dict[UUID, Agent] = {} + self.agent_metadata: Dict[UUID, Dict[str, Any]] = {} + self.executor = ThreadPoolExecutor(max_workers=4) + self._ensure_directories() + + def _ensure_directories(self): + """Ensure required directories exist.""" + Path("logs").mkdir(exist_ok=True) + Path("states").mkdir(exist_ok=True) + + async def create_agent(self, config: AgentConfig) -> UUID: + """Create a new agent with the given configuration.""" + try: + + agent = Agent( + agent_name=config.agent_name, + system_prompt=config.system_prompt, + model_name=config.model_name, + max_loops=config.max_loops, + autosave=config.autosave, + dashboard=config.dashboard, + verbose=config.verbose, + dynamic_temperature_enabled=config.dynamic_temperature_enabled, + saved_state_path=f"states/{config.agent_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", + user_name=config.user_name, + retry_attempts=config.retry_attempts, + context_length=config.context_length, + return_step_meta=True, + output_type="str", + streaming_on=config.streaming_on, + ) + + agent_id = uuid4() + self.agents[agent_id] = agent + self.agent_metadata[agent_id] = { + "description": config.description, + "created_at": datetime.utcnow(), + "last_used": datetime.utcnow(), + "total_completions": 0, + "tags": config.tags, + "total_tokens": 0, + "error_count": 0, + "response_times": [], + "status": AgentStatus.IDLE, + "start_time": datetime.utcnow(), + "downtime": timedelta(), + "successful_completions": 0, + } + + logger.info(f"Created agent with ID: {agent_id}") + return agent_id + + except Exception as e: + logger.error(f"Error creating agent: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to create agent: {str(e)}", + ) + + async def get_agent(self, agent_id: UUID) -> Agent: + """Retrieve an agent by ID.""" + agent = self.agents.get(agent_id) + if not agent: + logger.error(f"Agent not found: {agent_id}") + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Agent {agent_id} not found", + ) + return agent + + async def update_agent( + self, agent_id: UUID, update: AgentUpdate + ) -> None: + """Update agent configuration.""" + agent = await self.get_agent(agent_id) + metadata = self.agent_metadata[agent_id] + + if update.system_prompt: + agent.system_prompt = update.system_prompt + if update.temperature is not None: + agent.llm.temperature = update.temperature + if update.max_loops is not None: + agent.max_loops = update.max_loops + if update.tags is not None: + metadata["tags"] = update.tags + if update.description is not None: + metadata["description"] = update.description + if update.status is not None: + metadata["status"] = update.status + if update.status == AgentStatus.MAINTENANCE: + metadata["downtime"] += ( + datetime.utcnow() - metadata["last_used"] + ) + + logger.info(f"Updated agent {agent_id}") + + async def list_agents( + self, + tags: Optional[List[str]] = None, + status: Optional[AgentStatus] = None, + ) -> List[AgentSummary]: + """List all agents, optionally filtered by tags and status.""" + summaries = [] + for agent_id, agent in self.agents.items(): + metadata = self.agent_metadata[agent_id] + + # Apply filters + if tags and not any( + tag in metadata["tags"] for tag in tags + ): + continue + if status and metadata["status"] != status: + continue + + summaries.append( + AgentSummary( + agent_id=agent_id, + agent_name=agent.agent_name, + description=metadata["description"], + created_at=metadata["created_at"], + last_used=metadata["last_used"], + total_completions=metadata["total_completions"], + tags=metadata["tags"], + status=metadata["status"], + ) + ) + return summaries + + async def get_agent_metrics(self, agent_id: UUID) -> AgentMetrics: + """Get performance metrics for an agent.""" + metadata = self.agent_metadata[agent_id] + response_times = metadata["response_times"] + + # Calculate metrics + total_time = datetime.utcnow() - metadata["start_time"] + uptime = total_time - metadata["downtime"] + uptime_percentage = ( + uptime.total_seconds() / total_time.total_seconds() + ) * 100 + + success_rate = ( + metadata["successful_completions"] + / metadata["total_completions"] + * 100 + if metadata["total_completions"] > 0 + else 0 + ) + + return AgentMetrics( + total_completions=metadata["total_completions"], + average_response_time=( + sum(response_times) / len(response_times) + if response_times + else 0 + ), + error_rate=( + metadata["error_count"] + / metadata["total_completions"] + if metadata["total_completions"] > 0 + else 0 + ), + last_24h_completions=sum( + 1 + for t in response_times + if (datetime.utcnow() - t).days < 1 + ), + total_tokens_used=metadata["total_tokens"], + uptime_percentage=uptime_percentage, + success_rate=success_rate, + peak_tokens_per_minute=max( + metadata.get("tokens_per_minute", [0]) + ), + ) + + async def clone_agent( + self, agent_id: UUID, new_name: str + ) -> UUID: + """Clone an existing agent with a new name.""" + original_agent = await self.get_agent(agent_id) + original_metadata = self.agent_metadata[agent_id] + + config = AgentConfig( + agent_name=new_name, + description=f"Clone of {original_agent.agent_name}", + system_prompt=original_agent.system_prompt, + model_name=original_agent.llm.model_name, + temperature=original_agent.llm.temperature, + max_loops=original_agent.max_loops, + tags=original_metadata["tags"], + ) + + return await self.create_agent(config) + + async def delete_agent(self, agent_id: UUID) -> None: + """Delete an agent.""" + if agent_id not in self.agents: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Agent {agent_id} not found", + ) + + # Clean up any resources + agent = self.agents[agent_id] + if agent.autosave and os.path.exists(agent.saved_state_path): + os.remove(agent.saved_state_path) + + del self.agents[agent_id] + del self.agent_metadata[agent_id] + logger.info(f"Deleted agent {agent_id}") + + async def process_completion( + self, + agent: Agent, + prompt: str, + agent_id: UUID, + max_tokens: Optional[int] = None, + temperature_override: Optional[float] = None, + ) -> CompletionResponse: + """Process a completion request using the specified agent.""" + start_time = datetime.utcnow() + metadata = self.agent_metadata[agent_id] + + try: + # Update agent status + metadata["status"] = AgentStatus.PROCESSING + metadata["last_used"] = start_time + + # Apply temporary overrides if specified + original_temp = agent.llm.temperature + if temperature_override is not None: + agent.llm.temperature = temperature_override + + # Process the completion + response = agent.run(prompt) + + # Reset overrides + if temperature_override is not None: + agent.llm.temperature = original_temp + + # Update metrics + processing_time = ( + datetime.utcnow() - start_time + ).total_seconds() + metadata["response_times"].append(processing_time) + metadata["total_completions"] += 1 + metadata["successful_completions"] += 1 + + # Estimate token usage (this is a rough estimate) + prompt_tokens = len(prompt.split()) * 1.3 + completion_tokens = len(response.split()) * 1.3 + total_tokens = int(prompt_tokens + completion_tokens) + metadata["total_tokens"] += total_tokens + + # Update tokens per minute tracking + current_minute = datetime.utcnow().replace( + second=0, microsecond=0 + ) + if "tokens_per_minute" not in metadata: + metadata["tokens_per_minute"] = {} + metadata["tokens_per_minute"][current_minute] = ( + metadata["tokens_per_minute"].get(current_minute, 0) + + total_tokens + ) + + return CompletionResponse( + agent_id=agent_id, + response=response, + metadata={ + "agent_name": agent.agent_name, + "model_name": agent.llm.model_name, + "temperature": agent.llm.temperature, + }, + timestamp=datetime.utcnow(), + processing_time=processing_time, + token_usage={ + "prompt_tokens": int(prompt_tokens), + "completion_tokens": int(completion_tokens), + "total_tokens": total_tokens, + }, + ) + + except Exception as e: + metadata["error_count"] += 1 + metadata["status"] = AgentStatus.ERROR + logger.error( + f"Error in completion processing: {str(e)}\n{traceback.format_exc()}" + ) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Error processing completion: {str(e)}", + ) + finally: + metadata["status"] = AgentStatus.IDLE + + +class SwarmsAPI: + """Enhanced API class for Swarms agent integration.""" + + def __init__(self): + self.app = FastAPI( + title="Swarms Agent API", + description="Production-grade API for Swarms agent interaction", + version="1.0.0", + docs_url="/v1/docs", + redoc_url="/v1/redoc", + ) + self.store = AgentStore() + # Configure CORS + self.app.add_middleware( + CORSMiddleware, + allow_origins=[ + "*" + ], # Configure appropriately for production + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) + + self._setup_routes() + + def _setup_routes(self): + """Set up API routes.""" + + @self.app.post("/v1/agent", response_model=Dict[str, UUID]) + async def create_agent(config: AgentConfig): + """Create a new agent with the specified configuration.""" + agent_id = await self.store.create_agent(config) + return {"agent_id": agent_id} + + @self.app.get("/v1/agents", response_model=List[AgentSummary]) + async def list_agents( + tags: Optional[List[str]] = Query(None), + status: Optional[AgentStatus] = None, + ): + """List all agents, optionally filtered by tags and status.""" + return await self.store.list_agents(tags, status) + + @self.app.patch( + "/v1/agent/{agent_id}", response_model=Dict[str, str] + ) + async def update_agent(agent_id: UUID, update: AgentUpdate): + """Update an existing agent's configuration.""" + await self.store.update_agent(agent_id, update) + return {"status": "updated"} + + @self.app.get( + "/v1/agent/{agent_id}/metrics", + response_model=AgentMetrics, + ) + async def get_agent_metrics(agent_id: UUID): + """Get performance metrics for a specific agent.""" + return await self.store.get_agent_metrics(agent_id) + + @self.app.post( + "/v1/agent/{agent_id}/clone", + response_model=Dict[str, UUID], + ) + async def clone_agent(agent_id: UUID, new_name: str): + """Clone an existing agent with a new name.""" + new_id = await self.store.clone_agent(agent_id, new_name) + return {"agent_id": new_id} + + @self.app.delete("/v1/agent/{agent_id}") + async def delete_agent(agent_id: UUID): + """Delete an agent.""" + await self.store.delete_agent(agent_id) + return {"status": "deleted"} + + @self.app.post( + "/v1/agent/completions", response_model=CompletionResponse + ) + async def create_completion( + request: CompletionRequest, + background_tasks: BackgroundTasks, + ): + """Process a completion request with the specified agent.""" + try: + agent = await self.store.get_agent(request.agent_id) + + # Process completion + response = await self.store.process_completion( + agent, + request.prompt, + request.agent_id, + request.max_tokens, + request.temperature_override, + ) + + # Schedule background cleanup + background_tasks.add_task( + self._cleanup_old_metrics, request.agent_id + ) + + return response + + except Exception as e: + logger.error(f"Error processing completion: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Error processing completion: {str(e)}", + ) + + @self.app.get("/v1/agent/{agent_id}/status") + async def get_agent_status(agent_id: UUID): + """Get the current status of an agent.""" + metadata = self.store.agent_metadata.get(agent_id) + if not metadata: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Agent {agent_id} not found", + ) + return { + "agent_id": agent_id, + "status": metadata["status"], + "last_used": metadata["last_used"], + "total_completions": metadata["total_completions"], + "error_count": metadata["error_count"], + } + + async def _cleanup_old_metrics(self, agent_id: UUID): + """Clean up old metrics data to prevent memory bloat.""" + metadata = self.store.agent_metadata.get(agent_id) + if metadata: + # Keep only last 24 hours of response times + cutoff = datetime.utcnow() - timedelta(days=1) + metadata["response_times"] = [ + t + for t in metadata["response_times"] + if isinstance(t, (int, float)) + and t > cutoff.timestamp() + ] + + # Clean up old tokens per minute data + if "tokens_per_minute" in metadata: + metadata["tokens_per_minute"] = { + k: v + for k, v in metadata["tokens_per_minute"].items() + if k > cutoff + } + + +def create_app() -> FastAPI: + """Create and configure the FastAPI application.""" + print("create app") + api = SwarmsAPI() + return api.app + + +#if __name__ == "__main__": +if __name__ == '__main__': + #freeze_support() + print("yes in main") + # Configure uvicorn logging + logger.info("API Starting") + + uvicorn.run( + "main:create_app", + host="0.0.0.0", + port=8000, + # reload=True, + # workers=4, + ) +else: + print("not in main") +