From 61a06d0eddff1e4eca6775df12aaabe800ff23d6 Mon Sep 17 00:00:00 2001 From: Kye Date: Sun, 3 Dec 2023 01:41:42 -0800 Subject: [PATCH] [FEAT][SequentialWorkflow Concurrency] [FEAT][SequentialWorkflow Error handling] --- pyproject.toml | 2 +- swarms/structs/sequential_workflow.py | 157 +++++++++++++++++--------- 2 files changed, 103 insertions(+), 56 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index b630536c..f051bdb2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "2.5.6" +version = "2.5.7" description = "Swarms - Pytorch" license = "MIT" authors = ["Kye Gomez "] diff --git a/swarms/structs/sequential_workflow.py b/swarms/structs/sequential_workflow.py index f48adb6f..01c569f6 100644 --- a/swarms/structs/sequential_workflow.py +++ b/swarms/structs/sequential_workflow.py @@ -1,22 +1,8 @@ -""" -TODO: -- Add a method to update the arguments of a task -- Add a method to get the results of each task -- Add a method to get the results of a specific task -- Add a method to get the results of the workflow -- Add a method to get the results of the workflow as a dataframe - - -- Add a method to run the workflow in parallel with a pool of workers and a queue and a dashboard -- Add a dashboard to visualize the workflow -- Add async support -- Add context manager support -- Add workflow history -""" +import concurrent.futures import json from dataclasses import dataclass, field from typing import Any, Callable, Dict, List, Optional, Union - +import logging from termcolor import colored from swarms.structs.agent import Agent @@ -118,11 +104,11 @@ class SequentialWorkflow: """ + name: str = None + description: str = None tasks: List[Task] = field(default_factory=list) max_loops: int = 1 autosave: bool = False - name: str = (None,) - description: str = (None,) saved_state_filepath: Optional[str] = ( "sequential_workflow_state.json" ) @@ -146,21 +132,28 @@ class SequentialWorkflow: *args: Additional arguments to pass to the task execution. **kwargs: Additional keyword arguments to pass to the task execution. """ - # If the agent is a Agent instance, we include the task in kwargs for Agent.run() - if isinstance(agent, Agent): - kwargs["task"] = ( - task # Set the task as a keyword argument for Agent - ) + try: + # If the agent is a Agent instance, we include the task in kwargs for Agent.run() + if isinstance(agent, Agent): + kwargs["task"] = ( + task # Set the task as a keyword argument for Agent + ) - # Append the task to the tasks list - self.tasks.append( - Task( - description=task, - agent=agent, - args=list(args), - kwargs=kwargs, + # Append the task to the tasks list + self.tasks.append( + Task( + description=task, + agent=agent, + args=list(args), + kwargs=kwargs, + ) + ) + except Exception as error: + print( + colored( + f"Error adding task to workflow: {error}", "red" + ), ) - ) def reset_workflow(self) -> None: """Resets the workflow by clearing the results of each task.""" @@ -205,12 +198,57 @@ class SequentialWorkflow: {'max_tokens': 1000} """ - for task in self.tasks: - if task.description == task: - task.kwargs.update(updates) - break - else: - raise ValueError(f"Task {task} not found in workflow.") + try: + for task in self.tasks: + if task.description == task: + task.kwargs.update(updates) + break + else: + raise ValueError( + f"Task {task} not found in workflow." + ) + except Exception as error: + print( + colored( + f"Error updating task in workflow: {error}", "red" + ), + ) + + def concurrent_run(self): + """ + Concurrently run the workflow using a pool of workers. + + Examples: + >>> from swarms.models import OpenAIChat + >>> from swarms.structs import SequentialWorkflow + >>> llm = OpenAIChat(openai_api_key="") + >>> workflow = SequentialWorkflow(max_loops=1) + + """ + try: + with concurrent.futures.ThreadPoolExecutor() as executor: + futures_to_task = { + executor.submit(task.run): task + for task in self.tasks + } + results = [] + for future in concurrent.futures.as_completed( + futures_to_task + ): + task = futures_to_task[future] + + try: + result = future.result() + except Exception as error: + print(f"Error running workflow: {error}") + else: + results.append(result) + print( + f"Task {task} completed successfully with" + f" result: {result}" + ) + except Exception as error: + print(colored(f"Error running workflow: {error}", "red")) def save_workflow_state( self, @@ -232,26 +270,35 @@ class SequentialWorkflow: >>> workflow.add("Create a report on these metrics", llm) >>> workflow.save_workflow_state("sequential_workflow_state.json") """ - filepath = filepath or self.saved_state_filepath - - with open(filepath, "w") as f: - # Saving the state as a json for simplicuty - state = { - "tasks": [ - { - "description": task.description, - "args": task.args, - "kwargs": task.kwargs, - "result": task.result, - "history": task.history, - } - for task in self.tasks - ], - "max_loops": self.max_loops, - } - json.dump(state, f, indent=4) + try: + filepath = filepath or self.saved_state_filepath + + with open(filepath, "w") as f: + # Saving the state as a json for simplicuty + state = { + "tasks": [ + { + "description": task.description, + "args": task.args, + "kwargs": task.kwargs, + "result": task.result, + "history": task.history, + } + for task in self.tasks + ], + "max_loops": self.max_loops, + } + json.dump(state, f, indent=4) + except Exception as error: + print( + colored( + f"Error saving workflow state: {error}", + "red", + ) + ) def workflow_bootup(self, **kwargs) -> None: + """Boots up the workflow.""" print( colored( """