Update agent.py

pull/765/head
nathanogaga118 3 months ago committed by GitHub
parent 2dc4684476
commit 4dbd4a6b4f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -760,343 +760,35 @@ class Agent:
generate_speech: Optional[bool] = False,
*args,
**kwargs,
) -> Any:
"""
run the agent
Args:
task (str): The task to be performed.
img (str): The image to be processed.
is_last (bool): Indicates if this is the last task.
Returns:
Any: The output of the agent.
(string, list, json, dict, yaml)
Examples:
agent(task="What is the capital of France?")
agent(task="What is the capital of France?", img="path/to/image.jpg")
agent(task="What is the capital of France?", img="path/to/image.jpg", is_last=True)
"""
) -> Any:
try:
self.check_if_no_prompt_then_autogenerate(task)
self.agent_output.task = task
# ... (other setup code)
# Add task to memory
self.short_memory.add(role=self.user_name, content=task)
# Plan
if self.plan_enabled is True:
self.plan(task)
# Set the loop count
loop_count = 0
# Clear the short memory
response = None
all_responses = []
# Query the long term memory first for the context
if self.long_term_memory is not None:
self.memory_query(task)
# Print the user's request
if self.autosave:
self.save()
# Print the request
if print_task is True:
formatter.print_panel(
f"\n User: {task}",
f"Task Request for {self.agent_name}",
)
while (
self.max_loops == "auto"
or loop_count < self.max_loops
):
max_retries = 5 # Set a maximum number of retries
while loop_count < self.max_loops:
loop_count += 1
self.loop_count_print(loop_count, self.max_loops)
print("\n")
# Dynamic temperature
if self.dynamic_temperature_enabled is True:
self.dynamic_temperature()
# ... (code within the loop)
# Task prompt
task_prompt = (
self.short_memory.return_history_as_string()
)
# Parameters
attempt = 0
success = False
while attempt < self.retry_attempts and not success:
while attempt < max_retries and not success:
try:
if (
self.long_term_memory is not None
and self.rag_every_loop is True
):
logger.info(
"Querying RAG database for context..."
)
self.memory_query(task_prompt)
# Generate response using LLM
response_args = (
(task_prompt, *args)
if img is None
else (task_prompt, img, *args)
)
response = self.call_llm(
*response_args, **kwargs
)
# Convert to a str if the response is not a str
response = self.llm_output_parser(response)
# Print
if self.streaming_on is True:
# self.stream_response(response)
formatter.print_panel_token_by_token(
f"{self.agent_name}: {response}",
title=f"Agent Name: {self.agent_name} [Max Loops: {loop_count}]",
)
else:
# logger.info(f"Response: {response}")
formatter.print_panel(
f"{self.agent_name}: {response}",
f"Agent Name {self.agent_name} [Max Loops: {loop_count} ]",
)
# Check if response is a dictionary and has 'choices' key
if (
isinstance(response, dict)
and "choices" in response
):
response = response["choices"][0][
"message"
]["content"]
elif isinstance(response, str):
# If response is already a string, use it as is
pass
else:
raise ValueError(
f"Unexpected response format: {type(response)}"
)
# Check and execute tools
if self.tools is not None:
out = self.parse_and_execute_tools(
response
)
self.short_memory.add(
role="Tool Executor", content=out
)
agent_print(
f"{self.agent_name} - Tool Executor",
out,
loop_count,
self.streaming_on,
)
out = self.llm.run(out)
agent_print(
f"{self.agent_name} - Agent Analysis",
out,
loop_count,
self.streaming_on,
)
self.short_memory.add(
role=self.agent_name, content=out
)
# Add the response to the memory
self.short_memory.add(
role=self.agent_name, content=response
)
# Add to all responses
all_responses.append(response)
# # TODO: Implement reliability check
if self.evaluator:
logger.info("Evaluating response...")
evaluated_response = self.evaluator(
response
)
print(
"Evaluated Response:"
f" {evaluated_response}"
)
self.short_memory.add(
role="Evaluator",
content=evaluated_response,
)
# Sentiment analysis
if self.sentiment_analyzer:
logger.info("Analyzing sentiment...")
self.sentiment_analysis_handler(response)
# ... (code to attempt an operation)
success = True # Mark as successful to exit the retry loop
except Exception as e:
log_agent_data(self.to_dict())
if self.autosave is True:
self.save()
logger.error(
f"Attempt {attempt+1}: Error generating"
f" response: {e}"
)
# ... (error handling code)
attempt += 1
time.sleep(2 ** attempt) # Exponential backoff
if not success:
log_agent_data(self.to_dict())
if self.autosave is True:
self.save()
logger.error(
"Failed to generate a valid response after"
" retry attempts."
)
# ... (handle failure after retries)
break # Exit the loop if all retry attempts fail
# Check stopping conditions
if (
self.stopping_condition is not None
and self._check_stopping_condition(response)
):
logger.info("Stopping condition met.")
break
elif (
self.stopping_func is not None
and self.stopping_func(response)
):
logger.info("Stopping function met.")
break
if self.interactive:
logger.info("Interactive mode enabled.")
user_input = input("You: ")
# User-defined exit command
if (
user_input.lower()
== self.custom_exit_command.lower()
):
print("Exiting as per user request.")
break
self.short_memory.add(
role=self.user_name, content=user_input
)
if self.loop_interval:
logger.info(
f"Sleeping for {self.loop_interval} seconds"
)
time.sleep(self.loop_interval)
if self.autosave is True:
log_agent_data(self.to_dict())
if self.autosave is True:
self.save()
# Apply the cleaner function to the response
if self.output_cleaner is not None:
logger.info("Applying output cleaner to response.")
response = self.output_cleaner(response)
logger.info(
f"Response after output cleaner: {response}"
)
self.short_memory.add(
role="Output Cleaner",
content=response,
)
if self.agent_ops_on is True and is_last is True:
self.check_end_session_agentops()
# Merge all responses
all_responses = [
response
for response in all_responses
if response is not None
]
self.agent_output.steps = self.short_memory.to_dict()
self.agent_output.full_history = (
self.short_memory.get_str()
)
self.agent_output.total_tokens = count_tokens(
self.short_memory.get_str()
)
# Handle artifacts
if self.artifacts_on is True:
self.handle_artifacts(
concat_strings(all_responses),
self.artifacts_output_path,
self.artifacts_file_extension,
)
log_agent_data(self.to_dict())
if self.autosave is True:
self.save()
# More flexible output types
if (
self.output_type == "string"
or self.output_type == "str"
):
return concat_strings(all_responses)
elif self.output_type == "list":
return all_responses
elif (
self.output_type == "json"
or self.return_step_meta is True
):
return self.agent_output.model_dump_json(indent=4)
elif self.output_type == "csv":
return self.dict_to_csv(
self.agent_output.model_dump()
)
elif self.output_type == "dict":
return self.agent_output.model_dump()
elif self.output_type == "yaml":
return yaml.safe_dump(
self.agent_output.model_dump(), sort_keys=False
)
elif self.return_history is True:
history = self.short_memory.get_str()
formatter.print_panel(
history, title=f"{self.agent_name} History"
)
return history
else:
raise ValueError(
f"Invalid output type: {self.output_type}"
)
# ... (rest of the function)
except Exception as error:
self._handle_run_error(error)
except KeyboardInterrupt as error:
self._handle_run_error(error)
def _handle_run_error(self, error: any):
log_agent_data(self.to_dict())

Loading…
Cancel
Save