⚙️ Session 2: Production Memory Systems
⚙️ IMPLEMENTER PATH CONTENT
Prerequisites: Complete 🎯 Observer Path and 📝 Participant Path
Time Investment: 2-3 hours
Outcome: Master enterprise-grade memory management, state persistence, and context optimization
Advanced Learning Outcomes
After completing this production memory systems module, you will master:
- Enterprise-grade memory persistence and recovery strategies
- Advanced context management techniques for long-running conversations
- Memory optimization patterns for high-performance agent deployments
- Distributed memory architectures for multi-agent coordination
- Compliance and audit-ready conversation logging systems
Enterprise Memory Architecture Patterns
Production agent systems require memory management that goes beyond simple conversation buffers: persistent storage, distributed synchronization, and intelligent context optimization.
Distributed Memory Coordination
For multi-agent systems operating across distributed infrastructure, memory synchronization becomes critical for maintaining consistent context and coordination state:
import redis
import json
import uuid
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from langchain.memory import ConversationBufferMemory
from langchain.schema import BaseMessage, HumanMessage, AIMessage
class DistributedMemoryManager:
"""Enterprise memory manager with distributed coordination capabilities"""
def __init__(self, redis_url: str, memory_namespace: str = "agent_memory"):
self.redis_client = redis.from_url(redis_url)
self.namespace = memory_namespace
self.local_cache = {}
self.sync_interval = 30 # seconds
self.last_sync = {}
def create_agent_memory(self, agent_id: str, memory_type: str = "buffer") -> 'DistributedConversationMemory':
"""Create distributed memory instance for specific agent"""
memory_key = f"{self.namespace}:agent:{agent_id}:conversation"
if memory_type == "buffer":
return DistributedConversationMemory(
redis_client=self.redis_client,
memory_key=memory_key,
agent_id=agent_id
)
elif memory_type == "summary":
            return DistributedSummaryMemory(  # summary variant not shown in this module
redis_client=self.redis_client,
memory_key=memory_key,
agent_id=agent_id
)
else:
raise ValueError(f"Unsupported memory type: {memory_type}")
def sync_memories_across_agents(self, agent_ids: List[str]) -> Dict[str, Any]:
"""Synchronize memory state across multiple agents for coordination"""
sync_results = {}
for agent_id in agent_ids:
try:
memory_key = f"{self.namespace}:agent:{agent_id}:conversation"
# Get latest memory state
memory_data = self.redis_client.get(memory_key)
if memory_data:
memory_state = json.loads(memory_data)
sync_results[agent_id] = {
"status": "synced",
"message_count": len(memory_state.get("messages", [])),
"last_update": memory_state.get("last_update"),
"context_summary": memory_state.get("context_summary", "")[:100]
}
else:
sync_results[agent_id] = {
"status": "no_memory_found",
"message_count": 0
}
except Exception as e:
sync_results[agent_id] = {
"status": "sync_error",
"error": str(e)
}
return sync_results
def create_shared_context(self, context_id: str, initial_context: Dict[str, Any]) -> str:
"""Create shared context accessible by multiple agents"""
context_key = f"{self.namespace}:shared_context:{context_id}"
shared_context = {
"context_id": context_id,
"created_at": datetime.now().isoformat(),
"data": initial_context,
"access_log": [],
"version": 1
}
self.redis_client.set(context_key, json.dumps(shared_context))
return context_key
def update_shared_context(self, context_id: str, updates: Dict[str, Any], agent_id: str) -> bool:
"""Update shared context with version control and access logging"""
context_key = f"{self.namespace}:shared_context:{context_id}"
try:
            # Get current context (no distributed lock here; use Redis WATCH/MULTI for real concurrency control)
current_data = self.redis_client.get(context_key)
if not current_data:
return False
context = json.loads(current_data)
# Update data and metadata
context["data"].update(updates)
context["version"] += 1
context["last_updated"] = datetime.now().isoformat()
context["access_log"].append({
"agent_id": agent_id,
"action": "update",
"timestamp": datetime.now().isoformat(),
"changes": list(updates.keys())
})
# Save updated context
self.redis_client.set(context_key, json.dumps(context))
return True
except Exception as e:
print(f"Failed to update shared context: {e}")
return False
class DistributedConversationMemory(ConversationBufferMemory):
"""Conversation memory with distributed persistence and synchronization"""
def __init__(self, redis_client: redis.Redis, memory_key: str, agent_id: str, max_token_limit: int = 4000):
super().__init__(return_messages=True)
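        # Note: ConversationBufferMemory is a pydantic model; depending on your
        # LangChain version, assigning undeclared attributes below may raise a
        # validation error (declare them as class fields if so).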
self.redis_client = redis_client
self.memory_key = memory_key
self.agent_id = agent_id
self.max_token_limit = max_token_limit
self.local_cache_ttl = 300 # 5 minutes
self.last_persistence = None
# Load existing memory on initialization
self._load_from_distributed_storage()
def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
"""Save conversation context to both local and distributed storage"""
# Save to local memory first
super().save_context(inputs, outputs)
# Persist to distributed storage
self._persist_to_distributed_storage()
def _persist_to_distributed_storage(self):
"""Persist current memory state to distributed storage"""
try:
messages_data = []
for message in self.chat_memory.messages:
messages_data.append({
"type": type(message).__name__,
"content": message.content,
"timestamp": datetime.now().isoformat()
})
memory_state = {
"agent_id": self.agent_id,
"messages": messages_data,
"message_count": len(messages_data),
"last_update": datetime.now().isoformat(),
"token_estimate": self._estimate_token_count(),
"context_summary": self._generate_context_summary()
}
# Set with TTL for automatic cleanup
self.redis_client.setex(
self.memory_key,
timedelta(hours=24),
json.dumps(memory_state)
)
self.last_persistence = datetime.now()
except Exception as e:
print(f"Failed to persist memory: {e}")
def _load_from_distributed_storage(self):
"""Load memory state from distributed storage"""
try:
stored_data = self.redis_client.get(self.memory_key)
if not stored_data:
return
memory_state = json.loads(stored_data)
# Reconstruct messages
self.chat_memory.messages = []
for msg_data in memory_state.get("messages", []):
if msg_data["type"] == "HumanMessage":
message = HumanMessage(content=msg_data["content"])
elif msg_data["type"] == "AIMessage":
message = AIMessage(content=msg_data["content"])
else:
continue # Skip unknown message types
self.chat_memory.messages.append(message)
print(f"Loaded {len(self.chat_memory.messages)} messages from distributed storage")
except Exception as e:
print(f"Failed to load memory from distributed storage: {e}")
def _estimate_token_count(self) -> int:
"""Estimate token count for current conversation"""
total_content = ""
for message in self.chat_memory.messages:
total_content += message.content
# Rough estimation: ~4 characters per token
return len(total_content) // 4
def _generate_context_summary(self) -> str:
"""Generate brief summary of current conversation context"""
if not self.chat_memory.messages:
return "Empty conversation"
recent_messages = self.chat_memory.messages[-3:] # Last 3 messages
summary_parts = []
for msg in recent_messages:
content_preview = msg.content[:100] + "..." if len(msg.content) > 100 else msg.content
message_type = "Human" if isinstance(msg, HumanMessage) else "AI"
summary_parts.append(f"{message_type}: {content_preview}")
return " | ".join(summary_parts)
def optimize_memory_for_tokens(self):
"""Optimize memory to stay within token limits"""
current_tokens = self._estimate_token_count()
if current_tokens > self.max_token_limit:
# Remove oldest messages until within limit
while current_tokens > self.max_token_limit and len(self.chat_memory.messages) > 1:
removed_message = self.chat_memory.messages.pop(0)
current_tokens = self._estimate_token_count()
print(f"Removed message to stay within token limit: {removed_message.content[:50]}...")
# Persist optimized memory
self._persist_to_distributed_storage()
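A minimal usage sketch, assuming a reachable Redis instance at redis://localhost:6379 (the agent and context identifiers below are illustrative):
# Illustrative only: assumes the classes defined above behave as written.
manager = DistributedMemoryManager("redis://localhost:6379")
# Per-agent distributed memory that survives process restarts
memory = manager.create_agent_memory("etl-agent-1", memory_type="buffer")
memory.save_context(
    {"input": "Check pipeline status"},
    {"output": "All 12 pipelines healthy"}
)
# Shared context that multiple agents can read and update
manager.create_shared_context("run-2024-01", {"pipeline": "daily_etl", "status": "running"})
manager.update_shared_context("run-2024-01", {"status": "complete"}, agent_id="etl-agent-1")
# Inspect synchronization state across agents
print(manager.sync_memories_across_agents(["etl-agent-1", "etl-agent-2"]))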
Advanced Context Optimization
Sophisticated context management ensures agents maintain relevant information while optimizing for performance and cost efficiency.
Intelligent Context Summarization
Implement AI-driven context summarization that preserves critical information while reducing token consumption:
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
class IntelligentContextManager:
"""Advanced context management with AI-driven summarization and optimization"""
def __init__(self, llm, context_config: Dict[str, Any] = None):
self.llm = llm
self.config = context_config or {
"max_context_tokens": 2000,
"summary_trigger_tokens": 3000,
"importance_threshold": 0.7,
"context_categories": ["technical", "business", "data_quality", "performance"]
}
self.summarization_chain = self._create_summarization_chain()
self.importance_chain = self._create_importance_chain()
def _create_summarization_chain(self) -> LLMChain:
"""Create specialized chain for intelligent context summarization"""
template = """
You are an expert at summarizing technical conversations while preserving critical information.
Conversation History:
{conversation_text}
Create a comprehensive summary that:
1. Preserves all technical details, metrics, and specific findings
2. Maintains decision points and their reasoning
3. Keeps action items and recommendations
4. Summarizes background context concisely
5. Structures information by importance and category
Categories to preserve: {categories}
Generate a structured summary that maintains technical precision while reducing length by ~70%.
"""
return LLMChain(
llm=self.llm,
prompt=PromptTemplate(
template=template,
input_variables=["conversation_text", "categories"]
)
)
def _create_importance_chain(self) -> LLMChain:
"""Create chain for evaluating message importance"""
template = """
Evaluate the importance of this conversation message for ongoing data engineering context:
Message: {message_text}
Context: {current_context}
Rate importance (0.0-1.0) and explain reasoning:
High importance (0.8-1.0): Critical decisions, specific metrics, error conditions, action items
Medium importance (0.4-0.7): General analysis, contextual information, process descriptions
Low importance (0.0-0.3): Casual remarks, acknowledgments, routine confirmations
Return JSON: {{"importance": 0.X, "reasoning": "explanation", "categories": ["category1", "category2"]}}
"""
return LLMChain(
llm=self.llm,
prompt=PromptTemplate(
template=template,
input_variables=["message_text", "current_context"]
)
)
def optimize_conversation_context(self, messages: List[BaseMessage], current_context: str = "") -> Dict[str, Any]:
"""Intelligently optimize conversation context for efficiency"""
# Estimate current token usage
total_tokens = self._estimate_total_tokens(messages)
if total_tokens <= self.config["max_context_tokens"]:
return {
"optimized_messages": messages,
"summary": None,
"optimization_applied": False,
"token_reduction": 0
}
# Analyze message importance
message_analysis = []
for i, message in enumerate(messages):
try:
importance_result = self.importance_chain.run({
"message_text": message.content[:500], # Limit for efficiency
"current_context": current_context[:200]
})
# Parse importance score (simplified)
importance_score = 0.5 # Default fallback
try:
if "importance" in importance_result:
# Extract numeric score (simplified parsing)
score_str = importance_result.split('"importance":')[1].split(',')[0].strip()
importance_score = float(score_str.replace('"', ''))
                except (ValueError, IndexError):
                    pass  # keep the default importance score
message_analysis.append({
"index": i,
"message": message,
"importance": importance_score,
"token_estimate": len(message.content) // 4
})
except Exception as e:
print(f"Error analyzing message importance: {e}")
# Keep message with medium importance as fallback
message_analysis.append({
"index": i,
"message": message,
"importance": 0.5,
"token_estimate": len(message.content) // 4
})
# Sort by importance (descending)
message_analysis.sort(key=lambda x: x["importance"], reverse=True)
# Select most important messages within token limit
optimized_messages = []
token_count = 0
summarization_candidates = []
for analysis in message_analysis:
if (token_count + analysis["token_estimate"] <= self.config["max_context_tokens"]
and analysis["importance"] >= self.config["importance_threshold"]):
optimized_messages.append(analysis["message"])
token_count += analysis["token_estimate"]
else:
summarization_candidates.append(analysis["message"])
# Create summary of less important messages
summary = None
if summarization_candidates:
summary_text = "\n".join([msg.content for msg in summarization_candidates])
try:
summary = self.summarization_chain.run({
"conversation_text": summary_text,
"categories": self.config["context_categories"]
})
except Exception as e:
print(f"Summarization failed: {e}")
summary = "Context summary unavailable due to processing error"
token_reduction = total_tokens - self._estimate_total_tokens(optimized_messages)
return {
"optimized_messages": optimized_messages,
"summary": summary,
"optimization_applied": True,
"token_reduction": token_reduction,
"messages_summarized": len(summarization_candidates),
"messages_retained": len(optimized_messages)
}
def _estimate_total_tokens(self, messages: List[BaseMessage]) -> int:
"""Estimate total tokens for message list"""
total_content = ""
for message in messages:
total_content += message.content
return len(total_content) // 4 # Rough estimation
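# Worked example of the ~4-characters-per-token heuristic used above (a rough
# rule of thumb for English text; real tokenizer counts vary by model):
#   len("Pipeline processed 1.2M rows in 45s") == 36  ->  36 // 4 == 9 tokens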
class ContextAwareMemory(ConversationBufferMemory):
"""Memory with intelligent context optimization"""
def __init__(self, llm, context_manager: IntelligentContextManager = None, **kwargs):
super().__init__(**kwargs)
self.llm = llm
self.context_manager = context_manager or IntelligentContextManager(llm)
self.optimization_history = []
def get_optimized_context(self) -> str:
"""Get optimized conversation context"""
if not self.chat_memory.messages:
return ""
optimization_result = self.context_manager.optimize_conversation_context(
self.chat_memory.messages,
current_context=self._get_current_context_summary()
)
# Record optimization metrics
self.optimization_history.append({
"timestamp": datetime.now().isoformat(),
"optimization_applied": optimization_result["optimization_applied"],
"token_reduction": optimization_result.get("token_reduction", 0),
"messages_retained": optimization_result.get("messages_retained", len(self.chat_memory.messages))
})
# Build optimized context
context_parts = []
if optimization_result["summary"]:
context_parts.append(f"CONVERSATION SUMMARY:\n{optimization_result['summary']}\n")
context_parts.append("RECENT CONVERSATION:")
for message in optimization_result["optimized_messages"]:
message_type = "Human" if isinstance(message, HumanMessage) else "AI"
context_parts.append(f"{message_type}: {message.content}")
return "\n".join(context_parts)
def _get_current_context_summary(self) -> str:
"""Generate brief summary of current conversation state"""
if not self.chat_memory.messages:
return "New conversation"
message_count = len(self.chat_memory.messages)
recent_message = self.chat_memory.messages[-1].content[:100]
return f"Conversation with {message_count} messages. Latest: {recent_message}..."
def get_optimization_stats(self) -> Dict[str, Any]:
"""Get context optimization performance statistics"""
if not self.optimization_history:
return {"optimizations_performed": 0}
total_optimizations = len(self.optimization_history)
applied_optimizations = sum(1 for opt in self.optimization_history if opt["optimization_applied"])
total_token_reduction = sum(opt["token_reduction"] for opt in self.optimization_history)
return {
"optimizations_performed": total_optimizations,
"optimizations_applied": applied_optimizations,
"optimization_rate": f"{applied_optimizations / max(1, total_optimizations):.1%}",
"total_token_reduction": total_token_reduction,
"avg_token_reduction": total_token_reduction / max(1, applied_optimizations)
}
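A short wiring sketch; ChatOpenAI is one illustrative model choice, and any LangChain-compatible LLM works wherever llm is expected:
# Illustrative wiring; the model choice and messages are placeholders.
from langchain.chat_models import ChatOpenAI
llm = ChatOpenAI(temperature=0)
memory = ContextAwareMemory(llm=llm, return_messages=True)
memory.save_context(
    {"input": "Summarize yesterday's load failures"},
    {"output": "3 failures, all in the orders-table ingest"}
)
print(memory.get_optimized_context())   # summary (if any) plus retained messages
print(memory.get_optimization_stats())  # e.g. {"optimizations_performed": 1, ...}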
Multi-Tenancy and Isolation
Enterprise deployments require robust memory isolation and multi-tenancy support to ensure data privacy and compliance across different organizations and user groups.
Tenant-Isolated Memory Architecture
Implement comprehensive tenant isolation for memory systems:
from typing import Set
import hashlib
from enum import Enum
class IsolationLevel(Enum):
STRICT = "strict" # Complete isolation, no shared resources
MODERATE = "moderate" # Shared infrastructure, isolated data
MINIMAL = "minimal" # Basic separation, shared optimizations
class TenantMemoryManager:
"""Enterprise memory manager with comprehensive tenant isolation"""
def __init__(self, redis_client: redis.Redis, encryption_key: str, isolation_level: IsolationLevel = IsolationLevel.STRICT):
self.redis_client = redis_client
self.encryption_key = encryption_key
self.isolation_level = isolation_level
self.tenant_configurations = {}
        self.access_control = TenantAccessControl()  # RBAC helper, assumed defined elsewhere
def register_tenant(self, tenant_id: str, config: Dict[str, Any]) -> bool:
"""Register new tenant with specific configuration"""
try:
tenant_config = {
"tenant_id": tenant_id,
"created_at": datetime.now().isoformat(),
"isolation_level": self.isolation_level.value,
"memory_quota": config.get("memory_quota", 1000), # MB
"retention_days": config.get("retention_days", 30),
"encryption_enabled": config.get("encryption_enabled", True),
"audit_enabled": config.get("audit_enabled", True),
"allowed_agents": config.get("allowed_agents", []),
"data_residency": config.get("data_residency", "default")
}
# Create tenant namespace
tenant_key = f"tenant:{tenant_id}:config"
self.redis_client.set(tenant_key, json.dumps(tenant_config))
# Initialize tenant memory spaces
self._initialize_tenant_spaces(tenant_id, tenant_config)
self.tenant_configurations[tenant_id] = tenant_config
return True
except Exception as e:
print(f"Failed to register tenant {tenant_id}: {e}")
return False
def create_tenant_memory(self, tenant_id: str, agent_id: str, user_id: str) -> 'TenantIsolatedMemory':
"""Create isolated memory instance for specific tenant, agent, and user"""
if tenant_id not in self.tenant_configurations:
raise ValueError(f"Tenant {tenant_id} not registered")
tenant_config = self.tenant_configurations[tenant_id]
# Validate agent authorization
if tenant_config["allowed_agents"] and agent_id not in tenant_config["allowed_agents"]:
raise PermissionError(f"Agent {agent_id} not authorized for tenant {tenant_id}")
# Create isolated memory key
memory_key = self._generate_isolated_key(tenant_id, agent_id, user_id)
return TenantIsolatedMemory(
redis_client=self.redis_client,
memory_key=memory_key,
tenant_id=tenant_id,
agent_id=agent_id,
user_id=user_id,
encryption_key=self.encryption_key,
tenant_config=tenant_config
)
def _initialize_tenant_spaces(self, tenant_id: str, config: Dict[str, Any]):
"""Initialize isolated spaces for tenant"""
spaces = {
"conversations": f"tenant:{tenant_id}:conversations",
"shared_context": f"tenant:{tenant_id}:shared_context",
"audit_log": f"tenant:{tenant_id}:audit",
"usage_metrics": f"tenant:{tenant_id}:metrics"
}
for space_name, space_key in spaces.items():
self.redis_client.set(
f"{space_key}:initialized",
json.dumps({
"created_at": datetime.now().isoformat(),
"space_type": space_name,
"tenant_id": tenant_id
})
)
def _generate_isolated_key(self, tenant_id: str, agent_id: str, user_id: str) -> str:
"""Generate cryptographically isolated memory key"""
if self.isolation_level == IsolationLevel.STRICT:
# Include all identifiers for complete isolation
key_data = f"{tenant_id}:{agent_id}:{user_id}:{self.encryption_key}"
elif self.isolation_level == IsolationLevel.MODERATE:
# Tenant and user isolation
key_data = f"{tenant_id}:{user_id}:{self.encryption_key}"
else:
# Basic tenant isolation
key_data = f"{tenant_id}:{self.encryption_key}"
key_hash = hashlib.sha256(key_data.encode()).hexdigest()
return f"tenant:{tenant_id}:memory:{key_hash}"
def get_tenant_usage_report(self, tenant_id: str) -> Dict[str, Any]:
"""Generate usage report for specific tenant"""
if tenant_id not in self.tenant_configurations:
raise ValueError(f"Tenant {tenant_id} not found")
try:
# Get all tenant keys
pattern = f"tenant:{tenant_id}:*"
tenant_keys = self.redis_client.keys(pattern)
# Calculate memory usage
total_memory = 0
conversation_count = 0
for key in tenant_keys:
memory_info = self.redis_client.memory_usage(key)
if memory_info:
total_memory += memory_info
if b"conversations" in key:
conversation_count += 1
# Get usage metrics
metrics_key = f"tenant:{tenant_id}:metrics"
metrics_data = self.redis_client.get(metrics_key)
metrics = json.loads(metrics_data) if metrics_data else {}
return {
"tenant_id": tenant_id,
"memory_usage_bytes": total_memory,
"memory_usage_mb": total_memory / (1024 * 1024),
"conversation_count": conversation_count,
"quota_usage": (total_memory / (1024 * 1024)) / self.tenant_configurations[tenant_id]["memory_quota"],
"metrics": metrics,
"generated_at": datetime.now().isoformat()
}
except Exception as e:
return {"error": f"Failed to generate usage report: {str(e)}"}
class TenantIsolatedMemory(ConversationBufferMemory):
"""Conversation memory with complete tenant isolation and encryption"""
def __init__(self, redis_client: redis.Redis, memory_key: str, tenant_id: str,
agent_id: str, user_id: str, encryption_key: str, tenant_config: Dict[str, Any]):
super().__init__(return_messages=True)
self.redis_client = redis_client
self.memory_key = memory_key
self.tenant_id = tenant_id
self.agent_id = agent_id
self.user_id = user_id
self.encryption_key = encryption_key
self.tenant_config = tenant_config
self.audit_logger = TenantAuditLogger(redis_client, tenant_id) if tenant_config["audit_enabled"] else None
# Load existing conversation
self._load_tenant_conversation()
def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
"""Save context with tenant isolation and auditing"""
# Save to local memory
super().save_context(inputs, outputs)
# Audit logging
if self.audit_logger:
self.audit_logger.log_memory_operation(
"save_context",
{
"agent_id": self.agent_id,
"user_id": self.user_id,
"inputs_keys": list(inputs.keys()),
"outputs_keys": list(outputs.keys())
}
)
# Persist with encryption
self._persist_encrypted_conversation()
# Update usage metrics
self._update_tenant_metrics()
def _load_tenant_conversation(self):
"""Load conversation with tenant isolation validation"""
try:
encrypted_data = self.redis_client.get(self.memory_key)
if not encrypted_data:
return
# Decrypt and load conversation
conversation_data = self._decrypt_conversation_data(encrypted_data)
# Validate tenant ownership
if conversation_data.get("tenant_id") != self.tenant_id:
                raise PermissionError(f"Tenant mismatch: expected {self.tenant_id}, got {conversation_data.get('tenant_id')}")  # Python has no built-in SecurityError
# Reconstruct messages
self.chat_memory.messages = []
for msg_data in conversation_data.get("messages", []):
if msg_data["type"] == "HumanMessage":
message = HumanMessage(content=msg_data["content"])
elif msg_data["type"] == "AIMessage":
message = AIMessage(content=msg_data["content"])
else:
continue
self.chat_memory.messages.append(message)
if self.audit_logger:
self.audit_logger.log_memory_operation(
"load_conversation",
{"messages_loaded": len(self.chat_memory.messages)}
)
except Exception as e:
print(f"Failed to load tenant conversation: {e}")
if self.audit_logger:
self.audit_logger.log_memory_operation("load_error", {"error": str(e)})
def _persist_encrypted_conversation(self):
"""Persist conversation with encryption and tenant metadata"""
try:
messages_data = []
for message in self.chat_memory.messages:
messages_data.append({
"type": type(message).__name__,
"content": message.content,
"timestamp": datetime.now().isoformat()
})
conversation_data = {
"tenant_id": self.tenant_id,
"agent_id": self.agent_id,
"user_id": self.user_id,
"messages": messages_data,
"last_update": datetime.now().isoformat(),
"encryption_version": "1.0"
}
# Encrypt conversation data
encrypted_data = self._encrypt_conversation_data(conversation_data)
# Set with tenant-specific TTL
ttl_days = self.tenant_config.get("retention_days", 30)
self.redis_client.setex(
self.memory_key,
timedelta(days=ttl_days),
encrypted_data
)
except Exception as e:
if self.audit_logger:
self.audit_logger.log_memory_operation("persist_error", {"error": str(e)})
raise e
def _encrypt_conversation_data(self, data: Dict[str, Any]) -> bytes:
"""Encrypt conversation data using tenant-specific encryption"""
if not self.tenant_config.get("encryption_enabled", True):
return json.dumps(data).encode()
# Simplified encryption (use proper encryption in production)
        from cryptography.fernet import Fernet
        import base64
key_hash = hashlib.sha256(f"{self.encryption_key}:{self.tenant_id}".encode()).digest()
fernet_key = base64.urlsafe_b64encode(key_hash)
fernet = Fernet(fernet_key)
data_bytes = json.dumps(data).encode()
return fernet.encrypt(data_bytes)
def _decrypt_conversation_data(self, encrypted_data: bytes) -> Dict[str, Any]:
"""Decrypt conversation data using tenant-specific decryption"""
if not self.tenant_config.get("encryption_enabled", True):
return json.loads(encrypted_data.decode())
# Simplified decryption (use proper encryption in production)
        from cryptography.fernet import Fernet
        import base64
key_hash = hashlib.sha256(f"{self.encryption_key}:{self.tenant_id}".encode()).digest()
fernet_key = base64.urlsafe_b64encode(key_hash)
fernet = Fernet(fernet_key)
decrypted_bytes = fernet.decrypt(encrypted_data)
return json.loads(decrypted_bytes.decode())
def _update_tenant_metrics(self):
"""Update tenant-specific usage metrics"""
try:
metrics_key = f"tenant:{self.tenant_id}:metrics"
# Get current metrics
current_metrics = self.redis_client.get(metrics_key)
metrics = json.loads(current_metrics) if current_metrics else {}
# Update metrics
metrics.update({
"last_activity": datetime.now().isoformat(),
"total_messages": metrics.get("total_messages", 0) + 1,
"active_conversations": metrics.get("active_conversations", 0) + (1 if len(self.chat_memory.messages) == 1 else 0)
})
# Save updated metrics
self.redis_client.set(metrics_key, json.dumps(metrics))
except Exception as e:
print(f"Failed to update tenant metrics: {e}")
class TenantAuditLogger:
"""Audit logging system for tenant memory operations"""
def __init__(self, redis_client: redis.Redis, tenant_id: str):
self.redis_client = redis_client
self.tenant_id = tenant_id
self.audit_key = f"tenant:{tenant_id}:audit"
def log_memory_operation(self, operation: str, details: Dict[str, Any]):
"""Log memory operation for compliance and monitoring"""
audit_entry = {
"timestamp": datetime.now().isoformat(),
"operation": operation,
"details": details,
"tenant_id": self.tenant_id
}
# Add to audit log list
self.redis_client.lpush(
self.audit_key,
json.dumps(audit_entry)
)
# Keep only last 1000 entries per tenant
self.redis_client.ltrim(self.audit_key, 0, 999)
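Putting the tenant pieces together might look like the following; the tenant ID, quota values, and encryption key are placeholders, and in production the key should come from a secrets manager:
# Illustrative setup; all identifiers and secrets are placeholders.
redis_client = redis.from_url("redis://localhost:6379")
tenant_manager = TenantMemoryManager(
    redis_client=redis_client,
    encryption_key="replace-with-a-managed-secret",
    isolation_level=IsolationLevel.STRICT
)
tenant_manager.register_tenant("acme-corp", {
    "memory_quota": 500,  # MB
    "retention_days": 90,
    "allowed_agents": ["support-agent"]
})
memory = tenant_manager.create_tenant_memory("acme-corp", "support-agent", "user-42")
memory.save_context(
    {"input": "Where is my invoice?"},
    {"output": "Sent to your billing email."}
)
print(tenant_manager.get_tenant_usage_report("acme-corp"))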
Compliance and Audit-Ready Memory Systems
Enterprise deployments must support comprehensive audit trails, data retention policies, and compliance reporting for regulatory requirements.
Comprehensive Audit and Compliance Framework
Implement enterprise-grade audit trails with compliance reporting:
from dataclasses import dataclass
from typing import Union
import base64
@dataclass
class AuditEvent:
event_id: str
timestamp: str
tenant_id: str
user_id: str
agent_id: str
event_type: str
event_data: Dict[str, Any]
compliance_tags: List[str]
retention_policy: str
class ComplianceMemoryManager:
"""Memory manager with comprehensive compliance and audit capabilities"""
def __init__(self, redis_client: redis.Redis, compliance_config: Dict[str, Any]):
self.redis_client = redis_client
self.compliance_config = compliance_config
self.audit_buffer = []
self.retention_policies = self._load_retention_policies()
def _load_retention_policies(self) -> Dict[str, Dict[str, Any]]:
"""Load data retention policies for different compliance frameworks"""
return {
"GDPR": {
"default_retention_days": 365,
"deletion_on_request": True,
"data_portability": True,
"consent_required": True
},
"HIPAA": {
"default_retention_days": 2555, # 7 years
"encryption_required": True,
"access_logging": True,
"data_integrity": True
},
"SOX": {
"default_retention_days": 2555, # 7 years
"immutable_audit": True,
"access_controls": True,
"change_tracking": True
},
"PCI_DSS": {
"default_retention_days": 365,
"encryption_required": True,
"access_monitoring": True,
"data_masking": True
}
}
def create_compliant_memory(self, tenant_id: str, user_id: str, agent_id: str,
compliance_frameworks: List[str]) -> 'ComplianceAwareMemory':
"""Create memory instance with compliance controls"""
# Determine strictest retention policy
retention_days = max(
self.retention_policies[framework]["default_retention_days"]
for framework in compliance_frameworks
if framework in self.retention_policies
)
# Determine required compliance features
compliance_features = {
"encryption_required": any(
self.retention_policies.get(framework, {}).get("encryption_required", False)
for framework in compliance_frameworks
),
"access_logging": any(
self.retention_policies.get(framework, {}).get("access_logging", False)
for framework in compliance_frameworks
),
"deletion_on_request": any(
self.retention_policies.get(framework, {}).get("deletion_on_request", False)
for framework in compliance_frameworks
),
"data_portability": any(
self.retention_policies.get(framework, {}).get("data_portability", False)
for framework in compliance_frameworks
)
}
memory_key = f"compliant:{tenant_id}:{user_id}:{agent_id}"
return ComplianceAwareMemory(
redis_client=self.redis_client,
memory_key=memory_key,
tenant_id=tenant_id,
user_id=user_id,
agent_id=agent_id,
compliance_frameworks=compliance_frameworks,
compliance_features=compliance_features,
retention_days=retention_days,
audit_manager=self
)
def record_audit_event(self, event: AuditEvent):
"""Record audit event with compliance metadata"""
audit_key = f"audit:{event.tenant_id}:{datetime.now().strftime('%Y-%m')}"
audit_record = {
"event_id": event.event_id,
"timestamp": event.timestamp,
"tenant_id": event.tenant_id,
"user_id": event.user_id,
"agent_id": event.agent_id,
"event_type": event.event_type,
"event_data": event.event_data,
"compliance_tags": event.compliance_tags,
"retention_policy": event.retention_policy,
"recorded_at": datetime.now().isoformat()
}
# Store audit record
self.redis_client.lpush(audit_key, json.dumps(audit_record))
# Set retention based on compliance requirements
if event.retention_policy in self.retention_policies:
retention_days = self.retention_policies[event.retention_policy]["default_retention_days"]
self.redis_client.expire(audit_key, timedelta(days=retention_days))
def generate_compliance_report(self, tenant_id: str, frameworks: List[str],
start_date: datetime, end_date: datetime) -> Dict[str, Any]:
"""Generate comprehensive compliance report"""
report = {
"tenant_id": tenant_id,
"frameworks": frameworks,
"report_period": {
"start": start_date.isoformat(),
"end": end_date.isoformat()
},
"generated_at": datetime.now().isoformat(),
"compliance_status": {},
"audit_summary": {},
"recommendations": []
}
# Analyze compliance for each framework
for framework in frameworks:
if framework not in self.retention_policies:
continue
framework_status = self._analyze_framework_compliance(
tenant_id, framework, start_date, end_date
)
report["compliance_status"][framework] = framework_status
# Generate audit summary
report["audit_summary"] = self._generate_audit_summary(tenant_id, start_date, end_date)
# Generate recommendations
report["recommendations"] = self._generate_compliance_recommendations(report)
return report
def _analyze_framework_compliance(self, tenant_id: str, framework: str,
start_date: datetime, end_date: datetime) -> Dict[str, Any]:
"""Analyze compliance status for specific framework"""
policy = self.retention_policies[framework]
# Get audit events for period
audit_events = self._get_audit_events_for_period(tenant_id, start_date, end_date)
analysis = {
"framework": framework,
"total_events": len(audit_events),
"compliance_score": 0.0,
"violations": [],
"requirements_met": [],
"data_retention_status": "compliant"
}
# Check specific requirements
if policy.get("encryption_required"):
encryption_events = [e for e in audit_events if "encryption" in e.get("compliance_tags", [])]
analysis["requirements_met"].append(f"Encryption: {len(encryption_events)} events")
if policy.get("access_logging"):
access_events = [e for e in audit_events if e.get("event_type") in ["memory_access", "data_retrieval"]]
analysis["requirements_met"].append(f"Access logging: {len(access_events)} events")
# Calculate compliance score (simplified)
analysis["compliance_score"] = min(1.0, len(analysis["requirements_met"]) / 3.0)
return analysis
def _get_audit_events_for_period(self, tenant_id: str, start_date: datetime, end_date: datetime) -> List[Dict]:
"""Retrieve audit events for specified period"""
events = []
# Get audit keys for the period (month-based)
current_date = start_date.replace(day=1) # Start of month
while current_date <= end_date:
audit_key = f"audit:{tenant_id}:{current_date.strftime('%Y-%m')}"
# Get all events for the month
raw_events = self.redis_client.lrange(audit_key, 0, -1)
for raw_event in raw_events:
try:
event = json.loads(raw_event)
event_timestamp = datetime.fromisoformat(event["timestamp"])
if start_date <= event_timestamp <= end_date:
events.append(event)
                except (ValueError, KeyError):
continue
# Move to next month
if current_date.month == 12:
current_date = current_date.replace(year=current_date.year + 1, month=1)
else:
current_date = current_date.replace(month=current_date.month + 1)
return events
def _generate_audit_summary(self, tenant_id: str, start_date: datetime, end_date: datetime) -> Dict[str, Any]:
"""Generate audit trail summary"""
events = self._get_audit_events_for_period(tenant_id, start_date, end_date)
# Categorize events
event_categories = {}
for event in events:
category = event.get("event_type", "unknown")
event_categories[category] = event_categories.get(category, 0) + 1
return {
"total_events": len(events),
"event_categories": event_categories,
"unique_users": len(set(e.get("user_id") for e in events if e.get("user_id"))),
"unique_agents": len(set(e.get("agent_id") for e in events if e.get("agent_id"))),
"period_coverage": f"{start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}"
}
def _generate_compliance_recommendations(self, report: Dict[str, Any]) -> List[str]:
"""Generate compliance improvement recommendations"""
recommendations = []
for framework, status in report["compliance_status"].items():
if status["compliance_score"] < 0.8:
recommendations.append(f"Improve {framework} compliance score (currently {status['compliance_score']:.1%})")
if status.get("violations"):
recommendations.append(f"Address {len(status['violations'])} {framework} violations")
if report["audit_summary"]["total_events"] == 0:
recommendations.append("Enable comprehensive audit logging for all memory operations")
return recommendations
class ComplianceAwareMemory(ConversationBufferMemory):
"""Memory with comprehensive compliance controls and audit trails"""
def __init__(self, redis_client: redis.Redis, memory_key: str, tenant_id: str,
user_id: str, agent_id: str, compliance_frameworks: List[str],
compliance_features: Dict[str, bool], retention_days: int,
audit_manager: ComplianceMemoryManager):
super().__init__(return_messages=True)
self.redis_client = redis_client
self.memory_key = memory_key
self.tenant_id = tenant_id
self.user_id = user_id
self.agent_id = agent_id
self.compliance_frameworks = compliance_frameworks
self.compliance_features = compliance_features
self.retention_days = retention_days
self.audit_manager = audit_manager
# Load existing memory
        self._load_compliant_memory()  # loader analogous to _load_tenant_conversation above (not shown)
def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
"""Save context with comprehensive compliance controls"""
# Record audit event
audit_event = AuditEvent(
event_id=str(uuid.uuid4()),
timestamp=datetime.now().isoformat(),
tenant_id=self.tenant_id,
user_id=self.user_id,
agent_id=self.agent_id,
event_type="memory_save",
event_data={
"inputs_keys": list(inputs.keys()),
"outputs_keys": list(outputs.keys()),
"message_count": len(self.chat_memory.messages) + 1
},
compliance_tags=self.compliance_frameworks + ["memory_operation"],
retention_policy=self.compliance_frameworks[0] if self.compliance_frameworks else "default"
)
self.audit_manager.record_audit_event(audit_event)
# Save to local memory
super().save_context(inputs, outputs)
# Persist with compliance controls
self._persist_compliant_memory()
def _persist_compliant_memory(self):
"""Persist memory with compliance controls"""
conversation_data = {
"tenant_id": self.tenant_id,
"user_id": self.user_id,
"agent_id": self.agent_id,
"compliance_frameworks": self.compliance_frameworks,
"messages": [],
"metadata": {
"last_update": datetime.now().isoformat(),
"retention_until": (datetime.now() + timedelta(days=self.retention_days)).isoformat(),
"compliance_version": "1.0"
}
}
# Add messages with compliance metadata
for message in self.chat_memory.messages:
message_data = {
"type": type(message).__name__,
"content": message.content,
"timestamp": datetime.now().isoformat(),
"compliance_tags": self.compliance_frameworks
}
# Apply data masking if required
if self.compliance_features.get("data_masking"):
message_data["content"] = self._apply_data_masking(message_data["content"])
conversation_data["messages"].append(message_data)
# Encrypt if required
if self.compliance_features.get("encryption_required"):
serialized_data = self._encrypt_compliant_data(conversation_data)
else:
serialized_data = json.dumps(conversation_data).encode()
# Store with compliance retention
self.redis_client.setex(
self.memory_key,
timedelta(days=self.retention_days),
serialized_data
)
def _apply_data_masking(self, content: str) -> str:
"""Apply data masking for sensitive information"""
import re
# Mask common sensitive patterns
# Email addresses
        content = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '***@***.***', content)
# Phone numbers (simplified)
content = re.sub(r'\b\d{3}-\d{3}-\d{4}\b', '***-***-****', content)
content = re.sub(r'\b\(\d{3}\)\s*\d{3}-\d{4}\b', '(***) ***-****', content)
# SSN (simplified)
content = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '***-**-****', content)
return content
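    # Illustrative behavior of the simplified patterns above:
    #   _apply_data_masking("Email jane@corp.com or call 555-123-4567")
    #   -> "Email ***@***.*** or call ***-***-****"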
def _encrypt_compliant_data(self, data: Dict[str, Any]) -> bytes:
"""Encrypt data according to compliance requirements"""
# Use enterprise-grade encryption in production
from cryptography.fernet import Fernet
import base64
# Generate compliance-specific encryption key
key_material = f"{self.tenant_id}:{':'.join(self.compliance_frameworks)}:memory_encryption"
key_hash = hashlib.sha256(key_material.encode()).digest()
encryption_key = base64.urlsafe_b64encode(key_hash)
fernet = Fernet(encryption_key)
data_bytes = json.dumps(data).encode()
return fernet.encrypt(data_bytes)
def request_data_deletion(self, user_consent: bool = False) -> Dict[str, Any]:
"""Handle data deletion request (GDPR right to be forgotten)"""
if not self.compliance_features.get("deletion_on_request"):
return {
"status": "denied",
"reason": "Data deletion not permitted under current compliance framework"
}
if not user_consent:
return {
"status": "consent_required",
"reason": "User consent required for data deletion"
}
try:
# Record deletion request
audit_event = AuditEvent(
event_id=str(uuid.uuid4()),
timestamp=datetime.now().isoformat(),
tenant_id=self.tenant_id,
user_id=self.user_id,
agent_id=self.agent_id,
event_type="data_deletion_request",
event_data={"consent_provided": user_consent},
compliance_tags=self.compliance_frameworks + ["data_deletion"],
retention_policy="GDPR"
)
self.audit_manager.record_audit_event(audit_event)
# Delete memory data
self.redis_client.delete(self.memory_key)
# Clear local memory
self.chat_memory.messages = []
return {
"status": "completed",
"deleted_at": datetime.now().isoformat(),
"confirmation_id": audit_event.event_id
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
def export_data_for_portability(self) -> Dict[str, Any]:
"""Export user data for portability (GDPR)"""
if not self.compliance_features.get("data_portability"):
return {
"status": "denied",
"reason": "Data portability not supported under current compliance framework"
}
# Record data export request
audit_event = AuditEvent(
event_id=str(uuid.uuid4()),
timestamp=datetime.now().isoformat(),
tenant_id=self.tenant_id,
user_id=self.user_id,
agent_id=self.agent_id,
event_type="data_export_request",
event_data={"export_format": "json"},
compliance_tags=self.compliance_frameworks + ["data_portability"],
retention_policy="GDPR"
)
self.audit_manager.record_audit_event(audit_event)
# Export conversation data
export_data = {
"export_metadata": {
"tenant_id": self.tenant_id,
"user_id": self.user_id,
"agent_id": self.agent_id,
"exported_at": datetime.now().isoformat(),
"compliance_frameworks": self.compliance_frameworks,
"export_id": audit_event.event_id
},
"conversations": []
}
for message in self.chat_memory.messages:
export_data["conversations"].append({
"message_type": type(message).__name__,
"content": message.content,
"estimated_timestamp": datetime.now().isoformat() # Simplified
})
return {
"status": "completed",
"data": export_data,
"format": "json"
}
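An end-to-end sketch of the compliance flow; framework names match the policy definitions above, while the identifiers and dates are illustrative:
# Illustrative flow; assumes the classes above and a reachable Redis.
compliance_manager = ComplianceMemoryManager(redis_client, compliance_config={})
memory = compliance_manager.create_compliant_memory(
    tenant_id="acme-corp", user_id="user-42", agent_id="support-agent",
    compliance_frameworks=["GDPR", "HIPAA"]  # HIPAA's 7-year retention wins
)
memory.save_context({"input": "Look up record 1138"}, {"output": "Record found."})
# GDPR rights: portability export and deletion with consent
export = memory.export_data_for_portability()
deletion = memory.request_data_deletion(user_consent=True)
report = compliance_manager.generate_compliance_report(
    "acme-corp", ["GDPR", "HIPAA"],
    start_date=datetime(2024, 1, 1), end_date=datetime(2024, 6, 30)
)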
🎯📝 Prerequisites Review
Before implementing production memory systems, ensure you have a solid understanding of:
Foundation Knowledge:
- 🎯 LangChain Memory Basics - Memory types and basic configuration
- 📝 Production Implementation - Practical memory management patterns
⚙️ Continue Advanced Learning
Explore complementary advanced topics:
Related Advanced Modules:
- ⚙️ Advanced Agent Architecture - Sophisticated orchestration patterns
- ⚙️ Enterprise Tool Development - Custom integrations and specialized capabilities
Legacy Advanced Modules:
- Advanced LangChain Patterns - Complex workflows & optimization
- Production Deployment Strategies - Enterprise deployment & monitoring
🧭 Navigation
← Previous: Session 1 - Foundations
Next: Session 3 - Advanced Patterns →