RedisSaver

Synchronous checkpoint saver backed by Redis.

class AsyncRedisSaver(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')[source]#

Bases: BaseRedisSaver[Redis | RedisCluster, AsyncSearchIndex]

Async Redis implementation for checkpoint saver.

Supports standard Redis URLs (redis://), SSL (rediss://), and Sentinel URLs (redis+sentinel://host:26379/service_name/db).
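
To make the Sentinel URL shape above concrete, the following sketch splits such a URL into its parts with the standard library. The helper name `split_sentinel_url` is hypothetical, and treating a missing db segment as 0 is an assumption; this is not the parser the library itself uses.

```python
from urllib.parse import urlparse


def split_sentinel_url(url: str) -> dict:
    """Split a redis+sentinel:// URL into host, port, service name and db.

    Illustrative helper only; not part of the library.
    """
    parsed = urlparse(url)
    if parsed.scheme != "redis+sentinel":
        raise ValueError(f"not a Sentinel URL: {url!r}")
    # The path looks like "/service_name/db"; the db segment is treated as
    # optional here (an assumption) and defaults to 0.
    segments = [s for s in parsed.path.split("/") if s]
    if not segments:
        raise ValueError("Sentinel URL must name a service")
    return {
        "host": parsed.hostname,
        "port": parsed.port or 26379,  # 26379 is the conventional Sentinel port
        "service_name": segments[0],
        "db": int(segments[1]) if len(segments) > 1 else 0,
    }


parts = split_sentinel_url("redis+sentinel://localhost:26379/mymaster/0")
print(parts)
```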

Initialize Redis-backed checkpoint saver.

Parameters:
  • redis_url (Optional[str]) – Redis connection URL

  • redis_client (Optional[Union[AsyncRedis, AsyncRedisCluster]]) – Redis client instance to use (alternative to redis_url)

  • connection_args (Optional[Dict[str, Any]]) – Additional arguments for Redis connection

  • ttl (Optional[Dict[str, Any]]) – Optional TTL configuration dict with optional keys:
      - default_ttl: TTL in minutes for all checkpoint keys
      - refresh_on_read: Whether to refresh TTL on reads

  • checkpoint_prefix (str) – Prefix for checkpoint keys (default: “checkpoint”)

  • checkpoint_write_prefix (str) – Prefix for checkpoint write keys (default: “checkpoint_write”)
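
As a small illustration of the ttl parameter, the sketch below builds a configuration dict in the documented shape and converts default_ttl from minutes to seconds, the unit Redis expiry commands work in. The helper `ttl_seconds` is hypothetical and only shows the arithmetic; how the saver applies the TTL internally is not described here.

```python
from typing import Any, Dict, Optional

# TTL configuration in the documented shape: default_ttl is given in minutes.
ttl_config: Dict[str, Any] = {
    "default_ttl": 60,        # expire checkpoint keys after one hour
    "refresh_on_read": True,  # reading a checkpoint refreshes its TTL
}


def ttl_seconds(ttl: Optional[Dict[str, Any]]) -> Optional[int]:
    """Return the expiry in seconds, or None when no default TTL is set.

    Hypothetical helper for illustration; not part of the library.
    """
    if not ttl or ttl.get("default_ttl") is None:
        return None
    return int(ttl["default_ttl"] * 60)


print(ttl_seconds(ttl_config))
```

A dict like `ttl_config` would be passed as `ttl=ttl_config` when constructing a saver.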

classmethod from_conn_string(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')[source]#

Create a new AsyncRedisSaver instance.

Parameters:
  • redis_url (str | None)

  • redis_client (Redis | RedisCluster | None)

  • connection_args (Dict[str, Any] | None)

  • ttl (Dict[str, Any] | None)

  • checkpoint_prefix (str)

  • checkpoint_write_prefix (str)

Return type:

AsyncIterator[AsyncRedisSaver]
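
The AsyncIterator return type suggests that from_conn_string is meant to be used as an async context manager. A minimal usage sketch under that assumption follows; the import path `langgraph.checkpoint.redis.aio`, the local URL, and the thread ID are assumptions rather than details taken from this page.

```python
async def open_async_saver(redis_url: str = "redis://localhost:6379") -> None:
    # Imported inside the function so this sketch can be loaded without the
    # package installed. The module path is an assumption.
    from langgraph.checkpoint.redis.aio import AsyncRedisSaver

    # Assumes from_conn_string can be entered with "async with" and yields a
    # ready-to-configure saver.
    async with AsyncRedisSaver.from_conn_string(redis_url) as checkpointer:
        # setup() must be awaited before the checkpointer is used.
        await checkpointer.setup()
        config = {"configurable": {"thread_id": "thread-1"}}
        latest = await checkpointer.aget_tuple(config)
        print(latest)  # None when the thread has no checkpoint yet
```

Against a running Redis server, the coroutine could be driven with `asyncio.run(open_async_saver())`.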

async adelete_thread(thread_id)[source]#

Delete all checkpoints and writes associated with a specific thread ID.

Parameters:

thread_id (str) – The thread ID whose checkpoints should be deleted.

Return type:

None

async aget_channel_values(thread_id, checkpoint_ns='', checkpoint_id='', channel_versions=None)[source]#

Retrieve channel_values using efficient FT.SEARCH with checkpoint_id.

Parameters:
  • thread_id (str)

  • checkpoint_ns (str)

  • checkpoint_id (str)

  • channel_versions (Dict[str, Any] | None)

Return type:

Dict[str, Any]

async aget_tuple(config)[source]#

Get a checkpoint tuple from Redis asynchronously.

Parameters:

config (RunnableConfig)

Return type:

CheckpointTuple | None

async alist(config, *, filter=None, before=None, limit=None)[source]#

List checkpoints from Redis asynchronously.

Parameters:
  • config (RunnableConfig | None)

  • filter (dict[str, Any] | None)

  • before (RunnableConfig | None)

  • limit (int | None)

Return type:

AsyncIterator[CheckpointTuple]

async aprune(thread_ids, *, strategy='keep_latest', keep_last=None, max_results=10000)[source]#

Prune old checkpoints for the given threads per namespace.

Retains the most-recent checkpoints per checkpoint namespace and removes the rest, along with their associated write keys and key-registry sorted sets.

Each namespace (root "" and any subgraph namespaces) is treated as an independent checkpoint chain. Channel values are stored inline within each checkpoint document, so they are automatically removed when the checkpoint document is deleted.

Parameters:
  • thread_ids (Sequence[str]) – Thread IDs whose old checkpoints should be pruned.

  • strategy (str) – Pruning strategy. "keep_latest" retains only the most recent checkpoint per namespace (default). "delete" removes all checkpoints for the thread.

  • keep_last (int | None) – Optional override — number of recent checkpoints to retain per namespace. When provided, takes precedence over strategy. Use keep_last=0 to remove all checkpoints.

  • max_results (int) – Maximum number of checkpoints fetched from the index per thread in a single query. Defaults to 10,000.

Return type:

None
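
The interaction between strategy and keep_last can be modelled in plain Python. The sketch below is a model of the documented semantics, not the library's implementation: `select_for_pruning` is a hypothetical function, and checkpoint IDs are assumed to sort from oldest to newest.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

# (checkpoint_ns, checkpoint_id) pairs; IDs are assumed to sort oldest-to-newest.
CheckpointKey = Tuple[str, str]


def select_for_pruning(
    checkpoints: List[CheckpointKey],
    strategy: str = "keep_latest",
    keep_last: Optional[int] = None,
) -> List[CheckpointKey]:
    """Return the checkpoints that would be removed, namespace by namespace."""
    if keep_last is None:
        # keep_last was not given, so the strategy decides how many to keep.
        if strategy == "keep_latest":
            keep_last = 1
        elif strategy == "delete":
            keep_last = 0
        else:
            raise ValueError(f"unknown strategy: {strategy!r}")

    # Each namespace is an independent checkpoint chain.
    by_ns: Dict[str, List[str]] = defaultdict(list)
    for ns, checkpoint_id in checkpoints:
        by_ns[ns].append(checkpoint_id)

    removed: List[CheckpointKey] = []
    for ns, ids in by_ns.items():
        ordered = sorted(ids)
        cutoff = max(len(ordered) - keep_last, 0)
        removed.extend((ns, cid) for cid in ordered[:cutoff])
    return removed


chain = [("", "001"), ("", "002"), ("", "003"), ("sub", "001"), ("sub", "002")]
print(select_for_pruning(chain))               # default: keep one per namespace
print(select_for_pruning(chain, keep_last=2))  # keep two per namespace
```

Because keep_last takes precedence, `select_for_pruning(chain, strategy="delete", keep_last=2)` selects the same checkpoints as `keep_last=2` alone.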

async aput(config, checkpoint, metadata, new_versions, stream_mode='values')[source]#

Store a checkpoint to Redis with proper transaction handling.

This method ensures that all Redis operations are performed atomically using Redis transactions. In case of interruption (asyncio.CancelledError), the transaction will be aborted, ensuring consistency.

Parameters:
  • config (RunnableConfig) – The config to associate with the checkpoint

  • checkpoint (Checkpoint) – The checkpoint data to store

  • metadata (CheckpointMetadata) – Additional metadata to save with the checkpoint

  • new_versions (dict[str, str | int | float]) – New channel versions as of this write

  • stream_mode (str) – The streaming mode being used (values, updates, etc.)

Returns:

Updated configuration after storing the checkpoint

Raises:

asyncio.CancelledError – If the operation is cancelled/interrupted

Return type:

RunnableConfig
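
The all-or-nothing behaviour under cancellation can be illustrated with a small in-memory model. Everything in the sketch is a stand-in: `BufferedTransaction` imitates a MULTI/EXEC pipeline by queueing writes and applying them in one step, and the key names are invented. It shows the idea only and is not the saver's actual code.

```python
import asyncio
from typing import Any, Dict, List, Tuple


class BufferedTransaction:
    """Stand-in for a MULTI/EXEC pipeline: writes are queued, then applied together."""

    def __init__(self, store: Dict[str, Any]) -> None:
        self._store = store
        self._queued: List[Tuple[str, Any]] = []

    def set(self, key: str, value: Any) -> None:
        self._queued.append((key, value))

    async def execute(self) -> None:
        # Applied in one step with no await in between, so a cancellation
        # cannot land halfway through.
        self._store.update(self._queued)
        self._queued.clear()


async def put_checkpoint(store: Dict[str, Any], thread_id: str,
                         checkpoint: Dict[str, Any], delay: float = 0.0) -> None:
    tx = BufferedTransaction(store)
    tx.set(f"checkpoint:{thread_id}", checkpoint)
    tx.set(f"checkpoint_latest:{thread_id}", checkpoint["id"])
    await asyncio.sleep(delay)  # a cancellation arriving here discards the queue
    await tx.execute()


async def demo() -> Dict[str, Any]:
    store: Dict[str, Any] = {}
    await put_checkpoint(store, "t1", {"id": "001"})
    task = asyncio.create_task(put_checkpoint(store, "t2", {"id": "001"}, delay=10))
    await asyncio.sleep(0)  # let the task start and reach its sleep
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # nothing from the cancelled call reached the store
    return store


result = asyncio.run(demo())
print(sorted(result))
```

Only the keys written by the completed call for thread "t1" end up in the store; the cancelled call for "t2" leaves no partial state behind.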

async aput_writes(config, writes, task_id, task_path='')[source]#

Store intermediate writes linked to a checkpoint using Redis JSON.

This method uses a Redis pipeline without a transaction to avoid lock contention during parallel test execution.

Parameters:
  • config (RunnableConfig) – Configuration of the related checkpoint.

  • writes (List[Tuple[str, Any]]) – List of writes to store.

  • task_id (str) – Identifier for the task creating the writes.

  • task_path (str) – Path of the task creating the writes.

Raises:

asyncio.CancelledError – If the operation is cancelled/interrupted

Return type:

None

async asetup()[source]#

Set up the checkpoint saver.

Return type:

None

configure_client(redis_url=None, redis_client=None, connection_args=None)[source]#

Configure the Redis client.

Supports standard Redis URLs (redis://), SSL (rediss://), and Sentinel URLs (redis+sentinel://host:26379/service_name/db).

Parameters:
  • redis_url (str | None)

  • redis_client (Redis | RedisCluster | None)

  • connection_args (Dict[str, Any] | None)

Return type:

None

create_indexes()[source]#

Create indexes without connecting to Redis.

Return type:

None

get_channel_values(thread_id, checkpoint_ns='', checkpoint_id='')[source]#

Retrieve channel_values using efficient FT.SEARCH with checkpoint_id (sync wrapper).

Parameters:
  • thread_id (str)

  • checkpoint_ns (str)

  • checkpoint_id (str)

Return type:

Dict[str, Any]

get_tuple(config)[source]#

Get a checkpoint tuple from Redis.

Parameters:

config (RunnableConfig) – The config to use for retrieving the checkpoint.

Returns:

The retrieved checkpoint tuple, or None if no matching checkpoint was found.

Return type:

Optional[CheckpointTuple]

Raises:

asyncio.InvalidStateError – If called from the wrong thread/event loop

put(config, checkpoint, metadata, new_versions)[source]#

Store a checkpoint to Redis.

Parameters:
  • config (RunnableConfig) – The config to associate with the checkpoint.

  • checkpoint (Checkpoint) – The checkpoint to save.

  • metadata (CheckpointMetadata) – Additional metadata to save with the checkpoint.

  • new_versions (ChannelVersions) – New channel versions as of this write.

Returns:

Updated configuration after storing the checkpoint.

Return type:

RunnableConfig

Raises:

asyncio.InvalidStateError – If called from the wrong thread/event loop

put_writes(config, writes, task_id, task_path='')[source]#

Synchronous wrapper for aput_writes.

Parameters:
  • config (RunnableConfig) – Configuration of the related checkpoint.

  • writes (List[Tuple[str, Any]]) – List of writes to store.

  • task_id (str) – Identifier for the task creating the writes.

  • task_path (str) – Path of the task creating the writes.

Return type:

None

async setup()[source]#

Set up the checkpoint saver asynchronously.

This method creates the necessary indices in Redis and detects cluster mode. It MUST be called before using the checkpointer.

This async method follows the canonical pattern used by PostgreSQL and SQLite checkpointers in the LangGraph ecosystem. The type ignore is necessary because the base class defines a sync setup() method, but async checkpointers require an async setup() method to properly handle coroutines.

Usage: await checkpointer.setup()

Return type:

None

checkpoint_writes_index: AsyncSearchIndex#

checkpoints_index: AsyncSearchIndex#

cluster_mode: bool | None = None#

class AsyncShallowRedisSaver(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')[source]#

Bases: BaseRedisSaver[Redis, AsyncSearchIndex]

Async Redis implementation that only stores the most recent checkpoint.

Supports standard Redis URLs (redis://), SSL (rediss://), and Sentinel URLs (redis+sentinel://host:26379/service_name/db).

Initialize Redis-backed checkpoint saver.

Parameters:
  • redis_url (Optional[str]) – Redis connection URL

  • redis_client (Optional[AsyncRedis]) – Redis client instance to use (alternative to redis_url)

  • connection_args (Optional[dict[str, Any]]) – Additional arguments for Redis connection

  • ttl (Optional[dict[str, Any]]) – Optional TTL configuration dict with optional keys:
      - default_ttl: TTL in minutes for all checkpoint keys
      - refresh_on_read: Whether to refresh TTL on reads

  • checkpoint_prefix (str) – Prefix for checkpoint keys (default: “checkpoint”)

  • checkpoint_write_prefix (str) – Prefix for checkpoint write keys (default: “checkpoint_write”)

classmethod from_conn_string(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')[source]#

Create a new AsyncShallowRedisSaver instance.

Parameters:
  • redis_url (str | None)

  • redis_client (Redis | None)

  • connection_args (dict[str, Any] | None)

  • ttl (dict[str, Any] | None)

  • checkpoint_prefix (str)

  • checkpoint_write_prefix (str)

Return type:

AsyncIterator[AsyncShallowRedisSaver]

async adelete_thread(thread_id)[source]#

Delete checkpoint and writes associated with a specific thread ID.

Parameters:

thread_id (str) – The thread ID whose checkpoint should be deleted.

Return type:

None

async aget_channel_values(thread_id, checkpoint_ns, checkpoint_id, channel_versions=None)[source]#

Retrieve channel_values dictionary from inline checkpoint data.

Parameters:
  • thread_id (str)

  • checkpoint_ns (str)

  • checkpoint_id (str)

  • channel_versions (Dict[str, Any] | None)

Return type:

dict[str, Any]

async aget_tuple(config)[source]#

Retrieve a checkpoint tuple from Redis asynchronously.

Parameters:

config (RunnableConfig)

Return type:

CheckpointTuple | None

async alist(config, *, filter=None, before=None, limit=None)[source]#

List checkpoints from Redis asynchronously.

Parameters:
  • config (RunnableConfig | None)

  • filter (Dict[str, Any] | None)

  • before (RunnableConfig | None)

  • limit (int | None)

Return type:

AsyncIterator[CheckpointTuple]

async aprune(thread_ids, *, strategy='keep_latest', keep_last=None)[source]#

Prune checkpoints for the given threads.

AsyncShallowRedisSaver stores at most one checkpoint per namespace by design, so strategy="keep_latest" (or keep_last >= 1) is always a no-op. strategy="delete" (or keep_last=0) removes all checkpoints for each thread (equivalent to adelete_thread).

Parameters:
  • thread_ids (Sequence[str]) – Thread IDs to prune.

  • strategy (str) – Pruning strategy. "keep_latest" is a no-op for shallow savers (default). "delete" removes all.

  • keep_last (int | None) – Optional override. Any value >= 1 is a no-op. Pass 0 to delete all.

Return type:

None
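
For shallow savers the outcome of a prune call reduces to a single yes-or-no question. The sketch below encodes the rules stated above in a hypothetical helper, `shallow_prune_deletes`; it is a reading aid and not the library's implementation.

```python
from typing import Optional


def shallow_prune_deletes(strategy: str = "keep_latest",
                          keep_last: Optional[int] = None) -> bool:
    """Return True when the call would delete the thread's checkpoints."""
    if keep_last is not None:
        # keep_last overrides strategy: only 0 deletes, any value >= 1 is a no-op.
        return keep_last == 0
    return strategy == "delete"


print(shallow_prune_deletes())                  # default call
print(shallow_prune_deletes(strategy="delete"))
print(shallow_prune_deletes(keep_last=0))
```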

async aput(config, checkpoint, metadata, new_versions)[source]#

Store a checkpoint with inline channel values.

All channel values are stored directly in the main checkpoint JSON document.

Parameters:
  • config (RunnableConfig) – The config to associate with the checkpoint

  • checkpoint (Checkpoint) – The checkpoint data to store

  • metadata (CheckpointMetadata) – Additional metadata to save with the checkpoint

  • new_versions (dict[str, str | int | float]) – New channel versions as of this write

Returns:

Updated configuration after storing the checkpoint

Raises:

asyncio.CancelledError – If the operation is cancelled/interrupted

Return type:

RunnableConfig

async aput_writes(config, writes, task_id, task_path='')[source]#

Store intermediate writes for the latest checkpoint and clean up old writes with transaction handling.

This method uses a Redis pipeline with transaction=True to ensure atomicity of all write operations. In case of interruption, all operations will be aborted.

Parameters:
  • config (RunnableConfig) – Configuration of the related checkpoint.

  • writes (List[Tuple[str, Any]]) – List of writes to store.

  • task_id (str) – Identifier for the task creating the writes.

  • task_path (str) – Path of the task creating the writes.

Raises:

asyncio.CancelledError – If the operation is cancelled/interrupted

Return type:

None

async asetup()[source]#

Initialize Redis indexes asynchronously.

Return type:

None

configure_client(redis_url=None, redis_client=None, connection_args=None)[source]#

Configure the Redis client.

Supports standard Redis URLs (redis://), SSL (rediss://), and Sentinel URLs (redis+sentinel://host:26379/service_name/db).

Parameters:
  • redis_url (str | None)

  • redis_client (Redis | None)

  • connection_args (dict[str, Any] | None)

Return type:

None

create_indexes()[source]#

Create indexes without connecting to Redis.

Return type:

None

get_channel_values(thread_id, checkpoint_ns, checkpoint_id, channel_versions=None)[source]#

Retrieve channel_values dictionary with properly constructed message objects (sync wrapper).

Parameters:
  • thread_id (str)

  • checkpoint_ns (str)

  • checkpoint_id (str)

  • channel_versions (Dict[str, Any] | None)

Return type:

dict[str, Any]

get_tuple(config)[source]#

Retrieve a checkpoint tuple from Redis synchronously.

Parameters:

config (RunnableConfig)

Return type:

CheckpointTuple | None

put(config, checkpoint, metadata, new_versions)[source]#

Store only the latest checkpoint synchronously.

Parameters:
  • config (RunnableConfig)

  • checkpoint (Checkpoint)

  • metadata (CheckpointMetadata)

  • new_versions (dict[str, str | int | float])

Return type:

RunnableConfig

put_writes(config, writes, task_id, task_path='')[source]#

Store intermediate writes synchronously.

Parameters:
  • config (RunnableConfig)

  • writes (Sequence[Tuple[str, Any]])

  • task_id (str)

  • task_path (str)

Return type:

None

async setup()[source]#

Set up the checkpoint saver asynchronously.

This method creates the necessary indices in Redis. It MUST be called before using the checkpointer.

This async method follows the canonical pattern used by other async checkpointers in the LangGraph ecosystem. The type ignore is necessary because the base class defines a sync setup() method, but async checkpointers require an async setup() method to properly handle coroutines.

Return type:

None

checkpoint_writes_index: AsyncSearchIndex#

checkpoints_index: AsyncSearchIndex#

class BaseRedisSaver(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')[source]#

Bases: BaseCheckpointSaver[str], Generic[RedisClientType, IndexType]

Base Redis implementation for checkpoint saving.

Uses Redis JSON for storing checkpoints and related data, with RediSearch for querying.

Initialize Redis-backed checkpoint saver.

Parameters:
  • redis_url (str | None) – Redis connection URL

  • redis_client (RedisClientType | None) – Redis client instance to use (alternative to redis_url)

  • connection_args (Dict[str, Any] | None) – Additional arguments for Redis connection

  • ttl (Dict[str, Any] | None) – Optional TTL configuration dict with optional keys:
      - default_ttl: TTL in minutes for all checkpoint keys
      - refresh_on_read: Whether to refresh TTL on reads

  • checkpoint_prefix (str) – Prefix for checkpoint keys (default: “checkpoint”)

  • checkpoint_write_prefix (str) – Prefix for checkpoint write keys (default: “checkpoint_write”)

async aset_client_info()[source]#

Set client info for Redis monitoring asynchronously.

Return type:

None

abstractmethod configure_client(redis_url=None, redis_client=None, connection_args=None)[source]#

Configure the Redis client.

Parameters:
  • redis_url (str | None)

  • redis_client (RedisClientType | None)

  • connection_args (Dict[str, Any] | None)

Return type:

None

abstractmethod create_indexes()[source]#

Create appropriate SearchIndex instances.

Return type:

None

get_next_version(current, channel)[source]#

Generate next version number.

Parameters:
  • current (str | None)

  • channel (ChannelProtocol[Any, Any, Any])

Return type:

str

put_writes(config, writes, task_id, task_path='')[source]#

Store intermediate writes linked to a checkpoint.

Parameters:
  • config (RunnableConfig) – Configuration of the related checkpoint.

  • writes (Sequence[tuple[str, Any]]) – List of writes to store, each as (channel, value) pair.

  • task_id (str) – Identifier for the task creating the writes.

  • task_path (str) – Optional path info for the task.

Return type:

None

set_client_info()[source]#

Set client info for Redis monitoring.

Return type:

None

setup()[source]#

Initialize the indices in Redis.

Return type:

None

checkpoint_writes_index: IndexType#

checkpoints_index: IndexType#

property checkpoints_schema: Dict[str, Any]#

Schema for the checkpoints index.

property writes_schema: Dict[str, Any]#

Schema for the checkpoint writes index.

class LangChainRecipe[source]#

Bases: object

Default recipe for extracting LangChain messages.

extract(message)[source]#

Extract data from LangChain message objects.

Parameters:

message (Any)

Return type:

Dict[str, Any] | None

class MessageExporter(redis_saver, recipe=None)[source]#

Bases: object

Export messages from Redis checkpoints.

Parameters:
  • redis_saver

  • recipe

export(thread_id, checkpoint_id=None)[source]#

Export messages from checkpoint data.

Parameters:
  • thread_id (str) – The conversation thread ID

  • checkpoint_id (str | None) – Specific checkpoint ID (latest if None)

Returns:

List of extracted message dictionaries

Return type:

List[Dict[str, Any]]

export_thread(thread_id)[source]#

Export all messages from all checkpoints in a thread.

Parameters:

thread_id (str) – The conversation thread ID

Returns:

Dict with thread_id, messages, and export timestamp

Return type:

Dict[str, Any]

class MessageRecipe(*args, **kwargs)[source]#

Bases: Protocol

Protocol for message extraction recipes.

Implement this interface to support custom message formats.

extract(message)[source]#

Extract structured data from a message.

Parameters:

message (Any) – The message to extract data from.

Returns:

Dict with at least ‘role’ and ‘content’ keys, or None if message cannot be extracted.

Return type:

Dict[str, Any] | None
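
A custom recipe only needs an extract method with the shape described above. The sketch below handles messages stored as plain dicts; the class name `DictMessageRecipe` and the accepted message shape are hypothetical choices made for illustration.

```python
from typing import Any, Dict, Optional


class DictMessageRecipe:
    """Recipe for messages stored as plain dicts, e.g. {"role": "user", "content": "hi"}.

    Hypothetical example; the accepted message shape is an assumption.
    """

    def extract(self, message: Any) -> Optional[Dict[str, Any]]:
        if not isinstance(message, dict):
            return None  # this recipe cannot handle the message
        if "role" not in message or "content" not in message:
            return None
        # Return only the two keys the protocol requires.
        return {"role": message["role"], "content": message["content"]}


recipe = DictMessageRecipe()
print(recipe.extract({"role": "user", "content": "hello", "extra": 1}))
print(recipe.extract("not a message"))
```

An instance of such a class could then be supplied as the recipe argument of MessageExporter.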

class RedisSaver(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')[source]#

Bases: BaseRedisSaver[Redis | RedisCluster, SearchIndex]

Standard Redis implementation for checkpoint saving.

Supports standard Redis URLs (redis://), SSL (rediss://), and Sentinel URLs (redis+sentinel://host:26379/service_name/db).

Initialize Redis-backed checkpoint saver.

Parameters:
  • redis_url (Optional[str]) – Redis connection URL

  • redis_client (Optional[Union[Redis, RedisCluster]]) – Redis client instance to use (alternative to redis_url)

  • connection_args (Optional[Dict[str, Any]]) – Additional arguments for Redis connection

  • ttl (Optional[Dict[str, Any]]) – Optional TTL configuration dict with optional keys:
      - default_ttl: TTL in minutes for all checkpoint keys
      - refresh_on_read: Whether to refresh TTL on reads

  • checkpoint_prefix (str) – Prefix for checkpoint keys (default: “checkpoint”)

  • checkpoint_write_prefix (str) – Prefix for checkpoint write keys (default: “checkpoint_write”)

classmethod from_conn_string(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')[source]#

Create a new RedisSaver instance.

Parameters:
  • redis_url (str | None)

  • redis_client (Redis | RedisCluster | None)

  • connection_args (Dict[str, Any] | None)

  • ttl (Dict[str, Any] | None)

  • checkpoint_prefix (str)

  • checkpoint_write_prefix (str)

Return type:

Iterator[RedisSaver]
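
The Iterator return type suggests that from_conn_string is meant to be used in a with statement. A minimal synchronous sketch under that assumption follows; the import path `langgraph.checkpoint.redis`, the local URL, and the helper name `list_thread_checkpoints` are assumptions rather than details taken from this page.

```python
def list_thread_checkpoints(thread_id: str,
                            redis_url: str = "redis://localhost:6379") -> list:
    # Imported inside the function so this sketch can be loaded without the
    # package installed. The module path is an assumption.
    from langgraph.checkpoint.redis import RedisSaver

    # Assumes from_conn_string can be entered with "with" and yields a saver.
    with RedisSaver.from_conn_string(redis_url) as checkpointer:
        checkpointer.setup()  # initialize the indices and detect cluster mode
        config = {"configurable": {"thread_id": thread_id}}
        # list() yields CheckpointTuple objects; limit caps how many are returned.
        return list(checkpointer.list(config, limit=10))
```

Calling `list_thread_checkpoints("thread-1")` requires a reachable Redis server with the JSON and search capabilities the saver relies on.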

configure_client(redis_url=None, redis_client=None, connection_args=None)[source]#

Configure the Redis client.

Supports standard Redis URLs (redis://), SSL (rediss://), and Sentinel URLs (redis+sentinel://host:26379/service_name/db).

Parameters:
  • redis_url (str | None)

  • redis_client (Redis | RedisCluster | None)

  • connection_args (Dict[str, Any] | None)

Return type:

None

create_indexes()[source]#

Create appropriate SearchIndex instances.

Return type:

None

delete_thread(thread_id)[source]#

Delete all checkpoints and writes associated with a specific thread ID.

Parameters:

thread_id (str) – The thread ID whose checkpoints should be deleted.

Return type:

None

get_channel_values(thread_id, checkpoint_ns='', checkpoint_id='', channel_versions=None)[source]#

Retrieve channel_values using efficient FT.SEARCH with checkpoint_id.

Parameters:
  • thread_id (str)

  • checkpoint_ns (str)

  • checkpoint_id (str)

  • channel_versions (Dict[str, str] | None)

Return type:

Dict[str, Any]

get_tuple(config)[source]#

Get a checkpoint tuple from Redis.

Parameters:

config (RunnableConfig) – The config to use for retrieving the checkpoint.

Returns:

The retrieved checkpoint tuple, or None if no matching checkpoint was found.

Return type:

Optional[CheckpointTuple]

list(config, *, filter=None, before=None, limit=None)[source]#

List checkpoints from Redis.

Parameters:
  • config (RunnableConfig | None)

  • filter (dict[str, Any] | None)

  • before (RunnableConfig | None)

  • limit (int | None)

Return type:

Iterator[CheckpointTuple]

prune(thread_ids, *, strategy='keep_latest', keep_last=None, max_results=10000)[source]#

Prune old checkpoints for the given threads per namespace.

Retains the most-recent checkpoints per checkpoint namespace and removes the rest, along with their associated write keys and key-registry sorted sets.

Each namespace (root "" and any subgraph namespaces) is treated as an independent checkpoint chain. Channel values are stored inline within each checkpoint document, so they are automatically removed when the checkpoint document is deleted.

Parameters:
  • thread_ids (Sequence[str]) – Thread IDs whose old checkpoints should be pruned.

  • strategy (str) – Pruning strategy. "keep_latest" retains only the most recent checkpoint per namespace (default). "delete" removes all checkpoints for the thread.

  • keep_last (int | None) – Optional override — number of recent checkpoints to retain per namespace. When provided, takes precedence over strategy. Use keep_last=0 to remove all checkpoints.

  • max_results (int) – Maximum number of checkpoints fetched from the index per thread in a single query. Defaults to 10,000.

Return type:

None

put(config, checkpoint, metadata, new_versions)[source]#

Store a checkpoint to Redis with inline channel value storage.

Parameters:
  • config (RunnableConfig)

  • checkpoint (Checkpoint)

  • metadata (CheckpointMetadata)

  • new_versions (dict[str, str | int | float])

Return type:

RunnableConfig

put_writes(config, writes, task_id, task_path='')[source]#

Store intermediate writes linked to a checkpoint with integrated key registry.

Parameters:
  • config (RunnableConfig)

  • writes (Sequence[tuple[str, Any]])

  • task_id (str)

  • task_path (str)

Return type:

None

setup()[source]#

Initialize the indices in Redis and detect cluster mode.

Return type:

None

cluster_mode: bool | None = None#

class ShallowRedisSaver(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, key_cache_max_size=None, channel_cache_max_size=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')[source]#

Bases: BaseRedisSaver[Redis, SearchIndex]

Redis implementation that only stores the most recent checkpoint.

Supports standard Redis URLs (redis://), SSL (rediss://), and Sentinel URLs (redis+sentinel://host:26379/service_name/db).

Initialize Redis-backed checkpoint saver.

Parameters:
  • redis_url (Optional[str]) – Redis connection URL

  • redis_client (Optional[Redis]) – Redis client instance to use (alternative to redis_url)

  • connection_args (Optional[dict[str, Any]]) – Additional arguments for Redis connection

  • ttl (Optional[dict[str, Any]]) – Optional TTL configuration dict with optional keys:
      - default_ttl: TTL in minutes for all checkpoint keys
      - refresh_on_read: Whether to refresh TTL on reads

  • checkpoint_prefix (str) – Prefix for checkpoint keys (default: “checkpoint”)

  • checkpoint_write_prefix (str) – Prefix for checkpoint write keys (default: “checkpoint_write”)

  • key_cache_max_size (Optional[int])

  • channel_cache_max_size (Optional[int])

classmethod from_conn_string(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, key_cache_max_size=None, channel_cache_max_size=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')[source]#

Create a new ShallowRedisSaver instance.

Parameters:
  • redis_url (str | None)

  • redis_client (Redis | None)

  • connection_args (dict[str, Any] | None)

  • ttl (dict[str, Any] | None)

  • key_cache_max_size (int | None)

  • channel_cache_max_size (int | None)

  • checkpoint_prefix (str)

  • checkpoint_write_prefix (str)

Return type:

Iterator[ShallowRedisSaver]

configure_client(redis_url=None, redis_client=None, connection_args=None)[source]#

Configure the Redis client.

Parameters:
  • redis_url (str | None)

  • redis_client (Redis | None)

  • connection_args (dict[str, Any] | None)

Return type:

None

create_indexes()[source]#

Create appropriate SearchIndex instances.

Return type:

None

delete_thread(thread_id)[source]#

Delete all checkpoints and writes associated with a specific thread ID.

Parameters:

thread_id (str) – The thread ID whose checkpoints should be deleted.

Return type:

None

get_channel_values(thread_id, checkpoint_ns, checkpoint_id, channel_versions=None)[source]#

Retrieve channel_values dictionary from inline checkpoint data.

Parameters:
  • thread_id (str)

  • checkpoint_ns (str)

  • checkpoint_id (str)

  • channel_versions (Dict[str, Any] | None)

Return type:

dict[str, Any]

get_tuple(config)[source]#

Get checkpoint with inline channel values.

Parameters:

config (RunnableConfig)

Return type:

CheckpointTuple | None

list(config, *, filter=None, before=None, limit=None)[source]#

List checkpoints from Redis.

Parameters:
  • config (RunnableConfig | None)

  • filter (Dict[str, Any] | None)

  • before (RunnableConfig | None)

  • limit (int | None)

Return type:

Iterator[CheckpointTuple]

prune(thread_ids, *, strategy='keep_latest', keep_last=None)[source]#

Prune checkpoints for the given threads.

ShallowRedisSaver stores at most one checkpoint per namespace by design, so strategy="keep_latest" (or keep_last >= 1) is always a no-op. strategy="delete" (or keep_last=0) removes all checkpoints for each thread (equivalent to delete_thread).

Parameters:
  • thread_ids (Sequence[str]) – Thread IDs to prune.

  • strategy (str) – Pruning strategy. "keep_latest" is a no-op for shallow savers (default). "delete" removes all.

  • keep_last (int | None) – Optional override. Any value >= 1 is a no-op. Pass 0 to delete all.

Return type:

None

put(config, checkpoint, metadata, new_versions)[source]#

Store checkpoint with inline channel values.

Parameters:
  • config (RunnableConfig)

  • checkpoint (Checkpoint)

  • metadata (CheckpointMetadata)

  • new_versions (dict[str, str | int | float])

Return type:

RunnableConfig

put_writes(config, writes, task_id, task_path='')[source]#

Store intermediate writes linked to a checkpoint with checkpoint-level registry.

Parameters:
  • config (RunnableConfig) – Configuration of the related checkpoint.

  • writes (Sequence[tuple[str, Any]]) – List of writes to store, each as (channel, value) pair.

  • task_id (str) – Identifier for the task creating the writes.

  • task_path (str) – Optional path info for the task.

Return type:

None

setup()[source]#

Initialize the indices in Redis.

Return type:

None

DEFAULT_CHANNEL_CACHE_MAX_SIZE = 100#

DEFAULT_KEY_CACHE_MAX_SIZE = 1000#