diff --git a/swarms/schemas/mcp_schemas.py b/swarms/schemas/mcp_schemas.py index c09259f8..78773dbb 100644 --- a/swarms/schemas/mcp_schemas.py +++ b/swarms/schemas/mcp_schemas.py @@ -1,113 +1,210 @@ -""" -Unified MCP Client for Swarms Framework - -This module provides a unified interface for MCP (Model Context Protocol) operations -with support for multiple transport types: stdio, http, streamable_http, and sse. +from pydantic import BaseModel, Field +from typing import Dict, List, Any, Optional, Literal, Callable -All transport types are optional and can be configured based on requirements. -Streaming support is included for real-time communication. -Dependencies: -- Core MCP: pip install mcp -- Streamable HTTP: pip install mcp[streamable-http] -- HTTP transport: pip install httpx -- All dependencies are optional and gracefully handled +class MCPConnection(BaseModel): + """ + Configuration for MCP (Model Context Protocol) connections. + + This schema supports multiple transport types including stdio, http, + streamable_http, and sse. All transport types are optional and can be + configured based on requirements. Includes streaming support for real-time communication. + """ + + type: Optional[str] = Field( + default="mcp", + description="The type of connection, defaults to 'mcp'", + ) + + url: Optional[str] = Field( + default="http://localhost:8000/mcp", + description="The URL endpoint for the MCP server or command path for stdio", + ) + + transport: Optional[Literal["stdio", "http", "streamable_http", "sse", "auto"]] = Field( + default="streamable_http", + description="The transport protocol to use for the MCP server. 'auto' enables auto-detection.", + ) + + # STDIO specific + command: Optional[List[str]] = Field( + default=None, + description="Command and arguments for stdio transport", + ) + + # HTTP specific + headers: Optional[Dict[str, str]] = Field( + default=None, + description="Headers to send to the MCP server" + ) + + authorization_token: Optional[str] = Field( + default=None, + description="Authentication token for accessing the MCP server", + ) + + timeout: Optional[int] = Field( + default=10, + description="Timeout for the MCP server in seconds" + ) + + # Auto-detection settings + auto_detect: Optional[bool] = Field( + default=True, + description="Whether to auto-detect transport type from URL" + ) + + fallback_transport: Optional[Literal["stdio", "http", "streamable_http", "sse"]] = Field( + default="sse", + description="Fallback transport if auto-detection fails" + ) + + # Streaming settings + enable_streaming: Optional[bool] = Field( + default=True, + description="Whether to enable streaming support for real-time communication" + ) + + streaming_timeout: Optional[int] = Field( + default=None, + description="Timeout for streaming operations in seconds" + ) + + streaming_callback: Optional[Callable[[str], None]] = Field( + default=None, + description="Callback function for streaming chunks" + ) + + # Tool configurations + tool_configurations: Optional[Dict[Any, Any]] = Field( + default=None, + description="Dictionary containing configuration settings for MCP tools", + ) -Transport Types: -- stdio: Local command-line tools (no additional deps) -- http: Standard HTTP communication (requires httpx) -- streamable_http: Real-time HTTP streaming (requires mcp[streamable-http]) -- sse: Server-Sent Events (included with core mcp) -- auto: Auto-detection based on URL scheme -""" + class Config: + arbitrary_types_allowed = True + extra = "allow" -import asyncio -import json -import os -import sys -from concurrent.futures import ThreadPoolExecutor, as_completed -from contextlib import asynccontextmanager -from functools import wraps -from typing import Any, Dict, List, Literal, Optional, Union, AsyncGenerator, Callable -from urllib.parse import urlparse -from loguru import logger -from pydantic import BaseModel, Field +class MultipleMCPConnections(BaseModel): + """ + Configuration for multiple MCP connections. + + This allows managing multiple MCP servers with different transport types + and configurations simultaneously. Includes streaming support. + """ + + connections: List[MCPConnection] = Field( + default=[], + description="List of MCP connections" + ) + + # Global settings for multiple connections + max_concurrent: Optional[int] = Field( + default=None, + description="Maximum number of concurrent connections" + ) + + retry_attempts: Optional[int] = Field( + default=3, + description="Number of retry attempts for failed connections" + ) + + retry_delay: Optional[float] = Field( + default=1.0, + description="Delay between retry attempts in seconds" + ) + + # Global streaming settings + enable_streaming: Optional[bool] = Field( + default=True, + description="Whether to enable streaming support globally" + ) -# Import existing MCP functionality -from swarms.schemas.mcp_schemas import MCPConnection -from swarms.tools.mcp_client_call import ( - MCPConnectionError, - MCPExecutionError, - MCPToolError, - MCPValidationError, - aget_mcp_tools, - execute_multiple_tools_on_multiple_mcp_servers, - execute_multiple_tools_on_multiple_mcp_servers_sync, - execute_tool_call_simple, - get_mcp_tools_sync, - get_or_create_event_loop, -) + class Config: + arbitrary_types_allowed = True -# Try to import MCP libraries -try: - from mcp import ClientSession - from mcp.client.sse import sse_client - from mcp.client.stdio import stdio_client - MCP_AVAILABLE = True -except ImportError: - logger.warning("MCP client libraries not available. Install with: pip install mcp") - MCP_AVAILABLE = False -try: - from mcp.client.streamable_http import streamablehttp_client - STREAMABLE_HTTP_AVAILABLE = True -except ImportError: - logger.warning("Streamable HTTP client not available. Install with: pip install mcp[streamable-http]") - STREAMABLE_HTTP_AVAILABLE = False +class MCPToolConfig(BaseModel): + """ + Configuration for individual MCP tools. + + This allows fine-grained control over tool behavior and settings. + Includes streaming support for individual tools. + """ + + name: str = Field( + description="Name of the tool" + ) + + description: Optional[str] = Field( + default=None, + description="Description of the tool" + ) + + enabled: bool = Field( + default=True, + description="Whether the tool is enabled" + ) + + timeout: Optional[int] = Field( + default=None, + description="Tool-specific timeout in seconds" + ) + + retry_attempts: Optional[int] = Field( + default=None, + description="Tool-specific retry attempts" + ) + + parameters: Optional[Dict[str, Any]] = Field( + default=None, + description="Tool-specific parameters" + ) + + # Tool-specific streaming settings + enable_streaming: Optional[bool] = Field( + default=True, + description="Whether to enable streaming for this specific tool" + ) + + streaming_timeout: Optional[int] = Field( + default=None, + description="Tool-specific streaming timeout in seconds" + ) -try: - import httpx - HTTPX_AVAILABLE = True -except ImportError: - logger.warning("HTTPX not available. Install with: pip install httpx") - HTTPX_AVAILABLE = False + class Config: + arbitrary_types_allowed = True -class UnifiedTransportConfig(BaseModel): +class MCPTransportConfig(BaseModel): """ - Unified configuration for MCP transport types. + Detailed transport configuration for MCP connections. - This extends the existing MCPConnection schema with additional - transport-specific options and auto-detection capabilities. - Includes streaming support for real-time communication. + This provides advanced configuration options for each transport type. + Includes comprehensive streaming support. """ - # Transport type - can be auto-detected transport_type: Literal["stdio", "http", "streamable_http", "sse", "auto"] = Field( - default="auto", - description="The transport type to use. 'auto' enables auto-detection." + description="The transport type to use" ) - # Connection details + # Connection settings url: Optional[str] = Field( default=None, - description="URL for HTTP-based transports or stdio command path" + description="URL for HTTP-based transports or command path for stdio" ) - # STDIO specific command: Optional[List[str]] = Field( default=None, description="Command and arguments for stdio transport" ) - # HTTP specific headers: Optional[Dict[str, str]] = Field( default=None, description="HTTP headers for HTTP-based transports" ) - # Common settings timeout: int = Field( default=30, description="Timeout in seconds" @@ -124,12 +221,32 @@ class UnifiedTransportConfig(BaseModel): description="Whether to auto-detect transport type from URL" ) - # Fallback settings fallback_transport: Literal["stdio", "http", "streamable_http", "sse"] = Field( default="sse", description="Fallback transport if auto-detection fails" ) + # Advanced settings + max_retries: int = Field( + default=3, + description="Maximum number of retry attempts" + ) + + retry_delay: float = Field( + default=1.0, + description="Delay between retry attempts in seconds" + ) + + keep_alive: bool = Field( + default=True, + description="Whether to keep the connection alive" + ) + + verify_ssl: bool = Field( + default=True, + description="Whether to verify SSL certificates for HTTPS connections" + ) + # Streaming settings enable_streaming: bool = Field( default=True, @@ -138,652 +255,253 @@ class UnifiedTransportConfig(BaseModel): streaming_timeout: Optional[int] = Field( default=None, - description="Timeout for streaming operations" + description="Timeout for streaming operations in seconds" ) - streaming_callback: Optional[Callable[[str], None]] = Field( - default=None, - description="Optional callback function for streaming chunks" + streaming_buffer_size: Optional[int] = Field( + default=1024, + description="Buffer size for streaming operations" ) - - -class MCPUnifiedClient: - """ - Unified MCP client that supports multiple transport types. - - This client integrates with the existing swarms framework and provides - a unified interface for all MCP operations with streaming support. - """ - def __init__(self, config: Union[UnifiedTransportConfig, MCPConnection, str]): - """ - Initialize the unified MCP client. - - Args: - config: Transport configuration (UnifiedTransportConfig, MCPConnection, or URL string) - """ - self.config = self._normalize_config(config) - self._validate_config() - - def _normalize_config(self, config: Union[UnifiedTransportConfig, MCPConnection, str]) -> UnifiedTransportConfig: - """ - Normalize different config types to UnifiedTransportConfig. - - Args: - config: Configuration in various formats - - Returns: - Normalized UnifiedTransportConfig - """ - if isinstance(config, str): - # URL string - create config with auto-detection - return UnifiedTransportConfig( - url=config, - transport_type="auto", - auto_detect=True, - enable_streaming=True - ) - elif isinstance(config, MCPConnection): - # Convert existing MCPConnection to UnifiedTransportConfig - return UnifiedTransportConfig( - transport_type=config.transport or "auto", - url=config.url, - headers=config.headers, - timeout=config.timeout or 30, - authorization_token=config.authorization_token, - auto_detect=True, - enable_streaming=True - ) - elif isinstance(config, UnifiedTransportConfig): - return config - else: - raise ValueError(f"Unsupported config type: {type(config)}") - - def _validate_config(self) -> None: - """Validate the transport configuration.""" - if not MCP_AVAILABLE: - raise ImportError("MCP client libraries are required") - - if self.config.transport_type == "streamable_http" and not STREAMABLE_HTTP_AVAILABLE: - raise ImportError("Streamable HTTP transport requires mcp[streamable-http]") - - if self.config.transport_type == "http" and not HTTPX_AVAILABLE: - raise ImportError("HTTP transport requires httpx") - - def _auto_detect_transport(self, url: str) -> str: - """ - Auto-detect transport type from URL. - - Args: - url: The URL to analyze - - Returns: - Detected transport type - """ - if not url: - return "stdio" - - parsed = urlparse(url) - scheme = parsed.scheme.lower() - - if scheme in ("http", "https"): - if STREAMABLE_HTTP_AVAILABLE and self.config.enable_streaming: - return "streamable_http" - else: - return "http" - elif scheme in ("ws", "wss"): - return "sse" - elif scheme == "" or "stdio" in url: - return "stdio" - else: - return self.config.fallback_transport - - def _get_effective_transport(self) -> str: - """ - Get the effective transport type after auto-detection. - - Returns: - Effective transport type - """ - transport = self.config.transport_type - - if transport == "auto" and self.config.auto_detect and self.config.url: - transport = self._auto_detect_transport(self.config.url) - logger.info(f"Auto-detected transport type: {transport}") - - return transport - - @asynccontextmanager - async def get_client_context(self): - """ - Get the appropriate MCP client context manager. - - Yields: - MCP client context manager - """ - transport_type = self._get_effective_transport() - - if transport_type == "stdio": - command = self.config.command or [self.config.url] if self.config.url else None - if not command: - raise ValueError("Command is required for stdio transport") - async with stdio_client(command) as (read, write): - yield read, write - - elif transport_type == "streamable_http": - if not STREAMABLE_HTTP_AVAILABLE: - raise ImportError("Streamable HTTP transport not available") - if not self.config.url: - raise ValueError("URL is required for streamable_http transport") - async with streamablehttp_client( - self.config.url, - headers=self.config.headers, - timeout=self.config.streaming_timeout or self.config.timeout - ) as (read, write): - yield read, write - - elif transport_type == "http": - if not HTTPX_AVAILABLE: - raise ImportError("HTTP transport requires httpx") - if not self.config.url: - raise ValueError("URL is required for http transport") - async with self._http_client_context() as (read, write): - yield read, write - - elif transport_type == "sse": - if not self.config.url: - raise ValueError("URL is required for sse transport") - async with sse_client( - self.config.url, - headers=self.config.headers, - timeout=self.config.streaming_timeout or self.config.timeout - ) as (read, write): - yield read, write - else: - raise ValueError(f"Unsupported transport type: {transport_type}") - - @asynccontextmanager - async def _http_client_context(self): - """ - HTTP client context manager using httpx. - - Yields: - Tuple of (read, write) functions - """ - if not HTTPX_AVAILABLE: - raise ImportError("HTTPX is required for HTTP transport") - - async with httpx.AsyncClient(timeout=self.config.timeout) as client: - # Create read/write functions for HTTP transport - async def read(): - # Implement HTTP read logic for MCP - try: - response = await client.get(self.config.url) - response.raise_for_status() - return response.text - except Exception as e: - logger.error(f"HTTP read error: {e}") - raise MCPConnectionError(f"HTTP read failed: {e}") - - async def write(data): - # Implement HTTP write logic for MCP - try: - response = await client.post( - self.config.url, - json=data, - headers=self.config.headers or {} - ) - response.raise_for_status() - return response.json() - except Exception as e: - logger.error(f"HTTP write error: {e}") - raise MCPConnectionError(f"HTTP write failed: {e}") - - yield read, write - - async def get_tools(self, format: Literal["mcp", "openai"] = "openai") -> List[Dict[str, Any]]: - """ - Get available tools from the MCP server. - - Args: - format: Output format for tools - - Returns: - List of available tools - """ - async with self.get_client_context() as (read, write): - async with ClientSession(read, write) as session: - await session.initialize() - tools = await session.list_tools() - - if format == "openai": - return [self._convert_mcp_tool_to_openai(tool) for tool in tools.tools] - else: - return [tool.model_dump() for tool in tools.tools] - - def _convert_mcp_tool_to_openai(self, mcp_tool) -> Dict[str, Any]: - """ - Convert MCP tool to OpenAI format. - - Args: - mcp_tool: MCP tool object - - Returns: - OpenAI-compatible tool format - """ - return { - "type": "function", - "function": { - "name": mcp_tool.name, - "description": mcp_tool.description or "", - "parameters": mcp_tool.inputSchema - } - } - - async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: - """ - Call a tool on the MCP server. - - Args: - tool_name: Name of the tool to call - arguments: Tool arguments - - Returns: - Tool execution result - """ - async with self.get_client_context() as (read, write): - async with ClientSession(read, write) as session: - await session.initialize() - result = await session.call_tool(name=tool_name, arguments=arguments) - return result.model_dump() - - async def call_tool_streaming(self, tool_name: str, arguments: Dict[str, Any]) -> AsyncGenerator[Dict[str, Any], None]: - """ - Call a tool on the MCP server with streaming support. - - Args: - tool_name: Name of the tool to call - arguments: Tool arguments - - Yields: - Streaming tool execution results - """ - if not self.config.enable_streaming: - # Fallback to non-streaming - result = await self.call_tool(tool_name, arguments) - yield result - return - - async with self.get_client_context() as (read, write): - async with ClientSession(read, write) as session: - await session.initialize() - - # Use streaming call if available - try: - # Check if streaming method exists - if hasattr(session, 'call_tool_streaming'): - async for result in session.call_tool_streaming(name=tool_name, arguments=arguments): - yield result.model_dump() - else: - # Fallback to non-streaming if streaming not available - logger.warning("Streaming not available in MCP session, falling back to non-streaming") - result = await session.call_tool(name=tool_name, arguments=arguments) - yield result.model_dump() - except AttributeError: - # Fallback to non-streaming if streaming not available - logger.warning("Streaming method not found, falling back to non-streaming") - result = await session.call_tool(name=tool_name, arguments=arguments) - yield result.model_dump() - except Exception as e: - logger.error(f"Error in streaming tool call: {e}") - # Final fallback to non-streaming - try: - result = await session.call_tool(name=tool_name, arguments=arguments) - yield result.model_dump() - except Exception as fallback_error: - logger.error(f"Fallback tool call also failed: {fallback_error}") - raise MCPExecutionError(f"Tool call failed: {fallback_error}") - - def get_tools_sync(self, format: Literal["mcp", "openai"] = "openai") -> List[Dict[str, Any]]: - """ - Synchronous version of get_tools. - - Args: - format: Output format for tools - - Returns: - List of available tools - """ - with get_or_create_event_loop() as loop: - try: - return loop.run_until_complete(self.get_tools(format=format)) - except Exception as e: - logger.error(f"Error in get_tools_sync: {str(e)}") - raise MCPExecutionError(f"Failed to get tools sync: {str(e)}") - - def call_tool_sync(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: - """ - Synchronous version of call_tool. - - Args: - tool_name: Name of the tool to call - arguments: Tool arguments - - Returns: - Tool execution result - """ - with get_or_create_event_loop() as loop: - try: - return loop.run_until_complete(self.call_tool(tool_name, arguments)) - except Exception as e: - logger.error(f"Error in call_tool_sync: {str(e)}") - raise MCPExecutionError(f"Failed to call tool sync: {str(e)}") - - def call_tool_streaming_sync(self, tool_name: str, arguments: Dict[str, Any]) -> List[Dict[str, Any]]: - """ - Synchronous version of call_tool_streaming. - - Args: - tool_name: Name of the tool to call - arguments: Tool arguments - - Returns: - List of streaming tool execution results - """ - with get_or_create_event_loop() as loop: - try: - results = [] - async def collect_streaming_results(): - async for result in self.call_tool_streaming(tool_name, arguments): - results.append(result) - loop.run_until_complete(collect_streaming_results()) - return results - except Exception as e: - logger.error(f"Error in call_tool_streaming_sync: {str(e)}") - raise MCPExecutionError(f"Failed to call tool streaming sync: {str(e)}") - - -# Enhanced functions that work with the unified client -def get_mcp_tools_unified( - config: Union[UnifiedTransportConfig, MCPConnection, str], - format: Literal["mcp", "openai"] = "openai" -) -> List[Dict[str, Any]]: - """ - Get MCP tools using the unified client. - - Args: - config: Transport configuration - format: Output format for tools - - Returns: - List of available tools - """ - client = MCPUnifiedClient(config) - return client.get_tools_sync(format=format) - + streaming_chunk_size: Optional[int] = Field( + default=1024, + description="Chunk size for streaming operations" + ) -async def aget_mcp_tools_unified( - config: Union[UnifiedTransportConfig, MCPConnection, str], - format: Literal["mcp", "openai"] = "openai" -) -> List[Dict[str, Any]]: - """ - Async version of get_mcp_tools_unified. - - Args: - config: Transport configuration - format: Output format for tools - - Returns: - List of available tools - """ - client = MCPUnifiedClient(config) - return await client.get_tools(format=format) + class Config: + arbitrary_types_allowed = True -def execute_tool_call_unified( - config: Union[UnifiedTransportConfig, MCPConnection, str], - tool_name: str, - arguments: Dict[str, Any] -) -> Dict[str, Any]: +class MCPErrorResponse(BaseModel): """ - Execute a tool call using the unified client. - - Args: - config: Transport configuration - tool_name: Name of the tool to call - arguments: Tool arguments - - Returns: - Tool execution result + Standardized error response for MCP operations. """ - client = MCPUnifiedClient(config) - return client.call_tool_sync(tool_name, arguments) - + + error: str = Field( + description="Error message" + ) + + error_type: str = Field( + description="Type of error (e.g., 'connection', 'timeout', 'validation')" + ) + + details: Optional[Dict[str, Any]] = Field( + default=None, + description="Additional error details" + ) + + timestamp: Optional[str] = Field( + default=None, + description="Timestamp when the error occurred" + ) -async def aexecute_tool_call_unified( - config: Union[UnifiedTransportConfig, MCPConnection, str], - tool_name: str, - arguments: Dict[str, Any] -) -> Dict[str, Any]: - """ - Async version of execute_tool_call_unified. - - Args: - config: Transport configuration - tool_name: Name of the tool to call - arguments: Tool arguments - - Returns: - Tool execution result - """ - client = MCPUnifiedClient(config) - return await client.call_tool(tool_name, arguments) + class Config: + arbitrary_types_allowed = True -def execute_tool_call_streaming_unified( - config: Union[UnifiedTransportConfig, MCPConnection, str], - tool_name: str, - arguments: Dict[str, Any] -) -> List[Dict[str, Any]]: +class MCPToolCall(BaseModel): """ - Execute a tool call with streaming using the unified client. - - Args: - config: Transport configuration - tool_name: Name of the tool to call - arguments: Tool arguments - - Returns: - List of streaming tool execution results + Standardized tool call request. """ - client = MCPUnifiedClient(config) - return client.call_tool_streaming_sync(tool_name, arguments) - + + tool_name: str = Field( + description="Name of the tool to call" + ) + + arguments: Dict[str, Any] = Field( + default={}, + description="Arguments to pass to the tool" + ) + + timeout: Optional[int] = Field( + default=None, + description="Timeout for this specific tool call" + ) + + retry_attempts: Optional[int] = Field( + default=None, + description="Retry attempts for this specific tool call" + ) + + # Streaming settings for tool calls + enable_streaming: Optional[bool] = Field( + default=True, + description="Whether to enable streaming for this tool call" + ) + + streaming_timeout: Optional[int] = Field( + default=None, + description="Timeout for streaming operations in this tool call" + ) -async def aexecute_tool_call_streaming_unified( - config: Union[UnifiedTransportConfig, MCPConnection, str], - tool_name: str, - arguments: Dict[str, Any] -) -> AsyncGenerator[Dict[str, Any], None]: - """ - Async version of execute_tool_call_streaming_unified. - - Args: - config: Transport configuration - tool_name: Name of the tool to call - arguments: Tool arguments - - Yields: - Streaming tool execution results - """ - client = MCPUnifiedClient(config) - async for result in client.call_tool_streaming(tool_name, arguments): - yield result + class Config: + arbitrary_types_allowed = True -# Helper functions for creating configurations -def create_stdio_config(command: List[str], **kwargs) -> UnifiedTransportConfig: +class MCPToolResult(BaseModel): """ - Create configuration for stdio transport. - - Args: - command: Command and arguments to run - **kwargs: Additional configuration options - - Returns: - Transport configuration + Standardized tool call result. """ - return UnifiedTransportConfig( - transport_type="stdio", - command=command, - enable_streaming=True, - **kwargs + + success: bool = Field( + description="Whether the tool call was successful" ) - - -def create_http_config(url: str, headers: Optional[Dict[str, str]] = None, **kwargs) -> UnifiedTransportConfig: - """ - Create configuration for HTTP transport. - - Args: - url: Server URL - headers: Optional HTTP headers - **kwargs: Additional configuration options - - Returns: - Transport configuration - """ - return UnifiedTransportConfig( - transport_type="http", - url=url, - headers=headers, - enable_streaming=True, - **kwargs + + result: Optional[Any] = Field( + default=None, + description="Result of the tool call" ) - - -def create_streamable_http_config(url: str, headers: Optional[Dict[str, str]] = None, **kwargs) -> UnifiedTransportConfig: - """ - Create configuration for streamable HTTP transport. - - Args: - url: Server URL - headers: Optional HTTP headers - **kwargs: Additional configuration options - - Returns: - Transport configuration - """ - return UnifiedTransportConfig( - transport_type="streamable_http", - url=url, - headers=headers, - enable_streaming=True, - **kwargs + + error: Optional[str] = Field( + default=None, + description="Error message if the call failed" ) + + execution_time: Optional[float] = Field( + default=None, + description="Execution time in seconds" + ) + + metadata: Optional[Dict[str, Any]] = Field( + default=None, + description="Additional metadata about the execution" + ) + + # Streaming result metadata + is_streaming: Optional[bool] = Field( + default=False, + description="Whether this result is from a streaming operation" + ) + + stream_chunk: Optional[int] = Field( + default=None, + description="Chunk number for streaming results" + ) + + stream_complete: Optional[bool] = Field( + default=False, + description="Whether the streaming operation is complete" + ) + + class Config: + arbitrary_types_allowed = True -def create_sse_config(url: str, headers: Optional[Dict[str, str]] = None, **kwargs) -> UnifiedTransportConfig: +class MCPStreamingConfig(BaseModel): """ - Create configuration for SSE transport. - - Args: - url: Server URL - headers: Optional HTTP headers - **kwargs: Additional configuration options - - Returns: - Transport configuration + Configuration for MCP streaming operations. """ - return UnifiedTransportConfig( - transport_type="sse", - url=url, - headers=headers, - enable_streaming=True, - **kwargs + + enable_streaming: bool = Field( + default=True, + description="Whether to enable streaming support" ) + + streaming_timeout: Optional[int] = Field( + default=None, + description="Timeout for streaming operations in seconds" + ) + + buffer_size: int = Field( + default=1024, + description="Buffer size for streaming operations" + ) + + chunk_size: int = Field( + default=1024, + description="Chunk size for streaming operations" + ) + + max_stream_duration: Optional[int] = Field( + default=None, + description="Maximum duration for streaming operations in seconds" + ) + + enable_compression: bool = Field( + default=False, + description="Whether to enable compression for streaming" + ) + + compression_level: int = Field( + default=6, + description="Compression level (1-9)" + ) + + class Config: + arbitrary_types_allowed = True -def create_auto_config(url: str, **kwargs) -> UnifiedTransportConfig: +class UnifiedTransportConfig(BaseModel): """ - Create configuration with auto-detection. - - Args: - url: Server URL or command - **kwargs: Additional configuration options - - Returns: - Transport configuration + Unified configuration for MCP transport types. + + This extends the existing MCPConnection schema with additional + transport-specific options and auto-detection capabilities. + Includes streaming support for real-time communication. """ - return UnifiedTransportConfig( - transport_type="auto", - url=url, - auto_detect=True, - enable_streaming=True, - **kwargs + + # Transport type - can be auto-detected + transport_type: Literal["stdio", "http", "streamable_http", "sse", "auto"] = Field( + default="auto", + description="The transport type to use. 'auto' enables auto-detection." + ) + + # Connection details + url: Optional[str] = Field( + default=None, + description="URL for HTTP-based transports or stdio command path" + ) + + # STDIO specific + command: Optional[List[str]] = Field( + default=None, + description="Command and arguments for stdio transport" + ) + + # HTTP specific + headers: Optional[Dict[str, str]] = Field( + default=None, + description="HTTP headers for HTTP-based transports" + ) + + # Common settings + timeout: int = Field( + default=30, + description="Timeout in seconds" + ) + + authorization_token: Optional[str] = Field( + default=None, + description="Authentication token for accessing the MCP server" + ) + + # Auto-detection settings + auto_detect: bool = Field( + default=True, + description="Whether to auto-detect transport type from URL" + ) + + # Fallback settings + fallback_transport: Literal["stdio", "http", "streamable_http", "sse"] = Field( + default="sse", + description="Fallback transport if auto-detection fails" + ) + + # Streaming settings + enable_streaming: bool = Field( + default=True, + description="Whether to enable streaming support" + ) + + streaming_timeout: Optional[int] = Field( + default=None, + description="Timeout for streaming operations" + ) + + streaming_callback: Optional[Callable[[str], None]] = Field( + default=None, + description="Optional callback function for streaming chunks" ) - -# Example usage -async def example_unified_usage(): - """Example of how to use the unified MCP client with streaming support.""" - - # Example 1: Auto-detection from URL with streaming - config1 = create_auto_config("http://localhost:8000/mcp") - client1 = MCPUnifiedClient(config1) - - # Example 2: Explicit stdio transport with streaming - config2 = create_stdio_config(["python", "path/to/mcp/server.py"]) - client2 = MCPUnifiedClient(config2) - - # Example 3: Explicit streamable HTTP transport with streaming - config3 = create_streamable_http_config("http://localhost:8001/mcp") - client3 = MCPUnifiedClient(config3) - - # Get tools from different transports - try: - tools1 = await client1.get_tools() - print(f"Auto-detected transport tools: {len(tools1)}") - - tools2 = await client2.get_tools() - print(f"STDIO transport tools: {len(tools2)}") - - tools3 = await client3.get_tools() - print(f"Streamable HTTP transport tools: {len(tools3)}") - - # Example streaming tool call - if tools1: - tool_name = tools1[0]["function"]["name"] - print(f"Calling tool with streaming: {tool_name}") - - async for result in client1.call_tool_streaming(tool_name, {}): - print(f"Streaming result: {result}") - - except Exception as e: - logger.error(f"Error getting tools: {e}") - - -# Export constants for availability checking -MCP_STREAMING_AVAILABLE = MCP_AVAILABLE and STREAMABLE_HTTP_AVAILABLE - -# Export all public functions and classes -__all__ = [ - "MCPUnifiedClient", - "UnifiedTransportConfig", - "create_auto_config", - "create_http_config", - "create_streamable_http_config", - "create_stdio_config", - "create_sse_config", - "MCP_STREAMING_AVAILABLE", - "STREAMABLE_HTTP_AVAILABLE", - "HTTPX_AVAILABLE", - "MCP_AVAILABLE", - "call_tool_streaming_sync", - "execute_tool_call_streaming_unified", -] - - -if __name__ == "__main__": - # Run example - asyncio.run(example_unified_usage()) + class Config: + arbitrary_types_allowed = True