Fix MCP issues

pull/983/head
王祥宇 2 days ago
parent 440173817d
commit 298522d527

@@ -30,6 +30,7 @@ from swarms.prompts.agent_system_prompts import AGENT_SYSTEM_PROMPT_3
from swarms.prompts.multi_modal_autonomous_instruction_prompt import (
MULTI_MODAL_AUTO_AGENT_SYSTEM_PROMPT_1,
)
from swarms.tools.mcp_client_call import aget_mcp_tools
from swarms.prompts.tools import tool_sop_prompt
from swarms.schemas.agent_mcp_errors import (
AgentMCPConnectionError,
@@ -433,6 +434,7 @@ class Agent:
summarize_multiple_images: bool = False,
tool_retry_attempts: int = 3,
speed_mode: str = None,
lazy_init_mcp: bool = False,
*args,
**kwargs,
):
@@ -621,6 +623,37 @@ class Agent:
self.print_dashboard()
self.reliability_check()
self.lazy_init_mcp = lazy_init_mcp
self._mcp_tools_loaded = False
@classmethod
async def create(cls, **kwargs):
"""
Asynchronously creates an Agent instance.
This is the preferred way to create an Agent that uses MCP tools
when running in an async context (e.g., inside FastAPI or Quart).
Args:
**kwargs: All parameters accepted by Agent.__init__
Returns:
An initialized Agent instance with MCP tools loaded
"""
# Create the instance with the lazy-initialization flag set
instance = cls(lazy_init_mcp=True, **kwargs)
# Asynchronously load MCP tools (if any are configured)
if exists(instance.mcp_url) or exists(instance.mcp_urls) or exists(instance.mcp_config):
await instance.async_init_mcp_tools()
# Finish initializing the LLM
if instance.llm is None:
# Run the synchronous initializer in a worker thread (asyncio.to_thread)
instance.llm = await asyncio.to_thread(instance.llm_handling)
return instance
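# A minimal usage sketch for the factory above, e.g. from a FastAPI handler
# (the route, agent name, model, and MCP URL are illustrative placeholders,
# assuming an MCP server is reachable at that address):
#
#     @app.post("/agent/run")
#     async def run_agent(task: str):
#         agent = await Agent.create(
#             agent_name="MCP-Agent",
#             model_name="gpt-4o-mini",
#             mcp_url="http://localhost:8000/sse",
#         )
#         return await agent.arun(task)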
def rag_setup_handling(self):
return AgentRAGHandler(
@@ -774,22 +807,21 @@ class Agent:
This function checks for either a single MCP URL or multiple MCP URLs and adds the available tools
to the agent's memory. The tools are listed in JSON format.
Raises:
Exception: If there's an error accessing the MCP tools
"""
# If tools were already loaded in lazy-init mode, return the cached list immediately
if hasattr(self, '_mcp_tools_loaded') and self._mcp_tools_loaded and self.tools_list_dictionary is not None:
return self.tools_list_dictionary
try:
if exists(self.mcp_url):
tools = get_mcp_tools_sync(server_path=self.mcp_url)
elif exists(self.mcp_config):
tools = get_mcp_tools_sync(connection=self.mcp_config)
# logger.info(f"Tools: {tools}")
elif exists(self.mcp_urls):
tools = get_tools_for_multiple_mcp_servers(
urls=self.mcp_urls,
output_type="str",
)
# print(f"Tools: {tools} for {self.mcp_urls}")
else:
raise AgentMCPConnectionError(
"mcp_url must be either a string URL or MCPConnection object"
@@ -799,18 +831,71 @@ class Agent:
exists(self.mcp_url)
or exists(self.mcp_urls)
or exists(self.mcp_config)
):
if self.print_on is True:
self.pretty_print(
f"✨ [SYSTEM] Successfully integrated {len(tools)} MCP tools into agent: {self.agent_name} | Status: ONLINE | Time: {time.strftime('%H:%M:%S')}",
loop_count=0,
)
) and self.print_on is True:
self.pretty_print(
f"✨ [SYSTEM] Successfully integrated {len(tools)} MCP tools into agent: {self.agent_name} | Status: ONLINE | Time: {time.strftime('%H:%M:%S')}",
loop_count=0,
)
# Mark the tools as loaded and cache the result
self._mcp_tools_loaded = True
self.tools_list_dictionary = tools
return tools
except AgentMCPConnectionError as e:
logger.error(f"Error in MCP connection: {e}")
raise e
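# The three connection options above are mutually exclusive. A sketch of
# each, with placeholder URLs (the MCPConnection field names are an
# assumption):
#
#     agent = Agent(mcp_url="http://localhost:8000/sse")
#     agent = Agent(mcp_urls=["http://localhost:8000/sse",
#                             "http://localhost:8001/sse"])
#     agent = Agent(mcp_config=MCPConnection(url="http://localhost:8000/sse"))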
async def async_init_mcp_tools(self):
"""
Asynchronously initialize MCP tools.
This method should be used when the agent is created in an async context
to avoid event loop conflicts.
Returns:
The list of MCP tools
"""
# If tools are already loaded, return the cached list immediately
if hasattr(self, '_mcp_tools_loaded') and self._mcp_tools_loaded and self.tools_list_dictionary is not None:
return self.tools_list_dictionary
try:
if exists(self.mcp_url):
tools = await aget_mcp_tools(server_path=self.mcp_url, format="openai")
elif exists(self.mcp_config):
tools = await aget_mcp_tools(connection=self.mcp_config, format="openai")
elif exists(self.mcp_urls):
# Run the synchronous helper in a worker thread (asyncio.to_thread)
tools = await asyncio.to_thread(
get_tools_for_multiple_mcp_servers,
urls=self.mcp_urls,
output_type="str"
)
else:
raise AgentMCPConnectionError(
"mcp_url must be either a string URL or MCPConnection object"
)
if (
exists(self.mcp_url)
or exists(self.mcp_urls)
or exists(self.mcp_config)
) and self.print_on is True:
# Run the synchronous printer in a worker thread (asyncio.to_thread)
await asyncio.to_thread(
self.pretty_print,
f"✨ [SYSTEM] Successfully integrated {len(tools)} MCP tools into agent: {self.agent_name} | Status: ONLINE | Time: {time.strftime('%H:%M:%S')}",
loop_count=0
)
# Mark the tools as loaded and cache the result
self._mcp_tools_loaded = True
self.tools_list_dictionary = tools
return tools
except Exception as e:
logger.error(f"Error in async MCP tools initialization: {e}")
raise AgentMCPConnectionError(f"Failed to initialize MCP tools: {str(e)}")
def setup_config(self):
# The max_loops will be set dynamically if the dynamic_loop
if self.dynamic_loops is True:
@@ -996,6 +1081,7 @@ class Agent:
self,
task: Optional[Union[str, Any]] = None,
img: Optional[str] = None,
streaming_callback: Optional[Callable[[str], None]] = None,
*args,
**kwargs,
) -> Any:
@@ -1077,6 +1163,7 @@
task=task_prompt,
img=img,
current_loop=loop_count,
streaming_callback=streaming_callback,
*args,
**kwargs,
)
@@ -1084,6 +1171,7 @@
response = self.call_llm(
task=task_prompt,
current_loop=loop_count,
streaming_callback=streaming_callback,
*args,
**kwargs,
)
@@ -1110,6 +1198,8 @@
f"Structured Output - Attempting Function Call Execution [{time.strftime('%H:%M:%S')}] \n\n Output: {format_data_structure(response)} ",
loop_count,
)
elif self.streaming_on:
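# Streamed responses are already rendered chunk-by-chunk as they
# arrive, so skip the duplicate panel print here.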
pass
else:
self.pretty_print(
response, loop_count
@@ -1239,12 +1329,13 @@
traceback_info = traceback.format_exc()
logger.error(
f"Error detected running your agent {self.agent_name}\n"
f"An error occurred while running your agent {self.agent_name}.\n"
f"Error Type: {error_type}\n"
f"Error Message: {error_message}\n"
f"Traceback:\n{traceback_info}\n"
f"Agent State: {self.to_dict()}\n"
f"Optimize your input parameters and or add an issue on the swarms github and contact our team on discord for support ;)"
f"Please optimize your input parameters, or create an issue on the Swarms GitHub and contact our team on Discord for support. "
f"For technical support, refer to this document: https://docs.swarms.world/en/latest/swarms/support/"
)
raise error
@@ -1270,25 +1361,18 @@
"""
Asynchronously runs the agent with the specified parameters.
Args:
task (Optional[str]): The task to be performed. Defaults to None.
img (Optional[str]): The image to be processed. Defaults to None.
is_last (bool): Indicates if this is the last task. Defaults to False.
device (str): The device to use for execution. Defaults to "cpu".
device_id (int): The ID of the GPU to use if device is set to "gpu". Defaults to 1.
all_cores (bool): If True, uses all available CPU cores. Defaults to True.
do_not_use_cluster_ops (bool): If True, does not use cluster operations. Defaults to True.
all_gpus (bool): If True, uses all available GPUs. Defaults to False.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
Any: The result of the asynchronous operation.
Raises:
Exception: If an error occurs during the asynchronous operation.
Enhanced to support proper async initialization of MCP tools if needed.
"""
try:
# If MCP tools are configured but not yet loaded, initialize them asynchronously first
if (exists(self.mcp_url) or exists(self.mcp_urls) or exists(self.mcp_config)) and \
not (hasattr(self, '_mcp_tools_loaded') and self._mcp_tools_loaded):
await self.async_init_mcp_tools()
# Ensure the LLM is initialized with the loaded tools
if self.llm is None:
self.llm = await asyncio.to_thread(self.llm_handling)
# Delegate to the synchronous run method in a worker thread, as before
return await asyncio.to_thread(
self.run,
task=task,
@@ -1297,9 +1381,7 @@
**kwargs,
)
except Exception as error:
await self._handle_run_error(
error
) # Ensure this is also async if needed
await self._handle_run_error(error)
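# A minimal sketch of driving arun from an event loop (the agent name,
# model, and task are placeholders):
#
#     async def main():
#         agent = Agent(agent_name="Async-Agent", model_name="gpt-4o-mini")
#         print(await agent.arun("Summarize MCP in one paragraph"))
#
#     asyncio.run(main())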
def __call__(
self,
@@ -1334,37 +1416,6 @@
)
return self.run(task=improved_prompt, *args, **kwargs)
# def parse_and_execute_tools(self, response: str, *args, **kwargs):
# max_retries = 3 # Maximum number of retries
# retries = 0
# while retries < max_retries:
# try:
# logger.info("Executing tool...")
# # try to Execute the tool and return a string
# out = parse_and_execute_json(
# functions=self.tools,
# json_string=response,
# parse_md=True,
# *args,
# **kwargs,
# )
# logger.info(f"Tool Output: {out}")
# # Add the output to the memory
# # self.short_memory.add(
# # role="Tool Executor",
# # content=out,
# # )
# return out
# except Exception as error:
# retries += 1
# logger.error(
# f"Attempt {retries}: Error executing tool: {error}"
# )
# if retries == max_retries:
# raise error
# time.sleep(1) # Wait for a bit before retrying
def add_memory(self, message: str):
"""Add a memory to the agent
@@ -1539,15 +1590,16 @@
if self.tools_list_dictionary is not None:
if not supports_function_calling(self.model_name):
raise AgentInitializationError(
logger.warning(
f"The model '{self.model_name}' does not support function calling. Please use a model that supports function calling."
)
try:
if self.max_tokens > get_max_tokens(self.model_name):
raise AgentInitializationError(
logger.warning(
f"Max tokens is set to {self.max_tokens}, but the model '{self.model_name}' only supports {get_max_tokens(self.model_name)} tokens. Please set max tokens to {get_max_tokens(self.model_name)} or less."
)
except Exception:
pass
@@ -2503,6 +2555,7 @@
task: str,
img: Optional[str] = None,
current_loop: int = 0,
streaming_callback: Optional[Callable[[str], None]] = None,
*args,
**kwargs,
) -> str:
@@ -2513,6 +2566,7 @@
task (str): The task to be performed by the `llm` object.
img (str, optional): Path or URL to an image file.
audio (str, optional): Path or URL to an audio file.
streaming_callback (Optional[Callable[[str], None]]): Callback function to receive streaming tokens in real-time.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
@@ -2548,8 +2602,24 @@
if hasattr(
streaming_response, "__iter__"
) and not isinstance(streaming_response, str):
# Check if streaming_callback is provided (for ConcurrentWorkflow dashboard integration)
if streaming_callback is not None:
# Real-time callback streaming for dashboard integration
chunks = []
for chunk in streaming_response:
if (
hasattr(chunk, "choices")
and chunk.choices[0].delta.content
):
content = chunk.choices[
0
].delta.content
chunks.append(content)
# Call the streaming callback with the new chunk
streaming_callback(content)
complete_response = "".join(chunks)
# Check print_on parameter for different streaming behaviors
if self.print_on is False:
elif self.print_on is False:
# Silent streaming - no printing, just collect chunks
chunks = []
for chunk in streaming_response:
@@ -2632,6 +2702,7 @@
img: Optional[str] = None,
imgs: Optional[List[str]] = None,
correct_answer: Optional[str] = None,
streaming_callback: Optional[Callable[[str], None]] = None,
*args,
**kwargs,
) -> Any:
@@ -2646,6 +2717,7 @@
task (Optional[str], optional): The task to be executed. Defaults to None.
img (Optional[str], optional): The image to be processed. Defaults to None.
imgs (Optional[List[str]], optional): The list of images to be processed. Defaults to None.
streaming_callback (Optional[Callable[[str], None]], optional): Callback function to receive streaming tokens in real-time. Defaults to None.
*args: Additional positional arguments to be passed to the execution method.
**kwargs: Additional keyword arguments to be passed to the execution method.
@@ -2677,15 +2749,25 @@
output = self._run(
task=task,
img=img,
streaming_callback=streaming_callback,
*args,
**kwargs,
)
return output
except ValueError as e:
except AgentRunError as e:
self._handle_run_error(e)
except KeyboardInterrupt:
logger.warning(
f"Keyboard interrupt detected for agent '{self.agent_name}'. "
"If autosave is enabled, the agent's state will be saved to the workspace directory. "
"To enable autosave, please initialize the agent with Agent(autosave=True)."
"For technical support, refer to this document: https://docs.swarms.world/en/latest/swarms/support/"
)
raise KeyboardInterrupt
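# A sketch of consuming tokens in real time through the new
# streaming_callback parameter, assuming the agent was constructed with
# streaming_on=True (the task and handler are placeholders):
#
#     def on_token(token: str) -> None:
#         print(token, end="", flush=True)
#
#     agent.run(task="Draft a status update", streaming_callback=on_token)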
def handle_artifacts(
self, text: str, file_output_path: str, file_extension: str
) -> None:
@@ -2824,6 +2906,9 @@
if response is None:
response = "No response generated"
if self.streaming_on:
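# No extra handling needed here; streamed output was already
# emitted incrementally while the chunks arrived.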
pass
if self.print_on:
formatter.print_panel(
response,
@@ -3225,19 +3310,9 @@
f"Agent '{self.agent_name}' received None response from LLM in loop {loop_count}. "
f"This may indicate an issue with the model or prompt. Skipping tool execution."
)
except Exception as e:
except AgentToolExecutionError as e:
logger.error(
f"Agent '{self.agent_name}' encountered error during tool execution in loop {loop_count}: {str(e)}. "
f"Full traceback: {traceback.format_exc()}. "
f"Attempting to retry tool execution with 3 attempts"
)
def add_tool_schema(self, tool_schema: dict):
self.tools_list_dictionary = [tool_schema]
self.output_type = "dict-all-except-first"
def add_multiple_tool_schemas(self, tool_schemas: list[dict]):
self.tools_list_dictionary = tool_schemas
self.output_type = "dict-all-except-first"

File diff suppressed because it is too large