# 📝 Session 6: Building Atomic Components

**📝 PARTICIPANT PATH CONTENT**

- **Prerequisites:** Complete 🎯 Atomic Architecture Essentials
- **Time Investment:** 1.5-2 hours
- **Outcome:** Build reusable atomic components for data processing systems

## Learning Outcomes
After completing this module, you will be able to:
- Create specialized atomic agents for specific data processing tasks
- Design clean interfaces for component integration
- Implement focused data transformation components
- Test atomic components for reliability and performance
## Building Data Processing Components
Let's build reusable atomic components that handle specific data processing operations. Each component will demonstrate the single responsibility principle while providing clean interfaces for integration.
### Component Creation Patterns
The key to building effective atomic components is focusing each one on a specific data processing operation. Here's how to create a specialized data transformation agent:
**File**: `src/session6/data_transform_agent.py`

```python
from atomic_agents.agents import BaseAgent
from atomic_agents.lib.components.chat_memory import ChatMemory


class DataTransformProcessorAgent(BaseAgent):
    """Atomic agent for data transformation processing tasks"""

    def __init__(self):
        super().__init__(
            agent_name="data_transform_processor",
            system_prompt="""You are a data transformation specialist.
            Focus on: schema transformation, data format conversion,
            and field mapping operations.""",
            memory=ChatMemory(max_messages=5),
            tools=[],
            max_tokens=400
        )
```
This initialization establishes the foundation for our data transformation specialist. The system prompt clearly defines the agent's scope, while the small memory window keeps the agent lightweight and efficient.
Next, we add the core transformation methods that provide specific functionality:
```python
    # Methods of DataTransformProcessorAgent (continued from above)

    def transform_schema(self, data_payload: str, target_schema: str) -> str:
        """Specialized schema transformation method"""
        prompt = f"Transform data to match target schema:\nData: {data_payload}\nTarget: {target_schema}"
        return self.run(prompt)

    def convert_format(self, data_payload: str, target_format: str) -> str:
        """Specialized format conversion for data pipelines"""
        prompt = f"Convert this data to {target_format} format:\n\n{data_payload}"
        return self.run(prompt)
```
These methods provide focused functionality for specific data transformation needs, making the component highly reusable across different pipeline contexts.
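To see that reuse in practice, here is a minimal usage sketch, assuming the `BaseAgent.run` interface shown above and a configured LLM backend; the sample payload and schema name are illustrative only:

```python
# Hypothetical usage of DataTransformProcessorAgent; payloads are illustrative.
transformer = DataTransformProcessorAgent()

# Map a raw clickstream record onto a target schema.
raw_event = '{"uid": 123, "evt": "page_view", "ts": "2024-01-01T00:00:00Z"}'
mapped = transformer.transform_schema(raw_event, "standard_event_schema")

# Reuse the same instance for a format conversion in a different pipeline stage.
as_csv = transformer.convert_format(raw_event, "CSV")

print(mapped)
print(as_csv)
```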
### Interface Design for Pipeline Integration
Creating clean, composable interfaces is crucial for seamless integration with distributed data processing systems. Let's design an interface that works well with streaming data:
```python
from abc import ABC, abstractmethod
from typing import Dict, Any, List


class DataPipelineProcessor(ABC):
    """Abstract interface for data processing pipeline components"""

    @abstractmethod
    def process_data_stream(self, data_stream: Dict[str, Any]) -> Dict[str, Any]:
        """Process streaming data and return processed results"""
        pass
```
This abstract interface defines the contract that our atomic agents must fulfill to participate in data processing pipelines.
Now let's implement this interface with our atomic agent:
```python
class AtomicDataPipelineAgent(BaseAgent, DataPipelineProcessor):
    """Atomic agent implementing data processing pipeline interface"""

    def __init__(self):
        super().__init__(
            agent_name="data_pipeline_processor",
            system_prompt="Process and analyze streaming data for distributed systems",
            memory=None,  # Stateless for distributed processing
            tools=[],
            max_tokens=600
        )

    def process_data_stream(self, data_stream: Dict[str, Any]) -> Dict[str, Any]:
        """Implement the data processing pipeline interface"""
        stream_str = str(data_stream)
        analysis = self.run(f"Process this data stream: {stream_str}")

        return {
            "original_data_stream": data_stream,
            "processing_results": analysis,
            "processed_at": "timestamp_here",  # placeholder: substitute a real timestamp
            "metadata": {
                "pipeline_stage": "atomic_processing",
                "data_quality_score": "calculated_score"  # placeholder: compute a real score
            }
        }
```
This implementation provides the standardized interface while maintaining the atomic agent's focused responsibility for data stream processing.
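Because the agent also implements `DataPipelineProcessor`, downstream code can depend on the abstract interface rather than any concrete agent class. A brief sketch of that polymorphic usage, assuming the classes defined above and a working LLM backend; the sample stream is illustrative:

```python
from typing import Any, Dict


def run_pipeline_stage(processor: DataPipelineProcessor,
                       data_stream: Dict[str, Any]) -> Dict[str, Any]:
    """Run any DataPipelineProcessor implementation over one data stream."""
    return processor.process_data_stream(data_stream)


# Any implementation of the interface can be swapped in here.
stage_result = run_pipeline_stage(
    AtomicDataPipelineAgent(),
    {"records": [{"user_id": 1, "event": "login"}], "source": "demo"},
)
print(stage_result["metadata"]["pipeline_stage"])  # "atomic_processing"
```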
## Specialized Component Examples
Let's build several specialized components that demonstrate different aspects of atomic agent design:
### Data Validation Component
```python
class DataValidationAgent(BaseAgent):
    """Specialized agent for data quality validation"""

    def __init__(self):
        super().__init__(
            agent_name="data_quality_validator",
            system_prompt="Ensure data quality and schema compliance",
            memory=ChatMemory(max_messages=3),
            tools=[]
        )

    def validate_schema_compliance(self, data: str, schema: str) -> Dict:
        """Check if data matches required schema"""
        prompt = f"Validate data compliance:\nData: {data}\nSchema: {schema}"
        result = self.run(prompt)

        return {
            "validation_result": result,
            "compliant": "compliant" in result.lower(),  # simple keyword check on the model's reply
            "validation_timestamp": "timestamp_here"  # placeholder: substitute a real timestamp
        }
```
This validation component focuses solely on data quality concerns, making it reusable across different pipeline stages where validation is needed.
### Data Aggregation Component
```python
class DataAggregationAgent(BaseAgent):
    """Specialized agent for data aggregation operations"""

    def __init__(self):
        super().__init__(
            agent_name="data_aggregation_specialist",
            system_prompt="Perform data aggregation and summarization",
            memory=None,  # Stateless for aggregation operations
            tools=[]
        )

    def aggregate_stream_data(self, data_streams: List[Dict]) -> Dict:
        """Aggregate multiple data streams"""
        combined = {
            "stream_items": data_streams,
            "stream_count": len(data_streams),
            "aggregation_type": "stream_combination"
        }
        analysis = self.run(f"Aggregate these data streams: {str(combined)}")

        return {
            "aggregated_data": analysis,
            "source_stream_count": len(data_streams),
            "aggregation_metadata": {"type": "multi_stream", "status": "completed"}
        }
```
This aggregation component provides focused functionality for combining multiple data streams, essential for batch processing and stream windowing operations.
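In a streaming context, the aggregation agent is typically applied per window. A minimal windowing sketch, assuming the `DataAggregationAgent` defined above; the window size and sample records are illustrative:

```python
from typing import Dict, List


def aggregate_in_windows(records: List[Dict], window_size: int = 3) -> List[Dict]:
    """Split a record stream into fixed-size windows and aggregate each one."""
    aggregator = DataAggregationAgent()  # stateless, so one instance can be reused
    results = []
    for start in range(0, len(records), window_size):
        window = records[start:start + window_size]
        results.append(aggregator.aggregate_stream_data(window))
    return results


# Illustrative usage with a small batch of event records.
windowed = aggregate_in_windows(
    [{"event": "click", "n": i} for i in range(7)], window_size=3
)
print(len(windowed))  # 3 windows: sizes 3, 3, and 1
```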
The following diagram outlines how a typical Atomic Agent interacts with tools and LLMs (via user and system prompts).

*Figure: Atomic LLM usage patterns*
## Component Coordination Patterns
Now let's see how these specialized components work together in a coordinated system:
```python
class AtomicDataProcessingCoordinator:
    """Coordinate multiple atomic agents for data processing"""

    def __init__(self):
        # Initialize specialized data processing agents
        self.transform_agent = DataTransformProcessorAgent()
        self.validation_agent = DataValidationAgent()
        self.pipeline_agent = AtomicDataPipelineAgent()
        self.aggregation_agent = DataAggregationAgent()

    def process_data_workflow(self, raw_data: str, schema: str) -> Dict:
        """Execute complete data processing workflow"""
        # Step 1: Validate input data
        validation_result = self.validation_agent.validate_schema_compliance(
            raw_data, schema
        )

        if not validation_result.get("compliant", False):
            return {"error": "Data validation failed", "details": validation_result}

        # Step 2: Transform data format
        transformed_data = self.transform_agent.convert_format(
            raw_data, "standardized_json"
        )

        # Step 3: Process through pipeline
        pipeline_result = self.pipeline_agent.process_data_stream({
            "data": transformed_data,
            "validation": validation_result
        })

        return {
            "workflow_status": "completed",
            "validation": validation_result,
            "transformation": transformed_data,
            "pipeline_processing": pipeline_result
        }
```
This coordinator demonstrates how atomic components work together while maintaining their individual responsibilities and clean interfaces.
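A short end-to-end usage sketch, assuming the agents above are wired to a working LLM backend; the raw record and schema name are illustrative:

```python
coordinator = AtomicDataProcessingCoordinator()

result = coordinator.process_data_workflow(
    raw_data='{"user_id": 123, "event": "purchase", "amount": 42.5}',
    schema="purchase_event_schema",
)

if "error" in result:
    print("Validation failed:", result["details"])
else:
    print("Status:", result["workflow_status"])
    print("Pipeline metadata:", result["pipeline_processing"]["metadata"])
```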
## Testing Atomic Components
Validating atomic components is crucial for reliable data processing systems:
```python
def test_atomic_data_components():
    """Test individual atomic data processing components"""
    # Test data transformation
    transform_agent = DataTransformProcessorAgent()
    test_payload = "{'user_id': 123, 'event': 'page_view'}"

    schema_result = transform_agent.transform_schema(
        test_payload, "standard_event_schema"
    )
    # These assertions run against live model output, so they are intentionally loose.
    assert len(schema_result) > 0
    assert "user_id" in schema_result or "transformed" in schema_result.lower()

    # Test data validation
    validation_agent = DataValidationAgent()
    validation_result = validation_agent.validate_schema_compliance(
        test_payload, "event_schema"
    )
    assert "validation_result" in validation_result

    # Test pipeline processing
    pipeline_agent = AtomicDataPipelineAgent()
    test_stream = {"stream_data": [1, 2, 3], "source": "test"}
    pipeline_result = pipeline_agent.process_data_stream(test_stream)
    assert "processing_results" in pipeline_result
    assert "original_data_stream" in pipeline_result

    print("✅ All atomic component tests passed!")
```
This testing approach validates each component individually, ensuring they meet their specific contracts before being used in larger systems.
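Because these assertions depend on live model output, they can be flaky in CI. One way to make a component test deterministic is to stub the agent's `run` method; a minimal pytest-style sketch, assuming the `run(prompt)` interface shown above:

```python
def test_validation_agent_with_stubbed_llm(monkeypatch):
    """Exercise the component contract without calling a real LLM."""
    agent = DataValidationAgent()

    # Replace the LLM call with a canned response so the test is deterministic.
    monkeypatch.setattr(agent, "run", lambda prompt: "Data is compliant with schema")

    result = agent.validate_schema_compliance('{"user_id": 1}', "event_schema")

    assert result["compliant"] is True
    assert "validation_result" in result
```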
## Component Performance Optimization
For production data processing systems, optimize components for efficiency:
```python
class OptimizedDataAgent(BaseAgent):
    """Performance-optimized atomic agent for high-throughput processing"""

    def __init__(self, operation_type: str):
        super().__init__(
            agent_name=f"optimized_{operation_type}_agent",
            system_prompt=f"Efficient {operation_type} processing specialist",
            memory=None,  # Stateless for maximum throughput
            tools=[],  # Minimal tool overhead
            max_tokens=150  # Constrained for predictable performance
        )
        self.operation_type = operation_type

    def process_efficiently(self, data: str) -> str:
        """Optimized processing method with minimal overhead"""
        # Streamlined prompt for faster processing
        return self.run(f"Process {self.operation_type}: {data[:200]}")
```
Key optimization strategies (a usage sketch follows the list):
- Stateless Design: No memory reduces overhead
- Token Limits: Predictable response sizes
- Focused Prompts: Reduce processing complexity
- Input Truncation: Prevent excessive input processing
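A small sketch of how stateless, operation-specific agents might be reused across a batch for throughput, assuming the `OptimizedDataAgent` class above; the operation names and records are illustrative:

```python
# Build one agent per operation type and reuse it across the whole batch;
# stateless agents carry no per-record memory, so reuse is safe.
agents = {
    op: OptimizedDataAgent(op)
    for op in ("normalization", "deduplication")
}

records = ['{"id": 1, "value": "A "}', '{"id": 1, "value": "a"}']

# Apply the normalization specialist record by record...
normalized = [agents["normalization"].process_efficiently(r) for r in records]

# ...then hand the combined result to the deduplication specialist.
deduplicated = agents["deduplication"].process_efficiently("\n".join(normalized))
print(deduplicated)
```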
## Building Custom Components

Now it's your turn to build custom atomic components. Consider these component types for your data processing needs; a skeleton sketch for one of them follows the lists.
### Data Ingestion Components
- Stream Reader: Read from Kafka, Kinesis, or other streams
- Batch Loader: Handle file-based data ingestion
- API Connector: Connect to REST APIs and databases
### Processing Components
- Data Cleaner: Remove invalid or corrupted data
- Field Mapper: Map fields between different schemas
- Data Enricher: Add additional context or calculated fields
### Output Components
- Data Writer: Write to databases, files, or streams
- Report Generator: Create summary reports and metrics
- Alert Manager: Handle data quality alerts and notifications
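As a starting point, here is a hypothetical skeleton for the Data Cleaner idea above, following the constructor and `run` conventions used throughout this session; the class name, prompt, and method are placeholders for your own design:

```python
class DataCleanerAgent(BaseAgent):
    """Hypothetical skeleton: remove invalid or corrupted records from a batch."""

    def __init__(self):
        super().__init__(
            agent_name="data_cleaner",
            system_prompt="Identify and remove invalid or corrupted records",
            memory=None,  # stateless, like the other processing components
            tools=[],
            max_tokens=300
        )

    def clean_batch(self, records: str) -> str:
        """Ask the agent to drop records that fail basic integrity checks."""
        return self.run(f"Remove invalid or corrupted records from:\n{records}")
```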
## Practical Exercise
Build your own atomic component using this template:
```python
from typing import Any, Dict

from atomic_agents.agents import BaseAgent
from atomic_agents.lib.components.chat_memory import ChatMemory


class MyCustomDataAgent(BaseAgent):
    """Your custom atomic data processing agent"""

    def __init__(self, custom_config: Dict):
        super().__init__(
            agent_name="my_custom_agent",
            system_prompt="Define your agent's specific purpose here",
            memory=None if custom_config.get("stateless", True) else ChatMemory(max_messages=5),
            tools=[],  # Add specific tools as needed
            max_tokens=custom_config.get("max_tokens", 300)
        )
        self.config = custom_config

    def process_data(self, input_data: Any) -> Any:
        """Implement your custom processing logic"""
        # Your implementation here
        pass

    def validate_input(self, input_data: Any) -> bool:
        """Validate input meets your component's requirements"""
        # Your validation logic here
        pass
```
## Next Steps
With atomic components mastered, you're ready to learn system assembly:
- 📝 System Assembly Practice - Putting components together into complete systems
For advanced implementation patterns:
- ⚙️ Advanced Orchestration - Complex pipeline orchestration
- ⚙️ Production Deployment - Enterprise deployment strategies
## 🧭 Navigation

- **Previous:** Session 5 - Type-Safe Development
- **Next:** Session 7 - Agent Systems