You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
425 lines
16 KiB
425 lines
16 KiB
"""
|
|
TODO:
|
|
- Add a method to update the arguments of a task
|
|
- Add a method to get the results of each task
|
|
- Add a method to get the results of a specific task
|
|
- Add a method to get the results of the workflow
|
|
- Add a method to get the results of the workflow as a dataframe
|
|
|
|
|
|
- Add a method to run the workflow in parallel with a pool of workers and a queue and a dashboard
|
|
- Add a dashboard to visualize the workflow
|
|
- Add async support
|
|
- Add context manager support
|
|
- Add workflow history
|
|
"""
|
|
import json
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, Callable, Dict, List, Optional, Union
|
|
|
|
from termcolor import colored
|
|
|
|
from swarms.structs.flow import Flow
|
|
|
|
|
|
# Define a generic Task that can handle different types of callable objects
|
|
@dataclass
class Task:
    """
    One unit of work inside a sequential workflow.

    A task couples a description with the object that carries it out: either
    a ``Flow`` (driven through ``Flow.run``) or any other callable (invoked
    directly).

    Attributes:
        description (str): Human-readable statement of the work to do.
        flow (Union[Callable, Flow]): The Flow or callable that does the work.
        args (List[Any]): Positional arguments forwarded on execution.
        kwargs (Dict[str, Any]): Keyword arguments forwarded on execution.
            For a Flow, ``execute`` also stores a ``"prompt"`` entry here.
        result (Any): Output of the most recent execution, ``None`` until then.
        history (List[Any]): Every result produced by ``execute``, oldest first.

    Examples:
    >>> from swarms.structs import Task, Flow
    >>> from swarms.models import OpenAIChat
    >>> flow = Flow(llm=OpenAIChat(openai_api_key=""), max_loops=1, dashboard=False)
    >>> task = Task(description="What's the weather in miami", flow=flow)
    >>> task.execute()
    >>> task.result
    """

    description: str
    flow: Union[Callable, Flow]
    args: List[Any] = field(default_factory=list)
    kwargs: Dict[str, Any] = field(default_factory=dict)
    result: Any = None
    history: List[Any] = field(default_factory=list)

    def execute(self):
        """
        Run the task once, store its output in ``result`` and log it in ``history``.

        Plain callables are invoked as ``flow(*args, **kwargs)``. For a Flow,
        the ``"prompt"`` keyword argument is first created (from the task
        description) or extended in place with the previous result, if that
        result is truthy; then ``flow.run(*args, **kwargs)`` is called.
        """
        if not isinstance(self.flow, Flow):
            self.result = self.flow(*self.args, **self.kwargs)
        else:
            # Tell the Flow what the preceding execution produced, if anything.
            previous_output = (
                f"\n\nPrevious output: {self.result}" if self.result else ""
            )
            if "prompt" in self.kwargs:
                self.kwargs["prompt"] += previous_output
            else:
                self.kwargs["prompt"] = (
                    f"Main task: {self.description}" + previous_output
                )
            self.result = self.flow.run(*self.args, **self.kwargs)

        self.history.append(self.result)
|
|
|
|
|
|
# SequentialWorkflow class definition using dataclasses
|
|
@dataclass
class SequentialWorkflow:
    """
    SequentialWorkflow class for running a sequence of tasks using N number of autonomous agents.

    Tasks run in insertion order. The result of each task is handed to the
    task that follows it: as the ``task`` keyword argument when the follower
    is a ``Flow``, otherwise as a new first positional argument.

    Args:
        max_loops (int): The maximum number of passes over the task list.
            A task is skipped in a pass when it already has a result.
        dashboard (bool): Dashboard flag. No method of this class reads it;
            call ``workflow_dashboard`` explicitly to print the dashboard.

    Attributes:
        tasks (List[Task]): The list of tasks to execute.
        max_loops (int): The maximum number of passes over the task list.
        autosave (bool): Whether to save the workflow state after each task.
        saved_state_filepath (Optional[str]): Where autosave writes the state.
        restore_state_filepath (Optional[str]): Default path for
            ``load_workflow_state``.
        dashboard (bool): Dashboard flag (see above).

    Examples:
    >>> from swarms.models import OpenAIChat
    >>> from swarms.structs import SequentialWorkflow
    >>> llm = OpenAIChat(openai_api_key="")
    >>> workflow = SequentialWorkflow(max_loops=1)
    >>> workflow.add("What's the weather in miami", llm)
    >>> workflow.add("Create a report on these metrics", llm)
    >>> workflow.run()
    >>> workflow.tasks
    """

    tasks: List[Task] = field(default_factory=list)
    max_loops: int = 1
    autosave: bool = False
    saved_state_filepath: Optional[str] = "sequential_workflow_state.json"
    restore_state_filepath: Optional[str] = None
    dashboard: bool = False

    def add(self, task: str, flow: Union[Callable, Flow], *args, **kwargs) -> None:
        """
        Add a task to the workflow.

        Args:
            task (str): The task description or the initial input for the Flow.
            flow (Union[Callable, Flow]): The model or flow to execute the task.
            *args: Additional arguments to pass to the task execution.
            **kwargs: Additional keyword arguments to pass to the task execution.
        """
        # A Flow receives its input through the 'task' keyword of Flow.run().
        if isinstance(flow, Flow):
            kwargs["task"] = task

        self.tasks.append(
            Task(description=task, flow=flow, args=list(args), kwargs=kwargs)
        )

    def reset_workflow(self) -> None:
        """Resets the workflow by clearing the results of each task."""
        for task in self.tasks:
            task.result = None

    def get_task_results(self) -> Dict[str, Any]:
        """
        Returns the results of each task in the workflow.

        Returns:
            Dict[str, Any]: Task description mapped to task result. Tasks that
                share a description collapse into one entry (the last wins).
        """
        return {task.description: task.result for task in self.tasks}

    def remove_task(self, task_description: str) -> None:
        """
        Remove every task whose description equals ``task_description``.

        Args:
            task_description (str): The description of the task(s) to drop.
        """
        self.tasks = [
            task for task in self.tasks if task.description != task_description
        ]

    def update_task(self, task_description: str, **updates) -> None:
        """
        Updates the arguments of a task in the workflow.

        Only the first task with a matching description is updated.

        Args:
            task_description (str): The description of the task to update.
            **updates: The updates to apply to the task's keyword arguments.

        Raises:
            ValueError: If the task is not found in the workflow.

        Examples:
        >>> from swarms.models import OpenAIChat
        >>> from swarms.structs import SequentialWorkflow
        >>> llm = OpenAIChat(openai_api_key="")
        >>> workflow = SequentialWorkflow(max_loops=1)
        >>> workflow.add("What's the weather in miami", llm)
        >>> workflow.add("Create a report on these metrics", llm)
        >>> workflow.update_task("What's the weather in miami", max_tokens=1000)
        >>> workflow.tasks[0].kwargs
        {'max_tokens': 1000}
        """
        for task in self.tasks:
            if task.description == task_description:
                task.kwargs.update(updates)
                break
        else:
            raise ValueError(f"Task {task_description} not found in workflow.")

    def save_workflow_state(
        self, filepath: Optional[str] = "sequential_workflow_state.json", **kwargs
    ) -> None:
        """
        Saves the workflow state to a json file.

        The flow objects themselves are not written; only each task's
        description, args, kwargs, result and history, plus ``max_loops``.

        Args:
            filepath (Optional[str]): The path to save the workflow state to.
                A falsy value falls back to ``saved_state_filepath`` and then
                to ``"sequential_workflow_state.json"``.
            **kwargs: Accepted for interface compatibility; unused.

        Raises:
            TypeError: If args, kwargs, results or history are not JSON
                serializable. The target file is left untouched in that case.

        Examples:
        >>> from swarms.models import OpenAIChat
        >>> from swarms.structs import SequentialWorkflow
        >>> llm = OpenAIChat(openai_api_key="")
        >>> workflow = SequentialWorkflow(max_loops=1)
        >>> workflow.add("What's the weather in miami", llm)
        >>> workflow.add("Create a report on these metrics", llm)
        >>> workflow.save_workflow_state("sequential_workflow_state.json")
        """
        filepath = (
            filepath
            or self.saved_state_filepath
            or "sequential_workflow_state.json"
        )

        # Saving the state as a json for simplicity
        state = {
            "tasks": [
                {
                    "description": task.description,
                    "args": task.args,
                    "kwargs": task.kwargs,
                    "result": task.result,
                    "history": task.history,
                }
                for task in self.tasks
            ],
            "max_loops": self.max_loops,
        }

        # Serialize before opening: opening with "w" truncates the file, so a
        # serialization error must not be able to wipe a previously saved state.
        serialized = json.dumps(state, indent=4)
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(serialized)

    def workflow_bootup(self, **kwargs) -> None:
        """Print the start-up banner. ``**kwargs`` is accepted but unused."""
        print(
            colored(
                """
                Sequential Workflow Initializing...""",
                "green",
                attrs=["bold", "underline"],
            )
        )

    def workflow_dashboard(self, **kwargs) -> None:
        """
        Displays a dashboard for the workflow.

        Args:
            **kwargs: Extra metadata; printed verbatim in the dashboard.

        Examples:
        >>> from swarms.models import OpenAIChat
        >>> from swarms.structs import SequentialWorkflow
        >>> llm = OpenAIChat(openai_api_key="")
        >>> workflow = SequentialWorkflow(max_loops=1)
        >>> workflow.add("What's the weather in miami", llm)
        >>> workflow.add("Create a report on these metrics", llm)
        >>> workflow.workflow_dashboard()
        """
        print(
            colored(
                f"""
                Sequential Workflow Dashboard
                --------------------------------
                Tasks: {len(self.tasks)}
                Max Loops: {self.max_loops}
                Autosave: {self.autosave}
                Autosave Filepath: {self.saved_state_filepath}
                Restore Filepath: {self.restore_state_filepath}
                --------------------------------
                Metadata:
                kwargs: {kwargs}




                """,
                "cyan",
                attrs=["bold", "underline"],
            )
        )

    def workflow_shutdown(self, **kwargs) -> None:
        """Print the shutdown banner. ``**kwargs`` is accepted but unused."""
        print(
            colored(
                """
                Sequential Workflow Shutdown...""",
                "red",
                attrs=["bold", "underline"],
            )
        )

    def add_objective_to_workflow(self, task: str, **kwargs) -> None:
        """
        Print a banner and append a new task built from keyword options.

        Args:
            task (str): The description of the task to add.
            **kwargs: Must contain ``flow`` (the Flow or callable to run).
                May contain ``args`` (iterable of positional arguments) and
                ``kwargs`` (dict of keyword arguments) for the task; both
                default to empty when omitted or ``None``.

        Raises:
            KeyError: If ``flow`` is not supplied.
        """
        print(
            colored(
                """
                Adding Objective to Workflow...""",
                "green",
                attrs=["bold", "underline"],
            )
        )

        task_args = kwargs.get("args")
        task_kwargs = kwargs.get("kwargs")
        new_task = Task(
            description=task,
            flow=kwargs["flow"],
            args=[] if task_args is None else list(task_args),
            kwargs={} if task_kwargs is None else task_kwargs,
        )
        self.tasks.append(new_task)

    def load_workflow_state(self, filepath: Optional[str] = None, **kwargs) -> None:
        """
        Loads the workflow state from a json file and restores the workflow state.

        Files written by ``save_workflow_state`` carry no flow objects. For
        each restored task the flow is therefore taken from the task that
        currently sits at the same position in ``self.tasks``, provided the
        descriptions match. Otherwise the flow is ``None`` and must be
        assigned by the caller before the workflow is run.

        Args:
            filepath (Optional[str]): The path to load the workflow state
                from. Falls back to ``restore_state_filepath``.
            **kwargs: Accepted for interface compatibility; unused.

        Raises:
            ValueError: If no path is given and ``restore_state_filepath`` is
                not set.

        Examples:
        >>> from swarms.models import OpenAIChat
        >>> from swarms.structs import SequentialWorkflow
        >>> llm = OpenAIChat(openai_api_key="")
        >>> workflow = SequentialWorkflow(max_loops=1)
        >>> workflow.add("What's the weather in miami", llm)
        >>> workflow.add("Create a report on these metrics", llm)
        >>> workflow.save_workflow_state("sequential_workflow_state.json")
        >>> workflow.load_workflow_state("sequential_workflow_state.json")
        """
        filepath = filepath or self.restore_state_filepath
        if not filepath:
            raise ValueError(
                "No filepath provided and 'restore_state_filepath' is not set."
            )

        with open(filepath, "r", encoding="utf-8") as f:
            state = json.load(f)

        current_tasks = self.tasks
        restored_tasks = []
        for index, task_state in enumerate(state["tasks"]):
            # The saved file normally has no "flow" entry; reuse the live one.
            flow = task_state.get("flow")
            if flow is None and index < len(current_tasks):
                candidate = current_tasks[index]
                if candidate.description == task_state["description"]:
                    flow = candidate.flow

            restored_tasks.append(
                Task(
                    description=task_state["description"],
                    flow=flow,
                    args=task_state["args"],
                    kwargs=task_state["kwargs"],
                    result=task_state["result"],
                    history=task_state["history"],
                )
            )

        # Commit only after the whole file was processed successfully.
        self.max_loops = state["max_loops"]
        self.tasks = restored_tasks

    def _take_flow_task(self, task: Task) -> Any:
        """Remove and return the 'task' kwarg that a Flow-backed task must carry."""
        if "task" not in task.kwargs:
            raise ValueError(
                f"The 'task' argument is required for the Flow flow execution in '{task.description}'"
            )
        return task.kwargs.pop("task")

    def _forward_result(self, index: int, result: Any) -> None:
        """Hand ``result`` to the task that follows position ``index``, if any."""
        next_index = index + 1
        if next_index >= len(self.tasks):
            return

        next_task = self.tasks[next_index]
        if isinstance(next_task.flow, Flow):
            # For Flow flows, 'task' should be a keyword argument
            next_task.kwargs["task"] = result
        else:
            # For other callable flows, the result is added to args
            next_task.args.insert(0, result)

    def _autosave(self) -> None:
        """Persist the state to ``saved_state_filepath`` when autosave is on."""
        if self.autosave:
            self.save_workflow_state(self.saved_state_filepath)

    def run(self) -> None:
        """
        Run the workflow.

        Prints the start-up banner, then makes up to ``max_loops`` passes over
        the tasks. A task is executed only while its ``result`` is ``None``.
        After each execution the result is forwarded to the following task
        and, if ``autosave`` is enabled, the state is saved.

        Any exception raised along the way, including the ``ValueError`` for a
        Flow task without a ``task`` keyword argument, is caught and reported
        on stdout rather than propagated.
        """
        try:
            self.workflow_bootup()
            for _ in range(self.max_loops):
                # enumerate gives the true position; list.index() would pick
                # the first *equal* task, which is wrong for duplicate tasks.
                for index, task in enumerate(self.tasks):
                    if task.result is not None:
                        continue

                    if isinstance(task.flow, Flow):
                        flow_task_arg = self._take_flow_task(task)
                        task.result = task.flow.run(
                            flow_task_arg, *task.args, **task.kwargs
                        )
                    else:
                        # If it's not a Flow instance, call the flow directly
                        task.result = task.flow(*task.args, **task.kwargs)

                    self._forward_result(index, task.result)
                    self._autosave()
        except Exception as e:
            print(
                colored(
                    f"Error initializing the Sequential workflow: {e} try optimizing your inputs like the flow class and task description",
                    "red",
                    attrs=["bold", "underline"],
                )
            )

    async def arun(self) -> None:
        """
        Asynchronously run the workflow.

        Same scheduling as ``run``, but Flow tasks are awaited through
        ``Flow.arun`` and other callables are awaited directly, so they must
        return awaitables. No banner is printed and exceptions propagate to
        the caller.

        Raises:
            ValueError: If a Flow instance is used as a task and the 'task' argument is not provided.
        """
        for _ in range(self.max_loops):
            for index, task in enumerate(self.tasks):
                if task.result is not None:
                    continue

                if isinstance(task.flow, Flow):
                    flow_task_arg = self._take_flow_task(task)
                    task.result = await task.flow.arun(
                        flow_task_arg, *task.args, **task.kwargs
                    )
                else:
                    # If it's not a Flow instance, call the flow directly
                    task.result = await task.flow(*task.args, **task.kwargs)

                self._forward_result(index, task.result)
                self._autosave()
|