RedisSaver#
Synchronous checkpoint saver backed by Redis.
- class AsyncRedisSaver(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')[source]#
Bases:
BaseRedisSaver[Redis|RedisCluster,AsyncSearchIndex]Async Redis implementation for checkpoint saver.
Supports standard Redis URLs (redis://), SSL (rediss://), and Sentinel URLs (redis+sentinel://host:26379/service_name/db).
Initialize Redis-backed checkpoint saver.
- Parameters:
redis_url (Optional[str]) – Redis connection URL
redis_client (Optional[Union[AsyncRedis, AsyncRedisCluster]]) – Redis client instance to use (alternative to redis_url)
connection_args (Optional[Dict[str, Any]]) – Additional arguments for Redis connection
ttl (Optional[Dict[str, Any]]) – Optional TTL configuration dict with optional keys: - default_ttl: TTL in minutes for all checkpoint keys - refresh_on_read: Whether to refresh TTL on reads
checkpoint_prefix (str) – Prefix for checkpoint keys (default: “checkpoint”)
checkpoint_write_prefix (str) – Prefix for checkpoint write keys (default: “checkpoint_write”)
- classmethod from_conn_string(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')[source]#
- Parameters:
redis_url (str | None)
redis_client (Redis | RedisCluster | None)
connection_args (Dict[str, Any] | None)
ttl (Dict[str, Any] | None)
checkpoint_prefix (str)
checkpoint_write_prefix (str)
- Return type:
AsyncIterator[AsyncRedisSaver]
- async adelete_thread(thread_id)[source]#
Delete all checkpoints and writes associated with a specific thread ID.
- Parameters:
thread_id (str) – The thread ID whose checkpoints should be deleted.
- Return type:
None
- async aget_channel_values(thread_id, checkpoint_ns='', checkpoint_id='', channel_versions=None)[source]#
Retrieve channel_values using efficient FT.SEARCH with checkpoint_id.
- Parameters:
thread_id (str)
checkpoint_ns (str)
checkpoint_id (str)
channel_versions (Dict[str, Any] | None)
- Return type:
Dict[str, Any]
- async aget_tuple(config)[source]#
Get a checkpoint tuple from Redis asynchronously.
- Parameters:
config (RunnableConfig)
- Return type:
CheckpointTuple | None
- async alist(config, *, filter=None, before=None, limit=None)[source]#
List checkpoints from Redis asynchronously.
- Parameters:
config (RunnableConfig | None)
filter (dict[str, Any] | None)
before (RunnableConfig | None)
limit (int | None)
- Return type:
AsyncIterator[CheckpointTuple]
- async aprune(thread_ids, *, strategy='keep_latest', keep_last=None, max_results=10000)[source]#
Prune old checkpoints for the given threads per namespace.
Retains the most-recent checkpoints per checkpoint namespace and removes the rest, along with their associated write keys and key-registry sorted sets.
Each namespace (root
""and any subgraph namespaces) is treated as an independent checkpoint chain. Channel values are stored inline within each checkpoint document, so they are automatically removed when the checkpoint document is deleted.- Parameters:
thread_ids (Sequence[str]) – Thread IDs whose old checkpoints should be pruned.
strategy (str) – Pruning strategy.
"keep_latest"retains only the most recent checkpoint per namespace (default)."delete"removes all checkpoints for the thread.keep_last (int | None) – Optional override — number of recent checkpoints to retain per namespace. When provided, takes precedence over
strategy. Usekeep_last=0to remove all checkpoints.max_results (int) – Maximum number of checkpoints fetched from the index per thread in a single query. Defaults to 10 000.
- Return type:
None
- async aput(config, checkpoint, metadata, new_versions, stream_mode='values')[source]#
Store a checkpoint to Redis with proper transaction handling.
This method ensures that all Redis operations are performed atomically using Redis transactions. In case of interruption (asyncio.CancelledError), the transaction will be aborted, ensuring consistency.
- Parameters:
config (RunnableConfig) – The config to associate with the checkpoint
checkpoint (Checkpoint) – The checkpoint data to store
metadata (CheckpointMetadata) – Additional metadata to save with the checkpoint
new_versions (dict[str, str | int | float]) – New channel versions as of this write
stream_mode (str) – The streaming mode being used (values, updates, etc.)
- Returns:
Updated configuration after storing the checkpoint
- Raises:
asyncio.CancelledError – If the operation is cancelled/interrupted
- Return type:
RunnableConfig
- async aput_writes(config, writes, task_id, task_path='')[source]#
Store intermediate writes linked to a checkpoint using Redis JSON.
This method uses Redis pipeline without transaction to avoid lock contention during parallel test execution.
- Parameters:
config (RunnableConfig) – Configuration of the related checkpoint.
writes (List[Tuple[str, Any]]) – List of writes to store.
task_id (str) – Identifier for the task creating the writes.
task_path (str) – Path of the task creating the writes.
- Raises:
asyncio.CancelledError – If the operation is cancelled/interrupted
- Return type:
None
- configure_client(redis_url=None, redis_client=None, connection_args=None)[source]#
Configure the Redis client.
Supports standard Redis URLs (redis://), SSL (rediss://), and Sentinel URLs (redis+sentinel://host:26379/service_name/db).
- Parameters:
redis_url (str | None)
redis_client (Redis | RedisCluster | None)
connection_args (Dict[str, Any] | None)
- Return type:
None
- get_channel_values(thread_id, checkpoint_ns='', checkpoint_id='')[source]#
Retrieve channel_values using efficient FT.SEARCH with checkpoint_id (sync wrapper).
- Parameters:
thread_id (str)
checkpoint_ns (str)
checkpoint_id (str)
- Return type:
Dict[str, Any]
- get_tuple(config)[source]#
Get a checkpoint tuple from Redis.
- Parameters:
config (RunnableConfig) – The config to use for retrieving the checkpoint.
- Returns:
The retrieved checkpoint tuple, or None if no matching checkpoint was found.
- Return type:
Optional[CheckpointTuple]
- Raises:
asyncio.InvalidStateError – If called from the wrong thread/event loop
- put(config, checkpoint, metadata, new_versions)[source]#
Store a checkpoint to Redis.
- Parameters:
config (RunnableConfig) – The config to associate with the checkpoint.
checkpoint (Checkpoint) – The checkpoint to save.
metadata (CheckpointMetadata) – Additional metadata to save with the checkpoint.
new_versions (ChannelVersions) – New channel versions as of this write.
- Returns:
Updated configuration after storing the checkpoint.
- Return type:
RunnableConfig
- Raises:
asyncio.InvalidStateError – If called from the wrong thread/event loop
- put_writes(config, writes, task_id, task_path='')[source]#
Synchronous wrapper for aput_writes.
- Parameters:
config (RunnableConfig) – Configuration of the related checkpoint.
writes (List[Tuple[str, Any]]) – List of writes to store.
task_id (str) – Identifier for the task creating the writes.
task_path (str) – Path of the task creating the writes.
- Return type:
None
- async setup()[source]#
Set up the checkpoint saver asynchronously.
This method creates the necessary indices in Redis and detects cluster mode. It MUST be called before using the checkpointer.
This async method follows the canonical pattern used by PostgreSQL and SQLite checkpointers in the LangGraph ecosystem. The type ignore is necessary because the base class defines a sync setup() method, but async checkpointers require an async setup() method to properly handle coroutines.
Usage: await checkpointer.setup()
- Return type:
None
- checkpoint_writes_index: AsyncSearchIndex#
- checkpoints_index: AsyncSearchIndex#
- cluster_mode: bool | None = None#
- class AsyncShallowRedisSaver(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')[source]#
Bases:
BaseRedisSaver[Redis,AsyncSearchIndex]Async Redis implementation that only stores the most recent checkpoint.
Supports standard Redis URLs (redis://), SSL (rediss://), and Sentinel URLs (redis+sentinel://host:26379/service_name/db).
Initialize Redis-backed checkpoint saver.
- Parameters:
redis_url (Optional[str]) – Redis connection URL
redis_client (Optional[AsyncRedis]) – Redis client instance to use (alternative to redis_url)
connection_args (Optional[dict[str, Any]]) – Additional arguments for Redis connection
ttl (Optional[dict[str, Any]]) – Optional TTL configuration dict with optional keys: - default_ttl: TTL in minutes for all checkpoint keys - refresh_on_read: Whether to refresh TTL on reads
checkpoint_prefix (str) – Prefix for checkpoint keys (default: “checkpoint”)
checkpoint_write_prefix (str) – Prefix for checkpoint write keys (default: “checkpoint_write”)
- classmethod from_conn_string(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')[source]#
Create a new AsyncShallowRedisSaver instance.
- Parameters:
redis_url (str | None)
redis_client (Redis | None)
connection_args (dict[str, Any] | None)
ttl (dict[str, Any] | None)
checkpoint_prefix (str)
checkpoint_write_prefix (str)
- Return type:
AsyncIterator[AsyncShallowRedisSaver]
- async adelete_thread(thread_id)[source]#
Delete checkpoint and writes associated with a specific thread ID.
- Parameters:
thread_id (str) – The thread ID which checkpoint should be deleted.
- Return type:
None
- async aget_channel_values(thread_id, checkpoint_ns, checkpoint_id, channel_versions=None)[source]#
Retrieve channel_values dictionary from inline checkpoint data.
- Parameters:
thread_id (str)
checkpoint_ns (str)
checkpoint_id (str)
channel_versions (Dict[str, Any] | None)
- Return type:
dict[str, Any]
- async aget_tuple(config)[source]#
Retrieve a checkpoint tuple from Redis asynchronously.
- Parameters:
config (RunnableConfig)
- Return type:
CheckpointTuple | None
- async alist(config, *, filter=None, before=None, limit=None)[source]#
List checkpoints from Redis asynchronously.
- Parameters:
config (RunnableConfig | None)
filter (Dict[str, Any] | None)
before (RunnableConfig | None)
limit (int | None)
- Return type:
AsyncIterator[CheckpointTuple]
- async aprune(thread_ids, *, strategy='keep_latest', keep_last=None)[source]#
Prune checkpoints for the given threads.
AsyncShallowRedisSaverstores at most one checkpoint per namespace by design, sostrategy="keep_latest"(orkeep_last >= 1) is always a no-op.strategy="delete"(orkeep_last=0) removes all checkpoints for each thread (equivalent toadelete_thread).- Parameters:
thread_ids (Sequence[str]) – Thread IDs to prune.
strategy (str) – Pruning strategy.
"keep_latest"is a no-op for shallow savers (default)."delete"removes all.keep_last (int | None) – Optional override. Any value >= 1 is a no-op. Pass
0to delete all.
- Return type:
None
- async aput(config, checkpoint, metadata, new_versions)[source]#
Store checkpoint with INLINE channel values
Stores all channel values directly in main checkpoint JSON
- Parameters:
config (RunnableConfig) – The config to associate with the checkpoint
checkpoint (Checkpoint) – The checkpoint data to store
metadata (CheckpointMetadata) – Additional metadata to save with the checkpoint
new_versions (dict[str, str | int | float]) – New channel versions as of this write
- Returns:
Updated configuration after storing the checkpoint
- Raises:
asyncio.CancelledError – If the operation is cancelled/interrupted
- Return type:
RunnableConfig
- async aput_writes(config, writes, task_id, task_path='')[source]#
Store intermediate writes for the latest checkpoint and clean up old writes with transaction handling.
This method uses Redis pipeline with transaction=True to ensure atomicity of all write operations. In case of interruption, all operations will be aborted.
- Parameters:
config (RunnableConfig) – Configuration of the related checkpoint.
writes (List[Tuple[str, Any]]) – List of writes to store.
task_id (str) – Identifier for the task creating the writes.
task_path (str) – Path of the task creating the writes.
- Raises:
asyncio.CancelledError – If the operation is cancelled/interrupted
- Return type:
None
- configure_client(redis_url=None, redis_client=None, connection_args=None)[source]#
Configure the Redis client.
Supports standard Redis URLs (redis://), SSL (rediss://), and Sentinel URLs (redis+sentinel://host:26379/service_name/db).
- Parameters:
redis_url (str | None)
redis_client (Redis | None)
connection_args (dict[str, Any] | None)
- Return type:
None
- get_channel_values(thread_id, checkpoint_ns, checkpoint_id, channel_versions=None)[source]#
Retrieve channel_values dictionary with properly constructed message objects (sync wrapper).
- Parameters:
thread_id (str)
checkpoint_ns (str)
checkpoint_id (str)
channel_versions (Dict[str, Any] | None)
- Return type:
dict[str, Any]
- get_tuple(config)[source]#
Retrieve a checkpoint tuple from Redis synchronously.
- Parameters:
config (RunnableConfig)
- Return type:
CheckpointTuple | None
- put(config, checkpoint, metadata, new_versions)[source]#
Store only the latest checkpoint synchronously.
- Parameters:
config (RunnableConfig)
checkpoint (Checkpoint)
metadata (CheckpointMetadata)
new_versions (dict[str, str | int | float])
- Return type:
RunnableConfig
- put_writes(config, writes, task_id, task_path='')[source]#
Store intermediate writes synchronously.
- Parameters:
config (RunnableConfig)
writes (Sequence[Tuple[str, Any]])
task_id (str)
task_path (str)
- Return type:
None
- async setup()[source]#
Set up the checkpoint saver asynchronously.
This method creates the necessary indices in Redis. It MUST be called before using the checkpointer.
This async method follows the canonical pattern used by other async checkpointers in the LangGraph ecosystem. The type ignore is necessary because the base class defines a sync setup() method, but async checkpointers require an async setup() method to properly handle coroutines.
- Return type:
None
- checkpoint_writes_index: AsyncSearchIndex#
- checkpoints_index: AsyncSearchIndex#
- class BaseRedisSaver(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')[source]#
Bases:
BaseCheckpointSaver[str],Generic[RedisClientType,IndexType]Base Redis implementation for checkpoint saving.
Uses Redis JSON for storing checkpoints and related data, with RediSearch for querying.
Initialize Redis-backed checkpoint saver.
- Parameters:
redis_url (str | None) – Redis connection URL
redis_client (RedisClientType | None) – Redis client instance to use (alternative to redis_url)
connection_args (Dict[str, Any] | None) – Additional arguments for Redis connection
ttl (Dict[str, Any] | None) – Optional TTL configuration dict with optional keys: - default_ttl: TTL in minutes for all checkpoint keys - refresh_on_read: Whether to refresh TTL on reads
checkpoint_prefix (str) – Prefix for checkpoint keys (default: “checkpoint”)
checkpoint_write_prefix (str) – Prefix for checkpoint write keys (default: “checkpoint_write”)
- async aset_client_info()[source]#
Set client info for Redis monitoring asynchronously.
- Return type:
None
- abstractmethod configure_client(redis_url=None, redis_client=None, connection_args=None)[source]#
Configure the Redis client.
- Parameters:
redis_url (str | None)
redis_client (RedisClientType | None)
connection_args (Dict[str, Any] | None)
- Return type:
None
- abstractmethod create_indexes()[source]#
Create appropriate SearchIndex instances.
- Return type:
None
- get_next_version(current, channel)[source]#
Generate next version number.
- Parameters:
current (str | None)
channel (ChannelProtocol[Any, Any, Any])
- Return type:
str
- put_writes(config, writes, task_id, task_path='')[source]#
Store intermediate writes linked to a checkpoint.
- Parameters:
config (RunnableConfig) – Configuration of the related checkpoint.
writes (Sequence[tuple[str, Any]]) – List of writes to store, each as (channel, value) pair.
task_id (str) – Identifier for the task creating the writes.
task_path (str) – Optional path info for the task.
- Return type:
None
- checkpoint_writes_index: IndexType#
- checkpoints_index: IndexType#
- property checkpoints_schema: Dict[str, Any]#
Schema for the checkpoints index.
- property writes_schema: Dict[str, Any]#
Schema for the checkpoint writes index.
- class MessageExporter(redis_saver, recipe=None)[source]#
Bases:
objectExport messages from Redis checkpoints.
- Parameters:
redis_saver (Any)
recipe (MessageRecipe | None)
- class MessageRecipe(*args, **kwargs)[source]#
Bases:
ProtocolProtocol for message extraction recipes.
Implement this interface to support custom message formats.
- class RedisSaver(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')[source]#
Bases:
BaseRedisSaver[Redis|RedisCluster,SearchIndex]Standard Redis implementation for checkpoint saving.
Supports standard Redis URLs (redis://), SSL (rediss://), and Sentinel URLs (redis+sentinel://host:26379/service_name/db).
Initialize Redis-backed checkpoint saver.
- Parameters:
redis_url (Optional[str]) – Redis connection URL
redis_client (Optional[Union[Redis, RedisCluster]]) – Redis client instance to use (alternative to redis_url)
connection_args (Optional[Dict[str, Any]]) – Additional arguments for Redis connection
ttl (Optional[Dict[str, Any]]) – Optional TTL configuration dict with optional keys: - default_ttl: TTL in minutes for all checkpoint keys - refresh_on_read: Whether to refresh TTL on reads
checkpoint_prefix (str) – Prefix for checkpoint keys (default: “checkpoint”)
checkpoint_write_prefix (str) – Prefix for checkpoint write keys (default: “checkpoint_write”)
- classmethod from_conn_string(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')[source]#
Create a new RedisSaver instance.
- Parameters:
redis_url (str | None)
redis_client (Redis | RedisCluster | None)
connection_args (Dict[str, Any] | None)
ttl (Dict[str, Any] | None)
checkpoint_prefix (str)
checkpoint_write_prefix (str)
- Return type:
Iterator[RedisSaver]
- configure_client(redis_url=None, redis_client=None, connection_args=None)[source]#
Configure the Redis client.
Supports standard Redis URLs (redis://), SSL (rediss://), and Sentinel URLs (redis+sentinel://host:26379/service_name/db).
- Parameters:
redis_url (str | None)
redis_client (Redis | RedisCluster | None)
connection_args (Dict[str, Any] | None)
- Return type:
None
- delete_thread(thread_id)[source]#
Delete all checkpoints and writes associated with a specific thread ID.
- Parameters:
thread_id (str) – The thread ID whose checkpoints should be deleted.
- Return type:
None
- get_channel_values(thread_id, checkpoint_ns='', checkpoint_id='', channel_versions=None)[source]#
Retrieve channel_values using efficient FT.SEARCH with checkpoint_id.
- Parameters:
thread_id (str)
checkpoint_ns (str)
checkpoint_id (str)
channel_versions (Dict[str, str] | None)
- Return type:
Dict[str, Any]
- get_tuple(config)[source]#
Get a checkpoint tuple from Redis.
- Parameters:
config (RunnableConfig) – The config to use for retrieving the checkpoint.
- Returns:
The retrieved checkpoint tuple, or None if no matching checkpoint was found.
- Return type:
Optional[CheckpointTuple]
- list(config, *, filter=None, before=None, limit=None)[source]#
List checkpoints from Redis.
- Parameters:
config (RunnableConfig | None)
filter (dict[str, Any] | None)
before (RunnableConfig | None)
limit (int | None)
- Return type:
Iterator[CheckpointTuple]
- prune(thread_ids, *, strategy='keep_latest', keep_last=None, max_results=10000)[source]#
Prune old checkpoints for the given threads per namespace.
Retains the most-recent checkpoints per checkpoint namespace and removes the rest, along with their associated write keys and key-registry sorted sets.
Each namespace (root
""and any subgraph namespaces) is treated as an independent checkpoint chain. Channel values are stored inline within each checkpoint document, so they are automatically removed when the checkpoint document is deleted.- Parameters:
thread_ids (Sequence[str]) – Thread IDs whose old checkpoints should be pruned.
strategy (str) – Pruning strategy.
"keep_latest"retains only the most recent checkpoint per namespace (default)."delete"removes all checkpoints for the thread.keep_last (int | None) – Optional override — number of recent checkpoints to retain per namespace. When provided, takes precedence over
strategy. Usekeep_last=0to remove all checkpoints.max_results (int) – Maximum number of checkpoints fetched from the index per thread in a single query. Defaults to 10 000.
- Return type:
None
- put(config, checkpoint, metadata, new_versions)[source]#
Store a checkpoint to Redis with inline channel value storage.
- Parameters:
config (RunnableConfig)
checkpoint (Checkpoint)
metadata (CheckpointMetadata)
new_versions (dict[str, str | int | float])
- Return type:
RunnableConfig
- put_writes(config, writes, task_id, task_path='')[source]#
Store intermediate writes linked to a checkpoint with integrated key registry.
- Parameters:
config (RunnableConfig)
writes (Sequence[tuple[str, Any]])
task_id (str)
task_path (str)
- Return type:
None
- cluster_mode: bool | None = None#
- class ShallowRedisSaver(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, key_cache_max_size=None, channel_cache_max_size=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')[source]#
Bases:
BaseRedisSaver[Redis,SearchIndex]Redis implementation that only stores the most recent checkpoint.
Supports standard Redis URLs (redis://), SSL (rediss://), and Sentinel URLs (redis+sentinel://host:26379/service_name/db).
Initialize Redis-backed checkpoint saver.
- Parameters:
redis_url (Optional[str]) – Redis connection URL
redis_client (Optional[Redis]) – Redis client instance to use (alternative to redis_url)
connection_args (Optional[dict[str, Any]]) – Additional arguments for Redis connection
ttl (Optional[dict[str, Any]]) – Optional TTL configuration dict with optional keys: - default_ttl: TTL in minutes for all checkpoint keys - refresh_on_read: Whether to refresh TTL on reads
checkpoint_prefix (str) – Prefix for checkpoint keys (default: “checkpoint”)
checkpoint_write_prefix (str) – Prefix for checkpoint write keys (default: “checkpoint_write”)
key_cache_max_size (Optional[int])
channel_cache_max_size (Optional[int])
- classmethod from_conn_string(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, key_cache_max_size=None, channel_cache_max_size=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')[source]#
Create a new ShallowRedisSaver instance.
- Parameters:
redis_url (str | None)
redis_client (Redis | None)
connection_args (dict[str, Any] | None)
ttl (dict[str, Any] | None)
key_cache_max_size (int | None)
channel_cache_max_size (int | None)
checkpoint_prefix (str)
checkpoint_write_prefix (str)
- Return type:
Iterator[ShallowRedisSaver]
- configure_client(redis_url=None, redis_client=None, connection_args=None)[source]#
Configure the Redis client.
- Parameters:
redis_url (str | None)
redis_client (Redis | None)
connection_args (dict[str, Any] | None)
- Return type:
None
- delete_thread(thread_id)[source]#
Delete all checkpoints and writes associated with a specific thread ID.
- Parameters:
thread_id (str) – The thread ID whose checkpoints should be deleted.
- Return type:
None
- get_channel_values(thread_id, checkpoint_ns, checkpoint_id, channel_versions=None)[source]#
Retrieve channel_values dictionary from inline checkpoint data.
- Parameters:
thread_id (str)
checkpoint_ns (str)
checkpoint_id (str)
channel_versions (Dict[str, Any] | None)
- Return type:
dict[str, Any]
- get_tuple(config)[source]#
Get checkpoint with inline channel values.
- Parameters:
config (RunnableConfig)
- Return type:
CheckpointTuple | None
- list(config, *, filter=None, before=None, limit=None)[source]#
List checkpoints from Redis.
- Parameters:
config (RunnableConfig | None)
filter (Dict[str, Any] | None)
before (RunnableConfig | None)
limit (int | None)
- Return type:
Iterator[CheckpointTuple]
- prune(thread_ids, *, strategy='keep_latest', keep_last=None)[source]#
Prune checkpoints for the given threads.
ShallowRedisSaverstores at most one checkpoint per namespace by design, sostrategy="keep_latest"(orkeep_last >= 1) is always a no-op.strategy="delete"(orkeep_last=0) removes all checkpoints for each thread (equivalent todelete_thread).- Parameters:
thread_ids (Sequence[str]) – Thread IDs to prune.
strategy (str) – Pruning strategy.
"keep_latest"is a no-op for shallow savers (default)."delete"removes all.keep_last (int | None) – Optional override. Any value >= 1 is a no-op. Pass
0to delete all.
- Return type:
None
- put(config, checkpoint, metadata, new_versions)[source]#
Store checkpoint with inline channel values.
- Parameters:
config (RunnableConfig)
checkpoint (Checkpoint)
metadata (CheckpointMetadata)
new_versions (dict[str, str | int | float])
- Return type:
RunnableConfig
- put_writes(config, writes, task_id, task_path='')[source]#
Store intermediate writes linked to a checkpoint with checkpoint-level registry.
- Parameters:
config (RunnableConfig) – Configuration of the related checkpoint.
writes (Sequence[tuple[str, Any]]) – List of writes to store, each as (channel, value) pair.
task_id (str) – Identifier for the task creating the writes.
task_path (str) – Optional path info for the task.
- Return type:
None
- DEFAULT_CHANNEL_CACHE_MAX_SIZE = 100#
- DEFAULT_KEY_CACHE_MAX_SIZE = 1000#
- class BaseRedisSaver(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')[source]#
Bases:
BaseCheckpointSaver[str],Generic[RedisClientType,IndexType]Base Redis implementation for checkpoint saving.
Uses Redis JSON for storing checkpoints and related data, with RediSearch for querying.
Initialize Redis-backed checkpoint saver.
- Parameters:
redis_url (str | None) – Redis connection URL
redis_client (RedisClientType | None) – Redis client instance to use (alternative to redis_url)
connection_args (Dict[str, Any] | None) – Additional arguments for Redis connection
ttl (Dict[str, Any] | None) – Optional TTL configuration dict with optional keys: - default_ttl: TTL in minutes for all checkpoint keys - refresh_on_read: Whether to refresh TTL on reads
checkpoint_prefix (str) – Prefix for checkpoint keys (default: “checkpoint”)
checkpoint_write_prefix (str) – Prefix for checkpoint write keys (default: “checkpoint_write”)
- async aset_client_info()[source]#
Set client info for Redis monitoring asynchronously.
- Return type:
None
- abstractmethod configure_client(redis_url=None, redis_client=None, connection_args=None)[source]#
Configure the Redis client.
- Parameters:
redis_url (str | None)
redis_client (RedisClientType | None)
connection_args (Dict[str, Any] | None)
- Return type:
None
- abstractmethod create_indexes()[source]#
Create appropriate SearchIndex instances.
- Return type:
None
- get_next_version(current, channel)[source]#
Generate next version number.
- Parameters:
current (str | None)
channel (ChannelProtocol[Any, Any, Any])
- Return type:
str
- put_writes(config, writes, task_id, task_path='')[source]#
Store intermediate writes linked to a checkpoint.
- Parameters:
config (RunnableConfig) – Configuration of the related checkpoint.
writes (Sequence[tuple[str, Any]]) – List of writes to store, each as (channel, value) pair.
task_id (str) – Identifier for the task creating the writes.
task_path (str) – Optional path info for the task.
- Return type:
None
- checkpoint_writes_index: IndexType#
- checkpoints_index: IndexType#
- property checkpoints_schema: Dict[str, Any]#
Schema for the checkpoints index.
- property writes_schema: Dict[str, Any]#
Schema for the checkpoint writes index.