⚙️ Session 8 Module C: Performance & Production Validation¶
⚙️ IMPLEMENTER PATH CONTENT Prerequisites: Complete 🎯 Observer and 📝 Participant paths in Session 8 Time Investment: 2-3 hours Outcome: Master performance optimization, cost management, and comprehensive production validation strategies
Advanced Learning Outcomes¶
After completing this module, you will master:
- Advanced performance optimization techniques for data processing agents
- Cost management and resource optimization strategies
- Comprehensive production validation and testing methodologies
- Complete assessment and testing frameworks for production systems
Advanced Performance Optimization¶
Circuit Breaker Patterns for Data Processing¶
When processing large volumes of data, circuit breakers prevent cascading failures by monitoring system health and automatically stopping requests when error rates exceed thresholds:
import asyncio
from enum import Enum
from datetime import datetime, timedelta
import logging
class CircuitBreakerState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Blocking requests
HALF_OPEN = "half_open" # Testing if service recovered
class DataProcessingCircuitBreaker:
def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitBreakerState.CLOSED
Implement circuit breaker logic for data processing operations:
async def call(self, func, *args, **kwargs):
"""Execute function through circuit breaker."""
if self.state == CircuitBreakerState.OPEN:
# Check if recovery timeout has passed
if (datetime.now() - self.last_failure_time).seconds > self.recovery_timeout:
self.state = CircuitBreakerState.HALF_OPEN
logging.info("Circuit breaker moving to HALF_OPEN state")
else:
raise Exception("Circuit breaker is OPEN - service unavailable")
try:
# Execute the function
result = await func(*args, **kwargs) if asyncio.iscoroutinefunction(func) else func(*args, **kwargs)
# Success - reset failure count if in HALF_OPEN
if self.state == CircuitBreakerState.HALF_OPEN:
self.state = CircuitBreakerState.CLOSED
self.failure_count = 0
logging.info("Circuit breaker reset to CLOSED state")
return result
Handle failures and state transitions:
except Exception as e:
self.failure_count += 1
self.last_failure_time = datetime.now()
# Check if threshold exceeded
if self.failure_count >= self.failure_threshold:
self.state = CircuitBreakerState.OPEN
logging.error(f"Circuit breaker opened after {self.failure_count} failures")
raise e
# Usage with data processing agent
class CircuitBreakerDataAgent:
def __init__(self, agent: Agent):
self.agent = agent
self.circuit_breaker = DataProcessingCircuitBreaker()
async def safe_process(self, query: str):
"""Process data through circuit breaker."""
return await self.circuit_breaker.call(self.agent.arun, query)
Advanced Caching Strategies¶
Implement multi-tier caching for optimal performance in data processing workflows:
import hashlib
import json
from typing import Optional, Dict, Any
class MultiTierCacheManager:
def __init__(self):
self.memory_cache = {} # L1 cache
self.redis_cache = None # L2 cache
self.memory_cache_size = 1000
self.cache_ttl = {
"query_results": 3600, # 1 hour
"user_sessions": 1800, # 30 minutes
"data_patterns": 7200, # 2 hours
}
Implement intelligent cache key generation and retrieval:
def _generate_cache_key(self, prefix: str, data: Dict[str, Any]) -> str:
"""Generate consistent cache key from data."""
# Sort keys for consistent hashing
sorted_data = json.dumps(data, sort_keys=True)
hash_object = hashlib.md5(sorted_data.encode())
return f"{prefix}:{hash_object.hexdigest()}"
async def get_cached_result(self, cache_type: str, query_data: Dict[str, Any]) -> Optional[str]:
"""Retrieve from multi-tier cache."""
cache_key = self._generate_cache_key(cache_type, query_data)
# Check L1 cache (memory) first
if cache_key in self.memory_cache:
logging.debug(f"Cache hit (L1): {cache_key}")
return self.memory_cache[cache_key]["data"]
# Check L2 cache (Redis) if available
if self.redis_cache:
cached_data = await self.redis_cache.get(cache_key)
if cached_data:
logging.debug(f"Cache hit (L2): {cache_key}")
# Promote to L1 cache
self._store_in_memory_cache(cache_key, cached_data)
return cached_data
logging.debug(f"Cache miss: {cache_key}")
return None
Implement cache storage with TTL and eviction policies:
async def store_cached_result(self, cache_type: str, query_data: Dict[str, Any], result: str):
"""Store result in multi-tier cache."""
cache_key = self._generate_cache_key(cache_type, query_data)
ttl = self.cache_ttl.get(cache_type, 3600)
# Store in L1 cache (memory)
self._store_in_memory_cache(cache_key, result)
# Store in L2 cache (Redis) if available
if self.redis_cache:
await self.redis_cache.setex(cache_key, ttl, result)
def _store_in_memory_cache(self, key: str, data: str):
"""Store in memory cache with LRU eviction."""
if len(self.memory_cache) >= self.memory_cache_size:
# Simple LRU eviction - remove oldest entry
oldest_key = min(self.memory_cache.keys(),
key=lambda k: self.memory_cache[k]["timestamp"])
del self.memory_cache[oldest_key]
self.memory_cache[key] = {
"data": data,
"timestamp": datetime.now()
}
# Usage with data processing agent
class CachedDataAgent:
def __init__(self, agent: Agent):
self.agent = agent
self.cache_manager = MultiTierCacheManager()
async def process_with_cache(self, query: str, session_id: str = None):
"""Process query with intelligent caching."""
query_data = {"query": query, "session_id": session_id}
# Check cache first
cached_result = await self.cache_manager.get_cached_result("query_results", query_data)
if cached_result:
return cached_result
# Process with agent
response = await self.agent.arun(query, session_id=session_id)
# Cache the result
await self.cache_manager.store_cached_result("query_results", query_data, response.content)
return response.content
Load Testing and Performance Benchmarking¶
Implement comprehensive load testing for production data processing agents:
import aiohttp
import asyncio
import time
from dataclasses import dataclass
from typing import List
import statistics
@dataclass
class LoadTestResult:
total_requests: int
successful_requests: int
failed_requests: int
avg_response_time: float
min_response_time: float
max_response_time: float
requests_per_second: float
percentile_95: float
percentile_99: float
class DataAgentLoadTester:
def __init__(self, base_url: str, auth_token: str = None):
self.base_url = base_url
self.auth_token = auth_token
self.results = []
Implement concurrent request execution with metrics collection:
async def execute_request(self, session: aiohttp.ClientSession, request_data: dict):
"""Execute single request and measure performance."""
start_time = time.time()
headers = {"Authorization": f"Bearer {self.auth_token}"} if self.auth_token else {}
headers["Content-Type"] = "application/json"
try:
async with session.post(
f"{self.base_url}/process-data",
json=request_data,
headers=headers,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
await response.text() # Consume response
end_time = time.time()
return {
"success": response.status == 200,
"status_code": response.status,
"response_time": end_time - start_time,
"error": None
}
except Exception as e:
end_time = time.time()
return {
"success": False,
"status_code": 0,
"response_time": end_time - start_time,
"error": str(e)
}
Execute load test with configurable parameters:
async def run_load_test(self,
concurrent_users: int = 10,
requests_per_user: int = 10,
test_queries: List[str] = None) -> LoadTestResult:
"""Run comprehensive load test."""
if not test_queries:
test_queries = [
"Analyze customer behavior patterns",
"Generate sales report summary",
"Process transaction data analysis",
"Create data quality assessment"
]
# Prepare test data
test_requests = []
for user in range(concurrent_users):
for req in range(requests_per_user):
query = test_queries[req % len(test_queries)]
test_requests.append({
"data_query": query,
"pipeline_id": f"load_test_{user}_{req}"
})
# Execute load test
start_time = time.time()
async with aiohttp.ClientSession() as session:
tasks = [self.execute_request(session, req) for req in test_requests]
results = await asyncio.gather(*tasks, return_exceptions=True)
end_time = time.time()
# Process results
return self._analyze_results(results, end_time - start_time)
Analyze and report performance metrics:
def _analyze_results(self, results: List[dict], total_duration: float) -> LoadTestResult:
"""Analyze load test results and calculate metrics."""
successful_results = [r for r in results if isinstance(r, dict) and r["success"]]
failed_results = [r for r in results if isinstance(r, dict) and not r["success"]]
response_times = [r["response_time"] for r in successful_results]
if not response_times:
raise ValueError("No successful requests in load test")
# Calculate percentiles
sorted_times = sorted(response_times)
percentile_95 = sorted_times[int(0.95 * len(sorted_times))]
percentile_99 = sorted_times[int(0.99 * len(sorted_times))]
return LoadTestResult(
total_requests=len(results),
successful_requests=len(successful_results),
failed_requests=len(failed_results),
avg_response_time=statistics.mean(response_times),
min_response_time=min(response_times),
max_response_time=max(response_times),
requests_per_second=len(results) / total_duration,
percentile_95=percentile_95,
percentile_99=percentile_99
)
# Usage example
async def run_performance_test():
tester = DataAgentLoadTester("http://localhost:8000", "your-auth-token")
result = await tester.run_load_test(
concurrent_users=50,
requests_per_user=20
)
print(f"Load Test Results:")
print(f"Total Requests: {result.total_requests}")
print(f"Success Rate: {(result.successful_requests / result.total_requests) * 100:.2f}%")
print(f"Average Response Time: {result.avg_response_time:.3f}s")
print(f"95th Percentile: {result.percentile_95:.3f}s")
print(f"Requests/Second: {result.requests_per_second:.2f}")
Cost Optimization Strategies¶
Resource Usage Monitoring and Optimization¶
Implement comprehensive cost tracking and optimization for production data processing agents:
import psutil
import time
from dataclasses import dataclass
from typing import Dict, List
@dataclass
class ResourceUsage:
timestamp: float
cpu_percent: float
memory_mb: float
disk_io_mb: float
network_io_mb: float
active_connections: int
class CostOptimizationManager:
def __init__(self):
self.usage_history: List[ResourceUsage] = []
self.cost_per_hour = {
"cpu_core": 0.05, # $0.05 per CPU core hour
"memory_gb": 0.01, # $0.01 per GB memory hour
"disk_io_gb": 0.001, # $0.001 per GB disk I/O
"network_gb": 0.001 # $0.001 per GB network I/O
}
Monitor and collect resource usage metrics:
def collect_resource_usage(self) -> ResourceUsage:
"""Collect current resource usage metrics."""
# CPU usage
cpu_percent = psutil.cpu_percent(interval=1)
# Memory usage
memory = psutil.virtual_memory()
memory_mb = memory.used / (1024 * 1024)
# Disk I/O (simplified)
disk_io = psutil.disk_io_counters()
disk_io_mb = (disk_io.read_bytes + disk_io.write_bytes) / (1024 * 1024)
# Network I/O
network_io = psutil.net_io_counters()
network_io_mb = (network_io.bytes_sent + network_io.bytes_recv) / (1024 * 1024)
# Active connections (simplified)
connections = len(psutil.net_connections())
usage = ResourceUsage(
timestamp=time.time(),
cpu_percent=cpu_percent,
memory_mb=memory_mb,
disk_io_mb=disk_io_mb,
network_io_mb=network_io_mb,
active_connections=connections
)
self.usage_history.append(usage)
# Keep only last 24 hours of data
cutoff_time = time.time() - (24 * 3600)
self.usage_history = [u for u in self.usage_history if u.timestamp > cutoff_time]
return usage
Calculate costs and optimization recommendations:
def calculate_hourly_cost(self) -> Dict[str, float]:
"""Calculate estimated hourly costs based on usage."""
if not self.usage_history:
return {"total": 0.0}
# Get average usage over last hour
recent_usage = [u for u in self.usage_history if u.timestamp > (time.time() - 3600)]
if not recent_usage:
return {"total": 0.0}
avg_cpu = sum(u.cpu_percent for u in recent_usage) / len(recent_usage)
avg_memory_gb = sum(u.memory_mb for u in recent_usage) / len(recent_usage) / 1024
avg_disk_io_gb = sum(u.disk_io_mb for u in recent_usage) / len(recent_usage) / 1024
avg_network_gb = sum(u.network_io_mb for u in recent_usage) / len(recent_usage) / 1024
# Calculate costs (assuming 8 CPU cores available)
cpu_cores_used = (avg_cpu / 100) * 8
costs = {
"cpu": cpu_cores_used * self.cost_per_hour["cpu_core"],
"memory": avg_memory_gb * self.cost_per_hour["memory_gb"],
"disk_io": avg_disk_io_gb * self.cost_per_hour["disk_io_gb"],
"network_io": avg_network_gb * self.cost_per_hour["network_gb"]
}
costs["total"] = sum(costs.values())
return costs
def get_optimization_recommendations(self) -> List[str]:
"""Generate cost optimization recommendations."""
recommendations = []
if not self.usage_history:
return recommendations
recent_usage = [u for u in self.usage_history if u.timestamp > (time.time() - 3600)]
if not recent_usage:
return recommendations
avg_cpu = sum(u.cpu_percent for u in recent_usage) / len(recent_usage)
avg_memory_gb = sum(u.memory_mb for u in recent_usage) / len(recent_usage) / 1024
# CPU optimization recommendations
if avg_cpu < 20:
recommendations.append("CPU usage is low - consider reducing instance size")
elif avg_cpu > 80:
recommendations.append("CPU usage is high - consider scaling horizontally or increasing instance size")
# Memory optimization recommendations
if avg_memory_gb < 2:
recommendations.append("Memory usage is low - consider smaller instance type")
elif avg_memory_gb > 14: # Assuming 16GB instances
recommendations.append("Memory usage is high - consider memory optimization or larger instances")
return recommendations
Integration with data processing agent for cost tracking:
class CostOptimizedDataAgent:
def __init__(self, agent: Agent):
self.agent = agent
self.cost_manager = CostOptimizationManager()
self.processing_count = 0
async def process_with_cost_tracking(self, query: str):
"""Process query while tracking resource costs."""
# Collect pre-processing usage
pre_usage = self.cost_manager.collect_resource_usage()
start_time = time.time()
try:
# Process with agent
response = await self.agent.arun(query)
# Collect post-processing usage
post_usage = self.cost_manager.collect_resource_usage()
processing_time = time.time() - start_time
# Log cost information
self.processing_count += 1
if self.processing_count % 10 == 0: # Log every 10 requests
costs = self.cost_manager.calculate_hourly_cost()
recommendations = self.cost_manager.get_optimization_recommendations()
logging.info(f"Cost Analysis - Hourly estimate: ${costs['total']:.4f}")
for rec in recommendations:
logging.info(f"Optimization: {rec}")
return response.content
except Exception as e:
# Still collect usage data on errors
self.cost_manager.collect_resource_usage()
raise
Comprehensive Test Suite for Production Systems¶
Create a complete testing framework that validates all aspects of production readiness:
import unittest
import asyncio
from typing import Dict, List, Any
class ProductionReadinessTestSuite:
def __init__(self, agent: Agent, base_url: str = None):
self.agent = agent
self.base_url = base_url or "http://localhost:8000"
self.test_results: Dict[str, Any] = {}
Implement comprehensive functionality tests:
async def test_basic_functionality(self) -> Dict[str, bool]:
"""Test basic agent functionality."""
tests = {
"agent_responds": False,
"handles_simple_query": False,
"handles_complex_query": False,
"maintains_session": False,
"error_handling": False
}
try:
# Test 1: Basic response
response = await self.agent.arun("Hello, how are you?")
tests["agent_responds"] = bool(response.content)
# Test 2: Simple data query
response = await self.agent.arun("What is data processing?")
tests["handles_simple_query"] = len(response.content) > 50
# Test 3: Complex data query
complex_query = "Analyze the relationship between customer demographics and purchase patterns in e-commerce data"
response = await self.agent.arun(complex_query)
tests["handles_complex_query"] = len(response.content) > 100
# Test 4: Session maintenance
session_id = "test_session_123"
await self.agent.arun("Remember my name is Alice", session_id=session_id)
response = await self.agent.arun("What is my name?", session_id=session_id)
tests["maintains_session"] = "Alice" in response.content or "alice" in response.content.lower()
# Test 5: Error handling
try:
await self.agent.arun("", session_id="test") # Empty query
tests["error_handling"] = True # Should handle gracefully
except Exception:
tests["error_handling"] = True # Expected behavior
except Exception as e:
logging.error(f"Functionality test failed: {e}")
return tests
Test security and authentication features:
async def test_security_features(self) -> Dict[str, bool]:
"""Test security and authentication features."""
tests = {
"authentication_required": False,
"invalid_token_rejected": False,
"rate_limiting_works": False,
"input_validation": False,
"no_sensitive_data_leakage": False
}
if not self.base_url:
return tests # Skip if no API endpoint
try:
import aiohttp
async with aiohttp.ClientSession() as session:
# Test 1: Authentication required
async with session.post(f"{self.base_url}/process-data",
json={"data_query": "test", "pipeline_id": "test"}) as response:
tests["authentication_required"] = response.status == 401
# Test 2: Invalid token rejection
headers = {"Authorization": "Bearer invalid_token"}
async with session.post(f"{self.base_url}/process-data",
json={"data_query": "test", "pipeline_id": "test"},
headers=headers) as response:
tests["invalid_token_rejected"] = response.status == 401
# Test 3: Input validation
async with session.post(f"{self.base_url}/process-data",
json={"invalid_field": "test"}) as response:
tests["input_validation"] = response.status == 422 # Validation error
except Exception as e:
logging.error(f"Security test failed: {e}")
return tests
Performance and scalability testing:
async def test_performance_requirements(self) -> Dict[str, bool]:
"""Test performance and scalability requirements."""
tests = {
"response_time_acceptable": False,
"handles_concurrent_requests": False,
"memory_usage_reasonable": False,
"no_memory_leaks": False
}
try:
# Test 1: Response time
start_time = time.time()
await self.agent.arun("Quick test query")
response_time = time.time() - start_time
tests["response_time_acceptable"] = response_time < 5.0 # 5 second threshold
# Test 2: Concurrent requests
async def concurrent_request():
return await self.agent.arun(f"Test concurrent query {time.time()}")
concurrent_tasks = [concurrent_request() for _ in range(5)]
responses = await asyncio.gather(*concurrent_tasks, return_exceptions=True)
successful_responses = [r for r in responses if not isinstance(r, Exception)]
tests["handles_concurrent_requests"] = len(successful_responses) >= 4 # 80% success rate
# Test 3: Memory usage
import psutil
process = psutil.Process()
memory_before = process.memory_info().rss
# Process several queries
for i in range(10):
await self.agent.arun(f"Memory test query {i}")
memory_after = process.memory_info().rss
memory_increase_mb = (memory_after - memory_before) / (1024 * 1024)
tests["memory_usage_reasonable"] = memory_increase_mb < 100 # Less than 100MB increase
tests["no_memory_leaks"] = memory_increase_mb < 50 # Less than 50MB for no leaks
except Exception as e:
logging.error(f"Performance test failed: {e}")
return tests
Execute complete test suite and generate report:
async def run_complete_assessment(self) -> Dict[str, Any]:
"""Run complete production readiness assessment."""
print("🔍 Running Production Readiness Assessment...")
# Run all test categories
functionality_results = await self.test_basic_functionality()
security_results = await self.test_security_features()
performance_results = await self.test_performance_requirements()
# Combine results
all_results = {
"functionality": functionality_results,
"security": security_results,
"performance": performance_results
}
# Calculate scores
scores = {}
total_passed = 0
total_tests = 0
for category, tests in all_results.items():
passed = sum(1 for result in tests.values() if result)
total = len(tests)
scores[category] = {
"passed": passed,
"total": total,
"percentage": (passed / total) * 100 if total > 0 else 0
}
total_passed += passed
total_tests += total
overall_score = (total_passed / total_tests) * 100 if total_tests > 0 else 0
# Generate report
self._print_assessment_report(all_results, scores, overall_score)
return {
"results": all_results,
"scores": scores,
"overall_score": overall_score,
"production_ready": overall_score >= 85
}
Generate detailed assessment report:
def _print_assessment_report(self, results: Dict[str, Dict[str, bool]],
scores: Dict[str, Dict[str, Any]], overall_score: float):
"""Print detailed assessment report."""
print("
" + "="*60)
print("📊 PRODUCTION READINESS ASSESSMENT REPORT")
print("="*60)
for category, tests in results.items():
category_score = scores[category]
print(f"
🔸 {category.upper()}: {category_score['passed']}/{category_score['total']} ({category_score['percentage']:.1f}%)")
for test_name, passed in tests.items():
status = "✅ PASS" if passed else "❌ FAIL"
print(f" {status} {test_name.replace('_', ' ').title()}")
print(f"
🎯 OVERALL SCORE: {overall_score:.1f}%")
if overall_score >= 85:
print("🎉 PRODUCTION READY! System meets production requirements.")
elif overall_score >= 70:
print("⚠️ NEEDS IMPROVEMENT: Address failing tests before production.")
else:
print("🚫 NOT PRODUCTION READY: Significant issues need resolution.")
print("="*60)
# Usage example
async def run_production_assessment():
# Create test suite
test_suite = ProductionReadinessTestSuite(
agent=data_production_agent,
base_url="http://localhost:8000"
)
# Run complete assessment
results = await test_suite.run_complete_assessment()
# Save results for reporting
with open("production_assessment_report.json", "w") as f:
import json
json.dump(results, f, indent=2)
📝 Multiple Choice Test - Session 8¶
🧭 Navigation¶
Previous: Session 7 - Agent Systems →
Next: Session 9 - Multi-Agent Coordination →