diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index eb5a7abc..6fa058c7 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -23,7 +23,6 @@ import yaml from loguru import logger from pydantic import BaseModel -from swarms.agents.agent_print import agent_print from swarms.agents.ape_agent import auto_generate_prompt from swarms.artifacts.main_artifact import Artifact from swarms.prompts.agent_system_prompts import AGENT_SYSTEM_PROMPT_3 @@ -31,6 +30,10 @@ from swarms.prompts.multi_modal_autonomous_instruction_prompt import ( MULTI_MODAL_AUTO_AGENT_SYSTEM_PROMPT_1, ) from swarms.prompts.tools import tool_sop_prompt +from swarms.schemas.agent_mcp_errors import ( + AgentMCPConnectionError, + AgentMCPToolError, +) from swarms.schemas.agent_step_schemas import ManySteps, Step from swarms.schemas.base_schemas import ( AgentChatCompletionResponse, @@ -46,14 +49,9 @@ from swarms.structs.safe_loading import ( ) from swarms.telemetry.main import log_agent_data from swarms.tools.base_tool import BaseTool -from swarms.tools.mcp_client import ( - execute_mcp_tool, - find_and_execute_tool, - list_all, - list_tools_for_multiple_urls, +from swarms.tools.py_func_to_openai_func_str import ( + convert_multiple_functions_to_openai_function_schema, ) -from swarms.tools.mcp_integration import MCPServerSseParams -from swarms.tools.tool_parse_exec import parse_and_execute_json from swarms.utils.any_to_str import any_to_str from swarms.utils.data_to_text import data_to_text from swarms.utils.file_processing import create_file_in_folder @@ -64,10 +62,22 @@ from swarms.utils.history_output_formatter import ( from swarms.utils.litellm_tokenizer import count_tokens from swarms.utils.litellm_wrapper import LiteLLM from swarms.utils.pdf_to_text import pdf_to_text -from swarms.utils.str_to_dict import str_to_dict from swarms.prompts.react_base_prompt import REACT_SYS_PROMPT from swarms.prompts.max_loop_prompt import generate_reasoning_prompt -from swarms.structs.agent_non_serializable import restore_non_serializable_properties +from swarms.prompts.safety_prompt import SAFETY_PROMPT +from swarms.structs.ma_utils import set_random_models_for_agents +from swarms.tools.mcp_client_call import ( + execute_tool_call_simple, + get_mcp_tools_sync, +) +from swarms.schemas.mcp_schemas import ( + MCPConnection, +) +from swarms.utils.index import ( + exists, + format_data_structure, + format_dict_to_string, +) # Utils @@ -89,10 +99,6 @@ def agent_id(): return uuid.uuid4().hex -def exists(val): - return val is not None - - # Agent output types ToolUsageType = Union[BaseModel, Dict[str, Any]] @@ -358,9 +364,9 @@ class Agent: log_directory: str = None, tool_system_prompt: str = tool_sop_prompt(), max_tokens: int = 4096, - frequency_penalty: float = 0.0, - presence_penalty: float = 0.0, - temperature: float = 0.1, + frequency_penalty: float = 0.8, + presence_penalty: float = 0.6, + temperature: float = 0.5, workspace_dir: str = "agent_workspace", timeout: Optional[int] = None, # short_memory: Optional[str] = None, @@ -374,7 +380,6 @@ class Agent: "%Y-%m-%d %H:%M:%S", time.localtime() ), agent_output: ManySteps = None, - executor_workers: int = os.cpu_count(), data_memory: Optional[Callable] = None, load_yaml_path: str = None, auto_generate_prompt: bool = False, @@ -395,10 +400,13 @@ class Agent: role: agent_roles = "worker", no_print: bool = False, tools_list_dictionary: Optional[List[Dict[str, Any]]] = None, - mcp_servers: MCPServerSseParams = None, - mcp_url: str = None, + mcp_url: Optional[Union[str, MCPConnection]] = None, mcp_urls: List[str] = None, react_on: bool = False, + safety_prompt_on: bool = False, + random_models_on: bool = False, + mcp_config: Optional[MCPConnection] = None, + top_p: float = 0.90, *args, **kwargs, ): @@ -415,6 +423,7 @@ class Agent: self.stopping_token = stopping_token self.interactive = interactive self.dashboard = dashboard + self.saved_state_path = saved_state_path self.return_history = return_history self.dynamic_temperature_enabled = dynamic_temperature_enabled self.dynamic_loops = dynamic_loops @@ -517,10 +526,13 @@ class Agent: self.role = role self.no_print = no_print self.tools_list_dictionary = tools_list_dictionary - self.mcp_servers = mcp_servers self.mcp_url = mcp_url self.mcp_urls = mcp_urls self.react_on = react_on + self.safety_prompt_on = safety_prompt_on + self.random_models_on = random_models_on + self.mcp_config = mcp_config + self.top_p = top_p self._cached_llm = ( None # Add this line to cache the LLM instance @@ -532,41 +544,58 @@ class Agent: self.feedback = [] # self.init_handling() - # Define tasks as pairs of (function, condition) - # Each task will only run if its condition is True self.setup_config() if exists(self.docs_folder): self.get_docs_from_doc_folders() - if exists(self.tools): - self.handle_tool_init() - if exists(self.tool_schema) or exists(self.list_base_models): self.handle_tool_schema_ops() if exists(self.sop) or exists(self.sop_list): self.handle_sop_ops() + if self.max_loops >= 2: + self.system_prompt += generate_reasoning_prompt( + self.max_loops + ) + + if self.react_on is True: + self.system_prompt += REACT_SYS_PROMPT + + self.short_memory = self.short_memory_init() + # Run sequential operations after all concurrent tasks are done # self.agent_output = self.agent_output_model() log_agent_data(self.to_dict()) + if exists(self.tools): + self.tool_handling() + if self.llm is None: self.llm = self.llm_handling() - if self.mcp_url or self.mcp_servers is not None: - self.add_mcp_tools_to_memory() + if self.random_models_on is True: + self.model_name = set_random_models_for_agents() - if self.react_on is True: - self.system_prompt += REACT_SYS_PROMPT + def tool_handling(self): - if self.max_loops >= 2: - self.system_prompt += generate_reasoning_prompt( - self.max_loops + self.tool_struct = BaseTool( + tools=self.tools, + verbose=self.verbose, + ) + + # Convert all the tools into a list of dictionaries + self.tools_list_dictionary = ( + convert_multiple_functions_to_openai_function_schema( + self.tools ) + ) - self.short_memory = self.short_memory_init() + self.short_memory.add( + role=f"{self.agent_name}", + content=f"Tools available: {format_data_structure(self.tools_list_dictionary)}", + ) def short_memory_init(self): if ( @@ -577,8 +606,11 @@ class Agent: else: prompt = self.system_prompt + if self.safety_prompt_on is True: + prompt += SAFETY_PROMPT + # Initialize the short term memory - self.short_memory = Conversation( + memory = Conversation( system_prompt=prompt, time_enabled=False, user=self.user_name, @@ -586,7 +618,7 @@ class Agent: token_count=False, ) - return self.short_memory + return memory def agent_output_model(self): # Many steps @@ -616,6 +648,11 @@ class Agent: if self.model_name is None: self.model_name = "gpt-4o-mini" + if exists(self.tools) and len(self.tools) >= 2: + parallel_tool_calls = True + else: + parallel_tool_calls = False + try: # Simplify initialization logic common_args = { @@ -634,10 +671,16 @@ class Agent: **common_args, tools_list_dictionary=self.tools_list_dictionary, tool_choice="auto", - parallel_tool_calls=len( - self.tools_list_dictionary - ) - > 1, + parallel_tool_calls=parallel_tool_calls, + ) + + elif self.mcp_url is not None: + self._cached_llm = LiteLLM( + **common_args, + tools_list_dictionary=self.add_mcp_tools_to_memory(), + tool_choice="auto", + parallel_tool_calls=parallel_tool_calls, + mcp_call=True, ) else: self._cached_llm = LiteLLM( @@ -651,48 +694,6 @@ class Agent: ) return None - def handle_tool_init(self): - # Initialize the tool struct - if ( - exists(self.tools) - or exists(self.list_base_models) - or exists(self.tool_schema) - ): - - self.tool_struct = BaseTool( - tools=self.tools, - base_models=self.list_base_models, - tool_system_prompt=self.tool_system_prompt, - ) - - if self.tools is not None: - logger.info( - "Tools provided make sure the functions have documentation ++ type hints, otherwise tool execution won't be reliable." - ) - # Add the tool prompt to the memory - self.short_memory.add( - role="system", content=self.tool_system_prompt - ) - - # Log the tools - logger.info( - f"Tools provided: Accessing {len(self.tools)} tools" - ) - - # Transform the tools into an openai schema - # self.convert_tool_into_openai_schema() - - # Transform the tools into an openai schema - tool_dict = ( - self.tool_struct.convert_tool_into_openai_schema() - ) - self.short_memory.add(role="system", content=tool_dict) - - # Now create a function calling map for every tools - self.function_map = { - tool.__name__: tool for tool in self.tools - } - def add_mcp_tools_to_memory(self): """ Adds MCP tools to the agent's short-term memory. @@ -704,110 +705,23 @@ class Agent: Exception: If there's an error accessing the MCP tools """ try: - if self.mcp_url is not None: - tools_available = list_all( - self.mcp_url, output_type="json" - ) - self.short_memory.add( - role="Tools Available", - content=f"\n{tools_available}", - ) - - elif ( - self.mcp_url is None - and self.mcp_urls is not None - and len(self.mcp_urls) > 1 - ): - tools_available = list_tools_for_multiple_urls( - urls=self.mcp_urls, - output_type="json", - ) - - self.short_memory.add( - role="Tools Available", - content=f"\n{tools_available}", - ) - except Exception as e: - logger.error(f"Error adding MCP tools to memory: {e}") - raise e - - def _single_mcp_tool_handling(self, response: any): - """ - Handles execution of a single MCP tool. - - Args: - response (str): The tool response to process - - Raises: - Exception: If there's an error executing the tool - """ - try: - if isinstance(response, dict): - result = response - print(type(result)) + if exists(self.mcp_url): + tools = get_mcp_tools_sync(server_path=self.mcp_url) + elif exists(self.mcp_config): + tools = get_mcp_tools_sync(connection=self.mcp_config) + logger.info(f"Tools: {tools}") else: - result = str_to_dict(response) - print(type(result)) - - output = execute_mcp_tool( - url=self.mcp_url, - parameters=result, - ) - - self.short_memory.add( - role="Tool Executor", content=str(output) - ) - except Exception as e: - logger.error(f"Error in single MCP tool handling: {e}") - raise e - - def _multiple_mcp_tool_handling(self, response: any): - """ - Handles execution of multiple MCP tools. - - Args: - response (any): The tool response to process - - Raises: - Exception: If there's an error executing the tools - """ - try: - if isinstance(response, str): - response = str_to_dict(response) - - execution = find_and_execute_tool( - self.mcp_urls, - response["name"], - parameters=response, - ) - - self.short_memory.add( - role="Tool Executor", content=str(execution) + raise AgentMCPConnectionError( + "mcp_url must be either a string URL or MCPConnection object" + ) + self.pretty_print( + f"✨ [SYSTEM] Successfully integrated {len(tools)} MCP tools into agent: {self.agent_name} | Status: ONLINE | Time: {time.strftime('%H:%M:%S')} ✨", + loop_count=0, ) - except Exception as e: - logger.error(f"Error in multiple MCP tool handling: {e}") - raise e - - def mcp_tool_handling(self, response: any): - """ - Main handler for MCP tool execution. - - Args: - response (any): The tool response to process - Raises: - ValueError: If no MCP URL or MCP Servers are provided - Exception: If there's an error in tool handling - """ - try: - # if self.mcp_url is not None: - self._single_mcp_tool_handling(response) - # elif self.mcp_url is None and len(self.mcp_servers) > 1: - # self._multiple_mcp_tool_handling(response) - # else: - # raise ValueError("No MCP URL or MCP Servers provided") - except Exception as e: - logger.error(f"Error in mcp_tool_handling: {e}") + return tools + except AgentMCPConnectionError as e: + logger.error(f"Error in MCP connection: {e}") raise e def setup_config(self): @@ -1091,60 +1005,67 @@ class Agent: *response_args, **kwargs ) - # Convert to a str if the response is not a str + if exists(self.tools_list_dictionary): + if isinstance(response, BaseModel): + response = response.model_dump() + + # # Convert to a str if the response is not a str + # if self.mcp_url is None or self.tools is None: response = self.parse_llm_output(response) self.short_memory.add( - role=self.agent_name, content=response + role=self.agent_name, + content=format_dict_to_string(response), ) # Print self.pretty_print(response, loop_count) - # Output Cleaner - self.output_cleaner_op(response) - - ####### MCP TOOL HANDLING ####### - if ( - self.mcp_servers - and self.tools_list_dictionary is not None - ): - self.mcp_tool_handling(response) - - ####### MCP TOOL HANDLING ####### + # # Output Cleaner + # self.output_cleaner_op(response) # Check and execute tools - if self.tools is not None: - out = self.parse_and_execute_tools( - response + if exists(self.tools): + # out = self.parse_and_execute_tools( + # response + # ) + + # self.short_memory.add( + # role="Tool Executor", content=out + # ) + + # if self.no_print is False: + # agent_print( + # f"{self.agent_name} - Tool Executor", + # out, + # loop_count, + # self.streaming_on, + # ) + + # out = self.call_llm(task=out) + + # self.short_memory.add( + # role=self.agent_name, content=out + # ) + + # if self.no_print is False: + # agent_print( + # f"{self.agent_name} - Agent Analysis", + # out, + # loop_count, + # self.streaming_on, + # ) + + self.execute_tools( + response=response, + loop_count=loop_count, ) - self.short_memory.add( - role="Tool Executor", content=out + if exists(self.mcp_url): + self.mcp_tool_handling( + response, loop_count ) - if self.no_print is False: - agent_print( - f"{self.agent_name} - Tool Executor", - out, - loop_count, - self.streaming_on, - ) - - out = self.call_llm(task=out) - - self.short_memory.add( - role=self.agent_name, content=out - ) - - if self.no_print is False: - agent_print( - f"{self.agent_name} - Agent Analysis", - out, - loop_count, - self.streaming_on, - ) - self.sentiment_and_evaluator(response) success = True # Mark as successful to exit the retry loop @@ -1362,36 +1283,36 @@ class Agent: return output.getvalue() - def parse_and_execute_tools(self, response: str, *args, **kwargs): - max_retries = 3 # Maximum number of retries - retries = 0 - while retries < max_retries: - try: - logger.info("Executing tool...") - - # try to Execute the tool and return a string - out = parse_and_execute_json( - functions=self.tools, - json_string=response, - parse_md=True, - *args, - **kwargs, - ) - logger.info(f"Tool Output: {out}") - # Add the output to the memory - # self.short_memory.add( - # role="Tool Executor", - # content=out, - # ) - return out - except Exception as error: - retries += 1 - logger.error( - f"Attempt {retries}: Error executing tool: {error}" - ) - if retries == max_retries: - raise error - time.sleep(1) # Wait for a bit before retrying + # def parse_and_execute_tools(self, response: str, *args, **kwargs): + # max_retries = 3 # Maximum number of retries + # retries = 0 + # while retries < max_retries: + # try: + # logger.info("Executing tool...") + + # # try to Execute the tool and return a string + # out = parse_and_execute_json( + # functions=self.tools, + # json_string=response, + # parse_md=True, + # *args, + # **kwargs, + # ) + # logger.info(f"Tool Output: {out}") + # # Add the output to the memory + # # self.short_memory.add( + # # role="Tool Executor", + # # content=out, + # # ) + # return out + # except Exception as error: + # retries += 1 + # logger.error( + # f"Attempt {retries}: Error executing tool: {error}" + # ) + # if retries == max_retries: + # raise error + # time.sleep(1) # Wait for a bit before retrying def add_memory(self, message: str): """Add a memory to the agent @@ -1720,9 +1641,6 @@ class Agent: # Reinitialize any necessary runtime components self._reinitialize_after_load() - # Restore non-serializable properties (tokenizer, long_term_memory, logger_handler, agent_output, executor) - self.restore_non_serializable_properties() - if self.verbose: self._log_loaded_state_info(resolved_path) @@ -2709,7 +2627,7 @@ class Agent: f"Agent Name {self.agent_name} [Max Loops: {loop_count} ]", ) - def parse_llm_output(self, response: Any) -> str: + def parse_llm_output(self, response: Any): """Parse and standardize the output from the LLM. Args: @@ -2722,7 +2640,7 @@ class Agent: ValueError: If the response format is unexpected and can't be handled """ try: - # Handle dictionary responses + if isinstance(response, dict): if "choices" in response: return response["choices"][0]["message"][ @@ -2732,17 +2650,23 @@ class Agent: response ) # Convert other dicts to string - # Handle string responses - elif isinstance(response, str): - return response + elif isinstance(response, BaseModel): + out = response.model_dump() - # Handle list responses (from check_llm_outputs) - elif isinstance(response, list): - return "\n".join(response) + # Handle List[BaseModel] responses + elif ( + isinstance(response, list) + and response + and isinstance(response[0], BaseModel) + ): + return [item.model_dump() for item in response] - # Handle any other type by converting to string + elif isinstance(response, list): + out = format_data_structure(response) else: - return str(response) + out = str(response) + + return out except Exception as e: logger.error(f"Error parsing LLM output: {e}") @@ -2780,24 +2704,123 @@ class Agent: content=response, ) - def restore_non_serializable_properties(self): - """ - Restore non-serializable properties for the Agent instance after loading. - This should be called after loading agent state from disk. - """ - restore_non_serializable_properties(self) + def mcp_tool_handling( + self, response: any, current_loop: Optional[int] = 0 + ): + try: + + if exists(self.mcp_url): + # Execute the tool call + tool_response = asyncio.run( + execute_tool_call_simple( + response=response, + server_path=self.mcp_url, + ) + ) + elif exists(self.mcp_config): + # Execute the tool call + tool_response = asyncio.run( + execute_tool_call_simple( + response=response, + connection=self.mcp_config, + ) + ) + else: + raise AgentMCPConnectionError( + "mcp_url must be either a string URL or MCPConnection object" + ) - # Custom serialization for non-serializable properties - def __getstate__(self): - state = self.__dict__.copy() - # Remove non-serializable properties - for prop in ["tokenizer", "long_term_memory", "logger_handler", "agent_output", "executor"]: - if prop in state: - state[prop] = None # Or a serializable placeholder if needed - return state + # Get the text content from the tool response + text_content = ( + tool_response.content[0].text + if tool_response.content + else str(tool_response) + ) + + # Add to the memory + self.short_memory.add( + role="Tool Executor", + content=text_content, + ) + + # Create a temporary LLM instance without tools for the follow-up call + try: + temp_llm = LiteLLM( + model_name=self.model_name, + temperature=self.temperature, + max_tokens=self.max_tokens, + system_prompt=self.system_prompt, + stream=self.streaming_on, + ) + + summary = temp_llm.run( + task=self.short_memory.get_str() + ) + except Exception as e: + logger.error( + f"Error calling LLM after MCP tool execution: {e}" + ) + # Fallback: provide a default summary + summary = "I successfully executed the MCP tool and retrieved the information above." + + self.pretty_print(summary, loop_count=current_loop) + + # Add to the memory + self.short_memory.add( + role=self.agent_name, content=summary + ) + except AgentMCPToolError as e: + logger.error(f"Error in MCP tool: {e}") + raise e + + def execute_tools(self, response: any, loop_count: int): + + output = ( + self.tool_struct.execute_function_calls_from_api_response( + response + ) + ) + + self.short_memory.add( + role="Tool Executor", + content=format_data_structure(output), + ) + + self.pretty_print( + f"{format_data_structure(output)}", + loop_count, + ) + + # Now run the LLM again without tools - create a temporary LLM instance + # instead of modifying the cached one + # Create a temporary LLM instance without tools for the follow-up call + temp_llm = LiteLLM( + model_name=self.model_name, + temperature=self.temperature, + max_tokens=self.max_tokens, + system_prompt=self.system_prompt, + stream=self.streaming_on, + tools_list_dictionary=None, + parallel_tool_calls=False, + ) + + tool_response = temp_llm.run( + f""" + Please analyze and summarize the following tool execution output in a clear and concise way. + Focus on the key information and insights that would be most relevant to the user's original request. + If there are any errors or issues, highlight them prominently. + + Tool Output: + {output} + """ + ) + + self.short_memory.add( + role=self.agent_name, + content=tool_response, + ) - def __setstate__(self, state): - self.__dict__.update(state) - # Restore non-serializable properties after loading - if hasattr(self, 'restore_non_serializable_properties'): - self.restore_non_serializable_properties() + self.pretty_print( + f"{tool_response}", + loop_count, + ) \ No newline at end of file