
⚙️ Session 9 Advanced: Monitoring & Observability

⚙️ IMPLEMENTER PATH CONTENT
Prerequisites: Complete 🎯📝 Session 9 - Production Agent Deployment
Time Investment: 4-6 hours
Outcome: Master enterprise-scale monitoring, observability, and production troubleshooting

Advanced Learning Outcomes

After completing this advanced monitoring module, you will master:

  • Comprehensive metrics collection and analysis for multi-agent systems
  • Advanced health checking patterns with failure recovery automation
  • Production alerting strategies with intelligent escalation and noise reduction
  • Distributed tracing implementation for complex agent workflow debugging
  • Performance optimization using observability data and automated remediation

Comprehensive Monitoring Architecture

Advanced Metrics Collection System

Enterprise monitoring requires sophisticated metrics that provide actionable insights into agent behavior and system health:

# monitoring/advanced_agent_metrics.py - Production monitoring foundation
import time
import asyncio
import psutil
import threading
from typing import Dict, Any, Optional, List, Callable
from prometheus_client import Counter, Histogram, Gauge, Info, Summary, start_http_server
from prometheus_client.core import CollectorRegistry
import logging
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from collections import defaultdict

logger = logging.getLogger(__name__)

These imports provide the foundation for comprehensive system monitoring: psutil for process-level resource inspection, threading for non-blocking background collection, and the dataclasses and collections modules for efficient metric aggregation and correlation analysis.

@dataclass
class MetricPoint:
    """Individual metric measurement with metadata."""
    name: str
    value: float
    timestamp: datetime
    labels: Dict[str, str] = field(default_factory=dict)

class AdvancedAgentMetrics:
    """Comprehensive metrics collection for production agent systems."""

    def __init__(self, service_name: str = "mcp-agent", metrics_port: int = 9090):
        self.service_name = service_name
        self.metrics_port = metrics_port
        self.registry = CollectorRegistry()

        # Metric storage for correlation analysis
        self.metric_history: Dict[str, List[MetricPoint]] = defaultdict(list)
        self.metric_lock = threading.Lock()

        # Initialize comprehensive metrics
        self._initialize_system_metrics()
        self._initialize_business_metrics()
        self._initialize_performance_metrics()

        # Start background metric collection
        self._start_background_collection()

        # Start HTTP server with custom registry
        start_http_server(metrics_port, registry=self.registry)
        logger.info(f"Advanced metrics server started on port {metrics_port}")

The AdvancedAgentMetrics class implements enterprise-grade monitoring with metric correlation, historical analysis, and thread-safe operations. A custom CollectorRegistry isolates these metrics and supports the advanced collection patterns required for large-scale deployments.

    def _initialize_system_metrics(self):
        """Initialize comprehensive system-level metrics."""

        # System identification and versioning
        self.info = Info(
            'agent_system_info',
            'Comprehensive agent system information',
            registry=self.registry
        )
        self.info.info({
            'service': self.service_name,
            'version': '1.0.0',
            'environment': 'production',
            'python_version': '3.11.0',
            'deployment_timestamp': datetime.now().isoformat(),
            'cluster_node': self._get_node_name(),
            'availability_zone': self._get_availability_zone()
        })

        # Process and resource metrics
        self.process_cpu_usage = Gauge(
            'process_cpu_usage_percent',
            'Current process CPU usage percentage',
            registry=self.registry
        )

        self.process_memory_usage = Gauge(
            'process_memory_usage_bytes',
            'Current process virtual memory size in bytes',
            registry=self.registry
        )

        self.process_memory_rss = Gauge(
            'process_memory_rss_bytes',
            'Process resident set size in bytes',
            registry=self.registry
        )

        self.open_file_descriptors = Gauge(
            'process_open_fds',
            'Number of open file descriptors',
            registry=self.registry
        )

System metrics provide foundational visibility into resource consumption and process health. Memory RSS tracking identifies memory leaks, while file descriptor monitoring prevents resource exhaustion. Deployment timestamps and cluster information enable correlation with infrastructure changes.
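The _get_node_name and _get_availability_zone helpers referenced above are not shown in this excerpt. A minimal sketch, assuming the values are injected through environment variables (for example via the Kubernetes downward API), might look like the following; the variable names NODE_NAME and AVAILABILITY_ZONE are assumptions, not part of the original code.

    def _get_node_name(self) -> str:
        """Resolve the cluster node name (hypothetical: env var set by the orchestrator)."""
        import os  # local import keeps the sketch self-contained
        return os.environ.get("NODE_NAME", "unknown-node")

    def _get_availability_zone(self) -> str:
        """Resolve the availability zone (hypothetical: env var or instance metadata)."""
        import os
        return os.environ.get("AVAILABILITY_ZONE", "unknown-az")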

    def _initialize_business_metrics(self):
        """Initialize business-level agent metrics."""

        # Agent workflow metrics with detailed labeling
        self.workflow_executions_total = Counter(
            'agent_workflow_executions_total',
            'Total agent workflow executions',
            ['workflow_type', 'agent_id', 'outcome', 'complexity_tier'],
            registry=self.registry
        )

        self.workflow_duration = Histogram(
            'agent_workflow_duration_seconds',
            'Agent workflow execution duration',
            ['workflow_type', 'complexity_tier'],
            buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 300.0],
            registry=self.registry
        )

        # Active workflow tracking with queue analysis
        self.active_workflows = Gauge(
            'agent_active_workflows',
            'Currently active workflows by type',
            ['workflow_type', 'priority'],
            registry=self.registry
        )

        self.workflow_queue_depth = Gauge(
            'agent_workflow_queue_depth',
            'Pending workflows in queue',
            ['priority', 'workflow_type'],
            registry=self.registry
        )

Business metrics track agent workflow patterns with granular labeling that enables sophisticated analysis. Complexity tier tracking identifies performance patterns across different workflow types, while queue depth monitoring prevents backlog accumulation that could impact user experience.

        # MCP tool interaction metrics with server breakdown
        self.mcp_tool_calls = Counter(
            'mcp_tool_calls_total',
            'Total MCP tool calls with server breakdown',
            ['server', 'tool', 'status', 'error_type'],
            registry=self.registry
        )

        self.mcp_tool_duration = Histogram(
            'mcp_tool_call_duration_seconds',
            'MCP tool call duration with percentile analysis',
            ['server', 'tool'],
            buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0, 10.0],
            registry=self.registry
        )

        self.mcp_connection_pool_active = Gauge(
            'mcp_connection_pool_active',
            'Active MCP connections by server',
            ['server'],
            registry=self.registry
        )

        self.mcp_connection_pool_idle = Gauge(
            'mcp_connection_pool_idle',
            'Idle MCP connections available',
            ['server'],
            registry=self.registry
        )

MCP-specific metrics provide deep visibility into tool integration performance and health. Connection pool tracking prevents resource exhaustion while enabling capacity planning. Error type labeling enables targeted troubleshooting of specific integration issues.

        # Agent-to-Agent communication metrics
        self.a2a_messages_sent = Counter(
            'a2a_messages_sent_total',
            'A2A messages sent with routing information',
            ['message_type', 'recipient_type', 'priority'],
            registry=self.registry
        )

        self.a2a_messages_received = Counter(
            'a2a_messages_received_total',
            'A2A messages received with processing status',
            ['message_type', 'sender_type', 'processing_status'],
            registry=self.registry
        )

        self.a2a_message_latency = Histogram(
            'a2a_message_latency_seconds',
            'End-to-end A2A message latency',
            ['message_type'],
            buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0],
            registry=self.registry
        )

Agent-to-agent communication metrics track distributed workflow coordination effectiveness. Latency histograms with fine-grained buckets capture network and processing delays, while message type breakdown enables identification of communication bottlenecks in complex multi-agent scenarios.
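As a usage sketch, recording a workflow execution and an MCP tool call against these metrics might look like the following. The label values, agent identifier, and timing pattern are illustrative assumptions, not part of the original module.

import time

metrics = AdvancedAgentMetrics(service_name="mcp-agent")

# Record a completed workflow with its duration
start = time.time()
# ... execute the workflow ...
metrics.workflow_duration.labels(
    workflow_type="document_analysis", complexity_tier="standard"
).observe(time.time() - start)
metrics.workflow_executions_total.labels(
    workflow_type="document_analysis", agent_id="agent-01",
    outcome="success", complexity_tier="standard"
).inc()

# Record an MCP tool call, including its outcome
tool_start = time.time()
# ... call the MCP tool ...
metrics.mcp_tool_calls.labels(
    server="filesystem", tool="read_file", status="success", error_type="none"
).inc()
metrics.mcp_tool_duration.labels(server="filesystem", tool="read_file").observe(
    time.time() - tool_start
)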

Advanced Performance Monitoring

    def _initialize_performance_metrics(self):
        """Initialize detailed performance tracking metrics."""

        # HTTP request performance with detailed breakdown
        self.http_request_duration = Histogram(
            'http_request_duration_seconds',
            'HTTP request duration with SLA alignment',
            ['method', 'endpoint', 'status_class'],
            buckets=[0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0],
            registry=self.registry
        )

        # Request rate and concurrency tracking
        self.http_requests_in_flight = Gauge(
            'http_requests_in_flight',
            'Current number of HTTP requests being processed',
            ['endpoint'],
            registry=self.registry
        )

        self.http_request_rate = Counter(
            'http_requests_total',
            'Total HTTP requests by method and endpoint (request rate is derived at query time)',
            ['method', 'endpoint'],
            registry=self.registry
        )

        # Error tracking with detailed classification
        self.error_count = Counter(
            'errors_total',
            'Total errors with detailed classification',
            ['error_type', 'component', 'severity', 'recoverable'],
            registry=self.registry
        )

Performance metrics align with SLA requirements through carefully chosen histogram buckets that map to user experience thresholds. In-flight request tracking enables load shedding decisions, while detailed error classification enables targeted remediation strategies.

        # Cache performance metrics
        self.cache_operations = Counter(
            'cache_operations_total',
            'Cache operations with hit/miss tracking',
            ['operation', 'cache_type', 'result'],
            registry=self.registry
        )

        self.cache_hit_ratio = Gauge(
            'cache_hit_ratio',
            'Cache hit ratio by cache type',
            ['cache_type'],
            registry=self.registry
        )

        # Database performance tracking
        self.database_operations = Counter(
            'database_operations_total',
            'Database operations with performance classification',
            ['operation', 'table', 'query_type'],
            registry=self.registry
        )

        self.database_connection_pool_active = Gauge(
            'database_connection_pool_active',
            'Active database connections',
            registry=self.registry
        )

Cache and database performance metrics enable optimization of data access patterns. Hit ratio tracking guides cache sizing decisions, while connection pool monitoring prevents database resource exhaustion during high-load scenarios.
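A short sketch of how the cache metrics might be kept up to date. The helper name, the local tally approach, and the cache_type values are assumptions for illustration; prometheus_client counters cannot be read back portably, so this sketch maintains its own running counts.

def record_cache_access(metrics: AdvancedAgentMetrics, cache_type: str, hit: bool) -> None:
    """Hypothetical helper: count the access and refresh the hit-ratio gauge."""
    result = "hit" if hit else "miss"
    metrics.cache_operations.labels(operation="get", cache_type=cache_type, result=result).inc()

    # Keep local tallies so the hit ratio can be recomputed on every access
    counts = record_cache_access.counts.setdefault(cache_type, {"hit": 0, "total": 0})
    counts["total"] += 1
    if hit:
        counts["hit"] += 1
    metrics.cache_hit_ratio.labels(cache_type=cache_type).set(counts["hit"] / counts["total"])

record_cache_access.counts = {}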

Intelligent Background Collection

    def _start_background_collection(self):
        """Start background metric collection threads."""

        # System resource collection
        system_thread = threading.Thread(
            target=self._collect_system_metrics,
            daemon=True,
            name="system-metrics-collector"
        )
        system_thread.start()

        # Performance analysis collection
        analysis_thread = threading.Thread(
            target=self._collect_performance_analysis,
            daemon=True,
            name="performance-analysis-collector"
        )
        analysis_thread.start()

        logger.info("Background metric collection threads started")

    def _collect_system_metrics(self):
        """Continuously collect system-level metrics."""
        while True:
            try:
                # Process metrics collection
                process = psutil.Process()

                # CPU usage with smoothing
                cpu_percent = process.cpu_percent(interval=1.0)
                self.process_cpu_usage.set(cpu_percent)

                # Memory metrics: virtual size and resident set tracked separately
                memory_info = process.memory_info()
                self.process_memory_usage.set(memory_info.vms)
                self.process_memory_rss.set(memory_info.rss)

                # File descriptor tracking
                try:
                    fds = process.num_fds()
                    self.open_file_descriptors.set(fds)
                except AttributeError:
                    # Windows compatibility
                    self.open_file_descriptors.set(0)

                # Store historical data for correlation
                self._store_metric_point("cpu_usage", cpu_percent)
                self._store_metric_point("memory_usage", memory_info.rss)

            except Exception as e:
                logger.error(f"Error collecting system metrics: {e}")

            time.sleep(15)  # 15-second collection interval

Background metric collection uses dedicated threads to prevent blocking of main application operations. Historical data storage enables correlation analysis and trend detection that can predict resource exhaustion before it impacts performance.
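The _store_metric_point helper used above is not shown in this excerpt. A minimal sketch, assuming it appends to metric_history under the lock and trims old points to bound memory, could look like this; the one-hour retention window is an assumption.

    def _store_metric_point(self, name: str, value: float) -> None:
        """Append a measurement to the in-memory history used for correlation analysis."""
        point = MetricPoint(name=name, value=value, timestamp=datetime.now())
        with self.metric_lock:
            history = self.metric_history[name]
            history.append(point)
            # Keep roughly one hour of 15-second samples to bound memory usage
            cutoff = datetime.now() - timedelta(hours=1)
            self.metric_history[name] = [p for p in history if p.timestamp >= cutoff]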

Comprehensive Health Checking System

# monitoring/advanced_health_checker.py - Enterprise health monitoring
import asyncio
import time
import json
from typing import Dict, Any, List, Callable, Optional, Tuple
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from enum import Enum
import logging
import aioredis
import asyncpg

logger = logging.getLogger(__name__)

class HealthStatus(Enum):
    """Health check status enumeration."""
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    UNKNOWN = "unknown"

@dataclass
class HealthCheckResult:
    """Individual health check result with metadata."""
    name: str
    status: HealthStatus
    response_time_ms: float
    message: str = ""
    details: Dict[str, Any] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)

Advanced health checking uses structured data types and comprehensive status classification. The enum-based status system enables sophisticated health logic, while detailed results provide troubleshooting context for operations teams.

class AdvancedHealthChecker:
    """Comprehensive health checking with failure prediction and auto-recovery."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.health_checks: Dict[str, Callable] = {}
        self.health_history: Dict[str, List[HealthCheckResult]] = {}
        self.failure_patterns: Dict[str, int] = {}
        self.auto_recovery_enabled = config.get('auto_recovery', True)

        # Initialize core health checks
        self._register_core_checks()

        # Start health monitoring background task
        # (the checker must be constructed inside a running event loop, e.g. at app startup)
        asyncio.create_task(self._health_monitoring_loop())

    def _register_core_checks(self):
        """Register essential health checks for agent systems."""

        # Database connectivity check
        self.register_health_check(
            "database_connectivity",
            self._check_database_health,
            critical=True,
            timeout=5.0
        )

        # Redis connectivity and performance check
        self.register_health_check(
            "redis_connectivity",
            self._check_redis_health,
            critical=True,
            timeout=3.0
        )

        # MCP server connectivity check
        self.register_health_check(
            "mcp_servers_health",
            self._check_mcp_servers,
            critical=False,
            timeout=10.0
        )

        # Internal system health check
        self.register_health_check(
            "system_resources",
            self._check_system_resources,
            critical=True,
            timeout=2.0
        )

Core health check registration establishes the foundation for system monitoring. Critical flag designation enables different alerting strategies, while timeout specifications prevent health checks from becoming performance bottlenecks during system stress.
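The register_health_check method itself is not defined in this excerpt. One plausible sketch, assuming it simply stores the callable alongside its criticality and timeout metadata, is shown below; the check_metadata attribute is an assumption for illustration.

    def register_health_check(self, name: str, check: Callable,
                              critical: bool = False, timeout: float = 5.0) -> None:
        """Register a health check with routing metadata (sketch, not the original implementation)."""
        self.health_checks[name] = check
        # Store metadata separately so the monitoring loop can apply per-check
        # timeouts and decide whether a failure should page or merely warn.
        self.check_metadata = getattr(self, "check_metadata", {})
        self.check_metadata[name] = {"critical": critical, "timeout": timeout}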

    async def _check_database_health(self) -> HealthCheckResult:
        """Comprehensive database health assessment."""
        start_time = time.time()

        try:
            # Connection pool test
            conn = await asyncpg.connect(self.config['database_url'])

            # Simple query test
            result = await conn.fetchval('SELECT 1')
            if result != 1:
                return HealthCheckResult(
                    name="database_connectivity",
                    status=HealthStatus.UNHEALTHY,
                    response_time_ms=(time.time() - start_time) * 1000,
                    message="Database query returned unexpected result",
                    details={"expected": 1, "actual": result}
                )

            # Connection count check
            active_connections = await conn.fetchval(
                "SELECT count(*) FROM pg_stat_activity WHERE state = 'active'"
            )

            # Performance test with complex query
            await conn.fetchval("SELECT pg_database_size(current_database())")

            await conn.close()

            response_time = (time.time() - start_time) * 1000

            # Determine status based on response time and load
            if response_time > 1000:  # 1 second threshold
                status = HealthStatus.DEGRADED
                message = f"Database responding slowly ({response_time:.1f}ms)"
            elif active_connections > 80:  # High connection count
                status = HealthStatus.DEGRADED
                message = f"High database connection count ({active_connections})"
            else:
                status = HealthStatus.HEALTHY
                message = "Database operating normally"

            return HealthCheckResult(
                name="database_connectivity",
                status=status,
                response_time_ms=response_time,
                message=message,
                details={
                    "active_connections": active_connections,
                    "query_performance_ms": response_time
                }
            )

        except Exception as e:
            return HealthCheckResult(
                name="database_connectivity",
                status=HealthStatus.UNHEALTHY,
                response_time_ms=(time.time() - start_time) * 1000,
                message=f"Database connectivity failed: {str(e)}",
                details={"error_type": type(e).__name__, "error_details": str(e)}
            )

Database health checking goes beyond simple connectivity to include performance assessment and load monitoring. Multi-tier status determination (HEALTHY/DEGRADED/UNHEALTHY) enables nuanced alerting and automatic remediation strategies.
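To turn individual results into an overall service status (for example behind a /health endpoint), a simple worst-status-wins aggregation can weight critical checks more heavily. This is a sketch of one possible policy and is not part of the original class.

def aggregate_health(results: List[HealthCheckResult], critical_names: set) -> HealthStatus:
    """Worst-status-wins aggregation: critical failures dominate, non-critical ones only degrade."""
    overall = HealthStatus.HEALTHY
    for result in results:
        if result.status == HealthStatus.UNHEALTHY and result.name in critical_names:
            return HealthStatus.UNHEALTHY
        if result.status in (HealthStatus.UNHEALTHY, HealthStatus.DEGRADED):
            overall = HealthStatus.DEGRADED
    return overall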

    async def _check_redis_health(self) -> HealthCheckResult:
        """Comprehensive Redis cluster health assessment."""
        start_time = time.time()

        try:
            # Redis cluster connection
            redis = aioredis.from_url(
                self.config['redis_url'],
                encoding="utf-8",
                decode_responses=True
            )

            # Basic connectivity test
            pong = await redis.ping()
            if not pong:
                await redis.close()
                return HealthCheckResult(
                    name="redis_connectivity",
                    status=HealthStatus.UNHEALTHY,
                    response_time_ms=(time.time() - start_time) * 1000,
                    message="Redis ping failed"
                )

            # Performance test with set/get operations
            test_key = f"health_check_{int(time.time())}"
            await redis.set(test_key, "health_check_value", ex=60)
            retrieved_value = await redis.get(test_key)
            await redis.delete(test_key)

            if retrieved_value != "health_check_value":
                await redis.close()
                return HealthCheckResult(
                    name="redis_connectivity",
                    status=HealthStatus.UNHEALTHY,
                    response_time_ms=(time.time() - start_time) * 1000,
                    message="Redis data integrity test failed"
                )

            # Cluster health assessment (if clustering enabled)
            cluster_info = {}
            try:
                cluster_nodes = await redis.cluster_nodes()
                cluster_info = self._analyze_cluster_health(cluster_nodes)
            except Exception:
                # Not a cluster deployment
                pass

            await redis.close()

            response_time = (time.time() - start_time) * 1000

            # Determine health status
            if cluster_info.get('failed_nodes', 0) > 0:
                status = HealthStatus.DEGRADED
                message = f"Redis cluster has {cluster_info['failed_nodes']} failed nodes"
            elif response_time > 500:  # 500ms threshold
                status = HealthStatus.DEGRADED
                message = f"Redis responding slowly ({response_time:.1f}ms)"
            else:
                status = HealthStatus.HEALTHY
                message = "Redis operating normally"

            return HealthCheckResult(
                name="redis_connectivity",
                status=status,
                response_time_ms=response_time,
                message=message,
                details={
                    "performance_ms": response_time,
                    "cluster_info": cluster_info
                }
            )

        except Exception as e:
            return HealthCheckResult(
                name="redis_connectivity",
                status=HealthStatus.UNHEALTHY,
                response_time_ms=(time.time() - start_time) * 1000,
                message=f"Redis connectivity failed: {str(e)}",
                details={"error_type": type(e).__name__}
            )

Redis health checking includes cluster analysis and performance validation through real data operations. The comprehensive assessment helps identify both connectivity issues and performance degradation that could impact agent coordination.
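The _analyze_cluster_health helper is referenced but not shown. A rough sketch, assuming cluster_nodes() returns a mapping from node address to node details whose flags mark failed nodes, might count failures like this; the exact response shape varies by client version, so treat it as illustrative.

    def _analyze_cluster_health(self, cluster_nodes: Dict[str, Any]) -> Dict[str, Any]:
        """Summarise cluster topology health from CLUSTER NODES output (illustrative sketch)."""
        total = len(cluster_nodes)
        failed = 0
        for node_info in cluster_nodes.values():
            flags = str(node_info.get("flags", "")) if isinstance(node_info, dict) else str(node_info)
            if "fail" in flags:
                failed += 1
        return {"total_nodes": total, "failed_nodes": failed}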

Production Alert Management

# monitoring/alert_manager.py - Enterprise alerting system
import json
import asyncio
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)

class AlertSeverity(Enum):
    """Alert severity levels with escalation policies."""
    CRITICAL = "critical"
    WARNING = "warning"
    INFO = "info"

@dataclass
class Alert:
    """Alert definition with routing and escalation metadata."""
    name: str
    severity: AlertSeverity
    message: str
    labels: Dict[str, str]
    annotations: Dict[str, str]
    timestamp: datetime
    resolved: bool = False

class AlertManager:
    """Production-grade alert management with intelligent routing."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.active_alerts: Dict[str, Alert] = {}
        self.alert_history: List[Alert] = []
        self.suppression_rules: List[Dict[str, Any]] = []

        # Load alert rules from configuration
        self._load_alert_rules()

The alert management system implements enterprise patterns including severity-based routing, suppression rules, and historical analysis. This structured approach enables sophisticated alert correlation and reduces notification noise in production environments.
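A minimal sketch of how such a manager might raise an alert while honouring suppression rules; the fire_alert method name and the match_labels rule format are assumptions for illustration.

    def fire_alert(self, alert: Alert) -> bool:
        """Record an alert unless a suppression rule matches its labels (sketch)."""
        for rule in self.suppression_rules:
            # A rule suppresses the alert when every label it specifies matches
            if all(alert.labels.get(k) == v for k, v in rule.get("match_labels", {}).items()):
                logger.info(f"Alert {alert.name} suppressed by rule {rule.get('name', 'unnamed')}")
                return False

        self.active_alerts[alert.name] = alert
        self.alert_history.append(alert)
        logger.warning(f"Alert fired: {alert.name} ({alert.severity.value}) - {alert.message}")
        return True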

# monitoring/alerts/production-alerts.yml - Comprehensive alert definitions
groups:
- name: system-health
  rules:
  - alert: AgentHighCPUUsage
    expr: process_cpu_usage_percent > 80
    for: 5m
    labels:
      severity: warning
      component: system
      team: platform
    annotations:
      summary: "Agent process high CPU usage"
      description: "CPU usage is {{ $value }}% for more than 5 minutes on {{ $labels.instance }}"
      runbook_url: "https://runbooks.company.com/agent-high-cpu"
      dashboard_url: "https://grafana.company.com/d/agent-performance"

  - alert: AgentMemoryLeakDetected
    expr: increase(process_memory_rss_bytes[30m]) > 100000000
    for: 10m
    labels:
      severity: critical
      component: system
      team: platform
    annotations:
      summary: "Potential memory leak detected"
      description: "Memory usage increased by {{ $value | humanizeBytes }} in 30 minutes"
      action_required: "Investigate memory usage patterns and consider restart"

  - alert: WorkflowFailureRateHigh
    expr: (rate(agent_workflow_executions_total{outcome="failed"}[5m]) / rate(agent_workflow_executions_total[5m])) > 0.1
    for: 3m
    labels:
      severity: critical
      component: workflow
      team: product
    annotations:
      summary: "High workflow failure rate detected"
      description: "Workflow failure rate is {{ $value | humanizePercentage }} over 5 minutes"
      impact: "User experience degradation"

Production alert rules include comprehensive metadata for routing, escalation, and remediation. Runbook URLs enable rapid response, while dashboard links provide immediate access to relevant debugging information. Different teams receive different alerts based on component ownership.

- name: infrastructure-health
  rules:
  - alert: DatabaseConnectionPoolExhausted
    expr: database_connection_pool_active >= 95
    for: 2m
    labels:
      severity: critical
      component: database
      team: platform
    annotations:
      summary: "Database connection pool near exhaustion"
      description: "{{ $value }} active connections out of maximum pool size"
      immediate_action: "Scale connection pool or investigate connection leaks"

  - alert: RedisClusterNodeDown
    expr: redis_cluster_nodes{status="fail"} > 0
    for: 1m
    labels:
      severity: critical
      component: cache
      team: platform
    annotations:
      summary: "Redis cluster node failure detected"
      description: "{{ $value }} Redis cluster nodes are in failed state"
      escalation: "Page on-call engineer immediately"

  - alert: MCPToolLatencyHigh
    expr: histogram_quantile(0.95, rate(mcp_tool_call_duration_seconds_bucket[5m])) > 10
    for: 2m
    labels:
      severity: warning
      component: mcp
      team: integrations
    annotations:
      summary: "MCP tool calls experiencing high latency"
      description: "95th percentile latency is {{ $value }}s for {{ $labels.server }}/{{ $labels.tool }}"
      investigation: "Check external service health and network connectivity"

Infrastructure alerts focus on resource exhaustion and external dependency health. Critical alerts trigger immediate escalation, while warning-level alerts enable proactive investigation before user impact occurs. Component-specific routing ensures alerts reach teams with appropriate expertise.
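On the receiving side, the team and severity labels carried by these rules can drive notification routing. A hedged sketch using the Alert and AlertSeverity types from the alert manager module; the channel names and the route_alert helper are assumptions, not part of the alerting stack shown above.

# Hypothetical mapping from alert team labels to notification channels
TEAM_CHANNELS = {
    "platform": "#oncall-platform",
    "product": "#oncall-product",
    "integrations": "#oncall-integrations",
}

def route_alert(alert: Alert) -> str:
    """Pick a notification channel from the alert's team label; escalate critical severity."""
    channel = TEAM_CHANNELS.get(alert.labels.get("team", ""), "#oncall-default")
    if alert.severity == AlertSeverity.CRITICAL:
        # Critical alerts also page the on-call engineer in addition to the channel post
        channel += " (+pager)"
    return channel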

Advanced Troubleshooting Patterns

Distributed Tracing Implementation

# monitoring/distributed_tracing.py - Enterprise tracing system
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor
import time
from typing import Dict, Any, Optional
from contextlib import contextmanager

class DistributedTracing:
    """Enterprise-grade distributed tracing for agent workflows."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.tracer_provider = TracerProvider()
        trace.set_tracer_provider(self.tracer_provider)

        # Configure Jaeger exporter
        jaeger_exporter = JaegerExporter(
            agent_host_name=config.get('jaeger_host', 'localhost'),
            agent_port=config.get('jaeger_port', 6831),
        )

        span_processor = BatchSpanProcessor(jaeger_exporter)
        self.tracer_provider.add_span_processor(span_processor)

        # Initialize auto-instrumentation
        RequestsInstrumentor().instrument()
        AsyncPGInstrumentor().instrument()

        self.tracer = trace.get_tracer(__name__)

    @contextmanager
    def trace_workflow(self, workflow_name: str, workflow_id: str, **attributes):
        """Trace complete workflow execution with correlation."""
        with self.tracer.start_as_current_span(
            f"workflow.{workflow_name}",
            attributes={
                "workflow.id": workflow_id,
                "workflow.name": workflow_name,
                "service.name": "mcp-agent",
                **attributes
            }
        ) as span:
            span.set_attribute("workflow.start_time", time.time())
            try:
                yield span
                span.set_status(trace.StatusCode.OK)
            except Exception as e:
                span.set_status(trace.StatusCode.ERROR, str(e))
                span.record_exception(e)
                raise
            finally:
                span.set_attribute("workflow.end_time", time.time())

Distributed tracing implementation provides end-to-end visibility into complex multi-agent workflows. Automatic instrumentation captures database and HTTP interactions, while custom workflow tracing enables correlation of business operations with infrastructure performance.
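Used inside an agent, the context manager wraps a workflow end to end; the configuration values, attribute names, and workflow body below are illustrative assumptions.

tracing = DistributedTracing({"jaeger_host": "jaeger.observability.svc", "jaeger_port": 6831})

def run_document_workflow(workflow_id: str) -> None:
    # Every span created inside this block becomes a child of the workflow span,
    # so database and HTTP spans from the auto-instrumentation are correlated automatically.
    with tracing.trace_workflow("document_analysis", workflow_id, tenant="acme") as span:
        span.add_event("retrieval_started")
        # ... fetch documents, call MCP tools, persist results ...
        span.add_event("retrieval_completed")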

Performance Analysis Automation

# monitoring/performance_analyzer.py - Automated performance optimization
import statistics
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class PerformanceAnomaly:
    """Detected performance anomaly with context."""
    metric_name: str
    anomaly_type: str
    severity: str
    description: str
    suggested_actions: List[str]
    detection_time: datetime

class PerformanceAnalyzer:
    """Automated performance analysis and optimization recommendations."""

    def __init__(self, metrics_collector):
        self.metrics = metrics_collector
        self.baseline_data: Dict[str, List[float]] = {}
        self.anomaly_threshold = 2.0  # Standard deviations

    def analyze_performance_trends(self, hours: int = 24) -> List[PerformanceAnomaly]:
        """Analyze performance trends and detect anomalies."""
        anomalies = []

        # Analyze response time trends
        response_times = self._get_metric_history('response_time', hours)
        if response_times:
            anomaly = self._detect_response_time_anomalies(response_times)
            if anomaly:
                anomalies.append(anomaly)

        # Analyze resource utilization patterns
        cpu_usage = self._get_metric_history('cpu_usage', hours)
        memory_usage = self._get_metric_history('memory_usage', hours)

        if cpu_usage and memory_usage:
            resource_anomaly = self._detect_resource_anomalies(cpu_usage, memory_usage)
            if resource_anomaly:
                anomalies.append(resource_anomaly)

        return anomalies

    def _detect_response_time_anomalies(self, response_times: List[float]) -> Optional[PerformanceAnomaly]:
        """Detect response time anomalies using statistical analysis."""
        if len(response_times) < 10:
            return None

        mean = statistics.mean(response_times)
        stdev = statistics.stdev(response_times)

        # Recent data for comparison
        recent_times = response_times[-20:]
        recent_mean = statistics.mean(recent_times)

        if recent_mean > mean + (self.anomaly_threshold * stdev):
            return PerformanceAnomaly(
                metric_name="response_time",
                anomaly_type="latency_increase",
                severity="warning" if recent_mean < mean + (3 * stdev) else "critical",
                description=f"Response time increased significantly: {recent_mean:.2f}s vs baseline {mean:.2f}s",
                suggested_actions=[
                    "Check database query performance",
                    "Analyze resource utilization",
                    "Review recent deployments",
                    "Scale horizontal replicas if load increased"
                ],
                detection_time=datetime.now()
            )

        return None

Automated performance analysis uses statistical methods to detect anomalies and provide actionable recommendations. This proactive approach enables optimization before performance issues impact users, while suggested actions guide operations teams toward effective remediation strategies.
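A usage sketch, assuming the analyzer is wired to the metrics collector shown earlier and that any alerting on detected anomalies happens elsewhere.

analyzer = PerformanceAnalyzer(metrics_collector=metrics)

# Run periodically (for example from a scheduled task) and surface findings
for anomaly in analyzer.analyze_performance_trends(hours=24):
    logger.warning(f"[{anomaly.severity}] {anomaly.metric_name}: {anomaly.description}")
    for action in anomaly.suggested_actions:
        logger.info(f"  suggested action: {action}")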

Advanced monitoring transforms data into insights, insights into actions, and actions into reliable, high-performing production systems.

