
Session 2: Advanced Chunking & Preprocessing

Learning Outcomes

By the end of this session, you will be able to:

  • Design hierarchical chunking strategies that preserve document structure and meaning
  • Extract and utilize rich metadata to enhance retrieval quality and context
  • Process multi-modal content including tables, images, and structured data effectively
  • Implement intelligent preprocessing pipelines that adapt to document complexity
  • Evaluate chunk quality using comprehensive metrics and assessment frameworks

Chapter Overview

What You'll Learn: Enterprise-Grade Document Understanding

Advanced chunking transforms RAG systems from simple text splitters into intelligent document understanding engines. Unlike naive approaches that arbitrarily split text, enterprise systems preserve document structure, extract rich metadata, and handle complex content types that include tables, images, and domain-specific formatting.

Why It Matters: Real-World Document Complexity

Enterprise documents contain structured relationships that naive chunking destroys. Tables split across chunks lose meaning, hierarchical information gets fragmented, and rich contextual metadata is discarded. Advanced preprocessing preserves these relationships, leading to 50-70% better context preservation and 30-40% improvement in retrieval relevance.

How It Stands Out: 2025 Enterprise Standards

Based on current industry trends, advanced chunking in 2025 employs:

  • Neural boundary detection that learns optimal split points
  • Context-aware processing that adapts to document types
  • Multi-modal understanding for mixed content formats
  • Dynamic window summarization for enhanced context

Where You'll Apply It: Critical Enterprise Use Cases

  • Legal Document Processing: Preserving citations and statutory references
  • Medical Record Analysis: Maintaining dosage and treatment relationships
  • Technical Documentation: Keeping code blocks and API references intact
  • Research Paper Processing: Preserving academic structure and citations

Figure 1 (RAG Problems Overview): This diagram illustrates common problems with naive chunking approaches, including fragmented content, lost context, and structural information loss that advanced preprocessing solves.

Learning Path Options

Observer Path (30 minutes): Understand concepts with simple examples

  • Focus: Core concepts and practical applications with clear demonstrations
  • Best for: Getting oriented with intelligent document processing fundamentals

🙋‍♂️ Participant Path (65 minutes): Implement structure-aware chunking

  • Focus: Hands-on implementation of hierarchical and metadata-enhanced chunking
  • Best for: Building practical preprocessing pipelines with real-world applications

🛠️ Implementer Path (105 minutes): Advanced patterns and enterprise deployment

  • Focus: Domain-specific processors and enterprise-scale optimization
  • Best for: Deep technical mastery with production-ready implementations


Part 1: Document Structure Analysis (Observer: 8 min | Participant: 15 min)

Understanding the Enterprise Challenge

Enterprise documents contain complex structures that naive chunking destroys. Consider a typical enterprise scenario:

The Problem: A legal contract with multiple sections, numbered clauses, and cross-references gets processed by simple text splitting. The result? Clause numbers are separated from their content, cross-references become meaningless, and the hierarchical structure that legal professionals rely on is completely lost.

The Solution: Structure-aware document analysis that preserves these critical relationships.

Why Document Structure Analysis Matters

Intelligent document analysis provides several critical capabilities:

  • Preserves semantic relationships: Keeps related content together
  • Maintains hierarchy: Uses document structure as natural boundaries
  • Enables specialized processing: Different content types get appropriate handling
  • Enhances retrieval quality: Includes structural metadata for better matching

Simple Content Type Detection (Observer Focus)

Here's a straightforward example of how we can detect different content types in documents:

# Simple content type detection

from enum import Enum

class ContentType(Enum):
    HEADING = "heading"
    PARAGRAPH = "paragraph" 
    TABLE = "table"
    CODE = "code"
    LIST = "list"

def detect_simple_content_type(text_line):
    """Detect content type from a single line of text."""
    # Check for code first, using the raw line (indentation is lost after strip())
    if text_line.startswith('    ') or text_line.startswith('\t'):
        return ContentType.CODE

    text = text_line.strip()

    # Check for markdown heading
    if text.startswith('#'):
        return ContentType.HEADING

    # Check for table (pipe-separated)
    if '|' in text and text.count('|') >= 2:
        return ContentType.TABLE

    # Check for list item
    if text.startswith('- ') or text.startswith('* '):
        return ContentType.LIST

    return ContentType.PARAGRAPH

Code Explanation:

  • Code detection runs first on the raw line, because indentation (four spaces or a tab) would be lost once the line is stripped
  • Markdown headings are detected by the leading '#' symbol
  • Tables are identified by counting the pipe characters ('|') that separate columns
  • List items are spotted by their bullet point markers

This classification enables our system to make intelligent decisions about how to process each type of content.
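To see the classifier in action, here is a quick, hypothetical usage example (the sample lines are illustrative, not from any specific dataset):

sample_lines = [
    "# Quarterly Report",
    "| Region | Revenue | Growth |",
    "    return total * tax_rate",
    "- Review supplier contracts",
    "The quarter closed ahead of forecast.",
]

for line in sample_lines:
    # Print the detected label next to the original line
    print(f"{detect_simple_content_type(line).value:>10}: {line!r}")

# Expected labels, one per line: heading, table, code, list, paragraph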

PARTICIPANT PATH: Building Document Elements

For hands-on implementation, let's create a more sophisticated structure to represent document elements:

from dataclasses import dataclass
from typing import Dict, Any

@dataclass
class DocumentElement:
    """Represents a structured document element with metadata."""
    content: str
    element_type: ContentType
    level: int  # Hierarchy level (0=top, 1=section, 2=subsection)
    metadata: Dict[str, Any]
    position: int  # Position in document

    def get_hierarchy_context(self):
        """Get human-readable hierarchy information."""
        hierarchy_labels = {
            0: "Document Root",
            1: "Major Section", 
            2: "Subsection",
            3: "Minor Section"
        }
        return hierarchy_labels.get(self.level, f"Level {self.level}")

Implementation Benefits:

  • Structured representation: Each element carries its content, type, and context
  • Hierarchy tracking: Level information enables structure-aware processing
  • Rich metadata: Position and additional context for enhanced chunking
  • Processing intelligence: Different element types can receive specialized handling

Now let's implement the structure analyzer that processes documents line by line:

import re
from typing import List

class DocumentStructureAnalyzer:
    """Analyzes document structure and content types."""

    def __init__(self):
        self.heading_patterns = [
            r'^#{1,6}\s+(.+)$',      # Markdown headers
            r'^([A-Z][^a-z]*)\s*$',  # ALL CAPS headers
            r'^\d+\.\s+(.+)$',       # Numbered headers
        ]

    def analyze_structure(self, document_text: str) -> List[DocumentElement]:
        """Analyze document structure and create structured elements."""
        lines = document_text.split('\n')
        elements = []
        current_level = 0
        position = 0

        for i, line in enumerate(lines):
            if not line.strip():
                continue

            content_type = detect_simple_content_type(line)
            level = self._determine_level(line, current_level, content_type)

            element = DocumentElement(
                content=line.strip(),
                element_type=content_type,
                level=level,
                metadata={
                    "line_number": i + 1,
                    "char_count": len(line),
                    "word_count": len(line.split())
                },
                position=position
            )

            elements.append(element)
            position += 1
            current_level = level

        return elements

This analyzer creates a structured representation of the document that preserves hierarchy and content relationships.
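The analyzer relies on a `_determine_level` helper that is not shown above. A minimal sketch, to be added to DocumentStructureAnalyzer and assuming markdown-style headings, might look like this:

    def _determine_level(self, line: str, current_level: int,
                         content_type: ContentType) -> int:
        """Estimate the hierarchy level of a line (sketch; assumes markdown-style input)."""
        stripped = line.strip()

        if content_type == ContentType.HEADING and stripped.startswith('#'):
            # Depth of a markdown heading = number of leading '#' characters (capped at 6)
            return min(len(stripped) - len(stripped.lstrip('#')), 6)

        if content_type == ContentType.HEADING:
            # ALL CAPS or numbered headings: keep them at the current heading depth
            return max(current_level, 1)

        # Body content stays at the level of the section it belongs to
        return current_level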

IMPLEMENTER PATH: Advanced Pattern Recognition

For enterprise applications, implement sophisticated pattern recognition:

import re
from typing import Any, Dict, List, Tuple

class AdvancedDocumentAnalyzer:
    """Enterprise-grade document analysis with domain-specific patterns."""

    def __init__(self):
        self.domain_patterns = {
            "legal": {
                "section_markers": [r"ยง\s*\d+", r"Article\s+[IVX]+", r"Section\s+\d+"],
                "citations": [r"\d+\s+U\.S\.C\.\s+ยง\s+\d+", r"\d+\s+F\.\d+d\s+\d+"]
            },
            "medical": {
                "dosages": [r"\d+\s*mg", r"\d+\s*ml", r"\d+\s*cc"],
                "medications": [r"[A-Z][a-z]+(?:in|ol|ide|ine)"]
            },
            "technical": {
                "apis": [r"[A-Z][a-zA-Z]+\.[a-zA-Z]+\(\)", r"HTTP[S]?\://"],
                "versions": [r"v?\d+\.\d+\.\d+", r"version\s+\d+"]
            }
        }

    def analyze_with_domain_knowledge(self, document_text: str, 
                                    domain: str = "general") -> Dict[str, Any]:
        """Analyze document with domain-specific intelligence."""
        analysis = {
            "domain": domain,
            "structure": self._analyze_structure(document_text),
            "complexity_score": self._calculate_complexity_score(document_text),
            "processing_strategy": "standard"
        }

        if domain in self.domain_patterns:
            domain_features = self._extract_domain_features(document_text, domain)
            analysis["domain_features"] = domain_features
            analysis["processing_strategy"] = self._recommend_strategy(domain_features)

        return analysis

This advanced analyzer adapts to different document domains and provides intelligent processing recommendations.
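The helper methods (`_analyze_structure`, `_calculate_complexity_score`, `_extract_domain_features`, `_recommend_strategy`) are left to the implementer. As one illustration, a minimal `_extract_domain_features` and `_recommend_strategy`, assuming only the `domain_patterns` dictionary defined above, could count regex matches per feature group:

    def _extract_domain_features(self, document_text: str, domain: str) -> Dict[str, int]:
        """Count matches for each domain-specific pattern group (illustrative sketch)."""
        features = {}
        for feature_name, patterns in self.domain_patterns[domain].items():
            matches = 0
            for pattern in patterns:
                matches += len(re.findall(pattern, document_text))
            features[feature_name] = matches
        return features

    def _recommend_strategy(self, domain_features: Dict[str, int]) -> str:
        """Pick a processing strategy based on how many domain markers were found (sketch)."""
        return "domain_specialized" if sum(domain_features.values()) > 5 else "standard"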


Part 2: Hierarchical Chunking Implementation (Observer: 8 min | Participant: 20 min)

Why Hierarchical Chunking Transforms RAG Performance

Traditional chunking is like cutting a newspaper with a paper shredder - it destroys the logical structure authors carefully created. Hierarchical chunking cuts along natural boundaries, preserving meaning and relationships.

Performance Impact:

  • 60-80% better context preservation because chunks contain complete thoughts
  • 40-50% improved retrieval accuracy because queries match naturally coherent content units
  • Enhanced user experience because retrieved content makes logical sense

Simple Hierarchical Chunking (Observer Focus)

Here's a basic example of how hierarchical chunking works:

def simple_hierarchical_chunk(elements: List[DocumentElement], 
                            max_chunk_size: int = 500) -> List[str]:
    """Create simple hierarchical chunks based on document structure."""
    chunks = []
    current_chunk = []
    current_size = 0

    for element in elements:
        element_size = len(element.content)

        # Start new chunk on major headings if current chunk has content
        if (element.element_type == ContentType.HEADING and 
            element.level <= 1 and current_chunk):

            # Save current chunk
            if current_chunk:
                chunks.append('\n'.join(current_chunk))
                current_chunk = []
                current_size = 0

        # Add element to current chunk if size permits
        if current_size + element_size <= max_chunk_size:
            current_chunk.append(element.content)
            current_size += element_size
        else:
            # Save current chunk and start new one
            if current_chunk:
                chunks.append('\n'.join(current_chunk))
            current_chunk = [element.content]
            current_size = element_size

    # Save final chunk
    if current_chunk:
        chunks.append('\n'.join(current_chunk))

    return chunks

Key Benefits:

  • Natural boundaries: Uses document structure instead of arbitrary splits
  • Complete sections: Keeps related content together
  • Size management: Respects maximum chunk size while preserving structure
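Putting the pieces together, a short end-to-end run (the sample document is hypothetical, and this assumes the `_determine_level` sketch shown earlier) analyzes a small markdown string and chunks it along its structure:

analyzer = DocumentStructureAnalyzer()

sample_doc = """# Data Retention Policy

## Scope
This policy applies to all customer records.

## Retention Periods
- Invoices: 7 years
- Support tickets: 2 years
"""

elements = analyzer.analyze_structure(sample_doc)
chunks = simple_hierarchical_chunk(elements, max_chunk_size=80)

for i, chunk in enumerate(chunks, start=1):
    print(f"--- Chunk {i} ({len(chunk)} chars) ---")
    print(chunk)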

PARTICIPANT PATH: Advanced Hierarchical Chunker

Let's implement a production-ready hierarchical chunker with overlap and metadata:

from langchain.schema import Document

class HierarchicalChunker:
    """Creates intelligent chunks based on document hierarchy."""

    def __init__(self, max_chunk_size: int = 1000, overlap_ratio: float = 0.1):
        self.max_chunk_size = max_chunk_size
        self.overlap_ratio = overlap_ratio
        self.analyzer = DocumentStructureAnalyzer()

    def create_hierarchical_chunks(self, document: Document) -> List[Document]:
        """Create chunks that preserve document hierarchy."""
        # Analyze document structure
        elements = self.analyzer.analyze_structure(document.page_content)

        # Group elements into logical sections
        sections = self._group_elements_by_hierarchy(elements)

        # Create chunks from sections
        chunks = []
        for section in sections:
            section_chunks = self._chunk_section(section, document.metadata)
            chunks.extend(section_chunks)

        return chunks

    def _group_elements_by_hierarchy(self, elements: List[DocumentElement]) -> List[List[DocumentElement]]:
        """Group elements into hierarchical sections."""
        sections = []
        current_section = []
        current_level = -1

        for element in elements:
            # Start new section on same or higher level heading
            if (element.element_type == ContentType.HEADING and
                element.level <= current_level and current_section):
                sections.append(current_section)
                current_section = [element]
                current_level = element.level
            elif element.element_type == ContentType.HEADING and not current_section:
                current_section = [element]
                current_level = element.level
            else:
                current_section.append(element)

        # Add final section
        if current_section:
            sections.append(current_section)

        return sections

    def _chunk_section(self, section: List[DocumentElement], 
                      base_metadata: Dict) -> List[Document]:
        """Create chunks from a document section with intelligent overlap."""
        chunks = []
        current_chunk_elements = []
        current_size = 0

        section_title = self._extract_section_title(section)

        for element in section:
            element_size = len(element.content)

            # Check if adding this element would exceed size limit
            if current_size + element_size > self.max_chunk_size and current_chunk_elements:
                # Create chunk from current elements
                chunk = self._create_chunk_from_elements(
                    current_chunk_elements, base_metadata, section_title
                )
                chunks.append(chunk)

                # Start new chunk with overlap for continuity
                overlap_elements = self._get_overlap_elements(current_chunk_elements)
                current_chunk_elements = overlap_elements + [element]
                current_size = sum(len(e.content) for e in current_chunk_elements)
            else:
                current_chunk_elements.append(element)
                current_size += element_size

        # Create final chunk
        if current_chunk_elements:
            chunk = self._create_chunk_from_elements(
                current_chunk_elements, base_metadata, section_title
            )
            chunks.append(chunk)

        return chunks

    def _create_chunk_from_elements(self, elements: List[DocumentElement],
                                  base_metadata: Dict, section_title: str) -> Document:
        """Create a document chunk with rich metadata."""
        # Combine element content with proper formatting
        content_parts = []
        for element in elements:
            if element.element_type == ContentType.HEADING:
                content_parts.append(f"\n{element.content}\n")
            else:
                content_parts.append(element.content)

        content = "\n".join(content_parts).strip()

        # Build enhanced metadata
        content_types = [e.element_type.value for e in elements]
        hierarchy_levels = [e.level for e in elements]

        enhanced_metadata = {
            **base_metadata,
            "section_title": section_title,
            "chunk_type": "hierarchical",
            "content_types": list(set(content_types)),
            "hierarchy_levels": hierarchy_levels,
            "element_count": len(elements),
            "has_heading": ContentType.HEADING.value in content_types,
            "has_table": ContentType.TABLE.value in content_types,
            "has_code": ContentType.CODE.value in content_types,
            "min_hierarchy_level": min(hierarchy_levels),
            "max_hierarchy_level": max(hierarchy_levels)
        }

        return Document(page_content=content, metadata=enhanced_metadata)

This implementation provides intelligent section grouping, size management, and context preservation through overlap.
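Two helpers referenced above, `_extract_section_title` and `_get_overlap_elements`, are not shown. Minimal sketches, assuming the `overlap_ratio` set in `__init__`, could look like this:

    def _extract_section_title(self, section: List[DocumentElement]) -> str:
        """Use the first heading in the section as its title, if any (sketch)."""
        for element in section:
            if element.element_type == ContentType.HEADING:
                return element.content.lstrip('#').strip()
        return "Untitled Section"

    def _get_overlap_elements(self, elements: List[DocumentElement]) -> List[DocumentElement]:
        """Carry the tail of the previous chunk forward, up to overlap_ratio of its size (sketch)."""
        overlap_budget = int(sum(len(e.content) for e in elements) * self.overlap_ratio)
        overlap, used = [], 0
        for element in reversed(elements):
            if used + len(element.content) > overlap_budget:
                break
            overlap.insert(0, element)
            used += len(element.content)
        return overlap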

IMPLEMENTER PATH: Enterprise Chunking Pipeline

For production environments, implement a comprehensive pipeline with quality assessment:

class EnterpriseChunkingPipeline:
    """Enterprise-grade chunking pipeline with quality assessment."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.chunker = HierarchicalChunker(
            max_chunk_size=config.get("max_chunk_size", 1000),
            overlap_ratio=config.get("overlap_ratio", 0.1)
        )
        self.quality_assessor = ChunkQualityAssessor()

    def process_document_with_quality_control(self, document: Document) -> Dict[str, Any]:
        """Process document with comprehensive quality assessment."""
        # Create initial chunks
        chunks = self.chunker.create_hierarchical_chunks(document)

        # Assess chunk quality
        quality_metrics = self.quality_assessor.assess_chunk_quality(chunks)

        # Apply quality-based optimization if needed
        if quality_metrics["overall_quality"] < self.config.get("min_quality_threshold", 0.7):
            chunks = self._optimize_chunks(chunks, quality_metrics)
            quality_metrics = self.quality_assessor.assess_chunk_quality(chunks)

        return {
            "chunks": chunks,
            "quality_metrics": quality_metrics,
            "processing_stats": {
                "original_length": len(document.page_content),
                "chunk_count": len(chunks),
                "avg_chunk_size": sum(len(c.page_content) for c in chunks) / len(chunks),
                "quality_score": quality_metrics["overall_quality"]
            }
        }

This enterprise pipeline includes quality control and optimization feedback loops.
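The `_optimize_chunks` step is left unspecified above. One simple approach, shown here as a hedged sketch rather than a prescribed algorithm, is to merge undersized chunks into their predecessor when quality falls below threshold:

    def _optimize_chunks(self, chunks: List[Document],
                         quality_metrics: Dict[str, float]) -> List[Document]:
        """Merge very small chunks into the previous chunk to improve size consistency (sketch)."""
        min_size = int(self.config.get("max_chunk_size", 1000) * 0.25)
        optimized: List[Document] = []

        for chunk in chunks:
            if optimized and len(chunk.page_content) < min_size:
                previous = optimized[-1]
                merged_content = previous.page_content + "\n\n" + chunk.page_content
                optimized[-1] = Document(page_content=merged_content,
                                         metadata={**previous.metadata, "merged": True})
            else:
                optimized.append(chunk)

        return optimized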


Part 3: Metadata Extraction & Enhancement (Observer: 6 min | Participant: 15 min)

The Power of Rich Metadata

Metadata significantly improves retrieval quality by creating additional searchable context. When users search for concepts, the system can match not just original text but also extracted entities, keywords, and inferred topics.

Simple Metadata Extraction (Observer Focus)

Here's a basic example of extracting useful metadata from document chunks:

import re
from typing import Any, Dict, List

def extract_simple_metadata(text: str) -> Dict[str, Any]:
    """Extract basic metadata from text content."""
    words = text.split()

    # Basic statistics
    metadata = {
        "word_count": len(words),
        "char_count": len(text),
        "sentence_count": len(text.split('.')),
    }

    # Extract capitalized words (potential entities)
    capitalized_words = re.findall(r'\b[A-Z][a-z]+\b', text)
    metadata["potential_entities"] = list(set(capitalized_words))[:5]

    # Extract numbers and dates
    numbers = re.findall(r'\b\d+(?:\.\d+)?\b', text)
    metadata["numbers"] = [float(n) for n in numbers[:5]]

    dates = re.findall(r'\b\d{1,2}[/-]\d{1,2}[/-]\d{4}\b', text)
    metadata["dates"] = dates

    # Assess content difficulty (guard against empty input)
    long_words = [w for w in words if len(w) > 6]
    if words and len(long_words) / len(words) > 0.3:
        metadata["difficulty_level"] = "advanced"
    else:
        metadata["difficulty_level"] = "intermediate"

    return metadata

Metadata Benefits:

  • Enhanced searchability: Additional context for retrieval matching
  • Content understanding: Insight into chunk characteristics
  • Quality assessment: Metrics for evaluating chunk usefulness
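A quick illustration with a hypothetical sentence (values below follow from the function as written; entity ordering may vary because it comes from a set):

sample = "Acme Corp signed the supply agreement on 12/01/2024 for 250000 units."
print(extract_simple_metadata(sample))
# Example output:
# {'word_count': 11, 'char_count': 69, 'sentence_count': 1,
#  'potential_entities': ['Acme', 'Corp'], 'numbers': [12.0, 1.0, 2024.0, 250000.0],
#  'dates': ['12/01/2024'], 'difficulty_level': 'intermediate'}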

PARTICIPANT PATH: Advanced Metadata Extractor

For production systems, implement comprehensive metadata extraction:

from dataclasses import dataclass
from datetime import datetime

@dataclass
class ExtractedMetadata:
    """Container for extracted metadata."""
    entities: List[str]
    keywords: List[str]
    topics: List[str]
    dates: List[str]
    numbers: List[float]
    technical_terms: List[str]
    difficulty_level: str
    content_summary: str

class MetadataExtractor:
    """Extracts rich metadata from document content."""

    def __init__(self):
        self.technical_patterns = [
            r'\b[A-Z]{2,}\b',                    # Acronyms
            r'\b\w+\(\)',                        # Function calls
            r'\b[a-zA-Z_]\w*\.[a-zA-Z_]\w*\b',   # Object notation
            r'\b\d+\.\d+\.\d+\b',                # Version numbers
        ]

        self.topic_keywords = {
            "technology": ["software", "computer", "digital", "algorithm", "data", "system"],
            "business": ["market", "customer", "revenue", "strategy", "company", "industry"],
            "legal": ["contract", "agreement", "clause", "statute", "regulation", "compliance"],
            "medical": ["patient", "treatment", "diagnosis", "medication", "therapy", "clinical"]
        }

    def extract_enhanced_metadata(self, text: str) -> ExtractedMetadata:
        """Extract comprehensive metadata from text."""

        # Extract different types of information
        entities = self._extract_entities(text)
        keywords = self._extract_keywords(text)
        topics = self._infer_topics(text)
        dates = self._extract_dates(text)
        numbers = self._extract_numbers(text)
        technical_terms = self._extract_technical_terms(text)
        difficulty_level = self._assess_difficulty(text)
        content_summary = self._generate_summary(text)

        return ExtractedMetadata(
            entities=entities,
            keywords=keywords,
            topics=topics,
            dates=dates,
            numbers=numbers,
            technical_terms=technical_terms,
            difficulty_level=difficulty_level,
            content_summary=content_summary
        )

    def _extract_entities(self, text: str) -> List[str]:
        """Extract named entities using pattern matching."""
        entities = []

        # Extract capitalized words (potential proper nouns)
        capitalized_words = re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', text)
        entities.extend(capitalized_words)

        # Extract quoted terms
        quoted_terms = re.findall(r'"([^"]*)"', text)
        entities.extend(quoted_terms)

        # Remove duplicates and filter by length
        entities = list(set([e for e in entities if 2 < len(e) < 50]))

        return entities[:10]  # Limit to top 10

    def _infer_topics(self, text: str) -> List[str]:
        """Infer topics from content using keyword analysis."""
        text_lower = text.lower()
        topic_scores = {}

        for topic, keywords in self.topic_keywords.items():
            score = sum(text_lower.count(keyword) for keyword in keywords)
            if score > 0:
                topic_scores[topic] = score

        # Return topics sorted by relevance
        return sorted(topic_scores.keys(), key=lambda x: topic_scores[x], reverse=True)[:3]

    def _assess_difficulty(self, text: str) -> str:
        """Assess content difficulty level."""
        words = text.split()
        sentences = [s for s in text.split('.') if s.strip()]

        if not words or not sentences:
            return "unknown"

        # Calculate readability metrics
        avg_words_per_sentence = len(words) / len(sentences)
        long_words = len([w for w in words if len(w) > 6])
        long_word_ratio = long_words / len(words) if words else 0

        # Technical term density
        technical_terms = len(self._extract_technical_terms(text))
        technical_density = technical_terms / len(words) if words else 0

        # Determine difficulty
        if avg_words_per_sentence > 20 or long_word_ratio > 0.3 or technical_density > 0.1:
            return "advanced"
        elif avg_words_per_sentence > 15 or long_word_ratio > 0.2:
            return "intermediate"
        else:
            return "beginner"

Metadata-Enhanced Chunking Integration

Now let's combine hierarchical chunking with metadata extraction:

class MetadataEnhancedChunker:
    """Chunker that enriches chunks with extracted metadata."""

    def __init__(self, max_chunk_size: int = 1000):
        self.hierarchical_chunker = HierarchicalChunker(max_chunk_size=max_chunk_size)
        self.metadata_extractor = MetadataExtractor()

    def create_enhanced_chunks(self, document: Document) -> List[Document]:
        """Create chunks with rich metadata."""
        # First, create hierarchical chunks
        chunks = self.hierarchical_chunker.create_hierarchical_chunks(document)

        # Enhance each chunk with extracted metadata
        enhanced_chunks = []
        for chunk in chunks:
            enhanced_chunk = self._enhance_chunk_metadata(chunk)
            enhanced_chunks.append(enhanced_chunk)

        return enhanced_chunks

    def _enhance_chunk_metadata(self, chunk: Document) -> Document:
        """Enhance chunk with extracted metadata."""
        # Extract metadata from chunk content
        extracted_metadata = self.metadata_extractor.extract_enhanced_metadata(chunk.page_content)

        # Merge extracted metadata with existing metadata
        enhanced_metadata = {
            **chunk.metadata,
            "entities": extracted_metadata.entities,
            "keywords": extracted_metadata.keywords,
            "topics": extracted_metadata.topics,
            "technical_terms": extracted_metadata.technical_terms,
            "difficulty_level": extracted_metadata.difficulty_level,
            "content_summary": extracted_metadata.content_summary,
            "enhanced_at": datetime.now().isoformat()
        }

        # Create searchable content that includes metadata
        searchable_content = self._create_searchable_content(chunk.page_content, extracted_metadata)

        return Document(page_content=searchable_content, metadata=enhanced_metadata)

    def _create_searchable_content(self, original_content: str, metadata: ExtractedMetadata) -> str:
        """Create enhanced searchable content."""
        metadata_text_parts = []

        if metadata.keywords:
            metadata_text_parts.append(f"Keywords: {', '.join(metadata.keywords)}")

        if metadata.topics:
            metadata_text_parts.append(f"Topics: {', '.join(metadata.topics)}")

        if metadata.entities:
            metadata_text_parts.append(f"Entities: {', '.join(metadata.entities[:5])}")

        metadata_text = "\n".join(metadata_text_parts)

        # Combine original content with metadata
        if metadata_text:
            return f"{original_content}\n\n--- Metadata ---\n{metadata_text}"
        else:
            return original_content

Part 4: Multi-Modal & Quality Assessment (Observer: 8 min | Participant: 15 min)

Handling Complex Document Types

Real-world documents often contain tables, images, and mixed content that require specialized processing to maintain their meaning and relationships.

Table-Aware Processing (Observer Focus)

Tables are particularly challenging because they contain structured relationships that must be preserved:

def process_simple_table(table_text: str) -> Dict[str, Any]:
    """Process table content while preserving structure."""
    lines = table_text.strip().split('\n')
    table_lines = [line for line in lines if '|' in line]

    if not table_lines:
        return {"error": "No table structure found"}

    # Extract headers from first row
    header_row = table_lines[0]
    headers = [cell.strip() for cell in header_row.split('|') if cell.strip()]

    # Count data rows (excluding header and separator)
    data_rows = len(table_lines) - 2 if len(table_lines) > 2 else 0

    # Create enhanced description
    description = f"Table with {data_rows} rows and {len(headers)} columns"
    if headers:
        description += f" containing data about: {', '.join(headers)}"

    return {
        "enhanced_content": f"{description}\n\n{table_text}",
        "metadata": {
            "content_type": "table",
            "row_count": data_rows,
            "column_count": len(headers),
            "headers": headers
        }
    }

Table Processing Benefits:

  • Structure preservation: Tables remain intact and meaningful
  • Enhanced searchability: Descriptive text makes tables discoverable
  • Metadata enrichment: Table characteristics available for retrieval
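For example, running the function on a small, illustrative markdown table:

sample_table = """| Product | Q1 Sales | Q2 Sales |
|---------|----------|----------|
| Widget  | 1200     | 1450     |
| Gadget  | 980      | 1100     |"""

result = process_simple_table(sample_table)
print(result["metadata"])
# {'content_type': 'table', 'row_count': 2, 'column_count': 3,
#  'headers': ['Product', 'Q1 Sales', 'Q2 Sales']}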

Basic Quality Assessment

Let's implement simple metrics to evaluate chunk quality:

def assess_basic_quality(chunks: List[str]) -> Dict[str, Any]:
    """Assess basic quality metrics for chunks."""
    if not chunks:
        return {"error": "No chunks to assess"}

    # Calculate size consistency
    chunk_sizes = [len(chunk) for chunk in chunks]
    avg_size = sum(chunk_sizes) / len(chunk_sizes)
    size_variance = sum((size - avg_size) ** 2 for size in chunk_sizes) / len(chunk_sizes)
    size_consistency = 1.0 / (1.0 + size_variance / (avg_size ** 2))

    # Calculate information density
    densities = []
    for chunk in chunks:
        words = chunk.split()
        unique_words = set(words)
        if words:
            density = len(unique_words) / len(words)
            densities.append(density)

    avg_density = sum(densities) / len(densities) if densities else 0

    return {
        "size_consistency": size_consistency,
        "avg_information_density": avg_density,
        "chunk_count": len(chunks),
        "avg_chunk_size": avg_size
    }

PARTICIPANT PATH: Complete Processing Pipeline

Let's build a comprehensive pipeline that combines all our advanced techniques:

class AdvancedProcessingPipeline:
    """Complete advanced document processing pipeline."""

    def __init__(self, max_chunk_size: int = 1000, enable_quality_assessment: bool = True):
        self.max_chunk_size = max_chunk_size
        self.enable_quality_assessment = enable_quality_assessment

        # Initialize processors
        self.metadata_chunker = MetadataEnhancedChunker(max_chunk_size=max_chunk_size)
        self.quality_assessor = ChunkQualityAssessor() if enable_quality_assessment else None

    def process_document(self, document: Document) -> Dict[str, Any]:
        """Process document using the most appropriate strategy."""

        # Analyze document characteristics
        doc_analysis = self._analyze_document_complexity(document)

        # Choose processing strategy
        if doc_analysis["has_tables"]:
            print("Detected tables - using table-aware processing...")
            processed_chunks = self._process_with_table_awareness(document)
        else:
            print("Using standard hierarchical processing...")
            processed_chunks = self.metadata_chunker.create_enhanced_chunks(document)

        # Assess quality if enabled
        quality_metrics = {}
        if self.enable_quality_assessment and self.quality_assessor:
            quality_metrics = self.quality_assessor.assess_chunk_quality(processed_chunks)

        # Add processing metadata
        for chunk in processed_chunks:
            chunk.metadata.update({
                "processing_strategy": doc_analysis["recommended_strategy"],
                "document_complexity": doc_analysis["complexity_score"],
                "processing_pipeline": "advanced_v2"
            })

        return {
            "chunks": processed_chunks,
            "document_analysis": doc_analysis,
            "quality_metrics": quality_metrics,
            "processing_stats": {
                "chunk_count": len(processed_chunks),
                "total_processed_chars": sum(len(c.page_content) for c in processed_chunks),
                "avg_chunk_size": sum(len(c.page_content) for c in processed_chunks) / len(processed_chunks)
            }
        }

    def _analyze_document_complexity(self, document: Document) -> Dict[str, Any]:
        """Analyze document to determine optimal processing strategy."""
        content = document.page_content

        # Detect various content types
        has_tables = "|" in content and content.count("|") > 5
        has_code = "```" in content or content.count("    ") > 3
        has_lists = content.count("- ") > 3 or content.count("* ") > 3
        has_headings = content.count("#") > 2

        # Calculate complexity score
        complexity_score = 0
        if has_tables: complexity_score += 3
        if has_code: complexity_score += 2
        if has_lists: complexity_score += 1
        if has_headings: complexity_score += 2

        # Determine strategy
        if has_tables:
            strategy = "table_aware"
        elif complexity_score > 4:
            strategy = "hierarchical"
        else:
            strategy = "standard"

        return {
            "has_tables": has_tables,
            "has_code": has_code,
            "has_lists": has_lists,
            "has_headings": has_headings,
            "complexity_score": complexity_score,
            "recommended_strategy": strategy
        }

IMPLEMENTER PATH: Enterprise Quality Control

For production systems, implement comprehensive quality assessment:

class ChunkQualityAssessor:
    """Comprehensive chunk quality assessment."""

    def assess_chunk_quality(self, chunks: List[Document]) -> Dict[str, float]:
        """Multi-dimensional quality assessment."""
        if not chunks:
            return {metric: 0.0 for metric in ["coherence", "density", "consistency", "overall"]}

        # Calculate individual metrics
        coherence = self._calculate_coherence_score(chunks)
        density = self._calculate_information_density(chunks)
        consistency = self._calculate_size_consistency(chunks)
        metadata_richness = self._calculate_metadata_richness(chunks)

        overall_quality = (coherence + density + consistency + metadata_richness) / 4

        return {
            "coherence_score": coherence,
            "information_density": density,
            "size_consistency": consistency,
            "metadata_richness": metadata_richness,
            "overall_quality": overall_quality
        }

    def _calculate_coherence_score(self, chunks: List[Document]) -> float:
        """Calculate topic coherence between adjacent chunks."""
        if len(chunks) < 2:
            return 1.0

        coherence_scores = []
        for i in range(len(chunks) - 1):
            current_topics = set(chunks[i].metadata.get("topics", []))
            next_topics = set(chunks[i + 1].metadata.get("topics", []))

            if current_topics and next_topics:
                overlap = len(current_topics & next_topics)
                union = len(current_topics | next_topics)
                score = overlap / union if union > 0 else 0
                coherence_scores.append(score)

        return sum(coherence_scores) / len(coherence_scores) if coherence_scores else 0.0

    def _calculate_information_density(self, chunks: List[Document]) -> float:
        """Calculate average information density across chunks."""
        densities = []

        for chunk in chunks:
            words = chunk.page_content.split()
            unique_words = set(words)

            if words:
                density = len(unique_words) / len(words)
                densities.append(density)

        return sum(densities) / len(densities) if densities else 0.0

    def _calculate_metadata_richness(self, chunks: List[Document]) -> float:
        """Assess metadata completeness across chunks."""
        expected_fields = ["topics", "entities", "keywords", "difficulty_level"]

        richness_scores = []
        for chunk in chunks:
            present_fields = sum(1 for field in expected_fields 
                               if field in chunk.metadata and chunk.metadata[field])
            score = present_fields / len(expected_fields)
            richness_scores.append(score)

        return sum(richness_scores) / len(richness_scores) if richness_scores else 0.0
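
The `_calculate_size_consistency` method referenced above is not shown; it mirrors the standalone assess_basic_quality function from earlier. A minimal sketch:

    def _calculate_size_consistency(self, chunks: List[Document]) -> float:
        """Score how uniform chunk sizes are; 1.0 means perfectly uniform (sketch)."""
        sizes = [len(chunk.page_content) for chunk in chunks]
        avg_size = sum(sizes) / len(sizes)
        variance = sum((size - avg_size) ** 2 for size in sizes) / len(sizes)
        return 1.0 / (1.0 + variance / (avg_size ** 2)) if avg_size else 0.0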

Optional Deep-Dive Modules

⚠️ OPTIONAL CONTENT - Choose based on your goals:


Multiple Choice Test - Session 2

Test your understanding of advanced chunking and preprocessing concepts:

Question 1: What is the primary benefit of detecting content types (headings, tables, code) during document analysis?
A) Reduces processing time
B) Enables structure-aware chunking that preserves meaning
C) Reduces storage requirements
D) Improves embedding quality

Question 2: In hierarchical chunking, why is it important to track element hierarchy levels?
A) To reduce memory usage
B) To simplify the codebase
C) To improve processing speed
D) To preserve document structure and create meaningful chunk boundaries

Question 3: What is the main advantage of extracting entities, keywords, and topics during preprocessing?
A) Reduces chunk size
B) Enables more precise retrieval through enriched context
C) Simplifies the chunking process
D) Improves computational efficiency

Question 4: Why do tables require specialized processing in RAG systems?
A) Tables use different encoding formats
B) Tables contain more text than paragraphs
C) Tables are always larger than the chunk size
D) Tables have structured relationships that are lost in naive chunking

Question 5: When processing documents with images, what is the best practice for RAG systems?
A) Store images as binary data in chunks
B) Create separate chunks for each image
C) Replace image references with descriptive text
D) Ignore images completely

Question 6: Which metric is most important for measuring chunk coherence in hierarchical chunking?
A) Topic consistency between related chunks
B) Number of chunks created
C) Average chunk size
D) Processing speed

Question 7: What is the optimal overlap ratio for hierarchical chunks?
A) 100% - complete duplication
B) 0% - no overlap needed
C) 10-20% - balanced context and efficiency
D) 50% - maximum context preservation

Question 8: Why should the advanced processing pipeline analyze document complexity before choosing a processing strategy?
A) To select the most appropriate processing approach for the content type
B) To set the embedding model parameters
C) To reduce computational costs
D) To determine the number of chunks to create

🗂️ View Test Solutions →


Previous: Session 1 - Basic RAG Implementation


Next: Session 3 - Vector Databases & Search Optimization →