Session 3 - Module A: Enterprise Agent Patterns¶
⚠️ ADVANCED OPTIONAL MODULE Prerequisites: Complete Session 3 core content first.
Enterprise MCP agent deployments require sophisticated patterns for production reliability, including circuit breakers, connection pooling, security controls, and comprehensive monitoring.
Advanced Production Patterns¶
Pattern 1: Circuit Breaker for MCP Server Resilience¶
Enterprise agents must handle partial system failures gracefully. The circuit breaker pattern prevents cascading failures when MCP servers become unavailable.
# utils/circuit_breaker.py - Essential imports for fault tolerance
import asyncio
import time
from enum import Enum
from typing import Dict, Any, Callable, Optional
from dataclasses import dataclass
import logging
logger = logging.getLogger(__name__)
Circuit breaker implementation requires precise timing control and async operation handling. The asyncio module provides timeout functionality crucial for preventing hanging operations, while time enables recovery timeout calculations. These imports establish the foundation for implementing the Circuit Breaker Pattern - a fundamental enterprise resilience pattern.
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failure state - reject requests
HALF_OPEN = "half_open" # Testing if service recovered
The three-state model implements the classic circuit breaker pattern. CLOSED state allows normal operations with failure monitoring. OPEN state blocks all requests to protect failing services and prevent cascading failures. HALF_OPEN state carefully tests service recovery by allowing limited traffic. This state machine prevents both overwhelming failing services and missing service recovery.
@dataclass
class CircuitBreakerConfig:
failure_threshold: int = 5 # Failures before opening
recovery_timeout: int = 60 # Seconds before trying again
success_threshold: int = 3 # Successes needed to close
timeout: int = 30 # Request timeout
Configuration parameters balance fault detection speed with stability. The failure threshold of 5 prevents opening on transient issues while catching persistent problems quickly. 60-second recovery timeout allows sufficient time for service restoration without excessive delays. Success threshold of 3 ensures genuine recovery before resuming full traffic flow.
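These defaults suit typical backend services; latency-sensitive paths may warrant tighter settings. A hedged tuning sketch (values are illustrative, not from this module):

# Hypothetical tuning for a latency-sensitive internal service
fast_config = CircuitBreakerConfig(
    failure_threshold=3,   # open sooner for faster feedback
    recovery_timeout=30,   # probe recovery more aggressively
    success_threshold=2,   # close after two clean probes
    timeout=5              # tight per-request budget in seconds
)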
class CircuitBreaker:
"""Circuit breaker for MCP server calls with enterprise reliability."""
def __init__(self, name: str, config: CircuitBreakerConfig):
self.name = name
self.config = config
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time = 0
Circuit breaker initialization establishes the failure tracking state. Starting in CLOSED state assumes services are healthy until proven otherwise. The counters track consecutive failures and successes, while last_failure_time enables recovery timeout calculations. Named circuit breakers enable monitoring multiple services independently.
self.metrics = {
"total_requests": 0,
"successful_requests": 0,
"failed_requests": 0,
"circuit_opens": 0
}
Comprehensive metrics enable operational visibility into circuit breaker behavior. Total requests show traffic volume, success/failure ratios indicate service health, and circuit opens count helps identify unstable services. These metrics integrate with enterprise monitoring systems for alerting and capacity planning.
async def call(self, func: Callable, *args, **kwargs) -> Any:
"""Execute function with circuit breaker protection."""
self.metrics["total_requests"] += 1
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.config.recovery_timeout:
self.state = CircuitState.HALF_OPEN
self.success_count = 0
logger.info(f"Circuit breaker {self.name} entering HALF_OPEN state")
else:
raise CircuitBreakerOpenError(f"Circuit breaker {self.name} is OPEN")
The call method implements the core circuit breaker logic. Request counting provides traffic metrics. OPEN state handling either rejects requests immediately (protecting failing services) or transitions to HALF_OPEN after the recovery timeout. This automatic transition enables self-healing behavior without manual intervention.
try:
# Execute with timeout
result = await asyncio.wait_for(
func(*args, **kwargs),
timeout=self.config.timeout
)
await self._on_success()
return result
except Exception as e:
await self._on_failure()
raise e
Operation execution includes timeout protection to prevent hanging on unresponsive services. Success and failure handlers update circuit breaker state based on results. Re-raising exceptions preserves error context for calling code while still tracking the failure. This approach implements transparent fault tolerance - the circuit breaker is invisible when services are healthy.
async def _on_success(self):
"""Handle successful operation."""
self.metrics["successful_requests"] += 1
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.config.success_threshold:
self.state = CircuitState.CLOSED
self.failure_count = 0
logger.info(f"Circuit breaker {self.name} CLOSED - service recovered")
else:
self.failure_count = 0 # Reset failure count on success
Success handling implements recovery verification. In HALF_OPEN state, consecutive successes prove service recovery before transitioning to CLOSED. In CLOSED state, any success resets failure counters, preventing circuit opening due to old failures. This logic ensures the circuit breaker only opens for current, persistent failures.
async def _on_failure(self):
"""Handle failed operation."""
self.metrics["failed_requests"] += 1
self.failure_count += 1
self.last_failure_time = time.time()
if (self.state == CircuitState.CLOSED and
self.failure_count >= self.config.failure_threshold):
self.state = CircuitState.OPEN
self.metrics["circuit_opens"] += 1
logger.error(f"Circuit breaker {self.name} OPENED - service failing")
elif self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.OPEN
logger.warning(f"Circuit breaker {self.name} back to OPEN - service still failing")
Failure handling tracks consecutive failures and timestamps for recovery timeout calculations. CLOSED state transitions to OPEN when failure threshold is exceeded, implementing fail-fast behavior. HALF_OPEN failures immediately return to OPEN, indicating the service hasn't recovered. This prevents partial recovery oscillation that could overwhelm struggling services.
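The CircuitBreakerOpenError raised in call is never defined in the listing; a minimal reconstruction (the original definition is not shown here) would be:

class CircuitBreakerOpenError(Exception):
    """Raised when a request is rejected because the circuit is OPEN."""
    pass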
The custom exception enables explicit fault handling - calling code can distinguish between circuit breaker protection and actual service failures. This allows applications to implement fallback strategies (cached responses, default values) when services are temporarily unavailable due to circuit breaker protection.
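As a hedged sketch of such a fallback strategy (fetch_live_weather is an assumed async helper, not part of this module):

# Hypothetical caller: serve a cached default while the circuit is OPEN
async def get_weather_with_fallback(breaker: CircuitBreaker, city: str) -> dict:
    try:
        return await breaker.call(fetch_live_weather, city)  # assumed async helper
    except CircuitBreakerOpenError:
        return {"city": city, "source": "cache", "note": "service temporarily unavailable"}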
Pattern 2: Enhanced MCP Manager with Connection Pooling¶
Production deployments require efficient resource management and connection pooling to handle high concurrency.
# utils/enterprise_mcp_manager.py - Essential imports for enterprise patterns
import asyncio
import logging
import time
from typing import Dict, List, Optional, Any
from contextlib import asynccontextmanager
from langchain_mcp_adapters import MCPAdapter
from config import Config, MCPServerConfig
from .circuit_breaker import CircuitBreaker, CircuitBreakerConfig
logger = logging.getLogger(__name__)
This import section establishes the foundation for enterprise MCP management. The asynccontextmanager is crucial for resource lifecycle management - it ensures connections are properly returned to the pool even if exceptions occur. The MCPAdapter comes from LangChain's MCP integration, providing the core interface for communicating with MCP servers in enterprise environments.
class ConnectionPool:
"""Manages a pool of MCP adapter connections for high concurrency."""
def __init__(self, server_config: MCPServerConfig, pool_size: int = 5):
self.server_config = server_config
self.pool_size = pool_size
self.available_connections: asyncio.Queue = asyncio.Queue(maxsize=pool_size)
self.total_connections = 0
The ConnectionPool class implements the object pool pattern - a fundamental enterprise pattern for managing expensive resources. Here we use an asyncio.Queue with a maximum size to control concurrency. In production environments, creating MCP connections has significant overhead (process spawning, IPC setup), so pooling dramatically improves performance under load. The queue acts as a bounded buffer, preventing resource exhaustion.
self.metrics = {
"connections_created": 0,
"connections_reused": 0,
"pool_exhausted_count": 0
}
async def initialize(self):
"""Pre-populate connection pool."""
for _ in range(self.pool_size):
adapter = await self._create_connection()
if adapter:
await self.available_connections.put(adapter)
Metrics collection is essential for enterprise observability - these counters help operations teams understand connection pool efficiency and identify scaling needs. The initialization method implements warm-up: pre-creating connections during startup rather than on-demand. This eliminates cold-start latency for the first users, a critical pattern for enterprise SLAs.
async def _create_connection(self) -> Optional[MCPAdapter]:
"""Create a new MCP adapter connection."""
try:
adapter = MCPAdapter(
command=self.server_config.command,
args=self.server_config.args,
timeout=self.server_config.timeout
)
await adapter.start()
self.total_connections += 1
self.metrics["connections_created"] += 1
return adapter
except Exception as e:
logger.error(f"Failed to create connection for {self.server_config.name}: {e}")
return None
Connection creation handles the complex process of spawning MCP server processes and establishing communication channels. The timeout configuration is crucial for enterprise deployments - it prevents hanging connections from consuming resources indefinitely. Notice the defensive programming: returning None on failure rather than raising exceptions, allowing the system to degrade gracefully when individual connections fail.
@asynccontextmanager
async def get_connection(self):
"""Get connection from pool with automatic return."""
adapter = None
try:
# Try to get existing connection
try:
adapter = self.available_connections.get_nowait()
self.metrics["connections_reused"] += 1
except asyncio.QueueEmpty:
# Pool exhausted, create new connection
self.metrics["pool_exhausted_count"] += 1
adapter = await self._create_connection()
if not adapter:
raise ConnectionError(f"Failed to create connection for {self.server_config.name}")
The context manager pattern ensures resource safety - connections are automatically returned even if exceptions occur during use. The two-tier approach (reuse existing, then create new) implements elastic scaling: the pool can temporarily exceed its base size during traffic spikes, then contract back during quiet periods. Pool exhaustion metrics help capacity planning.
yield adapter
finally:
# Return connection to pool if still valid
if adapter:
try:
self.available_connections.put_nowait(adapter)
except asyncio.QueueFull:
# Pool full, close excess connection
await adapter.stop()
self.total_connections -= 1
The finally block implements the cleanup guarantee - connections are always returned to the pool or properly closed. When the pool is full (during traffic spike recovery), excess connections are terminated rather than leaked. This prevents resource accumulation and ensures the system returns to its steady-state resource usage.
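A short usage sketch (the tool name and arguments are illustrative, not from the module):

# Hypothetical caller: borrow a pooled adapter, invoke a tool, auto-return on exit
async def read_report(pool: ConnectionPool) -> Any:
    async with pool.get_connection() as adapter:
        return await adapter.call_tool("read_file", {"path": "reports/latest.txt"})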
class EnterpriseMCPManager:
"""Production-grade MCP server manager with advanced patterns."""
def __init__(self, server_configs: List[MCPServerConfig]):
self.server_configs = {config.name: config for config in server_configs}
self.connection_pools: Dict[str, ConnectionPool] = {}
self.circuit_breakers: Dict[str, CircuitBreaker] = {}
self.health_status: Dict[str, bool] = {}
The EnterpriseMCPManager orchestrates multiple enterprise patterns simultaneously. It maintains separate connection pools per MCP server (horizontal scaling), individual circuit breakers for fault isolation, and health status tracking for operations dashboards. This design enables independent failure domains - problems with one MCP server don't affect others.
self.metrics = {
"total_requests": 0,
"successful_requests": 0,
"failed_requests": 0,
"average_response_time": 0.0
}
self._monitoring_task: Optional[asyncio.Task] = None
Global metrics provide system-wide visibility crucial for enterprise monitoring. The average response time uses a rolling calculation to provide real-time performance insights. The monitoring task reference allows for graceful shutdown - a critical requirement for enterprise applications that need clean restarts during maintenance windows.
async def initialize(self):
"""Initialize all connection pools and circuit breakers."""
for name, config in self.server_configs.items():
# Create connection pool
pool = ConnectionPool(config, pool_size=5)
await pool.initialize()
self.connection_pools[name] = pool
Initialization follows the fail-fast principle - all critical resources are established during startup rather than on first use. This allows operators to detect configuration issues immediately rather than discovering them when users attempt operations. The sequential initialization ensures each pool is fully ready before proceeding to the next.
# Create circuit breaker
cb_config = CircuitBreakerConfig(
failure_threshold=5,
recovery_timeout=60,
success_threshold=3,
timeout=config.timeout
)
self.circuit_breakers[name] = CircuitBreaker(name, cb_config)
self.health_status[name] = True
logger.info(f"Initialized enterprise MCP manager for {name}")
Circuit breaker configuration balances fault tolerance with recovery speed. A threshold of 5 failures prevents flapping on transient issues, while 60-second recovery timeout allows sufficient time for service restoration. The 3-success requirement ensures the service is genuinely recovered before resuming full traffic. These parameters are tuned for typical enterprise service characteristics.
# Start monitoring task
self._monitoring_task = asyncio.create_task(self._monitoring_loop())
async def call_tool(self, server_name: str, tool_name: str, args: Dict[str, Any]) -> Any:
"""Call tool with enterprise patterns: pooling, circuit breaker, metrics."""
start_time = time.time()
self.metrics["total_requests"] += 1
if server_name not in self.connection_pools:
raise ValueError(f"Server {server_name} not configured")
The monitoring loop provides active health checking - continuously verifying service availability rather than waiting for user-facing failures. Request timing begins immediately to capture all overhead, including pool acquisition time. Input validation prevents cryptic failures downstream when invalid server names are used.
pool = self.connection_pools[server_name]
circuit_breaker = self.circuit_breakers[server_name]
try:
async def _call_with_pool():
async with pool.get_connection() as adapter:
return await adapter.call_tool(tool_name, args)
# Execute with circuit breaker protection
result = await circuit_breaker.call(_call_with_pool)
This demonstrates pattern composition - the circuit breaker wraps the connection pool operation, providing multiple layers of protection. The inner function encapsulates the pool usage pattern, while the circuit breaker provides fault tolerance. This layered approach is fundamental to enterprise resilience architecture.
# Update metrics
response_time = time.time() - start_time
self.metrics["successful_requests"] += 1
self._update_average_response_time(response_time)
return result
except Exception as e:
self.metrics["failed_requests"] += 1
self.health_status[server_name] = False
logger.error(f"Tool call failed for {server_name}.{tool_name}: {e}")
raise
Metrics are updated in both success and failure paths to ensure complete observability. Setting health status to False triggers alerting systems and may influence load balancing decisions in multi-region deployments. The exception is re-raised to preserve the original error context for calling code, following the transparency principle.
def _update_average_response_time(self, response_time: float):
"""Update rolling average response time."""
current_avg = self.metrics["average_response_time"]
total_requests = self.metrics["successful_requests"]
# Calculate rolling average
self.metrics["average_response_time"] = (
(current_avg * (total_requests - 1) + response_time) / total_requests
)
The rolling average calculation provides real-time performance visibility without storing historical data points. This approach scales to high-traffic environments where storing individual response times would consume excessive memory. Each new sample carries weight 1/n, so a single outlier moves the average less as request volume grows: with a 2.0s average over 4 prior requests, a new 3.0s response yields (2.0 * 4 + 3.0) / 5 = 2.2s.
async def _monitoring_loop(self):
"""Continuous monitoring and health checks."""
while True:
try:
for name in self.server_configs.keys():
await self._health_check(name)
await asyncio.sleep(30) # Check every 30 seconds
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Monitoring loop error: {e}")
await asyncio.sleep(5)
The monitoring loop implements proactive health checking with graceful error handling. The 30-second interval balances freshness with resource consumption. Catching CancelledError specifically allows for clean shutdown, while generic exception handling ensures monitoring continues even if individual health checks fail. The shorter sleep on errors enables faster recovery from transient monitoring issues.
async def _health_check(self, server_name: str):
"""Perform health check on server."""
try:
# Simple health check by listing tools
pool = self.connection_pools[server_name]
async with pool.get_connection() as adapter:
await adapter.list_tools()
self.health_status[server_name] = True
except Exception as e:
self.health_status[server_name] = False
logger.warning(f"Health check failed for {server_name}: {e}")
Health checks use a lightweight operation (list_tools) that exercises the complete communication path without side effects. This approach detects connection, process, and protocol issues while avoiding expensive operations during monitoring. The binary health status feeds into alerting systems and operational dashboards.
def get_metrics(self) -> Dict[str, Any]:
"""Get comprehensive metrics for monitoring."""
server_metrics = {}
for name, cb in self.circuit_breakers.items():
server_metrics[name] = {
"circuit_breaker_state": cb.state.value,
"circuit_breaker_metrics": cb.metrics,
"connection_pool_metrics": self.connection_pools[name].metrics,
"health_status": self.health_status[name]
}
return {
"global_metrics": self.metrics,
"server_metrics": server_metrics
}
The metrics endpoint provides hierarchical observability - global system metrics plus per-server details. This structure supports both high-level dashboard views and detailed troubleshooting. Including circuit breaker states helps operators understand why certain servers might be unavailable, enabling informed intervention decisions.
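A dashboard or readiness endpoint might poll this structure roughly as follows (a sketch; the key layout matches get_metrics above):

# Illustrative poll: flag any server whose breaker is not CLOSED or that is unhealthy
snapshot = manager.get_metrics()
for name, stats in snapshot["server_metrics"].items():
    state = stats["circuit_breaker_state"]
    if state != "closed" or not stats["health_status"]:
        logger.warning(f"{name}: breaker={state}, healthy={stats['health_status']}")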
async def cleanup(self):
"""Clean up resources."""
if self._monitoring_task:
self._monitoring_task.cancel()
try:
await self._monitoring_task
except asyncio.CancelledError:
pass
# Close all connections
for pool in self.connection_pools.values():
while pool.total_connections > 0:
try:
adapter = pool.available_connections.get_nowait()
await adapter.stop()
pool.total_connections -= 1
except asyncio.QueueEmpty:
break
Proper cleanup is essential for enterprise applications that need clean restarts during maintenance. The monitoring task is cancelled gracefully, and all pooled connections are explicitly closed to prevent process leaks. This pattern ensures the system can restart cleanly without leaving zombie processes or locked resources.
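Putting the manager together end to end might look like the following sketch (the MCPServerConfig fields and server script path are assumed to match the course's config.py):

# Hypothetical lifecycle: initialize, serve one request, clean up on shutdown
async def main():
    configs = [
        MCPServerConfig(name="weather", command="python",
                        args=["servers/weather_server.py"], timeout=30)
    ]
    manager = EnterpriseMCPManager(configs)
    await manager.initialize()
    try:
        result = await manager.call_tool("weather", "get_current_weather", {"city": "Berlin"})
        print(result)
    finally:
        await manager.cleanup()  # cancel monitoring, close pooled connections

asyncio.run(main())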
Pattern 3: Enterprise Security and Access Control¶
Production agents require sophisticated access control and audit logging.
# security/enterprise_auth.py - Enterprise security imports
import jwt
import time
from typing import Dict, List, Optional, Set
from dataclasses import dataclass
from enum import Enum
import logging
logger = logging.getLogger(__name__)
Enterprise security requires careful handling of JSON Web Tokens (JWT) for authentication. The jwt library provides cryptographically secure token validation essential for enterprise environments. Time-based expiration checking prevents stale credentials from being used, while logging ensures all security events are tracked for compliance.
class Permission(Enum):
READ_WEATHER = "weather:read"
READ_FILES = "files:read"
WRITE_FILES = "files:write"
QUERY_DATABASE = "database:query"
MODIFY_DATABASE = "database:modify"
ADMIN_TOOLS = "admin:*"
Permission enumeration implements the principle of least privilege - defining granular access rights that map to specific enterprise operations. The string values follow a namespace:action pattern enabling flexible authorization policies. The wildcard admin:* provides superuser access while maintaining auditability. This structure supports enterprise compliance requirements for access control documentation.
@dataclass(frozen=True)  # frozen enforces the immutability described below
class UserContext:
user_id: str
roles: Set[str]
permissions: Set[Permission]
session_id: str
expires_at: float
organization_id: str
UserContext encapsulates all security-relevant user information in a single immutable structure. Including organization_id enables multi-tenant security - ensuring users can only access resources within their organization. The expires_at timestamp enforces session lifetime limits, while session_id enables session tracking and revocation capabilities essential for enterprise security.
class EnterpriseAuthenticator:
"""Enterprise authentication and authorization for MCP agents."""
def __init__(self, jwt_secret: str, default_permissions: Dict[str, List[Permission]]):
self.jwt_secret = jwt_secret
self.default_permissions = default_permissions
self.active_sessions: Dict[str, UserContext] = {}
self.audit_log: List[Dict] = []
The authenticator maintains role-based access control (RBAC) through the default_permissions mapping. The jwt_secret must be cryptographically secure and shared across service instances for token validation. Active session tracking enables real-time access control and supports security features like concurrent session limits and forced logouts.
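Construction might look like this sketch (the role names and environment variable are assumptions):

import os

# Hypothetical RBAC bootstrap: each role maps to a permission list
role_permissions = {
    "analyst": [Permission.READ_WEATHER, Permission.QUERY_DATABASE],
    "editor": [Permission.READ_FILES, Permission.WRITE_FILES],
    "admin": [Permission.ADMIN_TOOLS],
}
authenticator = EnterpriseAuthenticator(
    jwt_secret=os.environ["JWT_SECRET"],  # never hard-code the secret
    default_permissions=role_permissions
)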
def authenticate_token(self, token: str) -> Optional[UserContext]:
"""Authenticate JWT token and return user context."""
try:
payload = jwt.decode(token, self.jwt_secret, algorithms=["HS256"])
user_id = payload.get("user_id")
roles = set(payload.get("roles", []))
session_id = payload.get("session_id")
expires_at = payload.get("exp", 0)
organization_id = payload.get("org_id")
JWT token validation uses the HMAC-SHA256 algorithm, providing cryptographic integrity - ensuring tokens cannot be forged without the secret key. Extracting claims from the payload establishes user identity and roles. The use of get() with defaults prevents KeyError exceptions when tokens have missing fields, implementing defensive programming for security-critical code.
if time.time() > expires_at:
self._audit("TOKEN_EXPIRED", {"user_id": user_id})
return None
# Calculate permissions from roles
permissions = set()
for role in roles:
permissions.update(self.default_permissions.get(role, []))
Expiration checking prevents replay attacks using old credentials. (With PyJWT's default options, decode itself already raises on an expired exp claim before this branch runs.) All security events including expired tokens are audited for forensic analysis. Permission calculation implements role-based access control - users inherit all permissions from their assigned roles. This approach scales better than direct permission assignment in large organizations.
user_context = UserContext(
user_id=user_id,
roles=roles,
permissions=permissions,
session_id=session_id,
expires_at=expires_at,
organization_id=organization_id
)
self.active_sessions[session_id] = user_context
self._audit("USER_AUTHENTICATED", {"user_id": user_id, "roles": list(roles)})
return user_context
Successful authentication creates a UserContext with computed permissions, enabling stateless authorization - subsequent requests don't need to recalculate permissions. Session tracking enables advanced features like concurrent session limits and administrative logouts. Authentication events are audited with user and role information for compliance reporting.
Invalid token handling implements security-first design - any JWT validation failure results in authentication denial and audit logging. This catches forged tokens, signature mismatches, and malformed tokens. The specific error is logged for security analysis while clients receive a simple rejection.
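The except clause itself is not shown in the listing; a minimal reconstruction consistent with this description (the TOKEN_INVALID audit label is an assumption):

        except jwt.InvalidTokenError as e:
            # Covers forged signatures, malformed tokens, and PyJWT's
            # ExpiredSignatureError subclass raised during decode
            self._audit("TOKEN_INVALID", {"error": str(e)})
            return None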
def authorize_tool_access(self, user_context: UserContext, server_name: str, tool_name: str) -> bool:
"""Check if user has permission to access specific tool."""
required_permission = self._get_required_permission(server_name, tool_name)
if required_permission in user_context.permissions:
self._audit("TOOL_ACCESS_GRANTED", {
"user_id": user_context.user_id,
"server": server_name,
"tool": tool_name
})
return True
Authorization implements fine-grained access control at the MCP tool level. Every tool access requires explicit permission checking, preventing privilege escalation. Successful access is audited with full context (user, server, tool) enabling detailed access analysis and supporting compliance requirements for data access logging.
self._audit("TOOL_ACCESS_DENIED", {
"user_id": user_context.user_id,
"server": server_name,
"tool": tool_name,
"required_permission": required_permission.value
})
return False
Access denial auditing captures both the attempt and the required permission, enabling security analysis and helping administrators understand why access was denied. This information supports zero-trust security - every access attempt is logged and can be analyzed for suspicious patterns.
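Combining authentication, authorization, and the enterprise manager, a request guard might look like this sketch (guarded_call is an assumed helper, not part of the module):

# Hypothetical guard: authenticate, authorize, then execute with enterprise patterns
async def guarded_call(authenticator, manager, token: str,
                       server: str, tool: str, args: dict):
    user = authenticator.authenticate_token(token)
    if user is None:
        raise PermissionError("Authentication failed")
    if not authenticator.authorize_tool_access(user, server, tool):
        raise PermissionError(f"Access denied for {server}.{tool}")
    return await manager.call_tool(server, tool, args)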
def _get_required_permission(self, server_name: str, tool_name: str) -> Permission:
"""Map server/tool combinations to required permissions."""
permission_map = {
("weather", "get_current_weather"): Permission.READ_WEATHER,
("weather", "get_forecast"): Permission.READ_WEATHER,
("filesystem", "read_file"): Permission.READ_FILES,
("filesystem", "write_file"): Permission.WRITE_FILES,
("filesystem", "list_files"): Permission.READ_FILES,
("database", "query"): Permission.QUERY_DATABASE,
("database", "insert"): Permission.MODIFY_DATABASE,
("database", "update"): Permission.MODIFY_DATABASE,
}
return permission_map.get((server_name, tool_name), Permission.ADMIN_TOOLS)
Permission mapping implements declarative security policy - clearly defining what permission each MCP tool requires. The tuple-based mapping allows precise control over server-tool combinations. Defaulting to ADMIN_TOOLS for unmapped operations implements secure-by-default behavior - unknown operations require maximum privileges, forcing explicit permission grants.
def _audit(self, action: str, details: Dict):
"""Log security events for audit trail."""
audit_entry = {
"timestamp": time.time(),
"action": action,
"details": details
}
self.audit_log.append(audit_entry)
logger.info(f"SECURITY_AUDIT: {action} - {details}")
# Keep only last 1000 entries in memory
if len(self.audit_log) > 1000:
self.audit_log = self.audit_log[-1000:]
Comprehensive audit logging captures all security events with timestamps and contextual details. The structured format enables automated analysis and compliance reporting. Memory bounds prevent unbounded growth while retaining sufficient history for security analysis. The logging integration ensures events reach centralized security monitoring systems.
def get_audit_log(self, user_id: Optional[str] = None, limit: int = 100) -> List[Dict]:
"""Retrieve audit log entries."""
if user_id:
filtered_log = [
entry for entry in self.audit_log
if entry["details"].get("user_id") == user_id
]
else:
filtered_log = self.audit_log
return filtered_log[-limit:]
Audit log retrieval supports both user-specific and system-wide analysis. Filtering by user enables individual access review - examining all actions by a specific user for security investigations. The limit parameter prevents large result sets, while the tail slice returns the most recent entries, supporting both real-time monitoring and historical analysis.
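For local testing, a token carrying the claims authenticate_token expects could be minted as follows (a PyJWT sketch; all claim values are placeholders):

# Hypothetical test helper: mint an HS256 token with the expected claims
def make_test_token(secret: str) -> str:
    payload = {
        "user_id": "u-123",
        "roles": ["analyst"],
        "session_id": "s-456",
        "exp": time.time() + 3600,  # expires in one hour
        "org_id": "org-1"
    }
    return jwt.encode(payload, secret, algorithm="HS256")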
Pattern 4: Performance Monitoring and Observability¶
Enterprise deployments require comprehensive monitoring and alerting capabilities.
# monitoring/enterprise_monitoring.py - Comprehensive monitoring imports
import time
import asyncio
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from collections import defaultdict, deque
import statistics
import logging
logger = logging.getLogger(__name__)
Enterprise monitoring requires sophisticated data structures and statistical analysis. The deque provides efficient circular buffering for metrics storage, while defaultdict simplifies grouping operations. The statistics module enables percentile calculations essential for SLA monitoring. These imports establish the foundation for production-grade observability.
@dataclass
class PerformanceMetrics:
timestamp: float
server_name: str
tool_name: str
response_time: float
success: bool
error_type: Optional[str] = None
user_id: Optional[str] = None
The PerformanceMetrics dataclass captures the essential dimensions for enterprise monitoring: temporal (when), spatial (which server), functional (what operation), performance (how fast), and quality (success/failure). Including user_id enables per-tenant analysis in multi-tenant environments. This structure supports both real-time alerting and historical trend analysis.
class PerformanceTracker:
"""Enterprise-grade performance monitoring for MCP agents."""
def __init__(self, retention_hours: int = 24):
self.retention_hours = retention_hours
self.metrics: deque = deque(maxlen=10000) # Keep last 10k metrics
self.alert_thresholds = {
"response_time_p95": 5.0, # 95th percentile response time
"error_rate": 0.05, # 5% error rate
"availability": 0.99 # 99% availability
}
        self.alerts: List[Dict] = []
        self._metrics_since_check = 0  # counter that throttles alert evaluation
The bounded deque implements fixed-memory monitoring - crucial for long-running enterprise services. The 10,000 metric limit prevents memory growth while providing sufficient data for statistical analysis. Alert thresholds follow enterprise SLA standards: P95 response time captures tail latency that affects user experience, 5% error rate allows for transient issues without false alarms, and 99% availability maps to "two nines" uptime requirements.
def record_metric(self, metric: PerformanceMetrics):
"""Record a performance metric."""
self.metrics.append(metric)
self._check_alerts()
Metric recording triggers immediate alert evaluation - implementing real-time monitoring. This approach ensures rapid detection of performance degradation, critical for enterprise environments where service level objectives must be maintained. The automatic alert checking eliminates the need for separate monitoring processes.
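Instrumentation at the call site is then a single statement per request (values illustrative):

# Hypothetical instrumentation of one tool call outcome
tracker = PerformanceTracker()
tracker.record_metric(PerformanceMetrics(
    timestamp=time.time(),
    server_name="weather",
    tool_name="get_current_weather",
    response_time=0.42,
    success=True
))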
def get_server_stats(self, server_name: str, hours: int = 1) -> Dict[str, Any]:
"""Get comprehensive statistics for a server."""
cutoff_time = time.time() - (hours * 3600)
        # "all" aggregates across every server (relied on by _check_alerts)
        server_metrics = [
            m for m in self.metrics
            if (server_name == "all" or m.server_name == server_name)
            and m.timestamp >= cutoff_time
        ]
if not server_metrics:
return {"error": "No metrics found for server"}
Time-based filtering provides sliding window analysis - essential for understanding recent performance trends without being skewed by historical issues. The hour-based parameterization allows operators to adjust the analysis timeframe based on their needs: short windows for incident response, longer windows for capacity planning.
successful_metrics = [m for m in server_metrics if m.success]
failed_metrics = [m for m in server_metrics if not m.success]
response_times = [m.response_time for m in successful_metrics]
stats = {
"total_requests": len(server_metrics),
"successful_requests": len(successful_metrics),
"failed_requests": len(failed_metrics),
"error_rate": len(failed_metrics) / len(server_metrics) if server_metrics else 0,
"availability": len(successful_metrics) / len(server_metrics) if server_metrics else 0,
}
Separating successful and failed metrics enables quality-based analysis. Response times are calculated only from successful operations to avoid skewing performance metrics with timeout failures. The error rate and availability calculations provide core service-health indicators that drive operational decisions in enterprise environments.
if response_times:
stats.update({
"avg_response_time": statistics.mean(response_times),
"min_response_time": min(response_times),
"max_response_time": max(response_times),
"p50_response_time": statistics.median(response_times),
"p95_response_time": self._percentile(response_times, 0.95),
"p99_response_time": self._percentile(response_times, 0.99),
})
Comprehensive response time statistics provide different operational insights: mean shows overall system performance, median reveals typical user experience, P95/P99 expose tail latency affecting a subset of users. These percentiles are crucial for enterprise SLAs - most systems can handle average performance but struggle with outliers that create poor user experiences.
# Error breakdown
error_counts = defaultdict(int)
for metric in failed_metrics:
error_counts[metric.error_type or "unknown"] += 1
stats["error_breakdown"] = dict(error_counts)
return stats
Error categorization enables root cause analysis - distinguishing between timeout errors, connection failures, authentication issues, and application errors. This breakdown helps operations teams prioritize fixes: network issues require infrastructure attention, while application errors need development team involvement.
def get_tool_stats(self, tool_name: str, hours: int = 1) -> Dict[str, Any]:
"""Get statistics for a specific tool across all servers."""
cutoff_time = time.time() - (hours * 3600)
tool_metrics = [
m for m in self.metrics
if m.tool_name == tool_name and m.timestamp >= cutoff_time
]
if not tool_metrics:
return {"error": "No metrics found for tool"}
Tool-centric analysis supports feature-level monitoring - understanding how specific MCP tools perform across different servers. This view helps identify whether performance issues are systemic (affecting all tools) or specific to certain functionality, guiding troubleshooting efforts in complex enterprise deployments.
# Group by server
server_stats = defaultdict(list)
for metric in tool_metrics:
server_stats[metric.server_name].append(metric)
result = {
"total_requests": len(tool_metrics),
"servers_used": list(server_stats.keys()),
"per_server_stats": {}
}
for server, metrics in server_stats.items():
successful = [m for m in metrics if m.success]
result["per_server_stats"][server] = {
"requests": len(metrics),
"success_rate": len(successful) / len(metrics),
"avg_response_time": statistics.mean([m.response_time for m in successful]) if successful else 0
            }
        return result
Server grouping reveals load distribution patterns and helps identify performance outliers. If one server shows consistently poor performance for a specific tool, it may indicate configuration issues, resource constraints, or network problems affecting that server specifically.
def _percentile(self, data: List[float], percentile: float) -> float:
"""Calculate percentile of data."""
if not data:
return 0.0
sorted_data = sorted(data)
index = int(percentile * len(sorted_data))
return sorted_data[min(index, len(sorted_data) - 1)]
Custom percentile calculation provides precise control over statistical analysis. While libraries exist for this, the simple implementation avoids external dependencies and ensures consistent behavior across environments. The boundary check prevents index errors with small datasets: with 10 samples, percentile 0.95 gives index int(9.5) = 9, the largest sample.
def _check_alerts(self):
"""Check for alert conditions."""
        # Throttle: evaluate alerts only every 100 recorded metrics. A modulo
        # test on len(self.metrics) would break once the bounded deque fills,
        # since its length then stays pinned at maxlen.
        self._metrics_since_check += 1
        if self._metrics_since_check < 100:
            return
        self._metrics_since_check = 0
# Check last hour of data
recent_stats = self.get_server_stats("all", hours=1)
if "error" in recent_stats:
return
Throttled alert checking implements computational efficiency - running expensive statistical calculations every 100 metrics rather than every metric. This reduces CPU overhead while maintaining reasonable alert latency. The one-hour analysis window provides sufficient data for reliable trend detection.
# Check response time P95
if recent_stats.get("p95_response_time", 0) > self.alert_thresholds["response_time_p95"]:
self._trigger_alert("HIGH_RESPONSE_TIME", {
"p95_response_time": recent_stats["p95_response_time"],
"threshold": self.alert_thresholds["response_time_p95"]
})
# Check error rate
if recent_stats.get("error_rate", 0) > self.alert_thresholds["error_rate"]:
self._trigger_alert("HIGH_ERROR_RATE", {
"error_rate": recent_stats["error_rate"],
"threshold": self.alert_thresholds["error_rate"]
})
Multiple alert conditions monitor latency, error rate, and availability - core signals of service health. Each alert includes both current value and threshold for context. This information helps operators quickly assess severity and understand how far metrics have deviated from acceptable ranges.
# Check availability
if recent_stats.get("availability", 1) < self.alert_thresholds["availability"]:
self._trigger_alert("LOW_AVAILABILITY", {
"availability": recent_stats["availability"],
"threshold": self.alert_thresholds["availability"]
})
Availability monitoring detects systemic failures that might not trigger individual error rate alerts. The default value of 1 (100%) ensures that missing availability data doesn't trigger false alerts while still catching genuine availability issues.
def _trigger_alert(self, alert_type: str, details: Dict):
"""Trigger an alert."""
alert = {
"timestamp": time.time(),
"type": alert_type,
"details": details,
"resolved": False
}
self.alerts.append(alert)
logger.warning(f"ALERT: {alert_type} - {details}")
# Keep only last 100 alerts
if len(self.alerts) > 100:
self.alerts = self.alerts[-100:]
Alert structures capture essential metadata for incident management: timestamp for correlation, type for categorization, details for context, and resolution status for tracking. The bounded alert history prevents memory growth while maintaining recent alert context for troubleshooting.
def get_health_status(self) -> Dict[str, Any]:
"""Get overall system health status."""
recent_time = time.time() - 300 # Last 5 minutes
recent_metrics = [m for m in self.metrics if m.timestamp >= recent_time]
if not recent_metrics:
return {"status": "UNKNOWN", "reason": "No recent metrics"}
        error_rate = len([m for m in recent_metrics if not m.success]) / len(recent_metrics)
        successful = [m for m in recent_metrics if m.success]
        # Guard: statistics.mean raises StatisticsError on an empty sequence
        avg_response_time = statistics.mean([m.response_time for m in successful]) if successful else 0.0
if error_rate > 0.1: # 10% error rate
return {"status": "CRITICAL", "reason": "High error rate", "error_rate": error_rate}
elif error_rate > 0.05: # 5% error rate
return {"status": "WARNING", "reason": "Elevated error rate", "error_rate": error_rate}
elif avg_response_time > 3.0: # 3 second average
return {"status": "WARNING", "reason": "Slow response times", "avg_response_time": avg_response_time}
else:
return {"status": "HEALTHY", "error_rate": error_rate, "avg_response_time": avg_response_time}
The health status endpoint provides traffic light monitoring - a simple red/yellow/green status that operations teams can quickly interpret. The 5-minute window focuses on very recent performance, suitable for immediate operational decisions. Tiered thresholds distinguish between warning conditions (requiring attention) and critical conditions (requiring immediate action).
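A readiness probe or operations dashboard could consume this status directly (sketch):

# Hypothetical probe: map the traffic-light status onto log severity
status = tracker.get_health_status()
if status["status"] == "CRITICAL":
    logger.error(f"System unhealthy: {status}")
elif status["status"] in ("WARNING", "UNKNOWN"):
    logger.warning(f"System degraded: {status}")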
Module Assessment¶
Question 1: What is the primary purpose of the circuit breaker pattern in enterprise MCP deployments?
A) Improve performance
B) Prevent cascading failures
C) Reduce memory usage
D) Simplify configuration
Question 2: In the connection pooling pattern, what happens when the pool is exhausted?
A) Requests are rejected
B) New connections are created temporarily
C) The system waits indefinitely
D) Connections are shared unsafely
Question 3: Which authentication standard does the enterprise security pattern implement?
A) Basic authentication
B) OAuth 2.0
C) JWT tokens
D) API keys
Question 4: What triggers performance alerts in the monitoring system?
A) Manual configuration only
B) Threshold violations for response time, error rate, or availability
C) User complaints
D) Server restart events
Question 5: How does the enterprise MCP manager handle server failures?
A) Immediate shutdown
B) Circuit breaker protection with automatic recovery testing
C) Manual intervention required
D) Load balancing to other servers
Question 6: What is the benefit of audit logging in enterprise deployments?
A) Performance optimization
B) Compliance and security forensics
C) Debugging code issues
D) User experience improvement
Question 7: In the performance tracking system, what does P95 response time represent?
A) Average response time
B) 95% of requests complete within this time
C) Maximum response time
D) 95% availability percentage
View Module A Test Solutions →
🧭 Navigation¶
Previous: Session 2 - Implementation →
Next: Session 4 - Team Orchestration →