Skip to content

Session 7 - Module B: Enterprise Agent Systems (30 minutes)

Prerequisites: Session 7 Core Section Complete
Target Audience: DevOps engineers and enterprise architects building production ADK systems
Cognitive Load: 3 enterprise concepts


Module Overview

This module explores production-scale ADK agent deployment including container orchestration, load balancing, monitoring systems, security hardening, and enterprise integration patterns. You'll learn to deploy robust, scalable ADK agents that can handle enterprise workloads with comprehensive observability and security.

Learning Objectives

By the end of this module, you will:

  • Implement containerized ADK agent deployment with Kubernetes orchestration
  • Design load balancing and auto-scaling systems for high-availability agent services
  • Build comprehensive monitoring and observability systems for production agents
  • Create security frameworks for enterprise ADK deployments with compliance requirements

Part 1: Production Deployment Architecture (15 minutes)

Step 1: Setting Up Enterprise Deployment Framework

Let's begin by building a comprehensive framework for deploying ADK agents at enterprise scale. This foundation will support various deployment strategies and enterprise requirements.

🗂️ File: src/session7/enterprise_deployment.py - Production deployment patterns

from typing import Dict, List, Any, Optional, Union
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import asyncio
import logging
import json
import os
import yaml
from pathlib import Path
from enum import Enum

This setup provides all the tools we need for sophisticated enterprise deployment including configuration management, asynchronous operations, and comprehensive logging.

Step 2: Defining Deployment Strategies

Now let's create an enum for different deployment strategies that enterprises commonly use:

class DeploymentStrategy(Enum):
    """ADK agent deployment strategies"""
    BLUE_GREEN = "blue_green"          # Zero-downtime deployments
    ROLLING = "rolling"                # Gradual instance replacement
    CANARY = "canary"                 # Progressive traffic routing
    A_B_TESTING = "ab_testing"        # Split traffic for testing

These deployment strategies give us flexibility for different business needs: blue-green for zero downtime, rolling for gradual updates, canary for risk mitigation, and A/B testing for feature validation.

Step 3: Creating Container Configuration

Let's build a comprehensive container configuration system:

@dataclass
class ContainerConfiguration:
    """Container configuration for ADK agents"""
    image: str
    tag: str = "latest"
    cpu_request: str = "500m"
    cpu_limit: str = "1000m"
    memory_request: str = "512Mi"
    memory_limit: str = "1Gi"
    environment_variables: Dict[str, str] = field(default_factory=dict)
    secrets: List[str] = field(default_factory=list)
    health_check_path: str = "/health"
    ready_check_path: str = "/ready"
    port: int = 8080

This configuration class provides enterprise-grade resource management with CPU/memory limits, security through secrets management, and health monitoring endpoints.

Step 4: Building Service Configuration

Next, let's create the service-level configuration for Kubernetes deployments:

@dataclass
class ServiceConfiguration:
    """Service configuration for ADK agent deployment"""
    service_name: str
    namespace: str = "adk-agents"
    replicas: int = 3
    max_replicas: int = 10
    target_cpu_utilization: int = 70
    deployment_strategy: DeploymentStrategy = DeploymentStrategy.ROLLING
    load_balancer_type: str = "Application"  # Application, Network, or Classic

This service configuration enables auto-scaling, namespace isolation, and flexible deployment strategies - essential for enterprise environments.

Step 5: Creating the Enterprise Deployment Manager

Now let's build the main deployment management class:

class EnterpriseADKDeployment:
    """Enterprise deployment manager for ADK agents"""

    def __init__(self, cluster_config: Dict[str, Any]):
        self.cluster_config = cluster_config
        self.deployment_templates = {}
        self.monitoring_config = {}
        self.security_policies = {}
        self.logger = logging.getLogger(__name__)

This deployment manager centralizes all deployment concerns including templates, monitoring, and security policies.

Step 6: Generating Kubernetes Manifests

Let's implement the comprehensive manifest generation system:

    def generate_kubernetes_manifests(self, 
                                    container_config: ContainerConfiguration,
                                    service_config: ServiceConfiguration) -> Dict[str, Any]:
        """Generate complete Kubernetes deployment manifests for ADK agents"""

        manifests = {
            "deployment": self._create_deployment_manifest(container_config, service_config),
            "service": self._create_service_manifest(service_config),
            "hpa": self._create_hpa_manifest(service_config),
            "ingress": self._create_ingress_manifest(service_config),
            "configmap": self._create_configmap_manifest(container_config, service_config),
            "secret": self._create_secret_manifest(container_config),
            "service_account": self._create_service_account_manifest(service_config),
            "network_policy": self._create_network_policy_manifest(service_config)
        }

        return manifests

This method generates all the Kubernetes resources needed for a complete enterprise deployment: deployment, service, autoscaling, ingress, configuration, secrets, security, and networking.

Step 7: Building the Deployment Manifest

Now let's create the core deployment manifest with enterprise security features. We'll build this step by step to understand each component:

    def _create_deployment_manifest(self, 
                                  container_config: ContainerConfiguration,
                                  service_config: ServiceConfiguration) -> Dict[str, Any]:
        """Create Kubernetes Deployment manifest for ADK agents"""

First, we define the basic deployment metadata and structure:

        return {
            "apiVersion": "apps/v1",
            "kind": "Deployment",
            "metadata": {
                "name": service_config.service_name,
                "namespace": service_config.namespace,
                "labels": {
                    "app": service_config.service_name,
                    "component": "adk-agent",
                    "version": container_config.tag,
                    "managed-by": "adk-deployment-manager"
                }
            },

These labels enable service discovery and management. The "managed-by" label helps identify deployments created by our deployment manager versus manual deployments.

Next, we configure the deployment strategy for zero-downtime updates:

            "spec": {
                "replicas": service_config.replicas,
                "strategy": {
                    "type": "RollingUpdate" if service_config.deployment_strategy == DeploymentStrategy.ROLLING else "Recreate",
                    "rollingUpdate": {
                        "maxUnavailable": 1,
                        "maxSurge": 1
                    }
                },
                "selector": {
                    "matchLabels": {
                        "app": service_config.service_name
                    }
                },

The RollingUpdate strategy ensures only one pod is unavailable at a time, maintaining service availability during updates. MaxSurge allows one extra pod during the update process for smooth transitions.

Now we define the pod template with monitoring annotations:

                "template": {
                    "metadata": {
                        "labels": {
                            "app": service_config.service_name,
                            "component": "adk-agent",
                            "version": container_config.tag
                        },
                        "annotations": {
                            "prometheus.io/scrape": "true",
                            "prometheus.io/port": str(container_config.port),
                            "prometheus.io/path": "/metrics"
                        }
                    },

The Prometheus annotations enable automatic metrics collection without additional configuration, essential for production observability.

We implement pod-level security context for defense in depth:

                    "spec": {
                        "serviceAccountName": f"{service_config.service_name}-sa",
                        "securityContext": {
                            "runAsNonRoot": True,
                            "runAsUser": 1000,
                            "fsGroup": 2000
                        },

Running as non-root with specific user/group IDs prevents privilege escalation attacks and follows security best practices.

The container specification includes the core runtime configuration:

                        "containers": [{
                            "name": "adk-agent",
                            "image": f"{container_config.image}:{container_config.tag}",
                            "imagePullPolicy": "Always",
                            "ports": [{
                                "containerPort": container_config.port,
                                "protocol": "TCP"
                            }],

"Always" pull policy ensures we get the latest security patches and updates, critical for production environments.

Environment variables provide runtime configuration and pod metadata:

                            "env": [
                                {"name": key, "value": value} 
                                for key, value in container_config.environment_variables.items()
                            ] + [
                                {
                                    "name": "POD_NAME",
                                    "valueFrom": {
                                        "fieldRef": {
                                            "fieldPath": "metadata.name"
                                        }
                                    }
                                },
                                {
                                    "name": "POD_NAMESPACE",
                                    "valueFrom": {
                                        "fieldRef": {
                                            "fieldPath": "metadata.namespace"
                                        }
                                    }
                                }
                            ],

Pod name and namespace are injected from Kubernetes metadata, enabling self-aware agents that can interact with the Kubernetes API.

Configuration and secrets are mounted from external sources:

                            "envFrom": [{
                                "configMapRef": {
                                    "name": f"{service_config.service_name}-config"
                                }
                            }, {
                                "secretRef": {
                                    "name": f"{service_config.service_name}-secrets"
                                }
                            }],

This separation ensures sensitive data never appears in deployment manifests and can be managed independently.

Resource limits prevent noisy neighbor problems:

                            "resources": {
                                "requests": {
                                    "cpu": container_config.cpu_request,
                                    "memory": container_config.memory_request
                                },
                                "limits": {
                                    "cpu": container_config.cpu_limit,
                                    "memory": container_config.memory_limit
                                }
                            },

Requests guarantee minimum resources while limits prevent resource exhaustion, ensuring predictable performance.

Health checks ensure only healthy pods receive traffic:

                            "livenessProbe": {
                                "httpGet": {
                                    "path": container_config.health_check_path,
                                    "port": container_config.port
                                },
                                "initialDelaySeconds": 30,
                                "periodSeconds": 10,
                                "timeoutSeconds": 5,
                                "failureThreshold": 3
                            },
                            "readinessProbe": {
                                "httpGet": {
                                    "path": container_config.ready_check_path,
                                    "port": container_config.port
                                },
                                "initialDelaySeconds": 5,
                                "periodSeconds": 5,
                                "timeoutSeconds": 3,
                                "failureThreshold": 1
                            },

Liveness probes restart unhealthy containers while readiness probes prevent traffic to containers not ready to serve requests.

Finally, container-level security hardening:

                            "securityContext": {
                                "allowPrivilegeEscalation": False,
                                "readOnlyRootFilesystem": True,
                                "capabilities": {
                                    "drop": ["ALL"]
                                }
                            }
                        }],
                        "volumes": [{
                            "name": "tmp",
                            "emptyDir": {}
                        }]
                    }
                }
            }
        }

Dropping all capabilities and using read-only root filesystem prevents many attack vectors. The tmp volume provides writable space for applications that need it.

This deployment manifest demonstrates enterprise-grade security with non-root users, read-only filesystems, dropped capabilities, comprehensive health checks, and proper resource management.

Step 8: Creating Horizontal Pod Autoscaler

Now let's implement the auto-scaling configuration. We'll build this step by step to understand each component of intelligent scaling:

    def _create_hpa_manifest(self, service_config: ServiceConfiguration) -> Dict[str, Any]:
        """Create Horizontal Pod Autoscaler manifest"""

First, let's define the basic autoscaler metadata and target reference:

        return {
            "apiVersion": "autoscaling/v2",
            "kind": "HorizontalPodAutoscaler",
            "metadata": {
                "name": f"{service_config.service_name}-hpa",
                "namespace": service_config.namespace
            },

This creates a Kubernetes HPA resource with proper naming conventions that follow the service configuration.

Next, we specify which deployment this autoscaler should manage:

            "spec": {
                "scaleTargetRef": {
                    "apiVersion": "apps/v1",
                    "kind": "Deployment",
                    "name": service_config.service_name
                },
                "minReplicas": service_config.replicas,
                "maxReplicas": service_config.max_replicas,

The scaleTargetRef ensures we're scaling the correct deployment, while min/max replicas provide boundaries for scaling operations. This prevents both under-provisioning and runaway scaling.

Now let's configure the metrics that trigger scaling decisions:

                "metrics": [{
                    "type": "Resource",
                    "resource": {
                        "name": "cpu",
                        "target": {
                            "type": "Utilization",
                            "averageUtilization": service_config.target_cpu_utilization
                        }
                    }
                }, {
                    "type": "Resource", 
                    "resource": {
                        "name": "memory",
                        "target": {
                            "type": "Utilization",
                            "averageUtilization": 80
                        }
                    }
                }],

We monitor both CPU and memory utilization. CPU scaling uses the configured threshold while memory uses a conservative 80% threshold. The autoscaler triggers when either metric exceeds its threshold.

Finally, we implement intelligent scaling behavior to prevent thrashing:

                "behavior": {
                    "scaleDown": {
                        "stabilizationWindowSeconds": 300,
                        "policies": [{
                            "type": "Percent",
                            "value": 10,
                            "periodSeconds": 60
                        }]
                    },
                    "scaleUp": {
                        "stabilizationWindowSeconds": 60,
                        "policies": [{
                            "type": "Percent",
                            "value": 50,
                            "periodSeconds": 60
                        }]
                    }
                }
            }
        }

The scaling behavior policies ensure smooth operations: scale-down waits 5 minutes for stabilization and only reduces by 10% per minute, while scale-up responds faster with 1-minute stabilization and can increase by 50% per minute. This prevents rapid oscillations while allowing quick response to traffic spikes.

This autoscaler provides intelligent scaling based on both CPU and memory utilization with controlled scale-up and scale-down policies to prevent thrashing.

Step 9: Building Load Balancer Manager

Let's create the sophisticated load balancing system:

class LoadBalancerManager:
    """Manages load balancing and traffic routing for ADK agents"""

    def __init__(self, cloud_provider: str = "gcp"):
        self.cloud_provider = cloud_provider
        self.load_balancer_configs = {}
        self.traffic_routing_rules = {}
        self.health_check_configs = {}
        self.logger = logging.getLogger(__name__)

This load balancer manager provides cloud-agnostic load balancing with comprehensive configuration management and health monitoring.

Step 10: Implementing Application Load Balancer

Now let's create the application load balancer configuration. We'll build this enterprise-grade load balancer step by step to understand each security and reliability component:

    def create_application_load_balancer(self, 
                                       service_name: str,
                                       target_groups: List[Dict[str, Any]],
                                       routing_rules: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Create application load balancer configuration for ADK agents"""

First, let's establish the core load balancer configuration with enterprise security settings:

        lb_config = {
            "load_balancer": {
                "name": f"{service_name}-alb",
                "type": "application",
                "scheme": "internet-facing",
                "security_groups": [f"{service_name}-lb-sg"],
                "subnets": ["subnet-public-1", "subnet-public-2"],
                "tags": {
                    "Environment": "production",
                    "Service": service_name,
                    "ManagedBy": "adk-deployment"
                }
            },

This configuration creates an internet-facing Application Load Balancer with proper security group isolation and multi-AZ deployment across public subnets. The tagging ensures proper resource management and cost allocation.

Next, we configure the HTTPS listener with enterprise-grade SSL policies:

            "listeners": [{
                "port": 443,
                "protocol": "HTTPS",
                "ssl_policy": "ELBSecurityPolicy-TLS-1-2-2019-07",
                "certificate_arn": f"arn:aws:acm:region:account:certificate/{service_name}",
                "default_actions": [{
                    "type": "forward",
                    "target_group_arn": target_groups[0]["arn"]
                }],
                "rules": routing_rules
            },

The HTTPS listener uses TLS 1.2 minimum with AWS Certificate Manager for automatic certificate management and rotation. Custom routing rules enable sophisticated traffic management patterns.

Now we add the HTTP listener with automatic HTTPS redirection for security:

            {
                "port": 80,
                "protocol": "HTTP",
                "default_actions": [{
                    "type": "redirect",
                    "redirect_config": {
                        "protocol": "HTTPS",
                        "port": "443",
                        "status_code": "HTTP_301"
                    }
                }]
            }],

This ensures all HTTP traffic is automatically redirected to HTTPS with a permanent redirect, enforcing encryption for all communications.

Finally, we complete the configuration and store it for management:

            "target_groups": target_groups
        }

        self.load_balancer_configs[service_name] = lb_config
        return lb_config

The target groups contain the actual ADK agent instances, while storing the configuration enables centralized management and updates.

This load balancer configuration provides enterprise-grade features including SSL termination, security groups, multiple availability zones, and automatic HTTP to HTTPS redirection.

Step 11: Creating Canary Deployment System

Let's implement the sophisticated canary deployment traffic routing. We'll build this step by step to understand each component of intelligent risk mitigation:

    def create_canary_deployment_routing(self, 
                                       service_name: str,
                                       stable_weight: int = 90,
                                       canary_weight: int = 10) -> Dict[str, Any]:
        """Create canary deployment traffic routing configuration"""

First, let's establish the basic routing strategy and target configuration:

        canary_config = {
            "routing_strategy": "weighted",
            "total_weight": 100,

Weighted routing allows us to precisely control traffic distribution between stable and canary versions, essential for controlled risk exposure.

Now let's define the stable production target with comprehensive health monitoring:

            "targets": [{
                "target_group": f"{service_name}-stable",
                "weight": stable_weight,
                "version": "stable",
                "health_check": {
                    "path": "/health",
                    "interval": 30,
                    "timeout": 5,
                    "healthy_threshold": 2,
                    "unhealthy_threshold": 3
                }
            },

The stable target receives the majority of traffic with conservative health checking. The threshold settings ensure we quickly detect failures but avoid false positives.

Next, we configure the canary target with identical health monitoring:

            {
                "target_group": f"{service_name}-canary",
                "weight": canary_weight,
                "version": "canary",
                "health_check": {
                    "path": "/health",
                    "interval": 30,
                    "timeout": 5,
                    "healthy_threshold": 2,
                    "unhealthy_threshold": 3
                }
            }],

The canary target receives a small percentage of traffic, allowing us to validate new deployments with real user traffic while minimizing risk exposure.

Now we establish monitoring thresholds for automatic decision making:

            "monitoring": {
                "success_rate_threshold": 99.0,
                "error_rate_threshold": 1.0,
                "response_time_threshold_ms": 500,
                "minimum_traffic_sample": 100
            },

These monitoring thresholds define our quality gates: 99% success rate, less than 1% errors, sub-500ms response times, and minimum sample size for statistical significance.

Finally, we configure the automation logic for promotion and rollback:

            "automation": {
                "promote_on_success": True,
                "rollback_on_failure": True,
                "evaluation_duration_minutes": 30,
                "promotion_criteria": {
                    "success_rate_min": 99.5,
                    "error_rate_max": 0.5,
                    "response_time_p95_max": 300
                }
            }
        }

        return canary_config

The automation system evaluates canary performance for 30 minutes against strict criteria before automatically promoting successful deployments or rolling back failures, ensuring enterprise-grade reliability.

This canary deployment system provides intelligent traffic splitting with automated promotion/rollback based on performance metrics, reducing deployment risk.

Step 12: Building Enterprise Monitoring System

Now let's create the comprehensive monitoring framework:

class EnterpriseMonitoring:
    """Comprehensive monitoring system for production ADK agents"""

    def __init__(self, monitoring_stack: str = "prometheus"):
        self.monitoring_stack = monitoring_stack
        self.metric_definitions = {}
        self.alert_rules = {}
        self.dashboard_configs = {}
        self.logger = logging.getLogger(__name__)

This monitoring system provides enterprise-grade observability with configurable metrics, alerting, and dashboards.

Step 13: Setting Up Agent Metrics

Let's define comprehensive metrics for monitoring ADK agent performance. We'll organize these into logical categories to understand what each type of metric tells us:

    def setup_agent_metrics(self) -> Dict[str, Any]:
        """Define comprehensive metrics collection for ADK agents"""

First, let's establish the application-level metrics that track core agent functionality:

        metrics_config = {
            "application_metrics": {
                "adk_agent_requests_total": {
                    "type": "counter",
                    "description": "Total number of requests processed by ADK agent",
                    "labels": ["agent_id", "method", "endpoint", "status_code"]
                },
                "adk_agent_request_duration_seconds": {
                    "type": "histogram",
                    "description": "Request duration in seconds",
                    "labels": ["agent_id", "method", "endpoint"],
                    "buckets": [0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
                },

These core metrics track request volume and latency distribution. The histogram buckets are carefully chosen to capture both fast and slow operations, essential for SLA monitoring.

Next, we monitor conversation state and memory usage:

                "adk_agent_active_conversations": {
                    "type": "gauge",
                    "description": "Number of active conversations",
                    "labels": ["agent_id"]
                },
                "adk_agent_memory_usage_bytes": {
                    "type": "gauge",
                    "description": "Memory usage by conversation memory system",
                    "labels": ["agent_id", "memory_type"]
                },

Conversation metrics help us understand agent load patterns and memory efficiency, critical for capacity planning and leak detection.

Now let's track MCP server interactions for integration health:

                "adk_agent_mcp_calls_total": {
                    "type": "counter",
                    "description": "Total MCP server calls",
                    "labels": ["agent_id", "mcp_server", "method", "status"]
                },
                "adk_agent_mcp_call_duration_seconds": {
                    "type": "histogram",
                    "description": "MCP call duration in seconds",
                    "labels": ["agent_id", "mcp_server", "method"],
                    "buckets": [0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0]
                }
            },

MCP metrics enable monitoring of external integrations with faster bucket intervals since these are typically shorter operations than user requests.

Next, we define infrastructure-level metrics for container health:

            "infrastructure_metrics": {
                "adk_container_cpu_usage_percent": {
                    "type": "gauge",
                    "description": "Container CPU usage percentage",
                    "labels": ["pod_name", "container_name"]
                },
                "adk_container_memory_usage_bytes": {
                    "type": "gauge",
                    "description": "Container memory usage in bytes",
                    "labels": ["pod_name", "container_name"]
                },
                "adk_container_restarts_total": {
                    "type": "counter",
                    "description": "Total container restarts",
                    "labels": ["pod_name", "container_name"]
                }
            },

Infrastructure metrics provide the foundation for auto-scaling decisions and reliability monitoring at the container level.

Finally, we include business-level metrics for outcome tracking:

            "business_metrics": {
                "adk_successful_interactions_total": {
                    "type": "counter",
                    "description": "Total successful user interactions",
                    "labels": ["agent_id", "interaction_type"]
                },
                "adk_user_satisfaction_score": {
                    "type": "gauge",
                    "description": "User satisfaction score",
                    "labels": ["agent_id"]
                },
                "adk_cost_per_interaction": {
                    "type": "gauge",
                    "description": "Cost per user interaction in dollars",
                    "labels": ["agent_id", "cost_center"]
                }
            }
        }

        self.metric_definitions = metrics_config
        return metrics_config

Business metrics connect technical performance to business outcomes, enabling data-driven optimization and cost management decisions.

This comprehensive metrics system covers application performance, infrastructure health, and business KPIs - essential for enterprise monitoring.

Step 14: Creating Intelligent Alerting System

Now let's implement comprehensive alerting rules for proactive issue detection. We'll organize these by criticality and response time requirements:

    def create_alerting_rules(self) -> List[Dict[str, Any]]:
        """Create comprehensive alerting rules for ADK agents"""

First, let's define critical application-level alerts for immediate response:

        alert_rules = [
            {
                "alert": "ADKAgentHighErrorRate",
                "expr": 'rate(adk_agent_requests_total{status_code=~"5.."}[5m]) / rate(adk_agent_requests_total[5m]) > 0.05',
                "for": "2m",
                "labels": {
                    "severity": "critical",
                    "component": "adk-agent"
                },
                "annotations": {
                    "summary": "ADK Agent {{ $labels.agent_id }} has high error rate",
                    "description": "ADK Agent {{ $labels.agent_id }} error rate is {{ $value | humanizePercentage }} which is above 5% threshold"
                }
            },

High error rates (>5%) trigger critical alerts after 2 minutes, ensuring rapid response to service degradation that affects user experience.

Next, we monitor latency performance with warning-level alerts:

            {
                "alert": "ADKAgentHighLatency",
                "expr": 'histogram_quantile(0.95, rate(adk_agent_request_duration_seconds_bucket[5m])) > 2',
                "for": "5m",
                "labels": {
                    "severity": "warning",
                    "component": "adk-agent"
                },
                "annotations": {
                    "summary": "ADK Agent {{ $labels.agent_id }} has high latency",
                    "description": "ADK Agent {{ $labels.agent_id }} 95th percentile latency is {{ $value }}s which is above 2s threshold"
                }
            },

Latency alerts use the 95th percentile with a 5-minute evaluation period, balancing noise reduction with timely detection of performance issues.

Now let's monitor external integration health:

            {
                "alert": "ADKAgentMCPCallFailures",
                "expr": 'rate(adk_agent_mcp_calls_total{status="error"}[5m]) > 0.1',
                "for": "3m",
                "labels": {
                    "severity": "warning",
                    "component": "adk-agent"
                },
                "annotations": {
                    "summary": "ADK Agent {{ $labels.agent_id }} has high MCP call failure rate",
                    "description": "ADK Agent {{ $labels.agent_id }} MCP calls to {{ $labels.mcp_server }} are failing at {{ $value }} calls per second"
                }
            },

MCP failure alerts help identify integration issues before they impact user experience, with moderate thresholds for external dependencies.

Let's add proactive memory leak detection:

            {
                "alert": "ADKAgentMemoryLeak",
                "expr": 'increase(adk_agent_memory_usage_bytes[30m]) > 104857600',  # 100MB increase
                "for": "10m",
                "labels": {
                    "severity": "warning",
                    "component": "adk-agent"
                },
                "annotations": {
                    "summary": "Potential memory leak in ADK Agent {{ $labels.agent_id }}",
                    "description": "ADK Agent {{ $labels.agent_id }} memory usage has increased by {{ $value | humanizeBytes }} in the last 30 minutes"
                }
            },

Memory leak detection uses a 30-minute window to identify concerning trends before they cause outages, giving time for proactive intervention.

Finally, we monitor infrastructure-level resource constraints:

            {
                "alert": "ADKContainerCPUThrottling",
                "expr": 'adk_container_cpu_usage_percent > 90',
                "for": "10m",
                "labels": {
                    "severity": "warning",
                    "component": "adk-container"
                },
                "annotations": {
                    "summary": "ADK Container {{ $labels.pod_name }} is experiencing CPU throttling",
                    "description": "Container {{ $labels.container_name }} in pod {{ $labels.pod_name }} has been using {{ $value }}% CPU for more than 10 minutes"
                }
            }
        ]

        self.alert_rules = alert_rules
        return alert_rules

CPU throttling alerts identify resource constraints that could impact performance, enabling proactive capacity management and optimization.

These alerting rules provide comprehensive coverage for error rates, latency, MCP failures, memory leaks, and resource throttling - essential for maintaining service quality.

Step 15: Building Security Management System

Let's create the enterprise security framework:

class SecurityManager:
    """Enterprise security management for ADK agent deployments"""

    def __init__(self):
        self.security_policies = {}
        self.compliance_frameworks = ["SOC2", "GDPR", "HIPAA", "PCI-DSS"]
        self.encryption_configs = {}
        self.access_control_policies = {}
        self.logger = logging.getLogger(__name__)

This security manager provides comprehensive security governance with support for major compliance frameworks.

Step 16: Implementing Comprehensive Security Policies

Now let's create the detailed security configuration. We'll build this defense-in-depth strategy layer by layer to understand each security control:

    def create_security_policies(self) -> Dict[str, Any]:
        """Create comprehensive security policies for ADK agents"""

First, let's establish network-level security with zero-trust networking:

        security_config = {
            "network_policies": {
                "default_deny": True,
                "ingress_rules": [
                    {
                        "description": "Allow ingress from load balancer",
                        "ports": [{"protocol": "TCP", "port": 8080}],
                        "sources": [{"namespaceSelector": {"matchLabels": {"name": "ingress-nginx"}}}]
                    },
                    {
                        "description": "Allow ingress from monitoring",
                        "ports": [{"protocol": "TCP", "port": 9090}],
                        "sources": [{"namespaceSelector": {"matchLabels": {"name": "monitoring"}}}]
                    }
                ],

The default-deny policy ensures no traffic is allowed unless explicitly permitted. We only allow ingress from the load balancer and monitoring systems on specific ports.

Next, we define controlled egress for external dependencies:

                "egress_rules": [
                    {
                        "description": "Allow DNS resolution",
                        "ports": [{"protocol": "UDP", "port": 53}],
                        "destinations": [{"namespaceSelector": {"matchLabels": {"name": "kube-system"}}}]
                    },
                    {
                        "description": "Allow HTTPS to external APIs",
                        "ports": [{"protocol": "TCP", "port": 443}],
                        "destinations": [{}]  # Allow all external HTTPS
                    }
                ]
            },

Egress rules limit outbound traffic to essential services: DNS resolution within the cluster and HTTPS to external APIs for model interactions.

Now let's implement pod-level security hardening:

            "pod_security_standards": {
                "security_context": {
                    "runAsNonRoot": True,
                    "runAsUser": 1000,
                    "runAsGroup": 2000,
                    "fsGroup": 2000,
                    "seccompProfile": {
                        "type": "RuntimeDefault"
                    }
                },
                "container_security": {
                    "allowPrivilegeEscalation": False,
                    "readOnlyRootFilesystem": True,
                    "capabilities": {
                        "drop": ["ALL"]
                    }
                }
            },

Pod security enforces non-root execution, drops all Linux capabilities, prevents privilege escalation, and uses read-only filesystems with seccomp filtering.

Let's add comprehensive secrets management:

            "secrets_management": {
                "encryption_at_rest": True,
                "encryption_in_transit": True,
                "secret_rotation": {
                    "enabled": True,
                    "rotation_period_days": 90
                },
                "secret_scanning": {
                    "enabled": True,
                    "scan_frequency": "daily"
                }
            },

Secrets management ensures encryption everywhere, automatic rotation every 90 days, and daily scanning for exposed credentials or vulnerabilities.

Finally, we implement compliance and governance controls:

            "compliance_controls": {
                "audit_logging": {
                    "enabled": True,
                    "retention_days": 365,
                    "log_level": "INFO"
                },
                "data_classification": {
                    "enabled": True,
                    "classification_levels": ["public", "internal", "confidential", "restricted"]
                },
                "access_controls": {
                    "rbac_enabled": True,
                    "service_account_tokens": False,
                    "pod_security_admission": "enforce"
                }
            }
        }

        return security_config

Compliance controls provide audit trails for regulatory requirements, data classification for proper handling, and strict RBAC with pod security admission enforcement.

This security configuration provides defense-in-depth with network segmentation, pod security standards, comprehensive secrets management, and compliance controls for enterprise environments.


Part 2: Advanced Monitoring and Observability (15 minutes)

Step 17: Building Advanced Observability Stack

Now let's create a comprehensive observability system that provides deep insights into ADK agent behavior and performance.

🗂️ File: src/session7/observability_stack.py - Comprehensive monitoring systems

from typing import Dict, List, Any, Optional
import asyncio
import logging
from datetime import datetime, timedelta
import json

This setup provides the foundation for building enterprise-grade observability including distributed tracing, logging aggregation, and SLO management.

Step 18: Creating Distributed Tracing Manager

Let's implement distributed tracing to track requests across multiple services:

class DistributedTracingManager:
    """Manages distributed tracing for ADK agent interactions"""

    def __init__(self, tracing_backend: str = "jaeger"):
        self.tracing_backend = tracing_backend
        self.trace_configs = {}
        self.sampling_strategies = {}
        self.logger = logging.getLogger(__name__)

This tracing manager enables comprehensive distributed tracing across ADK agent services, essential for debugging complex enterprise workflows.

Step 19: Configuring Tracing for ADK Services

Now let's implement the tracing configuration system:

    def configure_tracing(self, service_name: str) -> Dict[str, Any]:
        """Configure distributed tracing for ADK agent service"""

        tracing_config = {
            "service_name": service_name,
            "tracing_backend": self.tracing_backend,
            "sampling_strategy": {
                "type": "probabilistic",
                "param": 0.1  # Sample 10% of traces
            },
            "span_attributes": {
                "service.name": service_name,
                "service.version": "1.0.0",
                "deployment.environment": "production"
            },
            "instrumentation": {
                "http_requests": True,
                "database_calls": True,
                "external_apis": True,
                "mcp_calls": True,
                "conversation_flows": True
            },
            "exporters": [{
                "type": "jaeger",
                "endpoint": "http://jaeger-collector:14268/api/traces",
                "headers": {
                    "X-Service-Name": service_name
                }
            }, {
                "type": "console",
                "enabled": False  # Disable for production
            }]
        }

        return tracing_config

This configuration enables comprehensive tracing with intelligent sampling, proper service identification, and automatic instrumentation of key components.

Step 20: Building Log Aggregation System

Let's create the centralized logging infrastructure:

class LogAggregationManager:
    """Manages centralized logging for ADK agent systems"""

    def __init__(self, log_backend: str = "elasticsearch"):
        self.log_backend = log_backend
        self.log_configs = {}
        self.retention_policies = {}
        self.logger = logging.getLogger(__name__)

This log aggregation manager provides centralized, structured logging with configurable backends and retention policies.

Step 21: Configuring Structured Logging

Now let's implement comprehensive structured logging configuration. We'll build this enterprise logging system component by component:

    def configure_structured_logging(self, service_name: str) -> Dict[str, Any]:
        """Configure structured logging for ADK agents"""

First, let's define the logging formatters for different output destinations:

        logging_config = {
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {
                "json": {
                    "format": "%(asctime)s %(name)s %(levelname)s %(message)s",
                    "class": "pythonjsonlogger.jsonlogger.JsonFormatter"
                },
                "detailed": {
                    "format": "%(asctime)s [%(levelname)s] %(name)s (%(filename)s:%(lineno)d) - %(message)s"
                }
            },

The JSON formatter provides machine-readable logs for automated processing, while the detailed formatter offers human-readable logs for debugging.

Next, let's configure the various log handlers for different destinations:

            "handlers": {
                "console": {
                    "class": "logging.StreamHandler",
                    "level": "INFO",
                    "formatter": "json",
                    "stream": "ext://sys.stdout"
                },
                "file": {
                    "class": "logging.handlers.RotatingFileHandler",
                    "level": "DEBUG",
                    "formatter": "detailed",
                    "filename": f"/var/log/{service_name}.log",
                    "maxBytes": 10485760,  # 10MB
                    "backupCount": 5
                },

Console output uses JSON for container log aggregation, while file logging captures detailed debug information with automatic rotation to prevent disk exhaustion.

Now we add centralized log aggregation for enterprise monitoring:

                "fluentd": {
                    "class": "fluent.handler.FluentHandler",
                    "level": "INFO",
                    "formatter": "json",
                    "tag": f"adk.{service_name}",
                    "host": "fluentd-service",
                    "port": 24224
                }
            },

Fluentd integration enables centralized log collection and forwarding to enterprise SIEM systems and monitoring platforms.

Next, we configure service-specific loggers with appropriate verbosity levels:

            "loggers": {
                service_name: {
                    "level": "INFO",
                    "handlers": ["console", "fluentd"],
                    "propagate": False
                },
                "adk.agents": {
                    "level": "DEBUG",
                    "handlers": ["console", "file", "fluentd"],
                    "propagate": False
                },
                "adk.mcp": {
                    "level": "INFO",
                    "handlers": ["console", "fluentd"],
                    "propagate": False
                }
            },

Agent components get debug-level logging for troubleshooting, while MCP interactions use info-level to avoid noise from frequent API calls.

Finally, we set conservative defaults for all other logging:

            "root": {
                "level": "WARNING",
                "handlers": ["console"]
            }
        }

        return logging_config

Root logger defaults to warnings only, preventing verbose third-party library logs from overwhelming the system while ensuring important issues are captured.

This structured logging configuration provides JSON formatting for machine parsing, multiple output handlers, and service-specific log levels for optimal observability.

Step 22: Creating SLO Management System

Let's implement Service Level Objectives for enterprise-grade reliability:

class SLOManager:
    """Manages Service Level Objectives for ADK agents"""

    def __init__(self):
        self.slo_definitions = {}
        self.sli_queries = {}
        self.error_budgets = {}
        self.logger = logging.getLogger(__name__)

This SLO manager provides enterprise-grade reliability management with comprehensive error budget tracking and burn rate alerting.

Step 23: Defining Comprehensive SLOs

Now let's implement detailed Service Level Objectives for our ADK agents. We'll build these enterprise reliability standards step by step:

    def define_agent_slos(self, service_name: str) -> Dict[str, Any]:
        """Define comprehensive SLOs for ADK agent service"""

First, let's establish the SLO framework with the measurement period:

        slo_config = {
            "service_name": service_name,
            "slo_period_days": 30,

A 30-day SLO period provides sufficient data for statistical significance while remaining actionable for monthly business planning.

Next, let's define availability objectives with precise error budget calculations:

            "objectives": [
                {
                    "name": "availability",
                    "description": "ADK agent service availability",
                    "target": 99.9,  # 99.9% availability
                    "sli_query": f'avg_over_time(up{{job="{service_name}"}}[5m])',
                    "error_budget_minutes": 43.2  # 0.1% of 30 days
                },

99.9% availability allows 43.2 minutes of downtime per month, balancing business needs with engineering constraints for a critical service.

Now let's add latency objectives focused on user experience:

                {
                    "name": "latency",
                    "description": "95th percentile response time under 500ms",
                    "target": 95.0,  # 95% of requests under 500ms
                    "sli_query": f'histogram_quantile(0.95, rate(adk_agent_request_duration_seconds_bucket{{service="{service_name}"}}[5m])) < 0.5',
                    "error_budget_requests": 5.0  # 5% error budget
                },

95th percentile latency under 500ms ensures excellent user experience while allowing for occasional slower operations like complex analysis tasks.

Let's define quality objectives for request success rates:

                {
                    "name": "quality",
                    "description": "Request success rate",
                    "target": 99.5,  # 99.5% success rate
                    "sli_query": f'rate(adk_agent_requests_total{{service="{service_name}",status_code!~"5.."}}[5m]) / rate(adk_agent_requests_total{{service="{service_name}"}}[5m])',
                    "error_budget_requests": 0.5  # 0.5% error budget
                },

99.5% success rate provides high reliability while acknowledging that some requests may fail due to external dependencies or edge cases.

Now we add user satisfaction as a business-aligned objective:

                {
                    "name": "user_satisfaction",
                    "description": "User satisfaction score above 4.0",
                    "target": 90.0,  # 90% of users rate > 4.0
                    "sli_query": f'avg_over_time(adk_user_satisfaction_score{{service="{service_name}"}}[1h]) > 4.0',
                    "error_budget_satisfaction": 10.0  # 10% can be below threshold
                }
            ],

User satisfaction tracking ensures technical reliability translates to business value, with 90% of users rating their experience above 4.0 out of 5.

Finally, let's implement intelligent error budget burn rate alerting:

            "alerting": {
                "burn_rate_alerts": [
                    {
                        "alert_name": f"{service_name}_ErrorBudgetBurn_Fast",
                        "burn_rate_threshold": 14.4,  # 2% in 1 hour
                        "lookback_window": "1h",
                        "severity": "critical"
                    },
                    {
                        "alert_name": f"{service_name}_ErrorBudgetBurn_Slow",
                        "burn_rate_threshold": 6.0,  # 5% in 6 hours
                        "lookback_window": "6h",
                        "severity": "warning"
                    }
                ]
            }
        }

        return slo_config

Burn rate alerts provide early warning when error budgets are being consumed too quickly, enabling proactive intervention before SLO violations occur.

This comprehensive SLO system tracks availability, latency, quality, and user satisfaction with intelligent error budget management and burn rate alerting - essential for enterprise reliability standards.

With these 23 detailed steps, we've built a complete enterprise ADK agent deployment system that provides production-ready containerization, sophisticated load balancing, comprehensive security, and enterprise-grade observability capabilities.


Module Summary

You've now mastered enterprise-scale ADK agent deployment:

Production Deployment: Implemented containerized deployment with Kubernetes orchestration and auto-scaling
Load Balancing: Built sophisticated traffic routing with canary deployments and health monitoring
Security Framework: Created comprehensive security policies with compliance controls
Observability Stack: Designed distributed tracing, structured logging, and SLO management systems


📝 Multiple Choice Test - Module B

Test your understanding of enterprise agent systems and production deployment:

Question 1: What deployment strategy does the enterprise system use for zero-downtime updates? A) Blue-green deployments with traffic switching
B) Manual server restart
C) Rolling updates with Kubernetes orchestration and canary deployment capability
D) Single instance deployment

Question 2: How does the load balancer determine traffic routing for agent requests? A) Random distribution
B) Health-based routing with weighted distribution and canary traffic management
C) First-available agent
D) Round-robin only

Question 3: What security controls does the SecurityManager implement for enterprise compliance? A) Basic authentication only
B) Role-based access control, audit logging, and compliance policy enforcement
C) No security controls
D) Manual authorization

Question 4: What observability features does the enterprise system provide for production monitoring? A) Basic logging only
B) Distributed tracing, structured logging, metrics collection, and SLO management
C) Manual monitoring
D) Error logs only

Question 5: How does the auto-scaling system respond to load changes? A) Manual scaling only
B) CPU and memory-based horizontal pod autoscaling with custom metrics
C) Fixed instance count
D) Manual load balancing

🗂️ View Test Solutions →

Next Steps


🗂️ Source Files for Module B:

  • src/session7/enterprise_deployment.py - Production deployment systems
  • src/session7/observability_stack.py - Comprehensive monitoring
  • src/session7/security_manager.py - Enterprise security controls