Merge branch 'kyegomez:master' into Graph-of-thought

pull/1202/head
CI-DEV 1 month ago committed by GitHub
commit 6cc298fce8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -58,6 +58,17 @@ Swarms delivers a comprehensive, enterprise-grade multi-agent infrastructure pla
| 🛠️ **Developer Experience** | • Intuitive Enterprise API<br>• Comprehensive Documentation<br>• Active Enterprise Community<br>• CLI & SDK Tools<br>• IDE Integration Support<br>• Code Generation Templates | • Accelerated Development Cycles<br>• Reduced Learning Curve<br>• Expert Community Support<br>• Rapid Deployment Capabilities<br>• Enhanced Developer Productivity<br>• Standardized Development Patterns |
## 🔌 Supported Protocols & Integrations
Swarms seamlessly integrates with industry-standard protocols, enabling powerful capabilities for tool integration, payment processing, and distributed agent orchestration.
| Protocol | Description | Use Cases | Documentation |
|----------|-------------|-----------|---------------|
| **[MCP (Model Context Protocol)](https://docs.swarms.world/en/latest/swarms/examples/multi_mcp_agent/)** | Standardized protocol for AI agents to interact with external tools and services through MCP servers. Enables dynamic tool discovery and execution. | • Tool integration<br>• Multi-server connections<br>• External API access<br>• Database connectivity | [MCP Integration Guide](https://docs.swarms.world/en/latest/swarms/examples/multi_mcp_agent/) |
| **[X402](https://docs.swarms.world/en/latest/examples/x402_payment_integration/)** | Cryptocurrency payment protocol for API endpoints. Enables monetization of agents with pay-per-use models. | • Agent monetization<br>• Payment gate protection<br>• Crypto payments<br>• Pay-per-use services | [X402 Quickstart](https://docs.swarms.world/en/latest/examples/x402_payment_integration/) |
| **[AOP (Agent Orchestration Protocol)](https://docs.swarms.world/en/latest/examples/aop_medical/)** | Framework for deploying and managing agents as distributed services. Enables agent discovery, management, and execution through standardized protocols. | • Distributed agent deployment<br>• Agent discovery<br>• Service orchestration<br>• Scalable multi-agent systems | [AOP Reference](https://docs.swarms.world/en/latest/swarms/structs/aop/) |
## Install 💻
### Using pip
@ -822,7 +833,7 @@ Thank you for contributing to swarms. Your work is extremely appreciated and rec
-----
## Connect With Us
## Join the Swarms community 👾👾👾
Join our community of agent engineers and researchers for technical support, cutting-edge updates, and exclusive access to world-class agent engineering insights!

@ -0,0 +1,231 @@
# X402 Discovery Query Agent
This example demonstrates how to create a Swarms agent that can search and query services from the X402 bazaar using the Coinbase CDP API. The agent can discover available services, filter them by price, and provide summaries of the results.
## Overview
The X402 Discovery Query Agent enables you to:
| Feature | Description |
|---------|-------------|
| Query X402 services | Search the X402 bazaar for available services |
| Filter by price | Find services within your budget |
| Summarize results | Get AI-powered summaries of discovered services |
| Pagination support | Handle large result sets efficiently |
## Prerequisites
Before you begin, ensure you have:
- Python 3.10 or higher
- API keys for your AI model provider (e.g., Anthropic Claude)
- `httpx` library for async HTTP requests
## Installation
Install the required dependencies:
```bash
pip install swarms httpx
```
## Code Example
Here's the complete implementation of the X402 Discovery Query Agent:
```python
import asyncio
from typing import List, Optional, Dict, Any
from swarms import Agent
import httpx
async def query_x402_services(
    limit: Optional[int] = None,
    max_price: Optional[int] = None,
    offset: int = 0,
    base_url: str = "https://api.cdp.coinbase.com",
) -> Dict[str, Any]:
    """
    Fetch x402 discovery resources from the Coinbase CDP API, optionally
    filtered by price on the client side.

    Args:
        limit: Maximum number of services to return; None means no limit.
        max_price: If given, keep only services with at least one payment
            option whose ``maxAmountRequired`` (atomic units) is <= this value.
        offset: Pagination offset for the API request. Defaults to 0.
        base_url: API base URL. Defaults to the Coinbase CDP endpoint.

    Returns:
        The decoded JSON response: an ``items`` list plus pagination info.

    Raises:
        httpx.HTTPError: On a non-success HTTP status.
        httpx.RequestError: On a network-level failure.
    """

    def _is_affordable(service: Dict[str, Any]) -> bool:
        # True when any payment option parses to an int <= max_price;
        # missing or malformed amounts are skipped silently.
        for option in service.get("accepts", []):
            raw = option.get("maxAmountRequired", "")
            if not raw:
                continue
            try:
                if int(raw) <= max_price:
                    return True
            except (ValueError, TypeError):
                continue
        return False

    request_url = f"{base_url}/platform/v2/x402/discovery/resources"
    query_params: Dict[str, Any] = {"offset": offset}
    # When both a limit and a price cap are requested, over-fetch (5x) so
    # enough items survive the client-side price filter.
    fetch_limit = limit if limit is None or max_price is None else limit * 5
    if fetch_limit is not None:
        query_params["limit"] = fetch_limit

    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.get(request_url, params=query_params)
        response.raise_for_status()
        payload = response.json()

    if max_price is not None and "items" in payload:
        kept = [
            service
            for service in payload.get("items", [])
            if _is_affordable(service)
        ]
        if limit is not None:
            kept = kept[:limit]
        payload["items"] = kept
        # Reflect the filtered count in the pagination metadata.
        if "pagination" in payload:
            payload["pagination"]["total"] = len(kept)
    return payload
def get_x402_services_sync(
    limit: Optional[int] = None,
    max_price: Optional[int] = None,
    offset: int = 0,
) -> str:
    """
    Synchronous wrapper that fetches x402 services and returns them as JSON.

    Args:
        limit: Optional maximum number of services to return.
        max_price: Optional maximum price in atomic units to filter by.
        offset: Pagination offset for the API request. Defaults to 0.

    Returns:
        JSON-formatted string of service dictionaries matching the criteria.
    """
    # Local import keeps the example's top-level import block unchanged.
    import json

    async def _fetch():
        result = await query_x402_services(
            limit=limit, max_price=max_price, offset=offset
        )
        return result.get("items", [])

    services = asyncio.run(_fetch())
    # json.dumps (rather than str()) makes the output match the documented
    # contract: str() of a list is Python repr (single quotes), not valid JSON.
    # Items originate from response.json(), so they are JSON-serializable.
    return json.dumps(services)
# Initialize the agent with the discovery tool
agent = Agent(
    agent_name="X402-Discovery-Agent",
    agent_description="A agent that queries the x402 discovery services from the Coinbase CDP API.",
    model_name="claude-haiku-4-5",
    dynamic_temperature_enabled=True,
    max_loops=1,
    dynamic_context_window=True,
    tools=[get_x402_services_sync],  # sync wrapper the LLM can invoke as a tool
    top_p=None,
    temperature=None,  # None here presumably defers sampling to provider defaults — confirm against Agent docs
    tool_call_summary=True,
)
if __name__ == "__main__":
    # Run the agent once; it is expected to call the discovery tool and summarize.
    out = agent.run(
        task="Summarize the first 10 services under 100000 atomic units (e.g., $0.10 USDC)"
    )
    print(out)
```
## Usage
### Basic Query
Query all available services:
```python
result = await query_x402_services()
print(f"Found {len(result['items'])} services")
```
### Filtered Query
Get services within a specific price range:
```python
# Get first 10 services under 100000 atomic units ($0.10 USDC with 6 decimals)
result = await query_x402_services(limit=10, max_price=100000)
for service in result["items"]:
    print(service["resource"])
```
### Using the Agent
Run the agent to get AI-powered summaries:
```python
# The agent will automatically call the tool and provide a summary
out = agent.run(
task="Find and summarize 5 affordable services under 50000 atomic units"
)
print(out)
```
## Understanding Price Units
X402 services use atomic units for pricing. For example:
- **USDC** typically uses 6 decimals
- 100,000 atomic units = $0.10 USDC
- 1,000,000 atomic units = $1.00 USDC
Always check the `accepts` array in each service to understand the payment options and their price requirements.
## API Response Structure
Each service in the response contains:
- `resource`: The service endpoint or resource identifier
- `accepts`: Array of payment options with `maxAmountRequired` values
- Additional metadata about the service
## Error Handling
The functions handle various error cases:
- Network errors are raised as `httpx.RequestError`
- HTTP errors are raised as `httpx.HTTPError`
- Invalid price values are silently skipped during filtering
## Next Steps
1. Customize the agent's system prompt for specific use cases
2. Add additional filtering criteria (e.g., by service type)
3. Implement caching for frequently accessed services
4. Create a web interface for browsing services
5. Integrate with payment processing to actually use discovered services
## Related Documentation
- [X402 Payment Integration](x402_payment_integration.md) - Learn how to monetize your agents with X402
- [Agent Tools Reference](../swarms/tools/tools_examples.md) - Understand how to create and use tools with agents

@ -437,6 +437,7 @@ nav:
- X402:
- x402 Quickstart Example: "examples/x402_payment_integration.md"
- X402 Discovery Query Agent: "examples/x402_discovery_query.md"
- Swarms Cloud API:

@ -290,14 +290,14 @@ task = "Write a short story about a robot who discovers music."
# --- Example 1: SequentialWorkflow ---
# Agents run one after another in a chain: Writer -> Editor -> Reviewer.
print("Running a Sequential Workflow...")
sequential_router = SwarmRouter(swarm_type=SwarmType.SequentialWorkflow, agents=agents)
sequential_router = SwarmRouter(swarm_type="SequentialWorkflow", agents=agents)
sequential_output = sequential_router.run(task)
print(f"Final Sequential Output:\n{sequential_output}\n")
# --- Example 2: ConcurrentWorkflow ---
# All agents receive the same initial task and run at the same time.
print("Running a Concurrent Workflow...")
concurrent_router = SwarmRouter(swarm_type=SwarmType.ConcurrentWorkflow, agents=agents)
concurrent_router = SwarmRouter(swarm_type="ConcurrentWorkflow", agents=agents)
concurrent_outputs = concurrent_router.run(task)
# This returns a dictionary of each agent's output
for agent_name, output in concurrent_outputs.items():
@ -312,9 +312,9 @@ aggregator = Agent(
model_name="gpt-4o-mini"
)
moa_router = SwarmRouter(
swarm_type=SwarmType.MixtureOfAgents,
swarm_type="MixtureOfAgents",
agents=agents,
aggregator_agent=aggregator, # MoA requires an aggregator
aggregator_agent=aggregator,
)
aggregated_output = moa_router.run(task)
print(f"Final Aggregated Output:\n{aggregated_output}\n")

@ -24,7 +24,7 @@ mkdocs-autolinks-plugin
# Requirements for core
jinja2~=3.1
markdown~=3.8
markdown~=3.10
mkdocs-material-extensions~=1.3
pygments~=2.19
pymdown-extensions~=10.16

@ -29,7 +29,7 @@ GROQ_API_KEY=""
```python
from swarms import Agent
from swarms.structs.swarm_router import SwarmRouter, SwarmType
from swarms.structs.swarm_router import SwarmRouter
# Initialize specialized agents
data_extractor_agent = Agent(
@ -61,7 +61,7 @@ sequential_router = SwarmRouter(
name="SequentialRouter",
description="Process tasks in sequence",
agents=[data_extractor_agent, summarizer_agent, financial_analyst_agent],
swarm_type=SwarmType.SequentialWorkflow,
swarm_type="SequentialWorkflow",
max_loops=1
)
@ -76,7 +76,7 @@ concurrent_router = SwarmRouter(
name="ConcurrentRouter",
description="Process tasks concurrently",
agents=[data_extractor_agent, summarizer_agent, financial_analyst_agent],
swarm_type=SwarmType.ConcurrentWorkflow,
swarm_type="ConcurrentWorkflow",
max_loops=1
)
@ -91,8 +91,8 @@ rearrange_router = SwarmRouter(
name="RearrangeRouter",
description="Dynamically rearrange agents for optimal task processing",
agents=[data_extractor_agent, summarizer_agent, financial_analyst_agent],
swarm_type=SwarmType.AgentRearrange,
flow=f"{data_extractor_agent.agent_name} -> {summarizer_agent.agent_name} -> {financial_analyst_agent.agent_name}",
swarm_type="AgentRearrange",
rearrange_flow=f"{data_extractor_agent.agent_name} -> {summarizer_agent.agent_name} -> {financial_analyst_agent.agent_name}",
max_loops=1
)
@ -107,7 +107,7 @@ mixture_router = SwarmRouter(
name="MixtureRouter",
description="Combine multiple expert agents",
agents=[data_extractor_agent, summarizer_agent, financial_analyst_agent],
swarm_type=SwarmType.MixtureOfAgents,
swarm_type="MixtureOfAgents",
max_loops=1
)
@ -137,7 +137,7 @@ router = SwarmRouter(
name="CustomRouter",
description="Custom router configuration",
agents=[data_extractor_agent, summarizer_agent, financial_analyst_agent],
swarm_type=SwarmType.SequentialWorkflow,
swarm_type="SequentialWorkflow",
max_loops=3,
autosave=True,
verbose=True,
@ -145,6 +145,27 @@ router = SwarmRouter(
)
```
# SwarmType Reference
## Valid SwarmType Values
| Value | Description |
|-------|-------------|
| `"SequentialWorkflow"` | Execute agents in sequence |
| `"ConcurrentWorkflow"` | Execute agents concurrently |
| `"AgentRearrange"` | Dynamically rearrange agent execution order |
| `"MixtureOfAgents"` | Combine outputs from multiple agents |
| `"GroupChat"` | Enable group chat between agents |
| `"MultiAgentRouter"` | Route tasks to appropriate agents |
| `"AutoSwarmBuilder"` | Automatically build swarm configuration |
| `"HiearchicalSwarm"` | Hierarchical agent organization |
| `"MajorityVoting"` | Use majority voting for decisions |
| `"MALT"` | Multi-Agent Learning and Training |
| `"CouncilAsAJudge"` | Council-based evaluation system |
| `"InteractiveGroupChat"` | Interactive group chat with agents |
| `"HeavySwarm"` | Heavy swarm for complex tasks |
| `"auto"` | Automatically select swarm type |
# Best Practices
## Choose the appropriate swarm type based on your task requirements:
@ -187,7 +208,7 @@ Here's a complete example showing how to use SwarmRouter in a real-world scenari
```python
import os
from swarms import Agent
from swarms.structs.swarm_router import SwarmRouter, SwarmType
from swarms.structs.swarm_router import SwarmRouter
# Initialize specialized agents
research_agent = Agent(
@ -216,7 +237,7 @@ router = SwarmRouter(
name="ResearchAnalysisRouter",
description="Process research and analysis tasks",
agents=[research_agent, analysis_agent, summary_agent],
swarm_type=SwarmType.SequentialWorkflow,
swarm_type="SequentialWorkflow",
max_loops=1,
verbose=True
)

@ -83,6 +83,7 @@ The `Agent` class establishes a conversational loop with a language model, allow
| `traceback` | `Optional[Any]` | Object used for traceback handling. |
| `traceback_handlers` | `Optional[Any]` | List of traceback handlers. |
| `streaming_on` | `Optional[bool]` | Boolean indicating whether to stream responses. |
| `stream` | `Optional[bool]` | Boolean indicating whether to enable detailed token-by-token streaming with metadata. |
| `docs` | `List[str]` | List of document paths or contents to be ingested. |
| `docs_folder` | `Optional[str]` | Path to a folder containing documents to be ingested. |
| `verbose` | `Optional[bool]` | Boolean indicating whether to print verbose output. |
@ -759,6 +760,22 @@ print(agent.system_prompt)
```
### Token-by-Token Streaming
```python
from swarms import Agent
# Initialize agent with detailed streaming
agent = Agent(
model_name="gpt-4.1",
max_loops=1,
stream=True, # Enable detailed token-by-token streaming
)
# Run with detailed streaming - each token shows metadata
agent.run("Tell me a short story about a robot learning to paint.")
```
## Agent Structured Outputs
- Create a structured output schema for the agent [List[Dict]]

@ -40,7 +40,6 @@ The `execution_type` parameter controls how the AutoSwarmBuilder operates:
| Execution Type | Description |
|----------------------------------|-----------------------------------------------------------|
| **"return-agents"** | Creates and returns agent specifications as a dictionary (default) |
| **"execute-swarm-router"** | Executes the swarm router with the created agents |
| **"return-swarm-router-config"** | Returns the swarm router configuration as a dictionary |
| **"return-agents-objects"** | Returns agent objects created from specifications |
@ -602,7 +601,6 @@ for agent in agents:
- Use `verbose=True` during development for debugging
- Choose the right `execution_type` for your use case:
- Use `"return-agents"` for getting agent specifications as dictionary (default)
- Use `"execute-swarm-router"` for executing the swarm router with created agents
- Use `"return-swarm-router-config"` for analyzing swarm architecture
- Use `"return-agents-objects"` for getting agent objects created from specifications
- Set `max_tokens` appropriately based on expected response length

@ -186,14 +186,14 @@ task = "Write a short story about a robot who discovers music."
# --- Example 1: SequentialWorkflow ---
# Agents run one after another in a chain: Writer -> Editor -> Reviewer.
print("Running a Sequential Workflow...")
sequential_router = SwarmRouter(swarm_type=SwarmType.SequentialWorkflow, agents=agents)
sequential_router = SwarmRouter(swarm_type="SequentialWorkflow", agents=agents)
sequential_output = sequential_router.run(task)
print(f"Final Sequential Output:\n{sequential_output}\n")
# --- Example 2: ConcurrentWorkflow ---
# All agents receive the same initial task and run at the same time.
print("Running a Concurrent Workflow...")
concurrent_router = SwarmRouter(swarm_type=SwarmType.ConcurrentWorkflow, agents=agents)
concurrent_router = SwarmRouter(swarm_type="ConcurrentWorkflow", agents=agents)
concurrent_outputs = concurrent_router.run(task)
# This returns a dictionary of each agent's output
for agent_name, output in concurrent_outputs.items():
@ -208,9 +208,9 @@ aggregator = Agent(
model_name="gpt-4o-mini"
)
moa_router = SwarmRouter(
swarm_type=SwarmType.MixtureOfAgents,
swarm_type="MixtureOfAgents",
agents=agents,
aggregator_agent=aggregator, # MoA requires an aggregator
aggregator_agent=aggregator,
)
aggregated_output = moa_router.run(task)
print(f"Final Aggregated Output:\n{aggregated_output}\n")

@ -0,0 +1,50 @@
from eth_account import Account
from x402.clients.httpx import x402HttpxClient
import os
from dotenv import load_dotenv
load_dotenv()
async def buy_x402_service(
    base_url: str = None, endpoint: str = None
):
    """
    Invoke a paid service from the X402 bazaar.

    Builds an x402-enabled HTTP client authorized by the private key in the
    ``X402_PRIVATE_KEY`` environment variable and issues a GET request to the
    given endpoint. The x402 client wrapper presumably handles any payment
    challenge raised by the server — confirm against the x402 client docs.

    Args:
        base_url (str, optional): Base URL of the service provider. Defaults to None.
        endpoint (str, optional): API endpoint path to call on the provider. Defaults to None.

    Returns:
        response (httpx.Response): The response from the GET request. NOTE:
        the body has already been read (and printed) via ``aread()`` before
        this function returns.

    Example:
        ```python
        response = await buy_x402_service(
            base_url="https://api.cdp.coinbase.com",
            endpoint="/x402/v1/bazaar/services/service123",
        )
        ```
    """
    key = os.getenv("X402_PRIVATE_KEY")
    # Set up your payment account from private key
    account = Account.from_key(key)
    async with x402HttpxClient(
        account=account, base_url=base_url
    ) as client:
        response = await client.get(endpoint)
        print(await response.aread())
        return response

@ -0,0 +1,229 @@
import asyncio
from typing import List, Optional, Dict, Any
from swarms import Agent
import httpx
async def query_x402_services(
    limit: Optional[int] = None,
    max_price: Optional[int] = None,
    offset: int = 0,
    base_url: str = "https://api.cdp.coinbase.com",
) -> Dict[str, Any]:
    """
    Fetch x402 discovery resources from the Coinbase CDP API, optionally
    filtered by price on the client side.

    Args:
        limit: Maximum number of services to return; None means no limit.
        max_price: If given, keep only services with at least one payment
            option whose ``maxAmountRequired`` (atomic units) is <= this value.
        offset: Pagination offset for the API request. Defaults to 0.
        base_url: API base URL. Defaults to the Coinbase CDP endpoint.

    Returns:
        The decoded JSON response: an ``items`` list plus pagination info.

    Raises:
        httpx.HTTPError: On a non-success HTTP status.
        httpx.RequestError: On a network-level failure.

    Example:
        ```python
        # All services
        result = await query_x402_services()
        # First 10 services under 100000 atomic units
        result = await query_x402_services(limit=10, max_price=100000)
        ```
    """

    def _is_affordable(service: Dict[str, Any]) -> bool:
        # True when any payment option parses to an int <= max_price;
        # missing or malformed amounts are skipped silently.
        for option in service.get("accepts", []):
            raw = option.get("maxAmountRequired", "")
            if not raw:
                continue
            try:
                if int(raw) <= max_price:
                    return True
            except (ValueError, TypeError):
                continue
        return False

    request_url = f"{base_url}/platform/v2/x402/discovery/resources"
    query_params: Dict[str, Any] = {"offset": offset}
    # When both a limit and a price cap are requested, over-fetch (5x) so
    # enough items survive the client-side price filter.
    fetch_limit = limit if limit is None or max_price is None else limit * 5
    if fetch_limit is not None:
        query_params["limit"] = fetch_limit

    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.get(request_url, params=query_params)
        response.raise_for_status()
        payload = response.json()

    if max_price is not None and "items" in payload:
        kept = [
            service
            for service in payload.get("items", [])
            if _is_affordable(service)
        ]
        if limit is not None:
            kept = kept[:limit]
        payload["items"] = kept
        # Reflect the filtered count in the pagination metadata.
        if "pagination" in payload:
            payload["pagination"]["total"] = len(kept)
    return payload
def filter_services_by_price(
    services: List[Dict[str, Any]], max_price: int
) -> List[Dict[str, Any]]:
    """
    Keep only services affordable at or below ``max_price`` atomic units.

    A service qualifies when at least one entry in its ``accepts`` list has a
    ``maxAmountRequired`` that parses to an integer <= ``max_price``. Missing
    or unparsable amounts are ignored.

    Args:
        services: Service dictionaries from the discovery API.
        max_price: Maximum price in atomic units.

    Returns:
        A new list with the qualifying services, in their original order.
    """

    def _within_budget(service: Dict[str, Any]) -> bool:
        for option in service.get("accepts", []):
            raw = option.get("maxAmountRequired", "")
            if not raw:
                continue
            try:
                if int(raw) <= max_price:
                    return True
            except (ValueError, TypeError):
                continue
        return False

    return [service for service in services if _within_budget(service)]
def limit_services(
    services: List[Dict[str, Any]], max_count: int
) -> List[Dict[str, Any]]:
    """
    Truncate a service list to at most ``max_count`` entries.

    Args:
        services: Service dictionaries to truncate.
        max_count: Upper bound on the number of services kept.

    Returns:
        A new list with the leading ``max_count`` services (all of them when
        the input is shorter).
    """
    head = slice(None, max_count)
    return services[head]
async def get_x402_services(
    limit: Optional[int] = None,
    max_price: Optional[int] = None,
    offset: int = 0,
) -> List[Dict[str, Any]]:
    """
    Convenience helper: query the discovery API and return just the items.

    Args:
        limit: Optional maximum number of services to return.
        max_price: Optional maximum price in atomic units to filter by.
        offset: Pagination offset passed through to the API. Defaults to 0.

    Returns:
        The ``items`` list from the discovery response ([] when absent).

    Example:
        ```python
        # First 10 services under $0.10 USDC (100000 atomic units, 6 decimals)
        services = await get_x402_services(limit=10, max_price=100000)
        for service in services:
            print(service["resource"])
        ```
    """
    response = await query_x402_services(
        limit=limit, max_price=max_price, offset=offset
    )
    return response.get("items", [])
def get_x402_services_sync(
    limit: Optional[int] = None,
    max_price: Optional[int] = None,
    offset: int = 0,
) -> str:
    """
    Synchronous wrapper for get_x402_services that returns a JSON string.

    Args:
        limit: Optional maximum number of services to return.
        max_price: Optional maximum price in atomic units to filter by.
        offset: Pagination offset for the API request. Defaults to 0.

    Returns:
        JSON-formatted string of service dictionaries matching the criteria.

    Example:
        ```python
        # Get first 10 services under $0.10 USDC
        services_str = get_x402_services_sync(limit=10, max_price=100000)
        print(services_str)
        ```
    """
    # Local import keeps the module-level import block unchanged.
    import json

    services = asyncio.run(
        get_x402_services(
            limit=limit, max_price=max_price, offset=offset
        )
    )
    # json.dumps (rather than str()) makes the output match the documented
    # contract: str() of a list is Python repr (single quotes), not valid JSON.
    # Items originate from response.json(), so they are JSON-serializable.
    return json.dumps(services)
# Agent wired with the discovery tool so the LLM can query the bazaar on demand.
agent = Agent(
    agent_name="X402-Discovery-Agent",
    agent_description="A agent that queries the x402 discovery services from the Coinbase CDP API.",
    model_name="claude-haiku-4-5",
    dynamic_temperature_enabled=True,
    max_loops=1,
    dynamic_context_window=True,
    tools=[get_x402_services_sync],  # sync wrapper the LLM can invoke as a tool
    top_p=None,
    temperature=None,  # None here presumably defers sampling to provider defaults — confirm against Agent docs
    tool_call_summary=True,
)
if __name__ == "__main__":
    # Run the agent once; it is expected to call the discovery tool and summarize.
    out = agent.run(
        task="Summarize the first 10 services under 100000 atomic units (e.g., $0.10 USDC)"
    )
    print(out)

@ -1,6 +0,0 @@
# RAG (Retrieval Augmented Generation) Examples
This directory contains examples demonstrating RAG implementations and vector database integrations in Swarms.
## Qdrant RAG
- [qdrant_rag_example.py](qdrant_rag_example.py) - Complete Qdrant RAG implementation

@ -1,98 +0,0 @@
"""
Agent with Qdrant RAG (Retrieval-Augmented Generation)
This example demonstrates using Qdrant as a vector database for RAG operations,
allowing agents to store and retrieve documents for enhanced context.
"""
from qdrant_client import QdrantClient, models
from swarms import Agent
from swarms_memory import QdrantDB
# Initialize Qdrant client
# Option 1: In-memory (for testing/development - data is not persisted)
# client = QdrantClient(":memory:")
# Option 2: Local Qdrant server
# client = QdrantClient(host="localhost", port=6333)
# Option 3: Qdrant Cloud (recommended for production)
import os
# Cloud client; URL/API key fall back to placeholder values, so set
# QDRANT_URL and QDRANT_API_KEY in the environment for a real cluster.
client = QdrantClient(
    url=os.getenv("QDRANT_URL", "https://your-cluster.qdrant.io"),
    api_key=os.getenv("QDRANT_API_KEY", "your-api-key"),
)
# Create QdrantDB wrapper for RAG operations
rag_db = QdrantDB(
    client=client,
    embedding_model="text-embedding-3-small",  # embedding model used to vectorize documents
    collection_name="knowledge_base",
    distance=models.Distance.COSINE,  # cosine similarity for retrieval
    n_results=3,  # presumably the number of documents returned per query — confirm in QdrantDB docs
)
# Add documents to the knowledge base
documents = [
    "Qdrant is a vector database optimized for similarity search and AI applications.",
    "RAG combines retrieval and generation for more accurate AI responses.",
    "Vector embeddings enable semantic search across documents.",
    "The swarms framework supports multiple memory backends including Qdrant.",
]
# Method 1: Add documents individually
for doc in documents:
    rag_db.add(doc)
# Method 2: Batch add documents (more efficient for large datasets)
# Example with metadata
# documents_with_metadata = [
# "Machine learning is a subset of artificial intelligence.",
# "Deep learning uses neural networks with multiple layers.",
# "Natural language processing enables computers to understand human language.",
# "Computer vision allows machines to interpret visual information.",
# "Reinforcement learning learns through interaction with an environment."
# ]
#
# metadata = [
# {"category": "AI", "difficulty": "beginner", "topic": "overview"},
# {"category": "ML", "difficulty": "intermediate", "topic": "neural_networks"},
# {"category": "NLP", "difficulty": "intermediate", "topic": "language"},
# {"category": "CV", "difficulty": "advanced", "topic": "vision"},
# {"category": "RL", "difficulty": "advanced", "topic": "learning"}
# ]
#
# # Batch add with metadata
# doc_ids = rag_db.batch_add(documents_with_metadata, metadata=metadata, batch_size=3)
# print(f"Added {len(doc_ids)} documents in batch")
#
# # Query with metadata return
# results_with_metadata = rag_db.query(
# "What is artificial intelligence?",
# n_results=3,
# return_metadata=True
# )
#
# for i, result in enumerate(results_with_metadata):
# print(f"\nResult {i+1}:")
# print(f" Document: {result['document']}")
# print(f" Category: {result['category']}")
# print(f" Difficulty: {result['difficulty']}")
# print(f" Topic: {result['topic']}")
# print(f" Score: {result['score']:.4f}")
# Create agent with RAG capabilities
agent = Agent(
    agent_name="RAG-Agent",
    agent_description="Agent with Qdrant-powered RAG for enhanced knowledge retrieval",
    model_name="gpt-4.1",
    max_loops=1,
    dynamic_temperature_enabled=True,
    long_term_memory=rag_db,  # retrieval is served from the Qdrant-backed store above
)
# Query with RAG — the agent can draw on the ingested documents for context.
response = agent.run("What is Qdrant and how does it relate to RAG?")
print(response)

@ -2,11 +2,25 @@
This directory contains examples demonstrating advanced reasoning capabilities and agent evaluation systems in Swarms.
## Reasoning Agent Router Examples
The `reasoning_agent_router_examples/` folder contains simple examples for each agent type supported by the `ReasoningAgentRouter`:
- [reasoning_duo_example.py](reasoning_agent_router_examples/reasoning_duo_example.py) - Reasoning Duo agent for collaborative reasoning
- [self_consistency_example.py](reasoning_agent_router_examples/self_consistency_example.py) - Self-Consistency agent with multiple samples
- [ire_example.py](reasoning_agent_router_examples/ire_example.py) - Iterative Reflective Expansion (IRE) agent
- [agent_judge_example.py](reasoning_agent_router_examples/agent_judge_example.py) - Agent Judge for evaluation and judgment
- [reflexion_agent_example.py](reasoning_agent_router_examples/reflexion_agent_example.py) - Reflexion agent with memory capabilities
- [gkp_agent_example.py](reasoning_agent_router_examples/gkp_agent_example.py) - Generated Knowledge Prompting (GKP) agent
## Agent Judge Examples
The `agent_judge_examples/` folder contains detailed examples of the AgentJudge system:
- [example1_basic_evaluation.py](agent_judge_examples/example1_basic_evaluation.py) - Basic agent evaluation
- [example2_technical_evaluation.py](agent_judge_examples/example2_technical_evaluation.py) - Technical evaluation criteria
- [example3_creative_evaluation.py](agent_judge_examples/example3_creative_evaluation.py) - Creative evaluation patterns
## O3 Integration
- [example_o3.py](example_o3.py) - O3 model integration example
- [o3_agent.py](o3_agent.py) - O3 agent implementation
## Self-MoA Sequential Examples
- [moa_seq_example.py](moa_seq_example.py) - Self-MoA Sequential reasoning example for complex problem-solving

@ -1,18 +0,0 @@
from swarms.utils.litellm_wrapper import LiteLLM
# Initialize the LiteLLM wrapper with reasoning support
# Initialize the LiteLLM wrapper with reasoning support
llm = LiteLLM(
    model_name="claude-sonnet-4-20250514",  # Anthropic Claude Sonnet 4 (reasoning-capable)
    reasoning_effort="low",  # Enable reasoning with low effort
    temperature=1,
    max_tokens=2000,
    stream=False,
    thinking_tokens=1024,  # budget for the model's thinking/reasoning tokens
)
# Example task that would benefit from reasoning
task = "Solve this step-by-step: A farmer has 17 sheep and all but 9 die. How many sheep does he have left?"
print("=== Running reasoning model ===")
response = llm.run(task)
print(response)

@ -0,0 +1,9 @@
from swarms.agents.reasoning_agents import ReasoningAgentRouter

# AgentJudge: routes the task to an evaluation/judging agent.
router = ReasoningAgentRouter(
    swarm_type="AgentJudge",
    model_name="gpt-4o-mini",
    max_loops=1,
)

result = router.run("Is Python a good programming language?")
# Print the result so the example produces visible output, consistent
# with the other ReasoningAgentRouter examples (e.g. the IRE example).
print(result)

@ -0,0 +1,9 @@
from swarms.agents.reasoning_agents import ReasoningAgentRouter
router = ReasoningAgentRouter(
swarm_type="GKPAgent",
model_name="gpt-4o-mini",
num_knowledge_items=3,
)
result = router.run("What is artificial intelligence?")

@ -0,0 +1,11 @@
from swarms.agents.reasoning_agents import ReasoningAgentRouter

# Iterative Reflective Expansion (IRE) agent, reached via the reasoning router.
ire_router = ReasoningAgentRouter(
    swarm_type="ire",
    model_name="gpt-4o-mini",
    num_samples=1,
)

# Run a single short task and echo the answer.
task = "Explain photosynthesis in one sentence."
answer = ire_router.run(task)
print(answer)

@ -0,0 +1,9 @@
from swarms.agents.reasoning_agents import ReasoningAgentRouter
router = ReasoningAgentRouter(
swarm_type="reasoning-duo",
model_name="gpt-4o-mini",
max_loops=1,
)
result = router.run("What is 2+2?")

@ -0,0 +1,10 @@
from swarms.agents.reasoning_agents import ReasoningAgentRouter
router = ReasoningAgentRouter(
swarm_type="ReflexionAgent",
model_name="gpt-4o-mini",
max_loops=1,
memory_capacity=3,
)
result = router.run("What is machine learning?")

@ -0,0 +1,10 @@
from swarms.agents.reasoning_agents import ReasoningAgentRouter
router = ReasoningAgentRouter(
swarm_type="self-consistency",
model_name="gpt-4o-mini",
max_loops=1,
num_samples=3,
)
result = router.run("What is the capital of France?")

@ -1,882 +1,91 @@
"""
Qdrant RAG Example with Document Ingestion
This example demonstrates how to use the agent structure from example.py with Qdrant RAG
to ingest a vast array of PDF documents and text files for advanced quantitative trading analysis.
Features:
- Document ingestion from multiple file types (PDF, TXT, MD)
- Qdrant vector database integration
- Sentence transformer embeddings
- Comprehensive document processing pipeline
- Agent with RAG capabilities for financial analysis
"""
import os
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Union
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
from qdrant_client import QdrantClient
from qdrant_client.http import models
from qdrant_client.http.models import Distance, VectorParams
from sentence_transformers import SentenceTransformer
from qdrant_client import QdrantClient, models
from swarms import Agent
from swarms.utils.pdf_to_text import pdf_to_text
from swarms.utils.data_to_text import data_to_text
class DocumentProcessor:
    """
    Handles document processing and text extraction from various file formats.

    This class provides functionality to process PDF, TXT, and Markdown files,
    extracting text content for vectorization and storage in the RAG system.
    """

    def __init__(
        self, supported_extensions: Optional[List[str]] = None
    ):
        """
        Initialize the DocumentProcessor.

        Args:
            supported_extensions: List of supported file extensions.
                Defaults to ['.pdf', '.txt', '.md']
        """
        if supported_extensions is None:
            supported_extensions = [".pdf", ".txt", ".md"]
        self.supported_extensions = supported_extensions

    def process_document(
        self, file_path: Union[str, Path]
    ) -> Optional[Dict[str, str]]:
        """
        Process a single document and extract its text content.

        Args:
            file_path: Path to the document file

        Returns:
            Dictionary containing document metadata and extracted text,
            or None if processing fails.
        """
        file_path = Path(file_path)
        if not file_path.exists():
            print(f"File not found: {file_path}")
            return None
        if file_path.suffix.lower() not in self.supported_extensions:
            print(f"Unsupported file type: {file_path.suffix}")
            return None
        try:
            # Extract text based on file type; on extractor failure, fall
            # back to reading the file as plain UTF-8 text.
            if file_path.suffix.lower() == ".pdf":
                try:
                    text_content = pdf_to_text(str(file_path))
                except Exception as pdf_error:
                    print(f"Error extracting PDF text: {pdf_error}")
                    with open(
                        file_path,
                        "r",
                        encoding="utf-8",
                        errors="ignore",
                    ) as f:
                        text_content = f.read()
            else:
                try:
                    text_content = data_to_text(str(file_path))
                except Exception as data_error:
                    print(f"Error extracting text: {data_error}")
                    with open(
                        file_path,
                        "r",
                        encoding="utf-8",
                        errors="ignore",
                    ) as f:
                        text_content = f.read()

            # Some extractors may hand back a callable instead of text;
            # invoke it once before giving up.
            if callable(text_content):
                print(
                    f"Warning: {file_path} returned a callable, trying to call it..."
                )
                try:
                    text_content = text_content()
                except Exception as call_error:
                    print(f"Error calling callable: {call_error}")
                    return None

            if not text_content or not isinstance(text_content, str):
                print(
                    f"No valid text content extracted from: {file_path}"
                )
                return None

            # Clean the text content before returning the metadata record.
            text_content = str(text_content).strip()
            return {
                "file_path": str(file_path),
                "file_name": file_path.name,
                "file_type": file_path.suffix.lower(),
                "text_content": text_content,
                "file_size": file_path.stat().st_size,
                "processed_at": datetime.utcnow().isoformat(),
            }
        except Exception as e:
            print(f"Error processing {file_path}: {str(e)}")
            return None

    def process_directory(
        self, directory_path: Union[str, Path], max_workers: int = 4
    ) -> List[Dict[str, str]]:
        """
        Process all supported documents in a directory concurrently.

        Args:
            directory_path: Path to the directory containing documents
            max_workers: Maximum number of concurrent workers for processing

        Returns:
            List of processed document dictionaries
        """
        directory_path = Path(directory_path)
        if not directory_path.is_dir():
            print(f"Directory not found: {directory_path}")
            return []

        # Find all supported files, matching both lower- and upper-case
        # extensions recursively.
        supported_files = []
        for ext in self.supported_extensions:
            supported_files.extend(directory_path.rglob(f"*{ext}"))
            supported_files.extend(
                directory_path.rglob(f"*{ext.upper()}")
            )
        if not supported_files:
            print(f"No supported files found in: {directory_path}")
            return []
        print(f"Found {len(supported_files)} files to process")

        # Process files concurrently; failures are logged and skipped.
        processed_documents = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_file = {
                executor.submit(
                    self.process_document, file_path
                ): file_path
                for file_path in supported_files
            }
            for future in concurrent.futures.as_completed(
                future_to_file
            ):
                file_path = future_to_file[future]
                try:
                    result = future.result()
                    if result:
                        processed_documents.append(result)
                        print(f"Processed: {result['file_name']}")
                except Exception as e:
                    print(f"Error processing {file_path}: {str(e)}")
        print(
            f"Successfully processed {len(processed_documents)} documents"
        )
        return processed_documents
class QdrantRAGMemory:
    """
    Enhanced Qdrant memory system for RAG operations with document storage.

    This class extends the basic Qdrant memory system to handle document
    ingestion, chunking, and semantic search for large document collections.
    """

    def __init__(
        self,
        collection_name: str = "document_memories",
        vector_size: int = 384,  # Default size for all-MiniLM-L6-v2
        url: Optional[str] = None,
        api_key: Optional[str] = None,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
    ):
        """
        Initialize the Qdrant RAG memory system.

        Args:
            collection_name: Name of the Qdrant collection to use
            vector_size: Dimension of the embedding vectors
            url: Optional Qdrant server URL (defaults to local in-memory)
            api_key: Optional Qdrant API key for cloud deployment
            chunk_size: Size of text chunks for processing
            chunk_overlap: Overlap between consecutive chunks
        """
        self.collection_name = collection_name
        self.vector_size = vector_size
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

        # Cloud deployment requires both URL and API key; otherwise use an
        # ephemeral in-memory Qdrant instance (data is not persisted).
        if url and api_key:
            self.client = QdrantClient(url=url, api_key=api_key)
        else:
            self.client = QdrantClient(":memory:")

        self.embedding_model = SentenceTransformer("all-MiniLM-L6-v2")

        # Probe the model once so the collection's vector size always
        # matches the embeddings the model actually produces.
        sample_text = "Sample text for dimension check"
        sample_embedding = self.embedding_model.encode(sample_text)
        actual_dimension = len(sample_embedding)
        if actual_dimension != self.vector_size:
            print(
                f"Updating vector size from {self.vector_size} to {actual_dimension} to match model"
            )
            self.vector_size = actual_dimension

        # Create collection if it doesn't exist.
        self._create_collection()

    def _create_collection(self):
        """Create the Qdrant collection if it doesn't exist."""
        collections = self.client.get_collections().collections
        exists = any(
            col.name == self.collection_name for col in collections
        )
        if not exists:
            self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(
                    size=self.vector_size, distance=Distance.COSINE
                ),
            )
            print(
                f"Created Qdrant collection: {self.collection_name}"
            )

    def _chunk_text(self, text: str) -> List[str]:
        """
        Split text into overlapping chunks for better retrieval.

        Args:
            text: Text content to chunk

        Returns:
            List of text chunks
        """
        # Coerce non-string input defensively.
        if not isinstance(text, str):
            text = str(text)
        if len(text) <= self.chunk_size:
            return [text]

        chunks = []
        start = 0
        while start < len(text):
            end = start + self.chunk_size
            # Prefer to break at a sentence boundary within the last
            # ~100 characters of the window.
            if end < len(text):
                for i in range(end, max(start, end - 100), -1):
                    if text[i] in ".!?":
                        end = i + 1
                        break
            chunk = text[start:end].strip()
            if chunk:
                chunks.append(chunk)
            # Advance with overlap, but always move forward by at least
            # one character: with the previous unconditional
            # `start = end - chunk_overlap`, pathological settings
            # (e.g. chunk_overlap >= chunk_size) made `start` stall or
            # go negative and the loop never terminated.
            next_start = end - self.chunk_overlap
            if next_start <= start:
                next_start = end
            start = next_start
            if start >= len(text):
                break
        return chunks

    def add_document(
        self, document_data: Dict[str, str]
    ) -> List[str]:
        """
        Add a document to the memory system with chunking.

        Args:
            document_data: Dictionary containing document information

        Returns:
            List of memory IDs for the stored chunks
        """
        text_content = document_data["text_content"]
        if not isinstance(text_content, str):
            print(
                f"Warning: text_content is not a string: {type(text_content)}"
            )
            text_content = str(text_content)

        chunks = self._chunk_text(text_content)
        memory_ids = []
        for i, chunk in enumerate(chunks):
            # Embed each chunk and store it with document-level metadata
            # so retrieval can attribute hits back to their source file.
            embedding = self.embedding_model.encode(chunk).tolist()
            metadata = {
                "document_name": document_data["file_name"],
                "document_path": document_data["file_path"],
                "document_type": document_data["file_type"],
                "chunk_index": i,
                "total_chunks": len(chunks),
                "chunk_text": chunk,
                "timestamp": datetime.utcnow().isoformat(),
                "file_size": document_data["file_size"],
            }
            memory_id = str(uuid.uuid4())
            self.client.upsert(
                collection_name=self.collection_name,
                points=[
                    models.PointStruct(
                        id=memory_id,
                        payload=metadata,
                        vector=embedding,
                    )
                ],
            )
            memory_ids.append(memory_id)
        print(
            f"Added document '{document_data['file_name']}' with {len(chunks)} chunks"
        )
        return memory_ids

    def add_documents_batch(
        self, documents: List[Dict[str, str]]
    ) -> List[str]:
        """
        Add multiple documents to the memory system.

        Args:
            documents: List of document dictionaries

        Returns:
            List of all memory IDs
        """
        all_memory_ids = []
        for document in documents:
            memory_ids = self.add_document(document)
            all_memory_ids.extend(memory_ids)
        return all_memory_ids

    def add(self, text: str, metadata: Optional[Dict] = None) -> str:
        """
        Add a text entry to the memory system (required by Swarms interface).

        Args:
            text: The text content to add
            metadata: Optional metadata for the entry

        Returns:
            str: ID of the stored memory
        """
        if metadata is None:
            metadata = {}
        embedding = self.embedding_model.encode(text).tolist()
        memory_metadata = {
            "text": text,
            "timestamp": datetime.utcnow().isoformat(),
            "source": "agent_memory",
        }
        memory_metadata.update(metadata)
        memory_id = str(uuid.uuid4())
        self.client.upsert(
            collection_name=self.collection_name,
            points=[
                models.PointStruct(
                    id=memory_id,
                    payload=memory_metadata,
                    vector=embedding,
                )
            ],
        )
        return memory_id

    def query(
        self,
        query_text: str,
        limit: int = 5,
        score_threshold: float = 0.7,
        include_metadata: bool = True,
    ) -> List[Dict]:
        """
        Query memories based on text similarity.

        Args:
            query_text: The text query to search for
            limit: Maximum number of results to return
            score_threshold: Minimum similarity score threshold
            include_metadata: Whether to include metadata in results

        Returns:
            List of matching memories with their metadata
        """
        try:
            # Empty collections are a valid state; short-circuit rather
            # than issuing a search that returns nothing.
            collection_info = self.client.get_collection(
                self.collection_name
            )
            if collection_info.points_count == 0:
                print(
                    "Warning: Collection is empty, no documents to query"
                )
                return []

            query_embedding = self.embedding_model.encode(
                query_text
            ).tolist()
            results = self.client.search(
                collection_name=self.collection_name,
                query_vector=query_embedding,
                limit=limit,
                score_threshold=score_threshold,
            )

            memories = []
            for res in results:
                memory = res.payload.copy()
                memory["similarity_score"] = res.score
                if not include_metadata:
                    # Keep only essential information.
                    memory = {
                        "chunk_text": memory.get("chunk_text", ""),
                        "document_name": memory.get(
                            "document_name", ""
                        ),
                        "similarity_score": memory[
                            "similarity_score"
                        ],
                    }
                memories.append(memory)
            return memories
        except Exception as e:
            print(f"Error querying collection: {e}")
            return []

    def get_collection_stats(self) -> Dict:
        """
        Get statistics about the collection.

        Returns:
            Dictionary containing collection statistics
        """
        try:
            collection_info = self.client.get_collection(
                self.collection_name
            )
            return {
                "collection_name": self.collection_name,
                "vector_size": collection_info.config.params.vectors.size,
                "distance": collection_info.config.params.vectors.distance,
                "points_count": collection_info.points_count,
            }
        except Exception as e:
            print(f"Error getting collection stats: {e}")
            return {}

    def clear_collection(self):
        """Clear all memories from the collection by dropping and recreating it."""
        self.client.delete_collection(self.collection_name)
        self._create_collection()
        print(f"Cleared collection: {self.collection_name}")
class QuantitativeTradingRAGAgent:
    """
    Advanced quantitative trading agent with RAG capabilities for document analysis.

    This agent combines the structure from example.py with Qdrant RAG to provide
    comprehensive financial analysis based on ingested documents.
    """

    def __init__(
        self,
        agent_name: str = "Quantitative-Trading-RAG-Agent",
        collection_name: str = "financial_documents",
        qdrant_url: Optional[str] = None,
        qdrant_api_key: Optional[str] = None,
        model_name: str = "claude-sonnet-4-20250514",
        max_loops: int = 1,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
    ):
        """
        Initialize the Quantitative Trading RAG Agent.

        Args:
            agent_name: Name of the agent
            collection_name: Name of the Qdrant collection
            qdrant_url: Optional Qdrant server URL (in-memory Qdrant is used when omitted)
            qdrant_api_key: Optional Qdrant API key
            model_name: LLM model to use
            max_loops: Maximum number of agent loops
            chunk_size: Size of text chunks for processing
            chunk_overlap: Overlap between consecutive chunks
        """
        self.agent_name = agent_name
        self.collection_name = collection_name
        # Extracts text from PDF/TXT/MD files for ingestion.
        self.document_processor = DocumentProcessor()
        # Qdrant-backed vector store; also serves as the agent's long-term memory.
        self.rag_memory = QdrantRAGMemory(
            collection_name=collection_name,
            url=qdrant_url,
            api_key=qdrant_api_key,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )
        # Underlying swarms Agent wired to the RAG memory above.
        self.agent = Agent(
            agent_name=agent_name,
            agent_description="Advanced quantitative trading and algorithmic analysis agent with RAG capabilities",
            system_prompt="""You are an expert quantitative trading agent with deep expertise in:
- Algorithmic trading strategies and implementation
- Statistical arbitrage and market making
- Risk management and portfolio optimization
- High-frequency trading systems
- Market microstructure analysis
- Quantitative research methodologies
- Financial mathematics and stochastic processes
- Machine learning applications in trading
Your core responsibilities include:
1. Developing and backtesting trading strategies
2. Analyzing market data and identifying alpha opportunities
3. Implementing risk management frameworks
4. Optimizing portfolio allocations
5. Conducting quantitative research
6. Monitoring market microstructure
7. Evaluating trading system performance
You have access to a comprehensive document database through RAG (Retrieval-Augmented Generation).
When answering questions, you can search through this database to find relevant information
and provide evidence-based responses.
You maintain strict adherence to:
- Mathematical rigor in all analyses
- Statistical significance in strategy development
- Risk-adjusted return optimization
- Market impact minimization
- Regulatory compliance
- Transaction cost analysis
- Performance attribution
You communicate in precise, technical terms while maintaining clarity for stakeholders.""",
            model_name=model_name,
            dynamic_temperature_enabled=True,
            output_type="str-all-except-first",
            max_loops=max_loops,
            dynamic_context_window=True,
            long_term_memory=self.rag_memory,
        )

    def ingest_documents(
        self, documents_path: Union[str, Path]
    ) -> int:
        """
        Ingest documents from a directory into the RAG system.

        Args:
            documents_path: Path to directory containing documents

        Returns:
            Number of documents successfully ingested (0 on any failure)
        """
        print(f"Starting document ingestion from: {documents_path}")
        try:
            processed_documents = (
                self.document_processor.process_directory(
                    documents_path
                )
            )
            if not processed_documents:
                print("No documents to ingest")
                return 0
            memory_ids = self.rag_memory.add_documents_batch(
                processed_documents
            )
            print(
                f"Successfully ingested {len(processed_documents)} documents"
            )
            print(f"Created {len(memory_ids)} memory chunks")
            return len(processed_documents)
        except Exception as e:
            # Ingestion is best-effort: log the traceback and report zero
            # rather than propagating.
            print(f"Error during document ingestion: {e}")
            import traceback

            traceback.print_exc()
            return 0

    def query_documents(
        self, query: str, limit: int = 5
    ) -> List[Dict]:
        """
        Query the document database for relevant information.

        Args:
            query: The query text
            limit: Maximum number of results to return

        Returns:
            List of relevant document chunks
        """
        return self.rag_memory.query(query, limit=limit)

    def run_analysis(self, task: str) -> str:
        """
        Run a financial analysis task using the agent with RAG capabilities.

        Args:
            task: The analysis task to perform

        Returns:
            Agent's response to the task
        """
        print(f"Running analysis task: {task}")
        # Retrieve up to three relevant chunks and inline them into the
        # prompt so the LLM can ground its answer in the documents.
        relevant_docs = self.query_documents(task, limit=3)
        if relevant_docs:
            context = "\n\nRelevant Document Information:\n"
            for i, doc in enumerate(relevant_docs, 1):
                context += f"\nDocument {i}: {doc.get('document_name', 'Unknown')}\n"
                context += f"Relevance Score: {doc.get('similarity_score', 0):.3f}\n"
                context += (
                    f"Content: {doc.get('chunk_text', '')[:500]}...\n"
                )
            enhanced_task = f"{task}\n\n{context}"
        else:
            enhanced_task = task
        response = self.agent.run(enhanced_task)
        return response

    def get_database_stats(self) -> Dict:
        """
        Get statistics about the document database.

        Returns:
            Dictionary containing database statistics
        """
        return self.rag_memory.get_collection_stats()
def main():
    """
    Demonstrate a Qdrant-backed RAG agent.

    Connects to Qdrant (Cloud credentials via QDRANT_URL / QDRANT_API_KEY
    environment variables), seeds a small knowledge base, and runs a
    retrieval-augmented query through a swarms Agent.

    NOTE(review): the previous body was a merge-corrupted mix of two
    versions of this example and did not parse; this is the reconstructed
    QdrantDB-based flow.
    """
    # Function-scope import: swarms_memory is only needed for this demo.
    from swarms_memory import QdrantDB

    print("🚀 Initializing Qdrant RAG agent...")

    # The defaults are placeholders; set QDRANT_URL / QDRANT_API_KEY for a
    # real deployment. For local testing, QdrantClient(":memory:") or a
    # local server (host="localhost", port=6333) also work.
    client = QdrantClient(
        url=os.getenv("QDRANT_URL", "https://your-cluster.qdrant.io"),
        api_key=os.getenv("QDRANT_API_KEY", "your-api-key"),
    )

    # QdrantDB wrapper for RAG operations.
    rag_db = QdrantDB(
        client=client,
        embedding_model="text-embedding-3-small",
        collection_name="knowledge_base",
        distance=models.Distance.COSINE,
        n_results=3,
    )

    # Seed the knowledge base with a few documents. For large datasets,
    # QdrantDB.batch_add(documents, metadata=..., batch_size=...) is more
    # efficient than adding documents one at a time.
    documents = [
        "Qdrant is a vector database optimized for similarity search and AI applications.",
        "RAG combines retrieval and generation for more accurate AI responses.",
        "Vector embeddings enable semantic search across documents.",
        "The swarms framework supports multiple memory backends including Qdrant.",
    ]
    for doc in documents:
        rag_db.add(doc)

    # Create an agent whose long-term memory is the Qdrant store.
    agent = Agent(
        agent_name="RAG-Agent",
        agent_description="Agent with Qdrant-powered RAG for enhanced knowledge retrieval",
        model_name="gpt-4.1",
        max_loops=1,
        dynamic_temperature_enabled=True,
        long_term_memory=rag_db,
    )

    # Run a retrieval-augmented query and show the answer.
    response = agent.run("What is Qdrant and how does it relate to RAG?")
    print(response)


if __name__ == "__main__":
    main()

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
# Duplicate "version" key removed: TOML forbids redefining a key, and the
# diff residue kept both the old (8.6.0) and new (8.6.3) lines.
version = "8.6.3"
description = "Swarms - TGSC"
license = "MIT"
authors = ["Kye Gomez <kye@swarms.world>"]

@ -0,0 +1,40 @@
from swarms import SwarmRouter, Agent

# (name, description) pairs for the three specialist agents; all share the
# same model settings, so they are built from one spec table.
_AGENT_SPECS = [
    (
        "Research-Analyst",
        "Specialized in comprehensive research and data gathering",
    ),
    (
        "Data-Analyst",
        "Expert in data analysis and pattern recognition",
    ),
    (
        "Strategy-Consultant",
        "Specialized in strategic planning and recommendations",
    ),
]

specialists = [
    Agent(
        agent_name=name,
        agent_description=description,
        model_name="gpt-4o-mini",
        max_loops=1,
        verbose=False,
    )
    for name, description in _AGENT_SPECS
]

# Majority-voting router that fans the task out to every specialist.
router = SwarmRouter(
    name="SwarmRouter",
    description="Routes tasks to specialized agents based on their capabilities",
    agents=specialists,
    swarm_type="MajorityVoting",
    max_loops=1,
    verbose=False,
)

result = router.run(
    "Conduct a research analysis on water stocks and etfs"
)
print(result)

@ -285,6 +285,10 @@ class ReasoningAgentRouter:
"""
try:
swarm = self.select_swarm()
if self.swarm_type == "ReflexionAgent":
return swarm.run(tasks=[task], *args, **kwargs)
else:
return swarm.run(task=task, *args, **kwargs)
except Exception as e:
raise ReasoningAgentExecutorError(

@ -1,3 +1,8 @@
"""
A structured prompt template that guides models through step-by-step visual analysis, from observation to reflection.
Provides a systematic chain-of-thought approach for analyzing images, graphs, and visual puzzles with detailed reasoning and visual references.
"""
VISUAL_CHAIN_OF_THOUGHT = """
You, as the model, are presented with a visual problem. This could be an image containing various elements that you need to analyze, a graph that requires interpretation, or a visual puzzle. Your task is to examine the visual information carefully and describe your process of understanding and solving the problem.

@ -218,7 +218,8 @@ class Agent:
preset_stopping_token (bool): Enable preset stopping token
traceback (Any): The traceback
traceback_handlers (Any): The traceback handlers
streaming_on (bool): Enable streaming
streaming_on (bool): Enable basic streaming with formatted panels
stream (bool): Enable detailed token-by-token streaming with metadata (citations, tokens used, etc.)
docs (List[str]): The list of documents
docs_folder (str): The folder containing the documents
verbose (bool): Enable verbose mode
@ -310,9 +311,9 @@ class Agent:
>>> print(response)
>>> # Generate a report on the financials.
>>> # Real-time streaming example
>>> agent = Agent(model_name="gpt-4.1", max_loops=1, streaming_on=True)
>>> response = agent.run("Tell me a long story.") # Will stream in real-time
>>> # Detailed token streaming example
>>> agent = Agent(model_name="gpt-4.1", max_loops=1, stream=True)
>>> response = agent.run("Tell me a story.") # Will stream each token with detailed metadata
>>> print(response) # Final complete response
>>> # Fallback model example
@ -366,6 +367,7 @@ class Agent:
traceback: Optional[Any] = None,
traceback_handlers: Optional[Any] = None,
streaming_on: Optional[bool] = False,
stream: Optional[bool] = False,
docs: List[str] = None,
docs_folder: Optional[str] = None,
verbose: Optional[bool] = False,
@ -516,6 +518,7 @@ class Agent:
self.traceback = traceback
self.traceback_handlers = traceback_handlers
self.streaming_on = streaming_on
self.stream = stream
self.docs = docs
self.docs_folder = docs_folder
self.verbose = verbose
@ -1346,6 +1349,8 @@ class Agent:
)
elif self.streaming_on:
pass
elif self.stream:
pass
else:
self.pretty_print(
response, loop_count
@ -2566,8 +2571,105 @@ class Agent:
del kwargs["is_last"]
try:
# Set streaming parameter in LLM if streaming is enabled
if self.streaming_on and hasattr(self.llm, "stream"):
if self.stream and hasattr(self.llm, "stream"):
original_stream = self.llm.stream
self.llm.stream = True
if img is not None:
streaming_response = self.llm.run(
task=task, img=img, *args, **kwargs
)
else:
streaming_response = self.llm.run(
task=task, *args, **kwargs
)
if hasattr(streaming_response, "__iter__") and not isinstance(streaming_response, str):
complete_response = ""
token_count = 0
final_chunk = None
first_chunk = None
for chunk in streaming_response:
if first_chunk is None:
first_chunk = chunk
if hasattr(chunk, "choices") and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
complete_response += content
token_count += 1
# Schema per token outputted
token_info = {
"token_index": token_count,
"model": getattr(chunk, 'model', self.get_current_model()),
"id": getattr(chunk, 'id', ''),
"created": getattr(chunk, 'created', int(time.time())),
"object": getattr(chunk, 'object', 'chat.completion.chunk'),
"token": content,
"system_fingerprint": getattr(chunk, 'system_fingerprint', ''),
"finish_reason": chunk.choices[0].finish_reason,
"citations": getattr(chunk, 'citations', None),
"provider_specific_fields": getattr(chunk, 'provider_specific_fields', None),
"service_tier": getattr(chunk, 'service_tier', 'default'),
"obfuscation": getattr(chunk, 'obfuscation', None),
"usage": getattr(chunk, 'usage', None),
"logprobs": chunk.choices[0].logprobs,
"timestamp": time.time()
}
print(f"ResponseStream {token_info}")
if streaming_callback is not None:
streaming_callback(token_info)
final_chunk = chunk
#Final ModelResponse to stream
if final_chunk and hasattr(final_chunk, 'usage') and final_chunk.usage:
usage = final_chunk.usage
print(f"ModelResponseStream(id='{getattr(final_chunk, 'id', 'N/A')}', "
f"created={getattr(final_chunk, 'created', 'N/A')}, "
f"model='{getattr(final_chunk, 'model', self.get_current_model())}', "
f"object='{getattr(final_chunk, 'object', 'chat.completion.chunk')}', "
f"system_fingerprint='{getattr(final_chunk, 'system_fingerprint', 'N/A')}', "
f"choices=[StreamingChoices(finish_reason='{final_chunk.choices[0].finish_reason}', "
f"index=0, delta=Delta(provider_specific_fields=None, content=None, role=None, "
f"function_call=None, tool_calls=None, audio=None), logprobs=None)], "
f"provider_specific_fields=None, "
f"usage=Usage(completion_tokens={usage.completion_tokens}, "
f"prompt_tokens={usage.prompt_tokens}, "
f"total_tokens={usage.total_tokens}, "
f"completion_tokens_details=CompletionTokensDetailsWrapper("
f"accepted_prediction_tokens={usage.completion_tokens_details.accepted_prediction_tokens}, "
f"audio_tokens={usage.completion_tokens_details.audio_tokens}, "
f"reasoning_tokens={usage.completion_tokens_details.reasoning_tokens}, "
f"rejected_prediction_tokens={usage.completion_tokens_details.rejected_prediction_tokens}, "
f"text_tokens={usage.completion_tokens_details.text_tokens}), "
f"prompt_tokens_details=PromptTokensDetailsWrapper("
f"audio_tokens={usage.prompt_tokens_details.audio_tokens}, "
f"cached_tokens={usage.prompt_tokens_details.cached_tokens}, "
f"text_tokens={usage.prompt_tokens_details.text_tokens}, "
f"image_tokens={usage.prompt_tokens_details.image_tokens})))")
else:
print(f"ModelResponseStream(id='{getattr(final_chunk, 'id', 'N/A')}', "
f"created={getattr(final_chunk, 'created', 'N/A')}, "
f"model='{getattr(final_chunk, 'model', self.get_current_model())}', "
f"object='{getattr(final_chunk, 'object', 'chat.completion.chunk')}', "
f"system_fingerprint='{getattr(final_chunk, 'system_fingerprint', 'N/A')}', "
f"choices=[StreamingChoices(finish_reason='{final_chunk.choices[0].finish_reason}', "
f"index=0, delta=Delta(provider_specific_fields=None, content=None, role=None, "
f"function_call=None, tool_calls=None, audio=None), logprobs=None)], "
f"provider_specific_fields=None)")
self.llm.stream = original_stream
return complete_response
else:
self.llm.stream = original_stream
return streaming_response
elif self.streaming_on and hasattr(self.llm, "stream"):
original_stream = self.llm.stream
self.llm.stream = True
@ -3052,6 +3154,8 @@ class Agent:
if self.streaming_on:
pass
elif self.stream:
pass
if self.print_on:
formatter.print_panel(

@ -16,7 +16,6 @@ load_dotenv()
# Execution modes accepted by the swarm builder: each string selects what the
# builder does with the constructed swarm (return agent configs, execute the
# swarm router directly, return the router's config, or return live agent
# objects).
# NOTE(review): the surrounding diff suggests one entry may be in the middle of
# being removed — confirm the final membership of this list against callers
# that validate against it before relying on these values.
execution_types = [
    "return-agents",
    "execute-swarm-router",
    "return-swarm-router-config",
    "return-agents-objects",
]

@ -944,7 +944,8 @@ class HierarchicalSwarm:
if self.planning_enabled is True:
self.director.tools_list_dictionary = None
out = self.setup_director_with_planning(
task=self.conversation.get_str(), img=img
task=f"History: {self.conversation.get_str()} \n\n Task: {task}",
img=img,
)
self.conversation.add(
role=self.director.agent_name, content=out

@ -2,7 +2,7 @@ import concurrent.futures
import json
import os
import traceback
from typing import Any, Callable, Dict, List, Literal, Optional, Union
from typing import Any, Callable, Dict, List, Literal, Optional, Union, get_args
from pydantic import BaseModel, Field
@ -46,6 +46,7 @@ SwarmType = Literal[
"CouncilAsAJudge",
"InteractiveGroupChat",
"HeavySwarm",
"BatchedGridWorkflow",
]
@ -272,6 +273,24 @@ class SwarmRouter:
"SwarmRouter: Swarm type cannot be 'none'. Check the docs for all the swarm types available. https://docs.swarms.world/en/latest/swarms/structs/swarm_router/"
)
# Validate swarm type is a valid string
valid_swarm_types = get_args(SwarmType)
if not isinstance(self.swarm_type, str):
raise SwarmRouterConfigError(
f"SwarmRouter: swarm_type must be a string, not {type(self.swarm_type).__name__}. "
f"Valid types are: {', '.join(valid_swarm_types)}. "
"Use swarm_type='SequentialWorkflow' (string), NOT SwarmType.SequentialWorkflow. "
"See https://docs.swarms.world/en/latest/swarms/structs/swarm_router/"
)
if self.swarm_type not in valid_swarm_types:
raise SwarmRouterConfigError(
f"SwarmRouter: Invalid swarm_type '{self.swarm_type}'. "
f"Valid types are: {', '.join(valid_swarm_types)}. "
"See https://docs.swarms.world/en/latest/swarms/structs/swarm_router/"
)
if (
self.swarm_type != "HeavySwarm"
and self.agents is None
@ -423,7 +442,6 @@ class SwarmRouter:
max_loops=self.max_loops,
flow=self.rearrange_flow,
output_type=self.output_type,
return_entire_history=self.return_entire_history,
*args,
**kwargs,
)
@ -474,7 +492,6 @@ class SwarmRouter:
description=self.description,
agents=self.agents,
max_loops=self.max_loops,
return_all_history=self.return_entire_history,
output_type=self.output_type,
*args,
**kwargs,
@ -499,7 +516,8 @@ class SwarmRouter:
name=self.name,
description=self.description,
agents=self.agents,
consensus_agent=self.agents[-1],
max_loops=self.max_loops,
output_type=self.output_type,
*args,
**kwargs,
)

@ -0,0 +1,9 @@
from swarms.structs.agent import Agent
# Minimal streaming example: build an Agent whose LLM output is streamed
# chunk-by-chunk. With stream=True the Agent's run path flips the underlying
# LLM into streaming mode and emits per-token info as chunks arrive, instead
# of returning a single final string.
agent = Agent(
    model_name="gpt-4.1",  # model identifier passed through to the LLM wrapper
    max_loops=1,  # single reasoning loop; no multi-turn self-iteration
    stream=True,  # enable the per-token streaming branch of Agent.run
)

# Run one task; the streamed response is printed incrementally as it arrives.
agent.run("Tell me a short story about a robot learning to paint.")
Loading…
Cancel
Save