Session 4: Production MCP Deployment - Enterprise-Grade Infrastructure¶
Learning Outcomes¶
By the end of this session, you will be able to:
- Deploy production-ready MCP servers using containerization and cloud platforms
- Implement comprehensive monitoring, health checks, and observability systems
- Configure auto-scaling, load balancing, and high availability architectures
- Set up CI/CD pipelines for automated deployment and testing
- Apply production best practices including caching, logging, and error handling
Chapter Overview¶
What You'll Learn: Enterprise MCP Infrastructure¶
In this session, we'll transform our MCP servers from development prototypes into production-ready services capable of handling enterprise-scale traffic and operational demands. This involves implementing the infrastructure patterns that power real-world AI systems at major companies.
Why This Matters: Production MCP at Scale (2024-2025)¶
Based on industry research, production MCP deployment has become critical for enterprise AI:
- Containerization Standard: Docker-based MCP servers show 60% reduction in deployment-related support tickets and enable near-instant onboarding
- Operational Excellence: Organizations employing advanced container orchestration report major efficiency gains in 80% of cases
- Developer Adoption: Well-documented MCP servers see 2x higher developer adoption rates in enterprise environments
- Mean Time to Resolution: Verbose logging and monitoring can slash debugging time (MTTR) by up to 40%
- Security Requirements: Enterprise MCP deployments require sandboxing, OAuth 2.0, and multi-layer security architectures
How Production MCP Stands Out: Enterprise Infrastructure Patterns¶
Production MCP deployment differs significantly from development environments:
- Transport Evolution: Support for both legacy HTTP+SSE and modern Streamable HTTP transports for maximum compatibility
- Cloud-Native Architecture: AWS guidance includes private subnets, security groups, and automated health checks
- Multi-AZ Resilience: Deployment across availability zones (and optionally regions) with automatic failover and no single points of failure
- Observability Integration: CloudWatch, Prometheus, and distributed tracing for complete system visibility
Where You'll Apply This: Enterprise Use Cases¶
Production MCP infrastructure supports:
- High-Availability AI Services: 99.9% uptime requirements for customer-facing AI applications
- Auto-Scaling Workloads: Dynamic scaling based on ML-driven load balancing and demand prediction
- Multi-Tenant Systems: Secure isolation for enterprise customers with dedicated resources
- Compliance Requirements: SOC 2, ISO 27001, and GDPR-compliant deployments with audit trails
Figure 1: Enterprise MCP deployment architecture showing containerization, multi-AZ deployment, monitoring stack, and security layers working together to provide scalable, reliable AI infrastructure
Learning Path Options¶
Observer Path (35 minutes): Understand production deployment concepts and architecture patterns
- Focus: Quick insights into containerization, monitoring, and cloud deployment strategies
- Best for: Getting oriented with enterprise infrastructure concepts

🙋‍♂️ Participant Path (65 minutes): Implement a working production deployment pipeline
- Focus: Hands-on containerization, monitoring setup, and cloud deployment
- Best for: Building practical production deployment skills

🛠️ Implementer Path (100 minutes): Advanced enterprise architecture and multi-region deployment
- Focus: High-availability patterns, advanced monitoring, and enterprise security
- Best for: Production infrastructure architecture and DevOps expertise
Part 1: Production Infrastructure Fundamentals (Observer: 10 min | Participant: 25 min)¶
The Production Transformation Challenge¶
Transforming development MCP servers into production-ready services requires addressing enterprise-grade operational concerns that don't exist in development environments.
Enterprise Production Requirements:
1. Observability: Complete system visibility through metrics, logs, traces, and health checks
2. Scalability: Automatic load handling with predictive scaling and resource optimization
3. Reliability: Fault tolerance with circuit breakers, retries, and graceful degradation
4. Security: Multi-layer authentication, authorization, and threat protection
5. Performance: Caching strategies, connection pooling, and resource efficiency
6. Compliance: Audit trails, data protection, and regulatory requirements
Industry Impact (2024-2025 Data): According to enterprise deployment studies, containerized MCP servers eliminate "it works on my machine" phenomena and reduce deployment-related issues by 60%, while advanced orchestration provides major efficiency gains in 80% of organizations.
OBSERVER PATH: Production Architecture Patterns¶
Key Production Concepts:
- Containerization Benefits: Docker encapsulation ensures consistent environments from development through production
- Cloud-Native Deployment: AWS/GCP patterns with private subnets and security groups
- Observability Stack: Prometheus metrics, structured logging, and distributed tracing
- High Availability: Multi-AZ deployment with automated health checks and failover
PARTICIPANT PATH: Building Production-Ready Infrastructure¶
Step 1.1.1: Production Server Foundation
Implement enterprise-grade server architecture with observability:
# src/production_mcp_server.py - Enterprise MCP Server
import os
import json
import hashlib  # Deterministic cache keys; built-in hash() is randomized per process
import structlog # Enterprise structured logging
from datetime import datetime
from typing import Dict, Any, Optional
from mcp.server.fastmcp import FastMCP
import aioredis
import asyncio
from prometheus_client import Counter, Histogram, Gauge, start_http_server
from tenacity import retry, stop_after_attempt, wait_exponential
import time
Enterprise Dependencies Explained:
- structlog: Advanced structured logging for enterprise observability
- prometheus_client: Industry-standard metrics collection
- tenacity: Production-grade retry policies with exponential backoff
- aioredis: High-performance Redis integration for caching and session management
Step 1.1.2: Enterprise Logging Configuration
Implement structured logging following enterprise standards:
# Configure enterprise-grade structured logging
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.processors.JSONRenderer() # JSON for log aggregation
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
logger = structlog.get_logger()
Enterprise Logging Benefits:
- JSON Format: Compatible with log aggregation systems (ELK, Splunk)
- Structured Fields: Consistent logging format for automated analysis
- ISO Timestamps: Standard time format for distributed systems
- Exception Handling: Automatic stack trace capture and formatting
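For example, a log call with bound key-value fields (the event name and fields here are illustrative) renders as a single JSON object that aggregators can index directly:

logger.info("tool_executed", tool_name="process_data", duration_ms=42, cache_hit=True)
# Emits one JSON line (fields abbreviated):
# {"event": "tool_executed", "tool_name": "process_data", "duration_ms": 42,
#  "cache_hit": true, "level": "info", "timestamp": "2025-01-01T00:00:00Z"}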
Step 1.1.3: Enterprise Metrics and Monitoring
Implement comprehensive observability with Prometheus metrics:
# Enterprise-grade Prometheus metrics
# Request metrics
request_count = Counter(
'mcp_requests_total',
'Total MCP requests',
['method', 'status', 'client_id', 'version']
)
request_duration = Histogram(
'mcp_request_duration_seconds',
'MCP request duration in seconds',
['method', 'endpoint'],
buckets=(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0)
)
# System health metrics
active_connections = Gauge('mcp_active_connections', 'Active MCP connections')
cpu_usage = Gauge('mcp_cpu_usage_percent', 'CPU usage percentage')
memory_usage = Gauge('mcp_memory_usage_bytes', 'Memory usage in bytes')
# Business metrics
tool_usage = Counter('mcp_tool_usage_total', 'Tool usage counts', ['tool_name', 'success'])
cache_hits = Counter('mcp_cache_hits_total', 'Cache hit/miss counts', ['type'])
ratelimit_violations = Counter('mcp_ratelimit_violations_total', 'Rate limit violations')
# Start Prometheus metrics server
start_http_server(9090) # Metrics available at :9090/metrics
logger.info("Prometheus metrics server started", port=9090)
Enterprise Metrics Categories:
- SLI Metrics: Request rate, latency, and error rate for SLO tracking
- Resource Metrics: CPU, memory, and connections for capacity planning
- Business Metrics: Tool usage patterns for product insights
- Security Metrics: Rate limiting and abuse detection
Step 1.1.4: Health Check and Readiness Probes
Implement Kubernetes-compatible health checks:
from fastapi.responses import JSONResponse  # app is the FastAPI instance (see Part 2)

@app.get('/health')
async def health_check():
    """Kubernetes liveness probe - is the service running?"""
    return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()}

@app.get('/ready')
async def readiness_check():
    """Kubernetes readiness probe - is the service ready to handle traffic?"""
    # check_redis_connection, check_mcp_servers and check_disk_space are
    # helper probes defined alongside the server (see the source files)
    checks = {
        "redis": await check_redis_connection(),
        "mcp_servers": await check_mcp_servers(),
        "disk_space": check_disk_space(),
    }
    all_healthy = all(checks.values())
    return JSONResponse(
        content={
            "status": "ready" if all_healthy else "not_ready",
            "checks": checks,
            "timestamp": datetime.utcnow().isoformat(),
        },
        status_code=200 if all_healthy else 503,
    )
Step 1.1.5: Initialize the Server Class
Now let's create our production server class with configuration:
class ProductionMCPServer:
"""
Production-ready MCP server with comprehensive monitoring and caching.
This server includes all the features needed for production deployment:
- Redis caching for improved performance
- Prometheus metrics for monitoring
- Structured logging for debugging
- Health checks for load balancers
- Environment-based configuration
"""
def __init__(self, name: str = "Production MCP Server"):
self.mcp = FastMCP(name)
self.redis_client: Optional[aioredis.Redis] = None
self.cache_ttl = int(os.getenv('CACHE_TTL', '300')) # 5 minutes default
self.start_time = time.time()
Next, we configure environment variables for cloud deployment compatibility:
# Configuration from environment variables - essential for cloud deployment
self.config = {
'redis_url': os.getenv('REDIS_URL', 'redis://localhost:6379'),
'max_request_size': int(os.getenv('MAX_REQUEST_SIZE', '1048576')), # 1MB
'rate_limit': int(os.getenv('RATE_LIMIT', '100')), # requests per minute
'environment': os.getenv('ENVIRONMENT', 'development')
}
self._setup_tools()
self._setup_monitoring()
Configuration strategy:
- All settings from environment variables
- Sensible defaults for development
- Production overrides through deployment
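The RATE_LIMIT setting is enforced by a limiter the config feeds into. A minimal fixed-window sketch (the class name and 60-second window are assumptions; production systems often prefer Redis-backed sliding windows so limits hold across instances):

import time
from collections import defaultdict

class RateLimiter:
    """Allow at most `limit` requests per client per 60-second window."""

    def __init__(self, limit: int):
        self.limit = limit
        # client_id -> (window index, request count in that window)
        self.windows: dict[str, tuple[int, int]] = defaultdict(lambda: (0, 0))

    def allow(self, client_id: str) -> bool:
        window = int(time.time() // 60)
        current_window, count = self.windows[client_id]
        if current_window != window:
            current_window, count = window, 0  # New minute: reset the counter
        if count >= self.limit:
            ratelimit_violations.inc()  # Prometheus counter defined in Step 1.1.3
            return False
        self.windows[client_id] = (current_window, count + 1)
        return True

A server would instantiate this with RateLimiter(self.config['rate_limit']) and call allow(client_id) at the top of each request handler.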
Step 1.1.6: Async Resource Initialization
Separate async initialization for expensive resources:
async def initialize(self):
"""
Initialize async resources like Redis connections.
This separation allows the server to start synchronously but
initialize expensive resources asynchronously.
"""
try:
            # aioredis 2.x: from_url() is synchronous; connections are opened
            # lazily on the first command, so there is nothing to await here
            self.redis_client = aioredis.from_url(
self.config['redis_url'],
encoding="utf-8",
decode_responses=True
)
logger.info("Redis connection established")
except Exception as e:
logger.warning(f"Redis connection failed: {e}. Running without cache.")
Step 1.1.7: Production Tools with Caching - Tool Setup
Set up the production tools infrastructure:
def _setup_tools(self):
"""Set up MCP tools with decorators for monitoring."""
@self.mcp.tool()
@self._monitor_tool
async def process_data(data: Dict[str, Any], operation: str = "transform") -> Dict:
"""
Process data with specified operation, demonstrating caching patterns.
This tool shows how to implement caching in production MCP servers.
The cache key is based on the input data hash, ensuring consistency.
"""
Step 1.1.7.1: Cache Key Generation and Lookup
Implement deterministic caching for consistent performance:
        # Generate a deterministic cache key: sha256 over the sorted JSON,
        # since built-in hash() varies between processes and restarts
        cache_key = f"process:{operation}:{hashlib.sha256(json.dumps(data, sort_keys=True).encode()).hexdigest()}"
# Check cache first - this can dramatically improve response times
cached = await self._get_cache(cache_key)
if cached:
logger.info(f"Cache hit for operation: {operation}")
return cached
Step 1.1.7.2: Data Processing and Cache Storage
Perform operations and cache results for future requests:
# Perform actual processing
logger.info(f"Processing data with operation: {operation}")
result = {
"operation": operation,
"input_size": len(json.dumps(data)),
"processed_at": datetime.now().isoformat(),
"result": self._perform_operation(data, operation),
"cache_status": "miss"
}
# Cache the result for future requests
await self._set_cache(cache_key, result)
return result
Caching strategy:
- Deterministic cache keys ensure consistency
- Cache misses trigger computation and storage
- Cache hits dramatically improve response times
Step 1.1.8: Health Check Implementation
Essential for production deployment:
@self.mcp.tool()
@self._monitor_tool
async def health_check() -> Dict:
"""
Health check endpoint for load balancers and monitoring systems.
This is crucial for production deployment - load balancers use this
to determine if the server is ready to handle requests.
"""
checks = {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"uptime_seconds": time.time() - self.start_time,
"environment": self.config['environment'],
"version": "1.0.0"
}
# Check external dependency health
if self.redis_client:
try:
await self.redis_client.ping()
checks["redis"] = "connected"
except Exception as e:
checks["redis"] = "disconnected"
checks["status"] = "degraded"
logger.warning(f"Redis health check failed: {e}")
else:
checks["redis"] = "not_configured"
return checks
[Continue with remaining helper methods and server startup in the source files...]
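Among those helpers, the caching pair and the monitoring decorator referenced above deserve a sketch. The bodies below are illustrative, assuming the Redis client from initialize() and the Prometheus metrics from Step 1.1.3; the authoritative versions live in the source files:

    def _monitor_tool(self, func):
        """Decorator recording Prometheus metrics for every tool invocation."""
        async def wrapper(*args, **kwargs):
            start = time.time()
            try:
                result = await func(*args, **kwargs)
                tool_usage.labels(tool_name=func.__name__, success="true").inc()
                return result
            except Exception:
                tool_usage.labels(tool_name=func.__name__, success="false").inc()
                raise
            finally:
                request_duration.labels(
                    method="tools/call", endpoint=func.__name__
                ).observe(time.time() - start)
        return wrapper

    async def _get_cache(self, key: str) -> Optional[Dict]:
        """Fetch a cached value, degrading gracefully without Redis."""
        if not self.redis_client:
            return None
        value = await self.redis_client.get(key)
        cache_hits.labels(type="hit" if value else "miss").inc()
        return json.loads(value) if value else None

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=0.1, max=1))
    async def _set_cache(self, key: str, value: Dict) -> None:
        """Store a result with the configured TTL, retrying transient failures."""
        if self.redis_client:
            await self.redis_client.set(key, json.dumps(value), ex=self.cache_ttl)

Note how the tenacity decorators imported in Step 1.1.1 put the retry policy on cache writes, where transient Redis hiccups are most likely.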
Key Production Features Explained:
- Environment Configuration: All settings come from environment variables, making the server configurable without code changes
- Caching Strategy: Redis caching with deterministic keys reduces database load and improves response times
- Observability: Comprehensive metrics collection using Prometheus for monitoring and alerting
- Health Checks: Essential for load balancers to determine server readiness
- Error Handling: Graceful degradation when external services (Redis) are unavailable
Step 1.2: Containerization with Docker¶
Containerize the MCP server for consistent deployment across environments. Let's build this step by step following Docker best practices.
Step 1.2.1: Base Image and System Dependencies
Start with a secure, minimal base image:
# deployments/Dockerfile
FROM python:3.11-slim
# Install system dependencies required for our application
RUN apt-get update && apt-get install -y \
gcc \
&& rm -rf /var/lib/apt/lists/*
Step 1.2.2: Security Configuration
Create a non-root user for security (principle of least privilege):
# Create non-root user for security (principle of least privilege)
RUN useradd -m -u 1000 mcpuser
# Set working directory
WORKDIR /app
Step 1.2.3: Dependency Installation
Optimize Docker layer caching by installing dependencies first:
# Copy requirements first for better Docker layer caching
# If requirements don't change, Docker can reuse this layer
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
Step 1.2.4: Application Code and Permissions
Copy application code and set up proper permissions:
# Copy application code
COPY src/ ./src/
COPY scripts/ ./scripts/
# Create log directory with proper permissions
RUN mkdir -p /var/log/mcp && chown mcpuser:mcpuser /var/log/mcp
# Switch to non-root user before running the application
USER mcpuser
Step 1.2.5: Health Checks and Runtime Configuration
Configure health checks and runtime environment:
# Health check endpoint for container orchestrators
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD python scripts/health_check.py || exit 1
# Environment variables for consistent behavior
ENV PYTHONUNBUFFERED=1
ENV MCP_LOG_LEVEL=INFO
# Expose metrics port (separate from main application port)
EXPOSE 9090
# Run the server
CMD ["python", "-m", "src.production_mcp_server"]
Container security features:
- Minimal base image: python:3.11-slim reduces attack surface
- Non-root execution: Application runs as unprivileged user
- Layer optimization: Dependencies cached separately from code
- Health monitoring: Built-in health checks for orchestrators
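The HEALTHCHECK instruction above calls scripts/health_check.py, which isn't shown in this section. A minimal sketch, assuming the server exposes GET /health on port 8080 inside the container:

#!/usr/bin/env python3
"""Container health probe: exit 0 when healthy, non-zero otherwise."""
import sys
import urllib.request

try:
    with urllib.request.urlopen("http://localhost:8080/health", timeout=5) as resp:
        sys.exit(0 if resp.status == 200 else 1)
except Exception:
    sys.exit(1)  # Any failure (timeout, refused connection) marks the container unhealthy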
Docker Best Practices Implemented:
- Minimal Runtime: python:3.11-slim keeps the image small (a multi-stage build can further separate build-time dependencies such as gcc from the runtime)
- Non-root User: Runs application as unprivileged user for security
- Layer Optimization: Requirements installed first for better caching
- Health Checks: Built-in health checking for orchestrators
- Unbuffered Output: PYTHONUNBUFFERED=1 ensures logs reach the orchestrator immediately
Step 1.3: Local Development with Docker Compose¶
Create a complete local development environment that mirrors production. Let's build this multi-service setup step by step.
Step 1.3.1: Compose File Header and MCP Server Service
Set up the main MCP server with proper configuration:
# deployments/docker-compose.yml
version: '3.8'
services:
# Main MCP server
mcp-server:
build:
context: ..
dockerfile: deployments/Dockerfile
ports:
- "8080:8080" # MCP server port
- "9090:9090" # Prometheus metrics port
environment:
- REDIS_URL=redis://redis:6379
- ENVIRONMENT=development
- CACHE_TTL=300
- RATE_LIMIT=100
depends_on:
- redis
volumes:
- ./logs:/var/log/mcp
restart: unless-stopped
healthcheck:
test: ["CMD", "python", "scripts/health_check.py"]
interval: 30s
timeout: 10s
retries: 3
Step 1.3.2: Redis Caching Service
Configure Redis for production-like caching:
# Redis for caching
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
command: redis-server --appendonly yes # Enable persistence
Step 1.3.3: Monitoring Stack - Prometheus and Grafana
Set up comprehensive monitoring infrastructure:
# Prometheus for metrics collection
prometheus:
image: prom/prometheus:latest
ports:
- "9091:9090"
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus_data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
# Grafana for visualization
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
volumes:
- grafana_data:/var/lib/grafana
- ./monitoring/grafana-dashboards:/etc/grafana/provisioning/dashboards
depends_on:
- prometheus
Step 1.3.4: Data Persistence Configuration
Define persistent volumes for data continuity:
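The named volumes referenced by the services above are declared at the top level of the compose file:

volumes:
  redis_data:
  prometheus_data:
  grafana_data: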
Development environment features:
- Service isolation: Each component runs in its own container
- Dependency management: Proper startup order with depends_on
- Health monitoring: Built-in health checks for reliability
- Data persistence: Volumes ensure data survives container restarts
- Production parity: Same services and configurations as production
Development Environment Benefits:
- Complete monitoring stack included
- Persistent data volumes for development continuity
- Hot reload: mounting the source tree as a volume (not shown above) enables rapid iteration
- Production-like environment for accurate testing
Part 2: Cloud Deployment - Google Cloud Run (20 minutes)¶
Understanding Serverless Deployment¶
Google Cloud Run provides serverless container deployment with automatic scaling. This is ideal for MCP servers because:
- Automatic Scaling: Scales to zero when not in use, scales up based on demand
- Pay-per-use: Only pay for actual request processing time
- Managed Infrastructure: No server management required
- Global Distribution: Built-in load balancing across instances, with multi-region serving and edge caching available via Google's global load balancer and Cloud CDN
Step 2.1: Cloud Run HTTP Adapter¶
Cloud Run expects HTTP traffic, so we need an adapter to convert MCP JSON-RPC to HTTP. Let's build this step by step:
Step 2.1.1: FastAPI Application Setup
# src/cloud_run_adapter.py
from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse, StreamingResponse
import json
import asyncio
from typing import AsyncIterator
import os
import logging
from src.production_mcp_server import ProductionMCPServer
# Configure logging for Cloud Run (structured logging recommended)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(
title="MCP Server on Cloud Run",
description="Production MCP server deployed on Google Cloud Run",
version="1.0.0"
)
# Global server instance
server = ProductionMCPServer()
Step 2.1.2: Application Lifecycle Events
@app.on_event("startup")
async def startup_event():
"""
Initialize server on startup.
Cloud Run containers start fresh for each deployment,
so we initialize our server and its dependencies here.
"""
logger.info("Initializing MCP server for Cloud Run...")
await server.initialize()
logger.info("MCP server ready to handle requests")
@app.on_event("shutdown")
async def shutdown_event():
"""
Cleanup on shutdown.
Proper cleanup ensures graceful shutdown when Cloud Run
terminates the container.
"""
logger.info("Shutting down MCP server...")
if server.redis_client:
await server.redis_client.close()
Step 2.1.3: MCP Request Handler - Request Processing
Handle MCP requests over HTTP with proper JSON-RPC protocol support:
@app.post("/mcp")
async def handle_mcp_request(request: Request):
"""
Handle MCP requests over HTTP.
This endpoint converts HTTP requests to MCP JSON-RPC format
and routes them to the appropriate MCP tools.
"""
try:
body = await request.json()
logger.info(f"Received MCP request: {body.get('method', 'unknown')}")
# Route to appropriate handler based on method
method = body.get("method", "")
params = body.get("params", {})
Step 2.1.3.1: Tools List Handler
Handle requests for available tools:
if method == "tools/list":
# List available tools
            tools = await server.mcp.list_tools()  # list_tools() is a coroutine in FastMCP
return JSONResponse(content={
"jsonrpc": "2.0",
"result": tools,
"id": body.get("id")
})
Step 2.1.3.2: Tool Execution Handler
Execute specific tools with parameter validation:
elif method.startswith("tools/call"):
# Execute a specific tool
tool_name = params.get("name")
tool_params = params.get("arguments", {})
# Find and execute the requested tool
tool = server.mcp.get_tool(tool_name)
if tool:
result = await tool(**tool_params)
return JSONResponse(content={
"jsonrpc": "2.0",
"result": result,
"id": body.get("id")
})
else:
return JSONResponse(
content={
"jsonrpc": "2.0",
"error": {"code": -32601, "message": f"Tool '{tool_name}' not found"},
"id": body.get("id")
},
status_code=404
)
Step 2.1.3.3: Method Not Found Handler
Handle unsupported MCP methods:
else:
return JSONResponse(
content={
"jsonrpc": "2.0",
"error": {"code": -32601, "message": f"Method '{method}' not found"},
"id": body.get("id")
},
status_code=404
)
Step 2.1.3.4: Exception Handling
Handle parsing errors and unexpected exceptions:
except json.JSONDecodeError:
return JSONResponse(
content={
"jsonrpc": "2.0",
"error": {"code": -32700, "message": "Parse error"},
"id": None
},
status_code=400
)
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
return JSONResponse(
content={
"jsonrpc": "2.0",
"error": {"code": -32603, "message": "Internal error"},
"id": body.get("id", None)
},
status_code=500
)
Handler features:
- JSON-RPC compliance: Proper error codes and response format
- Method routing: Support for tools/list and tools/call operations
- Error handling: Comprehensive exception catching and reporting
- Request logging: All requests logged for debugging and monitoring
Step 2.1.4: Health Check and Metrics
@app.get("/health")
async def health_check():
"""Health check endpoint for Cloud Run."""
try:
health = await server.mcp.get_tool("health_check")()
if health.get("status") == "healthy":
return health
else:
return JSONResponse(content=health, status_code=503)
except Exception as e:
return JSONResponse(
content={"status": "unhealthy", "error": str(e)},
status_code=503
)
@app.get("/metrics")
async def metrics():
"""Prometheus metrics endpoint."""
from prometheus_client import generate_latest
return Response(content=generate_latest(), media_type="text/plain")
[Continue with streaming support and CORS configuration in the source files...]
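The CORS piece is worth sketching here, since browser clients calling the /mcp endpoint need it. FastAPI's built-in middleware handles this (the wildcard origins below are placeholders to tighten in production):

from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Restrict to known origins in production
    allow_methods=["GET", "POST", "OPTIONS"],
    allow_headers=["*"],
)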
Step 2.2: Cloud Build Configuration¶
Automate the build and deployment process with Google Cloud Build. This creates a complete CI/CD pipeline for your MCP server.
Step 2.2.1: Container Build Process
Build and tag the container image with version control:
# deployments/cloudbuild.yaml
steps:
# Build the container image
- name: 'gcr.io/cloud-builders/docker'
args: [
'build',
'-t', 'gcr.io/$PROJECT_ID/mcp-server:$COMMIT_SHA',
'-t', 'gcr.io/$PROJECT_ID/mcp-server:latest',
'-f', 'deployments/Dockerfile',
'.'
]
Step 2.2.2: Image Registry Push
Push both versioned and latest tags to Container Registry:
# Push to Container Registry
- name: 'gcr.io/cloud-builders/docker'
args: ['push', 'gcr.io/$PROJECT_ID/mcp-server:$COMMIT_SHA']
- name: 'gcr.io/cloud-builders/docker'
args: ['push', 'gcr.io/$PROJECT_ID/mcp-server:latest']
Step 2.2.3: Cloud Run Deployment
Deploy the new image to Cloud Run with production configuration:
# Deploy to Cloud Run
- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
entrypoint: gcloud
args:
- 'run'
- 'deploy'
- 'mcp-server'
- '--image=gcr.io/$PROJECT_ID/mcp-server:$COMMIT_SHA'
- '--region=us-central1'
- '--platform=managed'
- '--allow-unauthenticated'
- '--set-env-vars=ENVIRONMENT=production,REDIS_URL=${_REDIS_URL}'
- '--memory=1Gi'
- '--cpu=2'
- '--timeout=300'
- '--concurrency=100'
- '--max-instances=50'
- '--min-instances=1'
Step 2.2.4: Build Configuration
Configure build parameters and substitutions:
# Configurable substitutions
substitutions:
_REDIS_URL: 'redis://10.0.0.3:6379' # Replace with your Redis instance
# Build options
options:
logging: CLOUD_LOGGING_ONLY
machineType: 'E2_HIGHCPU_8'
CI/CD pipeline features:
- Automatic triggers: Builds triggered by git commits
- Version control: Each deployment tagged with commit SHA
- Resource optimization: High-CPU build machines for faster builds
- Environment promotion: Configurable variables for different environments
Production Deployment Features:
- Automatic scaling: 1-50 instances based on demand
- Resource limits: 1GB memory, 2 CPU cores per instance
- Timeout configuration: 5-minute timeout for long-running requests
- Concurrency control: 100 concurrent requests per instance
Step 2.3: Infrastructure as Code with Terraform¶
Terraform enables reproducible infrastructure deployments. Let's build the complete Cloud Run infrastructure step by step.
Step 2.3.1: Terraform Configuration and Provider
Set up Terraform with Google Cloud provider:
# deployments/terraform/main.tf
terraform {
required_providers {
google = {
source = "hashicorp/google"
version = "~> 5.0"
}
}
}
provider "google" {
project = var.project_id
region = var.region
}
Step 2.3.2: Variable Definitions
Define configurable parameters for flexible deployments:
# Variables
variable "project_id" {
description = "GCP Project ID"
type = string
}
variable "region" {
description = "GCP Region"
type = string
default = "us-central1"
}
Step 2.3.3: Cloud Run Service - Basic Configuration
Define the core Cloud Run service structure:
# Cloud Run Service
resource "google_cloud_run_service" "mcp_server" {
name = "mcp-server"
location = var.region
template {
spec {
containers {
image = "gcr.io/${var.project_id}/mcp-server:latest"
# Resource limits for cost control and performance
resources {
limits = {
cpu = "2"
memory = "1Gi"
}
}
Step 2.3.4: Environment Variables and Security
Configure environment variables and secrets integration:
# Environment variables from secrets
env {
name = "ENVIRONMENT"
value = "production"
}
env {
name = "REDIS_URL"
value_from {
secret_key_ref {
name = google_secret_manager_secret.redis_url.secret_id
key = "latest"
}
}
}
# Health check port
ports {
container_port = 8080
}
}
# Service account for security
service_account_name = google_service_account.mcp_server.email
}
Step 2.3.5: Auto-scaling and Traffic Configuration
Configure scaling behavior and traffic routing:
# Scaling configuration
metadata {
annotations = {
"autoscaling.knative.dev/maxScale" = "50"
"autoscaling.knative.dev/minScale" = "1"
"run.googleapis.com/cpu-throttling" = "false"
}
}
}
# Traffic routing (100% to latest revision)
traffic {
percent = 100
latest_revision = true
}
}
Step 2.3.6: Service Account and IAM
Create service account and configure access permissions:
# Service Account for the Cloud Run service
resource "google_service_account" "mcp_server" {
account_id = "mcp-server"
display_name = "MCP Server Service Account"
description = "Service account for MCP server on Cloud Run"
}
# IAM binding to allow public access
resource "google_cloud_run_service_iam_member" "public" {
service = google_cloud_run_service.mcp_server.name
location = google_cloud_run_service.mcp_server.location
role = "roles/run.invoker"
member = "allUsers"
}
Step 2.3.7: Secret Management
Configure secure storage for sensitive configuration:
# Secret Manager for sensitive configuration
resource "google_secret_manager_secret" "redis_url" {
secret_id = "redis-url"
  replication {
    auto {}  # Provider 5.x syntax; "automatic = true" was removed in the 5.0 release
  }
}
Step 2.3.8: Monitoring and Alerting
Set up automated monitoring and notification channels:
# Cloud Monitoring alert policy
resource "google_monitoring_alert_policy" "high_error_rate" {
display_name = "MCP Server High Error Rate"
combiner = "OR"
conditions {
display_name = "Error rate > 5%"
condition_threshold {
filter = "resource.type=\"cloud_run_revision\" resource.label.service_name=\"mcp-server\""
comparison = "COMPARISON_GREATER_THAN"
threshold_value = 0.05
duration = "300s"
aggregations {
alignment_period = "60s"
per_series_aligner = "ALIGN_RATE"
}
}
}
notification_channels = [google_monitoring_notification_channel.email.name]
}
# Notification channel for alerts
resource "google_monitoring_notification_channel" "email" {
display_name = "Email Notification"
type = "email"
labels = {
email_address = "ops-team@yourcompany.com"
}
}
Step 2.3.9: Output Values
Export important infrastructure information:
# Output the service URL
output "service_url" {
value = google_cloud_run_service.mcp_server.status[0].url
description = "URL of the deployed Cloud Run service"
}
output "service_account_email" {
value = google_service_account.mcp_server.email
description = "Email of the service account"
}
Terraform advantages:
- Infrastructure as Code: Version-controlled, repeatable deployments
- State management: Tracks resource changes and dependencies
- Plan before apply: Preview changes before deployment
- Multi-cloud support: Works across different cloud providers
Deploy with: terraform init && terraform plan && terraform apply
Part 3: AWS Lambda Deployment (15 minutes)¶
Understanding Serverless vs Container Deployment¶
AWS Lambda offers a different serverless model compared to Cloud Run:
- Function-based: Code runs as functions, not containers
- Event-driven: Responds to events (HTTP, S3, DynamoDB, etc.)
- Cold starts: Functions may experience initialization delays
- Time limits: Maximum execution time of 15 minutes
Step 3.1: Lambda Handler Implementation¶
AWS Lambda provides a different serverless model than Cloud Run. Let's implement both FastAPI-based and direct handlers for maximum flexibility.
Step 3.1.1: Lambda Dependencies and Setup
Start with essential imports and logging configuration:
# src/lambda_handler.py
import json
import os
import asyncio
from typing import Dict, Any
import logging
from mangum import Mangum
# Import our FastAPI app from the Cloud Run adapter
from src.cloud_run_adapter import app
# Configure logging for Lambda
logger = logging.getLogger()
logger.setLevel(logging.INFO)
Lambda-specific considerations:
- mangum: Converts ASGI apps (FastAPI) to Lambda handlers
- Logging: Use AWS CloudWatch-compatible logging
- Environment: Cold starts require efficient initialization
Step 3.1.2: FastAPI to Lambda Adapter
Convert our existing FastAPI app for Lambda deployment:
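Mangum translates API Gateway events into ASGI requests, so the existing FastAPI app runs unmodified. The resulting handler object is what the Lambda container's CMD references:

# Wrap the ASGI app; "handler" is the entry point named in the image CMD
handler = Mangum(app, lifespan="auto")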
Step 3.1.3: Direct Lambda Handler - Request Processing
For maximum performance, implement a direct Lambda handler:
def lambda_handler_direct(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
"""
Direct Lambda handler for MCP requests without FastAPI overhead.
This approach provides maximum control and minimum cold start time
for simple MCP operations.
Args:
event: Lambda event containing the HTTP request
context: Lambda context with runtime information
Returns:
HTTP response in Lambda proxy integration format
"""
try:
# Log request for debugging
logger.info(f"Lambda invoked with event: {json.dumps(event, default=str)}")
# Parse the HTTP request body
body = json.loads(event.get('body', '{}'))
method = body.get('method', '')
Step 3.1.4: Tools List Handler
Handle MCP tools listing requests:
# Handle different MCP methods
if method == 'tools/list':
# Return list of available tools
tools = [
{
"name": "process_data",
"description": "Process data with various operations",
"inputSchema": {
"type": "object",
"properties": {
"data": {"type": "object"},
"operation": {"type": "string", "enum": ["transform", "validate", "analyze"]}
}
}
},
{
"name": "health_check",
"description": "Check server health status",
"inputSchema": {"type": "object"}
}
]
return {
'statusCode': 200,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*'
},
'body': json.dumps({
'jsonrpc': '2.0',
'result': {"tools": tools},
'id': body.get('id')
})
}
Step 3.1.5: Tool Execution Handler
Handle tool execution requests:
elif method.startswith('tools/call'):
# Execute specific tool
result = asyncio.run(execute_tool(body))
return {
'statusCode': 200,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*'
},
'body': json.dumps(result)
}
else:
return {
'statusCode': 404,
'headers': {'Content-Type': 'application/json'},
'body': json.dumps({
'jsonrpc': '2.0',
'error': {'code': -32601, 'message': 'Method not found'},
'id': body.get('id')
})
}
Step 3.1.6: Error Handling
Implement comprehensive error handling for Lambda:
except json.JSONDecodeError:
return {
'statusCode': 400,
'headers': {'Content-Type': 'application/json'},
'body': json.dumps({
'jsonrpc': '2.0',
'error': {'code': -32700, 'message': 'Parse error'}
})
}
except Exception as e:
logger.error(f"Lambda handler error: {str(e)}")
return {
'statusCode': 500,
'headers': {'Content-Type': 'application/json'},
'body': json.dumps({
'jsonrpc': '2.0',
'error': {'code': -32603, 'message': 'Internal error'}
})
}
Step 3.1.7: Tool Execution Implementation
Implement the actual tool logic for Lambda:
async def execute_tool(body: Dict[str, Any]) -> Dict[str, Any]:
"""
Execute MCP tool based on request body.
This function demonstrates how to handle MCP tool execution
in the Lambda environment.
"""
params = body.get('params', {})
tool_name = params.get('name')
tool_args = params.get('arguments', {})
try:
if tool_name == 'process_data':
# Simulate data processing
data = tool_args.get('data', {})
operation = tool_args.get('operation', 'transform')
result = {
"operation": operation,
"processed_at": "2024-01-01T00:00:00Z",
"lambda_context": {
"aws_region": os.environ.get('AWS_REGION'),
"function_name": os.environ.get('AWS_LAMBDA_FUNCTION_NAME')
},
"result": data # In real implementation, process the data
}
elif tool_name == 'health_check':
result = {
"status": "healthy",
"platform": "aws_lambda",
"region": os.environ.get('AWS_REGION'),
"memory_limit": os.environ.get('AWS_LAMBDA_FUNCTION_MEMORY_SIZE')
}
else:
raise ValueError(f"Unknown tool: {tool_name}")
return {
'jsonrpc': '2.0',
'result': result,
'id': body.get('id')
}
except Exception as e:
return {
'jsonrpc': '2.0',
'error': {'code': -32603, 'message': str(e)},
'id': body.get('id')
}
Lambda advantages:
- Fast cold starts: Direct handler minimizes initialization time
- AWS integration: Native access to AWS services and context
- Cost optimization: Pay only for execution time
- Auto-scaling: Scales to zero when not in use
For complete implementation details, see src/session4/lambda/lambda_handler.py
Step 3.2: Lambda Container Image¶
# deployments/Dockerfile.lambda
FROM public.ecr.aws/lambda/python:3.11
# Copy requirements first for better caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY src/ ./src/
# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile)
CMD ["src.lambda_handler.handler"]
Step 3.3: SAM Template for Infrastructure¶
AWS SAM (Serverless Application Model) provides Infrastructure as Code for serverless applications. Let's build this template step by step for production deployment.
Step 3.3.1: Template Header and Global Configuration
Start with the SAM template structure and global settings:
# deployments/template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
MCP Server on AWS Lambda
Production-ready MCP server deployed as Lambda function with
API Gateway, monitoring, and observability.
# Global settings for all functions
Globals:
Function:
Timeout: 300
MemorySize: 1024
Environment:
Variables:
ENVIRONMENT: production
LOG_LEVEL: INFO
Global configuration benefits:
- Consistent settings: All functions inherit the same baseline configuration
- Resource limits: 5-minute timeout and 1GB memory for compute-intensive tasks
- Environment variables: Production-ready logging and environment detection
Step 3.3.2: Parameters for Flexible Deployment
Define configurable parameters for different environments:
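The template references a Stage parameter throughout. A minimal definition (the default and allowed values here are assumptions to adapt per project):

Parameters:
  Stage:
    Type: String
    Default: prod
    AllowedValues:
      - dev
      - staging
      - prod
    Description: Deployment stage used in resource names and the API stage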
Step 3.3.3: Lambda Function Configuration
Define the core MCP server Lambda function:
Resources:
# Main MCP Server Lambda Function
MCPServerFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: !Sub 'mcp-server-${Stage}'
PackageType: Image
ImageConfig:
Command: ["src.lambda_handler.handler"]
Architectures:
- x86_64
Environment:
Variables:
          REDIS_URL: !Sub '{{resolve:secretsmanager:redis-url:SecretString:url}}'  # "url" key of the JSON secret defined below
Step 3.3.4: API Gateway Events Configuration
Configure API Gateway events for different endpoints:
# API Gateway Events
Events:
MCPApi:
Type: Api
Properties:
Path: /mcp
Method: POST
RestApiId: !Ref MCPApi
HealthCheck:
Type: Api
Properties:
Path: /health
Method: GET
RestApiId: !Ref MCPApi
StreamingApi:
Type: Api
Properties:
Path: /mcp/stream
Method: POST
RestApiId: !Ref MCPApi
Step 3.3.5: IAM Permissions and Container Metadata
Configure security permissions and container settings:
# IAM permissions
Policies:
- AWSSecretsManagerGetSecretValuePolicy:
SecretArn: !Ref RedisUrlSecret
- Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
Resource: '*'
# Container image metadata
Metadata:
DockerTag: latest
DockerContext: ../
Dockerfile: deployments/Dockerfile.lambda
Step 3.3.6: API Gateway Configuration
Configure API Gateway with CORS and logging:
# API Gateway with custom configuration
MCPApi:
Type: AWS::Serverless::Api
Properties:
StageName: !Ref Stage
Cors:
AllowMethods: "'GET,POST,OPTIONS'"
AllowHeaders: "'Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token'"
AllowOrigin: "'*'"
# Enable request/response logging
MethodSettings:
- ResourcePath: "/*"
HttpMethod: "*"
LoggingLevel: INFO
DataTraceEnabled: true
MetricsEnabled: true
Step 3.3.7: Logging and Secret Management
Configure CloudWatch logs and secure secret storage:
# CloudWatch Log Group with retention
MCPServerLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub '/aws/lambda/mcp-server-${Stage}'
RetentionInDays: 30
# Secret for Redis connection
RedisUrlSecret:
Type: AWS::SecretsManager::Secret
Properties:
Name: redis-url
Description: Redis connection URL for MCP server
SecretString: !Sub |
{
"url": "redis://your-redis-cluster.cache.amazonaws.com:6379"
}
Step 3.3.8: CloudWatch Alarms for Monitoring
Set up automated alerting for production issues:
# CloudWatch Alarms for monitoring
HighErrorRateAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
AlarmName: !Sub 'MCP-Server-HighErrorRate-${Stage}'
AlarmDescription: 'MCP Server error rate is too high'
MetricName: Errors
Namespace: AWS/Lambda
Statistic: Sum
Period: 300
EvaluationPeriods: 2
Threshold: 10
ComparisonOperator: GreaterThanThreshold
Dimensions:
- Name: FunctionName
Value: !Ref MCPServerFunction
HighLatencyAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
AlarmName: !Sub 'MCP-Server-HighLatency-${Stage}'
AlarmDescription: 'MCP Server latency is too high'
MetricName: Duration
Namespace: AWS/Lambda
Statistic: Average
Period: 300
EvaluationPeriods: 2
Threshold: 10000 # 10 seconds
ComparisonOperator: GreaterThanThreshold
Dimensions:
- Name: FunctionName
Value: !Ref MCPServerFunction
Step 3.3.9: Stack Outputs
Export important values for other stacks and external systems:
# Outputs for other stacks or external systems
Outputs:
MCPServerApi:
Description: API Gateway endpoint URL for MCP Server
Value: !Sub 'https://${MCPApi}.execute-api.${AWS::Region}.amazonaws.com/${Stage}/'
Export:
Name: !Sub '${AWS::StackName}-ApiUrl'
MCPServerFunction:
Description: MCP Server Lambda Function ARN
Value: !GetAtt MCPServerFunction.Arn
Export:
Name: !Sub '${AWS::StackName}-FunctionArn'
SAM template advantages:
- Simplified serverless: Higher-level abstractions for Lambda and API Gateway
- Built-in best practices: Automatic IAM roles and resource relationships
- Easy deployment: Single-command deployment with sam deploy
- Local testing: Run and test functions locally with sam local
For complete deployment instructions, see deployments/aws-sam-deploy.md
Part 4: Monitoring and Observability (15 minutes)¶
The Three Pillars of Observability¶
Production systems require comprehensive observability through:
- Metrics: Quantitative measurements (response time, error rates)
- Logs: Detailed event records for debugging
- Traces: Request flow through distributed systems
Step 4.1: Comprehensive Monitoring System¶
Effective production monitoring requires multiple components working together. Let's build this system step by step, focusing on observability patterns that scale with your infrastructure.
Step 4.1.1: Core Dependencies and Data Structures
First, we'll set up the foundation with proper imports and health status tracking:
# monitoring/monitor.py
from prometheus_client import start_http_server, Counter, Histogram, Gauge
import time
import asyncio
import aiohttp
from typing import Any, Dict, List, Optional
import logging
import json
from dataclasses import dataclass
from datetime import datetime, timedelta
Key libraries explained:
- prometheus_client: Industry-standard metrics collection
- aiohttp: Async HTTP client for efficient health checks
- dataclasses: Type-safe data structures for better code quality
Step 4.1.2: Structured Logging Configuration
Production systems need consistent, parsable logs:
# Configure structured logging for production environments
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
Step 4.1.3: Health Status Data Model
Define a clear contract for health check results:
@dataclass
class ServerHealthStatus:
"""Represents the health status of a single MCP server."""
url: str
status: str # 'healthy', 'unhealthy', 'error'
response_time: Optional[float]
last_check: datetime
error_message: Optional[str] = None
details: Optional[Dict] = None
Status categories:
- healthy: Server responding correctly
- unhealthy: Server responding with errors
- error: Server not reachable or timing out
Step 4.1.4: Monitor Class Initialization
Create the core monitoring class with essential configuration:
class MCPServerMonitor:
"""
Comprehensive monitoring system for MCP servers.
This monitor provides:
- Health checking with configurable intervals
- Prometheus metrics collection
- Alerting when servers become unhealthy
- Historical data tracking
"""
def __init__(self, server_urls: List[str], check_interval: int = 30):
self.server_urls = server_urls
self.check_interval = check_interval
self.server_status: Dict[str, ServerHealthStatus] = {}
self.failure_counts: Dict[str, int] = {url: 0 for url in server_urls}
Step 4.1.5: Prometheus Metrics Setup
Define comprehensive metrics for observability:
# Prometheus metrics for comprehensive monitoring
self.health_check_total = Counter(
'mcp_health_checks_total',
'Total health checks performed',
['server', 'status']
)
self.response_time = Histogram(
'mcp_response_time_seconds',
'Response time for MCP requests',
['server', 'method'],
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, float('inf')]
)
self.server_availability = Gauge(
'mcp_server_availability',
'Server availability (1=up, 0=down)',
['server']
)
self.consecutive_failures = Gauge(
'mcp_consecutive_failures',
'Number of consecutive failures',
['server']
)
Metrics explained:
- Counter: Tracks total events (health checks)
- Histogram: Measures distributions with buckets (response times)
- Gauge: Tracks current values (availability, failure counts)
Step 4.1.6: Health Check Implementation - Setup
Implement robust health checking with proper error handling:
async def check_health(self, session: aiohttp.ClientSession, url: str) -> ServerHealthStatus:
"""
Perform comprehensive health check on a single server.
This includes:
- Basic connectivity test
- Health endpoint validation
- Response time measurement
- Error categorization
"""
start_time = time.time()
Step 4.1.7: Health Check - Success Path
Handle successful health check responses:
try:
# Make health check request with timeout
async with session.get(
f"{url}/health",
timeout=aiohttp.ClientTimeout(total=10)
) as response:
response_time = time.time() - start_time
if response.status == 200:
# Parse health check response
try:
health_data = await response.json()
# Reset failure count on success
self.failure_counts[url] = 0
# Update metrics
self.health_check_total.labels(server=url, status='success').inc()
self.server_availability.labels(server=url).set(1)
self.response_time.labels(server=url, method='health').observe(response_time)
self.consecutive_failures.labels(server=url).set(0)
return ServerHealthStatus(
url=url,
status='healthy',
response_time=response_time,
last_check=datetime.now(),
details=health_data
)
Step 4.1.8: Health Check - Error Handling
Handle various failure scenarios with appropriate metrics:
except json.JSONDecodeError:
return ServerHealthStatus(
url=url,
status='unhealthy',
response_time=response_time,
last_check=datetime.now(),
error_message="Invalid JSON response"
)
else:
# HTTP error status
self.failure_counts[url] += 1
self.health_check_total.labels(server=url, status='error').inc()
self.server_availability.labels(server=url).set(0)
self.consecutive_failures.labels(server=url).set(self.failure_counts[url])
return ServerHealthStatus(
url=url,
status='unhealthy',
response_time=response_time,
last_check=datetime.now(),
error_message=f"HTTP {response.status}"
)
Step 4.1.9: Health Check - Exception Handling
Handle network timeouts and unexpected errors:
except asyncio.TimeoutError:
self.failure_counts[url] += 1
self.health_check_total.labels(server=url, status='timeout').inc()
self.server_availability.labels(server=url).set(0)
self.consecutive_failures.labels(server=url).set(self.failure_counts[url])
return ServerHealthStatus(
url=url,
status='error',
response_time=time.time() - start_time,
last_check=datetime.now(),
error_message="Request timeout"
)
except Exception as e:
self.failure_counts[url] += 1
self.health_check_total.labels(server=url, status='error').inc()
self.server_availability.labels(server=url).set(0)
self.consecutive_failures.labels(server=url).set(self.failure_counts[url])
return ServerHealthStatus(
url=url,
status='error',
response_time=time.time() - start_time,
last_check=datetime.now(),
error_message=str(e)
)
Error handling strategy:
- Timeouts: 10-second limit prevents hanging requests
- HTTP errors: Distinguish between server errors and connectivity issues
- JSON parsing: Handle malformed responses gracefully
- Metrics tracking: Every failure type is tracked for analysis
Step 4.1.10: Concurrent Health Checking
Check multiple servers simultaneously for better performance:
async def check_all_servers(self, session: aiohttp.ClientSession) -> List[ServerHealthStatus]:
"""Check health of all configured servers concurrently."""
tasks = [
self.check_health(session, url)
for url in self.server_urls
]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Handle any exceptions from the gather
health_statuses = []
for i, result in enumerate(results):
if isinstance(result, Exception):
# Create error status for failed check
health_statuses.append(ServerHealthStatus(
url=self.server_urls[i],
status='error',
response_time=None,
last_check=datetime.now(),
error_message=str(result)
))
else:
health_statuses.append(result)
return health_statuses
Concurrency benefits:
- Parallel execution: All servers checked simultaneously
- Exception isolation: One failed check doesn't break others
- Improved performance: Total check time is limited by slowest server
Step 4.1.11: Health Analysis and Alerting
Analyze trends and generate actionable insights:
def analyze_health_trends(self) -> Dict[str, Any]:
"""
Analyze health trends and provide insights.
This could be extended to use historical data from a time-series database.
"""
analysis = {
"total_servers": len(self.server_urls),
"healthy_servers": 0,
"unhealthy_servers": 0,
"error_servers": 0,
"servers_with_alerts": [],
"average_response_time": None
}
response_times = []
for status in self.server_status.values():
if status.status == 'healthy':
analysis["healthy_servers"] += 1
if status.response_time:
response_times.append(status.response_time)
elif status.status == 'unhealthy':
analysis["unhealthy_servers"] += 1
else:
analysis["error_servers"] += 1
# Check for alert conditions
if self.failure_counts.get(status.url, 0) >= 3:
analysis["servers_with_alerts"].append({
"url": status.url,
"consecutive_failures": self.failure_counts[status.url],
"last_error": status.error_message
})
if response_times:
analysis["average_response_time"] = sum(response_times) / len(response_times)
return analysis
Analysis features:
- Server categorization: Healthy, unhealthy, and error states
- Alert triggers: 3+ consecutive failures trigger alerts
- Performance metrics: Average response time calculation
- Actionable data: Specific error messages for debugging
Step 4.1.12: Continuous Monitoring Loop
Implement the main monitoring loop with comprehensive error handling:
async def monitor_loop(self):
"""
Main monitoring loop that runs continuously.
This loop:
1. Checks all server health
2. Updates internal state
3. Logs significant events
4. Triggers alerts if needed
"""
logger.info(f"Starting monitoring loop for {len(self.server_urls)} servers")
async with aiohttp.ClientSession() as session:
while True:
try:
# Check all servers
health_statuses = await self.check_all_servers(session)
# Update internal state
for status in health_statuses:
self.server_status[status.url] = status
# Log significant events
unhealthy_servers = [s for s in health_statuses if s.status != 'healthy']
if unhealthy_servers:
for server in unhealthy_servers:
logger.warning(
f"Server {server.url} is {server.status}: {server.error_message}"
)
# Analyze trends and trigger alerts
analysis = self.analyze_health_trends()
if analysis["servers_with_alerts"]:
logger.error(f"ALERT: {len(analysis['servers_with_alerts'])} servers need attention")
for alert in analysis["servers_with_alerts"]:
logger.error(f" - {alert['url']}: {alert['consecutive_failures']} failures")
# Log summary every few iterations
healthy_count = analysis["healthy_servers"]
total_count = analysis["total_servers"]
logger.info(f"Health check complete: {healthy_count}/{total_count} servers healthy")
except Exception as e:
logger.error(f"Error in monitoring loop: {e}")
# Wait before next check
await asyncio.sleep(self.check_interval)
Loop features:
- Continuous operation: Runs indefinitely with configurable intervals
- State management: Updates internal tracking for all servers
- Progressive alerting: Logs warnings for issues, errors for critical problems
- Exception isolation: Loop continues even if individual checks fail
Step 4.1.13: Monitor Startup and Configuration
Start the monitoring system with both metrics server and health checking:
def start(self, metrics_port: int = 9092):
"""
Start the monitoring system.
This starts both the Prometheus metrics server and the monitoring loop.
"""
# Start Prometheus metrics server
start_http_server(metrics_port)
logger.info(f"Prometheus metrics server started on port {metrics_port}")
# Start monitoring loop
logger.info("Starting MCP server monitoring...")
asyncio.run(self.monitor_loop())
Step 4.1.14: Example Usage and Configuration
Configure and start the monitoring system:
# Example usage and configuration
if __name__ == "__main__":
# Configure servers to monitor
servers = [
"http://localhost:8080",
"https://mcp-server-abc123.run.app",
"https://api.example.com"
]
# Create and start monitor
monitor = MCPServerMonitor(servers, check_interval=30)
monitor.start()
Configuration options:
- servers: List of MCP server URLs to monitor
- check_interval: Health check frequency in seconds
- metrics_port: Prometheus metrics exposure port
For complete implementation details, see src/session4/monitoring/monitor.py
Step 4.2: Grafana Dashboard Configuration¶
Grafana dashboards provide visual insights into your MCP server performance. Let's build a comprehensive dashboard step by step.
Step 4.2.1: Dashboard Header and Configuration
Set up the basic dashboard structure and metadata:
{
"dashboard": {
"id": null,
"title": "MCP Server Production Monitoring",
"tags": ["mcp", "production", "monitoring"],
"style": "dark",
"timezone": "browser",
"editable": true,
"refresh": "30s",
"time": {
"from": "now-1h",
"to": "now"
}
}
}
Dashboard features:
- Auto-refresh: Updates every 30 seconds for real-time monitoring
- Dark theme: Reduces eye strain during long monitoring sessions
- Flexible time range: Default 1-hour view with customizable timeframes
Step 4.2.2: Server Availability Panel
Create a high-level availability indicator:
"panels": [
{
"id": 1,
"title": "Server Availability",
"type": "stat",
"gridPos": {"h": 8, "w": 6, "x": 0, "y": 0},
"targets": [{
"expr": "avg(mcp_server_availability)",
"legendFormat": "Availability %"
}],
"fieldConfig": {
"defaults": {
"unit": "percentunit",
"min": 0,
"max": 1,
"thresholds": {
"steps": [
{"color": "red", "value": 0},
{"color": "yellow", "value": 0.95},
{"color": "green", "value": 0.99}
]
}
}
}
}
]
Step 4.2.3: Request Rate and Error Rate Panels
Monitor traffic patterns and error frequencies:
{
"id": 2,
"title": "Request Rate",
"type": "graph",
"gridPos": {"h": 8, "w": 12, "x": 6, "y": 0},
"targets": [{
"expr": "sum(rate(mcp_requests_total[5m])) by (server)",
"legendFormat": "{{server}}"
}],
"yAxes": [{
"label": "Requests/sec",
"min": 0
}]
},
{
"id": 3,
"title": "Error Rate",
"type": "graph",
"gridPos": {"h": 8, "w": 12, "x": 0, "y": 8},
"targets": [{
"expr": "sum(rate(mcp_errors_total[5m])) by (server, error_type)",
"legendFormat": "{{server}} - {{error_type}}"
}],
"yAxes": [{
"label": "Errors/sec",
"min": 0
}],
"alert": {
"conditions": [{
"query": {"queryType": "", "refId": "A"},
"reducer": {"type": "avg", "params": []},
"evaluator": {"params": [0.1], "type": "gt"}
}],
"executionErrorState": "alerting",
"frequency": "10s",
"handler": 1,
"name": "High Error Rate Alert",
"noDataState": "no_data"
}
}
Step 4.2.4: Performance Monitoring Panels
Track response times and connection metrics:
{
"id": 4,
"title": "Response Time Percentiles",
"type": "graph",
"gridPos": {"h": 8, "w": 12, "x": 12, "y": 8},
"targets": [
{
"expr": "histogram_quantile(0.50, sum(rate(mcp_request_duration_seconds_bucket[5m])) by (le, server))",
"legendFormat": "{{server}} - p50"
},
{
"expr": "histogram_quantile(0.95, sum(rate(mcp_request_duration_seconds_bucket[5m])) by (le, server))",
"legendFormat": "{{server}} - p95"
},
{
"expr": "histogram_quantile(0.99, sum(rate(mcp_request_duration_seconds_bucket[5m])) by (le, server))",
"legendFormat": "{{server}} - p99"
}
],
"yAxes": [{
"label": "Response Time (s)",
"min": 0
}]
},
{
"id": 5,
"title": "Active Connections",
"type": "graph",
"gridPos": {"h": 8, "w": 8, "x": 0, "y": 16},
"targets": [{
"expr": "mcp_active_connections",
"legendFormat": "{{server}}"
}],
"yAxes": [{
"label": "Connections",
"min": 0
}]
}
Step 4.2.5: Health Status Table and Variables
Display detailed health information and server filtering:
{
"id": 6,
"title": "Server Health Status",
"type": "table",
"gridPos": {"h": 8, "w": 16, "x": 8, "y": 16},
"targets": [{
"expr": "mcp_server_availability",
"format": "table",
"instant": true
}],
"transformations": [{
"id": "organize",
"options": {
"excludeByName": {"Time": true, "__name__": true},
"renameByName": {"server": "Server", "Value": "Status"}
}
}]
}
],
"templating": {
"list": [{
"name": "server",
"type": "query",
"query": "label_values(mcp_server_availability, server)",
"refresh": 1,
"includeAll": true,
"multi": true
}]
}
Dashboard benefits:
- Comprehensive coverage: Availability, performance, errors, and health status
- Real-time alerting: Built-in alerts for critical thresholds
- Percentile tracking: P50, P95, P99 response times for SLA monitoring
- Server filtering: Dynamic server selection for focused analysis
Key metrics explained:
- Availability: Percentage of time servers are responding correctly
- Request rate: Traffic volume per second across all servers
- Error rate: Failed requests per second with automatic alerting
- Response time percentiles: Performance distribution analysis
- Active connections: Current load on each server
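All of these panels assume your servers export metrics under the names used in the PromQL queries above. Here is a minimal instrumentation sketch using the prometheus_client library; the label sets and bucket boundaries are assumptions inferred from the dashboard queries, not the course's exact code:

from prometheus_client import Counter, Gauge, Histogram, start_http_server

# Metric names mirror the PromQL expressions used in the dashboard panels.
REQUESTS = Counter("mcp_requests_total", "Total MCP requests handled", ["server"])
ERRORS = Counter("mcp_errors_total", "Total MCP request errors", ["server", "error_type"])
DURATION = Histogram(
    "mcp_request_duration_seconds",
    "MCP request latency in seconds",
    ["server"],
    buckets=(0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0),  # assumed bucket layout
)
CONNECTIONS = Gauge("mcp_active_connections", "Currently open connections", ["server"])
AVAILABILITY = Gauge("mcp_server_availability", "1 when healthy, 0 otherwise", ["server"])

# Expose /metrics for Prometheus to scrape; the port is a placeholder.
start_http_server(8000)

# Example instrumentation inside a request handler:
# REQUESTS.labels(server="mcp-server-1").inc()
# with DURATION.labels(server="mcp-server-1").time():
#     ... handle the request ...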
For the complete dashboard JSON and import instructions, see monitoring/grafana-dashboards/mcp-production.json
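If you prefer to automate the import rather than pasting JSON into the Grafana UI, the Grafana HTTP API accepts the dashboard payload directly. A sketch, assuming the file above and a service-account API token (the URL and token are placeholders):

import json
import requests

GRAFANA_URL = "http://localhost:3000"   # placeholder
API_TOKEN = "YOUR_GRAFANA_API_TOKEN"    # placeholder

# Load the dashboard definition produced in the steps above.
with open("monitoring/grafana-dashboards/mcp-production.json") as f:
    payload = json.load(f)

# POST /api/dashboards/db creates or updates the dashboard.
resp = requests.post(
    f"{GRAFANA_URL}/api/dashboards/db",
    headers={"Authorization": f"Bearer {API_TOKEN}"},
    json={"dashboard": payload["dashboard"], "overwrite": True},
)
resp.raise_for_status()
print("Imported:", resp.json().get("url"))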
Chapter Summary¶
Congratulations! You've successfully transformed your MCP servers from development prototypes to production-ready services. Let's review the comprehensive production features you've implemented:
Production Deployment Achievements:¶
Containerization & Infrastructure¶
- Docker containerization with security best practices and health checks
- Multi-environment configuration using environment variables
- Infrastructure as Code with Terraform for reproducible deployments
- Container orchestration with Docker Compose for local development
Cloud Platform Deployment¶
- Google Cloud Run deployment with auto-scaling and load balancing
- AWS Lambda serverless deployment with API Gateway integration
- CI/CD pipelines with Cloud Build and SAM for automated deployments
- Secret management using cloud-native secret stores
Monitoring & Observability¶
- Prometheus metrics collection for comprehensive monitoring
- Structured logging with proper log levels and formatting
- Health check endpoints for load balancer integration (see the sketch after this list)
- Grafana dashboards for visualization and alerting
- Distributed monitoring across multiple server instances
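As referenced above, a production health endpoint should verify dependencies rather than just report process liveness. A minimal sketch, assuming a FastAPI app and a Redis dependency; the framework choice, URL, and endpoint path are illustrative assumptions:

import redis.asyncio as aioredis  # redis-py >= 4.2
from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()
redis_client = aioredis.from_url("redis://localhost:6379")  # placeholder URL

@app.get("/health")
async def health() -> JSONResponse:
    # Probe each dependency; any failed check marks the instance degraded.
    checks = {}
    try:
        await redis_client.ping()
        checks["redis"] = True
    except Exception:
        checks["redis"] = False
    healthy = all(checks.values())
    # Returning 503 tells the load balancer to pull this instance from rotation.
    return JSONResponse(
        status_code=200 if healthy else 503,
        content={"status": "healthy" if healthy else "degraded", "checks": checks},
    )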
Production Features¶
- Redis caching for improved performance and reduced load (see the sketch after this list)
- Error handling with graceful degradation and retries
- Resource management with memory and CPU limits
- Security hardening with non-root users and input validation
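To make the caching bullet concrete, here is a minimal sketch of response caching with TTL expiration using redis-py's asyncio client; the key scheme, TTL value, and compute callback are illustrative assumptions:

import hashlib
import json

import redis.asyncio as aioredis  # redis-py >= 4.2

cache = aioredis.from_url("redis://localhost:6379")  # placeholder URL
CACHE_TTL = 300  # seconds; tune per tool

async def cached_call(tool_name: str, params: dict, compute):
    """Return a cached result when available; otherwise compute and store it."""
    # Hash the tool name plus canonicalized parameters into a stable cache key.
    raw = f"{tool_name}:{json.dumps(params, sort_keys=True)}"
    key = "mcp:cache:" + hashlib.sha256(raw.encode()).hexdigest()

    hit = await cache.get(key)
    if hit is not None:
        return json.loads(hit)

    result = await compute(**params)
    # TTL expiration keeps stale entries from accumulating.
    await cache.set(key, json.dumps(result), ex=CACHE_TTL)
    return result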
Architecture Benefits Achieved:¶
- Scalability: Auto-scaling based on demand with proper resource limits
- Reliability: Health checks, error handling, and graceful degradation
- Observability: Comprehensive metrics, logging, and monitoring
- Maintainability: Infrastructure as Code and automated deployments
- Security: Container security, secret management, and access controls
Additional Resources¶
- Google Cloud Run Best Practices - Containerized deployment optimization
- AWS Lambda Container Images Guide - Serverless container deployment
- Prometheus Monitoring Best Practices - Metrics collection and naming conventions
- Docker Security Best Practices - Container security hardening
- Grafana Dashboard Design Guidelines - Effective monitoring dashboards
Practical Exercise¶
Challenge: Implement a circuit breaker pattern for production MCP servers.
Build a robust circuit breaker system that prevents cascade failures and enables graceful degradation:
Your Task:¶
from typing import Any, Dict

class CircuitBreaker:
    """Circuit breaker for production MCP server operations."""

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        """Initialize circuit breaker with failure and recovery thresholds."""
        pass

    async def call(self, operation, *args, **kwargs):
        """Execute operation with circuit breaker protection."""
        pass

@mcp.tool()
async def resilient_operation(data: Dict[str, Any]) -> Dict:
    """
    Demonstrate the circuit breaker pattern for resilient operations.

    Args:
        data: Operation data to process

    Returns:
        Operation result, or a fallback response when the circuit is open
    """
    # Your implementation here
    pass
Requirements:¶
- State Management: Implement CLOSED, OPEN, and HALF_OPEN states
- Failure Detection: Track failure rates and response times
- Automatic Recovery: Test recovery after timeout periods
- Metrics Integration: Expose circuit breaker metrics for monitoring
- Graceful Degradation: Provide fallback responses when circuit is open
Implementation Tasks:¶
- Implement distributed tracing using OpenTelemetry to track requests across services
- Create a blue-green deployment strategy for zero-downtime updates
- Build load testing scripts to verify your servers can handle production traffic
- Add automatic failover between multiple regions for high availability
Hint: Use exponential backoff for recovery attempts and integrate with your existing monitoring stack. A minimal starting skeleton for the state machine follows.
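A sketch to build from, assuming asyncio-style coroutines; the thresholds, exception handling, and fallback behavior are deliberately simplified and left for you to harden:

import time
from enum import Enum

class State(Enum):
    CLOSED = "closed"        # normal operation
    OPEN = "open"            # failing fast, no calls forwarded
    HALF_OPEN = "half_open"  # probing whether the backend recovered

class CircuitBreakerSketch:
    """Illustrative skeleton only; add metrics, backoff, and fallbacks."""

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.opened_at = 0.0
        self.state = State.CLOSED

    async def call(self, operation, *args, **kwargs):
        if self.state is State.OPEN:
            # After the recovery timeout, allow a single trial request.
            if time.monotonic() - self.opened_at >= self.recovery_timeout:
                self.state = State.HALF_OPEN
            else:
                raise RuntimeError("circuit open: failing fast")
        try:
            result = await operation(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            # A failed probe, or too many failures, (re)opens the circuit.
            if self.state is State.HALF_OPEN or self.failure_count >= self.failure_threshold:
                self.state = State.OPEN
                self.opened_at = time.monotonic()
            raise
        # Any success closes the circuit and resets the failure count.
        self.state = State.CLOSED
        self.failure_count = 0
        return result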
Multiple Choice Test - Session 4¶
Test your understanding of Production MCP Deployment:
Question 1: What is the primary difference between development and production MCP servers?
A) Production servers are slower than development servers
B) Production servers use different protocols
C) Production servers only work with specific LLMs
D) Production servers require observability, scalability, and reliability
Question 2: What is the main advantage of containerizing MCP servers with Docker?
A) Improved performance
B) Better security by default
C) Automatic scaling capabilities
D) Consistent environments across development and production
Question 3: Which Prometheus metric type is best suited for tracking response times?
A) Histogram
B) Counter
C) Gauge
D) Summary
Question 4: What information should a comprehensive health check endpoint provide?
A) Database connectivity and dependent services status
B) Only HTTP 200 status
C) Server uptime only
D) Current server load only
Question 5: What metric is most important for auto-scaling MCP servers?
A) CPU utilization only
B) Network bandwidth only
C) Request rate combined with response time
D) Memory usage only
Question 6: What type of caching is most effective for MCP server responses?
A) File-based caching
B) In-memory caching only
C) Redis distributed caching with TTL expiration
D) Database-level caching only
Question 7: When should a circuit breaker transition to the "open" state?
A) When the server starts up
B) When response times are slightly elevated
C) When memory usage is high
D) When error rates exceed the configured threshold
Question 8: What is the recommended approach for deploying MCP servers through CI/CD?
A) Direct deployment to production
B) Blue-green deployment with health checks
C) Rolling deployment without testing
D) Manual deployment with downtime
Question 9: Which security practice is essential for production MCP server containers?
A) Running containers as root user for full access
B) Using non-root users and resource limits
C) Disabling all logging to reduce overhead
D) Allowing unlimited resource consumption
Question 10: What is the primary purpose of implementing structured logging in production MCP servers?
A) Reduce file sizes
B) Improve code readability
C) Enable efficient searching and monitoring
D) Decrease memory usage
Navigation¶
Previous: Session 3 - LangChain MCP Integration
Note: Advanced container orchestration and multi-cloud strategies are covered within this session and expanded upon in Session 9 (Production Agent Deployment).