⚙️ Session 4: Complete Enhancement Pipeline¶
⚙️ IMPLEMENTER PATH CONTENT Prerequisites: Complete all 🎯 Observer, 📝 Participant, and other ⚙️ Implementer files Time Investment: 2-3 hours Outcome: Production-ready complete query enhancement system
Learning Outcomes¶
After completing this comprehensive guide, you will:
- Build complete production-ready query enhancement pipelines
- Implement sophisticated configuration and strategy management
- Create comprehensive error handling and fallback systems
- Develop performance optimization and monitoring capabilities
Complete Production Enhancement Pipeline¶
The Ultimate RAG Enhancement Architecture¶
This system integrates all enhancement techniques into a cohesive, production-ready pipeline that can handle any query type and content scenario with optimal performance and reliability.
Step 1: Production Pipeline Architecture¶
Build the comprehensive enhancement system foundation:
import asyncio
import logging
from typing import Dict, List, Optional, Any, Union
from dataclasses import dataclass
from enum import Enum
import time
from concurrent.futures import ThreadPoolExecutor
class EnhancementStrategy(Enum):
"""Available enhancement strategies."""
MINIMAL = "minimal"
BALANCED = "balanced"
COMPREHENSIVE = "comprehensive"
CUSTOM = "custom"
@dataclass
class EnhancementConfig:
"""Configuration for enhancement pipeline."""
strategy: EnhancementStrategy = EnhancementStrategy.BALANCED
use_hyde: bool = True
use_expansion: bool = True
use_multi_query: bool = False
use_context_optimization: bool = True
use_advanced_prompting: bool = True
max_context_tokens: int = 4000
retrieval_k: int = 20
timeout_seconds: int = 30
enable_caching: bool = True
enable_monitoring: bool = True
class ProductionQueryEnhancementPipeline:
"""Complete production-ready query enhancement pipeline."""
def __init__(self,
llm_model,
embedding_model,
tokenizer,
vector_store,
config: EnhancementConfig = None):
self.llm_model = llm_model
self.embedding_model = embedding_model
self.tokenizer = tokenizer
self.vector_store = vector_store
self.config = config or EnhancementConfig()
# Initialize enhancement components
self._initialize_components()
# Setup monitoring and caching
self._setup_infrastructure()
Production Architecture: Comprehensive system design with configuration management, monitoring, caching, and error handling.
Step 2: Component Initialization with Error Handling¶
def _initialize_components(self):
"""Initialize all enhancement components with error handling."""
try:
# Core enhancement components
if self.config.use_hyde:
from Session4_HyDE_Implementation import HyDEQueryEnhancer
self.hyde_enhancer = HyDEQueryEnhancer(
self.llm_model,
self.embedding_model
)
if self.config.use_expansion:
from Session4_Query_Expansion_Practice import IntelligentQueryExpander
self.query_expander = IntelligentQueryExpander(self.llm_model)
if self.config.use_multi_query:
from Session4_Query_Expansion_Practice import MultiQueryGenerator
self.multi_query_gen = MultiQueryGenerator(self.llm_model)
if self.config.use_context_optimization:
from Session4_Context_Optimization import ContextWindowOptimizer
self.context_optimizer = ContextWindowOptimizer(
self.tokenizer,
self.config.max_context_tokens
)
if self.config.use_advanced_prompting:
from Session4_Advanced_Prompt_Engineering import RAGPromptEngineer, DynamicPromptAdapter
self.prompt_engineer = RAGPromptEngineer(self.llm_model)
self.dynamic_adapter = DynamicPromptAdapter(self.llm_model)
self.logger.info("All enhancement components initialized successfully")
except Exception as e:
self.logger.error(f"Component initialization error: {e}")
self._initialize_fallback_components()
def _initialize_fallback_components(self):
"""Initialize minimal fallback components."""
self.logger.warning("Initializing fallback components")
# Basic fallback implementations
self.hyde_enhancer = None
self.query_expander = None
self.multi_query_gen = None
self.context_optimizer = BasicContextOptimizer(self.tokenizer)
self.prompt_engineer = BasicPromptEngineer()
Robust Initialization: Component initialization with comprehensive error handling and fallback mechanisms ensures system reliability.
Step 3: Infrastructure Setup (Monitoring & Caching)¶
def _setup_infrastructure(self):
"""Setup monitoring, caching, and performance tracking."""
# Logging setup
self.logger = logging.getLogger(f"RAGEnhancement-{id(self)}")
# Performance monitoring
self.metrics = {
'queries_processed': 0,
'average_processing_time': 0.0,
'cache_hits': 0,
'cache_misses': 0,
'enhancement_success_rate': 0.0,
'error_count': 0
}
# Caching system
if self.config.enable_caching:
self.cache = {}
self.cache_ttl = 3600 # 1 hour
# Thread pool for async operations
self.thread_pool = ThreadPoolExecutor(max_workers=4)
self.logger.info("Infrastructure setup completed")
def _update_metrics(self, processing_time: float, success: bool):
"""Update performance metrics."""
self.metrics['queries_processed'] += 1
# Update average processing time
current_avg = self.metrics['average_processing_time']
count = self.metrics['queries_processed']
self.metrics['average_processing_time'] = (
(current_avg * (count - 1) + processing_time) / count
)
# Update success rate
if success:
current_success = self.metrics['enhancement_success_rate']
self.metrics['enhancement_success_rate'] = (
(current_success * (count - 1) + 1.0) / count
)
else:
self.metrics['error_count'] += 1
current_success = self.metrics['enhancement_success_rate']
self.metrics['enhancement_success_rate'] = (
(current_success * (count - 1) + 0.0) / count
)
Infrastructure Management: Comprehensive monitoring, caching, and performance tracking for production deployment.
Step 4: Main Pipeline Orchestration¶
async def enhance_query_complete(self,
query: str,
custom_config: Dict = None) -> Dict[str, Any]:
"""Complete query enhancement pipeline with full orchestration."""
start_time = time.time()
processing_success = False
try:
# Check cache first
if self.config.enable_caching:
cached_result = self._check_cache(query, custom_config)
if cached_result:
self.metrics['cache_hits'] += 1
self.logger.info(f"Cache hit for query: {query[:50]}...")
return cached_result
else:
self.metrics['cache_misses'] += 1
# Apply custom configuration if provided
working_config = self._merge_configs(custom_config)
# Initialize result container
enhancement_result = {
'original_query': query,
'timestamp': time.time(),
'config_used': working_config,
'processing_stages': {}
}
# Stage 1: Query Enhancement
query_enhancements = await self._execute_query_enhancements(
query, working_config
)
enhancement_result['processing_stages']['query_enhancement'] = query_enhancements
# Stage 2: Enhanced Retrieval
retrieval_results = await self._execute_enhanced_retrieval(
query, query_enhancements, working_config
)
enhancement_result['processing_stages']['enhanced_retrieval'] = retrieval_results
# Stage 3: Context Optimization
context_optimization = await self._execute_context_optimization(
query, retrieval_results, working_config
)
enhancement_result['processing_stages']['context_optimization'] = context_optimization
# Stage 4: Prompt Engineering
prompt_engineering = await self._execute_prompt_engineering(
query, context_optimization, working_config
)
enhancement_result['processing_stages']['prompt_engineering'] = prompt_engineering
# Stage 5: Final Assembly
final_result = self._assemble_final_result(
query, enhancement_result
)
processing_success = True
processing_time = time.time() - start_time
# Cache result
if self.config.enable_caching:
self._cache_result(query, custom_config, final_result)
# Update metrics
self._update_metrics(processing_time, processing_success)
self.logger.info(f"Successfully enhanced query in {processing_time:.2f}s")
return final_result
except Exception as e:
processing_time = time.time() - start_time
self.logger.error(f"Pipeline error: {e}")
# Update metrics for failure
self._update_metrics(processing_time, processing_success)
# Return fallback result
return self._generate_fallback_result(query, str(e))
Pipeline Orchestration: Complete pipeline with comprehensive error handling, caching, metrics, and fallback mechanisms.
Step 5: Query Enhancement Execution¶
async def _execute_query_enhancements(self,
query: str,
config: Dict) -> Dict[str, Any]:
"""Execute all configured query enhancement techniques."""
enhancements = {}
# Prepare async tasks
tasks = []
# HyDE Enhancement
if config.get('use_hyde', False) and self.hyde_enhancer:
tasks.append(
self._safe_async_execute(
'hyde',
self.hyde_enhancer.enhance_query_with_hyde,
query
)
)
# Query Expansion
if config.get('use_expansion', False) and self.query_expander:
tasks.append(
self._safe_async_execute(
'expansion',
self.query_expander.expand_query,
query,
strategies=['semantic', 'contextual']
)
)
# Multi-Query Generation
if config.get('use_multi_query', False) and self.multi_query_gen:
tasks.append(
self._safe_async_execute(
'multi_query',
self.multi_query_gen.generate_multi_query_set,
query,
total_queries=6
)
)
# Execute all enhancements concurrently
if tasks:
enhancement_results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
for result in enhancement_results:
if isinstance(result, dict) and 'type' in result and 'result' in result:
enhancements[result['type']] = result['result']
elif isinstance(result, Exception):
self.logger.error(f"Enhancement task failed: {result}")
return enhancements
async def _safe_async_execute(self, task_type: str, func, *args, **kwargs):
"""Safely execute async enhancement tasks with timeout."""
try:
# Execute with timeout
result = await asyncio.wait_for(
asyncio.get_event_loop().run_in_executor(
self.thread_pool,
lambda: func(*args, **kwargs)
),
timeout=self.config.timeout_seconds
)
return {
'type': task_type,
'result': result,
'success': True
}
except asyncio.TimeoutError:
self.logger.error(f"Timeout in {task_type} enhancement")
return {
'type': task_type,
'result': None,
'success': False,
'error': 'timeout'
}
except Exception as e:
self.logger.error(f"Error in {task_type} enhancement: {e}")
return {
'type': task_type,
'result': None,
'success': False,
'error': str(e)
}
Concurrent Enhancement: Parallel execution of enhancement techniques with timeout protection and error isolation.
Step 6: Enhanced Retrieval Strategy¶
async def _execute_enhanced_retrieval(self,
query: str,
enhancements: Dict,
config: Dict) -> Dict[str, Any]:
"""Execute enhanced retrieval using all available enhancements."""
retrieval_results = {
'combined_results': [],
'individual_retrievals': {},
'retrieval_metadata': {}
}
retrieval_tasks = []
# Original query retrieval (baseline)
retrieval_tasks.append(
self._safe_retrieval_execute(
'original',
lambda: self.vector_store.similarity_search(query, k=config.get('retrieval_k', 20))
)
)
# HyDE-enhanced retrieval
if 'hyde' in enhancements and enhancements['hyde']:
hyde_embedding = enhancements['hyde'].get('enhanced_embedding')
if hyde_embedding is not None:
retrieval_tasks.append(
self._safe_retrieval_execute(
'hyde',
lambda: self.vector_store.similarity_search_by_vector(
hyde_embedding, k=config.get('retrieval_k', 20)
)
)
)
# Expanded query retrieval
if 'expansion' in enhancements and enhancements['expansion']:
expanded_query = enhancements['expansion'].get('expanded_query')
if expanded_query:
retrieval_tasks.append(
self._safe_retrieval_execute(
'expansion',
lambda: self.vector_store.similarity_search(
expanded_query, k=config.get('retrieval_k', 20)
)
)
)
# Multi-query retrieval
if 'multi_query' in enhancements and enhancements['multi_query']:
query_variants = enhancements['multi_query'].get('query_variants', [])
for i, variant in enumerate(query_variants[:3]): # Limit to top 3 variants
retrieval_tasks.append(
self._safe_retrieval_execute(
f'variant_{i}',
lambda q=variant: self.vector_store.similarity_search(
q, k=config.get('retrieval_k', 20) // 3
)
)
)
# Execute all retrievals concurrently
retrieval_responses = await asyncio.gather(*retrieval_tasks, return_exceptions=True)
# Process and combine results
all_results = []
for response in retrieval_responses:
if isinstance(response, dict) and response.get('success', False):
retrieval_type = response['type']
results = response['result']
retrieval_results['individual_retrievals'][retrieval_type] = results
all_results.extend(results)
elif isinstance(response, Exception):
self.logger.error(f"Retrieval task failed: {response}")
# Deduplicate and rank combined results
retrieval_results['combined_results'] = self._deduplicate_and_rank_results(
all_results, query
)
retrieval_results['retrieval_metadata'] = {
'total_individual_results': len(all_results),
'deduplicated_count': len(retrieval_results['combined_results']),
'retrieval_strategies_used': len(retrieval_results['individual_retrievals']),
'success_rate': len([r for r in retrieval_responses if isinstance(r, dict) and r.get('success', False)]) / len(retrieval_responses)
}
return retrieval_results
Multi-Strategy Retrieval: Comprehensive retrieval using all available enhancement techniques with intelligent result combination.
Step 7: Intelligent Result Deduplication and Ranking¶
def _deduplicate_and_rank_results(self, all_results: List, query: str) -> List:
"""Intelligently deduplicate and rank combined retrieval results."""
if not all_results:
return []
# Create result tracking
unique_results = {}
for result in all_results:
# Generate content hash for deduplication
content = result.page_content if hasattr(result, 'page_content') else str(result)
content_hash = hash(content.strip().lower())
if content_hash not in unique_results:
unique_results[content_hash] = {
'result': result,
'occurrences': 1,
'sources': [self._extract_result_source(result)]
}
else:
unique_results[content_hash]['occurrences'] += 1
source = self._extract_result_source(result)
if source not in unique_results[content_hash]['sources']:
unique_results[content_hash]['sources'].append(source)
# Calculate ranking scores
ranked_results = []
for content_hash, data in unique_results.items():
result = data['result']
# Base relevance score (from similarity search)
base_score = getattr(result, 'similarity_score', 0.5)
# Boost for multiple occurrences (consensus across strategies)
occurrence_boost = min(data['occurrences'] * 0.1, 0.3)
# Boost for source diversity
source_diversity_boost = min(len(data['sources']) * 0.05, 0.2)
final_score = base_score + occurrence_boost + source_diversity_boost
ranked_results.append({
'result': result,
'final_score': final_score,
'occurrences': data['occurrences'],
'sources': data['sources']
})
# Sort by final score and return top results
ranked_results.sort(key=lambda x: x['final_score'], reverse=True)
return [item['result'] for item in ranked_results[:self.config.retrieval_k]]
async def _safe_retrieval_execute(self, task_type: str, func):
"""Safely execute retrieval tasks with error handling."""
try:
result = await asyncio.get_event_loop().run_in_executor(
self.thread_pool, func
)
return {
'type': task_type,
'result': result,
'success': True
}
except Exception as e:
self.logger.error(f"Retrieval error in {task_type}: {e}")
return {
'type': task_type,
'result': [],
'success': False,
'error': str(e)
}
Intelligent Ranking: Sophisticated result deduplication and ranking that considers consensus across strategies and source diversity.
Step 8: Advanced Context Optimization¶
async def _execute_context_optimization(self,
query: str,
retrieval_results: Dict,
config: Dict) -> Dict[str, Any]:
"""Execute context optimization with intelligent strategy selection."""
if not config.get('use_context_optimization', True) or not self.context_optimizer:
return {
'optimized_context': self._create_basic_context(retrieval_results['combined_results']),
'strategy_used': 'basic',
'optimization_applied': False
}
try:
# Analyze content characteristics for strategy selection
content_analysis = self._analyze_retrieval_content(
retrieval_results['combined_results']
)
# Select optimal optimization strategy
optimization_strategy = self._select_optimization_strategy(
content_analysis, config
)
# Execute optimization
optimization_result = await asyncio.get_event_loop().run_in_executor(
self.thread_pool,
lambda: self.context_optimizer.optimize_context_window(
query,
retrieval_results['combined_results'],
strategy=optimization_strategy
)
)
optimization_result['content_analysis'] = content_analysis
optimization_result['optimization_applied'] = True
return optimization_result
except Exception as e:
self.logger.error(f"Context optimization error: {e}")
return {
'optimized_context': self._create_basic_context(retrieval_results['combined_results']),
'strategy_used': 'fallback',
'optimization_applied': False,
'error': str(e)
}
def _analyze_retrieval_content(self, results: List) -> Dict[str, Any]:
"""Analyze retrieval results to guide optimization strategy."""
if not results:
return {'total_tokens': 0, 'source_diversity': 0, 'avg_chunk_size': 0}
total_tokens = 0
sources = set()
chunk_sizes = []
for result in results:
content = result.page_content if hasattr(result, 'page_content') else str(result)
tokens = len(self.tokenizer.encode(content))
total_tokens += tokens
chunk_sizes.append(tokens)
# Extract source information
metadata = getattr(result, 'metadata', {})
source = metadata.get('source', 'unknown')
sources.add(source)
return {
'total_tokens': total_tokens,
'source_diversity': len(sources) / len(results),
'avg_chunk_size': sum(chunk_sizes) / len(chunk_sizes),
'max_chunk_size': max(chunk_sizes),
'token_budget_ratio': total_tokens / self.config.max_context_tokens
}
def _select_optimization_strategy(self, content_analysis: Dict, config: Dict) -> str:
"""Intelligently select optimization strategy based on content characteristics."""
token_ratio = content_analysis.get('token_budget_ratio', 1.0)
source_diversity = content_analysis.get('source_diversity', 0.5)
avg_chunk_size = content_analysis.get('avg_chunk_size', 200)
# Strategy selection logic
if token_ratio <= 0.8:
return 'relevance_ranking' # Simple case
elif source_diversity > 0.6 and token_ratio > 1.5:
return 'hierarchical_summary' # High diversity, needs summarization
elif avg_chunk_size > 500:
return 'semantic_compression' # Large chunks, compress
else:
return 'diversity_clustering' # Balance relevance and diversity
Adaptive Optimization: Intelligent context optimization strategy selection based on content characteristics and system constraints.
Step 9: Dynamic Prompt Engineering¶
async def _execute_prompt_engineering(self,
query: str,
context_result: Dict,
config: Dict) -> Dict[str, Any]:
"""Execute advanced prompt engineering with dynamic adaptation."""
if not config.get('use_advanced_prompting', True):
return {
'final_prompt': self._create_basic_prompt(query, context_result['optimized_context']),
'prompt_strategy': 'basic',
'engineering_applied': False
}
try:
# Dynamic prompt adaptation
if self.dynamic_adapter:
adaptation_result = await asyncio.get_event_loop().run_in_executor(
self.thread_pool,
lambda: self.dynamic_adapter.adapt_prompt_dynamically(
query,
context_result['optimized_context'],
context_result
)
)
return {
'final_prompt': adaptation_result['adapted_prompt'],
'prompt_strategy': adaptation_result['strategy'],
'engineering_applied': True,
'adaptation_metadata': adaptation_result
}
# Fallback to standard prompt engineering
elif self.prompt_engineer:
prompt_result = await asyncio.get_event_loop().run_in_executor(
self.thread_pool,
lambda: self.prompt_engineer.engineer_rag_prompt(
query,
context_result['optimized_context'],
optimizations=['confidence_calibration']
)
)
return {
'final_prompt': prompt_result['optimized_prompt'],
'prompt_strategy': prompt_result['query_type'],
'engineering_applied': True,
'prompt_metadata': prompt_result
}
else:
return {
'final_prompt': self._create_basic_prompt(query, context_result['optimized_context']),
'prompt_strategy': 'basic',
'engineering_applied': False
}
except Exception as e:
self.logger.error(f"Prompt engineering error: {e}")
return {
'final_prompt': self._create_basic_prompt(query, context_result['optimized_context']),
'prompt_strategy': 'fallback',
'engineering_applied': False,
'error': str(e)
}
Step 10: Final Result Assembly and Validation¶
def _assemble_final_result(self, query: str, enhancement_result: Dict) -> Dict[str, Any]:
"""Assemble comprehensive final result with all metadata."""
# Extract key components
final_prompt = enhancement_result['processing_stages']['prompt_engineering']['final_prompt']
context = enhancement_result['processing_stages']['context_optimization']['optimized_context']
# Calculate quality scores
quality_metrics = self._calculate_quality_metrics(enhancement_result)
# Generate response using enhanced prompt
try:
if self.config.enable_monitoring:
response_start = time.time()
final_response = self.llm_model.predict(final_prompt)
if self.config.enable_monitoring:
response_time = time.time() - response_start
else:
response_time = 0.0
except Exception as e:
self.logger.error(f"LLM response generation error: {e}")
final_response = f"Error generating response: {e}"
response_time = 0.0
return {
'query': query,
'response': final_response,
'enhanced_context': context,
'final_prompt': final_prompt,
'quality_metrics': quality_metrics,
'processing_metadata': {
'total_processing_time': time.time() - enhancement_result['timestamp'],
'response_generation_time': response_time,
'enhancement_stages_completed': len(enhancement_result['processing_stages']),
'config_used': enhancement_result['config_used']
},
'enhancement_details': enhancement_result['processing_stages']
}
def _calculate_quality_metrics(self, enhancement_result: Dict) -> Dict[str, float]:
"""Calculate quality metrics for the enhancement process."""
metrics = {
'enhancement_coverage': 0.0,
'context_efficiency': 0.0,
'retrieval_diversity': 0.0,
'overall_quality_score': 0.0
}
# Enhancement coverage (how many techniques were successfully applied)
stages = enhancement_result['processing_stages']
total_possible_enhancements = 4 # hyde, expansion, context_opt, prompt_eng
successful_enhancements = sum(1 for stage in stages.values() if stage.get('success', True))
metrics['enhancement_coverage'] = successful_enhancements / total_possible_enhancements
# Context efficiency (from context optimization)
context_stage = stages.get('context_optimization', {})
metrics['context_efficiency'] = context_stage.get('efficiency_score', 0.5)
# Retrieval diversity (from retrieval results)
retrieval_stage = stages.get('enhanced_retrieval', {})
retrieval_metadata = retrieval_stage.get('retrieval_metadata', {})
metrics['retrieval_diversity'] = retrieval_metadata.get('success_rate', 0.5)
# Overall quality score (weighted average)
metrics['overall_quality_score'] = (
0.3 * metrics['enhancement_coverage'] +
0.4 * metrics['context_efficiency'] +
0.3 * metrics['retrieval_diversity']
)
return metrics
Quality Assessment: Comprehensive quality metrics and result assembly with detailed metadata for monitoring and optimization.
Production Deployment Integration¶
Step 11: Caching and Performance Optimization¶
def _check_cache(self, query: str, custom_config: Dict = None) -> Optional[Dict]:
"""Check cache for previous results."""
if not self.config.enable_caching:
return None
cache_key = self._generate_cache_key(query, custom_config)
if cache_key in self.cache:
cached_entry = self.cache[cache_key]
# Check TTL
if time.time() - cached_entry['timestamp'] < self.cache_ttl:
return cached_entry['result']
else:
# Expired, remove from cache
del self.cache[cache_key]
return None
def _cache_result(self, query: str, custom_config: Dict, result: Dict):
"""Cache result for future use."""
if not self.config.enable_caching:
return
cache_key = self._generate_cache_key(query, custom_config)
self.cache[cache_key] = {
'result': result,
'timestamp': time.time()
}
# Simple cache size management
if len(self.cache) > 1000: # Max 1000 entries
# Remove oldest entries
oldest_key = min(self.cache.keys(), key=lambda k: self.cache[k]['timestamp'])
del self.cache[oldest_key]
def _generate_cache_key(self, query: str, custom_config: Dict = None) -> str:
"""Generate cache key from query and config."""
config_hash = hash(str(sorted((custom_config or {}).items())))
query_hash = hash(query.strip().lower())
return f"{query_hash}_{config_hash}"
Step 12: Monitoring and Analytics¶
def get_performance_metrics(self) -> Dict[str, Any]:
"""Get comprehensive performance metrics."""
return {
'processing_metrics': self.metrics.copy(),
'cache_metrics': {
'cache_size': len(self.cache) if self.config.enable_caching else 0,
'hit_rate': self.metrics['cache_hits'] / max(1, self.metrics['cache_hits'] + self.metrics['cache_misses'])
},
'component_status': self._get_component_status(),
'system_health': self._assess_system_health()
}
def _get_component_status(self) -> Dict[str, str]:
"""Check status of all enhancement components."""
status = {}
components = [
('hyde_enhancer', 'HyDE'),
('query_expander', 'Query Expansion'),
('multi_query_gen', 'Multi-Query Generation'),
('context_optimizer', 'Context Optimization'),
('prompt_engineer', 'Prompt Engineering'),
('dynamic_adapter', 'Dynamic Adaptation')
]
for attr_name, display_name in components:
if hasattr(self, attr_name) and getattr(self, attr_name) is not None:
status[display_name] = 'active'
else:
status[display_name] = 'inactive'
return status
def _assess_system_health(self) -> str:
"""Assess overall system health."""
error_rate = self.metrics['error_count'] / max(1, self.metrics['queries_processed'])
success_rate = self.metrics['enhancement_success_rate']
avg_time = self.metrics['average_processing_time']
if error_rate < 0.05 and success_rate > 0.9 and avg_time < 5.0:
return 'excellent'
elif error_rate < 0.1 and success_rate > 0.8 and avg_time < 10.0:
return 'good'
elif error_rate < 0.2 and success_rate > 0.7 and avg_time < 20.0:
return 'fair'
else:
return 'poor'
Usage Examples and Testing¶
Step 13: Complete Usage Example¶
# Initialize the production pipeline
async def main():
# Setup components
config = EnhancementConfig(
strategy=EnhancementStrategy.COMPREHENSIVE,
use_hyde=True,
use_expansion=True,
use_multi_query=True,
use_context_optimization=True,
use_advanced_prompting=True,
max_context_tokens=4000,
enable_monitoring=True,
enable_caching=True
)
pipeline = ProductionQueryEnhancementPipeline(
llm_model=llm_model,
embedding_model=embedding_model,
tokenizer=tokenizer,
vector_store=vector_store,
config=config
)
# Test with various query types
test_queries = [
"How do I implement authentication in a React application?",
"What are the pros and cons of microservices vs monolithic architecture?",
"Explain the differences between SQL and NoSQL databases",
"How to optimize database query performance?"
]
results = []
for query in test_queries:
print(f"\nProcessing: {query}")
result = await pipeline.enhance_query_complete(query)
results.append(result)
print(f"Quality Score: {result['quality_metrics']['overall_quality_score']:.3f}")
print(f"Processing Time: {result['processing_metadata']['total_processing_time']:.2f}s")
print(f"Response: {result['response'][:200]}...")
# Get performance metrics
metrics = pipeline.get_performance_metrics()
print(f"\nSystem Performance:")
print(f"Queries Processed: {metrics['processing_metrics']['queries_processed']}")
print(f"Average Processing Time: {metrics['processing_metrics']['average_processing_time']:.2f}s")
print(f"Success Rate: {metrics['processing_metrics']['enhancement_success_rate']:.3f}")
print(f"Cache Hit Rate: {metrics['cache_metrics']['hit_rate']:.3f}")
print(f"System Health: {metrics['system_health']}")
# Run the example
if __name__ == "__main__":
asyncio.run(main())
Complete Integration: Production-ready example demonstrating comprehensive system usage with monitoring and performance tracking.
Production Deployment Considerations¶
Performance Optimization¶
- Concurrent Processing: All enhancement techniques run in parallel where possible
- Intelligent Caching: Query-based caching with TTL and size management
- Resource Management: Thread pool management and timeout controls
Error Handling and Reliability¶
- Graceful Degradation: System continues with reduced functionality if components fail
- Comprehensive Logging: Detailed logging for monitoring and debugging
- Fallback Mechanisms: Basic implementations when advanced features fail
Monitoring and Observability¶
- Performance Metrics: Processing times, success rates, and resource usage
- Quality Metrics: Enhancement effectiveness and result quality assessment
- Health Monitoring: Component status and system health indicators
Configuration Management¶
- Strategy Selection: Pre-defined strategies for different use cases
- Dynamic Configuration: Runtime configuration updates for optimization
- Environment Adaptation: Different settings for development, staging, and production
🧭 Navigation¶
Previous: Session 3 - Advanced Patterns →
Next: Session 5 - Type-Safe Development →