[FEAT][CLI] [DOCS][CLI]

pull/595/head
Your Name 3 months ago
parent 93c644237d
commit 087d45f01b

@ -0,0 +1 @@
logged_in

@ -141,6 +141,7 @@ nav:
- Onboarding:
- Installation: "swarms/install/install.md"
- Quickstart: "swarms/install/quickstart.md"
- Swarms CLI: "swarms/cli/main.md"
- Agents:
# - Overview: "swarms/structs/index.md"
# - Build Custom Agents: "swarms/structs/diy_your_own_agent.md"

@ -0,0 +1,104 @@
# Swarms CLI Documentation
The Swarms Command Line Interface (CLI) allows you to easily manage and run your Swarms of agents from the command line. This page will guide you through the installation process and provide a breakdown of the available commands.
## Installation
You can install the `swarms` package using `pip` or `poetry`.
### Using pip
```bash
pip3 install -U swarms
```
### Using poetry
```bash
poetry add swarms
```
Once installed, you can run the Swarms CLI with the following command:
```bash
poetry run swarms help
```
## Swarms CLI - Help
When running `swarms help`, you'll see the following output:
```
_________
/ _____/_ _ _______ _______ _____ ______
\_____ \ \/ \/ /\__ \_ __ \/ \ / ___/
/ \ / / __ \| | \/ Y Y \___ \
/_______ / \/\_/ (____ /__| |__|_| /____ >
\/ \/ \/ \/
Swarms CLI - Help
Commands:
onboarding : Starts the onboarding process
help : Shows this help message
get-api-key : Retrieves your API key from the platform
check-login : Checks if you're logged in and starts the cache
read-docs : Redirects you to swarms cloud documentation!
run-agents : Run your Agents from your agents.yaml
For more details, visit: https://docs.swarms.world
```
### CLI Commands
Below is a detailed explanation of the available commands:
- **onboarding**
Starts the onboarding process to help you set up your environment and configure your agents.
Usage:
```bash
swarms onboarding
```
- **help**
Displays the help message, including a list of available commands.
Usage:
```bash
swarms help
```
- **get-api-key**
Retrieves your API key from the platform, allowing your agents to communicate with the Swarms platform.
Usage:
```bash
swarms get-api-key
```
- **check-login**
Verifies if you are logged into the platform and starts the cache for storing your login session.
Usage:
```bash
swarms check-login
```
- **read-docs**
Redirects you to the official Swarms documentation on the web for further reading.
Usage:
```bash
swarms read-docs
```
- **run-agents**
Executes your agents from the `agents.yaml` configuration file, which defines the structure and behavior of your agents.
Usage:
```bash
swarms run-agents
```

@ -0,0 +1,115 @@
import logging
import os
from clusterops import (
execute_with_cpu_cores,
list_available_cpus,
)
from swarm_models import OpenAIChat
from swarms import Agent
# Configure logging
logging.basicConfig(level=logging.INFO)
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
openai_api_key=api_key,
model_name="gpt-4o-mini",
temperature=0.1,
max_tokens=2000,
)
# Function for the director agent
def director_task(task: str):
logging.info(f"Running Director agent for task: {task}")
director = Agent(
agent_name="Director",
system_prompt="Directs the tasks for the workers",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="director.json",
)
return director.run(task)
# Function for worker 1
def worker1_task(task: str):
logging.info(f"Running Worker1 agent for task: {task}")
worker1 = Agent(
agent_name="Worker1",
system_prompt="Generates a transcript for a youtube video on what swarms are",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="worker1.json",
)
return worker1.run(task)
# Function for worker 2
def worker2_task(task: str):
logging.info(f"Running Worker2 agent for task: {task}")
worker2 = Agent(
agent_name="Worker2",
system_prompt="Summarizes the transcript generated by Worker1",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="worker2.json",
)
return worker2.run(task)
# CPU Core Assignment Example
def assign_tasks_to_cpus():
# List available CPUs
cpus = list_available_cpus()
logging.info(f"Available CPUs: {cpus}")
# Example: Assign Director task to 1 CPU core
logging.info("Executing Director task using 1 CPU core")
execute_with_cpu_cores(
1, director_task, "Direct the creation of swarm video format"
)
# Example: Assign Worker1 task to 2 CPU cores
logging.info("Executing Worker1 task using 2 CPU cores")
execute_with_cpu_cores(
2,
worker1_task,
"Generate transcript for youtube video on swarms",
)
# Example: Assign Worker2 task to 2 CPU cores
logging.info("Executing Worker2 task using 2 CPU cores")
execute_with_cpu_cores(
2,
worker2_task,
"Summarize the transcript generated by Worker1",
)
print("finished")
if __name__ == "__main__":
logging.info(
"Starting the CPU-based task assignment for agents..."
)
assign_tasks_to_cpus()

@ -0,0 +1,170 @@
from clusterops import (
list_available_gpus,
execute_on_gpu,
)
from swarms import Agent, AgentRearrange
from swarm_models import OpenAIChat
import os
import logging
from dotenv import load_dotenv
load_dotenv()
# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
openai_api_key=api_key,
model_name="gpt-4o-mini",
temperature=0.1,
max_tokens=2000,
)
# Function for the director agent
def director_task(task: str):
logging.info(f"Running Director agent for task: {task}")
director = Agent(
agent_name="Director",
system_prompt="Directs the tasks for the workers",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="director.json",
)
return director.run(task)
# Function for worker 1
def worker1_task(task: str):
logging.info(f"Running Worker1 agent for task: {task}")
worker1 = Agent(
agent_name="Worker1",
system_prompt="Generates a transcript for a youtube video on what swarms are",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="worker1.json",
)
return worker1.run(task)
# Function for worker 2
def worker2_task(task: str):
logging.info(f"Running Worker2 agent for task: {task}")
worker2 = Agent(
agent_name="Worker2",
system_prompt="Summarizes the transcript generated by Worker1",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="worker2.json",
)
return worker2.run(task)
# GPU Assignment Example
def assign_tasks_to_gpus():
# List available GPUs
gpus = list_available_gpus()
logging.info(f"Available GPUs: {gpus}")
# Example: Assign Director to GPU 0
logging.info("Executing Director task on GPU 0")
execute_on_gpu(
0, director_task, "Direct the creation of swarm video format"
)
# Example: Assign Worker1 to GPU 1
logging.info("Executing Worker1 task on GPU 1")
execute_on_gpu(
1,
worker1_task,
"Generate transcript for youtube video on swarms",
)
# Example: Assign Worker2 to GPU 2
logging.info("Executing Worker2 task on GPU 2")
execute_on_gpu(
2,
worker2_task,
"Summarize the transcript generated by Worker1",
)
# Flow Management using AgentRearrange (optional)
def run_agent_flow():
# Initialize the agents
director = Agent(
agent_name="Director",
system_prompt="Directs the tasks for the workers",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="director.json",
)
worker1 = Agent(
agent_name="Worker1",
system_prompt="Generates a transcript for a youtube video on what swarms are",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="worker1.json",
)
worker2 = Agent(
agent_name="Worker2",
system_prompt="Summarizes the transcript generated by Worker1",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="worker2.json",
)
# Define agent list and flow pattern
agents = [director, worker1, worker2]
flow = "Director -> Worker1 -> Worker2"
# Use AgentRearrange to manage the flow
agent_system = AgentRearrange(agents=agents, flow=flow)
output = agent_system.run(
"Create a format to express and communicate swarms of llms in a structured manner for youtube"
)
print(output)
if __name__ == "__main__":
logging.info(
"Starting the GPU-based task assignment for agents..."
)
assign_tasks_to_gpus()
logging.info("Starting the AgentRearrange task flow...")
run_agent_flow()

@ -334,7 +334,7 @@ class PharmaAgent:
"Running multiple chemical queries asynchronously..."
)
tasks = []
async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession():
for chemical in chemical_names:
task = self.run_async(chemical)
tasks.append(task)

5538
poetry.lock generated

File diff suppressed because it is too large Load Diff

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "5.6.9"
version = "5.7.1"
description = "Swarms - Pytorch"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]
@ -77,6 +77,10 @@ swarms-cloud = "*"
aiofiles = "*"
swarm-models = "*"
[tool.poetry.scripts]
swarms = "swarms.cli.main:main"
[tool.poetry.group.lint.dependencies]
black = ">=23.1,<25.0"
ruff = ">=0.5.1,<0.6.8"

@ -1,90 +1,140 @@
import argparse
from swarms.cli.parse_yaml import (
create_agent_from_yaml,
run_agent,
list_agents,
import os
import time
from rich.console import Console
from rich.text import Text
from swarms.cli.onboarding_process import OnboardingProcess
from swarms.agents.create_agents_from_yaml import (
create_agents_from_yaml,
)
# SWARMS_LOGO = """
# _________
# / _____/_ _ _______ _______ _____ ______
# \_____ \\ \/ \/ /\__ \\_ __ \/ \ / ___/
# / \\ / / __ \| | \/ Y Y \\___ \
# /_______ / \/\_/ (____ /__| |__|_| /____ >
# \/ \/ \/ \/
# """
console = Console()
# RED_COLOR_CODE = "\033[91m"
# RESET_COLOR_CODE = "\033[0m"
# # print(RED_COLOR_CODE + SWARMS_LOGO + RESET_COLOR_CODE)
ASCII_ART = """
_________
/ _____/_ _ _______ _______ _____ ______
\_____ \\ \/ \/ /\__ \\_ __ \/ \ / ___/
/ \\ / / __ \| | \/ Y Y \\___ \
/_______ / \/\_/ (____ /__| |__|_| /____ >
\/ \/ \/ \/
"""
def main():
parser = argparse.ArgumentParser(
description="""
Swarms CLI for managing and running swarms agents.
# Function to display the ASCII art in red
def show_ascii_art():
text = Text(ASCII_ART, style="bold cyan")
console.print(text)
# Help command
def show_help():
console.print(
"""
[bold cyan]Swarms CLI - Help[/bold cyan]
[bold magenta]Commands:[/bold magenta]
[bold white]onboarding[/bold white] : Starts the onboarding process
[bold white]help[/bold white] : Shows this help message
[bold white]get-api-key[/bold white] : Retrieves your API key from the platform
[bold white]check-login[/bold white] : Checks if you're logged in and starts the cache
[bold white]read-docs[/bold white] : Redirects you to swarms cloud documentation!
[bold white]run-agents[/bold white] : Run your Agents from your agents.yaml
For more details, visit: https://docs.swarms.world
"""
)
subparsers = parser.add_subparsers(
dest="command", help="Available commands"
)
# create agent command
create_parser = subparsers.add_parser(
"create", help="Create a new agent from a YAML file"
# [bold white]add-agent[/bold white] : Add an agent to the marketplace under your name. Must have a Dockerfile + your agent.yaml to publish. Learn more Here: https://docs.swarms.world/en/latest/swarms_cloud/vision/
# Fetch API key from platform
def get_api_key():
console.print(
"[bold yellow]Opening the API key retrieval page...[/bold yellow]"
)
create_parser.add_argument(
"agent", type=str, help="Path to the YAML file"
# Simulating API key retrieval process by opening the website
import webbrowser
webbrowser.open("https://swarms.world/platform/api-keys")
time.sleep(2)
console.print(
"[bold green]Your API key is available on the dashboard.[/bold green]"
)
# run agent command
run_parser = subparsers.add_parser(
"run", help="Run an agent with a specified task"
# Redirect to docs
def redirect_to_docs():
console.print(
"[bold yellow]Opening the Docs page...[/bold yellow]"
)
run_parser.add_argument(
"agent_name", type=str, help="Name of the agent to run"
# Simulating API key retrieval process by opening the website
import webbrowser
webbrowser.open("https://docs.swarms.world")
time.sleep(2)
# Check and start cache (login system simulation)
def check_login():
cache_file = "cache.txt"
if os.path.exists(cache_file):
with open(cache_file, "r") as f:
cache_content = f.read()
if cache_content == "logged_in":
console.print(
"[bold green]You are already logged in.[/bold green]"
)
run_parser.add_argument(
"task", type=str, help="Task for the agent to execute"
else:
console.print(
"[bold red]You are not logged in.[/bold red]"
)
else:
console.print("[bold yellow]Logging in...[/bold yellow]")
time.sleep(2)
with open(cache_file, "w") as f:
f.write("logged_in")
console.print("[bold green]Login successful![/bold green]")
# list agents command
subparsers.add_parser("list", help="List all agents")
# Additional help options
parser.add_argument(
"--issue",
action="store_true",
help="Open an issue on GitHub: https://github.com/kyegomez/swarms/issues/new/choose",
)
# Main CLI handler
def main():
parser = argparse.ArgumentParser(description="Swarms Cloud CLI")
# Adding arguments for different commands
parser.add_argument(
"--community",
action="store_true",
help="Join our community on Discord: https://discord.com/servers/agora-999382051935506503",
"command",
choices=["onboarding", "help", "get-api-key", "check-login"],
help="Command to run",
)
args = parser.parse_args()
if args.issue:
print(
"Open an issue on GitHub: https://github.com/kyegomez/swarms/issues/new/choose"
)
elif args.community:
print(
"Join our community on Discord: https://discord.com/servers/agora-999382051935506503"
show_ascii_art()
# Determine which command to run
if args.command == "onboarding":
OnboardingProcess().run()
elif args.command == "help":
show_help()
elif args.command == "get-api-key":
get_api_key()
elif args.command == "check-login":
check_login()
elif args.command == "read-docs":
redirect_to_docs()
elif args.command == "run-agents":
create_agents_from_yaml(
yaml_file="agents.yaml", return_type="tasks"
)
elif args.command == "create":
create_agent_from_yaml(args.agent)
elif args.command == "run":
run_agent(args.agent_name, args.task)
elif args.command == "list agents":
list_agents()
else:
parser.print_help()
console.print(
"[bold red]Unknown command! Type 'help' for usage.[/bold red]"
)
# if __name__ == "__main__":
# main()
if __name__ == "__main__":
main()

@ -1,121 +0,0 @@
import os
from termcolor import colored
import json
welcome = """
Swarms is the first-ever multi-agent enterpris-grade framework that enables you to seamlessly orchestrate agents!
"""
def print_welcome():
print(
colored(
f"Welcome to the Swarms Framework! \n {welcome}",
"cyan",
attrs=["bold"],
)
)
print(
colored(
"Thank you for trying out Swarms. We are excited to have you on board to enable you to get started.",
"cyan",
)
)
print()
print(colored("Resources", "cyan", attrs=["bold"]))
print(
colored("GitHub: ", "cyan")
+ colored("https://github.com/kyegomez/swarms", "magenta")
)
print(
colored("Discord: ", "cyan")
+ colored(
"https://discord.com/servers/agora-999382051935506503",
"magenta",
)
)
print(
colored("Documentation: ", "cyan")
+ colored("https://docs.swarms.world", "magenta")
)
print(
colored("Marketplace: ", "cyan")
+ colored("https://swarms.world", "magenta")
)
print(
colored("Submit an Issue: ", "cyan")
+ colored(
"https://github.com/kyegomez/swarms/issues/new/choose",
"magenta",
)
)
print(
colored("Swarms Project Board // Roadmap ", "cyan")
+ colored(
"https://github.com/users/kyegomez/projects/1", "magenta"
)
)
print()
print(
colored(
"Let's get to know you a bit better!",
"magenta",
attrs=["bold"],
)
)
def get_user_info():
first_name = input(colored("What is your first name? ", "blue"))
last_name = input(colored("What is your last name? ", "blue"))
email = input(colored("What is your email? ", "blue"))
company = input(
colored("Which company do you work for? ", "blue")
)
project = input(
colored("What are you trying to build with Swarms? ", "blue")
)
swarms_type = input(
colored("What type of swarms are you building? ", "blue")
)
user_info = {
"first_name": first_name,
"last_name": last_name,
"email": email,
"company": company,
"project": project,
"swarms_type": swarms_type,
}
return user_info
def save_user_info(user_info: dict = None):
cache_dir = os.path.expanduser("~/.swarms_cache")
if not os.path.exists(cache_dir):
os.makedirs(cache_dir)
cache_file = os.path.join(cache_dir, "user_info.json")
with open(cache_file, "w") as f:
json.dump(user_info, f, indent=4)
print(
colored(
"Your information has been saved as a JSON file! Thank you.",
"cyan",
)
)
def onboard():
print_welcome()
user_info = get_user_info()
save_user_info(user_info)
print(
colored(
"You're all set! Enjoy using Swarms.",
"cyan",
attrs=["bold"],
)
)

@ -0,0 +1,188 @@
import os
from loguru import logger
import json
import time
from typing import Dict
from swarms_cloud.utils.log_to_swarms_database import log_agent_data
from swarms_cloud.utils.capture_system_data import capture_system_data
class OnboardingProcess:
"""
This class handles the onboarding process for users. It collects user data including their
full name, first name, email, Swarms API key, and system data, then autosaves it in both a
main JSON file and a cache file for reliability. It supports loading previously saved or cached data.
"""
def __init__(
self,
auto_save_path: str = "user_data.json",
cache_save_path: str = "user_data_cache.json",
) -> None:
"""
Initializes the OnboardingProcess with an autosave file path and a cache path.
Args:
auto_save_path (str): The path where user data is automatically saved.
cache_save_path (str): The path where user data is cached for reliability.
"""
self.user_data: Dict[str, str] = {}
self.system_data: Dict[str, str] = capture_system_data()
self.auto_save_path = auto_save_path
self.cache_save_path = cache_save_path
self.load_existing_data()
def load_existing_data(self) -> None:
"""
Loads existing user data from the auto-save file or cache if available.
"""
if os.path.exists(self.auto_save_path):
try:
with open(self.auto_save_path, "r") as f:
self.user_data = json.load(f)
logger.info(
"Existing user data loaded from {}",
self.auto_save_path,
)
return
except json.JSONDecodeError as e:
logger.error(
"Failed to load user data from main file: {}", e
)
# Fallback to cache if main file fails
if os.path.exists(self.cache_save_path):
try:
with open(self.cache_save_path, "r") as f:
self.user_data = json.load(f)
logger.info(
"User data loaded from cache: {}",
self.cache_save_path,
)
except json.JSONDecodeError as e:
logger.error(
"Failed to load user data from cache: {}", e
)
def save_data(self, retry_attempts: int = 3) -> None:
"""
Saves the current user data to both the auto-save file and the cache file. If the main
save fails, the cache is updated instead. Implements retry logic with exponential backoff
in case both save attempts fail.
Args:
retry_attempts (int): The number of retries if saving fails.
"""
attempt = 0
backoff_time = 1 # Starting backoff time (in seconds)
while attempt < retry_attempts:
try:
combined_data = {**self.user_data, **self.system_data}
log_agent_data(combined_data)
# threading.Thread(target=log_agent_data(combined_data)).start()
with open(self.auto_save_path, "w") as f:
json.dump(combined_data, f, indent=4)
# logger.info(
# "User and system data successfully saved to {}",
# self.auto_save_path,
# )
with open(self.cache_save_path, "w") as f:
json.dump(combined_data, f, indent=4)
# logger.info(
# "User and system data successfully cached in {}",
# self.cache_save_path,
# )
return # Exit the function if saving was successful
except Exception as e:
logger.error(
"Error saving user data (Attempt {}): {}",
attempt + 1,
e,
)
# Retry after a short delay (exponential backoff)
time.sleep(backoff_time)
attempt += 1
backoff_time *= (
2 # Double the backoff time for each retry
)
logger.error(
"Failed to save user data after {} attempts.",
retry_attempts,
)
def ask_input(self, prompt: str, key: str) -> None:
"""
Asks the user for input, validates it, and saves it in the user_data dictionary.
Autosaves and caches after each valid input.
Args:
prompt (str): The prompt message to display to the user.
key (str): The key under which the input will be saved in user_data.
Raises:
ValueError: If the input is empty or only contains whitespace.
"""
try:
response = input(prompt)
if response.strip().lower() == "quit":
logger.info(
"User chose to quit the onboarding process."
)
exit(0)
if not response.strip():
raise ValueError(
f"{key.capitalize()} cannot be empty."
)
self.user_data[key] = response.strip()
self.save_data()
except ValueError as e:
logger.warning(e)
self.ask_input(prompt, key)
except KeyboardInterrupt:
logger.warning(
"Onboarding process interrupted by the user."
)
exit(1)
def collect_user_info(self) -> None:
"""
Initiates the onboarding process by collecting the user's full name, first name, email,
Swarms API key, and system data.
"""
logger.info("Initiating swarms cloud onboarding process...")
self.ask_input(
"Enter your first name (or type 'quit' to exit): ",
"first_name",
)
self.ask_input(
"Enter your Last Name (or type 'quit' to exit): ",
"last_name",
)
self.ask_input(
"Enter your email (or type 'quit' to exit): ", "email"
)
self.ask_input(
"Enter your Swarms API key (or type 'quit' to exit): Get this in your swarms dashboard: https://swarms.world/platform/api-keys ",
"swarms_api_key",
)
logger.success("Onboarding process completed successfully!")
def run(self) -> None:
"""
Main method to run the onboarding process. It handles unexpected errors and ensures
proper finalization.
"""
try:
self.collect_user_info()
except Exception as e:
logger.error("An unexpected error occurred: {}", e)
finally:
logger.info("Finalizing the onboarding process.")
# if __name__ == "__main__":
# onboarding = OnboardingProcess()
# onboarding.run()
Loading…
Cancel
Save