ShallowRedisSaver#

Synchronous shallow checkpoint saver that stores only the latest checkpoint.

class ShallowRedisSaver(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, key_cache_max_size=None, channel_cache_max_size=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')[source]#

Bases: BaseRedisSaver[Redis, SearchIndex]

Redis implementation that only stores the most recent checkpoint.

Supports standard Redis URLs (redis://), SSL (rediss://), and Sentinel URLs (redis+sentinel://host:26379/service_name/db).

Initialize Redis-backed checkpoint saver.

Parameters:
  • redis_url (Optional[str]) – Redis connection URL

  • redis_client (Optional[Redis]) – Redis client instance to use (alternative to redis_url)

  • connection_args (Optional[dict[str, Any]]) – Additional arguments for Redis connection

  • ttl (Optional[dict[str, Any]]) – Optional TTL configuration dict with optional keys: - default_ttl: TTL in minutes for all checkpoint keys - refresh_on_read: Whether to refresh TTL on reads

  • checkpoint_prefix (str) – Prefix for checkpoint keys (default: “checkpoint”)

  • checkpoint_write_prefix (str) – Prefix for checkpoint write keys (default: “checkpoint_write”)

  • key_cache_max_size (Optional[int])

  • channel_cache_max_size (Optional[int])

classmethod from_conn_string(redis_url=None, *, redis_client=None, connection_args=None, ttl=None, key_cache_max_size=None, channel_cache_max_size=None, checkpoint_prefix='checkpoint', checkpoint_write_prefix='checkpoint_write')[source]#

Create a new ShallowRedisSaver instance.

Parameters:
  • redis_url (str | None)

  • redis_client (Redis | None)

  • connection_args (dict[str, Any] | None)

  • ttl (dict[str, Any] | None)

  • key_cache_max_size (int | None)

  • channel_cache_max_size (int | None)

  • checkpoint_prefix (str)

  • checkpoint_write_prefix (str)

Return type:

Iterator[ShallowRedisSaver]

configure_client(redis_url=None, redis_client=None, connection_args=None)[source]#

Configure the Redis client.

Parameters:
  • redis_url (str | None)

  • redis_client (Redis | None)

  • connection_args (dict[str, Any] | None)

Return type:

None

create_indexes()[source]#

Create appropriate SearchIndex instances.

Return type:

None

delete_thread(thread_id)[source]#

Delete all checkpoints and writes associated with a specific thread ID.

Parameters:

thread_id (str) – The thread ID whose checkpoints should be deleted.

Return type:

None

get_channel_values(thread_id, checkpoint_ns, checkpoint_id, channel_versions=None)[source]#

Retrieve channel_values dictionary from inline checkpoint data.

Parameters:
  • thread_id (str)

  • checkpoint_ns (str)

  • checkpoint_id (str)

  • channel_versions (Dict[str, Any] | None)

Return type:

dict[str, Any]

get_tuple(config)[source]#

Get checkpoint with inline channel values.

Parameters:

config (RunnableConfig)

Return type:

CheckpointTuple | None

list(config, *, filter=None, before=None, limit=None)[source]#

List checkpoints from Redis.

Parameters:
  • config (RunnableConfig | None)

  • filter (Dict[str, Any] | None)

  • before (RunnableConfig | None)

  • limit (int | None)

Return type:

Iterator[CheckpointTuple]

prune(thread_ids, *, strategy='keep_latest', keep_last=None)[source]#

Prune checkpoints for the given threads.

ShallowRedisSaver stores at most one checkpoint per namespace by design, so strategy="keep_latest" (or keep_last >= 1) is always a no-op. strategy="delete" (or keep_last=0) removes all checkpoints for each thread (equivalent to delete_thread).

Parameters:
  • thread_ids (Sequence[str]) – Thread IDs to prune.

  • strategy (str) – Pruning strategy. "keep_latest" is a no-op for shallow savers (default). "delete" removes all.

  • keep_last (int | None) – Optional override. Any value >= 1 is a no-op. Pass 0 to delete all.

Return type:

None

put(config, checkpoint, metadata, new_versions)[source]#

Store checkpoint with inline channel values.

Parameters:
  • config (RunnableConfig)

  • checkpoint (Checkpoint)

  • metadata (CheckpointMetadata)

  • new_versions (dict[str, str | int | float])

Return type:

RunnableConfig

put_writes(config, writes, task_id, task_path='')[source]#

Store intermediate writes linked to a checkpoint with checkpoint-level registry.

Parameters:
  • config (RunnableConfig) – Configuration of the related checkpoint.

  • writes (Sequence[tuple[str, Any]]) – List of writes to store, each as (channel, value) pair.

  • task_id (str) – Identifier for the task creating the writes.

  • task_path (str) – Optional path info for the task.

Return type:

None

setup()[source]#

Initialize the indices in Redis.

Return type:

None

DEFAULT_CHANNEL_CACHE_MAX_SIZE = 100#
DEFAULT_KEY_CACHE_MAX_SIZE = 1000#