⚙️ Session 2: Quality Assessment Systems¶
⚙️ IMPLEMENTER PATH CONTENT
Prerequisites: Complete 🎯 Observer, 📝 Participant, and ⚙️ Advanced Processing Pipeline
Time Investment: 2-3 hours
Outcome: Master comprehensive quality control systems
Learning Outcomes¶
After mastering this advanced quality system, you will:
- Build multi-dimensional quality assessment frameworks
- Implement automated quality optimization loops
- Create comprehensive monitoring and alerting systems
- Deploy production-grade quality control pipelines
Enterprise Quality Control Framework¶
The final component is comprehensive quality assessment that evaluates chunks across multiple dimensions. This enables automated quality control and optimization feedback, ensuring consistent high-quality output for production RAG systems.
Multi-Dimensional Quality Assessment¶
Quality assessment is essential because not all chunks are created equal: some preserve meaning well, while others lose crucial context. You need metrics to identify and fix problematic chunks before they degrade retrieval performance.
Comprehensive Quality Assessor¶
import re
from datetime import datetime
from typing import Any, Dict, List

# Document is assumed to be the LangChain Document class used in the earlier pipeline sessions
from langchain_core.documents import Document

class ChunkQualityAssessor:
"""Comprehensive chunk quality assessment framework."""
def __init__(self, config: Dict[str, Any] = None):
self.config = config or self._get_default_config()
self.quality_weights = {
"coherence": 0.25,
"density": 0.25,
"consistency": 0.20,
"metadata_richness": 0.15,
"completeness": 0.15
}
def assess_chunk_quality(self, chunks: List[Document]) -> Dict[str, float]:
"""Multi-dimensional quality assessment."""
        if not chunks:
            # Return zeroed metrics using the same keys as the populated result below
            empty_metrics = {metric: 0.0 for metric in [
                "coherence_score", "information_density", "size_consistency",
                "metadata_richness", "content_completeness", "overall_quality"
            ]}
            empty_metrics["quality_grade"] = "F"
            return empty_metrics
# Calculate individual quality dimensions
coherence = self._calculate_coherence_score(chunks)
density = self._calculate_information_density(chunks)
consistency = self._calculate_size_consistency(chunks)
metadata_richness = self._calculate_metadata_richness(chunks)
completeness = self._calculate_content_completeness(chunks)
# Calculate weighted overall quality
overall_quality = (
coherence * self.quality_weights["coherence"] +
density * self.quality_weights["density"] +
consistency * self.quality_weights["consistency"] +
metadata_richness * self.quality_weights["metadata_richness"] +
completeness * self.quality_weights["completeness"]
)
return {
"coherence_score": coherence,
"information_density": density,
"size_consistency": consistency,
"metadata_richness": metadata_richness,
"content_completeness": completeness,
"overall_quality": overall_quality,
"quality_grade": self._assign_quality_grade(overall_quality)
}
The main assessment method coordinates five quality dimensions with configurable weights, providing both individual metrics and an overall quality score.
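The constructor also references a `_get_default_config()` helper that is not shown in this session. A minimal sketch might look like the following; the specific keys and values are assumptions for illustration only:

def _get_default_config(self) -> Dict[str, Any]:
    """Return default assessment settings (illustrative values, not prescribed by the course)."""
    return {
        "min_chunk_length": 50,      # chunks shorter than this are flagged as lacking context
        "max_chunk_length": 2000,    # chunks longer than this are flagged as hard to process
        "target_quality": 0.8,       # default quality target used by the optimization engine
    }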
Quality Dimension Implementation¶
Topic Coherence Analysis¶
Coherence measures how well adjacent chunks relate to each other thematically:
def _calculate_coherence_score(self, chunks: List[Document]) -> float:
"""Calculate topic coherence between adjacent chunks."""
if len(chunks) < 2:
return 1.0
coherence_scores = []
for i in range(len(chunks) - 1):
current_topics = set(chunks[i].metadata.get("topics", []))
next_topics = set(chunks[i + 1].metadata.get("topics", []))
if current_topics and next_topics:
# Jaccard similarity for topic overlap
overlap = len(current_topics & next_topics)
union = len(current_topics | next_topics)
score = overlap / union if union > 0 else 0
coherence_scores.append(score)
# Also check content similarity
content_similarity = self._calculate_content_similarity(
chunks[i].page_content, chunks[i + 1].page_content
)
coherence_scores.append(content_similarity)
return sum(coherence_scores) / len(coherence_scores) if coherence_scores else 0.0
Coherence assessment uses both topic overlap (Jaccard similarity) and content similarity to identify cases where chunking boundaries break logical topic flow.
Advanced Content Similarity¶
def _calculate_content_similarity(self, content1: str, content2: str) -> float:
"""Calculate semantic similarity between content pieces."""
words1 = set(content1.lower().split())
words2 = set(content2.lower().split())
if not words1 or not words2:
return 0.0
# Remove stop words for better similarity calculation
stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
words1 = words1 - stop_words
words2 = words2 - stop_words
if not words1 or not words2:
return 0.0
# Calculate Jaccard similarity
intersection = len(words1 & words2)
union = len(words1 | words2)
return intersection / union if union > 0 else 0.0
Content similarity calculation focuses on meaningful words while filtering out common stop words that don't contribute to semantic meaning.
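For intuition, here is a quick check of the helper on two related sentences, assuming the assessor has been instantiated; the expected value follows directly from the Jaccard formula above:

assessor = ChunkQualityAssessor()
similarity = assessor._calculate_content_similarity(
    "Vector databases store embeddings for retrieval",
    "Embeddings are stored in vector databases"
)
# 3 shared non-stop words (vector, databases, embeddings) out of 7 unique ones ≈ 0.43
print(f"Similarity: {similarity:.2f}")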
Information Density Measurement¶
Information density measures vocabulary richness and content diversity:
def _calculate_information_density(self, chunks: List[Document]) -> float:
"""Calculate average information density across chunks."""
densities = []
for chunk in chunks:
words = chunk.page_content.split()
unique_words = set(word.lower() for word in words if len(word) > 2)
if words:
# Basic lexical diversity
lexical_diversity = len(unique_words) / len(words)
# Sentence complexity factor
sentences = [s.strip() for s in chunk.page_content.split('.') if s.strip()]
avg_sentence_length = len(words) / max(len(sentences), 1)
complexity_factor = min(avg_sentence_length / 15, 1.5) # Normalize around 15 words/sentence
# Technical content factor
technical_terms = len(chunk.metadata.get('technical_terms', []))
tech_factor = min(technical_terms / max(len(words), 1) * 100, 1.0)
density = lexical_diversity * complexity_factor * (1 + tech_factor)
densities.append(min(density, 1.0)) # Cap at 1.0
return sum(densities) / len(densities) if densities else 0.0
Information density combines lexical diversity, sentence complexity, and technical content to provide a comprehensive measure of content richness.
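As a quick illustration, reusing the assessor instance from the similarity example above (the sample text is made up, but the score follows from the formula):

sample = Document(
    page_content=("Gradient descent iteratively updates model parameters. "
                  "Each update follows the negative gradient of the loss."),
    metadata={"technical_terms": ["gradient descent", "loss"]}
)
density = assessor._calculate_information_density([sample])
# ≈ 0.8: lexical diversity 12/15, complexity factor 7.5/15 = 0.5,
# and a technical-term factor of 1.0 that doubles the base score (capped at 1.0)
print(f"Information density: {density:.2f}")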
Size Consistency Analysis¶
Size consistency evaluates how uniformly chunks are sized, which affects processing efficiency:
def _calculate_size_consistency(self, chunks: List[Document]) -> float:
"""Calculate consistency of chunk sizes."""
if len(chunks) < 2:
return 1.0
chunk_sizes = [len(chunk.page_content) for chunk in chunks]
mean_size = sum(chunk_sizes) / len(chunk_sizes)
if mean_size == 0:
return 0.0
# Calculate coefficient of variation (normalized standard deviation)
variance = sum((size - mean_size) ** 2 for size in chunk_sizes) / len(chunk_sizes)
std_deviation = variance ** 0.5
coefficient_of_variation = std_deviation / mean_size
# Convert to consistency score (lower variation = higher consistency)
consistency_score = max(0, 1 - (coefficient_of_variation / 2)) # Normalize
return consistency_score
Size consistency uses coefficient of variation to measure how uniformly chunks are sized, with lower variation indicating better consistency.
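For example, chunk sizes of 800, 1000, and 1200 characters have a mean of 1000 and a population standard deviation of about 163, giving a coefficient of variation near 0.16 and a consistency score of roughly 1 - 0.16/2 ≈ 0.92. By contrast, sizes of 200, 1000, and 1800 yield a coefficient of variation around 0.65 and a score near 0.67.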
Metadata Completeness Assessment¶
Metadata richness evaluates how well the extraction pipeline populated chunk metadata:
def _calculate_metadata_richness(self, chunks: List[Document]) -> float:
"""Assess metadata completeness across chunks."""
expected_fields = [
"topics", "entities", "keywords", "difficulty_level",
"technical_terms", "content_types", "section_title"
]
richness_scores = []
for chunk in chunks:
present_fields = 0
total_content_quality = 0
for field in expected_fields:
if field in chunk.metadata:
field_value = chunk.metadata[field]
if field_value: # Not empty
present_fields += 1
# Assess quality of field content
if isinstance(field_value, list):
content_quality = min(len(field_value) / 3, 1.0) # Normalize list length
else:
content_quality = min(len(str(field_value)) / 20, 1.0) # Normalize string length
total_content_quality += content_quality
# Combine presence and quality scores
presence_score = present_fields / len(expected_fields)
quality_score = total_content_quality / len(expected_fields) if expected_fields else 0
combined_score = (presence_score + quality_score) / 2
richness_scores.append(combined_score)
return sum(richness_scores) / len(richness_scores) if richness_scores else 0.0
Metadata richness considers both the presence of expected fields and the quality/completeness of their content.
Content Completeness Evaluation¶
def _calculate_content_completeness(self, chunks: List[Document]) -> float:
"""Evaluate content completeness and integrity."""
completeness_scores = []
for chunk in chunks:
content = chunk.page_content
# Check for complete sentences
sentences = [s.strip() for s in content.split('.') if s.strip()]
if sentences:
# Last sentence should end with proper punctuation
ends_properly = content.strip().endswith(('.', '!', '?', ':', ';'))
sentence_completeness = 1.0 if ends_properly else 0.7
else:
sentence_completeness = 0.5
# Check for balanced structure elements
has_headers = bool(re.search(r'^#+ ', content, re.MULTILINE))
has_content = len(content.split()) > 10
has_structure = chunk.metadata.get('content_types', [])
structure_completeness = 0
if has_content: structure_completeness += 0.4
if has_structure: structure_completeness += 0.3
if has_headers: structure_completeness += 0.3
# Check for truncation indicators
truncation_penalties = 0
truncation_indicators = ['...', '[truncated]', '[continued]', '##']
for indicator in truncation_indicators:
if indicator in content:
truncation_penalties += 0.2
completeness = max(0, (sentence_completeness + structure_completeness) / 2 - truncation_penalties)
completeness_scores.append(completeness)
return sum(completeness_scores) / len(completeness_scores) if completeness_scores else 0.0
Content completeness checks for sentence integrity, structural balance, and absence of truncation indicators.
Quality Grading and Feedback¶
Quality Grade Assignment¶
def _assign_quality_grade(self, overall_quality: float) -> str:
"""Assign letter grade based on overall quality score."""
if overall_quality >= 0.9:
return "A" # Excellent
elif overall_quality >= 0.8:
return "B" # Good
elif overall_quality >= 0.7:
return "C" # Acceptable
elif overall_quality >= 0.6:
return "D" # Below Average
else:
return "F" # Poor
def generate_quality_report(self, chunks: List[Document]) -> Dict[str, Any]:
"""Generate comprehensive quality assessment report."""
quality_metrics = self.assess_chunk_quality(chunks)
# Identify problematic chunks
problematic_chunks = []
for i, chunk in enumerate(chunks):
chunk_issues = self._identify_chunk_issues(chunk)
if chunk_issues:
problematic_chunks.append({
"chunk_index": i,
"issues": chunk_issues,
"severity": self._calculate_issue_severity(chunk_issues)
})
# Generate recommendations
recommendations = self._generate_quality_recommendations(quality_metrics, problematic_chunks)
return {
"quality_metrics": quality_metrics,
"chunk_count": len(chunks),
"problematic_chunks": problematic_chunks,
"recommendations": recommendations,
"quality_summary": {
"grade": quality_metrics["quality_grade"],
"overall_score": quality_metrics["overall_quality"],
"improvement_needed": quality_metrics["overall_quality"] < 0.7,
"critical_issues": len([p for p in problematic_chunks if p["severity"] == "high"])
}
}
Quality reporting provides actionable insights for improving chunk quality.
Issue Detection and Diagnosis¶
def _identify_chunk_issues(self, chunk: Document) -> List[Dict[str, str]]:
"""Identify specific quality issues in a chunk."""
issues = []
content = chunk.page_content
metadata = chunk.metadata
# Content length issues
if len(content) < 50:
issues.append({"type": "content_length", "severity": "medium",
"description": "Chunk is too short, may lack context"})
elif len(content) > 2000:
issues.append({"type": "content_length", "severity": "low",
"description": "Chunk is very long, may impact processing"})
# Sentence completeness
if not content.strip().endswith(('.', '!', '?', ':')):
issues.append({"type": "incomplete_sentence", "severity": "medium",
"description": "Chunk ends with incomplete sentence"})
# Metadata completeness
expected_metadata = ["topics", "entities", "keywords"]
missing_metadata = [field for field in expected_metadata
if field not in metadata or not metadata[field]]
if missing_metadata:
issues.append({"type": "metadata_incomplete", "severity": "low",
"description": f"Missing metadata: {', '.join(missing_metadata)}"})
# Content quality
words = content.split()
if len(words) > 0:
unique_words = set(words)
if len(unique_words) / len(words) < 0.3: # Low lexical diversity
issues.append({"type": "low_diversity", "severity": "medium",
"description": "Content has low lexical diversity"})
return issues
def _generate_quality_recommendations(self, quality_metrics: Dict[str, float],
problematic_chunks: List[Dict]) -> List[str]:
"""Generate actionable quality improvement recommendations."""
recommendations = []
# Overall quality recommendations
if quality_metrics["overall_quality"] < 0.6:
recommendations.append("Consider re-chunking with different parameters")
if quality_metrics["coherence_score"] < 0.5:
recommendations.append("Improve topic coherence by adjusting chunk boundaries")
recommendations.append("Consider increasing chunk overlap ratio")
if quality_metrics["information_density"] < 0.4:
recommendations.append("Chunks may be too large; consider reducing max chunk size")
recommendations.append("Review content for repetitive or low-value text")
if quality_metrics["size_consistency"] < 0.6:
recommendations.append("Improve size consistency with better boundary detection")
if quality_metrics["metadata_richness"] < 0.5:
recommendations.append("Enhance metadata extraction patterns")
recommendations.append("Review domain-specific terminology extraction")
# Chunk-specific recommendations
high_severity_issues = len([p for p in problematic_chunks if p["severity"] == "high"])
if high_severity_issues > 0:
recommendations.append(f"Address {high_severity_issues} high-severity chunk issues")
return recommendations
Issue detection provides specific, actionable feedback for improving individual chunks and overall processing quality.
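`generate_quality_report` also calls a `_calculate_issue_severity` helper that is not shown above. A minimal sketch could roll per-issue severities up to a chunk-level rating; the aggregation rule here is an assumption:

def _calculate_issue_severity(self, chunk_issues: List[Dict[str, str]]) -> str:
    """Roll individual issue severities up to a single chunk-level rating (illustrative rule)."""
    severities = {issue["severity"] for issue in chunk_issues}
    if "high" in severities:
        return "high"
    # Treat multiple medium issues as a high-severity chunk (assumed threshold)
    medium_count = sum(1 for issue in chunk_issues if issue["severity"] == "medium")
    if medium_count >= 2:
        return "high"
    if "medium" in severities:
        return "medium"
    return "low"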
Automated Quality Optimization¶
Quality-Based Optimization Engine¶
class QualityOptimizationEngine:
"""Automated system for improving chunk quality."""
def __init__(self, quality_assessor: ChunkQualityAssessor):
self.quality_assessor = quality_assessor
self.optimization_strategies = {
"merge_small_chunks": self._merge_small_chunks,
"split_large_chunks": self._split_large_chunks,
"enhance_metadata": self._enhance_metadata,
"fix_boundaries": self._fix_chunk_boundaries,
"improve_coherence": self._improve_coherence
}
def optimize_chunks(self, chunks: List[Document],
target_quality: float = 0.8) -> List[Document]:
"""Automatically optimize chunks to meet quality targets."""
current_quality = self.quality_assessor.assess_chunk_quality(chunks)
if current_quality["overall_quality"] >= target_quality:
return chunks # Already meets quality target
optimized_chunks = chunks.copy()
optimization_attempts = 0
max_attempts = 3
while (current_quality["overall_quality"] < target_quality and
optimization_attempts < max_attempts):
# Identify primary quality issue
primary_issue = self._identify_primary_issue(current_quality)
# Apply appropriate optimization strategy
if primary_issue in self.optimization_strategies:
optimized_chunks = self.optimization_strategies[primary_issue](optimized_chunks)
current_quality = self.quality_assessor.assess_chunk_quality(optimized_chunks)
optimization_attempts += 1
return optimized_chunks
The optimization engine automatically applies improvement strategies based on identified quality issues.
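`optimize_chunks` relies on an `_identify_primary_issue` helper that maps the weakest quality dimension to a registered strategy name. The exact mapping below is an assumption for illustration:

def _identify_primary_issue(self, quality_metrics: Dict[str, float]) -> str:
    """Map the lowest-scoring quality dimension to an optimization strategy (illustrative mapping)."""
    metric_to_strategy = {
        "information_density": "split_large_chunks",
        "size_consistency": "merge_small_chunks",
        "coherence_score": "improve_coherence",
        "metadata_richness": "enhance_metadata",
        "content_completeness": "fix_boundaries",
    }
    candidates = {m: quality_metrics[m] for m in metric_to_strategy if m in quality_metrics}
    weakest_metric = min(candidates, key=candidates.get)
    return metric_to_strategy[weakest_metric]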
Specific Optimization Strategies¶
def _merge_small_chunks(self, chunks: List[Document]) -> List[Document]:
"""Merge chunks that are too small to be meaningful."""
optimized_chunks = []
i = 0
while i < len(chunks):
current_chunk = chunks[i]
if len(current_chunk.page_content) < 100 and i < len(chunks) - 1:
# Merge with next chunk
next_chunk = chunks[i + 1]
merged_content = f"{current_chunk.page_content}\n\n{next_chunk.page_content}"
# Combine metadata
merged_metadata = {**current_chunk.metadata}
for key, value in next_chunk.metadata.items():
if key in merged_metadata:
if isinstance(value, list):
merged_metadata[key] = list(set(merged_metadata[key] + value))
elif isinstance(value, (int, float)):
merged_metadata[key] = (merged_metadata[key] + value) / 2
else:
merged_metadata[key] = value
merged_chunk = Document(page_content=merged_content, metadata=merged_metadata)
optimized_chunks.append(merged_chunk)
i += 2 # Skip next chunk as it's been merged
else:
optimized_chunks.append(current_chunk)
i += 1
return optimized_chunks
def _split_large_chunks(self, chunks: List[Document]) -> List[Document]:
"""Split chunks that are too large and lose coherence."""
optimized_chunks = []
for chunk in chunks:
if len(chunk.page_content) > 1500: # Large chunk threshold
# Split at paragraph boundaries
paragraphs = chunk.page_content.split('\n\n')
if len(paragraphs) > 1:
# Create multiple chunks from paragraphs
current_content = []
current_size = 0
for paragraph in paragraphs:
para_size = len(paragraph)
if current_size + para_size > 800 and current_content:
# Create chunk from current content
chunk_content = '\n\n'.join(current_content)
new_chunk = Document(
page_content=chunk_content,
metadata={**chunk.metadata, "split_from_large": True}
)
optimized_chunks.append(new_chunk)
current_content = [paragraph]
current_size = para_size
else:
current_content.append(paragraph)
current_size += para_size + 2 # +2 for \n\n
# Add final chunk
if current_content:
chunk_content = '\n\n'.join(current_content)
new_chunk = Document(
page_content=chunk_content,
metadata={**chunk.metadata, "split_from_large": True}
)
optimized_chunks.append(new_chunk)
else:
optimized_chunks.append(chunk) # Can't split single paragraph
else:
optimized_chunks.append(chunk)
return optimized_chunks
Optimization strategies address specific quality issues with targeted improvements while preserving content integrity.
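The strategy table also registers `_enhance_metadata`, `_fix_chunk_boundaries`, and `_improve_coherence`, which are not covered in this session. Minimal pass-through stubs, shown here only as assumptions to keep the engine runnable until real implementations are plugged in, could look like:

def _enhance_metadata(self, chunks: List[Document]) -> List[Document]:
    """Placeholder: re-run metadata extraction on sparse chunks (not implemented here)."""
    return chunks

def _fix_chunk_boundaries(self, chunks: List[Document]) -> List[Document]:
    """Placeholder: realign boundaries to sentence or paragraph breaks (not implemented here)."""
    return chunks

def _improve_coherence(self, chunks: List[Document]) -> List[Document]:
    """Placeholder: re-merge or reorder chunks to restore topic flow (not implemented here)."""
    return chunks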
Quality Monitoring and Alerting¶
Continuous Quality Monitoring¶
class QualityMonitoringSystem:
"""Continuous monitoring system for chunk quality."""
def __init__(self, alert_thresholds: Dict[str, float] = None):
self.alert_thresholds = alert_thresholds or {
"overall_quality": 0.7,
"coherence_score": 0.6,
"information_density": 0.5,
"metadata_richness": 0.6
}
self.quality_history = []
self.alerts = []
def monitor_quality(self, chunks: List[Document],
processing_metadata: Dict = None) -> Dict[str, Any]:
"""Monitor quality and generate alerts."""
assessor = ChunkQualityAssessor()
quality_metrics = assessor.assess_chunk_quality(chunks)
# Record quality metrics
quality_record = {
"timestamp": datetime.now(),
"metrics": quality_metrics,
"chunk_count": len(chunks),
"processing_metadata": processing_metadata or {}
}
self.quality_history.append(quality_record)
# Check for quality issues and generate alerts
alerts = self._check_quality_alerts(quality_metrics)
self.alerts.extend(alerts)
# Analyze trends
trend_analysis = self._analyze_quality_trends()
return {
"current_quality": quality_metrics,
"alerts": alerts,
"trend_analysis": trend_analysis,
"monitoring_summary": self._generate_monitoring_summary()
}
def _check_quality_alerts(self, quality_metrics: Dict[str, float]) -> List[Dict]:
"""Check quality metrics against thresholds and generate alerts."""
alerts = []
for metric, threshold in self.alert_thresholds.items():
if metric in quality_metrics:
value = quality_metrics[metric]
if value < threshold:
severity = "high" if value < threshold * 0.8 else "medium"
alerts.append({
"type": "quality_threshold",
"metric": metric,
"value": value,
"threshold": threshold,
"severity": severity,
"message": f"{metric} ({value:.2f}) below threshold ({threshold})",
"timestamp": datetime.now()
})
return alerts
Quality monitoring provides continuous oversight with alerting for quality degradation.
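`monitor_quality` also calls a `_generate_monitoring_summary` helper that is not shown. A minimal sketch might aggregate the recorded history; the summary fields are assumptions:

def _generate_monitoring_summary(self) -> Dict[str, Any]:
    """Summarize recorded quality history and outstanding alerts (illustrative fields)."""
    if not self.quality_history:
        return {"runs_recorded": 0, "open_alerts": len(self.alerts)}
    scores = [record["metrics"]["overall_quality"] for record in self.quality_history]
    return {
        "runs_recorded": len(self.quality_history),
        "average_quality": sum(scores) / len(scores),
        "latest_quality": scores[-1],
        "open_alerts": len(self.alerts),
    }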
Trend Analysis and Reporting¶
def _analyze_quality_trends(self) -> Dict[str, Any]:
"""Analyze quality trends over time."""
if len(self.quality_history) < 10:
return {"status": "insufficient_data", "message": "Need more data for trend analysis"}
recent_records = self.quality_history[-50:] # Last 50 records
trends = {}
for metric in ["overall_quality", "coherence_score", "information_density"]:
values = [record["metrics"][metric] for record in recent_records if metric in record["metrics"]]
if len(values) >= 10:
# Calculate trend direction
first_half = values[:len(values)//2]
second_half = values[len(values)//2:]
first_avg = sum(first_half) / len(first_half)
second_avg = sum(second_half) / len(second_half)
trend_direction = "improving" if second_avg > first_avg else "declining"
trend_magnitude = abs(second_avg - first_avg)
trends[metric] = {
"direction": trend_direction,
"magnitude": trend_magnitude,
"significance": "high" if trend_magnitude > 0.1 else "low"
}
return {
"trends": trends,
"analysis_period": f"Last {len(recent_records)} processing runs",
"overall_trend": self._summarize_trends(trends)
}
Trend analysis identifies quality patterns over time, enabling proactive quality management.
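The trend report references a `_summarize_trends` helper. One simple, assumed implementation reduces the per-metric trends to a single overall label:

def _summarize_trends(self, trends: Dict[str, Dict[str, Any]]) -> str:
    """Collapse per-metric trends into a single overall label (illustrative rule)."""
    if not trends:
        return "stable"
    declining = sum(1 for t in trends.values()
                    if t["direction"] == "declining" and t["significance"] == "high")
    improving = sum(1 for t in trends.values()
                    if t["direction"] == "improving" and t["significance"] == "high")
    if declining > improving:
        return "declining"
    if improving > declining:
        return "improving"
    return "stable"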
📝 Practice Exercises¶
Exercise 1: Comprehensive Quality Assessment¶
Build and test a complete quality assessment system:
# Sample chunks with different quality levels
test_chunks = [
Document(page_content="This is a well-structured chunk with good information density. It contains multiple sentences and covers the topic comprehensively. The content is coherent and provides valuable information.",
metadata={"topics": ["information", "quality"], "entities": ["content"], "difficulty_level": "intermediate"}),
Document(page_content="Short chunk.",
metadata={}), # Poor metadata
Document(page_content="This chunk has very repetitive content. The repetitive content makes it redundant. Redundant content is repetitive. This repetitive redundant content repeats.",
metadata={"topics": ["repetition"], "difficulty_level": "beginner"}),
Document(page_content="High-quality chunk discussing advanced machine learning algorithms including gradient descent, backpropagation, and regularization techniques. The mathematical foundations involve calculus, linear algebra, and probability theory.",
metadata={"topics": ["machine learning", "mathematics"], "entities": ["gradient descent", "backpropagation"], "technical_terms": ["algorithms", "calculus"], "difficulty_level": "advanced"})
]
# Assess quality
assessor = ChunkQualityAssessor()
quality_results = assessor.assess_chunk_quality(test_chunks)
print("Quality Assessment Results:")
for metric, score in quality_results.items():
    # quality_grade is a letter, not a float, so format it separately
    if metric == "quality_grade":
        print(f"{metric}: {score}")
    else:
        print(f"{metric}: {score:.3f}")
# Generate detailed report
quality_report = assessor.generate_quality_report(test_chunks)
print(f"\nQuality Grade: {quality_report['quality_summary']['grade']}")
print(f"Problematic Chunks: {len(quality_report['problematic_chunks'])}")
print("Recommendations:")
for rec in quality_report['recommendations']:
print(f" - {rec}")
Exercise 2: Quality Optimization Testing¶
Test the automated optimization system:
# Create optimizer
optimizer = QualityOptimizationEngine(assessor)
# Test optimization on low-quality chunks
low_quality_chunks = [
Document(page_content="A.", metadata={}), # Too short
Document(page_content="B.", metadata={}), # Too short
Document(page_content="This is a much longer chunk that contains a lot of information but doesn't have proper structure and runs on without appropriate breaks or organization making it hard to process efficiently" * 5, metadata={"topics": ["long content"]}), # Too long
]
print("Before optimization:")
initial_quality = assessor.assess_chunk_quality(low_quality_chunks)
print(f"Overall quality: {initial_quality['overall_quality']:.3f}")
# Apply optimization
optimized_chunks = optimizer.optimize_chunks(low_quality_chunks, target_quality=0.7)
print("\nAfter optimization:")
final_quality = assessor.assess_chunk_quality(optimized_chunks)
print(f"Overall quality: {final_quality['overall_quality']:.3f}")
print(f"Number of chunks: {len(low_quality_chunks)} → {len(optimized_chunks)}")
Exercise 3: Quality Monitoring System¶
Implement and test continuous quality monitoring:
# Set up monitoring
monitor = QualityMonitoringSystem(alert_thresholds={
"overall_quality": 0.8,
"coherence_score": 0.7
})
# Simulate processing multiple document batches
test_batches = [
test_chunks, # Good quality batch
low_quality_chunks, # Poor quality batch
[Document(page_content="Medium quality content with some structure. Contains multiple sentences.",
metadata={"topics": ["medium"]})], # Medium quality
]
for i, batch in enumerate(test_batches):
print(f"\nProcessing batch {i+1}:")
monitoring_result = monitor.monitor_quality(
batch,
processing_metadata={"batch_id": i+1, "source": "test"}
)
print(f"Current quality: {monitoring_result['current_quality']['overall_quality']:.3f}")
print(f"Alerts generated: {len(monitoring_result['alerts'])}")
for alert in monitoring_result['alerts']:
print(f" ALERT: {alert['message']} (severity: {alert['severity']})")
# Generate trend analysis after multiple batches
final_monitoring = monitor.monitor_quality(test_chunks)
trend_analysis = final_monitoring['trend_analysis']
if trend_analysis.get('trends'):
print("\nQuality Trends:")
for metric, trend_info in trend_analysis['trends'].items():
print(f" {metric}: {trend_info['direction']} ({trend_info['significance']} significance)")
Production Integration¶
Integration with Processing Pipeline¶
class QualityAwareProcessingPipeline:
"""Processing pipeline with integrated quality control."""
def __init__(self):
self.chunker = MetadataEnhancedChunker()
self.quality_assessor = ChunkQualityAssessor()
self.optimizer = QualityOptimizationEngine(self.quality_assessor)
self.monitor = QualityMonitoringSystem()
def process_with_quality_control(self, document: Document) -> Dict[str, Any]:
"""Process document with comprehensive quality control."""
# Step 1: Initial chunking
chunks = self.chunker.create_enhanced_chunks(document)
# Step 2: Quality assessment
quality_metrics = self.quality_assessor.assess_chunk_quality(chunks)
        # Step 3: Optimization if needed
        optimization_applied = False
        if quality_metrics["overall_quality"] < 0.7:
            chunks = self.optimizer.optimize_chunks(chunks, target_quality=0.8)
            quality_metrics = self.quality_assessor.assess_chunk_quality(chunks)
            optimization_applied = True
        # Step 4: Monitoring and alerting
        monitoring_result = self.monitor.monitor_quality(chunks, {
            "document_length": len(document.page_content),
            "optimization_applied": optimization_applied
        })
return {
"chunks": chunks,
"quality_metrics": quality_metrics,
"monitoring_result": monitoring_result,
"quality_assured": quality_metrics["overall_quality"] >= 0.7
}
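A minimal usage sketch, assuming `MetadataEnhancedChunker` is available from the earlier Advanced Processing Pipeline session and using a made-up sample document:

pipeline = QualityAwareProcessingPipeline()
document = Document(
    page_content="# RAG Pipelines\n\nRetrieval-augmented generation combines search with LLMs...",
    metadata={"source": "sample_article"}  # hypothetical source identifier
)
result = pipeline.process_with_quality_control(document)
print(f"Chunks produced: {len(result['chunks'])}")
print(f"Overall quality: {result['quality_metrics']['overall_quality']:.2f}")
print(f"Quality assured: {result['quality_assured']}")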
Dashboard and Reporting¶
class QualityDashboard:
"""Quality monitoring dashboard for production systems."""
def __init__(self, monitor: QualityMonitoringSystem):
self.monitor = monitor
def generate_dashboard_data(self) -> Dict[str, Any]:
"""Generate data for quality monitoring dashboard."""
recent_history = self.monitor.quality_history[-100:]
if not recent_history:
return {"error": "No quality data available"}
# Calculate summary statistics
recent_scores = [record["metrics"]["overall_quality"] for record in recent_history]
avg_quality = sum(recent_scores) / len(recent_scores)
min_quality = min(recent_scores)
max_quality = max(recent_scores)
# Alert summary
recent_alerts = [alert for alert in self.monitor.alerts
if (datetime.now() - alert["timestamp"]).days < 7]
alert_summary = {}
for alert in recent_alerts:
severity = alert["severity"]
alert_summary[severity] = alert_summary.get(severity, 0) + 1
# Quality distribution
quality_ranges = {"excellent": 0, "good": 0, "acceptable": 0, "poor": 0}
for score in recent_scores:
if score >= 0.9:
quality_ranges["excellent"] += 1
elif score >= 0.8:
quality_ranges["good"] += 1
elif score >= 0.7:
quality_ranges["acceptable"] += 1
else:
quality_ranges["poor"] += 1
return {
"summary_stats": {
"avg_quality": avg_quality,
"min_quality": min_quality,
"max_quality": max_quality,
"total_processed": len(recent_history)
},
"alert_summary": alert_summary,
"quality_distribution": quality_ranges,
"recent_alerts": recent_alerts[-10:], # Last 10 alerts
"system_health": "healthy" if avg_quality > 0.7 and len(recent_alerts) < 5 else "degraded"
}
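Typical usage, assuming the monitoring system from the pipeline above has already recorded some processing runs:

dashboard = QualityDashboard(pipeline.monitor)
dashboard_data = dashboard.generate_dashboard_data()
if "error" not in dashboard_data:
    print(f"System health: {dashboard_data['system_health']}")
    print(f"Average quality: {dashboard_data['summary_stats']['avg_quality']:.2f}")
    print(f"Alerts by severity: {dashboard_data['alert_summary']}")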
Key Implementation Guidelines¶
Production Best Practices¶
- Comprehensive Metrics: Implement multi-dimensional quality assessment
- Automated Optimization: Use feedback loops for continuous improvement
- Proactive Monitoring: Set up alerting for quality degradation
- Trend Analysis: Monitor quality patterns over time
Performance Considerations¶
- Efficient Assessment: Optimize quality calculations for large-scale processing
- Selective Optimization: Apply costly optimizations only when necessary
- Caching: Cache quality assessments for repeated content (see the sketch after this list)
- Batch Processing: Process quality assessments in batches
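One way to implement the caching idea above is to key assessments on a hash of the chunk contents. This is a sketch under the assumption that identical chunk batches recur often enough to be worth memoizing; it reuses the imports and classes defined earlier in this session:

import hashlib

class CachedQualityAssessor(ChunkQualityAssessor):
    """Quality assessor that memoizes results for identical chunk batches."""

    def __init__(self, config: Dict[str, Any] = None):
        super().__init__(config)
        self._cache: Dict[str, Dict[str, float]] = {}

    def assess_chunk_quality(self, chunks: List[Document]) -> Dict[str, float]:
        # Hash all chunk contents to form a cache key
        digest = hashlib.sha256(
            "\n".join(chunk.page_content for chunk in chunks).encode("utf-8")
        ).hexdigest()
        if digest not in self._cache:
            self._cache[digest] = super().assess_chunk_quality(chunks)
        return self._cache[digest]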
Integration Strategies¶
- Pipeline Integration: Embed quality control in processing workflows
- Feedback Loops: Use quality metrics to improve chunking parameters
- Dashboard Integration: Provide real-time quality monitoring
- Alert Systems: Implement automated alerting for quality issues
🧭 Navigation¶
← Previous: Session 1 - Foundations
Next: Session 3 - Advanced Patterns →