Skip to content

Session 5 - Module B: Enterprise PydanticAI

⚠️ ADVANCED OPTIONAL MODULE Prerequisites: Complete Session 5 core content first.

Snowflake Data Platform Success

The Snowflake Data Cloud Revolution

When Snowflake faced declining profit margins due to inefficient data service deployments costing $4.2 billion annually in unnecessary cloud infrastructure, their engineering leadership recognized that architectural excellence would determine their competitive position in the data-first economy.

The complexity was overwhelming: 312 different data processing services across 67 global regions, serving 5.4 million enterprise customers with demands for 99.99% uptime. Traditional deployment patterns created maintenance nightmares, with data engineering teams spending 72% of their time on infrastructure issues rather than building valuable data products.

The transformation began with enterprise PydanticAI architecture.

Within 14 months of implementing sophisticated dependency injection, scalable architectures, and intelligent monitoring systems, Snowflake achieved unprecedented results:

  • $3.1 billion in annual cost savings through architectural optimization of data processing services
  • 12X performance improvement in data warehouse query response times
  • 99.99% uptime achievement across all global data processing regions
  • 89% reduction in deployment complexity and maintenance overhead for data pipelines
  • 285% increase in data engineer productivity through automated workflows

The architectural revolution enabled Snowflake to launch real-time streaming analytics with 8X faster time-to-market than traditional approaches, capturing 52% market share in enterprise data cloud services and generating $6.1 billion in new revenue streams.

Module Overview

You're about to master the same enterprise PydanticAI patterns that transformed Snowflake's global data infrastructure. This module reveals production-grade deployment strategies, dependency injection systems, scalable architectures, and monitoring solutions that billion-dollar data companies use to dominate competitive markets through operational excellence in cloud-scale data processing.

Part 1: Dependency Injection & Architecture for Data Systems

Dependency Injection for Testing and Production Data Services

PydanticAI's dependency injection system enables clean separation between testing and production configurations, supporting eval-driven iterative development and robust production dependency management for data processing systems.

🗂️ File: src/session5/dependency_injection.py - Complete dependency injection system

Data Service Interface Protocols

Protocols define the contracts that all data service implementations must follow, enabling seamless switching between production and test implementations.

# Essential imports for data processing dependency injection
from pydantic_ai.dependencies import DependencyProvider, Injectable, Scope
from typing import Protocol, runtime_checkable, Dict, Any, Optional
from dataclasses import dataclass
from contextlib import asynccontextmanager
import asyncio
import logging
import uuid

Now we define service interface protocols for type-safe dependency injection in data systems:

# Data service interface definitions using Protocol pattern
@runtime_checkable
class DataWarehouseService(Protocol):
    """Protocol for data warehouse operations with type safety."""
    async def execute_query(self, query: str, params: Dict[str, Any] = None) -> Dict[str, Any]: ...
    async def save_dataset(self, dataset_name: str, data: Dict[str, Any]) -> str: ...
    async def health_check(self) -> bool: ...

@runtime_checkable
class StreamingService(Protocol):
    """Protocol for streaming data service integrations."""
    async def publish_event(self, topic: str, event_data: Dict[str, Any]) -> str: ...
    async def consume_events(self, topic: str, batch_size: int = 100) -> List[Dict[str, Any]]: ...
    async def get_topic_metrics(self, topic: str) -> Dict[str, Any]: ...

Finally, we add the feature store and ML model services:

@runtime_checkable
class FeatureStoreService(Protocol):
    """Protocol for ML feature store operations with TTL support."""
    async def get_features(self, entity_id: str, feature_names: List[str]) -> Dict[str, Any]: ...
    async def store_features(self, entity_id: str, features: Dict[str, Any], ttl: int = 3600) -> None: ...
    async def compute_feature_stats(self, feature_name: str) -> Dict[str, Any]: ...

The @runtime_checkable decorator allows isinstance() checks at runtime, ensuring type safety in data service dependency injection.

Production Data Service Implementations

Production services provide real connectivity with proper error handling, connection pooling, and monitoring tailored for data processing workloads.

# Production data warehouse implementation with connection pooling
class ProductionDataWarehouseService:
    """Production data warehouse service with connection pool management for Snowflake/BigQuery."""

    def __init__(self, connection_string: str, pool_size: int = 20):
        self.connection_string = connection_string
        self.pool_size = pool_size
        self._connection_pool = None

    async def initialize(self):
        """Initialize data warehouse connection pool for production use."""
        # In real implementation, this would create actual connection pool to Snowflake/BigQuery
        self._connection_pool = f"DataWarehousePool({self.connection_string}, size={self.pool_size})"
        logging.info(f"Data warehouse service initialized: {self._connection_pool}")

Now we implement the core data warehouse operations with proper error handling:

    async def execute_query(self, query: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
        """Execute SQL query on data warehouse with transaction safety."""
        # In production, this would use proper database drivers like snowflake-connector-python
        query_id = str(uuid.uuid4())
        logging.info(f"Executing query {query_id}: {query[:100]}...")

        # Simulate query execution with realistic data warehouse response
        return {
            "query_id": query_id,
            "row_count": 150000,  # Typical data warehouse query result size
            "execution_time_ms": 2500,
            "bytes_processed": 45000000,
            "cache_hit": False
        }

Query execution provides the core interface for data warehouse operations with comprehensive result metadata. The unique query ID enables request tracing and monitoring, while metrics like row count and bytes processed support performance optimization and cost tracking essential for enterprise data operations.

    async def save_dataset(self, dataset_name: str, data: Dict[str, Any]) -> str:
        """Save dataset to data warehouse with proper partitioning."""
        dataset_id = f"ds_{hash(dataset_name) % 100000}"
        logging.info(f"Saved dataset {dataset_name} as {dataset_id}")
        return dataset_id

    async def health_check(self) -> bool:
        """Monitor data warehouse connection health for production systems."""
        return self._connection_pool is not None

Dataset persistence and health monitoring provide essential production capabilities for data warehouse operations. The hash-based dataset ID ensures consistent naming while the health check enables monitoring systems to detect connection issues before they impact data processing operations.

Test Data Service Implementations

Test implementations provide predictable behavior and call tracking for comprehensive testing without external data dependencies.

# Test data warehouse service with in-memory storage and call tracking
class TestDataWarehouseService:
    """Test data warehouse service with in-memory storage and comprehensive logging."""

    def __init__(self):
        self.query_store = {}     # In-memory query results storage
        self.dataset_store = {}   # In-memory dataset storage
        self.call_log = []        # Track all method calls for verification

    async def initialize(self):
        """Initialize test data warehouse - always succeeds for testing."""
        logging.info("Test data warehouse service initialized")

Test service initialization provides predictable, isolated testing environments without external dependencies. In-memory storage enables fast test execution while call logging supports comprehensive verification of service interactions during integration testing.

    async def execute_query(self, query: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
        """Execute query against in-memory test data with predictable results."""
        query_id = f"test_query_{len(self.query_store)}"
        result = {
            "query_id": query_id,
            "row_count": 1000,  # Predictable test data size
            "execution_time_ms": 100,
            "bytes_processed": 5000,
            "cache_hit": True
        }
        self.query_store[query_id] = result
        self.call_log.append(("execute_query", query, params, result))
        return result

Test query execution returns consistent, predictable results that enable reliable automated testing. The deterministic response structure with fixed metrics supports test assertions, while call logging provides verification capabilities for complex integration test scenarios.

Now we add dataset storage and health check methods with call tracking:

    async def save_dataset(self, dataset_name: str, data: Dict[str, Any]) -> str:
        """Save dataset to in-memory store with call logging."""
        dataset_id = f"test_ds_{len(self.dataset_store)}"
        self.dataset_store[dataset_id] = {"name": dataset_name, "data": data}
        self.call_log.append(("save_dataset", dataset_name, dataset_id))
        return dataset_id

    async def health_check(self) -> bool:
        """Test data warehouse is always healthy for consistent testing."""
        return True

    def get_call_log(self) -> List[tuple]:
        """Get complete log of service calls for test verification."""
        return self.call_log.copy()

Data Service Dependency Injection Container

The dependency container manages service lifecycles, registration, and resolution with support for singletons and scoped services tailored for data processing environments.

# Advanced dependency injection container for data services
class DataServiceContainer:
    """Enterprise-grade dependency injection container for data processing services."""

    def __init__(self):
        self.services = {}                  # Active service instances
        self.factories = {}                 # Service factory functions
        self.singletons = {}               # Singleton service instances
        self.scoped_services = {}          # Scoped service definitions
        self.initialization_order = []     # Service initialization sequence
        self.health_checks = {}            # Health check functions per service

The dependency injection container provides comprehensive service lifecycle management for enterprise data processing systems. Multiple storage dictionaries support different service patterns - singletons for shared resources, scoped services for request-specific instances, and factories for on-demand creation with proper initialization ordering.

    def register_data_service(
        self,
        interface: Type,
        implementation: Type,
        *args,
        **kwargs
    ) -> None:
        """Register a data service as singleton for container lifetime."""
        self.factories[interface] = lambda: implementation(*args, **kwargs)
        self.initialization_order.append(interface)

        # Register health check if service supports it
        if hasattr(implementation, 'health_check'):
            self.health_checks[interface] = implementation.health_check

Service registration uses lambda factories to defer instantiation until resolution time, enabling proper dependency ordering and initialization sequencing. Health check registration provides automatic monitoring capabilities for all services that support health verification, essential for production data system reliability.

Now we implement the service resolution with proper lifecycle management:

    async def get_service(self, interface: Type, scope: str = "default") -> Any:
        """Resolve service instance with proper initialization and lifecycle management."""

        # Check existing singletons first for performance
        if interface in self.singletons:
            return self.singletons[interface]

        # Create and cache singleton instances
        if interface in self.factories:
            instance = self.factories[interface]()
            if hasattr(instance, 'initialize'):
                await instance.initialize()
            self.singletons[interface] = instance
            logging.info(f"Initialized data service: {interface.__name__}")
            return instance

        raise ValueError(f"No registration found for data service: {interface}")

Service resolution implements lazy initialization with singleton caching for optimal performance in production environments. The async initialization pattern supports services requiring asynchronous setup like database connections, while error handling ensures clear diagnostics when dependencies are misconfigured.

    async def health_check_all(self) -> Dict[str, bool]:
        """Check health of all registered data services."""
        health_status = {}
        for interface, instance in self.singletons.items():
            try:
                if hasattr(instance, 'health_check'):
                    health_status[interface.__name__] = await instance.health_check()
                else:
                    health_status[interface.__name__] = True
            except Exception as e:
                health_status[interface.__name__] = False
                logging.error(f"Health check failed for {interface.__name__}: {e}")

        return health_status

Comprehensive health monitoring provides operational visibility across all registered services in the data processing system. The exception handling ensures health check failures don't crash the monitoring system, while detailed logging enables troubleshooting of specific service issues that could impact data pipeline operations.

Part 2: Scalability & Performance for Data Processing

Production Agent Architecture for Data Workloads

🗂️ File: src/session5/production_agents.py - Scalable agent implementations

Production PydanticAI applications require robust architecture patterns that handle concurrency, error recovery, and monitoring at scale for data processing workloads.

# Production-ready agent patterns for data processing systems
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Protocol, runtime_checkable
from contextlib import asynccontextmanager
import logging
from abc import ABC, abstractmethod

# Configure production logging for data processing environments
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('data_agent_production.log')
    ]
)

Data Processing Metrics and Monitoring

Comprehensive metrics tracking enables monitoring and optimization of agent performance in production data processing environments.

class DataProcessingMetrics(BaseModel):
    """Comprehensive data processing agent metrics for monitoring."""

    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    average_response_time_ms: float = 0.0
    p95_response_time_ms: float = 0.0
    p99_response_time_ms: float = 0.0
    error_rate_percent: float = 0.0
    last_request_timestamp: Optional[datetime] = None
    uptime_seconds: float = 0.0
    memory_usage_mb: float = 0.0

    # Data processing specific metrics
    datasets_processed: int = 0
    total_rows_processed: int = 0
    average_throughput_rows_per_second: float = 0.0
    data_quality_score: float = 1.0

    @property
    def success_rate_percent(self) -> float:
        """Calculate success rate percentage."""
        if self.total_requests == 0:
            return 100.0
        return (self.successful_requests / self.total_requests) * 100.0

Production Data Processing Agent Base Class

The production agent base class provides all necessary infrastructure for enterprise deployment including metrics, health checks, and error handling optimized for data processing workflows.

class ProductionDataAgentBase(ABC):
    """Abstract base class for production-ready data processing agents."""

    def __init__(self, name: str, config: Dict[str, Any] = None):
        self.name = name
        self.config = config or {}
        self.logger = logging.getLogger(f"{self.__class__.__name__}.{name}")
        self.metrics = DataProcessingMetrics()
        self.start_time = datetime.now(timezone.utc)
        self._request_times: List[float] = []
        self._executor = ThreadPoolExecutor(max_workers=8)  # Higher concurrency for data processing
        self._health_status = "healthy"
        self._data_quality_issues: List[str] = []

Now we define the abstract interface that all production data processing agents must implement:

    @abstractmethod
    async def _process_core_request(self, request: BaseModel) -> BaseModel:
        """Core data processing request logic - must be implemented by subclasses."""
        pass

Request Processing with Data-Specific Monitoring

Request processing includes comprehensive monitoring, error handling, and performance tracking for production data processing reliability.

    async def process_data_request(self, request: BaseModel) -> BaseModel:
        """Process data request with full monitoring and error handling."""
        start_time = time.time()
        self.metrics.total_requests += 1

        try:
            # Update metrics
            self.metrics.last_request_timestamp = datetime.now(timezone.utc)

            # Process the data request
            result = await self._process_core_request(request)

            # Track success and data processing metrics
            self.metrics.successful_requests += 1
            self.metrics.datasets_processed += 1

            # Estimate processed rows (can be overridden by subclasses)
            estimated_rows = getattr(request, 'estimated_rows', 1000)
            self.metrics.total_rows_processed += estimated_rows

            self._health_status = "healthy"

            return result

Now we handle errors and track performance metrics in the exception handler:

        except Exception as e:
            # Track failure with data processing context
            self.metrics.failed_requests += 1
            self._health_status = "degraded"

            # Log data processing specific error details
            if hasattr(request, 'dataset_id'):
                self.logger.error(f"Data processing failed for dataset {request.dataset_id}: {e}")
            else:
                self.logger.error(f"Data processing request failed: {e}")

            # Track data quality issues
            if "data quality" in str(e).lower() or "validation" in str(e).lower():
                self._data_quality_issues.append(f"{datetime.now()}: {str(e)}")
                self.metrics.data_quality_score = max(0.0, self.metrics.data_quality_score - 0.01)

            raise

        finally:
            # Track timing and throughput
            request_time = (time.time() - start_time) * 1000
            self._request_times.append(request_time)
            self._update_timing_metrics()

Concurrent Data Processing

Production data agents support concurrent request processing with proper resource management and backpressure handling optimized for data processing workloads.

    async def process_data_batch(self, requests: List[BaseModel]) -> List[BaseModel]:
        """Process multiple data requests concurrently with resource management."""

        # Higher concurrent limits for data processing workloads
        max_concurrent = self.config.get('max_concurrent_requests', 20)
        semaphore = asyncio.Semaphore(max_concurrent)

        async def process_with_semaphore(request):
            async with semaphore:
                return await self.process_data_request(request)

        # Execute all requests concurrently
        tasks = [process_with_semaphore(req) for req in requests]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Separate successful results from exceptions
        successful_results = []
        failed_count = 0

        for result in results:
            if isinstance(result, Exception):
                failed_count += 1
                self.logger.error(f"Batch processing error: {result}")
            else:
                successful_results.append(result)

        # Update batch processing metrics
        batch_success_rate = len(successful_results) / len(requests) if requests else 1.0
        if batch_success_rate < 0.9:  # Less than 90% success rate
            self.logger.warning(f"Low batch success rate: {batch_success_rate:.2%}")

        return successful_results

Health Monitoring and Circuit Breaker for Data Services

Health monitoring and circuit breaker patterns prevent cascading failures in production data processing environments.

    async def health_check(self) -> Dict[str, Any]:
        """Comprehensive health check for data processing monitoring systems."""
        uptime = (datetime.now(timezone.utc) - self.start_time).total_seconds()
        self.metrics.uptime_seconds = uptime

        # Check memory usage
        import psutil
        process = psutil.Process()
        memory_mb = process.memory_info().rss / 1024 / 1024
        self.metrics.memory_usage_mb = memory_mb

        # Calculate throughput metrics
        if uptime > 0:
            self.metrics.average_throughput_rows_per_second = self.metrics.total_rows_processed / uptime

        return {
            "status": self._health_status,
            "uptime_seconds": uptime,
            "memory_mb": memory_mb,
            "success_rate": self.metrics.success_rate_percent,
            "total_requests": self.metrics.total_requests,
            "datasets_processed": self.metrics.datasets_processed,
            "rows_processed": self.metrics.total_rows_processed,
            "throughput_rows_per_second": self.metrics.average_throughput_rows_per_second,
            "data_quality_score": self.metrics.data_quality_score,
            "avg_response_time_ms": self.metrics.average_response_time_ms,
            "recent_data_quality_issues": self._data_quality_issues[-5:]  # Last 5 issues
        }

Part 3: Security & Compliance for Data Systems

Enterprise Security Patterns for Data Processing

🗂️ File: src/session5/security.py - Enterprise security implementations

Enterprise PydanticAI applications require comprehensive security patterns including authentication, authorization, data privacy, and audit logging tailored for data processing environments.

# Enterprise security patterns for PydanticAI data processing systems
import jwt
import hashlib
import secrets
from typing import List, Optional, Dict, Any
from datetime import datetime, timedelta, timezone
from pydantic import BaseModel, Field
from enum import Enum
import logging

Now we define the security configuration model for enterprise data processing deployments:

class DataSecurityConfig(BaseModel):
    """Security configuration for enterprise data processing deployments."""

    jwt_secret_key: str = Field(..., min_length=32)
    jwt_expiration_hours: int = Field(default=8, ge=1, le=24)  # Shorter for data processing
    api_rate_limit_per_minute: int = Field(default=1000, ge=1)  # Higher for data workloads
    enable_audit_logging: bool = Field(default=True)
    allowed_origins: List[str] = Field(default_factory=list)
    require_https: bool = Field(default=True)
    data_encryption_enabled: bool = Field(default=True)
    pii_detection_enabled: bool = Field(default=True)

Authentication and Authorization for Data Engineers

Comprehensive authentication and authorization system for enterprise data processing agent access control.

class DataUserRole(str, Enum):
    """User roles for role-based access control in data processing systems."""
    DATA_ADMIN = "data_admin"
    DATA_ENGINEER = "data_engineer"
    ML_ENGINEER = "ml_engineer"
    DATA_ANALYST = "data_analyst"
    DATA_VIEWER = "data_viewer"

class DataAuthenticationService:
    """Enterprise authentication service for data processing systems with JWT tokens."""

    def __init__(self, config: DataSecurityConfig):
        self.config = config
        self.logger = logging.getLogger(__name__)
        self.role_permissions = {
            DataUserRole.DATA_ADMIN: ["read", "write", "delete", "admin"],
            DataUserRole.DATA_ENGINEER: ["read", "write", "pipeline"],
            DataUserRole.ML_ENGINEER: ["read", "write", "model"],
            DataUserRole.DATA_ANALYST: ["read", "query"],
            DataUserRole.DATA_VIEWER: ["read"]
        }

Next, we implement JWT token creation with proper security and expiration for data processing workflows:

    def create_access_token(self, user_id: str, roles: List[DataUserRole]) -> str:
        """Create JWT access token with user information and data processing roles."""
        now = datetime.now(timezone.utc)
        payload = {
            "user_id": user_id,
            "roles": [role.value for role in roles],
            "permissions": self._get_permissions_for_roles(roles),
            "iat": now,
            "exp": now + timedelta(hours=self.config.jwt_expiration_hours),
            "iss": "pydantic-ai-data-platform"
        }

        token = jwt.encode(payload, self.config.jwt_secret_key, algorithm="HS256")
        self.logger.info(f"Created access token for data user {user_id} with roles {[r.value for r in roles]}")
        return token

    def _get_permissions_for_roles(self, roles: List[DataUserRole]) -> List[str]:
        """Get combined permissions for multiple roles."""
        permissions = set()
        for role in roles:
            permissions.update(self.role_permissions.get(role, []))
        return list(permissions)

Finally, we add secure token verification with proper error handling:

    def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
        """Verify JWT token and return user information with data processing context."""
        try:
            payload = jwt.decode(
                token,
                self.config.jwt_secret_key,
                algorithms=["HS256"],
                options={"verify_exp": True}
            )
            return payload
        except jwt.ExpiredSignatureError:
            self.logger.warning("Token verification failed: expired")
            return None
        except jwt.InvalidTokenError:
            self.logger.warning("Token verification failed: invalid")
            return None

    def check_permission(self, token_payload: Dict[str, Any], required_permission: str) -> bool:
        """Check if user has required permission for data operation."""
        user_permissions = token_payload.get("permissions", [])
        return required_permission in user_permissions

Data Privacy and Compliance for Data Processing

Data privacy patterns ensure compliance with regulations like GDPR, HIPAA, and other enterprise requirements specifically for data processing systems.

class DataPrivacyService:
    """Data privacy and compliance service for sensitive data processing systems."""

    def __init__(self, encryption_key: str):
        self.encryption_key = encryption_key.encode()
        self.logger = logging.getLogger(__name__)
        self.pii_patterns = [
            r'\b\d{3}-\d{2}-\d{4}\b',  # SSN
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',  # Email
            r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',  # Credit card
        ]

    def anonymize_dataset(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Anonymize sensitive data in datasets while preserving utility for data processing."""
        anonymized = data.copy()

        # Hash personally identifiable information
        if 'user_id' in anonymized:
            anonymized['user_id_hash'] = self._hash_pii(anonymized.pop('user_id'))

        if 'customer_id' in anonymized:
            anonymized['customer_id_hash'] = self._hash_pii(anonymized.pop('customer_id'))

        # Handle feature vectors - anonymize but preserve structure
        if 'features' in anonymized and isinstance(anonymized['features'], dict):
            anonymized['features'] = self._anonymize_features(anonymized['features'])

Now we remove sensitive fields and provide PII detection for data processing:

        # Remove or mask sensitive fields common in data processing
        sensitive_fields = ['ssn', 'phone', 'address', 'credit_card', 'email', 'ip_address']
        for field in sensitive_fields:
            if field in anonymized:
                if field in ['email', 'phone']:
                    # Partial masking for data processing utility
                    anonymized[f'{field}_masked'] = self._partial_mask(anonymized.pop(field))
                else:
                    del anonymized[field]

        return anonymized

    def _hash_pii(self, data: str) -> str:
        """Hash personally identifiable information for data processing anonymization."""
        # Use salt for additional security in data processing
        salt = b"data_processing_salt"
        return hashlib.sha256(salt + data.encode()).hexdigest()[:16]

    def detect_pii_in_dataset(self, data: Dict[str, Any]) -> List[str]:
        """Detect potential PII in datasets for compliance monitoring."""
        import re
        pii_detected = []

        for field, value in data.items():
            if isinstance(value, str):
                for pattern in self.pii_patterns:
                    if re.search(pattern, value):
                        pii_detected.append(field)
                        break

        return pii_detected

Audit Logging System for Data Processing

Comprehensive audit logging tracks all data processing agent operations for compliance and security monitoring.

class DataProcessingAuditLogger:
    """Enterprise audit logging for data processing compliance and security monitoring."""

    def __init__(self, config: DataSecurityConfig):
        self.config = config
        self.audit_logger = logging.getLogger("data_processing_audit")

        # Configure audit-specific handler for data processing
        audit_handler = logging.FileHandler("data_processing_audit.log")
        audit_formatter = logging.Formatter(
            '%(asctime)s - DATA_AUDIT - %(levelname)s - %(message)s'
        )
        audit_handler.setFormatter(audit_formatter)
        self.audit_logger.addHandler(audit_handler)
        self.audit_logger.setLevel(logging.INFO)

Now we implement the core audit logging functionality for data processing:

    def log_data_processing_request(
        self,
        user_id: str,
        agent_name: str,
        dataset_id: str,
        processing_type: str,
        request_data: Dict[str, Any],
        result: Optional[Dict[str, Any]] = None,
        error: Optional[str] = None,
        data_sensitivity_level: str = "medium"
    ):
        """Log data processing request for comprehensive audit trail."""
        if not self.config.enable_audit_logging:
            return

The audit logging method accepts comprehensive parameters for data processing request tracking. Each parameter captures essential audit information: user identity, agent identification, dataset references, processing type classification, request payload, optional results, error states, and sensitivity classification for compliance monitoring.

        audit_entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "event_type": "data_processing_request",
            "user_id": user_id,
            "agent_name": agent_name,
            "dataset_id": dataset_id,
            "processing_type": processing_type,

The audit entry structure captures temporal, identity, and operational context essential for regulatory compliance. UTC timestamps ensure consistent timezone handling across distributed systems, while event type classification enables efficient log filtering and analysis for compliance reporting.

            "data_size_bytes": len(str(request_data)),
            "data_sensitivity_level": data_sensitivity_level,
            "success": error is None,
            "error_message": error,
            "compliance_flags": self._assess_compliance_flags(request_data)
        }

        self.audit_logger.info(f"DATA_PROCESSING: {audit_entry}")

Data size tracking monitors processing volume for capacity planning and cost attribution. Success determination and error capture provide comprehensive operational visibility, while compliance flags enable automated policy enforcement and regulatory report generation.

    def _assess_compliance_flags(self, data: Dict[str, Any]) -> List[str]:
        """Assess compliance flags for data processing requests."""
        flags = []

        # Check for potential PII
        privacy_service = DataPrivacyService("dummy_key")  # Would use real key
        if privacy_service.detect_pii_in_dataset(data):
            flags.append("pii_detected")

Compliance assessment begins with PII detection using specialized privacy services. Automated PII detection protects against inadvertent personal data processing violations, flagging requests that require additional privacy controls or consent verification.

        # Check for large data processing
        if len(str(data)) > 1000000:  # 1MB threshold
            flags.append("large_dataset")

        return flags

Large dataset detection identifies processing requests exceeding size thresholds that may require special handling, resource allocation, or approval workflows. The 1MB threshold represents a configurable boundary for automated versus supervised data processing operations.

Module Summary

You've now mastered enterprise PydanticAI patterns for data processing systems, including:

Data Service Dependency Injection: Built clean, testable architecture with data service protocols ✅ Production Data Processing Scalability: Implemented monitoring, metrics, and concurrent processing optimized for data workloads ✅ Enterprise Data Security: Created authentication, authorization, and compliance systems for data processing ✅ Data Processing Audit & Monitoring: Built comprehensive logging and health monitoring for data systems

Next Steps

🗂️ Source Files for Module B:

  • src/session5/dependency_injection.py - Complete data service DI system
  • src/session5/production_agents.py - Scalable agent patterns for data processing
  • src/session5/security.py - Enterprise security implementations for data systems

📝 Multiple Choice Test - Session 5

Test your understanding of enterprise PydanticAI patterns and production data processing systems:

Question 1: What design pattern does the data service dependency injection system use for service management?
A) Singleton pattern with global state
B) Protocol-based interfaces with container-managed lifecycles and health checking
C) Static factory methods only
D) Direct class instantiation

Question 2: How does the ProductionDataAgent handle concurrent data processing requests?
A) Sequential processing only
B) Semaphore-controlled concurrency with higher limits for data workloads and throughput tracking
C) Unlimited concurrent execution
D) Single-threaded execution with queuing

Question 3: What security measures does the DataAuthenticationService implement for data processing?
A) Simple password checking
B) JWT token validation, role-based authorization with data-specific permissions, and audit logging
C) Basic username verification
D) No authentication required

Question 4: What information does the comprehensive audit logging system capture for data processing?
A) Only request timestamps
B) Complete data processing tracking with dataset IDs, sensitivity levels, compliance flags, and performance metrics
C) Simple success/failure flags
D) Database query logs only

Question 5: How does the health monitoring system track data processing dependencies?
A) Manual status checks only
B) Automated dependency health checks with data quality monitoring and throughput metrics
C) Simple ping tests
D) Log file analysis only

View Solutions →


Previous: Session 4 - Team Orchestration →
Next: Session 6 - Modular Architecture →