[CODE QUALITY]

pull/343/head
Kye 1 year ago
parent 2f48cfc071
commit db2dbf36db

@ -1,6 +1,7 @@
from swarms.structs.agent import Agent
from swarms.structs.autoscaler import AutoScaler
from swarms.structs.base_swarm import AbstractSwarm
from swarms.structs.base_workflow import BaseWorkflow
from swarms.structs.concurrent_workflow import ConcurrentWorkflow
from swarms.structs.conversation import Conversation
from swarms.structs.groupchat import GroupChat, GroupChatManager
@ -49,4 +50,5 @@ __all__ = [
"ConcurrentWorkflow",
"RecursiveWorkflow",
"NonlinearWorkflow",
"BaseWorkflow",
]

@ -15,7 +15,7 @@ class BaseWorkflow(BaseStructure):
task_pool (list): A list to store tasks.
Methods:
add(task: Task = None, tasks: List[Task] = None, *args, **kwargs):
add(task: Task = None, tasks: List[Task] = None, *args, **kwargs):
Adds a task or a list of tasks to the task pool.
run():
Abstract method to run the workflow.
@ -24,8 +24,14 @@ class BaseWorkflow(BaseStructure):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.task_pool = []
def add(self, task: Task = None, tasks: List[Task] = None, *args, **kwargs):
def add(
self,
task: Task = None,
tasks: List[Task] = None,
*args,
**kwargs,
):
"""
Adds a task or a list of tasks to the task pool.
@ -41,21 +47,23 @@ class BaseWorkflow(BaseStructure):
elif tasks:
self.task_pool.extend(tasks)
else:
raise ValueError("You must provide a task or a list of tasks")
raise ValueError(
"You must provide a task or a list of tasks"
)
def run(self):
"""
Abstract method to run the workflow.
"""
raise NotImplementedError("You must implement this method")
def __sequential_loop(self):
"""
Abstract method for the sequential loop.
"""
# raise NotImplementedError("You must implement this method")
pass
def __log(self, message: str):
"""
Logs a message if verbose mode is enabled.
@ -65,13 +73,13 @@ class BaseWorkflow(BaseStructure):
"""
if self.verbose:
print(message)
def __str__(self):
return f"Workflow with {len(self.task_pool)} tasks"
def __repr__(self):
return f"Workflow with {len(self.task_pool)} tasks"
def reset(self) -> None:
"""Resets the workflow by clearing the results of each task."""
try:
@ -193,7 +201,6 @@ class BaseWorkflow(BaseStructure):
),
)
def save_workflow_state(
self,
filepath: Optional[str] = "sequential_workflow_state.json",
@ -240,7 +247,7 @@ class BaseWorkflow(BaseStructure):
"red",
)
)
def add_objective_to_workflow(self, task: str, **kwargs) -> None:
"""Adds an objective to the workflow."""
try:

Loading…
Cancel
Save