Revert agent.py to match master

pull/855/head
Pavan Kumar 5 months ago
parent 9c58a314bb
commit 771e1e46db

@ -23,7 +23,6 @@ import yaml
from loguru import logger
from pydantic import BaseModel
from swarms.agents.agent_print import agent_print
from swarms.agents.ape_agent import auto_generate_prompt
from swarms.artifacts.main_artifact import Artifact
from swarms.prompts.agent_system_prompts import AGENT_SYSTEM_PROMPT_3
@ -31,6 +30,10 @@ from swarms.prompts.multi_modal_autonomous_instruction_prompt import (
MULTI_MODAL_AUTO_AGENT_SYSTEM_PROMPT_1,
)
from swarms.prompts.tools import tool_sop_prompt
from swarms.schemas.agent_mcp_errors import (
AgentMCPConnectionError,
AgentMCPToolError,
)
from swarms.schemas.agent_step_schemas import ManySteps, Step
from swarms.schemas.base_schemas import (
AgentChatCompletionResponse,
@ -46,14 +49,9 @@ from swarms.structs.safe_loading import (
)
from swarms.telemetry.main import log_agent_data
from swarms.tools.base_tool import BaseTool
from swarms.tools.mcp_client import (
execute_mcp_tool,
find_and_execute_tool,
list_all,
list_tools_for_multiple_urls,
from swarms.tools.py_func_to_openai_func_str import (
convert_multiple_functions_to_openai_function_schema,
)
from swarms.tools.mcp_integration import MCPServerSseParams
from swarms.tools.tool_parse_exec import parse_and_execute_json
from swarms.utils.any_to_str import any_to_str
from swarms.utils.data_to_text import data_to_text
from swarms.utils.file_processing import create_file_in_folder
@ -64,10 +62,22 @@ from swarms.utils.history_output_formatter import (
from swarms.utils.litellm_tokenizer import count_tokens
from swarms.utils.litellm_wrapper import LiteLLM
from swarms.utils.pdf_to_text import pdf_to_text
from swarms.utils.str_to_dict import str_to_dict
from swarms.prompts.react_base_prompt import REACT_SYS_PROMPT
from swarms.prompts.max_loop_prompt import generate_reasoning_prompt
from swarms.structs.agent_non_serializable import restore_non_serializable_properties
from swarms.prompts.safety_prompt import SAFETY_PROMPT
from swarms.structs.ma_utils import set_random_models_for_agents
from swarms.tools.mcp_client_call import (
execute_tool_call_simple,
get_mcp_tools_sync,
)
from swarms.schemas.mcp_schemas import (
MCPConnection,
)
from swarms.utils.index import (
exists,
format_data_structure,
format_dict_to_string,
)
# Utils
@ -89,10 +99,6 @@ def agent_id():
return uuid.uuid4().hex
def exists(val):
return val is not None
# Agent output types
ToolUsageType = Union[BaseModel, Dict[str, Any]]
@ -358,9 +364,9 @@ class Agent:
log_directory: str = None,
tool_system_prompt: str = tool_sop_prompt(),
max_tokens: int = 4096,
frequency_penalty: float = 0.0,
presence_penalty: float = 0.0,
temperature: float = 0.1,
frequency_penalty: float = 0.8,
presence_penalty: float = 0.6,
temperature: float = 0.5,
workspace_dir: str = "agent_workspace",
timeout: Optional[int] = None,
# short_memory: Optional[str] = None,
@ -374,7 +380,6 @@ class Agent:
"%Y-%m-%d %H:%M:%S", time.localtime()
),
agent_output: ManySteps = None,
executor_workers: int = os.cpu_count(),
data_memory: Optional[Callable] = None,
load_yaml_path: str = None,
auto_generate_prompt: bool = False,
@ -395,10 +400,13 @@ class Agent:
role: agent_roles = "worker",
no_print: bool = False,
tools_list_dictionary: Optional[List[Dict[str, Any]]] = None,
mcp_servers: MCPServerSseParams = None,
mcp_url: str = None,
mcp_url: Optional[Union[str, MCPConnection]] = None,
mcp_urls: List[str] = None,
react_on: bool = False,
safety_prompt_on: bool = False,
random_models_on: bool = False,
mcp_config: Optional[MCPConnection] = None,
top_p: float = 0.90,
*args,
**kwargs,
):
@ -415,6 +423,7 @@ class Agent:
self.stopping_token = stopping_token
self.interactive = interactive
self.dashboard = dashboard
self.saved_state_path = saved_state_path
self.return_history = return_history
self.dynamic_temperature_enabled = dynamic_temperature_enabled
self.dynamic_loops = dynamic_loops
@ -517,10 +526,13 @@ class Agent:
self.role = role
self.no_print = no_print
self.tools_list_dictionary = tools_list_dictionary
self.mcp_servers = mcp_servers
self.mcp_url = mcp_url
self.mcp_urls = mcp_urls
self.react_on = react_on
self.safety_prompt_on = safety_prompt_on
self.random_models_on = random_models_on
self.mcp_config = mcp_config
self.top_p = top_p
self._cached_llm = (
None # Add this line to cache the LLM instance
@ -532,41 +544,58 @@ class Agent:
self.feedback = []
# self.init_handling()
# Define tasks as pairs of (function, condition)
# Each task will only run if its condition is True
self.setup_config()
if exists(self.docs_folder):
self.get_docs_from_doc_folders()
if exists(self.tools):
self.handle_tool_init()
if exists(self.tool_schema) or exists(self.list_base_models):
self.handle_tool_schema_ops()
if exists(self.sop) or exists(self.sop_list):
self.handle_sop_ops()
if self.max_loops >= 2:
self.system_prompt += generate_reasoning_prompt(
self.max_loops
)
if self.react_on is True:
self.system_prompt += REACT_SYS_PROMPT
self.short_memory = self.short_memory_init()
# Run sequential operations after all concurrent tasks are done
# self.agent_output = self.agent_output_model()
log_agent_data(self.to_dict())
if exists(self.tools):
self.tool_handling()
if self.llm is None:
self.llm = self.llm_handling()
if self.mcp_url or self.mcp_servers is not None:
self.add_mcp_tools_to_memory()
if self.random_models_on is True:
self.model_name = set_random_models_for_agents()
if self.react_on is True:
self.system_prompt += REACT_SYS_PROMPT
def tool_handling(self):
if self.max_loops >= 2:
self.system_prompt += generate_reasoning_prompt(
self.max_loops
self.tool_struct = BaseTool(
tools=self.tools,
verbose=self.verbose,
)
self.short_memory = self.short_memory_init()
# Convert all the tools into a list of dictionaries
self.tools_list_dictionary = (
convert_multiple_functions_to_openai_function_schema(
self.tools
)
)
self.short_memory.add(
role=f"{self.agent_name}",
content=f"Tools available: {format_data_structure(self.tools_list_dictionary)}",
)
def short_memory_init(self):
if (
@ -577,8 +606,11 @@ class Agent:
else:
prompt = self.system_prompt
if self.safety_prompt_on is True:
prompt += SAFETY_PROMPT
# Initialize the short term memory
self.short_memory = Conversation(
memory = Conversation(
system_prompt=prompt,
time_enabled=False,
user=self.user_name,
@ -586,7 +618,7 @@ class Agent:
token_count=False,
)
return self.short_memory
return memory
def agent_output_model(self):
# Many steps
@ -616,6 +648,11 @@ class Agent:
if self.model_name is None:
self.model_name = "gpt-4o-mini"
if exists(self.tools) and len(self.tools) >= 2:
parallel_tool_calls = True
else:
parallel_tool_calls = False
try:
# Simplify initialization logic
common_args = {
@ -634,10 +671,16 @@ class Agent:
**common_args,
tools_list_dictionary=self.tools_list_dictionary,
tool_choice="auto",
parallel_tool_calls=len(
self.tools_list_dictionary
parallel_tool_calls=parallel_tool_calls,
)
> 1,
elif self.mcp_url is not None:
self._cached_llm = LiteLLM(
**common_args,
tools_list_dictionary=self.add_mcp_tools_to_memory(),
tool_choice="auto",
parallel_tool_calls=parallel_tool_calls,
mcp_call=True,
)
else:
self._cached_llm = LiteLLM(
@ -651,48 +694,6 @@ class Agent:
)
return None
def handle_tool_init(self):
# Initialize the tool struct
if (
exists(self.tools)
or exists(self.list_base_models)
or exists(self.tool_schema)
):
self.tool_struct = BaseTool(
tools=self.tools,
base_models=self.list_base_models,
tool_system_prompt=self.tool_system_prompt,
)
if self.tools is not None:
logger.info(
"Tools provided make sure the functions have documentation ++ type hints, otherwise tool execution won't be reliable."
)
# Add the tool prompt to the memory
self.short_memory.add(
role="system", content=self.tool_system_prompt
)
# Log the tools
logger.info(
f"Tools provided: Accessing {len(self.tools)} tools"
)
# Transform the tools into an openai schema
# self.convert_tool_into_openai_schema()
# Transform the tools into an openai schema
tool_dict = (
self.tool_struct.convert_tool_into_openai_schema()
)
self.short_memory.add(role="system", content=tool_dict)
# Now create a function calling map for every tools
self.function_map = {
tool.__name__: tool for tool in self.tools
}
def add_mcp_tools_to_memory(self):
"""
Adds MCP tools to the agent's short-term memory.
@ -704,110 +705,23 @@ class Agent:
Exception: If there's an error accessing the MCP tools
"""
try:
if self.mcp_url is not None:
tools_available = list_all(
self.mcp_url, output_type="json"
)
self.short_memory.add(
role="Tools Available",
content=f"\n{tools_available}",
)
elif (
self.mcp_url is None
and self.mcp_urls is not None
and len(self.mcp_urls) > 1
):
tools_available = list_tools_for_multiple_urls(
urls=self.mcp_urls,
output_type="json",
)
self.short_memory.add(
role="Tools Available",
content=f"\n{tools_available}",
)
except Exception as e:
logger.error(f"Error adding MCP tools to memory: {e}")
raise e
def _single_mcp_tool_handling(self, response: any):
"""
Handles execution of a single MCP tool.
Args:
response (str): The tool response to process
Raises:
Exception: If there's an error executing the tool
"""
try:
if isinstance(response, dict):
result = response
print(type(result))
if exists(self.mcp_url):
tools = get_mcp_tools_sync(server_path=self.mcp_url)
elif exists(self.mcp_config):
tools = get_mcp_tools_sync(connection=self.mcp_config)
logger.info(f"Tools: {tools}")
else:
result = str_to_dict(response)
print(type(result))
output = execute_mcp_tool(
url=self.mcp_url,
parameters=result,
)
self.short_memory.add(
role="Tool Executor", content=str(output)
)
except Exception as e:
logger.error(f"Error in single MCP tool handling: {e}")
raise e
def _multiple_mcp_tool_handling(self, response: any):
"""
Handles execution of multiple MCP tools.
Args:
response (any): The tool response to process
Raises:
Exception: If there's an error executing the tools
"""
try:
if isinstance(response, str):
response = str_to_dict(response)
execution = find_and_execute_tool(
self.mcp_urls,
response["name"],
parameters=response,
raise AgentMCPConnectionError(
"mcp_url must be either a string URL or MCPConnection object"
)
self.short_memory.add(
role="Tool Executor", content=str(execution)
self.pretty_print(
f"✨ [SYSTEM] Successfully integrated {len(tools)} MCP tools into agent: {self.agent_name} | Status: ONLINE | Time: {time.strftime('%H:%M:%S')}",
loop_count=0,
)
except Exception as e:
logger.error(f"Error in multiple MCP tool handling: {e}")
raise e
def mcp_tool_handling(self, response: any):
"""
Main handler for MCP tool execution.
Args:
response (any): The tool response to process
Raises:
ValueError: If no MCP URL or MCP Servers are provided
Exception: If there's an error in tool handling
"""
try:
# if self.mcp_url is not None:
self._single_mcp_tool_handling(response)
# elif self.mcp_url is None and len(self.mcp_servers) > 1:
# self._multiple_mcp_tool_handling(response)
# else:
# raise ValueError("No MCP URL or MCP Servers provided")
except Exception as e:
logger.error(f"Error in mcp_tool_handling: {e}")
return tools
except AgentMCPConnectionError as e:
logger.error(f"Error in MCP connection: {e}")
raise e
def setup_config(self):
@ -1091,58 +1005,65 @@ class Agent:
*response_args, **kwargs
)
# Convert to a str if the response is not a str
if exists(self.tools_list_dictionary):
if isinstance(response, BaseModel):
response = response.model_dump()
# # Convert to a str if the response is not a str
# if self.mcp_url is None or self.tools is None:
response = self.parse_llm_output(response)
self.short_memory.add(
role=self.agent_name, content=response
role=self.agent_name,
content=format_dict_to_string(response),
)
# Print
self.pretty_print(response, loop_count)
# Output Cleaner
self.output_cleaner_op(response)
# # Output Cleaner
# self.output_cleaner_op(response)
####### MCP TOOL HANDLING #######
if (
self.mcp_servers
and self.tools_list_dictionary is not None
):
self.mcp_tool_handling(response)
# Check and execute tools
if exists(self.tools):
# out = self.parse_and_execute_tools(
# response
# )
####### MCP TOOL HANDLING #######
# self.short_memory.add(
# role="Tool Executor", content=out
# )
# Check and execute tools
if self.tools is not None:
out = self.parse_and_execute_tools(
response
)
# if self.no_print is False:
# agent_print(
# f"{self.agent_name} - Tool Executor",
# out,
# loop_count,
# self.streaming_on,
# )
self.short_memory.add(
role="Tool Executor", content=out
)
# out = self.call_llm(task=out)
if self.no_print is False:
agent_print(
f"{self.agent_name} - Tool Executor",
out,
loop_count,
self.streaming_on,
)
# self.short_memory.add(
# role=self.agent_name, content=out
# )
out = self.call_llm(task=out)
# if self.no_print is False:
# agent_print(
# f"{self.agent_name} - Agent Analysis",
# out,
# loop_count,
# self.streaming_on,
# )
self.short_memory.add(
role=self.agent_name, content=out
self.execute_tools(
response=response,
loop_count=loop_count,
)
if self.no_print is False:
agent_print(
f"{self.agent_name} - Agent Analysis",
out,
loop_count,
self.streaming_on,
if exists(self.mcp_url):
self.mcp_tool_handling(
response, loop_count
)
self.sentiment_and_evaluator(response)
@ -1362,36 +1283,36 @@ class Agent:
return output.getvalue()
def parse_and_execute_tools(self, response: str, *args, **kwargs):
max_retries = 3 # Maximum number of retries
retries = 0
while retries < max_retries:
try:
logger.info("Executing tool...")
# try to Execute the tool and return a string
out = parse_and_execute_json(
functions=self.tools,
json_string=response,
parse_md=True,
*args,
**kwargs,
)
logger.info(f"Tool Output: {out}")
# Add the output to the memory
# self.short_memory.add(
# role="Tool Executor",
# content=out,
# def parse_and_execute_tools(self, response: str, *args, **kwargs):
# max_retries = 3 # Maximum number of retries
# retries = 0
# while retries < max_retries:
# try:
# logger.info("Executing tool...")
# # try to Execute the tool and return a string
# out = parse_and_execute_json(
# functions=self.tools,
# json_string=response,
# parse_md=True,
# *args,
# **kwargs,
# )
return out
except Exception as error:
retries += 1
logger.error(
f"Attempt {retries}: Error executing tool: {error}"
)
if retries == max_retries:
raise error
time.sleep(1) # Wait for a bit before retrying
# logger.info(f"Tool Output: {out}")
# # Add the output to the memory
# # self.short_memory.add(
# # role="Tool Executor",
# # content=out,
# # )
# return out
# except Exception as error:
# retries += 1
# logger.error(
# f"Attempt {retries}: Error executing tool: {error}"
# )
# if retries == max_retries:
# raise error
# time.sleep(1) # Wait for a bit before retrying
def add_memory(self, message: str):
"""Add a memory to the agent
@ -1720,9 +1641,6 @@ class Agent:
# Reinitialize any necessary runtime components
self._reinitialize_after_load()
# Restore non-serializable properties (tokenizer, long_term_memory, logger_handler, agent_output, executor)
self.restore_non_serializable_properties()
if self.verbose:
self._log_loaded_state_info(resolved_path)
@ -2709,7 +2627,7 @@ class Agent:
f"Agent Name {self.agent_name} [Max Loops: {loop_count} ]",
)
def parse_llm_output(self, response: Any) -> str:
def parse_llm_output(self, response: Any):
"""Parse and standardize the output from the LLM.
Args:
@ -2722,7 +2640,7 @@ class Agent:
ValueError: If the response format is unexpected and can't be handled
"""
try:
# Handle dictionary responses
if isinstance(response, dict):
if "choices" in response:
return response["choices"][0]["message"][
@ -2732,17 +2650,23 @@ class Agent:
response
) # Convert other dicts to string
# Handle string responses
elif isinstance(response, str):
return response
elif isinstance(response, BaseModel):
out = response.model_dump()
# Handle list responses (from check_llm_outputs)
elif isinstance(response, list):
return "\n".join(response)
# Handle List[BaseModel] responses
elif (
isinstance(response, list)
and response
and isinstance(response[0], BaseModel)
):
return [item.model_dump() for item in response]
# Handle any other type by converting to string
elif isinstance(response, list):
out = format_data_structure(response)
else:
return str(response)
out = str(response)
return out
except Exception as e:
logger.error(f"Error parsing LLM output: {e}")
@ -2780,24 +2704,123 @@ class Agent:
content=response,
)
def restore_non_serializable_properties(self):
"""
Restore non-serializable properties for the Agent instance after loading.
This should be called after loading agent state from disk.
def mcp_tool_handling(
self, response: any, current_loop: Optional[int] = 0
):
try:
if exists(self.mcp_url):
# Execute the tool call
tool_response = asyncio.run(
execute_tool_call_simple(
response=response,
server_path=self.mcp_url,
)
)
elif exists(self.mcp_config):
# Execute the tool call
tool_response = asyncio.run(
execute_tool_call_simple(
response=response,
connection=self.mcp_config,
)
)
else:
raise AgentMCPConnectionError(
"mcp_url must be either a string URL or MCPConnection object"
)
# Get the text content from the tool response
text_content = (
tool_response.content[0].text
if tool_response.content
else str(tool_response)
)
# Add to the memory
self.short_memory.add(
role="Tool Executor",
content=text_content,
)
# Create a temporary LLM instance without tools for the follow-up call
try:
temp_llm = LiteLLM(
model_name=self.model_name,
temperature=self.temperature,
max_tokens=self.max_tokens,
system_prompt=self.system_prompt,
stream=self.streaming_on,
)
summary = temp_llm.run(
task=self.short_memory.get_str()
)
except Exception as e:
logger.error(
f"Error calling LLM after MCP tool execution: {e}"
)
# Fallback: provide a default summary
summary = "I successfully executed the MCP tool and retrieved the information above."
self.pretty_print(summary, loop_count=current_loop)
# Add to the memory
self.short_memory.add(
role=self.agent_name, content=summary
)
except AgentMCPToolError as e:
logger.error(f"Error in MCP tool: {e}")
raise e
def execute_tools(self, response: any, loop_count: int):
output = (
self.tool_struct.execute_function_calls_from_api_response(
response
)
)
self.short_memory.add(
role="Tool Executor",
content=format_data_structure(output),
)
self.pretty_print(
f"{format_data_structure(output)}",
loop_count,
)
# Now run the LLM again without tools - create a temporary LLM instance
# instead of modifying the cached one
# Create a temporary LLM instance without tools for the follow-up call
temp_llm = LiteLLM(
model_name=self.model_name,
temperature=self.temperature,
max_tokens=self.max_tokens,
system_prompt=self.system_prompt,
stream=self.streaming_on,
tools_list_dictionary=None,
parallel_tool_calls=False,
)
tool_response = temp_llm.run(
f"""
Please analyze and summarize the following tool execution output in a clear and concise way.
Focus on the key information and insights that would be most relevant to the user's original request.
If there are any errors or issues, highlight them prominently.
Tool Output:
{output}
"""
restore_non_serializable_properties(self)
)
# Custom serialization for non-serializable properties
def __getstate__(self):
state = self.__dict__.copy()
# Remove non-serializable properties
for prop in ["tokenizer", "long_term_memory", "logger_handler", "agent_output", "executor"]:
if prop in state:
state[prop] = None # Or a serializable placeholder if needed
return state
self.short_memory.add(
role=self.agent_name,
content=tool_response,
)
def __setstate__(self, state):
self.__dict__.update(state)
# Restore non-serializable properties after loading
if hasattr(self, 'restore_non_serializable_properties'):
self.restore_non_serializable_properties()
self.pretty_print(
f"{tool_response}",
loop_count,
)
Loading…
Cancel
Save