@ -8,9 +8,9 @@ TODO:
- add async processing for run and batch run
- add async processing for run and batch run
- add plan module
- add plan module
- concurrent
- concurrent
-
- Add batched inputs
"""
"""
import asyncio
import json
import json
import logging
import logging
import time
import time
@ -100,24 +100,26 @@ class Flow:
self ,
self ,
llm : Any ,
llm : Any ,
# template: str,
# template: str,
max_loops : int = 5 ,
max_loops = 5 ,
stopping_condition : Optional [ Callable [ [ str ] , bool ] ] = None ,
stopping_condition : Optional [ Callable [ [ str ] , bool ] ] = None ,
loop_interval : int = 1 ,
loop_interval : int = 1 ,
retry_attempts : int = 3 ,
retry_attempts : int = 3 ,
retry_interval : int = 1 ,
retry_interval : int = 1 ,
return_history : bool = False ,
dynamic_loops : Optional [ bool ] = False ,
interactive : bool = False ,
interactive : bool = False ,
dashboard : bool = False ,
dashboard : bool = False ,
name: str = " Flow agent " ,
agent_ name: str = " Flow agent " ,
system_prompt : str = FLOW_SYSTEM_PROMPT ,
system_prompt : str = FLOW_SYSTEM_PROMPT ,
# tools: List[BaseTool] = None,
# tools: List[BaseTool] = None,
dynamic_temperature : bool = False ,
dynamic_temperature : bool = False ,
saved_state_path : Optional [ str ] = " flow_state.json " ,
saved_state_path : Optional [ str ] = " flow_state.json " ,
autosave : bool = False ,
autosave : bool = False ,
context_length : int = 8192 ,
context_length : int = 8192 ,
user_name : str = " Human " ,
* * kwargs : Any ,
* * kwargs : Any ,
) :
) :
self . llm = llm
self . llm = llm
# self.template = template
self . max_loops = max_loops
self . max_loops = max_loops
self . stopping_condition = stopping_condition
self . stopping_condition = stopping_condition
self . loop_interval = loop_interval
self . loop_interval = loop_interval
@ -130,9 +132,14 @@ class Flow:
self . interactive = interactive
self . interactive = interactive
self . dashboard = dashboard
self . dashboard = dashboard
self . dynamic_temperature = dynamic_temperature
self . dynamic_temperature = dynamic_temperature
self . dynamic_loops = dynamic_loops
self . user_name = user_name
# The max_loops will be set dynamically if the dynamic_loop
if self . dynamic_loops :
self . max_loops = " auto "
# self.tools = tools
# self.tools = tools
self . system_prompt = system_prompt
self . system_prompt = system_prompt
self . name = name
self . agent_ name = agent_ name
self . saved_state_path = saved_state_path
self . saved_state_path = saved_state_path
self . autosave = autosave
self . autosave = autosave
self . response_filters = [ ]
self . response_filters = [ ]
@ -194,7 +201,7 @@ class Flow:
def add_task_to_memory ( self , task : str ) :
def add_task_to_memory ( self , task : str ) :
""" Add the task to the memory """
""" Add the task to the memory """
self . memory . append ( [ f " Human : { task } " ] )
self . memory . append ( [ f " { self . user_name } : { task } " ] )
def add_message_to_memory ( self , message : str ) :
def add_message_to_memory ( self , message : str ) :
""" Add the message to the memory """
""" Add the message to the memory """
@ -222,7 +229,7 @@ class Flow:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Flow Configuration :
Flow Configuration :
Name : { self . name}
Name : { self . agent_ name}
System Prompt : { self . system_prompt }
System Prompt : { self . system_prompt }
Task : { task }
Task : { task }
Max Loops : { self . max_loops }
Max Loops : { self . max_loops }
@ -277,47 +284,40 @@ class Flow:
5. Repeat until stopping condition is met or max_loops is reached
5. Repeat until stopping condition is met or max_loops is reached
"""
"""
# Restore from saved state if provided, ortherwise start with a new history
# Activate Autonomous agent message
# if self.saved_state:
# self.load_state(self.saved_state)
# history = self.memory[-1]
# print(f"Loaded state from {self.saved_state}")
# else:
# history = [f"Human: {task}"]
# self.memory.append(history)
# print(colored(">>> Autonomous Agent Activated", "cyan", attrs=["bold"]))
self . activate_autonomous_agent ( )
self . activate_autonomous_agent ( )
# if self.autosave:
response = task
response = task
history = [ f " Human : { task } " ]
history = [ f " { self . user_name } : { task } " ]
# If dashboard = True then print the dashboard
# If dashboard = True then print the dashboard
if self . dashboard :
if self . dashboard :
self . print_dashboard ( task )
self . print_dashboard ( task )
for i in range ( self . max_loops ) :
loop_count = 0
print ( colored ( f " \n Loop { i + 1 } of { self . max_loops } " , " blue " ) )
# for i in range(self.max_loops):
while self . max_loops == ' auto ' or loop_count < self . max_loops :
loop_count + = 1
print ( colored ( f " \n Loop { loop_count } of { self . max_loops } " , " blue " ) )
print ( " \n " )
print ( " \n " )
if self . _check_stopping_condition ( response ) or parse_done_token (
response ) :
if self . _check_stopping_condition ( response ) or parse_done_token ( response ) :
break
break
# Adjust temperature, comment if no work
# Adjust temperature, comment if no work
if self . dynamic_temperature :
if self . dynamic_temperature :
self . dynamic_temperature ( )
self . dynamic_temperature ( )
# Preparing the prompt
task = self . agent_history_prompt ( FLOW_SYSTEM_PROMPT , response )
attempt = 0
attempt = 0
while attempt < self . retry_attempts :
while attempt < self . retry_attempts :
try :
try :
response = self . llm (
response = self . llm (
self . agent_history_prompt ( FLOW_SYSTEM_PROMPT , response ) ,
task
* * kwargs ,
* * kwargs ,
)
)
# print(f"Next query: {response}")
# break
if self . interactive :
if self . interactive :
print ( f " AI: { response } " )
print ( f " AI: { response } " )
history . append ( f " AI: { response } " )
history . append ( f " AI: { response } " )
@ -341,13 +341,14 @@ class Flow:
print ( colored ( f " Autosaving flow state to { save_path } " , " green " ) )
print ( colored ( f " Autosaving flow state to { save_path } " , " green " ) )
self . save_state ( save_path )
self . save_state ( save_path )
return response # , history
if self . return_history :
return response , history
return response
async def arun ( self , task : str , * * kwargs ) :
async def arun ( self , task : str , * * kwargs ) :
""" Async run """
pass
"""
"""
Run the autonomous agent loop
Run the autonomous agent loop aschnronously
Args :
Args :
task ( str ) : The initial task to run
task ( str ) : The initial task to run
@ -360,44 +361,40 @@ class Flow:
5. Repeat until stopping condition is met or max_loops is reached
5. Repeat until stopping condition is met or max_loops is reached
"""
"""
# Restore from saved state if provided, ortherwise start with a new history
# Activate Autonomous agent message
# if self.saved_state:
self . activate_autonomous_agent ( )
# self.load_state(self.saved_state)
# history = self.memory[-1]
# print(f"Loaded state from {self.saved_state}")
# else:
# history = [f"Human: {task}"]
# self.memory.append(history)
print ( colored ( " >>> Autonomous Agent Activated " , " cyan " , attrs = [ " bold " ] ) )
response = task
response = task
history = [ f " Human : { task } " ]
history = [ f " { self . user_name } : { task } " ]
# If dashboard = True then print the dashboard
# If dashboard = True then print the dashboard
if self . dashboard :
if self . dashboard :
self . print_dashboard ( task )
self . print_dashboard ( task )
for i in range ( self . max_loops ) :
loop_count = 0
print ( colored ( f " \n Loop { i + 1 } of { self . max_loops } " , " blue " ) )
# for i in range(self.max_loops):
while self . max_loops == ' auto ' or loop_count < self . max_loops :
loop_count + = 1
print ( colored ( f " \n Loop { loop_count } of { self . max_loops } " , " blue " ) )
print ( " \n " )
print ( " \n " )
if self . _check_stopping_condition ( response ) or parse_done_token (
response ) :
if self . _check_stopping_condition ( response ) or parse_done_token ( response ) :
break
break
# Adjust temperature, comment if no work
# Adjust temperature, comment if no work
if self . dynamic_temperature :
if self . dynamic_temperature :
self . dynamic_temperature ( )
self . dynamic_temperature ( )
# Preparing the prompt
task = self . agent_history_prompt ( FLOW_SYSTEM_PROMPT , response )
attempt = 0
attempt = 0
while attempt < self . retry_attempts :
while attempt < self . retry_attempts :
try :
try :
response = self . llm (
response = self . llm (
self . agent_history_prompt ( FLOW_SYSTEM_PROMPT , response ) ,
task
* * kwargs ,
* * kwargs ,
)
)
# print(f"Next query: {response}")
# break
if self . interactive :
if self . interactive :
print ( f " AI: { response } " )
print ( f " AI: { response } " )
history . append ( f " AI: { response } " )
history . append ( f " AI: { response } " )
@ -416,10 +413,15 @@ class Flow:
time . sleep ( self . loop_interval )
time . sleep ( self . loop_interval )
self . memory . append ( history )
self . memory . append ( history )
# if self.autosave:
if self . autosave :
# self.save_state("flow_state.json")
save_path = self . saved_state_path or " flow_state.json "
print ( colored ( f " Autosaving flow state to { save_path } " , " green " ) )
self . save_state ( save_path )
return response # , history
if self . return_history :
return response , history
return response
def _run ( self , * * kwargs : Any ) - > str :
def _run ( self , * * kwargs : Any ) - > str :
""" Generate a result using the provided keyword args. """
""" Generate a result using the provided keyword args. """
@ -451,6 +453,19 @@ class Flow:
"""
"""
return agent_history_prompt
return agent_history_prompt
async def run_concurrent ( self , tasks : List [ str ] , * * kwargs ) :
"""
Run a batch of tasks concurrently and handle an infinite level of task inputs .
Args :
tasks ( List [ str ] ) : A list of tasks to run .
"""
task_coroutines = [
self . run_async ( task , * * kwargs ) for task in tasks
]
completed_tasks = await asyncio . gather ( * task_coroutines )
return completed_tasks
def bulk_run ( self , inputs : List [ Dict [ str , Any ] ] ) - > List [ str ] :
def bulk_run ( self , inputs : List [ Dict [ str , Any ] ] ) - > List [ str ] :
""" Generate responses for multiple input sets. """
""" Generate responses for multiple input sets. """
return [ self . run ( * * input_data ) for input_data in inputs ]
return [ self . run ( * * input_data ) for input_data in inputs ]
@ -666,7 +681,8 @@ class Flow:
def get_llm_params ( self ) :
def get_llm_params ( self ) :
"""
"""
Extracts and returns the parameters of the llm object for serialization .
Extracts and returns the parameters of the llm object for serialization .
It assumes that the llm object has an __init__ method with parameters that can be used to recreate it .
It assumes that the llm object has an __init__ method
with parameters that can be used to recreate it .
"""
"""
if not hasattr ( self . llm , " __init__ " ) :
if not hasattr ( self . llm , " __init__ " ) :
return None
return None
@ -770,8 +786,24 @@ class Flow:
Your response :
Your response :
"""
"""
response = self . llm ( prompt , * * kwargs )
response = self . llm ( prompt , * * kwargs )
return { " role " : self . name, " content " : response }
return { " role " : self . agent_ name, " content " : response }
def update_system_prompt ( self , system_prompt : str ) :
def update_system_prompt ( self , system_prompt : str ) :
""" Upddate the system message """
""" Upddate the system message """
self . system_prompt = system_prompt
self . system_prompt = system_prompt
def update_max_loops ( self , max_loops : int ) :
""" Update the max loops """
self . max_loops = max_loops
def update_loop_interval ( self , loop_interval : int ) :
""" Update the loop interval """
self . loop_interval = loop_interval
def update_retry_attempts ( self , retry_attempts : int ) :
""" Update the retry attempts """
self . retry_attempts = retry_attempts
def update_retry_interval ( self , retry_interval : int ) :
""" Update the retry interval """
self . retry_interval = retry_interval