You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
219 lines
5.7 KiB
219 lines
5.7 KiB
from __future__ import annotations
|
|
|
|
import json
|
|
import pprint
|
|
import uuid
|
|
from abc import ABC
|
|
from enum import Enum
|
|
from typing import Any, Optional
|
|
|
|
from swarms.artifacts.main import Artifact
|
|
from pydantic import BaseModel, Field, StrictStr, conlist
|
|
|
|
from swarms.artifacts.error_artifact import ErrorArtifact
|
|
|
|
|
|
class BaseTask(ABC):
|
|
class State(Enum):
|
|
PENDING = 1
|
|
EXECUTING = 2
|
|
FINISHED = 3
|
|
|
|
def __init__(self):
|
|
self.id = uuid.uuid4().hex
|
|
self.state = self.State.PENDING
|
|
self.parent_ids = []
|
|
self.child_ids = []
|
|
self.output = None
|
|
self.structure = None
|
|
|
|
@property
|
|
# @abstractmethod
|
|
def input(self):
|
|
pass
|
|
|
|
@property
|
|
def parents(self):
|
|
return [self.structure.find_task(parent_id) for parent_id in self.parent_ids]
|
|
|
|
@property
|
|
def children(self):
|
|
return [self.structure.find_task(child_id) for child_id in self.child_ids]
|
|
|
|
def __rshift__(self, child):
|
|
return self.add_child(child)
|
|
|
|
def __lshift__(self, child):
|
|
return self.add_parent(child)
|
|
|
|
def preprocess(self, structure):
|
|
self.structure = structure
|
|
return self
|
|
|
|
def add_child(self, child):
|
|
if self.structure:
|
|
child.structure = self.structure
|
|
elif child.structure:
|
|
self.structure = child.structure
|
|
|
|
if child not in self.structure.tasks:
|
|
self.structure.tasks.append(child)
|
|
|
|
if self not in self.structure.tasks:
|
|
self.structure.tasks.append(self)
|
|
|
|
if child.id not in self.child_ids:
|
|
self.child_ids.append(child.id)
|
|
|
|
if self.id not in child.parent_ids:
|
|
child.parent_ids.append(self.id)
|
|
|
|
return child
|
|
|
|
def add_parent(self, parent):
|
|
if self.structure:
|
|
parent.structure = self.structure
|
|
elif parent.structure:
|
|
self.structure = parent.structure
|
|
|
|
if parent not in self.structure.tasks:
|
|
self.structure.tasks.append(parent)
|
|
|
|
if self not in self.structure.tasks:
|
|
self.structure.tasks.append(self)
|
|
|
|
if parent.id not in self.parent_ids:
|
|
self.parent_ids.append(parent.id)
|
|
|
|
if self.id not in parent.child_ids:
|
|
parent.child_ids.append(self.id)
|
|
|
|
return parent
|
|
|
|
def is_pending(self):
|
|
return self.state == self.State.PENDING
|
|
|
|
def is_finished(self):
|
|
return self.state == self.State.FINISHED
|
|
|
|
def is_executing(self):
|
|
return self.state == self.State.EXECUTING
|
|
|
|
def before_run(self):
|
|
pass
|
|
|
|
def after_run(self):
|
|
pass
|
|
|
|
def execute(self):
|
|
try:
|
|
self.state = self.State.EXECUTING
|
|
self.before_run()
|
|
self.output = self.run()
|
|
self.after_run()
|
|
except Exception as e:
|
|
self.output = ErrorArtifact(str(e))
|
|
finally:
|
|
self.state = self.State.FINISHED
|
|
return self.output
|
|
|
|
def can_execute(self):
|
|
return self.state == self.State.PENDING and all(parent.is_finished() for parent in self.parents)
|
|
|
|
def reset(self):
|
|
self.state = self.State.PENDING
|
|
self.output = None
|
|
return self
|
|
|
|
# @abstractmethod
|
|
def run(self):
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Task(BaseModel):
|
|
input: Optional[StrictStr] = Field(
|
|
None,
|
|
description="Input prompt for the task"
|
|
)
|
|
additional_input: Optional[Any] = Field(
|
|
None,
|
|
description="Input parameters for the task. Any value is allowed"
|
|
)
|
|
task_id: StrictStr = Field(
|
|
...,
|
|
description="ID of the task"
|
|
)
|
|
artifacts: conlist(Artifact) = Field(
|
|
...,
|
|
description="A list of artifacts that the task has been produced"
|
|
)
|
|
|
|
__properties = ["input", "additional_input", "task_id", "artifact"]
|
|
|
|
class Config:
|
|
#pydantic config
|
|
|
|
allow_population_by_field_name = True
|
|
validate_assignment = True
|
|
|
|
def to_str(self) -> str:
|
|
"""Returns the str representation of the model using alias"""
|
|
return pprint.pformat(self.dict(by_alias=True))
|
|
|
|
def to_json(self) -> str:
|
|
"""Returns the JSON representation of the model using alias"""
|
|
return json.dumps(self.to_dict())
|
|
|
|
@classmethod
|
|
def from_json(cls, json_str: str) -> Task:
|
|
"""Create an instance of Task from a json string"""
|
|
return cls.from_dict(json.loads(json_str))
|
|
|
|
def to_dict(self):
|
|
"""Returns the dict representation of the model using alias"""
|
|
_dict = self.dict(by_alias=True, exclude={}, exclude_none=True)
|
|
_items =[]
|
|
if self.artifacts:
|
|
for _item in self.artifacts:
|
|
if _item:
|
|
_items.append(_item.to_dict())
|
|
_dict["artifacts"] = _items
|
|
#set to None if additional input is None
|
|
# and __fields__set contains the field
|
|
if self.additional_input is None and "additional_input" in self.__fields__set__:
|
|
_dict["additional_input"] = None
|
|
|
|
return _dict
|
|
|
|
@classmethod
|
|
def from_dict(cls, obj: dict) -> Task:
|
|
"""Create an instance of Task from dict"""
|
|
if obj is None:
|
|
return None
|
|
|
|
if not isinstance(obj, dict):
|
|
return Task.parse_obj(obj)
|
|
|
|
_obj = Task.parse_obj(
|
|
{
|
|
"input": obj.get("input"),
|
|
"additional_input": obj.get("additional_input"),
|
|
"task_id": obj.get("task_id"),
|
|
"artifacts": [
|
|
Artifact.from_dict(_item) for _item in obj.get("artifacts")
|
|
]
|
|
if obj.get("artifacts") is not None
|
|
else None,
|
|
}
|
|
) |