@ -267,9 +267,17 @@ class Agent:
def _check_stopping_condition ( self , response : str ) - > bool :
""" Check if the stopping condition is met. """
if self . stopping_condition :
return self . stopping_condition ( response )
return False
try :
if self . stopping_condition :
return self . stopping_condition ( response )
return False
except Exception as error :
print (
colored (
f " Error checking stopping condition: { error } " ,
" red " ,
)
)
def dynamic_temperature ( self ) :
"""
@ -278,12 +286,19 @@ class Agent:
3. If the temperature is present , then dynamically change the temperature
4. for every loop you can randomly change the temperature on a scale from 0.0 to 1.0
"""
if hasattr ( self . llm , " temperature " ) :
# Randomly change the temperature attribute of self.llm object
self . llm . temperature = random . uniform ( 0.0 , 1.0 )
else :
# Use a default temperature
self . llm . temperature = 0.7
try :
if hasattr ( self . llm , " temperature " ) :
# Randomly change the temperature attribute of self.llm object
self . llm . temperature = random . uniform ( 0.0 , 1.0 )
else :
# Use a default temperature
self . llm . temperature = 0.7
except Exception as error :
print (
colored (
f " Error dynamically changing temperature: { error } "
)
)
def format_prompt ( self , template , * * kwargs : Any ) - > str :
""" Format the template with the provided kwargs using f-string interpolation. """
@ -407,11 +422,25 @@ class Agent:
def add_task_to_memory ( self , task : str ) :
""" Add the task to the memory """
self . short_memory . append ( [ f " { self . user_name } : { task } " ] )
try :
self . short_memory . append ( [ f " { self . user_name } : { task } " ] )
except Exception as error :
print (
colored (
f " Error adding task to memory: { error } " , " red "
)
)
def add_message_to_memory ( self , message : str ) :
""" Add the message to the memory """
self . short_memory [ - 1 ] . append ( message )
try :
self . short_memory [ - 1 ] . append ( message )
except Exception as error :
print (
colored (
f " Error adding message to memory: { error } " , " red "
)
)
def add_message_to_memory_and_truncate ( self , message : str ) :
""" Add the message to the memory and truncate """
@ -466,8 +495,15 @@ class Agent:
message ( Dict [ str , Any ] ) : _description_
metadata ( Dict [ str , Any ] ) : _description_
"""
if self . memory is not None :
self . memory . add ( message , metadata )
try :
if self . memory is not None :
self . memory . add ( message , metadata )
except Exception as error :
print (
colored (
f " Error adding message to memory: { error } " , " red "
)
)
def query_memorydb (
self ,
@ -715,88 +751,105 @@ class Agent:
5. Repeat until stopping condition is met or max_loops is reached
"""
# Activate Autonomous agent message
self . activate_autonomous_agent ( )
try :
# Activate Autonomous agent message
self . activate_autonomous_agent ( )
response = task
history = [ f " { self . user_name } : { task } " ]
response = task
history = [ f " { self . user_name } : { task } " ]
# If dashboard = True then print the dashboard
if self . dashboard :
self . print_dashboard ( task )
# If dashboard = True then print the dashboard
if self . dashboard :
self . print_dashboard ( task )
loop_count = 0
# for i in range(self.max_loops):
while self . max_loops == " auto " or loop_count < self . max_loops :
loop_count + = 1
print (
colored (
f " \n Loop { loop_count } of { self . max_loops } " , " blue "
loop_count = 0
# for i in range(self.max_loops):
while (
self . max_loops == " auto "
or loop_count < self . max_loops
) :
loop_count + = 1
print (
colored (
f " \n Loop { loop_count } of { self . max_loops } " ,
" blue " ,
)
)
)
print ( " \n " )
print ( " \n " )
if self . _check_stopping_condition (
response
) or parse_done_token ( response ) :
break
if self . _check_stopping_condition (
response
) or parse_done_token ( response ) :
break
# Adjust temperature, comment if no work
if self . dynamic_temperature_enabled :
self . dynamic_temperature ( )
# Adjust temperature, comment if no work
if self . dynamic_temperature_enabled :
self . dynamic_temperature ( )
# Preparing the prompt
task = self . agent_history_prompt (
FLOW_SYSTEM_PROMPT , response
)
# Preparing the prompt
task = self . agent_history_prompt (
FLOW_SYSTEM_PROMPT , response
)
attempt = 0
while attempt < self . retry_attempts :
try :
response = self . llm (
task * * kwargs ,
attempt = 0
while attempt < self . retry_attempts :
try :
response = self . llm (
task * * kwargs ,
)
if self . interactive :
print ( f " AI: { response } " )
history . append ( f " AI: { response } " )
response = input ( " You: " )
history . append ( f " Human: { response } " )
else :
print ( f " AI: { response } " )
history . append ( f " AI: { response } " )
print ( response )
break
except Exception as e :
logging . error (
f " Error generating response: { e } "
)
attempt + = 1
time . sleep ( self . retry_interval )
history . append ( response )
time . sleep ( self . loop_interval )
self . memory . append ( history )
if self . autosave :
print (
colored (
(
" Autosaving agent state to "
f " { self . saved_state_path } "
) ,
" green " ,
)
if self . interactive :
print ( f " AI: { response } " )
history . append ( f " AI: { response } " )
response = input ( " You: " )
history . append ( f " Human: { response } " )
else :
print ( f " AI: { response } " )
history . append ( f " AI: { response } " )
print ( response )
break
except Exception as e :
logging . error ( f " Error generating response: { e } " )
attempt + = 1
time . sleep ( self . retry_interval )
history . append ( response )
time . sleep ( self . loop_interval )
self . memory . append ( history )
if self . autosave :
)
self . save_state ( self . saved_state_path )
if self . return_history :
return response , history
return response
except Exception as error :
print (
colored (
(
" Autosaving agent state to "
f " { self . saved_state_path } "
) ,
" green " ,
f " Error asynchronous running agent: { error } " ,
" red " ,
)
)
self . save_state ( self . saved_state_path )
if self . return_history :
return response , history
return response
def _run ( self , * * kwargs : Any ) - > str :
""" Generate a result using the provided keyword args. """
task = self . format_prompt ( * * kwargs )
response , history = self . _generate ( task , task )
logging . info ( f " Message history: { history } " )
return response
try :
task = self . format_prompt ( * * kwargs )
response , history = self . _generate ( task , task )
logging . info ( f " Message history: { history } " )
return response
except Exception as error :
print ( colored ( f " Error running agent: { error } " , " red " ) )
def agent_history_prompt (
self ,
@ -844,15 +897,29 @@ class Agent:
Args :
tasks ( List [ str ] ) : A list of tasks to run .
"""
task_coroutines = [
self . run_async ( task , * * kwargs ) for task in tasks
]
completed_tasks = await asyncio . gather ( * task_coroutines )
return completed_tasks
try :
task_coroutines = [
self . run_async ( task , * * kwargs ) for task in tasks
]
completed_tasks = await asyncio . gather ( * task_coroutines )
return completed_tasks
except Exception as error :
print (
colored (
(
f " Error running agent: { error } while running "
" concurrently "
) ,
" red " ,
)
)
def bulk_run ( self , inputs : List [ Dict [ str , Any ] ] ) - > List [ str ] :
""" Generate responses for multiple input sets. """
return [ self . run ( * * input_data ) for input_data in inputs ]
try :
""" Generate responses for multiple input sets. """
return [ self . run ( * * input_data ) for input_data in inputs ]
except Exception as error :
print ( colored ( f " Error running bulk run: { error } " , " red " ) )
@staticmethod
def from_llm_and_template ( llm : Any , template : str ) - > " Agent " :
@ -874,9 +941,14 @@ class Agent:
Args :
file_path ( _type_ ) : _description_
"""
with open ( file_path , " w " ) as f :
json . dump ( self . short_memory , f )
print ( f " Saved agent history to { file_path } " )
try :
with open ( file_path , " w " ) as f :
json . dump ( self . short_memory , f )
print ( f " Saved agent history to { file_path } " )
except Exception as error :
print (
colored ( f " Error saving agent history: { error } " , " red " )
)
def load ( self , file_path : str ) :
"""
@ -1127,51 +1199,70 @@ class Agent:
Example :
>> > agent . save_state ( ' saved_flow.json ' )
"""
state = {
" agent_id " : str ( self . id ) ,
" agent_name " : self . agent_name ,
" agent_description " : self . agent_description ,
" system_prompt " : self . system_prompt ,
" sop " : self . sop ,
" short_memory " : self . short_memory ,
" loop_interval " : self . loop_interval ,
" retry_attempts " : self . retry_attempts ,
" retry_interval " : self . retry_interval ,
" interactive " : self . interactive ,
" dashboard " : self . dashboard ,
" dynamic_temperature " : self . dynamic_temperature_enabled ,
" autosave " : self . autosave ,
" saved_state_path " : self . saved_state_path ,
" max_loops " : self . max_loops ,
}
try :
state = {
" agent_id " : str ( self . id ) ,
" agent_name " : self . agent_name ,
" agent_description " : self . agent_description ,
" system_prompt " : self . system_prompt ,
" sop " : self . sop ,
" short_memory " : self . short_memory ,
" loop_interval " : self . loop_interval ,
" retry_attempts " : self . retry_attempts ,
" retry_interval " : self . retry_interval ,
" interactive " : self . interactive ,
" dashboard " : self . dashboard ,
" dynamic_temperature " : (
self . dynamic_temperature_enabled
) ,
" autosave " : self . autosave ,
" saved_state_path " : self . saved_state_path ,
" max_loops " : self . max_loops ,
}
with open ( file_path , " w " ) as f :
json . dump ( state , f , indent = 4 )
with open ( file_path , " w " ) as f :
json . dump ( state , f , indent = 4 )
saved = colored ( f " Saved agent state to: { file_path } " , " green " )
print ( saved )
saved = colored (
f " Saved agent state to: { file_path } " , " green "
)
print ( saved )
except Exception as error :
print (
colored ( f " Error saving agent state: { error } " , " red " )
)
def state_to_str ( self ) :
""" Transform the JSON into a string """
state = {
" agent_id " : str ( self . id ) ,
" agent_name " : self . agent_name ,
" agent_description " : self . agent_description ,
" system_prompt " : self . system_prompt ,
" sop " : self . sop ,
" short_memory " : self . short_memory ,
" loop_interval " : self . loop_interval ,
" retry_attempts " : self . retry_attempts ,
" retry_interval " : self . retry_interval ,
" interactive " : self . interactive ,
" dashboard " : self . dashboard ,
" dynamic_temperature " : self . dynamic_temperature_enabled ,
" autosave " : self . autosave ,
" saved_state_path " : self . saved_state_path ,
" max_loops " : self . max_loops ,
}
out = str ( state )
return out
try :
state = {
" agent_id " : str ( self . id ) ,
" agent_name " : self . agent_name ,
" agent_description " : self . agent_description ,
" system_prompt " : self . system_prompt ,
" sop " : self . sop ,
" short_memory " : self . short_memory ,
" loop_interval " : self . loop_interval ,
" retry_attempts " : self . retry_attempts ,
" retry_interval " : self . retry_interval ,
" interactive " : self . interactive ,
" dashboard " : self . dashboard ,
" dynamic_temperature " : (
self . dynamic_temperature_enabled
) ,
" autosave " : self . autosave ,
" saved_state_path " : self . saved_state_path ,
" max_loops " : self . max_loops ,
}
out = str ( state )
return out
except Exception as error :
print (
colored (
f " Error transforming state to string: { error } " ,
" red " ,
)
)
def load_state ( self , file_path : str ) :
"""
@ -1358,6 +1449,7 @@ class Agent:
# Response
‘ ‘ ‘
"""
return PROMPT
def self_healing ( self , * * kwargs ) :
"""