Session 9: Production Agent Deployment - Test Solutions¶
📝 Multiple Choice Test¶
Question 1: Container Orchestration¶
What is the primary benefit of using Kubernetes for production agent deployment?
A) Better security by default
B) Auto-scaling, service discovery, and resource management ✅
C) Lower costs
D) Simpler development
Correct Answer: B) Auto-scaling, service discovery, and resource management
Explanation: Kubernetes provides comprehensive container orchestration including auto-scaling based on demand, service discovery for dynamic routing, and sophisticated resource management across clusters.
Question 2: High Availability¶
What uptime target is typically expected for production agent systems?
A) 99.9%+ ✅
B) 98%
C) 90%
D) 95%
Correct Answer: A) 99.9%+
Explanation: Production agent systems typically target 99.9%+ uptime, which allows at most 8.76 hours of downtime per year (0.1% of 8,760 hours), to meet enterprise requirements for mission-critical applications.
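The arithmetic behind that figure is worth making concrete; a quick calculation turns availability targets into downtime budgets:

```python
# Downtime budget implied by an availability target over one year
HOURS_PER_YEAR = 365 * 24  # 8,760 hours

for target in (0.999, 0.9999):
    downtime_hours = HOURS_PER_YEAR * (1 - target)
    print(f"{target:.2%} uptime allows {downtime_hours:.2f} h/year "
          f"({downtime_hours * 60:.1f} minutes)")

# 99.90% uptime allows 8.76 h/year (525.6 minutes)
# 99.99% uptime allows 0.88 h/year (52.6 minutes)
```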
Question 3: Service Mesh¶
What primary benefit does Istio provide in production agent deployments?
A) Simpler configuration
B) Lower resource usage
C) Faster execution
D) Secure service-to-service communication with traffic management ✅
Correct Answer: D) Secure service-to-service communication with traffic management
Explanation: Istio service mesh provides secure service-to-service communication, traffic management, load balancing, and observability without requiring application code changes.
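For a concrete sense of the traffic-management side, this is the shape of an Istio VirtualService routing policy expressed as a Python dict mirroring the YAML you would apply; the host, subsets, weights, and timeouts are illustrative:

```python
# Canary routing with retries and a request timeout, VirtualService-style.
# Applying this still requires the usual apiVersion/kind/metadata wrapper.
virtual_service_http = {
    "route": [
        {"destination": {"host": "planning-agent", "subset": "v1"}, "weight": 90},
        {"destination": {"host": "planning-agent", "subset": "v2"}, "weight": 10},
    ],
    "retries": {"attempts": 3, "perTryTimeout": "2s"},
    "timeout": "10s",
}
```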
Question 4: Configuration Management¶
Why is centralized configuration management important for production agent systems?
A) Enables consistent configuration across environments and version control ✅
B) Improves performance
C) Simplifies testing
D) Reduces development time
Correct Answer: A) Enables consistent configuration across environments and version control
Explanation: Centralized configuration management ensures consistent settings across environments, enables version control of configurations, and supports dynamic configuration updates without redeployment.
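As a minimal sketch (the file path and variable names are hypothetical), a service might load version-controlled defaults from a mounted ConfigMap and layer environment-specific overrides on top:

```python
import json
import os

def load_config(path: str = "/etc/agent-system/config.json") -> dict:
    """Load version-controlled defaults, then apply environment overrides."""
    with open(path) as f:
        config = json.load(f)
    # Environment variables win, so the same image runs unchanged in every environment
    if "AGENT_LOG_LEVEL" in os.environ:
        config["log_level"] = os.environ["AGENT_LOG_LEVEL"]
    if "AGENT_MAX_WORKERS" in os.environ:
        config["max_workers"] = int(os.environ["AGENT_MAX_WORKERS"])
    return config
```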
Question 5: Auto-scaling Triggers¶
What metrics should trigger auto-scaling in production agent systems?
A) Network bandwidth only
B) Memory usage only
C) CPU usage, memory usage, queue depth, and response time ✅
D) CPU usage only
Correct Answer: C) CPU usage, memory usage, queue depth, and response time
Explanation: Effective auto-scaling uses multiple metrics including CPU, memory, message queue depth, and response time to make informed scaling decisions based on actual system demand.
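A sketch of what a multi-signal scaling decision might look like; the thresholds are illustrative placeholders, not recommendations:

```python
from dataclasses import dataclass

@dataclass
class ScalingSignals:
    cpu_percent: float      # average CPU across pods
    memory_percent: float   # average memory across pods
    queue_depth: int        # messages waiting for an agent
    p95_latency_ms: float   # 95th-percentile response time

def desired_replica_delta(s: ScalingSignals) -> int:
    """Scale out if any demand signal runs hot; scale in only when all are cold."""
    if (s.cpu_percent > 80 or s.memory_percent > 85
            or s.queue_depth > 1000 or s.p95_latency_ms > 500):
        return +1
    if (s.cpu_percent < 30 and s.memory_percent < 40
            and s.queue_depth < 50 and s.p95_latency_ms < 100):
        return -1
    return 0
```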
Question 6: Observability Stack¶
What are the three pillars of observability for production agent systems?
A) Metrics, logs, and distributed tracing ✅
B) Alerts, dashboards, reports
C) Monitoring, testing, deployment
D) CPU, memory, and disk usage
Correct Answer: A) Metrics, logs, and distributed tracing
Explanation: The three pillars of observability are metrics (quantitative data), logs (detailed event records), and distributed tracing (request flow tracking) for comprehensive system visibility.
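A single request can touch all three pillars. The sketch below assumes the prometheus-client and opentelemetry-api packages are installed; the metric and span names are illustrative:

```python
import logging
from prometheus_client import Counter
from opentelemetry import trace

logger = logging.getLogger("agent-system")
REQUESTS = Counter("agent_requests_total", "Requests handled by the agent")
tracer = trace.get_tracer(__name__)

def handle_request(request_id: str) -> None:
    with tracer.start_as_current_span("handle_request"):  # tracing: request flow
        REQUESTS.inc()                                     # metrics: quantitative trend
        logger.info("handled request %s", request_id)      # logs: detailed event record
```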
Question 7: Secrets Management¶
How should sensitive information be handled in Kubernetes agent deployments?
A) Environment variables in deployment files
B) Configuration files in containers
C) Hard-coded in application code
D) Kubernetes Secrets with encryption at rest ✅
Correct Answer: D) Kubernetes Secrets with encryption at rest
Explanation: Kubernetes Secrets provide secure storage for sensitive information with encryption at rest, access controls, and automatic mounting into containers without exposing values in deployment configurations.
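From the application's side, consuming a Secret is just reading a mounted file or an injected environment variable; the mount path below is whatever the Deployment's volumeMount specifies and is purely illustrative:

```python
from pathlib import Path

def read_api_key(path: str = "/var/run/secrets/agent-system/api-key") -> str:
    """Secrets arrive as mounted files or env vars; they never live in the image or repo."""
    return Path(path).read_text().strip()
```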
Question 8: CI/CD Pipeline¶
What testing approach is recommended for production agent deployments?
A) No testing required
B) Manual testing only
C) Production testing only
D) Automated testing with staging environment validation ✅
Correct Answer: D) Automated testing with staging environment validation
Explanation: Production deployments require automated testing pipelines including unit tests, integration tests, and validation in staging environments that mirror production conditions.
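In practice the staging gate often reduces to automated checks such as this pytest-style smoke test (the staging URL is hypothetical):

```python
import urllib.request

def test_staging_health_endpoint():
    """Fail the pipeline if staging does not answer its health probe."""
    url = "https://staging.agent-system.example.com/healthz"
    with urllib.request.urlopen(url, timeout=10) as resp:
        assert resp.status == 200
```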
Question 9: Resource Optimization¶
What Kubernetes feature helps optimize resource utilization in agent deployments?
A) No resource management
B) Resource requests and limits with horizontal pod autoscaling ✅
C) Manual resource allocation
D) Fixed resource assignments
Correct Answer: B) Resource requests and limits with horizontal pod autoscaling
Explanation: Resource requests and limits combined with horizontal pod autoscaling ensure efficient utilization: requests guarantee minimum resources for scheduling, limits prevent any single pod from monopolizing a node, and the autoscaler adds or removes replicas as demand changes.
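Expressed as the dicts you would hand to the Kubernetes API, the two pieces look like this; the values mirror this session's examples, while maxReplicas and the CPU target are illustrative:

```python
# Per-container scheduling guarantee and hard ceiling
container_resources = {
    "requests": {"cpu": "500m", "memory": "512Mi"},  # scheduler reserves at least this
    "limits": {"cpu": "2000m", "memory": "2Gi"},     # pod is throttled/evicted beyond this
}

# autoscaling/v1 HorizontalPodAutoscaler spec
hpa_spec = {
    "scaleTargetRef": {"apiVersion": "apps/v1", "kind": "Deployment", "name": "agent-system"},
    "minReplicas": 3,
    "maxReplicas": 20,
    "targetCPUUtilizationPercentage": 70,
}
```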
Question 10: Disaster Recovery¶
What is essential for disaster recovery in production agent systems?
A) Single data center with backups
B) Multi-region deployment with automated failover ✅
C) Daily backups only
D) Manual recovery procedures
Correct Answer: B) Multi-region deployment with automated failover
Explanation: Disaster recovery requires multi-region deployment with automated failover capabilities, ensuring system availability even during regional outages or major infrastructure failures.
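Client-side failover between regions can be sketched as below; the endpoints are hypothetical, and in production the switch usually happens in DNS or a global load balancer rather than in application code:

```python
import urllib.request

REGIONS = [
    "https://agents.us-east.example.com",
    "https://agents.eu-west.example.com",
]

def first_healthy_endpoint() -> str:
    """Probe regional health endpoints in priority order and return the first that answers."""
    for base in REGIONS:
        try:
            with urllib.request.urlopen(f"{base}/healthz", timeout=3) as resp:
                if resp.status == 200:
                    return base
        except OSError:
            continue
    raise RuntimeError("no healthy region available")
```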
Scoring Guide¶
- 10 correct: Expert level - Ready for enterprise-scale agent system operations
- 8-9 correct: Proficient - Strong understanding of production deployment practices
- 6-7 correct: Competent - Good grasp of container orchestration and monitoring
- 4-5 correct: Developing - Review Kubernetes and observability concepts
- Below 4: Beginner - Revisit production deployment fundamentals
Key Concepts Summary¶
- Container Orchestration: Kubernetes provides auto-scaling and resource management
- High Availability: 99.9%+ uptime through redundancy and failover
- Service Mesh: Istio enables secure service communication and traffic management
- Observability: Metrics, logs, and tracing provide comprehensive system visibility
- Resource Optimization: Requests/limits with autoscaling for efficient utilization
Practical Exercise Solution¶
Challenge: Design and implement a complete production-ready multi-agent system.
Complete Production-Ready Multi-Agent System:¶
# production/capstone_system.py
# Core system imports for production-ready multi-agent deployment
import asyncio
import logging
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
import yaml
import json
# Import our custom agent framework components
from workflows.advanced_engine import AdvancedWorkflowEngine
from a2a.router import MessageRouter
from a2a.registry import AgentRegistry
from agents.weather_agent import WeatherAgent
from agents.planning_agent import PlanningAgent
# Additional agents deployed in the fleet below; module paths assume the same package layout
from agents.customer_service_agent import CustomerServiceAgent
from agents.data_analysis_agent import DataAnalysisAgent
from agents.security_monitoring_agent import SecurityMonitoringAgent
from monitoring.agent_metrics import AgentMetrics, HealthChecker
# Initialize structured logging for production monitoring
logger = logging.getLogger(__name__)
@dataclass
class SystemConfiguration:
"""Production system configuration with environment-specific settings."""
environment: str = "production"
log_level: str = "INFO"
metrics_enabled: bool = True
health_check_interval: int = 30
max_agents: int = 100
auto_scaling_enabled: bool = True
security_enabled: bool = True
# Kubernetes deployment configuration
kubernetes_config: Dict[str, Any] = field(default_factory=lambda: {
"namespace": "agent-system",
"replicas": 3,
"cpu_request": "500m",
"memory_request": "512Mi",
"cpu_limit": "2000m",
"memory_limit": "2Gi"
})
# Monitoring and observability
monitoring_config: Dict[str, Any] = field(default_factory=lambda: {
"prometheus_enabled": True,
"grafana_dashboards": True,
"alert_manager": True,
"log_aggregation": "elasticsearch"
})
class ProductionAgentSystem:
"""Enterprise-grade production multi-agent system."""
def __init__(self, config: SystemConfiguration):
self.config = config
self.workflow_engine = AdvancedWorkflowEngine()
self.message_router = MessageRouter()
self.agent_registry = AgentRegistry()
self.metrics = AgentMetrics()
self.health_checker = HealthChecker()
        self.agents: Dict[str, Any] = {}
        self.system_status = "initializing"
        self.start_time = datetime.now()  # referenced by _system_health_check for uptime reporting
# Initialize system components
self._setup_logging()
self._initialize_monitoring()
def _setup_logging(self):
"""Configure structured logging for production monitoring."""
logging.basicConfig(
level=getattr(logging, self.config.log_level),
format='{"timestamp": "%(asctime)s", "level": "%(levelname)s", "service": "agent-system", "message": "%(message)s"}',
handlers=[
logging.StreamHandler(),
logging.FileHandler('/var/log/agent-system/system.log')
]
)
def _initialize_monitoring(self):
"""Initialize comprehensive monitoring and observability."""
if self.config.monitoring_config["prometheus_enabled"]:
self.metrics.initialize_prometheus_metrics()
# Health check endpoints for Kubernetes liveness/readiness probes
self.health_checker.register_health_check("system", self._system_health_check)
self.health_checker.register_health_check("agents", self._agents_health_check)
self.health_checker.register_health_check("workflows", self._workflows_health_check)
async def initialize_system(self) -> bool:
"""Initialize the complete production agent system."""
try:
logger.info("Starting production agent system initialization")
# Step 1: Initialize core components
await self._initialize_core_components()
# Step 2: Deploy standard agent fleet
await self._deploy_agent_fleet()
# Step 3: Setup workflows and routing
await self._configure_workflows()
# Step 4: Start monitoring and health checks
await self._start_system_monitoring()
# Step 5: Validate system readiness
await self._validate_system_readiness()
self.system_status = "operational"
logger.info("Production agent system initialization completed successfully")
return True
except Exception as e:
logger.error(f"System initialization failed: {str(e)}")
self.system_status = "failed"
raise
async def _initialize_core_components(self):
"""Initialize workflow engine, message router, and registry."""
# Initialize workflow engine with production settings
await self.workflow_engine.initialize(
max_concurrent_workflows=50,
persistence_enabled=True,
backup_enabled=True
)
# Initialize message router with load balancing
await self.message_router.initialize(
routing_strategy="load_balanced",
retry_policy="exponential_backoff",
dead_letter_queue=True
)
# Initialize agent registry with clustering support
await self.agent_registry.initialize(
clustering_enabled=True,
replication_factor=3,
consistency_level="strong"
)
logger.info("Core components initialized successfully")
async def _deploy_agent_fleet(self):
"""Deploy the standard fleet of production agents."""
# Deploy Weather Agent with high availability
weather_agent = WeatherAgent(
agent_id="weather_primary",
config={
"replicas": 2,
"failover_enabled": True,
"cache_enabled": True,
"api_rate_limit": 1000
}
)
await self._deploy_agent(weather_agent)
# Deploy Planning Agent with load balancing
planning_agent = PlanningAgent(
agent_id="planning_primary",
config={
"replicas": 3,
"load_balancing": True,
"optimization_enabled": True,
"batch_processing": True
}
)
await self._deploy_agent(planning_agent)
# Deploy Customer Service Agent
customer_service_agent = CustomerServiceAgent(
agent_id="customer_service",
config={
"replicas": 4,
"priority_queues": True,
"escalation_enabled": True,
"sla_monitoring": True
}
)
await self._deploy_agent(customer_service_agent)
# Deploy Data Analysis Agent
data_analysis_agent = DataAnalysisAgent(
agent_id="data_analysis",
config={
"replicas": 2,
"parallel_processing": True,
"resource_intensive": True,
"gpu_enabled": True
}
)
await self._deploy_agent(data_analysis_agent)
# Deploy Security Monitoring Agent
security_agent = SecurityMonitoringAgent(
agent_id="security_monitor",
config={
"replicas": 2,
"real_time_monitoring": True,
"threat_detection": True,
"incident_response": True
}
)
await self._deploy_agent(security_agent)
logger.info(f"Deployed {len(self.agents)} agents in production fleet")
async def _deploy_agent(self, agent: Any):
"""Deploy individual agent with production configuration."""
try:
# Register agent in the registry
await self.agent_registry.register_agent(
agent_id=agent.agent_id,
capabilities=agent.get_capabilities(),
metadata={
"deployment_time": datetime.now().isoformat(),
"version": agent.version,
"config": agent.config
}
)
# Initialize agent with monitoring
await agent.initialize()
# Setup health monitoring
self.health_checker.register_agent_health_check(
agent.agent_id,
agent.health_check
)
# Add to active agents
self.agents[agent.agent_id] = agent
# Update metrics
self.metrics.increment_counter("agents_deployed")
logger.info(f"Agent {agent.agent_id} deployed successfully")
except Exception as e:
logger.error(f"Failed to deploy agent {agent.agent_id}: {str(e)}")
raise
async def _configure_workflows(self):
"""Configure production workflows with fault tolerance."""
# Travel planning workflow with multiple agents
travel_workflow = await self.workflow_engine.create_workflow(
workflow_id="travel_planning_production",
steps=[
{
"id": "weather_check",
"agent_id": "weather_primary",
"action": "get_forecast",
"timeout": 30,
"retry_count": 3
},
{
"id": "route_planning",
"agent_id": "planning_primary",
"action": "plan_route",
"dependencies": ["weather_check"],
"timeout": 60,
"retry_count": 2
},
{
"id": "accommodation_search",
"agent_id": "planning_primary",
"action": "find_accommodation",
"dependencies": ["weather_check"],
"timeout": 45,
"parallel": True
}
],
fault_tolerance={
"rollback_enabled": True,
"compensation_actions": True,
"circuit_breaker": True
}
)
# Customer service workflow
customer_service_workflow = await self.workflow_engine.create_workflow(
workflow_id="customer_service_production",
steps=[
{
"id": "inquiry_classification",
"agent_id": "customer_service",
"action": "classify_inquiry",
"timeout": 15
},
{
"id": "issue_resolution",
"agent_id": "customer_service",
"action": "resolve_issue",
"dependencies": ["inquiry_classification"],
"timeout": 300
},
{
"id": "quality_assessment",
"agent_id": "data_analysis",
"action": "assess_resolution_quality",
"dependencies": ["issue_resolution"],
"timeout": 30
}
],
sla_requirements={
"max_response_time": 600, # 10 minutes
"success_rate_threshold": 0.95
}
)
logger.info("Production workflows configured successfully")
async def _start_system_monitoring(self):
"""Start comprehensive system monitoring."""
# Start health check monitoring
await self.health_checker.start_monitoring(
interval_seconds=self.config.health_check_interval
)
# Start metrics collection
await self.metrics.start_collection()
# Setup alerting rules
await self._configure_alerting()
logger.info("System monitoring started successfully")
async def _configure_alerting(self):
"""Configure production alerting rules."""
alerting_rules = [
{
"name": "agent_failure_rate_high",
"condition": "agent_failure_rate > 0.05",
"severity": "critical",
"action": "restart_agent"
},
{
"name": "workflow_latency_high",
"condition": "workflow_p95_latency > 300",
"severity": "warning",
"action": "scale_agents"
},
{
"name": "system_memory_high",
"condition": "system_memory_usage > 0.85",
"severity": "warning",
"action": "optimize_resources"
},
{
"name": "security_threats_detected",
"condition": "security_threats_count > 0",
"severity": "critical",
"action": "activate_security_protocol"
}
]
for rule in alerting_rules:
await self.metrics.configure_alert(rule)
async def _validate_system_readiness(self):
"""Validate that all system components are ready for production."""
readiness_checks = [
("workflow_engine", self.workflow_engine.is_ready),
("message_router", self.message_router.is_ready),
("agent_registry", self.agent_registry.is_ready),
("health_checker", self.health_checker.is_ready),
("metrics_system", self.metrics.is_ready)
]
failed_checks = []
for check_name, check_func in readiness_checks:
try:
if await check_func():
logger.info(f"Readiness check passed: {check_name}")
else:
failed_checks.append(check_name)
logger.error(f"Readiness check failed: {check_name}")
except Exception as e:
failed_checks.append(check_name)
logger.error(f"Readiness check error for {check_name}: {str(e)}")
if failed_checks:
raise SystemError(f"System readiness validation failed for: {', '.join(failed_checks)}")
logger.info("All system readiness checks passed")
async def _system_health_check(self) -> Dict[str, Any]:
"""Comprehensive system health check."""
return {
"status": self.system_status,
"uptime_seconds": (datetime.now() - self.start_time).total_seconds(),
"active_agents": len(self.agents),
"active_workflows": await self.workflow_engine.get_active_workflow_count(),
"memory_usage_mb": self._get_memory_usage(),
"cpu_usage_percent": self._get_cpu_usage()
}
async def _agents_health_check(self) -> Dict[str, Any]:
"""Health check for all deployed agents."""
agent_statuses = {}
for agent_id, agent in self.agents.items():
try:
health_status = await agent.health_check()
agent_statuses[agent_id] = {
"status": "healthy" if health_status.get("healthy", False) else "unhealthy",
"last_activity": health_status.get("last_activity"),
"processed_requests": health_status.get("processed_requests", 0),
"error_rate": health_status.get("error_rate", 0.0)
}
except Exception as e:
agent_statuses[agent_id] = {
"status": "error",
"error": str(e)
}
return agent_statuses
async def _workflows_health_check(self) -> Dict[str, Any]:
"""Health check for workflow engine."""
return {
"active_workflows": await self.workflow_engine.get_active_workflow_count(),
"completed_workflows_24h": await self.workflow_engine.get_completed_count(hours=24),
"failed_workflows_24h": await self.workflow_engine.get_failed_count(hours=24),
"average_execution_time": await self.workflow_engine.get_average_execution_time(),
"success_rate": await self.workflow_engine.get_success_rate()
}
def _get_memory_usage(self) -> float:
"""Get current memory usage in MB."""
import psutil
process = psutil.Process()
return process.memory_info().rss / 1024 / 1024
def _get_cpu_usage(self) -> float:
"""Get current CPU usage percentage."""
import psutil
return psutil.cpu_percent(interval=1)
async def execute_travel_planning_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
"""Execute a travel planning request using the production workflow."""
workflow_context = {
"request_id": request_data.get("request_id", f"req_{int(datetime.now().timestamp())}"),
"destination": request_data.get("destination"),
"travel_dates": request_data.get("travel_dates"),
"preferences": request_data.get("preferences", {}),
"budget": request_data.get("budget"),
"group_size": request_data.get("group_size", 1)
}
try:
# Execute the workflow
result = await self.workflow_engine.execute_workflow(
workflow_id="travel_planning_production",
context=workflow_context
)
# Update metrics
self.metrics.increment_counter("travel_requests_completed")
self.metrics.record_histogram("travel_planning_duration", result.get("execution_time", 0))
return {
"status": "success",
"request_id": workflow_context["request_id"],
"results": result,
"execution_time": result.get("execution_time"),
"quality_score": result.get("quality_score", 0.0)
}
except Exception as e:
# Update error metrics
self.metrics.increment_counter("travel_requests_failed")
logger.error(f"Travel planning request failed: {str(e)}")
return {
"status": "failed",
"request_id": workflow_context["request_id"],
"error": str(e),
"retry_recommended": True
}
async def handle_customer_service_inquiry(self, inquiry_data: Dict[str, Any]) -> Dict[str, Any]:
"""Handle customer service inquiry using the production workflow."""
workflow_context = {
"inquiry_id": inquiry_data.get("inquiry_id", f"inq_{int(datetime.now().timestamp())}"),
"customer_id": inquiry_data.get("customer_id"),
"subject": inquiry_data.get("subject"),
"description": inquiry_data.get("description"),
"priority": inquiry_data.get("priority", "normal"),
"channel": inquiry_data.get("channel", "web")
}
try:
# Execute customer service workflow
result = await self.workflow_engine.execute_workflow(
workflow_id="customer_service_production",
context=workflow_context
)
# Update service metrics
self.metrics.increment_counter("customer_inquiries_processed")
self.metrics.record_histogram("customer_service_response_time", result.get("response_time", 0))
return {
"status": "resolved",
"inquiry_id": workflow_context["inquiry_id"],
"resolution": result.get("resolution"),
"satisfaction_score": result.get("satisfaction_score"),
"response_time": result.get("response_time"),
"agent_involved": result.get("assigned_agent")
}
except Exception as e:
self.metrics.increment_counter("customer_inquiries_failed")
logger.error(f"Customer service inquiry failed: {str(e)}")
return {
"status": "failed",
"inquiry_id": workflow_context["inquiry_id"],
"error": str(e),
"escalation_required": True
}
async def get_system_metrics(self) -> Dict[str, Any]:
"""Get comprehensive system metrics for monitoring dashboards."""
return {
"system_info": await self._system_health_check(),
"agent_metrics": await self._agents_health_check(),
"workflow_metrics": await self._workflows_health_check(),
"performance_metrics": {
"requests_per_minute": self.metrics.get_rate("total_requests"),
"average_response_time": self.metrics.get_average("response_time"),
"error_rate": self.metrics.get_rate("errors") / max(self.metrics.get_rate("total_requests"), 1),
"throughput": self.metrics.get_counter("completed_workflows")
},
"resource_utilization": {
"cpu_usage": self._get_cpu_usage(),
"memory_usage_mb": self._get_memory_usage(),
"active_connections": len(self.agents),
"queue_depth": await self.message_router.get_queue_depth()
}
}
async def shutdown_gracefully(self):
"""Perform graceful system shutdown."""
logger.info("Initiating graceful system shutdown")
self.system_status = "shutting_down"
try:
# Stop accepting new requests
await self.message_router.stop_accepting_requests()
# Complete active workflows
await self.workflow_engine.complete_active_workflows(timeout=300)
# Shutdown agents
for agent_id, agent in self.agents.items():
await agent.shutdown()
logger.info(f"Agent {agent_id} shut down successfully")
# Stop monitoring
await self.health_checker.stop_monitoring()
await self.metrics.stop_collection()
# Final cleanup
await self.workflow_engine.shutdown()
await self.message_router.shutdown()
await self.agent_registry.shutdown()
self.system_status = "stopped"
logger.info("System shutdown completed successfully")
except Exception as e:
logger.error(f"Error during system shutdown: {str(e)}")
self.system_status = "shutdown_error"
raise
# Production deployment example
async def deploy_production_system():
"""Deploy the complete production agent system."""
# Load production configuration
config = SystemConfiguration(
environment="production",
log_level="INFO",
auto_scaling_enabled=True,
kubernetes_config={
"namespace": "agent-system-prod",
"replicas": 5,
"cpu_request": "1000m",
"memory_request": "1Gi",
"cpu_limit": "4000m",
"memory_limit": "4Gi"
}
)
# Initialize production system
system = ProductionAgentSystem(config)
try:
# Initialize all components
await system.initialize_system()
print("🚀 Production agent system deployed successfully!")
print(f"System status: {system.system_status}")
print(f"Active agents: {len(system.agents)}")
# Example usage
print("\n📋 Testing travel planning workflow...")
travel_result = await system.execute_travel_planning_request({
"destination": "Tokyo, Japan",
"travel_dates": {"start": "2024-03-15", "end": "2024-03-20"},
"budget": 3000,
"group_size": 2
})
print(f"Travel planning result: {travel_result['status']}")
print("\n🎧 Testing customer service workflow...")
service_result = await system.handle_customer_service_inquiry({
"customer_id": "cust_12345",
"subject": "Billing inquiry",
"description": "Question about recent charges",
"priority": "normal"
})
print(f"Customer service result: {service_result['status']}")
print("\n📊 System metrics:")
metrics = await system.get_system_metrics()
print(f"System uptime: {metrics['system_info']['uptime_seconds']} seconds")
print(f"Active workflows: {metrics['workflow_metrics']['active_workflows']}")
print(f"CPU usage: {metrics['resource_utilization']['cpu_usage']:.2f}%")
print(f"Memory usage: {metrics['resource_utilization']['memory_usage_mb']:.2f} MB")
return system
except Exception as e:
print(f"❌ Production deployment failed: {str(e)}")
raise
if __name__ == "__main__":
# Run the production system deployment
asyncio.run(deploy_production_system())
Key Production Features Implemented:¶
- High Availability: Multi-replica agents with failover capabilities
- Auto-scaling: Resource-based scaling with Kubernetes integration
- Comprehensive Monitoring: Prometheus metrics, health checks, and alerting
- Fault Tolerance: Circuit breakers, retry logic, and graceful degradation
- Security: mTLS, RBAC, and secure configuration management (mTLS and RBAC are enforced at the service-mesh and cluster layer rather than in this module)
- Observability: Structured logging, distributed tracing, and dashboards
This production system demonstrates enterprise-grade deployment patterns with all the reliability, scalability, and monitoring capabilities required for production environments.
Comprehensive Capstone Solution Implementation¶
The walkthrough below revisits the same capstone system step by step, explaining the design decisions behind each component.
System Architecture Overview¶
This capstone system demonstrates enterprise-grade production deployment patterns, integrating all concepts learned throughout the nano-degree program. The system implements:
- Multi-Agent Coordination: Weather, planning, customer service, data analysis, and security agents
- Production Infrastructure: Kubernetes deployment with auto-scaling and monitoring
- Enterprise Security: MTLS, RBAC, and comprehensive security policies
- Observability: Prometheus metrics, Grafana dashboards, and alerting
- CI/CD Integration: Automated testing, building, and deployment pipelines
Core System Foundation¶
Let's start by establishing the core foundation with imports and logging setup:
# production/capstone_system.py
# Core system imports for production-ready multi-agent deployment
import asyncio
import logging
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
import yaml
import json
# Import our custom agent framework components
from workflows.advanced_engine import AdvancedWorkflowEngine
from a2a.router import MessageRouter
from a2a.registry import AgentRegistry
from agents.weather_agent import WeatherAgent
from agents.planning_agent import PlanningAgent
from monitoring.agent_metrics import AgentMetrics, HealthChecker
# Initialize structured logging for production monitoring
logger = logging.getLogger(__name__)
Key Learning Points:

- Production systems require comprehensive logging and monitoring from the start
- Type hints improve code maintainability and enable better IDE support
- Modular imports allow for clean separation of concerns and easier testing
Production Configuration Management¶
Production systems need flexible, environment-aware configuration. Here's our configuration dataclass:
@dataclass
class SystemConfiguration:
"""Production system configuration with enterprise defaults."""
# Infrastructure settings - Kubernetes native
kubernetes_namespace: str = "agent-system"
redis_cluster_endpoint: str = "redis-cluster.agent-system.svc.cluster.local"
prometheus_endpoint: str = "prometheus.monitoring.svc.cluster.local:9090"
# Agent resource limits for optimal performance
max_agents_per_node: int = 10
agent_memory_limit: str = "1Gi"
agent_cpu_limit: str = "1000m"
Configuration Best Practices:

- Use Kubernetes-native service discovery for endpoints
- Set reasonable resource limits to prevent resource exhaustion
- Provide environment-specific defaults that can be overridden via environment variables
# Monitoring and observability settings
metrics_port: int = 9090
health_check_interval: int = 30
log_level: str = "INFO"
# Security configuration for enterprise environments
enable_mtls: bool = True
    jwt_secret_key: str = ""  # injected at runtime from a Kubernetes Secret; never hard-code secrets
api_rate_limit: int = 1000
Security Considerations:

- Enable mutual TLS by default for secure service-to-service communication
- Implement rate limiting to prevent abuse and ensure fair resource usage
- Use JWT tokens for stateless authentication
# Performance tuning parameters
workflow_timeout: int = 3600
message_queue_size: int = 10000
max_concurrent_workflows: int = 100
Performance Optimization:

- Set appropriate timeouts to prevent resource leaks
- Size message queues based on expected load patterns
- Limit concurrent workflows to maintain system stability
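A minimal sketch of how these defaults might be overridden per environment; the variable names are hypothetical:

```python
import os

def configuration_from_environment() -> SystemConfiguration:
    """Start from code defaults and let the deployment environment override them."""
    config = SystemConfiguration()
    config.log_level = os.environ.get("LOG_LEVEL", config.log_level)
    config.max_concurrent_workflows = int(
        os.environ.get("MAX_CONCURRENT_WORKFLOWS", config.max_concurrent_workflows)
    )
    return config
```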
This comprehensive implementation demonstrates enterprise-grade production deployment patterns that integrate all nano-degree concepts into a unified, scalable system ready for production use.