
⚙️ Session 8 Module C: Performance & Production Validation

⚙️ IMPLEMENTER PATH CONTENT
Prerequisites: Complete 🎯 Observer and 📝 Participant paths in Session 8
Time Investment: 2-3 hours
Outcome: Master performance optimization, cost management, and comprehensive production validation strategies

Advanced Learning Outcomes

After completing this module, you will master:

  • Advanced performance optimization techniques for data processing agents
  • Cost management and resource optimization strategies
  • Comprehensive production validation and testing methodologies
  • Complete assessment and testing frameworks for production systems

Advanced Performance Optimization

Circuit Breaker Patterns for Data Processing

When processing large volumes of data, circuit breakers prevent cascading failures by monitoring system health and automatically stopping requests when error rates exceed thresholds:

import asyncio
from enum import Enum
from datetime import datetime, timedelta
import logging

class CircuitBreakerState(Enum):
    CLOSED = "closed"     # Normal operation
    OPEN = "open"         # Blocking requests
    HALF_OPEN = "half_open"  # Testing if service recovered

class DataProcessingCircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitBreakerState.CLOSED

Implement circuit breaker logic for data processing operations:

    async def call(self, func, *args, **kwargs):
        """Execute function through circuit breaker."""
        if self.state == CircuitBreakerState.OPEN:
            # Check if recovery timeout has passed
            if (datetime.now() - self.last_failure_time).total_seconds() > self.recovery_timeout:
                self.state = CircuitBreakerState.HALF_OPEN
                logging.info("Circuit breaker moving to HALF_OPEN state")
            else:
                raise Exception("Circuit breaker is OPEN - service unavailable")

        try:
            # Execute the function, awaiting it if it is a coroutine function
            if asyncio.iscoroutinefunction(func):
                result = await func(*args, **kwargs)
            else:
                result = func(*args, **kwargs)

            # Success - close the breaker if recovering, and reset the failure count
            if self.state == CircuitBreakerState.HALF_OPEN:
                self.state = CircuitBreakerState.CLOSED
                logging.info("Circuit breaker reset to CLOSED state")
            self.failure_count = 0

            return result

Handle failures and state transitions:

        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = datetime.now()

            # Check if threshold exceeded
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitBreakerState.OPEN
                logging.error(f"Circuit breaker opened after {self.failure_count} failures")

            raise e

# Usage with a data processing agent (Agent is the agent class used throughout this session)
class CircuitBreakerDataAgent:
    def __init__(self, agent: Agent):
        self.agent = agent
        self.circuit_breaker = DataProcessingCircuitBreaker()

    async def safe_process(self, query: str):
        """Process data through circuit breaker."""
        return await self.circuit_breaker.call(self.agent.arun, query)
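
To see the state machine in action, here is a minimal sketch that drives the breaker from CLOSED to OPEN; flaky_call is a hypothetical stand-in for a failing downstream service:

import asyncio
import logging

async def flaky_call(query: str) -> str:
    """Hypothetical downstream call that always fails."""
    raise ConnectionError("upstream unavailable")

async def demo_breaker():
    breaker = DataProcessingCircuitBreaker(failure_threshold=3, recovery_timeout=5)
    for attempt in range(5):
        try:
            await breaker.call(flaky_call, "test query")
        except Exception as e:
            logging.warning(f"Attempt {attempt + 1}: {e}")
    # After three failures the breaker is OPEN and rejects calls immediately
    print(f"Breaker state: {breaker.state.value}")

asyncio.run(demo_breaker())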

Advanced Caching Strategies

Implement multi-tier caching for optimal performance in data processing workflows:

import hashlib
import json
import logging
from datetime import datetime
from typing import Optional, Dict, Any

class MultiTierCacheManager:
    def __init__(self):
        self.memory_cache = {}  # L1 cache
        self.redis_cache = None  # L2 cache
        self.memory_cache_size = 1000
        self.cache_ttl = {
            "query_results": 3600,     # 1 hour
            "user_sessions": 1800,     # 30 minutes
            "data_patterns": 7200,     # 2 hours
        }

Implement intelligent cache key generation and retrieval:

    def _generate_cache_key(self, prefix: str, data: Dict[str, Any]) -> str:
        """Generate consistent cache key from data."""
        # Sort keys for consistent hashing (md5 is used for cache keys only, not security)
        sorted_data = json.dumps(data, sort_keys=True)
        hash_object = hashlib.md5(sorted_data.encode())
        return f"{prefix}:{hash_object.hexdigest()}"

    async def get_cached_result(self, cache_type: str, query_data: Dict[str, Any]) -> Optional[str]:
        """Retrieve from multi-tier cache."""
        cache_key = self._generate_cache_key(cache_type, query_data)

        # Check L1 cache (memory) first
        if cache_key in self.memory_cache:
            logging.debug(f"Cache hit (L1): {cache_key}")
            return self.memory_cache[cache_key]["data"]

        # Check L2 cache (Redis) if available
        if self.redis_cache:
            cached_data = await self.redis_cache.get(cache_key)
            if cached_data:
                logging.debug(f"Cache hit (L2): {cache_key}")
                # Promote to L1 cache
                self._store_in_memory_cache(cache_key, cached_data)
                return cached_data

        logging.debug(f"Cache miss: {cache_key}")
        return None

Implement cache storage with TTL and eviction policies:

    async def store_cached_result(self, cache_type: str, query_data: Dict[str, Any], result: str):
        """Store result in multi-tier cache."""
        cache_key = self._generate_cache_key(cache_type, query_data)
        ttl = self.cache_ttl.get(cache_type, 3600)

        # Store in L1 cache (memory)
        self._store_in_memory_cache(cache_key, result)

        # Store in L2 cache (Redis) if available
        if self.redis_cache:
            await self.redis_cache.setex(cache_key, ttl, result)

    def _store_in_memory_cache(self, key: str, data: str):
        """Store in memory cache with LRU eviction."""
        if len(self.memory_cache) >= self.memory_cache_size:
            # Simple LRU eviction - remove oldest entry
            oldest_key = min(self.memory_cache.keys(),
                           key=lambda k: self.memory_cache[k]["timestamp"])
            del self.memory_cache[oldest_key]

        self.memory_cache[key] = {
            "data": data,
            "timestamp": datetime.now()
        }

# Usage with data processing agent
class CachedDataAgent:
    def __init__(self, agent: Agent):
        self.agent = agent
        self.cache_manager = MultiTierCacheManager()

    async def process_with_cache(self, query: str, session_id: Optional[str] = None):
        """Process query with intelligent caching."""
        query_data = {"query": query, "session_id": session_id}

        # Check cache first
        cached_result = await self.cache_manager.get_cached_result("query_results", query_data)
        if cached_result:
            return cached_result

        # Process with agent
        response = await self.agent.arun(query, session_id=session_id)

        # Cache the result
        await self.cache_manager.store_cached_result("query_results", query_data, response.content)

        return response.content
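
The redis_cache slot above is left as None; one way to wire up the L2 tier, assuming redis-py's asyncio client (redis.asyncio) and a Redis server on localhost, is a sketch like this:

import redis.asyncio as redis

def build_cached_agent(agent: Agent) -> CachedDataAgent:
    """Attach a Redis-backed L2 tier to the cache manager (sketch)."""
    cached_agent = CachedDataAgent(agent)
    # decode_responses=True makes get() return str rather than bytes,
    # matching what the L1 memory cache stores
    cached_agent.cache_manager.redis_cache = redis.from_url(
        "redis://localhost:6379", decode_responses=True
    )
    return cached_agent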

Load Testing and Performance Benchmarking

Implement comprehensive load testing for production data processing agents:

import aiohttp
import asyncio
import time
from dataclasses import dataclass
from typing import List
import statistics

@dataclass
class LoadTestResult:
    total_requests: int
    successful_requests: int
    failed_requests: int
    avg_response_time: float
    min_response_time: float
    max_response_time: float
    requests_per_second: float
    percentile_95: float
    percentile_99: float

class DataAgentLoadTester:
    def __init__(self, base_url: str, auth_token: str = None):
        self.base_url = base_url
        self.auth_token = auth_token
        self.results = []

Implement concurrent request execution with metrics collection:

    async def execute_request(self, session: aiohttp.ClientSession, request_data: dict):
        """Execute single request and measure performance."""
        start_time = time.time()
        headers = {"Authorization": f"Bearer {self.auth_token}"} if self.auth_token else {}
        headers["Content-Type"] = "application/json"

        try:
            async with session.post(
                f"{self.base_url}/process-data",
                json=request_data,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=30)
            ) as response:
                await response.text()  # Consume response
                end_time = time.time()

                return {
                    "success": response.status == 200,
                    "status_code": response.status,
                    "response_time": end_time - start_time,
                    "error": None
                }
        except Exception as e:
            end_time = time.time()
            return {
                "success": False,
                "status_code": 0,
                "response_time": end_time - start_time,
                "error": str(e)
            }

Execute load test with configurable parameters:

    async def run_load_test(self,
                          concurrent_users: int = 10,
                          requests_per_user: int = 10,
                          test_queries: List[str] = None) -> LoadTestResult:
        """Run comprehensive load test."""
        if not test_queries:
            test_queries = [
                "Analyze customer behavior patterns",
                "Generate sales report summary",
                "Process transaction data analysis",
                "Create data quality assessment"
            ]

        # Prepare test data
        test_requests = []
        for user in range(concurrent_users):
            for req in range(requests_per_user):
                query = test_queries[req % len(test_queries)]
                test_requests.append({
                    "data_query": query,
                    "pipeline_id": f"load_test_{user}_{req}"
                })

        # Execute load test
        start_time = time.time()
        async with aiohttp.ClientSession() as session:
            tasks = [self.execute_request(session, req) for req in test_requests]
            results = await asyncio.gather(*tasks, return_exceptions=True)

        end_time = time.time()

        # Process results
        return self._analyze_results(results, end_time - start_time)

Analyze and report performance metrics:

    def _analyze_results(self, results: List[dict], total_duration: float) -> LoadTestResult:
        """Analyze load test results and calculate metrics."""
        successful_results = [r for r in results if isinstance(r, dict) and r["success"]]
        # Anything else - error dicts or exceptions surfaced by gather - is a failure
        failed_count = len(results) - len(successful_results)

        response_times = [r["response_time"] for r in successful_results]

        if not response_times:
            raise ValueError("No successful requests in load test")

        # Calculate percentiles
        sorted_times = sorted(response_times)
        percentile_95 = sorted_times[int(0.95 * len(sorted_times))]
        percentile_99 = sorted_times[int(0.99 * len(sorted_times))]

        return LoadTestResult(
            total_requests=len(results),
            successful_requests=len(successful_results),
            failed_requests=failed_count,
            avg_response_time=statistics.mean(response_times),
            min_response_time=min(response_times),
            max_response_time=max(response_times),
            requests_per_second=len(results) / total_duration,
            percentile_95=percentile_95,
            percentile_99=percentile_99
        )

# Usage example
async def run_performance_test():
    tester = DataAgentLoadTester("http://localhost:8000", "your-auth-token")
    result = await tester.run_load_test(
        concurrent_users=50,
        requests_per_user=20
    )

    print(f"Load Test Results:")
    print(f"Total Requests: {result.total_requests}")
    print(f"Success Rate: {(result.successful_requests / result.total_requests) * 100:.2f}%")
    print(f"Average Response Time: {result.avg_response_time:.3f}s")
    print(f"95th Percentile: {result.percentile_95:.3f}s")
    print(f"Requests/Second: {result.requests_per_second:.2f}")

Cost Optimization Strategies

Resource Usage Monitoring and Optimization

Implement comprehensive cost tracking and optimization for production data processing agents:

import logging
import psutil
import time
from dataclasses import dataclass
from typing import Dict, List

@dataclass
class ResourceUsage:
    timestamp: float
    cpu_percent: float
    memory_mb: float
    disk_io_mb: float
    network_io_mb: float
    active_connections: int

class CostOptimizationManager:
    def __init__(self):
        self.usage_history: List[ResourceUsage] = []
        self.cost_per_hour = {
            "cpu_core": 0.05,      # $0.05 per CPU core hour
            "memory_gb": 0.01,     # $0.01 per GB memory hour
            "disk_io_gb": 0.001,   # $0.001 per GB disk I/O
            "network_gb": 0.001    # $0.001 per GB network I/O
        }

Monitor and collect resource usage metrics:

    def collect_resource_usage(self) -> ResourceUsage:
        """Collect current resource usage metrics."""
        # CPU usage (interval=1 blocks for about a second while sampling)
        cpu_percent = psutil.cpu_percent(interval=1)

        # Memory usage
        memory = psutil.virtual_memory()
        memory_mb = memory.used / (1024 * 1024)

        # Disk I/O (cumulative since boot; a production tracker would diff
        # successive samples rather than use the raw totals)
        disk_io = psutil.disk_io_counters()
        disk_io_mb = (disk_io.read_bytes + disk_io.write_bytes) / (1024 * 1024)

        # Network I/O (also cumulative since boot)
        network_io = psutil.net_io_counters()
        network_io_mb = (network_io.bytes_sent + network_io.bytes_recv) / (1024 * 1024)

        # Active connections (may require elevated privileges on some platforms)
        connections = len(psutil.net_connections())

        usage = ResourceUsage(
            timestamp=time.time(),
            cpu_percent=cpu_percent,
            memory_mb=memory_mb,
            disk_io_mb=disk_io_mb,
            network_io_mb=network_io_mb,
            active_connections=connections
        )

        self.usage_history.append(usage)

        # Keep only last 24 hours of data
        cutoff_time = time.time() - (24 * 3600)
        self.usage_history = [u for u in self.usage_history if u.timestamp > cutoff_time]

        return usage

Calculate costs and optimization recommendations:

    def calculate_hourly_cost(self) -> Dict[str, float]:
        """Calculate estimated hourly costs based on usage."""
        if not self.usage_history:
            return {"total": 0.0}

        # Get average usage over last hour
        recent_usage = [u for u in self.usage_history if u.timestamp > (time.time() - 3600)]

        if not recent_usage:
            return {"total": 0.0}

        avg_cpu = sum(u.cpu_percent for u in recent_usage) / len(recent_usage)
        avg_memory_gb = sum(u.memory_mb for u in recent_usage) / len(recent_usage) / 1024
        avg_disk_io_gb = sum(u.disk_io_mb for u in recent_usage) / len(recent_usage) / 1024
        avg_network_gb = sum(u.network_io_mb for u in recent_usage) / len(recent_usage) / 1024

        # Calculate costs (assuming 8 CPU cores available)
        cpu_cores_used = (avg_cpu / 100) * 8
        costs = {
            "cpu": cpu_cores_used * self.cost_per_hour["cpu_core"],
            "memory": avg_memory_gb * self.cost_per_hour["memory_gb"],
            "disk_io": avg_disk_io_gb * self.cost_per_hour["disk_io_gb"],
            "network_io": avg_network_gb * self.cost_per_hour["network_gb"]
        }

        costs["total"] = sum(costs.values())
        return costs
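
As a worked example: if average CPU over the last hour is 40% on the assumed 8-core machine, cpu_cores_used is 0.4 × 8 = 3.2 cores, contributing 3.2 × $0.05 = $0.16 per hour to the estimate before the memory and I/O terms are added.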

    def get_optimization_recommendations(self) -> List[str]:
        """Generate cost optimization recommendations."""
        recommendations = []

        if not self.usage_history:
            return recommendations

        recent_usage = [u for u in self.usage_history if u.timestamp > (time.time() - 3600)]

        if not recent_usage:
            return recommendations

        avg_cpu = sum(u.cpu_percent for u in recent_usage) / len(recent_usage)
        avg_memory_gb = sum(u.memory_mb for u in recent_usage) / len(recent_usage) / 1024

        # CPU optimization recommendations
        if avg_cpu < 20:
            recommendations.append("CPU usage is low - consider reducing instance size")
        elif avg_cpu > 80:
            recommendations.append("CPU usage is high - consider scaling horizontally or increasing instance size")

        # Memory optimization recommendations
        if avg_memory_gb < 2:
            recommendations.append("Memory usage is low - consider smaller instance type")
        elif avg_memory_gb > 14:  # Assuming 16GB instances
            recommendations.append("Memory usage is high - consider memory optimization or larger instances")

        return recommendations
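
Before wiring the manager into an agent, a quick standalone check (a sketch; note each sample blocks for about a second) confirms the numbers look sane on your hardware:

manager = CostOptimizationManager()
manager.collect_resource_usage()
print(f"Hourly cost estimate: ${manager.calculate_hourly_cost()['total']:.4f}")
for rec in manager.get_optimization_recommendations():
    print(f"Recommendation: {rec}")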

Integration with data processing agent for cost tracking:

class CostOptimizedDataAgent:
    def __init__(self, agent: Agent):
        self.agent = agent
        self.cost_manager = CostOptimizationManager()
        self.processing_count = 0

    async def process_with_cost_tracking(self, query: str):
        """Process query while tracking resource costs."""
        # Sample resource usage before processing (samples accumulate in usage_history)
        self.cost_manager.collect_resource_usage()
        start_time = time.time()

        try:
            # Process with agent
            response = await self.agent.arun(query)

            # Sample resource usage after processing
            self.cost_manager.collect_resource_usage()
            processing_time = time.time() - start_time
            logging.debug(f"Processed query in {processing_time:.2f}s")

            # Log cost information
            self.processing_count += 1
            if self.processing_count % 10 == 0:  # Log every 10 requests
                costs = self.cost_manager.calculate_hourly_cost()
                recommendations = self.cost_manager.get_optimization_recommendations()

                logging.info(f"Cost Analysis - Hourly estimate: ${costs['total']:.4f}")
                for rec in recommendations:
                    logging.info(f"Optimization: {rec}")

            return response.content

        except Exception as e:
            # Still collect usage data on errors
            self.cost_manager.collect_resource_usage()
            raise
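
Per-request sampling misses whatever happens between requests. A background task that samples on a fixed interval gives the hourly averages a steadier basis; a minimal sketch, assuming the agent runs inside an asyncio application:

async def monitor_resources(manager: CostOptimizationManager, interval_seconds: int = 60):
    """Sample resource usage on a fixed interval until cancelled (sketch)."""
    while True:
        manager.collect_resource_usage()
        await asyncio.sleep(interval_seconds)

# Started alongside the service, e.g.:
#   monitor_task = asyncio.create_task(monitor_resources(agent.cost_manager))
# and stopped on shutdown with monitor_task.cancel()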

Comprehensive Test Suite for Production Systems

Create a complete testing framework that validates all aspects of production readiness:

import asyncio
import logging
import time
from typing import Dict, List, Any, Optional

class ProductionReadinessTestSuite:
    def __init__(self, agent: Agent, base_url: Optional[str] = None):
        self.agent = agent
        self.base_url = base_url  # leave as None to skip the API-level tests
        self.test_results: Dict[str, Any] = {}

Implement comprehensive functionality tests:

    async def test_basic_functionality(self) -> Dict[str, bool]:
        """Test basic agent functionality."""
        tests = {
            "agent_responds": False,
            "handles_simple_query": False,
            "handles_complex_query": False,
            "maintains_session": False,
            "error_handling": False
        }

        try:
            # Test 1: Basic response
            response = await self.agent.arun("Hello, how are you?")
            tests["agent_responds"] = bool(response.content)

            # Test 2: Simple data query
            response = await self.agent.arun("What is data processing?")
            tests["handles_simple_query"] = len(response.content) > 50

            # Test 3: Complex data query
            complex_query = "Analyze the relationship between customer demographics and purchase patterns in e-commerce data"
            response = await self.agent.arun(complex_query)
            tests["handles_complex_query"] = len(response.content) > 100

            # Test 4: Session maintenance
            session_id = "test_session_123"
            await self.agent.arun("Remember my name is Alice", session_id=session_id)
            response = await self.agent.arun("What is my name?", session_id=session_id)
            tests["maintains_session"] = "Alice" in response.content or "alice" in response.content.lower()

            # Test 5: Error handling
            try:
                await self.agent.arun("", session_id="test")  # Empty query
                tests["error_handling"] = True  # Should handle gracefully
            except Exception:
                tests["error_handling"] = True  # Expected behavior

        except Exception as e:
            logging.error(f"Functionality test failed: {e}")

        return tests

Test security and authentication features:

    async def test_security_features(self) -> Dict[str, bool]:
        """Test security and authentication features."""
        tests = {
            "authentication_required": False,
            "invalid_token_rejected": False,
            "rate_limiting_works": False,
            "input_validation": False,
            "no_sensitive_data_leakage": False
        }

        if not self.base_url:
            return tests  # Skip if no API endpoint

        try:
            import aiohttp

            async with aiohttp.ClientSession() as session:
                # Test 1: Authentication required
                async with session.post(f"{self.base_url}/process-data",
                                      json={"data_query": "test", "pipeline_id": "test"}) as response:
                    tests["authentication_required"] = response.status == 401

                # Test 2: Invalid token rejection
                headers = {"Authorization": "Bearer invalid_token"}
                async with session.post(f"{self.base_url}/process-data",
                                      json={"data_query": "test", "pipeline_id": "test"},
                                      headers=headers) as response:
                    tests["invalid_token_rejected"] = response.status == 401

                # Test 3: Input validation (APIs that check auth first will return
                # 401 here; supply a valid token for a true schema-validation check)
                async with session.post(f"{self.base_url}/process-data",
                                      json={"invalid_field": "test"}) as response:
                    tests["input_validation"] = response.status == 422  # Validation error

        except Exception as e:
            logging.error(f"Security test failed: {e}")

        return tests

Performance and scalability testing:

    async def test_performance_requirements(self) -> Dict[str, bool]:
        """Test performance and scalability requirements."""
        tests = {
            "response_time_acceptable": False,
            "handles_concurrent_requests": False,
            "memory_usage_reasonable": False,
            "no_memory_leaks": False
        }

        try:
            # Test 1: Response time
            start_time = time.time()
            await self.agent.arun("Quick test query")
            response_time = time.time() - start_time
            tests["response_time_acceptable"] = response_time < 5.0  # 5 second threshold

            # Test 2: Concurrent requests
            async def concurrent_request():
                return await self.agent.arun(f"Test concurrent query {time.time()}")

            concurrent_tasks = [concurrent_request() for _ in range(5)]
            responses = await asyncio.gather(*concurrent_tasks, return_exceptions=True)
            successful_responses = [r for r in responses if not isinstance(r, Exception)]
            tests["handles_concurrent_requests"] = len(successful_responses) >= 4  # 80% success rate

            # Test 3: Memory usage
            import psutil
            process = psutil.Process()
            memory_before = process.memory_info().rss

            # Process several queries
            for i in range(10):
                await self.agent.arun(f"Memory test query {i}")

            memory_after = process.memory_info().rss
            memory_increase_mb = (memory_after - memory_before) / (1024 * 1024)
            tests["memory_usage_reasonable"] = memory_increase_mb < 100  # Less than 100MB increase
            tests["no_memory_leaks"] = memory_increase_mb < 50  # Less than 50MB for no leaks

        except Exception as e:
            logging.error(f"Performance test failed: {e}")

        return tests

Execute complete test suite and generate report:

    async def run_complete_assessment(self) -> Dict[str, Any]:
        """Run complete production readiness assessment."""
        print("🔍 Running Production Readiness Assessment...")

        # Run all test categories
        functionality_results = await self.test_basic_functionality()
        security_results = await self.test_security_features()
        performance_results = await self.test_performance_requirements()

        # Combine results
        all_results = {
            "functionality": functionality_results,
            "security": security_results,
            "performance": performance_results
        }

        # Calculate scores
        scores = {}
        total_passed = 0
        total_tests = 0

        for category, tests in all_results.items():
            passed = sum(1 for result in tests.values() if result)
            total = len(tests)
            scores[category] = {
                "passed": passed,
                "total": total,
                "percentage": (passed / total) * 100 if total > 0 else 0
            }
            total_passed += passed
            total_tests += total

        overall_score = (total_passed / total_tests) * 100 if total_tests > 0 else 0

        # Generate report
        self._print_assessment_report(all_results, scores, overall_score)

        return {
            "results": all_results,
            "scores": scores,
            "overall_score": overall_score,
            "production_ready": overall_score >= 85
        }

Generate detailed assessment report:

    def _print_assessment_report(self, results: Dict[str, Dict[str, bool]],
                                scores: Dict[str, Dict[str, Any]], overall_score: float):
        """Print detailed assessment report."""
        print("
" + "="*60)
        print("📊 PRODUCTION READINESS ASSESSMENT REPORT")
        print("="*60)

        for category, tests in results.items():
            category_score = scores[category]
            print(f"
🔸 {category.upper()}: {category_score['passed']}/{category_score['total']} ({category_score['percentage']:.1f}%)")

            for test_name, passed in tests.items():
                status = "✅ PASS" if passed else "❌ FAIL"
                print(f"  {status} {test_name.replace('_', ' ').title()}")

        print(f"
🎯 OVERALL SCORE: {overall_score:.1f}%")

        if overall_score >= 85:
            print("🎉 PRODUCTION READY! System meets production requirements.")
        elif overall_score >= 70:
            print("⚠️  NEEDS IMPROVEMENT: Address failing tests before production.")
        else:
            print("🚫 NOT PRODUCTION READY: Significant issues need resolution.")

        print("="*60)

# Usage example
async def run_production_assessment():
    # Create test suite (data_production_agent is the agent built earlier in this session)
    test_suite = ProductionReadinessTestSuite(
        agent=data_production_agent,
        base_url="http://localhost:8000"
    )

    # Run complete assessment
    results = await test_suite.run_complete_assessment()

    # Save results for reporting
    import json
    with open("production_assessment_report.json", "w") as f:
        json.dump(results, f, indent=2)
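
Run as a script, with the agent constructed and the API up if the security tests should execute, the assessment can gate a deployment step:

if __name__ == "__main__":
    asyncio.run(run_production_assessment())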


📝 Multiple Choice Test - Session 8



View Solutions →

Previous: Session 7 - Agent Systems →
Next: Session 9 - Multi-Agent Coordination →