swarms rust documentation code

pull/859/head
Kye Gomez 2 months ago
parent 120d6a829f
commit d61c218799

@ -366,6 +366,7 @@ nav:
- Clients:
  - Swarms API Python Client: "swarms_cloud/python_client.md"
  - Swarms API Rust Client: "swarms_cloud/rust_client.md"
- Pricing:
  - Swarms API Pricing: "swarms_cloud/api_pricing.md"

@ -0,0 +1,733 @@
# Swarms Client - Production Grade Rust SDK
A high-performance, production-ready Rust client for the Swarms API with comprehensive features for building multi-agent AI systems.
## Features
- **🚀 High Performance**: Built with `reqwest` and `tokio` for maximum throughput
- **🔄 Connection Pooling**: Automatic HTTP connection reuse and pooling
- **⚡ Circuit Breaker**: Automatic failure detection and recovery
- **💾 Intelligent Caching**: TTL-based in-memory caching with concurrent access
- **📊 Rate Limiting**: Configurable concurrent request limits
- **🔄 Retry Logic**: Exponential backoff with jitter
- **📝 Comprehensive Logging**: Structured logging with `tracing`
- **✅ Type Safety**: Full compile-time type checking with `serde`
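The retry behavior listed above (exponential backoff with jitter) follows a standard pattern. The sketch below is a minimal standalone illustration of that pattern, not the crate's actual internals; the clock-based jitter source is an assumption chosen to keep the example std-only:

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Illustrative backoff: base * 2^attempt, capped, plus up to ~50% jitter.
fn backoff_delay(attempt: u32, base: Duration, cap: Duration) -> Duration {
    let exp = base.saturating_mul(2u32.saturating_pow(attempt));
    let capped = exp.min(cap);
    // Cheap std-only pseudo-randomness from the clock's sub-second nanos.
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .subsec_nanos();
    let frac = nanos % 1000; // 0..=999
    let jitter = capped / 2 * frac / 1000;
    capped + jitter
}

fn main() {
    for attempt in 0..4 {
        let delay = backoff_delay(attempt, Duration::from_millis(500), Duration::from_secs(30));
        println!("attempt {attempt}: wait {delay:?}");
    }
}
```

Jitter spreads out retries from many clients so they do not hammer the API in lockstep after an outage.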
## Installation
Add `swarms-rs` to your project's dependencies with cargo:
```bash
cargo add swarms-rs
```
## Quick Start
```rust
use swarms_client::{SwarmsClient, SwarmType};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize the client with API key from environment
let client = SwarmsClient::builder()
.unwrap()
.from_env()? // Loads API key from SWARMS_API_KEY environment variable
.timeout(std::time::Duration::from_secs(60))
.max_retries(3)
.build()?;
// Make a simple swarm completion request
let response = client.swarm()
.completion()
.name("My First Swarm")
.swarm_type(SwarmType::Auto)
.task("Analyze the pros and cons of quantum computing")
.agent(|agent| {
agent
.name("Researcher")
.description("Conducts in-depth research")
.model("gpt-4o")
})
.send()
.await?;
println!("Swarm output: {}", response.output);
Ok(())
}
```
## API Reference
### SwarmsClient
The main client for interacting with the Swarms API.
#### Constructor Methods
##### `SwarmsClient::builder()`
Creates a new client builder for configuring the client.
**Returns**: `Result<ClientBuilder, SwarmsError>`
**Example**:
```rust
let client = SwarmsClient::builder()
.unwrap()
.api_key("your-api-key")
.timeout(Duration::from_secs(60))
.build()?;
```
##### `SwarmsClient::with_config(config: ClientConfig)`
Creates a client with custom configuration.
| Parameter | Type | Description |
|-----------|------|-------------|
| `config` | `ClientConfig` | Client configuration settings |
**Returns**: `Result<SwarmsClient, SwarmsError>`
**Example**:
```rust
let config = ClientConfig {
api_key: "your-api-key".to_string(),
base_url: "https://api.swarms.com/".parse().unwrap(),
timeout: Duration::from_secs(120),
max_retries: 5,
..Default::default()
};
let client = SwarmsClient::with_config(config)?;
```
#### Resource Access Methods
| Method | Returns | Description |
|--------|---------|-------------|
| `agent()` | `AgentResource` | Access agent-related operations |
| `swarm()` | `SwarmResource` | Access swarm-related operations |
| `models()` | `ModelsResource` | Access model listing operations |
| `logs()` | `LogsResource` | Access logging operations |
#### Cache Management Methods
| Method | Parameters | Returns | Description |
|--------|------------|---------|-------------|
| `clear_cache()` | None | `()` | Clears all cached responses |
| `cache_stats()` | None | `Option<(usize, usize)>` | Returns (valid_entries, total_entries) |
### ClientBuilder
Builder for configuring the Swarms client.
#### Configuration Methods
| Method | Parameters | Returns | Description |
|--------|------------|---------|-------------|
| `new()` | None | `ClientBuilder` | Creates a new builder with defaults |
| `from_env()` | None | `Result<ClientBuilder, SwarmsError>` | Loads API key from environment |
| `api_key(key)` | `String` | `ClientBuilder` | Sets the API key |
| `base_url(url)` | `&str` | `Result<ClientBuilder, SwarmsError>` | Sets the base URL |
| `timeout(duration)` | `Duration` | `ClientBuilder` | Sets request timeout |
| `max_retries(count)` | `usize` | `ClientBuilder` | Sets maximum retry attempts |
| `retry_delay(duration)` | `Duration` | `ClientBuilder` | Sets retry delay duration |
| `max_concurrent_requests(count)` | `usize` | `ClientBuilder` | Sets concurrent request limit |
| `enable_cache(enabled)` | `bool` | `ClientBuilder` | Enables/disables caching |
| `cache_ttl(duration)` | `Duration` | `ClientBuilder` | Sets cache TTL |
| `build()` | None | `Result<SwarmsClient, SwarmsError>` | Builds the client |
**Example**:
```rust
let client = SwarmsClient::builder()
.unwrap()
.from_env()?
.timeout(Duration::from_secs(120))
.max_retries(5)
.max_concurrent_requests(50)
.enable_cache(true)
.cache_ttl(Duration::from_secs(600))
.build()?;
```
### SwarmResource
Resource for swarm-related operations.
#### Methods
| Method | Parameters | Returns | Description |
|--------|------------|---------|-------------|
| `completion()` | None | `SwarmCompletionBuilder` | Creates a new swarm completion builder |
| `create(request)` | `SwarmSpec` | `Result<SwarmCompletionResponse, SwarmsError>` | Creates a swarm completion directly |
| `create_batch(requests)` | `Vec<SwarmSpec>` | `Result<Vec<SwarmCompletionResponse>, SwarmsError>` | Creates multiple swarm completions |
| `list_types()` | None | `Result<SwarmTypesResponse, SwarmsError>` | Lists available swarm types |
### SwarmCompletionBuilder
Builder for creating swarm completion requests.
#### Configuration Methods
| Method | Parameters | Returns | Description |
|--------|------------|---------|-------------|
| `name(name)` | `String` | `SwarmCompletionBuilder` | Sets the swarm name |
| `description(desc)` | `String` | `SwarmCompletionBuilder` | Sets the swarm description |
| `swarm_type(type)` | `SwarmType` | `SwarmCompletionBuilder` | Sets the swarm type |
| `task(task)` | `String` | `SwarmCompletionBuilder` | Sets the main task |
| `agent(builder_fn)` | `Fn(AgentSpecBuilder) -> AgentSpecBuilder` | `SwarmCompletionBuilder` | Adds an agent using a builder function |
| `max_loops(count)` | `u32` | `SwarmCompletionBuilder` | Sets maximum execution loops |
| `service_tier(tier)` | `String` | `SwarmCompletionBuilder` | Sets the service tier |
| `send()` | None | `Result<SwarmCompletionResponse, SwarmsError>` | Sends the request |
### AgentResource
Resource for agent-related operations.
#### Methods
| Method | Parameters | Returns | Description |
|--------|------------|---------|-------------|
| `completion()` | None | `AgentCompletionBuilder` | Creates a new agent completion builder |
| `create(request)` | `AgentCompletion` | `Result<AgentCompletionResponse, SwarmsError>` | Creates an agent completion directly |
| `create_batch(requests)` | `Vec<AgentCompletion>` | `Result<Vec<AgentCompletionResponse>, SwarmsError>` | Creates multiple agent completions |
### AgentCompletionBuilder
Builder for creating agent completion requests.
#### Configuration Methods
| Method | Parameters | Returns | Description |
|--------|------------|---------|-------------|
| `agent_name(name)` | `String` | `AgentCompletionBuilder` | Sets the agent name |
| `task(task)` | `String` | `AgentCompletionBuilder` | Sets the task |
| `model(model)` | `String` | `AgentCompletionBuilder` | Sets the AI model |
| `description(desc)` | `String` | `AgentCompletionBuilder` | Sets the agent description |
| `system_prompt(prompt)` | `String` | `AgentCompletionBuilder` | Sets the system prompt |
| `temperature(temp)` | `f32` | `AgentCompletionBuilder` | Sets the temperature (0.0-1.0) |
| `max_tokens(tokens)` | `u32` | `AgentCompletionBuilder` | Sets maximum tokens |
| `max_loops(loops)` | `u32` | `AgentCompletionBuilder` | Sets maximum loops |
| `send()` | None | `Result<AgentCompletionResponse, SwarmsError>` | Sends the request |
### SwarmType Enum
Available swarm types for different execution patterns.
| Variant | Description |
|---------|-------------|
| `AgentRearrange` | Agents can be rearranged based on task requirements |
| `MixtureOfAgents` | Combines multiple agents with different specializations |
| `SpreadSheetSwarm` | Organized like a spreadsheet with structured data flow |
| `SequentialWorkflow` | Agents execute in a sequential order |
| `ConcurrentWorkflow` | Agents execute concurrently |
| `GroupChat` | Agents interact in a group chat format |
| `MultiAgentRouter` | Routes tasks between multiple agents |
| `AutoSwarmBuilder` | Automatically builds swarm structure |
| `HiearchicalSwarm` | Hierarchical organization of agents |
| `Auto` | Automatically selects the best swarm type |
| `MajorityVoting` | Agents vote on decisions |
| `Malt` | Multi-Agent Language Tasks |
| `DeepResearchSwarm` | Specialized for deep research tasks |
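As a rough guide to choosing among these variants, a selection heuristic might look like the sketch below. The enum here is a local stand-in covering three variants purely for illustration (use `swarms_client::SwarmType` in real code), and the heuristic itself is a hypothetical starting point, not an official recommendation:

```rust
// Local stand-in for illustration; the real enum lives in swarms_client.
#[derive(Debug, Clone, Copy, PartialEq)]
enum SwarmType {
    SequentialWorkflow,
    ConcurrentWorkflow,
    Auto,
}

/// Hypothetical heuristic: ordered pipelines run sequentially, independent
/// subtasks run concurrently, and anything else defers to Auto.
fn pick_swarm_type(ordered: bool, independent_subtasks: bool) -> SwarmType {
    match (ordered, independent_subtasks) {
        (true, _) => SwarmType::SequentialWorkflow,
        (false, true) => SwarmType::ConcurrentWorkflow,
        _ => SwarmType::Auto,
    }
}

fn main() {
    // A research pipeline whose stages feed each other is ordered.
    println!("{:?}", pick_swarm_type(true, false)); // SequentialWorkflow
    // Independent per-ticker analyses can run concurrently.
    println!("{:?}", pick_swarm_type(false, true)); // ConcurrentWorkflow
}
```

`Auto` is the safe default when you are unsure; the API then selects a structure for you.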
## Detailed Examples
### 1. Simple Agent Completion
```rust
use swarms_client::SwarmsClient;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = SwarmsClient::builder()
.unwrap()
.from_env()?
.build()?;
let response = client.agent()
.completion()
.agent_name("Content Writer")
.task("Write a blog post about sustainable technology")
.model("gpt-4o")
.temperature(0.7)
.max_tokens(2000)
.description("An expert content writer specializing in technology topics")
.system_prompt("You are a professional content writer with expertise in technology and sustainability. Write engaging, informative content that is well-structured and SEO-friendly.")
.send()
.await?;
println!("Agent Response: {}", response.outputs);
println!("Tokens Used: {}", response.usage.total_tokens);
Ok(())
}
```
### 2. Multi-Agent Research Swarm
```rust
use swarms_client::{SwarmsClient, SwarmType};
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = SwarmsClient::builder()
.unwrap()
.from_env()?
.timeout(Duration::from_secs(300)) // 5 minutes for complex tasks
.build()?;
let response = client.swarm()
.completion()
.name("AI Research Swarm")
.description("A comprehensive research team analyzing AI trends and developments")
.swarm_type(SwarmType::SequentialWorkflow)
.task("Conduct a comprehensive analysis of the current state of AI in healthcare, including recent developments, challenges, and future prospects")
// Data Collection Agent
.agent(|agent| {
agent
.name("Data Collector")
.description("Gathers comprehensive data and recent developments")
.model("gpt-4o")
.system_prompt("You are a research data collector specializing in AI and healthcare. Your job is to gather the most recent and relevant information about AI applications in healthcare, including clinical trials, FDA approvals, and industry developments.")
.temperature(0.3)
.max_tokens(3000)
})
// Technical Analyst
.agent(|agent| {
agent
.name("Technical Analyst")
.description("Analyzes technical aspects and implementation details")
.model("gpt-4o")
.system_prompt("You are a technical analyst with deep expertise in AI/ML technologies. Analyze the technical feasibility, implementation challenges, and technological requirements of AI solutions in healthcare.")
.temperature(0.4)
.max_tokens(3000)
})
// Market Analyst
.agent(|agent| {
agent
.name("Market Analyst")
.description("Analyzes market trends, adoption rates, and economic factors")
.model("gpt-4o")
.system_prompt("You are a market research analyst specializing in healthcare technology markets. Analyze market size, growth projections, key players, investment trends, and economic factors affecting AI adoption in healthcare.")
.temperature(0.5)
.max_tokens(3000)
})
// Regulatory Expert
.agent(|agent| {
agent
.name("Regulatory Expert")
.description("Analyzes regulatory landscape and compliance requirements")
.model("gpt-4o")
.system_prompt("You are a regulatory affairs expert with deep knowledge of healthcare regulations and AI governance. Analyze regulatory challenges, compliance requirements, ethical considerations, and policy developments affecting AI in healthcare.")
.temperature(0.3)
.max_tokens(3000)
})
// Report Synthesizer
.agent(|agent| {
agent
.name("Report Synthesizer")
.description("Synthesizes all analyses into a comprehensive report")
.model("gpt-4o")
.system_prompt("You are an expert report writer and strategic analyst. Synthesize all the previous analyses into a comprehensive, well-structured executive report with clear insights, recommendations, and future outlook.")
.temperature(0.6)
.max_tokens(4000)
})
.max_loops(1)
.service_tier("premium")
.send()
.await?;
println!("Research Report:");
println!("{}", response.output);
println!("\nSwarm executed with {} agents", response.number_of_agents);
Ok(())
}
```
### 3. Financial Analysis Swarm (from the repository examples)
```rust
use swarms_client::{SwarmsClient, SwarmType};
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = SwarmsClient::builder()
.unwrap()
.from_env()?
.timeout(Duration::from_secs(120))
.max_retries(3)
.build()?;
let response = client.swarm()
.completion()
.name("Financial Health Analysis Swarm")
.description("A concurrent workflow of specialized financial agents analyzing company health")
.swarm_type(SwarmType::ConcurrentWorkflow)
.task("Analyze the financial health of Apple Inc. (AAPL) based on their latest quarterly report")
// Financial Data Collector Agent
.agent(|agent| {
agent
.name("Financial Data Collector")
.description("Specializes in gathering and organizing financial data from various sources")
.model("gpt-4o")
.system_prompt("You are a financial data collection specialist. Your role is to gather and organize relevant financial data, including revenue, expenses, profit margins, and key financial ratios. Present the data in a clear, structured format.")
.temperature(0.7)
.max_tokens(2000)
})
// Financial Ratio Analyzer Agent
.agent(|agent| {
agent
.name("Ratio Analyzer")
.description("Analyzes key financial ratios and metrics")
.model("gpt-4o")
.system_prompt("You are a financial ratio analysis expert. Your role is to calculate and interpret key financial ratios such as P/E ratio, debt-to-equity, current ratio, and return on equity. Provide insights on what these ratios indicate about the company's financial health.")
.temperature(0.7)
.max_tokens(2000)
})
// Additional agents...
.agent(|agent| {
agent
.name("Investment Advisor")
.description("Provides investment recommendations based on analysis")
.model("gpt-4o")
.system_prompt("You are an investment advisory specialist. Your role is to synthesize the analysis from previous agents and provide clear, actionable investment recommendations. Consider both short-term and long-term investment perspectives.")
.temperature(0.7)
.max_tokens(2000)
})
.max_loops(1)
.service_tier("standard")
.send()
.await?;
println!("Financial Analysis Results:");
println!("{}", response.output);
Ok(())
}
```
### 4. Batch Processing
```rust
use swarms_client::{SwarmsClient, AgentCompletion, AgentSpec};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = SwarmsClient::builder()
.unwrap()
.from_env()?
.max_concurrent_requests(20) // Allow more concurrent requests for batch
.build()?;
// Create multiple agent completion requests
let requests = vec![
AgentCompletion {
agent_config: AgentSpec {
agent_name: "Content Creator 1".to_string(),
model_name: "gpt-4o-mini".to_string(),
temperature: 0.7,
max_tokens: 1000,
..Default::default()
},
task: "Write a social media post about renewable energy".to_string(),
history: None,
},
AgentCompletion {
agent_config: AgentSpec {
agent_name: "Content Creator 2".to_string(),
model_name: "gpt-4o-mini".to_string(),
temperature: 0.8,
max_tokens: 1000,
..Default::default()
},
task: "Write a social media post about electric vehicles".to_string(),
history: None,
},
// Add more requests...
];
// Process all requests in batch
let responses = client.agent()
.create_batch(requests)
.await?;
for (i, response) in responses.iter().enumerate() {
println!("Response {}: {}", i + 1, response.outputs);
println!("Tokens used: {}\n", response.usage.total_tokens);
}
Ok(())
}
```
### 5. Custom Configuration with Error Handling
```rust
use swarms_client::{SwarmsClient, SwarmsError, SwarmType, ClientConfig};
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Custom configuration for production use
let config = ClientConfig {
api_key: std::env::var("SWARMS_API_KEY")?,
base_url: "https://swarms-api-285321057562.us-east1.run.app/".parse()?,
timeout: Duration::from_secs(180),
max_retries: 5,
retry_delay: Duration::from_secs(2),
max_concurrent_requests: 50,
circuit_breaker_threshold: 10,
circuit_breaker_timeout: Duration::from_secs(120),
enable_cache: true,
cache_ttl: Duration::from_secs(600),
};
let client = SwarmsClient::with_config(config)?;
// Example with comprehensive error handling
match client.swarm()
.completion()
.name("Production Swarm")
.swarm_type(SwarmType::Auto)
.task("Analyze market trends for Q4 2024")
.agent(|agent| {
agent
.name("Market Analyst")
.model("gpt-4o")
.temperature(0.5)
})
.send()
.await
{
Ok(response) => {
println!("Success! Job ID: {}", response.job_id);
println!("Output: {}", response.output);
},
Err(SwarmsError::Authentication { message, .. }) => {
eprintln!("Authentication error: {}", message);
},
Err(SwarmsError::RateLimit { message, .. }) => {
eprintln!("Rate limit exceeded: {}", message);
// Implement backoff strategy
},
Err(SwarmsError::InsufficientCredits { message, .. }) => {
eprintln!("Insufficient credits: {}", message);
},
Err(SwarmsError::CircuitBreakerOpen) => {
eprintln!("Circuit breaker is open - service temporarily unavailable");
},
Err(e) => {
eprintln!("Other error: {}", e);
}
}
Ok(())
}
```
### 6. Monitoring and Observability
```rust
use swarms_client::SwarmsClient;
use std::time::Duration;
use tracing::{info, warn};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize tracing for observability
tracing_subscriber::fmt::init();
let client = SwarmsClient::builder()
.unwrap()
.from_env()?
.enable_cache(true)
.build()?;
// Monitor cache performance
if let Some((valid, total)) = client.cache_stats() {
info!("Cache stats: {}/{} entries valid", valid, total);
}
// Make request with monitoring
let start = std::time::Instant::now();
let response = client.swarm()
.completion()
.name("Monitored Swarm")
.task("Analyze system performance metrics")
.agent(|agent| {
agent
.name("Performance Analyst")
.model("gpt-4o-mini")
})
.send()
.await?;
let duration = start.elapsed();
info!("Request completed in {:?}", duration);
if duration > Duration::from_secs(30) {
warn!("Request took longer than expected: {:?}", duration);
}
// Clear cache periodically in production
client.clear_cache();
Ok(())
}
```
## Error Handling
The client provides comprehensive error handling with specific error types:
### SwarmsError Types
| Error Type | Description | Recommended Action |
|------------|-------------|-------------------|
| `Authentication` | Invalid API key or authentication failure | Check API key and permissions |
| `RateLimit` | Rate limit exceeded | Implement exponential backoff |
| `InvalidRequest` | Malformed request parameters | Validate input parameters |
| `InsufficientCredits` | Not enough credits for operation | Check account balance |
| `Api` | General API error | Check API status and retry |
| `Network` | Network connectivity issues | Check internet connection |
| `Timeout` | Request timeout | Increase timeout or retry |
| `CircuitBreakerOpen` | Circuit breaker preventing requests | Wait for recovery period |
| `Serialization` | JSON serialization/deserialization error | Check data format |
### Error Handling Best Practices
```rust
use swarms_client::{SwarmsClient, SwarmsError};
use std::time::Duration;
async fn handle_swarm_request(client: &SwarmsClient, task: &str) -> Result<String, SwarmsError> {
match client.swarm()
.completion()
.task(task)
.agent(|agent| agent.name("Worker").model("gpt-4o-mini"))
.send()
.await
{
Ok(response) => Ok(response.output.to_string()),
Err(SwarmsError::RateLimit { .. }) => {
// Implement exponential backoff
tokio::time::sleep(Duration::from_secs(5)).await;
Err(SwarmsError::RateLimit {
message: "Rate limited - should retry".to_string(),
status: Some(429),
request_id: None,
})
},
Err(e) => Err(e),
}
}
```
## Performance Features
### Connection Pooling
The client automatically manages HTTP connection pooling for optimal performance:
```rust
// Connections are automatically pooled and reused
let client = SwarmsClient::builder()
.unwrap()
.from_env()?
.max_concurrent_requests(100) // Allow up to 100 concurrent requests
.build()?;
```
### Caching
Intelligent caching reduces redundant API calls:
```rust
let client = SwarmsClient::builder()
.unwrap()
.from_env()?
.enable_cache(true)
.cache_ttl(Duration::from_secs(300)) // 5-minute TTL
.build()?;
// GET requests are automatically cached
let models = client.models().list().await?; // First call hits API
let models_cached = client.models().list().await?; // Second call uses cache
```
### Circuit Breaker
Automatic failure detection and recovery:
```rust
let client = SwarmsClient::builder()
.unwrap()
.from_env()?
.build()?;
// Circuit breaker automatically opens after 5 failures
// and recovers after 60 seconds
```
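Conceptually, the breaker behaves like the small state machine below. This is an illustrative sketch of the pattern under the documented defaults (5 failures, 60-second recovery), not the client's actual internals:

```rust
use std::time::{Duration, Instant};

/// Illustrative circuit breaker: opens after `threshold` consecutive
/// failures and lets traffic through again once `timeout` has elapsed.
struct CircuitBreaker {
    failures: usize,
    threshold: usize,
    opened_at: Option<Instant>,
    timeout: Duration,
}

impl CircuitBreaker {
    fn new(threshold: usize, timeout: Duration) -> Self {
        Self { failures: 0, threshold, opened_at: None, timeout }
    }

    /// May a request proceed right now?
    fn allow(&mut self) -> bool {
        match self.opened_at {
            None => true,
            Some(t) if t.elapsed() >= self.timeout => {
                // Recovery window elapsed: permit a trial request.
                self.opened_at = None;
                self.failures = 0;
                true
            }
            Some(_) => false,
        }
    }

    fn record_success(&mut self) {
        self.failures = 0;
    }

    fn record_failure(&mut self) {
        self.failures += 1;
        if self.failures >= self.threshold {
            self.opened_at = Some(Instant::now());
        }
    }
}

fn main() {
    let mut cb = CircuitBreaker::new(5, Duration::from_secs(60));
    for _ in 0..5 {
        cb.record_failure();
    }
    assert!(!cb.allow()); // open after 5 consecutive failures
}
```

Opening the breaker fails fast instead of queuing doomed requests against an unhealthy upstream.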
## Configuration Reference
### ClientConfig Structure
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `api_key` | `String` | `""` | Swarms API key |
| `base_url` | `Url` | `https://swarms-api-285321057562.us-east1.run.app/` | API base URL |
| `timeout` | `Duration` | `60s` | Request timeout |
| `max_retries` | `usize` | `3` | Maximum retry attempts |
| `retry_delay` | `Duration` | `1s` | Base retry delay |
| `max_concurrent_requests` | `usize` | `100` | Concurrent request limit |
| `circuit_breaker_threshold` | `usize` | `5` | Failure threshold for circuit breaker |
| `circuit_breaker_timeout` | `Duration` | `60s` | Circuit breaker recovery time |
| `enable_cache` | `bool` | `true` | Enable response caching |
| `cache_ttl` | `Duration` | `300s` | Cache time-to-live |
## Environment Variables
| Variable | Description | Example |
|----------|-------------|---------|
| `SWARMS_API_KEY` | Your Swarms API key | `sk-xxx...` |
| `SWARMS_BASE_URL` | Custom API base URL (optional) | `https://api.custom.com/` |
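Assuming a Unix shell, the variables can be exported before running your binary; the key value below is a placeholder:

```bash
# Required: API key read by ClientBuilder::from_env()
export SWARMS_API_KEY="sk-your-key-here"

# Optional: override the default base URL
export SWARMS_BASE_URL="https://api.custom.com/"

# Then run your binary as usual, e.g.:
# cargo run --release
```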
## Testing
Run the test suite:
```bash
cargo test
```
Run specific tests:
```bash
cargo test test_cache
cargo test test_circuit_breaker
```
## Contributing
1. Fork the repository
2. Create a feature branch
3. Add tests for new functionality
4. Ensure all tests pass
5. Submit a pull request
## License
This project is licensed under the MIT License - see the LICENSE file for details.

@ -1,4 +1,4 @@
from swarms import Agent
# Initialize the agent
agent = Agent(

@ -0,0 +1,62 @@
from swarms import Agent
from swarms.structs.batch_agent_execution import batch_agent_execution
# Initialize different medical specialist agents
cardiologist = Agent(
agent_name="Cardiologist",
agent_description="Expert in heart conditions and cardiovascular health",
system_prompt="""You are an expert cardiologist. Your role is to:
1. Analyze cardiac symptoms and conditions
2. Provide detailed assessments of heart-related issues
3. Suggest appropriate diagnostic steps
4. Recommend treatment approaches
Always maintain a professional medical tone and focus on cardiac-specific concerns.""",
max_loops=1,
random_models_on=True,
)
neurologist = Agent(
agent_name="Neurologist",
agent_description="Expert in neurological disorders and brain conditions",
system_prompt="""You are an expert neurologist. Your role is to:
1. Evaluate neurological symptoms and conditions
2. Analyze brain and nervous system related issues
3. Recommend appropriate neurological tests
4. Suggest treatment plans for neurological disorders
Always maintain a professional medical tone and focus on neurological concerns.""",
max_loops=1,
random_models_on=True,
)
dermatologist = Agent(
agent_name="Dermatologist",
agent_description="Expert in skin conditions and dermatological issues",
system_prompt="""You are an expert dermatologist. Your role is to:
1. Assess skin conditions and symptoms
2. Provide detailed analysis of dermatological issues
3. Recommend appropriate skin tests and procedures
4. Suggest treatment plans for skin conditions
Always maintain a professional medical tone and focus on dermatological concerns.""",
max_loops=1,
random_models_on=True,
)
# Create a list of medical cases for each specialist
cases = [
"Patient presents with chest pain, shortness of breath, and fatigue. Please provide an initial assessment and recommended next steps.",
"Patient reports severe headaches, dizziness, and occasional numbness in extremities. Please evaluate these symptoms and suggest appropriate diagnostic approach.",
"Patient has developed a persistent rash with itching and redness on the arms and legs. Please analyze the symptoms and recommend treatment options.",
]
# Print each specialist's model name
for agent in [cardiologist, neurologist, dermatologist]:
print(agent.model_name)
# Create list of agents
specialists = [cardiologist, neurologist, dermatologist]
# Execute the batch of medical consultations
results = batch_agent_execution(specialists, cases)
print(results)

@ -0,0 +1,24 @@
from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import (
FINANCIAL_AGENT_SYS_PROMPT,
)
from swarms.tools.mcp_integration import MCPServerSseParams
server_one = MCPServerSseParams(
url="http://127.0.0.1:6274",
headers={"Content-Type": "application/json"},
)
# Initialize the agent
agent = Agent(
agent_name="Financial-Analysis-Agent",
agent_description="Personal finance advisor agent",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
max_loops=1,
mcp_servers=[server_one],
output_type="final",
)
out = agent.run("Use the add tool to add 2 and 2")
print(type(out))

@ -0,0 +1,50 @@
from swarms import Agent
tools = [
{
"type": "function",
"function": {
"name": "add_numbers",
"description": "Add two numbers together and return the result.",
"parameters": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "The name of the operation to perform.",
},
"a": {
"type": "integer",
"description": "The first number to add.",
},
"b": {
"type": "integer",
"description": "The second number to add.",
},
},
"required": [
"name",
"a",
"b",
],
},
},
}
]
# Initialize the agent
agent = Agent(
agent_name="Financial-Analysis-Agent",
agent_description="Personal finance advisor agent",
max_loops=2,
tools_list_dictionary=tools,
output_type="final",
mcp_url="http://0.0.0.0:8000/sse",
)
out = agent.run(
"Use the multiply tool to multiply 3 and 4 together. Look at the tools available to you.",
)
print(agent.short_memory.get_str())

@ -0,0 +1,10 @@
from swarms.tools.mcp_client import (
list_tools_for_multiple_urls,
)
print(
list_tools_for_multiple_urls(
["http://0.0.0.0:8000/sse"], output_type="json"
)
)

@ -0,0 +1,8 @@
from swarms.tools.mcp_client import execute_mcp_tool
print(
execute_mcp_tool(
"http://0.0.0.0:8000/sse",
parameters={"name": "multiply", "a": 1, "b": 2},
)
)

@ -0,0 +1,53 @@
import asyncio
from swarms.tools.mcp_client_call import (
aget_mcp_tools,
execute_tool_call,
)
import json
async def main():
tools = await aget_mcp_tools("http://0.0.0.0:8000/sse", "openai")
print(json.dumps(tools, indent=4))
# First create the markdown file
create_result = await execute_tool_call(
server_path="http://0.0.0.0:8000/sse",
messages=[
{
"role": "user",
"content": "Create a new markdown file called 'chicken_cat_story'",
}
],
)
print("File creation result:", create_result)
# Then write the story to the file
story_content = """Title: The Adventures of Clucky and Whiskers
Once upon a time in a quiet, sunlit farm, there lived a curious chicken named Clucky and a mischievous cat named Whiskers. Clucky was known for her vibrant spirit and insatiable curiosity, roaming the farmyard with her head held high. Whiskers, on the other hand, was a clever little feline who always found himself in amusing predicaments; he often ventured into adventures that few dared to imagine.
The unlikely duo first met one fine autumn morning when Whiskers was chasing a playful butterfly near the barn. Clucky, busy pecking at the ground, almost tripped over Whiskers. Apologizing in her gentle clucks, she noticed that Whiskers was not scared at all; instead, he greeted her with a friendly purr. From that day on, the two embarked on countless adventures, exploring every corner of the farm and beyond.
They would roam the rolling meadows, share stories under the starry night sky, and even work together to solve little mysteries that baffled the other animals. Whether it was searching for a hidden pile of treats or finding safe paths through the woods, Clucky and Whiskers proved that friendship can be found in the most unexpected places.
The other animals on the farm watched in amazement as the chicken and the cat not only complemented each other but also became the best of friends. Clucky's boldness and Whiskers' cunning were a perfect match, teaching everyone that differences can create the strongest bonds.
In the heartwarming adventures of Clucky and Whiskers, one could learn that true friendship breaks all barriers, be they of fur or feathers. The legend of the brave chicken and the clever cat lived on forever, reminding everyone on the farm that unity makes life more colorful and joyful.
The End."""
story_result = await execute_tool_call(
server_path="http://0.0.0.0:8000/sse",
messages=[
{
"role": "user",
"content": f"Write this story to the file 'chicken_cat_story.md': {story_content}",
}
],
)
print("Story writing result:", story_result)
if __name__ == "__main__":
asyncio.run(main())

@ -0,0 +1,8 @@
from swarms.structs.long_agent import LongAgent
if __name__ == "__main__":
long_agent = LongAgent(
token_count_per_agent=3000, output_type="final"
)
print(long_agent.run([""]))

@ -0,0 +1,91 @@
# stock_price_server.py
from mcp.server.fastmcp import FastMCP
import os
from datetime import datetime
mcp = FastMCP("StockPrice")
@mcp.tool()
def create_markdown_file(filename: str) -> str:
"""
Create a new markdown file with a basic structure.
Args:
filename (str): The name of the markdown file to create (without .md extension)
Returns:
str: A message indicating success or failure
Example:
>>> create_markdown_file('my_notes')
'Created markdown file: my_notes.md'
"""
try:
if not filename:
return "Please provide a valid filename"
# Ensure filename ends with .md
if not filename.endswith(".md"):
filename = f"{filename}.md"
# Create basic markdown structure
content = f"""# {filename.replace('.md', '')}
Created on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
## Content
"""
with open(filename, "w") as f:
f.write(content)
return f"Created markdown file: {filename}"
except Exception as e:
return f"Error creating markdown file: {str(e)}"
@mcp.tool()
def write_to_markdown(filename: str, content: str) -> str:
"""
Append content to an existing markdown file.
Args:
filename (str): The name of the markdown file (without .md extension)
content (str): The content to append to the file
Returns:
str: A message indicating success or failure
Example:
>>> write_to_markdown('my_notes', 'This is a new note')
'Content added to my_notes.md'
"""
try:
if not filename or not content:
return "Please provide both filename and content"
# Ensure filename ends with .md
if not filename.endswith(".md"):
filename = f"{filename}.md"
# Check if file exists
if not os.path.exists(filename):
return f"File {filename} does not exist. Please create it first using create_markdown_file"
# Append content with timestamp
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
formatted_content = f"\n### Entry - {timestamp}\n{content}\n"
with open(filename, "a") as f:
f.write(formatted_content)
return f"Content added to {filename}"
except Exception as e:
return f"Error writing to markdown file: {str(e)}"
if __name__ == "__main__":
mcp.run(transport="sse")
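Both tools follow the same create-then-append flow. A minimal stand-alone sketch of that flow without the MCP layer (the function names here are illustrative, not part of the server):

```python
import os
from datetime import datetime

def create_markdown(filename: str) -> str:
    # Normalize the extension, then write a basic skeleton
    if not filename.endswith(".md"):
        filename += ".md"
    title = filename[:-3]
    header = f"# {title}\nCreated on: {datetime.now():%Y-%m-%d %H:%M:%S}\n\n## Content\n"
    with open(filename, "w") as f:
        f.write(header)
    return f"Created markdown file: {filename}"

def append_markdown(filename: str, content: str) -> str:
    if not filename.endswith(".md"):
        filename += ".md"
    if not os.path.exists(filename):
        return f"File {filename} does not exist"
    # Each append gets its own timestamped entry header
    entry = f"\n### Entry - {datetime.now():%Y-%m-%d %H:%M:%S}\n{content}\n"
    with open(filename, "a") as f:
        f.write(entry)
    return f"Content added to {filename}"
```

Registering functions like these under `@mcp.tool()` is all FastMCP needs to expose them to clients over SSE.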

@ -79,6 +79,7 @@ from swarms.structs.swarming_architectures import (
 )
 from swarms.structs.auto_swarm_builder import AutoSwarmBuilder
 from swarms.structs.council_judge import CouncilAsAJudge
+from swarms.structs.batch_agent_execution import batch_agent_execution

 __all__ = [
     "Agent",
@ -148,4 +149,5 @@ __all__ = [
     "get_swarms_info",
     "AutoSwarmBuilder",
     "CouncilAsAJudge",
+    "batch_agent_execution",
 ]

@ -68,6 +68,7 @@ from swarms.utils.str_to_dict import str_to_dict
 from swarms.prompts.react_base_prompt import REACT_SYS_PROMPT
 from swarms.prompts.max_loop_prompt import generate_reasoning_prompt
 from swarms.prompts.safety_prompt import SAFETY_PROMPT
+from swarms.structs.ma_utils import set_random_models_for_agents

 # Utils
@ -400,6 +401,7 @@ class Agent:
         mcp_urls: List[str] = None,
         react_on: bool = False,
         safety_prompt_on: bool = False,
+        random_models_on: bool = False,
         *args,
         **kwargs,
     ):
@ -523,6 +525,7 @@ class Agent:
         self.mcp_urls = mcp_urls
         self.react_on = react_on
         self.safety_prompt_on = safety_prompt_on
+        self.random_models_on = random_models_on

         self._cached_llm = (
             None  # Add this line to cache the LLM instance
@ -570,6 +573,9 @@
         self.short_memory = self.short_memory_init()

+        if self.random_models_on is True:
+            self.model_name = set_random_models_for_agents()
+
     def short_memory_init(self):
         if (
             self.agent_name is not None

@ -0,0 +1,64 @@
from swarms.structs.agent import Agent
from typing import List
from swarms.utils.formatter import formatter
def batch_agent_execution(
agents: List[Agent],
tasks: List[str],
):
"""
Execute a batch of agents on a list of tasks concurrently.
Args:
agents (List[Agent]): List of agents to execute
tasks (list[str]): List of tasks to execute
Returns:
List[str]: List of results from each agent execution
Raises:
ValueError: If number of agents doesn't match number of tasks
"""
if len(agents) != len(tasks):
raise ValueError(
"Number of agents must match number of tasks"
)
import concurrent.futures
import multiprocessing
results = []
# Calculate max workers as 90% of available CPU cores
max_workers = max(1, int(multiprocessing.cpu_count() * 0.9))
formatter.print_panel(
f"Executing {len(agents)} agents on {len(tasks)} tasks using {max_workers} workers"
)
with concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
) as executor:
# Submit all tasks to the executor
future_to_task = {
executor.submit(agent.run, task): (agent, task)
for agent, task in zip(agents, tasks)
}
# Collect results as they complete
for future in concurrent.futures.as_completed(future_to_task):
agent, task = future_to_task[future]
try:
result = future.result()
results.append(result)
except Exception as e:
print(
f"Task failed for agent {agent.agent_name}: {str(e)}"
)
results.append(None)
# Wait for all futures to complete before returning
concurrent.futures.wait(future_to_task.keys())
return results
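The helper above is a standard ThreadPoolExecutor fan-out. The same pattern in isolation, with a stand-in `EchoAgent` (illustrative, not a swarms class) replacing real agents:

```python
import concurrent.futures
import multiprocessing
from typing import List

class EchoAgent:
    """Stand-in for a swarms Agent: run() just echoes its task."""
    def __init__(self, name: str):
        self.agent_name = name

    def run(self, task: str) -> str:
        return f"{self.agent_name}: {task}"

def run_batch(agents: List[EchoAgent], tasks: List[str]) -> List[str]:
    if len(agents) != len(tasks):
        raise ValueError("Number of agents must match number of tasks")
    # Same sizing heuristic as batch_agent_execution: 90% of CPU cores
    max_workers = max(1, int(multiprocessing.cpu_count() * 0.9))
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(a.run, t) for a, t in zip(agents, tasks)]
        # as_completed yields futures in finish order, not submission order
        return [f.result() for f in concurrent.futures.as_completed(futures)]
```

Like the original, results arrive in completion order rather than input order; map futures back to their agents (as `batch_agent_execution` does with its `future_to_task` dict) if ordering matters.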

@ -0,0 +1,424 @@
import concurrent.futures
import os
from typing import Union, List
import PyPDF2
import markdown
from pathlib import Path
from swarms.utils.litellm_tokenizer import count_tokens
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
from swarms.utils.formatter import formatter
class LongAgent:
"""
A class to handle and process long-form content from various sources including PDFs,
markdown files, and large text documents.
"""
def __init__(
self,
name: str = "LongAgent",
description: str = "A long-form content processing agent",
token_count_per_agent: int = 16000,
output_type: str = "final",
model_name: str = "gpt-4o-mini",
aggregator_model_name: str = "gpt-4o-mini",
):
"""Initialize the LongAgent."""
self.name = name
self.description = description
self.model_name = model_name
self.aggregator_model_name = aggregator_model_name
self.content = ""
self.metadata = {}
self.token_count_per_agent = token_count_per_agent
self.output_type = output_type
self.agents = []
self.conversation = Conversation()
def load_pdf(self, file_path: Union[str, Path]) -> str:
"""
Load and extract text from a PDF file.
Args:
file_path (Union[str, Path]): Path to the PDF file
Returns:
str: Extracted text from the PDF
"""
if not os.path.exists(file_path):
raise FileNotFoundError(
f"PDF file not found at {file_path}"
)
text = ""
with open(file_path, "rb") as file:
pdf_reader = PyPDF2.PdfReader(file)
for page in pdf_reader.pages:
text += page.extract_text()
self.content = text
self.metadata["source"] = "pdf"
self.metadata["file_path"] = str(file_path)
return text
def load_markdown(self, file_path: Union[str, Path]) -> str:
"""
Load and process a markdown file.
Args:
file_path (Union[str, Path]): Path to the markdown file
Returns:
str: Processed markdown content
"""
if not os.path.exists(file_path):
raise FileNotFoundError(
f"Markdown file not found at {file_path}"
)
with open(file_path, "r", encoding="utf-8") as file:
content = file.read()
        # Parse once to validate the markdown; the HTML output is discarded
        markdown.markdown(content)
self.content = content
self.metadata["source"] = "markdown"
self.metadata["file_path"] = str(file_path)
return content
def load_text(self, text: str) -> str:
"""
Load and process a large text string.
Args:
text (str): The text content to process
Returns:
str: The processed text
"""
self.content = text
self.metadata["source"] = "text"
return text
def get_content(self) -> str:
"""
Get the current content being processed.
Returns:
str: The current content
"""
return self.content
def get_metadata(self) -> dict:
"""
Get the metadata associated with the current content.
Returns:
dict: The metadata dictionary
"""
return self.metadata
def count_token_document(
self, file_path: Union[str, Path]
) -> int:
"""
Count the number of tokens in a document.
Args:
            file_path (Union[str, Path]): Path to the document to count tokens for
"""
        file_path = str(file_path)
        if file_path.endswith(".pdf"):
            count = count_tokens(self.load_pdf(file_path))
        elif file_path.endswith(".md"):
            count = count_tokens(self.load_markdown(file_path))
        elif file_path.endswith(".txt"):
            # load_text expects raw text, so read the file first
            with open(file_path, "r", encoding="utf-8") as f:
                count = count_tokens(self.load_text(f.read()))
        else:
            raise ValueError(f"Unsupported file type: {file_path}")
        formatter.print_panel(
            f"Token count for {file_path}: {count}",
            title="Token Count",
        )
return count
def count_multiple_documents(
self, file_paths: List[Union[str, Path]]
) -> int:
"""
Count the number of tokens in multiple documents.
Args:
file_paths (List[Union[str, Path]]): The list of file paths to count tokens for
Returns:
int: Total token count across all documents
"""
total_tokens = 0
# Calculate max_workers as 20% of CPU count
max_workers = max(1, int(os.cpu_count() * 0.2))
with concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
) as executor:
futures = [
executor.submit(self.count_token_document, file_path)
for file_path in file_paths
]
for future in concurrent.futures.as_completed(futures):
try:
total_tokens += future.result()
except Exception as e:
formatter.print_panel(
f"Error processing document: {str(e)}",
title="Error",
)
continue
return total_tokens
def create_agents_for_documents(
self, file_paths: List[Union[str, Path]]
) -> List[Agent]:
"""
Create agents for each document chunk and process them.
Args:
file_paths (List[Union[str, Path]]): The list of file paths to create agents for
Returns:
List[Agent]: List of created agents
"""
for file_path in file_paths:
# Load the document content
if str(file_path).endswith(".pdf"):
content = self.load_pdf(file_path)
elif str(file_path).endswith(".md"):
content = self.load_markdown(file_path)
            else:
                # load_text expects raw text, so read the file first
                with open(file_path, "r", encoding="utf-8") as f:
                    content = self.load_text(f.read())
# Split content into chunks based on token count
chunks = self._split_into_chunks(content)
# Create an agent for each chunk
for i, chunk in enumerate(chunks):
agent = Agent(
agent_name=f"Document Analysis Agent - {Path(file_path).name} - Chunk {i+1}",
system_prompt="""
You are an expert document analysis and summarization agent specialized in processing and understanding complex documents. Your primary responsibilities include:
1. Document Analysis:
- Thoroughly analyze the provided document chunk
- Identify key themes, main arguments, and important details
- Extract critical information and relationships between concepts
2. Summarization Capabilities:
- Create concise yet comprehensive summaries
- Generate both high-level overviews and detailed breakdowns
- Highlight key points, findings, and conclusions
- Maintain context and relationships between different sections
3. Information Extraction:
- Identify and extract important facts, figures, and data points
- Recognize and preserve technical terminology and domain-specific concepts
- Maintain accuracy in representing the original content
4. Response Format:
- Provide clear, structured responses
- Use bullet points for key findings
- Include relevant quotes or references when necessary
- Maintain professional and academic tone
5. Context Awareness:
- Consider the document's purpose and target audience
- Adapt your analysis based on the document type (academic, technical, general)
- Preserve the original meaning and intent
Your goal is to help users understand and extract value from this document chunk while maintaining accuracy and completeness in your analysis.
""",
model_name=self.model_name,
max_loops=1,
max_tokens=self.token_count_per_agent,
)
# Run the agent on the chunk
output = agent.run(
f"Please analyze and summarize the following document chunk:\n\n{chunk}"
)
# Add the output to the conversation
self.conversation.add(
role=agent.agent_name,
content=output,
)
self.agents.append(agent)
return self.agents
def _split_into_chunks(self, content: str) -> List[str]:
"""
Split content into chunks based on token count.
Args:
content (str): The content to split
Returns:
List[str]: List of content chunks
"""
chunks = []
current_chunk = ""
current_tokens = 0
# Split content into sentences (simple approach)
sentences = content.split(". ")
for sentence in sentences:
sentence_tokens = count_tokens(sentence)
if (
current_tokens + sentence_tokens
> self.token_count_per_agent
):
if current_chunk:
chunks.append(current_chunk)
current_chunk = sentence
current_tokens = sentence_tokens
else:
current_chunk += (
". " + sentence if current_chunk else sentence
)
current_tokens += sentence_tokens
if current_chunk:
chunks.append(current_chunk)
return chunks
def count_total_agents(self) -> int:
"""
Count the total number of agents.
"""
count = len(self.agents)
formatter.print_panel(f"Total agents created: {count}")
return count
def _create_aggregator_agent(self) -> Agent:
"""
Create an aggregator agent for synthesizing document summaries.
Returns:
Agent: The configured aggregator agent
"""
return Agent(
agent_name="Document Aggregator Agent",
system_prompt="""
You are an expert document synthesis agent specialized in creating comprehensive reports from multiple document summaries. Your responsibilities include:
1. Synthesis and Integration:
- Combine multiple document summaries into a coherent narrative
- Identify and resolve any contradictions or inconsistencies
- Maintain logical flow and structure in the final report
- Preserve important details while eliminating redundancy
2. Report Structure:
- Create a clear, hierarchical structure for the report
- Include an executive summary at the beginning
- Organize content into logical sections with clear headings
- Ensure smooth transitions between different topics
3. Analysis and Insights:
- Identify overarching themes and patterns across summaries
- Draw meaningful conclusions from the combined information
- Highlight key findings and their implications
- Provide context and connections between different pieces of information
4. Quality Assurance:
- Ensure factual accuracy and consistency
- Maintain professional and academic tone
- Verify that all important information is included
- Check for clarity and readability
Your goal is to create a comprehensive, well-structured report that effectively synthesizes all the provided document summaries into a single coherent document.
""",
model_name=self.aggregator_model_name,
max_loops=1,
max_tokens=self.token_count_per_agent,
)
def run(self, file_paths: List[Union[str, Path]]) -> str:
"""
Run the document processing pipeline and generate a comprehensive report.
Args:
file_paths (List[Union[str, Path]]): The list of file paths to process
Returns:
str: The final comprehensive report
"""
# Count total tokens
total_tokens = self.count_multiple_documents(file_paths)
formatter.print_panel(
f"Total tokens: {total_tokens}", title="Total Tokens"
)
total_amount_of_agents = (
total_tokens / self.token_count_per_agent
)
formatter.print_panel(
f"Total amount of agents: {total_amount_of_agents}",
title="Total Amount of Agents",
)
# First, process all documents and create chunk agents
self.create_agents_for_documents(file_paths)
# Format the number of agents
# formatter.print_panel(f"Number of agents: {len(self.agents)}", title="Number of Agents")
# Create aggregator agent and collect summaries
aggregator_agent = self._create_aggregator_agent()
combined_summaries = self.conversation.get_str()
# Generate the final comprehensive report
final_report = aggregator_agent.run(
f"""
Please create a comprehensive report by synthesizing the following document summaries:
{combined_summaries}
Please structure your response as follows:
1. Executive Summary
2. Main Findings and Analysis
3. Key Themes and Patterns
4. Detailed Breakdown by Topic
5. Conclusions and Implications
Ensure the report is well-organized, comprehensive, and maintains a professional tone throughout.
"""
)
# Add the final report to the conversation
self.conversation.add(
role="Document Aggregator Agent", content=final_report
)
return history_output_formatter(
conversation=self.conversation, type=self.output_type
)
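The heart of the pipeline is `_split_into_chunks`: greedy sentence packing up to a per-agent token budget. The same logic isolated below, with a whitespace word count standing in for the litellm tokenizer the class actually uses:

```python
from typing import List

def count_tokens(text: str) -> int:
    # Stand-in tokenizer: whitespace words (LongAgent uses litellm's counter)
    return len(text.split())

def split_into_chunks(content: str, limit: int) -> List[str]:
    chunks: List[str] = []
    current, current_tokens = "", 0
    for sentence in content.split(". "):
        n = count_tokens(sentence)
        if current_tokens + n > limit:
            # Budget exceeded: close the current chunk and start a new one
            if current:
                chunks.append(current)
            current, current_tokens = sentence, n
        else:
            current = f"{current}. {sentence}" if current else sentence
            current_tokens += n
    if current:
        chunks.append(current)
    return chunks
```

Note that, as in the original, a single sentence longer than the budget still becomes its own over-sized chunk; the split is greedy, not exact.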

@ -1,10 +1,9 @@
-from swarms.structs.agent import Agent
-from typing import List, Any, Optional, Union
+from typing import List, Any, Optional, Union, Callable
 import random

 def list_all_agents(
-    agents: List[Union[Agent, Any]],
+    agents: List[Union[Callable, Any]],
     conversation: Optional[Any] = None,
     name: str = "",
     add_to_conversation: bool = False,
@ -74,9 +73,9 @@ models = [
 def set_random_models_for_agents(
-    agents: Optional[Union[List[Agent], Agent]] = None,
+    agents: Optional[Union[List[Callable], Callable]] = None,
     model_names: List[str] = models,
-) -> Union[List[Agent], Agent, str]:
+) -> Union[List[Callable], Callable, str]:
     """Sets random models for agents in the swarm or returns a random model name.

     Args:
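From its signature, `set_random_models_for_agents` returns a bare model name when called with no agents, and otherwise assigns a random `model_name` to whatever it is given. A sketch of that implied behavior (the model list and function body here are illustrative assumptions, not the committed code):

```python
import random
from typing import Any, List, Optional, Union

MODELS = ["gpt-4o-mini", "gpt-4o", "claude-3-5-sonnet"]  # illustrative names

def set_random_models(
    agents: Optional[Union[List[Any], Any]] = None,
    model_names: List[str] = MODELS,
) -> Union[List[Any], Any, str]:
    # No agents: hand back a random model name
    # (this is the path Agent takes when random_models_on is set)
    if agents is None:
        return random.choice(model_names)
    if isinstance(agents, list):
        for agent in agents:
            agent.model_name = random.choice(model_names)
        return agents
    agents.model_name = random.choice(model_names)
    return agents
```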

@ -198,7 +198,7 @@ class MALT:
             system_prompt=proof_verifier_prompt,
         )
-        majority_voting_agent = Agent(
+        Agent(
             agent_name="Majority-Voting-Agent",
             model_name="gpt-4o-mini",
             max_loops=1,

@ -0,0 +1,269 @@
import litellm
import asyncio
import contextlib
import random
from functools import wraps
from typing import Any, Dict, List
from litellm.experimental_mcp_client import (
call_openai_tool,
load_mcp_tools,
)
from loguru import logger
from mcp import ClientSession
from mcp.client.sse import sse_client
import os
def retry_with_backoff(retries=3, backoff_in_seconds=1):
"""Decorator for retrying functions with exponential backoff."""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
x = 0
while True:
try:
return await func(*args, **kwargs)
except Exception as e:
if x == retries:
logger.error(
f"Failed after {retries} retries: {str(e)}"
)
raise
sleep_time = (
backoff_in_seconds * 2**x
+ random.uniform(0, 1)
)
logger.warning(
f"Attempt {x + 1} failed, retrying in {sleep_time:.2f}s"
)
await asyncio.sleep(sleep_time)
x += 1
return wrapper
return decorator
@contextlib.contextmanager
def get_or_create_event_loop():
"""Context manager to handle event loop creation and cleanup."""
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
yield loop
finally:
if loop.is_running():
loop.stop()
if not loop.is_closed():
loop.close()
@retry_with_backoff(retries=3)
async def aget_mcp_tools(
server_path: str, format: str = "openai", *args, **kwargs
) -> List[Dict[str, Any]]:
"""
Fetch available MCP tools from the server with retry logic.
Args:
server_path (str): Path to the MCP server script
Returns:
List[Dict[str, Any]]: List of available MCP tools in OpenAI format
Raises:
ValueError: If server_path is invalid
ConnectionError: If connection to server fails
"""
if not server_path or not isinstance(server_path, str):
raise ValueError("Invalid server path provided")
logger.info(f"Fetching MCP tools from server: {server_path}")
try:
async with sse_client(server_path, *args, **kwargs) as (
read,
write,
):
async with ClientSession(read, write) as session:
await session.initialize()
tools = await load_mcp_tools(
session=session, format=format
)
logger.info(
f"Successfully fetched {len(tools)} tools"
)
return tools
except Exception as e:
logger.error(f"Error fetching MCP tools: {str(e)}")
raise
async def get_mcp_tools(
server_path: str, *args, **kwargs
) -> List[Dict[str, Any]]:
return await aget_mcp_tools(server_path, *args, **kwargs)
def get_mcp_tools_sync(
server_path: str, format: str = "openai", *args, **kwargs
) -> List[Dict[str, Any]]:
"""
Synchronous version of get_mcp_tools that handles event loop management.
Args:
server_path (str): Path to the MCP server script
Returns:
List[Dict[str, Any]]: List of available MCP tools in OpenAI format
Raises:
ValueError: If server_path is invalid
ConnectionError: If connection to server fails
RuntimeError: If event loop management fails
"""
with get_or_create_event_loop() as loop:
try:
return loop.run_until_complete(
aget_mcp_tools(server_path, format, *args, **kwargs)
)
except Exception as e:
logger.error(f"Error in get_mcp_tools_sync: {str(e)}")
raise
async def execute_tool_call(
server_path: str,
messages: List[Dict[str, Any]],
model: str = "o3-mini",
*args,
**kwargs,
) -> Dict[str, Any]:
"""
Execute a tool call using the MCP client with retry logic.
Args:
server_path (str): Path to the MCP server script
messages (List[Dict[str, Any]]): Current conversation messages
model (str): The model to use for completion (default: "o3-mini")
Returns:
Dict[str, Any]: Final LLM response after tool execution
Raises:
ValueError: If inputs are invalid
ConnectionError: If connection to server fails
RuntimeError: If tool execution fails
"""
async with sse_client(server_path, *args, **kwargs) as (
read,
write,
):
async with ClientSession(read, write) as session:
try:
# Initialize the connection
await session.initialize()
# Get tools
tools = await load_mcp_tools(
session=session, format="openai"
)
logger.info(f"Tools: {tools}")
# First LLM call to get tool call
llm_response = await litellm.acompletion(
model=model,
api_key=os.getenv("OPENAI_API_KEY"),
messages=messages,
tools=tools,
tool_choice="auto",
# parallel_tool_calls=True,
)
logger.info(f"Initial LLM Response: {llm_response}")
message = llm_response["choices"][0]["message"]
if not message.get("tool_calls"):
logger.warning("No tool calls in LLM response")
return llm_response
# Call the tool using MCP client
openai_tool = message["tool_calls"][0]
call_result = await call_openai_tool(
session=session,
openai_tool=openai_tool,
)
logger.info(f"Tool call completed: {call_result}")
# Update messages with tool result
messages.append(message)
messages.append(
{
"role": "tool",
"content": str(call_result.content[0].text),
"tool_call_id": openai_tool["id"],
}
)
logger.debug(
"Updated messages with tool result",
extra={"messages": messages},
)
# Second LLM call with tool result
final_response = await litellm.acompletion(
model=model,
api_key=os.getenv("OPENAI_API_KEY"),
messages=messages,
tools=tools,
tool_choice="auto",
# parallel_tool_calls=True,
)
logger.info(f"Final LLM Response: {final_response}")
return final_response
except Exception as e:
logger.error(f"Error in execute_tool_call: {str(e)}")
raise RuntimeError(f"Tool execution failed: {str(e)}")
# def execute_tool_call_sync(
# server_path: str,
# tool_call: Dict[str, Any],
# task: str,
# *args,
# **kwargs,
# ) -> Dict[str, Any]:
# """
# Synchronous version of execute_tool_call that handles event loop management.
# Args:
# server_path (str): Path to the MCP server script
# tool_call (Dict[str, Any]): The OpenAI tool call to execute
# messages (List[Dict[str, Any]]): Current conversation messages
# Returns:
# Dict[str, Any]: Final LLM response after tool execution
# Raises:
# ValueError: If inputs are invalid
# ConnectionError: If connection to server fails
# RuntimeError: If event loop management fails
# """
# with get_or_create_event_loop() as loop:
# try:
# return loop.run_until_complete(
# execute_tool_call(
# server_path, tool_call, task, *args, **kwargs
# )
# )
# except Exception as e:
# logger.error(f"Error in execute_tool_call_sync: {str(e)}")
# raise
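The async retry decorator above is easy to mirror for non-async callers. A self-contained sketch of the same exponential-backoff schedule, with jitter made proportional to the base backoff (a small deviation from the fixed 0–1s jitter above):

```python
import random
import time
from functools import wraps

def retry_with_backoff_sync(retries: int = 3, backoff_in_seconds: float = 1.0):
    """Retry a flaky callable with exponential backoff plus jitter."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == retries:
                        raise  # retry budget exhausted, surface the error
                    # backoff * 2^attempt, plus jitter to avoid thundering herds
                    time.sleep(
                        backoff_in_seconds * 2**attempt
                        + random.uniform(0, backoff_in_seconds)
                    )
        return wrapper
    return decorator
```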

@ -0,0 +1,343 @@
import base64
from typing import Union, Dict, Any, Tuple
import requests
from pathlib import Path
import wave
import numpy as np
def encode_audio_to_base64(audio_path: Union[str, Path]) -> str:
"""
Encode a WAV file to base64 string.
Args:
audio_path (Union[str, Path]): Path to the WAV file
Returns:
str: Base64 encoded string of the audio file
Raises:
FileNotFoundError: If the audio file doesn't exist
ValueError: If the file is not a valid WAV file
"""
try:
audio_path = Path(audio_path)
if not audio_path.exists():
raise FileNotFoundError(
f"Audio file not found: {audio_path}"
)
if not audio_path.suffix.lower() == ".wav":
raise ValueError("File must be a WAV file")
with open(audio_path, "rb") as audio_file:
audio_data = audio_file.read()
return base64.b64encode(audio_data).decode("utf-8")
except Exception as e:
raise Exception(f"Error encoding audio file: {str(e)}")
def decode_base64_to_audio(
base64_string: str, output_path: Union[str, Path]
) -> None:
"""
Decode a base64 string to a WAV file.
Args:
base64_string (str): Base64 encoded audio data
output_path (Union[str, Path]): Path where the WAV file should be saved
Raises:
ValueError: If the base64 string is invalid
IOError: If there's an error writing the file
"""
try:
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
audio_data = base64.b64decode(base64_string)
with open(output_path, "wb") as audio_file:
audio_file.write(audio_data)
except Exception as e:
raise Exception(f"Error decoding audio data: {str(e)}")
def download_audio_from_url(
url: str, output_path: Union[str, Path]
) -> None:
"""
Download an audio file from a URL and save it locally.
Args:
url (str): URL of the audio file
output_path (Union[str, Path]): Path where the audio file should be saved
Raises:
requests.RequestException: If there's an error downloading the file
IOError: If there's an error saving the file
"""
try:
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
response = requests.get(url)
response.raise_for_status()
with open(output_path, "wb") as audio_file:
audio_file.write(response.content)
except Exception as e:
raise Exception(f"Error downloading audio file: {str(e)}")
def process_audio_with_model(
audio_path: Union[str, Path],
model: str,
prompt: str,
voice: str = "alloy",
format: str = "wav",
) -> Dict[str, Any]:
"""
Process an audio file with a model that supports audio input/output.
Args:
audio_path (Union[str, Path]): Path to the input WAV file
model (str): Model name to use for processing
prompt (str): Text prompt to accompany the audio
voice (str, optional): Voice to use for audio output. Defaults to "alloy"
format (str, optional): Audio format. Defaults to "wav"
Returns:
Dict[str, Any]: Model response containing both text and audio if applicable
Raises:
ImportError: If litellm is not installed
ValueError: If the model doesn't support audio processing
"""
try:
from litellm import (
completion,
supports_audio_input,
supports_audio_output,
)
if not supports_audio_input(model):
raise ValueError(
f"Model {model} does not support audio input"
)
# Encode the audio file
encoded_audio = encode_audio_to_base64(audio_path)
# Prepare the messages
messages = [
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{
"type": "input_audio",
"input_audio": {
"data": encoded_audio,
"format": format,
},
},
],
}
]
# Make the API call
response = completion(
model=model,
modalities=["text", "audio"],
audio={"voice": voice, "format": format},
messages=messages,
)
return response
except ImportError:
raise ImportError(
"Please install litellm: pip install litellm"
)
except Exception as e:
raise Exception(
f"Error processing audio with model: {str(e)}"
)
def read_wav_file(
file_path: Union[str, Path],
) -> Tuple[np.ndarray, int]:
"""
Read a WAV file and return its audio data and sample rate.
Args:
file_path (Union[str, Path]): Path to the WAV file
Returns:
Tuple[np.ndarray, int]: Audio data as numpy array and sample rate
Raises:
FileNotFoundError: If the file doesn't exist
ValueError: If the file is not a valid WAV file
"""
try:
file_path = Path(file_path)
if not file_path.exists():
raise FileNotFoundError(
f"Audio file not found: {file_path}"
)
with wave.open(str(file_path), "rb") as wav_file:
# Get audio parameters
n_channels = wav_file.getnchannels()
sample_width = wav_file.getsampwidth()
frame_rate = wav_file.getframerate()
n_frames = wav_file.getnframes()
# Read audio data
frames = wav_file.readframes(n_frames)
# Convert to numpy array
dtype = np.int16 if sample_width == 2 else np.int8
audio_data = np.frombuffer(frames, dtype=dtype)
# Reshape if stereo
if n_channels == 2:
audio_data = audio_data.reshape(-1, 2)
return audio_data, frame_rate
except Exception as e:
raise Exception(f"Error reading WAV file: {str(e)}")
def write_wav_file(
audio_data: np.ndarray,
file_path: Union[str, Path],
sample_rate: int,
sample_width: int = 2,
) -> None:
"""
Write audio data to a WAV file.
Args:
audio_data (np.ndarray): Audio data as numpy array
file_path (Union[str, Path]): Path where to save the WAV file
sample_rate (int): Sample rate of the audio
sample_width (int, optional): Sample width in bytes. Defaults to 2 (16-bit)
Raises:
ValueError: If the audio data is invalid
IOError: If there's an error writing the file
"""
try:
file_path = Path(file_path)
file_path.parent.mkdir(parents=True, exist_ok=True)
# Ensure audio data is in the correct format
if audio_data.dtype != np.int16 and sample_width == 2:
audio_data = (audio_data * 32767).astype(np.int16)
elif audio_data.dtype != np.int8 and sample_width == 1:
audio_data = (audio_data * 127).astype(np.int8)
# Determine number of channels
n_channels = (
2
if len(audio_data.shape) > 1 and audio_data.shape[1] == 2
else 1
)
with wave.open(str(file_path), "wb") as wav_file:
wav_file.setnchannels(n_channels)
wav_file.setsampwidth(sample_width)
wav_file.setframerate(sample_rate)
wav_file.writeframes(audio_data.tobytes())
except Exception as e:
raise Exception(f"Error writing WAV file: {str(e)}")
def normalize_audio(audio_data: np.ndarray) -> np.ndarray:
"""
Normalize audio data to have maximum amplitude of 1.0.
Args:
audio_data (np.ndarray): Input audio data
Returns:
np.ndarray: Normalized audio data
"""
return audio_data / np.max(np.abs(audio_data))
def convert_to_mono(audio_data: np.ndarray) -> np.ndarray:
"""
Convert stereo audio to mono by averaging channels.
Args:
audio_data (np.ndarray): Input audio data (stereo)
Returns:
np.ndarray: Mono audio data
"""
if len(audio_data.shape) == 1:
return audio_data
return np.mean(audio_data, axis=1)
def encode_wav_to_base64(
audio_data: np.ndarray, sample_rate: int
) -> str:
"""
Convert audio data to base64 encoded WAV string.
Args:
audio_data (np.ndarray): Audio data
sample_rate (int): Sample rate of the audio
Returns:
str: Base64 encoded WAV data
"""
# Create a temporary WAV file in memory
with wave.open("temp.wav", "wb") as wav_file:
wav_file.setnchannels(1 if len(audio_data.shape) == 1 else 2)
wav_file.setsampwidth(2) # 16-bit
wav_file.setframerate(sample_rate)
wav_file.writeframes(audio_data.tobytes())
# Read the file and encode to base64
with open("temp.wav", "rb") as f:
wav_bytes = f.read()
# Clean up temporary file
Path("temp.wav").unlink()
return base64.b64encode(wav_bytes).decode("utf-8")
def decode_base64_to_wav(
base64_string: str,
) -> Tuple[np.ndarray, int]:
"""
Convert base64 encoded WAV string to audio data and sample rate.
Args:
base64_string (str): Base64 encoded WAV data
Returns:
Tuple[np.ndarray, int]: Audio data and sample rate
"""
# Decode base64 string
wav_bytes = base64.b64decode(base64_string)
# Write to temporary file
with open("temp.wav", "wb") as f:
f.write(wav_bytes)
# Read the WAV file
audio_data, sample_rate = read_wav_file("temp.wav")
# Clean up temporary file
Path("temp.wav").unlink()
return audio_data, sample_rate
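The encode/decode helpers above round-trip through a `temp.wav` on disk. The same conversion can stay entirely in memory with `io.BytesIO`, since `wave.open` accepts file-like objects; a sketch for 16-bit mono audio:

```python
import base64
import io
import wave

import numpy as np

def wav_to_base64(audio: np.ndarray, sample_rate: int) -> str:
    """Encode 16-bit mono samples as a base64 WAV string, entirely in memory."""
    buf = io.BytesIO()
    with wave.open(buf, "wb") as w:
        w.setnchannels(1)
        w.setsampwidth(2)  # 16-bit samples
        w.setframerate(sample_rate)
        w.writeframes(audio.astype(np.int16).tobytes())
    return base64.b64encode(buf.getvalue()).decode("utf-8")

def base64_to_wav(data: str):
    """Decode a base64 WAV string back to (samples, sample_rate)."""
    buf = io.BytesIO(base64.b64decode(data))
    with wave.open(buf, "rb") as w:
        rate = w.getframerate()
        frames = w.readframes(w.getnframes())
    return np.frombuffer(frames, dtype=np.int16), rate
```

This avoids the cleanup step (`Path("temp.wav").unlink()`) and any collision between concurrent callers sharing the same temp filename.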

@ -123,6 +123,9 @@ class LiteLLM:
             retries  # Add retries for better reliability
         )
+
+    def output_for_tools(self, response: any):
+        return response["choices"][0]["message"]["tool_calls"][0]

     def _prepare_messages(self, task: str) -> list:
         """
         Prepare the messages for the given task.
@ -309,11 +312,17 @@
             # Standard completion
             if self.stream:
-                return completion(**completion_params)
+                output = completion(**completion_params)
             else:
                 response = completion(**completion_params)
-                return response.choices[0].message.content
+
+                if self.tools_list_dictionary is not None:
+                    return self.output_for_tools(response)
+                else:
+                    return response.choices[0].message.content
+
+            return output
         except LiteLLMException as error:
             logger.error(f"Error in LiteLLM run: {str(error)}")
             if "rate_limit" in str(error).lower():
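`output_for_tools` simply walks the OpenAI-style chat-completion payload down to the first tool call. The indexing it assumes looks like this (the sample payload below is illustrative, not a real API response):

```python
def output_for_tools(response: dict) -> dict:
    # choices[0] -> message -> first entry in tool_calls
    return response["choices"][0]["message"]["tool_calls"][0]

sample = {
    "choices": [
        {
            "message": {
                "tool_calls": [
                    {"id": "call_1", "function": {"name": "get_price", "arguments": "{}"}}
                ]
            }
        }
    ]
}
```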
