Skip to content

Session 8 - Module D: Security & Compliance (45 minutes)

Prerequisites: Session 8 Core Section Complete
Target Audience: Security engineers and compliance specialists
Cognitive Load: 4 security concepts


Module Overview

This module explores enterprise security and compliance frameworks for Agno agent systems including zero-trust architecture, data encryption, audit logging, privacy controls, and regulatory compliance (GDPR, HIPAA, SOC2). You'll learn to build security-first agent systems that meet enterprise compliance requirements.

Learning Objectives

By the end of this module, you will: - Implement zero-trust security architecture for agent communications - Design comprehensive data encryption and privacy protection systems - Create audit logging and compliance monitoring frameworks - Build regulatory compliance systems for GDPR, HIPAA, and SOC2 requirements


Part 1: Zero-Trust Security Architecture (20 minutes)

Comprehensive Security Framework

🗂️ File: src/session8/security_architecture.py - Zero-trust security implementation

We'll start by importing the necessary security libraries and defining our security models. This foundation provides the building blocks for enterprise-grade security:

from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import asyncio
import hashlib
import jwt
import logging
from enum import Enum
import json
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64

Next, we define our security classification levels and authentication methods. These enums establish the security framework that will govern all agent operations:

class SecurityLevel(Enum):
    """Security clearance levels for agent operations"""
    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"
    TOP_SECRET = "top_secret"

class AuthenticationMethod(Enum):
    """Authentication methods supported"""
    API_KEY = "api_key"
    JWT_TOKEN = "jwt_token"
    OAUTH2 = "oauth2"
    MUTUAL_TLS = "mutual_tls"
    CERTIFICATE = "certificate"

Now we create the SecurityContext dataclass that carries security information throughout our system. This context ensures every operation has proper security oversight:

@dataclass
class SecurityContext:
    """Security context for agent operations"""
    user_id: str
    session_id: str
    security_level: SecurityLevel
    authentication_method: AuthenticationMethod
    permissions: List[str] = field(default_factory=list)
    data_classification: str = "internal"
    audit_required: bool = True
    encryption_required: bool = True
    created_at: datetime = field(default_factory=datetime.now)
    expires_at: Optional[datetime] = None

The ZeroTrustSecurityManager is the core component that implements our security architecture. It follows the principle of "never trust, always verify":

class ZeroTrustSecurityManager:
    """Zero-trust security manager for agent systems"""

    def __init__(self, encryption_key: bytes = None):
        self.encryption_key = encryption_key or Fernet.generate_key()
        self.cipher_suite = Fernet(self.encryption_key)
        self.security_policies = {}
        self.active_sessions = {}
        self.audit_logger = logging.getLogger("security_audit")
        self.threat_detection = ThreatDetectionSystem()

The security policy configuration defines our comprehensive security framework. This method establishes enterprise-grade security controls across all domains:

    def setup_security_policies(self) -> Dict[str, Any]:
        """Configure comprehensive security policies"""

        # Authentication policies enforce strong identity verification
        authentication_config = {
            "session_timeout_minutes": 60,
            "max_concurrent_sessions": 5,
            "password_policy": {
                "min_length": 12,
                "require_special_chars": True,
                "require_numbers": True,
                "require_uppercase": True,
                "history_count": 12
            },
            "mfa_required": True,
            "certificate_validation": True
        }

Authorization policies implement role-based access control with the principle of least privilege:

        # Authorization policies control access to resources
        authorization_config = {
            "rbac_enabled": True,
            "attribute_based_access": True,
            "dynamic_permissions": True,
            "principle_of_least_privilege": True,
            "permission_inheritance": False
        }

Encryption policies protect data both at rest and in transit using industry-standard algorithms:

        # Encryption policies for data protection
        encryption_config = {
            "data_at_rest": {
                "algorithm": "AES-256-GCM",
                "key_rotation_days": 90,
                "encrypted_fields": [
                    "conversation_content",
                    "user_data", 
                    "agent_responses",
                    "tool_parameters"
                ]
            },
            "data_in_transit": {
                "tls_version": "1.3",
                "cipher_suites": [
                    "TLS_AES_256_GCM_SHA384",
                    "TLS_CHACHA20_POLY1305_SHA256"
                ],
                "certificate_pinning": True
            }
        }

Audit logging ensures compliance with regulatory requirements and provides forensic capabilities:

        # Audit and monitoring configuration
        audit_config = {
            "enabled": True,
            "log_level": "INFO",
            "retention_days": 2555,  # 7 years
            "real_time_monitoring": True,
            "log_integrity_checking": True,
            "log_events": [
                "authentication_attempts",
                "authorization_decisions",
                "data_access",
                "configuration_changes",
                "security_incidents"
            ]
        }

Threat detection configuration enables proactive security monitoring:

        # Threat detection and response
        threat_detection_config = {
            "enabled": True,
            "behavioral_analysis": True,
            "anomaly_detection": True,
            "real_time_alerts": True,
            "threat_intelligence": True
        }

        # Combine all security configurations
        security_config = {
            "authentication": authentication_config,
            "authorization": authorization_config,
            "encryption": encryption_config,
            "audit_logging": audit_config,
            "threat_detection": threat_detection_config
        }

        self.security_policies = security_config
        return security_config

The authenticate_request method implements zero-trust authentication where every request must be verified regardless of source:

    async def authenticate_request(self, request_data: Dict[str, Any]) -> SecurityContext:
        """Authenticate and authorize agent requests with zero-trust principles"""

        # Extract authentication credentials from multiple sources
        auth_header = request_data.get("authorization")
        api_key = request_data.get("api_key")
        client_cert = request_data.get("client_certificate")

        if not auth_header and not api_key and not client_cert:
            raise SecurityException("No authentication credentials provided")

After credential extraction, we authenticate the identity using the appropriate method:

        # Determine and execute authentication method
        auth_method = self._determine_auth_method(request_data)
        user_identity = await self._authenticate_identity(request_data, auth_method)

        # Create comprehensive security context
        security_context = SecurityContext(
            user_id=user_identity["user_id"],
            session_id=self._generate_session_id(),
            security_level=SecurityLevel(user_identity.get("security_level", "internal")),
            authentication_method=auth_method,
            permissions=user_identity.get("permissions", []),
            data_classification=user_identity.get("data_classification", "internal")
        )

Finally, we authorize the request and create an audit trail:

        # Verify authorization and log security events
        await self._authorize_request(security_context, request_data)

        await self._log_security_event("authentication_success", {
            "user_id": security_context.user_id,
            "auth_method": auth_method.value,
            "source_ip": request_data.get("source_ip"),
            "user_agent": request_data.get("user_agent")
        })

        # Store active session for future validation
        self.active_sessions[security_context.session_id] = security_context
        return security_context

The identity authentication dispatcher routes to specific authentication handlers:

    async def _authenticate_identity(self, request_data: Dict[str, Any], 
                                   auth_method: AuthenticationMethod) -> Dict[str, Any]:
        """Authenticate user identity based on method"""

        if auth_method == AuthenticationMethod.JWT_TOKEN:
            return await self._authenticate_jwt(request_data.get("authorization"))
        elif auth_method == AuthenticationMethod.API_KEY:
            return await self._authenticate_api_key(request_data.get("api_key"))
        elif auth_method == AuthenticationMethod.MUTUAL_TLS:
            return await self._authenticate_mtls(request_data.get("client_certificate"))
        else:
            raise SecurityException(f"Unsupported authentication method: {auth_method}")

JWT authentication provides stateless token validation with comprehensive security checks:

    async def _authenticate_jwt(self, auth_header: str) -> Dict[str, Any]:
        """Authenticate JWT token"""

        try:
            # Extract token from Bearer format and decode
            token = auth_header.split(" ")[1] if auth_header.startswith("Bearer ") else auth_header

            decoded_token = jwt.decode(
                token, 
                self._get_jwt_secret(),
                algorithms=["HS256"],
                options={"verify_exp": True}
            )

We perform additional validation beyond standard JWT verification:

            # Comprehensive token validation
            if not decoded_token.get("user_id"):
                raise SecurityException("Invalid token: missing user_id")

            if datetime.now() > datetime.fromtimestamp(decoded_token.get("exp", 0)):
                raise SecurityException("Token expired")

            return {
                "user_id": decoded_token["user_id"],
                "permissions": decoded_token.get("permissions", []),
                "security_level": decoded_token.get("security_level", "internal"),
                "data_classification": decoded_token.get("data_classification", "internal")
            }

        except jwt.InvalidTokenError as e:
            raise SecurityException(f"JWT authentication failed: {str(e)}")

The authorization method implements fine-grained access control with multiple validation layers:

    async def _authorize_request(self, security_context: SecurityContext, 
                               request_data: Dict[str, Any]):
        """Authorize request based on security context and policies"""

        requested_action = request_data.get("action", "unknown")
        requested_resource = request_data.get("resource", "unknown")

        # Permission-based authorization check
        required_permission = f"{requested_action}:{requested_resource}"

        if required_permission not in security_context.permissions and "admin:*" not in security_context.permissions:
            await self._log_security_event("authorization_denied", {
                "user_id": security_context.user_id,
                "requested_permission": required_permission,
                "user_permissions": security_context.permissions
            })
            raise SecurityException(f"Access denied: insufficient permissions for {required_permission}")

Security clearance validation ensures users can only access appropriately classified resources:

        # Security clearance validation
        resource_security_level = self._get_resource_security_level(requested_resource)
        if security_context.security_level.value < resource_security_level.value:
            raise SecurityException(f"Access denied: insufficient security clearance")

        # Additional contextual authorization checks
        await self._perform_contextual_authorization(security_context, request_data)

Data encryption follows classification policies to protect sensitive information:

    async def encrypt_sensitive_data(self, data: Any, 
                                   security_context: SecurityContext) -> Dict[str, Any]:
        """Encrypt sensitive data based on classification"""

        if not security_context.encryption_required:
            return {"encrypted": False, "data": data}

        # Prepare data for encryption
        data_string = json.dumps(data) if not isinstance(data, str) else data
        encrypted_data = self.cipher_suite.encrypt(data_string.encode())

Encryption metadata provides audit trail and key management information:

        # Generate comprehensive encryption metadata
        encryption_metadata = {
            "algorithm": "Fernet",
            "encrypted_at": datetime.now().isoformat(),
            "encrypted_by": security_context.user_id,
            "key_version": "v1",
            "data_classification": security_context.data_classification
        }

        return {
            "encrypted": True,
            "data": base64.b64encode(encrypted_data).decode(),
            "metadata": encryption_metadata
        }

Decryption includes authorization checks to ensure users can access the decrypted data:

    async def decrypt_sensitive_data(self, encrypted_package: Dict[str, Any],
                                   security_context: SecurityContext) -> Any:
        """Decrypt sensitive data with authorization checks"""

        if not encrypted_package.get("encrypted"):
            return encrypted_package.get("data")

        # Authorization check based on data classification
        metadata = encrypted_package.get("metadata", {})
        data_classification = metadata.get("data_classification", "internal")

        if not self._can_access_classification(security_context, data_classification):
            raise SecurityException("Access denied: insufficient clearance for data classification")

Decryption process with error handling and data parsing:

        # Perform decryption with comprehensive error handling
        try:
            encrypted_data = base64.b64decode(encrypted_package["data"].encode())
            decrypted_data = self.cipher_suite.decrypt(encrypted_data)

            # Intelligent data parsing
            try:
                return json.loads(decrypted_data.decode())
            except json.JSONDecodeError:
                return decrypted_data.decode()

        except Exception as e:
            raise SecurityException(f"Decryption failed: {str(e)}")

Utility methods for session management and security infrastructure:

    def _generate_session_id(self) -> str:
        """Generate secure session ID"""
        import secrets
        return secrets.token_urlsafe(32)

    def _get_jwt_secret(self) -> str:
        """Get JWT signing secret (in production, use secure key management)"""
        return "your-super-secret-jwt-key-change-this-in-production"

class SecurityException(Exception):
    """Security-related exceptions"""
    pass

The ThreatDetectionSystem provides advanced security monitoring and response capabilities:

class ThreatDetectionSystem:
    """Advanced threat detection for agent systems"""

    def __init__(self):
        self.threat_patterns = {}
        self.anomaly_baseline = {}
        self.active_threats = []
        self.logger = logging.getLogger("threat_detection")

Threat detection configuration defines patterns and response strategies:

    def setup_threat_detection(self) -> Dict[str, Any]:
        """Configure threat detection patterns"""

        # Define threat detection patterns
        patterns_config = {
            "brute_force": {
                "failed_attempts_threshold": 5,
                "time_window_minutes": 5,
                "block_duration_minutes": 30
            },
            "anomalous_behavior": {
                "request_rate_threshold": 1000,  # requests per minute
                "unusual_access_patterns": True,
                "geographic_anomalies": True
            },
            "data_exfiltration": {
                "large_response_threshold_mb": 10,
                "rapid_requests_threshold": 50,
                "sensitive_data_access_monitoring": True
            },
            "injection_attacks": {
                "sql_injection_patterns": True,
                "prompt_injection_detection": True,
                "code_injection_patterns": True
            }
        }

Response actions are tiered based on threat severity:

        # Define response actions by risk level
        response_actions = {
            "low_risk": ["log_event", "monitor_closely"],
            "medium_risk": ["rate_limit", "require_additional_auth"],
            "high_risk": ["block_temporarily", "alert_security_team"],
            "critical": ["block_permanently", "emergency_response"]
        }

        threat_config = {
            "patterns": patterns_config,
            "response_actions": response_actions
        }

        self.threat_patterns = threat_config
        return threat_config

The threat analysis engine evaluates requests against multiple threat vectors:

    async def analyze_request_for_threats(self, request_data: Dict[str, Any],
                                        security_context: SecurityContext) -> Dict[str, Any]:
        """Analyze request for potential security threats"""

        # Initialize threat analysis structure
        threat_analysis = {
            "timestamp": datetime.now().isoformat(),
            "request_id": request_data.get("request_id"),
            "user_id": security_context.user_id,
            "threats_detected": [],
            "risk_level": "low",
            "recommended_actions": []
        }

We check for multiple threat patterns using specialized detection methods:

        # Multi-vector threat detection
        brute_force_risk = await self._detect_brute_force(security_context.user_id)
        if brute_force_risk["detected"]:
            threat_analysis["threats_detected"].append("brute_force_attempt")
            threat_analysis["risk_level"] = "high"

        prompt_injection_risk = await self._detect_prompt_injection(request_data.get("prompt", ""))
        if prompt_injection_risk["detected"]:
            threat_analysis["threats_detected"].append("prompt_injection")
            threat_analysis["risk_level"] = max(threat_analysis["risk_level"], "medium")

        behavioral_risk = await self._detect_behavioral_anomalies(request_data, security_context)
        if behavioral_risk["detected"]:
            threat_analysis["threats_detected"].append("behavioral_anomaly")
            threat_analysis["risk_level"] = max(threat_analysis["risk_level"], "medium")

Finally, we determine appropriate response actions based on the threat level:

        # Determine recommended response actions
        if threat_analysis["threats_detected"]:
            threat_analysis["recommended_actions"] = self.threat_patterns["response_actions"].get(
                threat_analysis["risk_level"], ["log_event"]
            )

        return threat_analysis

Prompt injection detection identifies attempts to manipulate agent behavior:

    async def _detect_prompt_injection(self, prompt: str) -> Dict[str, Any]:
        """Detect potential prompt injection attacks"""

        # Common prompt injection patterns
        injection_patterns = [
            "ignore previous instructions",
            "forget your role", 
            "you are now",
            "system:",
            "assistant:",
            "user:",
            "<script>",
            "eval(",
            "execute",
            "rm -rf",
            "DROP TABLE"
        ]

        prompt_lower = prompt.lower()
        detected_patterns = []

        for pattern in injection_patterns:
            if pattern.lower() in prompt_lower:
                detected_patterns.append(pattern)

        return {
            "detected": len(detected_patterns) > 0,
            "patterns": detected_patterns,
            "confidence": min(len(detected_patterns) * 0.3, 1.0)
        }

Behavioral anomaly detection identifies unusual patterns that may indicate compromise:

    async def _detect_behavioral_anomalies(self, request_data: Dict[str, Any],
                                         security_context: SecurityContext) -> Dict[str, Any]:
        """Detect anomalous user behavior patterns"""

        user_id = security_context.user_id
        current_time = datetime.now()
        request_rate = self._calculate_recent_request_rate(user_id)

        anomalies = []

        # Request rate anomaly detection
        if request_rate > 100:  # More than 100 requests per minute
            anomalies.append("high_request_rate")

        # Temporal anomaly detection
        if current_time.hour < 6 or current_time.hour > 22:  # Outside normal hours
            anomalies.append("unusual_access_time")

        # Geographic anomaly detection
        user_ip = request_data.get("source_ip", "")
        if self._is_unusual_geographic_location(user_id, user_ip):
            anomalies.append("geographic_anomaly")

        return {
            "detected": len(anomalies) > 0,
            "anomalies": anomalies,
            "risk_score": len(anomalies) * 0.3
        }

Part 2: Compliance and Audit Framework (15 minutes)

Regulatory Compliance System

🗂️ File: src/session8/compliance_framework.py - Comprehensive compliance management

We begin by establishing the compliance framework structure with support for multiple regulatory standards:

from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import json
import logging
from enum import Enum

The ComplianceFramework enum defines the regulatory standards our system supports:

class ComplianceFramework(Enum):
    """Supported compliance frameworks"""
    GDPR = "gdpr"
    HIPAA = "hipaa"
    SOC2 = "soc2"
    ISO27001 = "iso27001"
    PCI_DSS = "pci_dss"
    CCPA = "ccpa"

The DataSubject class manages individual privacy rights and consent tracking:

@dataclass
class DataSubject:
    """Data subject for privacy compliance"""
    subject_id: str
    subject_type: str  # user, patient, customer
    consent_records: List[Dict[str, Any]] = field(default_factory=list)
    data_retention_policy: Optional[str] = None
    deletion_requests: List[Dict[str, Any]] = field(default_factory=list)

The ComplianceManager orchestrates all compliance activities across frameworks:

class ComplianceManager:
    """Comprehensive compliance management system"""

    def __init__(self):
        self.compliance_policies = {}
        self.audit_trail = []
        self.data_subjects = {}
        self.retention_policies = {}
        self.logger = logging.getLogger("compliance")

The setup_compliance_frameworks method configures policies for multiple regulatory standards. First, let's establish GDPR compliance:

    def setup_compliance_frameworks(self) -> Dict[str, Any]:
        """Configure multiple compliance frameworks"""

        # GDPR configuration for EU data protection
        gdpr_config = {
            "enabled": True,
            "data_controller": "Your Company Ltd",
            "dpo_contact": "dpo@company.com",
            "lawful_basis": "consent",
            "retention_period_months": 24,
            "rights_supported": [
                "access", "rectification", "erasure", 
                "portability", "restrict_processing", "object"
            ],
            "consent_management": {
                "explicit_consent_required": True,
                "consent_withdrawal": True,
                "consent_records_retention": 84  # months
            },
            "privacy_by_design": {
                "data_minimization": True,
                "purpose_limitation": True,
                "storage_limitation": True,
                "pseudonymization": True
            }
        }

HIPAA configuration protects healthcare information with specific security requirements:

        # HIPAA configuration for healthcare data protection
        hipaa_config = {
            "enabled": True,
            "covered_entity": "Healthcare Provider Inc",
            "business_associate_agreements": True,
            "minimum_necessary": True,
            "phi_safeguards": {
                "access_controls": True,
                "audit_controls": True,
                "integrity": True,
                "transmission_security": True
            },
            "breach_notification": {
                "breach_threshold": 500,  # individuals affected
                "notification_timeline_hours": 72,
                "hhs_notification_days": 60
            }
        }

SOC2 configuration establishes trust service criteria for operational security:

        # SOC2 configuration for service organization controls
        soc2_config = {
            "enabled": True,
            "trust_service_criteria": [
                "security", "availability", "processing_integrity",
                "confidentiality", "privacy"
            ],
            "control_objectives": {
                "logical_access": True,
                "system_operations": True,
                "change_management": True,
                "risk_mitigation": True
            },
            "audit_requirements": {
                "continuous_monitoring": True,
                "annual_assessment": True,
                "third_party_audit": True
            }
        }

        # Combine all compliance configurations
        compliance_config = {
            "gdpr": gdpr_config,
            "hipaa": hipaa_config,
            "soc2": soc2_config
        }

        self.compliance_policies = compliance_config
        return compliance_config

Data subject request processing implements GDPR rights with comprehensive logging:

    async def process_data_subject_request(self, request_type: str, 
                                         subject_id: str,
                                         request_details: Dict[str, Any]) -> Dict[str, Any]:
        """Process data subject rights requests (GDPR Article 12-22)"""

        # Generate unique request identifier
        request_id = f"DSR_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{subject_id}"

        # Create audit trail for compliance
        await self._log_compliance_event("data_subject_request", {
            "request_id": request_id,
            "request_type": request_type,
            "subject_id": subject_id,
            "timestamp": datetime.now().isoformat()
        })

Request routing directs to appropriate handlers based on GDPR articles:

        # Route to appropriate request handler
        if request_type == "access":  # Right of Access (Article 15)
            return await self._process_access_request(subject_id, request_id)
        elif request_type == "rectification":  # Right to Rectification (Article 16)
            return await self._process_rectification_request(subject_id, request_details, request_id)
        elif request_type == "erasure":  # Right to Erasure (Article 17)
            return await self._process_erasure_request(subject_id, request_id)
        elif request_type == "portability":  # Right to Data Portability (Article 20)
            return await self._process_portability_request(subject_id, request_id)
        else:
            return {
                "request_id": request_id,
                "status": "unsupported",
                "message": f"Request type {request_type} not supported"
            }

The access request handler implements GDPR Article 15 requirements comprehensively:

    async def _process_access_request(self, subject_id: str, request_id: str) -> Dict[str, Any]:
        """Process GDPR Article 15 - Right of Access"""

        # Collect all personal data for the subject
        personal_data = await self._collect_personal_data(subject_id)

        # Prepare GDPR-compliant structured response
        access_response = {
            "request_id": request_id,
            "subject_id": subject_id,
            "status": "completed",
            "data_collected": {
                "processing_purposes": personal_data.get("purposes", []),
                "categories_of_data": personal_data.get("categories", []),
                "recipients": personal_data.get("recipients", []),
                "retention_period": personal_data.get("retention_period"),
                "rights_information": [
                    "rectification", "erasure", "restrict_processing",
                    "object", "portability", "withdraw_consent"
                ],
                "data_source": personal_data.get("source", "directly_provided"),
                "automated_decision_making": personal_data.get("automated_decisions", False)
            },
            "personal_data": personal_data.get("data", {}),
            "response_format": "structured_json",
            "generated_at": datetime.now().isoformat()
        }

        # Ensure GDPR timeline compliance (1 month)
        response_deadline = datetime.now() + timedelta(days=30)
        access_response["response_deadline"] = response_deadline.isoformat()

        return access_response

Erasure request processing implements the "right to be forgotten" with legal safeguards:

    async def _process_erasure_request(self, subject_id: str, request_id: str) -> Dict[str, Any]:
        """Process GDPR Article 17 - Right to Erasure"""

        # Assess legal feasibility of erasure
        erasure_assessment = await self._assess_erasure_feasibility(subject_id)

        if not erasure_assessment["can_erase"]:
            return {
                "request_id": request_id,
                "status": "denied",
                "reason": erasure_assessment["reason"],
                "legal_basis": erasure_assessment["legal_basis"]
            }

When erasure is legally permissible, we execute comprehensive data removal:

        # Execute erasure with comprehensive tracking
        erasure_results = await self._perform_data_erasure(subject_id)

        # Handle third-party notification requirements
        if erasure_results["third_party_notification_required"]:
            await self._notify_third_parties_of_erasure(subject_id, erasure_results["shared_with"])

        return {
            "request_id": request_id,
            "status": "completed",
            "erasure_completed_at": datetime.now().isoformat(),
            "data_erased": erasure_results["erased_categories"],
            "systems_affected": erasure_results["systems"],
            "third_parties_notified": erasure_results.get("third_parties_notified", [])
        }

Data retention policy implementation ensures automated compliance with regulatory timelines:

    async def implement_data_retention_policies(self) -> Dict[str, Any]:
        """Implement automated data retention and disposal"""

        retention_results = {
            "processed_at": datetime.now().isoformat(),
            "policies_applied": [],
            "data_disposed": [],
            "errors": []
        }

        # Apply retention policies for each enabled framework
        for framework, config in self.compliance_policies.items():
            if not config.get("enabled"):
                continue

            if framework == "gdpr":
                gdpr_results = await self._apply_gdpr_retention_policy(config)
                retention_results["policies_applied"].append(gdpr_results)

            elif framework == "hipaa":
                hipaa_results = await self._apply_hipaa_retention_policy(config)
                retention_results["policies_applied"].append(hipaa_results)

        return retention_results

GDPR retention policy implementation includes automated data lifecycle management:

    async def _apply_gdpr_retention_policy(self, gdpr_config: Dict[str, Any]) -> Dict[str, Any]:
        """Apply GDPR-specific retention policies"""

        retention_period = gdpr_config.get("retention_period_months", 24)
        cutoff_date = datetime.now() - timedelta(days=retention_period * 30)

        # Identify data eligible for deletion
        eligible_data = await self._find_data_older_than(cutoff_date)

        deletion_results = {
            "framework": "gdpr",
            "retention_period_months": retention_period,
            "cutoff_date": cutoff_date.isoformat(),
            "records_identified": len(eligible_data),
            "records_deleted": 0,
            "errors": []
        }

Each data record is evaluated for legal retention requirements before deletion:

        # Process each eligible record with legal compliance checks
        for data_record in eligible_data:
            try:
                # Legal basis assessment for retention
                if await self._has_legal_basis_to_retain(data_record):
                    continue

                # Execute secure deletion
                await self._delete_data_record(data_record)
                deletion_results["records_deleted"] += 1

            except Exception as e:
                deletion_results["errors"].append({
                    "record_id": data_record.get("id"),
                    "error": str(e)
                })

        return deletion_results

Compliance reporting generates comprehensive assessments for regulatory audits:

    async def generate_compliance_report(self, framework: ComplianceFramework,
                                       report_period_days: int = 30) -> Dict[str, Any]:
        """Generate comprehensive compliance reports"""

        end_date = datetime.now()
        start_date = end_date - timedelta(days=report_period_days)

        # Initialize comprehensive report structure
        report = {
            "framework": framework.value,
            "report_period": {
                "start_date": start_date.isoformat(),
                "end_date": end_date.isoformat(),
                "period_days": report_period_days
            },
            "compliance_status": "compliant",  # Default, will be updated
            "metrics": {},
            "violations": [],
            "recommendations": [],
            "generated_at": datetime.now().isoformat()
        }

Framework-specific report generation handles unique compliance requirements:

        # Generate framework-specific compliance analysis
        if framework == ComplianceFramework.GDPR:
            report.update(await self._generate_gdpr_report(start_date, end_date))
        elif framework == ComplianceFramework.HIPAA:
            report.update(await self._generate_hipaa_report(start_date, end_date))
        elif framework == ComplianceFramework.SOC2:
            report.update(await self._generate_soc2_report(start_date, end_date))

        return report

GDPR-specific compliance reporting tracks key performance indicators:

    async def _generate_gdpr_report(self, start_date: datetime, 
                                  end_date: datetime) -> Dict[str, Any]:
        """Generate GDPR-specific compliance report"""

        # Collect comprehensive GDPR metrics
        gdpr_metrics = {
            "data_subject_requests": await self._count_dsr_by_type(start_date, end_date),
            "consent_records": await self._count_consent_activities(start_date, end_date),
            "data_breaches": await self._count_data_breaches(start_date, end_date),
            "retention_compliance": await self._assess_retention_compliance(),
            "third_party_transfers": await self._count_international_transfers(start_date, end_date)
        }

Violation assessment identifies compliance gaps requiring immediate attention:

        # Comprehensive compliance violation assessment
        violations = []

        # Data Subject Request timeline compliance
        overdue_dsrs = await self._find_overdue_dsrs()
        if overdue_dsrs:
            violations.append({
                "type": "dsr_response_time",
                "severity": "high", 
                "description": f"{len(overdue_dsrs)} data subject requests overdue",
                "remediation": "Process overdue requests immediately"
            })

        # Consent management compliance
        invalid_consents = await self._find_invalid_consents()
        if invalid_consents:
            violations.append({
                "type": "invalid_consent",
                "severity": "medium",
                "description": f"{len(invalid_consents)} invalid consent records found",
                "remediation": "Update consent records and re-obtain consent where necessary"
            })

        return {
            "gdpr_metrics": gdpr_metrics,
            "violations": violations,
            "compliance_status": "non_compliant" if violations else "compliant"
        }

The AuditTrailManager ensures tamper-evident logging for compliance and forensics:

class AuditTrailManager:
    """Comprehensive audit trail management"""

    def __init__(self):
        self.audit_events = []
        self.integrity_hashes = {}
        self.logger = logging.getLogger("audit")

Audit event logging creates immutable records with integrity protection:

    async def log_audit_event(self, event_type: str, 
                            event_data: Dict[str, Any],
                            security_context: Optional[Dict[str, Any]] = None):
        """Log auditable events with integrity protection"""

        # Create comprehensive audit entry
        audit_entry = {
            "event_id": self._generate_event_id(),
            "timestamp": datetime.now().isoformat(),
            "event_type": event_type,
            "event_data": event_data,
            "security_context": security_context or {},
            "system_context": {
                "service_version": "1.0.0",
                "environment": "production",
                "node_id": "agent-node-1"
            }
        }

Integrity protection prevents audit log tampering:

        # Add tamper-evident integrity protection
        audit_entry["integrity_hash"] = self._calculate_integrity_hash(audit_entry)

        # Store and transmit audit entry
        self.audit_events.append(audit_entry)
        await self._send_to_audit_system(audit_entry)

Utility methods support audit infrastructure with unique identifiers and integrity verification:

    def _generate_event_id(self) -> str:
        """Generate unique audit event ID"""
        import uuid
        return str(uuid.uuid4())

    def _calculate_integrity_hash(self, audit_entry: Dict[str, Any]) -> str:
        """Calculate integrity hash for audit entry"""

        # Remove the hash field itself from calculation
        entry_copy = {k: v for k, v in audit_entry.items() if k != "integrity_hash"}
        entry_json = json.dumps(entry_copy, sort_keys=True)

        import hashlib
        return hashlib.sha256(entry_json.encode()).hexdigest()

Part 3: Privacy and Data Protection (10 minutes)

Advanced Privacy Controls

🗂️ File: src/session8/privacy_protection.py - Privacy-preserving agent systems

The privacy protection system implements advanced techniques to protect personal data while maintaining system functionality:

from typing import Dict, List, Any, Optional
import hashlib
import json
from datetime import datetime, timedelta

The PrivacyPreservingAgentSystem coordinates multiple privacy-preserving techniques:

class PrivacyPreservingAgentSystem:
    """Privacy-preserving techniques for agent operations"""

    def __init__(self):
        self.anonymization_techniques = {}
        self.pseudonymization_keys = {}
        self.differential_privacy_params = {}

Privacy technique configuration establishes multiple protection mechanisms:

    def setup_privacy_techniques(self) -> Dict[str, Any]:
        """Configure privacy-preserving techniques"""

        # Data anonymization configuration using proven techniques
        anonymization_config = {
            "enabled": True,
            "techniques": ["k_anonymity", "l_diversity", "t_closeness"],
            "k_value": 5,  # k-anonymity parameter
            "l_value": 2,  # l-diversity parameter
            "t_value": 0.2  # t-closeness parameter
        }

Differential privacy provides mathematical privacy guarantees:

        # Differential privacy for statistical queries
        differential_privacy_config = {
            "enabled": True,
            "epsilon": 1.0,  # Privacy budget
            "delta": 1e-5,   # Failure probability
            "noise_mechanism": "laplace"
        }

Pseudonymization and encryption techniques protect individual identities:

        # Pseudonymization for identity protection
        pseudonymization_config = {
            "enabled": True,
            "key_rotation_days": 90,
            "deterministic": False,  # Use random pseudonyms
            "format_preserving": True
        }

        # Homomorphic encryption for computation on encrypted data
        homomorphic_config = {
            "enabled": False,  # Computationally expensive
            "scheme": "ckks",  # For approximate computations
            "key_size": 4096
        }

        privacy_config = {
            "data_anonymization": anonymization_config,
            "differential_privacy": differential_privacy_config,
            "pseudonymization": pseudonymization_config,
            "homomorphic_encryption": homomorphic_config
        }

        return privacy_config

Dataset anonymization applies formal privacy models to protect individual records:

    def anonymize_dataset(self, dataset: List[Dict[str, Any]], 
                         sensitive_attributes: List[str]) -> Dict[str, Any]:
        """Apply k-anonymity and l-diversity to dataset"""

        # Multi-step anonymization process:
        # 1. Identify quasi-identifiers
        # 2. Apply generalization and suppression
        # 3. Ensure k-anonymity constraint
        # 4. Apply l-diversity for sensitive attributes

        anonymized_data = []
        for record in dataset:
            anonymized_record = self._anonymize_record(record, sensitive_attributes)
            anonymized_data.append(anonymized_record)

The anonymization result includes privacy metrics for verification:

        return {
            "original_records": len(dataset),
            "anonymized_records": len(anonymized_data),
            "anonymization_applied": True,
            "data": anonymized_data,
            "privacy_metrics": {
                "k_anonymity": self._calculate_k_anonymity(anonymized_data),
                "l_diversity": self._calculate_l_diversity(anonymized_data, sensitive_attributes)
            }
        }

Differential privacy implementation adds calibrated noise to protect individual privacy:

    def apply_differential_privacy(self, query_result: float, 
                                 sensitivity: float = 1.0,
                                 epsilon: float = 1.0) -> float:
        """Apply differential privacy to query results"""

        import random
        import math

        # Laplace mechanism for (ε,0)-differential privacy
        scale = sensitivity / epsilon
        noise = random.laplace(0, scale)

        return query_result + noise

Record-level anonymization applies generalization and suppression techniques:

    def _anonymize_record(self, record: Dict[str, Any], 
                         sensitive_attrs: List[str]) -> Dict[str, Any]:
        """Anonymize individual record"""

        anonymized = record.copy()

        # Age generalization to reduce identifiability
        if "age" in record:
            age = record["age"]
            if age < 25:
                anonymized["age_group"] = "18-24"
            elif age < 35:
                anonymized["age_group"] = "25-34"
            elif age < 45:
                anonymized["age_group"] = "35-44"
            else:
                anonymized["age_group"] = "45+"
            del anonymized["age"]

Geographic and identifier anonymization protects location privacy:

        # Location generalization to broader regions
        if "zip_code" in record:
            zip_code = str(record["zip_code"])
            anonymized["region"] = zip_code[:3] + "**"
            del anonymized["zip_code"]

        # Direct identifier handling with pseudonymization
        identifiers = ["name", "email", "phone", "ssn"]
        for identifier in identifiers:
            if identifier in anonymized:
                if identifier in sensitive_attrs:
                    anonymized[identifier] = self._pseudonymize_value(anonymized[identifier])
                else:
                    del anonymized[identifier]

        return anonymized

Module Summary

You've now mastered security and compliance for enterprise Agno systems:

Zero-Trust Architecture: Implemented comprehensive authentication, authorization, and threat detection
Data Encryption: Built end-to-end encryption with key management and secure data handling
Compliance Framework: Created GDPR, HIPAA, and SOC2 compliance systems with automated reporting
Privacy Protection: Implemented advanced privacy-preserving techniques including anonymization and differential privacy
Audit Trail: Designed comprehensive audit logging with integrity protection and compliance monitoring

Next Steps


📝 Multiple Choice Test - Module D

Test your understanding of Security & Compliance for enterprise agent systems:

Question 1: What is the core principle of zero-trust security architecture? A) Trust all internal network traffic by default
B) Never trust, always verify every request regardless of source
C) Only authenticate users once per session
D) Allow unrestricted access within the enterprise network

Question 2: Which GDPR article governs the "Right to Erasure" (right to be forgotten)? A) Article 15 - Right of Access
B) Article 16 - Right to Rectification
C) Article 17 - Right to Erasure
D) Article 20 - Right to Data Portability

Question 3: What does differential privacy add to query results to protect individual privacy? A) Encryption with rotating keys
B) Calibrated mathematical noise using Laplace mechanism
C) Complete data anonymization
D) Access control restrictions

Question 4: In the threat detection system, what response action is recommended for "high_risk" threats? A) log_event, monitor_closely
B) rate_limit, require_additional_auth
C) block_temporarily, alert_security_team
D) block_permanently, emergency_response

Question 5: What privacy-preserving technique ensures k-anonymity in datasets? A) Differential privacy with epsilon budget
B) Homomorphic encryption of sensitive fields
C) Generalization and suppression of quasi-identifiers
D) JWT token-based authentication

🗂️ View Test Solutions →


🗂️ Source Files for Module D: - src/session8/security_architecture.py - Zero-trust security implementation - src/session8/compliance_framework.py - Regulatory compliance management - src/session8/privacy_protection.py - Privacy-preserving techniques