[FEAT][SequentialWorkflow Concurrency] [FEAT][SequentialWorkflow Error handling]

pull/245/head
Kye 1 year ago
parent 085d19a8a7
commit 61a06d0edd

@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "2.5.6"
version = "2.5.7"
description = "Swarms - Pytorch"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]

@ -1,22 +1,8 @@
"""
TODO:
- Add a method to update the arguments of a task
- Add a method to get the results of each task
- Add a method to get the results of a specific task
- Add a method to get the results of the workflow
- Add a method to get the results of the workflow as a dataframe
- Add a method to run the workflow in parallel with a pool of workers and a queue and a dashboard
- Add a dashboard to visualize the workflow
- Add async support
- Add context manager support
- Add workflow history
"""
import concurrent.futures
import json
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Union
import logging
from termcolor import colored
from swarms.structs.agent import Agent
@ -118,11 +104,11 @@ class SequentialWorkflow:
"""
name: str = None
description: str = None
tasks: List[Task] = field(default_factory=list)
max_loops: int = 1
autosave: bool = False
name: str = (None,)
description: str = (None,)
saved_state_filepath: Optional[str] = (
"sequential_workflow_state.json"
)
@ -146,6 +132,7 @@ class SequentialWorkflow:
*args: Additional arguments to pass to the task execution.
**kwargs: Additional keyword arguments to pass to the task execution.
"""
try:
# If the agent is a Agent instance, we include the task in kwargs for Agent.run()
if isinstance(agent, Agent):
kwargs["task"] = (
@ -161,6 +148,12 @@ class SequentialWorkflow:
kwargs=kwargs,
)
)
except Exception as error:
print(
colored(
f"Error adding task to workflow: {error}", "red"
),
)
def reset_workflow(self) -> None:
"""Resets the workflow by clearing the results of each task."""
@ -205,12 +198,57 @@ class SequentialWorkflow:
{'max_tokens': 1000}
"""
try:
for task in self.tasks:
if task.description == task:
task.kwargs.update(updates)
break
else:
raise ValueError(f"Task {task} not found in workflow.")
raise ValueError(
f"Task {task} not found in workflow."
)
except Exception as error:
print(
colored(
f"Error updating task in workflow: {error}", "red"
),
)
def concurrent_run(self):
"""
Concurrently run the workflow using a pool of workers.
Examples:
>>> from swarms.models import OpenAIChat
>>> from swarms.structs import SequentialWorkflow
>>> llm = OpenAIChat(openai_api_key="")
>>> workflow = SequentialWorkflow(max_loops=1)
"""
try:
with concurrent.futures.ThreadPoolExecutor() as executor:
futures_to_task = {
executor.submit(task.run): task
for task in self.tasks
}
results = []
for future in concurrent.futures.as_completed(
futures_to_task
):
task = futures_to_task[future]
try:
result = future.result()
except Exception as error:
print(f"Error running workflow: {error}")
else:
results.append(result)
print(
f"Task {task} completed successfully with"
f" result: {result}"
)
except Exception as error:
print(colored(f"Error running workflow: {error}", "red"))
def save_workflow_state(
self,
@ -232,6 +270,7 @@ class SequentialWorkflow:
>>> workflow.add("Create a report on these metrics", llm)
>>> workflow.save_workflow_state("sequential_workflow_state.json")
"""
try:
filepath = filepath or self.saved_state_filepath
with open(filepath, "w") as f:
@ -250,8 +289,16 @@ class SequentialWorkflow:
"max_loops": self.max_loops,
}
json.dump(state, f, indent=4)
except Exception as error:
print(
colored(
f"Error saving workflow state: {error}",
"red",
)
)
def workflow_bootup(self, **kwargs) -> None:
"""Boots up the workflow."""
print(
colored(
"""

Loading…
Cancel
Save