
Session 1 - Module B: Performance Optimization (35 minutes)

Prerequisites: Session 1 Core Section Complete
Target Audience: Performance-focused developers
Cognitive Load: 5 optimization concepts


Module Overview

This module focuses on optimizing bare metal agents for performance, memory efficiency, and speed. You'll learn memory management patterns, tool execution optimization, and response time improvement strategies essential for production agent systems.

Learning Objectives

By the end of this module, you will:

- Implement efficient memory usage patterns and conversation history optimization
- Optimize tool execution speed through caching and parallel execution
- Design strategies for faster agent responses and improved user experience
- Build performance monitoring and analytics systems for continuous optimization


Part 1: Memory Management (15 minutes)

Efficient Memory Usage Patterns

🗂️ File: src/session1/memory_optimized_agent.py - Memory-efficient agent implementations

Memory management is crucial for long-running agent systems that need to maintain conversation context without memory leaks. The foundation is a structured approach to tracking memory usage:

from collections import deque
from typing import Dict, List, Any, Optional
import sys
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class MemoryEntry:
    content: str
    timestamp: datetime
    importance_score: float
    size_bytes: int

The MemoryEntry dataclass tracks not just content but also metadata essential for intelligent memory management - when it was created, how important it is, and its memory footprint.

class MemoryOptimizedAgent(BaseAgent):
    """Agent with intelligent memory management for long conversations"""

    def __init__(self, name: str, llm_client, max_memory_mb: float = 10.0):
        super().__init__(name, "Memory optimized agent", llm_client)
        self.max_memory_bytes = max_memory_mb * 1024 * 1024
        self.memory_entries: deque = deque(maxlen=1000)  # Hard limit

The agent uses a deque with a hard maximum to prevent unbounded memory growth, while the configurable memory limit allows fine-tuning based on deployment constraints.

        self.importance_threshold = 0.5
        self.cleanup_interval = 100  # Clean up every N messages
        self.message_count = 0

The additional configuration parameters control memory management behavior: importance_threshold determines the minimum significance level for keeping entries, cleanup_interval sets how frequently maintenance occurs, and message_count tracks when to trigger cleanup operations.
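As a quick illustration, a hypothetical deployment might tighten these knobs for a memory-constrained environment. This sketch assumes the llm_client from the core session setup, and the values shown are assumptions rather than recommendations:

```python
# Hypothetical setup - llm_client comes from the core session; values are illustrative
agent = MemoryOptimizedAgent("support-bot", llm_client, max_memory_mb=5.0)

agent.importance_threshold = 0.6   # keep only fairly significant entries
agent.cleanup_interval = 50        # run maintenance twice as often as the default

# Entries are added via add_to_memory, covered in the next subsection
agent.add_to_memory("Customer's order ID is 48213", importance_score=0.9)
agent.add_to_memory("Customer said hello", importance_score=0.2)
```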

Conversation History Optimization

The memory addition process includes proactive cleanup to maintain performance:

def add_to_memory(self, content: str, importance_score: float = 0.5):
    """Add content to memory with intelligent sizing and cleanup"""

    # Calculate memory footprint
    content_size = sys.getsizeof(content)

    # Create memory entry
    entry = MemoryEntry(
        content=content,
        timestamp=datetime.now(),
        importance_score=importance_score,
        size_bytes=content_size
    )

Each entry tracks its actual memory footprint using sys.getsizeof(), enabling precise memory management decisions based on real usage rather than estimates.

Next, the method implements proactive memory management by checking cleanup intervals:

    # Check if cleanup is needed
    self.message_count += 1
    if self.message_count % self.cleanup_interval == 0:
        self._cleanup_memory()

Regular cleanup prevents memory from growing unchecked. The message counter triggers maintenance at predetermined intervals, ensuring consistent performance.

Finally, the method adds the entry and handles emergency cleanup situations:

    # Add entry
    self.memory_entries.append(entry)

    # Force cleanup if memory limit exceeded
    if self._get_total_memory_usage() > self.max_memory_bytes:
        self._aggressive_cleanup()

The emergency path handles the case where the memory limit is exceeded despite regular maintenance, forcing an immediate aggressive cleanup rather than waiting for the next interval.

Intelligent Memory Cleanup

The cleanup method implements a sophisticated algorithm that balances importance and recency:

def _cleanup_memory(self):
    """Intelligent memory cleanup based on importance and age"""

    if not self.memory_entries:
        return

    # Convert to list for easier manipulation
    entries_list = list(self.memory_entries)

First, the method returns early if memory is empty, then converts the deque to a list because deques don't support the in-place sorting the algorithm requires.

Next, the algorithm sorts entries using a composite key that considers both importance and chronological order:

    # Sort by importance and age (keep important recent items)
    entries_list.sort(key=lambda x: (x.importance_score, x.timestamp))

    # Keep top 70% of entries
    keep_count = int(len(entries_list) * 0.7)
    entries_to_keep = entries_list[-keep_count:]

The sorting prioritizes entries by both importance and age, ensuring critical recent information is preserved. Keeping 70% of entries provides a balance between memory conservation and information retention.

Finally, the method updates the memory structure and logs the cleanup operation:

    # Update deque
    self.memory_entries.clear()
    for entry in entries_to_keep:
        self.memory_entries.append(entry)

    self.logger.info(f"Memory cleanup: kept {len(entries_to_keep)} entries")

def _get_total_memory_usage(self) -> int:
    """Calculate total memory usage in bytes"""
    return sum(entry.size_bytes for entry in self.memory_entries)

Utility methods track memory consumption by summing the size_bytes field of all entries, providing accurate memory usage data for optimization decisions.

Comprehensive Memory Statistics

The statistics method provides detailed analytics for memory management optimization:

def get_memory_stats(self) -> Dict[str, Any]:
    """Get comprehensive memory usage statistics"""
    total_entries = len(self.memory_entries)
    total_bytes = self._get_total_memory_usage()

    if total_entries > 0:
        avg_importance = sum(e.importance_score for e in self.memory_entries) / total_entries
        oldest_entry = min(self.memory_entries, key=lambda x: x.timestamp)
        newest_entry = max(self.memory_entries, key=lambda x: x.timestamp)

The method starts by collecting basic metrics and identifying temporal boundaries. Average importance helps understand the quality of retained information.

Next, it calculates comprehensive utilization and temporal metrics:

        return {
            "total_entries": total_entries,
            "total_memory_mb": total_bytes / (1024 * 1024),
            "avg_importance_score": avg_importance,
            "memory_utilization_pct": (total_bytes / self.max_memory_bytes) * 100,
            "oldest_entry_age_hours": (datetime.now() - oldest_entry.timestamp).total_seconds() / 3600,
            "memory_span_hours": (newest_entry.timestamp - oldest_entry.timestamp).total_seconds() / 3600
        }

    return {"total_entries": 0, "total_memory_mb": 0}

The statistics method provides comprehensive analytics including memory utilization percentages, average importance scores, and temporal spans, enabling data-driven memory management decisions.
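For example, a monitoring hook built outside the agent class (a sketch; the 80% threshold is an assumption) could use these statistics to warn before the emergency cleanup path is ever triggered:

```python
# Hypothetical monitoring check built on get_memory_stats()
stats = agent.get_memory_stats()
if stats["total_entries"] and stats["memory_utilization_pct"] > 80:
    agent.logger.warning(
        f"Memory at {stats['memory_utilization_pct']:.0f}% of limit; "
        f"oldest entry is {stats['oldest_entry_age_hours']:.1f}h old"
    )
```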

Context Compression Strategies

Context compression ensures optimal LLM performance by providing relevant information within size constraints:

def get_compressed_context(self, max_context_size: int = 2000) -> str:
    """Get compressed conversation context for LLM calls"""

    # Sort entries by importance and recency
    sorted_entries = sorted(
        self.memory_entries,
        key=lambda x: (x.importance_score * 0.7 + self._recency_score(x) * 0.3),
        reverse=True
    )

The sorting algorithm weights importance higher (70%) than recency (30%), ensuring critical information is prioritized while still valuing recent context.

Next, the method builds the context string using a greedy packing approach:

    # Build context within size limit
    context_parts = []
    current_size = 0

    for entry in sorted_entries:
        if current_size + len(entry.content) > max_context_size:
            break

        context_parts.append(entry.content)
        current_size += len(entry.content)

The packing algorithm greedily includes entries until the size limit is reached, maximizing the amount of relevant context provided to the LLM.

Finally, the method handles truncation gracefully by informing the LLM about omitted content:

    # Add summary if we had to truncate
    if len(context_parts) < len(sorted_entries):
        truncated_count = len(sorted_entries) - len(context_parts)
        summary = f"\n[... {truncated_count} earlier messages truncated for length ...]"
        context_parts.append(summary)

    return "\n".join(context_parts)

When truncation occurs, a clear summary informs the LLM that additional context exists but was omitted, helping it understand the conversation's scope.

def _recency_score(self, entry: MemoryEntry) -> float:
    """Calculate recency score (1.0 = most recent, 0.0 = oldest)"""
    if not self.memory_entries:
        return 1.0

    oldest = min(self.memory_entries, key=lambda x: x.timestamp)
    newest = max(self.memory_entries, key=lambda x: x.timestamp)

    total_span = (newest.timestamp - oldest.timestamp).total_seconds()
    if total_span == 0:
        return 1.0

    entry_age = (newest.timestamp - entry.timestamp).total_seconds()
    return 1.0 - (entry_age / total_span)

The recency scoring normalizes timestamps to a 0-1 scale relative to the conversation span, ensuring fair comparison regardless of absolute time differences.
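A quick worked example of the formula (the timestamps are assumed values): over a two-hour conversation span, an entry created 30 minutes before the newest entry scores 1.0 - 1800/7200 = 0.75.

```python
from datetime import datetime, timedelta

newest = datetime(2024, 1, 1, 12, 0, 0)            # assumed example timestamps
oldest = newest - timedelta(hours=2)               # two-hour conversation span
entry_ts = newest - timedelta(minutes=30)          # entry created 30 minutes ago

total_span = (newest - oldest).total_seconds()     # 7200 s
entry_age = (newest - entry_ts).total_seconds()    # 1800 s
recency = 1.0 - (entry_age / total_span)           # 0.75 - recent, but not the newest
```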


Part 2: Tool Execution Speed (10 minutes)

Caching and Parallel Execution

🗂️ File: src/session1/optimized_tools.py - High-performance tool implementations

Tool performance optimization combines intelligent caching with parallel execution capabilities:

import asyncio
from functools import lru_cache
from typing import Dict, List, Any, Callable
import hashlib
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

class OptimizedToolAgent(BaseAgent):
    """Agent with high-performance tool execution"""

    def __init__(self, name: str, llm_client, tools: List[Tool]):
        super().__init__(name, "Optimized tool agent", llm_client)
        self.tools = {tool.name: tool for tool in tools}
        self.tool_cache = {}
        self.execution_stats = {}
        self.thread_pool = ThreadPoolExecutor(max_workers=4)

The agent maintains a result cache keyed by tool name and parameters, per-tool execution statistics, and a thread pool that enables parallel execution of CPU-intensive tool operations.

    def _cache_key(self, tool_name: str, params: Dict[str, Any]) -> str:
        """Generate cache key for tool execution"""
        # Create deterministic hash of tool name and parameters
        param_str = str(sorted(params.items()))
        cache_input = f"{tool_name}:{param_str}"
        return hashlib.md5(cache_input.encode()).hexdigest()

Caching uses deterministic hashing of tool names and parameters. Sorting the parameters ensures consistent cache keys regardless of parameter order.
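A standalone sketch of the same hashing logic (the calculator tool and its parameters are hypothetical) shows why the sort matters - two calls that differ only in parameter order map to the same cache entry:

```python
import hashlib

def cache_key(tool_name, params):
    # Same idea as _cache_key: sort params so ordering never changes the key
    param_str = str(sorted(params.items()))
    return hashlib.md5(f"{tool_name}:{param_str}".encode()).hexdigest()

key_a = cache_key("calculator", {"operation": "add", "a": 2, "b": 3})
key_b = cache_key("calculator", {"b": 3, "a": 2, "operation": "add"})
assert key_a == key_b   # identical keys, so the second call becomes a cache hit
```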

    def _is_cacheable(self, tool_name: str) -> bool:
        """Determine if tool results should be cached"""
        # Don't cache tools that have side effects or time-dependent results
        non_cacheable = {"web_search", "current_time", "random_generator", "file_write"}
        return tool_name not in non_cacheable

Intelligent caching excludes tools with side effects or time-dependent results, preventing stale data while maximizing cache effectiveness for deterministic operations.

Intelligent Tool Caching

The caching system balances performance gains with result freshness:

async def execute_tool_cached(self, tool_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
    """Execute tool with intelligent caching"""

    start_time = time.time()

    # Check cache first
    if self._is_cacheable(tool_name):
        cache_key = self._cache_key(tool_name, params)

Each execution is timed for performance analysis, and cacheable tools are checked for existing results before expensive re-execution.

The cache validation process includes TTL (Time-To-Live) checks to ensure result freshness:

        if cache_key in self.tool_cache:
            cached_result = self.tool_cache[cache_key]
            # Check if cache entry is still valid (within 1 hour)
            if time.time() - cached_result["timestamp"] < 3600:
                execution_time = time.time() - start_time
                self._update_stats(tool_name, execution_time, True)
                return cached_result["result"]

Cache validation includes expiry checks (1-hour TTL) to balance performance gains with result freshness. Cache hits are tracked in performance statistics to measure caching effectiveness.

When cache misses occur, the system executes the tool and stores results for future use:

    # Execute tool
    if tool_name not in self.tools:
        raise ValueError(f"Tool {tool_name} not available")

    tool = self.tools[tool_name]
    result = await tool.execute(params)

    # Cache result if appropriate
    if self._is_cacheable(tool_name):
        self.tool_cache[cache_key] = {
            "result": result,
            "timestamp": time.time()
        }

Finally, on the cache-miss path, execution time is recorded in the performance statistics (cache hits were already recorded before returning the cached result):

    # Update performance stats
    execution_time = time.time() - start_time
    self._update_stats(tool_name, execution_time, False)

    return result

def _update_stats(self, tool_name: str, execution_time: float, cache_hit: bool):
    """Update tool execution statistics"""
    if tool_name not in self.execution_stats:
        self.execution_stats[tool_name] = {
            "total_calls": 0,
            "cache_hits": 0,
            "total_time": 0.0,
            "avg_time": 0.0
        }

    stats = self.execution_stats[tool_name]
    stats["total_calls"] += 1
    stats["total_time"] += execution_time
    stats["avg_time"] = stats["total_time"] / stats["total_calls"]

    if cache_hit:
        stats["cache_hits"] += 1

Performance statistics track execution times, call counts, and cache hit rates per tool, enabling optimization decisions based on actual usage patterns.

Parallel Tool Execution

Parallel execution maximizes throughput when multiple tools can run simultaneously:

async def execute_tools_parallel(self, tool_requests: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Execute multiple tools in parallel for better performance"""

    async def execute_single_tool(request):
        tool_name = request["tool"]
        params = request.get("params", {})
        try:
            result = await self.execute_tool_cached(tool_name, params)
            return {"success": True, "result": result, "tool": tool_name}
        except Exception as e:
            return {"success": False, "error": str(e), "tool": tool_name}

The parallel execution system handles multiple tool requests concurrently, significantly reducing total execution time for independent operations. Each tool execution is wrapped in error handling to prevent one failure from affecting others.

Next, the method executes all requests concurrently using asyncio.gather:

    # Execute all tool requests concurrently
    tasks = [execute_single_tool(request) for request in tool_requests]
    results = await asyncio.gather(*tasks, return_exceptions=True)

The return_exceptions=True parameter ensures that exceptions don't cancel other concurrent operations, allowing partial success scenarios.

Finally, the method processes results and handles any exceptions that occurred:

    # Handle any exceptions
    processed_results = []
    for result in results:
        if isinstance(result, Exception):
            processed_results.append({
                "success": False,
                "error": str(result),
                "tool": "unknown"
            })
        else:
            processed_results.append(result)

    return processed_results
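A usage sketch (the tool names and parameters are hypothetical, and the agent is assumed to have been constructed with matching Tool instances):

```python
async def run_batch(agent: OptimizedToolAgent):
    # Three independent requests executed concurrently rather than one-by-one
    requests = [
        {"tool": "calculator", "params": {"operation": "add", "a": 2, "b": 3}},
        {"tool": "unit_converter", "params": {"value": 10.0, "from": "km", "to": "mi"}},
        {"tool": "calculator", "params": {"operation": "multiply", "a": 6, "b": 7}},
    ]
    results = await agent.execute_tools_parallel(requests)
    for outcome in results:
        status = outcome["result"] if outcome["success"] else outcome["error"]
        print(f"{outcome['tool']}: {status}")
```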

Performance Statistics Collection

Comprehensive performance tracking enables data-driven optimization decisions:

def get_performance_stats(self) -> Dict[str, Any]:
    """Get comprehensive performance statistics"""
    total_calls = sum(stats["total_calls"] for stats in self.execution_stats.values())
    total_cache_hits = sum(stats["cache_hits"] for stats in self.execution_stats.values())

    cache_hit_rate = (total_cache_hits / total_calls * 100) if total_calls > 0 else 0

    return {
        "total_tool_calls": total_calls,
        "total_cache_hits": total_cache_hits,
        "cache_hit_rate_pct": cache_hit_rate,
        "tool_stats": dict(self.execution_stats),
        "cache_size": len(self.tool_cache),
        "thread_pool_size": self.thread_pool._max_workers
    }
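These aggregated numbers make it easy to spot tools that would benefit from pre-warming or further optimization. A hypothetical reading of the stats (the thresholds are illustrative, not prescriptive):

```python
stats = agent.get_performance_stats()
print(f"Overall cache hit rate: {stats['cache_hit_rate_pct']:.1f}%")

# Flag tools that are slow on average and rarely served from cache
for tool_name, tool_stats in stats["tool_stats"].items():
    hit_rate = tool_stats["cache_hits"] / tool_stats["total_calls"]
    if tool_stats["avg_time"] > 1.0 and hit_rate < 0.2:
        print(f"Candidate for optimization or pre-warming: {tool_name}")
```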

Part 3: Response Time Optimization (10 minutes)

Faster Agent Responses

🗂️ File: src/session1/fast_response_agent.py - Speed-optimized agent implementations

import asyncio
from typing import Optional, Dict, Any
import time

class FastResponseAgent(BaseAgent):
    """Agent optimized for minimal response latency"""

    def __init__(self, name: str, llm_client):
        super().__init__(name, "Fast response agent", llm_client)
        self.response_cache = {}
        self.precomputed_responses = {}
        self.response_times = []
        self.target_response_time = 2.0  # seconds

The agent initialization sets up caching infrastructure and tracks response times with a configurable target latency (2 seconds default), enabling performance-driven optimizations.

Fast Message Processing

The core processing method implements multiple optimization strategies:

    async def process_message_fast(self, message: str) -> Dict[str, Any]:
        """Process message with speed optimizations"""
        start_time = time.time()

        # Check for exact match in cache
        if message in self.response_cache:
            response = self.response_cache[message]
            response_time = time.time() - start_time
            self.response_times.append(response_time)
            return {
                "response": response,
                "response_time": response_time,
                "cache_hit": True
            }

The processing method starts with timing and immediately checks for exact cache matches, providing sub-millisecond responses for repeated messages. All response times are tracked for performance analysis.

When exact matches aren't available, the system uses fuzzy matching for similar queries:

        # Check for similar message (fuzzy matching)
        similar_response = self._find_similar_response(message)
        if similar_response:
            response_time = time.time() - start_time
            self.response_times.append(response_time)
            return {
                "response": similar_response,
                "response_time": response_time,
                "cache_hit": "fuzzy"
            }

Fuzzy matching with the Jaccard coefficient reuses responses to closely related messages, avoiding a full LLM call while keeping quality acceptable for near-duplicate queries.

For new messages, the system uses timeout controls to ensure timely responses:

        # Generate new response with timeout
        try:
            response = await asyncio.wait_for(
                self._generate_response_with_context(message),
                timeout=self.target_response_time
            )
        except asyncio.TimeoutError:
            response = self._get_fallback_response(message)

        # Cache the response
        self.response_cache[message] = response

        response_time = time.time() - start_time
        self.response_times.append(response_time)

        return {
            "response": response,
            "response_time": response_time,
            "cache_hit": False
        }

For new messages, the system uses asyncio timeout to enforce response time limits. When LLM processing exceeds the target time, fallback responses ensure users always receive timely feedback.

Fuzzy Matching Algorithm

The similarity detection system uses the Jaccard coefficient on word sets as a lightweight similarity measure:

    def _find_similar_response(self, message: str) -> Optional[str]:
        """Find cached response for similar message using fuzzy matching"""
        message_words = set(message.lower().split())

        best_match = None
        best_similarity = 0.0

The method starts by converting the input message to a set of lowercase words, enabling case-insensitive comparison and duplicate removal.

Next, it iterates through all cached messages to find the best semantic match:

        for cached_message, cached_response in self.response_cache.items():
            cached_words = set(cached_message.lower().split())

            # Calculate Jaccard similarity
            intersection = message_words & cached_words
            union = message_words | cached_words

            if union:
                similarity = len(intersection) / len(union)
                if similarity > 0.7 and similarity > best_similarity:
                    best_similarity = similarity
                    best_match = cached_response

        return best_match

The algorithm computes the Jaccard coefficient over word sets and requires at least 70% overlap, preventing false matches while still allowing cached responses to be reused for near-duplicate queries.
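A quick worked example of the threshold (the messages are made up for illustration):

```python
a = set("what is the weather today".split())        # 5 distinct words
b = set("what is the weather right now".split())    # 6 distinct words
c = set("what is the weather".split())              # 4 distinct words

len(a & b) / len(a | b)   # 4 / 7 ~= 0.57 -> below 0.7, no cached reuse
len(a & c) / len(a | c)   # 4 / 5 = 0.80  -> above 0.7, cached response reused
```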

    def _get_fallback_response(self, message: str) -> str:
        """Generate quick fallback response when main processing times out"""
        fallback_responses = [
            "I'm processing your request. Could you please rephrase or simplify your question?",
            "I understand you're asking about this topic. Let me provide a brief response while I gather more details.",
            "That's an interesting question. Here's what I can tell you immediately..."
        ]

        # Select fallback based on message characteristics
        if "?" in message:
            return fallback_responses[0]
        elif len(message.split()) > 20:
            return fallback_responses[1]
        else:
            return fallback_responses[2]

Performance Monitoring and Analytics

Comprehensive performance tracking enables data-driven optimization:

def get_response_time_stats(self) -> Dict[str, Any]:
    """Get detailed response time analytics"""
    if not self.response_times:
        return {"no_data": True}

    sorted_times = sorted(self.response_times)
    n = len(sorted_times)

The analytics method starts by validating data availability and sorting response times for percentile calculations.

Next, it calculates comprehensive performance metrics:

    return {
        "total_responses": n,
        "avg_response_time": sum(sorted_times) / n,
        "min_response_time": min(sorted_times),
        "max_response_time": max(sorted_times),
        "p50_response_time": sorted_times[n // 2],
        "p90_response_time": sorted_times[int(n * 0.9)],
        "p95_response_time": sorted_times[int(n * 0.95)],
        "responses_under_target": sum(1 for t in sorted_times if t < self.target_response_time),
        "target_achievement_rate": sum(1 for t in sorted_times if t < self.target_response_time) / n * 100,
        "cache_entries": len(self.response_cache)
    }

The analytics provide comprehensive performance metrics including percentile distributions (P50, P90, P95) that are essential for understanding response time characteristics under different load conditions.

Automated Performance Optimization

The system automatically adjusts its behavior based on performance metrics:

async def optimize_performance(self):
    """Automatically optimize agent performance based on metrics"""
    stats = self.get_response_time_stats()

    if stats.get("no_data"):
        return

    # Adjust target response time based on performance
    if stats["target_achievement_rate"] > 90:
        # Performing well, can be more ambitious
        self.target_response_time *= 0.9
    elif stats["target_achievement_rate"] < 50:
        # Struggling, be more lenient
        self.target_response_time *= 1.2

The optimization algorithm adjusts targets based on actual performance: when achieving 90%+ success rate, it becomes more ambitious; when below 50%, it becomes more lenient.

Finally, the system manages cache size to prevent memory issues:

    # Clean cache if it's getting too large
    if len(self.response_cache) > 1000:
        # Keep only most recent 500 entries
        items = list(self.response_cache.items())
        self.response_cache = dict(items[-500:])

    self.logger.info(f"Performance optimization: target time {self.target_response_time:.2f}s, cache size {len(self.response_cache)}")

📝 Multiple Choice Test - Module B

Test your understanding of performance optimization concepts:

Question 1: What information does the MemoryEntry dataclass track to enable intelligent memory management?
A) Only content and timestamp
B) Content, timestamp, importance_score, and size_bytes
C) Just the memory size and creation time
D) Content and importance score only

Question 2: How does the memory cleanup algorithm prioritize which entries to keep?
A) Random selection
B) First-in-first-out (FIFO)
C) Sorts by importance and age, keeps top 70%
D) Only keeps the most recent entries

Question 3: Why are certain tools marked as non-cacheable in the optimization system?
A) They consume too much memory
B) They have side effects or time-dependent results
C) They execute too slowly
D) They require special permissions

Question 4: What technique does the context compression use to fit within size limits?
A) Truncates all messages to the same length
B) Removes all older messages completely
C) Weights importance (70%) higher than recency (30%) when sorting
D) Compresses text using algorithms

Question 5: What does the performance monitoring system track to optimize agent responses?
A) Only response times
B) Memory usage exclusively
C) Response times, percentiles, cache hit rates, and target achievement
D) Just error rates and failures

🗂️ View Test Solutions →


Module Summary

You've now mastered performance optimization for bare metal agents:

- Memory Management: Implemented efficient memory usage and conversation history optimization
- Tool Execution Speed: Built caching and parallel execution systems for faster tool usage
- Response Time Optimization: Created strategies for faster agent responses with fallback mechanisms
- Performance Monitoring: Designed analytics systems for continuous performance improvement

Related Modules:

- Core Session: Session 1 - Bare Metal Agents
- Module A: Advanced Agent Patterns
- Module C: Complex State Management

🗂️ Code Files: All examples use files in src/session1/

- memory_optimized_agent.py - Memory-efficient agent implementations
- optimized_tools.py - High-performance tool execution
- fast_response_agent.py - Speed-optimized response generation

🚀 Quick Start: Run cd src/session1 && python demo_runner.py to see performance optimization examples