⚙️ Session 4 Advanced: Enterprise Resilience Patterns - Bulletproof Production Systems¶
⚙️ IMPLEMENTER PATH CONTENT — Prerequisites: Complete the 🎯 Observer and 📝 Participant paths. Time Investment: 3-4 hours. Outcome: Master enterprise resilience patterns, including circuit breakers, chaos engineering, and production resilience testing.
Advanced Learning Outcomes¶
After completing this module, you will master:
- Circuit breaker patterns for cascade failure prevention
- Chaos engineering principles for proactive resilience testing
- Blue-green deployment strategies for zero-downtime updates
- Production load testing frameworks for capacity validation
Circuit Breaker Pattern Implementation¶
The Foundation of Resilient Systems¶
Circuit breakers are your first line of defense against cascade failures in production systems. They prevent a failing service from bringing down your entire system by automatically detecting failures and temporarily blocking requests to failing services.
Here's a comprehensive circuit breaker implementation designed for production MCP servers:
# resilience/circuit_breaker.py - Production Circuit Breaker Implementation
import asyncio
import json
import logging
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
# Production logging setup
logger = logging.getLogger(__name__)
class CircuitState(Enum):
    """The three states a circuit breaker can be in."""

    # Normal operation: calls pass through while failures are monitored.
    CLOSED = "closed"
    # The service is considered failing: calls are blocked.
    OPEN = "open"
    # Recovery probe: calls are let through to test the service.
    HALF_OPEN = "half_open"
@dataclass
class CircuitBreakerMetrics:
    """Running counters and timestamps kept by a circuit breaker.

    Every counter starts at zero and both timestamps start as ``None``.
    """

    # Request totals.
    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    timeouts: int = 0

    # Number of state transitions into OPEN / CLOSED.
    circuit_opens: int = 0
    circuit_closes: int = 0

    # When the most recent failure / success was recorded.
    last_failure_time: Optional[datetime] = None
    last_success_time: Optional[datetime] = None

    # Current streak lengths; each streak is reset when the other advances.
    consecutive_failures: int = 0
    consecutive_successes: int = 0
Advanced Circuit Breaker Implementation¶
class CircuitBreakerError(Exception):
    """Base class for errors raised by the circuit breaker itself."""


class CircuitBreakerOpenError(CircuitBreakerError):
    """Raised when a call is rejected because the circuit is OPEN."""


class CircuitBreakerTimeoutError(CircuitBreakerError):
    """Raised when a protected operation exceeds the configured timeout."""


class ProductionCircuitBreaker:
    """
    Production Circuit Breaker: Your Defense Against Cascade Failures

    Wraps an awaitable operation and tracks its successes and failures.
    The breaker moves between CLOSED, OPEN and HALF_OPEN based on
    consecutive failures and on failures inside a sliding time window,
    waits an exponentially growing backoff before probing for recovery,
    can return a fallback response while open, and exports Prometheus
    metrics.
    """

    def __init__(
        self,
        name: str,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        success_threshold: int = 3,
        timeout_duration: float = 10.0,
        monitoring_window: int = 300  # 5 minutes
    ):
        """Configure the breaker and start it in the CLOSED state.

        Args:
            name: Label used in log messages and as the ``circuit_name``
                metric label.
            failure_threshold: Number of failures (consecutive, or within
                the monitoring window) that opens the circuit.
            recovery_timeout: Base number of seconds to wait before a
                recovery attempt; later attempts back off exponentially.
            success_threshold: Successes required in HALF_OPEN before the
                circuit closes again.
            timeout_duration: Per-call timeout in seconds.
            monitoring_window: Length of the sliding failure window, in
                seconds.
        """
        self.name = name
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.timeout_duration = timeout_duration
        self.monitoring_window = monitoring_window

        self.state = CircuitState.CLOSED
        self.metrics = CircuitBreakerMetrics()
        self.last_failure_time: Optional[datetime] = None
        self.failure_count = 0
        self.success_count = 0

        # Timestamps of recent failures, pruned to the monitoring window.
        self.recent_failures: List[datetime] = []

        # Prometheus metrics integration
        self._setup_metrics()
def _setup_metrics(self):
    """Attach the Prometheus collectors used by this breaker.

    Prometheus rejects a second collector registered under an existing
    metric name, so the collectors are created once per class and shared
    by every breaker; individual breakers are told apart by the
    ``circuit_name`` label.
    """
    from prometheus_client import Counter, Gauge

    cls = type(self)
    shared = getattr(cls, "_shared_prometheus_metrics", None)
    if shared is None:
        shared = (
            Counter(
                'circuit_breaker_requests_total',
                'Total requests through circuit breaker',
                ['circuit_name', 'state', 'outcome']
            ),
            Gauge(
                'circuit_breaker_state',
                'Circuit breaker state (0=closed, 1=open, 2=half_open)',
                ['circuit_name']
            ),
            Gauge(
                'circuit_breaker_failure_rate',
                'Current failure rate percentage',
                ['circuit_name']
            ),
        )
        cls._shared_prometheus_metrics = shared

    self.request_counter, self.state_gauge, self.failure_rate = shared

    # Publish the initial CLOSED state so the series exists before the
    # first transition.
    self.state_gauge.labels(circuit_name=self.name).set(0)
Core Circuit Breaker Logic¶
The main call method orchestrates all circuit breaker functionality:
async def call(self, operation: Callable, *args, **kwargs) -> Any:
    """Run ``operation`` under circuit breaker protection.

    The breaker state is refreshed first, then the call is routed to the
    handler for that state. Only the CLOSED and HALF_OPEN handlers receive
    ``*args`` / ``**kwargs``; the OPEN handler gets the operation alone.

    Returns:
        Whatever the selected state handler returns.
    """
    await self._update_state()

    current = self.state
    if current == CircuitState.OPEN:
        return await self._handle_open_circuit(operation)

    handler = (
        self._handle_half_open_circuit
        if current == CircuitState.HALF_OPEN
        else self._handle_closed_circuit
    )
    return await handler(operation, *args, **kwargs)
async def _handle_closed_circuit(self, operation: Callable, *args, **kwargs) -> Any:
"""Handle requests when circuit is closed (normal operation)."""
try:
# Execute operation with timeout protection
result = await asyncio.wait_for(
operation(*args, **kwargs),
timeout=self.timeout_duration
)
# Record successful execution
await self._record_success()
self.request_counter.labels(
circuit_name=self.name,
state='closed',
outcome='success'
).inc()
logger.debug(f"Circuit breaker {self.name}: Successful execution")
return result
except asyncio.TimeoutError:
await self._record_timeout()
raise CircuitBreakerTimeoutError(
f"Operation timed out after {self.timeout_duration}s"
)
except Exception as e:
await self._record_failure(e)
raise
Handle open circuit state with intelligent fallback:
async def _handle_open_circuit(self, operation: Callable) -> Any:
    """Reject a call while the circuit is OPEN.

    The operation is never invoked; it is only handed to
    ``_generate_fallback_response`` so a fallback can be chosen.

    Returns:
        The fallback response, when one is available.

    Raises:
        CircuitBreakerOpenError: If no fallback response is available.
    """
    self.request_counter.labels(
        circuit_name=self.name,
        state='open',
        outcome='blocked'
    ).inc()

    # Stdlib loggers reject arbitrary keyword arguments; structured fields
    # must be passed through ``extra``.
    logger.warning(
        f"Circuit breaker {self.name}: Request blocked - circuit is OPEN",
        extra={
            "failure_count": self.failure_count,
            "last_failure": self.last_failure_time,
        },
    )

    fallback_response = await self._generate_fallback_response(operation)
    if fallback_response is not None:
        return fallback_response

    # No fallback available - raise circuit breaker exception
    raise CircuitBreakerOpenError(
        f"Circuit breaker {self.name} is OPEN. "
        f"Service has failed {self.failure_count} times. "
        f"Next retry in {self._time_until_retry()}s"
    )
async def _handle_half_open_circuit(self, operation: Callable, *args, **kwargs) -> Any:
    """Run the operation as a recovery probe while the circuit is HALF_OPEN.

    A probe that completes within ``self.timeout_duration`` is reported
    through ``_record_recovery_success``; any exception is reported
    through ``_record_recovery_failure`` and then re-raised.
    """
    try:
        logger.info(f"Circuit breaker {self.name}: Testing service recovery")
        probe = operation(*args, **kwargs)
        probe_result = await asyncio.wait_for(probe, timeout=self.timeout_duration)

        await self._record_recovery_success()
        probe_success_series = self.request_counter.labels(
            circuit_name=self.name,
            state='half_open',
            outcome='success'
        )
        probe_success_series.inc()
        return probe_result
    except Exception as probe_error:
        await self._record_recovery_failure(probe_error)
        raise
State Management and Recovery Logic¶
Intelligent state transitions based on failure patterns:
async def _update_state(self):
    """Apply any state transition that is due before a call is handled.

    Old entries are pruned from the sliding failure window first. A CLOSED
    circuit opens when ``_should_open_circuit`` says so; an OPEN circuit
    moves to HALF_OPEN when ``_should_attempt_recovery`` says so. Nothing
    is done here for HALF_OPEN: that state is left by the success and
    failure recording paths.
    """
    self._clean_old_failures()

    if self.state == CircuitState.CLOSED:
        if self._should_open_circuit():
            await self._open_circuit()
    elif self.state == CircuitState.OPEN:
        if self._should_attempt_recovery():
            await self._transition_to_half_open()
def _should_open_circuit(self) -> bool:
"""Determine if circuit should be opened based on failure patterns."""
# Multiple failure detection strategies
# Strategy 1: Consecutive failures
if self.metrics.consecutive_failures >= self.failure_threshold:
logger.warning(
f"Circuit breaker {self.name}: Opening due to consecutive failures",
consecutive_failures=self.metrics.consecutive_failures,
threshold=self.failure_threshold
)
return True
# Strategy 2: Failure rate in sliding window
if len(self.recent_failures) >= self.failure_threshold:
window_start = datetime.now() - timedelta(seconds=self.monitoring_window)
recent_failure_count = sum(
1 for failure_time in self.recent_failures
if failure_time >= window_start
)
if recent_failure_count >= self.failure_threshold:
failure_rate = (recent_failure_count / self.metrics.total_requests) * 100
logger.warning(
f"Circuit breaker {self.name}: Opening due to high failure rate",
failure_rate=f"{failure_rate:.2f}%",
recent_failures=recent_failure_count,
threshold=self.failure_threshold
)
return True
return False
def _should_attempt_recovery(self) -> bool:
"""Check if enough time has passed to attempt recovery."""
if self.last_failure_time is None:
return True
time_since_failure = (datetime.now() - self.last_failure_time).total_seconds()
# Exponential backoff for recovery attempts
backoff_time = self.recovery_timeout * (2 ** min(self.metrics.circuit_opens - 1, 5))
return time_since_failure >= backoff_time
Metrics and State Recording¶
Comprehensive tracking for observability and debugging:
async def _record_success(self):
    """Record one successful operation.

    Updates the request counters, extends the success streak, clears the
    failure streak and stamps the success time. While HALF_OPEN it also
    advances ``success_count`` and closes the circuit once
    ``success_threshold`` is reached.
    """
    stats = self.metrics
    stats.total_requests += 1
    stats.successful_requests += 1
    stats.consecutive_failures = 0
    stats.consecutive_successes += 1
    stats.last_success_time = datetime.now()

    if self.state != CircuitState.HALF_OPEN:
        return

    # Count this success toward recovery.
    self.success_count += 1
    if self.success_count >= self.success_threshold:
        await self._close_circuit()
async def _record_failure(self, exception: Exception):
    """Record one failed operation.

    Updates the request counters and streaks, appends the failure time to
    the sliding window, refreshes the failure-rate gauge, increments the
    request counter for the current state and logs the error.

    Args:
        exception: The exception raised by the operation; only its string
            form is logged.
    """
    current_time = datetime.now()
    metrics = self.metrics

    metrics.failed_requests += 1
    metrics.total_requests += 1
    metrics.consecutive_failures += 1
    metrics.consecutive_successes = 0
    metrics.last_failure_time = current_time

    # Track recent failures for sliding window analysis
    self.recent_failures.append(current_time)

    if metrics.total_requests > 0:
        failure_rate = (metrics.failed_requests / metrics.total_requests) * 100
        self.failure_rate.labels(circuit_name=self.name).set(failure_rate)

    self.request_counter.labels(
        circuit_name=self.name,
        state=self.state.value,
        outcome='failure'
    ).inc()

    # Structured fields go through ``extra``; stdlib loggers raise
    # TypeError on unknown keyword arguments.
    logger.error(
        f"Circuit breaker {self.name}: Operation failed",
        extra={
            "error": str(exception),
            "consecutive_failures": metrics.consecutive_failures,
            "total_failures": metrics.failed_requests,
        },
    )
async def _record_timeout(self):
"""Record timeout as a special type of failure."""
self.metrics.timeouts += 1
await self._record_failure(asyncio.TimeoutError("Operation timeout"))
async def _open_circuit(self):
    """Transition the circuit to OPEN.

    Bumps the open counter, stamps ``last_failure_time`` (the reference
    point for the recovery backoff), sets the state gauge to 1 and logs
    the transition.
    """
    self.state = CircuitState.OPEN
    self.metrics.circuit_opens += 1
    self.last_failure_time = datetime.now()
    self.state_gauge.labels(circuit_name=self.name).set(1)

    failure_rate = (self.metrics.failed_requests / max(self.metrics.total_requests, 1)) * 100
    logger.error(
        f"Circuit breaker {self.name}: Circuit OPENED",
        extra={
            "total_failures": self.metrics.failed_requests,
            "failure_rate": f"{failure_rate:.2f}%",
        },
    )
async def _close_circuit(self):
    """Transition the circuit to CLOSED after a successful recovery.

    Resets the failure and success counters, sets the state gauge to 0 and
    logs the transition. The failure history is cleared as well:
    ``_should_open_circuit`` reads both the consecutive-failure streak and
    the sliding window, so stale entries would re-open the circuit on the
    very next call.
    """
    self.state = CircuitState.CLOSED
    self.metrics.circuit_closes += 1
    self.failure_count = 0
    self.success_count = 0

    # Start the closed circuit with a clean failure history.
    self.metrics.consecutive_failures = 0
    self.recent_failures = []

    self.state_gauge.labels(circuit_name=self.name).set(0)

    logger.info(
        f"Circuit breaker {self.name}: Circuit CLOSED - Service recovered",
        extra={"recovery_successes": self.metrics.consecutive_successes},
    )
async def _transition_to_half_open(self):
    """Transition the circuit to HALF_OPEN to begin probing for recovery.

    Resets the recovery success counter, sets the state gauge to 2 and
    logs the transition.
    """
    self.state = CircuitState.HALF_OPEN
    self.success_count = 0
    self.state_gauge.labels(circuit_name=self.name).set(2)

    logger.info(
        f"Circuit breaker {self.name}: Circuit HALF_OPEN - Testing recovery",
        extra={"time_since_failure": self._time_since_last_failure()},
    )
Fallback Response Generation¶
Intelligent fallback responses for different operation types:
async def _generate_fallback_response(self, operation: Callable) -> Optional[Any]:
"""Generate intelligent fallback responses based on operation type."""
# Check if operation has custom fallback
if hasattr(operation, '__circuit_breaker_fallback__'):
try:
return await operation.__circuit_breaker_fallback__()
except Exception as e:
logger.warning(f"Fallback function failed: {e}")
# Generate default fallbacks based on operation name/type
operation_name = getattr(operation, '__name__', 'unknown')
if 'health' in operation_name.lower():
return {
"status": "degraded",
"message": "Service temporarily unavailable",
"circuit_breaker": "open",
"timestamp": datetime.now().isoformat()
}
elif 'process' in operation_name.lower() or 'data' in operation_name.lower():
return {
"error": "Service temporarily unavailable",
"fallback": True,
"retry_after": self._time_until_retry(),
"circuit_breaker_status": self.state.value
}
# No suitable fallback
return None
def _time_until_retry(self) -> int:
"""Calculate seconds until next retry attempt."""
if self.last_failure_time is None:
return 0
time_since_failure = (datetime.now() - self.last_failure_time).total_seconds()
backoff_time = self.recovery_timeout * (2 ** min(self.metrics.circuit_opens - 1, 5))
return max(0, int(backoff_time - time_since_failure))
def _clean_old_failures(self):
"""Remove failures outside the monitoring window."""
cutoff_time = datetime.now() - timedelta(seconds=self.monitoring_window)
self.recent_failures = [
failure_time for failure_time in self.recent_failures
if failure_time >= cutoff_time
]
Chaos Engineering for Production Resilience¶
Proactive Resilience Testing¶
Chaos engineering is the practice of intentionally introducing failures into your production system to identify weaknesses before they cause actual outages. Here's how to implement safe, controlled chaos testing:
# chaos/chaos_engineer.py - Production Chaos Engineering Framework
import asyncio
import random
import logging
from typing import List, Dict, Callable, Any
from datetime import datetime, timedelta
from dataclasses import dataclass
from enum import Enum
logger = logging.getLogger(__name__)
class ChaosExperimentType(Enum):
    """Kinds of failure a chaos experiment can inject."""

    NETWORK_LATENCY = "network_latency"
    SERVICE_FAILURE = "service_failure"
    RESOURCE_EXHAUSTION = "resource_exhaustion"
    DEPENDENCY_FAILURE = "dependency_failure"
@dataclass
class ChaosExperiment:
    """Describes one chaos engineering experiment."""

    # Identification and targeting.
    name: str
    experiment_type: ChaosExperimentType
    target_service: str

    # How long and how hard to inject the failure.
    duration_seconds: int
    intensity: float  # 0.0 to 1.0

    # Free-form settings for the experiment.
    conditions: Dict[str, Any]

    # Callables attached to the experiment for safety gating and for
    # undoing its effects.
    safety_checks: List[Callable]
    rollback_plan: Callable
class ProductionChaosEngineer:
    """
    Production Chaos Engineering: Controlled Failure Introduction

    Runs chaos experiments against a system, gated by pre-experiment
    safety checks, watched by a monitoring task while they run, and
    followed by the experiment's rollback plan. Results of finished
    experiments are kept in ``experiment_results``.
    """

    def __init__(
        self,
        monitoring_system,
        circuit_breakers: "Dict[str, ProductionCircuitBreaker]",
    ):
        """Store the collaborators and start with no experiments.

        Args:
            monitoring_system: Monitoring backend; stored as-is.
            circuit_breakers: Circuit breakers keyed by service name. The
                annotation is a string forward reference because the
                breaker class is not imported by this module.
        """
        self.monitoring_system = monitoring_system
        self.circuit_breakers = circuit_breakers
        self.active_experiments: List[ChaosExperiment] = []
        self.experiment_results: List[Dict] = []
async def run_experiment(self, experiment: ChaosExperiment) -> Dict[str, Any]:
    """Execute a controlled chaos engineering experiment.

    Flow: safety checks, baseline metrics, then the experiment and its
    monitor run concurrently until either finishes. The other task is
    cancelled, final metrics are collected and a result is recorded.

    Args:
        experiment: The experiment definition to run.

    Returns:
        A result dict whose ``status`` is ``"aborted"`` (safety checks
        failed), ``"completed"`` or ``"failed"``.

    Note:
        Once the experiment has started, ``experiment.rollback_plan()`` is
        always awaited on the way out, including after the emergency
        rollback on the failure path.
    """
    start_time = datetime.now()
    # Derive the id from start_time instead of the `time` module, which
    # this module never imports.
    experiment_id = f"{experiment.name}_{int(start_time.timestamp())}"

    # Stdlib loggers reject arbitrary keyword arguments; structured fields
    # go through ``extra``.
    logger.info(
        f"Starting chaos experiment: {experiment.name}",
        extra={
            "experiment_id": experiment_id,
            "target": experiment.target_service,
            "duration": experiment.duration_seconds,
        },
    )

    # Pre-experiment safety checks
    if not await self._safety_checks_pass(experiment):
        logger.error(f"Safety checks failed for experiment {experiment.name}")
        return {"status": "aborted", "reason": "safety_checks_failed"}

    # Record baseline metrics
    baseline_metrics = await self._collect_baseline_metrics(experiment)

    try:
        self.active_experiments.append(experiment)
        experiment_task = asyncio.create_task(
            self._execute_experiment(experiment, experiment_id)
        )
        monitoring_task = asyncio.create_task(
            self._monitor_experiment(experiment, experiment_id)
        )

        # Wait for experiment completion or early termination
        done, pending = await asyncio.wait(
            [experiment_task, monitoring_task],
            return_when=asyncio.FIRST_COMPLETED
        )

        # Cancel whatever is still running and let it unwind, so its
        # cleanup has finished before final metrics are collected.
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)

        # asyncio.wait() does not re-raise task exceptions; retrieve the
        # results so a crashed task is reported as a failure.
        for task in done:
            task.result()

        end_time = datetime.now()
        final_metrics = await self._collect_final_metrics(experiment)
        result = {
            "experiment_id": experiment_id,
            "name": experiment.name,
            "status": "completed",
            "start_time": start_time.isoformat(),
            "end_time": end_time.isoformat(),
            "duration": (end_time - start_time).total_seconds(),
            "baseline_metrics": baseline_metrics,
            "final_metrics": final_metrics,
            "impact_analysis": self._analyze_impact(baseline_metrics, final_metrics)
        }
        self.experiment_results.append(result)
        return result
    except Exception as e:
        logger.error(
            f"Experiment {experiment.name} failed",
            extra={"error": str(e)},
        )
        await self._emergency_rollback(experiment)
        return {
            "experiment_id": experiment_id,
            "status": "failed",
            "error": str(e),
            "duration": (datetime.now() - start_time).total_seconds()
        }
    finally:
        # Cleanup
        if experiment in self.active_experiments:
            self.active_experiments.remove(experiment)
        # Execute rollback plan
        await experiment.rollback_plan()
Specific Chaos Experiments¶
Implementation of different types of chaos experiments:
async def _execute_network_latency_experiment(
    self,
    experiment: ChaosExperiment,
    experiment_id: str
):
    """Introduce network latency to test timeout handling.

    ``aiohttp.ClientSession._request`` is replaced for the duration of the
    experiment with a wrapper that sleeps a random time, up to
    ``experiment.intensity * 5.0`` seconds, before each request.

    The original method is restored in a ``finally`` block, so the patch
    is removed even if this coroutine is cancelled or fails mid-run.
    """
    # Imported locally because this module's imports do not include aiohttp.
    import aiohttp

    target_delay = experiment.intensity * 5.0  # Max 5 second delay
    original_aiohttp_request = aiohttp.ClientSession._request

    async def delayed_request(session, method, url, **kwargs):
        # Add random latency, then delegate to the real implementation.
        await asyncio.sleep(random.uniform(0, target_delay))
        return await original_aiohttp_request(session, method, url, **kwargs)

    # Apply the chaos
    aiohttp.ClientSession._request = delayed_request
    try:
        logger.info(
            f"Network latency experiment active",
            extra={
                "experiment_id": experiment_id,
                "max_delay": target_delay,
            },
        )
        # Run for specified duration
        await asyncio.sleep(experiment.duration_seconds)
    finally:
        # Restore original behavior
        aiohttp.ClientSession._request = original_aiohttp_request

    logger.info(
        f"Network latency experiment completed",
        extra={"experiment_id": experiment_id},
    )
async def _execute_service_failure_experiment(
    self,
    experiment: ChaosExperiment,
    experiment_id: str
):
    """Simulate service failures on one circuit breaker's ``call``.

    For the duration of the experiment, the target breaker's ``call`` is
    replaced with a wrapper that raises with probability
    ``experiment.intensity`` and otherwise delegates to the original.
    Nothing is done if no breaker is registered for the target service.

    The original ``call`` is restored in a ``finally`` block, so the patch
    is removed even if this coroutine is cancelled or fails mid-run.

    NOTE(review): injected failures are raised before the breaker's own
    ``call`` runs, so the breaker does not record them. Confirm that is
    the intended behaviour.
    """
    failure_rate = experiment.intensity  # 0.0 to 1.0
    target_service = experiment.target_service

    if target_service not in self.circuit_breakers:
        logger.error(f"No circuit breaker found for {target_service}")
        return

    circuit_breaker = self.circuit_breakers[target_service]
    original_call = circuit_breaker.call

    async def failing_call(operation, *args, **kwargs):
        if random.random() < failure_rate:
            logger.debug(f"Chaos: Injecting failure in {target_service}")
            raise Exception(f"Chaos experiment failure: {experiment_id}")
        return await original_call(operation, *args, **kwargs)

    circuit_breaker.call = failing_call
    try:
        logger.info(
            f"Service failure experiment active",
            extra={
                "experiment_id": experiment_id,
                "failure_rate": f"{failure_rate * 100:.1f}%",
            },
        )
        await asyncio.sleep(experiment.duration_seconds)
    finally:
        # Restore original behavior
        circuit_breaker.call = original_call

    logger.info(
        f"Service failure experiment completed",
        extra={"experiment_id": experiment_id},
    )
Blue-Green Deployment Strategy¶
Zero-Downtime Production Updates¶
Blue-green deployment is a technique that reduces downtime and risk by running two identical production environments called Blue and Green:
# deployment/blue_green.py - Zero-Downtime Deployment System
import asyncio
import logging
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
import aiohttp
import time
logger = logging.getLogger(__name__)
class DeploymentEnvironment(Enum):
    """The two interchangeable production environments."""

    BLUE = "blue"
    GREEN = "green"
class HealthStatus(Enum):
    """Health classification of a deployment environment."""

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
@dataclass
class EnvironmentHealth:
    """Result of a single health probe against one environment."""

    # Which environment was probed and how it was classified.
    environment: DeploymentEnvironment
    status: HealthStatus

    # Measurements taken during the probe.
    response_time: float
    error_rate: float

    # When the probe ran, plus the raw payload or error information.
    last_check: datetime
    details: Dict[str, Any]
class BlueGreenDeploymentManager:
    """
    Blue-Green Deployment: Zero-Downtime Production Updates

    Coordinates deployments across a blue and a green environment:
    deploys to the inactive one, validates its health, shifts traffic to
    it in stages and rolls back when a deployment fails.
    """

    def __init__(
        self,
        blue_endpoint: str,
        green_endpoint: str,
        load_balancer_api: str,
        health_check_path: str = "/health"
    ):
        """Remember the endpoints and start with BLUE as the active side.

        Args:
            blue_endpoint: Base URL of the blue environment.
            green_endpoint: Base URL of the green environment.
            load_balancer_api: Address of the load balancer API.
            health_check_path: Path appended to an environment's endpoint
                when probing its health.
        """
        # Deployment bookkeeping.
        self.deployment_in_progress = False
        self.current_active = DeploymentEnvironment.BLUE

        # Where things live.
        self.health_check_path = health_check_path
        self.load_balancer_api = load_balancer_api
        self.green_endpoint = green_endpoint
        self.blue_endpoint = blue_endpoint
async def deploy_new_version(
    self,
    version: str,
    deployment_artifacts: Dict[str, Any]
) -> Dict[str, Any]:
    """Deploy a new version using the blue-green strategy.

    Phases: deploy to the inactive environment, validate its health,
    shift traffic gradually, validate again, then switch the active
    environment and clean up the old one.

    Args:
        version: Version identifier being deployed.
        deployment_artifacts: Artifacts handed to the environment deploy
            step.

    Returns:
        A dict with ``status`` ``"success"`` or ``"failed"``. Failures
        inside the phases are caught, followed by an emergency rollback,
        and reported in the returned dict rather than raised.

    Raises:
        RuntimeError: If another deployment is already in progress.
    """
    if self.deployment_in_progress:
        raise RuntimeError("Deployment already in progress")

    self.deployment_in_progress = True
    deployment_start = time.time()

    # The target is whichever environment is not currently active.
    target_env = (DeploymentEnvironment.GREEN
                  if self.current_active == DeploymentEnvironment.BLUE
                  else DeploymentEnvironment.BLUE)

    # Stdlib loggers reject arbitrary keyword arguments; structured fields
    # go through ``extra``.
    logger.info(
        f"Starting blue-green deployment",
        extra={
            "version": version,
            "current_active": self.current_active.value,
            "target_environment": target_env.value,
        },
    )

    try:
        # Phase 1: Deploy to target environment
        await self._deploy_to_environment(target_env, version, deployment_artifacts)

        # Phase 2: Health validation
        health_check_passed = await self._validate_environment_health(
            target_env,
            required_checks=5,
            timeout_minutes=5
        )
        if not health_check_passed:
            raise RuntimeError(f"Health checks failed for {target_env.value} environment")

        # Phase 3: Gradual traffic shift
        await self._perform_gradual_traffic_shift(target_env)

        # Phase 4: Final validation
        final_health = await self._validate_environment_health(
            target_env,
            required_checks=3,
            timeout_minutes=2
        )
        if not final_health:
            # Rollback immediately
            await self._rollback_traffic()
            raise RuntimeError("Final health check failed - deployment rolled back")

        # Success - update active environment
        old_active = self.current_active
        self.current_active = target_env

        # Phase 5: Cleanup old environment
        await self._cleanup_old_environment(old_active)

        result = {
            "status": "success",
            "version": version,
            "old_environment": old_active.value,
            "new_environment": target_env.value,
            "deployment_time": time.time() - deployment_start,
            "rollback_available": True
        }
        logger.info(
            "Blue-green deployment completed successfully",
            extra=result,
        )
        return result
    except Exception as e:
        logger.error(f"Deployment failed: {e}")
        # Emergency rollback
        await self._emergency_rollback(target_env)
        return {
            "status": "failed",
            "error": str(e),
            "deployment_time": time.time() - deployment_start,
            "rollback_completed": True
        }
    finally:
        self.deployment_in_progress = False
Traffic Shifting and Health Validation¶
The core logic for safe traffic transitions:
async def _perform_gradual_traffic_shift(self, target_env: DeploymentEnvironment) -> None:
    """Shift traffic to ``target_env`` in stages, checking health after each.

    Traffic moves through 10, 25, 50, 75 and 100 percent. After each
    change the method waits 30 seconds and probes the target environment.

    Raises:
        Exception: If the target is not HEALTHY after a stage; traffic is
            rolled back via ``_rollback_traffic`` before raising.
    """
    shift_stages = [10, 25, 50, 75, 100]

    for percentage in shift_stages:
        logger.info(f"Shifting {percentage}% traffic to {target_env.value}")

        # Update load balancer configuration
        await self._update_load_balancer_weights(target_env, percentage)

        # Let traffic stabilize before judging health.
        await asyncio.sleep(30)

        health = await self._check_environment_health(target_env)
        if health.status != HealthStatus.HEALTHY:
            # Stdlib loggers reject arbitrary keyword arguments;
            # structured fields go through ``extra``.
            logger.error(
                f"Health degraded at {percentage}% traffic",
                extra={
                    "error_rate": health.error_rate,
                    "response_time": health.response_time,
                },
            )
            # Undo the traffic shift before surfacing the failure.
            await self._rollback_traffic()
            raise Exception(f"Health degraded during traffic shift at {percentage}%")

        logger.info(
            f"Traffic shift to {percentage}% successful",
            extra={
                "response_time": health.response_time,
                "error_rate": health.error_rate,
            },
        )
async def _validate_environment_health(
    self,
    environment: DeploymentEnvironment,
    required_checks: int,
    timeout_minutes: int
) -> bool:
    """Require ``required_checks`` consecutive healthy probes in the time budget.

    A non-healthy probe, or an exception during a probe, resets the streak
    to zero. Probes are spaced 10 seconds apart.

    Args:
        environment: Environment to probe.
        required_checks: Number of consecutive HEALTHY results needed.
        timeout_minutes: Overall time budget in minutes.

    Returns:
        True once the streak is reached, False if the budget runs out.
    """
    # A monotonic clock keeps the budget immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout_minutes * 60
    successful_checks = 0

    while successful_checks < required_checks:
        if time.monotonic() > deadline:
            logger.error(
                f"Health validation timeout for {environment.value}",
                extra={
                    "successful_checks": successful_checks,
                    "required_checks": required_checks,
                },
            )
            return False

        try:
            health = await self._check_environment_health(environment)
        except Exception as e:
            successful_checks = 0
            logger.error(f"Health check error: {e}")
        else:
            if health.status == HealthStatus.HEALTHY:
                successful_checks += 1
                logger.info(
                    f"Health check {successful_checks}/{required_checks} passed",
                    extra={
                        "environment": environment.value,
                        "response_time": health.response_time,
                    },
                )
            else:
                # Reset counter on failure
                successful_checks = 0
                logger.warning(
                    f"Health check failed for {environment.value}",
                    extra={
                        "status": health.status.value,
                        "error_rate": health.error_rate,
                    },
                )

        # Only wait when another probe is still needed.
        if successful_checks < required_checks:
            await asyncio.sleep(10)

    return True
async def _check_environment_health(self, environment: DeploymentEnvironment) -> EnvironmentHealth:
    """Probe one environment's health endpoint once.

    Sends a GET to ``<endpoint><health_check_path>`` with a 10 second
    total timeout.

    Returns:
        An ``EnvironmentHealth`` whose status is:

        - UNHEALTHY for a non-200 response or any exception (``details``
          holds the HTTP status or the error text),
        - DEGRADED or UNHEALTHY when a 200 response body reports that
          status,
        - HEALTHY for any other 200 response.
    """
    endpoint = (self.blue_endpoint if environment == DeploymentEnvironment.BLUE
                else self.green_endpoint)
    start_time = time.time()

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{endpoint}{self.health_check_path}",
                timeout=aiohttp.ClientTimeout(total=10)
            ) as response:
                response_time = time.time() - start_time

                if response.status != 200:
                    return EnvironmentHealth(
                        environment=environment,
                        status=HealthStatus.UNHEALTHY,
                        response_time=response_time,
                        error_rate=1.0,
                        last_check=datetime.now(),
                        details={"http_status": response.status}
                    )

                health_data = await response.json()

                # Honour the status the service reports about itself; a
                # 200 response that says "unhealthy" must not pass as
                # healthy.
                reported = health_data.get("status")
                if reported == "degraded":
                    status = HealthStatus.DEGRADED
                elif reported == "unhealthy":
                    status = HealthStatus.UNHEALTHY
                else:
                    status = HealthStatus.HEALTHY

                return EnvironmentHealth(
                    environment=environment,
                    status=status,
                    response_time=response_time,
                    error_rate=0.0,  # Placeholder: not derived from metrics here
                    last_check=datetime.now(),
                    details=health_data
                )
    except Exception as e:
        return EnvironmentHealth(
            environment=environment,
            status=HealthStatus.UNHEALTHY,
            response_time=time.time() - start_time,
            error_rate=1.0,
            last_check=datetime.now(),
            details={"error": str(e)}
        )
Production Load Testing Framework¶
Capacity Validation and SLA Testing¶
Production systems need to be validated under realistic load conditions. Here's a comprehensive load testing framework:
# testing/load_tester.py - Production Load Testing Framework
import asyncio
import aiohttp
import time
import statistics
from typing import List, Dict, Callable, Optional
from dataclasses import dataclass, field
from datetime import datetime
import json
import logging
logger = logging.getLogger(__name__)
@dataclass
class LoadTestResult:
    """Summary of one load test run."""

    # Identification and wall-clock length of the run.
    test_name: str
    duration_seconds: float

    # Request counts by outcome.
    total_requests: int
    successful_requests: int
    failed_requests: int
    timeout_requests: int

    # Achieved request rate.
    requests_per_second: float

    # Latency summary, in seconds.
    average_response_time: float
    p50_response_time: float
    p95_response_time: float
    p99_response_time: float

    # Fraction of requests that failed or timed out.
    error_rate: float

    # Data volume received per second.
    throughput_mbps: float

    # Samples retained for later analysis.
    response_times: List[float] = field(default_factory=list)
    errors: List[Dict] = field(default_factory=list)
class ProductionLoadTester:
    """
    Production Load Testing: SLA Validation and Capacity Planning

    Drives HTTP load against a target endpoint, collects latency and
    error metrics, checks them against SLA requirements and keeps a
    history of results.
    """

    def __init__(
        self,
        target_endpoint: str,
        max_concurrent_requests: int = 100,
        test_duration_minutes: int = 10
    ):
        """Configure the tester.

        Args:
            target_endpoint: Base URL that request paths are appended to.
            max_concurrent_requests: Upper bound on in-flight requests.
            test_duration_minutes: Length of each test run; stored
                internally in seconds.
        """
        self.results_history: List[LoadTestResult] = []
        self.test_duration_seconds = 60 * test_duration_minutes
        self.max_concurrent_requests = max_concurrent_requests
        self.target_endpoint = target_endpoint
async def run_load_test(
    self,
    test_name: str,
    request_generator: Callable,
    target_rps: int,
    sla_requirements: Dict[str, float]
) -> LoadTestResult:
    """Execute a load test and validate the outcome against SLA limits.

    Requests are launched at most once per ``1 / target_rps`` seconds
    until the configured duration elapses, with in-flight requests capped
    by ``max_concurrent_requests``.

    Args:
        test_name: Label for logs and the result.
        request_generator: Async callable returning
            ``(method, url, headers, data)`` for each request.
        target_rps: Desired request rate; must be positive.
        sla_requirements: Limits passed to ``_validate_sla``.

    Returns:
        The ``LoadTestResult``, also appended to ``results_history``.

    Raises:
        ValueError: If ``target_rps`` is not positive.
    """
    if target_rps <= 0:
        raise ValueError("target_rps must be positive")

    # Stdlib loggers reject arbitrary keyword arguments; structured fields
    # go through ``extra``.
    logger.info(
        f"Starting load test: {test_name}",
        extra={
            "target_endpoint": self.target_endpoint,
            "target_rps": target_rps,
            "duration": f"{self.test_duration_seconds}s",
        },
    )

    # Metrics accumulated by the request coroutines.
    response_times = []
    errors = []
    successful_requests = 0
    failed_requests = 0
    timeout_requests = 0
    bytes_transferred = 0

    start_time = time.time()
    end_time = start_time + self.test_duration_seconds

    # Rate limiting setup
    request_interval = 1.0 / target_rps
    last_request_time = start_time

    # Caps the number of requests in flight at once.
    semaphore = asyncio.Semaphore(self.max_concurrent_requests)

    async def execute_request() -> Dict:
        """Send one request and fold its outcome into the shared metrics."""
        nonlocal successful_requests, failed_requests, timeout_requests, bytes_transferred

        async with semaphore:
            request_start = time.time()
            try:
                method, url, headers, data = await request_generator()

                async with aiohttp.ClientSession() as session:
                    async with session.request(
                        method=method,
                        url=f"{self.target_endpoint}{url}",
                        headers=headers,
                        json=data,
                        timeout=aiohttp.ClientTimeout(total=30)
                    ) as response:
                        response_body = await response.read()
                        response_time = time.time() - request_start
                        response_times.append(response_time)
                        bytes_transferred += len(response_body)

                        if 200 <= response.status < 400:
                            successful_requests += 1
                        else:
                            failed_requests += 1
                            errors.append({
                                "timestamp": datetime.now().isoformat(),
                                "status_code": response.status,
                                "response_time": response_time,
                                "error": f"HTTP {response.status}"
                            })

                        return {
                            "status": "success",
                            "response_time": response_time,
                            "status_code": response.status
                        }
            except asyncio.TimeoutError:
                timeout_requests += 1
                response_time = time.time() - request_start
                response_times.append(response_time)
                errors.append({
                    "timestamp": datetime.now().isoformat(),
                    "error": "Request timeout",
                    "response_time": response_time
                })
                return {"status": "timeout", "response_time": response_time}
            except Exception as e:
                failed_requests += 1
                response_time = time.time() - request_start
                response_times.append(response_time)
                errors.append({
                    "timestamp": datetime.now().isoformat(),
                    "error": str(e),
                    "response_time": response_time
                })
                return {"status": "error", "error": str(e), "response_time": response_time}

    # Execute load test
    tasks = []
    while True:
        current_time = time.time()
        if current_time >= end_time:
            break

        # Rate limiting: launch at most one request per interval.
        if current_time - last_request_time >= request_interval:
            tasks.append(asyncio.create_task(execute_request()))
            last_request_time = current_time

        # Drop finished tasks once the list grows to the concurrency cap.
        if len(tasks) >= self.max_concurrent_requests:
            tasks = [task for task in tasks if not task.done()]

        # Always yield to the event loop, otherwise the launched requests
        # never get to run. Sleep until the next request is due or the
        # test ends, whichever comes first.
        next_due = min(last_request_time + request_interval, end_time)
        await asyncio.sleep(max(0.0, next_due - time.time()))

    # Wait for remaining tasks
    if tasks:
        await asyncio.gather(*tasks, return_exceptions=True)

    # Calculate final metrics
    total_time = time.time() - start_time
    total_requests = successful_requests + failed_requests + timeout_requests

    if response_times:
        response_times.sort()
        sample_count = len(response_times)
        avg_response_time = statistics.mean(response_times)
        p50 = response_times[int(sample_count * 0.5)]
        p95 = response_times[int(sample_count * 0.95)]
        p99 = response_times[int(sample_count * 0.99)]
    else:
        avg_response_time = p50 = p95 = p99 = 0.0

    # NOTE(review): throughput_mbps is computed as MiB per second, not
    # megabits. Also, response_times is sorted by this point, so the
    # retained sample is the 1000 fastest requests. Confirm both are
    # intended.
    result = LoadTestResult(
        test_name=test_name,
        duration_seconds=total_time,
        total_requests=total_requests,
        successful_requests=successful_requests,
        failed_requests=failed_requests,
        timeout_requests=timeout_requests,
        requests_per_second=total_requests / total_time if total_time > 0 else 0,
        average_response_time=avg_response_time,
        p50_response_time=p50,
        p95_response_time=p95,
        p99_response_time=p99,
        error_rate=(failed_requests + timeout_requests) / max(total_requests, 1),
        throughput_mbps=(bytes_transferred / (1024 * 1024)) / max(total_time, 1),
        response_times=response_times[:1000],  # Keep sample for analysis
        errors=errors[:100]  # Keep sample errors
    )

    # SLA validation
    sla_violations = self._validate_sla(result, sla_requirements)

    logger.info(
        f"Load test completed: {test_name}",
        extra={
            "rps": f"{result.requests_per_second:.2f}",
            "avg_response_time": f"{result.average_response_time:.3f}s",
            "p95_response_time": f"{result.p95_response_time:.3f}s",
            "error_rate": f"{result.error_rate * 100:.2f}%",
            "sla_violations": len(sla_violations),
        },
    )

    if sla_violations:
        logger.error(
            f"SLA violations detected in {test_name}",
            extra={"violations": sla_violations},
        )

    self.results_history.append(result)
    return result
def _validate_sla(self, result: LoadTestResult, requirements: Dict[str, float]) -> List[str]:
"""Validate test results against SLA requirements."""
violations = []
if "max_response_time_p95" in requirements:
if result.p95_response_time > requirements["max_response_time_p95"]:
violations.append(
f"P95 response time {result.p95_response_time:.3f}s exceeds "
f"SLA requirement {requirements['max_response_time_p95']:.3f}s"
)
if "max_error_rate" in requirements:
if result.error_rate > requirements["max_error_rate"]:
violations.append(
f"Error rate {result.error_rate * 100:.2f}% exceeds "
f"SLA requirement {requirements['max_error_rate'] * 100:.2f}%"
)
if "min_throughput_rps" in requirements:
if result.requests_per_second < requirements["min_throughput_rps"]:
violations.append(
f"Throughput {result.requests_per_second:.2f} RPS below "
f"SLA requirement {requirements['min_throughput_rps']:.2f} RPS"
)
return violations
# Example usage for MCP server load testing
async def mcp_request_generator():
    """Generate one MCP server request for load testing.

    Returns:
        A ``(method, path, headers, json_body)`` tuple picked at random
        from a fixed mix: two JSON-RPC POSTs to ``/mcp`` (``tools/list``
        and ``tools/call``) and GETs to ``/health`` and ``/metrics``.
    """
    # Imported locally because this module's imports do not include random.
    import random

    json_headers = {"Content-Type": "application/json"}
    request_types = [
        ("POST", "/mcp", json_headers, {
            "jsonrpc": "2.0",
            "method": "tools/list",
            "id": 1
        }),
        ("POST", "/mcp", json_headers, {
            "jsonrpc": "2.0",
            "method": "tools/call",
            "params": {
                "name": "process_data",
                "arguments": {"data": {"test": "data"}, "operation": "transform"}
            },
            "id": 2
        }),
        ("GET", "/health", {}, None),
        ("GET", "/metrics", {}, None)
    ]
    return random.choice(request_types)
This comprehensive enterprise resilience framework provides production-ready patterns for building bulletproof MCP servers that can withstand real-world failures and scale to meet enterprise demands. The combination of circuit breakers, chaos engineering, blue-green deployments, and comprehensive load testing ensures your production systems are truly resilient.
🧭 Navigation¶
← Previous: Session 3 - Advanced Patterns
Next: Session 5 - Type-Safe Development →