Session 6: Atomic Agents Modular Architecture¶
Building Composable, Type-Safe Agent Systems¶
Learning Outcomes¶
By the end of this session, you will be able to:
- Master atomic agent design principles using modular, composable architecture patterns
- Implement type-safe agent components with schema validation and context providers
- Build production-ready atomic systems with monitoring, error handling, and scaling
- Apply LEGO-like composition patterns for building complex agent workflows from simple components
- Deploy enterprise atomic architectures with service discovery and orchestration
Chapter Overview¶
Atomic Agents represent the next evolution in agent architecture, embracing the principle that complex systems should be built from simple, composable, and reusable components. Like LEGO blocks, each atomic agent is designed to be self-contained yet seamlessly interoperable with others.
This session introduces a paradigm shift from monolithic agent frameworks to atomic, modular architectures that scale from simple tasks to enterprise systems. We'll build agents that are predictable, testable, and maintainable.

The atomic approach transforms traditional agent development by emphasizing:
- Single Responsibility: Each agent handles one concern perfectly
- Type Safety: Schemas and validation ensure reliable data flow
- Composition: Complex behaviors emerge from simple component interactions
- Observability: Built-in monitoring and debugging capabilities
Part 1: Atomic Principles Foundation (15 minutes)¶
Understanding Atomic Agent Philosophy¶
Traditional agent frameworks often create monolithic systems where functionality is tightly coupled. Atomic agents embrace a different philosophy: building complex intelligence from simple, interoperable components.
The LEGO Metaphor¶
Think of atomic agents like LEGO blocks:
- Each block has a specific shape and purpose (single responsibility)
- Blocks connect through standardized interfaces (type-safe schemas)
- Complex structures emerge from simple combinations (compositional architecture)
- Blocks are reusable across different projects (modularity)
Step 1.1: Core Atomic Agent Structure¶
Let's start by defining the foundation of an atomic agent:
# From [`src/session6/atomic_foundation.py`](https://github.com/fwornle/agentic-ai-nano/blob/main/docs-content/01_frameworks/src/session6/atomic_foundation.py)
from typing import Any, Dict, List, Optional, TypeVar, Generic
from pydantic import BaseModel, Field, validator
from abc import ABC, abstractmethod
from datetime import datetime
import uuid
class AtomicContext(BaseModel):
"""Context container for atomic agent execution."""
request_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
timestamp: datetime = Field(default_factory=datetime.utcnow)
user_id: Optional[str] = None
session_id: Optional[str] = None
metadata: Dict[str, Any] = Field(default_factory=dict)
class Config:
"""Pydantic configuration for JSON serialization."""
json_encoders = {datetime: lambda v: v.isoformat()}
The AtomicContext provides essential tracking information that flows through the entire agent pipeline. Every atomic operation receives context, enabling debugging, monitoring, and user session management.
Why This Matters: Context threading is crucial for production systems. It enables distributed tracing, user session management, and comprehensive logging across atomic components.
Step 1.2: Input/Output Schema Definition¶
Atomic agents enforce strict input/output contracts using Pydantic schemas:
# Schema foundation for type-safe operations
T_Input = TypeVar('T_Input', bound=BaseModel)
T_Output = TypeVar('T_Output', bound=BaseModel)
class AtomicAgent(Generic[T_Input, T_Output], ABC):
"""Base class for all atomic agents with type-safe I/O."""
def __init__(self, name: str, version: str = "1.0.0"):
self.name = name
self.version = version
self.agent_id = str(uuid.uuid4())
self._execution_count = 0
@abstractmethod
async def execute(self, input_data: T_Input, context: AtomicContext) -> T_Output:
"""Execute the atomic operation with type-safe inputs and outputs."""
pass
@abstractmethod
def get_input_schema(self) -> type[T_Input]:
"""Return the input schema class."""
pass
@abstractmethod
def get_output_schema(self) -> type[T_Output]:
"""Return the output schema class."""
pass
This generic base class ensures every atomic agent has well-defined inputs and outputs. The type system prevents runtime errors by catching schema mismatches at development time.
Step 1.3: Error Handling and Validation¶
Atomic agents must handle errors gracefully and provide detailed debugging information:
class AtomicError(Exception):
"""Base exception for atomic agent errors."""
def __init__(
self,
message: str,
agent_name: str,
context: Optional[AtomicContext] = None,
details: Optional[Dict[str, Any]] = None
):
super().__init__(message)
self.message = message
self.agent_name = agent_name
self.context = context
self.details = details or {}
self.timestamp = datetime.utcnow()
class ValidationError(AtomicError):
"""Schema validation error."""
pass
class ExecutionError(AtomicError):
"""Runtime execution error."""
pass
Key Design Decision: Structured exceptions with context provide detailed error information for debugging and monitoring. This enables proper error propagation in composed agent workflows.
Part 2: Building Your First Atomic Agent (20 minutes)¶
Creating a Text Processing Atomic Agent¶
Let's build our first atomic agent that demonstrates the core principles:
# From [`src/session6/text_processor_agent.py`](https://github.com/fwornle/agentic-ai-nano/blob/main/docs-content/01_frameworks/src/session6/text_processor_agent.py)
from typing import List
import re
from dataclasses import dataclass
class TextInput(BaseModel):
"""Input schema for text processing operations."""
content: str = Field(..., min_length=1, max_length=10000)
operation: str = Field(..., regex=r'^(summarize|extract_keywords|sentiment)$')
options: Dict[str, Any] = Field(default_factory=dict)
@validator('content')
def validate_content(cls, v):
"""Ensure content is not just whitespace."""
if not v.strip():
raise ValueError("Content cannot be empty or just whitespace")
return v.strip()
class TextOutput(BaseModel):
"""Output schema for text processing results."""
result: str
confidence: float = Field(..., ge=0.0, le=1.0)
processing_time_ms: int
word_count: int
metadata: Dict[str, Any] = Field(default_factory=dict)
These schemas define the exact contract for our text processing agent. Notice the comprehensive validation rules and metadata fields for monitoring and debugging.
Step 2.1: Implementing the Atomic Agent¶
class TextProcessorAgent(AtomicAgent[TextInput, TextOutput]):
"""Atomic agent for text processing operations."""
def __init__(self):
super().__init__("TextProcessor", "1.0.0")
async def execute(self, input_data: TextInput, context: AtomicContext) -> TextOutput:
"""Execute text processing with comprehensive error handling."""
start_time = datetime.utcnow()
try:
# Validate input schema
if not isinstance(input_data, TextInput):
raise ValidationError(
f"Invalid input type: expected TextInput, got {type(input_data)}",
self.name,
context
)
# Process based on operation type
result = await self._process_text(input_data.content, input_data.operation, input_data.options)
# Calculate processing metrics
processing_time = (datetime.utcnow() - start_time).total_seconds() * 1000
word_count = len(input_data.content.split())
# Update execution count
self._execution_count += 1
return TextOutput(
result=result["text"],
confidence=result["confidence"],
processing_time_ms=int(processing_time),
word_count=word_count,
metadata={
"agent_id": self.agent_id,
"execution_count": self._execution_count,
"operation": input_data.operation
}
)
except Exception as e:
raise ExecutionError(
f"Text processing failed: {str(e)}",
self.name,
context,
{"input_length": len(input_data.content)}
)
This implementation demonstrates key atomic agent principles:
- Type safety through schema validation
- Comprehensive error handling with structured exceptions
- Performance monitoring with execution metrics
- State tracking with execution counts
Step 2.2: Core Processing Logic¶
async def _process_text(self, content: str, operation: str, options: Dict[str, Any]) -> Dict[str, Any]:
"""Core text processing logic with operation switching."""
if operation == "summarize":
return await self._summarize(content, options)
elif operation == "extract_keywords":
return await self._extract_keywords(content, options)
elif operation == "sentiment":
return await self._analyze_sentiment(content, options)
else:
raise ValueError(f"Unknown operation: {operation}")
async def _summarize(self, content: str, options: Dict[str, Any]) -> Dict[str, Any]:
"""Simple extractive summarization."""
sentences = content.split('. ')
max_sentences = options.get('max_sentences', 3)
# Simple heuristic: take first and last sentences, plus longest middle sentence
if len(sentences) <= max_sentences:
summary = content
else:
summary_sentences = [sentences[0]] # First sentence
# Add longest sentence from middle
if len(sentences) > 2:
middle_sentences = sentences[1:-1]
longest = max(middle_sentences, key=len)
summary_sentences.append(longest)
# Add last sentence
if max_sentences > 2:
summary_sentences.append(sentences[-1])
summary = '. '.join(summary_sentences)
return {
"text": summary,
"confidence": 0.8 # Static confidence for demo
}
This processing logic demonstrates the atomic principle of focused functionality. Each method handles a specific operation with clear inputs and outputs.
Why This Matters: Single-purpose methods are easier to test, debug, and maintain. They can be composed into more complex operations while remaining predictable and reusable.
Part 3: Advanced Atomic Patterns (25 minutes)¶
Context Provider Pattern¶
Context providers supply external dependencies and configuration to atomic agents:
# From [`src/session6/context_providers.py`](https://github.com/fwornle/agentic-ai-nano/blob/main/docs-content/01_frameworks/src/session6/context_providers.py)
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
class ContextProvider(ABC):
"""Abstract base class for context providers."""
@abstractmethod
async def provide(self, context: AtomicContext) -> Dict[str, Any]:
"""Provide context-specific resources."""
pass
@abstractmethod
def get_provider_type(self) -> str:
"""Return the type of context this provider supplies."""
pass
class DatabaseContextProvider(ContextProvider):
"""Provides database connections and queries."""
def __init__(self, connection_string: str):
self.connection_string = connection_string
self._connection_pool = None
async def provide(self, context: AtomicContext) -> Dict[str, Any]:
"""Provide database connection with user context."""
# Initialize connection pool if needed
if self._connection_pool is None:
self._connection_pool = await self._create_pool()
return {
"db_connection": self._connection_pool.get_connection(),
"user_id": context.user_id,
"session_id": context.session_id,
"query_timeout": 30
}
def get_provider_type(self) -> str:
return "database"
async def _create_pool(self):
"""Create database connection pool (simplified implementation)."""
# In production, use proper connection pooling
return MockConnectionPool(self.connection_string)
class MockConnectionPool:
"""Mock connection pool for demonstration."""
def __init__(self, connection_string: str):
self.connection_string = connection_string
def get_connection(self):
return MockDatabaseConnection(self.connection_string)
class MockDatabaseConnection:
"""Mock database connection."""
def __init__(self, connection_string: str):
self.connection_string = connection_string
Context providers follow the dependency injection pattern, allowing atomic agents to receive external resources without tight coupling.
Step 3.1: Agent Composition Patterns¶
The power of atomic agents emerges through composition. Let's build a pipeline that chains multiple atomic agents:
# From [`src/session6/composition_engine.py`](https://github.com/fwornle/agentic-ai-nano/blob/main/docs-content/01_frameworks/src/session6/composition_engine.py)
from typing import List, Type, Any
import asyncio
class AtomicPipeline:
"""Compose multiple atomic agents into a processing pipeline."""
def __init__(self, name: str):
self.name = name
self.pipeline_id = str(uuid.uuid4())
self.agents: List[AtomicAgent] = []
self.context_providers: Dict[str, ContextProvider] = {}
def add_agent(self, agent: AtomicAgent) -> 'AtomicPipeline':
"""Add an agent to the pipeline (builder pattern)."""
self.agents.append(agent)
return self
def add_context_provider(self, provider: ContextProvider) -> 'AtomicPipeline':
"""Add a context provider to the pipeline."""
self.context_providers[provider.get_provider_type()] = provider
return self
async def execute(self, initial_input: Any, context: AtomicContext) -> Any:
"""Execute the entire pipeline with data flowing between agents."""
current_data = initial_input
execution_trace = []
# Enrich context with providers
enriched_context = await self._enrich_context(context)
for i, agent in enumerate(self.agents):
try:
step_start = datetime.utcnow()
# Execute current agent
current_data = await agent.execute(current_data, enriched_context)
# Record execution trace
step_duration = (datetime.utcnow() - step_start).total_seconds() * 1000
execution_trace.append({
"step": i + 1,
"agent": agent.name,
"duration_ms": step_duration,
"success": True
})
except AtomicError as e:
# Record failure and stop pipeline
execution_trace.append({
"step": i + 1,
"agent": agent.name,
"error": str(e),
"success": False
})
raise ExecutionError(
f"Pipeline failed at step {i + 1} ({agent.name}): {str(e)}",
self.name,
enriched_context,
{"execution_trace": execution_trace}
)
# Add execution trace to final result
if hasattr(current_data, 'metadata'):
current_data.metadata["pipeline_trace"] = execution_trace
return current_data
This pipeline pattern demonstrates how atomic agents can be composed into complex workflows while maintaining individual agent simplicity and error isolation.
Step 3.2: Parallel Execution Patterns¶
For independent operations, atomic agents can execute in parallel:
class AtomicParallelExecutor:
"""Execute multiple atomic agents in parallel."""
def __init__(self, max_concurrent: int = 5):
self.max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(max_concurrent)
async def execute_parallel(
self,
agent_tasks: List[tuple[AtomicAgent, Any]],
context: AtomicContext
) -> List[Any]:
"""Execute multiple agents concurrently with concurrency control."""
async def execute_with_semaphore(agent: AtomicAgent, input_data: Any):
async with self.semaphore:
return await agent.execute(input_data, context)
# Create tasks for all agents
tasks = [
execute_with_semaphore(agent, input_data)
for agent, input_data in agent_tasks
]
# Execute all tasks concurrently
try:
results = await asyncio.gather(*tasks, return_exceptions=True)
# Separate successful results from exceptions
successful_results = []
errors = []
for i, result in enumerate(results):
if isinstance(result, Exception):
errors.append({
"agent": agent_tasks[i][0].name,
"error": str(result)
})
else:
successful_results.append(result)
if errors:
raise ExecutionError(
f"Parallel execution had {len(errors)} failures",
"ParallelExecutor",
context,
{"errors": errors, "successful_count": len(successful_results)}
)
return successful_results
except Exception as e:
raise ExecutionError(
f"Parallel execution failed: {str(e)}",
"ParallelExecutor",
context
)
Key Benefits:
- Concurrency control with semaphores prevents resource exhaustion
- Error isolation allows partial success in parallel operations
- Performance optimization through concurrent execution of independent tasks
Step 3.3: CLI Integration Pattern¶
Atomic agents integrate seamlessly with command-line interfaces:
# From [`src/session6/atomic_cli.py`](https://github.com/fwornle/agentic-ai-nano/blob/main/docs-content/01_frameworks/src/session6/atomic_cli.py)
import click
import asyncio
import json
from pathlib import Path
@click.group()
@click.option('--config', default='config.json', help='Configuration file path')
@click.pass_context
def cli(ctx, config):
"""Atomic Agents CLI interface."""
ctx.ensure_object(dict)
# Load configuration
config_path = Path(config)
if config_path.exists():
with open(config_path) as f:
ctx.obj['config'] = json.load(f)
else:
ctx.obj['config'] = {}
@cli.command()
@click.option('--text', required=True, help='Text to process')
@click.option('--operation', type=click.Choice(['summarize', 'extract_keywords', 'sentiment']),
default='summarize', help='Processing operation')
@click.option('--output', help='Output file path')
@click.pass_context
def process_text(ctx, text, operation, output):
"""Process text using atomic text processor agent."""
async def run_processing():
# Create atomic context
context = AtomicContext(
user_id='cli-user',
metadata={'cli_command': 'process_text'}
)
# Create input
text_input = TextInput(
content=text,
operation=operation
)
# Execute agent
agent = TextProcessorAgent()
result = await agent.execute(text_input, context)
# Output results
output_data = {
'result': result.result,
'confidence': result.confidence,
'processing_time_ms': result.processing_time_ms,
'word_count': result.word_count,
'metadata': result.metadata
}
if output:
with open(output, 'w') as f:
json.dump(output_data, f, indent=2)
click.echo(f"Results written to {output}")
else:
click.echo(json.dumps(output_data, indent=2))
# Run async operation
asyncio.run(run_processing())
@cli.command()
@click.option('--config-file', required=True, help='Pipeline configuration file')
@click.option('--input-file', required=True, help='Input data file')
@click.option('--output-file', help='Output file path')
@click.pass_context
def run_pipeline(ctx, config_file, input_file, output_file):
"""Execute an atomic agent pipeline from configuration."""
async def run_pipeline_execution():
# Load pipeline configuration
with open(config_file) as f:
pipeline_config = json.load(f)
# Load input data
with open(input_file) as f:
input_data = json.load(f)
# Build pipeline from configuration (simplified)
pipeline = AtomicPipeline(pipeline_config.get('name', 'CLI Pipeline'))
# Add agents based on configuration
for agent_config in pipeline_config.get('agents', []):
if agent_config['type'] == 'text_processor':
pipeline.add_agent(TextProcessorAgent())
# Create context
context = AtomicContext(
user_id='cli-user',
metadata={
'cli_command': 'run_pipeline',
'config_file': config_file,
'input_file': input_file
}
)
# Execute pipeline
result = await pipeline.execute(input_data, context)
# Output results
if output_file:
with open(output_file, 'w') as f:
json.dump(result.dict(), f, indent=2)
click.echo(f"Pipeline results written to {output_file}")
else:
click.echo(json.dumps(result.dict(), indent=2))
# Run async operation
asyncio.run(run_pipeline_execution())
if __name__ == '__main__':
cli()
This CLI integration shows how atomic agents can be exposed through command-line interfaces, enabling easy automation and integration with existing workflows.
Part 4: Production Atomic Systems (20 minutes)¶
Enterprise Deployment Patterns¶
Production atomic systems require sophisticated orchestration and monitoring:
# From [`src/session6/production_orchestrator.py`](https://github.com/fwornle/agentic-ai-nano/blob/main/docs-content/01_frameworks/src/session6/production_orchestrator.py)
from typing import Dict, List, Any, Optional
import asyncio
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta
@dataclass
class ServiceRegistration:
"""Service registration for atomic agent discovery."""
service_id: str
service_name: str
agent_type: str
endpoint: str
health_check_url: str
capabilities: List[str] = field(default_factory=list)
metadata: Dict[str, Any] = field(default_factory=dict)
registered_at: datetime = field(default_factory=datetime.utcnow)
last_health_check: Optional[datetime] = None
is_healthy: bool = True
class AtomicOrchestrator:
"""Production orchestrator for atomic agent services."""
def __init__(
self,
service_name: str = "atomic-orchestrator",
health_check_interval: int = 30
):
self.service_name = service_name
self.orchestrator_id = str(uuid.uuid4())
self.health_check_interval = health_check_interval
# Service registry
self.services: Dict[str, ServiceRegistration] = {}
self.service_lock = asyncio.Lock()
# Monitoring
self.metrics_collector = MetricsCollector()
self.logger = logging.getLogger(f"orchestrator.{service_name}")
# Background tasks
self._health_check_task: Optional[asyncio.Task] = None
self._shutdown_event = asyncio.Event()
async def start(self):
"""Start the orchestrator with health checking."""
self.logger.info(f"Starting atomic orchestrator {self.orchestrator_id}")
# Start background health checking
self._health_check_task = asyncio.create_task(self._health_check_loop())
self.logger.info("Orchestrator started successfully")
async def stop(self):
"""Gracefully shutdown the orchestrator."""
self.logger.info("Shutting down orchestrator...")
# Signal shutdown
self._shutdown_event.set()
# Wait for health check task to complete
if self._health_check_task:
await self._health_check_task
self.logger.info("Orchestrator shutdown complete")
async def register_service(self, registration: ServiceRegistration):
"""Register an atomic agent service."""
async with self.service_lock:
self.services[registration.service_id] = registration
self.logger.info(f"Registered service: {registration.service_name} ({registration.service_id})")
# Record registration metric
await self.metrics_collector.record_service_registration(registration)
async def unregister_service(self, service_id: str):
"""Unregister a service."""
async with self.service_lock:
if service_id in self.services:
service = self.services.pop(service_id)
self.logger.info(f"Unregistered service: {service.service_name} ({service_id})")
await self.metrics_collector.record_service_unregistration(service)
async def discover_services(
self,
agent_type: Optional[str] = None,
capabilities: Optional[List[str]] = None
) -> List[ServiceRegistration]:
"""Discover available services with optional filtering."""
async with self.service_lock:
services = list(self.services.values())
# Filter by agent type
if agent_type:
services = [s for s in services if s.agent_type == agent_type]
# Filter by capabilities
if capabilities:
services = [
s for s in services
if all(cap in s.capabilities for cap in capabilities)
]
# Only return healthy services
services = [s for s in services if s.is_healthy]
return services
The orchestrator provides service discovery, health checking, and metrics collection for atomic agent services.
Step 4.1: Health Monitoring and Metrics¶
class MetricsCollector:
"""Collect and aggregate metrics from atomic agents."""
def __init__(self):
self.metrics: Dict[str, Any] = {
'service_registrations': 0,
'service_unregistrations': 0,
'health_checks_performed': 0,
'health_checks_failed': 0,
'agent_executions': 0,
'agent_execution_errors': 0,
'average_execution_time_ms': 0.0
}
self.execution_times: List[float] = []
self.metrics_lock = asyncio.Lock()
async def record_service_registration(self, service: ServiceRegistration):
"""Record service registration metrics."""
async with self.metrics_lock:
self.metrics['service_registrations'] += 1
async def record_health_check(self, service_id: str, is_healthy: bool, response_time_ms: float):
"""Record health check results."""
async with self.metrics_lock:
self.metrics['health_checks_performed'] += 1
if not is_healthy:
self.metrics['health_checks_failed'] += 1
async def record_agent_execution(self, agent_name: str, execution_time_ms: float, success: bool):
"""Record agent execution metrics."""
async with self.metrics_lock:
self.metrics['agent_executions'] += 1
if not success:
self.metrics['agent_execution_errors'] += 1
# Update execution time metrics
self.execution_times.append(execution_time_ms)
# Keep only recent execution times (sliding window)
if len(self.execution_times) > 1000:
self.execution_times = self.execution_times[-1000:]
# Calculate average execution time
if self.execution_times:
self.metrics['average_execution_time_ms'] = sum(self.execution_times) / len(self.execution_times)
async def get_metrics_summary(self) -> Dict[str, Any]:
"""Get current metrics summary."""
async with self.metrics_lock:
return {
**self.metrics,
'metrics_collected_at': datetime.utcnow().isoformat(),
'recent_execution_count': len(self.execution_times)
}
Step 4.2: Health Check Implementation¶
async def _health_check_loop(self):
"""Background health checking for all registered services."""
import aiohttp
while not self._shutdown_event.is_set():
try:
# Get current services
async with self.service_lock:
services_to_check = list(self.services.values())
# Check each service
health_check_tasks = []
for service in services_to_check:
task = asyncio.create_task(self._check_service_health(service))
health_check_tasks.append(task)
# Wait for all health checks with timeout
if health_check_tasks:
await asyncio.wait(health_check_tasks, timeout=10.0)
except Exception as e:
self.logger.error(f"Health check loop error: {str(e)}")
# Wait for next check interval or shutdown
try:
await asyncio.wait_for(
self._shutdown_event.wait(),
timeout=self.health_check_interval
)
# If shutdown event is set, exit loop
break
except asyncio.TimeoutError:
# Timeout is expected, continue with next health check
continue
async def _check_service_health(self, service: ServiceRegistration):
"""Check health of a single service."""
import aiohttp
start_time = datetime.utcnow()
is_healthy = False
try:
async with aiohttp.ClientSession() as session:
async with session.get(
service.health_check_url,
timeout=aiohttp.ClientTimeout(total=5.0)
) as response:
is_healthy = response.status == 200
except Exception as e:
self.logger.warning(f"Health check failed for {service.service_name}: {str(e)}")
is_healthy = False
# Calculate response time
response_time = (datetime.utcnow() - start_time).total_seconds() * 1000
# Update service status
async with self.service_lock:
if service.service_id in self.services:
self.services[service.service_id].is_healthy = is_healthy
self.services[service.service_id].last_health_check = datetime.utcnow()
# Record metrics
await self.metrics_collector.record_health_check(
service.service_id,
is_healthy,
response_time
)
if not is_healthy:
self.logger.warning(f"Service {service.service_name} is unhealthy")
Step 4.3: Load Balancing and Failover¶
class AtomicLoadBalancer:
"""Load balancer for atomic agent services."""
def __init__(self, orchestrator: AtomicOrchestrator):
self.orchestrator = orchestrator
self.round_robin_counters: Dict[str, int] = {}
async def route_request(
self,
agent_type: str,
input_data: Any,
context: AtomicContext,
strategy: str = "round_robin"
) -> Any:
"""Route request to available service using specified strategy."""
# Discover available services
services = await self.orchestrator.discover_services(agent_type=agent_type)
if not services:
raise ExecutionError(
f"No healthy services available for agent type: {agent_type}",
"LoadBalancer",
context
)
# Select service using strategy
if strategy == "round_robin":
selected_service = self._round_robin_select(agent_type, services)
elif strategy == "least_connections":
selected_service = self._least_connections_select(services)
else:
# Default to first available
selected_service = services[0]
# Execute request with failover
return await self._execute_with_failover(
selected_service,
services,
input_data,
context
)
def _round_robin_select(self, agent_type: str, services: List[ServiceRegistration]) -> ServiceRegistration:
"""Select service using round-robin strategy."""
if agent_type not in self.round_robin_counters:
self.round_robin_counters[agent_type] = 0
selected_index = self.round_robin_counters[agent_type] % len(services)
self.round_robin_counters[agent_type] += 1
return services[selected_index]
async def _execute_with_failover(
self,
primary_service: ServiceRegistration,
all_services: List[ServiceRegistration],
input_data: Any,
context: AtomicContext,
max_retries: int = 2
) -> Any:
"""Execute request with automatic failover."""
import aiohttp
services_to_try = [primary_service] + [s for s in all_services if s.service_id != primary_service.service_id]
for attempt, service in enumerate(services_to_try[:max_retries + 1]):
try:
# Make HTTP request to service
async with aiohttp.ClientSession() as session:
request_data = {
'input_data': input_data.dict() if hasattr(input_data, 'dict') else input_data,
'context': context.dict()
}
async with session.post(
f"{service.endpoint}/execute",
json=request_data,
timeout=aiohttp.ClientTimeout(total=30.0)
) as response:
if response.status == 200:
result_data = await response.json()
return result_data
else:
error_text = await response.text()
raise aiohttp.ClientError(f"HTTP {response.status}: {error_text}")
except Exception as e:
if attempt < len(services_to_try) - 1:
# Try next service
continue
else:
# Final attempt failed
raise ExecutionError(
f"All service attempts failed. Last error: {str(e)}",
"LoadBalancer",
context,
{"attempts": attempt + 1, "services_tried": [s.service_id for s in services_to_try[:attempt + 1]]}
)
Why This Matters: Production systems require resilience against service failures. Load balancing and failover ensure that atomic agent systems remain available even when individual services experience issues.
Self-Assessment: Atomic Agents Mastery (10 minutes)¶
Test your understanding of atomic agent architecture and implementation patterns:
Question 1: What are the four core principles of atomic agent design?
Show Answer
The four core principles are: 1. **Single Responsibility** - Each agent handles one specific concern perfectly 2. **Type Safety** - Schemas and validation ensure reliable data flow between components 3. **Composition** - Complex behaviors emerge from combining simple, interoperable components 4. **Observability** - Built-in monitoring, logging, and debugging capabilities throughout the system These principles ensure atomic agents are predictable, testable, maintainable, and scalable from simple tasks to enterprise systems.Question 2: How does the AtomicContext class improve system observability and debugging?
Show Answer
AtomicContext provides: - **Request tracking** with unique request IDs for distributed tracing - **User session management** linking operations to specific users and sessions - **Timestamp tracking** for performance monitoring and debugging - **Metadata propagation** for additional context throughout the pipeline - **Error correlation** enabling detailed error analysis across composed agents This context threading enables comprehensive logging, monitoring, and debugging across atomic components in production systems.Question 3: What makes the AtomicAgent generic type system superior to traditional agent frameworks?
Show Answer
The generic type system (`AtomicAgent[T_Input, T_Output]`) provides: - **Compile-time type checking** preventing runtime schema mismatches - **IDE support** with autocomplete and type hints for faster development - **Self-documenting interfaces** through explicit input/output schemas - **Validation guarantees** ensuring data integrity at component boundaries - **Testability** through clear contracts and mocking capabilities This approach catches errors during development rather than production and makes agent composition predictable and reliable.Question 4: How does the AtomicPipeline pattern differ from traditional sequential processing?
Show Answer
AtomicPipeline provides: - **Type-safe data flow** with validation at each step - **Execution tracing** recording performance and success/failure of each step - **Error isolation** preventing cascading failures and providing detailed error context - **Context enrichment** through dependency injection of external resources - **Rollback capabilities** through structured error handling and state management Unlike traditional pipelines, atomic pipelines provide comprehensive observability, error handling, and type safety throughout the execution flow.Question 5: What are the benefits of the context provider pattern in atomic agent systems?
Show Answer
Context providers enable: - **Dependency injection** decoupling agents from external resources - **Environment-specific configuration** (development, staging, production) - **Resource pooling** sharing expensive resources like database connections - **Security isolation** managing credentials and access permissions centrally - **Testing simplification** through mock providers and test fixtures This pattern makes atomic agents environment-agnostic and easier to test, deploy, and maintain across different contexts.Question 6: How does the AtomicOrchestrator support production deployment requirements?
Show Answer
The orchestrator provides: - **Service discovery** enabling dynamic service registration and discovery - **Health monitoring** with automatic health checks and failure detection - **Load balancing** distributing requests across available service instances - **Metrics collection** tracking performance, errors, and system health - **Graceful shutdown** handling service lifecycle management properly This infrastructure supports enterprise-grade deployments with high availability, monitoring, and scalability requirements.Question 7: Why is the parallel execution pattern important for atomic agent performance?
Show Answer
Parallel execution provides: - **Concurrency control** preventing resource exhaustion through semaphores - **Performance optimization** executing independent operations simultaneously - **Error isolation** allowing partial success when some operations fail - **Resource efficiency** maximizing utilization of available compute resources - **Scalability** supporting high-throughput scenarios with multiple concurrent requests For atomic systems processing independent operations, parallel execution can dramatically improve performance while maintaining error isolation and resource management.Question 8: How does the CLI integration pattern enhance atomic agent usability?
Show Answer
CLI integration provides: - **Developer productivity** through command-line automation and testing - **CI/CD integration** enabling automated pipelines and deployments - **Configuration management** through file-based pipeline definitions - **Debugging capabilities** with detailed output and error reporting - **Scripting support** for complex workflows and batch operations This makes atomic agents accessible to developers, DevOps teams, and automated systems without requiring custom integration code.Question 9: What production concerns does the atomic agent architecture address?
Show Answer
The architecture addresses: - **Reliability** through structured error handling and failover mechanisms - **Scalability** via service discovery, load balancing, and horizontal scaling - **Observability** with comprehensive metrics, logging, and tracing - **Maintainability** through clear interfaces, single responsibility, and composability - **Security** via context providers and centralized credential management - **Performance** through parallel execution, connection pooling, and monitoring These patterns ensure atomic agent systems meet enterprise production requirements for reliability, scale, and maintainability.Question 10: How do atomic agents compare to previous frameworks covered in this module?
Show Answer
Compared to previous frameworks: **vs. Bare Metal (Session 1):** - Atomic agents provide structured composition vs. custom implementations - Built-in type safety and validation vs. manual error handling - Production-ready patterns vs. educational foundations **vs. LangChain (Session 2):** - Explicit schemas vs. flexible but less predictable interfaces - Atomic composition vs. chain-based workflows - Type safety vs. runtime validation **vs. PydanticAI (Session 5):** - Microservice architecture vs. monolithic agents - Service discovery and orchestration vs. single-agent deployments - Horizontal scaling vs. vertical optimization Atomic agents excel at building distributed, scalable systems from simple, composable components with enterprise-grade reliability and observability.Summary: Mastering Atomic Agent Architecture¶
This session introduced atomic agent architecture as the next evolution in agent system design. You learned to build composable, type-safe systems that scale from simple tasks to enterprise deployments.
Key Achievements:
- Atomic Design Principles: Single responsibility, type safety, composition, and observability
- Type-Safe Composition: Building complex workflows from simple, validated components
- Production Patterns: Orchestration, service discovery, health monitoring, and load balancing
- Enterprise Features: Context providers, parallel execution, CLI integration, and metrics collection
Production Benefits:
- Reliability: Structured error handling and failover mechanisms
- Scalability: Horizontal scaling through service orchestration
- Maintainability: Clear interfaces and atomic component isolation
- Observability: Comprehensive monitoring and debugging capabilities
Next Steps:
- Practice building atomic agent pipelines for specific use cases
- Implement custom context providers for your infrastructure
- Deploy atomic systems with monitoring and alerting
- Explore advanced composition patterns for complex workflows
Atomic agents represent a paradigm shift toward modular, composable agent architecture that maintains simplicity while enabling enterprise-scale complexity. The patterns learned here form the foundation for building robust, maintainable agent systems that grow with your requirements.
🧭 Navigation¶
Previous: Session 5 - Type-Safe Development →
Next: Session 7 - Agent Systems →