AsyncShallowRedisSaver#
Asynchronous shallow checkpoint saver that stores only the latest checkpoint.
- class AsyncShallowRedisSaver(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')[source]#
Bases: BaseRedisSaver[Redis, AsyncSearchIndex]
Async Redis implementation that only stores the most recent checkpoint.
Supports standard Redis URLs (redis://), SSL (rediss://), and Sentinel URLs (redis+sentinel://host:26379/service_name/db).
Initialize Redis-backed checkpoint saver.
- Parameters:
redis_url (Optional[str]) – Redis connection URL
redis_client (Optional[AsyncRedis]) – Redis client instance to use (alternative to redis_url)
connection_args (Optional[dict[str, Any]]) – Additional arguments for Redis connection
ttl (Optional[dict[str, Any]]) – Optional TTL configuration dict. Both keys are optional: default_ttl (TTL in minutes for all checkpoint keys) and refresh_on_read (whether to refresh the TTL on reads)
checkpoint_prefix (str) – Prefix for checkpoint keys (default: “checkpoint”)
checkpoint_write_prefix (str) – Prefix for checkpoint write keys (default: “checkpoint_write”)
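A minimal sketch of how the constructor parameters above might be combined. The helper names make_ttl_config and build_saver are hypothetical, the positive-value check is an assumption of this sketch, and the import path langgraph.checkpoint.redis.ashallow is assumed from the package layout rather than stated in this reference.

```python
from typing import Any


def make_ttl_config(default_ttl_minutes: float, refresh_on_read: bool = False) -> dict[str, Any]:
    """Build a ttl dict with the two documented optional keys (hypothetical helper)."""
    # Rejecting non-positive values is this sketch's own choice, not documented behavior.
    if default_ttl_minutes <= 0:
        raise ValueError("default_ttl_minutes must be positive")
    return {"default_ttl": default_ttl_minutes, "refresh_on_read": refresh_on_read}


async def build_saver(redis_url: str = "redis://localhost:6379"):
    """Construct the saver and create its indices (hypothetical helper)."""
    # Assumed import path; imported lazily so make_ttl_config has no dependencies.
    from langgraph.checkpoint.redis.ashallow import AsyncShallowRedisSaver

    saver = AsyncShallowRedisSaver(
        redis_url=redis_url,
        ttl=make_ttl_config(60, refresh_on_read=True),  # default_ttl is in minutes
    )
    await saver.setup()  # documented as required before the saver is used
    return saver
```

Passing redis_client instead of redis_url should work the same way, since the two are documented as alternatives.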
- classmethod from_conn_string(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')[source]#
Create a new AsyncShallowRedisSaver instance.
- Parameters:
redis_url (str | None)
redis_client (Redis | None)
connection_args (dict[str, Any] | None)
ttl (dict[str, Any] | None)
checkpoint_prefix (str)
checkpoint_write_prefix (str)
- Return type:
AsyncIterator[AsyncShallowRedisSaver]
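The AsyncIterator return type suggests that from_conn_string is meant to be used as an async context manager; the sketch below assumes that. The helper run_with_saver is hypothetical and takes the saver class as an argument so that it does not depend on a particular import path. Whether the context manager already performs setup is not stated here, so the sketch calls setup() explicitly.

```python
from typing import Any, Awaitable, Callable


async def run_with_saver(
    saver_cls: Any,
    redis_url: str,
    work: Callable[[Any], Awaitable[Any]],
) -> Any:
    """Open a saver from a URL, run `work` with it, and return the result."""
    # Assumption: from_conn_string yields the saver inside `async with`.
    async with saver_cls.from_conn_string(redis_url) as saver:
        await saver.setup()  # create the indices before any reads or writes
        return await work(saver)
```

A call might look like `await run_with_saver(AsyncShallowRedisSaver, "redis://localhost:6379", my_coroutine)`, where my_coroutine is any coroutine function that accepts the saver.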
- async adelete_thread(thread_id)[source]#
Delete checkpoint and writes associated with a specific thread ID.
- Parameters:
thread_id (str) – The ID of the thread whose checkpoint should be deleted.
- Return type:
None
- async aget_channel_values(thread_id, checkpoint_ns, checkpoint_id, channel_versions=None)[source]#
Retrieve channel_values dictionary from inline checkpoint data.
- Parameters:
thread_id (str)
checkpoint_ns (str)
checkpoint_id (str)
channel_versions (Dict[str, Any] | None)
- Return type:
dict[str, Any]
- async aget_tuple(config)[source]#
Retrieve a checkpoint tuple from Redis asynchronously.
- Parameters:
config (RunnableConfig)
- Return type:
CheckpointTuple | None
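A short sketch of reading the stored checkpoint for a thread. It assumes the usual LangGraph convention that a RunnableConfig carries thread_id and checkpoint_ns under the "configurable" key; thread_config and load_latest are hypothetical helper names.

```python
from typing import Any, Optional


def thread_config(thread_id: str, checkpoint_ns: str = "") -> dict[str, Any]:
    """Build a config dict that identifies a thread and namespace."""
    return {"configurable": {"thread_id": thread_id, "checkpoint_ns": checkpoint_ns}}


async def load_latest(saver: Any, thread_id: str) -> Optional[Any]:
    """Return the checkpoint tuple for the thread, or None if nothing is stored."""
    return await saver.aget_tuple(thread_config(thread_id))
```

Because the shallow saver keeps only the most recent checkpoint, no checkpoint_id is supplied in this sketch.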
- async alist(config, *, filter=None, before=None, limit=None)[source]#
List checkpoints from Redis asynchronously.
- Parameters:
config (RunnableConfig | None)
filter (Dict[str, Any] | None)
before (RunnableConfig | None)
limit (int | None)
- Return type:
AsyncIterator[CheckpointTuple]
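Since alist returns an AsyncIterator, results are consumed with async for rather than awaited. The sketch below collects them into a list; collect_checkpoints is a hypothetical helper, and the config shape follows the usual "configurable" convention.

```python
from typing import Any, Optional


async def collect_checkpoints(saver: Any, thread_id: str, limit: Optional[int] = None) -> list:
    """Gather every checkpoint tuple that alist yields for one thread."""
    config = {"configurable": {"thread_id": thread_id}}
    # alist is iterated directly; it is not awaited first.
    return [item async for item in saver.alist(config, limit=limit)]
```

With a shallow saver the resulting list is expected to stay short, because at most one checkpoint is kept per namespace.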
- async aprune(thread_ids, *, strategy='keep_latest', keep_last=None)[source]#
Prune checkpoints for the given threads.
AsyncShallowRedisSaver stores at most one checkpoint per namespace by design, so strategy="keep_latest" (or keep_last >= 1) is always a no-op. strategy="delete" (or keep_last=0) removes all checkpoints for each thread (equivalent to adelete_thread).
- Parameters:
thread_ids (Sequence[str]) – Thread IDs to prune.
strategy (str) – Pruning strategy. "keep_latest" is a no-op for shallow savers (default). "delete" removes all.
keep_last (int | None) – Optional override. Any value >= 1 is a no-op. Pass 0 to delete all.
- Return type:
None
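The decision rule described for aprune can be sketched as a small pure function. This restates the documented behavior and is not the library's implementation; the function name is hypothetical, and rejecting negative keep_last values or unknown strategies is an assumption of this sketch.

```python
from typing import Optional


def prune_removes_checkpoints(strategy: str = "keep_latest", keep_last: Optional[int] = None) -> bool:
    """Return True if the given arguments would delete a thread's checkpoints."""
    if keep_last is not None:
        # keep_last is documented as an override of strategy.
        if keep_last < 0:
            raise ValueError("keep_last must be >= 0")  # assumption of this sketch
        return keep_last == 0
    if strategy == "delete":
        return True
    if strategy == "keep_latest":
        return False
    raise ValueError(f"unknown strategy: {strategy!r}")  # assumption of this sketch
```

Under this rule, only strategy="delete" or keep_last=0 has any effect on a shallow saver.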
- async aput(config, checkpoint, metadata, new_versions)[source]#
Store a checkpoint with inline channel values.
All channel values are stored directly in the main checkpoint JSON document.
- Parameters:
config (RunnableConfig) – The config to associate with the checkpoint
checkpoint (Checkpoint) – The checkpoint data to store
metadata (CheckpointMetadata) – Additional metadata to save with the checkpoint
new_versions (dict[str, str | int | float]) – New channel versions as of this write
- Returns:
Updated configuration after storing the checkpoint
- Raises:
asyncio.CancelledError – If the operation is cancelled/interrupted
- Return type:
RunnableConfig
- async aput_writes(config, writes, task_id, task_path='')[source]#
Store intermediate writes for the latest checkpoint and clean up old writes with transaction handling.
This method uses a Redis pipeline with transaction=True so that all write operations are applied atomically. If the call is interrupted, all operations are aborted.
- Parameters:
config (RunnableConfig) – Configuration of the related checkpoint.
writes (List[Tuple[str, Any]]) – List of writes to store.
task_id (str) – Identifier for the task creating the writes.
task_path (str) – Path of the task creating the writes.
- Raises:
asyncio.CancelledError – If the operation is cancelled/interrupted
- Return type:
None
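A minimal sketch of the argument shapes for aput_writes: writes is a list of (channel, value) tuples. The helper record_writes is hypothetical. It assumes that config is the configuration returned by a prior aput call, which this reference implies but does not state outright.

```python
from typing import Any, Mapping


async def record_writes(saver: Any, config: Any, task_id: str, values: Mapping[str, Any]) -> int:
    """Store one write per channel and return the number of writes sent."""
    writes = list(values.items())  # [(channel, value), ...]
    await saver.aput_writes(config, writes, task_id)
    return len(writes)
```

The optional task_path argument is left at its default in this sketch.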
- configure_client(redis_url=None, redis_client=None, connection_args=None)[source]#
Configure the Redis client.
Supports standard Redis URLs (redis://), SSL (rediss://), and Sentinel URLs (redis+sentinel://host:26379/service_name/db).
- Parameters:
redis_url (str | None)
redis_client (Redis | None)
connection_args (dict[str, Any] | None)
- Return type:
None
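The three URL forms listed above can be told apart by scheme. The sketch below builds a Sentinel URL in the documented shape and classifies a URL by its scheme; both function names are hypothetical, and the classification is illustrative only, not how configure_client is implemented.

```python
from urllib.parse import urlparse


def sentinel_url(host: str, service_name: str, db: int = 0, port: int = 26379) -> str:
    """Format a URL as redis+sentinel://host:port/service_name/db."""
    return f"redis+sentinel://{host}:{port}/{service_name}/{db}"


def connection_kind(redis_url: str) -> str:
    """Classify a connection URL by its scheme."""
    kinds = {"redis": "standard", "rediss": "ssl", "redis+sentinel": "sentinel"}
    scheme = urlparse(redis_url).scheme
    if scheme not in kinds:
        raise ValueError(f"unsupported scheme: {scheme!r}")
    return kinds[scheme]
```

The resulting string would be passed as redis_url, with any extra connection options supplied through connection_args.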
- get_channel_values(thread_id, checkpoint_ns, checkpoint_id, channel_versions=None)[source]#
Retrieve channel_values dictionary with properly constructed message objects (sync wrapper).
- Parameters:
thread_id (str)
checkpoint_ns (str)
checkpoint_id (str)
channel_versions (Dict[str, Any] | None)
- Return type:
dict[str, Any]
- get_tuple(config)[source]#
Retrieve a checkpoint tuple from Redis synchronously.
- Parameters:
config (RunnableConfig)
- Return type:
CheckpointTuple | None
- put(config, checkpoint, metadata, new_versions)[source]#
Store only the latest checkpoint synchronously.
- Parameters:
config (RunnableConfig)
checkpoint (Checkpoint)
metadata (CheckpointMetadata)
new_versions (dict[str, str | int | float])
- Return type:
RunnableConfig
- put_writes(config, writes, task_id, task_path='')[source]#
Store intermediate writes synchronously.
- Parameters:
config (RunnableConfig)
writes (Sequence[Tuple[str, Any]])
task_id (str)
task_path (str)
- Return type:
None
- async setup()[source]#
Set up the checkpoint saver asynchronously.
This method creates the necessary indices in Redis. It MUST be called before using the checkpointer.
This async method follows the canonical pattern used by other async checkpointers in the LangGraph ecosystem. The implementation carries a type-ignore comment because the base class defines a synchronous setup() method, whereas async checkpointers need an async setup() that callers can await.
- Return type:
None
- checkpoint_writes_index: AsyncSearchIndex#
- checkpoints_index: AsyncSearchIndex#