From f54ae940f434f0146f46ad74826218a6bdec45e7 Mon Sep 17 00:00:00 2001 From: Kye Date: Sun, 3 Dec 2023 01:44:44 -0800 Subject: [PATCH] [FEAT][SequentialWorkflow handling] --- swarms/structs/sequential_workflow.py | 199 ++++++++++++++++---------- 1 file changed, 127 insertions(+), 72 deletions(-) diff --git a/swarms/structs/sequential_workflow.py b/swarms/structs/sequential_workflow.py index 01c569f6..cba591da 100644 --- a/swarms/structs/sequential_workflow.py +++ b/swarms/structs/sequential_workflow.py @@ -157,8 +157,16 @@ class SequentialWorkflow: def reset_workflow(self) -> None: """Resets the workflow by clearing the results of each task.""" - for task in self.tasks: - task.result = None + try: + + for task in self.tasks: + task.result = None + except Exception as error: + print( + colored( + f"Error resetting workflow: {error}", "red" + ), + ) def get_task_results(self) -> Dict[str, Any]: """ @@ -167,13 +175,29 @@ class SequentialWorkflow: Returns: Dict[str, Any]: The results of each task in the workflow """ - return {task.description: task.result for task in self.tasks} + try: + return { + task.description: task.result for task in self.tasks + } + except Exception as error: + print( + colored( + f"Error getting task results: {error}", "red" + ), + ) def remove_task(self, task: str) -> None: """Remove tasks from sequential workflow""" - self.tasks = [ - task for task in self.tasks if task.description != task - ] + try: + self.tasks = [ + task for task in self.tasks if task.description != task + ] + except Exception as error: + print( + colored( + f"Error removing task from workflow: {error}", "red" + ), + ) def update_task(self, task: str, **updates) -> None: """ @@ -359,22 +383,31 @@ class SequentialWorkflow: def add_objective_to_workflow(self, task: str, **kwargs) -> None: """Adds an objective to the workflow.""" - print( - colored( - """ - Adding Objective to Workflow...""", - "green", - attrs=["bold", "underline"], + try: + + print( + colored( + """ + Adding Objective to Workflow...""", + "green", + attrs=["bold", "underline"], + ) ) - ) - task = Task( - description=task, - agent=kwargs["agent"], - args=list(kwargs["args"]), - kwargs=kwargs["kwargs"], - ) - self.tasks.append(task) + task = Task( + description=task, + agent=kwargs["agent"], + args=list(kwargs["args"]), + kwargs=kwargs["kwargs"], + ) + self.tasks.append(task) + except Exception as error: + print( + colored( + f"Error adding objective to workflow: {error}", + "red", + ) + ) def load_workflow_state( self, filepath: str = None, **kwargs @@ -396,22 +429,31 @@ class SequentialWorkflow: >>> workflow.load_workflow_state("sequential_workflow_state.json") """ - filepath = filepath or self.restore_state_filepath - - with open(filepath, "r") as f: - state = json.load(f) - self.max_loops = state["max_loops"] - self.tasks = [] - for task_state in state["tasks"]: - task = Task( - description=task_state["description"], - agent=task_state["agent"], - args=task_state["args"], - kwargs=task_state["kwargs"], - result=task_state["result"], - history=task_state["history"], + try: + + filepath = filepath or self.restore_state_filepath + + with open(filepath, "r") as f: + state = json.load(f) + self.max_loops = state["max_loops"] + self.tasks = [] + for task_state in state["tasks"]: + task = Task( + description=task_state["description"], + agent=task_state["agent"], + args=task_state["args"], + kwargs=task_state["kwargs"], + result=task_state["result"], + history=task_state["history"], + ) + self.tasks.append(task) + except Exception as error: + print( + colored( + f"Error loading workflow state: {error}", + "red", ) - self.tasks.append(task) + ) def run(self) -> None: """ @@ -486,43 +528,56 @@ class SequentialWorkflow: ValueError: If a Agent instance is used as a task and the 'task' argument is not provided. """ - for _ in range(self.max_loops): - for task in self.tasks: - # Check if the current task can be executed - if task.result is None: - # Check if the agent is a Agent and a 'task' argument is needed - if isinstance(task.agent, Agent): - # Ensure that 'task' is provided in the kwargs - if "task" not in task.kwargs: - raise ValueError( - "The 'task' argument is required for" - " the Agent agent execution in" - f" '{task.description}'" + try: + for _ in range(self.max_loops): + for task in self.tasks: + # Check if the current task can be executed + if task.result is None: + # Check if the agent is a Agent and a 'task' argument is needed + if isinstance(task.agent, Agent): + # Ensure that 'task' is provided in the kwargs + if "task" not in task.kwargs: + raise ValueError( + "The 'task' argument is required for" + " the Agent agent execution in" + f" '{task.description}'" + ) + # Separate the 'task' argument from other kwargs + flow_task_arg = task.kwargs.pop("task") + task.result = await task.agent.arun( + flow_task_arg, *task.args, **task.kwargs ) - # Separate the 'task' argument from other kwargs - flow_task_arg = task.kwargs.pop("task") - task.result = await task.agent.arun( - flow_task_arg, *task.args, **task.kwargs - ) - else: - # If it's not a Agent instance, call the agent directly - task.result = await task.agent( - *task.args, **task.kwargs - ) - - # Pass the result as an argument to the next task if it exists - next_task_index = self.tasks.index(task) + 1 - if next_task_index < len(self.tasks): - next_task = self.tasks[next_task_index] - if isinstance(next_task.agent, Agent): - # For Agent flows, 'task' should be a keyword argument - next_task.kwargs["task"] = task.result else: - # For other callable flows, the result is added to args - next_task.args.insert(0, task.result) + # If it's not a Agent instance, call the agent directly + task.result = await task.agent( + *task.args, **task.kwargs + ) - # Autosave the workflow state - if self.autosave: - self.save_workflow_state( - "sequential_workflow_state.json" - ) + # Pass the result as an argument to the next task if it exists + next_task_index = self.tasks.index(task) + 1 + if next_task_index < len(self.tasks): + next_task = self.tasks[next_task_index] + if isinstance(next_task.agent, Agent): + # For Agent flows, 'task' should be a keyword argument + next_task.kwargs["task"] = task.result + else: + # For other callable flows, the result is added to args + next_task.args.insert(0, task.result) + + # Autosave the workflow state + if self.autosave: + self.save_workflow_state( + "sequential_workflow_state.json" + ) + except Exception as e: + print( + colored( + ( + "Error initializing the Sequential workflow:" + f" {e} try optimizing your inputs like the" + " agent class and task description" + ), + "red", + attrs=["bold", "underline"], + ) + ) \ No newline at end of file