Session 7 - Module A: Advanced ADK Integration (35 minutes)¶
Prerequisites: Session 7 Core Section Complete
Target Audience: Developers building production ADK agents
Cognitive Load: 4 advanced concepts
Module Overview¶
This module explores sophisticated ADK (Agent Development Kit) integration patterns including advanced Gemini model usage, MCP server orchestration, conversation memory systems, and production deployment strategies. You'll learn to build enterprise-grade ADK agents that can handle complex workflows with persistent state and multi-service coordination.
Learning Objectives¶
By the end of this module, you will:

- Implement advanced Gemini model integration with function calling and streaming
- Design sophisticated MCP server orchestration for multi-service coordination
- Build persistent conversation memory systems with context-aware responses
- Create production deployment strategies for Google Cloud Platform
Part 1: Advanced Gemini Integration (20 minutes)¶
Step 1: Setting Up Model Capabilities Framework¶
Let's start by building a sophisticated framework for managing different Gemini model capabilities. This creates the foundation for enterprise-grade ADK integration.
🗂️ File: src/session7/advanced_gemini.py - Advanced Gemini model patterns
```python
from typing import Dict, List, Any, Optional, Callable, AsyncGenerator
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import asyncio
import json
import logging
from enum import Enum

import vertexai
from vertexai.generative_models import (
    GenerativeModel,
    Tool,
    FunctionDeclaration,
    Part,
    Content,
    ChatSession
)
from google.cloud import aiplatform
```
This setup imports all the essential components we'll need for advanced Gemini integration, including function calling, streaming, and enterprise monitoring capabilities.
Step 2: Defining Model Capabilities¶
Now we'll create an enum to track different capabilities our ADK agent can handle:
```python
class ModelCapability(Enum):
    """ADK agent model capabilities"""
    TEXT_GENERATION = "text_generation"
    FUNCTION_CALLING = "function_calling"
    CODE_GENERATION = "code_generation"
    MULTIMODAL = "multimodal"
    STREAMING = "streaming"
```
This enum allows us to clearly define and track what features our agent supports, making it easier to configure different deployment scenarios.
Step 3: Creating Advanced Model Configuration¶
Next, let's build a comprehensive configuration system that goes far beyond basic chat settings:
```python
@dataclass
class ModelConfiguration:
    """Advanced Gemini model configuration"""
    model_name: str = "gemini-2.0-flash-exp"
    temperature: float = 0.7
    max_output_tokens: int = 2048
    top_p: float = 0.8
    top_k: int = 40

    # Advanced features
    enable_function_calling: bool = True
    enable_streaming: bool = False
    enable_safety_settings: bool = True

    # Performance tuning
    request_timeout: int = 60
    retry_count: int = 3
    concurrent_requests: int = 5
```
This configuration dataclass provides enterprise-level control over model behavior, from basic generation parameters to advanced performance tuning and feature toggles.
Step 4: Building the Advanced Gemini Agent Class¶
Now let's create the main agent class that orchestrates all these capabilities. We'll start with the class definition and basic initialization:
```python
class AdvancedGeminiAgent:
    """Production-ready Gemini agent with advanced capabilities"""

    def __init__(self, config: ModelConfiguration, project_id: str, location: str):
        # Store core configuration
        self.config = config
        self.project_id = project_id
        self.location = location
        self.logger = logging.getLogger(__name__)
```
This starts our agent with the essential configuration elements - the model settings, Google Cloud project details, and logging infrastructure.
Next, let's initialize the Vertex AI connection:
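```python
        # Initialize the Vertex AI SDK for this project and region
        vertexai.init(project=self.project_id, location=self.location)
```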
This single line establishes our connection to Google's Vertex AI platform, enabling all the advanced Gemini model capabilities.
Now let's set up the model and session management components:
```python
        # Model and session management
        self.model: Optional[GenerativeModel] = None
        self.chat_session: Optional[ChatSession] = None
        self.function_registry: Dict[str, Callable] = {}
        self.tool_declarations: List[FunctionDeclaration] = []
```
These attributes create the foundation for our agent's conversational abilities. The function registry allows us to dynamically add tool capabilities, while the tool declarations define what functions Gemini can call.
Finally, let's add comprehensive performance tracking:
```python
        # Performance tracking
        self.request_metrics = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "average_response_time": 0.0,
            "function_calls": 0
        }

        self._initialize_model()
```
This metrics dictionary tracks all the essential performance indicators for production monitoring. The final call to _initialize_model() brings everything together to create a ready-to-use agent.
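Putting it together, creating an agent might look like this (the project ID and region below are placeholders):

```python
# Example usage - identifiers are placeholders for your own project
config = ModelConfiguration(temperature=0.3, enable_function_calling=True)
agent = AdvancedGeminiAgent(
    config=config,
    project_id="my-gcp-project",
    location="us-central1"
)
```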
Step 5: Implementing Advanced Model Initialization¶
Let's build the sophisticated model initialization system step by step. We'll start with the method setup and error handling:
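```python
    def _initialize_model(self):
        """Initialize the Gemini model with the configured settings"""
        try:
```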
The try block ensures robust error handling for any initialization failures that might occur with the Vertex AI connection.
First, let's configure the safety settings for content filtering:
```python
            # Configure safety settings
            safety_settings = None
            if self.config.enable_safety_settings:
                from vertexai.generative_models import HarmCategory, HarmBlockThreshold
                safety_settings = {
                    HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
                    HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
                    HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
                    HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
                }
```
These safety settings provide enterprise-grade content filtering, blocking potentially harmful content while allowing legitimate business conversations to proceed normally.
Next, let's set up the generation configuration parameters:
```python
            # Generation configuration
            generation_config = {
                "temperature": self.config.temperature,
                "max_output_tokens": self.config.max_output_tokens,
                "top_p": self.config.top_p,
                "top_k": self.config.top_k
            }
```
This configuration controls how creative and focused the model's responses will be, balancing consistency with creativity based on your specific use case.
Now let's configure the function calling tools:
```python
            # Create tools if function calling is enabled
            tools = None
            if self.config.enable_function_calling and self.tool_declarations:
                tools = [Tool(function_declarations=self.tool_declarations)]
```
This conditional setup enables function calling only when both the configuration allows it and there are actual functions registered for the agent to call.
Let's initialize the model with all our configurations:
```python
            # Initialize model
            self.model = GenerativeModel(
                model_name=self.config.model_name,
                generation_config=generation_config,
                safety_settings=safety_settings,
                tools=tools
            )
```
This builds the GenerativeModel instance with all our carefully configured settings, yielding a production-ready agent.
Finally, let's establish the chat session and add proper logging:
```python
            # Initialize chat session
            self.chat_session = self.model.start_chat()

            self.logger.info(f"Initialized Gemini model: {self.config.model_name}")

        except Exception as e:
            self.logger.error(f"Failed to initialize Gemini model: {e}")
            raise
```
The chat session enables persistent conversation context, while the logging provides essential debugging information for production deployments.
Step 6: Building Function Registration System¶
Next, let's implement the function registration system that allows our agent to call external tools:
```python
    def register_function(self, name: str, description: str,
                          parameters: Dict[str, Any],
                          function: Callable):
        """Register a function for Gemini function calling"""
        # Create function declaration
        function_declaration = FunctionDeclaration(
            name=name,
            description=description,
            parameters=parameters
        )

        self.tool_declarations.append(function_declaration)
        self.function_registry[name] = function

        # Reinitialize model with new tools
        self._initialize_model()
        self.logger.info(f"Registered function: {name}")
```
This function registration system enables dynamic tool loading - you can add new capabilities to your agent at runtime by registering functions that Gemini can call. Note that reinitializing the model also starts a fresh chat session, so register functions before a conversation begins or persist the history yourself.
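For example, a hypothetical weather lookup could be registered like this (the function name and schema are illustrative):

```python
# Hypothetical tool: expose a simple weather lookup to Gemini
def get_weather(city: str) -> str:
    return f"Sunny and 22C in {city}"  # stand-in for a real API call

agent.register_function(
    name="get_weather",
    description="Get the current weather for a city",
    parameters={
        "type": "object",
        "properties": {
            "city": {"type": "string", "description": "City name"}
        },
        "required": ["city"]
    },
    function=get_weather
)
```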
Step 7: Creating Advanced Response Generation¶
Now let's build the sophisticated response generation system that handles both streaming and standard responses. We'll start with the method signature and initial setup:
```python
    async def generate_response(self, prompt: str,
                                context: Optional[Dict[str, Any]] = None,
                                use_streaming: Optional[bool] = None) -> str:
        """Generate response with advanced features"""
        if use_streaming is None:
            use_streaming = self.config.enable_streaming

        start_time = datetime.now()
```
This setup allows flexible streaming control - you can override the default configuration on a per-request basis, while tracking timing for performance metrics.
Now let's add the request tracking and context enhancement:
```python
        try:
            self.request_metrics["total_requests"] += 1

            # Prepare context-enhanced prompt
            enhanced_prompt = self._enhance_prompt_with_context(prompt, context or {})
```
We increment our request counter immediately and enhance the prompt with any provided context to make responses more relevant and personalized.
Next, let's implement the dual response generation paths:
```python
            if use_streaming:
                response = await self._generate_streaming_response(enhanced_prompt)
            else:
                response = await self._generate_standard_response(enhanced_prompt)
```
This branching logic allows the same method to handle both real-time streaming responses for interactive applications and standard responses for batch processing.
Finally, let's add comprehensive metrics tracking and error handling:
```python
            # Update metrics
            response_time = (datetime.now() - start_time).total_seconds()
            self._update_metrics(response_time, success=True)

            return response

        except Exception as e:
            self.logger.error(f"Response generation failed: {e}")
            self._update_metrics(0, success=False)
            raise
```
This completion section tracks response times for successful requests and properly logs and handles any failures, ensuring robust production operation with full observability.
Step 8: Implementing Standard Response Generation¶
Let's implement the standard response generation with function calling support:
```python
    async def _generate_standard_response(self, prompt: str) -> str:
        """Generate standard response with function calling support"""
        response = await self.chat_session.send_message_async(prompt)

        # Check if function calling is required
        if response.candidates[0].function_calls:
            # Handle function calls
            function_responses = await self._handle_function_calls(
                response.candidates[0].function_calls
            )
            # Send function responses back to model
            response = await self.chat_session.send_message_async(function_responses)

        return response.text
```
This method demonstrates the two-step process of function calling: first, Gemini generates function calls, then we execute them and send the results back for final response generation.
Step 9: Building Streaming Response Capability¶
Now let's add streaming support for real-time applications:
```python
    async def _generate_streaming_response(self, prompt: str) -> str:
        """Generate streaming response, accumulating chunks into a full reply"""
        response_chunks = []
        responses = await self.chat_session.send_message_async(prompt, stream=True)
        async for chunk in responses:
            if chunk.text:
                # Forward chunk.text to the client here for real-time display
                response_chunks.append(chunk.text)
        return "".join(response_chunks)
```
Streaming responses are crucial for user experience in chat applications - users see text appearing in real time rather than waiting for the complete response. Note that a Python coroutine cannot both yield partial chunks and return a final string, so this version accumulates the chunks and marks where a UI callback would stream them; a true token-by-token interface would be a separate AsyncGenerator.
Step 10: Implementing Function Call Handler¶
Let's build the sophisticated function call execution system step by step. We'll start with the method setup and response collection:
```python
    async def _handle_function_calls(self, function_calls: List[Any]) -> List[Part]:
        """Handle function calls from Gemini model"""
        function_responses = []
```
This method processes the list of function calls that Gemini wants to execute and collects all the responses to send back to the model.
Now let's iterate through each function call and extract the details:
```python
        for function_call in function_calls:
            function_name = function_call.name
            function_args = function_call.args
            self.request_metrics["function_calls"] += 1
```
For each function call, we extract the function name and arguments, while tracking the total number of function calls for performance monitoring.
Let's implement the function execution with robust error handling:
```python
            if function_name in self.function_registry:
                try:
                    # Execute registered function
                    function_impl = self.function_registry[function_name]
                    if asyncio.iscoroutinefunction(function_impl):
                        result = await function_impl(**function_args)
                    else:
                        result = function_impl(**function_args)
```
This code checks if the function exists in our registry and handles both synchronous and asynchronous functions automatically - essential for supporting diverse tool types.
Now let's create successful function responses:
```python
                    # Create function response
                    function_response = Part.from_function_response(
                        name=function_name,
                        response={"result": result}
                    )
                    function_responses.append(function_response)
```
When function execution succeeds, we package the result in the format Gemini expects and add it to our response collection.
Finally, let's handle errors and unknown functions:
```python
                except Exception as e:
                    self.logger.error(f"Function {function_name} execution failed: {e}")
                    error_response = Part.from_function_response(
                        name=function_name,
                        response={"error": str(e)}
                    )
                    function_responses.append(error_response)
            else:
                self.logger.warning(f"Unknown function called: {function_name}")
                # Still answer the call so the model is not left waiting
                function_responses.append(Part.from_function_response(
                    name=function_name,
                    response={"error": f"Unknown function: {function_name}"}
                ))

        return function_responses
```
This error handling ensures that function failures don't crash the entire conversation - execution errors and unknown function names are both returned as error messages that Gemini can use to provide helpful responses to users.
Step 11: Building Context Enhancement System¶
Now let's implement the context enhancement system that makes responses more relevant. We'll start with the method setup and empty context handling:
```python
    def _enhance_prompt_with_context(self, prompt: str, context: Dict[str, Any]) -> str:
        """Enhance prompt with contextual information"""
        if not context:
            return prompt

        context_parts = []
```
The guard returns the prompt unchanged when no context is provided, and the empty list prepares us to collect contextual information.
Let's add user identification context (a minimal sketch, assuming the context dict carries a user_id field):
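```python
        # Add user context - "user_id" is an assumed field name
        if "user_id" in context:
            context_parts.append(f"User ID: {context['user_id']}")
```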
User identification helps personalize responses and enables user-specific behavior patterns in enterprise applications.
Now let's include conversation history for continuity:
```python
        # Add conversation context
        if "conversation_history" in context:
            history = context["conversation_history"]
            if history:
                context_parts.append("Recent conversation:")
                for turn in history[-3:]:  # Last 3 turns
                    role = turn.get("role", "unknown")
                    content = turn.get("content", "")
                    context_parts.append(f"  {role}: {content[:100]}...")
```
This conversation history provides continuity by including the last few exchanges, helping Gemini understand the ongoing discussion context.
Let's add business-specific context:
```python
        # Add business context
        if "business_context" in context:
            business_ctx = context["business_context"]
            context_parts.append(f"Business context: {business_ctx}")

        # Add temporal context
        context_parts.append(f"Current time: {datetime.now().isoformat()}")
```
Business context enables domain-specific responses, while temporal context helps with time-sensitive requests and scheduling.
Finally, let's format the enhanced prompt:
```python
        if context_parts:
            enhanced_prompt = f"""Context:
{chr(10).join(context_parts)}

User request: {prompt}"""
            return enhanced_prompt

        return prompt
```
This formatting creates a clear structure that helps Gemini understand both the contextual background and the specific user request, leading to more relevant and accurate responses.
Step 12: Implementing Performance Metrics¶
Let's add the metrics tracking system for production monitoring:
```python
    def _update_metrics(self, response_time: float, success: bool):
        """Update performance metrics"""
        if success:
            self.request_metrics["successful_requests"] += 1
        else:
            self.request_metrics["failed_requests"] += 1

        # Update rolling average response time over successful requests only
        if success:
            total_successful = self.request_metrics["successful_requests"]
            current_avg = self.request_metrics["average_response_time"]
            new_avg = ((current_avg * (total_successful - 1)) + response_time) / total_successful
            self.request_metrics["average_response_time"] = new_avg
```
This metrics system tracks success rates, response times, and function call usage - essential data for monitoring production performance and identifying bottlenecks.
Step 13: Building MCP Service Orchestrator¶
Now let's create the MCP orchestrator that enables multi-service coordination:
```python
class MCPGeminiOrchestrator:
    """Orchestrates multiple MCP services with Gemini intelligence"""

    def __init__(self, gemini_agent: AdvancedGeminiAgent):
        self.gemini_agent = gemini_agent
        self.mcp_services: Dict[str, Any] = {}
        self.service_capabilities: Dict[str, List[str]] = {}
        self.orchestration_history: List[Dict[str, Any]] = []
```
This orchestrator class enables our Gemini agent to intelligently coordinate multiple MCP services, creating powerful multi-service workflows.
Step 14: Implementing Service Registration¶
Let's build the service registration system that allows dynamic service discovery:
```python
    def register_mcp_service(self, service_name: str,
                             service_client: Any,
                             capabilities: List[str]):
        """Register MCP service with its capabilities"""
        self.mcp_services[service_name] = service_client
        self.service_capabilities[service_name] = capabilities

        # Register service functions with Gemini
        self._register_service_functions(service_name, capabilities)
```
This registration system makes MCP services available to Gemini as callable functions, enabling seamless integration between AI reasoning and external services.
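For instance, a hypothetical document service could be registered like this (docs_mcp_client stands in for an MCP client you have already connected):

```python
# Hypothetical registration of a document-search MCP service
orchestrator = MCPGeminiOrchestrator(agent)
orchestrator.register_mcp_service(
    service_name="docs",
    service_client=docs_mcp_client,  # assumed: an already-connected MCP client
    capabilities=["search", "summarize"]
)
```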
Step 15: Creating Dynamic Function Registration¶
Now let's implement the automatic function registration for MCP services. We'll start with the method setup and capability iteration:
```python
    def _register_service_functions(self, service_name: str, capabilities: List[str]):
        """Register MCP service functions with Gemini"""
        for capability in capabilities:
            function_name = f"{service_name}_{capability}"
```
This setup creates unique function names by combining the service name with each capability, ensuring no naming conflicts between different services.
Next, let's define the function parameter schema:
```python
            # Create function declaration
            parameters = {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": f"Query for {capability} in {service_name}"
                    },
                    "parameters": {
                        "type": "object",
                        "description": "Additional parameters for the service call"
                    }
                },
                "required": ["query"]
            }
```
This parameter schema follows the OpenAPI-style schema subset that Gemini function declarations expect, defining a flexible interface that accepts a query string and optional additional parameters for any service call.
Now let's create the wrapper function that bridges Gemini and the MCP service:
```python
            # Create wrapper function; bind the loop variables as defaults so
            # each closure captures its own service/capability pair
            async def service_function(query: str, parameters: Dict[str, Any] = None,
                                       _service=service_name, _capability=capability):
                return await self._call_mcp_service(_service, _capability, query, parameters or {})
```
This wrapper function encapsulates the service call logic, providing a clean interface that Gemini can invoke while handling the complexity of MCP service communication. Binding the loop variables as default arguments avoids Python's late-binding closure pitfall, where every wrapper would otherwise call the last capability registered.
Finally, let's register the function with our Gemini agent:
```python
            self.gemini_agent.register_function(
                name=function_name,
                description=f"Use {service_name} service for {capability}",
                parameters=parameters,
                function=service_function
            )
```
This registration makes each service capability available to Gemini as a callable function, enabling intelligent service selection and execution based on user queries.
Step 16: Building Service Call Handler¶
Let's implement the robust service call handler with comprehensive error handling. We'll start with the method signature and service validation:
```python
    async def _call_mcp_service(self, service_name: str,
                                capability: str,
                                query: str,
                                parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Call MCP service with error handling and logging"""
        service_client = self.mcp_services.get(service_name)
        if not service_client:
            return {"error": f"Service {service_name} not available"}
```
This initial validation ensures we have a valid service client before attempting any operations, preventing runtime errors from invalid service names.
Now let's set up comprehensive orchestration tracking:
```python
        try:
            # Record orchestration attempt
            orchestration_record = {
                "timestamp": datetime.now(),
                "service": service_name,
                "capability": capability,
                "query": query,
                "parameters": parameters
            }
```
This tracking record captures all the essential information about each service call, creating a detailed audit trail for debugging and performance analysis.
Let's implement the actual service call:
```python
            # Call MCP service
            result = await service_client.call_tool(capability, {
                "query": query,
                **parameters
            })
```
This call combines the query with any additional parameters and invokes the specific capability on the MCP service client.
Now let's handle successful execution:
orchestration_record["result"] = result
orchestration_record["status"] = "success"
self.orchestration_history.append(orchestration_record)
return result
When the service call succeeds, we record the result and status, add the complete record to our history, and return the result to Gemini.
Finally, let's add robust error handling:
```python
        except Exception as e:
            error_result = {"error": str(e)}
            orchestration_record["result"] = error_result
            orchestration_record["status"] = "error"
            self.orchestration_history.append(orchestration_record)
            return error_result
```
This error handling ensures that service failures are captured, logged, and returned in a consistent format that Gemini can use to provide meaningful error messages to users.
Step 17: Implementing Intelligent Service Selection¶
Now let's build the intelligent service selection system that uses Gemini to choose optimal services. We'll start with the method setup and analysis prompt creation:
```python
    async def intelligent_service_selection(self, user_query: str) -> Dict[str, Any]:
        """Use Gemini to intelligently select and orchestrate MCP services"""
        # Analyze query to determine required services
        analysis_prompt = f"""
        Analyze this user query and determine which services would be most helpful:

        Query: "{user_query}"

        Available services and their capabilities:
        {self._format_service_capabilities()}

        Respond with a JSON object indicating:
        1. Which services to use
        2. What specific capabilities to invoke
        3. The order of operations
        4. Any parameters needed
        """
```
This analysis prompt provides Gemini with clear instructions to analyze the user query and create a structured execution plan using available services.
Next, let's get Gemini's service selection analysis (a minimal sketch; the variable name analysis_response is illustrative):
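```python
        # Request a structured plan from the agent
        analysis_response = await self.gemini_agent.generate_response(analysis_prompt)
```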
Gemini analyzes the user query against available service capabilities and returns a JSON plan for executing the optimal service sequence.
Now let's parse and execute the service plan:
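```python
        try:
            # Parse Gemini's JSON plan; the fallback below handles parse failures
            plan = json.loads(analysis_response)
            execution_results = []
```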
We parse the JSON response from Gemini and prepare to collect results from each service execution step.
Let's implement the step-by-step execution:
```python
            # Execute services according to plan
            for step in plan.get("steps", []):
                service_name = step.get("service")
                capability = step.get("capability")
                query = step.get("query", user_query)
                parameters = step.get("parameters", {})

                result = await self._call_mcp_service(
                    service_name, capability, query, parameters
                )

                execution_results.append({
                    "step": step,
                    "result": result
                })
```
This execution loop follows Gemini's plan precisely, calling each service with the specified parameters and collecting all results.
Finally, let's return comprehensive results with fallback handling:
```python
            return {
                "plan": plan,
                "execution_results": execution_results,
                "status": "completed"
            }

        except json.JSONDecodeError:
            # Fallback to direct service calls
            return await self._fallback_service_execution(user_query)
```
The response includes both the original plan and execution results, while the fallback ensures the system continues working even if Gemini returns invalid JSON.
Step 18: Adding Service Capability Formatting¶
Finally, let's implement the service capability formatting for AI analysis:
```python
    def _format_service_capabilities(self) -> str:
        """Format service capabilities for Gemini analysis"""
        formatted = []
        for service_name, capabilities in self.service_capabilities.items():
            caps_str = ", ".join(capabilities)
            formatted.append(f"- {service_name}: {caps_str}")
        return "\n".join(formatted)
```
This formatting method creates a clear description of available services that Gemini can analyze to make intelligent orchestration decisions.
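With the hypothetical docs service from earlier registered, the formatted output would look something like:

```text
- docs: search, summarize
```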
With these 18 steps, we've built a comprehensive advanced Gemini integration system that provides enterprise-grade capabilities including function calling, streaming responses, intelligent MCP service orchestration, and comprehensive monitoring. The modular design allows for easy extension and customization for specific use cases.
Part 2: Production Memory and Context Systems (15 minutes)¶
Step 19: Building Advanced Memory Framework¶
Now let's create a sophisticated memory system that enables persistent conversation context and intelligent information retrieval.
🗂️ File: src/session7/advanced_memory.py - Production memory systems
```python
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import json
import sqlite3
import asyncio
import threading
from enum import Enum
```
This setup provides all the components we need for building a production-grade memory system with persistence, threading support, and comprehensive data handling.
Step 20: Defining Memory Types¶
Let's create a classification system for different types of memories:
```python
class MemoryType(Enum):
    """Types of memory for different use cases"""
    SHORT_TERM = "short_term"    # Current conversation
    LONG_TERM = "long_term"      # Persistent user history
    WORKING = "working"          # Active task context
    EPISODIC = "episodic"        # Specific event memories
    SEMANTIC = "semantic"        # General knowledge
```
This memory type system mimics human memory organization, allowing our agent to manage information appropriately based on its purpose and longevity.
Step 21: Creating Memory Entry Structure¶
Now let's build the data structure for individual memory entries. We'll start with the basic dataclass definition:
```python
@dataclass
class MemoryEntry:
    """Individual memory entry with metadata"""
    content: str
    memory_type: MemoryType
    timestamp: datetime
    importance: float = 0.5  # 0.0 to 1.0
    context: Dict[str, Any] = field(default_factory=dict)
    tags: List[str] = field(default_factory=list)
    embedding: Optional[List[float]] = None
```
This structure captures all essential information about a memory: the content itself, its type, when it was created, how important it is, contextual metadata, categorization tags, and optional embeddings for semantic search.
Next, let's add serialization capability for database storage:
```python
    def to_dict(self) -> Dict[str, Any]:
        return {
            "content": self.content,
            "memory_type": self.memory_type.value,
            "timestamp": self.timestamp.isoformat(),
            "importance": self.importance,
            "context": self.context,
            "tags": self.tags,
            "embedding": self.embedding
        }
```
This method converts the memory entry to a dictionary format suitable for JSON serialization and database storage, handling enum values and datetime formatting.
Finally, let's add deserialization for loading from storage:
```python
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'MemoryEntry':
        return cls(
            content=data["content"],
            memory_type=MemoryType(data["memory_type"]),
            timestamp=datetime.fromisoformat(data["timestamp"]),
            importance=data.get("importance", 0.5),
            context=data.get("context", {}),
            tags=data.get("tags", []),
            embedding=data.get("embedding")
        )
```
This class method reconstructs MemoryEntry objects from stored dictionary data, handling type conversions and providing sensible defaults for optional fields.
Step 22: Building the Advanced Memory Manager¶
Let's create the main memory management class:
```python
class AdvancedConversationMemory:
    """Advanced memory system with multiple memory types and persistence"""

    def __init__(self, user_id: str, db_path: str = "agent_memory.db"):
        self.user_id = user_id
        self.db_path = db_path
        self.memories: Dict[MemoryType, List[MemoryEntry]] = {
            memory_type: [] for memory_type in MemoryType
        }
        self.memory_lock = threading.Lock()

        # Configuration
        self.max_short_term_memories = 50
        self.max_working_memories = 20
        self.importance_threshold = 0.3
        self.retention_days = 90

        # Initialize database
        self._initialize_database()
        self._load_memories()
```

This memory manager provides user-specific memory isolation, configurable limits, and thread-safe operations for concurrent access scenarios.
Step 23: Setting Up Database Persistence¶
Let's implement the SQLite database initialization for persistent memory storage:

```python
    def _initialize_database(self):
        """Initialize SQLite database for memory persistence"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS memories (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id TEXT NOT NULL,
                    content TEXT NOT NULL,
                    memory_type TEXT NOT NULL,
                    timestamp TEXT NOT NULL,
                    importance REAL NOT NULL,
                    context TEXT,
                    tags TEXT,
                    embedding TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_user_type_time
                ON memories(user_id, memory_type, timestamp)
            """)
```
This database schema provides efficient storage with proper indexing for fast retrieval by user, memory type, and timestamp - essential for production performance.
Step 24: Loading Memories from Database¶
Now let's implement the memory loading system:
```python
    def _load_memories(self):
        """Load memories from database"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute("""
                SELECT content, memory_type, timestamp, importance, context, tags, embedding
                FROM memories
                WHERE user_id = ? AND timestamp > ?
                ORDER BY timestamp DESC
            """, (self.user_id,
                  (datetime.now() - timedelta(days=self.retention_days)).isoformat()))

            for row in cursor.fetchall():
                content, memory_type, timestamp, importance, context, tags, embedding = row
                memory_entry = MemoryEntry(
                    content=content,
                    memory_type=MemoryType(memory_type),
                    timestamp=datetime.fromisoformat(timestamp),
                    importance=importance,
                    context=json.loads(context or "{}"),
                    tags=json.loads(tags or "[]"),
                    embedding=json.loads(embedding) if embedding else None
                )
                self.memories[memory_entry.memory_type].append(memory_entry)
```
This loading system efficiently retrieves memories within the retention period, deserializes complex data structures, and organizes them by memory type for quick access.
Step 25: Implementing Memory Addition¶
Let's create the system for adding new memories with automatic management:
```python
    def add_memory(self, content: str, memory_type: MemoryType,
                   importance: float = 0.5, context: Optional[Dict[str, Any]] = None,
                   tags: Optional[List[str]] = None):
        """Add new memory entry with automatic management"""
        memory_entry = MemoryEntry(
            content=content,
            memory_type=memory_type,
            timestamp=datetime.now(),
            importance=importance,
            context=context or {},
            tags=tags or []
        )

        with self.memory_lock:
            # Add to in-memory storage
            self.memories[memory_type].append(memory_entry)

            # Manage memory limits
            self._manage_memory_limits(memory_type)

            # Persist to database
            self._persist_memory(memory_entry)
```
This add_memory method provides thread-safe memory addition with automatic limit management and immediate persistence to prevent data loss.
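A quick usage sketch (the user ID is a placeholder):

```python
# Record a user preference as a high-importance long-term memory
memory = AdvancedConversationMemory(user_id="user-123")
memory.add_memory(
    content="User prefers concise answers with code examples",
    memory_type=MemoryType.LONG_TERM,
    importance=0.9,
    tags=["preference", "style"]
)
```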
Step 26: Building Memory Limit Management¶
Now let's implement intelligent memory limit management:
```python
    def _manage_memory_limits(self, memory_type: MemoryType):
        """Manage memory limits and cleanup old entries"""
        memories = self.memories[memory_type]

        if memory_type == MemoryType.SHORT_TERM:
            if len(memories) > self.max_short_term_memories:
                # Keep most recent and most important
                memories.sort(key=lambda m: (m.importance, m.timestamp), reverse=True)
                self.memories[memory_type] = memories[:self.max_short_term_memories]

        elif memory_type == MemoryType.WORKING:
            if len(memories) > self.max_working_memories:
                # Keep most recent working memories
                memories.sort(key=lambda m: m.timestamp, reverse=True)
                self.memories[memory_type] = memories[:self.max_working_memories]

        # For long-term memories, use importance-based retention
        elif memory_type == MemoryType.LONG_TERM:
            # Remove low-importance old memories
            cutoff_date = datetime.now() - timedelta(days=30)
            self.memories[memory_type] = [
                m for m in memories
                if m.importance >= self.importance_threshold or m.timestamp > cutoff_date
            ]
```
This management system uses different strategies for different memory types: importance-based for short-term, recency-based for working memory, and hybrid approaches for long-term storage.
Step 27: Implementing Memory Persistence¶
Let's add the database persistence functionality:
```python
    def _persist_memory(self, memory_entry: MemoryEntry):
        """Persist memory entry to database"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                INSERT INTO memories (user_id, content, memory_type, timestamp,
                                      importance, context, tags, embedding)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                self.user_id,
                memory_entry.content,
                memory_entry.memory_type.value,
                memory_entry.timestamp.isoformat(),
                memory_entry.importance,
                json.dumps(memory_entry.context),
                json.dumps(memory_entry.tags),
                json.dumps(memory_entry.embedding) if memory_entry.embedding else None
            ))
```
This persistence method ensures all memory entries are immediately saved to the database with proper JSON serialization for complex data types.
Step 28: Building Memory Retrieval System¶
Now let's implement the intelligent memory retrieval system:
```python
    def retrieve_relevant_memories(self, query: str,
                                   memory_types: Optional[List[MemoryType]] = None,
                                   max_memories: int = 10) -> List[MemoryEntry]:
        """Retrieve memories relevant to a query"""
        if memory_types is None:
            memory_types = list(MemoryType)

        relevant_memories = []

        with self.memory_lock:
            for memory_type in memory_types:
                for memory in self.memories[memory_type]:
                    # Simple relevance scoring (could be enhanced with embeddings)
                    relevance_score = self._calculate_relevance(query, memory)
                    if relevance_score > 0.1:  # Minimum relevance threshold
                        relevant_memories.append((memory, relevance_score))

        # Sort by relevance and return top memories
        relevant_memories.sort(key=lambda x: x[1], reverse=True)
        return [memory for memory, _ in relevant_memories[:max_memories]]
```
This retrieval system searches across specified memory types, calculates relevance scores, and returns the most relevant memories in ranked order.
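Continuing the sketch above, retrieval might look like this:

```python
# Fetch the five most relevant long-term memories for a query
hits = memory.retrieve_relevant_memories(
    query="code examples",
    memory_types=[MemoryType.LONG_TERM],
    max_memories=5
)
for entry in hits:
    print(f"{entry.importance:.2f} {entry.content}")
```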
Step 29: Implementing Relevance Calculation¶
Let's build the relevance scoring algorithm step by step. We'll start with the basic setup and text preprocessing:
```python
    def _calculate_relevance(self, query: str, memory: MemoryEntry) -> float:
        """Calculate relevance score between query and memory"""
        query_lower = query.lower()
        content_lower = memory.content.lower()

        # Simple keyword matching (could be enhanced with semantic similarity)
        query_words = set(query_lower.split())
        content_words = set(content_lower.split())

        if not query_words:
            return 0.0
```
This setup normalizes text to lowercase and creates word sets for comparison. The empty query check prevents division by zero errors.
Next, let's calculate the basic keyword overlap:
```python
        # Calculate word overlap
        overlap = len(query_words.intersection(content_words))
        relevance = overlap / len(query_words)
```
This calculates what percentage of query words appear in the memory content, providing a basic relevance score between 0 and 1.
Now let's add importance-based boosting (a minimal sketch; the 0.2 weighting is an assumed, tunable value):
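```python
        # Importance boost - the 0.2 weighting is an assumed, tunable value
        importance_boost = memory.importance * 0.2
```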
More important memories get a relevance boost, ensuring crucial information is prioritized even if keyword matching is moderate.
Let's add recency boosting for recent memories:
```python
        # Recency boost (memories from last 24 hours get boost)
        if datetime.now() - memory.timestamp < timedelta(hours=24):
            recency_boost = 0.1
        else:
            recency_boost = 0.0
```
Recent memories get a small boost since they're likely more relevant to current conversations and context.
Finally, let's combine all factors and cap the score (a minimal implementation consistent with the description below):
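```python
        # Combine keyword relevance with the boosts, capping at 1.0
        return min(relevance + importance_boost + recency_boost, 1.0)
```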
This final calculation combines keyword relevance with importance and recency boosts, capping the result at 1.0 to maintain a consistent scoring scale.
Step 30: Creating Conversation Context System¶
Let's implement the conversation context retrieval for prompt enhancement:
```python
    def get_conversation_context(self, max_turns: int = 10) -> List[Dict[str, Any]]:
        """Get recent conversation context for prompt enhancement"""
        short_term_memories = self.memories[MemoryType.SHORT_TERM]

        # Sort by timestamp and get recent memories
        recent_memories = sorted(
            short_term_memories,
            key=lambda m: m.timestamp,
            reverse=True
        )[:max_turns]

        # Convert to conversation format
        context = []
        for memory in reversed(recent_memories):  # Chronological order
            context.append({
                "timestamp": memory.timestamp.isoformat(),
                "content": memory.content,
                "importance": memory.importance,
                "tags": memory.tags
            })

        return context
```
This context system provides structured conversation history that can be used to enhance prompts with relevant background information.
Step 31: Implementing Memory Consolidation¶
Finally, let's add the memory consolidation system that promotes important memories:
```python
    def consolidate_memories(self):
        """Consolidate and organize memories for better retrieval"""
        with self.memory_lock:
            # Promote important short-term memories to long-term
            short_term_memories = self.memories[MemoryType.SHORT_TERM]

            for memory in short_term_memories[:]:
                if memory.importance >= 0.8:  # High importance threshold
                    # Move to long-term memory
                    memory.memory_type = MemoryType.LONG_TERM
                    self.memories[MemoryType.LONG_TERM].append(memory)
                    self.memories[MemoryType.SHORT_TERM].remove(memory)

                    # Update in database
                    self._update_memory_type(memory)
```
This consolidation system automatically promotes important short-term memories to long-term storage, ensuring valuable information is preserved.
Step 32: Adding Database Update Functionality¶
Let's complete the memory system with database update capability:
```python
    def _update_memory_type(self, memory: MemoryEntry):
        """Update memory type in database"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                UPDATE memories
                SET memory_type = ?
                WHERE user_id = ? AND content = ? AND timestamp = ?
            """, (
                memory.memory_type.value,
                self.user_id,
                memory.content,
                memory.timestamp.isoformat()
            ))
```
This completes our advanced memory system, which provides enterprise-grade conversation memory with persistence, intelligent retrieval, and automatic consolidation capabilities.
Module Summary¶
You've now mastered advanced ADK integration patterns:
✅ Advanced Gemini Integration: Implemented sophisticated model configuration with function calling and streaming
✅ MCP Service Orchestration: Built intelligent multi-service coordination with Gemini-driven selection
✅ Production Memory Systems: Created persistent conversation memory with multiple memory types
✅ Enterprise Context Management: Designed context-aware response generation with relevance scoring
📝 Multiple Choice Test - Module A¶
Test your understanding of advanced ADK integration and enterprise patterns:
Question 1: What advanced features does the enhanced Gemini agent implement for function calling?

A) Basic text generation only
B) Streaming responses with function calling and model configuration flexibility
C) Static responses only
D) Image processing only
Question 2: How does the MCP service orchestrator determine which service to use for a given query?

A) Random selection
B) Gemini AI-driven analysis of query content and service capabilities
C) First available service
D) Manual configuration only
Question 3: What types of memory does the advanced memory system support for conversation persistence?

A) Short-term memory only
B) Multiple memory types including short-term, long-term, working, episodic, and semantic memory
C) File-based storage only
D) No memory persistence
Question 4: How does the memory system determine response relevance?

A) Random scoring
B) Keyword counting only
C) Keyword overlap scoring with importance and recency boosts against a configurable relevance threshold
D) Manual relevance assignment
Question 5: What production features does the advanced ADK integration provide for enterprise deployment?

A) Basic functionality only
B) Persistent memory, context management, error handling, and streaming capabilities
C) Development features only
D) Single-user support
Next Steps¶
- Continue to Module B: Enterprise Agent Systems for production deployment patterns
- Return to Core: Session 7 Main
- Advance to Session 8: Agno Production Ready Agents
🗂️ Source Files for Module A:

- src/session7/advanced_gemini.py - Sophisticated Gemini model integration
- src/session7/advanced_memory.py - Production memory systems
- src/session7/mcp_orchestration.py - Multi-service coordination patterns