diff --git a/docs/swarms/structs/nonlinear_worfklow.py b/docs/swarms/structs/nonlinear_worfklow.py new file mode 100644 index 00000000..e53c8796 --- /dev/null +++ b/docs/swarms/structs/nonlinear_worfklow.py @@ -0,0 +1,18 @@ +from swarms.agents.base import agent +from swarms.structs.nonlinear_worfklow import NonLinearWorkflow, Task + +prompt = "develop a feedforward network in pytorch" +prompt2 = "Develop a self attention using pytorch" + +task1 = Task("task1", prompt) +task2 = Task("task2", prompt2, parents=[task1]) + +#add tasks to workflow +workflow = NonLinearWorkflow(agent) + +#add tasks to tree +workflow.add(task1) +workflow.add(task2) + +#run +workflow.run() \ No newline at end of file diff --git a/swarms/structs/nonlinear_workflow.py b/swarms/structs/nonlinear_workflow.py index 2eeee9fc..ed155f60 100644 --- a/swarms/structs/nonlinear_workflow.py +++ b/swarms/structs/nonlinear_workflow.py @@ -21,9 +21,35 @@ class Task: class NonLinearWorkflow: + """ + NonLinearWorkflow constructs a non sequential DAG of tasks to be executed by agents + + + Architecture: + NonLinearWorkflow = Task + Agent + Executor + + ASCII Diagram: + +-------------------+ + | NonLinearWorkflow | + +-------------------+ + | | + | | + | | + | | + | | + | | + | | + | | + | | + | | + +-------------------+ + + + """ def __init__( self, agents, + iters_per_task ): """A workflow is a collection of tasks that can be executed in parallel or sequentially.""" super().__init__() diff --git a/swarms/structs/task.py b/swarms/structs/task.py index df0ecc85..b1f56fe3 100644 --- a/swarms/structs/task.py +++ b/swarms/structs/task.py @@ -3,13 +3,12 @@ from __future__ import annotations import json import pprint import uuid -from abc import ABC +from abc import ABC, abstractmethod from enum import Enum -from typing import Any, Optional +from typing import Any, List, Optional, Union -from swarms.artifacts.main import Artifact from pydantic import BaseModel, Field, StrictStr, conlist - +from swarms.artifacts.main import Artifact from swarms.artifacts.error_artifact import ErrorArtifact @@ -20,37 +19,37 @@ class BaseTask(ABC): FINISHED = 3 def __init__(self): - self.id = uuid.uuid4().hex - self.state = self.State.PENDING - self.parent_ids = [] - self.child_ids = [] - self.output = None - self.structure = None + self.id: str = uuid.uuid4().hex + self.state: BaseTask.State = self.State.PENDING + self.parent_ids: List[str] = [] + self.child_ids: List[str] = [] + self.output: Optional[Union[Artifact, ErrorArtifact]] = None + self.structure: Optional['Structure'] = None @property - # @abstractmethod - def input(self): + @abstractmethod + def input(self) -> Any: pass @property - def parents(self): + def parents(self) -> List[BaseTask]: return [self.structure.find_task(parent_id) for parent_id in self.parent_ids] @property - def children(self): + def children(self) -> List[BaseTask]: return [self.structure.find_task(child_id) for child_id in self.child_ids] - def __rshift__(self, child): + def __rshift__(self, child: BaseTask) -> BaseTask: return self.add_child(child) - def __lshift__(self, child): + def __lshift__(self, child: BaseTask) -> BaseTask: return self.add_parent(child) - def preprocess(self, structure): + def preprocess(self, structure: 'Structure') -> BaseTask: self.structure = structure return self - def add_child(self, child): + def add_child(self, child: BaseTask) -> BaseTask: if self.structure: child.structure = self.structure elif child.structure: @@ -70,7 +69,7 @@ class BaseTask(ABC): return child - def add_parent(self, parent): + def add_parent(self, parent: BaseTask) -> BaseTask: if self.structure: parent.structure = self.structure elif parent.structure: @@ -90,22 +89,22 @@ class BaseTask(ABC): return parent - def is_pending(self): + def is_pending(self) -> bool: return self.state == self.State.PENDING - def is_finished(self): + def is_finished(self) -> bool: return self.state == self.State.FINISHED - def is_executing(self): + def is_executing(self) -> bool: return self.state == self.State.EXECUTING - def before_run(self): + def before_run(self) -> None: pass - def after_run(self): + def after_run(self) -> None: pass - def execute(self): + def execute(self) -> Optional[Union[Artifact, ErrorArtifact]]: try: self.state = self.State.EXECUTING self.before_run() @@ -117,23 +116,19 @@ class BaseTask(ABC): self.state = self.State.FINISHED return self.output - def can_execute(self): + def can_execute(self) -> bool: return self.state == self.State.PENDING and all(parent.is_finished() for parent in self.parents) - def reset(self): + def reset(self) -> BaseTask: self.state = self.State.PENDING self.output = None return self - # @abstractmethod - def run(self): + @abstractmethod + def run(self) -> Optional[Union[Artifact, ErrorArtifact]]: pass - - - - class Task(BaseModel): input: Optional[StrictStr] = Field( None, @@ -147,66 +142,37 @@ class Task(BaseModel): ..., description="ID of the task" ) - artifacts: conlist(Artifact) = Field( + artifacts: conlist(Artifact, min_items=1) = Field( ..., description="A list of artifacts that the task has been produced" ) - __properties = ["input", "additional_input", "task_id", "artifact"] - class Config: - #pydantic config - allow_population_by_field_name = True validate_assignment = True def to_str(self) -> str: - """Returns the str representation of the model using alias""" return pprint.pformat(self.dict(by_alias=True)) - + def to_json(self) -> str: - """Returns the JSON representation of the model using alias""" - return json.dumps(self.to_dict()) - + return json.dumps(self.dict(by_alias=True, exclude_none=True)) + @classmethod - def from_json(cls, json_str: str) -> Task: - """Create an instance of Task from a json string""" - return cls.from_dict(json.loads(json_str)) - - def to_dict(self): - """Returns the dict representation of the model using alias""" - _dict = self.dict(by_alias=True, exclude={}, exclude_none=True) - _items =[] + def from_json(cls, json_str: str) -> 'Task': + return cls.parse_raw(json_str) + + def to_dict(self) -> dict: + _dict = self.dict(by_alias=True, exclude_none=True) if self.artifacts: - for _item in self.artifacts: - if _item: - _items.append(_item.to_dict()) - _dict["artifacts"] = _items - #set to None if additional input is None - # and __fields__set contains the field - if self.additional_input is None and "additional_input" in self.__fields__set__: - _dict["additional_input"] = None - + _dict["artifacts"] = [artifact.dict(by_alias=True, exclude_none=True) for artifact in self.artifacts] return _dict - + @classmethod - def from_dict(cls, obj: dict) -> Task: - """Create an instance of Task from dict""" + def from_dict(cls, obj: dict) -> 'Task': if obj is None: return None - if not isinstance(obj, dict): - return Task.parse_obj(obj) - - _obj = Task.parse_obj( - { - "input": obj.get("input"), - "additional_input": obj.get("additional_input"), - "task_id": obj.get("task_id"), - "artifacts": [ - Artifact.from_dict(_item) for _item in obj.get("artifacts") - ] - if obj.get("artifacts") is not None - else None, - } - ) \ No newline at end of file + raise ValueError("Input must be a dictionary.") + if 'artifacts' in obj: + obj['artifacts'] = [Artifact.parse_obj(artifact) for artifact in obj['artifacts']] + return cls.parse_obj(obj) \ No newline at end of file