[NEW DOCS] [Updated swarms api documentation] [IMPR] [Improved swarms output types and automatic port]

pull/864/head
Kye Gomez 3 days ago
parent e67f4161f3
commit 41c7004dcd

3
.gitignore vendored

@ -13,11 +13,14 @@ target/
Cargo.lock
.pytest_cache
static/generated
conversations/
runs
Financial-Analysis-Agent_state.json
conversations/
experimental
ffn_alternatives
artifacts_five
experimental/
encryption
errors
chroma

@ -13,11 +13,17 @@ The Swarms API provides a robust, scalable infrastructure for deploying and mana
Key capabilities include:
- **Intelligent Swarm Management**: Create and execute swarms of specialized AI agents that collaborate to solve complex tasks
- **Automatic Agent Generation**: Dynamically create optimized agents based on task requirements
- **Multiple Swarm Architectures**: Choose from various swarm patterns to match your specific workflow needs
- **Scheduled Execution**: Set up automated, scheduled swarm executions
- **Comprehensive Logging**: Track and analyze all API interactions
- **Cost Management**: Predictable, transparent pricing with optimized resource utilization
- **Enterprise Security**: Full API key authentication and management
Swarms API is designed for production use cases requiring sophisticated AI orchestration, with applications in finance, healthcare, legal, research, and other domains where complex reasoning and multi-agent collaboration are needed.
@ -48,6 +54,7 @@ API keys can be obtained and managed at [https://swarms.world/platform/api-keys]
| `/v1/swarms/available` | GET | Get all available swarms as a list of strings |
| `/v1/models/available` | GET | Get all available models as a list of strings |
| `/v1/agent/completions` | POST | Run a single agent with specified configuration |
| `/v1/agent/batch/completions` | POST | Run a batch of individual agent completions|
@ -481,6 +488,181 @@ curl -X GET "https://api.swarms.world/v1/swarm/logs" \
}
```
## Individual Agent Endpoints
### Run Single Agent
### AgentCompletion Model
The `AgentCompletion` model defines the configuration for running a single agent task.
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| `agent_config` | AgentSpec | The configuration of the agent to be completed | Yes |
| `task` | string | The task to be completed by the agent | Yes |
| `history` | Dict[str, Any] | The history of the agent's previous tasks and responses | No |
### AgentSpec Model
The `AgentSpec` model defines the configuration for an individual agent.
| Field | Type | Default | Description | Required |
|-------|------|---------|-------------|----------|
| `agent_name` | string | None | The unique name assigned to the agent | Yes |
| `description` | string | None | Detailed explanation of the agent's purpose | No |
| `system_prompt` | string | None | Initial instruction provided to the agent | No |
| `model_name` | string | "gpt-4o-mini" | Name of the AI model to use | No |
| `auto_generate_prompt` | boolean | false | Whether to auto-generate prompts | No |
| `max_tokens` | integer | 8192 | Maximum tokens in response | No |
| `temperature` | float | 0.5 | Controls randomness (0-1) | No |
| `role` | string | "worker" | Role of the agent | No |
| `max_loops` | integer | 1 | Maximum iterations | No |
| `tools_list_dictionary` | List[Dict] | None | Available tools for the agent | No |
| `mcp_url` | string | None | URL for Model Control Protocol | No |
Execute a task using a single agent with specified configuration.
**Endpoint**: `/v1/agent/completions`
**Method**: POST
**Rate Limit**: 100 requests per 60 seconds
**Request Body**:
```json
{
"agent_config": {
"agent_name": "Research Assistant",
"description": "Specialized in research and analysis",
"system_prompt": "You are an expert research assistant.",
"model_name": "gpt-4o",
"auto_generate_prompt": false,
"max_tokens": 8192,
"temperature": 0.5,
"role": "worker",
"max_loops": 1,
"tools_list_dictionary": [
{
"name": "search",
"description": "Search the web for information",
"parameters": {
"query": "string"
}
}
],
"mcp_url": "https://example-mcp.com"
},
"task": "Research the latest developments in quantum computing and summarize key findings",
"history": {
"previous_research": "Earlier findings on quantum computing basics...",
"user_preferences": "Focus on practical applications..."
}
}
```
**Response**:
```json
{
"id": "agent-abc123xyz",
"success": true,
"name": "Research Assistant",
"description": "Specialized in research and analysis",
"temperature": 0.5,
"outputs": {
"research_summary": "...",
"key_findings": [
"..."
]
},
"usage": {
"input_tokens": 450,
"output_tokens": 850,
"total_tokens": 1300,
"mcp_url": 0.1
},
"timestamp": "2024-03-05T12:34:56.789Z"
}
```
#### Run Batch Agents
Execute multiple agent tasks in parallel.
**Endpoint**: `/v1/agent/batch/completions`
**Method**: POST
**Rate Limit**: 100 requests per 60 seconds
**Maximum Batch Size**: 10 requests
**Input** A list of `AgentCompeletion` inputs
**Request Body**:
```json
[
{
"agent_config": {
"agent_name": "Market Analyst",
"description": "Expert in market analysis",
"system_prompt": "You are a financial market analyst.",
"model_name": "gpt-4o",
"temperature": 0.3
},
"task": "Analyze the current market trends in AI technology sector"
},
{
"agent_config": {
"agent_name": "Technical Writer",
"description": "Specialized in technical documentation",
"system_prompt": "You are a technical documentation expert.",
"model_name": "gpt-4o",
"temperature": 0.7
},
"task": "Create a technical guide for implementing OAuth2 authentication"
}
]
```
**Response**:
```json
{
"batch_id": "agent-batch-xyz789",
"total_requests": 2,
"execution_time": 15.5,
"timestamp": "2024-03-05T12:34:56.789Z",
"results": [
{
"id": "agent-abc123",
"success": true,
"name": "Market Analyst",
"outputs": {
"market_analysis": "..."
},
"usage": {
"input_tokens": 300,
"output_tokens": 600,
"total_tokens": 900
}
},
{
"id": "agent-def456",
"success": true,
"name": "Technical Writer",
"outputs": {
"technical_guide": "..."
},
"usage": {
"input_tokens": 400,
"output_tokens": 800,
"total_tokens": 1200
}
}
]
}
```
-----
## Production Examples
### Python Examples
@ -889,100 +1071,88 @@ Error responses include a detailed message explaining the issue:
## Rate Limiting
The API enforces a rate limit of 100 requests per 60-second window. When exceeded, a 429 status code is returned. Implement appropriate retry logic with exponential backoff in production applications.
| Description | Details |
|-------------|---------|
| Rate Limit | 100 requests per 60-second window |
| Exceed Consequence | 429 status code returned |
| Recommended Action | Implement retry logic with exponential backoff |
## Billing & Cost Management
The API uses a credit-based billing system with costs calculated based on:
1. **Agent Count**: Base cost per agent
2. **Input Tokens**: Cost based on the size of input data and prompts
3. **Output Tokens**: Cost based on the length of generated responses
4. **Time of Day**: Reduced rates during nighttime hours (8 PM to 6 AM PT)
Cost information is included in each response's metadata for transparency and forecasting.
| Cost Factor | Description |
|-------------|-------------|
| Agent Count | Base cost per agent |
| Input Tokens | Cost based on size of input data and prompts |
| Output Tokens | Cost based on length of generated responses |
| Time of Day | Reduced rates during nighttime hours (8 PM to 6 AM PT) |
| Cost Information | Included in each response's metadata |
## Best Practices
1. **Task Description**
- Provide detailed, specific task descriptions
- Include all necessary context and constraints
- Structure complex inputs for easier processing
2. **Agent Configuration**
- For simple tasks, use `AutoSwarmBuilder` to automatically generate optimal agents
- For complex or specialized tasks, manually define agents with specific expertise
- Use appropriate `swarm_type` for your workflow pattern
3. **Production Implementation**
- Implement robust error handling and retries
### Task Description
- Log API responses for debugging and auditing
| Practice | Description |
|----------|-------------|
| Detail | Provide detailed, specific task descriptions |
| Context | Include all necessary context and constraints |
| Structure | Structure complex inputs for easier processing |
- Monitor costs closely during development and testing
### Agent Configuration
- Use scheduled jobs for recurring tasks instead of continuous polling
| Practice | Description |
|----------|-------------|
| Simple Tasks | Use `AutoSwarmBuilder` for automatic agent generation |
| Complex Tasks | Manually define agents with specific expertise |
| Workflow | Use appropriate `swarm_type` for your workflow pattern |
4. **Cost Optimization**
### Production Implementation
- Batch related tasks when possible
| Practice | Description |
|----------|-------------|
| Error Handling | Implement robust error handling and retries |
| Logging | Log API responses for debugging and auditing |
| Cost Monitoring | Monitor costs closely during development and testing |
| Scheduling | Use scheduled jobs for recurring tasks instead of polling |
- Schedule non-urgent tasks during discount hours
- Carefully scope task descriptions to reduce token usage
- Cache results when appropriate
### Cost Optimization
| Practice | Description |
|----------|-------------|
| Batching | Batch related tasks when possible |
| Scheduling | Schedule non-urgent tasks during discount hours |
| Scoping | Carefully scope task descriptions to reduce token usage |
| Caching | Cache results when appropriate |
## Support
For technical assistance with the Swarms API, please contact:
- Documentation: [https://docs.swarms.world](https://docs.swarms.world)
- Email: kye@swarms.world
- Community Discord: [https://discord.gg/jM3Z6M9uMq](https://discord.gg/jM3Z6M9uMq)
- Swarms Marketplace: [https://swarms.world](https://swarms.world)
- Swarms AI Website: [https://swarms.ai](https://swarms.ai)
| Support Type | Contact Information |
|--------------|---------------------|
| Documentation | [https://docs.swarms.world](https://docs.swarms.world) |
| Email | kye@swarms.world |
| Community | [https://discord.gg/jM3Z6M9uMq](https://discord.gg/jM3Z6M9uMq) |
| Marketplace | [https://swarms.world](https://swarms.world) |
| Website | [https://swarms.ai](https://swarms.ai) |
## Service Tiers
The API offers two service tiers to accommodate different processing needs:
### Standard Tier
- Default processing tier
- Immediate execution
- Higher priority processing
- Standard pricing
- 5-minute timeout limit
| Feature | Description |
|---------|-------------|
| Processing | Default processing tier |
| Execution | Immediate execution |
| Priority | Higher priority processing |
| Pricing | Standard pricing |
| Timeout | 5-minute timeout limit |
### Flex Tier
- Lower cost processing
- Automatic retries (up to 3 attempts)
- Longer timeout (15 minutes)
- 75% discount on token costs
- Best for non-urgent tasks
- Exponential backoff on resource contention
To use the flex tier, set `service_tier: "flex"` in your SwarmSpec configuration.
| Feature | Description |
|---------|-------------|
| Cost | Lower cost processing |
| Retries | Automatic retries (up to 3 attempts) |
| Timeout | 15-minute timeout |
| Discount | 75% discount on token costs |
| Suitability | Best for non-urgent tasks |
| Backoff | Exponential backoff on resource contention |
| Configuration | Set `service_tier: "flex"` in SwarmSpec |

@ -1,4 +1,6 @@
import time
from swarms import Agent
from swarms.schemas.conversation_schema import ConversationSchema
# Initialize the agent
agent = Agent(
@ -33,11 +35,18 @@ agent = Agent(
- Performance attribution
You communicate in precise, technical terms while maintaining clarity for stakeholders.""",
max_loops=3,
max_loops=1,
model_name="gpt-4o-mini",
dynamic_temperature_enabled=True,
output_type="all",
output_type="json",
safety_prompt_on=True,
conversation_schema=ConversationSchema(
time_enabled=True,
message_id_on=True,
),
)
print(agent.run("What are the best top 3 etfs for gold coverage?"))
out = agent.run("What are the best top 3 etfs for gold coverage?")
time.sleep(10)
print(out)

@ -0,0 +1,85 @@
from swarms.structs.conversation import (
Conversation,
get_conversation_dir,
)
import os
import shutil
def cleanup_test_conversations():
"""Clean up test conversation files after running the example."""
conv_dir = get_conversation_dir()
if os.path.exists(conv_dir):
shutil.rmtree(conv_dir)
print(
f"\nCleaned up test conversations directory: {conv_dir}"
)
def main():
# Example 1: In-memory only conversation (no saving)
print("\nExample 1: In-memory conversation (no saving)")
conv_memory = Conversation(
name="memory_only_chat",
save_enabled=False, # Don't save to disk
autosave=False,
)
conv_memory.add("user", "This conversation won't be saved!")
conv_memory.display_conversation()
# Example 2: Conversation with autosaving
print("\nExample 2: Conversation with autosaving")
conversation_dir = get_conversation_dir()
print(f"Conversations will be stored in: {conversation_dir}")
conv_autosave = Conversation(
name="autosave_chat",
conversations_dir=conversation_dir,
save_enabled=True, # Enable saving
autosave=True, # Enable autosaving
)
print(f"Created new conversation with ID: {conv_autosave.id}")
print(
f"This conversation is saved at: {conv_autosave.save_filepath}"
)
# Add some messages (each will be autosaved)
conv_autosave.add("user", "Hello! How are you?")
conv_autosave.add(
"assistant",
"I'm doing well, thank you! How can I help you today?",
)
# Example 3: Load from specific file
print("\nExample 3: Load from specific file")
custom_file = os.path.join(conversation_dir, "custom_chat.json")
# Create a conversation and save it to a custom file
conv_custom = Conversation(
name="custom_chat",
save_filepath=custom_file,
save_enabled=True,
)
conv_custom.add("user", "This is a custom saved conversation")
conv_custom.add(
"assistant", "I'll be saved in a custom location!"
)
conv_custom.save_as_json()
# Now load it specifically
loaded_conv = Conversation.load_conversation(
name="custom_chat", load_filepath=custom_file
)
print("Loaded custom conversation:")
loaded_conv.display_conversation()
# List all saved conversations
print("\nAll saved conversations:")
conversations = Conversation.list_conversations(conversation_dir)
for conv_info in conversations:
print(
f"- {conv_info['name']} (ID: {conv_info['id']}, Created: {conv_info['created_at']})"
)
main()

@ -0,0 +1,9 @@
from swarms.structs.agent import Agent
agent = Agent(
agent_name="test",
agent_description="test",
system_prompt="test",
)
print(agent.list_output_types())

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "7.8.3"
version = "7.8.5"
description = "Swarms - TGSC"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]

@ -1,27 +1,17 @@
from typing import Any
from typing import Callable
from tenacity import retry, stop_after_attempt, wait_exponential
from swarms.prompts.prompt_generator import (
prompt_generator_sys_prompt as second_sys_prompt,
)
from swarms.prompts.prompt_generator_optimizer import (
prompt_generator_sys_prompt,
OPENAI_PROMPT_GENERATOR_SYS_PROMPT,
)
from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger(log_folder="ape_agent")
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
)
def auto_generate_prompt(
task: str = None,
model: Any = None,
max_tokens: int = 4000,
use_second_sys_prompt: bool = True,
model: Callable = None,
*args,
**kwargs,
):
@ -38,16 +28,9 @@ def auto_generate_prompt(
str: The generated prompt.
"""
try:
system_prompt = (
second_sys_prompt.get_prompt()
if use_second_sys_prompt
else prompt_generator_sys_prompt.get_prompt()
)
output = model.run(
system_prompt + task, max_tokens=max_tokens
return model.run(
task=f"{OPENAI_PROMPT_GENERATOR_SYS_PROMPT} \n\n Task: {task}"
)
print(output)
return output
except Exception as e:
logger.error(f"Error generating prompt: {str(e)}")
raise

@ -7,7 +7,7 @@ from loguru import logger
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.structs.malt import majority_voting_prompt
from swarms.structs.output_types import OutputType
from swarms.utils.output_types import OutputType
from swarms.utils.any_to_str import any_to_str
from swarms.utils.history_output_formatter import (
history_output_formatter,

@ -22,7 +22,7 @@ from typing import List, Tuple
from loguru import logger
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.structs.output_types import OutputType
from swarms.utils.output_types import OutputType
from swarms.utils.history_output_formatter import (
history_output_formatter,
)

@ -7,7 +7,7 @@ from swarms.agents.i_agent import (
IterativeReflectiveExpansion as IREAgent,
)
from swarms.agents.reasoning_duo import ReasoningDuo
from swarms.structs.output_types import OutputType
from swarms.utils.output_types import OutputType
from swarms.agents.agent_judge import AgentJudge
agent_types = Literal[

@ -4,7 +4,7 @@ from loguru import logger
from swarms.prompts.reasoning_prompt import REASONING_PROMPT
from swarms.structs.agent import Agent
from swarms.structs.output_types import OutputType
from swarms.utils.output_types import OutputType
from swarms.structs.conversation import Conversation
from swarms.utils.history_output_formatter import (
history_output_formatter,

@ -0,0 +1,38 @@
AGGREGATOR_SYSTEM_PROMPT = """You are a highly skilled Aggregator Agent responsible for analyzing, synthesizing, and summarizing conversations between multiple AI agents. Your primary goal is to distill complex multi-agent interactions into clear, actionable insights.
Key Responsibilities:
1. Conversation Analysis:
- Identify the main topics and themes discussed
- Track the progression of ideas and problem-solving approaches
- Recognize key decisions and turning points in the conversation
- Note any conflicts, agreements, or important conclusions reached
2. Agent Contribution Assessment:
- Evaluate each agent's unique contributions to the discussion
- Highlight complementary perspectives and insights
- Identify any knowledge gaps or areas requiring further exploration
- Recognize patterns in agent interactions and collaborative dynamics
3. Summary Generation Guidelines:
- Begin with a high-level overview of the conversation's purpose and outcome
- Structure the summary in a logical, hierarchical manner
- Prioritize critical information while maintaining context
- Include specific examples or quotes when they significantly impact understanding
- Maintain objectivity while synthesizing different viewpoints
- Highlight actionable insights and next steps if applicable
4. Quality Standards:
- Ensure accuracy in representing each agent's contributions
- Maintain clarity and conciseness without oversimplifying
- Use consistent terminology throughout the summary
- Preserve important technical details and domain-specific language
- Flag any uncertainties or areas needing clarification
5. Output Format:
- Present information in a structured, easy-to-read format
- Use bullet points or sections for better readability when appropriate
- Include a brief conclusion or recommendation section if relevant
- Maintain professional and neutral tone throughout
Remember: Your role is crucial in making complex multi-agent discussions accessible and actionable. Focus on extracting value from the conversation while maintaining the integrity of each agent's contributions.
"""

@ -99,9 +99,6 @@ class Prompt(BaseModel):
if self.autosave:
self._autosave()
if self.auto_generate_prompt and self.llm:
self.auto_generate_prompt()
def edit_prompt(self, new_content: str) -> None:
"""
Edits the prompt content and updates the version control.

@ -0,0 +1,71 @@
from datetime import datetime
from typing import Any, List, Optional
from pydantic import BaseModel, Field
class Usage(BaseModel):
prompt_tokens: Optional[int] = Field(
default=None,
description="Number of tokens used in the prompt",
)
completion_tokens: Optional[int] = Field(
default=None,
description="Number of tokens used in the completion",
)
total_tokens: Optional[int] = Field(
default=None, description="Total number of tokens used"
)
class ModelConfig(BaseModel):
model_name: Optional[str] = Field(
default=None,
description="Name of the model used for generation",
)
temperature: Optional[float] = Field(
default=None,
description="Temperature setting used for generation",
)
top_p: Optional[float] = Field(
default=None, description="Top-p setting used for generation"
)
max_tokens: Optional[int] = Field(
default=None,
description="Maximum number of tokens to generate",
)
frequency_penalty: Optional[float] = Field(
default=None,
description="Frequency penalty used for generation",
)
presence_penalty: Optional[float] = Field(
default=None,
description="Presence penalty used for generation",
)
class AgentCompletionResponse(BaseModel):
id: Optional[str] = Field(
default=None, description="Unique identifier for the response"
)
agent_name: Optional[str] = Field(
default=None,
description="Name of the agent that generated the response",
)
agent_description: Optional[str] = Field(
default=None, description="Description of the agent"
)
outputs: Optional[List[Any]] = Field(
default=None,
description="List of outputs generated by the agent",
)
usage: Optional[Usage] = Field(
default=None, description="Token usage statistics"
)
model_config: Optional[ModelConfig] = Field(
default=None, description="Model configuration"
)
timestamp: Optional[str] = Field(
default_factory=lambda: datetime.now().isoformat(),
description="Timestamp of when the response was generated",
)

@ -0,0 +1,9 @@
from typing import Optional
from pydantic import BaseModel, Field
class ConversationSchema(BaseModel):
time_enabled: Optional[bool] = Field(default=False)
message_id_on: Optional[bool] = Field(default=True)
autosave: Optional[bool] = Field(default=False)
count_tokens: Optional[bool] = Field(default=False)

@ -80,6 +80,8 @@ from swarms.structs.swarming_architectures import (
from swarms.structs.auto_swarm_builder import AutoSwarmBuilder
from swarms.structs.council_judge import CouncilAsAJudge
from swarms.structs.batch_agent_execution import batch_agent_execution
from swarms.structs.ma_blocks import aggregate
__all__ = [
"Agent",
@ -150,4 +152,5 @@ __all__ = [
"AutoSwarmBuilder",
"CouncilAsAJudge",
"batch_agent_execution",
"aggregate",
]

@ -42,7 +42,6 @@ from swarms.schemas.base_schemas import (
)
from swarms.structs.agent_roles import agent_roles
from swarms.structs.conversation import Conversation
from swarms.structs.output_types import OutputType
from swarms.structs.safe_loading import (
SafeLoaderUtils,
SafeStateManager,
@ -78,10 +77,10 @@ from swarms.utils.index import (
format_data_structure,
format_dict_to_string,
)
from swarms.schemas.conversation_schema import ConversationSchema
from swarms.utils.output_types import OutputType
# Utils
# Custom stopping condition
def stop_when_repeats(response: str) -> bool:
# Stop if the word stop appears in the response
return "stop" in response.lower()
@ -406,7 +405,8 @@ class Agent:
safety_prompt_on: bool = False,
random_models_on: bool = False,
mcp_config: Optional[MCPConnection] = None,
top_p: float = 0.90,
top_p: Optional[float] = 0.90,
conversation_schema: Optional[ConversationSchema] = None,
*args,
**kwargs,
):
@ -533,6 +533,7 @@ class Agent:
self.random_models_on = random_models_on
self.mcp_config = mcp_config
self.top_p = top_p
self.conversation_schema = conversation_schema
self._cached_llm = (
None # Add this line to cache the LLM instance
@ -612,10 +613,28 @@ class Agent:
# Initialize the short term memory
memory = Conversation(
system_prompt=prompt,
time_enabled=False,
user=self.user_name,
rules=self.rules,
token_count=False,
token_count=(
self.conversation_schema.count_tokens
if self.conversation_schema
else False
),
message_id_on=(
self.conversation_schema.message_id_on
if self.conversation_schema
else False
),
autosave=(
self.conversation_schema.autosave
if self.conversation_schema
else False
),
time_enabled=(
self.conversation_schema.time_enabled
if self.conversation_schema
else False
),
)
return memory
@ -770,7 +789,7 @@ class Agent:
"No agent details found. Using task as fallback for prompt generation."
)
self.system_prompt = auto_generate_prompt(
task, self.llm
task=task, model=self._cached_llm
)
else:
# Combine all available components
@ -2824,3 +2843,6 @@ class Agent:
f"{tool_response}",
loop_count,
)
def list_output_types(self):
return OutputType

@ -1,18 +1,18 @@
import concurrent.futures
import datetime
import hashlib
import json
import os
import threading
import uuid
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
Optional,
Union,
Literal,
Any,
)
import yaml
@ -33,6 +33,25 @@ def generate_conversation_id():
return str(uuid.uuid4())
def get_conversation_dir():
"""Get the directory for storing conversation logs."""
# Get the current working directory
conversation_dir = os.path.join(os.getcwd(), "conversations")
try:
os.makedirs(conversation_dir, mode=0o755, exist_ok=True)
except Exception as e:
logger.error(
f"Failed to create conversations directory: {str(e)}"
)
# Fallback to the same directory as the script
conversation_dir = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"conversations",
)
os.makedirs(conversation_dir, mode=0o755, exist_ok=True)
return conversation_dir
# Define available providers
providers = Literal["mem0", "in-memory"]
@ -58,10 +77,6 @@ class Conversation(BaseStructure):
save_as_json_bool (bool): Flag to save conversation history as JSON.
token_count (bool): Flag to enable token counting for messages.
conversation_history (list): List to store the history of messages.
cache_enabled (bool): Flag to enable prompt caching.
cache_stats (dict): Statistics about cache usage.
cache_lock (threading.Lock): Lock for thread-safe cache operations.
conversations_dir (str): Directory to store cached conversations.
"""
def __init__(
@ -70,20 +85,21 @@ class Conversation(BaseStructure):
name: str = None,
system_prompt: Optional[str] = None,
time_enabled: bool = False,
autosave: bool = False,
autosave: bool = False, # Changed default to False
save_enabled: bool = False, # New parameter to control if saving is enabled
save_filepath: str = None,
tokenizer: Any = None,
load_filepath: str = None, # New parameter to specify which file to load from
tokenizer: Callable = None,
context_length: int = 8192,
rules: str = None,
custom_rules_prompt: str = None,
user: str = "User:",
auto_save: bool = True,
save_as_yaml: bool = True,
save_as_yaml: bool = False,
save_as_json_bool: bool = False,
token_count: bool = True,
cache_enabled: bool = True,
conversations_dir: Optional[str] = None,
provider: providers = "in-memory",
conversations_dir: Optional[str] = None,
message_id_on: bool = False,
*args,
**kwargs,
):
@ -95,73 +111,88 @@ class Conversation(BaseStructure):
self.system_prompt = system_prompt
self.time_enabled = time_enabled
self.autosave = autosave
self.save_filepath = save_filepath
self.save_enabled = save_enabled
self.conversations_dir = conversations_dir
self.message_id_on = message_id_on
# Handle save filepath
if save_enabled and save_filepath:
self.save_filepath = save_filepath
elif save_enabled and conversations_dir:
self.save_filepath = os.path.join(
conversations_dir, f"{self.id}.json"
)
else:
self.save_filepath = None
self.load_filepath = load_filepath
self.conversation_history = []
self.tokenizer = tokenizer
self.context_length = context_length
self.rules = rules
self.custom_rules_prompt = custom_rules_prompt
self.user = user
self.auto_save = auto_save
self.save_as_yaml = save_as_yaml
self.save_as_json_bool = save_as_json_bool
self.token_count = token_count
self.cache_enabled = cache_enabled
self.provider = provider
self.cache_stats = {
"hits": 0,
"misses": 0,
"cached_tokens": 0,
"total_tokens": 0,
}
self.cache_lock = threading.Lock()
self.conversations_dir = conversations_dir
# Create conversation directory if saving is enabled
if self.save_enabled and self.conversations_dir:
os.makedirs(self.conversations_dir, exist_ok=True)
# Try to load existing conversation or initialize new one
self.setup()
def setup(self):
# Set up conversations directory
self.conversations_dir = (
self.conversations_dir
or os.path.join(
os.path.expanduser("~"), ".swarms", "conversations"
)
)
os.makedirs(self.conversations_dir, exist_ok=True)
# Try to load existing conversation if it exists
conversation_file = os.path.join(
self.conversations_dir, f"{self.name}.json"
)
if os.path.exists(conversation_file):
with open(conversation_file, "r") as f:
saved_data = json.load(f)
# Update attributes from saved data
for key, value in saved_data.get(
"metadata", {}
).items():
if hasattr(self, key):
setattr(self, key, value)
self.conversation_history = saved_data.get(
"history", []
"""Set up the conversation by either loading existing data or initializing new."""
if self.load_filepath and os.path.exists(self.load_filepath):
try:
self.load_from_json(self.load_filepath)
logger.info(
f"Loaded existing conversation from {self.load_filepath}"
)
except Exception as e:
logger.error(f"Failed to load conversation: {str(e)}")
self._initialize_new_conversation()
elif self.save_filepath and os.path.exists(
self.save_filepath
):
try:
self.load_from_json(self.save_filepath)
logger.info(
f"Loaded existing conversation from {self.save_filepath}"
)
except Exception as e:
logger.error(f"Failed to load conversation: {str(e)}")
self._initialize_new_conversation()
else:
# If system prompt is not None, add it to the conversation history
if self.system_prompt is not None:
self.add("System", self.system_prompt)
if self.rules is not None:
self.add(self.user or "User", self.rules)
if self.custom_rules_prompt is not None:
self.add(
self.user or "User", self.custom_rules_prompt
self._initialize_new_conversation()
def _initialize_new_conversation(self):
"""Initialize a new conversation with system prompt and rules."""
if self.system_prompt is not None:
self.add("System", self.system_prompt)
if self.rules is not None:
self.add(self.user or "User", self.rules)
if self.custom_rules_prompt is not None:
self.add(self.user or "User", self.custom_rules_prompt)
if self.tokenizer is not None:
self.truncate_memory_with_tokenizer()
def _autosave(self):
"""Automatically save the conversation if autosave is enabled."""
if self.autosave and self.save_filepath:
try:
self.save_as_json(self.save_filepath)
except Exception as e:
logger.error(
f"Failed to autosave conversation: {str(e)}"
)
# If tokenizer then truncate
if self.tokenizer is not None:
self.truncate_memory_with_tokenizer()
def mem0_provider(self):
try:
from mem0 import AsyncMemory
@ -180,104 +211,10 @@ class Conversation(BaseStructure):
)
return None
def _generate_cache_key(
self, content: Union[str, dict, list]
) -> str:
"""Generate a cache key for the given content.
Args:
content (Union[str, dict, list]): The content to generate a cache key for.
Returns:
str: The cache key.
"""
if isinstance(content, (dict, list)):
content = json.dumps(content, sort_keys=True)
return hashlib.md5(content.encode()).hexdigest()
def _get_cached_tokens(
self, content: Union[str, dict, list]
) -> Optional[int]:
"""Get the number of cached tokens for the given content.
Args:
content (Union[str, dict, list]): The content to check.
Returns:
Optional[int]: The number of cached tokens, or None if not cached.
"""
if not self.cache_enabled:
return None
with self.cache_lock:
cache_key = self._generate_cache_key(content)
if cache_key in self.cache_stats:
self.cache_stats["hits"] += 1
return self.cache_stats[cache_key]
self.cache_stats["misses"] += 1
return None
def _update_cache_stats(
self, content: Union[str, dict, list], token_count: int
):
"""Update cache statistics for the given content.
Args:
content (Union[str, dict, list]): The content to update stats for.
token_count (int): The number of tokens in the content.
"""
if not self.cache_enabled:
return
with self.cache_lock:
cache_key = self._generate_cache_key(content)
self.cache_stats[cache_key] = token_count
self.cache_stats["cached_tokens"] += token_count
self.cache_stats["total_tokens"] += token_count
def _save_to_cache(self):
"""Save the current conversation state to the cache directory."""
if not self.conversations_dir:
return
conversation_file = os.path.join(
self.conversations_dir, f"{self.name}.json"
)
# Prepare metadata
metadata = {
"id": self.id,
"name": self.name,
"system_prompt": self.system_prompt,
"time_enabled": self.time_enabled,
"autosave": self.autosave,
"save_filepath": self.save_filepath,
"context_length": self.context_length,
"rules": self.rules,
"custom_rules_prompt": self.custom_rules_prompt,
"user": self.user,
"auto_save": self.auto_save,
"save_as_yaml": self.save_as_yaml,
"save_as_json_bool": self.save_as_json_bool,
"token_count": self.token_count,
"cache_enabled": self.cache_enabled,
}
# Prepare data to save
save_data = {
"metadata": metadata,
"history": self.conversation_history,
"cache_stats": self.cache_stats,
}
# Save to file
with open(conversation_file, "w") as f:
json.dump(save_data, f, indent=4)
def add_in_memory(
self,
role: str,
content: Union[str, dict, list],
content: Union[str, dict, list, Any],
*args,
**kwargs,
):
@ -287,39 +224,32 @@ class Conversation(BaseStructure):
role (str): The role of the speaker (e.g., 'User', 'System').
content (Union[str, dict, list]): The content of the message to be added.
"""
# Base message with role
# Base message with role and timestamp
message = {
"role": role,
"content": content,
}
# Handle different content types
if isinstance(content, dict) or isinstance(content, list):
message["content"] = content
elif self.time_enabled:
message["content"] = (
f"Time: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} \n {content}"
)
else:
message["content"] = content
if self.time_enabled:
message["timestamp"] = datetime.datetime.now().isoformat()
# Check cache for token count
cached_tokens = self._get_cached_tokens(content)
if cached_tokens is not None:
message["token_count"] = cached_tokens
message["cached"] = True
else:
message["cached"] = False
if self.message_id_on:
message["message_id"] = str(uuid.uuid4())
# Add message to appropriate backend
# Add message to conversation history
self.conversation_history.append(message)
if self.token_count is True and not message.get(
"cached", False
):
if self.token_count is True:
self._count_tokens(content, message)
# Save to cache after adding message
self._save_to_cache()
# Autosave after adding message, but only if saving is enabled
if self.autosave and self.save_enabled and self.save_filepath:
try:
self.save_as_json(self.save_filepath)
except Exception as e:
logger.error(
f"Failed to autosave conversation: {str(e)}"
)
def add_mem0(
self,
@ -367,8 +297,6 @@ class Conversation(BaseStructure):
tokens = count_tokens(any_to_str(content))
# Update the message that's already in the conversation history
message["token_count"] = int(tokens)
# Update cache stats
self._update_cache_stats(content, int(tokens))
# If autosave is enabled, save after token count is updated
if self.autosave:
@ -413,7 +341,6 @@ class Conversation(BaseStructure):
index (str): Index of the message to delete.
"""
self.conversation_history.pop(index)
self._save_to_cache()
def update(self, index: str, role, content):
"""Update a message in the conversation history.
@ -427,7 +354,6 @@ class Conversation(BaseStructure):
"role": role,
"content": content,
}
self._save_to_cache()
def query(self, index: str):
"""Query a message in the conversation history.
@ -462,9 +388,26 @@ class Conversation(BaseStructure):
detailed (bool, optional): Flag to display detailed information. Defaults to False.
"""
for message in self.conversation_history:
formatter.print_panel(
f"{message['role']}: {message['content']}\n\n"
)
content = message["content"]
role = message["role"]
# Format the message content
if isinstance(content, (dict, list)):
content = json.dumps(content, indent=2)
# Create the display string
display_str = f"{role}: {content}"
# Add details if requested
if detailed:
display_str += f"\nTimestamp: {message.get('timestamp', 'Unknown')}"
display_str += f"\nMessage ID: {message.get('message_id', 'Unknown')}"
if "token_count" in message:
display_str += (
f"\nTokens: {message['token_count']}"
)
formatter.print_panel(display_str)
def export_conversation(self, filename: str, *args, **kwargs):
"""Export the conversation history to a file.
@ -531,9 +474,47 @@ class Conversation(BaseStructure):
Args:
filename (str): Filename to save the conversation history.
"""
if filename is not None:
with open(filename, "w") as f:
json.dump(self.conversation_history, f)
# Don't save if saving is disabled
if not self.save_enabled:
return
save_path = filename or self.save_filepath
if save_path is not None:
try:
# Prepare metadata
metadata = {
"id": self.id,
"name": self.name,
"created_at": datetime.datetime.now().isoformat(),
"system_prompt": self.system_prompt,
"rules": self.rules,
"custom_rules_prompt": self.custom_rules_prompt,
}
# Prepare save data
save_data = {
"metadata": metadata,
"history": self.conversation_history,
}
# Create directory if it doesn't exist
os.makedirs(
os.path.dirname(save_path),
mode=0o755,
exist_ok=True,
)
# Write directly to file
with open(save_path, "w") as f:
json.dump(save_data, f, indent=2)
# Only log explicit saves, not autosaves
if not self.autosave:
logger.info(
f"Successfully saved conversation to {save_path}"
)
except Exception as e:
logger.error(f"Failed to save conversation: {str(e)}")
def load_from_json(self, filename: str):
"""Load the conversation history from a JSON file.
@ -541,9 +522,32 @@ class Conversation(BaseStructure):
Args:
filename (str): Filename to load from.
"""
if filename is not None:
with open(filename) as f:
self.conversation_history = json.load(f)
if filename is not None and os.path.exists(filename):
try:
with open(filename) as f:
data = json.load(f)
# Load metadata
metadata = data.get("metadata", {})
self.id = metadata.get("id", self.id)
self.name = metadata.get("name", self.name)
self.system_prompt = metadata.get(
"system_prompt", self.system_prompt
)
self.rules = metadata.get("rules", self.rules)
self.custom_rules_prompt = metadata.get(
"custom_rules_prompt", self.custom_rules_prompt
)
# Load conversation history
self.conversation_history = data.get("history", [])
logger.info(
f"Successfully loaded conversation from {filename}"
)
except Exception as e:
logger.error(f"Failed to load conversation: {str(e)}")
raise
def search_keyword_in_conversation(self, keyword: str):
"""Search for a keyword in the conversation history.
@ -600,7 +604,6 @@ class Conversation(BaseStructure):
def clear(self):
"""Clear the conversation history."""
self.conversation_history = []
self._save_to_cache()
def to_json(self):
"""Convert the conversation history to a JSON string.
@ -608,7 +611,7 @@ class Conversation(BaseStructure):
Returns:
str: The conversation history as a JSON string.
"""
return json.dumps(self.conversation_history)
return json.dumps(self.conversation_history, indent=4)
def to_dict(self):
"""Convert the conversation history to a dictionary.
@ -759,79 +762,121 @@ class Conversation(BaseStructure):
"""
self.conversation_history.extend(messages)
def get_cache_stats(self) -> Dict[str, int]:
"""Get statistics about cache usage.
Returns:
Dict[str, int]: Statistics about cache usage.
"""
with self.cache_lock:
return {
"hits": self.cache_stats["hits"],
"misses": self.cache_stats["misses"],
"cached_tokens": self.cache_stats["cached_tokens"],
"total_tokens": self.cache_stats["total_tokens"],
"hit_rate": (
self.cache_stats["hits"]
/ (
self.cache_stats["hits"]
+ self.cache_stats["misses"]
)
if (
self.cache_stats["hits"]
+ self.cache_stats["misses"]
)
> 0
else 0
),
}
def clear_memory(self):
"""Clear the memory of the conversation."""
self.conversation_history = []
@classmethod
def load_conversation(
cls, name: str, conversations_dir: Optional[str] = None
cls,
name: str,
conversations_dir: Optional[str] = None,
load_filepath: Optional[str] = None,
) -> "Conversation":
"""Load a conversation from the cache by name.
"""Load a conversation from saved file by name or specific file.
Args:
name (str): Name of the conversation to load
conversations_dir (Optional[str]): Directory containing cached conversations
conversations_dir (Optional[str]): Directory containing conversations
load_filepath (Optional[str]): Specific file to load from
Returns:
Conversation: The loaded conversation object
"""
return cls(name=name, conversations_dir=conversations_dir)
if load_filepath:
return cls(
name=name,
load_filepath=load_filepath,
save_enabled=False, # Don't enable saving when loading specific file
)
conv_dir = conversations_dir or get_conversation_dir()
# Try loading by name first
filepath = os.path.join(conv_dir, f"{name}.json")
# If not found by name, try loading by ID
if not os.path.exists(filepath):
filepath = os.path.join(conv_dir, f"{name}")
if not os.path.exists(filepath):
logger.warning(
f"No conversation found with name or ID: {name}"
)
return cls(
name=name,
conversations_dir=conv_dir,
save_enabled=True,
)
return cls(
name=name,
conversations_dir=conv_dir,
load_filepath=filepath,
save_enabled=True,
)
@classmethod
def list_cached_conversations(
def list_conversations(
cls, conversations_dir: Optional[str] = None
) -> List[str]:
"""List all cached conversations.
) -> List[Dict[str, str]]:
"""List all saved conversations.
Args:
conversations_dir (Optional[str]): Directory containing cached conversations
conversations_dir (Optional[str]): Directory containing conversations
Returns:
List[str]: List of conversation names (without .json extension)
List[Dict[str, str]]: List of conversation metadata
"""
if conversations_dir is None:
conversations_dir = os.path.join(
os.path.expanduser("~"), ".swarms", "conversations"
)
if not os.path.exists(conversations_dir):
conv_dir = conversations_dir or get_conversation_dir()
if not os.path.exists(conv_dir):
return []
conversations = []
for file in os.listdir(conversations_dir):
if file.endswith(".json"):
conversations.append(
file[:-5]
) # Remove .json extension
return conversations
seen_ids = (
set()
) # Track seen conversation IDs to avoid duplicates
for filename in os.listdir(conv_dir):
if filename.endswith(".json"):
try:
filepath = os.path.join(conv_dir, filename)
with open(filepath) as f:
data = json.load(f)
metadata = data.get("metadata", {})
conv_id = metadata.get("id")
name = metadata.get("name")
created_at = metadata.get("created_at")
# Skip if we've already seen this ID or if required fields are missing
if (
not all([conv_id, name, created_at])
or conv_id in seen_ids
):
continue
seen_ids.add(conv_id)
conversations.append(
{
"id": conv_id,
"name": name,
"created_at": created_at,
"filepath": filepath,
}
)
except json.JSONDecodeError:
logger.warning(
f"Skipping corrupted conversation file: {filename}"
)
continue
except Exception as e:
logger.error(
f"Failed to read conversation {filename}: {str(e)}"
)
continue
def clear_memory(self):
"""Clear the memory of the conversation."""
self.conversation_history = []
# Sort by creation date, newest first
return sorted(
conversations, key=lambda x: x["created_at"], reverse=True
)
# # Example usage

@ -7,7 +7,7 @@ from pydantic import BaseModel, Field
from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.conversation import Conversation
from swarms.structs.output_types import OutputType
from swarms.utils.output_types import OutputType
from swarms.utils.any_to_str import any_to_str
from swarms.utils.formatter import formatter

@ -1,5 +1,5 @@
import os
from typing import List, Literal
from typing import List
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.structs.multi_agent_exec import get_swarms_info
@ -9,23 +9,7 @@ from swarms.utils.history_output_formatter import (
)
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Union, Callable
HistoryOutputType = Literal[
"list",
"dict",
"dictionary",
"string",
"str",
"final",
"last",
"json",
"all",
"yaml",
# "dict-final",
"dict-all-except-first",
"str-all-except-first",
]
from swarms.utils.history_output_formatter import HistoryOutputType
tools = [
{

@ -0,0 +1,84 @@
from swarms.structs.agent import Agent
from typing import List, Callable
from swarms.structs.conversation import Conversation
from swarms.structs.multi_agent_exec import run_agents_concurrently
from swarms.utils.history_output_formatter import (
history_output_formatter,
HistoryOutputType,
)
from swarms.prompts.agent_conversation_aggregator import (
AGGREGATOR_SYSTEM_PROMPT,
)
def aggregator_agent_task_prompt(
task: str, workers: List[Agent], conversation: Conversation
):
return f"""
Please analyze and summarize the following multi-agent conversation, following your guidelines for comprehensive synthesis:
Conversation Context:
Original Task: {task}
Number of Participating Agents: {len(workers)}
Conversation Content:
{conversation.get_str()}
Please provide a 3,000 word comprehensive summary report of the conversation.
"""
def aggregate(
workers: List[Callable],
task: str = None,
type: HistoryOutputType = "all",
aggregator_model_name: str = "anthropic/claude-3-sonnet-20240229",
):
"""
Aggregate a list of tasks into a single task.
"""
if task is None:
raise ValueError("Task is required in the aggregator block")
if workers is None:
raise ValueError(
"Workers is required in the aggregator block"
)
if not isinstance(workers, list):
raise ValueError("Workers must be a list of Callable")
if not all(isinstance(worker, Callable) for worker in workers):
raise ValueError("Workers must be a list of Callable")
conversation = Conversation()
aggregator_agent = Agent(
agent_name="Aggregator",
agent_description="Expert agent specializing in analyzing and synthesizing multi-agent conversations",
system_prompt=AGGREGATOR_SYSTEM_PROMPT,
max_loops=1,
model_name=aggregator_model_name,
output_type="final",
max_tokens=4000,
)
results = run_agents_concurrently(agents=workers, task=task)
# Zip the results with the agents
for result, agent in zip(results, workers):
conversation.add(content=result, role=agent.agent_name)
final_result = aggregator_agent.run(
task=aggregator_agent_task_prompt(task, workers, conversation)
)
conversation.add(
content=final_result, role=aggregator_agent.agent_name
)
return history_output_formatter(
conversation=conversation, type=type
)

@ -9,7 +9,7 @@ from typing import Any, Callable, List, Optional
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.structs.multi_agent_exec import run_agents_concurrently
from swarms.structs.output_types import OutputType
from swarms.utils.output_types import OutputType
from swarms.utils.formatter import formatter
from swarms.utils.loguru_logger import initialize_logger

@ -7,7 +7,7 @@ from swarms.structs.agent import Agent
from swarms.prompts.ag_prompt import aggregator_system_prompt_main
from swarms.utils.loguru_logger import initialize_logger
import concurrent.futures
from swarms.structs.output_types import OutputType
from swarms.utils.output_types import OutputType
from swarms.structs.conversation import Conversation
from swarms.utils.history_output_formatter import (
history_output_formatter,

@ -1,3 +1,4 @@
import concurrent.futures
import asyncio
import os
import threading
@ -5,7 +6,7 @@ from concurrent.futures import (
ThreadPoolExecutor,
)
from dataclasses import dataclass
from typing import Any, Callable, List, Union
from typing import Any, Callable, List, Optional, Union
import psutil
@ -68,44 +69,42 @@ async def run_agents_concurrently_async(
def run_agents_concurrently(
agents: List[AgentType],
task: str,
batch_size: int = None,
max_workers: int = None,
max_workers: Optional[int] = None,
) -> List[Any]:
"""
Optimized concurrent agent runner using both uvloop and ThreadPoolExecutor.
Optimized concurrent agent runner using ThreadPoolExecutor.
Args:
agents: List of Agent instances to run concurrently
task: Task string to execute
batch_size: Number of agents to run in parallel in each batch (defaults to CPU count)
max_workers: Maximum number of threads in the executor (defaults to CPU count * 2)
max_workers: Maximum number of threads in the executor (defaults to 95% of CPU cores)
Returns:
List of outputs from each agent
"""
# Optimize defaults based on system resources
cpu_cores = os.cpu_count()
batch_size = batch_size or cpu_cores
max_workers = max_workers or cpu_cores * 2
if max_workers is None:
# 95% of the available CPU cores
num_cores = os.cpu_count()
max_workers = int(num_cores * 0.95) if num_cores else 1
results = []
# Get or create event loop
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Create a shared thread pool executor with optimal worker count
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Process agents in batches
for i in range(0, len(agents), batch_size):
batch = agents[i : i + batch_size]
batch_results = loop.run_until_complete(
run_agents_concurrently_async(batch, task, executor)
)
results.extend(batch_results)
with concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
) as executor:
# Submit all tasks and get futures
futures = [
executor.submit(agent.run, task) for agent in agents
]
# Wait for all futures to complete and get results
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
results.append(result)
except Exception as e:
# Append the error if an agent fails
results.append(e)
return results

@ -16,7 +16,7 @@ from pydantic import BaseModel, Field
from swarms.utils.function_caller_model import OpenAIFunctionCaller
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.structs.output_types import OutputType
from swarms.utils.output_types import OutputType
from swarms.utils.any_to_str import any_to_str
from swarms.utils.history_output_formatter import (
history_output_formatter,

@ -1,6 +0,0 @@
from swarms.utils.history_output_formatter import (
HistoryOutputType as OutputType,
)
# Use the OutputType for type annotations
output_type: OutputType # OutputType now includes 'xml'

@ -15,7 +15,7 @@ from swarms.utils.history_output_formatter import (
from swarms.utils.loguru_logger import initialize_logger
from swarms.telemetry.main import log_agent_data
from swarms.structs.conversation import Conversation
from swarms.structs.output_types import OutputType
from swarms.utils.output_types import OutputType
from swarms.structs.multi_agent_exec import get_agents_info
logger = initialize_logger(log_folder="rearrange")

@ -2,7 +2,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Optional
from swarms.structs.agent import Agent
from swarms.structs.output_types import OutputType
from swarms.utils.output_types import OutputType
from swarms.structs.rearrange import AgentRearrange
from swarms.utils.loguru_logger import initialize_logger

@ -89,7 +89,7 @@ class SwarmMatcher:
Returns:
np.ndarray: The embedding vector for the text.
"""
import numpy as np
logger.debug(f"Getting embedding for text: {text[:50]}...")
try:
inputs = self.tokenizer(
@ -142,6 +142,7 @@ class SwarmMatcher:
Tuple[str, float]: A tuple containing the name of the best matching swarm type and the score.
"""
import numpy as np
logger.debug(f"Finding best match for task: {task[:50]}...")
try:
task_embedding = self.get_embedding(task)

@ -20,7 +20,7 @@ from swarms.structs.rearrange import AgentRearrange
from swarms.structs.sequential_workflow import SequentialWorkflow
from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm
from swarms.structs.swarm_matcher import swarm_matcher
from swarms.structs.output_types import OutputType
from swarms.utils.output_types import OutputType
from swarms.utils.loguru_logger import initialize_logger
from swarms.structs.malt import MALT
from swarms.structs.deep_research_swarm import DeepResearchSwarm

@ -9,7 +9,7 @@ from swarms.structs.conversation import Conversation
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
from swarms.structs.output_types import OutputType
from swarms.utils.output_types import OutputType
logger = initialize_logger(log_folder="swarming_architectures")

@ -17,6 +17,10 @@ from swarms.utils.pdf_to_text import pdf_to_text
from swarms.utils.try_except_wrapper import try_except_wrapper
from swarms.utils.calculate_func_metrics import profile_func
from swarms.utils.litellm_tokenizer import count_tokens
from swarms.utils.output_types import HistoryOutputType
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
__all__ = [
@ -35,4 +39,6 @@ __all__ = [
"try_except_wrapper",
"profile_func",
"count_tokens",
"HistoryOutputType",
"history_output_formatter",
]

@ -1,6 +1,5 @@
import secrets
import string
import re
def generate_api_key(prefix: str = "sk-", length: int = 32) -> str:
@ -36,29 +35,3 @@ def generate_api_key(prefix: str = "sk-", length: int = 32) -> str:
api_key = f"{prefix}{random_part}"
return api_key
def validate_api_key(api_key: str, prefix: str = "sk-") -> bool:
"""
Validate if an API key matches the expected format.
Args:
api_key (str): The API key to validate
prefix (str): The expected prefix (default: "sk-")
Returns:
bool: True if the API key is valid, False otherwise
"""
if not isinstance(api_key, str):
return False
# Check if key starts with prefix
if not api_key.startswith(prefix):
return False
# Check if the rest of the key contains only alphanumeric characters
random_part = api_key[len(prefix) :]
if not re.match(r"^[a-zA-Z0-9]+$", random_part):
return False
return True

@ -1,28 +1,11 @@
import yaml
from swarms.structs.conversation import Conversation
from typing import Literal, Union, List, Dict, Any
from typing import Union, List, Dict, Any
from swarms.utils.xml_utils import to_xml_string
HistoryOutputType = Literal[
"list",
"dict",
"dictionary",
"string",
"str",
"final",
"last",
"json",
"all",
"yaml",
"xml",
# "dict-final",
"dict-all-except-first",
"str-all-except-first",
]
from swarms.utils.output_types import HistoryOutputType
def history_output_formatter(
conversation: Conversation, type: HistoryOutputType = "list"
conversation: callable, type: HistoryOutputType = "list"
) -> Union[List[Dict[str, Any]], Dict[str, Any], str]:
if type == "list":
return conversation.return_messages_as_list()

@ -0,0 +1,23 @@
from typing import Literal
HistoryOutputType = Literal[
"list",
"dict",
"dictionary",
"string",
"str",
"final",
"last",
"json",
"all",
"yaml",
"xml",
# "dict-final",
"dict-all-except-first",
"str-all-except-first",
"basemodel",
]
OutputType = HistoryOutputType
output_type: HistoryOutputType # OutputType now includes 'xml'

@ -1,3 +1,5 @@
import os
import concurrent.futures
from typing import List, Optional, Dict, Any
from loguru import logger
@ -131,16 +133,16 @@ class VLLMWrapper:
Returns:
List[str]: List of model responses.
"""
logger.info(
f"Running tasks in batches of size {batch_size}. Total tasks: {len(tasks)}"
)
results = []
for i in range(0, len(tasks), batch_size):
batch = tasks[i : i + batch_size]
for task in batch:
logger.info(f"Running task: {task}")
results.append(self.run(task))
logger.info("Completed all tasks.")
return results
# Fetch 95% of the available CPU cores
num_cores = os.cpu_count()
num_workers = int(num_cores * 0.95)
with concurrent.futures.ThreadPoolExecutor(
max_workers=num_workers
) as executor:
futures = [
executor.submit(self.run, task) for task in tasks
]
return [
future.result()
for future in concurrent.futures.as_completed(futures)
]

@ -1,127 +0,0 @@
import platform
from typing import Any
from clusterops import (
execute_on_gpu,
execute_on_multiple_gpus,
list_available_gpus,
execute_with_all_cpu_cores,
execute_on_cpu,
)
from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger(log_folder="clusterops_wrapper")
def exec_callable_with_clusterops(
device: str = "cpu",
device_id: int = 1,
all_cores: bool = True,
all_gpus: bool = False,
func: callable = None,
enable_logging: bool = True,
*args,
**kwargs,
) -> Any:
"""
Executes a given function on a specified device, either CPU or GPU.
This method attempts to execute a given function on a specified device, either CPU or GPU. It logs the device selection and the number of cores or GPU ID used. If the device is set to CPU, it can use all available cores or a specific core specified by `device_id`. If the device is set to GPU, it uses the GPU specified by `device_id`.
Args:
device (str, optional): The device to use for execution. Defaults to "cpu".
device_id (int, optional): The ID of the GPU to use if device is set to "gpu". Defaults to 0.
all_cores (bool, optional): If True, uses all available CPU cores. Defaults to True.
all_gpus (bool, optional): If True, uses all available GPUs. Defaults to False.
func (callable): The function to execute.
enable_logging (bool, optional): If True, enables logging. Defaults to True.
*args: Additional positional arguments to be passed to the execution method.
**kwargs: Additional keyword arguments to be passed to the execution method.
Returns:
Any: The result of the execution.
Raises:
ValueError: If an invalid device is specified.
Exception: If any other error occurs during execution.
"""
if func is None:
raise ValueError("A callable function must be provided")
try:
if enable_logging:
logger.info(f"Attempting to run on device: {device}")
device = device.lower()
# Check if the platform is Windows and do nothing if true
if platform.system() == "Windows":
if enable_logging:
logger.info(
"Platform is Windows, not executing on device."
)
return None
if device == "cpu":
if enable_logging:
logger.info("Device set to CPU")
if all_cores:
if enable_logging:
logger.info("Using all CPU cores")
return execute_with_all_cpu_cores(
func, *args, **kwargs
)
if device_id is not None:
if enable_logging:
logger.info(
f"Using specific CPU core: {device_id}"
)
return execute_on_cpu(
device_id, func, *args, **kwargs
)
elif device == "gpu":
if enable_logging:
logger.info("Device set to GPU")
if all_gpus:
if enable_logging:
logger.info("Using all available GPUs")
gpus = [int(gpu) for gpu in list_available_gpus()]
return execute_on_multiple_gpus(
gpus, func, *args, **kwargs
)
if enable_logging:
logger.info(f"Using GPU device ID: {device_id}")
return execute_on_gpu(device_id, func, *args, **kwargs)
else:
raise ValueError(
f"Invalid device specified: {device}. Supported devices are 'cpu' and 'gpu'."
)
except ValueError as e:
if enable_logging:
logger.error(
f"Invalid device or configuration specified: {e}"
)
raise
except Exception as e:
if enable_logging:
logger.error(f"An error occurred during execution: {e}")
raise
# def test_clusterops(x):
# return x + 1
# example = exec_callable_with_clusterops(
# device="cpu",
# all_cores=True,
# func = test_clusterops,
# )
# print(example)

@ -0,0 +1,567 @@
import os
import json
import time
import datetime
import yaml
from swarms.structs.conversation import (
Conversation,
generate_conversation_id,
)
def run_all_tests():
"""Run all tests for the Conversation class"""
test_results = []
def run_test(test_func):
try:
test_func()
test_results.append(f"{test_func.__name__} passed")
except Exception as e:
test_results.append(
f"{test_func.__name__} failed: {str(e)}"
)
def test_basic_initialization():
"""Test basic initialization of Conversation"""
conv = Conversation()
assert conv.id is not None
assert conv.conversation_history is not None
assert isinstance(conv.conversation_history, list)
# Test with custom ID
custom_id = generate_conversation_id()
conv_with_id = Conversation(id=custom_id)
assert conv_with_id.id == custom_id
# Test with custom name
conv_with_name = Conversation(name="Test Conversation")
assert conv_with_name.name == "Test Conversation"
def test_initialization_with_settings():
"""Test initialization with various settings"""
conv = Conversation(
system_prompt="Test system prompt",
time_enabled=True,
autosave=True,
token_count=True,
provider="in-memory",
context_length=4096,
rules="Test rules",
custom_rules_prompt="Custom rules",
user="TestUser:",
save_as_yaml=True,
save_as_json_bool=True,
)
# Test all settings
assert conv.system_prompt == "Test system prompt"
assert conv.time_enabled is True
assert conv.autosave is True
assert conv.token_count is True
assert conv.provider == "in-memory"
assert conv.context_length == 4096
assert conv.rules == "Test rules"
assert conv.custom_rules_prompt == "Custom rules"
assert conv.user == "TestUser:"
assert conv.save_as_yaml is True
assert conv.save_as_json_bool is True
def test_message_manipulation():
"""Test adding, deleting, and updating messages"""
conv = Conversation()
# Test adding messages with different content types
conv.add("user", "Hello") # String content
conv.add("assistant", {"response": "Hi"}) # Dict content
conv.add("system", ["Hello", "Hi"]) # List content
assert len(conv.conversation_history) == 3
assert isinstance(
conv.conversation_history[1]["content"], dict
)
assert isinstance(
conv.conversation_history[2]["content"], list
)
# Test adding multiple messages
conv.add_multiple(
["user", "assistant", "system"],
["Hi", "Hello there", "System message"],
)
assert len(conv.conversation_history) == 6
# Test updating message with different content type
conv.update(0, "user", {"updated": "content"})
assert isinstance(
conv.conversation_history[0]["content"], dict
)
# Test deleting multiple messages
conv.delete(0)
conv.delete(0)
assert len(conv.conversation_history) == 4
def test_message_retrieval():
"""Test message retrieval methods"""
conv = Conversation()
# Add messages in specific order for testing
conv.add("user", "Test message")
conv.add("assistant", "Test response")
conv.add("system", "System message")
# Test query - note: messages might have system prompt prepended
message = conv.query(0)
assert "Test message" in message["content"]
# Test search with multiple results
results = conv.search("Test")
assert (
len(results) >= 2
) # At least two messages should contain "Test"
assert any(
"Test message" in str(msg["content"]) for msg in results
)
assert any(
"Test response" in str(msg["content"]) for msg in results
)
# Test get_last_message_as_string
last_message = conv.get_last_message_as_string()
assert "System message" in last_message
# Test return_messages_as_list
messages_list = conv.return_messages_as_list()
assert (
len(messages_list) >= 3
) # At least our 3 added messages
assert any("Test message" in msg for msg in messages_list)
# Test return_messages_as_dictionary
messages_dict = conv.return_messages_as_dictionary()
assert (
len(messages_dict) >= 3
) # At least our 3 added messages
assert all(isinstance(m, dict) for m in messages_dict)
assert all(
{"role", "content"} <= set(m.keys())
for m in messages_dict
)
# Test get_final_message and content
assert "System message" in conv.get_final_message()
assert "System message" in conv.get_final_message_content()
# Test return_all_except_first
remaining_messages = conv.return_all_except_first()
assert (
len(remaining_messages) >= 2
) # At least 2 messages after removing first
# Test return_all_except_first_string
remaining_string = conv.return_all_except_first_string()
assert isinstance(remaining_string, str)
def test_saving_loading():
"""Test saving and loading conversation"""
# Test with save_enabled
conv = Conversation(
save_enabled=True,
conversations_dir="./test_conversations",
)
conv.add("user", "Test save message")
# Test save_as_json
test_file = os.path.join(
"./test_conversations", "test_conversation.json"
)
conv.save_as_json(test_file)
assert os.path.exists(test_file)
# Test load_from_json
new_conv = Conversation()
new_conv.load_from_json(test_file)
assert len(new_conv.conversation_history) == 1
assert (
new_conv.conversation_history[0]["content"]
== "Test save message"
)
# Test class method load_conversation
loaded_conv = Conversation.load_conversation(
name=conv.id, conversations_dir="./test_conversations"
)
assert loaded_conv.id == conv.id
# Cleanup
os.remove(test_file)
os.rmdir("./test_conversations")
def test_output_formats():
"""Test different output formats"""
conv = Conversation()
conv.add("user", "Test message")
conv.add("assistant", {"response": "Test"})
# Test JSON output
json_output = conv.to_json()
assert isinstance(json_output, str)
parsed_json = json.loads(json_output)
assert len(parsed_json) == 2
# Test dict output
dict_output = conv.to_dict()
assert isinstance(dict_output, list)
assert len(dict_output) == 2
# Test YAML output
yaml_output = conv.to_yaml()
assert isinstance(yaml_output, str)
parsed_yaml = yaml.safe_load(yaml_output)
assert len(parsed_yaml) == 2
# Test return_json
json_str = conv.return_json()
assert isinstance(json_str, str)
assert len(json.loads(json_str)) == 2
def test_memory_management():
"""Test memory management functions"""
conv = Conversation()
# Test clear
conv.add("user", "Test message")
conv.clear()
assert len(conv.conversation_history) == 0
# Test clear_memory
conv.add("user", "Test message")
conv.clear_memory()
assert len(conv.conversation_history) == 0
# Test batch operations
messages = [
{"role": "user", "content": "Message 1"},
{"role": "assistant", "content": "Response 1"},
]
conv.batch_add(messages)
assert len(conv.conversation_history) == 2
# Test truncate_memory_with_tokenizer
if conv.tokenizer: # Only if tokenizer is available
conv.truncate_memory_with_tokenizer()
assert len(conv.conversation_history) > 0
def test_conversation_metadata():
"""Test conversation metadata and listing"""
test_dir = "./test_conversations_metadata"
os.makedirs(test_dir, exist_ok=True)
try:
# Create a conversation with metadata
conv = Conversation(
name="Test Conv",
system_prompt="System",
rules="Rules",
custom_rules_prompt="Custom",
conversations_dir=test_dir,
save_enabled=True,
autosave=True,
)
# Add a message to trigger save
conv.add("user", "Test message")
# Give a small delay for autosave
time.sleep(0.1)
# List conversations and verify
conversations = Conversation.list_conversations(test_dir)
assert len(conversations) >= 1
found_conv = next(
(
c
for c in conversations
if c["name"] == "Test Conv"
),
None,
)
assert found_conv is not None
assert found_conv["id"] == conv.id
finally:
# Cleanup
import shutil
if os.path.exists(test_dir):
shutil.rmtree(test_dir)
def test_time_enabled_messages():
"""Test time-enabled messages"""
conv = Conversation(time_enabled=True)
conv.add("user", "Time test")
# Verify timestamp in message
message = conv.conversation_history[0]
assert "timestamp" in message
assert isinstance(message["timestamp"], str)
# Verify time in content when time_enabled is True
assert "Time:" in message["content"]
def test_provider_specific():
"""Test provider-specific functionality"""
# Test in-memory provider
conv_memory = Conversation(provider="in-memory")
conv_memory.add("user", "Test")
assert len(conv_memory.conversation_history) == 1
# Test mem0 provider if available
try:
conv_mem0 = Conversation(provider="mem0")
conv_mem0.add("user", "Test")
# Add appropriate assertions based on mem0 behavior
except:
pass # Skip if mem0 is not available
def test_tool_output():
"""Test tool output handling"""
conv = Conversation()
tool_output = {
"tool_name": "test_tool",
"output": "test result",
}
conv.add_tool_output_to_agent("tool", tool_output)
assert len(conv.conversation_history) == 1
assert conv.conversation_history[0]["role"] == "tool"
assert conv.conversation_history[0]["content"] == tool_output
def test_autosave_functionality():
"""Test autosave functionality and related features"""
test_dir = "./test_conversations_autosave"
os.makedirs(test_dir, exist_ok=True)
try:
# Test with autosave and save_enabled True
conv = Conversation(
autosave=True,
save_enabled=True,
conversations_dir=test_dir,
name="autosave_test",
)
# Add a message and verify it was auto-saved
conv.add("user", "Test autosave message")
save_path = os.path.join(test_dir, f"{conv.id}.json")
# Give a small delay for autosave to complete
time.sleep(0.1)
assert os.path.exists(
save_path
), f"Save file not found at {save_path}"
# Load the saved conversation and verify content
loaded_conv = Conversation.load_conversation(
name=conv.id, conversations_dir=test_dir
)
found_message = False
for msg in loaded_conv.conversation_history:
if "Test autosave message" in str(msg["content"]):
found_message = True
break
assert (
found_message
), "Message not found in loaded conversation"
# Clean up first conversation files
if os.path.exists(save_path):
os.remove(save_path)
# Test with save_enabled=False
conv_no_save = Conversation(
autosave=False, # Changed to False to prevent autosave
save_enabled=False,
conversations_dir=test_dir,
)
conv_no_save.add("user", "This shouldn't be saved")
save_path_no_save = os.path.join(
test_dir, f"{conv_no_save.id}.json"
)
time.sleep(0.1) # Give time for potential save
assert not os.path.exists(
save_path_no_save
), "File should not exist when save_enabled is False"
finally:
# Cleanup
import shutil
if os.path.exists(test_dir):
shutil.rmtree(test_dir)
def test_advanced_message_handling():
"""Test advanced message handling features"""
conv = Conversation()
# Test adding messages with metadata
metadata = {"timestamp": "2024-01-01", "session_id": "123"}
conv.add("user", "Test with metadata", metadata=metadata)
# Test batch operations with different content types
messages = [
{"role": "user", "content": "Message 1"},
{
"role": "assistant",
"content": {"response": "Complex response"},
},
{"role": "system", "content": ["Multiple", "Items"]},
]
conv.batch_add(messages)
assert (
len(conv.conversation_history) == 4
) # Including the first message
# Test message format consistency
for msg in conv.conversation_history:
assert "role" in msg
assert "content" in msg
if "timestamp" in msg:
assert isinstance(msg["timestamp"], str)
def test_conversation_metadata_handling():
"""Test handling of conversation metadata and attributes"""
test_dir = "./test_conversations_metadata_handling"
os.makedirs(test_dir, exist_ok=True)
try:
# Test initialization with all optional parameters
conv = Conversation(
name="Test Conv",
system_prompt="System Prompt",
time_enabled=True,
context_length=2048,
rules="Test Rules",
custom_rules_prompt="Custom Rules",
user="CustomUser:",
provider="in-memory",
conversations_dir=test_dir,
save_enabled=True,
)
# Verify all attributes are set correctly
assert conv.name == "Test Conv"
assert conv.system_prompt == "System Prompt"
assert conv.time_enabled is True
assert conv.context_length == 2048
assert conv.rules == "Test Rules"
assert conv.custom_rules_prompt == "Custom Rules"
assert conv.user == "CustomUser:"
assert conv.provider == "in-memory"
# Test saving and loading preserves metadata
conv.save_as_json()
# Load using load_conversation
loaded_conv = Conversation.load_conversation(
name=conv.id, conversations_dir=test_dir
)
# Verify metadata was preserved
assert loaded_conv.name == "Test Conv"
assert loaded_conv.system_prompt == "System Prompt"
assert loaded_conv.rules == "Test Rules"
finally:
# Cleanup
import shutil
shutil.rmtree(test_dir)
def test_time_enabled_features():
"""Test time-enabled message features"""
conv = Conversation(time_enabled=True)
# Add message and verify timestamp
conv.add("user", "Time test message")
message = conv.conversation_history[0]
# Verify timestamp format
assert "timestamp" in message
try:
datetime.datetime.fromisoformat(message["timestamp"])
except ValueError:
assert False, "Invalid timestamp format"
# Verify time in content
assert "Time:" in message["content"]
assert (
datetime.datetime.now().strftime("%Y-%m-%d")
in message["content"]
)
def test_provider_specific_features():
"""Test provider-specific features and behaviors"""
# Test in-memory provider
conv_memory = Conversation(provider="in-memory")
conv_memory.add("user", "Test in-memory")
assert len(conv_memory.conversation_history) == 1
assert (
"Test in-memory"
in conv_memory.get_last_message_as_string()
)
# Test mem0 provider if available
try:
from mem0 import AsyncMemory
# Skip actual mem0 testing since it requires async
pass
except ImportError:
pass
# Test invalid provider
invalid_provider = "invalid_provider"
try:
Conversation(provider=invalid_provider)
# If we get here, the provider was accepted when it shouldn't have been
raise AssertionError(
f"Should have raised ValueError for provider '{invalid_provider}'"
)
except ValueError:
# This is the expected behavior
pass
# Run all tests
tests = [
test_basic_initialization,
test_initialization_with_settings,
test_message_manipulation,
test_message_retrieval,
test_saving_loading,
test_output_formats,
test_memory_management,
test_conversation_metadata,
test_time_enabled_messages,
test_provider_specific,
test_tool_output,
test_autosave_functionality,
test_advanced_message_handling,
test_conversation_metadata_handling,
test_time_enabled_features,
test_provider_specific_features,
]
for test in tests:
run_test(test)
# Print results
print("\nTest Results:")
for result in test_results:
print(result)
if __name__ == "__main__":
run_all_tests()

@ -0,0 +1,36 @@
from swarms.structs.agent import Agent
from swarms.structs.ma_blocks import aggregate
agents = [
Agent(
agent_name="Sector-Financial-Analyst",
agent_description="Senior financial analyst at BlackRock.",
system_prompt="You are a financial analyst tasked with optimizing asset allocations for a $50B portfolio. Provide clear, quantitative recommendations for each sector.",
max_loops=1,
model_name="gpt-4o-mini",
max_tokens=3000,
),
Agent(
agent_name="Sector-Risk-Analyst",
agent_description="Expert risk management analyst.",
system_prompt="You are a risk analyst responsible for advising on risk allocation within a $50B portfolio. Provide detailed insights on risk exposures for each sector.",
max_loops=1,
model_name="gpt-4o-mini",
max_tokens=3000,
),
Agent(
agent_name="Tech-Sector-Analyst",
agent_description="Technology sector analyst.",
system_prompt="You are a tech sector analyst focused on capital and risk allocations. Provide data-backed insights for the tech sector.",
max_loops=1,
model_name="gpt-4o-mini",
max_tokens=3000,
),
]
aggregate(
workers=agents,
task="What is the best sector to invest in?",
type="all",
)
Loading…
Cancel
Save