[feat][agent + mcp]

pull/859/head
Kye Gomez 2 months ago
parent d61c218799
commit 181060e3ed

@@ -0,0 +1,28 @@
from swarms import Agent
from swarms.schemas.mcp_schemas import MCPConnection

mcp_config = MCPConnection(
    url="http://0.0.0.0:8000/sse",
    # headers={"Authorization": "Bearer 1234567890"},
    timeout=5,
)

mcp_url = "http://0.0.0.0:8000/sse"

# Initialize the agent
agent = Agent(
    agent_name="Financial-Analysis-Agent",
    agent_description="Personal finance advisor agent",
    max_loops=1,
    mcp_url=mcp_url,
    output_type="all",
)

# Run a task that calls both MCP price tools
out = agent.run(
    "Fetch the price for bitcoin on both functions get_htx_crypto_price and get_crypto_price",
)
print(out)

@@ -195,9 +195,10 @@ nav:
     - Create and Run Agents from YAML: "swarms/agents/create_agents_yaml.md"
     - Integrating Various Models into Your Agents: "swarms/models/agent_and_models.md"
     - Tools:
-      - Structured Outputs: "swarms/agents/structured_outputs.md"
       - Overview: "swarms/tools/main.md"
       - What are tools?: "swarms/tools/build_tool.md"
+      - Structured Outputs: "swarms/agents/structured_outputs.md"
+      - Agent MCP Integration: "swarms/structs/agent_mcp.md"
       - ToolAgent: "swarms/agents/tool_agent.md"
       - Tool Storage: "swarms/tools/tool_storage.md"
     - RAG || Long Term Memory:
@@ -261,6 +262,7 @@ nav:
     - Swarms Tools:
       - Overview: "swarms_tools/overview.md"
+      - MCP Client Utils: "swarms/tools/mcp_client_call.md"
       - Vertical Tools:
         - Finance: "swarms_tools/finance.md"

@@ -0,0 +1,826 @@
# Agent MCP Integration Guide
<div class="grid cards" markdown>
- :material-connection: **Direct MCP Server Connection**
---
Connect agents to MCP servers via URL for seamless integration
[:octicons-arrow-right-24: Quick Start](#quick-start)
- :material-tools: **Dynamic Tool Discovery**
---
Automatically fetch and utilize tools from MCP servers
[:octicons-arrow-right-24: Tool Discovery](#integration-flow)
- :material-chart-line: **Real-time Communication**
---
Server-Sent Events (SSE) for live data streaming
[:octicons-arrow-right-24: Configuration](#configuration-options)
- :material-code-json: **Structured Output**
---
Process and format responses with multiple output types
[:octicons-arrow-right-24: Examples](#example-implementations)
</div>
## Overview
The **Model Context Protocol (MCP)** integration enables Swarms agents to dynamically connect to external tools and services through a standardized protocol. This powerful feature expands agent capabilities by providing access to APIs, databases, and specialized services.
!!! info "What is MCP?"
The Model Context Protocol is a standardized way for AI agents to interact with external tools and services, providing a consistent interface for tool discovery and execution.
---
## :material-check-circle: Features Matrix
=== "✅ Current Capabilities"
| Feature | Status | Description |
|---------|--------|-------------|
| **Direct MCP Connection** | ✅ Ready | Connect via URL to MCP servers |
| **Tool Discovery** | ✅ Ready | Auto-fetch available tools |
| **SSE Communication** | ✅ Ready | Real-time server communication |
| **Multiple Tool Execution** | ✅ Ready | Execute multiple tools per session |
| **Structured Output** | ✅ Ready | Format responses in multiple types |
=== "🚧 In Development"
| Feature | Status | Expected |
|---------|--------|----------|
| **MCPConnection Model** | 🚧 Development | Q1 2024 |
| **Multiple Server Support** | 🚧 Planned | Q2 2024 |
| **Parallel Function Calling** | 🚧 Research | Q2 2024 |
| **Auto-discovery** | 🚧 Planned | Q3 2024 |
---
## :material-rocket: Quick Start
!!! tip "Prerequisites"
=== "System Requirements"
- Python 3.8+
- Swarms framework
- Running MCP server
=== "Installation"
```bash
pip install swarms
```
### Step 1: Basic Agent Setup
!!! example "Simple MCP Agent"
```python
from swarms import Agent

# Initialize agent with MCP integration
agent = Agent(
    agent_name="Financial-Analysis-Agent",
    agent_description="AI-powered financial advisor",
    max_loops=1,
    mcp_url="http://localhost:8000/sse",  # Your MCP server
    output_type="all",
)

# Execute task using MCP tools
result = agent.run(
    "Get current Bitcoin price and analyze market trends"
)
print(result)
```
### Step 2: Advanced Configuration
!!! example "Production-Ready Setup"
```python
from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import FINANCIAL_AGENT_SYS_PROMPT

agent = Agent(
    agent_name="Advanced-Financial-Agent",
    agent_description="Comprehensive market analysis agent",
    system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
    max_loops=3,
    mcp_url="http://production-server:8000/sse",
    output_type="json",
    # Additional parameters for production
    temperature=0.1,
    verbose=True,
)
```
---
## :material-workflow: Integration Flow
The following diagram illustrates the complete MCP integration workflow:
```mermaid
graph TD
A[🚀 Agent Receives Task] --> B[🔗 Connect to MCP Server]
B --> C[🔍 Discover Available Tools]
C --> D[🧠 Analyze Task Requirements]
D --> E[📝 Generate Tool Request]
E --> F[📤 Send to MCP Server]
F --> G[⚙️ Server Processes Request]
G --> H[📥 Receive Response]
H --> I[🔄 Process & Validate]
I --> J[📊 Summarize Results]
J --> K[✅ Return Final Output]
classDef startEnd fill:#e1f5fe,stroke:#01579b,stroke-width:2px
classDef process fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
classDef communication fill:#fff3e0,stroke:#ef6c00,stroke-width:2px
class A,K startEnd
class D,I,J process
class F,G,H communication
```
### Detailed Process Breakdown
!!! abstract "Process Steps"
=== "1-3: Initialization"
1. **Task Initiation** - Agent receives user query
2. **Server Connection** - Establish MCP server link
3. **Tool Discovery** - Fetch available tool schemas
=== "4-6: Execution"
4. **Task Analysis** - Determine required tools
5. **Request Generation** - Create structured API calls
6. **Server Communication** - Send requests via SSE
=== "7-9: Processing"
7. **Server Processing** - MCP server executes tools
8. **Response Handling** - Receive and validate data
9. **Result Processing** - Parse and structure output
=== "10-11: Completion"
10. **Summarization** - Generate user-friendly summary
11. **Final Output** - Return complete response
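The sketch below traces steps 2-9 by hand, using the client helpers documented in the MCP Client Call Reference shipped with this integration. In normal use `agent.run()` drives this whole loop for you; the hand-built tool-call payload and the local server URL here are assumptions for illustration only.

```python
import asyncio

from swarms.tools.mcp_client_call import (
    execute_tool_call_simple,
    get_mcp_tools_sync,
)

server = "http://localhost:8000/sse"  # assumed local MCP server

# Steps 2-3: connect to the server and fetch the available tool schemas
tools = get_mcp_tools_sync(server_path=server, format="openai")
print(f"Discovered {len(tools)} tools")

# Steps 4-6: the LLM normally generates this payload; here we hand-craft
# a call to the first discovered tool (empty arguments are an assumption)
tool_call = {
    "function": {
        "name": tools[0]["function"]["name"],
        "arguments": {},
    }
}

# Steps 7-9: send the call over SSE and collect the structured result
result = asyncio.run(
    execute_tool_call_simple(
        response=tool_call,
        server_path=server,
        output_type="json",
    )
)
print(result)
```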
---
## :material-cog: Configuration Options
### Agent Parameters
!!! note "Configuration Reference"
| Parameter | Type | Description | Default | Example |
|-----------|------|-------------|---------|---------|
| `mcp_url` | `str` | MCP server endpoint | `None` | `"http://localhost:8000/sse"` |
| `output_type` | `str` | Response format | `"str"` | `"json"`, `"all"`, `"dict"` |
| `max_loops` | `int` | Execution iterations | `1` | `3` |
| `temperature` | `float` | Response creativity | `0.1` | `0.1-1.0` |
| `verbose` | `bool` | Debug logging | `False` | `True` |
### Output Type Comparison
=== "all"
**Complete execution trace with metadata**
```json
{
    "response": "Bitcoin price: $45,230",
    "tools_used": ["get_crypto_price"],
    "execution_time": "2.3s",
    "loops_completed": 1
}
```
=== "str"
**Simple string response**
```
"Current Bitcoin price is $45,230 USD"
```
=== "json"
**Structured JSON output**
```json
{
    "bitcoin_price": 45230,
    "currency": "USD",
    "timestamp": "2024-01-15T10:30:00Z"
}
```
=== "dict"
**Python dictionary format**
```python
{
    'price': 45230,
    'symbol': 'BTC',
    'change_24h': '+2.5%'
}
```
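All four formats are selected with the same `output_type` constructor argument. A minimal sketch of switching between them follows; the payload fields shown in the tabs above are illustrative, so inspect the actual return value for your server:

```python
from swarms import Agent

# Same task, three response formats
for output_type in ("str", "json", "all"):
    agent = Agent(
        agent_name="Output-Demo-Agent",
        mcp_url="http://localhost:8000/sse",
        max_loops=1,
        output_type=output_type,
    )
    result = agent.run("Get current Bitcoin price")
    print(output_type, "->", type(result))
```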
---
## :material-code-tags: Example Implementations
### Cryptocurrency Trading Agent
!!! example "Crypto Price Monitor"
```python
from swarms import Agent

crypto_agent = Agent(
    agent_name="Crypto-Trading-Agent",
    agent_description="Real-time cryptocurrency market analyzer",
    max_loops=2,
    mcp_url="http://crypto-server:8000/sse",
    output_type="json",
    temperature=0.1,
)

# Multi-exchange price comparison
result = crypto_agent.run(
    """
    Compare Bitcoin and Ethereum prices across OKX and HTX exchanges.
    Calculate arbitrage opportunities and provide trading recommendations.
    """
)
```
### Financial Analysis Suite
!!! example "Advanced Financial Agent"
```python
from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import FINANCIAL_AGENT_SYS_PROMPT

financial_agent = Agent(
    agent_name="Financial-Analysis-Suite",
    agent_description="Comprehensive financial market analyst",
    system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
    max_loops=4,
    mcp_url="http://finance-api:8000/sse",
    output_type="all",
    temperature=0.2,
)

# Complex market analysis
analysis = financial_agent.run(
    """
    Perform a comprehensive analysis of Tesla (TSLA) stock:
    1. Current price and technical indicators
    2. Recent news sentiment analysis
    3. Competitor comparison (GM, Ford)
    4. Investment recommendation with risk assessment
    """
)
```
### Custom Industry Agent
!!! example "Healthcare Data Agent"
```python
from swarms import Agent

healthcare_agent = Agent(
    agent_name="Healthcare-Data-Agent",
    agent_description="Medical data analysis and research assistant",
    max_loops=3,
    mcp_url="http://medical-api:8000/sse",
    output_type="dict",
    system_prompt="""
    You are a healthcare data analyst. Use available medical databases
    and research tools to provide accurate, evidence-based information.
    Always cite sources and include confidence levels.
    """,
)

research = healthcare_agent.run(
    "Research latest treatments for Type 2 diabetes and their efficacy rates"
)
```
---
## :material-server: MCP Server Development
### FastMCP Server Example
!!! example "Building a Custom MCP Server"
```python
from mcp.server.fastmcp import FastMCP
import requests

# Initialize MCP server
mcp = FastMCP("crypto_analysis_server")

@mcp.tool(
    name="get_crypto_price",
    description="Fetch current cryptocurrency price with market data",
)
def get_crypto_price(
    symbol: str,
    currency: str = "USD",
    include_24h_change: bool = True,
) -> dict:
    """
    Get real-time cryptocurrency price and market data.

    Args:
        symbol: Cryptocurrency symbol (e.g., BTC, ETH)
        currency: Target currency for price (default: USD)
        include_24h_change: Include 24-hour price change data
    """
    try:
        url = "https://api.coingecko.com/api/v3/simple/price"
        params = {
            "ids": symbol.lower(),
            "vs_currencies": currency.lower(),
            "include_24hr_change": include_24h_change,
        }
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()
        return {
            "symbol": symbol.upper(),
            "price": data[symbol.lower()][currency.lower()],
            "currency": currency.upper(),
            "change_24h": data[symbol.lower()].get("24h_change", 0),
            "timestamp": "2024-01-15T10:30:00Z",
        }
    except Exception as e:
        return {"error": f"Failed to fetch price: {str(e)}"}

@mcp.tool(
    name="analyze_market_sentiment",
    description="Analyze cryptocurrency market sentiment from social media",
)
def analyze_market_sentiment(symbol: str, timeframe: str = "24h") -> dict:
    """Analyze market sentiment for a cryptocurrency."""
    # Implement sentiment analysis logic
    return {
        "symbol": symbol,
        "sentiment_score": 0.75,
        "sentiment": "Bullish",
        "confidence": 0.85,
        "timeframe": timeframe,
    }

if __name__ == "__main__":
    mcp.run(transport="sse", host="0.0.0.0", port=8000)
```
### Server Best Practices
!!! tip "Server Development Guidelines"
=== "🏗️ Architecture"
- **Modular Design**: Separate tools into logical modules
- **Error Handling**: Implement comprehensive error responses
- **Async Support**: Use async/await for better performance
- **Type Hints**: Include proper type annotations
=== "🔒 Security"
- **Input Validation**: Sanitize all user inputs
- **Rate Limiting**: Implement request throttling
- **Authentication**: Add API key validation
- **Logging**: Log all requests and responses
=== "⚡ Performance"
- **Caching**: Cache frequently requested data
- **Connection Pooling**: Reuse database connections
- **Timeouts**: Set appropriate request timeouts
- **Load Testing**: Test under realistic load
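A minimal sketch of the validation, timeout, and structured-error guidelines above, using the same FastMCP decorator API as the server example; the tool name and endpoint are illustrative assumptions:

```python
from mcp.server.fastmcp import FastMCP
import requests

mcp = FastMCP("guarded_server")

@mcp.tool(
    name="get_price",
    description="Fetch a price for a validated ticker symbol.",
)
def get_price(symbol: str) -> dict:
    # Input validation: reject empty, non-alphanumeric, or oversized input early
    if not symbol or not symbol.isalnum() or len(symbol) > 10:
        return {"error": "Invalid symbol"}
    try:
        # Timeouts: never issue an unbounded outbound request
        response = requests.get(
            "https://api.example.com/price",  # placeholder endpoint
            params={"symbol": symbol.upper()},
            timeout=10,
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        # Structured errors: return a payload the agent can reason about
        return {"error": str(e)}

if __name__ == "__main__":
    mcp.run(transport="sse")
```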
---
## :material-alert: Current Limitations
!!! warning "Important Limitations"
### 🚧 MCPConnection Model
The enhanced connection model is under development:
```python
# ❌ Not available yet
from swarms.schemas.mcp_schemas import MCPConnection

mcp_config = MCPConnection(
    url="http://server:8000/sse",
    headers={"Authorization": "Bearer token"},
    timeout=30,
    retry_attempts=3,
)

# ✅ Use direct URL instead
mcp_url = "http://server:8000/sse"
```
### 🚧 Single Server Limitation
Currently supports one server per agent:
```python
# ❌ Multiple servers not supported
mcp_servers = [
    "http://server1:8000/sse",
    "http://server2:8000/sse",
]

# ✅ Single server only
mcp_url = "http://primary-server:8000/sse"
```
### 🚧 Sequential Execution
Tools execute sequentially, not in parallel:
```python
# Current: tool1() → tool2() → tool3()
# Future: tool1() | tool2() | tool3() (parallel)
```
---
## :material-wrench: Troubleshooting
### Common Issues & Solutions
!!! bug "Connection Problems"
=== "Server Unreachable"
**Symptoms**: Connection timeout or refused
**Solutions**:
```bash
# Check server status
curl -I http://localhost:8000/sse
# Verify port is open
netstat -tulpn | grep :8000
# Test network connectivity
ping your-server-host
```
=== "Authentication Errors"
**Symptoms**: 401/403 HTTP errors
**Solutions**:
```python
# Verify API credentials
headers = {"Authorization": "Bearer your-token"}
# Check token expiration
# Validate permissions
```
=== "SSL/TLS Issues"
**Symptoms**: Certificate errors
**Solutions**:
```python
# For development only
import ssl
ssl._create_default_https_context = ssl._create_unverified_context
```
!!! bug "Tool Discovery Failures"
=== "Empty Tool List"
**Symptoms**: No tools found from server
**Debugging**:
```python
# Check server tool registration
@mcp.tool(name="tool_name", description="...")
def your_tool():
    pass

# Verify server startup logs
# Check tool endpoint responses
```
=== "Schema Validation Errors"
**Symptoms**: Invalid tool parameters
**Solutions**:
```python
# Ensure proper type hints
def tool(param: str, optional: int = 0) -> dict:
    return {"result": "success"}

# Validate parameter types
# Check required vs optional parameters
```
!!! bug "Performance Issues"
=== "Slow Response Times"
**Symptoms**: Long wait times for responses
**Optimization**:
```python
# Increase timeout
agent = Agent(
    mcp_url="http://server:8000/sse",
    timeout=60,  # seconds
)

# Optimize server performance
# Use connection pooling
# Implement caching
```
=== "Memory Usage"
**Symptoms**: High memory consumption
**Solutions**:
```python
# Limit max_loops
agent = Agent(max_loops=2)
# Use streaming for large responses
# Implement garbage collection
```
### Debugging Tools
!!! tip "Debug Configuration"
```python
import logging

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)

agent = Agent(
    agent_name="Debug-Agent",
    mcp_url="http://localhost:8000/sse",
    verbose=True,  # Enable verbose output
    output_type="all",  # Get full execution trace
)

# Monitor network traffic
# Check server logs
# Use profiling tools
```
---
## :material-security: Security Best Practices
### Authentication & Authorization
!!! shield "Security Checklist"
=== "🔑 Authentication"
- **API Keys**: Use strong, unique API keys
- **Token Rotation**: Implement automatic token refresh
- **Encryption**: Use HTTPS for all communications
- **Storage**: Secure credential storage (environment variables)
=== "🛡️ Authorization"
- **Role-Based Access**: Implement user role restrictions
- **Tool Permissions**: Limit tool access per user/agent
- **Rate Limiting**: Prevent abuse with request limits
- **Audit Logging**: Log all tool executions
=== "🔒 Data Protection"
- **Input Sanitization**: Validate all user inputs
- **Output Filtering**: Sanitize sensitive data in responses
- **Encryption**: Encrypt sensitive data in transit/rest
- **Compliance**: Follow industry standards (GDPR, HIPAA)
### Secure Configuration
!!! example "Production Security Setup"
```python
import os
from swarms import Agent

# Secure configuration
agent = Agent(
    agent_name="Production-Agent",
    mcp_url=os.getenv("MCP_SERVER_URL"),  # From environment
    # Additional security headers would go here when MCPConnection is available
    verbose=False,  # Disable verbose logging in production
    output_type="json",  # Structured output only
)

# Environment variables (.env file)
"""
MCP_SERVER_URL=https://secure-server.company.com/sse
MCP_API_KEY=your-secure-api-key
MCP_TIMEOUT=30
"""
```
---
## :material-chart-line: Performance Optimization
### Agent Optimization
!!! rocket "Performance Tips"
=== "⚡ Configuration"
```python
# Optimized agent settings
agent = Agent(
    max_loops=2,  # Limit iterations
    temperature=0.1,  # Reduce randomness
    output_type="json",  # Structured output
    # Future: connection_pool_size=10
)
```
=== "🔄 Caching"
```python
# Implement response caching
from functools import lru_cache

@lru_cache(maxsize=100)
def cached_mcp_call(query):
    return agent.run(query)
```
=== "📊 Monitoring"
```python
import time
start_time = time.time()
result = agent.run("query")
execution_time = time.time() - start_time
print(f"Execution time: {execution_time:.2f}s")
```
### Server Optimization
!!! rocket "Server Performance"
```python
from mcp.server.fastmcp import FastMCP
import asyncio
from concurrent.futures import ThreadPoolExecutor

mcp = FastMCP("optimized_server")

# Async tool with thread pool
@mcp.tool(name="async_heavy_task")
async def heavy_computation(data: str) -> dict:
    loop = asyncio.get_event_loop()
    with ThreadPoolExecutor() as executor:
        result = await loop.run_in_executor(
            executor, process_heavy_task, data
        )
    return result

def process_heavy_task(data):
    # CPU-intensive processing
    return {"processed": data}
```
---
## :material-timeline: Future Roadmap
### Upcoming Features
!!! rocket "Development Timeline"
=== "Q1 2024"
- **MCPConnection Model** - Enhanced configuration
- **Authentication Support** - Built-in auth mechanisms
- **Error Recovery** - Automatic retry logic
- **Connection Pooling** - Improved performance
=== "Q2 2024"
- **Multiple Server Support** - Connect to multiple MCPs
- **Parallel Execution** - Concurrent tool calling
- **Load Balancing** - Distribute requests across servers
- **Advanced Monitoring** - Real-time metrics
=== "Q3 2024"
- **Auto-discovery** - Automatic server detection
- **Workflow Engine** - Complex task orchestration
- **Plugin System** - Custom MCP extensions
- **Cloud Integration** - Native cloud provider support
### Contributing
!!! heart "Get Involved"
We welcome contributions to improve MCP integration:
- **Bug Reports**: [GitHub Issues](https://github.com/kyegomez/swarms/issues)
- **Feature Requests**: [Discussions](https://github.com/kyegomez/swarms/discussions)
- **Code Contributions**: [Pull Requests](https://github.com/kyegomez/swarms/pulls)
- **Documentation**: Help improve these docs
---
## :material-help-circle: Support & Resources
### Getting Help
!!! question "Need Assistance?"
=== "📚 Documentation"
- [Official Docs](https://docs.swarms.world)
- [API Reference](https://docs.swarms.world/api)
- [Tutorials](https://docs.swarms.world/tutorials)
=== "💬 Community"
- [Discord Server](https://discord.gg/swarms)
- [GitHub Discussions](https://github.com/kyegomez/swarms/discussions)
- [Stack Overflow](https://stackoverflow.com/questions/tagged/swarms)
=== "🔧 Development"
- [GitHub Repository](https://github.com/kyegomez/swarms)
- [Example Projects](https://github.com/kyegomez/swarms/tree/main/examples)
- [Contributing Guide](https://github.com/kyegomez/swarms/blob/main/CONTRIBUTING.md)
### Quick Reference
!!! abstract "Cheat Sheet"
```python
# Basic setup
from swarms import Agent

agent = Agent(
    agent_name="Your-Agent",
    mcp_url="http://localhost:8000/sse",
    output_type="json",
    max_loops=2,
)

# Execute task
result = agent.run("Your query here")

# Common patterns
crypto_query = "Get Bitcoin price"
analysis_query = "Analyze Tesla stock performance"
research_query = "Research recent AI developments"
```
---
## :material-check-all: Conclusion
The MCP integration brings powerful external tool connectivity to Swarms agents, enabling them to access real-world data and services through a standardized protocol. While some advanced features are still in development, the current implementation provides robust functionality for most use cases.
!!! success "Ready to Start?"
Begin with the [Quick Start](#quick-start) section and explore the [examples](#example-implementations) to see MCP integration in action. As new features become available, this documentation will be updated with the latest capabilities and best practices.
!!! tip "Stay Updated"
Join our [Discord community](https://discord.gg/swarms) to stay informed about new MCP features and connect with other developers building amazing agent applications.
---
<div class="grid cards" markdown>
- :material-rocket: **[Quick Start](#quick-start)**
Get up and running with MCP integration in minutes
- :material-book-open: **[Examples](#example-implementations)**
Explore real-world implementations and use cases
- :material-cog: **[Configuration](#configuration-options)**
Learn about all available configuration options
- :material-help: **[Troubleshooting](#troubleshooting)**
Solve common issues and optimize performance
</div>

@@ -0,0 +1,244 @@
# MCP Client Call Reference Documentation
This document provides a comprehensive reference for the MCP (Model Context Protocol) client call functions, including detailed parameter descriptions, return types, and usage examples.
## Table of Contents
- [aget_mcp_tools](#aget_mcp_tools)
- [get_mcp_tools_sync](#get_mcp_tools_sync)
- [get_tools_for_multiple_mcp_servers](#get_tools_for_multiple_mcp_servers)
- [execute_tool_call_simple](#execute_tool_call_simple)
## Function Reference
### aget_mcp_tools
Asynchronously fetches available MCP tools from the server with retry logic.
#### Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| server_path | Optional[str] | No | Path to the MCP server script |
| format | str | No | Format of the returned tools (default: "openai") |
| connection | Optional[MCPConnection] | No | MCP connection object |
| *args | Any | No | Additional positional arguments |
| **kwargs | Any | No | Additional keyword arguments |
#### Returns
- `List[Dict[str, Any]]`: List of available MCP tools in OpenAI format
#### Raises
- `MCPValidationError`: If server_path is invalid
- `MCPConnectionError`: If connection to server fails
#### Example
```python
import asyncio
from swarms.tools.mcp_client_call import aget_mcp_tools
from swarms.schemas.mcp_schemas import MCPConnection

async def main():
    # Using server path
    tools = await aget_mcp_tools(server_path="http://localhost:8000")

    # Using connection object
    connection = MCPConnection(
        url="http://localhost:8000",
        headers={"Authorization": "Bearer token"},
    )
    tools = await aget_mcp_tools(connection=connection)
    print(f"Found {len(tools)} tools")

if __name__ == "__main__":
    asyncio.run(main())
```
### get_mcp_tools_sync
Synchronous version of get_mcp_tools that handles event loop management.
#### Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| server_path | Optional[str] | No | Path to the MCP server script |
| format | str | No | Format of the returned tools (default: "openai") |
| connection | Optional[MCPConnection] | No | MCP connection object |
| *args | Any | No | Additional positional arguments |
| **kwargs | Any | No | Additional keyword arguments |
#### Returns
- `List[Dict[str, Any]]`: List of available MCP tools in OpenAI format
#### Raises
- `MCPValidationError`: If server_path is invalid
- `MCPConnectionError`: If connection to server fails
- `MCPExecutionError`: If event loop management fails
#### Example
```python
from swarms.tools.mcp_client_call import get_mcp_tools_sync
from swarms.schemas.mcp_schemas import MCPConnection

# Using server path
tools = get_mcp_tools_sync(server_path="http://localhost:8000")

# Using connection object
connection = MCPConnection(
    url="http://localhost:8000",
    headers={"Authorization": "Bearer token"},
)
tools = get_mcp_tools_sync(connection=connection)
print(f"Found {len(tools)} tools")
```
### get_tools_for_multiple_mcp_servers
Get tools for multiple MCP servers concurrently using ThreadPoolExecutor.
#### Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| urls | List[str] | Yes | List of server URLs to fetch tools from |
| connections | List[MCPConnection] | No | Optional list of MCPConnection objects |
| format | str | No | Format to return tools in (default: "openai") |
| output_type | Literal["json", "dict", "str"] | No | Type of output format (default: "str") |
| max_workers | Optional[int] | No | Maximum number of worker threads |
#### Returns
- `List[Dict[str, Any]]`: Combined list of tools from all servers
#### Raises
- `MCPExecutionError`: If fetching tools from any server fails
#### Example
```python
from swarms.tools.mcp_client_call import get_tools_for_multiple_mcp_servers
from swarms.schemas.mcp_schemas import MCPConnection

# Define server URLs
urls = [
    "http://server1:8000",
    "http://server2:8000",
]

# Optional: Define connections
connections = [
    MCPConnection(url="http://server1:8000"),
    MCPConnection(url="http://server2:8000"),
]

# Get tools from all servers
tools = get_tools_for_multiple_mcp_servers(
    urls=urls,
    connections=connections,
    format="openai",
    output_type="dict",
    max_workers=4,
)
print(f"Found {len(tools)} tools across all servers")
```
### execute_tool_call_simple
Execute a tool call using the MCP client.
#### Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| response | Any | No | Tool call response object |
| server_path | str | No | Path to the MCP server |
| connection | Optional[MCPConnection] | No | MCP connection object |
| output_type | Literal["json", "dict", "str", "formatted"] | No | Type of output format (default: "str") |
| *args | Any | No | Additional positional arguments |
| **kwargs | Any | No | Additional keyword arguments |
#### Returns
- `List[Dict[str, Any]]`: Result of the tool execution
#### Raises
- `MCPConnectionError`: If connection to server fails
- `MCPExecutionError`: If tool execution fails
#### Example
```python
import asyncio
from swarms.tools.mcp_client_call import execute_tool_call_simple
from swarms.schemas.mcp_schemas import MCPConnection

async def main():
    # Example tool call response (OpenAI-style function call payload)
    response = {
        "function": {
            "name": "example_tool",
            "arguments": {"param1": "value1"},
        }
    }

    # Using server path
    result = await execute_tool_call_simple(
        response=response,
        server_path="http://localhost:8000",
        output_type="json",
    )

    # Using connection object
    connection = MCPConnection(
        url="http://localhost:8000",
        headers={"Authorization": "Bearer token"},
    )
    result = await execute_tool_call_simple(
        response=response,
        connection=connection,
        output_type="dict",
    )
    print(f"Tool execution result: {result}")

if __name__ == "__main__":
    asyncio.run(main())
```
## Error Handling
The MCP client functions use a retry mechanism with exponential backoff for failed requests. The following error types may be raised:
- `MCPValidationError`: Raised when input validation fails
- `MCPConnectionError`: Raised when connection to the MCP server fails
- `MCPExecutionError`: Raised when tool execution fails
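A minimal sketch of catching these errors around a discovery call, relying on the retry mechanism described above; the import path for the exception classes is an assumption, since this reference does not state where they are defined:

```python
from swarms.tools.mcp_client_call import get_mcp_tools_sync

# from swarms.tools.mcp_client_call import MCPConnectionError  # assumed location

try:
    tools = get_mcp_tools_sync(server_path="http://localhost:8000")
except Exception as e:  # narrow to MCPConnectionError once the import path is confirmed
    print(f"Tool discovery failed after retries: {e}")
    tools = []
```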
## Best Practices
1. Always handle potential exceptions when using these functions
2. Use connection objects for authenticated requests
3. Consider using the async versions for better performance in async applications
4. Use appropriate output types based on your needs
5. When working with multiple servers, adjust max_workers based on your system's capabilities

@@ -1,8 +1,9 @@
 # Swarms API Documentation

-*Enterprise-grade Agent Swarm Management API*
+*Enterprise-Grade Agent Swarm Management API*

 **Base URL**: `https://api.swarms.world` or `https://swarms-api-285321057562.us-east1.run.app`

 **API Key Management**: [https://swarms.world/platform/api-keys](https://swarms.world/platform/api-keys)

 ## Overview

@@ -1,10 +0,0 @@
from swarms.tools.mcp_client import (
    list_tools_for_multiple_urls,
)

print(
    list_tools_for_multiple_urls(
        ["http://0.0.0.0:8000/sse"], output_type="json"
    )
)

@@ -1,8 +0,0 @@
from swarms.tools.mcp_client import execute_mcp_tool

print(
    execute_mcp_tool(
        "http://0.0.0.0:8000/sse",
        parameters={"name": "multiply", "a": 1, "b": 2},
    )
)

@@ -46,6 +46,9 @@ agent = Agent(
     tools_list_dictionary=tools,
 )

-agent.run(
+out = agent.run(
     "What is the current stock price for Apple Inc. (AAPL)? Include historical price data.",
 )
+
+print(out)
+print(type(out))

@@ -1,53 +0,0 @@
import asyncio
from swarms.tools.mcp_client_call import (
    aget_mcp_tools,
    execute_tool_call,
)
import json

async def main():
    tools = await aget_mcp_tools("http://0.0.0.0:8000/sse", "openai")
    print(json.dumps(tools, indent=4))

    # First create the markdown file
    create_result = await execute_tool_call(
        server_path="http://0.0.0.0:8000/sse",
        messages=[
            {
                "role": "user",
                "content": "Create a new markdown file called 'chicken_cat_story'",
            }
        ],
    )
    print("File creation result:", create_result)

    # Then write the story to the file
    story_content = """Title: The Adventures of Clucky and Whiskers

Once upon a time in a quiet, sunlit farm, there lived a curious chicken named Clucky and a mischievous cat named Whiskers. Clucky was known for her vibrant spirit and insatiable curiosity, roaming the farmyard with her head held high. Whiskers, on the other hand, was a clever little feline who always found himself in amusing predicaments; he often ventured into adventures that few dared to imagine.

The unlikely duo first met one fine autumn morning when Whiskers was chasing a playful butterfly near the barn. Clucky, busy pecking at the ground, almost tripped over Whiskers. Apologizing in her gentle clucks, she noticed that Whiskers was not scared at all; instead, he greeted her with a friendly purr. From that day on, the two embarked on countless adventures, exploring every corner of the farm and beyond.

They would roam the rolling meadows, share stories under the starry night sky, and even work together to solve little mysteries that baffled the other animals. Whether it was searching for a hidden pile of treats or finding safe paths through the woods, Clucky and Whiskers proved that friendship can be found in the most unexpected places.

The other animals on the farm watched in amazement as the chicken and the cat not only complemented each other but also became the best of friends. Clucky's boldness and Whiskers' cunning were a perfect match, teaching everyone that differences can create the strongest bonds.

In the heartwarming adventures of Clucky and Whiskers, one could learn that true friendship breaks all barriers, be they of fur or feathers. The legend of the brave chicken and the clever cat lived on forever, reminding everyone on the farm that unity makes life more colorful and joyful.

The End."""

    story_result = await execute_tool_call(
        server_path="http://0.0.0.0:8000/sse",
        messages=[
            {
                "role": "user",
                "content": f"Write this story to the file 'chicken_cat_story.md': {story_content}",
            }
        ],
    )
    print("Story writing result:", story_result)

if __name__ == "__main__":
    asyncio.run(main())

@@ -0,0 +1,62 @@
import os
import litellm
import json
from dotenv import load_dotenv

load_dotenv()

# Example dummy function hard coded to return the same weather
# In production, this could be your backend API or an external API
def get_current_weather(location, unit="fahrenheit"):
    """Get the current weather in a given location"""
    if "tokyo" in location.lower():
        return json.dumps({"location": "Tokyo", "temperature": "10", "unit": "celsius"})
    elif "san francisco" in location.lower():
        return json.dumps({"location": "San Francisco", "temperature": "72", "unit": "fahrenheit"})
    elif "paris" in location.lower():
        return json.dumps({"location": "Paris", "temperature": "22", "unit": "celsius"})
    else:
        return json.dumps({"location": location, "temperature": "unknown"})

def test_parallel_function_call():
    try:
        # Step 1: send the conversation and available functions to the model
        messages = [{"role": "user", "content": "What's the weather like in San Francisco, Tokyo, and Paris?"}]
        tools = [
            {
                "type": "function",
                "function": {
                    "name": "get_current_weather",
                    "description": "Get the current weather in a given location",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "location": {
                                "type": "string",
                                "description": "The city and state, e.g. San Francisco, CA",
                            },
                            "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
                        },
                        "required": ["location"],
                    },
                },
            }
        ]
        response = litellm.completion(
            model="gpt-4o-mini",
            messages=messages,
            tools=tools,
            tool_choice="auto",  # auto is default, but we'll be explicit
            api_key=os.getenv("OPENAI_API_KEY"),
        )
        # print("\nFirst LLM Response:\n", response)
        # response_message = response.choices[0].message
        # tool_calls = response_message.tool_calls
        # print(tool_calls)
        print(response.model_dump_json(indent=4))
    except Exception as e:
        print(f"Error occurred: {e}")

test_parallel_function_call()

@@ -2,12 +2,7 @@ from swarms import Agent
 from swarms.prompts.finance_agent_sys_prompt import (
     FINANCIAL_AGENT_SYS_PROMPT,
 )
-from swarms.tools.mcp_integration import MCPServerSseParams
-
-server_one = MCPServerSseParams(
-    url="http://127.0.0.1:6274",
-    headers={"Content-Type": "application/json"},
-)

 # Initialize the agent
 agent = Agent(
@@ -15,10 +10,13 @@ agent = Agent(
     agent_description="Personal finance advisor agent",
     system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
     max_loops=1,
-    mcp_servers=[server_one],
+    mcp_url="http://0.0.0.0:8000/sse",
+    output_type="final",
 )

-out = agent.run("Use the add tool to add 2 and 2")
+# Run the agent with the tools exposed by the MCP server
+out = agent.run(
+    "Use any of the tools available to you",
+)
+print(out)
 print(type(out))

@@ -0,0 +1,117 @@
from mcp.server.fastmcp import FastMCP
import requests

mcp = FastMCP("okx_crypto_server")

@mcp.tool(
    name="get_okx_crypto_price",
    description="Get the current price and basic information for a given cryptocurrency from OKX exchange.",
)
def get_okx_crypto_price(symbol: str) -> str:
    """
    Get the current price and basic information for a given cryptocurrency using OKX API.

    Args:
        symbol (str): The cryptocurrency trading pair (e.g., 'BTC-USDT', 'ETH-USDT')

    Returns:
        str: A formatted string containing the cryptocurrency information

    Example:
        >>> get_okx_crypto_price('BTC-USDT')
        'Current price of BTC/USDT: $45,000'
    """
    try:
        if not symbol:
            return "Please provide a valid trading pair (e.g., 'BTC-USDT')"

        # Convert to uppercase and ensure proper format
        symbol = symbol.upper()
        if not symbol.endswith("-USDT"):
            symbol = f"{symbol}-USDT"

        # OKX API endpoint for ticker information
        url = f"https://www.okx.com/api/v5/market/ticker?instId={symbol}"

        # Make the API request
        response = requests.get(url)
        response.raise_for_status()
        data = response.json()

        if data.get("code") != "0":
            return f"Error: {data.get('msg', 'Unknown error')}"

        ticker_data = data.get("data", [{}])[0]
        if not ticker_data:
            return f"Could not find data for {symbol}. Please check the trading pair."

        price = float(ticker_data.get("last", 0))
        change_24h = float(ticker_data.get("last24h", 0))
        change_percent = float(ticker_data.get("change24h", 0))

        base_currency = symbol.split("-")[0]
        return f"Current price of {base_currency}/USDT: ${price:,.2f}\n24h Change: {change_percent:.2f}%"
    except requests.exceptions.RequestException as e:
        return f"Error fetching OKX data: {str(e)}"
    except Exception as e:
        return f"Error: {str(e)}"

@mcp.tool(
    name="get_okx_crypto_volume",
    description="Get the 24-hour trading volume for a given cryptocurrency from OKX exchange.",
)
def get_okx_crypto_volume(symbol: str) -> str:
    """
    Get the 24-hour trading volume for a given cryptocurrency using OKX API.

    Args:
        symbol (str): The cryptocurrency trading pair (e.g., 'BTC-USDT', 'ETH-USDT')

    Returns:
        str: A formatted string containing the trading volume information

    Example:
        >>> get_okx_crypto_volume('BTC-USDT')
        '24h Trading Volume for BTC/USDT: $1,234,567'
    """
    try:
        if not symbol:
            return "Please provide a valid trading pair (e.g., 'BTC-USDT')"

        # Convert to uppercase and ensure proper format
        symbol = symbol.upper()
        if not symbol.endswith("-USDT"):
            symbol = f"{symbol}-USDT"

        # OKX API endpoint for ticker information
        url = f"https://www.okx.com/api/v5/market/ticker?instId={symbol}"

        # Make the API request
        response = requests.get(url)
        response.raise_for_status()
        data = response.json()

        if data.get("code") != "0":
            return f"Error: {data.get('msg', 'Unknown error')}"

        ticker_data = data.get("data", [{}])[0]
        if not ticker_data:
            return f"Could not find data for {symbol}. Please check the trading pair."

        volume_24h = float(ticker_data.get("vol24h", 0))
        base_currency = symbol.split("-")[0]
        return f"24h Trading Volume for {base_currency}/USDT: ${volume_24h:,.2f}"
    except requests.exceptions.RequestException as e:
        return f"Error fetching OKX data: {str(e)}"
    except Exception as e:
        return f"Error: {str(e)}"

if __name__ == "__main__":
    mcp.run(transport="sse")

@@ -0,0 +1,20 @@
from swarms.tools.mcp_client_call import (
    get_mcp_tools_sync,
)
from swarms.schemas.mcp_schemas import MCPConnection
import json

if __name__ == "__main__":
    tools = get_mcp_tools_sync(
        server_path="http://0.0.0.0:8000/sse",
        format="openai",
        connection=MCPConnection(
            url="http://0.0.0.0:8000/sse",
            headers={"Authorization": "Bearer 1234567890"},
            timeout=10,
        ),
    )
    print(json.dumps(tools, indent=4))
    print(type(tools))

@@ -0,0 +1,33 @@
from swarms.schemas.mcp_schemas import MCPConnection
from swarms.tools.mcp_client_call import (
    execute_tool_call_simple,
)
import asyncio

# Example: execute the get_crypto_price tool on the MCP server
response = {
    "function": {
        "name": "get_crypto_price",
        "arguments": {"coin_id": "bitcoin"},
    }
}

connection = MCPConnection(
    url="http://0.0.0.0:8000/sse",
    headers={"Authorization": "Bearer 1234567890"},
    timeout=10,
)

url = "http://0.0.0.0:8000/sse"

if __name__ == "__main__":
    tools = asyncio.run(
        execute_tool_call_simple(
            response=response,
            connection=connection,
            output_type="json",
            # server_path=url,
        )
    )
    print(tools)

@@ -0,0 +1,18 @@
import json
from swarms.schemas.mcp_schemas import MCPConnection
from swarms.tools.mcp_client_call import (
    get_mcp_tools_sync,
)

if __name__ == "__main__":
    tools = get_mcp_tools_sync(
        server_path="http://0.0.0.0:8000/sse",
        format="openai",
        connection=MCPConnection(
            url="http://0.0.0.0:8000/sse",
            headers={"Authorization": "Bearer 1234567890"},
            timeout=10,
        ),
    )
    print(json.dumps(tools, indent=4))

@@ -0,0 +1,20 @@
from swarms.tools.mcp_client_call import (
    get_tools_for_multiple_mcp_servers,
)
from swarms.schemas.mcp_schemas import MCPConnection

mcp_config = MCPConnection(
    url="http://0.0.0.0:8000/sse",
    # headers={"Authorization": "Bearer 1234567890"},
    timeout=5,
)

urls = ["http://0.0.0.0:8000/sse", "http://0.0.0.0:8001/sse"]

out = get_tools_for_multiple_mcp_servers(
    urls=urls,
    # connections=[mcp_config],
)

print(out)

@@ -1,90 +1,115 @@
-# stock_price_server.py
+# crypto_price_server.py
 from mcp.server.fastmcp import FastMCP
-import os
-from datetime import datetime
+import requests

-mcp = FastMCP("StockPrice")
+mcp = FastMCP("CryptoPrice")

-@mcp.tool()
-def create_markdown_file(filename: str) -> str:
+@mcp.tool(
+    name="get_crypto_price",
+    description="Get the current price and basic information for a given cryptocurrency.",
+)
+def get_crypto_price(coin_id: str) -> str:
     """
-    Create a new markdown file with a basic structure.
+    Get the current price and basic information for a given cryptocurrency using CoinGecko API.
     Args:
-        filename (str): The name of the markdown file to create (without .md extension)
+        coin_id (str): The cryptocurrency ID (e.g., 'bitcoin', 'ethereum')
     Returns:
-        str: A message indicating success or failure
+        str: A formatted string containing the cryptocurrency information
     Example:
-        >>> create_markdown_file('my_notes')
-        'Created markdown file: my_notes.md'
+        >>> get_crypto_price('bitcoin')
+        'Current price of Bitcoin: $45,000'
     """
     try:
-        if not filename:
-            return "Please provide a valid filename"
+        if not coin_id:
+            return "Please provide a valid cryptocurrency ID"

-        # Ensure filename ends with .md
-        if not filename.endswith(".md"):
-            filename = f"{filename}.md"
+        # CoinGecko API endpoint
+        url = f"https://api.coingecko.com/api/v3/simple/price?ids={coin_id}&vs_currencies=usd&include_24hr_change=true"

-        # Create basic markdown structure
-        content = f"""# {filename.replace('.md', '')}
-Created on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
-## Content
-"""
-        with open(filename, "w") as f:
-            f.write(content)
-        return f"Created markdown file: {filename}"
+        # Make the API request
+        response = requests.get(url)
+        response.raise_for_status()  # Raise an exception for bad status codes
+        data = response.json()
+
+        if coin_id not in data:
+            return f"Could not find data for {coin_id}. Please check the cryptocurrency ID."
+
+        price = data[coin_id]["usd"]
+        change_24h = data[coin_id].get("usd_24h_change", "N/A")
+        return f"Current price of {coin_id.capitalize()}: ${price:,.2f}\n24h Change: {change_24h:.2f}%"
+    except requests.exceptions.RequestException as e:
+        return f"Error fetching crypto data: {str(e)}"
     except Exception as e:
-        return f"Error creating markdown file: {str(e)}"
+        return f"Error: {str(e)}"

-@mcp.tool()
-def write_to_markdown(filename: str, content: str) -> str:
+@mcp.tool(
+    name="get_htx_crypto_price",
+    description="Get the current price and basic information for a given cryptocurrency from HTX exchange.",
+)
+def get_htx_crypto_price(symbol: str) -> str:
     """
-    Append content to an existing markdown file.
+    Get the current price and basic information for a given cryptocurrency using HTX API.
     Args:
-        filename (str): The name of the markdown file (without .md extension)
-        content (str): The content to append to the file
+        symbol (str): The cryptocurrency trading pair (e.g., 'btcusdt', 'ethusdt')
     Returns:
-        str: A message indicating success or failure
+        str: A formatted string containing the cryptocurrency information
     Example:
-        >>> write_to_markdown('my_notes', 'This is a new note')
-        'Content added to my_notes.md'
+        >>> get_htx_crypto_price('btcusdt')
+        'Current price of BTC/USDT: $45,000'
     """
     try:
-        if not filename or not content:
-            return "Please provide both filename and content"
+        if not symbol:
+            return "Please provide a valid trading pair (e.g., 'btcusdt')"

-        # Ensure filename ends with .md
-        if not filename.endswith(".md"):
-            filename = f"{filename}.md"
+        # Convert to lowercase and ensure proper format
+        symbol = symbol.lower()
+        if not symbol.endswith("usdt"):
+            symbol = f"{symbol}usdt"

-        # Check if file exists
-        if not os.path.exists(filename):
-            return f"File {filename} does not exist. Please create it first using create_markdown_file"
+        # HTX API endpoint
+        url = f"https://api.htx.com/market/detail/merged?symbol={symbol}"

-        # Append content with timestamp
-        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-        formatted_content = f"\n### Entry - {timestamp}\n{content}\n"
-        with open(filename, "a") as f:
-            f.write(formatted_content)
-        return f"Content added to {filename}"
+        # Make the API request
+        response = requests.get(url)
+        response.raise_for_status()
+        data = response.json()
+
+        if data.get("status") != "ok":
+            return f"Error: {data.get('err-msg', 'Unknown error')}"
+
+        tick = data.get("tick", {})
+        if not tick:
+            return f"Could not find data for {symbol}. Please check the trading pair."
+
+        price = tick.get("close", 0)
+        change_24h = tick.get("close", 0) - tick.get("open", 0)
+        change_percent = (
+            (change_24h / tick.get("open", 1)) * 100
+            if tick.get("open")
+            else 0
+        )
+        base_currency = symbol[:-4].upper()  # Remove 'usdt' and convert to uppercase
+        return f"Current price of {base_currency}/USDT: ${price:,.2f}\n24h Change: {change_percent:.2f}%"
+    except requests.exceptions.RequestException as e:
+        return f"Error fetching HTX data: {str(e)}"
     except Exception as e:
-        return f"Error writing to markdown file: {str(e)}"
+        return f"Error: {str(e)}"

 if __name__ == "__main__":

@@ -0,0 +1,119 @@
from mcp.server.fastmcp import FastMCP
import requests

mcp = FastMCP("OKXCryptoPrice")
mcp.settings.port = 8001

@mcp.tool(
    name="get_okx_crypto_price",
    description="Get the current price and basic information for a given cryptocurrency from OKX exchange.",
)
def get_okx_crypto_price(symbol: str) -> str:
    """
    Get the current price and basic information for a given cryptocurrency using OKX API.

    Args:
        symbol (str): The cryptocurrency trading pair (e.g., 'BTC-USDT', 'ETH-USDT')

    Returns:
        str: A formatted string containing the cryptocurrency information

    Example:
        >>> get_okx_crypto_price('BTC-USDT')
        'Current price of BTC/USDT: $45,000'
    """
    try:
        if not symbol:
            return "Please provide a valid trading pair (e.g., 'BTC-USDT')"

        # Convert to uppercase and ensure proper format
        symbol = symbol.upper()
        if not symbol.endswith("-USDT"):
            symbol = f"{symbol}-USDT"

        # OKX API endpoint for ticker information
        url = f"https://www.okx.com/api/v5/market/ticker?instId={symbol}"

        # Make the API request
        response = requests.get(url)
        response.raise_for_status()
        data = response.json()

        if data.get("code") != "0":
            return f"Error: {data.get('msg', 'Unknown error')}"

        ticker_data = data.get("data", [{}])[0]
        if not ticker_data:
            return f"Could not find data for {symbol}. Please check the trading pair."

        price = float(ticker_data.get("last", 0))
        change_24h = float(ticker_data.get("last24h", 0))
        change_percent = float(ticker_data.get("change24h", 0))

        base_currency = symbol.split("-")[0]
        return f"Current price of {base_currency}/USDT: ${price:,.2f}\n24h Change: {change_percent:.2f}%"
    except requests.exceptions.RequestException as e:
        return f"Error fetching OKX data: {str(e)}"
    except Exception as e:
        return f"Error: {str(e)}"

@mcp.tool(
    name="get_okx_crypto_volume",
    description="Get the 24-hour trading volume for a given cryptocurrency from OKX exchange.",
)
def get_okx_crypto_volume(symbol: str) -> str:
    """
    Get the 24-hour trading volume for a given cryptocurrency using OKX API.

    Args:
        symbol (str): The cryptocurrency trading pair (e.g., 'BTC-USDT', 'ETH-USDT')

    Returns:
        str: A formatted string containing the trading volume information

    Example:
        >>> get_okx_crypto_volume('BTC-USDT')
        '24h Trading Volume for BTC/USDT: $1,234,567'
    """
    try:
        if not symbol:
            return "Please provide a valid trading pair (e.g., 'BTC-USDT')"

        # Convert to uppercase and ensure proper format
        symbol = symbol.upper()
        if not symbol.endswith("-USDT"):
            symbol = f"{symbol}-USDT"

        # OKX API endpoint for ticker information
        url = f"https://www.okx.com/api/v5/market/ticker?instId={symbol}"

        # Make the API request
        response = requests.get(url)
        response.raise_for_status()
        data = response.json()

        if data.get("code") != "0":
            return f"Error: {data.get('msg', 'Unknown error')}"

        ticker_data = data.get("data", [{}])[0]
        if not ticker_data:
            return f"Could not find data for {symbol}. Please check the trading pair."

        volume_24h = float(ticker_data.get("vol24h", 0))
        base_currency = symbol.split("-")[0]
        return f"24h Trading Volume for {base_currency}/USDT: ${volume_24h:,.2f}"
    except requests.exceptions.RequestException as e:
        return f"Error fetching OKX data: {str(e)}"
    except Exception as e:
        return f"Error: {str(e)}"

if __name__ == "__main__":
    # Run the server on port 8001 (set above via mcp.settings.port)
    mcp.run(transport="sse")

@@ -78,7 +78,6 @@ litellm = "*"
 torch = "*"
 httpx = "*"
 mcp = "*"
-fastmcp = "*"
 aiohttp = "*"

 [tool.poetry.scripts]

@@ -25,4 +25,4 @@ httpx
 # vllm>=0.2.0
 aiohttp
 mcp
-fastmcp
+fastm

@@ -1,7 +1,12 @@
 from swarms.schemas.agent_step_schemas import Step, ManySteps
+from swarms.schemas.mcp_schemas import (
+    MCPConnection,
+    MultipleMCPConnections,
+)

 __all__ = [
     "Step",
     "ManySteps",
+    "MCPConnection",
+    "MultipleMCPConnections",
 ]

@@ -0,0 +1,18 @@
class AgentMCPError(Exception):
    pass

class AgentMCPConnectionError(AgentMCPError):
    pass

class AgentMCPToolError(AgentMCPError):
    pass

class AgentMCPToolNotFoundError(AgentMCPError):
    pass

class AgentMCPToolInvalidError(AgentMCPError):
    pass

@@ -0,0 +1,14 @@
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional, Callable
from swarms.schemas.mcp_schemas import MCPConnection

class AgentToolTypes(BaseModel):
    tool_schema: List[Dict[str, Any]]
    mcp_connection: MCPConnection
    tool_model: Optional[BaseModel]
    tool_functions: Optional[List[Callable]]

    class Config:
        arbitrary_types_allowed = True

@@ -0,0 +1,94 @@
from pydantic import BaseModel, Field
from typing import List, Optional, Union, Any, Literal, Type
from litellm.types import (
    ChatCompletionModality,
    ChatCompletionPredictionContentParam,
    ChatCompletionAudioParam,
)

class LLMCompletionRequest(BaseModel):
    """Schema for LLM completion request parameters."""

    model: Optional[str] = Field(
        default=None,
        description="The name of the language model to use for text completion",
    )
    temperature: Optional[float] = Field(
        default=0.5,
        description="Controls randomness of the output (0.0 to 1.0)",
    )
    top_p: Optional[float] = Field(
        default=None,
        description="Controls diversity via nucleus sampling",
    )
    n: Optional[int] = Field(
        default=None,
        description="Number of completions to generate",
    )
    stream: Optional[bool] = Field(
        default=None,
        description="Whether to stream the response",
    )
    stream_options: Optional[dict] = Field(
        default=None,
        description="Options for streaming response",
    )
    stop: Optional[Any] = Field(
        default=None,
        description="Up to 4 sequences where the API will stop generating",
    )
    max_completion_tokens: Optional[int] = Field(
        default=None,
        description="Maximum tokens for completion including reasoning",
    )
    max_tokens: Optional[int] = Field(
        default=None,
        description="Maximum tokens in generated completion",
    )
    prediction: Optional[ChatCompletionPredictionContentParam] = Field(
        default=None,
        description="Configuration for predicted output",
    )
    presence_penalty: Optional[float] = Field(
        default=None,
        description="Penalizes new tokens based on existence in text",
    )
    frequency_penalty: Optional[float] = Field(
        default=None,
        description="Penalizes new tokens based on frequency in text",
    )
    logit_bias: Optional[dict] = Field(
        default=None,
        description="Modifies probability of specific tokens",
    )
    reasoning_effort: Optional[Literal["low", "medium", "high"]] = Field(
        default=None,
        description="Level of reasoning effort for the model",
    )
    seed: Optional[int] = Field(
        default=None,
        description="Random seed for reproducibility",
    )
    tools: Optional[List] = Field(
        default=None,
        description="List of tools available to the model",
    )
    tool_choice: Optional[Union[str, dict]] = Field(
        default=None,
        description="Choice of tool to use",
    )
    logprobs: Optional[bool] = Field(
        default=None,
        description="Whether to return log probabilities",
    )
    top_logprobs: Optional[int] = Field(
        default=None,
        description="Number of most likely tokens to return",
    )
    parallel_tool_calls: Optional[bool] = Field(
        default=None,
        description="Whether to allow parallel tool calls",
    )

    class Config:
        arbitrary_types_allowed = True

@@ -0,0 +1,43 @@
from pydantic import BaseModel, Field
from typing import Dict, List, Any, Optional

class MCPConnection(BaseModel):
    type: Optional[str] = Field(
        default="mcp",
        description="The type of connection, defaults to 'mcp'",
    )
    url: Optional[str] = Field(
        default="localhost:8000/sse",
        description="The URL endpoint for the MCP server",
    )
    tool_configurations: Optional[Dict[Any, Any]] = Field(
        default=None,
        description="Dictionary containing configuration settings for MCP tools",
    )
    authorization_token: Optional[str] = Field(
        default=None,
        description="Authentication token for accessing the MCP server",
    )
    transport: Optional[str] = Field(
        default="sse",
        description="The transport protocol to use for the MCP server",
    )
    headers: Optional[Dict[str, str]] = Field(
        default=None, description="Headers to send to the MCP server"
    )
    timeout: Optional[int] = Field(
        default=5, description="Timeout for the MCP server"
    )

    class Config:
        arbitrary_types_allowed = True

class MultipleMCPConnections(BaseModel):
    connections: List[MCPConnection] = Field(
        default=[], description="List of MCP connections"
    )

    class Config:
        arbitrary_types_allowed = True

@ -31,6 +31,10 @@ from swarms.prompts.multi_modal_autonomous_instruction_prompt import (
MULTI_MODAL_AUTO_AGENT_SYSTEM_PROMPT_1, MULTI_MODAL_AUTO_AGENT_SYSTEM_PROMPT_1,
) )
from swarms.prompts.tools import tool_sop_prompt from swarms.prompts.tools import tool_sop_prompt
from swarms.schemas.agent_mcp_errors import (
AgentMCPConnectionError,
AgentMCPToolError,
)
from swarms.schemas.agent_step_schemas import ManySteps, Step from swarms.schemas.agent_step_schemas import ManySteps, Step
from swarms.schemas.base_schemas import ( from swarms.schemas.base_schemas import (
AgentChatCompletionResponse, AgentChatCompletionResponse,
@ -46,13 +50,6 @@ from swarms.structs.safe_loading import (
) )
from swarms.telemetry.main import log_agent_data from swarms.telemetry.main import log_agent_data
from swarms.tools.base_tool import BaseTool from swarms.tools.base_tool import BaseTool
from swarms.tools.mcp_client import (
execute_mcp_tool,
find_and_execute_tool,
list_all,
list_tools_for_multiple_urls,
)
from swarms.tools.mcp_integration import MCPServerSseParams
from swarms.tools.tool_parse_exec import parse_and_execute_json from swarms.tools.tool_parse_exec import parse_and_execute_json
from swarms.utils.any_to_str import any_to_str from swarms.utils.any_to_str import any_to_str
from swarms.utils.data_to_text import data_to_text from swarms.utils.data_to_text import data_to_text
@ -64,11 +61,18 @@ from swarms.utils.history_output_formatter import (
from swarms.utils.litellm_tokenizer import count_tokens from swarms.utils.litellm_tokenizer import count_tokens
from swarms.utils.litellm_wrapper import LiteLLM from swarms.utils.litellm_wrapper import LiteLLM
from swarms.utils.pdf_to_text import pdf_to_text from swarms.utils.pdf_to_text import pdf_to_text
from swarms.utils.str_to_dict import str_to_dict
from swarms.prompts.react_base_prompt import REACT_SYS_PROMPT from swarms.prompts.react_base_prompt import REACT_SYS_PROMPT
from swarms.prompts.max_loop_prompt import generate_reasoning_prompt from swarms.prompts.max_loop_prompt import generate_reasoning_prompt
from swarms.prompts.safety_prompt import SAFETY_PROMPT from swarms.prompts.safety_prompt import SAFETY_PROMPT
from swarms.structs.ma_utils import set_random_models_for_agents from swarms.structs.ma_utils import set_random_models_for_agents
from swarms.tools.mcp_client_call import (
execute_tool_call_simple,
get_mcp_tools_sync,
)
from swarms.schemas.mcp_schemas import (
MCPConnection,
)
from swarms.utils.index import exists
# Utils # Utils
@ -90,10 +94,6 @@ def agent_id():
return uuid.uuid4().hex return uuid.uuid4().hex
def exists(val):
return val is not None
# Agent output types # Agent output types
ToolUsageType = Union[BaseModel, Dict[str, Any]] ToolUsageType = Union[BaseModel, Dict[str, Any]]
@@ -396,12 +396,12 @@ class Agent:
        role: agent_roles = "worker",
        no_print: bool = False,
        tools_list_dictionary: Optional[List[Dict[str, Any]]] = None,
-        mcp_servers: MCPServerSseParams = None,
-        mcp_url: str = None,
+        mcp_url: Optional[Union[str, MCPConnection]] = None,
        mcp_urls: List[str] = None,
        react_on: bool = False,
        safety_prompt_on: bool = False,
        random_models_on: bool = False,
+        mcp_config: Optional[MCPConnection] = None,
        *args,
        **kwargs,
    ):
@@ -418,6 +418,7 @@ class Agent:
        self.stopping_token = stopping_token
        self.interactive = interactive
        self.dashboard = dashboard
+        self.saved_state_path = saved_state_path
        self.return_history = return_history
        self.dynamic_temperature_enabled = dynamic_temperature_enabled
        self.dynamic_loops = dynamic_loops
@@ -520,12 +521,12 @@ class Agent:
        self.role = role
        self.no_print = no_print
        self.tools_list_dictionary = tools_list_dictionary
-        self.mcp_servers = mcp_servers
        self.mcp_url = mcp_url
        self.mcp_urls = mcp_urls
        self.react_on = react_on
        self.safety_prompt_on = safety_prompt_on
        self.random_models_on = random_models_on
+        self.mcp_config = mcp_config

        self._cached_llm = (
            None  # Add this line to cache the LLM instance
@@ -560,9 +561,6 @@ class Agent:
        if self.llm is None:
            self.llm = self.llm_handling()

-        if self.mcp_url or self.mcp_servers is not None:
-            self.add_mcp_tools_to_memory()

        if self.react_on is True:
            self.system_prompt += REACT_SYS_PROMPT
@@ -589,7 +587,7 @@ class Agent:
            prompt += SAFETY_PROMPT

        # Initialize the short term memory
-        self.short_memory = Conversation(
+        memory = Conversation(
            system_prompt=prompt,
            time_enabled=False,
            user=self.user_name,
@@ -597,7 +595,7 @@ class Agent:
            token_count=False,
        )

-        return self.short_memory
+        return memory

    def agent_output_model(self):
        # Many steps
@@ -645,10 +643,16 @@ class Agent:
                **common_args,
                tools_list_dictionary=self.tools_list_dictionary,
                tool_choice="auto",
-                parallel_tool_calls=len(
-                    self.tools_list_dictionary
-                )
-                > 1,
+                parallel_tool_calls=True,
+            )
+        elif self.mcp_url is not None:
+            self._cached_llm = LiteLLM(
+                **common_args,
+                tools_list_dictionary=self.add_mcp_tools_to_memory(),
+                tool_choice="auto",
+                parallel_tool_calls=True,
+                mcp_call=True,
            )
        else:
            self._cached_llm = LiteLLM(
@@ -715,110 +719,23 @@ class Agent:
            Exception: If there's an error accessing the MCP tools
        """
        try:
-            if self.mcp_url is not None:
-                tools_available = list_all(
-                    self.mcp_url, output_type="json"
-                )
-                self.short_memory.add(
-                    role="Tools Available",
-                    content=f"\n{tools_available}",
-                )
-            elif (
-                self.mcp_url is None
-                and self.mcp_urls is not None
-                and len(self.mcp_urls) > 1
-            ):
-                tools_available = list_tools_for_multiple_urls(
-                    urls=self.mcp_urls,
-                    output_type="json",
-                )
-                self.short_memory.add(
-                    role="Tools Available",
-                    content=f"\n{tools_available}",
-                )
-        except Exception as e:
-            logger.error(f"Error adding MCP tools to memory: {e}")
-            raise e
-
-    def _single_mcp_tool_handling(self, response: any):
-        """
-        Handles execution of a single MCP tool.
-
-        Args:
-            response (str): The tool response to process
-
-        Raises:
-            Exception: If there's an error executing the tool
-        """
-        try:
-            if isinstance(response, dict):
-                result = response
-                print(type(result))
-            else:
-                result = str_to_dict(response)
-                print(type(result))
-
-            output = execute_mcp_tool(
-                url=self.mcp_url,
-                parameters=result,
-            )
-
-            self.short_memory.add(
-                role="Tool Executor", content=str(output)
-            )
-        except Exception as e:
-            logger.error(f"Error in single MCP tool handling: {e}")
-            raise e
-
-    def _multiple_mcp_tool_handling(self, response: any):
-        """
-        Handles execution of multiple MCP tools.
-
-        Args:
-            response (any): The tool response to process
-
-        Raises:
-            Exception: If there's an error executing the tools
-        """
-        try:
-            if isinstance(response, str):
-                response = str_to_dict(response)
-
-            execution = find_and_execute_tool(
-                self.mcp_urls,
-                response["name"],
-                parameters=response,
-            )
-
-            self.short_memory.add(
-                role="Tool Executor", content=str(execution)
-            )
-        except Exception as e:
-            logger.error(f"Error in multiple MCP tool handling: {e}")
-            raise e
-
-    def mcp_tool_handling(self, response: any):
-        """
-        Main handler for MCP tool execution.
-
-        Args:
-            response (any): The tool response to process
-
-        Raises:
-            ValueError: If no MCP URL or MCP Servers are provided
-            Exception: If there's an error in tool handling
-        """
-        try:
-            # if self.mcp_url is not None:
-            self._single_mcp_tool_handling(response)
-            # elif self.mcp_url is None and len(self.mcp_servers) > 1:
-            #     self._multiple_mcp_tool_handling(response)
-            # else:
-            #     raise ValueError("No MCP URL or MCP Servers provided")
-        except Exception as e:
-            logger.error(f"Error in mcp_tool_handling: {e}")
-            raise e
+            if exists(self.mcp_url):
+                tools = get_mcp_tools_sync(server_path=self.mcp_url)
+            elif exists(self.mcp_config):
+                tools = get_mcp_tools_sync(connection=self.mcp_config)
+                logger.info(f"Tools: {tools}")
+            else:
+                raise AgentMCPConnectionError(
+                    "mcp_url must be either a string URL or MCPConnection object"
+                )
+            self.pretty_print(
+                f"✨ [SYSTEM] Successfully integrated {len(tools)} MCP tools into agent: {self.agent_name} | Status: ONLINE | Time: {time.strftime('%H:%M:%S')}",
+                loop_count=0,
+            )
+            return tools
+        except AgentMCPConnectionError as e:
+            logger.error(f"Error in MCP connection: {e}")
+            raise e

    def setup_config(self):
@@ -1102,8 +1019,9 @@ class Agent:
                    *response_args, **kwargs
                )

-                # Convert to a str if the response is not a str
-                response = self.parse_llm_output(response)
+                # Convert to a str if the response is not a str
+                if self.mcp_url is None:
+                    response = self.parse_llm_output(response)

                self.short_memory.add(
                    role=self.agent_name, content=response
@@ -1112,17 +1030,8 @@ class Agent:
                # Print
                self.pretty_print(response, loop_count)

-                # Output Cleaner
-                self.output_cleaner_op(response)
-
-                ####### MCP TOOL HANDLING #######
-                if (
-                    self.mcp_servers
-                    and self.tools_list_dictionary is not None
-                ):
-                    self.mcp_tool_handling(response)
-                ####### MCP TOOL HANDLING #######
+                # Output Cleaner
+                # self.output_cleaner_op(response)

                # Check and execute tools
                if self.tools is not None:
@@ -1156,6 +1065,11 @@ class Agent:
                        self.streaming_on,
                    )

+                if self.mcp_url is not None:
+                    self.mcp_tool_handling(
+                        response, loop_count
+                    )

                self.sentiment_and_evaluator(response)

                success = True  # Mark as successful to exit the retry loop
@@ -2787,3 +2701,57 @@ class Agent:
            role="Output Cleaner",
            content=response,
        )

+    def mcp_tool_handling(
+        self, response: any, current_loop: Optional[int] = 0
+    ):
+        try:
+            if exists(self.mcp_url):
+                # Execute the tool call
+                tool_response = asyncio.run(
+                    execute_tool_call_simple(
+                        response=response,
+                        server_path=self.mcp_url,
+                    )
+                )
+            elif exists(self.mcp_config):
+                # Execute the tool call
+                tool_response = asyncio.run(
+                    execute_tool_call_simple(
+                        response=response,
+                        connection=self.mcp_config,
+                    )
+                )
+            else:
+                raise AgentMCPConnectionError(
+                    "mcp_url must be either a string URL or MCPConnection object"
+                )
+
+            # Get the text content from the tool response
+            text_content = (
+                tool_response.content[0].text
+                if tool_response.content
+                else str(tool_response)
+            )
+
+            # Add to the memory
+            self.short_memory.add(
+                role="Tool Executor",
+                content=text_content,
+            )
+
+            # Clear the tools list dictionary
+            self._cached_llm.tools_list_dictionary = None
+
+            # Now call the LLM again with the tool response
+            summary = self.call_llm(task=self.short_memory.get_str())
+
+            self.pretty_print(summary, loop_count=current_loop)
+
+            # Add to the memory
+            self.short_memory.add(
+                role=self.agent_name, content=summary
+            )
+        except AgentMCPToolError as e:
+            logger.error(f"Error in MCP tool: {e}")
+            raise e
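For reference, a hedged sketch of the payload the new mcp_tool_handling consumes: with mcp_call=True, the wrapped LLM's output_for_tools returns an OpenAI-style function call, which execute_tool_call_simple then relays to the MCP server. Tool name and arguments below are hypothetical:

# Hypothetical shape of the `response` argument to mcp_tool_handling,
# as produced by LiteLLM.output_for_tools when mcp_call=True.
response = {
    "function": {
        "name": "get_crypto_price",          # hypothetical tool name
        "arguments": '{"coin": "bitcoin"}',  # JSON-encoded arguments
    }
}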

@@ -4,7 +4,9 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import wraps
from typing import Any, Callable, Literal, Optional

-from fastmcp import FastMCP, Client
+from mcp.server.fastmcp import FastMCP
+from mcp.client import Client
from loguru import logger
from swarms.utils.any_to_str import any_to_str

@@ -27,6 +27,13 @@ from swarms.tools.cohere_func_call_schema import (
)
from swarms.tools.tool_registry import ToolStorage, tool_registry
from swarms.tools.json_utils import base_model_to_json
+from swarms.tools.mcp_client_call import (
+    execute_tool_call_simple,
+    _execute_tool_call_simple,
+    get_tools_for_multiple_mcp_servers,
+    get_mcp_tools_sync,
+    aget_mcp_tools,
+)

__all__ = [
@@ -50,4 +57,9 @@ __all__ = [
    "ToolStorage",
    "tool_registry",
    "base_model_to_json",
+    "execute_tool_call_simple",
+    "_execute_tool_call_simple",
+    "get_tools_for_multiple_mcp_servers",
+    "get_mcp_tools_sync",
+    "aget_mcp_tools",
]
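Assuming these re-exports resolve as declared, the MCP helpers become importable from the package root; a minimal sketch:

from swarms.tools import (
    get_mcp_tools_sync,
    execute_tool_call_simple,
)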

@@ -1,246 +0,0 @@
import asyncio
import json
from typing import List, Literal, Dict, Any, Union
from fastmcp import Client
from swarms.utils.str_to_dict import str_to_dict
from loguru import logger


def parse_agent_output(
    dictionary: Union[str, Dict[Any, Any]],
) -> tuple[str, Dict[Any, Any]]:
    """
    Parse agent output into tool name and parameters.

    Args:
        dictionary: Either a string or dictionary containing tool information.
            If string, it will be converted to a dictionary.
            Must contain a 'name' key for the tool name.

    Returns:
        tuple[str, Dict[Any, Any]]: A tuple containing the tool name and its parameters.

    Raises:
        ValueError: If the input is invalid or missing required 'name' key.
    """
    try:
        if isinstance(dictionary, str):
            dictionary = str_to_dict(dictionary)
        elif not isinstance(dictionary, dict):
            raise ValueError("Invalid dictionary")

        # Handle regular dictionary format
        if "name" in dictionary:
            name = dictionary["name"]
            # Remove the name key and use remaining key-value pairs as parameters
            params = dict(dictionary)
            params.pop("name")
            return name, params

        raise ValueError("Invalid function call format")
    except Exception as e:
        raise ValueError(f"Error parsing agent output: {str(e)}")


async def _list_all(url: str):
    """
    Asynchronously list all tools available on a given MCP server.

    Args:
        url: The URL of the MCP server to query.

    Returns:
        List of available tools.

    Raises:
        ValueError: If there's an error connecting to or querying the server.
    """
    try:
        async with Client(url) as client:
            return await client.list_tools()
    except Exception as e:
        raise ValueError(f"Error listing tools: {str(e)}")


def list_all(url: str, output_type: Literal["str", "json"] = "json"):
    """
    Synchronously list all tools available on a given MCP server.

    Args:
        url: The URL of the MCP server to query.

    Returns:
        List of dictionaries containing tool information.

    Raises:
        ValueError: If there's an error connecting to or querying the server.
    """
    try:
        out = asyncio.run(_list_all(url))

        outputs = []
        for tool in out:
            outputs.append(tool.model_dump())

        if output_type == "json":
            return json.dumps(outputs, indent=4)
        else:
            return outputs
    except Exception as e:
        raise ValueError(f"Error in list_all: {str(e)}")


def list_tools_for_multiple_urls(
    urls: List[str], output_type: Literal["str", "json"] = "json"
):
    """
    List tools available across multiple MCP servers.

    Args:
        urls: List of MCP server URLs to query.
        output_type: Format of the output, either "json" (string) or "str" (list).

    Returns:
        If output_type is "json": JSON string containing all tools with server URLs.
        If output_type is "str": List of tools with server URLs.

    Raises:
        ValueError: If there's an error querying any of the servers.
    """
    try:
        out = []
        for url in urls:
            tools = list_all(url)
            # Add server URL to each tool's data
            for tool in tools:
                tool["server_url"] = url
            out.append(tools)

        if output_type == "json":
            return json.dumps(out, indent=4)
        else:
            return out
    except Exception as e:
        raise ValueError(
            f"Error listing tools for multiple URLs: {str(e)}"
        )


async def _execute_mcp_tool(
    url: str,
    parameters: Dict[Any, Any] = None,
    *args,
    **kwargs,
) -> Dict[Any, Any]:
    """
    Asynchronously execute a tool on an MCP server.

    Args:
        url: The URL of the MCP server.
        parameters: Dictionary containing tool name and parameters.
        *args: Additional positional arguments for the Client.
        **kwargs: Additional keyword arguments for the Client.

    Returns:
        Dictionary containing the tool execution results.

    Raises:
        ValueError: If the URL is invalid or tool execution fails.
    """
    try:
        name, params = parse_agent_output(parameters)

        outputs = []
        async with Client(url, *args, **kwargs) as client:
            out = await client.call_tool(
                name=name,
                arguments=params,
            )
            for output in out:
                outputs.append(output.model_dump())

        # convert outputs to string
        return json.dumps(outputs, indent=4)
    except Exception as e:
        raise ValueError(f"Error executing MCP tool: {str(e)}")


def execute_mcp_tool(
    url: str,
    parameters: Dict[Any, Any] = None,
) -> Dict[Any, Any]:
    """
    Synchronously execute a tool on an MCP server.

    Args:
        url: The URL of the MCP server.
        parameters: Dictionary containing tool name and parameters.

    Returns:
        Dictionary containing the tool execution results.

    Raises:
        ValueError: If tool execution fails.
    """
    try:
        logger.info(f"Executing MCP tool with URL: {url}")
        logger.debug(f"Tool parameters: {parameters}")

        result = asyncio.run(
            _execute_mcp_tool(
                url=url,
                parameters=parameters,
            )
        )

        logger.info("MCP tool execution completed successfully")
        logger.debug(f"Tool execution result: {result}")
        return result
    except Exception as e:
        logger.error(f"Error in execute_mcp_tool: {str(e)}")
        raise ValueError(f"Error in execute_mcp_tool: {str(e)}")


def find_and_execute_tool(
    urls: List[str], tool_name: str, parameters: Dict[Any, Any]
) -> Dict[Any, Any]:
    """
    Find a tool across multiple servers and execute it with the given parameters.

    Args:
        urls: List of server URLs to search through.
        tool_name: Name of the tool to find and execute.
        parameters: Parameters to pass to the tool.

    Returns:
        Dict containing the tool execution results.

    Raises:
        ValueError: If tool is not found on any server or execution fails.
    """
    try:
        # Search for tool across all servers
        for url in urls:
            try:
                tools = list_all(url)
                # Check if tool exists on this server
                if any(tool["name"] == tool_name for tool in tools):
                    # Prepare parameters in correct format
                    tool_params = {"name": tool_name, **parameters}
                    # Execute tool on this server
                    return execute_mcp_tool(
                        url=url, parameters=tool_params
                    )
            except Exception:
                # Skip servers that fail and continue searching
                continue

        raise ValueError(
            f"Tool '{tool_name}' not found on any provided servers"
        )
    except Exception as e:
        raise ValueError(f"Error in find_and_execute_tool: {str(e)}")

@@ -1,19 +1,164 @@
-import litellm
+import os
+import concurrent.futures
import asyncio
import contextlib
+import json
import random
from functools import wraps
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Literal, Optional, Union
+from concurrent.futures import ThreadPoolExecutor, as_completed

-from litellm.experimental_mcp_client import (
-    call_openai_tool,
-    load_mcp_tools,
-)
+from litellm.types.utils import ChatCompletionMessageToolCall
from loguru import logger
from mcp import ClientSession
from mcp.client.sse import sse_client
+from mcp.types import (
+    CallToolRequestParams as MCPCallToolRequestParams,
+)
+from mcp.types import CallToolResult as MCPCallToolResult
+from mcp.types import Tool as MCPTool
+from openai.types.chat import ChatCompletionToolParam
+from openai.types.shared_params.function_definition import (
+    FunctionDefinition,
+)
+from swarms.schemas.mcp_schemas import (
+    MCPConnection,
+)
+from swarms.utils.index import exists
+
+
+class MCPError(Exception):
+    """Base exception for MCP related errors."""
+
+    pass
+
+
+class MCPConnectionError(MCPError):
+    """Raised when there are issues connecting to the MCP server."""
+
+    pass
+
+
+class MCPToolError(MCPError):
+    """Raised when there are issues with MCP tool operations."""
+
+    pass
+
+
+class MCPValidationError(MCPError):
+    """Raised when there are validation issues with MCP operations."""
+
+    pass
+
+
+class MCPExecutionError(MCPError):
+    """Raised when there are issues executing MCP operations."""
+
+    pass
+########################################################
+# List MCP Tool functions
+########################################################
+
+
+def transform_mcp_tool_to_openai_tool(
+    mcp_tool: MCPTool,
+) -> ChatCompletionToolParam:
+    """Convert an MCP tool to an OpenAI tool."""
+    return ChatCompletionToolParam(
+        type="function",
+        function=FunctionDefinition(
+            name=mcp_tool.name,
+            description=mcp_tool.description or "",
+            parameters=mcp_tool.inputSchema,
+            strict=False,
+        ),
+    )
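For illustration, the OpenAI-format entry produced for a hypothetical MCP tool would look roughly like this (field values depend entirely on the server's tool schema):

# Approximate structure of the returned ChatCompletionToolParam:
# {
#     "type": "function",
#     "function": {
#         "name": "get_crypto_price",   # hypothetical
#         "description": "...",
#         "parameters": {...},          # the tool's inputSchema
#         "strict": False,
#     },
# }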
+async def load_mcp_tools(
+    session: ClientSession, format: Literal["mcp", "openai"] = "mcp"
+) -> Union[List[MCPTool], List[ChatCompletionToolParam]]:
+    """
+    Load all available MCP tools.
+
+    Args:
+        session: The MCP session to use
+        format: The format to convert the tools to.
+            By default, the tools are returned in MCP format.
+            If format is set to "openai", the tools are converted to OpenAI API compatible tools.
+    """
+    tools = await session.list_tools()
+    if format == "openai":
+        return [
+            transform_mcp_tool_to_openai_tool(mcp_tool=tool)
+            for tool in tools.tools
+        ]
+    return tools.tools
+########################################################
+# Call MCP Tool functions
+########################################################
+
+
+async def call_mcp_tool(
+    session: ClientSession,
+    call_tool_request_params: MCPCallToolRequestParams,
+) -> MCPCallToolResult:
+    """Call an MCP tool."""
+    tool_result = await session.call_tool(
+        name=call_tool_request_params.name,
+        arguments=call_tool_request_params.arguments,
+    )
+    return tool_result
+
+
+def _get_function_arguments(function: FunctionDefinition) -> dict:
+    """Helper to safely get and parse function arguments."""
+    arguments = function.get("arguments", {})
+    if isinstance(arguments, str):
+        try:
+            arguments = json.loads(arguments)
+        except json.JSONDecodeError:
+            arguments = {}
+    return arguments if isinstance(arguments, dict) else {}
+
+
+def transform_openai_tool_call_request_to_mcp_tool_call_request(
+    openai_tool: Union[ChatCompletionMessageToolCall, Dict],
+) -> MCPCallToolRequestParams:
+    """Convert an OpenAI ChatCompletionMessageToolCall to an MCP CallToolRequestParams."""
+    function = openai_tool["function"]
+    return MCPCallToolRequestParams(
+        name=function["name"],
+        arguments=_get_function_arguments(function),
+    )
+
+
+async def call_openai_tool(
+    session: ClientSession,
+    openai_tool: dict,
+) -> MCPCallToolResult:
+    """
+    Call an OpenAI tool using MCP client.
+
+    Args:
+        session: The MCP session to use
+        openai_tool: The OpenAI tool to call. You can get this from the
+            `choices[0].message.tool_calls[0]` of the response from the OpenAI API.
+
+    Returns:
+        The result of the MCP tool call.
+    """
+    mcp_tool_call_request_params = (
+        transform_openai_tool_call_request_to_mcp_tool_call_request(
+            openai_tool=openai_tool,
+        )
+    )
+    return await call_mcp_tool(
+        session=session,
+        call_tool_request_params=mcp_tool_call_request_params,
+    )
def retry_with_backoff(retries=3, backoff_in_seconds=1):
@@ -59,15 +204,49 @@ def get_or_create_event_loop():
    try:
        yield loop
    finally:
-        if loop.is_running():
-            loop.stop()
-        if not loop.is_closed():
-            loop.close()
+        # Only close the loop if we created it and it's not the main event loop
+        if loop != asyncio.get_event_loop() and not loop.is_running():
+            if not loop.is_closed():
+                loop.close()


+def connect_to_mcp_server(connection: MCPConnection = None):
+    """Connect to an MCP server.
+
+    Args:
+        connection (MCPConnection): The connection configuration object
+
+    Returns:
+        tuple: A tuple containing (headers, timeout, transport, url)
+
+    Raises:
+        MCPValidationError: If the connection object is invalid
+    """
+    if not isinstance(connection, MCPConnection):
+        raise MCPValidationError("Invalid connection type")
+
+    # Direct attribute access is faster than property access
+    headers = dict(connection.headers or {})
+    if connection.authorization_token:
+        headers["Authorization"] = (
+            f"Bearer {connection.authorization_token}"
+        )
+
+    return (
+        headers,
+        connection.timeout or 5,
+        connection.transport or "sse",
+        connection.url,
+    )
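A minimal usage sketch, assuming an MCPConnection with the fields referenced above (url, headers, timeout, transport, authorization_token) and a placeholder token:

conn = MCPConnection(
    url="http://0.0.0.0:8000/sse",
    authorization_token="example-token",  # placeholder
    timeout=5,
)
headers, timeout, transport, url = connect_to_mcp_server(conn)
# headers -> {"Authorization": "Bearer example-token"}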
@retry_with_backoff(retries=3)
async def aget_mcp_tools(
-    server_path: str, format: str = "openai", *args, **kwargs
+    server_path: Optional[str] = None,
+    format: str = "openai",
+    connection: Optional[MCPConnection] = None,
+    *args,
+    **kwargs,
) -> List[Dict[str, Any]]:
    """
    Fetch available MCP tools from the server with retry logic.
@@ -79,16 +258,26 @@ async def aget_mcp_tools(
        List[Dict[str, Any]]: List of available MCP tools in OpenAI format

    Raises:
-        ValueError: If server_path is invalid
-        ConnectionError: If connection to server fails
+        MCPValidationError: If server_path is invalid
+        MCPConnectionError: If connection to server fails
    """
-    if not server_path or not isinstance(server_path, str):
-        raise ValueError("Invalid server path provided")
+    if exists(connection):
+        headers, timeout, transport, url = connect_to_mcp_server(
+            connection
+        )
+    else:
+        headers, timeout, transport, url = None, 5, None, server_path

    logger.info(f"Fetching MCP tools from server: {server_path}")

    try:
-        async with sse_client(server_path, *args, **kwargs) as (
+        async with sse_client(
+            url=server_path,
+            headers=headers,
+            timeout=timeout,
+            *args,
+            **kwargs,
+        ) as (
            read,
            write,
        ):
@@ -103,17 +292,17 @@ async def aget_mcp_tools(
            return tools
    except Exception as e:
        logger.error(f"Error fetching MCP tools: {str(e)}")
-        raise
+        raise MCPConnectionError(
+            f"Failed to connect to MCP server: {str(e)}"
+        )
-async def get_mcp_tools(
-    server_path: str, *args, **kwargs
-) -> List[Dict[str, Any]]:
-    return await aget_mcp_tools(server_path, *args, **kwargs)


def get_mcp_tools_sync(
-    server_path: str, format: str = "openai", *args, **kwargs
+    server_path: Optional[str] = None,
+    format: str = "openai",
+    connection: Optional[MCPConnection] = None,
+    *args,
+    **kwargs,
) -> List[Dict[str, Any]]:
    """
    Synchronous version of get_mcp_tools that handles event loop management.
@@ -125,145 +314,169 @@ def get_mcp_tools_sync(
        List[Dict[str, Any]]: List of available MCP tools in OpenAI format

    Raises:
-        ValueError: If server_path is invalid
-        ConnectionError: If connection to server fails
-        RuntimeError: If event loop management fails
+        MCPValidationError: If server_path is invalid
+        MCPConnectionError: If connection to server fails
+        MCPExecutionError: If event loop management fails
    """
    with get_or_create_event_loop() as loop:
        try:
            return loop.run_until_complete(
-                aget_mcp_tools(server_path, format, *args, **kwargs)
+                aget_mcp_tools(
+                    server_path=server_path,
+                    format=format,
+                    connection=connection,
+                    *args,
+                    **kwargs,
+                )
            )
        except Exception as e:
            logger.error(f"Error in get_mcp_tools_sync: {str(e)}")
-            raise
+            raise MCPExecutionError(
+                f"Failed to execute MCP tools sync: {str(e)}"
+            )
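Usage sketch with an illustrative URL; either a bare server_path or an MCPConnection may be supplied:

tools = get_mcp_tools_sync(server_path="http://0.0.0.0:8000/sse")
# or, with a connection object:
# tools = get_mcp_tools_sync(connection=conn)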
+def _fetch_tools_for_server(
+    url: str,
+    connection: Optional[MCPConnection] = None,
+    format: str = "openai",
+) -> List[Dict[str, Any]]:
+    """Helper function to fetch tools for a single server."""
+    return get_mcp_tools_sync(
+        server_path=url,
+        connection=connection,
+        format=format,
+    )
+
+
+def get_tools_for_multiple_mcp_servers(
+    urls: List[str],
+    connections: List[MCPConnection] = None,
+    format: str = "openai",
+    output_type: Literal["json", "dict", "str"] = "str",
+    max_workers: Optional[int] = None,
+) -> List[Dict[str, Any]]:
+    """Get tools for multiple MCP servers concurrently using ThreadPoolExecutor.
+
+    Args:
+        urls: List of server URLs to fetch tools from
+        connections: Optional list of MCPConnection objects corresponding to each URL
+        format: Format to return tools in (default: "openai")
+        output_type: Type of output format (default: "str")
+        max_workers: Maximum number of worker threads (default: None, uses min(32, os.cpu_count() + 4))
+
+    Returns:
+        List[Dict[str, Any]]: Combined list of tools from all servers
+    """
+    tools = []
+    threads = (
+        min(32, os.cpu_count() + 4)
+        if max_workers is None
+        else max_workers
+    )
+
+    with ThreadPoolExecutor(max_workers=threads) as executor:
+        if exists(connections):
+            # Create future tasks for each URL-connection pair
+            future_to_url = {
+                executor.submit(
+                    _fetch_tools_for_server, url, connection, format
+                ): url
+                for url, connection in zip(urls, connections)
+            }
+        else:
+            # Create future tasks for each URL without connections
+            future_to_url = {
+                executor.submit(
+                    _fetch_tools_for_server, url, None, format
+                ): url
+                for url in urls
+            }
+
+        # Process completed futures as they come in
+        for future in as_completed(future_to_url):
+            url = future_to_url[future]
+            try:
+                server_tools = future.result()
+                tools.extend(server_tools)
+            except Exception as e:
+                logger.error(
+                    f"Error fetching tools from {url}: {str(e)}"
+                )
+                raise MCPExecutionError(
+                    f"Failed to fetch tools from {url}: {str(e)}"
+                )
+
+    return tools
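Usage sketch with placeholder URLs; if connections is given, it must align one-to-one with urls:

all_tools = get_tools_for_multiple_mcp_servers(
    urls=[
        "http://0.0.0.0:8000/sse",
        "http://0.0.0.0:8001/sse",
    ],
    max_workers=8,
)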
-async def execute_tool_call(
-    server_path: str,
-    messages: List[Dict[str, Any]],
-    model: str = "o3-mini",
-    *args,
-    **kwargs,
-) -> Dict[str, Any]:
-    """
-    Execute a tool call using the MCP client with retry logic.
-
-    Args:
-        server_path (str): Path to the MCP server script
-        messages (List[Dict[str, Any]]): Current conversation messages
-        model (str): The model to use for completion (default: "gpt-4")
-
-    Returns:
-        Dict[str, Any]: Final LLM response after tool execution
-
-    Raises:
-        ValueError: If inputs are invalid
-        ConnectionError: If connection to server fails
-        RuntimeError: If tool execution fails
-    """
-    async with sse_client(server_path, *args, **kwargs) as (
-        read,
-        write,
-    ):
-        async with ClientSession(read, write) as session:
-            try:
-                # Initialize the connection
-                await session.initialize()
-
-                # Get tools
-                tools = await load_mcp_tools(
-                    session=session, format="openai"
-                )
-                logger.info(f"Tools: {tools}")
-
-                # First LLM call to get tool call
-                llm_response = await litellm.acompletion(
-                    model=model,
-                    api_key=os.getenv("OPENAI_API_KEY"),
-                    messages=messages,
-                    tools=tools,
-                    tool_choice="auto",
-                    # parallel_tool_calls=True,
-                )
-                logger.info(f"Initial LLM Response: {llm_response}")
-
-                message = llm_response["choices"][0]["message"]
-                if not message.get("tool_calls"):
-                    logger.warning("No tool calls in LLM response")
-                    return llm_response
-
-                # Call the tool using MCP client
-                openai_tool = message["tool_calls"][0]
-                call_result = await call_openai_tool(
-                    session=session,
-                    openai_tool=openai_tool,
-                )
-                logger.info(f"Tool call completed: {call_result}")
-
-                # Update messages with tool result
-                messages.append(message)
-                messages.append(
-                    {
-                        "role": "tool",
-                        "content": str(call_result.content[0].text),
-                        "tool_call_id": openai_tool["id"],
-                    }
-                )
-                logger.debug(
-                    "Updated messages with tool result",
-                    extra={"messages": messages},
-                )
-
-                # Second LLM call with tool result
-                final_response = await litellm.acompletion(
-                    model=model,
-                    api_key=os.getenv("OPENAI_API_KEY"),
-                    messages=messages,
-                    tools=tools,
-                    tool_choice="auto",
-                    # parallel_tool_calls=True,
-                )
-                logger.info(f"Final LLM Response: {final_response}")
-
-                return final_response
-            except Exception as e:
-                logger.error(f"Error in execute_tool_call: {str(e)}")
-                raise RuntimeError(f"Tool execution failed: {str(e)}")
+async def _execute_tool_call_simple(
+    response: any = None,
+    server_path: str = None,
+    connection: Optional[MCPConnection] = None,
+    output_type: Literal["json", "dict", "str"] = "str",
+    *args,
+    **kwargs,
+):
+    """Execute a tool call using the MCP client."""
+    if exists(connection):
+        headers, timeout, transport, url = connect_to_mcp_server(
+            connection
+        )
+    else:
+        headers, timeout, transport, url = None, 5, "sse", server_path
+
+    try:
+        async with sse_client(
+            url=url, headers=headers, timeout=timeout, *args, **kwargs
+        ) as (
+            read,
+            write,
+        ):
+            async with ClientSession(read, write) as session:
+                try:
+                    await session.initialize()
+
+                    call_result = await call_openai_tool(
+                        session=session,
+                        openai_tool=response,
+                    )
+
+                    if output_type == "json":
+                        out = call_result.model_dump_json(indent=4)
+                    elif output_type == "dict":
+                        out = call_result.model_dump()
+                    elif output_type == "str":
+                        data = call_result.model_dump()
+                        formatted_lines = []
+                        for key, value in data.items():
+                            if isinstance(value, list):
+                                for item in value:
+                                    if isinstance(item, dict):
+                                        for k, v in item.items():
+                                            formatted_lines.append(
+                                                f"{k}: {v}"
+                                            )
+                            else:
+                                formatted_lines.append(
+                                    f"{key}: {value}"
+                                )
+                        out = "\n".join(formatted_lines)
+
+                    return out
+
+                except Exception as e:
+                    logger.error(f"Error in tool execution: {str(e)}")
+                    raise MCPExecutionError(
+                        f"Tool execution failed: {str(e)}"
+                    )
+
+    except Exception as e:
+        logger.error(f"Error in SSE client connection: {str(e)}")
+        raise MCPConnectionError(
+            f"Failed to connect to MCP server: {str(e)}"
+        )
+
+
+async def execute_tool_call_simple(
+    response: any = None,
+    server_path: str = None,
+    connection: Optional[MCPConnection] = None,
+    output_type: Literal["json", "dict", "str", "formatted"] = "str",
+    *args,
+    **kwargs,
+) -> List[Dict[str, Any]]:
+    return await _execute_tool_call_simple(
+        response=response,
+        server_path=server_path,
+        connection=connection,
+        output_type=output_type,
+        *args,
+        **kwargs,
+    )

-# def execute_tool_call_sync(
-#     server_path: str,
-#     tool_call: Dict[str, Any],
-#     task: str,
-#     *args,
-#     **kwargs,
-# ) -> Dict[str, Any]:
-#     """
-#     Synchronous version of execute_tool_call that handles event loop management.
-
-#     Args:
-#         server_path (str): Path to the MCP server script
-#         tool_call (Dict[str, Any]): The OpenAI tool call to execute
-#         messages (List[Dict[str, Any]]): Current conversation messages
-
-#     Returns:
-#         Dict[str, Any]: Final LLM response after tool execution
-
-#     Raises:
-#         ValueError: If inputs are invalid
-#         ConnectionError: If connection to server fails
-#         RuntimeError: If event loop management fails
-#     """
-#     with get_or_create_event_loop() as loop:
-#         try:
-#             return loop.run_until_complete(
-#                 execute_tool_call(
-#                     server_path, tool_call, task, *args, **kwargs
-#                 )
-#             )
-#         except Exception as e:
-#             logger.error(f"Error in execute_tool_call_sync: {str(e)}")
-#             raise
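A hedged end-to-end sketch of the new entry point, with a hypothetical tool-call payload and an illustrative server URL:

import asyncio

tool_call = {
    "function": {
        "name": "get_crypto_price",          # hypothetical
        "arguments": '{"coin": "bitcoin"}',
    }
}
result = asyncio.run(
    execute_tool_call_simple(
        response=tool_call,
        server_path="http://0.0.0.0:8000/sse",
        output_type="str",
    )
)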

@@ -1,340 +0,0 @@
from __future__ import annotations

from typing import Any
from loguru import logger
import abc
import asyncio
from contextlib import AbstractAsyncContextManager, AsyncExitStack
from pathlib import Path
from typing import Literal

from anyio.streams.memory import (
    MemoryObjectReceiveStream,
    MemoryObjectSendStream,
)
from mcp import (
    ClientSession,
    StdioServerParameters,
    Tool as MCPTool,
    stdio_client,
)
from mcp.client.sse import sse_client
from mcp.types import CallToolResult, JSONRPCMessage
from typing_extensions import NotRequired, TypedDict


class MCPServer(abc.ABC):
    """Base class for Model Context Protocol servers."""

    @abc.abstractmethod
    async def connect(self):
        """Connect to the server. For example, this might mean spawning a subprocess or
        opening a network connection. The server is expected to remain connected until
        `cleanup()` is called.
        """
        pass

    @property
    @abc.abstractmethod
    def name(self) -> str:
        """A readable name for the server."""
        pass

    @abc.abstractmethod
    async def cleanup(self):
        """Cleanup the server. For example, this might mean closing a subprocess or
        closing a network connection.
        """
        pass

    @abc.abstractmethod
    async def list_tools(self) -> list[MCPTool]:
        """List the tools available on the server."""
        pass

    @abc.abstractmethod
    async def call_tool(
        self, tool_name: str, arguments: dict[str, Any] | None
    ) -> CallToolResult:
        """Invoke a tool on the server."""
        pass


class _MCPServerWithClientSession(MCPServer, abc.ABC):
    """Base class for MCP servers that use a `ClientSession` to communicate with the server."""

    def __init__(self, cache_tools_list: bool):
        """
        Args:
            cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be
                cached and only fetched from the server once. If `False`, the tools list will be
                fetched from the server on each call to `list_tools()`. The cache can be invalidated
                by calling `invalidate_tools_cache()`. You should set this to `True` if you know the
                server will not change its tools list, because it can drastically improve latency
                (by avoiding a round-trip to the server every time).
        """
        self.session: ClientSession | None = None
        self.exit_stack: AsyncExitStack = AsyncExitStack()
        self._cleanup_lock: asyncio.Lock = asyncio.Lock()
        self.cache_tools_list = cache_tools_list

        # The cache is always dirty at startup, so that we fetch tools at least once
        self._cache_dirty = True
        self._tools_list: list[MCPTool] | None = None

    @abc.abstractmethod
    def create_streams(
        self,
    ) -> AbstractAsyncContextManager[
        tuple[
            MemoryObjectReceiveStream[JSONRPCMessage | Exception],
            MemoryObjectSendStream[JSONRPCMessage],
        ]
    ]:
        """Create the streams for the server."""
        pass

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.cleanup()

    def invalidate_tools_cache(self):
        """Invalidate the tools cache."""
        self._cache_dirty = True

    async def connect(self):
        """Connect to the server."""
        try:
            transport = await self.exit_stack.enter_async_context(
                self.create_streams()
            )
            read, write = transport
            session = await self.exit_stack.enter_async_context(
                ClientSession(read, write)
            )
            await session.initialize()
            self.session = session
        except Exception as e:
            logger.error(f"Error initializing MCP server: {e}")
            await self.cleanup()
            raise

    async def list_tools(self) -> list[MCPTool]:
        """List the tools available on the server."""
        if not self.session:
            raise Exception(
                "Server not initialized. Make sure you call `connect()` first."
            )

        # Return from cache if caching is enabled, we have tools, and the cache is not dirty
        if (
            self.cache_tools_list
            and not self._cache_dirty
            and self._tools_list
        ):
            return self._tools_list

        # Reset the cache dirty to False
        self._cache_dirty = False

        # Fetch the tools from the server
        self._tools_list = (await self.session.list_tools()).tools
        return self._tools_list

    async def call_tool(
        self, arguments: dict[str, Any] | None
    ) -> CallToolResult:
        """Invoke a tool on the server."""
        tool_name = arguments.get("tool_name") or arguments.get(
            "name"
        )
        if not tool_name:
            raise Exception("No tool name found in arguments")

        if not self.session:
            raise Exception(
                "Server not initialized. Make sure you call `connect()` first."
            )

        return await self.session.call_tool(tool_name, arguments)

    async def cleanup(self):
        """Cleanup the server."""
        async with self._cleanup_lock:
            try:
                await self.exit_stack.aclose()
                self.session = None
            except Exception as e:
                logger.error(f"Error cleaning up server: {e}")


class MCPServerStdioParams(TypedDict):
    """Mirrors `mcp.client.stdio.StdioServerParameters`, but lets you pass params without another
    import.
    """

    command: str
    """The executable to run to start the server. For example, `python` or `node`."""

    args: NotRequired[list[str]]
    """Command line args to pass to the `command` executable. For example, `['foo.py']` or
    `['server.js', '--port', '8080']`."""

    env: NotRequired[dict[str, str]]
    """The environment variables to set for the server."""

    cwd: NotRequired[str | Path]
    """The working directory to use when spawning the process."""

    encoding: NotRequired[str]
    """The text encoding used when sending/receiving messages to the server. Defaults to `utf-8`."""

    encoding_error_handler: NotRequired[
        Literal["strict", "ignore", "replace"]
    ]
    """The text encoding error handler. Defaults to `strict`.

    See https://docs.python.org/3/library/codecs.html#codec-base-classes for
    explanations of possible values.
    """


class MCPServerStdio(_MCPServerWithClientSession):
    """MCP server implementation that uses the stdio transport. See the [spec]
    (https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#stdio) for
    details.
    """

    def __init__(
        self,
        params: MCPServerStdioParams,
        cache_tools_list: bool = False,
        name: str | None = None,
    ):
        """Create a new MCP server based on the stdio transport.

        Args:
            params: The params that configure the server. This includes the command to run to
                start the server, the args to pass to the command, the environment variables to
                set for the server, the working directory to use when spawning the process, and
                the text encoding used when sending/receiving messages to the server.
            cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be
                cached and only fetched from the server once. If `False`, the tools list will be
                fetched from the server on each call to `list_tools()`. The cache can be
                invalidated by calling `invalidate_tools_cache()`. You should set this to `True`
                if you know the server will not change its tools list, because it can drastically
                improve latency (by avoiding a round-trip to the server every time).
            name: A readable name for the server. If not provided, we'll create one from the
                command.
        """
        super().__init__(cache_tools_list)

        self.params = StdioServerParameters(
            command=params["command"],
            args=params.get("args", []),
            env=params.get("env"),
            cwd=params.get("cwd"),
            encoding=params.get("encoding", "utf-8"),
            encoding_error_handler=params.get(
                "encoding_error_handler", "strict"
            ),
        )

        self._name = name or f"stdio: {self.params.command}"

    def create_streams(
        self,
    ) -> AbstractAsyncContextManager[
        tuple[
            MemoryObjectReceiveStream[JSONRPCMessage | Exception],
            MemoryObjectSendStream[JSONRPCMessage],
        ]
    ]:
        """Create the streams for the server."""
        return stdio_client(self.params)

    @property
    def name(self) -> str:
        """A readable name for the server."""
        return self._name


class MCPServerSseParams(TypedDict):
    """Mirrors the params in `mcp.client.sse.sse_client`."""

    url: str
    """The URL of the server."""

    headers: NotRequired[dict[str, str]]
    """The headers to send to the server."""

    timeout: NotRequired[float]
    """The timeout for the HTTP request. Defaults to 5 seconds."""

    sse_read_timeout: NotRequired[float]
    """The timeout for the SSE connection, in seconds. Defaults to 5 minutes."""


class MCPServerSse(_MCPServerWithClientSession):
    """MCP server implementation that uses the HTTP with SSE transport. See the [spec]
    (https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse)
    for details.
    """

    def __init__(
        self,
        params: MCPServerSseParams,
        cache_tools_list: bool = False,
        name: str | None = None,
    ):
        """Create a new MCP server based on the HTTP with SSE transport.

        Args:
            params: The params that configure the server. This includes the URL of the server,
                the headers to send to the server, the timeout for the HTTP request, and the
                timeout for the SSE connection.
            cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be
                cached and only fetched from the server once. If `False`, the tools list will be
                fetched from the server on each call to `list_tools()`. The cache can be
                invalidated by calling `invalidate_tools_cache()`. You should set this to `True`
                if you know the server will not change its tools list, because it can drastically
                improve latency (by avoiding a round-trip to the server every time).
            name: A readable name for the server. If not provided, we'll create one from the
                URL.
        """
        super().__init__(cache_tools_list)

        self.params = params
        self._name = name or f"sse: {self.params['url']}"

    def create_streams(
        self,
    ) -> AbstractAsyncContextManager[
        tuple[
            MemoryObjectReceiveStream[JSONRPCMessage | Exception],
            MemoryObjectSendStream[JSONRPCMessage],
        ]
    ]:
        """Create the streams for the server."""
        return sse_client(
            url=self.params["url"],
            headers=self.params.get("headers", None),
            timeout=self.params.get("timeout", 5),
            sse_read_timeout=self.params.get(
                "sse_read_timeout", 60 * 5
            ),
        )

    @property
    def name(self) -> str:
        """A readable name for the server."""
        return self._name
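For callers of the deleted module, a rough mapping onto the new connection schema (field names from swarms.schemas.mcp_schemas.MCPConnection):

# Before (removed):
#     params = MCPServerSseParams(url="http://0.0.0.0:8000/sse", timeout=5)
# After (sketch):
from swarms.schemas.mcp_schemas import MCPConnection

conn = MCPConnection(url="http://0.0.0.0:8000/sse", timeout=5)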

@@ -0,0 +1,2 @@
def exists(val):
    return val is not None

@@ -1,3 +1,4 @@
+from typing import Optional
import base64
import requests
@@ -7,24 +8,10 @@ from typing import List
from loguru import logger
import litellm

-try:
-    from litellm import completion, acompletion
-except ImportError:
-    import subprocess
-    import sys
-    import litellm
-
-    print("Installing litellm")
-
-    subprocess.check_call(
-        [sys.executable, "-m", "pip", "install", "-U", "litellm"]
-    )
-    print("litellm installed")
-
-    from litellm import completion
-
-litellm.set_verbose = True
-litellm.ssl_verify = False
+from litellm import completion, acompletion
+
+litellm.set_verbose = True
+litellm.ssl_verify = False


class LiteLLMException(Exception):
@@ -81,11 +68,13 @@ class LiteLLM:
        max_completion_tokens: int = 4000,
        tools_list_dictionary: List[dict] = None,
        tool_choice: str = "auto",
-        parallel_tool_calls: bool = False,
+        parallel_tool_calls: bool = True,
        audio: str = None,
        retries: int = 3,
        verbose: bool = False,
        caching: bool = False,
+        mcp_call: bool = False,
+        top_p: float = 1.0,
        *args,
        **kwargs,
    ):
@@ -110,6 +99,8 @@ class LiteLLM:
        self.tool_choice = tool_choice
        self.parallel_tool_calls = parallel_tool_calls
        self.caching = caching
+        self.mcp_call = mcp_call
+        self.top_p = top_p
        self.modalities = []
        self._cached_messages = {}  # Cache for prepared messages
        self.messages = []  # Initialize messages list
@@ -124,7 +115,29 @@ class LiteLLM:
        )

    def output_for_tools(self, response: any):
-        return response["choices"][0]["message"]["tool_calls"][0]
+        if self.mcp_call is True:
+            out = response.choices[0].message.tool_calls[0].function
+            output = {
+                "function": {
+                    "name": out.name,
+                    "arguments": out.arguments,
+                }
+            }
+            return output
+        elif self.parallel_tool_calls is True:
+            output = []
+            for tool_call in response.choices[0].message.tool_calls:
+                output.append(
+                    {
+                        "function": {
+                            "name": tool_call.function.name,
+                            "arguments": tool_call.function.arguments,
+                        }
+                    }
+                )
+            return output
+        else:
+            out = response.choices[0].message.tool_calls[0]
+            return out
    def _prepare_messages(self, task: str) -> list:
        """
@@ -225,8 +238,8 @@ class LiteLLM:
    def run(
        self,
        task: str,
-        audio: str = None,
-        img: str = None,
+        audio: Optional[str] = None,
+        img: Optional[str] = None,
        *args,
        **kwargs,
    ):
@@ -253,38 +266,28 @@ class LiteLLM:
            self.handle_modalities(
                task=task, audio=audio, img=img
            )

-            messages = (
-                self.messages
-            )  # Use modality-processed messages
-
-            if (
-                self.model_name == "openai/o4-mini"
-                or self.model_name == "openai/o3-2025-04-16"
-            ):
-                # Prepare common completion parameters
-                completion_params = {
-                    "model": self.model_name,
-                    "messages": messages,
-                    "stream": self.stream,
-                    # "temperature": self.temperature,
-                    "max_completion_tokens": self.max_tokens,
-                    "caching": self.caching,
-                    **kwargs,
-                }
-            else:
-                # Prepare common completion parameters
-                completion_params = {
-                    "model": self.model_name,
-                    "messages": messages,
-                    "stream": self.stream,
-                    "temperature": self.temperature,
-                    "max_tokens": self.max_tokens,
-                    "caching": self.caching,
-                    **kwargs,
-                }
-
-            # Handle tool-based completion
+            messages = self.messages
+
+            # Base completion parameters
+            completion_params = {
+                "model": self.model_name,
+                "messages": messages,
+                "stream": self.stream,
+                "max_tokens": self.max_tokens,
+                "caching": self.caching,
+                "temperature": self.temperature,
+                "top_p": self.top_p,
+                **kwargs,
+            }
+
+            # Add temperature for non-o4/o3 models
+            if self.model_name not in [
+                "openai/o4-mini",
+                "openai/o3-2025-04-16",
+            ]:
+                completion_params["temperature"] = self.temperature
+
+            # Add tools if specified
            if self.tools_list_dictionary is not None:
                completion_params.update(
                    {
@@ -293,35 +296,20 @@ class LiteLLM:
                        "parallel_tool_calls": self.parallel_tool_calls,
                    }
                )
-                response = completion(**completion_params)
-                return (
-                    response.choices[0]
-                    .message.tool_calls[0]
-                    .function.arguments
-                )
-
-            # Handle modality-based completion
-            if (
-                self.modalities and len(self.modalities) > 1
-            ):  # More than just text
-                completion_params.update(
-                    {"modalities": self.modalities}
-                )
-                response = completion(**completion_params)
-                return response.choices[0].message.content
-
-            # Standard completion
-            if self.stream:
-                output = completion(**completion_params)
-            else:
-                response = completion(**completion_params)
-
-                if self.tools_list_dictionary is not None:
-                    return self.output_for_tools(response)
-                else:
-                    return response.choices[0].message.content
-
-            return output
+
+            # Add modalities if needed
+            if self.modalities and len(self.modalities) > 1:
+                completion_params["modalities"] = self.modalities
+
+            # Make the completion call
+            response = completion(**completion_params)
+
+            # Handle tool-based response
+            if self.tools_list_dictionary is not None:
+                return self.output_for_tools(response)
+            else:
+                # Return standard response content
+                return response.choices[0].message.content

        except LiteLLMException as error:
            logger.error(f"Error in LiteLLM run: {str(error)}")
@@ -331,7 +319,7 @@ class LiteLLM:
            )

            import time

-            time.sleep(2)  # Add a small delay before retry
+            time.sleep(2)
            return self.run(task, audio, img, *args, **kwargs)

        raise error
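A hedged usage sketch of the reworked wrapper; the model name is illustrative, and tools would typically come from get_mcp_tools_sync:

llm = LiteLLM(
    model_name="gpt-4o-mini",     # illustrative model
    tools_list_dictionary=tools,  # e.g. from get_mcp_tools_sync(...)
    tool_choice="auto",
    parallel_tool_calls=True,
    mcp_call=True,
)
tool_call = llm.run("Call one of the available tools")
# With mcp_call=True, run() returns {"function": {"name": ..., "arguments": ...}}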
