Update agent.py

pull/1005/merge^2
CI-DEV authored 2 months ago, committed by GitHub
parent baa2e5d99d
commit 90d8743796

@@ -77,6 +77,20 @@ from swarms.tools.mcp_client_call import (
     get_mcp_tools_sync,
     get_tools_for_multiple_mcp_servers,
 )
+
+# Import the unified MCP client for streaming support
+try:
+    from swarms.tools.mcp_unified_client import (
+        UnifiedMCPClient,
+        UnifiedTransportConfig,
+        call_tool_streaming,
+        call_tool_streaming_sync,
+        execute_tool_call_streaming_unified,
+    )
+    MCP_STREAMING_AVAILABLE = True
+except ImportError:
+    MCP_STREAMING_AVAILABLE = False
+    logger.warning("MCP streaming support not available - install mcp[streamable-http] for full streaming capabilities")
 from swarms.schemas.mcp_schemas import (
     MCPConnection,
 )
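The guarded import makes streaming an optional extra: the rest of agent.py branches on `MCP_STREAMING_AVAILABLE` rather than re-attempting the import at each call site. A minimal sketch of a caller consuming that flag, assuming it sits in the same module as the guarded import above (the helper itself is illustrative, not part of the commit; the fallback reuses `execute_tool_call_simple`, the pre-existing client this diff also calls):

```python
import asyncio
from typing import Any

from swarms.tools.mcp_client_call import execute_tool_call_simple

def call_mcp_tool(response: Any, server_path: str) -> Any:
    """Prefer the streaming client when available, else the classic one."""
    if MCP_STREAMING_AVAILABLE:
        # Streaming path, using the names from the guarded import above.
        config = UnifiedTransportConfig(enable_streaming=True)
        return call_tool_streaming_sync(
            response=response, server_path=server_path, config=config
        )
    # Non-streaming fallback: the same client agent.py already uses.
    return asyncio.run(
        execute_tool_call_simple(response=response, server_path=server_path)
    )
```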
@@ -250,6 +264,13 @@ class Agent:
         artifacts_output_path (str): The artifacts output path
         artifacts_file_extension (str): The artifacts file extension (.pdf, .md, .txt, )
         scheduled_run_date (datetime): The date and time to schedule the task
+        mcp_url (Union[str, MCPConnection]): MCP server URL or connection object
+        mcp_urls (List[str]): List of multiple MCP server URLs
+        mcp_config (MCPConnection): MCP connection configuration
+        mcp_streaming_enabled (bool): Enable MCP streaming functionality (default: False)
+        mcp_streaming_callback (Callable): Optional callback for streaming chunks
+        mcp_streaming_timeout (int): Timeout for MCP streaming in seconds (default: 30)
+        mcp_enable_streaming (bool): Enable streaming for MCP tools (default: True)
 
     Methods:
         run: Run the agent
@@ -282,6 +303,10 @@ class Agent:
         run_async_concurrent: Run the agent asynchronously and concurrently
         construct_dynamic_prompt: Construct the dynamic prompt
         handle_artifacts: Handle artifacts
+        enable_mcp_streaming: Enable MCP streaming functionality
+        disable_mcp_streaming: Disable MCP streaming functionality
+        is_mcp_streaming_available: Check if MCP streaming is available
+        get_mcp_streaming_status: Get MCP streaming configuration status
 
     Examples:
@@ -296,6 +321,20 @@ class Agent:
         >>> response = agent.run("Tell me a long story.") # Will stream in real-time
         >>> print(response) # Final complete response
 
+        >>> # MCP streaming example
+        >>> agent = Agent(
+        ...     model_name="gpt-4o",
+        ...     mcp_url="http://localhost:8000/mcp",
+        ...     mcp_streaming_enabled=True,
+        ...     mcp_streaming_timeout=60
+        ... )
+        >>> # Enable streaming with custom callback
+        >>> def streaming_callback(chunk: str):
+        ...     print(f"Streaming chunk: {chunk}")
+        >>> agent.enable_mcp_streaming(timeout=60, callback=streaming_callback)
+        >>> response = agent.run("Use the MCP tools to analyze this data.")
+        >>> print(response) # Will show streaming MCP tool execution
+
     """
 
     def __init__(
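As a plain script, the docstring example above reads roughly as follows (a sketch assuming the package's top-level `Agent` export and a local MCP server at the placeholder URL from the docstring):

```python
from swarms import Agent  # assumes the top-level export of the Agent class

def streaming_callback(chunk: str) -> None:
    # Receives each streamed chunk of MCP tool output as it arrives.
    print(f"Streaming chunk: {chunk}")

agent = Agent(
    model_name="gpt-4o",
    mcp_url="http://localhost:8000/mcp",  # placeholder: a local MCP server
    mcp_streaming_enabled=True,
    mcp_streaming_timeout=60,
)

# Attach the callback at runtime via the new helper method.
agent.enable_mcp_streaming(timeout=60, callback=streaming_callback)

response = agent.run("Use the MCP tools to analyze this data.")
print(response)  # final answer; chunks were printed during tool execution
```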
@@ -432,6 +471,11 @@ class Agent:
         tool_retry_attempts: int = 3,
         speed_mode: str = None,
         reasoning_prompt_on: bool = True,
+        # MCP Streaming parameters
+        mcp_streaming_enabled: bool = False,
+        mcp_streaming_callback: Optional[Callable[[str], None]] = None,
+        mcp_streaming_timeout: int = 30,
+        mcp_enable_streaming: bool = True,
         *args,
         **kwargs,
     ):
@@ -574,6 +618,10 @@ class Agent:
         self.tool_retry_attempts = tool_retry_attempts
         self.speed_mode = speed_mode
         self.reasoning_prompt_on = reasoning_prompt_on
+        self.mcp_streaming_enabled = mcp_streaming_enabled
+        self.mcp_streaming_callback = mcp_streaming_callback
+        self.mcp_streaming_timeout = mcp_streaming_timeout
+        self.mcp_enable_streaming = mcp_enable_streaming
 
         # Initialize the feedback
         self.feedback = []
@@ -1294,7 +1342,7 @@ class Agent:
         except KeyboardInterrupt as error:
             self._handle_run_error(error)
 
-    def __handle_run_error(self, error: any):
+    def __handle_run_error(self, error: Any):
         import traceback
 
         if self.autosave is True:
@@ -1318,7 +1366,7 @@ class Agent:
         raise error
 
-    def _handle_run_error(self, error: any):
+    def _handle_run_error(self, error: Any):
         # Handle error directly instead of using daemon thread
         # to ensure proper exception propagation
         self.__handle_run_error(error)
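The `any` to `Any` changes in these and the later hunks are not cosmetic: lowercase `any` is the builtin function, not a type, so the old annotations were meaningless to type checkers (though harmless at runtime, since the annotations are never evaluated as types here). A two-line illustration:

```python
from typing import Any

def before(error: any) -> None: ...  # annotates with the builtin any() function; mypy rejects this
def after(error: Any) -> None: ...   # typing.Any, the intended "accept anything" annotation
```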
@@ -2969,81 +3017,197 @@ class Agent:
             )
 
     def mcp_tool_handling(
-        self, response: any, current_loop: Optional[int] = 0
+        self, response: Any, current_loop: Optional[int] = 0
     ):
+        """
+        Enhanced MCP tool handling with streaming support.
+
+        This method handles MCP tool execution with optional streaming capabilities.
+        It supports both traditional MCP calls and streaming MCP calls based on configuration.
+
+        Args:
+            response: The response from the LLM that may contain tool calls
+            current_loop: The current iteration loop number for logging
+        """
         try:
-            if exists(self.mcp_url):
-                # Execute the tool call
-                tool_response = asyncio.run(
-                    execute_tool_call_simple(
-                        response=response,
-                        server_path=self.mcp_url,
-                    )
-                )
-            elif exists(self.mcp_config):
-                # Execute the tool call
-                tool_response = asyncio.run(
-                    execute_tool_call_simple(
-                        response=response,
-                        connection=self.mcp_config,
-                    )
-                )
-            elif exists(self.mcp_urls):
-                tool_response = execute_multiple_tools_on_multiple_mcp_servers_sync(
-                    responses=response,
-                    urls=self.mcp_urls,
-                    output_type="json",
-                )
-                # tool_response = format_data_structure(tool_response)
-                # print(f"Multiple MCP Tool Response: {tool_response}")
-            else:
-                raise AgentMCPConnectionError(
-                    "mcp_url must be either a string URL or MCPConnection object"
-                )
-
-            # Get the text content from the tool response
-            # execute_tool_call_simple returns a string directly, not an object with content attribute
-            text_content = f"MCP Tool Response: \n\n {json.dumps(tool_response, indent=2)}"
-
-            if self.print_on is True:
-                formatter.print_panel(
-                    content=text_content,
-                    title="MCP Tool Response: 🛠️",
-                    style="green",
-                )
-
-            # Add to the memory
-            self.short_memory.add(
-                role="Tool Executor",
-                content=text_content,
-            )
-
-            # Create a temporary LLM instance without tools for the follow-up call
-            try:
-                temp_llm = self.temp_llm_instance_for_tool_summary()
-                summary = temp_llm.run(
-                    task=self.short_memory.get_str()
-                )
-            except Exception as e:
-                logger.error(
-                    f"Error calling LLM after MCP tool execution: {e}"
-                )
-                # Fallback: provide a default summary
-                summary = "I successfully executed the MCP tool and retrieved the information above."
-
-            if self.print_on is True:
-                self.pretty_print(summary, loop_count=current_loop)
-
-            # Add to the memory
-            self.short_memory.add(
-                role=self.agent_name, content=summary
-            )
+            # Check if streaming is enabled and available
+            use_streaming = (
+                self.mcp_streaming_enabled
+                and MCP_STREAMING_AVAILABLE
+                and self.mcp_enable_streaming
+            )
+
+            if use_streaming:
+                tool_response = self._handle_mcp_streaming(response, current_loop)
+            else:
+                tool_response = self._handle_mcp_traditional(response, current_loop)
+
+            # Process the tool response
+            self._process_mcp_response(tool_response, current_loop)
         except AgentMCPToolError as e:
             logger.error(f"Error in MCP tool: {e}")
             raise e
+        except Exception as e:
+            logger.error(f"Unexpected error in MCP tool handling: {e}")
+            raise AgentMCPToolError(f"MCP tool execution failed: {str(e)}")
+
+    def _handle_mcp_streaming(self, response: Any, current_loop: int) -> Any:
+        """
+        Handle MCP tool execution with streaming support.
+
+        Args:
+            response: The response from the LLM
+            current_loop: Current loop iteration
+
+        Returns:
+            The streaming tool response
+        """
+        try:
+            # Create unified transport config for streaming
+            config = UnifiedTransportConfig(
+                enable_streaming=True,
+                streaming_timeout=self.mcp_streaming_timeout,
+                streaming_callback=self.mcp_streaming_callback
+            )
+
+            if exists(self.mcp_url):
+                # Single MCP URL with streaming
+                if self.print_on:
+                    formatter.print_panel(
+                        f"Executing MCP tool with streaming: {self.mcp_url}",
+                        title="[MCP] Streaming Tool Execution",
+                        style="blue"
+                    )
+                tool_response = call_tool_streaming_sync(
+                    response=response,
+                    server_path=self.mcp_url,
+                    config=config
+                )
+            elif exists(self.mcp_config):
+                # MCP config with streaming
+                if self.print_on:
+                    formatter.print_panel(
+                        f"Executing MCP tool with streaming: {self.mcp_config}",
+                        title="[MCP] Streaming Tool Execution",
+                        style="blue"
+                    )
+                tool_response = call_tool_streaming_sync(
+                    response=response,
+                    connection=self.mcp_config,
+                    config=config
+                )
+            elif exists(self.mcp_urls):
+                # Multiple MCP URLs - use traditional method for now
+                # (streaming for multiple servers not yet implemented)
+                logger.warning("Streaming not yet supported for multiple MCP servers, falling back to traditional method")
+                tool_response = execute_multiple_tools_on_multiple_mcp_servers_sync(
+                    responses=response,
+                    urls=self.mcp_urls,
+                    output_type="json",
+                )
+            else:
+                raise AgentMCPConnectionError(
+                    "mcp_url must be either a string URL or MCPConnection object"
+                )
+
+            return tool_response
+        except Exception as e:
+            logger.error(f"Error in MCP streaming: {e}")
+            # Fallback to traditional method
+            logger.info("Falling back to traditional MCP method")
+            return self._handle_mcp_traditional(response, current_loop)
+
+    def _handle_mcp_traditional(self, response: Any, current_loop: int) -> Any:
+        """
+        Handle MCP tool execution using the traditional (non-streaming) method.
+
+        Args:
+            response: The response from the LLM
+            current_loop: Current loop iteration
+
+        Returns:
+            The tool response
+        """
+        if exists(self.mcp_url):
+            # Execute the tool call
+            tool_response = asyncio.run(
+                execute_tool_call_simple(
+                    response=response,
+                    server_path=self.mcp_url,
+                )
+            )
+        elif exists(self.mcp_config):
+            # Execute the tool call
+            tool_response = asyncio.run(
+                execute_tool_call_simple(
+                    response=response,
+                    connection=self.mcp_config,
+                )
+            )
+        elif exists(self.mcp_urls):
+            tool_response = execute_multiple_tools_on_multiple_mcp_servers_sync(
+                responses=response,
+                urls=self.mcp_urls,
+                output_type="json",
+            )
+        else:
+            raise AgentMCPConnectionError(
+                "mcp_url must be either a string URL or MCPConnection object"
+            )
+
+        return tool_response
+
+    def _process_mcp_response(self, tool_response: Any, current_loop: int) -> None:
+        """
+        Process the MCP tool response and add it to memory.
+
+        Args:
+            tool_response: The response from the MCP tool
+            current_loop: Current loop iteration
+        """
+        # Get the text content from the tool response
+        text_content = f"MCP Tool Response: \n\n {json.dumps(tool_response, indent=2)}"
+
+        if self.print_on is True:
+            formatter.print_panel(
+                content=text_content,
+                title="MCP Tool Response: [TOOLS]",
+                style="green",
+            )
+
+        # Add to the memory
+        self.short_memory.add(
+            role="Tool Executor",
+            content=text_content,
+        )
+
+        # Create a temporary LLM instance without tools for the follow-up call
+        try:
+            temp_llm = self.temp_llm_instance_for_tool_summary()
+            summary = temp_llm.run(
+                task=self.short_memory.get_str()
+            )
+        except Exception as e:
+            logger.error(
+                f"Error calling LLM after MCP tool execution: {e}"
+            )
+            # Fallback: provide a default summary
+            summary = "I successfully executed the MCP tool and retrieved the information above."
+
+        if self.print_on is True:
+            self.pretty_print(summary, loop_count=current_loop)
+
+        # Add to the memory
+        self.short_memory.add(
+            role=self.agent_name, content=summary
+        )
 
     def temp_llm_instance_for_tool_summary(self):
         return LiteLLM(
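The rewrite above splits one 80-line method into a dispatcher (`mcp_tool_handling`) and three helpers, with `_handle_mcp_streaming` degrading to `_handle_mcp_traditional` on any transport error. Reduced to its control flow, the policy looks like this (an illustrative sketch, not code from the commit):

```python
from typing import Any, Callable

def run_with_fallback(
    streaming_call: Callable[[], Any],
    traditional_call: Callable[[], Any],
    streaming_enabled: bool,
) -> Any:
    """Dispatch policy mirrored from mcp_tool_handling / _handle_mcp_streaming."""
    if not streaming_enabled:
        return traditional_call()
    try:
        return streaming_call()
    except Exception:
        # Same policy as the diff: log and fall back rather than failing
        # the tool call when the streaming transport errors out.
        return traditional_call()
```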
@@ -3058,7 +3222,65 @@ class Agent:
             api_key=self.llm_api_key,
         )
 
-    def execute_tools(self, response: any, loop_count: int):
+    def enable_mcp_streaming(self, timeout: int = 30, callback: Optional[Callable[[str], None]] = None) -> None:
+        """
+        Enable MCP streaming functionality.
+
+        Args:
+            timeout: Streaming timeout in seconds (default: 30)
+            callback: Optional callback function for streaming chunks
+        """
+        if not MCP_STREAMING_AVAILABLE:
+            logger.warning("MCP streaming not available - install mcp[streamable-http] for streaming support")
+            return
+
+        self.mcp_streaming_enabled = True
+        self.mcp_enable_streaming = True
+        self.mcp_streaming_timeout = timeout
+        if callback:
+            self.mcp_streaming_callback = callback
+
+        logger.info(f"MCP streaming enabled with timeout: {timeout}s")
+
+    def disable_mcp_streaming(self) -> None:
+        """Disable MCP streaming functionality."""
+        self.mcp_streaming_enabled = False
+        self.mcp_enable_streaming = False
+        logger.info("MCP streaming disabled")
+
+    def is_mcp_streaming_available(self) -> bool:
+        """
+        Check if MCP streaming is available and enabled.
+
+        Returns:
+            bool: True if streaming is available and enabled
+        """
+        return (
+            MCP_STREAMING_AVAILABLE
+            and self.mcp_streaming_enabled
+            and self.mcp_enable_streaming
+        )
+
+    def get_mcp_streaming_status(self) -> Dict[str, Any]:
+        """
+        Get the current MCP streaming configuration status.
+
+        Returns:
+            Dict containing streaming configuration details
+        """
+        return {
+            "streaming_available": MCP_STREAMING_AVAILABLE,
+            "streaming_enabled": self.mcp_streaming_enabled,
+            "enable_streaming": self.mcp_enable_streaming,
+            "streaming_timeout": self.mcp_streaming_timeout,
+            "has_callback": self.mcp_streaming_callback is not None,
+            "mcp_url": self.mcp_url,
+            "mcp_config": self.mcp_config,
+            "mcp_urls": self.mcp_urls
+        }
+
+    def execute_tools(self, response: Any, loop_count: int):
         # Handle None response gracefully
         if response is None:
             logger.warning(
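The four methods added above form a small runtime API for toggling streaming without rebuilding the agent. A usage sketch, assuming `agent` was constructed as in the docstring example earlier:

```python
# Inspect the current configuration (keys mirror get_mcp_streaming_status).
status = agent.get_mcp_streaming_status()
print(status["streaming_available"], status["streaming_timeout"])

# Toggle streaming around a latency-sensitive section.
if agent.is_mcp_streaming_available():
    agent.disable_mcp_streaming()           # force the traditional path
    agent.run("Quick lookup, no streaming needed.")
    agent.enable_mcp_streaming(timeout=45)  # re-enable with a tighter timeout
```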
@@ -3254,7 +3476,7 @@ class Agent:
                 f"Failed to find correct answer '{correct_answer}' after {max_attempts} attempts"
             )
 
-    def tool_execution_retry(self, response: any, loop_count: int):
+    def tool_execution_retry(self, response: Any, loop_count: int):
         """
         Execute tools with retry logic for handling failures.
@@ -3264,9 +3486,9 @@ class Agent:
         using the configured retry attempts.
 
         Args:
-            response (any): The response from the LLM that may contain tool calls to execute.
+            response: The response from the LLM that may contain tool calls to execute.
                 Can be None if the LLM failed to provide a valid response.
-            loop_count (int): The current iteration loop number for logging and debugging purposes.
+            loop_count: The current iteration loop number for logging and debugging purposes.
 
         Returns:
             None
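This docstring cleanup drops the stale `(any)`/`(int)` type tags now that the signature carries real annotations. The method body is outside the hunk; per its docstring it retries `execute_tools` up to `tool_retry_attempts` times (the constructor parameter shown earlier). A hedged sketch of that retry pattern (illustrative only, since the body is not shown in this diff):

```python
from typing import Any, Optional

def tool_execution_retry_sketch(agent: Any, response: Any, loop_count: int) -> None:
    """Bounded retry around execute_tools, driven by tool_retry_attempts."""
    last_error: Optional[Exception] = None
    for _attempt in range(agent.tool_retry_attempts):
        try:
            agent.execute_tools(response=response, loop_count=loop_count)
            return  # success: stop retrying
        except Exception as e:  # real code may catch narrower exception types
            last_error = e
    if last_error is not None:
        raise last_error  # all attempts exhausted
```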
