Session 6 - Module B: Enterprise Modular Systems (30 minutes)¶
Prerequisites: Session 6 Core Section Complete
Target Audience: Enterprise architects and production system builders
Cognitive Load: 3 enterprise concepts
Module Overview¶
This module explores production-scale atomic agent systems including enterprise context management, multi-tenant architectures, performance monitoring, and deployment strategies. You'll learn to build robust, scalable atomic systems that can handle enterprise workloads with comprehensive governance and observability.
Learning Objectives¶
By the end of this module, you will:
- Implement enterprise context providers for business-aware agents
- Design multi-tenant atomic agent architectures with isolation and resource management
- Build comprehensive monitoring and observability systems for production deployment
- Create deployment strategies for atomic agent systems in enterprise environments
Part 1: Enterprise Context Management (15 minutes)¶
Advanced Context Provider Systems¶
🗂️ File: src/session6/enterprise_context.py - Production context management
Enterprise atomic agents require sophisticated context awareness that goes beyond simple configuration. Let's build this system step by step, starting with the foundational components.
Step 1: Essential Imports for Enterprise Context¶
First, we establish the foundation with imports that provide strong typing, asynchronous operations, and enterprise-grade logging:
    from typing import Dict, List, Any, Optional, Protocol, runtime_checkable
    from dataclasses import dataclass, field
    from abc import ABC, abstractmethod
    from datetime import datetime, timedelta
    import asyncio
    import logging
    from enum import Enum
These imports establish our foundation for enterprise context management, including typing for strong contracts and asyncio for scalable operations.
Step 2: Hierarchical Context Scope System¶
Next, we define the hierarchical context scope system that enables granular control over information access:
    class ContextScope(Enum):
        """Scope levels for enterprise context"""
        GLOBAL = "global"          # Organization-wide context
        DIVISION = "division"      # Business division context
        DEPARTMENT = "department"  # Department-specific context
        PROJECT = "project"        # Project-level context
        USER = "user"              # User-specific context
        SESSION = "session"        # Session-specific context
This scope hierarchy allows agents to access context at appropriate organizational levels, ensuring security and relevance. Each scope level provides increasing specificity and decreasing access breadth.
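To make the precedence rule concrete, here is a minimal sketch (illustrative, not part of the module's source files) of how a setting could be resolved across scope layers, with more specific scopes overriding broader ones:

    # Assumes the ContextScope enum defined above.
    from typing import Any, Dict

    SCOPE_ORDER = [
        ContextScope.GLOBAL, ContextScope.DIVISION, ContextScope.DEPARTMENT,
        ContextScope.PROJECT, ContextScope.USER, ContextScope.SESSION,
    ]

    def resolve_setting(key: str, layers: Dict[ContextScope, Dict[str, Any]]) -> Any:
        """Return the value from the most specific scope that defines `key`."""
        value = None
        for scope in SCOPE_ORDER:  # iterate broad -> specific
            if scope in layers and key in layers[scope]:
                value = layers[scope][key]  # more specific scopes win
        return value

    # A session-level override beats the organization-wide default.
    layers = {ContextScope.GLOBAL: {"max_tokens": 1024},
              ContextScope.SESSION: {"max_tokens": 256}}
    assert resolve_setting("max_tokens", layers) == 256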
Step 3: Context Metadata Structure¶
Now we need metadata structures to track context lifecycle, ownership, and access control:
    @dataclass
    class ContextMetadata:
        """Metadata for context entries"""
        created_at: datetime
        created_by: str
        scope: ContextScope
        priority: int = 5  # 1-10, higher is more important
        expires_at: Optional[datetime] = None
        tags: List[str] = field(default_factory=list)
        access_control: Dict[str, List[str]] = field(default_factory=dict)
The metadata structure provides audit trails, expiration management, and access control, all critical for enterprise compliance. Priority levels help agents understand which context takes precedence when conflicts arise.
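As a usage sketch (the service-account name and the is_expired helper are illustrative, not part of the module), a metadata entry with a four-hour lifetime might look like this:

    from datetime import datetime, timedelta

    meta = ContextMetadata(
        created_at=datetime.now(),
        created_by="svc-context-loader",  # hypothetical service account
        scope=ContextScope.DEPARTMENT,
        priority=8,  # high-priority policy context
        expires_at=datetime.now() + timedelta(hours=4),
        tags=["policy", "finance"],
        access_control={"read": ["finance_team"], "write": ["context_admins"]},
    )

    def is_expired(m: ContextMetadata) -> bool:
        """True once the entry has passed its expiry time (if one is set)."""
        return m.expires_at is not None and datetime.now() >= m.expires_at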
Step 4: Context Provider Protocol Definition¶
Finally, we define the contract that all context providers must implement:
    @runtime_checkable
    class EnterpriseContextProvider(Protocol):
        """Protocol for enterprise context providers"""

        async def get_context(self, scope: ContextScope,
                              context_key: str,
                              user_id: Optional[str] = None) -> Dict[str, Any]:
            """Retrieve context for specified scope and key"""
            ...

        async def set_context(self, scope: ContextScope,
                              context_key: str,
                              context_data: Dict[str, Any],
                              metadata: ContextMetadata) -> bool:
            """Set context with metadata and access control"""
            ...

        async def invalidate_context(self, scope: ContextScope,
                                     context_key: str) -> bool:
            """Invalidate context entry"""
            ...
This protocol ensures all context providers implement consistent interfaces for retrieving, setting, and invalidating context data.
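Because the protocol is decorated with @runtime_checkable, isinstance() checks are structural rather than nominal: any class providing all three methods passes, with no inheritance required. A minimal in-memory stub (illustrative only) demonstrates this:

    class InMemoryProvider:
        """Toy provider used only to demonstrate the structural check."""
        def __init__(self):
            self._store = {}

        async def get_context(self, scope, context_key, user_id=None):
            return self._store.get((scope, context_key), {})

        async def set_context(self, scope, context_key, context_data, metadata):
            self._store[(scope, context_key)] = context_data
            return True

        async def invalidate_context(self, scope, context_key):
            return self._store.pop((scope, context_key), None) is not None

    assert isinstance(InMemoryProvider(), EnterpriseContextProvider)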
Step 5: Policy Context Provider Foundation¶
Now let's implement the policy context provider that manages enterprise compliance and governance rules:
    class PolicyContextProvider:
        """Enterprise policy and compliance context provider"""

        def __init__(self, policy_store_config: Dict[str, Any]):
            self.policy_store = policy_store_config
            self.policy_cache = {}
            self.cache_ttl = timedelta(minutes=30)
            self.logger = logging.getLogger(__name__)
The policy provider manages enterprise governance rules with configurable caching to balance performance against policy freshness. The 30-minute cache TTL keeps policies reasonably fresh while reducing load on the policy store.
Step 6: Intelligent Policy Retrieval with Caching¶
The main context retrieval method implements intelligent caching to optimize performance:
    async def get_context(self, scope: ContextScope,
                          context_key: str,
                          user_id: Optional[str] = None) -> Dict[str, Any]:
        """Retrieve policy context with caching and user-specific rules"""
        cache_key = f"{scope.value}:{context_key}:{user_id or 'anonymous'}"

        # Check cache first for performance optimization
        if cache_key in self.policy_cache:
            cached_entry = self.policy_cache[cache_key]
            if datetime.now() - cached_entry["timestamp"] < self.cache_ttl:
                return cached_entry["data"]
This caching strategy creates unique keys for each scope, context, and user combination, ensuring proper isolation while maximizing cache hits.
Step 7: Policy Fetching and Cache Management¶
When cache misses occur, we fetch fresh policy data and update the cache:
        # Fetch policy context from authoritative source
        policy_context = await self._fetch_policy_context(scope, context_key, user_id)

        # Cache the result with timestamp for TTL management
        self.policy_cache[cache_key] = {
            "data": policy_context,
            "timestamp": datetime.now()
        }
        return policy_context
This approach ensures agents receive policy information that is at most 30 minutes stale while maintaining performance through caching.
Step 8: Core Policy Assembly¶
The core policy fetching logic assembles comprehensive enterprise governance rules:
    async def _fetch_policy_context(self, scope: ContextScope,
                                    context_key: str,
                                    user_id: Optional[str]) -> Dict[str, Any]:
        """Fetch policy context from enterprise systems"""
        base_policies = {
            "data_handling": {
                "encryption_required": True,
                "retention_period_days": 2555,  # 7 years
                "anonymization_required": True,
                "cross_border_transfer": "restricted"
            },
            # ... dictionary continues in the next step
The data handling policies ensure compliance with privacy regulations like GDPR. The 7-year retention period aligns with typical financial compliance requirements.
Step 9: AI Governance and Security Policies¶
Next, we add AI governance and security policies that ensure responsible AI deployment:
"ai_governance": {
"model_approval_required": True,
"human_oversight_required": True,
"bias_monitoring_enabled": True,
"explainability_required": True
},
"security": {
"authentication_required": True,
"authorization_levels": ["read", "write", "admin"],
"audit_logging_required": True,
"session_timeout_minutes": 60
}
These policies establish enterprise-wide standards for AI governance and security, ensuring agents operate within approved guardrails.
Step 10: Policy Override and Assembly¶
The system then applies contextual overrides based on user roles and scope:
        # Apply user-specific overrides for role-based customization
        if user_id:
            user_policies = await self._get_user_specific_policies(user_id)
            base_policies.update(user_policies)

        # Apply scope-specific policies for contextual enforcement
        scope_policies = await self._get_scope_policies(scope, context_key)
        base_policies.update(scope_policies)

        return {
            "policies": base_policies,
            "enforcement_level": "strict",
            "last_updated": datetime.now().isoformat(),
            "policy_version": "2025.1",
            "compliance_frameworks": ["GDPR", "SOX", "HIPAA", "ISO27001"]
        }
This layered approach allows for flexible policy application while maintaining strict baseline standards.
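One caveat: dict.update() merges only at the top level, so a role override such as user_policies["ai_governance"] replaces the entire ai_governance block rather than individual keys. If key-level layering is desired, a recursive merge could be substituted; a minimal sketch:

    from typing import Any, Dict

    def deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
        """Recursively merge override into base, preserving untouched keys."""
        merged = dict(base)
        for key, value in override.items():
            if isinstance(value, dict) and isinstance(merged.get(key), dict):
                merged[key] = deep_merge(merged[key], value)
            else:
                merged[key] = value
        return merged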
Step 11: Role-Based Policy Customization¶
The policy system provides role-based customization to ensure users get appropriate permissions:
    async def _get_user_specific_policies(self, user_id: str) -> Dict[str, Any]:
        """Get user-specific policy overrides"""
        # Get user roles from identity management system
        user_roles = await self._get_user_roles(user_id)
        user_policies = {}
This method builds role-specific policy overrides that extend or modify base policies based on user responsibilities.
Step 12: Data Scientist Role Permissions¶
Data scientists receive specialized permissions for model development and experimentation:
if "data_scientist" in user_roles:
user_policies["ai_governance"] = {
"model_experimentation_allowed": True,
"data_sampling_allowed": True,
"model_performance_tracking": True
}
These permissions enable data scientists to develop and test models while maintaining governance oversight.
Step 13: Administrative Role Permissions¶
Administrators receive elevated permissions for system management:
if "admin" in user_roles:
user_policies["security"] = {
"elevated_permissions": True,
"system_configuration_access": True,
"user_management_access": True
}
return user_policies
This role-based approach ensures that different user types get appropriate policy configurations based on their organizational responsibilities.
Step 14: Enterprise Identity Integration¶
The role lookup system integrates with enterprise identity management:
    async def _get_user_roles(self, user_id: str) -> List[str]:
        """Get user roles from identity management system"""
        # Simulate role lookup from enterprise identity provider
        role_mapping = {
            "user_123": ["data_scientist", "team_lead"],
            "user_456": ["admin", "security_officer"],
            "user_789": ["business_analyst", "read_only"]
        }
        return role_mapping.get(user_id, ["user"])
In production, this would integrate with systems like Active Directory, LDAP, or modern identity providers like Okta or Auth0.
Step 15: Business Context Provider Foundation¶
Next, we implement the business context provider that gives agents awareness of market and operational conditions:
    class BusinessContextProvider:
        """Business context provider for market and operational awareness"""

        def __init__(self, business_data_sources: Dict[str, Any]):
            self.data_sources = business_data_sources
            self.context_cache = {}
            self.logger = logging.getLogger(__name__)
The business context provider connects agents to real-world business intelligence, making them aware of market conditions and operational constraints. This enables context-aware decision making.
Step 16: Business Context Routing¶
The main context retrieval method routes to specialized context types based on the requested information:
    async def get_context(self, scope: ContextScope,
                          context_key: str,
                          user_id: Optional[str] = None) -> Dict[str, Any]:
        """Retrieve business context including market conditions and operational state"""
        business_context = {}

        if context_key == "market_conditions":
            business_context = await self._get_market_context()
        elif context_key == "operational_metrics":
            business_context = await self._get_operational_context()
        elif context_key == "financial_constraints":
            business_context = await self._get_financial_context(scope)
        elif context_key == "regulatory_environment":
            business_context = await self._get_regulatory_context()

        return business_context
This routing approach allows agents to request specific types of business intelligence based on their decision-making needs.
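A brief usage sketch (assuming the class as defined in this module) shows an agent requesting market intelligence:

    import asyncio

    async def main():
        provider = BusinessContextProvider(business_data_sources={})
        market = await provider.get_context(ContextScope.GLOBAL, "market_conditions")
        print(market["market_sentiment"])  # -> "cautiously_optimistic"

    asyncio.run(main())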
Step 17: Market Intelligence Context¶
The market context method provides agents with comprehensive market intelligence:
    async def _get_market_context(self) -> Dict[str, Any]:
        """Get current market conditions and trends"""
        return {
            "market_sentiment": "cautiously_optimistic",
            "economic_indicators": {
                "gdp_growth": 2.3,
                "unemployment_rate": 3.8,
                "inflation_rate": 2.1,
                "interest_rates": 5.25
            },
            # ... dictionary continues in the next step
These economic indicators provide agents with fundamental market conditions that influence business decisions.
Step 18: Industry Trends and Competitive Analysis¶
Next, we add industry-specific trends and competitive intelligence:
"industry_trends": {
"ai_adoption_rate": "accelerating",
"automation_investment": "increasing",
"regulatory_scrutiny": "heightened",
"talent_shortage": "critical"
},
"competitive_landscape": {
"market_competition": "high",
"barrier_to_entry": "medium",
"technology_disruption_risk": "high",
"customer_switching_cost": "low"
},
"last_updated": datetime.now().isoformat(),
"data_confidence": 0.85
}
This market intelligence enables agents to make business-aware decisions based on current economic and competitive conditions.
Step 19: Operational System Health Context¶
The operational context provides real-time system health and performance data:
    async def _get_operational_context(self) -> Dict[str, Any]:
        """Get current operational metrics and system health"""
        return {
            "system_health": {
                "overall_status": "healthy",
                "cpu_utilization": 65.2,
                "memory_utilization": 78.1,
                "storage_utilization": 82.5,
                "network_latency_ms": 23.4
            },
            # ... dictionary continues in the next step
System health metrics enable agents to make resource-aware decisions and avoid overloading infrastructure.
Step 20: Performance and Capacity Metrics¶
Next, we add performance metrics and capacity planning data:
"performance_metrics": {
"requests_per_second": 1547,
"average_response_time_ms": 125,
"error_rate_percent": 0.12,
"uptime_percent": 99.97
},
"capacity_planning": {
"current_load_percent": 72,
"projected_growth_30d": 15.3,
"scaling_threshold_percent": 85,
"time_to_scale_minutes": 8
}
These metrics help agents understand current system performance and capacity constraints.
Step 21: Operational Cost Context¶
Finally, we include operational cost data for budget-aware decision making:
"operational_costs": {
"hourly_compute_cost": 47.23,
"daily_operational_cost": 1133.52,
"cost_per_request": 0.0012,
"budget_utilization_percent": 67.8
}
}
This operational context enables agents to make resource-aware decisions and respond to system constraints while considering cost implications.
Step 22: Enterprise Context Orchestrator Foundation¶
Now we implement the orchestrator that coordinates multiple context providers:
    class EnterpriseContextOrchestrator:
        """Orchestrates multiple context providers for comprehensive enterprise awareness"""

        def __init__(self):
            self.context_providers: Dict[str, EnterpriseContextProvider] = {}
            self.context_hierarchy = {}
            self.access_policies = {}
            self.logger = logging.getLogger(__name__)
The orchestrator manages multiple context providers, enabling agents to access comprehensive enterprise intelligence from a single interface.
Step 23: Context Provider Registration¶
Provider registration establishes priority and access control for different context sources:
    def register_provider(self, provider_name: str,
                          provider: EnterpriseContextProvider,
                          priority: int = 5):
        """Register a context provider with priority"""
        self.context_providers[provider_name] = provider
        self.context_hierarchy[provider_name] = {
            "priority": priority,
            "enabled": True,
            "last_accessed": None
        }
Priority levels allow the orchestrator to determine which providers to consult first and which context takes precedence during conflicts.
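For example, wiring up the two providers built earlier so that policy context outranks market data (the priority values are illustrative):

    orchestrator = EnterpriseContextOrchestrator()
    orchestrator.register_provider(
        "policy", PolicyContextProvider(policy_store_config={}), priority=9)
    orchestrator.register_provider(
        "business", BusinessContextProvider(business_data_sources={}), priority=5)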
Step 24: Comprehensive Context Aggregation¶
The core orchestration method aggregates context from multiple providers based on priority:
    async def get_comprehensive_context(self, scope: ContextScope,
                                        context_keys: List[str],
                                        user_id: Optional[str] = None) -> Dict[str, Any]:
        """Get comprehensive context from multiple providers"""
        comprehensive_context = {
            "metadata": {
                "scope": scope.value,
                "user_id": user_id,
                "timestamp": datetime.now().isoformat(),
                "providers_consulted": []
            },
            "context": {}
        }
The method starts by creating a comprehensive context structure with metadata for audit and tracking purposes.
Step 25: Provider Priority Sorting¶
Next, we sort providers by priority to ensure high-priority context sources are consulted first:
        # Sort providers by priority (highest first)
        sorted_providers = sorted(
            self.context_providers.items(),
            key=lambda x: self.context_hierarchy[x[0]]["priority"],
            reverse=True
        )
This sorting ensures that critical context providers (like security policies) are consulted before optional ones (like market data).
Step 26: Context Collection with Error Handling¶
The method iterates through providers in priority order, collecting context while handling failures gracefully:
        for context_key in context_keys:
            context_data = {}

            for provider_name, provider in sorted_providers:
                if not self.context_hierarchy[provider_name]["enabled"]:
                    continue
                try:
                    provider_context = await provider.get_context(scope, context_key, user_id)
                    if provider_context:
                        context_data[provider_name] = provider_context
                        if provider_name not in comprehensive_context["metadata"]["providers_consulted"]:
                            comprehensive_context["metadata"]["providers_consulted"].append(provider_name)

                        # Update last accessed time for monitoring
                        self.context_hierarchy[provider_name]["last_accessed"] = datetime.now()
                except Exception as e:
                    self.logger.error(f"Provider {provider_name} failed for {context_key}: {str(e)}")
                    continue

            comprehensive_context["context"][context_key] = context_data

        return comprehensive_context
This approach ensures agents receive all available context even if some providers fail, with proper error logging for troubleshooting.
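An end-to-end sketch, continuing the registration example above. Note that PolicyContextProvider depends on a _get_scope_policies helper not shown in this module; in this sketch its failure would simply be logged and skipped, which is exactly the graceful degradation described here:

    import asyncio

    async def demo():
        ctx = await orchestrator.get_comprehensive_context(
            scope=ContextScope.DEPARTMENT,
            context_keys=["market_conditions"],
            user_id="user_123",
        )
        print(ctx["metadata"]["providers_consulted"])  # providers that returned data

    asyncio.run(demo())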
Step 27: Context Integration for Agent Prompts¶
The orchestrator also provides context integration capabilities for agent prompts:
    def apply_context_to_prompt(self, base_prompt: str,
                                context: Dict[str, Any],
                                context_integration_strategy: str = "append") -> str:
        """Apply context to agent prompts using specified strategy"""
        if context_integration_strategy == "append":
            return self._append_context_strategy(base_prompt, context)
        elif context_integration_strategy == "inject":
            return self._inject_context_strategy(base_prompt, context)
        elif context_integration_strategy == "template":
            return self._template_context_strategy(base_prompt, context)
        else:
            return base_prompt
This flexible approach allows different context integration strategies based on agent requirements and prompt structure.
Step 28: Context Append Strategy¶
The append strategy provides a simple but effective way to add context to agent prompts:
    def _append_context_strategy(self, prompt: str, context: Dict[str, Any]) -> str:
        """Append context information to the end of the prompt"""
        context_section = "\n\n--- Enterprise Context ---\n"

        for context_key, provider_data in context.get("context", {}).items():
            context_section += f"\n{context_key.title()} Context:\n"
            for provider_name, data in provider_data.items():
                if isinstance(data, dict):
                    context_section += f"• {provider_name}: {self._format_context_data(data)}\n"

        return prompt + context_section
This strategy cleanly separates base instructions from enterprise context, making prompts easy to read and understand.
Step 29: Context Data Formatting¶
The formatting utility ensures context data is presented in a digestible format for language models:
    def _format_context_data(self, data: Dict[str, Any]) -> str:
        """Format context data for prompt inclusion"""
        if not data:
            return "No data available"

        formatted_items = []
        for key, value in data.items():
            if isinstance(value, dict):
                formatted_items.append(f"{key}: {len(value)} items")
            elif isinstance(value, list):
                formatted_items.append(f"{key}: {len(value)} items")
            else:
                formatted_items.append(f"{key}: {value}")

        return ", ".join(formatted_items[:3])  # Limit to 3 items to avoid prompt bloat
This formatting approach prevents context information from overwhelming the agent prompt while providing essential details.
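Continuing the earlier sketch, the pieces plug together like this (the base prompt is illustrative):

    prompt = orchestrator.apply_context_to_prompt(
        base_prompt="You are a financial planning assistant.",
        context=ctx,  # aggregated context from the previous sketch
        context_integration_strategy="append",
    )
    # The prompt now ends with an "--- Enterprise Context ---" section that
    # summarizes each provider's contribution in at most three items.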
Part 2: Multi-Tenant Architecture and Monitoring (15 minutes)¶
Production Deployment Patterns¶
🗂️ File: src/session6/enterprise_deployment.py - Production deployment systems
Enterprise multi-tenant architectures require sophisticated isolation, resource management, and monitoring. Let's build this production-grade system step by step.
Step 30: Multi-Tenant Foundation Imports¶
First, we establish the imports needed for multi-tenant operations:
    from typing import Dict, List, Any, Optional, Callable
    from dataclasses import dataclass, field
    from datetime import datetime, timedelta
    import asyncio
    import logging
    import json
    from enum import Enum
    import threading
    import queue
These imports provide the foundation for multi-tenant operations, including concurrency controls, resource monitoring, and thread-safe operations.
Step 31: Tenant Isolation Levels¶
Enterprise environments require different levels of tenant isolation based on security and compliance needs:
    class TenantIsolationLevel(Enum):
        """Levels of tenant isolation"""
        SHARED = "shared"        # Shared infrastructure
        NAMESPACE = "namespace"  # Logical separation
        DEDICATED = "dedicated"  # Dedicated resources
        PHYSICAL = "physical"    # Physical separation
These isolation levels provide flexibility from cost-effective shared resources to compliance-required physical separation.
Step 32: Tenant Configuration Schema¶
Each tenant requires comprehensive configuration management:
    @dataclass
    class TenantConfiguration:
        """Configuration for enterprise tenant"""
        tenant_id: str
        tenant_name: str
        isolation_level: TenantIsolationLevel
        resource_limits: Dict[str, Any]
        security_policies: Dict[str, Any]
        compliance_requirements: List[str]
        billing_configuration: Dict[str, Any]
        created_at: datetime = field(default_factory=datetime.now)
This configuration captures all aspects of tenant management from security policies to billing configurations and compliance requirements.
Step 33: Resource Quota Management¶
Resource quotas prevent any single tenant from monopolizing system resources:
    @dataclass
    class ResourceQuota:
        """Resource quotas for tenant isolation"""
        max_concurrent_agents: int = 10
        max_memory_mb: int = 2048
        max_cpu_cores: float = 2.0
        max_storage_gb: int = 10
        max_requests_per_minute: int = 1000
        max_execution_time_seconds: int = 300
These quotas ensure fair resource allocation and prevent resource exhaustion across the multi-tenant system.
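Putting the two dataclasses together, a tenant definition might look like the following (all limit values and policy entries are illustrative):

    quota = ResourceQuota(max_concurrent_agents=25, max_memory_mb=8192,
                          max_cpu_cores=4.0, max_requests_per_minute=2000)

    acme = TenantConfiguration(
        tenant_id="acme-corp",
        tenant_name="ACME Corporation",
        isolation_level=TenantIsolationLevel.DEDICATED,
        resource_limits={"quota": quota},  # read by register_tenant below
        security_policies={"mfa_required": True},
        compliance_requirements=["SOX", "GDPR"],
        billing_configuration={"plan": "enterprise", "currency": "USD"},
    )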
Step 34: Multi-Tenant Orchestrator Foundation¶
The orchestrator manages all aspects of multi-tenant agent execution:
    class MultiTenantAtomicOrchestrator:
        """Enterprise multi-tenant orchestrator for atomic agents"""

        def __init__(self):
            self.tenants: Dict[str, TenantConfiguration] = {}
            self.tenant_resources: Dict[str, Dict[str, Any]] = {}
            self.resource_monitors: Dict[str, 'ResourceMonitor'] = {}
            self.global_metrics = {}
            self.logger = logging.getLogger(__name__)
The orchestrator maintains separate tracking for each tenant's configuration, resources, and monitoring systems, ensuring complete isolation.
Step 35: Tenant Registration Setup¶
Tenant registration sets up comprehensive isolation and monitoring:
    def register_tenant(self, tenant_config: TenantConfiguration):
        """Register a new tenant with isolated resources"""
        tenant_id = tenant_config.tenant_id
        self.tenants[tenant_id] = tenant_config

        # Initialize isolated tenant resource tracking
        self.tenant_resources[tenant_id] = {
            "active_agents": {},
            "resource_usage": {
                "memory_mb": 0,
                "cpu_usage": 0.0,
                "storage_gb": 0,
                "requests_count": 0,
                "last_reset": datetime.now()
            },
            # ... dictionary continues in the next step
This initial setup creates isolated resource tracking for each tenant to prevent cross-tenant interference.
Step 36: Performance Metrics and Monitoring Setup¶
Next, we initialize performance tracking and dedicated monitoring:
"performance_metrics": {
"total_requests": 0,
"successful_requests": 0,
"failed_requests": 0,
"average_response_time": 0.0,
"last_24h_usage": []
}
}
# Set up dedicated resource monitor for this tenant
self.resource_monitors[tenant_id] = ResourceMonitor(
tenant_id,
tenant_config.resource_limits.get("quota", ResourceQuota())
)
self.logger.info(f"Registered tenant: {tenant_id} with isolation: {tenant_config.isolation_level.value}")
This registration process ensures each tenant gets isolated resource tracking and dedicated monitoring for comprehensive observability.
Step 37: Tenant-Isolated Agent Execution¶
The main execution method enforces tenant isolation and resource limits:
    async def execute_agent_for_tenant(self, tenant_id: str,
                                       agent_id: str,
                                       input_data: Any,
                                       execution_context: Dict[str, Any] = None) -> Dict[str, Any]:
        """Execute atomic agent with tenant isolation and resource management"""
        if tenant_id not in self.tenants:
            raise ValueError(f"Tenant {tenant_id} not registered")

        tenant_config = self.tenants[tenant_id]
        resource_monitor = self.resource_monitors[tenant_id]
This method starts by validating tenant registration and retrieving the appropriate configuration and monitoring systems.
Step 38: Pre-Execution Resource Validation¶
Before executing any agent, we perform comprehensive resource availability checks:
        # Pre-execution resource validation
        resource_check = await resource_monitor.check_resource_availability(execution_context or {})
        if not resource_check["available"]:
            return {
                "status": "rejected",
                "reason": "Resource limits exceeded",
                "limits_exceeded": resource_check["limits_exceeded"],
                "current_usage": resource_check["current_usage"]
            }
This pre-execution check prevents resource violations before any work begins, protecting system stability and tenant isolation.
Step 39: Resource Reservation and Context Preparation¶
The execution process includes comprehensive resource tracking:
        execution_start = datetime.now()
        try:
            # Reserve resources for this execution
            resource_reservation = await resource_monitor.reserve_resources(execution_context or {})

            # Prepare tenant-specific context
            tenant_context = await self._prepare_tenant_context(tenant_id, execution_context or {})
This preparation phase reserves necessary resources and builds tenant-specific context before execution begins.
Step 40: Isolated Execution and Success Tracking¶
Next, we execute the agent with full isolation and track successful completions:
            # Execute with full tenant isolation
            result = await self._execute_with_tenant_isolation(
                tenant_id, agent_id, input_data, tenant_context
            )

            # Record successful execution metrics
            execution_time = (datetime.now() - execution_start).total_seconds()
            await self._record_execution_metrics(
                tenant_id, "success", execution_time, resource_reservation
            )

            return {
                "status": "success",
                "result": result,
                "execution_time": execution_time,
                "resource_usage": resource_reservation,
                "tenant_id": tenant_id
            }
This approach ensures every execution is tracked and attributed to the correct tenant with comprehensive metrics.
Step 41: Error Handling and Metrics Recording¶
Proper error handling ensures resources are always released:
        except Exception as e:
            execution_time = (datetime.now() - execution_start).total_seconds()
            await self._record_execution_metrics(
                tenant_id, "error", execution_time, {}
            )
            self.logger.error(f"Agent execution failed for tenant {tenant_id}: {str(e)}")
            return {
                "status": "error",
                "error": str(e),
                "execution_time": execution_time,
                "tenant_id": tenant_id
            }
Error handling includes metrics recording to track failure patterns and help with system optimization.
Step 42: Resource Cleanup Guarantee¶
The finally block ensures resources are always released:
        finally:
            # Always release reserved resources
            if 'resource_reservation' in locals():
                await resource_monitor.release_resources(resource_reservation)
This guarantee prevents resource leaks even when errors occur during execution.
Step 43: Tenant Context Preparation¶
Tenant context preparation ensures proper isolation and compliance:
    async def _prepare_tenant_context(self, tenant_id: str,
                                      execution_context: Dict[str, Any]) -> Dict[str, Any]:
        """Prepare tenant-specific context for agent execution"""
        tenant_config = self.tenants[tenant_id]

        # Build comprehensive tenant context
        tenant_context = {
            "tenant_id": tenant_id,
            "tenant_name": tenant_config.tenant_name,
            "isolation_level": tenant_config.isolation_level.value,
            "security_policies": tenant_config.security_policies,
            "compliance_requirements": tenant_config.compliance_requirements,
            # ... dictionary continues in the next step
This initial context provides agents with tenant identification and core policy information.
Step 44: Execution Environment Context¶
Next, we add execution environment details and merge with provided context:
"execution_environment": {
"resource_limits": tenant_config.resource_limits,
"execution_timestamp": datetime.now().isoformat(),
"execution_id": f"{tenant_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
}
}
# Merge with provided execution context
tenant_context.update(execution_context)
return tenant_context
This context provides agents with tenant-specific policies, limits, and compliance requirements, ensuring they operate within appropriate boundaries.
Step 45: Resource Monitor Foundation¶
The resource monitor enforces tenant quotas and tracks usage:
    class ResourceMonitor:
        """Monitors and enforces resource usage for tenant isolation"""

        def __init__(self, tenant_id: str, resource_quota: ResourceQuota):
            self.tenant_id = tenant_id
            self.resource_quota = resource_quota
            self.current_usage = {
                "concurrent_agents": 0,
                "memory_mb": 0,
                "cpu_cores": 0.0,
                "storage_gb": 0,
                "requests_in_window": 0,
                "window_start": datetime.now()
            }
            self.usage_history = []
            self.lock = threading.Lock()
The monitor maintains real-time usage tracking with thread-safe operations for concurrent access, ensuring accurate resource accounting.
Step 46: Rate Limiting Window Management¶
The monitor performs comprehensive resource availability checks:
    async def check_resource_availability(self, execution_context: Dict[str, Any]) -> Dict[str, Any]:
        """Check if resources are available for new execution"""
        with self.lock:
            # Reset request rate window if needed
            if datetime.now() - self.current_usage["window_start"] > timedelta(minutes=1):
                self.current_usage["requests_in_window"] = 0
                self.current_usage["window_start"] = datetime.now()

            estimated_requirements = self._estimate_resource_requirements(execution_context)
            limits_exceeded = []
The monitor enforces a fixed one-minute rate-limiting window: once a minute elapses, the request counter resets and a new window begins.
Step 47: Systematic Resource Quota Checking¶
Next, we check each resource quota systematically:
            # Check each resource quota systematically
            if (self.current_usage["concurrent_agents"] + 1) > self.resource_quota.max_concurrent_agents:
                limits_exceeded.append("concurrent_agents")

            if (self.current_usage["memory_mb"] + estimated_requirements["memory_mb"]) > self.resource_quota.max_memory_mb:
                limits_exceeded.append("memory")

            if (self.current_usage["cpu_cores"] + estimated_requirements["cpu_cores"]) > self.resource_quota.max_cpu_cores:
                limits_exceeded.append("cpu")

            if (self.current_usage["requests_in_window"] + 1) > self.resource_quota.max_requests_per_minute:
                limits_exceeded.append("request_rate")
This systematic checking ensures all resource types are validated before allowing execution.
Step 48: Resource Availability Response¶
Finally, we return comprehensive availability information:
            return {
                "available": len(limits_exceeded) == 0,
                "limits_exceeded": limits_exceeded,
                "current_usage": self.current_usage.copy(),
                "estimated_requirements": estimated_requirements
            }
This comprehensive check ensures no resource quotas are violated before execution begins, with detailed feedback for troubleshooting.
Step 49: Atomic Resource Reservation¶
The monitor provides atomic resource reservation operations:
    async def reserve_resources(self, execution_context: Dict[str, Any]) -> Dict[str, Any]:
        """Reserve resources for execution"""
        estimated_requirements = self._estimate_resource_requirements(execution_context)

        with self.lock:
            # Atomically update current usage
            self.current_usage["concurrent_agents"] += 1
            self.current_usage["memory_mb"] += estimated_requirements["memory_mb"]
            self.current_usage["cpu_cores"] += estimated_requirements["cpu_cores"]
            self.current_usage["requests_in_window"] += 1
The atomic update ensures all resource counters are updated consistently within the lock.
Step 50: Reservation Tracking¶
Next, we create a reservation record for audit and cleanup purposes:
            reservation = {
                "reservation_id": f"res_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
                "memory_mb": estimated_requirements["memory_mb"],
                "cpu_cores": estimated_requirements["cpu_cores"],
                "reserved_at": datetime.now()
            }
            return reservation
Resource reservation happens atomically under the lock, and each reservation is tracked with a timestamped ID for audit purposes and proper cleanup.
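Note that a second-resolution timestamp can collide when two reservations are created in the same second; a UUID-based identifier (an alternative sketch, not the module's implementation) would guarantee uniqueness:

    import uuid

    reservation_id = f"res_{uuid.uuid4().hex}"  # collision-free even under concurrency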
Step 51: Safe Resource Release¶
Resource release includes safety checks to prevent negative usage:
    async def release_resources(self, reservation: Dict[str, Any]):
        """Release reserved resources"""
        with self.lock:
            self.current_usage["concurrent_agents"] -= 1
            self.current_usage["memory_mb"] -= reservation["memory_mb"]
            self.current_usage["cpu_cores"] -= reservation["cpu_cores"]

            # Safety checks to prevent negative usage
            self.current_usage["memory_mb"] = max(0, self.current_usage["memory_mb"])
            self.current_usage["cpu_cores"] = max(0.0, self.current_usage["cpu_cores"])
            self.current_usage["concurrent_agents"] = max(0, self.current_usage["concurrent_agents"])
These safety checks protect against accounting errors that could cause negative resource usage, maintaining data integrity.
Step 52: Base Resource Estimation¶
The monitor estimates resource requirements based on execution context:
    def _estimate_resource_requirements(self, execution_context: Dict[str, Any]) -> Dict[str, Any]:
        """Estimate resource requirements based on execution context"""
        # Base resource requirements for typical executions
        base_requirements = {
            "memory_mb": 256,
            "cpu_cores": 0.5,
            "estimated_duration": 30  # seconds
        }

        # Extract context hints for resource scaling
        complexity = execution_context.get("complexity", "medium")
        model_size = execution_context.get("model_size", "standard")
The base requirements provide conservative estimates that work for most typical agent executions.
Step 53: Dynamic Resource Scaling¶
Next, we adjust requirements based on execution context hints:
        # Scale up for high complexity tasks
        if complexity == "high":
            base_requirements["memory_mb"] *= 2
            base_requirements["cpu_cores"] *= 1.5
            base_requirements["estimated_duration"] *= 2

        # Scale up for large models
        if model_size == "large":
            base_requirements["memory_mb"] *= 1.5
            base_requirements["cpu_cores"] *= 1.2

        return base_requirements
This estimation helps prevent resource exhaustion by accurately predicting execution requirements based on workload characteristics.
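For instance, a high-complexity job on a large model scales the baseline as follows (values follow from the multipliers above; the tenant ID is illustrative):

    monitor = ResourceMonitor("acme-corp", ResourceQuota())
    reqs = monitor._estimate_resource_requirements(
        {"complexity": "high", "model_size": "large"})
    # memory_mb:          256 * 2 * 1.5 = 768.0
    # cpu_cores:          0.5 * 1.5 * 1.2 ≈ 0.9
    # estimated_duration: 30 * 2 = 60 seconds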
Step 54: Enterprise Monitoring System Foundation¶
The monitoring system provides comprehensive observability for multi-tenant operations:
    class EnterpriseMonitoringSystem:
        """Comprehensive monitoring for enterprise atomic agent systems"""

        def __init__(self):
            self.metrics_store = {}
            self.alert_thresholds = {}
            self.alert_queue = queue.Queue()
            self.monitoring_enabled = True
            self.logger = logging.getLogger(__name__)
The monitoring system maintains metrics storage, alerting capabilities, and configurable thresholds for comprehensive system observability.
Step 55: Monitoring Configuration¶
Configuration allows customization of monitoring behavior and alert thresholds:
    def configure_monitoring(self, monitoring_config: Dict[str, Any]):
        """Configure enterprise monitoring parameters"""
        self.alert_thresholds = {
            "error_rate_percent": monitoring_config.get("error_rate_threshold", 5.0),
            "response_time_ms": monitoring_config.get("response_time_threshold", 1000),
            "resource_utilization_percent": monitoring_config.get("resource_threshold", 80.0),
            "tenant_quota_utilization_percent": monitoring_config.get("quota_threshold", 90.0)
        }
        self.monitoring_enabled = monitoring_config.get("enabled", True)
These configurable thresholds allow different alerting strategies for different enterprise environments and SLA requirements.
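For example, a latency-sensitive environment might tighten the defaults (the threshold values here are illustrative):

    monitoring = EnterpriseMonitoringSystem()
    monitoring.configure_monitoring({
        "error_rate_threshold": 2.0,     # alert above a 2% error rate
        "response_time_threshold": 500,  # alert above 500 ms average latency
        "resource_threshold": 75.0,
        "quota_threshold": 85.0,
        "enabled": True,
    })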
Step 56: Execution Metrics Recording Setup¶
The monitoring system records detailed execution metrics for analysis and alerting:
    async def record_execution_metrics(self, tenant_id: str,
                                       agent_id: str,
                                       execution_result: Dict[str, Any]):
        """Record execution metrics for monitoring and alerting"""
        if not self.monitoring_enabled:
            return

        timestamp = datetime.now()

        # Create comprehensive metrics record
        metrics_record = {
            "tenant_id": tenant_id,
            "agent_id": agent_id,
            "timestamp": timestamp.isoformat(),
            "execution_time": execution_result.get("execution_time", 0),
            "status": execution_result.get("status", "unknown"),
            "resource_usage": execution_result.get("resource_usage", {}),
            "error": execution_result.get("error")
        }
The metrics record captures all essential execution information for comprehensive analysis.
Step 57: Metrics Storage and Alert Processing¶
Next, we store metrics and trigger alert processing:
        # Store metrics with proper indexing
        metrics_key = f"{tenant_id}:{agent_id}"
        if metrics_key not in self.metrics_store:
            self.metrics_store[metrics_key] = []
        self.metrics_store[metrics_key].append(metrics_record)

        # Trigger alert checking
        await self._check_alert_conditions(tenant_id, agent_id, metrics_record)

        # Maintain rolling 24-hour window
        cutoff_time = timestamp - timedelta(hours=24)
        self.metrics_store[metrics_key] = [
            m for m in self.metrics_store[metrics_key]
            if datetime.fromisoformat(m["timestamp"]) > cutoff_time
        ]
This recording system maintains a rolling window of metrics while triggering real-time alerting and cleanup.
Step 58: Alert Condition Analysis Setup¶
The monitoring system analyzes metrics to trigger appropriate alerts:
    async def _check_alert_conditions(self, tenant_id: str,
                                      agent_id: str,
                                      current_metrics: Dict[str, Any]):
        """Check if current metrics trigger any alerts"""
        metrics_key = f"{tenant_id}:{agent_id}"
        recent_metrics = self.metrics_store.get(metrics_key, [])

        if len(recent_metrics) < 5:  # Need sufficient data points for analysis
            return

        # Analyze recent execution patterns
        recent_executions = recent_metrics[-10:]  # Last 10 executions
        error_count = len([m for m in recent_executions if m["status"] == "error"])
        error_rate = (error_count / len(recent_executions)) * 100
The alert system requires sufficient data points to make reliable assessments and avoid false positives.
Step 59: Error Rate Alert Processing¶
Next, we check error rate thresholds and generate alerts when necessary:
        if error_rate > self.alert_thresholds["error_rate_percent"]:
            self.alert_queue.put({
                "type": "error_rate_high",
                "tenant_id": tenant_id,
                "agent_id": agent_id,
                "error_rate": error_rate,
                "threshold": self.alert_thresholds["error_rate_percent"],
                "timestamp": datetime.now().isoformat()
            })
Error rate alerts help identify problematic agents or tenants before they affect system stability.
Step 60: Response Time Alert Processing¶
Finally, we monitor response time performance:
        # Monitor response time performance
        avg_response_time = sum(m["execution_time"] for m in recent_executions) / len(recent_executions)
        avg_response_time_ms = avg_response_time * 1000

        if avg_response_time_ms > self.alert_thresholds["response_time_ms"]:
            self.alert_queue.put({
                "type": "response_time_high",
                "tenant_id": tenant_id,
                "agent_id": agent_id,
                "avg_response_time_ms": avg_response_time_ms,
                "threshold": self.alert_thresholds["response_time_ms"],
                "timestamp": datetime.now().isoformat()
            })
This intelligent alerting system identifies performance degradation and error rate spikes before they impact business operations.
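Alerts accumulate in the queue until a consumer drains them. A minimal consumer sketch (in production this would forward to a paging or chat-ops channel rather than a logger):

    import logging
    import queue

    def drain_alerts(monitoring: EnterpriseMonitoringSystem):
        """Pull all pending alerts off the queue and log them."""
        log = logging.getLogger("alerts")
        while True:
            try:
                alert = monitoring.alert_queue.get_nowait()
            except queue.Empty:
                break
            log.warning("ALERT %s tenant=%s agent=%s",
                        alert["type"], alert["tenant_id"], alert["agent_id"])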
Module Summary¶
You've now mastered enterprise-scale atomic agent systems:
✅ Enterprise Context Management: Implemented sophisticated context providers for policy, business, and operational awareness
✅ Multi-Tenant Architecture: Built tenant isolation with resource quotas and security boundaries
✅ Production Monitoring: Created comprehensive observability and alerting systems
✅ Deployment Strategies: Designed scalable deployment patterns for enterprise environments
📝 Multiple Choice Test - Module B¶
Test your understanding of enterprise modular systems and multi-tenant architecture:
Question 1: What types of context does the EnterpriseContextProvider manage for operational awareness?
A) User preferences only
B) Business policies, compliance requirements, and operational constraints
C) Database connections only
D) UI configurations

Question 2: How does the multi-tenant agent deployment system ensure tenant isolation?
A) Shared resources for all tenants
B) Separate agent instances with resource quotas and security boundaries
C) Single agent serving all tenants
D) Manual tenant switching

Question 3: What metrics does the enterprise monitoring system track for performance analysis?
A) CPU usage only
B) Response times, success rates, error patterns, and resource utilization
C) Memory consumption only
D) Network bandwidth

Question 4: How does the AlertManager handle performance threshold violations?
A) Silent logging only
B) Configurable alerting with severity levels and notification channels
C) Email notifications only
D) No alerting mechanism

Question 5: What deployment strategies does the enterprise system support for different environments?
A) Single deployment mode
B) Blue-green deployments, canary releases, and rolling updates with health checks
C) Manual deployment only
D) Development environment only
Next Steps¶
- Return to Core: Session 6 Main
- Advance to Session 7: First ADK Agent
- Compare with Module A: Advanced Composition Patterns
🗂️ Source Files for Module B:
- src/session6/enterprise_context.py - Production context management
- src/session6/enterprise_deployment.py - Multi-tenant deployment systems
- src/session6/monitoring_systems.py - Comprehensive observability