From 83baf833c7791eac38a3c3a7e9da04b649020ce5 Mon Sep 17 00:00:00 2001 From: Kye Date: Fri, 6 Oct 2023 17:45:45 -0400 Subject: [PATCH] non linear workflow --- swarms/structs/nonlinear_workflow.py | 59 ++++++++++++++++++++++++++++ swarms/structs/task.py | 7 ---- swarms/structs/workflow.py | 5 ++- 3 files changed, 62 insertions(+), 9 deletions(-) diff --git a/swarms/structs/nonlinear_workflow.py b/swarms/structs/nonlinear_workflow.py index 077b0b7d..2eeee9fc 100644 --- a/swarms/structs/nonlinear_workflow.py +++ b/swarms/structs/nonlinear_workflow.py @@ -19,3 +19,62 @@ class Task: def execute(self): raise NotImplementedError + +class NonLinearWorkflow: + def __init__( + self, + agents, + ): + """A workflow is a collection of tasks that can be executed in parallel or sequentially.""" + super().__init__() + self.executor = ThreadPoolExecutor() + self.agents = agents + self.tasks = [] + + def add(self, task: Task): + """Add a task to the workflow""" + assert isinstance( + task, + Task + ), "Input must be an nstance of Task" + self.tasks.append(task) + return task + + def run(self): + """Run the workflow""" + ordered_tasks = self.ordered_tasks + exit_loop = False + + while not self.is_finished() and not exit_loop: + futures_list = {} + + for task in ordered_tasks: + if task.can_execute: + future = self.executor.submit(self.agents.run, task.task_string) + futures_list[future] = task + + for future in as_completed(futures_list): + if isinstance(future.result(), Exception): + exit_loop = True + break + return self.output_tasks() + + def output_tasks(self) -> List[Task]: + """Output tasks from the workflow""" + return [task for task in self.tasks if not task.children] + + def to_graph(self) -> Dict[str, set[str]]: + """Convert the workflow to a graph""" + graph = { + task.id: set(child.id for child in task.children) for task in self.tasks + } + return graph + + def order_tasks(self) -> List[Task]: + """Order the tasks USING TOPOLOGICAL SORTING""" + task_order = TopologicalSorter( + self.to_graph() + ).static_order() + return [ + self.find_task(task_id) for task_id in task_order + ] \ No newline at end of file diff --git a/swarms/structs/task.py b/swarms/structs/task.py index 03ec48a2..df0ecc85 100644 --- a/swarms/structs/task.py +++ b/swarms/structs/task.py @@ -134,13 +134,6 @@ class BaseTask(ABC): - - - - - - - class Task(BaseModel): input: Optional[StrictStr] = Field( None, diff --git a/swarms/structs/workflow.py b/swarms/structs/workflow.py index 5e4aae15..55167de3 100644 --- a/swarms/structs/workflow.py +++ b/swarms/structs/workflow.py @@ -6,6 +6,7 @@ from typing import Any, Dict, List, Optional from swarms.artifacts.error_artifact import ErrorArtifact from swarms.structs.task import BaseTask + class StringTask(BaseTask): def __init__(self, task): super().__init__() @@ -43,10 +44,10 @@ class Workflow: """ def __init__( self, - llm, + agent, parallel: bool = False ): - self.llm = llm + self.agent = agent self.tasks: List[BaseTask] = [] self.parallel = parallel