📝 Session 9: Practical Coordination - ReAct Implementation & Communication

📝 PARTICIPANT PATH CONTENT

Prerequisites: Complete 🎯 Multi-Agent Patterns
Time Investment: 1.5-2 hours
Outcome: Practical implementation of ReAct patterns and multi-agent communication

Learning Outcomes

After completing this module, you will:

  • Implement a complete ReAct agent for data processing with execution monitoring
  • Build multi-agent communication systems with message routing and validation
  • Create consensus mechanisms for distributed agent coordination
  • Understand practical deployment patterns for coordinated agent networks

Practical ReAct Implementation - Advanced Reasoning Engine

Building on the essential concepts, let's implement a complete ReAct system with monitoring and quality assessment:

File: src/session9/reflection_engine.py - Advanced data processing reasoning patterns

Complete ReAct Execution Flow

async def _execute_data_reasoning_step(
    self, context: str, step_num: int
) -> ReActStep:
    """Execute a single ReAct reasoning step for data processing"""

    # Generate thought based on current data context
    thought = await self._generate_data_processing_thought(context)

    # Determine action based on data pipeline requirements
    action_decision = await self._decide_next_data_action(thought, context)
    action_type = ActionType(action_decision['action'])
    action_input = action_decision['input']

The reasoning step execution begins with thought generation followed by action planning. This two-phase approach mirrors human problem-solving - first understanding the situation, then deciding what to do about it.

    # Execute data action and get observation
    observation = await self._execute_data_action(action_type, action_input)

    # Calculate data quality confidence for this step
    data_quality_score = await self._calculate_data_quality_confidence(
        thought, action_type, observation
    )

    return ReActStep(
        step_number=step_num,
        thought=thought,
        action=action_type,
        action_input=action_input,
        observation=observation,
        data_quality_score=data_quality_score
    )

Action execution and quality assessment complete the reasoning cycle. The observation captures what actually happened when the action was performed, while the quality score provides a confidence assessment for that step.
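The ReActStep container and ActionType enum used above are not defined in this excerpt. A minimal sketch of what they might look like, with field names following the constructor call above and the enum values assumed:

# Hypothetical supporting types for the ReAct excerpts (field names follow
# the ReActStep constructor above; enum values and types are assumptions).
from dataclasses import dataclass
from enum import Enum
from typing import Any


class ActionType(Enum):
    """Data processing actions the ReAct agent can choose between."""
    ANALYZE_SCHEMA = "analyze_schema"
    VALIDATE_DATA = "validate_data"
    TRANSFORM_DATA = "transform_data"
    FINALIZE = "finalize"


@dataclass
class ReActStep:
    """One thought-action-observation cycle with its quality score."""
    step_number: int
    thought: str
    action: ActionType
    action_input: Any
    observation: str
    data_quality_score: float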

Systematic Thought Generation Framework

async def _generate_data_processing_thought(self, context: str) -> str:
    """Generate systematic thought with data processing framework"""
    prompt = f"""
    Current data context: {context}
    Recent processing history: {self._get_recent_data_history_summary()}

    Think systematically about data processing:
    1. What do I understand about this data schema and quality?
    2. What data transformation gaps remain?
    3. What's the most productive next processing action?
    4. What data consistency risks should I consider?

    Provide clear reasoning for the next data processing step:
    """
    return await self.llm.generate(prompt)

Systematic thought generation uses structured prompts to guide reasoning quality. The framework questions ensure the agent considers data schema, transformation requirements, next actions, and consistency risks.
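The prompt also references a _get_recent_data_history_summary helper that is not shown here. A minimal sketch, assuming the agent keeps its completed steps in a reasoning_history list:

def _get_recent_data_history_summary(self, max_steps: int = 3) -> str:
    """Summarize recent reasoning steps for prompt context (assumed helper)."""
    recent = self.reasoning_history[-max_steps:]  # assumes the agent stores ReActStep objects here
    if not recent:
        return "No processing history yet."
    lines = [
        f"Step {step.step_number}: {step.action.value} -> "
        f"quality {step.data_quality_score:.2f}, observed: {step.observation[:80]}"
        for step in recent
    ]
    return "\n".join(lines)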

Meta-Reasoning Quality Assessment

Evaluating data processing reasoning quality for continuous improvement:

class MetaDataReActAnalyzer:
    """Analyzes and improves data processing ReAct reasoning quality"""

    def __init__(self, llm_client):
        self.llm = llm_client

    async def analyze_data_reasoning_quality(
        self, reasoning_history: List[ReActStep]
    ) -> Dict[str, Any]:
        """Analyze data processing reasoning chain quality"""

        if len(reasoning_history) < 2:
            return {'quality_score': 0.5, 'issues': []}

        # Detect circular data processing patterns
        circular_analysis = await self._detect_circular_data_processing(reasoning_history)

        # Assess data transformation progress quality
        progress_analysis = await self._assess_data_progress_quality(reasoning_history)

        # Evaluate data quality confidence patterns
        quality_analysis = await self._analyze_data_quality_patterns(reasoning_history)

        return {
            'quality_score': self._calculate_overall_data_quality(
                circular_analysis, progress_analysis, quality_analysis
            ),
            'circular_processing': circular_analysis,
            'progress_quality': progress_analysis,
            'data_quality_patterns': quality_analysis,
            'recommendations': await self._generate_data_improvement_recommendations(
                reasoning_history
            )
        }

Multi-dimensional quality assessment examines different aspects of reasoning effectiveness: circular pattern detection identifies when agents get stuck in loops, progress analysis measures forward momentum toward goals, and quality-pattern analysis tracks how confidence scores evolve across steps.
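The aggregation step is not shown above. One plausible implementation of _calculate_overall_data_quality is a weighted blend of the three analyses; the weights, field names, and circular-processing penalty below are assumptions:

def _calculate_overall_data_quality(
    self,
    circular_analysis: Dict[str, Any],
    progress_analysis: Dict[str, Any],
    quality_analysis: Dict[str, Any],
) -> float:
    """Blend the three analyses into a single 0-1 score (assumed weighting)."""
    progress_score = progress_analysis.get('score', 0.5)                # assumed field
    confidence_score = quality_analysis.get('average_confidence', 0.5)  # assumed field

    # Progress toward the goal weighted highest, confidence patterns second.
    score = 0.6 * progress_score + 0.4 * confidence_score

    # Penalize detected loops heavily; repeated actions rarely add value.
    if circular_analysis.get('has_circular_processing'):
        score *= 0.5

    return max(0.0, min(1.0, score))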

Circular Pattern Detection

async def _detect_circular_data_processing(
    self, history: List[ReActStep]
) -> Dict[str, Any]:
    """Detect if agent is stuck in data processing loops"""
    recent_steps = history[-4:]  # Examine last 4 steps
    action_sequence = [step.action for step in recent_steps]

    # Check for repeated data processing action patterns
    if len(set(action_sequence)) <= 2 and len(action_sequence) >= 3:
        return {
            'has_circular_processing': True,
            'pattern': action_sequence,
            'severity': 'high'
        }

    return {'has_circular_processing': False}

Circular pattern detection prevents agents from wasting time in unproductive loops. By examining recent action sequences, the analyzer identifies when agents repeat the same actions without making progress.
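For example, a history whose last four actions alternate between only two action types is flagged as circular. A usage sketch, assuming the ReActStep and ActionType definitions sketched earlier and an async calling context:

# Usage sketch (inside an async function; llm_client is created elsewhere).
analyzer = MetaDataReActAnalyzer(llm_client)

looping_history = [
    ReActStep(i, "thought", action, {}, "observation", 0.5)
    for i, action in enumerate([
        ActionType.VALIDATE_DATA, ActionType.TRANSFORM_DATA,
        ActionType.VALIDATE_DATA, ActionType.TRANSFORM_DATA,
    ])
]

result = await analyzer._detect_circular_data_processing(looping_history)
# result -> {'has_circular_processing': True, 'pattern': [...], 'severity': 'high'}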

Multi-Agent Communication Implementation

Building practical communication systems for coordinated agent networks:

Advanced Communication Hub

class DataCommunicationHub:
    """Central coordination hub for multi-agent data processing communication"""

    def __init__(self):
        self.data_agents: Dict[str, 'BaseDataAgent'] = {}
        self.message_queue: List[DataAgentMessage] = []
        self.active_data_conversations: Dict[str, List[DataAgentMessage]] = {}
        self.data_lineage_tracking: Dict[str, Dict[str, Any]] = {}

    async def register_data_agent(self, agent: 'BaseDataAgent'):
        """Register data processing agent with communication hub"""
        self.data_agents[agent.agent_id] = agent
        await agent.set_data_communication_hub(self)

The communication hub serves as the central nervous system for multi-agent data processing. It maintains agent registries, message queues, conversation threads, and data lineage tracking.
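The DataAgentMessage type routed by the hub is not defined in this excerpt. A minimal sketch, assuming the fields used by send_data_message below plus a payload and message type:

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, Optional


@dataclass
class DataAgentMessage:
    """Message exchanged between data processing agents (assumed fields)."""
    sender_id: str
    recipient_id: str
    message_type: str                       # e.g. "schema_proposal", "quality_report"
    payload: Dict[str, Any]
    conversation_id: Optional[str] = None   # thread grouping used by the hub
    timestamp: datetime = field(default_factory=datetime.now)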

Message Delivery with Lineage Tracking

async def send_data_message(self, message: DataAgentMessage) -> bool:
    """Send data processing message with delivery confirmation and lineage tracking"""

    # Validate recipient exists
    if message.recipient_id not in self.data_agents:
        return False

    # Track data lineage for this message
    await self._track_data_lineage(message)

    # Add to conversation thread
    if message.conversation_id:
        if message.conversation_id not in self.active_data_conversations:
            self.active_data_conversations[message.conversation_id] = []
        self.active_data_conversations[message.conversation_id].append(message)

    # Deliver message
    recipient = self.data_agents[message.recipient_id]
    success = await recipient.receive_data_message(message)

    return success

Message validation and lineage tracking form the foundation of reliable multi-agent communication. Recipient validation prevents message loss, while data lineage tracking maintains complete audit trails.
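The lineage helper itself is not shown. A minimal sketch of _track_data_lineage, assuming it appends one audit entry per message to the hub's data_lineage_tracking dictionary and uses the message fields sketched above:

async def _track_data_lineage(self, message: DataAgentMessage) -> None:
    """Record an audit trail entry for this message (assumed implementation)."""
    lineage_key = message.conversation_id or f"{message.sender_id}->{message.recipient_id}"
    entry = {
        'sender': message.sender_id,
        'recipient': message.recipient_id,
        'message_type': message.message_type,
        'timestamp': message.timestamp,
    }
    self.data_lineage_tracking.setdefault(lineage_key, {'messages': []})
    self.data_lineage_tracking[lineage_key]['messages'].append(entry)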

Practical Consensus Implementation

class DataConsensusManager:
    """Basic consensus mechanisms for multi-agent data processing decisions"""

    def __init__(self, agents: List['BaseDataAgent'], threshold: float = 0.67):
        self.data_agents = agents
        self.consensus_threshold = threshold
        self.data_voting_history: List[Dict[str, Any]] = []

    async def data_schema_consensus(
        self, schema_proposal: str, data_context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Schema validation consensus across data processing agents"""

        # Collect schema validation proposals from all agents
        proposals = await self._collect_schema_proposals(schema_proposal, data_context)

        # Conduct data validation voting round
        votes = await self._conduct_schema_voting_round(proposals, data_context)

        # Count votes and determine schema acceptance
        vote_counts = self._count_schema_votes(votes)
        if not vote_counts:
            return {'consensus_reached': False, 'reason': 'No valid votes received'}
        winner = max(vote_counts.items(), key=lambda x: x[1])

        # Check if data consensus threshold met
        total_votes = sum(vote_counts.values())
        if winner[1] / total_votes >= self.consensus_threshold:
            return {
                'consensus_reached': True,
                'schema_decision': winner[0],
                'vote_counts': vote_counts,
                'data_confidence': winner[1] / total_votes
            }
        else:
            return {
                'consensus_reached': False,
                'vote_counts': vote_counts,
                'reason': 'Data validation threshold not met'
            }

Democratic voting mechanisms ensure that schema changes have broad multi-agent agreement before implementation. The threshold-based consensus (0.67 by default) requires supermajority support, so a schema decision only proceeds when roughly two-thirds of the agents back the same option.
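The vote-counting helper is a simple tally. A minimal sketch of _count_schema_votes, assuming each vote is a dict with a 'choice' key naming the preferred schema option:

from collections import Counter


def _count_schema_votes(self, votes: List[Dict[str, Any]]) -> Dict[str, int]:
    """Tally votes per schema choice (assumed vote format: {'agent_id': ..., 'choice': ...})."""
    return dict(Counter(vote['choice'] for vote in votes))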

Parallel Proposal Collection

async def _collect_schema_proposals(
    self, schema_proposal: str, data_context: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """Collect initial schema proposals from all data agents"""
    proposal_tasks = []
    for agent in self.data_agents:
        task = self._get_agent_schema_proposal(agent, schema_proposal, data_context)
        proposal_tasks.append(task)

    proposals = await asyncio.gather(*proposal_tasks, return_exceptions=True)

    # Filter out failed schema proposals
    valid_proposals = []
    for i, proposal in enumerate(proposals):
        if not isinstance(proposal, Exception):
            valid_proposals.append({
                'agent_id': self.data_agents[i].agent_id,
                'schema_proposal': proposal,
                'data_quality_score': proposal.get('data_quality_score', 0.5),
                'timestamp': datetime.now()
            })

    return valid_proposals

Parallel proposal collection maximizes coordination efficiency by gathering agent input simultaneously rather than sequentially. The asyncio.gather pattern enables scalable coordination even with hundreds of participating agents.
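The per-agent call is not shown. A minimal sketch of _get_agent_schema_proposal, assuming each agent exposes a hypothetical async evaluate_schema method and returns the fields consumed above:

async def _get_agent_schema_proposal(
    self, agent: 'BaseDataAgent', schema_proposal: str, data_context: Dict[str, Any]
) -> Dict[str, Any]:
    """Ask one agent to evaluate the proposed schema (evaluate_schema is an assumed interface)."""
    evaluation = await agent.evaluate_schema(schema_proposal, data_context)
    return {
        'recommended_schema': evaluation.get('recommended_schema', schema_proposal),
        'data_quality_score': evaluation.get('confidence', 0.5),
        'notes': evaluation.get('notes', ''),
    }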

Hierarchical Coordination Patterns

Implementing structured multi-agent coordination for complex data processing workflows:

Dynamic Hierarchy Creation

class HierarchicalDataCoordinator:
    """Implements hierarchical multi-agent coordination patterns for data processing"""

    def __init__(self):
        self.coordinator_agents: Dict[str, 'DataCoordinatorAgent'] = {}
        self.worker_agents: Dict[str, 'DataWorkerAgent'] = {}
        self.data_delegation_rules: Dict[str, List[str]] = {}

    async def create_data_coordination_hierarchy(
        self, data_task: str, complexity_analysis: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Create hierarchical data processing coordination structure"""

        # Analyze data processing decomposition requirements
        decomposition = await self._analyze_data_task_decomposition(data_task, complexity_analysis)

        # Create coordinator for high-level data pipeline planning
        coordinator = await self._create_data_task_coordinator(decomposition)

        # Create workers for data processing execution
        workers = await self._create_data_worker_agents(decomposition)

        # Establish data processing delegation relationships
        delegation_map = await self._establish_data_delegation_hierarchy(
            coordinator, workers, decomposition
        )

        return {
            'data_coordinator': coordinator,
            'data_workers': workers,
            'delegation_map': delegation_map,
            'processing_depth': decomposition['required_levels']
        }

Dynamic hierarchy creation adapts organizational structure to task complexity - simple transformations use flat structures, while complex analytical workflows create deep hierarchies.
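The decomposition analysis determines how deep the hierarchy goes. A minimal sketch of _analyze_data_task_decomposition, assuming complexity arrives as a 0-1 complexity_score; the thresholds and returned fields are assumptions, apart from required_levels, which the coordinator reads above:

async def _analyze_data_task_decomposition(
    self, data_task: str, complexity_analysis: Dict[str, Any]
) -> Dict[str, Any]:
    """Map task complexity onto hierarchy depth and subtasks (assumed heuristic)."""
    complexity_score = complexity_analysis.get('complexity_score', 0.5)  # assumed 0-1 scale

    if complexity_score < 0.3:
        required_levels = 1   # flat: coordinator delegates directly to workers
    elif complexity_score < 0.7:
        required_levels = 2   # one intermediate coordination layer
    else:
        required_levels = 3   # deep hierarchy for complex analytical workflows

    return {
        'data_task': data_task,
        'required_levels': required_levels,
        'subtasks': complexity_analysis.get('subtasks', [data_task]),
    }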

Three-Phase Execution

async def execute_hierarchical_data_task(
    self, data_task: str, hierarchy: Dict[str, Any]
) -> Dict[str, Any]:
    """Execute data processing task using hierarchical coordination"""

    # Phase 1: High-level data pipeline planning
    high_level_plan = await self._create_data_pipeline_plan(
        data_task, hierarchy['data_coordinator']
    )

    # Phase 2: Data processing delegation and parallel execution
    delegation_results = await self._execute_delegated_data_tasks(
        high_level_plan, hierarchy['delegation_map']
    )

    # Phase 3: Data result aggregation and validation
    final_result = await self._aggregate_hierarchical_data_results(
        delegation_results, hierarchy['data_coordinator']
    )

    return {
        'data_task': data_task,
        'processing_result': final_result,
        'execution_success': True
    }

Three-phase hierarchical execution balances strategic coordination with parallel processing efficiency. High-level planning creates optimal execution strategies, while delegated execution enables massive parallelization.
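Phase 2 is where the parallelism comes from. A minimal sketch of _execute_delegated_data_tasks, assuming the delegation map assigns subtask identifiers to worker agents and each worker exposes a hypothetical async execute_data_task method:

import asyncio


async def _execute_delegated_data_tasks(
    self, high_level_plan: Dict[str, Any], delegation_map: Dict[str, List[str]]
) -> Dict[str, Any]:
    """Run delegated subtasks in parallel across workers (assumed plan/map structure)."""
    pending = {}
    for worker_id, subtask_ids in delegation_map.items():
        worker = self.worker_agents[worker_id]
        for subtask_id in subtask_ids:
            subtask = high_level_plan['subtasks'][subtask_id]   # assumed plan layout
            pending[subtask_id] = worker.execute_data_task(subtask)

    results = await asyncio.gather(*pending.values(), return_exceptions=True)
    return dict(zip(pending.keys(), results))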

Auction-Based Task Allocation

Implementing market mechanisms for optimal resource utilization in multi-agent systems:

Competitive Bidding System

File: src/session9/auction_mechanisms.py - Competitive data processing coordination

class DataProcessingAuctionCoordinator:
    """Basic auction-based data processing task allocation"""

    def __init__(self, agents: List['BaseDataAgent']):
        self.data_agents = agents
        self.data_auction_history: List[Dict[str, Any]] = []

    async def conduct_data_processing_auction(
        self, data_task: str, processing_requirements: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Conduct sealed-bid auction for data processing task allocation"""

        # Phase 1: Assess agent data processing capabilities
        capability_assessments = await self._assess_data_processing_capabilities(
            data_task, processing_requirements
        )

        # Filter eligible data processing agents
        eligible_agents = [
            agent for agent, assessment in capability_assessments.items()
            if assessment['meets_data_requirements']
        ]

        if not eligible_agents:
            return {'success': False, 'reason': 'No eligible data processing agents'}

        # Phase 2: Collect data processing bids
        bids = await self._collect_data_processing_bids(data_task, eligible_agents, processing_requirements)

        # Phase 3: Select winner (best cost/performance ratio)
        winner = await self._select_data_auction_winner(bids, processing_requirements)

        if winner:
            return {
                'success': True,
                'winner': winner['agent_id'],
                'winning_bid': winner['bid'],
                'data_task': data_task,
                'expected_processing_time': winner['bid']['estimated_processing_time']
            }
        else:
            return {'success': False, 'reason': 'No valid data processing bids received'}

Three-phase auction implementation ensures fair and efficient task allocation. Capability assessment prevents unqualified agents from participating, while eligibility filtering creates a qualified bidder pool.
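Winner selection compares expected quality against cost. A minimal sketch of _select_data_auction_winner, assuming each bid entry matches the {'agent_id': ..., 'bid': {...}} shape used above and that bids carry assumed cost and estimated_quality fields:

from typing import Any, Dict, List, Optional


async def _select_data_auction_winner(
    self, bids: List[Dict[str, Any]], processing_requirements: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
    """Pick the bid with the best quality-per-cost ratio (assumed scoring rule)."""
    best_entry = None
    best_score = float('-inf')

    for bid_entry in bids:
        bid = bid_entry['bid']
        cost = max(bid.get('cost', 1.0), 1e-6)       # guard against division by zero
        quality = bid.get('estimated_quality', 0.5)  # assumed bid field

        score = quality / cost                       # higher quality per unit cost wins
        if score > best_score:
            best_score = score
            best_entry = bid_entry

    return best_entry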

📝 Practice Exercises

Exercise 1: ReAct Agent Implementation

Implement a ReAct agent that processes CSV data with schema validation:

# Your task: Complete this ReAct implementation
class CSVDataReActAgent(DataProcessingReActAgent):
    async def process_csv_file(self, file_path: str) -> Dict[str, Any]:
        """Process CSV file using ReAct pattern"""
        # TODO: Implement ReAct reasoning for CSV processing
        # 1. Schema analysis step
        # 2. Data validation step
        # 3. Transformation planning step
        # 4. Processing execution step
        pass

Exercise 2: Multi-Agent Consensus

Build a consensus system for data quality thresholds:

# Your task: Implement data quality consensus
class DataQualityConsensusManager(DataConsensusManager):
    async def quality_threshold_consensus(
        self, dataset_name: str, proposed_thresholds: Dict[str, float]
    ) -> Dict[str, Any]:
        """Reach consensus on data quality thresholds"""
        # TODO: Implement consensus algorithm
        # 1. Collect agent assessments of proposed thresholds
        # 2. Conduct voting on threshold values
        # 3. Apply consensus rules and return decision
        pass

Exercise 3: Hierarchical Task Breakdown

Create a hierarchical coordinator for ETL pipeline processing:

# Your task: Implement hierarchical ETL coordination
class ETLHierarchicalCoordinator(HierarchicalDataCoordinator):
    async def coordinate_etl_pipeline(
        self, pipeline_config: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Coordinate complex ETL pipeline using hierarchical agents"""
        # TODO: Implement hierarchical coordination
        # 1. Analyze ETL complexity and create hierarchy
        # 2. Delegate extraction, transformation, and loading tasks
        # 3. Monitor execution and aggregate results
        pass

← Previous: Session 8 - Production Ready
Next: Session 10 - Enterprise Integration →