Session 6 - Module A: Advanced Composition Patterns (35 minutes)¶
Prerequisites: Session 6 Core Section Complete
Target Audience: Implementers building sophisticated atomic systems
Cognitive Load: 4 advanced concepts
Module Overview¶
This module explores sophisticated atomic agent composition patterns including pipeline orchestration, parallel processing, dynamic agent assembly, and CLI integration for DevOps workflows. You'll learn to build complex, maintainable systems from simple atomic components using proven architectural patterns.
Learning Objectives¶
By the end of this module, you will:
- Design sequential and parallel agent pipelines with robust error handling
- Implement dynamic agent composition systems that adapt to runtime requirements
- Create CLI tools for atomic agent management and DevOps integration
- Build sophisticated context-aware agent orchestration systems
Part 1: Agent Pipeline Orchestration (20 minutes)¶
Sequential Processing Pipelines¶
🗂️ File: src/session6/advanced_composition.py
- Sophisticated pipeline patterns
Complex atomic systems emerge from combining simple agents through well-designed composition patterns. Let's build this step by step, starting with the foundational imports and types.
Step 1: Foundation - Imports and Type System¶
First, we establish the type system and imports that will power our sophisticated pipeline:
from typing import List, Dict, Any, TypeVar, Generic, Optional, Callable, Protocol
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
import asyncio
import logging
from datetime import datetime
from enum import Enum
from atomic_schemas import TaskInput, TaskOutput, AnalysisInput, AnalysisOutput
T_Input = TypeVar('T_Input')
T_Output = TypeVar('T_Output')
T_Intermediate = TypeVar('T_Intermediate')
These type variables enable us to build type-safe pipelines that can handle any data transformation while still benefiting from static type checks (for example with mypy or pyright).
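For instance, a generic stage wrapper (a hypothetical sketch, not part of the module's pipeline code) could be parameterized over its input and output types:
class TypedStage(Generic[T_Input, T_Output]):
    """Hypothetical illustration: a processing step from T_Input to T_Output."""

    def __init__(self, transform: Callable[[T_Input], T_Output]):
        self._transform = transform

    def run(self, data: T_Input) -> T_Output:
        # A static type checker can verify that chained stages line up.
        return self._transform(data)

# A stage mapping str -> int, e.g. counting words in a document
word_count_stage = TypedStage[str, int](lambda text: len(text.split()))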
Step 2: Pipeline Behavior Enums¶
Next, we define the core enums that control pipeline behavior:
class ErrorPolicy(Enum):
"""Pipeline error handling policies"""
STOP = "stop" # Stop pipeline on first error
SKIP = "skip" # Skip failed stage and continue
RETRY = "retry" # Retry failed stage with backoff
class PipelineStatus(Enum):
"""Pipeline execution status"""
PENDING = "pending" # Pipeline waiting to start
RUNNING = "running" # Pipeline currently executing
SUCCESS = "success" # Pipeline completed successfully
FAILED = "failed" # Pipeline failed
CANCELLED = "cancelled" # Pipeline was cancelled
These enums provide clear, type-safe ways to handle different execution scenarios and track pipeline state throughout its lifecycle.
Step 3: Stage Configuration System¶
Now we build the configuration system that makes each pipeline stage configurable and resilient:
@dataclass
class StageConfiguration:
"""Configuration for pipeline stage"""
retry_count: int = 3
timeout_seconds: int = 60
error_policy: ErrorPolicy = ErrorPolicy.RETRY
def __post_init__(self):
if self.retry_count < 0:
raise ValueError("retry_count must be non-negative")
if self.timeout_seconds <= 0:
raise ValueError("timeout_seconds must be positive")
This configuration class encapsulates stage-level resilience settings. The validation in __post_init__ ensures we never have invalid configurations that could break pipeline execution.
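A brief usage sketch of this class (values chosen purely for illustration):
# Valid: 2 retries, 30-second timeout, skip the stage on failure
config = StageConfiguration(retry_count=2, timeout_seconds=30,
                            error_policy=ErrorPolicy.SKIP)

# Invalid: a negative retry_count raises ValueError in __post_init__
try:
    StageConfiguration(retry_count=-1)
except ValueError as exc:
    print(f"Rejected configuration: {exc}")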
Step 4: Pipeline Stage Definition¶
Next, we define what constitutes a single stage in our pipeline:
@dataclass
class PipelineStage:
"""Definition of a pipeline processing stage"""
stage_id: str
agent: Any # AtomicAgent instance
stage_name: str
description: str
config: StageConfiguration = field(default_factory=StageConfiguration)
def __post_init__(self):
if not self.stage_id:
raise ValueError("stage_id cannot be empty")
if not self.stage_name:
raise ValueError("stage_name cannot be empty")
Each stage wraps an atomic agent with metadata and configuration. This design allows us to treat any atomic agent as a pipeline component with consistent error handling and monitoring.
Step 5: Pipeline Context and Error Tracking¶
The pipeline context acts as the "memory" of our pipeline, tracking everything that happens during execution:
@dataclass
class PipelineContext:
"""Context passed through pipeline execution"""
pipeline_id: str
execution_start: datetime
stage_history: List[Dict[str, Any]] = field(default_factory=list)
metadata: Dict[str, Any] = field(default_factory=dict)
error_log: List[Dict[str, Any]] = field(default_factory=list)
status: PipelineStatus = PipelineStatus.PENDING
def add_error(self, error: str, stage_id: Optional[str] = None):
"""Add error to error log with timestamp"""
self.error_log.append({
"error": error,
"stage_id": stage_id,
"timestamp": datetime.now().isoformat()
})
This context object travels with the pipeline execution, collecting a complete audit trail. The add_error method ensures every error is timestamped and traceable to a specific stage.
Step 6: Exception Handling and Protocols¶
We need proper exception handling and extensibility protocols:
class PipelineExecutionException(Exception):
"""Exception raised during pipeline execution"""
def __init__(self, message: str, stage_id: Optional[str] = None,
original_error: Optional[Exception] = None):
super().__init__(message)
self.stage_id = stage_id
self.original_error = original_error
This custom exception preserves context about where the failure occurred and what caused it, making debugging much easier.
Step 7: Middleware and Metrics Protocols¶
Next, we define the protocols that make our pipeline extensible:
class MiddlewareProtocol(Protocol):
"""Protocol for pipeline middleware"""
async def __call__(self, data: Any, stage: PipelineStage,
context: PipelineContext) -> Any:
"""Process data through middleware"""
...
class MetricsCollector(ABC):
"""Abstract base class for metrics collection"""
@abstractmethod
async def record_stage_execution(self, stage_id: str, duration: float, status: str):
"""Record stage execution metrics"""
pass
@abstractmethod
async def record_pipeline_execution(self, pipeline_id: str, duration: float, status: str):
"""Record pipeline execution metrics"""
pass
The middleware protocol allows you to inject cross-cutting concerns (logging, authentication, transformation) at any stage. The metrics collector ensures we can plug in any monitoring system.
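As an illustrative example of the middleware protocol, a simple pass-through logging middleware might look like this (the function name and log format are our own):
async def logging_middleware(data: Any, stage: PipelineStage,
                             context: PipelineContext) -> Any:
    """Log what is about to enter each stage, then pass the data through unchanged."""
    logging.getLogger("pipeline.middleware").info(
        "Pipeline %s entering stage %s with payload of %d characters",
        context.pipeline_id, stage.stage_id, len(str(data))
    )
    return data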
Step 8: Default Metrics Implementation¶
We provide a simple but effective default metrics collector:
class DefaultMetricsCollector(MetricsCollector):
"""Default implementation of metrics collector"""
def __init__(self):
self.metrics = {}
self.logger = logging.getLogger(__name__)
async def record_stage_execution(self, stage_id: str, duration: float, status: str):
"""Record stage execution metrics"""
if stage_id not in self.metrics:
self.metrics[stage_id] = []
self.metrics[stage_id].append({
"duration": duration,
"status": status,
"timestamp": datetime.now().isoformat()
})
self.logger.info(f"Stage {stage_id} completed in {duration:.2f}s with status {status}")
async def record_pipeline_execution(self, pipeline_id: str, duration: float, status: str):
"""Record pipeline execution metrics"""
self.logger.info(f"Pipeline {pipeline_id} completed in {duration:.2f}s with status {status}")
This implementation stores metrics in memory and logs to the standard logging system. In production, you might replace this with a system that sends to Prometheus, DataDog, or your monitoring platform of choice.
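For instance, a collector backed by Prometheus could export the same durations as histograms. The sketch below assumes the prometheus_client package and uses metric names of our own choosing (high-cardinality labels such as per-pipeline IDs should be used with care):
from prometheus_client import Histogram  # assumes prometheus_client is installed

STAGE_DURATION = Histogram(
    "atomic_stage_duration_seconds", "Stage execution time", ["stage_id", "status"]
)
PIPELINE_DURATION = Histogram(
    "atomic_pipeline_duration_seconds", "Pipeline execution time", ["pipeline_id", "status"]
)

class PrometheusMetricsCollector(MetricsCollector):
    """Hypothetical collector that exports durations as Prometheus histograms."""

    async def record_stage_execution(self, stage_id: str, duration: float, status: str):
        STAGE_DURATION.labels(stage_id=stage_id, status=status).observe(duration)

    async def record_pipeline_execution(self, pipeline_id: str, duration: float, status: str):
        PIPELINE_DURATION.labels(pipeline_id=pipeline_id, status=status).observe(duration)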
Step 9: Pipeline Class Foundation¶
Now we build the main pipeline orchestrator, starting with its constructor and basic stage management:
class AdvancedAtomicPipeline:
"""Sophisticated pipeline orchestration with SOLID principles adherence"""
def __init__(self, pipeline_id: str, metrics_collector: Optional[MetricsCollector] = None):
if not pipeline_id:
raise ValueError("pipeline_id cannot be empty")
self.pipeline_id = pipeline_id
self._stages: List[PipelineStage] = []
self._middleware_functions: List[MiddlewareProtocol] = []
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
self._metrics_collector = metrics_collector or DefaultMetricsCollector()
The constructor establishes the pipeline's identity and core components. Notice how we inject the metrics collector as a dependency, following the Dependency Inversion Principle.
Step 10: Fluent Interface for Pipeline Building¶
Next, we add methods for building pipelines with a fluent interface:
def add_stage(self, stage: PipelineStage) -> 'AdvancedAtomicPipeline':
"""Add a processing stage to the pipeline"""
if not isinstance(stage, PipelineStage):
raise TypeError("stage must be an instance of PipelineStage")
self._stages.append(stage)
return self # Enable fluent interface
def add_middleware(self, middleware_func: MiddlewareProtocol) -> 'AdvancedAtomicPipeline':
"""Add middleware function for cross-cutting concerns"""
self._middleware_functions.append(middleware_func)
return self
@property
def stages(self) -> List[PipelineStage]:
"""Get read-only access to stages"""
return self._stages.copy()
The fluent interface allows you to chain method calls: pipeline.add_stage(stage1).add_stage(stage2).add_middleware(auth_middleware). This makes pipeline construction both readable and concise.
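Putting the pieces together, a pipeline could be assembled like this (task_agent and analysis_agent are placeholder agent instances, and logging_middleware is the earlier sketch):
pipeline = (
    AdvancedAtomicPipeline("content_workflow")
    .add_middleware(logging_middleware)
    .add_stage(PipelineStage(
        stage_id="extract",
        agent=task_agent,            # placeholder agent instance
        stage_name="Extract Tasks",
        description="Pull actionable tasks out of raw input",
    ))
    .add_stage(PipelineStage(
        stage_id="analyze",
        agent=analysis_agent,        # placeholder agent instance
        stage_name="Analyze Content",
        description="Score and categorize the extracted tasks",
        config=StageConfiguration(retry_count=1, error_policy=ErrorPolicy.SKIP),
    ))
)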
Step 11: Main Pipeline Execution Logic¶
The heart of our pipeline is the execute method. Let's break it down into understandable parts:
async def execute(self, initial_input: Any,
context: Optional[PipelineContext] = None) -> Dict[str, Any]:
"""Execute the complete pipeline with comprehensive monitoring"""
# Set up execution context
if context is None:
context = PipelineContext(
pipeline_id=self.pipeline_id,
execution_start=datetime.now()
)
context.status = PipelineStatus.RUNNING
pipeline_start_time = datetime.now()
We start by ensuring we have a proper execution context to track our pipeline's journey.
Step 12: Stage Execution Loop Setup¶
Next comes the core execution loop setup and validation:
try:
current_data = initial_input
execution_results = []
if not self._stages:
raise PipelineExecutionException("Pipeline has no stages to execute")
# Execute each stage in sequence
for stage_index, stage in enumerate(self._stages):
try:
stage_result = await self._execute_single_stage(
stage, current_data, context, stage_index
)
execution_results.append(stage_result)
This initializes the execution environment and begins processing each stage.
Step 12b: Stage Result Processing¶
For each stage result, we handle success and error cases according to configured policies:
# Handle stage result based on error policy
if stage_result["status"] == "success":
current_data = stage_result["output"]
await self._metrics_collector.record_stage_execution(
stage.stage_id, stage_result["execution_time"], "success"
)
elif stage_result["status"] == "error":
await self._metrics_collector.record_stage_execution(
stage.stage_id, stage_result["execution_time"], "error"
)
if not await self._handle_stage_error(stage, stage_result, context):
# Error handling determined we should stop
context.status = PipelineStatus.FAILED
break
The loop processes each stage sequentially, handling success and error cases according to the configured error policies.
Step 13: Pipeline Completion and Metrics¶
Finally, we handle pipeline completion and record comprehensive metrics:
except PipelineExecutionException as e:
context.add_error(str(e), e.stage_id)
if stage.config.error_policy != ErrorPolicy.SKIP:
context.status = PipelineStatus.FAILED
break
# Determine final status
if context.status == PipelineStatus.RUNNING:
context.status = PipelineStatus.SUCCESS
# Record pipeline metrics
total_execution_time = (datetime.now() - pipeline_start_time).total_seconds()
await self._metrics_collector.record_pipeline_execution(
self.pipeline_id, total_execution_time, context.status.value
)
return self._build_execution_result(
current_data, execution_results, context, total_execution_time
)
Notice how we ensure metrics are recorded even when exceptions occur, providing complete observability into pipeline behavior.
Step 14: Exception Recovery¶
The pipeline also handles unexpected exceptions gracefully:
except Exception as e:
context.status = PipelineStatus.FAILED
context.add_error(f"Pipeline execution failed: {str(e)}")
total_execution_time = (datetime.now() - pipeline_start_time).total_seconds()
await self._metrics_collector.record_pipeline_execution(
self.pipeline_id, total_execution_time, "failed"
)
raise PipelineExecutionException(
f"Pipeline {self.pipeline_id} execution failed",
original_error=e
)
This ensures that even unexpected failures are properly logged and reported with metrics.
Step 15: Single Stage Execution with Middleware¶
Each stage execution involves middleware processing and detailed tracking:
async def _execute_single_stage(self, stage: PipelineStage, input_data: Any,
context: PipelineContext, stage_index: int) -> Dict[str, Any]:
"""Execute a single pipeline stage with middleware and error handling"""
stage_start_time = datetime.now()
current_data = input_data
try:
# Apply middleware before stage execution
for middleware in self._middleware_functions:
current_data = await middleware(current_data, stage, context)
# Execute stage with resilience
stage_result = await self._execute_stage_with_resilience(
stage, current_data, context
)
The middleware chain allows for cross-cutting concerns like authentication, logging, or data transformation to be applied consistently across all stages.
Step 16: Stage Result Recording and Error Handling¶
We maintain detailed records of each stage execution:
# Record stage execution with comprehensive metrics
stage_duration = (datetime.now() - stage_start_time).total_seconds()
stage_record = {
"stage_index": stage_index,
"stage_id": stage.stage_id,
"stage_name": stage.stage_name,
"execution_time": stage_duration,
"status": stage_result.get("status", "unknown"),
"input_size": len(str(current_data)),
"output_size": len(str(stage_result.get("output", ""))),
"timestamp": stage_start_time.isoformat(),
"output": stage_result.get("output"),
"error": stage_result.get("error")
}
context.stage_history.append(stage_record)
return stage_record
Every stage execution, successful or failed, is recorded with timing and size metrics. This provides valuable insights for optimization and debugging.
Step 17: Stage Exception Handling¶
Even when individual stages fail, we capture comprehensive error information:
except Exception as e:
# Even failures get recorded for debugging
stage_duration = (datetime.now() - stage_start_time).total_seconds()
error_record = {
"stage_index": stage_index,
"stage_id": stage.stage_id,
"stage_name": stage.stage_name,
"execution_time": stage_duration,
"status": "error",
"error": str(e),
"timestamp": stage_start_time.isoformat()
}
context.stage_history.append(error_record)
raise PipelineExecutionException(
f"Stage {stage.stage_id} execution failed: {str(e)}",
stage_id=stage.stage_id,
original_error=e
)
This detailed error recording helps with debugging and understanding where pipeline failures occur.
Step 18: Error Policy Implementation¶
How we handle stage failures depends on the configured error policy:
async def _handle_stage_error(self, stage: PipelineStage, stage_result: Dict[str, Any],
context: PipelineContext) -> bool:
"""Handle stage error based on error policy. Returns True if pipeline should continue."""
error_policy = stage.config.error_policy
if error_policy == ErrorPolicy.STOP:
self.logger.error(f"Pipeline stopped due to stage {stage.stage_id} failure")
return False
elif error_policy == ErrorPolicy.SKIP:
self.logger.warning(f"Skipping failed stage: {stage.stage_id}")
return True
elif error_policy == ErrorPolicy.RETRY:
# Retries are handled in _execute_stage_with_resilience
self.logger.error(f"Stage {stage.stage_id} failed after retries")
return False
return False
This method implements the three error policies: STOP halts the pipeline, SKIP logs a warning and continues to the next stage, and RETRY defers to the retry logic in _execute_stage_with_resilience, so by the time this handler sees the error the retries (with exponential backoff) are already exhausted and the pipeline stops.
Step 19: Results Assembly¶
After pipeline execution, we assemble a comprehensive result object:
def _build_execution_result(self, final_output: Any, execution_results: List[Dict[str, Any]],
context: PipelineContext, total_execution_time: float) -> Dict[str, Any]:
"""Build the final execution result"""
return {
"pipeline_id": self.pipeline_id,
"final_output": final_output,
"status": context.status.value,
"execution_summary": {
"total_stages": len(self._stages),
"successful_stages": len([r for r in execution_results if r["status"] == "success"]),
"failed_stages": len([r for r in execution_results if r["status"] == "error"]),
"total_execution_time": total_execution_time,
"average_stage_time": total_execution_time / len(self._stages) if self._stages else 0
},
"stage_details": execution_results,
"context": context,
"performance_metrics": self._calculate_performance_metrics(execution_results)
}
This result object provides everything needed for monitoring, debugging, and optimization - from high-level success metrics to detailed stage-by-stage execution data.
Step 20: Resilient Stage Execution with Retries¶
The most sophisticated part of our pipeline is the resilient execution system:
async def _execute_stage_with_resilience(self, stage: PipelineStage,
input_data: Any,
context: PipelineContext) -> Dict[str, Any]:
"""Execute stage with retry logic and timeout handling"""
retry_count = stage.config.retry_count
timeout_seconds = stage.config.timeout_seconds
last_error = None
# Retry loop with exponential backoff
for attempt in range(retry_count):
try:
# Execute stage with timeout protection
stage_task = asyncio.create_task(stage.agent.process(input_data))
output = await asyncio.wait_for(stage_task, timeout=timeout_seconds)
if attempt > 0:
self.logger.info(f"Stage {stage.stage_id} succeeded on attempt {attempt + 1}")
return {
"status": "success",
"output": output,
"attempt": attempt + 1,
"retry_needed": attempt > 0
}
The retry mechanism uses asyncio.wait_for to enforce timeouts and tracks which attempt succeeded.
Step 21: Timeout Handling in Retries¶
When timeout errors occur during retry attempts, we handle them gracefully:
except asyncio.TimeoutError as e:
last_error = e
error_msg = f"Stage {stage.stage_id} timed out after {timeout_seconds}s"
self.logger.warning(f"{error_msg} (attempt {attempt + 1}/{retry_count})")
if attempt == retry_count - 1: # Last attempt
context.add_error(error_msg, stage.stage_id)
return {
"status": "error",
"error": error_msg,
"error_type": "timeout",
"attempt": attempt + 1,
"retry_exhausted": True
}
Timeout errors are logged with detailed context about which attempt failed and why.
Step 21b: General Exception Handling in Retries¶
For all other exceptions, we provide comprehensive error tracking:
except Exception as e:
last_error = e
error_msg = f"Stage {stage.stage_id} failed: {str(e)}"
self.logger.warning(f"{error_msg} (attempt {attempt + 1}/{retry_count})")
if attempt == retry_count - 1: # Last attempt
context.add_error(error_msg, stage.stage_id)
return {
"status": "error",
"error": error_msg,
"error_type": type(e).__name__,
"attempt": attempt + 1,
"retry_exhausted": True
}
Each retry attempt is logged, and we distinguish between timeout errors and other exceptions for better debugging.
Step 22: Exponential Backoff Implementation¶
The retry system implements exponential backoff to avoid overwhelming failing services:
# Exponential backoff before retry
if attempt < retry_count - 1: # Don't wait after last attempt
backoff_time = min(2 ** attempt, 10) # Max 10 seconds
self.logger.debug(f"Waiting {backoff_time}s before retry {attempt + 2}")
await asyncio.sleep(backoff_time)
# Safety net (should never be reached)
return {
"status": "error",
"error": f"Unexpected retry loop exit: {str(last_error)}" if last_error else "Unknown error",
"retry_exhausted": True
}
The exponential backoff (2^attempt seconds, capped at 10) prevents overwhelming failing services while giving them time to recover.
Step 23: Parallel Processing Foundation¶
Now let's build the parallel processing capabilities. We start with configuration and result classes:
class ConcurrencyConfiguration:
"""Configuration for parallel processing"""
def __init__(self, max_concurrent: int = 5, timeout_seconds: int = 300):
if max_concurrent <= 0:
raise ValueError("max_concurrent must be positive")
if timeout_seconds <= 0:
raise ValueError("timeout_seconds must be positive")
self.max_concurrent = max_concurrent
self.timeout_seconds = timeout_seconds
This configuration class controls the concurrency limits, preventing resource exhaustion while maximizing throughput.
Step 24: Batch Result Analytics¶
The BatchResult class provides rich analytics for parallel executions:
class BatchResult:
"""Result of batch processing operation"""
def __init__(self, batch_id: str, results: List[Any], metrics: Dict[str, Any]):
self.batch_id = batch_id
self.results = results
self.metrics = metrics
self.timestamp = datetime.now()
@property
def success_count(self) -> int:
"""Count of successful results"""
return len([r for r in self.results if not isinstance(r, Exception) and r.get("status") == "success"])
@property
def failure_count(self) -> int:
"""Count of failed results"""
return len(self.results) - self.success_count
@property
def success_rate(self) -> float:
"""Success rate as a percentage"""
if not self.results:
return 0.0
return self.success_count / len(self.results)
These computed properties provide instant insights into batch execution quality without needing to manually calculate success rates.
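A quick illustration of these properties with made-up values:
batch = BatchResult(
    batch_id="demo_batch",
    results=[{"status": "success"}, {"status": "success"}, {"status": "error"}],
    metrics={},
)
print(batch.success_count)          # 2
print(batch.failure_count)          # 1
print(f"{batch.success_rate:.0%}")  # 67%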
Step 25: Parallel Processor Setup¶
The ParallelAtomicProcessor handles concurrent execution of multiple agents:
class ParallelAtomicProcessor:
"""Advanced parallel processing with load balancing and fault tolerance"""
def __init__(self, config: ConcurrencyConfiguration,
metrics_collector: Optional[MetricsCollector] = None):
self._config = config
self._semaphore = asyncio.Semaphore(config.max_concurrent)
self._metrics_collector = metrics_collector or DefaultMetricsCollector()
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
The semaphore is the key to controlling concurrency - it ensures we never exceed the configured maximum concurrent agents, preventing resource exhaustion.
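If the semaphore pattern is unfamiliar, here is a minimal standalone illustration (independent of the processor) of how asyncio.Semaphore caps concurrency:
import asyncio

async def limited_worker(sem: asyncio.Semaphore, worker_id: int) -> int:
    async with sem:                      # at most 2 workers inside this block at once
        await asyncio.sleep(0.1)         # simulate work
        return worker_id

async def main():
    sem = asyncio.Semaphore(2)
    results = await asyncio.gather(*(limited_worker(sem, i) for i in range(6)))
    print(results)                       # [0, 1, 2, 3, 4, 5], executed two at a time

asyncio.run(main())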
Step 26: Batch Processing Orchestration¶
The main batch processing method coordinates parallel execution:
async def process_batch(self, agent_input_pairs: List[tuple],
batch_context: Optional[Dict[str, Any]] = None) -> BatchResult:
"""Process multiple agent-input pairs with sophisticated load balancing"""
if not agent_input_pairs:
raise ValueError("agent_input_pairs cannot be empty")
# Set up batch context with unique ID
if batch_context is None:
batch_context = {"batch_id": f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}"}
batch_id = batch_context["batch_id"]
batch_start_time = datetime.now()
We start by establishing the batch context and timing.
Step 27: Task Creation and Execution¶
Next, we create and execute all tasks in parallel:
try:
# Create processing tasks with semaphore control
tasks = [
self._process_with_semaphore(agent, input_data, f"task_{i}", batch_context)
for i, (agent, input_data) in enumerate(agent_input_pairs)
]
# Execute all tasks with timeout and progress monitoring
results = await asyncio.wait_for(
self._execute_with_monitoring(tasks, batch_context),
timeout=self._config.timeout_seconds
)
# Calculate comprehensive batch metrics
batch_duration = (datetime.now() - batch_start_time).total_seconds()
metrics = self._calculate_batch_metrics(results, batch_duration, len(agent_input_pairs))
# Record successful batch execution
await self._metrics_collector.record_pipeline_execution(
batch_id, batch_duration, "success" if metrics["success_rate"] > 0.5 else "failed"
)
return BatchResult(batch_id, results, metrics)
All tasks are created upfront and then executed with monitoring and timeout protection.
Step 28: Batch Error Handling¶
We handle various failure scenarios gracefully:
except asyncio.TimeoutError:
batch_duration = (datetime.now() - batch_start_time).total_seconds()
await self._metrics_collector.record_pipeline_execution(
batch_id, batch_duration, "timeout"
)
raise PipelineExecutionException(
f"Batch {batch_id} timed out after {self._config.timeout_seconds}s"
)
except Exception as e:
batch_duration = (datetime.now() - batch_start_time).total_seconds()
await self._metrics_collector.record_pipeline_execution(
batch_id, batch_duration, "error"
)
raise PipelineExecutionException(
f"Batch {batch_id} execution failed: {str(e)}",
original_error=e
)
Even when batches fail, we ensure metrics are recorded for observability.
Step 29: Semaphore-Controlled Task Execution¶
Each individual task executes within semaphore constraints:
async def _process_with_semaphore(self, agent: Any, input_data: Any,
task_id: str, context: Dict[str, Any]) -> Dict[str, Any]:
"""Process single agent with semaphore-controlled concurrency"""
async with self._semaphore:
task_start_time = datetime.now()
try:
result = await agent.process(input_data)
task_duration = (datetime.now() - task_start_time).total_seconds()
# Record individual task success
await self._metrics_collector.record_stage_execution(
task_id, task_duration, "success"
)
return {
"task_id": task_id,
"status": "success",
"result": result,
"execution_time": task_duration,
"agent_id": getattr(agent, 'agent_id', 'unknown'),
"timestamp": task_start_time.isoformat()
}
The async with self._semaphore: block ensures only the configured number of agents run concurrently, preventing resource exhaustion.
Step 30: Task-Level Error Handling¶
Individual task failures are handled gracefully:
except Exception as e:
task_duration = (datetime.now() - task_start_time).total_seconds()
self.logger.error(f"Task {task_id} failed: {str(e)}")
# Record individual task failure for monitoring
await self._metrics_collector.record_stage_execution(
task_id, task_duration, "error"
)
return {
"task_id": task_id,
"status": "error",
"error": str(e),
"error_type": type(e).__name__,
"execution_time": task_duration,
"agent_id": getattr(agent, 'agent_id', 'unknown'),
"timestamp": task_start_time.isoformat()
}
Task failures don't stop the batch - they're recorded and the batch continues with other tasks.
Step 31: Progress Monitoring and Completion Tracking¶
The monitoring system provides real-time feedback on batch progress:
async def _execute_with_monitoring(self, tasks: List, context: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Execute tasks with progress monitoring and graceful error handling"""
results = []
completed = 0
total_tasks = len(tasks)
# Log batch start for observability
self.logger.info(f"Starting batch execution of {total_tasks} tasks with max concurrency {self._config.max_concurrent}")
# Use asyncio.as_completed for real-time progress monitoring
for coro in asyncio.as_completed(tasks):
try:
result = await coro
results.append(result)
completed += 1
# Log progress at 10% intervals
if completed % max(1, total_tasks // 10) == 0 or completed == total_tasks:
progress = (completed / total_tasks) * 100
self.logger.info(f"Batch progress: {completed}/{total_tasks} ({progress:.1f}%)")
The asyncio.as_completed approach yields results as soon as they are available, rather than waiting for all tasks to complete.
Step 32: Graceful Error Handling in Batch Processing¶
Even when individual tasks fail, the batch continues:
except Exception as e:
# Create error result for failed tasks
error_result = {
"task_id": f"unknown_task_{completed}",
"status": "error",
"error": str(e),
"error_type": type(e).__name__,
"execution_time": 0.0,
"timestamp": datetime.now().isoformat()
}
results.append(error_result)
completed += 1
self.logger.error(f"Task failed in batch: {str(e)}")
return results
This ensures one failing task doesn't bring down the entire batch operation.
Step 33: Comprehensive Batch Metrics Calculation¶
We calculate detailed metrics to understand batch performance:
def _calculate_batch_metrics(self, results: List[Dict[str, Any]],
batch_duration: float, total_tasks: int) -> Dict[str, Any]:
"""Calculate comprehensive batch execution metrics"""
successful_tasks = len([r for r in results if r.get("status") == "success"])
failed_tasks = len(results) - successful_tasks
# Calculate timing metrics for performance analysis
execution_times = [r.get("execution_time", 0.0) for r in results if "execution_time" in r]
avg_task_time = sum(execution_times) / len(execution_times) if execution_times else 0.0
return {
"total_tasks": total_tasks,
"successful_tasks": successful_tasks,
"failed_tasks": failed_tasks,
"success_rate": successful_tasks / total_tasks if total_tasks > 0 else 0.0,
"batch_duration": batch_duration,
"average_task_time": avg_task_time,
"concurrency_used": min(total_tasks, self._config.max_concurrent),
"throughput_tasks_per_second": total_tasks / batch_duration if batch_duration > 0 else 0.0,
"efficiency": (successful_tasks / total_tasks) * (avg_task_time / batch_duration) if batch_duration > 0 and total_tasks > 0 else 0.0
}
These metrics help you understand not just whether the batch succeeded, but how efficiently it used resources and where optimization opportunities might exist.
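With the processor complete, a usage sketch might look like this; EchoAgent is a hypothetical stand-in for any atomic agent exposing an async process method:
class EchoAgent:
    """Hypothetical stand-in agent: returns its input unchanged."""
    agent_id = "echo"

    async def process(self, data):
        return data

async def run_demo_batch():
    processor = ParallelAtomicProcessor(
        ConcurrencyConfiguration(max_concurrent=3, timeout_seconds=60)
    )
    pairs = [(EchoAgent(), f"payload-{i}") for i in range(10)]
    batch = await processor.process_batch(pairs)
    print(batch.success_count, f"{batch.metrics['throughput_tasks_per_second']:.1f} tasks/s")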
Step 34: Pipeline Performance Analytics¶
Finally, we provide comprehensive performance analytics for pipeline optimization:
def _calculate_performance_metrics(self, execution_results: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Calculate performance metrics for pipeline execution"""
if not execution_results:
return {"error": "No execution results to analyze"}
execution_times = [r.get("execution_time", 0.0) for r in execution_results]
successful_stages = [r for r in execution_results if r.get("status") == "success"]
return {
"total_stages": len(execution_results),
"successful_stages": len(successful_stages),
"failure_rate": (len(execution_results) - len(successful_stages)) / len(execution_results),
"average_stage_time": sum(execution_times) / len(execution_times) if execution_times else 0.0,
"min_stage_time": min(execution_times) if execution_times else 0.0,
"max_stage_time": max(execution_times) if execution_times else 0.0,
"total_execution_time": sum(execution_times),
"stage_efficiency": len(successful_stages) / len(execution_results) if execution_results else 0.0
}
These metrics help identify bottlenecks, optimize resource allocation, and improve overall pipeline performance.
Part 2: Dynamic Agent Assembly and CLI Integration (15 minutes)¶
Runtime Agent Composition¶
🗂️ File: src/session6/dynamic_assembly.py
- Runtime composition systems
Dynamic agent assembly allows us to build pipelines at runtime based on capabilities and requirements. Let's explore this powerful concept step by step.
Step 35: Foundation - Capability System¶
First, we establish the capability framework that enables intelligent agent selection:
from typing import Dict, List, Any, Type, Optional
import importlib
import inspect
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
import json
class AgentCapability(Enum):
"""Standard atomic agent capabilities"""
TASK_PROCESSING = "task_processing" # General task execution
CONTENT_ANALYSIS = "content_analysis" # Text/content analysis
DATA_TRANSFORMATION = "data_transformation" # Data format conversion
COMMUNICATION = "communication" # External system interaction
VALIDATION = "validation" # Data/result validation
REPORTING = "reporting" # Report generation
This capability system allows the dynamic assembly to understand what each agent can do, enabling intelligent pipeline construction.
Step 36: Agent Definition Schema¶
Next, we define how agents are described for dynamic instantiation:
@dataclass
class AgentDefinition:
"""Runtime agent definition"""
agent_class: str # Class name to instantiate
module_path: str # Python module path
capabilities: List[AgentCapability] # What this agent can do
input_schema: str # Expected input format
output_schema: str # Produced output format
configuration: Dict[str, Any] # Default configuration
resource_requirements: Dict[str, Any] # Resource needs (memory, CPU)
This definition acts as a "recipe" for creating agent instances, including everything needed for instantiation and compatibility checking.
Step 37: Dynamic Agent Registry Setup¶
The registry acts as a smart catalog of available agents with sophisticated indexing:
class DynamicAgentRegistry:
"""Registry for dynamic agent discovery and instantiation"""
def __init__(self):
self.registered_agents: Dict[str, AgentDefinition] = {}
self.capability_index: Dict[AgentCapability, List[str]] = {}
self.schema_compatibility: Dict[str, List[str]] = {}
The registry maintains three key data structures: the main agent definitions, capability-based indexing for fast lookup, and schema compatibility mapping for pipeline chaining.
Step 38: Intelligent Agent Registration¶
Agent registration builds multiple indices for efficient discovery:
def register_agent(self, agent_id: str, definition: AgentDefinition):
"""Register an agent definition for dynamic instantiation"""
self.registered_agents[agent_id] = definition
# Build capability-based index for fast lookup
for capability in definition.capabilities:
if capability not in self.capability_index:
self.capability_index[capability] = []
self.capability_index[capability].append(agent_id)
# Build schema compatibility index
output_schema = definition.output_schema
if output_schema not in self.schema_compatibility:
self.schema_compatibility[output_schema] = []
# Find agents that can consume this agent's output
for other_id, other_def in self.registered_agents.items():
if other_def.input_schema == output_schema:
self.schema_compatibility[output_schema].append(other_id)
The indexing system enables O(1) lookup by capability and automatic discovery of compatible agent chains.
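As written, the compatibility loop only links agents that were registered before the new one. A symmetric update (a sketch of our own, not part of the module's source) would also record the new agent as a consumer of output schemas that are already registered:
# Inside register_agent, after the existing indexing code (illustrative addition):
for other_id, other_def in self.registered_agents.items():
    if other_id == agent_id:
        continue
    # The new agent can consume output produced by an existing agent
    if definition.input_schema == other_def.output_schema:
        consumers = self.schema_compatibility.setdefault(other_def.output_schema, [])
        if agent_id not in consumers:
            consumers.append(agent_id)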
Step 39: Agent Discovery Methods¶
The registry provides multiple ways to find agents based on different criteria:
def find_agents_by_capability(self, capability: AgentCapability) -> List[str]:
"""Find all agents with specified capability"""
return self.capability_index.get(capability, [])
def find_compatible_agents(self, output_schema: str) -> List[str]:
"""Find agents compatible with given output schema"""
return self.schema_compatibility.get(output_schema, [])
These methods enable capability-driven and schema-driven agent discovery, essential for automatic pipeline construction.
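For example, given a registry that has already been populated (registry below is assumed to be such a DynamicAgentRegistry instance):
# All agents that can analyze content
analysts = registry.find_agents_by_capability(AgentCapability.CONTENT_ANALYSIS)

# All agents whose input schema matches a "TaskOutput" produced upstream
downstream = registry.find_compatible_agents("TaskOutput")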
Step 40: Dynamic Agent Instantiation¶
The registry can instantiate agents at runtime with custom configuration:
async def instantiate_agent(self, agent_id: str,
configuration_overrides: Dict[str, Any] = None) -> Any:
"""Dynamically instantiate an agent from its definition"""
if agent_id not in self.registered_agents:
raise ValueError(f"Agent {agent_id} not registered")
definition = self.registered_agents[agent_id]
# Load agent class dynamically using Python's import system
module = importlib.import_module(definition.module_path)
agent_class = getattr(module, definition.agent_class)
# Merge default config with runtime overrides
config = definition.configuration.copy()
if configuration_overrides:
config.update(configuration_overrides)
# Create and return the agent instance
return agent_class(**config)
This method demonstrates the power of Python's reflection capabilities - we can instantiate any class at runtime given its module path and name.
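The dynamic-loading trick in isolation, shown here with a standard-library class as a neutral example:
import importlib

# Equivalent to: from json import JSONDecoder
module = importlib.import_module("json")
decoder_class = getattr(module, "JSONDecoder")
decoder = decoder_class()
print(decoder.decode('{"ok": true}'))   # {'ok': True}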
Step 41: Intelligent Pipeline Suggestion¶
The registry can automatically suggest pipeline configurations based on desired capabilities:
def suggest_pipeline(self, start_capability: AgentCapability,
end_capability: AgentCapability) -> List[List[str]]:
"""Suggest agent pipeline from start to end capability"""
start_agents = self.find_agents_by_capability(start_capability)
end_agents = self.find_agents_by_capability(end_capability)
pipeline_suggestions = []
# Try direct two-agent pipelines first
for start_agent_id in start_agents:
start_def = self.registered_agents[start_agent_id]
compatible_agents = self.find_compatible_agents(start_def.output_schema)
for middle_agent_id in compatible_agents:
middle_def = self.registered_agents[middle_agent_id]
if end_capability in middle_def.capabilities:
pipeline_suggestions.append([start_agent_id, middle_agent_id])
This algorithm starts with simple two-agent pipelines, trying to connect start and end capabilities directly.
Step 42: Three-Agent Pipeline Discovery¶
If direct connections aren't possible, we expand to three-agent pipelines:
else:
# Try three-agent pipelines if direct connection isn't possible
final_compatible = self.find_compatible_agents(middle_def.output_schema)
for end_agent_id in final_compatible:
end_def = self.registered_agents[end_agent_id]
if end_capability in end_def.capabilities:
pipeline_suggestions.append([start_agent_id, middle_agent_id, end_agent_id])
return pipeline_suggestions
This bounded search tries two-agent pipelines first and only then falls back to three-agent chains, favoring the shortest pipelines that connect the start and end capabilities. More sophisticated graph algorithms could be implemented for complex scenarios.
Step 43: CLI Foundation¶
Now let's build the CLI system that makes dynamic assembly accessible to DevOps teams:
class AtomicCLI:
"""Advanced CLI for atomic agent management and DevOps integration"""
def __init__(self, config_path: str = "atomic_config.json"):
self.config_path = Path(config_path)
self.config = self._load_config()
self.agent_registry = DynamicAgentRegistry()
self.logger = logging.getLogger(__name__)
The CLI provides a user-friendly interface for managing atomic agents in production environments, bridging the gap between code and operations.
Step 44: Configuration Management¶
The CLI uses a sophisticated configuration system with sensible defaults:
def _load_config(self) -> Dict[str, Any]:
"""Load atomic agent configuration with schema validation"""
# Define comprehensive default configuration
default_config = {
"agents": {}, # Registered agent definitions
"pipelines": {}, # Saved pipeline configurations
"providers": {}, # External service providers
"cli_settings": {
"log_level": "INFO",
"output_format": "json",
"auto_save": True
},
"monitoring": {
"enabled": True,
"metrics_retention_days": 7,
"performance_alerts": True
}
}
This comprehensive default configuration ensures the CLI works out-of-the-box for common scenarios.
Step 45: Configuration Loading and Error Handling¶
The configuration system handles missing files and parsing errors gracefully:
# Load existing config or use defaults
if self.config_path.exists():
try:
with open(self.config_path, 'r') as f:
loaded_config = json.load(f)
# Merge with defaults to ensure all keys exist
for key, value in default_config.items():
if key not in loaded_config:
loaded_config[key] = value
return loaded_config
except Exception as e:
self.logger.error(f"Failed to load config: {e}")
return default_config
else:
return default_config
This ensures the CLI is resilient to configuration issues and always starts with valid defaults.
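Note that this merge is shallow: if a user's file contains a partial cli_settings block, missing nested keys are not backfilled. A recursive merge, sketched below as our own addition rather than part of the module, would handle that case:
from typing import Any, Dict

def deep_merge_defaults(defaults: Dict[str, Any], loaded: Dict[str, Any]) -> Dict[str, Any]:
    """Return the loaded config with any missing keys filled in from defaults, recursively."""
    merged = dict(loaded)
    for key, default_value in defaults.items():
        if key not in merged:
            merged[key] = default_value
        elif isinstance(default_value, dict) and isinstance(merged[key], dict):
            merged[key] = deep_merge_defaults(default_value, merged[key])
    return merged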
Step 46: Built-in Agent Registration¶
The CLI comes with pre-configured agents for common use cases:
def register_builtin_agents(self):
"""Register common atomic agent types"""
# Task Processing Agent - General purpose task execution
self.agent_registry.register_agent("task_processor", AgentDefinition(
agent_class="TaskProcessorAgent",
module_path="atomic_agents.task_processor",
capabilities=[AgentCapability.TASK_PROCESSING],
input_schema="TaskInput",
output_schema="TaskOutput",
configuration={"model": "gpt-4o-mini", "temperature": 0.3},
resource_requirements={"memory_mb": 512, "cpu_cores": 1}
))
Task processing agents handle general-purpose work with balanced model settings.
Step 47: Content Analysis Agent Registration¶
The content analyzer is specialized for text analysis tasks:
# Content Analysis Agent - Specialized for text analysis
self.agent_registry.register_agent("content_analyzer", AgentDefinition(
agent_class="ContentAnalysisAgent",
module_path="atomic_agents.content_analysis",
capabilities=[AgentCapability.CONTENT_ANALYSIS],
input_schema="AnalysisInput",
output_schema="AnalysisOutput",
configuration={"model": "gpt-4o", "temperature": 0.1},
resource_requirements={"memory_mb": 1024, "cpu_cores": 1}
))
Notice the lower temperature (0.1) for more consistent analysis results.
Step 48: Data Transformation Agent Registration¶
The data transformer handles format conversions with deterministic settings:
# Data Transformer Agent - Format conversion and transformation
self.agent_registry.register_agent("data_transformer", AgentDefinition(
agent_class="DataTransformerAgent",
module_path="atomic_agents.data_transformer",
capabilities=[AgentCapability.DATA_TRANSFORMATION],
input_schema="TransformInput",
output_schema="TransformOutput",
configuration={"model": "gpt-4o-mini", "temperature": 0.0},
resource_requirements={"memory_mb": 256, "cpu_cores": 1}
))
Temperature 0.0 ensures deterministic transformations, crucial for data consistency.
Step 49: Dynamic Pipeline Creation Setup¶
The most powerful feature - creating pipelines automatically from capability requirements:
async def create_dynamic_pipeline(self, capability_sequence: List[AgentCapability]) -> AdvancedAtomicPipeline:
"""Create pipeline dynamically based on capability requirements"""
if len(capability_sequence) < 2:
raise ValueError("Pipeline requires at least 2 capabilities")
pipeline = AdvancedAtomicPipeline(f"dynamic_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
# Step 1: Find compatible agents for each capability
selected_agents = []
for i, capability in enumerate(capability_sequence):
candidates = self.agent_registry.find_agents_by_capability(capability)
if not candidates:
raise ValueError(f"No agents found for capability: {capability}")
# Select best agent (could use more sophisticated selection logic)
selected_agent_id = candidates[0] # Simple: pick first
The dynamic pipeline creation starts by finding agents that match each required capability.
Step 50: Schema Compatibility Validation¶
We ensure the selected agents can work together:
# Step 2: Validate schema compatibility with previous agent
if i > 0:
prev_agent_def = self.agent_registry.registered_agents[selected_agents[-1]]
current_agent_def = self.agent_registry.registered_agents[selected_agent_id]
if prev_agent_def.output_schema != current_agent_def.input_schema:
# Try to find compatible agent
compatible_agents = self.agent_registry.find_compatible_agents(prev_agent_def.output_schema)
capability_compatible = [
agent_id for agent_id in compatible_agents
if capability in self.agent_registry.registered_agents[agent_id].capabilities
]
if capability_compatible:
selected_agent_id = capability_compatible[0]
else:
raise ValueError(f"No schema-compatible agents found for {capability}")
selected_agents.append(selected_agent_id)
This ensures data can flow properly between pipeline stages.
Step 51: Pipeline Assembly and Stage Creation¶
Finally, we instantiate agents and build the complete pipeline:
# Step 3: Instantiate agents and build pipeline
for i, agent_id in enumerate(selected_agents):
agent_instance = await self.agent_registry.instantiate_agent(agent_id)
stage = PipelineStage(
stage_id=f"stage_{i}_{agent_id}",
agent=agent_instance,
stage_name=f"Stage {i+1}: {agent_id}",
description=f"Process using {agent_id} agent",
config=StageConfiguration(error_policy=ErrorPolicy.RETRY, retry_count=2)
)
pipeline.add_stage(stage)
return pipeline
This method demonstrates the full power of dynamic composition - from capability requirements to a fully functional pipeline, all generated automatically with schema validation and error handling.
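A usage sketch tying everything together (the capability sequence is an example of our own, and it assumes the built-in agent modules are importable):
async def run_dynamic_workflow(cli: AtomicCLI, raw_input):
    cli.register_builtin_agents()
    pipeline = await cli.create_dynamic_pipeline([
        AgentCapability.TASK_PROCESSING,
        AgentCapability.CONTENT_ANALYSIS,
    ])
    result = await pipeline.execute(raw_input)
    print(result["status"], result["execution_summary"]["total_execution_time"])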
Module Summary¶
You've now mastered advanced atomic agent composition patterns:
✅ Pipeline Orchestration: Built sophisticated sequential processing with error handling and monitoring
✅ Parallel Processing: Implemented load-balanced parallel execution with fault tolerance
✅ Dynamic Assembly: Created runtime agent composition systems with capability-based selection
✅ CLI Integration: Designed DevOps-friendly command-line tools for atomic agent management
📝 Multiple Choice Test - Module A¶
Test your understanding of advanced atomic agent composition patterns:
Question 1: What method does the AdvancedAtomicPipeline use to handle errors during stage execution?
A) Stop pipeline execution immediately
B) Skip failed stages and continue
C) Retry with exponential backoff and circuit breaker protection
D) Log errors but ignore them
Question 2: How does the ParallelAtomicProcessor determine agent assignment for balanced load distribution?
A) Random assignment only
B) Round-robin assignment based on agent IDs
C) Workload calculation considering active tasks and agent capacity
D) First-available agent selection
Question 3: What factors does the dynamic assembly system consider when selecting agents for capability matching?
A) Agent name only
B) Capability scores, performance metrics, and availability status
C) Creation timestamp
D) Memory usage only
Question 4: What information does the AtomicCLI provide when displaying pipeline status?
A) Just success/failure status
B) Comprehensive execution details including stage status, timing, and error information
C) Agent names only
D) Memory consumption
Question 5: How does the error handling in advanced composition patterns ensure pipeline reliability?
A) Single retry attempt
B) Circuit breaker integration with configurable retry policies and failure tracking
C) Manual intervention required
D) No error handling
Next Steps¶
- Continue to Module B: Enterprise Modular Systems for production-scale architectures
- Return to Core: Session 6 Main
- Advance to Session 7: First ADK Agent
🗂️ Source Files for Module A:
- src/session6/advanced_composition.py - Sophisticated pipeline patterns
- src/session6/dynamic_assembly.py - Runtime composition systems
- src/session6/atomic_cli.py - DevOps CLI integration