From ae6d22d5e42398109cfa34f399d58ab72e85a8bc Mon Sep 17 00:00:00 2001 From: Kye Date: Fri, 15 Sep 2023 19:08:41 -0400 Subject: [PATCH] clean up --- pyproject.toml | 2 +- swarms/__init__.py | 5 +- swarms/boss/boss_node.py | 8 +- swarms/models/__init__.py | 2 +- swarms/structs/task.py | 2 +- swarms/structs/workflow.py | 158 +++---------------------------------- 6 files changed, 22 insertions(+), 155 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 02ae42e5..a1ad63a3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "1.5.7" +version = "1.6.1" description = "Swarms - Pytorch" license = "MIT" authors = ["Kye Gomez "] diff --git a/swarms/__init__.py b/swarms/__init__.py index b27c6348..c318c2e1 100644 --- a/swarms/__init__.py +++ b/swarms/__init__.py @@ -5,7 +5,7 @@ # from swarms.workers.worker_node import WorkerNode #boss -from swarms.boss.boss_node import BossNode +from swarms.boss.boss_node import Boss #models from swarms.models.anthropic import Anthropic @@ -17,3 +17,6 @@ from swarms.workers.worker import Worker #from swarms.models.openai import OpenAIChat + +#workflows +from swarms.structs.workflow import Workflow diff --git a/swarms/boss/boss_node.py b/swarms/boss/boss_node.py index ced0ebf2..4e3f9c95 100644 --- a/swarms/boss/boss_node.py +++ b/swarms/boss/boss_node.py @@ -19,7 +19,7 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %( class Boss: """ - The BossNode class is responsible for creating and executing tasks using the BabyAGI model. + The Bose class is responsible for creating and executing tasks using the BabyAGI model. It takes a language model (llm), a vectorstore for memory, an agent_executor for task execution, and a maximum number of iterations for the BabyAGI model. # Setup @@ -29,15 +29,15 @@ class Boss: # Objective for the Boss objective = "Analyze website user behavior patterns over the past month." - # Create a BossNode instance - boss = BossNode( + # Create a Bose instance + boss = Bose( objective=objective, boss_system_prompt="You are the main controller of a data analysis swarm...", api_key=api_key, worker_node=WorkerNode ) - # Run the BossNode to process the objective + # Run the Bose to process the objective boss.run() """ def __init__( diff --git a/swarms/models/__init__.py b/swarms/models/__init__.py index 8cf0a623..6ac734f4 100644 --- a/swarms/models/__init__.py +++ b/swarms/models/__init__.py @@ -1,5 +1,5 @@ from swarms.models.anthropic import Anthropic -from swarms.models.huggingface import HuggingFaceLLM +from swarms.models.huggingface import HFLLM # from swarms.models.palm import GooglePalm from swarms.models.petals import Petals #from swarms.models.openai import OpenAIChat \ No newline at end of file diff --git a/swarms/structs/task.py b/swarms/structs/task.py index ee42e122..3da9bd31 100644 --- a/swarms/structs/task.py +++ b/swarms/structs/task.py @@ -7,7 +7,7 @@ from abc import ABC, abstractmethod from enum import Enum from typing import Any, Optional -from artifacts.main import Artifact +from swarms.artifacts.main import Artifact from pydantic import BaseModel, Field, StrictStr, conlist from swarms.artifacts.error_artifact import ErrorArtifact diff --git a/swarms/structs/workflow.py b/swarms/structs/workflow.py index 14f30f72..8a8fde1a 100644 --- a/swarms/structs/workflow.py +++ b/swarms/structs/workflow.py @@ -1,152 +1,10 @@ -# from __future__ import annotations - -# import concurrent.futures as futures -# import logging -# import uuid -# from abc import ABC, abstractmethod -# from graphlib import TopologicalSorter -# from logging import Logger -# from typing import Optional, Union - -# from rich.logging import RichHandler - -# # from swarms.artifacts.error_artifact import ErrorArtifact -# from swarms.artifacts.main import Artifact as ErrorArtifact -# from swarms.structs.task import BaseTask - - -# #@shapeless -# class Workflow(ABC): -# def __init__( -# self, -# id: str = uuid.uuid4().hex, -# model = None, -# custom_logger: Optional[Logger] = None, -# logger_level: int = logging.INFO, -# futures_executor: futures.Executor = futures.ThreadPoolExecutor() -# ): -# self.id = id -# self.model = model -# self.custom_logger = custom_logger -# self.logger_level = logger_level - -# self.futures_executor = futures_executor -# self._execution_args = () -# self._logger = None - -# [task.preprocess(self) for task in self.tasks] - -# self.model.structure = self - -# @property -# def execution_args(self) -> tuple: -# return self._execution_args - -# @property -# def logger(self) -> Logger: -# if self.custom_logger: -# return self.custom_logger -# else: -# if self._logger is None: -# self._logger = logging.getLogger(self.LOGGER_NAME) - -# self._logger.propagate = False -# self._logger.level = self.logger_level - -# self._logger.handlers = [ -# RichHandler( -# show_time=True, -# show_path=False -# ) -# ] -# return self._logger - -# def is_finished(self) -> bool: -# return all(s.is_finished() for s in self.tasks) - -# def is_executing(self) -> bool: -# return any(s for s in self.tasks if s.is_executing()) - -# def find_task(self, task_id: str) -> Optional[BaseTask]: -# return next((task for task in self.tasks if task.id == task_id), None) - -# def add_tasks(self, *tasks: BaseTask) -> list[BaseTask]: -# return [self.add_task(s) for s in tasks] - -# def context(self, task: BaseTask) -> dict[str, any]: -# return { -# "args": self.execution_args, -# "structure": self, -# } - -# @abstractmethod -# def add(self, task: BaseTask) -> BaseTask: -# task.preprocess(self) -# self.tasks.append(task) -# return task - -# @abstractmethod -# def run(self, *args) -> Union[BaseTask, list[BaseTask]]: -# self._execution_args = args -# ordered_tasks = self.order_tasks() -# exit_loop = False - -# while not self.is_finished() and not exit_loop: -# futures_list = {} - -# for task in ordered_tasks: -# if task.can_execute(): -# future = self.futures_executor.submit(task.execute) -# futures_list[future] = task - -# # Wait for all tasks to complete -# for future in futures.as_completed(futures_list): -# if isinstance(future.result(), ErrorArtifact): -# exit_loop = True -# break - -# self._execution_args = () - -# return self.output_tasks() - -# def context(self, task: BaseTask) -> dict[str, any]: -# context = super().context(task) - -# context.update( -# { -# "parent_outputs": {parent.id: parent.output.to_text() if parent.output else "" for parent in task.parents}, -# "parents": {parent.id: parent for parent in task.parents}, -# "children": {child.id: child for child in task.children} -# } -# ) - -# return context - -# def output_tasks(self) -> list[BaseTask]: -# return [task for task in self.tasks if not task.children] - -# def to_graph(self) -> dict[str, set[str]]: -# graph: dict[str, set[str]] = {} - -# for key_task in self.tasks: -# graph[key_task.id] = set() - -# for value_task in self.tasks: -# if key_task.id in value_task.child_ids: -# graph[key_task.id].add(value_task.id) - -# return graph - -# def order_tasks(self) -> list[BaseTask]: -# return [self.find_task(task_id) for task_id in TopologicalSorter(self.to_graph()).static_order()] - - from __future__ import annotations -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union from swarms.artifacts.error_artifacts import ErrorArtifact from swarms.structs.task import BaseTask +import concurrent.futures class StringTask(BaseTask): def __init__( @@ -186,10 +44,13 @@ class Workflow: """ def __init__( self, - llm + llm, + parallel: bool = False ): self.llm = llm self.tasks: List[BaseTask] = [] + self.parallel = parallel + def add( self, @@ -215,7 +76,11 @@ class Workflow: [task.reset() for task in self.tasks] - self.__run_from_task(self.first_task()) + if self.parallel: + with concurrent.futures.ThreadPoolExecutor() as executor: + list(executor.map(self.__run_from_task, [self.first_task])) + else: + self.__run_from_task(self.first_task()) self._execution_args = () @@ -245,4 +110,3 @@ class Workflow: else: self.__run_from_task(next(iter(task.children), None)) -