feat: accept incoming changes

pull/651/head
Occupying-Mars 1 month ago
commit 4dafc0430b

@ -113,14 +113,36 @@ Here are some example scripts to get you started. For more comprehensive documen
| Swarms Examples | A collection of simple examples to demonstrate Swarms capabilities. | Basic Usage | [https://github.com/The-Swarm-Corporation/swarms-examples?tab=readme-ov-file](https://github.com/The-Swarm-Corporation/swarms-examples?tab=readme-ov-file) |
| Cookbook | A comprehensive guide with recipes for various use cases and scenarios. | Advanced Usage | [https://github.com/The-Swarm-Corporation/Cookbook](https://github.com/The-Swarm-Corporation/Cookbook) |
---
## `Agent` Class
The `Agent` class is a fundamental component of the Swarms framework, designed to execute tasks autonomously. It fuses LLMs, tools, and long-term memory into a full-stack agent. The `Agent` class is highly customizable, allowing fine-grained control over its behavior and interactions.
### `run` Method
The `run` method is the primary entry point for executing tasks with an `Agent` instance. It accepts a task string as the main input and processes it according to the agent's configuration. It can also accept an `img` parameter, such as `img="image_filepath.png"`, to process images if a vision-capable model (VLM) such as `GPT4VisionAPI` is attached.
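For example, here is a minimal sketch of a vision-enabled call; the agent name, model, and image path are illustrative placeholders rather than files or names shipped with the framework:
```python
from swarms import Agent

# Illustrative placeholders: assumes a vision-capable model and a local image file.
vision_agent = Agent(
    agent_name="Chart-Reader-Agent",
    model_name="gpt-4o-mini",
    max_loops=1,
)

vision_agent.run(
    "Summarize the key trend shown in this chart.",
    img="quarterly_chart.png",
)
```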
## Simple Example
```python
from swarms import Agent
agent = Agent(
agent_name="Stock-Analysis-Agent",
model_name="gpt-4o-mini",
max_loops="auto",
interactive=True,
streaming_on=True,
)
agent.run("What is the current market trend for tech stocks?")
```
### Settings and Customization
The `Agent` class offers a range of settings to tailor its behavior to specific needs. Some key settings include:

@ -0,0 +1,629 @@
import os
from fastapi import (
FastAPI,
HTTPException,
status,
Query,
BackgroundTasks,
)
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any, List
from loguru import logger
import uvicorn
from datetime import datetime, timedelta
from uuid import UUID, uuid4
from enum import Enum
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
import traceback
from swarms import Agent
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Configure Loguru
logger.add(
"logs/api_{time}.log",
rotation="500 MB",
retention="10 days",
level="INFO",
format="{time} {level} {message}",
backtrace=True,
diagnose=True,
)
class AgentStatus(str, Enum):
"""Enum for agent status."""
IDLE = "idle"
PROCESSING = "processing"
ERROR = "error"
MAINTENANCE = "maintenance"
class AgentConfig(BaseModel):
"""Configuration model for creating a new agent."""
agent_name: str = Field(..., description="Name of the agent")
description: str = Field(
default="", description="Description of the agent's purpose"
)
system_prompt: str = Field(
..., description="System prompt for the agent"
)
model_name: str = Field(
default="gpt-4",
description="Name of the LLM to use, as provided by litellm",
)
temperature: float = Field(
default=0.1,
ge=0.0,
le=2.0,
description="Temperature for the model",
)
max_loops: int = Field(
default=1, ge=1, description="Maximum number of loops"
)
autosave: bool = Field(
default=True, description="Enable autosave"
)
dashboard: bool = Field(
default=False, description="Enable dashboard"
)
verbose: bool = Field(
default=True, description="Enable verbose output"
)
dynamic_temperature_enabled: bool = Field(
default=True, description="Enable dynamic temperature"
)
user_name: str = Field(
default="default_user", description="Username for the agent"
)
retry_attempts: int = Field(
default=1, ge=1, description="Number of retry attempts"
)
context_length: int = Field(
default=200000, ge=1000, description="Context length"
)
output_type: str = Field(
default="string", description="Output type (string or json)"
)
streaming_on: bool = Field(
default=False, description="Enable streaming"
)
tags: List[str] = Field(
default_factory=list,
description="Tags for categorizing the agent",
)
class AgentUpdate(BaseModel):
"""Model for updating agent configuration."""
description: Optional[str] = None
system_prompt: Optional[str] = None
temperature: Optional[float] = None
max_loops: Optional[int] = None
tags: Optional[List[str]] = None
status: Optional[AgentStatus] = None
class AgentSummary(BaseModel):
"""Summary model for agent listing."""
agent_id: UUID
agent_name: str
description: str
created_at: datetime
last_used: datetime
total_completions: int
tags: List[str]
status: AgentStatus
class AgentMetrics(BaseModel):
"""Model for agent performance metrics."""
total_completions: int
average_response_time: float
error_rate: float
last_24h_completions: int
total_tokens_used: int
uptime_percentage: float
success_rate: float
peak_tokens_per_minute: int
class CompletionRequest(BaseModel):
"""Model for completion requests."""
prompt: str = Field(..., description="The prompt to process")
agent_id: UUID = Field(..., description="ID of the agent to use")
max_tokens: Optional[int] = Field(
None, description="Maximum tokens to generate"
)
temperature_override: Optional[float] = None
stream: bool = Field(
default=False, description="Enable streaming response"
)
class CompletionResponse(BaseModel):
"""Model for completion responses."""
agent_id: UUID
response: str
metadata: Dict[str, Any]
timestamp: datetime
processing_time: float
token_usage: Dict[str, int]
class AgentStore:
"""Enhanced store for managing agents."""
def __init__(self):
self.agents: Dict[UUID, Agent] = {}
self.agent_metadata: Dict[UUID, Dict[str, Any]] = {}
self.executor = ThreadPoolExecutor(max_workers=4)
self._ensure_directories()
def _ensure_directories(self):
"""Ensure required directories exist."""
Path("logs").mkdir(exist_ok=True)
Path("states").mkdir(exist_ok=True)
async def create_agent(self, config: AgentConfig) -> UUID:
"""Create a new agent with the given configuration."""
try:
agent = Agent(
agent_name=config.agent_name,
system_prompt=config.system_prompt,
model_name=config.model_name,
max_loops=config.max_loops,
autosave=config.autosave,
dashboard=config.dashboard,
verbose=config.verbose,
dynamic_temperature_enabled=config.dynamic_temperature_enabled,
saved_state_path=f"states/{config.agent_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
user_name=config.user_name,
retry_attempts=config.retry_attempts,
context_length=config.context_length,
return_step_meta=True,
output_type="str",
streaming_on=config.streaming_on,
)
agent_id = uuid4()
self.agents[agent_id] = agent
self.agent_metadata[agent_id] = {
"description": config.description,
"created_at": datetime.utcnow(),
"last_used": datetime.utcnow(),
"total_completions": 0,
"tags": config.tags,
"total_tokens": 0,
"error_count": 0,
"response_times": [],
"status": AgentStatus.IDLE,
"start_time": datetime.utcnow(),
"downtime": timedelta(),
"successful_completions": 0,
}
logger.info(f"Created agent with ID: {agent_id}")
return agent_id
except Exception as e:
logger.error(f"Error creating agent: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to create agent: {str(e)}",
)
async def get_agent(self, agent_id: UUID) -> Agent:
"""Retrieve an agent by ID."""
agent = self.agents.get(agent_id)
if not agent:
logger.error(f"Agent not found: {agent_id}")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Agent {agent_id} not found",
)
return agent
async def update_agent(
self, agent_id: UUID, update: AgentUpdate
) -> None:
"""Update agent configuration."""
agent = await self.get_agent(agent_id)
metadata = self.agent_metadata[agent_id]
if update.system_prompt:
agent.system_prompt = update.system_prompt
if update.temperature is not None:
agent.llm.temperature = update.temperature
if update.max_loops is not None:
agent.max_loops = update.max_loops
if update.tags is not None:
metadata["tags"] = update.tags
if update.description is not None:
metadata["description"] = update.description
if update.status is not None:
metadata["status"] = update.status
if update.status == AgentStatus.MAINTENANCE:
metadata["downtime"] += (
datetime.utcnow() - metadata["last_used"]
)
logger.info(f"Updated agent {agent_id}")
async def list_agents(
self,
tags: Optional[List[str]] = None,
status: Optional[AgentStatus] = None,
) -> List[AgentSummary]:
"""List all agents, optionally filtered by tags and status."""
summaries = []
for agent_id, agent in self.agents.items():
metadata = self.agent_metadata[agent_id]
# Apply filters
if tags and not any(
tag in metadata["tags"] for tag in tags
):
continue
if status and metadata["status"] != status:
continue
summaries.append(
AgentSummary(
agent_id=agent_id,
agent_name=agent.agent_name,
description=metadata["description"],
created_at=metadata["created_at"],
last_used=metadata["last_used"],
total_completions=metadata["total_completions"],
tags=metadata["tags"],
status=metadata["status"],
)
)
return summaries
async def get_agent_metrics(self, agent_id: UUID) -> AgentMetrics:
"""Get performance metrics for an agent."""
metadata = self.agent_metadata[agent_id]
response_times = metadata["response_times"]
# Calculate metrics
total_time = datetime.utcnow() - metadata["start_time"]
uptime = total_time - metadata["downtime"]
uptime_percentage = (
uptime.total_seconds() / total_time.total_seconds()
) * 100
success_rate = (
metadata["successful_completions"]
/ metadata["total_completions"]
* 100
if metadata["total_completions"] > 0
else 0
)
return AgentMetrics(
total_completions=metadata["total_completions"],
average_response_time=(
sum(response_times) / len(response_times)
if response_times
else 0
),
error_rate=(
metadata["error_count"]
/ metadata["total_completions"]
if metadata["total_completions"] > 0
else 0
),
last_24h_completions=sum(
1
for t in response_times
if (datetime.utcnow() - t).days < 1
),
total_tokens_used=metadata["total_tokens"],
uptime_percentage=uptime_percentage,
success_rate=success_rate,
peak_tokens_per_minute=max(
metadata.get("tokens_per_minute", {}).values(),
default=0,
),
)
async def clone_agent(
self, agent_id: UUID, new_name: str
) -> UUID:
"""Clone an existing agent with a new name."""
original_agent = await self.get_agent(agent_id)
original_metadata = self.agent_metadata[agent_id]
config = AgentConfig(
agent_name=new_name,
description=f"Clone of {original_agent.agent_name}",
system_prompt=original_agent.system_prompt,
model_name=original_agent.llm.model_name,
temperature=original_agent.llm.temperature,
max_loops=original_agent.max_loops,
tags=original_metadata["tags"],
)
return await self.create_agent(config)
async def delete_agent(self, agent_id: UUID) -> None:
"""Delete an agent."""
if agent_id not in self.agents:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Agent {agent_id} not found",
)
# Clean up any resources
agent = self.agents[agent_id]
if agent.autosave and os.path.exists(agent.saved_state_path):
os.remove(agent.saved_state_path)
del self.agents[agent_id]
del self.agent_metadata[agent_id]
logger.info(f"Deleted agent {agent_id}")
async def process_completion(
self,
agent: Agent,
prompt: str,
agent_id: UUID,
max_tokens: Optional[int] = None,
temperature_override: Optional[float] = None,
) -> CompletionResponse:
"""Process a completion request using the specified agent."""
start_time = datetime.utcnow()
metadata = self.agent_metadata[agent_id]
try:
# Update agent status
metadata["status"] = AgentStatus.PROCESSING
metadata["last_used"] = start_time
# Apply temporary overrides if specified
original_temp = agent.llm.temperature
if temperature_override is not None:
agent.llm.temperature = temperature_override
# Process the completion
response = agent.run(prompt)
# Reset overrides
if temperature_override is not None:
agent.llm.temperature = original_temp
# Update metrics
processing_time = (
datetime.utcnow() - start_time
).total_seconds()
metadata["response_times"].append(processing_time)
metadata["total_completions"] += 1
metadata["successful_completions"] += 1
# Estimate token usage (this is a rough estimate)
prompt_tokens = len(prompt.split()) * 1.3
completion_tokens = len(response.split()) * 1.3
total_tokens = int(prompt_tokens + completion_tokens)
metadata["total_tokens"] += total_tokens
# Update tokens per minute tracking
current_minute = datetime.utcnow().replace(
second=0, microsecond=0
)
if "tokens_per_minute" not in metadata:
metadata["tokens_per_minute"] = {}
metadata["tokens_per_minute"][current_minute] = (
metadata["tokens_per_minute"].get(current_minute, 0)
+ total_tokens
)
return CompletionResponse(
agent_id=agent_id,
response=response,
metadata={
"agent_name": agent.agent_name,
"model_name": agent.llm.model_name,
"temperature": agent.llm.temperature,
},
timestamp=datetime.utcnow(),
processing_time=processing_time,
token_usage={
"prompt_tokens": int(prompt_tokens),
"completion_tokens": int(completion_tokens),
"total_tokens": total_tokens,
},
)
except Exception as e:
metadata["error_count"] += 1
metadata["status"] = AgentStatus.ERROR
logger.error(
f"Error in completion processing: {str(e)}\n{traceback.format_exc()}"
)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error processing completion: {str(e)}",
)
finally:
metadata["status"] = AgentStatus.IDLE
class SwarmsAPI:
"""Enhanced API class for Swarms agent integration."""
def __init__(self):
self.app = FastAPI(
title="Swarms Agent API",
description="Production-grade API for Swarms agent interaction",
version="1.0.0",
docs_url="/v1/docs",
redoc_url="/v1/redoc",
)
self.store = AgentStore()
# Configure CORS
self.app.add_middleware(
CORSMiddleware,
allow_origins=[
"*"
], # Configure appropriately for production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
self._setup_routes()
def _setup_routes(self):
"""Set up API routes."""
@self.app.post("/v1/agent", response_model=Dict[str, UUID])
async def create_agent(config: AgentConfig):
"""Create a new agent with the specified configuration."""
agent_id = await self.store.create_agent(config)
return {"agent_id": agent_id}
@self.app.get("/v1/agents", response_model=List[AgentSummary])
async def list_agents(
tags: Optional[List[str]] = Query(None),
status: Optional[AgentStatus] = None,
):
"""List all agents, optionally filtered by tags and status."""
return await self.store.list_agents(tags, status)
@self.app.patch(
"/v1/agent/{agent_id}", response_model=Dict[str, str]
)
async def update_agent(agent_id: UUID, update: AgentUpdate):
"""Update an existing agent's configuration."""
await self.store.update_agent(agent_id, update)
return {"status": "updated"}
@self.app.get(
"/v1/agent/{agent_id}/metrics",
response_model=AgentMetrics,
)
async def get_agent_metrics(agent_id: UUID):
"""Get performance metrics for a specific agent."""
return await self.store.get_agent_metrics(agent_id)
@self.app.post(
"/v1/agent/{agent_id}/clone",
response_model=Dict[str, UUID],
)
async def clone_agent(agent_id: UUID, new_name: str):
"""Clone an existing agent with a new name."""
new_id = await self.store.clone_agent(agent_id, new_name)
return {"agent_id": new_id}
@self.app.delete("/v1/agent/{agent_id}")
async def delete_agent(agent_id: UUID):
"""Delete an agent."""
await self.store.delete_agent(agent_id)
return {"status": "deleted"}
@self.app.post(
"/v1/agent/completions", response_model=CompletionResponse
)
async def create_completion(
request: CompletionRequest,
background_tasks: BackgroundTasks,
):
"""Process a completion request with the specified agent."""
try:
agent = await self.store.get_agent(request.agent_id)
# Process completion
response = await self.store.process_completion(
agent,
request.prompt,
request.agent_id,
request.max_tokens,
request.temperature_override,
)
# Schedule background cleanup
background_tasks.add_task(
self._cleanup_old_metrics, request.agent_id
)
return response
except Exception as e:
logger.error(f"Error processing completion: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error processing completion: {str(e)}",
)
@self.app.get("/v1/agent/{agent_id}/status")
async def get_agent_status(agent_id: UUID):
"""Get the current status of an agent."""
metadata = self.store.agent_metadata.get(agent_id)
if not metadata:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Agent {agent_id} not found",
)
return {
"agent_id": agent_id,
"status": metadata["status"],
"last_used": metadata["last_used"],
"total_completions": metadata["total_completions"],
"error_count": metadata["error_count"],
}
async def _cleanup_old_metrics(self, agent_id: UUID):
"""Clean up old metrics data to prevent memory bloat."""
metadata = self.store.agent_metadata.get(agent_id)
if metadata:
# Keep only last 24 hours of response times
cutoff = datetime.utcnow() - timedelta(days=1)
metadata["response_times"] = [
t
for t in metadata["response_times"]
if isinstance(t, (int, float))
and t > cutoff.timestamp()
]
# Clean up old tokens per minute data
if "tokens_per_minute" in metadata:
metadata["tokens_per_minute"] = {
k: v
for k, v in metadata["tokens_per_minute"].items()
if k > cutoff
}
def create_app() -> FastAPI:
"""Create and configure the FastAPI application."""
api = SwarmsAPI()
return api.app
if __name__ == "__main__":
# Configure uvicorn logging
logger.info("API Starting")
uvicorn.run(
"main:create_app",
host="0.0.0.0",
port=8000,
reload=True,
workers=4,
)

@ -0,0 +1,99 @@
import requests
from loguru import logger
import time
# Configure loguru
logger.add(
"api_tests_{time}.log",
rotation="100 MB",
level="DEBUG",
format="{time} {level} {message}"
)
BASE_URL = "http://localhost:8000/v1"
def test_create_agent():
"""Test creating a new agent."""
logger.info("Testing agent creation")
payload = {
"agent_name": "Test Agent",
"system_prompt": "You are a helpful assistant",
"model_name": "gpt-4",
"description": "Test agent",
"tags": ["test"]
}
response = requests.post(f"{BASE_URL}/agent", json=payload)
logger.debug(f"Create response: {response.json()}")
if response.status_code == 200:
logger.success("Successfully created agent")
return response.json()["agent_id"]
else:
logger.error(f"Failed to create agent: {response.text}")
return None
def test_list_agents():
"""Test listing all agents."""
logger.info("Testing agent listing")
response = requests.get(f"{BASE_URL}/agents")
logger.debug(f"List response: {response.json()}")
if response.status_code == 200:
logger.success(f"Found {len(response.json())} agents")
else:
logger.error(f"Failed to list agents: {response.text}")
def test_completion(agent_id):
"""Test running a completion."""
logger.info("Testing completion")
payload = {
"prompt": "What is the weather like today?",
"agent_id": agent_id
}
response = requests.post(f"{BASE_URL}/agent/completions", json=payload)
logger.debug(f"Completion response: {response.json()}")
if response.status_code == 200:
logger.success("Successfully got completion")
else:
logger.error(f"Failed to get completion: {response.text}")
def test_delete_agent(agent_id):
"""Test deleting an agent."""
logger.info("Testing agent deletion")
response = requests.delete(f"{BASE_URL}/agent/{agent_id}")
logger.debug(f"Delete response: {response.json()}")
if response.status_code == 200:
logger.success("Successfully deleted agent")
else:
logger.error(f"Failed to delete agent: {response.text}")
def run_tests():
"""Run all tests in sequence."""
logger.info("Starting API tests")
# Create agent and get ID
agent_id = test_create_agent()
if not agent_id:
logger.error("Cannot continue tests without agent ID")
return
# Wait a bit for agent to be ready
time.sleep(1)
# Run other tests
test_list_agents()
test_completion(agent_id)
test_delete_agent(agent_id)
logger.info("Tests completed")
if __name__ == "__main__":
run_tests()

@ -1,105 +1,27 @@
/* Root variables for primary colors */
:root {
--md-primary-bg-color: #0d0d0d; /* Black background */
--md-secondary-bg-color: #1a1a1a; /* Slightly lighter black */
--md-accent-color: #FF073A; /* Neon red */
--md-accent-color--hover: #FF2050; /* Bright neon red for hover */
--md-text-color: #ffffff; /* White text */
--md-code-bg-color: #121212; /* Darker background for code blocks */
--md-code-border-color: #FF073A; /* Neon red border for code blocks */
--md-link-color: var(--md-accent-color);
}
/* Apply background and text colors globally */
body {
background-color: var(--md-primary-bg-color);
color: var(--md-text-color);
}
/* Headings with neon glow */
h1, h2, h3, h4, h5, h6 {
color: var(--md-accent-color);
text-shadow: 0 0 5px var(--md-accent-color), 0 0 10px var(--md-accent-color);
}
/* Further customization as needed */
/* Links with hover effects */
a {
color: var(--md-link-color);
text-decoration: none;
}
a:hover {
color: var(--md-accent-color--hover);
text-shadow: 0 0 5px var(--md-accent-color--hover), 0 0 10px var(--md-accent-color--hover);
}
/* Sidebar styling */
.md-sidebar {
background-color: var(--md-secondary-bg-color);
border-right: 2px solid var(--md-accent-color);
.md-typeset__table {
min-width: 100%;
}
/* Navigation links in sidebar */
.md-sidebar .md-nav__link {
color: var(--md-text-color);
}
.md-sidebar .md-nav__link:hover,
.md-sidebar .md-nav__link--active {
color: var(--md-accent-color);
background-color: var(--md-primary-bg-color);
.md-typeset table:not([class]) {
display: table;
}
/* Code blocks with neon red accents */
.md-typeset code {
background-color: var(--md-code-bg-color);
color: var(--md-text-color);
border: 1px solid var(--md-code-border-color);
border-radius: 4px;
padding: 2px 4px;
font-family: 'Fira Code', monospace;
text-shadow: 0 0 3px var(--md-code-border-color);
/* Dark mode
[data-md-color-scheme="slate"] {
--md-default-bg-color: black;
}
/* Tables */
.md-typeset__table {
min-width: 100%;
border-collapse: collapse;
background-color: var(--md-secondary-bg-color);
color: var(--md-text-color);
}
.md-typeset__table th, .md-typeset__table td {
border: 1px solid var(--md-accent-color);
padding: 8px;
.header__ellipsis {
color: black;
}
/* Buttons */
button {
background-color: var(--md-accent-color);
color: var(--md-text-color);
border: none;
border-radius: 4px;
padding: 10px 15px;
cursor: pointer;
text-shadow: 0 0 5px var(--md-accent-color);
}
button:hover {
background-color: var(--md-accent-color--hover);
.md-copyright__highlight {
color: black;
}
/* Additional styling for search bar */
.md-search__form {
background-color: var(--md-secondary-bg-color);
border: 1px solid var(--md-accent-color);
}
.md-search__input {
background-color: var(--md-primary-bg-color);
color: var(--md-text-color);
}
/* Further customization */
footer {
background-color: var(--md-secondary-bg-color);
color: var(--md-text-color);
text-align: center;
padding: 10px;
border-top: 2px solid var(--md-accent-color);
}
.md-header.md-header--shadow {
color: black;
} */

@ -62,15 +62,14 @@ theme:
logo: assets/img/swarms-logo.png
palette:
- scheme: default
primary: black
accent: red
primary: white # White background
accent: white # White accents for interactive elements
toggle:
icon: material/brightness-7
name: Switch to dark mode
# Palette toggle for dark mode
- scheme: slate
primary: white
accent: red
- scheme: slate # Optional: lighter shades for accessibility
primary: black
accent: black
toggle:
icon: material/brightness-4
name: Switch to light mode
@ -82,6 +81,10 @@ theme:
- navigation.expand
- navigation.top
- announce.dismiss
font:
text: "Fira Sans" # Clean and readable text
code: "Fira Code" # Modern look for code snippets
# Extensions
markdown_extensions:

@ -0,0 +1,56 @@
import os
from dotenv import load_dotenv
from swarm_models import OpenAIChat
from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import (
FINANCIAL_AGENT_SYS_PROMPT,
)
from new_features_examples.async_executor import HighSpeedExecutor
load_dotenv()
# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
openai_api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
)
# Initialize the agent
agent = Agent(
agent_name="Financial-Analysis-Agent",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
llm=model,
max_loops=1,
# autosave=True,
# dashboard=False,
# verbose=True,
# dynamic_temperature_enabled=True,
# saved_state_path="finance_agent.json",
# user_name="swarms_corp",
# retry_attempts=1,
# context_length=200000,
# return_step_meta=True,
# output_type="json", # "json", "dict", "csv" OR "string" soon "yaml" and
# auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task
# # artifacts_on=True,
# artifacts_output_path="roth_ira_report",
# artifacts_file_extension=".txt",
# max_tokens=8000,
# return_history=True,
)
def execute_agent(
task: str = "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria. Create a report on this question.",
):
return agent.run(task)
executor = HighSpeedExecutor()
results = executor.run(execute_agent, 2)
print(results)

@ -0,0 +1,131 @@
import asyncio
import multiprocessing as mp
import time
from functools import partial
from typing import Any, Dict, Union
class HighSpeedExecutor:
def __init__(self, num_processes: int = None):
"""
Initialize the executor with configurable number of processes.
If num_processes is None, it uses CPU count.
"""
self.num_processes = num_processes or mp.cpu_count()
async def _worker(
self,
queue: asyncio.Queue,
func: Any,
*args: Any,
**kwargs: Any,
):
"""Async worker that processes tasks from the queue"""
while True:
try:
# Non-blocking get from queue
await queue.get()
await asyncio.get_event_loop().run_in_executor(
None, partial(func, *args, **kwargs)
)
queue.task_done()
except asyncio.CancelledError:
break
async def _distribute_tasks(
self, num_tasks: int, queue: asyncio.Queue
):
"""Distribute tasks across the queue"""
for i in range(num_tasks):
await queue.put(i)
async def execute_batch(
self,
func: Any,
num_executions: int,
*args: Any,
**kwargs: Any,
) -> Dict[str, Union[int, float]]:
"""
Execute the given function multiple times concurrently.
Args:
func: The function to execute
num_executions: Number of times to execute the function
*args, **kwargs: Arguments to pass to the function
Returns:
A dictionary containing the number of executions, duration, and executions per second.
"""
queue = asyncio.Queue()
# Create worker tasks
workers = [
asyncio.create_task(
self._worker(queue, func, *args, **kwargs)
)
for _ in range(self.num_processes)
]
# Start timing
start_time = time.perf_counter()
# Distribute tasks
await self._distribute_tasks(num_executions, queue)
# Wait for all tasks to complete
await queue.join()
# Cancel workers
for worker in workers:
worker.cancel()
# Wait for all workers to finish
await asyncio.gather(*workers, return_exceptions=True)
end_time = time.perf_counter()
duration = end_time - start_time
return {
"executions": num_executions,
"duration": duration,
"executions_per_second": num_executions / duration,
}
def run(
self,
func: Any,
num_executions: int,
*args: Any,
**kwargs: Any,
):
return asyncio.run(
self.execute_batch(func, num_executions, *args, **kwargs)
)
# def example_function(x: int = 0) -> int:
# """Example function to execute"""
# return x * x
# async def main():
# # Create executor with number of CPU cores
# executor = HighSpeedExecutor()
# # Execute the function 1000 times
# result = await executor.execute_batch(
# example_function, num_executions=1000, x=42
# )
# print(
# f"Completed {result['executions']} executions in {result['duration']:.2f} seconds"
# )
# print(
# f"Rate: {result['executions_per_second']:.2f} executions/second"
# )
# if __name__ == "__main__":
# # Run the async main function
# asyncio.run(main())

@ -0,0 +1,308 @@
import os
from swarms import Agent
from swarm_models import OpenAIChat
from web3 import Web3
from typing import Dict, Optional, Any
from datetime import datetime
import asyncio
from loguru import logger
from dotenv import load_dotenv
import csv
import requests
import time
BLOCKCHAIN_AGENT_PROMPT = """
You are an expert blockchain and cryptocurrency analyst with deep knowledge of Ethereum markets and DeFi ecosystems.
You have access to real-time ETH price data and transaction information.
For each transaction, analyze:
1. MARKET CONTEXT
- Current ETH price and what this transaction means in USD terms
- How this movement compares to typical market volumes
- Whether this could impact ETH price
2. BEHAVIORAL ANALYSIS
- Whether this appears to be institutional, whale, or protocol movement
- If this fits any known wallet patterns or behaviors
- Signs of smart contract interaction or DeFi activity
3. RISK & IMPLICATIONS
- Potential market impact or price influence
- Signs of potential market manipulation or unusual activity
- Protocol or DeFi risks if applicable
4. STRATEGIC INSIGHTS
- What traders should know about this movement
- Potential chain reactions or follow-up effects
- Market opportunities or risks created
Write naturally but precisely. Focus on actionable insights and important patterns.
Your analysis helps traders and researchers understand significant market movements in real-time."""
class EthereumAnalyzer:
def __init__(self, min_value_eth: float = 100.0):
load_dotenv()
logger.add(
"eth_analysis.log",
rotation="500 MB",
retention="10 days",
level="INFO",
format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
)
self.w3 = Web3(
Web3.HTTPProvider(
"https://mainnet.infura.io/v3/9aa3d95b3bc440fa88ea12eaa4456161"
)
)
if not self.w3.is_connected():
raise ConnectionError(
"Failed to connect to Ethereum network"
)
self.min_value_eth = min_value_eth
self.last_processed_block = self.w3.eth.block_number
self.eth_price = self.get_eth_price()
self.last_price_update = time.time()
# Initialize AI agent
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise ValueError(
"OpenAI API key not found in environment variables"
)
model = OpenAIChat(
openai_api_key=api_key,
model_name="gpt-4",
temperature=0.1,
)
self.agent = Agent(
agent_name="Ethereum-Analysis-Agent",
system_prompt=BLOCKCHAIN_AGENT_PROMPT,
llm=model,
max_loops=1,
autosave=True,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="eth_agent.json",
user_name="eth_analyzer",
retry_attempts=1,
context_length=200000,
output_type="string",
streaming_on=False,
)
self.csv_filename = "ethereum_analysis.csv"
self.initialize_csv()
def get_eth_price(self) -> float:
"""Get current ETH price from CoinGecko API."""
try:
response = requests.get(
"https://api.coingecko.com/api/v3/simple/price",
params={"ids": "ethereum", "vs_currencies": "usd"},
)
return float(response.json()["ethereum"]["usd"])
except Exception as e:
logger.error(f"Error fetching ETH price: {str(e)}")
return 0.0
def update_eth_price(self):
"""Update ETH price if more than 5 minutes have passed."""
if time.time() - self.last_price_update > 300: # 5 minutes
self.eth_price = self.get_eth_price()
self.last_price_update = time.time()
logger.info(f"Updated ETH price: ${self.eth_price:,.2f}")
def initialize_csv(self):
"""Initialize CSV file with headers."""
headers = [
"timestamp",
"transaction_hash",
"from_address",
"to_address",
"value_eth",
"value_usd",
"eth_price",
"gas_used",
"gas_price_gwei",
"block_number",
"analysis",
]
if not os.path.exists(self.csv_filename):
with open(self.csv_filename, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(headers)
async def analyze_transaction(
self, tx_hash: str
) -> Optional[Dict[str, Any]]:
"""Analyze a single transaction."""
try:
tx = self.w3.eth.get_transaction(tx_hash)
receipt = self.w3.eth.get_transaction_receipt(tx_hash)
value_eth = float(self.w3.from_wei(tx.value, "ether"))
if value_eth < self.min_value_eth:
return None
block = self.w3.eth.get_block(tx.blockNumber)
# Update ETH price if needed
self.update_eth_price()
value_usd = value_eth * self.eth_price
analysis = {
"timestamp": datetime.fromtimestamp(
block.timestamp
).isoformat(),
"transaction_hash": tx_hash.hex(),
"from_address": tx["from"],
"to_address": tx.to if tx.to else "Contract Creation",
"value_eth": value_eth,
"value_usd": value_usd,
"eth_price": self.eth_price,
"gas_used": receipt.gasUsed,
"gas_price_gwei": float(
self.w3.from_wei(tx.gasPrice, "gwei")
),
"block_number": tx.blockNumber,
}
# Check if it's a contract
if tx.to:
code = self.w3.eth.get_code(tx.to)
analysis["is_contract"] = len(code) > 0
# Get contract events
if analysis["is_contract"]:
analysis["events"] = receipt.logs
return analysis
except Exception as e:
logger.error(
f"Error analyzing transaction {tx_hash}: {str(e)}"
)
return None
def prepare_analysis_prompt(self, tx_data: Dict[str, Any]) -> str:
"""Prepare detailed analysis prompt including price context."""
value_usd = tx_data["value_usd"]
eth_price = tx_data["eth_price"]
prompt = f"""Analyze this Ethereum transaction in current market context:
Transaction Details:
- Value: {tx_data['value_eth']:.2f} ETH (${value_usd:,.2f} at current price)
- Current ETH Price: ${eth_price:,.2f}
- From: {tx_data['from_address']}
- To: {tx_data['to_address']}
- Contract Interaction: {tx_data.get('is_contract', False)}
- Gas Used: {tx_data['gas_used']:,} units
- Gas Price: {tx_data['gas_price_gwei']:.2f} Gwei
- Block: {tx_data['block_number']}
- Timestamp: {tx_data['timestamp']}
{f"Event Count: {len(tx_data['events'])} events" if tx_data.get('events') else "No contract events"}
Consider the transaction's significance given the current ETH price of ${eth_price:,.2f} and total USD value of ${value_usd:,.2f}.
Analyze market impact, patterns, risks, and strategic implications."""
return prompt
def save_to_csv(self, tx_data: Dict[str, Any], ai_analysis: str):
"""Save transaction data and analysis to CSV."""
row = [
tx_data["timestamp"],
tx_data["transaction_hash"],
tx_data["from_address"],
tx_data["to_address"],
tx_data["value_eth"],
tx_data["value_usd"],
tx_data["eth_price"],
tx_data["gas_used"],
tx_data["gas_price_gwei"],
tx_data["block_number"],
ai_analysis.replace("\n", " "),
]
with open(self.csv_filename, "a", newline="") as f:
writer = csv.writer(f)
writer.writerow(row)
async def monitor_transactions(self):
"""Monitor and analyze transactions one at a time."""
logger.info(
f"Starting transaction monitor (minimum value: {self.min_value_eth} ETH)"
)
while True:
try:
current_block = self.w3.eth.block_number
block = self.w3.eth.get_block(
current_block, full_transactions=True
)
for tx in block.transactions:
tx_analysis = await self.analyze_transaction(
tx.hash
)
if tx_analysis:
# Get AI analysis
analysis_prompt = (
self.prepare_analysis_prompt(tx_analysis)
)
ai_analysis = self.agent.run(analysis_prompt)
print(ai_analysis)
# Save to CSV
self.save_to_csv(tx_analysis, ai_analysis)
# Print analysis
print("\n" + "=" * 50)
print("New Transaction Analysis")
print(
f"Hash: {tx_analysis['transaction_hash']}"
)
print(
f"Value: {tx_analysis['value_eth']:.2f} ETH (${tx_analysis['value_usd']:,.2f})"
)
print(
f"Current ETH Price: ${self.eth_price:,.2f}"
)
print("=" * 50)
print(ai_analysis)
print("=" * 50 + "\n")
await asyncio.sleep(1) # Wait for next block
except Exception as e:
logger.error(f"Error in monitoring loop: {str(e)}")
await asyncio.sleep(1)
async def main():
"""Entry point for the analysis system."""
analyzer = EthereumAnalyzer(min_value_eth=100.0)
await analyzer.monitor_transactions()
if __name__ == "__main__":
print("Starting Ethereum Transaction Analyzer...")
print("Saving results to ethereum_analysis.csv")
print("Press Ctrl+C to stop")
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nStopping analyzer...")

@ -1,6 +1,5 @@
import os
import asyncio
import threading
from swarms import Agent
from swarm_models import OpenAIChat
import time
@ -40,18 +39,21 @@ agent = Agent(
streaming_on=False,
)
# Function to measure time and memory usage
def measure_time_and_memory(func):
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
memory_usage = psutil.Process().memory_info().rss / 1024 ** 2
memory_usage = psutil.Process().memory_info().rss / 1024**2
print(f"Time taken: {end_time - start_time} seconds")
print(f"Memory used: {memory_usage} MB")
return result
return wrapper
# Function to run the agent asynchronously
@measure_time_and_memory
async def run_agent_async():
@ -61,11 +63,13 @@ async def run_agent_async():
)
)
# Function to run the agent on another thread
@measure_time_and_memory
def run_agent_thread():
asyncio.run(run_agent_async())
# Run the agent asynchronously and on another thread to test the speed
asyncio.run(run_agent_async())
run_agent_thread()

File diff suppressed because it is too large.

@ -0,0 +1,417 @@
import os
from typing import List, Dict, Any, Optional, Callable
from dataclasses import dataclass, field
import json
from datetime import datetime
import inspect
import typing
from typing import Union
from swarms import Agent
from swarm_models import OpenAIChat
@dataclass
class ToolDefinition:
name: str
description: str
parameters: Dict[str, Any]
required_params: List[str]
callable: Optional[Callable] = None
def extract_type_hints(func: Callable) -> Dict[str, Any]:
"""Extract parameter types from function type hints."""
return typing.get_type_hints(func)
def extract_tool_info(func: Callable) -> ToolDefinition:
"""Extract tool information from a callable function."""
# Get function name
name = func.__name__
# Get docstring
description = inspect.getdoc(func) or "No description available"
# Get parameters and their types
signature = inspect.signature(func)
type_hints = extract_type_hints(func)
parameters = {}
required_params = []
for param_name, param in signature.parameters.items():
# Skip self parameter for methods
if param_name == "self":
continue
param_type = type_hints.get(param_name, Any)
# Handle optional parameters
is_optional = (
param.default != inspect.Parameter.empty
or getattr(param_type, "__origin__", None) is Union
and type(None) in param_type.__args__
)
if not is_optional:
required_params.append(param_name)
parameters[param_name] = {
"type": str(param_type),
"default": (
None
if param.default is inspect.Parameter.empty
else param.default
),
"required": not is_optional,
}
return ToolDefinition(
name=name,
description=description,
parameters=parameters,
required_params=required_params,
callable=func,
)
@dataclass
class FunctionSpec:
"""Specification for a callable tool function."""
name: str
description: str
parameters: Dict[
str, dict
] # Contains type and description for each parameter
return_type: str
return_description: str
@dataclass
class ExecutionStep:
"""Represents a single step in the execution plan."""
step_id: int
function_name: str
parameters: Dict[str, Any]
expected_output: str
completed: bool = False
result: Any = None
@dataclass
class ExecutionContext:
"""Maintains state during execution."""
task: str
steps: List[ExecutionStep] = field(default_factory=list)
results: Dict[int, Any] = field(default_factory=dict)
current_step: int = 0
history: List[Dict[str, Any]] = field(default_factory=list)
class ToolAgent:
def __init__(
self,
functions: List[Callable],
openai_api_key: str,
model_name: str = "gpt-4",
temperature: float = 0.1,
):
self.functions = {func.__name__: func for func in functions}
self.function_specs = self._analyze_functions(functions)
self.model = OpenAIChat(
openai_api_key=openai_api_key,
model_name=model_name,
temperature=temperature,
)
self.system_prompt = self._create_system_prompt()
self.agent = Agent(
agent_name="Tool-Agent",
system_prompt=self.system_prompt,
llm=self.model,
max_loops=1,
verbose=True,
)
def _analyze_functions(
self, functions: List[Callable]
) -> Dict[str, FunctionSpec]:
"""Analyze functions to create detailed specifications."""
specs = {}
for func in functions:
hints = typing.get_type_hints(func)
sig = inspect.signature(func)
doc = inspect.getdoc(func) or ""
# Parse docstring for parameter descriptions
param_descriptions = {}
return_desc = ""
for line in doc.split("\n"):
if ":param" in line:
param_name = (
line.split(":param")[1].split(":")[0].strip()
)
desc = line.split(":", 2)[-1].strip()
param_descriptions[param_name] = desc
elif ":return:" in line:
return_desc = line.split(":return:")[1].strip()
# Build parameter specifications
parameters = {}
for name, param in sig.parameters.items():
param_type = hints.get(name, Any)
parameters[name] = {
"type": str(param_type),
"type_class": param_type,
"description": param_descriptions.get(name, ""),
"required": param.default == param.empty,
}
specs[func.__name__] = FunctionSpec(
name=func.__name__,
description=doc.split("\n")[0],
parameters=parameters,
return_type=str(hints.get("return", Any)),
return_description=return_desc,
)
return specs
def _create_system_prompt(self) -> str:
"""Create system prompt with detailed function specifications."""
functions_desc = []
for spec in self.function_specs.values():
params_desc = []
for name, details in spec.parameters.items():
params_desc.append(
f" - {name}: {details['type']} - {details['description']}"
)
functions_desc.append(
f"""
Function: {spec.name}
Description: {spec.description}
Parameters:
{chr(10).join(params_desc)}
Returns: {spec.return_type} - {spec.return_description}
"""
)
return f"""You are an AI agent that creates and executes plans using available functions.
Available Functions:
{chr(10).join(functions_desc)}
You must respond in two formats depending on the phase:
1. Planning Phase:
{{
"phase": "planning",
"plan": {{
"description": "Overall plan description",
"steps": [
{{
"step_id": 1,
"function": "function_name",
"parameters": {{
"param1": "value1",
"param2": "value2"
}},
"purpose": "Why this step is needed"
}}
]
}}
}}
2. Execution Phase:
{{
"phase": "execution",
"analysis": "Analysis of current result",
"next_action": {{
"type": "continue|request_input|complete",
"reason": "Why this action was chosen",
"needed_input": {{}} # If requesting input
}}
}}
Always:
- Use exact function names
- Ensure parameter types match specifications
- Provide clear reasoning for each decision
"""
def _execute_function(
self, spec: FunctionSpec, parameters: Dict[str, Any]
) -> Any:
"""Execute a function with type checking."""
converted_params = {}
for name, value in parameters.items():
param_spec = spec.parameters[name]
try:
# Convert value to required type
param_type = param_spec["type_class"]
if param_type in (int, float, str, bool):
converted_params[name] = param_type(value)
else:
converted_params[name] = value
except (ValueError, TypeError) as e:
raise ValueError(
f"Parameter '{name}' conversion failed: {str(e)}"
)
return self.functions[spec.name](**converted_params)
def run(self, task: str) -> Dict[str, Any]:
"""Execute task with planning and step-by-step execution."""
context = ExecutionContext(task=task)
execution_log = {
"task": task,
"start_time": datetime.utcnow().isoformat(),
"steps": [],
"final_result": None,
}
try:
# Planning phase
plan_prompt = f"Create a plan to: {task}"
plan_response = self.agent.run(plan_prompt)
plan_data = json.loads(
plan_response.replace("System:", "").strip()
)
# Convert plan to execution steps
for step in plan_data["plan"]["steps"]:
context.steps.append(
ExecutionStep(
step_id=step["step_id"],
function_name=step["function"],
parameters=step["parameters"],
expected_output=step["purpose"],
)
)
# Execution phase
while context.current_step < len(context.steps):
step = context.steps[context.current_step]
print(
f"\nExecuting step {step.step_id}: {step.function_name}"
)
try:
# Execute function
spec = self.function_specs[step.function_name]
result = self._execute_function(
spec, step.parameters
)
context.results[step.step_id] = result
step.completed = True
step.result = result
# Get agent's analysis
analysis_prompt = f"""
Step {step.step_id} completed:
Function: {step.function_name}
Result: {json.dumps(result)}
Remaining steps: {len(context.steps) - context.current_step - 1}
Analyze the result and decide next action.
"""
analysis_response = self.agent.run(
analysis_prompt
)
analysis_data = json.loads(
analysis_response.replace(
"System:", ""
).strip()
)
execution_log["steps"].append(
{
"step_id": step.step_id,
"function": step.function_name,
"parameters": step.parameters,
"result": result,
"analysis": analysis_data,
}
)
if (
analysis_data["next_action"]["type"]
== "complete"
):
if (
context.current_step
< len(context.steps) - 1
):
# Advance past the completed step instead of re-running it,
# then keep executing any remaining steps.
context.current_step += 1
continue
break
context.current_step += 1
except Exception as e:
print(f"Error in step {step.step_id}: {str(e)}")
execution_log["steps"].append(
{
"step_id": step.step_id,
"function": step.function_name,
"parameters": step.parameters,
"error": str(e),
}
)
raise
# Final analysis
final_prompt = f"""
Task completed. Results:
{json.dumps(context.results, indent=2)}
Provide final analysis and recommendations.
"""
final_analysis = self.agent.run(final_prompt)
execution_log["final_result"] = {
"success": True,
"results": context.results,
"analysis": json.loads(
final_analysis.replace("System:", "").strip()
),
}
except Exception as e:
execution_log["final_result"] = {
"success": False,
"error": str(e),
}
execution_log["end_time"] = datetime.utcnow().isoformat()
return execution_log
def calculate_investment_return(
principal: float, rate: float, years: int
) -> float:
"""Calculate investment return with compound interest.
:param principal: Initial investment amount in dollars
:param rate: Annual interest rate as decimal (e.g., 0.07 for 7%)
:param years: Number of years to invest
:return: Final investment value
"""
return principal * (1 + rate) ** years
agent = ToolAgent(
functions=[calculate_investment_return],
openai_api_key=os.getenv("OPENAI_API_KEY"),
)
result = agent.run(
"Calculate returns for $10000 invested at 7% for 10 years"
)

@ -95,12 +95,14 @@ flow = "BossAgent -> ExpenseAnalyzer -> SummaryGenerator"
# Using AgentRearrange class to manage the swarm
agent_system = AgentRearrange(
name="pe-swarm",
description="ss",
agents=agents,
flow=flow,
return_json=False,
output_type="final",
max_loops=1,
docs=["SECURITY.md"],
# docs=["SECURITY.md"],
)
# Input task for the swarm

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "6.2.9"
version = "6.4.7"
description = "Swarms - Pytorch"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]
@ -37,6 +37,14 @@ keywords = [
"Generative AI",
"Agent Marketplace",
"Agent Store",
"quant",
"finance",
"algorithmic trading",
"portfolio optimization",
"risk management",
"financial modeling",
"machine learning for finance",
"natural language processing for finance",
]
classifiers = [
"Development Status :: 4 - Beta",
@ -52,27 +60,18 @@ python = ">=3.10,<4.0"
torch = ">=2.1.1,<3.0"
transformers = ">= 4.39.0, <5.0.0"
asyncio = ">=3.4.3,<4.0"
langchain-community = "0.0.29"
langchain-experimental = "0.0.55"
backoff = "2.2.1"
toml = "*"
pypdf = "4.3.1"
loguru = "0.7.2"
loguru = "*"
pydantic = "2.8.2"
tenacity = "8.5.0"
Pillow = "10.4.0"
tenacity = "*"
psutil = "*"
sentry-sdk = {version = "*", extras = ["http"]} # Updated here
python-dotenv = "*"
PyYAML = "*"
docstring_parser = "0.16"
fastapi = "*"
openai = ">=1.30.1,<2.0"
termcolor = "*"
tiktoken = "*"
networkx = "*"
swarms-memory = "*"
black = "*"
aiofiles = "*"
swarm-models = "*"
clusterops = "*"
@ -96,9 +95,7 @@ mypy-protobuf = "^3.0.0"
[tool.poetry.group.test.dependencies]
pytest = "^8.1.1"
termcolor = "^2.4.0"
pandas = "^2.2.2"
fastapi = ">=0.110.1,<0.116.0"
[tool.ruff]
line-length = 70

@ -2,21 +2,16 @@
torch>=2.1.1,<3.0
transformers>=4.39.0,<5.0.0
asyncio>=3.4.3,<4.0
langchain-community==0.0.28
langchain-experimental==0.0.55
backoff==2.2.1
toml
pypdf==4.3.1
ratelimit==2.2.1
loguru==0.7.2
loguru
pydantic==2.8.2
tenacity==8.5.0
Pillow==10.4.0
tenacity
rich
psutil
sentry-sdk
python-dotenv
opencv-python-headless
PyYAML
docstring_parser==0.16
black>=23.1,<25.0
@ -26,12 +21,8 @@ types-pytz>=2023.3,<2025.0
types-chardet>=5.0.4.6
mypy-protobuf>=3.0.0
pytest>=8.1.1
termcolor>=2.4.0
pandas>=2.2.2
fastapi>=0.110.1
networkx
swarms-memory
pre-commit
aiofiles
swarm-models
clusterops

@ -0,0 +1,9 @@
from swarms import Agent
Agent(
agent_name="Stock-Analysis-Agent",
model_name="gpt-4o-mini",
max_loops="auto",
streaming_on=True,
interactive=True,
).run("What are 5 hft algorithms")

@ -1,22 +1,38 @@
import os
import concurrent.futures
from dotenv import load_dotenv
# from swarms.structs.workspace_manager import WorkspaceManager
# workspace_manager = WorkspaceManager()
# workspace_manager.run()
from loguru import logger
load_dotenv()
# Disable logging by default
if os.getenv("SWARMS_VERBOSE_GLOBAL", "False").lower() == "false":
logger.disable("")
# Import telemetry functions with error handling
from swarms.telemetry.bootup import bootup # noqa: E402, F403
from swarms.telemetry.sentry_active import (
from swarms.telemetry.sentry_active import ( # noqa: E402
activate_sentry,
) # noqa: E402
# Use ThreadPoolExecutor to run bootup and activate_sentry concurrently
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(bootup)
executor.submit(activate_sentry)
# Run telemetry functions concurrently with error handling
def run_telemetry():
try:
with concurrent.futures.ThreadPoolExecutor(
max_workers=2
) as executor:
future_bootup = executor.submit(bootup)
future_sentry = executor.submit(activate_sentry)
# Wait for completion and check for exceptions
future_bootup.result()
future_sentry.result()
except Exception as e:
logger.error(f"Error running telemetry functions: {e}")
run_telemetry()
from swarms.agents import * # noqa: E402, F403
from swarms.artifacts import * # noqa: E402, F403

@ -0,0 +1,253 @@
import re
from dotenv import load_dotenv
from tenacity import retry, stop_after_attempt, wait_exponential
from swarms import Agent
from swarms.agents.create_agents_from_yaml import (
create_agents_from_yaml,
)
from swarms.utils.formatter import formatter
from swarms.utils.litellm import LiteLLM
load_dotenv()
def prepare_yaml_for_parsing(raw_yaml: str) -> str:
"""
Prepares raw YAML content by fixing spacing and formatting issues.
Args:
raw_yaml (str): The raw YAML content extracted from Markdown.
Returns:
str: The cleaned YAML content ready for parsing.
"""
# Fix sequence items that are improperly placed on the same line as their key
fixed_yaml = re.sub(
r"(\b\w+\b):\s*-\s*", r"\1:\n - ", raw_yaml
) # Fix "key: - value" to "key:\n - value"
# Ensure proper spacing after colons
fixed_yaml = re.sub(
r"(\S):(\S)", r"\1: \2", fixed_yaml
) # Ensure space after colons
# Remove trailing spaces before newlines
fixed_yaml = re.sub(r"\s+\n", "\n", fixed_yaml)
# Replace non-breaking spaces (if any) with regular spaces
fixed_yaml = fixed_yaml.replace("\xa0", " ")
return fixed_yaml.strip()
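# For illustration, a raw line like "tags: - research" becomes
# "tags:\n - research", and "max_loops:2" becomes "max_loops: 2".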
def parse_yaml_from_swarm_markdown(markdown_text: str) -> dict:
"""
Extracts YAML content from a Markdown-style 'Auto-Swarm-Builder' block and normalizes it for parsing.
Args:
markdown_text (str): The Markdown text containing the YAML inside the 'Auto-Swarm-Builder' block.
Returns:
str: The normalized YAML string, ready to be parsed or passed to create_agents_from_yaml.
"""
# Match the 'Auto-Swarm-Builder' block with YAML inside triple backticks
pattern = r"```yaml\s*\n(.*?)```"
match = re.search(pattern, markdown_text, re.DOTALL)
if not match:
raise ValueError(
"No YAML content found in the 'Auto-Swarm-Builder' block."
)
raw_yaml = match.group(1).strip()
# Preprocess and normalize the YAML content
normalized_yaml = prepare_yaml_for_parsing(raw_yaml)
return normalized_yaml
AUTO_GEN_PROMPT = """
You are a specialized agent responsible for creating YAML configuration files for multi-agent swarms. Your role is to generate well-structured YAML that defines both individual agents and swarm architectures based on user requirements.
Output only the yaml nothing else. You will be penalized for making mistakes
GUIDELINES:
1. Each YAML file must contain an `agents` section with at least one agent configuration
2. Each agent configuration requires the following mandatory fields:
- agent_name (string)
- system_prompt (string)
3. Optional agent fields include:
- max_loops (integer)
- autosave (boolean)
- dashboard (boolean)
- verbose (boolean)
- dynamic_temperature_enabled (boolean)
- saved_state_path (string)
- user_name (string)
- retry_attempts (integer)
- context_length (integer)
- return_step_meta (boolean)
- output_type (string)
- task (string)
4. When a swarm is needed, include a `swarm_architecture` section with:
Mandatory fields:
- name (string)
- swarm_type (string: "ConcurrentWorkflow" or "SequentialWorkflow") [AgentRearrange, MixtureOfAgents, SpreadSheetSwarm, SequentialWorkflow, ConcurrentWorkflow]
Optional fields:
- description (string)
- max_loops (integer)
- task (string)
TEMPLATE STRUCTURE:
```yaml
agents:
- agent_name: "Agent-1-Name"
system_prompt: "Detailed system prompt here"
max_loops: 1
# [additional optional fields]
- agent_name: "Agent-2-Name"
system_prompt: "Detailed system prompt here"
# [additional optional fields]
swarm_architecture:
name: "Swarm-Name"
description: "Swarm purpose and goals"
swarm_type: "ConcurrentWorkflow"
max_loops: 5
task: "Main swarm task description"
```
VALIDATION RULES:
1. All agent names must be unique
2. System prompts must be clear and specific to the agent's role
3. Integer values must be positive
4. Boolean values must be true or false (lowercase)
5. File paths should use forward slashes
6. Tasks should be specific and aligned with the agent/swarm purpose
When generating a YAML configuration:
1. Ask for specific requirements about the agents and swarm needed
2. Determine if a swarm architecture is necessary based on the task complexity
3. Generate appropriate system prompts for each agent based on their roles
4. Include relevant optional fields based on the use case
5. Validate the configuration against all rules before returning
Example valid YAML configurations are provided below. Use these as references for structure and formatting:
```yaml
agents:
- agent_name: "Data-Analysis-Agent"
system_prompt: "You are a specialized data analysis agent focused on processing and interpreting financial data. Provide clear, actionable insights based on the data provided."
max_loops: 3
autosave: true
verbose: true
context_length: 100000
output_type: "json"
task: "Analyze quarterly financial reports and identify trends"
# Multi-Agent Swarm Example
agents:
- agent_name: "Research-Agent"
system_prompt: "You are a research agent specialized in gathering and summarizing scientific publications. Focus on peer-reviewed sources and provide comprehensive summaries."
max_loops: 2
context_length: 150000
output_type: "str"
- agent_name: "Analysis-Agent"
system_prompt: "You are an analysis agent that processes research summaries and identifies key patterns and insights. Provide detailed analytical reports."
max_loops: 3
context_length: 200000
output_type: "json"
swarm_architecture:
name: "Research-Analysis-Swarm"
description: "A swarm for comprehensive research analysis and insight generation"
swarm_type: "SequentialWorkflow"
max_loops: 5
task: "Research and analyze recent developments in quantum computing"
```
"""
def generate_swarm_config(
task: str,
file_name: str = "swarm_config_output.yaml",
model_name: str = "gpt-4o",
*args,
**kwargs,
):
"""
Generates a swarm configuration based on the provided task and model name.
This function attempts to generate a swarm configuration by running an agent with the specified task and model name.
It then parses the output into YAML format and creates agents based on the parsed YAML content.
Args:
task (str): The task to be performed by the swarm.
file_name (str, optional): The file name for the output YAML configuration. Defaults to "swarm_config_output.yaml".
model_name (str, optional): The name of the model to use for the agent. Defaults to "gpt-4o".
*args: Additional positional arguments to be passed to the agent's run method.
**kwargs: Additional keyword arguments to be passed to the agent's run method.
Returns:
Any: The output of the swarm configuration generation process. This can be a SwarmRouter instance or an error message.
"""
formatter.print_panel(
"Auto Generating Swarm...", "Auto Swarm Builder"
)
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(min=4, max=10),
)
def attempt_generate_swarm_config():
try:
model = LiteLLM(model_name=model_name)
# Initialize the agent
agent = Agent(
agent_name="Auto-Swarm-Builder",
system_prompt=AUTO_GEN_PROMPT,
llm=model,
max_loops=1,
dynamic_temperature_enabled=True,
saved_state_path="swarm_builder.json",
user_name="swarms_corp",
output_type="str",
)
# Generate output from the agent
raw_output = agent.run(task, *args, **kwargs)
yaml_content = parse_yaml_from_swarm_markdown(raw_output)
print(yaml_content)
# Create agents from the YAML file
output = create_agents_from_yaml(
yaml_string=yaml_content,
return_type="run_swarm",
)
formatter.print_panel(
"Swarm configuration generated successfully.",
"Success",
)
return output
except Exception as e:
formatter.print_panel(
f"Error generating swarm configuration: {str(e)}",
"Error",
)
raise
return attempt_generate_swarm_config()
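
# Illustrative usage sketch (not part of the module; the task string below is a
# placeholder): generate_swarm_config builds the prompt-driven YAML, parses it,
# and hands it to create_agents_from_yaml with return_type="run_swarm".
#
#   output = generate_swarm_config(
#       task="Summarize the latest quarterly earnings reports",
#       model_name="gpt-4o",
#   )
#   print(output)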

@ -1,22 +1,168 @@
import os
from typing import Any, Callable, Dict, List, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import yaml
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
)
from pydantic import (
BaseModel,
Field,
field_validator,
)
from swarms.utils.loguru_logger import initialize_logger
from swarms.structs.agent import Agent
from swarms.structs.swarm_router import SwarmRouter
from swarms.utils.litellm import LiteLLM
logger = initialize_logger(log_folder="create_agents_from_yaml")
class AgentConfig(BaseModel):
agent_name: str
system_prompt: str
model_name: Optional[str] = None
max_loops: int = Field(default=1, ge=1)
autosave: bool = True
dashboard: bool = False
verbose: bool = False
dynamic_temperature_enabled: bool = False
saved_state_path: Optional[str] = None
user_name: str = "default_user"
retry_attempts: int = Field(default=3, ge=1)
context_length: int = Field(default=100000, ge=1000)
return_step_meta: bool = False
output_type: str = "str"
auto_generate_prompt: bool = False
artifacts_on: bool = False
artifacts_file_extension: str = ".md"
artifacts_output_path: str = ""
@field_validator("system_prompt")
@classmethod
def validate_system_prompt(cls, v):
if not v or not isinstance(v, str) or len(v.strip()) == 0:
raise ValueError(
"System prompt must be a non-empty string"
)
return v
class SwarmConfig(BaseModel):
name: str
description: str
max_loops: int = Field(default=1, ge=1)
swarm_type: str
task: Optional[str] = None
flow: Optional[Dict] = None
autosave: bool = True
return_json: bool = False
rules: str = ""
@field_validator("swarm_type")
@classmethod
def validate_swarm_type(cls, v):
valid_types = {
"SequentialWorkflow",
"ConcurrentWorkflow",
"AgentRearrange",
"MixtureOfAgents",
"auto",
}
if v not in valid_types:
raise ValueError(
f"Swarm type must be one of: {valid_types}"
)
return v
class YAMLConfig(BaseModel):
agents: List[AgentConfig] = Field(..., min_length=1)
swarm_architecture: Optional[SwarmConfig] = None
model_config = {
"extra": "forbid" # Prevent additional fields not in the model
}
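
# Illustrative sketch (made-up values): because YAMLConfig forbids extra fields,
# an unknown key fails fast at validation time, and omitted fields fall back to
# the defaults declared on AgentConfig.
#
#   config = YAMLConfig(
#       agents=[
#           {
#               "agent_name": "Example-Agent",
#               "system_prompt": "You are a helpful analysis agent.",
#           }
#       ]
#   )
#   print(config.agents[0].max_loops)  # -> 1 (default)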
def load_yaml_safely(
yaml_file: str = None, yaml_string: str = None
) -> Dict:
"""Safely load and validate YAML configuration using Pydantic."""
try:
if yaml_string:
config_dict = yaml.safe_load(yaml_string)
elif yaml_file:
if not os.path.exists(yaml_file):
raise FileNotFoundError(
f"YAML file {yaml_file} not found."
)
with open(yaml_file, "r") as file:
config_dict = yaml.safe_load(file)
else:
raise ValueError(
"Either yaml_file or yaml_string must be provided"
)
# Validate using Pydantic
YAMLConfig(**config_dict)
return config_dict
except yaml.YAMLError as e:
raise ValueError(f"Error parsing YAML: {str(e)}")
except Exception as e:
raise ValueError(f"Error validating configuration: {str(e)}")
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
retry=retry_if_exception_type((ConnectionError, TimeoutError)),
before_sleep=lambda retry_state: logger.info(
f"Retrying after error: {retry_state.outcome.exception()}"
),
)
def create_agent_with_retry(
agent_config: Dict, model: LiteLLM
) -> Agent:
"""Create an agent with retry logic for handling transient failures."""
try:
validated_config = AgentConfig(**agent_config)
agent = Agent(
agent_name=validated_config.agent_name,
system_prompt=validated_config.system_prompt,
llm=model,
max_loops=validated_config.max_loops,
autosave=validated_config.autosave,
dashboard=validated_config.dashboard,
verbose=validated_config.verbose,
dynamic_temperature_enabled=validated_config.dynamic_temperature_enabled,
saved_state_path=validated_config.saved_state_path,
user_name=validated_config.user_name,
retry_attempts=validated_config.retry_attempts,
context_length=validated_config.context_length,
return_step_meta=validated_config.return_step_meta,
output_type=validated_config.output_type,
auto_generate_prompt=validated_config.auto_generate_prompt,
artifacts_on=validated_config.artifacts_on,
artifacts_file_extension=validated_config.artifacts_file_extension,
artifacts_output_path=validated_config.artifacts_output_path,
)
return agent
except Exception as e:
logger.error(
f"Error creating agent {agent_config.get('agent_name', 'unknown')}: {str(e)}"
)
raise
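
# Illustrative sketch (placeholder config and model): the tenacity decorator
# above only retries on ConnectionError/TimeoutError, so Pydantic validation
# errors surface immediately instead of being retried.
#
#   model = LiteLLM(model_name="gpt-4o-mini")
#   agent = create_agent_with_retry(
#       {"agent_name": "Example-Agent", "system_prompt": "You are helpful."},
#       model,
#   )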
def create_agents_from_yaml(
model: Callable = None,
yaml_file: str = "agents.yaml",
yaml_string: str = None,
return_type: str = "auto",
*args,
**kwargs,
) -> Union[
SwarmRouter,
Agent,
@ -25,171 +171,99 @@ def create_agents_from_yaml(
List[Dict[str, Any]],
]:
"""
    Create agents and/or SwarmRouter based on configurations defined in a YAML file or string.
This function dynamically creates agents and a SwarmRouter (if specified) based on the
configuration in the YAML file. It adapts its behavior based on the presence of a
swarm architecture and the number of agents defined.
Args:
model (Callable): The language model to be used by the agents.
yaml_file (str): Path to the YAML file containing agent and swarm configurations.
return_type (str): Determines the return value. Options are:
"auto" (default): Automatically determine the most appropriate return type.
"swarm": Return SwarmRouter if present, otherwise a single agent or list of agents.
"agents": Return a list of agents (or a single agent if only one is defined).
"both": Return both SwarmRouter (or single agent) and list of agents.
"tasks": Return task results if any tasks were executed.
"run_swarm": Run the swarm and return its output.
*args: Additional positional arguments for agent or SwarmRouter customization.
**kwargs: Additional keyword arguments for agent or SwarmRouter customization.
Returns:
Union[SwarmRouter, Agent, List[Agent], Tuple[Union[SwarmRouter, Agent], List[Agent]], List[Dict[str, Any]]]:
The return type depends on the 'return_type' argument and the configuration in the YAML file.
Raises:
FileNotFoundError: If the specified YAML file is not found.
ValueError: If the YAML configuration is invalid or if an invalid return_type is specified.
    """
    try:
        # Load and validate configuration
        config = load_yaml_safely(yaml_file, yaml_string)

        agents = []
        task_results = []

        # Create agents with retry logic
for agent_config in config["agents"]:
logger.info(
f"Creating agent: {agent_config['agent_name']}"
)
if "system_prompt" not in agent_config:
logger.error(
f"System prompt is missing for agent: {agent_config['agent_name']}"
)
raise ValueError(
f"System prompt is missing for agent: {agent_config['agent_name']}"
if "model_name" in agent_config:
model_instance = LiteLLM(
model_name=agent_config["model_name"]
)
else:
model_name = "gpt-4o"
model_instance = LiteLLM(model_name=model_name)
agent = Agent(
agent_name=agent_config["agent_name"],
system_prompt=agent_config["system_prompt"],
llm=model,
max_loops=agent_config.get("max_loops", 1),
autosave=agent_config.get("autosave", True),
dashboard=agent_config.get("dashboard", False),
verbose=agent_config.get("verbose", False),
dynamic_temperature_enabled=agent_config.get(
"dynamic_temperature_enabled", False
),
saved_state_path=agent_config.get("saved_state_path"),
user_name=agent_config.get(
"user_name", "default_user"
),
retry_attempts=agent_config.get("retry_attempts", 1),
context_length=agent_config.get(
"context_length", 100000
),
return_step_meta=agent_config.get(
"return_step_meta", False
),
output_type=agent_config.get("output_type", "str"),
auto_generate_prompt=agent_config.get(
"auto_generate_prompt", "False"
),
artifacts_on=agent_config.get(
"artifacts_on", "False"
),
artifacts_file_extension=agent_config.get(
"artifacts_file_extension", ".md"
),
artifacts_output_path=agent_config.get(
"artifacts_output_path", ""
),
*args,
**kwargs,
agent = create_agent_with_retry(
agent_config, model_instance
)
logger.info(
f"Agent {agent_config['agent_name']} created successfully."
)
agents.append(agent)
        # Create SwarmRouter if specified
        swarm_router = None
        if "swarm_architecture" in config:
            try:
                swarm_config = SwarmConfig(
                    **config["swarm_architecture"]
                )
                swarm_router = SwarmRouter(
                    name=swarm_config.name,
                    description=swarm_config.description,
                    max_loops=swarm_config.max_loops,
                    agents=agents,
                    swarm_type=swarm_config.swarm_type,
                    task=swarm_config.task,
                    flow=swarm_config.flow,
                    autosave=swarm_config.autosave,
                    return_json=swarm_config.return_json,
                    rules=swarm_config.rules,
                )
                logger.info(
                    f"SwarmRouter '{swarm_config.name}' created successfully."
                )
            except Exception as e:
                logger.error(f"Error creating SwarmRouter: {str(e)}")
                raise ValueError(
                    f"Failed to create SwarmRouter: {str(e)}"
                )
        # Handle return types with improved error checking
        valid_return_types = {
            "auto",
            "swarm",
            "agents",
            "both",
            "tasks",
            "run_swarm",
        }
        if return_type not in valid_return_types:
            raise ValueError(
                f"Invalid return_type. Must be one of: {valid_return_types}"
            )

        if return_type == "run_swarm":
            if not swarm_router:
                raise ValueError(
                    "Cannot run swarm: SwarmRouter not created."
                )
            try:
                return swarm_router.run(
                    config["swarm_architecture"]["task"]
                )
            except Exception as e:
                logger.error(f"Error running SwarmRouter: {str(e)}")
                raise
        # Return appropriate type based on configuration
        if return_type == "auto":
            return (
                swarm_router
                if swarm_router
                else (agents[0] if len(agents) == 1 else agents)
            )
elif return_type == "swarm":
return (
swarm_router
@ -205,24 +279,10 @@ def create_agents_from_yaml(
else agents[0] if len(agents) == 1 else agents
), agents
elif return_type == "tasks":
if not task_results:
logger.warning(
"No tasks were executed. Returning empty list."
)
return task_results
elif return_type == "run_swarm":
if swarm_router:
return run_swarm_router()
else:
except Exception as e:
logger.error(
"Cannot run swarm: SwarmRouter not created."
)
raise ValueError(
"Cannot run swarm: SwarmRouter not created."
f"Critical error in create_agents_from_yaml: {str(e)}"
)
else:
logger.error(f"Invalid return_type: {return_type}")
raise ValueError(f"Invalid return_type: {return_type}")
except Exception as e:
logger.error(f"An error occurred: {e}")
raise e
raise
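
# Illustrative usage sketch (the file name is a placeholder): with a swarm
# architecture defined in the YAML, return_type="run_swarm" builds the
# SwarmRouter and immediately executes its configured task.
#
#   result = create_agents_from_yaml(
#       yaml_file="agents.yaml",
#       return_type="run_swarm",
#   )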

@ -1,9 +1,5 @@
from swarms.artifacts.main_artifact import Artifact

__all__ = [
    "Artifact",
]

@ -1,77 +0,0 @@
from __future__ import annotations
import json
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any
@dataclass
class BaseArtifact(ABC):
"""
Base class for artifacts.
"""
id: str
name: str
value: Any
def __post_init__(self):
if self.id is None:
self.id = uuid.uuid4().hex
if self.name is None:
self.name = self.id
@classmethod
def value_to_bytes(cls, value: Any) -> bytes:
"""
Convert the value to bytes.
"""
if isinstance(value, bytes):
return value
else:
return str(value).encode()
@classmethod
def value_to_dict(cls, value: Any) -> dict:
"""
Convert the value to a dictionary.
"""
if isinstance(value, dict):
dict_value = value
else:
dict_value = json.loads(value)
return {k: v for k, v in dict_value.items()}
def to_text(self) -> str:
"""
Convert the value to text.
"""
return str(self.value)
def __str__(self) -> str:
"""
Return a string representation of the artifact.
"""
return self.to_text()
def __bool__(self) -> bool:
"""
Return the boolean value of the artifact.
"""
return bool(self.value)
def __len__(self) -> int:
"""
Return the length of the artifact.
"""
return len(self.value)
@abstractmethod
def __add__(self, other: BaseArtifact) -> BaseArtifact:
"""
Add two artifacts together.
"""
...

@ -1,58 +0,0 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Callable
from swarms.artifacts.base_artifact import BaseArtifact
@dataclass
class TextArtifact(BaseArtifact):
"""
Represents a text artifact.
Attributes:
value (str): The text value of the artifact.
encoding (str, optional): The encoding of the text (default is "utf-8").
encoding_error_handler (str, optional): The error handler for encoding errors (default is "strict").
_embedding (list[float]): The embedding of the text artifact (default is an empty list).
Properties:
embedding (Optional[list[float]]): The embedding of the text artifact.
Methods:
__add__(self, other: BaseArtifact) -> TextArtifact: Concatenates the text value of the artifact with another artifact.
__bool__(self) -> bool: Checks if the text value of the artifact is non-empty.
generate_embedding(self, driver: BaseEmbeddingModel) -> Optional[list[float]]: Generates the embedding of the text artifact using a given embedding model.
token_count(self, tokenizer: BaseTokenizer) -> int: Counts the number of tokens in the text artifact using a given tokenizer.
to_bytes(self) -> bytes: Converts the text value of the artifact to bytes using the specified encoding and error handler.
"""
value: str
encoding: str = "utf-8"
encoding_error_handler: str = "strict"
tokenizer: Callable = None
_embedding: list[float] = field(default_factory=list)
@property
def embedding(self) -> list[float] | None:
return None if len(self._embedding) == 0 else self._embedding
def __add__(self, other: BaseArtifact) -> TextArtifact:
return TextArtifact(self.value + other.value)
def __bool__(self) -> bool:
return bool(self.value.strip())
def generate_embedding(self, model) -> list[float] | None:
self._embedding.clear()
self._embedding.extend(model.embed_string(str(self.value)))
return self.embedding
def token_count(self) -> int:
return self.tokenizer.count_tokens(str(self.value))
def to_bytes(self) -> bytes:
return self.value.encode(
encoding=self.encoding, errors=self.encoding_error_handler
)

@ -1,163 +1,279 @@
import argparse
import os
import subprocess
import time
import webbrowser
from rich.console import Console
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.table import Table
from rich.text import Text
from swarms.cli.onboarding_process import OnboardingProcess
from swarms.agents.auto_generate_swarm_config import (
generate_swarm_config,
)
from swarms.agents.create_agents_from_yaml import (
create_agents_from_yaml,
)
from swarms.utils.formatter import formatter
# Initialize console with custom styling
console = Console()
ASCII_ART = """
_________
/ _____/_ _ _______ _______ _____ ______
\_____ \\ \/ \/ /\__ \\_ __ \/ \ / ___/
/ \\ / / __ \| | \/ Y Y \\___ \
/_______ / \/\_/ (____ /__| |__|_| /____ >
\/ \/ \/ \/
class SwarmCLIError(Exception):
"""Custom exception for Swarm CLI errors"""
pass
# Color scheme
COLORS = {
"primary": "red",
"secondary": "#FF6B6B",
"accent": "#4A90E2",
"success": "#2ECC71",
"warning": "#F1C40F",
"error": "#E74C3C",
"text": "#FFFFFF",
}
ASCII_ART = """
"""
# Function to display the ASCII art in red
def create_spinner(text: str) -> Progress:
"""Create a custom spinner with the given text."""
return Progress(
SpinnerColumn(style=COLORS["primary"]),
TextColumn("[{task.description}]", style=COLORS["text"]),
console=console,
)
def show_ascii_art():
text = Text(ASCII_ART, style="bold cyan")
console.print(text)
"""Display the ASCII art with a glowing effect."""
panel = Panel(
Text(ASCII_ART, style=f"bold {COLORS['primary']}"),
border_style=COLORS["secondary"],
title="[bold]Welcome to Swarms[/bold]",
subtitle="[dim]Power to the Swarms[/dim]",
)
console.print(panel)
def create_command_table() -> Table:
"""Create a beautifully formatted table of commands."""
table = Table(
show_header=True,
header_style=f"bold {COLORS['primary']}",
border_style=COLORS["secondary"],
title="Available Commands",
padding=(0, 2),
)
table.add_column("Command", style="bold white")
table.add_column("Description", style="dim white")
commands = [
("onboarding", "Start the interactive onboarding process"),
("help", "Display this help message"),
("get-api-key", "Retrieve your API key from the platform"),
("check-login", "Verify login status and initialize cache"),
("run-agents", "Execute agents from your YAML configuration"),
("auto-upgrade", "Update Swarms to the latest version"),
("book-call", "Schedule a strategy session with our team"),
("autoswarm", "Generate and execute an autonomous swarm"),
]
for cmd, desc in commands:
table.add_row(cmd, desc)
return table
# Help command
def show_help():
"""Display a beautifully formatted help message."""
console.print(
"\n[bold]Swarms CLI - Command Reference[/bold]\n",
style=COLORS["primary"],
)
console.print(create_command_table())
    console.print(
        "\n[dim]For detailed documentation, visit: https://docs.swarms.world[/dim]"
    )
def show_error(message: str, help_text: str = None):
"""Display error message in a formatted panel"""
error_panel = Panel(
f"[bold red]{message}[/bold red]",
title="Error",
border_style="red",
)
console.print(error_panel)
if help_text:
console.print(f"\n[yellow] {help_text}[/yellow]")
# [bold white]add-agent[/bold white] : Add an agent to the marketplace under your name. Must have a Dockerfile + your agent.yaml to publish. Learn more Here: https://docs.swarms.world/en/latest/swarms_cloud/vision/
def execute_with_spinner(action: callable, text: str) -> None:
"""Execute an action with a spinner animation."""
with create_spinner(text) as progress:
task = progress.add_task(text, total=None)
result = action()
progress.remove_task(task)
return result
# Fetch API key from platform
def get_api_key():
"""Retrieve API key with visual feedback."""
with create_spinner("Opening API key portal...") as progress:
task = progress.add_task("Opening browser...")
webbrowser.open("https://swarms.world/platform/api-keys")
time.sleep(1)
progress.remove_task(task)
    console.print(
        f"\n[{COLORS['success']}]✓ API key page opened in your browser[/{COLORS['success']}]"
    )
def check_login():
"""Verify login status with enhanced visual feedback."""
cache_file = "cache.txt"
if os.path.exists(cache_file):
with open(cache_file, "r") as f:
if f.read() == "logged_in":
                console.print(
                    f"[{COLORS['success']}]✓ Authentication verified[/{COLORS['success']}]"
                )
                return True

    with create_spinner("Authenticating...") as progress:
        task = progress.add_task("Initializing session...")
        time.sleep(1)
        with open(cache_file, "w") as f:
            f.write("logged_in")
        progress.remove_task(task)

    console.print(
        f"[{COLORS['success']}]✓ Login successful![/{COLORS['success']}]"
    )
    return True
def run_autoswarm(task: str, model: str):
    """Run autoswarm with enhanced error handling"""
    try:
        console.print(
            "[yellow]Initializing autoswarm configuration...[/yellow]"
        )

        # Set LiteLLM verbose mode for debugging
        import litellm

        litellm.set_verbose = True

        # Validate inputs
        if not task or task.strip() == "":
            raise SwarmCLIError("Task cannot be empty")

        if not model or model.strip() == "":
            raise SwarmCLIError("Model name cannot be empty")

        # Attempt to generate swarm configuration
        console.print(
            f"[yellow]Generating swarm for task: {task}[/yellow]"
        )
        result = generate_swarm_config(task=task, model=model)

        if result:
            console.print(
                "[green]✓ Swarm configuration generated successfully![/green]"
            )
        else:
            raise SwarmCLIError(
                "Failed to generate swarm configuration"
            )
except Exception as e:
if "No YAML content found" in str(e):
show_error(
"Failed to generate YAML configuration",
"This might be due to an API key issue or invalid model configuration.\n"
+ "1. Check if your OpenAI API key is set correctly\n"
+ "2. Verify the model name is valid\n"
+ "3. Try running with --model gpt-4",
)
else:
show_error(
f"Error during autoswarm execution: {str(e)}",
"For debugging, try:\n"
+ "1. Check your API keys are set correctly\n"
+ "2. Verify your network connection\n"
+ "3. Try a different model",
)
def check_and_upgrade_version():
    """Check for updates with visual progress."""

    def check_update():
        result = subprocess.run(
            ["pip", "list", "--outdated", "--format=freeze"],
            capture_output=True,
            text=True,
        )
        return result.stdout.splitlines()

    outdated = execute_with_spinner(
        check_update, "Checking for updates..."
    )

    for package in outdated:
        if package.startswith("swarms=="):
            console.print(
                f"[{COLORS['warning']}]↑ Update available![/{COLORS['warning']}]"
            )
            with create_spinner("Upgrading Swarms...") as progress:
                task = progress.add_task(
                    "Installing latest version..."
                )
                subprocess.run(
                    ["pip", "install", "--upgrade", "swarms"],
                    check=True,
                )
                progress.remove_task(task)
            console.print(
                f"[{COLORS['success']}]✓ Swarms upgraded successfully![/{COLORS['success']}]"
            )
            return

    console.print(
        f"[{COLORS['success']}]✓ Swarms is up to date![/{COLORS['success']}]"
    )
def main():
    try:
        show_ascii_art()

        parser = argparse.ArgumentParser(
            description="Swarms Cloud CLI"
        )
        parser.add_argument(
            "command",
            choices=[
@ -166,45 +282,31 @@ def main():
"get-api-key",
"check-login",
"run-agents",
"generate-prompt", # Added new command for generating prompts
"auto-upgrade", # Added new command for auto-upgrade,
"auto-upgrade",
"book-call",
"autoswarm",
],
help="Command to run",
help="Command to execute",
)
parser.add_argument(
"--yaml-file",
type=str,
default="agents.yaml",
help="Specify the YAML file for running agents",
)
parser.add_argument(
"--prompt",
type=str,
help="Specify the task for generating a prompt",
)
parser.add_argument(
"--num-loops",
type=int,
default=1,
help="Specify the number of loops for generating a prompt",
help="YAML configuration file path",
)
parser.add_argument(
"--autosave",
action="store_true",
help="Enable autosave for the prompt generator",
"--task", type=str, help="Task for autoswarm"
)
parser.add_argument(
"--save-to-yaml",
action="store_true",
help="Save the generated prompt to a YAML file",
"--model",
type=str,
default="gpt-4",
help="Model for autoswarm",
)
args = parser.parse_args()
show_ascii_art()
# Determine which command to run
try:
if args.command == "onboarding":
OnboardingProcess().run()
elif args.command == "help":
@ -217,28 +319,30 @@ def main():
create_agents_from_yaml(
yaml_file=args.yaml_file, return_type="tasks"
)
# elif args.command == "generate-prompt":
# if (
# args.prompt
# ): # Corrected from args.prompt_task to args.prompt
# generate_prompt(
# num_loops=args.num_loops,
# autosave=args.autosave,
# save_to_yaml=args.save_to_yaml,
# prompt=args.prompt, # Corrected from args.prompt_task to args.prompt
# )
# else:
# console.print(
# "[bold red]Please specify a task for generating a prompt using '--prompt'.[/bold red]"
# )
elif args.command == "auto-upgrade":
check_and_upgrade_version()
elif args.command == "book-call":
webbrowser.open(
"https://cal.com/swarms/swarms-strategy-session"
)
elif args.command == "autoswarm":
if not args.task:
show_error(
"Missing required argument: --task",
"Example usage: python cli.py autoswarm --task 'analyze this data' --model gpt-4",
)
exit(1)
run_autoswarm(args.task, args.model)
except Exception as e:
console.print(
"[bold red]Unknown command! Type 'help' for usage.[/bold red]"
f"[{COLORS['error']}]Error: {str(e)}[/{COLORS['error']}]"
)
return
except Exception as error:
formatter.print_panel(
f"Error detected: {error} check your args"
)
raise error
if __name__ == "__main__":

@ -87,19 +87,6 @@ class OnboardingProcess:
try:
combined_data = {**self.user_data, **self.system_data}
log_agent_data(combined_data)
# threading.Thread(target=log_agent_data(combined_data)).start()
# with open(self.auto_save_path, "w") as f:
# json.dump(combined_data, f, indent=4)
# # logger.info(
# # "User and system data successfully saved to {}",
# # self.auto_save_path,
# # )
# with open(self.cache_save_path, "w") as f:
# json.dump(combined_data, f, indent=4)
# logger.info(
# "User and system data successfully cached in {}",
# self.cache_save_path,
# )
return # Exit the function if saving was successful
except Exception as e:
logger.error(

@ -24,8 +24,6 @@ import toml
import yaml
from pydantic import BaseModel
from swarm_models.tiktoken_wrapper import TikTokenizer
from termcolor import colored
from swarms.agents.ape_agent import auto_generate_prompt
from swarms.prompts.agent_system_prompts import AGENT_SYSTEM_PROMPT_3
from swarms.prompts.multi_modal_autonomous_instruction_prompt import (
@ -340,6 +338,8 @@ class Agent:
scheduled_run_date: Optional[datetime] = None,
do_not_use_cluster_ops: bool = True,
all_gpus: bool = False,
model_name: str = None,
llm_args: dict = None,
*args,
**kwargs,
):
@ -455,6 +455,8 @@ class Agent:
self.scheduled_run_date = scheduled_run_date
self.do_not_use_cluster_ops = do_not_use_cluster_ops
self.all_gpus = all_gpus
self.model_name = model_name
self.llm_args = llm_args
# Initialize the short term memory
self.short_memory = Conversation(
@ -591,6 +593,21 @@ class Agent:
# Telemetry Processor to log agent data
threading.Thread(target=self.log_agent_data).start()
        threading.Thread(target=self.llm_handling).start()
def llm_handling(self):
if self.llm is None:
from swarms.utils.litellm import LiteLLM
if self.llm_args is not None:
self.llm = LiteLLM(
model_name=self.model_name, **self.llm_args
)
else:
self.llm = LiteLLM(model_name=self.model_name)
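
    # Illustrative note (placeholder values): with this handler, an Agent can be
    # constructed from just a LiteLLM-compatible model name, optionally passing
    # extra keyword arguments through llm_args, e.g.:
    #
    #   agent = Agent(
    #       agent_name="Example-Agent",
    #       model_name="gpt-4o-mini",
    #       llm_args={"temperature": 0.1},
    #   )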
def check_if_no_prompt_then_autogenerate(self, task: str = None):
"""
Checks if auto_generate_prompt is enabled and generates a prompt by combining agent name, description and system prompt if available.
@ -671,11 +688,8 @@ class Agent:
return self.stopping_condition(response)
return False
except Exception as error:
            logger.error(
                f"Error checking stopping condition: {error}"
            )
def dynamic_temperature(self):
@ -688,21 +702,20 @@ class Agent:
try:
if hasattr(self.llm, "temperature"):
# Randomly change the temperature attribute of self.llm object
logger.info("Enabling Random Dyamic Temperature")
self.llm.temperature = random.uniform(0.0, 1.0)
else:
# Use a default temperature
self.llm.temperature = 0.5
except Exception as error:
            logger.error(
                f"Error dynamically changing temperature: {error}"
            )
def print_dashboard(self):
"""Print dashboard"""
print(colored("Initializing Agent Dashboard...", "yellow"))
formatter.print_panel(
f"Initializing Agent: {self.agent_name}"
)
data = self.to_dict()
@ -710,8 +723,7 @@ class Agent:
# data = json.dumps(data, indent=4)
# json_data = json.dumps(data, indent=4)
        formatter.print_panel(
f"""
Agent Dashboard
--------------------------------------------
@ -724,8 +736,6 @@ class Agent:
----------------------------------------
""",
"green",
)
)
def loop_count_print(
@ -737,7 +747,7 @@ class Agent:
loop_count (_type_): _description_
max_loops (_type_): _description_
"""
print(colored(f"\nLoop {loop_count} of {max_loops}", "cyan"))
logger.info(f"\nLoop {loop_count} of {max_loops}")
print("\n")
# Check parameters
@ -761,8 +771,8 @@ class Agent:
self,
task: Optional[str] = None,
img: Optional[str] = None,
is_last: bool = False,
print_task: bool = False,
is_last: Optional[bool] = False,
print_task: Optional[bool] = False,
*args,
**kwargs,
) -> Any:
@ -960,7 +970,7 @@ class Agent:
if self.interactive:
logger.info("Interactive mode enabled.")
user_input = colored(input("You: "), "red")
user_input = input("You: ")
# User-defined exit command
if (
@ -1024,6 +1034,11 @@ class Agent:
self.artifacts_file_extension,
)
try:
self.log_agent_data()
except Exception:
pass
# More flexible output types
if (
self.output_type == "string"
@ -1059,6 +1074,14 @@ class Agent:
)
except Exception as error:
self.log_agent_data()
logger.info(
f"Error running agent: {error} optimize your input parameters"
)
raise error
except KeyboardInterrupt as error:
self.log_agent_data()
logger.info(
f"Error running agent: {error} optimize your input parameters"
)
@ -1261,7 +1284,7 @@ class Agent:
logger.info(f"Running bulk tasks: {inputs}")
return [self.run(**input_data) for input_data in inputs]
except Exception as error:
print(colored(f"Error running bulk run: {error}", "red"))
logger.info(f"Error running bulk run: {error}", "red")
def save(self) -> None:
"""Save the agent history to a file.
@ -1438,9 +1461,7 @@ class Agent:
with open(file_path, "w") as f:
yaml.dump(self.to_dict(), f)
except Exception as error:
            logger.error(f"Error saving agent to YAML: {error}")
raise error
def get_llm_parameters(self):
@ -1505,7 +1526,7 @@ class Agent:
role=self.user_name, content=data
)
except Exception as error:
print(colored(f"Error ingesting docs: {error}", "red"))
logger.info(f"Error ingesting docs: {error}", "red")
def ingest_pdf(self, pdf: str):
"""Ingest the pdf into the memory
@ -1520,7 +1541,7 @@ class Agent:
role=self.user_name, content=text
)
except Exception as error:
print(colored(f"Error ingesting pdf: {error}", "red"))
logger.info(f"Error ingesting pdf: {error}", "red")
def receieve_message(self, name: str, message: str):
"""Receieve a message"""
@ -1604,12 +1625,10 @@ class Agent:
role=self.user_name, content=text
)
except Exception as error:
            logger.error(
                f"Error getting docs from doc folders: {error}"
            )
raise error
def check_end_session_agentops(self):
if self.agent_ops_on is True:
@ -1629,7 +1648,8 @@ class Agent:
try:
# Query the long term memory
if self.long_term_memory is not None:
logger.info(f"Querying long term memory for: {task}")
formatter.print_panel(f"Querying RAG for: {task}")
memory_retrieval = self.long_term_memory.query(
task, *args, **kwargs
)
@ -1638,15 +1658,15 @@ class Agent:
f"Documents Available: {str(memory_retrieval)}"
)
# Count the tokens
memory_token_count = self.tokenizer.count_tokens(
memory_retrieval
)
if memory_token_count > self.memory_chunk_size:
# Truncate the memory by the memory chunk size
memory_retrieval = self.truncate_string_by_tokens(
memory_retrieval, self.memory_chunk_size
)
# # Count the tokens
# memory_token_count = self.tokenizer.count_tokens(
# memory_retrieval
# )
# if memory_token_count > self.memory_chunk_size:
# # Truncate the memory by the memory chunk size
# memory_retrieval = self.truncate_string_by_tokens(
# memory_retrieval, self.memory_chunk_size
# )
self.short_memory.add(
role="Database",

@ -1,109 +1,87 @@
import os
from typing import List, Any
from swarms.structs.agent import Agent
from loguru import logger
import uuid
WORKSPACE_DIR = os.getenv("WORKSPACE_DIR")
uuid_for_log = str(uuid.uuid4())
logger.add(
os.path.join(
WORKSPACE_DIR,
"agents_available",
f"agents-available-{uuid_for_log}.log",
),
level="INFO",
colorize=True,
backtrace=True,
diagnose=True,
)
def get_agent_name(agent: Any) -> str:
"""Helper function to safely get agent name
Args:
agent (Any): The agent object to get name from
Returns:
str: The agent's name if found, 'Unknown' otherwise
"""
if isinstance(agent, Agent) and hasattr(agent, "agent_name"):
return agent.agent_name
return "Unknown"
def get_agent_description(agent: Any) -> str:
"""Helper function to get agent description or system prompt preview
Args:
agent (Any): The agent object
Returns:
str: Description or first 100 chars of system prompt
"""
if not isinstance(agent, Agent):
return "N/A"
if hasattr(agent, "description") and agent.description:
return agent.description
if hasattr(agent, "system_prompt") and agent.system_prompt:
return f"{agent.system_prompt[:150]}..."
return "N/A"
from typing import List
def showcase_available_agents(
    name: str = None,
    description: str = None,
    agents: List[Agent] = [],
    format: str = "XML",
) -> str:
"""
    Format the available agents in either XML or Table format.

    Args:
agents (List[Agent]): A list of agents to represent
name (str, optional): Name of the swarm
description (str, optional): Description of the swarm
format (str, optional): Output format ("XML" or "Table"). Defaults to "XML"
Returns:
        str: Formatted string containing agent information
"""
logger.info(f"Showcasing {len(agents)} available agents")
formatted_agents = []
header = f"\n####### Agents available in the swarm: {name} ############\n"
header += f"{description}\n"
row_format = "{:<5} | {:<20} | {:<50}"
header_row = row_format.format("ID", "Agent Name", "Description")
separator = "-" * 80
def truncate(text: str, max_length: int = 130) -> str:
return (
f"{text[:max_length]}..."
if len(text) > max_length
else text
)
output = []
if format.upper() == "TABLE":
output.append("\n| ID | Agent Name | Description |")
output.append("|-----|------------|-------------|")
for idx, agent in enumerate(agents):
            if isinstance(agent, Agent):
                agent_name = getattr(agent, "agent_name", str(agent))
                description = getattr(
                    agent,
                    "description",
                    getattr(
                        agent, "system_prompt", "Unknown description"
                    ),
                )
                desc = truncate(description, 50)
                output.append(
                    f"| {idx + 1} | {agent_name} | {desc} |"
                )
            else:
                output.append(
                    f"| {idx + 1} | {agent} | Unknown description |"
                )
return "\n".join(output)
# Default XML format
output.append("<agents>")
if name:
output.append(f" <name>{name}</name>")
if description:
output.append(
f" <description>{truncate(description)}</description>"
)
for idx, agent in enumerate(agents):
output.append(f" <agent id='{idx + 1}'>")
if isinstance(agent, Agent):
agent_name = getattr(agent, "agent_name", str(agent))
description = getattr(
agent,
"description",
getattr(
agent, "system_prompt", "Unknown description"
),
)
output.append(f" <name>{agent_name}</name>")
output.append(
f" <description>{truncate(description)}</description>"
)
else:
output.append(f" <name>{agent}</name>")
output.append(
" <description>Unknown description</description>"
)
output.append(" </agent>")
output.append("</agents>")
showcase = "\n".join(formatted_agents)
return showcase
return "\n".join(output)

@ -1,8 +1,7 @@
import json
from typing import Any, Dict, List, Optional
from termcolor import colored
from swarms.utils.formatter import formatter
from swarms.structs.agent import Agent
from swarms.structs.base_structure import BaseStructure
from swarms.structs.task import Task
@ -132,9 +131,10 @@ class BaseWorkflow(BaseStructure):
for task in self.tasks:
task.result = None
except Exception as error:
            formatter.print_panel(
                f"Error resetting workflow: {error}"
            )
raise error
def get_task_results(self) -> Dict[str, Any]:
"""
@ -148,10 +148,8 @@ class BaseWorkflow(BaseStructure):
task.description: task.result for task in self.tasks
}
except Exception as error:
            formatter.print_panel(
                f"Error getting task results: {error}"
            )
def remove_task(self, task: str) -> None:
@ -163,12 +161,10 @@ class BaseWorkflow(BaseStructure):
if task.description != task
]
except Exception as error:
            formatter.print_panel(
                f"Error removing task from workflow: {error}",
            )
raise error
def update_task(self, task: str, **updates) -> None:
"""
@ -203,11 +199,9 @@ class BaseWorkflow(BaseStructure):
f"Task {task} not found in workflow."
)
except Exception as error:
            formatter.print_panel(
                f"Error updating task in workflow: {error}"
            )
def delete_task(self, task: str) -> None:
"""
@ -240,12 +234,10 @@ class BaseWorkflow(BaseStructure):
f"Task {task} not found in workflow."
)
except Exception as error:
            formatter.print_panel(
                f"Error deleting task from workflow: {error}",
            )
raise error
def save_workflow_state(
self,
@ -287,23 +279,18 @@ class BaseWorkflow(BaseStructure):
}
json.dump(state, f, indent=4)
except Exception as error:
            formatter.print_panel(
                f"Error saving workflow state: {error}",
            )
raise error
def add_objective_to_workflow(self, task: str, **kwargs) -> None:
"""Adds an objective to the workflow."""
try:
            formatter.print_panel(
                """
                Adding Objective to Workflow...""",
            )
task = Task(
@ -314,12 +301,10 @@ class BaseWorkflow(BaseStructure):
)
self.tasks.append(task)
except Exception as error:
            formatter.print_panel(
                f"Error adding objective to workflow: {error}",
            )
raise error
def load_workflow_state(
self, filepath: str = None, **kwargs
@ -359,11 +344,8 @@ class BaseWorkflow(BaseStructure):
)
self.tasks.append(task)
except Exception as error:
            formatter.print_panel(
                f"Error loading workflow state: {error}",
            )
def workflow_dashboard(self, **kwargs) -> None:
@ -383,8 +365,7 @@ class BaseWorkflow(BaseStructure):
>>> workflow.workflow_dashboard()
"""
        formatter.print_panel(
f"""
Sequential Workflow Dashboard
--------------------------------
@ -398,10 +379,7 @@ class BaseWorkflow(BaseStructure):
--------------------------------
Metadata:
kwargs: {kwargs}
""",
"cyan",
attrs=["bold", "underline"],
)
"""
)
def workflow_bootup(self, **kwargs) -> None:
@ -409,11 +387,6 @@ class BaseWorkflow(BaseStructure):
Workflow bootup.
"""
formatter.print_panel(
"""Sequential Workflow Initializing...""",
)

@ -3,10 +3,9 @@ import json
from typing import Any, Optional
import yaml
from termcolor import colored
from swarms.structs.base_structure import BaseStructure
from typing import TYPE_CHECKING
from swarms.utils.formatter import formatter
if TYPE_CHECKING:
from swarms.structs.agent import (
@ -191,18 +190,9 @@ class Conversation(BaseStructure):
Args:
detailed (bool, optional): detailed. Defaults to False.
"""
        for message in self.conversation_history:
            formatter.print_panel(
                f"{message['role']}: {message['content']}\n\n"
            )
def export_conversation(self, filename: str, *args, **kwargs):
@ -307,47 +297,37 @@ class Conversation(BaseStructure):
for message in messages:
if message["role"] == "system":
                formatter.print_panel(
                    f"system: {message['content']}\n",
                )
            elif message["role"] == "user":
                formatter.print_panel(
                    f"user: {message['content']}\n",
                )
            elif message["role"] == "assistant" and message.get(
                "function_call"
            ):
                formatter.print_panel(
                    f"assistant: {message['function_call']}\n",
                )
            elif message["role"] == "assistant" and not message.get(
                "function_call"
            ):
                formatter.print_panel(
                    f"assistant: {message['content']}\n",
                )
            elif message["role"] == "tool":
                formatter.print_panel(
                    (
                        f"function ({message['name']}):"
                        f" {message['content']}\n"
                    ),
                )
def truncate_memory_with_tokenizer(self):
"""

@ -0,0 +1,665 @@
"""
GraphSwarm: A production-grade framework for orchestrating swarms of agents
Author: Claude
License: MIT
Version: 2.0.0
"""
import asyncio
import json
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, Union
import chromadb
import networkx as nx
from loguru import logger
from pydantic import BaseModel, Field
from swarms import Agent
# Configure logging
logger.add(
"graphswarm.log",
rotation="500 MB",
retention="10 days",
level="INFO",
format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
)
class AgentOutput(BaseModel):
"""Structured output from an agent."""
agent_name: str
timestamp: float = Field(default_factory=time.time)
output: Any
execution_time: float
error: Optional[str] = None
metadata: Dict = Field(default_factory=dict)
class SwarmOutput(BaseModel):
"""Structured output from the entire swarm."""
timestamp: float = Field(default_factory=time.time)
outputs: Dict[str, AgentOutput]
execution_time: float
success: bool
error: Optional[str] = None
metadata: Dict = Field(default_factory=dict)
class SwarmMemory:
"""Vector-based memory system for GraphSwarm using ChromaDB."""
def __init__(self, collection_name: str = "swarm_memories"):
"""Initialize SwarmMemory with ChromaDB."""
self.client = chromadb.Client()
# Get or create collection
self.collection = self.client.get_or_create_collection(
name=collection_name,
metadata={"description": "GraphSwarm execution memories"},
)
def store_execution(self, task: str, result: SwarmOutput):
"""Store execution results in vector memory."""
try:
# Create metadata
metadata = {
"timestamp": datetime.now().isoformat(),
"success": result.success,
"execution_time": result.execution_time,
"agent_sequence": json.dumps(
[name for name in result.outputs.keys()]
),
"error": result.error if result.error else "",
}
# Create document from outputs
document = {
"task": task,
"outputs": json.dumps(
{
name: {
"output": str(output.output),
"execution_time": output.execution_time,
"error": output.error,
}
for name, output in result.outputs.items()
}
),
}
# Store in ChromaDB
self.collection.add(
documents=[json.dumps(document)],
metadatas=[metadata],
ids=[f"exec_{datetime.now().timestamp()}"],
)
print("added to database")
logger.info(f"Stored execution in memory: {task}")
except Exception as e:
logger.error(
f"Failed to store execution in memory: {str(e)}"
)
def get_similar_executions(self, task: str, limit: int = 5):
"""Retrieve similar past executions."""
try:
# Query ChromaDB for similar executions
results = self.collection.query(
query_texts=[task],
n_results=limit,
include=["documents", "metadatas"],
)
print(results)
if not results["documents"]:
return []
# Process results
executions = []
for doc, metadata in zip(
results["documents"][0], results["metadatas"][0]
):
doc_dict = json.loads(doc)
executions.append(
{
"task": doc_dict["task"],
"outputs": json.loads(doc_dict["outputs"]),
"success": metadata["success"],
"execution_time": metadata["execution_time"],
"agent_sequence": json.loads(
metadata["agent_sequence"]
),
"timestamp": metadata["timestamp"],
}
)
return executions
except Exception as e:
logger.error(
f"Failed to retrieve similar executions: {str(e)}"
)
return []
def get_optimal_sequence(self, task: str) -> Optional[List[str]]:
"""Get the most successful agent sequence for similar tasks."""
similar_executions = self.get_similar_executions(task)
print(f"similar_executions {similar_executions}")
if not similar_executions:
return None
# Sort by success and execution time
successful_execs = [
ex for ex in similar_executions if ex["success"]
]
if not successful_execs:
return None
# Return sequence from most successful execution
return successful_execs[0]["agent_sequence"]
def clear_memory(self):
"""Clear all memories."""
self.client.delete_collection(self.collection.name)
self.collection = self.client.get_or_create_collection(
name=self.collection.name
)
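
# Illustrative note: SwarmMemory is normally managed by GraphSwarm itself, but
# it can also be queried directly; the task string below is a placeholder.
#
#   memory = SwarmMemory(collection_name="swarm_memories")
#   past = memory.get_similar_executions("Analyze tech stocks", limit=3)
#   best_sequence = memory.get_optimal_sequence("Analyze tech stocks")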
class GraphSwarm:
"""
Enhanced framework for creating and managing swarms of collaborative agents.
"""
def __init__(
self,
agents: Union[
List[Agent], List[Tuple[Agent, List[str]]], None
] = None,
max_workers: Optional[int] = None,
swarm_name: str = "Collaborative Agent Swarm",
memory_collection: str = "swarm_memory",
):
"""Initialize GraphSwarm."""
self.graph = nx.DiGraph()
self.agents: Dict[str, Agent] = {}
self.dependencies: Dict[str, List[str]] = {}
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.swarm_name = swarm_name
self.memory_collection = memory_collection
self.memory = SwarmMemory(collection_name=memory_collection)
if agents:
self.initialize_agents(agents)
logger.info(f"Initialized GraphSwarm: {swarm_name}")
def initialize_agents(
self,
agents: Union[List[Agent], List[Tuple[Agent, List[str]]]],
):
"""Initialize agents and their dependencies."""
try:
# Handle list of Agents or (Agent, dependencies) tuples
for item in agents:
if isinstance(item, tuple):
agent, dependencies = item
else:
agent, dependencies = item, []
if not isinstance(agent, Agent):
raise ValueError(
f"Expected Agent object, got {type(agent)}"
)
self.agents[agent.agent_name] = agent
self.dependencies[agent.agent_name] = dependencies
self.graph.add_node(agent.agent_name, agent=agent)
# Add dependencies
for dep in dependencies:
if dep not in self.agents:
raise ValueError(
f"Dependency {dep} not found for agent {agent.agent_name}"
)
self.graph.add_edge(dep, agent.agent_name)
self._validate_graph()
except Exception as e:
logger.error(f"Failed to initialize agents: {str(e)}")
raise
def _validate_graph(self):
"""Validate the agent dependency graph."""
if not self.graph.nodes():
raise ValueError("No agents added to swarm")
if not nx.is_directed_acyclic_graph(self.graph):
cycles = list(nx.simple_cycles(self.graph))
raise ValueError(
f"Agent dependency graph contains cycles: {cycles}"
)
def _get_agent_role_description(self, agent_name: str) -> str:
"""Generate a description of the agent's role in the swarm."""
predecessors = list(self.graph.predecessors(agent_name))
successors = list(self.graph.successors(agent_name))
position = (
"initial"
if not predecessors
else ("final" if not successors else "intermediate")
)
role = f"""You are {agent_name}, a specialized agent in the {self.swarm_name}.
Position: {position} agent in the workflow
Your relationships:"""
if predecessors:
role += (
f"\nYou receive input from: {', '.join(predecessors)}"
)
if successors:
role += f"\nYour output will be used by: {', '.join(successors)}"
return role
def _generate_workflow_context(self) -> str:
"""Generate a description of the entire workflow."""
execution_order = list(nx.topological_sort(self.graph))
workflow = f"""Workflow Overview of {self.swarm_name}:
Processing Order:
{' -> '.join(execution_order)}
Agent Roles:
"""
for agent_name in execution_order:
predecessors = list(self.graph.predecessors(agent_name))
successors = list(self.graph.successors(agent_name))
workflow += f"\n\n{agent_name}:"
if predecessors:
workflow += (
f"\n- Receives from: {', '.join(predecessors)}"
)
if successors:
workflow += f"\n- Sends to: {', '.join(successors)}"
if not predecessors and not successors:
workflow += "\n- Independent agent"
return workflow
def _build_agent_prompt(
self, agent_name: str, task: str, context: Dict = None
) -> str:
"""Build a comprehensive prompt for the agent including role and context."""
prompt_parts = [
self._get_agent_role_description(agent_name),
"\nWorkflow Context:",
self._generate_workflow_context(),
"\nYour Task:",
task,
]
if context:
prompt_parts.extend(
["\nContext from Previous Agents:", str(context)]
)
prompt_parts.extend(
[
"\nInstructions:",
"1. Process the task according to your role",
"2. Consider the input from previous agents when available",
"3. Provide clear, structured output",
"4. Remember that your output will be used by subsequent agents",
"\nResponse Guidelines:",
"- Provide clear, well-organized output",
"- Include relevant details and insights",
"- Highlight key findings",
"- Flag any uncertainties or issues",
]
)
return "\n".join(prompt_parts)
async def _execute_agent(
self, agent_name: str, task: str, context: Dict = None
) -> AgentOutput:
"""Execute a single agent."""
start_time = time.time()
agent = self.agents[agent_name]
try:
# Build comprehensive prompt
full_prompt = self._build_agent_prompt(
agent_name, task, context
)
logger.debug(f"Prompt for {agent_name}:\n{full_prompt}")
# Execute agent
output = await asyncio.to_thread(agent.run, full_prompt)
return AgentOutput(
agent_name=agent_name,
output=output,
execution_time=time.time() - start_time,
metadata={
"task": task,
"context": context,
"position_in_workflow": list(
nx.topological_sort(self.graph)
).index(agent_name),
},
)
except Exception as e:
logger.error(
f"Error executing agent {agent_name}: {str(e)}"
)
return AgentOutput(
agent_name=agent_name,
output=None,
execution_time=time.time() - start_time,
error=str(e),
metadata={"task": task},
)
async def execute(self, task: str) -> SwarmOutput:
"""
Execute the entire swarm of agents with memory integration.
Args:
task: Initial task to execute
Returns:
SwarmOutput: Structured output from all agents
"""
start_time = time.time()
outputs = {}
success = True
error = None
try:
# Get similar past executions
similar_executions = self.memory.get_similar_executions(
task, limit=3
)
optimal_sequence = self.memory.get_optimal_sequence(task)
# Get base execution order
base_execution_order = list(
nx.topological_sort(self.graph)
)
# Determine final execution order
if optimal_sequence and all(
agent in base_execution_order
for agent in optimal_sequence
):
logger.info(
f"Using optimal sequence from memory: {optimal_sequence}"
)
execution_order = optimal_sequence
else:
execution_order = base_execution_order
# Get historical context if available
historical_context = {}
if similar_executions:
best_execution = similar_executions[0]
if best_execution["success"]:
historical_context = {
"similar_task": best_execution["task"],
"previous_outputs": best_execution["outputs"],
"execution_time": best_execution[
"execution_time"
],
"success_patterns": self._extract_success_patterns(
similar_executions
),
}
# Execute agents in order
for agent_name in execution_order:
try:
# Get context from dependencies and history
agent_context = {
"dependencies": {
dep: outputs[dep].output
for dep in self.graph.predecessors(
agent_name
)
if dep in outputs
},
"historical": historical_context,
"position": execution_order.index(agent_name),
"total_agents": len(execution_order),
}
# Execute agent with enhanced context
output = await self._execute_agent(
agent_name, task, agent_context
)
outputs[agent_name] = output
# Update historical context with current execution
if output.output:
historical_context.update(
{
f"current_{agent_name}_output": output.output
}
)
# Check for errors
if output.error:
success = False
error = f"Agent {agent_name} failed: {output.error}"
# Try to recover using memory
if similar_executions:
recovery_output = self._attempt_recovery(
agent_name, task, similar_executions
)
if recovery_output:
outputs[agent_name] = recovery_output
success = True
error = None
continue
break
except Exception as agent_error:
logger.error(
f"Error executing agent {agent_name}: {str(agent_error)}"
)
success = False
error = f"Agent {agent_name} failed: {str(agent_error)}"
break
# Create result
result = SwarmOutput(
outputs=outputs,
execution_time=time.time() - start_time,
success=success,
error=error,
metadata={
"task": task,
"used_optimal_sequence": optimal_sequence
is not None,
"similar_executions_found": len(
similar_executions
),
"execution_order": execution_order,
"historical_context_used": bool(
historical_context
),
},
)
# Store execution in memory
await self._store_execution_async(task, result)
return result
except Exception as e:
logger.error(f"Swarm execution failed: {str(e)}")
return SwarmOutput(
outputs=outputs,
execution_time=time.time() - start_time,
success=False,
error=str(e),
metadata={"task": task},
)
def run(self, task: str) -> SwarmOutput:
"""Synchronous interface to execute the swarm."""
return asyncio.run(self.execute(task))
def _extract_success_patterns(
self, similar_executions: List[Dict]
) -> Dict:
"""Extract success patterns from similar executions."""
patterns = {}
successful_execs = [
ex for ex in similar_executions if ex["success"]
]
if successful_execs:
patterns = {
"common_sequences": self._find_common_sequences(
successful_execs
),
"avg_execution_time": sum(
ex["execution_time"] for ex in successful_execs
)
/ len(successful_execs),
"successful_strategies": self._extract_strategies(
successful_execs
),
}
return patterns
def _attempt_recovery(
self,
failed_agent: str,
task: str,
similar_executions: List[Dict],
) -> Optional[AgentOutput]:
"""Attempt to recover from failure using memory."""
for execution in similar_executions:
if (
execution["success"]
and failed_agent in execution["outputs"]
):
historical_output = execution["outputs"][failed_agent]
return AgentOutput(
agent_name=failed_agent,
output=historical_output["output"],
execution_time=historical_output[
"execution_time"
],
metadata={
"recovered_from_memory": True,
"original_task": execution["task"],
},
)
return None
async def _store_execution_async(
self, task: str, result: SwarmOutput
):
"""Asynchronously store execution in memory."""
try:
await asyncio.to_thread(
self.memory.store_execution, task, result
)
except Exception as e:
logger.error(
f"Failed to store execution in memory: {str(e)}"
)
def add_agent(self, agent: Agent, dependencies: List[str] = None):
"""Add a new agent to the swarm."""
dependencies = dependencies or []
self.agents[agent.agent_name] = agent
self.dependencies[agent.agent_name] = dependencies
self.graph.add_node(agent.agent_name, agent=agent)
for dep in dependencies:
if dep not in self.agents:
raise ValueError(f"Dependency {dep} not found")
self.graph.add_edge(dep, agent.agent_name)
self._validate_graph()
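# Illustrative sketch (editor's addition, not part of the original module):
# extending an existing GraphSwarm at runtime with `add_agent`. The
# `risk_assessor` agent and the dependency name are hypothetical and assume a
# swarm built as in the __main__ block below.
# risk_assessor = Agent(
#     agent_name="Risk-Assessment-Agent",
#     model_name="gpt-4o-mini",
#     max_loops=1,
# )
# swarm.add_agent(risk_assessor, dependencies=["Market-Trend-Analyzer"])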
if __name__ == "__main__":
try:
# Create agents
data_collector = Agent(
agent_name="Market-Data-Collector",
model_name="gpt-4o-mini",
max_loops=1,
streaming_on=True,
)
trend_analyzer = Agent(
agent_name="Market-Trend-Analyzer",
model_name="gpt-4o-mini",
max_loops=1,
streaming_on=True,
)
report_generator = Agent(
agent_name="Investment-Report-Generator",
model_name="gpt-4o-mini",
max_loops=1,
streaming_on=True,
)
# Create swarm
swarm = GraphSwarm(
agents=[
(data_collector, []),
(trend_analyzer, ["Market-Data-Collector"]),
(report_generator, ["Market-Trend-Analyzer"]),
],
swarm_name="Market Analysis Intelligence Network",
)
# Run the swarm
result = swarm.run(
"Analyze current market trends for tech stocks and provide investment recommendations"
)
# Print results
print(f"Execution success: {result.success}")
print(f"Total time: {result.execution_time:.2f} seconds")
for agent_name, output in result.outputs.items():
print(f"\nAgent: {agent_name}")
print(f"Output: {output.output}")
if output.error:
print(f"Error: {output.error}")
except Exception as error:
logger.error(error)
raise error

@ -0,0 +1,244 @@
import os
import asyncio
import json
from pydantic import BaseModel, Field
from typing import List, Dict, Any
from swarms import Agent
from swarm_models import OpenAIChat
from dotenv import load_dotenv
from swarms.utils.formatter import formatter
# Load environment variables
load_dotenv()
# Get OpenAI API key
api_key = os.getenv("OPENAI_API_KEY")
# Define Pydantic schema for agent outputs
class AgentOutput(BaseModel):
"""Schema for capturing the output of each agent."""
agent_name: str = Field(..., description="The name of the agent")
message: str = Field(
...,
description="The agent's response or contribution to the group chat",
)
metadata: Dict[str, Any] = Field(
default_factory=dict,
description="Additional metadata about the agent's response",
)
class GroupChat:
"""
GroupChat class to enable multiple agents to communicate in an asynchronous group chat.
Each agent is aware of all other agents, every message exchanged, and the social context.
"""
def __init__(
self,
name: str,
description: str,
agents: List[Agent],
max_loops: int = 1,
):
"""
Initialize the GroupChat.
Args:
name (str): Name of the group chat.
description (str): Description of the purpose of the group chat.
agents (List[Agent]): A list of agents participating in the chat.
max_loops (int): Maximum number of loops to run through all agents.
"""
self.name = name
self.description = description
self.agents = agents
self.max_loops = max_loops
        self.chat_history = []  # Stores all messages exchanged in the chat
formatter.print_panel(
f"Initialized GroupChat '{self.name}' with {len(self.agents)} agents. Max loops: {self.max_loops}",
title="Groupchat Swarm",
)
async def _agent_conversation(
self, agent: Agent, input_message: str
) -> AgentOutput:
"""
Facilitate a single agent's response to the chat.
Args:
agent (Agent): The agent responding.
input_message (str): The message triggering the response.
Returns:
AgentOutput: The agent's response captured in a structured format.
"""
formatter.print_panel(
f"Agent '{agent.agent_name}' is responding to the message: {input_message}",
title="Groupchat Swarm",
)
response = await asyncio.to_thread(agent.run, input_message)
output = AgentOutput(
agent_name=agent.agent_name,
message=response,
metadata={"context_length": agent.context_length},
)
# logger.debug(f"Agent '{agent.agent_name}' response: {response}")
return output
async def _run(self, initial_message: str) -> List[AgentOutput]:
"""
Execute the group chat asynchronously, looping through all agents up to max_loops.
Args:
initial_message (str): The initial message to start the chat.
Returns:
List[AgentOutput]: The responses of all agents across all loops.
"""
formatter.print_panel(
f"Starting group chat '{self.name}' with initial message: {initial_message}",
title="Groupchat Swarm",
)
self.chat_history.append(
{"sender": "System", "message": initial_message}
)
outputs = []
for loop in range(self.max_loops):
formatter.print_panel(
f"Group chat loop {loop + 1}/{self.max_loops}",
title="Groupchat Swarm",
)
for agent in self.agents:
# Create a custom input message for each agent, sharing the chat history and social context
input_message = (
f"Chat History:\n{self._format_chat_history()}\n\n"
f"Participants:\n"
+ "\n".join(
[
f"- {a.agent_name}: {a.system_prompt}"
for a in self.agents
]
)
+ f"\n\nNew Message: {initial_message}\n\n"
f"You are '{agent.agent_name}'. Remember to keep track of the social context, who is speaking, "
f"and respond accordingly based on your role: {agent.system_prompt}."
)
# Collect agent's response
output = await self._agent_conversation(
agent, input_message
)
outputs.append(output)
# Update chat history with the agent's response
self.chat_history.append(
{
"sender": agent.agent_name,
"message": output.message,
}
)
formatter.print_panel(
"Group chat completed. All agent responses captured.",
title="Groupchat Swarm",
)
return outputs
    def run(self, task: str, *args, **kwargs):
        """Synchronous entry point that runs the asynchronous group chat."""
        return asyncio.run(self._run(task))
def _format_chat_history(self) -> str:
"""
Format the chat history for agents to understand the context.
Returns:
str: The formatted chat history as a string.
"""
return "\n".join(
[
f"{entry['sender']}: {entry['message']}"
for entry in self.chat_history
]
)
def __str__(self) -> str:
"""String representation of the group chat's outputs."""
return self._format_chat_history()
    def to_json(self) -> str:
        """JSON string representation of the group chat's messages."""
        return json.dumps(
            [
                {"sender": entry["sender"], "message": entry["message"]}
                for entry in self.chat_history
            ],
            indent=2,
        )
# Example Usage
if __name__ == "__main__":
load_dotenv()
# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
openai_api_key=api_key,
model_name="gpt-4o-mini",
temperature=0.1,
)
# Example agents
agent1 = Agent(
agent_name="Financial-Analysis-Agent",
system_prompt="You are a financial analyst specializing in investment strategies.",
llm=model,
max_loops=1,
autosave=False,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
user_name="swarms_corp",
retry_attempts=1,
context_length=200000,
output_type="string",
streaming_on=False,
)
agent2 = Agent(
agent_name="Tax-Adviser-Agent",
system_prompt="You are a tax adviser who provides clear and concise guidance on tax-related queries.",
llm=model,
max_loops=1,
autosave=False,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
user_name="swarms_corp",
retry_attempts=1,
context_length=200000,
output_type="string",
streaming_on=False,
)
# Create group chat
group_chat = GroupChat(
name="Financial Discussion",
description="A group chat for financial analysis and tax advice.",
agents=[agent1, agent2],
)
# Run the group chat
    group_chat.run(
        "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria? What do you guys think?"
    )

@ -86,9 +86,7 @@ class MixtureOfAgents:
self.input_schema = MixtureOfAgentsInput(
name=name,
description=description,
agents=[
agent.to_dict() for agent in self.agents
],
agents=[agent.to_dict() for agent in self.agents],
aggregator_agent=aggregator_agent.to_dict(),
aggregator_system_prompt=self.aggregator_system_prompt,
layers=self.layers,

@ -0,0 +1,276 @@
import asyncio
import pulsar
from pulsar import ConsumerType
from loguru import logger
from swarms import Agent
from typing import List, Dict, Any
import json
class ScalableAsyncAgentSwarm:
"""
A scalable, asynchronous swarm of agents leveraging Apache Pulsar for inter-agent communication.
Provides load balancing, health monitoring, dead letter queues, and centralized logging.
"""
def __init__(
self,
pulsar_url: str,
topic: str,
dlq_topic: str,
agents_config: List[Dict[str, Any]],
):
"""
Initializes the async swarm with agents.
Args:
pulsar_url (str): The URL of the Apache Pulsar broker.
topic (str): The main topic for task distribution.
dlq_topic (str): The Dead Letter Queue topic for failed messages.
agents_config (List[Dict[str, Any]]): List of agent configurations with `name`, `description`, and `model_name`.
"""
self.pulsar_url = pulsar_url
self.topic = topic
self.dlq_topic = dlq_topic
self.agents_config = agents_config
self.client = pulsar.Client(pulsar_url)
self.consumer = self.client.subscribe(
topic,
subscription_name="swarm-task-sub",
consumer_type=ConsumerType.Shared,
)
self.dlq_producer = self.client.create_producer(dlq_topic)
self.response_logger = []
self.agents = [
self.create_agent(config) for config in agents_config
]
self.agent_index = 0
logger.info(
"Swarm initialized with agents: {}",
[agent["name"] for agent in agents_config],
)
def create_agent(
self, agent_config: Dict[str, Any]
) -> Dict[str, Any]:
"""
Creates a new agent configuration with asynchronous capabilities.
Args:
agent_config (Dict[str, Any]): Configuration dictionary with agent details.
Returns:
Dict[str, Any]: A dictionary containing agent metadata and functionality.
"""
agent_name = agent_config["name"]
description = agent_config["description"]
model_name = agent_config.get("model_name", "gpt-4o-mini")
class AsyncAgent:
"""
An asynchronous agent that processes tasks and communicates via Apache Pulsar.
"""
def __init__(
self, name: str, description: str, model_name: str
):
self.name = name
self.description = description
self.agent = Agent(
agent_name=name,
model_name=model_name,
max_loops="auto",
interactive=True,
streaming_on=True,
)
logger.info(
f"Initialized agent '{name}' - {description}"
)
async def process_task(
self, message: str
) -> Dict[str, Any]:
"""
Processes a single task using the agent.
Args:
message (str): The task message.
Returns:
Dict[str, Any]: JSON-formatted response.
"""
try:
logger.info(
f"Agent {self.name} processing task: {message}"
)
response = await asyncio.to_thread(
self.agent.run, message
)
logger.info(f"Agent {self.name} completed task.")
return {
"agent_name": self.name,
"response": response,
}
except Exception as e:
logger.error(
f"Agent {self.name} encountered an error: {e}"
)
return {"agent_name": self.name, "error": str(e)}
return {
"name": agent_name,
"instance": AsyncAgent(
agent_name, description, model_name
),
}
async def distribute_task(self, message: str):
"""
Distributes a task to the next available agent using round-robin.
Args:
message (str): The task message.
"""
agent = self.agents[self.agent_index]
self.agent_index = (self.agent_index + 1) % len(self.agents)
try:
response = await agent["instance"].process_task(message)
self.log_response(response)
except Exception as e:
logger.error(
f"Error processing task by agent {agent['name']}: {e}"
)
self.send_to_dlq(message)
async def monitor_health(self):
"""
Periodically monitors the health of agents.
"""
while True:
logger.info("Performing health check for all agents.")
for agent in self.agents:
logger.info(f"Agent {agent['name']} is online.")
await asyncio.sleep(10)
def send_to_dlq(self, message: str):
"""
Sends a failed message to the Dead Letter Queue (DLQ).
Args:
message (str): The message to send to the DLQ.
"""
try:
self.dlq_producer.send(message.encode("utf-8"))
logger.info("Message sent to Dead Letter Queue.")
except Exception as e:
logger.error(f"Failed to send message to DLQ: {e}")
def log_response(self, response: Dict[str, Any]):
"""
Logs the response to a centralized list for later analysis.
Args:
response (Dict[str, Any]): The agent's response.
"""
self.response_logger.append(response)
logger.info(f"Response logged: {response}")
async def listen_and_distribute(self):
"""
Listens to the main Pulsar topic and distributes tasks to agents.
"""
while True:
msg = self.consumer.receive()
try:
message = msg.data().decode("utf-8")
logger.info(f"Received task: {message}")
await self.distribute_task(message)
self.consumer.acknowledge(msg)
except Exception as e:
logger.error(f"Error processing message: {e}")
self.send_to_dlq(msg.data().decode("utf-8"))
self.consumer.negative_acknowledge(msg)
async def run(self):
"""
Runs the swarm asynchronously with health monitoring and task distribution.
"""
logger.info("Starting the async swarm...")
task_listener = asyncio.create_task(
self.listen_and_distribute()
)
health_monitor = asyncio.create_task(self.monitor_health())
await asyncio.gather(task_listener, health_monitor)
def shutdown(self):
"""
Safely shuts down the swarm and logs all responses.
"""
logger.info("Shutting down the swarm...")
self.client.close()
with open("responses.json", "w") as f:
json.dump(self.response_logger, f, indent=4)
logger.info("Responses saved to 'responses.json'.")
# from scalable_agent_swarm import ScalableAsyncAgentSwarm # Assuming your swarm class is saved here
if __name__ == "__main__":
# Example Configuration
PULSAR_URL = "pulsar://localhost:6650"
TOPIC = "stock-analysis"
DLQ_TOPIC = "stock-analysis-dlq"
# Agents configuration
AGENTS_CONFIG = [
{
"name": "Stock-Analysis-Agent-1",
"description": "Analyzes stock trends.",
"model_name": "gpt-4o-mini",
},
{
"name": "Stock-News-Agent",
"description": "Summarizes stock news.",
"model_name": "gpt-4o-mini",
},
{
"name": "Tech-Trends-Agent",
"description": "Tracks tech sector trends.",
"model_name": "gpt-4o-mini",
},
]
# Tasks to send
TASKS = [
"Analyze the trend for tech stocks in Q4 2024",
"Summarize the latest news on the S&P 500",
"Identify the top-performing sectors in the stock market",
"Provide a forecast for AI-related stocks for 2025",
]
# Initialize and run the swarm
swarm = ScalableAsyncAgentSwarm(
PULSAR_URL, TOPIC, DLQ_TOPIC, AGENTS_CONFIG
)
    async def main():
        # Run the swarm in the background
        swarm_task = asyncio.create_task(swarm.run())
        # Send tasks to the topic
        client = pulsar.Client(PULSAR_URL)
        producer = client.create_producer(TOPIC)
        for task in TASKS:
            producer.send(task.encode("utf-8"))
            print(f"Sent task: {task}")
        producer.close()
        client.close()
        # Keep the swarm running until interrupted
        await swarm_task
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        swarm.shutdown()
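# Note (editor's assumption, not from the original file): the example above
# expects a Pulsar broker reachable at PULSAR_URL. For local testing, a
# standalone broker can typically be started with Docker, e.g.:
#   docker run -it -p 6650:6650 -p 8080:8080 apachepulsar/pulsar:latest bin/pulsar standalone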

@ -1,5 +1,5 @@
import traceback
import asyncio
import traceback
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
@ -13,10 +13,10 @@ from swarms.structs.agent import Agent
from swarms.structs.agents_available import showcase_available_agents
from swarms.structs.base_swarm import BaseSwarm
from swarms.utils.add_docs_to_agents import handle_input_docs
from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.wrapper_clusterop import (
exec_callable_with_clusterops,
)
from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger(log_folder="rearrange")
@ -121,16 +121,14 @@ class AgentRearrange(BaseSwarm):
output_type: OutputType = "final",
docs: List[str] = None,
doc_folder: str = None,
device: str = "cpu",
device_id: int = 0,
all_cores: bool = False,
all_gpus: bool = True,
no_use_clusterops: bool = True,
*args,
**kwargs,
):
# reliability_check(
# agents=agents,
# name=name,
# description=description,
# flow=flow,
# max_loops=max_loops,
# )
super(AgentRearrange, self).__init__(
name=name,
description=description,
@ -150,33 +148,11 @@ class AgentRearrange(BaseSwarm):
self.output_type = output_type
self.docs = docs
self.doc_folder = doc_folder
self.swarm_history = {
agent.agent_name: [] for agent in agents
}
self.id = uuid.uuid4().hex if id is None else id
# Output schema
self.input_config = AgentRearrangeInput(
swarm_id=self.id,
name=self.name,
description=self.description,
flow=self.flow,
max_loops=self.max_loops,
output_type=self.output_type,
)
# Output schema
self.output_schema = AgentRearrangeOutput(
Input=self.input_config,
outputs=[],
)
# Run the reliability checks to validate the swarm
# self.handle_input_docs()
# Show the agents whose in the swarm
# self.showcase_agents()
self.device = device
self.device_id = device_id
self.all_cores = all_cores
self.all_gpus = all_gpus
self.no_use_clusterops = no_use_clusterops
def showcase_agents(self):
# Get formatted agent info once
@ -184,12 +160,34 @@ class AgentRearrange(BaseSwarm):
name=self.name,
description=self.description,
agents=self.agents,
format="Table",
)
# Update all agents in one pass using values()
for agent in self.agents.values():
if isinstance(agent, Agent):
agent.system_prompt += agents_available
return agents_available
def rearrange_prompt_prep(self) -> str:
"""Prepares a formatted prompt describing the swarm configuration.
Returns:
str: A formatted string containing the swarm's name, description,
flow pattern, and participating agents.
"""
agents_available = self.showcase_agents()
prompt = f"""
===== Swarm Configuration =====
Name: {self.name}
Description: {self.description}
===== Execution Flow =====
{self.flow}
===== Participating Agents =====
{agents_available}
===========================
"""
return prompt
def set_custom_flow(self, flow: str):
self.flow = flow
@ -322,6 +320,7 @@ class AgentRearrange(BaseSwarm):
current_task = task
all_responses = []
response_dict = {}
previous_agent = None
logger.info(
f"Starting task execution with {len(tasks)} steps"
@ -346,12 +345,19 @@ class AgentRearrange(BaseSwarm):
f"Starting loop {loop_count + 1}/{self.max_loops}"
)
for task in tasks:
for task_idx, task in enumerate(tasks):
is_last = task == tasks[-1]
agent_names = [
name.strip() for name in task.split(",")
]
# Prepare prompt with previous agent info
prompt_prefix = ""
if previous_agent and task_idx > 0:
prompt_prefix = f"Previous agent {previous_agent} output: {current_task}\n"
elif task_idx == 0:
prompt_prefix = "Initial task: "
if len(agent_names) > 1:
# Parallel processing
logger.info(
@ -367,12 +373,14 @@ class AgentRearrange(BaseSwarm):
):
current_task = (
self.custom_human_in_the_loop(
current_task
prompt_prefix
+ str(current_task)
)
)
else:
current_task = input(
"Enter your response:"
prompt_prefix
+ "Enter your response: "
)
results.append(current_task)
response_dict[agent_name] = (
@ -380,13 +388,13 @@ class AgentRearrange(BaseSwarm):
)
else:
agent = self.agents[agent_name]
current_task = (
str(current_task)
task_with_context = (
prompt_prefix + str(current_task)
if current_task
else ""
else prompt_prefix
)
result = agent.run(
task=current_task,
task=task_with_context,
img=img,
is_last=is_last,
*args,
@ -404,6 +412,7 @@ class AgentRearrange(BaseSwarm):
current_task = "; ".join(results)
all_responses.extend(results)
previous_agent = ",".join(agent_names)
else:
# Sequential processing
@ -419,23 +428,25 @@ class AgentRearrange(BaseSwarm):
):
current_task = (
self.custom_human_in_the_loop(
current_task
prompt_prefix
+ str(current_task)
)
)
else:
current_task = input(
"Enter the next task: "
prompt_prefix
+ "Enter the next task: "
)
response_dict[agent_name] = current_task
else:
agent = self.agents[agent_name]
current_task = (
str(current_task)
task_with_context = (
prompt_prefix + str(current_task)
if current_task
else ""
else prompt_prefix
)
current_task = agent.run(
task=current_task,
task=task_with_context,
img=img,
is_last=is_last,
*args,
@ -451,6 +462,7 @@ class AgentRearrange(BaseSwarm):
)
all_responses.append(current_task)
previous_agent = agent_name
loop_count += 1
@ -506,7 +518,11 @@ class AgentRearrange(BaseSwarm):
Returns:
The result from executing the task through the cluster operations wrapper.
"""
if no_use_clusterops:
no_use_clusterops = (
no_use_clusterops or self.no_use_clusterops
)
if no_use_clusterops is True:
return self._run(
task=task,
img=img,

@ -107,7 +107,7 @@ class SequentialWorkflow:
all_cores: bool = False,
all_gpus: bool = False,
device_id: int = 0,
no_use_clusterops: bool = False,
no_use_clusterops: bool = True,
*args,
**kwargs,
) -> str:

@ -12,7 +12,7 @@ def auto_update():
try:
# Check if auto-update is disabled
auto_update_enabled = os.getenv(
"SWARMS_AUTOUPDATE_ON", "true"
"SWARMS_AUTOUPDATE_ON", "false"
).lower()
if auto_update_enabled == "false":
logger.info(

@ -1,7 +1,6 @@
import json
from typing import Any, Dict, List, Union
from termcolor import cprint
from transformers import PreTrainedModel, PreTrainedTokenizer
from pydantic import BaseModel
from swarms.tools.logits_processor import (
@ -68,15 +67,6 @@ class Jsonformer:
self.temperature = temperature
self.max_string_token_length = max_string_token_length
def debug(self, caller: str, value: str, is_prompt: bool = False):
if self.debug_on:
if is_prompt:
cprint(caller, "green", end=" ")
cprint(value, "yellow")
else:
cprint(caller, "green", end=" ")
cprint(value, "blue")
def generate_number(
self, temperature: Union[float, None] = None, iterations=0
):

@ -3,8 +3,7 @@ from typing import Any, List
import inspect
from typing import Callable
from termcolor import colored
from swarms.utils.formatter import formatter
def scrape_tool_func_docs(fn: Callable) -> str:
@ -37,17 +36,16 @@ def scrape_tool_func_docs(fn: Callable) -> str:
f" {inspect.getdoc(fn)}\nParameters:\n{parameters_str}"
)
except Exception as error:
print(
colored(
(
formatter.print_panel(
f"Error scraping tool function docs {error} try"
" optimizing your inputs with different"
" variables and attempt once more."
),
"red",
)
)
raise error
def tool_find_by_name(tool_name: str, tools: List[Any]):
"""Find the tool by name"""

@ -63,40 +63,40 @@ def any_to_str(data: Union[str, Dict, List, Tuple, Any]) -> str:
return f"Error converting data: {str(e)}"
def main():
# Example 1: Dictionary
print("Dictionary:")
print(
any_to_str(
{
"name": "John",
"age": 30,
"hobbies": ["reading", "hiking"],
}
)
)
print("\nNested Dictionary:")
print(
any_to_str(
{
"user": {
"id": 123,
"details": {"city": "New York", "active": True},
},
"data": [1, 2, 3],
}
)
)
print("\nList and Tuple:")
print(any_to_str([1, "text", None, (1, 2)]))
print(any_to_str((True, False, None)))
print("\nEmpty Collections:")
print(any_to_str([]))
print(any_to_str({}))
if __name__ == "__main__":
main()
# def main():
# # Example 1: Dictionary
# print("Dictionary:")
# print(
# any_to_str(
# {
# "name": "John",
# "age": 30,
# "hobbies": ["reading", "hiking"],
# }
# )
# )
# print("\nNested Dictionary:")
# print(
# any_to_str(
# {
# "user": {
# "id": 123,
# "details": {"city": "New York", "active": True},
# },
# "data": [1, 2, 3],
# }
# )
# )
# print("\nList and Tuple:")
# print(any_to_str([1, "text", None, (1, 2)]))
# print(any_to_str((True, False, None)))
# print("\nEmpty Collections:")
# print(any_to_str([]))
# print(any_to_str({}))
# if __name__ == "__main__":
# main()

@ -4,7 +4,6 @@ from functools import wraps
from typing import Any, Callable
import psutil
from loguru import logger
from pydantic import BaseModel
from swarms.utils.loguru_logger import initialize_logger

@ -0,0 +1,105 @@
try:
    import litellm
    from litellm import completion
except ImportError:
    import subprocess
    subprocess.check_call(["pip", "install", "litellm"])
    import litellm
    from litellm import completion
# set_verbose needs the module object, which both branches above import
litellm.set_verbose = True
class LiteLLM:
"""
    A lightweight wrapper around litellm's `completion` API.
    It sends a task (plus an optional system prompt) to any model litellm
    supports and returns the text of the first choice.
"""
def __init__(
self,
model_name: str = "gpt-4o",
system_prompt: str = None,
stream: bool = False,
temperature: float = 0.5,
max_tokens: int = 4000,
):
"""
Initialize the LiteLLM with the given parameters.
Args:
model_name (str, optional): The name of the model to use. Defaults to "gpt-4o".
system_prompt (str, optional): The system prompt to use. Defaults to None.
stream (bool, optional): Whether to stream the output. Defaults to False.
temperature (float, optional): The temperature for the model. Defaults to 0.5.
max_tokens (int, optional): The maximum number of tokens to generate. Defaults to 4000.
"""
self.model_name = model_name
self.system_prompt = system_prompt
self.stream = stream
self.temperature = temperature
self.max_tokens = max_tokens
def _prepare_messages(self, task: str) -> list:
"""
Prepare the messages for the given task.
Args:
task (str): The task to prepare messages for.
Returns:
list: A list of messages prepared for the task.
"""
messages = []
if self.system_prompt: # Check if system_prompt is not None
messages.append(
{"role": "system", "content": self.system_prompt}
)
messages.append({"role": "user", "content": task})
return messages
def run(self, task: str, *args, **kwargs):
"""
Run the LLM model for the given task.
Args:
task (str): The task to run the model for.
*args: Additional positional arguments to pass to the model.
**kwargs: Additional keyword arguments to pass to the model.
Returns:
str: The content of the response from the model.
"""
messages = self._prepare_messages(task)
response = completion(
model=self.model_name,
messages=messages,
stream=self.stream,
temperature=self.temperature,
# max_completion_tokens=self.max_tokens,
max_tokens=self.max_tokens,
*args,
**kwargs,
)
        content = response.choices[0].message.content  # text of the first choice
return content
def __call__(self, task: str, *args, **kwargs):
"""
Call the LLM model for the given task.
Args:
task (str): The task to run the model for.
*args: Additional positional arguments to pass to the model.
**kwargs: Additional keyword arguments to pass to the model.
Returns:
str: The content of the response from the model.
"""
return self.run(task, *args, **kwargs)
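# Illustrative sketch (editor's addition, not part of the original module):
# basic usage of the wrapper. Assumes the provider API key required by the
# chosen model (e.g. OPENAI_API_KEY for "gpt-4o") is set in the environment.
# llm = LiteLLM(model_name="gpt-4o", temperature=0.2, max_tokens=500)
# print(llm.run("Give three bullet points on portfolio diversification."))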

@ -1,4 +1,4 @@
from termcolor import colored
from swarms.utils.formatter import formatter
def display_markdown_message(message: str, color: str = "cyan"):
@ -12,13 +12,10 @@ def display_markdown_message(message: str, color: str = "cyan"):
if line == "":
print()
elif line == "---":
print(colored("-" * 50, color))
formatter.print_panel("-" * 50)
else:
print(colored(line, color))
formatter.print_panel(line)
if "\n" not in message and message.startswith(">"):
# Aesthetic choice. For these tags, they need a space below them
print()
# display_markdown_message("I love you and you are beautiful.", "cyan")

@ -1,50 +1,64 @@
import re
def extract_code_from_markdown(markdown_content: str) -> str:
def extract_code_blocks_with_language(markdown_text: str):
"""
Extracts code blocks from a Markdown string and returns them as a single string.
Extracts all code blocks from Markdown text along with their languages.
Args:
- markdown_content (str): The Markdown content as a string.
markdown_text (str): The input Markdown text.
Returns:
- str: A single string containing all the code blocks separated by newlines.
list[dict]: A list of dictionaries, each containing:
- 'language': The detected language (or 'plaintext' if none specified).
- 'content': The content of the code block.
"""
# Regular expression for fenced code blocks with optional language specifier
pattern = r"```(?:\w+\n)?(.*?)```"
# Regex pattern to match code blocks and optional language specifiers
pattern = r"```(\w+)?\n(.*?)```"
# Check if markdown_content is a string
if not isinstance(markdown_content, str):
raise TypeError("markdown_content must be a string")
# Find all matches (language and content)
matches = re.findall(pattern, markdown_text, re.DOTALL)
# Find all matches of the pattern
matches = re.finditer(pattern, markdown_content, re.DOTALL)
# Extract the content inside the backticks
# Parse results
code_blocks = []
for match in matches:
code_block = match.group(1).strip()
# Remove any leading or trailing whitespace from the code block
code_block = code_block.strip()
# Remove any empty lines from the code block
code_block = "\n".join(
[line for line in code_block.split("\n") if line.strip()]
for language, content in matches:
language = (
language.strip() if language else "plaintext"
) # Default to 'plaintext'
code_blocks.append(
{"language": language, "content": content.strip()}
)
code_blocks.append(code_block)
# Concatenate all code blocks separated by newlines
if code_blocks:
return "\n\n".join(code_blocks)
else:
return ""
return code_blocks
def extract_code_from_markdown(
markdown_text: str, language: str = None
):
"""
Extracts content of code blocks for a specific language or all blocks if no language specified.
Args:
markdown_text (str): The input Markdown text.
language (str, optional): The language to filter by (e.g., 'yaml', 'python').
    Returns:
        str: The concatenated content of matched code blocks or an empty string if none found.
    Example:
        >>> text = "hello im an agent\n```bash\npip install swarms\n```"
        >>> extract_code_from_markdown(text, language="bash")
        'pip install swarms'
    """
# Get all code blocks with detected languages
code_blocks = extract_code_blocks_with_language(markdown_text)
# Filter by language if specified
if language:
code_blocks = [
block["content"]
for block in code_blocks
if block["language"] == language
]
else:
code_blocks = [
block["content"] for block in code_blocks
] # Include all blocks
# Return concatenated content
return "\n\n".join(code_blocks) if code_blocks else ""

@ -1,14 +1,12 @@
import sys
from swarms.utils.try_except_wrapper import try_except_wrapper
try:
import pypdf
except ImportError:
print(
"pypdf not installed. Please install it using: pip install"
" pypdf"
)
sys.exit(1)
import subprocess
subprocess.check_call(["python", "-m", "pip", "install", "pypdf"])
import pypdf
@try_except_wrapper

@ -1,51 +0,0 @@
import json
import yaml
def remove_whitespace_from_json(json_string: str) -> str:
"""
Removes unnecessary whitespace from a JSON string.
This function parses the JSON string into a Python object and then
serializes it back into a JSON string without unnecessary whitespace.
Args:
json_string (str): The JSON string.
Returns:
str: The JSON string with whitespace removed.
"""
parsed = json.loads(json_string)
return json.dumps(parsed, separators=(",", ":"))
# # Example usage for JSON
# json_string = '{"field1": 123, "field2": "example text"}'
# print(remove_whitespace_from_json(json_string))
def remove_whitespace_from_yaml(yaml_string: str) -> str:
"""
Removes unnecessary whitespace from a YAML string.
This function parses the YAML string into a Python object and then
serializes it back into a YAML string with minimized whitespace.
Note: This might change the representation style of YAML data.
Args:
yaml_string (str): The YAML string.
Returns:
str: The YAML string with whitespace reduced.
"""
parsed = yaml.safe_load(yaml_string)
return yaml.dump(parsed, default_flow_style=True)
# # Example usage for YAML
# yaml_string = """
# field1: 123
# field2: example text
# """
# print(remove_whitespace_from_yaml(yaml_string))

@ -0,0 +1,53 @@
import concurrent.futures
from typing import List, Union
from swarms.structs.agent import Agent
def update_system_prompts(
agents: List[Union[Agent, str]],
prompt: str,
) -> List[Agent]:
"""
Update system prompts for a list of agents concurrently.
Args:
agents: List of Agent objects or strings to update
prompt: The prompt text to append to each agent's system prompt
Returns:
List of updated Agent objects
"""
if not agents:
return agents
def update_agent_prompt(agent: Union[Agent, str]) -> Agent:
# Convert string to Agent if needed
if isinstance(agent, str):
agent = Agent(
agent_name=agent,
system_prompt=prompt, # Initialize with the provided prompt
)
else:
# Preserve existing prompt and append new one
existing_prompt = (
agent.system_prompt if agent.system_prompt else ""
)
agent.system_prompt = existing_prompt + "\n" + prompt
return agent
# Use ThreadPoolExecutor for concurrent execution
max_workers = min(len(agents), 4) # Reasonable thread count
with concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
) as executor:
futures = []
for agent in agents:
future = executor.submit(update_agent_prompt, agent)
futures.append(future)
# Collect results as they complete
updated_agents = []
for future in concurrent.futures.as_completed(futures):
updated_agents.append(future.result())
return updated_agents
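# Illustrative sketch (editor's addition, not part of the original module):
# appending a shared directive to several agents at once. The agent names and
# prompt text are hypothetical.
# agents = ["Research-Agent", "Summary-Agent"]
# updated = update_system_prompts(agents, prompt="Always cite your sources.")
# for agent in updated:
#     print(agent.agent_name, "->", agent.system_prompt)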