⚙️ Session 9 Advanced: Monitoring & Observability¶
⚙️ IMPLEMENTER PATH CONTENT
Prerequisites: Complete 🎯📝 Session 9 - Production Agent Deployment
Time Investment: 4-6 hours
Outcome: Master enterprise-scale monitoring, observability, and production troubleshooting
Advanced Learning Outcomes¶
After completing this advanced monitoring module, you will master:
- Comprehensive metrics collection and analysis for multi-agent systems
- Advanced health checking patterns with failure recovery automation
- Production alerting strategies with intelligent escalation and noise reduction
- Distributed tracing implementation for complex agent workflow debugging
- Performance optimization using observability data and automated remediation
Comprehensive Monitoring Architecture¶
Advanced Metrics Collection System¶
Enterprise monitoring requires sophisticated metrics that provide actionable insights into agent behavior and system health:
# monitoring/advanced_agent_metrics.py - Production monitoring foundation
import time
import asyncio
import psutil
import threading
from typing import Dict, Any, Optional, List, Callable
from prometheus_client import Counter, Histogram, Gauge, Info, Summary, start_http_server
from prometheus_client.core import CollectorRegistry
import logging
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from collections import defaultdict
logger = logging.getLogger(__name__)
Advanced metrics imports provide comprehensive system monitoring capabilities, including process monitoring (psutil), threading for non-blocking operations, and sophisticated data structures. The dataclass and collections imports enable efficient metric aggregation and correlation analysis.
@dataclass
class MetricPoint:
"""Individual metric measurement with metadata."""
name: str
value: float
timestamp: datetime
labels: Dict[str, str] = field(default_factory=dict)
class AdvancedAgentMetrics:
"""Comprehensive metrics collection for production agent systems."""
def __init__(self, service_name: str = "mcp-agent", metrics_port: int = 9090):
self.service_name = service_name
self.metrics_port = metrics_port
self.registry = CollectorRegistry()
# Metric storage for correlation analysis
self.metric_history: Dict[str, List[MetricPoint]] = defaultdict(list)
self.metric_lock = threading.Lock()
# Initialize comprehensive metrics
self._initialize_system_metrics()
self._initialize_business_metrics()
self._initialize_performance_metrics()
# Start background metric collection
self._start_background_collection()
# Start HTTP server with custom registry
start_http_server(metrics_port, registry=self.registry)
logger.info(f"Advanced metrics server started on port {metrics_port}")
The AdvancedAgentMetrics class implements enterprise-grade monitoring with metric correlation, historical analysis, and thread-safe operations. Registering metrics against a custom CollectorRegistry isolates them from the default registry and enables the advanced collection patterns required for large-scale deployments.
def _initialize_system_metrics(self):
"""Initialize comprehensive system-level metrics."""
# System identification and versioning
self.info = Info(
'agent_system_info',
'Comprehensive agent system information',
registry=self.registry
)
self.info.info({
'service': self.service_name,
'version': '1.0.0',
'environment': 'production',
'python_version': '3.11.0',
'deployment_timestamp': datetime.now().isoformat(),
'cluster_node': self._get_node_name(),
'availability_zone': self._get_availability_zone()
})
# Process and resource metrics
self.process_cpu_usage = Gauge(
'process_cpu_usage_percent',
'Current process CPU usage percentage',
registry=self.registry
)
self.process_memory_usage = Gauge(
'process_memory_usage_bytes',
'Current process memory usage in bytes',
registry=self.registry
)
self.process_memory_rss = Gauge(
'process_memory_rss_bytes',
'Process resident set size in bytes',
registry=self.registry
)
self.open_file_descriptors = Gauge(
'process_open_fds',
'Number of open file descriptors',
registry=self.registry
)
System metrics provide foundational visibility into resource consumption and process health. Memory RSS tracking identifies memory leaks, while file descriptor monitoring prevents resource exhaustion. Deployment timestamps and cluster information enable correlation with infrastructure changes.
def _initialize_business_metrics(self):
"""Initialize business-level agent metrics."""
# Agent workflow metrics with detailed labeling
self.workflow_executions_total = Counter(
'agent_workflow_executions_total',
'Total agent workflow executions',
['workflow_type', 'agent_id', 'outcome', 'complexity_tier'],
registry=self.registry
)
self.workflow_duration = Histogram(
'agent_workflow_duration_seconds',
'Agent workflow execution duration',
['workflow_type', 'complexity_tier'],
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 300.0],
registry=self.registry
)
# Active workflow tracking with queue analysis
self.active_workflows = Gauge(
'agent_active_workflows',
'Currently active workflows by type',
['workflow_type', 'priority'],
registry=self.registry
)
self.workflow_queue_depth = Gauge(
'agent_workflow_queue_depth',
'Pending workflows in queue',
['priority', 'workflow_type'],
registry=self.registry
)
Business metrics track agent workflow patterns with granular labeling that enables sophisticated analysis. Complexity tier tracking identifies performance patterns across different workflow types, while queue depth monitoring prevents backlog accumulation that could impact user experience.
# MCP tool interaction metrics with server breakdown
self.mcp_tool_calls = Counter(
'mcp_tool_calls_total',
'Total MCP tool calls with server breakdown',
['server', 'tool', 'status', 'error_type'],
registry=self.registry
)
self.mcp_tool_duration = Histogram(
'mcp_tool_call_duration_seconds',
'MCP tool call duration with percentile analysis',
['server', 'tool'],
buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0, 10.0],
registry=self.registry
)
self.mcp_connection_pool_active = Gauge(
'mcp_connection_pool_active',
'Active MCP connections by server',
['server'],
registry=self.registry
)
self.mcp_connection_pool_idle = Gauge(
'mcp_connection_pool_idle',
'Idle MCP connections available',
['server'],
registry=self.registry
)
MCP-specific metrics provide deep visibility into tool integration performance and health. Connection pool tracking prevents resource exhaustion while enabling capacity planning. Error type labeling enables targeted troubleshooting of specific integration issues.
# Agent-to-Agent communication metrics
self.a2a_messages_sent = Counter(
'a2a_messages_sent_total',
'A2A messages sent with routing information',
['message_type', 'recipient_type', 'priority'],
registry=self.registry
)
self.a2a_messages_received = Counter(
'a2a_messages_received_total',
'A2A messages received with processing status',
['message_type', 'sender_type', 'processing_status'],
registry=self.registry
)
self.a2a_message_latency = Histogram(
'a2a_message_latency_seconds',
'End-to-end A2A message latency',
['message_type'],
buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0],
registry=self.registry
)
Agent-to-agent communication metrics track distributed workflow coordination effectiveness. Latency histograms with fine-grained buckets capture network and processing delays, while message type breakdown enables identification of communication bottlenecks in complex multi-agent scenarios.
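These counters and histograms only pay off if every send path updates them consistently. Below is a minimal sketch of a send wrapper that does so; the `transport.send(...)` coroutine and its arguments are assumptions for illustration, not part of the agent code above.

import time

async def send_with_metrics(metrics, transport, message_type: str,
                            recipient_type: str, priority: str, payload: dict):
    """Send an A2A message while recording volume and end-to-end latency."""
    start_time = time.time()
    metrics.a2a_messages_sent.labels(
        message_type=message_type,
        recipient_type=recipient_type,
        priority=priority,
    ).inc()
    try:
        # transport.send() is a hypothetical coroutine that resolves once the
        # recipient acknowledges the message
        await transport.send(payload)
    finally:
        metrics.a2a_message_latency.labels(message_type=message_type).observe(
            time.time() - start_time
        )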
Advanced Performance Monitoring¶
def _initialize_performance_metrics(self):
"""Initialize detailed performance tracking metrics."""
# HTTP request performance with detailed breakdown
self.http_request_duration = Histogram(
'http_request_duration_seconds',
'HTTP request duration with SLA alignment',
['method', 'endpoint', 'status_class'],
buckets=[0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0],
registry=self.registry
)
# Request rate and concurrency tracking
self.http_requests_in_flight = Gauge(
'http_requests_in_flight',
'Current number of HTTP requests being processed',
['endpoint'],
registry=self.registry
)
self.http_requests_total = Counter(
'http_requests_total',
'Total HTTP requests by method and endpoint (request rate is derived with rate() at query time)',
['method', 'endpoint'],
registry=self.registry
)
# Error tracking with detailed classification
self.error_count = Counter(
'errors_total',
'Total errors with detailed classification',
['error_type', 'component', 'severity', 'recoverable'],
registry=self.registry
)
Performance metrics align with SLA requirements through carefully chosen histogram buckets that map to user experience thresholds. In-flight request tracking enables load shedding decisions, while detailed error classification enables targeted remediation strategies.
# Cache performance metrics
self.cache_operations = Counter(
'cache_operations_total',
'Cache operations with hit/miss tracking',
['operation', 'cache_type', 'result'],
registry=self.registry
)
self.cache_hit_ratio = Gauge(
'cache_hit_ratio',
'Cache hit ratio by cache type',
['cache_type'],
registry=self.registry
)
# Database performance tracking
self.database_operations = Counter(
'database_operations_total',
'Database operations with performance classification',
['operation', 'table', 'query_type'],
registry=self.registry
)
self.database_connection_pool_active = Gauge(
'database_connection_pool_active',
'Active database connections',
registry=self.registry
)
Cache and database performance metrics enable optimization of data access patterns. Hit ratio tracking guides cache sizing decisions, while connection pool monitoring prevents database resource exhaustion during high-load scenarios.
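Prometheus counters only ever increase, so the hit-ratio gauge has to be derived from running totals at recording time (or computed in PromQL from cache_operations_total). A minimal sketch of a recorder that keeps the gauge current on every lookup is shown below; the in-memory tallies are an assumption, not part of the class above.

from collections import defaultdict

class CacheMetricsRecorder:
    """Tracks cache lookups and keeps the hit-ratio gauge up to date."""

    def __init__(self, metrics):
        self.metrics = metrics          # an AdvancedAgentMetrics instance
        self._hits = defaultdict(int)   # hits per cache_type
        self._total = defaultdict(int)  # lookups per cache_type

    def record_lookup(self, cache_type: str, hit: bool):
        result = "hit" if hit else "miss"
        self.metrics.cache_operations.labels(
            operation="get", cache_type=cache_type, result=result
        ).inc()
        self._total[cache_type] += 1
        if hit:
            self._hits[cache_type] += 1
        self.metrics.cache_hit_ratio.labels(cache_type=cache_type).set(
            self._hits[cache_type] / self._total[cache_type]
        )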
Intelligent Background Collection¶
def _start_background_collection(self):
"""Start background metric collection threads."""
# System resource collection
system_thread = threading.Thread(
target=self._collect_system_metrics,
daemon=True,
name="system-metrics-collector"
)
system_thread.start()
# Performance analysis collection
analysis_thread = threading.Thread(
target=self._collect_performance_analysis,
daemon=True,
name="performance-analysis-collector"
)
analysis_thread.start()
logger.info("Background metric collection threads started")
def _collect_system_metrics(self):
"""Continuously collect system-level metrics."""
while True:
try:
# Process metrics collection
process = psutil.Process()
# CPU usage with smoothing
cpu_percent = process.cpu_percent(interval=1.0)
self.process_cpu_usage.set(cpu_percent)
# Memory metrics with detailed breakdown
memory_info = process.memory_info()
self.process_memory_usage.set(memory_info.rss)
self.process_memory_rss.set(memory_info.rss)
# File descriptor tracking
try:
fds = process.num_fds()
self.open_file_descriptors.set(fds)
except AttributeError:
# Windows compatibility
self.open_file_descriptors.set(0)
# Store historical data for correlation
self._store_metric_point("cpu_usage", cpu_percent)
self._store_metric_point("memory_usage", memory_info.rss)
except Exception as e:
logger.error(f"Error collecting system metrics: {e}")
time.sleep(15) # 15-second collection interval
Background metric collection uses dedicated threads to prevent blocking of main application operations. Historical data storage enables correlation analysis and trend detection that can predict resource exhaustion before it impacts performance.
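The collector above calls `_store_metric_point()`, which is not shown earlier. One plausible implementation, assuming the `metric_history` and `metric_lock` attributes from `__init__` and a bounded retention window, looks like this:

def _store_metric_point(self, name: str, value: float,
                        labels: Optional[Dict[str, str]] = None,
                        max_points: int = 5760):
    """Append a measurement to in-memory history with a bounded window.

    5760 points at the 15-second collection interval is roughly 24 hours of
    data - an illustrative default, not a requirement.
    """
    point = MetricPoint(
        name=name,
        value=value,
        timestamp=datetime.now(),
        labels=labels or {},
    )
    with self.metric_lock:
        history = self.metric_history[name]
        history.append(point)
        if len(history) > max_points:
            # Drop the oldest measurements to cap memory usage
            del history[: len(history) - max_points]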
Comprehensive Health Checking System¶
# monitoring/advanced_health_checker.py - Enterprise health monitoring
import asyncio
import time
import json
from typing import Dict, Any, List, Callable, Optional, Tuple
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from enum import Enum
import logging
import aioredis
import asyncpg
logger = logging.getLogger(__name__)
class HealthStatus(Enum):
"""Health check status enumeration."""
HEALTHY = "healthy"
DEGRADED = "degraded"
UNHEALTHY = "unhealthy"
UNKNOWN = "unknown"
@dataclass
class HealthCheckResult:
"""Individual health check result with metadata."""
name: str
status: HealthStatus
response_time_ms: float
message: str = ""
details: Dict[str, Any] = field(default_factory=dict)
timestamp: datetime = field(default_factory=datetime.now)
Advanced health checking uses structured data types and comprehensive status classification. The enum-based status system enables sophisticated health logic, while detailed results provide troubleshooting context for operations teams.
class AdvancedHealthChecker:
"""Comprehensive health checking with failure prediction and auto-recovery."""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.health_checks: Dict[str, Callable] = {}
self.health_history: Dict[str, List[HealthCheckResult]] = {}
self.failure_patterns: Dict[str, int] = {}
self.auto_recovery_enabled = config.get('auto_recovery', True)
# Initialize core health checks
self._register_core_checks()
# Start health monitoring background task
asyncio.create_task(self._health_monitoring_loop())
def _register_core_checks(self):
"""Register essential health checks for agent systems."""
# Database connectivity check
self.register_health_check(
"database_connectivity",
self._check_database_health,
critical=True,
timeout=5.0
)
# Redis connectivity and performance check
self.register_health_check(
"redis_connectivity",
self._check_redis_health,
critical=True,
timeout=3.0
)
# MCP server connectivity check
self.register_health_check(
"mcp_servers_health",
self._check_mcp_servers,
critical=False,
timeout=10.0
)
# Internal system health check
self.register_health_check(
"system_resources",
self._check_system_resources,
critical=True,
timeout=2.0
)
Core health check registration establishes the foundation for system monitoring. Critical flag designation enables different alerting strategies, while timeout specifications prevent health checks from becoming performance bottlenecks during system stress.
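`register_health_check()` itself is not shown above. A minimal sketch follows, under the assumption that check metadata lives in a separate `check_options` dict initialized in `__init__` alongside `health_checks`:

def register_health_check(self, name: str, check_fn: Callable,
                          critical: bool = False, timeout: float = 5.0):
    """Register an async health check with its criticality flag and timeout budget."""
    self.health_checks[name] = check_fn
    # check_options is an assumed attribute mapping name -> {"critical": ..., "timeout": ...};
    # the monitoring loop uses it to bound each check and decide escalation behaviour
    self.check_options[name] = {"critical": critical, "timeout": timeout}
    self.health_history.setdefault(name, [])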
async def _check_database_health(self) -> HealthCheckResult:
"""Comprehensive database health assessment."""
start_time = time.time()
try:
# Connection pool test
conn = await asyncpg.connect(self.config['database_url'])
# Simple query test
result = await conn.fetchval('SELECT 1')
if result != 1:
return HealthCheckResult(
name="database_connectivity",
status=HealthStatus.UNHEALTHY,
response_time_ms=(time.time() - start_time) * 1000,
message="Database query returned unexpected result",
details={"expected": 1, "actual": result}
)
# Connection count check
active_connections = await conn.fetchval(
"SELECT count(*) FROM pg_stat_activity WHERE state = 'active'"
)
# Performance test with complex query
await conn.fetchval("SELECT pg_database_size(current_database())")
await conn.close()
response_time = (time.time() - start_time) * 1000
# Determine status based on response time and load
if response_time > 1000: # 1 second threshold
status = HealthStatus.DEGRADED
message = f"Database responding slowly ({response_time:.1f}ms)"
elif active_connections > 80: # High connection count
status = HealthStatus.DEGRADED
message = f"High database connection count ({active_connections})"
else:
status = HealthStatus.HEALTHY
message = "Database operating normally"
return HealthCheckResult(
name="database_connectivity",
status=status,
response_time_ms=response_time,
message=message,
details={
"active_connections": active_connections,
"query_performance_ms": response_time
}
)
except Exception as e:
return HealthCheckResult(
name="database_connectivity",
status=HealthStatus.UNHEALTHY,
response_time_ms=(time.time() - start_time) * 1000,
message=f"Database connectivity failed: {str(e)}",
details={"error_type": type(e).__name__, "error_details": str(e)}
)
Database health checking goes beyond simple connectivity to include performance assessment and load monitoring. Multi-tier status determination (HEALTHY/DEGRADED/UNHEALTHY) enables nuanced alerting and automatic remediation strategies.
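The `system_resources` check registered earlier is also not defined above. One plausible implementation is sketched below; the 75/80/90 percent thresholds are illustrative assumptions.

async def _check_system_resources(self) -> HealthCheckResult:
    """Assess CPU and memory pressure on the host running the agent."""
    import psutil  # local import keeps the health module importable without psutil elsewhere
    start_time = time.time()

    # interval=None is non-blocking: it reports usage since the previous call
    cpu_percent = psutil.cpu_percent(interval=None)
    memory = psutil.virtual_memory()

    if cpu_percent > 90 or memory.percent > 90:
        status = HealthStatus.UNHEALTHY
        message = "System resources critically constrained"
    elif cpu_percent > 75 or memory.percent > 80:
        status = HealthStatus.DEGRADED
        message = "System resources under pressure"
    else:
        status = HealthStatus.HEALTHY
        message = "System resources within normal limits"

    return HealthCheckResult(
        name="system_resources",
        status=status,
        response_time_ms=(time.time() - start_time) * 1000,
        message=message,
        details={"cpu_percent": cpu_percent, "memory_percent": memory.percent},
    )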
async def _check_redis_health(self) -> HealthCheckResult:
"""Comprehensive Redis cluster health assessment."""
start_time = time.time()
try:
# Redis cluster connection
redis = aioredis.from_url(
self.config['redis_url'],
encoding="utf-8",
decode_responses=True
)
# Basic connectivity test
pong = await redis.ping()
if not pong:
await redis.close()
return HealthCheckResult(
name="redis_connectivity",
status=HealthStatus.UNHEALTHY,
response_time_ms=(time.time() - start_time) * 1000,
message="Redis ping failed"
)
# Performance test with set/get operations
test_key = f"health_check_{int(time.time())}"
await redis.set(test_key, "health_check_value", ex=60)
retrieved_value = await redis.get(test_key)
await redis.delete(test_key)
if retrieved_value != "health_check_value":
await redis.close()
return HealthCheckResult(
name="redis_connectivity",
status=HealthStatus.UNHEALTHY,
response_time_ms=(time.time() - start_time) * 1000,
message="Redis data integrity test failed"
)
# Cluster health assessment (if clustering enabled)
cluster_info = {}
try:
cluster_nodes = await redis.cluster_nodes()
cluster_info = self._analyze_cluster_health(cluster_nodes)
except Exception:
# Not a cluster deployment
pass
await redis.close()
response_time = (time.time() - start_time) * 1000
# Determine health status
if cluster_info.get('failed_nodes', 0) > 0:
status = HealthStatus.DEGRADED
message = f"Redis cluster has {cluster_info['failed_nodes']} failed nodes"
elif response_time > 500: # 500ms threshold
status = HealthStatus.DEGRADED
message = f"Redis responding slowly ({response_time:.1f}ms)"
else:
status = HealthStatus.HEALTHY
message = "Redis operating normally"
return HealthCheckResult(
name="redis_connectivity",
status=status,
response_time_ms=response_time,
message=message,
details={
"performance_ms": response_time,
"cluster_info": cluster_info
}
)
except Exception as e:
return HealthCheckResult(
name="redis_connectivity",
status=HealthStatus.UNHEALTHY,
response_time_ms=(time.time() - start_time) * 1000,
message=f"Redis connectivity failed: {str(e)}",
details={"error_type": type(e).__name__}
)
Redis health checking includes cluster analysis and performance validation through real data operations. The comprehensive assessment helps identify both connectivity issues and performance degradation that could impact agent coordination.
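The background loop started in `__init__` (note that `asyncio.create_task()` there assumes the checker is constructed inside a running event loop) ties the registered checks together. A sketch of that loop follows, reusing the assumed `check_options` metadata from the registration sketch above; `_attempt_auto_recovery()` is a hypothetical hook, not a defined method.

async def _health_monitoring_loop(self, interval_seconds: float = 30.0):
    """Run every registered check on a fixed interval and track failure streaks."""
    while True:
        for name, check_fn in self.health_checks.items():
            options = self.check_options.get(name, {})
            timeout = options.get("timeout", 5.0)
            try:
                result = await asyncio.wait_for(check_fn(), timeout=timeout)
            except Exception as e:
                result = HealthCheckResult(
                    name=name,
                    status=HealthStatus.UNHEALTHY,
                    response_time_ms=timeout * 1000,
                    message=f"Health check raised or timed out: {e}",
                )
            self.health_history.setdefault(name, []).append(result)

            # Consecutive-failure tracking drives escalation and auto-recovery
            if result.status == HealthStatus.UNHEALTHY:
                self.failure_patterns[name] = self.failure_patterns.get(name, 0) + 1
                if self.auto_recovery_enabled and self.failure_patterns[name] >= 3:
                    # Hypothetical hook: e.g. recycle a connection pool or restart a worker
                    await self._attempt_auto_recovery(name, result)
            else:
                self.failure_patterns[name] = 0

        await asyncio.sleep(interval_seconds)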
Production Alert Management¶
# monitoring/alert_manager.py - Enterprise alerting system
import json
import asyncio
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
class AlertSeverity(Enum):
"""Alert severity levels with escalation policies."""
CRITICAL = "critical"
WARNING = "warning"
INFO = "info"
@dataclass
class Alert:
"""Alert definition with routing and escalation metadata."""
name: str
severity: AlertSeverity
message: str
labels: Dict[str, str]
annotations: Dict[str, str]
timestamp: datetime
resolved: bool = False
class AlertManager:
"""Production-grade alert management with intelligent routing."""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.active_alerts: Dict[str, Alert] = {}
self.alert_history: List[Alert] = []
self.suppression_rules: List[Dict[str, Any]] = []
# Load alert rules from configuration
self._load_alert_rules()
Alert management system implements enterprise patterns including severity-based routing, suppression rules, and historical analysis. The structured approach enables sophisticated alert correlation and reduces notification noise in production environments.
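The class body above stops at configuration loading. A sketch of how an alert might be raised through it is shown below; the suppression-rule format and the `_route_alert()` call (sketched after the infrastructure rules further down) are assumptions layered on the attributes defined in `__init__`.

async def fire_alert(self, alert: Alert):
    """Record an alert, applying suppression and de-duplication before routing."""
    # Suppression rules are assumed to be dicts of label matchers, e.g. {"component": "cache"}
    for rule in self.suppression_rules:
        if all(alert.labels.get(key) == value for key, value in rule.items()):
            logger.debug(f"Alert {alert.name} suppressed by rule {rule}")
            return

    # De-duplicate: an identical unresolved alert stays active and is not re-notified
    existing = self.active_alerts.get(alert.name)
    if existing and not existing.resolved:
        return

    self.active_alerts[alert.name] = alert
    self.alert_history.append(alert)
    await self._route_alert(alert)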
# monitoring/alerts/production-alerts.yml - Comprehensive alert definitions
groups:
- name: system-health
rules:
- alert: AgentHighCPUUsage
expr: process_cpu_usage_percent > 80
for: 5m
labels:
severity: warning
component: system
team: platform
annotations:
summary: "Agent process high CPU usage"
description: "CPU usage is {{ $value }}% for more than 5 minutes on {{ $labels.instance }}"
runbook_url: "https://runbooks.company.com/agent-high-cpu"
dashboard_url: "https://grafana.company.com/d/agent-performance"
- alert: AgentMemoryLeakDetected
expr: delta(process_memory_rss_bytes[30m]) > 100000000
for: 10m
labels:
severity: critical
component: system
team: platform
annotations:
summary: "Potential memory leak detected"
description: "Memory usage increased by {{ $value | humanizeBytes }} in 30 minutes"
action_required: "Investigate memory usage patterns and consider restart"
- alert: WorkflowFailureRateHigh
expr: (rate(agent_workflow_executions_total{outcome="failed"}[5m]) / rate(agent_workflow_executions_total[5m])) > 0.1
for: 3m
labels:
severity: critical
component: workflow
team: product
annotations:
summary: "High workflow failure rate detected"
description: "Workflow failure rate is {{ $value | humanizePercentage }} over 5 minutes"
impact: "User experience degradation"
Production alert rules include comprehensive metadata for routing, escalation, and remediation. Runbook URLs enable rapid response, while dashboard links provide immediate access to relevant debugging information. Different teams receive different alerts based on component ownership.
- name: infrastructure-health
rules:
- alert: DatabaseConnectionPoolExhausted
expr: database_connection_pool_active >= 95
for: 2m
labels:
severity: critical
component: database
team: platform
annotations:
summary: "Database connection pool near exhaustion"
description: "{{ $value }} active connections out of maximum pool size"
immediate_action: "Scale connection pool or investigate connection leaks"
- alert: RedisClusterNodeDown
expr: redis_cluster_nodes{status="fail"} > 0
for: 1m
labels:
severity: critical
component: cache
team: platform
annotations:
summary: "Redis cluster node failure detected"
description: "{{ $value }} Redis cluster nodes are in failed state"
escalation: "Page on-call engineer immediately"
- alert: MCPToolLatencyHigh
expr: histogram_quantile(0.95, rate(mcp_tool_call_duration_seconds_bucket[5m])) > 10
for: 2m
labels:
severity: warning
component: mcp
team: integrations
annotations:
summary: "MCP tool calls experiencing high latency"
description: "95th percentile latency is {{ $value }}s for {{ $labels.server }}/{{ $labels.tool }}"
investigation: "Check external service health and network connectivity"
Infrastructure alerts focus on resource exhaustion and external dependency health. Critical alerts trigger immediate escalation, while warning-level alerts enable proactive investigation before user impact occurs. Component-specific routing ensures alerts reach teams with appropriate expertise.
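On the Python side, the `team` and `severity` labels carried by these rules can drive notification routing. A minimal sketch of such a router for the AlertManager class follows; the channel names and the `_send_notification()` hook are illustrative assumptions.

async def _route_alert(self, alert: Alert):
    """Route an alert to notification channels based on its team and severity labels."""
    team = alert.labels.get("team", "platform")
    channels_by_severity = {
        AlertSeverity.CRITICAL: [f"pagerduty-{team}", f"slack-{team}-alerts"],
        AlertSeverity.WARNING: [f"slack-{team}-alerts"],
        AlertSeverity.INFO: [f"slack-{team}-notifications"],
    }
    for channel in channels_by_severity.get(alert.severity, []):
        # _send_notification() is a hypothetical integration point (PagerDuty, Slack, email)
        await self._send_notification(channel, alert)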
Advanced Troubleshooting Patterns¶
Distributed Tracing Implementation¶
# monitoring/distributed_tracing.py - Enterprise tracing system
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor
import time
from typing import Dict, Any, Optional
from contextlib import contextmanager
class DistributedTracing:
"""Enterprise-grade distributed tracing for agent workflows."""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.tracer_provider = TracerProvider()
trace.set_tracer_provider(self.tracer_provider)
# Configure Jaeger exporter
jaeger_exporter = JaegerExporter(
agent_host_name=config.get('jaeger_host', 'localhost'),
agent_port=config.get('jaeger_port', 6831),
)
span_processor = BatchSpanProcessor(jaeger_exporter)
self.tracer_provider.add_span_processor(span_processor)
# Initialize auto-instrumentation
RequestsInstrumentor().instrument()
AsyncPGInstrumentor().instrument()
self.tracer = trace.get_tracer(__name__)
@contextmanager
def trace_workflow(self, workflow_name: str, workflow_id: str, **attributes):
"""Trace complete workflow execution with correlation."""
with self.tracer.start_as_current_span(
f"workflow.{workflow_name}",
attributes={
"workflow.id": workflow_id,
"workflow.name": workflow_name,
"service.name": "mcp-agent",
**attributes
}
) as span:
span.set_attribute("workflow.start_time", time.time())
try:
yield span
span.set_status(trace.StatusCode.OK)
except Exception as e:
span.set_status(trace.StatusCode.ERROR, str(e))
span.record_exception(e)
raise
finally:
span.set_attribute("workflow.end_time", time.time())
Distributed tracing implementation provides end-to-end visibility into complex multi-agent workflows. Automatic instrumentation captures database and HTTP interactions, while custom workflow tracing enables correlation of business operations with infrastructure performance.
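A short usage sketch shows how the tracer is meant to wrap an agent workflow; the workflow name, configuration values, and the `build_plan` / `run_research_workflow` coroutines are assumptions for illustration.

# Illustrative wiring - values and helper coroutines are assumptions
tracing = DistributedTracing({"jaeger_host": "jaeger.observability.svc", "jaeger_port": 6831})

async def execute_workflow(workflow_id: str, query: str):
    with tracing.trace_workflow("research", workflow_id, user_query=query) as span:
        span.add_event("planning_started")
        plan = await build_plan(query)             # asyncpg calls appear as child spans
        span.set_attribute("workflow.step_count", len(plan.steps))
        return await run_research_workflow(plan)   # outbound HTTP is auto-instrumented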
Performance Analysis Automation¶
# monitoring/performance_analyzer.py - Automated performance optimization
import statistics
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, timedelta
@dataclass
class PerformanceAnomaly:
"""Detected performance anomaly with context."""
metric_name: str
anomaly_type: str
severity: str
description: str
suggested_actions: List[str]
detection_time: datetime
class PerformanceAnalyzer:
"""Automated performance analysis and optimization recommendations."""
def __init__(self, metrics_collector):
self.metrics = metrics_collector
self.baseline_data: Dict[str, List[float]] = {}
self.anomaly_threshold = 2.0 # Standard deviations
def analyze_performance_trends(self, hours: int = 24) -> List[PerformanceAnomaly]:
"""Analyze performance trends and detect anomalies."""
anomalies = []
# Analyze response time trends
response_times = self._get_metric_history('response_time', hours)
if response_times:
anomaly = self._detect_response_time_anomalies(response_times)
if anomaly:
anomalies.append(anomaly)
# Analyze resource utilization patterns
cpu_usage = self._get_metric_history('cpu_usage', hours)
memory_usage = self._get_metric_history('memory_usage', hours)
if cpu_usage and memory_usage:
resource_anomaly = self._detect_resource_anomalies(cpu_usage, memory_usage)
if resource_anomaly:
anomalies.append(resource_anomaly)
return anomalies
def _detect_response_time_anomalies(self, response_times: List[float]) -> Optional[PerformanceAnomaly]:
"""Detect response time anomalies using statistical analysis."""
if len(response_times) < 10:
return None
mean = statistics.mean(response_times)
stdev = statistics.stdev(response_times)
# Recent data for comparison
recent_times = response_times[-20:]
recent_mean = statistics.mean(recent_times)
if recent_mean > mean + (self.anomaly_threshold * stdev):
return PerformanceAnomaly(
metric_name="response_time",
anomaly_type="latency_increase",
severity="warning" if recent_mean < mean + (3 * stdev) else "critical",
description=f"Response time increased significantly: {recent_mean:.2f}s vs baseline {mean:.2f}s",
suggested_actions=[
"Check database query performance",
"Analyze resource utilization",
"Review recent deployments",
"Scale horizontal replicas if load increased"
],
detection_time=datetime.now()
)
return None
Automated performance analysis uses statistical methods to detect anomalies and provide actionable recommendations. This proactive approach enables optimization before performance issues impact users, while suggested actions guide operations teams toward effective remediation strategies.
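Wiring the analyzer into the rest of this module might look like the sketch below, which feeds detected anomalies into the AlertManager from the alerting section; the 15-minute cadence, label choices, and importable monitoring package layout are assumptions.

import asyncio
import logging

from monitoring.alert_manager import Alert, AlertSeverity  # assumed package path
from monitoring.performance_analyzer import PerformanceAnalyzer

logger = logging.getLogger(__name__)

async def run_performance_analysis(analyzer: PerformanceAnalyzer,
                                   alert_manager, interval_seconds: int = 900):
    """Periodically analyze trends and escalate anomalies as alerts."""
    while True:
        for anomaly in analyzer.analyze_performance_trends(hours=24):
            logger.warning(
                f"Performance anomaly: {anomaly.description} "
                f"(suggested actions: {', '.join(anomaly.suggested_actions)})"
            )
            severity = (AlertSeverity.CRITICAL if anomaly.severity == "critical"
                        else AlertSeverity.WARNING)
            await alert_manager.fire_alert(Alert(
                name=f"performance_{anomaly.anomaly_type}",
                severity=severity,
                message=anomaly.description,
                labels={"component": "performance", "metric": anomaly.metric_name},
                annotations={"suggested_actions": "; ".join(anomaly.suggested_actions)},
                timestamp=anomaly.detection_time,
            ))
        await asyncio.sleep(interval_seconds)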
Advanced monitoring transforms data into insights, insights into actions, and actions into reliable, high-performing production systems.
🧭 Navigation¶
Previous: Session 8 - Production Ready →
Next: Session 10 - Enterprise Integration →