⚙️ Session 9 Advanced: Complete Production RAG Architecture¶
⚙️ IMPLEMENTER PATH CONTENT Prerequisites: Complete 🎯 Observer and 📝 Participant paths Time Investment: 8-12 hours Outcome: Master enterprise-grade production RAG deployment
Advanced Production Learning Outcomes¶
After completing this advanced module, you will master:
- Complete microservices orchestration with dependency management
- High-performance embedding services with intelligent caching
- Advanced load balancing with multiple strategies and auto-scaling
- Comprehensive monitoring with analytics and performance prediction
- Production deployment patterns and container orchestration
Part 1: Complete Production Orchestrator¶
Advanced Service Infrastructure¶
The production orchestrator manages complex service dependencies and health monitoring:
# Production-ready containerized RAG system
from typing import Dict, List, Any, Optional
import asyncio
from dataclasses import dataclass
from enum import Enum
import logging
import time
from datetime import datetime
This foundation imports handle asynchronous operations, data structures, and monitoring infrastructure essential for production RAG systems. The datetime module supports comprehensive logging and audit trails required in enterprise environments.
class ServiceStatus(Enum):
HEALTHY = "healthy"
DEGRADED = "degraded"
UNHEALTHY = "unhealthy"
@dataclass
class ServiceHealth:
"""Health check result for RAG services."""
service_name: str
status: ServiceStatus
response_time_ms: float
error_count: int
last_check: datetime
details: Dict
# Track the connection for load metrics
if selected_instance:
self.load_metrics[service_name][selected_instance]['active_connections'] += 1
return selected_instance
Service selection combines health filtering with strategy-based optimization. Connection tracking creates a feedback loop that improves future routing decisions by considering actual load patterns.
Response Time-Based Selection¶
async def _response_time_selection(self, service_name: str,
healthy_instances: List[Any]) -> Any:
"""Select instance with best average response time."""
best_instance = None
best_response_time = float('inf')
for instance in healthy_instances:
metrics = self.load_metrics[service_name][instance]
avg_response_time = metrics['avg_response_time']
# Adjust response time based on current load
# Higher active connections increase the adjusted time
adjusted_time = avg_response_time * (1 + metrics['active_connections'] * 0.1)
if adjusted_time < best_response_time:
best_response_time = adjusted_time
best_instance = instance
return best_instance
Response time selection balances historical performance with current load. The adjustment factor prevents fast instances from becoming overloaded while still preferring high-performance services.
Auto-Scaling System¶
class RAGAutoScaler:
"""Auto-scaling system for RAG services based on load and performance metrics."""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.scaling_policies = {} # Per-service scaling configurations
self.monitoring_interval = config.get('monitoring_interval', 30) # seconds
# Define scale-up thresholds
self.scale_up_thresholds = config.get('scale_up', {
'cpu_threshold': 70.0, # CPU usage percentage
'memory_threshold': 80.0, # Memory usage percentage
'response_time_threshold': 2.0, # Response time in seconds
'queue_size_threshold': 100, # Queue backlog size
'error_rate_threshold': 5.0 # Error rate percentage
})
# Define scale-down thresholds (more conservative)
self.scale_down_thresholds = config.get('scale_down', {
'cpu_threshold': 30.0,
'memory_threshold': 40.0,
'response_time_threshold': 0.5,
'queue_size_threshold': 10,
'stable_duration': 300 # Require 5 minutes of stability
})
# Start continuous monitoring
self.monitoring_task = asyncio.create_task(self._continuous_monitoring())
Auto-scaling configuration separates scale-up and scale-down thresholds to prevent oscillation. Conservative scale-down requirements ensure stability during variable load periods common in enterprise RAG systems.
Service Registration for Scaling¶
async def register_service_for_scaling(self, service_name: str,
scaling_config: Dict[str, Any]):
"""Register service for auto-scaling with specific configuration."""
self.scaling_policies[service_name] = {
'min_instances': scaling_config.get('min_instances', 1),
'max_instances': scaling_config.get('max_instances', 10),
'current_instances': scaling_config.get('current_instances', 1),
'scaling_cooldown': scaling_config.get('cooldown', 300),
'last_scaling_action': 0,
'stability_window': [],
'custom_thresholds': scaling_config.get('thresholds', {})
}
Per-service scaling policies enable fine-tuning for different RAG components. Vector stores might need different scaling characteristics than embedding services due to their varying resource requirements and startup times.
Continuous Monitoring Loop¶
async def _continuous_monitoring(self):
"""Continuously monitor services and trigger scaling decisions."""
while True:
try:
# Check each registered service
for service_name in self.scaling_policies.keys():
# Collect current performance metrics
current_metrics = await self._collect_service_metrics(service_name)
# Evaluate if scaling action is needed
scaling_decision = await self._evaluate_scaling_decision(
service_name, current_metrics
)
# Execute scaling action if required
if scaling_decision['action'] != 'none':
await self._execute_scaling_action(service_name, scaling_decision)
# Wait before next monitoring cycle
await asyncio.sleep(self.monitoring_interval)
except Exception as e:
self.logger.error(f"Auto-scaling monitoring error: {e}")
await asyncio.sleep(self.monitoring_interval)
Continuous monitoring forms the core of the auto-scaling system. Error handling ensures monitoring continues during partial failures, maintaining system resilience during operational issues.
Scaling Decision Logic¶
async def _evaluate_scaling_decision(self, service_name: str,
metrics: Dict[str, Any]) -> Dict[str, Any]:
"""Evaluate whether scaling action is needed."""
policy = self.scaling_policies[service_name]
current_time = time.time()
# Respect cooldown period to prevent rapid scaling
if current_time - policy['last_scaling_action'] < policy['scaling_cooldown']:
return {'action': 'none', 'reason': 'cooldown_active'}
# Check if any scale-up condition is met
scale_up_triggered = (
metrics['cpu_usage'] > self.scale_up_thresholds['cpu_threshold'] or
metrics['memory_usage'] > self.scale_up_thresholds['memory_threshold'] or
metrics['avg_response_time'] > self.scale_up_thresholds['response_time_threshold'] or
metrics['queue_size'] > self.scale_up_thresholds['queue_size_threshold'] or
metrics['error_rate'] > self.scale_up_thresholds['error_rate_threshold']
)
# Scale up if conditions are met and within limits
if scale_up_triggered and policy['current_instances'] < policy['max_instances']:
return {
'action': 'scale_up',
'target_instances': min(
policy['current_instances'] + 1,
policy['max_instances']
),
'reason': 'high_load_detected',
'metrics': metrics
}
Scale-up evaluation uses OR logic for responsiveness - any threshold breach triggers scaling. This ensures RAG systems maintain performance during various load conditions, from CPU spikes to request queue buildups.
Scale-Down Logic with Stability Requirements¶
# Check scale-down conditions (all must be met)
scale_down_conditions = (
metrics['cpu_usage'] < self.scale_down_thresholds['cpu_threshold'] and
metrics['memory_usage'] < self.scale_down_thresholds['memory_threshold'] and
metrics['avg_response_time'] < self.scale_down_thresholds['response_time_threshold'] and
metrics['queue_size'] < self.scale_down_thresholds['queue_size_threshold']
)
# Track stability over time
policy['stability_window'].append({
'timestamp': current_time,
'stable': scale_down_conditions
})
# Keep only measurements within the stability window
stable_duration = self.scale_down_thresholds['stable_duration']
policy['stability_window'] = [
measurement for measurement in policy['stability_window']
if current_time - measurement['timestamp'] <= stable_duration
]
# Scale down only if consistently stable
if (len(policy['stability_window']) > 0 and
all(m['stable'] for m in policy['stability_window']) and
policy['current_instances'] > policy['min_instances'] and
current_time - policy['stability_window'][0]['timestamp'] >= stable_duration):
return {
'action': 'scale_down',
'target_instances': max(
policy['current_instances'] - 1,
policy['min_instances']
),
'reason': 'sustained_low_usage',
'stability_duration': current_time - policy['stability_window'][0]['timestamp']
}
return {'action': 'none', 'reason': 'no_scaling_needed'}
Scale-down requires ALL conditions to be met over a sustained period. This conservative approach prevents oscillation while ensuring adequate capacity during unpredictable load patterns common in enterprise environments.
🧭 Navigation¶
Previous: Session 8 - Production Ready →
Next: Session 10 - Enterprise Integration →