Skip to content

⚙️ Session 5 Advanced: Security Monitoring & Audit Systems

⚙️ IMPLEMENTER PATH CONTENT Prerequisites: Complete 🎯 Observer and 📝 Participant paths Time Investment: 3-4 hours Outcome: Master comprehensive security monitoring and audit systems

Advanced Learning Outcomes

After completing this module, you will master:

  • Real-time security event detection and alerting
  • Comprehensive audit logging with tamper-proof storage
  • Advanced threat detection using machine learning
  • Compliance monitoring and automated reporting

Enterprise Security Monitoring Architecture

Think of security monitoring as having an army of digital sentries who never sleep, never blink, and remember everything. They watch every entrance, monitor every transaction, and instantly alert you to any suspicious activity.

Real-Time Security Event Detection

The security monitoring system processes events in real-time to detect threats as they emerge:

# src/security/monitoring.py

import asyncio
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Set
from dataclasses import dataclass, asdict
from enum import Enum
import hashlib
import hmac
import redis
from kafka import KafkaProducer, KafkaConsumer
import pandas as pd
from sklearn.ensemble import IsolationForest
import numpy as np

logger = logging.getLogger(__name__)

class SecurityEventType(Enum):
    """Types of security events we monitor."""
    AUTHENTICATION_FAILURE = "auth_failure"
    AUTHORIZATION_DENIED = "authz_denied"
    RATE_LIMIT_EXCEEDED = "rate_limit_exceeded"
    SUSPICIOUS_PATTERN = "suspicious_pattern"
    TOKEN_MISUSE = "token_misuse"
    PRIVILEGE_ESCALATION = "privilege_escalation"
    DATA_EXFILTRATION = "data_exfiltration"
    SYSTEM_ANOMALY = "system_anomaly"

class SecuritySeverity(Enum):
    """Security event severity levels."""
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4

@dataclass
class SecurityEvent:
    """Comprehensive security event record."""
    event_id: str
    event_type: SecurityEventType
    severity: SecuritySeverity
    timestamp: datetime
    source_ip: str
    user_id: Optional[str]
    session_id: Optional[str]
    resource: str
    action: str
    success: bool
    metadata: Dict[str, Any]
    risk_score: float
    detection_rules: List[str]

class SecurityMonitoringEngine:
    """Real-time security monitoring and threat detection engine."""

    def __init__(self, config: Dict[str, Any]):
        self.redis_client = redis.Redis(**config["redis"])
        self.kafka_producer = KafkaProducer(
            **config["kafka"],
            value_serializer=lambda x: json.dumps(x, default=str).encode('utf-8')
        )

        # Monitoring configuration
        self.event_retention_days = config.get("event_retention_days", 90)
        self.alert_thresholds = config.get("alert_thresholds", self._default_thresholds())
        self.ml_models = {}

        # Initialize threat detection models
        self._initialize_ml_models()

        # Real-time event processing
        self.event_processors = {
            SecurityEventType.AUTHENTICATION_FAILURE: self._process_auth_failure,
            SecurityEventType.RATE_LIMIT_EXCEEDED: self._process_rate_limit_violation,
            SecurityEventType.SUSPICIOUS_PATTERN: self._process_suspicious_pattern,
            SecurityEventType.TOKEN_MISUSE: self._process_token_misuse
        }

The architecture uses Kafka for real-time event streaming and Redis for fast pattern detection.

Advanced Threat Detection Using Machine Learning

Machine learning models analyze behavior patterns to detect sophisticated attacks:

    def _initialize_ml_models(self):
        """Initialize machine learning models for threat detection."""

        # Anomaly detection model for user behavior
        self.ml_models["user_behavior"] = IsolationForest(
            contamination=0.1,  # Expect 10% anomalous behavior
            random_state=42
        )

        # Pattern detection for request sequences
        self.ml_models["request_patterns"] = IsolationForest(
            contamination=0.05,  # More conservative for request patterns
            random_state=42
        )

        # Initialize with baseline training data
        asyncio.create_task(self._train_baseline_models())

    async def _train_baseline_models(self):
        """Train ML models with historical baseline data."""
        try:
            # Get historical normal behavior data
            baseline_data = await self._get_baseline_training_data()

            if baseline_data and len(baseline_data) > 100:  # Need sufficient data
                # Train user behavior model
                user_features = self._extract_user_behavior_features(baseline_data)
                if len(user_features) > 50:
                    self.ml_models["user_behavior"].fit(user_features)
                    logger.info("User behavior anomaly model trained successfully")

                # Train request pattern model
                request_features = self._extract_request_pattern_features(baseline_data)
                if len(request_features) > 50:
                    self.ml_models["request_patterns"].fit(request_features)
                    logger.info("Request pattern anomaly model trained successfully")

        except Exception as e:
            logger.error(f"Failed to train baseline ML models: {e}")

    async def process_security_event(self, raw_event: Dict[str, Any]) -> SecurityEvent:
        """Process incoming security event with comprehensive analysis."""

        # Step 1: Create structured security event
        security_event = self._create_security_event(raw_event)

        # Step 2: Calculate risk score using multiple algorithms
        risk_score = await self._calculate_comprehensive_risk_score(security_event)
        security_event.risk_score = risk_score

        # Step 3: Apply ML-based threat detection
        ml_insights = await self._apply_ml_threat_detection(security_event)
        security_event.metadata.update(ml_insights)

        # Step 4: Check against known attack patterns
        pattern_matches = await self._check_attack_patterns(security_event)
        security_event.detection_rules.extend(pattern_matches)

        # Step 5: Store event for analysis and alerting
        await self._store_security_event(security_event)

        # Step 6: Generate alerts if thresholds exceeded
        await self._evaluate_alert_conditions(security_event)

        # Step 7: Update threat intelligence
        await self._update_threat_intelligence(security_event)

        return security_event

Multi-layered analysis combines rule-based detection with machine learning for comprehensive threat identification.

Comprehensive Risk Score Calculation

The risk scoring system evaluates multiple factors to assess threat levels:

    async def _calculate_comprehensive_risk_score(self, event: SecurityEvent) -> float:
        """Calculate comprehensive risk score for security event."""

        base_scores = {
            SecurityEventType.AUTHENTICATION_FAILURE: 3.0,
            SecurityEventType.AUTHORIZATION_DENIED: 4.0,
            SecurityEventType.RATE_LIMIT_EXCEEDED: 2.0,
            SecurityEventType.TOKEN_MISUSE: 8.0,
            SecurityEventType.PRIVILEGE_ESCALATION: 9.0,
            SecurityEventType.DATA_EXFILTRATION: 10.0
        }

        base_score = base_scores.get(event.event_type, 1.0)

        # Factor 1: IP reputation and geolocation risk
        ip_risk = await self._calculate_ip_risk(event.source_ip)

        # Factor 2: User behavior deviation
        user_risk = await self._calculate_user_behavior_risk(event.user_id, event)

        # Factor 3: Temporal patterns (time-based anomalies)
        temporal_risk = self._calculate_temporal_risk(event.timestamp)

        # Factor 4: Resource sensitivity
        resource_risk = await self._calculate_resource_sensitivity_risk(event.resource)

        # Factor 5: Historical context
        historical_risk = await self._calculate_historical_context_risk(event)

        # Weighted composite score
        composite_score = (
            base_score * 0.3 +           # 30% base event type
            ip_risk * 0.2 +              # 20% IP reputation
            user_risk * 0.25 +           # 25% user behavior
            temporal_risk * 0.1 +        # 10% timing anomalies
            resource_risk * 0.1 +        # 10% resource sensitivity
            historical_risk * 0.05       # 5% historical context
        )

        # Normalize to 0-10 scale
        normalized_score = min(10.0, max(0.0, composite_score))

        return round(normalized_score, 2)

    async def _calculate_ip_risk(self, ip_address: str) -> float:
        """Calculate risk score based on IP address characteristics."""

        try:
            # Check IP reputation databases
            reputation_score = await self._check_ip_reputation(ip_address)

            # Check geolocation risk
            geo_risk = await self._calculate_geolocation_risk(ip_address)

            # Check if IP is from known bad networks
            network_risk = await self._check_network_reputation(ip_address)

            # Historical behavior of this IP
            historical_ip_risk = await self._calculate_ip_historical_risk(ip_address)

            return min(10.0, reputation_score + geo_risk + network_risk + historical_ip_risk)

        except Exception as e:
            logger.warning(f"IP risk calculation failed for {ip_address}: {e}")
            return 1.0  # Default low risk if calculation fails

    async def _calculate_user_behavior_risk(self, user_id: Optional[str],
                                          event: SecurityEvent) -> float:
        """Calculate risk based on user behavior patterns."""

        if not user_id:
            return 2.0  # Anonymous users get moderate risk

        try:
            # Get recent user behavior history
            behavior_history = await self._get_user_behavior_history(user_id, hours=24)

            if not behavior_history:
                return 1.0  # New users get low risk initially

            # Extract behavior features
            features = self._extract_user_behavior_features([{
                "user_id": user_id,
                "timestamp": event.timestamp,
                "resource": event.resource,
                "action": event.action,
                "source_ip": event.source_ip,
                "success": event.success
            }])

            # Use ML model to detect anomalies
            if "user_behavior" in self.ml_models and features:
                anomaly_score = self.ml_models["user_behavior"].score_samples([features[0]])
                # Convert to risk score (lower anomaly score = higher risk)
                risk_score = max(0, (1.0 - anomaly_score[0]) * 10)
                return min(10.0, risk_score)

            # Fallback: rule-based behavior analysis
            return self._calculate_rule_based_behavior_risk(behavior_history, event)

        except Exception as e:
            logger.warning(f"User behavior risk calculation failed for {user_id}: {e}")
            return 1.0

The multi-factor risk calculation provides nuanced threat assessment beyond simple rule matching.

Advanced Attack Pattern Detection

Detect sophisticated attack patterns across multiple events and timeframes:

class AttackPatternDetector:
    """Advanced detection of complex attack patterns."""

    def __init__(self, redis_client):
        self.redis_client = redis_client
        self.pattern_definitions = self._load_attack_patterns()

    def _load_attack_patterns(self) -> Dict[str, Dict]:
        """Load attack pattern definitions."""
        return {
            "credential_stuffing": {
                "description": "Automated credential testing across multiple accounts",
                "indicators": [
                    {"event_type": "auth_failure", "min_count": 10, "time_window": 300},
                    {"source_diversity": {"min_users": 5, "max_success_rate": 0.1}},
                    {"temporal_pattern": {"regular_intervals": True, "burst_behavior": True}}
                ],
                "severity": SecuritySeverity.HIGH,
                "confidence_threshold": 0.8
            },

            "privilege_escalation_chain": {
                "description": "Sequential privilege escalation attempts",
                "indicators": [
                    {"event_sequence": ["authz_denied", "auth_failure", "authz_denied"]},
                    {"resource_progression": {"increasing_sensitivity": True}},
                    {"time_correlation": {"max_interval": 600}}
                ],
                "severity": SecuritySeverity.CRITICAL,
                "confidence_threshold": 0.9
            },

            "data_exfiltration_pattern": {
                "description": "Systematic data access and extraction",
                "indicators": [
                    {"resource_access": {"volume_threshold": 100, "diversity_threshold": 10}},
                    {"temporal_pattern": {"sustained_activity": True, "off_hours": True}},
                    {"data_sensitivity": {"high_value_resources": True}}
                ],
                "severity": SecuritySeverity.CRITICAL,
                "confidence_threshold": 0.85
            }
        }

    async def detect_attack_patterns(self, event: SecurityEvent) -> List[Dict[str, Any]]:
        """Detect complex attack patterns involving current event."""

        detected_patterns = []

        for pattern_name, pattern_def in self.pattern_definitions.items():
            try:
                # Check if event could be part of this attack pattern
                if self._event_matches_pattern_context(event, pattern_def):

                    # Analyze pattern indicators
                    confidence = await self._analyze_pattern_indicators(event, pattern_def)

                    if confidence >= pattern_def["confidence_threshold"]:
                        detected_patterns.append({
                            "pattern_name": pattern_name,
                            "description": pattern_def["description"],
                            "confidence": confidence,
                            "severity": pattern_def["severity"],
                            "indicators_matched": await self._get_matched_indicators(event, pattern_def),
                            "recommended_actions": self._get_recommended_actions(pattern_name)
                        })

            except Exception as e:
                logger.error(f"Pattern detection failed for {pattern_name}: {e}")

        return detected_patterns

    async def _analyze_pattern_indicators(self, event: SecurityEvent,
                                        pattern_def: Dict) -> float:
        """Analyze how well event matches pattern indicators."""

        total_score = 0.0
        max_score = 0.0

        for indicator in pattern_def["indicators"]:
            indicator_score, indicator_max = await self._evaluate_indicator(event, indicator)
            total_score += indicator_score
            max_score += indicator_max

        confidence = (total_score / max_score) if max_score > 0 else 0.0
        return min(1.0, confidence)

    async def _evaluate_indicator(self, event: SecurityEvent,
                                 indicator: Dict) -> tuple[float, float]:
        """Evaluate specific pattern indicator."""

        if "event_type" in indicator:
            return await self._evaluate_event_type_indicator(event, indicator)
        elif "source_diversity" in indicator:
            return await self._evaluate_source_diversity_indicator(event, indicator)
        elif "temporal_pattern" in indicator:
            return await self._evaluate_temporal_pattern_indicator(event, indicator)
        elif "event_sequence" in indicator:
            return await self._evaluate_sequence_indicator(event, indicator)
        elif "resource_access" in indicator:
            return await self._evaluate_resource_access_indicator(event, indicator)

        return 0.0, 1.0  # Default: no match, full weight

Sophisticated pattern detection identifies multi-stage attacks that span multiple events and timeframes.

Tamper-Proof Audit Logging

Implement cryptographically secure audit logging that ensures integrity:

class TamperProofAuditLogger:
    """Cryptographically secure audit logging system."""

    def __init__(self, config: Dict[str, Any]):
        self.signing_key = config["audit_signing_key"].encode()
        self.encryption_key = config.get("audit_encryption_key", "").encode()
        self.storage_backend = config["storage_backend"]
        self.chain_integrity = True

        # Initialize audit chain
        self.last_hash = self._get_last_audit_hash()

    async def log_audit_event(self, event_type: str, details: Dict[str, Any],
                             user_context: Optional[Dict[str, Any]] = None) -> str:
        """Log audit event with cryptographic integrity protection."""

        # Create comprehensive audit record
        audit_record = {
            "audit_id": self._generate_audit_id(),
            "timestamp": datetime.utcnow().isoformat(),
            "event_type": event_type,
            "details": details,
            "user_context": user_context,
            "system_context": self._get_system_context(),
            "previous_hash": self.last_hash
        }

        # Calculate integrity hash
        record_hash = self._calculate_record_hash(audit_record)
        audit_record["record_hash"] = record_hash

        # Create digital signature
        signature = self._create_digital_signature(audit_record)
        audit_record["signature"] = signature

        # Encrypt sensitive data if required
        if self.encryption_key and self._should_encrypt(event_type):
            audit_record = self._encrypt_sensitive_data(audit_record)

        # Store with integrity checking
        storage_result = await self._store_audit_record(audit_record)

        # Update chain hash for next record
        self.last_hash = record_hash

        # Verify chain integrity periodically
        if self._should_verify_chain():
            asyncio.create_task(self._verify_audit_chain_integrity())

        return audit_record["audit_id"]

    def _calculate_record_hash(self, record: Dict[str, Any]) -> str:
        """Calculate cryptographic hash for audit record."""

        # Create canonical representation
        canonical_data = json.dumps(
            {k: v for k, v in record.items() if k not in ["record_hash", "signature"]},
            sort_keys=True,
            separators=(',', ':')
        )

        # Include previous hash for chaining
        hash_input = f"{canonical_data}:{record.get('previous_hash', '')}"

        # Calculate SHA-256 hash
        return hashlib.sha256(hash_input.encode()).hexdigest()

    def _create_digital_signature(self, record: Dict[str, Any]) -> str:
        """Create HMAC signature for audit record."""

        # Create signature over the complete record including hash
        message = json.dumps(record, sort_keys=True, separators=(',', ':')).encode()
        signature = hmac.new(self.signing_key, message, hashlib.sha256).hexdigest()

        return signature

    async def verify_audit_record(self, audit_id: str) -> Dict[str, Any]:
        """Verify integrity of specific audit record."""

        try:
            # Retrieve record from storage
            record = await self._retrieve_audit_record(audit_id)
            if not record:
                return {"valid": False, "reason": "record_not_found"}

            # Verify digital signature
            stored_signature = record.pop("signature", "")
            calculated_signature = self._create_digital_signature(record)

            if not hmac.compare_digest(stored_signature, calculated_signature):
                return {"valid": False, "reason": "signature_verification_failed"}

            # Verify record hash
            stored_hash = record.get("record_hash", "")
            calculated_hash = self._calculate_record_hash(record)

            if stored_hash != calculated_hash:
                return {"valid": False, "reason": "hash_verification_failed"}

            return {
                "valid": True,
                "audit_id": audit_id,
                "timestamp": record.get("timestamp"),
                "verification_time": datetime.utcnow().isoformat()
            }

        except Exception as e:
            logger.error(f"Audit verification failed for {audit_id}: {e}")
            return {"valid": False, "reason": "verification_error", "error": str(e)}

Cryptographic integrity protection ensures audit logs cannot be tampered with undetected.

Compliance Monitoring and Automated Reporting

Generate compliance reports automatically for various security frameworks:

class ComplianceMonitor:
    """Automated compliance monitoring and reporting system."""

    def __init__(self, audit_logger: TamperProofAuditLogger):
        self.audit_logger = audit_logger
        self.compliance_frameworks = {
            "SOC2": self._load_soc2_requirements(),
            "ISO27001": self._load_iso27001_requirements(),
            "GDPR": self._load_gdpr_requirements(),
            "PCI_DSS": self._load_pci_dss_requirements()
        }

    async def generate_compliance_report(self, framework: str,
                                        time_range: str = "30d") -> Dict[str, Any]:
        """Generate comprehensive compliance report."""

        if framework not in self.compliance_frameworks:
            raise ValueError(f"Unsupported compliance framework: {framework}")

        requirements = self.compliance_frameworks[framework]

        # Analyze compliance for each requirement
        compliance_results = {}
        overall_score = 0.0

        for req_id, requirement in requirements.items():
            result = await self._assess_compliance_requirement(requirement, time_range)
            compliance_results[req_id] = result
            overall_score += result["score"]

        overall_score = overall_score / len(requirements) if requirements else 0

        # Generate executive summary
        summary = self._generate_compliance_summary(compliance_results, overall_score)

        # Create detailed report
        report = {
            "framework": framework,
            "generation_time": datetime.utcnow().isoformat(),
            "time_range": time_range,
            "overall_score": round(overall_score, 2),
            "compliance_status": "COMPLIANT" if overall_score >= 0.85 else "NON_COMPLIANT",
            "executive_summary": summary,
            "detailed_results": compliance_results,
            "recommendations": self._generate_compliance_recommendations(compliance_results)
        }

        return report

Automated compliance monitoring ensures continuous adherence to security frameworks.


Previous: Session 4 - Team Orchestration →
Next: Session 6 - Modular Architecture →