diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 3f726d24..e4074d18 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -77,6 +77,20 @@ from swarms.tools.mcp_client_call import ( get_mcp_tools_sync, get_tools_for_multiple_mcp_servers, ) +# Import the unified MCP client for streaming support +try: + from swarms.tools.mcp_unified_client import ( + UnifiedMCPClient, + UnifiedTransportConfig, + call_tool_streaming, + call_tool_streaming_sync, + execute_tool_call_streaming_unified, + ) + MCP_STREAMING_AVAILABLE = True +except ImportError: + MCP_STREAMING_AVAILABLE = False + logger.warning("MCP streaming support not available - install mcp[streamable-http] for full streaming capabilities") + from swarms.schemas.mcp_schemas import ( MCPConnection, ) @@ -250,6 +264,13 @@ class Agent: artifacts_output_path (str): The artifacts output path artifacts_file_extension (str): The artifacts file extension (.pdf, .md, .txt, ) scheduled_run_date (datetime): The date and time to schedule the task + mcp_url (Union[str, MCPConnection]): MCP server URL or connection object + mcp_urls (List[str]): List of multiple MCP server URLs + mcp_config (MCPConnection): MCP connection configuration + mcp_streaming_enabled (bool): Enable MCP streaming functionality (default: False) + mcp_streaming_callback (Callable): Optional callback for streaming chunks + mcp_streaming_timeout (int): Timeout for MCP streaming in seconds (default: 30) + mcp_enable_streaming (bool): Enable streaming for MCP tools (default: True) Methods: run: Run the agent @@ -282,6 +303,10 @@ class Agent: run_async_concurrent: Run the agent asynchronously and concurrently construct_dynamic_prompt: Construct the dynamic prompt handle_artifacts: Handle artifacts + enable_mcp_streaming: Enable MCP streaming functionality + disable_mcp_streaming: Disable MCP streaming functionality + is_mcp_streaming_available: Check if MCP streaming is available + get_mcp_streaming_status: Get MCP streaming configuration status Examples: @@ -296,6 +321,20 @@ class Agent: >>> response = agent.run("Tell me a long story.") # Will stream in real-time >>> print(response) # Final complete response + >>> # MCP streaming example + >>> agent = Agent( + ... model_name="gpt-4o", + ... mcp_url="http://localhost:8000/mcp", + ... mcp_streaming_enabled=True, + ... mcp_streaming_timeout=60 + ... ) + >>> # Enable streaming with custom callback + >>> def streaming_callback(chunk: str): + ... print(f"Streaming chunk: {chunk}") + >>> agent.enable_mcp_streaming(timeout=60, callback=streaming_callback) + >>> response = agent.run("Use the MCP tools to analyze this data.") + >>> print(response) # Will show streaming MCP tool execution + """ def __init__( @@ -432,6 +471,11 @@ class Agent: tool_retry_attempts: int = 3, speed_mode: str = None, reasoning_prompt_on: bool = True, + # MCP Streaming parameters + mcp_streaming_enabled: bool = False, + mcp_streaming_callback: Optional[Callable[[str], None]] = None, + mcp_streaming_timeout: int = 30, + mcp_enable_streaming: bool = True, *args, **kwargs, ): @@ -574,6 +618,10 @@ class Agent: self.tool_retry_attempts = tool_retry_attempts self.speed_mode = speed_mode self.reasoning_prompt_on = reasoning_prompt_on + self.mcp_streaming_enabled = mcp_streaming_enabled + self.mcp_streaming_callback = mcp_streaming_callback + self.mcp_streaming_timeout = mcp_streaming_timeout + self.mcp_enable_streaming = mcp_enable_streaming # Initialize the feedback self.feedback = [] @@ -1294,7 +1342,7 @@ class Agent: except KeyboardInterrupt as error: self._handle_run_error(error) - def __handle_run_error(self, error: any): + def __handle_run_error(self, error: Any): import traceback if self.autosave is True: @@ -1318,7 +1366,7 @@ class Agent: raise error - def _handle_run_error(self, error: any): + def _handle_run_error(self, error: Any): # Handle error directly instead of using daemon thread # to ensure proper exception propagation self.__handle_run_error(error) @@ -2969,81 +3017,197 @@ class Agent: ) def mcp_tool_handling( - self, response: any, current_loop: Optional[int] = 0 + self, response: Any, current_loop: Optional[int] = 0 ): + """ + Enhanced MCP tool handling with streaming support. + + This method handles MCP tool execution with optional streaming capabilities. + It supports both traditional MCP calls and streaming MCP calls based on configuration. + + Args: + response: The response from the LLM that may contain tool calls + current_loop: The current iteration loop number for logging + """ try: + # Check if streaming is enabled and available + use_streaming = ( + self.mcp_streaming_enabled + and MCP_STREAMING_AVAILABLE + and self.mcp_enable_streaming + ) + + if use_streaming: + tool_response = self._handle_mcp_streaming(response, current_loop) + else: + tool_response = self._handle_mcp_traditional(response, current_loop) + + # Process the tool response + self._process_mcp_response(tool_response, current_loop) + + except AgentMCPToolError as e: + logger.error(f"Error in MCP tool: {e}") + raise e + except Exception as e: + logger.error(f"Unexpected error in MCP tool handling: {e}") + raise AgentMCPToolError(f"MCP tool execution failed: {str(e)}") + def _handle_mcp_streaming(self, response: Any, current_loop: int) -> Any: + """ + Handle MCP tool execution with streaming support. + + Args: + response: The response from the LLM + current_loop: Current loop iteration + + Returns: + The streaming tool response + """ + try: + # Create unified transport config for streaming + config = UnifiedTransportConfig( + enable_streaming=True, + streaming_timeout=self.mcp_streaming_timeout, + streaming_callback=self.mcp_streaming_callback + ) + if exists(self.mcp_url): - # Execute the tool call - tool_response = asyncio.run( - execute_tool_call_simple( - response=response, - server_path=self.mcp_url, + # Single MCP URL with streaming + if self.print_on: + formatter.print_panel( + f"Executing MCP tool with streaming: {self.mcp_url}", + title="[MCP] Streaming Tool Execution", + style="blue" ) + + tool_response = call_tool_streaming_sync( + response=response, + server_path=self.mcp_url, + config=config ) + elif exists(self.mcp_config): - # Execute the tool call - tool_response = asyncio.run( - execute_tool_call_simple( - response=response, - connection=self.mcp_config, + # MCP config with streaming + if self.print_on: + formatter.print_panel( + f"Executing MCP tool with streaming: {self.mcp_config}", + title="[MCP] Streaming Tool Execution", + style="blue" ) + + tool_response = call_tool_streaming_sync( + response=response, + connection=self.mcp_config, + config=config ) + elif exists(self.mcp_urls): + # Multiple MCP URLs - use traditional method for now + # (streaming for multiple servers not yet implemented) + logger.warning("Streaming not yet supported for multiple MCP servers, falling back to traditional method") tool_response = execute_multiple_tools_on_multiple_mcp_servers_sync( responses=response, urls=self.mcp_urls, output_type="json", ) - # tool_response = format_data_structure(tool_response) - - # print(f"Multiple MCP Tool Response: {tool_response}") else: raise AgentMCPConnectionError( "mcp_url must be either a string URL or MCPConnection object" ) + + return tool_response + + except Exception as e: + logger.error(f"Error in MCP streaming: {e}") + # Fallback to traditional method + logger.info("Falling back to traditional MCP method") + return self._handle_mcp_traditional(response, current_loop) - # Get the text content from the tool response - # execute_tool_call_simple returns a string directly, not an object with content attribute - text_content = f"MCP Tool Response: \n\n {json.dumps(tool_response, indent=2)}" - - if self.print_on is True: - formatter.print_panel( - content=text_content, - title="MCP Tool Response: 🛠️", - style="green", + def _handle_mcp_traditional(self, response: Any, current_loop: int) -> Any: + """ + Handle MCP tool execution using traditional (non-streaming) method. + + Args: + response: The response from the LLM + current_loop: Current loop iteration + + Returns: + The tool response + """ + if exists(self.mcp_url): + # Execute the tool call + tool_response = asyncio.run( + execute_tool_call_simple( + response=response, + server_path=self.mcp_url, + ) + ) + elif exists(self.mcp_config): + # Execute the tool call + tool_response = asyncio.run( + execute_tool_call_simple( + response=response, + connection=self.mcp_config, ) + ) + elif exists(self.mcp_urls): + tool_response = execute_multiple_tools_on_multiple_mcp_servers_sync( + responses=response, + urls=self.mcp_urls, + output_type="json", + ) + else: + raise AgentMCPConnectionError( + "mcp_url must be either a string URL or MCPConnection object" + ) + + return tool_response - # Add to the memory - self.short_memory.add( - role="Tool Executor", + def _process_mcp_response(self, tool_response: Any, current_loop: int) -> None: + """ + Process the MCP tool response and add it to memory. + + Args: + tool_response: The response from the MCP tool + current_loop: Current loop iteration + """ + # Get the text content from the tool response + text_content = f"MCP Tool Response: \n\n {json.dumps(tool_response, indent=2)}" + + if self.print_on is True: + formatter.print_panel( content=text_content, + title="MCP Tool Response: [TOOLS]", + style="green", ) - # Create a temporary LLM instance without tools for the follow-up call - try: - temp_llm = self.temp_llm_instance_for_tool_summary() - - summary = temp_llm.run( - task=self.short_memory.get_str() - ) - except Exception as e: - logger.error( - f"Error calling LLM after MCP tool execution: {e}" - ) - # Fallback: provide a default summary - summary = "I successfully executed the MCP tool and retrieved the information above." + # Add to the memory + self.short_memory.add( + role="Tool Executor", + content=text_content, + ) - if self.print_on is True: - self.pretty_print(summary, loop_count=current_loop) + # Create a temporary LLM instance without tools for the follow-up call + try: + temp_llm = self.temp_llm_instance_for_tool_summary() - # Add to the memory - self.short_memory.add( - role=self.agent_name, content=summary + summary = temp_llm.run( + task=self.short_memory.get_str() ) - except AgentMCPToolError as e: - logger.error(f"Error in MCP tool: {e}") - raise e + except Exception as e: + logger.error( + f"Error calling LLM after MCP tool execution: {e}" + ) + # Fallback: provide a default summary + summary = "I successfully executed the MCP tool and retrieved the information above." + + if self.print_on is True: + self.pretty_print(summary, loop_count=current_loop) + + # Add to the memory + self.short_memory.add( + role=self.agent_name, content=summary + ) def temp_llm_instance_for_tool_summary(self): return LiteLLM( @@ -3058,7 +3222,65 @@ class Agent: api_key=self.llm_api_key, ) - def execute_tools(self, response: any, loop_count: int): + def enable_mcp_streaming(self, timeout: int = 30, callback: Optional[Callable[[str], None]] = None) -> None: + """ + Enable MCP streaming functionality. + + Args: + timeout: Streaming timeout in seconds (default: 30) + callback: Optional callback function for streaming chunks + """ + if not MCP_STREAMING_AVAILABLE: + logger.warning("MCP streaming not available - install mcp[streamable-http] for streaming support") + return + + self.mcp_streaming_enabled = True + self.mcp_enable_streaming = True + self.mcp_streaming_timeout = timeout + + if callback: + self.mcp_streaming_callback = callback + + logger.info(f"MCP streaming enabled with timeout: {timeout}s") + + def disable_mcp_streaming(self) -> None: + """Disable MCP streaming functionality.""" + self.mcp_streaming_enabled = False + self.mcp_enable_streaming = False + logger.info("MCP streaming disabled") + + def is_mcp_streaming_available(self) -> bool: + """ + Check if MCP streaming is available and enabled. + + Returns: + bool: True if streaming is available and enabled + """ + return ( + MCP_STREAMING_AVAILABLE + and self.mcp_streaming_enabled + and self.mcp_enable_streaming + ) + + def get_mcp_streaming_status(self) -> Dict[str, Any]: + """ + Get the current MCP streaming configuration status. + + Returns: + Dict containing streaming configuration details + """ + return { + "streaming_available": MCP_STREAMING_AVAILABLE, + "streaming_enabled": self.mcp_streaming_enabled, + "enable_streaming": self.mcp_enable_streaming, + "streaming_timeout": self.mcp_streaming_timeout, + "has_callback": self.mcp_streaming_callback is not None, + "mcp_url": self.mcp_url, + "mcp_config": self.mcp_config, + "mcp_urls": self.mcp_urls + } + + def execute_tools(self, response: Any, loop_count: int): # Handle None response gracefully if response is None: logger.warning( @@ -3254,7 +3476,7 @@ class Agent: f"Failed to find correct answer '{correct_answer}' after {max_attempts} attempts" ) - def tool_execution_retry(self, response: any, loop_count: int): + def tool_execution_retry(self, response: Any, loop_count: int): """ Execute tools with retry logic for handling failures. @@ -3264,9 +3486,9 @@ class Agent: using the configured retry attempts. Args: - response (any): The response from the LLM that may contain tool calls to execute. - Can be None if the LLM failed to provide a valid response. - loop_count (int): The current iteration loop number for logging and debugging purposes. + response: The response from the LLM that may contain tool calls to execute. + Can be None if the LLM failed to provide a valid response. + loop_count: The current iteration loop number for logging and debugging purposes. Returns: None