Skip to content

Session 7 - Module A: Advanced ADK Integration (35 minutes)

Prerequisites: Session 7 Core Section Complete
Target Audience: Developers building production ADK agents
Cognitive Load: 4 advanced concepts


Module Overview

This module explores sophisticated ADK (Agent Development Kit) integration patterns including advanced Gemini model usage, MCP server orchestration, conversation memory systems, and production deployment strategies. You'll learn to build enterprise-grade ADK agents that can handle complex workflows with persistent state and multi-service coordination.

Learning Objectives

By the end of this module, you will: - Implement advanced Gemini model integration with function calling and streaming - Design sophisticated MCP server orchestration for multi-service coordination - Build persistent conversation memory systems with context-aware responses - Create production deployment strategies for Google Cloud Platform


Part 1: Advanced Gemini Integration (20 minutes)

Step 1: Setting Up Model Capabilities Framework

Let's start by building a sophisticated framework for managing different Gemini model capabilities. This creates the foundation for enterprise-grade ADK integration.

🗂️ File: src/session7/advanced_gemini.py - Advanced Gemini model patterns

from typing import Dict, List, Any, Optional, Callable, AsyncGenerator
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import asyncio
import json
import logging
from enum import Enum

import vertexai
from vertexai.generative_models import (
    GenerativeModel, 
    Tool, 
    FunctionDeclaration,
    Part,
    Content,
    ChatSession
)
from google.cloud import aiplatform

This setup imports all the essential components we'll need for advanced Gemini integration, including function calling, streaming, and enterprise monitoring capabilities.

Step 2: Defining Model Capabilities

Now we'll create an enum to track different capabilities our ADK agent can handle:

class ModelCapability(Enum):
    """ADK agent model capabilities"""
    TEXT_GENERATION = "text_generation"
    FUNCTION_CALLING = "function_calling"
    CODE_GENERATION = "code_generation"
    MULTIMODAL = "multimodal"
    STREAMING = "streaming"

This enum allows us to clearly define and track what features our agent supports, making it easier to configure different deployment scenarios.

Step 3: Creating Advanced Model Configuration

Next, let's build a comprehensive configuration system that goes far beyond basic chat settings:

@dataclass
class ModelConfiguration:
    """Advanced Gemini model configuration"""
    model_name: str = "gemini-2.0-flash-exp"
    temperature: float = 0.7
    max_output_tokens: int = 2048
    top_p: float = 0.8
    top_k: int = 40

    # Advanced features
    enable_function_calling: bool = True
    enable_streaming: bool = False
    enable_safety_settings: bool = True

    # Performance tuning
    request_timeout: int = 60
    retry_count: int = 3
    concurrent_requests: int = 5

This configuration dataclass provides enterprise-level control over model behavior, from basic generation parameters to advanced performance tuning and feature toggles.

Step 4: Building the Advanced Gemini Agent Class

Now let's create the main agent class that orchestrates all these capabilities. We'll start with the class definition and basic initialization:

class AdvancedGeminiAgent:
    """Production-ready Gemini agent with advanced capabilities"""

    def __init__(self, config: ModelConfiguration, project_id: str, location: str):
        # Store core configuration
        self.config = config
        self.project_id = project_id
        self.location = location
        self.logger = logging.getLogger(__name__)

This starts our agent with the essential configuration elements - the model settings, Google Cloud project details, and logging infrastructure.

Next, let's initialize the Vertex AI connection:

        # Initialize Vertex AI
        vertexai.init(project=project_id, location=location)

This single line establishes our connection to Google's Vertex AI platform, enabling all the advanced Gemini model capabilities.

Now let's set up the model and session management components:

        # Model and session management
        self.model: Optional[GenerativeModel] = None
        self.chat_session: Optional[ChatSession] = None
        self.function_registry: Dict[str, Callable] = {}
        self.tool_declarations: List[FunctionDeclaration] = []

These attributes create the foundation for our agent's conversational abilities. The function registry allows us to dynamically add tool capabilities, while the tool declarations define what functions Gemini can call.

Finally, let's add comprehensive performance tracking:

        # Performance tracking
        self.request_metrics = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "average_response_time": 0.0,
            "function_calls": 0
        }

        self._initialize_model()

This metrics dictionary tracks all the essential performance indicators for production monitoring. The final call to _initialize_model() brings everything together to create a ready-to-use agent.

Step 5: Implementing Advanced Model Initialization

Let's build the sophisticated model initialization system step by step. We'll start with the method setup and error handling:

    def _initialize_model(self):
        """Initialize Gemini model with advanced configuration"""

        try:

The try block ensures robust error handling for any initialization failures that might occur with the Vertex AI connection.

First, let's configure the safety settings for content filtering:

            # Configure safety settings
            safety_settings = None
            if self.config.enable_safety_settings:
                from vertexai.generative_models import HarmCategory, HarmBlockThreshold
                safety_settings = {
                    HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
                    HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
                    HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
                    HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
                }

These safety settings provide enterprise-grade content filtering, blocking potentially harmful content while allowing legitimate business conversations to proceed normally.

Next, let's set up the generation configuration parameters:

            # Generation configuration
            generation_config = {
                "temperature": self.config.temperature,
                "max_output_tokens": self.config.max_output_tokens,
                "top_p": self.config.top_p,
                "top_k": self.config.top_k
            }

This configuration controls how creative and focused the model's responses will be, balancing consistency with creativity based on your specific use case.

Now let's configure the function calling tools:

            # Create tools if function calling is enabled
            tools = None
            if self.config.enable_function_calling and self.tool_declarations:
                tools = [Tool(function_declarations=self.tool_declarations)]

This conditional setup enables function calling only when both the configuration allows it and there are actual functions registered for the agent to call.

Let's initialize the model with all our configurations:

            # Initialize model
            self.model = GenerativeModel(
                model_name=self.config.model_name,
                generation_config=generation_config,
                safety_settings=safety_settings,
                tools=tools
            )

This creates the GenerativeModel instance with all our carefully configured settings, creating a production-ready AI agent.

Finally, let's establish the chat session and add proper logging:

            # Initialize chat session
            self.chat_session = self.model.start_chat()

            self.logger.info(f"Initialized Gemini model: {self.config.model_name}")

        except Exception as e:
            self.logger.error(f"Failed to initialize Gemini model: {e}")
            raise

The chat session enables persistent conversation context, while the logging provides essential debugging information for production deployments.

Step 6: Building Function Registration System

Next, let's implement the function registration system that allows our agent to call external tools:

    def register_function(self, name: str, description: str, 
                         parameters: Dict[str, Any], 
                         function: Callable):
        """Register a function for Gemini function calling"""

        # Create function declaration
        function_declaration = FunctionDeclaration(
            name=name,
            description=description,
            parameters=parameters
        )

        self.tool_declarations.append(function_declaration)
        self.function_registry[name] = function

        # Reinitialize model with new tools
        self._initialize_model()

        self.logger.info(f"Registered function: {name}")

This function registration system enables dynamic tool loading - you can add new capabilities to your agent at runtime by registering functions that Gemini can call.

Step 7: Creating Advanced Response Generation

Now let's build the sophisticated response generation system that handles both streaming and standard responses. We'll start with the method signature and initial setup:

    async def generate_response(self, prompt: str, 
                              context: Dict[str, Any] = None,
                              use_streaming: bool = None) -> str:
        """Generate response with advanced features"""

        if use_streaming is None:
            use_streaming = self.config.enable_streaming

        start_time = datetime.now()

This setup allows flexible streaming control - you can override the default configuration on a per-request basis, while tracking timing for performance metrics.

Now let's add the request tracking and context enhancement:

        try:
            self.request_metrics["total_requests"] += 1

            # Prepare context-enhanced prompt
            enhanced_prompt = self._enhance_prompt_with_context(prompt, context or {})

We increment our request counter immediately and enhance the prompt with any provided context to make responses more relevant and personalized.

Next, let's implement the dual response generation paths:

            if use_streaming:
                response = await self._generate_streaming_response(enhanced_prompt)
            else:
                response = await self._generate_standard_response(enhanced_prompt)

This branching logic allows the same method to handle both real-time streaming responses for interactive applications and standard responses for batch processing.

Finally, let's add comprehensive metrics tracking and error handling:

            # Update metrics
            response_time = (datetime.now() - start_time).total_seconds()
            self._update_metrics(response_time, success=True)

            return response

        except Exception as e:
            self.logger.error(f"Response generation failed: {e}")
            self._update_metrics(0, success=False)
            raise

This completion section tracks response times for successful requests and properly logs and handles any failures, ensuring robust production operation with full observability.

Step 8: Implementing Standard Response Generation

Let's implement the standard response generation with function calling support:

    async def _generate_standard_response(self, prompt: str) -> str:
        """Generate standard response with function calling support"""

        response = await self.chat_session.send_message_async(prompt)

        # Check if function calling is required
        if response.candidates[0].function_calls:
            # Handle function calls
            function_responses = await self._handle_function_calls(
                response.candidates[0].function_calls
            )

            # Send function responses back to model
            response = await self.chat_session.send_message_async(function_responses)

        return response.text

This method demonstrates the two-step process of function calling: first, Gemini generates function calls, then we execute them and send the results back for final response generation.

Step 9: Building Streaming Response Capability

Now let's add streaming support for real-time applications:

    async def _generate_streaming_response(self, prompt: str) -> str:
        """Generate streaming response for real-time applications"""

        response_chunks = []

        async for chunk in self.chat_session.send_message_async(prompt, stream=True):
            if chunk.text:
                response_chunks.append(chunk.text)
                # Yield partial response for real-time display
                yield chunk.text

        return "".join(response_chunks)

Streaming responses are crucial for user experience in chat applications - users see text appearing in real-time rather than waiting for the complete response.

Step 10: Implementing Function Call Handler

Let's build the sophisticated function call execution system step by step. We'll start with the method setup and response collection:

    async def _handle_function_calls(self, function_calls: List[Any]) -> List[Part]:
        """Handle function calls from Gemini model"""

        function_responses = []

This method processes the list of function calls that Gemini wants to execute and collects all the responses to send back to the model.

Now let's iterate through each function call and extract the details:

        for function_call in function_calls:
            function_name = function_call.name
            function_args = function_call.args

            self.request_metrics["function_calls"] += 1

For each function call, we extract the function name and arguments, while tracking the total number of function calls for performance monitoring.

Let's implement the function execution with robust error handling:

            if function_name in self.function_registry:
                try:
                    # Execute registered function
                    function_impl = self.function_registry[function_name]

                    if asyncio.iscoroutinefunction(function_impl):
                        result = await function_impl(**function_args)
                    else:
                        result = function_impl(**function_args)

This code checks if the function exists in our registry and handles both synchronous and asynchronous functions automatically - essential for supporting diverse tool types.

Now let's create successful function responses:

                    # Create function response
                    function_response = Part.from_function_response(
                        name=function_name,
                        response={"result": result}
                    )
                    function_responses.append(function_response)

When function execution succeeds, we package the result in the format Gemini expects and add it to our response collection.

Finally, let's handle errors and unknown functions:

                except Exception as e:
                    self.logger.error(f"Function {function_name} execution failed: {e}")
                    error_response = Part.from_function_response(
                        name=function_name,
                        response={"error": str(e)}
                    )
                    function_responses.append(error_response)
            else:
                self.logger.warning(f"Unknown function called: {function_name}")

        return function_responses

This error handling ensures that function failures don't crash the entire conversation - instead, we return error messages that Gemini can use to provide helpful responses to users.

Step 11: Building Context Enhancement System

Now let's implement the context enhancement system that makes responses more relevant. We'll start with the method setup and empty context handling:

    def _enhance_prompt_with_context(self, prompt: str, context: Dict[str, Any]) -> str:
        """Enhance prompt with contextual information"""

        if not context:
            return prompt

        context_parts = []

This setup ensures we handle cases where no context is provided gracefully while preparing to collect contextual information.

Let's add user identification context:

        # Add user context
        if "user_id" in context:
            context_parts.append(f"User ID: {context['user_id']}")

User identification helps personalize responses and enables user-specific behavior patterns in enterprise applications.

Now let's include conversation history for continuity:

        # Add conversation context
        if "conversation_history" in context:
            history = context["conversation_history"]
            if history:
                context_parts.append("Recent conversation:")
                for turn in history[-3:]:  # Last 3 turns
                    role = turn.get("role", "unknown")
                    content = turn.get("content", "")
                    context_parts.append(f"  {role}: {content[:100]}...")

This conversation history provides continuity by including the last few exchanges, helping Gemini understand the ongoing discussion context.

Let's add business-specific context:

        # Add business context
        if "business_context" in context:
            business_ctx = context["business_context"]
            context_parts.append(f"Business context: {business_ctx}")

        # Add temporal context
        context_parts.append(f"Current time: {datetime.now().isoformat()}")

Business context enables domain-specific responses, while temporal context helps with time-sensitive requests and scheduling.

Finally, let's format the enhanced prompt:

        if context_parts:
            enhanced_prompt = f"""Context:
{chr(10).join(context_parts)}

User request: {prompt}"""
            return enhanced_prompt

        return prompt

This formatting creates a clear structure that helps Gemini understand both the contextual background and the specific user request, leading to more relevant and accurate responses.

Step 12: Implementing Performance Metrics

Let's add the metrics tracking system for production monitoring:

    def _update_metrics(self, response_time: float, success: bool):
        """Update performance metrics"""

        if success:
            self.request_metrics["successful_requests"] += 1
        else:
            self.request_metrics["failed_requests"] += 1

        # Update average response time
        total_successful = self.request_metrics["successful_requests"]
        if total_successful > 0:
            current_avg = self.request_metrics["average_response_time"]
            new_avg = ((current_avg * (total_successful - 1)) + response_time) / total_successful
            self.request_metrics["average_response_time"] = new_avg

This metrics system tracks success rates, response times, and function call usage - essential data for monitoring production performance and identifying bottlenecks.

Step 13: Building MCP Service Orchestrator

Now let's create the MCP orchestrator that enables multi-service coordination:

class MCPGeminiOrchestrator:
    """Orchestrates multiple MCP services with Gemini intelligence"""

    def __init__(self, gemini_agent: AdvancedGeminiAgent):
        self.gemini_agent = gemini_agent
        self.mcp_services: Dict[str, Any] = {}
        self.service_capabilities: Dict[str, List[str]] = {}
        self.orchestration_history: List[Dict[str, Any]] = []

This orchestrator class enables our Gemini agent to intelligently coordinate multiple MCP services, creating powerful multi-service workflows.

Step 14: Implementing Service Registration

Let's build the service registration system that allows dynamic service discovery:

    def register_mcp_service(self, service_name: str, 
                           service_client: Any, 
                           capabilities: List[str]):
        """Register MCP service with its capabilities"""

        self.mcp_services[service_name] = service_client
        self.service_capabilities[service_name] = capabilities

        # Register service functions with Gemini
        self._register_service_functions(service_name, capabilities)

This registration system makes MCP services available to Gemini as callable functions, enabling seamless integration between AI reasoning and external services.

Step 15: Creating Dynamic Function Registration

Now let's implement the automatic function registration for MCP services. We'll start with the method setup and capability iteration:

    def _register_service_functions(self, service_name: str, capabilities: List[str]):
        """Register MCP service functions with Gemini"""

        for capability in capabilities:
            function_name = f"{service_name}_{capability}"

This setup creates unique function names by combining the service name with each capability, ensuring no naming conflicts between different services.

Next, let's define the function parameter schema:

            # Create function declaration
            parameters = {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": f"Query for {capability} in {service_name}"
                    },
                    "parameters": {
                        "type": "object",
                        "description": "Additional parameters for the service call"
                    }
                },
                "required": ["query"]
            }

This parameter schema follows the OpenAPI specification, defining a flexible interface that accepts a query string and optional additional parameters for any service call.

Now let's create the wrapper function that bridges Gemini and the MCP service:

            # Create wrapper function
            async def service_function(query: str, parameters: Dict[str, Any] = None):
                return await self._call_mcp_service(service_name, capability, query, parameters or {})

This wrapper function encapsulates the service call logic, providing a clean interface that Gemini can invoke while handling the complexity of MCP service communication.

Finally, let's register the function with our Gemini agent:

            self.gemini_agent.register_function(
                name=function_name,
                description=f"Use {service_name} service for {capability}",
                parameters=parameters,
                function=service_function
            )

This registration makes each service capability available to Gemini as a callable function, enabling intelligent service selection and execution based on user queries.

Step 16: Building Service Call Handler

Let's implement the robust service call handler with comprehensive error handling. We'll start with the method signature and service validation:

    async def _call_mcp_service(self, service_name: str, 
                               capability: str, 
                               query: str, 
                               parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Call MCP service with error handling and logging"""

        service_client = self.mcp_services.get(service_name)
        if not service_client:
            return {"error": f"Service {service_name} not available"}

This initial validation ensures we have a valid service client before attempting any operations, preventing runtime errors from invalid service names.

Now let's set up comprehensive orchestration tracking:

        try:
            # Record orchestration attempt
            orchestration_record = {
                "timestamp": datetime.now(),
                "service": service_name,
                "capability": capability,
                "query": query,
                "parameters": parameters
            }

This tracking record captures all the essential information about each service call, creating a detailed audit trail for debugging and performance analysis.

Let's implement the actual service call:

            # Call MCP service
            result = await service_client.call_tool(capability, {
                "query": query,
                **parameters
            })

This call combines the query with any additional parameters and invokes the specific capability on the MCP service client.

Now let's handle successful execution:

            orchestration_record["result"] = result
            orchestration_record["status"] = "success"

            self.orchestration_history.append(orchestration_record)
            return result

When the service call succeeds, we record the result and status, add the complete record to our history, and return the result to Gemini.

Finally, let's add robust error handling:

        except Exception as e:
            error_result = {"error": str(e)}
            orchestration_record["result"] = error_result
            orchestration_record["status"] = "error"

            self.orchestration_history.append(orchestration_record)
            return error_result

This error handling ensures that service failures are captured, logged, and returned in a consistent format that Gemini can use to provide meaningful error messages to users.

Step 17: Implementing Intelligent Service Selection

Now let's build the intelligent service selection system that uses Gemini to choose optimal services. We'll start with the method setup and analysis prompt creation:

    async def intelligent_service_selection(self, user_query: str) -> Dict[str, Any]:
        """Use Gemini to intelligently select and orchestrate MCP services"""

        # Analyze query to determine required services
        analysis_prompt = f"""
        Analyze this user query and determine which services would be most helpful:
        Query: "{user_query}"

        Available services and their capabilities:
        {self._format_service_capabilities()}

        Respond with a JSON object indicating:
        1. Which services to use
        2. What specific capabilities to invoke
        3. The order of operations
        4. Any parameters needed
        """

This analysis prompt provides Gemini with clear instructions to analyze the user query and create a structured execution plan using available services.

Next, let's get Gemini's service selection analysis:

        service_plan = await self.gemini_agent.generate_response(analysis_prompt)

Gemini analyzes the user query against available service capabilities and returns a JSON plan for executing the optimal service sequence.

Now let's parse and execute the service plan:

        try:
            # Parse service execution plan
            plan = json.loads(service_plan)
            execution_results = []

We parse the JSON response from Gemini and prepare to collect results from each service execution step.

Let's implement the step-by-step execution:

            # Execute services according to plan
            for step in plan.get("steps", []):
                service_name = step.get("service")
                capability = step.get("capability")
                query = step.get("query", user_query)
                parameters = step.get("parameters", {})

                result = await self._call_mcp_service(
                    service_name, capability, query, parameters
                )
                execution_results.append({
                    "step": step,
                    "result": result
                })

This execution loop follows Gemini's plan precisely, calling each service with the specified parameters and collecting all results.

Finally, let's return comprehensive results with fallback handling:

            return {
                "plan": plan,
                "execution_results": execution_results,
                "status": "completed"
            }

        except json.JSONDecodeError:
            # Fallback to direct service calls
            return await self._fallback_service_execution(user_query)

The response includes both the original plan and execution results, while the fallback ensures the system continues working even if Gemini returns invalid JSON.

Step 18: Adding Service Capability Formatting

Finally, let's implement the service capability formatting for AI analysis:

    def _format_service_capabilities(self) -> str:
        """Format service capabilities for Gemini analysis"""

        formatted = []
        for service_name, capabilities in self.service_capabilities.items():
            caps_str = ", ".join(capabilities)
            formatted.append(f"- {service_name}: {caps_str}")

        return "\n".join(formatted)

This formatting method creates a clear description of available services that Gemini can analyze to make intelligent orchestration decisions.

With these 18 steps, we've built a comprehensive advanced Gemini integration system that provides enterprise-grade capabilities including function calling, streaming responses, intelligent MCP service orchestration, and comprehensive monitoring. The modular design allows for easy extension and customization for specific use cases.


Part 2: Production Memory and Context Systems (15 minutes)

Step 19: Building Advanced Memory Framework

Now let's create a sophisticated memory system that enables persistent conversation context and intelligent information retrieval.

🗂️ File: src/session7/advanced_memory.py - Production memory systems

from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import json
import sqlite3
import asyncio
import threading
from enum import Enum

This setup provides all the components we need for building a production-grade memory system with persistence, threading support, and comprehensive data handling.

Step 20: Defining Memory Types

Let's create a classification system for different types of memories:

class MemoryType(Enum):
    """Types of memory for different use cases"""
    SHORT_TERM = "short_term"      # Current conversation
    LONG_TERM = "long_term"        # Persistent user history
    WORKING = "working"            # Active task context
    EPISODIC = "episodic"         # Specific event memories
    SEMANTIC = "semantic"         # General knowledge

This memory type system mimics human memory organization, allowing our agent to manage information appropriately based on its purpose and longevity.

Step 21: Creating Memory Entry Structure

Now let's build the data structure for individual memory entries. We'll start with the basic dataclass definition:

@dataclass
class MemoryEntry:
    """Individual memory entry with metadata"""
    content: str
    memory_type: MemoryType
    timestamp: datetime
    importance: float = 0.5  # 0.0 to 1.0
    context: Dict[str, Any] = field(default_factory=dict)
    tags: List[str] = field(default_factory=list)
    embedding: Optional[List[float]] = None

This structure captures all essential information about a memory: the content itself, its type, when it was created, how important it is, contextual metadata, categorization tags, and optional embeddings for semantic search.

Next, let's add serialization capability for database storage:

    def to_dict(self) -> Dict[str, Any]:
        return {
            "content": self.content,
            "memory_type": self.memory_type.value,
            "timestamp": self.timestamp.isoformat(),
            "importance": self.importance,
            "context": self.context,
            "tags": self.tags,
            "embedding": self.embedding
        }

This method converts the memory entry to a dictionary format suitable for JSON serialization and database storage, handling enum values and datetime formatting.

Finally, let's add deserialization for loading from storage:

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'MemoryEntry':
        return cls(
            content=data["content"],
            memory_type=MemoryType(data["memory_type"]),
            timestamp=datetime.fromisoformat(data["timestamp"]),
            importance=data.get("importance", 0.5),
            context=data.get("context", {}),
            tags=data.get("tags", []),
            embedding=data.get("embedding")
        )

This class method reconstructs MemoryEntry objects from stored dictionary data, handling type conversions and providing sensible defaults for optional fields.

Step 22: Building the Advanced Memory Manager

Let's create the main memory management class:

class AdvancedConversationMemory: """Advanced memory system with multiple memory types and persistence"""

def __init__(self, user_id: str, db_path: str = "agent_memory.db"):
    self.user_id = user_id
    self.db_path = db_path
    self.memories: Dict[MemoryType, List[MemoryEntry]] = {
        memory_type: [] for memory_type in MemoryType
    }
    self.memory_lock = threading.Lock()

    # Configuration
    self.max_short_term_memories = 50
    self.max_working_memories = 20
    self.importance_threshold = 0.3
    self.retention_days = 90

    # Initialize database
    self._initialize_database()
    self._load_memories()

``` This memory manager provides user-specific memory isolation, configurable limits, and thread-safe operations for concurrent access scenarios.

Step 23: Setting Up Database Persistence

Let's implement the SQLite database initialization for persistent memory storage: python def _initialize_database(self): """Initialize SQLite database for memory persistence""" with sqlite3.connect(self.db_path) as conn: conn.execute(""" CREATE TABLE IF NOT EXISTS memories ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT NOT NULL, content TEXT NOT NULL, memory_type TEXT NOT NULL, timestamp TEXT NOT NULL, importance REAL NOT NULL, context TEXT, tags TEXT, embedding TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_user_type_time ON memories(user_id, memory_type, timestamp) """)

This database schema provides efficient storage with proper indexing for fast retrieval by user, memory type, and timestamp - essential for production performance.

Step 24: Loading Memories from Database

Now let's implement the memory loading system:

    def _load_memories(self):
        """Load memories from database"""

        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute("""
                SELECT content, memory_type, timestamp, importance, context, tags, embedding
                FROM memories 
                WHERE user_id = ? AND timestamp > ?
                ORDER BY timestamp DESC
            """, (self.user_id, (datetime.now() - timedelta(days=self.retention_days)).isoformat()))

            for row in cursor.fetchall():
                content, memory_type, timestamp, importance, context, tags, embedding = row

                memory_entry = MemoryEntry(
                    content=content,
                    memory_type=MemoryType(memory_type),
                    timestamp=datetime.fromisoformat(timestamp),
                    importance=importance,
                    context=json.loads(context or "{}"),
                    tags=json.loads(tags or "[]"),
                    embedding=json.loads(embedding) if embedding else None
                )

                self.memories[memory_entry.memory_type].append(memory_entry)

This loading system efficiently retrieves memories within the retention period, deserializes complex data structures, and organizes them by memory type for quick access.

Step 25: Implementing Memory Addition

Let's create the system for adding new memories with automatic management:

    def add_memory(self, content: str, memory_type: MemoryType, 
                   importance: float = 0.5, context: Dict[str, Any] = None,
                   tags: List[str] = None):
        """Add new memory entry with automatic management"""

        memory_entry = MemoryEntry(
            content=content,
            memory_type=memory_type,
            timestamp=datetime.now(),
            importance=importance,
            context=context or {},
            tags=tags or []
        )

        with self.memory_lock:
            # Add to in-memory storage
            self.memories[memory_type].append(memory_entry)

            # Manage memory limits
            self._manage_memory_limits(memory_type)

            # Persist to database
            self._persist_memory(memory_entry)

This add_memory method provides thread-safe memory addition with automatic limit management and immediate persistence to prevent data loss.

Step 26: Building Memory Limit Management

Now let's implement intelligent memory limit management:

    def _manage_memory_limits(self, memory_type: MemoryType):
        """Manage memory limits and cleanup old entries"""

        memories = self.memories[memory_type]

        if memory_type == MemoryType.SHORT_TERM:
            if len(memories) > self.max_short_term_memories:
                # Keep most recent and most important
                memories.sort(key=lambda m: (m.importance, m.timestamp), reverse=True)
                self.memories[memory_type] = memories[:self.max_short_term_memories]

        elif memory_type == MemoryType.WORKING:
            if len(memories) > self.max_working_memories:
                # Keep most recent working memories
                memories.sort(key=lambda m: m.timestamp, reverse=True)
                self.memories[memory_type] = memories[:self.max_working_memories]

        # For long-term memories, use importance-based retention
        elif memory_type == MemoryType.LONG_TERM:
            # Remove low-importance old memories
            cutoff_date = datetime.now() - timedelta(days=30)
            self.memories[memory_type] = [
                m for m in memories 
                if m.importance >= self.importance_threshold or m.timestamp > cutoff_date
            ]

This management system uses different strategies for different memory types: importance-based for short-term, recency-based for working memory, and hybrid approaches for long-term storage.

Step 27: Implementing Memory Persistence

Let's add the database persistence functionality:

    def _persist_memory(self, memory_entry: MemoryEntry):
        """Persist memory entry to database"""

        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                INSERT INTO memories (user_id, content, memory_type, timestamp, 
                                    importance, context, tags, embedding)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                self.user_id,
                memory_entry.content,
                memory_entry.memory_type.value,
                memory_entry.timestamp.isoformat(),
                memory_entry.importance,
                json.dumps(memory_entry.context),
                json.dumps(memory_entry.tags),
                json.dumps(memory_entry.embedding) if memory_entry.embedding else None
            ))

This persistence method ensures all memory entries are immediately saved to the database with proper JSON serialization for complex data types.

Step 28: Building Memory Retrieval System

Now let's implement the intelligent memory retrieval system:

    def retrieve_relevant_memories(self, query: str, 
                                 memory_types: List[MemoryType] = None,
                                 max_memories: int = 10) -> List[MemoryEntry]:
        """Retrieve memories relevant to a query"""

        if memory_types is None:
            memory_types = list(MemoryType)

        relevant_memories = []

        with self.memory_lock:
            for memory_type in memory_types:
                for memory in self.memories[memory_type]:
                    # Simple relevance scoring (could be enhanced with embeddings)
                    relevance_score = self._calculate_relevance(query, memory)

                    if relevance_score > 0.1:  # Minimum relevance threshold
                        relevant_memories.append((memory, relevance_score))

        # Sort by relevance and return top memories
        relevant_memories.sort(key=lambda x: x[1], reverse=True)
        return [memory for memory, _ in relevant_memories[:max_memories]]

This retrieval system searches across specified memory types, calculates relevance scores, and returns the most relevant memories in ranked order.

Step 29: Implementing Relevance Calculation

Let's build the relevance scoring algorithm step by step. We'll start with the basic setup and text preprocessing:

    def _calculate_relevance(self, query: str, memory: MemoryEntry) -> float:
        """Calculate relevance score between query and memory"""

        query_lower = query.lower()
        content_lower = memory.content.lower()

        # Simple keyword matching (could be enhanced with semantic similarity)
        query_words = set(query_lower.split())
        content_words = set(content_lower.split())

        if not query_words:
            return 0.0

This setup normalizes text to lowercase and creates word sets for comparison. The empty query check prevents division by zero errors.

Next, let's calculate the basic keyword overlap:

        # Calculate word overlap
        overlap = len(query_words.intersection(content_words))
        relevance = overlap / len(query_words)

This calculates what percentage of query words appear in the memory content, providing a basic relevance score between 0 and 1.

Now let's add importance-based boosting:

        # Boost relevance based on importance and recency
        importance_boost = memory.importance * 0.2

More important memories get a relevance boost, ensuring crucial information is prioritized even if keyword matching is moderate.

Let's add recency boosting for recent memories:

        # Recency boost (memories from last 24 hours get boost)
        if datetime.now() - memory.timestamp < timedelta(hours=24):
            recency_boost = 0.1
        else:
            recency_boost = 0.0

Recent memories get a small boost since they're likely more relevant to current conversations and context.

Finally, let's combine all factors and ensure the score stays within bounds:

        return min(relevance + importance_boost + recency_boost, 1.0)

This final calculation combines keyword relevance with importance and recency boosts, capping the result at 1.0 to maintain a consistent scoring scale.

Step 30: Creating Conversation Context System

Let's implement the conversation context retrieval for prompt enhancement:

    def get_conversation_context(self, max_turns: int = 10) -> List[Dict[str, Any]]:
        """Get recent conversation context for prompt enhancement"""

        short_term_memories = self.memories[MemoryType.SHORT_TERM]

        # Sort by timestamp and get recent memories
        recent_memories = sorted(
            short_term_memories,
            key=lambda m: m.timestamp,
            reverse=True
        )[:max_turns]

        # Convert to conversation format
        context = []
        for memory in reversed(recent_memories):  # Chronological order
            context.append({
                "timestamp": memory.timestamp.isoformat(),
                "content": memory.content,
                "importance": memory.importance,
                "tags": memory.tags
            })

        return context

This context system provides structured conversation history that can be used to enhance prompts with relevant background information.

Step 31: Implementing Memory Consolidation

Finally, let's add the memory consolidation system that promotes important memories:

    def consolidate_memories(self):
        """Consolidate and organize memories for better retrieval"""

        with self.memory_lock:
            # Promote important short-term memories to long-term
            short_term_memories = self.memories[MemoryType.SHORT_TERM]

            for memory in short_term_memories[:]:
                if memory.importance >= 0.8:  # High importance threshold
                    # Move to long-term memory
                    memory.memory_type = MemoryType.LONG_TERM
                    self.memories[MemoryType.LONG_TERM].append(memory)
                    self.memories[MemoryType.SHORT_TERM].remove(memory)

                    # Update in database
                    self._update_memory_type(memory)

This consolidation system automatically promotes important short-term memories to long-term storage, ensuring valuable information is preserved.

Step 32: Adding Database Update Functionality

Let's complete the memory system with database update capability:

    def _update_memory_type(self, memory: MemoryEntry):
        """Update memory type in database"""

        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                UPDATE memories 
                SET memory_type = ? 
                WHERE user_id = ? AND content = ? AND timestamp = ?
            """, (
                memory.memory_type.value,
                self.user_id,
                memory.content,
                memory.timestamp.isoformat()
            ))

This completes our advanced memory system with 32 comprehensive steps that provide enterprise-grade conversation memory with persistence, intelligent retrieval, and automatic consolidation capabilities.


Module Summary

You've now mastered advanced ADK integration patterns:

Advanced Gemini Integration: Implemented sophisticated model configuration with function calling and streaming
MCP Service Orchestration: Built intelligent multi-service coordination with Gemini-driven selection
Production Memory Systems: Created persistent conversation memory with multiple memory types
Enterprise Context Management: Designed context-aware response generation with relevance scoring


📝 Multiple Choice Test - Module A

Test your understanding of advanced ADK integration and enterprise patterns:

Question 1: What advanced features does the enhanced Gemini agent implement for function calling? A) Basic text generation only
B) Streaming responses with function calling and model configuration flexibility
C) Static responses only
D) Image processing only

Question 2: How does the MCP service orchestrator determine which service to use for a given query? A) Random selection
B) Gemini AI-driven analysis of query content and service capabilities
C) First available service
D) Manual configuration only

Question 3: What types of memory does the advanced memory system support for conversation persistence? A) Short-term memory only
B) Multiple memory types including conversation, factual, emotional, and preference memory
C) File-based storage only
D) No memory persistence

Question 4: How does the enterprise context manager determine response relevance? A) Random scoring
B) Keyword counting only
C) Semantic similarity analysis with configurable relevance thresholds
D) Manual relevance assignment

Question 5: What production features does the advanced ADK integration provide for enterprise deployment? A) Basic functionality only
B) Persistent memory, context management, error handling, and streaming capabilities
C) Development features only
D) Single-user support

🗂️ View Test Solutions →

Next Steps


🗂️ Source Files for Module A: - src/session7/advanced_gemini.py - Sophisticated Gemini model integration - src/session7/advanced_memory.py - Production memory systems - src/session7/mcp_orchestration.py - Multi-service coordination patterns