From 4097a9b703b3e575980ce2b5b38086feb1e2107b Mon Sep 17 00:00:00 2001 From: Kye Date: Sun, 3 Dec 2023 03:46:21 -0800 Subject: [PATCH] [FEAT][Extensive Error handling] --- swarms/structs/agent.py | 350 ++++++++++++++++---------- swarms/structs/sequential_workflow.py | 69 +++-- 2 files changed, 275 insertions(+), 144 deletions(-) diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 30faef6e..9987d9e3 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -267,9 +267,17 @@ class Agent: def _check_stopping_condition(self, response: str) -> bool: """Check if the stopping condition is met.""" - if self.stopping_condition: - return self.stopping_condition(response) - return False + try: + if self.stopping_condition: + return self.stopping_condition(response) + return False + except Exception as error: + print( + colored( + f"Error checking stopping condition: {error}", + "red", + ) + ) def dynamic_temperature(self): """ @@ -278,12 +286,19 @@ class Agent: 3. If the temperature is present, then dynamically change the temperature 4. for every loop you can randomly change the temperature on a scale from 0.0 to 1.0 """ - if hasattr(self.llm, "temperature"): - # Randomly change the temperature attribute of self.llm object - self.llm.temperature = random.uniform(0.0, 1.0) - else: - # Use a default temperature - self.llm.temperature = 0.7 + try: + if hasattr(self.llm, "temperature"): + # Randomly change the temperature attribute of self.llm object + self.llm.temperature = random.uniform(0.0, 1.0) + else: + # Use a default temperature + self.llm.temperature = 0.7 + except Exception as error: + print( + colored( + f"Error dynamically changing temperature: {error}" + ) + ) def format_prompt(self, template, **kwargs: Any) -> str: """Format the template with the provided kwargs using f-string interpolation.""" @@ -407,11 +422,25 @@ class Agent: def add_task_to_memory(self, task: str): """Add the task to the memory""" - self.short_memory.append([f"{self.user_name}: {task}"]) + try: + self.short_memory.append([f"{self.user_name}: {task}"]) + except Exception as error: + print( + colored( + f"Error adding task to memory: {error}", "red" + ) + ) def add_message_to_memory(self, message: str): """Add the message to the memory""" - self.short_memory[-1].append(message) + try: + self.short_memory[-1].append(message) + except Exception as error: + print( + colored( + f"Error adding message to memory: {error}", "red" + ) + ) def add_message_to_memory_and_truncate(self, message: str): """Add the message to the memory and truncate""" @@ -466,8 +495,15 @@ class Agent: message (Dict[str, Any]): _description_ metadata (Dict[str, Any]): _description_ """ - if self.memory is not None: - self.memory.add(message, metadata) + try: + if self.memory is not None: + self.memory.add(message, metadata) + except Exception as error: + print( + colored( + f"Error adding message to memory: {error}", "red" + ) + ) def query_memorydb( self, @@ -715,88 +751,105 @@ class Agent: 5. Repeat until stopping condition is met or max_loops is reached """ - # Activate Autonomous agent message - self.activate_autonomous_agent() + try: + # Activate Autonomous agent message + self.activate_autonomous_agent() - response = task - history = [f"{self.user_name}: {task}"] + response = task + history = [f"{self.user_name}: {task}"] - # If dashboard = True then print the dashboard - if self.dashboard: - self.print_dashboard(task) + # If dashboard = True then print the dashboard + if self.dashboard: + self.print_dashboard(task) - loop_count = 0 - # for i in range(self.max_loops): - while self.max_loops == "auto" or loop_count < self.max_loops: - loop_count += 1 - print( - colored( - f"\nLoop {loop_count} of {self.max_loops}", "blue" + loop_count = 0 + # for i in range(self.max_loops): + while ( + self.max_loops == "auto" + or loop_count < self.max_loops + ): + loop_count += 1 + print( + colored( + f"\nLoop {loop_count} of {self.max_loops}", + "blue", + ) ) - ) - print("\n") + print("\n") - if self._check_stopping_condition( - response - ) or parse_done_token(response): - break + if self._check_stopping_condition( + response + ) or parse_done_token(response): + break - # Adjust temperature, comment if no work - if self.dynamic_temperature_enabled: - self.dynamic_temperature() + # Adjust temperature, comment if no work + if self.dynamic_temperature_enabled: + self.dynamic_temperature() - # Preparing the prompt - task = self.agent_history_prompt( - FLOW_SYSTEM_PROMPT, response - ) + # Preparing the prompt + task = self.agent_history_prompt( + FLOW_SYSTEM_PROMPT, response + ) - attempt = 0 - while attempt < self.retry_attempts: - try: - response = self.llm( - task**kwargs, + attempt = 0 + while attempt < self.retry_attempts: + try: + response = self.llm( + task**kwargs, + ) + if self.interactive: + print(f"AI: {response}") + history.append(f"AI: {response}") + response = input("You: ") + history.append(f"Human: {response}") + else: + print(f"AI: {response}") + history.append(f"AI: {response}") + print(response) + break + except Exception as e: + logging.error( + f"Error generating response: {e}" + ) + attempt += 1 + time.sleep(self.retry_interval) + history.append(response) + time.sleep(self.loop_interval) + self.memory.append(history) + + if self.autosave: + print( + colored( + ( + "Autosaving agent state to" + f" {self.saved_state_path}" + ), + "green", ) - if self.interactive: - print(f"AI: {response}") - history.append(f"AI: {response}") - response = input("You: ") - history.append(f"Human: {response}") - else: - print(f"AI: {response}") - history.append(f"AI: {response}") - print(response) - break - except Exception as e: - logging.error(f"Error generating response: {e}") - attempt += 1 - time.sleep(self.retry_interval) - history.append(response) - time.sleep(self.loop_interval) - self.memory.append(history) - - if self.autosave: + ) + self.save_state(self.saved_state_path) + + if self.return_history: + return response, history + + return response + except Exception as error: print( colored( - ( - "Autosaving agent state to" - f" {self.saved_state_path}" - ), - "green", + f"Error asynchronous running agent: {error}", + "red", ) ) - self.save_state(self.saved_state_path) - - if self.return_history: - return response, history - - return response def _run(self, **kwargs: Any) -> str: """Generate a result using the provided keyword args.""" - task = self.format_prompt(**kwargs) - response, history = self._generate(task, task) - logging.info(f"Message history: {history}") - return response + try: + task = self.format_prompt(**kwargs) + response, history = self._generate(task, task) + logging.info(f"Message history: {history}") + return response + except Exception as error: + print(colored(f"Error running agent: {error}", "red")) def agent_history_prompt( self, @@ -844,15 +897,29 @@ class Agent: Args: tasks (List[str]): A list of tasks to run. """ - task_coroutines = [ - self.run_async(task, **kwargs) for task in tasks - ] - completed_tasks = await asyncio.gather(*task_coroutines) - return completed_tasks + try: + task_coroutines = [ + self.run_async(task, **kwargs) for task in tasks + ] + completed_tasks = await asyncio.gather(*task_coroutines) + return completed_tasks + except Exception as error: + print( + colored( + ( + f"Error running agent: {error} while running" + " concurrently" + ), + "red", + ) + ) def bulk_run(self, inputs: List[Dict[str, Any]]) -> List[str]: - """Generate responses for multiple input sets.""" - return [self.run(**input_data) for input_data in inputs] + try: + """Generate responses for multiple input sets.""" + return [self.run(**input_data) for input_data in inputs] + except Exception as error: + print(colored(f"Error running bulk run: {error}", "red")) @staticmethod def from_llm_and_template(llm: Any, template: str) -> "Agent": @@ -874,9 +941,14 @@ class Agent: Args: file_path (_type_): _description_ """ - with open(file_path, "w") as f: - json.dump(self.short_memory, f) - print(f"Saved agent history to {file_path}") + try: + with open(file_path, "w") as f: + json.dump(self.short_memory, f) + print(f"Saved agent history to {file_path}") + except Exception as error: + print( + colored(f"Error saving agent history: {error}", "red") + ) def load(self, file_path: str): """ @@ -1127,51 +1199,70 @@ class Agent: Example: >>> agent.save_state('saved_flow.json') """ - state = { - "agent_id": str(self.id), - "agent_name": self.agent_name, - "agent_description": self.agent_description, - "system_prompt": self.system_prompt, - "sop": self.sop, - "short_memory": self.short_memory, - "loop_interval": self.loop_interval, - "retry_attempts": self.retry_attempts, - "retry_interval": self.retry_interval, - "interactive": self.interactive, - "dashboard": self.dashboard, - "dynamic_temperature": self.dynamic_temperature_enabled, - "autosave": self.autosave, - "saved_state_path": self.saved_state_path, - "max_loops": self.max_loops, - } + try: + state = { + "agent_id": str(self.id), + "agent_name": self.agent_name, + "agent_description": self.agent_description, + "system_prompt": self.system_prompt, + "sop": self.sop, + "short_memory": self.short_memory, + "loop_interval": self.loop_interval, + "retry_attempts": self.retry_attempts, + "retry_interval": self.retry_interval, + "interactive": self.interactive, + "dashboard": self.dashboard, + "dynamic_temperature": ( + self.dynamic_temperature_enabled + ), + "autosave": self.autosave, + "saved_state_path": self.saved_state_path, + "max_loops": self.max_loops, + } - with open(file_path, "w") as f: - json.dump(state, f, indent=4) + with open(file_path, "w") as f: + json.dump(state, f, indent=4) - saved = colored(f"Saved agent state to: {file_path}", "green") - print(saved) + saved = colored( + f"Saved agent state to: {file_path}", "green" + ) + print(saved) + except Exception as error: + print( + colored(f"Error saving agent state: {error}", "red") + ) def state_to_str(self): """Transform the JSON into a string""" - state = { - "agent_id": str(self.id), - "agent_name": self.agent_name, - "agent_description": self.agent_description, - "system_prompt": self.system_prompt, - "sop": self.sop, - "short_memory": self.short_memory, - "loop_interval": self.loop_interval, - "retry_attempts": self.retry_attempts, - "retry_interval": self.retry_interval, - "interactive": self.interactive, - "dashboard": self.dashboard, - "dynamic_temperature": self.dynamic_temperature_enabled, - "autosave": self.autosave, - "saved_state_path": self.saved_state_path, - "max_loops": self.max_loops, - } - out = str(state) - return out + try: + state = { + "agent_id": str(self.id), + "agent_name": self.agent_name, + "agent_description": self.agent_description, + "system_prompt": self.system_prompt, + "sop": self.sop, + "short_memory": self.short_memory, + "loop_interval": self.loop_interval, + "retry_attempts": self.retry_attempts, + "retry_interval": self.retry_interval, + "interactive": self.interactive, + "dashboard": self.dashboard, + "dynamic_temperature": ( + self.dynamic_temperature_enabled + ), + "autosave": self.autosave, + "saved_state_path": self.saved_state_path, + "max_loops": self.max_loops, + } + out = str(state) + return out + except Exception as error: + print( + colored( + f"Error transforming state to string: {error}", + "red", + ) + ) def load_state(self, file_path: str): """ @@ -1358,6 +1449,7 @@ class Agent: # Response ‘‘‘ """ + return PROMPT def self_healing(self, **kwargs): """ diff --git a/swarms/structs/sequential_workflow.py b/swarms/structs/sequential_workflow.py index cba591da..93165aee 100644 --- a/swarms/structs/sequential_workflow.py +++ b/swarms/structs/sequential_workflow.py @@ -2,7 +2,7 @@ import concurrent.futures import json from dataclasses import dataclass, field from typing import Any, Callable, Dict, List, Optional, Union -import logging + from termcolor import colored from swarms.structs.agent import Agent @@ -43,6 +43,7 @@ class Task: kwargs: Dict[str, Any] = field(default_factory=dict) result: Any = None history: List[Any] = field(default_factory=list) + # logger = logging.getLogger(__name__) def execute(self): """ @@ -158,14 +159,11 @@ class SequentialWorkflow: def reset_workflow(self) -> None: """Resets the workflow by clearing the results of each task.""" try: - for task in self.tasks: task.result = None except Exception as error: print( - colored( - f"Error resetting workflow: {error}", "red" - ), + colored(f"Error resetting workflow: {error}", "red"), ) def get_task_results(self) -> Dict[str, Any]: @@ -188,14 +186,17 @@ class SequentialWorkflow: def remove_task(self, task: str) -> None: """Remove tasks from sequential workflow""" - try: + try: self.tasks = [ - task for task in self.tasks if task.description != task + task + for task in self.tasks + if task.description != task ] except Exception as error: print( colored( - f"Error removing task from workflow: {error}", "red" + f"Error removing task from workflow: {error}", + "red", ), ) @@ -238,6 +239,44 @@ class SequentialWorkflow: ), ) + def delete_task(self, task: str) -> None: + """ + Delete a task from the workflow. + + Args: + task (str): The description of the task to delete. + + Raises: + ValueError: If the task is not found in the workflow. + + Examples: + >>> from swarms.models import OpenAIChat + >>> from swarms.structs import SequentialWorkflow + >>> llm = OpenAIChat(openai_api_key="") + >>> workflow = SequentialWorkflow(max_loops=1) + >>> workflow.add("What's the weather in miami", llm) + >>> workflow.add("Create a report on these metrics", llm) + >>> workflow.delete_task("What's the weather in miami") + >>> workflow.tasks + [Task(description='Create a report on these metrics', agent=Agent(llm=OpenAIChat(openai_api_key=''), max_loops=1, dashboard=False), args=[], kwargs={}, result=None, history=[])] + """ + try: + for task in self.tasks: + if task.description == task: + self.tasks.remove(task) + break + else: + raise ValueError( + f"Task {task} not found in workflow." + ) + except Exception as error: + print( + colored( + f"Error deleting task from workflow: {error}", + "red", + ), + ) + def concurrent_run(self): """ Concurrently run the workflow using a pool of workers. @@ -384,7 +423,6 @@ class SequentialWorkflow: def add_objective_to_workflow(self, task: str, **kwargs) -> None: """Adds an objective to the workflow.""" try: - print( colored( """ @@ -430,7 +468,6 @@ class SequentialWorkflow: """ try: - filepath = filepath or self.restore_state_filepath with open(filepath, "r") as f: @@ -538,14 +575,16 @@ class SequentialWorkflow: # Ensure that 'task' is provided in the kwargs if "task" not in task.kwargs: raise ValueError( - "The 'task' argument is required for" - " the Agent agent execution in" - f" '{task.description}'" + "The 'task' argument is required" + " for the Agent agent execution" + f" in '{task.description}'" ) # Separate the 'task' argument from other kwargs flow_task_arg = task.kwargs.pop("task") task.result = await task.agent.arun( - flow_task_arg, *task.args, **task.kwargs + flow_task_arg, + *task.args, + **task.kwargs, ) else: # If it's not a Agent instance, call the agent directly @@ -580,4 +619,4 @@ class SequentialWorkflow: "red", attrs=["bold", "underline"], ) - ) \ No newline at end of file + )