# swarms/examples/single_agent/rag/qdrant_rag_example.py


"""
Qdrant RAG Example with Document Ingestion
This example demonstrates how to combine the agent structure from example.py with Qdrant RAG
to ingest collections of PDF, text, and Markdown documents for quantitative trading analysis.
Features:
- Document ingestion from multiple file types (PDF, TXT, MD)
- Qdrant vector database integration
- Sentence transformer embeddings
- Comprehensive document processing pipeline
- Agent with RAG capabilities for financial analysis
"""
import os
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Union
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
from qdrant_client import QdrantClient
from qdrant_client.http import models
from qdrant_client.http.models import Distance, VectorParams
from sentence_transformers import SentenceTransformer
from swarms import Agent
from swarms.utils.pdf_to_text import pdf_to_text
from swarms.utils.data_to_text import data_to_text


class DocumentProcessor:
"""
Handles document processing and text extraction from various file formats.
This class provides functionality to process PDF, TXT, and Markdown files,
extracting text content for vectorization and storage in the RAG system.
"""
def __init__(
self, supported_extensions: Optional[List[str]] = None
):
"""
Initialize the DocumentProcessor.
Args:
supported_extensions: List of supported file extensions.
Defaults to ['.pdf', '.txt', '.md']
"""
if supported_extensions is None:
supported_extensions = [".pdf", ".txt", ".md"]
self.supported_extensions = supported_extensions
def process_document(
self, file_path: Union[str, Path]
) -> Optional[Dict[str, str]]:
"""
Process a single document and extract its text content.
Args:
file_path: Path to the document file
Returns:
Dictionary containing document metadata and extracted text, or None if processing fails
"""
file_path = Path(file_path)
if not file_path.exists():
print(f"File not found: {file_path}")
return None
if file_path.suffix.lower() not in self.supported_extensions:
print(f"Unsupported file type: {file_path.suffix}")
return None
try:
# Extract text based on file type
if file_path.suffix.lower() == ".pdf":
try:
text_content = pdf_to_text(str(file_path))
except Exception as pdf_error:
print(f"Error extracting PDF text: {pdf_error}")
# Fallback: try to read as text file
with open(
file_path,
"r",
encoding="utf-8",
errors="ignore",
) as f:
text_content = f.read()
else:
try:
text_content = data_to_text(str(file_path))
except Exception as data_error:
print(f"Error extracting text: {data_error}")
# Fallback: try to read as text file
with open(
file_path,
"r",
encoding="utf-8",
errors="ignore",
) as f:
text_content = f.read()
# Ensure text_content is a string
if callable(text_content):
print(
f"Warning: {file_path} returned a callable, trying to call it..."
)
try:
text_content = text_content()
except Exception as call_error:
print(f"Error calling callable: {call_error}")
return None
if not text_content or not isinstance(text_content, str):
print(
f"No valid text content extracted from: {file_path}"
)
return None
# Clean the text content
text_content = str(text_content).strip()
return {
"file_path": str(file_path),
"file_name": file_path.name,
"file_type": file_path.suffix.lower(),
"text_content": text_content,
"file_size": file_path.stat().st_size,
"processed_at": datetime.utcnow().isoformat(),
}
except Exception as e:
print(f"Error processing {file_path}: {str(e)}")
return None
def process_directory(
self, directory_path: Union[str, Path], max_workers: int = 4
) -> List[Dict[str, str]]:
"""
Process all supported documents in a directory concurrently.
Args:
directory_path: Path to the directory containing documents
max_workers: Maximum number of concurrent workers for processing
Returns:
List of processed document dictionaries
"""
directory_path = Path(directory_path)
if not directory_path.is_dir():
print(f"Directory not found: {directory_path}")
return []
# Find all supported files
supported_files = []
        for ext in self.supported_extensions:
            supported_files.extend(directory_path.rglob(f"*{ext}"))
            supported_files.extend(
                directory_path.rglob(f"*{ext.upper()}")
            )
        # Deduplicate: on case-insensitive filesystems both glob patterns can
        # match the same file, which would ingest it twice.
        supported_files = sorted(set(supported_files))
if not supported_files:
print(f"No supported files found in: {directory_path}")
return []
print(f"Found {len(supported_files)} files to process")
# Process files concurrently
processed_documents = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_file = {
executor.submit(
self.process_document, file_path
): file_path
for file_path in supported_files
}
for future in concurrent.futures.as_completed(
future_to_file
):
file_path = future_to_file[future]
try:
result = future.result()
if result:
processed_documents.append(result)
print(f"Processed: {result['file_name']}")
except Exception as e:
print(f"Error processing {file_path}: {str(e)}")
print(
f"Successfully processed {len(processed_documents)} documents"
)
return processed_documents


class QdrantRAGMemory:
"""
Enhanced Qdrant memory system for RAG operations with document storage.
This class extends the basic Qdrant memory system to handle document ingestion,
chunking, and semantic search for large document collections.
"""
def __init__(
self,
collection_name: str = "document_memories",
vector_size: int = 384, # Default size for all-MiniLM-L6-v2
url: Optional[str] = None,
api_key: Optional[str] = None,
chunk_size: int = 1000,
chunk_overlap: int = 200,
):
"""
Initialize the Qdrant RAG memory system.
Args:
collection_name: Name of the Qdrant collection to use
vector_size: Dimension of the embedding vectors
url: Optional Qdrant server URL (defaults to local)
api_key: Optional Qdrant API key for cloud deployment
chunk_size: Size of text chunks for processing
chunk_overlap: Overlap between consecutive chunks
"""
self.collection_name = collection_name
self.vector_size = vector_size
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
# Initialize Qdrant client
if url and api_key:
self.client = QdrantClient(url=url, api_key=api_key)
else:
self.client = QdrantClient(
":memory:"
) # Local in-memory storage
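            # Note: ":memory:" storage lives only for the lifetime of this
            # process; nothing is persisted between runs.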
# Initialize embedding model
self.embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
# Get the actual embedding dimension from the model
sample_text = "Sample text for dimension check"
sample_embedding = self.embedding_model.encode(sample_text)
actual_dimension = len(sample_embedding)
# Update vector_size to match the actual model dimension
if actual_dimension != self.vector_size:
print(
f"Updating vector size from {self.vector_size} to {actual_dimension} to match model"
)
self.vector_size = actual_dimension
# Create collection if it doesn't exist
self._create_collection()
def _create_collection(self):
"""Create the Qdrant collection if it doesn't exist."""
collections = self.client.get_collections().collections
exists = any(
col.name == self.collection_name for col in collections
)
if not exists:
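            # Cosine distance matches how sentence-transformer embeddings are
            # usually compared.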
self.client.create_collection(
collection_name=self.collection_name,
vectors_config=VectorParams(
size=self.vector_size, distance=Distance.COSINE
),
)
print(
f"Created Qdrant collection: {self.collection_name}"
)
def _chunk_text(self, text: str) -> List[str]:
"""
Split text into overlapping chunks for better retrieval.
Args:
text: Text content to chunk
Returns:
List of text chunks
"""
# Ensure text is a string
if not isinstance(text, str):
text = str(text)
if len(text) <= self.chunk_size:
return [text]
chunks = []
start = 0
while start < len(text):
end = start + self.chunk_size
# Try to break at sentence boundaries
if end < len(text):
# Look for sentence endings
for i in range(end, max(start, end - 100), -1):
if text[i] in ".!?":
end = i + 1
break
chunk = text[start:end].strip()
if chunk:
chunks.append(chunk)
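            # Slide the window forward but keep chunk_overlap characters of the
            # previous chunk so sentences split at a boundary stay retrievable.
            # E.g. with chunk_size=1000 and chunk_overlap=200, successive chunks
            # start roughly 800 characters apart.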
start = end - self.chunk_overlap
if start >= len(text):
break
return chunks
def add_document(
self, document_data: Dict[str, str]
) -> List[str]:
"""
Add a document to the memory system with chunking.
Args:
document_data: Dictionary containing document information
Returns:
List of memory IDs for the stored chunks
"""
text_content = document_data["text_content"]
# Ensure text_content is a string
if not isinstance(text_content, str):
print(
f"Warning: text_content is not a string: {type(text_content)}"
)
text_content = str(text_content)
chunks = self._chunk_text(text_content)
memory_ids = []
for i, chunk in enumerate(chunks):
# Generate embedding for the chunk
embedding = self.embedding_model.encode(chunk).tolist()
# Prepare metadata
metadata = {
"document_name": document_data["file_name"],
"document_path": document_data["file_path"],
"document_type": document_data["file_type"],
"chunk_index": i,
"total_chunks": len(chunks),
"chunk_text": chunk,
"timestamp": datetime.utcnow().isoformat(),
"file_size": document_data["file_size"],
}
# Store the chunk
memory_id = str(uuid.uuid4())
self.client.upsert(
collection_name=self.collection_name,
points=[
models.PointStruct(
id=memory_id,
payload=metadata,
vector=embedding,
)
],
)
memory_ids.append(memory_id)
print(
f"Added document '{document_data['file_name']}' with {len(chunks)} chunks"
)
return memory_ids
def add_documents_batch(
self, documents: List[Dict[str, str]]
) -> List[str]:
"""
Add multiple documents to the memory system.
Args:
documents: List of document dictionaries
Returns:
List of all memory IDs
"""
all_memory_ids = []
for document in documents:
memory_ids = self.add_document(document)
all_memory_ids.extend(memory_ids)
return all_memory_ids
def add(self, text: str, metadata: Optional[Dict] = None) -> str:
"""
Add a text entry to the memory system (required by Swarms interface).
Args:
text: The text content to add
metadata: Optional metadata for the entry
Returns:
str: ID of the stored memory
"""
if metadata is None:
metadata = {}
# Generate embedding for the text
embedding = self.embedding_model.encode(text).tolist()
# Prepare metadata
memory_metadata = {
"text": text,
"timestamp": datetime.utcnow().isoformat(),
"source": "agent_memory",
}
memory_metadata.update(metadata)
# Store the point
memory_id = str(uuid.uuid4())
self.client.upsert(
collection_name=self.collection_name,
points=[
models.PointStruct(
id=memory_id,
payload=memory_metadata,
vector=embedding,
)
],
)
return memory_id
def query(
self,
query_text: str,
limit: int = 5,
score_threshold: float = 0.7,
include_metadata: bool = True,
) -> List[Dict]:
"""
Query memories based on text similarity.
Args:
query_text: The text query to search for
limit: Maximum number of results to return
score_threshold: Minimum similarity score threshold
include_metadata: Whether to include metadata in results
Returns:
List of matching memories with their metadata
"""
try:
# Check if collection has any points
collection_info = self.client.get_collection(
self.collection_name
)
if collection_info.points_count == 0:
print(
"Warning: Collection is empty, no documents to query"
)
return []
# Generate embedding for the query
query_embedding = self.embedding_model.encode(
query_text
).tolist()
# Search in Qdrant
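            # For a COSINE collection, Qdrant reports cosine similarity as the
            # score (higher is more similar), so score_threshold drops weakly
            # related chunks.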
results = self.client.search(
collection_name=self.collection_name,
query_vector=query_embedding,
limit=limit,
score_threshold=score_threshold,
)
memories = []
for res in results:
memory = res.payload.copy()
memory["similarity_score"] = res.score
if not include_metadata:
# Keep only essential information
memory = {
"chunk_text": memory.get("chunk_text", ""),
"document_name": memory.get(
"document_name", ""
),
"similarity_score": memory[
"similarity_score"
],
}
memories.append(memory)
return memories
except Exception as e:
print(f"Error querying collection: {e}")
return []
def get_collection_stats(self) -> Dict:
"""
Get statistics about the collection.
Returns:
Dictionary containing collection statistics
"""
try:
collection_info = self.client.get_collection(
self.collection_name
)
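            # config.params.vectors is a single VectorParams object here,
            # because _create_collection sets up one unnamed vector per
            # collection.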
return {
"collection_name": self.collection_name,
"vector_size": collection_info.config.params.vectors.size,
"distance": collection_info.config.params.vectors.distance,
"points_count": collection_info.points_count,
}
except Exception as e:
print(f"Error getting collection stats: {e}")
return {}
def clear_collection(self):
"""Clear all memories from the collection."""
self.client.delete_collection(self.collection_name)
self._create_collection()
print(f"Cleared collection: {self.collection_name}")


class QuantitativeTradingRAGAgent:
"""
Advanced quantitative trading agent with RAG capabilities for document analysis.
This agent combines the structure from example.py with Qdrant RAG to provide
comprehensive financial analysis based on ingested documents.
"""
def __init__(
self,
agent_name: str = "Quantitative-Trading-RAG-Agent",
collection_name: str = "financial_documents",
qdrant_url: Optional[str] = None,
qdrant_api_key: Optional[str] = None,
model_name: str = "claude-sonnet-4-20250514",
max_loops: int = 1,
chunk_size: int = 1000,
chunk_overlap: int = 200,
):
"""
Initialize the Quantitative Trading RAG Agent.
Args:
agent_name: Name of the agent
collection_name: Name of the Qdrant collection
qdrant_url: Optional Qdrant server URL
qdrant_api_key: Optional Qdrant API key
model_name: LLM model to use
max_loops: Maximum number of agent loops
chunk_size: Size of text chunks for processing
chunk_overlap: Overlap between consecutive chunks
"""
self.agent_name = agent_name
self.collection_name = collection_name
# Initialize document processor
self.document_processor = DocumentProcessor()
# Initialize Qdrant RAG memory
self.rag_memory = QdrantRAGMemory(
collection_name=collection_name,
url=qdrant_url,
api_key=qdrant_api_key,
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
)
# Initialize the agent with RAG capabilities
self.agent = Agent(
agent_name=agent_name,
agent_description="Advanced quantitative trading and algorithmic analysis agent with RAG capabilities",
system_prompt="""You are an expert quantitative trading agent with deep expertise in:
- Algorithmic trading strategies and implementation
- Statistical arbitrage and market making
- Risk management and portfolio optimization
- High-frequency trading systems
- Market microstructure analysis
- Quantitative research methodologies
- Financial mathematics and stochastic processes
- Machine learning applications in trading
Your core responsibilities include:
1. Developing and backtesting trading strategies
2. Analyzing market data and identifying alpha opportunities
3. Implementing risk management frameworks
4. Optimizing portfolio allocations
5. Conducting quantitative research
6. Monitoring market microstructure
7. Evaluating trading system performance
You have access to a comprehensive document database through RAG (Retrieval-Augmented Generation).
When answering questions, you can search through this database to find relevant information
and provide evidence-based responses.
You maintain strict adherence to:
- Mathematical rigor in all analyses
- Statistical significance in strategy development
- Risk-adjusted return optimization
- Market impact minimization
- Regulatory compliance
- Transaction cost analysis
- Performance attribution
You communicate in precise, technical terms while maintaining clarity for stakeholders.""",
model_name=model_name,
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
max_loops=max_loops,
dynamic_context_window=True,
long_term_memory=self.rag_memory,
)
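        # The same memory object serves two paths: the Agent can consult it as
        # long_term_memory, and run_analysis() below also retrieves chunks and
        # prepends them to the task explicitly.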
def ingest_documents(
self, documents_path: Union[str, Path]
) -> int:
"""
Ingest documents from a directory into the RAG system.
Args:
documents_path: Path to directory containing documents
Returns:
Number of documents successfully ingested
"""
print(f"Starting document ingestion from: {documents_path}")
try:
# Process documents
processed_documents = (
self.document_processor.process_directory(
documents_path
)
)
if not processed_documents:
print("No documents to ingest")
return 0
# Add documents to RAG memory
memory_ids = self.rag_memory.add_documents_batch(
processed_documents
)
print(
f"Successfully ingested {len(processed_documents)} documents"
)
print(f"Created {len(memory_ids)} memory chunks")
return len(processed_documents)
except Exception as e:
print(f"Error during document ingestion: {e}")
import traceback
traceback.print_exc()
return 0
def query_documents(
self, query: str, limit: int = 5
) -> List[Dict]:
"""
Query the document database for relevant information.
Args:
query: The query text
limit: Maximum number of results to return
Returns:
List of relevant document chunks
"""
return self.rag_memory.query(query, limit=limit)
def run_analysis(self, task: str) -> str:
"""
Run a financial analysis task using the agent with RAG capabilities.
Args:
task: The analysis task to perform
Returns:
Agent's response to the task
"""
print(f"Running analysis task: {task}")
# First, query the document database for relevant context
relevant_docs = self.query_documents(task, limit=3)
if relevant_docs:
# Enhance the task with relevant document context
context = "\n\nRelevant Document Information:\n"
for i, doc in enumerate(relevant_docs, 1):
context += f"\nDocument {i}: {doc.get('document_name', 'Unknown')}\n"
context += f"Relevance Score: {doc.get('similarity_score', 0):.3f}\n"
context += (
f"Content: {doc.get('chunk_text', '')[:500]}...\n"
)
enhanced_task = f"{task}\n\n{context}"
else:
enhanced_task = task
# Run the agent
response = self.agent.run(enhanced_task)
return response
def get_database_stats(self) -> Dict:
"""
Get statistics about the document database.
Returns:
Dictionary containing database statistics
"""
return self.rag_memory.get_collection_stats()


def main():
"""
Main function demonstrating the Qdrant RAG agent with document ingestion.
"""
from datetime import datetime
# Example usage
print("🚀 Initializing Quantitative Trading RAG Agent...")
# Initialize the agent (you can set environment variables for Qdrant cloud)
agent = QuantitativeTradingRAGAgent(
agent_name="Quantitative-Trading-RAG-Agent",
collection_name="financial_documents",
qdrant_url=os.getenv(
"QDRANT_URL"
), # Optional: For cloud deployment
qdrant_api_key=os.getenv(
"QDRANT_API_KEY"
), # Optional: For cloud deployment
model_name="claude-sonnet-4-20250514",
max_loops=1,
chunk_size=1000,
chunk_overlap=200,
)
# Example: Ingest documents from a directory
documents_path = "documents" # Path to your documents
if os.path.exists(documents_path):
print(f"Found documents directory: {documents_path}")
try:
agent.ingest_documents(documents_path)
except Exception as e:
print(f"Error ingesting documents: {e}")
print("Continuing without document ingestion...")
else:
print(f"Documents directory not found: {documents_path}")
print("Creating a sample document for demonstration...")
# Create a sample document
try:
sample_doc = {
"file_path": "sample_financial_analysis.txt",
"file_name": "sample_financial_analysis.txt",
"file_type": ".txt",
"text_content": """
Gold ETFs: A Comprehensive Investment Guide
Gold ETFs (Exchange-Traded Funds) provide investors with exposure to gold prices
without the need to physically store the precious metal. These funds track the
price of gold and offer several advantages including liquidity, diversification,
and ease of trading.
Top Gold ETFs include:
1. SPDR Gold Shares (GLD) - Largest gold ETF with high liquidity
2. iShares Gold Trust (IAU) - Lower expense ratio alternative
3. Aberdeen Standard Physical Gold ETF (SGOL) - Swiss storage option
Investment strategies for gold ETFs:
- Portfolio diversification (5-10% allocation)
- Inflation hedge
- Safe haven during market volatility
- Tactical trading opportunities
Market analysis shows that gold has historically served as a store of value
and hedge against inflation. Recent market conditions have increased interest
in gold investments due to economic uncertainty and geopolitical tensions.
""",
"file_size": 1024,
"processed_at": datetime.utcnow().isoformat(),
}
# Add the sample document to the RAG memory
memory_ids = agent.rag_memory.add_document(sample_doc)
print(
f"Added sample document with {len(memory_ids)} chunks"
)
except Exception as e:
print(f"Error creating sample document: {e}")
print("Continuing without sample document...")
# Example: Query the database
print("\n📊 Querying document database...")
try:
query_results = agent.query_documents(
"gold ETFs investment strategies", limit=3
)
print(f"Found {len(query_results)} relevant document chunks")
if query_results:
print("Sample results:")
for i, result in enumerate(query_results[:2], 1):
print(
f" {i}. {result.get('document_name', 'Unknown')} (Score: {result.get('similarity_score', 0):.3f})"
)
else:
print(
"No documents found in database. This is expected if no documents were ingested."
)
except Exception as e:
print(f"❌ Query failed: {e}")
# Example: Run financial analysis
print("\n💹 Running financial analysis...")
analysis_task = "What are the best top 3 ETFs for gold coverage and what are their key characteristics?"
try:
response = agent.run_analysis(analysis_task)
print("\n📈 Analysis Results:")
print(response)
except Exception as e:
print(f"❌ Analysis failed: {e}")
print("This might be due to API key or model access issues.")
print("Continuing with database statistics...")
# Try a simpler query that doesn't require the LLM
print("\n🔍 Trying simple document query instead...")
try:
simple_results = agent.query_documents(
"what do you see in the document?", limit=2
)
if simple_results:
print("Simple query results:")
for i, result in enumerate(simple_results, 1):
print(
f" {i}. {result.get('document_name', 'Unknown')}"
)
print(
f" Content preview: {result.get('chunk_text', '')[:100]}..."
)
else:
print("No results from simple query")
except Exception as simple_error:
print(f"Simple query also failed: {simple_error}")
# Get database statistics
print("\n📊 Database Statistics:")
try:
stats = agent.get_database_stats()
for key, value in stats.items():
print(f" {key}: {value}")
except Exception as e:
print(f"❌ Failed to get database statistics: {e}")
print("\n✅ Example completed successfully!")
print("💡 To test with your own documents:")
print(" 1. Create a 'documents' directory")
print(" 2. Add PDF, TXT, or MD files")
print(" 3. Run the script again")
if __name__ == "__main__":
main()