Session 3: Vector Databases & Search Optimization¶
Learning Outcomes¶
By the end of this session, you will be able to:

- Deploy multiple vector database architectures and select optimal solutions for specific use cases
- Implement hybrid search strategies that combine semantic similarity with keyword matching
- Optimize vector indices for performance, scalability, and cost efficiency in production environments
- Design advanced retrieval pipelines with multi-stage processing and intelligent reranking
- Evaluate search quality using comprehensive metrics and domain-specific performance indicators
Chapter Overview¶
What You'll Learn: Enterprise Vector Search Infrastructure¶
Vector databases are the high-performance engines that transform RAG from concept to reality. Unlike simple storage systems, enterprise vector databases handle millions of vectors with sub-second search times while supporting complex filtering, hybrid search strategies, and real-time updates at scale.
Why It Matters: The Heart of Production RAG Systems¶
Modern RAG applications require sophisticated search infrastructure that goes far beyond basic similarity matching. Enterprise systems must handle diverse query patterns, scale to massive document collections, and maintain consistent performance under varying loads while preserving search accuracy.
How It Stands Out: 2025 Vector Database Landscape¶
Based on current enterprise deployments, the vector database ecosystem in 2025 features:

- HNSW algorithm dominance: typically ~3x faster queries than IVF at comparable or better recall
- Hybrid search integration: combining semantic and lexical search for 15-25% better precision
- Dynamic data handling: real-time updates without complete index rebuilds
- Enterprise optimization: composite indexes with quantization for memory efficiency
Where You'll Apply It: Critical Use Cases¶
- Customer Support: Real-time similarity search across millions of support documents
- Legal Discovery: Complex filtering with exact term matching for compliance requirements
- Medical Research: High-precision semantic search in scientific literature databases
- E-commerce: Product recommendation systems with multi-modal search capabilities
Figure 1: This diagram shows how vector databases serve as the central search engine in RAG architectures, handling both semantic similarity and hybrid search patterns that enable sophisticated information retrieval.
Learning Path Options¶
Observer Path (35 minutes): Understand vector database concepts and architectures
- Focus: Core concepts with clear examples of indexing algorithms and search strategies
- Best for: Getting oriented with vector search fundamentals and trade-offs

🙋‍♂️ Participant Path (70 minutes): Implement hybrid search systems
- Focus: Hands-on setup of multiple vector databases and hybrid search implementation
- Best for: Building practical search infrastructure with real-world optimization

🛠️ Implementer Path (120 minutes): Advanced optimization and enterprise deployment
- Focus: Production-grade performance tuning and multi-database architecture
- Best for: Deep technical mastery with enterprise-scale optimization patterns
Part 1: Vector Database Architecture (Observer: 10 min | Participant: 20 min)¶
Understanding Vector Database Design Principles¶
Vector databases solve the fundamental challenge of semantic search: finding similar content in high-dimensional spaces efficiently. Unlike traditional databases that match exact values, vector databases calculate similarity between numerical representations of meaning.
The Core Challenge: Similarity at Scale¶
Consider searching through 1 million documents for "machine learning techniques." A naive approach would:

1. Calculate similarity between your query vector and each document vector
2. Sort all 1 million results by similarity score
3. Return the top matches
This approach requires 1 million similarity calculations per query - far too slow for production use.
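To make the cost concrete, here is a minimal brute-force sketch (sizes and data are illustrative synthetic vectors, not a recommendation):

```python
import numpy as np

# 100k synthetic "document embeddings" (~0.6 GB of RAM at 1536 dims);
# production corpora are often 10x larger or more
rng = np.random.default_rng(42)
doc_vectors = rng.standard_normal((100_000, 1536)).astype(np.float32)
query = rng.standard_normal(1536).astype(np.float32)

# Normalize once so a dot product equals cosine similarity
doc_unit = doc_vectors / np.linalg.norm(doc_vectors, axis=1, keepdims=True)
query_unit = query / np.linalg.norm(query)

# One similarity calculation per document: O(N * dimension) per query
scores = doc_unit @ query_unit
top_10 = np.argsort(-scores)[:10]
print(top_10, scores[top_10])
```

Vector databases avoid this linear scan with approximate nearest-neighbor indexes such as HNSW and IVF, covered later in this session.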
Vector Database Interface (Observer Focus)¶
Here's a simple interface that shows the essential operations every vector database must support:
```python
# Simple vector database interface
from typing import List, Dict, Any


class VectorDatabaseInterface:
    """Essential operations for vector similarity search."""

    def __init__(self, dimension: int, metric: str = "cosine"):
        self.dimension = dimension  # Vector size (e.g., 1536 for OpenAI embeddings)
        self.metric = metric        # cosine, euclidean, or dot_product

    def add_vectors(self, vectors: List[List[float]],
                    metadata: List[Dict], ids: List[str]):
        """Store vectors with associated metadata and unique IDs."""
        pass

    def search(self, query_vector: List[float],
               top_k: int = 10, filters: Dict = None):
        """Find most similar vectors with optional metadata filtering."""
        pass

    def update_vector(self, vector_id: str,
                      new_vector: List[float], new_metadata: Dict):
        """Update existing vector and its metadata."""
        pass
```
Key Design Decisions:

- Cosine similarity: best for text embeddings because it ignores vector magnitude, so document length doesn't skew scores
- Metadata storage: enables filtering by document type, date, or user permissions
- Batch operations: essential for efficient data loading and updates
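To illustrate the first decision, here's a tiny sketch (hypothetical vectors) showing that cosine similarity depends only on direction, which is why longer documents don't automatically score higher:

```python
import numpy as np

def cosine(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity compares direction; magnitude cancels out."""
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))

query = np.array([0.3, 0.6, 0.2])
doc = np.array([0.2, 0.7, 0.1])

print(cosine(query, doc))      # ~0.97
print(cosine(query, doc * 5))  # identical: scaling a vector doesn't change cosine
```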
PARTICIPANT PATH: Production Vector Database Setup¶
Let's implement a production-ready vector database system using ChromaDB:
```python
import chromadb
from chromadb.config import Settings
import numpy as np
from typing import List, Dict, Optional


class ProductionVectorStore:
    """Production-ready ChromaDB implementation with optimization."""

    def __init__(self, persist_directory: str, collection_name: str):
        self.persist_directory = persist_directory
        self.collection_name = collection_name

        # Initialize client with production settings
        self.client = chromadb.PersistentClient(
            path=persist_directory,
            settings=Settings(
                allow_reset=False,          # Production safety
                anonymized_telemetry=False  # Avoid external dependencies
            )
        )

        # Create optimized collection
        self.collection = self._initialize_collection()

    def _initialize_collection(self):
        """Initialize collection with optimized HNSW parameters."""
        try:
            # Try to load existing collection
            collection = self.client.get_collection(self.collection_name)
            print(f"Loaded existing collection: {self.collection_name}")
        except ValueError:
            # Create new collection with HNSW optimization
            collection = self.client.create_collection(
                name=self.collection_name,
                metadata={
                    "hnsw:space": "cosine",
                    "hnsw:construction_ef": 200,  # Build-time accuracy
                    "hnsw:M": 16,                 # Node connectivity
                    "hnsw:search_ef": 100         # Query-time speed/accuracy
                }
            )
            print(f"Created optimized collection: {self.collection_name}")
        return collection

    def add_documents_batch(self, documents: List[str],
                            embeddings: List[List[float]],
                            metadata: List[Dict],
                            ids: List[str],
                            batch_size: int = 1000):
        """Add documents in optimized batches."""
        total_docs = len(documents)
        for i in range(0, total_docs, batch_size):
            batch_end = min(i + batch_size, total_docs)
            self.collection.add(
                documents=documents[i:batch_end],
                embeddings=embeddings[i:batch_end],
                metadatas=metadata[i:batch_end],
                ids=ids[i:batch_end]
            )
            print(f"Added batch {i // batch_size + 1} "
                  f"({batch_end - i} documents)")

    def similarity_search(self, query: str, top_k: int = 10,
                          filters: Optional[Dict] = None):
        """Perform optimized similarity search."""
        results = self.collection.query(
            query_texts=[query],
            n_results=top_k,
            where=filters  # Metadata filtering
        )
        return self._format_results(results)

    def _format_results(self, raw_results):
        """Format ChromaDB results for consistent interface."""
        formatted = []
        for i, doc in enumerate(raw_results['documents'][0]):
            result = {
                'content': doc,
                'metadata': raw_results['metadatas'][0][i],
                # Note: ChromaDB returns distances (lower = more similar),
                # not similarities; convert downstream if needed
                'similarity_score': raw_results['distances'][0][i],
                'id': raw_results['ids'][0][i]
            }
            formatted.append(result)
        return formatted
```
ChromaDB vs Enterprise Alternatives:

| Database | Best For | Strengths | Limitations |
|---|---|---|---|
| ChromaDB | Development, moderate scale | Simple setup, good performance to 1M vectors | Single-node, memory constraints |
| Pinecone | Enterprise, high availability | Managed scaling, global distribution | Usage-based pricing, vendor lock-in |
| Qdrant | High performance, complex filtering | Excellent filtering, self-hosted control | More complex setup |
| Weaviate | Multi-modal search | Built-in ML capabilities | Resource intensive |
IMPLEMENTER PATH: Multi-Database Architecture¶
For enterprise applications, implement a strategy pattern that can switch between vector databases:
```python
from abc import ABC, abstractmethod
from typing import Dict, List
import time


class VectorDatabaseStrategy(ABC):
    """Abstract strategy for vector database implementations."""

    @abstractmethod
    def add_vectors(self, vectors, metadata, ids):
        pass

    @abstractmethod
    def search(self, query_vector, top_k, filters):
        pass

    @abstractmethod
    def get_performance_metrics(self):
        pass


class EnterpriseVectorManager:
    """Multi-database vector manager with intelligent routing."""

    def __init__(self):
        self.databases: Dict[str, VectorDatabaseStrategy] = {}
        self.performance_history: Dict[str, list] = {}
        self.default_database = None

    def register_database(self, name: str, database: VectorDatabaseStrategy,
                          is_default: bool = False):
        """Register a vector database implementation."""
        self.databases[name] = database
        self.performance_history[name] = []
        if is_default or not self.default_database:
            self.default_database = name

    def intelligent_search(self, query_vector: List[float],
                           top_k: int = 10,
                           performance_priority: str = "balanced"):
        """Route search to optimal database based on requirements."""
        # Select database based on performance requirements
        if performance_priority == "speed":
            database_name = self._select_fastest_database()
        elif performance_priority == "accuracy":
            database_name = self._select_most_accurate_database()
        else:
            database_name = self.default_database

        # Execute search with performance tracking
        start_time = time.time()
        results = self.databases[database_name].search(
            query_vector, top_k, None
        )
        search_time = time.time() - start_time

        # Update performance history
        self.performance_history[database_name].append({
            'search_time': search_time,
            'result_count': len(results),
            'timestamp': time.time()
        })

        return {
            'results': results,
            'database_used': database_name,
            'search_time': search_time
        }

    def _select_fastest_database(self):
        """Select database with best average performance."""
        best_db = self.default_database
        best_time = float('inf')
        for db_name, history in self.performance_history.items():
            if history:
                # Average over the 10 most recent searches
                recent = history[-10:]
                avg_time = sum(h['search_time'] for h in recent) / len(recent)
                if avg_time < best_time:
                    best_time = avg_time
                    best_db = db_name
        return best_db

    def _select_most_accurate_database(self):
        """Select database by accuracy; minimal fallback to the default
        (a full system would track recall per backend)."""
        return self.default_database
```
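As a sanity check of the strategy interface, here is a toy in-memory backend wired into the manager. It is illustrative only (`InMemoryStrategy` is not from the course material); real registrations would wrap ChromaDB, Qdrant, or Pinecone clients:

```python
import numpy as np

class InMemoryStrategy(VectorDatabaseStrategy):
    """Brute-force backend for exercising the manager; not for production."""

    def __init__(self):
        self.vectors, self.ids = [], []

    def add_vectors(self, vectors, metadata, ids):
        self.vectors.extend(vectors)
        self.ids.extend(ids)

    def search(self, query_vector, top_k, filters):
        matrix = np.asarray(self.vectors, dtype=np.float32)
        q = np.asarray(query_vector, dtype=np.float32)
        # Cosine similarity against every stored vector
        sims = matrix @ q / (
            np.linalg.norm(matrix, axis=1) * np.linalg.norm(q) + 1e-9
        )
        order = np.argsort(-sims)[:top_k]
        return [{'id': self.ids[i], 'score': float(sims[i])} for i in order]

    def get_performance_metrics(self):
        return {'vector_count': len(self.ids)}

manager = EnterpriseVectorManager()
manager.register_database("toy", InMemoryStrategy(), is_default=True)
manager.databases["toy"].add_vectors([[1.0, 0.0], [0.0, 1.0]], [{}, {}], ["a", "b"])
print(manager.intelligent_search([0.9, 0.1], top_k=1))
```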
Part 2: HNSW vs IVF Index Optimization (Observer: 8 min | Participant: 15 min)¶
Understanding Index Algorithm Trade-offs¶
The choice between HNSW and IVF indexing algorithms represents one of the most critical decisions in vector database architecture. Each embodies a different philosophy for organizing high-dimensional search spaces.
Index Algorithm Comparison (Observer Focus)¶
HNSW (Hierarchical Navigable Small World):
- Philosophy: Navigate through similarity space like a GPS system
- Performance: typically ~3x faster queries than IVF at comparable or better recall
- Memory: Higher usage but consistent performance
- Best for: Real-time applications requiring <100ms latency
IVF (Inverted File):
- Philosophy: Divide and conquer through intelligent clustering
- Performance: Good balance of speed and memory efficiency
- Memory: Lower usage, better for resource-constrained environments
- Best for: Large datasets where memory is a constraint
Here's a simple comparison of their characteristics:
```python
# Index performance comparison
index_comparison = {
    "HNSW": {
        "query_latency": "0.1-1ms",
        "memory_usage": "High",
        "build_time": "Medium",
        "recall_at_10": "95-99%",
        "best_for": "Real-time applications"
    },
    "IVF": {
        "query_latency": "1-10ms",
        "memory_usage": "Medium",
        "build_time": "Fast",
        "recall_at_10": "85-95%",
        "best_for": "Large-scale, memory-constrained"
    }
}


def recommend_index(dataset_size, memory_limit, latency_requirement):
    """Simple index recommendation logic.

    dataset_size: number of vectors
    memory_limit: available memory in GB
    latency_requirement: target query latency in ms
    """
    if latency_requirement < 100 and memory_limit > 8:
        return "HNSW"
    elif dataset_size > 10_000_000 or memory_limit < 4:
        return "IVF"
    else:
        return "HNSW"  # Default for balanced requirements
```
PARTICIPANT PATH: HNSW Index Implementation¶
Let's implement an optimized HNSW index using FAISS:
```python
import faiss
import numpy as np
from typing import List, Dict, Any


class OptimizedHNSWIndex:
    """Production HNSW implementation with intelligent parameter selection."""

    def __init__(self, dimension: int, performance_target: str = "balanced"):
        self.dimension = dimension
        self.performance_target = performance_target
        self.index = None
        self.id_mapping = {}

        # Parameter selection based on target
        if performance_target == "speed":
            self.M = 16                 # Lower connectivity for speed
            self.ef_construction = 128  # Faster construction
            self.ef_search = 64         # Faster queries
        elif performance_target == "accuracy":
            self.M = 64                 # High connectivity for recall
            self.ef_construction = 512  # Thorough construction
            self.ef_search = 256        # High-accuracy searches
        else:  # balanced
            self.M = 32                 # Balanced connectivity
            self.ef_construction = 200  # Good graph quality
            self.ef_search = 128        # Balanced search

    def build_index(self, vectors: np.ndarray, external_ids: List[str]):
        """Build optimized HNSW index."""
        print(f"Building HNSW index with M={self.M}, "
              f"ef_construction={self.ef_construction}")

        # FAISS expects contiguous float32 input
        vectors = np.ascontiguousarray(vectors, dtype=np.float32)

        # Create HNSW index
        self.index = faiss.IndexHNSWFlat(self.dimension, self.M)
        self.index.hnsw.efConstruction = self.ef_construction

        # Build the graph
        print("Building HNSW graph structure...")
        self.index.add(vectors)

        # Set search parameter
        self.index.hnsw.efSearch = self.ef_search

        # Store ID mapping
        for i, external_id in enumerate(external_ids):
            self.id_mapping[i] = external_id

        # Rough memory estimate: raw vectors plus graph links
        memory_per_vector = self.dimension * 4 + self.M * 4
        total_memory_mb = (len(vectors) * memory_per_vector) / (1024 ** 2)
        print(f"HNSW index ready: {len(vectors):,} vectors, "
              f"~{total_memory_mb:.1f}MB memory")

    def search(self, query_vector: np.ndarray, top_k: int = 10):
        """Search with current ef_search parameter."""
        if self.index is None:
            raise ValueError("Index not built yet")

        # Ensure query is a 2D float32 array
        query_vector = np.ascontiguousarray(query_vector, dtype=np.float32)
        if query_vector.ndim == 1:
            query_vector = query_vector.reshape(1, -1)

        # Perform search
        distances, indices = self.index.search(query_vector, top_k)

        # Format results
        results = []
        for distance, idx in zip(distances[0], indices[0]):
            if idx != -1:  # Valid result
                results.append({
                    'id': self.id_mapping.get(int(idx), str(idx)),
                    'distance': float(distance),
                    'similarity': 1 / (1 + distance)  # Convert to similarity
                })
        return results

    def tune_search_quality(self, ef_search: int):
        """Dynamically adjust search quality vs speed."""
        if self.index:
            self.index.hnsw.efSearch = ef_search
            print(f"Updated ef_search to {ef_search}")
```
HNSW Parameter Impact:

- M (connectivity): higher values improve recall but increase memory usage
- ef_construction: controls build quality; higher values create better graphs
- ef_search: runtime parameter for the speed/accuracy trade-off
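Because ef_search is adjustable at query time, you can sweep it to find your own speed/accuracy operating point. A minimal sketch using the class above with synthetic vectors (sizes are illustrative):

```python
import time
import numpy as np

dim, n = 128, 10_000
rng = np.random.default_rng(0)
vectors = rng.standard_normal((n, dim)).astype(np.float32)

index = OptimizedHNSWIndex(dimension=dim)  # balanced defaults
index.build_index(vectors, [str(i) for i in range(n)])

query = rng.standard_normal(dim).astype(np.float32)
for ef in (32, 64, 128, 256):
    index.tune_search_quality(ef)
    start = time.time()
    hits = index.search(query, top_k=10)
    print(f"ef_search={ef}: {len(hits)} hits "
          f"in {(time.time() - start) * 1e3:.2f} ms")
```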
IMPLEMENTER PATH: Intelligent Index Selection¶
For production systems, implement automatic index selection based on data characteristics:
```python
from typing import Dict


class IntelligentIndexSelector:
    """Automatically select optimal indexing strategy."""

    def __init__(self):
        self.performance_profiles = {
            "small_dataset": {"max_vectors": 50_000, "index": "Flat"},
            "medium_fast": {"max_vectors": 1_000_000, "index": "HNSW", "target": "speed"},
            "medium_accurate": {"max_vectors": 1_000_000, "index": "HNSW", "target": "accuracy"},
            "large_memory": {"max_vectors": float('inf'), "index": "IVF_PQ"},
            "large_speed": {"max_vectors": float('inf'), "index": "HNSW", "M": 16}
        }

    def select_optimal_index(self, dataset_info: Dict) -> Dict:
        """Select best index configuration for dataset."""
        n_vectors = dataset_info.get('vector_count', 0)
        memory_limit_gb = dataset_info.get('memory_limit_gb', 8)
        latency_requirement_ms = dataset_info.get('max_latency_ms', 100)
        accuracy_requirement = dataset_info.get('min_recall', 0.9)

        # Small dataset: use exact search
        if n_vectors < 50_000:
            return {"algorithm": "Flat",
                    "rationale": "Small dataset, exact search optimal"}

        # Memory-constrained or very large: raw float32 footprint in GB
        memory_usage_gb = (n_vectors * dataset_info.get('dimension', 1536) * 4
                           / (1024 ** 3))
        if memory_usage_gb > memory_limit_gb or n_vectors > 10_000_000:
            return {
                "algorithm": "IVF_PQ",
                "centroids": int(n_vectors * 0.08),
                "pq_segments": 16,
                "rationale": "Memory constraints or large scale require compression"
            }

        # High accuracy requirement
        if accuracy_requirement > 0.95:
            return {
                "algorithm": "HNSW",
                "M": 64,
                "ef_construction": 512,
                "ef_search": 256,
                "rationale": "High accuracy requirement favors HNSW with high parameters"
            }

        # Speed priority
        if latency_requirement_ms < 50:
            return {
                "algorithm": "HNSW",
                "M": 16,
                "ef_construction": 128,
                "ef_search": 64,
                "rationale": "Ultra-low latency requirement"
            }

        # Balanced default
        return {
            "algorithm": "HNSW",
            "M": 32,
            "ef_construction": 200,
            "ef_search": 128,
            "rationale": "Balanced performance for typical RAG workload"
        }
```
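A quick call with a hypothetical 2M-vector workload: the memory check passes (~11.4 GB of raw float32 against a 16 GB limit), and with moderate recall and latency targets the balanced HNSW profile is returned:

```python
selector = IntelligentIndexSelector()
config = selector.select_optimal_index({
    'vector_count': 2_000_000,
    'dimension': 1536,
    'memory_limit_gb': 16,
    'max_latency_ms': 80,
    'min_recall': 0.92
})
print(config)  # {'algorithm': 'HNSW', 'M': 32, ...}
```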
Part 3: Hybrid Search Implementation (Observer: 10 min | Participant: 20 min)¶
Combining Semantic and Lexical Search¶
Hybrid search addresses a fundamental limitation of pure semantic search: the semantic gap between how users phrase questions and how documents express answers. By combining vector similarity with keyword matching, hybrid systems typically achieve 15-25% better precision.
The Hybrid Search Philosophy (Observer Focus)¶
Consider this example:

- User Query: "What's the company's policy on remote work?"
- Document Text: "Employees may work from home up to 3 days per week..."
Pure keyword search misses this match because "remote work" and "work from home" share no common terms, while pure semantic search can underrank passages where exact wording matters. Hybrid search catches both patterns.
Here's a simple hybrid search approach:
# Simple hybrid search concept
def simple_hybrid_search(query, vector_store, documents, top_k=10):
"""Combine semantic and keyword search results."""
# Semantic search
semantic_results = vector_store.similarity_search(query, k=top_k*2)
# Keyword search (simplified)
keyword_results = []
query_words = query.lower().split()
for i, doc in enumerate(documents):
score = sum(1 for word in query_words if word in doc.lower())
if score > 0:
keyword_results.append({
'document': doc,
'keyword_score': score / len(query_words),
'index': i
})
# Simple combination: average the scores
combined_results = []
for semantic_result in semantic_results:
# Find corresponding keyword score
keyword_score = 0
for kw_result in keyword_results:
if kw_result['document'] == semantic_result.page_content:
keyword_score = kw_result['keyword_score']
break
combined_score = (semantic_result.similarity + keyword_score) / 2
combined_results.append({
'document': semantic_result,
'combined_score': combined_score
})
# Sort by combined score
combined_results.sort(key=lambda x: x['combined_score'], reverse=True)
return combined_results[:top_k]
PARTICIPANT PATH: Production Hybrid Search Engine¶
Let's implement a sophisticated hybrid search system using BM25 and Reciprocal Rank Fusion:
```python
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
from typing import List, Dict


class ProductionHybridSearch:
    """Production hybrid search with BM25 and RRF fusion."""

    def __init__(self, vector_store, documents: List[str]):
        self.vector_store = vector_store
        self.documents = documents

        # Raw term counts for BM25 (BM25 needs true term frequencies,
        # not TF-IDF weights, so CountVectorizer is used here)
        self.count_vectorizer = CountVectorizer(
            max_features=10000,
            stop_words='english',
            ngram_range=(1, 2),  # Include bigrams
            lowercase=True
        )

        # Fit on document corpus
        self.count_matrix = self.count_vectorizer.fit_transform(documents)
        print(f"Built lexical index for {len(documents)} documents")

    def hybrid_search(self, query: str, top_k: int = 10) -> List[Dict]:
        """Execute hybrid search with RRF fusion."""
        # Step 1: Semantic search (over-fetch for fusion headroom)
        semantic_results = self.vector_store.similarity_search(
            query, k=min(top_k * 3, 50)
        )

        # Step 2: BM25 lexical search
        bm25_scores = self._compute_bm25_scores(query)

        # Step 3: Reciprocal Rank Fusion
        fused_results = self._reciprocal_rank_fusion(
            semantic_results, bm25_scores, k=60
        )
        return fused_results[:top_k]

    def _compute_bm25_scores(self, query: str, k1: float = 1.2,
                             b: float = 0.75) -> np.ndarray:
        """Compute BM25 scores for all documents."""
        # Tokenize query with the same analyzer used for indexing
        query_tokens = self.count_vectorizer.build_analyzer()(query.lower())

        # Document statistics
        doc_lengths = np.array([len(doc.split()) for doc in self.documents])
        avg_doc_length = np.mean(doc_lengths)
        scores = np.zeros(len(self.documents))

        # Process each query term
        for token in query_tokens:
            if token in self.count_vectorizer.vocabulary_:
                term_idx = self.count_vectorizer.vocabulary_[token]

                # Raw term frequency per document
                tf = self.count_matrix[:, term_idx].toarray().flatten()

                # Calculate BM25 components
                df = np.sum(tf > 0)  # Document frequency
                if df > 0:
                    # IDF calculation
                    idf = np.log((len(self.documents) - df + 0.5) / (df + 0.5))

                    # BM25 formula
                    numerator = tf * (k1 + 1)
                    denominator = tf + k1 * (1 - b + b * doc_lengths / avg_doc_length)
                    scores += idf * (numerator / denominator)
        return scores

    def _reciprocal_rank_fusion(self, semantic_results: List,
                                bm25_scores: np.ndarray,
                                k: int = 60) -> List[Dict]:
        """Fuse semantic and lexical results using RRF."""
        doc_scores = {}

        # Add semantic scores (convert rank to RRF contribution).
        # Note: semantic metadata IDs must correspond to document indices
        # in `self.documents` for the two lists to merge correctly.
        for rank, result in enumerate(semantic_results):
            doc_id = result.metadata.get('id', rank)
            doc_scores[doc_id] = {
                'document': result,
                'semantic_rrf': 1 / (k + rank + 1),
                'lexical_rrf': 0
            }

        # Add BM25 scores (convert rank to RRF contribution)
        bm25_rankings = np.argsort(-bm25_scores)  # Descending order
        for rank, doc_idx in enumerate(bm25_rankings[:len(semantic_results)]):
            doc_id = int(doc_idx)
            if doc_id in doc_scores:
                doc_scores[doc_id]['lexical_rrf'] = 1 / (k + rank + 1)
            else:
                # Create entry for lexical-only results
                doc_scores[doc_id] = {
                    'document': self.documents[doc_idx],
                    'semantic_rrf': 0,
                    'lexical_rrf': 1 / (k + rank + 1)
                }

        # Calculate final RRF scores
        for doc_id in doc_scores:
            doc_scores[doc_id]['final_score'] = (
                doc_scores[doc_id]['semantic_rrf']
                + doc_scores[doc_id]['lexical_rrf']
            )

        # Sort by final score
        return sorted(
            doc_scores.values(),
            key=lambda x: x['final_score'],
            reverse=True
        )
```
Why RRF Outperforms Score Fusion:

- No normalization needed: RRF works with rankings, not raw scores
- Robust to outliers: extreme scores don't dominate the fusion
- Empirically well-studied: a simple rank-aggregation method that consistently beats naive score averaging
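The mechanics fit in a few lines. A minimal worked example with two hand-made rankings (document IDs are made up):

```python
def rrf(rankings, k=60):
    """Reciprocal Rank Fusion: sum 1/(k + rank) across input rankings."""
    scores = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)

semantic_ranking = ["doc_a", "doc_b", "doc_c"]
lexical_ranking = ["doc_b", "doc_c", "doc_a"]
print(rrf([semantic_ranking, lexical_ranking]))
# doc_b wins: ranked 2nd semantically and 1st lexically
```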
IMPLEMENTER PATH: Advanced Query Enhancement¶
For enterprise applications, implement query enhancement that improves hybrid search effectiveness:
```python
from typing import Dict, List


class QueryEnhancementEngine:
    """Advanced query enhancement for improved hybrid search."""

    def __init__(self, llm_model):
        self.llm_model = llm_model
        self.enhancement_strategies = [
            'synonym_expansion',
            'question_decomposition',
            'hypothetical_document'
        ]

    async def enhance_query(self, query: str,
                            strategy: str = "comprehensive") -> Dict:
        """Generate enhanced queries for comprehensive search."""
        enhanced_queries = {
            'original': query,
            'variants': []
        }

        if strategy in ("comprehensive", "synonym_expansion"):
            expanded = await self._expand_with_synonyms(query)
            enhanced_queries['variants'].append({
                'type': 'synonym_expanded',
                'query': expanded,
                'weight': 0.8
            })

        if strategy in ("comprehensive", "question_decomposition"):
            sub_queries = await self._decompose_question(query)
            for i, sub_q in enumerate(sub_queries):
                enhanced_queries['variants'].append({
                    'type': 'sub_query',
                    'query': sub_q,
                    'weight': 0.6,
                    'index': i
                })

        if strategy in ("comprehensive", "hypothetical_document"):
            hyde_doc = await self._generate_hypothetical_document(query)
            enhanced_queries['variants'].append({
                'type': 'hypothetical_document',
                'query': hyde_doc,
                'weight': 0.9
            })

        return enhanced_queries

    async def _expand_with_synonyms(self, query: str) -> str:
        """Expand query with relevant synonyms."""
        expansion_prompt = f"""
        Expand this search query by adding relevant synonyms and related terms.
        Keep the expansion focused and avoid redundancy.

        Original query: {query}

        Expanded query with synonyms:
        """
        response = await self.llm_model.apredict(expansion_prompt)
        return response.strip()

    async def _decompose_question(self, query: str) -> List[str]:
        """Break a complex question into simpler sub-queries.

        Minimal implementation: one sub-question per response line.
        """
        decomposition_prompt = f"""
        Break this question into 2-3 simpler sub-questions, one per line:

        Question: {query}
        """
        response = await self.llm_model.apredict(decomposition_prompt)
        return [line.strip("- ").strip()
                for line in response.splitlines() if line.strip()]

    async def _generate_hypothetical_document(self, query: str) -> str:
        """Generate hypothetical document that would answer the query."""
        hyde_prompt = f"""
        Write a brief, informative paragraph that would likely appear in a document
        that answers this question. Use the style and terminology typical of
        authoritative sources.

        Question: {query}

        Hypothetical document excerpt:
        """
        response = await self.llm_model.apredict(hyde_prompt)
        return response.strip()
```
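A minimal driver showing the expected call pattern. `EchoLLM` is a stand-in stub (not part of the course code); any client exposing an async `apredict(prompt)` method would work:

```python
import asyncio

class EchoLLM:
    """Stub model that echoes prompts; replace with a real LLM client."""
    async def apredict(self, prompt: str) -> str:
        return f"[model output for: {prompt.strip()[:40]}...]"

async def demo():
    engine = QueryEnhancementEngine(EchoLLM())
    enhanced = await engine.enhance_query("What is the remote work policy?")
    for variant in enhanced['variants']:
        print(variant['type'], '->', variant['query'][:60])

asyncio.run(demo())
```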
Part 4: Performance Optimization & Evaluation (Observer: 7 min | Participant: 15 min)¶
Search Performance Optimization Strategies¶
Production vector search requires multiple optimization layers: caching frequent queries, batch processing for efficiency, and intelligent prefetching based on usage patterns.
Basic Performance Optimization (Observer Focus)¶
Here are the key optimization strategies that provide the most impact:
```python
# Essential performance optimizations
import hashlib
import time
from typing import Dict


class OptimizedSearchEngine:
    """Search engine with essential performance optimizations."""

    def __init__(self, vector_store, cache_size: int = 1000):
        self.vector_store = vector_store
        self.query_cache = {}
        self.cache_size = cache_size
        self.performance_stats = {
            'cache_hits': 0,
            'cache_misses': 0,
            'total_searches': 0,
            'avg_search_time': 0
        }

    def optimized_search(self, query: str, top_k: int = 10,
                         use_cache: bool = True) -> Dict:
        """Search with caching and performance tracking."""
        # Create cache key
        cache_key = hashlib.md5(f"{query}_{top_k}".encode()).hexdigest()

        # Check cache first
        if use_cache and cache_key in self.query_cache:
            self.performance_stats['cache_hits'] += 1
            # Return a copy marked as served from cache
            return {**self.query_cache[cache_key], 'cached': True}

        # Perform search
        start_time = time.time()
        results = self.vector_store.similarity_search(query, k=top_k)
        search_time = time.time() - start_time

        # Format response
        response = {
            'results': results,
            'search_time': search_time,
            'cached': False
        }

        # Cache result (simple bounded cache; eviction policies are
        # the next refinement)
        if use_cache and len(self.query_cache) < self.cache_size:
            self.query_cache[cache_key] = response

        # Update stats
        self.performance_stats['cache_misses'] += 1
        self.performance_stats['total_searches'] += 1
        self._update_avg_search_time(search_time)
        return response

    def _update_avg_search_time(self, search_time: float):
        """Maintain a running average of uncached search latency."""
        n = max(self.performance_stats['cache_misses'], 1)
        prev = self.performance_stats['avg_search_time']
        self.performance_stats['avg_search_time'] = prev + (search_time - prev) / n

    def get_cache_hit_rate(self) -> float:
        """Calculate current cache hit rate."""
        total = (self.performance_stats['cache_hits']
                 + self.performance_stats['cache_misses'])
        if total == 0:
            return 0.0
        return self.performance_stats['cache_hits'] / total
```
Performance Impact of Optimizations:

- Query caching: a 70-80% hit rate for common queries saves significant compute
- Batch processing: 3-5x improvement for bulk operations
- Index optimization: HNSW tuning can improve speed by 2-3x
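Usage follows directly from the class above; the second identical query is served from cache. `TinyStore` is a throwaway stub standing in for any store with a `similarity_search(query, k=...)` method:

```python
class TinyStore:
    """Stand-in store returning canned results for demonstration."""
    def similarity_search(self, query, k=10):
        return [f"result for: {query}"][:k]

engine = OptimizedSearchEngine(vector_store=TinyStore())
engine.optimized_search("HNSW parameter tuning")  # miss: hits the store
engine.optimized_search("HNSW parameter tuning")  # hit: served from cache
print(f"Cache hit rate: {engine.get_cache_hit_rate():.0%}")  # 50%
```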
PARTICIPANT PATH: Comprehensive Performance Monitoring¶
Implement detailed performance monitoring for production systems:
```python
import asyncio
import statistics
import time
from dataclasses import dataclass
from typing import List, Dict, Any


@dataclass
class SearchMetrics:
    """Container for search performance metrics."""
    query_latency_p50: float
    query_latency_p95: float
    query_latency_p99: float
    cache_hit_rate: float
    error_rate: float
    throughput_qps: float


class ProductionSearchMonitor:
    """Comprehensive search performance monitoring."""

    def __init__(self, search_engine):
        self.search_engine = search_engine
        self.metrics_history = []
        self.current_window = []
        self.window_size = 1000  # Number of queries to track

    async def monitored_search(self, query: str, **kwargs) -> Dict:
        """Execute search with comprehensive monitoring."""
        start_time = time.time()
        error_occurred = False
        try:
            # Run the blocking search in a worker thread
            result = await asyncio.to_thread(
                self.search_engine.optimized_search,
                query, **kwargs
            )
        except Exception as e:
            error_occurred = True
            result = {'error': str(e), 'results': []}

        # Record metrics
        end_time = time.time()
        search_metrics = {
            'query': query,
            'latency': end_time - start_time,
            'timestamp': end_time,
            'error': error_occurred,
            'cached': result.get('cached', False),
            'result_count': len(result.get('results', []))
        }
        self._record_metrics(search_metrics)
        return result

    def _record_metrics(self, metrics: Dict):
        """Record metrics in sliding window."""
        self.current_window.append(metrics)
        # Maintain window size
        if len(self.current_window) > self.window_size:
            self.current_window.pop(0)

    def get_current_metrics(self) -> SearchMetrics:
        """Calculate current performance metrics."""
        if not self.current_window:
            return SearchMetrics(0, 0, 0, 0, 0, 0)

        # Extract latencies of successful searches
        latencies = sorted(m['latency'] for m in self.current_window
                           if not m['error'])
        if not latencies:
            return SearchMetrics(0, 0, 0, 0, 1.0, 0)

        # Calculate percentiles (index clamped for small samples)
        p50 = statistics.median(latencies)
        p95 = latencies[min(int(len(latencies) * 0.95), len(latencies) - 1)]
        p99 = latencies[min(int(len(latencies) * 0.99), len(latencies) - 1)]

        # Calculate other metrics
        cache_hits = sum(1 for m in self.current_window if m['cached'])
        cache_hit_rate = cache_hits / len(self.current_window)
        errors = sum(1 for m in self.current_window if m['error'])
        error_rate = errors / len(self.current_window)

        # Calculate throughput (queries per second)
        time_span = (self.current_window[-1]['timestamp']
                     - self.current_window[0]['timestamp'])
        throughput = len(self.current_window) / time_span if time_span > 0 else 0

        return SearchMetrics(
            query_latency_p50=p50,
            query_latency_p95=p95,
            query_latency_p99=p99,
            cache_hit_rate=cache_hit_rate,
            error_rate=error_rate,
            throughput_qps=throughput
        )

    async def performance_benchmark(self, test_queries: List[str],
                                    concurrent_requests: int = 10) -> Dict:
        """Run comprehensive performance benchmark."""
        print(f"Running benchmark with {len(test_queries)} queries, "
              f"{concurrent_requests} concurrent requests")

        # Create semaphore for concurrency control
        semaphore = asyncio.Semaphore(concurrent_requests)

        async def bounded_search(query):
            async with semaphore:
                return await self.monitored_search(query)

        # Execute all queries concurrently
        start_time = time.time()
        tasks = [bounded_search(query) for query in test_queries]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        total_time = time.time() - start_time

        # Analyze results
        successful_searches = [r for r in results if not isinstance(r, Exception)]
        failed_searches = [r for r in results if isinstance(r, Exception)]
        metrics = self.get_current_metrics()

        return {
            'total_queries': len(test_queries),
            'successful_queries': len(successful_searches),
            'failed_queries': len(failed_searches),
            'total_time_seconds': total_time,
            'average_qps': len(test_queries) / total_time,
            'performance_metrics': metrics,
            'concurrency_level': concurrent_requests
        }
```
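A hypothetical driver for the benchmark; `engine` is an OptimizedSearchEngine instance like the one built in the Observer section:

```python
async def run_benchmark(engine):
    monitor = ProductionSearchMonitor(engine)
    report = await monitor.performance_benchmark(
        test_queries=["hybrid search", "HNSW tuning", "BM25 scoring"] * 10,
        concurrent_requests=5
    )
    print(f"QPS: {report['average_qps']:.1f}, "
          f"p95 latency: {report['performance_metrics'].query_latency_p95 * 1e3:.1f} ms")

# asyncio.run(run_benchmark(engine))
```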
IMPLEMENTER PATH: Advanced Performance Tuning¶
For enterprise deployments, implement adaptive performance tuning:
```python
import time
from typing import Dict


class AdaptivePerformanceTuner:
    """Automatically tune search parameters based on performance metrics."""

    def __init__(self, search_engine, monitor):
        self.search_engine = search_engine
        self.monitor = monitor
        self.tuning_history = []
        self.current_config = {
            'cache_size': 1000,
            'ef_search': 128,   # For HNSW
            'timeout_ms': 1000
        }

    async def adaptive_tuning_cycle(self):
        """Run one cycle of adaptive performance tuning."""
        # Get current performance
        current_metrics = self.monitor.get_current_metrics()

        # Determine if tuning is needed
        if self._should_tune(current_metrics):
            # Snapshot config before any change so history stays accurate
            old_config = self.current_config.copy()

            # Try parameter adjustments
            new_config = self._generate_tuning_candidate(current_metrics)

            # Test new configuration
            test_metrics = await self._test_configuration(new_config)

            # Apply if improvement found
            improved = self._is_improvement(current_metrics, test_metrics)
            if improved:
                self._apply_configuration(new_config)
                print(f"Applied performance tuning: {new_config}")

            # Record tuning attempt
            self.tuning_history.append({
                'timestamp': time.time(),
                'old_config': old_config,
                'new_config': new_config,
                'old_metrics': current_metrics,
                'new_metrics': test_metrics,
                'applied': improved
            })

    def _should_tune(self, metrics: SearchMetrics) -> bool:
        """Determine if performance tuning is warranted."""
        # Latencies are recorded in seconds, so 0.2 = 200 ms
        return (metrics.query_latency_p95 > 0.2 or  # >200ms p95 latency
                metrics.cache_hit_rate < 0.6 or     # <60% cache hit rate
                metrics.error_rate > 0.05)          # >5% error rate

    def _generate_tuning_candidate(self, metrics: SearchMetrics) -> Dict:
        """Generate candidate configuration for testing."""
        new_config = self.current_config.copy()

        # Adjust based on observed issues
        if metrics.query_latency_p95 > 0.2:
            # High latency - try faster search parameters
            new_config['ef_search'] = max(32, new_config['ef_search'] - 32)
        if metrics.cache_hit_rate < 0.6:
            # Low cache hit rate - increase cache size
            new_config['cache_size'] = min(5000, int(new_config['cache_size'] * 1.5))
        if metrics.error_rate > 0.05:
            # High error rate - increase timeout
            new_config['timeout_ms'] = min(5000, int(new_config['timeout_ms'] * 1.2))
        return new_config

    async def _test_configuration(self, config: Dict) -> SearchMetrics:
        """Evaluate a candidate configuration.

        Minimal placeholder: a real system would apply the config to a
        shadow engine and replay recent queries before committing.
        """
        return self.monitor.get_current_metrics()

    def _is_improvement(self, old: SearchMetrics, new: SearchMetrics) -> bool:
        """Treat lower p95 latency and error rate as an improvement."""
        return (new.query_latency_p95 <= old.query_latency_p95 and
                new.error_rate <= old.error_rate)

    def _apply_configuration(self, config: Dict):
        """Adopt the candidate configuration."""
        self.current_config = config
```
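In production you would run the cycle on a schedule; a minimal sketch (the one-minute interval is an assumption, not a recommendation from this course):

```python
import asyncio

async def tuning_loop(tuner: AdaptivePerformanceTuner, interval_s: float = 60.0):
    """Periodically re-evaluate metrics and tune search parameters."""
    while True:
        await tuner.adaptive_tuning_cycle()
        await asyncio.sleep(interval_s)
```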
Optional Deep-Dive Modules¶
⚠️ OPTIONAL CONTENT - Choose based on your goals:
- Module A: Advanced Index Algorithms - Deep dive into FAISS, quantization, and custom indexing strategies
Multiple Choice Test - Session 3¶
Test your understanding of vector databases and search optimization:
Question 1: Which similarity metric is most suitable for RAG applications using text embeddings?
A) Euclidean distance
B) Manhattan distance
C) Cosine similarity
D) Hamming distance
Question 2: What is the primary advantage of HNSW indexing over IVF indexing?
A) Lower memory usage
B) Better compression ratios
C) Faster query performance with high recall
D) Simpler configuration
Question 3: In Reciprocal Rank Fusion (RRF), what does the 'k' parameter control?
A) Number of results to return
B) Weight balance between semantic and lexical scores
C) The smoothing factor in rank combination
D) Maximum number of query variants
Question 4: What is the key benefit of cross-encoder reranking compared to bi-encoder similarity?
A) Faster inference speed
B) Lower computational requirements
C) Joint processing of query-document pairs for better accuracy
D) Simpler model architecture
Question 5: When should you choose IVF indexing over HNSW for vector search?
A) When you need the fastest possible queries
B) When you have limited memory and large datasets
C) When accuracy is more important than speed
D) When you need real-time updates
Question 6: What is the purpose of the 'ef_construction' parameter in HNSW?
A) Controls memory usage during search
B) Determines the number of connections per node
C) Sets the dynamic candidate list size during index building
D) Defines the maximum number of layers
Question 7: In hybrid search, what does BM25 provide that semantic search lacks?
A) Better understanding of context
B) Exact term matching and frequency analysis
C) Handling of synonyms and related concepts
D) Multi-language support
Question 8: Why is query caching particularly effective in RAG systems?
A) Vector embeddings are expensive to compute
B) Users often ask similar or repeated questions
C) Database queries are the main bottleneck
D) All of the above
Navigation¶
Previous: Session 2 - Advanced Chunking & Preprocessing
Optional Deep Dive Modules:
- Module A: Advanced Index Algorithms - Deep dive into FAISS optimization and enterprise indexing strategies
Next: Session 4 - Query Enhancement & Context Augmentation →