# Shallow Checkpointers
Shallow checkpointers store only the most recent checkpoint per thread and namespace, rather than maintaining a full history. This trades off history traversal for significantly reduced storage consumption.
## When to Use Shallow Checkpointers
Use a shallow checkpointer when:

- You only need the current state of a conversation, not its full history.
- Storage efficiency is a priority (e.g., high-volume, short-lived threads).
- Your application does not require time-travel debugging or checkpoint replay.
- You want to minimize the number of Redis keys per thread.
Use the full `RedisSaver` when:

- You need to traverse or replay checkpoint history.
- Your workflow uses `before` checkpoints for interrupt/resume patterns that require referencing prior states.
- You need audit trails of state transitions.
## ShallowRedisSaver

The synchronous shallow checkpointer:
```python
from langgraph.checkpoint.redis.shallow import ShallowRedisSaver

with ShallowRedisSaver.from_conn_string("redis://localhost:6379") as saver:
    saver.setup()
    graph = builder.compile(checkpointer=saver)

    config = {"configurable": {"thread_id": "shallow-thread"}}
    result = graph.invoke(inputs, config)
```
### Import Path
```python
from langgraph.checkpoint.redis.shallow import ShallowRedisSaver
```
### Cache Configuration

`ShallowRedisSaver` includes internal LRU caches for key lookups and channel data. You can configure their sizes:
```python
saver = ShallowRedisSaver(
    redis_url="redis://localhost:6379",
    key_cache_max_size=2000,     # Default: 1000
    channel_cache_max_size=200,  # Default: 100
)
saver.setup()
```
Larger cache sizes reduce Redis round trips at the cost of more memory in your application process.
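The mechanics of a bounded LRU cache like this can be sketched with a plain `OrderedDict`. The names below (`BoundedLRU`, its methods) are illustrative stand-ins, not the library's internals:

```python
from collections import OrderedDict

class BoundedLRU:
    """Minimal LRU cache: hits avoid a Redis round trip, but every
    cached entry costs memory in the application process."""

    def __init__(self, max_size):
        self.max_size = max_size
        self.data = OrderedDict()

    def get(self, key):
        if key in self.data:
            self.data.move_to_end(key)  # mark as most recently used
            return self.data[key]
        return None                     # miss: caller must hit Redis

    def put(self, key, value):
        self.data[key] = value
        self.data.move_to_end(key)
        if len(self.data) > self.max_size:
            self.data.popitem(last=False)  # evict least recently used

cache = BoundedLRU(max_size=2)
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")         # "a" is now most recently used
cache.put("c", 3)      # evicts "b", the least recently used
print(cache.get("b"))  # None -> would require a Redis round trip
print(cache.get("a"))  # 1   -> served from local memory
```

A larger `max_size` keeps more entries resident (fewer misses), which is the round-trips-versus-memory trade-off described above.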
## AsyncShallowRedisSaver

The async variant for use with asyncio:
```python
from langgraph.checkpoint.redis.ashallow import AsyncShallowRedisSaver

async with AsyncShallowRedisSaver.from_conn_string("redis://localhost:6379") as saver:
    await saver.asetup()
    graph = builder.compile(checkpointer=saver)

    config = {"configurable": {"thread_id": "async-shallow-thread"}}
    result = await graph.ainvoke(inputs, config)
```
### Import Path
```python
from langgraph.checkpoint.redis.ashallow import AsyncShallowRedisSaver
```
## Same API Surface

Shallow checkpointers implement the same `BaseCheckpointSaver` interface as the full checkpointers. You can use them as drop-in replacements:
```python
# These all work the same way as with RedisSaver
checkpoint_tuple = saver.get_tuple(config)

for ct in saver.list(config):
    print(ct.metadata)

saver.put_writes(config, writes, task_id)
```
The key difference is that `list()` returns at most one checkpoint per thread-namespace combination.
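The retention difference can be sketched with in-memory stand-ins (these classes are illustrative only; the real savers persist to Redis but expose the same interface):

```python
class FullSaver:
    """Stand-in for a full saver: keeps every checkpoint written."""
    def __init__(self):
        self.history = []

    def put(self, checkpoint):
        self.history.append(checkpoint)

    def list(self):
        return list(self.history)


class ShallowSaver:
    """Stand-in for a shallow saver: keeps only the latest checkpoint."""
    def __init__(self):
        self.latest = None

    def put(self, checkpoint):
        self.latest = checkpoint  # overwrite in place

    def list(self):
        return [self.latest] if self.latest is not None else []


for s in (FullSaver(), ShallowSaver()):
    for step in range(3):
        s.put({"step": step})
    print(type(s).__name__, len(s.list()))
# FullSaver 3
# ShallowSaver 1
```

Because the method signatures match, calling code does not change when you swap retention policies; only how much history `list()` can return differs.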
## Trade-offs

| Feature | `RedisSaver` | `ShallowRedisSaver` |
|---|---|---|
| Checkpoint history | Full history | Latest only |
| Storage per thread | Grows with steps | Constant |
| Time travel | Supported | Not supported |
| `list()` results | All checkpoints | Latest checkpoint |
| Redis key pattern | | |
| Interrupt/resume | Full support | Supported for current state |
## Using with TTL

Shallow checkpointers support TTL configuration, just like the full checkpointers:
```python
saver = ShallowRedisSaver(
    redis_url="redis://localhost:6379",
    ttl={"default_ttl": 60, "refresh_on_read": True},  # TTL in minutes
)
saver.setup()
```
See TTL Configuration for more details.
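How `default_ttl` and `refresh_on_read` interact can be sketched with a manual clock (TTL in minutes, matching the examples here; `TTLStore` is an illustrative stand-in, not how Redis expiry actually works):

```python
class TTLStore:
    """Toy TTL store with a manual clock, for illustrating expiry."""

    def __init__(self, default_ttl, refresh_on_read):
        self.default_ttl = default_ttl       # minutes
        self.refresh_on_read = refresh_on_read
        self.data = {}                       # key -> (value, expires_at)
        self.now = 0.0                       # manual clock, in minutes

    def put(self, key, value):
        self.data[key] = (value, self.now + self.default_ttl)

    def get(self, key):
        entry = self.data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self.now >= expires_at:
            del self.data[key]               # expired
            return None
        if self.refresh_on_read:             # reading extends the TTL
            self.data[key] = (value, self.now + self.default_ttl)
        return value

store = TTLStore(default_ttl=60, refresh_on_read=True)
store.put("ckpt", {"step": 1})
store.now = 45
print(store.get("ckpt") is not None)  # True; read pushes expiry to 105
store.now = 100
print(store.get("ckpt") is not None)  # True, only because of refresh_on_read
store.now = 170
print(store.get("ckpt"))              # None: expired at 160
```

With `refresh_on_read=True`, active threads stay alive indefinitely while idle ones expire; with it off, every checkpoint expires a fixed interval after it was written.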
## Example: High-Volume Chat Application

For a chat application with many concurrent users where only the latest conversation state matters:
```python
from langgraph.checkpoint.redis.shallow import ShallowRedisSaver

with ShallowRedisSaver.from_conn_string(
    "redis://localhost:6379",
    ttl={"default_ttl": 120, "refresh_on_read": True},  # 2-hour TTL
) as saver:
    saver.setup()
    graph = builder.compile(checkpointer=saver)

    # Each user session gets its own thread
    for user_id, message in incoming_messages:
        config = {"configurable": {"thread_id": f"chat:{user_id}"}}
        result = graph.invoke({"messages": [message]}, config)
        send_response(user_id, result)
```
## Next Steps
- Checkpointers – full checkpointer reference
- TTL Configuration – configure automatic expiration
- Stores – add cross-thread persistence with `RedisStore`