diff --git a/api/advanced_api.py b/api/advanced_api.py
new file mode 100644
index 00000000..b5e7463f
--- /dev/null
+++ b/api/advanced_api.py
@@ -0,0 +1,1282 @@
+import multiprocessing
+import os
+import secrets
+import signal
+import sys
+import threading
+import time
+import traceback
+from concurrent.futures import ThreadPoolExecutor
+from dataclasses import dataclass
+from datetime import datetime, timedelta
+from enum import Enum
+from multiprocessing import Lock, Process, Queue, Value
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Tuple
+from uuid import UUID, uuid4
+
+import httpx
+import psutil
+import uvicorn
+from dotenv import load_dotenv
+from fastapi import (
+    BackgroundTasks,
+    Depends,
+    FastAPI,
+    Header,
+    HTTPException,
+    Query,
+    Request,
+    status,
+)
+from fastapi.middleware.cors import CORSMiddleware
+from loguru import logger
+from pydantic import BaseModel, Field
+
+from swarms.structs.agent import Agent
+
+# Load environment variables
+load_dotenv()
+
+
+# # Set start method to 'fork' at the very beginning of the script
+# multiprocessing.set_start_method('fork')
+
+
+@dataclass
+class ProcessMetrics:
+    """Metrics for each API process."""
+
+    pid: int
+    cpu_usage: float
+    memory_usage: float
+    request_count: int
+    last_heartbeat: float
+    port: int
+
+
+class ProcessManager:
+    """Manages multiple API processes and their metrics."""
+
+    def __init__(
+        self, num_processes: int = None, start_port: int = 8000
+    ):
+        self.num_processes = (
+            num_processes or multiprocessing.cpu_count()
+        )
+        self.start_port = start_port
+        self.processes: Dict[int, Process] = {}
+        self.metrics: Dict[int, ProcessMetrics] = {}
+        self.metrics_lock = Lock()
+        self.heartbeat_queue = Queue()
+        self.shutdown_event = multiprocessing.Event()
+
+    def start_api_process(self, port: int) -> Process:
+        """Start a single API process on the specified port."""
+        process = Process(
+            target=run_api_instance,
+            args=(port, self.heartbeat_queue, self.shutdown_event),
+        )
+        process.start()
+        return process
+
+    def start_all_processes(self):
+        """Start all API processes."""
+        for i in range(self.num_processes):
+            port = self.start_port + i + 1
+            process = self.start_api_process(port)
+            self.processes[process.pid] = process
+            self.metrics[process.pid] = ProcessMetrics(
+                pid=process.pid,
+                cpu_usage=0.0,
+                memory_usage=0.0,
+                request_count=0,
+                last_heartbeat=time.time(),
+                port=port,
+            )
+
+    def monitor_processes(self):
+        """Monitor process health and metrics."""
+        while not self.shutdown_event.is_set():
+            try:
+                # Update metrics from heartbeat queue
+                while not self.heartbeat_queue.empty():
+                    pid, cpu, memory, requests = (
+                        self.heartbeat_queue.get_nowait()
+                    )
+                    with self.metrics_lock:
+                        if pid in self.metrics:
+                            self.metrics[pid].cpu_usage = cpu
+                            self.metrics[pid].memory_usage = memory
+                            self.metrics[pid].request_count = requests
+                            self.metrics[pid].last_heartbeat = (
+                                time.time()
+                            )
+
+                # Check for dead processes and restart them
+                current_time = time.time()
+                with self.metrics_lock:
+                    for pid, metrics in list(self.metrics.items()):
+                        if (
+                            current_time - metrics.last_heartbeat > 30
+                        ):  # 30 seconds timeout
+                            print(
+                                f"Process {pid} appears to be dead, restarting..."
+                            )
+                            if pid in self.processes:
+                                self.processes[pid].terminate()
+                                del self.processes[pid]
+                            new_process = self.start_api_process(
+                                metrics.port
+                            )
+                            self.processes[new_process.pid] = (
+                                new_process
+                            )
+                            self.metrics[new_process.pid] = (
+                                ProcessMetrics(
+                                    pid=new_process.pid,
+                                    cpu_usage=0.0,
+                                    memory_usage=0.0,
+                                    request_count=0,
+                                    last_heartbeat=time.time(),
+                                    port=metrics.port,
+                                )
+                            )
+                            del self.metrics[pid]
+
+                time.sleep(1)
+            except Exception as e:
+                print(f"Error in process monitoring: {e}")
+
+    def shutdown(self):
+        """Shutdown all processes gracefully."""
+        self.shutdown_event.set()
+        for process in self.processes.values():
+            process.terminate()
+            process.join()
+
+
+class AgentStatus(str, Enum):
+    """Enum for agent status."""
+
+    IDLE = "idle"
+    PROCESSING = "processing"
+    ERROR = "error"
+    MAINTENANCE = "maintenance"
+
+
+# Security configurations
+API_KEY_LENGTH = 32  # Length of generated API keys
+
+
+class APIKey(BaseModel):
+    key: str
+    name: str
+    created_at: datetime
+    last_used: datetime
+    is_active: bool = True
+
+
+class APIKeyCreate(BaseModel):
+    name: str  # A friendly name for the API key
+
+
+class User(BaseModel):
+    id: UUID
+    username: str
+    is_active: bool = True
+    is_admin: bool = False
+    api_keys: Dict[str, APIKey] = {}  # key -> APIKey object
+
+
+class AgentConfig(BaseModel):
+    """Configuration model for creating a new agent."""
+
+    agent_name: str = Field(..., description="Name of the agent")
+    model_name: str = Field(
+        default="gpt-4",
+        description="Name of the llm you want to use provided by litellm",
+    )
+    description: str = Field(
+        default="", description="Description of the agent's purpose"
+    )
+    system_prompt: str = Field(
+        ..., description="System prompt for the agent"
+    )
+    temperature: float = Field(
+        default=0.1,
+        ge=0.0,
+        le=2.0,
+        description="Temperature for the model",
+    )
+    max_loops: int = Field(
+        default=1, ge=1, description="Maximum number of loops"
+    )
+    autosave: bool = Field(
+        default=True, description="Enable autosave"
+    )
+    dashboard: bool = Field(
+        default=False, description="Enable dashboard"
+    )
+    verbose: bool = Field(
+        default=True, description="Enable verbose output"
+    )
+    dynamic_temperature_enabled: bool = Field(
+        default=True, description="Enable dynamic temperature"
+    )
+    user_name: str = Field(
+        default="default_user", description="Username for the agent"
+    )
+    retry_attempts: int = Field(
+        default=1, ge=1, description="Number of retry attempts"
+    )
+    context_length: int = Field(
+        default=200000, ge=1000, description="Context length"
+    )
+    output_type: str = Field(
+        default="string", description="Output type (string or json)"
+    )
+    streaming_on: bool = Field(
+        default=False, description="Enable streaming"
+    )
+    tags: List[str] = Field(
+        default_factory=list,
+        description="Tags for categorizing the agent",
+    )
+
+
+class AgentUpdate(BaseModel):
+    """Model for updating agent configuration."""
+
+    description: Optional[str] = None
+    system_prompt: Optional[str] = None
+    temperature: Optional[float] = 0.5
+    max_loops: Optional[int] = 1
+    tags: Optional[List[str]] = None
+    status: Optional[AgentStatus] = None
+
+
+class AgentSummary(BaseModel):
+    """Summary model for agent listing."""
+
+    agent_id: UUID
+    agent_name: str
+    description: str
+    created_at: datetime
+    last_used: datetime
+    total_completions: int
+    tags: List[str]
+    status: AgentStatus
+
+
+class AgentMetrics(BaseModel):
+    """Model for agent performance metrics."""
+
+
total_completions: int + average_response_time: float + error_rate: float + last_24h_completions: int + total_tokens_used: int + uptime_percentage: float + success_rate: float + peak_tokens_per_minute: int + + +class CompletionRequest(BaseModel): + """Model for completion requests.""" + + prompt: str = Field(..., description="The prompt to process") + agent_id: UUID = Field(..., description="ID of the agent to use") + max_tokens: Optional[int] = Field( + None, description="Maximum tokens to generate" + ) + temperature_override: Optional[float] = 0.5 + stream: bool = Field( + default=False, description="Enable streaming response" + ) + + +class CompletionResponse(BaseModel): + """Model for completion responses.""" + + agent_id: UUID + response: str + metadata: Dict[str, Any] + timestamp: datetime + processing_time: float + token_usage: Dict[str, int] + + +class AgentStore: + """Enhanced store for managing agents.""" + + def __init__(self): + self.agents: Dict[UUID, Agent] = {} + self.agent_metadata: Dict[UUID, Dict[str, Any]] = {} + self.users: Dict[UUID, User] = {} # user_id -> User + self.api_keys: Dict[str, UUID] = {} # api_key -> user_id + self.user_agents: Dict[UUID, List[UUID]] = ( + {} + ) # user_id -> [agent_ids] + self.executor = ThreadPoolExecutor(max_workers=4) + self.total_requests = Value( + "i", 0 + ) # Shared counter for total requests + self._ensure_directories() + + def increment_request_count(self): + """Increment the total request counter.""" + with self.total_requests.get_lock(): + self.total_requests.value += 1 + + def get_total_requests(self) -> int: + """Get the total number of requests processed.""" + return self.total_requests.value + + def _ensure_directories(self): + """Ensure required directories exist.""" + Path("logs").mkdir(exist_ok=True) + Path("states").mkdir(exist_ok=True) + + def create_api_key(self, user_id: UUID, key_name: str) -> APIKey: + """Create a new API key for a user.""" + if user_id not in self.users: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="User not found", + ) + + # Generate a secure random API key + api_key = secrets.token_urlsafe(API_KEY_LENGTH) + + # Create the API key object + key_object = APIKey( + key=api_key, + name=key_name, + created_at=datetime.utcnow(), + last_used=datetime.utcnow(), + ) + + # Store the API key + self.users[user_id].api_keys[api_key] = key_object + self.api_keys[api_key] = user_id + + return key_object + + async def verify_agent_access( + self, agent_id: UUID, user_id: UUID + ) -> bool: + """Verify if a user has access to an agent.""" + if agent_id not in self.agents: + return False + return ( + self.agent_metadata[agent_id]["owner_id"] == user_id + or self.users[user_id].is_admin + ) + + def validate_api_key(self, api_key: str) -> Optional[UUID]: + """Validate an API key and return the associated user ID.""" + user_id = self.api_keys.get(api_key) + if not user_id or api_key not in self.users[user_id].api_keys: + return None + + key_object = self.users[user_id].api_keys[api_key] + if not key_object.is_active: + return None + + # Update last used timestamp + key_object.last_used = datetime.utcnow() + return user_id + + async def create_agent( + self, config: AgentConfig, user_id: UUID + ) -> UUID: + """Create a new agent with the given configuration.""" + try: + + agent = Agent( + agent_name=config.agent_name, + system_prompt=config.system_prompt, + model_name=config.model_name, + max_loops=config.max_loops, + autosave=config.autosave, + dashboard=config.dashboard, + 
verbose=config.verbose, + dynamic_temperature_enabled=True, + saved_state_path=f"states/{config.agent_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", + user_name=config.user_name, + retry_attempts=config.retry_attempts, + context_length=config.context_length, + return_step_meta=True, + output_type="str", + streaming_on=config.streaming_on, + ) + + agent_id = uuid4() + self.agents[agent_id] = agent + self.agent_metadata[agent_id] = { + "description": config.description, + "created_at": datetime.utcnow(), + "last_used": datetime.utcnow(), + "total_completions": 0, + "tags": config.tags, + "total_tokens": 0, + "error_count": 0, + "response_times": [], + "status": AgentStatus.IDLE, + "start_time": datetime.utcnow(), + "downtime": timedelta(), + "successful_completions": 0, + } + + # Add to user's agents list + if user_id not in self.user_agents: + self.user_agents[user_id] = [] + self.user_agents[user_id].append(agent_id) + + return agent_id + + except Exception as e: + logger.error(f"Error creating agent: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to create agent: {str(e)}", + ) + + async def get_agent(self, agent_id: UUID) -> Agent: + """Retrieve an agent by ID.""" + agent = self.agents.get(agent_id) + if not agent: + logger.error(f"Agent not found: {agent_id}") + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Agent {agent_id} not found", + ) + return agent + + async def update_agent( + self, agent_id: UUID, update: AgentUpdate + ) -> None: + """Update agent configuration.""" + agent = await self.get_agent(agent_id) + metadata = self.agent_metadata[agent_id] + + if update.system_prompt: + agent.system_prompt = update.system_prompt + if update.max_loops is not None: + agent.max_loops = update.max_loops + if update.tags is not None: + metadata["tags"] = update.tags + if update.description is not None: + metadata["description"] = update.description + if update.status is not None: + metadata["status"] = update.status + if update.status == AgentStatus.MAINTENANCE: + metadata["downtime"] += ( + datetime.utcnow() - metadata["last_used"] + ) + + logger.info(f"Updated agent {agent_id}") + + async def list_agents( + self, + tags: Optional[List[str]] = None, + status: Optional[AgentStatus] = None, + ) -> List[AgentSummary]: + """List all agents, optionally filtered by tags and status.""" + summaries = [] + for agent_id, agent in self.agents.items(): + metadata = self.agent_metadata[agent_id] + + # Apply filters + if tags and not any( + tag in metadata["tags"] for tag in tags + ): + continue + if status and metadata["status"] != status: + continue + + summaries.append( + AgentSummary( + agent_id=agent_id, + agent_name=agent.agent_name, + description=metadata["description"], + created_at=metadata["created_at"], + last_used=metadata["last_used"], + total_completions=metadata["total_completions"], + tags=metadata["tags"], + status=metadata["status"], + ) + ) + return summaries + + async def get_agent_metrics(self, agent_id: UUID) -> AgentMetrics: + """Get performance metrics for an agent.""" + metadata = self.agent_metadata[agent_id] + response_times = metadata["response_times"] + + # Calculate metrics + total_time = datetime.utcnow() - metadata["start_time"] + uptime = total_time - metadata["downtime"] + uptime_percentage = ( + uptime.total_seconds() / total_time.total_seconds() + ) * 100 + + success_rate = ( + metadata["successful_completions"] + / metadata["total_completions"] + * 100 + if 
metadata["total_completions"] > 0 + else 0 + ) + + return AgentMetrics( + total_completions=metadata["total_completions"], + average_response_time=( + sum(response_times) / len(response_times) + if response_times + else 0 + ), + error_rate=( + metadata["error_count"] + / metadata["total_completions"] + if metadata["total_completions"] > 0 + else 0 + ), + last_24h_completions=sum( + 1 + for t in response_times + if (datetime.utcnow() - t).days < 1 + ), + total_tokens_used=metadata["total_tokens"], + uptime_percentage=uptime_percentage, + success_rate=success_rate, + peak_tokens_per_minute=max( + metadata.get("tokens_per_minute", [0]) + ), + ) + + async def clone_agent( + self, agent_id: UUID, new_name: str + ) -> UUID: + """Clone an existing agent with a new name.""" + original_agent = await self.get_agent(agent_id) + original_metadata = self.agent_metadata[agent_id] + + config = AgentConfig( + agent_name=new_name, + description=f"Clone of {original_agent.agent_name}", + system_prompt=original_agent.system_prompt, + model_name=original_agent.model_name, + temperature=0.5, + max_loops=original_agent.max_loops, + tags=original_metadata["tags"], + ) + + return await self.create_agent(config) + + async def delete_agent(self, agent_id: UUID) -> None: + """Delete an agent.""" + if agent_id not in self.agents: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Agent {agent_id} not found", + ) + + # Clean up any resources + agent = self.agents[agent_id] + if agent.autosave and os.path.exists(agent.saved_state_path): + os.remove(agent.saved_state_path) + + del self.agents[agent_id] + del self.agent_metadata[agent_id] + logger.info(f"Deleted agent {agent_id}") + + async def process_completion( + self, + agent: Agent, + prompt: str, + agent_id: UUID, + max_tokens: Optional[int] = None, + temperature_override: Optional[float] = None, + ) -> CompletionResponse: + """Process a completion request using the specified agent.""" + start_time = datetime.utcnow() + metadata = self.agent_metadata[agent_id] + + try: + # Update agent status + metadata["status"] = AgentStatus.PROCESSING + metadata["last_used"] = start_time + + # Process the completion + response = agent.run(prompt) + + # Update metrics + processing_time = ( + datetime.utcnow() - start_time + ).total_seconds() + metadata["response_times"].append(processing_time) + metadata["total_completions"] += 1 + metadata["successful_completions"] += 1 + + # Estimate token usage (this is a rough estimate) + prompt_tokens = len(prompt.split()) * 1.3 + completion_tokens = len(response.split()) * 1.3 + total_tokens = int(prompt_tokens + completion_tokens) + metadata["total_tokens"] += total_tokens + + # Update tokens per minute tracking + current_minute = datetime.utcnow().replace( + second=0, microsecond=0 + ) + if "tokens_per_minute" not in metadata: + metadata["tokens_per_minute"] = {} + metadata["tokens_per_minute"][current_minute] = ( + metadata["tokens_per_minute"].get(current_minute, 0) + + total_tokens + ) + + return CompletionResponse( + agent_id=agent_id, + response=response, + metadata={ + "agent_name": agent.agent_name, + # "model_name": agent.llm.model_name, + # "temperature": 0.5, + }, + timestamp=datetime.utcnow(), + processing_time=processing_time, + token_usage={ + "prompt_tokens": int(prompt_tokens), + "completion_tokens": int(completion_tokens), + "total_tokens": total_tokens, + }, + ) + + except Exception as e: + metadata["error_count"] += 1 + metadata["status"] = AgentStatus.ERROR + logger.error( + f"Error in completion 
processing: {str(e)}\n{traceback.format_exc()}" + ) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Error processing completion: {str(e)}", + ) + finally: + metadata["status"] = AgentStatus.IDLE + + +class StoreManager: + _instance = None + + @classmethod + def get_instance(cls) -> "AgentStore": + if cls._instance is None: + cls._instance = AgentStore() + return cls._instance + + +# Modify the dependency function +def get_store() -> AgentStore: + """Dependency to get the AgentStore instance.""" + return StoreManager.get_instance() + + +# Security utility function using the new dependency +async def get_current_user( + api_key: str = Header( + ..., description="API key for authentication" + ), + store: AgentStore = Depends(get_store), +) -> User: + """Validate API key and return current user.""" + user_id = store.validate_api_key(api_key) + if not user_id: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid or expired API key", + headers={"WWW-Authenticate": "ApiKey"}, + ) + return store.users[user_id] + + +class SwarmsAPI: + """Enhanced API class for Swarms agent integration.""" + + def __init__(self): + self.app = FastAPI( + title="Swarms Agent API", + description="Production-grade API for Swarms agent interaction", + version="1.0.0", + docs_url="/v1/docs", + redoc_url="/v1/redoc", + ) + # Initialize the store using the singleton manager + self.store = StoreManager.get_instance() + + # Configure CORS + self.app.add_middleware( + CORSMiddleware, + allow_origins=[ + "*" + ], # Configure appropriately for production + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) + + self._setup_routes() + + def _setup_routes(self): + """Set up API routes.""" + + # In your API code + @self.app.post("/v1/users", response_model=Dict[str, Any]) + async def create_user(request: Request): + """Create a new user and initial API key.""" + try: + body = await request.json() + username = body.get("username") + if not username or len(username) < 3: + raise HTTPException( + status_code=400, detail="Invalid username" + ) + + user_id = uuid4() + user = User(id=user_id, username=username) + self.store.users[user_id] = user + initial_key = self.store.create_api_key( + user_id, "Initial Key" + ) + return { + "user_id": user_id, + "api_key": initial_key.key, + } + except Exception as e: + logger.error(f"Error creating user: {str(e)}") + raise HTTPException(status_code=400, detail=str(e)) + + @self.app.post( + "/v1/users/{user_id}/api-keys", response_model=APIKey + ) + async def create_api_key( + user_id: UUID, + key_create: APIKeyCreate, + current_user: User = Depends(get_current_user), + ): + """Create a new API key for a user.""" + if ( + current_user.id != user_id + and not current_user.is_admin + ): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Not authorized to create API keys for this user", + ) + + return self.store.create_api_key(user_id, key_create.name) + + @self.app.get( + "/v1/users/{user_id}/api-keys", + response_model=List[APIKey], + ) + async def list_api_keys( + user_id: UUID, + current_user: User = Depends(get_current_user), + ): + """List all API keys for a user.""" + if ( + current_user.id != user_id + and not current_user.is_admin + ): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Not authorized to view API keys for this user", + ) + + return list(self.store.users[user_id].api_keys.values()) + + @self.app.delete("/v1/users/{user_id}/api-keys/{key}") + 
async def revoke_api_key( + user_id: UUID, + key: str, + current_user: User = Depends(get_current_user), + ): + """Revoke an API key.""" + if ( + current_user.id != user_id + and not current_user.is_admin + ): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Not authorized to revoke API keys for this user", + ) + + if key in self.store.users[user_id].api_keys: + self.store.users[user_id].api_keys[ + key + ].is_active = False + del self.store.api_keys[key] + return {"status": "API key revoked"} + + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="API key not found", + ) + + @self.app.get( + "/v1/users/me/agents", response_model=List[AgentSummary] + ) + async def list_user_agents( + current_user: User = Depends(get_current_user), + tags: Optional[List[str]] = Query(None), + status: Optional[AgentStatus] = None, + ): + """List all agents owned by the current user.""" + user_agents = self.store.user_agents.get( + current_user.id, [] + ) + return [ + agent + for agent in await self.store.list_agents( + tags, status + ) + if agent.agent_id in user_agents + ] + + @self.app.middleware("http") + async def count_requests(request: Request, call_next): + """Middleware to count all incoming requests.""" + self.store.increment_request_count() + response = await call_next(request) + return response + + # Modify existing routes to use API key authentication + @self.app.post("/v1/agent", response_model=Dict[str, UUID]) + async def create_agent( + config: AgentConfig, + current_user: User = Depends(get_current_user), + ): + """Create a new agent with the specified configuration.""" + agent_id = await self.store.create_agent( + config, current_user.id + ) + return {"agent_id": agent_id} + + @self.app.get("/v1/agents", response_model=List[AgentSummary]) + async def list_agents( + tags: Optional[List[str]] = Query(None), + status: Optional[AgentStatus] = None, + ): + """List all agents, optionally filtered by tags and status.""" + return await self.store.list_agents(tags, status) + + @self.app.patch( + "/v1/agent/{agent_id}", response_model=Dict[str, str] + ) + async def update_agent(agent_id: UUID, update: AgentUpdate): + """Update an existing agent's configuration.""" + await self.store.update_agent(agent_id, update) + return {"status": "updated"} + + @self.app.get( + "/v1/agent/{agent_id}/metrics", + response_model=AgentMetrics, + ) + async def get_agent_metrics(agent_id: UUID): + """Get performance metrics for a specific agent.""" + return await self.store.get_agent_metrics(agent_id) + + @self.app.post( + "/v1/agent/{agent_id}/clone", + response_model=Dict[str, UUID], + ) + async def clone_agent(agent_id: UUID, new_name: str): + """Clone an existing agent with a new name.""" + new_id = await self.store.clone_agent(agent_id, new_name) + return {"agent_id": new_id} + + @self.app.delete("/v1/agent/{agent_id}") + async def delete_agent(agent_id: UUID): + """Delete an agent.""" + await self.store.delete_agent(agent_id) + return {"status": "deleted"} + + @self.app.post( + "/v1/agent/completions", response_model=CompletionResponse + ) + async def create_completion( + request: CompletionRequest, + background_tasks: BackgroundTasks, + ): + """Process a completion request with the specified agent.""" + try: + agent = await self.store.get_agent(request.agent_id) + + # Process completion + response = await self.store.process_completion( + agent, + request.prompt, + request.agent_id, + request.max_tokens, + 0.5, + ) + + # Schedule background cleanup + 
background_tasks.add_task( + self._cleanup_old_metrics, request.agent_id + ) + + return response + + except Exception as e: + logger.error(f"Error processing completion: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Error processing completion: {str(e)}", + ) + + @self.app.get("/v1/agent/{agent_id}/status") + async def get_agent_status(agent_id: UUID): + """Get the current status of an agent.""" + metadata = self.store.agent_metadata.get(agent_id) + if not metadata: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Agent {agent_id} not found", + ) + return { + "agent_id": agent_id, + "status": metadata["status"], + "last_used": metadata["last_used"], + "total_completions": metadata["total_completions"], + "error_count": metadata["error_count"], + } + + async def _cleanup_old_metrics(self, agent_id: UUID): + """Clean up old metrics data to prevent memory bloat.""" + metadata = self.store.agent_metadata.get(agent_id) + if metadata: + # Keep only last 24 hours of response times + cutoff = datetime.utcnow() - timedelta(days=1) + metadata["response_times"] = [ + t + for t in metadata["response_times"] + if isinstance(t, (int, float)) + and t > cutoff.timestamp() + ] + + # Clean up old tokens per minute data + if "tokens_per_minute" in metadata: + metadata["tokens_per_minute"] = { + k: v + for k, v in metadata["tokens_per_minute"].items() + if k > cutoff + } + + +def run_api_instance( + port: int, heartbeat_queue: Queue, shutdown_event: any +): + """Run a single API instance and report metrics.""" + try: + # Initialize API + api = SwarmsAPI() + process = psutil.Process() + + # Start metrics reporting + def report_metrics(): + while not shutdown_event.is_set(): + try: + cpu_percent = process.cpu_percent() + memory_percent = process.memory_percent() + heartbeat_queue.put( + ( + process.pid, + cpu_percent, + memory_percent, + api.store.get_total_requests(), + ) + ) + time.sleep(5) + except Exception as e: + logger.error(f"Error reporting metrics: {e}") + + metrics_thread = threading.Thread(target=report_metrics) + metrics_thread.daemon = True + metrics_thread.start() + + # Run API + uvicorn.run( + api.app, host="0.0.0.0", port=port, log_level="info" + ) + + except Exception as e: + logger.error(f"Error in API instance: {e}") + sys.exit(1) + + +class MultiProcessManager: + """Manages multiple API processes.""" + + def __init__( + self, base_port: int = 8000, num_processes: int = None + ): + self.base_port = base_port + self.num_processes = ( + num_processes or multiprocessing.cpu_count() + ) + self.processes: Dict[int, Process] = {} + self.metrics: Dict[int, ProcessMetrics] = {} + self.active = Value("b", True) + + def start_process(self, port: int) -> Process: + """Start a single API process.""" + process = Process(target=run_api_instance, args=(port,)) + process.start() + self.metrics[process.pid] = ProcessMetrics(process.pid, port) + self.processes[process.pid] = process + return process + + def monitor_processes(self): + """Monitor process health and metrics.""" + while self.active.value: + for pid, metrics in list(self.metrics.items()): + try: + # Update process metrics + process = psutil.Process(pid) + metrics.cpu_usage = process.cpu_percent() + metrics.memory_usage = process.memory_percent() + metrics.last_heartbeat = time.time() + except psutil.NoSuchProcess: + # Restart dead process + logger.warning( + f"Process {pid} died, restarting..." 
+ ) + if pid in self.processes: + self.processes[pid].terminate() + del self.processes[pid] + self.start_process(metrics.port) + del self.metrics[pid] + time.sleep(5) + + def start(self): + """Start all API processes.""" + logger.info(f"Starting {self.num_processes} API processes...") + + # Start worker processes + for i in range(self.num_processes): + port = self.base_port + i + 1 + self.start_process(port) + + # Start monitoring thread + monitor_thread = threading.Thread( + target=self.monitor_processes + ) + monitor_thread.daemon = True + monitor_thread.start() + + logger.info("All processes started successfully") + + def shutdown(self): + """Shutdown all processes.""" + self.active.value = False + for process in self.processes.values(): + process.terminate() + process.join() + + +def create_app() -> FastAPI: + """Create and configure the FastAPI application.""" + logger.info("Creating FastAPI application") + api = SwarmsAPI() + app = api.app + logger.info("FastAPI application created successfully") + return app + + +class LoadBalancer: + """Load balancer for distributing requests across API instances.""" + + def __init__(self, process_manager: ProcessManager): + self.process_manager = process_manager + self.last_selected_pid = None + self._lock = Lock() + + def get_best_instance(self) -> Tuple[int, int]: + """Select the best instance to handle the next request based on load.""" + with self.process_manager.metrics_lock: + valid_instances = [ + (pid, metrics) + for pid, metrics in self.process_manager.metrics.items() + if time.time() - metrics.last_heartbeat < 30 + ] + + if not valid_instances: + raise RuntimeError( + "No healthy API instances available" + ) + + # Calculate load score for each instance + scores = [] + for pid, metrics in valid_instances: + cpu_score = metrics.cpu_usage / 100.0 + memory_score = metrics.memory_usage / 100.0 + request_score = ( + metrics.request_count / 1000.0 + ) # Normalize request count + total_score = ( + cpu_score + memory_score + request_score + ) / 3 + scores.append((pid, metrics.port, total_score)) + + # Select instance with lowest load score + selected_pid, selected_port, _ = min( + scores, key=lambda x: x[2] + ) + return selected_pid, selected_port + + +class LoadBalancedAPI(SwarmsAPI): + """Enhanced API class with load balancing capabilities.""" + + def __init__( + self, + process_manager: ProcessManager, + load_balancer: LoadBalancer, + ): + super().__init__() + self.process_manager = process_manager + self.load_balancer = load_balancer + self.request_count = Value("i", 0) + self.add_middleware() + + def add_middleware(self): + """Add middleware for request routing and metrics collection.""" + + @self.app.middleware("http") + async def route_request(request: Request, call_next): + try: + # Increment request count + with self.request_count.get_lock(): + self.request_count.value += 1 + + # Get best instance for processing + pid, port = self.load_balancer.get_best_instance() + + # Forward request if not already on the best instance + if request.url.port != port: + async with httpx.AsyncClient() as client: + forwarded_url = f"http://localhost:{port}{request.url.path}" + response = await client.request( + request.method, + forwarded_url, + headers=dict(request.headers), + content=await request.body(), + ) + return httpx.Response( + content=response.content, + status_code=response.status_code, + headers=dict(response.headers), + ) + + # Process request locally if already on the best instance + response = await call_next(request) + return response + + 
except Exception as e: + logger.error(f"Error routing request: {e}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=str(e), + ) + + +def run_worker(port: int): + """Run a single worker instance.""" + try: + api = SwarmsAPI() + uvicorn.run( + api.app, host="0.0.0.0", port=port, log_level="info" + ) + logger.info(f"Worker started on port {port}") + except Exception as e: + logger.error(f"Worker error: {e}") + + +def main(): + """Main entry point for the multi-process API.""" + # Initialize processes list before any potential exceptions + processes = [] + + try: + # Try to get current method, only set if not already set + try: + current_method = multiprocessing.get_start_method() + logger.info( + f"Using existing start method: {current_method}" + ) + except RuntimeError: + try: + multiprocessing.set_start_method("fork") + logger.info("Set start method to fork") + except RuntimeError: + logger.warning("Using default start method") + + # Calculate number of workers + num_workers = max(1, multiprocessing.cpu_count() - 1) + base_port = 8000 + + # Start worker processes + for i in range(num_workers): + port = base_port + i + 1 + process = Process(target=run_worker, args=(port,)) + process.start() + processes.append(process) + logger.info(f"Started worker on port {port}") + + # Run main instance + api = SwarmsAPI() + + def shutdown_handler(signum, frame): + logger.info("Shutting down workers...") + for p in processes: + try: + p.terminate() + p.join(timeout=5) + logger.info(f"Worker {p.pid} terminated") + except Exception as e: + logger.error(f"Error shutting down worker: {e}") + sys.exit(0) + + signal.signal(signal.SIGINT, shutdown_handler) + signal.signal(signal.SIGTERM, shutdown_handler) + + # Run main instance + uvicorn.run( + api.app, host="0.0.0.0", port=base_port, log_level="info" + ) + logger.info(f"Main instance started on port {base_port}") + + except Exception as e: + logger.error(f"Startup error: {e}") + # Clean up any started processes + for p in processes: + try: + p.terminate() + p.join(timeout=5) + logger.info( + f"Worker {p.pid} terminated during cleanup" + ) + except Exception as cleanup_error: + logger.error(f"Error during cleanup: {cleanup_error}") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/api/agent_api_test.py b/api/agent_api_test.py index f95e08d1..10addd36 100644 --- a/api/agent_api_test.py +++ b/api/agent_api_test.py @@ -1,320 +1,291 @@ +import os +import json +import logging +from typing import Dict, Optional, Any +from dataclasses import dataclass import requests -from loguru import logger import time -from typing import Dict, Optional, Tuple -from uuid import UUID -BASE_URL = "http://0.0.0.0:8000/v1" - - -def check_api_server() -> bool: - """Check if the API server is running and accessible.""" - try: - response = requests.get(f"{BASE_URL}/docs") - return response.status_code == 200 - except requests.exceptions.ConnectionError: - logger.error("API server is not running at {BASE_URL}") - logger.error("Please start the API server first with:") - logger.error(" python main.py") - return False - except Exception as e: - logger.error(f"Error checking API server: {str(e)}") - return False - - -class TestSession: - """Manages test session state and authentication.""" - - def __init__(self): - self.user_id: Optional[UUID] = None - self.api_key: Optional[str] = None - self.test_agents: list[UUID] = [] - - @property - def headers(self) -> Dict[str, str]: - """Get headers with authentication.""" - return {"api-key": 
self.api_key} if self.api_key else {} - - -def create_test_user(session: TestSession) -> Tuple[bool, str]: - """Create a test user and store credentials in session.""" - logger.info("Creating test user") - - try: - response = requests.post( - f"{BASE_URL}/users", - json={"username": f"test_user_{int(time.time())}"}, - ) - - if response.status_code == 200: - data = response.json() - session.user_id = data["user_id"] - session.api_key = data["api_key"] - logger.success(f"Created user with ID: {session.user_id}") - return True, "Success" - else: - logger.error(f"Failed to create user: {response.text}") - return False, response.text - except Exception as e: - logger.exception("Exception during user creation") - return False, str(e) - - -def create_additional_api_key( - session: TestSession, -) -> Tuple[bool, str]: - """Test creating an additional API key.""" - logger.info("Creating additional API key") - - try: - response = requests.post( - f"{BASE_URL}/users/{session.user_id}/api-keys", - headers=session.headers, - json={"name": "Test Key"}, +# Set up logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", + handlers=[ + logging.FileHandler("api_tests.log"), + logging.StreamHandler(), + ], +) +logger = logging.getLogger(__name__) + + +# Configuration +@dataclass +class TestConfig: + """Test configuration settings""" + + base_url: str + timeout: int = 30 + verify_ssl: bool = True + debug: bool = True + + +# Load config from environment or use defaults +config = TestConfig( + base_url=os.getenv("API_BASE_URL", "http://0.0.0.0:8000/v1") +) + + +class APIClient: + """API Client for testing""" + + def __init__(self, config: TestConfig): + self.config = config + self.session = requests.Session() + + def _url(self, path: str) -> str: + """Construct full URL""" + return f"{self.config.base_url}/{path.lstrip('/')}" + + def _log_request_details( + self, method: str, url: str, headers: Dict, data: Any + ): + """Log request details for debugging""" + logger.info("\nRequest Details:") + logger.info(f"Method: {method}") + logger.info(f"URL: {url}") + logger.info(f"Headers: {json.dumps(headers, indent=2)}") + logger.info( + f"Data: {json.dumps(data, indent=2) if data else None}" ) - if response.status_code == 200: - logger.success("Created additional API key") - return True, response.json()["key"] - else: - logger.error(f"Failed to create API key: {response.text}") - return False, response.text - except Exception as e: - logger.exception("Exception during API key creation") - return False, str(e) - - -def test_create_agent( - session: TestSession, -) -> Tuple[bool, Optional[UUID]]: - """Test creating a new agent.""" - logger.info("Testing agent creation") - - payload = { - "agent_name": f"Test Agent {int(time.time())}", - "system_prompt": "You are a helpful assistant", - "model_name": "gpt-4", - "description": "Test agent", - "tags": ["test", "automated"], - } - - try: - response = requests.post( - f"{BASE_URL}/agent", headers=session.headers, json=payload + def _log_response_details(self, response: requests.Response): + """Log response details for debugging""" + logger.info("\nResponse Details:") + logger.info(f"Status Code: {response.status_code}") + logger.info( + f"Headers: {json.dumps(dict(response.headers), indent=2)}" ) - - if response.status_code == 200: - agent_id = response.json()["agent_id"] - session.test_agents.append(agent_id) - logger.success(f"Created agent with ID: {agent_id}") - return True, agent_id - else: - logger.error(f"Failed to 
create agent: {response.text}") - return False, None - except Exception: - logger.exception("Exception during agent creation") - return False, None - - -def test_list_user_agents(session: TestSession) -> bool: - """Test listing user's agents.""" - logger.info("Testing user agent listing") - - try: - response = requests.get( - f"{BASE_URL}/users/me/agents", headers=session.headers - ) - - if response.status_code == 200: - agents = response.json() - logger.success(f"Found {len(agents)} user agents") - return True - else: - logger.error( - f"Failed to list user agents: {response.text}" + try: + logger.info( + f"Body: {json.dumps(response.json(), indent=2)}" ) - return False - except Exception: - logger.exception("Exception during agent listing") - return False - - -def test_agent_operations( - session: TestSession, agent_id: UUID -) -> bool: - """Test various operations on an agent.""" - logger.info(f"Testing operations for agent {agent_id}") - - # Test update - try: - update_response = requests.patch( - f"{BASE_URL}/agent/{agent_id}", - headers=session.headers, - json={ - "description": "Updated description", - "tags": ["test", "updated"], - }, - ) - if update_response.status_code != 200: - logger.error( - f"Failed to update agent: {update_response.text}" + except Exception: + logger.info(f"Body: {response.text}") + + def _request( + self, + method: str, + path: str, + headers: Optional[Dict] = None, + **kwargs: Any, + ) -> requests.Response: + """Make HTTP request with config defaults""" + url = self._url(path) + headers = headers or {} + + if self.config.debug: + self._log_request_details( + method, url, headers, kwargs.get("json") ) - return False - # Test metrics - metrics_response = requests.get( - f"{BASE_URL}/agent/{agent_id}/metrics", - headers=session.headers, - ) - if metrics_response.status_code != 200: - logger.error( - f"Failed to get agent metrics: {metrics_response.text}" + try: + response = self.session.request( + method=method, + url=url, + headers=headers, + timeout=self.config.timeout, + verify=self.config.verify_ssl, + **kwargs, ) - return False - - logger.success("Successfully performed agent operations") - return True - except Exception: - logger.exception("Exception during agent operations") - return False + if self.config.debug: + self._log_response_details(response) -def test_completion(session: TestSession, agent_id: UUID) -> bool: - """Test running a completion.""" - logger.info("Testing completion") + if response.status_code >= 400: + logger.error( + f"Request failed with status {response.status_code}" + ) + logger.error(f"Response: {response.text}") - payload = { - "prompt": "What is the weather like today?", - "agent_id": agent_id, - "max_tokens": 100, - } + response.raise_for_status() + return response - try: - response = requests.post( - f"{BASE_URL}/agent/completions", - headers=session.headers, - json=payload, - ) + except requests.exceptions.RequestException as e: + logger.error(f"Request failed: {str(e)}") + if hasattr(e, "response") and e.response is not None: + logger.error(f"Error response: {e.response.text}") + raise - if response.status_code == 200: - completion_data = response.json() - print(completion_data) - logger.success( - f"Got completion, used {completion_data['token_usage']['total_tokens']} tokens" - ) - return True - else: - logger.error(f"Failed to get completion: {response.text}") - return False - except Exception: - logger.exception("Exception during completion") - return False +class TestRunner: + """Test runner with logging and 
reporting""" -def cleanup_test_resources(session: TestSession): - """Clean up all test resources.""" - logger.info("Cleaning up test resources") + def __init__(self): + self.client = APIClient(config) + self.results = {"passed": 0, "failed": 0, "total_time": 0} + self.api_key = None + self.user_id = None + self.agent_id = None - # Delete test agents - for agent_id in session.test_agents: - try: - response = requests.delete( - f"{BASE_URL}/agent/{agent_id}", - headers=session.headers, - ) - if response.status_code == 200: - logger.debug(f"Deleted agent {agent_id}") - else: - logger.warning( - f"Failed to delete agent {agent_id}: {response.text}" - ) - except Exception: - logger.exception(f"Exception deleting agent {agent_id}") + def run_test(self, test_name: str, test_func: callable): + """Run a single test with timing and logging""" + logger.info(f"\nRunning test: {test_name}") + start_time = time.time() - # Revoke API keys - if session.user_id: try: - response = requests.get( - f"{BASE_URL}/users/{session.user_id}/api-keys", - headers=session.headers, - ) - if response.status_code == 200: - for key in response.json(): - try: - revoke_response = requests.delete( - f"{BASE_URL}/users/{session.user_id}/api-keys/{key['key']}", - headers=session.headers, - ) - if revoke_response.status_code == 200: - logger.debug( - f"Revoked API key {key['name']}" - ) - else: - logger.warning( - f"Failed to revoke API key {key['name']}" - ) - except Exception: - logger.exception( - f"Exception revoking API key {key['name']}" - ) - except Exception: - logger.exception("Exception getting API keys for cleanup") - - -def run_test_workflow(): - """Run complete test workflow.""" - logger.info("Starting API tests") - - # Check if API server is running first - if not check_api_server(): - return False - - session = TestSession() + test_func() + self.results["passed"] += 1 + logger.info(f"✅ {test_name} - PASSED") + except Exception as e: + self.results["failed"] += 1 + logger.error(f"❌ {test_name} - FAILED: {str(e)}") + logger.exception(e) + + end_time = time.time() + duration = end_time - start_time + self.results["total_time"] += duration + logger.info(f"Test duration: {duration:.2f}s") + + def test_user_creation(self): + """Test user creation""" + response = self.client._request( + "POST", "/users", json={"username": "test_user"} + ) + data = response.json() + assert "user_id" in data, "No user_id in response" + assert "api_key" in data, "No api_key in response" + self.api_key = data["api_key"] + self.user_id = data["user_id"] + logger.info(f"Created user with ID: {self.user_id}") + + def test_create_api_key(self): + """Test API key creation""" + headers = {"api-key": self.api_key} + response = self.client._request( + "POST", + f"/users/{self.user_id}/api-keys", + headers=headers, + json={"name": "test_key"}, + ) + data = response.json() + assert "key" in data, "No key in response" + logger.info("Successfully created new API key") + + def test_create_agent(self): + """Test agent creation""" + headers = {"api-key": self.api_key} + agent_config = { + "agent_name": "test_agent", + "model_name": "gpt-4", + "system_prompt": "You are a test agent", + "description": "Test agent description", + "temperature": 0.7, + "max_loops": 1, + } + response = self.client._request( + "POST", "/agent", headers=headers, json=agent_config + ) + data = response.json() + assert "agent_id" in data, "No agent_id in response" + self.agent_id = data["agent_id"] + logger.info(f"Created agent with ID: {self.agent_id}") + + # Wait a bit for agent 
to be ready + time.sleep(2) + + def test_list_agents(self): + """Test agent listing""" + headers = {"api-key": self.api_key} + response = self.client._request( + "GET", "/agents", headers=headers + ) + agents = response.json() + assert isinstance(agents, list), "Response is not a list" + assert len(agents) > 0, "No agents returned" + logger.info(f"Successfully retrieved {len(agents)} agents") + + def test_agent_completion(self): + """Test agent completion""" + if not self.agent_id: + logger.error("No agent_id available for completion test") + raise ValueError("Agent ID not set") + + headers = {"api-key": self.api_key} + completion_request = { + "prompt": "Write 'Hello World!'", + "agent_id": str( + self.agent_id + ), # Ensure UUID is converted to string + "max_tokens": 100, + "stream": False, + "temperature_override": 0.7, + } + + logger.info( + f"Sending completion request for agent {self.agent_id}" + ) + response = self.client._request( + "POST", + "/agent/completions", + headers=headers, + json=completion_request, + ) + data = response.json() + assert "response" in data, "No response in completion" + logger.info(f"Completion response: {data.get('response')}") + + def run_all_tests(self): + """Run all tests and generate report""" + logger.info("\n" + "=" * 50) + logger.info("Starting API test suite...") + logger.info(f"Base URL: {config.base_url}") + logger.info("=" * 50 + "\n") + + # Define test sequence + tests = [ + ("User Creation", self.test_user_creation), + ("API Key Creation", self.test_create_api_key), + ("Agent Creation", self.test_create_agent), + ("List Agents", self.test_list_agents), + ("Agent Completion", self.test_agent_completion), + ] + + # Run tests + for test_name, test_func in tests: + self.run_test(test_name, test_func) + + # Generate report + self.print_report() + + def print_report(self): + """Print test results report""" + total_tests = self.results["passed"] + self.results["failed"] + success_rate = ( + (self.results["passed"] / total_tests * 100) + if total_tests > 0 + else 0 + ) - try: - # Create user - user_success, message = create_test_user(session) - if not user_success: - logger.error(f"User creation failed: {message}") - return False - - # Create additional API key - key_success, key = create_additional_api_key(session) - if not key_success: - logger.error(f"API key creation failed: {key}") - return False - - # Create agent - agent_success, agent_id = test_create_agent(session) - if not agent_success or not agent_id: - logger.error("Agent creation failed") - return False - - # Test user agent listing - if not test_list_user_agents(session): - logger.error("Agent listing failed") - return False - - # Test agent operations - if not test_agent_operations(session, agent_id): - logger.error("Agent operations failed") - return False - - # Test completion - if not test_completion(session, agent_id): - logger.error("Completion test failed") - return False - - logger.success("All tests completed successfully") - return True - - except Exception: - logger.exception("Exception during test workflow") - return False - finally: - cleanup_test_resources(session) + report = f""" +\n{'='*50} +API TEST RESULTS +{'='*50} +Total Tests: {total_tests} +Passed: {self.results['passed']} ✅ +Failed: {self.results['failed']} ❌ +Success Rate: {success_rate:.2f}% +Total Time: {self.results['total_time']:.2f}s +{'='*50} +""" + logger.info(report) if __name__ == "__main__": - success = run_test_workflow() - print(success) + try: + runner = TestRunner() + runner.run_all_tests() + 
except KeyboardInterrupt: + logger.info("\nTest suite interrupted by user") + except Exception as e: + logger.error(f"Test suite failed: {str(e)}") + logger.exception(e) diff --git a/api/api_test.py b/api/api_test.py new file mode 100644 index 00000000..c7f3e283 --- /dev/null +++ b/api/api_test.py @@ -0,0 +1,254 @@ +import os +from typing import Dict, Optional, Any +from dataclasses import dataclass +import pytest +import requests +from uuid import UUID +from pydantic import BaseModel +from _pytest.terminal import TerminalReporter + + +# Configuration +@dataclass +class TestConfig: + """Test configuration settings""" + + base_url: str + timeout: int = 30 + verify_ssl: bool = True + + +# Load config from environment or use defaults +config = TestConfig( + base_url=os.getenv("API_BASE_URL", "http://localhost:8000/v1") +) + + +# API Response Types +class UserResponse(BaseModel): + user_id: str + api_key: str + + +class AgentResponse(BaseModel): + agent_id: UUID + + +class MetricsResponse(BaseModel): + total_completions: int + average_response_time: float + error_rate: float + last_24h_completions: int + total_tokens_used: int + uptime_percentage: float + success_rate: float + peak_tokens_per_minute: int + + +class APIClient: + """API Client with typed methods""" + + def __init__(self, config: TestConfig): + self.config = config + self.session = requests.Session() + + def _url(self, path: str) -> str: + """Construct full URL""" + return f"{self.config.base_url}/{path.lstrip('/')}" + + def _request( + self, + method: str, + path: str, + headers: Optional[Dict] = None, + **kwargs: Any, + ) -> requests.Response: + """Make HTTP request with config defaults""" + url = self._url(path) + return self.session.request( + method=method, + url=url, + headers=headers, + timeout=self.config.timeout, + verify=self.config.verify_ssl, + **kwargs, + ) + + def create_user(self, username: str) -> UserResponse: + """Create a new user""" + response = self._request( + "POST", "/users", json={"username": username} + ) + response.raise_for_status() + return UserResponse(**response.json()) + + def create_agent( + self, agent_config: Dict[str, Any], api_key: str + ) -> AgentResponse: + """Create a new agent""" + headers = {"api-key": api_key} + response = self._request( + "POST", "/agent", headers=headers, json=agent_config + ) + response.raise_for_status() + return AgentResponse(**response.json()) + + def get_metrics( + self, agent_id: UUID, api_key: str + ) -> MetricsResponse: + """Get agent metrics""" + headers = {"api-key": api_key} + response = self._request( + "GET", f"/agent/{agent_id}/metrics", headers=headers + ) + response.raise_for_status() + return MetricsResponse(**response.json()) + + +# Test Fixtures +@pytest.fixture +def api_client() -> APIClient: + """Fixture for API client""" + return APIClient(config) + + +@pytest.fixture +def test_user(api_client: APIClient) -> UserResponse: + """Fixture for test user""" + return api_client.create_user("test_user") + + +@pytest.fixture +def test_agent( + api_client: APIClient, test_user: UserResponse +) -> AgentResponse: + """Fixture for test agent""" + agent_config = { + "agent_name": "test_agent", + "model_name": "gpt-4", + "system_prompt": "You are a test agent", + "description": "Test agent description", + } + return api_client.create_agent(agent_config, test_user.api_key) + + +# Tests +def test_user_creation(api_client: APIClient): + """Test user creation flow""" + response = api_client.create_user("new_test_user") + assert response.user_id + assert 
response.api_key + + +def test_agent_creation( + api_client: APIClient, test_user: UserResponse +): + """Test agent creation flow""" + agent_config = { + "agent_name": "test_agent", + "model_name": "gpt-4", + "system_prompt": "You are a test agent", + "description": "Test agent description", + } + response = api_client.create_agent( + agent_config, test_user.api_key + ) + assert response.agent_id + + +def test_agent_metrics( + api_client: APIClient, + test_user: UserResponse, + test_agent: AgentResponse, +): + """Test metrics retrieval""" + metrics = api_client.get_metrics( + test_agent.agent_id, test_user.api_key + ) + assert metrics.total_completions >= 0 + assert metrics.error_rate >= 0 + assert metrics.uptime_percentage >= 0 + + +def test_invalid_auth(api_client: APIClient): + """Test invalid authentication""" + with pytest.raises(requests.exceptions.HTTPError) as exc_info: + api_client.create_agent({}, "invalid_key") + assert exc_info.value.response.status_code == 401 + + +# Custom pytest plugin to capture test results +class ResultCapture: + def __init__(self): + self.total = 0 + self.passed = 0 + self.failed = 0 + self.errors = 0 + + +@pytest.hookimpl(hookwrapper=True) +def pytest_terminal_summary( + terminalreporter: TerminalReporter, exitstatus: int +): + yield + capture = getattr( + terminalreporter.config, "_result_capture", None + ) + if capture: + capture.total = ( + len(terminalreporter.stats.get("passed", [])) + + len(terminalreporter.stats.get("failed", [])) + + len(terminalreporter.stats.get("error", [])) + ) + capture.passed = len(terminalreporter.stats.get("passed", [])) + capture.failed = len(terminalreporter.stats.get("failed", [])) + capture.errors = len(terminalreporter.stats.get("error", [])) + + +@dataclass +class TestReport: + total_tests: int + passed: int + failed: int + errors: int + + @property + def success_rate(self) -> float: + return ( + (self.passed / self.total_tests) * 100 + if self.total_tests > 0 + else 0 + ) + + +def run_tests() -> TestReport: + """Run tests and generate typed report""" + # Create result capture + capture = ResultCapture() + + # Create pytest configuration + args = [__file__, "-v"] + + # Run pytest with our plugin + pytest.main(args, plugins=[capture]) + + # Generate report + return TestReport( + total_tests=capture.total, + passed=capture.passed, + failed=capture.failed, + errors=capture.errors, + ) + + +if __name__ == "__main__": + # Example usage with environment variable + # export API_BASE_URL=http://api.example.com/v1 + + report = run_tests() + print("\nTest Results:") + print(f"Total Tests: {report.total_tests}") + print(f"Passed: {report.passed}") + print(f"Failed: {report.failed}") + print(f"Errors: {report.errors}") + print(f"Success Rate: {report.success_rate:.2f}%") diff --git a/api/api_tests.py b/api/api_tests.py new file mode 100644 index 00000000..43b1d119 --- /dev/null +++ b/api/api_tests.py @@ -0,0 +1,472 @@ +import asyncio +import json +from datetime import datetime +from typing import Any, Dict, List, Optional +from uuid import UUID + +import httpx +from loguru import logger + +# Configure logger +logger.add( + "tests/api_test_{time}.log", + rotation="1 day", + retention="7 days", + level="DEBUG", + format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}", +) + + +class TestConfig: + """Test configuration and utilities""" + + BASE_URL: str = "http://localhost:8000/v1" + TEST_USERNAME: str = "test_user" + api_key: Optional[str] = None + user_id: Optional[UUID] = None + test_agent_id: Optional[UUID] = None + + 
+class TestResult: + """Model for test results""" + + def __init__( + self, + test_name: str, + status: str, + duration: float, + error: Optional[str] = None, + details: Optional[Dict[str, Any]] = None, + ): + self.test_name = test_name + self.status = status + self.duration = duration + self.error = error + self.details = details or {} + + def dict(self): + return { + "test_name": self.test_name, + "status": self.status, + "duration": self.duration, + "error": self.error, + "details": self.details, + } + + +async def log_response( + response: httpx.Response, test_name: str +) -> None: + """Log API response details""" + logger.debug(f"\n{test_name} Response:") + logger.debug(f"Status Code: {response.status_code}") + logger.debug(f"Headers: {dict(response.headers)}") + try: + logger.debug(f"Body: {response.json()}") + except json.JSONDecodeError: + logger.debug(f"Body: {response.text}") + + +async def create_test_user() -> TestResult: + """Create a test user and get API key""" + start_time = datetime.now() + try: + async with httpx.AsyncClient() as client: + response = await client.post( + f"{TestConfig.BASE_URL}/users", + json={"username": TestConfig.TEST_USERNAME}, + ) + await log_response(response, "Create User") + + if response.status_code == 200: + data = response.json() + TestConfig.api_key = data["api_key"] + TestConfig.user_id = UUID(data["user_id"]) + return TestResult( + test_name="create_test_user", + status="passed", + duration=( + datetime.now() - start_time + ).total_seconds(), + details={"user_id": str(TestConfig.user_id)}, + ) + else: + return TestResult( + test_name="create_test_user", + status="failed", + duration=( + datetime.now() - start_time + ).total_seconds(), + error=f"Failed to create user: {response.text}", + ) + except Exception as e: + logger.error(f"Error in create_test_user: {str(e)}") + return TestResult( + test_name="create_test_user", + status="error", + duration=(datetime.now() - start_time).total_seconds(), + error=str(e), + ) + + +async def create_test_agent() -> TestResult: + """Create a test agent""" + start_time = datetime.now() + try: + # Create agent config according to the AgentConfig model + agent_config = { + "agent_name": "test_agent", + "model_name": "gpt-4", + "description": "Test agent for API testing", + "system_prompt": "You are a test agent.", + "temperature": 0.1, + "max_loops": 1, + "dynamic_temperature_enabled": True, + "user_name": TestConfig.TEST_USERNAME, + "retry_attempts": 1, + "context_length": 4000, + "output_type": "string", + "streaming_on": False, + "tags": ["test", "api"], + "stopping_token": "", + "auto_generate_prompt": False, + } + + async with httpx.AsyncClient() as client: + response = await client.post( + f"{TestConfig.BASE_URL}/agent", + json=agent_config, + headers={"api-key": TestConfig.api_key}, + ) + await log_response(response, "Create Agent") + + if response.status_code == 200: + data = response.json() + TestConfig.test_agent_id = UUID(data["agent_id"]) + return TestResult( + test_name="create_test_agent", + status="passed", + duration=( + datetime.now() - start_time + ).total_seconds(), + details={ + "agent_id": str(TestConfig.test_agent_id) + }, + ) + else: + return TestResult( + test_name="create_test_agent", + status="failed", + duration=( + datetime.now() - start_time + ).total_seconds(), + error=f"Failed to create agent: {response.text}", + ) + except Exception as e: + logger.error(f"Error in create_test_agent: {str(e)}") + return TestResult( + test_name="create_test_agent", + status="error", + 
duration=(datetime.now() - start_time).total_seconds(), + error=str(e), + ) + + +async def test_agent_completion() -> TestResult: + """Test agent completion endpoint""" + start_time = datetime.now() + try: + completion_request = { + "prompt": "Hello, this is a test prompt.", + "agent_id": str(TestConfig.test_agent_id), + "max_tokens": 100, + "temperature_override": 0.5, + "stream": False, + } + + async with httpx.AsyncClient() as client: + response = await client.post( + f"{TestConfig.BASE_URL}/agent/completions", + json=completion_request, + headers={"api-key": TestConfig.api_key}, + ) + await log_response(response, "Agent Completion") + + if response.status_code == 200: + return TestResult( + test_name="test_agent_completion", + status="passed", + duration=( + datetime.now() - start_time + ).total_seconds(), + details={"response": response.json()}, + ) + else: + return TestResult( + test_name="test_agent_completion", + status="failed", + duration=( + datetime.now() - start_time + ).total_seconds(), + error=f"Failed completion test: {response.text}", + ) + except Exception as e: + logger.error(f"Error in test_agent_completion: {str(e)}") + return TestResult( + test_name="test_agent_completion", + status="error", + duration=(datetime.now() - start_time).total_seconds(), + error=str(e), + ) + + +async def test_agent_metrics() -> TestResult: + """Test agent metrics endpoint""" + start_time = datetime.now() + try: + if not TestConfig.test_agent_id: + return TestResult( + test_name="test_agent_metrics", + status="failed", + duration=( + datetime.now() - start_time + ).total_seconds(), + error="No test agent ID available", + ) + + async with httpx.AsyncClient() as client: + response = await client.get( + f"{TestConfig.BASE_URL}/agent/{str(TestConfig.test_agent_id)}/metrics", + headers={"api-key": TestConfig.api_key}, + ) + await log_response(response, "Agent Metrics") + + if response.status_code == 200: + return TestResult( + test_name="test_agent_metrics", + status="passed", + duration=( + datetime.now() - start_time + ).total_seconds(), + details={"metrics": response.json()}, + ) + else: + return TestResult( + test_name="test_agent_metrics", + status="failed", + duration=( + datetime.now() - start_time + ).total_seconds(), + error=f"Failed metrics test: {response.text}", + ) + except Exception as e: + logger.error(f"Error in test_agent_metrics: {str(e)}") + return TestResult( + test_name="test_agent_metrics", + status="error", + duration=(datetime.now() - start_time).total_seconds(), + error=str(e), + ) + + +async def test_update_agent() -> TestResult: + """Test agent update endpoint""" + start_time = datetime.now() + try: + if not TestConfig.test_agent_id: + return TestResult( + test_name="test_update_agent", + status="failed", + duration=( + datetime.now() - start_time + ).total_seconds(), + error="No test agent ID available", + ) + + update_data = { + "description": "Updated test agent description", + "tags": ["test", "updated"], + "max_loops": 2, + } + + async with httpx.AsyncClient() as client: + response = await client.patch( + f"{TestConfig.BASE_URL}/agent/{str(TestConfig.test_agent_id)}", + json=update_data, + headers={"api-key": TestConfig.api_key}, + ) + await log_response(response, "Update Agent") + + if response.status_code == 200: + return TestResult( + test_name="test_update_agent", + status="passed", + duration=( + datetime.now() - start_time + ).total_seconds(), + details={"update_response": response.json()}, + ) + else: + return TestResult( + test_name="test_update_agent", + 
status="failed", + duration=( + datetime.now() - start_time + ).total_seconds(), + error=f"Failed update test: {response.text}", + ) + except Exception as e: + logger.error(f"Error in test_update_agent: {str(e)}") + return TestResult( + test_name="test_update_agent", + status="error", + duration=(datetime.now() - start_time).total_seconds(), + error=str(e), + ) + + +async def test_error_handling() -> TestResult: + """Test API error handling""" + start_time = datetime.now() + try: + async with httpx.AsyncClient() as client: + # Test with invalid API key + invalid_agent_id = "00000000-0000-0000-0000-000000000000" + response = await client.get( + f"{TestConfig.BASE_URL}/agent/{invalid_agent_id}/metrics", + headers={"api-key": "invalid_key"}, + ) + await log_response(response, "Invalid API Key Test") + + if response.status_code in [401, 403]: + return TestResult( + test_name="test_error_handling", + status="passed", + duration=( + datetime.now() - start_time + ).total_seconds(), + details={"error_response": response.json()}, + ) + else: + return TestResult( + test_name="test_error_handling", + status="failed", + duration=( + datetime.now() - start_time + ).total_seconds(), + error="Error handling test failed", + ) + except Exception as e: + logger.error(f"Error in test_error_handling: {str(e)}") + return TestResult( + test_name="test_error_handling", + status="error", + duration=(datetime.now() - start_time).total_seconds(), + error=str(e), + ) + + +async def cleanup_test_resources() -> TestResult: + """Clean up test resources""" + start_time = datetime.now() + try: + if TestConfig.test_agent_id: + async with httpx.AsyncClient() as client: + response = await client.delete( + f"{TestConfig.BASE_URL}/agent/{str(TestConfig.test_agent_id)}", + headers={"api-key": TestConfig.api_key}, + ) + await log_response(response, "Delete Agent") + + return TestResult( + test_name="cleanup_test_resources", + status="passed", + duration=(datetime.now() - start_time).total_seconds(), + details={"cleanup": "completed"}, + ) + except Exception as e: + logger.error(f"Error in cleanup_test_resources: {str(e)}") + return TestResult( + test_name="cleanup_test_resources", + status="error", + duration=(datetime.now() - start_time).total_seconds(), + error=str(e), + ) + + +async def run_all_tests() -> List[TestResult]: + """Run all tests in sequence""" + logger.info("Starting API test suite") + results = [] + + # Initialize + results.append(await create_test_user()) + if results[-1].status != "passed": + logger.error( + "Failed to create test user, aborting remaining tests" + ) + return results + + # Add delay to ensure user is properly created + await asyncio.sleep(1) + + # Core tests + test_functions = [ + create_test_agent, + test_agent_completion, + test_agent_metrics, + test_update_agent, + test_error_handling, + ] + + for test_func in test_functions: + result = await test_func() + results.append(result) + logger.info(f"Test {result.test_name}: {result.status}") + if result.error: + logger.error( + f"Error in {result.test_name}: {result.error}" + ) + + # Add small delay between tests + await asyncio.sleep(0.5) + + # Cleanup + results.append(await cleanup_test_resources()) + + # Log summary + passed = sum(1 for r in results if r.status == "passed") + failed = sum(1 for r in results if r.status == "failed") + errors = sum(1 for r in results if r.status == "error") + + logger.info("\nTest Summary:") + logger.info(f"Total Tests: {len(results)}") + logger.info(f"Passed: {passed}") + logger.info(f"Failed: {failed}") + 
logger.info(f"Errors: {errors}") + + return results + + +def main(): + """Main entry point for running tests""" + logger.info("Starting API testing suite") + try: + results = asyncio.run(run_all_tests()) + + # Write results to JSON file + with open("test_results.json", "w") as f: + json.dump( + [result.dict() for result in results], + f, + indent=2, + default=str, + ) + + logger.info("Test results written to test_results.json") + + except Exception: + logger.error("Fatal error in test suite: ") + + +main() diff --git a/api/main.py b/api/main.py index 3ace9900..10123634 100644 --- a/api/main.py +++ b/api/main.py @@ -1,13 +1,17 @@ +import asyncio import os import secrets +import signal +import sys import traceback from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timedelta from enum import Enum from pathlib import Path -from typing import Any, Dict, List, Optional +from typing import Any, AsyncGenerator, Dict, List, Optional from uuid import UUID, uuid4 +from fastapi.concurrency import asynccontextmanager import uvicorn from dotenv import load_dotenv from fastapi import ( @@ -32,6 +36,19 @@ from swarms.structs.agent import Agent load_dotenv() +class UvicornServer(uvicorn.Server): + """Customized uvicorn server with graceful shutdown support""" + + async def setup(self, sockets=None): + """Setup the server""" + await super().setup(sockets) + + async def shutdown(self, sockets=None): + """Gracefully shutdown the server""" + logger.info("Shutting down server...") + await super().shutdown(sockets) + + class AgentStatus(str, Enum): """Enum for agent status.""" @@ -62,7 +79,16 @@ class User(BaseModel): username: str is_active: bool = True is_admin: bool = False - api_keys: Dict[str, APIKey] = {} # key -> APIKey object + api_keys: Dict[str, APIKey] = Field(default_factory=dict) + + def ensure_active_api_key(self) -> Optional[APIKey]: + """Ensure user has at least one active API key.""" + active_keys = [ + key for key in self.api_keys.values() if key.is_active + ] + if not active_keys: + return None + return active_keys[0] class AgentConfig(BaseModel): @@ -91,15 +117,6 @@ class AgentConfig(BaseModel): max_loops: int = Field( default=1, ge=1, description="Maximum number of loops" ) - autosave: bool = Field( - default=True, description="Enable autosave" - ) - dashboard: bool = Field( - default=False, description="Enable dashboard" - ) - verbose: bool = Field( - default=True, description="Enable verbose output" - ) dynamic_temperature_enabled: bool = Field( default=True, description="Enable dynamic temperature" ) @@ -122,6 +139,13 @@ class AgentConfig(BaseModel): default_factory=list, description="Tags for categorizing the agent", ) + stopping_token: str = Field( + default="", description="Stopping token for the agent" + ) + auto_generate_prompt: bool = Field( + default=False, + description="Auto-generate prompt based on agent details such as name, description, etc.", + ) class AgentUpdate(BaseModel): @@ -141,6 +165,7 @@ class AgentSummary(BaseModel): agent_id: UUID agent_name: str description: str + system_prompt: str created_at: datetime last_used: datetime total_completions: int @@ -241,20 +266,6 @@ class AgentStore: or self.users[user_id].is_admin ) - def validate_api_key(self, api_key: str) -> Optional[UUID]: - """Validate an API key and return the associated user ID.""" - user_id = self.api_keys.get(api_key) - if not user_id or api_key not in self.users[user_id].api_keys: - return None - - key_object = self.users[user_id].api_keys[api_key] - if not 
key_object.is_active: - return None - - # Update last used timestamp - key_object.last_used = datetime.utcnow() - return user_id - async def create_agent( self, config: AgentConfig, user_id: UUID ) -> UUID: @@ -266,17 +277,16 @@ class AgentStore: system_prompt=config.system_prompt, model_name=config.model_name, max_loops=config.max_loops, - autosave=config.autosave, - dashboard=config.dashboard, verbose=config.verbose, dynamic_temperature_enabled=True, - saved_state_path=f"states/{config.agent_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", user_name=config.user_name, retry_attempts=config.retry_attempts, context_length=config.context_length, - return_step_meta=True, + return_step_meta=False, output_type="str", streaming_on=config.streaming_on, + stopping_token=config.stopping_token, + auto_generate_prompt=config.auto_generate_prompt, ) agent_id = uuid4() @@ -345,6 +355,39 @@ class AgentStore: logger.info(f"Updated agent {agent_id}") + def ensure_user_api_key(self, user_id: UUID) -> APIKey: + """Ensure user has at least one active API key.""" + if user_id not in self.users: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="User not found", + ) + + user = self.users[user_id] + existing_key = user.ensure_active_api_key() + if existing_key: + return existing_key + + # Create new API key if none exists + return self.create_api_key(user_id, "Default Key") + + def validate_api_key(self, api_key: str) -> Optional[UUID]: + """Validate an API key and return the associated user ID.""" + if not api_key: + return None + + user_id = self.api_keys.get(api_key) + if not user_id or api_key not in self.users[user_id].api_keys: + return None + + key_object = self.users[user_id].api_keys[api_key] + if not key_object.is_active: + return None + + # Update last used timestamp + key_object.last_used = datetime.utcnow() + return user_id + async def list_agents( self, tags: Optional[List[str]] = None, @@ -367,6 +410,7 @@ class AgentStore: AgentSummary( agent_id=agent_id, agent_name=agent.agent_name, + system_prompt=agent.system_prompt, description=metadata["description"], created_at=metadata["created_at"], last_used=metadata["last_used"], @@ -551,7 +595,7 @@ def get_store() -> AgentStore: return StoreManager.get_instance() -# Security utility function using the new dependency +# Modify the get_current_user dependency async def get_current_user( api_key: str = Header( ..., description="API key for authentication" @@ -559,6 +603,13 @@ async def get_current_user( store: AgentStore = Depends(get_store), ) -> User: """Validate API key and return current user.""" + if not api_key: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="API key is required", + headers={"WWW-Authenticate": "ApiKey"}, + ) + user_id = store.validate_api_key(api_key) if not user_id: raise HTTPException( @@ -566,7 +617,19 @@ async def get_current_user( detail="Invalid or expired API key", headers={"WWW-Authenticate": "ApiKey"}, ) - return store.users[user_id] + + user = store.users.get(user_id) + if not user: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="User not found", + ) + + if not user.ensure_active_api_key(): + # Attempt to create new API key + store.ensure_user_api_key(user_id) + + return user class SwarmsAPI: @@ -600,6 +663,8 @@ class SwarmsAPI: """Set up API routes.""" # In your API code + + # Modify the create_user endpoint @self.app.post("/v1/users", response_model=Dict[str, Any]) async def create_user(request: Request): """Create a new user and 
initial API key.""" @@ -614,9 +679,17 @@ class SwarmsAPI: user_id = uuid4() user = User(id=user_id, username=username) self.store.users[user_id] = user + + # Always create initial API key initial_key = self.store.create_api_key( user_id, "Initial Key" ) + if not initial_key: + raise HTTPException( + status_code=500, + detail="Failed to create initial API key", + ) + return { "user_id": user_id, "api_key": initial_key.key, @@ -625,26 +698,6 @@ class SwarmsAPI: logger.error(f"Error creating user: {str(e)}") raise HTTPException(status_code=400, detail=str(e)) - @self.app.post( - "/v1/users/{user_id}/api-keys", response_model=APIKey - ) - async def create_api_key( - user_id: UUID, - key_create: APIKeyCreate, - current_user: User = Depends(get_current_user), - ): - """Create a new API key for a user.""" - if ( - current_user.id != user_id - and not current_user.is_admin - ): - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="Not authorized to create API keys for this user", - ) - - return self.store.create_api_key(user_id, key_create.name) - @self.app.get( "/v1/users/{user_id}/api-keys", response_model=List[APIKey], @@ -837,28 +890,92 @@ class SwarmsAPI: } +class APIServer: + def __init__( + self, app: FastAPI, host: str = "0.0.0.0", port: int = 8000 + ): + self.app = app + self.host = host + self.port = port + self.config = uvicorn.Config( + app=app, + host=host, + port=port, + log_level="info", + access_log=True, + workers=os.cpu_count() * 2, + ) + self.server = UvicornServer(config=self.config) + + # Setup signal handlers + signal.signal(signal.SIGTERM, self._handle_signal) + signal.signal(signal.SIGINT, self._handle_signal) + + def _handle_signal(self, signum, frame): + """Handle shutdown signals""" + logger.info(f"Received signal {signum}") + asyncio.create_task(self.shutdown()) + + async def startup(self) -> None: + """Start the server""" + try: + logger.info( + f"Starting API server on http://{self.host}:{self.port}" + ) + print( + f"Starting API server on http://{self.host}:{self.port}" + ) + await self.server.serve() + except Exception as e: + logger.error(f"Failed to start server: {str(e)}") + raise + + async def shutdown(self) -> None: + """Shutdown the server""" + try: + logger.info("Initiating graceful shutdown...") + await self.server.shutdown() + except Exception as e: + logger.error(f"Error during shutdown: {str(e)}") + raise + + +@asynccontextmanager +async def lifespan(app: FastAPI) -> AsyncGenerator: + """Lifespan context manager for the FastAPI app""" + # Startup + logger.info("Starting up API server...") + yield + # Shutdown + logger.info("Shutting down API server...") + + def create_app() -> FastAPI: - """Create and configure the FastAPI application.""" + """Create and configure the FastAPI application""" logger.info("Creating FastAPI application") api = SwarmsAPI() app = api.app + + # Add lifespan handling + app.router.lifespan_context = lifespan + logger.info("FastAPI application created successfully") return app -app = create_app() - -if __name__ == "__main__": +def run_server(): + """Run the API server""" try: - logger.info("Starting API server...") - print("Starting API server on http://0.0.0.0:8000") + # Create the FastAPI app + app = create_app() - uvicorn.run( - app, # Pass the app instance directly - host="0.0.0.0", - port=8000, - log_level="info", - ) + # Create and run the server + server = APIServer(app) + asyncio.run(server.startup()) except Exception as e: logger.error(f"Failed to start API: {str(e)}") - print(f"Error starting server: 
{str(e)}") \ No newline at end of file + print(f"Error starting server: {str(e)}" + + +if __name__ == "__main__": + run_server() diff --git a/api/test_api.py b/api/test_api.py index cf903652..2d05f6db 100644 --- a/api/test_api.py +++ b/api/test_api.py @@ -2,7 +2,7 @@ import requests import json from time import sleep -BASE_URL = "http://swarms-api-893767232.us-east-2.elb.amazonaws.com" +BASE_URL = "http://0.0.0.0:8000/v1" def make_request(method, endpoint, data=None): diff --git a/graph_swarm_example.py b/graph_swarm_example.py new file mode 100644 index 00000000..ae997673 --- /dev/null +++ b/graph_swarm_example.py @@ -0,0 +1,56 @@ +from loguru import logger +from swarms.structs.agent import Agent +from swarms.structs.graph_swarm import GraphSwarm + + +if __name__ == "__main__": + try: + # Create agents + data_collector = Agent( + agent_name="Market-Data-Collector", + model_name="gpt-4o-mini", + max_loops=1, + streaming_on=True, + ) + + trend_analyzer = Agent( + agent_name="Market-Trend-Analyzer", + model_name="gpt-4o-mini", + max_loops=1, + streaming_on=True, + ) + + report_generator = Agent( + agent_name="Investment-Report-Generator", + model_name="gpt-4o-mini", + max_loops=1, + streaming_on=True, + ) + + # Create swarm + swarm = GraphSwarm( + agents=[ + (data_collector, []), + (trend_analyzer, ["Market-Data-Collector"]), + (report_generator, ["Market-Trend-Analyzer"]), + ], + swarm_name="Market Analysis Intelligence Network", + ) + + # Run the swarm + result = swarm.run( + "Analyze current market trends for tech stocks and provide investment recommendations" + ) + + # Print results + print(f"Execution success: {result.success}") + print(f"Total time: {result.execution_time:.2f} seconds") + + for agent_name, output in result.outputs.items(): + print(f"\nAgent: {agent_name}") + print(f"Output: {output.output}") + if output.error: + print(f"Error: {output.error}") + except Exception as error: + logger.error(error) + raise error diff --git a/async_workflow_example.py b/new_features_examples/async_workflow_example.py similarity index 100% rename from async_workflow_example.py rename to new_features_examples/async_workflow_example.py diff --git a/new_features_examples/health_privacy_swarm.py b/new_features_examples/health_privacy_swarm.py new file mode 100644 index 00000000..2125f678 --- /dev/null +++ b/new_features_examples/health_privacy_swarm.py @@ -0,0 +1,265 @@ +import os +from swarms import Agent, AgentRearrange +from swarm_models import OpenAIChat + +# Get the OpenAI API key from the environment variable +api_key = os.getenv("OPENAI_API_KEY") + +# Create an instance of the OpenAIChat class +model = OpenAIChat( + api_key=api_key, model_name="gpt-4o-mini", temperature=0.1 +) + +# Initialize the gatekeeper agent +gatekeeper_agent = Agent( + agent_name="HealthScoreGatekeeper", + system_prompt=""" + + Health Score Privacy Gatekeeper + Protect and manage sensitive health information while providing necessary access to authorized agents + + + + + Manage encryption of health scores + Implement strict access control mechanisms + Track and log all access requests + + + Remove personally identifiable information + Convert raw health data into privacy-preserving formats + + + + + + + Verify agent authorization level + Check request legitimacy + Validate purpose of access + + + Numerical value only + Anonymized timestamp and request ID + + + + Never expose patient names or identifiers + No access to historical data without explicit authorization + Provide only aggregated or anonymized data when 
possible + + + + + + Maintain HIPAA compliance + Follow GDPR guidelines for data protection + + + Record all data access events + Track unusual access patterns + + + """, + llm=model, + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + stopping_token="", + state_save_file_type="json", + saved_state_path="gatekeeper_agent.json", +) + +# Initialize the boss agent (Director) +boss_agent = Agent( + agent_name="BossAgent", + system_prompt=""" + + Swarm Director + Orchestrate and manage agent collaboration while respecting privacy boundaries + + + + + Assign and prioritize tasks + Ensure efficient collaboration + Maintain privacy protocols + + + Track agent effectiveness + Ensure accuracy of outputs + Enforce data protection policies + + + + + + Request access through gatekeeper only + Process only anonymized health scores + Share authorized information on need-to-know basis + + + Structured, secure messaging + End-to-end encrypted channels + + + """, + llm=model, + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + stopping_token="", + state_save_file_type="json", + saved_state_path="boss_agent.json", +) + +# Initialize worker 1: Health Score Analyzer +worker1 = Agent( + agent_name="HealthScoreAnalyzer", + system_prompt=""" + + Health Score Analyst + Analyze anonymized health scores for patterns and insights + + + + + Advanced statistical analysis + Identify health trends + Evaluate health risk factors + + + Work only with anonymized data + Use encrypted analysis methods + + + + + + + Submit authenticated requests to gatekeeper + Process only authorized data + Maintain audit trail + + + + Ensure no identifiable information in reports + Present aggregate statistics only + + + """, + llm=model, + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + stopping_token="", + state_save_file_type="json", + saved_state_path="worker1.json", +) + +# Initialize worker 2: Report Generator +worker2 = Agent( + agent_name="ReportGenerator", + system_prompt=""" + + Privacy-Conscious Report Generator + Create secure, anonymized health score reports + + + + + Generate standardized, secure reports + Apply privacy-preserving techniques + Compile statistical summaries + + + Implement secure report generation + Manage report distribution + + + + + + + No personal identifiers in reports + Aggregate data when possible + Apply statistical noise for privacy + + + Restricted to authorized personnel + Monitor report access + + + + """, + llm=model, + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + stopping_token="", + state_save_file_type="json", + saved_state_path="worker2.json", +) + +# Swarm-Level Prompt (Collaboration Prompt) +swarm_prompt = """ + + Process and analyze health scores while maintaining strict privacy controls + + + HealthScoreGatekeeper + Receive and validate data access requests + Anonymized health scores + + + BossAgent + Coordinate analysis and reporting tasks + Enforce data protection protocols + + + HealthScoreAnalyzer + Process authorized health score data + Work only with anonymized information + + + ReportGenerator + Create privacy-preserving reports + Secure, anonymized insights + + + +""" + +# Create a list of agents +agents = [gatekeeper_agent, boss_agent, worker1, worker2] + +# Define the flow pattern for the swarm +flow = "HealthScoreGatekeeper -> BossAgent -> HealthScoreAnalyzer -> ReportGenerator" + +# Using AgentRearrange class to manage the swarm +agent_system = AgentRearrange( + name="health-score-swarm", + 
description="Privacy-focused health score analysis system", + agents=agents, + flow=flow, + return_json=False, + output_type="final", + max_loops=1, +) + +# Example task for the swarm +task = f""" + {swarm_prompt} + + Process the incoming health score data while ensuring patient privacy. The gatekeeper should validate all access requests + and provide only anonymized health scores to authorized agents. Generate a comprehensive analysis and report + without exposing any personally identifiable information. +""" + +# Run the swarm system with the task +output = agent_system.run(task) +print(output) diff --git a/new_features_examples/health_privacy_swarm_two.py b/new_features_examples/health_privacy_swarm_two.py new file mode 100644 index 00000000..674581c8 --- /dev/null +++ b/new_features_examples/health_privacy_swarm_two.py @@ -0,0 +1,291 @@ +import os +from swarms import Agent, AgentRearrange +from swarm_models import OpenAIChat + +# Initialize OpenAI model +api_key = os.getenv( + "OPENAI_API_KEY" +) # ANTHROPIC_API_KEY, COHERE_API_KEY +model = OpenAIChat( + api_key=api_key, + model_name="gpt-4o-mini", + temperature=0.7, # Higher temperature for more creative responses +) + +# Patient Agent - Holds and protects private information +patient_agent = Agent( + agent_name="PatientAgent", + system_prompt=""" + + Anxious Patient with Private Health Information + + + Protective of personal information + Slightly distrustful of medical system + Worried about health insurance rates + Selective in information sharing + + + Previous negative experience with information leaks + Fear of discrimination based on health status + + + + + + + Maintains actual health score + Knowledge of undisclosed conditions + Complete list of current medications + Full medical history + + + + Only share general symptoms with doctor + Withhold specific details about lifestyle + Never reveal full medication list + Protect actual health score value + + + + + + + + Deflect sensitive questions + Provide partial information when pressed + Become evasive if pressured too much + + + Share only what's absolutely necessary + Redirect personal questions + + + + """, + llm=model, + max_loops=1, + verbose=True, + stopping_token="", +) + +# Doctor Agent - Tries to gather accurate information +doctor_agent = Agent( + agent_name="DoctorAgent", + system_prompt=""" + + Empathetic but Thorough Medical Professional + + + Patient and understanding + Professionally persistent + Detail-oriented + Trust-building focused + + + + Uses indirect questions to gather information + + + + + + + + Ask open-ended questions + Notice inconsistencies in responses + Build rapport before sensitive questions + Use medical knowledge to probe deeper + + + + + Explain importance of full disclosure + Provide privacy assurances + Use empathetic listening + + + + + + + + Establish trust and rapport + Gather general health information + Carefully probe sensitive areas + Respect patient boundaries while encouraging openness + + + + """, + llm=model, + max_loops=1, + verbose=True, + stopping_token="", +) + +# Nurse Agent - Observes and assists +nurse_agent = Agent( + agent_name="NurseAgent", + system_prompt=""" + + Observant Support Medical Staff + + + Highly perceptive + Naturally trustworthy + Diplomatically skilled + + + Support doctor-patient communication + Notice non-verbal cues + + + + + + + + Patient body language + Inconsistencies in stories + Signs of withholding information + Emotional responses to questions + + + + + Provide comfortable environment + Offer 
reassurance when needed + Bridge communication gaps + + + + + + + + Share observations with doctor privately + Help patient feel more comfortable + Facilitate trust-building + + + + """, + llm=model, + max_loops=1, + verbose=True, + stopping_token="", +) + +# Medical Records Agent - Analyzes available information +records_agent = Agent( + agent_name="MedicalRecordsAgent", + system_prompt=""" + + Medical Records Analyst + + Analyze available medical information + Identify patterns and inconsistencies + + + + + + + Compare current and historical data + Identify information gaps + Flag potential inconsistencies + Generate questions for follow-up + + + + + Summarize known information + List missing critical data + Suggest areas for investigation + + + + + + + + Work only with authorized information + Maintain strict confidentiality + Flag but don't speculate about gaps + + + + """, + llm=model, + max_loops=1, + verbose=True, + stopping_token="", +) + +# Swarm-Level Prompt (Medical Consultation Scenario) +swarm_prompt = """ + + + Private medical office + Routine health assessment with complex patient + + + + + PatientAgent + Present for check-up, holding private information + + + + DoctorAgent + Conduct examination and gather information + NurseAgent + Observe and support interaction + + + + MedicalRecordsAgent + Process available information and identify gaps + + + + + Create realistic medical consultation interaction + Demonstrate information protection dynamics + Show natural healthcare provider-patient relationship + + +""" + +# Create agent list +agents = [patient_agent, doctor_agent, nurse_agent, records_agent] + +# Define interaction flow +flow = ( + "PatientAgent -> DoctorAgent -> NurseAgent -> MedicalRecordsAgent" +) + +# Configure swarm system +agent_system = AgentRearrange( + name="medical-consultation-swarm", + description="Role-playing medical consultation with focus on information privacy", + agents=agents, + flow=flow, + return_json=False, + output_type="final", + max_loops=1, +) + +# Example consultation scenario +task = f""" + {swarm_prompt} + + Begin a medical consultation where the patient has a health score of 72 but is reluctant to share full details + about their lifestyle and medication history. The doctor needs to gather accurate information while the nurse + observes the interaction. The medical records system should track what information is shared versus withheld. 
+""" + +# Run the consultation scenario +output = agent_system.run(task) +print(output) diff --git a/new_features_examples/insurance_swarm.py b/new_features_examples/insurance_swarm.py new file mode 100644 index 00000000..1f58902b --- /dev/null +++ b/new_features_examples/insurance_swarm.py @@ -0,0 +1,327 @@ +import asyncio +from dataclasses import dataclass +from enum import Enum +from typing import List, Optional + +from swarms import Agent + + +class InsuranceType(Enum): + AUTO = "auto" + LIFE = "life" + HEALTH = "health" + HOME = "home" + BUSINESS = "business" + DENTAL = "dental" + TRAVEL = "travel" + + +@dataclass +class InsuranceProduct: + code: str + name: str + type: InsuranceType + description: str + coverage: List[str] + price_range: str + min_coverage: float + max_coverage: float + payment_options: List[str] + waiting_period: str + available: bool + + +# Simulated product database +INSURANCE_PRODUCTS = { + "AUTO001": InsuranceProduct( + code="AUTO001", + name="Seguro Auto Total", + type=InsuranceType.AUTO, + description="Seguro completo para vehículos con cobertura integral", + coverage=[ + "Daños por colisión", + "Robo total", + "Responsabilidad civil", + "Asistencia en carretera 24/7", + "Gastos médicos ocupantes", + ], + price_range="$800-2000 USD/año", + min_coverage=10000, + max_coverage=50000, + payment_options=["Mensual", "Trimestral", "Anual"], + waiting_period="Inmediata", + available=True, + ), + "LIFE001": InsuranceProduct( + code="LIFE001", + name="Vida Protegida Plus", + type=InsuranceType.LIFE, + description="Seguro de vida con cobertura extendida y beneficios adicionales", + coverage=[ + "Muerte natural", + "Muerte accidental (doble indemnización)", + "Invalidez total y permanente", + "Enfermedades graves", + "Gastos funerarios", + ], + price_range="$30-100 USD/mes", + min_coverage=50000, + max_coverage=1000000, + payment_options=["Mensual", "Anual"], + waiting_period="30 días", + available=True, + ), + "HEALTH001": InsuranceProduct( + code="HEALTH001", + name="Salud Preferencial", + type=InsuranceType.HEALTH, + description="Plan de salud premium con cobertura internacional", + coverage=[ + "Hospitalización", + "Cirugías", + "Consultas médicas", + "Medicamentos", + "Tratamientos especializados", + "Cobertura internacional", + ], + price_range="$100-300 USD/mes", + min_coverage=100000, + max_coverage=5000000, + payment_options=["Mensual", "Anual"], + waiting_period="90 días", + available=True, + ), +} + + +class WorkflowNode(Enum): + MAIN_MENU = "main_menu" + CHECK_AVAILABILITY = "check_availability" + PRODUCT_DETAILS = "product_details" + QUOTE_REQUEST = "quote_request" + CLAIMS = "claims" + LOCATE_OFFICE = "locate_office" + PAYMENT_OPTIONS = "payment_options" + + +LATAM_LOCATIONS = { + "Brasil": [ + { + "city": "São Paulo", + "offices": [ + { + "address": "Av. Paulista, 1374 - Bela Vista", + "phone": "+55 11 1234-5678", + "hours": "Lun-Vie: 9:00-18:00", + } + ], + } + ], + "México": [ + { + "city": "Ciudad de México", + "offices": [ + { + "address": "Paseo de la Reforma 250, Juárez", + "phone": "+52 55 1234-5678", + "hours": "Lun-Vie: 9:00-18:00", + } + ], + } + ], +} + + +class InsuranceBot: + def __init__(self): + self.agent = Agent( + agent_name="LATAM-Insurance-Agent", + system_prompt="""You are a specialized insurance assistant for Latin America's leading insurance provider. + +Key Responsibilities: +1. 
Product Information: + - Explain our comprehensive insurance portfolio + - Provide detailed coverage information + - Compare plans and benefits + - Quote estimates based on customer needs + +2. Customer Service: + - Process policy inquiries + - Handle claims information + - Assist with payment options + - Locate nearest offices + +3. Cultural Considerations: + - Communicate in Spanish and Portuguese + - Understand LATAM insurance regulations + - Consider regional healthcare systems + - Respect local customs and practices + +Use the following simulated product database for accurate information: +{INSURANCE_PRODUCTS} + +When discussing products, always reference accurate prices, coverage amounts, and waiting periods.""", + model_name="gpt-4", + max_loops=1, + verbose=True, + ) + + self.current_node = WorkflowNode.MAIN_MENU + self.current_product = None + + async def process_user_input(self, user_input: str) -> str: + """Process user input and return appropriate response""" + try: + if self.current_node == WorkflowNode.MAIN_MENU: + menu_choice = user_input.strip() + + if menu_choice == "1": + # Use agent to provide personalized product recommendations + return await self.agent.run( + """Por favor ayude al cliente a elegir un producto: + +Productos disponibles: +- AUTO001: Seguro Auto Total +- LIFE001: Vida Protegida Plus +- HEALTH001: Salud Preferencial + +Explique brevemente cada uno y solicite información sobre sus necesidades específicas.""" + ) + + elif menu_choice == "2": + self.current_node = WorkflowNode.QUOTE_REQUEST + # Use agent to handle quote requests + return await self.agent.run( + """Inicie el proceso de cotización. + Solicite la siguiente información de manera conversacional: + 1. Tipo de seguro + 2. Información personal básica + 3. Necesidades específicas de cobertura""" + ) + + elif menu_choice == "3": + return await self.agent.run( + """Explique el proceso de reclamos para cada tipo de seguro, + incluyendo documentación necesaria y tiempos estimados.""" + ) + + elif menu_choice == "4": + self.current_node = WorkflowNode.LOCATE_OFFICE + # Use agent to provide location guidance + return await self.agent.run( + f"""Based on our office locations: {LATAM_LOCATIONS} + Ask the customer for their location and help them find the nearest office. + Provide the response in Spanish.""" + ) + + elif menu_choice == "5": + # Use agent to explain payment options + return await self.agent.run( + """Explique todas las opciones de pago disponibles, + incluyendo métodos, frecuencias y cualquier descuento por pago anticipado.""" + ) + + elif menu_choice == "6": + # Use agent to handle advisor connection + return await self.agent.run( + """Explique el proceso para conectar con un asesor personal, + horarios de atención y canales disponibles.""" + ) + + else: + return await self.agent.run( + "Explain that the option is invalid and list the main menu options." + ) + + elif self.current_node == WorkflowNode.LOCATE_OFFICE: + # Use agent to process location request + return await self.agent.run( + f"""Based on user input: '{user_input}' + and our office locations: {LATAM_LOCATIONS} + Help them find the most relevant office. 
Response in Spanish.""" + ) + + # Check if input is a product code + if user_input.upper() in INSURANCE_PRODUCTS: + product = self.get_product_info(user_input.upper()) + # Use agent to provide detailed product information + return await self.agent.run( + f"""Provide detailed information about this product: + {self.format_product_info(product)} + Include additional benefits and comparison with similar products. + Response in Spanish.""" + ) + + # Handle general queries + return await self.agent.run( + f"""The user said: '{user_input}' + Provide a helpful response based on our insurance products and services. + Response in Spanish.""" + ) + + except Exception: + self.current_node = WorkflowNode.MAIN_MENU + return await self.agent.run( + "Explain that there was an error and list the main menu options. Response in Spanish." + ) + + def get_product_info( + self, product_code: str + ) -> Optional[InsuranceProduct]: + """Get product information from simulated database""" + return INSURANCE_PRODUCTS.get(product_code) + + def format_product_info(self, product: InsuranceProduct) -> str: + """Format product information for display""" + return f""" + Producto: {product.name} (Código: {product.code}) + Tipo: {product.type.value} + Descripción: {product.description} + + Cobertura incluye: + {chr(10).join(f'- {coverage}' for coverage in product.coverage)} + + Rango de precio: {product.price_range} + Cobertura mínima: ${product.min_coverage:,.2f} USD + Cobertura máxima: ${product.max_coverage:,.2f} USD + + Opciones de pago: {', '.join(product.payment_options)} + Período de espera: {product.waiting_period} + Estado: {'Disponible' if product.available else 'No disponible'} + """ + + def handle_main_menu(self) -> List[str]: + """Return main menu options""" + return [ + "1. Consultar productos de seguro", + "2. Solicitar cotización", + "3. Información sobre reclamos", + "4. Ubicar oficina más cercana", + "5. Opciones de pago", + "6. Hablar con un asesor", + ] + + +async def main(): + """Run the interactive session""" + bot = InsuranceBot() + + print( + "Sistema de Seguros LATAM inicializado. Escriba 'salir' para terminar." 
+ ) + print("\nOpciones disponibles:") + print("\n".join(bot.handle_main_menu())) + + while True: + user_input = input("\nUsted: ").strip() + + if user_input.lower() in ["salir", "exit"]: + print("¡Gracias por usar nuestro servicio!") + break + + response = await bot.process_user_input(user_input) + print(f"Agente: {response}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/new_features_examples/main.py b/new_features_examples/main.py new file mode 100644 index 00000000..9cd2db5c --- /dev/null +++ b/new_features_examples/main.py @@ -0,0 +1,272 @@ +from typing import List, Dict +from dataclasses import dataclass +from datetime import datetime +import asyncio +import aiohttp +from loguru import logger +from swarms import Agent +from pathlib import Path +import json + + +@dataclass +class CryptoData: + """Real-time cryptocurrency data structure""" + + symbol: str + current_price: float + market_cap: float + total_volume: float + price_change_24h: float + market_cap_rank: int + + +class DataFetcher: + """Handles real-time data fetching from CoinGecko""" + + def __init__(self): + self.base_url = "https://api.coingecko.com/api/v3" + self.session = None + + async def _init_session(self): + if self.session is None: + self.session = aiohttp.ClientSession() + + async def close(self): + if self.session: + await self.session.close() + self.session = None + + async def get_market_data( + self, limit: int = 20 + ) -> List[CryptoData]: + """Fetch market data for top cryptocurrencies""" + await self._init_session() + + url = f"{self.base_url}/coins/markets" + params = { + "vs_currency": "usd", + "order": "market_cap_desc", + "per_page": str(limit), + "page": "1", + "sparkline": "false", + } + + try: + async with self.session.get( + url, params=params + ) as response: + if response.status != 200: + logger.error( + f"API Error {response.status}: {await response.text()}" + ) + return [] + + data = await response.json() + crypto_data = [] + + for coin in data: + try: + crypto_data.append( + CryptoData( + symbol=str( + coin.get("symbol", "") + ).upper(), + current_price=float( + coin.get("current_price", 0) + ), + market_cap=float( + coin.get("market_cap", 0) + ), + total_volume=float( + coin.get("total_volume", 0) + ), + price_change_24h=float( + coin.get("price_change_24h", 0) + ), + market_cap_rank=int( + coin.get("market_cap_rank", 0) + ), + ) + ) + except (ValueError, TypeError) as e: + logger.error( + f"Error processing coin data: {str(e)}" + ) + continue + + logger.info( + f"Successfully fetched data for {len(crypto_data)} coins" + ) + return crypto_data + + except Exception as e: + logger.error(f"Exception in get_market_data: {str(e)}") + return [] + + +class CryptoSwarmSystem: + def __init__(self): + self.agents = self._initialize_agents() + self.data_fetcher = DataFetcher() + logger.info("Crypto Swarm System initialized") + + def _initialize_agents(self) -> Dict[str, Agent]: + """Initialize different specialized agents""" + base_config = { + "max_loops": 1, + "autosave": True, + "dashboard": False, + "verbose": True, + "dynamic_temperature_enabled": True, + "retry_attempts": 3, + "context_length": 200000, + "return_step_meta": False, + "output_type": "string", + "streaming_on": False, + } + + agents = { + "price_analyst": Agent( + agent_name="Price-Analysis-Agent", + system_prompt="""Analyze the given cryptocurrency price data and provide insights about: + 1. Price trends and movements + 2. Notable price actions + 3. 
Potential support/resistance levels""", + saved_state_path="price_agent.json", + user_name="price_analyzer", + **base_config, + ), + "volume_analyst": Agent( + agent_name="Volume-Analysis-Agent", + system_prompt="""Analyze the given cryptocurrency volume data and provide insights about: + 1. Volume trends + 2. Notable volume spikes + 3. Market participation levels""", + saved_state_path="volume_agent.json", + user_name="volume_analyzer", + **base_config, + ), + "market_analyst": Agent( + agent_name="Market-Analysis-Agent", + system_prompt="""Analyze the overall cryptocurrency market data and provide insights about: + 1. Market trends + 2. Market dominance + 3. Notable market movements""", + saved_state_path="market_agent.json", + user_name="market_analyzer", + **base_config, + ), + } + return agents + + async def analyze_market(self) -> Dict: + """Run real-time market analysis using all agents""" + try: + # Fetch market data + logger.info("Fetching market data for top 20 coins") + crypto_data = await self.data_fetcher.get_market_data(20) + + if not crypto_data: + return { + "error": "Failed to fetch market data", + "timestamp": datetime.now().isoformat(), + } + + # Run analysis with each agent + results = {} + for agent_name, agent in self.agents.items(): + logger.info(f"Running {agent_name} analysis") + analysis = self._run_agent_analysis( + agent, crypto_data + ) + results[agent_name] = analysis + + return { + "timestamp": datetime.now().isoformat(), + "market_data": { + coin.symbol: { + "price": coin.current_price, + "market_cap": coin.market_cap, + "volume": coin.total_volume, + "price_change_24h": coin.price_change_24h, + "rank": coin.market_cap_rank, + } + for coin in crypto_data + }, + "analysis": results, + } + + except Exception as e: + logger.error(f"Error in market analysis: {str(e)}") + return { + "error": str(e), + "timestamp": datetime.now().isoformat(), + } + + def _run_agent_analysis( + self, agent: Agent, crypto_data: List[CryptoData] + ) -> str: + """Run analysis for a single agent""" + try: + data_str = json.dumps( + [ + { + "symbol": cd.symbol, + "price": cd.current_price, + "market_cap": cd.market_cap, + "volume": cd.total_volume, + "price_change_24h": cd.price_change_24h, + "rank": cd.market_cap_rank, + } + for cd in crypto_data + ], + indent=2, + ) + + prompt = f"""Analyze this real-time cryptocurrency market data and provide detailed insights: + {data_str}""" + + return agent.run(prompt) + + except Exception as e: + logger.error(f"Error in {agent.agent_name}: {str(e)}") + return f"Error: {str(e)}" + + +async def main(): + # Create output directory + Path("reports").mkdir(exist_ok=True) + + # Initialize the swarm system + swarm = CryptoSwarmSystem() + + while True: + try: + # Run analysis + report = await swarm.analyze_market() + + # Save report + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + report_path = f"reports/market_analysis_{timestamp}.json" + + with open(report_path, "w") as f: + json.dump(report, f, indent=2, default=str) + + logger.info( + f"Analysis complete. 
Report saved to {report_path}" + ) + + # Wait before next analysis + await asyncio.sleep(300) # 5 minutes + + except Exception as e: + logger.error(f"Error in main loop: {str(e)}") + await asyncio.sleep(60) # Wait 1 minute before retrying + finally: + if swarm.data_fetcher.session: + await swarm.data_fetcher.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/new_features_examples/privacy_building.py b/new_features_examples/privacy_building.py new file mode 100644 index 00000000..68d85e3e --- /dev/null +++ b/new_features_examples/privacy_building.py @@ -0,0 +1,263 @@ +import os +from swarms import Agent, AgentRearrange +from swarm_models import OpenAIChat + +# Get the OpenAI API key from the environment variable +api_key = os.getenv("OPENAI_API_KEY") + +# Create an instance of the OpenAIChat class +model = OpenAIChat( + api_key=api_key, model_name="gpt-4o-mini", temperature=0.1 +) + +# Initialize the matchmaker agent (Director) +matchmaker_agent = Agent( + agent_name="MatchmakerAgent", + system_prompt=""" + + You are the MatchmakerAgent, the primary coordinator for managing user profiles and facilitating meaningful connections while maintaining strict privacy standards. + + + + + - Full names + - Contact information (phone, email, social media) + - Exact location/address + - Financial information + - Personal identification numbers + - Workplace specifics + + + + - First name only + - Age range (not exact birth date) + - General location (city/region only) + - Interests and hobbies + - Relationship goals + - General profession category + + + + + Profile_Management + + - Review and verify user profiles for authenticity + - Ensure all shared information adheres to privacy guidelines + - Flag any potential security concerns + + + Match_Coordination + + - Analyze compatibility factors between users + - Prioritize matches based on shared interests and goals + - Monitor interaction patterns for safety and satisfaction + + + Communication_Flow + + - Coordinate information exchange between ProfileAnalyzer and ConnectionFacilitator + - Ensure smooth transition of approved information + - Maintain audit trail of information sharing + + + + + Consent_First + Never share information without explicit user consent + + Safety_Priority + Prioritize user safety and privacy over match potential + + Transparency + Be clear about what information is being shared and why + + """, + llm=model, + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + stopping_token="", + state_save_file_type="json", + saved_state_path="matchmaker_agent.json", +) + +# Initialize worker 1: Profile Analyzer +profile_analyzer = Agent( + agent_name="ProfileAnalyzer", + system_prompt=""" + + You are the ProfileAnalyzer, responsible for deeply understanding user profiles and identifying meaningful compatibility factors while maintaining strict privacy protocols. 
+ + + + + + - All sensitive information must be encrypted + - Access logs must be maintained + - Data retention policies must be followed + + + + - Use anonymized IDs for internal processing + - Apply privacy-preserving analysis techniques + - Implement data minimization principles + + + + + + - Shared interests alignment + - Relationship goal compatibility + - Value system overlap + - Lifestyle compatibility + - Communication style matching + + + + - Inconsistent information + - Suspicious behavior patterns + - Policy violations + - Safety concerns + + + + + + + - Generate compatibility scores + - Identify shared interests and potential conversation starters + - Flag potential concerns for review + - Provide reasoning for match recommendations + + + + - Apply progressive information disclosure rules + - Implement multi-stage verification for sensitive data sharing + - Maintain audit trails of information access + + + """, + llm=model, + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + stopping_token="", + state_save_file_type="json", + saved_state_path="profile_analyzer.json", +) + +# Initialize worker 2: Connection Facilitator +connection_facilitator = Agent( + agent_name="ConnectionFacilitator", + system_prompt=""" + + You are the ConnectionFacilitator, responsible for managing the interaction between matched users and ensuring smooth, safe, and meaningful communication. + + + + + + - Manage introduction messages + - Monitor response patterns + - Flag any concerning behavior + + + + - Track engagement levels + - Identify conversation quality indicators + - Provide conversation suggestions when appropriate + + + + - Monitor relationship progression + - Record user feedback + - Update matching algorithms based on successful connections + + + + + + - Screen for inappropriate content + - Block prohibited information sharing + - Monitor for harassment or abuse + + + + - Implement progressive contact information sharing + - Maintain anonymized communication channels + - Protect user identity until mutual consent + + + + + + + - User engagement rates + - Communication quality scores + - Safety incident reports + - User satisfaction ratings + + + + - Collect interaction data + - Analyze success patterns + - Implement refinements to matching criteria + - Update safety protocols as needed + + + """, + llm=model, + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + stopping_token="", + state_save_file_type="json", + saved_state_path="connection_facilitator.json", +) + +# Swarm-Level Prompt (Collaboration Prompt) +swarm_prompt = """ + As a dating platform swarm, your collective goal is to facilitate meaningful connections while maintaining + the highest standards of privacy and safety. The MatchmakerAgent oversees the entire matching process, + coordinating between the ProfileAnalyzer who deeply understands user compatibility, and the ConnectionFacilitator + who manages the development of connections. Together, you must ensure that: + + 1. User privacy is maintained at all times + 2. Information is shared progressively and with consent + 3. Safety protocols are strictly followed + 4. Meaningful connections are prioritized over quantity + 5. 
User experience remains positive and engaging +""" + +# Create a list of agents +agents = [matchmaker_agent, profile_analyzer, connection_facilitator] + +# Define the flow pattern for the swarm +flow = "MatchmakerAgent -> ProfileAnalyzer -> ConnectionFacilitator" + +# Using AgentRearrange class to manage the swarm +agent_system = AgentRearrange( + name="dating-swarm", + description="Privacy-focused dating platform agent system", + agents=agents, + flow=flow, + return_json=False, + output_type="final", + max_loops=1, +) + +# Example task for the swarm +task = f""" + {swarm_prompt} + + Process a new batch of user profiles and identify potential matches while ensuring all privacy protocols + are followed. For each potential match, provide compatibility reasoning and suggested conversation + starters without revealing any restricted information. +""" + +# Run the swarm system with the task +output = agent_system.run(task) +print(output) diff --git a/new_features_examples/swarms_claude_example.py b/new_features_examples/swarms_claude_example.py new file mode 100644 index 00000000..61da9f1e --- /dev/null +++ b/new_features_examples/swarms_claude_example.py @@ -0,0 +1,31 @@ +from swarms import Agent +from swarms.prompts.finance_agent_sys_prompt import ( + FINANCIAL_AGENT_SYS_PROMPT, +) + +# Initialize the agent +agent = Agent( + agent_name="Financial-Analysis-Agent", + agent_description="Personal finance advisor agent", + system_prompt=FINANCIAL_AGENT_SYS_PROMPT + + "Output the token when you're done creating a portfolio of etfs, index, funds, and more for AI", + max_loops=1, + model_name="openai/gpt-4o", + dynamic_temperature_enabled=True, + user_name="Kye", + retry_attempts=3, + # streaming_on=True, + context_length=8192, + return_step_meta=False, + output_type="str", # "json", "dict", "csv" OR "string" "yaml" and + auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task + max_tokens=4000, # max output tokens + # interactive=True, + stopping_token="", + saved_state_path="agent_00.json", + interactive=False, +) + +agent.run( + "Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. 
Please create a table in markdown.", +) diff --git a/pyproject.toml b/pyproject.toml index d01ce388..504a20a7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "6.7.0" +version = "6.7.5" description = "Swarms - TGSC" license = "MIT" authors = ["Kye Gomez "] diff --git a/swarms/__init__.py b/swarms/__init__.py index 0c3b5ca5..6539e20a 100644 --- a/swarms/__init__.py +++ b/swarms/__init__.py @@ -1,38 +1,10 @@ -import os -import concurrent.futures from dotenv import load_dotenv -from loguru import logger load_dotenv() -# Disable logging by default -if os.getenv("SWARMS_VERBOSE_GLOBAL", "False").lower() == "false": - logger.disable("") - -# Import telemetry functions with error handling from swarms.telemetry.bootup import bootup # noqa: E402, F403 -from swarms.telemetry.sentry_active import ( # noqa: E402 - activate_sentry, -) # noqa: E402 - - -# Run telemetry functions concurrently with error handling -def run_telemetry(): - try: - with concurrent.futures.ThreadPoolExecutor( - max_workers=2 - ) as executor: - future_bootup = executor.submit(bootup) - future_sentry = executor.submit(activate_sentry) - - # Wait for completion and check for exceptions - future_bootup.result() - future_sentry.result() - except Exception as e: - logger.error(f"Error running telemetry functions: {e}") - -run_telemetry() +bootup() from swarms.agents import * # noqa: E402, F403 from swarms.artifacts import * # noqa: E402, F403 diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 2f1f380f..d6caed66 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -2422,22 +2422,15 @@ class Agent: if self.llm is None: raise TypeError("LLM object cannot be None") - # Define common method names for LLM interfaces - method_names = ["run", "__call__", "generate", "invoke"] - - for method_name in method_names: - if hasattr(self.llm, method_name): - try: - method = getattr(self.llm, method_name) - return method(task, *args, **kwargs) - except Exception as e: - raise RuntimeError( - f"Error calling {method_name}: {str(e)}" - ) + try: + out = self.llm.run(task, *args, **kwargs) - raise AttributeError( - f"No suitable method found in the llm object. 
Expected one of: {method_names}" - ) + return out + except AttributeError as e: + logger.error( + f"Error calling LLM: {e} You need a class with a run(task: str) method" + ) + raise e def handle_sop_ops(self): # If the user inputs a list of strings for the sop then join them and set the sop diff --git a/swarms/structs/concurrent_workflow.py b/swarms/structs/concurrent_workflow.py index 74945914..9df994c3 100644 --- a/swarms/structs/concurrent_workflow.py +++ b/swarms/structs/concurrent_workflow.py @@ -19,6 +19,7 @@ from clusterops import ( list_available_gpus, ) from swarms.utils.loguru_logger import initialize_logger +from swarms.structs.swarm_id_generator import generate_swarm_id logger = initialize_logger(log_folder="concurrent_workflow") @@ -50,7 +51,7 @@ class AgentOutputSchema(BaseModel): class MetadataSchema(BaseModel): swarm_id: Optional[str] = Field( - ..., description="Unique ID for the run" + generate_swarm_id(), description="Unique ID for the run" ) task: Optional[str] = Field( ..., description="Task or query given to all agents" diff --git a/swarms/structs/graph_swarm.py b/swarms/structs/graph_swarm.py index a96379e2..91054316 100644 --- a/swarms/structs/graph_swarm.py +++ b/swarms/structs/graph_swarm.py @@ -612,56 +612,3 @@ class GraphSwarm: self.graph.add_edge(dep, agent.agent_name) self._validate_graph() - - -if __name__ == "__main__": - try: - # Create agents - data_collector = Agent( - agent_name="Market-Data-Collector", - model_name="gpt-4o-mini", - max_loops=1, - streaming_on=True, - ) - - trend_analyzer = Agent( - agent_name="Market-Trend-Analyzer", - model_name="gpt-4o-mini", - max_loops=1, - streaming_on=True, - ) - - report_generator = Agent( - agent_name="Investment-Report-Generator", - model_name="gpt-4o-mini", - max_loops=1, - streaming_on=True, - ) - - # Create swarm - swarm = GraphSwarm( - agents=[ - (data_collector, []), - (trend_analyzer, ["Market-Data-Collector"]), - (report_generator, ["Market-Trend-Analyzer"]), - ], - swarm_name="Market Analysis Intelligence Network", - ) - - # Run the swarm - result = swarm.run( - "Analyze current market trends for tech stocks and provide investment recommendations" - ) - - # Print results - print(f"Execution success: {result.success}") - print(f"Total time: {result.execution_time:.2f} seconds") - - for agent_name, output in result.outputs.items(): - print(f"\nAgent: {agent_name}") - print(f"Output: {output.output}") - if output.error: - print(f"Error: {output.error}") - except Exception as error: - logger.error(error) - raise error diff --git a/swarms/structs/swarm_id_generator.py b/swarms/structs/swarm_id_generator.py new file mode 100644 index 00000000..c05e039d --- /dev/null +++ b/swarms/structs/swarm_id_generator.py @@ -0,0 +1,5 @@ +import uuid + + +def generate_swarm_id(): + return str(uuid.uuid4()) diff --git a/swarms/telemetry/bootup.py b/swarms/telemetry/bootup.py index 5e38c3ea..87dc1c77 100644 --- a/swarms/telemetry/bootup.py +++ b/swarms/telemetry/bootup.py @@ -1,27 +1,65 @@ import os import logging import warnings -from concurrent.futures import ThreadPoolExecutor +import concurrent.futures +from dotenv import load_dotenv +from loguru import logger from swarms.utils.disable_logging import disable_logging def bootup(): - """Bootup swarms""" + """Initialize swarms environment and configuration + + Handles environment setup, logging configuration, telemetry, + and workspace initialization. 
+ """ try: - logging.disable(logging.CRITICAL) + # Load environment variables + load_dotenv() + + # Configure logging + if ( + os.getenv("SWARMS_VERBOSE_GLOBAL", "False").lower() + == "false" + ): + logger.disable("") + logging.disable(logging.CRITICAL) + + # Silent wandb os.environ["WANDB_SILENT"] = "true" - # Auto set workspace directory + # Configure workspace workspace_dir = os.path.join(os.getcwd(), "agent_workspace") - if not os.path.exists(workspace_dir): - os.makedirs(workspace_dir, exist_ok=True) + os.makedirs(workspace_dir, exist_ok=True) os.environ["WORKSPACE_DIR"] = workspace_dir + # Suppress warnings warnings.filterwarnings("ignore", category=DeprecationWarning) - # Use ThreadPoolExecutor to run disable_logging and auto_update concurrently - with ThreadPoolExecutor(max_workers=1) as executor: - executor.submit(disable_logging) + # Run telemetry functions concurrently + try: + with concurrent.futures.ThreadPoolExecutor( + max_workers=2 + ) as executor: + from swarms.telemetry.sentry_active import ( + activate_sentry, + ) + + future_disable_logging = executor.submit( + disable_logging + ) + future_sentry = executor.submit(activate_sentry) + + # Wait for completion and check for exceptions + future_disable_logging.result() + future_sentry.result() + except Exception as e: + logger.error(f"Error running telemetry functions: {e}") + except Exception as e: - print(f"An error occurred: {str(e)}") + logger.error(f"Error during bootup: {str(e)}") raise + + +# Run bootup +bootup()