AsyncRedisSaver

Asynchronous checkpoint saver backed by Redis.

class AsyncRedisSaver(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')

Bases: BaseRedisSaver[Redis | RedisCluster, AsyncSearchIndex]

Async Redis implementation for checkpoint saver.

Supports standard Redis URLs (redis://), SSL (rediss://), and Sentinel URLs (redis+sentinel://host:26379/service_name/db).
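To illustrate the Sentinel URL shape above, the following sketch splits such a URL into host, port, service name, and database number using only the standard library. The helper name split_sentinel_url, the sample host, and the fallback to database 0 are assumptions made for this example; this is not the parser that AsyncRedisSaver itself uses.

```python
from typing import Any, Dict
from urllib.parse import urlsplit


def split_sentinel_url(url: str) -> Dict[str, Any]:
    """Break a redis+sentinel:// URL into the parts named in the docs (illustrative only)."""
    parts = urlsplit(url)
    if parts.scheme != "redis+sentinel":
        raise ValueError(f"not a Sentinel URL: {url!r}")
    # The path has the form "/service_name/db". Treating db as optional
    # (defaulting to 0) is an assumption of this sketch.
    segments = [segment for segment in parts.path.split("/") if segment]
    return {
        "host": parts.hostname,
        "port": parts.port,
        "service_name": segments[0] if segments else None,
        "db": int(segments[1]) if len(segments) > 1 else 0,
    }


info = split_sentinel_url("redis+sentinel://sentinel-1:26379/mymaster/0")
```

Here info holds the Sentinel host and port separately from the monitored service name, which is the distinction the URL format encodes.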

Initialize Redis-backed checkpoint saver.

Parameters:
  • redis_url (Optional[str]) – Redis connection URL

  • redis_client (Optional[Union[AsyncRedis, AsyncRedisCluster]]) – Redis client instance to use (alternative to redis_url)

  • connection_args (Optional[Dict[str, Any]]) – Additional arguments for Redis connection

  • ttl (Optional[Dict[str, Any]]) – Optional TTL configuration dict with optional keys:

      • default_ttl: TTL in minutes for all checkpoint keys

      • refresh_on_read: Whether to refresh TTL on reads

  • checkpoint_prefix (str) – Prefix for checkpoint keys (default: “checkpoint”)

  • checkpoint_write_prefix (str) – Prefix for checkpoint write keys (default: “checkpoint_write”)
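As a small sketch of the ttl parameter, the snippet below builds a configuration dict with the two documented keys and converts the minute value to seconds. The helper make_ttl_config and its validation are hypothetical conveniences for this example, not part of the library.

```python
from typing import Any, Dict


def make_ttl_config(default_ttl_minutes: float, refresh_on_read: bool = False) -> Dict[str, Any]:
    """Build a ttl dict using the two documented keys (hypothetical helper)."""
    if default_ttl_minutes <= 0:
        raise ValueError("default_ttl must be a positive number of minutes")
    return {"default_ttl": default_ttl_minutes, "refresh_on_read": refresh_on_read}


ttl = make_ttl_config(60, refresh_on_read=True)

# default_ttl is expressed in minutes, so one hour corresponds to 3600 seconds.
ttl_seconds = int(ttl["default_ttl"] * 60)

# With the library installed, the dict would be passed through the documented
# keyword argument, for example:
#   saver = AsyncRedisSaver(redis_url="redis://localhost:6379", ttl=ttl)
```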

classmethod from_conn_string(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')
Parameters:
  • redis_url (str | None)

  • redis_client (Redis | RedisCluster | None)

  • connection_args (Dict[str, Any] | None)

  • ttl (Dict[str, Any] | None)

  • checkpoint_prefix (str)

  • checkpoint_write_prefix (str)

Return type:

AsyncIterator[AsyncRedisSaver]
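The AsyncIterator return type suggests that from_conn_string is meant to be entered with async with. The sketch below reproduces that pattern with a stand-in class so it runs without Redis; StandInSaver, the events list, and the open/close bookkeeping are all hypothetical, and the use of contextlib.asynccontextmanager is an assumption inferred from the return type.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List

events: List[str] = []


class StandInSaver:
    """Hypothetical stand-in; it only mimics the shape of from_conn_string."""

    def __init__(self, redis_url: str) -> None:
        self.redis_url = redis_url

    @classmethod
    @asynccontextmanager
    async def from_conn_string(cls, redis_url: str) -> AsyncIterator["StandInSaver"]:
        saver = cls(redis_url)
        events.append("open")       # a real saver would connect here
        try:
            yield saver
        finally:
            events.append("close")  # a real saver would release the connection here


async def main() -> str:
    async with StandInSaver.from_conn_string("redis://localhost:6379") as saver:
        events.append("use")
        return saver.redis_url


url = asyncio.run(main())
```

With the real class, the equivalent call would be async with AsyncRedisSaver.from_conn_string(...) as checkpointer, followed by the setup step described further down.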

async adelete_thread(thread_id)

Delete all checkpoints and writes associated with a specific thread ID.

Parameters:

thread_id (str) – The thread ID whose checkpoints should be deleted.

Return type:

None

async aget_channel_values(thread_id, checkpoint_ns='', checkpoint_id='', channel_versions=None)

Retrieve channel_values using efficient FT.SEARCH with checkpoint_id.

Parameters:
  • thread_id (str)

  • checkpoint_ns (str)

  • checkpoint_id (str)

  • channel_versions (Dict[str, Any] | None)

Return type:

Dict[str, Any]

async aget_tuple(config)

Get a checkpoint tuple from Redis asynchronously.

Parameters:

config (RunnableConfig)

Return type:

CheckpointTuple | None

async alist(config, *, filter=None, before=None, limit=None)

List checkpoints from Redis asynchronously.

Parameters:
  • config (RunnableConfig | None)

  • filter (dict[str, Any] | None)

  • before (RunnableConfig | None)

  • limit (int | None)

Return type:

AsyncIterator[CheckpointTuple]
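Because alist returns an async iterator, its results are consumed with async for rather than a single await. The sketch below shows that calling pattern against a hypothetical in-memory stand-in; StandInSaver and collect are illustrative names, only the limit argument is modelled, and plain strings stand in for CheckpointTuple objects.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional


class StandInSaver:
    """Hypothetical in-memory stand-in exposing an alist()-shaped method."""

    def __init__(self, checkpoint_ids: List[str]) -> None:
        self._ids = checkpoint_ids

    async def alist(
        self,
        config: Optional[Dict[str, Any]],
        *,
        filter: Optional[Dict[str, Any]] = None,
        before: Optional[Dict[str, Any]] = None,
        limit: Optional[int] = None,
    ) -> AsyncIterator[str]:
        # Only `limit` is modelled; `filter` and `before` are accepted but ignored.
        for index, checkpoint_id in enumerate(self._ids):
            if limit is not None and index >= limit:
                return
            await asyncio.sleep(0)  # stands in for a Redis round trip
            yield checkpoint_id


async def collect(saver: Any, config: Dict[str, Any], limit: Optional[int] = None) -> List[Any]:
    """Drain an alist()-style async iterator into a list."""
    return [item async for item in saver.alist(config, limit=limit)]


config = {"configurable": {"thread_id": "thread-1"}}
first_two = asyncio.run(collect(StandInSaver(["c3", "c2", "c1"]), config, limit=2))
```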

async aprune(thread_ids, *, strategy='keep_latest', keep_last=None, max_results=10000)

Prune old checkpoints for the given threads per namespace.

Retains the most recent checkpoints in each checkpoint namespace and removes the rest, along with their associated write keys and key-registry sorted sets.

Each namespace (root "" and any subgraph namespaces) is treated as an independent checkpoint chain. Channel values are stored inline within each checkpoint document, so they are automatically removed when the checkpoint document is deleted.

Parameters:
  • thread_ids (Sequence[str]) – Thread IDs whose old checkpoints should be pruned.

  • strategy (str) – Pruning strategy. "keep_latest" retains only the most recent checkpoint per namespace (default). "delete" removes all checkpoints for the thread.

  • keep_last (int | None) – Optional override: the number of recent checkpoints to retain per namespace. When provided, it takes precedence over strategy. Use keep_last=0 to remove all checkpoints.

  • max_results (int) – Maximum number of checkpoints fetched from the index per thread in a single query. Defaults to 10000.

Return type:

None
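The interaction between strategy, keep_last, and per-namespace chains can be modelled in pure Python. The sketch below is not the library's implementation: select_prunable is a hypothetical helper, checkpoints are reduced to (checkpoint_ns, checkpoint_id) pairs, and it assumes that checkpoint IDs sort chronologically.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

Key = Tuple[str, str]  # (checkpoint_ns, checkpoint_id)


def select_prunable(
    checkpoints: List[Key],
    strategy: str = "keep_latest",
    keep_last: Optional[int] = None,
) -> List[Key]:
    """Return the checkpoints of one thread that pruning would remove (model only)."""
    if keep_last is not None:
        keep = keep_last          # keep_last takes precedence over strategy
    elif strategy == "keep_latest":
        keep = 1
    elif strategy == "delete":
        keep = 0
    else:
        raise ValueError(f"unknown strategy: {strategy!r}")
    if keep < 0:
        raise ValueError("keep_last must be >= 0")

    # Each namespace is an independent chain, so group before trimming.
    by_ns: Dict[str, List[str]] = defaultdict(list)
    for ns, checkpoint_id in checkpoints:
        by_ns[ns].append(checkpoint_id)

    doomed: List[Key] = []
    for ns, ids in by_ns.items():
        newest_first = sorted(ids, reverse=True)  # assumption: IDs sort by age
        doomed.extend((ns, cid) for cid in newest_first[keep:])
    return doomed


thread_checkpoints = [("", "001"), ("", "002"), ("", "003"), ("sub", "001"), ("sub", "002")]
default_prune = select_prunable(thread_checkpoints)
keep_two = select_prunable(thread_checkpoints, keep_last=2)
delete_all = select_prunable(thread_checkpoints, strategy="delete")
```

In this model the root namespace and the "sub" namespace each keep their own newest checkpoint under the default strategy, which mirrors the per-namespace behaviour described above.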

async aput(config, checkpoint, metadata, new_versions, stream_mode='values')

Store a checkpoint to Redis with proper transaction handling.

This method ensures that all Redis operations are performed atomically using Redis transactions. In case of interruption (asyncio.CancelledError), the transaction will be aborted, ensuring consistency.

Parameters:
  • config (RunnableConfig) – The config to associate with the checkpoint

  • checkpoint (Checkpoint) – The checkpoint data to store

  • metadata (CheckpointMetadata) – Additional metadata to save with the checkpoint

  • new_versions (dict[str, str | int | float]) – New channel versions as of this write

  • stream_mode (str) – The streaming mode being used (values, updates, etc.)

Returns:

Updated configuration after storing the checkpoint

Raises:

asyncio.CancelledError – If the operation is cancelled/interrupted

Return type:

RunnableConfig
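The cancellation behaviour can be sketched from the caller's side: if the task awaiting aput is cancelled, asyncio.CancelledError surfaces at the await and nothing should be left half-written. The stand-in below only mimics that contract in memory; StandInSaver, its committed list, the simplified checkpoint dicts, and the shape of the returned config are assumptions for this example.

```python
import asyncio
from typing import Any, Dict, List, Tuple


class StandInSaver:
    """Hypothetical stand-in whose aput() commits only if it is not cancelled."""

    def __init__(self) -> None:
        self.committed: List[Dict[str, Any]] = []

    async def aput(self, config, checkpoint, metadata, new_versions) -> Dict[str, Any]:
        staged = dict(checkpoint)
        await asyncio.sleep(0.05)       # stands in for the Redis round trip
        self.committed.append(staged)   # reached only when not cancelled
        return {"configurable": {**config["configurable"], "checkpoint_id": checkpoint["id"]}}


async def main() -> Tuple[bool, List[Dict[str, Any]]]:
    saver = StandInSaver()
    config = {"configurable": {"thread_id": "thread-1", "checkpoint_ns": ""}}

    # A normal write completes and is committed.
    await saver.aput(config, {"id": "c0"}, {}, {})

    # A write that is cancelled mid-flight raises CancelledError and commits nothing.
    task = asyncio.create_task(saver.aput(config, {"id": "c1"}, {}, {}))
    await asyncio.sleep(0)  # let the task start and reach its first await
    task.cancel()
    try:
        await task
        cancelled = False
    except asyncio.CancelledError:
        cancelled = True
    return cancelled, saver.committed


cancelled, committed = asyncio.run(main())
```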

async aput_writes(config, writes, task_id, task_path='')

Store intermediate writes linked to a checkpoint using Redis JSON.

This method uses a Redis pipeline without a transaction to avoid lock contention during parallel test execution.

Parameters:
  • config (RunnableConfig) – Configuration of the related checkpoint.

  • writes (List[Tuple[str, Any]]) – List of writes to store.

  • task_id (str) – Identifier for the task creating the writes.

  • task_path (str) – Path of the task creating the writes.

Raises:

asyncio.CancelledError – If the operation is cancelled/interrupted

Return type:

None

async asetup()

Set up the checkpoint saver.

Return type:

None

configure_client(redis_url=None, redis_client=None, connection_args=None)

Configure the Redis client.

Supports standard Redis URLs (redis://), SSL (rediss://), and Sentinel URLs (redis+sentinel://host:26379/service_name/db).

Parameters:
  • redis_url (str | None)

  • redis_client (Redis | RedisCluster | None)

  • connection_args (Dict[str, Any] | None)

Return type:

None

create_indexes()

Create indexes without connecting to Redis.

Return type:

None

get_channel_values(thread_id, checkpoint_ns='', checkpoint_id='')

Retrieve channel_values using efficient FT.SEARCH with checkpoint_id (sync wrapper).

Parameters:
  • thread_id (str)

  • checkpoint_ns (str)

  • checkpoint_id (str)

Return type:

Dict[str, Any]

get_tuple(config)

Get a checkpoint tuple from Redis.

Parameters:

config (RunnableConfig) – The config to use for retrieving the checkpoint.

Returns:

The retrieved checkpoint tuple, or None if no matching checkpoint was found.

Return type:

Optional[CheckpointTuple]

Raises:

asyncio.InvalidStateError – If called from the wrong thread/event loop
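One common way for a synchronous wrapper on an async saver to produce this error is to bridge to the saver's event loop with asyncio.run_coroutine_threadsafe and to refuse calls made from that same loop, where blocking would deadlock. The sketch below shows that general pattern under those assumptions; StandInSaver, its loop attribute, and the string payloads are hypothetical, and the real implementation may differ.

```python
import asyncio
from typing import Optional, Tuple


class StandInSaver:
    """Hypothetical stand-in pairing an async method with a sync wrapper."""

    def __init__(self) -> None:
        self.loop = asyncio.get_running_loop()  # the loop this saver is bound to
        self._data = {"thread-1": "checkpoint-1"}

    async def aget_tuple(self, thread_id: str) -> Optional[str]:
        await asyncio.sleep(0)  # stands in for a Redis round trip
        return self._data.get(thread_id)

    def get_tuple(self, thread_id: str) -> Optional[str]:
        try:
            running = asyncio.get_running_loop()
        except RuntimeError:
            running = None  # called from a plain thread with no running loop
        if running is self.loop:
            # Blocking here would stall the very loop that must run the coroutine.
            raise asyncio.InvalidStateError(
                "sync call made from the saver's event loop; use aget_tuple instead"
            )
        future = asyncio.run_coroutine_threadsafe(self.aget_tuple(thread_id), self.loop)
        return future.result()


async def main() -> Tuple[bool, Optional[str]]:
    saver = StandInSaver()
    try:
        saver.get_tuple("thread-1")  # wrong place: inside the saver's own loop
        raised = False
    except asyncio.InvalidStateError:
        raised = True
    # Right place: a worker thread, while the loop stays free to run the coroutine.
    from_thread = await asyncio.to_thread(saver.get_tuple, "thread-1")
    return raised, from_thread


raised, from_thread = asyncio.run(main())
```

In async code the simpler choice is to call the a-prefixed method directly; the synchronous wrappers are mainly useful from worker threads.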

put(config, checkpoint, metadata, new_versions)

Store a checkpoint to Redis.

Parameters:
  • config (RunnableConfig) – The config to associate with the checkpoint.

  • checkpoint (Checkpoint) – The checkpoint to save.

  • metadata (CheckpointMetadata) – Additional metadata to save with the checkpoint.

  • new_versions (ChannelVersions) – New channel versions as of this write.

Returns:

Updated configuration after storing the checkpoint.

Return type:

RunnableConfig

Raises:

asyncio.InvalidStateError – If called from the wrong thread/event loop

put_writes(config, writes, task_id, task_path='')

Synchronous wrapper for aput_writes.

Parameters:
  • config (RunnableConfig) – Configuration of the related checkpoint.

  • writes (List[Tuple[str, Any]]) – List of writes to store.

  • task_id (str) – Identifier for the task creating the writes.

  • task_path (str) – Path of the task creating the writes.

Return type:

None

async setup()

Set up the checkpoint saver asynchronously.

This method creates the necessary indexes in Redis and detects cluster mode. It MUST be called before using the checkpointer.

This async method follows the canonical pattern used by the PostgreSQL and SQLite checkpointers in the LangGraph ecosystem. The type-ignore comment in the source is necessary because the base class defines a synchronous setup() method, whereas async checkpointers need an async setup() method to handle coroutines properly.

Usage: await checkpointer.setup()

Return type:

None

checkpoint_writes_index: AsyncSearchIndex
checkpoints_index: AsyncSearchIndex
cluster_mode: bool | None = None