[IMPROVEMENT][Add types to cli] [DOCS] [FastAPI Deployment Guide] [Overview docs of deployment solutions]

pull/1043/head
Kye Gomez 2 weeks ago
parent 8189c50ec1
commit 62952ce336

@ -0,0 +1,477 @@
# FastAPI Agent API
This guide shows you how to deploy your Swarms agents as REST APIs using FastAPI and Uvicorn. This is the fastest way to expose your agents via HTTP endpoints.
## Overview
FastAPI is a modern, fast web framework for building APIs with Python. Combined with Uvicorn (ASGI server), it provides excellent performance and automatic API documentation.
**Benefits:**
- **Fast**: Built on Starlette and Pydantic
- **Auto-docs**: Automatic OpenAPI/Swagger documentation
- **Type-safe**: Full type hints and validation
- **Easy**: Minimal boilerplate code
- **Monitoring**: Built-in logging and metrics
## Quick Start
### 1. Install Dependencies
```bash
pip install fastapi uvicorn swarms
```
### 2. Create Your Agent API
Create a file called `agent_api.py`:
```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from swarms import Agent
import uvicorn
from typing import Optional, Dict, Any
# Initialize FastAPI app
app = FastAPI(
title="Swarms Agent API",
description="REST API for Swarms agents",
version="1.0.0"
)
# Pydantic models for request/response
class AgentRequest(BaseModel):
"""Request model for agent tasks"""
task: str
agent_name: Optional[str] = "default"
max_loops: Optional[int] = 1
temperature: Optional[float] = None
class AgentResponse(BaseModel):
"""Response model for agent tasks"""
success: bool
result: str
agent_name: str
task: str
execution_time: Optional[float] = None
# Initialize your agent (you can customize this)
def create_agent(agent_name: str = "default") -> Agent:
"""Create and return a configured agent"""
return Agent(
agent_name=agent_name,
agent_description="Versatile AI agent for various tasks",
system_prompt="""You are a helpful AI assistant that can handle a wide variety of tasks.
You provide clear, accurate, and helpful responses while maintaining a professional tone.
Always strive to be thorough and accurate in your responses.""",
model_name="claude-sonnet-4-20250514",
dynamic_temperature_enabled=True,
max_loops=1,
dynamic_context_window=True,
)
# API endpoints
@app.get("/")
async def root():
"""Health check endpoint"""
return {"message": "Swarms Agent API is running!", "status": "healthy"}
@app.get("/health")
async def health_check():
"""Detailed health check"""
return {
"status": "healthy",
"service": "Swarms Agent API",
"version": "1.0.0"
}
@app.post("/agent/run", response_model=AgentResponse)
async def run_agent(request: AgentRequest):
"""Run an agent with the specified task"""
try:
import time
start_time = time.time()
# Create agent instance
agent = create_agent(request.agent_name)
# Run the agent
result = agent.run(
task=request.task,
max_loops=request.max_loops
)
execution_time = time.time() - start_time
return AgentResponse(
success=True,
result=str(result),
agent_name=request.agent_name,
task=request.task,
execution_time=execution_time
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Agent execution failed: {str(e)}")
@app.post("/agent/chat")
async def chat_with_agent(request: AgentRequest):
"""Chat with an agent (conversational mode)"""
try:
agent = create_agent(request.agent_name)
# For chat, you might want to maintain conversation history
# This is a simple implementation
result = agent.run(
task=request.task,
max_loops=request.max_loops
)
return {
"success": True,
"response": str(result),
"agent_name": request.agent_name
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Chat failed: {str(e)}")
@app.get("/agents/available")
async def list_available_agents():
"""List available agent configurations"""
return {
"agents": [
{
"name": "default",
"description": "Versatile AI agent for various tasks",
"model": "claude-sonnet-4-20250514"
},
{
"name": "quantitative-trading",
"description": "Advanced quantitative trading and algorithmic analysis agent",
"model": "claude-sonnet-4-20250514"
}
]
}
# Custom agent endpoint example
@app.post("/agent/quantitative-trading")
async def run_quantitative_trading_agent(request: AgentRequest):
"""Run the quantitative trading agent specifically"""
try:
# Create specialized quantitative trading agent
agent = Agent(
agent_name="Quantitative-Trading-Agent",
agent_description="Advanced quantitative trading and algorithmic analysis agent",
system_prompt="""You are an expert quantitative trading agent with deep expertise in:
- Algorithmic trading strategies and implementation
- Statistical arbitrage and market making
- Risk management and portfolio optimization
- High-frequency trading systems
- Market microstructure analysis
- Quantitative research methodologies
- Financial mathematics and stochastic processes
- Machine learning applications in trading""",
model_name="claude-sonnet-4-20250514",
dynamic_temperature_enabled=True,
max_loops=request.max_loops,
dynamic_context_window=True,
)
result = agent.run(task=request.task)
return {
"success": True,
"result": str(result),
"agent_name": "Quantitative-Trading-Agent",
"task": request.task
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Quantitative trading agent failed: {str(e)}")
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
```
### 3. Run Your API
```bash
python agent_api.py
```
Or with uvicorn directly:
```bash
uvicorn agent_api:app --host 0.0.0.0 --port 8000 --reload
```
### 4. Test Your API
Your API will be available at:
- **API**: http://localhost:8000
- **Documentation**: http://localhost:8000/docs
- **Alternative docs**: http://localhost:8000/redoc
## Usage Examples
### Using curl
```bash
# Basic agent task
curl -X POST "http://localhost:8000/agent/run" \
-H "Content-Type: application/json" \
-d '{"task": "What are the best top 3 ETFs for gold coverage?"}'
# Quantitative trading agent
curl -X POST "http://localhost:8000/agent/quantitative-trading" \
-H "Content-Type: application/json" \
-d '{"task": "Analyze the current market conditions for gold ETFs"}'
```
### Using Python requests
```python
import requests
# Run basic agent
response = requests.post(
"http://localhost:8000/agent/run",
json={"task": "Explain quantum computing in simple terms"}
)
print(response.json())
# Run quantitative trading agent
response = requests.post(
"http://localhost:8000/agent/quantitative-trading",
json={"task": "What are the key factors affecting gold prices today?"}
)
print(response.json())
```
## Advanced Configuration
### Environment Variables
Create a `.env` file for configuration:
```bash
# .env
AGENT_MODEL_NAME=claude-sonnet-4-20250514
AGENT_MAX_LOOPS=3
API_HOST=0.0.0.0
API_PORT=8000
LOG_LEVEL=info
```
### Enhanced Agent Factory
```python
import os
from typing import Dict, Type
from swarms import Agent
class AgentFactory:
"""Factory for creating different types of agents"""
AGENT_CONFIGS = {
"default": {
"agent_name": "Default-Agent",
"agent_description": "Versatile AI agent for various tasks",
"system_prompt": "You are a helpful AI assistant...",
"model_name": "claude-sonnet-4-20250514"
},
"quantitative-trading": {
"agent_name": "Quantitative-Trading-Agent",
"agent_description": "Advanced quantitative trading agent",
"system_prompt": "You are an expert quantitative trading agent...",
"model_name": "claude-sonnet-4-20250514"
},
"research": {
"agent_name": "Research-Agent",
"agent_description": "Academic research and analysis agent",
"system_prompt": "You are an expert research agent...",
"model_name": "claude-sonnet-4-20250514"
}
}
@classmethod
def create_agent(cls, agent_type: str = "default", **kwargs) -> Agent:
"""Create an agent of the specified type"""
if agent_type not in cls.AGENT_CONFIGS:
raise ValueError(f"Unknown agent type: {agent_type}")
config = cls.AGENT_CONFIGS[agent_type].copy()
config.update(kwargs)
return Agent(**config)
```
### Authentication & Rate Limiting
```python
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
import time
# Rate limiting
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# Security
security = HTTPBearer()
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""Verify API token"""
# Implement your token verification logic here
if credentials.credentials != "your-secret-token":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token"
)
return credentials.credentials
@app.post("/agent/run", response_model=AgentResponse)
@limiter.limit("10/minute")
async def run_agent(
request: AgentRequest,
token: str = Depends(verify_token)
):
"""Run an agent with authentication and rate limiting"""
# ... existing code ...
```
## Production Deployment
### Using Gunicorn
```bash
pip install gunicorn
gunicorn agent_api:app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000
```
### Using Docker
```dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "agent_api:app", "--host", "0.0.0.0", "--port", "8000"]
```
### Using Docker Compose
```yaml
version: '3.8'
services:
agent-api:
build: .
ports:
- "8000:8000"
environment:
- AGENT_MODEL_NAME=claude-sonnet-4-20250514
volumes:
- ./logs:/app/logs
```
## Monitoring & Logging
### Structured Logging
```python
import logging
import json
from datetime import datetime
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@app.middleware("http")
async def log_requests(request, call_next):
"""Log all requests and responses"""
start_time = time.time()
# Log request
logger.info(f"Request: {request.method} {request.url}")
response = await call_next(request)
# Log response
process_time = time.time() - start_time
logger.info(f"Response: {response.status_code} - {process_time:.2f}s")
return response
```
### Health Checks
```python
@app.get("/health/detailed")
async def detailed_health_check():
"""Detailed health check with agent status"""
try:
# Test agent creation
agent = create_agent()
return {
"status": "healthy",
"timestamp": datetime.utcnow().isoformat(),
"agent_status": "available",
"model_status": "connected"
}
except Exception as e:
return {
"status": "unhealthy",
"timestamp": datetime.utcnow().isoformat(),
"error": str(e)
}
```
## Best Practices
1. **Error Handling**: Always wrap agent execution in try-catch blocks
2. **Validation**: Use Pydantic models for request validation
3. **Rate Limiting**: Implement rate limiting for production APIs
4. **Authentication**: Add proper authentication for sensitive endpoints
5. **Logging**: Log all requests and responses for debugging
6. **Monitoring**: Add health checks and metrics
7. **Testing**: Write tests for your API endpoints
8. **Documentation**: Keep your API documentation up to date
## Troubleshooting
### Common Issues
1. **Port already in use**: Change the port in the uvicorn command
2. **Agent initialization fails**: Check your API keys and model configuration
3. **Memory issues**: Reduce `max_loops` or implement streaming responses
4. **Timeout errors**: Increase timeout settings for long-running tasks
### Performance Tips
1. **Connection pooling**: Reuse agent instances when possible
2. **Async operations**: Use async/await for I/O operations
3. **Caching**: Cache frequently requested responses
4. **Load balancing**: Use multiple worker processes for high traffic
## Next Steps
- [Cron Job Deployment](cron_job_deployment.md) - For scheduled tasks
- [Docker Deployment](docker_deployment.md) - For containerized deployment
- [Kubernetes Deployment](kubernetes_deployment.md) - For orchestrated deployment
- [Cloud Deployment](cloud_deployment.md) - For cloud platforms
Your FastAPI agent API is now ready to handle requests and scale with your needs!

@ -0,0 +1,135 @@
# Deployment Solutions Overview
This section covers various deployment strategies for Swarms agents and multi-agent systems, from simple local deployments to enterprise-grade cloud solutions.
## Deployment Types Comparison & Documentation
| Deployment Type | Use Case | Complexity | Scalability | Cost | Best For | Documentation Link | Status |
|------------------------|---------------------------|------------|-------------|--------------|----------------------------------------|------------------------------------------------------------------------------------|---------------|
| **FastAPI + Uvicorn** | REST API endpoints | Low | Medium | Low | Quick prototypes, internal tools | [FastAPI Agent API Guide](fastapi_agent_api.md) | Available |
| **Cron Jobs** | Scheduled tasks | Low | Low | Very Low | Batch processing, periodic tasks | [Cron Job Examples](../../examples/deployment_solutions/cron_job_examples/) | Available |
| **Docker Containers** | Containerized deployment | Medium | High | Low | Production, portability | [Docker Deployment Guide](docker_deployment.md) | Coming Soon |
| **Kubernetes** | Orchestrated containers | High | Very High | Medium | Enterprise, auto-scaling | [Kubernetes Deployment Guide](kubernetes_deployment.md) | Coming Soon |
| **Cloud Functions** | Serverless execution | Low | High | Pay-per-use | Event-driven, cost-effective | [Cloud Deployment Guide](cloud_deployment.md) | Coming Soon |
| **Cloud Run** | Containerized serverless | Medium | High | Pay-per-use | Production, auto-scaling | [Cloud Deployment Guide](cloud_deployment.md) | Coming Soon |
| **Traditional VMs** | Full control deployment | Medium | Medium | Medium | Custom requirements, full control | [Cloud Deployment Guide](cloud_deployment.md) | Coming Soon |
## Quick Start Guide
### 1. FastAPI + Uvicorn (Recommended for APIs)
- **Best for**: Creating REST APIs for your agents
- **Setup time**: 5-10 minutes
- **Documentation**: [FastAPI Agent API](fastapi_agent_api.md)
- **Example Code**: [FastAPI Example](../../examples/deployment_solutions/fastapi_agent_api_example.py)
### 2. Cron Jobs (Recommended for scheduled tasks)
- **Best for**: Running agents on a schedule
- **Setup time**: 2-5 minutes
- **Examples**: [Cron Job Examples](../../examples/deployment_solutions/cron_job_examples/)
### 3. Docker (Recommended for production)
- **Best for**: Consistent deployment across environments
- **Setup time**: 10-15 minutes
- **Documentation**: [Docker Deployment](docker_deployment.md)
## Deployment Considerations
### Performance
- **FastAPI**: Excellent for high-throughput APIs
- **Cron Jobs**: Good for batch processing
- **Docker**: Consistent performance across environments
- **Kubernetes**: Best for complex, scalable systems
### Security
- **FastAPI**: Built-in security features, easy to add authentication
- **Cron Jobs**: Runs with system permissions
- **Docker**: Isolated environment, security best practices
- **Kubernetes**: Advanced security policies and RBAC
### Monitoring & Observability
- **FastAPI**: Built-in logging, easy to integrate with monitoring tools
- **Cron Jobs**: Basic logging, requires custom monitoring setup
- **Docker**: Container-level monitoring, easy to integrate
- **Kubernetes**: Comprehensive monitoring and alerting
### Cost Optimization
- **FastAPI**: Pay for compute resources
- **Cron Jobs**: Minimal cost, runs on existing infrastructure
- **Docker**: Efficient resource utilization
- **Kubernetes**: Advanced resource management and auto-scaling
## Choosing the Right Deployment
### For Development & Testing
- **FastAPI + Uvicorn**: Quick setup, easy debugging
- **Cron Jobs**: Simple scheduled tasks
### For Production APIs
- **FastAPI + Docker**: Reliable, scalable
- **Cloud Run**: Auto-scaling, managed infrastructure
### For Enterprise Systems
- **Kubernetes**: Full control, advanced features
- **Hybrid approach**: Mix of deployment types based on use case
### For Cost-Sensitive Projects
- **Cron Jobs**: Minimal infrastructure cost
- **Cloud Functions**: Pay-per-use model
- **FastAPI**: Efficient resource utilization
## Next Steps
1. **Start with FastAPI** if you need an API endpoint
2. **Use Cron Jobs** for scheduled tasks
3. **Move to Docker** when you need consistency
4. **Consider Kubernetes** for complex, scalable systems
Each deployment solution includes detailed examples and step-by-step guides to help you get started quickly.

@ -341,7 +341,8 @@ nav:
# - Faiss: "swarms_memory/faiss.md"
- Deployment Solutions:
# - Overview: "swarms_cloud/overview.md"
- Overview: "deployment_solutions/overview.md"
- FastAPI + Uvicorn: "deployment_solutions/fastapi_agent_api.md"
- CronJob: "swarms/structs/cron_job.md"
- Providers:
- Deploy on Google Cloud Run: "swarms_cloud/cloud_run.md"

@ -0,0 +1,81 @@
# Deployment Solutions Examples
This directory contains practical examples of different deployment strategies for Swarms agents and multi-agent systems.
## Examples Overview
### FastAPI + Uvicorn
- **File**: `fastapi_agent_api_example.py`
- **Description**: Complete FastAPI application that exposes Swarms agents as REST APIs
- **Use Case**: Creating HTTP endpoints for your agents
- **Requirements**: `requirements.txt`
### Cron Jobs
- **Directory**: `cron_job_examples/`
- **Description**: Various examples of running agents on schedules
- **Use Case**: Automated, periodic task execution
- **Examples**: Crypto tracking, stock monitoring, data processing
## Quick Start
### FastAPI Example
1. **Install dependencies**:
```bash
pip install -r requirements.txt
```
2. **Run the API server**:
```bash
python fastapi_agent_api_example.py
```
3. **Access the API**:
- API: http://localhost:8000
- Documentation: http://localhost:8000/docs
- Health check: http://localhost:8000/health
4. **Test with curl**:
```bash
curl -X POST "http://localhost:8000/agent/run" \
-H "Content-Type: application/json" \
-d '{"task": "What are the best top 3 ETFs for gold coverage?"}'
```
### Cron Job Examples
Navigate to the `cron_job_examples/` directory for various scheduling examples:
- `cron_job_example.py` - Basic cron job setup
- `crypto_concurrent_cron_example.py` - Concurrent crypto monitoring
- `solana_price_tracker.py` - Solana price tracking
- `figma_stock_example.py` - Stock monitoring with Figma integration
## Testing
Run the test script to verify your setup:
```bash
python test_fastapi_example.py
```
## Documentation
For detailed guides and documentation, see:
- [Deployment Solutions Overview](../../docs/deployment_solutions/overview.md)
- [FastAPI Agent API Guide](../../docs/deployment_solutions/fastapi_agent_api.md)
## Requirements
- Python 3.8+
- Swarms framework
- FastAPI and Uvicorn (for API examples)
- Required API keys for your chosen models
## Support
If you encounter issues:
1. Check the requirements are installed correctly
2. Verify your API keys are set
3. Check the documentation for detailed setup instructions
4. Review the test script output for debugging information

@ -0,0 +1,348 @@
"""
FastAPI Agent API Example
This example shows how to deploy your Swarms agents as a REST API using FastAPI and Uvicorn.
Based on the quantitative trading agent from example.py.
Run this file to start your agent API server.
"""
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from swarms import Agent
import uvicorn
import time
from typing import Optional, List
from loguru import logger
# Initialize FastAPI app
app = FastAPI(
title="Swarms Agent API",
description="REST API for Swarms agents - Quantitative Trading Example",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc",
)
# Pydantic models for request/response
class AgentRequest(BaseModel):
"""Request model for agent tasks"""
task: str
agent_name: Optional[str] = "quantitative-trading"
max_loops: Optional[int] = 1
temperature: Optional[float] = None
class AgentResponse(BaseModel):
"""Response model for agent tasks"""
success: bool
result: str
agent_name: str
task: str
execution_time: Optional[float] = None
timestamp: str
class AgentInfo(BaseModel):
"""Model for agent information"""
name: str
description: str
model: str
capabilities: List[str]
# Agent configurations
AGENT_CONFIGS = {
"quantitative-trading": {
"agent_name": "Quantitative-Trading-Agent",
"agent_description": "Advanced quantitative trading and algorithmic analysis agent",
"system_prompt": """You are an expert quantitative trading agent with deep expertise in:
- Algorithmic trading strategies and implementation
- Statistical arbitrage and market making
- Risk management and portfolio optimization
- High-frequency trading systems
- Market microstructure analysis
- Quantitative research methodologies
- Financial mathematics and stochastic processes
- Machine learning applications in trading
Your core responsibilities include:
1. Developing and backtesting trading strategies
2. Analyzing market data and identifying alpha opportunities
3. Implementing risk management frameworks
4. Optimizing portfolio allocations
5. Conducting quantitative research
6. Monitoring market microstructure
7. Evaluating trading system performance
You maintain strict adherence to:
- Mathematical rigor in all analyses
- Statistical significance in strategy development
- Risk-adjusted return optimization
- Market impact minimization
- Regulatory compliance
- Transaction cost analysis
- Performance attribution
You communicate in precise, technical terms while maintaining clarity for stakeholders.""",
"model_name": "claude-sonnet-4-20250514",
"capabilities": [
"Trading strategy development",
"Market analysis",
"Risk management",
"Portfolio optimization",
"Quantitative research",
"Performance evaluation",
],
},
"general": {
"agent_name": "General-AI-Agent",
"agent_description": "Versatile AI agent for various tasks",
"system_prompt": """You are a helpful AI assistant that can handle a wide variety of tasks.
You provide clear, accurate, and helpful responses while maintaining a professional tone.
Always strive to be thorough and accurate in your responses.""",
"model_name": "claude-sonnet-4-20250514",
"capabilities": [
"General assistance",
"Problem solving",
"Information gathering",
"Analysis and synthesis",
],
},
}
def create_agent(
agent_type: str = "quantitative-trading", **kwargs
) -> Agent:
"""Create and return a configured agent"""
if agent_type not in AGENT_CONFIGS:
raise ValueError(f"Unknown agent type: {agent_type}")
config = AGENT_CONFIGS[agent_type].copy()
agent_config = {
"agent_name": config["agent_name"],
"agent_description": config["agent_description"],
"system_prompt": config["system_prompt"],
"model_name": config["model_name"],
"dynamic_temperature_enabled": True,
"max_loops": kwargs.get("max_loops", 1),
"dynamic_context_window": True,
}
# Update with any additional kwargs
agent_config.update(kwargs)
return Agent(**agent_config)
# API endpoints
@app.get("/")
async def root():
"""Health check endpoint"""
return {
"message": "Swarms Agent API is running!",
"status": "healthy",
"version": "1.0.0",
"docs": "/docs",
}
@app.get("/health")
async def health_check():
"""Detailed health check"""
return {
"status": "healthy",
"service": "Swarms Agent API",
"version": "1.0.0",
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
}
@app.get("/agents", response_model=List[AgentInfo])
async def list_available_agents():
"""List available agent configurations"""
agents = []
for agent_type, config in AGENT_CONFIGS.items():
agents.append(
AgentInfo(
name=agent_type,
description=config["agent_description"],
model=config["model_name"],
capabilities=config["capabilities"],
)
)
return agents
@app.post("/agent/run", response_model=AgentResponse)
async def run_agent(request: AgentRequest):
"""Run an agent with the specified task"""
try:
start_time = time.time()
# Create agent instance
agent = create_agent(
agent_type=request.agent_name, max_loops=request.max_loops
)
# Run the agent
result = agent.run(task=request.task)
execution_time = time.time() - start_time
logger.info(
f"Agent {request.agent_name} completed task in {execution_time:.2f}s"
)
return AgentResponse(
success=True,
result=str(result),
agent_name=request.agent_name,
task=request.task,
execution_time=execution_time,
timestamp=time.strftime("%Y-%m-%d %H:%M:%S"),
)
except Exception as e:
logger.error(f"Agent execution failed: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Agent execution failed: {str(e)}",
)
@app.post("/agent/chat")
async def chat_with_agent(request: AgentRequest):
"""Chat with an agent (conversational mode)"""
try:
agent = create_agent(
agent_type=request.agent_name, max_loops=request.max_loops
)
result = agent.run(task=request.task)
return {
"success": True,
"response": str(result),
"agent_name": request.agent_name,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
}
except Exception as e:
logger.error(f"Chat failed: {str(e)}")
raise HTTPException(
status_code=500, detail=f"Chat failed: {str(e)}"
)
@app.post("/agent/quantitative-trading")
async def run_quantitative_trading_agent(request: AgentRequest):
"""Run the quantitative trading agent specifically"""
try:
start_time = time.time()
# Create specialized quantitative trading agent
agent = create_agent(
"quantitative-trading", max_loops=request.max_loops
)
result = agent.run(task=request.task)
execution_time = time.time() - start_time
logger.info(
f"Quantitative trading agent completed task in {execution_time:.2f}s"
)
return {
"success": True,
"result": str(result),
"agent_name": "Quantitative-Trading-Agent",
"task": request.task,
"execution_time": execution_time,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
}
except Exception as e:
logger.error(f"Quantitative trading agent failed: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Quantitative trading agent failed: {str(e)}",
)
@app.get("/examples")
async def get_example_requests():
"""Get example API requests for testing"""
return {
"examples": [
{
"endpoint": "/agent/run",
"method": "POST",
"description": "Run any available agent",
"request_body": {
"task": "What are the best top 3 ETFs for gold coverage?",
"agent_name": "quantitative-trading",
"max_loops": 1,
},
},
{
"endpoint": "/agent/quantitative-trading",
"method": "POST",
"description": "Run quantitative trading agent specifically",
"request_body": {
"task": "Analyze the current market conditions for gold ETFs",
"max_loops": 1,
},
},
{
"endpoint": "/agent/chat",
"method": "POST",
"description": "Chat with an agent",
"request_body": {
"task": "Explain the concept of statistical arbitrage",
"agent_name": "quantitative-trading",
},
},
]
}
# Middleware for logging
@app.middleware("http")
async def log_requests(request, call_next):
"""Log all requests and responses"""
start_time = time.time()
# Log request
logger.info(f"Request: {request.method} {request.url}")
response = await call_next(request)
# Log response
process_time = time.time() - start_time
logger.info(
f"Response: {response.status_code} - {process_time:.2f}s"
)
return response
if __name__ == "__main__":
print("Starting Swarms Agent API...")
print(
"API Documentation will be available at: http://localhost:8000/docs"
)
print("Alternative docs at: http://localhost:8000/redoc")
print("Health check at: http://localhost:8000/health")
print("Available agents at: http://localhost:8000/agents")
print("Examples at: http://localhost:8000/examples")
print("\n" + "=" * 60)
uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")

@ -0,0 +1,15 @@
# FastAPI Agent API Dependencies
fastapi>=0.104.0
uvicorn[standard]>=0.24.0
pydantic>=2.0.0
swarms>=0.0.1
# Optional dependencies for production
gunicorn>=21.0.0
python-multipart>=0.0.6
# For enhanced features (optional)
slowapi>=0.1.9 # Rate limiting
python-jose[cryptography]>=3.3.0 # JWT tokens
passlib[bcrypt]>=1.7.4 # Password hashing
python-dotenv>=1.0.0 # Environment variables

@ -713,7 +713,7 @@ def demonstrate_visualization_features():
# Test text visualization (always available)
print("\n📝 Testing text visualization...")
try:
text_viz = workflow.visualize_simple()
workflow.visualize_simple()
print("✅ Text visualization successful")
except Exception as e:
print(f"❌ Text visualization failed: {e}")

@ -54,7 +54,7 @@ def install_package(package: str) -> bool:
"""Install a package using pip."""
try:
print(f"📦 Installing {package}...")
result = subprocess.run(
subprocess.run(
[sys.executable, "-m", "pip", "install", package],
capture_output=True,
text=True,

@ -1,27 +1,31 @@
import os
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import traceback
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
import yaml
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
)
from pydantic import (
BaseModel,
Extra,
Field,
field_validator,
)
from swarms.utils.loguru_logger import initialize_logger
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
from swarms.structs.agent import Agent
from swarms.structs.swarm_router import SwarmRouter
from swarms.utils.litellm_wrapper import LiteLLM
from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger(log_folder="create_agents_from_yaml")
class AgentConfig(BaseModel):
"""Configuration model for creating agents with support for custom kwargs."""
agent_name: str
system_prompt: str
model_name: Optional[str] = None
@ -41,9 +45,14 @@ class AgentConfig(BaseModel):
artifacts_file_extension: str = ".md"
artifacts_output_path: str = ""
# Allow arbitrary additional fields for custom agent parameters
class Config:
extra = "allow"
@field_validator("system_prompt")
@classmethod
def validate_system_prompt(cls, v):
"""Validate that system prompt is a non-empty string."""
if not v or not isinstance(v, str) or len(v.strip()) == 0:
raise ValueError(
"System prompt must be a non-empty string"
@ -52,6 +61,8 @@ class AgentConfig(BaseModel):
class SwarmConfig(BaseModel):
"""Configuration model for creating swarm routers with support for custom kwargs."""
name: str
description: str
max_loops: int = Field(default=1, ge=1)
@ -62,30 +73,21 @@ class SwarmConfig(BaseModel):
return_json: bool = False
rules: str = ""
@field_validator("swarm_type")
@classmethod
def validate_swarm_type(cls, v):
valid_types = {
"SequentialWorkflow",
"ConcurrentWorkflow",
"AgentRearrange",
"MixtureOfAgents",
"auto",
}
if v not in valid_types:
raise ValueError(
f"Swarm type must be one of: {valid_types}"
)
return v
# Allow arbitrary additional fields for custom swarm parameters
class Config:
extra = Extra.allow
class YAMLConfig(BaseModel):
"""Main configuration model for the YAML file."""
agents: List[AgentConfig] = Field(..., min_length=1)
swarm_architecture: Optional[SwarmConfig] = None
model_config = {
"extra": "forbid" # Prevent additional fields not in the model
}
ReturnTypes = Literal[
"auto", "swarm", "agents", "both", "tasks", "run_swarm"
]
def load_yaml_safely(
@ -124,45 +126,53 @@ def load_yaml_safely(
f"Retrying after error: {retry_state.outcome.exception()}"
),
)
def create_agent_with_retry(
agent_config: Dict, model: LiteLLM
) -> Agent:
def create_agent_with_retry(agent_config: Dict) -> Agent:
"""Create an agent with retry logic for handling transient failures."""
try:
validated_config = AgentConfig(**agent_config)
agent = Agent(
agent_name=validated_config.agent_name,
system_prompt=validated_config.system_prompt,
llm=model,
max_loops=validated_config.max_loops,
autosave=validated_config.autosave,
dashboard=validated_config.dashboard,
verbose=validated_config.verbose,
dynamic_temperature_enabled=validated_config.dynamic_temperature_enabled,
saved_state_path=validated_config.saved_state_path,
user_name=validated_config.user_name,
retry_attempts=validated_config.retry_attempts,
context_length=validated_config.context_length,
return_step_meta=validated_config.return_step_meta,
output_type=validated_config.output_type,
auto_generate_prompt=validated_config.auto_generate_prompt,
artifacts_on=validated_config.artifacts_on,
artifacts_file_extension=validated_config.artifacts_file_extension,
artifacts_output_path=validated_config.artifacts_output_path,
)
# Extract standard Agent parameters
standard_params = {
"agent_name": validated_config.agent_name,
"system_prompt": validated_config.system_prompt,
"max_loops": validated_config.max_loops,
"model_name": validated_config.model_name,
"autosave": validated_config.autosave,
"dashboard": validated_config.dashboard,
"verbose": validated_config.verbose,
"dynamic_temperature_enabled": validated_config.dynamic_temperature_enabled,
"saved_state_path": validated_config.saved_state_path,
"user_name": validated_config.user_name,
"retry_attempts": validated_config.retry_attempts,
"context_length": validated_config.context_length,
"return_step_meta": validated_config.return_step_meta,
"output_type": validated_config.output_type,
"auto_generate_prompt": validated_config.auto_generate_prompt,
"artifacts_on": validated_config.artifacts_on,
"artifacts_file_extension": validated_config.artifacts_file_extension,
"artifacts_output_path": validated_config.artifacts_output_path,
}
# Extract any additional custom parameters
custom_params = {}
for key, value in agent_config.items():
if key not in standard_params and key != "agent_name":
custom_params[key] = value
# Create agent with standard and custom parameters
agent = Agent(**standard_params, **custom_params)
return agent
except Exception as e:
logger.error(
f"Error creating agent {agent_config.get('agent_name', 'unknown')}: {str(e)}"
f"Error creating agent {agent_config.get('agent_name', 'unknown')}: {str(e)} Traceback: {traceback.format_exc()}"
)
raise
def create_agents_from_yaml(
model: Callable = None,
yaml_file: str = "agents.yaml",
yaml_string: str = None,
return_type: str = "auto",
return_type: ReturnTypes = "auto",
) -> Union[
SwarmRouter,
Agent,
@ -172,6 +182,39 @@ def create_agents_from_yaml(
]:
"""
Create agents and/or SwarmRouter based on configurations defined in a YAML file or string.
This function now supports custom parameters for both Agent and SwarmRouter creation.
Any additional fields in your YAML configuration will be passed through as kwargs.
Args:
yaml_file: Path to YAML configuration file
yaml_string: YAML configuration as a string (alternative to yaml_file)
return_type: Type of return value ("auto", "swarm", "agents", "both", "tasks", "run_swarm")
Returns:
Depending on return_type and configuration, returns:
- Single Agent (if only one agent and return_type in ["auto", "swarm", "agents"])
- List of Agents (if multiple agents and return_type in ["auto", "swarm", "agents"])
- SwarmRouter (if return_type in ["auto", "swarm"] and swarm_architecture defined)
- Tuple of (SwarmRouter, List[Agent]) (if return_type == "both")
- Task results (if return_type == "tasks")
- Swarm execution result (if return_type == "run_swarm")
Example YAML with custom parameters:
agents:
- agent_name: "CustomAgent"
system_prompt: "You are a helpful assistant"
custom_param1: "value1"
custom_param2: 42
nested_config:
key: "value"
swarm_architecture:
name: "CustomSwarm"
description: "A custom swarm"
swarm_type: "SequentialWorkflow"
custom_swarm_param: "swarm_value"
another_param: 123
"""
agents = []
task_results = []
@ -204,23 +247,7 @@ def create_agents_from_yaml(
f"Creating agent {idx}/{len(config['agents'])}: {agent_config['agent_name']}"
)
if "model_name" in agent_config:
logger.info(
f"Using specified model: {agent_config['model_name']}"
)
model_instance = LiteLLM(
model_name=agent_config["model_name"]
)
else:
model_name = "gpt-4"
logger.info(
f"No model specified, using default: {model_name}"
)
model_instance = LiteLLM(model_name=model_name)
agent = create_agent_with_retry(
agent_config, model_instance
)
agent = create_agent_with_retry(agent_config)
logger.info(
f"Agent {agent_config['agent_name']} created successfully."
)
@ -257,18 +284,34 @@ def create_agents_from_yaml(
logger.info(
f"Creating SwarmRouter with type: {swarm_config.swarm_type}"
)
# Extract standard SwarmRouter parameters
standard_swarm_params = {
"name": swarm_config.name,
"description": swarm_config.description,
"max_loops": swarm_config.max_loops,
"agents": agents,
"swarm_type": swarm_config.swarm_type,
"task": swarm_config.task,
"flow": swarm_config.flow,
"autosave": swarm_config.autosave,
"return_json": swarm_config.return_json,
"rules": swarm_config.rules,
}
# Extract any additional custom parameters for SwarmRouter
custom_swarm_params = {}
for key, value in config[
"swarm_architecture"
].items():
if key not in standard_swarm_params:
custom_swarm_params[key] = value
# Create SwarmRouter with standard and custom parameters
swarm_router = SwarmRouter(
name=swarm_config.name,
description=swarm_config.description,
max_loops=swarm_config.max_loops,
agents=agents,
swarm_type=swarm_config.swarm_type,
task=swarm_config.task,
flow=swarm_config.flow,
autosave=swarm_config.autosave,
return_json=swarm_config.return_json,
rules=swarm_config.rules,
**standard_swarm_params, **custom_swarm_params
)
logger.info(
f"SwarmRouter '{swarm_config.name}' created successfully."
)
@ -284,18 +327,9 @@ def create_agents_from_yaml(
"has a valid swarm_architecture section with required fields."
)
# Handle return types with improved error checking
valid_return_types = {
"auto",
"swarm",
"agents",
"both",
"tasks",
"run_swarm",
}
if return_type not in valid_return_types:
if return_type not in ReturnTypes:
raise ValueError(
f"Invalid return_type. Must be one of: {valid_return_types}"
f"Invalid return_type. Must be one of: {ReturnTypes}"
)
logger.info(f"Processing with return type: {return_type}")
@ -363,6 +397,6 @@ def create_agents_from_yaml(
except Exception as e:
logger.error(
f"Critical error in create_agents_from_yaml: {str(e)}\n"
"Please check your YAML configuration and try again."
f"Please check your YAML configuration and try again. Traceback: {traceback.format_exc()}"
)
raise

@ -1368,7 +1368,6 @@ class Conversation:
# Binary search
left, right = 0, len(text)
best_length = 0
best_text = ""
while left <= right:
@ -1378,7 +1377,6 @@ class Conversation:
if tokens <= target_tokens:
# If current truncated text token count is less than or equal to target, try longer text
best_length = mid
best_text = truncated
left = mid + 1
else:

Loading…
Cancel
Save