Skip to content

📝 Session 2: LangChain Practical Implementation

📝 PARTICIPANT PATH CONTENT Prerequisites: Complete 🎯 Observer Path Time Investment: 2-3 hours Outcome: Build production-ready data intelligence agents

Learning Outcomes

After completing this practical implementation module, you will be able to:

  • Create robust data processing chains with error handling
  • Build agents with custom tools for data system integration
  • Implement production-grade memory management strategies
  • Deploy agents with monitoring and reliability patterns

Practical Implementation: Building Real-World Data Systems

Now we move from concepts to practice, creating agents that solve actual data engineering problems and provide real value to data teams and infrastructure.

Exercise Files

Practice with these implementation examples that demonstrate LangChain patterns in action for data systems:

Validation Commands

Test your understanding with these commands that verify your implementations work correctly with data scenarios:

cd src/session2
python langchain_basics.py        # Architecture validation
python langchain_tool_use.py      # Agent workflow testing

Advanced Chain Patterns with Error Handling

Building on the basic chains from the Observer Path, we now implement production-ready patterns that handle real-world data processing challenges.

Prompt Templates with Validation

Prompt templates create reusable, parameterized prompts with variables, solving the challenge of how to maintain consistency while adapting to different data contexts:

template = """
Role: {role}
Data Analysis Task: {task}
Dataset Context: {context}
Output Format: {format}
"""

Define the template with input validation - this creates a flexible framework for various data analysis scenarios:

from langchain.prompts import PromptTemplate

def create_validated_prompt(role, task, context, format):
    """Create prompt template with input validation"""
    # Validate required parameters
    if not all([role, task, context, format]):
        raise ValueError("All template parameters are required")

    template = """
    Role: {role}
    Data Analysis Task: {task}
    Dataset Context: {context}
    Output Format: {format}
    """

    return PromptTemplate(
        template=template,
        input_variables=["role", "task", "context", "format"]
    )

Using Templates with Chains

Combine the template with a chain for dynamic data analysis responses - this is where templates become powerful, adaptive interfaces for data intelligence:

def create_analysis_chain(llm):
    """Create reusable analysis chain with error handling"""
    try:
        prompt = create_validated_prompt(
            role="Data Quality Engineer",
            task="Analyze streaming data anomalies",
            context="Real-time customer behavior data from e-commerce platform",
            format="JSON with severity levels and recommended actions"
        )

        return LLMChain(llm=llm, prompt=prompt)
    except Exception as e:
        print(f"Chain creation failed: {e}")
        return None

Error Handling and Retry Logic

Robust error handling is crucial for production data systems, transforming brittle demos into resilient data infrastructure. Common failures include API rate limits, network timeouts, and service unavailability:

from langchain.callbacks import StdOutCallbackHandler
import time

def run_with_retry(chain, inputs, max_retries=3):
    """Execute chain with exponential backoff retry logic"""
    for attempt in range(max_retries):
        try:
            return chain.run(inputs)
        except Exception as e:
            if attempt == max_retries - 1:
                raise e

            # Exponential backoff: 1s, 2s, 4s
            wait_time = 2 ** attempt
            print(f"Attempt {attempt + 1} failed: {e}")
            print(f"Retrying in {wait_time} seconds...")
            time.sleep(wait_time)

    return None

Usage Example with Complete Error Handling

Use the retry function with proper error handling - this ensures your data processing applications maintain uptime and reliability:

def analyze_data_with_resilience(chain, data_report):
    """Analyze data with complete error handling and logging"""
    try:
        result = run_with_retry(
            chain,
            {"data_report": data_report}
        )

        if result:
            print(f"Analysis completed successfully: {result[:100]}...")
            return result
        else:
            print("Analysis failed after all retries")
            return "Unable to complete analysis - please try again later"

    except Exception as e:
        print(f"Critical error in data analysis: {e}")
        return f"Analysis system error: {str(e)}"

Advanced Tool Creation & Integration

Moving beyond basic tools, we create sophisticated data processing capabilities with proper error handling and validation.

Tool Creation Methods - Extended Implementation

Method 3: Production Tool with Full Error Handling

Create production-ready tools with comprehensive error handling and logging:

from langchain.agents import Tool
import logging
import json

def create_production_data_tool():
    """Create enterprise-grade data warehouse tool"""

    def query_data_warehouse_safe(sql_query: str) -> str:
        """Execute SQL query with comprehensive error handling"""
        try:
            # Input validation
            if not sql_query or not isinstance(sql_query, str):
                return "Error: Invalid SQL query provided"

            # Log query attempt
            logging.info(f"Executing query: {sql_query[:100]}...")

            # Simulate database connection and query
            # In production: connect to actual data warehouse
            if "DROP" in sql_query.upper() or "DELETE" in sql_query.upper():
                return "Error: Destructive operations not allowed"

            # Mock successful query execution
            result = {
                "status": "success",
                "rows_returned": 1847,
                "execution_time": "2.3s",
                "query": sql_query[:50] + "..." if len(sql_query) > 50 else sql_query
            }

            return json.dumps(result, indent=2)

        except Exception as e:
            logging.error(f"Database query failed: {e}")
            return f"Database connection error: {str(e)}"

    return Tool(
        name="DataWarehouse",
        description="Execute safe SQL queries against enterprise data warehouse with error handling",
        func=query_data_warehouse_safe
    )

Streaming Pipeline Monitoring Tool

Create a comprehensive monitoring tool for real-time data pipelines:

def create_streaming_monitor_tool():
    """Create comprehensive streaming pipeline monitoring tool"""

    def monitor_streaming_pipeline_detailed(pipeline_id: str) -> str:
        """Monitor streaming pipeline with detailed metrics and error detection"""
        try:
            if not pipeline_id:
                return "Error: Pipeline ID required"

            # Simulate comprehensive pipeline monitoring
            # In production: integrate with Apache Kafka, Apache Storm, etc.

            metrics = {
                "pipeline_id": pipeline_id,
                "status": "RUNNING",
                "metrics": {
                    "events_per_second": 15000,
                    "average_latency_ms": 150,
                    "error_rate": 0.02,
                    "consumer_lag": 50
                },
                "health_checks": {
                    "kafka_connection": "HEALTHY",
                    "database_connection": "HEALTHY",
                    "memory_usage": "78%",
                    "cpu_usage": "45%"
                },
                "alerts": []
            }

            # Add alerts based on metrics
            if metrics["metrics"]["error_rate"] > 0.01:
                metrics["alerts"].append("Warning: Error rate above threshold")

            if metrics["metrics"]["consumer_lag"] > 100:
                metrics["alerts"].append("Warning: Consumer lag high")

            return json.dumps(metrics, indent=2)

        except Exception as e:
            return f"Monitoring system error: {str(e)}"

    return Tool(
        name="StreamingMonitor",
        description="Monitor real-time streaming pipelines with comprehensive metrics and alerting",
        func=monitor_streaming_pipeline_detailed
    )

Data Quality Assessment Tool

Build a comprehensive data quality assessment system:

def create_data_quality_tool():
    """Create advanced data quality assessment tool"""

    def assess_data_quality_comprehensive(dataset_path: str) -> str:
        """Perform comprehensive data quality assessment"""
        try:
            if not dataset_path:
                return "Error: Dataset path required"

            # Simulate comprehensive data quality analysis
            # In production: integrate with Great Expectations, Deequ, etc.

            quality_report = {
                "dataset": dataset_path,
                "timestamp": "2024-01-15T10:30:00Z",
                "overall_score": 94.7,
                "dimensions": {
                    "completeness": {
                        "score": 98.2,
                        "null_rate": 1.8,
                        "missing_patterns": ["customer_phone", "secondary_email"]
                    },
                    "validity": {
                        "score": 92.5,
                        "schema_compliance": 98.2,
                        "format_violations": ["date_format", "phone_format"]
                    },
                    "consistency": {
                        "score": 95.1,
                        "duplicate_rate": 2.3,
                        "cross_field_consistency": 97.8
                    },
                    "accuracy": {
                        "score": 91.8,
                        "reference_data_match": 89.5,
                        "business_rule_violations": 12
                    }
                },
                "recommendations": [
                    "Implement phone number validation",
                    "Add date format standardization",
                    "Review duplicate detection logic"
                ]
            }

            return json.dumps(quality_report, indent=2)

        except Exception as e:
            return f"Data quality assessment failed: {str(e)}"

    return Tool(
        name="DataQualityAssessment",
        description="Comprehensive data quality assessment with scoring and recommendations",
        func=assess_data_quality_comprehensive
    )

Agent Workflows & Complex Reasoning

Building sophisticated agents that can handle multi-step data analysis workflows with proper error recovery.

Tool Calling in Action - Production Implementation

Run the agent with complex data requests that need multiple tools - watch as it breaks down data problems like an experienced data engineer:

def create_production_data_agent():
    """Create production-ready data analysis agent"""

    # Initialize tools
    tools = [
        create_production_data_tool(),
        create_streaming_monitor_tool(),
        create_data_quality_tool()
    ]

    # Configure memory with size limits
    memory = ConversationBufferWindowMemory(
        memory_key="chat_history",
        k=10,  # Keep last 10 interactions
        return_messages=True
    )

    # Create agent with error handling
    try:
        agent = initialize_agent(
            tools=tools,
            llm=create_llm(),
            agent_type=AgentType.CHAT_CONVERSATIONAL_REACT_DESCRIPTION,
            memory=memory,
            verbose=True,  # Show reasoning in development
            handle_parsing_errors=True,  # Graceful error handling
            max_iterations=5  # Prevent infinite loops
        )

        return agent
    except Exception as e:
        print(f"Agent creation failed: {e}")
        return None

Complex Multi-Tool Analysis

Execute sophisticated analysis workflows that require coordination of multiple data systems:

def run_comprehensive_data_analysis():
    """Execute comprehensive data analysis workflow"""

    agent = create_production_data_agent()
    if not agent:
        return "Failed to create agent"

    complex_request = """
    I need a comprehensive analysis of our data infrastructure:

    1. Check the quality of our customer behavior dataset
    2. Monitor the performance of streaming pipeline 'customer-events-v2'
    3. Query the data warehouse for any customer behavior anomalies in the last 24 hours
    4. Provide recommendations based on all findings
    """

    try:
        response = agent.run(complex_request)
        print("=== COMPREHENSIVE DATA ANALYSIS RESULTS ===")
        print(response)
        return response

    except Exception as e:
        print(f"Analysis workflow failed: {e}")
        return f"Workflow error: {str(e)}"

Agent Thought Process - Production Monitoring

With verbose=True, you can see the agent's reasoning - this reveals the sophisticated data analysis decision-making happening behind the scenes:

Thought: I need to analyze data infrastructure comprehensively across multiple systems
Action: DataQualityAssessment with customer behavior dataset
Observation: Quality score 94.7%, some phone format issues identified
Thought: Now I need to check streaming pipeline performance
Action: StreamingMonitor for customer-events-v2 pipeline
Observation: Pipeline healthy, processing 15K events/sec with minimal lag
Thought: Finally, I need to query the warehouse for recent anomalies
Action: DataWarehouse with anomaly detection query for last 24 hours
Observation: Found 3 behavioral pattern changes requiring investigation
Thought: I have all the data - time to synthesize comprehensive recommendations
Final Answer: Your data infrastructure shows good overall health (94.7% quality score) with strong streaming performance (15K events/sec). However, I identified 3 areas needing attention: phone format validation, behavioral anomaly investigation, and consumer lag monitoring. Recommended actions: implement phone validation rules, investigate the 3 detected behavioral changes, and set up proactive lag alerting.

State Persistence & Production Memory

Implement enterprise-grade memory management with persistence and recovery capabilities.

State Persistence - Production Implementation

Saving memory allows agents to remember previous data analysis sessions across conversations, creating continuity like data team relationships that build analytical understanding over time.

Enhanced Persistence Functions

Implement robust file-based persistence with error handling and validation:

import json
import os
from datetime import datetime

def save_conversation_robust(memory, filename, metadata=None):
    """Save conversation with metadata and error handling"""
    try:
        # Create backup directory if it doesn't exist
        os.makedirs("conversation_backups", exist_ok=True)

        # Prepare conversation data
        conversation_data = {
            "timestamp": datetime.now().isoformat(),
            "metadata": metadata or {},
            "messages": []
        }

        # Extract messages from memory
        messages = memory.chat_memory.messages
        for msg in messages:
            conversation_data["messages"].append({
                "type": type(msg).__name__,
                "content": str(msg)
            })

        # Save with backup
        filepath = os.path.join("conversation_backups", filename)
        with open(filepath, 'w') as f:
            json.dump(conversation_data, f, indent=2)

        print(f"✅ Conversation saved: {len(messages)} messages to {filepath}")
        return True

    except Exception as e:
        print(f"❌ Failed to save conversation: {e}")
        return False

Enhanced Loading Functions

Load previous conversations with validation and error recovery:

def load_conversation_robust(memory, filename):
    """Load conversation with validation and error recovery"""
    try:
        filepath = os.path.join("conversation_backups", filename)

        if not os.path.exists(filepath):
            print(f"📝 No previous conversation found at {filepath} - starting fresh")
            return False

        with open(filepath, 'r') as f:
            conversation_data = json.load(f)

        # Validate data structure
        if "messages" not in conversation_data:
            print("⚠️ Invalid conversation file format")
            return False

        messages = conversation_data["messages"]
        metadata = conversation_data.get("metadata", {})

        print(f"📖 Loaded conversation: {len(messages)} messages from {conversation_data.get('timestamp', 'unknown time')}")

        if metadata:
            print(f"📋 Session metadata: {metadata}")

        # Note: Full message reconstruction would require LangChain message objects
        # This is a simplified example showing the persistence pattern

        return True

    except Exception as e:
        print(f"❌ Failed to load conversation: {e}")
        return False

Context Management - Production Specialization

Context gives agents personality and specialized data knowledge, transforming generic AI into domain-specific data experts:

  • Role: Data Quality Engineer vs ML Pipeline Specialist - different expertise and analytical approaches
  • Knowledge: Domain-specific data understanding that shapes analytical responses
  • Style: Communication preferences that match data team expectations

Creating Specialized Production Agents

Define a function to build specialized data agents with comprehensive configuration:

def create_specialized_production_agent(role_description, tools_list, expertise_context=None):
    """Build production agent with specialized expertise and comprehensive configuration"""

    # Build specialized system prompt
    system_prompt = f"""
    You are {role_description}.

    Core Expertise:
    - Deep knowledge of data engineering best practices
    - Experience with production data systems and reliability
    - Focus on actionable insights and practical recommendations
    - Understanding of compliance and security requirements

    Communication Style:
    - Provide clear, concise analysis with specific recommendations
    - Include relevant metrics and data quality indicators
    - Highlight potential risks and mitigation strategies
    - Structure responses for technical and business audiences

    Additional Context:
    {expertise_context or "No additional context provided"}

    Always maintain focus on data reliability, security, and operational excellence.
    """

    # Configure production-grade memory
    memory = ConversationBufferWindowMemory(
        memory_key="chat_history",
        k=8,  # Optimal for context without token limits
        return_messages=True
    )

    # Create agent with production configuration
    try:
        agent = initialize_agent(
            tools=tools_list,
            llm=create_llm(),
            agent_type=AgentType.CHAT_CONVERSATIONAL_REACT_DESCRIPTION,
            memory=memory,
            verbose=False,  # Disable in production
            handle_parsing_errors=True,
            max_iterations=4,  # Prevent runaway execution
            agent_kwargs={"system_message": system_prompt}
        )

        return agent
    except Exception as e:
        print(f"Specialized agent creation failed: {e}")
        return None

Creating Expert Production Agents

Build different specialized data agents for production environments:

# Data Quality Specialist
def create_data_quality_specialist():
    """Create specialized data quality engineering agent"""
    return create_specialized_production_agent(
        role_description="a senior data quality engineer with 8+ years of experience ensuring data reliability and accuracy across enterprise data systems",
        tools_list=[create_data_quality_tool(), create_production_data_tool()],
        expertise_context="Expert in data validation frameworks, quality metrics, and automated testing pipelines. Experienced with Great Expectations, Apache Griffin, and custom validation systems."
    )

# ML Pipeline Expert
def create_ml_pipeline_specialist():
    """Create specialized ML pipeline engineering agent"""
    return create_specialized_production_agent(
        role_description="an ML infrastructure engineer specialized in deploying and monitoring machine learning pipelines at scale",
        tools_list=[create_streaming_monitor_tool(), create_production_data_tool()],
        expertise_context="Expert in MLOps, model deployment, performance monitoring, and pipeline orchestration. Experienced with Kubeflow, MLflow, and production ML system architecture."
    )

Build Your Own Data Intelligence Agent

Create a practical data assistant following this comprehensive structure - this exercise brings together everything you've learned for real data engineering scenarios:

def create_data_intelligence_agent():
    """
    Build comprehensive data intelligence agent

    Implementation Checklist:
    1. ✅ Define tools: data warehouse, streaming monitor, quality checker
    2. ✅ Set up conversation memory for analytical continuity
    3. ✅ Add error handling for data access failures
    4. ✅ Configure specialized expertise and context
    5. ✅ Test with complex multi-step data analysis requests
    """

    # Step 1: Initialize production tools
    tools = [
        create_production_data_tool(),
        create_streaming_monitor_tool(),
        create_data_quality_tool()
    ]

    # Step 2: Configure memory with persistence
    memory = ConversationBufferWindowMemory(
        memory_key="chat_history",
        k=10,
        return_messages=True
    )

    # Step 3: Create agent with comprehensive error handling
    try:
        agent = initialize_agent(
            tools=tools,
            llm=create_llm(),
            agent_type=AgentType.CHAT_CONVERSATIONAL_REACT_DESCRIPTION,
            memory=memory,
            verbose=True,  # Enable for development
            handle_parsing_errors=True,
            max_iterations=5
        )

        print("✅ Data Intelligence Agent created successfully")
        return agent

    except Exception as e:
        print(f"❌ Agent creation failed: {e}")
        return None

def test_data_intelligence_agent():
    """Test the agent with comprehensive data analysis request"""
    agent = create_data_intelligence_agent()
    if not agent:
        return

    # Test with complex multi-system request
    test_request = """
    Analyze our customer data pipeline performance and check quality metrics:

    1. Assess data quality of customer behavior dataset
    2. Monitor streaming pipeline 'customer-events-v2' performance
    3. Query warehouse for recent processing anomalies
    4. Provide comprehensive recommendations for improvements
    """

    try:
        print("🧠 Running comprehensive data analysis...")
        response = agent.run(test_request)
        print("=" * 50)
        print("📊 ANALYSIS COMPLETE")
        print("=" * 50)
        print(response)

    except Exception as e:
        print(f"❌ Test failed: {e}")

# Execute the test
if __name__ == "__main__":
    test_data_intelligence_agent()

Self-Assessment Checklist

Verify your understanding before moving to advanced topics:

Core Concepts:
- [ ] I can explain LangChain's 4 building blocks (LLM, Tools, Memory, Agent) and their interactions
- [ ] I understand when to use different chain patterns for data workflows
- [ ] I can implement error handling and retry logic for production reliability

Practical Implementation:
- [ ] I can create custom tools with comprehensive error handling
- [ ] I can build agents that coordinate multiple data systems effectively
- [ ] I can implement memory persistence for conversation continuity

Production Readiness:
- [ ] I understand production considerations for agent deployment
- [ ] I can create specialized agents with domain expertise
- [ ] I know when to use LangChain vs alternatives for data systems

🎯 Quick Review: Observer Path Concepts

If you need to review fundamental concepts covered in the Observer Path:

🎯 Return to Observer Path →

⚙️ Next Steps: Advanced Architecture

Ready for enterprise-grade patterns and sophisticated architectures?

Choose your advanced learning path:
- ⚙️ Advanced Agent Architecture - Complex orchestration and workflow patterns
- ⚙️ Production Memory Systems - Enterprise state management and persistence
- ⚙️ Enterprise Tool Development - Custom integrations and specialized capabilities


Previous: Session 1 - Foundations →
Next: Session 3 - Advanced Patterns →