⚙️ Session 2: Advanced Agent Architecture

⚙️ IMPLEMENTER PATH CONTENT
Prerequisites: Complete 🎯 Observer Path and 📝 Participant Path
Time Investment: 2-3 hours
Outcome: Master sophisticated agent orchestration patterns and enterprise architecture

Advanced Learning Outcomes

After completing this advanced architecture module, you will master:

  • Sophisticated agent orchestration patterns for complex data workflows
  • Multi-agent coordination strategies for distributed data systems
  • Advanced error recovery and fault tolerance mechanisms
  • Performance optimization techniques for high-scale agent deployments
  • Enterprise integration patterns for production data infrastructure

Advanced Orchestration Patterns

Beyond simple sequential chains, enterprise data systems require sophisticated orchestration that can handle conditional logic, parallel processing, and dynamic routing based on data characteristics and system state.

Conditional Chain Execution

Real-world data processing often requires different analytical approaches based on data characteristics, system state, or business rules:

from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate

class DataAnalysisRouter:
    """Route data analysis based on dataset characteristics"""

    def __init__(self, llm):
        self.llm = llm
        self.routing_chains = self._create_routing_chains()

    def _create_routing_chains(self):
        """Create specialized chains for different data analysis scenarios"""

        # Real-time streaming data analysis
        streaming_template = """
        You are analyzing real-time streaming data with these characteristics:
        - High velocity: {events_per_second} events/second
        - Data freshness: {latency_ms}ms latency
        - Quality issues: {error_rate}% error rate

        Dataset: {dataset_info}

        Provide streaming-optimized analysis focusing on:
        1. Real-time anomaly detection
        2. Performance bottlenecks
        3. Immediate action recommendations
        """

        streaming_chain = LLMChain(
            llm=self.llm,
            prompt=PromptTemplate(
                template=streaming_template,
                input_variables=["events_per_second", "latency_ms", "error_rate", "dataset_info"]
            )
        )

        # Batch processing data analysis
        batch_template = """
        You are analyzing batch-processed data with these characteristics:
        - Volume: {data_volume} records processed
        - Processing time: {processing_hours} hours
        - Success rate: {success_rate}%

        Dataset: {dataset_info}

        Provide batch-optimized analysis focusing on:
        1. Processing efficiency and optimization opportunities
        2. Data quality trends over time
        3. Scalability recommendations
        """

        batch_chain = LLMChain(
            llm=self.llm,
            prompt=PromptTemplate(
                template=batch_template,
                input_variables=["data_volume", "processing_hours", "success_rate", "dataset_info"]
            )
        )

        return {
            "streaming": streaming_chain,
            "batch": batch_chain
        }

    def route_analysis(self, data_characteristics, dataset_info):
        """Route to appropriate analysis chain based on data characteristics"""

        processing_type = data_characteristics.get("processing_type", "batch")

        if processing_type == "streaming":
            return self.routing_chains["streaming"].run({
                "events_per_second": data_characteristics.get("events_per_second", 0),
                "latency_ms": data_characteristics.get("latency_ms", 0),
                "error_rate": data_characteristics.get("error_rate", 0),
                "dataset_info": dataset_info
            })
        else:
            return self.routing_chains["batch"].run({
                "data_volume": data_characteristics.get("data_volume", 0),
                "processing_hours": data_characteristics.get("processing_hours", 0),
                "success_rate": data_characteristics.get("success_rate", 100),
                "dataset_info": dataset_info
            })
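
A minimal usage sketch, assuming an llm instance configured in the earlier sessions; the metric values and dataset description below are illustrative only:

router = DataAnalysisRouter(llm)

streaming_characteristics = {
    "processing_type": "streaming",
    "events_per_second": 12000,
    "latency_ms": 45,
    "error_rate": 0.8
}

report = router.route_analysis(
    streaming_characteristics,
    dataset_info="Clickstream events from the checkout funnel"
)
print(report)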

Parallel Chain Execution

For comprehensive data analysis, execute multiple analytical approaches simultaneously to provide diverse perspectives:

from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from typing import List, Dict, Any

class ParallelDataAnalyzer:
    """Execute multiple data analysis chains in parallel for comprehensive insights"""

    def __init__(self, llm):
        self.llm = llm
        self.analysis_chains = self._create_analysis_chains()
        self.executor = ThreadPoolExecutor(max_workers=4)

    def _create_analysis_chains(self):
        """Create specialized analysis chains for different perspectives"""

        chains = {}

        # Performance analysis chain
        perf_template = """
        Analyze the performance characteristics of this data system:
        {system_metrics}

        Focus on: throughput, latency, resource utilization, and optimization opportunities.
        """

        chains["performance"] = LLMChain(
            llm=self.llm,
            prompt=PromptTemplate(
                template=perf_template,
                input_variables=["system_metrics"]
            )
        )

        # Quality analysis chain
        quality_template = """
        Analyze the data quality characteristics of this dataset:
        {quality_metrics}

        Focus on: completeness, accuracy, consistency, and validity issues.
        """

        chains["quality"] = LLMChain(
            llm=self.llm,
            prompt=PromptTemplate(
                template=quality_template,
                input_variables=["quality_metrics"]
            )
        )

        # Security analysis chain
        security_template = """
        Analyze the security and compliance aspects of this data system:
        {security_metrics}

        Focus on: access controls, data privacy, encryption, and compliance requirements.
        """

        chains["security"] = LLMChain(
            llm=self.llm,
            prompt=PromptTemplate(
                template=security_template,
                input_variables=["security_metrics"]
            )
        )

        # Business impact analysis chain
        business_template = """
        Analyze the business impact and ROI of this data system:
        {business_metrics}

        Focus on: value generation, cost efficiency, business outcomes, and strategic alignment.
        """

        chains["business"] = LLMChain(
            llm=self.llm,
            prompt=PromptTemplate(
                template=business_template,
                input_variables=["business_metrics"]
            )
        )

        return chains

    def _execute_chain(self, chain_name: str, chain: LLMChain, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a single analysis chain with error handling"""
        try:
            result = chain.run(inputs)
            return {
                "chain": chain_name,
                "status": "success",
                "result": result,
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            return {
                "chain": chain_name,
                "status": "error",
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }

    def analyze_parallel(self, analysis_inputs: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
        """Execute all analysis chains in parallel and combine results"""

        # Prepare futures for parallel execution
        futures = []
        for chain_name, chain in self.analysis_chains.items():
            if chain_name in analysis_inputs:
                future = self.executor.submit(
                    self._execute_chain,
                    chain_name,
                    chain,
                    analysis_inputs[chain_name]
                )
                futures.append(future)

        # Collect results
        results = {}
        successful_analyses = []
        failed_analyses = []

        for future in futures:
            try:
                result = future.result(timeout=30)  # 30 second timeout per chain
                results[result["chain"]] = result

                if result["status"] == "success":
                    successful_analyses.append(result["chain"])
                else:
                    failed_analyses.append(result["chain"])

            except Exception as e:
                failed_analyses.append(f"timeout_or_error: {str(e)}")

        # Generate comprehensive summary
        summary = self._generate_comprehensive_summary(results, successful_analyses, failed_analyses)

        return {
            "individual_analyses": results,
            "summary": summary,
            "metadata": {
                "successful_chains": len(successful_analyses),
                "failed_chains": len(failed_analyses),
                "total_chains": len(self.analysis_chains),
                "analysis_timestamp": datetime.now().isoformat()
            }
        }

    def _generate_comprehensive_summary(self, results: Dict, successful: List, failed: List) -> str:
        """Generate a comprehensive summary from all successful analyses"""

        if not successful:
            return "Unable to generate comprehensive summary - all analysis chains failed"

        summary_parts = []

        for chain_name in successful:
            if chain_name in results and results[chain_name]["status"] == "success":
                summary_parts.append(f"**{chain_name.title()} Analysis:**\n{results[chain_name]['result'][:200]}...")

        if failed:
            summary_parts.append(f"**Note:** {len(failed)} analysis chain(s) failed: {', '.join(failed)}")

        return "\n\n".join(summary_parts)

Multi-Agent Coordination Strategies

Enterprise data systems often require multiple specialized agents working together, each contributing their unique expertise to solve complex analytical challenges.

Agent Communication Patterns

Implement sophisticated communication patterns between specialized agents:

from enum import Enum
from typing import Dict
from datetime import datetime
import json
import uuid

class AgentRole(Enum):
    DATA_QUALITY_ENGINEER = "data_quality_engineer"
    PERFORMANCE_ANALYST = "performance_analyst"
    SECURITY_AUDITOR = "security_auditor"
    BUSINESS_ANALYST = "business_analyst"
    SYSTEM_ORCHESTRATOR = "system_orchestrator"

class AgentMessage:
    """Structure for inter-agent communication"""

    def __init__(self, sender: AgentRole, recipient: AgentRole, message_type: str, content: str, metadata: Dict = None):
        self.id = str(uuid.uuid4())
        self.sender = sender
        self.recipient = recipient
        self.message_type = message_type
        self.content = content
        self.metadata = metadata or {}
        self.timestamp = datetime.now().isoformat()

class MultiAgentCoordinator:
    """Coordinate multiple specialized agents for comprehensive data analysis"""

    def __init__(self, llm):
        self.llm = llm
        self.agents = self._initialize_specialized_agents()
        self.message_history = []
        self.analysis_state = {}

    def _initialize_specialized_agents(self):
        """Initialize all specialized agents with their tools and prompts"""

        agents = {}

        # Data Quality Engineer Agent
        quality_tools = [create_data_quality_tool()]
        agents[AgentRole.DATA_QUALITY_ENGINEER] = create_specialized_production_agent(
            "a senior data quality engineer focused on ensuring data reliability and accuracy",
            quality_tools,
            "Expert in data validation, quality metrics, and automated testing frameworks"
        )

        # Performance Analyst Agent
        perf_tools = [create_streaming_monitor_tool(), create_production_data_tool()]
        agents[AgentRole.PERFORMANCE_ANALYST] = create_specialized_production_agent(
            "a performance analyst specialized in system optimization and resource efficiency",
            perf_tools,
            "Expert in performance tuning, resource optimization, and scalability analysis"
        )

        # Security Auditor Agent
        security_tools = [create_security_assessment_tool()]  # Would need to implement
        agents[AgentRole.SECURITY_AUDITOR] = create_specialized_production_agent(
            "a security auditor focused on data privacy, access controls, and compliance",
            security_tools,
            "Expert in data security, privacy regulations, and compliance frameworks"
        )

        return agents

    def coordinate_comprehensive_analysis(self, analysis_request: str) -> str:
        """Coordinate multiple agents for comprehensive system analysis"""

        # Phase 1: Initial individual analyses
        individual_results = {}

        for role, agent in self.agents.items():
            try:
                # Customize request for each agent's specialty
                specialized_request = self._customize_request_for_agent(analysis_request, role)

                # Execute analysis
                result = agent.run(specialized_request)
                individual_results[role.value] = result

                # Log communication
                message = AgentMessage(
                    sender=role,
                    recipient=AgentRole.SYSTEM_ORCHESTRATOR,
                    message_type="analysis_result",
                    content=result
                )
                self.message_history.append(message)

            except Exception as e:
                individual_results[role.value] = f"Analysis failed: {str(e)}"

        # Phase 2: Cross-agent collaboration and synthesis
        synthesis_result = self._synthesize_multi_agent_results(individual_results)

        # Phase 3: Generate final coordinated recommendations
        final_analysis = self._generate_coordinated_recommendations(individual_results, synthesis_result)

        return final_analysis

    def _customize_request_for_agent(self, base_request: str, agent_role: AgentRole) -> str:
        """Customize analysis request for specific agent expertise"""

        role_contexts = {
            AgentRole.DATA_QUALITY_ENGINEER: "Focus on data quality, validation, and accuracy aspects:",
            AgentRole.PERFORMANCE_ANALYST: "Focus on system performance, scalability, and optimization aspects:",
            AgentRole.SECURITY_AUDITOR: "Focus on security, privacy, and compliance aspects:"
        }

        context = role_contexts.get(agent_role, "Analyze from your area of expertise:")
        return f"{context}\n\n{base_request}"

    def _synthesize_multi_agent_results(self, results: Dict[str, str]) -> str:
        """Synthesize results from multiple agents to identify patterns and conflicts"""

        # Pass the JSON through an input variable rather than interpolating it into
        # the template string: braces inside the JSON would otherwise be parsed as
        # template placeholders and break PromptTemplate
        synthesis_template = """
        You are a senior system architect synthesizing analysis results from multiple specialists:

        {analysis_results}

        Identify:
        1. Common themes and patterns across all analyses
        2. Conflicting recommendations that need resolution
        3. Dependencies between different aspects of the system
        4. Integrated opportunities for improvement

        Provide a synthesis that bridges all perspectives.
        """

        synthesis_chain = LLMChain(
            llm=self.llm,
            prompt=PromptTemplate(
                template=synthesis_template,
                input_variables=["analysis_results"]
            )
        )

        return synthesis_chain.run({"analysis_results": json.dumps(results, indent=2)})

    def _generate_coordinated_recommendations(self, individual_results: Dict, synthesis: str) -> str:
        """Generate final coordinated recommendations based on all inputs"""

        # As above, feed dynamic content through input variables to keep the
        # template free of stray braces
        recommendation_template = """
        Based on comprehensive multi-agent analysis and synthesis:

        INDIVIDUAL AGENT ANALYSES:
        {individual_results}

        CROSS-AGENT SYNTHESIS:
        {synthesis}

        Generate coordinated recommendations that:
        1. Address all identified issues holistically
        2. Prioritize actions based on impact and feasibility
        3. Consider interdependencies between different system aspects
        4. Provide clear implementation guidance
        5. Include success metrics and monitoring approaches

        Structure as an executive summary with actionable next steps.
        """

        final_chain = LLMChain(
            llm=self.llm,
            prompt=PromptTemplate(
                template=recommendation_template,
                input_variables=["individual_results", "synthesis"]
            )
        )

        return final_chain.run({
            "individual_results": json.dumps(individual_results, indent=2),
            "synthesis": synthesis
        })
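
A coordination sketch. The tool and agent factories (create_data_quality_tool, create_streaming_monitor_tool, create_production_data_tool, create_specialized_production_agent) are assumed from the Participant Path; create_security_assessment_tool is not yet implemented, so a hypothetical stub keeps the example runnable:

from langchain.agents import Tool

# Hypothetical stub for the not-yet-implemented security tool
def create_security_assessment_tool():
    return Tool(
        name="SecurityAssessment",
        func=lambda query: "Security assessment placeholder result",
        description="Assess security posture and compliance of data systems"
    )

coordinator = MultiAgentCoordinator(llm)
report = coordinator.coordinate_comprehensive_analysis(
    "Review the nightly ETL pipeline for reliability and compliance risks"
)
print(report)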

Advanced Error Recovery & Fault Tolerance

Enterprise systems require sophisticated error handling that goes beyond simple retries, implementing circuit breakers, graceful degradation, and intelligent recovery strategies.

Circuit Breaker Pattern for Agent Systems

Implement circuit breaker patterns to prevent cascading failures in agent workflows:

from enum import Enum
import time
from typing import Any, Callable, Dict, List

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, requests blocked
    HALF_OPEN = "half_open" # Testing if service recovered

class CircuitBreaker:
    """Circuit breaker pattern for agent tool calls"""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 30):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function call through circuit breaker"""

        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception(f"Circuit breaker OPEN - service unavailable")

        try:
            result = func(*args, **kwargs)

            # Success - reset circuit breaker if half open
            if self.state == CircuitState.HALF_OPEN:
                self._reset()

            return result

        except Exception as e:
            self._record_failure()
            raise e

    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to attempt reset"""
        if self.last_failure_time is None:
            return True

        return (time.time() - self.last_failure_time) >= self.recovery_timeout

    def _record_failure(self):
        """Record failure and update circuit breaker state"""
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

    def _reset(self):
        """Reset circuit breaker to closed state"""
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

class FaultTolerantAgent:
    """Agent with comprehensive fault tolerance mechanisms"""

    def __init__(self, base_agent, circuit_breaker_config=None):
        self.base_agent = base_agent
        self.circuit_breakers = {}
        self.circuit_config = circuit_breaker_config or {
            "failure_threshold": 3,
            "recovery_timeout": 60
        }

        # Wrap agent tools with circuit breakers
        self._wrap_tools_with_circuit_breakers()

    def _wrap_tools_with_circuit_breakers(self):
        """Wrap each agent tool with its own circuit breaker"""

        for tool in self.base_agent.tools:
            tool_name = tool.name

            # Create circuit breaker for this tool
            circuit_breaker = CircuitBreaker(**self.circuit_config)
            self.circuit_breakers[tool_name] = circuit_breaker

            # Wrap the tool function
            original_func = tool.func

            def wrapped_func(*args, circuit_breaker=circuit_breaker, original_func=original_func, **kwargs):
                return circuit_breaker.call(original_func, *args, **kwargs)

            tool.func = wrapped_func

    def run_with_fallback(self, request: str, fallback_strategies: List[str] = None) -> str:
        """Run agent with fallback strategies for fault tolerance"""

        fallback_strategies = fallback_strategies or [
            "retry_with_simplified_request",
            "use_cached_results",
            "provide_partial_analysis",
            "graceful_degradation_response"
        ]

        # Primary attempt
        try:
            return self.base_agent.run(request)

        except Exception as primary_error:
            print(f"Primary agent execution failed: {primary_error}")

            # Try fallback strategies in order
            for strategy in fallback_strategies:
                try:
                    return self._execute_fallback_strategy(strategy, request, primary_error)
                except Exception as fallback_error:
                    print(f"Fallback strategy '{strategy}' failed: {fallback_error}")
                    continue

            # All strategies failed
            return self._generate_failure_response(request, primary_error)

    def _execute_fallback_strategy(self, strategy: str, request: str, error: Exception) -> str:
        """Execute specific fallback strategy"""

        if strategy == "retry_with_simplified_request":
            simplified_request = self._simplify_request(request)
            return self.base_agent.run(simplified_request)

        elif strategy == "use_cached_results":
            # In production, implement actual caching
            return "Using cached analysis results due to system unavailability"

        elif strategy == "provide_partial_analysis":
            return self._generate_partial_analysis(request, error)

        elif strategy == "graceful_degradation_response":
            return self._generate_degraded_response(request, error)

        else:
            raise Exception(f"Unknown fallback strategy: {strategy}")

    def _simplify_request(self, request: str) -> str:
        """Simplify complex request to increase success probability"""

        # Extract key concepts and create simpler version
        simplified = f"""
        Provide a basic analysis of: {request[:100]}...

        Focus on essential information only. Avoid complex tool usage.
        """

        return simplified

    def _generate_partial_analysis(self, request: str, error: Exception) -> str:
        """Generate partial analysis based on available information"""

        return f"""
        PARTIAL ANALYSIS (some systems unavailable):

        Request: {request[:150]}...

        Status: Some data systems are currently unavailable due to: {str(error)[:100]}

        Available insights:
        - System appears to be experiencing connectivity issues
        - Recommend checking system status and retrying in 5-10 minutes
        - Basic troubleshooting steps: verify network connectivity, check service status

        For complete analysis, please retry when all systems are operational.
        """

    def _generate_degraded_response(self, request: str, error: Exception) -> str:
        """Generate response indicating degraded functionality"""

        return f"""
        SYSTEM OPERATING IN DEGRADED MODE

        Your request: {request[:100]}...

        Current status: Limited functionality due to system issues
        Error details: {str(error)[:150]}...

        Recommendations:
        1. Check system status dashboard
        2. Retry request in 10-15 minutes
        3. Contact system administrator if issues persist

        We apologize for the inconvenience and are working to restore full functionality.
        """

    def _generate_failure_response(self, request: str, error: Exception) -> str:
        """Generate final failure response when all strategies exhausted"""

        return f"""
        UNABLE TO PROCESS REQUEST

        All recovery strategies have been exhausted.

        Original request: {request[:100]}...
        Primary error: {str(error)[:100]}...

        Please contact technical support or try again later.
        """

    def get_circuit_breaker_status(self) -> Dict[str, Dict[str, Any]]:
        """Get current status of all circuit breakers"""

        status = {}
        for tool_name, breaker in self.circuit_breakers.items():
            status[tool_name] = {
                "state": breaker.state.value,
                "failure_count": breaker.failure_count,
                "last_failure_time": breaker.last_failure_time
            }

        return status
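
A quick sketch of the circuit breaker in isolation, using a deliberately flaky function so the state transitions are visible; the thresholds and failure rate are illustrative:

import random
import time

breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=5)

def flaky_warehouse_query():
    # Simulates an unreliable downstream service
    if random.random() < 0.7:
        raise ConnectionError("warehouse unreachable")
    return "rows fetched"

for attempt in range(10):
    try:
        print(attempt, breaker.call(flaky_warehouse_query))
    except Exception as e:
        print(attempt, f"failed or blocked: {e} (state={breaker.state.value})")
    time.sleep(1)

# Wrapping an agent from the Participant Path follows the same pattern:
# tolerant = FaultTolerantAgent(base_agent)
# print(tolerant.run_with_fallback("Summarize today's pipeline health"))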

Performance Optimization for High-Scale Deployments

Enterprise deployments require sophisticated performance optimization strategies to handle high-volume, low-latency requirements while maintaining cost efficiency.

Intelligent Caching Strategies

Implement multi-level caching with intelligent invalidation for agent responses:

import hashlib
import json
import pickle
import time
import redis
from typing import Optional, Dict, Any, List
from datetime import datetime, timedelta

class AgentResponseCache:
    """Multi-level caching system for agent responses with intelligent invalidation"""

    def __init__(self, redis_url: str = None, enable_local_cache: bool = True):
        self.redis_client = redis.from_url(redis_url) if redis_url else None
        self.local_cache = {} if enable_local_cache else None
        self.cache_stats = {"hits": 0, "misses": 0, "invalidations": 0}

    def _generate_cache_key(self, request: str, context: Dict[str, Any] = None) -> str:
        """Generate deterministic cache key from request and context"""

        # Create consistent hash from request and context
        content = f"{request}_{json.dumps(context or {}, sort_keys=True)}"
        return hashlib.md5(content.encode()).hexdigest()

    def get_cached_response(self, request: str, context: Dict[str, Any] = None, max_age_minutes: int = 30) -> Optional[str]:
        """Retrieve cached response if available and valid"""

        cache_key = self._generate_cache_key(request, context)

        # Try local cache first (fastest)
        if self.local_cache:
            cached_item = self.local_cache.get(cache_key)
            if cached_item and self._is_cache_valid(cached_item, max_age_minutes):
                self.cache_stats["hits"] += 1
                return cached_item["response"]

        # Try Redis cache (distributed)
        if self.redis_client:
            try:
                cached_data = self.redis_client.get(cache_key)
                if cached_data:
                    cached_item = pickle.loads(cached_data)
                    if self._is_cache_valid(cached_item, max_age_minutes):
                        # Update local cache
                        if self.local_cache:
                            self.local_cache[cache_key] = cached_item

                        self.cache_stats["hits"] += 1
                        return cached_item["response"]
            except Exception as e:
                print(f"Redis cache error: {e}")

        # Cache miss
        self.cache_stats["misses"] += 1
        return None

    def cache_response(self, request: str, response: str, context: Dict[str, Any] = None, ttl_minutes: int = 60):
        """Cache agent response with TTL"""

        cache_key = self._generate_cache_key(request, context)
        cache_item = {
            "response": response,
            "timestamp": datetime.now().isoformat(),
            "context": context or {},
            "ttl_minutes": ttl_minutes
        }

        # Store in local cache
        if self.local_cache:
            self.local_cache[cache_key] = cache_item

        # Store in Redis with TTL
        if self.redis_client:
            try:
                serialized = pickle.dumps(cache_item)
                self.redis_client.setex(cache_key, timedelta(minutes=ttl_minutes), serialized)
            except Exception as e:
                print(f"Redis cache storage error: {e}")

    def _is_cache_valid(self, cached_item: Dict[str, Any], max_age_minutes: int) -> bool:
        """Check if cached item is still valid"""

        if "timestamp" not in cached_item:
            return False

        cached_time = datetime.fromisoformat(cached_item["timestamp"])
        age_limit = datetime.now() - timedelta(minutes=max_age_minutes)

        return cached_time > age_limit

    def invalidate_cache_pattern(self, pattern: str):
        """Invalidate cache entries whose keys contain the given pattern.

        Note: the keys generated above are MD5 digests, so a semantic pattern
        only matches if you also prepend a semantic prefix to cache keys.
        """

        self.cache_stats["invalidations"] += 1

        # Invalidate local cache
        if self.local_cache:
            keys_to_remove = [key for key in self.local_cache.keys() if pattern in key]
            for key in keys_to_remove:
                del self.local_cache[key]

        # Invalidate Redis cache (prefer SCAN over KEYS in production; KEYS blocks the server)
        if self.redis_client:
            try:
                keys = self.redis_client.keys(f"*{pattern}*")
                if keys:
                    self.redis_client.delete(*keys)
            except Exception as e:
                print(f"Redis cache invalidation error: {e}")

    def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache performance statistics"""

        total_requests = self.cache_stats["hits"] + self.cache_stats["misses"]
        hit_rate = self.cache_stats["hits"] / total_requests if total_requests > 0 else 0

        return {
            "hit_rate": f"{hit_rate:.2%}",
            "total_hits": self.cache_stats["hits"],
            "total_misses": self.cache_stats["misses"],
            "total_invalidations": self.cache_stats["invalidations"],
            "local_cache_size": len(self.local_cache) if self.local_cache else 0
        }

class HighPerformanceAgent:
    """Agent optimized for high-scale deployments with caching and performance monitoring"""

    def __init__(self, base_agent, cache_config: Dict[str, Any] = None):
        self.base_agent = base_agent
        self.cache = AgentResponseCache(**(cache_config or {}))
        self.performance_metrics = {
            "total_requests": 0,
            "cache_hits": 0,
            "avg_response_time": 0,
            "total_response_time": 0
        }

    def run_optimized(self, request: str, context: Dict[str, Any] = None, cache_ttl: int = 30) -> str:
        """Run agent with performance optimizations"""

        start_time = time.time()
        self.performance_metrics["total_requests"] += 1

        # Check cache first
        cached_response = self.cache.get_cached_response(request, context)
        if cached_response:
            self.performance_metrics["cache_hits"] += 1
            self._update_response_time_stats(time.time() - start_time)
            return cached_response

        # Execute agent and cache result
        try:
            response = self.base_agent.run(request)
            self.cache.cache_response(request, response, context, cache_ttl)

            self._update_response_time_stats(time.time() - start_time)
            return response

        except Exception as e:
            self._update_response_time_stats(time.time() - start_time)
            raise e

    def _update_response_time_stats(self, response_time: float):
        """Update response time statistics"""

        self.performance_metrics["total_response_time"] += response_time
        self.performance_metrics["avg_response_time"] = (
            self.performance_metrics["total_response_time"] /
            self.performance_metrics["total_requests"]
        )

    def get_performance_report(self) -> Dict[str, Any]:
        """Get comprehensive performance report"""

        cache_stats = self.cache.get_cache_stats()

        return {
            "agent_performance": {
                "total_requests": self.performance_metrics["total_requests"],
                "cache_hit_rate": f"{self.performance_metrics['cache_hits'] / max(1, self.performance_metrics['total_requests']):.2%}",
                "avg_response_time": f"{self.performance_metrics['avg_response_time']:.3f}s"
            },
            "cache_performance": cache_stats,
            "optimization_recommendations": self._generate_optimization_recommendations(cache_stats)
        }

    def _generate_optimization_recommendations(self, cache_stats: Dict) -> List[str]:
        """Generate performance optimization recommendations"""

        recommendations = []

        hit_rate = float(cache_stats["hit_rate"].rstrip('%')) / 100

        if hit_rate < 0.3:
            recommendations.append("Consider increasing cache TTL or improving cache key strategy")

        if self.performance_metrics["avg_response_time"] > 2.0:
            recommendations.append("Average response time is high - consider tool optimization or parallel execution")

        if cache_stats["local_cache_size"] > 1000:
            recommendations.append("Local cache size is large - consider implementing LRU eviction")

        return recommendations
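
A usage sketch, assuming a base_agent from the earlier sessions and, optionally, a local Redis instance at the default URL; drop redis_url to use only the in-process cache:

fast_agent = HighPerformanceAgent(
    base_agent,
    cache_config={"redis_url": "redis://localhost:6379/0"}  # omit for local-only caching
)

# The first call executes the underlying agent; an identical second call is a cache hit
print(fast_agent.run_optimized("Summarize warehouse load times for the last 24 hours"))
print(fast_agent.run_optimized("Summarize warehouse load times for the last 24 hours"))

print(fast_agent.get_performance_report())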

Enterprise Integration Patterns

Integrate agents seamlessly with existing enterprise data infrastructure, authentication systems, and compliance frameworks.

Authentication & Authorization Integration

Implement enterprise-grade security patterns for agent deployments:

import jwt
from typing import List, Dict
from datetime import datetime
from enum import Enum

class Permission(Enum):
    READ_DATA = "read_data"
    WRITE_DATA = "write_data"
    ADMIN_OPERATIONS = "admin_operations"
    EXECUTE_QUERIES = "execute_queries"

class SecureAgent:
    """Enterprise agent with comprehensive security controls"""

    def __init__(self, base_agent, jwt_secret: str, permission_config: Dict[str, List[Permission]]):
        self.base_agent = base_agent
        self.jwt_secret = jwt_secret
        self.permission_config = permission_config
        self.audit_log = []

        # Wrap tools with security controls
        self._secure_agent_tools()

    def _secure_agent_tools(self):
        """Apply security wrappers to all agent tools"""

        for tool in self.base_agent.tools:
            original_func = tool.func
            required_permissions = self._get_tool_permissions(tool.name)

            def secured_func(*args, original_func=original_func, permissions=required_permissions, **kwargs):
                # Security check would be performed here in actual implementation
                return original_func(*args, **kwargs)

            tool.func = secured_func

    def _get_tool_permissions(self, tool_name: str) -> List[Permission]:
        """Get required permissions for a specific tool"""

        tool_permissions = {
            "DataWarehouse": [Permission.READ_DATA, Permission.EXECUTE_QUERIES],
            "StreamingMonitor": [Permission.READ_DATA],
            "DataQualityAssessment": [Permission.READ_DATA],
            "SystemAdmin": [Permission.ADMIN_OPERATIONS]
        }

        return tool_permissions.get(tool_name, [Permission.READ_DATA])

    def run_secure(self, request: str, auth_token: str, user_context: Dict[str, str] = None) -> str:
        """Run agent with security validation"""

        # Validate authentication token
        try:
            user_claims = jwt.decode(auth_token, self.jwt_secret, algorithms=["HS256"])
            user_id = user_claims.get("user_id")
            user_roles = user_claims.get("roles", [])

        except jwt.InvalidTokenError as e:
            self._log_security_event("AUTHENTICATION_FAILED", request, str(e))
            raise PermissionError("Invalid authentication token")

        # Check authorization
        if not self._check_authorization(user_roles, request):
            self._log_security_event("AUTHORIZATION_FAILED", request, f"User {user_id} insufficient permissions")
            raise PermissionError("Insufficient permissions for requested operation")

        # Log authorized request
        self._log_security_event("REQUEST_AUTHORIZED", request, f"User {user_id} with roles {user_roles}")

        try:
            # Execute with user context
            response = self.base_agent.run(request)

            # Log successful execution
            self._log_security_event("REQUEST_COMPLETED", request, f"Success for user {user_id}")

            return response

        except Exception as e:
            self._log_security_event("REQUEST_FAILED", request, f"Error for user {user_id}: {str(e)}")
            raise e

    def _check_authorization(self, user_roles: List[str], request: str) -> bool:
        """Check if user has required permissions for request"""

        # In production, implement sophisticated authorization logic
        # This is a simplified example

        admin_roles = ["admin", "data_engineer", "senior_analyst"]
        read_only_roles = ["analyst", "viewer"]

        # Check for admin operations
        if any(keyword in request.lower() for keyword in ["delete", "drop", "modify", "admin"]):
            return any(role in admin_roles for role in user_roles)

        # All users can perform read operations
        return True

    def _log_security_event(self, event_type: str, request: str, details: str):
        """Log security events for audit purposes"""

        audit_entry = {
            "timestamp": datetime.now().isoformat(),
            "event_type": event_type,
            "request_summary": request[:100],
            "details": details
        }

        self.audit_log.append(audit_entry)

        # In production, send to security information and event management (SIEM) system
        print(f"SECURITY LOG: {event_type} - {details}")

    def get_audit_log(self) -> List[Dict]:
        """Retrieve security audit log"""
        return self.audit_log.copy()
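
A sketch of the security wrapper in action, minting a short-lived token with PyJWT; the secret, user id, and roles below are placeholders:

from datetime import timedelta, timezone

JWT_SECRET = "replace-with-a-vault-managed-secret"  # placeholder; never hard-code in production

token = jwt.encode(
    {
        "user_id": "analyst-42",
        "roles": ["analyst"],
        "exp": datetime.now(timezone.utc) + timedelta(minutes=15)
    },
    JWT_SECRET,
    algorithm="HS256"
)

secure_agent = SecureAgent(base_agent, jwt_secret=JWT_SECRET, permission_config={})
print(secure_agent.run_secure("Show row counts by table", auth_token=token))

# A request containing an admin keyword is denied for read-only roles
try:
    secure_agent.run_secure("drop the staging tables", auth_token=token)
except PermissionError as e:
    print("Denied:", e)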

🎯📝 Prerequisites Review

Before diving deeper, ensure you've mastered the foundational concepts:

Core Understanding Required:
- 🎯 LangChain Architecture Foundations - Essential building blocks
- 📝 Practical Implementation - Hands-on experience with agents and tools

⚙️ Continue Advanced Learning

Explore related advanced architecture topics:

Next Advanced Modules:
- ⚙️ Production Memory Systems - Enterprise state management and persistence
- ⚙️ Enterprise Tool Development - Custom integrations and specialized capabilities

Legacy Advanced Modules:
- Advanced LangChain Patterns - Complex workflows & optimization
- Production Deployment Strategies - Enterprise deployment & monitoring


← Previous: Session 1 - Foundations
Next: Session 3 - Advanced Patterns →