[FEAT][SequentialWorkflow handling]

pull/245/head
Kye 1 year ago
parent 61a06d0edd
commit d295cc44fd

@ -157,8 +157,16 @@ class SequentialWorkflow:
def reset_workflow(self) -> None:
"""Resets the workflow by clearing the results of each task."""
for task in self.tasks:
task.result = None
try:
for task in self.tasks:
task.result = None
except Exception as error:
print(
colored(
f"Error resetting workflow: {error}", "red"
),
)
def get_task_results(self) -> Dict[str, Any]:
"""
@ -167,13 +175,29 @@ class SequentialWorkflow:
Returns:
Dict[str, Any]: The results of each task in the workflow
"""
return {task.description: task.result for task in self.tasks}
try:
return {
task.description: task.result for task in self.tasks
}
except Exception as error:
print(
colored(
f"Error getting task results: {error}", "red"
),
)
def remove_task(self, task: str) -> None:
"""Remove tasks from sequential workflow"""
self.tasks = [
task for task in self.tasks if task.description != task
]
try:
self.tasks = [
task for task in self.tasks if task.description != task
]
except Exception as error:
print(
colored(
f"Error removing task from workflow: {error}", "red"
),
)
def update_task(self, task: str, **updates) -> None:
"""
@ -359,22 +383,31 @@ class SequentialWorkflow:
def add_objective_to_workflow(self, task: str, **kwargs) -> None:
"""Adds an objective to the workflow."""
print(
colored(
"""
Adding Objective to Workflow...""",
"green",
attrs=["bold", "underline"],
try:
print(
colored(
"""
Adding Objective to Workflow...""",
"green",
attrs=["bold", "underline"],
)
)
)
task = Task(
description=task,
agent=kwargs["agent"],
args=list(kwargs["args"]),
kwargs=kwargs["kwargs"],
)
self.tasks.append(task)
task = Task(
description=task,
agent=kwargs["agent"],
args=list(kwargs["args"]),
kwargs=kwargs["kwargs"],
)
self.tasks.append(task)
except Exception as error:
print(
colored(
f"Error adding objective to workflow: {error}",
"red",
)
)
def load_workflow_state(
self, filepath: str = None, **kwargs
@ -396,22 +429,31 @@ class SequentialWorkflow:
>>> workflow.load_workflow_state("sequential_workflow_state.json")
"""
filepath = filepath or self.restore_state_filepath
with open(filepath, "r") as f:
state = json.load(f)
self.max_loops = state["max_loops"]
self.tasks = []
for task_state in state["tasks"]:
task = Task(
description=task_state["description"],
agent=task_state["agent"],
args=task_state["args"],
kwargs=task_state["kwargs"],
result=task_state["result"],
history=task_state["history"],
try:
filepath = filepath or self.restore_state_filepath
with open(filepath, "r") as f:
state = json.load(f)
self.max_loops = state["max_loops"]
self.tasks = []
for task_state in state["tasks"]:
task = Task(
description=task_state["description"],
agent=task_state["agent"],
args=task_state["args"],
kwargs=task_state["kwargs"],
result=task_state["result"],
history=task_state["history"],
)
self.tasks.append(task)
except Exception as error:
print(
colored(
f"Error loading workflow state: {error}",
"red",
)
self.tasks.append(task)
)
def run(self) -> None:
"""
@ -486,43 +528,56 @@ class SequentialWorkflow:
ValueError: If a Agent instance is used as a task and the 'task' argument is not provided.
"""
for _ in range(self.max_loops):
for task in self.tasks:
# Check if the current task can be executed
if task.result is None:
# Check if the agent is a Agent and a 'task' argument is needed
if isinstance(task.agent, Agent):
# Ensure that 'task' is provided in the kwargs
if "task" not in task.kwargs:
raise ValueError(
"The 'task' argument is required for"
" the Agent agent execution in"
f" '{task.description}'"
try:
for _ in range(self.max_loops):
for task in self.tasks:
# Check if the current task can be executed
if task.result is None:
# Check if the agent is a Agent and a 'task' argument is needed
if isinstance(task.agent, Agent):
# Ensure that 'task' is provided in the kwargs
if "task" not in task.kwargs:
raise ValueError(
"The 'task' argument is required for"
" the Agent agent execution in"
f" '{task.description}'"
)
# Separate the 'task' argument from other kwargs
flow_task_arg = task.kwargs.pop("task")
task.result = await task.agent.arun(
flow_task_arg, *task.args, **task.kwargs
)
# Separate the 'task' argument from other kwargs
flow_task_arg = task.kwargs.pop("task")
task.result = await task.agent.arun(
flow_task_arg, *task.args, **task.kwargs
)
else:
# If it's not a Agent instance, call the agent directly
task.result = await task.agent(
*task.args, **task.kwargs
)
# Pass the result as an argument to the next task if it exists
next_task_index = self.tasks.index(task) + 1
if next_task_index < len(self.tasks):
next_task = self.tasks[next_task_index]
if isinstance(next_task.agent, Agent):
# For Agent flows, 'task' should be a keyword argument
next_task.kwargs["task"] = task.result
else:
# For other callable flows, the result is added to args
next_task.args.insert(0, task.result)
# If it's not a Agent instance, call the agent directly
task.result = await task.agent(
*task.args, **task.kwargs
)
# Autosave the workflow state
if self.autosave:
self.save_workflow_state(
"sequential_workflow_state.json"
)
# Pass the result as an argument to the next task if it exists
next_task_index = self.tasks.index(task) + 1
if next_task_index < len(self.tasks):
next_task = self.tasks[next_task_index]
if isinstance(next_task.agent, Agent):
# For Agent flows, 'task' should be a keyword argument
next_task.kwargs["task"] = task.result
else:
# For other callable flows, the result is added to args
next_task.args.insert(0, task.result)
# Autosave the workflow state
if self.autosave:
self.save_workflow_state(
"sequential_workflow_state.json"
)
except Exception as e:
print(
colored(
(
"Error initializing the Sequential workflow:"
f" {e} try optimizing your inputs like the"
" agent class and task description"
),
"red",
attrs=["bold", "underline"],
)
)
Loading…
Cancel
Save