pull/59/head
Kye 1 year ago
parent 83baf833c7
commit 0914871ad6

@ -0,0 +1,18 @@
from swarms.agents.base import agent
from swarms.structs.nonlinear_worfklow import NonLinearWorkflow, Task
prompt = "develop a feedforward network in pytorch"
prompt2 = "Develop a self attention using pytorch"
task1 = Task("task1", prompt)
task2 = Task("task2", prompt2, parents=[task1])
#add tasks to workflow
workflow = NonLinearWorkflow(agent)
#add tasks to tree
workflow.add(task1)
workflow.add(task2)
#run
workflow.run()

@ -21,9 +21,35 @@ class Task:
class NonLinearWorkflow: class NonLinearWorkflow:
"""
NonLinearWorkflow constructs a non sequential DAG of tasks to be executed by agents
Architecture:
NonLinearWorkflow = Task + Agent + Executor
ASCII Diagram:
+-------------------+
| NonLinearWorkflow |
+-------------------+
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
+-------------------+
"""
def __init__( def __init__(
self, self,
agents, agents,
iters_per_task
): ):
"""A workflow is a collection of tasks that can be executed in parallel or sequentially.""" """A workflow is a collection of tasks that can be executed in parallel or sequentially."""
super().__init__() super().__init__()

@ -3,13 +3,12 @@ from __future__ import annotations
import json import json
import pprint import pprint
import uuid import uuid
from abc import ABC from abc import ABC, abstractmethod
from enum import Enum from enum import Enum
from typing import Any, Optional from typing import Any, List, Optional, Union
from swarms.artifacts.main import Artifact
from pydantic import BaseModel, Field, StrictStr, conlist from pydantic import BaseModel, Field, StrictStr, conlist
from swarms.artifacts.main import Artifact
from swarms.artifacts.error_artifact import ErrorArtifact from swarms.artifacts.error_artifact import ErrorArtifact
@ -20,37 +19,37 @@ class BaseTask(ABC):
FINISHED = 3 FINISHED = 3
def __init__(self): def __init__(self):
self.id = uuid.uuid4().hex self.id: str = uuid.uuid4().hex
self.state = self.State.PENDING self.state: BaseTask.State = self.State.PENDING
self.parent_ids = [] self.parent_ids: List[str] = []
self.child_ids = [] self.child_ids: List[str] = []
self.output = None self.output: Optional[Union[Artifact, ErrorArtifact]] = None
self.structure = None self.structure: Optional['Structure'] = None
@property @property
# @abstractmethod @abstractmethod
def input(self): def input(self) -> Any:
pass pass
@property @property
def parents(self): def parents(self) -> List[BaseTask]:
return [self.structure.find_task(parent_id) for parent_id in self.parent_ids] return [self.structure.find_task(parent_id) for parent_id in self.parent_ids]
@property @property
def children(self): def children(self) -> List[BaseTask]:
return [self.structure.find_task(child_id) for child_id in self.child_ids] return [self.structure.find_task(child_id) for child_id in self.child_ids]
def __rshift__(self, child): def __rshift__(self, child: BaseTask) -> BaseTask:
return self.add_child(child) return self.add_child(child)
def __lshift__(self, child): def __lshift__(self, child: BaseTask) -> BaseTask:
return self.add_parent(child) return self.add_parent(child)
def preprocess(self, structure): def preprocess(self, structure: 'Structure') -> BaseTask:
self.structure = structure self.structure = structure
return self return self
def add_child(self, child): def add_child(self, child: BaseTask) -> BaseTask:
if self.structure: if self.structure:
child.structure = self.structure child.structure = self.structure
elif child.structure: elif child.structure:
@ -70,7 +69,7 @@ class BaseTask(ABC):
return child return child
def add_parent(self, parent): def add_parent(self, parent: BaseTask) -> BaseTask:
if self.structure: if self.structure:
parent.structure = self.structure parent.structure = self.structure
elif parent.structure: elif parent.structure:
@ -90,22 +89,22 @@ class BaseTask(ABC):
return parent return parent
def is_pending(self): def is_pending(self) -> bool:
return self.state == self.State.PENDING return self.state == self.State.PENDING
def is_finished(self): def is_finished(self) -> bool:
return self.state == self.State.FINISHED return self.state == self.State.FINISHED
def is_executing(self): def is_executing(self) -> bool:
return self.state == self.State.EXECUTING return self.state == self.State.EXECUTING
def before_run(self): def before_run(self) -> None:
pass pass
def after_run(self): def after_run(self) -> None:
pass pass
def execute(self): def execute(self) -> Optional[Union[Artifact, ErrorArtifact]]:
try: try:
self.state = self.State.EXECUTING self.state = self.State.EXECUTING
self.before_run() self.before_run()
@ -117,23 +116,19 @@ class BaseTask(ABC):
self.state = self.State.FINISHED self.state = self.State.FINISHED
return self.output return self.output
def can_execute(self): def can_execute(self) -> bool:
return self.state == self.State.PENDING and all(parent.is_finished() for parent in self.parents) return self.state == self.State.PENDING and all(parent.is_finished() for parent in self.parents)
def reset(self): def reset(self) -> BaseTask:
self.state = self.State.PENDING self.state = self.State.PENDING
self.output = None self.output = None
return self return self
# @abstractmethod @abstractmethod
def run(self): def run(self) -> Optional[Union[Artifact, ErrorArtifact]]:
pass pass
class Task(BaseModel): class Task(BaseModel):
input: Optional[StrictStr] = Field( input: Optional[StrictStr] = Field(
None, None,
@ -147,66 +142,37 @@ class Task(BaseModel):
..., ...,
description="ID of the task" description="ID of the task"
) )
artifacts: conlist(Artifact) = Field( artifacts: conlist(Artifact, min_items=1) = Field(
..., ...,
description="A list of artifacts that the task has been produced" description="A list of artifacts that the task has been produced"
) )
__properties = ["input", "additional_input", "task_id", "artifact"]
class Config: class Config:
#pydantic config
allow_population_by_field_name = True allow_population_by_field_name = True
validate_assignment = True validate_assignment = True
def to_str(self) -> str: def to_str(self) -> str:
"""Returns the str representation of the model using alias"""
return pprint.pformat(self.dict(by_alias=True)) return pprint.pformat(self.dict(by_alias=True))
def to_json(self) -> str: def to_json(self) -> str:
"""Returns the JSON representation of the model using alias""" return json.dumps(self.dict(by_alias=True, exclude_none=True))
return json.dumps(self.to_dict())
@classmethod @classmethod
def from_json(cls, json_str: str) -> Task: def from_json(cls, json_str: str) -> 'Task':
"""Create an instance of Task from a json string""" return cls.parse_raw(json_str)
return cls.from_dict(json.loads(json_str))
def to_dict(self) -> dict:
def to_dict(self): _dict = self.dict(by_alias=True, exclude_none=True)
"""Returns the dict representation of the model using alias"""
_dict = self.dict(by_alias=True, exclude={}, exclude_none=True)
_items =[]
if self.artifacts: if self.artifacts:
for _item in self.artifacts: _dict["artifacts"] = [artifact.dict(by_alias=True, exclude_none=True) for artifact in self.artifacts]
if _item:
_items.append(_item.to_dict())
_dict["artifacts"] = _items
#set to None if additional input is None
# and __fields__set contains the field
if self.additional_input is None and "additional_input" in self.__fields__set__:
_dict["additional_input"] = None
return _dict return _dict
@classmethod @classmethod
def from_dict(cls, obj: dict) -> Task: def from_dict(cls, obj: dict) -> 'Task':
"""Create an instance of Task from dict"""
if obj is None: if obj is None:
return None return None
if not isinstance(obj, dict): if not isinstance(obj, dict):
return Task.parse_obj(obj) raise ValueError("Input must be a dictionary.")
if 'artifacts' in obj:
_obj = Task.parse_obj( obj['artifacts'] = [Artifact.parse_obj(artifact) for artifact in obj['artifacts']]
{ return cls.parse_obj(obj)
"input": obj.get("input"),
"additional_input": obj.get("additional_input"),
"task_id": obj.get("task_id"),
"artifacts": [
Artifact.from_dict(_item) for _item in obj.get("artifacts")
]
if obj.get("artifacts") is not None
else None,
}
)
Loading…
Cancel
Save