Session 9 - Module B: Production Multi-Agent Systems (70 minutes)¶
Prerequisites: Session 9 Core Section Complete
Target Audience: Production engineers and DevOps teams deploying multi-agent systems
Cognitive Load: 5 production concepts
Module Overview¶
This module explores enterprise-grade production deployment patterns for multi-agent systems, including distributed monitoring, scalable orchestration, container-based deployment, service mesh integration, and operational excellence practices. You'll learn to build production-ready multi-agent systems that meet enterprise scale and operational requirements.
Learning Objectives¶
By the end of this module, you will:
- Implement comprehensive monitoring and observability for multi-agent systems
- Design scalable deployment patterns with container orchestration
- Create distributed coordination mechanisms with fault tolerance
- Build operational excellence practices for multi-agent system maintenance
Part 1: Enterprise Monitoring & Observability (40 minutes)¶
Distributed Multi-Agent Monitoring¶
Production multi-agent systems require comprehensive monitoring to track performance, health, and inter-agent communication patterns. We'll build a distributed monitoring system that can handle metrics collection, alerting, and performance analysis across multiple agents.
🗂️ File: src/session9/distributed_monitoring.py
- Comprehensive monitoring for multi-agent systems
First, let's establish the foundational imports and data structures for our monitoring system:
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import asyncio
import json
import logging
from enum import Enum
from collections import defaultdict, deque
import time
import psutil
import aiohttp
Next, we define the core enumerations that categorize our monitoring data. These types help us organize metrics and alerts systematically:
class MetricType(Enum):
"""Types of metrics for multi-agent systems"""
COUNTER = "counter" # Cumulative values (requests, errors)
GAUGE = "gauge" # Point-in-time values (CPU, memory)
HISTOGRAM = "histogram" # Distribution of values
SUMMARY = "summary" # Quantile calculations
class AlertSeverity(Enum):
"""Alert severity levels for escalation"""
INFO = "info" # Informational notices
WARNING = "warning" # Potential issues
CRITICAL = "critical" # Service affecting
EMERGENCY = "emergency" # System-wide failure
Now we define the data structures that represent individual metrics and system alerts. These form the foundation of our monitoring data model:
@dataclass
class AgentMetric:
"""Individual agent metric with metadata"""
agent_id: str
metric_name: str
metric_type: MetricType
value: float
timestamp: datetime = field(default_factory=datetime.now)
labels: Dict[str, str] = field(default_factory=dict)
The SystemAlert class captures alert information with all necessary metadata for tracking and resolution:
@dataclass
class SystemAlert:
"""System alert with tracking and resolution information"""
alert_id: str
alert_name: str
severity: AlertSeverity
description: str
affected_agents: List[str] = field(default_factory=list)
timestamp: datetime = field(default_factory=datetime.now)
resolved: bool = False
resolution_timestamp: Optional[datetime] = None
The DistributedMetricsCollector is the core component that orchestrates monitoring across multiple agents. It manages metric collection, aggregation, and alerting in a scalable, distributed manner.
class DistributedMetricsCollector:
"""Centralized metrics collection for multi-agent systems"""
def __init__(self, collection_interval: float = 30.0):
# Core collection settings
self.collection_interval = collection_interval
self.metrics_buffer: deque = deque(maxlen=10000)
self.agent_registry: Dict[str, Dict[str, Any]] = {}
self.metric_aggregations: Dict[str, List[float]] = defaultdict(list)
self.collection_tasks: Dict[str, asyncio.Task] = {}
self.logger = logging.getLogger(__name__)
# Alerting system components
self.active_alerts: Dict[str, SystemAlert] = {}
self.alert_rules: List[Dict[str, Any]] = []
self.alert_handlers = []
The agent registration method allows new agents to join the monitoring system dynamically. This supports the elastic nature of production multi-agent systems:
async def register_agent(self, agent_id: str, agent_info: Dict[str, Any]):
"""Register agent for monitoring with health tracking"""
self.agent_registry[agent_id] = {
'agent_info': agent_info,
'registration_time': datetime.now(),
'last_heartbeat': datetime.now(),
'status': 'active',
'metrics_endpoint': agent_info.get('metrics_endpoint'),
'health_endpoint': agent_info.get('health_endpoint')
}
# Start asynchronous metric collection for this agent
if agent_id not in self.collection_tasks:
task = asyncio.create_task(self._collect_agent_metrics(agent_id))
self.collection_tasks[agent_id] = task
self.logger.info(f"Registered agent {agent_id} for monitoring")
The metric collection loop runs continuously for each registered agent, gathering both custom metrics from agent endpoints and system-level performance data:
async def _collect_agent_metrics(self, agent_id: str):
"""Continuous metric collection loop for individual agent"""
while agent_id in self.agent_registry:
try:
agent_info = self.agent_registry[agent_id]
metrics_endpoint = agent_info.get('metrics_endpoint')
# Collect custom application metrics from agent
if metrics_endpoint:
custom_metrics = await self._fetch_agent_metrics(agent_id, metrics_endpoint)
for metric in custom_metrics:
await self._process_metric(metric)
# Collect system-level performance metrics
system_metrics = await self._collect_system_metrics(agent_id)
for metric in system_metrics:
await self._process_metric(metric)
# Update agent heartbeat and status
self.agent_registry[agent_id]['last_heartbeat'] = datetime.now()
self.agent_registry[agent_id]['status'] = 'active'
except Exception as e:
self.logger.error(f"Failed to collect metrics from agent {agent_id}: {e}")
self.agent_registry[agent_id]['status'] = 'unhealthy'
await self._generate_agent_health_alert(agent_id, str(e))
await asyncio.sleep(self.collection_interval)
The HTTP-based metric fetching method communicates with individual agents to collect their custom application metrics:
async def _fetch_agent_metrics(self, agent_id: str, endpoint: str) -> List[AgentMetric]:
"""Fetch custom metrics from agent HTTP endpoint"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(endpoint, timeout=10) as response:
if response.status == 200:
data = await response.json()
# Parse metrics from agent response
metrics = []
for metric_data in data.get('metrics', []):
metric = AgentMetric(
agent_id=agent_id,
metric_name=metric_data['name'],
metric_type=MetricType(metric_data.get('type', 'gauge')),
value=float(metric_data['value']),
labels=metric_data.get('labels', {})
)
metrics.append(metric)
return metrics
except Exception as e:
self.logger.warning(f"Failed to fetch metrics from {endpoint}: {e}")
return [] # Return empty list on failure
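Based on the parsing logic above, each agent's metrics endpoint is expected to return a JSON document with a top-level metrics list. The following shows the shape as the Python dict produced by response.json(); the metric names and labels are made up for this example:
example_payload = {
    "metrics": [
        {"name": "tasks_processed_total", "type": "counter", "value": 1523, "labels": {"queue": "default"}},
        {"name": "task_queue_depth", "type": "gauge", "value": 7.0}
    ]
}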
For system-level monitoring, we collect infrastructure metrics like CPU, memory, and network usage that apply to all agents:
async def _collect_system_metrics(self, agent_id: str) -> List[AgentMetric]:
"""Collect system-level performance metrics"""
# Note: In production, this would collect from the specific agent process
# Here we demonstrate with local system metrics as an example
metrics = []
# CPU usage monitoring
cpu_usage = psutil.cpu_percent()
metrics.append(AgentMetric(
agent_id=agent_id,
metric_name="system_cpu_usage",
metric_type=MetricType.GAUGE,
value=cpu_usage,
labels={"unit": "percent"}
))
# Memory usage tracking
memory = psutil.virtual_memory()
metrics.append(AgentMetric(
agent_id=agent_id,
metric_name="system_memory_usage",
metric_type=MetricType.GAUGE,
value=memory.percent,
labels={"unit": "percent"}
))
Network I/O metrics help track inter-agent communication patterns and potential bottlenecks:
# Network I/O monitoring for communication patterns
network = psutil.net_io_counters()
metrics.append(AgentMetric(
agent_id=agent_id,
metric_name="network_bytes_sent",
metric_type=MetricType.COUNTER,
value=float(network.bytes_sent),
labels={"direction": "sent"}
))
metrics.append(AgentMetric(
agent_id=agent_id,
metric_name="network_bytes_recv",
metric_type=MetricType.COUNTER,
value=float(network.bytes_recv),
labels={"direction": "received"}
))
return metrics
The metric processing pipeline handles storage, aggregation, and real-time alert evaluation for each incoming metric:
async def _process_metric(self, metric: AgentMetric):
"""Process, store, and evaluate metric for alerts"""
# Store in circular buffer for recent access
self.metrics_buffer.append(metric)
# Update rolling aggregations for trend analysis
metric_key = f"{metric.agent_id}:{metric.metric_name}"
self.metric_aggregations[metric_key].append(metric.value)
# Maintain sliding window of recent values
if len(self.metric_aggregations[metric_key]) > 100:
self.metric_aggregations[metric_key] = \
self.metric_aggregations[metric_key][-50:]
# Real-time alert rule evaluation
await self._check_alert_rules(metric)
The alert rule checking system evaluates each metric against configured alert conditions:
async def _check_alert_rules(self, metric: AgentMetric):
"""Evaluate metric against all configured alert rules"""
for rule in self.alert_rules:
if await self._evaluate_alert_rule(rule, metric):
await self._trigger_alert(rule, metric)
Alert rule evaluation supports multiple condition types including threshold-based and rate-of-change detection:
async def _evaluate_alert_rule(self, rule: Dict[str, Any], metric: AgentMetric) -> bool:
"""Evaluate if metric satisfies alert rule condition"""
# Filter rules by metric name and agent
if rule.get('metric_name') and rule['metric_name'] != metric.metric_name:
return False
if rule.get('agent_id') and rule['agent_id'] != metric.agent_id:
return False
# Evaluate different condition types
condition_type = rule.get('condition_type', 'threshold')
if condition_type == 'threshold':
return self._evaluate_threshold_condition(rule, metric)
elif condition_type == 'rate_of_change':
return self._evaluate_rate_condition(rule, metric)
return False
Threshold evaluation handles various comparison operators:
def _evaluate_threshold_condition(self, rule: Dict[str, Any], metric: AgentMetric) -> bool:
"""Evaluate threshold-based alert conditions"""
operator = rule.get('operator', '>')
threshold = rule.get('threshold', 0)
if operator == '>':
return metric.value > threshold
elif operator == '<':
return metric.value < threshold
elif operator == '>=':
return metric.value >= threshold
elif operator == '<=':
return metric.value <= threshold
elif operator == '==':
return metric.value == threshold
elif operator == '!=':
return metric.value != threshold
return False
Rate-of-change detection identifies trends in metric values:
def _evaluate_rate_condition(self, rule: Dict[str, Any], metric: AgentMetric) -> bool:
"""Evaluate rate-of-change alert conditions"""
metric_key = f"{metric.agent_id}:{metric.metric_name}"
recent_values = self.metric_aggregations.get(metric_key, [])
if len(recent_values) >= 2:
rate = (recent_values[-1] - recent_values[-2]) / self.collection_interval
return rate > rule.get('rate_threshold', 0)
return False
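For example, a rate-of-change rule that fires when outbound network traffic climbs sharply between collection cycles might look like this; the rule name and threshold value are illustrative:
network_spike_rule = {
    'name': 'network_send_spike',
    'description': 'Agent {agent_id}: {metric_name} rising sharply (latest value {value})',
    'condition_type': 'rate_of_change',
    'metric_name': 'network_bytes_sent',
    'rate_threshold': 1_000_000,  # bytes per second, illustrative
    'severity': 'warning'
}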
When an alert condition is met, the system creates and distributes alert notifications to all registered handlers:
async def _trigger_alert(self, rule: Dict[str, Any], metric: AgentMetric):
"""Create and distribute alert notifications"""
alert_id = f"{rule['name']}:{metric.agent_id}:{metric.metric_name}"
# Prevent duplicate alerts for the same condition
if alert_id in self.active_alerts and not self.active_alerts[alert_id].resolved:
return # Alert already active
# Create alert with contextual information
alert = SystemAlert(
alert_id=alert_id,
alert_name=rule['name'],
severity=AlertSeverity(rule.get('severity', 'warning')),
description=rule['description'].format(
agent_id=metric.agent_id,
metric_name=metric.metric_name,
value=metric.value
),
affected_agents=[metric.agent_id]
)
self.active_alerts[alert_id] = alert
# Distribute to all registered alert handlers
for handler in self.alert_handlers:
try:
await handler(alert)
except Exception as e:
self.logger.error(f"Alert handler failed: {e}")
self.logger.warning(f"Alert triggered: {alert.alert_name} for agent {metric.agent_id}")
Special health monitoring generates alerts when agents become unresponsive or experience critical errors:
async def _generate_agent_health_alert(self, agent_id: str, error: str):
"""Generate critical health alerts for agent failures"""
alert_id = f"agent_health:{agent_id}"
# Only create new alert if not already active
if alert_id not in self.active_alerts or self.active_alerts[alert_id].resolved:
alert = SystemAlert(
alert_id=alert_id,
alert_name="Agent Health Issue",
severity=AlertSeverity.CRITICAL,
description=f"Agent {agent_id} is experiencing health issues: {error}",
affected_agents=[agent_id]
)
self.active_alerts[alert_id] = alert
# Notify all handlers of critical health issue
for handler in self.alert_handlers:
try:
await handler(alert)
except Exception as e:
self.logger.error(f"Alert handler failed: {e}")
The management interface allows dynamic configuration of alert rules and handlers:
def add_alert_rule(self, rule: Dict[str, Any]):
"""Add new alert rule with validation"""
required_fields = ['name', 'description', 'condition_type']
if not all(field in rule for field in required_fields):
raise ValueError(f"Alert rule must contain: {required_fields}")
self.alert_rules.append(rule)
self.logger.info(f"Added alert rule: {rule['name']}")
def add_alert_handler(self, handler: callable):
"""Register new alert handler function"""
self.alert_handlers.append(handler)
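As a sketch of how these hooks fit together, the following registers a threshold rule and a simple logging handler; the rule name, threshold, and handler behavior are illustrative:
collector = DistributedMetricsCollector(collection_interval=15.0)

collector.add_alert_rule({
    'name': 'high_cpu',
    'description': 'Agent {agent_id}: {metric_name} at {value:.1f}%',
    'condition_type': 'threshold',
    'metric_name': 'system_cpu_usage',
    'operator': '>',
    'threshold': 85,
    'severity': 'critical'
})

async def log_alert(alert: SystemAlert):
    # Minimal handler; production systems would page on-call or post to chat
    print(f"[{alert.severity.value}] {alert.description}")

collector.add_alert_handler(log_alert)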
The system overview provides a comprehensive dashboard view of the entire multi-agent system health:
def get_system_overview(self) -> Dict[str, Any]:
"""Generate comprehensive system health dashboard"""
now = datetime.now()
# Calculate agent status distribution
agent_status = {
'total_agents': len(self.agent_registry),
'active_agents': sum(1 for info in self.agent_registry.values()
if info['status'] == 'active'),
'unhealthy_agents': sum(1 for info in self.agent_registry.values()
if info['status'] == 'unhealthy'),
'agents_detail': {}
}
Detailed agent information includes heartbeat tracking for availability monitoring:
# Build detailed agent status with heartbeat info
for agent_id, info in self.agent_registry.items():
last_heartbeat = info['last_heartbeat']
heartbeat_age = (now - last_heartbeat).total_seconds()
agent_status['agents_detail'][agent_id] = {
'status': info['status'],
'last_heartbeat_seconds_ago': heartbeat_age,
'registration_time': info['registration_time'].isoformat()
}
Alert summary provides operational visibility into system issues:
# Categorize alerts by severity for operational priorities
alert_summary = {
'total_alerts': len(self.active_alerts),
'active_alerts': sum(1 for alert in self.active_alerts.values()
if not alert.resolved),
'critical_alerts': sum(1 for alert in self.active_alerts.values()
if alert.severity == AlertSeverity.CRITICAL and not alert.resolved),
'warning_alerts': sum(1 for alert in self.active_alerts.values()
if alert.severity == AlertSeverity.WARNING and not alert.resolved)
}
# Metrics collection statistics
metrics_summary = {
'total_metrics_collected': len(self.metrics_buffer),
'unique_metric_types': len(set(f"{metric.agent_id}:{metric.metric_name}"
for metric in self.metrics_buffer)),
'collection_interval_seconds': self.collection_interval
}
return {
'system_overview': {
'timestamp': now.isoformat(),
'agents': agent_status,
'alerts': alert_summary,
'metrics': metrics_summary
}
}
Performance reporting provides historical analysis and trend identification for capacity planning:
async def generate_performance_report(self, time_window_hours: int = 24) -> Dict[str, Any]:
"""Generate comprehensive performance analysis report"""
cutoff_time = datetime.now() - timedelta(hours=time_window_hours)
# Filter metrics to analysis time window
recent_metrics = [metric for metric in self.metrics_buffer
if metric.timestamp >= cutoff_time]
# Calculate per-agent performance metrics
agent_performance = {}
for agent_id in self.agent_registry.keys():
agent_metrics = [m for m in recent_metrics if m.agent_id == agent_id]
if agent_metrics:
agent_performance[agent_id] = self._analyze_agent_performance(agent_id, agent_metrics)
The _analyze_agent_performance helper calculates key performance indicators for each agent, providing operational insight:
def _analyze_agent_performance(self, agent_id: str, agent_metrics: List[AgentMetric]) -> Dict[str, Any]:
"""Analyze individual agent performance metrics"""
# Extract performance metrics by type
cpu_metrics = [m.value for m in agent_metrics if m.metric_name == 'system_cpu_usage']
memory_metrics = [m.value for m in agent_metrics if m.metric_name == 'system_memory_usage']
return {
'total_metrics': len(agent_metrics),
'avg_cpu_usage': sum(cpu_metrics) / len(cpu_metrics) if cpu_metrics else 0,
'max_cpu_usage': max(cpu_metrics) if cpu_metrics else 0,
'avg_memory_usage': sum(memory_metrics) / len(memory_metrics) if memory_metrics else 0,
'max_memory_usage': max(memory_metrics) if memory_metrics else 0,
'health_score': self._calculate_agent_health_score(agent_id, agent_metrics)
}
Back in generate_performance_report, system-wide trend analysis identifies patterns across the entire multi-agent system:
# Calculate system-wide performance trends
system_trends = {
'total_metrics_in_window': len(recent_metrics),
'average_system_cpu': self._calculate_average_metric(recent_metrics, 'system_cpu_usage'),
'average_system_memory': self._calculate_average_metric(recent_metrics, 'system_memory_usage')
}
return {
'performance_report': {
'time_window_hours': time_window_hours,
'report_timestamp': datetime.now().isoformat(),
'agent_performance': agent_performance,
'system_trends': system_trends,
'recommendations': self._generate_performance_recommendations(agent_performance)
}
}
def _calculate_average_metric(self, metrics: List[AgentMetric], metric_name: str) -> float:
"""Calculate average value for specific metric type"""
values = [m.value for m in metrics if m.metric_name == metric_name]
return sum(values) / len(values) if values else 0
The health scoring algorithm provides a composite view of agent wellness based on multiple factors:
def _calculate_agent_health_score(self, agent_id: str,
recent_metrics: List[AgentMetric]) -> float:
"""Calculate composite health score (0-100) for agent"""
if not recent_metrics:
return 0.0
# Start with perfect health score
health_score = 100.0
# Factor 1: CPU utilization impact
cpu_metrics = [m.value for m in recent_metrics if m.metric_name == 'system_cpu_usage']
if cpu_metrics:
avg_cpu = sum(cpu_metrics) / len(cpu_metrics)
if avg_cpu > 80:
health_score -= 30 # Critical CPU usage
elif avg_cpu > 60:
health_score -= 15 # High CPU usage
# Factor 2: Memory utilization impact
memory_metrics = [m.value for m in recent_metrics if m.metric_name == 'system_memory_usage']
if memory_metrics:
avg_memory = sum(memory_metrics) / len(memory_metrics)
if avg_memory > 85:
health_score -= 25 # Critical memory usage
elif avg_memory > 70:
health_score -= 10 # High memory usage
Additional health factors include agent status and active alerts:
# Factor 3: Agent operational status
agent_info = self.agent_registry.get(agent_id, {})
if agent_info.get('status') != 'active':
health_score -= 50 # Major penalty for non-active agents
# Factor 4: Active alert impact
agent_alerts = [alert for alert in self.active_alerts.values()
if agent_id in alert.affected_agents and not alert.resolved]
health_score -= len(agent_alerts) * 10 # 10 points per active alert
return max(0.0, min(100.0, health_score)) # Clamp to 0-100 range
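As a worked example, an agent averaging 75% CPU (-15) and 72% memory (-10), still in 'active' status, with one unresolved alert (-10) would score 100 - 15 - 10 - 10 = 65.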
Performance recommendations provide actionable insights for system optimization:
def _generate_performance_recommendations(self,
agent_performance: Dict[str, Any]) -> List[str]:
"""Generate actionable performance optimization recommendations"""
recommendations = []
# Analyze each agent's performance patterns
for agent_id, perf in agent_performance.items():
if perf['health_score'] < 70:
recommendations.append(
f"Agent {agent_id} has low health score ({perf['health_score']:.1f}), investigate resource usage"
)
if perf['avg_cpu_usage'] > 70:
recommendations.append(
f"Agent {agent_id} showing high CPU usage ({perf['avg_cpu_usage']:.1f}%), consider scaling"
)
if perf['avg_memory_usage'] > 80:
recommendations.append(
f"Agent {agent_id} showing high memory usage ({perf['avg_memory_usage']:.1f}%), check for memory leaks"
)
# System-wide health analysis
unhealthy_count = sum(1 for perf in agent_performance.values() if perf['health_score'] < 70)
if unhealthy_count > len(agent_performance) * 0.3:
recommendations.append(
"More than 30% of agents are unhealthy, investigate system-wide issues"
)
return recommendations
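Putting the collector together end to end, here is a minimal sketch of running it inside an asyncio application; the agent ID and endpoint URLs are illustrative:
async def run_monitoring():
    collector = DistributedMetricsCollector(collection_interval=30.0)

    await collector.register_agent('research-agent-1', {
        'metrics_endpoint': 'http://research-agent-1:8080/metrics',
        'health_endpoint': 'http://research-agent-1:8080/health'
    })

    # Allow a few collection cycles to run, then inspect system health
    await asyncio.sleep(90)
    print(json.dumps(collector.get_system_overview(), indent=2, default=str))

asyncio.run(run_monitoring())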
Distributed tracing tracks request flows and interactions across multiple agents, enabling performance analysis and debugging of complex multi-agent workflows.
class DistributedTracing:
"""Distributed tracing for multi-agent interaction analysis"""
def __init__(self):
self.traces: Dict[str, Dict[str, Any]] = {} # Active traces
self.span_buffer: deque = deque(maxlen=10000) # Recent spans
Starting a new trace creates a unique identifier that follows a request through multiple agents:
async def start_trace(self, trace_id: str, operation: str,
agent_id: str, metadata: Dict[str, Any] = None) -> str:
"""Start a new distributed trace span"""
# Generate unique span identifier
span_id = f"{trace_id}:{agent_id}:{int(time.time() * 1000)}"
# Create span with timing and context information
span = {
'trace_id': trace_id,
'span_id': span_id,
'parent_span_id': None,
'operation': operation,
'agent_id': agent_id,
'start_time': datetime.now(),
'end_time': None,
'duration_ms': None,
'status': 'active',
'metadata': metadata or {},
'logs': []
}
# Initialize trace if not exists
if trace_id not in self.traces:
self.traces[trace_id] = {
'trace_id': trace_id,
'start_time': datetime.now(),
'spans': {}
}
# Store span in trace and buffer
self.traces[trace_id]['spans'][span_id] = span
self.span_buffer.append(span)
return span_id
Span completion records timing and outcome information for performance analysis:
async def finish_span(self, span_id: str, status: str = 'completed',
metadata: Dict[str, Any] = None):
"""Complete a trace span with timing and status"""
# Locate span across all active traces
span = None
for trace in self.traces.values():
if span_id in trace['spans']:
span = trace['spans'][span_id]
break
if span:
# Record completion timing
span['end_time'] = datetime.now()
span['duration_ms'] = (span['end_time'] - span['start_time']).total_seconds() * 1000
span['status'] = status
# Merge additional metadata
if metadata:
span['metadata'].update(metadata)
Trace analysis provides insights into multi-agent workflow performance and bottlenecks:
def get_trace_analysis(self, trace_id: str) -> Dict[str, Any]:
"""Analyze trace for performance insights and bottlenecks"""
if trace_id not in self.traces:
return {'error': 'Trace not found'}
trace = self.traces[trace_id]
spans = trace['spans']
# Filter to completed spans for analysis
completed_spans = [s for s in spans.values() if s['status'] == 'completed']
if not completed_spans:
return {'error': 'No completed spans in trace'}
# Timing statistics: the longest single span approximates end-to-end duration
total_duration = max(s['duration_ms'] for s in completed_spans)
avg_duration = sum(s['duration_ms'] for s in completed_spans) / len(completed_spans)
Agent participation analysis shows how work is distributed across the multi-agent system:
# Analyze agent participation patterns
agent_participation = {}
for span in completed_spans:
agent_id = span['agent_id']
if agent_id not in agent_participation:
agent_participation[agent_id] = {
'span_count': 0,
'total_duration_ms': 0,
'operations': []
}
# Aggregate agent statistics
agent_participation[agent_id]['span_count'] += 1
agent_participation[agent_id]['total_duration_ms'] += span['duration_ms']
agent_participation[agent_id]['operations'].append(span['operation'])
return {
'trace_id': trace_id,
'total_spans': len(spans),
'completed_spans': len(completed_spans),
'total_duration_ms': total_duration,
'average_span_duration_ms': avg_duration,
'agent_participation': agent_participation,
'critical_path': self._find_critical_path(completed_spans)
}
Critical path analysis identifies the longest-running operations that determine overall trace duration:
def _find_critical_path(self, spans: List[Dict[str, Any]]) -> List[str]:
"""Identify critical path through trace spans"""
# Simplified critical path - longest duration spans
sorted_spans = sorted(spans, key=lambda s: s['duration_ms'], reverse=True)
# Return top 3 longest operations as critical path indicators
return [f"{s['agent_id']}:{s['operation']}" for s in sorted_spans[:3]]
Part 2: Scalable Deployment Patterns (30 minutes)¶
Container Orchestration and Service Mesh¶
Production multi-agent systems require sophisticated deployment orchestration to handle scaling, service discovery, and inter-agent communication. We'll implement Kubernetes-native deployment patterns with service mesh integration.
🗂️ File: src/session9/scalable_deployment.py
- Production deployment patterns
First, we establish the imports and configuration structures for container orchestration:
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
import yaml
import json
import asyncio
from datetime import datetime
The ContainerConfig dataclass defines the deployment parameters for individual agents:
@dataclass
class ContainerConfig:
"""Container configuration for agent deployment"""
image: str # Container image name
tag: str = "latest" # Image tag/version
resources: Dict[str, str] = field(default_factory=dict) # CPU/memory limits
environment: Dict[str, str] = field(default_factory=dict) # Environment variables
ports: List[int] = field(default_factory=list) # Exposed ports
health_check: Dict[str, Any] = field(default_factory=dict) # Health check config
The KubernetesDeploymentManager generates production-ready Kubernetes manifests for multi-agent systems:
class KubernetesDeploymentManager:
"""Kubernetes deployment manager for multi-agent systems"""
def __init__(self, namespace: str = "multi-agents"):
self.namespace = namespace
self.deployment_templates = {}
Agent deployment generation creates complete Kubernetes Deployment manifests with all necessary configurations:
def generate_agent_deployment(self, agent_id: str,
config: ContainerConfig,
replicas: int = 3) -> Dict[str, Any]:
"""Generate complete Kubernetes Deployment for agent"""
# Base deployment structure
deployment = {
'apiVersion': 'apps/v1',
'kind': 'Deployment',
'metadata': self._build_metadata(agent_id),
'spec': self._build_deployment_spec(agent_id, config, replicas)
}
return deployment
def _build_metadata(self, agent_id: str) -> Dict[str, Any]:
"""Build deployment metadata with proper labeling"""
return {
'name': f'agent-{agent_id}',
'namespace': self.namespace,
'labels': {
'app': 'multi-agent-system',
'agent-id': agent_id,
'component': 'agent'
}
}
The deployment specification includes replica management and pod template configuration:
def _build_deployment_spec(self, agent_id: str, config: ContainerConfig, replicas: int) -> Dict[str, Any]:
"""Build deployment specification with scaling and pod template"""
return {
'replicas': replicas,
'selector': {
'matchLabels': {
'app': 'multi-agent-system',
'agent-id': agent_id
}
},
'template': {
'metadata': {
'labels': {
'app': 'multi-agent-system',
'agent-id': agent_id,
'component': 'agent'
}
},
'spec': self._build_pod_spec(agent_id, config)
}
}
The pod specification defines container configuration with resource limits and health checks:
def _build_pod_spec(self, agent_id: str, config: ContainerConfig) -> Dict[str, Any]:
"""Build pod specification with container and resource configuration"""
return {
'containers': [{
'name': f'agent-{agent_id}',
'image': f'{config.image}:{config.tag}',
'ports': [{'containerPort': port} for port in config.ports],
'env': self._build_environment_vars(agent_id, config),
'resources': self._build_resource_spec(config),
'livenessProbe': self._build_liveness_probe(config),
'readinessProbe': self._build_readiness_probe(config)
}],
'serviceAccountName': 'multi-agent-service-account'
}
Environment variables provide runtime configuration and service discovery information:
def _build_environment_vars(self, agent_id: str, config: ContainerConfig) -> List[Dict[str, str]]:
"""Build environment variables for agent runtime configuration"""
base_env = [
{'name': 'AGENT_ID', 'value': agent_id},
{'name': 'NAMESPACE', 'value': self.namespace}
]
# Add custom environment variables
custom_env = [{'name': k, 'value': v} for k, v in config.environment.items()]
return base_env + custom_env
def _build_resource_spec(self, config: ContainerConfig) -> Dict[str, Any]:
"""Build resource requests and limits for proper scheduling"""
return {
'requests': {
'cpu': config.resources.get('cpu_request', '100m'),
'memory': config.resources.get('memory_request', '256Mi')
},
'limits': {
'cpu': config.resources.get('cpu_limit', '500m'),
'memory': config.resources.get('memory_limit', '1Gi')
}
}
Health check configuration ensures reliable service operation and automatic recovery:
def _build_liveness_probe(self, config: ContainerConfig) -> Dict[str, Any]:
"""Build liveness probe for automatic restart on failure"""
return {
'httpGet': {
'path': config.health_check.get('path', '/health'),
'port': config.health_check.get('port', 8080)
},
'initialDelaySeconds': 30,
'periodSeconds': 10
}
def _build_readiness_probe(self, config: ContainerConfig) -> Dict[str, Any]:
"""Build readiness probe for traffic routing control"""
return {
'httpGet': {
'path': config.health_check.get('readiness_path', '/ready'),
'port': config.health_check.get('port', 8080)
},
'initialDelaySeconds': 5,
'periodSeconds': 5
}
Service configuration enables network access and load balancing for agent pods:
def generate_service_configuration(self, agent_id: str,
ports: List[int]) -> Dict[str, Any]:
"""Generate Kubernetes Service for agent network access"""
service = {
'apiVersion': 'v1',
'kind': 'Service',
'metadata': {
'name': f'agent-{agent_id}-service',
'namespace': self.namespace,
'labels': {
'app': 'multi-agent-system',
'agent-id': agent_id
}
},
'spec': {
'selector': {
'app': 'multi-agent-system',
'agent-id': agent_id
},
'ports': [
{
'name': f'port-{port}',
'port': port,
'targetPort': port,
'protocol': 'TCP'
}
for port in ports
],
'type': 'ClusterIP' # Internal cluster access only
}
}
return service
Horizontal Pod Autoscaler (HPA) enables automatic scaling based on resource utilization:
def generate_hpa_configuration(self, agent_id: str,
min_replicas: int = 2,
max_replicas: int = 10,
target_cpu: int = 70) -> Dict[str, Any]:
"""Generate HPA for automatic agent scaling based on metrics"""
hpa = {
'apiVersion': 'autoscaling/v2',
'kind': 'HorizontalPodAutoscaler',
'metadata': {
'name': f'agent-{agent_id}-hpa',
'namespace': self.namespace
},
'spec': {
'scaleTargetRef': {
'apiVersion': 'apps/v1',
'kind': 'Deployment',
'name': f'agent-{agent_id}'
},
'minReplicas': min_replicas,
'maxReplicas': max_replicas,
'metrics': self._build_scaling_metrics(target_cpu)
}
}
return hpa
def _build_scaling_metrics(self, target_cpu: int) -> List[Dict[str, Any]]:
"""Build scaling metrics for CPU and memory utilization"""
return [
{
'type': 'Resource',
'resource': {
'name': 'cpu',
'target': {
'type': 'Utilization',
'averageUtilization': target_cpu
}
}
},
{
'type': 'Resource',
'resource': {
'name': 'memory',
'target': {
'type': 'Utilization',
'averageUtilization': 80
}
}
}
]
The complete manifest generation combines all components into deployable YAML:
def generate_complete_agent_manifests(self, agent_id: str,
config: ContainerConfig,
scaling_config: Dict[str, Any] = None) -> str:
"""Generate complete Kubernetes manifests for production deployment"""
scaling_config = scaling_config or {}
# Generate all required Kubernetes resources
deployment = self.generate_agent_deployment(
agent_id, config,
scaling_config.get('initial_replicas', 3)
)
service = self.generate_service_configuration(agent_id, config.ports)
hpa = self.generate_hpa_configuration(
agent_id,
scaling_config.get('min_replicas', 2),
scaling_config.get('max_replicas', 10),
scaling_config.get('target_cpu', 70)
)
# Combine all manifests into single deployable YAML
manifests = [deployment, service, hpa]
yaml_output = '\n---\n'.join(yaml.dump(manifest) for manifest in manifests)
return yaml_output
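A hedged usage sketch follows, with the image name, tag, and scaling numbers chosen purely for illustration:
manager = KubernetesDeploymentManager(namespace='multi-agents')

config = ContainerConfig(
    image='registry.example.com/research-agent',
    tag='1.4.2',
    resources={'cpu_limit': '1', 'memory_limit': '2Gi'},
    environment={'LOG_LEVEL': 'info'},
    ports=[8080],
    health_check={'path': '/health', 'port': 8080}
)

yaml_manifests = manager.generate_complete_agent_manifests(
    'research', config, {'min_replicas': 2, 'max_replicas': 8, 'target_cpu': 65}
)
print(yaml_manifests)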
Service mesh integration provides advanced traffic management, security, and observability for multi-agent communication.
class ServiceMeshIntegration:
"""Service mesh integration for advanced multi-agent networking"""
def __init__(self, mesh_type: str = "istio"):
self.mesh_type = mesh_type
Virtual Services define advanced routing rules for inter-agent communication:
def generate_virtual_service(self, agent_id: str,
routing_rules: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Generate Istio VirtualService for advanced routing"""
virtual_service = {
'apiVersion': 'networking.istio.io/v1beta1',
'kind': 'VirtualService',
'metadata': {
'name': f'agent-{agent_id}-vs',
'namespace': 'multi-agents'
},
'spec': {
'hosts': [f'agent-{agent_id}-service'],
'http': self._build_http_routes(agent_id, routing_rules)
}
}
return virtual_service
HTTP routing rules enable sophisticated traffic patterns including fault injection and retry policies:
def _build_http_routes(self, agent_id: str, routing_rules: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Build HTTP routing rules with fault tolerance"""
http_routes = []
for rule in routing_rules:
http_rule = {
'match': [
{
'headers': rule.get('headers', {}),
'uri': {
'prefix': rule.get('path_prefix', '/')
}
}
],
'route': [
{
'destination': {
'host': f'agent-{agent_id}-service',
'port': {
'number': rule.get('port', 8080)
}
},
'weight': rule.get('weight', 100)
}
]
}
# Add fault injection for chaos engineering
if rule.get('fault_injection'):
http_rule['fault'] = rule['fault_injection']
# Add retry policy for resilience
if rule.get('retry_policy'):
http_rule['retries'] = rule['retry_policy']
http_routes.append(http_rule)
return http_routes
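For reference, a single routing rule carrying a retry policy might look like the following; the attempts, perTryTimeout, and retryOn fields are standard Istio VirtualService retry settings, while the path and values are illustrative:
routing_rules = [
    {
        'path_prefix': '/api/v1/tasks',
        'port': 8080,
        'weight': 100,
        'retry_policy': {
            'attempts': 3,
            'perTryTimeout': '2s',
            'retryOn': '5xx,connect-failure'
        }
    }
]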
Destination Rules define traffic policies including load balancing and circuit breaking:
def generate_destination_rule(self, agent_id: str,
load_balancer_policy: str = "ROUND_ROBIN") -> Dict[str, Any]:
"""Generate DestinationRule for traffic policy and resilience"""
destination_rule = {
'apiVersion': 'networking.istio.io/v1beta1',
'kind': 'DestinationRule',
'metadata': {
'name': f'agent-{agent_id}-dr',
'namespace': 'multi-agents'
},
'spec': {
'host': f'agent-{agent_id}-service',
'trafficPolicy': self._build_traffic_policy(load_balancer_policy)
}
}
return destination_rule
def _build_traffic_policy(self, load_balancer_policy: str) -> Dict[str, Any]:
"""Build comprehensive traffic policy with resilience patterns"""
return {
'loadBalancer': {
'simple': load_balancer_policy
},
'connectionPool': {
'tcp': {
'maxConnections': 100
},
'http': {
'http1MaxPendingRequests': 50,
'maxRequestsPerConnection': 10
}
},
'outlierDetection': {
'consecutiveErrors': 5,
'interval': '30s',
'baseEjectionTime': '30s',
'maxEjectionPercent': 50,
'minHealthPercent': 30
}
}
Peer Authentication enforces mutual TLS for secure inter-agent communication:
def generate_peer_authentication(self, namespace: str = "multi-agents") -> Dict[str, Any]:
"""Generate PeerAuthentication for mandatory mutual TLS"""
peer_auth = {
'apiVersion': 'security.istio.io/v1beta1',
'kind': 'PeerAuthentication',
'metadata': {
'name': 'multi-agent-mtls',
'namespace': namespace
},
'spec': {
'mtls': {
'mode': 'STRICT' # Enforce mTLS for all communication
}
}
}
return peer_auth
The MultiAgentOrchestrator provides a high-level interface for deploying complete multi-agent systems with all necessary infrastructure:
class MultiAgentOrchestrator:
"""High-level orchestrator for complete multi-agent system deployment"""
def __init__(self):
self.k8s_manager = KubernetesDeploymentManager()
self.service_mesh = ServiceMeshIntegration()
self.deployed_agents: Dict[str, Dict[str, Any]] = {}
The system deployment orchestrates all components for a production-ready multi-agent system:
async def deploy_multi_agent_system(self, system_config: Dict[str, Any]) -> Dict[str, Any]:
"""Deploy complete multi-agent system with all infrastructure"""
# Initialize deployment tracking
deployment_results = {
'deployment_id': f"deploy-{datetime.now().strftime('%Y%m%d-%H%M%S')}",
'agents': {},
'service_mesh_configs': {},
'monitoring_setup': {},
'success': True,
'errors': []
}
try:
# Deploy individual agents
await self._deploy_agents(system_config, deployment_results)
# Configure service mesh
await self._configure_service_mesh(system_config, deployment_results)
# Setup monitoring infrastructure
monitoring_config = await self._setup_system_monitoring(system_config)
deployment_results['monitoring_setup'] = monitoring_config
except Exception as e:
deployment_results['success'] = False
deployment_results['errors'].append(str(e))
return deployment_results
Agent deployment processes each agent configuration and generates the necessary Kubernetes resources:
async def _deploy_agents(self, system_config: Dict[str, Any], deployment_results: Dict[str, Any]):
"""Deploy individual agents with Kubernetes manifests"""
for agent_config in system_config.get('agents', []):
agent_id = agent_config['agent_id']
# Build container configuration
container_config = ContainerConfig(
image=agent_config['image'],
tag=agent_config.get('tag', 'latest'),
resources=agent_config.get('resources', {}),
environment=agent_config.get('environment', {}),
ports=agent_config.get('ports', [8080]),
health_check=agent_config.get('health_check', {})
)
# Generate complete Kubernetes manifests
manifests = self.k8s_manager.generate_complete_agent_manifests(
agent_id, container_config, agent_config.get('scaling', {})
)
deployment_results['agents'][agent_id] = {
'manifests': manifests,
'status': 'deployed',
'replicas': agent_config.get('scaling', {}).get('initial_replicas', 3)
}
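To make the expected input concrete, here is a sketch of a system_config entry that _deploy_agents can consume; all names and values are illustrative:
system_config = {
    'agents': [
        {
            'agent_id': 'research',
            'image': 'registry.example.com/research-agent',
            'tag': '1.4.2',
            'ports': [8080],
            'resources': {'cpu_limit': '500m', 'memory_limit': '1Gi'},
            'environment': {'LOG_LEVEL': 'info'},
            'health_check': {'path': '/health', 'port': 8080},
            'scaling': {'initial_replicas': 3, 'min_replicas': 2, 'max_replicas': 10},
            'routing_rules': [{'path_prefix': '/', 'port': 8080}],
            'load_balancer': 'LEAST_REQUEST'
        }
    ]
}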
Service mesh configuration enables advanced networking and security features:
async def _configure_service_mesh(self, system_config: Dict[str, Any], deployment_results: Dict[str, Any]):
"""Configure service mesh for inter-agent communication"""
# Configure per-agent service mesh policies
for agent_config in system_config.get('agents', []):
agent_id = agent_config['agent_id']
routing_rules = agent_config.get('routing_rules', [])
if routing_rules:
virtual_service = self.service_mesh.generate_virtual_service(
agent_id, routing_rules
)
destination_rule = self.service_mesh.generate_destination_rule(
agent_id, agent_config.get('load_balancer', 'ROUND_ROBIN')
)
deployment_results['service_mesh_configs'][agent_id] = {
'virtual_service': virtual_service,
'destination_rule': destination_rule
}
# Configure global security policies
peer_auth = self.service_mesh.generate_peer_authentication()
deployment_results['service_mesh_configs']['global'] = {
'peer_authentication': peer_auth
}
Monitoring setup establishes comprehensive observability for the multi-agent system:
async def _setup_system_monitoring(self, system_config: Dict[str, Any]) -> Dict[str, Any]:
"""Setup comprehensive monitoring and observability stack"""
monitoring_setup = {
'prometheus_config': self._generate_prometheus_config(),
'grafana_dashboards': self._generate_grafana_dashboards(),
'alert_rules': self._generate_alert_rules(),
'service_monitor': self._generate_service_monitor()
}
return monitoring_setup
Prometheus configuration enables automatic discovery and scraping of agent metrics:
def _generate_prometheus_config(self) -> Dict[str, Any]:
"""Generate Prometheus configuration for automatic metrics collection"""
return {
'scrape_configs': [
{
'job_name': 'multi-agent-system',
'kubernetes_sd_configs': [
{
'role': 'pod',
'namespaces': {
'names': ['multi-agents']
}
}
],
'relabel_configs': [
{
'source_labels': ['__meta_kubernetes_pod_label_app'],
'action': 'keep',
'regex': 'multi-agent-system'
}
]
}
]
}
Grafana dashboards provide visual monitoring and operational insights:
def _generate_grafana_dashboards(self) -> List[Dict[str, Any]]:
"""Generate comprehensive Grafana dashboards for operational visibility"""
dashboard = {
'dashboard': {
'title': 'Multi-Agent System Overview',
'panels': [
self._build_health_panel(),
self._build_cpu_panel(),
self._build_memory_panel(),
self._build_communication_panel()
]
}
}
return [dashboard]
def _build_health_panel(self) -> Dict[str, Any]:
"""Build agent health status panel"""
return {
'title': 'Agent Health Status',
'type': 'stat',
'targets': [
{
'expr': 'up{job="multi-agent-system"}',
'legendFormat': '{{agent_id}}'
}
]
}
def _build_communication_panel(self) -> Dict[str, Any]:
"""Build inter-agent communication monitoring panel"""
return {
'title': 'Inter-Agent Communication',
'type': 'graph',
'targets': [
{
'expr': 'rate(agent_messages_sent_total{job="multi-agent-system"}[5m])',
'legendFormat': '{{agent_id}} sent'
},
{
'expr': 'rate(agent_messages_received_total{job="multi-agent-system"}[5m])',
'legendFormat': '{{agent_id}} received'
}
]
}
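The dashboard above also references _build_cpu_panel and _build_memory_panel, which are not shown in this module. A minimal sketch following the same pattern, with illustrative PromQL expressions based on the metric names used earlier, could be:
def _build_cpu_panel(self) -> Dict[str, Any]:
    """Build CPU utilization panel (sketch)"""
    return {
        'title': 'Agent CPU Usage',
        'type': 'graph',
        'targets': [
            {
                'expr': 'system_cpu_usage{job="multi-agent-system"}',
                'legendFormat': '{{agent_id}}'
            }
        ]
    }

def _build_memory_panel(self) -> Dict[str, Any]:
    """Build memory utilization panel (sketch)"""
    return {
        'title': 'Agent Memory Usage',
        'type': 'graph',
        'targets': [
            {
                'expr': 'system_memory_usage{job="multi-agent-system"}',
                'legendFormat': '{{agent_id}}'
            }
        ]
    }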
Alert rules provide proactive notification of system issues:
def _generate_alert_rules(self) -> List[Dict[str, Any]]:
"""Generate comprehensive alerting rules for system health"""
return [
{
'alert': 'AgentDown',
'expr': 'up{job="multi-agent-system"} == 0',
'for': '1m',
'labels': {'severity': 'critical'},
'annotations': {
'summary': 'Agent {{$labels.agent_id}} is down',
'description': 'Agent {{$labels.agent_id}} has been down for more than 1 minute'
}
},
{
'alert': 'HighCpuUsage',
'expr': 'system_cpu_usage{job="multi-agent-system"} > 85',
'for': '5m',
'labels': {'severity': 'warning'},
'annotations': {
'summary': 'High CPU usage on agent {{$labels.agent_id}}',
'description': 'Agent {{$labels.agent_id}} CPU usage is {{$value}}%'
}
}
]
ServiceMonitor enables automatic Prometheus scraping configuration:
def _generate_service_monitor(self) -> Dict[str, Any]:
"""Generate ServiceMonitor for automatic Prometheus discovery"""
return {
'apiVersion': 'monitoring.coreos.com/v1',
'kind': 'ServiceMonitor',
'metadata': {
'name': 'multi-agent-system',
'namespace': 'multi-agents'
},
'spec': {
'selector': {
'matchLabels': {
'app': 'multi-agent-system'
}
},
'endpoints': [
{
'port': 'metrics',
'interval': '30s',
'path': '/metrics'
}
]
}
}
Module Summary¶
You've now mastered production deployment patterns for multi-agent systems:
✅ Distributed Monitoring: Implemented comprehensive monitoring with metrics, alerts, and tracing
✅ Container Orchestration: Built Kubernetes deployment patterns with auto-scaling
✅ Service Mesh Integration: Created Istio configurations for secure inter-agent communication
✅ Operational Excellence: Designed monitoring dashboards and alerting systems
✅ Production Readiness: Built complete deployment orchestration with fault tolerance
Next Steps¶
- Return to Core: Session 9 Main
- Continue to Module A: Advanced Consensus Algorithms
- Next Session: Session 10 - Enterprise Integration
🗂️ Source Files for Module B:
src/session9/distributed_monitoring.py
- Enterprise monitoring and observability
src/session9/scalable_deployment.py
- Container orchestration and service mesh patterns
📝 Multiple Choice Test - Module B¶
Test your understanding of production multi-agent systems deployment and monitoring:
Question 1: Which component in the DistributedMetricsCollector is responsible for storing recently collected metrics for quick access?
A) agent_registry
B) metrics_buffer
C) alert_handlers
D) collection_tasks
Question 2: In Kubernetes deployment for multi-agent systems, what is the primary purpose of the HorizontalPodAutoscaler (HPA)?
A) To manage network routing between agents
B) To automatically scale agent replicas based on resource utilization
C) To provide service discovery for agents
D) To handle agent authentication
Question 3: What is the purpose of Istio's PeerAuthentication with STRICT mTLS mode in multi-agent systems?
A) To provide load balancing between agents
B) To enable automatic scaling of services
C) To enforce encrypted communication between all agent services
D) To monitor agent performance metrics
Question 4: In the distributed tracing system, what does a "span" represent?
A) The total time for a complete multi-agent workflow
B) A single operation or task performed by an individual agent
C) The network bandwidth used between agents
D) The error rate of agent communications
Question 5: Which Kubernetes resource type is used to expose agent pods to other services within the cluster?
A) Deployment
B) ConfigMap
C) Service
D) Secret
Question 6: What is the primary benefit of using Istio's DestinationRule with outlier detection in multi-agent systems?
A) To encrypt data between agents
B) To automatically remove unhealthy agent instances from load balancing
C) To scale agent replicas automatically
D) To provide service discovery capabilities
Question 7: In the agent health scoring algorithm, what happens when an agent has more than 85% memory usage?
A) The health score is reduced by 10 points
B) The health score is reduced by 25 points
C) The agent is automatically restarted
D) An emergency alert is triggered
🧭 Navigation¶
Previous: Session 9 Main
Optional Deep Dive Modules:
- 🔬 Module A: Advanced Consensus Algorithms
- 📡 Module B: Production Multi-Agent Systems
Next: Session 10 - Enterprise Integration & Production Deployment →