[7.0.4] [FEAT] [ModelRouter + docs] [docs fix] [docs][env docs]

pull/767/head
Kye Gomez 3 months ago
parent c1464d2ea7
commit 3140b69fbe

@ -1,4 +1,48 @@
# Framework Configuration
WORKSPACE_DIR="agent_workspace" WORKSPACE_DIR="agent_workspace"
SWARMS_VERBOSE_GLOBAL="False"
SWARMS_API_KEY="" SWARMS_API_KEY=""
USE_TELEMETRY=True
OPENAI_API_KEY="sk-" # Model Provider API Keys
## OpenAI
OPENAI_API_KEY=""
## Anthropic
ANTHROPIC_API_KEY=""
## Google
GEMINI_API_KEY=""
## Hugging Face
HUGGINGFACE_TOKEN=""
## Perplexity AI
PPLX_API_KEY=""
## AI21
AI21_API_KEY=""
# Tool Provider API Keys
## Search Tools
BING_BROWSER_API=""
BRAVESEARCH_API_KEY=""
TAVILY_API_KEY=""
YOU_API_KEY=""
## Analytics & Monitoring
AGENTOPS_API_KEY=""
EXA_API_KEY=""
## Browser Automation
MULTION_API_KEY=""
## Other Tools
HCP_APP_ID=""
# Cloud Provider Configuration
## Azure OpenAI
AZURE_OPENAI_ENDPOINT=""
AZURE_OPENAI_DEPLOYMENT=""
OPENAI_API_VERSION=""
AZURE_OPENAI_API_KEY=""
AZURE_OPENAI_AD_TOKEN=""

@ -5,5 +5,5 @@ Agent(
model_name="deepseek/deepseek-reasoner", model_name="deepseek/deepseek-reasoner",
max_loops="auto", max_loops="auto",
interactive=True, interactive=True,
streaming_on=True, streaming_on=False,
).run("What are 5 hft algorithms") ).run("What are 5 hft algorithms")

@ -149,6 +149,7 @@ nav:
- Quickstart: "swarms/install/quickstart.md"
- Swarms CLI: "swarms/cli/main.md"
- Swarms Framework Architecture: "swarms/concept/framework_architecture.md"
- Environment Variables: "swarms/install/env.md"
# - Prelimary:
#   - 80/20 Rule For Agents: "swarms/prompting/8020.md"
- Agents:
@ -189,6 +190,7 @@ nav:
- SwarmRearrange: "swarms/structs/swarm_rearrange.md"
- MultiAgentRouter: "swarms/structs/multi_agent_router.md"
- MatrixSwarm: "swarms/structs/matrix_swarm.md"
- ModelRouter: "swarms/structs/model_router.md"
- Various Execution Methods: "swarms/structs/various_execution_methods.md"
- Workflows:
- ConcurrentWorkflow: "swarms/structs/concurrentworkflow.md"
@ -196,7 +198,6 @@ nav:
- SequentialWorkflow: "swarms/structs/sequential_workflow.md"
- Structs:
- Conversation: "swarms/structs/conversation.md"
# - Task: "swarms/structs/task.md"
- Full API Reference: "swarms/framework/reference.md"
- Examples:
- Unique Swarms: "swarms/examples/unique_swarms.md"

@ -0,0 +1,187 @@
# Environment Variable Management & Security
This guide provides comprehensive documentation for managing environment variables and API keys securely in the Swarms framework.
## Overview
Swarms uses environment variables for configuration management and secure credential storage. This approach keeps sensitive information like API keys out of your code and allows for easy configuration changes across different environments.
## Core Environment Variables
### Framework Configuration
- `SWARMS_VERBOSE_GLOBAL`: Controls global logging verbosity
```bash
SWARMS_VERBOSE_GLOBAL="True" # Enable verbose logging
SWARMS_VERBOSE_GLOBAL="False" # Disable verbose logging
```
- `WORKSPACE_DIR`: Defines the workspace directory for agent operations
```bash
WORKSPACE_DIR="agent_workspace"
```
### API Keys
#### Model Provider Keys
1. **OpenAI**
- `OPENAI_API_KEY`: Authentication for GPT models
```bash
OPENAI_API_KEY="your-openai-key"
```
2. **Anthropic**
- `ANTHROPIC_API_KEY`: Authentication for Claude models
```bash
ANTHROPIC_API_KEY="your-anthropic-key"
```
3. **Google**
- `GEMINI_API_KEY`: Authentication for Gemini models
4. **Hugging Face**
- `HUGGINGFACE_TOKEN`: Access to Hugging Face models
5. **Perplexity AI**
- `PPLX_API_KEY`: Access to Perplexity models
6. **AI21**
- `AI21_API_KEY`: Access to AI21 models
#### Tool Provider Keys
1. **Search Tools**
- `BING_BROWSER_API`: Bing search capabilities
- `BRAVESEARCH_API_KEY`: Brave search integration
- `TAVILY_API_KEY`: Tavily search services
- `YOU_API_KEY`: You.com search integration
2. **Analytics & Monitoring**
- `AGENTOPS_API_KEY`: AgentOps monitoring
- `EXA_API_KEY`: Exa.ai services
3. **Browser Automation**
- `MULTION_API_KEY`: MultiOn browser automation
## Security Best Practices
### 1. Environment File Management
- Create a `.env` file in your project root
- Never commit `.env` files to version control
- Add `.env` to your `.gitignore`:
```bash
echo ".env" >> .gitignore
```
### 2. API Key Security
- Rotate API keys regularly
- Use different API keys for development and production
- Never hardcode API keys in your code
- Limit API key permissions to only what's necessary
- Monitor API key usage for unusual patterns
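A small helper makes the fail-fast spirit of these practices concrete. This is a sketch, not a framework API; `require_key` is a hypothetical name:
```python
import os

from dotenv import load_dotenv

load_dotenv()


def require_key(name: str) -> str:
    """Fail fast if a required API key is missing or blank."""
    value = os.getenv(name, "").strip()
    if not value:
        raise EnvironmentError(f"{name} is not set; add it to your .env file")
    return value


# Require only the providers your project actually calls.
openai_key = require_key("OPENAI_API_KEY")
```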
### 3. Template Configuration
Create a `.env.example` template without actual values:
```bash
# Required Configuration
OPENAI_API_KEY=""
ANTHROPIC_API_KEY=""
WORKSPACE_DIR="agent_workspace"
# Optional Configuration
SWARMS_VERBOSE_GLOBAL="False"
```
### 4. Loading Environment Variables
```python
from dotenv import load_dotenv
import os
# Load environment variables
load_dotenv()
# Access variables
workspace_dir = os.getenv("WORKSPACE_DIR")
openai_key = os.getenv("OPENAI_API_KEY")
```
## Environment Setup Guide
1. **Install Dependencies**:
```bash
pip install python-dotenv
```
2. **Create Environment File**:
```bash
cp .env.example .env
```
3. **Configure Variables**:
- Open `.env` in your text editor
- Add your API keys and configuration
- Save the file
4. **Verify Setup**:
```python
import os
from dotenv import load_dotenv
load_dotenv()
assert os.getenv("OPENAI_API_KEY") is not None, "OpenAI API key not found"
```
## Environment-Specific Configuration
### Development
```bash
WORKSPACE_DIR="agent_workspace"
SWARMS_VERBOSE_GLOBAL="True"
```
### Production
```bash
WORKSPACE_DIR="/var/swarms/workspace"
SWARMS_VERBOSE_GLOBAL="False"
```
### Testing
```bash
WORKSPACE_DIR="test_workspace"
SWARMS_VERBOSE_GLOBAL="True"
```
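One common way to switch between these profiles is to keep one dotenv file per environment and choose it at startup. A minimal sketch; the `APP_ENV` variable and the per-environment file names are illustrative conventions, not part of the framework:
```python
import os

from dotenv import load_dotenv

# Select an env file based on a hypothetical APP_ENV switch.
env_name = os.getenv("APP_ENV", "development")
env_file = {
    "development": ".env.development",
    "production": ".env.production",
    "testing": ".env.testing",
}.get(env_name, ".env")

load_dotenv(env_file)
print(f"Loaded {env_file}: WORKSPACE_DIR={os.getenv('WORKSPACE_DIR')}")
```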
## Troubleshooting
### Common Issues
1. **Environment Variables Not Loading**
- Verify `.env` file exists in project root
- Confirm `load_dotenv()` is called before accessing variables
- Check file permissions
2. **API Key Issues**
- Verify key format is correct
- Ensure key has not expired
- Check for leading/trailing whitespace
3. **Workspace Directory Problems**
- Confirm directory exists
- Verify write permissions
- Check path is absolute when required
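A short script can rule out all three classes of problem at once (a sketch; substitute your own key names and paths):
```python
import os
from pathlib import Path

from dotenv import load_dotenv

# 1. Environment variables not loading
print(f".env present: {Path('.env').exists()}")
load_dotenv()

# 2. API key issues
key = os.getenv("OPENAI_API_KEY")
print(f"OPENAI_API_KEY loaded: {key is not None}")
if key and key != key.strip():
    print("Warning: key has leading/trailing whitespace")

# 3. Workspace directory problems
workspace = os.getenv("WORKSPACE_DIR", "agent_workspace")
print(f"Workspace exists: {os.path.isdir(workspace)}")
print(f"Workspace writable: {os.access(workspace, os.W_OK)}")
```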
## Additional Resources
- [Swarms Documentation](https://docs.swarms.world)
- [Security Best Practices](https://swarms.world/security)
- [API Documentation](https://swarms.world/docs/api)

@ -0,0 +1,361 @@
# ModelRouter Docs
The ModelRouter is an intelligent routing system that automatically selects and executes AI models based on task requirements. It leverages a function-calling architecture to analyze tasks and recommend the optimal model and provider combination for each specific use case.
### Key Features
- Dynamic model selection based on task complexity and requirements
- Multi-provider support (OpenAI, Anthropic, Google, etc.)
- Concurrent and asynchronous execution capabilities
- Batch processing with memory
- Automatic error handling and retries
- Provider-aware routing
- Cost optimization
### Constructor Arguments
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| system_prompt | str | model_router_system_prompt | Custom prompt for guiding model selection behavior |
| max_tokens | int | 4000 | Maximum token limit for model outputs |
| temperature | float | 0.5 | Control parameter for response randomness (0.0-1.0) |
| max_workers | int/str | 10 | Maximum concurrent workers ("auto" for CPU count) |
| api_key | str | None | API key for model access |
| max_loops | int | 1 | Maximum number of refinement iterations |
| *args | Any | None | Additional positional arguments |
| **kwargs | Any | None | Additional keyword arguments |
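For illustration, a router constructed with every documented parameter might look like the following; the values are arbitrary, and any argument you omit keeps the default listed above:
```python
from swarms import ModelRouter

router = ModelRouter(
    max_tokens=4000,   # cap on generated output length
    temperature=0.5,   # 0.0-1.0; lower is more deterministic
    max_workers=10,    # or "auto" to match the CPU count
    max_loops=1,       # refinement iterations per task
)
```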
### Core Methods
#### run(task: str) -> str
Executes a single task through the model router with memory and refinement capabilities.
# Installation
1. Install the latest version of swarms using pip:
```bash
pip3 install -U swarms
```
2. Set up your API keys in your `.env` file with the following:
```bash
OPENAI_API_KEY=your_openai_api_key
ANTHROPIC_API_KEY=your_anthropic_api_key
GOOGLE_API_KEY=your_google_api_key
# Add more API keys as needed following litellm format
```
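Model identifiers elsewhere in these docs follow litellm's `provider/model` naming (the prefix is typically omitted for OpenAI models), for example:
```python
# litellm-style identifiers as they appear in other Swarms examples.
models = [
    "gpt-4o",                              # OpenAI
    "deepseek/deepseek-reasoner",          # DeepSeek
    "groq/deepseek-r1-distill-llama-70b",  # Groq-hosted DeepSeek distill
]
```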
```python
from swarms import ModelRouter
router = ModelRouter()
# Simple text analysis
result = router.run("Analyze the sentiment and key themes in this customer feedback")
# Complex reasoning task
complex_result = router.run("""
Evaluate the following business proposal:
- Initial investment: $500,000
- Projected ROI: 25% annually
- Market size: $2B
- Competition: 3 major players
Provide detailed analysis and recommendations.
""")
```
#### batch_run(tasks: list) -> list
Executes multiple tasks sequentially with result aggregation.
```python
# Multiple analysis tasks
tasks = [
    "Analyze Q1 financial performance",
    "Predict Q2 market trends",
    "Evaluate competitor strategies",
    "Generate growth recommendations"
]

results = router.batch_run(tasks)

# Process results
for task, result in zip(tasks, results):
    print(f"Task: {task}\nResult: {result}\n")
```
#### concurrent_run(tasks: list) -> list
Parallel execution of multiple tasks using thread pooling.
```python
# Define multiple concurrent tasks
analysis_tasks = [
    "Perform technical analysis of AAPL stock",
    "Analyze market sentiment from social media",
    "Generate trading signals",
    "Calculate risk metrics"
]

# Execute tasks concurrently
results = router.concurrent_run(analysis_tasks)

# Process results with error handling; process_analysis, save_to_database,
# and log_error are placeholders for your own pipeline functions.
for task, result in zip(analysis_tasks, results):
    try:
        processed_result = process_analysis(result)
        save_to_database(processed_result)
    except Exception as e:
        log_error(f"Error processing {task}: {str(e)}")
```
#### async_run(task: str) -> asyncio.Task
Asynchronous task execution with coroutine support.
```python
import asyncio


async def process_data_stream(router, data_stream):
    # data_stream is assumed to be an async iterable of incoming records.
    tasks = []
    async for data in data_stream:
        task = await router.async_run(f"Process data: {data}")
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    return results


# Usage in async context; supply your own data_stream.
async def main():
    router = ModelRouter()
    results = await process_data_stream(router, data_stream)
```
### Advanced Usage Examples
#### Financial Analysis System
```python
from swarms import ModelRouter
from typing import Dict


class FinancialAnalysisSystem:
    def __init__(self):
        self.router = ModelRouter(
            temperature=0.3,  # Lower temperature for more deterministic outputs
            max_tokens=8000,  # Higher token limit for detailed analysis
            max_loops=2,      # Allow for refinement iteration
        )

    def analyze_company_financials(self, financial_data: Dict) -> Dict:
        analysis_task = f"""
        Perform comprehensive financial analysis:

        Financial Metrics:
        - Revenue: ${financial_data['revenue']}M
        - EBITDA: ${financial_data['ebitda']}M
        - Debt/Equity: {financial_data['debt_equity']}
        - Working Capital: ${financial_data['working_capital']}M

        Required Analysis:
        1. Profitability assessment
        2. Liquidity analysis
        3. Growth projections
        4. Risk evaluation
        5. Investment recommendations

        Provide detailed insights and actionable recommendations.
        """
        result = self.router.run(analysis_task)
        return self._parse_analysis_result(result)

    def _parse_analysis_result(self, result: str) -> Dict:
        # Implementation of result parsing
        pass


# Usage
analyzer = FinancialAnalysisSystem()
company_data = {
    'revenue': 150,
    'ebitda': 45,
    'debt_equity': 0.8,
    'working_capital': 25
}
analysis = analyzer.analyze_company_financials(company_data)
```
#### Healthcare Data Processing Pipeline
```python
import asyncio
from typing import List, Dict

from swarms import ModelRouter


class MedicalDataProcessor:
    def __init__(self):
        self.router = ModelRouter(
            max_workers="auto",  # Automatic worker scaling
            temperature=0.2,     # Conservative temperature for medical analysis
            system_prompt="""You are a specialized medical data analyzer focused on:
            1. Clinical terminology interpretation
            2. Patient data analysis
            3. Treatment recommendation review
            4. Medical research synthesis""",
        )

    async def process_patient_records(self, records: List[Dict]) -> List[Dict]:
        analysis_tasks = []
        for record in records:
            task = f"""
            Analyze patient record:
            - Age: {record['age']}
            - Symptoms: {', '.join(record['symptoms'])}
            - Vital Signs: {record['vitals']}
            - Medications: {', '.join(record['medications'])}
            - Lab Results: {record['lab_results']}

            Provide:
            1. Symptom analysis
            2. Medication interaction check
            3. Lab results interpretation
            4. Treatment recommendations
            """
            analysis_tasks.append(task)

        results = await asyncio.gather(*[
            self.router.async_run(task) for task in analysis_tasks
        ])
        return [self._parse_medical_analysis(r) for r in results]

    def _parse_medical_analysis(self, analysis: str) -> Dict:
        # Implementation of medical analysis parsing
        pass


# Usage
async def main():
    processor = MedicalDataProcessor()
    patient_records = [
        {
            'age': 45,
            'symptoms': ['fever', 'cough', 'fatigue'],
            'vitals': {'bp': '120/80', 'temp': '38.5C'},
            'medications': ['lisinopril', 'metformin'],
            'lab_results': 'WBC: 11,000, CRP: 2.5'
        }
        # More records...
    ]
    analyses = await processor.process_patient_records(patient_records)
```
#### Natural Language Processing Pipeline
```python
from typing import List, Dict

from swarms import ModelRouter


class NLPPipeline:
    def __init__(self):
        self.router = ModelRouter(
            temperature=0.4,
            max_loops=2,
        )

    def process_documents(self, documents: List[str]) -> List[Dict]:
        tasks = [self._create_nlp_task(doc) for doc in documents]
        results = self.router.concurrent_run(tasks)
        return [self._parse_nlp_result(r) for r in results]

    def _create_nlp_task(self, document: str) -> str:
        return f"""
        Perform comprehensive NLP analysis:

        Text: {document}

        Required Analysis:
        1. Entity recognition
        2. Sentiment analysis
        3. Topic classification
        4. Key phrase extraction
        5. Intent detection

        Provide structured analysis with confidence scores.
        """

    def _parse_nlp_result(self, result: str) -> Dict:
        # Implementation of NLP result parsing
        pass


# Usage
pipeline = NLPPipeline()
documents = [
    "We're extremely satisfied with the new product features!",
    "The customer service response time needs improvement.",
    "Looking to upgrade our subscription plan next month."
]
analyses = pipeline.process_documents(documents)
```
### Available Models and Use Cases
| Model | Provider | Optimal Use Cases | Characteristics |
|-------|----------|-------------------|-----------------|
| gpt-4-turbo | OpenAI | Complex reasoning, Code generation, Creative writing | High accuracy, Latest knowledge cutoff |
| claude-3-opus | Anthropic | Research analysis, Technical documentation, Long-form content | Strong reasoning, Detailed outputs |
| gemini-pro | Google | Multimodal tasks, Code generation, Technical analysis | Fast inference, Strong coding abilities |
| mistral-large | Mistral | General tasks, Content generation, Classification | Open source, Good price/performance |
| deepseek-reasoner | DeepSeek | Mathematical analysis, Logic problems, Scientific computing | Specialized reasoning capabilities |
### Provider Capabilities
| Provider | Strengths | Best For | Integration Notes |
|----------|-----------|-----------|------------------|
| OpenAI | Consistent performance, Strong reasoning | Production systems, Complex tasks | Requires API key setup |
| Anthropic | Safety features, Detailed analysis | Research, Technical writing | Claude-specific formatting |
| Google | Technical tasks, Multimodal support | Code generation, Analysis | Vertex AI integration available |
| Groq | High-speed inference | Real-time applications | Optimized for specific models |
| DeepSeek | Specialized reasoning | Scientific computing | Custom API integration |
| Mistral | Open source flexibility | General applications | Self-hosted options available |
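The router chooses the provider itself, but you can steer it by naming a provider or model directly in the task text, as the example shipped with this commit does. This is a usage pattern, not a dedicated API:
```python
from swarms import ModelRouter

router = ModelRouter()

# Mentioning a provider and model in the task nudges the router's selection.
result = router.run(
    "Summarize the key macroeconomic risks for Q3. Use openai gpt-4o models."
)
```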
### Performance Optimization Tips
1. Token Management
- Set appropriate max_tokens based on task complexity
- Monitor token usage for cost optimization
- Use streaming for long outputs
2. Concurrency Settings
- Adjust max_workers based on system resources
- Use "auto" workers for optimal CPU utilization
- Monitor memory usage with large batch sizes
3. Temperature Tuning
- Lower (0.1-0.3) for factual/analytical tasks
- Higher (0.7-0.9) for creative tasks
- Mid-range (0.4-0.6) for balanced outputs
4. System Prompts
- Customize for specific domains
- Include relevant context
- Define clear output formats
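Taken together, the tips above might translate into configurations like the following sketch; the values are illustrative:
```python
from swarms import ModelRouter

# Analytical workload: low temperature, generous token budget,
# auto-scaled workers, and a domain-specific system prompt.
analytical_router = ModelRouter(
    temperature=0.2,     # factual/analytical range (0.1-0.3)
    max_tokens=8000,     # room for long, detailed outputs
    max_workers="auto",  # match worker count to CPU cores
    system_prompt=(
        "You are a financial analysis specialist. "
        "Return structured, clearly sourced findings."
    ),
)

# Creative workload: higher temperature, defaults elsewhere.
creative_router = ModelRouter(temperature=0.8)
```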
### Dependencies
- asyncio: Asynchronous I/O support
- concurrent.futures: Thread pool execution
- pydantic: Data validation
- litellm: LLM interface standardization

@ -0,0 +1,11 @@
import os

from dotenv import load_dotenv

from swarms.structs.model_router import ModelRouter

load_dotenv()

model_router = ModelRouter(api_key=os.getenv("OPENAI_API_KEY"))

model_router.run(
    "What are the best ways to analyze macroeconomic data? Use openai gpt-4o models"
)

@ -0,0 +1,130 @@
import os

from dotenv import load_dotenv

# Swarm imports
from swarms.structs.agent import Agent
from swarms.structs.hiearchical_swarm import (
    HierarchicalSwarm,
    SwarmSpec,
)
from swarms.utils.function_caller_model import OpenAIFunctionCaller

load_dotenv()

# ------------------------------------------------------------------------------
# Director LLM: Responsible for orchestrating tasks among the agents
# ------------------------------------------------------------------------------
llm = OpenAIFunctionCaller(
    base_model=SwarmSpec,
    api_key=os.getenv("OPENAI_API_KEY"),
    system_prompt=(
        "As the Director of this Hierarchical Agent Swarm, you are in charge of "
        "coordinating and overseeing all tasks, ensuring that each is executed "
        "efficiently and effectively by the appropriate agents. You must:\n\n"
        "1. **Analyze** the user's request and **formulate** a strategic plan.\n"
        "2. **Assign** tasks to the relevant agents, detailing **why** each task "
        "is relevant and **what** is expected in the deliverables.\n"
        "3. **Monitor** agent outputs and, if necessary, provide **constructive "
        "feedback** or request **clarifications**.\n"
        "4. **Iterate** this process until all tasks are completed to a high "
        "standard, or until the swarm has reached the maximum feedback loops.\n\n"
        "Remember:\n"
        "- **Only** use the agents provided; do not invent extra roles.\n"
        "- If you need additional information, request it from the user.\n"
        "- Strive to produce a clear, comprehensive **final output** that addresses "
        "the user's needs.\n"
        "- Keep the tone **professional** and **informative**. If there's uncertainty, "
        "politely request further details.\n"
        "- Ensure that any steps you outline are **actionable**, **logical**, and "
        "**transparent** to the user.\n\n"
        "Your effectiveness hinges on clarity, structured delegation, and thoroughness. "
        "Always focus on delivering the best possible outcome for the user's request."
    ),
    temperature=0.5,
    max_tokens=8196,
)


def main():
    # --------------------------------------------------------------------------
    # Agent: Stock-Analysis-Agent
    # --------------------------------------------------------------------------
    # This agent is responsible for:
    #  - Gathering and interpreting financial data
    #  - Identifying market trends and patterns
    #  - Providing clear, actionable insights or recommendations
    # --------------------------------------------------------------------------
    analysis_agent = Agent(
        agent_name="Stock-Analysis-Agent",
        model_name="gpt-4o",
        max_loops=1,
        interactive=False,
        streaming_on=False,
        system_prompt=(
            "As the Stock Analysis Agent, your primary responsibilities include:\n\n"
            "1. **Market Trend Analysis**: Evaluate current and historical market data "
            "to identify trends, patterns, and potential investment opportunities.\n"
            "2. **Risk & Opportunity Assessment**: Pinpoint specific factors—whether "
            "macroeconomic indicators, sector-specific trends, or company fundamentals—"
            "that can guide informed investment decisions.\n"
            "3. **Reporting & Recommendations**: Present your findings in a structured, "
            "easy-to-understand format, offering actionable insights. Include potential "
            "caveats or uncertainties in your assessment.\n\n"
            "Operational Guidelines:\n"
            "- If additional data or clarifications are needed, explicitly request them "
            "from the Director.\n"
            "- Keep your output **concise** yet **comprehensive**. Provide clear "
            "rationales for each recommendation.\n"
            "- Clearly state any **assumptions** or **limitations** in your analysis.\n"
            "- Remember: You are not a financial advisor, and final decisions rest with "
            "the user. Include necessary disclaimers.\n\n"
            "Goal:\n"
            "Deliver high-quality, well-substantiated stock market insights that can be "
            "used to guide strategic investment decisions."
        ),
    )

    # --------------------------------------------------------------------------
    # Hierarchical Swarm Setup
    # --------------------------------------------------------------------------
    # - Director: llm
    # - Agents: [analysis_agent]
    # - max_loops: Maximum number of feedback loops between director & agents
    # --------------------------------------------------------------------------
    swarm = HierarchicalSwarm(
        name="HierarchicalAgentSwarm",
        description=(
            "A specialized swarm in which the Director delegates tasks to a Stock "
            "Analysis Agent for thorough market evaluation."
        ),
        director=llm,
        agents=[analysis_agent],
        max_loops=2,  # Limit on feedback iterations
    )

    # --------------------------------------------------------------------------
    # Execution
    # --------------------------------------------------------------------------
    # The director receives the user's instruction: "Ask the stock analysis agent
    # to analyze the stock market." The Director will then:
    #  1. Formulate tasks (SwarmSpec)
    #  2. Assign tasks to the Stock-Analysis-Agent
    #  3. Provide feedback and/or request clarifications
    #  4. Produce a final response
    # --------------------------------------------------------------------------
    user_request = (
        "Please provide an in-depth analysis of the current stock market, "
        "focusing on:\n"
        "- Key macroeconomic factors affecting market momentum.\n"
        "- Potential short-term vs. long-term opportunities.\n"
        "- Sector performance trends (e.g., technology, healthcare, energy).\n"
        "Highlight any risks, disclaimers, or uncertainties."
    )

    # Run the swarm with the user_request
    print(swarm.run(user_request))


if __name__ == "__main__":
    main()

@ -0,0 +1,219 @@
import os

from dotenv import load_dotenv

# Swarm imports
from swarms.structs.agent import Agent
from swarms.structs.hiearchical_swarm import (
    HierarchicalSwarm,
    SwarmSpec,
)
from swarms.utils.function_caller_model import OpenAIFunctionCaller

load_dotenv()

# ------------------------------------------------------------------------------
# Trading Director: Responsible for orchestrating tasks among multiple stock analysts
# ------------------------------------------------------------------------------
director_llm = OpenAIFunctionCaller(
    base_model=SwarmSpec,
    api_key=os.getenv("OPENAI_API_KEY"),
    system_prompt=(
        "You are the Trading Director in charge of coordinating a team of specialized "
        "Stock Analysts. Your responsibilities include:\n\n"
        "1. **Analyze** the user's request and **break it down** into actionable tasks.\n"
        "2. **Assign** tasks to the relevant analysts, explaining **why** each task is "
        "important and **what** each analyst should deliver.\n"
        "3. **Review** all analyst outputs, providing **feedback** or **clarifications** "
        "to ensure thoroughness and accuracy.\n"
        "4. **Consolidate** final insights into a cohesive, actionable, and "
        "easy-to-understand response for the user.\n\n"
        "Guidelines:\n"
        "- You can only delegate to the analysts assigned to this swarm.\n"
        "- If essential data or clarifications are needed, request them from the user.\n"
        "- Be direct, structured, and analytical. Present each key point clearly.\n"
        "- Strive for a polished **final output** that addresses the user's request.\n"
        "- If uncertainties remain, politely highlight them or request more info.\n\n"
        "Overarching Goal:\n"
        "Maximize the value of insights provided to the user by thoroughly leveraging "
        "each analyst's specialization, while maintaining a professional and "
        "transparent communication style."
    ),
    temperature=0.5,
    max_tokens=8196,
)


def main():
    # --------------------------------------------------------------------------
    # Agent 1: Macro-Economic-Analysis-Agent
    # --------------------------------------------------------------------------
    # Focus: Assess macroeconomic factors like inflation, interest rates, GDP growth, etc.
    # --------------------------------------------------------------------------
    macro_agent = Agent(
        agent_name="Macro-Economic-Analysis-Agent",
        model_name="gpt-4o",
        max_loops=1,
        interactive=False,
        streaming_on=False,
        system_prompt=(
            "As the Macro-Economic Analysis Agent, your mission is to:\n\n"
            "1. **Identify** the key macroeconomic indicators impacting the market.\n"
            "2. **Interpret** how factors like inflation, interest rates, and fiscal "
            "policies influence market sentiment.\n"
            "3. **Connect** these insights to specific investment opportunities or "
            "risks across various sectors.\n\n"
            "Guidelines:\n"
            "- Provide clear, data-driven rationales.\n"
            "- Highlight potential global events or policy decisions that may shift "
            "market conditions.\n"
            "- Request further details if needed, and state any assumptions or "
            "limitations.\n\n"
            "Outcome:\n"
            "Deliver a concise but thorough macroeconomic overview that the Trading "
            "Director can combine with other analyses to inform strategy."
        ),
    )

    # --------------------------------------------------------------------------
    # Agent 2: Sector-Performance-Analysis-Agent
    # --------------------------------------------------------------------------
    # Focus: Drill down into sector-level trends, e.g., technology, healthcare, energy, etc.
    # --------------------------------------------------------------------------
    sector_agent = Agent(
        agent_name="Sector-Performance-Analysis-Agent",
        model_name="gpt-4o",
        max_loops=1,
        interactive=False,
        streaming_on=False,
        system_prompt=(
            "As the Sector Performance Analysis Agent, your responsibilities are:\n\n"
            "1. **Evaluate** recent performance trends across key sectors—technology, "
            "healthcare, energy, finance, and more.\n"
            "2. **Identify** sector-specific drivers (e.g., regulatory changes, "
            "consumer demand shifts, innovation trends).\n"
            "3. **Highlight** which sectors may offer short-term or long-term "
            "opportunities.\n\n"
            "Guidelines:\n"
            "- Focus on factual, data-backed observations.\n"
            "- Cite any significant indicators or company-level news that might affect "
            "the sector broadly.\n"
            "- Clarify the confidence level of your sector outlook and note any "
            "uncertainties.\n\n"
            "Outcome:\n"
            "Provide the Trading Director with actionable insights into sector-level "
            "momentum and potential investment focal points."
        ),
    )

    # --------------------------------------------------------------------------
    # Agent 3: Technical-Analysis-Agent
    # --------------------------------------------------------------------------
    # Focus: Evaluate price action, volume, and chart patterns to guide short-term
    # trading strategies.
    # --------------------------------------------------------------------------
    technical_agent = Agent(
        agent_name="Technical-Analysis-Agent",
        model_name="gpt-4o",
        max_loops=1,
        interactive=False,
        streaming_on=False,
        system_prompt=(
            "As the Technical Analysis Agent, you specialize in interpreting price "
            "charts, volume trends, and indicators (e.g., RSI, MACD) to gauge short-term "
            "momentum. Your tasks:\n\n"
            "1. **Examine** current market charts for significant breakouts, support/resistance "
            "levels, or technical signals.\n"
            "2. **Identify** short-term trading opportunities or risks based on "
            "technically-driven insights.\n"
            "3. **Discuss** how these patterns align with or contradict fundamental "
            "or macro perspectives.\n\n"
            "Guidelines:\n"
            "- Keep explanations accessible, avoiding excessive jargon.\n"
            "- Point out levels or patterns that traders commonly monitor.\n"
            "- Use disclaimers if there is insufficient data or conflicting signals.\n\n"
            "Outcome:\n"
            "Supply the Trading Director with technical viewpoints to complement broader "
            "macro and sector analysis, supporting timely trading decisions."
        ),
    )

    # --------------------------------------------------------------------------
    # Agent 4: Risk-Analysis-Agent
    # --------------------------------------------------------------------------
    # Focus: Evaluate risk factors and potential uncertainties, providing disclaimers and
    # suggesting mitigations.
    # --------------------------------------------------------------------------
    risk_agent = Agent(
        agent_name="Risk-Analysis-Agent",
        model_name="gpt-4o",
        max_loops=1,
        interactive=False,
        streaming_on=False,
        system_prompt=(
            "As the Risk Analysis Agent, your role is to:\n\n"
            "1. **Identify** key risks and uncertainties—regulatory, geopolitical, "
            "currency fluctuations, etc.\n"
            "2. **Assess** how these risks could impact investor sentiment or portfolio "
            "volatility.\n"
            "3. **Recommend** risk mitigation strategies or cautionary steps.\n\n"
            "Guidelines:\n"
            "- Present both systemic (market-wide) and idiosyncratic (company/sector) risks.\n"
            "- Be transparent about unknowns or data gaps.\n"
            "- Provide disclaimers on market unpredictability.\n\n"
            "Outcome:\n"
            "Offer the Trading Director a detailed risk framework that helps balance "
            "aggressive and defensive positions."
        ),
    )

    # --------------------------------------------------------------------------
    # Hierarchical Swarm Setup
    # --------------------------------------------------------------------------
    # - Director: director_llm
    # - Agents: [macro_agent, sector_agent, technical_agent, risk_agent]
    # - max_loops: Up to 2 feedback loops between director and agents
    # --------------------------------------------------------------------------
    swarm = HierarchicalSwarm(
        name="HierarchicalStockAnalysisSwarm",
        description=(
            "A specialized swarm consisting of a Trading Director overseeing four "
            "Stock Analysts, each focusing on Macro, Sector, Technical, and Risk "
            "perspectives."
        ),
        director=director_llm,
        agents=[
            macro_agent,
            sector_agent,
            technical_agent,
            risk_agent,
        ],
        max_loops=2,  # Limit on feedback iterations
    )

    # --------------------------------------------------------------------------
    # Execution
    # --------------------------------------------------------------------------
    # Example user request for the entire team:
    #  1. Discuss key macroeconomic factors (inflation, interest rates, etc.)
    #  2. Analyze sector-level performance (technology, healthcare, energy).
    #  3. Give short-term technical signals and levels to watch.
    #  4. Outline major risks or uncertainties.
    # --------------------------------------------------------------------------
    user_request = (
        "Please provide a comprehensive analysis of the current stock market, "
        "covering:\n"
        "- Key macroeconomic drivers affecting market momentum.\n"
        "- Which sectors seem likely to outperform in the near vs. long term.\n"
        "- Any notable technical signals or price levels to monitor.\n"
        "- Potential risks or uncertainties that might disrupt market performance.\n"
        "Include clear disclaimers about the limitations of these analyses. "
        "Call the risk analysis agent only."
    )

    # Run the swarm with the user_request
    final_output = swarm.run(user_request)
    print(final_output)


if __name__ == "__main__":
    main()

@ -0,0 +1,9 @@
from swarms import Agent

Agent(
    agent_name="Stock-Analysis-Agent",
    model_name="groq/deepseek-r1-distill-llama-70b",
    max_loops="auto",
    interactive=True,
    streaming_on=False,
).run("What are the best ways to analyze macroeconomic data?")

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "7.0.4"
description = "Swarms - TGSC"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]

@ -84,6 +84,7 @@ from swarms.structs.utils import (
from swarms.structs.meme_agent_persona_generator import (
    MemeAgentGenerator,
)
from swarms.structs.model_router import ModelRouter

__all__ = [
    "Agent",

@ -160,4 +161,5 @@ __all__ = [
    "expertise_based",
    "MultiAgentRouter",
    "MemeAgentGenerator",
    "ModelRouter",
]

@ -563,7 +563,9 @@ class Agent:
    max_loops=self.max_loops,
    steps=self.short_memory.to_dict(),
    full_history=self.short_memory.get_str(),
    total_tokens=count_tokens(
        text=self.short_memory.get_str()
    ),
    stopping_token=self.stopping_token,
    interactive=self.interactive,
    dynamic_temperature_enabled=self.dynamic_temperature_enabled,

@ -4,9 +4,7 @@ from typing import List
from pydantic import BaseModel, Field

from swarms.structs.agent import Agent
from swarms.utils.function_caller_model import OpenAIFunctionCaller
from swarms.structs.swarm_router import SwarmRouter
from swarms.utils.loguru_logger import initialize_logger
from swarms.structs.agents_available import showcase_available_agents

@ -1,576 +1,292 @@
from typing import List, Any from typing import Any, List, Optional, Union
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from swarms.utils.loguru_logger import initialize_logger
from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.agent import Agent from swarms.structs.agent import Agent
from swarms.structs.concat import concat_strings from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.agent_registry import AgentRegistry
from swarms.structs.conversation import Conversation from swarms.structs.conversation import Conversation
from swarms.utils.formatter import formatter
logger = initialize_logger(log_folder="hiearchical_swarm") from swarms.utils.loguru_logger import initialize_logger
# Example usage:
HIEARCHICAL_AGENT_SYSTEM_PROMPT = """
Here's a full-fledged system prompt for a director boss agent, complete with instructions and many-shot examples:
---
**System Prompt: Director Boss Agent**
### Role:
You are a Director Boss Agent responsible for orchestrating a swarm of worker agents. Your primary duty is to serve the user efficiently, effectively, and skillfully. You dynamically create new agents when necessary or utilize existing agents, assigning them tasks that align with their capabilities. You must ensure that each agent receives clear, direct, and actionable instructions tailored to their role.
### Key Responsibilities:
1. **Task Delegation:** Assign tasks to the most relevant agent. If no relevant agent exists, create a new one with an appropriate name and system prompt.
2. **Efficiency:** Ensure that tasks are completed swiftly and with minimal resource expenditure.
3. **Clarity:** Provide orders that are simple, direct, and actionable. Avoid ambiguity.
4. **Dynamic Decision Making:** Assess the situation and choose the most effective path, whether that involves using an existing agent or creating a new one.
5. **Monitoring:** Continuously monitor the progress of each agent and provide additional instructions or corrections as necessary.
### Instructions:
- **Identify the Task:** Analyze the input task to determine its nature and requirements.
- **Agent Selection/Creation:**
- If an agent is available and suited for the task, assign the task to that agent.
- If no suitable agent exists, create a new agent with a relevant system prompt.
- **Task Assignment:** Provide the selected agent with explicit and straightforward instructions.
- **Reasoning:** Justify your decisions when selecting or creating agents, focusing on the efficiency and effectiveness of task completion.
"""
class AgentSpec(BaseModel):
"""
A class representing the specifications of an agent.
Attributes:
agent_name (str): The name of the agent.
system_prompt (str): The system prompt for the agent.
agent_description (str): The description of the agent.
max_tokens (int): The maximum number of tokens to generate in the API response.
temperature (float): A parameter that controls the randomness of the generated text.
context_window (int): The context window for the agent.
task (str): The main task for the agent.
"""
agent_name: str = Field(
...,
description="The name of the agent.",
)
system_prompt: str = Field(
...,
description="The system prompt for the agent. Write an extremely detailed system prompt for the agent.",
)
agent_description: str = Field(
...,
description="The description of the agent.",
)
task: str = Field(
...,
description="The main task for the agent.",
)
# class AgentTeam(BaseModel): logger = initialize_logger(log_folder="hierarchical_swarm")
# agents: List[AgentSpec] = Field(
# ...,
# description="The list of agents in the team",
# )
# flow: str = Field(
# ...,
# description="Agent Name -> ",
# )
# Schema to send orders to the agents class HierarchicalOrder(BaseModel):
class HierarchicalOrderCall(BaseModel):
agent_name: str = Field( agent_name: str = Field(
..., ...,
description="The name of the agent to assign the task to.", description="Specifies the name of the agent to which the task is assigned. This is a crucial element in the hierarchical structure of the swarm, as it determines the specific agent responsible for the task execution.",
) )
task: str = Field( task: str = Field(
..., ...,
description="The main specific task to be assigned to the agent. Be very specific and direct.", description="Defines the specific task to be executed by the assigned agent. This task is a key component of the swarm's plan and is essential for achieving the swarm's goals.",
) )
# For not agent creation class SwarmSpec(BaseModel):
class CallTeam(BaseModel): goals: str = Field(
# swarm_name: str = Field(
# ...,
# description="The name of the swarm: e.g., 'Marketing Swarm' or 'Finance Swarm'",
# )
rules: str = Field(
..., ...,
description="The rules for all the agents in the swarm: e.g., All agents must return code. Be very simple and direct", description="The goal of the swarm. This is the overarching objective that the swarm is designed to achieve. It guides the swarm's plan and the tasks assigned to the agents.",
) )
plan: str = Field( plan: str = Field(
..., ...,
description="The plan for the swarm: e.g., First create the agents, then assign tasks, then monitor progress", description="Outlines the sequence of actions to be taken by the swarm. This plan is a detailed roadmap that guides the swarm's behavior and decision-making.",
) )
orders: List[HierarchicalOrderCall]
class SwarmSpec(BaseModel):
"""
A class representing the specifications of a swarm of agents.
Attributes:
multiple_agents (List[AgentSpec]): The list of agents in the swarm.
"""
swarm_name: str = Field(
...,
description="The name of the swarm: e.g., 'Marketing Swarm' or 'Finance Swarm'",
)
multiple_agents: List[AgentSpec]
rules: str = Field( rules: str = Field(
..., ...,
description="The rules for all the agents in the swarm: e.g., All agents must return code. Be very simple and direct", description="Defines the governing principles for swarm behavior and decision-making. These rules are the foundation of the swarm's operations and ensure that the swarm operates in a coordinated and efficient manner.",
) )
plan: str = Field( orders: List[HierarchicalOrder] = Field(
..., ...,
description="The plan for the swarm: e.g., First create the agents, then assign tasks, then monitor progress", description="A collection of task assignments to specific agents within the swarm. These orders are the specific instructions that guide the agents in their task execution and are a key element in the swarm's plan.",
) )
class HierarchicalAgentSwarm(BaseSwarm): class HierarchicalSwarm(BaseSwarm):
""" """
A class to create and manage a hierarchical swarm of agents. Represents a hierarchical swarm of agents, with a director that orchestrates tasks among the agents.
Methods:
__init__(system_prompt, max_tokens, temperature, base_model, parallel_tool_calls): Initializes the function caller.
create_agent(agent_name, system_prompt, agent_description, max_tokens, temperature, context_window): Creates an individual agent.
parse_json_for_agents_then_create_agents(function_call): Parses a JSON function call to create multiple agents.
run(task): Runs the function caller to create and execute agents based on the provided task.
""" """
def __init__( def __init__(
self, self,
name: str = "HierarchicalAgentSwarm", name: str = "HierarchicalAgentSwarm",
description: str = "A swarm of agents that can be used to distribute tasks to a team of agents.", description: str = "Distributed task swarm",
director: Any = None, director: Optional[Union[Agent, Any]] = None,
agents: List[Agent] = None, agents: List[Union[Agent, Any]] = None,
max_loops: int = 1, max_loops: int = 1,
create_agents_on: bool = False, return_all_history: bool = False,
template_worker_agent: Agent = None,
director_planning_prompt: str = None,
template_base_worker_llm: Any = None,
swarm_history: str = None,
*args, *args,
**kwargs, **kwargs,
): ):
""" """
Initializes the HierarchicalAgentSwarm with an OpenAIFunctionCaller. Initializes the HierarchicalSwarm with the given parameters.
Args: :param name: The name of the swarm.
system_prompt (str): The system prompt for the function caller. :param description: A description of the swarm.
max_tokens (int): The maximum number of tokens to generate in the API response. :param director: The director agent that orchestrates tasks.
temperature (float): The temperature setting for text generation. :param agents: A list of agents within the swarm.
base_model (BaseModel): The base model for the function caller. :param max_loops: The maximum number of feedback loops between the director and agents.
parallel_tool_calls (bool): Whether to run tool calls in parallel. :param return_all_history: A flag indicating whether to return all conversation history.
""" """
super().__init__( super().__init__(
name=name, name=name,
description=description, description=description,
agents=agents, agents=agents,
*args,
**kwargs,
) )
self.name = name
self.description = description
self.director = director self.director = director
self.agents = agents self.agents = agents
self.max_loops = max_loops self.max_loops = max_loops
self.create_agents_on = create_agents_on self.return_all_history = return_all_history
self.template_worker_agent = template_worker_agent self.conversation = Conversation(time_enabled=True)
self.director_planning_prompt = director_planning_prompt
self.template_base_worker_llm = template_base_worker_llm
self.swarm_history = swarm_history
# Check if the agents are set self.add_name_and_description()
self.agents_check()
# Agent Registry self.check_agents()
self.agent_registry = AgentRegistry()
# Add agents to the registry formatter.print_panel(self.list_all_agents())
self.add_agents_into_registry(self.agents)
# Swarm History def check_agents(self):
self.conversation = Conversation(time_enabled=True) """
Checks if there are any agents and a director set for the swarm.
Raises ValueError if either condition is not met.
"""
if not self.agents:
raise ValueError("No agents found")
self.swarm_history = ( if not self.director:
self.conversation.return_history_as_string() raise ValueError("Director not set")
)
def agents_check(self): def run_director(
if self.director is None: self, task: str, img: str = None, *args, **kwargs
raise ValueError("The director is not set.") ):
"""
Runs a task through the director agent.
if len(self.agents) == 0: :param task: The task to be executed by the director.
self.create_agents_on = True :param img: Optional image to be used with the task.
:return: The output of the director's task execution.
"""
logger.info(f"Running director task: {task}")
if len(self.agents) > 0: self.conversation.add(role="User", content=f"Task: {task}")
self.director.base_model = CallTeam
self.director.system_prompt = ( function_call = self.director.run(
HIEARCHICAL_AGENT_SYSTEM_PROMPT task=f"History: {self.conversation.get_str()} Your Task: {task}",
) )
if self.max_loops == 0: formatter.print_panel(f"Director Output: {function_call}")
raise ValueError("The max_loops is not set.")
def add_agents_into_registry(self, agents: List[Agent]):
"""
add_agents_into_registry: Add agents into the agent registry.
Args:
agents (List[Agent]): A list of agents to add into the registry.
Returns: return function_call
None
def run(self, task: str, img: str = None, *args, **kwargs) -> str:
""" """
for agent in agents: Runs a task through the swarm, involving the director and agents.
self.agent_registry.add(agent)
def create_agent( :param task: The task to be executed by the swarm.
self, :param img: Optional image to be used with the task.
agent_name: str, :return: The output of the swarm's task execution.
system_prompt: str,
agent_description: str,
task: str = None,
) -> str:
""" """
Creates an individual agent. self.conversation.add(role="User", content=f"Task: {task}")
Args:
agent_name (str): The name of the agent.
system_prompt (str): The system prompt for the agent.
agent_description (str): The description of the agent.
max_tokens (int): The maximum number of tokens to generate.
temperature (float): The temperature for text generation.
context_window (int): The context window size for the agent.
Returns:
Agent: An instantiated agent object.
"""
# name = agent_name.replace(" ", "_")
logger.info(f"Creating agent: {agent_name}")
agent_name = Agent(
agent_name=agent_name,
llm=self.template_base_worker_llm, # Switch to model router here later
system_prompt=system_prompt,
agent_description=agent_description,
retry_attempts=1,
verbose=False,
dashboard=False,
)
self.agents.append(agent_name) try:
task = self.conversation.get_str()
logger.info(f"Running agent: {agent_name} on task: {task}")
output = agent_name.run(task)
self.conversation.add(role=agent_name, content=output)
return output
def parse_json_for_agents_then_create_agents( for i in range(self.max_loops):
self, function_call: dict function_call = self.run_director(task=task)
) -> List[Agent]:
"""
Parses a JSON function call to create a list of agents.
Args: self.parse_orders(function_call)
function_call (dict): The JSON function call specifying the agents.
Returns: if self.return_all_history:
List[Agent]: A list of created agent objects. return self.conversation.get_str()
"""
responses = []
logger.info("Parsing JSON for agents")
if self.create_agents_on:
for agent in function_call["multiple_agents"]:
out = self.create_agent(
agent_name=agent["agent_name"],
system_prompt=agent["system_prompt"],
agent_description=agent["agent_description"],
task=agent["task"],
)
responses.append(out)
else: else:
for agent in function_call["orders"]: return self.conversation.get_str()
out = self.run_worker_agent( except Exception as e:
name=agent["agent_name"], logger.error(f"Error running hierarchical swarm: {e}")
task=agent["task"], return "Error running hierarchical swarm"
)
responses.append(out)
return concat_strings(responses)
def run(self, task: str) -> str: def add_name_and_description(self):
""" """
Runs the function caller to create and execute agents based on the provided task. Adds the swarm's name and description to the conversation.
Args:
task (str): The task for which the agents need to be created and executed.
Returns:
List[Agent]: A list of created agent objects.
""" """
logger.info("Running the swarm")
# Run the function caller to output JSON function call
function_call = self.model.run(task)
# Add the function call to the conversation
self.conversation.add( self.conversation.add(
role="Director", content=str(function_call) role="User",
content=f"\n Swarm Name: {self.name} \n Swarm Description: {self.description}",
) )
# Logging the function call with metrics and details def list_all_agents(self) -> str:
self.log_director_function_call(function_call)
# # Parse the JSON function call and create agents -> run Agents
return self.parse_json_for_agents_then_create_agents(
function_call
)
def run_new(self, task: str):
""" """
Runs the function caller to create and execute agents based on the provided task. Lists all agents available in the swarm.
Args:
task (str): The task for which the agents need to be created and executed.
Returns: :return: A string representation of all agents in the swarm.
List[Agent]: A list of created agent objects.
""" """
logger.info("Running the swarm") all_agents = "\n".join(
f"Agent: {agent.agent_name}, Description: {agent.system_prompt} "
# Run the function caller to output JSON function call for agent in self.agents
function_call = self.model.run(task)
self.conversation.add(
role="Director", content=str(function_call)
) )
# Logging the function call with metrics and details
self.log_director_function_call(function_call)
if self.create_agents_on:
# Create agents from the function call
self.create_agents_from_func_call(function_call)
# Now submit orders to the agents
self.director.base_model = CallTeam
orders_prompt = f"Now, the agents have been created. Submit orders to the agents to enable them to complete the task: {task}: {self.list_agents_available()}"
orders = self.director.run(orders_prompt)
self.conversation.add( self.conversation.add(
role="Director", content=str(orders_prompt + orders) role="User",
content=f"All Agents Available in the Swarm {self.name}: \n {all_agents}",
) )
# Check the type of the response return all_agents
orders = self.check_agent_output_type(orders)
# Distribute the orders to the agents def find_agent(self, name: str) -> Optional[Agent]:
return self.distribute_orders_to_agents(orders) """
Finds an agent by its name within the swarm.
def check_agent_output_type(self, response: Any): :param name: The name of the agent to find.
if isinstance(response, dict): :return: The agent if found, otherwise None.
return response """
if isinstance(response, str): for agent in self.agents:
return eval(response) if agent.agent_name == name:
else: return agent
return response return None
def distribute_orders_to_agents(self, order_dict: dict) -> str: def run_agent(self, agent_name: str, task: str, img: str = None):
# Now we need to parse the CallTeam object """
# and distribute the orders to the agents Runs a task through a specific agent.
responses = []
for order in order_dict["orders"]: :param agent_name: The name of the agent to execute the task.
agent_name = order["agent_name"] :param task: The task to be executed by the agent.
task = order["task"] :param img: Optional image to be used with the task.
:return: The output of the agent's task execution.
"""
try:
agent = self.find_agent(agent_name)
# Find and run the agent if agent:
response = self.run_worker_agent( out = agent.run(
name=agent_name, task=task task=f"History: {self.conversation.get_str()} Your Task: {task}",
img=img,
) )
log = f"Agent: {agent_name} completed task: {task} with response: {response}"
self.conversation.add( self.conversation.add(
role=agent_name, content=task + response role=agent_name,
content=out,
) )
responses.append(log)
logger.info(log)
return concat_strings(responses)
def create_single_agent( return out
self, name: str, system_prompt: str, description else:
) -> Agent: logger.error(
""" f"Agent {agent_name} not found in the swarm {self.name}"
Create a single agent from the agent specification.
Args:
agent_spec (dict): The agent specification.
Returns:
Agent: The created agent.
"""
# Unwrap all of the agent specifications
# agent_name = agent_spec["agent_name"]
# system_prompt = agent_spec["system_prompt"]
# agent_description = agent_spec["agent_description"]
# Create the agent
agent_name = Agent(
agent_name=name,
llm=self.template_base_worker_llm, # Switch to model router here later
system_prompt=system_prompt,
agent_description=description,
max_loops=1,
retry_attempts=1,
verbose=False,
dashboard=False,
) )
except Exception as e:
logger.error(f"Error running agent {agent_name}: {e}")
return "Error running agent"
# Add agents into the registry def parse_orders(self, orders: SwarmSpec) -> None:
self.agents.append(agent_name)
return agent_name
    def create_agents_from_func_call(self, function_call: dict):
        """
        Create agents from the function call.

        Args:
            function_call (dict): The function call containing the agent specifications.

        Returns:
            List[Agent]: A list of created agents.
        """
        logger.info("Creating agents from the function call")
        for agent_spec in function_call["multiple_agents"]:
            agent = self.create_single_agent(
                name=agent_spec["agent_name"],
                system_prompt=agent_spec["system_prompt"],
                description=agent_spec["agent_description"],
            )

            logger.info(
                f"Created agent: {agent.agent_name} with description: {agent.description}"
            )

            self.agents.append(agent)

    def plan(self, task: str) -> str:
        """
        Plans the tasks for the agents in the swarm.

        Args:
            task (str): The task to be planned.

        Returns:
            str: The planned task for the agents.
        """
        logger.info("Director is planning the task")
        self.director.system_prompt = self.director_planning_prompt

    def log_director_function_call(self, function_call: dict):
        # Log the agents the boss makes
        logger.info(f"Swarm Name: {function_call['swarm_name']}")
        # Log the plan
        logger.info(f"Plan: {function_call['plan']}")
        logger.info(
            f"Number of agents: {len(function_call['multiple_agents'])}"
        )

        for agent in function_call["multiple_agents"]:
            logger.info(f"Agent: {agent['agent_name']}")
            # logger.info(f"Task: {agent['task']}")
            logger.info(f"Description: {agent['agent_description']}")

    def run_worker_agent(
        self, name: str = None, task: str = None, *args, **kwargs
    ):
        """
        Run the worker agent.

        Args:
            name (str): The name of the worker agent.
            task (str): The task to send to the worker agent.

        Returns:
            str: The response from the worker agent.

        Raises:
            Exception: If an error occurs while running the worker agent.
        """
        try:
            # Find the agent by name
            agent = self.find_agent_by_name(name)

            # Run the agent
            response = agent.run(task, *args, **kwargs)

            return response
        except Exception as e:
            logger.error(f"Error: {e}")
            raise e

    def list_agents(self) -> str:
        logger.info("Listing agents available in the swarm")

        for agent in self.agents:
            name = agent.agent_name
            description = (
                agent.description or "No description available."
            )
            logger.info(f"Agent: {name}, Description: {description}")

    def list_agents_available(self):
        number_of_agents_available = len(self.agents)

        agent_list = "\n".join(
            [
                f"Agent {agent.agent_name}: Description {agent.description}"
                for agent in self.agents
            ]
        )

        prompt = f"""
        There are currently {number_of_agents_available} agents available in the swarm.

        Agents Available:
        {agent_list}
        """

        return prompt

    def find_agent_by_name(
        self, agent_name: str = None, *args, **kwargs
    ):
        """
        Finds an agent in the swarm by name.

        Args:
            agent_name (str): The name of the agent to find.

        Returns:
            Agent: The agent with the specified name, or None if not found.
        """
        for agent in self.agents:
            if agent.name == agent_name:
                return agent
        return None

    def parse_orders(self, orders: SwarmSpec) -> None:
        """
        Parses the orders from the SwarmSpec and executes them through the agents.

        :param orders: The SwarmSpec containing the orders to be parsed.
        """
        self.add_goal_and_more_in_conversation(orders)
        orders_list = self.parse_swarm_spec(orders)

        try:
            # Example of passing the parsed data to an agent
            for order in orders_list:
                out = self.run_agent(
                    agent_name=order.agent_name,
                    task=order.task,
                )

            return out
        except Exception as e:
            logger.error(f"Error parsing orders: {e}")
            return "Error parsing orders"

    def parse_swarm_spec(self, swarm_spec: SwarmSpec) -> None:
        """
        Parses the SwarmSpec to extract the orders.

        :param swarm_spec: The SwarmSpec to be parsed.
        :return: The list of orders extracted from the SwarmSpec.
        """
        orders_list = swarm_spec.orders
        print(orders_list)

        # return the orders_list
        return orders_list

    def provide_feedback(self, agent_name: str, out: str) -> None:
        """
        Provides feedback to an agent based on its output.

        :param agent_name: The name of the agent to provide feedback to.
        :param out: The output of the agent to base the feedback on.
        """
        orders = self.director.run(
            task=f"Provide feedback to {agent_name} on their output: {out}"
        )

        orders_list = self.parse_swarm_spec(orders)

        for order in orders_list:
            out = self.run_agent(
                agent_name=order.agent_name,
                task=order.task,
            )

        return out

    def add_goal_and_more_in_conversation(
        self, swarm_spec: SwarmSpec
    ) -> None:
        """
        Adds the swarm's goals, plan, and rules to the conversation.

        :param swarm_spec: The SwarmSpec containing the goals, plan, and rules.
        """
        goals = swarm_spec.goals
        plan = swarm_spec.plan
        rules = swarm_spec.rules

        self.conversation.add(
            role="Director",
            content=f"Goals: {goals}\nPlan: {plan}\nRules: {rules}",
        )
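For context on how the new director methods above fit together, here is a minimal, self-contained sketch of the orders flow. The `SwarmSpec` and `AgentOrder` models here are hypothetical stand-ins inferred from the attribute accesses in the methods (`orders`, `goals`, `plan`, `rules`, `agent_name`, `task`), not the framework's actual schema:

from typing import List

from pydantic import BaseModel


# Hypothetical stand-ins for the real swarms models, for illustration only.
class AgentOrder(BaseModel):
    agent_name: str
    task: str


class SwarmSpec(BaseModel):
    goals: str
    plan: str
    rules: str
    orders: List[AgentOrder]


spec = SwarmSpec(
    goals="Summarize the quarterly report",
    plan="Split the report into sections and assign one agent per section",
    rules="Each agent returns plain text under 200 words",
    orders=[
        AgentOrder(agent_name="Summarizer-1", task="Summarize section 1"),
        AgentOrder(agent_name="Summarizer-2", task="Summarize section 2"),
    ],
)

# The same loop parse_orders runs, with a print stub in place of run_agent.
for order in spec.orders:
    print(f"dispatch {order.task!r} -> {order.agent_name}")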

@@ -1,18 +1,7 @@
import json
import os
import subprocess
from typing import List

try:
    import openai
except ImportError:
    print(
        "OpenAI is not installed. Please install it using 'pip install openai'."
    )
    import sys

    subprocess.run([sys.executable, "-m", "pip", "install", "openai"])
    exit(1)

from dotenv import load_dotenv
from loguru import logger
@@ -20,137 +9,11 @@ from pydantic import BaseModel, Field
from swarms.structs.agent import Agent
from swarms.structs.swarm_router import SwarmRouter
from swarms.utils.function_caller_model import OpenAIFunctionCaller

load_dotenv()
class OpenAIFunctionCaller:
    """
    A class that represents a caller for OpenAI chat completions.

    Args:
        system_prompt (str): The system prompt to be used in the chat completion.
        model_name (str): The name of the OpenAI model to be used.
        max_tokens (int): The maximum number of tokens in the generated completion.
        temperature (float): The temperature parameter for randomness in the completion.
        base_model (BaseModel): The base model to be used for the completion.
        openai_api_key (str): The API key for accessing the OpenAI service.
        parallel_tool_calls (bool): Whether to make parallel tool calls.
        top_p (float): The top-p parameter for nucleus sampling in the completion.

    Attributes:
        system_prompt (str): The system prompt to be used in the chat completion.
        model_name (str): The name of the OpenAI model to be used.
        max_tokens (int): The maximum number of tokens in the generated completion.
        temperature (float): The temperature parameter for randomness in the completion.
        base_model (BaseModel): The base model to be used for the completion.
        parallel_tool_calls (bool): Whether to make parallel tool calls.
        top_p (float): The top-p parameter for nucleus sampling in the completion.
        client (openai.OpenAI): The OpenAI client for making API calls.

    Methods:
        check_api_key: Checks if the API key is provided and retrieves it from the environment if not.
        run: Runs the chat completion with the given task and returns the generated completion.
    """

    def __init__(
        self,
        system_prompt: str = None,
        model_name: str = "gpt-4o-2024-08-06",
        max_tokens: int = 4000,
        temperature: float = 0.4,
        base_model: BaseModel = None,
        openai_api_key: str = None,
        parallel_tool_calls: bool = False,
        top_p: float = 0.9,
        *args,
        **kwargs,
    ):
        super().__init__()
        self.system_prompt = system_prompt
        self.model_name = model_name
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.openai_api_key = openai_api_key
        self.base_model = base_model
        self.parallel_tool_calls = parallel_tool_calls
        self.top_p = top_p
        self.client = openai.OpenAI(api_key=self.check_api_key())

    def check_api_key(self) -> str:
        """
        Checks if the API key is provided and retrieves it from the environment if not.

        Returns:
            str: The API key.
        """
        self.openai_api_key = os.getenv("OPENAI_API_KEY")
        return self.openai_api_key

    def run(self, task: str, *args, **kwargs) -> dict:
        """
        Runs the chat completion with the given task and returns the generated completion.

        Args:
            task (str): The user's task for the chat completion.
            *args: Additional positional arguments to be passed to the OpenAI API.
            **kwargs: Additional keyword arguments to be passed to the OpenAI API.

        Returns:
            str: The generated completion.
        """
        try:
            completion = self.client.beta.chat.completions.parse(
                model=self.model_name,
                messages=[
                    {"role": "system", "content": self.system_prompt},
                    {"role": "user", "content": task},
                ],
                max_tokens=self.max_tokens,
                temperature=self.temperature,
                response_format=self.base_model,
                tools=(
                    [openai.pydantic_function_tool(self.base_model)]
                ),
                *args,
                **kwargs,
            )

            out = completion.choices[0].message.content
            return out
        except Exception as error:
            logger.error(
                f"Error in running OpenAI chat completion: {error}"
            )
            return None

    def convert_to_dict_from_base_model(
        self, base_model: BaseModel
    ) -> dict:
        return openai.pydantic_function_tool(base_model)

    def convert_list_of_base_models(
        self, base_models: List[BaseModel]
    ):
        """
        Converts a list of BaseModels to a list of dictionaries.

        Args:
            base_models (List[BaseModel]): A list of BaseModels to be converted.

        Returns:
            List[Dict]: A list of dictionaries representing the converted BaseModels.
        """
        return [
            self.convert_to_dict_from_base_model(base_model)
            for base_model in base_models
        ]
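The inline class deleted above is replaced by the shared utility imported at the top of this hunk. A minimal sketch of the replacement call path, using the constructor signature of `swarms.utils.function_caller_model.OpenAIFunctionCaller` as it appears later in this diff; the `MemePersona` schema is a hypothetical placeholder:

from pydantic import BaseModel

from swarms.utils.function_caller_model import OpenAIFunctionCaller


class MemePersona(BaseModel):  # hypothetical schema, for illustration
    name: str
    catchphrase: str


caller = OpenAIFunctionCaller(
    system_prompt="Generate a meme agent persona as structured data.",
    base_model=MemePersona,
    temperature=0.1,
    max_tokens=1000,
)
persona = caller.run("Create a persona for a stock-market meme agent")
print(persona)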
class MemeAgentConfig(BaseModel):
    """Configuration for an individual meme agent in a swarm"""

@@ -0,0 +1,376 @@
import asyncio
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Optional
from pydantic import BaseModel, Field
from swarms.utils.function_caller_model import OpenAIFunctionCaller
from swarms.utils.any_to_str import any_to_str
from swarms.utils.formatter import formatter
from swarms.utils.litellm_wrapper import LiteLLM
model_recommendations = {
    "gpt-4o": {
        "description": "Fast and efficient for simple tasks and general queries",
        "best_for": [
            "Simple queries",
            "Basic text generation",
            "Quick responses",
            "Everyday tasks",
        ],
        "provider": "openai",
    },
    "gpt-4-turbo": {
        "description": "Latest GPT-4 model with improved capabilities and knowledge cutoff",
        "best_for": [
            "Complex tasks",
            "Up-to-date knowledge",
            "Long context understanding",
        ],
        "provider": "openai",
    },
    "gpt-3.5-turbo": {
        "description": "Fast and cost-effective model for straightforward tasks",
        "best_for": [
            "Chat applications",
            "Content generation",
            "Basic assistance",
        ],
        "provider": "openai",
    },
    "o3-mini": {
        "description": "Lightweight model good for basic tasks with lower compute requirements",
        "best_for": [
            "Basic text completion",
            "Simple classification",
            "Resource-constrained environments",
        ],
        "provider": "groq",
    },
    "deepseek-reasoner": {
        "description": "Specialized for complex reasoning and analytical tasks",
        "best_for": [
            "Complex problem solving",
            "Logical reasoning",
            "Mathematical analysis",
            "High IQ tasks",
        ],
        "provider": "deepseek",
    },
    "claude-3-5-sonnet": {
        "description": "Well-rounded model with strong reasoning and creativity",
        "best_for": [
            "Creative writing",
            "Detailed analysis",
            "Nuanced responses",
        ],
        "provider": "anthropic",
    },
    "claude-3-opus": {
        "description": "Most capable Claude model with enhanced reasoning and analysis",
        "best_for": [
            "Research",
            "Complex analysis",
            "Technical writing",
            "Code generation",
        ],
        "provider": "anthropic",
    },
    "gemini-pro": {
        "description": "Google's advanced model with strong general capabilities",
        "best_for": [
            "Multimodal tasks",
            "Code generation",
            "Creative content",
        ],
        "provider": "google",
    },
    "mistral-large": {
        "description": "Open source model with strong performance across tasks",
        "best_for": [
            "General purpose tasks",
            "Code assistance",
            "Content generation",
        ],
        "provider": "mistral",
    },
}

providers = {
    "openai": "Primary provider for GPT models",
    "groq": "High-performance inference provider",
    "anthropic": "Provider of Claude models",
    "google": "Provider of PaLM and Gemini models",
    "azure": "Cloud platform for various model deployments",
    "deepseek": "Provider of specialized reasoning models",
    "mistral": "Provider of open source and commercial language models",
}
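# Illustrative sketch (not part of the module): filtering the table above
# by a capability keyword in each entry's "best_for" list.
def _models_for(capability: str) -> list:
    return [
        name
        for name, info in model_recommendations.items()
        if any(
            capability.lower() in item.lower()
            for item in info["best_for"]
        )
    ]

# _models_for("code") -> ["claude-3-opus", "gemini-pro", "mistral-large"]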
class ModelOutput(BaseModel):
    rationale: Optional[str]
    model: Optional[str]
    provider: Optional[str]
    task: Optional[str] = Field(
        description="The task to be executed for the model. It should be a clear, concise, and detailed task that the model can execute. It should only include the details of the task, not the reasoning, rationale, model, provider, or anything else."
    )
    max_tokens: Optional[int] = Field(
        description="The maximum number of tokens to use for the model"
    )
    temperature: Optional[float] = Field(
        description="The temperature to use for the model"
    )
    system_prompt: Optional[str] = Field(
        description="The system prompt to use for the model. Leverage the best techniques to make the model perform the best. Make sure the prompt is clear, extensive, and detailed."
    )
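# Illustrative sketch (not part of the module): a routing decision the
# function caller might parse into ModelOutput, with made-up values.
# example = ModelOutput(
#     rationale="Complex reasoning task; an analytical model fits best.",
#     model="deepseek-reasoner",
#     provider="deepseek",
#     task="Prove that the sum of two even integers is even.",
#     max_tokens=1024,
#     temperature=0.2,
#     system_prompt="You are a careful mathematical reasoner.",
# )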
providers = any_to_str(providers)
model_recommendations = any_to_str(model_recommendations)
data = f"Providers: {providers}\nModel Recommendations: {model_recommendations}"
model_router_system_prompt = f"""
You are an expect model router responsible for recommending the optimal AI model for specific tasks.
Available Models and Their Strengths:
- GPT-4: Best for complex reasoning, coding, and analysis requiring strong logical capabilities
- GPT-3.5-Turbo: Efficient for straightforward tasks, chat, and basic content generation
- Claude-3-Opus: Excels at research, technical writing, and in-depth analysis with strong reasoning
- Claude-3-Sonnet: Well-balanced for creative writing and nuanced responses
- Gemini Pro: Strong at multimodal tasks and code generation
- Mistral Large: Versatile open source model good for general tasks
Provider Considerations:
- OpenAI: Industry standard with consistent performance
- Anthropic: Known for safety and detailed analysis
- Google: Strong technical capabilities and multimodal support
- Groq: Optimized for high-speed inference
- Mistral: Balance of open source and commercial offerings
Data:
{data}
When Making Recommendations Consider:
1. Task requirements (complexity, creativity, technical needs)
2. Performance characteristics needed (speed, accuracy, reliability)
3. Special capabilities required (code generation, analysis, etc)
4. Cost and efficiency tradeoffs
5. Provider-specific strengths and limitations
Provide clear rationale for your model selection based on the specific task requirements.
"""
class ModelRouter:
    """
    A router class that intelligently selects and executes AI models based on task requirements.

    The ModelRouter uses a function calling model to analyze tasks and recommend the optimal
    model and provider combination, then executes the task using the selected model.

    Attributes:
        system_prompt (str): Prompt that guides model selection behavior
        max_tokens (int): Maximum tokens for model outputs
        temperature (float): Temperature parameter for model randomness
        max_workers (int): Maximum concurrent workers for batch processing
        model_output (ModelOutput): Pydantic model for structured outputs
        model_caller (OpenAIFunctionCaller): Function calling interface
    """

    def __init__(
        self,
        system_prompt: str = model_router_system_prompt,
        max_tokens: int = 4000,
        temperature: float = 0.5,
        max_workers: int = 10,
        api_key: str = None,
        max_loops: int = 1,
        *args,
        **kwargs,
    ):
        """
        Initialize the ModelRouter.

        Args:
            system_prompt (str): Prompt for model selection guidance
            max_tokens (int): Maximum output tokens
            temperature (float): Model temperature parameter
            max_workers (int): Max concurrent workers ("auto" uses the CPU count)
            api_key (str): API key for the function calling model
            max_loops (int): Maximum refinement loops for run()
            *args: Additional positional arguments
            **kwargs: Additional keyword arguments
        """
        try:
            self.system_prompt = system_prompt
            self.max_tokens = max_tokens
            self.temperature = temperature
            self.max_workers = max_workers
            self.model_output = ModelOutput
            self.max_loops = max_loops

            if self.max_workers == "auto":
                self.max_workers = os.cpu_count()

            self.model_caller = OpenAIFunctionCaller(
                base_model=ModelOutput,
                temperature=self.temperature,
                system_prompt=self.system_prompt,
                max_tokens=self.max_tokens,
                api_key=api_key,
            )
        except Exception as e:
            raise RuntimeError(
                f"Failed to initialize ModelRouter: {str(e)}"
            )
    def step(self, task: str):
        """
        Run a single task through the model router.

        Args:
            task (str): The task to be executed

        Returns:
            str: The model's output for the task

        Raises:
            RuntimeError: If model selection or execution fails
        """
        model_router_output = self.model_caller.run(task)
        selected_model = model_router_output.model
        selected_provider = model_router_output.provider
        routed_task = model_router_output.task
        rationale = model_router_output.rationale
        max_tokens = model_router_output.max_tokens
        temperature = model_router_output.temperature
        system_prompt = model_router_output.system_prompt

        formatter.print_panel(
            f"Model: {selected_model}\nProvider: {selected_provider}\nTask: {routed_task}\nRationale: {rationale}\nMax Tokens: {max_tokens}\nTemperature: {temperature}\nSystem Prompt: {system_prompt}",
            title="Model Router Output",
        )

        litellm_wrapper = LiteLLM(
            model_name=f"{selected_provider}/{selected_model}",
            max_tokens=max_tokens,
            temperature=temperature,
            system_prompt=system_prompt,
        )

        final_output = litellm_wrapper.run(task=routed_task)

        formatter.print_panel(
            f"Output: {final_output} from {selected_provider}/{selected_model}",
            title="Model Output",
        )

        return final_output
    def run(self, task: str):
        """
        Run a task through the model router, feeding each output back in as
        the next task for up to max_loops iterations and stopping early if
        the output stops changing.
        """
        task_output = task
        previous_output = None

        for _ in range(self.max_loops):
            if task_output == previous_output:
                break  # Exit if no change in output
            previous_output = task_output
            task_output = self.step(task_output)

        return task_output
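    # Note on run() above: the early-exit check compares successive outputs
    # by exact string equality. LLM outputs are rarely byte-identical across
    # iterations, so with max_loops > 1 the loop will usually run all passes.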
    def batch_run(self, tasks: list):
        """
        Run multiple tasks in sequence.

        Args:
            tasks (list): List of tasks to execute

        Returns:
            list: List of outputs for each task

        Raises:
            RuntimeError: If batch execution fails
        """
        try:
            outputs = []
            for task in tasks:
                output = self.run(task)
                outputs.append(output)
            return outputs
        except Exception as e:
            raise RuntimeError(f"Batch execution failed: {str(e)}")

    def __call__(self, task: str, *args, **kwargs):
        """
        Make the class callable to directly execute tasks.

        Args:
            task (str): Task to execute

        Returns:
            str: Model output
        """
        return self.run(task, *args, **kwargs)

    def __batch_call__(self, tasks: list):
        """
        Make the class callable for batch execution.

        Args:
            tasks (list): List of tasks

        Returns:
            list: List of outputs
        """
        return self.batch_run(tasks)

    def __str__(self):
        return f"ModelRouter(max_tokens={self.max_tokens}, temperature={self.temperature})"

    def __repr__(self):
        return f"ModelRouter(max_tokens={self.max_tokens}, temperature={self.temperature})"
    def concurrent_run(self, tasks: list):
        """
        Run multiple tasks concurrently using a thread pool.

        Args:
            tasks (list): List of tasks to execute concurrently

        Returns:
            list: List of outputs from all tasks

        Raises:
            RuntimeError: If concurrent execution fails
        """
        try:
            with ThreadPoolExecutor(
                max_workers=self.max_workers
            ) as executor:
                outputs = list(executor.map(self.run, tasks))
            return outputs
        except Exception as e:
            raise RuntimeError(
                f"Concurrent execution failed: {str(e)}"
            )
    async def async_run(self, task: str, *args, **kwargs):
        """
        Run a task asynchronously.

        Args:
            task (str): Task to execute asynchronously

        Returns:
            str: Model output

        Raises:
            RuntimeError: If async execution fails
        """
        try:
            # self.run is synchronous, so hand it off to a worker thread;
            # asyncio.create_task would fail here because it requires a
            # coroutine, and self.run returns a plain string.
            return await asyncio.to_thread(
                self.run, task, *args, **kwargs
            )
        except Exception as e:
            raise RuntimeError(f"Async execution failed: {str(e)}")

@@ -9,15 +9,13 @@ Todo:

"""

import os
import subprocess
import uuid
from datetime import datetime
from typing import List, Literal, Optional

from loguru import logger
from pydantic import BaseModel, Field
from tenacity import retry, stop_after_attempt, wait_exponential
from swarms.utils.function_caller_model import OpenAIFunctionCaller

from swarms.structs.agent import Agent
@@ -35,88 +33,6 @@ class AgentResponse(BaseModel):
    )
class OpenAIFunctionCaller:
    """
    A class to interact with the OpenAI API for generating text based on a system prompt and a task.
    """

    def __init__(
        self,
        system_prompt: str,
        api_key: str,
        temperature: float,
        max_tokens: int = 4000,
        model_name: str = "gpt-4-0125-preview",
    ):
        self.system_prompt = system_prompt
        self.api_key = api_key
        self.temperature = temperature
        self.max_tokens = max_tokens
        self.model_name = model_name

        try:
            from openai import OpenAI
        except ImportError:
            logger.error(
                "OpenAI library not found. Please install it using 'pip install openai'"
            )
            subprocess.run(["pip", "install", "openai"])
            raise

        try:
            self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        except Exception as e:
            logger.error(
                f"Error initializing OpenAI client: {str(e)}"
            )
            raise

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
    )
    def get_completion(self, task: str) -> AgentResponse:
        """Get completion from OpenAI with retries"""
        try:
            response = self.client.chat.completions.create(
                model=self.model_name,
                messages=[
                    {"role": "system", "content": self.system_prompt},
                    {"role": "user", "content": task},
                ],
                response_format={"type": "json_object"},
                temperature=self.temperature,
                max_tokens=self.max_tokens,
            )
            return AgentResponse.model_validate_json(
                response.choices[0].message.content
            )
        except Exception as e:
            logger.error(f"Error getting completion: {str(e)}")
            raise

    def get_agent_response(
        self, system_prompt: str, task: str
    ) -> str:
        """Get agent response without function calling"""
        try:
            response = self.client.chat.completions.create(
                model=self.model_name,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": task},
                ],
                temperature=self.temperature,
                max_tokens=self.max_tokens,
            )
            return response.choices[0].message.content
        except Exception as e:
            logger.error(f"Error getting agent response: {str(e)}")
            raise
class MultiAgentRouter:
    """
    Routes tasks to appropriate agents based on their capabilities.

@@ -1,5 +1,4 @@
import os
import subprocess
from typing import List, Optional

from loguru import logger

@@ -13,111 +12,11 @@ from tenacity import (
from swarms.structs.agent import Agent
from swarms.structs.swarm_router import SwarmRouter, SwarmType
from swarms.utils.function_caller_model import OpenAIFunctionCaller

logger.add("swarm_builder.log", rotation="10 MB", backtrace=True)
class OpenAIFunctionCaller:
    """
    A class to interact with the OpenAI API for generating text based on a system prompt and a task.

    Attributes:
    - system_prompt (str): The system prompt to guide the AI's response.
    - api_key (str): The API key for the OpenAI service.
    - temperature (float): The temperature parameter for the AI model, controlling randomness.
    - base_model (BaseModel): The Pydantic model to parse the response into.
    - max_tokens (int): The maximum number of tokens in the response.
    - client (OpenAI): The OpenAI client instance.
    """

    def __init__(
        self,
        system_prompt: str,
        api_key: str,
        temperature: float,
        base_model: BaseModel,
        max_tokens: int = 5000,
    ):
        self.system_prompt = system_prompt
        self.api_key = api_key
        self.temperature = temperature
        self.base_model = base_model
        self.max_tokens = max_tokens

        try:
            from openai import OpenAI
        except ImportError:
            logger.error(
                "OpenAI library not found. Please install the OpenAI library by running 'pip install openai'"
            )
            subprocess.run(["pip", "install", "openai"])
            from openai import OpenAI

        self.client = OpenAI(api_key=api_key)

    def run(self, task: str, *args, **kwargs) -> BaseModel:
        """
        Run the OpenAI model with the system prompt and task to generate a response.

        Args:
        - task (str): The task to be completed.
        - *args: Additional positional arguments for the OpenAI API.
        - **kwargs: Additional keyword arguments for the OpenAI API.

        Returns:
        - BaseModel: The parsed response based on the base_model.
        """
        completion = self.client.beta.chat.completions.parse(
            model="gpt-4o-2024-08-06",
            messages=[
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": task},
            ],
            response_format=self.base_model,
            temperature=self.temperature,
            max_tokens=self.max_tokens,
            *args,
            **kwargs,
        )

        return completion.choices[0].message.parsed

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
    )
    async def run_async(
        self, task: str, *args, **kwargs
    ) -> BaseModel:
        """
        Asynchronous version of the run method.

        Args:
        - task (str): The task to be completed.
        - *args: Additional positional arguments for the OpenAI API.
        - **kwargs: Additional keyword arguments for the OpenAI API.

        Returns:
        - BaseModel: The parsed response based on the base_model.
        """
        completion = (
            await self.client.beta.chat.completions.parse_async(
                model="gpt-4o-2024-08-06",
                messages=[
                    {"role": "system", "content": self.system_prompt},
                    {"role": "user", "content": task},
                ],
                response_format=self.base_model,
                temperature=self.temperature,
                max_tokens=self.max_tokens,
                *args,
                **kwargs,
            )
        )

        return completion.choices[0].message.parsed
BOSS_SYSTEM_PROMPT = """ BOSS_SYSTEM_PROMPT = """
Manage a swarm of worker agents to efficiently serve the user by deciding whether to create new agents or delegate tasks. Ensure operations are efficient and effective. Manage a swarm of worker agents to efficiently serve the user by deciding whether to create new agents or delegate tasks. Ensure operations are efficient and effective.

@@ -25,6 +25,9 @@ def bootup():
        logger.disable("")
        logging.disable(logging.CRITICAL)
    else:
        logger.enable("")

    # Silent wandb
    os.environ["WANDB_SILENT"] = "true"

@@ -0,0 +1,143 @@
import os
import subprocess
from concurrent.futures import ThreadPoolExecutor
from typing import List

from loguru import logger
from pydantic import BaseModel

try:
    from openai import OpenAI
except ImportError:
    logger.error(
        "OpenAI library not found. Please install the OpenAI library by running 'pip install openai'"
    )
    import sys

    subprocess.run([sys.executable, "-m", "pip", "install", "openai"])
    from openai import OpenAI


SUPPORTED_MODELS = [
    "o3-mini-2025-1-31",
    "o1-2024-12-17",
    "gpt-4o-mini-2024-07-18",
    "gpt-4o-2024-08-06",
]


def check_api_key():
    api_key = os.getenv("OPENAI_API_KEY")
    if api_key is None:
        raise ValueError(
            "API key is not set. Please set the API key using the api_key parameter."
        )
    return api_key


class OpenAIFunctionCaller:
    """
    A class to interact with the OpenAI API for generating text based on a system prompt and a task.

    Attributes:
    - system_prompt (str): The system prompt to guide the AI's response.
    - api_key (str): The API key for the OpenAI service.
    - temperature (float): The temperature parameter for the AI model, controlling randomness.
    - base_model (BaseModel): The Pydantic model to parse the response into.
    - max_tokens (int): The maximum number of tokens in the response.
    - client (OpenAI): The OpenAI client instance.
    """

    def __init__(
        self,
        system_prompt: str,
        base_model: BaseModel,
        # Note: this default is evaluated once at import time, so the
        # OPENAI_API_KEY environment variable must already be set when
        # the module is imported.
        api_key: str = check_api_key(),
        temperature: float = 0.1,
        max_tokens: int = 5000,
        model_name: str = "o1-2024-12-17",
    ):
        self.system_prompt = system_prompt
        self.api_key = api_key
        self.temperature = temperature
        self.base_model = base_model
        self.max_tokens = max_tokens
        self.model_name = model_name

        self.client = OpenAI(api_key=self.api_key)

    def run(self, task: str):
        """
        Run the OpenAI model with the system prompt and task to generate a response.

        Args:
        - task (str): The task to be completed.

        Returns:
        - BaseModel: The parsed response based on the base_model.
        """
        try:
            completion = self.client.beta.chat.completions.parse(
                model=self.model_name,
                messages=[
                    {"role": "system", "content": self.system_prompt},
                    {"role": "user", "content": task},
                ],
                response_format=self.base_model,
                max_tokens=self.max_tokens,
                temperature=self.temperature,
            )

            return completion.choices[0].message.parsed
        except Exception as e:
            print(f"There was an error: {e}")

    def check_api_key(self):
        self.api_key = os.getenv("OPENAI_API_KEY")

        if self.api_key is None:
            raise ValueError(
                "API key is not set. Please set the API key using the api_key parameter."
            )

    def check_model_support(self):
        # Print the supported models
        for model in SUPPORTED_MODELS:
            print(model)

        return SUPPORTED_MODELS

    def batch_run(self, tasks: List[str]) -> List[BaseModel]:
        """
        Batch run the OpenAI model with the system prompt and task to generate a response.
        """
        return [self.run(task) for task in tasks]

    def concurrent_run(self, tasks: List[str]) -> List[BaseModel]:
        """
        Concurrent run the OpenAI model with the system prompt and task to generate a response.
        """
        with ThreadPoolExecutor(max_workers=len(tasks)) as executor:
            return list(executor.map(self.run, tasks))


# class TestModel(BaseModel):
#     name: str
#     age: int

# # Example usage
# model = OpenAIFunctionCaller(
#     system_prompt="You are a helpful assistant that returns structured data about people.",
#     base_model=TestModel,
#     api_key=os.getenv("OPENAI_API_KEY"),
#     temperature=0.7,
#     max_tokens=1000
# )

# # Test with a more appropriate prompt for the TestModel schema
# response = model.run("Tell me about a person named John who is 25 years old")
# print(response)

@ -7,7 +7,10 @@ def count_tokens(text: str, model: str = "gpt-4o") -> int:
from litellm import encode from litellm import encode
except ImportError: except ImportError:
import sys import sys
subprocess.run([sys.executable, "-m", "pip", "install", "litellm"])
subprocess.run(
[sys.executable, "-m", "pip", "install", "litellm"]
)
from litellm import encode from litellm import encode
return len(encode(model=model, text=text)) return len(encode(model=model, text=text))
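A quick check of the helper this hunk patches; the count depends on the model's tokenizer, so the number shown is illustrative, and the import path is an assumption based on this file's location:

from swarms.utils.litellm_tokenizer import count_tokens  # assumed module path

print(count_tokens("How many tokens is this sentence?"))  # e.g. 7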

@@ -2,9 +2,16 @@ try:
    from litellm import completion
except ImportError:
    import subprocess
    import sys

    subprocess.check_call(["pip", "install", "litellm"])

    import litellm

    print("Installing litellm")
    subprocess.check_call(
        [sys.executable, "-m", "pip", "install", "-U", "litellm"]
    )
    print("litellm installed")

    from litellm import completion

litellm.set_verbose = True

@@ -25,6 +32,7 @@ class LiteLLM:
        temperature: float = 0.5,
        max_tokens: int = 4000,
        ssl_verify: bool = False,
        max_completion_tokens: int = 4000,
        *args,
        **kwargs,
    ):

@@ -44,6 +52,9 @@ class LiteLLM:
        self.temperature = temperature
        self.max_tokens = max_tokens
        self.ssl_verify = ssl_verify
        self.max_completion_tokens = max_completion_tokens
        # Note: the next line overwrites the value assigned just above,
        # so max_completion_tokens always ends up equal to max_tokens.
        self.max_completion_tokens = max_tokens

    def _prepare_messages(self, task: str) -> list:
        """

@@ -6,7 +6,9 @@ except ImportError:
    import subprocess
    import sys

    subprocess.check_call([sys.executable, "-m", "pip", "install", "pypdf"])
    subprocess.check_call(
        [sys.executable, "-m", "pip", "install", "pypdf"]
    )
    import pypdf
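Several hunks in this commit switch bare `pip install` calls to `sys.executable -m pip`. A minimal sketch of why that matters; the helper below is illustrative, not from the repo:

import subprocess
import sys


def ensure_package(name: str) -> None:
    # Installing via the current interpreter guarantees the package lands
    # in the same environment (venv, conda, etc.) that is doing the import,
    # which a bare "pip" found on PATH does not.
    subprocess.check_call([sys.executable, "-m", "pip", "install", name])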
