[FEAT][Logging][All swarms.structs modules]

pull/362/head
Kye 12 months ago
parent 01beeda39c
commit 9ef3e68e27

@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry] [tool.poetry]
name = "swarms" name = "swarms"
version = "3.6.4" version = "3.6.5"
description = "Swarms - Pytorch" description = "Swarms - Pytorch"
license = "MIT" license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"] authors = ["Kye Gomez <kye@apac.ai>"]

@ -1,6 +1,9 @@
from swarms.structs.base import BaseStructure from swarms.structs.base import BaseStructure
import logging
class GraphWorkflow(BaseStructure): class GraphWorkflow(BaseStructure):
""" """
Represents a graph-based workflow structure. Represents a graph-based workflow structure.
@ -31,27 +34,96 @@ class GraphWorkflow(BaseStructure):
self.entry_point = None self.entry_point = None
def add(self, node, node_value): def add(self, node, node_value):
"""
Adds a node to the graph with the specified value.
Args:
node (str): The name of the node.
node_value (str): The value of the node.
Returns:
None
"""
self.graph[node] = {"value": node_value, "edges": {}} self.graph[node] = {"value": node_value, "edges": {}}
logging.info(f"Added node: {node}")
def start(self, node_name): def start(self, node_name):
"""
Sets the starting node for the workflow.
Args:
node_name (str): The name of the starting node.
Returns:
None
"""
self._check_node_exists(node_name) self._check_node_exists(node_name)
def connect(self, from_node, to_node): def connect(self, from_node, to_node):
"""
Connects two nodes in the graph.
Args:
from_node (str): The name of the source node.
to_node (str): The name of the target node.
Returns:
None
"""
self._check_node_exists(from_node, to_node) self._check_node_exists(from_node, to_node)
def set_entry_point(self, node_name): def set_entry_point(self, node_name):
"""
Sets the entry point node for the workflow.
Args:
node_name (str): The name of the entry point node.
Returns:
None
Raises:
ValueError: If the specified node does not exist in the graph.
"""
if node_name is self.graph: if node_name is self.graph:
self.entry_point = node_name self.entry_point = node_name
else: else:
raise ValueError("Node does not exist in graph") raise ValueError("Node does not exist in graph")
def add_edge(self, from_node, to_node): def add_edge(self, from_node, to_node):
"""
Adds an edge between two nodes in the graph.
Args:
from_node (str): The name of the source node.
to_node (str): The name of the target node.
Returns:
None
Raises:
ValueError: If either the source or target node does not exist in the graph.
"""
if from_node in self.graph and to_node in self.graph: if from_node in self.graph and to_node in self.graph:
self.graph[from_node]["edges"][to_node] = "edge" self.graph[from_node]["edges"][to_node] = "edge"
else: else:
raise ValueError("Node does not exist in graph") raise ValueError("Node does not exist in graph")
def add_conditional_edges(self, from_node, condition, edge_dict): def add_conditional_edges(self, from_node, condition, edge_dict):
"""
Adds conditional edges from a node to multiple nodes based on a condition.
Args:
from_node (str): The name of the source node.
condition: The condition for the conditional edges.
edge_dict (dict): A dictionary mapping condition values to target nodes.
Returns:
None
Raises:
ValueError: If the source node or any of the target nodes do not exist in the graph.
"""
if from_node in self.graph: if from_node in self.graph:
for condition_value, to_node in edge_dict.items(): for condition_value, to_node in edge_dict.items():
if to_node in self.graph: if to_node in self.graph:
@ -66,16 +138,43 @@ class GraphWorkflow(BaseStructure):
) )
def run(self): def run(self):
"""
Runs the workflow and returns the graph.
Returns:
dict: The graph representing the nodes and edges.
Raises:
ValueError: If the entry point is not set.
"""
if self.entry_point is None: if self.entry_point is None:
raise ValueError("Entry point not set") raise ValueError("Entry point not set")
return self.graph return self.graph
def _check_node_exists(self, node_name): def _check_node_exists(self, node_name):
"""Checks if a node exists in the graph.
Args:
node_name (_type_): _description_
Raises:
ValueError: _description_
"""
if node_name not in self.graph: if node_name not in self.graph:
raise ValueError( raise ValueError(
f"Node {node_name} does not exist in graph" f"Node {node_name} does not exist in graph"
) )
def _check_nodes_exist(self, from_node, to_node): def _check_nodes_exist(self, from_node, to_node):
"""
Checks if the given from_node and to_node exist in the graph.
Args:
from_node: The starting node of the edge.
to_node: The ending node of the edge.
Raises:
NodeNotFoundError: If either from_node or to_node does not exist in the graph.
"""
self._check_node_exists(from_node) self._check_node_exists(from_node)
self._check_node_exists(to_node) self._check_node_exists(to_node)

@ -199,9 +199,23 @@ class ModelParallelizer:
def add_llm(self, llm: Callable): def add_llm(self, llm: Callable):
"""Add an llm to the god mode""" """Add an llm to the god mode"""
logger.info(f"[INFO][ModelParallelizer] Adding LLM {llm}") logger.info(f"[INFO][ModelParallelizer] Adding LLM {llm}")
self.llms.append(llm)
try:
self.llms.append(llm)
except Exception as error:
logger.error(
f"[ERROR][ModelParallelizer] [ROOT CAUSE] [{error}]"
)
raise error
def remove_llm(self, llm: Callable): def remove_llm(self, llm: Callable):
"""Remove an llm from the god mode""" """Remove an llm from the god mode"""
logger.info(f"[INFO][ModelParallelizer] Removing LLM {llm}") logger.info(f"[INFO][ModelParallelizer] Removing LLM {llm}")
self.llms.remove(llm)
try:
self.llms.remove(llm)
except Exception as error:
logger.error(
f"[ERROR][ModelParallelizer] [ROOT CAUSE] [{error}]"
)
raise error

@ -13,6 +13,7 @@ from typing import (
from swarms.structs.agent import Agent from swarms.structs.agent import Agent
from swarms.utils.logger import logger from swarms.utils.logger import logger
@dataclass @dataclass
class Task: class Task:
""" """
@ -69,7 +70,7 @@ class Task:
priority: int = 0 priority: int = 0
dependencies: List["Task"] = field(default_factory=list) dependencies: List["Task"] = field(default_factory=list)
def execute(self, *args, **kwargs): def execute(self, task: str, img: str = None, *args, **kwargs):
""" """
Execute the task by calling the agent or model with the arguments and Execute the task by calling the agent or model with the arguments and
keyword arguments. You can add images to the agent by passing the keyword arguments. You can add images to the agent by passing the
@ -85,11 +86,13 @@ class Task:
>>> task.result >>> task.result
""" """
logger.info(f"[INFO][Task] Executing task: {task}")
task = self.description or task
try: try:
if isinstance(self.agent, Agent): if isinstance(self.agent, Agent):
if self.condition is None or self.condition(): if self.condition is None or self.condition():
self.result = self.agent.run( self.result = self.agent.run(
task=self.description, task=task,
*args, *args,
**kwargs, **kwargs,
) )
@ -104,7 +107,7 @@ class Task:
self.history.append(self.result) self.history.append(self.result)
except Exception as error: except Exception as error:
print(f"[ERROR][Task] {error}") logger.error(f"[ERROR][Task] {error}")
def run(self): def run(self):
self.execute() self.execute()
@ -119,6 +122,7 @@ class Task:
If the schedule time is not set or has already passed, the task is executed immediately. If the schedule time is not set or has already passed, the task is executed immediately.
Otherwise, the task is scheduled to be executed at the specified schedule time. Otherwise, the task is scheduled to be executed at the specified schedule time.
""" """
logger.info("[INFO][Task] Handling scheduled task")
try: try:
if ( if (
self.schedule_time is None self.schedule_time is None
@ -193,7 +197,7 @@ class Task:
Returns: Returns:
bool: True if all the dependencies have been completed, False otherwise. bool: True if all the dependencies have been completed, False otherwise.
""" """
logger.info(f"[INFO][Task] Checking dependency completion") logger.info("[INFO][Task] Checking dependency completion")
try: try:
for task in self.dependencies: for task in self.dependencies:
if not task.is_completed(): if not task.is_completed():

@ -18,6 +18,7 @@ from swarms.utils.data_to_text import (
txt_to_text, txt_to_text,
data_to_text, data_to_text,
) )
from swarms.utils.try_except_wrapper import try_except_wrapper
__all__ = [ __all__ = [
@ -37,4 +38,5 @@ __all__ = [
"json_to_text", "json_to_text",
"txt_to_text", "txt_to_text",
"data_to_text", "data_to_text",
"try_except_wrapper",
] ]

@ -1,4 +1,7 @@
def try_except_wrapper(func): from swarms.utils.logger import logger
def try_except_wrapper(func, verbose: bool = False):
""" """
A decorator that wraps a function with a try-except block. A decorator that wraps a function with a try-except block.
It catches any exception that occurs during the execution of the function, It catches any exception that occurs during the execution of the function,
@ -10,6 +13,14 @@ def try_except_wrapper(func):
Returns: Returns:
function: The wrapped function. function: The wrapped function.
Examples:
>>> @try_except_wrapper(verbose=True)
... def divide(a, b):
... return a / b
>>> divide(1, 0)
An error occurred in function divide: division by zero
Exiting function: divide
""" """
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
@ -17,11 +28,17 @@ def try_except_wrapper(func):
result = func(*args, **kwargs) result = func(*args, **kwargs)
return result return result
except Exception as error: except Exception as error:
print( if verbose:
f"An error occurred in function {func.__name__}:" logger.error(
f" {error}" f"An error occurred in function {func.__name__}:"
) f" {error}"
return None )
else:
print(
f"An error occurred in function {func.__name__}:"
f" {error}"
)
return None
finally: finally:
print(f"Exiting function: {func.__name__}") print(f"Exiting function: {func.__name__}")

Loading…
Cancel
Save