Session 8 - Module C: Performance Optimization (50 minutes)¶
Prerequisites: Session 8 Core Section Complete
Target Audience: Performance engineers and cost optimization specialists
Cognitive Load: 4 optimization concepts
Module Overview¶
This module explores advanced performance optimization techniques for Agno agent systems, including intelligent caching strategies, cost management, memory optimization, latency reduction, and automated performance tuning. You'll learn to build highly efficient agent systems that minimize costs while maximizing performance.
Learning Objectives¶
By the end of this module, you will:
- Implement multi-layer caching systems with intelligent invalidation strategies
- Design cost optimization frameworks with automated budget management
- Create memory-efficient agent architectures with resource pooling
- Build latency optimization systems with request batching and connection pooling
Part 1: Intelligent Caching Systems (20 minutes)¶
Multi-Layer Caching Architecture¶
🗂️ File: src/session8/intelligent_caching.py
- Advanced caching for agent systems
Step 1: Core Imports and Cache Architecture Setup¶
Before building our intelligent caching system, we need to import the essential modules and define our cache architecture. This forms the foundation for multi-layer caching.
from typing import Dict, List, Any, Optional, Callable, Union
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import asyncio
import hashlib
import json
import logging
from enum import Enum
import redis
import sqlite3
Key Libraries Explained:
- asyncio: Enables asynchronous cache operations for better performance
- redis: Provides distributed caching capabilities across multiple servers
- hashlib: Creates cache keys and handles content deduplication
- datetime: Manages cache expiration and time-based optimization
Step 2: Defining Cache Hierarchy Levels¶
Modern caching systems use multiple layers, each optimized for different access patterns and performance requirements.
class CacheLevel(Enum):
"""Cache hierarchy levels for optimal performance"""
L1_MEMORY = "l1_memory" # Fastest: In-process memory cache
L2_REDIS = "l2_redis" # Fast: Redis distributed cache
L3_PERSISTENT = "l3_persistent" # Medium: Persistent storage cache
L4_CDN = "l4_cdn" # Global: CDN edge cache
Cache Level Strategy:
- L1 (Memory): Millisecond access, limited capacity, perfect for frequently accessed data
- L2 (Redis): Sub-second access, shared across instances, ideal for session data
- L3 (Persistent): Second-level access, permanent storage, great for expensive computations
- L4 (CDN): Global distribution, massive capacity, perfect for static resources
Step 3: Cache Entry Data Structure¶
Each cache entry needs metadata to support intelligent caching decisions like expiration, access tracking, and cache level optimization.
@dataclass
class CacheEntry:
"""Cache entry with comprehensive metadata for intelligent management"""
key: str
value: Any
created_at: datetime
last_accessed: datetime
access_count: int = 0
ttl_seconds: Optional[int] = None
cache_level: CacheLevel = CacheLevel.L1_MEMORY
metadata: Dict[str, Any] = field(default_factory=dict)
Metadata Enables:
- Access tracking: access_count and last_accessed for LRU eviction
- Expiration management: ttl_seconds for automatic cleanup
- Performance optimization: cache_level for promotion/demotion decisions
- Custom attributes: metadata for application-specific caching logic
Step 4: Cache Performance Monitoring¶
Effective caching requires comprehensive statistics to understand performance patterns and optimize cache configuration.
class CacheStatistics:
"""Statistics tracking for cache operations and performance analysis"""
def __init__(self):
self.hits = {"l1": 0, "l2": 0, "l3": 0, "l4": 0}
self.misses = {"l1": 0, "l2": 0, "l3": 0, "l4": 0}
self.evictions = {"l1": 0, "l2": 0, "l3": 0, "l4": 0}
Performance Tracking Strategy:
- Hits: Successful cache retrievals that avoid expensive operations
- Misses: Cache failures that require full computation or external API calls
- Evictions: Cache removals that help optimize memory usage
def record_hit(self, level: str):
"""Record a cache hit for performance monitoring"""
if level in self.hits:
self.hits[level] += 1
def record_miss(self, level: str):
"""Record a cache miss for optimization analysis"""
if level in self.misses:
self.misses[level] += 1
def record_eviction(self, level: str):
"""Record a cache eviction for memory management tracking"""
if level in self.evictions:
self.evictions[level] += 1
Why Track These Metrics:
- Hit rates indicate caching effectiveness and cost savings
- Miss patterns reveal opportunities for cache optimization
- Eviction frequency shows memory pressure and sizing needs
def get_hit_rate(self, level: str) -> float:
"""Calculate hit rate for specific cache level optimization"""
hits = self.hits.get(level, 0)
misses = self.misses.get(level, 0)
total = hits + misses
return hits / total if total > 0 else 0.0
def get_overall_hit_rate(self) -> float:
"""Calculate overall system cache effectiveness"""
total_hits = sum(self.hits.values())
total_requests = total_hits + sum(self.misses.values())
return total_hits / total_requests if total_requests > 0 else 0.0
Hit Rate Analysis:
- 90%+ hit rate: Excellent caching, significant cost savings
- 70-90% hit rate: Good performance, some optimization opportunities
- <70% hit rate: Poor caching efficiency, requires strategy review
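As a quick illustration of how these counters drive the hit-rate thresholds above, here is a small usage sketch of the CacheStatistics class (the numbers are illustrative):

```python
stats = CacheStatistics()

# Simulate 90 L1 hits and 10 L1 misses
for _ in range(90):
    stats.record_hit("l1")
for _ in range(10):
    stats.record_miss("l1")

print(f"L1 hit rate: {stats.get_hit_rate('l1'):.0%}")           # 90%
print(f"Overall hit rate: {stats.get_overall_hit_rate():.0%}")  # 90%
```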
Step 5: Cache Configuration Management¶
Proper configuration is crucial for optimal cache performance and resource utilization across different environments.
@dataclass
class CacheConfiguration:
"""Configuration for intelligent cache management"""
l1_max_size: int = 1000 # Maximum entries in L1 cache
l1_max_memory_mb: int = 100 # Memory limit for L1 cache
default_ttl: int = 3600 # Default time-to-live (1 hour)
redis_url: str = "redis://localhost:6379" # Redis connection string
Configuration Strategy:
- L1 size limit: Prevents memory overflow in high-traffic scenarios
- Memory constraints: Ensures predictable resource usage
- TTL defaults: Balances freshness with performance
- Redis connectivity: Enables distributed caching across instances
def __post_init__(self):
"""Validate configuration parameters for production safety"""
if self.l1_max_size <= 0:
raise ValueError("l1_max_size must be positive")
if self.l1_max_memory_mb <= 0:
raise ValueError("l1_max_memory_mb must be positive")
if self.default_ttl <= 0:
raise ValueError("default_ttl must be positive")
Validation Benefits:
- Prevents misconfigurations that could cause system failures
- Ensures positive values for all critical cache parameters
- Fails fast during initialization rather than at runtime
Step 6: Cache Backend Architecture¶
Using an abstract base class ensures consistent interfaces across different cache implementations while enabling easy backend switching.
from abc import ABC, abstractmethod
class CacheBackend(ABC):
"""Abstract base class for cache backends with consistent interface"""
@abstractmethod
async def get(self, key: str) -> Optional[Any]:
"""Retrieve value from cache with async support"""
pass
@abstractmethod
async def set(self, key: str, entry: CacheEntry):
"""Store value in cache with metadata"""
pass
@abstractmethod
async def delete(self, key: str):
"""Remove value from cache for invalidation"""
pass
Abstract Backend Benefits:
- Consistent interface across memory, Redis, and persistent storage
- Easy backend switching without changing application code
- Async support for non-blocking cache operations
- Type safety with clear method signatures
Step 7: Memory Cache Implementation¶
The L1 memory cache provides ultra-fast access times with intelligent TTL management and access tracking for optimization.
class MemoryCacheBackend(CacheBackend):
"""In-memory cache backend with intelligent LRU eviction"""
def __init__(self, config: CacheConfiguration, stats: CacheStatistics):
self._cache: Dict[str, CacheEntry] = {}
self._config = config
self._stats = stats
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
Memory Cache Initialization:
- Dictionary storage: Provides O(1) average-case lookup performance
- Configuration integration: Uses validated cache settings
- Statistics tracking: Monitors performance for optimization
- Logging support: Enables debugging and monitoring
async def get(self, key: str) -> Optional[Any]:
"""Retrieve value from L1 memory cache with TTL validation"""
if key in self._cache:
entry = self._cache[key]
# Automatic TTL expiration check
if entry.ttl_seconds:
elapsed = (datetime.now() - entry.created_at).total_seconds()
if elapsed > entry.ttl_seconds:
del self._cache[key] # Clean up expired entry
return None
# Update access metadata for LRU algorithm
entry.last_accessed = datetime.now()
entry.access_count += 1
return entry.value
return None
Smart Cache Retrieval Features:
- Automatic expiration: Removes stale data without manual intervention
- Access tracking: Records usage patterns for intelligent eviction
- Metadata updates: Maintains fresh statistics for optimization decisions
- Clean removal: Handles expired entries to prevent memory leaks
async def set(self, key: str, entry: CacheEntry):
"""Store entry in L1 memory cache with capacity management"""
# Proactive eviction to maintain performance
if len(self._cache) >= self._config.l1_max_size:
await self._evict_entries()
self._cache[key] = entry
async def delete(self, key: str):
"""Remove entry from memory cache for invalidation"""
if key in self._cache:
del self._cache[key]
Cache Storage Management:
- Capacity checks: Prevents unlimited memory growth
- Proactive eviction: Maintains optimal performance before hitting limits
- Simple storage: Direct dictionary assignment for maximum speed
- Safe deletion: Handles missing keys gracefully
Step 8: Intelligent Cache Eviction Strategy¶
The eviction algorithm balances recency and frequency to optimize cache effectiveness.
async def _evict_entries(self):
"""Intelligent L1 cache eviction using LRU + frequency analysis"""
if not self._cache:
return
# Calculate comprehensive eviction scores
entries_with_scores = []
now = datetime.now()
for key, entry in self._cache.items():
# Time since last access (higher = more stale)
recency_score = (now - entry.last_accessed).total_seconds()
# Inverse frequency (higher = less frequently used)
frequency_score = 1.0 / (entry.access_count + 1)
# Combined score prioritizes stale, infrequently used entries
combined_score = recency_score * frequency_score
entries_with_scores.append((key, combined_score))
Eviction Algorithm Explained:
- Recency factor: Measures time since last access (staleness)
- Frequency factor: Considers how often the entry is used
- Combined scoring: Balances both factors for optimal eviction decisions
- Higher scores: Indicate better candidates for removal
# Sort by score (highest first) and remove top 20%
entries_with_scores.sort(key=lambda x: x[1], reverse=True)
entries_to_evict = entries_with_scores[:len(entries_with_scores) // 5]
for key, _ in entries_to_evict:
del self._cache[key]
self._stats.record_eviction("l1") # Track for optimization
Smart Eviction Strategy:
- Conservative removal: Only evicts 20% of entries to maintain performance
- Score-based selection: Removes least valuable entries first
- Statistics tracking: Records evictions for monitoring and tuning
- Batch processing: Efficient bulk removal operation
def calculate_memory_usage(self) -> float:
"""Calculate approximate L1 cache memory usage for monitoring"""
total_size = 0
for entry in self._cache.values():
# Rough estimation of memory usage per entry
total_size += len(str(entry.value))
return total_size / (1024 * 1024) # Convert to MB
Memory Usage Monitoring:
- Approximate calculation: Provides useful estimates without heavy computation
- String conversion: Simple method to estimate object size
- MB conversion: Human-readable units for monitoring dashboards
- Performance tracking: Helps optimize cache configuration
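If a slightly tighter estimate is needed, `sys.getsizeof` can stand in for the string-length heuristic. This is a sketch only: `sys.getsizeof` does not follow nested references, so it still undercounts complex values.

```python
import sys

def estimate_entry_size_mb(entries: Dict[str, CacheEntry]) -> float:
    """Approximate memory footprint of a set of cache entries in MB."""
    total_bytes = sum(
        sys.getsizeof(key) + sys.getsizeof(entry.value)
        for key, entry in entries.items()
    )
    return total_bytes / (1024 * 1024)
```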
Step 9: Redis Distributed Cache Backend¶
The L2 Redis cache provides shared caching across multiple agent instances with persistence and scalability.
class RedisCacheBackend(CacheBackend):
"""Redis distributed cache backend for shared agent caching"""
def __init__(self, config: CacheConfiguration, stats: CacheStatistics):
self._redis_client = redis.from_url(config.redis_url)
self._config = config
self._stats = stats
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
Redis Backend Advantages:
- Distributed caching: Shared cache across multiple agent instances
- Persistence options: Can survive application restarts
- Atomic operations: Thread-safe operations for concurrent access
- Built-in expiration: Native TTL support for automatic cleanup
async def get(self, key: str) -> Optional[Any]:
"""Retrieve from L2 Redis cache with error handling"""
try:
# Execute Redis operation in thread pool to avoid blocking
cached_data = await asyncio.get_event_loop().run_in_executor(
None, self._redis_client.get, key
)
if cached_data:
return json.loads(cached_data.decode('utf-8'))
except Exception as e:
self.logger.warning(f"Redis cache error: {e}")
return None
Redis Retrieval Features:
- Async execution: Non-blocking operations using thread pool
- JSON deserialization: Handles complex data structures
- Error resilience: Graceful degradation when Redis is unavailable
- UTF-8 decoding: Proper string handling for international content
async def set(self, key: str, entry: CacheEntry):
"""Store entry in L2 Redis cache with automatic expiration"""
try:
cache_data = json.dumps(entry.value)
await asyncio.get_event_loop().run_in_executor(
None,
lambda: self._redis_client.setex(
key,
entry.ttl_seconds or self._config.default_ttl,
cache_data
)
)
except Exception as e:
self.logger.error(f"Redis cache set error: {e}")
Redis Set Operation Features:
- JSON serialization: Enables storage of complex Python objects
- Automatic expiration: Uses SETEX for built-in TTL support
- Lambda execution: Properly handles Redis client calls in thread pool
async def delete(self, key: str):
"""Remove entry from Redis cache for invalidation"""
try:
await asyncio.get_event_loop().run_in_executor(
None, self._redis_client.delete, key
)
except Exception as e:
self.logger.error(f"Redis cache delete error: {e}")
Redis Delete Operation Features:
- Async execution: Runs the delete in a thread pool to avoid blocking the event loop
- Invalidation support: Removes stale entries so callers never read outdated data
- Error logging: Maintains system stability with graceful error handling
Step 10: Intelligent Cache Manager Foundation¶
The central cache manager orchestrates multi-layer caching with intelligent policies and cache promotion strategies.
class IntelligentCacheManager:
"""Multi-layer intelligent caching system for agent responses"""
def __init__(self, config: CacheConfiguration):
self._config = config
self._stats = CacheStatistics()
self._cache_policies = {}
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
# Initialize cache backends
self._backends = {
CacheLevel.L1_MEMORY: MemoryCacheBackend(config, self._stats),
CacheLevel.L2_REDIS: RedisCacheBackend(config, self._stats)
}
Cache Manager Architecture:
- Multi-backend support: Seamlessly integrates L1 memory and L2 Redis caching
- Statistics integration: Tracks performance across all cache levels
- Policy-driven: Configurable caching strategies for different data types
- Logging support: Comprehensive monitoring and debugging capabilities
Step 11: Cache Policy Configuration¶
Define intelligent caching policies optimized for different types of agent data and usage patterns.
def setup_cache_policies(self) -> Dict[str, Any]:
"""Configure intelligent caching policies"""
cache_config = {
"response_caching": {
"enabled": True,
"ttl_seconds": 3600,
"cache_levels": [CacheLevel.L1_MEMORY, CacheLevel.L2_REDIS],
"invalidation_strategy": "semantic_similarity",
"compression": True
},
"model_output_caching": {
"enabled": True,
"ttl_seconds": 7200,
"cache_levels": [CacheLevel.L2_REDIS, CacheLevel.L3_PERSISTENT],
"deduplication": True,
"similarity_threshold": 0.95
},
Policy Design Principles:
- Response caching: 1-hour TTL for agent responses with semantic invalidation
- Model output caching: 2-hour TTL for expensive model computations
- Multi-level storage: Optimizes placement based on access patterns
- Deduplication: Reduces storage costs through similarity detection
"conversation_context_caching": {
"enabled": True,
"ttl_seconds": 1800,
"cache_levels": [CacheLevel.L1_MEMORY],
"max_context_length": 10,
"context_compression": True
},
"tool_result_caching": {
"enabled": True,
"ttl_seconds": 600, # 10 minutes for tool results
"cache_levels": [CacheLevel.L1_MEMORY, CacheLevel.L2_REDIS],
"cache_by_parameters": True,
"parameter_normalization": True
}
}
self.cache_policies = cache_config
return cache_config
Specialized Caching Strategies:
- Context caching: Short-lived L1-only for conversation state
- Tool result caching: 10-minute TTL for API call results
- Parameter normalization: Improves cache hit rates for similar tool calls
- Context compression: Reduces memory footprint for long conversations
Step 12: Cache Retrieval with Intelligent Hierarchy¶
The cache retrieval system implements a smart hierarchy traversal with automatic promotion and comprehensive error handling.
async def get_cached_response(self, cache_key: str,
cache_levels: Optional[List[CacheLevel]] = None) -> Optional[Any]:
"""Get cached response with intelligent cache hierarchy traversal"""
if not cache_key:
raise ValueError("cache_key cannot be empty")
if cache_levels is None:
cache_levels = [CacheLevel.L1_MEMORY, CacheLevel.L2_REDIS]
Retrieval Strategy:
- Input validation: Ensures cache key is valid before processing
- Default hierarchy: Uses L1 → L2 traversal for optimal performance
- Flexible levels: Allows custom cache level specification
- Error prevention: Guards against empty or invalid cache keys
for cache_level in cache_levels:
try:
backend = self._backends.get(cache_level)
if backend is None:
self.logger.warning(f"Backend not available for cache level {cache_level}")
continue
result = await backend.get(cache_key)
if result is not None:
# Cache hit - promote to higher cache levels
await self._promote_cache_entry(cache_key, result, cache_level)
# Update statistics
level_name = cache_level.value.split('_')[0]
self._stats.record_hit(level_name)
self.logger.debug(f"Cache hit at level {cache_level} for key {cache_key}")
return result
Cache Hit Processing:
- Automatic promotion: Moves popular data to faster cache levels
- Statistics tracking: Records hit rates for performance analysis
- Graceful degradation: Continues if individual backends fail
- Performance logging: Tracks cache effectiveness for optimization
except Exception as e:
self.logger.warning(f"Cache level {cache_level} failed for key {cache_key}: {e}")
level_name = cache_level.value.split('_')[0]
self._stats.record_miss(level_name)
continue
# Cache miss - record misses for all levels
for level in cache_levels:
level_name = level.value.split('_')[0]
self._stats.record_miss(level_name)
self.logger.debug(f"Cache miss for key {cache_key}")
return None
Error Handling and Statistics:
- Exception resilience: System continues functioning even with backend failures
- Miss tracking: Records failures for performance monitoring
- Comprehensive logging: Provides debugging information for optimization
- Statistical accuracy: Ensures metrics reflect actual cache performance
Step 13: Cache Entry Storage Management¶
The cache storage system manages entries across multiple levels with automatic fallback and success tracking.
async def set_cached_response(self, cache_key: str, value: Any,
ttl_seconds: Optional[int] = None,
cache_levels: Optional[List[CacheLevel]] = None):
"""Set cached response across multiple cache levels"""
if not cache_key:
raise ValueError("cache_key cannot be empty")
if cache_levels is None:
cache_levels = [CacheLevel.L1_MEMORY, CacheLevel.L2_REDIS]
if ttl_seconds is None:
ttl_seconds = self._config.default_ttl
Parameter Validation and Defaults:
- Key validation: Prevents empty cache keys that could cause lookup failures
- Level defaults: Uses L1 and L2 caching for optimal performance balance
- TTL management: Falls back to configuration defaults for consistent expiration
cache_entry = CacheEntry(
key=cache_key,
value=value,
created_at=datetime.now(),
last_accessed=datetime.now(),
ttl_seconds=ttl_seconds
)
success_count = 0
Cache Entry Creation:
- Metadata initialization: Sets creation and access timestamps for tracking
- Value storage: Preserves original data with complete metadata wrapper
- Success tracking: Monitors how many backends successfully store the entry
for cache_level in cache_levels:
try:
backend = self._backends.get(cache_level)
if backend is None:
self.logger.warning(f"Backend not available for cache level {cache_level}")
continue
await backend.set(cache_key, cache_entry)
success_count += 1
self.logger.debug(f"Set cache entry at level {cache_level} for key {cache_key}")
except Exception as e:
self.logger.error(f"Failed to set cache in {cache_level} for key {cache_key}: {e}")
if success_count == 0:
raise RuntimeError(f"Failed to set cache entry for key {cache_key} in any backend")
Multi-Backend Storage Strategy:
- Backend validation: Checks if cache level is available before attempting storage
- Error isolation: Individual backend failures don't stop other storage attempts
- Success tracking: Ensures at least one backend successfully stores the data
- Comprehensive logging: Provides visibility into storage operations and failures
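Putting the retrieval and storage paths together, a minimal round trip through the manager might look like this (assumes the classes above live in one module; if Redis is not running at the configured URL, the L2 write is logged and skipped while L1 still succeeds):

```python
import asyncio

async def demo():
    manager = IntelligentCacheManager(CacheConfiguration())

    await manager.set_cached_response(
        "user:42:greeting", {"text": "Hello!"}, ttl_seconds=300
    )
    cached = await manager.get_cached_response("user:42:greeting")
    print(cached)  # {'text': 'Hello!'}

    metrics = manager.get_cache_performance_metrics()
    print(f"Hit rate so far: {metrics['overall_hit_rate']:.0%}")

asyncio.run(demo())
```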
Step 14: Cache Promotion System¶
Cache promotion automatically moves frequently accessed data to faster cache levels.
async def _promote_cache_entry(self, cache_key: str, value: Any, source_level: CacheLevel):
"""Promote cache entry to higher-level caches"""
# Determine which levels to promote to
promotion_levels = []
if source_level == CacheLevel.L2_REDIS:
# Promote from L2 to L1
promotion_levels = [CacheLevel.L1_MEMORY]
if promotion_levels:
try:
await self.set_cached_response(
cache_key,
value,
cache_levels=promotion_levels
)
self.logger.debug(f"Promoted cache entry {cache_key} from {source_level} to {promotion_levels}")
except Exception as e:
self.logger.warning(f"Failed to promote cache entry {cache_key}: {e}")
Intelligent Promotion Logic:
- Level analysis: Determines appropriate promotion targets based on current level
- Performance optimization: Moves popular L2 data to ultra-fast L1 memory
- Error resilience: Promotion failures don't affect original cache functionality
- Transparent operation: Uses existing storage methods for consistency
Step 15: Cache Invalidation Operations¶
Cache invalidation ensures data consistency by removing stale entries across all cache levels.
async def invalidate_cache_entry(self, cache_key: str,
cache_levels: Optional[List[CacheLevel]] = None):
"""Invalidate cache entry across specified levels"""
if not cache_key:
raise ValueError("cache_key cannot be empty")
if cache_levels is None:
cache_levels = list(self._backends.keys())
for cache_level in cache_levels:
try:
backend = self._backends.get(cache_level)
if backend:
await backend.delete(cache_key)
self.logger.debug(f"Invalidated cache entry {cache_key} at level {cache_level}")
except Exception as e:
self.logger.error(f"Failed to invalidate cache in {cache_level} for key {cache_key}: {e}")
Invalidation Strategy:
- Comprehensive removal: Defaults to removing from all available cache levels
- Individual control: Allows targeted invalidation of specific cache levels
- Error tolerance: Individual backend failures don't stop complete invalidation
- Operation logging: Tracks invalidation success and failures for monitoring
Step 16: Semantic Cache Key Generation¶
Semantic cache keys enable intelligent caching based on content similarity rather than exact matches.
def create_semantic_cache_key(self, query: str, context: Dict[str, Any]) -> str:
"""Create cache key based on semantic similarity rather than exact match"""
# Normalize query for better cache hits
normalized_query = query.lower().strip()
# Include relevant context in cache key
context_hash = hashlib.md5(
json.dumps(context, sort_keys=True).encode()
).hexdigest()[:8]
# Create semantic hash (in production, use embeddings)
query_hash = hashlib.sha256(normalized_query.encode()).hexdigest()[:16]
return f"semantic:{query_hash}:{context_hash}"
Semantic Key Features:
- Query normalization: Reduces variations through lowercasing and trimming
- Context integration: Includes contextual information for accurate matching
- Hash truncation: Uses short hashes to balance uniqueness with readability
- Semantic prefix: Clear identifier for semantically-generated cache keys
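For example, two queries that differ only in whitespace or capitalization map to the same key (the actual hashes depend on the inputs):

```python
manager = IntelligentCacheManager(CacheConfiguration())
context = {"user_tier": "pro", "locale": "en-US"}

key_a = manager.create_semantic_cache_key("What is the refund policy?", context)
key_b = manager.create_semantic_cache_key("  what is the refund policy?  ", context)

print(key_a == key_b)  # True - normalization makes both queries hit the same entry
print(key_a)           # "semantic:<16-char query hash>:<8-char context hash>"
```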
Step 17: Cache Performance Metrics Collection¶
Comprehensive metrics collection provides insights into cache effectiveness and cost savings.
def get_cache_performance_metrics(self) -> Dict[str, Any]:
"""Get comprehensive cache performance metrics"""
overall_hit_rate = self._stats.get_overall_hit_rate()
if overall_hit_rate == 0.0:
self.logger.warning("No cache requests recorded")
# Get memory cache specific metrics
memory_backend = self._backends.get(CacheLevel.L1_MEMORY)
memory_usage = memory_backend.calculate_memory_usage() if memory_backend else 0.0
Metrics Foundation:
- Hit rate calculation: Measures overall cache effectiveness across all levels
- Usage monitoring: Tracks system utilization and alerts for no activity
- Memory tracking: Monitors L1 cache memory consumption for optimization
metrics = {
"overall_hit_rate": overall_hit_rate,
"hit_rates_by_level": {
level.value.split('_')[0]: self._stats.get_hit_rate(level.value.split('_')[0])
for level in self._backends.keys()
},
"cache_statistics": {
"hits": self._stats.hits.copy(),
"misses": self._stats.misses.copy(),
"evictions": self._stats.evictions.copy()
}
}
Performance Metrics Collection:
- Overall hit rate: Provides system-wide cache effectiveness measurement
- Level-specific rates: Enables optimization of individual cache layers
- Operational statistics: Tracks hits, misses, and evictions for trend analysis
metrics.update({
"cache_size": {
"l1_memory_usage_mb": memory_usage,
"backends_active": len(self._backends)
},
"cost_savings": self._calculate_cache_savings(),
"configuration": {
"l1_max_size": self._config.l1_max_size,
"l1_max_memory_mb": self._config.l1_max_memory_mb,
"default_ttl": self._config.default_ttl
}
})
return metrics
Comprehensive Metrics Structure:
- Performance data: Hit rates both overall and per-level for optimization insights
- Operational statistics: Detailed hits, misses, and evictions for trend analysis
- Resource usage: Memory consumption and active backend tracking
- Financial impact: Cost savings calculations for ROI demonstration
- Configuration visibility: Current settings for performance correlation
Step 18: Cache Savings Calculation¶
The cache savings calculator estimates financial benefits of the caching system.
def _calculate_cache_savings(self) -> Dict[str, Any]:
"""Calculate estimated cost savings from caching"""
total_hits = sum(self._stats.hits.values())
total_requests = total_hits + sum(self._stats.misses.values())
if total_requests == 0:
return {"total_requests": 0, "estimated_savings": 0.0}
# Simplified cost calculation - assumes $0.001 per cache miss (API call)
cost_per_miss = 0.001
estimated_savings = total_hits * cost_per_miss
return {
"total_requests": total_requests,
"cache_hits": total_hits,
"estimated_savings_usd": estimated_savings,
"hit_rate": total_hits / total_requests,
"cost_avoidance_rate": total_hits / total_requests if total_requests > 0 else 0.0
}
Cost Analysis Features:
- Request aggregation: Combines hits and misses for total request volume
- Financial modeling: Uses realistic per-API-call costs for savings estimation
- ROI calculation: Provides multiple metrics for business justification
- Efficiency measurement: Hit rate and cost avoidance rate for optimization
Step 19: Response Deduplication System¶
Intelligent deduplication reduces storage costs by identifying and consolidating similar responses.
class ResponseDeduplication:
"""Intelligent response deduplication system"""
def __init__(self):
self.response_signatures = {}
self.similarity_threshold = 0.90
self.dedup_stats = {"duplicates_found": 0, "storage_saved": 0}
Deduplication Configuration:
- Signature storage: Maintains responses for similarity comparison
- Similarity threshold: 90% threshold ensures high-quality matches
- Statistics tracking: Monitors deduplication effectiveness and storage savings
def calculate_response_similarity(self, response1: str, response2: str) -> float:
"""Calculate semantic similarity between responses"""
# Simplified similarity calculation (in production, use proper NLP models)
words1 = set(response1.lower().split())
words2 = set(response2.lower().split())
if not words1 or not words2:
return 0.0
intersection = len(words1.intersection(words2))
union = len(words1.union(words2))
return intersection / union if union > 0 else 0.0
Similarity Algorithm:
- Word-level analysis: Compares responses at the word level for content similarity
- Jaccard similarity: Uses intersection-over-union for robust similarity scoring
- Edge case handling: Safely handles empty responses with zero similarity
- Production note: Real implementations should use sophisticated NLP models
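A quick check of the Jaccard-based scorer (illustrative strings; a production system would use embedding similarity instead):

```python
dedup = ResponseDeduplication()

a = "The order ships within two business days."
b = "Your order ships within two business days."

# 6 shared words out of 8 unique words -> 0.75, below the 0.90 threshold
print(dedup.calculate_response_similarity(a, b))  # 0.75
```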
Step 20: Duplicate Detection and Storage¶
The duplicate detection system identifies similar responses and manages canonical storage.
async def check_duplicate_response(self, response: str,
context: Dict[str, Any]) -> Optional[str]:
"""Check if response is a duplicate and return canonical version"""
response_hash = self._create_response_hash(response, context)
# Check for similar responses
for existing_hash, stored_response in self.response_signatures.items():
similarity = self.calculate_response_similarity(response, stored_response["text"])
if similarity >= self.similarity_threshold:
self.dedup_stats["duplicates_found"] += 1
self.dedup_stats["storage_saved"] += len(response)
return existing_hash
Duplicate Detection Process:
- Hash generation: Creates unique identifier for each response-context pair
- Similarity scanning: Compares against all stored responses for matches
- Statistics updating: Tracks duplicate finds and storage space saved
# Store new unique response
self.response_signatures[response_hash] = {
"text": response,
"context": context,
"created_at": datetime.now(),
"usage_count": 1
}
return response_hash
Canonical Storage Management:
- Unique response storage: Stores new responses with comprehensive metadata
- Usage tracking: Monitors how often responses are accessed
- Timestamp recording: Tracks creation time for cache management
def _create_response_hash(self, response: str, context: Dict[str, Any]) -> str:
"""Create hash for response deduplication"""
combined = f"{response}:{json.dumps(context, sort_keys=True)}"
return hashlib.sha256(combined.encode()).hexdigest()[:16]
Hash Generation Strategy:
- Content combination: Merges response text with context for unique identification
- Deterministic ordering: Sorts context keys for consistent hash generation
- Secure hashing: Uses SHA-256 for collision-resistant hash generation
- Length optimization: Truncates to 16 characters for storage efficiency
Part 2: Cost Management and Optimization (20 minutes)¶
Automated Cost Management Framework¶
🗂️ File: src/session8/cost_management.py
- Comprehensive cost optimization
Step 1: Cost Management Foundation¶
Cost optimization is critical for production agent systems. Let's start with the essential imports and strategy definitions.
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import asyncio
import logging
from enum import Enum
Step 2: Cost Optimization Strategies¶
Define the core strategies available for automated cost optimization across different operational dimensions.
class CostOptimizationStrategy(Enum):
"""Strategic approaches to cost optimization in agent systems"""
MODEL_RIGHT_SIZING = "model_right_sizing" # Choose optimal model for task
REQUEST_BATCHING = "request_batching" # Batch requests for efficiency
CACHING_ENHANCEMENT = "caching_enhancement" # Improve cache hit rates
RESOURCE_SCHEDULING = "resource_scheduling" # Schedule workload for cost savings
SPOT_INSTANCE_USAGE = "spot_instance_usage" # Use spot instances when possible
Strategy Explanations:
- Model Right-Sizing: Automatically select the most cost-effective model that meets quality requirements
- Request Batching: Group multiple requests to reduce per-request overhead costs
- Caching Enhancement: Increase cache hit rates to reduce expensive API calls
- Resource Scheduling: Shift non-urgent workloads to off-peak hours with lower costs
- Spot Instance Usage: Leverage cheaper spot compute instances for fault-tolerant workloads
Step 3: Budget Configuration Framework¶
Structured budget management with automated alerts and protective measures.
@dataclass
class CostBudget:
"""Comprehensive cost budget configuration with multi-level protection"""
name: str
daily_limit: float # Maximum daily spending
weekly_limit: float # Maximum weekly spending
monthly_limit: float # Maximum monthly spending
alert_thresholds: List[float] = field(default_factory=lambda: [0.5, 0.8, 0.9, 1.0])
auto_scale_down_threshold: float = 0.95 # Trigger automatic scaling at 95%
emergency_shutdown_threshold: float = 1.0 # Emergency stop at 100%
class CostOptimizationException(Exception):
"""Exception raised during cost optimization operations"""
pass
Budget Protection Levels:
- 50% threshold: Early warning for budget monitoring
- 80% threshold: Activate cost optimization strategies
- 90% threshold: Aggressive cost reduction measures
- 95% threshold: Automatic scaling down of non-critical services
- 100% threshold: Emergency shutdown to prevent budget overrun
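A budget definition for a production deployment might look like the following (the dollar amounts are placeholders, not recommendations):

```python
budget = CostBudget(
    name="production-agents",
    daily_limit=50.0,      # USD per day
    weekly_limit=300.0,    # USD per week
    monthly_limit=1000.0,  # USD per month
)

# Defaults: alerts at 50/80/90/100%, scale-down at 95%, shutdown at 100%
print(budget.alert_thresholds)           # [0.5, 0.8, 0.9, 1.0]
print(budget.auto_scale_down_threshold)  # 0.95
```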
Step 4: Model Cost Analysis System¶
The ModelCostCalculator provides intelligent model selection based on cost, performance, and quality requirements.
class ModelCostCalculator:
"""Advanced calculator for model usage costs and performance optimization"""
def __init__(self):
# Comprehensive model cost and performance profiles
self._model_profiles = {
"gpt-4": {
"cost_per_token": 0.00003, # Premium model pricing
"accuracy": 0.98, # Highest accuracy
"avg_response_time_ms": 2000, # Slower but thorough
"quality_score": 0.95 # Best quality
},
Premium Model Configuration:
- GPT-4: Highest cost but maximum accuracy and quality
- Critical use cases: Complex reasoning, high-stakes decisions
"gpt-4-turbo": {
"cost_per_token": 0.00001, # Balanced pricing
"accuracy": 0.96, # High accuracy
"avg_response_time_ms": 1500, # Faster processing
"quality_score": 0.90 # Excellent quality
},
"gpt-3.5-turbo": {
"cost_per_token": 0.0000015, # Most economical
"accuracy": 0.92, # Good accuracy
"avg_response_time_ms": 1000, # Fastest response
"quality_score": 0.85 # Good quality
}
}
Model Profile Strategy:
- GPT-4: Premium choice for critical tasks requiring maximum accuracy
- GPT-4-Turbo: Balanced option for most production workloads
- GPT-3.5-Turbo: Cost-effective choice for high-volume, less complex tasks
Step 5: Cost Calculation Methods¶
Essential methods for model cost analysis and optimization decisions.
def get_model_profiles(self) -> Dict[str, Dict[str, Any]]:
"""Get complete model cost and performance profiles"""
return self._model_profiles.copy()
def calculate_cost(self, model: str, token_count: int) -> float:
"""Calculate precise cost for model usage"""
profile = self._model_profiles.get(model)
if not profile:
raise ValueError(f"Unknown model: {model}")
return profile["cost_per_token"] * token_count
Cost Calculation Features:
- Profile access: Safe copy of model configurations
- Precise calculation: Direct token-based cost computation
- Error handling: Clear feedback for invalid model names
- Extensible design: Easy to add new models and pricing tiers
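Using the illustrative per-token prices from the profiles above (not live vendor pricing), a 1,200-token request costs roughly:

```python
calc = ModelCostCalculator()
tokens = 1_200

for model in ("gpt-4", "gpt-4-turbo", "gpt-3.5-turbo"):
    print(f"{model:<15} ${calc.calculate_cost(model, tokens):.4f}")

# gpt-4           $0.0360
# gpt-4-turbo     $0.0120
# gpt-3.5-turbo   $0.0018
```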
Step 6: Intelligent Model Selection Algorithm¶
The core algorithm that automatically selects the optimal model based on multiple requirements.
def get_model_by_requirements(self, accuracy_requirement: float,
max_response_time: int,
budget_constraint: float,
token_count: int) -> Optional[str]:
"""Get optimal model based on requirements and budget"""
best_model = None
best_score = float('-inf')
for model_name, profile in self._model_profiles.items():
# Check if model meets requirements
if (profile["accuracy"] >= accuracy_requirement and
profile["avg_response_time_ms"] <= max_response_time):
# Check budget constraint
estimated_cost = self.calculate_cost(model_name, token_count)
if estimated_cost <= budget_constraint:
Multi-Criteria Model Selection:
- Requirement filtering: Only considers models that meet accuracy and latency requirements
- Budget validation: Ensures cost stays within allocated budget
- Multi-factor scoring: Balances cost efficiency, quality, and performance
# Calculate optimization score (balance of cost, quality, speed)
cost_score = 1.0 / (estimated_cost + 0.001)
quality_score = profile["quality_score"]
speed_score = 1.0 / (profile["avg_response_time_ms"] / 1000.0)
combined_score = (cost_score * 0.4 + quality_score * 0.4 + speed_score * 0.2)
if combined_score > best_score:
best_score = combined_score
best_model = model_name
return best_model
Scoring Algorithm Details:
- Cost score: Higher for cheaper models (40% weight)
- Quality score: Direct quality rating (40% weight)
- Speed score: Higher for faster models (20% weight)
- Balanced optimization: Prioritizes cost and quality equally over speed
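For example, with the profiles above, a request needing at least 95% accuracy, a 2-second response, and a $0.05 budget resolves to GPT-4-Turbo, since it meets both constraints at a much better cost score than GPT-4:

```python
calc = ModelCostCalculator()

model = calc.get_model_by_requirements(
    accuracy_requirement=0.95,
    max_response_time=2000,   # milliseconds
    budget_constraint=0.05,   # USD available for this request
    token_count=1_200,
)
print(model)  # "gpt-4-turbo"
```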
With cost calculation in place, the AutomatedCostOptimizer class coordinates budget tracking, model selection, and optimization rules. It relies on a BudgetTracker helper for spend tracking (not shown in this excerpt) alongside the ModelCostCalculator defined above.
class AutomatedCostOptimizer:
"""Automated cost optimization system for agent operations"""
def __init__(self, budget_config: CostBudget, model_calculator: Optional[ModelCostCalculator] = None):
self._budget_tracker = BudgetTracker(budget_config)
self._model_calculator = model_calculator or ModelCostCalculator()
self.optimization_rules = {}
self._active_optimizations = []
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
AutomatedCostOptimizer Initialization:
- Budget integration: Uses BudgetTracker for real-time cost monitoring
- Model calculator: Handles cost analysis and model selection optimization
- Rule engine: Manages dynamic optimization rules and active strategies
- Logging framework: Provides comprehensive audit trail for optimization decisions
Now we configure the optimization rules that govern automated cost management:
def setup_cost_optimization_rules(self) -> Dict[str, Any]:
"""Configure automated cost optimization rules"""
optimization_config = {
"model_selection": {
"enabled": True,
"rules": [
{
"condition": "input_tokens < 500 and complexity_score < 0.3",
"action": "use_cheaper_model",
"target_model": "gpt-3.5-turbo",
"potential_savings": 0.90 # 90% cost reduction
},
{
"condition": "response_time_requirement > 2s and accuracy_tolerance > 0.95",
"action": "use_efficient_model",
"target_model": "gpt-4-turbo",
"potential_savings": 0.50
}
]
},
Model Selection Optimization Rules:
- Simple task optimization: Switch to GPT-3.5-turbo for small, simple requests (90% savings)
- Balanced performance: Use GPT-4-turbo when time allows but accuracy is important (50% savings)
- Condition-based switching: Automatic model selection based on request characteristics
"request_batching": {
"enabled": True,
"batch_size": 5,
"batch_timeout_ms": 100,
"cost_reduction": 0.25
},
"caching_optimization": {
"enabled": True,
"target_hit_rate": 0.70,
"cache_duration_optimization": True,
"semantic_caching": True
},
Additional Optimization Strategies:
- Request batching: Groups 5 requests with 100ms timeout for 25% cost reduction
- Caching optimization: Targets 70% hit rate with semantic similarity matching
- Duration optimization: Automatically adjusts cache TTL based on usage patterns
"resource_scheduling": {
"enabled": True,
"off_peak_hours": [22, 23, 0, 1, 2, 3, 4, 5],
"off_peak_discount": 0.30,
"workload_shifting": True
},
"budget_protection": {
"enabled": True,
"soft_limits": [0.8, 0.9], # 80%, 90% of budget
"hard_limit": 1.0, # 100% of budget
"actions": {
"0.8": ["enable_aggressive_caching", "switch_to_cheaper_models"],
"0.9": ["batch_requests", "defer_non_critical_tasks"],
"1.0": ["emergency_scale_down", "disable_non_essential_agents"]
}
}
}
self.optimization_rules = optimization_config
return optimization_config
Advanced Cost Protection Measures:
- Off-peak scheduling: 30% discount for workloads shifted to hours 22:00-05:00
- Budget protection: Graduated responses at 80%, 90%, and 100% budget utilization
- Emergency actions: Automatic scaling and service reduction when budgets are exceeded
- Workload shifting: Intelligent deferral of non-critical tasks to reduce immediate costs
Step 7: Model Selection Optimization Process¶
The automated model optimization process that balances cost, performance, and quality requirements.
async def optimize_model_selection(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
"""Automatically select optimal model based on cost and performance requirements"""
try:
# Validate input data
if not isinstance(request_data, dict):
raise CostOptimizationException("request_data must be a dictionary")
# Analyze request characteristics
input_tokens = request_data.get("input_tokens", 0)
required_accuracy = request_data.get("accuracy_requirement", 0.95)
max_response_time = request_data.get("max_response_time_ms", 5000)
if input_tokens <= 0:
raise CostOptimizationException("input_tokens must be positive")
Request Analysis Phase:
- Input validation: Ensures request data is properly formatted and contains required fields
- Parameter extraction: Gets accuracy requirements, timing constraints, and token counts
- Constraint checking: Validates that token counts are positive for cost calculation
# Get budget constraints
budget_remaining = self._budget_tracker.get_budget_remaining_percentage("daily")
available_budget = budget_remaining * 0.1 # Use 10% of remaining daily budget for this request
# Get optimal model based on requirements
recommended_model = self._model_calculator.get_model_by_requirements(
accuracy_requirement=required_accuracy,
max_response_time=max_response_time,
budget_constraint=available_budget,
token_count=input_tokens
)
Budget-Aware Selection:
- Budget integration: Uses current budget status to limit per-request spending
- Conservative allocation: Only uses 10% of remaining daily budget per request
- Multi-criteria matching: Finds models that satisfy accuracy, speed, and cost requirements
Step 8: Successful Optimization Response¶
When a suitable model is found, we calculate comprehensive cost analysis and tracking.
if recommended_model:
# Calculate costs and savings
recommended_cost = self._model_calculator.calculate_cost(recommended_model, input_tokens)
baseline_cost = self._model_calculator.calculate_cost("gpt-4", input_tokens)
cost_savings = baseline_cost - recommended_cost
# Record the cost
self._budget_tracker.add_cost_entry(
recommended_cost,
f"Model optimization: {recommended_model} for {input_tokens} tokens"
)
Cost Analysis Process:
- Recommended cost: Actual cost for the selected optimal model
- Baseline comparison: Uses GPT-4 as the premium baseline for savings calculation
- Cost tracking: Records actual spending for budget management
- Detailed logging: Tracks optimization decisions for audit and analysis
return {
"recommended_model": recommended_model,
"estimated_cost": recommended_cost,
"cost_savings": max(0, cost_savings),
"savings_percentage": cost_savings / baseline_cost if baseline_cost > 0 else 0,
"budget_remaining": budget_remaining,
"reasoning": f"Selected {recommended_model} based on budget constraint ${available_budget:.4f}, accuracy requirement {required_accuracy}",
"optimization_applied": True
}
Success Response Structure:
- Model recommendation: The optimal model meeting all constraints
- Cost transparency: Shows exact costs and potential savings
- Budget status: Current remaining budget for monitoring
- Decision rationale: Explains why this model was selected
Step 9: Fallback Strategy Implementation¶
When no model meets all requirements, we implement a safe fallback strategy.
else:
# No model meets requirements - use fallback
fallback_model = "gpt-3.5-turbo" # Cheapest option
fallback_cost = self._model_calculator.calculate_cost(fallback_model, input_tokens)
self._budget_tracker.add_cost_entry(
fallback_cost,
f"Fallback model: {fallback_model} for {input_tokens} tokens"
)
return {
"recommended_model": fallback_model,
"estimated_cost": fallback_cost,
"cost_savings": 0,
"savings_percentage": 0,
"budget_remaining": budget_remaining,
"reasoning": "No model meets all requirements; using fallback",
"optimization_applied": False,
"warning": "Requirements could not be fully satisfied within budget constraints"
}
Fallback Strategy Benefits:
- Service continuity: Always provides a working model recommendation
- Cost control: Uses the cheapest available model when budgets are tight
- Transparent reporting: Clearly indicates when optimization wasn't possible
- Warning system: Alerts operators to budget constraint issues
except Exception as e:
self.logger.error(f"Model optimization failed: {str(e)}")
raise CostOptimizationException(f"Model optimization failed: {str(e)}") from e
Error Handling:
- Comprehensive logging: Captures all optimization failures for debugging
- Exception chaining: Preserves original error context for troubleshooting
- Graceful degradation: System continues operating even when optimization fails
Step 10: Request Batching Implementation¶
Intelligent request batching reduces per-request overhead and enables volume discounts from API providers.
async def implement_request_batching(self, pending_requests: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Implement intelligent request batching for cost optimization"""
if not self.optimization_rules.get("request_batching", {}).get("enabled"):
return {"batching_enabled": False}
batch_config = self.optimization_rules["request_batching"]
batch_size = batch_config["batch_size"]
timeout_ms = batch_config["batch_timeout_ms"]
Batching Configuration:
- Policy checking: Only executes if batching is enabled in optimization rules
- Size limits: Respects configured batch size for optimal throughput
- Timeout management: Prevents requests from waiting too long for batching
# Group requests by similarity for better batching efficiency
batches = self._group_requests_for_batching(pending_requests, batch_size)
batching_results = {
"batches_created": len(batches),
"total_requests": len(pending_requests),
"batching_efficiency": len(pending_requests) / len(batches) if batches else 0,
"estimated_cost_savings": 0.0
}
Batching Metrics:
- Batch creation: Number of groups created from pending requests
- Efficiency calculation: Measures how well requests were grouped
- Cost tracking: Prepares structure for savings calculation
# Calculate cost savings from batching
if batches:
individual_cost = len(pending_requests) * 1.0 # Baseline cost per request
batched_cost = len(batches) * 1.0 * 0.75 # 25% discount for batching
batching_results["estimated_cost_savings"] = individual_cost - batched_cost
batching_results["savings_percentage"] = (individual_cost - batched_cost) / individual_cost
return batching_results
Cost Savings Calculation:
- Individual cost: Baseline cost if each request was processed separately
- Batch discount: 25% cost reduction for batched requests
- Savings reporting: Provides both absolute and percentage savings for monitoring
def _group_requests_for_batching(self, requests: List[Dict[str, Any]],
batch_size: int) -> List[List[Dict[str, Any]]]:
"""Group similar requests for efficient batching"""
# Simple grouping by agent type and model
groups = {}
for request in requests:
agent_type = request.get("agent_type", "default")
model_name = request.get("model_name", "default")
group_key = f"{agent_type}:{model_name}"
if group_key not in groups:
groups[group_key] = []
groups[group_key].append(request)
Request Grouping Strategy:
- Similarity-based grouping: Groups requests by agent type and model for optimal batching
- Dynamic key generation: Creates composite keys from request characteristics
- Efficient grouping: Uses dictionary structure for fast group lookup and insertion
# Create batches from groups
batches = []
for group_requests in groups.values():
for i in range(0, len(group_requests), batch_size):
batch = group_requests[i:i + batch_size]
batches.append(batch)
return batches
Batch Creation Process:
- Group iteration: Processes each request group independently for optimal organization
- Size-aware batching: Respects configured batch size limits for efficient processing
- Sequential batch creation: Creates ordered batches that maintain processing consistency
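A small example of the grouping behavior (the request fields and the `optimizer` instance are placeholders; an AutomatedCostOptimizer needs a CostBudget and its BudgetTracker to construct):

```python
pending = [
    {"agent_type": "support",  "model_name": "gpt-3.5-turbo", "prompt": "q1"},
    {"agent_type": "support",  "model_name": "gpt-3.5-turbo", "prompt": "q2"},
    {"agent_type": "research", "model_name": "gpt-4-turbo",   "prompt": "q3"},
]

batches = optimizer._group_requests_for_batching(pending, batch_size=5)
print(len(batches))               # 2 - one batch per (agent_type, model) group
print([len(b) for b in batches])  # [2, 1]
```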
Step 11: Budget Remaining Calculation¶
Real-time budget tracking enables smart cost decisions throughout the optimization process.
def _get_budget_remaining_percentage(self) -> float:
"""Calculate remaining budget percentage"""
now = datetime.now()
# Get today's spending
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
today_spending = sum(
entry.get("cost", 0) for entry in self.cost_history
if datetime.fromisoformat(entry["timestamp"]) >= today_start
)
daily_remaining = max(0, self.budget_config.daily_limit - today_spending)
return daily_remaining / self.budget_config.daily_limit
Budget Tracking Features:
- Real-time calculation: Uses current timestamp for accurate daily budget assessment
- Spending aggregation: Sums all costs from today's transactions for precise tracking
- Safe calculations: Uses max() to prevent negative remaining budgets
- Percentage conversion: Returns 0-1 range for consistent threshold comparisons
Step 12: Emergency Cost Control System¶
Automatic emergency responses protect against budget overruns with graduated intervention strategies.
async def emergency_cost_controls(self, current_spend: float) -> Dict[str, Any]:
"""Implement emergency cost controls when budget is exceeded"""
daily_percentage = current_spend / self.budget_config.daily_limit
actions_taken = []
if daily_percentage >= self.budget_config.emergency_shutdown_threshold:
# Emergency shutdown
actions_taken.extend([
"emergency_agent_shutdown",
"disable_non_critical_services",
"enable_maximum_caching",
"switch_to_cheapest_models"
])
Emergency Response Triggers:
- Budget analysis: Calculates current spending as percentage of daily limit
- Threshold comparison: Checks against configured emergency thresholds
- Critical actions: Implements emergency shutdown when 100% budget is reached
elif daily_percentage >= self.budget_config.auto_scale_down_threshold:
# Aggressive cost reduction
actions_taken.extend([
"aggressive_scale_down",
"batch_all_requests",
"enable_aggressive_caching",
"defer_low_priority_tasks"
])
Graduated Response Strategy:
- Automatic scaling: Triggers at 95% budget utilization threshold
- Aggressive optimization: Enables all available cost reduction measures
- Service preservation: Maintains essential functionality while reducing costs
return {
"emergency_level": "critical" if daily_percentage >= 1.0 else "warning",
"budget_utilization": daily_percentage,
"actions_taken": actions_taken,
"estimated_cost_reduction": self._calculate_emergency_savings(actions_taken)
}
def _calculate_emergency_savings(self, actions: List[str]) -> float:
"""Calculate estimated cost savings from emergency actions"""
savings_map = {
"emergency_agent_shutdown": 0.80,
"aggressive_scale_down": 0.60,
"enable_maximum_caching": 0.40,
"switch_to_cheapest_models": 0.85,
"batch_all_requests": 0.25,
"defer_low_priority_tasks": 0.30
}
return max((savings_map.get(action, 0) for action in actions), default=0.0)
Step 13: Resource Pool Manager¶
The ResourcePoolManager handles cost-efficient agent execution with intelligent scaling and spot instance management.
class ResourcePoolManager:
"""Manage resource pools for cost-efficient agent execution"""
def __init__(self):
self.agent_pools = {}
self.connection_pools = {}
self.resource_utilization = {}
Resource Pool Architecture:
- Agent pools: Managed collections of agents with scaling policies
- Connection pools: Shared connections for efficient resource usage
- Utilization tracking: Monitors resource usage for optimization decisions
Step 14: Agent Pool Configuration¶
Agent pools provide cost-efficient scaling with sophisticated policies for different workload patterns.
def create_agent_pool(self, pool_name: str, config: Dict[str, Any]) -> Dict[str, Any]:
"""Create managed agent pool for resource efficiency"""
pool_config = {
"pool_name": pool_name,
"min_agents": config.get("min_agents", 2),
"max_agents": config.get("max_agents", 20),
"idle_timeout_minutes": config.get("idle_timeout", 30),
"warmup_agents": config.get("warmup_agents", 3)
}
Basic Pool Configuration:
- Size constraints: Minimum and maximum agent limits prevent under/over-provisioning
- Timeout management: Idle timeout reduces costs during low activity periods
- Warmup strategy: Pre-warmed agents reduce cold start latency
pool_config["scaling_policy"] = {
"scale_up_threshold": 0.8, # 80% utilization
"scale_down_threshold": 0.3, # 30% utilization
"scale_up_increment": 2,
"scale_down_increment": 1,
"cooldown_period_minutes": 5
}
Intelligent Scaling Policies:
- Utilization thresholds: Triggers scaling at 80% load, scales down at 30%
- Incremental scaling: Adds 2 agents up, removes 1 down for stability
- Cooldown protection: 5-minute cooldown prevents oscillation
pool_config["cost_optimization"] = {
"enable_spot_instances": config.get("spot_instances", True),
"preemptible_percentage": 0.7,
"cost_per_hour_target": config.get("cost_target", 10.0)
}
self.agent_pools[pool_name] = pool_config
return pool_config
Cost Optimization Features:
- Spot instances: 70% of fleet uses cheaper spot instances when available
- Cost targeting: Maintains specific cost per hour targets
- Preemptible strategy: Balances cost savings with reliability requirements
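Creating a pool is a single call; the configuration keys shown here mirror the `config.get(...)` defaults above, and the values are illustrative:

```python
pool_manager = ResourcePoolManager()

pool = pool_manager.create_agent_pool("chat-agents", {
    "min_agents": 2,
    "max_agents": 10,
    "idle_timeout": 15,     # minutes before idle agents are released
    "spot_instances": True,
    "cost_target": 5.0,     # target USD per hour for the pool
})

print(pool["scaling_policy"]["scale_up_threshold"])        # 0.8
print(pool["cost_optimization"]["enable_spot_instances"])  # True
```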
Step 15: Workload-Based Resource Optimization¶
Predictive resource allocation based on workload forecasting and cost optimization.
async def optimize_resource_allocation(self, workload_forecast: Dict[str, Any]) -> Dict[str, Any]:
"""Optimize resource allocation based on workload forecasting"""
optimization_plan = {
"timestamp": datetime.now().isoformat(),
"forecast_horizon_hours": workload_forecast.get("horizon_hours", 24),
"optimizations": []
}
Optimization Planning:
- Timestamp tracking: Records when optimization decisions were made
- Forecast horizon: Uses 24-hour default prediction window
- Optimization collection: Gathers all recommended changes
for pool_name, pool_config in self.agent_pools.items():
current_agents = pool_config.get("current_agents", pool_config["min_agents"])
forecasted_load = workload_forecast.get(pool_name, {}).get("expected_rps", 100)
# Calculate optimal agent count
agents_per_rps = 0.1 # Simplified: 1 agent per 10 RPS
optimal_agents = max(
pool_config["min_agents"],
min(pool_config["max_agents"], int(forecasted_load * agents_per_rps))
)
Capacity Planning Algorithm:
- Current state analysis: Evaluates existing agent allocation
- Load forecasting: Uses predicted request rates for sizing
- Capacity calculation: 1 agent per 10 RPS rule with min/max constraints
if optimal_agents != current_agents:
cost_impact = (optimal_agents - current_agents) * 0.50 # $0.50/hour per agent
optimization_plan["optimizations"].append({
"pool_name": pool_name,
"current_agents": current_agents,
"optimal_agents": optimal_agents,
"change": optimal_agents - current_agents,
"hourly_cost_impact": cost_impact,
"reason": "workload_forecast_optimization"
})
return optimization_plan
Part 3: Memory and Latency Optimization (10 minutes)¶
High-Performance Memory Management¶
🗂️ File: src/session8/performance_optimization.py
- Memory and latency optimization
Step 1: Performance Optimization Foundations¶
Memory and latency optimization requires careful resource management and intelligent algorithms.
from typing import Dict, List, Any, Optional
import asyncio
import gc # Garbage collection management
import psutil # System resource monitoring
import time # Performance timing
from dataclasses import dataclass
Critical Performance Libraries:
- asyncio: Enables non-blocking concurrent operations
- gc: Direct garbage collection control for memory optimization
- psutil: System resource monitoring and optimization
- time: Performance measurement and timing analysis
Step 2: Memory Optimization Configuration¶
Structured configuration for memory management with intelligent defaults.
@dataclass
class MemoryOptimizationConfig:
"""Comprehensive memory optimization configuration"""
max_memory_mb: int = 2048 # Maximum memory allocation
gc_threshold: float = 0.8 # Trigger GC at 80% memory usage
context_window_size: int = 4096 # Maximum context length
enable_streaming: bool = True # Enable response streaming
batch_processing: bool = True # Enable request batching
Configuration Strategy:
- Memory limits: Prevent out-of-memory crashes in production
- GC thresholds: Proactive garbage collection for consistent performance
- Context management: Balance context richness with memory efficiency
- Streaming: Reduce memory footprint for large responses
- Batching: Optimize throughput while managing memory usage
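For example, a memory-constrained deployment might override the defaults like this (the specific values are illustrative):

```python
# Illustrative override of the defaults for a memory-constrained deployment
config = MemoryOptimizationConfig(
    max_memory_mb=1024,        # Cap the process at 1 GB instead of 2 GB
    gc_threshold=0.7,          # Trigger garbage collection earlier, at 70% usage
    context_window_size=2048,  # Smaller context window to reduce memory pressure
)
```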
Step 3: Memory-Optimized Agent Manager¶
The central manager for memory-efficient agent execution with intelligent context management.
class MemoryOptimizedAgentManager:
"""Advanced memory-efficient agent execution manager"""
def __init__(self, config: MemoryOptimizationConfig):
self.config = config
self.memory_stats = {"peak_usage": 0, "gc_triggers": 0}
Manager Initialization:
- Configuration integration: Uses validated memory optimization settings
- Statistics tracking: Monitors memory usage patterns for optimization
- Performance baseline: Establishes initial memory footprint
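The snippet above does not show how the baseline is captured; one possible approach, using the `psutil` module imported in Step 1, is sketched below. The `capture_memory_baseline` helper is an assumption for illustration, not part of the original manager.

```python
import psutil

def capture_memory_baseline() -> int:
    """Hypothetical helper: resident memory of the current process, in MB."""
    rss_bytes = psutil.Process().memory_info().rss
    return rss_bytes // (1024 * 1024)

# e.g. inside __init__:
#     self.memory_stats["baseline_mb"] = capture_memory_baseline()
```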
Step 4: Intelligent Context Management¶
Smart context pruning that preserves important information while reducing memory usage.
async def optimize_context_management(self, conversation_history: List[Dict]) -> List[Dict]:
"""Optimize conversation context using intelligent pruning algorithms"""
# Fast path: no optimization needed
if len(conversation_history) <= self.config.context_window_size:
return conversation_history
# Intelligent context pruning strategy
important_messages = []
recent_messages = conversation_history[-50:] # Preserve recent context
Context Optimization Strategy:
- Fast path optimization: Skips pruning entirely when the history already fits within the configured context window
- Dual retention: Keeps both recent messages and historically important ones
- Configurable limits: Uses context window size from configuration
# Extract high-value messages from older history
for msg in conversation_history[:-50]:
importance_score = self._calculate_message_importance(msg)
if importance_score > 0.7: # Keep important messages
important_messages.append(msg)
# Combine important historical context with recent messages
optimized_context = important_messages + recent_messages
Importance-Based Filtering:
- Selective retention: Only keeps messages scoring above 0.7 importance threshold
- Historical preservation: Analyzes older messages for lasting value
- Context combination: Merges important historical and recent messages
# Final size check and truncation if needed
if len(optimized_context) > self.config.context_window_size:
optimized_context = optimized_context[-self.config.context_window_size:]
return optimized_context
Final Optimization Pass:
- Size enforcement: Ensures context doesn't exceed configured limits
- Recency bias: Keeps most recent messages when truncation is needed
- Memory efficiency: Prevents unbounded context growth
Context Optimization Strategy:
- Importance-based retention: Keeps high-value messages regardless of age
- Recency bias: Always preserves recent conversation context
- Size constraints: Respects configured memory limits
- Semantic preservation: Maintains conversation coherence
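A brief usage sketch under assumed names: a manager configured with a small context window prunes a long synthetic history down to size.

```python
# Hypothetical usage (inside an async function / event loop)
manager = MemoryOptimizedAgentManager(
    MemoryOptimizationConfig(context_window_size=100)
)

history = [{"role": "system", "content": "Important: remember the project constraints"}]
history += [{"role": "user", "content": f"message {i}"} for i in range(5000)]

optimized = await manager.optimize_context_management(history)
assert len(optimized) <= 100  # Never exceeds the configured context window
# The system message survives pruning because it scores above the 0.7 threshold
```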
Step 5: Message Importance Scoring¶
Algorithm for determining which messages to retain during context optimization.
def _calculate_message_importance(self, message: Dict[str, Any]) -> float:
"""Calculate importance score using multiple factors"""
content = message.get("content", "")
role = message.get("role", "user")
# Base importance score
importance = 0.5
# System messages are critical for context
if role == "system":
importance += 0.3
Base Scoring Logic:
- Starting value: 0.5 provides neutral baseline for all messages
- Role weighting: System messages get +0.3 bonus for context preservation
- Incremental scoring: Multiple factors can contribute to final importance
# Keyword-based importance detection
important_keywords = ["important", "remember", "context", "reference"]
for keyword in important_keywords:
if keyword.lower() in content.lower():
importance += 0.2
break
# Length-based importance heuristic
if len(content) > 200:
importance += 0.1
return min(importance, 1.0) # Cap at 1.0
Importance Scoring Factors:
- Role-based: System messages get higher priority
- Keyword detection: Messages with explicit importance markers
- Content length: Longer messages often contain more context
- Bounded scoring: Ensures consistent 0-1 range for comparison
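A few worked examples against the scoring rules above (the messages themselves are illustrative):

```python
# Worked examples of the importance scoring rules above
system_msg = {"role": "system", "content": "Remember the customer's account tier."}
# 0.5 base + 0.3 system role + 0.2 "remember" keyword = 1.0 (already at the cap)

short_user_msg = {"role": "user", "content": "thanks!"}
# 0.5 base only -> 0.5, below the 0.7 retention threshold used in Step 4

long_user_msg = {"role": "user", "content": "x" * 250}
# 0.5 base + 0.1 length bonus (> 200 chars) = 0.6, still below the threshold
```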
Step 6: Latency Optimization System¶
Advanced latency optimization through connection pooling and request batching.
class LatencyOptimizer:
"""Advanced latency optimization system with connection pooling"""
def __init__(self):
self.connection_pools = {} # Model endpoint connection pools
self.request_cache = {} # Request-level caching
self.batch_processor = None # Batch processing engine
Latency Optimizer Components:
- Connection pools: Reuse HTTP connections for faster requests
- Request cache: Cache identical requests to eliminate redundant calls
- Batch processor: Group requests for improved throughput
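The `request_cache` dictionary needs a deterministic key. One possible scheme, assumed here rather than taken from the original code, hashes the canonical JSON form of each request in the same spirit as the `hashlib`-based cache keys from Part 1:

```python
import hashlib
import json

def request_cache_key(request: dict) -> str:
    """Hypothetical key scheme: hash the canonical JSON form of a request."""
    canonical = json.dumps(request, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

# Identical requests map to the same cache slot regardless of key order:
# request_cache_key({"model": "gpt", "prompt": "hi"}) ==
# request_cache_key({"prompt": "hi", "model": "gpt"})
```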
Step 7: Connection Pool Setup¶
Connection pooling dramatically reduces latency by reusing HTTP connections across requests.
async def setup_connection_pooling(self, model_endpoints: Dict[str, str]):
"""Setup connection pooling for model endpoints"""
import aiohttp
for model_name, endpoint in model_endpoints.items():
# Create connection pool for each model endpoint
connector = aiohttp.TCPConnector(
limit=20, # Max connections
limit_per_host=10,
keepalive_timeout=30,
enable_cleanup_closed=True
)
Connection Pool Configuration:
- Connection limits: 20 connections total and 10 per host prevent resource exhaustion
- Keepalive optimization: 30-second timeout maintains warm connections
- Cleanup management: Automatic cleanup prevents connection leaks
timeout = aiohttp.ClientTimeout(total=30)
session = aiohttp.ClientSession(
    base_url=endpoint,  # anchor relative paths (e.g. "/v1/batch" in Step 9) to this endpoint
    connector=connector,
    timeout=timeout
)
self.connection_pools[model_name] = session
Session Management:
- Timeout control: 30-second total timeout prevents hanging requests
- Per-model pools: Separate connection pools for different model endpoints
- Session reuse: Each model gets a dedicated session for optimal performance
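A usage sketch with two illustrative endpoints. Because `aiohttp` sessions hold OS-level resources, a shutdown step that closes each pooled session is included; the close loop is an assumption rather than part of the original class.

```python
# Hypothetical usage (inside an async function / event loop)
optimizer = LatencyOptimizer()

await optimizer.setup_connection_pooling({
    "fast-model": "https://fast-model.internal.example",
    "quality-model": "https://quality-model.internal.example",
})

# ... serve traffic using the pooled sessions ...

# Shutdown: close every pooled aiohttp session to release connections cleanly
for session in optimizer.connection_pools.values():
    await session.close()
```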
Step 8: Request Batching Implementation¶
Request batching reduces per-request overhead and improves overall throughput.
async def implement_request_batching(self, requests: List[Dict[str, Any]]) -> List[Any]:
"""Implement request batching for reduced latency overhead"""
# Group requests by model for efficient batching
model_groups = {}
for i, request in enumerate(requests):
model = request.get("model", "default")
if model not in model_groups:
model_groups[model] = []
model_groups[model].append((i, request))
Request Grouping Strategy:
- Model-based grouping: Groups requests by target model for optimal batching
- Index tracking: Maintains original request order for result reassembly
- Flexible grouping: Handles multiple model types in a single batch operation
# Process batches concurrently
all_results = [None] * len(requests)
batch_tasks = []
for model, model_requests in model_groups.items():
task = self._process_model_batch(model, model_requests)
batch_tasks.append(task)
batch_results = await asyncio.gather(*batch_tasks)
Concurrent Processing:
- Parallel execution: Processes different model groups simultaneously
- Task management: Creates async tasks for each model group
- Result collection: Uses gather() for efficient concurrent execution
# Reassemble results in original order
for batch_result in batch_results:
for original_index, result in batch_result:
all_results[original_index] = result
return all_results
Result Assembly:
- Order preservation: Maintains original request order in results
- Index mapping: Uses stored indices to place results correctly
- Complete response: Returns all results in expected sequence
Step 9: Model Batch Processing Implementation¶
Processes batched requests for specific models with connection pooling and fallback strategies.
async def _process_model_batch(self, model: str,
requests: List[Tuple[int, Dict[str, Any]]]) -> List[Tuple[int, Any]]:
"""Process batch of requests for a specific model"""
if model in self.connection_pools:
session = self.connection_pools[model]
# Prepare batch payload
batch_payload = {
"model": model,
"requests": [req for _, req in requests]
}
Batch Preparation Process:
- Connection verification: Ensures pooled connection exists for the model
- Payload construction: Creates structured batch request with model identification
- Request extraction: Extracts request data while preserving index mapping
# Execute batch request
async with session.post("/v1/batch", json=batch_payload) as response:
batch_response = await response.json()
# Map results back to original indices
results = []
for i, (original_index, _) in enumerate(requests):
result = batch_response.get("results", [])[i] if i < len(batch_response.get("results", [])) else None
results.append((original_index, result))
return results
Batch Execution and Response Handling:
- Async HTTP execution: Uses connection pool for efficient batch processing
- Response parsing: Handles JSON response from batch endpoint
- Index preservation: Maps batch results back to original request positions
- Error resilience: Safely handles missing or incomplete batch results
# Fallback: process individually
results = []
for original_index, request in requests:
# Simulate individual processing
result = {"status": "processed", "request_id": request.get("id")}
results.append((original_index, result))
return results
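When no pooled connection exists for a model, the fallback branch above returns simulated per-request placeholders rather than real completions, which keeps the example self-contained. Below is a brief end-to-end sketch of the batching path, assuming the pools from Step 7 and illustrative request shapes:

```python
# Hypothetical usage (inside an async function / event loop)
requests = [
    {"id": "r1", "model": "fast-model", "prompt": "Summarize the report"},
    {"id": "r2", "model": "quality-model", "prompt": "Draft the reply"},
    {"id": "r3", "model": "fast-model", "prompt": "Extract action items"},
]

results = await optimizer.implement_request_batching(requests)
# results[0] corresponds to r1, results[1] to r2, and so on:
# original ordering is preserved even though r1 and r3 were batched together.
```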
Module Summary¶
You've now mastered performance optimization for production Agno systems:
✅ Intelligent Caching: Implemented multi-layer caching with semantic similarity and deduplication
✅ Cost Optimization: Built automated cost management with model selection and budget protection
✅ Resource Pooling: Created efficient agent pools with spot instance integration
✅ Memory Management: Designed memory-efficient context handling with intelligent pruning
✅ Latency Optimization: Implemented connection pooling and request batching for reduced overhead
Next Steps¶
- Continue to Module D: Security & Compliance for enterprise security
- Return to Core: Session 8 Main
- Review Module A: Advanced Monitoring & Observability
📝 Multiple Choice Test - Module C¶
Test your understanding of Performance Optimization:
Question 1: What is the primary advantage of implementing multi-layer caching with L1 memory and L2 Redis?
A) It reduces development complexity
B) It provides both ultra-fast access and distributed caching capabilities
C) It eliminates the need for TTL management
D) It automatically handles all cache invalidation
Question 2: In the intelligent cache eviction algorithm, what factors are used to calculate the eviction score?
A) Only time since last access
B) Only access frequency
C) Both recency score and frequency score combined
D) Random selection for fairness
Question 3: What is the cost optimization strategy when daily budget utilization reaches 95%?
A) Continue normal operations
B) Send alerts but maintain current scaling
C) Trigger automatic scaling down of non-critical services
D) Completely shut down all agent services
Question 4: How does the intelligent model selection algorithm balance different requirements?
A) It only considers cost (100% weight)
B) It uses equal weights for all factors
C) It weights cost and quality at 40% each, speed at 20%
D) It prioritizes speed above all other factors
Question 5: What is the purpose of connection pooling in the latency optimization system?
A) To increase security through connection encryption
B) To reduce latency by reusing HTTP connections across requests
C) To automatically retry failed requests
D) To load balance requests across multiple servers
🗂️ Source Files for Module C:
- src/session8/intelligent_caching.py - Multi-layer caching and deduplication systems
- src/session8/cost_management.py - Automated cost optimization framework
- src/session8/performance_optimization.py - Memory and latency optimization techniques