diff --git a/agent_mcp.py b/agent_mcp.py new file mode 100644 index 00000000..19538e1d --- /dev/null +++ b/agent_mcp.py @@ -0,0 +1,28 @@ +from swarms import Agent +from swarms.schemas.mcp_schemas import MCPConnection + + +mcp_config = MCPConnection( + url="http://0.0.0.0:8000/sse", + # headers={"Authorization": "Bearer 1234567890"}, + timeout=5, +) + + +mcp_url = "http://0.0.0.0:8000/sse" + +# Initialize the agent +agent = Agent( + agent_name="Financial-Analysis-Agent", + agent_description="Personal finance advisor agent", + max_loops=1, + mcp_url=mcp_url, + output_type="all", +) + +# Create a markdown file with initial content +out = agent.run( + "Fetch the price for bitcoin on both functions get_htx_crypto_price and get_crypto_price", +) + +print(out) diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 57644ae1..fe561e42 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -195,9 +195,10 @@ nav: - Create and Run Agents from YAML: "swarms/agents/create_agents_yaml.md" - Integrating Various Models into Your Agents: "swarms/models/agent_and_models.md" - Tools: - - Structured Outputs: "swarms/agents/structured_outputs.md" - Overview: "swarms/tools/main.md" - What are tools?: "swarms/tools/build_tool.md" + - Structured Outputs: "swarms/agents/structured_outputs.md" + - Agent MCP Integration: "swarms/structs/agent_mcp.md" - ToolAgent: "swarms/agents/tool_agent.md" - Tool Storage: "swarms/tools/tool_storage.md" - RAG || Long Term Memory: @@ -261,6 +262,7 @@ nav: - Swarms Tools: - Overview: "swarms_tools/overview.md" + - MCP Client Utils: "swarms/tools/mcp_client_call.md" - Vertical Tools: - Finance: "swarms_tools/finance.md" diff --git a/docs/swarms/structs/agent_mcp.md b/docs/swarms/structs/agent_mcp.md new file mode 100644 index 00000000..fd6277a3 --- /dev/null +++ b/docs/swarms/structs/agent_mcp.md @@ -0,0 +1,826 @@ +# Agent MCP Integration Guide + +
+ +- :material-connection: **Direct MCP Server Connection** + + --- + + Connect agents to MCP servers via URL for seamless integration + + [:octicons-arrow-right-24: Quick Start](#quick-start) + +- :material-tools: **Dynamic Tool Discovery** + + --- + + Automatically fetch and utilize tools from MCP servers + + [:octicons-arrow-right-24: Tool Discovery](#integration-flow) + +- :material-chart-line: **Real-time Communication** + + --- + + Server-sent Events (SSE) for live data streaming + + [:octicons-arrow-right-24: Configuration](#configuration-options) + +- :material-code-json: **Structured Output** + + --- + + Process and format responses with multiple output types + + [:octicons-arrow-right-24: Examples](#example-implementations) + +
+ +## Overview + +The **Model Context Protocol (MCP)** integration enables Swarms agents to dynamically connect to external tools and services through a standardized protocol. This powerful feature expands agent capabilities by providing access to APIs, databases, and specialized services. + +!!! info "What is MCP?" + The Model Context Protocol is a standardized way for AI agents to interact with external tools and services, providing a consistent interface for tool discovery and execution. + +--- + +## :material-check-circle: Features Matrix + +=== "✅ Current Capabilities" + + | Feature | Status | Description | + |---------|--------|-------------| + | **Direct MCP Connection** | ✅ Ready | Connect via URL to MCP servers | + | **Tool Discovery** | ✅ Ready | Auto-fetch available tools | + | **SSE Communication** | ✅ Ready | Real-time server communication | + | **Multiple Tool Execution** | ✅ Ready | Execute multiple tools per session | + | **Structured Output** | ✅ Ready | Format responses in multiple types | + +=== "🚧 In Development" + + | Feature | Status | Expected | + |---------|--------|----------| + | **MCPConnection Model** | 🚧 Development | Q1 2024 | + | **Multiple Server Support** | 🚧 Planned | Q2 2024 | + | **Parallel Function Calling** | 🚧 Research | Q2 2024 | + | **Auto-discovery** | 🚧 Planned | Q3 2024 | + +--- + +## :material-rocket: Quick Start + +!!! tip "Prerequisites" + === "System Requirements" + - Python 3.8+ + - Swarms framework + - Running MCP server + + === "Installation" + ```bash + pip install swarms + ``` + +### Step 1: Basic Agent Setup + +!!! example "Simple MCP Agent" + + ```python + from swarms import Agent + + # Initialize agent with MCP integration + agent = Agent( + agent_name="Financial-Analysis-Agent", + agent_description="AI-powered financial advisor", + max_loops=1, + mcp_url="http://localhost:8000/sse", # Your MCP server + output_type="all", + ) + + # Execute task using MCP tools + result = agent.run( + "Get current Bitcoin price and analyze market trends" + ) + print(result) + ``` + +### Step 2: Advanced Configuration + +!!! example "Production-Ready Setup" + + ```python + from swarms import Agent + from swarms.prompts.finance_agent_sys_prompt import FINANCIAL_AGENT_SYS_PROMPT + + agent = Agent( + agent_name="Advanced-Financial-Agent", + agent_description="Comprehensive market analysis agent", + system_prompt=FINANCIAL_AGENT_SYS_PROMPT, + max_loops=3, + mcp_url="http://production-server:8000/sse", + output_type="json", + # Additional parameters for production + temperature=0.1, + verbose=True, + ) + ``` + +--- + +## :material-workflow: Integration Flow + +The following diagram illustrates the complete MCP integration workflow: + +```mermaid +graph TD + A[🚀 Agent Receives Task] --> B[🔗 Connect to MCP Server] + B --> C[🔍 Discover Available Tools] + C --> D[🧠 Analyze Task Requirements] + D --> E[📝 Generate Tool Request] + E --> F[📤 Send to MCP Server] + F --> G[⚙️ Server Processes Request] + G --> H[📥 Receive Response] + H --> I[🔄 Process & Validate] + I --> J[📊 Summarize Results] + J --> K[✅ Return Final Output] + + classDef startEnd fill:#e1f5fe,stroke:#01579b,stroke-width:2px + classDef process fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px + classDef communication fill:#fff3e0,stroke:#ef6c00,stroke-width:2px + + class A,K startEnd + class D,I,J process + class F,G,H communication +``` + +### Detailed Process Breakdown + +!!! abstract "Process Steps" + + === "1-3: Initialization" + 1. **Task Initiation** - Agent receives user query + 2. **Server Connection** - Establish MCP server link + 3. **Tool Discovery** - Fetch available tool schemas + + === "4-6: Execution" + 4. **Task Analysis** - Determine required tools + 5. **Request Generation** - Create structured API calls + 6. **Server Communication** - Send requests via SSE + + === "7-9: Processing" + 7. **Server Processing** - MCP server executes tools + 8. **Response Handling** - Receive and validate data + 9. **Result Processing** - Parse and structure output + + === "10-11: Completion" + 10. **Summarization** - Generate user-friendly summary + 11. **Final Output** - Return complete response + +--- + +## :material-cog: Configuration Options + +### Agent Parameters + +!!! note "Configuration Reference" + + | Parameter | Type | Description | Default | Example | + |-----------|------|-------------|---------|---------| + | `mcp_url` | `str` | MCP server endpoint | `None` | `"http://localhost:8000/sse"` | + | `output_type` | `str` | Response format | `"str"` | `"json"`, `"all"`, `"dict"` | + | `max_loops` | `int` | Execution iterations | `1` | `3` | + | `temperature` | `float` | Response creativity | `0.1` | `0.1-1.0` | + | `verbose` | `bool` | Debug logging | `False` | `True` | + +### Output Type Comparison + +=== "all" + **Complete execution trace with metadata** + ```json + { + "response": "Bitcoin price: $45,230", + "tools_used": ["get_crypto_price"], + "execution_time": "2.3s", + "loops_completed": 1 + } + ``` + +=== "str" + **Simple string response** + ``` + "Current Bitcoin price is $45,230 USD" + ``` + +=== "json" + **Structured JSON output** + ```json + { + "bitcoin_price": 45230, + "currency": "USD", + "timestamp": "2024-01-15T10:30:00Z" + } + ``` + +=== "dict" + **Python dictionary format** + ```python + { + 'price': 45230, + 'symbol': 'BTC', + 'change_24h': '+2.5%' + } + ``` + +--- + +## :material-code-tags: Example Implementations + +### Cryptocurrency Trading Agent + +!!! example "Crypto Price Monitor" + + ```python + from swarms import Agent + + crypto_agent = Agent( + agent_name="Crypto-Trading-Agent", + agent_description="Real-time cryptocurrency market analyzer", + max_loops=2, + mcp_url="http://crypto-server:8000/sse", + output_type="json", + temperature=0.1, + ) + + # Multi-exchange price comparison + result = crypto_agent.run( + """ + Compare Bitcoin and Ethereum prices across OKX and HTX exchanges. + Calculate arbitrage opportunities and provide trading recommendations. + """ + ) + ``` + +### Financial Analysis Suite + +!!! example "Advanced Financial Agent" + + ```python + from swarms import Agent + from swarms.prompts.finance_agent_sys_prompt import FINANCIAL_AGENT_SYS_PROMPT + + financial_agent = Agent( + agent_name="Financial-Analysis-Suite", + agent_description="Comprehensive financial market analyst", + system_prompt=FINANCIAL_AGENT_SYS_PROMPT, + max_loops=4, + mcp_url="http://finance-api:8000/sse", + output_type="all", + temperature=0.2, + ) + + # Complex market analysis + analysis = financial_agent.run( + """ + Perform a comprehensive analysis of Tesla (TSLA) stock: + 1. Current price and technical indicators + 2. Recent news sentiment analysis + 3. Competitor comparison (GM, Ford) + 4. Investment recommendation with risk assessment + """ + ) + ``` + +### Custom Industry Agent + +!!! example "Healthcare Data Agent" + + ```python + from swarms import Agent + + healthcare_agent = Agent( + agent_name="Healthcare-Data-Agent", + agent_description="Medical data analysis and research assistant", + max_loops=3, + mcp_url="http://medical-api:8000/sse", + output_type="dict", + system_prompt=""" + You are a healthcare data analyst. Use available medical databases + and research tools to provide accurate, evidence-based information. + Always cite sources and include confidence levels. + """, + ) + + research = healthcare_agent.run( + "Research latest treatments for Type 2 diabetes and their efficacy rates" + ) + ``` + +--- + +## :material-server: MCP Server Development + +### FastMCP Server Example + +!!! example "Building a Custom MCP Server" + + ```python + from mcp.server.fastmcp import FastMCP + import requests + from typing import Optional + import asyncio + + # Initialize MCP server + mcp = FastMCP("crypto_analysis_server") + + @mcp.tool( + name="get_crypto_price", + description="Fetch current cryptocurrency price with market data", + ) + def get_crypto_price( + symbol: str, + currency: str = "USD", + include_24h_change: bool = True + ) -> dict: + """ + Get real-time cryptocurrency price and market data. + + Args: + symbol: Cryptocurrency symbol (e.g., BTC, ETH) + currency: Target currency for price (default: USD) + include_24h_change: Include 24-hour price change data + """ + try: + url = f"https://api.coingecko.com/api/v3/simple/price" + params = { + "ids": symbol.lower(), + "vs_currencies": currency.lower(), + "include_24hr_change": include_24h_change + } + + response = requests.get(url, params=params, timeout=10) + response.raise_for_status() + + data = response.json() + return { + "symbol": symbol.upper(), + "price": data[symbol.lower()][currency.lower()], + "currency": currency.upper(), + "change_24h": data[symbol.lower()].get("24h_change", 0), + "timestamp": "2024-01-15T10:30:00Z" + } + + except Exception as e: + return {"error": f"Failed to fetch price: {str(e)}"} + + @mcp.tool( + name="analyze_market_sentiment", + description="Analyze cryptocurrency market sentiment from social media", + ) + def analyze_market_sentiment(symbol: str, timeframe: str = "24h") -> dict: + """Analyze market sentiment for a cryptocurrency.""" + # Implement sentiment analysis logic + return { + "symbol": symbol, + "sentiment_score": 0.75, + "sentiment": "Bullish", + "confidence": 0.85, + "timeframe": timeframe + } + + if __name__ == "__main__": + mcp.run(transport="sse", host="0.0.0.0", port=8000) + ``` + +### Server Best Practices + +!!! tip "Server Development Guidelines" + + === "🏗️ Architecture" + - **Modular Design**: Separate tools into logical modules + - **Error Handling**: Implement comprehensive error responses + - **Async Support**: Use async/await for better performance + - **Type Hints**: Include proper type annotations + + === "🔒 Security" + - **Input Validation**: Sanitize all user inputs + - **Rate Limiting**: Implement request throttling + - **Authentication**: Add API key validation + - **Logging**: Log all requests and responses + + === "⚡ Performance" + - **Caching**: Cache frequently requested data + - **Connection Pooling**: Reuse database connections + - **Timeouts**: Set appropriate request timeouts + - **Load Testing**: Test under realistic load + +--- + +## :material-alert: Current Limitations + +!!! warning "Important Limitations" + + ### 🚧 MCPConnection Model + + The enhanced connection model is under development: + + ```python + # ❌ Not available yet + from swarms.schemas.mcp_schemas import MCPConnection + + mcp_config = MCPConnection( + url="http://server:8000/sse", + headers={"Authorization": "Bearer token"}, + timeout=30, + retry_attempts=3 + ) + + # ✅ Use direct URL instead + mcp_url = "http://server:8000/sse" + ``` + + ### 🚧 Single Server Limitation + + Currently supports one server per agent: + + ```python + # ❌ Multiple servers not supported + mcp_servers = [ + "http://server1:8000/sse", + "http://server2:8000/sse" + ] + + # ✅ Single server only + mcp_url = "http://primary-server:8000/sse" + ``` + + ### 🚧 Sequential Execution + + Tools execute sequentially, not in parallel: + + ```python + # Current: tool1() → tool2() → tool3() + # Future: tool1() | tool2() | tool3() (parallel) + ``` + +--- + +## :material-wrench: Troubleshooting + +### Common Issues & Solutions + +!!! bug "Connection Problems" + + === "Server Unreachable" + **Symptoms**: Connection timeout or refused + + **Solutions**: + ```bash + # Check server status + curl -I http://localhost:8000/sse + + # Verify port is open + netstat -tulpn | grep :8000 + + # Test network connectivity + ping your-server-host + ``` + + === "Authentication Errors" + **Symptoms**: 401/403 HTTP errors + + **Solutions**: + ```python + # Verify API credentials + headers = {"Authorization": "Bearer your-token"} + + # Check token expiration + # Validate permissions + ``` + + === "SSL/TLS Issues" + **Symptoms**: Certificate errors + + **Solutions**: + ```python + # For development only + import ssl + ssl._create_default_https_context = ssl._create_unverified_context + ``` + +!!! bug "Tool Discovery Failures" + + === "Empty Tool List" + **Symptoms**: No tools found from server + + **Debugging**: + ```python + # Check server tool registration + @mcp.tool(name="tool_name", description="...") + def your_tool(): + pass + + # Verify server startup logs + # Check tool endpoint responses + ``` + + === "Schema Validation Errors" + **Symptoms**: Invalid tool parameters + + **Solutions**: + ```python + # Ensure proper type hints + def tool(param: str, optional: int = 0) -> dict: + return {"result": "success"} + + # Validate parameter types + # Check required vs optional parameters + ``` + +!!! bug "Performance Issues" + + === "Slow Response Times" + **Symptoms**: Long wait times for responses + + **Optimization**: + ```python + # Increase timeout + agent = Agent( + mcp_url="http://server:8000/sse", + timeout=60, # seconds + ) + + # Optimize server performance + # Use connection pooling + # Implement caching + ``` + + === "Memory Usage" + **Symptoms**: High memory consumption + + **Solutions**: + ```python + # Limit max_loops + agent = Agent(max_loops=2) + + # Use streaming for large responses + # Implement garbage collection + ``` + +### Debugging Tools + +!!! tip "Debug Configuration" + + ```python + import logging + + # Enable debug logging + logging.basicConfig(level=logging.DEBUG) + + agent = Agent( + agent_name="Debug-Agent", + mcp_url="http://localhost:8000/sse", + verbose=True, # Enable verbose output + output_type="all", # Get full execution trace + ) + + # Monitor network traffic + # Check server logs + # Use profiling tools + ``` + +--- + +## :material-security: Security Best Practices + +### Authentication & Authorization + +!!! shield "Security Checklist" + + === "🔑 Authentication" + - **API Keys**: Use strong, unique API keys + - **Token Rotation**: Implement automatic token refresh + - **Encryption**: Use HTTPS for all communications + - **Storage**: Secure credential storage (environment variables) + + === "🛡️ Authorization" + - **Role-Based Access**: Implement user role restrictions + - **Tool Permissions**: Limit tool access per user/agent + - **Rate Limiting**: Prevent abuse with request limits + - **Audit Logging**: Log all tool executions + + === "🔒 Data Protection" + - **Input Sanitization**: Validate all user inputs + - **Output Filtering**: Sanitize sensitive data in responses + - **Encryption**: Encrypt sensitive data in transit/rest + - **Compliance**: Follow industry standards (GDPR, HIPAA) + +### Secure Configuration + +!!! example "Production Security Setup" + + ```python + import os + from swarms import Agent + + # Secure configuration + agent = Agent( + agent_name="Production-Agent", + mcp_url=os.getenv("MCP_SERVER_URL"), # From environment + # Additional security headers would go here when MCPConnection is available + verbose=False, # Disable verbose logging in production + output_type="json", # Structured output only + ) + + # Environment variables (.env file) + """ + MCP_SERVER_URL=https://secure-server.company.com/sse + MCP_API_KEY=your-secure-api-key + MCP_TIMEOUT=30 + """ + ``` + +--- + +## :material-chart-line: Performance Optimization + +### Agent Optimization + +!!! rocket "Performance Tips" + + === "⚡ Configuration" + ```python + # Optimized agent settings + agent = Agent( + max_loops=2, # Limit iterations + temperature=0.1, # Reduce randomness + output_type="json", # Structured output + # Future: connection_pool_size=10 + ) + ``` + + === "🔄 Caching" + ```python + # Implement response caching + from functools import lru_cache + + @lru_cache(maxsize=100) + def cached_mcp_call(query): + return agent.run(query) + ``` + + === "📊 Monitoring" + ```python + import time + + start_time = time.time() + result = agent.run("query") + execution_time = time.time() - start_time + + print(f"Execution time: {execution_time:.2f}s") + ``` + +### Server Optimization + +!!! rocket "Server Performance" + + ```python + from mcp.server.fastmcp import FastMCP + import asyncio + from concurrent.futures import ThreadPoolExecutor + + mcp = FastMCP("optimized_server") + + # Async tool with thread pool + @mcp.tool(name="async_heavy_task") + async def heavy_computation(data: str) -> dict: + loop = asyncio.get_event_loop() + with ThreadPoolExecutor() as executor: + result = await loop.run_in_executor( + executor, process_heavy_task, data + ) + return result + + def process_heavy_task(data): + # CPU-intensive processing + return {"processed": data} + ``` + +--- + +## :material-timeline: Future Roadmap + +### Upcoming Features + +!!! rocket "Development Timeline" + + === "Q1 2024" + - **MCPConnection Model** - Enhanced configuration + - **Authentication Support** - Built-in auth mechanisms + - **Error Recovery** - Automatic retry logic + - **Connection Pooling** - Improved performance + + === "Q2 2024" + - **Multiple Server Support** - Connect to multiple MCPs + - **Parallel Execution** - Concurrent tool calling + - **Load Balancing** - Distribute requests across servers + - **Advanced Monitoring** - Real-time metrics + + === "Q3 2024" + - **Auto-discovery** - Automatic server detection + - **Workflow Engine** - Complex task orchestration + - **Plugin System** - Custom MCP extensions + - **Cloud Integration** - Native cloud provider support + +### Contributing + +!!! heart "Get Involved" + + We welcome contributions to improve MCP integration: + + - **Bug Reports**: [GitHub Issues](https://github.com/kyegomez/swarms/issues) + - **Feature Requests**: [Discussions](https://github.com/kyegomez/swarms/discussions) + - **Code Contributions**: [Pull Requests](https://github.com/kyegomez/swarms/pulls) + - **Documentation**: Help improve these docs + +--- + +## :material-help-circle: Support & Resources + +### Getting Help + +!!! question "Need Assistance?" + + === "📚 Documentation" + - [Official Docs](https://docs.swarms.world) + - [API Reference](https://docs.swarms.world/api) + - [Tutorials](https://docs.swarms.world/tutorials) + + === "💬 Community" + - [Discord Server](https://discord.gg/swarms) + - [GitHub Discussions](https://github.com/kyegomez/swarms/discussions) + - [Stack Overflow](https://stackoverflow.com/questions/tagged/swarms) + + === "🔧 Development" + - [GitHub Repository](https://github.com/kyegomez/swarms) + - [Example Projects](https://github.com/kyegomez/swarms/tree/main/examples) + - [Contributing Guide](https://github.com/kyegomez/swarms/blob/main/CONTRIBUTING.md) + +### Quick Reference + +!!! abstract "Cheat Sheet" + + ```python + # Basic setup + from swarms import Agent + + agent = Agent( + agent_name="Your-Agent", + mcp_url="http://localhost:8000/sse", + output_type="json", + max_loops=2 + ) + + # Execute task + result = agent.run("Your query here") + + # Common patterns + crypto_query = "Get Bitcoin price" + analysis_query = "Analyze Tesla stock performance" + research_query = "Research recent AI developments" + ``` + +--- + +## :material-check-all: Conclusion + +The MCP integration brings powerful external tool connectivity to Swarms agents, enabling them to access real-world data and services through a standardized protocol. While some advanced features are still in development, the current implementation provides robust functionality for most use cases. + +!!! success "Ready to Start?" + + Begin with the [Quick Start](#quick-start) section and explore the [examples](#example-implementations) to see MCP integration in action. As new features become available, this documentation will be updated with the latest capabilities and best practices. + +!!! tip "Stay Updated" + + Join our [Discord community](https://discord.gg/swarms) to stay informed about new MCP features and connect with other developers building amazing agent applications. + +--- + +
+ +- :material-rocket: **[Quick Start](#quick-start)** + + Get up and running with MCP integration in minutes + +- :material-book-open: **[Examples](#example-implementations)** + + Explore real-world implementations and use cases + +- :material-cog: **[Configuration](#configuration-options)** + + Learn about all available configuration options + +- :material-help: **[Troubleshooting](#troubleshooting)** + + Solve common issues and optimize performance + +
diff --git a/docs/swarms/tools/mcp_client_call.md b/docs/swarms/tools/mcp_client_call.md new file mode 100644 index 00000000..d778d04d --- /dev/null +++ b/docs/swarms/tools/mcp_client_call.md @@ -0,0 +1,244 @@ +# MCP Client Call Reference Documentation + +This document provides a comprehensive reference for the MCP (Model Control Protocol) client call functions, including detailed parameter descriptions, return types, and usage examples. + +## Table of Contents + +- [aget_mcp_tools](#aget_mcp_tools) + +- [get_mcp_tools_sync](#get_mcp_tools_sync) + +- [get_tools_for_multiple_mcp_servers](#get_tools_for_multiple_mcp_servers) + +- [execute_tool_call_simple](#execute_tool_call_simple) + +## Function Reference + +### aget_mcp_tools + +Asynchronously fetches available MCP tools from the server with retry logic. + +#### Parameters + +| Parameter | Type | Required | Description | +|-----------|------|----------|-------------| +| server_path | Optional[str] | No | Path to the MCP server script | +| format | str | No | Format of the returned tools (default: "openai") | +| connection | Optional[MCPConnection] | No | MCP connection object | +| *args | Any | No | Additional positional arguments | +| **kwargs | Any | No | Additional keyword arguments | + +#### Returns + +- `List[Dict[str, Any]]`: List of available MCP tools in OpenAI format + +#### Raises + +- `MCPValidationError`: If server_path is invalid + +- `MCPConnectionError`: If connection to server fails + +#### Example + +```python +import asyncio +from swarms.tools.mcp_client_call import aget_mcp_tools +from swarms.tools.mcp_connection import MCPConnection + +async def main(): + # Using server path + tools = await aget_mcp_tools(server_path="http://localhost:8000") + + # Using connection object + connection = MCPConnection( + host="localhost", + port=8000, + headers={"Authorization": "Bearer token"} + ) + tools = await aget_mcp_tools(connection=connection) + + print(f"Found {len(tools)} tools") + +if __name__ == "__main__": + asyncio.run(main()) +``` + +### get_mcp_tools_sync + +Synchronous version of get_mcp_tools that handles event loop management. + +#### Parameters + +| Parameter | Type | Required | Description | +|-----------|------|----------|-------------| +| server_path | Optional[str] | No | Path to the MCP server script | +| format | str | No | Format of the returned tools (default: "openai") | +| connection | Optional[MCPConnection] | No | MCP connection object | +| *args | Any | No | Additional positional arguments | +| **kwargs | Any | No | Additional keyword arguments | + +#### Returns + +- `List[Dict[str, Any]]`: List of available MCP tools in OpenAI format + +#### Raises + +- `MCPValidationError`: If server_path is invalid + +- `MCPConnectionError`: If connection to server fails + +- `MCPExecutionError`: If event loop management fails + +#### Example + +```python +from swarms.tools.mcp_client_call import get_mcp_tools_sync +from swarms.tools.mcp_connection import MCPConnection + +# Using server path +tools = get_mcp_tools_sync(server_path="http://localhost:8000") + +# Using connection object +connection = MCPConnection( + host="localhost", + port=8000, + headers={"Authorization": "Bearer token"} +) +tools = get_mcp_tools_sync(connection=connection) + +print(f"Found {len(tools)} tools") +``` + +### get_tools_for_multiple_mcp_servers + +Get tools for multiple MCP servers concurrently using ThreadPoolExecutor. + +#### Parameters + +| Parameter | Type | Required | Description | +|-----------|------|----------|-------------| +| urls | List[str] | Yes | List of server URLs to fetch tools from | +| connections | List[MCPConnection] | No | Optional list of MCPConnection objects | +| format | str | No | Format to return tools in (default: "openai") | +| output_type | Literal["json", "dict", "str"] | No | Type of output format (default: "str") | +| max_workers | Optional[int] | No | Maximum number of worker threads | + +#### Returns + +- `List[Dict[str, Any]]`: Combined list of tools from all servers + +#### Raises + +- `MCPExecutionError`: If fetching tools from any server fails + +#### Example + +```python +from swarms.tools.mcp_client_call import get_tools_for_multiple_mcp_servers +from swarms.tools.mcp_connection import MCPConnection + +# Define server URLs +urls = [ + "http://server1:8000", + "http://server2:8000" +] + +# Optional: Define connections +connections = [ + MCPConnection(host="server1", port=8000), + MCPConnection(host="server2", port=8000) +] + +# Get tools from all servers +tools = get_tools_for_multiple_mcp_servers( + urls=urls, + connections=connections, + format="openai", + output_type="dict", + max_workers=4 +) + +print(f"Found {len(tools)} tools across all servers") +``` + +### execute_tool_call_simple + +Execute a tool call using the MCP client. + +#### Parameters + +| Parameter | Type | Required | Description | +|-----------|------|----------|-------------| +| response | Any | No | Tool call response object | +| server_path | str | No | Path to the MCP server | +| connection | Optional[MCPConnection] | No | MCP connection object | +| output_type | Literal["json", "dict", "str", "formatted"] | No | Type of output format (default: "str") | +| *args | Any | No | Additional positional arguments | +| **kwargs | Any | No | Additional keyword arguments | + +#### Returns + +- `List[Dict[str, Any]]`: Result of the tool execution + +#### Raises + +- `MCPConnectionError`: If connection to server fails + +- `MCPExecutionError`: If tool execution fails + +#### Example +```python +import asyncio +from swarms.tools.mcp_client_call import execute_tool_call_simple +from swarms.tools.mcp_connection import MCPConnection + +async def main(): + # Example tool call response + response = { + "name": "example_tool", + "parameters": {"param1": "value1"} + } + + # Using server path + result = await execute_tool_call_simple( + response=response, + server_path="http://localhost:8000", + output_type="json" + ) + + # Using connection object + connection = MCPConnection( + host="localhost", + port=8000, + headers={"Authorization": "Bearer token"} + ) + result = await execute_tool_call_simple( + response=response, + connection=connection, + output_type="dict" + ) + + print(f"Tool execution result: {result}") + +if __name__ == "__main__": + asyncio.run(main()) +``` + +## Error Handling + +The MCP client functions use a retry mechanism with exponential backoff for failed requests. The following error types may be raised: + +- `MCPValidationError`: Raised when input validation fails + +- `MCPConnectionError`: Raised when connection to the MCP server fails + +- `MCPExecutionError`: Raised when tool execution fails + +## Best Practices + +1. Always handle potential exceptions when using these functions +2. Use connection objects for authenticated requests +3. Consider using the async versions for better performance in async applications +4. Use appropriate output types based on your needs +5. When working with multiple servers, adjust max_workers based on your system's capabilities + diff --git a/docs/swarms_cloud/swarms_api.md b/docs/swarms_cloud/swarms_api.md index 270a6088..40124d94 100644 --- a/docs/swarms_cloud/swarms_api.md +++ b/docs/swarms_cloud/swarms_api.md @@ -1,8 +1,9 @@ # Swarms API Documentation -*Enterprise-grade Agent Swarm Management API* +*Enterprise-Grade Agent Swarm Management API* **Base URL**: `https://api.swarms.world` or `https://swarms-api-285321057562.us-east1.run.app` + **API Key Management**: [https://swarms.world/platform/api-keys](https://swarms.world/platform/api-keys) ## Overview diff --git a/examples/redis_conversation.py b/examples/communication_examples/redis_conversation.py similarity index 100% rename from examples/redis_conversation.py rename to examples/communication_examples/redis_conversation.py diff --git a/examples/single_agent/insurance_agent.py b/examples/single_agent/demos/insurance_agent.py similarity index 100% rename from examples/single_agent/insurance_agent.py rename to examples/single_agent/demos/insurance_agent.py diff --git a/examples/single_agent/persistent_legal_agent.py b/examples/single_agent/demos/persistent_legal_agent.py similarity index 100% rename from examples/single_agent/persistent_legal_agent.py rename to examples/single_agent/demos/persistent_legal_agent.py diff --git a/examples/single_agent/example_async_vs_multithread.py b/examples/single_agent/tools/example_async_vs_multithread.py similarity index 100% rename from examples/single_agent/example_async_vs_multithread.py rename to examples/single_agent/tools/example_async_vs_multithread.py diff --git a/examples/single_agent/tools/mcp_example/mcp_utils.py b/examples/single_agent/tools/mcp_example/mcp_utils.py deleted file mode 100644 index b7085cc3..00000000 --- a/examples/single_agent/tools/mcp_example/mcp_utils.py +++ /dev/null @@ -1,10 +0,0 @@ -from swarms.tools.mcp_client import ( - list_tools_for_multiple_urls, -) - - -print( - list_tools_for_multiple_urls( - ["http://0.0.0.0:8000/sse"], output_type="json" - ) -) diff --git a/examples/single_agent/tools/mcp_example/test_execute.py b/examples/single_agent/tools/mcp_example/test_execute.py deleted file mode 100644 index ed9122a9..00000000 --- a/examples/single_agent/tools/mcp_example/test_execute.py +++ /dev/null @@ -1,8 +0,0 @@ -from swarms.tools.mcp_client import execute_mcp_tool - -print( - execute_mcp_tool( - "http://0.0.0.0:8000/sse", - parameters={"name": "multiply", "a": 1, "b": 2}, - ) -) diff --git a/examples/tools/omni_modal_agent.py b/examples/single_agent/tools/omni_modal_agent.py similarity index 100% rename from examples/tools/omni_modal_agent.py rename to examples/single_agent/tools/omni_modal_agent.py diff --git a/examples/single_agent/tools/structured_outputs/structured_outputs_example.py b/examples/single_agent/tools/structured_outputs/structured_outputs_example.py index d7f2aa03..cbc5d8cb 100644 --- a/examples/single_agent/tools/structured_outputs/structured_outputs_example.py +++ b/examples/single_agent/tools/structured_outputs/structured_outputs_example.py @@ -46,6 +46,9 @@ agent = Agent( tools_list_dictionary=tools, ) -agent.run( +out = agent.run( "What is the current stock price for Apple Inc. (AAPL)? Include historical price data.", ) + +print(out) +print(type(out)) diff --git a/examples/tools/swarms_of_browser_agents.py b/examples/single_agent/tools/swarms_of_browser_agents.py similarity index 100% rename from examples/tools/swarms_of_browser_agents.py rename to examples/single_agent/tools/swarms_of_browser_agents.py diff --git a/examples/tools/together_deepseek_agent.py b/examples/single_agent/tools/together_deepseek_agent.py similarity index 100% rename from examples/tools/together_deepseek_agent.py rename to examples/single_agent/tools/together_deepseek_agent.py diff --git a/examples/single_agent/async_agent.py b/examples/single_agent/utils/async_agent.py similarity index 100% rename from examples/single_agent/async_agent.py rename to examples/single_agent/utils/async_agent.py diff --git a/examples/single_agent/markdown_agent.py b/examples/single_agent/utils/markdown_agent.py similarity index 100% rename from examples/single_agent/markdown_agent.py rename to examples/single_agent/utils/markdown_agent.py diff --git a/examples/xml_output_example.py b/examples/single_agent/utils/xml_output_example.py similarity index 100% rename from examples/xml_output_example.py rename to examples/single_agent/utils/xml_output_example.py diff --git a/find_tools_on_mcp.py b/find_tools_on_mcp.py deleted file mode 100644 index a185d803..00000000 --- a/find_tools_on_mcp.py +++ /dev/null @@ -1,53 +0,0 @@ -import asyncio -from swarms.tools.mcp_client_call import ( - aget_mcp_tools, - execute_tool_call, -) -import json - - -async def main(): - tools = await aget_mcp_tools("http://0.0.0.0:8000/sse", "openai") - print(json.dumps(tools, indent=4)) - - # First create the markdown file - create_result = await execute_tool_call( - server_path="http://0.0.0.0:8000/sse", - messages=[ - { - "role": "user", - "content": "Create a new markdown file called 'chicken_cat_story'", - } - ], - ) - print("File creation result:", create_result) - - # Then write the story to the file - story_content = """Title: The Adventures of Clucky and Whiskers - -Once upon a time in a quiet, sunlit farm, there lived a curious chicken named Clucky and a mischievous cat named Whiskers. Clucky was known for her vibrant spirit and insatiable curiosity, roaming the farmyard with her head held high. Whiskers, on the other hand, was a clever little feline who always found himself in amusing predicaments; he often ventured into adventures that few dared to imagine. - -The unlikely duo first met one fine autumn morning when Whiskers was chasing a playful butterfly near the barn. Clucky, busy pecking at the ground, almost tripped over Whiskers. Apologizing in her gentle clucks, she noticed that Whiskers was not scared at all—instead, he greeted her with a friendly purr. From that day on, the two embarked on countless adventures, exploring every corner of the farm and beyond. - -They would roam the rolling meadows, share stories under the starry night sky, and even work together to solve little mysteries that baffled the other animals. Whether it was searching for a hidden pile of treats or finding safe paths through the woods, Clucky and Whiskers proved that friendship can be found in the most unexpected places. - -The other animals on the farm watched in amazement as the chicken and the cat not only complemented each other but also became the best of friends. Clucky's boldness and Whiskers' cunning were a perfect match, teaching everyone that differences can create the strongest bonds. - -In the heartwarming adventures of Clucky and Whiskers, one could learn that true friendship breaks all barriers, be they of fur or feathers. The legend of the brave chicken and the clever cat lived on forever, reminding everyone on the farm that unity makes life more colorful and joyful. - -The End.""" - - story_result = await execute_tool_call( - server_path="http://0.0.0.0:8000/sse", - messages=[ - { - "role": "user", - "content": f"Write this story to the file 'chicken_cat_story.md': {story_content}", - } - ], - ) - print("Story writing result:", story_result) - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/litellm_parallel_function_call_test.py b/litellm_parallel_function_call_test.py new file mode 100644 index 00000000..56f8fc34 --- /dev/null +++ b/litellm_parallel_function_call_test.py @@ -0,0 +1,62 @@ +import os +import litellm +import json +from dotenv import load_dotenv + +load_dotenv() + + +# Example dummy function hard coded to return the same weather +# In production, this could be your backend API or an external API +def get_current_weather(location, unit="fahrenheit"): + """Get the current weather in a given location""" + if "tokyo" in location.lower(): + return json.dumps({"location": "Tokyo", "temperature": "10", "unit": "celsius"}) + elif "san francisco" in location.lower(): + return json.dumps({"location": "San Francisco", "temperature": "72", "unit": "fahrenheit"}) + elif "paris" in location.lower(): + return json.dumps({"location": "Paris", "temperature": "22", "unit": "celsius"}) + else: + return json.dumps({"location": location, "temperature": "unknown"}) + + +def test_parallel_function_call(): + try: + # Step 1: send the conversation and available functions to the model + messages = [{"role": "user", "content": "What's the weather like in San Francisco, Tokyo, and Paris?"}] + tools = [ + { + "type": "function", + "function": { + "name": "get_current_weather", + "description": "Get the current weather in a given location", + "parameters": { + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "The city and state, e.g. San Francisco, CA", + }, + "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}, + }, + "required": ["location"], + }, + }, + } + ] + response = litellm.completion( + model="gpt-4o-mini", + messages=messages, + tools=tools, + tool_choice="auto", # auto is default, but we'll be explicit + api_key=os.getenv("OPENAI_API_KEY"), + ) + # print("\nFirst LLM Response:\n", response) + # response_message = response.choices[0].message + # tool_calls = response_message.tool_calls + # print(tool_calls) + print(response.model_dump_json(indent=4)) + except Exception as e: + print(f"Error occurred: {e}") + +test_parallel_function_call() \ No newline at end of file diff --git a/examples/single_agent/tools/mcp_example/agent_mcp_test.py b/mcp_examples/agent_use/agent_mcp.py similarity index 54% rename from examples/single_agent/tools/mcp_example/agent_mcp_test.py rename to mcp_examples/agent_use/agent_mcp.py index 86c19a25..6307790c 100644 --- a/examples/single_agent/tools/mcp_example/agent_mcp_test.py +++ b/mcp_examples/agent_use/agent_mcp.py @@ -2,12 +2,7 @@ from swarms import Agent from swarms.prompts.finance_agent_sys_prompt import ( FINANCIAL_AGENT_SYS_PROMPT, ) -from swarms.tools.mcp_integration import MCPServerSseParams -server_one = MCPServerSseParams( - url="http://127.0.0.1:6274", - headers={"Content-Type": "application/json"}, -) # Initialize the agent agent = Agent( @@ -15,10 +10,13 @@ agent = Agent( agent_description="Personal finance advisor agent", system_prompt=FINANCIAL_AGENT_SYS_PROMPT, max_loops=1, - mcp_servers=[server_one], - output_type="final", + mcp_url="http://0.0.0.0:8000/sse", ) -out = agent.run("Use the add tool to add 2 and 2") +# Create a markdown file with initial content +out = agent.run( + "Use any of the tools available to you", +) +print(out) print(type(out)) diff --git a/examples/single_agent/tools/mcp_example/agent_tools_dict_example.py b/mcp_examples/agent_use/agent_tools_dict_example.py similarity index 100% rename from examples/single_agent/tools/mcp_example/agent_tools_dict_example.py rename to mcp_examples/agent_use/agent_tools_dict_example.py diff --git a/examples/tools/mcp_exampler.py b/mcp_examples/agent_use/mcp_exampler.py similarity index 100% rename from examples/tools/mcp_exampler.py rename to mcp_examples/agent_use/mcp_exampler.py diff --git a/mcp_examples/servers/okx_crypto_server.py b/mcp_examples/servers/okx_crypto_server.py new file mode 100644 index 00000000..6cbd7ad9 --- /dev/null +++ b/mcp_examples/servers/okx_crypto_server.py @@ -0,0 +1,117 @@ +from mcp.server.fastmcp import FastMCP +import requests + +mcp = FastMCP("okx_crypto_server", str) + + +@mcp.tool( + name="get_okx_crypto_price", + description="Get the current price and basic information for a given cryptocurrency from OKX exchange.", +) +def get_okx_crypto_price(symbol: str) -> str: + """ + Get the current price and basic information for a given cryptocurrency using OKX API. + + Args: + symbol (str): The cryptocurrency trading pair (e.g., 'BTC-USDT', 'ETH-USDT') + + Returns: + str: A formatted string containing the cryptocurrency information + + Example: + >>> get_okx_crypto_price('BTC-USDT') + 'Current price of BTC/USDT: $45,000' + """ + try: + if not symbol: + return "Please provide a valid trading pair (e.g., 'BTC-USDT')" + + # Convert to uppercase and ensure proper format + symbol = symbol.upper() + if not symbol.endswith("-USDT"): + symbol = f"{symbol}-USDT" + + # OKX API endpoint for ticker information + url = f"https://www.okx.com/api/v5/market/ticker?instId={symbol}" + + # Make the API request + response = requests.get(url) + response.raise_for_status() + + data = response.json() + + if data.get("code") != "0": + return f"Error: {data.get('msg', 'Unknown error')}" + + ticker_data = data.get("data", [{}])[0] + if not ticker_data: + return f"Could not find data for {symbol}. Please check the trading pair." + + price = float(ticker_data.get("last", 0)) + change_24h = float(ticker_data.get("last24h", 0)) + change_percent = float(ticker_data.get("change24h", 0)) + + base_currency = symbol.split("-")[0] + return f"Current price of {base_currency}/USDT: ${price:,.2f}\n24h Change: {change_percent:.2f}%" + + except requests.exceptions.RequestException as e: + return f"Error fetching OKX data: {str(e)}" + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="get_okx_crypto_volume", + description="Get the 24-hour trading volume for a given cryptocurrency from OKX exchange.", +) +def get_okx_crypto_volume(symbol: str) -> str: + """ + Get the 24-hour trading volume for a given cryptocurrency using OKX API. + + Args: + symbol (str): The cryptocurrency trading pair (e.g., 'BTC-USDT', 'ETH-USDT') + + Returns: + str: A formatted string containing the trading volume information + + Example: + >>> get_okx_crypto_volume('BTC-USDT') + '24h Trading Volume for BTC/USDT: $1,234,567' + """ + try: + if not symbol: + return "Please provide a valid trading pair (e.g., 'BTC-USDT')" + + # Convert to uppercase and ensure proper format + symbol = symbol.upper() + if not symbol.endswith("-USDT"): + symbol = f"{symbol}-USDT" + + # OKX API endpoint for ticker information + url = f"https://www.okx.com/api/v5/market/ticker?instId={symbol}" + + # Make the API request + response = requests.get(url) + response.raise_for_status() + + data = response.json() + + if data.get("code") != "0": + return f"Error: {data.get('msg', 'Unknown error')}" + + ticker_data = data.get("data", [{}])[0] + if not ticker_data: + return f"Could not find data for {symbol}. Please check the trading pair." + + volume_24h = float(ticker_data.get("vol24h", 0)) + base_currency = symbol.split("-")[0] + return f"24h Trading Volume for {base_currency}/USDT: ${volume_24h:,.2f}" + + except requests.exceptions.RequestException as e: + return f"Error fetching OKX data: {str(e)}" + except Exception as e: + return f"Error: {str(e)}" + + +if __name__ == "__main__": + mcp.run(transport="sse") diff --git a/mcp_examples/utils/find_tools_on_mcp.py b/mcp_examples/utils/find_tools_on_mcp.py new file mode 100644 index 00000000..bc2b5a70 --- /dev/null +++ b/mcp_examples/utils/find_tools_on_mcp.py @@ -0,0 +1,20 @@ +from swarms.tools.mcp_client_call import ( + get_mcp_tools_sync, +) +from swarms.schemas.mcp_schemas import MCPConnection +import json + + +if __name__ == "__main__": + tools = get_mcp_tools_sync( + server_path="http://0.0.0.0:8000/sse", + format="openai", + connection=MCPConnection( + url="http://0.0.0.0:8000/sse", + headers={"Authorization": "Bearer 1234567890"}, + timeout=10, + ), + ) + print(json.dumps(tools, indent=4)) + + print(type(tools)) diff --git a/mcp_examples/utils/mcp_execute_example.py b/mcp_examples/utils/mcp_execute_example.py new file mode 100644 index 00000000..99f34826 --- /dev/null +++ b/mcp_examples/utils/mcp_execute_example.py @@ -0,0 +1,33 @@ +from swarms.schemas.mcp_schemas import MCPConnection +from swarms.tools.mcp_client_call import ( + execute_tool_call_simple, +) +import asyncio + +# Example 1: Create a new markdown file +response = { + "function": { + "name": "get_crypto_price", + "arguments": {"coin_id": "bitcoin"}, + } +} + +connection = MCPConnection( + url="http://0.0.0.0:8000/sse", + headers={"Authorization": "Bearer 1234567890"}, + timeout=10, +) + +url = "http://0.0.0.0:8000/sse" + +if __name__ == "__main__": + tools = asyncio.run( + execute_tool_call_simple( + response=response, + connection=connection, + output_type="json", + # server_path=url, + ) + ) + + print(tools) diff --git a/mcp_examples/utils/mcp_load_tools_example.py b/mcp_examples/utils/mcp_load_tools_example.py new file mode 100644 index 00000000..6f1049cf --- /dev/null +++ b/mcp_examples/utils/mcp_load_tools_example.py @@ -0,0 +1,18 @@ +import json + +from swarms.schemas.mcp_schemas import MCPConnection +from swarms.tools.mcp_client_call import ( + get_mcp_tools_sync, +) + +if __name__ == "__main__": + tools = get_mcp_tools_sync( + server_path="http://0.0.0.0:8000/sse", + format="openai", + connection=MCPConnection( + url="http://0.0.0.0:8000/sse", + headers={"Authorization": "Bearer 1234567890"}, + timeout=10, + ), + ) + print(json.dumps(tools, indent=4)) diff --git a/mcp_examples/utils/mcp_multiserver_tool_fetch.py b/mcp_examples/utils/mcp_multiserver_tool_fetch.py new file mode 100644 index 00000000..7cad389e --- /dev/null +++ b/mcp_examples/utils/mcp_multiserver_tool_fetch.py @@ -0,0 +1,20 @@ +from swarms.tools.mcp_client_call import ( + get_tools_for_multiple_mcp_servers, +) +from swarms.schemas.mcp_schemas import MCPConnection + + +mcp_config = MCPConnection( + url="http://0.0.0.0:8000/sse", + # headers={"Authorization": "Bearer 1234567890"}, + timeout=5, +) + +urls = ["http://0.0.0.0:8000/sse", "http://0.0.0.0:8001/sse"] + +out = get_tools_for_multiple_mcp_servers( + urls=urls, + # connections=[mcp_config], +) + +print(out) diff --git a/mcp_test.py b/mcp_test.py index 6d1f93bc..8f6ec37b 100644 --- a/mcp_test.py +++ b/mcp_test.py @@ -1,90 +1,115 @@ -# stock_price_server.py +# crypto_price_server.py from mcp.server.fastmcp import FastMCP -import os -from datetime import datetime +import requests -mcp = FastMCP("StockPrice") +mcp = FastMCP("CryptoPrice") -@mcp.tool() -def create_markdown_file(filename: str) -> str: +@mcp.tool( + name="get_crypto_price", + description="Get the current price and basic information for a given cryptocurrency.", +) +def get_crypto_price(coin_id: str) -> str: """ - Create a new markdown file with a basic structure. + Get the current price and basic information for a given cryptocurrency using CoinGecko API. Args: - filename (str): The name of the markdown file to create (without .md extension) + coin_id (str): The cryptocurrency ID (e.g., 'bitcoin', 'ethereum') Returns: - str: A message indicating success or failure + str: A formatted string containing the cryptocurrency information Example: - >>> create_markdown_file('my_notes') - 'Created markdown file: my_notes.md' + >>> get_crypto_price('bitcoin') + 'Current price of Bitcoin: $45,000' """ try: - if not filename: - return "Please provide a valid filename" + if not coin_id: + return "Please provide a valid cryptocurrency ID" - # Ensure filename ends with .md - if not filename.endswith(".md"): - filename = f"{filename}.md" + # CoinGecko API endpoint + url = f"https://api.coingecko.com/api/v3/simple/price?ids={coin_id}&vs_currencies=usd&include_24hr_change=true" - # Create basic markdown structure - content = f"""# {filename.replace('.md', '')} -Created on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} + # Make the API request + response = requests.get(url) + response.raise_for_status() # Raise an exception for bad status codes -## Content + data = response.json() -""" + if coin_id not in data: + return f"Could not find data for {coin_id}. Please check the cryptocurrency ID." - with open(filename, "w") as f: - f.write(content) + price = data[coin_id]["usd"] + change_24h = data[coin_id].get("usd_24h_change", "N/A") - return f"Created markdown file: {filename}" + return f"Current price of {coin_id.capitalize()}: ${price:,.2f}\n24h Change: {change_24h:.2f}%" + except requests.exceptions.RequestException as e: + return f"Error fetching crypto data: {str(e)}" except Exception as e: - return f"Error creating markdown file: {str(e)}" + return f"Error: {str(e)}" -@mcp.tool() -def write_to_markdown(filename: str, content: str) -> str: +@mcp.tool( + name="get_htx_crypto_price", + description="Get the current price and basic information for a given cryptocurrency from HTX exchange.", +) +def get_htx_crypto_price(symbol: str) -> str: """ - Append content to an existing markdown file. + Get the current price and basic information for a given cryptocurrency using HTX API. Args: - filename (str): The name of the markdown file (without .md extension) - content (str): The content to append to the file + symbol (str): The cryptocurrency trading pair (e.g., 'btcusdt', 'ethusdt') Returns: - str: A message indicating success or failure + str: A formatted string containing the cryptocurrency information Example: - >>> write_to_markdown('my_notes', 'This is a new note') - 'Content added to my_notes.md' + >>> get_htx_crypto_price('btcusdt') + 'Current price of BTC/USDT: $45,000' """ try: - if not filename or not content: - return "Please provide both filename and content" + if not symbol: + return "Please provide a valid trading pair (e.g., 'btcusdt')" - # Ensure filename ends with .md - if not filename.endswith(".md"): - filename = f"{filename}.md" + # Convert to lowercase and ensure proper format + symbol = symbol.lower() + if not symbol.endswith("usdt"): + symbol = f"{symbol}usdt" - # Check if file exists - if not os.path.exists(filename): - return f"File {filename} does not exist. Please create it first using create_markdown_file" + # HTX API endpoint + url = f"https://api.htx.com/market/detail/merged?symbol={symbol}" - # Append content with timestamp - timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - formatted_content = f"\n### Entry - {timestamp}\n{content}\n" + # Make the API request + response = requests.get(url) + response.raise_for_status() - with open(filename, "a") as f: - f.write(formatted_content) + data = response.json() - return f"Content added to {filename}" + if data.get("status") != "ok": + return f"Error: {data.get('err-msg', 'Unknown error')}" + tick = data.get("tick", {}) + if not tick: + return f"Could not find data for {symbol}. Please check the trading pair." + + price = tick.get("close", 0) + change_24h = tick.get("close", 0) - tick.get("open", 0) + change_percent = ( + (change_24h / tick.get("open", 1)) * 100 + if tick.get("open") + else 0 + ) + + base_currency = symbol[ + :-4 + ].upper() # Remove 'usdt' and convert to uppercase + return f"Current price of {base_currency}/USDT: ${price:,.2f}\n24h Change: {change_percent:.2f}%" + + except requests.exceptions.RequestException as e: + return f"Error fetching HTX data: {str(e)}" except Exception as e: - return f"Error writing to markdown file: {str(e)}" + return f"Error: {str(e)}" if __name__ == "__main__": diff --git a/okx_crypto_server.py b/okx_crypto_server.py new file mode 100644 index 00000000..90b195f2 --- /dev/null +++ b/okx_crypto_server.py @@ -0,0 +1,119 @@ +from mcp.server.fastmcp import FastMCP +import requests + +mcp = FastMCP("OKXCryptoPrice") + +mcp.settings.port = 8001 + +@mcp.tool( + name="get_okx_crypto_price", + description="Get the current price and basic information for a given cryptocurrency from OKX exchange.", +) +def get_okx_crypto_price(symbol: str) -> str: + """ + Get the current price and basic information for a given cryptocurrency using OKX API. + + Args: + symbol (str): The cryptocurrency trading pair (e.g., 'BTC-USDT', 'ETH-USDT') + + Returns: + str: A formatted string containing the cryptocurrency information + + Example: + >>> get_okx_crypto_price('BTC-USDT') + 'Current price of BTC/USDT: $45,000' + """ + try: + if not symbol: + return "Please provide a valid trading pair (e.g., 'BTC-USDT')" + + # Convert to uppercase and ensure proper format + symbol = symbol.upper() + if not symbol.endswith("-USDT"): + symbol = f"{symbol}-USDT" + + # OKX API endpoint for ticker information + url = f"https://www.okx.com/api/v5/market/ticker?instId={symbol}" + + # Make the API request + response = requests.get(url) + response.raise_for_status() + + data = response.json() + + if data.get("code") != "0": + return f"Error: {data.get('msg', 'Unknown error')}" + + ticker_data = data.get("data", [{}])[0] + if not ticker_data: + return f"Could not find data for {symbol}. Please check the trading pair." + + price = float(ticker_data.get("last", 0)) + change_24h = float(ticker_data.get("last24h", 0)) + change_percent = float(ticker_data.get("change24h", 0)) + + base_currency = symbol.split("-")[0] + return f"Current price of {base_currency}/USDT: ${price:,.2f}\n24h Change: {change_percent:.2f}%" + + except requests.exceptions.RequestException as e: + return f"Error fetching OKX data: {str(e)}" + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="get_okx_crypto_volume", + description="Get the 24-hour trading volume for a given cryptocurrency from OKX exchange.", +) +def get_okx_crypto_volume(symbol: str) -> str: + """ + Get the 24-hour trading volume for a given cryptocurrency using OKX API. + + Args: + symbol (str): The cryptocurrency trading pair (e.g., 'BTC-USDT', 'ETH-USDT') + + Returns: + str: A formatted string containing the trading volume information + + Example: + >>> get_okx_crypto_volume('BTC-USDT') + '24h Trading Volume for BTC/USDT: $1,234,567' + """ + try: + if not symbol: + return "Please provide a valid trading pair (e.g., 'BTC-USDT')" + + # Convert to uppercase and ensure proper format + symbol = symbol.upper() + if not symbol.endswith("-USDT"): + symbol = f"{symbol}-USDT" + + # OKX API endpoint for ticker information + url = f"https://www.okx.com/api/v5/market/ticker?instId={symbol}" + + # Make the API request + response = requests.get(url) + response.raise_for_status() + + data = response.json() + + if data.get("code") != "0": + return f"Error: {data.get('msg', 'Unknown error')}" + + ticker_data = data.get("data", [{}])[0] + if not ticker_data: + return f"Could not find data for {symbol}. Please check the trading pair." + + volume_24h = float(ticker_data.get("vol24h", 0)) + base_currency = symbol.split("-")[0] + return f"24h Trading Volume for {base_currency}/USDT: ${volume_24h:,.2f}" + + except requests.exceptions.RequestException as e: + return f"Error fetching OKX data: {str(e)}" + except Exception as e: + return f"Error: {str(e)}" + + +if __name__ == "__main__": + # Run the server on port 8000 (you can change this to any available port) + mcp.run(transport="sse") \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 0f40e39a..c9d4d8b0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -78,7 +78,6 @@ litellm = "*" torch = "*" httpx = "*" mcp = "*" -fastmcp = "*" aiohttp = "*" [tool.poetry.scripts] diff --git a/requirements.txt b/requirements.txt index 529bce3b..efe21135 100644 --- a/requirements.txt +++ b/requirements.txt @@ -25,4 +25,4 @@ httpx # vllm>=0.2.0 aiohttp mcp -fastmcp +fastm \ No newline at end of file diff --git a/swarms/schemas/__init__.py b/swarms/schemas/__init__.py index e4b33b8c..7eb2ff5d 100644 --- a/swarms/schemas/__init__.py +++ b/swarms/schemas/__init__.py @@ -1,7 +1,12 @@ from swarms.schemas.agent_step_schemas import Step, ManySteps - +from swarms.schemas.mcp_schemas import ( + MCPConnection, + MultipleMCPConnections, +) __all__ = [ "Step", "ManySteps", + "MCPConnection", + "MultipleMCPConnections", ] diff --git a/swarms/schemas/agent_mcp_errors.py b/swarms/schemas/agent_mcp_errors.py new file mode 100644 index 00000000..e48fe23b --- /dev/null +++ b/swarms/schemas/agent_mcp_errors.py @@ -0,0 +1,18 @@ +class AgentMCPError(Exception): + pass + + +class AgentMCPConnectionError(AgentMCPError): + pass + + +class AgentMCPToolError(AgentMCPError): + pass + + +class AgentMCPToolNotFoundError(AgentMCPError): + pass + + +class AgentMCPToolInvalidError(AgentMCPError): + pass diff --git a/swarms/schemas/agent_tool_schema.py b/swarms/schemas/agent_tool_schema.py new file mode 100644 index 00000000..1baa1343 --- /dev/null +++ b/swarms/schemas/agent_tool_schema.py @@ -0,0 +1,14 @@ +from pydantic import BaseModel, Field +from typing import List, Dict, Any, Optional, Callable +from swarms.schemas.mcp_schemas import MCPConnection + +class AgentToolTypes(BaseModel): + tool_schema: List[Dict[str, Any]] + mcp_connection: MCPConnection + tool_model: Optional[BaseModel] + tool_functions: Optional[List[Callable]] + + class Config: + arbitrary_types_allowed = True + + \ No newline at end of file diff --git a/swarms/schemas/llm_agent_schema.py b/swarms/schemas/llm_agent_schema.py new file mode 100644 index 00000000..8723bba3 --- /dev/null +++ b/swarms/schemas/llm_agent_schema.py @@ -0,0 +1,94 @@ +from pydantic import BaseModel, Field +from typing import List, Optional, Union, Any, Literal, Type +from litellm.types import ( + ChatCompletionModality, + ChatCompletionPredictionContentParam, + ChatCompletionAudioParam, +) + +class LLMCompletionRequest(BaseModel): + """Schema for LLM completion request parameters.""" + + model: Optional[str] = Field( + default=None, + description="The name of the language model to use for text completion" + ) + temperature: Optional[float] = Field( + default=0.5, + description="Controls randomness of the output (0.0 to 1.0)" + ) + top_p: Optional[float] = Field( + default=None, + description="Controls diversity via nucleus sampling" + ) + n: Optional[int] = Field( + default=None, + description="Number of completions to generate" + ) + stream: Optional[bool] = Field( + default=None, + description="Whether to stream the response" + ) + stream_options: Optional[dict] = Field( + default=None, + description="Options for streaming response" + ) + stop: Optional[Any] = Field( + default=None, + description="Up to 4 sequences where the API will stop generating" + ) + max_completion_tokens: Optional[int] = Field( + default=None, + description="Maximum tokens for completion including reasoning" + ) + max_tokens: Optional[int] = Field( + default=None, + description="Maximum tokens in generated completion" + ) + prediction: Optional[ChatCompletionPredictionContentParam] = Field( + default=None, + description="Configuration for predicted output" + ) + presence_penalty: Optional[float] = Field( + default=None, + description="Penalizes new tokens based on existence in text" + ) + frequency_penalty: Optional[float] = Field( + default=None, + description="Penalizes new tokens based on frequency in text" + ) + logit_bias: Optional[dict] = Field( + default=None, + description="Modifies probability of specific tokens" + ) + reasoning_effort: Optional[Literal["low", "medium", "high"]] = Field( + default=None, + description="Level of reasoning effort for the model" + ) + seed: Optional[int] = Field( + default=None, + description="Random seed for reproducibility" + ) + tools: Optional[List] = Field( + default=None, + description="List of tools available to the model" + ) + tool_choice: Optional[Union[str, dict]] = Field( + default=None, + description="Choice of tool to use" + ) + logprobs: Optional[bool] = Field( + default=None, + description="Whether to return log probabilities" + ) + top_logprobs: Optional[int] = Field( + default=None, + description="Number of most likely tokens to return" + ) + parallel_tool_calls: Optional[bool] = Field( + default=None, + description="Whether to allow parallel tool calls" + ) + + class Config: + allow_arbitrary_types = True \ No newline at end of file diff --git a/swarms/schemas/mcp_schemas.py b/swarms/schemas/mcp_schemas.py new file mode 100644 index 00000000..196ebd24 --- /dev/null +++ b/swarms/schemas/mcp_schemas.py @@ -0,0 +1,43 @@ +from pydantic import BaseModel, Field +from typing import Dict, List, Any, Optional + + +class MCPConnection(BaseModel): + type: Optional[str] = Field( + default="mcp", + description="The type of connection, defaults to 'mcp'", + ) + url: Optional[str] = Field( + default="localhost:8000/sse", + description="The URL endpoint for the MCP server", + ) + tool_configurations: Optional[Dict[Any, Any]] = Field( + default=None, + description="Dictionary containing configuration settings for MCP tools", + ) + authorization_token: Optional[str] = Field( + default=None, + description="Authentication token for accessing the MCP server", + ) + transport: Optional[str] = Field( + default="sse", + description="The transport protocol to use for the MCP server", + ) + headers: Optional[Dict[str, str]] = Field( + default=None, description="Headers to send to the MCP server" + ) + timeout: Optional[int] = Field( + default=5, description="Timeout for the MCP server" + ) + + class Config: + arbitrary_types_allowed = True + + +class MultipleMCPConnections(BaseModel): + connections: List[MCPConnection] = Field( + default=[], description="List of MCP connections" + ) + + class Config: + arbitrary_types_allowed = True diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 26488db8..e6888191 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -31,6 +31,10 @@ from swarms.prompts.multi_modal_autonomous_instruction_prompt import ( MULTI_MODAL_AUTO_AGENT_SYSTEM_PROMPT_1, ) from swarms.prompts.tools import tool_sop_prompt +from swarms.schemas.agent_mcp_errors import ( + AgentMCPConnectionError, + AgentMCPToolError, +) from swarms.schemas.agent_step_schemas import ManySteps, Step from swarms.schemas.base_schemas import ( AgentChatCompletionResponse, @@ -46,13 +50,6 @@ from swarms.structs.safe_loading import ( ) from swarms.telemetry.main import log_agent_data from swarms.tools.base_tool import BaseTool -from swarms.tools.mcp_client import ( - execute_mcp_tool, - find_and_execute_tool, - list_all, - list_tools_for_multiple_urls, -) -from swarms.tools.mcp_integration import MCPServerSseParams from swarms.tools.tool_parse_exec import parse_and_execute_json from swarms.utils.any_to_str import any_to_str from swarms.utils.data_to_text import data_to_text @@ -64,11 +61,18 @@ from swarms.utils.history_output_formatter import ( from swarms.utils.litellm_tokenizer import count_tokens from swarms.utils.litellm_wrapper import LiteLLM from swarms.utils.pdf_to_text import pdf_to_text -from swarms.utils.str_to_dict import str_to_dict from swarms.prompts.react_base_prompt import REACT_SYS_PROMPT from swarms.prompts.max_loop_prompt import generate_reasoning_prompt from swarms.prompts.safety_prompt import SAFETY_PROMPT from swarms.structs.ma_utils import set_random_models_for_agents +from swarms.tools.mcp_client_call import ( + execute_tool_call_simple, + get_mcp_tools_sync, +) +from swarms.schemas.mcp_schemas import ( + MCPConnection, +) +from swarms.utils.index import exists # Utils @@ -90,10 +94,6 @@ def agent_id(): return uuid.uuid4().hex -def exists(val): - return val is not None - - # Agent output types ToolUsageType = Union[BaseModel, Dict[str, Any]] @@ -396,12 +396,12 @@ class Agent: role: agent_roles = "worker", no_print: bool = False, tools_list_dictionary: Optional[List[Dict[str, Any]]] = None, - mcp_servers: MCPServerSseParams = None, - mcp_url: str = None, + mcp_url: Optional[Union[str, MCPConnection]] = None, mcp_urls: List[str] = None, react_on: bool = False, safety_prompt_on: bool = False, random_models_on: bool = False, + mcp_config: Optional[MCPConnection] = None, *args, **kwargs, ): @@ -418,6 +418,7 @@ class Agent: self.stopping_token = stopping_token self.interactive = interactive self.dashboard = dashboard + self.saved_state_path = saved_state_path self.return_history = return_history self.dynamic_temperature_enabled = dynamic_temperature_enabled self.dynamic_loops = dynamic_loops @@ -520,12 +521,12 @@ class Agent: self.role = role self.no_print = no_print self.tools_list_dictionary = tools_list_dictionary - self.mcp_servers = mcp_servers self.mcp_url = mcp_url self.mcp_urls = mcp_urls self.react_on = react_on self.safety_prompt_on = safety_prompt_on self.random_models_on = random_models_on + self.mcp_config = mcp_config self._cached_llm = ( None # Add this line to cache the LLM instance @@ -560,9 +561,6 @@ class Agent: if self.llm is None: self.llm = self.llm_handling() - if self.mcp_url or self.mcp_servers is not None: - self.add_mcp_tools_to_memory() - if self.react_on is True: self.system_prompt += REACT_SYS_PROMPT @@ -589,7 +587,7 @@ class Agent: prompt += SAFETY_PROMPT # Initialize the short term memory - self.short_memory = Conversation( + memory = Conversation( system_prompt=prompt, time_enabled=False, user=self.user_name, @@ -597,7 +595,7 @@ class Agent: token_count=False, ) - return self.short_memory + return memory def agent_output_model(self): # Many steps @@ -645,10 +643,16 @@ class Agent: **common_args, tools_list_dictionary=self.tools_list_dictionary, tool_choice="auto", - parallel_tool_calls=len( - self.tools_list_dictionary - ) - > 1, + parallel_tool_calls=True, + ) + + elif self.mcp_url is not None: + self._cached_llm = LiteLLM( + **common_args, + tools_list_dictionary=self.add_mcp_tools_to_memory(), + tool_choice="auto", + parallel_tool_calls=True, + mcp_call=True, ) else: self._cached_llm = LiteLLM( @@ -715,110 +719,23 @@ class Agent: Exception: If there's an error accessing the MCP tools """ try: - if self.mcp_url is not None: - tools_available = list_all( - self.mcp_url, output_type="json" - ) - self.short_memory.add( - role="Tools Available", - content=f"\n{tools_available}", - ) - - elif ( - self.mcp_url is None - and self.mcp_urls is not None - and len(self.mcp_urls) > 1 - ): - tools_available = list_tools_for_multiple_urls( - urls=self.mcp_urls, - output_type="json", - ) - - self.short_memory.add( - role="Tools Available", - content=f"\n{tools_available}", - ) - except Exception as e: - logger.error(f"Error adding MCP tools to memory: {e}") - raise e - - def _single_mcp_tool_handling(self, response: any): - """ - Handles execution of a single MCP tool. - - Args: - response (str): The tool response to process - - Raises: - Exception: If there's an error executing the tool - """ - try: - if isinstance(response, dict): - result = response - print(type(result)) + if exists(self.mcp_url): + tools = get_mcp_tools_sync(server_path=self.mcp_url) + elif exists(self.mcp_config): + tools = get_mcp_tools_sync(connection=self.mcp_config) + logger.info(f"Tools: {tools}") else: - result = str_to_dict(response) - print(type(result)) - - output = execute_mcp_tool( - url=self.mcp_url, - parameters=result, - ) - - self.short_memory.add( - role="Tool Executor", content=str(output) - ) - except Exception as e: - logger.error(f"Error in single MCP tool handling: {e}") - raise e - - def _multiple_mcp_tool_handling(self, response: any): - """ - Handles execution of multiple MCP tools. - - Args: - response (any): The tool response to process - - Raises: - Exception: If there's an error executing the tools - """ - try: - if isinstance(response, str): - response = str_to_dict(response) - - execution = find_and_execute_tool( - self.mcp_urls, - response["name"], - parameters=response, + raise AgentMCPConnectionError( + "mcp_url must be either a string URL or MCPConnection object" + ) + self.pretty_print( + f"✨ [SYSTEM] Successfully integrated {len(tools)} MCP tools into agent: {self.agent_name} | Status: ONLINE | Time: {time.strftime('%H:%M:%S')} ✨", + loop_count=0, ) - self.short_memory.add( - role="Tool Executor", content=str(execution) - ) - except Exception as e: - logger.error(f"Error in multiple MCP tool handling: {e}") - raise e - - def mcp_tool_handling(self, response: any): - """ - Main handler for MCP tool execution. - - Args: - response (any): The tool response to process - - Raises: - ValueError: If no MCP URL or MCP Servers are provided - Exception: If there's an error in tool handling - """ - try: - # if self.mcp_url is not None: - self._single_mcp_tool_handling(response) - # elif self.mcp_url is None and len(self.mcp_servers) > 1: - # self._multiple_mcp_tool_handling(response) - # else: - # raise ValueError("No MCP URL or MCP Servers provided") - except Exception as e: - logger.error(f"Error in mcp_tool_handling: {e}") + return tools + except AgentMCPConnectionError as e: + logger.error(f"Error in MCP connection: {e}") raise e def setup_config(self): @@ -1102,8 +1019,9 @@ class Agent: *response_args, **kwargs ) - # Convert to a str if the response is not a str - response = self.parse_llm_output(response) + # # Convert to a str if the response is not a str + if self.mcp_url is None: + response = self.parse_llm_output(response) self.short_memory.add( role=self.agent_name, content=response @@ -1112,17 +1030,8 @@ class Agent: # Print self.pretty_print(response, loop_count) - # Output Cleaner - self.output_cleaner_op(response) - - ####### MCP TOOL HANDLING ####### - if ( - self.mcp_servers - and self.tools_list_dictionary is not None - ): - self.mcp_tool_handling(response) - - ####### MCP TOOL HANDLING ####### + # # Output Cleaner + # self.output_cleaner_op(response) # Check and execute tools if self.tools is not None: @@ -1156,6 +1065,11 @@ class Agent: self.streaming_on, ) + if self.mcp_url is not None: + self.mcp_tool_handling( + response, loop_count + ) + self.sentiment_and_evaluator(response) success = True # Mark as successful to exit the retry loop @@ -2787,3 +2701,57 @@ class Agent: role="Output Cleaner", content=response, ) + + def mcp_tool_handling( + self, response: any, current_loop: Optional[int] = 0 + ): + try: + + if exists(self.mcp_url): + # Execute the tool call + tool_response = asyncio.run( + execute_tool_call_simple( + response=response, + server_path=self.mcp_url, + ) + ) + elif exists(self.mcp_config): + # Execute the tool call + tool_response = asyncio.run( + execute_tool_call_simple( + response=response, + connection=self.mcp_config, + ) + ) + else: + raise AgentMCPConnectionError( + "mcp_url must be either a string URL or MCPConnection object" + ) + + # Get the text content from the tool response + text_content = ( + tool_response.content[0].text + if tool_response.content + else str(tool_response) + ) + + # Add to the memory + self.short_memory.add( + role="Tool Executor", + content=text_content, + ) + + # Clear the tools list dictionary + self._cached_llm.tools_list_dictionary = None + # Now Call the LLM again with the tool response + summary = self.call_llm(task=self.short_memory.get_str()) + + self.pretty_print(summary, loop_count=current_loop) + + # Add to the memory + self.short_memory.add( + role=self.agent_name, content=summary + ) + except AgentMCPToolError as e: + logger.error(f"Error in MCP tool: {e}") + raise e diff --git a/swarms/structs/aop.py b/swarms/structs/aop.py index 79f2f5d9..42a5fd44 100644 --- a/swarms/structs/aop.py +++ b/swarms/structs/aop.py @@ -4,7 +4,9 @@ from concurrent.futures import ThreadPoolExecutor, as_completed from functools import wraps from typing import Any, Callable, Literal, Optional -from fastmcp import FastMCP, Client +from mcp.server.fastmcp import FastMCP +from mcp.client import Client + from loguru import logger from swarms.utils.any_to_str import any_to_str diff --git a/swarms/tools/__init__.py b/swarms/tools/__init__.py index 20012304..e6b8032f 100644 --- a/swarms/tools/__init__.py +++ b/swarms/tools/__init__.py @@ -27,6 +27,13 @@ from swarms.tools.cohere_func_call_schema import ( ) from swarms.tools.tool_registry import ToolStorage, tool_registry from swarms.tools.json_utils import base_model_to_json +from swarms.tools.mcp_client_call import ( + execute_tool_call_simple, + _execute_tool_call_simple, + get_tools_for_multiple_mcp_servers, + get_mcp_tools_sync, + aget_mcp_tools, +) __all__ = [ @@ -50,4 +57,9 @@ __all__ = [ "ToolStorage", "tool_registry", "base_model_to_json", + "execute_tool_call_simple", + "_execute_tool_call_simple", + "get_tools_for_multiple_mcp_servers", + "get_mcp_tools_sync", + "aget_mcp_tools", ] diff --git a/swarms/tools/mcp_client.py b/swarms/tools/mcp_client.py deleted file mode 100644 index 1e9f8b5e..00000000 --- a/swarms/tools/mcp_client.py +++ /dev/null @@ -1,246 +0,0 @@ -import asyncio -import json -from typing import List, Literal, Dict, Any, Union -from fastmcp import Client -from swarms.utils.str_to_dict import str_to_dict -from loguru import logger - - -def parse_agent_output( - dictionary: Union[str, Dict[Any, Any]], -) -> tuple[str, Dict[Any, Any]]: - """ - Parse agent output into tool name and parameters. - - Args: - dictionary: Either a string or dictionary containing tool information. - If string, it will be converted to a dictionary. - Must contain a 'name' key for the tool name. - - Returns: - tuple[str, Dict[Any, Any]]: A tuple containing the tool name and its parameters. - - Raises: - ValueError: If the input is invalid or missing required 'name' key. - """ - try: - if isinstance(dictionary, str): - dictionary = str_to_dict(dictionary) - - elif not isinstance(dictionary, dict): - raise ValueError("Invalid dictionary") - - # Handle regular dictionary format - if "name" in dictionary: - name = dictionary["name"] - # Remove the name key and use remaining key-value pairs as parameters - params = dict(dictionary) - params.pop("name") - return name, params - - raise ValueError("Invalid function call format") - except Exception as e: - raise ValueError(f"Error parsing agent output: {str(e)}") - - -async def _list_all(url: str): - """ - Asynchronously list all tools available on a given MCP server. - - Args: - url: The URL of the MCP server to query. - - Returns: - List of available tools. - - Raises: - ValueError: If there's an error connecting to or querying the server. - """ - try: - async with Client(url) as client: - return await client.list_tools() - except Exception as e: - raise ValueError(f"Error listing tools: {str(e)}") - - -def list_all(url: str, output_type: Literal["str", "json"] = "json"): - """ - Synchronously list all tools available on a given MCP server. - - Args: - url: The URL of the MCP server to query. - - Returns: - List of dictionaries containing tool information. - - Raises: - ValueError: If there's an error connecting to or querying the server. - """ - try: - out = asyncio.run(_list_all(url)) - - outputs = [] - for tool in out: - outputs.append(tool.model_dump()) - - if output_type == "json": - return json.dumps(outputs, indent=4) - else: - return outputs - except Exception as e: - raise ValueError(f"Error in list_all: {str(e)}") - - -def list_tools_for_multiple_urls( - urls: List[str], output_type: Literal["str", "json"] = "json" -): - """ - List tools available across multiple MCP servers. - - Args: - urls: List of MCP server URLs to query. - output_type: Format of the output, either "json" (string) or "str" (list). - - Returns: - If output_type is "json": JSON string containing all tools with server URLs. - If output_type is "str": List of tools with server URLs. - - Raises: - ValueError: If there's an error querying any of the servers. - """ - try: - out = [] - for url in urls: - tools = list_all(url) - # Add server URL to each tool's data - for tool in tools: - tool["server_url"] = url - out.append(tools) - - if output_type == "json": - return json.dumps(out, indent=4) - else: - return out - except Exception as e: - raise ValueError( - f"Error listing tools for multiple URLs: {str(e)}" - ) - - -async def _execute_mcp_tool( - url: str, - parameters: Dict[Any, Any] = None, - *args, - **kwargs, -) -> Dict[Any, Any]: - """ - Asynchronously execute a tool on an MCP server. - - Args: - url: The URL of the MCP server. - parameters: Dictionary containing tool name and parameters. - *args: Additional positional arguments for the Client. - **kwargs: Additional keyword arguments for the Client. - - Returns: - Dictionary containing the tool execution results. - - Raises: - ValueError: If the URL is invalid or tool execution fails. - """ - try: - - name, params = parse_agent_output(parameters) - - outputs = [] - - async with Client(url, *args, **kwargs) as client: - out = await client.call_tool( - name=name, - arguments=params, - ) - - for output in out: - outputs.append(output.model_dump()) - - # convert outputs to string - return json.dumps(outputs, indent=4) - except Exception as e: - raise ValueError(f"Error executing MCP tool: {str(e)}") - - -def execute_mcp_tool( - url: str, - parameters: Dict[Any, Any] = None, -) -> Dict[Any, Any]: - """ - Synchronously execute a tool on an MCP server. - - Args: - url: The URL of the MCP server. - parameters: Dictionary containing tool name and parameters. - - Returns: - Dictionary containing the tool execution results. - - Raises: - ValueError: If tool execution fails. - """ - try: - logger.info(f"Executing MCP tool with URL: {url}") - logger.debug(f"Tool parameters: {parameters}") - - result = asyncio.run( - _execute_mcp_tool( - url=url, - parameters=parameters, - ) - ) - - logger.info("MCP tool execution completed successfully") - logger.debug(f"Tool execution result: {result}") - return result - except Exception as e: - logger.error(f"Error in execute_mcp_tool: {str(e)}") - raise ValueError(f"Error in execute_mcp_tool: {str(e)}") - - -def find_and_execute_tool( - urls: List[str], tool_name: str, parameters: Dict[Any, Any] -) -> Dict[Any, Any]: - """ - Find a tool across multiple servers and execute it with the given parameters. - - Args: - urls: List of server URLs to search through. - tool_name: Name of the tool to find and execute. - parameters: Parameters to pass to the tool. - - Returns: - Dict containing the tool execution results. - - Raises: - ValueError: If tool is not found on any server or execution fails. - """ - try: - # Search for tool across all servers - for url in urls: - try: - tools = list_all(url) - # Check if tool exists on this server - if any(tool["name"] == tool_name for tool in tools): - # Prepare parameters in correct format - tool_params = {"name": tool_name, **parameters} - # Execute tool on this server - return execute_mcp_tool( - url=url, parameters=tool_params - ) - except Exception: - # Skip servers that fail and continue searching - continue - - raise ValueError( - f"Tool '{tool_name}' not found on any provided servers" - ) - except Exception as e: - raise ValueError(f"Error in find_and_execute_tool: {str(e)}") diff --git a/swarms/tools/mcp_client_call.py b/swarms/tools/mcp_client_call.py index 0570aeda..e1a7d903 100644 --- a/swarms/tools/mcp_client_call.py +++ b/swarms/tools/mcp_client_call.py @@ -1,19 +1,164 @@ -import litellm +import os +import concurrent.futures import asyncio import contextlib +import json import random from functools import wraps -from typing import Any, Dict, List +from typing import Any, Dict, List, Literal, Optional, Union +from concurrent.futures import ThreadPoolExecutor, as_completed -from litellm.experimental_mcp_client import ( - call_openai_tool, - load_mcp_tools, -) +from litellm.types.utils import ChatCompletionMessageToolCall from loguru import logger from mcp import ClientSession from mcp.client.sse import sse_client +from mcp.types import ( + CallToolRequestParams as MCPCallToolRequestParams, +) +from mcp.types import CallToolResult as MCPCallToolResult +from mcp.types import Tool as MCPTool +from openai.types.chat import ChatCompletionToolParam +from openai.types.shared_params.function_definition import ( + FunctionDefinition, +) + +from swarms.schemas.mcp_schemas import ( + MCPConnection, +) +from swarms.utils.index import exists + + +class MCPError(Exception): + """Base exception for MCP related errors.""" + + pass + + +class MCPConnectionError(MCPError): + """Raised when there are issues connecting to the MCP server.""" + + pass -import os + +class MCPToolError(MCPError): + """Raised when there are issues with MCP tool operations.""" + + pass + + +class MCPValidationError(MCPError): + """Raised when there are validation issues with MCP operations.""" + + pass + + +class MCPExecutionError(MCPError): + """Raised when there are issues executing MCP operations.""" + + pass + + +######################################################## +# List MCP Tool functions +######################################################## +def transform_mcp_tool_to_openai_tool( + mcp_tool: MCPTool, +) -> ChatCompletionToolParam: + """Convert an MCP tool to an OpenAI tool.""" + return ChatCompletionToolParam( + type="function", + function=FunctionDefinition( + name=mcp_tool.name, + description=mcp_tool.description or "", + parameters=mcp_tool.inputSchema, + strict=False, + ), + ) + + +async def load_mcp_tools( + session: ClientSession, format: Literal["mcp", "openai"] = "mcp" +) -> Union[List[MCPTool], List[ChatCompletionToolParam]]: + """ + Load all available MCP tools + + Args: + session: The MCP session to use + format: The format to convert the tools to + By default, the tools are returned in MCP format. + + If format is set to "openai", the tools are converted to OpenAI API compatible tools. + """ + tools = await session.list_tools() + if format == "openai": + return [ + transform_mcp_tool_to_openai_tool(mcp_tool=tool) + for tool in tools.tools + ] + return tools.tools + + +######################################################## +# Call MCP Tool functions +######################################################## + + +async def call_mcp_tool( + session: ClientSession, + call_tool_request_params: MCPCallToolRequestParams, +) -> MCPCallToolResult: + """Call an MCP tool.""" + tool_result = await session.call_tool( + name=call_tool_request_params.name, + arguments=call_tool_request_params.arguments, + ) + return tool_result + + +def _get_function_arguments(function: FunctionDefinition) -> dict: + """Helper to safely get and parse function arguments.""" + arguments = function.get("arguments", {}) + if isinstance(arguments, str): + try: + arguments = json.loads(arguments) + except json.JSONDecodeError: + arguments = {} + return arguments if isinstance(arguments, dict) else {} + + +def transform_openai_tool_call_request_to_mcp_tool_call_request( + openai_tool: Union[ChatCompletionMessageToolCall, Dict], +) -> MCPCallToolRequestParams: + """Convert an OpenAI ChatCompletionMessageToolCall to an MCP CallToolRequestParams.""" + function = openai_tool["function"] + return MCPCallToolRequestParams( + name=function["name"], + arguments=_get_function_arguments(function), + ) + + +async def call_openai_tool( + session: ClientSession, + openai_tool: dict, +) -> MCPCallToolResult: + """ + Call an OpenAI tool using MCP client. + + Args: + session: The MCP session to use + openai_tool: The OpenAI tool to call. You can get this from the `choices[0].message.tool_calls[0]` of the response from the OpenAI API. + Returns: + The result of the MCP tool call. + """ + mcp_tool_call_request_params = ( + transform_openai_tool_call_request_to_mcp_tool_call_request( + openai_tool=openai_tool, + ) + ) + return await call_mcp_tool( + session=session, + call_tool_request_params=mcp_tool_call_request_params, + ) def retry_with_backoff(retries=3, backoff_in_seconds=1): @@ -59,15 +204,49 @@ def get_or_create_event_loop(): try: yield loop finally: - if loop.is_running(): - loop.stop() - if not loop.is_closed(): - loop.close() + # Only close the loop if we created it and it's not the main event loop + if loop != asyncio.get_event_loop() and not loop.is_running(): + if not loop.is_closed(): + loop.close() + + +def connect_to_mcp_server(connection: MCPConnection = None): + """Connect to an MCP server. + + Args: + connection (MCPConnection): The connection configuration object + + Returns: + tuple: A tuple containing (headers, timeout, transport, url) + + Raises: + MCPValidationError: If the connection object is invalid + """ + if not isinstance(connection, MCPConnection): + raise MCPValidationError("Invalid connection type") + + # Direct attribute access is faster than property access + headers = dict(connection.headers or {}) + if connection.authorization_token: + headers["Authorization"] = ( + f"Bearer {connection.authorization_token}" + ) + + return ( + headers, + connection.timeout or 5, + connection.transport or "sse", + connection.url, + ) @retry_with_backoff(retries=3) async def aget_mcp_tools( - server_path: str, format: str = "openai", *args, **kwargs + server_path: Optional[str] = None, + format: str = "openai", + connection: Optional[MCPConnection] = None, + *args, + **kwargs, ) -> List[Dict[str, Any]]: """ Fetch available MCP tools from the server with retry logic. @@ -79,16 +258,26 @@ async def aget_mcp_tools( List[Dict[str, Any]]: List of available MCP tools in OpenAI format Raises: - ValueError: If server_path is invalid - ConnectionError: If connection to server fails + MCPValidationError: If server_path is invalid + MCPConnectionError: If connection to server fails """ - if not server_path or not isinstance(server_path, str): - raise ValueError("Invalid server path provided") + if exists(connection): + headers, timeout, transport, url = connect_to_mcp_server( + connection + ) + else: + headers, timeout, transport, url = None, 5, None, server_path logger.info(f"Fetching MCP tools from server: {server_path}") try: - async with sse_client(server_path, *args, **kwargs) as ( + async with sse_client( + url=server_path, + headers=headers, + timeout=timeout, + *args, + **kwargs, + ) as ( read, write, ): @@ -103,17 +292,17 @@ async def aget_mcp_tools( return tools except Exception as e: logger.error(f"Error fetching MCP tools: {str(e)}") - raise - - -async def get_mcp_tools( - server_path: str, *args, **kwargs -) -> List[Dict[str, Any]]: - return await aget_mcp_tools(server_path, *args, **kwargs) + raise MCPConnectionError( + f"Failed to connect to MCP server: {str(e)}" + ) def get_mcp_tools_sync( - server_path: str, format: str = "openai", *args, **kwargs + server_path: Optional[str] = None, + format: str = "openai", + connection: Optional[MCPConnection] = None, + *args, + **kwargs, ) -> List[Dict[str, Any]]: """ Synchronous version of get_mcp_tools that handles event loop management. @@ -125,145 +314,169 @@ def get_mcp_tools_sync( List[Dict[str, Any]]: List of available MCP tools in OpenAI format Raises: - ValueError: If server_path is invalid - ConnectionError: If connection to server fails - RuntimeError: If event loop management fails + MCPValidationError: If server_path is invalid + MCPConnectionError: If connection to server fails + MCPExecutionError: If event loop management fails """ with get_or_create_event_loop() as loop: try: return loop.run_until_complete( - aget_mcp_tools(server_path, format, *args, **kwargs) + aget_mcp_tools( + server_path=server_path, + format=format, + connection=connection, + *args, + **kwargs, + ) ) except Exception as e: logger.error(f"Error in get_mcp_tools_sync: {str(e)}") - raise + raise MCPExecutionError( + f"Failed to execute MCP tools sync: {str(e)}" + ) + + +def _fetch_tools_for_server(url: str, connection: Optional[MCPConnection] = None, format: str = "openai") -> List[Dict[str, Any]]: + """Helper function to fetch tools for a single server.""" + return get_mcp_tools_sync( + server_path=url, + connection=connection, + format=format, + ) + + +def get_tools_for_multiple_mcp_servers( + urls: List[str], + connections: List[MCPConnection] = None, + format: str = "openai", + output_type: Literal["json", "dict", "str"] = "str", + max_workers: Optional[int] = None, +) -> List[Dict[str, Any]]: + """Get tools for multiple MCP servers concurrently using ThreadPoolExecutor. + + Args: + urls: List of server URLs to fetch tools from + connections: Optional list of MCPConnection objects corresponding to each URL + format: Format to return tools in (default: "openai") + output_type: Type of output format (default: "str") + max_workers: Maximum number of worker threads (default: None, uses min(32, os.cpu_count() + 4)) + + Returns: + List[Dict[str, Any]]: Combined list of tools from all servers + """ + tools = [] + threads = min(32, os.cpu_count() + 4) if max_workers is None else max_workers + with ThreadPoolExecutor(max_workers=max_workers) as executor: + if exists(connections): + # Create future tasks for each URL-connection pair + future_to_url = { + executor.submit(_fetch_tools_for_server, url, connection, format): url + for url, connection in zip(urls, connections) + } + else: + # Create future tasks for each URL without connections + future_to_url = { + executor.submit(_fetch_tools_for_server, url, None, format): url + for url in urls + } + + # Process completed futures as they come in + for future in as_completed(future_to_url): + url = future_to_url[future] + try: + server_tools = future.result() + tools.extend(server_tools) + except Exception as e: + logger.error(f"Error fetching tools from {url}: {str(e)}") + raise MCPExecutionError(f"Failed to fetch tools from {url}: {str(e)}") + + return tools -async def execute_tool_call( - server_path: str, - messages: List[Dict[str, Any]], - model: str = "o3-mini", +async def _execute_tool_call_simple( + response: any = None, + server_path: str = None, + connection: Optional[MCPConnection] = None, + output_type: Literal["json", "dict", "str"] = "str", *args, **kwargs, -) -> Dict[str, Any]: - """ - Execute a tool call using the MCP client with retry logic. +): + """Execute a tool call using the MCP client.""" + if exists(connection): + headers, timeout, transport, url = connect_to_mcp_server( + connection + ) + else: + headers, timeout, transport, url = None, 5, "sse", server_path - Args: - server_path (str): Path to the MCP server script - messages (List[Dict[str, Any]]): Current conversation messages - model (str): The model to use for completion (default: "gpt-4") + try: + async with sse_client( + url=url, headers=headers, timeout=timeout, *args, **kwargs + ) as ( + read, + write, + ): + async with ClientSession(read, write) as session: + try: + await session.initialize() - Returns: - Dict[str, Any]: Final LLM response after tool execution + call_result = await call_openai_tool( + session=session, + openai_tool=response, + ) - Raises: - ValueError: If inputs are invalid - ConnectionError: If connection to server fails - RuntimeError: If tool execution fails - """ + if output_type == "json": + out = call_result.model_dump_json(indent=4) + elif output_type == "dict": + out = call_result.model_dump() + elif output_type == "str": + data = call_result.model_dump() + formatted_lines = [] + for key, value in data.items(): + if isinstance(value, list): + for item in value: + if isinstance(item, dict): + for k, v in item.items(): + formatted_lines.append( + f"{k}: {v}" + ) + else: + formatted_lines.append( + f"{key}: {value}" + ) + out = "\n".join(formatted_lines) + + return out - async with sse_client(server_path, *args, **kwargs) as ( - read, - write, - ): - async with ClientSession(read, write) as session: - try: - # Initialize the connection - await session.initialize() + except Exception as e: + logger.error(f"Error in tool execution: {str(e)}") + raise MCPExecutionError( + f"Tool execution failed: {str(e)}" + ) - # Get tools - tools = await load_mcp_tools( - session=session, format="openai" - ) - logger.info(f"Tools: {tools}") - - # First LLM call to get tool call - llm_response = await litellm.acompletion( - model=model, - api_key=os.getenv("OPENAI_API_KEY"), - messages=messages, - tools=tools, - tool_choice="auto", - # parallel_tool_calls=True, - ) - logger.info(f"Initial LLM Response: {llm_response}") - - message = llm_response["choices"][0]["message"] - if not message.get("tool_calls"): - logger.warning("No tool calls in LLM response") - return llm_response - - # Call the tool using MCP client - openai_tool = message["tool_calls"][0] - call_result = await call_openai_tool( - session=session, - openai_tool=openai_tool, - ) - logger.info(f"Tool call completed: {call_result}") - - # Update messages with tool result - messages.append(message) - messages.append( - { - "role": "tool", - "content": str(call_result.content[0].text), - "tool_call_id": openai_tool["id"], - } - ) - logger.debug( - "Updated messages with tool result", - extra={"messages": messages}, - ) + except Exception as e: + logger.error(f"Error in SSE client connection: {str(e)}") + raise MCPConnectionError( + f"Failed to connect to MCP server: {str(e)}" + ) + + +async def execute_tool_call_simple( + response: any = None, + server_path: str = None, + connection: Optional[MCPConnection] = None, + output_type: Literal["json", "dict", "str", "formatted"] = "str", + *args, + **kwargs, +) -> List[Dict[str, Any]]: + return await _execute_tool_call_simple( + response=response, + server_path=server_path, + connection=connection, + output_type=output_type, + *args, + **kwargs, + ) - # Second LLM call with tool result - final_response = await litellm.acompletion( - model=model, - api_key=os.getenv("OPENAI_API_KEY"), - messages=messages, - tools=tools, - tool_choice="auto", - # parallel_tool_calls=True, - ) - logger.info(f"Final LLM Response: {final_response}") - return final_response - except Exception as e: - logger.error(f"Error in execute_tool_call: {str(e)}") - raise RuntimeError(f"Tool execution failed: {str(e)}") - - -# def execute_tool_call_sync( -# server_path: str, -# tool_call: Dict[str, Any], -# task: str, -# *args, -# **kwargs, -# ) -> Dict[str, Any]: -# """ -# Synchronous version of execute_tool_call that handles event loop management. - -# Args: -# server_path (str): Path to the MCP server script -# tool_call (Dict[str, Any]): The OpenAI tool call to execute -# messages (List[Dict[str, Any]]): Current conversation messages - -# Returns: -# Dict[str, Any]: Final LLM response after tool execution - -# Raises: -# ValueError: If inputs are invalid -# ConnectionError: If connection to server fails -# RuntimeError: If event loop management fails -# """ -# with get_or_create_event_loop() as loop: -# try: -# return loop.run_until_complete( -# execute_tool_call( -# server_path, tool_call, task, *args, **kwargs -# ) -# ) -# except Exception as e: -# logger.error(f"Error in execute_tool_call_sync: {str(e)}") -# raise diff --git a/swarms/tools/mcp_integration.py b/swarms/tools/mcp_integration.py deleted file mode 100644 index acc02dd0..00000000 --- a/swarms/tools/mcp_integration.py +++ /dev/null @@ -1,340 +0,0 @@ -from __future__ import annotations - -from typing import Any - - -from loguru import logger - -import abc -import asyncio -from contextlib import AbstractAsyncContextManager, AsyncExitStack -from pathlib import Path -from typing import Literal - -from anyio.streams.memory import ( - MemoryObjectReceiveStream, - MemoryObjectSendStream, -) -from mcp import ( - ClientSession, - StdioServerParameters, - Tool as MCPTool, - stdio_client, -) -from mcp.client.sse import sse_client -from mcp.types import CallToolResult, JSONRPCMessage -from typing_extensions import NotRequired, TypedDict - - -class MCPServer(abc.ABC): - """Base class for Model Context Protocol servers.""" - - @abc.abstractmethod - async def connect(self): - """Connect to the server. For example, this might mean spawning a subprocess or - opening a network connection. The server is expected to remain connected until - `cleanup()` is called. - """ - pass - - @property - @abc.abstractmethod - def name(self) -> str: - """A readable name for the server.""" - pass - - @abc.abstractmethod - async def cleanup(self): - """Cleanup the server. For example, this might mean closing a subprocess or - closing a network connection. - """ - pass - - @abc.abstractmethod - async def list_tools(self) -> list[MCPTool]: - """List the tools available on the server.""" - pass - - @abc.abstractmethod - async def call_tool( - self, tool_name: str, arguments: dict[str, Any] | None - ) -> CallToolResult: - """Invoke a tool on the server.""" - pass - - -class _MCPServerWithClientSession(MCPServer, abc.ABC): - """Base class for MCP servers that use a `ClientSession` to communicate with the server.""" - - def __init__(self, cache_tools_list: bool): - """ - Args: - cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be - cached and only fetched from the server once. If `False`, the tools list will be - fetched from the server on each call to `list_tools()`. The cache can be invalidated - by calling `invalidate_tools_cache()`. You should set this to `True` if you know the - server will not change its tools list, because it can drastically improve latency - (by avoiding a round-trip to the server every time). - """ - self.session: ClientSession | None = None - self.exit_stack: AsyncExitStack = AsyncExitStack() - self._cleanup_lock: asyncio.Lock = asyncio.Lock() - self.cache_tools_list = cache_tools_list - - # The cache is always dirty at startup, so that we fetch tools at least once - self._cache_dirty = True - self._tools_list: list[MCPTool] | None = None - - @abc.abstractmethod - def create_streams( - self, - ) -> AbstractAsyncContextManager[ - tuple[ - MemoryObjectReceiveStream[JSONRPCMessage | Exception], - MemoryObjectSendStream[JSONRPCMessage], - ] - ]: - """Create the streams for the server.""" - pass - - async def __aenter__(self): - await self.connect() - return self - - async def __aexit__(self, exc_type, exc_value, traceback): - await self.cleanup() - - def invalidate_tools_cache(self): - """Invalidate the tools cache.""" - self._cache_dirty = True - - async def connect(self): - """Connect to the server.""" - try: - transport = await self.exit_stack.enter_async_context( - self.create_streams() - ) - read, write = transport - session = await self.exit_stack.enter_async_context( - ClientSession(read, write) - ) - await session.initialize() - self.session = session - except Exception as e: - logger.error(f"Error initializing MCP server: {e}") - await self.cleanup() - raise - - async def list_tools(self) -> list[MCPTool]: - """List the tools available on the server.""" - if not self.session: - raise Exception( - "Server not initialized. Make sure you call `connect()` first." - ) - - # Return from cache if caching is enabled, we have tools, and the cache is not dirty - if ( - self.cache_tools_list - and not self._cache_dirty - and self._tools_list - ): - return self._tools_list - - # Reset the cache dirty to False - self._cache_dirty = False - - # Fetch the tools from the server - self._tools_list = (await self.session.list_tools()).tools - return self._tools_list - - async def call_tool( - self, arguments: dict[str, Any] | None - ) -> CallToolResult: - """Invoke a tool on the server.""" - tool_name = arguments.get("tool_name") or arguments.get( - "name" - ) - - if not tool_name: - raise Exception("No tool name found in arguments") - - if not self.session: - raise Exception( - "Server not initialized. Make sure you call `connect()` first." - ) - - return await self.session.call_tool(tool_name, arguments) - - async def cleanup(self): - """Cleanup the server.""" - async with self._cleanup_lock: - try: - await self.exit_stack.aclose() - self.session = None - except Exception as e: - logger.error(f"Error cleaning up server: {e}") - - -class MCPServerStdioParams(TypedDict): - """Mirrors `mcp.client.stdio.StdioServerParameters`, but lets you pass params without another - import. - """ - - command: str - """The executable to run to start the server. For example, `python` or `node`.""" - - args: NotRequired[list[str]] - """Command line args to pass to the `command` executable. For example, `['foo.py']` or - `['server.js', '--port', '8080']`.""" - - env: NotRequired[dict[str, str]] - """The environment variables to set for the server. .""" - - cwd: NotRequired[str | Path] - """The working directory to use when spawning the process.""" - - encoding: NotRequired[str] - """The text encoding used when sending/receiving messages to the server. Defaults to `utf-8`.""" - - encoding_error_handler: NotRequired[ - Literal["strict", "ignore", "replace"] - ] - """The text encoding error handler. Defaults to `strict`. - - See https://docs.python.org/3/library/codecs.html#codec-base-classes for - explanations of possible values. - """ - - -class MCPServerStdio(_MCPServerWithClientSession): - """MCP server implementation that uses the stdio transport. See the [spec] - (https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#stdio) for - details. - """ - - def __init__( - self, - params: MCPServerStdioParams, - cache_tools_list: bool = False, - name: str | None = None, - ): - """Create a new MCP server based on the stdio transport. - - Args: - params: The params that configure the server. This includes the command to run to - start the server, the args to pass to the command, the environment variables to - set for the server, the working directory to use when spawning the process, and - the text encoding used when sending/receiving messages to the server. - cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be - cached and only fetched from the server once. If `False`, the tools list will be - fetched from the server on each call to `list_tools()`. The cache can be - invalidated by calling `invalidate_tools_cache()`. You should set this to `True` - if you know the server will not change its tools list, because it can drastically - improve latency (by avoiding a round-trip to the server every time). - name: A readable name for the server. If not provided, we'll create one from the - command. - """ - super().__init__(cache_tools_list) - - self.params = StdioServerParameters( - command=params["command"], - args=params.get("args", []), - env=params.get("env"), - cwd=params.get("cwd"), - encoding=params.get("encoding", "utf-8"), - encoding_error_handler=params.get( - "encoding_error_handler", "strict" - ), - ) - - self._name = name or f"stdio: {self.params.command}" - - def create_streams( - self, - ) -> AbstractAsyncContextManager[ - tuple[ - MemoryObjectReceiveStream[JSONRPCMessage | Exception], - MemoryObjectSendStream[JSONRPCMessage], - ] - ]: - """Create the streams for the server.""" - return stdio_client(self.params) - - @property - def name(self) -> str: - """A readable name for the server.""" - return self._name - - -class MCPServerSseParams(TypedDict): - """Mirrors the params in`mcp.client.sse.sse_client`.""" - - url: str - """The URL of the server.""" - - headers: NotRequired[dict[str, str]] - """The headers to send to the server.""" - - timeout: NotRequired[float] - """The timeout for the HTTP request. Defaults to 5 seconds.""" - - sse_read_timeout: NotRequired[float] - """The timeout for the SSE connection, in seconds. Defaults to 5 minutes.""" - - -class MCPServerSse(_MCPServerWithClientSession): - """MCP server implementation that uses the HTTP with SSE transport. See the [spec] - (https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse) - for details. - """ - - def __init__( - self, - params: MCPServerSseParams, - cache_tools_list: bool = False, - name: str | None = None, - ): - """Create a new MCP server based on the HTTP with SSE transport. - - Args: - params: The params that configure the server. This includes the URL of the server, - the headers to send to the server, the timeout for the HTTP request, and the - timeout for the SSE connection. - - cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be - cached and only fetched from the server once. If `False`, the tools list will be - fetched from the server on each call to `list_tools()`. The cache can be - invalidated by calling `invalidate_tools_cache()`. You should set this to `True` - if you know the server will not change its tools list, because it can drastically - improve latency (by avoiding a round-trip to the server every time). - - name: A readable name for the server. If not provided, we'll create one from the - URL. - """ - super().__init__(cache_tools_list) - - self.params = params - self._name = name or f"sse: {self.params['url']}" - - def create_streams( - self, - ) -> AbstractAsyncContextManager[ - tuple[ - MemoryObjectReceiveStream[JSONRPCMessage | Exception], - MemoryObjectSendStream[JSONRPCMessage], - ] - ]: - """Create the streams for the server.""" - return sse_client( - url=self.params["url"], - headers=self.params.get("headers", None), - timeout=self.params.get("timeout", 5), - sse_read_timeout=self.params.get( - "sse_read_timeout", 60 * 5 - ), - ) - - @property - def name(self) -> str: - """A readable name for the server.""" - return self._name diff --git a/swarms/utils/index.py b/swarms/utils/index.py new file mode 100644 index 00000000..4d98b11a --- /dev/null +++ b/swarms/utils/index.py @@ -0,0 +1,2 @@ +def exists(val): + return val is not None diff --git a/swarms/utils/litellm_wrapper.py b/swarms/utils/litellm_wrapper.py index c77fdd6e..159ee61d 100644 --- a/swarms/utils/litellm_wrapper.py +++ b/swarms/utils/litellm_wrapper.py @@ -1,3 +1,4 @@ +from typing import Optional import base64 import requests @@ -7,24 +8,10 @@ from typing import List from loguru import logger import litellm -try: - from litellm import completion, acompletion -except ImportError: - import subprocess - import sys - import litellm +from litellm import completion, acompletion - print("Installing litellm") - - subprocess.check_call( - [sys.executable, "-m", "pip", "install", "-U", "litellm"] - ) - print("litellm installed") - - from litellm import completion - - litellm.set_verbose = True - litellm.ssl_verify = False +litellm.set_verbose = True +litellm.ssl_verify = False class LiteLLMException(Exception): @@ -81,11 +68,13 @@ class LiteLLM: max_completion_tokens: int = 4000, tools_list_dictionary: List[dict] = None, tool_choice: str = "auto", - parallel_tool_calls: bool = False, + parallel_tool_calls: bool = True, audio: str = None, retries: int = 3, verbose: bool = False, caching: bool = False, + mcp_call: bool = False, + top_p: float = 1.0, *args, **kwargs, ): @@ -110,6 +99,8 @@ class LiteLLM: self.tool_choice = tool_choice self.parallel_tool_calls = parallel_tool_calls self.caching = caching + self.mcp_call = mcp_call + self.top_p = top_p self.modalities = [] self._cached_messages = {} # Cache for prepared messages self.messages = [] # Initialize messages list @@ -124,7 +115,29 @@ class LiteLLM: ) def output_for_tools(self, response: any): - return response["choices"][0]["message"]["tool_calls"][0] + if self.mcp_call is True: + out = response.choices[0].message.tool_calls[0].function + output = { + "function": { + "name": out.name, + "arguments": out.arguments, + } + } + return output + elif self.parallel_tool_calls is True: + output = [] + for tool_call in response.choices[0].message.tool_calls: + output.append( + { + "function": { + "name": tool_call.function.name, + "arguments": tool_call.function.arguments, + } + } + ) + else: + out = response.choices[0].message.tool_calls[0] + return out def _prepare_messages(self, task: str) -> list: """ @@ -225,8 +238,8 @@ class LiteLLM: def run( self, task: str, - audio: str = None, - img: str = None, + audio: Optional[str] = None, + img: Optional[str] = None, *args, **kwargs, ): @@ -253,38 +266,28 @@ class LiteLLM: self.handle_modalities( task=task, audio=audio, img=img ) - messages = ( - self.messages - ) # Use modality-processed messages - - if ( - self.model_name == "openai/o4-mini" - or self.model_name == "openai/o3-2025-04-16" - ): - # Prepare common completion parameters - completion_params = { - "model": self.model_name, - "messages": messages, - "stream": self.stream, - # "temperature": self.temperature, - "max_completion_tokens": self.max_tokens, - "caching": self.caching, - **kwargs, - } + messages = self.messages - else: - # Prepare common completion parameters - completion_params = { - "model": self.model_name, - "messages": messages, - "stream": self.stream, - "temperature": self.temperature, - "max_tokens": self.max_tokens, - "caching": self.caching, - **kwargs, - } + # Base completion parameters + completion_params = { + "model": self.model_name, + "messages": messages, + "stream": self.stream, + "max_tokens": self.max_tokens, + "caching": self.caching, + "temperature": self.temperature, + "top_p": self.top_p, + **kwargs, + } - # Handle tool-based completion + # Add temperature for non-o4/o3 models + if self.model_name not in [ + "openai/o4-mini", + "openai/o3-2025-04-16", + ]: + completion_params["temperature"] = self.temperature + + # Add tools if specified if self.tools_list_dictionary is not None: completion_params.update( { @@ -293,35 +296,20 @@ class LiteLLM: "parallel_tool_calls": self.parallel_tool_calls, } ) - response = completion(**completion_params) - return ( - response.choices[0] - .message.tool_calls[0] - .function.arguments - ) - # Handle modality-based completion - if ( - self.modalities and len(self.modalities) > 1 - ): # More than just text - completion_params.update( - {"modalities": self.modalities} - ) - response = completion(**completion_params) - return response.choices[0].message.content + # Add modalities if needed + if self.modalities and len(self.modalities) > 1: + completion_params["modalities"] = self.modalities - # Standard completion - if self.stream: - output = completion(**completion_params) - else: - response = completion(**completion_params) - - if self.tools_list_dictionary is not None: - return self.output_for_tools(response) - else: - return response.choices[0].message.content + # Make the completion call + response = completion(**completion_params) - return output + # Handle tool-based response + if self.tools_list_dictionary is not None: + return self.output_for_tools(response) + else: + # Return standard response content + return response.choices[0].message.content except LiteLLMException as error: logger.error(f"Error in LiteLLM run: {str(error)}") @@ -331,7 +319,7 @@ class LiteLLM: ) import time - time.sleep(2) # Add a small delay before retry + time.sleep(2) return self.run(task, audio, img, *args, **kwargs) raise error