⚙️ Session 9 Enterprise: Complete Enterprise Architecture & Security¶
⚙️ IMPLEMENTER PATH CONTENT
Prerequisites: Complete 🎯 Observer and 📝 Participant paths
Time Investment: 10-15 hours
Outcome: Master enterprise integration, security, compliance, and monitoring
Enterprise Architecture Learning Outcomes¶
After completing this enterprise module, you will master:
- Complete enterprise integration with SharePoint, databases, and APIs
- Advanced authentication with multi-provider support and RBAC
- Comprehensive compliance frameworks (GDPR, HIPAA, SOX, CCPA)
- Real-time incremental indexing with change detection
- Production monitoring with analytics and alerting systems
Part 1: Enterprise Integration Framework¶
Complete Integration Architecture¶
The enterprise integration framework connects RAG systems to existing business infrastructure:
# Enterprise integration framework
from typing import Any, Dict, List

class EnterpriseRAGIntegrator:
"""Integration framework for enterprise data systems and workflows."""
def __init__(self, integration_config: Dict[str, Any]):
self.config = integration_config
# Data source connectors
self.data_connectors = {
'sharepoint': SharePointConnector(integration_config.get('sharepoint', {})),
'confluence': ConfluenceConnector(integration_config.get('confluence', {})),
'database': DatabaseConnector(integration_config.get('database', {})),
'file_system': FileSystemConnector(integration_config.get('file_system', {})),
'api_endpoints': APIConnector(integration_config.get('api', {})),
's3': S3Connector(integration_config.get('s3', {}))
}
# Authentication and authorization
self.auth_manager = EnterpriseAuthManager(integration_config.get('auth', {}))
# Data transformation pipeline
self.data_transformer = DataTransformationPipeline()
# Change detection and incremental updates
self.change_detector = ChangeDetectionSystem(
integration_config.get('change_detection', {})
)
This comprehensive integration architecture supports diverse enterprise data sources through specialized connectors. Each connector handles source-specific authentication, data retrieval, and change detection patterns.
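The individual connector classes are implemented elsewhere; a minimal sketch of the shared interface they are assumed to follow (the BaseConnector name is hypothetical) looks like this:

from abc import ABC, abstractmethod
from typing import Any, Dict, List

class BaseConnector(ABC):
    """Minimal contract each enterprise data connector implements."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config

    @abstractmethod
    async def initialize_connection(self) -> Dict[str, Any]:
        """Authenticate and open a connection to the source."""

    @abstractmethod
    async def test_connection(self) -> Dict[str, Any]:
        """Verify connectivity and access permissions."""

    @abstractmethod
    async def retrieve_documents(self, **filters) -> List[Dict[str, Any]]:
        """Fetch documents, optionally filtered for incremental updates."""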
Enterprise Integration Setup¶
async def setup_enterprise_integration(self, data_sources: List[str]) -> Dict[str, Any]:
"""Set up integration with specified enterprise data sources."""
integration_results = {}
for source_name in data_sources:
if source_name in self.data_connectors:
try:
# Initialize connector
connector = self.data_connectors[source_name]
connection_result = await connector.initialize_connection()
# Test connectivity and permissions
test_result = await connector.test_connection()
# Set up change monitoring
if self.config.get('enable_change_detection', True):
change_monitoring = await self.change_detector.setup_monitoring(
source_name, connector
)
else:
change_monitoring = {'enabled': False}
integration_results[source_name] = {
'status': 'connected',
'connection_result': connection_result,
'test_result': test_result,
'change_monitoring': change_monitoring
}
except Exception as e:
integration_results[source_name] = {
'status': 'failed',
'error': str(e)
}
return {
'integration_results': integration_results,
'successful_connections': len([r for r in integration_results.values()
if r['status'] == 'connected']),
'total_sources': len(data_sources),
'change_detection_enabled': self.config.get('enable_change_detection', True)
}
Integration setup follows a standardized pattern: connection initialization, connectivity testing, and change-monitoring configuration. This consistent approach enables automated deployment and monitoring across diverse data sources, as the usage sketch below illustrates.
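A minimal usage sketch, assuming an integration_config dict assembled per deployment (the per-source keys are shown in the connector sections below):

import asyncio

async def main():
    integrator = EnterpriseRAGIntegrator(integration_config)
    result = await integrator.setup_enterprise_integration(['sharepoint', 'file_system'])
    print(f"{result['successful_connections']}/{result['total_sources']} sources connected")

asyncio.run(main())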
SharePoint Enterprise Connector¶
import logging
import time
from datetime import datetime
from typing import Any, Dict, List, Optional

class SharePointConnector:
    """Enterprise SharePoint integration for document retrieval."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.site_url = config.get('site_url')
        self.client_id = config.get('client_id')
        self.client_secret = config.get('client_secret')
        self.tenant_id = config.get('tenant_id')
        self.logger = logging.getLogger(__name__)  # used by retrieve_documents
        # SharePoint client (created in initialize_connection)
        self.sp_client = None
async def initialize_connection(self) -> Dict[str, Any]:
"""Initialize SharePoint connection with authentication."""
try:
# Initialize SharePoint client with OAuth
from office365.sharepoint.client_context import ClientContext
from office365.runtime.auth.client_credential import ClientCredential
credentials = ClientCredential(self.client_id, self.client_secret)
self.sp_client = ClientContext(self.site_url).with_credentials(credentials)
# Test connection
web = self.sp_client.web.get().execute_query()
return {
'success': True,
'site_title': web.title,
'site_url': web.url,
'connection_time': time.time()
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
SharePoint integration uses OAuth 2.0 client credentials flow for secure enterprise authentication. This approach provides proper security without storing user credentials while enabling automated data access.
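An illustrative configuration for this flow (all values are placeholders; the client secret belongs in a secrets manager, not in source or config files):

sharepoint_config = {
    'site_url': 'https://contoso.sharepoint.com/sites/knowledge-base',
    'client_id': '00000000-0000-0000-0000-000000000000',  # Azure AD app registration
    'client_secret': '<from-secrets-manager>',
    'tenant_id': 'contoso.onmicrosoft.com',
}

connector = SharePointConnector(sharepoint_config)
result = await connector.initialize_connection()  # call from async code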
Advanced Document Retrieval¶
    async def retrieve_documents(self, folder_path: Optional[str] = None,
                                 modified_since: Optional[datetime] = None) -> List[Dict[str, Any]]:
"""Retrieve documents from SharePoint with optional filtering."""
if not self.sp_client:
raise RuntimeError("SharePoint client not initialized")
documents = []
try:
# Get document library
if folder_path:
folder = self.sp_client.web.get_folder_by_server_relative_url(folder_path)
else:
folder = self.sp_client.web.default_document_library().root_folder
# Get files
files = folder.files.get().execute_query()
for file in files:
# Filter by modification date if specified
if modified_since and file.time_last_modified < modified_since:
continue
# Download file content
file_content = file.get_content().execute_query()
documents.append({
'id': file.unique_id,
'name': file.name,
'url': file.server_relative_url,
'content': file_content.value,
'modified': file.time_last_modified,
'size': file.length,
'content_type': file.content_type,
'metadata': {
'author': file.author.title if file.author else 'Unknown',
'created': file.time_created,
'version': file.ui_version_label
}
})
return documents
except Exception as e:
self.logger.error(f"SharePoint document retrieval error: {e}")
return []
Document retrieval captures comprehensive metadata essential for RAG processing. The incremental filtering capability supports efficient updates by processing only changed documents.
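For example, a daily incremental sync might look like this (hypothetical folder path; run inside async code with an initialized connector):

from datetime import datetime, timedelta

# Fetch only documents changed in the last 24 hours
since = datetime.utcnow() - timedelta(hours=24)
changed_docs = await connector.retrieve_documents(
    folder_path='/sites/knowledge-base/Shared Documents',
    modified_since=since
)
print(f"{len(changed_docs)} documents to re-index")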
Part 2: Advanced Authentication and Security¶
Enterprise Authentication Manager¶
from typing import Any, Dict, List

class EnterpriseAuthManager:
"""Enterprise authentication and authorization manager."""
def __init__(self, auth_config: Dict[str, Any]):
self.config = auth_config
self.auth_providers = {}
# Initialize authentication providers
if 'active_directory' in auth_config:
self.auth_providers['ad'] = ActiveDirectoryAuth(auth_config['active_directory'])
if 'oauth2' in auth_config:
self.auth_providers['oauth2'] = OAuth2Auth(auth_config['oauth2'])
if 'saml' in auth_config:
self.auth_providers['saml'] = SAMLAuth(auth_config['saml'])
# Role-based access control
self.rbac_manager = RBACManager(auth_config.get('rbac', {}))
async def authenticate_user(self, credentials: Dict[str, Any]) -> Dict[str, Any]:
"""Authenticate user using configured providers."""
auth_method = credentials.get('auth_method', 'oauth2')
if auth_method not in self.auth_providers:
return {
'authenticated': False,
'error': f'Authentication method {auth_method} not supported'
}
try:
auth_result = await self.auth_providers[auth_method].authenticate(credentials)
if auth_result['authenticated']:
# Get user permissions
user_permissions = await self.rbac_manager.get_user_permissions(
auth_result['user_info']
)
auth_result['permissions'] = user_permissions
# Create session token
session_token = self._create_session_token(auth_result['user_info'])
auth_result['session_token'] = session_token
return auth_result
except Exception as e:
return {
'authenticated': False,
'error': f'Authentication failed: {str(e)}'
}
Multi-provider authentication enables integration with diverse enterprise identity systems. Retrieving permissions immediately after authentication builds a complete user context for authorization decisions throughout the RAG system.
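The _create_session_token helper is referenced above but not shown; a minimal sketch using PyJWT, assuming a token_secret key in the auth config, might look like this:

import time
import jwt  # PyJWT
from typing import Any, Dict

SESSION_TTL_SECONDS = 3600

def _create_session_token(self, user_info: Dict[str, Any]) -> str:
    """Sign a short-lived token carrying the user identity and roles."""
    payload = {
        'sub': user_info['user_id'],
        'roles': user_info.get('roles', []),
        'iat': int(time.time()),
        'exp': int(time.time()) + SESSION_TTL_SECONDS,
    }
    # 'token_secret' is an assumed config key; prefer an asymmetric key
    # (RS256) if other services must verify the tokens
    return jwt.encode(payload, self.config['token_secret'], algorithm='HS256')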
Request Authorization System¶
async def authorize_request(self, session_token: str,
resource: str, action: str) -> Dict[str, Any]:
"""Authorize user request for specific resource and action."""
try:
# Validate session token
user_info = self._validate_session_token(session_token)
if not user_info:
return {
'authorized': False,
'error': 'Invalid or expired session token'
}
# Check permissions
authorized = await self.rbac_manager.check_permission(
user_info, resource, action
)
return {
'authorized': authorized,
'user_id': user_info['user_id'],
'permissions_checked': f'{resource}:{action}'
}
except Exception as e:
return {
'authorized': False,
'error': f'Authorization failed: {str(e)}'
}
Request authorization validates session tokens and checks resource permissions using the RBAC system. The detailed response supports audit trails and debugging while maintaining security.
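Continuing the PyJWT sketch from the previous section, _validate_session_token would be the decoding counterpart:

from typing import Any, Dict, Optional

import jwt  # PyJWT

def _validate_session_token(self, session_token: str) -> Optional[Dict[str, Any]]:
    """Verify the token signature and expiry; return user info or None."""
    try:
        payload = jwt.decode(
            session_token, self.config['token_secret'], algorithms=['HS256']
        )
        return {'user_id': payload['sub'], 'roles': payload.get('roles', [])}
    except jwt.InvalidTokenError:  # covers expired and tampered tokens
        return None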
Role-Based Access Control¶
class RBACManager:
"""Role-Based Access Control manager for RAG systems."""
def __init__(self, rbac_config: Dict[str, Any]):
self.config = rbac_config
# Define roles and permissions
self.roles = rbac_config.get('roles', {
'admin': ['*'], # Full access
'power_user': ['rag:query', 'rag:upload', 'rag:view_sources'],
'user': ['rag:query'],
'readonly': ['rag:query:readonly']
})
# Resource-based permissions
self.resources = rbac_config.get('resources', {
'documents': ['read', 'write', 'delete'],
'queries': ['execute', 'view_history'],
'system': ['configure', 'monitor', 'admin']
})
async def get_user_permissions(self, user_info: Dict[str, Any]) -> List[str]:
"""Get all permissions for a user based on their roles."""
user_roles = user_info.get('roles', [])
permissions = set()
for role in user_roles:
if role in self.roles:
role_permissions = self.roles[role]
permissions.update(role_permissions)
return list(permissions)
async def check_permission(self, user_info: Dict[str, Any],
resource: str, action: str) -> bool:
"""Check if user has permission for specific resource and action."""
user_permissions = await self.get_user_permissions(user_info)
# Check for wildcard permission
if '*' in user_permissions:
return True
# Check specific permission
required_permission = f"{resource}:{action}"
if required_permission in user_permissions:
return True
# Check resource-level permission
resource_permission = f"{resource}:*"
if resource_permission in user_permissions:
return True
return False
RBAC implementation provides hierarchical permission evaluation from wildcard to specific permissions. This graduated approach supports both broad administrator access and fine-grained user permissions.
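A short demonstration of the evaluation order, using the default roles (the user records are hypothetical):

import asyncio

async def demo():
    rbac = RBACManager({})  # empty config falls back to the default roles above

    admin = {'roles': ['admin']}
    analyst = {'roles': ['user']}

    print(await rbac.check_permission(admin, 'system', 'configure'))    # True via '*'
    print(await rbac.check_permission(analyst, 'rag', 'query'))         # True, exact match
    print(await rbac.check_permission(analyst, 'documents', 'delete'))  # False

asyncio.run(demo())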
Part 3: Comprehensive Compliance Framework¶
Privacy and Compliance Manager¶
# Privacy and compliance framework
from typing import Any, Dict, List

class PrivacyComplianceManager:
"""Comprehensive privacy and compliance manager for enterprise RAG systems."""
def __init__(self, compliance_config: Dict[str, Any]):
self.config = compliance_config
# Initialize compliance framework handlers
self.frameworks = {
'gdpr': GDPRComplianceHandler(compliance_config.get('gdpr', {})),
'hipaa': HIPAAComplianceHandler(compliance_config.get('hipaa', {})),
'sox': SOXComplianceHandler(compliance_config.get('sox', {})),
'ccpa': CCPAComplianceHandler(compliance_config.get('ccpa', {}))
}
# Set up data processing components
self.data_classifier = DataClassifier() # Classify data sensitivity
self.pii_detector = PIIDetector() # Detect personal information
self.data_anonymizer = DataAnonymizer() # Anonymize sensitive data
# Initialize compliance audit system
self.audit_logger = ComplianceAuditLogger(compliance_config.get('audit', {}))
async def process_data_with_compliance(self, data: Dict[str, Any],
compliance_requirements: List[str]) -> Dict[str, Any]:
"""Process data while ensuring compliance with specified requirements."""
# Initialize processing result tracking
processing_result = {
'original_data_id': data.get('id'),
'compliance_checks': {},
'data_modifications': [],
'audit_entries': []
}
# Step 1: Classify the data by sensitivity level
data_classification = await self.data_classifier.classify_data(data)
processing_result['data_classification'] = data_classification
# Step 2: Detect personally identifiable information
pii_detection = await self.pii_detector.detect_sensitive_data(data)
processing_result['sensitive_data_detected'] = pii_detection
# Process data through each compliance framework
processed_data = data.copy()
for framework in compliance_requirements:
if framework in self.frameworks:
compliance_result = await self.frameworks[framework].process_data(
processed_data, data_classification, pii_detection
)
processing_result['compliance_checks'][framework] = compliance_result
# Apply modifications if not compliant
if not compliance_result['compliant']:
processed_data = await self._apply_compliance_modifications(
processed_data, compliance_result['required_actions']
)
processing_result['data_modifications'].extend(
compliance_result['required_actions']
)
# Log the compliance processing for audit trail
audit_entry = await self.audit_logger.log_compliance_processing(
data.get('id'), compliance_requirements, processing_result
)
processing_result['audit_entries'].append(audit_entry)
return {
'processed_data': processed_data,
'compliance_result': processing_result,
'compliant': all(
check['compliant'] for check in processing_result['compliance_checks'].values()
)
}
The compliance framework processes data through multiple regulatory requirements simultaneously. Comprehensive tracking and audit logging provide the evidence trail that regulatory audits require.
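In practice, ingestion code gates indexing on the compliance result; a sketch with a hypothetical record, run inside async code against a configured manager:

record = {'id': 'doc-123', 'content': 'Jane Doe (jane@example.com) reported ...'}

result = await compliance_manager.process_data_with_compliance(
    record, compliance_requirements=['gdpr', 'ccpa']
)
# Always index the processed copy: it carries any required modifications
indexable = result['processed_data']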
GDPR Compliance Handler¶
class GDPRComplianceHandler:
"""GDPR compliance handler for RAG systems."""
def __init__(self, gdpr_config: Dict[str, Any]):
self.config = gdpr_config
self.lawful_basis = gdpr_config.get('lawful_basis', 'legitimate_interest')
self.data_retention_days = gdpr_config.get('retention_days', 365)
async def process_data(self, data: Dict[str, Any],
classification: Dict[str, Any],
pii_detection: Dict[str, Any]) -> Dict[str, Any]:
"""Process data for GDPR compliance."""
compliance_result = {
'compliant': True,
'required_actions': [],
'gdpr_checks': {}
}
# Process only if personal data is detected
if pii_detection.get('contains_pii', False):
compliance_result['gdpr_checks']['personal_data_detected'] = True
# Check 1: Verify lawful basis for processing
consent_check = await self._check_consent(data, pii_detection)
compliance_result['gdpr_checks']['consent'] = consent_check
if not consent_check['valid']:
compliance_result['compliant'] = False
compliance_result['required_actions'].append({
'action': 'obtain_consent',
'reason': 'No valid consent for personal data processing'
})
# Check 2: Data minimization principle
minimization_check = await self._check_data_minimization(data, classification)
compliance_result['gdpr_checks']['data_minimization'] = minimization_check
if not minimization_check['compliant']:
compliance_result['required_actions'].append({
'action': 'minimize_data',
'fields_to_remove': minimization_check['excessive_fields']
})
# Check 3: Data retention limits
retention_check = await self._check_retention_period(data)
compliance_result['gdpr_checks']['retention'] = retention_check
if not retention_check['compliant']:
compliance_result['required_actions'].append({
'action': 'schedule_deletion',
'retention_expires': retention_check['expiry_date']
})
return compliance_result
GDPR compliance implementation addresses core requirements - lawful basis, data minimization, and retention limits. Each check provides specific guidance for remediation when compliance issues are detected.
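The retention helper referenced above is not shown; one plausible implementation against the configured retention window, assuming an ingested_at POSIX-timestamp field in the document metadata:

from datetime import datetime, timedelta
from typing import Any, Dict

async def _check_retention_period(self, data: Dict[str, Any]) -> Dict[str, Any]:
    """Flag documents held longer than the configured retention period."""
    # 'ingested_at' is an assumed metadata field; default to now for new data
    ingested = (datetime.fromtimestamp(data['ingested_at'])
                if 'ingested_at' in data else datetime.now())
    expiry = ingested + timedelta(days=self.data_retention_days)
    return {
        'compliant': datetime.now() < expiry,
        'expiry_date': expiry.isoformat()
    }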
Data Subject Rights Handling¶
async def handle_data_subject_request(self, request_type: str,
subject_id: str) -> Dict[str, Any]:
"""Handle GDPR data subject requests."""
# Route request to appropriate handler
if request_type == 'access':
return await self._handle_access_request(subject_id)
elif request_type == 'erasure':
return await self._handle_erasure_request(subject_id)
elif request_type == 'rectification':
return await self._handle_rectification_request(subject_id)
elif request_type == 'portability':
return await self._handle_portability_request(subject_id)
else:
return {'error': f'Unsupported request type: {request_type}'}
Data subject rights handling provides automated support for GDPR requirements. This systematic approach ensures compliance while reducing manual processing overhead.
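As one example, an erasure ("right to be forgotten") handler might coordinate deletion across stores; the store interfaces below are assumptions for illustration:

from typing import Any, Dict

async def _handle_erasure_request(self, subject_id: str) -> Dict[str, Any]:
    """Remove all data linked to a data subject across stores (sketch)."""
    # find_by_subject, delete_document, delete, and log_erasure are assumed helpers
    matches = await self.data_index.find_by_subject(subject_id)
    for doc in matches:
        await self.vector_store.delete_document(doc['id'])
        await self.document_store.delete(doc['id'])
    await self.audit_logger.log_erasure(subject_id, len(matches))
    return {
        'request_type': 'erasure',
        'subject_id': subject_id,
        'documents_erased': len(matches)
    }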
Part 4: Real-Time Indexing and Change Detection¶
Incremental Indexing System¶
# Real-time indexing and incremental update system
import asyncio
import logging
import time
from typing import Any, Dict, List

class IncrementalIndexingSystem:
    """Real-time incremental indexing system for dynamic knowledge bases."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
# Initialize change detection systems for different source types
self.change_detectors = {
'file_system': FileSystemChangeDetector(),
'database': DatabaseChangeDetector(),
'api_webhook': WebhookChangeDetector(),
'message_queue': MessageQueueChangeDetector()
}
# Set up processing pipeline components
self.incremental_processor = IncrementalDocumentProcessor()
self.vector_store_updater = VectorStoreUpdater()
self.knowledge_graph_updater = KnowledgeGraphUpdater()
        # Initialize change tracking and logging
        self.change_tracker = ChangeTracker()
        self.logger = logging.getLogger(__name__)
        # Create processing queues with appropriate sizes
        self.update_queue = asyncio.Queue(maxsize=config.get('queue_size', 10000))
        self.deletion_queue = asyncio.Queue(maxsize=1000)
        # Start background processors for parallel processing.
        # asyncio.create_task requires a running event loop, so this class
        # must be instantiated from within async code.
        self.processors = []
        for i in range(config.get('num_processors', 3)):
            processor = asyncio.create_task(self._incremental_update_processor(f"proc_{i}"))
            self.processors.append(processor)
Real-time indexing supports multiple change detection mechanisms with separate processing queues. The multi-processor architecture enables parallel processing while maintaining system responsiveness.
Change Detection Setup¶
async def setup_change_detection(self, sources: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Set up change detection for specified data sources."""
setup_results = {}
# Configure each data source for monitoring
for source in sources:
source_type = source['type']
source_config = source['config']
source_id = source.get('id', f"{source_type}_{time.time()}")
if source_type in self.change_detectors:
try:
# Initialize the appropriate detector
detector = self.change_detectors[source_type]
setup_result = await detector.setup_monitoring(source_id, source_config)
# Connect change events to our processing pipeline
await detector.register_change_callback(
self._handle_change_event
)
setup_results[source_id] = {
'status': 'monitoring',
'detector_type': source_type,
'setup_result': setup_result
}
except Exception as e:
setup_results[source_id] = {
'status': 'failed',
'error': str(e)
}
return {
'setup_results': setup_results,
'monitoring_sources': len([r for r in setup_results.values()
if r['status'] == 'monitoring']),
'processors_active': len(self.processors)
}
Change detection setup configures monitoring for multiple data sources with consistent callback registration. Error handling ensures partial setup failures don't prevent overall system operation.
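The _handle_change_event callback registered above routes events into the queues created in __init__; a sketch consistent with that design:

async def _handle_change_event(self, change_event: Dict[str, Any]):
    """Route a detected change into the appropriate processing queue."""
    if change_event['type'] == 'delete':
        await self.deletion_queue.put(change_event)
    else:  # 'create' or 'update'
        await self.update_queue.put(change_event)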
Background Update Processing¶
    async def _incremental_update_processor(self, processor_id: str):
        """Background processor for incremental updates."""
        while True:
            try:
                # Pull document updates without blocking other processors;
                # get_nowait avoids the race between empty() and get()
                try:
                    change_event = self.update_queue.get_nowait()
                    await self._process_incremental_update(change_event, processor_id)
                    self.update_queue.task_done()
                except asyncio.QueueEmpty:
                    pass
                # Process document deletions the same way
                try:
                    deletion_event = self.deletion_queue.get_nowait()
                    await self._process_deletion(deletion_event, processor_id)
                    self.deletion_queue.task_done()
                except asyncio.QueueEmpty:
                    pass
                # Prevent busy waiting with a small delay
                await asyncio.sleep(0.1)
            except Exception as e:
                self.logger.error(f"Processor {processor_id} error: {e}")
                await asyncio.sleep(1)
async def _process_incremental_update(self, change_event: Dict[str, Any],
processor_id: str):
"""Process individual incremental update."""
start_time = time.time()
try:
# Extract change information from event
source_id = change_event['source_id']
document_id = change_event['document_id']
change_type = change_event['type'] # 'create' or 'update'
document_data = change_event['document_data']
# Start tracking this change for monitoring
tracking_id = await self.change_tracker.start_tracking(change_event)
# Process the document through our pipeline
processing_result = await self.incremental_processor.process_document(
document_data, change_type
)
if processing_result['success']:
# Update vector store with processed chunks
vector_update_result = await self.vector_store_updater.update_document(
document_id, processing_result['chunks'], change_type
)
# Update knowledge graph if enabled
if self.config.get('update_knowledge_graph', True):
kg_update_result = await self.knowledge_graph_updater.update_document(
document_id, processing_result['entities'],
processing_result['relationships'], change_type
)
else:
kg_update_result = {'skipped': True}
# Complete change tracking with results
await self.change_tracker.complete_tracking(tracking_id, {
'processing_time': time.time() - start_time,
'vector_update': vector_update_result,
'kg_update': kg_update_result
})
else:
await self.change_tracker.fail_tracking(tracking_id, processing_result['error'])
except Exception as e:
self.logger.error(f"Incremental update error: {e}")
if 'tracking_id' in locals():
await self.change_tracker.fail_tracking(tracking_id, str(e))
Incremental update processing includes comprehensive tracking and error handling. The change tracker provides visibility into processing performance and enables debugging of update failures.
File System Change Detection¶
import asyncio
import logging
import os
import time
from typing import Any, Dict

class FileSystemChangeDetector:
    """File system change detection using OS-level monitoring."""

    def __init__(self):
        self.watchers = {}          # Track active file system watchers
        self.change_callbacks = []  # Registered callback functions
        self.loop = None            # Event loop for thread-safe callback scheduling
        self.logger = logging.getLogger(__name__)

    async def register_change_callback(self, callback):
        """Register an async callback invoked for every detected change."""
        self.change_callbacks.append(callback)
    async def setup_monitoring(self, source_id: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """Set up file system monitoring for specified paths."""
        import watchdog.observers
        from watchdog.events import FileSystemEventHandler

        # Watchdog callbacks fire on a separate thread, so capture the
        # running loop to schedule coroutines thread-safely
        self.loop = asyncio.get_running_loop()
        watch_paths = config.get('paths', [])
        file_patterns = config.get('patterns', ['*'])
# Create custom event handler for RAG system
class RAGFileSystemEventHandler(FileSystemEventHandler):
def __init__(self, detector_instance, source_id):
self.detector = detector_instance
self.source_id = source_id
            def _schedule(self, path: str, change_type: str):
                # Hand the async handler to the main event loop from
                # watchdog's worker thread
                asyncio.run_coroutine_threadsafe(
                    self.detector._handle_file_change(self.source_id, path, change_type),
                    self.detector.loop
                )

            def on_modified(self, event):
                if not event.is_directory:
                    self._schedule(event.src_path, 'update')

            def on_created(self, event):
                if not event.is_directory:
                    self._schedule(event.src_path, 'create')

            def on_deleted(self, event):
                if not event.is_directory:
                    self._schedule(event.src_path, 'delete')
# Initialize file system observer
observer = watchdog.observers.Observer()
event_handler = RAGFileSystemEventHandler(self, source_id)
# Set up monitoring for all specified paths
for path in watch_paths:
observer.schedule(event_handler, path, recursive=True)
observer.start()
self.watchers[source_id] = observer
return {
'monitoring_paths': watch_paths,
'file_patterns': file_patterns,
'watcher_active': True
}
async def _handle_file_change(self, source_id: str, file_path: str, change_type: str):
"""Handle detected file system change."""
try:
# Read file content for create/update operations
if change_type in ['create', 'update']:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
document_data = {
'path': file_path,
'content': content,
'modified_time': os.path.getmtime(file_path)
}
else:
# For deletions, no content is available
document_data = None
# Create standardized change event
change_event = {
'source_id': source_id,
'document_id': file_path,
'type': change_type,
'document_data': document_data,
'timestamp': time.time()
}
# Notify all registered callbacks
for callback in self.change_callbacks:
await callback(change_event)
except Exception as e:
self.logger.error(f"File change handling error: {e}")
File system monitoring uses OS-level change detection for efficient, real-time updates. The standardized change event format enables consistent processing across different data sources.
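A minimal usage sketch (the path and patterns are placeholders):

import asyncio

async def main():
    detector = FileSystemChangeDetector()

    async def log_change(event):
        print(f"{event['type']}: {event['document_id']}")

    await detector.register_change_callback(log_change)
    await detector.setup_monitoring(
        'local_docs', {'paths': ['/data/documents'], 'patterns': ['*.md', '*.txt']}
    )
    await asyncio.Event().wait()  # keep the loop alive for the watcher thread

asyncio.run(main())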
Part 5: Advanced Monitoring and Analytics¶
Comprehensive Monitoring System¶
# Production monitoring and observability system setup
import time
from typing import Any, Callable, Dict

from prometheus_client import Counter, Histogram, Gauge, start_http_server
import structlog
class RAGMonitoringSystem:
"""Comprehensive monitoring and observability for production RAG systems."""
def __init__(self, config: Dict[str, Any]):
self.config = config
# Initialize core monitoring components
self._setup_prometheus_metrics()
self.logger = structlog.get_logger()
# Initialize specialized monitoring components
self.performance_tracker = RAGPerformanceTracker()
self.alert_manager = RAGAlertManager(config.get('alerting', {}))
self.analytics = RAGAnalytics(config.get('analytics', {}))
self.health_checker = RAGHealthChecker()
def _setup_prometheus_metrics(self):
"""Set up Prometheus metrics for RAG system monitoring."""
# Request tracking metrics
self.request_counter = Counter(
'rag_requests_total',
'Total number of RAG requests',
['method', 'endpoint', 'status']
)
self.request_duration = Histogram(
'rag_request_duration_seconds',
'RAG request duration in seconds',
['method', 'endpoint']
)
# System health metrics
self.active_connections = Gauge(
'rag_active_connections',
'Number of active connections',
['service']
)
self.queue_size = Gauge(
'rag_queue_size',
'Size of processing queues',
['queue_type']
)
# Quality and accuracy metrics
self.response_quality = Histogram(
'rag_response_quality',
'Response quality scores',
['query_type']
)
self.retrieval_accuracy = Histogram(
'rag_retrieval_accuracy',
'Retrieval accuracy scores',
['retrieval_method']
)
# Error tracking
self.error_counter = Counter(
'rag_errors_total',
'Total number of errors',
['error_type', 'service']
)
# Start the Prometheus metrics server
metrics_port = self.config.get('metrics_port', 8000)
start_http_server(metrics_port)
Comprehensive metrics collection covers request patterns, system health, quality scores, and error tracking. Exposing them via Prometheus enables integration with standard monitoring and alerting tooling.
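Gauges are updated by the owning components; for example, the indexing system from Part 4 could keep the queue-size gauge current (the indexer instance is assumed):

monitoring = RAGMonitoringSystem({'metrics_port': 8000})

# Periodically sample queue depth so Prometheus can alert on backlog
monitoring.queue_size.labels(queue_type='updates').set(indexer.update_queue.qsize())
monitoring.queue_size.labels(queue_type='deletions').set(indexer.deletion_queue.qsize())

# Connection gauges follow the same inc/dec pattern
monitoring.active_connections.labels(service='vector_store').inc()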
Advanced Request Tracking¶
async def track_request(self, method: str, endpoint: str,
request_func: Callable) -> Dict[str, Any]:
"""Track RAG request with comprehensive monitoring."""
start_time = time.time()
# Use Prometheus histogram to automatically track duration
with self.request_duration.labels(method=method, endpoint=endpoint).time():
try:
# Execute the RAG request function
result = await request_func()
# Record successful completion
self.request_counter.labels(
method=method, endpoint=endpoint, status='success'
).inc()
# Track quality metrics if available
if 'quality_score' in result:
query_type = result.get('query_type', 'unknown')
self.response_quality.labels(query_type=query_type).observe(
result['quality_score']
)
# Log structured information for observability
self.logger.info(
"RAG request completed",
method=method,
endpoint=endpoint,
duration=time.time() - start_time,
quality_score=result.get('quality_score'),
sources_retrieved=result.get('sources_count', 0)
)
return result
except Exception as e:
# Record failed request metrics
self.request_counter.labels(
method=method, endpoint=endpoint, status='error'
).inc()
# Track error type for analysis
error_type = type(e).__name__
self.error_counter.labels(
error_type=error_type, service=endpoint
).inc()
# Log detailed error information
self.logger.error(
"RAG request failed",
method=method,
endpoint=endpoint,
error=str(e),
duration=time.time() - start_time
)
# Check if alert thresholds are exceeded
await self.alert_manager.check_error_threshold(endpoint, error_type)
# Re-raise the exception for proper error handling
raise
Request tracking captures comprehensive performance and quality metrics while providing structured logging for operational visibility. Error handling includes automatic alerting threshold checking.
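Wrapping a query handler is a one-liner; the handler below and its return fields are illustrative (run inside async code):

async def answer_query(question: str) -> Dict[str, Any]:
    # ... retrieval + generation; return quality metadata for tracking
    return {'answer': '...', 'quality_score': 0.82,
            'query_type': 'factual', 'sources_count': 4}

result = await monitoring.track_request(
    method='POST',
    endpoint='/query',
    request_func=lambda: answer_query("What is our retention policy?")
)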
Performance Analytics Engine¶
from typing import Any, Dict, List

class RAGAnalytics:
    """Advanced analytics for RAG system performance and usage."""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.analytics_data = {}
# Initialize analytics components
self.metrics_store = MetricsTimeSeriesStore()
self.query_analyzer = QueryPatternAnalyzer()
self.performance_predictor = PerformancePredictor()
async def analyze_system_performance(self, time_window: str = '1h') -> Dict[str, Any]:
"""Analyze comprehensive system performance over time window."""
# Retrieve metrics data for the specified time window
metrics = await self.metrics_store.get_metrics_window(time_window)
# Perform comprehensive performance analysis
performance_analysis = {
'request_volume': self._analyze_request_volume(metrics),
'response_times': self._analyze_response_times(metrics),
'quality_trends': self._analyze_quality_trends(metrics),
'error_patterns': self._analyze_error_patterns(metrics),
'resource_usage': self._analyze_resource_usage(metrics),
'user_satisfaction': self._analyze_user_satisfaction(metrics)
}
# Identify potential performance issues
performance_issues = self._identify_performance_issues(performance_analysis)
# Generate actionable recommendations
recommendations = self._generate_performance_recommendations(
performance_analysis, performance_issues
)
return {
'analysis_period': time_window,
'performance_analysis': performance_analysis,
'identified_issues': performance_issues,
'recommendations': recommendations,
'overall_health_score': self._calculate_health_score(performance_analysis)
}
def _generate_performance_recommendations(self, analysis: Dict[str, Any],
issues: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Generate actionable performance recommendations."""
recommendations = []
# Check for high response time issues
if analysis['response_times']['p95'] > 2.0: # 2 second threshold
recommendations.append({
'type': 'performance',
'priority': 'high',
'issue': 'High response times detected',
'recommendation': 'Scale retrieval services or optimize vector search indices',
'expected_impact': 'Reduce P95 response time by 30-50%'
})
# Check for quality issues
if analysis['quality_trends']['average_score'] < 0.7:
recommendations.append({
'type': 'quality',
'priority': 'medium',
'issue': 'Response quality below target',
'recommendation': 'Review document chunking strategy, consider reranking',
'expected_impact': 'Improve average quality score by 15-25%'
})
# Check for resource utilization issues
if analysis['resource_usage']['cpu_utilization'] > 0.8:
recommendations.append({
'type': 'scaling',
'priority': 'high',
'issue': 'High CPU utilization detected',
'recommendation': 'Enable auto-scaling or add more service instances',
'expected_impact': 'Reduce CPU utilization to 60-70% range'
})
return recommendations
Performance analytics turns raw metrics into a comprehensive system analysis with actionable, prioritized recommendations. The checks shown here are threshold-based, while the PerformancePredictor component is positioned to detect patterns and flag emerging issues before they impact users.
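The _calculate_health_score helper referenced above is not shown; a plausible sketch that blends the analysis dimensions (the weights and normalization constants are assumptions):

def _calculate_health_score(self, analysis: Dict[str, Any]) -> float:
    """Blend latency, quality, and error signals into a single 0-1 score."""
    latency_score = max(0.0, 1.0 - analysis['response_times']['p95'] / 5.0)
    quality_score = analysis['quality_trends']['average_score']
    error_score = max(0.0, 1.0 - analysis['error_patterns'].get('error_rate', 0.0) * 10)
    return round(0.40 * quality_score + 0.35 * latency_score + 0.25 * error_score, 3)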
Complete Production Deployment¶
Production RAG Deployment System¶
# Complete production RAG system deployment
import time
from typing import Any, Dict

class ProductionRAGDeployment:
    """Complete production RAG deployment with enterprise features."""
def __init__(self, deployment_config: Dict[str, Any]):
# Core system orchestration
self.orchestrator = RAGServiceOrchestrator(deployment_config['services'])
# Enterprise integration
self.enterprise_integrator = EnterpriseRAGIntegrator(
deployment_config['enterprise_integration']
)
# Security and compliance
self.auth_manager = EnterpriseAuthManager(deployment_config['auth'])
self.compliance_manager = PrivacyComplianceManager(
deployment_config['compliance']
)
# Real-time indexing
self.incremental_indexer = IncrementalIndexingSystem(
deployment_config['incremental_indexing']
)
# Monitoring and analytics
self.monitoring_system = RAGMonitoringSystem(deployment_config['monitoring'])
# Auto-scaling
self.auto_scaler = RAGAutoScaler(deployment_config['auto_scaling'])
async def deploy_production_system(self) -> Dict[str, Any]:
"""Deploy complete production RAG system."""
deployment_result = {
'deployment_id': f"rag_prod_{int(time.time())}",
'components': {},
'status': 'deploying'
}
try:
# 1. Start core services
services_result = await self.orchestrator.start_services()
deployment_result['components']['services'] = services_result
# 2. Setup enterprise integration
integration_result = await self.enterprise_integrator.setup_enterprise_integration(
['sharepoint', 'database', 'file_system']
)
deployment_result['components']['enterprise_integration'] = integration_result
# 3. Initialize security
security_result = await self._initialize_security()
deployment_result['components']['security'] = security_result
# 4. Setup incremental indexing
indexing_result = await self.incremental_indexer.setup_change_detection([
{'type': 'file_system', 'config': {'paths': ['/data/documents']}},
{'type': 'database', 'config': {'connection_string': 'postgresql://...'}}
])
deployment_result['components']['incremental_indexing'] = indexing_result
# 5. Start monitoring
monitoring_result = await self._start_monitoring()
deployment_result['components']['monitoring'] = monitoring_result
# 6. Configure auto-scaling
scaling_result = await self._configure_auto_scaling()
deployment_result['components']['auto_scaling'] = scaling_result
deployment_result['status'] = 'deployed'
deployment_result['deployment_time'] = time.time()
return deployment_result
except Exception as e:
deployment_result['status'] = 'failed'
deployment_result['error'] = str(e)
return deployment_result
async def health_check_production(self) -> Dict[str, Any]:
"""Comprehensive production health check."""
return await self.monitoring_system.health_checker.comprehensive_health_check()
async def get_production_metrics(self, time_window: str = '1h') -> Dict[str, Any]:
"""Get comprehensive production metrics."""
return await self.monitoring_system.analytics.analyze_system_performance(time_window)
Complete production deployment orchestrates all enterprise components with comprehensive error handling. The modular approach enables partial deployment success while providing detailed status reporting.
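A minimal bootstrap tying the pieces together (the deployment_config dict is assembled per environment and omitted here):

import asyncio

async def main():
    deployment = ProductionRAGDeployment(deployment_config)
    result = await deployment.deploy_production_system()
    if result['status'] == 'deployed':
        print(f"Deployed {result['deployment_id']}")
        health = await deployment.health_check_production()
        metrics = await deployment.get_production_metrics('1h')
        print(health, metrics['overall_health_score'])
    else:
        print(f"Deployment failed: {result.get('error')}")

asyncio.run(main())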
🧭 Navigation¶
Previous: Session 8 - Production Ready →
Next: Session 10 - Enterprise Integration →