Skip to content

📝 Session 3: Performance Optimization

Prerequisites

Complete these documents before starting:
1. 🎯 Observer Path
2. 📝 Production Implementation Guide

This document covers advanced performance optimization techniques for production vector search systems.

Part 1: Advanced Caching Strategies

Intelligent Cache Management

Beyond simple query caching, production systems benefit from sophisticated cache strategies that adapt to usage patterns:

import time
import threading
from collections import OrderedDict
from dataclasses import dataclass
from typing import Dict, Any, Optional
import hashlib
import pickle

@dataclass
class CacheEntry:
    """Enhanced cache entry with metadata."""
    data: Any
    created_at: float
    last_accessed: float
    access_count: int
    size_bytes: int

    def is_expired(self, ttl_seconds: int) -> bool:
        """Check if entry has expired."""
        return time.time() - self.created_at > ttl_seconds

class AdaptiveQueryCache:
    """Production-grade cache with adaptive eviction and TTL."""

    def __init__(self, max_size_mb: int = 100, default_ttl_seconds: int = 3600):
        self.max_size_bytes = max_size_mb * 1024 * 1024
        self.default_ttl = default_ttl_seconds
        self.cache = OrderedDict()
        self.current_size_bytes = 0
        self.stats = {
            'hits': 0,
            'misses': 0,
            'evictions': 0,
            'expired_removals': 0
        }
        self._lock = threading.RLock()

    def _calculate_size(self, obj: Any) -> int:
        """Estimate object size in bytes."""
        try:
            return len(pickle.dumps(obj))
        except:
            # Fallback estimation
            return len(str(obj)) * 4

    def _create_key(self, query: str, **kwargs) -> str:
        """Create consistent cache key."""
        key_data = f"{query}_{sorted(kwargs.items())}"
        return hashlib.sha256(key_data.encode()).hexdigest()[:16]

The adaptive cache uses size-based and time-based eviction strategies. The size calculation enables memory-aware caching, while the threading lock ensures thread safety.

    def get(self, query: str, **kwargs) -> Optional[Any]:
        """Retrieve from cache with intelligent access tracking."""

        with self._lock:
            cache_key = self._create_key(query, **kwargs)

            if cache_key not in self.cache:
                self.stats['misses'] += 1
                return None

            entry = self.cache[cache_key]

            # Check expiration
            if entry.is_expired(self.default_ttl):
                del self.cache[cache_key]
                self.current_size_bytes -= entry.size_bytes
                self.stats['expired_removals'] += 1
                self.stats['misses'] += 1
                return None

            # Update access metadata
            current_time = time.time()
            entry.last_accessed = current_time
            entry.access_count += 1

            # Move to end (LRU)
            self.cache.move_to_end(cache_key)

            self.stats['hits'] += 1
            return entry.data

    def put(self, query: str, data: Any, **kwargs) -> bool:
        """Store in cache with intelligent eviction."""

        with self._lock:
            cache_key = self._create_key(query, **kwargs)
            data_size = self._calculate_size(data)

            # Don't cache oversized items
            if data_size > self.max_size_bytes * 0.1:  # Max 10% of cache size
                return False

            current_time = time.time()
            entry = CacheEntry(
                data=data,
                created_at=current_time,
                last_accessed=current_time,
                access_count=1,
                size_bytes=data_size
            )

            # Remove existing entry if present
            if cache_key in self.cache:
                old_entry = self.cache[cache_key]
                self.current_size_bytes -= old_entry.size_bytes
                del self.cache[cache_key]

            # Evict entries if needed
            while (self.current_size_bytes + data_size > self.max_size_bytes and
                   self.cache):
                self._evict_lru()

            # Add new entry
            self.cache[cache_key] = entry
            self.current_size_bytes += data_size

            return True

    def _evict_lru(self):
        """Evict least recently used item."""
        if not self.cache:
            return

        # Find LRU item
        lru_key = next(iter(self.cache))
        lru_entry = self.cache[lru_key]

        # Remove it
        del self.cache[lru_key]
        self.current_size_bytes -= lru_entry.size_bytes
        self.stats['evictions'] += 1

The intelligent eviction strategy prioritizes frequently accessed items while preventing oversized objects from dominating the cache. The size tracking prevents memory exhaustion.

Cache Warming and Precomputation

class CacheWarmer:
    """Proactive cache warming for common queries."""

    def __init__(self, search_engine, cache, query_log_path: str):
        self.search_engine = search_engine
        self.cache = cache
        self.query_log_path = query_log_path
        self.popular_queries = []

    def analyze_query_patterns(self, min_frequency: int = 5) -> List[str]:
        """Analyze query logs to identify popular queries."""

        query_counts = {}

        try:
            with open(self.query_log_path, 'r') as f:
                for line in f:
                    # Assume log format: timestamp|query|results_count
                    parts = line.strip().split('|')
                    if len(parts) >= 2:
                        query = parts[1].strip()
                        query_counts[query] = query_counts.get(query, 0) + 1
        except FileNotFoundError:
            logging.warning(f"Query log not found: {self.query_log_path}")
            return []

        # Filter by frequency
        popular = [query for query, count in query_counts.items()
                  if count >= min_frequency]

        # Sort by frequency
        popular.sort(key=lambda q: query_counts[q], reverse=True)

        self.popular_queries = popular[:100]  # Top 100
        return self.popular_queries

    def warm_cache(self, max_queries: int = 50):
        """Proactively warm cache with popular queries."""

        if not self.popular_queries:
            self.analyze_query_patterns()

        warmed_count = 0
        for query in self.popular_queries[:max_queries]:
            try:
                # Check if already cached
                cached_result = self.cache.get(query, top_k=10)

                if cached_result is None:
                    # Execute search and cache result
                    result = self.search_engine.hybrid_search(query, top_k=10)
                    self.cache.put(query, result, top_k=10)
                    warmed_count += 1

                    # Rate limiting to avoid overwhelming system
                    time.sleep(0.1)

            except Exception as e:
                logging.error(f"Failed to warm cache for query '{query}': {str(e)}")

        logging.info(f"Cache warming completed: {warmed_count} queries precomputed")

Cache warming proactively computes results for popular queries during off-peak hours. The query pattern analysis identifies optimization opportunities based on actual usage.

Part 2: Comprehensive Performance Monitoring

Real-time Performance Metrics

import asyncio
import statistics
from collections import deque, defaultdict
from datetime import datetime, timedelta
import json

class PerformanceMonitor:
    """Real-time performance monitoring with alerting."""

    def __init__(self, window_size_minutes: int = 5):
        self.window_size = timedelta(minutes=window_size_minutes)
        self.metrics_buffer = deque(maxlen=1000)  # Keep last 1000 operations
        self.alerts = []
        self.thresholds = {
            'p95_latency_ms': 500,
            'error_rate_percent': 5.0,
            'cache_hit_rate_percent': 60.0
        }

    def record_operation(self, operation_type: str, latency_ms: float,
                        success: bool, cache_hit: bool = False):
        """Record a single operation for monitoring."""

        record = {
            'timestamp': datetime.now(),
            'operation_type': operation_type,
            'latency_ms': latency_ms,
            'success': success,
            'cache_hit': cache_hit
        }

        self.metrics_buffer.append(record)

    def get_current_metrics(self) -> Dict[str, float]:
        """Calculate current performance metrics."""

        if not self.metrics_buffer:
            return {}

        # Filter to current window
        cutoff_time = datetime.now() - self.window_size
        recent_ops = [op for op in self.metrics_buffer
                     if op['timestamp'] >= cutoff_time]

        if not recent_ops:
            return {}

        # Calculate metrics
        latencies = [op['latency_ms'] for op in recent_ops if op['success']]
        total_ops = len(recent_ops)
        successful_ops = len([op for op in recent_ops if op['success']])
        cache_hits = len([op for op in recent_ops if op['cache_hit']])

        metrics = {
            'total_operations': total_ops,
            'success_rate_percent': (successful_ops / total_ops * 100) if total_ops > 0 else 0,
            'error_rate_percent': ((total_ops - successful_ops) / total_ops * 100) if total_ops > 0 else 0,
            'cache_hit_rate_percent': (cache_hits / total_ops * 100) if total_ops > 0 else 0,
            'operations_per_minute': total_ops / self.window_size.total_seconds() * 60
        }

        if latencies:
            latencies.sort()
            metrics.update({
                'avg_latency_ms': statistics.mean(latencies),
                'p50_latency_ms': statistics.median(latencies),
                'p95_latency_ms': latencies[int(len(latencies) * 0.95)] if len(latencies) > 1 else latencies[0],
                'p99_latency_ms': latencies[int(len(latencies) * 0.99)] if len(latencies) > 1 else latencies[0],
                'min_latency_ms': min(latencies),
                'max_latency_ms': max(latencies)
            })

        return metrics

    def check_alerts(self) -> List[Dict]:
        """Check for performance threshold violations."""

        metrics = self.get_current_metrics()
        new_alerts = []

        # Check P95 latency
        if 'p95_latency_ms' in metrics:
            if metrics['p95_latency_ms'] > self.thresholds['p95_latency_ms']:
                new_alerts.append({
                    'type': 'high_latency',
                    'message': f"P95 latency ({metrics['p95_latency_ms']:.1f}ms) exceeds threshold ({self.thresholds['p95_latency_ms']}ms)",
                    'severity': 'warning',
                    'timestamp': datetime.now(),
                    'value': metrics['p95_latency_ms']
                })

        # Check error rate
        if 'error_rate_percent' in metrics:
            if metrics['error_rate_percent'] > self.thresholds['error_rate_percent']:
                new_alerts.append({
                    'type': 'high_error_rate',
                    'message': f"Error rate ({metrics['error_rate_percent']:.1f}%) exceeds threshold ({self.thresholds['error_rate_percent']}%)",
                    'severity': 'critical',
                    'timestamp': datetime.now(),
                    'value': metrics['error_rate_percent']
                })

        # Check cache hit rate
        if 'cache_hit_rate_percent' in metrics:
            if metrics['cache_hit_rate_percent'] < self.thresholds['cache_hit_rate_percent']:
                new_alerts.append({
                    'type': 'low_cache_hit_rate',
                    'message': f"Cache hit rate ({metrics['cache_hit_rate_percent']:.1f}%) below threshold ({self.thresholds['cache_hit_rate_percent']}%)",
                    'severity': 'warning',
                    'timestamp': datetime.now(),
                    'value': metrics['cache_hit_rate_percent']
                })

        self.alerts.extend(new_alerts)
        return new_alerts

The performance monitor provides real-time metrics calculation with configurable alerting thresholds. The sliding window approach keeps metrics current and relevant.

Load Testing Framework

import concurrent.futures
import random
from typing import Callable, List
import numpy as np

class LoadTester:
    """Comprehensive load testing framework."""

    def __init__(self, search_function: Callable, monitor: PerformanceMonitor):
        self.search_function = search_function
        self.monitor = monitor

    def generate_test_queries(self, base_queries: List[str], count: int) -> List[str]:
        """Generate varied test queries from base set."""

        test_queries = []

        for _ in range(count):
            base_query = random.choice(base_queries)

            # Add variations
            variations = [
                base_query,  # Original
                base_query + " details",  # Extended
                base_query.replace(" ", " and "),  # Modified
                f"What about {base_query}?",  # Question format
            ]

            test_queries.append(random.choice(variations))

        return test_queries

    def run_load_test(self, test_queries: List[str],
                     concurrent_users: int = 10,
                     duration_minutes: int = 5) -> Dict:
        """Execute comprehensive load test."""

        end_time = time.time() + (duration_minutes * 60)
        test_results = {
            'total_queries': 0,
            'successful_queries': 0,
            'failed_queries': 0,
            'latencies': [],
            'errors': []
        }

        def worker_function():
            """Worker function for single user simulation."""
            worker_queries = 0
            worker_errors = 0

            while time.time() < end_time:
                query = random.choice(test_queries)

                try:
                    start_time = time.time()
                    result = self.search_function(query)
                    latency_ms = (time.time() - start_time) * 1000

                    # Record in monitor
                    self.monitor.record_operation(
                        'load_test_query',
                        latency_ms,
                        success=True
                    )

                    test_results['latencies'].append(latency_ms)
                    worker_queries += 1

                except Exception as e:
                    worker_errors += 1
                    test_results['errors'].append(str(e))

                    # Record failure in monitor
                    self.monitor.record_operation(
                        'load_test_query',
                        0,
                        success=False
                    )

                # Brief pause between queries
                time.sleep(random.uniform(0.1, 1.0))

            return worker_queries, worker_errors

        # Run concurrent workers
        with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_users) as executor:
            futures = [executor.submit(worker_function) for _ in range(concurrent_users)]

            for future in concurrent.futures.as_completed(futures):
                queries, errors = future.result()
                test_results['total_queries'] += queries
                test_results['failed_queries'] += errors

        test_results['successful_queries'] = test_results['total_queries'] - test_results['failed_queries']

        # Calculate summary statistics
        if test_results['latencies']:
            latencies = test_results['latencies']
            test_results['latency_stats'] = {
                'mean_ms': np.mean(latencies),
                'median_ms': np.median(latencies),
                'p95_ms': np.percentile(latencies, 95),
                'p99_ms': np.percentile(latencies, 99),
                'std_ms': np.std(latencies)
            }

        return test_results

The load testing framework simulates realistic user behavior with varied query patterns and timing. The concurrent execution provides accurate load simulation.

Part 3: Adaptive Parameter Tuning

Intelligent Performance Optimization

class AdaptiveOptimizer:
    """Automatic performance parameter optimization."""

    def __init__(self, search_engine, monitor: PerformanceMonitor):
        self.search_engine = search_engine
        self.monitor = monitor
        self.optimization_history = []
        self.current_params = {
            'cache_size': 1000,
            'hnsw_ef_search': 100,
            'batch_size': 1000,
            'timeout_seconds': 30
        }

    def should_optimize(self) -> bool:
        """Determine if optimization is warranted."""

        metrics = self.monitor.get_current_metrics()

        # Optimize if performance is degraded
        conditions = [
            metrics.get('p95_latency_ms', 0) > 300,  # High latency
            metrics.get('error_rate_percent', 0) > 2,  # High error rate
            metrics.get('cache_hit_rate_percent', 100) < 50,  # Low cache hits
            len(self.optimization_history) == 0  # Never optimized
        ]

        return any(conditions)

    def optimize_cache_size(self) -> Dict:
        """Optimize cache size based on hit rates."""

        metrics = self.monitor.get_current_metrics()
        current_hit_rate = metrics.get('cache_hit_rate_percent', 0)

        optimization = {
            'parameter': 'cache_size',
            'old_value': self.current_params['cache_size'],
            'new_value': self.current_params['cache_size'],
            'reason': 'No change needed'
        }

        if current_hit_rate < 50:
            # Increase cache size
            new_size = min(5000, int(self.current_params['cache_size'] * 1.5))
            optimization.update({
                'new_value': new_size,
                'reason': f'Increasing cache size due to low hit rate ({current_hit_rate:.1f}%)'
            })
            self.current_params['cache_size'] = new_size

        elif current_hit_rate > 90:
            # Potentially reduce cache size to free memory
            new_size = max(500, int(self.current_params['cache_size'] * 0.8))
            optimization.update({
                'new_value': new_size,
                'reason': f'Reducing cache size - high hit rate ({current_hit_rate:.1f}%) suggests over-caching'
            })
            self.current_params['cache_size'] = new_size

        return optimization

    def optimize_search_parameters(self) -> List[Dict]:
        """Optimize search-related parameters."""

        metrics = self.monitor.get_current_metrics()
        optimizations = []

        # Optimize HNSW ef_search based on latency
        current_latency = metrics.get('p95_latency_ms', 0)

        if current_latency > 200:
            # Reduce ef_search for speed
            new_ef = max(32, self.current_params['hnsw_ef_search'] - 32)
            optimizations.append({
                'parameter': 'hnsw_ef_search',
                'old_value': self.current_params['hnsw_ef_search'],
                'new_value': new_ef,
                'reason': f'Reducing ef_search due to high latency ({current_latency:.1f}ms)'
            })
            self.current_params['hnsw_ef_search'] = new_ef

        elif current_latency < 50:
            # Increase ef_search for better accuracy
            new_ef = min(256, self.current_params['hnsw_ef_search'] + 32)
            optimizations.append({
                'parameter': 'hnsw_ef_search',
                'old_value': self.current_params['hnsw_ef_search'],
                'new_value': new_ef,
                'reason': f'Increasing ef_search due to low latency ({current_latency:.1f}ms) - room for better accuracy'
            })
            self.current_params['hnsw_ef_search'] = new_ef

        return optimizations

    def run_optimization_cycle(self) -> Dict:
        """Execute complete optimization cycle."""

        if not self.should_optimize():
            return {'optimizations_applied': 0, 'reason': 'No optimization needed'}

        logging.info("Starting adaptive optimization cycle")

        # Collect current performance baseline
        baseline_metrics = self.monitor.get_current_metrics()

        # Apply optimizations
        optimizations = []

        # Cache optimization
        cache_opt = self.optimize_cache_size()
        if cache_opt['new_value'] != cache_opt['old_value']:
            optimizations.append(cache_opt)

        # Search parameter optimization
        search_opts = self.optimize_search_parameters()
        optimizations.extend(search_opts)

        # Record optimization attempt
        optimization_record = {
            'timestamp': datetime.now(),
            'baseline_metrics': baseline_metrics,
            'optimizations': optimizations,
            'parameters_after': self.current_params.copy()
        }

        self.optimization_history.append(optimization_record)

        logging.info(f"Optimization cycle completed: {len(optimizations)} parameters adjusted")

        return {
            'optimizations_applied': len(optimizations),
            'details': optimizations,
            'new_parameters': self.current_params
        }

The adaptive optimizer automatically adjusts parameters based on observed performance patterns. The optimization history enables learning from past adjustments.

Part 4: Advanced Monitoring Dashboard

Metrics Export and Visualization

import json
from datetime import datetime
from typing import Dict, List

class MetricsExporter:
    """Export metrics for external monitoring systems."""

    def __init__(self, monitor: PerformanceMonitor, optimizer: AdaptiveOptimizer):
        self.monitor = monitor
        self.optimizer = optimizer

    def export_prometheus_metrics(self) -> str:
        """Export metrics in Prometheus format."""

        metrics = self.monitor.get_current_metrics()
        prometheus_lines = []

        # Add help and type information
        prometheus_lines.extend([
            "# HELP vector_search_latency_ms Query latency in milliseconds",
            "# TYPE vector_search_latency_ms histogram",
            "# HELP vector_search_requests_total Total number of search requests",
            "# TYPE vector_search_requests_total counter",
            "# HELP vector_search_cache_hit_rate Cache hit rate percentage",
            "# TYPE vector_search_cache_hit_rate gauge"
        ])

        # Export current metrics
        timestamp = int(time.time() * 1000)

        if 'p95_latency_ms' in metrics:
            prometheus_lines.append(
                f'vector_search_latency_ms{{quantile="0.95"}} {metrics["p95_latency_ms"]} {timestamp}'
            )
            prometheus_lines.append(
                f'vector_search_latency_ms{{quantile="0.99"}} {metrics["p99_latency_ms"]} {timestamp}'
            )

        if 'total_operations' in metrics:
            prometheus_lines.append(
                f'vector_search_requests_total {metrics["total_operations"]} {timestamp}'
            )

        if 'cache_hit_rate_percent' in metrics:
            prometheus_lines.append(
                f'vector_search_cache_hit_rate {metrics["cache_hit_rate_percent"]} {timestamp}'
            )

        return '\n'.join(prometheus_lines)

    def export_json_dashboard(self) -> str:
        """Export comprehensive dashboard data as JSON."""

        current_metrics = self.monitor.get_current_metrics()
        recent_alerts = self.monitor.alerts[-10:]  # Last 10 alerts
        optimization_history = self.optimizer.optimization_history[-5:]  # Last 5 optimizations

        dashboard_data = {
            'timestamp': datetime.now().isoformat(),
            'system_status': {
                'overall_health': self._determine_health_status(current_metrics),
                'active_alerts': len([a for a in recent_alerts if
                                    datetime.now() - a['timestamp'] < timedelta(hours=1)]),
                'last_optimization': optimization_history[-1]['timestamp'].isoformat() if optimization_history else None
            },
            'performance_metrics': current_metrics,
            'recent_alerts': [
                {
                    'type': alert['type'],
                    'message': alert['message'],
                    'severity': alert['severity'],
                    'timestamp': alert['timestamp'].isoformat(),
                    'value': alert['value']
                }
                for alert in recent_alerts
            ],
            'optimization_history': [
                {
                    'timestamp': opt['timestamp'].isoformat(),
                    'optimizations_count': len(opt['optimizations']),
                    'parameters': opt['parameters_after']
                }
                for opt in optimization_history
            ],
            'configuration': {
                'cache_size': self.optimizer.current_params['cache_size'],
                'hnsw_ef_search': self.optimizer.current_params['hnsw_ef_search'],
                'monitoring_window_minutes': 5
            }
        }

        return json.dumps(dashboard_data, indent=2)

    def _determine_health_status(self, metrics: Dict) -> str:
        """Determine overall system health."""

        if not metrics:
            return 'unknown'

        issues = []

        # Check key metrics
        if metrics.get('error_rate_percent', 0) > 5:
            issues.append('high_error_rate')

        if metrics.get('p95_latency_ms', 0) > 500:
            issues.append('high_latency')

        if metrics.get('cache_hit_rate_percent', 100) < 40:
            issues.append('poor_cache_performance')

        if not issues:
            return 'healthy'
        elif len(issues) == 1:
            return 'degraded'
        else:
            return 'unhealthy'

class PerformanceDashboard:
    """Simple web dashboard for performance monitoring."""

    def __init__(self, exporter: MetricsExporter, port: int = 8080):
        self.exporter = exporter
        self.port = port

    def generate_html_dashboard(self) -> str:
        """Generate HTML dashboard."""

        dashboard_data = json.loads(self.exporter.export_json_dashboard())

        html = f"""
        <!DOCTYPE html>
        <html>
        <head>
            <title>Vector Search Performance Dashboard</title>
            <style>
                body {{ font-family: Arial, sans-serif; margin: 20px; }}
                .metric-box {{ border: 1px solid #ddd; padding: 15px; margin: 10px; border-radius: 5px; }}
                .healthy {{ border-color: #28a745; }}
                .degraded {{ border-color: #ffc107; }}
                .unhealthy {{ border-color: #dc3545; }}
                .alert {{ background-color: #f8d7da; border-color: #dc3545; color: #721c24; }}
                table {{ border-collapse: collapse; width: 100%; }}
                th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
                th {{ background-color: #f2f2f2; }}
            </style>
            <meta http-equiv="refresh" content="30">
        </head>
        <body>
            <h1>Vector Search Performance Dashboard</h1>
            <p>Last Updated: {dashboard_data['timestamp']}</p>

            <div class="metric-box {dashboard_data['system_status']['overall_health']}">
                <h2>System Status: {dashboard_data['system_status']['overall_health'].upper()}</h2>
                <p>Active Alerts: {dashboard_data['system_status']['active_alerts']}</p>
                <p>Last Optimization: {dashboard_data['system_status']['last_optimization'] or 'Never'}</p>
            </div>

            <div class="metric-box">
                <h2>Performance Metrics</h2>
                <table>
                    <tr><th>Metric</th><th>Value</th></tr>
        """

        # Add performance metrics
        metrics = dashboard_data['performance_metrics']
        for key, value in metrics.items():
            if isinstance(value, float):
                formatted_value = f"{value:.2f}"
            else:
                formatted_value = str(value)
            html += f"<tr><td>{key.replace('_', ' ').title()}</td><td>{formatted_value}</td></tr>"

        html += """
                </table>
            </div>

            <div class="metric-box">
                <h2>Recent Alerts</h2>
        """

        # Add alerts
        if dashboard_data['recent_alerts']:
            html += "<ul>"
            for alert in dashboard_data['recent_alerts'][-5:]:  # Last 5 alerts
                html += f'<li class="alert"><strong>{alert["severity"].upper()}</strong>: {alert["message"]} ({alert["timestamp"]})</li>'
            html += "</ul>"
        else:
            html += "<p>No recent alerts</p>"

        html += """
            </div>
        </body>
        </html>
        """

        return html

The dashboard provides comprehensive visibility into system performance with both machine-readable (Prometheus, JSON) and human-readable (HTML) formats.

Part 5: Production Optimization Checklist

Performance Optimization Validation

class OptimizationValidator:
    """Validate optimization effectiveness."""

    def __init__(self, search_engine, test_queries: List[str]):
        self.search_engine = search_engine
        self.test_queries = test_queries

    def validate_optimizations(self, pre_optimization_metrics: Dict,
                              post_optimization_metrics: Dict) -> Dict:
        """Compare before and after optimization metrics."""

        improvements = {}
        regressions = {}

        # Key metrics to compare
        key_metrics = [
            'p95_latency_ms',
            'cache_hit_rate_percent',
            'error_rate_percent',
            'operations_per_minute'
        ]

        for metric in key_metrics:
            if metric in pre_optimization_metrics and metric in post_optimization_metrics:
                old_value = pre_optimization_metrics[metric]
                new_value = post_optimization_metrics[metric]

                # Calculate percentage change
                if old_value != 0:
                    change_percent = ((new_value - old_value) / old_value) * 100
                else:
                    change_percent = float('inf') if new_value > 0 else 0

                # Determine if improvement or regression
                if metric == 'error_rate_percent':
                    # Lower is better for error rate
                    if change_percent < -5:  # 5% improvement threshold
                        improvements[metric] = {
                            'old': old_value,
                            'new': new_value,
                            'improvement_percent': abs(change_percent)
                        }
                    elif change_percent > 5:
                        regressions[metric] = {
                            'old': old_value,
                            'new': new_value,
                            'regression_percent': change_percent
                        }
                else:
                    # Higher is better for other metrics
                    if change_percent > 5:  # 5% improvement threshold
                        improvements[metric] = {
                            'old': old_value,
                            'new': new_value,
                            'improvement_percent': change_percent
                        }
                    elif change_percent < -5:
                        regressions[metric] = {
                            'old': old_value,
                            'new': new_value,
                            'regression_percent': abs(change_percent)
                        }

        return {
            'improvements': improvements,
            'regressions': regressions,
            'net_score': len(improvements) - len(regressions),
            'recommendation': 'keep' if len(improvements) >= len(regressions) else 'revert'
        }

The validation framework ensures optimizations actually improve performance and don't introduce regressions.

Key Performance Optimization Principles

Essential Takeaways

Advanced Caching:
- Implement size-aware and time-based cache eviction
- Use cache warming for popular queries
- Monitor cache hit rates continuously

Performance Monitoring:
- Track P95/P99 latencies, not just averages
- Implement real-time alerting for threshold violations
- Use comprehensive load testing for capacity planning

Adaptive Optimization:
- Automatically adjust parameters based on observed performance
- Validate optimization effectiveness with A/B testing
- Maintain optimization history for learning

Production Monitoring:
- Export metrics in standard formats (Prometheus, JSON)
- Provide both human and machine-readable dashboards
- Implement comprehensive health checks

Next Steps

⚙️ Ready for Advanced Topics?

If you've mastered performance optimization, explore advanced implementer topics:
- Advanced HNSW Tuning
- Advanced Hybrid Search


Previous: Session 2 - Implementation →
Next: Session 4 - Team Orchestration →