Session 10 - Module A: Advanced Security & Compliance (80 minutes)¶
Prerequisites: Session 10 Core Section Complete
Target Audience: Security engineers, compliance officers, and enterprise architects
Cognitive Load: 6 advanced security concepts
Module Overview¶
This module explores comprehensive security and compliance frameworks for enterprise agent systems including GDPR compliance, RBAC implementation, advanced encryption strategies, audit logging, zero-trust architecture, and regulatory compliance patterns. You'll learn to build enterprise-grade security systems that meet the highest standards for data protection and regulatory requirements.
Learning Objectives¶
By the end of this module, you will:
- Implement GDPR-compliant data handling with automated privacy protection
- Design comprehensive RBAC systems with fine-grained access control
- Create zero-trust security architectures with end-to-end encryption
- Build comprehensive audit logging and compliance monitoring systems
Part 1: GDPR Compliance and Data Privacy (30 minutes)¶
Comprehensive GDPR Implementation¶
🗂️ File: src/session10/security/gdpr_compliance.py
- GDPR compliance framework
Let's start by setting up the foundational imports and enumerations for our GDPR compliance system. This demonstrates the comprehensive scope of data protection requirements in enterprise systems:
from typing import Dict, List, Any, Optional, Set
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import asyncio
import json
import hashlib
import logging
from abc import ABC, abstractmethod
Next, we define the GDPR legal basis types, which form the foundation of all data processing activities. Every piece of personal data must have a valid legal basis under GDPR Article 6:
class LegalBasisType(Enum):
"""GDPR legal basis for data processing"""
CONSENT = "consent"
CONTRACT = "contract"
LEGAL_OBLIGATION = "legal_obligation"
VITAL_INTERESTS = "vital_interests"
PUBLIC_TASK = "public_task"
LEGITIMATE_INTERESTS = "legitimate_interests"
We also need to define the data subject rights that individuals can exercise under GDPR. These rights form the core of data protection and must be implemented in any compliant system:
class DataSubjectRights(Enum):
"""GDPR data subject rights"""
ACCESS = "access" # Article 15
RECTIFICATION = "rectification" # Article 16
ERASURE = "erasure" # Article 17 (Right to be forgotten)
RESTRICT_PROCESSING = "restrict_processing" # Article 18
DATA_PORTABILITY = "data_portability" # Article 20
OBJECT = "object" # Article 21
WITHDRAW_CONSENT = "withdraw_consent" # Article 7
Now we create a comprehensive data structure to track every piece of personal data with its GDPR metadata. This demonstrates the principle of accountability - we must be able to prove compliance for every data element:
@dataclass
class PersonalDataElement:
"""Individual personal data element with GDPR metadata"""
element_id: str
data_type: str
category: str # Basic personal data, sensitive personal data, etc.
source: str
legal_basis: LegalBasisType
purpose: str
retention_period_days: int
is_sensitive: bool = False
consent_id: Optional[str] = None
created_at: datetime = field(default_factory=datetime.now)
last_accessed: datetime = field(default_factory=datetime.now)
encryption_status: str = "encrypted"
Next, we implement the main GDPR compliance manager class. This serves as the central orchestrator for all data protection activities, maintaining comprehensive records as required by GDPR Articles 30 and 35:
class GDPRComplianceManager:
"""Comprehensive GDPR compliance management system"""
def __init__(self):
self.personal_data_registry: Dict[str, PersonalDataElement] = {}
self.consent_records: Dict[str, Dict[str, Any]] = {}
self.processing_activities: List[Dict[str, Any]] = []
self.data_subject_requests: Dict[str, Dict[str, Any]] = {}
self.privacy_impact_assessments: Dict[str, Dict[str, Any]] = {}
self.logger = logging.getLogger(__name__)
The data registration method implements the core principle of lawful processing. Before any personal data can be stored, we must validate the legal basis and obtain consent where required:
async def register_personal_data(self, data_element: PersonalDataElement,
data_subject_id: str) -> Dict[str, Any]:
"""Register personal data with GDPR compliance tracking"""
# Validate legal basis
if not await self._validate_legal_basis(data_element, data_subject_id):
return {
'success': False,
'error': 'Invalid or insufficient legal basis for processing'
}
# Check for consent if required
if data_element.legal_basis == LegalBasisType.CONSENT:
consent_valid = await self._validate_consent(data_element.consent_id, data_subject_id)
if not consent_valid:
return {
'success': False,
'error': 'Valid consent required for processing'
}
After validation, we register the data element and perform necessary compliance actions including logging and privacy impact assessment triggers:
# Register data element
registry_key = f"{data_subject_id}:{data_element.element_id}"
self.personal_data_registry[registry_key] = data_element
# Log processing activity
await self._log_processing_activity("data_registration", {
'data_subject_id': data_subject_id,
'element_id': data_element.element_id,
'legal_basis': data_element.legal_basis.value,
'purpose': data_element.purpose
})
# Check if Privacy Impact Assessment is needed
if data_element.is_sensitive or data_element.category == "biometric":
await self._trigger_pia_assessment(data_element, data_subject_id)
return {
'success': True,
'registry_key': registry_key,
'retention_until': datetime.now() + timedelta(days=data_element.retention_period_days)
}
The data subject request handler is the cornerstone of GDPR compliance, implementing all the rights guaranteed to individuals under Articles 15-22:
async def handle_data_subject_request(self, request_type: DataSubjectRights,
data_subject_id: str,
request_details: Dict[str, Any] = None) -> Dict[str, Any]:
"""Handle GDPR data subject rights requests"""
request_id = f"dsr_{int(datetime.now().timestamp())}"
request_details = request_details or {}
# Validate data subject identity (simplified for demo)
if not await self._validate_data_subject_identity(data_subject_id):
return {
'success': False,
'error': 'Data subject identity validation failed'
}
The routing logic directs requests to specialized handlers, each implementing specific GDPR article requirements:
# Process specific request type
if request_type == DataSubjectRights.ACCESS:
result = await self._handle_access_request(data_subject_id)
elif request_type == DataSubjectRights.RECTIFICATION:
result = await self._handle_rectification_request(data_subject_id, request_details)
elif request_type == DataSubjectRights.ERASURE:
result = await self._handle_erasure_request(data_subject_id, request_details)
elif request_type == DataSubjectRights.DATA_PORTABILITY:
result = await self._handle_portability_request(data_subject_id)
elif request_type == DataSubjectRights.RESTRICT_PROCESSING:
result = await self._handle_restriction_request(data_subject_id, request_details)
elif request_type == DataSubjectRights.OBJECT:
result = await self._handle_objection_request(data_subject_id, request_details)
elif request_type == DataSubjectRights.WITHDRAW_CONSENT:
result = await self._handle_consent_withdrawal(data_subject_id, request_details)
else:
result = {'success': False, 'error': 'Unsupported request type'}
Comprehensive audit trails ensure accountability and transparency:
# Store request record
self.data_subject_requests[request_id] = {
'request_type': request_type.value,
'data_subject_id': data_subject_id,
'request_details': request_details,
'timestamp': datetime.now(),
'status': 'completed' if result['success'] else 'failed',
'result': result
}
# Log activity
await self._log_processing_activity("data_subject_request", {
'request_id': request_id,
'request_type': request_type.value,
'data_subject_id': data_subject_id,
'status': result.get('status', 'unknown')
})
return {**result, 'request_id': request_id}
The access request handler implements Article 15 (Right of access), which requires providing comprehensive information about all data processing. This is often the most complex right to implement:
async def _handle_access_request(self, data_subject_id: str) -> Dict[str, Any]:
"""Handle Article 15 - Right of access by the data subject"""
subject_data = {}
# Collect all personal data for this subject
for registry_key, data_element in self.personal_data_registry.items():
if registry_key.startswith(f"{data_subject_id}:"):
subject_data[data_element.element_id] = {
'data_type': data_element.data_type,
'category': data_element.category,
'purpose': data_element.purpose,
'legal_basis': data_element.legal_basis.value,
'retention_period_days': data_element.retention_period_days,
'created_at': data_element.created_at.isoformat(),
'last_accessed': data_element.last_accessed.isoformat()
}
We must also include processing activities and third-party sharing information to provide complete transparency as required by Article 15:
# Include processing activities
processing_info = [
activity for activity in self.processing_activities
if activity.get('data_subject_id') == data_subject_id
]
access_report = {
'data_subject_id': data_subject_id,
'personal_data': subject_data,
'processing_activities': processing_info,
'retention_policies': self._get_retention_policies(data_subject_id),
'third_party_sharing': self._get_third_party_sharing_info(data_subject_id),
'report_generated_at': datetime.now().isoformat()
}
return {
'success': True,
'access_report': access_report,
'format': 'structured_json'
}
The erasure request handler implements Article 17 (Right to be forgotten), one of the most challenging rights to implement as it requires careful legal analysis and comprehensive data removal:
async def _handle_erasure_request(self, data_subject_id: str,
request_details: Dict[str, Any]) -> Dict[str, Any]:
"""Handle Article 17 - Right to erasure (right to be forgotten)"""
# Check if erasure is legally permissible
erasure_permitted = await self._check_erasure_permitted(data_subject_id, request_details)
if not erasure_permitted['permitted']:
return {
'success': False,
'error': 'Erasure not permitted',
'reason': erasure_permitted['reason']
}
When erasure is permitted, we must systematically remove all traces of the individual's data from our systems:
# Perform erasure
erased_elements = []
# Remove from personal data registry
keys_to_remove = [
key for key in self.personal_data_registry.keys()
if key.startswith(f"{data_subject_id}:")
]
for key in keys_to_remove:
data_element = self.personal_data_registry[key]
erased_elements.append({
'element_id': data_element.element_id,
'data_type': data_element.data_type,
'erasure_timestamp': datetime.now().isoformat()
})
del self.personal_data_registry[key]
Critically, we must also handle cascade effects including consent record removal and third-party notifications as required by Article 17(2):
# Remove consent records
if data_subject_id in self.consent_records:
del self.consent_records[data_subject_id]
# Notify third parties if necessary
third_party_notifications = await self._notify_third_parties_of_erasure(data_subject_id)
return {
'success': True,
'erased_elements': erased_elements,
'erasure_timestamp': datetime.now().isoformat(),
'third_party_notifications': third_party_notifications
}
The data portability handler implements Article 20, which applies only to data processed under consent or contract legal bases. This creates a machine-readable export for data subject empowerment:
async def _handle_portability_request(self, data_subject_id: str) -> Dict[str, Any]:
"""Handle Article 20 - Right to data portability"""
# Collect portable data (consent-based or contract-based only)
portable_data = {}
for registry_key, data_element in self.personal_data_registry.items():
if (registry_key.startswith(f"{data_subject_id}:") and
data_element.legal_basis in [LegalBasisType.CONSENT, LegalBasisType.CONTRACT]):
portable_data[data_element.element_id] = {
'data_type': data_element.data_type,
'value': f"[ENCRYPTED_VALUE_{data_element.element_id}]", # Would be actual data in production
'created_at': data_element.created_at.isoformat(),
'legal_basis': data_element.legal_basis.value
}
The export package includes comprehensive metadata and integrity verification to ensure the portability package meets technical and security requirements:
# Create portable format
portable_package = {
'data_subject_id': data_subject_id,
'export_timestamp': datetime.now().isoformat(),
'data_format': 'JSON',
'data': portable_data,
'metadata': {
'export_version': '1.0',
'compliance_standard': 'GDPR_Article_20',
'data_integrity_hash': hashlib.sha256(
json.dumps(portable_data, sort_keys=True).encode()
).hexdigest()
}
}
return {
'success': True,
'portable_data': portable_package,
'format': 'structured_json',
'encryption_available': True
}
The consent management system is fundamental to GDPR compliance, implementing the requirements for valid, specific, informed, and freely given consent under Article 7:
async def manage_consent(self, data_subject_id: str, purpose: str,
consent_given: bool, consent_details: Dict[str, Any] = None) -> Dict[str, Any]:
"""Manage GDPR consent with granular controls"""
consent_id = f"consent_{data_subject_id}_{purpose}_{int(datetime.now().timestamp())}"
consent_record = {
'consent_id': consent_id,
'data_subject_id': data_subject_id,
'purpose': purpose,
'consent_given': consent_given,
'timestamp': datetime.now(),
'consent_details': consent_details or {},
'withdrawal_timestamp': None,
'is_active': consent_given
}
Consent storage and withdrawal handling must implement cascade effects as required by Article 7(3) - consent withdrawal must be as easy as giving consent:
# Store consent record
if data_subject_id not in self.consent_records:
self.consent_records[data_subject_id] = {}
self.consent_records[data_subject_id][consent_id] = consent_record
# If consent is withdrawn, update related data processing
if not consent_given:
await self._handle_consent_withdrawal_cascade(data_subject_id, purpose)
await self._log_processing_activity("consent_management", {
'consent_id': consent_id,
'data_subject_id': data_subject_id,
'purpose': purpose,
'consent_given': consent_given
})
return {
'success': True,
'consent_id': consent_id,
'status': 'active' if consent_given else 'withdrawn'
}
The Privacy Impact Assessment (PIA) implementation follows Article 35 requirements, mandating systematic privacy risk evaluation for high-risk processing:
async def perform_data_protection_impact_assessment(self,
processing_activity: Dict[str, Any]) -> Dict[str, Any]:
"""Perform Data Protection Impact Assessment (DPIA) as required by Article 35"""
pia_id = f"pia_{int(datetime.now().timestamp())}"
# Assess necessity and proportionality
necessity_assessment = await self._assess_processing_necessity(processing_activity)
# Identify risks to data subjects
risk_assessment = await self._assess_data_subject_risks(processing_activity)
# Evaluate safeguards and measures
safeguards_assessment = await self._assess_safeguards(processing_activity)
The comprehensive risk analysis combines necessity, risk, and safeguards assessments to determine if supervisory authority consultation is required:
# Overall risk determination
overall_risk = await self._determine_overall_risk(
necessity_assessment, risk_assessment, safeguards_assessment
)
pia_report = {
'pia_id': pia_id,
'processing_activity': processing_activity,
'assessment_date': datetime.now().isoformat(),
'necessity_assessment': necessity_assessment,
'risk_assessment': risk_assessment,
'safeguards_assessment': safeguards_assessment,
'overall_risk_level': overall_risk['level'],
'recommendations': overall_risk['recommendations'],
'requires_consultation': overall_risk['level'] == 'high',
'review_date': (datetime.now() + timedelta(days=365)).isoformat()
}
self.privacy_impact_assessments[pia_id] = pia_report
return {
'success': True,
'pia_id': pia_id,
'risk_level': overall_risk['level'],
'requires_consultation': pia_report['requires_consultation'],
'recommendations': overall_risk['recommendations']
}
The compliance reporting system provides comprehensive oversight and demonstrates accountability as required by Article 5(2). This generates executive-level compliance dashboards:
async def generate_compliance_report(self) -> Dict[str, Any]:
"""Generate comprehensive GDPR compliance report"""
# Data processing statistics
processing_stats = {
'total_data_subjects': len(set(
key.split(':')[0] for key in self.personal_data_registry.keys()
)),
'total_data_elements': len(self.personal_data_registry),
'consent_based_processing': len([
elem for elem in self.personal_data_registry.values()
if elem.legal_basis == LegalBasisType.CONSENT
]),
'sensitive_data_elements': len([
elem for elem in self.personal_data_registry.values()
if elem.is_sensitive
])
}
Rights request metrics demonstrate our responsiveness to data subject exercises, which is critical for regulatory audits:
# Rights requests statistics
rights_stats = {
'total_requests': len(self.data_subject_requests),
'access_requests': len([
req for req in self.data_subject_requests.values()
if req['request_type'] == DataSubjectRights.ACCESS.value
]),
'erasure_requests': len([
req for req in self.data_subject_requests.values()
if req['request_type'] == DataSubjectRights.ERASURE.value
]),
'average_response_time_hours': 48, # Calculated from actual response times
'compliance_rate': 100.0
}
The comprehensive report combines all compliance dimensions into a single accountability document:
# Consent management statistics
consent_stats = {
'total_consents': sum(len(consents) for consents in self.consent_records.values()),
'active_consents': sum(
1 for consents in self.consent_records.values()
for consent in consents.values()
if consent['is_active']
),
'consent_withdrawal_rate': 5.2 # Percentage
}
# Security and breach information
security_status = {
'encryption_compliance': 100.0, # Percentage of data encrypted
'access_control_compliance': 100.0,
'audit_logging_enabled': True,
'data_breaches_ytd': 0,
'last_security_assessment': datetime.now().isoformat()
}
compliance_report = {
'report_id': f"compliance_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
'report_date': datetime.now().isoformat(),
'reporting_period': {
'start': (datetime.now() - timedelta(days=365)).isoformat(),
'end': datetime.now().isoformat()
},
'processing_statistics': processing_stats,
'rights_request_statistics': rights_stats,
'consent_management': consent_stats,
'security_status': security_status,
'privacy_impact_assessments': len(self.privacy_impact_assessments),
'compliance_score': self._calculate_compliance_score(),
'recommendations': await self._generate_compliance_recommendations()
}
return compliance_report
The compliance scoring algorithm provides quantitative assessment of GDPR adherence for management oversight:
def _calculate_compliance_score(self) -> float:
"""Calculate overall GDPR compliance score"""
# Simplified scoring algorithm
score = 100.0
# Deduct for missing consent records
consent_coverage = len([
elem for elem in self.personal_data_registry.values()
if elem.legal_basis != LegalBasisType.CONSENT or elem.consent_id
]) / max(1, len(self.personal_data_registry))
score *= consent_coverage
# Deduct for delayed rights responses
# (In production, would calculate actual response times)
# Deduct for missing PIAs on high-risk processing
# (In production, would analyze actual risk levels)
return round(score, 1)
Advanced data anonymization techniques protect against re-identification while preserving data utility. K-anonymity ensures each individual cannot be distinguished from at least k-1 others:
class DataAnonymization:
"""Advanced data anonymization and pseudonymization techniques"""
def __init__(self):
self.anonymization_techniques = {}
self.pseudonymization_keys = {}
K-anonymity implementation groups records by quasi-identifiers and applies generalization or suppression to meet privacy requirements:
async def k_anonymize_dataset(self, dataset: List[Dict[str, Any]],
k_value: int = 5,
quasi_identifiers: List[str] = None) -> Dict[str, Any]:
"""Implement k-anonymity to protect against re-identification"""
if not quasi_identifiers:
quasi_identifiers = ['age', 'zipcode', 'gender']
# Group records by quasi-identifier combinations
groups = {}
for record in dataset:
qi_signature = tuple(record.get(qi) for qi in quasi_identifiers)
if qi_signature not in groups:
groups[qi_signature] = []
groups[qi_signature].append(record)
Records that don't meet the k-anonymity threshold are either generalized (reducing precision) or suppressed (removed) to prevent re-identification:
# Identify groups smaller than k
small_groups = {sig: records for sig, records in groups.items() if len(records) < k_value}
# Apply generalization/suppression
anonymized_dataset = []
suppressed_records = 0
for signature, records in groups.items():
if len(records) >= k_value:
# Group meets k-anonymity requirement
anonymized_dataset.extend(records)
else:
# Apply generalization or suppression
generalized_records = await self._generalize_records(records, quasi_identifiers)
if generalized_records:
anonymized_dataset.extend(generalized_records)
else:
suppressed_records += len(records)
return {
'anonymized_dataset': anonymized_dataset,
'k_value': k_value,
'original_size': len(dataset),
'anonymized_size': len(anonymized_dataset),
'suppressed_records': suppressed_records,
'privacy_level': f"{k_value}-anonymous"
}
Differential privacy provides mathematically rigorous privacy guarantees by adding calibrated noise to query results, ensuring individual contributions cannot be detected:
async def differential_privacy_noise(self, query_result: float,
epsilon: float = 1.0,
sensitivity: float = 1.0) -> Dict[str, Any]:
"""Add differential privacy noise to query results"""
import random
import math
# Laplace mechanism for differential privacy
scale = sensitivity / epsilon
noise = random.laplace(0, scale)
noisy_result = query_result + noise
return {
'original_result': query_result,
'noisy_result': noisy_result,
'noise_added': noise,
'epsilon': epsilon,
'privacy_budget_used': epsilon,
'privacy_guarantee': f"({epsilon}, 0)-differential privacy"
}
Comprehensive audit logging is essential for GDPR accountability, providing detailed records of all data processing activities for regulatory compliance and security monitoring:
class AuditLoggingSystem:
"""Comprehensive audit logging for GDPR compliance"""
def __init__(self):
self.audit_logs: List[Dict[str, Any]] = []
self.log_retention_days = 2555 # 7 years for compliance
Data access logging captures all interactions with personal data, creating immutable audit trails for compliance verification:
async def log_data_access(self, access_details: Dict[str, Any]):
"""Log data access events"""
log_entry = {
'event_id': f"access_{int(datetime.now().timestamp())}",
'event_type': 'data_access',
'timestamp': datetime.now().isoformat(),
'data_subject_id': access_details.get('data_subject_id'),
'accessor_id': access_details.get('accessor_id'),
'data_elements': access_details.get('data_elements', []),
'purpose': access_details.get('purpose'),
'legal_basis': access_details.get('legal_basis'),
'ip_address': access_details.get('ip_address'),
'user_agent': access_details.get('user_agent')
}
self.audit_logs.append(log_entry)
Data modification logging captures all changes to personal data, supporting both security monitoring and data subject rights verification:
async def log_data_modification(self, modification_details: Dict[str, Any]):
"""Log data modification events"""
log_entry = {
'event_id': f"modify_{int(datetime.now().timestamp())}",
'event_type': 'data_modification',
'timestamp': datetime.now().isoformat(),
'data_subject_id': modification_details.get('data_subject_id'),
'modifier_id': modification_details.get('modifier_id'),
'modifications': modification_details.get('modifications', []),
'reason': modification_details.get('reason'),
'legal_basis': modification_details.get('legal_basis')
}
self.audit_logs.append(log_entry)
Audit report generation provides comprehensive analysis of all logged activities, supporting compliance demonstrations and security investigations:
async def generate_audit_report(self, start_date: datetime = None,
end_date: datetime = None) -> Dict[str, Any]:
"""Generate comprehensive audit report"""
if not start_date:
start_date = datetime.now() - timedelta(days=30)
if not end_date:
end_date = datetime.now()
# Filter logs by date range
filtered_logs = [
log for log in self.audit_logs
if start_date <= datetime.fromisoformat(log['timestamp']) <= end_date
]
# Analyze access patterns
access_stats = {
'total_access_events': len([log for log in filtered_logs if log['event_type'] == 'data_access']),
'unique_data_subjects': len(set(log.get('data_subject_id') for log in filtered_logs if log.get('data_subject_id'))),
'unique_accessors': len(set(log.get('accessor_id') for log in filtered_logs if log.get('accessor_id')))
}
return {
'report_period': {
'start': start_date.isoformat(),
'end': end_date.isoformat()
},
'total_events': len(filtered_logs),
'access_statistics': access_stats,
'events': filtered_logs,
'compliance_status': 'compliant'
}
Part 2: Role-Based Access Control and Zero-Trust Architecture (30 minutes)¶
Advanced RBAC Implementation¶
🗂️ File: src/session10/security/enterprise_auth.py
- Advanced RBAC and zero-trust security
Now we implement advanced Role-Based Access Control (RBAC) with zero-trust principles. Let's start with the foundational imports and permission system:
from typing import Dict, List, Any, Optional, Set
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import asyncio
import jwt
import hashlib
import logging
from abc import ABC, abstractmethod
Granular permissions form the foundation of enterprise access control, enabling fine-grained authorization decisions based on the principle of least privilege:
class Permission(Enum):
"""Granular permission system"""
READ_PERSONAL_DATA = "read_personal_data"
WRITE_PERSONAL_DATA = "write_personal_data"
DELETE_PERSONAL_DATA = "delete_personal_data"
MANAGE_USERS = "manage_users"
MANAGE_ROLES = "manage_roles"
ACCESS_AUDIT_LOGS = "access_audit_logs"
SYSTEM_ADMIN = "system_admin"
COMPLIANCE_VIEW = "compliance_view"
SECURITY_ADMIN = "security_admin"
Next, we define comprehensive role and user data structures that support hierarchical roles and attribute-based access control (ABAC):
@dataclass
class Role:
"""Enterprise role with fine-grained permissions"""
role_id: str
role_name: str
permissions: Set[Permission] = field(default_factory=set)
description: str = ""
created_at: datetime = field(default_factory=datetime.now)
is_active: bool = True
parent_roles: Set[str] = field(default_factory=set)
User objects integrate RBAC with additional security attributes, supporting multi-factor authentication and session management:
@dataclass
class User:
"""Enterprise user with RBAC"""
user_id: str
username: str
email: str
roles: Set[str] = field(default_factory=set)
attributes: Dict[str, Any] = field(default_factory=dict)
created_at: datetime = field(default_factory=datetime.now)
last_login: Optional[datetime] = None
is_active: bool = True
password_hash: str = ""
mfa_enabled: bool = False
The Zero-Trust Security Manager implements the core principle of "never trust, always verify" with comprehensive session management and policy enforcement:
class ZeroTrustSecurityManager:
"""Zero-trust security architecture implementation"""
def __init__(self):
self.users: Dict[str, User] = {}
self.roles: Dict[str, Role] = {}
self.access_policies: List[Dict[str, Any]] = []
self.active_sessions: Dict[str, Dict[str, Any]] = {}
self.security_context_cache: Dict[str, Dict[str, Any]] = {}
self.logger = logging.getLogger(__name__)
# Initialize default roles
self._initialize_default_roles()
Default role initialization establishes enterprise-standard roles that align with organizational structures and regulatory requirements:
def _initialize_default_roles(self):
"""Initialize enterprise default roles"""
# Data Protection Officer
dpo_role = Role(
role_id="dpo",
role_name="Data Protection Officer",
permissions={
Permission.READ_PERSONAL_DATA,
Permission.ACCESS_AUDIT_LOGS,
Permission.COMPLIANCE_VIEW
},
description="GDPR Data Protection Officer role"
)
# System Administrator
admin_role = Role(
role_id="system_admin",
role_name="System Administrator",
permissions={
Permission.SYSTEM_ADMIN,
Permission.MANAGE_USERS,
Permission.MANAGE_ROLES,
Permission.ACCESS_AUDIT_LOGS
},
description="Full system administration role"
)
We also define operational roles that provide least-privilege access for day-to-day business functions:
# Data Analyst
analyst_role = Role(
role_id="data_analyst",
role_name="Data Analyst",
permissions={Permission.READ_PERSONAL_DATA},
description="Read-only access to anonymized data"
)
# Customer Service Representative
csr_role = Role(
role_id="customer_service",
role_name="Customer Service Representative",
permissions={
Permission.READ_PERSONAL_DATA,
Permission.WRITE_PERSONAL_DATA
},
description="Limited customer data access for support"
)
self.roles.update({
"dpo": dpo_role,
"system_admin": admin_role,
"data_analyst": analyst_role,
"customer_service": csr_role
})
Multi-factor authentication with zero-trust verification ensures that every authentication attempt is thoroughly validated before granting access:
async def authenticate_user(self, username: str, password: str,
additional_factors: Dict[str, Any] = None) -> Dict[str, Any]:
"""Multi-factor authentication with zero-trust verification"""
# Find user
user = None
for u in self.users.values():
if u.username == username:
user = u
break
if not user or not user.is_active:
await self._log_security_event("authentication_failed", {
'username': username,
'reason': 'user_not_found_or_inactive'
})
return {'success': False, 'error': 'Authentication failed'}
Password verification and multi-factor authentication create multiple security layers, following defense-in-depth principles:
# Verify password
if not self._verify_password(password, user.password_hash):
await self._log_security_event("authentication_failed", {
'user_id': user.user_id,
'username': username,
'reason': 'invalid_password'
})
return {'success': False, 'error': 'Authentication failed'}
# Multi-factor authentication
if user.mfa_enabled:
mfa_result = await self._verify_mfa(user, additional_factors or {})
if not mfa_result['success']:
return mfa_result
Successful authentication creates a comprehensive security context and generates secure session tokens for subsequent authorization decisions:
# Create security context
security_context = await self._create_security_context(user)
# Generate session token
session_token = await self._generate_session_token(user, security_context)
# Update user login time
user.last_login = datetime.now()
await self._log_security_event("authentication_success", {
'user_id': user.user_id,
'username': username,
'session_token_id': session_token['token_id']
})
return {
'success': True,
'user': {
'user_id': user.user_id,
'username': user.username,
'roles': list(user.roles)
},
'session_token': session_token['token'],
'token_expires_at': session_token['expires_at'],
'security_context': security_context
}
Zero-trust authorization validates every request, regardless of the user's authentication status, implementing continuous verification principles:
async def authorize_request(self, session_token: str,
required_permission: Permission,
resource_context: Dict[str, Any] = None) -> Dict[str, Any]:
"""Zero-trust authorization for every request"""
# Validate session token
token_validation = await self._validate_session_token(session_token)
if not token_validation['valid']:
return {'authorized': False, 'error': 'Invalid session token'}
user = token_validation['user']
# Check if user has required permission
user_permissions = await self._get_user_permissions(user.user_id)
if required_permission not in user_permissions:
await self._log_security_event("authorization_denied", {
'user_id': user.user_id,
'required_permission': required_permission.value,
'resource_context': resource_context
})
return {'authorized': False, 'error': 'Insufficient permissions'}
Contextual access policies add dynamic authorization layers based on environmental factors, time, location, and data sensitivity:
# Apply contextual access policies
policy_result = await self._evaluate_access_policies(
user, required_permission, resource_context or {}
)
if not policy_result['allowed']:
await self._log_security_event("authorization_denied", {
'user_id': user.user_id,
'required_permission': required_permission.value,
'resource_context': resource_context,
'policy_reason': policy_result['reason']
})
return {'authorized': False, 'error': policy_result['reason']}
# Success - log access
await self._log_security_event("authorization_granted", {
'user_id': user.user_id,
'required_permission': required_permission.value,
'resource_context': resource_context
})
return {
'authorized': True,
'user_context': {
'user_id': user.user_id,
'username': user.username,
'roles': list(user.roles)
},
'permission_granted': required_permission.value,
'access_constraints': policy_result.get('constraints', {})
}
Security context creation establishes comprehensive user context including permissions, risk assessment, and ABAC attributes for informed authorization decisions:
async def _create_security_context(self, user: User) -> Dict[str, Any]:
"""Create comprehensive security context for user"""
# Get user permissions
permissions = await self._get_user_permissions(user.user_id)
# Calculate risk score
risk_score = await self._calculate_user_risk_score(user)
# Get attribute-based access control attributes
abac_attributes = {
'department': user.attributes.get('department'),
'clearance_level': user.attributes.get('clearance_level', 'standard'),
'location': user.attributes.get('location'),
'employment_type': user.attributes.get('employment_type', 'employee')
}
security_context = {
'user_id': user.user_id,
'username': user.username,
'roles': list(user.roles),
'permissions': [p.value for p in permissions],
'risk_score': risk_score,
'abac_attributes': abac_attributes,
'context_created_at': datetime.now().isoformat(),
'session_constraints': await self._calculate_session_constraints(user, risk_score)
}
# Cache security context
self.security_context_cache[user.user_id] = security_context
return security_context
Dynamic access policy evaluation implements contextual security controls that adapt to environmental factors, data sensitivity, and operational constraints:
async def _evaluate_access_policies(self, user: User, permission: Permission,
resource_context: Dict[str, Any]) -> Dict[str, Any]:
"""Evaluate dynamic access policies"""
# Time-based access control
current_hour = datetime.now().hour
if current_hour < 6 or current_hour > 22: # Outside business hours
if permission in [Permission.DELETE_PERSONAL_DATA, Permission.SYSTEM_ADMIN]:
return {
'allowed': False,
'reason': 'High-risk operations not allowed outside business hours'
}
# Location-based access control
user_location = user.attributes.get('location')
if resource_context.get('sensitive_data') and user_location == 'remote':
return {
'allowed': False,
'reason': 'Sensitive data access not allowed from remote locations'
}
Data classification and clearance level policies ensure only authorized personnel can access highly sensitive information:
# Data sensitivity-based access
data_classification = resource_context.get('data_classification')
if data_classification == 'highly_confidential':
required_clearance = user.attributes.get('clearance_level')
if required_clearance != 'high':
return {
'allowed': False,
'reason': 'Insufficient clearance level for highly confidential data'
}
# Concurrent session limits
active_sessions = len([
s for s in self.active_sessions.values()
if s['user_id'] == user.user_id
])
if active_sessions > 3:
return {
'allowed': False,
'reason': 'Maximum concurrent sessions exceeded'
}
return {
'allowed': True,
'constraints': {
'session_timeout_minutes': 60,
'require_re_auth_for_sensitive': True
}
}
Attribute-Based Access Control (ABAC) provides fine-grained authorization decisions based on subject, object, environment, and action attributes:
async def implement_attribute_based_access_control(self,
request_context: Dict[str, Any]) -> Dict[str, Any]:
"""Advanced ABAC (Attribute-Based Access Control) implementation"""
subject_attrs = request_context.get('subject_attributes', {})
object_attrs = request_context.get('object_attributes', {})
environment_attrs = request_context.get('environment_attributes', {})
action = request_context.get('action')
The ABAC policy engine evaluates complex rules using lambda functions for flexible, runtime-configurable authorization policies:
# ABAC policy engine
policy_rules = [
{
'rule_id': 'sensitive_data_access',
'condition': lambda s, o, e, a: (
o.get('classification') == 'sensitive' and
s.get('clearance_level') != 'high'
),
'effect': 'deny',
'reason': 'Insufficient clearance for sensitive data'
},
{
'rule_id': 'time_restricted_operations',
'condition': lambda s, o, e, a: (
a in ['delete', 'modify'] and
not (9 <= e.get('current_hour', 0) <= 17)
),
'effect': 'deny',
'reason': 'Destructive operations only allowed during business hours'
},
{
'rule_id': 'department_data_access',
'condition': lambda s, o, e, a: (
o.get('department') != s.get('department') and
a == 'read' and
s.get('role') != 'admin'
),
'effect': 'deny',
'reason': 'Cross-department data access requires admin role'
}
]
Policy evaluation follows a deny-by-default approach, with explicit allow decisions only when no restrictive policies match:
# Evaluate policies
for rule in policy_rules:
if rule['condition'](subject_attrs, object_attrs, environment_attrs, action):
return {
'decision': rule['effect'],
'rule_applied': rule['rule_id'],
'reason': rule['reason']
}
# Default allow if no deny rules match
return {
'decision': 'allow',
'rule_applied': 'default_allow',
'reason': 'No restrictive policies applied'
}
Dynamic risk scoring evaluates multiple factors to adjust security controls based on user behavior and environmental conditions:
async def _calculate_user_risk_score(self, user: User) -> float:
"""Calculate dynamic user risk score"""
risk_factors = []
# Time since last login
if user.last_login:
days_since_login = (datetime.now() - user.last_login).days
if days_since_login > 30:
risk_factors.append(('inactive_user', 0.3))
# Role-based risk
admin_roles = {'system_admin', 'security_admin'}
if any(role in admin_roles for role in user.roles):
risk_factors.append(('admin_privileges', 0.4))
# MFA status
if not user.mfa_enabled:
risk_factors.append(('no_mfa', 0.5))
# Location risk
location = user.attributes.get('location', 'unknown')
if location == 'remote':
risk_factors.append(('remote_access', 0.2))
# Calculate composite risk score (0-1 scale)
base_risk = 0.1 # Everyone has some base risk
additional_risk = sum(factor[1] for factor in risk_factors)
return min(1.0, base_risk + additional_risk)
Comprehensive security event logging provides real-time threat detection and forensic capabilities for enterprise security operations:
class SecurityEventLogger:
"""Comprehensive security event logging"""
def __init__(self):
self.security_events: List[Dict[str, Any]] = []
self.alert_thresholds = {
'failed_auth_attempts': 5,
'privilege_escalation_attempts': 3,
'suspicious_access_patterns': 10
}
Security event logging captures detailed information about all security-relevant activities with automated threat analysis:
async def log_security_event(self, event_type: str, event_details: Dict[str, Any]):
"""Log security events with threat detection"""
event = {
'event_id': f"sec_{int(datetime.now().timestamp())}",
'event_type': event_type,
'timestamp': datetime.now().isoformat(),
'details': event_details,
'severity': self._calculate_event_severity(event_type, event_details)
}
self.security_events.append(event)
# Check for security threats
await self._analyze_for_threats(event)
Severity calculation enables prioritized security response and automated alerting based on threat levels:
def _calculate_event_severity(self, event_type: str, details: Dict[str, Any]) -> str:
"""Calculate event severity for security monitoring"""
high_severity_events = {
'authentication_failed',
'authorization_denied',
'privilege_escalation_attempt',
'data_breach_detected'
}
medium_severity_events = {
'unusual_access_pattern',
'session_timeout',
'mfa_failure'
}
if event_type in high_severity_events:
return 'high'
elif event_type in medium_severity_events:
return 'medium'
else:
return 'low'
Threat analysis automatically detects common attack patterns including brute force attempts and privilege escalation:
async def _analyze_for_threats(self, event: Dict[str, Any]):
"""Analyze events for security threats"""
# Check for brute force attacks
if event['event_type'] == 'authentication_failed':
await self._check_brute_force_attack(event)
# Check for privilege escalation
if event['event_type'] == 'authorization_denied':
await self._check_privilege_escalation(event)
# Check for unusual access patterns
await self._check_access_patterns(event)
Security reporting provides executive-level dashboards and operational intelligence for security teams to assess threat landscape and system health:
async def generate_security_report(self, hours: int = 24) -> Dict[str, Any]:
"""Generate comprehensive security report"""
cutoff_time = datetime.now() - timedelta(hours=hours)
recent_events = [
event for event in self.security_events
if datetime.fromisoformat(event['timestamp']) >= cutoff_time
]
# Security metrics
metrics = {
'total_events': len(recent_events),
'high_severity_events': len([e for e in recent_events if e['severity'] == 'high']),
'failed_authentications': len([e for e in recent_events if e['event_type'] == 'authentication_failed']),
'authorization_denials': len([e for e in recent_events if e['event_type'] == 'authorization_denied']),
'unique_users_with_events': len(set(e['details'].get('user_id') for e in recent_events if e['details'].get('user_id')))
}
return {
'report_period_hours': hours,
'report_timestamp': datetime.now().isoformat(),
'security_metrics': metrics,
'threat_level': self._assess_threat_level(metrics),
'recommendations': self._generate_security_recommendations(metrics),
'events': recent_events
}
Part 3: Advanced Encryption and Data Protection (20 minutes)¶
End-to-End Encryption Implementation¶
🗂️ File: src/session10/security/encryption_framework.py
- Advanced encryption systems
Enterprise-grade encryption requires comprehensive key management and multiple encryption algorithms. Let's start with the essential imports and key management structures:
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import asyncio
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os
import json
Encryption keys require comprehensive metadata tracking for enterprise security compliance and operational management:
@dataclass
class EncryptionKey:
"""Encryption key with metadata"""
key_id: str
key_type: str # symmetric, asymmetric_public, asymmetric_private
key_data: bytes
created_at: datetime
expires_at: Optional[datetime] = None
algorithm: str = "AES-256-GCM"
purpose: str = "data_encryption"
The Advanced Encryption Manager orchestrates all cryptographic operations with comprehensive key lifecycle management and audit capabilities:
class AdvancedEncryptionManager:
"""Enterprise-grade encryption management system"""
def __init__(self):
self.encryption_keys: Dict[str, EncryptionKey] = {}
self.key_rotation_schedule: Dict[str, timedelta] = {}
self.encrypted_data_registry: Dict[str, Dict[str, Any]] = {}
Key generation creates both symmetric and asymmetric keys, supporting hybrid encryption schemes that combine the efficiency of symmetric encryption with the security of asymmetric key exchange:
async def generate_encryption_keys(self, purpose: str = "data_encryption") -> Dict[str, Any]:
"""Generate comprehensive encryption key set"""
key_set_id = f"keyset_{int(datetime.now().timestamp())}"
# Generate symmetric key for data encryption
symmetric_key_data = Fernet.generate_key()
symmetric_key = EncryptionKey(
key_id=f"{key_set_id}_symmetric",
key_type="symmetric",
key_data=symmetric_key_data,
created_at=datetime.now(),
expires_at=datetime.now() + timedelta(days=365),
algorithm="Fernet",
purpose=purpose
)
RSA key pair generation provides secure key exchange capabilities with enterprise-grade 4096-bit keys:
# Generate RSA key pair for key exchange
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=4096
)
public_key_data = private_key.public_key().public_key_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
private_key_data = private_key.private_key_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
Key object creation and storage with rotation scheduling ensures proper key lifecycle management:
public_key_obj = EncryptionKey(
key_id=f"{key_set_id}_public",
key_type="asymmetric_public",
key_data=public_key_data,
created_at=datetime.now(),
algorithm="RSA-4096",
purpose="key_exchange"
)
private_key_obj = EncryptionKey(
key_id=f"{key_set_id}_private",
key_type="asymmetric_private",
key_data=private_key_data,
created_at=datetime.now(),
algorithm="RSA-4096",
purpose="key_exchange"
)
# Store keys
self.encryption_keys[symmetric_key.key_id] = symmetric_key
self.encryption_keys[public_key_obj.key_id] = public_key_obj
self.encryption_keys[private_key_obj.key_id] = private_key_obj
# Set rotation schedule
self.key_rotation_schedule[symmetric_key.key_id] = timedelta(days=90)
self.key_rotation_schedule[public_key_obj.key_id] = timedelta(days=730)
return {
'key_set_id': key_set_id,
'symmetric_key_id': symmetric_key.key_id,
'public_key_id': public_key_obj.key_id,
'private_key_id': private_key_obj.key_id,
'keys_generated_at': datetime.now().isoformat()
}
Data encryption with comprehensive metadata tracking ensures auditability and compliance with data protection regulations:
async def encrypt_sensitive_data(self, data: str, key_id: str,
classification: str = "confidential") -> Dict[str, Any]:
"""Encrypt sensitive data with comprehensive protection"""
if key_id not in self.encryption_keys:
return {'success': False, 'error': 'Encryption key not found'}
encryption_key = self.encryption_keys[key_id]
if encryption_key.key_type == "symmetric":
# Use Fernet for symmetric encryption
fernet = Fernet(encryption_key.key_data)
encrypted_data = fernet.encrypt(data.encode())
else:
return {'success': False, 'error': 'Unsupported key type for data encryption'}
Encryption metadata includes integrity hashes and classification information for comprehensive data governance:
# Create encryption metadata
encryption_metadata = {
'encrypted_at': datetime.now().isoformat(),
'key_id': key_id,
'algorithm': encryption_key.algorithm,
'classification': classification,
'data_hash': self._compute_hash(data),
'encryption_version': '1.0'
}
# Generate unique identifier for encrypted data
encrypted_data_id = f"enc_{int(datetime.now().timestamp())}"
# Store encryption registry entry
self.encrypted_data_registry[encrypted_data_id] = {
'metadata': encryption_metadata,
'encrypted_data': base64.b64encode(encrypted_data).decode(),
'access_log': []
}
return {
'success': True,
'encrypted_data_id': encrypted_data_id,
'metadata': encryption_metadata
}
Decryption with comprehensive access logging provides audit trails essential for compliance and security monitoring:
async def decrypt_sensitive_data(self, encrypted_data_id: str,
accessor_id: str,
purpose: str) -> Dict[str, Any]:
"""Decrypt sensitive data with access logging"""
if encrypted_data_id not in self.encrypted_data_registry:
return {'success': False, 'error': 'Encrypted data not found'}
data_entry = self.encrypted_data_registry[encrypted_data_id]
key_id = data_entry['metadata']['key_id']
if key_id not in self.encryption_keys:
return {'success': False, 'error': 'Decryption key not found'}
encryption_key = self.encryption_keys[key_id]
# Decrypt data
try:
encrypted_data = base64.b64decode(data_entry['encrypted_data'])
if encryption_key.key_type == "symmetric":
fernet = Fernet(encryption_key.key_data)
decrypted_data = fernet.decrypt(encrypted_data).decode()
else:
return {'success': False, 'error': 'Unsupported key type for decryption'}
except Exception as e:
return {'success': False, 'error': f'Decryption failed: {str(e)}'}
Access logging captures who accessed what data for what purpose, creating immutable audit trails:
# Log access
access_entry = {
'accessor_id': accessor_id,
'access_timestamp': datetime.now().isoformat(),
'purpose': purpose,
'decryption_success': True
}
data_entry['access_log'].append(access_entry)
return {
'success': True,
'decrypted_data': decrypted_data,
'metadata': data_entry['metadata'],
'access_logged': True
}
Field-level encryption enables granular data protection within database records, allowing different encryption policies for different data types:
async def implement_field_level_encryption(self, record: Dict[str, Any],
field_encryption_map: Dict[str, str]) -> Dict[str, Any]:
"""Implement field-level encryption for database records"""
encrypted_record = record.copy()
encryption_metadata = {}
for field_name, key_id in field_encryption_map.items():
if field_name in record:
field_value = str(record[field_name])
# Encrypt field
encryption_result = await self.encrypt_sensitive_data(
field_value, key_id, "field_level"
)
if encryption_result['success']:
encrypted_record[field_name] = encryption_result['encrypted_data_id']
encryption_metadata[field_name] = encryption_result['metadata']
else:
return {'success': False, 'error': f'Failed to encrypt field {field_name}'}
return {
'success': True,
'encrypted_record': encrypted_record,
'encryption_metadata': encryption_metadata,
'original_fields_encrypted': list(field_encryption_map.keys())
}
Key rotation is essential for maintaining cryptographic security over time, requiring careful coordination to avoid data loss:
async def rotate_encryption_keys(self, key_id: str) -> Dict[str, Any]:
"""Rotate encryption keys for security"""
if key_id not in self.encryption_keys:
return {'success': False, 'error': 'Key not found'}
old_key = self.encryption_keys[key_id]
# Generate new key of same type
if old_key.key_type == "symmetric":
new_key_data = Fernet.generate_key()
new_key = EncryptionKey(
key_id=f"{key_id}_rotated_{int(datetime.now().timestamp())}",
key_type="symmetric",
key_data=new_key_data,
created_at=datetime.now(),
expires_at=datetime.now() + timedelta(days=365),
algorithm=old_key.algorithm,
purpose=old_key.purpose
)
else:
return {'success': False, 'error': 'Key rotation not implemented for this key type'}
# Store new key
self.encryption_keys[new_key.key_id] = new_key
# Mark old key for deprecation
old_key.expires_at = datetime.now() + timedelta(days=30) # Grace period
return {
'success': True,
'old_key_id': key_id,
'new_key_id': new_key.key_id,
'rotation_timestamp': datetime.now().isoformat(),
'grace_period_days': 30
}
Data integrity verification through cryptographic hashing ensures encrypted data hasn't been tampered with:
def _compute_hash(self, data: str) -> str:
"""Compute SHA-256 hash for data integrity"""
return hashlib.sha256(data.encode()).hexdigest()
Encryption status reporting provides operational intelligence for security teams and compliance auditors:
async def get_encryption_status_report(self) -> Dict[str, Any]:
"""Generate comprehensive encryption status report"""
# Key statistics
key_stats = {
'total_keys': len(self.encryption_keys),
'symmetric_keys': len([k for k in self.encryption_keys.values() if k.key_type == "symmetric"]),
'asymmetric_keys': len([k for k in self.encryption_keys.values() if k.key_type.startswith("asymmetric")]),
'keys_near_expiry': len([
k for k in self.encryption_keys.values()
if k.expires_at and (k.expires_at - datetime.now()).days <= 30
])
}
# Encrypted data statistics
data_stats = {
'total_encrypted_items': len(self.encrypted_data_registry),
'classification_breakdown': {},
'total_access_events': sum(
len(entry['access_log']) for entry in self.encrypted_data_registry.values()
)
}
# Classification breakdown
for entry in self.encrypted_data_registry.values():
classification = entry['metadata'].get('classification', 'unknown')
data_stats['classification_breakdown'][classification] = \
data_stats['classification_breakdown'].get(classification, 0) + 1
return {
'report_timestamp': datetime.now().isoformat(),
'key_management': key_stats,
'data_encryption': data_stats,
'compliance_status': 'compliant',
'recommendations': self._generate_encryption_recommendations(key_stats, data_stats)
}
Recommendation generation provides actionable insights for maintaining optimal encryption security posture:
def _generate_encryption_recommendations(self, key_stats: Dict[str, Any],
data_stats: Dict[str, Any]) -> List[str]:
"""Generate encryption recommendations"""
recommendations = []
if key_stats['keys_near_expiry'] > 0:
recommendations.append(f"Rotate {key_stats['keys_near_expiry']} keys that are near expiry")
if data_stats['total_encrypted_items'] == 0:
recommendations.append("No encrypted data found - consider implementing encryption for sensitive data")
if key_stats['asymmetric_keys'] == 0:
recommendations.append("Consider implementing asymmetric encryption for key exchange")
return recommendations
Module Summary¶
You've now mastered advanced security and compliance for enterprise agent systems:
✅ GDPR Compliance: Implemented comprehensive data protection with automated privacy controls
✅ Advanced RBAC: Built fine-grained role-based access control with zero-trust architecture
✅ Encryption Framework: Created enterprise-grade encryption with key rotation and field-level protection
✅ Audit Logging: Designed comprehensive security event logging and compliance monitoring
✅ Zero-Trust Security: Implemented context-aware authentication and authorization systems
Next Steps¶
- Continue to Module B: Enterprise Operations & Scaling for operational excellence
- Return to Core: Session 10 Main
- Portfolio Project: Build a complete enterprise-grade secure agent system
🗂️ Source Files for Module A:
src/session10/security/gdpr_compliance.py
- Comprehensive GDPR compliance frameworksrc/session10/security/enterprise_auth.py
- Advanced RBAC and zero-trust securitysrc/session10/security/encryption_framework.py
- Enterprise encryption and key management
📝 Multiple Choice Test - Module A¶
Test your understanding of advanced security and compliance for enterprise agent systems:
Question 1: Under GDPR, which legal basis requires the most complex implementation due to withdrawal requirements and proof of consent?
A) Contract
B) Legal obligation
C) Consent
D) Legitimate interests
Question 2: In zero-trust architecture, what principle governs every access request?
A) Trust but verify
B) Never trust, always verify
C) Verify once, trust always
D) Trust by default
Question 3: What is the primary purpose of k-anonymity in data anonymization?
A) To encrypt sensitive data
B) To ensure each individual cannot be distinguished from at least k-1 others
C) To hash personal identifiers
D) To compress large datasets
Question 4: In ABAC (Attribute-Based Access Control), which attributes are NOT typically evaluated?
A) Subject attributes
B) Object attributes
C) Environment attributes
D) Network attributes
Question 5: What is the recommended approach for key rotation in enterprise encryption systems?
A) Immediate replacement without grace period
B) Rotate keys only when compromised
C) Generate new key and provide grace period for old key
D) Never rotate keys to avoid data loss
Question 6: Which GDPR article mandates Data Protection Impact Assessments (DPIA) for high-risk processing?
A) Article 15
B) Article 17
C) Article 20
D) Article 35
Question 7: In field-level encryption, what is the primary advantage over full-record encryption?
A) Faster encryption performance
B) Granular protection allowing different encryption policies per field
C) Smaller storage requirements
D) Simplified key management
🧭 Navigation¶
Previous: Session 10 Main
Optional Deep Dive Modules: