updated docs!

pull/1067/head
harshalmore31 1 month ago
parent 8338aa89bc
commit c38e4ba490

@@ -433,7 +433,7 @@ nav:
- Yahoo Finance: "swarms/examples/yahoo_finance.md"
- Firecrawl: "developer_guides/firecrawl.md"
- RAG:
    - Overview: "rag-vector-databases/overview.md"
    - ChromaDB Integration: "rag-vector-databases/chromadb.md"
    - FAISS Integration: "rag-vector-databases/faiss.md"

@@ -56,171 +56,53 @@ export OPENAI_API_KEY="your-openai-api-key"
```python
"""
Agent with ChromaDB RAG (Retrieval-Augmented Generation)

This example demonstrates using ChromaDB as a vector database for RAG operations,
allowing agents to store and retrieve documents for enhanced context.
"""

from swarms import Agent
from swarms_memory import ChromaDB

# Initialize ChromaDB wrapper for RAG operations
rag_db = ChromaDB(
    metric="cosine",                  # Distance metric for similarity search
    output_dir="knowledge_base_new",  # Collection name
    limit_tokens=1000,                # Token limit for queries
    n_results=3,                      # Number of results to retrieve
    verbose=False,
)

# Add documents to the knowledge base
documents = [
    "ChromaDB is an open-source embedding database designed to store and query vector embeddings efficiently.",
    "ChromaDB provides a simple Python API for adding, querying, and managing vector embeddings with metadata.",
    "ChromaDB supports multiple embedding functions including OpenAI, Sentence Transformers, and custom models.",
    "ChromaDB can run locally or in distributed mode, making it suitable for both development and production.",
    "ChromaDB offers filtering capabilities allowing queries based on both vector similarity and metadata conditions.",
    "ChromaDB provides persistent storage and can handle large-scale embedding collections with fast retrieval.",
    "Kye Gomez is the founder of Swarms.",
]

# Add documents individually
for doc in documents:
    rag_db.add(doc)

# Create agent with RAG capabilities
agent = Agent(
    agent_name="RAG-Agent",
    agent_description="Swarms Agent with ChromaDB-powered RAG for enhanced knowledge retrieval",
    model_name="gpt-4o",
    max_loops=1,
    dynamic_temperature_enabled=True,
    long_term_memory=rag_db,
)

# Query with RAG
response = agent.run("What is ChromaDB and who is the founder of Swarms?")
print(response)
```
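
For a closer look at what the wrapper manages, the raw `chromadb` client exposes the same add/query cycle directly. A minimal standalone sketch, using chromadb's built-in default embedding function instead of the wrapper's model (the collection name and document are illustrative):

```python
import chromadb

# Persistent local store; the path is illustrative
client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_or_create_collection(name="scratch_kb")

# chromadb embeds the documents with its default embedding function
collection.add(
    documents=["ChromaDB stores embeddings alongside optional metadata."],
    ids=["doc_0"],
)

# Semantic query against the stored documents
results = collection.query(query_texts=["What does ChromaDB store?"], n_results=1)
print(results["documents"][0])
```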
## Use Cases

@@ -59,314 +59,49 @@ export CUDA_VISIBLE_DEVICES="0"
```python
"""
Agent with FAISS RAG (Retrieval-Augmented Generation)

This example demonstrates using FAISS as a vector database for RAG operations,
allowing agents to store and retrieve documents for enhanced context.
"""

from swarms import Agent
from swarms_memory import FAISSDB

# Initialize FAISS wrapper for RAG operations
rag_db = FAISSDB(
    embedding_model="text-embedding-3-small",
    metric="cosine",
    index_file="knowledge_base.faiss",
)

# Add documents to the knowledge base
documents = [
    "FAISS is a library for efficient similarity search and clustering of dense vectors.",
    "RAG combines retrieval and generation for more accurate AI responses.",
    "Vector embeddings enable semantic search across documents.",
    "The Swarms framework supports multiple memory backends including FAISS.",
    "Swarms is the first and most reliable multi-agent production-grade framework.",
    "Kye Gomez is Founder and CEO of Swarms.",
]

# Add documents individually
for doc in documents:
    rag_db.add(doc)

# Create agent with RAG capabilities
agent = Agent(
    agent_name="RAG-Agent",
    agent_description="Swarms Agent with FAISS-powered RAG for enhanced knowledge retrieval",
    model_name="gpt-4o",
    max_loops=1,
    dynamic_temperature_enabled=True,
    long_term_memory=rag_db,
)

# Query with RAG
response = agent.run("What is FAISS and how does it relate to RAG? Who is the founder of Swarms?")
print(response)
```
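
Worth noting: a cosine metric over FAISS is conventionally implemented by L2-normalizing the vectors and searching an inner-product index, since the inner product of unit vectors equals their cosine similarity. A minimal sketch with the raw `faiss` API and toy vectors (the dimension and data are illustrative):

```python
import faiss
import numpy as np

dim = 8  # toy dimension; real embeddings are much larger (e.g., 1536)
vectors = np.random.rand(4, dim).astype("float32")
faiss.normalize_L2(vectors)  # unit-length vectors: inner product == cosine similarity

index = faiss.IndexFlatIP(dim)  # exact inner-product search
index.add(vectors)

query = np.random.rand(1, dim).astype("float32")
faiss.normalize_L2(query)
scores, ids = index.search(query, 2)  # top-2 nearest by cosine similarity
print(scores, ids)
```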
## Use Cases

@@ -60,406 +60,61 @@ export OPENAI_API_KEY="your-openai-api-key"
```python
"""
Agent with Milvus Cloud RAG (Retrieval-Augmented Generation)

This example demonstrates using Milvus Cloud (Zilliz) as a vector database for RAG operations,
allowing agents to store and retrieve documents from your cloud-hosted Milvus account.
"""

import os

from swarms import Agent
from swarms_memory import MilvusDB

# Get Milvus Cloud credentials
milvus_uri = os.getenv("MILVUS_URI")
milvus_token = os.getenv("MILVUS_TOKEN")

if not milvus_uri or not milvus_token:
    print("❌ Missing Milvus Cloud credentials!")
    print("Please set MILVUS_URI and MILVUS_TOKEN in your .env file")
    exit(1)

# Initialize Milvus Cloud wrapper for RAG operations
rag_db = MilvusDB(
    embedding_model="text-embedding-3-small",  # OpenAI embedding model
    collection_name="swarms_cloud_knowledge",  # Cloud collection name
    uri=milvus_uri,                            # Your Zilliz Cloud URI
    token=milvus_token,                        # Your Zilliz Cloud token
    metric="COSINE",                           # Distance metric for similarity search
)

# Add documents to the knowledge base
documents = [
    "Milvus Cloud is a fully managed vector database service provided by Zilliz.",
    "RAG combines retrieval and generation for more accurate AI responses.",
    "Vector embeddings enable semantic search across documents.",
    "The Swarms framework supports multiple memory backends including Milvus Cloud.",
    "Swarms is the first and most reliable multi-agent production-grade framework.",
    "Kye Gomez is Founder and CEO of Swarms.",
]

# Add documents individually
for doc in documents:
    rag_db.add(doc)

# Create agent with RAG capabilities
agent = Agent(
    agent_name="Cloud-RAG-Agent",
    agent_description="Swarms Agent with Milvus Cloud-powered RAG for scalable knowledge retrieval",
    model_name="gpt-4o",
    max_loops=1,
    dynamic_temperature_enabled=True,
    long_term_memory=rag_db,
)

# Query with RAG
response = agent.run("What is Milvus Cloud and how does it relate to RAG? Who is the founder of Swarms?")
print(response)
```
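
Since the example reads `MILVUS_URI` and `MILVUS_TOKEN` from the environment, a convenient local setup is keeping them in a `.env` file and loading it before the script runs. A minimal sketch, assuming `python-dotenv` is installed:

```python
# Contents of .env (values are placeholders):
#   MILVUS_URI=https://your-cluster.zillizcloud.com
#   MILVUS_TOKEN=your-zilliz-api-token

from dotenv import load_dotenv

load_dotenv()  # makes MILVUS_URI and MILVUS_TOKEN visible to os.getenv
```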
## Use Cases

@@ -56,417 +56,52 @@ export OPENAI_API_KEY="your-openai-api-key"
```python
"""
Agent with Milvus RAG (Retrieval-Augmented Generation)

This example demonstrates using Milvus as a vector database for RAG operations,
allowing agents to store and retrieve documents for enhanced context.
"""

from swarms import Agent
from swarms_memory import MilvusDB

# Initialize Milvus wrapper for RAG operations
rag_db = MilvusDB(
    embedding_model="text-embedding-3-small",  # OpenAI embedding model
    collection_name="swarms_knowledge",        # Collection name
    db_file="swarms_milvus.db",                # Local Milvus Lite database
    metric="COSINE",                           # Distance metric for similarity search
)

# Add documents to the knowledge base
documents = [
    "Milvus is an open-source vector database built for scalable similarity search and AI applications.",
    "RAG combines retrieval and generation for more accurate AI responses.",
    "Vector embeddings enable semantic search across documents.",
    "The Swarms framework supports multiple memory backends including Milvus.",
    "Swarms is the first and most reliable multi-agent production-grade framework.",
    "Kye Gomez is Founder and CEO of Swarms.",
]

# Add documents individually
for doc in documents:
    rag_db.add(doc)

# Create agent with RAG capabilities
agent = Agent(
    agent_name="RAG-Agent",
    agent_description="Swarms Agent with Milvus-powered RAG for enhanced knowledge retrieval and semantic search",
    model_name="gpt-4o",
    max_loops=1,
    dynamic_temperature_enabled=True,
    long_term_memory=rag_db,
)

# Query with RAG
response = agent.run("What is Milvus and how does it relate to RAG? Who is the founder of Swarms?")
print(response)
```
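
Because Milvus Lite persists the whole store in the single `db_file` configured above, backing it up or versioning it is just a file copy. A minimal sketch:

```python
import shutil

# The local Milvus Lite database is one file; copy it to create a backup
shutil.copy2("swarms_milvus.db", "swarms_milvus_backup.db")
```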
## Use Cases

@@ -60,475 +60,56 @@ export OPENAI_API_KEY="your-openai-api-key"
```python
"""
Agent with Pinecone RAG (Retrieval-Augmented Generation)

This example demonstrates using Pinecone as a vector database for RAG operations,
allowing agents to store and retrieve documents for enhanced context.
"""

import os
import time

from swarms import Agent
from swarms_memory import PineconeMemory

# Initialize Pinecone wrapper for RAG operations
rag_db = PineconeMemory(
    api_key=os.getenv("PINECONE_API_KEY", "your-pinecone-api-key"),
    index_name="knowledge-base",
    embedding_model="text-embedding-3-small",
    dimension=1536,
    metric="cosine",
    namespace="examples",
)

# Add documents to the knowledge base
documents = [
    "Pinecone is a vector database that makes it easy to add semantic search to applications.",
    "RAG combines retrieval and generation for more accurate AI responses.",
    "Vector embeddings enable semantic search across documents.",
    "The Swarms framework supports multiple memory backends including Pinecone.",
    "Swarms is the first and most reliable multi-agent production-grade framework.",
    "Kye Gomez is Founder and CEO of Swarms.",
]

# Add documents individually
for doc in documents:
    rag_db.add(doc)

# Wait for Pinecone's eventual consistency to ensure documents are indexed
print("Waiting for documents to be indexed...")
time.sleep(2)

# Create agent with RAG capabilities
agent = Agent(
    agent_name="RAG-Agent",
    agent_description="Swarms Agent with Pinecone-powered RAG for enhanced knowledge retrieval",
    model_name="gpt-4o",
    max_loops=1,
    dynamic_temperature_enabled=True,
    long_term_memory=rag_db,
)

# Query with RAG
response = agent.run("What is Pinecone and how does it relate to RAG? Who is the founder of Swarms?")
print(response)
```
# Query with RAG
response = agent.run("What is Pinecone and how does it relate to RAG? Who is the founder of Swarms?")
print(response)
```
## Use Cases

@ -1,452 +0,0 @@
# Qdrant RAG Integration
This example demonstrates how to integrate the Qdrant vector database with Swarms agents for Retrieval-Augmented Generation (RAG). Qdrant is a high-performance vector database that enables agents to store, index, and retrieve documents using semantic similarity search for enhanced context and more accurate responses.
## Prerequisites
- Python 3.7+
- OpenAI API key
- Swarms library
- Qdrant client and swarms-memory
## Installation
```bash
pip install qdrant-client fastembed swarms-memory litellm
```
> **Note**: The `litellm` package is required for using LiteLLM provider models like OpenAI, Azure, Cohere, etc.
## Tutorial Steps
### Step 1: Install Swarms
First, install the latest version of Swarms:
```bash
pip3 install -U swarms
```
### Step 2: Environment Setup
Set up your environment variables in a `.env` file:
```plaintext
OPENAI_API_KEY="your-api-key-here"
QDRANT_URL="https://your-cluster.qdrant.io"
QDRANT_API_KEY="your-api-key"
WORKSPACE_DIR="agent_workspace"
```
### Step 3: Choose Deployment
Select your Qdrant deployment option:
- **In-memory**: For testing and development (data is not persisted)
- **Local server**: For production deployments with persistent storage
- **Qdrant Cloud**: Managed cloud service (recommended for production)
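If you want a single code path that works across all three options, a small factory can choose the deployment from environment variables. This is a sketch; `make_qdrant_client` is an illustrative helper (not part of swarms-memory) and assumes the variables from Step 2:
```python
import os

from qdrant_client import QdrantClient


def make_qdrant_client() -> QdrantClient:
    """Pick a Qdrant deployment based on environment variables."""
    url = os.getenv("QDRANT_URL")
    if url:
        # Qdrant Cloud (or any remote endpoint)
        return QdrantClient(url=url, api_key=os.getenv("QDRANT_API_KEY"))
    host = os.getenv("QDRANT_HOST")
    if host:
        # Self-hosted server, e.g. the Docker setup shown later
        return QdrantClient(host=host, port=int(os.getenv("QDRANT_PORT", "6333")))
    # Fallback: ephemeral in-memory instance for testing (data is not persisted)
    return QdrantClient(":memory:")
```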
### Step 4: Configure Database
Set up the vector database wrapper with your preferred embedding model and collection settings.
### Step 5: Add Documents
Load documents using individual or batch processing methods.
### Step 6: Create Agent
Initialize your agent with RAG capabilities and start querying.
## Code
### Basic Setup with Individual Document Processing
```python
from qdrant_client import QdrantClient, models
from swarms import Agent
from swarms_memory import QdrantDB
import os
# Client Configuration Options
# Option 1: In-memory (testing only - data is NOT persisted)
# ":memory:" creates a temporary in-memory database that's lost when program ends
client = QdrantClient(":memory:")
# Option 2: Local Qdrant Server
# Requires: docker run -p 6333:6333 qdrant/qdrant
# client = QdrantClient(host="localhost", port=6333)
# Option 3: Qdrant Cloud (recommended for production)
# Get credentials from https://cloud.qdrant.io
# client = QdrantClient(
# url=os.getenv("QDRANT_URL"), # e.g., "https://xyz-abc.eu-central.aws.cloud.qdrant.io"
# api_key=os.getenv("QDRANT_API_KEY") # Your Qdrant Cloud API key
# )
# Create vector database wrapper
rag_db = QdrantDB(
client=client,
embedding_model="text-embedding-3-small",
collection_name="knowledge_base",
distance=models.Distance.COSINE,
n_results=3
)
# Add documents to the knowledge base
documents = [
"Qdrant is a vector database optimized for similarity search and AI applications.",
"RAG combines retrieval and generation for more accurate AI responses.",
"Vector embeddings enable semantic search across documents.",
"The swarms framework supports multiple memory backends including Qdrant."
]
# Method 1: Add documents individually
for doc in documents:
rag_db.add(doc)
# Create agent with RAG capabilities
agent = Agent(
agent_name="RAG-Agent",
agent_description="Agent with Qdrant-powered RAG for enhanced knowledge retrieval",
model_name="gpt-4o",
max_loops=1,
dynamic_temperature_enabled=True,
long_term_memory=rag_db
)
# Query with RAG
try:
response = agent.run("What is Qdrant and how does it relate to RAG?")
print(response)
except Exception as e:
print(f"Error during query: {e}")
# Handle error appropriately
```
### Advanced Setup with Batch Processing and Metadata
```python
from qdrant_client import QdrantClient, models
from swarms import Agent
from swarms_memory import QdrantDB
import os
# Initialize client (using in-memory for this example)
client = QdrantClient(":memory:")
# Create vector database wrapper
rag_db = QdrantDB(
client=client,
embedding_model="text-embedding-3-small",
collection_name="advanced_knowledge_base",
distance=models.Distance.COSINE,
n_results=3
)
# Method 2: Batch add documents (more efficient for large datasets)
# Example with metadata
documents_with_metadata = [
"Machine learning is a subset of artificial intelligence.",
"Deep learning uses neural networks with multiple layers.",
"Natural language processing enables computers to understand human language.",
"Computer vision allows machines to interpret visual information.",
"Reinforcement learning learns through interaction with an environment."
]
metadata = [
{"category": "AI", "difficulty": "beginner", "topic": "overview"},
{"category": "ML", "difficulty": "intermediate", "topic": "neural_networks"},
{"category": "NLP", "difficulty": "intermediate", "topic": "language"},
{"category": "CV", "difficulty": "advanced", "topic": "vision"},
{"category": "RL", "difficulty": "advanced", "topic": "learning"}
]
# Batch add with metadata
doc_ids = rag_db.batch_add(documents_with_metadata, metadata=metadata, batch_size=3)
print(f"Added {len(doc_ids)} documents in batch")
# Query with metadata return
results_with_metadata = rag_db.query(
"What is artificial intelligence?",
n_results=3,
return_metadata=True
)
for i, result in enumerate(results_with_metadata):
print(f"\nResult {i+1}:")
print(f" Document: {result['document']}")
print(f" Category: {result['category']}")
print(f" Difficulty: {result['difficulty']}")
print(f" Topic: {result['topic']}")
print(f" Score: {result['score']:.4f}")
# Create agent with RAG capabilities
agent = Agent(
agent_name="Advanced-RAG-Agent",
agent_description="Advanced agent with metadata-enhanced RAG capabilities",
model_name="gpt-4o",
max_loops=1,
dynamic_temperature_enabled=True,
long_term_memory=rag_db
)
# Query with enhanced context
response = agent.run("Explain the relationship between machine learning and artificial intelligence")
print(response)
```
## Production Setup
### Setting up Qdrant Cloud
1. Sign up at [cloud.qdrant.io](https://cloud.qdrant.io)
2. Create a cluster
3. Get your cluster URL and API key
4. Set environment variables:
```bash
export QDRANT_URL="https://your-cluster.eu-central.aws.cloud.qdrant.io"
export QDRANT_API_KEY="your-api-key-here"
```
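Before wiring the cluster into an agent, it is worth confirming the credentials actually work. A minimal connectivity check, assuming the environment variables above are set:
```python
import os

from qdrant_client import QdrantClient

client = QdrantClient(
    url=os.getenv("QDRANT_URL"),
    api_key=os.getenv("QDRANT_API_KEY"),
)

# Listing collections is a cheap round-trip that verifies URL, API key, and network access
collections = client.get_collections().collections
print(f"Connected. Existing collections: {[c.name for c in collections]}")
```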
### Running Local Qdrant Server
```bash
# Docker
docker run -p 6333:6333 qdrant/qdrant
```
```yaml
# docker-compose.yml
version: '3.7'
services:
  qdrant:
    image: qdrant/qdrant
    ports:
      - "6333:6333"
    volumes:
      - ./qdrant_storage:/qdrant/storage
```
### Production Configuration Example
```python
from qdrant_client import QdrantClient, models
from swarms_memory import QdrantDB
import os
import logging
# Setup logging for production monitoring
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
try:
# Connect to Qdrant server with proper error handling
client = QdrantClient(
host=os.getenv("QDRANT_HOST", "localhost"),
port=int(os.getenv("QDRANT_PORT", "6333")),
api_key=os.getenv("QDRANT_API_KEY"), # Use environment variable
timeout=30 # 30 second timeout
)
# Production RAG configuration with enhanced settings
rag_db = QdrantDB(
client=client,
embedding_model="text-embedding-3-large", # Higher quality embeddings
collection_name="production_knowledge",
distance=models.Distance.COSINE,
n_results=10,
api_key=os.getenv("OPENAI_API_KEY") # Secure API key handling
)
logger.info("Successfully initialized production RAG database")
except Exception as e:
logger.error(f"Failed to initialize RAG database: {e}")
raise
```
## Configuration Options
### Distance Metrics
| Metric | Description | Best For |
|--------|-------------|----------|
| **COSINE** | Cosine similarity (default) | Normalized embeddings, text similarity |
| **EUCLIDEAN** | Euclidean distance | Absolute distance measurements |
| **DOT** | Dot product | Maximum inner product search |
### Embedding Model Options
#### LiteLLM Provider Models (Recommended)
| Model | Provider | Dimensions | Description |
|-------|----------|------------|-------------|
| `text-embedding-3-small` | OpenAI | 1536 | Efficient, cost-effective |
| `text-embedding-3-large` | OpenAI | 3072 | Best quality |
| `azure/your-deployment` | Azure | Variable | Azure OpenAI embeddings |
| `cohere/embed-english-v3.0` | Cohere | 1024 | Advanced language understanding |
| `voyage/voyage-3-large` | Voyage AI | 1024 | High-quality embeddings |
#### SentenceTransformer Models
| Model | Dimensions | Description |
|-------|------------|-------------|
| `all-MiniLM-L6-v2` | 384 | Fast, general-purpose |
| `all-mpnet-base-v2` | 768 | Higher quality |
| `all-roberta-large-v1` | 1024 | Best quality |
#### Usage Example
```python
# OpenAI embeddings (default example)
rag_db = QdrantDB(
client=client,
embedding_model="text-embedding-3-small",
collection_name="openai_collection"
)
```
> **Note**: QdrantDB supports all LiteLLM provider models (Azure, Cohere, Voyage AI, etc.), SentenceTransformer models, and custom embedding functions. See the embedding model options table above for the complete list.
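For local or offline development, a SentenceTransformer model from the table above can be swapped in without changing anything else. A sketch, assuming `QdrantDB` resolves the model name to a local SentenceTransformer as the note describes:
```python
from qdrant_client import QdrantClient
from swarms_memory import QdrantDB

client = QdrantClient(":memory:")

# "all-MiniLM-L6-v2" is a 384-dimensional local model: fast, and no API key required
local_rag_db = QdrantDB(
    client=client,
    embedding_model="all-MiniLM-L6-v2",
    collection_name="local_dev_collection",
    n_results=3,
)

local_rag_db.add("SentenceTransformer embeddings are computed entirely on the local machine.")
print(local_rag_db.query("Where are these embeddings computed?"))
```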
## Use Cases
### Document Q&A System
Create an intelligent document question-answering system:
```python
# Load company documents into Qdrant
company_documents = [
"Company policy on remote work allows flexible scheduling with core hours 10 AM - 3 PM.",
"API documentation: Use POST /api/v1/users to create new user accounts.",
"Product specifications: Our software supports Windows, Mac, and Linux platforms."
]
for doc in company_documents:
rag_db.add(doc)
# Agent can now answer questions using the documents
agent = Agent(
agent_name="Company-DocQA-Agent",
agent_description="Intelligent document Q&A system for company information",
model_name="gpt-4o",
long_term_memory=rag_db
)
answer = agent.run("What is the company policy on remote work?")
print(answer)
```
### Knowledge Base Management
Build a comprehensive knowledge management system:
```python
class KnowledgeBaseAgent:
def __init__(self):
self.client = QdrantClient(":memory:")
self.rag_db = QdrantDB(
client=self.client,
embedding_model="text-embedding-3-small",
collection_name="knowledge_base",
n_results=5
)
self.agent = Agent(
agent_name="KB-Management-Agent",
agent_description="Knowledge base management and retrieval system",
model_name="gpt-4o",
long_term_memory=self.rag_db
)
def add_knowledge(self, text: str, metadata: dict = None):
"""Add new knowledge to the base"""
if metadata:
return self.rag_db.batch_add([text], metadata=[metadata])
return self.rag_db.add(text)
def query(self, question: str):
"""Query the knowledge base"""
return self.agent.run(question)
def bulk_import(self, documents: list, metadata_list: list = None):
"""Import multiple documents efficiently"""
return self.rag_db.batch_add(documents, metadata=metadata_list, batch_size=50)
# Usage
kb = KnowledgeBaseAgent()
kb.add_knowledge("Python is a high-level programming language.", {"category": "programming"})
kb.add_knowledge("Qdrant is optimized for vector similarity search.", {"category": "databases"})
result = kb.query("What programming languages are mentioned?")
print(result)
```
## Best Practices
### Document Processing Strategy
| Practice | Recommendation | Details |
|----------|----------------|---------|
| **Chunking** | 200-500 tokens | Split large documents into optimal chunks for retrieval |
| **Overlap** | 20-50 tokens | Maintain context between consecutive chunks |
| **Preprocessing** | Clean & normalize | Remove noise and standardize text format |
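As a sketch of the chunking guidance above, here is a simple overlapping chunker feeding the `rag_db` wrapper from the earlier examples. Word counts stand in for tokens (roughly 1.3 tokens per word); swap in a real tokenizer such as `tiktoken` for production, and note that `handbook.txt` is a hypothetical source file:
```python
from typing import List


def chunk_text(text: str, chunk_size: int = 300, overlap: int = 30) -> List[str]:
    """Split text into overlapping word-based chunks (~200-500 tokens each)."""
    words = text.split()
    chunks = []
    step = chunk_size - overlap  # advance so consecutive chunks share `overlap` words
    for start in range(0, len(words), step):
        chunk = " ".join(words[start:start + chunk_size])
        if chunk:
            chunks.append(chunk)
        if start + chunk_size >= len(words):
            break
    return chunks


# Add each chunk separately so retrieval returns focused passages, not whole documents
with open("handbook.txt") as f:  # hypothetical source document
    for chunk in chunk_text(f.read()):
        rag_db.add(chunk)
```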
### Collection Organization
| Practice | Recommendation | Details |
|----------|----------------|---------|
| **Separation** | Type-based collections | Use separate collections for docs, policies, code, etc. |
| **Naming** | Consistent conventions | Follow clear, descriptive naming patterns |
| **Lifecycle** | Update strategies | Plan for document versioning and updates |
### Embedding Model Selection
| Environment | Recommended Model | Use Case |
|-------------|-------------------|----------|
| **Development** | `all-MiniLM-L6-v2` | Fast iteration and testing |
| **Production** | `text-embedding-3-small/large` | High-quality production deployment |
| **Specialized** | Domain-specific models | Industry or domain-focused applications |
### Performance Optimization
| Setting | Recommendation | Rationale |
|---------|----------------|-----------|
| **Retrieval Count** | Start with 3-5 results | Balance relevance with performance |
| **Batch Operations** | Use `batch_add()` | Efficient bulk document processing |
| **Metadata** | Strategic storage | Enable filtering and enhanced context |
### Production Deployment
| Component | Best Practice | Implementation |
|-----------|---------------|----------------|
| **Storage** | Persistent server | Use Qdrant Cloud or self-hosted server |
| **Error Handling** | Robust mechanisms | Implement retry logic and graceful failures |
| **Monitoring** | Performance tracking | Monitor metrics and embedding quality |
## Performance Tips
- **Development**: Use in-memory mode for rapid prototyping and testing
- **Production**: Deploy dedicated Qdrant server with appropriate resource allocation
- **Scalability**: Use batch operations for adding multiple documents efficiently
- **Memory Management**: Monitor memory usage with large document collections
- **API Usage**: Consider rate limits when using cloud-based embedding services
- **Caching**: Implement caching strategies for frequently accessed documents
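As a sketch of the caching tip above, frequently repeated questions can be memoized so identical queries skip the embedding call, vector search, and LLM round-trip entirely. This caches full answers with `functools.lru_cache` against the `agent` from the earlier examples; clear the cache whenever documents change:
```python
from functools import lru_cache


@lru_cache(maxsize=256)
def cached_query(question: str) -> str:
    """Cache retrieval-augmented answers for repeated questions."""
    return agent.run(question)


# First call performs retrieval and generation; the repeat is served from the cache
print(cached_query("What is Qdrant optimized for?"))
print(cached_query("What is Qdrant optimized for?"))

# After adding or updating documents, invalidate stale answers
cached_query.cache_clear()
```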
## Customization
You can modify the system configuration to create specialized RAG agents for different use cases:
| Use Case | Configuration | Description |
|----------|---------------|-------------|
| **Technical Documentation** | High n_results (10-15), precise embeddings | Comprehensive technical Q&A |
| **Customer Support** | Fast embeddings, metadata filtering | Quick response with categorization |
| **Research Assistant** | Large embedding model, broad retrieval | Deep analysis and synthesis |
| **Code Documentation** | Code-specific embeddings, semantic chunking | Programming-focused assistance |
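For example, the "Customer Support" row above might translate into a configuration like this sketch (the model choices, collection name, and `n_results` value are illustrative):
```python
from qdrant_client import QdrantClient
from swarms import Agent
from swarms_memory import QdrantDB

# Customer-support flavored RAG: fast local embeddings, small focused context
support_db = QdrantDB(
    client=QdrantClient(":memory:"),
    embedding_model="all-MiniLM-L6-v2",  # fast embeddings for low-latency answers
    collection_name="support_articles",
    n_results=3,  # a few highly relevant passages keep responses quick
)

support_agent = Agent(
    agent_name="Support-RAG-Agent",
    agent_description="Customer support agent answering from the help-center knowledge base",
    model_name="gpt-4o",
    max_loops=1,
    long_term_memory=support_db,
)
```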
## Related Resources
- [Qdrant Documentation](https://qdrant.tech/documentation/)
- [Swarms Memory GitHub Repository](https://github.com/The-Swarm-Corporation/swarms-memory)
- [Agent Documentation](../agents/new_agent.md)
- [OpenAI Embeddings Guide](https://platform.openai.com/docs/guides/embeddings)
- [Vector Database Concepts](https://qdrant.tech/documentation/concepts/)

@ -1,991 +0,0 @@
# Qdrant RAG Integration with Swarms
## Overview
Qdrant is a high-performance, open-source vector database designed specifically for similarity search and AI applications. It offers both cloud and self-hosted deployment options, providing enterprise-grade features including advanced filtering, payload support, collection aliasing, and clustering capabilities. Qdrant is built with Rust for optimal performance and memory efficiency, making it ideal for production RAG systems that require both speed and scalability.
## Key Features
- **High Performance**: Rust-based implementation with optimized HNSW algorithm
- **Rich Filtering**: Advanced payload filtering with complex query conditions
- **Collection Management**: Multiple collections with independent configurations
- **Payload Support**: Rich metadata storage and querying capabilities
- **Clustering**: Horizontal scaling with automatic sharding
- **Snapshots**: Point-in-time backups and recovery
- **Hybrid Cloud**: Flexible deployment options (cloud, on-premise, hybrid)
- **Real-time Updates**: Live index updates without service interruption
## Architecture
Qdrant integrates with Swarms agents as a flexible, high-performance vector database:
```
[Agent] -> [Qdrant Memory] -> [Vector Collections] -> [Similarity + Payload Search] -> [Retrieved Context]
```
The system leverages Qdrant's advanced filtering capabilities to combine vector similarity with rich metadata queries for precise context retrieval.
## Setup & Configuration
### Installation
```bash
pip install qdrant-client
pip install swarms
pip install litellm
```
### Environment Variables
```bash
# Qdrant Cloud configuration
export QDRANT_URL="https://your-cluster.qdrant.tech"
export QDRANT_API_KEY="your-api-key"
# Or local Qdrant configuration
export QDRANT_HOST="localhost"
export QDRANT_PORT="6333"
# OpenAI API key for LLM
export OPENAI_API_KEY="your-openai-api-key"
```
### Dependencies
- `qdrant-client>=1.7.0`
- `swarms`
- `litellm`
- `numpy`
## Code Example
```python
"""
Qdrant RAG Integration with Swarms Agent
This example demonstrates how to integrate Qdrant as a high-performance vector database
for RAG operations with Swarms agents using LiteLLM embeddings.
"""
import os
from typing import List, Dict, Any, Optional
import numpy as np
from qdrant_client import QdrantClient, models
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue
from swarms import Agent
from litellm import embedding
import uuid
from datetime import datetime
class QdrantMemory:
"""Qdrant-based memory system for RAG operations"""
def __init__(self,
collection_name: str = "swarms_knowledge_base",
embedding_model: str = "text-embedding-3-small",
dimension: int = 1536,
distance_metric: Distance = Distance.COSINE,
hnsw_config: Optional[Dict] = None,
optimizers_config: Optional[Dict] = None):
"""
Initialize Qdrant memory system
Args:
collection_name: Name of the Qdrant collection
embedding_model: LiteLLM embedding model name
dimension: Vector dimension (1536 for text-embedding-3-small)
distance_metric: Distance metric (COSINE, EUCLID, DOT)
hnsw_config: HNSW algorithm configuration
optimizers_config: Optimizer configuration for performance tuning
"""
self.collection_name = collection_name
self.embedding_model = embedding_model
self.dimension = dimension
self.distance_metric = distance_metric
# Default HNSW configuration optimized for typical use cases
self.hnsw_config = hnsw_config or {
"m": 16, # Number of bi-directional links
"ef_construct": 200, # Size of dynamic candidate list
"full_scan_threshold": 10000, # When to use full scan vs HNSW
"max_indexing_threads": 0, # Auto-detect threads
"on_disk": False # Keep index in memory for speed
}
# Default optimizer configuration
self.optimizers_config = optimizers_config or {
"deleted_threshold": 0.2, # Trigger optimization when 20% deleted
"vacuum_min_vector_number": 1000, # Minimum vectors before vacuum
"default_segment_number": 0, # Auto-determine segments
"max_segment_size": None, # No segment size limit
"memmap_threshold": None, # Auto-determine memory mapping
"indexing_threshold": 20000, # Start indexing after 20k vectors
"flush_interval_sec": 5, # Flush interval in seconds
"max_optimization_threads": 1 # Single optimization thread
}
# Initialize Qdrant client
self.client = self._create_client()
# Create collection if it doesn't exist
self._create_collection()
def _create_client(self) -> QdrantClient:
"""Create Qdrant client based on configuration"""
# Try cloud configuration first
url = os.getenv("QDRANT_URL")
api_key = os.getenv("QDRANT_API_KEY")
if url and api_key:
print(f"Connecting to Qdrant Cloud: {url}")
return QdrantClient(url=url, api_key=api_key)
# Fall back to local configuration
host = os.getenv("QDRANT_HOST", "localhost")
port = int(os.getenv("QDRANT_PORT", "6333"))
print(f"Connecting to local Qdrant: {host}:{port}")
return QdrantClient(host=host, port=port)
def _create_collection(self):
"""Create collection with optimized configuration"""
# Check if collection already exists
collections = self.client.get_collections().collections
if any(c.name == self.collection_name for c in collections):
print(f"Collection '{self.collection_name}' already exists")
return
# Create collection with vector configuration
self.client.create_collection(
collection_name=self.collection_name,
vectors_config=VectorParams(
size=self.dimension,
distance=self.distance_metric,
hnsw_config=models.HnswConfigDiff(**self.hnsw_config)
),
optimizers_config=models.OptimizersConfigDiff(**self.optimizers_config)
)
print(f"Created collection '{self.collection_name}' with {self.distance_metric} distance")
def _get_embeddings(self, texts: List[str]) -> List[List[float]]:
"""Generate embeddings using LiteLLM"""
response = embedding(
model=self.embedding_model,
input=texts
)
return [item["embedding"] for item in response["data"]]
def _generate_point_id(self) -> str:
"""Generate unique point ID"""
return str(uuid.uuid4())
def add_documents(self,
documents: List[str],
metadata: List[Dict] = None,
ids: List[str] = None,
batch_size: int = 100) -> List[str]:
"""Add multiple documents to Qdrant"""
if metadata is None:
metadata = [{}] * len(documents)
if ids is None:
ids = [self._generate_point_id() for _ in documents]
# Generate embeddings
embeddings = self._get_embeddings(documents)
# Prepare points for upsert
points = []
for point_id, embedding_vec, doc, meta in zip(ids, embeddings, documents, metadata):
# Create rich payload with document text and metadata
payload = {
"text": doc,
"timestamp": datetime.now().isoformat(),
**meta # Include all metadata fields
}
points.append(PointStruct(
id=point_id,
vector=embedding_vec,
payload=payload
))
# Batch upsert points
for i in range(0, len(points), batch_size):
batch = points[i:i + batch_size]
self.client.upsert(
collection_name=self.collection_name,
points=batch
)
print(f"Added {len(documents)} documents to Qdrant collection")
return ids
def add_document(self,
document: str,
metadata: Dict = None,
point_id: str = None) -> str:
"""Add a single document to Qdrant"""
result = self.add_documents(
documents=[document],
metadata=[metadata or {}],
ids=[point_id] if point_id else None
)
return result[0] if result else None
def search(self,
query: str,
limit: int = 3,
score_threshold: float = None,
payload_filter: Filter = None,
with_payload: bool = True,
with_vectors: bool = False) -> Dict[str, Any]:
"""Search for similar documents in Qdrant with advanced filtering"""
# Generate query embedding
query_embedding = self._get_embeddings([query])[0]
# Perform search with optional filtering
search_result = self.client.search(
collection_name=self.collection_name,
query_vector=query_embedding,
limit=limit,
score_threshold=score_threshold,
query_filter=payload_filter,
with_payload=with_payload,
with_vectors=with_vectors
)
# Format results
formatted_results = {
"documents": [],
"metadata": [],
"scores": [],
"ids": []
}
for point in search_result:
formatted_results["ids"].append(str(point.id))
formatted_results["scores"].append(float(point.score))
if with_payload and point.payload:
# Extract text and separate metadata
text = point.payload.get("text", "")
# Remove internal fields from metadata
metadata = {k: v for k, v in point.payload.items()
if k not in ["text", "timestamp"]}
formatted_results["documents"].append(text)
formatted_results["metadata"].append(metadata)
else:
formatted_results["documents"].append("")
formatted_results["metadata"].append({})
return formatted_results
def search_with_payload_filter(self,
query: str,
filter_conditions: List[FieldCondition],
limit: int = 3) -> Dict[str, Any]:
"""Search with complex payload filtering"""
payload_filter = Filter(
must=filter_conditions
)
return self.search(
query=query,
limit=limit,
payload_filter=payload_filter
)
def delete_documents(self,
point_ids: List[str] = None,
payload_filter: Filter = None) -> bool:
"""Delete documents from Qdrant"""
if point_ids:
result = self.client.delete(
collection_name=self.collection_name,
points_selector=models.PointIdsList(points=point_ids)
)
elif payload_filter:
result = self.client.delete(
collection_name=self.collection_name,
points_selector=models.FilterSelector(filter=payload_filter)
)
else:
raise ValueError("Must specify either point_ids or payload_filter")
return result.operation_id is not None
def update_payload(self,
point_ids: List[str],
payload: Dict,
overwrite: bool = False) -> bool:
"""Update payload for existing points"""
operation = self.client.overwrite_payload if overwrite else self.client.set_payload
result = operation(
collection_name=self.collection_name,
payload=payload,
points=point_ids
)
return result.operation_id is not None
def get_collection_info(self) -> Dict[str, Any]:
"""Get detailed collection information"""
info = self.client.get_collection(self.collection_name)
return {
"vectors_count": info.vectors_count,
"indexed_vectors_count": info.indexed_vectors_count,
"points_count": info.points_count,
"segments_count": info.segments_count,
"config": {
"distance": info.config.params.vectors.distance,
"dimension": info.config.params.vectors.size,
"hnsw_config": info.config.params.vectors.hnsw_config.__dict__ if info.config.params.vectors.hnsw_config else None
}
}
def create_payload_index(self, field_name: str, field_schema: str = "keyword"):
"""Create index on payload field for faster filtering"""
self.client.create_payload_index(
collection_name=self.collection_name,
field_name=field_name,
field_schema=field_schema
)
print(f"Created payload index on field: {field_name}")
def create_snapshot(self) -> str:
"""Create a snapshot of the collection"""
result = self.client.create_snapshot(collection_name=self.collection_name)
print(f"Created snapshot: {result.name}")
return result.name
# Initialize Qdrant memory
memory = QdrantMemory(
collection_name="swarms_rag_demo",
embedding_model="text-embedding-3-small",
dimension=1536,
distance_metric=Distance.COSINE
)
# Sample documents for the knowledge base
documents = [
"Qdrant is a high-performance, open-source vector database built with Rust for AI applications.",
"RAG systems combine vector similarity search with rich payload filtering for precise context retrieval.",
"Vector embeddings represent semantic meaning of text for similarity-based search operations.",
"The Swarms framework integrates seamlessly with Qdrant's advanced filtering capabilities.",
"LiteLLM provides unified access to embedding models with consistent API interfaces.",
"HNSW algorithm in Qdrant provides excellent performance for approximate nearest neighbor search.",
"Payload filtering enables complex queries combining vector similarity with metadata conditions.",
"Qdrant supports both cloud and self-hosted deployment for flexible architecture options.",
]
# Rich metadata for advanced filtering demonstrations
metadatas = [
{
"category": "database",
"topic": "qdrant",
"difficulty": "intermediate",
"type": "overview",
"language": "rust",
"performance_tier": "high"
},
{
"category": "ai",
"topic": "rag",
"difficulty": "intermediate",
"type": "concept",
"language": "python",
"performance_tier": "medium"
},
{
"category": "ml",
"topic": "embeddings",
"difficulty": "beginner",
"type": "concept",
"language": "agnostic",
"performance_tier": "medium"
},
{
"category": "framework",
"topic": "swarms",
"difficulty": "beginner",
"type": "integration",
"language": "python",
"performance_tier": "high"
},
{
"category": "library",
"topic": "litellm",
"difficulty": "beginner",
"type": "tool",
"language": "python",
"performance_tier": "medium"
},
{
"category": "algorithm",
"topic": "hnsw",
"difficulty": "advanced",
"type": "technical",
"language": "rust",
"performance_tier": "high"
},
{
"category": "feature",
"topic": "filtering",
"difficulty": "advanced",
"type": "capability",
"language": "rust",
"performance_tier": "high"
},
{
"category": "deployment",
"topic": "architecture",
"difficulty": "intermediate",
"type": "infrastructure",
"language": "agnostic",
"performance_tier": "high"
}
]
# Add documents to Qdrant
print("Adding documents to Qdrant...")
doc_ids = memory.add_documents(documents, metadatas)
print(f"Successfully added {len(doc_ids)} documents")
# Create payload indices for better filtering performance
memory.create_payload_index("category")
memory.create_payload_index("difficulty")
memory.create_payload_index("performance_tier")
# Display collection information
info = memory.get_collection_info()
print(f"Collection info: {info['points_count']} points, {info['vectors_count']} vectors")
# Create Swarms agent with Qdrant RAG
agent = Agent(
agent_name="Qdrant-RAG-Agent",
agent_description="Advanced agent with Qdrant-powered RAG featuring rich payload filtering",
model_name="gpt-4o",
max_loops=1,
dynamic_temperature_enabled=True,
)
def query_with_qdrant_rag(query_text: str,
limit: int = 3,
filter_conditions: List[FieldCondition] = None,
score_threshold: float = None):
"""Query with RAG using Qdrant's advanced filtering capabilities"""
print(f"\nQuerying: {query_text}")
if filter_conditions:
print(f"Applied filters: {len(filter_conditions)} conditions")
# Retrieve relevant documents using Qdrant
if filter_conditions:
results = memory.search_with_payload_filter(
query=query_text,
filter_conditions=filter_conditions,
limit=limit
)
else:
results = memory.search(
query=query_text,
limit=limit,
score_threshold=score_threshold
)
if not results["documents"]:
print("No relevant documents found")
return agent.run(query_text)
# Prepare context from retrieved documents
context = "\n".join([
f"Document {i+1}: {doc}"
for i, doc in enumerate(results["documents"])
])
# Display retrieved documents with rich metadata
print("Retrieved documents:")
for i, (doc, score, meta) in enumerate(zip(
results["documents"], results["scores"], results["metadata"]
)):
print(f" {i+1}. (Score: {score:.4f})")
print(f" Category: {meta.get('category', 'N/A')}, Difficulty: {meta.get('difficulty', 'N/A')}")
print(f" Language: {meta.get('language', 'N/A')}, Performance: {meta.get('performance_tier', 'N/A')}")
print(f" {doc[:100]}...")
# Enhanced prompt with context
enhanced_prompt = f"""
Based on the following retrieved context from our knowledge base, please answer the question:
Context:
{context}
Question: {query_text}
Please provide a comprehensive answer based primarily on the context provided.
"""
# Run agent with enhanced prompt
response = agent.run(enhanced_prompt)
return response
# Example usage and testing
if __name__ == "__main__":
# Test basic queries
queries = [
"What is Qdrant and what are its key features?",
"How do RAG systems work with vector databases?",
"What are the benefits of HNSW algorithm?",
"How does payload filtering enhance search capabilities?",
]
print("=== Basic RAG Queries ===")
for query in queries:
response = query_with_qdrant_rag(query, limit=3)
print(f"Answer: {response}\n")
print("-" * 80)
# Test advanced payload filtering
print("\n=== Advanced Payload Filtering ===")
# Query only high-performance, advanced topics
high_perf_filter = [
FieldCondition(key="performance_tier", match=MatchValue(value="high")),
FieldCondition(key="difficulty", match=MatchValue(value="advanced"))
]
response = query_with_qdrant_rag(
"What are high-performance advanced features?",
limit=3,
filter_conditions=high_perf_filter
)
print(f"High-performance features: {response}\n")
# Query beginner-friendly Python content
python_beginner_filter = [
FieldCondition(key="language", match=MatchValue(value="python")),
FieldCondition(key="difficulty", match=MatchValue(value="beginner"))
]
response = query_with_qdrant_rag(
"What Python tools should beginners know about?",
limit=2,
filter_conditions=python_beginner_filter
)
print(f"Python beginner tools: {response}\n")
# Query database and algorithm concepts
db_algo_filter = [
    FieldCondition(
        key="category",
        # MatchAny matches documents whose field equals any of the listed values
        match=models.MatchAny(any=["database", "algorithm"])
    )
]
response = query_with_qdrant_rag(
"Explain database and algorithm concepts",
limit=3,
filter_conditions=db_algo_filter
)
print(f"Database and algorithm concepts: {response}\n")
# Demonstrate score threshold filtering
print("=== Score Threshold Filtering ===")
response = query_with_qdrant_rag(
"What is machine learning?", # Query not closely related to our documents
limit=5,
score_threshold=0.7 # High threshold to filter low-relevance results
)
print(f"High-relevance only: {response}\n")
# Demonstrate payload updates
print("=== Payload Updates ===")
if doc_ids:
# Update payload for first document
memory.update_payload(
point_ids=[doc_ids[0]],
payload={"updated": True, "version": "2.0", "priority": "high"}
)
print("Updated document payload")
# Add new document with advanced metadata
new_doc = "Qdrant clustering enables horizontal scaling with automatic data distribution across nodes."
new_metadata = {
"category": "scaling",
"topic": "clustering",
"difficulty": "expert",
"type": "feature",
"language": "rust",
"performance_tier": "ultra_high",
"version": "1.0"
}
new_id = memory.add_document(new_doc, new_metadata)
# Query about clustering with expert-level filter
expert_filter = [
FieldCondition(key="difficulty", match=MatchValue(value="expert")),
FieldCondition(key="category", match=MatchValue(value="scaling"))
]
response = query_with_qdrant_rag(
"How does clustering work for scaling?",
filter_conditions=expert_filter
)
print(f"Expert clustering info: {response}\n")
# Demonstrate complex filtering with multiple conditions
print("=== Complex Multi-Condition Filtering ===")
complex_filter = [
FieldCondition(key="performance_tier", match=MatchValue(any_of=["high", "ultra_high"])),
FieldCondition(key="type", match=MatchValue(any_of=["feature", "capability", "technical"])),
FieldCondition(key="difficulty", match=MatchValue(except_of=["beginner"]))
]
response = query_with_qdrant_rag(
"What are the most advanced technical capabilities?",
limit=4,
filter_conditions=complex_filter
)
print(f"Advanced capabilities: {response}\n")
# Create snapshot for backup
print("=== Collection Management ===")
snapshot_name = memory.create_snapshot()
# Display final collection statistics
final_info = memory.get_collection_info()
print(f"Final collection info:")
print(f" Points: {final_info['points_count']}")
print(f" Vectors: {final_info['vectors_count']}")
print(f" Indexed vectors: {final_info['indexed_vectors_count']}")
print(f" Segments: {final_info['segments_count']}")
# Example of cleanup (use with caution)
# Delete test documents
# test_filter = Filter(
# must=[FieldCondition(key="category", match=MatchValue(value="test"))]
# )
# memory.delete_documents(payload_filter=test_filter)
```
## Use Cases
### 1. Advanced RAG Systems
- **Scenario**: Complex document retrieval requiring metadata filtering
- **Benefits**: Rich payload queries, high performance, flexible filtering
- **Best For**: Enterprise search, legal document analysis, technical documentation
### 2. Multi-Modal AI Applications
- **Scenario**: Applications combining text, images, and other data types
- **Benefits**: Flexible payload structure, multiple vector configurations
- **Best For**: Content management, media analysis, cross-modal search
### 3. Real-time Analytics Platforms
- **Scenario**: Applications requiring fast vector search with real-time updates
- **Benefits**: Live index updates, high throughput, clustering support
- **Best For**: Recommendation engines, fraud detection, real-time personalization
### 4. Hybrid Cloud Deployments
- **Scenario**: Organizations requiring flexible deployment options
- **Benefits**: Cloud and on-premise options, data sovereignty, custom configurations
- **Best For**: Government, healthcare, financial services with strict compliance needs
## Performance Characteristics
### Scaling and Performance
- **Horizontal Scaling**: Clustering support with automatic sharding
- **Vertical Scaling**: Optimized memory usage and CPU utilization
- **Query Performance**: Sub-millisecond search with HNSW indexing
- **Update Performance**: Real-time updates without index rebuilding
- **Storage Efficiency**: Configurable on-disk vs in-memory storage
### HNSW Configuration Impact
| Configuration | Use Case | Memory | Speed | Accuracy |
|---------------|----------|---------|-------|----------|
| **m=16, ef=200** | Balanced | Medium | Fast | High |
| **m=32, ef=400** | High accuracy | High | Medium | Very High |
| **m=8, ef=100** | Memory optimized | Low | Very Fast | Medium |
| **Custom** | Specific workload | Variable | Variable | Variable |
### Performance Optimization
```python
# High-performance configuration
memory = QdrantMemory(
hnsw_config={
"m": 32, # More connections for accuracy
"ef_construct": 400, # Higher construction quality
"full_scan_threshold": 5000, # Lower threshold for small datasets
"max_indexing_threads": 4, # Utilize multiple cores
"on_disk": False # Keep in memory for speed
},
optimizers_config={
"indexing_threshold": 10000, # Start indexing sooner
"max_optimization_threads": 2 # More optimization threads
}
)
```
## Cloud vs Self-Hosted Deployment
### Qdrant Cloud
```python
# Cloud deployment
memory = QdrantMemory(
collection_name="production-kb",
# Configured via environment variables:
# QDRANT_URL and QDRANT_API_KEY
)
```
**Advantages:**
- Managed infrastructure with automatic scaling
- Built-in monitoring and alerting
- Global edge locations for low latency
- Automatic backups and disaster recovery
- Enterprise security and compliance
### Self-Hosted Qdrant
```python
# Self-hosted deployment
memory = QdrantMemory(
collection_name="private-kb",
# Configured via environment variables:
# QDRANT_HOST and QDRANT_PORT
)
```
**Advantages:**
- Full control over infrastructure and data
- Custom hardware optimization
- No data transfer costs
- Compliance with data residency requirements
- Custom security configurations
## Advanced Features
### Collection Aliases
```python
# Create collection alias for zero-downtime updates
client.create_alias(
create_alias=models.CreateAlias(
collection_name="swarms_kb_v2",
alias_name="production_kb"
)
)
# Switch traffic to new version
client.update_aliases(
change_aliases_operations=[
models.CreateAlias(
collection_name="swarms_kb_v3",
alias_name="production_kb"
)
]
)
```
### Advanced Filtering Examples
```python
# Complex boolean logic
complex_filter = Filter(
must=[
FieldCondition(key="category", match=MatchValue(value="ai")),
Filter(
should=[
FieldCondition(key="difficulty", match=MatchValue(value="advanced")),
FieldCondition(key="performance_tier", match=MatchValue(value="high"))
]
)
],
must_not=[
FieldCondition(key="deprecated", match=MatchValue(value=True))
]
)
# Range queries
date_filter = Filter(
must=[
FieldCondition(
key="created_date",
range=models.Range(
gte="2024-01-01",
lt="2024-12-31"
)
)
]
)
# Geo-queries (if using geo payloads)
geo_filter = Filter(
must=[
FieldCondition(
key="location",
geo_radius=models.GeoRadius(
center=models.GeoPoint(lat=40.7128, lon=-74.0060),
radius=1000.0 # meters
)
)
]
)
```
### Batch Operations
```python
# Efficient batch processing for large datasets
def batch_process_documents(memory, documents, batch_size=1000):
"""Process large document collections efficiently"""
total_processed = 0
for i in range(0, len(documents), batch_size):
batch = documents[i:i + batch_size]
doc_texts = [doc['text'] for doc in batch]
metadata = [doc['metadata'] for doc in batch]
doc_ids = memory.add_documents(
documents=doc_texts,
metadata=metadata,
batch_size=batch_size
)
total_processed += len(doc_ids)
print(f"Processed {total_processed}/{len(documents)} documents")
return total_processed
```
### Clustering Configuration
```python
# Configure Qdrant cluster (self-hosted)
cluster_config = {
"nodes": [
{"host": "node1.example.com", "port": 6333},
{"host": "node2.example.com", "port": 6333},
{"host": "node3.example.com", "port": 6333}
],
"replication_factor": 2,
"write_consistency_factor": 1
}
# Create distributed collection
client.create_collection(
collection_name="distributed_kb",
vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
shard_number=6, # Distribute across cluster
replication_factor=2 # Redundancy
)
```
## Monitoring and Maintenance
### Health Monitoring
```python
def monitor_collection_health(memory):
"""Monitor collection health and performance"""
info = memory.get_collection_info()
# Check indexing progress
index_ratio = info['indexed_vectors_count'] / info['vectors_count']
if index_ratio < 0.9:
print(f"Warning: Only {index_ratio:.1%} vectors are indexed")
# Check segment efficiency
avg_points_per_segment = info['points_count'] / info['segments_count']
if avg_points_per_segment < 1000:
print(f"Info: Low points per segment ({avg_points_per_segment:.0f})")
return {
'health_score': index_ratio,
'points_per_segment': avg_points_per_segment,
'status': 'healthy' if index_ratio > 0.9 else 'degraded'
}
```
### Performance Tuning
```python
# Optimize collection for specific workload
def optimize_collection(client, collection_name, workload_type='balanced'):
"""Optimize collection configuration for workload"""
configs = {
'read_heavy': {
'indexing_threshold': 1000,
'max_indexing_threads': 8,
'flush_interval_sec': 10
},
'write_heavy': {
'indexing_threshold': 50000,
'max_indexing_threads': 2,
'flush_interval_sec': 1
},
'balanced': {
'indexing_threshold': 20000,
'max_indexing_threads': 4,
'flush_interval_sec': 5
}
}
config = configs.get(workload_type, configs['balanced'])
client.update_collection(
collection_name=collection_name,
optimizers_config=models.OptimizersConfigDiff(**config)
)
```
## Best Practices
1. **Collection Design**: Plan collection schema and payload structure upfront
2. **Index Strategy**: Create payload indices on frequently filtered fields
3. **Batch Operations**: Use batch operations for better throughput
4. **Memory Management**: Configure HNSW parameters based on available resources
5. **Filtering Optimization**: Use indexed fields for complex filter conditions
6. **Snapshot Strategy**: Regular snapshots for data backup and recovery
7. **Monitoring**: Implement health monitoring and performance tracking
8. **Cluster Planning**: Design cluster topology for high availability
9. **Version Management**: Use collection aliases for zero-downtime updates
10. **Security**: Implement proper authentication and network security
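As a sketch of the snapshot strategy in point 6, the raw client can create and list snapshots directly; scheduling (for example via cron) is left to your environment, and the call below assumes the `memory` instance from the example above:
```python
def backup_collection(client, collection_name: str) -> str:
    """Create a point-in-time snapshot and report the ones that exist."""
    snapshot = client.create_snapshot(collection_name=collection_name)
    existing = client.list_snapshots(collection_name=collection_name)
    print(f"Snapshots for {collection_name}: {[s.name for s in existing]}")
    return snapshot.name


# Run on a schedule (e.g. nightly) so the collection is always recoverable
backup_collection(memory.client, "swarms_rag_demo")
```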
## Troubleshooting
### Common Issues
1. **Slow Query Performance**
- Check if payload indices exist for filter fields
- Verify HNSW configuration is appropriate for dataset size
- Monitor segment count and optimization status
2. **High Memory Usage**
- Enable on-disk storage for vectors if needed
- Reduce HNSW 'm' parameter for memory efficiency
- Monitor segment sizes and trigger optimization
3. **Indexing Delays**
- Adjust indexing threshold based on write patterns
- Increase max_indexing_threads if CPU allows
- Monitor indexing progress and segment health
4. **Connection Issues**
- Verify network connectivity and firewall settings
- Check API key permissions and rate limits
- Implement connection pooling and retry logic
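For issue 4 above, a small retry wrapper with exponential backoff smooths over transient connection failures. A sketch using the `memory` instance from the main example; tune attempts and delays for your environment:
```python
import time


def with_retries(operation, attempts: int = 3, base_delay: float = 1.0):
    """Run an operation, retrying with exponential backoff on failure."""
    for attempt in range(attempts):
        try:
            return operation()
        except Exception as exc:
            if attempt == attempts - 1:
                raise  # out of retries; surface the original error
            delay = base_delay * (2 ** attempt)
            print(f"Attempt {attempt + 1} failed ({exc}); retrying in {delay:.0f}s")
            time.sleep(delay)


# Example: retry a search that may hit a transient network error
results = with_retries(lambda: memory.search("What is Qdrant?", limit=3))
```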
### Debugging Tools
```python
# Debug collection status
def debug_collection(client, collection_name):
"""Debug collection issues"""
info = client.get_collection(collection_name)
print(f"Collection: {collection_name}")
print(f"Status: {info.status}")
print(f"Vectors: {info.vectors_count} total, {info.indexed_vectors_count} indexed")
print(f"Segments: {info.segments_count}")
# Check for common issues
if info.vectors_count > 0 and info.indexed_vectors_count == 0:
print("Issue: No vectors are indexed. Check indexing configuration.")
if info.segments_count > info.vectors_count / 1000:
print("Issue: Too many segments. Consider running optimization.")
```
This comprehensive guide provides everything needed to integrate Qdrant with Swarms agents for advanced RAG applications with rich payload filtering using the unified LiteLLM embeddings approach.

@ -64,552 +64,54 @@ export OPENAI_API_KEY="your-openai-api-key"
```python
"""
Agent with SingleStore RAG (Retrieval-Augmented Generation)

This example demonstrates using SingleStore as a unified SQL + vector database
for RAG operations, allowing agents to store and retrieve documents for enhanced context.
"""
import os
import singlestoredb as s2
from typing import List, Dict, Any, Optional, Tuple
import numpy as np
from swarms import Agent
from litellm import embedding
import json
from datetime import datetime
import uuid
import hashlib
class SingleStoreRAGMemory:
"""SingleStore-based memory system combining SQL and vector operations"""
def __init__(self,
table_name: str = "swarms_documents",
embedding_model: str = "text-embedding-3-small",
vector_dimension: int = 1536,
create_indexes: bool = True):
"""
Initialize SingleStore RAG memory system
Args:
table_name: Name of the documents table
embedding_model: LiteLLM embedding model name
vector_dimension: Dimension of vector embeddings
create_indexes: Whether to create optimized indexes
"""
self.table_name = table_name
self.embedding_model = embedding_model
self.vector_dimension = vector_dimension
self.create_indexes = create_indexes
# Initialize connection
self.connection = self._create_connection()
# Create table schema
self._create_table()
# Create indexes for performance
if create_indexes:
self._create_indexes()
def _create_connection(self):
"""Create SingleStore connection"""
connection_params = {
"host": os.getenv("SINGLESTORE_HOST"),
"port": int(os.getenv("SINGLESTORE_PORT", "3306")),
"user": os.getenv("SINGLESTORE_USER"),
"password": os.getenv("SINGLESTORE_PASSWORD"),
"database": os.getenv("SINGLESTORE_DATABASE"),
}
# Optional SSL configuration: disable SSL only when explicitly requested
if os.getenv("SINGLESTORE_SSL_DISABLED", "false").lower() == "true":
    connection_params["ssl_disabled"] = True
# Remove None values
connection_params = {k: v for k, v in connection_params.items() if v is not None}
try:
conn = s2.connect(**connection_params)
print(f"Connected to SingleStore: {connection_params['host']}")
return conn
except Exception as e:
raise ConnectionError(f"Failed to connect to SingleStore: {e}")
def _create_table(self):
"""Create documents table with vector and metadata columns"""
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS {self.table_name} (
id VARCHAR(255) PRIMARY KEY,
title VARCHAR(1000),
content TEXT,
embedding BLOB,
category VARCHAR(100),
author VARCHAR(255),
tags JSON,
metadata JSON,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
token_count INT,
content_hash VARCHAR(64),
-- Full-text search index
FULLTEXT(title, content)
)
"""
with self.connection.cursor() as cursor:
cursor.execute(create_table_sql)
print(f"Created/verified table: {self.table_name}")
def _create_indexes(self):
"""Create optimized indexes for performance"""
indexes = [
f"CREATE INDEX IF NOT EXISTS idx_{self.table_name}_category ON {self.table_name}(category)",
f"CREATE INDEX IF NOT EXISTS idx_{self.table_name}_author ON {self.table_name}(author)",
f"CREATE INDEX IF NOT EXISTS idx_{self.table_name}_created_at ON {self.table_name}(created_at)",
f"CREATE INDEX IF NOT EXISTS idx_{self.table_name}_token_count ON {self.table_name}(token_count)",
]
with self.connection.cursor() as cursor:
for index_sql in indexes:
try:
cursor.execute(index_sql)
except Exception as e:
print(f"Index creation note: {e}")
print(f"Created indexes for table: {self.table_name}")
def _get_embeddings(self, texts: List[str]) -> List[List[float]]:
"""Generate embeddings using LiteLLM"""
response = embedding(
model=self.embedding_model,
input=texts
)
return [item["embedding"] for item in response["data"]]
def _serialize_embedding(self, embedding_vector: List[float]) -> bytes:
"""Serialize embedding vector for storage"""
return np.array(embedding_vector, dtype=np.float32).tobytes()
def _deserialize_embedding(self, embedding_bytes: bytes) -> List[float]:
"""Deserialize embedding vector from storage"""
return np.frombuffer(embedding_bytes, dtype=np.float32).tolist()
def add_documents(self,
documents: List[Dict[str, Any]],
batch_size: int = 100) -> List[str]:
"""Add documents with rich metadata to SingleStore"""
# Generate embeddings for all documents
texts = [doc.get("content", "") for doc in documents]
embeddings = self._get_embeddings(texts)
doc_ids = []
# Prepare batch insert
insert_sql = f"""
INSERT INTO {self.table_name}
(id, title, content, embedding, category, author, tags, metadata, token_count, content_hash)
VALUES (%(id)s, %(title)s, %(content)s, %(embedding)s, %(category)s,
%(author)s, %(tags)s, %(metadata)s, %(token_count)s, %(content_hash)s)
ON DUPLICATE KEY UPDATE
title = VALUES(title),
content = VALUES(content),
embedding = VALUES(embedding),
category = VALUES(category),
author = VALUES(author),
tags = VALUES(tags),
metadata = VALUES(metadata),
updated_at = CURRENT_TIMESTAMP
"""
# Process documents in batches
for i in range(0, len(documents), batch_size):
batch_docs = documents[i:i + batch_size]
batch_embeddings = embeddings[i:i + batch_size]
batch_data = []
for doc, embedding_vec in zip(batch_docs, batch_embeddings):
doc_id = doc.get("id", str(uuid.uuid4()))
doc_ids.append(doc_id)
# Calculate a stable content hash for deduplication; Python's built-in
# hash() is salted per process, so sha256 (64 hex chars) is used instead
content_hash = hashlib.sha256(doc.get("content", "").encode("utf-8")).hexdigest()
# Estimate token count (rough approximation)
token_count = len(doc.get("content", "").split()) * 1.3 # Rough token estimate
batch_data.append({
"id": doc_id,
"title": doc.get("title", ""),
"content": doc.get("content", ""),
"embedding": self._serialize_embedding(embedding_vec),
"category": doc.get("category", ""),
"author": doc.get("author", ""),
"tags": json.dumps(doc.get("tags", [])),
"metadata": json.dumps(doc.get("metadata", {})),
"token_count": int(token_count),
"content_hash": content_hash
})
# Execute batch insert
with self.connection.cursor() as cursor:
cursor.executemany(insert_sql, batch_data)
self.connection.commit()
print(f"Added {len(documents)} documents to SingleStore")
return doc_ids
def vector_search(self,
query: str,
limit: int = 5,
similarity_threshold: float = 0.7,
filters: Dict[str, Any] = None) -> Dict[str, Any]:
"""Perform vector similarity search with optional SQL filters"""
# Generate query embedding
query_embedding = self._get_embeddings([query])[0]
query_embedding_bytes = self._serialize_embedding(query_embedding)
# Build base SQL query
sql_parts = [f"""
SELECT
id, title, content, category, author, tags, metadata, created_at,
DOT_PRODUCT(embedding, %(query_embedding)s) /
(SQRT(DOT_PRODUCT(embedding, embedding)) * SQRT(DOT_PRODUCT(%(query_embedding)s, %(query_embedding)s))) as similarity_score
FROM {self.table_name}
WHERE 1=1
"""]
params = {"query_embedding": query_embedding_bytes}
# Add filters
if filters:
if "category" in filters:
sql_parts.append("AND category = %(category)s")
params["category"] = filters["category"]
if "author" in filters:
sql_parts.append("AND author = %(author)s")
params["author"] = filters["author"]
if "date_range" in filters:
date_range = filters["date_range"]
if "start" in date_range:
sql_parts.append("AND created_at >= %(start_date)s")
params["start_date"] = date_range["start"]
if "end" in date_range:
sql_parts.append("AND created_at <= %(end_date)s")
params["end_date"] = date_range["end"]
if "tags" in filters:
sql_parts.append("AND JSON_CONTAINS(tags, %(tags_filter)s)")
params["tags_filter"] = json.dumps(filters["tags"])
# Add similarity threshold and ordering
sql_parts.extend([
f"HAVING similarity_score >= {similarity_threshold}",
"ORDER BY similarity_score DESC",
f"LIMIT {limit}"
])
final_sql = " ".join(sql_parts)
# Execute query
with self.connection.cursor() as cursor:
cursor.execute(final_sql, params)
results = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
# Format results
formatted_results = {
"documents": [],
"metadata": [],
"scores": [],
"ids": []
}
for row in results:
row_dict = dict(zip(columns, row))
formatted_results["documents"].append(row_dict["content"])
formatted_results["scores"].append(float(row_dict["similarity_score"]))
formatted_results["ids"].append(row_dict["id"])
# Parse JSON fields and combine metadata
tags = json.loads(row_dict["tags"]) if row_dict["tags"] else []
metadata = json.loads(row_dict["metadata"]) if row_dict["metadata"] else {}
combined_metadata = {
"title": row_dict["title"],
"category": row_dict["category"],
"author": row_dict["author"],
"tags": tags,
"created_at": str(row_dict["created_at"]),
**metadata
}
formatted_results["metadata"].append(combined_metadata)
return formatted_results
def hybrid_search(self,
query: str,
limit: int = 5,
vector_weight: float = 0.7,
text_weight: float = 0.3,
filters: Dict[str, Any] = None) -> Dict[str, Any]:
"""Perform hybrid search combining vector similarity and full-text search"""
# Generate query embedding
query_embedding = self._get_embeddings([query])[0]
query_embedding_bytes = self._serialize_embedding(query_embedding)
# Build hybrid search SQL
sql_parts = [f"""
SELECT
id, title, content, category, author, tags, metadata, created_at,
DOT_PRODUCT(embedding, %(query_embedding)s) /
(SQRT(DOT_PRODUCT(embedding, embedding)) * SQRT(DOT_PRODUCT(%(query_embedding)s, %(query_embedding)s))) as vector_score,
MATCH(title, content) AGAINST (%(query_text)s IN NATURAL LANGUAGE MODE) as text_score,
({vector_weight} * DOT_PRODUCT(embedding, %(query_embedding)s) /
(SQRT(DOT_PRODUCT(embedding, embedding)) * SQRT(DOT_PRODUCT(%(query_embedding)s, %(query_embedding)s))) +
{text_weight} * MATCH(title, content) AGAINST (%(query_text)s IN NATURAL LANGUAGE MODE)) as hybrid_score
FROM {self.table_name}
WHERE 1=1
"""]
params = {
"query_embedding": query_embedding_bytes,
"query_text": query
}
# Add filters (same as vector_search)
if filters:
if "category" in filters:
sql_parts.append("AND category = %(category)s")
params["category"] = filters["category"]
# Add other filters as needed...
# Complete query
sql_parts.extend([
"ORDER BY hybrid_score DESC",
f"LIMIT {limit}"
])
final_sql = " ".join(sql_parts)
# Execute and format results (similar to vector_search)
with self.connection.cursor() as cursor:
cursor.execute(final_sql, params)
results = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
# Format results with hybrid scoring
formatted_results = {
"documents": [],
"metadata": [],
"scores": [],
"vector_scores": [],
"text_scores": [],
"ids": []
}
for row in results:
row_dict = dict(zip(columns, row))
formatted_results["documents"].append(row_dict["content"])
formatted_results["scores"].append(float(row_dict["hybrid_score"]))
formatted_results["vector_scores"].append(float(row_dict["vector_score"]))
formatted_results["text_scores"].append(float(row_dict["text_score"]))
formatted_results["ids"].append(row_dict["id"])
# Parse and combine metadata
tags = json.loads(row_dict["tags"]) if row_dict["tags"] else []
metadata = json.loads(row_dict["metadata"]) if row_dict["metadata"] else {}
combined_metadata = {
"title": row_dict["title"],
"category": row_dict["category"],
"author": row_dict["author"],
"tags": tags,
"created_at": str(row_dict["created_at"]),
**metadata
}
formatted_results["metadata"].append(combined_metadata)
return formatted_results
def get_analytics(self) -> Dict[str, Any]:
"""Get analytics about the document collection"""
analytics_sql = f"""
SELECT
COUNT(*) as total_documents,
COUNT(DISTINCT category) as unique_categories,
COUNT(DISTINCT author) as unique_authors,
AVG(token_count) as avg_token_count,
MAX(token_count) as max_token_count,
MIN(created_at) as earliest_document,
MAX(created_at) as latest_document
FROM {self.table_name}
"""
with self.connection.cursor() as cursor:
cursor.execute(analytics_sql)
result = cursor.fetchone()
columns = [desc[0] for desc in cursor.description]
return dict(zip(columns, result)) if result else {}
# Initialize SingleStore RAG memory
memory = SingleStoreRAGMemory(
table_name="swarms_rag_docs",
embedding_model="text-embedding-3-small",
    vector_dimension=1536
)
from swarms_memory import SingleStoreDB
# Initialize SingleStore wrapper for RAG operations
rag_db = SingleStoreDB(
host=os.getenv("SINGLESTORE_HOST", "localhost"),
port=int(os.getenv("SINGLESTORE_PORT", "3306")),
user=os.getenv("SINGLESTORE_USER", "root"),
password=os.getenv("SINGLESTORE_PASSWORD", "your-password"),
database=os.getenv("SINGLESTORE_DATABASE", "knowledge_base"),
table_name="documents",
embedding_model="text-embedding-3-small"
)
# Sample documents with rich structured data
documents = [
    {
        "title": "SingleStore Vector Capabilities",
        "content": "SingleStore combines SQL databases with native vector search, enabling complex queries that join structured data with similarity search results.",
        "category": "database",
        "author": "Technical Team",
        "tags": ["singlestore", "vectors", "sql"],
        "metadata": {"difficulty": "intermediate", "topic": "hybrid_database"}
    },
    {
        "title": "Real-time Analytics with Vectors",
        "content": "SingleStore's HTAP architecture enables real-time analytics on streaming data while performing vector similarity searches simultaneously.",
        "category": "analytics",
        "author": "Data Scientist",
        "tags": ["analytics", "real-time", "htap"],
        "metadata": {"difficulty": "advanced", "topic": "streaming"}
    },
    {
        "title": "SQL + Vector Integration",
        "content": "The power of SingleStore lies in its ability to combine traditional SQL operations with modern vector search in a single query.",
        "category": "integration",
        "author": "System Architect",
        "tags": ["sql", "integration", "unified"],
        "metadata": {"difficulty": "intermediate", "topic": "sql_vectors"}
    }
]
# Add documents to the knowledge base
documents = [
    "SingleStore is a distributed SQL database designed for data-intensive applications.",
    "RAG combines retrieval and generation for more accurate AI responses.",
    "Vector embeddings enable semantic search across documents.",
    "The Swarms framework supports multiple memory backends including SingleStore.",
    "Swarms is the first and most reliable multi-agent production-grade framework.",
    "Kye Gomez is Founder and CEO of Swarms."
]
# Add documents to SingleStore
print("Adding documents to SingleStore...")
doc_ids = memory.add_documents(documents)
print(f"Successfully added {len(doc_ids)} documents")
# Add documents individually
for doc in documents:
rag_db.add(doc)
# Display analytics
analytics = memory.get_analytics()
print(f"Collection analytics: {analytics}")
# Create Swarms agent
agent = Agent(
    agent_name="SingleStore-RAG-Agent",
    agent_description="Advanced agent with SingleStore hybrid SQL + vector RAG capabilities",
    model_name="gpt-4o",
    max_loops=1,
    dynamic_temperature_enabled=True,
)
# Create agent with RAG capabilities
agent = Agent(
    agent_name="RAG-Agent",
    agent_description="Swarms Agent with SingleStore-powered RAG for enhanced knowledge retrieval",
    model_name="gpt-4o",
    max_loops=1,
    dynamic_temperature_enabled=True,
    long_term_memory=rag_db
)
def query_with_singlestore_rag(query_text: str,
search_type: str = "vector",
limit: int = 3,
filters: Dict[str, Any] = None):
"""Query with SingleStore RAG using vector, text, or hybrid search"""
print(f"\nSingleStore {search_type.title()} Search: {query_text}")
# Perform search based on type
if search_type == "hybrid":
results = memory.hybrid_search(
query=query_text,
limit=limit,
vector_weight=0.7,
text_weight=0.3,
filters=filters
)
else: # vector search
results = memory.vector_search(
query=query_text,
limit=limit,
filters=filters
)
if not results["documents"]:
print("No relevant documents found")
return agent.run(query_text)
# Prepare enhanced context with metadata
context_parts = []
for i, (doc, meta, score) in enumerate(zip(
results["documents"],
results["metadata"],
results["scores"]
)):
metadata_info = f"[Title: {meta.get('title', 'N/A')}, Category: {meta.get('category', 'N/A')}, Author: {meta.get('author', 'N/A')}]"
score_info = f"[Score: {score:.3f}"
# Add hybrid score details if available
if "vector_scores" in results:
score_info += f", Vector: {results['vector_scores'][i]:.3f}, Text: {results['text_scores'][i]:.3f}"
score_info += "]"
context_parts.append(f"Document {i+1} {metadata_info} {score_info}:\n{doc}")
context = "\n\n".join(context_parts)
# Enhanced prompt with structured context
enhanced_prompt = f"""
Based on the following retrieved documents from our SingleStore knowledge base:
{context}
Question: {query_text}
Instructions:
1. Use the structured metadata to understand document context and authority
2. Consider the similarity scores when weighing information importance
3. Provide a comprehensive answer based on the retrieved context
4. Mention specific document titles or authors when referencing information
Response:
"""
return agent.run(enhanced_prompt)
# Example usage
if __name__ == "__main__":
# Test vector search
print("=== Vector Search ===")
response = query_with_singlestore_rag(
"How does SingleStore combine SQL with vector search?",
search_type="vector",
limit=3
)
print(f"Answer: {response}\n")
# Test hybrid search
print("=== Hybrid Search ===")
response = query_with_singlestore_rag(
"real-time analytics capabilities",
search_type="hybrid",
limit=2
)
print(f"Hybrid Answer: {response}\n")
# Test filtered search
print("=== Filtered Search ===")
response = query_with_singlestore_rag(
"database integration patterns",
search_type="vector",
filters={"category": "database"},
limit=2
)
print(f"Filtered Answer: {response}\n")
print("SingleStore RAG integration demonstration completed!")
# Query with RAG
response = agent.run("What is SingleStore and how does it relate to RAG? Who is the founder of Swarms?")
print(response)
```
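
For intuition about the scoring in `hybrid_search` above, here is a minimal, self-contained sketch of the math: the SQL computes a cosine similarity from `DOT_PRODUCT` and `SQRT`, then blends it with the `MATCH ... AGAINST` full-text score using `vector_weight` and `text_weight`. The NumPy version below is illustrative only; the vectors and scores are made up, not produced by SingleStore:

```python
import numpy as np

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    # Same formula as the SQL: DOT_PRODUCT(a, b) /
    # (SQRT(DOT_PRODUCT(a, a)) * SQRT(DOT_PRODUCT(b, b)))
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

def hybrid_score(vector_score: float, text_score: float,
                 vector_weight: float = 0.7, text_weight: float = 0.3) -> float:
    # Weighted blend used in the hybrid_search SQL above
    return vector_weight * vector_score + text_weight * text_score

# A document that is semantically close but a weak keyword match
# still ranks well because the vector signal dominates at 0.7/0.3:
print(hybrid_score(vector_score=0.88, text_score=0.20))  # 0.676
```

Raising `text_weight` shifts ranking toward exact keyword matches, which tends to help for queries built around rare or highly technical terms.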
## Use Cases

@ -61,553 +61,64 @@ export HUGGINGFACE_API_KEY="your-hf-key"
```python
"""
Weaviate Cloud RAG Integration with Swarms Agent
Agent with Weaviate Cloud RAG
This example demonstrates how to integrate Weaviate Cloud as a multi-modal vector database
for RAG operations with Swarms agents using both built-in and LiteLLM embeddings.
This example demonstrates using Weaviate Cloud as a vector database for RAG operations,
allowing agents to store and retrieve documents from cloud-hosted Weaviate.
"""
import os
import weaviate
from typing import List, Dict, Any, Optional
from swarms import Agent
from litellm import embedding
import uuid
from datetime import datetime
class WeaviateCloudMemory:
"""Weaviate Cloud-based memory system for RAG operations"""
def __init__(self,
class_name: str = "SwarmsDocument",
embedding_model: str = "text-embedding-3-small",
use_builtin_vectorization: bool = False,
vectorizer: str = "text2vec-openai"):
"""
Initialize Weaviate Cloud memory system
Args:
class_name: Name of the Weaviate class (collection)
embedding_model: LiteLLM embedding model name
use_builtin_vectorization: Use Weaviate's built-in vectorization
vectorizer: Built-in vectorizer module (if used)
"""
self.class_name = class_name
self.embedding_model = embedding_model
self.use_builtin_vectorization = use_builtin_vectorization
self.vectorizer = vectorizer
# Initialize Weaviate client
self.client = self._create_client()
# Create class schema if it doesn't exist
self._create_schema()
def _create_client(self):
"""Create Weaviate Cloud client"""
url = os.getenv("WEAVIATE_URL")
api_key = os.getenv("WEAVIATE_API_KEY")
openai_key = os.getenv("OPENAI_API_KEY")
if not url:
raise ValueError("WEAVIATE_URL must be set")
auth_config = None
if api_key:
auth_config = weaviate.AuthApiKey(api_key=api_key)
# Additional headers for API keys
additional_headers = {}
if openai_key:
additional_headers["X-OpenAI-Api-Key"] = openai_key
client = weaviate.Client(
url=url,
auth_client_secret=auth_config,
additional_headers=additional_headers
)
print(f"Connected to Weaviate Cloud: {url}")
return client
def _create_schema(self):
"""Create Weaviate class schema"""
# Check if class already exists
schema = self.client.schema.get()
existing_classes = [c["class"] for c in schema.get("classes", [])]
if self.class_name in existing_classes:
print(f"Class '{self.class_name}' already exists")
return
# Define class schema
class_obj = {
"class": self.class_name,
"description": "Document class for Swarms RAG operations",
"properties": [
{
"name": "text",
"dataType": ["text"],
"description": "The document content",
"indexFilterable": True,
"indexSearchable": True
},
{
"name": "category",
"dataType": ["string"],
"description": "Document category",
"indexFilterable": True
},
{
"name": "topic",
"dataType": ["string"],
"description": "Document topic",
"indexFilterable": True
},
{
"name": "difficulty",
"dataType": ["string"],
"description": "Content difficulty level",
"indexFilterable": True
},
{
"name": "metadata",
"dataType": ["object"],
"description": "Additional metadata"
},
{
"name": "timestamp",
"dataType": ["date"],
"description": "Creation timestamp"
}
]
}
# Add vectorizer configuration if using built-in
if self.use_builtin_vectorization:
class_obj["vectorizer"] = self.vectorizer
if self.vectorizer == "text2vec-openai":
class_obj["moduleConfig"] = {
"text2vec-openai": {
"model": "ada",
"modelVersion": "002",
"type": "text"
}
}
else:
class_obj["vectorizer"] = "none"
# Create the class
self.client.schema.create_class(class_obj)
print(f"Created class '{self.class_name}'")
def _get_embeddings(self, texts: List[str]) -> List[List[float]]:
"""Generate embeddings using LiteLLM (when not using built-in vectorization)"""
if self.use_builtin_vectorization:
return None # Weaviate will handle vectorization
response = embedding(
model=self.embedding_model,
input=texts
)
return [item["embedding"] for item in response["data"]]
def add_documents(self,
documents: List[str],
metadata: List[Dict] = None,
batch_size: int = 100) -> List[str]:
"""Add multiple documents to Weaviate"""
if metadata is None:
metadata = [{}] * len(documents)
# Generate embeddings if not using built-in vectorization
embeddings = self._get_embeddings(documents) if not self.use_builtin_vectorization else None
# Prepare objects for batch import
objects = []
doc_ids = []
for i, (doc, meta) in enumerate(zip(documents, metadata)):
doc_id = str(uuid.uuid4())
doc_ids.append(doc_id)
obj = {
"class": self.class_name,
"id": doc_id,
"properties": {
"text": doc,
"category": meta.get("category", ""),
"topic": meta.get("topic", ""),
"difficulty": meta.get("difficulty", ""),
"metadata": meta,
"timestamp": datetime.now().isoformat()
}
}
# Add vector if using external embeddings
if embeddings and i < len(embeddings):
obj["vector"] = embeddings[i]
objects.append(obj)
# Batch import
with self.client.batch as batch:
batch.batch_size = batch_size
for obj in objects:
batch.add_data_object(**obj)
print(f"Added {len(documents)} documents to Weaviate Cloud")
return doc_ids
def add_document(self, document: str, metadata: Dict = None) -> str:
"""Add a single document to Weaviate"""
result = self.add_documents([document], [metadata or {}])
return result[0] if result else None
def search(self,
query: str,
limit: int = 3,
where_filter: Dict = None,
certainty: float = None,
distance: float = None) -> Dict[str, Any]:
"""Search for similar documents in Weaviate Cloud"""
# Build GraphQL query
query_builder = (
self.client.query
.get(self.class_name, ["text", "category", "topic", "difficulty", "metadata"])
)
# Add vector search
if self.use_builtin_vectorization:
query_builder = query_builder.with_near_text({"concepts": [query]})
else:
# Generate query embedding
query_embedding = self._get_embeddings([query])[0]
query_builder = query_builder.with_near_vector({"vector": query_embedding})
# Add filters
if where_filter:
query_builder = query_builder.with_where(where_filter)
# Add certainty/distance threshold
if certainty is not None:
query_builder = query_builder.with_certainty(certainty)
elif distance is not None:
query_builder = query_builder.with_distance(distance)
# Set limit
query_builder = query_builder.with_limit(limit)
# Add additional fields for scoring
query_builder = query_builder.with_additional(["certainty", "distance", "id"])
# Execute query
result = query_builder.do()
# Format results
formatted_results = {
"documents": [],
"metadata": [],
"scores": [],
"ids": []
}
if "data" in result and "Get" in result["data"] and self.class_name in result["data"]["Get"]:
for item in result["data"]["Get"][self.class_name]:
formatted_results["documents"].append(item.get("text", ""))
formatted_results["metadata"].append(item.get("metadata", {}))
formatted_results["ids"].append(item["_additional"]["id"])
# Use certainty (higher is better) or distance (lower is better)
score = item["_additional"].get("certainty",
1.0 - item["_additional"].get("distance", 1.0))
formatted_results["scores"].append(float(score))
return formatted_results
def hybrid_search(self,
query: str,
limit: int = 3,
alpha: float = 0.5,
where_filter: Dict = None) -> Dict[str, Any]:
"""Perform hybrid search (vector + keyword) in Weaviate"""
query_builder = (
self.client.query
.get(self.class_name, ["text", "category", "topic", "difficulty", "metadata"])
.with_hybrid(query=query, alpha=alpha) # alpha: 0=keyword, 1=vector
)
if where_filter:
query_builder = query_builder.with_where(where_filter)
query_builder = (
query_builder
.with_limit(limit)
.with_additional(["score", "id"])
)
result = query_builder.do()
# Format results (similar to search method)
formatted_results = {
"documents": [],
"metadata": [],
"scores": [],
"ids": []
}
if "data" in result and "Get" in result["data"] and self.class_name in result["data"]["Get"]:
for item in result["data"]["Get"][self.class_name]:
formatted_results["documents"].append(item.get("text", ""))
formatted_results["metadata"].append(item.get("metadata", {}))
formatted_results["ids"].append(item["_additional"]["id"])
formatted_results["scores"].append(float(item["_additional"].get("score", 0.0)))
return formatted_results
def delete_documents(self, where_filter: Dict) -> bool:
"""Delete documents matching filter"""
result = self.client.batch.delete_objects(
class_name=self.class_name,
where=where_filter
)
return "results" in result and "successful" in result["results"]
def get_class_info(self) -> Dict[str, Any]:
"""Get class statistics and information"""
# Get class schema
schema = self.client.schema.get(self.class_name)
# Get object count (approximate)
result = (
self.client.query
.aggregate(self.class_name)
.with_meta_count()
.do()
)
count = 0
if "data" in result and "Aggregate" in result["data"]:
agg_data = result["data"]["Aggregate"][self.class_name][0]
count = agg_data.get("meta", {}).get("count", 0)
return {
"class_name": self.class_name,
"object_count": count,
"properties": len(schema.get("properties", [])),
"vectorizer": schema.get("vectorizer"),
"description": schema.get("description", "")
}
# Initialize Weaviate Cloud memory
# Option 1: Using LiteLLM embeddings
memory = WeaviateCloudMemory(
    class_name="SwarmsKnowledgeBase",
    embedding_model="text-embedding-3-small",
    use_builtin_vectorization=False
)
from swarms_memory import WeaviateDB
# Get Weaviate Cloud credentials
weaviate_url = os.getenv("WEAVIATE_URL")
weaviate_key = os.getenv("WEAVIATE_API_KEY")
if not weaviate_url or not weaviate_key:
    print("Missing Weaviate Cloud credentials!")
    print("Please set WEAVIATE_URL and WEAVIATE_API_KEY environment variables")
    exit(1)
# Create WeaviateDB wrapper for cloud RAG operations
rag_db = WeaviateDB(
    embedding_model="text-embedding-3-small",
    collection_name="swarms_cloud_knowledge",
    cluster_url=f"https://{weaviate_url}",
    auth_client_secret=weaviate_key,
    distance_metric="cosine",
)
# Option 2: Using Weaviate's built-in vectorization
# memory = WeaviateCloudMemory(
# class_name="SwarmsKnowledgeBase",
# use_builtin_vectorization=True,
# vectorizer="text2vec-openai"
# )
# Sample documents for the knowledge base
documents = [
    "Weaviate Cloud is a fully managed vector database with GraphQL API and built-in AI integrations.",
    "RAG systems benefit from Weaviate's multi-modal search and relationship modeling capabilities.",
    "Vector embeddings in Weaviate can be generated using built-in modules or external models.",
    "The Swarms framework leverages Weaviate's GraphQL interface for flexible data queries.",
    "LiteLLM integration provides unified access to embedding models across different providers.",
    "Hybrid search in Weaviate combines vector similarity with traditional keyword search.",
    "GraphQL enables complex queries with filtering, aggregation, and relationship traversal.",
    "Multi-modal capabilities allow searching across text, images, and other data types simultaneously.",
]
# Add documents to the cloud knowledge base
documents = [
    "Weaviate Cloud Service provides managed vector database hosting with enterprise features.",
    "Cloud-hosted vector databases offer scalability, reliability, and managed infrastructure.",
    "RAG combines retrieval and generation for more accurate AI responses.",
    "The Swarms framework supports multiple cloud memory backends including Weaviate Cloud.",
    "Swarms is the first and most reliable multi-agent production-grade framework.",
    "Kye Gomez is Founder and CEO of Swarms Corporation."
]
# Rich metadata for advanced filtering and organization
metadatas = [
{"category": "database", "topic": "weaviate", "difficulty": "intermediate", "type": "overview"},
{"category": "ai", "topic": "rag", "difficulty": "intermediate", "type": "concept"},
{"category": "ml", "topic": "embeddings", "difficulty": "beginner", "type": "concept"},
{"category": "framework", "topic": "swarms", "difficulty": "beginner", "type": "integration"},
{"category": "library", "topic": "litellm", "difficulty": "beginner", "type": "tool"},
{"category": "search", "topic": "hybrid", "difficulty": "advanced", "type": "feature"},
{"category": "api", "topic": "graphql", "difficulty": "advanced", "type": "interface"},
{"category": "multimodal", "topic": "search", "difficulty": "advanced", "type": "capability"},
]
# Add documents to Weaviate Cloud
print("Adding documents to Weaviate Cloud...")
doc_ids = memory.add_documents(documents, metadatas)
print(f"Successfully added {len(doc_ids)} documents")
# Display class information
info = memory.get_class_info()
print(f"Class info: {info['object_count']} objects, {info['properties']} properties")
for doc in documents:
rag_db.add(doc)
# Create Swarms agent with Weaviate Cloud RAG
agent = Agent(
    agent_name="Weaviate-RAG-Agent",
    agent_description="Intelligent agent with Weaviate Cloud multi-modal RAG capabilities",
    model_name="gpt-4o",
    max_loops=1,
    dynamic_temperature_enabled=True,
)
# Create agent with cloud RAG capabilities
agent = Agent(
    agent_name="Weaviate-Cloud-RAG-Agent",
    agent_description="Swarms Agent with Weaviate Cloud-powered RAG for scalable knowledge retrieval",
    model_name="gpt-4o",
    max_loops=1,
    dynamic_temperature_enabled=True,
    long_term_memory=rag_db
)
def query_with_weaviate_rag(query_text: str,
limit: int = 3,
search_type: str = "vector",
where_filter: Dict = None,
alpha: float = 0.5):
"""Query with RAG using Weaviate's advanced search capabilities"""
print(f"\nQuerying ({search_type}): {query_text}")
if where_filter:
print(f"Filter: {where_filter}")
# Choose search method
if search_type == "hybrid":
results = memory.hybrid_search(
query=query_text,
limit=limit,
alpha=alpha,
where_filter=where_filter
)
else: # vector search
results = memory.search(
query=query_text,
limit=limit,
where_filter=where_filter
)
if not results["documents"]:
print("No relevant documents found")
return agent.run(query_text)
# Prepare context from retrieved documents
context = "\n".join([
f"Document {i+1}: {doc}"
for i, doc in enumerate(results["documents"])
])
# Display retrieved documents with metadata
print("Retrieved documents:")
for i, (doc, score, meta) in enumerate(zip(
results["documents"], results["scores"], results["metadata"]
)):
print(f" {i+1}. (Score: {score:.4f}) Category: {meta.get('category', 'N/A')}")
print(f" {doc[:100]}...")
# Enhanced prompt with context
enhanced_prompt = f"""
Based on the following retrieved context from our knowledge base, please answer the question:
Context:
{context}
Question: {query_text}
Please provide a comprehensive answer based primarily on the context provided.
"""
# Run agent with enhanced prompt
response = agent.run(enhanced_prompt)
return response
# Example usage and testing
if __name__ == "__main__":
# Test vector search
print("=== Vector Search Queries ===")
queries = [
"What is Weaviate Cloud and its key features?",
"How does RAG work with vector databases?",
"What are hybrid search capabilities?",
"How does GraphQL enhance database queries?",
]
for query in queries:
response = query_with_weaviate_rag(query, limit=3, search_type="vector")
print(f"Answer: {response}\n")
print("-" * 80)
# Test hybrid search
print("\n=== Hybrid Search Queries ===")
hybrid_queries = [
"advanced search features",
"GraphQL API capabilities",
"multi-modal AI applications",
]
for query in hybrid_queries:
response = query_with_weaviate_rag(
query,
limit=3,
search_type="hybrid",
alpha=0.7 # More weight on vector search
)
print(f"Hybrid Answer: {response}\n")
print("-" * 80)
# Test filtered search
print("\n=== Filtered Search Queries ===")
# Filter for advanced topics
advanced_filter = {
"path": ["difficulty"],
"operator": "Equal",
"valueText": "advanced"
}
response = query_with_weaviate_rag(
"What are advanced capabilities?",
limit=3,
where_filter=advanced_filter
)
print(f"Advanced topics: {response}\n")
# Filter for database and API categories
category_filter = {
"operator": "Or",
"operands": [
{
"path": ["category"],
"operator": "Equal",
"valueText": "database"
},
{
"path": ["category"],
"operator": "Equal",
"valueText": "api"
}
]
}
response = query_with_weaviate_rag(
"Tell me about database and API features",
limit=3,
where_filter=category_filter
)
print(f"Database & API features: {response}\n")
# Add new document dynamically
print("=== Dynamic Document Addition ===")
new_doc = "Weaviate's schema flexibility allows automatic type inference and dynamic property addition."
new_meta = {
"category": "schema",
"topic": "flexibility",
"difficulty": "intermediate",
"type": "feature"
}
memory.add_document(new_doc, new_meta)
# Query about schema flexibility
response = query_with_weaviate_rag("How does schema flexibility work?")
print(f"Schema flexibility: {response}\n")
# Display final class information
final_info = memory.get_class_info()
print(f"Final class info: {final_info}")
print("Testing agent with cloud RAG...")
# Query with cloud RAG
response = agent.run("What is Weaviate Cloud and how does it relate to RAG? Who founded Swarms?")
print(response)
```
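
A note on the `alpha` parameter used by `with_hybrid` above: `alpha=0` ranks purely by keyword (BM25) relevance and `alpha=1` purely by vector similarity. Weaviate normalizes and fuses the two ranked result sets internally, so the plain linear blend below is a simplification, but it shows the direction of the trade-off:

```python
def blended_score(vector_score: float, keyword_score: float, alpha: float = 0.5) -> float:
    # alpha weights the vector signal; (1 - alpha) weights the keyword signal
    return alpha * vector_score + (1 - alpha) * keyword_score

# A document with strong semantic similarity but weak keyword overlap:
for alpha in (0.0, 0.5, 1.0):
    print(alpha, blended_score(vector_score=0.9, keyword_score=0.3, alpha=alpha))
# 0.0 -> 0.3 (keyword only), 0.5 -> 0.6, 1.0 -> 0.9 (vector only)
```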
## Use Cases

@ -90,302 +90,50 @@ export HUGGINGFACE_API_KEY="your-hf-key"
```python
"""
Weaviate Local RAG Integration with Swarms Agent
Agent with Weaviate Local RAG
This example demonstrates how to integrate self-hosted Weaviate as a customizable
vector database for RAG operations with full local control.
This example demonstrates using local Weaviate as a vector database for RAG operations,
allowing agents to store and retrieve documents for enhanced context.
"""
import os
import weaviate
from typing import List, Dict, Any, Optional
from swarms import Agent
from litellm import embedding
import uuid
from datetime import datetime
class WeaviateLocalMemory:
"""Weaviate Local-based memory system for RAG operations"""
def __init__(self,
url: str = "http://localhost:8080",
class_name: str = "LocalDocument",
embedding_model: str = "text-embedding-3-small",
use_builtin_vectorization: bool = False,
auth_config: Optional[Dict] = None):
"""
Initialize Weaviate Local memory system
Args:
url: Weaviate server URL
class_name: Name of the Weaviate class
embedding_model: LiteLLM embedding model name
use_builtin_vectorization: Use Weaviate's built-in vectorization
auth_config: Authentication configuration
"""
self.url = url
self.class_name = class_name
self.embedding_model = embedding_model
self.use_builtin_vectorization = use_builtin_vectorization
# Initialize client
self.client = self._create_client(auth_config)
# Create schema
self._create_schema()
def _create_client(self, auth_config: Optional[Dict] = None):
"""Create Weaviate local client"""
client_config = {"url": self.url}
if auth_config:
if auth_config.get("type") == "api_key":
client_config["auth_client_secret"] = weaviate.AuthApiKey(
api_key=auth_config["api_key"]
)
elif auth_config.get("type") == "username_password":
client_config["auth_client_secret"] = weaviate.AuthClientPassword(
username=auth_config["username"],
password=auth_config["password"]
)
# Add API keys for modules
additional_headers = {}
if "OPENAI_API_KEY" in os.environ:
additional_headers["X-OpenAI-Api-Key"] = os.environ["OPENAI_API_KEY"]
if additional_headers:
client_config["additional_headers"] = additional_headers
client = weaviate.Client(**client_config)
# Test connection
try:
client.schema.get()
print(f"Connected to Weaviate Local: {self.url}")
except Exception as e:
raise ConnectionError(f"Failed to connect to Weaviate: {e}")
return client
def _create_schema(self):
"""Create Weaviate class schema"""
schema = self.client.schema.get()
existing_classes = [c["class"] for c in schema.get("classes", [])]
if self.class_name in existing_classes:
print(f"Class '{self.class_name}' already exists")
return
# Define comprehensive schema
class_obj = {
"class": self.class_name,
"description": "Local document class for Swarms RAG operations",
"vectorizer": "none" if not self.use_builtin_vectorization else "text2vec-openai",
"properties": [
{
"name": "text",
"dataType": ["text"],
"description": "Document content",
"indexFilterable": True,
"indexSearchable": True,
"tokenization": "word"
},
{
"name": "title",
"dataType": ["string"],
"description": "Document title",
"indexFilterable": True
},
{
"name": "category",
"dataType": ["string"],
"description": "Document category",
"indexFilterable": True
},
{
"name": "tags",
"dataType": ["string[]"],
"description": "Document tags",
"indexFilterable": True
},
{
"name": "author",
"dataType": ["string"],
"description": "Document author",
"indexFilterable": True
},
{
"name": "created_at",
"dataType": ["date"],
"description": "Creation date"
},
{
"name": "metadata",
"dataType": ["object"],
"description": "Additional metadata"
}
]
}
self.client.schema.create_class(class_obj)
print(f"Created local class '{self.class_name}'")
def add_documents(self, documents: List[Dict]) -> List[str]:
"""Add documents with rich metadata to Weaviate Local"""
doc_ids = []
with self.client.batch as batch:
batch.batch_size = 100
for doc_data in documents:
doc_id = str(uuid.uuid4())
doc_ids.append(doc_id)
# Prepare properties
properties = {
"text": doc_data.get("text", ""),
"title": doc_data.get("title", ""),
"category": doc_data.get("category", ""),
"tags": doc_data.get("tags", []),
"author": doc_data.get("author", ""),
"created_at": doc_data.get("created_at", datetime.now().isoformat()),
"metadata": doc_data.get("metadata", {})
}
batch_obj = {
"class": self.class_name,
"id": doc_id,
"properties": properties
}
# Add vector if using external embeddings
if not self.use_builtin_vectorization:
text_content = doc_data.get("text", "")
if text_content:
embedding_vec = self._get_embeddings([text_content])[0]
batch_obj["vector"] = embedding_vec
batch.add_data_object(**batch_obj)
print(f"Added {len(documents)} documents to local Weaviate")
return doc_ids
def _get_embeddings(self, texts: List[str]) -> List[List[float]]:
"""Generate embeddings using LiteLLM"""
response = embedding(model=self.embedding_model, input=texts)
return [item["embedding"] for item in response["data"]]
def search(self, query: str, limit: int = 3, **kwargs) -> Dict[str, Any]:
"""Search documents with flexible filtering"""
# Build query
query_builder = (
self.client.query
.get(self.class_name, ["text", "title", "category", "tags", "author", "metadata"])
)
# Add vector search
if self.use_builtin_vectorization:
query_builder = query_builder.with_near_text({"concepts": [query]})
else:
query_embedding = self._get_embeddings([query])[0]
query_builder = query_builder.with_near_vector({"vector": query_embedding})
# Add optional filters
if "where_filter" in kwargs:
query_builder = query_builder.with_where(kwargs["where_filter"])
# Execute query
result = (
query_builder
.with_limit(limit)
.with_additional(["certainty", "distance", "id"])
.do()
)
# Format results
formatted_results = {"documents": [], "metadata": [], "scores": [], "ids": []}
if "data" in result and "Get" in result["data"]:
for item in result["data"]["Get"].get(self.class_name, []):
formatted_results["documents"].append(item.get("text", ""))
# Combine all metadata
metadata = {
"title": item.get("title", ""),
"category": item.get("category", ""),
"tags": item.get("tags", []),
"author": item.get("author", ""),
**item.get("metadata", {})
}
formatted_results["metadata"].append(metadata)
formatted_results["ids"].append(item["_additional"]["id"])
score = item["_additional"].get("certainty", 0.0)
formatted_results["scores"].append(float(score))
return formatted_results
# Sample usage
memory = WeaviateLocalMemory(
url="http://localhost:8080",
class_name="SwarmsLocalKB",
    embedding_model="text-embedding-3-small"
)
from swarms_memory import WeaviateDB
# Create WeaviateDB wrapper for RAG operations
rag_db = WeaviateDB(
embedding_model="text-embedding-3-small",
collection_name="swarms_knowledge",
cluster_url="http://localhost:8080", # Local Weaviate instance
distance_metric="cosine",
)
# Add rich documents
documents = [
    {
        "text": "Weaviate Local provides full control over vector database deployment and data sovereignty.",
        "title": "Local Deployment Benefits",
        "category": "deployment",
        "tags": ["weaviate", "local", "control"],
        "author": "System",
        "metadata": {"difficulty": "intermediate", "topic": "infrastructure"}
    },
    {
        "text": "Self-hosted Weaviate enables custom configurations and air-gapped deployments for sensitive data.",
        "title": "Security and Compliance",
        "category": "security",
        "tags": ["security", "compliance", "air-gap"],
        "author": "Admin",
        "metadata": {"difficulty": "advanced", "topic": "security"}
    }
]
# Add documents to the knowledge base
documents = [
    "Weaviate is an open-source vector database optimized for similarity search and AI applications.",
    "RAG combines retrieval and generation for more accurate AI responses.",
    "Vector embeddings enable semantic search across documents.",
    "The Swarms framework supports multiple memory backends including Weaviate.",
    "Swarms is the first and most reliable multi-agent production-grade framework.",
    "Kye Gomez is Founder and CEO of Swarms Corporation."
]
# Create agent and add documents
memory.add_documents(documents)
# Add documents individually
for doc in documents:
rag_db.add(doc)
agent = Agent(
    agent_name="Local-Weaviate-Agent",
    agent_description="Agent with self-hosted Weaviate for private RAG operations",
    model_name="gpt-4o",
    max_loops=1,
    dynamic_temperature_enabled=True,
)
# Create agent with RAG capabilities
agent = Agent(
    agent_name="Weaviate-RAG-Agent",
    agent_description="Swarms Agent with Weaviate-powered RAG for enhanced knowledge retrieval",
    model_name="gpt-4o",
    max_loops=1,
    dynamic_temperature_enabled=True,
    long_term_memory=rag_db
)
def query_local_rag(query: str, limit: int = 3) -> str:
"""Query local Weaviate with RAG"""
results = memory.search(query, limit=limit)
if not results["documents"]:
return agent.run(query)
context = "\n".join(results["documents"])
enhanced_prompt = f"""
Based on this local knowledge base context:
{context}
Question: {query}
Provide a comprehensive answer using the context.
"""
return agent.run(enhanced_prompt)
# Example usage
response = query_local_rag("What are the benefits of local Weaviate deployment?")
# Query with RAG
response = agent.run("What is Weaviate and how does it relate to RAG? Who is the founder of Swarms?")
print(response)
```
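
`rag_db.add` in the example above takes one string per call, so longer source material is best split before ingestion. Below is a small, hypothetical chunking helper; the 150-word window and 30-word overlap are illustrative defaults, not part of the swarms_memory API:

```python
def chunk_text(text: str, max_words: int = 150, overlap: int = 30) -> list:
    """Split a long document into overlapping word-window chunks."""
    words = text.split()
    chunks, start = [], 0
    step = max_words - overlap  # each advance keeps `overlap` words of shared context
    while start < len(words):
        chunks.append(" ".join(words[start:start + max_words]))
        start += step
    return chunks

# for chunk in chunk_text(long_report):  # long_report: any long string you load
#     rag_db.add(chunk)
```

The overlap preserves context across chunk boundaries, so a fact that straddles a split still lands intact in at least one chunk.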

@ -61,477 +61,105 @@ export OPENAI_API_KEY="your-openai-api-key"
```python
"""
Zyphra RAG Integration with Swarms Agent
Agent with Zyphra RAG (Retrieval-Augmented Generation)
This example demonstrates how to integrate Zyphra RAG as a specialized
vector database optimized for RAG workflows with intelligent retrieval.
This example demonstrates using Zyphra RAG system for RAG operations,
allowing agents to store and retrieve documents for enhanced context.
Note: Zyphra RAG is a complete RAG system with graph-based retrieval.
"""
import os
from typing import List, Dict, Any, Optional, Tuple
import torch
from swarms import Agent
from litellm import embedding
import tiktoken
from datetime import datetime
import uuid
# Conceptual Zyphra RAG client implementation
class ZyphraRAGClient:
"""Conceptual client for Zyphra RAG service"""
def __init__(self, api_key: str, base_url: str):
self.api_key = api_key
self.base_url = base_url
# This would be implemented with actual HTTP client
def create_collection(self, name: str, config: Dict) -> Dict:
# Conceptual API call
return {"collection_id": f"col_{uuid.uuid4()}", "status": "created"}
def add_documents(self, collection_id: str, documents: List[Dict]) -> Dict:
# Conceptual document ingestion with intelligent chunking
return {"document_ids": [f"doc_{i}" for i in range(len(documents))]}
def search(self, collection_id: str, query: str, params: Dict) -> Dict:
# Conceptual RAG-optimized search
return {
"results": [
{
"text": "Sample retrieved content...",
"score": 0.95,
"metadata": {"chunk_id": "chunk_1", "relevance": "high"},
"context_signals": {"semantic": 0.9, "lexical": 0.8, "contextual": 0.95}
}
]
}
class ZyphraRAGMemory:
"""Zyphra RAG-based memory system optimized for RAG operations"""
def __init__(self,
collection_name: str = "swarms_rag_collection",
embedding_model: str = "text-embedding-3-small",
chunk_strategy: str = "intelligent",
max_chunk_size: int = 512,
chunk_overlap: int = 50,
retrieval_strategy: str = "hybrid_enhanced"):
"""
Initialize Zyphra RAG memory system
Args:
collection_name: Name of the RAG collection
embedding_model: LiteLLM embedding model name
chunk_strategy: Document chunking strategy
max_chunk_size: Maximum tokens per chunk
chunk_overlap: Overlap tokens between chunks
retrieval_strategy: Retrieval optimization strategy
"""
self.collection_name = collection_name
self.embedding_model = embedding_model
self.chunk_strategy = chunk_strategy
self.max_chunk_size = max_chunk_size
self.chunk_overlap = chunk_overlap
self.retrieval_strategy = retrieval_strategy
# Initialize tokenizer for chunk management
self.tokenizer = tiktoken.get_encoding("cl100k_base")
# Initialize Zyphra RAG client (conceptual)
self.client = self._create_client()
# Create collection with RAG-optimized configuration
self.collection_id = self._create_collection()
def _create_client(self):
"""Create Zyphra RAG client"""
api_key = os.getenv("ZYPHRA_RAG_API_KEY")
base_url = os.getenv("ZYPHRA_RAG_URL", "https://api.zyphra.com/rag/v1")
if not api_key:
raise ValueError("ZYPHRA_RAG_API_KEY must be set")
print(f"Connecting to Zyphra RAG: {base_url}")
return ZyphraRAGClient(api_key, base_url)
def _create_collection(self):
"""Create RAG-optimized collection"""
config = {
"embedding_model": self.embedding_model,
"chunk_strategy": self.chunk_strategy,
"max_chunk_size": self.max_chunk_size,
"chunk_overlap": self.chunk_overlap,
"retrieval_strategy": self.retrieval_strategy,
"optimization_target": "rag_quality",
"context_window_management": True,
"query_enhancement": True,
"relevance_scoring": "llm_optimized"
}
result = self.client.create_collection(self.collection_name, config)
print(f"Created RAG collection: {result['collection_id']}")
return result["collection_id"]
def _intelligent_chunking(self, text: str, metadata: Dict) -> List[Dict]:
"""Implement intelligent chunking with context preservation"""
# Tokenize text
tokens = self.tokenizer.encode(text)
# Simple chunking implementation (in practice, this would be more sophisticated)
chunks = []
start = 0
while start < len(tokens):
end = min(start + self.max_chunk_size, len(tokens))
chunk_tokens = tokens[start:end]
chunk_text = self.tokenizer.decode(chunk_tokens)
# Preserve context by including overlap
if start > 0 and self.chunk_overlap > 0:
overlap_start = max(0, start - self.chunk_overlap)
overlap_tokens = tokens[overlap_start:start]
overlap_text = self.tokenizer.decode(overlap_tokens)
chunk_text = overlap_text + " " + chunk_text
chunks.append({
"text": chunk_text,
"tokens": len(chunk_tokens),
"chunk_index": len(chunks),
"start_token": start,
"end_token": end,
"metadata": {**metadata, "chunk_strategy": self.chunk_strategy}
})
start = end - self.chunk_overlap if end < len(tokens) else end
return chunks
def _get_embeddings(self, texts: List[str]) -> List[List[float]]:
"""Generate embeddings using LiteLLM"""
response = embedding(
model=self.embedding_model,
input=texts
)
return [item["embedding"] for item in response["data"]]
def add_documents(self,
documents: List[str],
metadata: List[Dict] = None,
enable_chunking: bool = True) -> List[str]:
"""Add documents with intelligent chunking for RAG optimization"""
if metadata is None:
metadata = [{}] * len(documents)
processed_docs = []
for doc, meta in zip(documents, metadata):
if enable_chunking and self.chunk_strategy != "none":
# Apply intelligent chunking
chunks = self._intelligent_chunking(doc, meta)
processed_docs.extend(chunks)
else:
# Add as single document
processed_docs.append({
"text": doc,
"metadata": meta,
"tokens": len(self.tokenizer.encode(doc))
})
# Generate embeddings for all processed documents
texts = [doc["text"] for doc in processed_docs]
embeddings = self._get_embeddings(texts)
# Prepare documents for Zyphra RAG ingestion
rag_documents = []
for i, (doc_data, embedding_vec) in enumerate(zip(processed_docs, embeddings)):
rag_doc = {
"id": f"doc_{uuid.uuid4()}",
"text": doc_data["text"],
"embedding": embedding_vec,
"metadata": {
**doc_data["metadata"],
"tokens": doc_data["tokens"],
"processed_at": datetime.now().isoformat(),
"chunk_index": doc_data.get("chunk_index", 0)
}
}
rag_documents.append(rag_doc)
# Ingest into Zyphra RAG
result = self.client.add_documents(self.collection_id, rag_documents)
print(f"Added {len(documents)} documents ({len(processed_docs)} chunks) to Zyphra RAG")
return result["document_ids"]
def search(self,
query: str,
limit: int = 3,
relevance_threshold: float = 0.7,
context_optimization: bool = True,
query_enhancement: bool = True) -> Dict[str, Any]:
"""Perform RAG-optimized search with enhanced retrieval strategies"""
search_params = {
"limit": limit,
"relevance_threshold": relevance_threshold,
"context_optimization": context_optimization,
"query_enhancement": query_enhancement,
"retrieval_strategy": self.retrieval_strategy,
"embedding_model": self.embedding_model,
"return_context_signals": True,
"optimize_for_llm": True
}
# Perform enhanced search
search_result = self.client.search(self.collection_id, query, search_params)
# Format results with RAG-specific enhancements
formatted_results = {
"documents": [],
"metadata": [],
"scores": [],
"context_signals": [],
"retrieval_quality": {},
"token_counts": []
}
total_tokens = 0
for result in search_result.get("results", []):
formatted_results["documents"].append(result["text"])
formatted_results["metadata"].append(result.get("metadata", {}))
formatted_results["scores"].append(float(result["score"]))
formatted_results["context_signals"].append(result.get("context_signals", {}))
# Track token usage for context window management
token_count = len(self.tokenizer.encode(result["text"]))
formatted_results["token_counts"].append(token_count)
total_tokens += token_count
# Add retrieval quality metrics
formatted_results["retrieval_quality"] = {
"total_tokens": total_tokens,
"avg_relevance": sum(formatted_results["scores"]) / len(formatted_results["scores"]) if formatted_results["scores"] else 0,
"context_diversity": self._calculate_context_diversity(formatted_results["documents"]),
"query_enhancement_applied": query_enhancement
}
return formatted_results
def _calculate_context_diversity(self, documents: List[str]) -> float:
"""Calculate diversity score for retrieved context"""
# Simple diversity calculation (in practice, this would be more sophisticated)
if len(documents) <= 1:
return 1.0
# Calculate semantic diversity based on document similarity
embeddings = self._get_embeddings(documents)
similarities = []
for i in range(len(embeddings)):
for j in range(i + 1, len(embeddings)):
# Cosine similarity
dot_product = sum(a * b for a, b in zip(embeddings[i], embeddings[j]))
norm_a = sum(a * a for a in embeddings[i]) ** 0.5
norm_b = sum(b * b for b in embeddings[j]) ** 0.5
similarity = dot_product / (norm_a * norm_b)
similarities.append(similarity)
# Diversity = 1 - average similarity
avg_similarity = sum(similarities) / len(similarities) if similarities else 0
return 1.0 - avg_similarity
def optimize_context_window(self,
retrieved_results: Dict,
max_tokens: int = 4000,
strategy: str = "relevance_first") -> Dict[str, Any]:
"""Optimize retrieved context for specific token budget"""
documents = retrieved_results["documents"]
scores = retrieved_results["scores"]
token_counts = retrieved_results["token_counts"]
if sum(token_counts) <= max_tokens:
return retrieved_results
# Apply context window optimization strategy
if strategy == "relevance_first":
# Sort by relevance and include highest scoring documents
sorted_indices = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
elif strategy == "token_efficient":
# Optimize for best relevance per token
efficiency_scores = [score / max(token_count, 1) for score, token_count in zip(scores, token_counts)]
sorted_indices = sorted(range(len(efficiency_scores)), key=lambda i: efficiency_scores[i], reverse=True)
else:
sorted_indices = list(range(len(documents)))
# Select documents within token budget
selected_docs = []
selected_metadata = []
selected_scores = []
selected_tokens = []
current_tokens = 0
for idx in sorted_indices:
if current_tokens + token_counts[idx] <= max_tokens:
selected_docs.append(documents[idx])
selected_metadata.append(retrieved_results["metadata"][idx])
selected_scores.append(scores[idx])
selected_tokens.append(token_counts[idx])
current_tokens += token_counts[idx]
return {
"documents": selected_docs,
"metadata": selected_metadata,
"scores": selected_scores,
"token_counts": selected_tokens,
"optimization_applied": True,
"final_token_count": current_tokens,
"token_efficiency": current_tokens / max_tokens
}
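# Worked example of optimize_context_window's "token_efficient" strategy:
# with scores [0.9, 0.8] and token_counts [1200, 400], the efficiency
# ratios are [0.00075, 0.002], so the short 400-token document is packed
# into the budget first despite its slightly lower relevance score.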
# Initialize Zyphra RAG memory
memory = ZyphraRAGMemory(
collection_name="swarms_zyphra_rag",
embedding_model="text-embedding-3-small",
chunk_strategy="intelligent",
max_chunk_size=512,
retrieval_strategy="hybrid_enhanced"
)
# Sample documents optimized for RAG
documents = [
"Zyphra RAG is a specialized vector database designed specifically for retrieval-augmented generation workflows. It optimizes every aspect of the retrieval process to maximize relevance and context quality for language models.",
"Intelligent chunking in Zyphra RAG preserves document context while creating optimal chunk sizes for embedding and retrieval. This approach maintains semantic coherence across chunk boundaries.",
"Multi-strategy retrieval combines semantic similarity, lexical matching, and contextual signals to identify the most relevant information for specific queries and use cases.",
"Context window optimization ensures that retrieved information fits within language model constraints while maximizing information density and relevance to the query.",
"RAG-specific scoring algorithms in Zyphra prioritize content that is most likely to improve language model response quality and accuracy.",
]
# Rich metadata for RAG optimization
metadatas = [
{"category": "overview", "topic": "zyphra_rag", "difficulty": "intermediate", "content_type": "explanation"},
{"category": "feature", "topic": "chunking", "difficulty": "advanced", "content_type": "technical"},
{"category": "feature", "topic": "retrieval", "difficulty": "advanced", "content_type": "technical"},
{"category": "optimization", "topic": "context", "difficulty": "expert", "content_type": "technical"},
{"category": "algorithm", "topic": "scoring", "difficulty": "expert", "content_type": "technical"},
]
# Add documents to Zyphra RAG
print("Adding documents to Zyphra RAG...")
doc_ids = memory.add_documents(documents, metadatas, enable_chunking=True)
print(f"Successfully processed documents into {len(doc_ids)} retrievable units")
# Create Swarms agent with Zyphra RAG
agent = Agent(
agent_name="ZyphraRAG-Agent",
agent_description="Advanced agent with Zyphra RAG-optimized retrieval for maximum context relevance",
model_name="gpt-4o",
max_loops=1,
dynamic_temperature_enabled=True,
)
def query_with_zyphra_rag(query_text: str,
limit: int = 3,
max_context_tokens: int = 3000,
optimization_strategy: str = "relevance_first"):
"""Query with Zyphra RAG's advanced retrieval optimization"""
print(f"\nZyphra RAG Query: {query_text}")
# Perform RAG-optimized search
results = memory.search(
query=query_text,
limit=limit,
relevance_threshold=0.7,
context_optimization=True,
        query_enhancement=True
    )
from swarms_memory.vector_dbs.zyphra_rag import RAGSystem
# Simple LLM wrapper that uses the agent's model
class AgentLLMWrapper(torch.nn.Module):
"""
LLM wrapper that integrates with the Swarms Agent's model.
"""
def __init__(self):
super().__init__()
self.agent = None
def set_agent(self, agent):
"""Set the agent reference for LLM calls"""
self.agent = agent
def forward(self, prompt: str) -> str:
if self.agent:
return self.agent.llm(prompt)
return f"Generated response for: {prompt[:100]}..."
def __call__(self, prompt: str) -> str:
return self.forward(prompt)
# Create a wrapper class to make Zyphra RAG compatible with Swarms Agent
class ZyphraRAGWrapper:
"""
Wrapper to make Zyphra RAG system compatible with Swarms Agent memory interface.
"""
def __init__(self, rag_system, chunks, embeddings, graph):
self.rag_system = rag_system
self.chunks = chunks
self.embeddings = embeddings
self.graph = graph
def add(self, doc: str):
"""Add method for compatibility - Zyphra processes entire documents at once"""
print(f"Note: Zyphra RAG processes entire documents. Document already processed: {doc[:50]}...")
def query(self, query_text: str, **kwargs) -> str:
"""Query the RAG system"""
return self.rag_system.answer_query(query_text, self.chunks, self.embeddings, self.graph)
if __name__ == '__main__':
# Create LLM wrapper
llm = AgentLLMWrapper()
# Initialize Zyphra RAG System
rag_db = RAGSystem(
llm=llm,
vocab_size=10000 # Vocabulary size for sparse embeddings
)
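    # Note: rag_db is not queried directly here; process_document (below)
    # first builds the chunks, the sparse word-count embeddings, and the
    # chunk-similarity graph that Personalized PageRank walks at query time.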
if not results["documents"]:
print("No relevant documents found")
return agent.run(query_text)
# Optimize context window if needed
if results["retrieval_quality"]["total_tokens"] > max_context_tokens:
print(f"Optimizing context window: {results['retrieval_quality']['total_tokens']} -> {max_context_tokens} tokens")
results = memory.optimize_context_window(results, max_context_tokens, optimization_strategy)
# Prepare enhanced context
context_parts = []
for i, (doc, score, signals) in enumerate(zip(
results["documents"],
results["scores"],
results.get("context_signals", [{}] * len(results["documents"]))
)):
relevance_info = f"[Relevance: {score:.3f}"
if signals:
relevance_info += f", Semantic: {signals.get('semantic', 0):.2f}, Contextual: {signals.get('contextual', 0):.2f}"
relevance_info += "]"
context_parts.append(f"Context {i+1} {relevance_info}:\n{doc}")
context = "\n\n".join(context_parts)
# Display retrieval analytics
quality = results["retrieval_quality"]
print(f"Retrieval Quality Metrics:")
print(f" - Total tokens: {quality['total_tokens']}")
print(f" - Average relevance: {quality['avg_relevance']:.3f}")
print(f" - Context diversity: {quality['context_diversity']:.3f}")
print(f" - Query enhancement: {quality['query_enhancement_applied']}")
# Enhanced prompt with RAG-optimized context
    enhanced_prompt = f"""
    You are provided with high-quality, RAG-optimized context retrieved specifically for this query.
    The context has been scored and ranked for relevance, with diversity optimization applied.
    Context:
    {context}
    Question: {query_text}
    Instructions:
    1. Base your response primarily on the provided context
    2. Consider the relevance scores when weighing information
    3. Synthesize information from multiple context pieces when applicable
    4. Indicate confidence level based on context quality and relevance
    Response:
    """
    # Add documents to the knowledge base
    documents = [
        "Zyphra RAG is an advanced retrieval system that combines sparse embeddings with graph-based retrieval algorithms.",
        "Zyphra RAG uses Personalized PageRank (PPR) to identify the most relevant document chunks for a given query.",
        "The system builds a graph representation of document chunks based on embedding similarities between text segments.",
        "Zyphra RAG employs sparse embeddings using word count methods for fast, CPU-friendly text representation.",
        "The graph builder creates adjacency matrices representing similarity relationships between document chunks.",
        "Zyphra RAG excels at context-aware document retrieval through its graph-based approach to semantic search.",
        "Kye Gomez is the founder of Swarms."
    ]
    document_text = " ".join(documents)
    # Process the document (creates chunks, embeddings, and graph)
    chunks, embeddings, graph = rag_db.process_document(document_text, chunk_size=100)
    # Create the wrapper
    rag_wrapper = ZyphraRAGWrapper(rag_db, chunks, embeddings, graph)
# Run agent with enhanced prompt
response = agent.run(enhanced_prompt)
return response
# Example usage and testing
if __name__ == "__main__":
# Test RAG-optimized queries
queries = [
"How does Zyphra RAG optimize retrieval for language models?",
"What is intelligent chunking and why is it important?",
"How does multi-strategy retrieval work?",
"What are the benefits of context window optimization?",
]
print("=== Zyphra RAG Enhanced Queries ===")
for query in queries:
response = query_with_zyphra_rag(
query,
limit=3,
max_context_tokens=2500,
optimization_strategy="relevance_first"
)
print(f"Enhanced Answer: {response}\n")
print("-" * 80)
# Test token efficiency optimization
print("\n=== Token Efficiency Optimization ===")
response = query_with_zyphra_rag(
"Explain all the advanced features of Zyphra RAG",
limit=5,
max_context_tokens=1500, # Strict token limit
        optimization_strategy="token_efficient"
    )
# Create agent with RAG capabilities
agent = Agent(
agent_name="RAG-Agent",
agent_description="Swarms Agent with Zyphra RAG-powered graph-based retrieval for enhanced knowledge retrieval",
model_name="gpt-4o",
max_loops=1,
dynamic_temperature_enabled=True,
long_term_memory=rag_wrapper
)
print(f"Token-optimized answer: {response}\n")
print("Zyphra RAG integration demonstration completed!")
# Connect the LLM wrapper to the agent
llm.set_agent(agent)
# Query with RAG
response = agent.run("What is Zyphra RAG and who is the founder of Swarms?")
print(response)
```
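
Since the example above only calls `process_document`, it can help to see the retrieval idea it sets up. The docstrings describe a graph of chunk similarities walked with Personalized PageRank (PPR); the sketch below is a minimal NumPy rendition of that idea, not Zyphra RAG's actual implementation, and the adjacency weights, seed vector, and damping factor are all illustrative:

```python
import numpy as np

def personalized_pagerank(adj: np.ndarray, seed: np.ndarray,
                          damping: float = 0.85, iters: int = 50) -> np.ndarray:
    """Power iteration for PPR over a chunk-similarity graph.

    adj[i, j] is the similarity between chunks i and j; seed marks the
    chunks most similar to the query. The walk teleports back to the
    seed with probability (1 - damping), biasing ranks toward the query.
    """
    col_sums = adj.sum(axis=0, keepdims=True)
    transition = adj / np.where(col_sums == 0, 1.0, col_sums)  # column-stochastic
    personalization = seed / seed.sum()
    rank = personalization.copy()
    for _ in range(iters):
        rank = damping * transition @ rank + (1 - damping) * personalization
    return rank

# Four chunks; the query matches chunk 0 best, so it seeds the walk
adj = np.array([[0.0, 0.8, 0.1, 0.0],
                [0.8, 0.0, 0.5, 0.1],
                [0.1, 0.5, 0.0, 0.9],
                [0.0, 0.1, 0.9, 0.0]])
seed = np.array([1.0, 0.0, 0.0, 0.0])
print(personalized_pagerank(adj, seed).round(3))  # highest ranks = retrieve first
```

Chunks strongly connected to the seeded chunk inherit rank, which is how graph-based retrieval can surface related context that a pure cosine match against the query would miss.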
## Use Cases
