diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index e0427d44..ff5152c5 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -750,353 +750,45 @@ class Agent: # Main function def _run( - self, - task: Optional[str] = None, - img: Optional[str] = None, - speech: Optional[str] = None, - video: Optional[str] = None, - is_last: Optional[bool] = False, - print_task: Optional[bool] = False, - generate_speech: Optional[bool] = False, - *args, - **kwargs, - ) -> Any: - """ - run the agent - - Args: - task (str): The task to be performed. - img (str): The image to be processed. - is_last (bool): Indicates if this is the last task. - - Returns: - Any: The output of the agent. - (string, list, json, dict, yaml) - - Examples: - agent(task="What is the capital of France?") - agent(task="What is the capital of France?", img="path/to/image.jpg") - agent(task="What is the capital of France?", img="path/to/image.jpg", is_last=True) - """ - try: - self.check_if_no_prompt_then_autogenerate(task) - - self.agent_output.task = task - - # Add task to memory - self.short_memory.add(role=self.user_name, content=task) - - # Plan - if self.plan_enabled is True: - self.plan(task) - - # Set the loop count - loop_count = 0 - # Clear the short memory - response = None - all_responses = [] - - # Query the long term memory first for the context - if self.long_term_memory is not None: - self.memory_query(task) - - # Print the user's request - - if self.autosave: - self.save() - - # Print the request - if print_task is True: - formatter.print_panel( - f"\n User: {task}", - f"Task Request for {self.agent_name}", - ) - - while ( - self.max_loops == "auto" - or loop_count < self.max_loops - ): - loop_count += 1 - self.loop_count_print(loop_count, self.max_loops) - print("\n") - - # Dynamic temperature - if self.dynamic_temperature_enabled is True: - self.dynamic_temperature() - - # Task prompt - task_prompt = ( - self.short_memory.return_history_as_string() - ) - - # Parameters - attempt = 0 - success = False - while attempt < self.retry_attempts and not success: - try: - if ( - self.long_term_memory is not None - and self.rag_every_loop is True - ): - logger.info( - "Querying RAG database for context..." - ) - self.memory_query(task_prompt) - - # Generate response using LLM - response_args = ( - (task_prompt, *args) - if img is None - else (task_prompt, img, *args) - ) - response = self.call_llm( - *response_args, **kwargs - ) - - # Convert to a str if the response is not a str - response = self.llm_output_parser(response) - - # Print - if self.streaming_on is True: - # self.stream_response(response) - formatter.print_panel_token_by_token( - f"{self.agent_name}: {response}", - title=f"Agent Name: {self.agent_name} [Max Loops: {loop_count}]", - ) - else: - # logger.info(f"Response: {response}") - formatter.print_panel( - f"{self.agent_name}: {response}", - f"Agent Name {self.agent_name} [Max Loops: {loop_count} ]", - ) - - # Check if response is a dictionary and has 'choices' key - if ( - isinstance(response, dict) - and "choices" in response - ): - response = response["choices"][0][ - "message" - ]["content"] - elif isinstance(response, str): - # If response is already a string, use it as is - pass - else: - raise ValueError( - f"Unexpected response format: {type(response)}" - ) - - # Check and execute tools - if self.tools is not None: - out = self.parse_and_execute_tools( - response - ) - - self.short_memory.add( - role="Tool Executor", content=out - ) - - agent_print( - f"{self.agent_name} - Tool Executor", - out, - loop_count, - self.streaming_on, - ) - - out = self.llm.run(out) - - agent_print( - f"{self.agent_name} - Agent Analysis", - out, - loop_count, - self.streaming_on, - ) - - self.short_memory.add( - role=self.agent_name, content=out - ) - - # Add the response to the memory - self.short_memory.add( - role=self.agent_name, content=response - ) - - # Add to all responses - all_responses.append(response) - - # # TODO: Implement reliability check - - if self.evaluator: - logger.info("Evaluating response...") - evaluated_response = self.evaluator( - response - ) - print( - "Evaluated Response:" - f" {evaluated_response}" - ) - self.short_memory.add( - role="Evaluator", - content=evaluated_response, - ) - - # Sentiment analysis - if self.sentiment_analyzer: - logger.info("Analyzing sentiment...") - self.sentiment_analysis_handler(response) - - success = True # Mark as successful to exit the retry loop - - except Exception as e: - - log_agent_data(self.to_dict()) - - if self.autosave is True: - self.save() - - logger.error( - f"Attempt {attempt+1}: Error generating" - f" response: {e}" - ) - attempt += 1 - - if not success: - - log_agent_data(self.to_dict()) - - if self.autosave is True: - self.save() - - logger.error( - "Failed to generate a valid response after" - " retry attempts." - ) - break # Exit the loop if all retry attempts fail - - # Check stopping conditions - if ( - self.stopping_condition is not None - and self._check_stopping_condition(response) - ): - logger.info("Stopping condition met.") - break - elif ( - self.stopping_func is not None - and self.stopping_func(response) - ): - logger.info("Stopping function met.") - break - - if self.interactive: - logger.info("Interactive mode enabled.") - user_input = input("You: ") - - # User-defined exit command - if ( - user_input.lower() - == self.custom_exit_command.lower() - ): - print("Exiting as per user request.") - break - - self.short_memory.add( - role=self.user_name, content=user_input - ) - - if self.loop_interval: - logger.info( - f"Sleeping for {self.loop_interval} seconds" - ) - time.sleep(self.loop_interval) - - if self.autosave is True: - log_agent_data(self.to_dict()) - - if self.autosave is True: - self.save() - - # Apply the cleaner function to the response - if self.output_cleaner is not None: - logger.info("Applying output cleaner to response.") - response = self.output_cleaner(response) - logger.info( - f"Response after output cleaner: {response}" - ) - self.short_memory.add( - role="Output Cleaner", - content=response, - ) - - if self.agent_ops_on is True and is_last is True: - self.check_end_session_agentops() - - # Merge all responses - all_responses = [ - response - for response in all_responses - if response is not None - ] - - self.agent_output.steps = self.short_memory.to_dict() - self.agent_output.full_history = ( - self.short_memory.get_str() - ) - self.agent_output.total_tokens = count_tokens( - self.short_memory.get_str() - ) - - # Handle artifacts - if self.artifacts_on is True: - self.handle_artifacts( - concat_strings(all_responses), - self.artifacts_output_path, - self.artifacts_file_extension, - ) - - log_agent_data(self.to_dict()) - if self.autosave is True: - self.save() - - # More flexible output types - if ( - self.output_type == "string" - or self.output_type == "str" - ): - return concat_strings(all_responses) - elif self.output_type == "list": - return all_responses - elif ( - self.output_type == "json" - or self.return_step_meta is True - ): - return self.agent_output.model_dump_json(indent=4) - elif self.output_type == "csv": - return self.dict_to_csv( - self.agent_output.model_dump() - ) - elif self.output_type == "dict": - return self.agent_output.model_dump() - elif self.output_type == "yaml": - return yaml.safe_dump( - self.agent_output.model_dump(), sort_keys=False - ) - elif self.return_history is True: - history = self.short_memory.get_str() + self, + task: Optional[str] = None, + img: Optional[str] = None, + speech: Optional[str] = None, + video: Optional[str] = None, + is_last: Optional[bool] = False, + print_task: Optional[bool] = False, + generate_speech: Optional[bool] = False, + *args, + **kwargs, +) -> Any: + try: + # ... (other setup code) + + loop_count = 0 + max_retries = 5 # Set a maximum number of retries + while loop_count < self.max_loops: + loop_count += 1 + # ... (code within the loop) + + attempt = 0 + success = False + while attempt < max_retries and not success: + try: + # ... (code to attempt an operation) + success = True # Mark as successful to exit the retry loop + except Exception as e: + # ... (error handling code) + attempt += 1 + time.sleep(2 ** attempt) # Exponential backoff - formatter.print_panel( - history, title=f"{self.agent_name} History" - ) - return history - else: - raise ValueError( - f"Invalid output type: {self.output_type}" - ) + if not success: + # ... (handle failure after retries) + break # Exit the loop if all retry attempts fail - except Exception as error: - self._handle_run_error(error) + # ... (rest of the function) + except Exception as error: + self._handle_run_error(error) - except KeyboardInterrupt as error: - self._handle_run_error(error) def _handle_run_error(self, error: any): log_agent_data(self.to_dict())