From b65cd5b1fbb02436ed87d968207553fb6d00b2ff Mon Sep 17 00:00:00 2001 From: Kye Date: Thu, 11 Jan 2024 22:25:44 -0500 Subject: [PATCH] [3.5.0] --- ...rent_workflow.py => concurrent_workflow.py | 7 ++-- pyproject.toml | 2 +- swarms/structs/concurrent_workflow.py | 39 +++++++------------ 3 files changed, 19 insertions(+), 29 deletions(-) rename playground/structs/concurrent_workflow.py => concurrent_workflow.py (85%) diff --git a/playground/structs/concurrent_workflow.py b/concurrent_workflow.py similarity index 85% rename from playground/structs/concurrent_workflow.py rename to concurrent_workflow.py index 8033e10a..ef8b19bc 100644 --- a/playground/structs/concurrent_workflow.py +++ b/concurrent_workflow.py @@ -8,12 +8,13 @@ load_dotenv() # Load environment variables llm = OpenAIChat(openai_api_key=os.getenv("OPENAI_API_KEY")) agent = Agent( + system_prompt=None, llm=llm, max_loops=1, ) # Create a workflow -workflow = ConcurrentWorkflow(max_workers=5) +workflow = ConcurrentWorkflow(max_workers=3) # Create tasks task1 = Task(agent=agent, description="What's the weather in miami") @@ -23,9 +24,7 @@ task2 = Task( task3 = Task(agent=agent, description="What's the weather in london") # Add tasks to the workflow -workflow.add(task1) -workflow.add(task2) -workflow.add(task3) +workflow.add(tasks=[task1, task2, task3]) # Run the workflow and print each task result workflow.run() diff --git a/pyproject.toml b/pyproject.toml index d7b16594..4b8056d8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "3.4.8" +version = "3.5.0" description = "Swarms - Pytorch" license = "MIT" authors = ["Kye Gomez "] diff --git a/swarms/structs/concurrent_workflow.py b/swarms/structs/concurrent_workflow.py index c9f588c1..620f7db2 100644 --- a/swarms/structs/concurrent_workflow.py +++ b/swarms/structs/concurrent_workflow.py @@ -12,15 +12,12 @@ class ConcurrentWorkflow(BaseStructure): ConcurrentWorkflow class for running a set of tasks concurrently using N number of autonomous agents. Args: - max_workers (int): The maximum number of workers to use for concurrent execution. - autosave (bool): Whether to autosave the workflow state. - saved_state_filepath (Optional[str]): The file path to save the workflow state. - - Attributes: - tasks (List[Task]): The list of tasks to execute. - max_workers (int): The maximum number of workers to use for concurrent execution. - autosave (bool): Whether to autosave the workflow state. - saved_state_filepath (Optional[str]): The file path to save the workflow state. + max_workers (int): The maximum number of workers to use for the ThreadPoolExecutor. + autosave (bool): Whether to save the state of the workflow to a file. Default is False. + saved_state_filepath (str): The filepath to save the state of the workflow to. Default is "runs/concurrent_workflow.json". + print_results (bool): Whether to print the results of each task. Default is False. + return_results (bool): Whether to return the results of each task. Default is False. + use_processes (bool): Whether to use processes instead of threads. Default is False. Examples: >>> from swarms.models import OpenAIChat @@ -33,7 +30,7 @@ class ConcurrentWorkflow(BaseStructure): >>> workflow.tasks """ - tasks: List[Dict] = field(default_factory=list) + task_pool: List[Dict] = field(default_factory=list) max_workers: int = 5 autosave: bool = False saved_state_filepath: Optional[str] = ( @@ -43,7 +40,7 @@ class ConcurrentWorkflow(BaseStructure): return_results: bool = False use_processes: bool = False - def add(self, task: Task): + def add(self, task: Task = None, tasks: List[Task] = None): """Adds a task to the workflow. Args: @@ -51,7 +48,12 @@ class ConcurrentWorkflow(BaseStructure): tasks (List[Task]): _description_ """ try: - self.tasks.append(task) + if tasks: + for task in tasks: + self.task_pool.append(task) + else: + if task: + self.task_pool.append(task) except Exception as error: print(f"[ERROR][ConcurrentWorkflow] {error}") raise error @@ -72,7 +74,7 @@ class ConcurrentWorkflow(BaseStructure): ) as executor: futures = { executor.submit(task.execute): task - for task in self.tasks + for task in self.task_pool } results = [] @@ -88,14 +90,3 @@ class ConcurrentWorkflow(BaseStructure): print(f"Task {task} generated an exception: {e}") return results if self.return_results else None - - def _execute_task(self, task: Task): - """Executes a task. - - Args: - task (Task): _description_ - - Returns: - _type_: _description_ - """ - return task.execute()