
Session 1 - Module C: Complex State Management (30 minutes)

Prerequisites: Session 1 Core Section Complete
Target Audience: Developers building stateful agent systems
Cognitive Load: 4 state management concepts


Module Overview

This module explores sophisticated state management patterns for bare metal agents, including conversation memory, agent state persistence, and dynamic context management. You'll learn how to build agents that maintain complex state across interactions and sessions.

Learning Objectives

By the end of this module, you will:

  • Implement conversation memory systems that scale with long interactions
  • Build agent state persistence for session continuity and recovery
  • Design dynamic context management for adaptive agent behavior
  • Create state synchronization patterns for multi-agent systems

Part 1: Conversation Memory Systems (12 minutes)

Advanced Memory Architecture

🗂️ File: src/session1/conversation_memory.py - Advanced memory management systems

Building on the memory optimization patterns, sophisticated conversation memory requires hierarchical storage, semantic indexing, and intelligent retrieval. The foundation is a structured memory system with priority levels:

from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import json
import sqlite3
from enum import Enum
import numpy as np
from sentence_transformers import SentenceTransformer

class MemoryPriority(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4

The priority system enables intelligent memory management by distinguishing between casual conversation and critical information that must be preserved.

@dataclass
class ConversationMemory:
    id: str
    content: str
    timestamp: datetime

The ConversationMemory structure forms the core unit of the memory system, containing both the content and essential metadata for retrieval and management:

    priority: MemoryPriority
    context_tags: List[str] = field(default_factory=list)
    embedding: Optional[np.ndarray] = None
    access_count: int = 0
    last_accessed: Optional[datetime] = None
    related_memories: List[str] = field(default_factory=list)

Extended fields track usage patterns (access_count, last_accessed) and relationships (related_memories), enabling sophisticated memory management based on actual usage patterns.

class HierarchicalMemoryAgent(BaseAgent):
    """Agent with hierarchical memory management and semantic retrieval"""

    def __init__(self, name: str, llm_client, memory_db_path: str = "agent_memory.db"):
        super().__init__(name, "Hierarchical memory agent", llm_client)
        self.memory_db_path = memory_db_path
        self.embedder = SentenceTransformer('all-MiniLM-L6-v2')
        self.working_memory: List[ConversationMemory] = []
        self.working_memory_limit = 20
        self._init_memory_database()

The agent initialization sets up both semantic embeddings (via SentenceTransformer) and persistent storage (via SQLite database), creating a dual-layer memory system.

Semantic Memory Retrieval

The memory storage system combines semantic embeddings with hierarchical organization:

def store_memory(self, content: str, priority: MemoryPriority = MemoryPriority.MEDIUM,
                context_tags: Optional[List[str]] = None) -> str:
    """Store memory with semantic embedding and hierarchy"""

    memory_id = f"mem_{datetime.now().timestamp()}"

    # Generate semantic embedding
    embedding = self.embedder.encode(content)

    # Create memory object
    memory = ConversationMemory(
        id=memory_id,
        content=content,
        timestamp=datetime.now(),
        priority=priority,
        context_tags=context_tags or [],
        embedding=embedding
    )

Each memory gets a unique ID and semantic embedding for similarity search. Context tags provide additional metadata for more sophisticated retrieval patterns.

    # Add to working memory
    self.working_memory.append(memory)

    # Manage working memory size
    if len(self.working_memory) > self.working_memory_limit:
        self._consolidate_to_long_term()

    # Store in persistent database
    self._store_in_database(memory)

    return memory_id

The dual storage approach keeps recent memories in fast working memory while archiving older memories to persistent storage, balancing performance with long-term retention.

def retrieve_relevant_memories(self, query: str, limit: int = 5) -> List[ConversationMemory]:
    """Retrieve memories using semantic similarity and priority"""

    # Generate query embedding
    query_embedding = self.embedder.encode(query)

    # Search working memory first
    working_candidates = []
    for memory in self.working_memory:
        if memory.embedding is not None:
            similarity = np.dot(query_embedding, memory.embedding) / (
                np.linalg.norm(query_embedding) * np.linalg.norm(memory.embedding)
            )
            working_candidates.append((memory, similarity))

Retrieval starts with working memory using cosine similarity between the query and stored embeddings. This provides semantically relevant matches rather than just keyword matching.
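The inline similarity computation can be factored into a small reusable helper; a standalone sketch:

```python
import numpy as np

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity: 1.0 for parallel vectors, 0.0 for orthogonal ones."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
```

Because sentence-transformer embeddings encode meaning rather than surface form, a high cosine score indicates semantic relatedness even when the query and memory share no keywords.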

    # Search long-term memory
    long_term_candidates = self._search_long_term_memory(query_embedding, limit * 2)

    # Combine and rank by relevance and priority
    all_candidates = working_candidates + long_term_candidates

    # Score combining similarity and priority
    scored_candidates = []
    for memory, similarity in all_candidates:
        priority_weight = memory.priority.value / 4.0  # Scale into the (0, 1] range
        recency_weight = self._calculate_recency_weight(memory.timestamp)

The scoring system combines semantic similarity with priority and recency weights, ensuring important recent information surfaces even if it's not the most semantically similar match.

        access_weight = min(memory.access_count / 10.0, 1.0)  # Normalize access frequency

        combined_score = (
            similarity * 0.5 + 
            priority_weight * 0.3 + 
            recency_weight * 0.1 + 
            access_weight * 0.1
        )

        scored_candidates.append((memory, combined_score))

The access weight component rewards frequently referenced memories, creating a usage-based relevance boost that helps surface commonly needed information.

    # Sort by combined score and return top results
    scored_candidates.sort(key=lambda x: x[1], reverse=True)

    # Update access statistics
    relevant_memories = [mem for mem, score in scored_candidates[:limit]]
    for memory in relevant_memories:
        memory.access_count += 1
        memory.last_accessed = datetime.now()
        self._update_memory_access(memory)

    return relevant_memories
def _consolidate_to_long_term(self):
    """Move older, less important memories from working to long-term storage"""

    # Sort working memory by importance and recency
    sorted_memories = sorted(
        self.working_memory,
        key=lambda m: (m.priority.value, m.timestamp),
        reverse=True
    )

    # Keep high-priority and recent memories in working memory
    to_keep = sorted_memories[:self.working_memory_limit // 2]
    to_archive = sorted_memories[self.working_memory_limit // 2:]

The consolidation process preserves the most important and recent memories in fast working memory while archiving older memories to persistent storage.

    # Archive to long-term storage
    for memory in to_archive:
        self._archive_to_long_term(memory)

    # Update working memory
    self.working_memory = to_keep

    self.logger.info(f"Consolidated {len(to_archive)} memories to long-term storage")

Memory Context Integration

Context integration connects retrieved memories with current interactions for coherent, memory-aware responses:

async def generate_contextual_response(self, message: str) -> str:
    """Generate response using relevant memory context"""

    # Retrieve relevant memories
    relevant_memories = self.retrieve_relevant_memories(message, limit=3)

    # Build context from memories
    memory_context = self._build_memory_context(relevant_memories)

The system retrieves the most relevant memories and formats them into coherent context that can be seamlessly integrated into the response generation prompt.

    # Generate response with memory-enhanced prompt
    enhanced_prompt = f"""
    Current message: {message}

    Relevant conversation history and context:
    {memory_context}

    Respond naturally, incorporating relevant context from the conversation history.
    Be specific about details you remember and acknowledge past interactions when relevant.
    """

    response = await self._call_llm(enhanced_prompt)

The enhanced prompt explicitly instructs the LLM to use the provided context naturally, creating responses that acknowledge and build upon previous interactions rather than treating each message in isolation.

    # Store this interaction
    interaction_content = f"User: {message}\nAgent: {response}"
    self.store_memory(
        content=interaction_content,
        priority=MemoryPriority.MEDIUM,
        context_tags=["conversation", "interaction"]
    )

    return response

Each interaction is automatically stored with medium priority and appropriate tags, building the memory foundation for future context retrieval.

def _build_memory_context(self, memories: List[ConversationMemory]) -> str:
    """Build readable context from retrieved memories"""
    if not memories:
        return "No relevant conversation history found."

    context_parts = []

The context building process formats retrieved memories into coherent, readable context that can be seamlessly integrated into LLM prompts.

    for i, memory in enumerate(memories, 1):
        timestamp = memory.timestamp.strftime("%Y-%m-%d %H:%M")
        tags = ", ".join(memory.context_tags) if memory.context_tags else "general"

        context_parts.append(f"""
        Memory {i} ({timestamp}) - Context: {tags}
        {memory.content}
        """)

    return "\n".join(context_parts)

Each memory is formatted with a timestamp and context tags, providing clear temporal and categorical information that helps the LLM understand when and in what context the information was captured.


Part 2: Agent State Persistence (10 minutes)

Persistent State Architecture

🗂️ File: src/session1/persistent_state.py - Agent state persistence systems

import json
from pathlib import Path
from typing import Any, Dict, List, Optional
from dataclasses import dataclass, asdict
from datetime import datetime

The imports establish the foundation for state persistence, combining JSON serialization for human-readable storage with dataclasses for structured state management.

@dataclass
class AgentState:
    agent_id: str
    session_id: str
    timestamp: datetime
    conversation_history: List[Dict[str, Any]]
    active_goals: List[str]
    completed_tasks: List[Dict[str, Any]]
    learned_preferences: Dict[str, Any]
    tool_usage_stats: Dict[str, int]
    performance_metrics: Dict[str, float]
    custom_attributes: Dict[str, Any]

The AgentState dataclass provides a comprehensive structure for capturing all aspects of agent state, from conversation history to performance metrics and learned preferences.
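The serialization pattern the class relies on is a plain `asdict`/JSON round trip with explicit datetime handling. A trimmed, self-contained sketch keeping only a few of the fields:

```python
import json
from dataclasses import dataclass, asdict, field
from datetime import datetime
from typing import Any, Dict, List

@dataclass
class AgentState:
    agent_id: str
    session_id: str
    timestamp: datetime
    conversation_history: List[Dict[str, Any]] = field(default_factory=list)
    active_goals: List[str] = field(default_factory=list)

state = AgentState("agent-1", "session-1", datetime(2024, 1, 1, 12, 0))

# Serialize: datetime is not JSON-native, so convert to ISO format first
data = asdict(state)
data["timestamp"] = state.timestamp.isoformat()
encoded = json.dumps(data)

# Deserialize: convert the ISO string back to a datetime before rebuilding
restored = json.loads(encoded)
restored["timestamp"] = datetime.fromisoformat(restored["timestamp"])
state2 = AgentState(**restored)
```

Dataclass equality makes the round trip easy to verify: `state2 == state` holds because every field, including the reconstructed datetime, compares equal.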

class PersistentStateAgent(BaseAgent):
    """Agent with automatic state persistence and recovery"""

    def __init__(self, name: str, llm_client, state_dir: str = "agent_states"):
        super().__init__(name, "Persistent state agent", llm_client)
        self.state_dir = Path(state_dir)
        self.state_dir.mkdir(exist_ok=True)
        self.session_id = f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        self.state: Optional[AgentState] = None
        self.auto_save_interval = 10  # Save every N interactions
        self.interaction_count = 0

Agent initialization creates a dedicated state directory and generates a unique session ID. The auto-save interval ensures regular persistence without manual intervention.

    def initialize_or_restore_state(self, agent_id: str) -> AgentState:
        """Initialize new state or restore from previous session"""

        state_file = self.state_dir / f"{agent_id}_state.json"

        if state_file.exists():
            # Restore previous state
            try:
                with open(state_file, 'r') as f:
                    state_data = json.load(f)

                # Convert timestamp back to datetime
                state_data['timestamp'] = datetime.fromisoformat(state_data['timestamp'])

                self.state = AgentState(**state_data)
                self.state.session_id = self.session_id  # Update to current session

                self.logger.info(f"Restored state for agent {agent_id} from {state_file}")
                return self.state

State restoration first attempts to load existing state from the JSON file, converting timestamps back to datetime objects and updating the session ID to the current session.

            except Exception as e:
                self.logger.error(f"Failed to restore state: {e}")
                # Fall through to create new state

        # Create new state
        self.state = AgentState(
            agent_id=agent_id,
            session_id=self.session_id,
            timestamp=datetime.now(),
            conversation_history=[],
            active_goals=[],
            completed_tasks=[],
            learned_preferences={},
            tool_usage_stats={},
            performance_metrics={},
            custom_attributes={}
        )

        self.logger.info(f"Created new state for agent {agent_id}")
        return self.state

If state restoration fails or no previous state exists, the system creates a new AgentState with empty collections, ensuring the agent can always start cleanly.

    def save_state(self, force: bool = False) -> bool:
        """Save current state to persistent storage"""

        if not self.state:
            return False

        try:
            # Update timestamp
            self.state.timestamp = datetime.now()

            # Convert to dictionary for JSON serialization
            state_dict = asdict(self.state)
            state_dict['timestamp'] = self.state.timestamp.isoformat()

            # Save to file
            state_file = self.state_dir / f"{self.state.agent_id}_state.json"
            with open(state_file, 'w') as f:
                json.dump(state_dict, f, indent=2)

State saving converts the dataclass to a dictionary and handles datetime serialization by converting to ISO format. The main state file is written with human-readable JSON formatting.

            # Create backup
            backup_file = self.state_dir / f"{self.state.agent_id}_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
            with open(backup_file, 'w') as f:
                json.dump(state_dict, f, indent=2)

            # Clean old backups (keep last 5)
            self._clean_old_backups(self.state.agent_id)

            self.logger.info(f"Saved state to {state_file}")
            return True

        except Exception as e:
            self.logger.error(f"Failed to save state: {e}")
            return False

State-Aware Processing

async def process_message_with_state(self, message: str, goals: Optional[List[str]] = None) -> Dict[str, Any]:
    """Process message while maintaining and updating state"""

    if not self.state:
        raise ValueError("Agent state not initialized")

    # Update active goals if provided
    if goals:
        self.state.active_goals.extend(goals)
        self.state.active_goals = list(set(self.state.active_goals))  # Remove duplicates

    # Add to conversation history
    self.state.conversation_history.append({
        "timestamp": datetime.now().isoformat(),
        "type": "user_message",
        "content": message,
        "session_id": self.session_id
    })

The method begins by validating the agent state and updating active goals. Each user message is recorded in the conversation history with a timestamp and session ID for complete interaction tracking.

    # Generate response with state context
    state_context = self._build_state_context()
    enhanced_prompt = f"""
    User message: {message}

    Agent state context:
    {state_context}

    Respond appropriately, considering your ongoing goals and conversation history.
    """

    response = await self._call_llm(enhanced_prompt)

The system builds state context from current agent information and incorporates it into the LLM prompt, ensuring responses are informed by goals, conversation history, and learned preferences.

    # Update state with response
    self.state.conversation_history.append({
        "timestamp": datetime.now().isoformat(),
        "type": "agent_response", 
        "content": response,
        "session_id": self.session_id
    })

    # Check for goal completion
    completed_goals = self._check_goal_completion(message, response)
    for goal in completed_goals:
        if goal in self.state.active_goals:
            self.state.active_goals.remove(goal)
            self.state.completed_tasks.append({
                "goal": goal,
                "completed_at": datetime.now().isoformat(),
                "session_id": self.session_id
            })

After generating the response, the system records it in conversation history and checks for goal completion. Completed goals are moved from active to completed status with timestamps for tracking progress.

    # Auto-save state periodically
    self.interaction_count += 1
    if self.interaction_count % self.auto_save_interval == 0:
        self.save_state()

    return {
        "response": response,
        "active_goals": self.state.active_goals.copy(),
        "completed_goals": completed_goals,
        "session_id": self.session_id,
        "state_saved": self.interaction_count % self.auto_save_interval == 0
    }

The method automatically saves state at configured intervals and returns comprehensive information about the interaction, including response, goal status, and whether state was persisted.
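The `_check_goal_completion` helper is referenced but not defined in this module. A deliberately naive standalone sketch (the heuristic and names are illustrative; a production agent would more likely ask the LLM to judge completion):

```python
from typing import List

def check_goal_completion(message: str, response: str,
                          active_goals: List[str]) -> List[str]:
    """Naive heuristic: a goal counts as completed when the exchange
    mentions the goal text alongside a completion phrase."""
    completion_markers = ("done", "completed", "finished", "resolved")
    text = f"{message} {response}".lower()
    return [
        goal for goal in active_goals
        if goal.lower() in text
        and any(marker in text for marker in completion_markers)
    ]
```
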

def _build_state_context(self) -> str:
    """Build readable context from current agent state"""
    context_parts = []

    # Active goals
    if self.state.active_goals:
        context_parts.append(f"Active goals: {', '.join(self.state.active_goals)}")

    # Recent conversation
    recent_history = self.state.conversation_history[-6:]  # Last 3 exchanges
    if recent_history:
        context_parts.append("Recent conversation:")
        for entry in recent_history:
            context_parts.append(f"  {entry['type']}: {entry['content'][:100]}...")

The context building process creates readable summaries of agent state, including current goals and recent conversation snippets truncated for prompt efficiency.

    # Completed tasks
    if self.state.completed_tasks:
        recent_tasks = self.state.completed_tasks[-3:]
        context_parts.append(f"Recently completed: {[task['goal'] for task in recent_tasks]}")

    # Learned preferences
    if self.state.learned_preferences:
        context_parts.append(f"Learned preferences: {self.state.learned_preferences}")

    return "\n".join(context_parts)

Part 3: Dynamic Context Management (8 minutes)

Adaptive Context Systems

🗂️ File: src/session1/dynamic_context.py - Dynamic context adaptation

from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
import asyncio

class ContextScope(Enum):
    IMMEDIATE = "immediate"      # Current interaction
    SESSION = "session"          # Current session
    HISTORICAL = "historical"    # Across sessions
    DOMAIN = "domain"           # Subject/domain specific

The ContextScope enum defines four different levels of context activation, allowing the system to intelligently decide when context is relevant based on temporal and semantic boundaries.

@dataclass
class ContextLayer:
    scope: ContextScope
    priority: int
    content: Dict[str, Any]
    expiry_time: Optional[datetime] = None
    activation_conditions: Optional[List[str]] = None

Each ContextLayer contains both content and metadata for intelligent activation. The activation_conditions list contains keywords or patterns that trigger this layer's inclusion in the active context.

class DynamicContextAgent(BaseAgent):
    """Agent with dynamic, multi-layered context management"""

    def __init__(self, name: str, llm_client):
        super().__init__(name, "Dynamic context agent", llm_client)
        self.context_layers: Dict[str, ContextLayer] = {}
        self.context_activation_rules: Dict[str, Callable] = {}
        self.context_history: List[Dict[str, Any]] = []

The agent initialization creates storage for context layers and tracks activation history, enabling learning from past context usage patterns.

    def add_context_layer(self, layer_id: str, scope: ContextScope,
                         content: Dict[str, Any], priority: int = 1,
                         expiry_time: Optional[datetime] = None,
                         activation_conditions: Optional[List[str]] = None) -> str:
        """Add a dynamic context layer"""

        layer = ContextLayer(
            scope=scope,
            priority=priority,
            content=content,
            expiry_time=expiry_time,
            activation_conditions=activation_conditions or []
        )

        self.context_layers[layer_id] = layer

        self.logger.info(f"Added context layer: {layer_id} (scope: {scope.value}, priority: {priority})")
        return layer_id

The add_context_layer method provides a clean interface for registering new context with expiration times and activation conditions, enabling dynamic context management.
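Registering layers might look like the following. This standalone sketch reuses the `ContextLayer` shape defined above; the layer contents ("billing", "promo") are hypothetical examples:

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, List, Optional

class ContextScope(Enum):
    IMMEDIATE = "immediate"
    SESSION = "session"
    HISTORICAL = "historical"
    DOMAIN = "domain"

@dataclass
class ContextLayer:
    scope: ContextScope
    priority: int
    content: Dict[str, Any]
    expiry_time: Optional[datetime] = None
    activation_conditions: List[str] = field(default_factory=list)

layers: Dict[str, ContextLayer] = {}

# A domain layer that activates on billing-related keywords
layers["billing"] = ContextLayer(
    scope=ContextScope.DOMAIN,
    priority=3,
    content={"keywords": ["invoice", "refund"], "policy": "refunds within 30 days"},
)

# A short-lived session layer that expires in one hour
layers["promo"] = ContextLayer(
    scope=ContextScope.SESSION,
    priority=1,
    content={"session_attributes": {"campaign": "spring"}},
    expiry_time=datetime.now() + timedelta(hours=1),
)
```
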

    def activate_context_dynamically(self, message: str, context_hints: Optional[List[str]] = None) -> Dict[str, Any]:
        """Dynamically determine which context layers to activate"""

        activated_contexts = {}
        activation_log = []

        for layer_id, layer in self.context_layers.items():
            should_activate = self._should_activate_layer(layer, message, context_hints)

            if should_activate:
                # Check if layer has expired
                if layer.expiry_time and datetime.now() > layer.expiry_time:
                    self._remove_expired_layer(layer_id)
                    continue

For each context layer, the system first checks activation criteria, then validates expiration status to prevent using stale context.

                activated_contexts[layer_id] = layer.content
                activation_log.append({
                    "layer_id": layer_id,
                    "scope": layer.scope.value,
                    "priority": layer.priority,
                    "activation_reason": self._get_activation_reason(layer, message, context_hints)
                })

The activation process evaluates each layer for relevance, automatically removing expired contexts and building an activation log for transparency and debugging.

        # Sort by priority
        activation_log.sort(key=lambda x: x["priority"], reverse=True)

        return {
            "activated_contexts": activated_contexts,
            "activation_log": activation_log,
            "total_layers": len(activated_contexts)
        }

Returning both the activated contexts and the activation log provides transparency into why specific contexts were chosen, enabling system optimization and debugging.

    def _should_activate_layer(self, layer: ContextLayer, message: str,
                              context_hints: Optional[List[str]] = None) -> bool:
        """Determine if a context layer should be activated"""

        # Always activate immediate scope
        if layer.scope == ContextScope.IMMEDIATE:
            return True

        # Check activation conditions
        if layer.activation_conditions:
            for condition in layer.activation_conditions:
                if condition.lower() in message.lower():
                    return True
                if context_hints and condition in context_hints:
                    return True

The activation logic starts with immediate scope (always relevant) and then checks explicit activation conditions, including both message content and external context hints.

        # Check for domain-specific activation
        if layer.scope == ContextScope.DOMAIN:
            domain_keywords = layer.content.get("keywords", [])
            if any(keyword.lower() in message.lower() for keyword in domain_keywords):
                return True

        # Session scope - activate if session attributes match
        if layer.scope == ContextScope.SESSION:
            session_attributes = layer.content.get("session_attributes", {})
            current_session_data = self._get_current_session_data()

            for attr, value in session_attributes.items():
                if current_session_data.get(attr) == value:
                    return True

        return False

Domain and session scope activation uses semantic matching and session state comparison, enabling context-aware responses that adapt to current conversational and domain contexts.
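The SESSION-scope matching rule above can be isolated as a tiny predicate; a standalone sketch:

```python
from typing import Any, Dict

def session_layer_matches(session_attributes: Dict[str, Any],
                          current_session_data: Dict[str, Any]) -> bool:
    """A SESSION-scope layer activates when any required attribute
    matches the current session's value."""
    return any(current_session_data.get(attr) == value
               for attr, value in session_attributes.items())
```

Note the any-match semantics: a single matching attribute is enough to activate the layer, mirroring the early `return True` in `_should_activate_layer`.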

async def process_with_dynamic_context(self, message: str, context_hints: List[str] = None) -> Dict[str, Any]:
    """Process message using dynamically activated context"""

    # Activate relevant context layers
    context_activation = self.activate_context_dynamically(message, context_hints)

    # Build context prompt
    context_prompt = self._build_dynamic_context_prompt(
        message, 
        context_activation["activated_contexts"],
        context_activation["activation_log"]
    )

    # Generate response
    response = await self._call_llm(context_prompt)

The processing method orchestrates context activation, prompt building, and response generation, creating a seamless flow from message input to context-aware output.

    # Update context based on interaction
    self._update_context_from_interaction(message, response, context_activation)

    # Record context usage
    self.context_history.append({
        "timestamp": datetime.now().isoformat(),
        "message": message,
        "activated_layers": list(context_activation["activated_contexts"].keys()),
        "response_length": len(response)
    })

    return {
        "response": response,
        "context_info": context_activation,
        "context_efficiency": self._calculate_context_efficiency(context_activation)
    }

After generating the response, the system updates context based on the interaction and records usage statistics, enabling learning and optimization of context activation patterns.
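The `_calculate_context_efficiency` helper is not defined in this module. One hypothetical metric, assuming "efficiency" means the fraction of registered layers that were actually activated (fewer unused layers carried into the prompt means a leaner context):

```python
from typing import Any, Dict

def calculate_context_efficiency(context_activation: Dict[str, Any],
                                 total_layers_registered: int) -> float:
    """Fraction of registered layers that were activated for this message."""
    if total_layers_registered == 0:
        return 0.0
    return len(context_activation["activated_contexts"]) / total_layers_registered
```
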

def _build_dynamic_context_prompt(self, message: str, activated_contexts: Dict[str, Any], 
                                 activation_log: List[Dict[str, Any]]) -> str:
    """Build prompt incorporating dynamically activated context"""

    prompt_parts = [f"User message: {message}"]

    if activated_contexts:
        prompt_parts.append("\nRelevant context (ordered by priority):")

        # Sort contexts by priority from activation log
        sorted_contexts = sorted(activation_log, key=lambda x: x["priority"], reverse=True)

The prompt building process starts with the user message and then adds activated contexts in priority order, ensuring the most important context appears first in the LLM prompt.

        for context_info in sorted_contexts:
            layer_id = context_info["layer_id"]
            context_data = activated_contexts[layer_id]

            prompt_parts.append(f"\n{context_info['scope'].title()} Context ({context_info['priority']} priority):")

            # Format context data based on type
            if isinstance(context_data, dict):
                for key, value in context_data.items():
                    if key != "keywords" and key != "session_attributes":  # Skip metadata
                        prompt_parts.append(f"  {key}: {value}")
            else:
                prompt_parts.append(f"  {context_data}")

    prompt_parts.append("\nRespond appropriately using the relevant context.")

    return "\n".join(prompt_parts)

📝 Multiple Choice Test - Module C

Test your understanding of complex state management concepts:

Question 1: What four pieces of information does the ConversationMemory dataclass track for intelligent memory management?
A) Content, timestamp, tags, and size
B) ID, content, timestamp, and priority
C) Content, priority, embedding, and context_tags
D) All of the above plus embedding and other metadata

Question 2: How does the semantic memory retrieval system determine relevance?
A) Keyword matching only
B) Cosine similarity between query and memory embeddings
C) Random selection from recent memories
D) Alphabetical ordering of content

Question 3: What is the purpose of the dual storage approach with working memory and long-term storage?
A) To save disk space
B) Balance performance with long-term retention
C) Reduce memory usage only
D) Simplify the codebase

Question 4: In the state persistence system, what triggers automatic state saving?
A) Only manual user commands
B) Fixed time intervals exclusively
C) Critical state changes and periodic intervals
D) When the application shuts down

Question 5: What determines which context layers are activated in dynamic context management?
A) Random selection
B) Only the most recent layer
C) Layer scope, message content, context hints, and activation conditions
D) User-specified preferences only

🗂️ View Test Solutions →


Module Summary

You've now mastered complex state management for bare metal agents:

  • Conversation Memory Systems: Built hierarchical memory with semantic retrieval and intelligent consolidation
  • Agent State Persistence: Implemented automatic state saving and recovery across sessions
  • Dynamic Context Management: Created adaptive context systems that activate relevant information dynamically
  • State Synchronization: Designed patterns for maintaining consistency across agent interactions


🗂️ Code Files: All examples use files in src/session1/

  • conversation_memory.py - Hierarchical memory systems with semantic retrieval
  • persistent_state.py - Agent state persistence and recovery
  • dynamic_context.py - Dynamic context activation and management

🚀 Quick Start: Run cd src/session1 && python conversation_memory.py to see memory implementation