Skip to content

Schema Registries

The schema registry caches index field types loaded from Redis via FT.INFO. There are sync and async variants. Conceptual background is in Schema-aware translation.

Class Description
SchemaRegistry Sync registry. Lazy and eager loading, polling for index changes.
AsyncSchemaRegistry Async registry. Coalesced concurrent loads, cancellation-safe.

SchemaRegistry

SchemaRegistry

SchemaRegistry(redis_client: Redis)

Loads and caches index schemas from Redis.

Supports schema refresh by polling FT._LIST for index changes, since RediSearch does not emit keyspace notifications for FT commands.

Source code in sql_redis/schema.py
def __init__(self, redis_client: redis.Redis):
    """Create an empty registry bound to ``redis_client``.

    No Redis command is issued here: the schema cache starts empty and
    watching is switched off with no callback registered.
    """
    # Watch state: disabled until start_watching() is called.
    self._watching = False
    self._on_change: Callable[[str, str], None] | None = None
    # Cache layout: index name -> {field name -> field type}.
    self._schemas: dict[str, dict[str, str]] = {}
    self._client = redis_client

load_all

load_all() -> None

Load schemas for all indexes on the server.

Source code in sql_redis/schema.py
def load_all(self) -> None:
    """Drop the whole cache, then load every index reported by FT._LIST."""
    self._schemas.clear()
    for raw_name in self._client.execute_command("FT._LIST"):
        # Names arrive as bytes unless the client decodes responses.
        name = raw_name.decode("utf-8") if isinstance(raw_name, bytes) else raw_name
        self._load_index_schema(name)

get_field_type

get_field_type(index: str, field: str) -> str | None

Get field type for a given index and field.

Lazily loads the index schema if not already cached. Returns None if index doesn't exist or field is unknown.

Source code in sql_redis/schema.py
def get_field_type(self, index: str, field: str) -> str | None:
    """Get field type for a given index and field.

    Lazily loads the index schema if not already cached.
    Returns None if index doesn't exist or field is unknown.
    """
    schema = self.get_schema(index)
    return schema.get(field)

get_schema

get_schema(index: str) -> dict[str, str]

Get full schema for an index, loading lazily if not cached.

On first access for a given index, issues a single FT.INFO call to Redis. Subsequent calls return the cached schema with no I/O.

Returns empty dict if index does not exist in Redis.

Source code in sql_redis/schema.py
def get_schema(self, index: str) -> dict[str, str]:
    """Return the cached schema for ``index``, loading it on a cache miss.

    A cached entry is returned directly. Otherwise _load_index_schema()
    is called once and the cache is consulted again; if there is still
    no entry, an empty dict is returned.
    """
    if index in self._schemas:
        return self._schemas[index]

    # Cache miss: ask the loader, then re-read whatever it stored.
    self._load_index_schema(index)
    return self._schemas.get(index, {})

invalidate

invalidate(index: str | None = None) -> None

Invalidate cached schema(s), forcing reload on next access.

Parameters:

Name Type Description Default
index str | None

Specific index to invalidate. If None, invalidates all.

None
Source code in sql_redis/schema.py
def invalidate(self, index: str | None = None) -> None:
    """Invalidate cached schema(s), forcing reload on next access.

    Args:
        index: Specific index to invalidate. If None, invalidates all.
    """
    if index is not None:
        self._schemas.pop(index, None)
    else:
        self._schemas.clear()

refresh

refresh(index_name: str) -> None

Refresh schema for a single index.

If the index no longer exists, removes it from the registry. If the index is new or changed, updates its cached schema.

Source code in sql_redis/schema.py
def refresh(self, index_name: str) -> None:
    """Reload the schema of one index from Redis.

    This delegates entirely to _load_index_schema(), which is expected
    to update the cached entry or remove it when the index is gone.
    """
    self._load_index_schema(index_name)

start_watching

start_watching(
    on_change: Callable[[str, str], None] | None = None,
) -> None

Start watching for index changes.

Since RediSearch doesn't emit keyspace notifications for FT commands, this uses polling via FT._LIST to detect changes.

Parameters:

Name Type Description Default
on_change Callable[[str, str], None] | None

Optional callback invoked with (event_type, index_name) when an index is created, dropped, or altered.

None
Source code in sql_redis/schema.py
def start_watching(
    self, on_change: Callable[[str, str], None] | None = None
) -> None:
    """Start watching for index changes.

    Since RediSearch doesn't emit keyspace notifications for FT commands,
    this uses polling via FT._LIST to detect changes.

    Args:
        on_change: Optional callback invoked with (event_type, index_name)
                   when an index is created, dropped, or altered.
    """
    self._on_change = on_change
    self._watching = True

stop_watching

stop_watching() -> None

Stop watching for index changes.

Source code in sql_redis/schema.py
def stop_watching(self) -> None:
    """Disable change detection and drop the registered callback."""
    self._watching, self._on_change = False, None

process_pending_events

process_pending_events() -> None

Process any pending index change events.

Since RediSearch doesn't emit keyspace notifications, this polls FT._LIST to detect new and deleted indexes. Call this periodically.

Source code in sql_redis/schema.py
def process_pending_events(self) -> None:
    """Poll Redis once and reconcile the cache with the live index list.

    Since RediSearch doesn't emit keyspace notifications, this compares
    the result of FT._LIST with the cached index names. Call this
    periodically. It does nothing unless start_watching() was called.

    Indexes listed by Redis but not cached are loaded and reported as
    "created"; cached indexes missing from the list are evicted and
    reported as "dropped". An index that still exists under the same
    name is not re-inspected here, so alterations are not detected.

    Events are delivered in sorted index-name order, all "created"
    events before all "dropped" events.
    """
    if not self._watching:
        return

    # Names arrive as bytes unless the client decodes responses;
    # normalize to str so they compare equal to the cache keys.
    raw_indexes = self._client.execute_command("FT._LIST")
    current_indexes = {
        idx.decode("utf-8") if isinstance(idx, bytes) else idx
        for idx in raw_indexes
    }

    # Snapshot the cached names before any loading mutates the cache.
    cached_indexes = set(self._schemas)

    # Sorted iteration gives the callback a reproducible event order;
    # raw set order varies between runs due to string hash randomization.
    for idx in sorted(current_indexes - cached_indexes):
        self._load_index_schema(idx)
        # Re-checked per event: the callback itself may call stop_watching().
        if self._on_change:
            self._on_change("created", idx)

    for idx in sorted(cached_indexes - current_indexes):
        self._schemas.pop(idx, None)
        if self._on_change:
            self._on_change("dropped", idx)

AsyncSchemaRegistry

AsyncSchemaRegistry

AsyncSchemaRegistry(redis_client: 'async_redis.Redis')

Async version of SchemaRegistry for use with redis.asyncio clients.

Loads and caches index schemas from Redis asynchronously.

Initialize with an async Redis client.

Parameters:

Name Type Description Default
redis_client 'async_redis.Redis'

An async Redis client (redis.asyncio.Redis).

required
Source code in sql_redis/schema.py
def __init__(self, redis_client: "async_redis.Redis") -> None:
    """Create an empty async registry.

    Args:
        redis_client: An async Redis client (redis.asyncio.Redis).
    """
    # In-flight load tasks keyed by index name, shared by concurrent
    # ensure_schema() callers.
    self._loading: dict[str, asyncio.Task[None]] = {}
    # Cache layout: index name -> {field name -> field type}.
    self._schemas: dict[str, dict[str, str]] = {}
    self._client = redis_client

load_all async

load_all() -> None

Load schemas for all indexes on the server.

Uses asyncio.gather() to load all index schemas concurrently. Cancels any in-flight ensure_schema() tasks first.

Source code in sql_redis/schema.py
async def load_all(self) -> None:
    """Reload the schemas of every index on the server.

    In-flight ensure_schema() loads are cancelled and the cache is
    emptied before FT._LIST is queried. The per-index loads are then
    run concurrently through asyncio.gather().
    """
    self._cancel_all_inflight()
    self._schemas.clear()

    raw_names = await self._client.execute_command("FT._LIST")
    # Names arrive as bytes unless the client decodes responses.
    names = [
        raw.decode("utf-8") if isinstance(raw, bytes) else raw
        for raw in raw_names
    ]

    # One load coroutine per index, awaited together.
    await asyncio.gather(*map(self._load_index_schema, names))

get_field_type

get_field_type(index: str, field: str) -> str | None

Get field type for a given index and field.

Note: This reads only the cache; for async lazy loading, call ensure_schema() first. Returns None if the index is not cached or the field is unknown.

Source code in sql_redis/schema.py
def get_field_type(self, index: str, field: str) -> str | None:
    """Get field type for a given index and field.

    Note: For async lazy loading, call ensure_schema() first.
    Returns None if index or field is unknown.
    """
    schema = self._schemas.get(index, {})
    return schema.get(field)

get_schema

get_schema(index: str) -> dict[str, str]

Get full schema for an index (sync access to cache).

Returns empty dict if index is not cached. Use ensure_schema() to load lazily in async contexts.

Source code in sql_redis/schema.py
def get_schema(self, index: str) -> dict[str, str]:
    """Return the cached schema for ``index`` without touching Redis.

    An index that is not in the cache yields an empty dict. Await
    ensure_schema() instead when the schema should be loaded on demand.
    """
    if index in self._schemas:
        return self._schemas[index]
    return {}

ensure_schema async

ensure_schema(index: str) -> dict[str, str]

Ensure schema for an index is loaded, fetching lazily if needed.

This is the async equivalent of the sync get_schema() lazy path. On first access for a given index, issues a single FT.INFO call. Subsequent calls return the cached schema with no I/O.

Concurrent calls for the same index share a single in-flight FT.INFO task instead of issuing duplicate requests.

If the in-flight task is cancelled (e.g. by invalidate()), the current cache state is returned instead of propagating CancelledError to callers.

Returns empty dict if index does not exist in Redis.

Source code in sql_redis/schema.py
async def ensure_schema(self, index: str) -> dict[str, str]:
    """Ensure schema for an index is loaded, fetching lazily if needed.

    This is the async equivalent of the sync get_schema() lazy path.
    On first access for a given index, issues a single FT.INFO call.
    Subsequent calls return the cached schema with no I/O.

    Concurrent calls for the same index share a single in-flight
    FT.INFO task instead of issuing duplicate requests.

    If the in-flight task is cancelled (e.g. by invalidate()), the
    current cache state is returned instead of propagating
    CancelledError to callers.

    Returns empty dict if index does not exist in Redis.

    Args:
        index: Name of the index whose schema is wanted.

    Returns:
        The cached schema for ``index``, or an empty dict when the cache
        holds no entry for it after the load attempt.

    Raises:
        asyncio.CancelledError: If the awaiting caller is cancelled while
            the shared load task has not itself been cancelled.
        Exception: Only CancelledError is caught here, so any other error
            raised by the load task propagates to every awaiter.
    """
    # Fast path: already cached, no task bookkeeping needed.
    if index in self._schemas:
        return self._schemas[index]

    # First caller for this index creates the shared load task; later
    # callers find it in _loading and skip straight to awaiting it.
    if index not in self._loading:
        new_task = asyncio.create_task(self._load_index_schema(index))
        self._loading[index] = new_task

        # Attach a done-callback to clean up _loading even if all
        # awaiters are cancelled (no finally block would run).
        # The identity check avoids removing a newer task registered
        # under the same index name.
        def _cleanup_loading(t: asyncio.Task[None], _index: str = index) -> None:
            if self._loading.get(_index) is t:
                self._loading.pop(_index, None)

        new_task.add_done_callback(_cleanup_loading)

    maybe_task = self._loading.get(index)
    if maybe_task is None:
        # Task was removed (e.g. by invalidate()) before we could await it
        return self._schemas.get(index, {})

    task = maybe_task  # narrowed to non-None

    try:
        # Shield the shared task so that caller cancellation (e.g.
        # asyncio.wait_for timeout) does not cancel the shared FT.INFO
        # for other awaiters. invalidate()/refresh()/load_all() still
        # cancel the underlying task directly via task.cancel().
        await asyncio.shield(task)
    except asyncio.CancelledError:
        if not task.cancelled():
            # The shared load task is still running — this CancelledError
            # came from the *caller* being cancelled (e.g. asyncio.wait_for
            # timeout). Propagate so the caller actually aborts.
            raise
        # invalidate()/refresh()/load_all() cancelled the in-flight load.
        # Return the current (post-invalidate) cache state rather than
        # propagating cancellation to higher-level callers.
        return self._schemas.get(index, {})
    finally:
        # Only remove if this is still the current task for this index
        # and the underlying task has finished. If the caller was
        # cancelled while the shielded task continues running, keep it
        # registered so other callers still share the same in-flight task.
        if self._loading.get(index) is task and task.done():
            self._loading.pop(index, None)

    # Load finished normally: return whatever the loader stored.
    return self._schemas.get(index, {})

invalidate

invalidate(index: str | None = None) -> None

Invalidate cached schema(s), forcing reload on next access.

Also cancels any in-flight ensure_schema() tasks for the invalidated index(es) to prevent stale data from being written.

Parameters:

Name Type Description Default
index str | None

Specific index to invalidate. If None, invalidates all.

None
Source code in sql_redis/schema.py
def invalidate(self, index: str | None = None) -> None:
    """Invalidate cached schema(s), forcing reload on next access.

    Also cancels any in-flight ensure_schema() tasks for the
    invalidated index(es) to prevent stale data from being written.

    Args:
        index: Specific index to invalidate. If None, invalidates all.
    """
    if index is not None:
        self._schemas.pop(index, None)
        task = self._loading.pop(index, None)
        if task is not None:
            task.cancel()
    else:
        self._cancel_all_inflight()
        self._schemas.clear()

refresh async

refresh(index_name: str) -> None

Refresh schema for a single index.

Cancels any in-flight ensure_schema() task for this index first. If the index no longer exists, removes it from the registry.

Source code in sql_redis/schema.py
async def refresh(self, index_name: str) -> None:
    """Reload the schema of one index from Redis.

    An in-flight ensure_schema() load for the same index is unregistered
    and cancelled first. The reload itself is delegated to
    _load_index_schema(), which is expected to update the cached entry
    or remove it when the index is gone.
    """
    if (inflight := self._loading.pop(index_name, None)) is not None:
        inflight.cancel()
    await self._load_index_schema(index_name)