From 0ce9ad929a196192adb9ab151466eefa201196a6 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 19 Nov 2024 13:20:17 -0800 Subject: [PATCH] [Agent][Clusterops fixes] --- pyproject.toml | 2 +- swarms/__init__.py | 5 + swarms/agents/tool_agent.py | 7 +- swarms/prompts/prompt.py | 25 ++-- swarms/structs/__init__.py | 3 +- swarms/structs/agent.py | 57 +++++---- swarms/structs/groupchat.py | 6 + swarms/structs/workspace_manager.py | 176 ++++++++++++++++++++++++++++ swarms/utils/loguru_logger.py | 16 ++- swarms/utils/workspace_manager.py | 0 swarms/utils/wrapper_clusterop.py | 57 +++++---- 11 files changed, 279 insertions(+), 75 deletions(-) create mode 100644 swarms/structs/workspace_manager.py delete mode 100644 swarms/utils/workspace_manager.py diff --git a/pyproject.toml b/pyproject.toml index ca879b48..6e92f651 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "6.1.3" +version = "6.2.0" description = "Swarms - Pytorch" license = "MIT" authors = ["Kye Gomez "] diff --git a/swarms/__init__.py b/swarms/__init__.py index c2df2d73..59fee672 100644 --- a/swarms/__init__.py +++ b/swarms/__init__.py @@ -1,8 +1,13 @@ import concurrent.futures from dotenv import load_dotenv +# from swarms.structs.workspace_manager import WorkspaceManager +# workspace_manager = WorkspaceManager() +# workspace_manager.run() + load_dotenv() + from swarms.telemetry.bootup import bootup # noqa: E402, F403 from swarms.telemetry.sentry_active import ( activate_sentry, diff --git a/swarms/agents/tool_agent.py b/swarms/agents/tool_agent.py index 34a316b3..2d19ec26 100644 --- a/swarms/agents/tool_agent.py +++ b/swarms/agents/tool_agent.py @@ -1,13 +1,11 @@ from typing import Any, Optional, Callable - -from swarms.structs.agent import Agent from swarms.tools.json_former import Jsonformer from swarms.utils.loguru_logger import initialize_logger logger = initialize_logger(log_folder="tool_agent") -class ToolAgent(Agent): +class ToolAgent: """ Represents a tool agent that performs a specific task using a model and tokenizer. @@ -153,3 +151,6 @@ class ToolAgent(Agent): f"Error running {self.name} for task: {task}" ) raise error + + def __call__(self, task: str, *args, **kwargs): + return self.run(task, *args, **kwargs) diff --git a/swarms/prompts/prompt.py b/swarms/prompts/prompt.py index 221250f0..b8628b20 100644 --- a/swarms/prompts/prompt.py +++ b/swarms/prompts/prompt.py @@ -133,8 +133,7 @@ class Prompt(BaseModel): self.content = new_content self.edit_count += 1 self.last_modified_at = time.strftime("%Y-%m-%d %H:%M:%S") - - + # logger.debug( # f"Prompt {self.id} updated. Edit count: {self.edit_count}. New content: '{self.content}'" # ) @@ -165,15 +164,15 @@ class Prompt(BaseModel): ) raise IndexError("Invalid version number for rollback.") - logger.info( - f"Rolling back prompt {self.id} to version {version}." - ) + # logger.info( + # f"Rolling back prompt {self.id} to version {version}." + # ) self.content = self.edit_history[version] self.edit_count = version self.last_modified_at = time.strftime("%Y-%m-%d %H:%M:%S") - logger.debug( - f"Prompt {self.id} rolled back to version {version}. Current content: '{self.content}'" - ) + # logger.debug( + # f"Prompt {self.id} rolled back to version {version}. Current content: '{self.content}'" + # ) self.log_telemetry() @@ -203,7 +202,7 @@ class Prompt(BaseModel): Raises: NotImplementedError: This method is a placeholder for storage integration. """ - logger.info(f"Saving prompt {self.id} to persistent storage.") + # logger.info(f"Saving prompt {self.id} to persistent storage.") raise NotImplementedError( "Persistent storage integration is required." ) @@ -221,9 +220,9 @@ class Prompt(BaseModel): Raises: NotImplementedError: This method is a placeholder for storage integration. """ - logger.info( - f"Loading prompt {prompt_id} from persistent storage." - ) + # logger.info( + # f"Loading prompt {prompt_id} from persistent storage." + # ) raise NotImplementedError( "Persistent storage integration is required." ) @@ -259,7 +258,7 @@ class Prompt(BaseModel): with open(file_path, "w") as file: json.dump(self.model_dump(), file) # logger.info(f"Autosaved prompt {self.id} to {file_path}.") - + # return "Prompt autosaved successfully." # def auto_generate_prompt(self): diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index a660ed1a..adb33324 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -12,7 +12,7 @@ from swarms.structs.graph_workflow import ( Node, NodeType, ) -from swarms.structs.groupchat import GroupChat +from swarms.structs.groupchat import GroupChat, GroupChatState from swarms.structs.majority_voting import ( MajorityVoting, majority_voting, @@ -144,4 +144,5 @@ __all__ = [ "swarm_router", "run_agents_with_tasks_concurrently", "showcase_available_agents", + "GroupChatState", ] diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index a59e0d4e..f40910b6 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -22,10 +22,6 @@ from typing import ( import toml import yaml -from clusterops import ( - execute_on_gpu, - execute_with_cpu_cores, -) from pydantic import BaseModel from swarm_models.tiktoken_wrapper import TikTokenizer from termcolor import colored @@ -54,6 +50,9 @@ from swarms.utils.file_processing import create_file_in_folder from swarms.utils.pdf_to_text import pdf_to_text from swarms.artifacts.main_artifact import Artifact from swarms.utils.loguru_logger import initialize_logger +from swarms.utils.wrapper_clusterop import ( + exec_callable_with_clusterops, +) logger = initialize_logger(log_folder="agents") @@ -338,6 +337,7 @@ class Agent: all_cores: bool = True, device_id: int = 0, scheduled_run_date: Optional[datetime] = None, + do_not_use_cluster_ops: bool = True, *args, **kwargs, ): @@ -451,6 +451,7 @@ class Agent: self.all_cores = all_cores self.device_id = device_id self.scheduled_run_date = scheduled_run_date + self.do_not_use_cluster_ops = do_not_use_cluster_ops # Initialize the short term memory self.short_memory = Conversation( @@ -2247,11 +2248,12 @@ class Agent: self, task: Optional[str] = None, img: Optional[str] = None, - is_last: bool = False, device: str = "cpu", # gpu device_id: int = 0, all_cores: bool = True, scheduled_run_date: Optional[datetime] = None, + do_not_use_cluster_ops: bool = False, + all_gpus: bool = False, *args, **kwargs, ) -> Any: @@ -2265,11 +2267,11 @@ class Agent: Args: task (Optional[str], optional): The task to be executed. Defaults to None. img (Optional[str], optional): The image to be processed. Defaults to None. - is_last (bool, optional): Indicates if this is the last task. Defaults to False. device (str, optional): The device to use for execution. Defaults to "cpu". device_id (int, optional): The ID of the GPU to use if device is set to "gpu". Defaults to 0. all_cores (bool, optional): If True, uses all available CPU cores. Defaults to True. scheduled_run_date (Optional[datetime], optional): The date and time to schedule the task. Defaults to None. + do_not_use_cluster_ops (bool, optional): If True, does not use cluster ops. Defaults to False. *args: Additional positional arguments to be passed to the execution method. **kwargs: Additional keyword arguments to be passed to the execution method. @@ -2282,6 +2284,11 @@ class Agent: """ device = device or self.device device_id = device_id or self.device_id + all_cores = all_cores or self.all_cores + all_gpus = all_gpus or self.all_gpus + do_not_use_cluster_ops = do_not_use_cluster_ops or self.do_not_use_cluster_ops + + if scheduled_run_date: while datetime.now() < scheduled_run_date: @@ -2290,32 +2297,24 @@ class Agent: ) # Sleep for a short period to avoid busy waiting try: - logger.info(f"Attempting to run on device: {device}") - if device == "cpu": - logger.info("Device set to CPU") - if all_cores is True: - count = os.cpu_count() - logger.info( - f"Using all available CPU cores: {count}" - ) - else: - count = device_id - logger.info(f"Using specific CPU core: {count}") - - return execute_with_cpu_cores( - count, self._run, task, img, *args, **kwargs - ) + # If cluster ops disabled, run directly + if do_not_use_cluster_ops is True: + logger.info("Running without cluster operations") + return self._run(task=task, img=img, *args, **kwargs) - # If device gpu - elif device == "gpu": - logger.info("Device set to GPU") - return execute_on_gpu( - device_id, self._run, task, img, *args, **kwargs - ) else: - raise ValueError( - f"Invalid device specified: {device}. Supported devices are 'cpu' and 'gpu'." + return exec_callable_with_clusterops( + device=device, + device_id=device_id, + all_cores=all_cores, + all_gpus=all_gpus, + func=self._run, + task=task, + img=img, + *args, + **kwargs, ) + except ValueError as e: logger.error(f"Invalid device specified: {e}") raise e diff --git a/swarms/structs/groupchat.py b/swarms/structs/groupchat.py index 0e347f42..46e798ba 100644 --- a/swarms/structs/groupchat.py +++ b/swarms/structs/groupchat.py @@ -59,6 +59,12 @@ class GroupChatState(BaseModel): updated_at: datetime = Field(default_factory=datetime.utcnow) +# Todo: +# Build a function that prompts the llm to output the +# [Agent-Name] in square brackets and then the question or something +# An agentic Language notation + + class AgentWrapper: """Wrapper class to standardize agent interfaces""" diff --git a/swarms/structs/workspace_manager.py b/swarms/structs/workspace_manager.py new file mode 100644 index 00000000..cec3615d --- /dev/null +++ b/swarms/structs/workspace_manager.py @@ -0,0 +1,176 @@ +import os +from pathlib import Path +from typing import Optional +from swarms.utils.loguru_logger import initialize_logger + + +logger = initialize_logger("workspace-manager") + + +class WorkspaceManager: + """ + Manages the workspace directory and settings for the application. + This class is responsible for setting up the workspace directory, logging configuration, + and retrieving environment variables for telemetry and API key. + """ + + def __init__( + self, + workspace_dir: Optional[str] = "agent_workspace", + use_telemetry: Optional[bool] = True, + api_key: Optional[str] = None, + ): + """ + Initializes the WorkspaceManager with optional parameters for workspace directory, + telemetry usage, and API key. + + Args: + workspace_dir (Optional[str]): The path to the workspace directory. + use_telemetry (Optional[bool]): A flag indicating whether to use telemetry. + api_key (Optional[str]): The API key for the application. + """ + self.workspace_dir = workspace_dir + self.use_telemetry = use_telemetry + self.api_key = api_key + + def _create_env_file(self, env_file_path: Path) -> None: + """ + Create a new .env file with default WORKSPACE_DIR. + + Args: + env_file_path (Path): The path to the .env file. + """ + with env_file_path.open("w") as file: + file.write("WORKSPACE_DIR=agent_workspace\n") + logger.info( + "Created a new .env file with default WORKSPACE_DIR." + ) + + def _append_to_env_file(self, env_file_path: Path) -> None: + """ + Append WORKSPACE_DIR to .env if it doesn't exist. + + Args: + env_file_path (Path): The path to the .env file. + """ + with env_file_path.open("r+") as file: + content = file.read() + if "WORKSPACE_DIR" not in content: + file.seek(0, os.SEEK_END) + file.write("WORKSPACE_DIR=agent_workspace\n") + logger.info("Appended WORKSPACE_DIR to .env file.") + + def _get_workspace_dir( + self, workspace_dir: Optional[str] = None + ) -> str: + """ + Get the workspace directory from environment variable or default. + + Args: + workspace_dir (Optional[str]): The path to the workspace directory. + + Returns: + str: The path to the workspace directory. + """ + return workspace_dir or os.getenv( + "WORKSPACE_DIR", "agent_workspace" + ) + + def _get_telemetry_status( + self, use_telemetry: Optional[bool] = None + ) -> bool: + """ + Get telemetry status from environment variable or default. + + Args: + use_telemetry (Optional[bool]): A flag indicating whether to use telemetry. + + Returns: + bool: The status of telemetry usage. + """ + return ( + use_telemetry + if use_telemetry is not None + else os.getenv("USE_TELEMETRY", "true").lower() == "true" + ) + + def _get_api_key( + self, api_key: Optional[str] = None + ) -> Optional[str]: + """ + Get API key from environment variable or default. + + Args: + api_key (Optional[str]): The API key for the application. + + Returns: + Optional[str]: The API key or None if not set. + """ + return api_key or os.getenv("SWARMS_API_KEY") + + def _init_workspace(self) -> None: + """ + Initialize the workspace directory if it doesn't exist. + """ + if not self.workspace_path.exists(): + self.workspace_path.mkdir(parents=True, exist_ok=True) + logger.info("Workspace directory initialized.") + + @property + def get_workspace_path(self) -> Path: + """ + Get the workspace path. + + Returns: + Path: The path to the workspace directory. + """ + return self.workspace_path + + @property + def get_telemetry_status(self) -> bool: + """ + Get telemetry status. + + Returns: + bool: The status of telemetry usage. + """ + return self.use_telemetry + + @property + def get_api_key(self) -> Optional[str]: + """ + Get API key. + + Returns: + Optional[str]: The API key or None if not set. + """ + return self.api_key + + def run(self) -> None: + try: + # Check if .env file exists and create it if it doesn't + env_file_path = Path(".env") + if not env_file_path.exists(): + self._create_env_file(env_file_path) + else: + # Append WORKSPACE_DIR to .env if it doesn't exist + self._append_to_env_file(env_file_path) + + # Set workspace directory + self.workspace_dir = self._get_workspace_dir( + self.workspace_dir + ) + self.workspace_path = Path(self.workspace_dir) + + # Set telemetry preference + self.use_telemetry = self._get_telemetry_status( + self.use_telemetry + ) + + # Set API key + self.api_key = self._get_api_key(self.api_key) + + # Initialize workspace + self._init_workspace() + except Exception as e: + logger.error(f"Error initializing WorkspaceManager: {e}") diff --git a/swarms/utils/loguru_logger.py b/swarms/utils/loguru_logger.py index cf1fbd65..af5c7239 100644 --- a/swarms/utils/loguru_logger.py +++ b/swarms/utils/loguru_logger.py @@ -5,12 +5,16 @@ from loguru import logger def initialize_logger(log_folder: str = "logs"): - WORKSPACE_DIR = os.getenv("WORKSPACE_DIR") - if not os.path.exists(WORKSPACE_DIR): - os.makedirs(WORKSPACE_DIR) + AGENT_WORKSPACE = "agent_workspace" - # Create a folder within the workspace_dir - log_folder_path = os.path.join(WORKSPACE_DIR, log_folder) + # Check if WORKSPACE_DIR is set, if not, set it to AGENT_WORKSPACE + if "WORKSPACE_DIR" not in os.environ: + os.environ["WORKSPACE_DIR"] = AGENT_WORKSPACE + + # Create a folder within the agent_workspace + log_folder_path = os.path.join( + os.getenv("WORKSPACE_DIR"), log_folder + ) if not os.path.exists(log_folder_path): os.makedirs(log_folder_path) @@ -28,6 +32,6 @@ def initialize_logger(log_folder: str = "logs"): diagnose=True, enqueue=True, retention="10 days", - compression="zip", + # compression="zip", ) return logger diff --git a/swarms/utils/workspace_manager.py b/swarms/utils/workspace_manager.py deleted file mode 100644 index e69de29b..00000000 diff --git a/swarms/utils/wrapper_clusterop.py b/swarms/utils/wrapper_clusterop.py index 2343e52f..bf7e0cfc 100644 --- a/swarms/utils/wrapper_clusterop.py +++ b/swarms/utils/wrapper_clusterop.py @@ -1,12 +1,12 @@ -import os from typing import Any from clusterops import ( execute_on_gpu, execute_on_multiple_gpus, - execute_with_cpu_cores, list_available_gpus, + execute_with_all_cpu_cores, + execute_on_cpu, ) from swarms.utils.loguru_logger import initialize_logger @@ -43,38 +43,51 @@ def exec_callable_with_clusterops( ValueError: If an invalid device is specified. Exception: If any other error occurs during execution. """ + if func is None: + raise ValueError("A callable function must be provided") + try: logger.info(f"Attempting to run on device: {device}") + device = device.lower() + if device == "cpu": logger.info("Device set to CPU") - if all_cores is True: - count = os.cpu_count() - logger.info(f"Using all available CPU cores: {count}") - else: - count = device_id - logger.info(f"Using specific CPU core: {count}") - - return execute_with_cpu_cores( - count, func, *args, **kwargs - ) - # If device gpu + if all_cores: + logger.info("Using all CPU cores") + return execute_with_all_cpu_cores( + func, *args, **kwargs + ) + + if device_id is not None: + logger.info(f"Using specific CPU core: {device_id}") + return execute_on_cpu( + device_id, func, *args, **kwargs + ) + elif device == "gpu": logger.info("Device set to GPU") + + if all_gpus: + logger.info("Using all available GPUs") + gpus = [int(gpu) for gpu in list_available_gpus()] + return execute_on_multiple_gpus( + gpus, func, *args, **kwargs + ) + + logger.info(f"Using GPU device ID: {device_id}") return execute_on_gpu(device_id, func, *args, **kwargs) - elif device == "gpu" and all_gpus is True: - logger.info("Device set to GPU and running all gpus") - gpus = [int(gpu) for gpu in list_available_gpus()] - return execute_on_multiple_gpus( - gpus, func, *args, **kwargs - ) + else: raise ValueError( f"Invalid device specified: {device}. Supported devices are 'cpu' and 'gpu'." ) + except ValueError as e: - logger.error(f"Invalid device specified: {e}") - raise e + logger.error( + f"Invalid device or configuration specified: {e}" + ) + raise except Exception as e: logger.error(f"An error occurred during execution: {e}") - raise e + raise