⚙️ Session 4: Complete Enhancement Pipeline

⚙️ IMPLEMENTER PATH CONTENT
Prerequisites: Complete all 🎯 Observer and 📝 Participant files, plus the other ⚙️ Implementer files
Time Investment: 2-3 hours
Outcome: A production-ready, complete query enhancement system

Learning Outcomes

After completing this comprehensive guide, you will:

  • Build complete production-ready query enhancement pipelines
  • Implement sophisticated configuration and strategy management
  • Create comprehensive error handling and fallback systems
  • Develop performance optimization and monitoring capabilities

Complete Production Enhancement Pipeline

The Ultimate RAG Enhancement Architecture

This system integrates all of the session's enhancement techniques into a cohesive, production-ready pipeline built to handle diverse query types and content scenarios with reliable, predictable performance.

Step 1: Production Pipeline Architecture

Build the comprehensive enhancement system foundation:

import asyncio
import logging
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from enum import Enum
import time
from concurrent.futures import ThreadPoolExecutor

class EnhancementStrategy(Enum):
    """Available enhancement strategies."""
    MINIMAL = "minimal"
    BALANCED = "balanced"
    COMPREHENSIVE = "comprehensive"
    CUSTOM = "custom"

@dataclass
class EnhancementConfig:
    """Configuration for enhancement pipeline."""
    strategy: EnhancementStrategy = EnhancementStrategy.BALANCED
    use_hyde: bool = True
    use_expansion: bool = True
    use_multi_query: bool = False
    use_context_optimization: bool = True
    use_advanced_prompting: bool = True
    max_context_tokens: int = 4000
    retrieval_k: int = 20
    timeout_seconds: int = 30
    enable_caching: bool = True
    enable_monitoring: bool = True

class ProductionQueryEnhancementPipeline:
    """Complete production-ready query enhancement pipeline."""

    def __init__(self,
                 llm_model,
                 embedding_model,
                 tokenizer,
                 vector_store,
                 config: Optional[EnhancementConfig] = None):

        self.llm_model = llm_model
        self.embedding_model = embedding_model
        self.tokenizer = tokenizer
        self.vector_store = vector_store
        self.config = config or EnhancementConfig()

        # Setup monitoring and caching first: component
        # initialization logs through self.logger
        self._setup_infrastructure()

        # Initialize enhancement components
        self._initialize_components()

Production Architecture: Comprehensive system design with configuration management, monitoring, caching, and error handling.
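
The `EnhancementStrategy` values only become meaningful once they map to concrete settings. Below is a minimal sketch of such a mapping, using a hypothetical `config_for_strategy` helper; the preset values are illustrative assumptions, not part of the pipeline above.

def config_for_strategy(strategy: EnhancementStrategy) -> EnhancementConfig:
    """Hypothetical mapping from strategy to a config preset (values are illustrative)."""
    if strategy == EnhancementStrategy.MINIMAL:
        return EnhancementConfig(
            strategy=strategy,
            use_hyde=False,
            use_expansion=False,
            use_multi_query=False,
            use_advanced_prompting=False,
            retrieval_k=10
        )
    if strategy == EnhancementStrategy.COMPREHENSIVE:
        return EnhancementConfig(
            strategy=strategy,
            use_multi_query=True,
            retrieval_k=30,
            timeout_seconds=60
        )
    # BALANCED and CUSTOM fall back to the dataclass defaults
    return EnhancementConfig(strategy=strategy)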

Step 2: Component Initialization with Error Handling

def _initialize_components(self):
    """Initialize all enhancement components with error handling."""

    try:
        # Core enhancement components
        if self.config.use_hyde:
            from Session4_HyDE_Implementation import HyDEQueryEnhancer
            self.hyde_enhancer = HyDEQueryEnhancer(
                self.llm_model,
                self.embedding_model
            )

        if self.config.use_expansion:
            from Session4_Query_Expansion_Practice import IntelligentQueryExpander
            self.query_expander = IntelligentQueryExpander(self.llm_model)

        if self.config.use_multi_query:
            from Session4_Query_Expansion_Practice import MultiQueryGenerator
            self.multi_query_gen = MultiQueryGenerator(self.llm_model)

        if self.config.use_context_optimization:
            from Session4_Context_Optimization import ContextWindowOptimizer
            self.context_optimizer = ContextWindowOptimizer(
                self.tokenizer,
                self.config.max_context_tokens
            )

        if self.config.use_advanced_prompting:
            from Session4_Advanced_Prompt_Engineering import RAGPromptEngineer, DynamicPromptAdapter
            self.prompt_engineer = RAGPromptEngineer(self.llm_model)
            self.dynamic_adapter = DynamicPromptAdapter(self.llm_model)

        self.logger.info("All enhancement components initialized successfully")

    except Exception as e:
        self.logger.error(f"Component initialization error: {e}")
        self._initialize_fallback_components()

def _initialize_fallback_components(self):
    """Initialize minimal fallback components."""

    self.logger.warning("Initializing fallback components")

    # Basic fallback implementations
    self.hyde_enhancer = None
    self.query_expander = None
    self.multi_query_gen = None
    self.context_optimizer = BasicContextOptimizer(self.tokenizer)
    self.prompt_engineer = BasicPromptEngineer()

Robust Initialization: Component initialization with comprehensive error handling and fallback mechanisms ensures system reliability.
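
The `BasicContextOptimizer` and `BasicPromptEngineer` fallbacks referenced above are not defined in this guide. A minimal sketch of what they might look like, assuming a tokenizer with an `encode` method and LangChain-style documents with a `page_content` attribute:

class BasicContextOptimizer:
    """Minimal fallback: truncate concatenated results to the token budget."""

    def __init__(self, tokenizer, max_tokens: int = 4000):
        self.tokenizer = tokenizer
        self.max_tokens = max_tokens

    def optimize_context_window(self, query, results, strategy=None):
        pieces, used = [], 0
        for result in results:
            content = getattr(result, 'page_content', str(result))
            tokens = len(self.tokenizer.encode(content))
            if used + tokens > self.max_tokens:
                break
            pieces.append(content)
            used += tokens
        return {
            'optimized_context': "\n\n".join(pieces),
            'strategy_used': 'truncation',
            'optimization_applied': True
        }

class BasicPromptEngineer:
    """Minimal fallback: plain context-plus-question template."""

    def engineer_rag_prompt(self, query, context, optimizations=None):
        prompt = (
            "Answer the question using only the context below.\n\n"
            f"Context:\n{context}\n\nQuestion: {query}\nAnswer:"
        )
        return {'optimized_prompt': prompt, 'query_type': 'basic'}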

Step 3: Infrastructure Setup (Monitoring & Caching)

def _setup_infrastructure(self):
    """Setup monitoring, caching, and performance tracking."""

    # Logging setup
    self.logger = logging.getLogger(f"RAGEnhancement-{id(self)}")

    # Performance monitoring
    self.metrics = {
        'queries_processed': 0,
        'average_processing_time': 0.0,
        'cache_hits': 0,
        'cache_misses': 0,
        'enhancement_success_rate': 0.0,
        'error_count': 0
    }

    # Caching system
    if self.config.enable_caching:
        self.cache = {}
        self.cache_ttl = 3600  # 1 hour

    # Thread pool for async operations
    self.thread_pool = ThreadPoolExecutor(max_workers=4)

    self.logger.info("Infrastructure setup completed")

def _update_metrics(self, processing_time: float, success: bool):
    """Update performance metrics."""

    self.metrics['queries_processed'] += 1

    # Update average processing time
    current_avg = self.metrics['average_processing_time']
    count = self.metrics['queries_processed']
    self.metrics['average_processing_time'] = (
        (current_avg * (count - 1) + processing_time) / count
    )

    # Update success rate
    if success:
        current_success = self.metrics['enhancement_success_rate']
        self.metrics['enhancement_success_rate'] = (
            (current_success * (count - 1) + 1.0) / count
        )
    else:
        self.metrics['error_count'] += 1
        current_success = self.metrics['enhancement_success_rate']
        self.metrics['enhancement_success_rate'] = (
            (current_success * (count - 1) + 0.0) / count
        )

Infrastructure Management: Comprehensive monitoring, caching, and performance tracking for production deployment.
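
The incremental average in `_update_metrics` avoids storing every per-query timing. A quick standalone check that the update rule reproduces the true mean (illustrative only):

# new_avg = (old_avg * (n - 1) + x) / n reproduces the mean of all samples
timings = [1.0, 3.0, 2.0]
avg, n = 0.0, 0
for x in timings:
    n += 1
    avg = (avg * (n - 1) + x) / n
assert abs(avg - sum(timings) / len(timings)) < 1e-9  # both equal 2.0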

Step 4: Main Pipeline Orchestration

async def enhance_query_complete(self,
                               query: str,
                               custom_config: Dict = None) -> Dict[str, Any]:
    """Complete query enhancement pipeline with full orchestration."""

    start_time = time.time()
    processing_success = False

    try:
        # Check cache first
        if self.config.enable_caching:
            cached_result = self._check_cache(query, custom_config)
            if cached_result:
                self.metrics['cache_hits'] += 1
                self.logger.info(f"Cache hit for query: {query[:50]}...")
                return cached_result
            else:
                self.metrics['cache_misses'] += 1

        # Apply custom configuration if provided
        working_config = self._merge_configs(custom_config)

        # Initialize result container
        enhancement_result = {
            'original_query': query,
            'timestamp': time.time(),
            'config_used': working_config,
            'processing_stages': {}
        }

        # Stage 1: Query Enhancement
        query_enhancements = await self._execute_query_enhancements(
            query, working_config
        )
        enhancement_result['processing_stages']['query_enhancement'] = query_enhancements

        # Stage 2: Enhanced Retrieval
        retrieval_results = await self._execute_enhanced_retrieval(
            query, query_enhancements, working_config
        )
        enhancement_result['processing_stages']['enhanced_retrieval'] = retrieval_results

        # Stage 3: Context Optimization
        context_optimization = await self._execute_context_optimization(
            query, retrieval_results, working_config
        )
        enhancement_result['processing_stages']['context_optimization'] = context_optimization

        # Stage 4: Prompt Engineering
        prompt_engineering = await self._execute_prompt_engineering(
            query, context_optimization, working_config
        )
        enhancement_result['processing_stages']['prompt_engineering'] = prompt_engineering

        # Stage 5: Final Assembly
        final_result = self._assemble_final_result(
            query, enhancement_result
        )

        processing_success = True
        processing_time = time.time() - start_time

        # Cache result
        if self.config.enable_caching:
            self._cache_result(query, custom_config, final_result)

        # Update metrics
        self._update_metrics(processing_time, processing_success)

        self.logger.info(f"Successfully enhanced query in {processing_time:.2f}s")

        return final_result

    except Exception as e:
        processing_time = time.time() - start_time
        self.logger.error(f"Pipeline error: {e}")

        # Update metrics for failure
        self._update_metrics(processing_time, processing_success)

        # Return fallback result
        return self._generate_fallback_result(query, str(e))

Pipeline Orchestration: Complete pipeline with comprehensive error handling, caching, metrics, and fallback mechanisms.
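
`_merge_configs` and `_generate_fallback_result` are referenced above but not shown. One plausible sketch, assuming the working config is a plain dict so the later stages can read it with `.get(...)`:

from dataclasses import asdict

def _merge_configs(self, custom_config: Dict = None) -> Dict:
    """Overlay per-request overrides on the pipeline's base config."""
    working = asdict(self.config)
    working['strategy'] = self.config.strategy.value  # normalize enum for caching
    if custom_config:
        working.update(custom_config)
    return working

def _generate_fallback_result(self, query: str, error: str) -> Dict[str, Any]:
    """Degraded result returned when the pipeline fails outright."""
    return {
        'query': query,
        'response': None,
        'enhanced_context': '',
        'final_prompt': None,
        'quality_metrics': {'overall_quality_score': 0.0},
        'processing_metadata': {'error': error, 'fallback': True}
    }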

Step 5: Query Enhancement Execution

async def _execute_query_enhancements(self,
                                    query: str,
                                    config: Dict) -> Dict[str, Any]:
    """Execute all configured query enhancement techniques."""

    enhancements = {}

    # Prepare async tasks
    tasks = []

    # HyDE Enhancement
    if config.get('use_hyde', False) and self.hyde_enhancer:
        tasks.append(
            self._safe_async_execute(
                'hyde',
                self.hyde_enhancer.enhance_query_with_hyde,
                query
            )
        )

    # Query Expansion
    if config.get('use_expansion', False) and self.query_expander:
        tasks.append(
            self._safe_async_execute(
                'expansion',
                self.query_expander.expand_query,
                query,
                strategies=['semantic', 'contextual']
            )
        )

    # Multi-Query Generation
    if config.get('use_multi_query', False) and self.multi_query_gen:
        tasks.append(
            self._safe_async_execute(
                'multi_query',
                self.multi_query_gen.generate_multi_query_set,
                query,
                total_queries=6
            )
        )

    # Execute all enhancements concurrently
    if tasks:
        enhancement_results = await asyncio.gather(*tasks, return_exceptions=True)

        # Process results
        for result in enhancement_results:
            if isinstance(result, dict) and 'type' in result and 'result' in result:
                enhancements[result['type']] = result['result']
            elif isinstance(result, Exception):
                self.logger.error(f"Enhancement task failed: {result}")

    return enhancements

async def _safe_async_execute(self, task_type: str, func, *args, **kwargs):
    """Safely execute async enhancement tasks with timeout."""

    try:
        # Run the blocking enhancer in the thread pool, bounded by a timeout
        loop = asyncio.get_running_loop()
        result = await asyncio.wait_for(
            loop.run_in_executor(
                self.thread_pool,
                lambda: func(*args, **kwargs)
            ),
            timeout=self.config.timeout_seconds
        )

        return {
            'type': task_type,
            'result': result,
            'success': True
        }

    except asyncio.TimeoutError:
        self.logger.error(f"Timeout in {task_type} enhancement")
        return {
            'type': task_type,
            'result': None,
            'success': False,
            'error': 'timeout'
        }

    except Exception as e:
        self.logger.error(f"Error in {task_type} enhancement: {e}")
        return {
            'type': task_type,
            'result': None,
            'success': False,
            'error': str(e)
        }

Concurrent Enhancement: Parallel execution of enhancement techniques with timeout protection and error isolation.
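
The pattern above — a blocking function dispatched to the thread pool and bounded by `asyncio.wait_for` — can be exercised in isolation. A self-contained sketch (the timeout values are arbitrary):

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def slow_task():
    time.sleep(2)
    return "done"

async def demo():
    pool = ThreadPoolExecutor(max_workers=2)
    loop = asyncio.get_running_loop()

    # Completes: 2s of work within a 5s budget
    print(await asyncio.wait_for(loop.run_in_executor(pool, slow_task), timeout=5))

    try:
        # Fails: same work within a 0.5s budget; the worker thread keeps
        # running, but the pipeline is no longer blocked on it
        await asyncio.wait_for(loop.run_in_executor(pool, slow_task), timeout=0.5)
    except asyncio.TimeoutError:
        print("timed out, error isolated from the other tasks")

asyncio.run(demo())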

Step 6: Enhanced Retrieval Strategy

async def _execute_enhanced_retrieval(self,
                                    query: str,
                                    enhancements: Dict,
                                    config: Dict) -> Dict[str, Any]:
    """Execute enhanced retrieval using all available enhancements."""

    retrieval_results = {
        'combined_results': [],
        'individual_retrievals': {},
        'retrieval_metadata': {}
    }

    retrieval_tasks = []

    # Original query retrieval (baseline)
    retrieval_tasks.append(
        self._safe_retrieval_execute(
            'original',
            lambda: self.vector_store.similarity_search(query, k=config.get('retrieval_k', 20))
        )
    )

    # HyDE-enhanced retrieval
    if 'hyde' in enhancements and enhancements['hyde']:
        hyde_embedding = enhancements['hyde'].get('enhanced_embedding')
        if hyde_embedding is not None:
            retrieval_tasks.append(
                self._safe_retrieval_execute(
                    'hyde',
                    lambda: self.vector_store.similarity_search_by_vector(
                        hyde_embedding, k=config.get('retrieval_k', 20)
                    )
                )
            )

    # Expanded query retrieval
    if 'expansion' in enhancements and enhancements['expansion']:
        expanded_query = enhancements['expansion'].get('expanded_query')
        if expanded_query:
            retrieval_tasks.append(
                self._safe_retrieval_execute(
                    'expansion',
                    lambda: self.vector_store.similarity_search(
                        expanded_query, k=config.get('retrieval_k', 20)
                    )
                )
            )

    # Multi-query retrieval
    if 'multi_query' in enhancements and enhancements['multi_query']:
        query_variants = enhancements['multi_query'].get('query_variants', [])
        for i, variant in enumerate(query_variants[:3]):  # Limit to top 3 variants
            retrieval_tasks.append(
                self._safe_retrieval_execute(
                    f'variant_{i}',
                    lambda q=variant: self.vector_store.similarity_search(
                        q, k=config.get('retrieval_k', 20) // 3
                    )
                )
            )

    # Execute all retrievals concurrently
    retrieval_responses = await asyncio.gather(*retrieval_tasks, return_exceptions=True)

    # Process and combine results
    all_results = []
    for response in retrieval_responses:
        if isinstance(response, dict) and response.get('success', False):
            retrieval_type = response['type']
            results = response['result']

            retrieval_results['individual_retrievals'][retrieval_type] = results
            all_results.extend(results)
        elif isinstance(response, Exception):
            self.logger.error(f"Retrieval task failed: {response}")

    # Deduplicate and rank combined results
    retrieval_results['combined_results'] = self._deduplicate_and_rank_results(
        all_results, query
    )

    retrieval_results['retrieval_metadata'] = {
        'total_individual_results': len(all_results),
        'deduplicated_count': len(retrieval_results['combined_results']),
        'retrieval_strategies_used': len(retrieval_results['individual_retrievals']),
        'success_rate': len([r for r in retrieval_responses if isinstance(r, dict) and r.get('success', False)]) / len(retrieval_responses)
    }

    return retrieval_results

Multi-Strategy Retrieval: Comprehensive retrieval using all available enhancement techniques with intelligent result combination.

Step 7: Intelligent Result Deduplication and Ranking

def _deduplicate_and_rank_results(self, all_results: List, query: str) -> List:
    """Intelligently deduplicate and rank combined retrieval results."""

    if not all_results:
        return []

    # Create result tracking
    unique_results = {}

    for result in all_results:
        # Generate content hash for deduplication
        content = result.page_content if hasattr(result, 'page_content') else str(result)
        content_hash = hash(content.strip().lower())

        if content_hash not in unique_results:
            unique_results[content_hash] = {
                'result': result,
                'occurrences': 1,
                'sources': [self._extract_result_source(result)]
            }
        else:
            unique_results[content_hash]['occurrences'] += 1
            source = self._extract_result_source(result)
            if source not in unique_results[content_hash]['sources']:
                unique_results[content_hash]['sources'].append(source)

    # Calculate ranking scores
    ranked_results = []
    for content_hash, data in unique_results.items():
        result = data['result']

        # Base relevance score (from similarity search)
        base_score = getattr(result, 'similarity_score', 0.5)

        # Boost for multiple occurrences (consensus across strategies)
        occurrence_boost = min(data['occurrences'] * 0.1, 0.3)

        # Boost for source diversity
        source_diversity_boost = min(len(data['sources']) * 0.05, 0.2)

        final_score = base_score + occurrence_boost + source_diversity_boost

        ranked_results.append({
            'result': result,
            'final_score': final_score,
            'occurrences': data['occurrences'],
            'sources': data['sources']
        })

    # Sort by final score and return top results
    ranked_results.sort(key=lambda x: x['final_score'], reverse=True)

    return [item['result'] for item in ranked_results[:self.config.retrieval_k]]

async def _safe_retrieval_execute(self, task_type: str, func):
    """Safely execute retrieval tasks with error handling."""

    try:
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            self.thread_pool, func
        )

        return {
            'type': task_type,
            'result': result,
            'success': True
        }

    except Exception as e:
        self.logger.error(f"Retrieval error in {task_type}: {e}")
        return {
            'type': task_type,
            'result': [],
            'success': False,
            'error': str(e)
        }

Intelligent Ranking: Sophisticated result deduplication and ranking that considers consensus across strategies and source diversity.
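
`_extract_result_source`, used for the source-diversity boost above, is not shown. A minimal sketch, assuming LangChain-style documents carrying a `metadata` dict:

def _extract_result_source(self, result) -> str:
    """Best-effort source identifier for a retrieved document."""
    metadata = getattr(result, 'metadata', None) or {}
    return metadata.get('source', 'unknown')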

Step 8: Advanced Context Optimization

async def _execute_context_optimization(self,
                                      query: str,
                                      retrieval_results: Dict,
                                      config: Dict) -> Dict[str, Any]:
    """Execute context optimization with intelligent strategy selection."""

    if not config.get('use_context_optimization', True) or not self.context_optimizer:
        return {
            'optimized_context': self._create_basic_context(retrieval_results['combined_results']),
            'strategy_used': 'basic',
            'optimization_applied': False
        }

    try:
        # Analyze content characteristics for strategy selection
        content_analysis = self._analyze_retrieval_content(
            retrieval_results['combined_results']
        )

        # Select optimal optimization strategy
        optimization_strategy = self._select_optimization_strategy(
            content_analysis, config
        )

        # Execute optimization in the thread pool
        loop = asyncio.get_running_loop()
        optimization_result = await loop.run_in_executor(
            self.thread_pool,
            lambda: self.context_optimizer.optimize_context_window(
                query,
                retrieval_results['combined_results'],
                strategy=optimization_strategy
            )
        )

        optimization_result['content_analysis'] = content_analysis
        optimization_result['optimization_applied'] = True

        return optimization_result

    except Exception as e:
        self.logger.error(f"Context optimization error: {e}")
        return {
            'optimized_context': self._create_basic_context(retrieval_results['combined_results']),
            'strategy_used': 'fallback',
            'optimization_applied': False,
            'error': str(e)
        }

def _analyze_retrieval_content(self, results: List) -> Dict[str, Any]:
    """Analyze retrieval results to guide optimization strategy."""

    if not results:
        return {'total_tokens': 0, 'source_diversity': 0, 'avg_chunk_size': 0}

    total_tokens = 0
    sources = set()
    chunk_sizes = []

    for result in results:
        content = result.page_content if hasattr(result, 'page_content') else str(result)
        tokens = len(self.tokenizer.encode(content))
        total_tokens += tokens
        chunk_sizes.append(tokens)

        # Extract source information
        metadata = getattr(result, 'metadata', {})
        source = metadata.get('source', 'unknown')
        sources.add(source)

    return {
        'total_tokens': total_tokens,
        'source_diversity': len(sources) / len(results),
        'avg_chunk_size': sum(chunk_sizes) / len(chunk_sizes),
        'max_chunk_size': max(chunk_sizes),
        'token_budget_ratio': total_tokens / self.config.max_context_tokens
    }

def _select_optimization_strategy(self, content_analysis: Dict, config: Dict) -> str:
    """Intelligently select optimization strategy based on content characteristics."""

    token_ratio = content_analysis.get('token_budget_ratio', 1.0)
    source_diversity = content_analysis.get('source_diversity', 0.5)
    avg_chunk_size = content_analysis.get('avg_chunk_size', 200)

    # Strategy selection logic
    if token_ratio <= 0.8:
        return 'relevance_ranking'  # Simple case
    elif source_diversity > 0.6 and token_ratio > 1.5:
        return 'hierarchical_summary'  # High diversity, needs summarization
    elif avg_chunk_size > 500:
        return 'semantic_compression'  # Large chunks, compress
    else:
        return 'diversity_clustering'  # Balance relevance and diversity

Adaptive Optimization: Intelligent context optimization strategy selection based on content characteristics and system constraints.
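
`_create_basic_context`, the fallback used when optimization is disabled or fails, is also not shown above. A plausible minimal version that simply joins the top results:

def _create_basic_context(self, results: List, max_results: int = 5) -> str:
    """Fallback context: concatenate the top results without optimization."""
    chunks = []
    for result in results[:max_results]:
        content = getattr(result, 'page_content', str(result))
        chunks.append(content.strip())
    return "\n\n".join(chunks)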

Step 9: Dynamic Prompt Engineering

async def _execute_prompt_engineering(self,
                                    query: str,
                                    context_result: Dict,
                                    config: Dict) -> Dict[str, Any]:
    """Execute advanced prompt engineering with dynamic adaptation."""

    if not config.get('use_advanced_prompting', True):
        return {
            'final_prompt': self._create_basic_prompt(query, context_result['optimized_context']),
            'prompt_strategy': 'basic',
            'engineering_applied': False
        }

    try:
        # Dynamic prompt adaptation
        if self.dynamic_adapter:
            loop = asyncio.get_running_loop()
            adaptation_result = await loop.run_in_executor(
                self.thread_pool,
                lambda: self.dynamic_adapter.adapt_prompt_dynamically(
                    query,
                    context_result['optimized_context'],
                    context_result
                )
            )

            return {
                'final_prompt': adaptation_result['adapted_prompt'],
                'prompt_strategy': adaptation_result['strategy'],
                'engineering_applied': True,
                'adaptation_metadata': adaptation_result
            }

        # Fallback to standard prompt engineering
        elif self.prompt_engineer:
            loop = asyncio.get_running_loop()
            prompt_result = await loop.run_in_executor(
                self.thread_pool,
                lambda: self.prompt_engineer.engineer_rag_prompt(
                    query,
                    context_result['optimized_context'],
                    optimizations=['confidence_calibration']
                )
            )

            return {
                'final_prompt': prompt_result['optimized_prompt'],
                'prompt_strategy': prompt_result['query_type'],
                'engineering_applied': True,
                'prompt_metadata': prompt_result
            }

        else:
            return {
                'final_prompt': self._create_basic_prompt(query, context_result['optimized_context']),
                'prompt_strategy': 'basic',
                'engineering_applied': False
            }

    except Exception as e:
        self.logger.error(f"Prompt engineering error: {e}")
        return {
            'final_prompt': self._create_basic_prompt(query, context_result['optimized_context']),
            'prompt_strategy': 'fallback',
            'engineering_applied': False,
            'error': str(e)
        }
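
`_create_basic_prompt`, the fallback used throughout this stage, is not shown either. A minimal sketch:

def _create_basic_prompt(self, query: str, context: str) -> str:
    """Fallback prompt: plain context-plus-question template."""
    return (
        "Use the context below to answer the question. "
        "If the context is insufficient, say so.\n\n"
        f"Context:\n{context}\n\nQuestion: {query}\nAnswer:"
    )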

Step 10: Final Result Assembly and Validation

def _assemble_final_result(self, query: str, enhancement_result: Dict) -> Dict[str, Any]:
    """Assemble comprehensive final result with all metadata."""

    # Extract key components
    final_prompt = enhancement_result['processing_stages']['prompt_engineering']['final_prompt']
    context = enhancement_result['processing_stages']['context_optimization']['optimized_context']

    # Calculate quality scores
    quality_metrics = self._calculate_quality_metrics(enhancement_result)

    # Generate response using the enhanced prompt, timing the call
    try:
        response_start = time.time()
        final_response = self.llm_model.predict(final_prompt)
        response_time = time.time() - response_start

    except Exception as e:
        self.logger.error(f"LLM response generation error: {e}")
        final_response = f"Error generating response: {e}"
        response_time = 0.0

    return {
        'query': query,
        'response': final_response,
        'enhanced_context': context,
        'final_prompt': final_prompt,
        'quality_metrics': quality_metrics,
        'processing_metadata': {
            'total_processing_time': time.time() - enhancement_result['timestamp'],
            'response_generation_time': response_time,
            'enhancement_stages_completed': len(enhancement_result['processing_stages']),
            'config_used': enhancement_result['config_used']
        },
        'enhancement_details': enhancement_result['processing_stages']
    }

def _calculate_quality_metrics(self, enhancement_result: Dict) -> Dict[str, float]:
    """Calculate quality metrics for the enhancement process."""

    metrics = {
        'enhancement_coverage': 0.0,
        'context_efficiency': 0.0,
        'retrieval_diversity': 0.0,
        'overall_quality_score': 0.0
    }

    # Enhancement coverage: fraction of pipeline stages completed successfully
    stages = enhancement_result['processing_stages']
    total_possible_stages = 4  # query enhancement, retrieval, context opt, prompt eng
    successful_stages = sum(1 for stage in stages.values() if stage.get('success', True))
    metrics['enhancement_coverage'] = successful_stages / total_possible_stages

    # Context efficiency (from context optimization)
    context_stage = stages.get('context_optimization', {})
    metrics['context_efficiency'] = context_stage.get('efficiency_score', 0.5)

    # Retrieval diversity (proxied by success rate across retrieval strategies)
    retrieval_stage = stages.get('enhanced_retrieval', {})
    retrieval_metadata = retrieval_stage.get('retrieval_metadata', {})
    metrics['retrieval_diversity'] = retrieval_metadata.get('success_rate', 0.5)

    # Overall quality score (weighted average)
    metrics['overall_quality_score'] = (
        0.3 * metrics['enhancement_coverage'] +
        0.4 * metrics['context_efficiency'] +
        0.3 * metrics['retrieval_diversity']
    )

    return metrics

Quality Assessment: Comprehensive quality metrics and result assembly with detailed metadata for monitoring and optimization.

Production Deployment Integration

Step 11: Caching and Performance Optimization

def _check_cache(self, query: str, custom_config: Dict = None) -> Optional[Dict]:
    """Check cache for previous results."""

    if not self.config.enable_caching:
        return None

    cache_key = self._generate_cache_key(query, custom_config)

    if cache_key in self.cache:
        cached_entry = self.cache[cache_key]

        # Check TTL
        if time.time() - cached_entry['timestamp'] < self.cache_ttl:
            return cached_entry['result']
        else:
            # Expired, remove from cache
            del self.cache[cache_key]

    return None

def _cache_result(self, query: str, custom_config: Dict, result: Dict):
    """Cache result for future use."""

    if not self.config.enable_caching:
        return

    cache_key = self._generate_cache_key(query, custom_config)

    self.cache[cache_key] = {
        'result': result,
        'timestamp': time.time()
    }

    # Simple cache size management: evict the single oldest entry when full
    if len(self.cache) > 1000:  # Max 1000 entries
        oldest_key = min(self.cache.keys(), key=lambda k: self.cache[k]['timestamp'])
        del self.cache[oldest_key]

def _generate_cache_key(self, query: str, custom_config: Dict = None) -> str:
    """Generate cache key from query and config."""

    config_hash = hash(str(sorted((custom_config or {}).items())))
    query_hash = hash(query.strip().lower())

    return f"{query_hash}_{config_hash}"

Step 12: Monitoring and Analytics

def get_performance_metrics(self) -> Dict[str, Any]:
    """Get comprehensive performance metrics."""

    return {
        'processing_metrics': self.metrics.copy(),
        'cache_metrics': {
            'cache_size': len(self.cache) if self.config.enable_caching else 0,
            'hit_rate': self.metrics['cache_hits'] / max(1, self.metrics['cache_hits'] + self.metrics['cache_misses'])
        },
        'component_status': self._get_component_status(),
        'system_health': self._assess_system_health()
    }

def _get_component_status(self) -> Dict[str, str]:
    """Check status of all enhancement components."""

    status = {}

    components = [
        ('hyde_enhancer', 'HyDE'),
        ('query_expander', 'Query Expansion'),
        ('multi_query_gen', 'Multi-Query Generation'),
        ('context_optimizer', 'Context Optimization'),
        ('prompt_engineer', 'Prompt Engineering'),
        ('dynamic_adapter', 'Dynamic Adaptation')
    ]

    for attr_name, display_name in components:
        if hasattr(self, attr_name) and getattr(self, attr_name) is not None:
            status[display_name] = 'active'
        else:
            status[display_name] = 'inactive'

    return status

def _assess_system_health(self) -> str:
    """Assess overall system health."""

    error_rate = self.metrics['error_count'] / max(1, self.metrics['queries_processed'])
    success_rate = self.metrics['enhancement_success_rate']
    avg_time = self.metrics['average_processing_time']

    if error_rate < 0.05 and success_rate > 0.9 and avg_time < 5.0:
        return 'excellent'
    elif error_rate < 0.1 and success_rate > 0.8 and avg_time < 10.0:
        return 'good'
    elif error_rate < 0.2 and success_rate > 0.7 and avg_time < 20.0:
        return 'fair'
    else:
        return 'poor'

Usage Examples and Testing

Step 13: Complete Usage Example

# Initialize the production pipeline
# (assumes llm_model, embedding_model, tokenizer, and vector_store
# are already constructed elsewhere)
async def main():
    # Setup components
    config = EnhancementConfig(
        strategy=EnhancementStrategy.COMPREHENSIVE,
        use_hyde=True,
        use_expansion=True,
        use_multi_query=True,
        use_context_optimization=True,
        use_advanced_prompting=True,
        max_context_tokens=4000,
        enable_monitoring=True,
        enable_caching=True
    )

    pipeline = ProductionQueryEnhancementPipeline(
        llm_model=llm_model,
        embedding_model=embedding_model,
        tokenizer=tokenizer,
        vector_store=vector_store,
        config=config
    )

    # Test with various query types
    test_queries = [
        "How do I implement authentication in a React application?",
        "What are the pros and cons of microservices vs monolithic architecture?",
        "Explain the differences between SQL and NoSQL databases",
        "How to optimize database query performance?"
    ]

    results = []
    for query in test_queries:
        print(f"\nProcessing: {query}")

        result = await pipeline.enhance_query_complete(query)
        results.append(result)

        print(f"Quality Score: {result['quality_metrics']['overall_quality_score']:.3f}")
        print(f"Processing Time: {result['processing_metadata']['total_processing_time']:.2f}s")
        print(f"Response: {result['response'][:200]}...")

    # Get performance metrics
    metrics = pipeline.get_performance_metrics()
    print(f"\nSystem Performance:")
    print(f"Queries Processed: {metrics['processing_metrics']['queries_processed']}")
    print(f"Average Processing Time: {metrics['processing_metrics']['average_processing_time']:.2f}s")
    print(f"Success Rate: {metrics['processing_metrics']['enhancement_success_rate']:.3f}")
    print(f"Cache Hit Rate: {metrics['cache_metrics']['hit_rate']:.3f}")
    print(f"System Health: {metrics['system_health']}")

# Run the example
if __name__ == "__main__":
    asyncio.run(main())

Complete Integration: Production-ready example demonstrating comprehensive system usage with monitoring and performance tracking.

Production Deployment Considerations

Performance Optimization

  • Concurrent Processing: All enhancement techniques run in parallel where possible
  • Intelligent Caching: Query-based caching with TTL and size management
  • Resource Management: Thread pool management and timeout controls

Error Handling and Reliability

  • Graceful Degradation: System continues with reduced functionality if components fail
  • Comprehensive Logging: Detailed logging for monitoring and debugging
  • Fallback Mechanisms: Basic implementations when advanced features fail

Monitoring and Observability

  • Performance Metrics: Processing times, success rates, and resource usage
  • Quality Metrics: Enhancement effectiveness and result quality assessment
  • Health Monitoring: Component status and system health indicators

Configuration Management

  • Strategy Selection: Pre-defined strategies for different use cases
  • Dynamic Configuration: Runtime configuration updates for optimization
  • Environment Adaptation: Different settings for development, staging, and production (see the sketch below)
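
A minimal sketch of environment-based preset selection; the `RAG_ENV` variable and preset values are illustrative assumptions:

import os

ENV_PRESETS = {
    'development': EnhancementConfig(strategy=EnhancementStrategy.MINIMAL,
                                     timeout_seconds=10, enable_caching=False),
    'staging': EnhancementConfig(strategy=EnhancementStrategy.BALANCED),
    'production': EnhancementConfig(strategy=EnhancementStrategy.COMPREHENSIVE,
                                    use_multi_query=True, enable_monitoring=True)
}

env = os.environ.get('RAG_ENV', 'development')
config = ENV_PRESETS.get(env, ENV_PRESETS['development'])

# Assumes the models and vector store are already constructed
pipeline = ProductionQueryEnhancementPipeline(
    llm_model, embedding_model, tokenizer, vector_store, config=config
)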
