🎯📝⚙️ Session 7: Agent-to-Agent Communication Hub¶
The Grand Alliance: When Digital Minds Must Unite¶
Picture this: You're witnessing the formation of the United Nations, but instead of human diplomats around a conference table, you're watching AI agents from different organizations, platforms, and purposes coming together for the first time. Each agent speaks its own technical dialect, operates under different protocols, and serves different masters—yet they must find a way to collaborate on challenges too complex for any single mind to solve.
This is the reality of modern AI systems. Your weather prediction agent in the cloud needs to coordinate with a logistics agent on a factory floor. Your data analysis agent running in a hospital must collaborate with research agents in laboratories across the world. Your financial trading agent has to work with risk assessment agents, market data agents, and compliance agents in real-time harmony.
Welcome to Agent-to-Agent (A2A) Communication—where artificial minds learn the art of diplomacy, cooperation, and collective intelligence.
A2A Communication Overview
The following diagram outlines the main classes involved in A2A.
A2A Architecture Overview
🎯📝⚙️ Learning Path Overview¶
This session offers three distinct learning paths designed to match your goals and time investment:
Focus: Understanding concepts and architecture
Activities: Core A2A communication principles, digital diplomacy, embassy system basics
Ideal for: Decision makers, architects, overview learners
Focus: Guided implementation and analysis
Activities: Implement basic A2A systems, embassy networks, message routing
Ideal for: Developers, technical leads, hands-on learners
Focus: Complete implementation and customization
Activities: Enterprise A2A systems, advanced orchestration, choreography patterns
Ideal for: Senior engineers, architects, specialists
🎯 Part 1: The Universal Language of Digital Diplomacy¶
Before agents can collaborate, they need to establish diplomatic protocols—a universal language that transcends the boundaries of individual platforms, organizations, and technical implementations.
The Five Pillars of A2A Civilization¶
Think of A2A as the constitution for a society of digital minds:
- Agent Cards: Like diplomatic credentials, these JSON documents announce what each agent can do, where to find them, and how to work with them
- Task Management: A formal lifecycle that tracks every collaboration from initial request to final resolution
- Message Exchange: JSON-RPC 2.0 over HTTP(S)—the diplomatic language that any agent can speak and understand
- Authentication: Enterprise-grade security that ensures only authorized agents can participate in sensitive collaborations
- Distributed Coordination: The ability to track complex, multi-step collaborations across organizational boundaries
The DNA of Agent Diplomacy: Message Structure¶
Every conversation between agents follows a carefully designed diplomatic protocol. Like formal diplomatic correspondence, each message contains essential metadata for proper routing, processing, and response.
The A2A protocol defines essential data structures for agent communication:
# Core A2A imports and enums
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
import uuid
These imports establish the foundation for type-safe A2A communication with proper data validation.
# Task lifecycle states
class TaskState(Enum):
SUBMITTED = "submitted"
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
Task states provide clear lifecycle management for complex multi-agent workflows.
# Message priority levels
class MessagePriority(Enum):
LOW = "low"
NORMAL = "normal"
HIGH = "high"
URGENT = "urgent"
Priority levels enable proper message scheduling and resource allocation in busy agent networks.
# A2A Task data structure
@dataclass
class A2ATask:
# Core identification
task_id: str
session_id: str = None
correlation_id: str = None
# Lifecycle management
state: TaskState = TaskState.SUBMITTED
created_at: datetime = None
updated_at: datetime = None
The task structure provides comprehensive tracking and correlation capabilities for distributed agent operations.
Digital Passports: Agent Card Implementation¶
Each agent in the A2A ecosystem carries a digital passport—an Agent Card that serves as both identification and capability advertisement.
Agent Cards serve as digital passports in the A2A ecosystem:
# Agent Card structure - Part 1: Identity
@dataclass
class AgentCard:
# Core agent identity
agent_id: str
name: str
version: str
description: str
The identity section provides essential information for agent discovery and selection.
# Capability advertisement
capabilities: List[str]
endpoints: Dict[str, str]
supported_protocols: List[str] = None
Capability information enables intelligent agent matching for complex tasks.
# Security and operational limits
authentication: Dict[str, Any] = None
rate_limits: Dict[str, Any] = None
availability: str = "24/7"
response_time_sla: str = "<1s"
max_concurrent_tasks: int = 10
Operational metadata ensures reliable service level agreements between collaborating agents.
def to_json(self) -> str:
"""Export Agent Card as JSON."""
return json.dumps(asdict(self),
indent=2, default=str)
JSON serialization enables cross-platform agent card exchange in distributed systems.
This Agent Card is like a business card, resume, and service contract all rolled into one. Other agents can examine it to understand not just what this agent can do, but how to work with it professionally.
📝 Part 2: The Embassy System: Agent Registry and Discovery¶
Prerequisites: Complete 🎯 Observer Path sections above
Imagine a world where every embassy maintains a comprehensive directory of all diplomatic personnel from every nation. That's exactly what the Agent Registry provides—a living, breathing directory where agents announce their presence, capabilities, and availability.
The Heart of Digital Diplomacy¶
The registry foundation requires several key components:
# Registry foundation imports
import asyncio
import json
from typing import Dict, List, Optional, Set
from datetime import datetime, timedelta
import redis
import logging
These imports provide the infrastructure for distributed agent registration and discovery.
# Core registry configuration
class AgentRegistry:
def __init__(self, redis_client):
self.redis_client = redis_client
self.registry_prefix = "agent_registry:"
self.capability_index = "capability_index:"
self.heartbeat_interval = 30 # seconds
self.heartbeat_timeout = 90 # seconds
Redis provides the backbone of our embassy system—fast, distributed storage that can scale from a single server to a global network of interconnected registries. The registry uses a sophisticated indexing system with capability indices—specialized phone books for finding agents with specific skills.
The Registration Ceremony: Joining the Digital Society¶
When an agent joins the A2A network, it goes through a formal registration process, much like a diplomat presenting credentials at a new embassy:
async def register_agent(self, profile: AgentProfile) -> bool:
"""Register an agent in the registry."""
try:
# Store agent profile with expiration
profile_key = f"{self.registry_prefix}{profile.agent_id}"
profile_data = json.dumps(profile.to_dict())
self.redis_client.set(profile_key, profile_data)
self.redis_client.expire(profile_key, self.heartbeat_timeout)
Registration stores the complete agent profile with automatic expiration to prevent stale entries.
# Index capabilities for fast lookup
for capability in profile.capabilities:
cap_key = f"{self.capability_index}{capability.name}"
self.redis_client.sadd(cap_key, profile.agent_id)
self.redis_client.expire(cap_key, self.heartbeat_timeout)
return True
except Exception as e:
logger.error(f"Failed to register {profile.agent_id}: {e}")
return False
Capability indexing enables instant discovery of agents with specific skills, essential for large-scale deployments.
Graceful Departures: The Unregistration Process¶
When an agent needs to leave the network—whether for maintenance, shutdown, or migration—it can formally announce its departure:
async def unregister_agent(self, agent_id: str) -> bool:
"""Unregister an agent from the registry."""
try:
# Get agent profile first
profile = await self.get_agent_profile(agent_id)
if profile:
# Remove from capability indices
for capability in profile.capabilities:
cap_key = f"{self.capability_index}{capability.name}"
self.redis_client.srem(cap_key, agent_id)
Graceful unregistration maintains registry consistency by cleaning up all associated indices.
The Pulse of Digital Life: Heartbeat Management¶
Like embassies maintaining contact with their home countries, agents must regularly send heartbeats to prove they're still operational:
async def update_heartbeat(self, agent_id: str, load: float = None) -> bool:
"""Update agent heartbeat and status."""
try:
profile = await self.get_agent_profile(agent_id)
if not profile:
return False
# Update profile with current status
profile.last_heartbeat = datetime.now().isoformat()
if load is not None:
profile.load = load
# Store updated profile
profile_key = f"{self.registry_prefix}{agent_id}"
self.redis_client.set(profile_key,
json.dumps(profile.to_dict()))
return True
except Exception as e:
logger.error(f"Failed to update heartbeat: {e}")
return False
Heartbeat updates maintain real-time agent status for reliable load balancing and failure detection.
Intelligent Discovery: Finding the Right Partner¶
The discovery process is like having a master diplomat who knows exactly which expert to call for any given challenge:
async def discover_agents(self, required_capabilities: List[str] = None,
status_filter: str = "active",
max_load: float = 0.8) -> List[AgentProfile]:
"""Discover agents matching criteria."""
try:
candidate_agents = set()
if required_capabilities:
# Find agents with required capabilities
for capability in required_capabilities:
cap_key = f"{self.capability_index}{capability}"
agents_with_cap = self.redis_client.smembers(cap_key)
if not candidate_agents:
candidate_agents = set(agents_with_cap)
else:
# Must have ALL required capabilities
candidate_agents = candidate_agents.intersection(agents_with_cap)
Discovery uses set operations to efficiently find agents that meet all specified criteria.
# Filter by status and load
matching_agents = []
for agent_id in candidate_agents:
profile = await self.get_agent_profile(agent_id)
if (profile and
profile.status == status_filter and
profile.load <= max_load and
self._is_agent_alive(profile)):
matching_agents.append(profile)
# Sort by load (prefer less loaded agents)
matching_agents.sort(key=lambda x: x.load)
return matching_agents
except Exception as e:
logger.error(f"Failed to discover agents: {e}")
return []
Intelligent sorting ensures optimal agent selection based on current load and availability.
📝 Part 3: The Message Routing Network: Digital Post Office¶
Prerequisites: Complete 🎯 Observer and 📝 Participant Registry sections above
Think of the Message Router as the world's most sophisticated post office, capable of routing millions of messages between agents while ensuring perfect delivery, handling timeouts, and managing responses.
The Foundation of Digital Communication¶
# Router foundation imports
import asyncio
import json
from typing import Dict, List, Optional, Callable, Any
import aiohttp
import logging
from datetime import datetime, timedelta
These imports provide asynchronous HTTP communication capabilities for reliable message delivery.
# Message router core structure
class MessageRouter:
def __init__(self, registry: AgentRegistry):
self.registry = registry
self.message_handlers: Dict[str, Callable] = {}
self.pending_requests: Dict[str, asyncio.Future] = {}
self.message_queue: asyncio.Queue = asyncio.Queue()
self.session: Optional[aiohttp.ClientSession] = None
The router maintains critical data structures for agent discovery, message handling, request tracking, and queue management.
The Art of Message Sending¶
Sending a message in the A2A world is like sending a diplomatic cable—it must be properly formatted, routed through the correct channels, and tracked until completion:
async def send_message(self, message: A2AMessage,
wait_for_response: bool = None) -> Optional[A2AMessage]:
"""Send a message to another agent."""
if wait_for_response is None:
wait_for_response = message.requires_response
try:
# Add to message queue for processing
await self.message_queue.put(message)
# If response is required, wait for it
if wait_for_response:
future = asyncio.Future()
self.pending_requests[message.message_id] = future
Message queuing with response tracking ensures reliable delivery and proper timeout handling.
try:
response = await asyncio.wait_for(future, timeout=message.timeout)
return response
except asyncio.TimeoutError:
logger.warning(f"Message {message.message_id} timed out")
return None
finally:
self.pending_requests.pop(message.message_id, None)
except Exception as e:
logger.error(f"Failed to send message: {e}")
return None
Proper timeout handling and cleanup prevent resource leaks in high-volume environments.
Broadcasting: Speaking to the Masses¶
Sometimes, an agent needs to communicate with multiple agents simultaneously—like a UN secretary-general addressing all member nations:
async def broadcast_message(self, message: A2AMessage,
capability_filter: List[str] = None) -> int:
"""Broadcast a message to multiple agents."""
# Discover target agents
agents = await self.registry.discover_agents(
required_capabilities=capability_filter
)
sent_count = 0
for agent in agents:
try:
# Create individual message for each agent
agent_message = A2AMessage(
sender_id=message.sender_id,
recipient_id=agent.agent_id,
action=message.action,
payload=message.payload
)
The broadcast function first discovers eligible agents and then creates individual messages for each target agent.
await self._route_message(agent_message)
sent_count += 1
except Exception as e:
logger.error(f"Failed to send to {agent.agent_id}: {e}")
return sent_count
Broadcasting enables efficient one-to-many communication patterns essential for coordinated agent activities.
The Message Processing Engine¶
Behind the scenes, a dedicated task continuously processes the message queue, ensuring that every message reaches its intended destination:
async def _route_message(self, message: A2AMessage):
"""Route a message to its destination."""
if message.recipient_id:
# Direct message to specific agent
await self._send_to_agent(message, message.recipient_id)
else:
# Discover suitable agents
agents = await self.registry.discover_agents(
required_capabilities=message.capabilities_required
)
if agents:
# Send to best available agent (lowest load)
await self._send_to_agent(message, agents[0].agent_id)
else:
logger.warning(f"No suitable agents found for {message.message_id}")
Intelligent routing supports both direct addressing and capability-based agent selection.
🎯📝 Practice Exercises¶
Exercise 1: Basic Agent Registration¶
Create a simple agent that registers itself with the A2A registry and responds to heartbeat requests.
Exercise 2: Message Exchange¶
Implement a basic request-response pattern between two agents using the A2A message format.
Exercise 3: Capability Discovery¶
Build a discovery system that finds agents based on required capabilities and load constraints.
Advanced A2A Patterns¶
For comprehensive coverage of advanced A2A coordination patterns:
- ⚙️ Advanced Orchestration Patterns - Centralized workflow management
- ⚙️ Advanced Choreography Systems - Distributed event-driven coordination
These advanced modules cover enterprise-scale orchestration and choreography patterns for complex multi-agent systems.
Test Your Diplomatic Skills¶
Before we venture into advanced workflows, let's ensure you've mastered the art of agent diplomacy:
Question 1: What is the primary purpose of Agent-to-Agent (A2A) communication?
A) To reduce computational costs
B) To improve individual agent performance
C) To enable multiple agents to collaborate and coordinate actions
D) To replace human operators
Question 2: Which component serves as a "digital passport" in A2A systems?
A) Message Router
B) Agent Card
C) Task State
D) Priority Queue
Question 3: What information is essential for proper A2A message routing?
A) Just the timestamp
B) Only the priority level
C) Only the message content
D) Sender ID, recipient ID, and message type
Question 4: How do agents announce their capabilities in an A2A system?
A) Through manual configuration
B) Via external databases only
C) Through Agent Cards with capability metadata
D) Through file-based configurations
Question 5: What mechanism ensures A2A communication reliability when agents become unavailable?
A) Faster processing
B) Increased memory allocation
C) Message queuing with retry logic and timeouts
D) Multiple network interfaces
Question 6: What is the purpose of capability negotiation in A2A systems?
A) To improve performance
B) To match agent capabilities with task requirements
C) To simplify configuration
D) To reduce costs
Question 7: When should URGENT priority be used for A2A messages?
A) For time-critical operations requiring immediate attention
B) For data backup operations
C) For routine status updates
D) For all important messages
Question 8: What is the purpose of correlation IDs in A2A messaging?
A) To validate message integrity
B) To encrypt messages
C) To compress message content
D) To link related messages in multi-step workflows
Question 9: What is a key benefit of collaborative agent teams in A2A systems?
A) Diverse expertise and parallel problem-solving capabilities
B) Lower computational requirements
C) Reduced network traffic
D) Simpler implementation
Question 10: How does the Agent Registry maintain data consistency?
A) Through manual updates
B) Via heartbeat monitoring and automatic cleanup
C) Using external synchronization services
D) Through periodic full rebuilds
🧭 Navigation¶
Previous: Session 6 - Modular Architecture →
Next: Session 8 - Production Ready →