Semantic Cache Middleware with LangChain Agents#
This notebook demonstrates how to use SemanticCacheMiddleware with LangChain agents using the standard create_agent pattern.
Key Features#
Semantic matching: Cache similar prompts, not just exact matches
Cost reduction: Avoid redundant LLM calls for similar queries
Latency improvement: Instant responses for cached queries
Tool-aware caching: Smart handling of tool-calling workflows
Responses API support: Transparent handling of both string and block-based content formats
Two API Modes#
OpenAI offers two API modes that produce different content formats:
Default (Chat Completions): AIMessage.content is a plain string
Responses API: AIMessage.content is a list of content blocks with embedded provider IDs
The Responses API is used by Azure OpenAI and when enabling advanced features like reasoning or annotations. The middleware handles both formats transparently, stripping provider-specific IDs from cached content blocks to prevent duplicate ID errors on subsequent LLM calls.
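To make the idea concrete, here is a minimal sketch of ID stripping. The `strip_provider_ids` helper is purely illustrative (the middleware's internal implementation may differ):

```python
def strip_provider_ids(content):
    """Remove provider-specific 'id' fields from Responses API content blocks.

    Illustrative helper only -- not the middleware's actual API.
    String content (Chat Completions mode) passes through unchanged.
    """
    if isinstance(content, str):
        return content
    return [
        {k: v for k, v in block.items() if k != "id"}
        if isinstance(block, dict) else block
        for block in content
    ]

blocks = [{"type": "text", "text": "Hello", "id": "rs_abc123"}]
print(strip_provider_ids(blocks))  # [{'type': 'text', 'text': 'Hello'}]
```

A cached block cleaned this way can safely re-enter a later LLM call, since the provider will no longer see a previously used ID.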
Tool-Aware Caching#
When the LLM uses tools, caching behavior depends on whether tools are deterministic:
Deterministic tools (e.g., calculator): Same input always produces same output. Safe to cache.
Non-deterministic tools (e.g., stock prices): Output changes over time. Don’t cache.
Configure this via deterministic_tools in SemanticCacheConfig:
SemanticCacheConfig(
    deterministic_tools=["calculate", "convert_units"],  # Safe to cache after these
)
Prerequisites#
Redis 8.0+ or Redis Stack (with RedisJSON and RediSearch)
OpenAI API key
Note on Async Usage#
The Redis middleware uses async methods internally. When using it with create_agent, you must call await agent.ainvoke() rather than agent.invoke().
Setup#
Install required packages and set API keys.
%%capture --no-stderr
# When running via docker-compose, the local library is already installed via editable mount.
# Only install from PyPI if not already available.
try:
    import langgraph.middleware.redis
    print("langgraph-checkpoint-redis already installed")
except ImportError:
    %pip install -U langgraph-checkpoint-redis
%pip install -U langchain langchain-openai sentence-transformers
import getpass
import os
def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("OPENAI_API_KEY")
Two-Model Setup and Tools#
We create two model instances to demonstrate both API modes side-by-side:
model_default: Standard Chat Completions — content is a plain string
model_responses_api: Responses API — content is a list of blocks with embedded IDs
We also define helper tools and a response inspection utility.
import ast
import operator as op
import time
import uuid
from langchain_openai import ChatOpenAI
# Default mode: content is a plain string
model_default = ChatOpenAI(model="gpt-4o-mini")
# Responses API mode: content is a list of blocks with embedded IDs
# Used by Azure OpenAI and advanced features (reasoning, annotations)
model_responses_api = ChatOpenAI(model="gpt-4o-mini", use_responses_api=True)
print("Models created:")
print("- model_default: Chat Completions (string content)")
print("- model_responses_api: Responses API (list-of-blocks content)")
Models created:
- model_default: Chat Completions (string content)
- model_responses_api: Responses API (list-of-blocks content)
def format_content(content, max_len=200):
    """Extract readable text from AI message content (handles both API modes)."""
    if isinstance(content, str):
        text = content
    elif isinstance(content, list):
        parts = []
        for block in content:
            if isinstance(block, dict):
                parts.append(block.get("text", ""))
            elif isinstance(block, str):
                parts.append(block)
        text = " ".join(parts)
    else:
        text = str(content)
    if max_len and len(text) > max_len:
        return text[:max_len] + "..."
    return text
def inspect_response(result, label=""):
    """Show the structure and content of an AI response."""
    ai_msg = result["messages"][-1]
    print(f"\n--- {label} ---")
    print(f"Content type: {type(ai_msg.content).__name__}")
    if isinstance(ai_msg.content, list):
        print(f"Number of content blocks: {len(ai_msg.content)}")
        for i, block in enumerate(ai_msg.content):
            if isinstance(block, dict):
                print(f"  Block {i}: type={block.get('type')}, has_id={'id' in block}")
    print(f"Response: {format_content(ai_msg.content)}")
    cached = ai_msg.additional_kwargs.get("cached", False)
    print(f"Cached: {cached}")
from langchain_core.tools import tool
# Safe math evaluator - no arbitrary code execution
SAFE_OPS = {
    ast.Add: op.add, ast.Sub: op.sub, ast.Mult: op.mul,
    ast.Div: op.truediv, ast.Pow: op.pow, ast.USub: op.neg,
}

def _eval_node(node):
    if isinstance(node, ast.Constant):
        return node.value
    elif isinstance(node, ast.BinOp) and type(node.op) in SAFE_OPS:
        return SAFE_OPS[type(node.op)](_eval_node(node.left), _eval_node(node.right))
    elif isinstance(node, ast.UnaryOp) and type(node.op) in SAFE_OPS:
        return SAFE_OPS[type(node.op)](_eval_node(node.operand))
    raise ValueError("Unsupported expression")

def safe_eval(expr: str) -> float:
    return _eval_node(ast.parse(expr, mode="eval").body)
# Define some tools for the agent
@tool
def get_weather(location: str) -> str:
    """Get the current weather for a location."""
    # Simulated weather data
    weather_data = {
        "new york": "72°F, Partly Cloudy",
        "san francisco": "65°F, Foggy",
        "london": "58°F, Rainy",
        "tokyo": "80°F, Sunny",
    }
    location_lower = location.lower()
    for city, weather in weather_data.items():
        if city in location_lower:
            return f"Weather in {location}: {weather}"
    return f"Weather data not available for {location}"

@tool
def calculate(expression: str) -> str:
    """Evaluate a mathematical expression."""
    try:
        result = safe_eval(expression)
        return f"{expression} = {result}"
    except Exception as e:
        return f"Error: {str(e)}"
tools = [get_weather, calculate]
REDIS_URL = os.environ.get("REDIS_URL", "redis://redis:6379")
Understanding the Two API Modes#
Before demonstrating caching, let’s see how the two API modes differ in their response format. This is important because the middleware must handle both formats correctly.
from langchain.agents import create_agent
from langchain_core.messages import HumanMessage
# Create temporary agents (no middleware) to show raw response formats
agent_default_raw = create_agent(model=model_default, tools=tools)
agent_responses_raw = create_agent(model=model_responses_api, tools=tools)
# Same question, two different API modes
question = "What is 2 + 2?"
result_default = await agent_default_raw.ainvoke(
    {"messages": [HumanMessage(content=question)]}
)
inspect_response(result_default, label="Default Mode (Chat Completions)")
result_responses = await agent_responses_raw.ainvoke(
    {"messages": [HumanMessage(content=question)]}
)
inspect_response(result_responses, label="Responses API Mode")
print("\nNotice: Responses API content is a list of blocks, each with an embedded 'id'.")
print("The middleware strips these IDs from cached content to prevent duplicate ID errors.")
--- Default Mode (Chat Completions) ---
Content type: str
Response: The result of \(2 + 2\) is \(4\).
Cached: False
--- Responses API Mode ---
Content type: list
Number of content blocks: 1
Block 0: type=text, has_id=True
Response: \(2 + 2 = 4\).
Cached: False
Notice: Responses API content is a list of blocks, each with an embedded 'id'.
The middleware strips these IDs from cached content to prevent duplicate ID errors.
Semantic Cache with Default Mode#
First, let’s demonstrate semantic caching with the standard Chat Completions API. The cache matches semantically similar prompts, not just exact strings.
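Semantic matching works by embedding the prompt and comparing it against cached entries with a vector distance; a hit requires the distance to fall under `distance_threshold`. A toy sketch of the gating logic, using hand-picked vectors in place of a real vectorizer such as sentence-transformers (the vectors and the `cosine_distance` helper are illustrative, not the middleware's internals):

```python
import math

def cosine_distance(a, b):
    """1 - cosine similarity: 0 for identical directions, up to 2 for opposite."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / (norm_a * norm_b)

DISTANCE_THRESHOLD = 0.15  # matches the config used below; lower = stricter

query_vec  = [0.9, 0.1, 0.0]    # e.g. "What is the capital of France?"
cached_vec = [0.88, 0.12, 0.0]  # e.g. "Tell me France's capital city"
other_vec  = [0.1, 0.9, 0.2]    # e.g. "What is the capital of Germany?"

print(cosine_distance(query_vec, cached_vec) <= DISTANCE_THRESHOLD)  # True  -> cache hit
print(cosine_distance(query_vec, other_vec) <= DISTANCE_THRESHOLD)   # False -> cache miss
```

This is why a paraphrase ("Tell me France's capital city") can hit the cache while a different topic ("What is the capital of Germany?") misses it.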
from langgraph.middleware.redis import SemanticCacheMiddleware, SemanticCacheConfig
# Unique cache name to avoid collisions between runs
cache_name_default = f"demo_semantic_cache_default_{uuid.uuid4().hex[:8]}"
# Create the semantic cache middleware
cache_middleware_default = SemanticCacheMiddleware(
    SemanticCacheConfig(
        redis_url=REDIS_URL,
        name=cache_name_default,
        distance_threshold=0.15,  # Lower = stricter matching
        ttl_seconds=3600,  # Cache entries expire after 1 hour
        cache_final_only=True,  # Only cache final responses (not tool calls)
        deterministic_tools=["calculate"],  # Calculator is deterministic
    )
)
print("SemanticCacheMiddleware created for default mode!")
print(f"- Cache name: {cache_name_default}")
print("- distance_threshold: 0.15 (semantic matching)")
print("- cache_final_only: True (don't cache tool-calling responses)")
print("- deterministic_tools: ['calculate'] (safe to cache after these tools)")
SemanticCacheMiddleware created for default mode!
- Cache name: demo_semantic_cache_default_45361e15
- distance_threshold: 0.15 (semantic matching)
- cache_final_only: True (don't cache tool-calling responses)
- deterministic_tools: ['calculate'] (safe to cache after these tools)
# Create the agent with semantic cache middleware + default model
agent_default = create_agent(
    model=model_default,
    tools=tools,
    middleware=[cache_middleware_default],
)
print("Agent created with SemanticCacheMiddleware (default mode)!")
Agent created with SemanticCacheMiddleware (default mode)!
Cache Hit/Miss Demo#
Let’s make some queries and observe how the cache works. The first query will hit the LLM, while semantically similar queries should hit the cache.
Important: We use await agent.ainvoke() because the middleware is async-first.
# First query - will be a cache miss (hits the LLM)
print("Query 1: 'What is the capital of France?'")
print("="*50)
start = time.time()
result1 = await agent_default.ainvoke({"messages": [HumanMessage(content="What is the capital of France?")]})
elapsed1 = time.time() - start
print(f"Response: {result1['messages'][-1].content[:200]}...")
print(f"Time: {elapsed1:.2f}s (cache miss - LLM call)")
inspect_response(result1, label="Query 1 (cache miss)")
Query 1: 'What is the capital of France?'
==================================================
Warning: You are sending unauthenticated requests to the HF Hub. Please set a HF_TOKEN to enable higher rate limits and faster downloads.
This vectorizer has no async embed method. Falling back to sync.
This vectorizer has no async embed method. Falling back to sync.
Response: The capital of France is Paris....
Time: 4.47s (cache miss - LLM call)
--- Query 1 (cache miss) ---
Content type: str
Response: The capital of France is Paris.
Cached: False
# Second query - semantically similar, should hit cache
print("\nQuery 2: 'Tell me France's capital city'")
print("="*50)
start = time.time()
result2 = await agent_default.ainvoke({"messages": [HumanMessage(content="Tell me France's capital city")]})
elapsed2 = time.time() - start
print(f"Response: {result2['messages'][-1].content[:200]}...")
print(f"Time: {elapsed2:.2f}s (expected: cache hit - much faster!)")
inspect_response(result2, label="Query 2 (cache hit)")
This vectorizer has no async embed method. Falling back to sync.
Query 2: 'Tell me France's capital city'
==================================================
Response: The capital of France is Paris....
Time: 0.10s (expected: cache hit - much faster!)
--- Query 2 (cache hit) ---
Content type: str
Response: The capital of France is Paris.
Cached: True
# Third query - different topic, should be cache miss
print("\nQuery 3: 'What is the capital of Germany?'")
print("="*50)
start = time.time()
result3 = await agent_default.ainvoke({"messages": [HumanMessage(content="What is the capital of Germany?")]})
elapsed3 = time.time() - start
print(f"Response: {result3['messages'][-1].content[:200]}...")
print(f"Time: {elapsed3:.2f}s (cache miss - different topic)")
This vectorizer has no async embed method. Falling back to sync.
Query 3: 'What is the capital of Germany?'
==================================================
This vectorizer has no async embed method. Falling back to sync.
Response: The capital of Germany is Berlin....
Time: 0.80s (cache miss - different topic)
# Summary
print("\n" + "="*50)
print("SUMMARY (Default Mode)")
print("="*50)
print(f"Query 1 (France capital, miss): {elapsed1:.2f}s")
print(f"Query 2 (France capital, hit): {elapsed2:.2f}s")
print(f"Query 3 (Germany capital, miss): {elapsed3:.2f}s")
if elapsed2 < elapsed1 * 0.5:
    print("\nCache hit was significantly faster!")
    print(f"  Speedup: {elapsed1/elapsed2:.1f}x")
==================================================
SUMMARY (Default Mode)
==================================================
Query 1 (France capital, miss): 4.47s
Query 2 (France capital, hit): 0.10s
Query 3 (Germany capital, miss): 0.80s
Cache hit was significantly faster!
Speedup: 44.6x
Semantic Cache with Responses API Mode#
Now let’s demonstrate the same caching behavior with the Responses API mode.
When use_responses_api=True, the LLM returns content as a list of blocks, each with
an embedded provider ID (e.g., rs_...). The middleware automatically strips these IDs
from cached content to prevent duplicate ID errors.
This is critical for Azure OpenAI customers who use ChatOpenAI(use_responses_api=True).
# Create a separate cache for Responses API mode
cache_name_responses = f"demo_semantic_cache_responses_{uuid.uuid4().hex[:8]}"
cache_middleware_responses = SemanticCacheMiddleware(
    SemanticCacheConfig(
        redis_url=REDIS_URL,
        name=cache_name_responses,
        distance_threshold=0.15,
        ttl_seconds=3600,
        cache_final_only=True,
        deterministic_tools=["calculate"],
    )
)
agent_responses = create_agent(
    model=model_responses_api,
    tools=tools,
    middleware=[cache_middleware_responses],
)
print("Agent created with SemanticCacheMiddleware (Responses API mode)!")
Agent created with SemanticCacheMiddleware (Responses API mode)!
# Cache miss - first call with Responses API
print("Query 1 (Responses API): 'What is the capital of Japan?'")
print("="*50)
start = time.time()
result_resp_1 = await agent_responses.ainvoke(
    {"messages": [HumanMessage(content="What is the capital of Japan?")]}
)
elapsed_resp_1 = time.time() - start
ai_msg_1 = result_resp_1["messages"][-1]
print(f"Time: {elapsed_resp_1:.2f}s (cache miss - LLM call)")
inspect_response(result_resp_1, label="Responses API - Cache Miss")
Query 1 (Responses API): 'What is the capital of Japan?'
==================================================
This vectorizer has no async embed method. Falling back to sync.
This vectorizer has no async embed method. Falling back to sync.
Time: 3.55s (cache miss - LLM call)
--- Responses API - Cache Miss ---
Content type: list
Number of content blocks: 1
Block 0: type=text, has_id=True
Response: The capital of Japan is Tokyo.
Cached: False
# Cache hit - semantically similar query
print("\nQuery 2 (Responses API): 'Tell me Japan's capital city'")
print("="*50)
start = time.time()
result_resp_2 = await agent_responses.ainvoke(
    {"messages": [HumanMessage(content="Tell me Japan's capital city")]}
)
elapsed_resp_2 = time.time() - start
ai_msg_2 = result_resp_2["messages"][-1]
print(f"Time: {elapsed_resp_2:.2f}s (expected: cache hit)")
inspect_response(result_resp_2, label="Responses API - Cache Hit")
This vectorizer has no async embed method. Falling back to sync.
Query 2 (Responses API): 'Tell me Japan's capital city'
==================================================
Time: 0.18s (expected: cache hit)
--- Responses API - Cache Hit ---
Content type: list
Number of content blocks: 1
Block 0: type=text, has_id=False
Response: The capital of Japan is Tokyo.
Cached: True
# Verify cached Responses API message has no provider IDs
print("Verifying cached content blocks are clean...")
print("="*50)
ai_msg = result_resp_2["messages"][-1]
if isinstance(ai_msg.content, list):
    for block in ai_msg.content:
        if isinstance(block, dict):
            assert "id" not in block, f"Cached block has provider ID: {block}"
    print("Cached content blocks are clean -- no provider IDs!")
    print(f"Number of blocks: {len(ai_msg.content)}")
    for i, block in enumerate(ai_msg.content):
        if isinstance(block, dict):
            print(f"  Block {i}: type={block.get('type')}, keys={list(block.keys())}")
else:
    print(f"Content is a plain string (length={len(ai_msg.content)})")
    print("This is expected if the cache stored it as a string.")
print(f"\nCached: {ai_msg.additional_kwargs.get('cached', False)}")
print("\nThe middleware strips 'id' fields from content blocks during cache storage.")
print("This prevents duplicate ID errors when cached responses re-enter the LLM.")
Verifying cached content blocks are clean...
==================================================
Cached content blocks are clean -- no provider IDs!
Number of blocks: 1
Block 0: type=text, keys=['type', 'text', 'annotations']
Cached: True
The middleware strips 'id' fields from content blocks during cache storage.
This prevents duplicate ID errors when cached responses re-enter the LLM.
Multi-Turn Conversation#
When using the Responses API with a checkpointer, state accumulates across turns. Each turn’s AIMessage may contain content blocks with provider IDs. The middleware ensures that cached messages don’t introduce duplicate IDs into the conversation.
from langgraph.checkpoint.redis.aio import AsyncRedisSaver
# Create checkpointer for multi-turn state
multi_turn_checkpointer = AsyncRedisSaver(redis_url=REDIS_URL)
await multi_turn_checkpointer.asetup()
cache_name_multi = f"demo_multi_turn_cache_{uuid.uuid4().hex[:8]}"
cache_middleware_multi = SemanticCacheMiddleware(
    SemanticCacheConfig(
        redis_url=REDIS_URL,
        name=cache_name_multi,
        distance_threshold=0.15,
        ttl_seconds=3600,
        cache_final_only=True,
    )
)
agent_multi = create_agent(
    model=model_responses_api,
    tools=tools,
    checkpointer=multi_turn_checkpointer,
    middleware=[cache_middleware_multi],
)
thread_id = f"multi_turn_{uuid.uuid4().hex[:8]}"
config = {"configurable": {"thread_id": thread_id}}
# Turn 1: Original question
print("Turn 1: 'What is the tallest mountain in the world?'")
print("="*50)
result_t1 = await agent_multi.ainvoke(
    {"messages": [HumanMessage(content="What is the tallest mountain in the world?")]},
    config=config,
)
inspect_response(result_t1, label="Turn 1")
# Turn 2: Follow-up question (different topic, no cache hit expected)
print("\nTurn 2: 'How tall is it in feet?'")
print("="*50)
result_t2 = await agent_multi.ainvoke(
    {"messages": [HumanMessage(content="How tall is it in feet?")]},
    config=config,
)
inspect_response(result_t2, label="Turn 2")
# Turn 3: New thread asking same question as Turn 1 (should hit cache)
thread_id_2 = f"multi_turn_{uuid.uuid4().hex[:8]}"
config_2 = {"configurable": {"thread_id": thread_id_2}}
print("\nTurn 3 (new thread): 'What is the tallest mountain on Earth?'")
print("="*50)
result_t3 = await agent_multi.ainvoke(
    {"messages": [HumanMessage(content="What is the tallest mountain on Earth?")]},
    config=config_2,
)
inspect_response(result_t3, label="Turn 3 (cache hit expected)")
# Verify no duplicate IDs across all turns
all_ids = set()
for label, result in [("Turn 1", result_t1), ("Turn 2", result_t2), ("Turn 3", result_t3)]:
    ai_msg = result["messages"][-1]
    if isinstance(ai_msg.content, list):
        for block in ai_msg.content:
            if isinstance(block, dict) and "id" in block:
                block_id = block["id"]
                assert block_id not in all_ids, f"Duplicate ID found in {label}: {block_id}"
                all_ids.add(block_id)
print("\nNo duplicate content block IDs across turns!")
Turn 1: 'What is the tallest mountain in the world?'
==================================================
This vectorizer has no async embed method. Falling back to sync.
This vectorizer has no async embed method. Falling back to sync.
This vectorizer has no async embed method. Falling back to sync.
--- Turn 1 ---
Content type: list
Number of content blocks: 1
Block 0: type=text, has_id=True
Response: The tallest mountain in the world is Mount Everest. It stands at an elevation of 8,848.86 meters (29,031.7 feet) above sea level. Mount Everest is located in the Himalayas, on the border between Nepal...
Cached: False
Turn 2: 'How tall is it in feet?'
==================================================
This vectorizer has no async embed method. Falling back to sync.
This vectorizer has no async embed method. Falling back to sync.
--- Turn 2 ---
Content type: list
Number of content blocks: 1
Block 0: type=text, has_id=True
Response: Mount Everest is 29,031.7 feet tall.
Cached: False
Turn 3 (new thread): 'What is the tallest mountain on Earth?'
==================================================
--- Turn 3 (cache hit expected) ---
Content type: list
Number of content blocks: 1
Block 0: type=text, has_id=False
Response: The tallest mountain in the world is Mount Everest. It stands at an elevation of 8,848.86 meters (29,031.7 feet) above sea level. Mount Everest is located in the Himalayas, on the border between Nepal...
Cached: True
No duplicate content block IDs across turns!
Tool-Aware Caching#
By default, cache_final_only=True means only final responses (without tool calls) are cached.
This prevents caching intermediate tool-calling responses.
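The distinction between intermediate and final responses can be sketched as a simple predicate: an AIMessage that still carries pending tool calls is an intermediate step and is skipped; one without tool calls is a final answer and is cacheable. The `should_cache` helper below is illustrative only, not the middleware's actual API:

```python
def should_cache(tool_calls, cache_final_only=True):
    """Decide whether a response is cacheable under cache_final_only semantics.

    tool_calls: the list from AIMessage.tool_calls (empty for a final answer).
    Illustrative sketch -- the middleware's internal logic may differ.
    """
    if cache_final_only and tool_calls:
        return False  # intermediate tool-calling step: do not cache
    return True       # final answer (or cache_final_only disabled): cacheable

# The tool-calling step for "What's the weather in Tokyo?" is skipped...
print(should_cache([{"name": "get_weather", "args": {"location": "Tokyo"}}]))  # False
# ...while the final answer after the tool runs is cached.
print(should_cache([]))  # True
```

With `deterministic_tools=["calculate"]`, responses produced after a deterministic tool ran can also be cached safely, since rerunning the tool would yield the same output.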
# Query that requires a tool call
print("Query with tool: 'What's the weather in Tokyo?'")
print("="*50)
start = time.time()
result = await agent_default.ainvoke({"messages": [HumanMessage(content="What's the weather in Tokyo?")]})
elapsed = time.time() - start
print(f"Response: {result['messages'][-1].content}")
print(f"Time: {elapsed:.2f}s")
print("\nNote: The final response (after tool execution) is cached, not the tool-calling step.")
This vectorizer has no async embed method. Falling back to sync.
Query with tool: 'What's the weather in Tokyo?'
==================================================
This vectorizer has no async embed method. Falling back to sync.
Response: The weather in Tokyo is currently 80°F and sunny.
Time: 1.65s
Note: The final response (after tool execution) is cached, not the tool-calling step.
Cleanup#
# Close all middleware and connections
await cache_middleware_default.aclose()
await cache_middleware_responses.aclose()
await cache_middleware_multi.aclose()
try:
    await multi_turn_checkpointer.aclose()
except Exception:
    pass
print("Middleware closed.")
print("Demo complete!")
Middleware closed.
Demo complete!