Skip to content

Session 7: Agent-to-Agent Communication (A2A) - Enterprise Multi-Agent Orchestration

Learning Outcomes

By the end of this session, you will be able to: - Understand the Agent2Agent (A2A) protocol standard and Google's multi-agent orchestration framework - Implement JSON-RPC 2.0 based agent communication with enterprise security - Create agent discovery systems using standardized Agent Cards - Build collaborative agent teams with distributed task management - Deploy scalable multi-agent networks with Microsoft Azure AI Foundry integration

Chapter Overview

What You'll Learn: The A2A Protocol Standard

In this session, we'll implement the Agent2Agent (A2A) protocol - the first open standard designed specifically for AI agent collaboration. A2A enables multiple AI agents to communicate, coordinate, and share information across different platforms and organizational boundaries while maintaining enterprise-grade security and observability.

Why This Matters: The 2024-2025 Multi-Agent Revolution

Based on industry research, A2A represents a critical advancement in enterprise AI:

  • Industry Consortium: Google launched A2A with support from 50+ technology partners including Atlassian, Box, Cohere, Intuit, LangChain, MongoDB, PayPal, Salesforce, SAP, ServiceNow, and major consulting firms (Accenture, BCG, McKinsey, PwC, Deloitte)
  • Enterprise Integration: Microsoft committed to A2A support in Azure AI Foundry and Copilot Studio for cross-cloud agent collaboration
  • Protocol Foundation: Built on JSON-RPC 2.0 and HTTP(S) with enterprise features like streaming, push notifications, and standard authentication
  • Task Management Standard: Formal task lifecycle management (submitted, working, completed, failed) with tracking IDs
  • Distributed Memory: Standardized protocol for agents to exchange task-specific information and maintain coherent interaction history

How A2A Stands Out: Enterprise-Grade Agent Orchestration

The Agent2Agent protocol differentiates itself through standardized enterprise features: - Agent Cards: Machine-readable JSON documents advertising capabilities, endpoints, and authentication requirements - Task Lifecycle Management: Structured workflows with states (pending, in progress, completed) and message exchanges - Transport Flexibility: HTTP(S) for standard communication, Server-Sent Events (SSE) for real-time streaming - Security Integration: Enterprise authentication with parity to OpenAPI's authentication schemes - Cross-Platform Compatibility: Enables collaboration across clouds, platforms, and organizational boundaries

Where You'll Apply This: Enterprise Multi-Agent Use Cases

A2A excels in complex enterprise scenarios requiring agent collaboration: - Enterprise Workflow Automation: Agents coordinating across CRM, ERP, and business intelligence systems - Cross-Organizational Collaboration: Secure agent communication between different companies and platforms - Distributed Decision Making: Multiple specialized agents contributing to complex business decisions - Scalable AI Operations: Agent networks that can grow and adapt with changing business requirements - Compliance and Auditing: Standardized communication patterns that support regulatory requirements

A2A Communication Architecture Figure 1: A2A enterprise architecture showing Agent Cards for discovery, JSON-RPC 2.0 communication, distributed task management, and cross-platform coordination enabling scalable multi-agent collaboration

Learning Path Options

Observer Path (30 minutes): Understand A2A protocol concepts and enterprise patterns - Focus: Quick insights into Agent Cards, task management, and distributed coordination - Best for: Getting oriented with enterprise multi-agent architectures

🙋‍♂️ Participant Path (55 minutes): Implement working A2A agents and coordination systems
- Focus: Hands-on Agent Card creation, JSON-RPC communication, and task lifecycle management - Best for: Building practical enterprise multi-agent systems

🛠️ Implementer Path (85 minutes): Advanced enterprise deployment and cross-platform integration - Focus: Azure AI Foundry integration, security patterns, and scalable orchestration - Best for: Enterprise multi-agent architecture and platform integration


Part 1: A2A Protocol Architecture (Observer: 10 min | Participant: 22 min)

The Enterprise Multi-Agent Challenge

As organizations deploy multiple AI agents across different platforms and departments, they face the fundamental challenge of enabling secure, standardized communication between heterogeneous agent systems. A2A solves this through formal protocol standardization.

Enterprise Requirements Addressed: 1. Cross-Platform Discovery: Finding and registering agents across different cloud environments 2. Standardized Communication: JSON-RPC 2.0 based message exchange with enterprise security 3. Task Coordination: Formal lifecycle management for complex multi-agent workflows 4. Distributed Memory: Maintaining coherent interaction history across agent networks 5. Audit and Compliance: Traceable communication patterns for regulatory requirements

OBSERVER PATH: A2A Protocol Standards

Google's A2A Foundation (2024-2025): The Agent2Agent protocol emerged as the first open standard specifically for AI agent collaboration, addressing the collaboration challenges when agents are developed by different organizations.

Core Protocol Components: 1. Agent Cards: JSON documents advertising agent capabilities, endpoints, and requirements 2. Task Management: Formal task lifecycle with states (pending, working, completed, failed) 3. Message Exchange: JSON-RPC 2.0 over HTTP(S) with streaming support via Server-Sent Events 4. Authentication: Enterprise-grade security with OpenAPI authentication scheme parity 5. Distributed Coordination: Cross-platform task coordination with unique tracking identifiers

Industry Consortium Support: With 50+ technology partners including major enterprise platforms (Salesforce, SAP, ServiceNow) and consulting firms (McKinsey, PwC, Deloitte), A2A provides production-ready enterprise adoption.

PARTICIPANT PATH: Implementing A2A JSON-RPC Protocol

Step 1: Standard A2A Message Structure

Implement the JSON-RPC 2.0 based A2A protocol following Google's specification:

# a2a/protocol.py - Google A2A Standard Implementation

from typing import Dict, Any, List, Optional, Union, Literal
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from enum import Enum
import uuid
import json
from jsonrpc import JSONRPCRequestManager, dispatcher

class TaskState(Enum):
    """A2A standard task lifecycle states."""
    SUBMITTED = "submitted"
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

class MessagePriority(Enum):
    """A2A message priority levels for enterprise workflows."""
    LOW = "low"
    NORMAL = "normal"
    HIGH = "high"
    URGENT = "urgent"

@dataclass
class A2ATask:
    """A2A standard task with formal lifecycle management."""

    # Task identification (A2A standard)
    task_id: str
    session_id: str = None
    correlation_id: str = None

    # Task lifecycle
    state: TaskState = TaskState.SUBMITTED
    created_at: datetime = None
    updated_at: datetime = None

    # Task content
    task_type: str = None
    description: str = None
    requirements: Dict[str, Any] = None
    context: Dict[str, Any] = None

    # Agent routing
    requesting_agent: str = None
    assigned_agent: str = None

    # Results and artifacts
    result: Any = None
    artifacts: List[Dict[str, Any]] = None
    error_info: Dict[str, Any] = None

    def __post_init__(self):
        if not self.task_id:
            self.task_id = str(uuid.uuid4())
        if not self.session_id:
            self.session_id = str(uuid.uuid4())
        if not self.created_at:
            self.created_at = datetime.now(timezone.utc)
        if not self.artifacts:
            self.artifacts = []

A2A Standard Benefits: - Task Tracking: Unique identifiers enable distributed task coordination - Lifecycle Management: Formal states provide clear workflow progression - Artifact Support: Structured storage for task outputs and intermediate results - Enterprise Integration: Compatible with existing enterprise task management systems

Step 2: Agent Card Implementation

Implement standardized Agent Cards for discovery:

@dataclass
class AgentCard:
    """A2A standard Agent Card for capability advertisement."""

    # Agent identity
    agent_id: str
    name: str
    version: str
    description: str

    # Capabilities and endpoints
    capabilities: List[str]  # What the agent can do
    endpoints: Dict[str, str]  # Available API endpoints
    supported_protocols: List[str] = None  # ["jsonrpc-2.0", "http", "sse"]

    # Authentication and security
    authentication: Dict[str, Any] = None  # Auth requirements
    rate_limits: Dict[str, Any] = None     # Usage limits

    # Operational metadata
    availability: str = "24/7"  # Availability schedule
    response_time_sla: str = "<1s"  # Expected response time
    max_concurrent_tasks: int = 10

    # Contact and governance
    owner: str = None
    contact_email: str = None
    documentation_url: str = None

    def to_json(self) -> str:
        """Export Agent Card as JSON for A2A discovery."""
        return json.dumps(asdict(self), indent=2, default=str)
# Message metadata
timestamp: str = None
priority: Priority = Priority.NORMAL
ttl: int = 300             # Time to live in seconds

# Content
action: str = None          # What action to perform
payload: Dict[str, Any] = None
capabilities_required: List[str] = None

# Response handling
requires_response: bool = False
timeout: int = 30          # Response timeout in seconds

def __post_init__(self):
    if self.message_id is None:
        self.message_id = str(uuid.uuid4())
    if self.timestamp is None:
        self.timestamp = datetime.now().isoformat()
    if self.payload is None:
        self.payload = {}

def to_dict(self) -> Dict[str, Any]:
    """Convert message to dictionary for serialization."""
    data = asdict(self)
    # Convert enums to their values
    data['message_type'] = self.message_type.value
    data['priority'] = self.priority.value
    return data

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'A2AMessage':
    """Create message from dictionary."""
    # Convert enum values back to enums
    if 'message_type' in data:
        data['message_type'] = MessageType(data['message_type'])
    if 'priority' in data:
        data['priority'] = Priority(data['priority'])

    return cls(**data)

def to_json(self) -> str:
    """Serialize message to JSON."""
    return json.dumps(self.to_dict())

@classmethod
def from_json(cls, json_str: str) -> 'A2AMessage':
    """Deserialize message from JSON."""
    return cls.from_dict(json.loads(json_str))

@dataclass class AgentCapability: """Describes an agent's capability."""

name: str
description: str
input_schema: Dict[str, Any]
output_schema: Dict[str, Any]
cost: float = 0.0           # Resource cost
latency: float = 1.0        # Expected latency in seconds
reliability: float = 0.95   # Reliability score (0-1)

def to_dict(self) -> Dict[str, Any]:
    return asdict(self)

@dataclass class AgentProfile: """Agent profile for discovery and coordination."""

agent_id: str
name: str
description: str
capabilities: List[AgentCapability]
endpoint: str               # How to reach this agent
status: str = "active"      # active, busy, maintenance, offline
load: float = 0.0          # Current load (0-1)  
last_heartbeat: str = None
metadata: Dict[str, Any] = None

def __post_init__(self):
    if self.last_heartbeat is None:
        self.last_heartbeat = datetime.now().isoformat()
    if self.metadata is None:
        self.metadata = {}

def to_dict(self) -> Dict[str, Any]:
    data = asdict(self)
    data['capabilities'] = [cap.to_dict() for cap in self.capabilities]
    return data

```

Step 1.2: Agent Registry and Discovery

The agent registry serves as the central directory for agent discovery and capability matching in an A2A network. It provides dynamic service discovery, health monitoring, and intelligent agent selection based on capabilities and availability.

1.2.1: Registry Foundation and Infrastructure Setup

First, let's establish the registry infrastructure with Redis for scalable, distributed storage: ```python

a2a/registry.py - Registry Foundation

import asyncio import json from typing import Dict, List, Optional, Set from datetime import datetime, timedelta import redis import logging from a2a.protocol import AgentProfile, AgentCapability, A2AMessage, MessageType logger = logging.getLogger(name) ```

Key Concepts: Redis provides fast, distributed storage for agent profiles and capability indices. The foundation supports both single-node and clustered Redis deployments for scalability.

1.2.2: Registry Class and Configuration Management

The registry class manages configuration and connection state for distributed agent tracking:

class AgentRegistry:
    """Centralized agent registry for discovery and coordination."""

    def __init__(self, redis_client):
        self.redis_client = redis_client
        self.registry_prefix = "agent_registry:"
        self.capability_index = "capability_index:"
        self.heartbeat_interval = 30  # seconds
        self.heartbeat_timeout = 90   # seconds

        # Start background tasks
        self._heartbeat_task = None
        self._cleanup_task = None

Key Concepts: The registry uses prefixed keys for organization and capability indices for fast lookup. Heartbeat configuration ensures timely detection of agent failures.

1.2.3: Agent Registration with Capability Indexing

Agent registration creates both profile storage and capability indices for efficient discovery:

    async def register_agent(self, profile: AgentProfile) -> bool:
        """Register an agent in the registry."""
        try:
            # Store agent profile
            profile_key = f"{self.registry_prefix}{profile.agent_id}"
            profile_data = json.dumps(profile.to_dict())

            self.redis_client.set(profile_key, profile_data)
            self.redis_client.expire(profile_key, self.heartbeat_timeout)

            # Index capabilities for fast lookup
            for capability in profile.capabilities:
                cap_key = f"{self.capability_index}{capability.name}"
                self.redis_client.sadd(cap_key, profile.agent_id)
                self.redis_client.expire(cap_key, self.heartbeat_timeout)

            logger.info(f"Registered agent {profile.agent_id} with {len(profile.capabilities)} capabilities")
            return True

        except Exception as e:
            logger.error(f"Failed to register agent {profile.agent_id}: {e}")
            return False

Key Concepts: Registration creates both profile storage and reverse capability indices. TTL expiration ensures automatic cleanup of dead agents, while capability indexing enables fast discovery queries.

1.2.4: Agent Unregistration and Cleanup

Proper unregistration removes both profile data and capability index entries:

    async def unregister_agent(self, agent_id: str) -> bool:
        """Unregister an agent from the registry."""
        try:
            # Get agent profile first to clean up capability indices
            profile = await self.get_agent_profile(agent_id)

            if profile:
                # Remove from capability indices
                for capability in profile.capabilities:
                    cap_key = f"{self.capability_index}{capability.name}"
                    self.redis_client.srem(cap_key, agent_id)

            # Remove agent profile
            profile_key = f"{self.registry_prefix}{agent_id}"
            self.redis_client.delete(profile_key)

            logger.info(f"Unregistered agent {agent_id}")
            return True

        except Exception as e:
            logger.error(f"Failed to unregister agent {agent_id}: {e}")
            return False

Key Concepts: Unregistration requires cleanup of both profile data and capability indices to prevent orphaned references. This ensures accurate discovery results.

1.2.5: Heartbeat Updates and Status Management

Heartbeat updates maintain agent liveness information and current status:

    async def update_heartbeat(self, agent_id: str, load: float = None, 
                             status: str = None) -> bool:
        """Update agent heartbeat and status."""
        try:
            profile = await self.get_agent_profile(agent_id)
            if not profile:
                return False

            # Update profile with new information
            profile.last_heartbeat = datetime.now().isoformat()
            if load is not None:
                profile.load = load
            if status is not None:
                profile.status = status

            # Store updated profile
            profile_key = f"{self.registry_prefix}{agent_id}"
            profile_data = json.dumps(profile.to_dict())

            self.redis_client.set(profile_key, profile_data)
            self.redis_client.expire(profile_key, self.heartbeat_timeout)

            return True

        except Exception as e:
            logger.error(f"Failed to update heartbeat for {agent_id}: {e}")
            return False

Key Concepts: Heartbeats update both liveness timestamps and dynamic status information like current load. This enables intelligent load balancing and availability decisions.

1.2.6: Capability-Based Agent Discovery

Agent discovery uses capability indices for efficient matching of requirements to available agents:

    async def discover_agents(self, required_capabilities: List[str] = None,
                            status_filter: str = "active",
                            max_load: float = 0.8) -> List[AgentProfile]:
        """Discover agents matching the specified criteria."""
        try:
            candidate_agents = set()

            if required_capabilities:
                # Find agents with required capabilities
                for capability in required_capabilities:
                    cap_key = f"{self.capability_index}{capability}"
                    agents_with_cap = self.redis_client.smembers(cap_key)

                    if not candidate_agents:
                        candidate_agents = set(agents_with_cap)
                    else:
                        # Intersection - agents must have ALL required capabilities
                        candidate_agents = candidate_agents.intersection(agents_with_cap)
            else:
                # Get all registered agents
                pattern = f"{self.registry_prefix}*"
                keys = self.redis_client.keys(pattern)
                candidate_agents = {key.decode().split(':')[-1] for key in keys}

Key Concepts: Discovery uses set intersection to find agents with ALL required capabilities. Capability indices make this operation very fast even with many agents.

1.2.7: Agent Filtering and Selection

After capability matching, additional filters ensure only suitable agents are selected:

            # Filter by status and load
            matching_agents = []
            for agent_id in candidate_agents:
                profile = await self.get_agent_profile(agent_id.decode() if isinstance(agent_id, bytes) else agent_id)

                if profile:
                    # Check if agent meets criteria
                    if (profile.status == status_filter and 
                        profile.load <= max_load and
                        self._is_agent_alive(profile)):
                        matching_agents.append(profile)

            # Sort by load (prefer less loaded agents)
            matching_agents.sort(key=lambda x: x.load)

            return matching_agents

        except Exception as e:
            logger.error(f"Failed to discover agents: {e}")
            return []

Key Concepts: Multi-criteria filtering considers status, load, and liveness. Load-based sorting enables intelligent work distribution across available agents.

1.2.8: Profile Retrieval and Data Reconstruction

Profile retrieval reconstructs agent objects from stored JSON data:

    async def get_agent_profile(self, agent_id: str) -> Optional[AgentProfile]:
        """Get detailed profile for a specific agent."""
        try:
            profile_key = f"{self.registry_prefix}{agent_id}"
            profile_data = self.redis_client.get(profile_key)

            if profile_data:
                data = json.loads(profile_data)

                # Reconstruct AgentCapability objects
                capabilities = []
                for cap_data in data['capabilities']:
                    capabilities.append(AgentCapability(**cap_data))

                data['capabilities'] = capabilities
                return AgentProfile(**data)

            return None

        except Exception as e:
            logger.error(f"Failed to get profile for agent {agent_id}: {e}")
            return None

Key Concepts: Profile retrieval handles JSON deserialization and object reconstruction, ensuring type safety and proper data structure restoration.

1.2.9: Agent Liveness Detection

Liveness checking determines if agents are still responsive based on heartbeat timing:

    def _is_agent_alive(self, profile: AgentProfile) -> bool:
        """Check if agent is still alive based on heartbeat."""
        try:
            last_heartbeat = datetime.fromisoformat(profile.last_heartbeat)
            now = datetime.now()

            return (now - last_heartbeat).total_seconds() < self.heartbeat_timeout

        except Exception:
            return False

Key Concepts: Liveness detection uses configurable timeout thresholds to determine agent availability. This prevents routing to dead or unresponsive agents.

1.2.10: Dead Agent Cleanup and Maintenance

Automatic cleanup removes agents that have stopped sending heartbeats:

    async def cleanup_dead_agents(self):
        """Remove agents that haven't sent heartbeats."""
        try:
            pattern = f"{self.registry_prefix}*"
            keys = self.redis_client.keys(pattern)

            for key in keys:
                agent_id = key.decode().split(':')[-1]
                profile = await self.get_agent_profile(agent_id)

                if profile and not self._is_agent_alive(profile):
                    await self.unregister_agent(agent_id)
                    logger.info(f"Cleaned up dead agent: {agent_id}")

        except Exception as e:
            logger.error(f"Error during cleanup: {e}")

Key Concepts: Cleanup operations maintain registry hygiene by removing dead agents. This should run periodically to prevent accumulation of stale entries.

Complete Implementation: For advanced registry features including clustering and metrics, see src/session7/registry.py in the course materials.

Step 1.3: Message Router

The message router is the core component responsible for intelligent message routing between agents in an A2A network. Let's build it step by step, demonstrating the communication progression from discovery to coordination.

1.3.1: Router Foundation and Initialization

First, let's establish the basic router infrastructure with dependency management:

# a2a/router.py - Router Foundation

import asyncio
import json
from typing import Dict, List, Optional, Callable, Any
import aiohttp
import logging
from datetime import datetime, timedelta

from a2a.protocol import A2AMessage, MessageType, Priority
from a2a.registry import AgentRegistry

logger = logging.getLogger(__name__)

Key Concepts: This foundation imports all necessary components for asynchronous message handling, HTTP communication, and integration with our A2A protocol and registry systems.

1.3.2: Router Class Structure and State Management

Next, we define the router class with essential state management for tracking connections and pending operations:

class MessageRouter:
    """Routes messages between agents in an A2A network."""

    def __init__(self, registry: AgentRegistry):
        self.registry = registry
        self.message_handlers: Dict[str, Callable] = {}
        self.pending_requests: Dict[str, asyncio.Future] = {}
        self.message_queue: asyncio.Queue = asyncio.Queue()
        self.session: Optional[aiohttp.ClientSession] = None
        self._processor_task = None

Key Concepts: The router maintains several critical data structures: - message_handlers: Maps actions to their handling functions - pending_requests: Tracks ongoing request-response pairs using futures - message_queue: Asynchronous queue for message processing - session: HTTP session for agent communication

1.3.3: Router Lifecycle Management

Lifecycle management ensures proper startup and cleanup of the router's asynchronous components:

    async def start(self):
        """Start the message router."""
        self.session = aiohttp.ClientSession()
        self._processor_task = asyncio.create_task(self._process_messages())
        logger.info("Message router started")

    async def stop(self):
        """Stop the message router."""
        if self._processor_task:
            self._processor_task.cancel()

        if self.session:
            await self.session.close()

        logger.info("Message router stopped")

Key Concepts: Proper lifecycle management prevents resource leaks and ensures graceful shutdown. The router starts a background task for message processing and maintains an HTTP session pool.

1.3.4: Handler Registration for Action Mapping

Agent capabilities are exposed through registered handlers that map actions to executable functions:

    def register_handler(self, action: str, handler: Callable):
        """Register a message handler for a specific action."""
        self.message_handlers[action] = handler
        logger.info(f"Registered handler for action: {action}")

Key Concepts: This registration system allows agents to expose their capabilities dynamically. When an agent receives a message with a specific action, the router can invoke the appropriate handler.

1.3.5: Direct Message Sending with Response Handling

The core message sending functionality handles both fire-and-forget and request-response patterns:

    async def send_message(self, message: A2AMessage, 
                         wait_for_response: bool = None) -> Optional[A2AMessage]:
        """Send a message to another agent."""

        if wait_for_response is None:
            wait_for_response = message.requires_response

        try:
            # Add to message queue for processing
            await self.message_queue.put(message)

            # If response is required, wait for it
            if wait_for_response:
                future = asyncio.Future()
                self.pending_requests[message.message_id] = future

                try:
                    response = await asyncio.wait_for(future, timeout=message.timeout)
                    return response
                except asyncio.TimeoutError:
                    logger.warning(f"Message {message.message_id} timed out")
                    return None
                finally:
                    self.pending_requests.pop(message.message_id, None)

            return None

        except Exception as e:
            logger.error(f"Failed to send message: {e}")
            return None

Key Concepts: This method demonstrates the request-response pattern in A2A communication. It uses asyncio.Future to handle asynchronous responses and implements timeout handling for reliability.

1.3.6: Broadcast Messaging for Group Coordination

Broadcast functionality enables one-to-many communication for coordinating multiple agents:

    async def broadcast_message(self, message: A2AMessage, 
                              capability_filter: List[str] = None) -> int:
        """Broadcast a message to multiple agents."""

        # Discover target agents
        agents = await self.registry.discover_agents(
            required_capabilities=capability_filter
        )

        sent_count = 0
        for agent in agents:
            # Create individual message for each agent
            agent_message = A2AMessage(
                message_type=MessageType.BROADCAST,
                sender_id=message.sender_id,
                recipient_id=agent.agent_id,
                action=message.action,
                payload=message.payload,
                priority=message.priority
            )

            try:
                await self._route_message(agent_message)
                sent_count += 1
            except Exception as e:
                logger.error(f"Failed to send broadcast to {agent.agent_id}: {e}")

        logger.info(f"Broadcast sent to {sent_count} agents")
        return sent_count

Key Concepts: Broadcasting uses agent discovery to find suitable recipients based on capabilities. Each agent receives an individual message, allowing for personalized communication while maintaining broadcast semantics.

1.3.7: Asynchronous Message Processing Queue

The message processing queue ensures all messages are handled efficiently without blocking:

    async def _process_messages(self):
        """Process messages from the queue."""
        while True:
            try:
                message = await self.message_queue.get()
                await self._route_message(message)
                self.message_queue.task_done()

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error processing message: {e}")

Key Concepts: This background task continuously processes messages from the queue, ensuring non-blocking operation. Error handling prevents one failed message from stopping the entire router.

1.3.8: Intelligent Message Routing Logic

The routing logic determines the best agent to handle each message based on capabilities and availability:

    async def _route_message(self, message: A2AMessage):
        """Route a message to its destination."""

        if message.recipient_id:
            # Direct message to specific agent
            await self._send_to_agent(message, message.recipient_id)
        else:
            # Discover suitable agents
            agents = await self.registry.discover_agents(
                required_capabilities=message.capabilities_required
            )

            if agents:
                # Send to the best available agent (lowest load)
                await self._send_to_agent(message, agents[0].agent_id)
            else:
                logger.warning(f"No suitable agents found for message {message.message_id}")

Key Concepts: Routing implements both direct (recipient specified) and capability-based routing. The system automatically selects the best available agent when no specific recipient is provided.

1.3.9: Agent Communication and Error Handling

Actual agent communication uses HTTP to deliver messages and handle responses:

    async def _send_to_agent(self, message: A2AMessage, agent_id: str):
        """Send message to a specific agent."""

        # Get agent endpoint
        profile = await self.registry.get_agent_profile(agent_id)
        if not profile:
            logger.error(f"Agent {agent_id} not found in registry")
            return

        try:
            # Send HTTP request to agent endpoint
            async with self.session.post(
                f"{profile.endpoint}/a2a/message",
                json=message.to_dict(),
                headers={"Content-Type": "application/json"}
            ) as response:

                if response.status == 200:
                    response_data = await response.json()

                    # Handle response if this was a request
                    if message.requires_response:
                        response_message = A2AMessage.from_dict(response_data)
                        await self._handle_response(response_message)
                else:
                    logger.error(f"Failed to send message to {agent_id}: {response.status}")

        except Exception as e:
            logger.error(f"Error sending message to {agent_id}: {e}")

Key Concepts: HTTP-based communication allows agents to run on different machines. The method handles network errors gracefully and processes responses for request-response patterns.

1.3.10: Response Correlation and Future Resolution

Response handling links incoming responses to their original requests using correlation IDs:

    async def _handle_response(self, response: A2AMessage):
        """Handle a response message."""

        # Check if we have a pending request for this response
        correlation_id = response.correlation_id

        if correlation_id in self.pending_requests:
            future = self.pending_requests[correlation_id]
            if not future.done():
                future.set_result(response)

Key Concepts: Correlation IDs enable asynchronous request-response patterns. When a response arrives, the router uses the correlation ID to complete the corresponding Future, unblocking the waiting sender.

1.3.11: Incoming Message Processing and Action Dispatch

Incoming messages are processed through registered handlers, enabling agents to respond to requests:

    async def handle_incoming_message(self, message_data: Dict[str, Any]) -> Dict[str, Any]:
        """Handle an incoming message from another agent."""

        try:
            message = A2AMessage.from_dict(message_data)

            # Check if this is a response to our request
            if message.message_type == MessageType.RESPONSE:
                await self._handle_response(message)
                return {"status": "response_processed"}

            # Handle the message using registered handlers
            if message.action in self.message_handlers:
                handler = self.message_handlers[message.action]
                result = await handler(message)

                # Send response if required
                if message.requires_response:
                    response = A2AMessage(
                        message_type=MessageType.RESPONSE,
                        correlation_id=message.message_id,
                        sender_id=message.recipient_id,
                        recipient_id=message.sender_id,
                        payload=result
                    )

                    return response.to_dict()

                return {"status": "message_processed"}
            else:
                logger.warning(f"No handler for action: {message.action}")
                return {"error": f"No handler for action: {message.action}"}

        except Exception as e:
            logger.error(f"Error handling incoming message: {e}")
            return {"error": str(e)}

Key Concepts: This method completes the communication cycle by processing incoming messages, dispatching them to appropriate handlers, and generating responses when required. The message type determines whether to process as a new request or as a response to an existing request.

Complete Implementation: For the complete router implementation, see src/session7/router.py in the course materials.


Part 2: Multi-Agent Coordination Patterns (25 minutes)

Step 2.1: Orchestration Pattern

Orchestration provides centralized control over multi-agent workflows, enabling complex coordination patterns where a central orchestrator manages the execution sequence, dependencies, and data flow between agents. Let's build a comprehensive orchestration system step by step.

2.1.1: Workflow Data Structures and Foundation

First, let's define the core data structures that represent workflow steps and overall workflows:

# a2a/orchestrator.py - Foundation and Data Structures

import asyncio
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime
import logging

from a2a.protocol import A2AMessage, MessageType, Priority
from a2a.router import MessageRouter
from a2a.registry import AgentRegistry

logger = logging.getLogger(__name__)

Key Concepts: The foundation imports support asynchronous orchestration with proper logging, type hints, and integration with our A2A communication components.

2.1.2: Workflow Step Definition

Each workflow step represents a unit of work that requires specific agent capabilities:

@dataclass
class WorkflowStep:
    """Represents a step in a workflow."""
    step_id: str
    action: str
    agent_capability: str
    input_mapping: Dict[str, str]  # Map workflow data to step input
    output_mapping: Dict[str, str] # Map step output to workflow data
    dependencies: List[str] = None # Other steps that must complete first
    timeout: int = 30
    retry_count: int = 2

    def __post_init__(self):
        if self.dependencies is None:
            self.dependencies = []

Key Concepts: Workflow steps define the mapping between workflow data and agent inputs/outputs, enabling data flow through the workflow. Dependencies ensure proper execution order.

2.1.3: Workflow Definition Structure

A workflow combines multiple steps with their dependencies and initial data:

@dataclass
class Workflow:
    """Defines a multi-agent workflow."""
    workflow_id: str
    name: str
    description: str
    steps: List[WorkflowStep]
    initial_data: Dict[str, Any] = None

    def __post_init__(self):
        if self.initial_data is None:
            self.initial_data = {}

Key Concepts: Workflows encapsulate the complete definition of a multi-agent process, including metadata and initial context data that flows through the execution.

2.1.4: Orchestrator Class Initialization

The orchestrator manages workflow execution with access to the agent registry and message router:

class WorkflowOrchestrator:
    """Orchestrates multi-agent workflows."""

    def __init__(self, router: MessageRouter, registry: AgentRegistry):
        self.router = router
        self.registry = registry
        self.active_workflows: Dict[str, Dict] = {}

Key Concepts: The orchestrator serves as the central coordination point, maintaining state for active workflows and using the router and registry for agent communication and discovery.

2.1.5: Workflow State Initialization

When starting a workflow, we establish comprehensive state tracking:

    async def execute_workflow(self, workflow: Workflow) -> Dict[str, Any]:
        """Execute a multi-agent workflow."""

        workflow_state = {
            "workflow_id": workflow.workflow_id,
            "status": "running",
            "started_at": datetime.now().isoformat(),
            "data": workflow.initial_data.copy(),
            "completed_steps": set(),
            "failed_steps": set(),
            "step_outputs": {}
        }

        self.active_workflows[workflow.workflow_id] = workflow_state

Key Concepts: Workflow state tracks execution progress, data transformations, and step completion status. This enables recovery, monitoring, and debugging of complex workflows.

2.1.6: Dependency Resolution and Step Selection

The orchestrator continuously evaluates which steps can execute based on dependency satisfaction:

        try:
            # Execute steps based on dependencies
            remaining_steps = workflow.steps.copy()

            while remaining_steps:
                # Find steps that can be executed (all dependencies met)
                ready_steps = [
                    step for step in remaining_steps
                    if all(dep in workflow_state["completed_steps"] 
                          for dep in step.dependencies)
                ]

                if not ready_steps:
                    # Check if we have failed steps blocking progress
                    if workflow_state["failed_steps"]:
                        workflow_state["status"] = "failed"
                        break

                    # Wait for more steps to complete
                    await asyncio.sleep(1)
                    continue

Key Concepts: Dependency resolution ensures proper execution order while enabling parallel execution of independent steps. Failed step detection prevents infinite waiting.

2.1.7: Parallel Step Execution and Result Gathering

Ready steps execute in parallel for optimal performance:

                # Execute ready steps in parallel
                tasks = []
                for step in ready_steps:
                    task = asyncio.create_task(
                        self._execute_step(workflow, step, workflow_state)
                    )
                    tasks.append(task)

                # Wait for all ready steps to complete
                results = await asyncio.gather(*tasks, return_exceptions=True)

Key Concepts: Parallel execution maximizes throughput while maintaining dependency constraints. Exception handling ensures one failed step doesn't crash the entire workflow.

2.1.8: Result Processing and State Management

Step results update workflow state and enable data flow to dependent steps:

                # Process results
                for i, result in enumerate(results):
                    step = ready_steps[i]

                    if isinstance(result, Exception):
                        logger.error(f"Step {step.step_id} failed: {result}")
                        workflow_state["failed_steps"].add(step.step_id)
                    else:
                        workflow_state["completed_steps"].add(step.step_id)
                        workflow_state["step_outputs"][step.step_id] = result

                        # Apply output mapping to workflow data
                        self._apply_output_mapping(step, result, workflow_state["data"])

                # Remove completed and failed steps
                remaining_steps = [
                    step for step in remaining_steps
                    if step.step_id not in workflow_state["completed_steps"] 
                    and step.step_id not in workflow_state["failed_steps"]
                ]

Key Concepts: Result processing updates workflow state, applies output mappings for data flow, and removes completed steps from the execution queue.

2.1.9: Workflow Completion and Status Determination

Final workflow status reflects the overall execution outcome:

            # Determine final status
            if workflow_state["failed_steps"]:
                workflow_state["status"] = "failed"
            else:
                workflow_state["status"] = "completed"

            workflow_state["completed_at"] = datetime.now().isoformat()

            return {
                "workflow_id": workflow.workflow_id,
                "status": workflow_state["status"],
                "data": workflow_state["data"],
                "execution_time": workflow_state["completed_at"],
                "completed_steps": len(workflow_state["completed_steps"]),
                "failed_steps": len(workflow_state["failed_steps"])
            }

Key Concepts: Workflow completion provides comprehensive execution results including final data state, timing information, and success metrics.

2.1.10: Error Handling and Workflow Cleanup

Proper error handling ensures graceful failure recovery:

        except Exception as e:
            logger.error(f"Workflow {workflow.workflow_id} execution failed: {e}")
            workflow_state["status"] = "error"
            workflow_state["error"] = str(e)

            return {
                "workflow_id": workflow.workflow_id,
                "status": "error",
                "error": str(e)
            }

        finally:
            # Cleanup
            self.active_workflows.pop(workflow.workflow_id, None)

Key Concepts: Exception handling captures unexpected errors while cleanup ensures no resource leaks from abandoned workflows.

2.1.11: Individual Step Execution with Agent Discovery

Each step executes by discovering suitable agents and delegating work:

    async def _execute_step(self, workflow: Workflow, step: WorkflowStep, 
                          workflow_state: Dict) -> Any:
        """Execute a single workflow step."""

        # Prepare step input data
        step_input = self._prepare_step_input(step, workflow_state["data"])

        # Find suitable agent
        agents = await self.registry.discover_agents(
            required_capabilities=[step.agent_capability]
        )

        if not agents:
            raise Exception(f"No agents found with capability: {step.agent_capability}")

Key Concepts: Step execution begins with input preparation and agent discovery, ensuring the right agent handles each task based on required capabilities.

2.1.12: Step Execution with Retry Logic and Fault Tolerance

Robust step execution includes retry mechanisms and error handling:

        # Try executing with retries
        last_error = None
        for attempt in range(step.retry_count + 1):
            try:
                # Create message for agent
                message = A2AMessage(
                    sender_id="orchestrator",
                    recipient_id=agents[0].agent_id,  # Use best available agent
                    action=step.action,
                    payload=step_input,
                    requires_response=True,
                    timeout=step.timeout,
                    priority=Priority.HIGH
                )

                # Send message and wait for response
                response = await self.router.send_message(message, wait_for_response=True)

                if response and response.payload:
                    logger.info(f"Step {step.step_id} completed successfully")
                    return response.payload
                else:
                    raise Exception("No response received from agent")

            except Exception as e:
                last_error = e
                logger.warning(f"Step {step.step_id} attempt {attempt + 1} failed: {e}")

                if attempt < step.retry_count:
                    # Try with next available agent if available
                    if len(agents) > attempt + 1:
                        continue
                    await asyncio.sleep(2 ** attempt)  # Exponential backoff

        raise last_error

Key Concepts: Retry logic with exponential backoff and agent failover provides resilience against transient failures and agent unavailability.

2.1.13: Data Mapping and Transformation Utilities

Helper methods handle data flow between workflow context and step inputs/outputs:

    def _prepare_step_input(self, step: WorkflowStep, workflow_data: Dict[str, Any]) -> Dict[str, Any]:
        """Prepare input data for a workflow step."""
        step_input = {}

        for step_param, workflow_key in step.input_mapping.items():
            if workflow_key in workflow_data:
                step_input[step_param] = workflow_data[workflow_key]

        return step_input

    def _apply_output_mapping(self, step: WorkflowStep, step_output: Dict[str, Any], 
                            workflow_data: Dict[str, Any]):
        """Apply step output to workflow data."""

        for workflow_key, step_param in step.output_mapping.items():
            if step_param in step_output:
                workflow_data[workflow_key] = step_output[step_param]

Key Concepts: Data mapping enables seamless data flow through the workflow by transforming between step-specific formats and the shared workflow context.

Complete Implementation: For complete orchestrator examples and advanced patterns, see src/session7/orchestrator.py in the course materials.

Step 2.2: Choreography Pattern

Choreography enables decentralized agent coordination through event-driven patterns, where agents react to events autonomously without central control. This creates flexible, loosely-coupled systems that can adapt dynamically to changing conditions.

2.2.1: Choreography Foundation and Event Structures

First, let's establish the foundation for event-driven coordination:

# a2a/choreography.py - Foundation

import asyncio
from typing import Dict, List, Any, Callable, Optional
from dataclasses import dataclass
from datetime import datetime
import logging

from a2a.protocol import A2AMessage, MessageType
from a2a.router import MessageRouter

logger = logging.getLogger(__name__)

Key Concepts: Choreography relies on event-driven architecture where agents publish events and subscribe to patterns, creating reactive coordination without central orchestration.

2.2.2: Event Pattern Definition

Event patterns define the triggers and actions that enable autonomous agent coordination:

@dataclass
class EventPattern:
    """Defines an event pattern that triggers agent actions."""
    event_type: str
    condition: str              # Python expression to evaluate
    action: str                 # Action to perform when pattern matches
    target_capability: str      # Required capability for handling agent
    priority: int = 1           # Pattern priority (higher = more important)

Key Concepts: Event patterns map event types to actions through conditional logic. Priority ordering ensures important events are processed first during high-load scenarios.

2.2.3: Choreography Engine Initialization

The choreography engine manages event patterns, handlers, and event history:

class ChoreographyEngine:
    """Event-driven choreography engine for agent coordination."""

    def __init__(self, router: MessageRouter):
        self.router = router
        self.event_patterns: List[EventPattern] = []
        self.event_handlers: Dict[str, Callable] = {}
        self.event_history: List[Dict] = []
        self.max_history = 1000

        # Register default message handler
        self.router.register_handler("choreography_event", self._handle_choreography_event)

Key Concepts: The engine maintains event patterns for triggering actions, handlers for processing events, and history for context-aware decision making.

2.2.4: Event Pattern Registration and Management

Pattern registration enables dynamic configuration of choreography behaviors:

    def register_event_pattern(self, pattern: EventPattern):
        """Register an event pattern for choreography."""
        self.event_patterns.append(pattern)
        # Sort by priority (higher priority first)
        self.event_patterns.sort(key=lambda x: x.priority, reverse=True)
        logger.info(f"Registered event pattern: {pattern.event_type}")

    def register_event_handler(self, event_type: str, handler: Callable):
        """Register a handler for specific event types."""
        self.event_handlers[event_type] = handler
        logger.info(f"Registered event handler: {event_type}")

Key Concepts: Pattern registration supports dynamic choreography reconfiguration. Priority-based sorting ensures critical patterns are evaluated first.

2.2.5: Event Publishing and History Management

Event publishing triggers the choreography process and maintains context history:

    async def publish_event(self, event_type: str, event_data: Dict[str, Any], 
                          source_agent: str = None):
        """Publish an event that may trigger choreographed actions."""

        event = {
            "event_id": f"evt_{int(datetime.now().timestamp() * 1000)}",
            "event_type": event_type,
            "timestamp": datetime.now().isoformat(),
            "source_agent": source_agent,
            "data": event_data
        }

        # Add to event history
        self.event_history.append(event)
        if len(self.event_history) > self.max_history:
            self.event_history.pop(0)

        logger.info(f"Published event: {event_type} from {source_agent}")

        # Check event patterns and trigger actions
        await self._process_event_patterns(event)

Key Concepts: Event publishing creates structured event records with unique IDs and timestamps. History management enables pattern matching based on event sequences.

2.2.6: Event Pattern Processing and Matching

Pattern processing evaluates events against registered patterns to determine triggered actions:

    async def _process_event_patterns(self, event: Dict[str, Any]):
        """Process event against registered patterns."""

        triggered_actions = []

        for pattern in self.event_patterns:
            if pattern.event_type == event["event_type"] or pattern.event_type == "*":
                # Evaluate condition
                if self._evaluate_condition(pattern.condition, event):
                    triggered_actions.append(pattern)

        # Execute triggered actions
        for pattern in triggered_actions:
            await self._execute_choreography_action(pattern, event)

Key Concepts: Pattern matching supports both specific event types and wildcard patterns. Condition evaluation enables sophisticated trigger logic based on event content.

2.2.7: Conditional Logic Evaluation

Condition evaluation determines whether patterns should trigger based on event content and context:

    def _evaluate_condition(self, condition: str, event: Dict[str, Any]) -> bool:
        """Evaluate a condition expression against event data."""

        if not condition or condition == "true":
            return True

        try:
            # Create evaluation context
            context = {
                "event": event,
                "data": event["data"],
                "source": event.get("source_agent"),
                "recent_events": self.event_history[-10:]  # Last 10 events
            }

            # Evaluate condition (be careful with eval in production!)
            result = eval(condition, {"__builtins__": {}}, context)
            return bool(result)

        except Exception as e:
            logger.warning(f"Failed to evaluate condition '{condition}': {e}")
            return False

Key Concepts: Condition evaluation provides a rich context including current event data, source agent, and recent event history. Safe evaluation prevents malicious code execution.

2.2.8: Choreography Action Execution

Action execution translates triggered patterns into concrete agent messages:

    async def _execute_choreography_action(self, pattern: EventPattern, event: Dict[str, Any]):
        """Execute a choreography action triggered by an event pattern."""

        try:
            # Create action message
            action_message = A2AMessage(
                sender_id="choreography_engine",
                action=pattern.action,
                payload={
                    "trigger_event": event,
                    "pattern": {
                        "event_type": pattern.event_type,
                        "action": pattern.action
                    }
                },
                capabilities_required=[pattern.target_capability],
                requires_response=False  # Fire and forget for choreography
            )

            # Send to appropriate agent(s)
            await self.router.send_message(action_message)

            logger.info(f"Executed choreography action: {pattern.action}")

        except Exception as e:
            logger.error(f"Failed to execute choreography action {pattern.action}: {e}")

Key Concepts: Action execution creates A2A messages with trigger context. Fire-and-forget messaging supports asynchronous, decoupled coordination.

2.2.9: Incoming Event Message Handling

Event message handling processes choreography events received from other agents:

    async def _handle_choreography_event(self, message: A2AMessage) -> Dict[str, Any]:
        """Handle incoming choreography event messages."""

        event_type = message.payload.get("event_type")
        event_data = message.payload.get("event_data", {})

        if event_type and event_type in self.event_handlers:
            handler = self.event_handlers[event_type]
            result = await handler(message)
            return result

        # Default handling - just acknowledge
        return {"status": "event_received", "event_type": event_type}

Key Concepts: Message handling enables agents to receive and process choreography events from other agents, supporting distributed event-driven coordination.

2.2.10: Practical Choreography Example

Let's see a complete choreography pattern for travel planning coordination:

# Example choreography patterns

def create_travel_planning_choreography() -> List[EventPattern]:
    """Create choreography patterns for travel planning scenario."""

    return [
        EventPattern(
            event_type="trip_requested",
            condition="data.get('destination') is not None",
            action="get_weather_forecast",
            target_capability="weather_forecasting",
            priority=5
        ),
        EventPattern(
            event_type="weather_obtained",
            condition="data.get('temperature') is not None",
            action="find_accommodations", 
            target_capability="accommodation_search",
            priority=4
        ),
        EventPattern(
            event_type="accommodations_found",
            condition="len(data.get('hotels', [])) > 0",
            action="book_transportation",
            target_capability="transportation_booking",
            priority=3
        ),
        EventPattern(
            event_type="transportation_booked",
            condition="data.get('booking_confirmed') == True",
            action="create_itinerary",
            target_capability="itinerary_planning",
            priority=2
        ),
        EventPattern(
            event_type="itinerary_ready",
            condition="data.get('itinerary') is not None",
            action="send_confirmation",
            target_capability="notification_service",
            priority=1
        )
    ]

Key Concepts: This example demonstrates a complete travel planning workflow where each event triggers the next step. Agents coordinate autonomously based on event-driven patterns, creating a flexible, decentralized process.

Complete Implementation: For advanced choreography patterns and complex event processing, see src/session7/choreography.py in the course materials.


Chapter Summary

Congratulations! You've built a comprehensive A2A communication system with the following capabilities:

Communication Infrastructure Implemented

Protocol Layer

  • Standardized messaging with A2AMessage protocol
  • Agent profiles with capability descriptions
  • Message routing with priority and timeout handling
  • Service discovery through centralized registry

Coordination Patterns

  • Orchestration for centralized workflow management
  • Choreography for event-driven agent coordination
  • Multi-step workflows with dependency management
  • Fault tolerance with retries and error handling

Distributed Architecture

  • Agent registry with heartbeat monitoring
  • Message broadcasting for group coordination
  • Load balancing based on agent availability
  • Event patterns for reactive behavior

Testing Your Understanding

Quick Check Questions

  1. What is the primary advantage of A2A communication over direct API calls?
  2. A) Better performance
  3. B) Dynamic discovery and loose coupling
  4. C) Simpler implementation
  5. D) Lower latency

  6. In orchestration patterns, who controls the workflow execution?

  7. A) Each individual agent
  8. B) The first agent in the chain
  9. C) A central orchestrator
  10. D) The user interface

  11. What triggers actions in choreography patterns?

  12. A) Direct commands
  13. B) Scheduled timers
  14. C) Published events
  15. D) User requests

  16. How does agent discovery work in the A2A system?

  17. A) Fixed configuration files
  18. B) Capability-based matching in registry
  19. C) Random selection
  20. D) First-come-first-served

  21. What is the purpose of message correlation IDs?

  22. A) Security authentication
  23. B) Performance optimization
  24. C) Linking requests with responses
  25. D) Message encryption

Practical Exercise

Create a multi-agent customer service system:

class CustomerServiceChoreography:
    """Multi-agent customer service with A2A coordination."""

    def __init__(self, choreography_engine: ChoreographyEngine):
        self.engine = choreography_engine
        self._setup_patterns()

    def _setup_patterns(self):
        """Set up choreography patterns for customer service."""

        patterns = [
            # TODO: Define patterns for:
            # 1. Customer inquiry received -> Route to appropriate agent
            # 2. Technical issue detected -> Escalate to technical support
            # 3. Resolution provided -> Update customer and close ticket
            # 4. Escalation needed -> Notify supervisor agent
        ]

        for pattern in patterns:
            self.engine.register_event_pattern(pattern)

Hint: Check the 🗂️ View Test Solutions → for complete implementations and advanced patterns.


Next Session Preview

In Session 8, we'll explore Advanced Agent Workflows including: - Complex multi-agent orchestration patterns - Agent specialization and role-based coordination
- Performance optimization for large agent networks - Monitoring and observability for distributed agent systems

Homework

  1. Implement peer-to-peer agent discovery without centralized registry
  2. Create conflict resolution mechanisms for competing agent requests
  3. Add message encryption for secure agent communication
  4. Build agent load balancing with health-based routing

Test Your Knowledge

Ready to test your understanding of Agent-to-Agent Communication? Take our comprehensive multiple-choice test to verify your mastery of the concepts.

Multiple Choice Test - Session 7

Test your understanding of Agent-to-Agent Communication:

Question 1: What is the primary purpose of Agent-to-Agent (A2A) communication?

A) To reduce computational costs
B) To improve individual agent performance
C) To enable multiple agents to collaborate and coordinate actions
D) To replace human operators

Question 2: Which message type is used for finding agents with specific capabilities?

A) RESPONSE
B) HEARTBEAT
C) DISCOVERY
D) REQUEST

Question 3: What information is essential for proper A2A message routing?

A) Just the timestamp
B) Only the priority level
C) Only the message content
D) Sender ID, recipient ID, and message type

Question 4: What is the difference between orchestration and choreography in multi-agent systems?

A) Orchestration uses centralized control, choreography uses distributed coordination
B) Choreography requires more memory
C) There is no difference
D) Orchestration is faster than choreography

Question 5: How do agents announce their capabilities in an A2A system?

A) Using ANNOUNCEMENT messages with capability metadata
B) Through manual configuration
C) Via external databases only
D) Through file-based configurations

Question 6: What mechanism ensures A2A communication reliability when agents become unavailable?

A) Faster processing
B) Increased memory allocation
C) Message queuing with retry logic and timeouts
D) Multiple network interfaces

Question 7: What is the purpose of capability negotiation in A2A systems?

A) To improve performance
B) To match agent capabilities with task requirements
C) To simplify configuration
D) To reduce costs

Question 8: When should URGENT priority be used for A2A messages?

A) For time-critical operations requiring immediate attention
B) For data backup operations
C) For routine status updates
D) For all important messages

Question 9: What is the purpose of correlation IDs in A2A messaging?

A) To validate message integrity
B) To encrypt messages
C) To compress message content
D) To link related messages in multi-step workflows

Question 10: What is a key benefit of collaborative agent teams in A2A systems?

A) Diverse expertise and parallel problem-solving capabilities
B) Lower computational requirements
C) Reduced network traffic
D) Simpler implementation

🗂️ View Test Solutions →


Additional Resources

Remember: Great multi-agent systems balance autonomy with coordination, enabling agents to work together while maintaining their independence! 🤖🤝🤖