parent
f1b31d4c88
commit
ef2f745ff5
@ -1,81 +0,0 @@
|
|||||||
from __future__ import annotations
|
|
||||||
import json
|
|
||||||
import uuid
|
|
||||||
from abc import ABC, abstractmethod
|
|
||||||
from attr import define, field, Factory
|
|
||||||
from marshmallow import class_registry
|
|
||||||
from marshmallow.exceptions import RegistryError
|
|
||||||
|
|
||||||
|
|
||||||
@define
|
|
||||||
class BaseArtifact(ABC):
|
|
||||||
id: str = field(default=Factory(lambda: uuid.uuid4().hex), kw_only=True)
|
|
||||||
name: str = field(
|
|
||||||
default=Factory(lambda self: self.id, takes_self=True), kw_only=True
|
|
||||||
)
|
|
||||||
value: any = field()
|
|
||||||
type: str = field(
|
|
||||||
default=Factory(lambda self: self.__class__.__name__, takes_self=True),
|
|
||||||
kw_only=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def value_to_bytes(cls, value: any) -> bytes:
|
|
||||||
if isinstance(value, bytes):
|
|
||||||
return value
|
|
||||||
else:
|
|
||||||
return str(value).encode()
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def value_to_dict(cls, value: any) -> dict:
|
|
||||||
if isinstance(value, dict):
|
|
||||||
dict_value = value
|
|
||||||
else:
|
|
||||||
dict_value = json.loads(value)
|
|
||||||
|
|
||||||
return {k: v for k, v in dict_value.items()}
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def from_dict(cls, artifact_dict: dict) -> BaseArtifact:
|
|
||||||
from griptape.schemas import (
|
|
||||||
TextArtifactSchema,
|
|
||||||
InfoArtifactSchema,
|
|
||||||
ErrorArtifactSchema,
|
|
||||||
BlobArtifactSchema,
|
|
||||||
CsvRowArtifactSchema,
|
|
||||||
ListArtifactSchema,
|
|
||||||
)
|
|
||||||
|
|
||||||
class_registry.register("TextArtifact", TextArtifactSchema)
|
|
||||||
class_registry.register("InfoArtifact", InfoArtifactSchema)
|
|
||||||
class_registry.register("ErrorArtifact", ErrorArtifactSchema)
|
|
||||||
class_registry.register("BlobArtifact", BlobArtifactSchema)
|
|
||||||
class_registry.register("CsvRowArtifact", CsvRowArtifactSchema)
|
|
||||||
class_registry.register("ListArtifact", ListArtifactSchema)
|
|
||||||
|
|
||||||
try:
|
|
||||||
return class_registry.get_class(artifact_dict["type"])().load(artifact_dict)
|
|
||||||
except RegistryError:
|
|
||||||
raise ValueError("Unsupported artifact type")
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def from_json(cls, artifact_str: str) -> BaseArtifact:
|
|
||||||
return cls.from_dict(json.loads(artifact_str))
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return json.dumps(self.to_dict())
|
|
||||||
|
|
||||||
def to_json(self) -> str:
|
|
||||||
return json.dumps(self.to_dict())
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def to_text(self) -> str:
|
|
||||||
...
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def to_dict(self) -> dict:
|
|
||||||
...
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def __add__(self, other: BaseArtifact) -> BaseArtifact:
|
|
||||||
...
|
|
@ -1,19 +0,0 @@
|
|||||||
from __future__ import annotations
|
|
||||||
from attr import define, field
|
|
||||||
from swarms.artifacts.base import BaseArtifact
|
|
||||||
|
|
||||||
|
|
||||||
@define(frozen=True)
|
|
||||||
class ErrorArtifact(BaseArtifact):
|
|
||||||
value: str = field(converter=str)
|
|
||||||
|
|
||||||
def __add__(self, other: ErrorArtifact) -> ErrorArtifact:
|
|
||||||
return ErrorArtifact(self.value + other.value)
|
|
||||||
|
|
||||||
def to_text(self) -> str:
|
|
||||||
return self.value
|
|
||||||
|
|
||||||
def to_dict(self) -> dict:
|
|
||||||
from griptape.schemas import ErrorArtifactSchema
|
|
||||||
|
|
||||||
return dict(ErrorArtifactSchema().dump(self))
|
|
@ -1,74 +0,0 @@
|
|||||||
from __future__ import annotations
|
|
||||||
import pprint
|
|
||||||
import json
|
|
||||||
|
|
||||||
from typing import Optional
|
|
||||||
from pydantic import BaseModel, Field, StrictStr
|
|
||||||
|
|
||||||
|
|
||||||
class Artifact(BaseModel):
|
|
||||||
"""
|
|
||||||
|
|
||||||
Artifact that has the task has been produced
|
|
||||||
|
|
||||||
Attributes:
|
|
||||||
-----------
|
|
||||||
|
|
||||||
artifact_id: str
|
|
||||||
ID of the artifact
|
|
||||||
|
|
||||||
file_name: str
|
|
||||||
Filename of the artifact
|
|
||||||
|
|
||||||
relative_path: str
|
|
||||||
Relative path of the artifact
|
|
||||||
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
artifact_id: StrictStr = Field(..., description="ID of the artifact")
|
|
||||||
file_name: StrictStr = Field(..., description="Filename of the artifact")
|
|
||||||
relative_path: Optional[StrictStr] = Field(
|
|
||||||
None, description="Relative path of the artifact"
|
|
||||||
)
|
|
||||||
__properties = ["artifact_id", "file_name", "relative_path"]
|
|
||||||
|
|
||||||
class Config:
|
|
||||||
"""Pydantic configuration"""
|
|
||||||
|
|
||||||
allow_population_by_field_name = True
|
|
||||||
validate_assignment = True
|
|
||||||
|
|
||||||
def to_str(self) -> str:
|
|
||||||
"""Returns the string representation of the model using alias"""
|
|
||||||
return pprint.pformat(self.dict(by_alias=True))
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def from_json(cls, json_str: str) -> Artifact:
|
|
||||||
"""Create an instance of Artifact from a json string"""
|
|
||||||
return cls.from_dict(json.loads(json_str))
|
|
||||||
|
|
||||||
def to_dict(self):
|
|
||||||
"""Returns the dict representation of the model"""
|
|
||||||
_dict = self.dict(by_alias=True, exclude={}, exclude_none=True)
|
|
||||||
return _dict
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def from_dict(cls, obj: dict) -> Artifact:
|
|
||||||
"""Create an instance of Artifact from a dict"""
|
|
||||||
|
|
||||||
if obj is None:
|
|
||||||
return None
|
|
||||||
|
|
||||||
if not isinstance(obj, dict):
|
|
||||||
return Artifact.parse_obj(obj)
|
|
||||||
|
|
||||||
_obj = Artifact.parse_obj(
|
|
||||||
{
|
|
||||||
"artifact_id": obj.get("artifact_id"),
|
|
||||||
"file_name": obj.get("file_name"),
|
|
||||||
"relative_path": obj.get("relative_path"),
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
return _obj
|
|
@ -1,6 +1,6 @@
|
|||||||
from swarms.structs.workflow import Workflow
|
# from swarms.structs.workflow import Workflow
|
||||||
from swarms.structs.task import Task
|
# from swarms.structs.task import Task
|
||||||
from swarms.structs.flow import Flow
|
from swarms.structs.flow import Flow
|
||||||
from swarms.structs.sequential_workflow import SequentialWorkflow
|
from swarms.structs.sequential_workflow import SequentialWorkflow
|
||||||
|
|
||||||
__all__ = ["Workflow", "Task", "Flow", "SequentialWorkflow"]
|
__all__ = ["Flow", "SequentialWorkflow"]
|
||||||
|
@ -1,174 +1,174 @@
|
|||||||
from __future__ import annotations
|
# from __future__ import annotations
|
||||||
|
|
||||||
import json
|
# import json
|
||||||
import pprint
|
# import pprint
|
||||||
import uuid
|
# import uuid
|
||||||
from abc import ABC, abstractmethod
|
# from abc import ABC, abstractmethod
|
||||||
from enum import Enum
|
# from enum import Enum
|
||||||
from typing import Any, List, Optional, Union
|
# from typing import Any, List, Optional, Union
|
||||||
|
|
||||||
from pydantic import BaseModel, Field, StrictStr
|
# from pydantic import BaseModel, Field, StrictStr
|
||||||
from swarms.artifacts.main import Artifact
|
# # from swarms.artifacts.main import Artifact
|
||||||
from swarms.artifacts.error_artifact import ErrorArtifact
|
# # from swarms.artifacts.error_artifact import ErrorArtifact
|
||||||
|
|
||||||
|
|
||||||
class BaseTask(ABC):
|
# class BaseTask(ABC):
|
||||||
class State(Enum):
|
# class State(Enum):
|
||||||
PENDING = 1
|
# PENDING = 1
|
||||||
EXECUTING = 2
|
# EXECUTING = 2
|
||||||
FINISHED = 3
|
# FINISHED = 3
|
||||||
|
|
||||||
def __init__(self):
|
# def __init__(self):
|
||||||
self.id: str = uuid.uuid4().hex
|
# self.id: str = uuid.uuid4().hex
|
||||||
self.state: BaseTask.State = self.State.PENDING
|
# self.state: BaseTask.State = self.State.PENDING
|
||||||
self.parent_ids: List[str] = []
|
# self.parent_ids: List[str] = []
|
||||||
self.child_ids: List[str] = []
|
# self.child_ids: List[str] = []
|
||||||
self.output: Optional[Union[Artifact, ErrorArtifact]] = None
|
# self.output = None
|
||||||
self.structure = None
|
# self.structure = None
|
||||||
|
|
||||||
@property
|
# @property
|
||||||
@abstractmethod
|
# @abstractmethod
|
||||||
def input(self) -> Any:
|
# def input(self) -> Any:
|
||||||
pass
|
# pass
|
||||||
|
|
||||||
@property
|
# @property
|
||||||
def parents(self) -> List[BaseTask]:
|
# def parents(self) -> List[BaseTask]:
|
||||||
return [self.structure.find_task(parent_id) for parent_id in self.parent_ids]
|
# return [self.structure.find_task(parent_id) for parent_id in self.parent_ids]
|
||||||
|
|
||||||
@property
|
# @property
|
||||||
def children(self) -> List[BaseTask]:
|
# def children(self) -> List[BaseTask]:
|
||||||
return [self.structure.find_task(child_id) for child_id in self.child_ids]
|
# return [self.structure.find_task(child_id) for child_id in self.child_ids]
|
||||||
|
|
||||||
def __rshift__(self, child: BaseTask) -> BaseTask:
|
# def __rshift__(self, child: BaseTask) -> BaseTask:
|
||||||
return self.add_child(child)
|
# return self.add_child(child)
|
||||||
|
|
||||||
def __lshift__(self, child: BaseTask) -> BaseTask:
|
# def __lshift__(self, child: BaseTask) -> BaseTask:
|
||||||
return self.add_parent(child)
|
# return self.add_parent(child)
|
||||||
|
|
||||||
def preprocess(self, structure) -> BaseTask:
|
# def preprocess(self, structure) -> BaseTask:
|
||||||
self.structure = structure
|
# self.structure = structure
|
||||||
return self
|
# return self
|
||||||
|
|
||||||
def add_child(self, child: BaseTask) -> BaseTask:
|
# def add_child(self, child: BaseTask) -> BaseTask:
|
||||||
if self.structure:
|
# if self.structure:
|
||||||
child.structure = self.structure
|
# child.structure = self.structure
|
||||||
elif child.structure:
|
# elif child.structure:
|
||||||
self.structure = child.structure
|
# self.structure = child.structure
|
||||||
|
|
||||||
if child not in self.structure.tasks:
|
# if child not in self.structure.tasks:
|
||||||
self.structure.tasks.append(child)
|
# self.structure.tasks.append(child)
|
||||||
|
|
||||||
if self not in self.structure.tasks:
|
# if self not in self.structure.tasks:
|
||||||
self.structure.tasks.append(self)
|
# self.structure.tasks.append(self)
|
||||||
|
|
||||||
if child.id not in self.child_ids:
|
# if child.id not in self.child_ids:
|
||||||
self.child_ids.append(child.id)
|
# self.child_ids.append(child.id)
|
||||||
|
|
||||||
if self.id not in child.parent_ids:
|
# if self.id not in child.parent_ids:
|
||||||
child.parent_ids.append(self.id)
|
# child.parent_ids.append(self.id)
|
||||||
|
|
||||||
return child
|
# return child
|
||||||
|
|
||||||
def add_parent(self, parent: BaseTask) -> BaseTask:
|
# def add_parent(self, parent: BaseTask) -> BaseTask:
|
||||||
if self.structure:
|
# if self.structure:
|
||||||
parent.structure = self.structure
|
# parent.structure = self.structure
|
||||||
elif parent.structure:
|
# elif parent.structure:
|
||||||
self.structure = parent.structure
|
# self.structure = parent.structure
|
||||||
|
|
||||||
if parent not in self.structure.tasks:
|
# if parent not in self.structure.tasks:
|
||||||
self.structure.tasks.append(parent)
|
# self.structure.tasks.append(parent)
|
||||||
|
|
||||||
if self not in self.structure.tasks:
|
# if self not in self.structure.tasks:
|
||||||
self.structure.tasks.append(self)
|
# self.structure.tasks.append(self)
|
||||||
|
|
||||||
if parent.id not in self.parent_ids:
|
# if parent.id not in self.parent_ids:
|
||||||
self.parent_ids.append(parent.id)
|
# self.parent_ids.append(parent.id)
|
||||||
|
|
||||||
if self.id not in parent.child_ids:
|
# if self.id not in parent.child_ids:
|
||||||
parent.child_ids.append(self.id)
|
# parent.child_ids.append(self.id)
|
||||||
|
|
||||||
return parent
|
# return parent
|
||||||
|
|
||||||
def is_pending(self) -> bool:
|
# def is_pending(self) -> bool:
|
||||||
return self.state == self.State.PENDING
|
# return self.state == self.State.PENDING
|
||||||
|
|
||||||
def is_finished(self) -> bool:
|
# def is_finished(self) -> bool:
|
||||||
return self.state == self.State.FINISHED
|
# return self.state == self.State.FINISHED
|
||||||
|
|
||||||
def is_executing(self) -> bool:
|
# def is_executing(self) -> bool:
|
||||||
return self.state == self.State.EXECUTING
|
# return self.state == self.State.EXECUTING
|
||||||
|
|
||||||
def before_run(self) -> None:
|
# def before_run(self) -> None:
|
||||||
pass
|
# pass
|
||||||
|
|
||||||
def after_run(self) -> None:
|
# def after_run(self) -> None:
|
||||||
pass
|
# pass
|
||||||
|
|
||||||
def execute(self) -> Optional[Union[Artifact, ErrorArtifact]]:
|
# def execute(self) -> Optional[Union[Artifact, ErrorArtifact]]:
|
||||||
try:
|
# try:
|
||||||
self.state = self.State.EXECUTING
|
# self.state = self.State.EXECUTING
|
||||||
self.before_run()
|
# self.before_run()
|
||||||
self.output = self.run()
|
# self.output = self.run()
|
||||||
self.after_run()
|
# self.after_run()
|
||||||
except Exception as e:
|
# except Exception as e:
|
||||||
self.output = ErrorArtifact(str(e))
|
# self.output = ErrorArtifact(str(e))
|
||||||
finally:
|
# finally:
|
||||||
self.state = self.State.FINISHED
|
# self.state = self.State.FINISHED
|
||||||
return self.output
|
# return self.output
|
||||||
|
|
||||||
def can_execute(self) -> bool:
|
# def can_execute(self) -> bool:
|
||||||
return self.state == self.State.PENDING and all(
|
# return self.state == self.State.PENDING and all(
|
||||||
parent.is_finished() for parent in self.parents
|
# parent.is_finished() for parent in self.parents
|
||||||
)
|
# )
|
||||||
|
|
||||||
def reset(self) -> BaseTask:
|
# def reset(self) -> BaseTask:
|
||||||
self.state = self.State.PENDING
|
# self.state = self.State.PENDING
|
||||||
self.output = None
|
# self.output = None
|
||||||
return self
|
# return self
|
||||||
|
|
||||||
@abstractmethod
|
# @abstractmethod
|
||||||
def run(self) -> Optional[Union[Artifact, ErrorArtifact]]:
|
# def run(self) -> Optional[Union[Artifact, ErrorArtifact]]:
|
||||||
pass
|
# pass
|
||||||
|
|
||||||
|
|
||||||
class Task(BaseModel):
|
# class Task(BaseModel):
|
||||||
input: Optional[StrictStr] = Field(None, description="Input prompt for the task")
|
# input: Optional[StrictStr] = Field(None, description="Input prompt for the task")
|
||||||
additional_input: Optional[Any] = Field(
|
# additional_input: Optional[Any] = Field(
|
||||||
None, description="Input parameters for the task. Any value is allowed"
|
# None, description="Input parameters for the task. Any value is allowed"
|
||||||
)
|
# )
|
||||||
task_id: StrictStr = Field(..., description="ID of the task")
|
# task_id: StrictStr = Field(..., description="ID of the task")
|
||||||
|
|
||||||
class Config:
|
# class Config:
|
||||||
allow_population_by_field_name = True
|
# allow_population_by_field_name = True
|
||||||
validate_assignment = True
|
# validate_assignment = True
|
||||||
|
|
||||||
def to_str(self) -> str:
|
# def to_str(self) -> str:
|
||||||
return pprint.pformat(self.dict(by_alias=True))
|
# return pprint.pformat(self.dict(by_alias=True))
|
||||||
|
|
||||||
def to_json(self) -> str:
|
# def to_json(self) -> str:
|
||||||
return json.dumps(self.dict(by_alias=True, exclude_none=True))
|
# return json.dumps(self.dict(by_alias=True, exclude_none=True))
|
||||||
|
|
||||||
@classmethod
|
# @classmethod
|
||||||
def from_json(cls, json_str: str) -> "Task":
|
# def from_json(cls, json_str: str) -> "Task":
|
||||||
return cls.parse_raw(json_str)
|
# return cls.parse_raw(json_str)
|
||||||
|
|
||||||
def to_dict(self) -> dict:
|
# def to_dict(self) -> dict:
|
||||||
_dict = self.dict(by_alias=True, exclude_none=True)
|
# _dict = self.dict(by_alias=True, exclude_none=True)
|
||||||
if self.artifacts:
|
# if self.artifacts:
|
||||||
_dict["artifacts"] = [
|
# _dict["artifacts"] = [
|
||||||
artifact.dict(by_alias=True, exclude_none=True)
|
# artifact.dict(by_alias=True, exclude_none=True)
|
||||||
for artifact in self.artifacts
|
# for artifact in self.artifacts
|
||||||
]
|
# ]
|
||||||
return _dict
|
# return _dict
|
||||||
|
|
||||||
@classmethod
|
# @classmethod
|
||||||
def from_dict(cls, obj: dict) -> "Task":
|
# def from_dict(cls, obj: dict) -> "Task":
|
||||||
if obj is None:
|
# if obj is None:
|
||||||
return None
|
# return None
|
||||||
if not isinstance(obj, dict):
|
# if not isinstance(obj, dict):
|
||||||
raise ValueError("Input must be a dictionary.")
|
# raise ValueError("Input must be a dictionary.")
|
||||||
if "artifacts" in obj:
|
# if "artifacts" in obj:
|
||||||
obj["artifacts"] = [
|
# obj["artifacts"] = [
|
||||||
Artifact.parse_obj(artifact) for artifact in obj["artifacts"]
|
# Artifact.parse_obj(artifact) for artifact in obj["artifacts"]
|
||||||
]
|
# ]
|
||||||
return cls.parse_obj(obj)
|
# return cls.parse_obj(obj)
|
||||||
|
@ -1,83 +1,84 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import uuid
|
import uuid
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
|
||||||
from typing import Any, Dict, List, Optional
|
# from concurrent.futures import ThreadPoolExecutor
|
||||||
from swarms.structs.task import Task
|
# from typing import Any, Dict, List, Optional
|
||||||
|
# # from swarms.structs.task import Task
|
||||||
|
|
||||||
class Workflow:
|
|
||||||
"""
|
# class Workflow:
|
||||||
Workflows are ideal for prescriptive processes that need to be executed
|
# """
|
||||||
sequentially.
|
# Workflows are ideal for prescriptive processes that need to be executed
|
||||||
They string together multiple tasks of varying types, and can use Short-Term Memory
|
# sequentially.
|
||||||
or pass specific arguments downstream.
|
# They string together multiple tasks of varying types, and can use Short-Term Memory
|
||||||
|
# or pass specific arguments downstream.
|
||||||
Usage
|
|
||||||
llm = LLM()
|
# Usage
|
||||||
workflow = Workflow(llm)
|
# llm = LLM()
|
||||||
|
# workflow = Workflow(llm)
|
||||||
workflow.add("What's the weather in miami")
|
|
||||||
workflow.add("Provide details for {{ parent_output }}")
|
# workflow.add("What's the weather in miami")
|
||||||
workflow.add("Summarize the above information: {{ parent_output}})
|
# workflow.add("Provide details for {{ parent_output }}")
|
||||||
|
# workflow.add("Summarize the above information: {{ parent_output}})
|
||||||
workflow.run()
|
|
||||||
|
# workflow.run()
|
||||||
"""
|
|
||||||
|
# """
|
||||||
def __init__(self, agent, parallel: bool = False):
|
|
||||||
"""__init__"""
|
# def __init__(self, agent, parallel: bool = False):
|
||||||
self.agent = agent
|
# """__init__"""
|
||||||
self.tasks: List[Task] = []
|
# self.agent = agent
|
||||||
self.parallel = parallel
|
# self.tasks: List[Task] = []
|
||||||
|
# self.parallel = parallel
|
||||||
def add(self, task: str) -> Task:
|
|
||||||
"""Add a task"""
|
# def add(self, task: str) -> Task:
|
||||||
task = Task(task_id=uuid.uuid4().hex, input=task)
|
# """Add a task"""
|
||||||
|
# task = Task(task_id=uuid.uuid4().hex, input=task)
|
||||||
if self.last_task():
|
|
||||||
self.last_task().add_child(task)
|
# if self.last_task():
|
||||||
else:
|
# self.last_task().add_child(task)
|
||||||
task.structure = self
|
# else:
|
||||||
self.tasks.append(task)
|
# task.structure = self
|
||||||
return task
|
# self.tasks.append(task)
|
||||||
|
# return task
|
||||||
def first_task(self) -> Optional[Task]:
|
|
||||||
"""Add first task"""
|
# def first_task(self) -> Optional[Task]:
|
||||||
return self.tasks[0] if self.tasks else None
|
# """Add first task"""
|
||||||
|
# return self.tasks[0] if self.tasks else None
|
||||||
def last_task(self) -> Optional[Task]:
|
|
||||||
"""Last task"""
|
# def last_task(self) -> Optional[Task]:
|
||||||
return self.tasks[-1] if self.tasks else None
|
# """Last task"""
|
||||||
|
# return self.tasks[-1] if self.tasks else None
|
||||||
def run(self, task: str) -> Task:
|
|
||||||
"""Run tasks"""
|
# def run(self, task: str) -> Task:
|
||||||
self.add(task)
|
# """Run tasks"""
|
||||||
|
# self.add(task)
|
||||||
if self.parallel:
|
|
||||||
with ThreadPoolExecutor() as executor:
|
# if self.parallel:
|
||||||
list(executor.map(self.__run_from_task, [self.first_task]))
|
# with ThreadPoolExecutor() as executor:
|
||||||
else:
|
# list(executor.map(self.__run_from_task, [self.first_task]))
|
||||||
self.__run_from_task(self.first_task())
|
# else:
|
||||||
|
# self.__run_from_task(self.first_task())
|
||||||
return self.last_task()
|
|
||||||
|
# return self.last_task()
|
||||||
def context(self, task: Task) -> Dict[str, Any]:
|
|
||||||
"""Context in tasks"""
|
# def context(self, task: Task) -> Dict[str, Any]:
|
||||||
return {
|
# """Context in tasks"""
|
||||||
"parent_output": task.parents[0].output
|
# return {
|
||||||
if task.parents and task.parents[0].output
|
# "parent_output": task.parents[0].output
|
||||||
else None,
|
# if task.parents and task.parents[0].output
|
||||||
"parent": task.parents[0] if task.parents else None,
|
# else None,
|
||||||
"child": task.children[0] if task.children else None,
|
# "parent": task.parents[0] if task.parents else None,
|
||||||
}
|
# "child": task.children[0] if task.children else None,
|
||||||
|
# }
|
||||||
def __run_from_task(self, task: Optional[Task]) -> None:
|
|
||||||
"""Run from task"""
|
# def __run_from_task(self, task: Optional[Task]) -> None:
|
||||||
if task is None:
|
# """Run from task"""
|
||||||
return
|
# if task is None:
|
||||||
else:
|
# return
|
||||||
if isinstance(task.execute(), Exception):
|
# else:
|
||||||
return
|
# if isinstance(task.execute(), Exception):
|
||||||
else:
|
# return
|
||||||
self.__run_from_task(next(iter(task.children), None))
|
# else:
|
||||||
|
# self.__run_from_task(next(iter(task.children), None))
|
||||||
|
Loading…
Reference in new issue