
⚙️ Session 7 Advanced: Choreography Systems

⚙️ IMPLEMENTER PATH CONTENT
Prerequisites: Complete 🎯 Observer and 📝 Participant paths in main session
Time Investment: 3-4 hours
Outcome: Deep mastery of distributed A2A choreography patterns

Advanced Learning Outcomes

After completing this module, you will master:

  • Event-driven choreography patterns for distributed agent coordination
  • Complex event pattern recognition and response systems
  • Distributed state management for choreographed workflows
  • Production-grade event sourcing and replay capabilities

The Dance of Autonomy - Choreography Patterns

While orchestration is like conducting a symphony, choreography is like a perfectly synchronized dance where each performer knows their role and responds to the movements of others without a central conductor.

The Philosophy of Distributed Harmony

Choreography represents a fundamental shift from centralized control to distributed intelligence, where agents coordinate through shared events and patterns rather than explicit commands.
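To make the idea concrete, here is a minimal sketch of coordination through shared events. TinyEventBus and the two agent functions are hypothetical names used only for illustration; they are not part of the A2A framework.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

Handler = Callable[[Dict[str, Any]], None]


class TinyEventBus:
    """Hypothetical in-process bus: agents subscribe to event types, nobody directs them."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self._subscribers[event_type].append(handler)

    def publish(self, event_type: str, data: Dict[str, Any]) -> None:
        # Every subscriber of this event type reacts on its own.
        for handler in list(self._subscribers[event_type]):
            handler({"event_type": event_type, "data": data})


bus = TinyEventBus()
log: List[str] = []


def inventory_agent(event: Dict[str, Any]) -> None:
    log.append("inventory: checking stock")
    # The agent announces its result; it does not call the payment agent directly.
    bus.publish("inventory_confirmed", {"order_id": event["data"]["order_id"]})


def payment_agent(event: Dict[str, Any]) -> None:
    log.append("payment: charging order")


bus.subscribe("order_received", inventory_agent)
bus.subscribe("inventory_confirmed", payment_agent)
bus.publish("order_received", {"order_id": "o-1"})
```

No component holds the full workflow: the order of steps emerges from which events each agent listens for and emits.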

# Advanced choreography imports
import asyncio
from typing import Dict, List, Any, Callable, Optional
from dataclasses import dataclass
from datetime import datetime
import logging

These standard-library imports cover async execution, type hints, dataclasses, timestamps, and logging.

# Choreography framework imports
from a2a.protocol import A2AMessage, MessageType
from a2a.router import MessageRouter

logger = logging.getLogger(__name__)

Choreography builds upon the existing A2A infrastructure while adding event-driven coordination capabilities.

Defining Dance Steps: Event Patterns

Each event pattern is like a dance move that triggers when specific conditions are met:

@dataclass
class EventPattern:
    """Defines an event pattern that triggers agent actions."""
    event_type: str
    condition: str              # Python expression to evaluate
    action: str                 # Action to perform when pattern matches
    target_capability: str      # Required capability for handling agent
    priority: int = 1           # Pattern priority (higher = more important)
    correlation_keys: Optional[List[str]] = None  # Keys for event correlation (None becomes [])
    timeout_seconds: int = 300  # Pattern timeout for complex sequences

    def __post_init__(self):
        if self.correlation_keys is None:
            self.correlation_keys = []

Each pattern pairs an event type and a condition with the action to run, the capability the handling agent must have, and an optional priority, list of correlation keys, and timeout.
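As an alternative to the None-plus-__post_init__ idiom, a dataclass can request a fresh list per instance with field(default_factory=list). The sketch below uses a hypothetical copy of the class, named EventPatternSketch, to show that instances do not share the list.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class EventPatternSketch:
    """Illustrative copy of EventPattern using default_factory (an assumption, not the course code)."""
    event_type: str
    condition: str
    action: str
    target_capability: str
    priority: int = 1
    correlation_keys: List[str] = field(default_factory=list)  # fresh list per instance
    timeout_seconds: int = 300


a = EventPatternSketch("order_received", "True", "check_inventory", "inventory_management")
b = EventPatternSketch("payment_failed", "True", "retry_payment", "payment_processing", priority=7)

# Mutating one instance's keys leaves the other untouched.
a.correlation_keys.append("order_id")
```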

@dataclass
class EventSequence:
    """Defines a sequence of events that must occur in order."""
    sequence_id: str
    events: List[str]           # Event types in required order
    action: str                 # Action to trigger when sequence completes
    target_capability: str      # Capability needed for action
    max_interval: int = 60      # Maximum seconds between events (defaulted fields must come last)

An event sequence fires its action only after all listed event types arrive in order, with no more than max_interval seconds between consecutive events.

The Dance Studio: Choreography Engine

The choreography engine watches for events and triggers appropriate responses, like a dance instructor who recognizes when it's time for the next movement:

class ChoreographyEngine:
    """Event-driven choreography engine for agent coordination."""

    def __init__(self, router: MessageRouter):
        self.router = router
        self.event_patterns: List[EventPattern] = []
        self.event_sequences: List[EventSequence] = []
        self.event_history: List[Dict] = []
        self.sequence_states: Dict[str, Dict] = {}  # Track sequence progress
        self.max_history = 1000

        # Advanced event processing
        self.event_correlations: Dict[str, List[Dict]] = {}
        self.pattern_metrics: Dict[str, Dict] = {}

        # Register message handler
        self.router.register_handler("choreography_event", self._handle_choreography_event)

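The engine keeps a bounded event history (max_history entries), per-correlation event lists, in-progress sequence state, and per-pattern metrics, and it registers a handler for choreography_event messages with the router.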
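For local experiments without the A2A stack, a stand-in router can be enough. The sketch below is an assumption: only register_handler(name, handler) mirrors the call shown above, while InMemoryRouter and its dispatch method are hypothetical and not part of a2a.router.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Handler = Callable[[Dict[str, Any]], Awaitable[None]]


class InMemoryRouter:
    """Hypothetical stand-in for MessageRouter, for tests only."""

    def __init__(self) -> None:
        self.handlers: Dict[str, Handler] = {}

    def register_handler(self, message_type: str, handler: Handler) -> None:
        self.handlers[message_type] = handler

    async def dispatch(self, message_type: str, payload: Dict[str, Any]) -> bool:
        # dispatch() is an assumption for local testing, not an A2A API.
        handler = self.handlers.get(message_type)
        if handler is None:
            return False
        await handler(payload)
        return True


received: List[str] = []


async def handle_choreography_event(payload: Dict[str, Any]) -> None:
    received.append(payload["event_type"])


router = InMemoryRouter()
router.register_handler("choreography_event", handle_choreography_event)
delivered = asyncio.run(router.dispatch("choreography_event", {"event_type": "order_received"}))
missing = asyncio.run(router.dispatch("unknown", {}))
```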

Publishing Events: Broadcasting Dance Cues

When something significant happens, agents can publish events that may trigger coordinated responses from other agents:

    async def publish_event(self, event_type: str, event_data: Dict[str, Any],
                          source_agent: Optional[str] = None,
                          correlation_id: Optional[str] = None):
        """Publish an event that may trigger choreographed actions."""

        now = datetime.now()  # Single timestamp so the ID and timestamp fields agree
        event = {
            "event_id": f"evt_{int(now.timestamp() * 1000)}",
            "event_type": event_type,
            "timestamp": now.isoformat(),
            "source_agent": source_agent,
            "correlation_id": correlation_id or f"corr_{event_type}_{now.timestamp()}",
            "data": event_data
        }

        # Add to event history with rotation
        self.event_history.append(event)
        if len(self.event_history) > self.max_history:
            self.event_history.pop(0)

        # Update correlation tracking
        if correlation_id:
            if correlation_id not in self.event_correlations:
                self.event_correlations[correlation_id] = []
            self.event_correlations[correlation_id].append(event)

        logger.info(f"Published event: {event_type} from {source_agent}")

        # Process event patterns and sequences
        await self._process_event_patterns(event)
        await self._process_event_sequences(event)

Publishing appends the event to a bounded history, records it under its correlation ID when the caller supplies one, and then runs pattern and sequence matching.
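The history rotation above removes the oldest entry with list.pop(0), which shifts every remaining element. A possible alternative, shown here as a sketch rather than the course implementation, is collections.deque with maxlen, which drops the oldest entry automatically on append.

```python
from collections import deque

# A bounded history of 3 entries for illustration; the engine uses max_history = 1000.
history = deque(maxlen=3)
for i in range(5):
    history.append({"event_id": f"evt_{i}"})

ids = [e["event_id"] for e in history]

# deque does not support slicing, so take a list copy before slicing recent events.
last_two = list(history)[-2:]
```

One trade-off: code that slices the history directly, such as self.event_history[-10:], would need the list(...) conversion shown in the last line.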

The Intelligence of Pattern Recognition

The engine evaluates complex conditions to determine when patterns should trigger:

    def _evaluate_condition(self, condition: str, event: Dict[str, Any]) -> bool:
        """Evaluate a condition expression against event data."""

        if not condition or condition == "true":
            return True

        try:
            # Create comprehensive evaluation context
            context = {
                "event": event,
                "data": event["data"],
                "source": event.get("source_agent"),
                "timestamp": event.get("timestamp"),
                "correlation_id": event.get("correlation_id"),

                # Advanced context
                "recent_events": self.event_history[-10:],
                "correlated_events": self._get_correlated_events(event),
                "event_count": len(self.event_history),
                "source_event_count": self._count_events_from_source(event.get("source_agent"))
            }

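            # Restricted namespace. This limits accidental misuse but is NOT a
            # sandbox: only evaluate conditions from trusted configuration.
            safe_globals = {
                '__builtins__': {},
                'len': len,
                'str': str,
                'int': int,
                'float': float,
                'bool': bool,
                'abs': abs,
                'min': min,
                'max': max,
                'datetime': datetime  # Needed by time-window conditions
            }
            # Comprehensions may not see names passed as eval() locals
            # (depending on the Python version), so merge the context into globals.
            safe_globals.update(context)

            # strip() lets multi-line, indented condition strings parse cleanly
            result = eval(condition.strip(), safe_globals)
            return bool(result)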

        except Exception as e:
            logger.warning(f"Failed to evaluate condition '{condition}': {e}")
            return False

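The evaluation context exposes the event, its data, recent history, and correlated events to the condition. Note that emptying __builtins__ does not turn eval() into a sandbox: an expression can still reach dangerous objects through attribute access, so conditions should come only from trusted configuration.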
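Where conditions may come from less trusted sources, one option is to parse the expression with the ast module and evaluate only a whitelist of node types. The sketch below is an assumption and not the engine's evaluator: evaluate_simple_condition is a hypothetical helper that supports constants, names looked up in the event data, single comparisons, and and/or/not.

```python
import ast
import operator
from typing import Any, Dict

_COMPARE = {
    ast.Eq: operator.eq, ast.NotEq: operator.ne,
    ast.Lt: operator.lt, ast.LtE: operator.le,
    ast.Gt: operator.gt, ast.GtE: operator.ge,
}


def evaluate_simple_condition(condition: str, data: Dict[str, Any]) -> bool:
    """Evaluate conditions such as "order_value > 1000 and region == 'EU'".

    Bare names are looked up in `data` (missing keys become None).
    Anything outside the whitelist raises ValueError.
    """

    def ev(node: ast.AST) -> Any:
        if isinstance(node, ast.Constant):
            return node.value
        if isinstance(node, ast.Name):
            return data.get(node.id)
        if isinstance(node, ast.BoolOp):
            values = [ev(v) for v in node.values]  # no short-circuiting, for simplicity
            return all(values) if isinstance(node.op, ast.And) else any(values)
        if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.Not):
            return not ev(node.operand)
        if isinstance(node, ast.Compare) and len(node.ops) == 1:
            op = _COMPARE.get(type(node.ops[0]))
            if op is not None:
                return op(ev(node.left), ev(node.comparators[0]))
        raise ValueError(f"unsupported expression: {ast.dump(node)}")

    tree = ast.parse(condition.strip(), mode="eval")
    try:
        return bool(ev(tree.body))
    except TypeError:
        # For example comparing None (a missing key) with a number.
        return False


high_value = evaluate_simple_condition("order_value > 1000", {"order_value": 1500})
```

Function calls, attribute access, and subscripts are rejected, so an expression such as __import__('os') raises ValueError instead of running. The cost is a less expressive condition language than the eval-based version.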

Event Correlation and Analysis

The choreography engine provides sophisticated event correlation capabilities:

    def _get_correlated_events(self, event: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Get events correlated with the given event."""
        correlation_id = event.get("correlation_id")
        if not correlation_id:
            return []

        return self.event_correlations.get(correlation_id, [])

    def _analyze_event_patterns(self, correlation_id: str) -> Dict[str, Any]:
        """Analyze patterns in correlated events."""
        events = self.event_correlations.get(correlation_id, [])
        if not events:
            return {}

        # Time-based analysis
        timestamps = [datetime.fromisoformat(e["timestamp"]) for e in events]
        duration = (max(timestamps) - min(timestamps)).total_seconds()

        # Frequency analysis
        event_types = [e["event_type"] for e in events]
        type_counts = {et: event_types.count(et) for et in set(event_types)}

        # Source analysis
        sources = [e.get("source_agent") for e in events if e.get("source_agent")]
        unique_sources = len(set(sources))

        return {
            "event_count": len(events),
            "duration_seconds": duration,
            "event_types": type_counts,
            "unique_sources": unique_sources,
            "average_interval": duration / max(1, len(events) - 1)
        }

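The analysis summarizes one correlated group of events: how many there were, how long the group spanned, which event types and how many distinct sources took part, and the average gap between events.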
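A worked example with fixed timestamps shows what the summary contains. The function below is a standalone sketch of the same calculation; summarize_correlated is a hypothetical name, and collections.Counter replaces the repeated list.count calls.

```python
from collections import Counter
from datetime import datetime
from typing import Any, Dict, List


def summarize_correlated(events: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Summarize events that share a correlation ID."""
    if not events:
        return {}
    timestamps = [datetime.fromisoformat(e["timestamp"]) for e in events]
    duration = (max(timestamps) - min(timestamps)).total_seconds()
    return {
        "event_count": len(events),
        "duration_seconds": duration,
        "event_types": dict(Counter(e["event_type"] for e in events)),
        # n events have n - 1 gaps; max() avoids division by zero for one event.
        "average_interval": duration / max(1, len(events) - 1),
    }


sample = [
    {"event_type": "order_received", "timestamp": "2024-01-01T12:00:00"},
    {"event_type": "payment_failed", "timestamp": "2024-01-01T12:00:20"},
    {"event_type": "payment_failed", "timestamp": "2024-01-01T12:01:00"},
]
summary = summarize_correlated(sample)
```

Here the three events span 60 seconds with two gaps, so the average interval is 30 seconds.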

Processing Event Sequences

Complex choreography often requires recognition of event sequences:

    async def _process_event_sequences(self, event: Dict[str, Any]):
        """Process event against defined sequences."""

        for sequence in self.event_sequences:
            await self._check_sequence_progress(sequence, event)

    async def _check_sequence_progress(self, sequence: EventSequence, event: Dict[str, Any]):
        """Check if event advances a sequence."""

        if event["event_type"] not in sequence.events:
            return

        sequence_key = f"{sequence.sequence_id}_{event.get('correlation_id', 'default')}"

        # Initialize sequence state if needed
        if sequence_key not in self.sequence_states:
            self.sequence_states[sequence_key] = {
                "sequence": sequence,
                "progress": [],
                "started_at": datetime.now(),
                "last_event_at": None
            }

        state = self.sequence_states[sequence_key]

        # Check if this is the next expected event
        expected_index = len(state["progress"])
        if (expected_index < len(sequence.events) and
            sequence.events[expected_index] == event["event_type"]):

            # Check timing constraint
            if state["last_event_at"]:
                interval = (datetime.now() - state["last_event_at"]).total_seconds()
                if interval > sequence.max_interval:
                    # Sequence timed out, reset
                    logger.warning(f"Sequence {sequence.sequence_id} timed out")
                    del self.sequence_states[sequence_key]
                    return

            # Advance sequence
            state["progress"].append(event)
            state["last_event_at"] = datetime.now()

            # Check if sequence is complete
            if len(state["progress"]) == len(sequence.events):
                await self._trigger_sequence_action(sequence, state)
                del self.sequence_states[sequence_key]

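Each correlation ID gets its own sequence state. An event advances the sequence only if it is the next expected type and arrives within max_interval seconds of the previous one. A late event discards the state, and a completed sequence triggers its action and is then cleared.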
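The ordering and timing rules can be tried in isolation. The sketch below is a simplified, hypothetical SequenceTracker for a single sequence. Unlike the engine code, it takes timestamps as arguments instead of calling datetime.now(), which keeps the example deterministic.

```python
from typing import List, Optional


class SequenceTracker:
    """Tracks one ordered sequence of event types with a maximum gap between events."""

    def __init__(self, expected: List[str], max_interval: float) -> None:
        self.expected = expected
        self.max_interval = max_interval
        self.position = 0
        self.last_seen: Optional[float] = None

    def observe(self, event_type: str, at: float) -> bool:
        """Return True when the final expected event arrives in time."""
        if self.last_seen is not None and at - self.last_seen > self.max_interval:
            # Too slow: forget progress, then let this event start a new attempt.
            self.position, self.last_seen = 0, None
        if event_type != self.expected[self.position]:
            return False  # out-of-order events are ignored
        self.position += 1
        self.last_seen = at
        if self.position == len(self.expected):
            self.position, self.last_seen = 0, None
            return True
        return False


tracker = SequenceTracker(
    ["order_received", "inventory_confirmed", "payment_successful"], max_interval=60
)
results = [
    tracker.observe("order_received", at=0),
    tracker.observe("inventory_confirmed", at=30),
    tracker.observe("payment_successful", at=50),
]

slow = SequenceTracker(["a", "b"], max_interval=10)
slow.observe("a", at=0)
timed_out = slow.observe("b", at=100)  # 100 s gap exceeds the 10 s limit
```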

Advanced Choreography Patterns

Real-world choreography often involves sophisticated patterns:

    async def register_complex_pattern(self, pattern_config: Dict[str, Any]):
        """Register a complex choreography pattern."""

        if pattern_config["type"] == "temporal_sequence":
            # Events must occur within time windows
            pattern = EventPattern(
                event_type=pattern_config["trigger_event"],
                condition=f"""
                    len([e for e in recent_events
                         if e['event_type'] in {pattern_config['required_events']}
                         and (datetime.fromisoformat(event['timestamp']) -
                              datetime.fromisoformat(e['timestamp'])).total_seconds()
                         <= {pattern_config['time_window']}]) >= {len(pattern_config['required_events'])}
                """,
                action=pattern_config["action"],
                target_capability=pattern_config["capability"],
                priority=pattern_config.get("priority", 1)
            )

        elif pattern_config["type"] == "threshold_pattern":
            # Trigger when event count reaches threshold
            pattern = EventPattern(
                event_type=pattern_config["event_type"],
                condition=f"""
                    len([e for e in recent_events
                         if e['event_type'] == '{pattern_config['event_type']}'
                         and (datetime.fromisoformat(event['timestamp']) -
                              datetime.fromisoformat(e['timestamp'])).total_seconds()
                         <= {pattern_config['time_window']}]) >= {pattern_config['threshold']}
                """,
                action=pattern_config["action"],
                target_capability=pattern_config["capability"],
                priority=pattern_config.get("priority", 1)
            )

        elif pattern_config["type"] == "state_transition":
            # Complex state-based transitions
            pattern = EventPattern(
                event_type=pattern_config["event_type"],
                condition=pattern_config["condition"],  # Complex custom condition
                action=pattern_config["action"],
                target_capability=pattern_config["capability"],
                priority=pattern_config.get("priority", 1)
            )

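        else:
            # Fail loudly instead of hitting an unbound `pattern` below
            raise ValueError(f"Unknown pattern type: {pattern_config['type']}")

        self.event_patterns.append(pattern)
        logger.info(f"Registered complex pattern: {pattern_config['type']}")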

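The three pattern types build their conditions as expression strings. temporal_sequence and threshold_pattern count matching events inside a time window over recent_events, which holds only the last 10 events, so a threshold above 10 can never be reached. state_transition passes a caller-supplied condition through unchanged.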
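The threshold condition is easier to read and test as a plain function than as a generated string. The sketch below expresses the same windowed count; threshold_reached is a hypothetical helper and not part of the engine.

```python
from datetime import datetime, timedelta
from typing import Any, Dict, List


def threshold_reached(recent_events: List[Dict[str, Any]], event: Dict[str, Any],
                      event_type: str, time_window: float, threshold: int) -> bool:
    """True if at least `threshold` events of `event_type` fall inside the window."""
    now = datetime.fromisoformat(event["timestamp"])
    hits = [
        e for e in recent_events
        if e["event_type"] == event_type
        and (now - datetime.fromisoformat(e["timestamp"])).total_seconds() <= time_window
    ]
    return len(hits) >= threshold


base = datetime(2024, 1, 1, 12, 0, 0)


def failed_at(offset_seconds: int) -> Dict[str, Any]:
    stamp = (base + timedelta(seconds=offset_seconds)).isoformat()
    return {"event_type": "payment_failed", "timestamp": stamp}


current = failed_at(0)
# The current event is part of the history, as in publish_event above.
recent = [failed_at(-120), failed_at(-30), current]

two_in_a_minute = threshold_reached(recent, current, "payment_failed", 60, 2)
three_in_a_minute = threshold_reached(recent, current, "payment_failed", 60, 3)
```

The event from 120 seconds earlier falls outside the 60-second window, so only two events count.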

A Real-World Dance: E-Commerce Order Processing Choreography

Here's how choreography works in a complex e-commerce order processing system:

def create_ecommerce_choreography() -> List[EventPattern]:
    """Create choreography patterns for e-commerce order processing."""

    return [
        # Order received - trigger inventory check
        EventPattern(
            event_type="order_received",
            condition="data.get('order_value', 0) > 0",
            action="check_inventory",
            target_capability="inventory_management",
            priority=8
        ),

        # Inventory confirmed - trigger payment processing
        EventPattern(
            event_type="inventory_confirmed",
            condition="data.get('items_available') == True",
            action="process_payment",
            target_capability="payment_processing",
            priority=7
        ),

        # Payment successful - trigger fulfillment
        EventPattern(
            event_type="payment_successful",
            condition="data.get('payment_status') == 'confirmed'",
            action="initiate_fulfillment",
            target_capability="order_fulfillment",
            priority=6
        ),

        # High-value order special handling
        EventPattern(
            event_type="order_received",
            condition="data.get('order_value', 0) > 1000",
            action="request_manual_review",
            target_capability="fraud_detection",
            priority=9
        ),

        # Inventory shortage - trigger supplier notification
        EventPattern(
            event_type="inventory_shortage",
            condition="data.get('shortage_quantity', 0) > 0",
            action="notify_suppliers",
            target_capability="supplier_management",
            priority=5
        ),

        # Payment failed - trigger retry or cancellation
        EventPattern(
            event_type="payment_failed",
            condition="data.get('retry_count', 0) < 3",
            action="retry_payment",
            target_capability="payment_processing",
            priority=7
        ),

        # Multiple payment failures - cancel order
        EventPattern(
            event_type="payment_failed",
            condition="data.get('retry_count', 0) >= 3",
            action="cancel_order",
            target_capability="order_management",
            priority=8
        ),

        # Fulfillment complete - trigger shipping
        EventPattern(
            event_type="fulfillment_complete",
            condition="data.get('items_packed') == True",
            action="schedule_shipping",
            target_capability="shipping_management",
            priority=5
        ),

        # Shipping complete - trigger customer notification
        EventPattern(
            event_type="shipping_complete",
            condition="data.get('tracking_number') is not None",
            action="send_tracking_notification",
            target_capability="customer_communication",
            priority=4
        ),

        # Delivery confirmed - trigger review request
        EventPattern(
            event_type="delivery_confirmed",
            condition="True",  # Always trigger for completed deliveries
            action="request_product_review",
            target_capability="customer_engagement",
            priority=2
        )
    ]

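Together these patterns form an order pipeline in which each agent reacts to events on its own, with no central coordinator. Note that one event can match more than one pattern: an order_received event worth more than 1000 triggers both the inventory check and the manual review.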
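The overlap between patterns can be sketched as follows. This example assumes that matching actions are dispatched from highest to lowest priority; the engine's pattern-processing method is not shown in this module, so that ordering is an assumption. Conditions are written as lambdas here instead of strings to keep the example self-contained.

```python
from typing import Any, Callable, Dict, List, Tuple

# (action, priority, condition) triples taken from the order_received patterns.
ORDER_RECEIVED_PATTERNS: List[Tuple[str, int, Callable[[Dict[str, Any]], bool]]] = [
    ("check_inventory", 8, lambda d: d.get("order_value", 0) > 0),
    ("request_manual_review", 9, lambda d: d.get("order_value", 0) > 1000),
]


def matching_actions(data: Dict[str, Any]) -> List[str]:
    """Return the actions whose condition holds, highest priority first (assumed order)."""
    hits = [(priority, action)
            for action, priority, condition in ORDER_RECEIVED_PATTERNS
            if condition(data)]
    return [action for _, action in sorted(hits, reverse=True)]


large_order = matching_actions({"order_value": 1500})
small_order = matching_actions({"order_value": 200})
empty_order = matching_actions({})
```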

Event Sourcing and Replay

Production choreography systems need event sourcing capabilities:

class EventStore:
    """Event store for choreography event sourcing and replay."""

    def __init__(self, storage_backend):
        self.storage = storage_backend
        self.event_streams: Dict[str, List[Dict]] = {}

    async def append_event(self, stream_id: str, event: Dict[str, Any]):
        """Append event to a stream."""

        if stream_id not in self.event_streams:
            self.event_streams[stream_id] = []

        # Add stream metadata
        event_with_metadata = {
            **event,
            "stream_id": stream_id,
            "stream_position": len(self.event_streams[stream_id]),
            "global_position": await self._get_next_global_position()
        }

        self.event_streams[stream_id].append(event_with_metadata)
        await self.storage.persist_event(event_with_metadata)

    async def replay_events(self, choreography_engine: ChoreographyEngine,
                          stream_id: str = None, from_position: int = 0):
        """Replay events to reconstruct choreography state."""

        if stream_id:
            events = self.event_streams.get(stream_id, [])[from_position:]
        else:
            # Replay all events in global order
            events = []
            for stream in self.event_streams.values():
                events.extend(stream)
            events.sort(key=lambda x: x["global_position"])
            events = events[from_position:]

        logger.info(f"Replaying {len(events)} events")

        for event in events:
            await choreography_engine.publish_event(
                event["event_type"],
                event["data"],
                event.get("source_agent"),
                event.get("correlation_id")
            )

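Event sourcing supports recovery, debugging, and auditing. One caveat about the replay shown here: it goes through publish_event, so replayed events receive new IDs and timestamps and pattern matching runs again. Any action with side effects will fire a second time unless handlers are idempotent or replay runs against an engine whose actions are disabled.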
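The EventStore above depends on a storage backend and a global position counter that the module does not define. A minimal in-memory sketch of both could look like the following. InMemoryBackend, MiniEventStore, and the itertools-based counter are assumptions for illustration; only the persist_event call matches the text.

```python
import asyncio
import itertools
from typing import Any, Dict, List


class InMemoryBackend:
    """Hypothetical storage backend that keeps persisted events in a list."""

    def __init__(self) -> None:
        self.persisted: List[Dict[str, Any]] = []

    async def persist_event(self, event: Dict[str, Any]) -> None:
        self.persisted.append(dict(event))


class MiniEventStore:
    """Reduced version of the event store, enough to show stream and global ordering."""

    def __init__(self, backend: InMemoryBackend) -> None:
        self.backend = backend
        self.streams: Dict[str, List[Dict[str, Any]]] = {}
        self._global = itertools.count()  # assumed source of global positions

    async def append_event(self, stream_id: str, event: Dict[str, Any]) -> None:
        stream = self.streams.setdefault(stream_id, [])
        record = {
            **event,
            "stream_id": stream_id,
            "stream_position": len(stream),
            "global_position": next(self._global),
        }
        stream.append(record)
        await self.backend.persist_event(record)

    def in_global_order(self, from_position: int = 0) -> List[Dict[str, Any]]:
        merged = [e for stream in self.streams.values() for e in stream]
        merged.sort(key=lambda e: e["global_position"])
        return merged[from_position:]


async def demo() -> MiniEventStore:
    store = MiniEventStore(InMemoryBackend())
    await store.append_event("order-1", {"event_type": "order_received"})
    await store.append_event("order-2", {"event_type": "order_received"})
    await store.append_event("order-1", {"event_type": "payment_successful"})
    return store


store = asyncio.run(demo())
replay_order = [(e["stream_id"], e["event_type"]) for e in store.in_global_order()]
```

A single in-process counter only works for one store instance; a shared backend would need to assign global positions itself.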

Performance Monitoring and Optimization

Production choreography requires sophisticated monitoring:

    async def generate_choreography_metrics(self) -> Dict[str, Any]:
        """Generate comprehensive choreography performance metrics."""

        # Event volume metrics
        recent_events = [e for e in self.event_history
                        if (datetime.now() - datetime.fromisoformat(e["timestamp"])).total_seconds() <= 3600]

        event_rate = len(recent_events) / 3600  # Events per second

        # Pattern performance metrics
        pattern_stats = {}
        for pattern in self.event_patterns:
            pattern_id = f"{pattern.event_type}:{pattern.action}"
            pattern_stats[pattern_id] = {
                "trigger_count": self.pattern_metrics.get(pattern_id, {}).get("triggers", 0),
                "success_count": self.pattern_metrics.get(pattern_id, {}).get("successes", 0),
                "failure_count": self.pattern_metrics.get(pattern_id, {}).get("failures", 0),
                "average_response_time": self.pattern_metrics.get(pattern_id, {}).get("avg_response_time", 0)
            }

        # Correlation efficiency
        correlation_stats = {
            "active_correlations": len(self.event_correlations),
            "average_events_per_correlation": sum(len(events) for events in self.event_correlations.values()) / max(1, len(self.event_correlations)),
            "longest_correlation": max((len(events) for events in self.event_correlations.values()), default=0)
        }

        # Sequence statistics. Completed sequences are deleted from
        # sequence_states as soon as their action fires, so this count only
        # covers states that are complete but not yet cleaned up.
        sequence_stats = {
            "active_sequences": len(self.sequence_states),
            "completed_sequences": sum(
                1 for state in self.sequence_states.values()
                if len(state["progress"]) == len(state["sequence"].events)
            )
        }

        return {
            "timestamp": datetime.now().isoformat(),
            "event_rate_per_hour": event_rate * 3600,
            "total_events_processed": len(self.event_history),
            "pattern_statistics": pattern_stats,
            "correlation_statistics": correlation_stats,
            "sequence_statistics": sequence_stats,
            "memory_usage": {
                "event_history_size": len(self.event_history),
                "correlation_entries": len(self.event_correlations),
                "sequence_states": len(self.sequence_states)
            }
        }

Comprehensive metrics enable optimization and troubleshooting of production choreography systems.

Production Considerations

Scalability Patterns

Enterprise choreography must handle massive event volumes:

  • Event partitioning: Distributing events across multiple engine instances
  • Stream processing: Using event streams for high-throughput scenarios
  • Caching strategies: Optimizing pattern matching performance
  • Load balancing: Distributing choreography processing load
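As one illustration of event partitioning, events can be routed by a stable hash of their correlation ID so that all events of one workflow reach the same engine instance. This is a sketch under that assumption; partition_for is a hypothetical helper. It uses hashlib because Python's built-in hash() for strings varies between processes.

```python
import hashlib


def partition_for(correlation_id: str, partitions: int) -> int:
    """Map a correlation ID to a partition index in [0, partitions)."""
    digest = hashlib.sha256(correlation_id.encode("utf-8")).digest()
    # Use the first 8 bytes as an unsigned integer, then reduce to the partition count.
    return int.from_bytes(digest[:8], "big") % partitions


first = partition_for("corr_order_42", 8)
second = partition_for("corr_order_42", 8)
```

Changing the number of partitions remaps most IDs, so in-flight sequences would need to be drained or migrated first.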

Reliability Patterns

Production choreography must keep working when events are duplicated, delayed, or fail to process:

  • Event deduplication: Handling duplicate events gracefully
  • Circuit breakers: Preventing cascade failures in event processing
  • Dead letter queues: Managing failed event processing
  • Eventual consistency: Handling distributed state synchronization
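Event deduplication, for instance, can be sketched as a bounded set of recently seen event IDs. The Deduplicator class below is hypothetical. It assumes event IDs are unique per logical event, which the timestamp-based IDs generated earlier do not guarantee for events published within the same millisecond.

```python
from collections import OrderedDict


class Deduplicator:
    """Remembers the most recent event IDs and reports whether an ID is new."""

    def __init__(self, capacity: int = 1000) -> None:
        self.capacity = capacity
        self._seen: "OrderedDict[str, None]" = OrderedDict()

    def is_new(self, event_id: str) -> bool:
        if event_id in self._seen:
            return False
        self._seen[event_id] = None
        if len(self._seen) > self.capacity:
            self._seen.popitem(last=False)  # evict the oldest ID
        return True


dedup = Deduplicator(capacity=2)
outcomes = [
    dedup.is_new("evt_a"),  # first sighting
    dedup.is_new("evt_a"),  # duplicate
    dedup.is_new("evt_b"),
    dedup.is_new("evt_c"),  # evicts evt_a
    dedup.is_new("evt_a"),  # treated as new again after eviction
]
```

The capacity bounds memory at the cost of accepting very late duplicates, as the last call shows.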

Security Considerations

Enterprise choreography needs comprehensive security:

  • Event authentication: Verifying event sources and integrity
  • Pattern authorization: Controlling which agents can register patterns
  • Data privacy: Ensuring sensitive data in events is protected
  • Audit trails: Maintaining complete event processing logs
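Event authentication can be illustrated with an HMAC signature over a canonical JSON encoding of the event. This is a minimal sketch that assumes publisher and consumer share a secret key; sign_event and verify_event are hypothetical helpers, and key distribution is out of scope.

```python
import hashlib
import hmac
import json
from typing import Any, Dict


def sign_event(event: Dict[str, Any], secret: bytes) -> str:
    """Return a hex HMAC-SHA256 signature over a canonical encoding of the event."""
    # sort_keys and fixed separators make the encoding independent of key order.
    payload = json.dumps(event, sort_keys=True, separators=(",", ":")).encode("utf-8")
    return hmac.new(secret, payload, hashlib.sha256).hexdigest()


def verify_event(event: Dict[str, Any], signature: str, secret: bytes) -> bool:
    """Check a signature in constant time."""
    return hmac.compare_digest(sign_event(event, secret), signature)


secret = b"demo-secret-change-me"  # placeholder key for the example only
event = {"event_type": "order_received", "data": {"order_value": 1500}}
signature = sign_event(event, secret)

tampered = {"event_type": "order_received", "data": {"order_value": 15}}
valid = verify_event(event, signature, secret)
forged = verify_event(tampered, signature, secret)
```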

📝 Multiple Choice Test - Session 7

Test your understanding of the concepts covered in this session.

Question 1: What is the primary benefit of the concepts covered in this session?
A) Reduced complexity
B) Improved performance and scalability
C) Lower costs
D) Easier implementation

Question 2: Which approach is recommended for production deployments?
A) Manual configuration
B) Automated systems with proper monitoring
C) Simple setup without monitoring
D) Basic implementation only

Question 3: What is a key consideration when implementing these patterns?
A) Cost optimization only
B) Security, scalability, and maintainability
C) Speed of development only
D) Minimal feature set

Question 4: How should error handling be implemented?
A) Ignore errors
B) Basic try-catch only
C) Comprehensive error handling with logging and recovery
D) Manual error checking

Question 5: What is important for production monitoring?
A) No monitoring needed
B) Basic logs only
C) Comprehensive metrics, alerts, and observability
D) Manual checking only

View Solutions →

