# ⚙️ Session 8 Module B: Enterprise Architecture & Security

⚙️ IMPLEMENTER PATH CONTENT

- Prerequisites: Complete 🎯 Observer and 📝 Participant paths in Session 8
- Time Investment: 2-3 hours
- Outcome: Master enterprise security patterns, authentication systems, and advanced deployment architectures
## Advanced Learning Outcomes
After completing this module, you will master:
- Enterprise security patterns for production data processing agents
- Advanced authentication and authorization systems
- JWT token management and API security best practices
- Kubernetes deployment patterns for agent systems
## Security Essentials - Your Data Pipeline's Digital Fortress

The average data breach now costs businesses $4.45 million globally, and a new ransomware attack hits a data system roughly every 11 seconds. Your data processing agent isn't just analyzing patterns - it's guarding sensitive data against an army of malicious actors.
Security isn't a feature you add to data systems later; it's the foundation everything else stands on when handling enterprise data:
```python
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
import logging
import os

app = FastAPI()
security = HTTPBearer()
```
Implement JWT token verification for secure data processing API access:
```python
def verify_data_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Verify JWT token for data processing API access."""
    try:
        payload = jwt.decode(
            credentials.credentials,
            os.getenv("JWT_SECRET_KEY"),  # never hardcode the signing secret; see Secure Configuration Management below
            algorithms=["HS256"]
        )
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Data access token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid data access token")
```
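For completeness, here is a minimal sketch of issuing a token the verifier above will accept; the `issue_data_token` helper and the `JWT_SECRET_KEY` variable name are illustrative:

```python
from datetime import datetime, timedelta, timezone

def issue_data_token(user_id: str) -> str:
    """Issue a short-lived HS256 token for data processing access (illustrative)."""
    payload = {
        "user_id": user_id,
        # The exp claim is what triggers ExpiredSignatureError in the verifier
        "exp": datetime.now(timezone.utc) + timedelta(hours=1),
    }
    return jwt.encode(payload, os.getenv("JWT_SECRET_KEY"), algorithm="HS256")
```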
Create secure endpoints with authentication and logging for data processing:
@app.post("/secure-data-process")
async def secure_data_process(
request: DataQueryRequest,
user_info = Depends(verify_data_token)
):
# Log data access for audit trails
logging.info(f"User {user_info.get('user_id')} processed data query")
# Rate limiting for data processing (simplified)
# In production, use Redis-based rate limiting for data workloads
response = await data_production_agent.arun(request.data_query)
return {"analysis_result": response.content, "user": user_info["user_id"]}
## Advanced Authentication Patterns

### OAuth 2.0 Integration for Enterprise Systems
Enterprise data processing systems often need to integrate with existing OAuth 2.0 providers for user authentication:
```python
import httpx

class EnterpriseOAuthHandler:
    def __init__(self, client_id: str, client_secret: str, authorization_url: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.authorization_url = authorization_url
```
Implement token validation and user info retrieval:
```python
async def validate_access_token(self, access_token: str):
    """Validate OAuth 2.0 access token with the authorization server."""
    headers = {"Authorization": f"Bearer {access_token}"}

    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"{self.authorization_url}/userinfo",
            headers=headers,
            timeout=10.0
        )

    if response.status_code == 200:
        user_info = response.json()
        return {
            "valid": True,
            "user_id": user_info.get("sub"),
            "email": user_info.get("email"),
            "roles": user_info.get("roles", [])
        }
    else:
        return {"valid": False, "error": "Invalid token"}
```
Role-based access control for data processing operations:
```python
from fastapi import Header
import os

def check_data_processing_permission(self, user_roles: list, required_role: str = "data_analyst"):
    """Check if user has permission for data processing operations."""
    return required_role in user_roles or "admin" in user_roles

# Usage in FastAPI endpoints
async def verify_oauth_token(authorization: str = Header(...)):
    oauth_handler = EnterpriseOAuthHandler(
        client_id=os.getenv("OAUTH_CLIENT_ID"),
        client_secret=os.getenv("OAUTH_CLIENT_SECRET"),
        authorization_url=os.getenv("OAUTH_AUTH_URL")
    )

    # Extract the bearer token from the Authorization header
    token = authorization.replace("Bearer ", "")
    user_info = await oauth_handler.validate_access_token(token)

    if not user_info["valid"]:
        raise HTTPException(status_code=401, detail="Invalid OAuth token")

    return user_info
```
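Putting the pieces together, a protected endpoint stacks the OAuth dependency with the role check. A sketch, with the `/oauth-data-process` route name being illustrative:

```python
@app.post("/oauth-data-process")
async def oauth_data_process(
    request: DataQueryRequest,
    user_info = Depends(verify_oauth_token)
):
    # Enforce role-based access before any data is touched
    handler = EnterpriseOAuthHandler(
        client_id=os.getenv("OAUTH_CLIENT_ID"),
        client_secret=os.getenv("OAUTH_CLIENT_SECRET"),
        authorization_url=os.getenv("OAUTH_AUTH_URL")
    )
    if not handler.check_data_processing_permission(user_info["roles"]):
        raise HTTPException(status_code=403, detail="Insufficient role for data processing")

    response = await data_production_agent.arun(request.data_query)
    return {"analysis_result": response.content, "user": user_info["user_id"]}
```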
### API Key Management and Rate Limiting
For service-to-service communication, API keys provide a simpler authentication method:
```python
import redis.asyncio as redis

class APIKeyManager:
    def __init__(self):
        self.redis_client = redis.from_url("redis://localhost:6379")
        self.rate_limit_window = 3600   # 1 hour, in seconds
        self.default_rate_limit = 1000  # requests per hour
```
Implement API key validation with rate limiting:
```python
async def validate_api_key(self, api_key: str):
    """Validate API key and check rate limits."""
    # Check if API key exists (in production, use a database)
    valid_keys = {
        "key_123": {"service": "analytics_service", "rate_limit": 5000},
        "key_456": {"service": "reporting_service", "rate_limit": 1000},
    }

    if api_key not in valid_keys:
        return {"valid": False, "error": "Invalid API key"}

    key_info = valid_keys[api_key]

    # Check rate limit
    rate_limit_key = f"rate_limit:{api_key}"
    current_count = await self.redis_client.get(rate_limit_key)

    if current_count is None:
        # First request in this window
        await self.redis_client.setex(
            rate_limit_key,
            self.rate_limit_window,
            1
        )
        remaining_requests = key_info["rate_limit"] - 1
    else:
        current_count = int(current_count)
        if current_count >= key_info["rate_limit"]:
            return {
                "valid": False,
                "error": "Rate limit exceeded",
                "reset_time": self.rate_limit_window
            }

        # Increment counter
        await self.redis_client.incr(rate_limit_key)
        remaining_requests = key_info["rate_limit"] - current_count - 1

    return {
        "valid": True,
        "service": key_info["service"],
        "remaining_requests": remaining_requests
    }
```
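Note that the read-then-write sequence above can undercount bursts arriving concurrently. A sketch of an atomic variant built on Redis INCR, which avoids the race (the method name `check_rate_limit_atomic` is illustrative):

```python
async def check_rate_limit_atomic(self, api_key: str, limit: int) -> bool:
    """Count this request atomically; return True if still within the limit."""
    rate_limit_key = f"rate_limit:{api_key}"
    count = await self.redis_client.incr(rate_limit_key)
    if count == 1:
        # First request of the window: start the expiry clock
        await self.redis_client.expire(rate_limit_key, self.rate_limit_window)
    return count <= limit
```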
FastAPI dependency for API key authentication:
```python
async def verify_api_key(x_api_key: str = Header(..., alias="X-API-Key")):
    """FastAPI dependency for API key validation."""
    api_manager = APIKeyManager()
    result = await api_manager.validate_api_key(x_api_key)

    if not result["valid"]:
        raise HTTPException(
            status_code=401 if "Invalid" in result["error"] else 429,
            detail=result["error"],
            headers={"X-RateLimit-Remaining": "0"} if "Rate limit" in result["error"] else {}
        )

    return result

# Usage in endpoints
@app.post("/api/data-process")
async def api_data_process(
    request: DataQueryRequest,
    api_info = Depends(verify_api_key)
):
    # Log service access
    logging.info(f"Service {api_info['service']} processed data query")

    response = await data_production_agent.arun(request.data_query)
    return {
        "analysis_result": response.content,
        "service": api_info["service"],
        "remaining_requests": api_info["remaining_requests"]
    }
```
## Data Security and Encryption

### Encryption at Rest and in Transit
Production data processing systems must encrypt sensitive data both at rest and in transit:
```python
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os

class DataEncryptionManager:
    def __init__(self, password: str = None):
        self.password = password or os.getenv("ENCRYPTION_PASSWORD")
        self.key = self._derive_key(self.password)
        self.cipher_suite = Fernet(self.key)
```
Implement encryption and decryption methods:
```python
def _derive_key(self, password: str):
    """Derive encryption key from password."""
    # The fallback salt is for local development only - production should
    # require a unique, randomly generated salt from secure configuration.
    salt = os.getenv("ENCRYPTION_SALT", "default_salt").encode()
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=100000,
    )
    key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
    return key

def encrypt_sensitive_data(self, data: str) -> str:
    """Encrypt sensitive data for storage."""
    encrypted_data = self.cipher_suite.encrypt(data.encode())
    return base64.urlsafe_b64encode(encrypted_data).decode()

def decrypt_sensitive_data(self, encrypted_data: str) -> str:
    """Decrypt sensitive data from storage."""
    encrypted_bytes = base64.urlsafe_b64decode(encrypted_data.encode())
    decrypted_data = self.cipher_suite.decrypt(encrypted_bytes)
    return decrypted_data.decode()
```
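A quick round trip shows the manager in action, assuming `ENCRYPTION_PASSWORD` and `ENCRYPTION_SALT` are set in the environment:

```python
manager = DataEncryptionManager()

token = manager.encrypt_sensitive_data("customer_id=12345")
print(token)                                  # opaque ciphertext, safe to store
print(manager.decrypt_sensitive_data(token))  # back to "customer_id=12345"
```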
Integration with data processing agent for secure operations:
```python
class SecureDataAgent:
    def __init__(self, agent: Agent):
        self.agent = agent
        self.encryption_manager = DataEncryptionManager()

    async def process_sensitive_query(self, query: str, session_id: str):
        """Process query with sensitive data encryption."""
        # Simplified illustration: in practice, redact or tokenize sensitive
        # fields according to your data sensitivity rules rather than passing
        # raw ciphertext to the model.
        if "personal" in query.lower() or "private" in query.lower():
            encrypted_query = self.encryption_manager.encrypt_sensitive_data(query)
            logging.info(f"Processing encrypted query for session {session_id}")

            # Process with agent
            response = await self.agent.arun(encrypted_query, session_id=session_id)

            # Decrypt response if needed
            if response.content.startswith("encrypted:"):
                decrypted_response = self.encryption_manager.decrypt_sensitive_data(
                    response.content.replace("encrypted:", "")
                )
                return decrypted_response
            return response.content
        else:
            # Process normally for non-sensitive queries
            response = await self.agent.arun(query, session_id=session_id)
            return response.content
```
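Wrapping the existing agent is then a one-liner. A usage sketch, with `data_production_agent` coming from the earlier sessions:

```python
secure_agent = SecureDataAgent(data_production_agent)

result = await secure_agent.process_sensitive_query(
    "Summarize personal spending patterns for account 4711",
    session_id="session-42"
)
```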
### Secure Configuration Management
Production systems need secure configuration management to protect secrets and credentials:
```python
from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential
import os

class SecureConfigManager:
    def __init__(self):
        # Support multiple secret backends
        self.backend = os.getenv("SECRET_BACKEND", "environment")

        if self.backend == "azure_keyvault":
            credential = DefaultAzureCredential()
            vault_url = os.getenv("AZURE_KEYVAULT_URL")
            self.kv_client = SecretClient(vault_url=vault_url, credential=credential)
```
Implement secret retrieval with fallback mechanisms:
```python
async def get_secret(self, secret_name: str, default_value: str = None):
    """Retrieve secret from configured backend."""
    try:
        if self.backend == "azure_keyvault":
            secret = self.kv_client.get_secret(secret_name)
            return secret.value
        elif self.backend == "environment":
            return os.getenv(secret_name, default_value)
        elif self.backend == "file":
            # For development/testing - not recommended for production
            secret_file = f"/etc/secrets/{secret_name}"
            if os.path.exists(secret_file):
                with open(secret_file, 'r') as f:
                    return f.read().strip()
            return default_value
        else:
            raise ValueError(f"Unsupported secret backend: {self.backend}")
    except Exception as e:
        logging.error(f"Failed to retrieve secret {secret_name}: {e}")
        return default_value

async def get_database_config(self):
    """Get database configuration from secure storage."""
    return {
        "host": await self.get_secret("DATABASE_HOST", "localhost"),
        "port": int(await self.get_secret("DATABASE_PORT", "5432")),
        "username": await self.get_secret("DATABASE_USERNAME"),
        "password": await self.get_secret("DATABASE_PASSWORD"),
        "database": await self.get_secret("DATABASE_NAME", "agno_data")
    }
```
Usage in production agent initialization:
```python
async def create_secure_production_agent():
    """Create production agent with secure configuration."""
    config_manager = SecureConfigManager()

    # Get secure database configuration
    db_config = await config_manager.get_database_config()
    database_url = (
        f"postgresql://{db_config['username']}:{db_config['password']}"
        f"@{db_config['host']}:{db_config['port']}/{db_config['database']}"
    )

    # Get API keys securely
    openai_api_key = await config_manager.get_secret("OPENAI_API_KEY")

    # Create agent with secure configuration
    agent = Agent(
        name="SecureDataProcessingAgent",
        model="gpt-4",
        api_key=openai_api_key,
        storage=PostgresStorage(database_url),
        monitoring=True
    )

    return agent
```
## Kubernetes Deployment Patterns

### Basic Kubernetes Deployment for Agno Agents
Enterprise production systems often run on Kubernetes for scalability and reliability:
```yaml
# deployment.yaml for Agno data processing agents
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agno-data-agent
  labels:
    app: agno-data-agent
    tier: backend
spec:
  replicas: 3
  selector:
    matchLabels:
      app: agno-data-agent
  template:
    metadata:
      labels:
        app: agno-data-agent
    spec:
      containers:
      - name: agno-agent
        image: agno-data-processing:latest
        ports:
        - containerPort: 8000
```
Configure environment variables and resource limits:
```yaml
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: agno-secrets
              key: database-url
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: agno-secrets
              key: openai-api-key
        - name: REDIS_URL
          value: "redis://redis-service:6379"
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "2000m"
```
Add health checks and security context:
```yaml
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
        securityContext:
          allowPrivilegeEscalation: false
          readOnlyRootFilesystem: true
          runAsNonRoot: true
          runAsUser: 1000
```
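Both probes assume the agent container exposes a `/health` route. A minimal sketch of such an endpoint on the FastAPI app from the security examples above:

```python
@app.get("/health")
async def health():
    # Lightweight check used by both the liveness and readiness probes
    return {"status": "ok"}
```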
Create service and ingress configurations:
```yaml
apiVersion: v1
kind: Service
metadata:
  name: agno-data-service
spec:
  selector:
    app: agno-data-agent
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: ClusterIP
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: agno-data-ingress
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target: /
    cert-manager.io/cluster-issuer: "letsencrypt-prod"
spec:
  tls:
  - hosts:
    - api.yourcompany.com
    secretName: agno-tls
  rules:
  - host: api.yourcompany.com
    http:
      paths:
      - path: /data-processing
        pathType: Prefix
        backend:
          service:
            name: agno-data-service
            port:
              number: 80
```
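Once the manifests are saved (the file names here are up to you), `kubectl apply -f deployment.yaml -f service.yaml` rolls everything out, and `kubectl get pods -l app=agno-data-agent` confirms all three replicas are running and ready.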
### Advanced Kubernetes Security Patterns
Production Kubernetes deployments require additional security measures:
```yaml
# security-policy.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: agno-service-account
  namespace: production
automountServiceAccountToken: false
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: production
  name: agno-role
rules:
- apiGroups: [""]
  resources: ["secrets"]
  verbs: ["get", "list"]
- apiGroups: [""]
  resources: ["configmaps"]
  verbs: ["get", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: agno-role-binding
  namespace: production
subjects:
- kind: ServiceAccount
  name: agno-service-account
  namespace: production
roleRef:
  kind: Role
  name: agno-role
  apiGroup: rbac.authorization.k8s.io
```
Network policies for traffic isolation:
```yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: agno-network-policy
  namespace: production
spec:
  podSelector:
    matchLabels:
      app: agno-data-agent
  policyTypes:
  - Ingress
  - Egress
  ingress:
  - from:
    - namespaceSelector:
        matchLabels:
          name: ingress-nginx
    ports:
    - protocol: TCP
      port: 8000
  egress:
  - to:
    - namespaceSelector:
        matchLabels:
          name: database
    ports:
    - protocol: TCP
      port: 5432
  - to: []  # Allow DNS
    ports:
    - protocol: UDP
      port: 53
```
A pod spec hardened to satisfy the restricted Pod Security Standard:
```yaml
apiVersion: v1
kind: Pod
metadata:
  name: agno-data-agent
spec:
  serviceAccountName: agno-service-account
  securityContext:
    runAsNonRoot: true
    runAsUser: 1000
    runAsGroup: 1000
    fsGroup: 1000
    seccompProfile:
      type: RuntimeDefault
  containers:
  - name: agno-agent
    image: agno-data-processing:latest
    securityContext:
      allowPrivilegeEscalation: false
      readOnlyRootFilesystem: true
      runAsNonRoot: true
      runAsUser: 1000
      capabilities:
        drop:
        - ALL
```
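The same restrictions can be enforced cluster-side rather than relying on every manifest getting them right: the built-in Pod Security Admission controller rejects non-conforming pods when the namespace carries the standard labels. A minimal sketch for the `production` namespace used above:

```yaml
apiVersion: v1
kind: Namespace
metadata:
  name: production
  labels:
    # Reject any pod that fails the "restricted" Pod Security Standard
    pod-security.kubernetes.io/enforce: restricted
    pod-security.kubernetes.io/enforce-version: latest
```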
## Production Readiness Checklist

### Comprehensive Production Validation
Before a data engineer deploys a pipeline that will process petabytes of customer data daily, they go through a rigorous pre-production checklist. Every data source, every processing stage, every quality check must be verified:
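The validation methods below are assumed to live on a small checker class; a hypothetical skeleton matching the `DataProductionReadinessChecker` usage at the end of this section:

```python
class DataProductionReadinessChecker:
    """Runs pre-deployment checks against a configured data processing agent."""

    def __init__(self, agent):
        self.agent = agent
```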
Define comprehensive production readiness categories for data processing:
```python
async def validate_data_production_readiness(self):
    """Comprehensive data processing production readiness assessment."""
    checklist = {
        "✅ Configuration": {
            "environment_variables": self._check_env_vars(),
            "database_configured": self._check_database(),
            "monitoring_enabled": self._check_monitoring()
        },
        "✅ Performance": {
            "response_time": await self._check_response_time(),
            "concurrent_handling": await self._check_concurrency(),
            "resource_limits": self._check_resource_limits()
        },
```
Add reliability and security validation checks for data systems:
"✅ Reliability": {
"error_handling": self._check_error_handling(),
"retry_logic": self._check_retry_logic(),
"graceful_degradation": self._check_degradation()
},
"✅ Security": {
"authentication": self._check_auth(),
"input_validation": self._check_validation(),
"rate_limiting": self._check_rate_limits()
}
}
return checklist
Implement helper methods for validation tailored to data processing:
```python
def _check_env_vars(self) -> bool:
    """Check required environment variables for data processing."""
    import os
    required_vars = ["DATABASE_URL", "API_KEY", "SECRET_KEY", "DATA_PROCESSING_MODE"]
    return all(os.getenv(var) for var in required_vars)

async def _check_response_time(self) -> str:
    """Measure average response time for data processing queries."""
    import time
    start = time.time()
    await self.agent.arun("test data processing query")
    duration = time.time() - start
    return f"{duration:.2f}s"

def _check_security_headers(self) -> bool:
    """Check if security headers are properly configured."""
    # In a real implementation, this would test actual HTTP responses
    required_headers = [
        "X-Content-Type-Options",
        "X-Frame-Options",
        "X-XSS-Protection",
        "Strict-Transport-Security"
    ]
    # Simplified check - implement based on your security requirements
    return True
```
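The remaining `_check_*` helpers referenced by the checklist are not shown; stubs like these (bodies illustrative) keep the example runnable until real checks are wired in:

```python
def _check_database(self) -> bool:
    """Stub: in practice, open a connection to the configured database."""
    import os
    return bool(os.getenv("DATABASE_URL"))

def _check_monitoring(self) -> bool:
    """Stub: in practice, verify the telemetry endpoint is reachable."""
    return True

# _check_concurrency, _check_resource_limits, _check_error_handling,
# _check_retry_logic, _check_degradation, _check_auth, _check_validation,
# and _check_rate_limits follow the same pattern.
```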
Quick validation usage example for data processing systems:
```python
# Quick validation for data processing readiness
async def run_production_readiness_check():
    checker = DataProductionReadinessChecker(data_production_agent)
    readiness = await checker.validate_data_production_readiness()

    # Display results
    for category, checks in readiness.items():
        print(f"\n{category}")
        for check_name, result in checks.items():
            status = "✅" if result else "❌"
            print(f"  {status} {check_name}: {result}")
```
## 🧭 Navigation

← Previous: Session 7 - Agent Systems
Next: Session 9 - Multi-Agent Coordination →