From f4cf551c2395531f426cc745ec85d446e7913c27 Mon Sep 17 00:00:00 2001 From: Your Name Date: Sun, 17 Nov 2024 09:51:11 -0800 Subject: [PATCH] [CLEANUP UN-USED FILES SUCH AS RUN_CPU decorator + other files] --- swarms/telemetry/bootup.py | 28 ++++--- swarms/telemetry/check_update.py | 51 +++++++++--- swarms/utils/profile_func_2.py | 0 swarms/utils/run_on_cpu.py | 128 ------------------------------- 4 files changed, 57 insertions(+), 150 deletions(-) delete mode 100644 swarms/utils/profile_func_2.py delete mode 100644 swarms/utils/run_on_cpu.py diff --git a/swarms/telemetry/bootup.py b/swarms/telemetry/bootup.py index 24d7a7c4..41cae773 100644 --- a/swarms/telemetry/bootup.py +++ b/swarms/telemetry/bootup.py @@ -9,18 +9,22 @@ from swarms.utils.disable_logging import disable_logging def bootup(): """Bootup swarms""" - logging.disable(logging.CRITICAL) - os.environ["WANDB_SILENT"] = "true" + try: + logging.disable(logging.CRITICAL) + os.environ["WANDB_SILENT"] = "true" - # Auto set workspace directory - workspace_dir = os.path.join(os.getcwd(), "agent_workspace") - if not os.path.exists(workspace_dir): - os.makedirs(workspace_dir) - os.environ["WORKSPACE_DIR"] = workspace_dir + # Auto set workspace directory + workspace_dir = os.path.join(os.getcwd(), "agent_workspace") + if not os.path.exists(workspace_dir): + os.makedirs(workspace_dir, exist_ok=True) + os.environ["WORKSPACE_DIR"] = workspace_dir - warnings.filterwarnings("ignore", category=DeprecationWarning) + warnings.filterwarnings("ignore", category=DeprecationWarning) - # Use ThreadPoolExecutor to run disable_logging and auto_update concurrently - with ThreadPoolExecutor(max_workers=2) as executor: - executor.submit(disable_logging) - executor.submit(auto_update) + # Use ThreadPoolExecutor to run disable_logging and auto_update concurrently + with ThreadPoolExecutor(max_workers=2) as executor: + executor.submit(disable_logging) + executor.submit(auto_update) + except Exception as e: + print(f"An error occurred: {str(e)}") + raise diff --git a/swarms/telemetry/check_update.py b/swarms/telemetry/check_update.py index a7e2384a..2b0b9a1c 100644 --- a/swarms/telemetry/check_update.py +++ b/swarms/telemetry/check_update.py @@ -4,10 +4,22 @@ import sys import pkg_resources import requests from packaging import version +from swarms.utils.loguru_logger import initialize_logger + +logger = initialize_logger("check-update") # borrowed from: https://stackoverflow.com/a/1051266/656011 def check_for_package(package: str) -> bool: + """ + Checks if a package is installed and available for import. + + Args: + package (str): The name of the package to check. + + Returns: + bool: True if the package is installed and can be imported, False otherwise. + """ if package in sys.modules: return True elif (spec := importlib.util.find_spec(package)) is not None: @@ -19,24 +31,43 @@ def check_for_package(package: str) -> bool: return True except ImportError: + logger.error(f"Failed to import {package}") return False else: + logger.info(f"{package} not found") return False def check_for_update() -> bool: - """Check for updates + """ + Checks if there is an update available for the swarms package. Returns: - BOOL: Flag to indicate if there is an update + bool: True if an update is available, False otherwise. """ - # Fetch the latest version from the PyPI API - response = requests.get("https://pypi.org/pypi/swarms/json") - latest_version = response.json()["info"]["version"] + try: + # Fetch the latest version from the PyPI API + response = requests.get("https://pypi.org/pypi/swarms/json") + response.raise_for_status() # Raises an HTTPError if the response status code is 4XX/5XX + latest_version = response.json()["info"]["version"] - # Get the current version using pkg_resources - current_version = pkg_resources.get_distribution("swarms").version + # Get the current version using pkg_resources + current_version = pkg_resources.get_distribution( + "swarms" + ).version - return version.parse(latest_version) > version.parse( - current_version - ) + if version.parse(latest_version) > version.parse( + current_version + ): + logger.info( + f"Update available: {latest_version} > {current_version}" + ) + return True + else: + logger.info( + f"No update available: {latest_version} <= {current_version}" + ) + return False + except requests.exceptions.RequestException as e: + logger.error(f"Failed to check for update: {e}") + return False diff --git a/swarms/utils/profile_func_2.py b/swarms/utils/profile_func_2.py deleted file mode 100644 index e69de29b..00000000 diff --git a/swarms/utils/run_on_cpu.py b/swarms/utils/run_on_cpu.py deleted file mode 100644 index 9573135d..00000000 --- a/swarms/utils/run_on_cpu.py +++ /dev/null @@ -1,128 +0,0 @@ -import os -import psutil -from typing import Callable, Any -import functools - -from swarms.utils.loguru_logger import initialize_logger - -logger = initialize_logger(log_folder="run_on_cpu") - - -def run_on_cpu(func: Callable) -> Callable: - """ - Decorator that ensures the function runs on all available CPU cores, - maximizing CPU and memory usage to execute the function as quickly as possible. - - This decorator sets the CPU affinity of the current process to all available CPU cores - before executing the function. After the function completes, the original CPU affinity is restored. - - Args: - func (Callable): The function to be executed. - - Returns: - Callable: The wrapped function with CPU affinity settings applied. - - Raises: - RuntimeError: If the CPU affinity cannot be set or restored. - """ - - @functools.wraps(func) - def wrapper(*args: Any, **kwargs: Any) -> Any: - # Get the current process - process = psutil.Process(os.getpid()) - - # Check if the platform supports cpu_affinity - if not hasattr(process, "cpu_affinity"): - logger.warning( - "CPU affinity is not supported on this platform. Executing function without setting CPU affinity." - ) - return func(*args, **kwargs) - - # Save the original CPU affinity - original_affinity = process.cpu_affinity() - logger.info(f"Original CPU affinity: {original_affinity}") - - try: - # Set the CPU affinity to all available CPU cores - all_cpus = list(range(os.cpu_count())) - process.cpu_affinity(all_cpus) - logger.info(f"Set CPU affinity to: {all_cpus}") - - # Set process priority to high - try: - process.nice(psutil.HIGH_PRIORITY_CLASS) - logger.info("Set process priority to high.") - except AttributeError: - logger.warning( - "Setting process priority is not supported on this platform." - ) - - # Pre-allocate memory by creating a large array (optional step) - memory_size = int( - psutil.virtual_memory().available * 0.9 - ) # 90% of available memory - try: - logger.info( - f"Pre-allocating memory: {memory_size} bytes" - ) - _ = bytearray(memory_size) - except MemoryError: - logger.error( - "Failed to pre-allocate memory, continuing without pre-allocation." - ) - - # Run the function - result = func(*args, **kwargs) - - except psutil.AccessDenied as e: - logger.error( - "Access denied while setting CPU affinity", - exc_info=True, - ) - raise RuntimeError( - "Access denied while setting CPU affinity" - ) from e - - except psutil.NoSuchProcess as e: - logger.error("Process does not exist", exc_info=True) - raise RuntimeError("Process does not exist") from e - - except Exception as e: - logger.error( - "An error occurred during function execution", - exc_info=True, - ) - raise RuntimeError( - "An error occurred during function execution" - ) from e - - finally: - # Restore the original CPU affinity - try: - process.cpu_affinity(original_affinity) - logger.info( - f"Restored original CPU affinity: {original_affinity}" - ) - except Exception as e: - logger.error( - "Failed to restore CPU affinity", exc_info=True - ) - raise RuntimeError( - "Failed to restore CPU affinity" - ) from e - - return result - - return wrapper - - -# # Example usage of the decorator -# @run_on_cpu -# def compute_heavy_task() -> None: -# # An example task that is CPU and memory intensive -# data = [i**2 for i in range(100000000)] -# sum(data) -# print("Task completed.") - - -# compute_heavy_task()