# ModelRouter Docs
The ModelRouter is an intelligent routing system that automatically selects and executes AI models based on task requirements. It leverages a function-calling architecture to analyze tasks and recommend the optimal model and provider combination for each specific use case.
## Key Features
- Dynamic model selection based on task complexity and requirements
- Multi-provider support (OpenAI, Anthropic, Google, etc.)
- Concurrent and asynchronous execution capabilities
- Batch processing with memory
- Automatic error handling and retries
- Provider-aware routing
- Cost optimization
## Constructor Arguments

| Parameter | Type | Default | Description |
|---|---|---|---|
| `system_prompt` | `str` | `model_router_system_prompt` | Custom prompt for guiding model-selection behavior |
| `max_tokens` | `int` | `4000` | Maximum token limit for model outputs |
| `temperature` | `float` | `0.5` | Control parameter for response randomness (0.0-1.0) |
| `max_workers` | `int`/`str` | `10` | Maximum concurrent workers (`"auto"` for CPU count) |
| `api_key` | `str` | `None` | API key for model access |
| `max_loops` | `int` | `1` | Maximum number of refinement iterations |
| `*args` | `Any` | `None` | Additional positional arguments |
| `**kwargs` | `Any` | `None` | Additional keyword arguments |
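The `max_workers="auto"` setting presumably resolves to the machine's CPU count. A minimal stdlib sketch of that resolution logic (`resolve_max_workers` is an illustrative helper, not part of the swarms API):

```python
import os

def resolve_max_workers(max_workers):
    """Resolve a max_workers setting: "auto" maps to the CPU count."""
    if max_workers == "auto":
        # os.cpu_count() can return None; fall back to a single worker
        return os.cpu_count() or 1
    return int(max_workers)
```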
## Installation

Install the latest version of swarms using pip:

```bash
pip3 install -U swarms
```

Set up your API keys in your `.env` file:

```bash
OPENAI_API_KEY=your_openai_api_key
ANTHROPIC_API_KEY=your_anthropic_api_key
GOOGLE_API_KEY=your_google_api_key
# Add more API keys as needed following litellm format
```

## Core Methods

### `run(task: str) -> str`

Executes a single task through the model router, with memory and refinement capabilities.

```python
from swarms import ModelRouter

router = ModelRouter()

# Simple text analysis
result = router.run("Analyze the sentiment and key themes in this customer feedback")

# Complex reasoning task
complex_result = router.run("""
Evaluate the following business proposal:
- Initial investment: $500,000
- Projected ROI: 25% annually
- Market size: $2B
- Competition: 3 major players

Provide detailed analysis and recommendations.
""")
```
### `batch_run(tasks: list) -> list`

Executes multiple tasks sequentially and aggregates the results.

```python
# Multiple analysis tasks
tasks = [
    "Analyze Q1 financial performance",
    "Predict Q2 market trends",
    "Evaluate competitor strategies",
    "Generate growth recommendations",
]

results = router.batch_run(tasks)

# Process results
for task, result in zip(tasks, results):
    print(f"Task: {task}\nResult: {result}\n")
```
### `concurrent_run(tasks: list) -> list`

Executes multiple tasks in parallel using a thread pool.

```python
# Define multiple concurrent tasks
analysis_tasks = [
    "Perform technical analysis of AAPL stock",
    "Analyze market sentiment from social media",
    "Generate trading signals",
    "Calculate risk metrics",
]

# Execute tasks concurrently
results = router.concurrent_run(analysis_tasks)

# Process results with error handling
# (process_analysis, save_to_database, and log_error are application-specific helpers)
for task, result in zip(analysis_tasks, results):
    try:
        processed_result = process_analysis(result)
        save_to_database(processed_result)
    except Exception as e:
        log_error(f"Error processing {task}: {str(e)}")
```
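Thread-pooled execution of this kind can be sketched with the standard library's `concurrent.futures`; here `run_task` is a stand-in for the router call, not part of the swarms API:

```python
from concurrent.futures import ThreadPoolExecutor

def run_task(task: str) -> str:
    # Stand-in for router.run(task)
    return f"result for: {task}"

def concurrent_run(tasks, max_workers=10):
    # pool.map preserves input order, so results line up with tasks
    # just as in the zip(tasks, results) loop above
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(run_task, tasks))
```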
### `async_run(task: str) -> asyncio.Task`

Asynchronous task execution with coroutine support.

```python
import asyncio

async def process_data_stream():
    # data_stream is an application-provided async iterable;
    # router is the ModelRouter instance created in main()
    tasks = []
    async for data in data_stream:
        task = await router.async_run(f"Process data: {data}")
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    return results

# Usage in an async context
async def main():
    router = ModelRouter()
    results = await process_data_stream()
```
## Advanced Usage Examples

### Financial Analysis System

```python
from typing import Dict

from swarms import ModelRouter

class FinancialAnalysisSystem:
    def __init__(self):
        self.router = ModelRouter(
            temperature=0.3,  # Lower temperature for more deterministic outputs
            max_tokens=8000,  # Higher token limit for detailed analysis
            max_loops=2,      # Allow for refinement iteration
        )

    def analyze_company_financials(self, financial_data: Dict) -> Dict:
        analysis_task = f"""
        Perform comprehensive financial analysis:

        Financial Metrics:
        - Revenue: ${financial_data['revenue']}M
        - EBITDA: ${financial_data['ebitda']}M
        - Debt/Equity: {financial_data['debt_equity']}
        - Working Capital: ${financial_data['working_capital']}M

        Required Analysis:
        1. Profitability assessment
        2. Liquidity analysis
        3. Growth projections
        4. Risk evaluation
        5. Investment recommendations

        Provide detailed insights and actionable recommendations.
        """
        result = self.router.run(analysis_task)
        return self._parse_analysis_result(result)

    def _parse_analysis_result(self, result: str) -> Dict:
        # Implementation of result parsing
        pass

# Usage
analyzer = FinancialAnalysisSystem()
company_data = {
    'revenue': 150,
    'ebitda': 45,
    'debt_equity': 0.8,
    'working_capital': 25,
}
analysis = analyzer.analyze_company_financials(company_data)
```
### Healthcare Data Processing Pipeline

```python
import asyncio
from typing import List, Dict

from swarms import ModelRouter

class MedicalDataProcessor:
    def __init__(self):
        self.router = ModelRouter(
            max_workers="auto",  # Automatic worker scaling
            temperature=0.2,     # Conservative temperature for medical analysis
            system_prompt="""You are a specialized medical data analyzer focused on:
            1. Clinical terminology interpretation
            2. Patient data analysis
            3. Treatment recommendation review
            4. Medical research synthesis""",
        )

    async def process_patient_records(self, records: List[Dict]) -> List[Dict]:
        analysis_tasks = []
        for record in records:
            task = f"""
            Analyze patient record:
            - Age: {record['age']}
            - Symptoms: {', '.join(record['symptoms'])}
            - Vital Signs: {record['vitals']}
            - Medications: {', '.join(record['medications'])}
            - Lab Results: {record['lab_results']}

            Provide:
            1. Symptom analysis
            2. Medication interaction check
            3. Lab results interpretation
            4. Treatment recommendations
            """
            analysis_tasks.append(task)

        results = await asyncio.gather(*[
            self.router.async_run(task) for task in analysis_tasks
        ])
        return [self._parse_medical_analysis(r) for r in results]

    def _parse_medical_analysis(self, analysis: str) -> Dict:
        # Implementation of medical analysis parsing
        pass

# Usage
async def main():
    processor = MedicalDataProcessor()
    patient_records = [
        {
            'age': 45,
            'symptoms': ['fever', 'cough', 'fatigue'],
            'vitals': {'bp': '120/80', 'temp': '38.5C'},
            'medications': ['lisinopril', 'metformin'],
            'lab_results': 'WBC: 11,000, CRP: 2.5',
        },
        # More records...
    ]
    analyses = await processor.process_patient_records(patient_records)
```
### Natural Language Processing Pipeline

```python
from typing import List, Dict

from swarms import ModelRouter

class NLPPipeline:
    def __init__(self):
        self.router = ModelRouter(
            temperature=0.4,
            max_loops=2,
        )

    def process_documents(self, documents: List[str]) -> List[Dict]:
        tasks = [self._create_nlp_task(doc) for doc in documents]
        results = self.router.concurrent_run(tasks)
        return [self._parse_nlp_result(r) for r in results]

    def _create_nlp_task(self, document: str) -> str:
        return f"""
        Perform comprehensive NLP analysis:

        Text: {document}

        Required Analysis:
        1. Entity recognition
        2. Sentiment analysis
        3. Topic classification
        4. Key phrase extraction
        5. Intent detection

        Provide structured analysis with confidence scores.
        """

    def _parse_nlp_result(self, result: str) -> Dict:
        # Implementation of NLP result parsing
        pass

# Usage
pipeline = NLPPipeline()
documents = [
    "We're extremely satisfied with the new product features!",
    "The customer service response time needs improvement.",
    "Looking to upgrade our subscription plan next month.",
]
analyses = pipeline.process_documents(documents)
```
## Available Models and Use Cases

| Model | Provider | Optimal Use Cases | Characteristics |
|---|---|---|---|
| gpt-4-turbo | OpenAI | Complex reasoning, code generation, creative writing | High accuracy, recent knowledge cutoff |
| claude-3-opus | Anthropic | Research analysis, technical documentation, long-form content | Strong reasoning, detailed outputs |
| gemini-pro | Google | Multimodal tasks, code generation, technical analysis | Fast inference, strong coding abilities |
| mistral-large | Mistral | General tasks, content generation, classification | Good price/performance |
| deepseek-reasoner | DeepSeek | Mathematical analysis, logic problems, scientific computing | Specialized reasoning capabilities |
## Provider Capabilities

| Provider | Strengths | Best For | Integration Notes |
|---|---|---|---|
| OpenAI | Consistent performance, strong reasoning | Production systems, complex tasks | Requires API key setup |
| Anthropic | Safety features, detailed analysis | Research, technical writing | Claude-specific formatting |
| Google | Technical tasks, multimodal support | Code generation, analysis | Vertex AI integration available |
| Groq | High-speed inference | Real-time applications | Optimized for specific models |
| DeepSeek | Specialized reasoning | Scientific computing | Custom API integration |
| Mistral | Open-source flexibility | General applications | Self-hosted options available |
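Routing across providers only works for providers whose API keys are set. A small stdlib helper (illustrative, not part of the swarms API) that reports which provider keys are configured, using the environment-variable names from the installation section:

```python
import os

# Environment-variable names follow the litellm convention used above
PROVIDER_KEYS = {
    "OpenAI": "OPENAI_API_KEY",
    "Anthropic": "ANTHROPIC_API_KEY",
    "Google": "GOOGLE_API_KEY",
}

def configured_providers(env=None):
    """Return the providers whose API key is present in the environment."""
    if env is None:
        env = os.environ
    return [name for name, var in PROVIDER_KEYS.items() if env.get(var)]
```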
## Performance Optimization Tips

- **Token Management**
  - Set an appropriate `max_tokens` value based on task complexity
  - Monitor token usage for cost optimization
  - Use streaming for long outputs
- **Concurrency Settings**
  - Adjust `max_workers` based on system resources
  - Use `"auto"` workers for optimal CPU utilization
  - Monitor memory usage with large batch sizes
- **Temperature Tuning**
  - Lower (0.1-0.3) for factual/analytical tasks
  - Higher (0.7-0.9) for creative tasks
  - Mid-range (0.4-0.6) for balanced outputs
- **System Prompts**
  - Customize for specific domains
  - Include relevant context
  - Define clear output formats
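The temperature guidance above can be encoded as a simple lookup; `TASK_TEMPERATURES` and `temperature_for` are illustrative names, not part of the swarms API:

```python
# Temperature presets matching the tuning guidance above
TASK_TEMPERATURES = {
    "analytical": 0.2,  # factual/analytical tasks: 0.1-0.3
    "balanced": 0.5,    # balanced outputs: 0.4-0.6
    "creative": 0.8,    # creative tasks: 0.7-0.9
}

def temperature_for(task_type: str) -> float:
    # Fall back to the balanced mid-range for unrecognized task types
    return TASK_TEMPERATURES.get(task_type, 0.5)
```

A router could then be constructed with, for example, `ModelRouter(temperature=temperature_for("analytical"))`.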
## Dependencies

- `asyncio`: Asynchronous I/O support
- `concurrent.futures`: Thread pool execution
- `pydantic`: Data validation
- `litellm`: LLM interface standardization