Session 3 - Module B: Advanced Workflow Orchestration¶
ADVANCED OPTIONAL MODULE: This is supplementary content for deeper specialization.
Prerequisites: Complete Session 3 core content first.
Time Investment: 50 minutes
Target Audience: Implementer path students and workflow architects
Module Learning Outcomes¶
After completing this module, you will master:

- Parallel processing and concurrent workflow execution patterns
- Conditional branching and dynamic workflow routing
- Advanced state management and data flow optimization
- Distributed workflow coordination across multiple systems
- Error recovery and compensation patterns for complex workflows
Industry Context & Applications¶
Enterprise workflow orchestration goes beyond simple sequential processing. Real-world applications require:
Complex Coordination Scenarios:

- Financial processing: Multi-step verification with compliance checkpoints
- Content moderation: Parallel analysis across text, image, and metadata
- Customer onboarding: Conditional flows based on customer type and region
- Data pipeline orchestration: Processing millions of records with fault tolerance
- Multi-system integration: Coordinating APIs, databases, and external services
Organizations using LangGraph report roughly 60% faster processing times and 40% fewer errors when they adopt advanced orchestration patterns instead of purely sequential agent approaches.
Advanced Orchestration Patterns¶
Pattern 1: Parallel Processing with Result Aggregation¶
When multiple independent operations can run simultaneously, parallel processing dramatically improves performance.
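The gain comes from overlapping independent I/O waits rather than doing more work. As a minimal sketch (separate from the orchestrator below, with a hypothetical fetch coroutine standing in for any awaitable call such as an MCP tool invocation), asyncio.gather lets three two-second calls finish in roughly two seconds instead of six:

# Minimal sketch: overlapping independent I/O with asyncio.gather.
# `fetch` is a hypothetical stand-in for any awaitable call (API, MCP tool).
import asyncio
import time

async def fetch(name: str) -> str:
    await asyncio.sleep(2)  # simulate a two-second I/O call
    return f"{name} done"

async def main():
    start = time.time()
    results = await asyncio.gather(fetch("weather"), fetch("files"), fetch("db"))
    print(results, f"in {time.time() - start:.1f}s")  # ~2s rather than ~6s

asyncio.run(main())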
# workflows/parallel_processor.py
import asyncio
from typing import Dict, List, Any, Optional, Set
from dataclasses import dataclass, field
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, END
import time
import logging
logger = logging.getLogger(__name__)
@dataclass
class ParallelWorkflowState:
"""State for parallel processing workflow."""
query: str
messages: List[Any]
# Parallel processing results
weather_result: Optional[Dict] = None
file_result: Optional[Dict] = None
database_result: Optional[Dict] = None
# Processing status tracking
completed_tasks: Set[str] = field(default_factory=set)
failed_tasks: Set[str] = field(default_factory=set)
start_time: Optional[float] = None
# Final aggregated result
aggregated_result: Optional[Dict] = None
processing_time: Optional[float] = None
class ParallelWorkflowOrchestrator:
"""Advanced parallel workflow with intelligent task coordination."""
def __init__(self, mcp_manager):
self.mcp_manager = mcp_manager
self.workflow = None
async def build_workflow(self) -> StateGraph:
"""Build parallel processing workflow graph."""
workflow = StateGraph(ParallelWorkflowState)
# Add processing nodes
workflow.add_node("initializer", self._initialize_processing)
workflow.add_node("weather_processor", self._process_weather)
workflow.add_node("file_processor", self._process_files)
workflow.add_node("database_processor", self._process_database)
workflow.add_node("aggregator", self._aggregate_results)
workflow.add_node("error_handler", self._handle_errors)
# Define parallel execution flow
workflow.set_entry_point("initializer")
# Fan out to parallel processors
workflow.add_edge("initializer", "weather_processor")
workflow.add_edge("initializer", "file_processor")
workflow.add_edge("initializer", "database_processor")
# Conditional aggregation based on completion
workflow.add_conditional_edges(
"weather_processor",
self._check_completion_status,
{
"aggregate": "aggregator",
"wait": END, # Continue processing
"error": "error_handler"
}
)
workflow.add_conditional_edges(
"file_processor",
self._check_completion_status,
{
"aggregate": "aggregator",
"wait": END,
"error": "error_handler"
}
)
workflow.add_conditional_edges(
"database_processor",
self._check_completion_status,
{
"aggregate": "aggregator",
"wait": END,
"error": "error_handler"
}
)
workflow.add_edge("aggregator", END)
workflow.add_edge("error_handler", END)
self.workflow = workflow.compile()
return self.workflow
async def _initialize_processing(self, state: ParallelWorkflowState) -> ParallelWorkflowState:
"""Initialize parallel processing state."""
state.start_time = time.time()
state.completed_tasks = set()
state.failed_tasks = set()
logger.info(f"Starting parallel processing for query: {state.query}")
return state
async def _process_weather(self, state: ParallelWorkflowState) -> ParallelWorkflowState:
"""Process weather data in parallel."""
try:
if "weather" in state.query.lower():
adapter = await self.mcp_manager.get_adapter("weather")
if adapter:
cities = self._extract_cities(state.query)
weather_data = {}
# Process multiple cities in parallel
tasks = [
self._get_weather_for_city(adapter, city)
for city in cities
]
results = await asyncio.gather(*tasks, return_exceptions=True)
for city, result in zip(cities, results):
if not isinstance(result, Exception):
weather_data[city] = result
state.weather_result = weather_data
else:
state.weather_result = {"error": "Weather service unavailable"}
else:
state.weather_result = {"skipped": "No weather query detected"}
state.completed_tasks.add("weather")
logger.info("Weather processing completed")
except Exception as e:
state.failed_tasks.add("weather")
state.weather_result = {"error": str(e)}
logger.error(f"Weather processing failed: {e}")
return state
async def _get_weather_for_city(self, adapter, city: str) -> Dict:
"""Get weather for a single city with timeout."""
try:
return await asyncio.wait_for(
adapter.call_tool("get_current_weather", {"city": city}),
timeout=5.0
)
except asyncio.TimeoutError:
return {"error": f"Timeout getting weather for {city}"}
except Exception as e:
return {"error": str(e)}
async def _process_files(self, state: ParallelWorkflowState) -> ParallelWorkflowState:
"""Process file operations in parallel."""
try:
adapter = await self.mcp_manager.get_adapter("filesystem")
if adapter:
search_terms = self._extract_search_terms(state.query)
# Search for multiple terms in parallel
search_tasks = [
self._search_files_for_term(adapter, term)
for term in search_terms
]
results = await asyncio.gather(*search_tasks, return_exceptions=True)
file_data = {}
for term, result in zip(search_terms, results):
if not isinstance(result, Exception):
file_data[f"files_for_{term}"] = result
state.file_result = file_data
else:
state.file_result = {"error": "File service unavailable"}
state.completed_tasks.add("files")
logger.info("File processing completed")
except Exception as e:
state.failed_tasks.add("files")
state.file_result = {"error": str(e)}
logger.error(f"File processing failed: {e}")
return state
async def _search_files_for_term(self, adapter, term: str) -> Dict:
"""Search files for a specific term with timeout."""
try:
return await asyncio.wait_for(
adapter.call_tool("search_files", {
"pattern": f"*{term}*",
"search_type": "name"
}),
timeout=10.0
)
except asyncio.TimeoutError:
return {"error": f"Timeout searching files for {term}"}
except Exception as e:
return {"error": str(e)}
async def _process_database(self, state: ParallelWorkflowState) -> ParallelWorkflowState:
"""Process database operations in parallel."""
try:
adapter = await self.mcp_manager.get_adapter("database")
if adapter:
# Simulate parallel database queries
query_tasks = [
self._execute_database_query(adapter, "user_data", state.query),
self._execute_database_query(adapter, "historical_data", state.query),
self._execute_database_query(adapter, "metadata", state.query)
]
results = await asyncio.gather(*query_tasks, return_exceptions=True)
database_data = {}
query_types = ["user_data", "historical_data", "metadata"]
for query_type, result in zip(query_types, results):
if not isinstance(result, Exception):
database_data[query_type] = result
state.database_result = database_data
else:
state.database_result = {"error": "Database service unavailable"}
state.completed_tasks.add("database")
logger.info("Database processing completed")
except Exception as e:
state.failed_tasks.add("database")
state.database_result = {"error": str(e)}
logger.error(f"Database processing failed: {e}")
return state
async def _execute_database_query(self, adapter, query_type: str, query: str) -> Dict:
"""Execute a database query with timeout."""
try:
return await asyncio.wait_for(
adapter.call_tool("query", {
"table": query_type,
"query": query
}),
timeout=15.0
)
except asyncio.TimeoutError:
return {"error": f"Timeout executing {query_type} query"}
except Exception as e:
return {"error": str(e)}
def _check_completion_status(self, state: ParallelWorkflowState) -> str:
"""Check if all parallel tasks are complete."""
expected_tasks = {"weather", "files", "database"}
all_tasks = state.completed_tasks | state.failed_tasks
if all_tasks >= expected_tasks:
if state.failed_tasks:
return "error"
else:
return "aggregate"
return "wait"
async def _aggregate_results(self, state: ParallelWorkflowState) -> ParallelWorkflowState:
"""Aggregate results from all parallel processors."""
processing_time = time.time() - state.start_time if state.start_time else 0
state.processing_time = processing_time
# Aggregate all results
aggregated = {
"query": state.query,
"processing_time_seconds": processing_time,
"completed_tasks": list(state.completed_tasks),
"failed_tasks": list(state.failed_tasks),
"results": {}
}
if state.weather_result:
aggregated["results"]["weather"] = state.weather_result
if state.file_result:
aggregated["results"]["files"] = state.file_result
if state.database_result:
aggregated["results"]["database"] = state.database_result
# Generate summary
summary_parts = []
if state.weather_result and "error" not in state.weather_result:
summary_parts.append("Weather data retrieved successfully")
if state.file_result and "error" not in state.file_result:
summary_parts.append("File search completed")
if state.database_result and "error" not in state.database_result:
summary_parts.append("Database queries executed")
aggregated["summary"] = "; ".join(summary_parts) if summary_parts else "Partial results available"
state.aggregated_result = aggregated
logger.info(f"Parallel processing completed in {processing_time:.2f} seconds")
logger.info(f"Completed: {state.completed_tasks}, Failed: {state.failed_tasks}")
return state
async def _handle_errors(self, state: ParallelWorkflowState) -> ParallelWorkflowState:
"""Handle errors in parallel processing."""
processing_time = time.time() - state.start_time if state.start_time else 0
error_summary = {
"query": state.query,
"processing_time_seconds": processing_time,
"completed_tasks": list(state.completed_tasks),
"failed_tasks": list(state.failed_tasks),
"partial_results": {},
"errors": {}
}
# Collect partial results and errors
if state.weather_result:
if "error" in state.weather_result:
error_summary["errors"]["weather"] = state.weather_result["error"]
else:
error_summary["partial_results"]["weather"] = state.weather_result
if state.file_result:
if "error" in state.file_result:
error_summary["errors"]["files"] = state.file_result["error"]
else:
error_summary["partial_results"]["files"] = state.file_result
if state.database_result:
if "error" in state.database_result:
error_summary["errors"]["database"] = state.database_result["error"]
else:
error_summary["partial_results"]["database"] = state.database_result
state.aggregated_result = error_summary
logger.warning(f"Parallel processing completed with errors after {processing_time:.2f} seconds")
return state
def _extract_cities(self, query: str) -> List[str]:
"""Extract city names from query."""
cities = ["London", "New York", "Tokyo", "Sydney", "Paris"]
found = [city for city in cities if city.lower() in query.lower()]
return found or ["London"]
def _extract_search_terms(self, query: str) -> List[str]:
"""Extract search terms from query."""
stop_words = {"the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for"}
words = [word for word in query.lower().split() if len(word) > 3 and word not in stop_words]
return words[:3]
async def run_parallel_workflow(self, query: str) -> Dict[str, Any]:
"""Execute parallel workflow."""
if not self.workflow:
await self.build_workflow()
initial_state = ParallelWorkflowState(
query=query,
messages=[HumanMessage(content=query)]
)
        try:
            # Note: compiled LangGraph graphs return the final state as a
            # dict of channel values rather than the dataclass instance
            final_state = await self.workflow.ainvoke(initial_state)
            return {
                "success": True,
                "result": final_state["aggregated_result"],
                "processing_time": final_state["processing_time"]
            }
except Exception as e:
return {
"success": False,
"error": str(e),
"query": query
}
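A minimal driver for this orchestrator might look like the sketch below. MCPManager is assumed to be the adapter manager built in the core session; its name and setup are assumptions here, not part of this module's code.

# Hypothetical usage sketch; MCPManager is the adapter manager from the
# core session (name and setup assumed here).
import asyncio

async def demo_parallel():
    mcp_manager = MCPManager()  # assumed adapter manager
    orchestrator = ParallelWorkflowOrchestrator(mcp_manager)
    outcome = await orchestrator.run_parallel_workflow(
        "What's the weather in London and Tokyo, and find the report files?"
    )
    if outcome["success"]:
        print("Summary:", outcome["result"].get("summary"))
        print("Elapsed:", outcome.get("processing_time"))
    else:
        print("Workflow failed:", outcome["error"])

asyncio.run(demo_parallel())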
Pattern 2: Conditional Workflow Routing¶
Dynamic workflows adapt their execution path based on runtime conditions and intermediate results.
# workflows/conditional_router.py
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum
import re
import time
import logging
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, END
logger = logging.getLogger(__name__)
class WorkflowType(Enum):
CUSTOMER_SERVICE = "customer_service"
TECHNICAL_SUPPORT = "technical_support"
SALES_INQUIRY = "sales_inquiry"
DATA_ANALYSIS = "data_analysis"
GENERAL_QUERY = "general_query"
class Priority(Enum):
LOW = 1
MEDIUM = 2
HIGH = 3
CRITICAL = 4
@dataclass
class ConditionalWorkflowState:
"""State for conditional workflow routing."""
query: str
messages: List[Any]
# Classification results
workflow_type: Optional[WorkflowType] = None
priority: Optional[Priority] = None
customer_tier: Optional[str] = None
    urgency_keywords: List[str] = field(default_factory=list)
# Processing results
primary_result: Optional[Dict] = None
escalation_result: Optional[Dict] = None
# Routing decisions
    routing_path: List[str] = field(default_factory=list)
escalated: bool = False
class ConditionalWorkflowRouter:
"""Intelligent workflow routing based on content analysis."""
def __init__(self, mcp_manager):
self.mcp_manager = mcp_manager
self.workflow = None
# Define routing rules
self.workflow_patterns = {
WorkflowType.CUSTOMER_SERVICE: [
r"account|billing|payment|refund|cancel|subscription",
r"order|shipping|delivery|tracking|return",
r"login|password|access|locked"
],
WorkflowType.TECHNICAL_SUPPORT: [
r"error|bug|crash|broken|not working|issue",
r"api|integration|connection|timeout",
r"performance|slow|loading|response"
],
WorkflowType.SALES_INQUIRY: [
r"price|cost|quote|demo|trial|purchase",
r"features|comparison|upgrade|plan",
r"contact sales|sales team|pricing"
],
WorkflowType.DATA_ANALYSIS: [
r"report|analytics|data|statistics|metrics",
r"trend|analysis|forecast|prediction",
r"dashboard|visualization|chart"
]
}
self.urgency_patterns = [
(r"urgent|emergency|critical|asap|immediately", Priority.CRITICAL),
(r"important|priority|soon|quickly", Priority.HIGH),
(r"please|when possible|sometime", Priority.MEDIUM)
]
self.escalation_triggers = [
r"complaint|frustrated|angry|disappointed",
r"manager|supervisor|escalate",
r"unacceptable|terrible|worst"
]
async def build_workflow(self) -> StateGraph:
"""Build conditional routing workflow."""
workflow = StateGraph(ConditionalWorkflowState)
# Add processing nodes
workflow.add_node("classifier", self._classify_query)
workflow.add_node("customer_service_handler", self._handle_customer_service)
workflow.add_node("technical_support_handler", self._handle_technical_support)
workflow.add_node("sales_handler", self._handle_sales_inquiry)
workflow.add_node("data_analysis_handler", self._handle_data_analysis)
workflow.add_node("general_handler", self._handle_general_query)
workflow.add_node("escalation_handler", self._handle_escalation)
workflow.add_node("priority_processor", self._process_priority)
# Set entry point
workflow.set_entry_point("classifier")
# Conditional routing based on classification
workflow.add_conditional_edges(
"classifier",
self._route_workflow,
{
"customer_service": "customer_service_handler",
"technical_support": "technical_support_handler",
"sales_inquiry": "sales_handler",
"data_analysis": "data_analysis_handler",
"general_query": "general_handler",
"escalation": "escalation_handler"
}
)
# Priority processing after main handling
workflow.add_edge("customer_service_handler", "priority_processor")
workflow.add_edge("technical_support_handler", "priority_processor")
workflow.add_edge("sales_handler", "priority_processor")
workflow.add_edge("data_analysis_handler", "priority_processor")
workflow.add_edge("general_handler", "priority_processor")
# Escalation handling
workflow.add_conditional_edges(
"priority_processor",
self._check_escalation_needed,
{
"escalate": "escalation_handler",
"complete": END
}
)
workflow.add_edge("escalation_handler", END)
self.workflow = workflow.compile()
return self.workflow
async def _classify_query(self, state: ConditionalWorkflowState) -> ConditionalWorkflowState:
"""Classify query type and extract metadata."""
query_lower = state.query.lower()
# Classify workflow type
workflow_scores = {}
for workflow_type, patterns in self.workflow_patterns.items():
score = 0
for pattern in patterns:
matches = len(re.findall(pattern, query_lower))
score += matches
workflow_scores[workflow_type] = score
# Select highest scoring workflow type
if max(workflow_scores.values()) > 0:
state.workflow_type = max(workflow_scores, key=workflow_scores.get)
else:
state.workflow_type = WorkflowType.GENERAL_QUERY
# Determine priority
state.priority = Priority.LOW # Default
for pattern, priority in self.urgency_patterns:
if re.search(pattern, query_lower):
if priority.value > state.priority.value:
state.priority = priority
# Extract urgency keywords
state.urgency_keywords = []
for pattern, _ in self.urgency_patterns:
state.urgency_keywords.extend(re.findall(pattern, query_lower))
# Check for escalation triggers
for pattern in self.escalation_triggers:
if re.search(pattern, query_lower):
state.escalated = True
break
# Routing path tracking
state.routing_path = ["classifier"]
logger.info(f"Classified query as {state.workflow_type.value} with priority {state.priority.value}")
return state
def _route_workflow(self, state: ConditionalWorkflowState) -> str:
"""Route workflow based on classification."""
if state.escalated:
return "escalation"
return state.workflow_type.value
async def _handle_customer_service(self, state: ConditionalWorkflowState) -> ConditionalWorkflowState:
"""Handle customer service workflow."""
state.routing_path.append("customer_service")
try:
# Simulate customer service processing
adapter = await self.mcp_manager.get_adapter("database")
if adapter:
# Look up customer information
customer_data = await adapter.call_tool("query", {
"table": "customers",
"query": state.query
})
# Check account status
if customer_data:
state.customer_tier = self._determine_customer_tier(customer_data)
state.primary_result = {
"type": "customer_service",
"action": "Account inquiry processed",
"customer_tier": state.customer_tier,
"recommendations": self._get_customer_service_recommendations(state)
}
except Exception as e:
state.primary_result = {
"type": "customer_service",
"error": str(e),
"fallback": "Standard customer service response"
}
return state
async def _handle_technical_support(self, state: ConditionalWorkflowState) -> ConditionalWorkflowState:
"""Handle technical support workflow."""
state.routing_path.append("technical_support")
try:
# Check system status and logs
adapter = await self.mcp_manager.get_adapter("filesystem")
if adapter:
log_data = await adapter.call_tool("search_files", {
"pattern": "*.log",
"search_type": "name"
})
state.primary_result = {
"type": "technical_support",
"action": "Technical analysis completed",
"severity": self._assess_technical_severity(state),
"next_steps": self._get_technical_recommendations(state)
}
except Exception as e:
state.primary_result = {
"type": "technical_support",
"error": str(e),
"fallback": "Standard technical support response"
}
return state
async def _handle_sales_inquiry(self, state: ConditionalWorkflowState) -> ConditionalWorkflowState:
"""Handle sales inquiry workflow."""
state.routing_path.append("sales")
try:
# Get pricing and product information
adapter = await self.mcp_manager.get_adapter("database")
if adapter:
product_data = await adapter.call_tool("query", {
"table": "products",
"query": state.query
})
state.primary_result = {
"type": "sales_inquiry",
"action": "Sales information provided",
"lead_quality": self._assess_lead_quality(state),
"recommendations": self._get_sales_recommendations(state)
}
except Exception as e:
state.primary_result = {
"type": "sales_inquiry",
"error": str(e),
"fallback": "Standard sales response"
}
return state
async def _handle_data_analysis(self, state: ConditionalWorkflowState) -> ConditionalWorkflowState:
"""Handle data analysis workflow."""
state.routing_path.append("data_analysis")
try:
# Perform data analysis
adapter = await self.mcp_manager.get_adapter("database")
if adapter:
analytics_data = await adapter.call_tool("query", {
"table": "analytics",
"query": state.query
})
state.primary_result = {
"type": "data_analysis",
"action": "Data analysis completed",
"insights": self._generate_insights(state),
"visualizations": "Charts and graphs generated"
}
except Exception as e:
state.primary_result = {
"type": "data_analysis",
"error": str(e),
"fallback": "Standard analytics response"
}
return state
async def _handle_general_query(self, state: ConditionalWorkflowState) -> ConditionalWorkflowState:
"""Handle general query workflow."""
state.routing_path.append("general")
state.primary_result = {
"type": "general_query",
"action": "General information provided",
"response": "Comprehensive general response based on available data"
}
return state
async def _process_priority(self, state: ConditionalWorkflowState) -> ConditionalWorkflowState:
"""Process based on priority level."""
state.routing_path.append("priority_processor")
if state.priority in [Priority.HIGH, Priority.CRITICAL]:
# Add priority flags to result
if state.primary_result:
state.primary_result["priority_processing"] = {
"level": state.priority.name,
"expedited": True,
"sla": "4 hours" if state.priority == Priority.HIGH else "1 hour"
}
return state
def _check_escalation_needed(self, state: ConditionalWorkflowState) -> str:
"""Check if escalation is needed."""
if (state.escalated or
state.priority == Priority.CRITICAL or
(state.customer_tier == "premium" and state.priority == Priority.HIGH)):
return "escalate"
return "complete"
async def _handle_escalation(self, state: ConditionalWorkflowState) -> ConditionalWorkflowState:
"""Handle escalation workflow."""
state.routing_path.append("escalation")
state.escalation_result = {
"escalated": True,
"reason": "Priority level or customer tier requires escalation",
"assigned_to": "Senior support team",
"escalation_time": time.time(),
"original_result": state.primary_result
}
logger.warning(f"Query escalated: {state.query[:50]}...")
return state
# Helper methods for assessment and recommendations
def _determine_customer_tier(self, customer_data: Dict) -> str:
"""Determine customer tier from data."""
# Simplified tier determination
return customer_data.get("tier", "standard")
def _assess_technical_severity(self, state: ConditionalWorkflowState) -> str:
"""Assess technical issue severity."""
if state.priority == Priority.CRITICAL:
return "critical"
elif "error" in state.query.lower() or "crash" in state.query.lower():
return "high"
else:
return "medium"
def _assess_lead_quality(self, state: ConditionalWorkflowState) -> str:
"""Assess sales lead quality."""
if any(word in state.query.lower() for word in ["demo", "trial", "purchase", "buy"]):
return "hot"
elif any(word in state.query.lower() for word in ["price", "cost", "quote"]):
return "warm"
else:
return "cold"
def _get_customer_service_recommendations(self, state: ConditionalWorkflowState) -> List[str]:
"""Get customer service recommendations."""
return [
"Verify account information",
"Check recent transaction history",
"Provide relevant documentation"
]
def _get_technical_recommendations(self, state: ConditionalWorkflowState) -> List[str]:
"""Get technical support recommendations."""
return [
"Check system logs",
"Verify configuration settings",
"Test connectivity"
]
def _get_sales_recommendations(self, state: ConditionalWorkflowState) -> List[str]:
"""Get sales recommendations."""
return [
"Provide product comparison",
"Schedule demo if interested",
"Send pricing information"
]
def _generate_insights(self, state: ConditionalWorkflowState) -> List[str]:
"""Generate data insights."""
return [
"Trend analysis completed",
"Key metrics identified",
"Recommendations generated"
]
async def run_conditional_workflow(self, query: str) -> Dict[str, Any]:
"""Execute conditional workflow."""
if not self.workflow:
await self.build_workflow()
initial_state = ConditionalWorkflowState(
query=query,
messages=[HumanMessage(content=query)]
)
        try:
            # ainvoke returns the final state as a dict of channel values
            final_state = await self.workflow.ainvoke(initial_state)
            result = {
                "success": True,
                "workflow_type": final_state["workflow_type"].value,
                "priority": final_state["priority"].name,
                "routing_path": final_state["routing_path"],
                "primary_result": final_state["primary_result"]
            }
            if final_state.get("escalation_result"):
                result["escalation"] = final_state["escalation_result"]
            return result
except Exception as e:
return {
"success": False,
"error": str(e),
"query": query
}
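To see the classification-driven routing in action, a short driver like the following sketch (same hypothetical MCPManager assumption as above) sends representative queries through the router and prints the chosen path:

# Hypothetical usage sketch for the conditional router.
import asyncio

async def demo_routing():
    router = ConditionalWorkflowRouter(MCPManager())  # assumed manager
    queries = [
        "My payment failed and I want a refund",          # customer service
        "URGENT: the API keeps timing out",               # technical, critical
        "Can I get a quote and a demo of the pro plan?",  # sales
    ]
    for query in queries:
        outcome = await router.run_conditional_workflow(query)
        if outcome["success"]:
            print(query[:40], "->", outcome["workflow_type"],
                  outcome["priority"], outcome["routing_path"])

asyncio.run(demo_routing())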
Pattern 3: State Recovery and Compensation¶
Advanced workflows implement compensation patterns to handle failures and maintain data consistency.
# workflows/compensation_handler.py
import asyncio
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum
import time
import logging
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, END
logger = logging.getLogger(__name__)
class TransactionStatus(Enum):
PENDING = "pending"
COMPLETED = "completed"
FAILED = "failed"
COMPENSATED = "compensated"
@dataclass
class CompensationAction:
"""Represents a compensation action for failed operations."""
step_name: str
compensation_function: str
compensation_args: Dict[str, Any]
executed: bool = False
execution_time: Optional[float] = None
@dataclass
class CompensationWorkflowState:
"""State for workflow with compensation handling."""
query: str
messages: List[Any]
# Transaction tracking
transaction_id: str
executed_steps: List[str] = field(default_factory=list)
failed_steps: List[str] = field(default_factory=list)
compensation_actions: List[CompensationAction] = field(default_factory=list)
# Results tracking
step_results: Dict[str, Any] = field(default_factory=dict)
final_status: TransactionStatus = TransactionStatus.PENDING
# Recovery information
checkpoint_data: Dict[str, Any] = field(default_factory=dict)
recovery_attempts: int = 0
max_recovery_attempts: int = 3
class CompensationWorkflowHandler:
"""Workflow handler with advanced compensation and recovery patterns."""
def __init__(self, mcp_manager):
self.mcp_manager = mcp_manager
self.workflow = None
        # Map compensation function names (as registered on each
        # CompensationAction) to their handler methods
        self.compensation_map = {
            "delete_user_account": self._compensate_create_user,
            "refund_payment": self._compensate_payment,
            "send_cancellation_notification": self._compensate_notification,
            "restore_inventory": self._compensate_inventory,
            "cancel_shipping_label": self._compensate_shipping
        }
async def build_workflow(self) -> StateGraph:
"""Build workflow with compensation handling."""
workflow = StateGraph(CompensationWorkflowState)
# Add processing nodes with compensation
workflow.add_node("initialize_transaction", self._initialize_transaction)
workflow.add_node("step_1_user_account", self._create_user_account)
workflow.add_node("step_2_payment", self._charge_payment)
workflow.add_node("step_3_notification", self._send_notification)
workflow.add_node("step_4_inventory", self._update_inventory)
workflow.add_node("step_5_shipping", self._create_shipping_label)
workflow.add_node("finalize_transaction", self._finalize_transaction)
workflow.add_node("compensation_handler", self._execute_compensation)
workflow.add_node("recovery_handler", self._handle_recovery)
# Define execution flow
workflow.set_entry_point("initialize_transaction")
workflow.add_edge("initialize_transaction", "step_1_user_account")
# Conditional flows with error handling
workflow.add_conditional_edges(
"step_1_user_account",
self._check_step_status,
{
"continue": "step_2_payment",
"retry": "recovery_handler",
"compensate": "compensation_handler"
}
)
workflow.add_conditional_edges(
"step_2_payment",
self._check_step_status,
{
"continue": "step_3_notification",
"retry": "recovery_handler",
"compensate": "compensation_handler"
}
)
workflow.add_conditional_edges(
"step_3_notification",
self._check_step_status,
{
"continue": "step_4_inventory",
"retry": "recovery_handler",
"compensate": "compensation_handler"
}
)
workflow.add_conditional_edges(
"step_4_inventory",
self._check_step_status,
{
"continue": "step_5_shipping",
"retry": "recovery_handler",
"compensate": "compensation_handler"
}
)
workflow.add_conditional_edges(
"step_5_shipping",
self._check_step_status,
{
"continue": "finalize_transaction",
"retry": "recovery_handler",
"compensate": "compensation_handler"
}
)
# Recovery and compensation flows
workflow.add_conditional_edges(
"recovery_handler",
self._check_recovery_status,
{
"retry_step_1": "step_1_user_account",
"retry_step_2": "step_2_payment",
"retry_step_3": "step_3_notification",
"retry_step_4": "step_4_inventory",
"retry_step_5": "step_5_shipping",
"compensate": "compensation_handler",
"abort": END
}
)
workflow.add_edge("compensation_handler", END)
workflow.add_edge("finalize_transaction", END)
self.workflow = workflow.compile()
return self.workflow
async def _initialize_transaction(self, state: CompensationWorkflowState) -> CompensationWorkflowState:
"""Initialize transaction with compensation tracking."""
state.transaction_id = f"txn_{int(time.time())}"
state.executed_steps = []
state.failed_steps = []
state.compensation_actions = []
state.step_results = {}
state.checkpoint_data = {"initialized": True, "start_time": time.time()}
logger.info(f"Initialized transaction {state.transaction_id}")
return state
async def _create_user_account(self, state: CompensationWorkflowState) -> CompensationWorkflowState:
"""Step 1: Create user account with compensation tracking."""
step_name = "create_user_account"
try:
# Save checkpoint before execution
state.checkpoint_data[step_name] = {"status": "executing", "timestamp": time.time()}
# Simulate user account creation
adapter = await self.mcp_manager.get_adapter("database")
if adapter:
result = await adapter.call_tool("insert", {
"table": "users",
"data": {"query": state.query, "transaction_id": state.transaction_id}
})
state.step_results[step_name] = result
state.executed_steps.append(step_name)
# Register compensation action
compensation = CompensationAction(
step_name=step_name,
compensation_function="delete_user_account",
compensation_args={"user_id": result.get("user_id"), "transaction_id": state.transaction_id}
)
state.compensation_actions.append(compensation)
logger.info(f"Successfully executed {step_name}")
else:
raise Exception("Database adapter not available")
except Exception as e:
state.failed_steps.append(step_name)
state.step_results[step_name] = {"error": str(e)}
logger.error(f"Failed to execute {step_name}: {e}")
return state
async def _charge_payment(self, state: CompensationWorkflowState) -> CompensationWorkflowState:
"""Step 2: Charge payment with compensation tracking."""
step_name = "charge_payment"
try:
state.checkpoint_data[step_name] = {"status": "executing", "timestamp": time.time()}
# Simulate payment processing
# In real implementation, this would call payment processor
result = {
"payment_id": f"pay_{int(time.time())}",
"amount": 99.99,
"status": "charged",
"transaction_id": state.transaction_id
}
state.step_results[step_name] = result
state.executed_steps.append(step_name)
# Register compensation action
compensation = CompensationAction(
step_name=step_name,
compensation_function="refund_payment",
compensation_args={"payment_id": result["payment_id"], "amount": result["amount"]}
)
state.compensation_actions.append(compensation)
logger.info(f"Successfully executed {step_name}")
except Exception as e:
state.failed_steps.append(step_name)
state.step_results[step_name] = {"error": str(e)}
logger.error(f"Failed to execute {step_name}: {e}")
return state
async def _send_notification(self, state: CompensationWorkflowState) -> CompensationWorkflowState:
"""Step 3: Send notification with compensation tracking."""
step_name = "send_notification"
try:
state.checkpoint_data[step_name] = {"status": "executing", "timestamp": time.time()}
# Simulate notification sending
result = {
"notification_id": f"notif_{int(time.time())}",
"recipient": "user@example.com",
"status": "sent",
"transaction_id": state.transaction_id
}
state.step_results[step_name] = result
state.executed_steps.append(step_name)
# Register compensation action
compensation = CompensationAction(
step_name=step_name,
compensation_function="send_cancellation_notification",
compensation_args={"notification_id": result["notification_id"], "recipient": result["recipient"]}
)
state.compensation_actions.append(compensation)
logger.info(f"Successfully executed {step_name}")
except Exception as e:
state.failed_steps.append(step_name)
state.step_results[step_name] = {"error": str(e)}
logger.error(f"Failed to execute {step_name}: {e}")
return state
async def _update_inventory(self, state: CompensationWorkflowState) -> CompensationWorkflowState:
"""Step 4: Update inventory with compensation tracking."""
step_name = "update_inventory"
try:
state.checkpoint_data[step_name] = {"status": "executing", "timestamp": time.time()}
# Simulate inventory update
adapter = await self.mcp_manager.get_adapter("database")
if adapter:
result = await adapter.call_tool("update", {
"table": "inventory",
"data": {"quantity": -1, "transaction_id": state.transaction_id}
})
state.step_results[step_name] = result
state.executed_steps.append(step_name)
# Register compensation action
compensation = CompensationAction(
step_name=step_name,
compensation_function="restore_inventory",
compensation_args={"item_id": "item_123", "quantity": 1}
)
state.compensation_actions.append(compensation)
logger.info(f"Successfully executed {step_name}")
else:
raise Exception("Database adapter not available")
except Exception as e:
state.failed_steps.append(step_name)
state.step_results[step_name] = {"error": str(e)}
logger.error(f"Failed to execute {step_name}: {e}")
return state
async def _create_shipping_label(self, state: CompensationWorkflowState) -> CompensationWorkflowState:
"""Step 5: Create shipping label with compensation tracking."""
step_name = "create_shipping_label"
try:
state.checkpoint_data[step_name] = {"status": "executing", "timestamp": time.time()}
# Simulate shipping label creation
result = {
"label_id": f"label_{int(time.time())}",
"tracking_number": f"TRK{int(time.time())}",
"status": "created",
"transaction_id": state.transaction_id
}
state.step_results[step_name] = result
state.executed_steps.append(step_name)
# Register compensation action
compensation = CompensationAction(
step_name=step_name,
compensation_function="cancel_shipping_label",
compensation_args={"label_id": result["label_id"], "tracking_number": result["tracking_number"]}
)
state.compensation_actions.append(compensation)
logger.info(f"Successfully executed {step_name}")
except Exception as e:
state.failed_steps.append(step_name)
state.step_results[step_name] = {"error": str(e)}
logger.error(f"Failed to execute {step_name}: {e}")
return state
    def _check_step_status(self, state: CompensationWorkflowState) -> str:
        """Check the status of the most recently executed step."""
        if state.failed_steps:
            if state.recovery_attempts < state.max_recovery_attempts:
                return "retry"
            return "compensate"
        return "continue"
    async def _handle_recovery(self, state: CompensationWorkflowState) -> CompensationWorkflowState:
        """Handle recovery attempts."""
        state.recovery_attempts += 1
        if state.failed_steps:
            failed_step = state.failed_steps.pop()
            # Remember which step to retry; clearing failed_steps ensures a
            # successful retry is not misread as a failure afterwards
            state.checkpoint_data["retry_step"] = failed_step
            logger.info(f"Attempting recovery for {failed_step} (attempt {state.recovery_attempts})")
            # Brief delay before retry
            await asyncio.sleep(1.0)
        return state
    def _check_recovery_status(self, state: CompensationWorkflowState) -> str:
        """Determine recovery action."""
        retry_step = state.checkpoint_data.get("retry_step")
        if retry_step:
            if state.recovery_attempts >= state.max_recovery_attempts:
                return "compensate"
            # Route to the retry edge that matches the failed step
            retry_routes = {
                "create_user_account": "retry_step_1",
                "charge_payment": "retry_step_2",
                "send_notification": "retry_step_3",
                "update_inventory": "retry_step_4",
                "create_shipping_label": "retry_step_5"
            }
            return retry_routes.get(retry_step, "compensate")
        return "abort"
async def _execute_compensation(self, state: CompensationWorkflowState) -> CompensationWorkflowState:
"""Execute compensation actions for failed transaction."""
state.final_status = TransactionStatus.FAILED
logger.warning(f"Executing compensation for transaction {state.transaction_id}")
# Execute compensation actions in reverse order
for compensation in reversed(state.compensation_actions):
if not compensation.executed:
try:
await self._execute_single_compensation(compensation)
compensation.executed = True
compensation.execution_time = time.time()
logger.info(f"Executed compensation: {compensation.compensation_function}")
except Exception as e:
logger.error(f"Compensation failed: {compensation.compensation_function} - {e}")
state.final_status = TransactionStatus.COMPENSATED
return state
async def _execute_single_compensation(self, compensation: CompensationAction):
"""Execute a single compensation action."""
func_name = compensation.compensation_function
if func_name in self.compensation_map:
compensation_func = self.compensation_map[func_name]
await compensation_func(compensation.compensation_args)
else:
logger.warning(f"No compensation function found for {func_name}")
async def _finalize_transaction(self, state: CompensationWorkflowState) -> CompensationWorkflowState:
"""Finalize successful transaction."""
state.final_status = TransactionStatus.COMPLETED
logger.info(f"Transaction {state.transaction_id} completed successfully")
logger.info(f"Executed steps: {state.executed_steps}")
return state
# Compensation functions
async def _compensate_create_user(self, args: Dict[str, Any]):
"""Compensate user account creation."""
adapter = await self.mcp_manager.get_adapter("database")
if adapter:
await adapter.call_tool("delete", {
"table": "users",
"id": args["user_id"]
})
async def _compensate_payment(self, args: Dict[str, Any]):
"""Compensate payment charge."""
# Simulate refund processing
logger.info(f"Refunding payment {args['payment_id']} amount {args['amount']}")
async def _compensate_notification(self, args: Dict[str, Any]):
"""Compensate notification sending."""
# Send cancellation notification
logger.info(f"Sending cancellation notification to {args['recipient']}")
async def _compensate_inventory(self, args: Dict[str, Any]):
"""Compensate inventory update."""
adapter = await self.mcp_manager.get_adapter("database")
if adapter:
await adapter.call_tool("update", {
"table": "inventory",
"data": {"quantity": args["quantity"]}
})
async def _compensate_shipping(self, args: Dict[str, Any]):
"""Compensate shipping label creation."""
# Cancel shipping label
logger.info(f"Cancelling shipping label {args['label_id']}")
async def run_compensation_workflow(self, query: str) -> Dict[str, Any]:
"""Execute workflow with compensation handling."""
if not self.workflow:
await self.build_workflow()
initial_state = CompensationWorkflowState(
query=query,
messages=[HumanMessage(content=query)],
transaction_id=""
)
        try:
            # ainvoke returns the final state as a dict of channel values
            final_state = await self.workflow.ainvoke(initial_state)
            return {
                "success": final_state["final_status"] == TransactionStatus.COMPLETED,
                "transaction_id": final_state["transaction_id"],
                "final_status": final_state["final_status"].value,
                "executed_steps": final_state["executed_steps"],
                "failed_steps": final_state["failed_steps"],
                "recovery_attempts": final_state["recovery_attempts"],
                "compensation_actions": len([c for c in final_state["compensation_actions"] if c.executed]),
                "step_results": final_state["step_results"]
            }
except Exception as e:
return {
"success": False,
"error": str(e),
"query": query
}
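A driver for the saga-style workflow might look like this sketch (again assuming the hypothetical MCPManager); on failure, the output reflects how many compensations ran, in reverse order of the original steps:

# Hypothetical usage sketch for the compensation workflow.
import asyncio

async def demo_saga():
    handler = CompensationWorkflowHandler(MCPManager())  # assumed manager
    outcome = await handler.run_compensation_workflow("Order one widget")
    print("Status:", outcome.get("final_status"))
    print("Executed steps:", outcome.get("executed_steps"))
    # Non-zero only when a failure triggered the saga rollback
    print("Compensations applied:", outcome.get("compensation_actions"))

asyncio.run(demo_saga())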
Module Assessment (10 minutes)¶
Question 1: What is the primary advantage of parallel processing in workflows?
A) Simplified code structure
B) Reduced resource usage
C) Faster overall execution time
D) Better error handling
Question 2: In conditional workflow routing, what determines the execution path?
A) Random selection
B) Runtime conditions and content analysis
C) User preferences
D) System load
Question 3: What is the purpose of compensation actions in workflow patterns?
A) Improve performance
B) Undo completed steps when later steps fail
C) Reduce memory usage
D) Simplify configuration
Question 4: How does the parallel workflow handle partial failures?
A) Stops all processing immediately
B) Continues with successful results and reports failures
C) Retries all operations
D) Ignores failed operations
Question 5: What triggers escalation in the conditional workflow router?
A) High system load
B) Customer tier, priority level, or escalation keywords
C) Time of day
D) Random intervals
Question 6: In the compensation pattern, in what order are compensation actions executed?
A) Random order
B) Same order as original execution
C) Reverse order of original execution
D) Priority-based order
Question 7: What is the benefit of checkpoint data in advanced workflows?
A) Performance optimization
B) State recovery and resumption after failures
C) User experience improvement
D) Reduced memory usage
View Module B Test Solutions →