@ -119,12 +119,13 @@ class Flow:
retry_attempts : int = 3 ,
retry_interval : int = 1 ,
return_history : bool = False ,
stopping_token : str = None ,
dynamic_loops : Optional [ bool ] = False ,
interactive : bool = False ,
dashboard : bool = False ,
agent_name : str = " Flow agent " ,
system_prompt : str = FLOW_SYSTEM_PROMPT ,
tools : List [ Any ] = None ,
# tools: List[Any] = None ,
dynamic_temperature : bool = False ,
saved_state_path : Optional [ str ] = " flow_state.json " ,
autosave : bool = False ,
@ -141,16 +142,17 @@ class Flow:
self . feedback = [ ]
self . memory = [ ]
self . task = None
self . stopping_token = " <DONE> "
self . stopping_token = stopping_token or " <DONE> "
self . interactive = interactive
self . dashboard = dashboard
self . return_history = return_history
self . dynamic_temperature = dynamic_temperature
self . dynamic_loops = dynamic_loops
self . user_name = user_name
# The max_loops will be set dynamically if the dynamic_loop
if self . dynamic_loops :
self . max_loops = " auto "
self . tools = tools or [ ]
# self.tools = tools or [ ]
self . system_prompt = system_prompt
self . agent_name = agent_name
self . saved_state_path = saved_state_path
@ -206,72 +208,72 @@ class Flow:
return " \n " . join ( params_str_list )
def parse_tool_command ( self , text : str ) :
# Parse the text for tool usage
pass
def get_tool_description ( self ) :
""" Get the tool description """
tool_descriptions = [ ]
for tool in self . tools :
description = f " { tool . name } : { tool . description } "
tool_descriptions . append ( description )
return " \n " . join ( tool_descriptions )
def find_tool_by_name ( self , name : str ) :
""" Find a tool by name """
for tool in self . tools :
if tool . name == name :
return tool
return None
def construct_dynamic_prompt ( self ) :
""" Construct the dynamic prompt """
tools_description = self . get_tool_description ( )
return DYNAMICAL_TOOL_USAGE . format ( tools = tools_description )
def extract_tool_commands ( self , text : str ) :
"""
Extract the tool commands from the text
Example :
` ` ` json
{
" tool " : " tool_name " ,
" params " : {
" tool1 " : " inputs " ,
" param2 " : " value2 "
}
}
` ` `
"""
# Regex to find JSON like strings
pattern = r " ```json(.+?)``` "
matches = re . findall ( pattern , text , re . DOTALL )
json_commands = [ ]
for match in matches :
try :
json_commands = json . loads ( match )
json_commands . append ( json_commands )
except Exception as error :
print ( f " Error parsing JSON command: { error } " )
def parse_and_execute_tools ( self , response ) :
""" Parse and execute the tools """
json_commands = self . extract_tool_commands ( response )
for command in json_commands :
tool_name = command . get ( " tool " )
params = command . get ( " parmas " , { } )
self . execute_tool ( tool_name , params )
def execute_tools ( self , tool_name , params ) :
""" Execute the tool with the provided params """
tool = self . tool_find_by_name ( tool_name )
if tool :
# Execute the tool with the provided parameters
tool_result = tool . run ( * * params )
print ( tool_result )
# def parse_tool_command(self, text: str) :
# # Parse the text for tool usage
# pass
# def get_tool_description(self) :
# """Get the tool description """
# tool_descriptions = []
# for tool in self.tools:
# description = f"{tool.name}: {tool.description}"
# tool_descriptions.append(description)
# return "\n".join(tool_descriptions )
# def find_tool_by_name(self, name: str) :
# """Find a tool by name """
# for tool in self.tools:
# if tool.name == name:
# return tool
# return None
# def construct_dynamic_prompt(self) :
# """Construct the dynamic prompt """
# tools_description = self.get_tool_description()
# return DYNAMICAL_TOOL_USAGE.format(tools=tools_description )
# def extract_tool_commands(self, text: str) :
# """
# Extract the tool commands from the text
# Example:
# ```json
# {
# "tool": "tool_name",
# "params": {
# "tool1": "inputs",
# "param2": "value2 "
# }
# }
# ```
# """
# # Regex to find JSON like strings
# pattern = r"```json(.+?)```"
# matches = re.findall(pattern, text, re.DOTALL)
# json_commands = []
# for match in matches :
# try:
# json_commands = json.loads(match)
# json_commands.append(json_commands)
# except Exception as error :
# print(f"Error parsing JSON command: {error}")
# def parse_and_execute_tools(self, response) :
# """Parse and execute the tools """
# json_commands = self.extract_tool_commands(response)
# for command in json_commands :
# tool_name = command.get("tool" )
# params = command.get("parmas", {})
# self.execute_tool(tool_name, params)
# def execute_tools(self, tool_name, params) :
# """Execute the tool with the provided params """
# tool = self.tool_find_by_name(tool_name)
# if tool:
# # Execute the tool with the provided parameters
# tool_result = tool.run(**params )
# print(tool_result)
def truncate_history ( self ) :
"""
@ -367,13 +369,13 @@ class Flow:
5. Repeat until stopping condition is met or max_loops is reached
"""
dynamic_prompt = self . construct_dynamic_prompt ( )
combined_prompt = f " { dynamic_prompt } \n { task } "
# dynamic_prompt = self.construct_dynamic_prompt( )
# combined_prompt = f"{dynamic_prompt}\n{task} "
# Activate Autonomous agent message
self . activate_autonomous_agent ( )
response = combined_prompt # or task
response = task # or combined_prompt
history = [ f " { self . user_name } : { task } " ]
# If dashboard = True then print the dashboard