Session 5 - Module D: Testing & Benchmarking¶
โ ๏ธ ADVANCED OPTIONAL MODULE Prerequisites: Complete Session 5 core content first.
Palantir Data Engineering Excellence¶
The Palantir Data Processing Performance Revolution¶
When Palantir faced potential market disruption as 67 million data processing operations daily began experiencing quality degradation costing $5.2 billion in projected revenue losses, their engineering leadership realized that traditional testing approaches were insufficient for their scale of global data integration and analytics.
The challenge was immense: 23,000 data processing microservices serving 847 enterprise customers across 190 countries, requiring real-time performance optimization while maintaining 99.99% data processing availability. Legacy benchmarking systems missed critical performance bottlenecks that directly impacted customer analytics and decision-making workflows.
The breakthrough came through comprehensive testing and benchmarking excellence.
Within 13 months of implementing systematic testing frameworks, intelligent caching patterns, and enterprise-grade observability, Palantir achieved extraordinary results:
- $4.1 billion in additional revenue through improved customer analytics performance and retention
- 18X performance improvement in data integration and processing systems
- 99.99% data processing availability across all global regions
- 92% reduction in performance-related customer escalations
- 14X faster deployment velocity with zero-downtime releases for data systems
The testing revolution enabled Palantir to launch real-time government analytics with 1.8-second response times globally, leading to 41% increase in contract renewals and establishing technological dominance in data integration that competitors still struggle to match.
Module Overview¶
You're about to master the same testing and benchmarking strategies that transformed Palantir's global data processing infrastructure. This module reveals systematic testing frameworks, performance optimization techniques, production monitoring systems, and intelligent caching patterns that world-leading data technology companies use to achieve market dominance through performance excellence in data processing environments.
Part 1: Comprehensive Testing Framework for Data Processing¶
Integration Testing for Production Data Processing Agents¶
๐๏ธ File: src/session5/testing_framework.py
- Complete testing infrastructure
Systematic testing ensures production data processing agents handle edge cases gracefully and maintain reliability under various data input conditions and processing scenarios.
# Integration testing and monitoring for production data processing environments
import pytest
from unittest.mock import AsyncMock, patch
from typing import AsyncGenerator, List, Dict, Any
import asyncio
import random
import logging
import time
Now we'll define the main testing suite class that coordinates all integration tests for data processing systems:
class DataProcessingIntegrationTestSuite:
"""Comprehensive integration testing for PydanticAI data processing agents."""
def __init__(self):
self.test_results: List[Dict[str, Any]] = []
self.logger = logging.getLogger(self.__class__.__name__)
self.data_quality_metrics = {
'schema_validation_tests': 0,
'data_quality_tests': 0,
'performance_tests': 0,
'error_handling_tests': 0
}
Next, we implement the core validation testing method that tests data processing agents against various input scenarios:
async def test_data_processing_agent_validation(self, agent: ProductionDataAgentBase) -> Dict[str, Any]:
"""Test data processing agent validation capabilities with comprehensive scenarios."""
# Define comprehensive test cases covering valid inputs and edge cases for data processing
test_cases = [
# Valid data processing inputs
{
'name': 'valid_feature_extraction_request',
'input': 'Extract features from customer behavior dataset for ML training pipeline',
'should_pass': True,
'expected_rows': 100000
},
{
'name': 'valid_streaming_data_request',
'input': 'Process real-time user events from Kafka topic user-interactions',
'should_pass': True,
'expected_rows': 50000
}
These test cases establish the foundation for comprehensive data processing validation by covering typical production scenarios. The feature extraction test validates machine learning pipeline integration with large datasets, while the streaming data test ensures real-time processing capabilities. Each test case includes expected row counts for throughput validation and success criteria for automated pass/fail determination.
# Edge cases for data processing
{
'name': 'empty_request',
'input': '',
'should_pass': False,
'expected_rows': 0
}
]
We continue defining edge cases that test data processing agent boundary conditions:
# Add more edge cases to test comprehensive data processing validation
test_cases.extend([
{
'name': 'extremely_large_dataset_request',
'input': 'Process petabyte-scale dataset with 10 billion rows for analytics',
'should_pass': True,
'expected_rows': 10000000000 # 10 billion
}
Extreme scale testing validates that data processing agents can handle enterprise-level workloads without performance degradation or memory issues. The petabyte-scale test ensures the system can process massive datasets typical in large organizations, validating both computational capacity and memory management under heavy loads.
{
'name': 'data_quality_validation_request',
'input': 'Validate schema and data quality for customer_profiles_2024 dataset',
'should_pass': True,
'expected_rows': 5000000
},
{
'name': 'streaming_lag_scenario',
'input': 'Process delayed streaming data with 300 second lag from upstream systems',
'should_pass': True,
'expected_rows': 25000
}
])
Data quality validation testing ensures agents can detect and handle schema mismatches, data corruption, and quality issues that commonly occur in production environments. The streaming lag scenario tests resilience under network delays and system backpressure, validating that agents maintain processing accuracy even when upstream systems experience latency.
Now we execute each test case and collect detailed results for data processing analysis:
results = []
for test_case in test_cases:
try:
start_time = time.time()
result = await agent.process_data_request(test_case['input'])
processing_time = time.time() - start_time
Test execution begins with precise timing measurement to capture performance metrics essential for data processing benchmarking. The async execution model ensures proper handling of concurrent operations typical in production data pipelines, while timing measurement provides throughput calculations for performance analysis.
test_result = {
'test_name': test_case['name'],
'expected_pass': test_case['should_pass'],
'actual_pass': result.get('success', False),
'processing_time_ms': processing_time * 1000,
'estimated_rows': test_case.get('expected_rows', 0),
'throughput_rows_per_second': test_case.get('expected_rows', 0) / max(processing_time, 0.001),
'result': result,
'status': 'pass' if (result.get('success', False) == test_case['should_pass']) else 'fail'
}
except Exception as e:
Comprehensive test result structure captures both functional correctness and performance metrics critical for data processing optimization. Throughput calculations in rows per second enable capacity planning and performance regression detection, while the status field enables automated test result analysis and continuous integration workflows.
test_result = {
'test_name': test_case['name'],
'expected_pass': test_case['should_pass'],
'actual_pass': False,
'error': str(e),
'status': 'pass' if not test_case['should_pass'] else 'fail'
}
results.append(test_result)
self.data_quality_metrics['schema_validation_tests'] += 1
Finally, we return a comprehensive summary of the data processing validation test results:
return {
'test_type': 'data_processing_validation',
'agent_name': agent.name,
'total_tests': len(test_cases),
'passed_tests': len([r for r in results if r['status'] == 'pass']),
'average_processing_time_ms': sum(r.get('processing_time_ms', 0) for r in results) / len(results),
'total_estimated_rows': sum(r.get('estimated_rows', 0) for r in results),
'results': results
}
Error Scenario Testing for Data Processing¶
Comprehensive error scenario testing validates that data processing agents handle various failure conditions gracefully.
async def test_data_processing_error_handling(self, agent: ProductionDataAgentBase) -> Dict[str, Any]:
"""Test data processing agent error handling capabilities."""
# Define timeout and connection failure scenarios for data processing
error_scenarios = [
{
'name': 'data_warehouse_timeout',
'setup': lambda: self._simulate_data_warehouse_timeout(),
'expected_category': DataProcessingErrorCategory.PIPELINE_TIMEOUT
},
{
'name': 'schema_validation_error',
'setup': lambda: self._simulate_schema_validation_error(),
'expected_category': DataProcessingErrorCategory.SCHEMA_MISMATCH
}
The error handling test framework begins by defining critical failure scenarios that commonly occur in production data processing systems. Timeout scenarios test resilience against database and warehouse connectivity issues, while schema validation tests ensure the system properly handles data structure changes that frequently occur in evolving data environments.
{
'name': 'data_quality_error',
'setup': lambda: self._simulate_data_quality_error(),
'expected_category': DataProcessingErrorCategory.DATA_QUALITY
},
{
'name': 'streaming_lag_error',
'setup': lambda: self._simulate_streaming_lag_error(),
'expected_category': DataProcessingErrorCategory.STREAMING_LAG
},
{
'name': 'resource_exhaustion_error',
'setup': lambda: self._simulate_resource_exhaustion_error(),
'expected_category': DataProcessingErrorCategory.RESOURCE_EXHAUSTION
}
]
Now we execute each error scenario and verify proper error handling for data processing:
results = []
# Execute each error scenario with controlled testing environment
for scenario in error_scenarios:
try:
with patch.object(agent, '_process_core_request', side_effect=scenario['setup']()):
result = await agent.process_data_request("test data processing request")
Error scenario execution uses dependency injection through patching to simulate various failure conditions without affecting production systems. The controlled testing environment ensures that failures are simulated safely while maintaining isolation from actual data processing operations.
test_result = {
'scenario_name': scenario['name'],
'error_handled': not result.get('success', True),
'result': result,
'status': 'pass' if not result.get('success', True) else 'fail'
}
Error scenario execution uses dependency injection through patching to simulate various failure conditions without affecting production systems. Each test validates that the agent correctly identifies failures and returns appropriate error states rather than succeeding when it should fail, ensuring robust error handling in production environments.
except Exception as e:
# Verify the error is properly categorized for data processing
error_category = getattr(e, 'context', {}).get('category', 'unknown')
expected_category = scenario.get('expected_category', 'unknown')
Exception handling validation ensures that data processing errors are properly categorized and contain sufficient context for debugging and monitoring. The category extraction from error context enables automated error routing and appropriate alerting in production systems.
test_result = {
'scenario_name': scenario['name'],
'error_handled': True,
'exception': str(e),
'error_category': error_category,
'expected_category': expected_category.value if hasattr(expected_category, 'value') else str(expected_category),
'category_match': str(error_category) == str(expected_category),
'status': 'pass'
}
Exception handling validation ensures that data processing errors are properly categorized and contain sufficient context for debugging and monitoring. Category matching verification confirms that the error classification system works correctly, enabling automated error routing and appropriate alerting in production systems.
Finally, we return comprehensive error handling test results for data processing:
return {
'test_type': 'data_processing_error_handling',
'agent_name': agent.name,
'total_scenarios': len(error_scenarios),
'passed_scenarios': len([r for r in results if r['status'] == 'pass']),
'results': results
}
These helper methods simulate different types of errors for comprehensive data processing testing:
def _simulate_data_warehouse_timeout(self) -> Exception:
"""Simulate a data warehouse timeout error for testing."""
return DataProcessingAgentError(
"Data warehouse query timeout after 30 seconds",
DataProcessingErrorContext(
category=DataProcessingErrorCategory.PIPELINE_TIMEOUT,
severity=DataProcessingErrorSeverity.HIGH
)
)
def _simulate_schema_validation_error(self) -> Exception:
"""Simulate a schema validation error for testing."""
return SchemaValidationError(
"Schema mismatch: expected v2.0 but found v1.5",
expected_schema="v2.0",
actual_schema="v1.5"
)
Error simulation methods create realistic failure scenarios that mirror production issues. The data warehouse timeout simulates network latency or database overload conditions that are common in enterprise environments, while schema validation errors test the system's ability to handle data structure changes that frequently occur in evolving data systems.
def _simulate_data_quality_error(self) -> Exception:
"""Simulate a data quality error for testing."""
return DataQualityError(
"Data quality below threshold: 0.3 quality score",
dataset_id="customer_events_2024",
quality_score=0.3
)
def _simulate_streaming_lag_error(self) -> Exception:
"""Simulate a streaming lag error for testing."""
return StreamingLagError(
"Streaming lag exceeds threshold: 400 seconds behind",
lag_seconds=400,
topic="user-interactions"
)
Data quality and streaming lag errors represent operational challenges in real-time data processing systems. Quality threshold violations test the system's ability to detect and reject corrupted or incomplete data, while streaming lag simulation validates backpressure handling when upstream systems experience delays or failures.
def _simulate_resource_exhaustion_error(self) -> Exception:
"""Simulate a resource exhaustion error for testing."""
return DataProcessingAgentError(
"Memory exhaustion during large dataset processing",
DataProcessingErrorContext(
category=DataProcessingErrorCategory.RESOURCE_EXHAUSTION,
severity=DataProcessingErrorSeverity.CRITICAL
)
)
Resource exhaustion simulation tests the system's behavior under memory pressure conditions common when processing large datasets. This critical severity error validates that the system fails gracefully rather than crashing, enabling proper resource cleanup and error reporting to maintain system stability.
Load Testing Framework for Data Processing¶
Load testing validates data processing agent performance under concurrent usage scenarios typical of production data processing environments.
async def test_data_processing_concurrent_load(
self,
agent: ProductionDataAgentBase,
concurrent_requests: int = 20,
total_requests: int = 200
) -> Dict[str, Any]:
"""Test data processing agent performance under concurrent load."""
async def single_data_processing_request(request_id: int) -> Dict[str, Any]:
"""Execute a single data processing test request with timing and error handling."""
start_time = time.time()
try:
# Simulate different types of data processing requests
request_types = [
"Process customer analytics dataset with 1M rows",
"Extract features from user behavior data for ML pipeline",
"Validate data quality for streaming events",
"Transform and aggregate financial transaction data",
"Generate real-time recommendation features"
]
Load testing simulation creates realistic data processing scenarios that represent typical enterprise workloads. The variety of request types ensures comprehensive testing across different processing patterns - from batch analytics to real-time feature generation, validating system performance across diverse operational requirements.
request_text = random.choice(request_types)
result = await agent.process_data_request(f"{request_text} - Request {request_id}")
return {
'request_id': request_id,
'success': True,
'response_time': time.time() - start_time,
'request_type': request_text,
'estimated_rows_processed': random.randint(10000, 1000000),
'result': result
}
Request execution captures comprehensive performance metrics including response times and estimated throughput in rows processed. This data enables capacity planning and performance regression detection, while the unique request ID allows correlation of performance metrics with specific request types for optimization targeting.
We handle exceptions and track failed requests for comprehensive data processing analysis:
except Exception as e:
return {
'request_id': request_id,
'success': False,
'response_time': time.time() - start_time,
'error': str(e),
'error_category': getattr(getattr(e, 'context', None), 'category', 'unknown')
}
Next, we set up concurrent execution using semaphores to control data processing load:
# Execute concurrent batches with controlled concurrency for data processing
results = []
semaphore = asyncio.Semaphore(concurrent_requests)
async def execute_with_semaphore(request_id: int):
async with semaphore:
return await single_data_processing_request(request_id)
Concurrency control through semaphores prevents resource exhaustion during load testing while ensuring realistic concurrent usage patterns. The controlled concurrency simulates production traffic patterns where multiple users or systems simultaneously request data processing services, validating system behavior under realistic load conditions.
# Run all requests concurrently
start_time = time.time()
tasks = [execute_with_semaphore(i) for i in range(total_requests)]
batch_results = await asyncio.gather(*tasks)
total_execution_time = time.time() - start_time
results.extend(batch_results)
Concurrent execution using asyncio.gather enables parallel request processing to stress-test the system under realistic load conditions. Total execution time measurement provides throughput calculations essential for capacity planning, while the gather operation ensures all requests complete before analyzing results, maintaining test result accuracy.
Finally, we analyze the load test results and calculate performance metrics for data processing:
# Analyze results for comprehensive data processing performance metrics
successful_requests = [r for r in results if r['success']]
failed_requests = [r for r in results if not r['success']]
response_times = [r['response_time'] for r in successful_requests]
total_rows_processed = sum(r.get('estimated_rows_processed', 0) for r in successful_requests)
Performance analysis begins by categorizing results into successful and failed requests to calculate accurate success rates and response time statistics. Row processing aggregation provides throughput measurements essential for capacity planning and performance optimization in data processing systems.
# Calculate percentiles
if response_times:
sorted_times = sorted(response_times)
n = len(sorted_times)
percentiles = {
'p50': sorted_times[int(n * 0.5)],
'p90': sorted_times[int(n * 0.9)],
'p95': sorted_times[int(n * 0.95)],
'p99': sorted_times[int(n * 0.99)]
}
else:
percentiles = {}
Percentile calculations provide detailed performance distribution insights beyond simple averages. The 50th percentile shows typical performance, while 90th, 95th, and 99th percentiles reveal tail latency behavior critical for understanding system performance under load and identifying optimization opportunities.
return {
'test_type': 'data_processing_load_testing',
'total_requests': total_requests,
'concurrent_requests': concurrent_requests,
'successful_requests': len(successful_requests),
'failed_requests': len(failed_requests),
'success_rate': len(successful_requests) / total_requests if total_requests > 0 else 0,
'avg_response_time': sum(response_times) / len(response_times) if response_times else 0
Load test result metrics provide comprehensive performance analysis including request counts, success rates, and response time statistics. These foundational metrics enable capacity planning and performance regression detection essential for production data processing system monitoring.
'min_response_time': min(response_times) if response_times else 0,
'max_response_time': max(response_times) if response_times else 0,
'percentiles': percentiles,
'total_execution_time': total_execution_time,
'requests_per_second': total_requests / total_execution_time if total_execution_time > 0 else 0,
'total_rows_processed': total_rows_processed,
'average_throughput_rows_per_second': total_rows_processed / total_execution_time if total_execution_time > 0 else 0
}
Comprehensive performance reporting provides all metrics necessary for capacity planning, SLA monitoring, and performance regression detection. Request throughput and row processing rates enable accurate scaling decisions, while success rates and response time distributions guide optimization priorities for data processing systems.
Part 2: Performance Monitoring & Benchmarking for Data Processing¶
Enterprise Data Processing Metrics Collection¶
๐๏ธ File: src/session5/monitoring.py
- Complete monitoring infrastructure
Comprehensive metrics collection enables detailed performance analysis and operational monitoring for data processing systems.
# Enterprise monitoring and observability for PydanticAI data processing
from pydantic_ai.monitoring import AgentMonitor, MetricsCollector
from pydantic_ai.observability import TraceCollector, SpanContext
import json
import time
from typing import Any, Dict, List, Optional, Callable
from dataclasses import dataclass, field
from contextlib import asynccontextmanager
import structlog
Now we define the core metrics data structure that tracks comprehensive data processing agent performance:
@dataclass
class DataProcessingAgentMetrics:
"""Comprehensive data processing agent performance metrics."""
agent_id: str
request_count: int = 0
success_count: int = 0
error_count: int = 0
avg_response_time: float = 0.0
min_response_time: float = float('inf')
max_response_time: float = 0.0
total_tokens_used: int = 0
total_cost: float = 0.0
The metrics data structure captures comprehensive performance indicators for data processing agents. Standard metrics include request counts, success rates, and response times that provide fundamental performance visibility, while cost tracking enables budget monitoring and optimization for AI-powered data processing operations.
# Data processing specific metrics
datasets_processed: int = 0
total_rows_processed: int = 0
data_quality_score: float = 1.0
pipeline_failures: int = 0
schema_validation_errors: int = 0
Data processing specific metrics extend standard agent monitoring with domain-specific insights essential for data engineering operations. Row processing counts enable throughput analysis, while quality scores and error categorization support data pipeline reliability monitoring and optimization.
# Response time percentiles
response_times: List[float] = field(default_factory=list)
# Error breakdown by data processing category
error_types: Dict[str, int] = field(default_factory=dict)
# Success rate over time
success_rate_history: List[Dict[str, Any]] = field(default_factory=list)
# Data processing throughput metrics
throughput_history: List[Dict[str, Any]] = field(default_factory=list)
Next, we implement intelligent response time tracking with memory management for data processing:
def update_response_time(self, response_time: float) -> None:
"""Update response time metrics with memory management for data processing."""
self.response_times.append(response_time)
# Keep only last 10000 response times for data processing memory management
if len(self.response_times) > 10000:
self.response_times = self.response_times[-10000:]
# Update aggregate metrics
self.avg_response_time = sum(self.response_times) / len(self.response_times)
self.min_response_time = min(self.min_response_time, response_time)
self.max_response_time = max(self.max_response_time, response_time)
Response time tracking implements intelligent memory management to prevent unbounded growth while maintaining statistical accuracy. The circular buffer approach keeps recent performance data for trend analysis while discarding older measurements, enabling long-running production systems to maintain consistent memory usage.
def update_data_processing_metrics(self, rows_processed: int, data_quality: float = 1.0) -> None:
"""Update data processing specific metrics."""
self.datasets_processed += 1
self.total_rows_processed += rows_processed
# Update running average of data quality score
current_datasets = max(self.datasets_processed, 1)
self.data_quality_score = ((self.data_quality_score * (current_datasets - 1)) + data_quality) / current_datasets
Finally, we add percentile calculation for detailed data processing performance analysis:
def get_percentiles(self) -> Dict[str, float]:
"""Get response time percentiles for data processing performance analysis."""
if not self.response_times:
return {}
sorted_times = sorted(self.response_times)
n = len(sorted_times)
return {
'p50': sorted_times[int(n * 0.5)],
'p75': sorted_times[int(n * 0.75)],
'p90': sorted_times[int(n * 0.9)],
'p95': sorted_times[int(n * 0.95)],
'p99': sorted_times[int(n * 0.99)]
}
def get_throughput_metrics(self) -> Dict[str, float]:
"""Get data processing throughput metrics."""
if not self.throughput_history:
return {'current_rows_per_second': 0.0, 'peak_rows_per_second': 0.0}
recent_throughput = [t['rows_per_second'] for t in self.throughput_history[-10:]] # Last 10 measurements
return {
'current_rows_per_second': recent_throughput[-1] if recent_throughput else 0.0,
'average_rows_per_second': sum(recent_throughput) / len(recent_throughput) if recent_throughput else 0.0,
'peak_rows_per_second': max(t['rows_per_second'] for t in self.throughput_history)
}
Enterprise Data Processing Metrics Collector¶
The metrics collector provides comprehensive tracking and reporting capabilities for production data processing monitoring.
class EnterpriseDataProcessingMetricsCollector:
"""Enterprise-grade metrics collection and reporting for data processing systems."""
def __init__(self, export_interval: int = 60, retention_hours: int = 24):
self.export_interval = export_interval
self.retention_hours = retention_hours
self.agent_metrics: Dict[str, DataProcessingAgentMetrics] = {}
self.global_metrics = DataProcessingAgentMetrics("global")
self.custom_metrics: Dict[str, List[Dict[str, Any]]] = {}
The enterprise metrics collector provides centralized monitoring for multiple data processing agents with configurable export intervals and data retention policies. The dual-level metrics tracking enables both agent-specific performance analysis and global system monitoring essential for production operations.
# External integrations for data processing
self.prometheus_enabled = False
self.datadog_enabled = False
self.custom_exporters: List[Callable] = []
# Structured logging for data processing
self.logger = structlog.get_logger("pydantic_ai.data_processing.metrics")
Now we implement the core method for recording data processing agent request metrics:
def record_data_processing_request(
self,
agent_id: str,
success: bool,
response_time: float,
rows_processed: int = 0,
data_quality_score: float = 1.0,
error_type: Optional[str] = None,
tokens_used: int = 0,
estimated_cost: float = 0.0,
pipeline_stage: str = None,
custom_metrics: Optional[Dict[str, Any]] = None
) -> None:
"""Record data processing agent request metrics with comprehensive tracking."""
The comprehensive request recording method captures all essential metrics for data processing operations. Parameters include both standard agent metrics like response time and success status, plus data-specific metrics like row counts, quality scores, and pipeline stage information essential for data engineering monitoring.
# Ensure agent is registered
if agent_id not in self.agent_metrics:
self.agent_metrics[agent_id] = DataProcessingAgentMetrics(agent_id)
agent_metrics = self.agent_metrics[agent_id]
agent_metrics.request_count += 1
self.global_metrics.request_count += 1
Agent registration and request counting provide foundational metrics tracking with both per-agent and global visibility. This dual-level tracking enables both detailed agent performance analysis and system-wide capacity monitoring essential for production data processing operations.
Next, we record success or error metrics and update global statistics:
if success:
agent_metrics.success_count += 1
agent_metrics.update_response_time(response_time)
agent_metrics.update_data_processing_metrics(rows_processed, data_quality_score)
self.global_metrics.success_count += 1
self.global_metrics.update_response_time(response_time)
self.global_metrics.update_data_processing_metrics(rows_processed, data_quality_score)
Success path metrics update both agent-specific and global counters to provide comprehensive performance visibility. The data processing metrics update includes row counts and quality scores that are essential for understanding data pipeline efficiency and output quality in production environments.
# Record throughput metrics
if response_time > 0:
throughput = rows_processed / response_time
agent_metrics.throughput_history.append({
'timestamp': time.time(),
'rows_per_second': throughput,
'pipeline_stage': pipeline_stage
})
# Keep only last 1000 throughput measurements
if len(agent_metrics.throughput_history) > 1000:
agent_metrics.throughput_history = agent_metrics.throughput_history[-1000:]
Throughput tracking provides real-time performance visibility with memory management to prevent unbounded growth. The pipeline stage annotation enables performance analysis by processing stage, critical for identifying bottlenecks in complex data transformation workflows.
else:
agent_metrics.error_count += 1
self.global_metrics.error_count += 1
# Track data processing specific error types
if error_type:
agent_metrics.error_types[error_type] = agent_metrics.error_types.get(error_type, 0) + 1
if 'schema' in error_type.lower():
agent_metrics.schema_validation_errors += 1
elif 'pipeline' in error_type.lower():
agent_metrics.pipeline_failures += 1
Error tracking provides detailed categorization of failure types common in data processing systems. Schema validation and pipeline failure tracking enable targeted troubleshooting and help identify systemic issues that require infrastructure or code changes rather than transient error handling.
Finally, we handle custom metrics and structured logging for data processing:
# Record custom metrics for data processing
if custom_metrics:
for metric_name, metric_value in custom_metrics.items():
if metric_name not in self.custom_metrics:
self.custom_metrics[metric_name] = []
Custom metrics support enables domain-specific monitoring that extends beyond standard performance metrics. Each custom metric is timestamped and associated with specific agents and pipeline stages, allowing detailed analysis of specialized data processing operations and business-specific performance indicators.
self.custom_metrics[metric_name].append({
'timestamp': time.time(),
'agent_id': agent_id,
'value': metric_value,
'pipeline_stage': pipeline_stage
})
Structured logging provides comprehensive request tracking with all essential context for debugging and audit trails. The structured format enables efficient log parsing and analysis while maintaining human readability for troubleshooting data processing issues.
# Structured logging for data processing
self.logger.info(
"Data processing request recorded",
agent_id=agent_id,
success=success,
response_time=response_time,
rows_processed=rows_processed,
data_quality_score=data_quality_score,
error_type=error_type,
tokens_used=tokens_used,
estimated_cost=estimated_cost,
pipeline_stage=pipeline_stage
)
Prometheus Integration for Data Processing¶
Integration with Prometheus provides industry-standard metrics collection and alerting capabilities for data processing systems.
def export_to_prometheus(self) -> str:
"""Export data processing metrics in Prometheus format for monitoring systems."""
if not self.prometheus_enabled:
return ""
metrics_output = []
# Global data processing metrics with standard Prometheus format
global_summary = self.get_global_summary()
metrics_output.extend([
f"# HELP pydantic_ai_data_requests_total Total number of data processing requests",
f"# TYPE pydantic_ai_data_requests_total counter",
f"pydantic_ai_data_requests_total {global_summary['total_requests']}",
f"",
f"# HELP pydantic_ai_data_success_rate Current data processing success rate",
f"# TYPE pydantic_ai_data_success_rate gauge",
f"pydantic_ai_data_success_rate {global_summary['global_success_rate']}"
Prometheus metrics export provides industry-standard monitoring integration with proper metric types and labeling. Counter metrics track cumulative request counts and row processing totals, while gauge metrics provide current state indicators like success rates and data quality scores essential for real-time monitoring and alerting.
metrics_output.extend([
f"",
f"# HELP pydantic_ai_data_quality_score Current data quality score",
f"# TYPE pydantic_ai_data_quality_score gauge",
f"pydantic_ai_data_quality_score {global_summary['data_quality_score']}",
f"",
f"# HELP pydantic_ai_data_rows_processed_total Total rows processed",
f"# TYPE pydantic_ai_data_rows_processed_total counter",
f"pydantic_ai_data_rows_processed_total {global_summary['total_rows_processed']}",
f"",
f"# HELP pydantic_ai_data_response_time_seconds Response time in seconds",
f"# TYPE pydantic_ai_data_response_time_seconds histogram"
])
Now we add per-agent metrics to the Prometheus export for data processing:
# Per-agent data processing metrics with proper labeling
for agent_id in self.agent_metrics.keys():
summary = self.get_agent_summary(agent_id)
if summary:
metrics_output.extend([
f"pydantic_ai_data_agent_requests_total{{agent=\"{agent_id}\"}} {summary['total_requests']}",
f"pydantic_ai_data_agent_success_rate{{agent=\"{agent_id}\"}} {summary['success_rate']}"
Per-agent metrics enable detailed performance analysis and capacity planning by individual agent. Agent-specific labeling allows monitoring systems to create targeted alerts and dashboards for specific data processing workflows, enabling precise performance optimization and resource allocation.
f"pydantic_ai_data_agent_response_time{{agent=\"{agent_id}\"}} {summary['avg_response_time']}",
f"pydantic_ai_data_agent_rows_processed{{agent=\"{agent_id}\"}} {summary['total_rows_processed']}",
f"pydantic_ai_data_agent_quality_score{{agent=\"{agent_id}\"}} {summary['data_quality_score']}"
])
return "\n".join(metrics_output)
Part 3: Intelligent Caching & Optimization for Data Processing¶
High-Performance Caching System for Data Processing¶
๐๏ธ File: src/session5/caching.py
- Intelligent caching implementations
Performance optimization focuses on intelligent caching, request batching, and resource management while maintaining type safety for data processing workloads.
# Performance optimization patterns for PydanticAI data processing applications
import asyncio
from typing import Dict, Any, Optional, Callable, TypeVar, Generic
from functools import wraps, lru_cache
import hashlib
import json
from datetime import datetime, timedelta, timezone
from concurrent.futures import ThreadPoolExecutor
import threading
from collections import OrderedDict
from dataclasses import dataclass
import pickle
T = TypeVar('T')
K = TypeVar('K')
V = TypeVar('V')
Now we define the cache entry data structure with intelligent metadata tracking for data processing:
@dataclass
class DataProcessingCacheEntry(Generic[V]):
"""Cache entry with metadata for intelligent eviction in data processing systems."""
value: V
created_at: datetime
last_accessed: datetime
access_count: int
ttl_seconds: Optional[float]
size_bytes: int
# Data processing specific metadata
dataset_id: Optional[str] = None
data_quality_score: float = 1.0
processing_cost: float = 0.0 # Cost to regenerate this data
rows_count: int = 0
def is_expired(self) -> bool:
"""Check if cache entry has expired."""
if not self.ttl_seconds:
return False
age = datetime.now(timezone.utc) - self.created_at
return age.total_seconds() > self.ttl_seconds
def update_access(self) -> None:
"""Update access statistics for LRU management in data processing."""
self.last_accessed = datetime.now(timezone.utc)
self.access_count += 1
def get_priority_score(self) -> float:
"""Calculate priority score for intelligent eviction in data processing."""
# Higher score = higher priority to keep
recency_score = 1.0 / max((datetime.now(timezone.utc) - self.last_accessed).total_seconds() / 3600, 0.1)
frequency_score = self.access_count / 10.0
quality_score = self.data_quality_score
cost_score = self.processing_cost / 100.0 # Normalize cost
return recency_score + frequency_score + quality_score + cost_score
Intelligent Cache Implementation for Data Processing¶
The intelligent cache provides high-performance caching with automatic eviction, memory management, and comprehensive statistics optimized for data processing workloads.
class IntelligentDataProcessingCache(Generic[K, V]):
"""High-performance cache with intelligent eviction strategies for data processing."""
def __init__(
self,
max_size: int = 10000, # Higher default for data processing
default_ttl_seconds: float = 7200, # 2 hours default for data
max_memory_mb: float = 1000 # 1GB for data processing
):
self.max_size = max_size
self.default_ttl_seconds = default_ttl_seconds
self.max_memory_bytes = max_memory_mb * 1024 * 1024
self._cache: OrderedDict[K, DataProcessingCacheEntry[V]] = OrderedDict()
self._lock = threading.RLock()
self._total_size_bytes = 0
self._stats = {
'hits': 0,
'misses': 0,
'evictions': 0,
'expired_cleanups': 0,
'data_quality_hits': 0, # High quality data cache hits
'cost_savings': 0.0 # Processing cost saved through caching
}
Now we implement the intelligent cache retrieval method with data processing access tracking:
def get(self, key: K) -> Optional[V]:
"""Get value from cache with intelligent access tracking for data processing."""
with self._lock:
# Periodic cleanup for data processing efficiency
if len(self._cache) > 1000 and len(self._cache) % 100 == 0:
self._cleanup_expired()
entry = self._cache.get(key)
if not entry:
self._stats['misses'] += 1
return None
Cache retrieval begins with intelligent cleanup management to maintain performance without impacting response times. Periodic cleanup prevents unbounded memory growth while ensuring cache efficiency, and immediate miss tracking provides accurate hit rate calculations essential for cache performance analysis.
if entry.is_expired():
self._cache.pop(key)
self._total_size_bytes -= entry.size_bytes
self._stats['misses'] += 1
self._stats['expired_cleanups'] += 1
return None
# Update access statistics and move to end (most recently used)
entry.update_access()
self._cache.move_to_end(key)
self._stats['hits'] += 1
Expiration handling ensures cache consistency by removing stale data while maintaining accurate memory usage tracking. Access pattern updates support LRU eviction strategy, while hit/miss statistics provide performance insights essential for cache tuning and capacity planning.
# Track data processing specific metrics
if entry.data_quality_score > 0.8:
self._stats['data_quality_hits'] += 1
self._stats['cost_savings'] += entry.processing_cost
return entry.value
Next, we implement the cache storage method with intelligent eviction for data processing:
def set(self, key: K, value: V, ttl_seconds: float = None,
dataset_id: str = None, data_quality_score: float = 1.0,
processing_cost: float = 0.0, rows_count: int = 0) -> None:
"""Set value in cache with intelligent eviction for data processing."""
with self._lock:
ttl = ttl_seconds or self.default_ttl_seconds
size_bytes = self._calculate_size(value)
# Remove existing entry if present
if key in self._cache:
old_entry = self._cache.pop(key)
self._total_size_bytes -= old_entry.size_bytes
Next, we create the new cache entry with proper data processing metadata:
# Create new entry with data processing metadata
entry = DataProcessingCacheEntry(
value=value,
created_at=datetime.now(timezone.utc),
last_accessed=datetime.now(timezone.utc),
access_count=1,
ttl_seconds=ttl,
size_bytes=size_bytes
Cache entry creation initializes comprehensive metadata for intelligent cache management. The timestamp tracking enables LRU eviction strategies, while size tracking supports memory limit enforcement essential for production data processing environments with large dataset caching requirements.
dataset_id=dataset_id,
data_quality_score=data_quality_score,
processing_cost=processing_cost,
rows_count=rows_count
)
# Evict if necessary before adding
self._evict_intelligently()
# Add new entry
self._cache[key] = entry
self._total_size_bytes += size_bytes
Finally, we add helper methods for size calculation, intelligent eviction, and data processing statistics:
def _calculate_size(self, obj: Any) -> int:
"""Estimate object size in bytes for data processing memory management."""
try:
# Use pickle for more accurate size estimation of data objects
return len(pickle.dumps(obj))
except:
# Fallback to JSON estimation
try:
return len(json.dumps(obj, default=str).encode('utf-8'))
except:
# Final fallback estimation
return len(str(obj)) * 2 # Rough estimate for Unicode
def _evict_intelligently(self) -> None:
"""Evict entries using intelligent priority scoring for data processing."""
while (len(self._cache) >= self.max_size or
self._total_size_bytes >= self.max_memory_bytes):
if not self._cache:
break
# Find entry with lowest priority score
lowest_priority_key = None
lowest_priority_score = float('inf')
Intelligent eviction algorithm scans all cache entries to find the lowest priority candidate for removal. The algorithm considers multiple factors including data quality, processing cost, and access patterns to optimize cache retention and maximize the value of cached data processing results.
for key, entry in self._cache.items():
priority_score = entry.get_priority_score()
if priority_score < lowest_priority_score:
lowest_priority_score = priority_score
lowest_priority_key = key
if lowest_priority_key:
entry = self._cache.pop(lowest_priority_key)
self._total_size_bytes -= entry.size_bytes
self._stats['evictions'] += 1
Finally, we provide comprehensive cache performance statistics for data processing:
def get_stats(self) -> Dict[str, Any]:
"""Get comprehensive cache performance statistics for data processing."""
with self._lock:
total_requests = self._stats['hits'] + self._stats['misses']
hit_rate = (self._stats['hits'] / total_requests * 100) if total_requests > 0 else 0
# Calculate data processing specific metrics
total_rows = sum(entry.rows_count for entry in self._cache.values())
avg_quality_score = (
sum(entry.data_quality_score for entry in self._cache.values()) /
len(self._cache) if self._cache else 0.0
)
Comprehensive statistics provide detailed cache performance insights essential for optimization and capacity planning. Hit rate calculations enable cache tuning decisions, while data-specific metrics like cached row counts and quality scores help understand the business impact of caching strategies.
return {
'size': len(self._cache),
'max_size': self.max_size,
'memory_usage_mb': self._total_size_bytes / (1024 * 1024),
'hit_rate_percent': hit_rate,
'total_requests': total_requests,
'stats': dict(self._stats),
'total_cached_rows': total_rows,
'average_data_quality_score': avg_quality_score,
'cost_savings_total': self._stats['cost_savings']
}
Cache Decorator for Data Processing Agent Methods¶
A convenient decorator enables automatic caching of data processing agent method results with configurable TTL and cache key generation.
def cached_data_processing_method(
cache: IntelligentDataProcessingCache,
ttl_seconds: float = 7200, # 2 hours for data processing
key_generator: Optional[Callable] = None,
include_quality_score: bool = True,
processing_cost_estimator: Optional[Callable] = None
):
"""Decorator for caching data processing agent method results."""
The cache decorator enables transparent caching of data processing agent method results with configurable TTL and intelligent metadata extraction. The decorator supports custom key generation strategies and cost estimation functions, making it suitable for diverse data processing workflows with varying performance characteristics.
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs):
# Generate cache key using custom generator or default hashing
if key_generator:
cache_key = key_generator(*args, **kwargs)
else:
# Create data processing specific cache key
key_data = f"{func.__name__}:{hash(str(args) + str(sorted(kwargs.items())))}"
cache_key = hashlib.md5(key_data.encode()).hexdigest()
Now we handle cache lookup and function execution for data processing:
# Try to get from cache first
cached_result = cache.get(cache_key)
if cached_result is not None:
return cached_result
# Execute function and cache result with data processing metadata
result = await func(*args, **kwargs)
Cache-first lookup strategy maximizes performance by avoiding expensive data processing operations when results are already cached. The async function execution maintains compatibility with data processing workflows while enabling transparent caching without modifying agent method signatures.
# Extract data processing metadata if available
dataset_id = kwargs.get('dataset_id') or getattr(result, 'dataset_id', None)
rows_count = getattr(result, 'rows_processed', 0)
data_quality_score = getattr(result, 'data_quality_score', 1.0) if include_quality_score else 1.0
processing_cost = processing_cost_estimator(result) if processing_cost_estimator else 1.0
cache.set(
cache_key,
result,
ttl_seconds,
dataset_id=dataset_id,
data_quality_score=data_quality_score,
processing_cost=processing_cost,
rows_count=rows_count
)
return result
return wrapper
return decorator
Module Summary¶
You've now mastered testing and benchmarking for PydanticAI data processing systems, including:
โ Comprehensive Data Processing Testing: Built integration tests, error scenario testing, and load testing frameworks for data systems โ Data Processing Performance Monitoring: Implemented enterprise metrics collection with Prometheus integration optimized for data workloads โ Intelligent Data Processing Caching: Created high-performance caching with priority-based eviction and data quality tracking โ Production Data Processing Monitoring: Built observability with distributed tracing and structured logging for data systems
Next Steps¶
- Return to Core: Session 5 Main
- Advance to Session 6: Atomic Agents
- Review Previous Modules: Module A, Module B, Module C
Complete Session 5 Learning Path for Data Engineering¶
๐ฏ Core Section โ ๐ฌ Module A โ ๐ญ Module B โ ๐ง Module C โ ๐งช Module D
You've completed the comprehensive PydanticAI learning journey optimized for data engineering and processing systems!
๐๏ธ Source Files for Module D:
src/session5/testing_framework.py
- Complete testing infrastructure for data processing systemssrc/session5/monitoring.py
- Enterprise monitoring systems for data processingsrc/session5/caching.py
- Intelligent caching implementations for data processing workloads
๐ Multiple Choice Test - Session 5¶
Test your understanding of testing and benchmarking systems for data processing:
Question 1: What does the comprehensive data processing integration test framework validate?
A) Only basic functionality
B) Valid inputs, error scenarios, edge cases, performance under load, and data quality metrics
C) Simple unit tests only
D) Manual testing procedures
Question 2: How does the DataProcessingMetricsCollector track agent performance data?
A) Simple counters only
B) Comprehensive metrics with request counts, response times, error rates, data quality scores, and throughput
C) Binary success/failure tracking
D) Manual logging only
Question 3: What eviction strategy does the IntelligentDataProcessingCache use when memory limits are reached?
A) Random removal
B) Priority-based eviction with data quality, processing cost, and access pattern consideration
C) First-in-first-out only
D) Manual cache clearing
Question 4: What information does the data processing performance decorator capture for monitoring?
A) Just execution time
B) Request metrics, performance data, error tracking, data quality scores, and processing costs
C) Function names only
D) Memory usage only
Question 5: How does the load testing framework simulate realistic data processing usage patterns?
A) Single threaded execution
B) Concurrent user simulation with configurable load patterns, data processing scenarios, and throughput analysis
C) Random API calls
D) Simple sequential testing
๐งญ Navigation¶
Previous: Session 4 - Team Orchestration โ
Next: Session 6 - Modular Architecture โ