Session 5: Secure MCP Servers - Enterprise Security Architecture¶
Learning Outcomes¶
By the end of this session, you will be able to:
- Implement OAuth 2.1 with PKCE authentication following 2025 MCP security standards
- Configure Resource Indicators (RFC 8707) to prevent token misuse and scope violations
- Apply comprehensive rate limiting and DDoS protection with distributed controls
- Secure data transmission using TLS encryption and certificate management
- Validate all inputs and sanitize data to prevent injection attacks
- Monitor security events through comprehensive audit logging and anomaly detection
Chapter Overview¶
What You'll Learn: Enterprise MCP Security Architecture¶
In this session, we'll implement enterprise-grade security for MCP servers, following the latest 2025 security standards and addressing the critical vulnerabilities discovered in early MCP deployments. This covers the essential security layers that protect against sophisticated threats while maintaining compliance with security frameworks.
Why This Matters: The 2025 MCP Security Crisis and Response¶
Based on 2024-2025 security research, MCP security has undergone significant evolution:
- Critical Vulnerability Recognition: Security researchers identified MCP as "powerful but dangerous" with serious security problems due to lack of security-first design
- OAuth 2.1 Standardization: March 2025 specification mandated OAuth 2.1 with PKCE for all clients, raising the baseline security significantly
- Resource Indicators Requirement: RFC 8707 implementation now prevents malicious servers from misusing tokens across different resources
- Enterprise Adoption Challenges: Every MCP integration is considered "a potential backdoor" until proper security controls are implemented
- Human-in-the-Loop Requirements: Industry consensus treats "SHOULD" security recommendations as "MUST" for production systems
How Secure MCP Stands Out: Defense-in-Depth Architecture¶
The 2025 secure MCP architecture implements multiple security layers:

- OAuth 2.1 with PKCE: Mandatory implementation preventing authorization code interception attacks
- Resource Indicators: Token scoping prevents cross-resource abuse and privilege escalation
- Attribute-Based Access Control (ABAC): Dynamic permissions based on user role, data sensitivity, and context
- Comprehensive Monitoring: Anomaly detection for failed authentication, unusual access patterns, and data exfiltration
Where You'll Apply This: Enterprise Security Use Cases¶
Secure MCP implementations are critical for:

- Financial Services: Compliance with PCI-DSS and SOX regulations for AI-driven trading and analysis
- Healthcare Systems: HIPAA-compliant AI agents accessing patient records and medical databases
- Government Agencies: FedRAMP-authorized AI systems with classified data access
- Enterprise SaaS: Multi-tenant AI platforms with strict data isolation requirements
Figure 1: Enterprise MCP security architecture showing OAuth 2.1 authentication, Resource Indicators, rate limiting, and comprehensive monitoring layers working together to create a defense-in-depth security posture
Learning Path Options¶
Observer Path (30 minutes): Understand MCP security concepts and threat landscape
- Focus: Quick insights into OAuth 2.1, security vulnerabilities, and protection strategies
- Best for: Getting oriented with enterprise security requirements

🙋‍♂️ Participant Path (60 minutes): Implement working secure authentication and monitoring
- Focus: Hands-on OAuth 2.1 implementation, rate limiting, and security controls
- Best for: Building practical secure MCP systems

🛠️ Implementer Path (95 minutes): Advanced enterprise security and compliance
- Focus: ABAC policies, advanced threat detection, and regulatory compliance
- Best for: Enterprise security architecture and compliance expertise
Part 1: Modern MCP Security Fundamentals (Observer: 8 min | Participant: 20 min)¶
The 2025 MCP Security Threat Landscape¶
Based on security research and vulnerability disclosures, MCP faces sophisticated threats that require enterprise-grade countermeasures:
Critical Threat Vectors:

1. Token Misuse Across Resources: Malicious servers using stolen tokens to access unintended resources
2. Authorization Code Interception: Man-in-the-middle attacks against OAuth flows without PKCE
3. Unintended LLM Actions: AI models performing destructive operations without explicit user intent
4. Privilege Escalation: Users or AI agents accessing resources beyond their intended permissions
5. Distributed Rate Limit Bypass: Coordinated attacks from multiple IP addresses
6. Data Exfiltration: Malicious extraction of sensitive data through seemingly legitimate tool calls
Industry Security Assessment: Security researchers have identified MCP as having "serious security problems" with every integration being "a potential backdoor" until proper controls are implemented. This has driven the rapid evolution of security standards in 2025.
OBSERVER PATH: Understanding Modern MCP Security¶
Key Security Evolution (2024-2025):
- OAuth 2.1 with PKCE Mandate: March 2025 specification requires PKCE for all clients, preventing authorization code attacks
- Resource Indicators (RFC 8707): Tokens explicitly scoped to specific MCP servers prevent cross-resource abuse
- Human-in-the-Loop Requirements: Production systems treat optional security controls as mandatory
- Sandbox Isolation: Local MCP servers must run in restricted execution environments
PARTICIPANT PATH: Implementing OAuth 2.1 with PKCE¶
Step 1: Modern OAuth 2.1 Implementation
Implement the 2025 MCP security standard with OAuth 2.1 and PKCE:
# src/auth/oauth_mcp.py - 2025 MCP Security Standard
import os
import secrets
import hashlib
import base64
import jwt
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any, List, Tuple
from authlib.oauth2.rfc7636 import CodeChallenge  # PKCE grant extension
import redis
import structlog
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

# Note: RFC 8707 Resource Indicators have no dedicated authlib module; they
# are enforced below through the JWT audience ("aud") claim.

logger = structlog.get_logger()
2025 Security Dependencies Explained:

- authlib: Enterprise OAuth 2.1 implementation with PKCE support
- cryptography: FIPS-compliant cryptographic operations
- RFC 7636: PKCE (Proof Key for Code Exchange), protecting the authorization code flow
- RFC 8707: Resource Indicators for token scoping, enforced here via the JWT audience claim
Step 2: PKCE Code Challenge Generation
Implement secure PKCE challenge generation following RFC 7636:
class PKCEGenerator:
"""RFC 7636 PKCE implementation for MCP OAuth 2.1."""
@staticmethod
def generate_code_verifier() -> str:
"""
Generate cryptographically secure code verifier.
RFC 7636 requires 43-128 character URL-safe string.
"""
code_verifier = base64.urlsafe_b64encode(
secrets.token_bytes(96)
).decode('utf-8').rstrip('=')
logger.info("Generated PKCE code verifier", length=len(code_verifier))
return code_verifier
@staticmethod
def generate_code_challenge(verifier: str) -> Tuple[str, str]:
"""
Generate code challenge from verifier using SHA256.
Returns: (challenge, method)
"""
digest = hashes.Hash(hashes.SHA256())
digest.update(verifier.encode('utf-8'))
challenge = base64.urlsafe_b64encode(
digest.finalize()
).decode('utf-8').rstrip('=')
return challenge, "S256"
PKCE Security Benefits:

- Authorization Code Protection: Prevents interception attacks
- Public Client Security: Secure OAuth for mobile and SPA clients
- Cryptographic Proof: Mathematical verification of client authenticity
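To see the full round trip, here is a short usage sketch of the PKCEGenerator above; the final assertion mirrors the comparison an authorization server performs at token exchange:

# Usage sketch: the client keeps the verifier secret and sends only the
# challenge with its authorization request.
verifier = PKCEGenerator.generate_code_verifier()
challenge, method = PKCEGenerator.generate_code_challenge(verifier)

# At token exchange, the server hashes the presented verifier and compares
# it to the challenge stored alongside the authorization code.
recomputed, _ = PKCEGenerator.generate_code_challenge(verifier)
assert method == "S256" and recomputed == challenge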
Step 3: Resource Indicators Implementation
Implement RFC 8707 Resource Indicators to prevent token misuse:
class ResourceIndicatorManager:
"""RFC 8707 Resource Indicators for MCP token scoping."""
def __init__(self, redis_client):
self.redis = redis_client
self.valid_resources = {
'mcp://filesystem-server': {
'description': 'File system operations',
'scopes': ['read', 'write', 'list']
},
'mcp://database-server': {
'description': 'Database operations',
'scopes': ['select', 'insert', 'update']
},
'mcp://api-server': {
'description': 'External API calls',
'scopes': ['call', 'webhook']
}
}
def validate_resource_request(self, resource: str, scopes: List[str]) -> bool:
"""
Validate resource indicator and requested scopes.
Prevents tokens from being used on unintended resources.
"""
if resource not in self.valid_resources:
logger.warning("Invalid resource requested", resource=resource)
return False
valid_scopes = self.valid_resources[resource]['scopes']
invalid_scopes = set(scopes) - set(valid_scopes)
if invalid_scopes:
logger.warning(
"Invalid scopes requested",
resource=resource,
invalid_scopes=list(invalid_scopes)
)
return False
return True
def create_scoped_token(self, resource: str, scopes: List[str],
user_context: Dict[str, Any]) -> str:
"""
Create JWT token scoped to specific resource and permissions.
Token can ONLY be used for the specified resource.
"""
if not self.validate_resource_request(resource, scopes):
raise ValueError(f"Invalid resource or scopes: {resource}")
payload = {
'aud': resource, # RFC 8707 audience claim
'scope': ' '.join(scopes),
'sub': user_context['user_id'],
            'iat': datetime.now(timezone.utc),
            'exp': datetime.now(timezone.utc) + timedelta(hours=1),
'resource_indicator': resource,
'context': user_context
}
token = jwt.encode(payload, os.getenv('JWT_SECRET'), algorithm='HS256')
logger.info(
"Created scoped token",
resource=resource,
scopes=scopes,
user_id=user_context['user_id']
)
return token
Resource Indicator Security Benefits:

- Token Scoping: Mathematically prevents cross-resource token usage
- Audience Validation: Explicit resource targeting in the JWT audience claim
- Scope Limitation: Granular permission control per resource
- Audit Trail: Complete tracking of token creation and usage
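The flip side of token creation is verification. Here is a minimal sketch, assuming the same JWT_SECRET and HS256 algorithm used above, of how a receiving MCP server rejects tokens minted for a different resource (verify_scoped_token is an illustrative helper, not part of the class above):

import os
import jwt

def verify_scoped_token(token: str, expected_resource: str) -> dict:
    """Reject tokens whose RFC 8707 audience doesn't match this server."""
    # PyJWT validates the "aud" claim when audience= is supplied and raises
    # jwt.InvalidAudienceError on any mismatch.
    return jwt.decode(
        token,
        os.getenv('JWT_SECRET'),
        algorithms=['HS256'],
        audience=expected_resource,
    )

# A token minted for 'mcp://database-server' fails here with InvalidAudienceError:
# verify_scoped_token(db_token, 'mcp://filesystem-server')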
Step 4: Enhanced JWT Management with Security Controls
class JWTManager:
"""JWT token management with Redis-based token blacklisting."""
def __init__(self, secret_key: str = None, redis_client = None):
# Initialize core JWT settings
self.secret_key = secret_key or os.getenv('JWT_SECRET_KEY', self._generate_secret())
self.algorithm = "HS256"
self.access_token_expire_minutes = 30
self.refresh_token_expire_days = 7
self.redis_client = redis_client
# Validate secret key security
self._validate_secret_key()
Security design principles:
- Environment variables prevent secrets in code
- Short access tokens (30 min) limit exposure window
- Separate refresh tokens (7 days) balance security and usability
Step 1.1.2: Secret Key Validation
def _validate_secret_key(self):
"""Validate that secret key meets security requirements."""
if len(self.secret_key) < 32:
raise ValueError("JWT secret key must be at least 32 characters")
if self.secret_key in ['secret', 'password', 'key']:
raise ValueError("JWT secret key cannot be a common word")
logger.info("JWT Manager initialized with secure secret key")
Security considerations:
- Secret key must be at least 32 characters for cryptographic security
- Environment variables keep secrets out of code
- Automatic secret generation for development environments
Step 1.1.3: Secure Secret Generation
def _generate_secret(self) -> str:
"""Generate a secure random secret key."""
return secrets.token_urlsafe(64)
Why this matters:

- secrets.token_urlsafe() uses cryptographically secure random generation
- 64-byte keys provide 512 bits of entropy
- URL-safe encoding prevents encoding issues
Step 1.1.4: Access Token Payload Creation
Access tokens contain user identity and permissions for authorization:
def create_tokens(self, user_data: Dict[str, Any]) -> Dict[str, str]:
"""Create access and refresh tokens with proper security claims."""
now = datetime.now(timezone.utc)
# Access token with comprehensive user permissions
access_payload = {
"sub": user_data["user_id"], # Subject (user ID)
"username": user_data["username"], # Human-readable identifier
"roles": user_data.get("roles", ["user"]), # User roles
"permissions": user_data.get("permissions", []), # Specific permissions
"iat": now, # Issued at time
"exp": now + timedelta(minutes=self.access_token_expire_minutes),
"type": "access" # Token type for validation
}
Access token security:
- Short expiration (30 minutes) limits damage from token theft
- Comprehensive claims enable fine-grained authorization
- Type field prevents token confusion attacks
Step 1.1.5: Refresh Token Design
Refresh tokens use minimal claims to reduce information leakage:
# Refresh token (minimal claims for security)
refresh_payload = {
"sub": user_data["user_id"], # Only user ID needed
"iat": now, # Issued at time
"exp": now + timedelta(days=self.refresh_token_expire_days),
"type": "refresh" # Clearly mark as refresh token
}
Refresh token principles:
- Minimal data reduces exposure risk if compromised
- Longer lifespan (7 days) for user convenience
- Single purpose - only for generating new access tokens
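To make rotation concrete, here is a hedged sketch of a refresh flow the JWTManager could expose. Both refresh_access_token and _load_user are hypothetical additions, not part of the original class; the blacklist methods they call are defined later in this section:

    def refresh_access_token(self, refresh_token: str) -> Dict[str, str]:
        """Exchange a valid refresh token for a brand-new token pair."""
        if self._is_token_blacklisted(refresh_token):
            raise HTTPException(status_code=401, detail="Token has been revoked")
        payload = jwt.decode(refresh_token, self.secret_key, algorithms=[self.algorithm])
        # Type check prevents an access token being replayed as a refresh token
        if payload.get("type") != "refresh":
            raise HTTPException(status_code=401, detail="Invalid token type")
        # Rotate: revoke the old refresh token so it cannot be reused
        self.blacklist_token(refresh_token)
        # Reload current roles/permissions; the lookup is application-specific
        user_data = self._load_user(payload["sub"])  # hypothetical helper
        return self.create_tokens(user_data)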
Step 1.1.6: Token Generation and Response
# Generate JWT tokens using secure algorithm
access_token = jwt.encode(access_payload, self.secret_key, algorithm=self.algorithm)
refresh_token = jwt.encode(refresh_payload, self.secret_key, algorithm=self.algorithm)
# Return tokens in standard OAuth 2.0 format
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer",
"expires_in": self.access_token_expire_minutes * 60 # Seconds until expiry
}
Token generation security:
- HS256 algorithm provides strong cryptographic signatures
- Standard OAuth 2.0 response ensures client compatibility
- The expires_in field helps clients manage token lifecycle
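A quick usage sketch (the secret shown is a placeholder; in production it comes from the JWT_SECRET_KEY environment variable):

manager = JWTManager(secret_key="replace-with-a-random-32-plus-char-secret")
tokens = manager.create_tokens({
    "user_id": "42",
    "username": "alice",
    "roles": ["user"],
    "permissions": ["weather:read", "files:read"],
})
print(tokens["token_type"], tokens["expires_in"])  # bearer 1800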
Step 1.1.7: Token Verification Process
Token verification follows a multi-step security validation process:
def verify_token(self, token: str) -> Dict[str, Any]:
"""Verify and decode JWT token with comprehensive security checks."""
try:
# Step 1: Check blacklist for revoked tokens
if self._is_token_blacklisted(token):
raise HTTPException(status_code=401, detail="Token has been revoked")
# Step 2: Cryptographically verify and decode
payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
# Step 3: Validate token type to prevent confusion attacks
if payload.get("type") != "access":
raise HTTPException(status_code=401, detail="Invalid token type")
return payload
Step 1.1.8: Error Handling for Token Verification
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token has expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
except Exception as e:
logger.error(f"Unexpected token verification error: {e}")
raise HTTPException(status_code=401, detail="Authentication failed")
Verification security:
- Blacklist check first prevents using revoked tokens
- Cryptographic validation ensures token integrity
- Type validation prevents refresh tokens being used as access tokens
Step 1.1.9: Blacklist Checking for Revoked Tokens
Token blacklisting enables secure logout and revocation:
def _is_token_blacklisted(self, token: str) -> bool:
"""Check if token has been revoked via blacklist."""
if not self.redis_client:
return False # No Redis = no blacklisting capability
try:
# Hash token to avoid storing sensitive data
token_hash = hashlib.sha256(token.encode()).hexdigest()
return self.redis_client.exists(f"blacklist:{token_hash}")
except Exception:
            # Fail open - log a warning but allow access; the token's
            # signature and expiry are still verified afterwards
logger.warning("Could not check token blacklist")
return False
Blacklist security:
- Token hashing prevents storing full tokens in Redis
- Graceful degradation when Redis is unavailable
- Fail-open trade-off: if the blacklist check itself fails, access is allowed so a Redis outage doesn't lock out every user; tokens must still pass signature and expiry validation
Step 1.1.10: Token Revocation Implementation
def blacklist_token(self, token: str, ttl_seconds: int = None):
"""Add token to blacklist for secure revocation."""
if not self.redis_client:
logger.warning("Cannot blacklist token: Redis not available")
return
try:
# Auto-calculate TTL from token expiry if not provided
if not ttl_seconds:
ttl_seconds = self._calculate_token_ttl(token)
# Store hashed token in blacklist with expiration
token_hash = hashlib.sha256(token.encode()).hexdigest()
self.redis_client.setex(f"blacklist:{token_hash}", ttl_seconds, "revoked")
logger.info(f"Token successfully blacklisted for {ttl_seconds} seconds")
except Exception as e:
logger.error(f"Failed to blacklist token: {e}")
Step 1.1.11: TTL Calculation Helper
def _calculate_token_ttl(self, token: str) -> int:
"""Calculate remaining TTL for token based on expiry."""
try:
# Decode without verifying expiry to get exp claim
payload = jwt.decode(
token, self.secret_key,
algorithms=[self.algorithm],
options={"verify_exp": False}
)
exp = payload.get("exp", 0)
current_time = int(datetime.now(timezone.utc).timestamp())
return max(0, exp - current_time) # Ensure non-negative TTL
except Exception:
# Default to token expiry time if calculation fails
return self.access_token_expire_minutes * 60
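Putting revocation together, logout becomes a single call. This sketch assumes redis_client and the user's presented token are already in scope:

jwt_manager = JWTManager(redis_client=redis_client)  # secret read from JWT_SECRET_KEY
jwt_manager.blacklist_token(presented_access_token)
# Any later verify_token() call on this token now fails with 401 "Token has been revoked"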
Step 1.2: Role-Based Access Control (RBAC)¶
Now let's implement a flexible authorization system:
Step 1.2.1: Permission Enumeration Design
Permissions define specific actions users can perform:
# src/auth/permissions.py
from enum import Enum
from typing import List, Set, Dict
from functools import wraps
from fastapi import HTTPException
import logging
logger = logging.getLogger(__name__)
class Permission(Enum):
"""Fine-grained permissions for MCP server resources."""
READ_WEATHER = "weather:read"
WRITE_WEATHER = "weather:write"
READ_FILES = "files:read"
WRITE_FILES = "files:write"
DELETE_FILES = "files:delete"
ADMIN_USERS = "admin:users"
VIEW_METRICS = "metrics:view"
Permission design principles:
- Namespace format (resource:action) for clear organization
- Granular permissions enable precise access control
- Enum-based prevents typos and enables IDE autocomplete
Step 1.2.2: Role Hierarchy Definition
Roles group permissions into logical user categories:
class Role(Enum):
"""Hierarchical roles from least to most privileged."""
GUEST = "guest" # Read-only basic access
USER = "user" # Standard user capabilities
PREMIUM = "premium" # Enhanced features
ADMIN = "admin" # Full system access
Step 1.2.3: Role-Permission Mapping
# Progressive permission assignment by role
ROLE_PERMISSIONS: Dict[Role, Set[Permission]] = {
Role.GUEST: {
Permission.READ_WEATHER # Basic read access only
},
Role.USER: {
Permission.READ_WEATHER,
Permission.READ_FILES # Add file reading
},
Role.PREMIUM: {
Permission.READ_WEATHER, Permission.WRITE_WEATHER,
Permission.READ_FILES, Permission.WRITE_FILES # Full weather + files
},
Role.ADMIN: {
perm for perm in Permission # All permissions
}
}
Role hierarchy benefits:
- Progressive permissions from guest to admin
- Clear separation of capabilities
- Easy to audit and understand access levels
Step 1.2.4: Permission Validation Helper
def has_permission(user_permissions: List[str], required_permission: Permission) -> bool:
"""Check if user has specific permission."""
return required_permission.value in user_permissions
def get_user_permissions(user_roles: List[str]) -> Set[Permission]:
"""Get all permissions for user's roles."""
permissions = set()
for role_name in user_roles:
try:
role = Role(role_name)
permissions.update(ROLE_PERMISSIONS.get(role, set()))
except ValueError:
logger.warning(f"Unknown role: {role_name}")
return permissions
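A quick sanity check of these helpers:

# A premium user inherits read/write on both weather and files
perms = get_user_permissions(["premium"])
assert Permission.WRITE_FILES in perms

# has_permission operates on the string form carried in JWT claims
claims = [p.value for p in perms]
assert has_permission(claims, Permission.READ_WEATHER)
assert not has_permission(claims, Permission.ADMIN_USERS)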
Step 1.2.5: Permission Checking Decorator
def require_permission(required_permission: Permission):
"""Decorator to enforce permission requirements on MCP tools."""
def decorator(tool_func):
@wraps(tool_func)
async def wrapper(*args, **kwargs):
            # Extract user context from the request. get_current_user() is a
            # placeholder for your framework's request-context lookup (e.g.,
            # populated by the auth middleware in Step 1.3).
            current_user = get_current_user()
if not current_user:
raise HTTPException(status_code=401, detail="Authentication required")
# Validate user has required permission
user_permissions = current_user.get("permissions", [])
if not has_permission(user_permissions, required_permission):
logger.warning(
f"Permission denied: User {current_user.get('username')} "
f"attempted {required_permission.value}"
)
The decorator continues with error handling and security logging:
raise HTTPException(
status_code=403,
detail=f"Permission '{required_permission.value}' required"
)
return await tool_func(*args, **kwargs)
return wrapper
return decorator
Authorization security:
- Fail-secure design denies access by default
- Audit logging tracks permission violations
- Clear error messages help developers debug issues
Step 1.3: Secure MCP Server Integration¶
Let's integrate our authentication system with the MCP server:
Step 1.3.1: Middleware Class Setup
Authentication middleware intercepts all MCP requests for security validation:
# src/auth/middleware.py
from fastapi import Request, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import Dict, Any, Optional
from src.auth.jwt_auth import JWTManager
import logging
import time

logger = logging.getLogger(__name__)
class MCPAuthMiddleware:
"""Centralized authentication middleware for MCP server security."""
def __init__(self, jwt_manager: JWTManager):
self.jwt_manager = jwt_manager
self.security = HTTPBearer(auto_error=False) # Custom error handling
self.excluded_paths = {"/health", "/metrics"} # Skip auth for monitoring
Step 1.3.2: Header Extraction and Validation
def _extract_bearer_token(self, request: Request) -> str:
"""Safely extract and validate Bearer token from headers."""
auth_header = request.headers.get("Authorization")
if not auth_header:
raise HTTPException(
status_code=401,
detail="Authorization header required"
)
if not auth_header.startswith("Bearer "):
raise HTTPException(
status_code=401,
detail="Bearer token format required"
)
Now we extract and validate the token format:
# Extract token (handle malformed headers safely)
parts = auth_header.split(" ")
if len(parts) != 2:
raise HTTPException(
status_code=401,
detail="Invalid authorization header format"
)
return parts[1]
Step 1.3.3: Request Authentication Process
async def authenticate_request(self, request: Request) -> Optional[Dict[str, Any]]:
"""Authenticate incoming MCP request with comprehensive validation."""
# Skip authentication for excluded paths
if request.url.path in self.excluded_paths:
return None
start_time = time.time()
try:
# Step 1: Extract token from headers
token = self._extract_bearer_token(request)
# Step 2: Verify token cryptographically
payload = self.jwt_manager.verify_token(token)
The authentication continues with performance logging and success tracking:
# Step 3: Log successful authentication
auth_duration = (time.time() - start_time) * 1000
logger.info(
f"Authentication successful: {payload.get('username')} "
f"(ID: {payload.get('sub')}) in {auth_duration:.2f}ms"
)
return payload
Step 1.3.4: Error Handling and Security Logging
except HTTPException:
# Re-raise HTTP exceptions (already properly formatted)
raise
except Exception as e:
# Log security-relevant errors and fail securely
logger.error(
f"Authentication failure from {request.client.host if request.client else 'unknown'}: {e}",
extra={"request_path": request.url.path, "user_agent": request.headers.get("User-Agent")}
)
raise HTTPException(
status_code=401,
detail="Authentication failed"
)
Middleware security features:
- Path exclusions for health checks and monitoring
- Comprehensive error handling with security logging
- Performance monitoring tracks authentication latency
- IP and user agent logging for security analysis
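One way to wire this into a FastAPI app is an HTTP middleware hook. A hedged sketch (note that an HTTPException raised inside Starlette middleware bypasses FastAPI's exception handlers, so it must be converted to a response manually):

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse

app = FastAPI()
auth = MCPAuthMiddleware(JWTManager())

@app.middleware("http")
async def enforce_auth(request: Request, call_next):
    try:
        # Attach the authenticated payload for downstream handlers and tools
        request.state.user = await auth.authenticate_request(request)
    except HTTPException as exc:
        return JSONResponse(status_code=exc.status_code, content={"detail": exc.detail})
    return await call_next(request)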
Step 1.3.5: Server Class and Dependencies
# src/secure_mcp_server.py
from datetime import datetime
from fastapi import HTTPException
from mcp.server.fastmcp import FastMCP
from src.auth.jwt_auth import JWTManager
from src.auth.permissions import require_permission, Permission
from src.auth.middleware import MCPAuthMiddleware
from typing import Dict, List
import re
import os
from pathlib import Path
Now we initialize the secure MCP server with all security components:
class SecureMCPServer:
"""Production-ready MCP server with comprehensive security."""
def __init__(self, name: str = "Secure MCP Server"):
# Core components initialization
self.mcp = FastMCP(name)
self.jwt_manager = JWTManager()
self.auth_middleware = MCPAuthMiddleware(self.jwt_manager)
# Security configuration
self.allowed_file_patterns = [r'^/tmp/.*\.txt$', r'^/data/.*\.(json|csv)$']
self.setup_security_tools()
Step 1.3.6: Basic Security Tools
def setup_security_tools(self):
"""Configure MCP tools with appropriate security controls."""
@self.mcp.tool()
@require_permission(Permission.READ_WEATHER)
async def get_weather(city: str) -> Dict:
"""Get weather data - requires weather:read permission."""
# Input validation
if not city or len(city) > 100:
raise HTTPException(status_code=400, detail="Invalid city name")
# Simulated weather API call
return {
"city": city,
"temperature": 22,
"condition": "Sunny",
"timestamp": datetime.now().isoformat()
}
Step 1.3.7: File Operations with Security
@self.mcp.tool()
@require_permission(Permission.WRITE_FILES)
async def write_file(path: str, content: str) -> Dict:
"""Write file with path validation and size limits."""
# Security validations
if not self._validate_file_path(path):
raise HTTPException(status_code=400, detail="File path not allowed")
if len(content) > 1024 * 1024: # 1MB limit
raise HTTPException(status_code=400, detail="File too large")
# Secure file writing logic would go here
return {"success": True, "path": path, "size": len(content)}
Step 1.3.8: Administrative Functions
@self.mcp.tool()
@require_permission(Permission.ADMIN_USERS)
async def list_users() -> List[Dict]:
"""List system users - admin only."""
# This would typically query a database
return [
{"id": 1, "username": "admin", "role": "admin", "active": True},
{"id": 2, "username": "user1", "role": "user", "active": True}
]
def _validate_file_path(self, path: str) -> bool:
"""Validate file path against security patterns."""
return any(re.match(pattern, path) for pattern in self.allowed_file_patterns)
Security integration benefits:
- Permission decorators provide declarative access control
- Input validation prevents injection attacks
- Path restrictions limit file system access
- Size limits prevent resource exhaustion attacks
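With the pieces assembled, starting the server is short. This launch sketch assumes the MCP Python SDK's FastMCP.run() entry point:

if __name__ == "__main__":
    server = SecureMCPServer("Secure MCP Server")
    server.mcp.run()  # FastMCP defaults to the stdio transport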
Part 2: API Key Management (20 minutes)¶
Security progression: Now that we have JWT authentication working, let's add API keys for machine-to-machine authentication scenarios where interactive login isn't practical.
Step 2.1: API Key System Implementation¶
In addition to JWT tokens, we'll implement API keys for machine-to-machine authentication:
Step 2.1.1: API Key Manager Setup
API keys provide secure authentication for automated systems:
# src/auth/api_keys.py
import secrets
import hashlib
import json
from datetime import datetime, timedelta
from typing import Optional, Dict, List
import uuid
import logging
logger = logging.getLogger(__name__)
class APIKeyManager:
"""Secure API key management with Redis storage and automatic expiration."""
def __init__(self, redis_client):
self.redis_client = redis_client
self.key_prefix = "api_key:"
self.default_expiry_days = 90
self.key_length = 32 # 32 bytes = 256 bits of entropy
Step 2.1.2: Secure Key Generation
def _generate_secure_key(self) -> tuple[str, str]:
"""Generate cryptographically secure API key and ID."""
key_id = str(uuid.uuid4())
# Format: mcp_<base64-url-safe-32-bytes>
api_key = f"mcp_{secrets.token_urlsafe(self.key_length)}"
return key_id, api_key
def _hash_api_key(self, api_key: str) -> str:
"""Hash API key for secure storage."""
return hashlib.sha256(api_key.encode()).hexdigest()
API key security:
- UUID key IDs provide collision-free identifiers
- URL-safe encoding prevents issues in HTTP headers
- SHA256 hashing protects stored keys from database breaches
Step 2.1.3: Metadata Creation
def _create_key_metadata(self, key_id: str, user_id: str, name: str,
permissions: List[str], expiry_days: int) -> Dict:
"""Create comprehensive metadata for API key."""
now = datetime.now()
return {
"key_id": key_id,
"user_id": user_id,
"name": name[:100], # Limit name length
"permissions": permissions,
"created_at": now.isoformat(),
"expires_at": (now + timedelta(days=expiry_days)).isoformat(),
"last_used": None,
"usage_count": 0,
"active": True,
"created_by": "system" # Track creation source
}
Step 2.1.4: Complete Key Generation
def generate_api_key(self, user_id: str, name: str, permissions: List[str],
expires_in_days: int = None) -> Dict[str, str]:
"""Generate a new API key with full security metadata."""
# Input validation
if not user_id or not name:
raise ValueError("User ID and name are required")
if not permissions:
raise ValueError("At least one permission must be specified")
# Generate secure key components
key_id, api_key = self._generate_secure_key()
key_hash = self._hash_api_key(api_key)
Next, we create and store the key metadata securely:
# Create metadata and store securely
expiry_days = expires_in_days or self.default_expiry_days
metadata = self._create_key_metadata(key_id, user_id, name, permissions, expiry_days)
# Store in Redis with automatic expiration
self._store_key_metadata(key_hash, metadata, expiry_days)
logger.info(f"Generated API key {key_id} for user {user_id}")
return {
"api_key": api_key,
"key_id": key_id,
"expires_at": metadata["expires_at"]
}
Step 2.1.5: Secure Storage Helper
def _store_key_metadata(self, key_hash: str, metadata: Dict, expiry_days: int):
"""Securely store key metadata in Redis with TTL."""
try:
ttl_seconds = expiry_days * 24 * 60 * 60
self.redis_client.setex(
f"{self.key_prefix}{key_hash}",
ttl_seconds,
json.dumps(metadata)
)
except Exception as e:
logger.error(f"Failed to store API key metadata: {e}")
raise RuntimeError("Could not store API key")
API key generation security:
- Automatic expiration reduces long-term exposure risk
- Secure random generation prevents key prediction
- Metadata tracking enables usage monitoring and auditing
- Error handling prevents partial key creation
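End to end, issuing a key looks like this (the Redis connection details are assumptions for the sketch):

import redis

mgr = APIKeyManager(redis.Redis(host="localhost", port=6379, db=0))
issued = mgr.generate_api_key(
    user_id="user-42",
    name="ci-pipeline",
    permissions=["weather:read"],
    expires_in_days=30,
)
# The raw key is shown exactly once; only its SHA-256 hash is persisted
print(issued["api_key"], issued["expires_at"])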
Step 2.1.6: Initial Key Validation
API key validation follows a multi-step security process:
def validate_api_key(self, api_key: str) -> Optional[Dict]:
"""Validate API key with comprehensive security checks."""
# Step 1: Format validation
if not self._is_valid_key_format(api_key):
return None
try:
# Step 2: Retrieve and validate metadata
metadata = self._get_key_metadata(api_key)
if not metadata:
return None
Finally, we check the key status and update usage tracking:
# Step 3: Check key status and update usage
if self._is_key_active(metadata):
self._update_key_usage(api_key, metadata)
return metadata
return None
except Exception as e:
logger.error(f"API key validation error: {e}")
return None
Step 2.1.7: Format and Prefix Validation
def _is_valid_key_format(self, api_key: str) -> bool:
"""Validate API key format and structure."""
if not api_key or not isinstance(api_key, str):
return False
# Check prefix and length
if not api_key.startswith("mcp_"):
logger.warning(f"Invalid API key prefix from request")
return False
# Expected length: "mcp_" + 32 URL-safe base64 chars = ~47 chars
if len(api_key) < 40 or len(api_key) > 60:
logger.warning(f"Invalid API key length: {len(api_key)}")
return False
return True
Step 2.1.8: Metadata Retrieval
def _get_key_metadata(self, api_key: str) -> Optional[Dict]:
"""Securely retrieve key metadata from storage."""
key_hash = self._hash_api_key(api_key)
try:
metadata_json = self.redis_client.get(f"{self.key_prefix}{key_hash}")
if not metadata_json:
logger.info(f"API key not found in storage")
return None
metadata = json.loads(metadata_json)
return metadata
except json.JSONDecodeError as e:
logger.error(f"Corrupted API key metadata: {e}")
return None
except Exception as e:
logger.error(f"Failed to retrieve API key metadata: {e}")
return None
Step 2.1.9: Active Status Checking
def _is_key_active(self, metadata: Dict) -> bool:
"""Check if API key is still active and valid."""
# Check active flag
if not metadata.get("active", False):
logger.info(f"API key {metadata.get('key_id')} is deactivated")
return False
# Check expiration (additional safety check)
expires_at = metadata.get("expires_at")
if expires_at:
try:
expiry = datetime.fromisoformat(expires_at.replace('Z', '+00:00'))
if datetime.now() > expiry.replace(tzinfo=None):
logger.info(f"API key {metadata.get('key_id')} has expired")
return False
except (ValueError, TypeError) as e:
logger.warning(f"Invalid expiry date format: {e}")
return False
return True
Step 2.1.10: Usage Tracking Update
def _update_key_usage(self, api_key: str, metadata: Dict):
"""Update key usage statistics atomically."""
try:
key_hash = self._hash_api_key(api_key)
redis_key = f"{self.key_prefix}{key_hash}"
# Update usage statistics
metadata["last_used"] = datetime.now().isoformat()
metadata["usage_count"] = metadata.get("usage_count", 0) + 1
# Preserve existing TTL
ttl = self.redis_client.ttl(redis_key)
if ttl > 0:
self.redis_client.setex(redis_key, ttl, json.dumps(metadata))
Usage tracking continues with fallback handling for expired TTL:
else:
# Fallback: use default expiry
self.redis_client.setex(
redis_key,
self.default_expiry_days * 24 * 60 * 60,
json.dumps(metadata)
)
except Exception as e:
# Usage tracking failure shouldn't block authentication
logger.warning(f"Failed to update key usage statistics: {e}")
API key validation security:
- Multi-step validation prevents bypassing security checks
- Format validation blocks malformed or malicious keys
- Active status checking enforces key revocation
- Usage tracking enables monitoring and rate limiting
- Graceful error handling maintains system availability
Part 3: Rate Limiting and DDoS Protection (15 minutes)¶
Security progression: With authentication established, we now add rate limiting to prevent abuse and DDoS attacks, completing our defense-in-depth strategy.
Step 3.1: Implementing Rate Limiting¶
Step 3.1.1: Rate Limiter Class Setup
Token bucket algorithm provides smooth rate limiting with burst capability:
# src/security/rate_limiter.py
import time
import json
import logging
from typing import Optional, Dict
import redis
from fastapi import Request, HTTPException  # used by RateLimitMiddleware below
logger = logging.getLogger(__name__)
class TokenBucketRateLimiter:
"""Token bucket rate limiter with Redis backend for distributed rate limiting."""
def __init__(self, redis_client, default_capacity: int = 100,
default_refill_rate: float = 10):
self.redis_client = redis_client
self.default_capacity = default_capacity
self.default_refill_rate = default_refill_rate # tokens per second
self.bucket_prefix = "rate_limit:"
self.bucket_ttl = 3600 # 1 hour cleanup TTL
Token bucket principles:
- Capacity: Maximum burst size allowed
- Refill rate: Steady-state requests per second
- Distributed: Redis enables rate limiting across multiple servers
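The refill arithmetic is worth seeing in isolation before reading the implementation:

# Worked example of the refill math used below: a bucket with capacity 100
# and a refill rate of 10 tokens/sec that has sat idle for 3 seconds.
capacity, refill_rate = 100, 10.0
current_tokens, idle_seconds = 65.0, 3.0

available = min(float(capacity), current_tokens + idle_seconds * refill_rate)
assert available == 95.0  # 65 + 30, and it would cap at 100 on overflow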
Step 3.1.2: Bucket State Retrieval
def _get_bucket_state(self, bucket_key: str) -> tuple[float, float]:
"""Retrieve current bucket state from Redis."""
try:
bucket_data = self.redis_client.get(bucket_key)
if bucket_data:
bucket = json.loads(bucket_data)
return bucket["last_refill"], bucket["tokens"]
else:
# Initialize new bucket - start with full capacity
return time.time(), float(self.default_capacity)
except (json.JSONDecodeError, KeyError) as e:
logger.warning(f"Corrupted bucket data for {bucket_key}: {e}")
return time.time(), float(self.default_capacity)
Step 3.1.3: Token Calculation Algorithm
def _calculate_tokens(self, last_refill: float, current_tokens: float,
current_time: float, capacity: int, refill_rate: float) -> float:
"""Calculate current token count using bucket algorithm."""
# Calculate time elapsed since last refill
time_elapsed = max(0, current_time - last_refill)
# Calculate tokens to add (rate * time)
tokens_to_add = time_elapsed * refill_rate
# Add tokens but don't exceed capacity (bucket overflow)
new_tokens = min(float(capacity), current_tokens + tokens_to_add)
return new_tokens
Step 3.1.4: Request Authorization Check
async def is_allowed(self, identifier: str, capacity: int = None,
refill_rate: float = None) -> bool:
"""Check if request should be allowed based on rate limit."""
# Use provided limits or defaults
capacity = capacity or self.default_capacity
refill_rate = refill_rate or self.default_refill_rate
bucket_key = f"{self.bucket_prefix}{identifier}"
current_time = time.time()
try:
# Step 1: Get current bucket state
last_refill, current_tokens = self._get_bucket_state(bucket_key)
# Step 2: Calculate available tokens
available_tokens = self._calculate_tokens(
last_refill, current_tokens, current_time, capacity, refill_rate
)
Now we determine if the request should be allowed and update bucket state:
# Step 3: Check if request can be allowed
if available_tokens >= 1.0:
# Allow request and consume token
remaining_tokens = available_tokens - 1.0
allowed = True
else:
# Deny request, no tokens consumed
remaining_tokens = available_tokens
allowed = False
            # Step 4: Update bucket state
            self._update_bucket_state(bucket_key, current_time, remaining_tokens)
            return allowed
        except Exception as e:
            logger.error(f"Rate limiting error for {identifier}: {e}")
            return True  # Fail open - availability over strict rate limiting
Step 3.1.5: State Persistence
def _update_bucket_state(self, bucket_key: str, timestamp: float, tokens: float):
"""Update bucket state in Redis with TTL."""
try:
new_bucket = {
"tokens": tokens,
"last_refill": timestamp
}
self.redis_client.setex(
bucket_key,
self.bucket_ttl,
json.dumps(new_bucket)
)
except Exception as e:
logger.error(f"Failed to update bucket state for {bucket_key}: {e}")
Rate limiting security:
- Fail-open design maintains availability during Redis outages
- TTL cleanup prevents infinite bucket accumulation
- Precise timing ensures accurate rate calculations
- Distributed consistency works across multiple server instances
Step 3.1.6: Middleware Class and Role-Based Limits
Rate limiting middleware applies different limits based on user privileges:
class RateLimitMiddleware:
"""Role-based rate limiting middleware for MCP servers."""
def __init__(self, rate_limiter: TokenBucketRateLimiter):
self.rate_limiter = rate_limiter
# Hierarchical rate limits by user role
self.limits = {
"guest": {"capacity": 50, "refill_rate": 1}, # Very restrictive
"user": {"capacity": 200, "refill_rate": 5}, # Standard limits
"premium": {"capacity": 1000, "refill_rate": 20}, # Enhanced limits
"admin": {"capacity": 5000, "refill_rate": 100} # High limits
}
Step 3.1.7: User Identification and Limit Selection
def _get_rate_limits(self, user_data: Dict) -> tuple[str, Dict]:
"""Determine user identifier and appropriate rate limits."""
# Extract primary role (use first role if multiple)
user_roles = user_data.get("roles", ["guest"])
primary_role = user_roles[0] if user_roles else "guest"
# Create unique identifier for this user
user_id = user_data.get("sub", "anonymous")
identifier = f"user:{user_id}"
# Get limits for this role (fallback to guest if unknown role)
limits = self.limits.get(primary_role, self.limits["guest"])
return identifier, limits
Step 3.1.8: Rate Limit Enforcement
async def check_rate_limit(self, request: Request, user_data: Dict) -> bool:
"""Enforce rate limits and block excessive requests."""
# Determine user limits
identifier, limits = self._get_rate_limits(user_data)
# Apply rate limiting
allowed = await self.rate_limiter.is_allowed(
identifier,
capacity=limits["capacity"],
refill_rate=limits["refill_rate"]
)
if not allowed:
# Log rate limit violation
logger.warning(
f"Rate limit exceeded: {identifier} "
f"(role: {user_data.get('roles', ['unknown'])[0]})"
)
# Return standardized rate limit response
raise HTTPException(
status_code=429,
detail="Rate limit exceeded. Please try again later.",
headers={"Retry-After": "60"} # Suggest retry after 60 seconds
)
return True
Rate limiting middleware features:
- Role-based limits provide fair access based on user tier
- Unique identifiers prevent cross-user rate limit sharing
- Standard HTTP 429 responses with retry guidance
- Security logging enables monitoring and alerting
- Graceful degradation when rate limiter fails
Chapter Summary¶
Security implementation complete! You've built a production-ready security system that demonstrates defense-in-depth through multiple complementary layers. Let's review what you've accomplished:
Security Features Implemented:¶
Authentication & Authorization¶
- JWT token system with access and refresh tokens
- API key management with hashed storage, automatic expiration, and usage tracking
- Role-based access control (RBAC) with fine-grained permissions
- Token blacklisting for secure logout and revocation
Protection Mechanisms¶
- Rate limiting with token bucket algorithm
- Input validation and sanitization
- Permission decorators for easy tool protection
- Secure secret management with environment variables
Security Monitoring¶
- Audit logging for all authentication events
- Usage tracking for API keys and tokens
- Error handling with secure failure modes
- Security metrics for monitoring and alerting
Production Security Considerations:¶
- Encryption: All tokens use strong cryptographic algorithms
- Storage: Sensitive data is hashed, never stored in plaintext
- Expiration: Short-lived tokens limit exposure windows
- Monitoring: Comprehensive logging enables threat detection
- Scalability: Redis backend supports high-throughput scenarios
Testing Your Understanding¶
Quick Check Questions¶
1. Why do we use short-lived access tokens (30 minutes) instead of long-lived ones?
   A) To reduce server load
   B) To limit exposure if tokens are compromised
   C) To improve performance
   D) To simplify implementation

2. What is the primary purpose of token blacklisting?
   A) To improve token performance
   B) To enable secure logout and token revocation
   C) To reduce memory usage
   D) To simplify token validation

3. How does the token bucket rate limiting algorithm behave?
   A) It counts requests per minute
   B) It blocks all requests after a limit
   C) It allows bursts but limits average rate
   D) It only limits failed requests

4. Why are API keys hashed before being stored?
   A) To save storage space
   B) To improve lookup performance
   C) To prevent key theft from database breaches
   D) To enable key rotation

5. What does fail-secure design mean?
   A) Always allow access when in doubt
   B) Always deny access when systems fail
   C) Log all security events
   D) Use multiple authentication methods
Practical Exercise¶
Implement a security audit system that tracks suspicious activity:
@mcp.tool()
@require_permission(Permission.VIEW_METRICS)
async def get_security_audit(time_range: str = "24h") -> Dict:
"""
Get security audit information for the specified time range.
TODO: Implement audit system that tracks:
1. Failed authentication attempts
2. Permission denied events
3. Rate limit violations
4. Unusual access patterns
Args:
time_range: Time range for audit (1h, 24h, 7d)
Returns:
Security audit report with metrics and alerts
"""
# Your implementation here
pass
Next Session Preview¶
In Session 6, we'll focus on Advanced MCP Patterns including:
- Custom transport protocols for specialized use cases
- Event-driven MCP architectures with webhooks
- Distributed MCP server clusters and load balancing
- Real-time streaming data with Server-Sent Events
Homework¶
- Implement OAuth 2.0 integration for third-party authentication providers
- Create a security dashboard showing real-time threat metrics
- Add IP-based rate limiting in addition to user-based limits
- Implement certificate-based authentication for high-security environments
Hint: Check the Session5_Test_Solutions.md file for complete implementations and advanced security patterns.
Multiple Choice Test - Session 5 (15 minutes)¶
Test your understanding of Secure MCP Servers:
Question 1: What security approach does the session recommend for MCP servers?
A) Client-side security only
B) Single-layer authentication only
C) Defense-in-depth with multiple security layers
D) Network security only

Question 2: What is the minimum recommended length for JWT secret keys?
A) 16 characters
B) 64 characters
C) 32 characters
D) 24 characters

Question 3: How should refresh tokens be handled for maximum security?
A) Include them in URL parameters
B) Store them in localStorage
C) Store them in browser cookies only
D) Use Redis with automatic expiration and blacklisting

Question 4: Which rate limiting algorithm provides the best balance of fairness and burst handling?
A) Fixed window
B) Sliding window
C) Leaky bucket
D) Token bucket

Question 5: What is the advantage of role-based permissions over user-specific permissions?
A) Higher security
B) Better performance
C) Easier management and scalability
D) Simpler implementation

Question 6: What is the recommended approach for validating MCP tool inputs?
A) Server-side validation using Pydantic models
B) Database constraints only
C) Client-side validation only
D) No validation needed

Question 7: What TLS version should be the minimum requirement for production MCP servers?
A) SSL 3.0
B) TLS 1.1
C) TLS 1.0
D) TLS 1.2

Question 8: How should API keys be rotated securely in production?
A) Rotate only when compromised
B) Automatic rotation with overlap periods
C) Never rotate keys
D) Manual rotation monthly

Question 9: What information is most critical to include in security audit logs?
A) System performance metrics
B) Only successful operations
C) Debug information only
D) Authentication events and permission changes

Question 10: Which technique is most effective for protecting MCP servers from DDoS attacks?
A) Blocking all international traffic
B) Using only strong authentication
C) Implementing multiple rate limiting layers
D) Increasing server capacity
Navigation¶
Previous: Session 4 - Production MCP Deployment
Next: Session 6 - ACP Fundamentals →
Additional Resources¶
- OWASP API Security Top 10
- JWT Security Best Practices
- Redis Security Guidelines
- Python Cryptography Documentation
- FastAPI Security Patterns
Remember: Security is not a feature, it's a foundation. Always assume breach, validate everything, and log comprehensively! 🔒