parent
d603604ccf
commit
4f209580de
@ -0,0 +1,13 @@
|
||||
from swarms.structs.auto_swarm_builder import AutoSwarmBuilder
|
||||
|
||||
example = AutoSwarmBuilder(
|
||||
name="ContentCreation-Swarm",
|
||||
description="A swarm of specialized AI agents for research, writing, editing, and publishing that maintain brand consistency across channels while automating distribution.",
|
||||
max_loops=1,
|
||||
)
|
||||
|
||||
print(
|
||||
example.run(
|
||||
"Build agents for research, writing, editing, and publishing to enhance brand consistency and automate distribution across channels."
|
||||
)
|
||||
)
|
@ -1,178 +0,0 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
from swarms.utils.loguru_logger import initialize_logger
|
||||
|
||||
|
||||
logger = initialize_logger("workspace-manager")
|
||||
|
||||
|
||||
class WorkspaceManager:
|
||||
"""
|
||||
Manages the workspace directory and settings for the application.
|
||||
This class is responsible for setting up the workspace directory, logging configuration,
|
||||
and retrieving environment variables for telemetry and API key.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
workspace_dir: Optional[str] = "agent_workspace",
|
||||
use_telemetry: Optional[bool] = True,
|
||||
api_key: Optional[str] = None,
|
||||
):
|
||||
"""
|
||||
Initializes the WorkspaceManager with optional parameters for workspace directory,
|
||||
telemetry usage, and API key.
|
||||
|
||||
Args:
|
||||
workspace_dir (Optional[str]): The path to the workspace directory.
|
||||
use_telemetry (Optional[bool]): A flag indicating whether to use telemetry.
|
||||
api_key (Optional[str]): The API key for the application.
|
||||
"""
|
||||
self.workspace_dir = workspace_dir
|
||||
self.use_telemetry = use_telemetry
|
||||
self.api_key = api_key
|
||||
|
||||
def _create_env_file(self, env_file_path: Path) -> None:
|
||||
"""
|
||||
Create a new .env file with default WORKSPACE_DIR.
|
||||
|
||||
Args:
|
||||
env_file_path (Path): The path to the .env file.
|
||||
"""
|
||||
with env_file_path.open("w") as file:
|
||||
file.write(f"WORKSPACE_DIR={self.workspace_dir}\n")
|
||||
logger.info(
|
||||
"Created a new .env file with default WORKSPACE_DIR."
|
||||
)
|
||||
|
||||
def _append_to_env_file(self, env_file_path: Path) -> None:
|
||||
"""
|
||||
Append WORKSPACE_DIR to .env if it doesn't exist.
|
||||
|
||||
Args:
|
||||
env_file_path (Path): The path to the .env file.
|
||||
"""
|
||||
with env_file_path.open("r+") as file:
|
||||
content = file.read()
|
||||
if "WORKSPACE_DIR" not in content:
|
||||
file.seek(0, os.SEEK_END)
|
||||
file.write(f"WORKSPACE_DIR={self.workspace_dir}\n")
|
||||
logger.info("Appended WORKSPACE_DIR to .env file.")
|
||||
|
||||
def _get_workspace_dir(
|
||||
self, workspace_dir: Optional[str] = None
|
||||
) -> str:
|
||||
"""
|
||||
Get the workspace directory from environment variable or default.
|
||||
|
||||
Args:
|
||||
workspace_dir (Optional[str]): The path to the workspace directory.
|
||||
|
||||
Returns:
|
||||
str: The path to the workspace directory.
|
||||
"""
|
||||
return workspace_dir or os.getenv(
|
||||
"WORKSPACE_DIR", "agent_workspace"
|
||||
)
|
||||
|
||||
def _get_telemetry_status(
|
||||
self, use_telemetry: Optional[bool] = None
|
||||
) -> bool:
|
||||
"""
|
||||
Get telemetry status from environment variable or default.
|
||||
|
||||
Args:
|
||||
use_telemetry (Optional[bool]): A flag indicating whether to use telemetry.
|
||||
|
||||
Returns:
|
||||
bool: The status of telemetry usage.
|
||||
"""
|
||||
return (
|
||||
use_telemetry
|
||||
if use_telemetry is not None
|
||||
else os.getenv("USE_TELEMETRY", "true").lower() == "true"
|
||||
)
|
||||
|
||||
def _get_api_key(
|
||||
self, api_key: Optional[str] = None
|
||||
) -> Optional[str]:
|
||||
"""
|
||||
Get API key from environment variable or default.
|
||||
|
||||
Args:
|
||||
api_key (Optional[str]): The API key for the application.
|
||||
|
||||
Returns:
|
||||
Optional[str]: The API key or None if not set.
|
||||
"""
|
||||
return api_key or os.getenv("SWARMS_API_KEY")
|
||||
|
||||
def _init_workspace(self) -> None:
|
||||
"""
|
||||
Initialize the workspace directory if it doesn't exist.
|
||||
"""
|
||||
if not self.workspace_path.exists():
|
||||
self.workspace_path.mkdir(parents=True, exist_ok=True)
|
||||
logger.info("Workspace directory initialized.")
|
||||
|
||||
@property
|
||||
def get_workspace_path(self) -> Path:
|
||||
"""
|
||||
Get the workspace path.
|
||||
|
||||
Returns:
|
||||
Path: The path to the workspace directory.
|
||||
"""
|
||||
return self.workspace_path
|
||||
|
||||
@property
|
||||
def get_telemetry_status(self) -> bool:
|
||||
"""
|
||||
Get telemetry status.
|
||||
|
||||
Returns:
|
||||
bool: The status of telemetry usage.
|
||||
"""
|
||||
return self.use_telemetry
|
||||
|
||||
@property
|
||||
def get_api_key(self) -> Optional[str]:
|
||||
"""
|
||||
Get API key.
|
||||
|
||||
Returns:
|
||||
Optional[str]: The API key or None if not set.
|
||||
"""
|
||||
return self.api_key
|
||||
|
||||
def run(self) -> None:
|
||||
try:
|
||||
# Check if .env file exists and create it if it doesn't
|
||||
env_file_path = Path(".env")
|
||||
|
||||
# If the .env file doesn't exist, create it
|
||||
if not env_file_path.exists():
|
||||
self._create_env_file(env_file_path)
|
||||
else:
|
||||
# Append WORKSPACE_DIR to .env if it doesn't exist
|
||||
self._append_to_env_file(env_file_path)
|
||||
|
||||
# Set workspace directory
|
||||
self.workspace_dir = self._get_workspace_dir(
|
||||
self.workspace_dir
|
||||
)
|
||||
self.workspace_path = Path(self.workspace_dir)
|
||||
|
||||
# Set telemetry preference
|
||||
self.use_telemetry = self._get_telemetry_status(
|
||||
self.use_telemetry
|
||||
)
|
||||
|
||||
# Set API key
|
||||
self.api_key = self._get_api_key(self.api_key)
|
||||
|
||||
# Initialize workspace
|
||||
self._init_workspace()
|
||||
except Exception as e:
|
||||
logger.error(f"Error initializing WorkspaceManager: {e}")
|
@ -1,81 +0,0 @@
|
||||
from typing import Callable, List, Optional, Union
|
||||
|
||||
from swarms.structs.agent import Agent
|
||||
from swarms.utils.loguru_logger import initialize_logger
|
||||
|
||||
logger = initialize_logger(log_folder="swarm_reliability_checks")
|
||||
|
||||
|
||||
def reliability_check(
|
||||
agents: List[Union[Agent, Callable]],
|
||||
max_loops: int,
|
||||
name: Optional[str] = None,
|
||||
description: Optional[str] = None,
|
||||
flow: Optional[str] = None,
|
||||
) -> None:
|
||||
"""
|
||||
Performs reliability checks on swarm configuration parameters.
|
||||
|
||||
Args:
|
||||
agents: List of Agent objects or callables that will be executed
|
||||
max_loops: Maximum number of execution loops
|
||||
name: Name identifier for the swarm
|
||||
description: Description of the swarm's purpose
|
||||
|
||||
Raises:
|
||||
ValueError: If any parameters fail validation checks
|
||||
TypeError: If parameters are of incorrect type
|
||||
"""
|
||||
logger.info("Initializing swarm reliability checks")
|
||||
|
||||
# Type checking
|
||||
if not isinstance(agents, list):
|
||||
raise TypeError("agents parameter must be a list")
|
||||
|
||||
if not isinstance(max_loops, int):
|
||||
raise TypeError("max_loops must be an integer")
|
||||
|
||||
# Validate agents
|
||||
if not agents:
|
||||
raise ValueError("Agents list cannot be empty")
|
||||
|
||||
for i, agent in enumerate(agents):
|
||||
if not isinstance(agent, (Agent, Callable)):
|
||||
raise TypeError(
|
||||
f"Agent at index {i} must be an Agent instance or Callable"
|
||||
)
|
||||
|
||||
# Validate max_loops
|
||||
if max_loops <= 0:
|
||||
raise ValueError("max_loops must be greater than 0")
|
||||
|
||||
if max_loops > 1000:
|
||||
logger.warning(
|
||||
"Large max_loops value detected. This may impact performance."
|
||||
)
|
||||
|
||||
# Validate name
|
||||
if name is None:
|
||||
raise ValueError("name parameter is required")
|
||||
if not isinstance(name, str):
|
||||
raise TypeError("name must be a string")
|
||||
if len(name.strip()) == 0:
|
||||
raise ValueError("name cannot be empty or just whitespace")
|
||||
|
||||
# Validate description
|
||||
if description is None:
|
||||
raise ValueError("description parameter is required")
|
||||
if not isinstance(description, str):
|
||||
raise TypeError("description must be a string")
|
||||
if len(description.strip()) == 0:
|
||||
raise ValueError(
|
||||
"description cannot be empty or just whitespace"
|
||||
)
|
||||
|
||||
# Validate flow
|
||||
if flow is None:
|
||||
raise ValueError("flow parameter is required")
|
||||
if not isinstance(flow, str):
|
||||
raise TypeError("flow must be a string")
|
||||
|
||||
logger.info("All reliability checks passed successfully")
|
@ -0,0 +1,16 @@
|
||||
from swarms.utils.litellm_wrapper import LiteLLM
|
||||
|
||||
llm = LiteLLM(
|
||||
model_name="gpt-4o-mini",
|
||||
temperature=0.5,
|
||||
max_tokens=1000,
|
||||
stream=True,
|
||||
)
|
||||
|
||||
out = llm.run("What is the capital of France?")
|
||||
|
||||
print(out)
|
||||
for chunk in out:
|
||||
out = chunk["choices"][0]["delta"]
|
||||
print(type(out))
|
||||
print(out)
|
Loading…
Reference in new issue