AsyncShallowRedisSaver#

Asynchronous shallow checkpoint saver that stores only the latest checkpoint.

class AsyncShallowRedisSaver(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')[source]#

Bases: BaseRedisSaver[Redis, AsyncSearchIndex]

Async Redis implementation that only stores the most recent checkpoint.

Supports standard Redis URLs (redis://), SSL (rediss://), and Sentinel URLs (redis+sentinel://host:26379/service_name/db).

Initialize Redis-backed checkpoint saver.

Parameters:
  • redis_url (Optional[str]) – Redis connection URL

  • redis_client (Optional[AsyncRedis]) – Redis client instance to use (alternative to redis_url)

  • connection_args (Optional[dict[str, Any]]) – Additional arguments for Redis connection

  • ttl (Optional[dict[str, Any]]) – Optional TTL configuration dict with optional keys: - default_ttl: TTL in minutes for all checkpoint keys - refresh_on_read: Whether to refresh TTL on reads

  • checkpoint_prefix (str) – Prefix for checkpoint keys (default: “checkpoint”)

  • checkpoint_write_prefix (str) – Prefix for checkpoint write keys (default: “checkpoint_write”)

classmethod from_conn_string(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')[source]#

Create a new AsyncShallowRedisSaver instance.

Parameters:
  • redis_url (str | None)

  • redis_client (Redis | None)

  • connection_args (dict[str, Any] | None)

  • ttl (dict[str, Any] | None)

  • checkpoint_prefix (str)

  • checkpoint_write_prefix (str)

Return type:

AsyncIterator[AsyncShallowRedisSaver]

async adelete_thread(thread_id)[source]#

Delete checkpoint and writes associated with a specific thread ID.

Parameters:

thread_id (str) – The thread ID which checkpoint should be deleted.

Return type:

None

async aget_channel_values(thread_id, checkpoint_ns, checkpoint_id, channel_versions=None)[source]#

Retrieve channel_values dictionary from inline checkpoint data.

Parameters:
  • thread_id (str)

  • checkpoint_ns (str)

  • checkpoint_id (str)

  • channel_versions (Dict[str, Any] | None)

Return type:

dict[str, Any]

async aget_tuple(config)[source]#

Retrieve a checkpoint tuple from Redis asynchronously.

Parameters:

config (RunnableConfig)

Return type:

CheckpointTuple | None

async alist(config, *, filter=None, before=None, limit=None)[source]#

List checkpoints from Redis asynchronously.

Parameters:
  • config (RunnableConfig | None)

  • filter (Dict[str, Any] | None)

  • before (RunnableConfig | None)

  • limit (int | None)

Return type:

AsyncIterator[CheckpointTuple]

async aprune(thread_ids, *, strategy='keep_latest', keep_last=None)[source]#

Prune checkpoints for the given threads.

AsyncShallowRedisSaver stores at most one checkpoint per namespace by design, so strategy="keep_latest" (or keep_last >= 1) is always a no-op. strategy="delete" (or keep_last=0) removes all checkpoints for each thread (equivalent to adelete_thread).

Parameters:
  • thread_ids (Sequence[str]) – Thread IDs to prune.

  • strategy (str) – Pruning strategy. "keep_latest" is a no-op for shallow savers (default). "delete" removes all.

  • keep_last (int | None) – Optional override. Any value >= 1 is a no-op. Pass 0 to delete all.

Return type:

None

async aput(config, checkpoint, metadata, new_versions)[source]#

Store checkpoint with INLINE channel values

Stores all channel values directly in main checkpoint JSON

Parameters:
  • config (RunnableConfig) – The config to associate with the checkpoint

  • checkpoint (Checkpoint) – The checkpoint data to store

  • metadata (CheckpointMetadata) – Additional metadata to save with the checkpoint

  • new_versions (dict[str, str | int | float]) – New channel versions as of this write

Returns:

Updated configuration after storing the checkpoint

Raises:

asyncio.CancelledError – If the operation is cancelled/interrupted

Return type:

RunnableConfig

async aput_writes(config, writes, task_id, task_path='')[source]#

Store intermediate writes for the latest checkpoint and clean up old writes with transaction handling.

This method uses Redis pipeline with transaction=True to ensure atomicity of all write operations. In case of interruption, all operations will be aborted.

Parameters:
  • config (RunnableConfig) – Configuration of the related checkpoint.

  • writes (List[Tuple[str, Any]]) – List of writes to store.

  • task_id (str) – Identifier for the task creating the writes.

  • task_path (str) – Path of the task creating the writes.

Raises:

asyncio.CancelledError – If the operation is cancelled/interrupted

Return type:

None

async asetup()[source]#

Initialize Redis indexes asynchronously.

Return type:

None

configure_client(redis_url=None, redis_client=None, connection_args=None)[source]#

Configure the Redis client.

Supports standard Redis URLs (redis://), SSL (rediss://), and Sentinel URLs (redis+sentinel://host:26379/service_name/db).

Parameters:
  • redis_url (str | None)

  • redis_client (Redis | None)

  • connection_args (dict[str, Any] | None)

Return type:

None

create_indexes()[source]#

Create indexes without connecting to Redis.

Return type:

None

get_channel_values(thread_id, checkpoint_ns, checkpoint_id, channel_versions=None)[source]#

Retrieve channel_values dictionary with properly constructed message objects (sync wrapper).

Parameters:
  • thread_id (str)

  • checkpoint_ns (str)

  • checkpoint_id (str)

  • channel_versions (Dict[str, Any] | None)

Return type:

dict[str, Any]

get_tuple(config)[source]#

Retrieve a checkpoint tuple from Redis synchronously.

Parameters:

config (RunnableConfig)

Return type:

CheckpointTuple | None

put(config, checkpoint, metadata, new_versions)[source]#

Store only the latest checkpoint synchronously.

Parameters:
  • config (RunnableConfig)

  • checkpoint (Checkpoint)

  • metadata (CheckpointMetadata)

  • new_versions (dict[str, str | int | float])

Return type:

RunnableConfig

put_writes(config, writes, task_id, task_path='')[source]#

Store intermediate writes synchronously.

Parameters:
  • config (RunnableConfig)

  • writes (Sequence[Tuple[str, Any]])

  • task_id (str)

  • task_path (str)

Return type:

None

async setup()[source]#

Set up the checkpoint saver asynchronously.

This method creates the necessary indices in Redis. It MUST be called before using the checkpointer.

This async method follows the canonical pattern used by other async checkpointers in the LangGraph ecosystem. The type ignore is necessary because the base class defines a sync setup() method, but async checkpointers require an async setup() method to properly handle coroutines.

Return type:

None

checkpoint_writes_index: AsyncSearchIndex#
checkpoints_index: AsyncSearchIndex#