From 9ef3e68e276cb323bc14c4a393bb39947b3bbd32 Mon Sep 17 00:00:00 2001 From: Kye Date: Wed, 17 Jan 2024 13:08:09 -0500 Subject: [PATCH] [FEAT][Logging][All swarms.structs modules] --- pyproject.toml | 2 +- swarms/structs/graph_workflow.py | 99 ++++++++++++++++++++++++++++++ swarms/structs/model_parallizer.py | 18 +++++- swarms/structs/task.py | 12 ++-- swarms/utils/__init__.py | 2 + swarms/utils/try_except_wrapper.py | 29 +++++++-- 6 files changed, 149 insertions(+), 13 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index de519015..a9c61cb4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "3.6.4" +version = "3.6.5" description = "Swarms - Pytorch" license = "MIT" authors = ["Kye Gomez "] diff --git a/swarms/structs/graph_workflow.py b/swarms/structs/graph_workflow.py index c463aef1..c4bcea7e 100644 --- a/swarms/structs/graph_workflow.py +++ b/swarms/structs/graph_workflow.py @@ -1,6 +1,9 @@ from swarms.structs.base import BaseStructure +import logging + + class GraphWorkflow(BaseStructure): """ Represents a graph-based workflow structure. @@ -31,27 +34,96 @@ class GraphWorkflow(BaseStructure): self.entry_point = None def add(self, node, node_value): + """ + Adds a node to the graph with the specified value. + + Args: + node (str): The name of the node. + node_value (str): The value of the node. + + Returns: + None + """ self.graph[node] = {"value": node_value, "edges": {}} + logging.info(f"Added node: {node}") def start(self, node_name): + """ + Sets the starting node for the workflow. + + Args: + node_name (str): The name of the starting node. + + Returns: + None + """ self._check_node_exists(node_name) def connect(self, from_node, to_node): + """ + Connects two nodes in the graph. + + Args: + from_node (str): The name of the source node. + to_node (str): The name of the target node. + + Returns: + None + """ self._check_node_exists(from_node, to_node) def set_entry_point(self, node_name): + """ + Sets the entry point node for the workflow. + + Args: + node_name (str): The name of the entry point node. + + Returns: + None + + Raises: + ValueError: If the specified node does not exist in the graph. + """ if node_name is self.graph: self.entry_point = node_name else: raise ValueError("Node does not exist in graph") def add_edge(self, from_node, to_node): + """ + Adds an edge between two nodes in the graph. + + Args: + from_node (str): The name of the source node. + to_node (str): The name of the target node. + + Returns: + None + + Raises: + ValueError: If either the source or target node does not exist in the graph. + """ if from_node in self.graph and to_node in self.graph: self.graph[from_node]["edges"][to_node] = "edge" else: raise ValueError("Node does not exist in graph") def add_conditional_edges(self, from_node, condition, edge_dict): + """ + Adds conditional edges from a node to multiple nodes based on a condition. + + Args: + from_node (str): The name of the source node. + condition: The condition for the conditional edges. + edge_dict (dict): A dictionary mapping condition values to target nodes. + + Returns: + None + + Raises: + ValueError: If the source node or any of the target nodes do not exist in the graph. + """ if from_node in self.graph: for condition_value, to_node in edge_dict.items(): if to_node in self.graph: @@ -66,16 +138,43 @@ class GraphWorkflow(BaseStructure): ) def run(self): + """ + Runs the workflow and returns the graph. + + Returns: + dict: The graph representing the nodes and edges. + + Raises: + ValueError: If the entry point is not set. + """ if self.entry_point is None: raise ValueError("Entry point not set") return self.graph def _check_node_exists(self, node_name): + """Checks if a node exists in the graph. + + Args: + node_name (_type_): _description_ + + Raises: + ValueError: _description_ + """ if node_name not in self.graph: raise ValueError( f"Node {node_name} does not exist in graph" ) def _check_nodes_exist(self, from_node, to_node): + """ + Checks if the given from_node and to_node exist in the graph. + + Args: + from_node: The starting node of the edge. + to_node: The ending node of the edge. + + Raises: + NodeNotFoundError: If either from_node or to_node does not exist in the graph. + """ self._check_node_exists(from_node) self._check_node_exists(to_node) diff --git a/swarms/structs/model_parallizer.py b/swarms/structs/model_parallizer.py index c6ac6150..8e9dfcdd 100644 --- a/swarms/structs/model_parallizer.py +++ b/swarms/structs/model_parallizer.py @@ -199,9 +199,23 @@ class ModelParallelizer: def add_llm(self, llm: Callable): """Add an llm to the god mode""" logger.info(f"[INFO][ModelParallelizer] Adding LLM {llm}") - self.llms.append(llm) + + try: + self.llms.append(llm) + except Exception as error: + logger.error( + f"[ERROR][ModelParallelizer] [ROOT CAUSE] [{error}]" + ) + raise error def remove_llm(self, llm: Callable): """Remove an llm from the god mode""" logger.info(f"[INFO][ModelParallelizer] Removing LLM {llm}") - self.llms.remove(llm) + + try: + self.llms.remove(llm) + except Exception as error: + logger.error( + f"[ERROR][ModelParallelizer] [ROOT CAUSE] [{error}]" + ) + raise error diff --git a/swarms/structs/task.py b/swarms/structs/task.py index 7eb9f335..699b7313 100644 --- a/swarms/structs/task.py +++ b/swarms/structs/task.py @@ -13,6 +13,7 @@ from typing import ( from swarms.structs.agent import Agent from swarms.utils.logger import logger + @dataclass class Task: """ @@ -69,7 +70,7 @@ class Task: priority: int = 0 dependencies: List["Task"] = field(default_factory=list) - def execute(self, *args, **kwargs): + def execute(self, task: str, img: str = None, *args, **kwargs): """ Execute the task by calling the agent or model with the arguments and keyword arguments. You can add images to the agent by passing the @@ -85,11 +86,13 @@ class Task: >>> task.result """ + logger.info(f"[INFO][Task] Executing task: {task}") + task = self.description or task try: if isinstance(self.agent, Agent): if self.condition is None or self.condition(): self.result = self.agent.run( - task=self.description, + task=task, *args, **kwargs, ) @@ -104,7 +107,7 @@ class Task: self.history.append(self.result) except Exception as error: - print(f"[ERROR][Task] {error}") + logger.error(f"[ERROR][Task] {error}") def run(self): self.execute() @@ -119,6 +122,7 @@ class Task: If the schedule time is not set or has already passed, the task is executed immediately. Otherwise, the task is scheduled to be executed at the specified schedule time. """ + logger.info("[INFO][Task] Handling scheduled task") try: if ( self.schedule_time is None @@ -193,7 +197,7 @@ class Task: Returns: bool: True if all the dependencies have been completed, False otherwise. """ - logger.info(f"[INFO][Task] Checking dependency completion") + logger.info("[INFO][Task] Checking dependency completion") try: for task in self.dependencies: if not task.is_completed(): diff --git a/swarms/utils/__init__.py b/swarms/utils/__init__.py index 77c99ccd..e265a1c8 100644 --- a/swarms/utils/__init__.py +++ b/swarms/utils/__init__.py @@ -18,6 +18,7 @@ from swarms.utils.data_to_text import ( txt_to_text, data_to_text, ) +from swarms.utils.try_except_wrapper import try_except_wrapper __all__ = [ @@ -37,4 +38,5 @@ __all__ = [ "json_to_text", "txt_to_text", "data_to_text", + "try_except_wrapper", ] diff --git a/swarms/utils/try_except_wrapper.py b/swarms/utils/try_except_wrapper.py index a12b4393..1f0b431d 100644 --- a/swarms/utils/try_except_wrapper.py +++ b/swarms/utils/try_except_wrapper.py @@ -1,4 +1,7 @@ -def try_except_wrapper(func): +from swarms.utils.logger import logger + + +def try_except_wrapper(func, verbose: bool = False): """ A decorator that wraps a function with a try-except block. It catches any exception that occurs during the execution of the function, @@ -10,6 +13,14 @@ def try_except_wrapper(func): Returns: function: The wrapped function. + + Examples: + >>> @try_except_wrapper(verbose=True) + ... def divide(a, b): + ... return a / b + >>> divide(1, 0) + An error occurred in function divide: division by zero + Exiting function: divide """ def wrapper(*args, **kwargs): @@ -17,11 +28,17 @@ def try_except_wrapper(func): result = func(*args, **kwargs) return result except Exception as error: - print( - f"An error occurred in function {func.__name__}:" - f" {error}" - ) - return None + if verbose: + logger.error( + f"An error occurred in function {func.__name__}:" + f" {error}" + ) + else: + print( + f"An error occurred in function {func.__name__}:" + f" {error}" + ) + return None finally: print(f"Exiting function: {func.__name__}")