non linear workflow

Former-commit-id: 83baf833c7
discord-bot-framework
Kye 1 year ago
parent fcc0653488
commit 7635284e8d

@ -19,3 +19,62 @@ class Task:
def execute(self):
raise NotImplementedError
class NonLinearWorkflow:
def __init__(
self,
agents,
):
"""A workflow is a collection of tasks that can be executed in parallel or sequentially."""
super().__init__()
self.executor = ThreadPoolExecutor()
self.agents = agents
self.tasks = []
def add(self, task: Task):
"""Add a task to the workflow"""
assert isinstance(
task,
Task
), "Input must be an nstance of Task"
self.tasks.append(task)
return task
def run(self):
"""Run the workflow"""
ordered_tasks = self.ordered_tasks
exit_loop = False
while not self.is_finished() and not exit_loop:
futures_list = {}
for task in ordered_tasks:
if task.can_execute:
future = self.executor.submit(self.agents.run, task.task_string)
futures_list[future] = task
for future in as_completed(futures_list):
if isinstance(future.result(), Exception):
exit_loop = True
break
return self.output_tasks()
def output_tasks(self) -> List[Task]:
"""Output tasks from the workflow"""
return [task for task in self.tasks if not task.children]
def to_graph(self) -> Dict[str, set[str]]:
"""Convert the workflow to a graph"""
graph = {
task.id: set(child.id for child in task.children) for task in self.tasks
}
return graph
def order_tasks(self) -> List[Task]:
"""Order the tasks USING TOPOLOGICAL SORTING"""
task_order = TopologicalSorter(
self.to_graph()
).static_order()
return [
self.find_task(task_id) for task_id in task_order
]

@ -134,13 +134,6 @@ class BaseTask(ABC):
class Task(BaseModel):
input: Optional[StrictStr] = Field(
None,

@ -6,6 +6,7 @@ from typing import Any, Dict, List, Optional
from swarms.artifacts.error_artifact import ErrorArtifact
from swarms.structs.task import BaseTask
class StringTask(BaseTask):
def __init__(self, task):
super().__init__()
@ -43,10 +44,10 @@ class Workflow:
"""
def __init__(
self,
llm,
agent,
parallel: bool = False
):
self.llm = llm
self.agent = agent
self.tasks: List[BaseTask] = []
self.parallel = parallel

Loading…
Cancel
Save