[Agent][Clusterops fixes]

pull/643/head
Your Name 2 months ago
parent 47a359ec34
commit 0ce9ad929a

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "6.1.3"
version = "6.2.0"
description = "Swarms - Pytorch"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]

@ -1,8 +1,13 @@
import concurrent.futures
from dotenv import load_dotenv
# from swarms.structs.workspace_manager import WorkspaceManager
# workspace_manager = WorkspaceManager()
# workspace_manager.run()
load_dotenv()
from swarms.telemetry.bootup import bootup # noqa: E402, F403
from swarms.telemetry.sentry_active import (
activate_sentry,

@ -1,13 +1,11 @@
from typing import Any, Optional, Callable
from swarms.structs.agent import Agent
from swarms.tools.json_former import Jsonformer
from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger(log_folder="tool_agent")
class ToolAgent(Agent):
class ToolAgent:
"""
Represents a tool agent that performs a specific task using a model and tokenizer.
@ -153,3 +151,6 @@ class ToolAgent(Agent):
f"Error running {self.name} for task: {task}"
)
raise error
def __call__(self, task: str, *args, **kwargs):
    """
    Allow the agent instance to be invoked like a function.

    Args:
        task (str): The task forwarded to ``run``.
        *args: Extra positional arguments forwarded to ``run`` unchanged.
        **kwargs: Extra keyword arguments forwarded to ``run`` unchanged.

    Returns:
        Whatever ``self.run`` returns for the given task.
    """
    outcome = self.run(task, *args, **kwargs)
    return outcome

@ -134,7 +134,6 @@ class Prompt(BaseModel):
self.edit_count += 1
self.last_modified_at = time.strftime("%Y-%m-%d %H:%M:%S")
# logger.debug(
# f"Prompt {self.id} updated. Edit count: {self.edit_count}. New content: '{self.content}'"
# )
@ -165,15 +164,15 @@ class Prompt(BaseModel):
)
raise IndexError("Invalid version number for rollback.")
logger.info(
f"Rolling back prompt {self.id} to version {version}."
)
# logger.info(
# f"Rolling back prompt {self.id} to version {version}."
# )
self.content = self.edit_history[version]
self.edit_count = version
self.last_modified_at = time.strftime("%Y-%m-%d %H:%M:%S")
logger.debug(
f"Prompt {self.id} rolled back to version {version}. Current content: '{self.content}'"
)
# logger.debug(
# f"Prompt {self.id} rolled back to version {version}. Current content: '{self.content}'"
# )
self.log_telemetry()
@ -203,7 +202,7 @@ class Prompt(BaseModel):
Raises:
NotImplementedError: This method is a placeholder for storage integration.
"""
logger.info(f"Saving prompt {self.id} to persistent storage.")
# logger.info(f"Saving prompt {self.id} to persistent storage.")
raise NotImplementedError(
"Persistent storage integration is required."
)
@ -221,9 +220,9 @@ class Prompt(BaseModel):
Raises:
NotImplementedError: This method is a placeholder for storage integration.
"""
logger.info(
f"Loading prompt {prompt_id} from persistent storage."
)
# logger.info(
# f"Loading prompt {prompt_id} from persistent storage."
# )
raise NotImplementedError(
"Persistent storage integration is required."
)

@ -12,7 +12,7 @@ from swarms.structs.graph_workflow import (
Node,
NodeType,
)
from swarms.structs.groupchat import GroupChat
from swarms.structs.groupchat import GroupChat, GroupChatState
from swarms.structs.majority_voting import (
MajorityVoting,
majority_voting,
@ -144,4 +144,5 @@ __all__ = [
"swarm_router",
"run_agents_with_tasks_concurrently",
"showcase_available_agents",
"GroupChatState",
]

@ -22,10 +22,6 @@ from typing import (
import toml
import yaml
from clusterops import (
execute_on_gpu,
execute_with_cpu_cores,
)
from pydantic import BaseModel
from swarm_models.tiktoken_wrapper import TikTokenizer
from termcolor import colored
@ -54,6 +50,9 @@ from swarms.utils.file_processing import create_file_in_folder
from swarms.utils.pdf_to_text import pdf_to_text
from swarms.artifacts.main_artifact import Artifact
from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.wrapper_clusterop import (
exec_callable_with_clusterops,
)
logger = initialize_logger(log_folder="agents")
@ -338,6 +337,7 @@ class Agent:
all_cores: bool = True,
device_id: int = 0,
scheduled_run_date: Optional[datetime] = None,
do_not_use_cluster_ops: bool = True,
*args,
**kwargs,
):
@ -451,6 +451,7 @@ class Agent:
self.all_cores = all_cores
self.device_id = device_id
self.scheduled_run_date = scheduled_run_date
self.do_not_use_cluster_ops = do_not_use_cluster_ops
# Initialize the short term memory
self.short_memory = Conversation(
@ -2247,11 +2248,12 @@ class Agent:
self,
task: Optional[str] = None,
img: Optional[str] = None,
is_last: bool = False,
device: str = "cpu", # gpu
device_id: int = 0,
all_cores: bool = True,
scheduled_run_date: Optional[datetime] = None,
do_not_use_cluster_ops: bool = False,
all_gpus: bool = False,
*args,
**kwargs,
) -> Any:
@ -2265,11 +2267,11 @@ class Agent:
Args:
task (Optional[str], optional): The task to be executed. Defaults to None.
img (Optional[str], optional): The image to be processed. Defaults to None.
is_last (bool, optional): Indicates if this is the last task. Defaults to False.
device (str, optional): The device to use for execution. Defaults to "cpu".
device_id (int, optional): The ID of the GPU to use if device is set to "gpu". Defaults to 0.
all_cores (bool, optional): If True, uses all available CPU cores. Defaults to True.
scheduled_run_date (Optional[datetime], optional): The date and time to schedule the task. Defaults to None.
do_not_use_cluster_ops (bool, optional): If True, does not use cluster ops. Defaults to False.
*args: Additional positional arguments to be passed to the execution method.
**kwargs: Additional keyword arguments to be passed to the execution method.
@ -2282,6 +2284,11 @@ class Agent:
"""
device = device or self.device
device_id = device_id or self.device_id
all_cores = all_cores or self.all_cores
all_gpus = all_gpus or self.all_gpus
do_not_use_cluster_ops = do_not_use_cluster_ops or self.do_not_use_cluster_ops
if scheduled_run_date:
while datetime.now() < scheduled_run_date:
@ -2290,32 +2297,24 @@ class Agent:
) # Sleep for a short period to avoid busy waiting
try:
logger.info(f"Attempting to run on device: {device}")
if device == "cpu":
logger.info("Device set to CPU")
if all_cores is True:
count = os.cpu_count()
logger.info(
f"Using all available CPU cores: {count}"
)
else:
count = device_id
logger.info(f"Using specific CPU core: {count}")
return execute_with_cpu_cores(
count, self._run, task, img, *args, **kwargs
)
# If cluster ops disabled, run directly
if do_not_use_cluster_ops is True:
logger.info("Running without cluster operations")
return self._run(task=task, img=img, *args, **kwargs)
# If device gpu
elif device == "gpu":
logger.info("Device set to GPU")
return execute_on_gpu(
device_id, self._run, task, img, *args, **kwargs
)
else:
raise ValueError(
f"Invalid device specified: {device}. Supported devices are 'cpu' and 'gpu'."
return exec_callable_with_clusterops(
device=device,
device_id=device_id,
all_cores=all_cores,
all_gpus=all_gpus,
func=self._run,
task=task,
img=img,
*args,
**kwargs,
)
except ValueError as e:
logger.error(f"Invalid device specified: {e}")
raise e

@ -59,6 +59,12 @@ class GroupChatState(BaseModel):
updated_at: datetime = Field(default_factory=datetime.utcnow)
# TODO:
# Build a function that prompts the LLM to output the target
# [Agent-Name] in square brackets, followed by the question (or similar
# message) for that agent, i.e. a simple agentic language notation.
class AgentWrapper:
"""Wrapper class to standardize agent interfaces"""

@ -0,0 +1,176 @@
import os
from pathlib import Path
from typing import Optional
from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger("workspace-manager")
class WorkspaceManager:
    """
    Manages the workspace directory and settings for the application.

    This class is responsible for setting up the workspace directory and for
    resolving the telemetry flag and API key, falling back to environment
    variables when no explicit value is supplied. Call ``run()`` to make sure
    a ``.env`` file defines ``WORKSPACE_DIR`` and to create the workspace
    directory on disk.
    """

    # Key used both in the process environment and in the .env file.
    _WORKSPACE_ENV_VAR = "WORKSPACE_DIR"
    # Fallback directory name when neither an argument nor the env var is set.
    _DEFAULT_WORKSPACE_DIR = "agent_workspace"

    def __init__(
        self,
        workspace_dir: Optional[str] = "agent_workspace",
        use_telemetry: Optional[bool] = True,
        api_key: Optional[str] = None,
    ):
        """
        Initializes the WorkspaceManager with optional parameters for workspace directory,
        telemetry usage, and API key.

        Args:
            workspace_dir (Optional[str]): The path to the workspace directory.
            use_telemetry (Optional[bool]): A flag indicating whether to use telemetry.
            api_key (Optional[str]): The API key for the application.
        """
        self.workspace_dir = workspace_dir
        self.use_telemetry = use_telemetry
        self.api_key = api_key
        # Resolve the path eagerly so that `get_workspace_path` and
        # `_init_workspace` work even when `run()` has not been called yet.
        self.workspace_path = Path(self._get_workspace_dir(workspace_dir))

    @classmethod
    def _defines_workspace_dir(cls, content: str) -> bool:
        """
        Tell whether .env content already assigns WORKSPACE_DIR.

        Only real assignments count: blank lines and ``#`` comments are
        ignored, an optional leading ``export`` is tolerated, and the key
        must match exactly (``OLD_WORKSPACE_DIR`` does not count).

        Args:
            content (str): The full text of a .env file.

        Returns:
            bool: True if a line defines the WORKSPACE_DIR key.
        """
        for raw_line in content.splitlines():
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            if line.startswith("export "):
                line = line[len("export "):].lstrip()
            key = line.partition("=")[0].strip()
            if key == cls._WORKSPACE_ENV_VAR:
                return True
        return False

    @classmethod
    def _workspace_entry_to_append(cls, content: str) -> str:
        """
        Compute the text that must be appended to existing .env content.

        Args:
            content (str): The current text of the .env file.

        Returns:
            str: An empty string when WORKSPACE_DIR is already defined,
            otherwise the default entry, prefixed with a newline when the
            existing content does not end with one (so the entry never gets
            fused onto the last existing line).
        """
        if cls._defines_workspace_dir(content):
            return ""
        separator = "" if not content or content.endswith("\n") else "\n"
        return (
            f"{separator}{cls._WORKSPACE_ENV_VAR}="
            f"{cls._DEFAULT_WORKSPACE_DIR}\n"
        )

    def _create_env_file(self, env_file_path: Path) -> None:
        """
        Create a new .env file with default WORKSPACE_DIR.

        Args:
            env_file_path (Path): The path to the .env file.
        """
        with env_file_path.open("w", encoding="utf-8") as file:
            file.write(self._workspace_entry_to_append(""))
        logger.info(
            "Created a new .env file with default WORKSPACE_DIR."
        )

    def _append_to_env_file(self, env_file_path: Path) -> None:
        """
        Append WORKSPACE_DIR to .env if it doesn't exist.

        Args:
            env_file_path (Path): The path to the .env file.
        """
        with env_file_path.open("r+", encoding="utf-8") as file:
            entry = self._workspace_entry_to_append(file.read())
            if not entry:
                # Already defined: leave the file untouched.
                return
            file.seek(0, os.SEEK_END)
            file.write(entry)
        logger.info("Appended WORKSPACE_DIR to .env file.")

    def _get_workspace_dir(
        self, workspace_dir: Optional[str] = None
    ) -> str:
        """
        Get the workspace directory from environment variable or default.

        Args:
            workspace_dir (Optional[str]): The path to the workspace directory.

        Returns:
            str: The explicit value if truthy, otherwise the WORKSPACE_DIR
            environment variable, otherwise "agent_workspace".
        """
        return workspace_dir or os.getenv(
            self._WORKSPACE_ENV_VAR, self._DEFAULT_WORKSPACE_DIR
        )

    def _get_telemetry_status(
        self, use_telemetry: Optional[bool] = None
    ) -> bool:
        """
        Get telemetry status from environment variable or default.

        Args:
            use_telemetry (Optional[bool]): A flag indicating whether to use telemetry.

        Returns:
            bool: The explicit flag when it is not None; otherwise True only
            if USE_TELEMETRY (default "true") equals "true" case-insensitively.
        """
        if use_telemetry is not None:
            return use_telemetry
        return os.getenv("USE_TELEMETRY", "true").lower() == "true"

    def _get_api_key(
        self, api_key: Optional[str] = None
    ) -> Optional[str]:
        """
        Get API key from environment variable or default.

        Args:
            api_key (Optional[str]): The API key for the application.

        Returns:
            Optional[str]: The API key or None if not set.
        """
        return api_key or os.getenv("SWARMS_API_KEY")

    def _init_workspace(self) -> None:
        """
        Initialize the workspace directory if it doesn't exist.
        """
        # exist_ok makes a separate exists() check unnecessary (and avoids
        # the race between checking and creating).
        self.workspace_path.mkdir(parents=True, exist_ok=True)
        logger.info("Workspace directory initialized.")

    @property
    def get_workspace_path(self) -> Path:
        """
        Get the workspace path.

        Returns:
            Path: The path to the workspace directory.
        """
        return self.workspace_path

    @property
    def get_telemetry_status(self) -> bool:
        """
        Get telemetry status.

        Returns:
            bool: The status of telemetry usage.
        """
        return self.use_telemetry

    @property
    def get_api_key(self) -> Optional[str]:
        """
        Get API key.

        Returns:
            Optional[str]: The API key or None if not set.
        """
        return self.api_key

    def run(self) -> None:
        """
        Ensure .env defines WORKSPACE_DIR, resolve settings, and create the workspace.

        Any exception raised along the way is logged and not re-raised, so
        this call is best-effort and never propagates an error to the caller.
        """
        try:
            # Check if .env file exists and create it if it doesn't
            env_file_path = Path(".env")
            if not env_file_path.exists():
                self._create_env_file(env_file_path)
            else:
                # Append WORKSPACE_DIR to .env if it doesn't exist
                self._append_to_env_file(env_file_path)

            # Set workspace directory
            self.workspace_dir = self._get_workspace_dir(
                self.workspace_dir
            )
            self.workspace_path = Path(self.workspace_dir)

            # Set telemetry preference
            self.use_telemetry = self._get_telemetry_status(
                self.use_telemetry
            )

            # Set API key
            self.api_key = self._get_api_key(self.api_key)

            # Initialize workspace
            self._init_workspace()
        except Exception as e:
            logger.error(f"Error initializing WorkspaceManager: {e}")

@ -5,12 +5,16 @@ from loguru import logger
def initialize_logger(log_folder: str = "logs"):
WORKSPACE_DIR = os.getenv("WORKSPACE_DIR")
if not os.path.exists(WORKSPACE_DIR):
os.makedirs(WORKSPACE_DIR)
AGENT_WORKSPACE = "agent_workspace"
# Create a folder within the workspace_dir
log_folder_path = os.path.join(WORKSPACE_DIR, log_folder)
# Check if WORKSPACE_DIR is set, if not, set it to AGENT_WORKSPACE
if "WORKSPACE_DIR" not in os.environ:
os.environ["WORKSPACE_DIR"] = AGENT_WORKSPACE
# Create a folder within the agent_workspace
log_folder_path = os.path.join(
os.getenv("WORKSPACE_DIR"), log_folder
)
if not os.path.exists(log_folder_path):
os.makedirs(log_folder_path)
@ -28,6 +32,6 @@ def initialize_logger(log_folder: str = "logs"):
diagnose=True,
enqueue=True,
retention="10 days",
compression="zip",
# compression="zip",
)
return logger

@ -1,12 +1,12 @@
import os
from typing import Any
from clusterops import (
execute_on_gpu,
execute_on_multiple_gpus,
execute_with_cpu_cores,
list_available_gpus,
execute_with_all_cpu_cores,
execute_on_cpu,
)
from swarms.utils.loguru_logger import initialize_logger
@ -43,38 +43,51 @@ def exec_callable_with_clusterops(
ValueError: If an invalid device is specified.
Exception: If any other error occurs during execution.
"""
if func is None:
raise ValueError("A callable function must be provided")
try:
logger.info(f"Attempting to run on device: {device}")
device = device.lower()
if device == "cpu":
logger.info("Device set to CPU")
if all_cores is True:
count = os.cpu_count()
logger.info(f"Using all available CPU cores: {count}")
else:
count = device_id
logger.info(f"Using specific CPU core: {count}")
return execute_with_cpu_cores(
count, func, *args, **kwargs
if all_cores:
logger.info("Using all CPU cores")
return execute_with_all_cpu_cores(
func, *args, **kwargs
)
if device_id is not None:
logger.info(f"Using specific CPU core: {device_id}")
return execute_on_cpu(
device_id, func, *args, **kwargs
)
# If device gpu
elif device == "gpu":
logger.info("Device set to GPU")
return execute_on_gpu(device_id, func, *args, **kwargs)
elif device == "gpu" and all_gpus is True:
logger.info("Device set to GPU and running all gpus")
if all_gpus:
logger.info("Using all available GPUs")
gpus = [int(gpu) for gpu in list_available_gpus()]
return execute_on_multiple_gpus(
gpus, func, *args, **kwargs
)
logger.info(f"Using GPU device ID: {device_id}")
return execute_on_gpu(device_id, func, *args, **kwargs)
else:
raise ValueError(
f"Invalid device specified: {device}. Supported devices are 'cpu' and 'gpu'."
)
except ValueError as e:
logger.error(f"Invalid device specified: {e}")
raise e
logger.error(
f"Invalid device or configuration specified: {e}"
)
raise
except Exception as e:
logger.error(f"An error occurred during execution: {e}")
raise e
raise

Loading…
Cancel
Save