Skip to content

Executor

The executor runs a translated SQL query against Redis and parses the response into a QueryResult. There are sync and async variants and factory functions that wire up a schema registry for you.

Symbol Description
Executor Sync executor.
AsyncExecutor Async executor.
create_executor Factory for the sync executor with a configurable cache strategy.
create_async_executor Factory for the async executor.
QueryResult Result rows and total count.
SchemaCacheStrategy "lazy" or "load_all" literal.

Executor

Executor

Executor(client: Redis, schema_registry: SchemaRegistry)

Bases: _ScoreParseMixin

Executes SQL queries against Redis.

Initialize executor with Redis client and schema registry.

Source code in sql_redis/executor.py
def __init__(self, client: redis.Redis, schema_registry: SchemaRegistry) -> None:
    """Initialize executor with Redis client and schema registry."""
    self._client = client
    self._schema_registry = schema_registry
    self._translator = Translator(schema_registry)

execute

execute(
    sql: str, *, params: dict | None = None
) -> QueryResult

Execute a SQL query and return results.

Source code in sql_redis/executor.py
def execute(self, sql: str, *, params: dict | None = None) -> QueryResult:
    """Execute a SQL query and return results."""
    params = params or {}

    # Substitute non-bytes params in SQL using token-based approach
    sql = _substitute_params(sql, params)

    # Translate SQL to Redis command
    translated = self._translator.translate(sql)

    # Build command list and substitute vector params
    # Use list[str | bytes] to allow bytes for vector params
    cmd: list[str | bytes] = list(translated.to_command_list())

    # Find any bytes params (vectors) to substitute
    vector_param: bytes | None = None
    for value in params.values():
        if isinstance(value, bytes):
            vector_param = value
            break

    # Replace $vector placeholder with actual bytes
    if vector_param:
        for i, arg in enumerate(cmd):
            if arg == "$vector":
                cmd[i] = vector_param

    # Execute command
    try:
        raw_result = self._client.execute_command(*cmd)
    except redis.ResponseError as e:
        error_msg = str(e)
        _ismissing_signatures = (
            "Unknown function",
            "No such function",
            "Syntax error",
            "INDEXMISSING",
        )
        if "ismissing(@" in translated.query_string and any(
            sig in error_msg for sig in _ismissing_signatures
        ):
            raise redis.ResponseError(
                f"{error_msg}. This error may be caused by use of the "
                "ismissing() function. ismissing() requires Redis 7.4+ "
                "(RediSearch 2.10+) and the field must have INDEXMISSING "
                "declared in the schema."
            ) from e
        raise

    # Parse result based on command type
    count = raw_result[0] if raw_result else 0
    rows = []

    if translated.command == "FT.SEARCH":
        # Use the explicit score_alias signal rather than scanning args
        # for the literal token "WITHSCORES", which could false-positive
        # if a returned field happened to be named "WITHSCORES".
        with_scores = translated.score_alias is not None
        # RETURN 0 suppresses document fields (like NOCONTENT);
        # with WITHSCORES the reply is [count, id, score, id, score, ...]
        no_content = self._has_return_0(translated.args)

        # Pre-resolve score alias; may be deferred for SELECT *
        score_alias: str | None = None

        if with_scores and no_content:
            # WITHSCORES + RETURN 0: [count, id1, score1, id2, score2, ...]
            # Stride of 2: key, score (no field array)
            score_alias = self._resolve_score_alias(
                translated.score_alias, translated.args
            )
            for i in range(1, len(raw_result) - 1, 2):
                score = raw_result[i + 1]
                row = {score_alias: score}
                rows.append(row)
        elif with_scores:
            # WITHSCORES format: [count, key1, score1, [fields1], key2, score2, [fields2], ...]
            # Stride of 3: key, score, field_list
            # First pass: collect all field names across all rows so the
            # alias avoids collisions with any document field, not just
            # the first row's fields.
            all_field_names: set[str] = set()
            parsed_rows: list[tuple[dict, Any]] = []
            for i in range(1, len(raw_result) - 2, 3):
                score = raw_result[i + 1]
                row_data = raw_result[i + 2]
                row = dict(zip(row_data[::2], row_data[1::2]))
                all_field_names.update(row.keys())
                parsed_rows.append((row, score))
            resolved_alias = self._resolve_score_alias(
                translated.score_alias,
                translated.args,
                first_row_fields=all_field_names,
            )
            for row, score in parsed_rows:
                row[resolved_alias] = score
                rows.append(row)
        else:
            # Standard format: [count, key1, [fields1], key2, [fields2], ...]
            for i in range(2, len(raw_result), 2):
                row_data = raw_result[i]
                row = dict(zip(row_data[::2], row_data[1::2]))
                rows.append(row)
    else:
        # FT.AGGREGATE format: [count, [fields1], [fields2], ...]
        for row_data in raw_result[1:]:
            row = dict(zip(row_data[::2], row_data[1::2]))
            rows.append(row)

    return QueryResult(rows=rows, count=count)

AsyncExecutor

AsyncExecutor

AsyncExecutor(
    client: "async_redis.Redis",
    schema_registry: AsyncSchemaRegistry,
)

Bases: _ScoreParseMixin

Async version of Executor for use with redis.asyncio clients.

Initialize async executor with Redis client and schema registry.

Parameters:

Name Type Description Default
client 'async_redis.Redis'

An async Redis client (redis.asyncio.Redis).

required
schema_registry AsyncSchemaRegistry

An AsyncSchemaRegistry instance.

required
Source code in sql_redis/executor.py
def __init__(
    self,
    client: "async_redis.Redis",
    schema_registry: AsyncSchemaRegistry,
) -> None:
    """Initialize async executor with Redis client and schema registry.

    Args:
        client: An async Redis client (redis.asyncio.Redis).
        schema_registry: An AsyncSchemaRegistry instance.
    """
    self._client = client
    self._schema_registry = schema_registry
    self._translator = Translator(schema_registry)

execute async

execute(
    sql: str, *, params: dict | None = None
) -> QueryResult

Execute a SQL query asynchronously and return results.

Source code in sql_redis/executor.py
async def execute(self, sql: str, *, params: dict | None = None) -> QueryResult:
    """Execute a SQL query asynchronously and return results."""
    params = params or {}

    # Substitute non-bytes params in SQL
    sql = _substitute_params(sql, params)

    # Parse once, ensure schema is loaded (async lazy-load), then
    # translate from the pre-parsed result to avoid double-parsing.
    parsed = self._translator.parse(sql)
    if parsed.index:
        await self._schema_registry.ensure_schema(parsed.index)

    # Translate from pre-parsed query (sync - no Redis calls)
    translated = self._translator.translate_parsed(parsed)

    # Build command list and substitute vector params
    cmd: list[str | bytes] = list(translated.to_command_list())

    # Find any bytes params (vectors) to substitute
    vector_param: bytes | None = None
    for value in params.values():
        if isinstance(value, bytes):
            vector_param = value
            break

    # Replace $vector placeholder with actual bytes
    if vector_param:
        for i, arg in enumerate(cmd):
            if arg == "$vector":
                cmd[i] = vector_param

    # Execute command asynchronously
    try:
        raw_result = await self._client.execute_command(*cmd)
    except redis.ResponseError as e:
        error_msg = str(e)
        _ismissing_signatures = (
            "Unknown function",
            "No such function",
            "Syntax error",
            "INDEXMISSING",
        )
        if "ismissing(@" in translated.query_string and any(
            sig in error_msg for sig in _ismissing_signatures
        ):
            raise redis.ResponseError(
                f"{error_msg}. This error may be caused by use of the "
                "ismissing() function. ismissing() requires Redis 7.4+ "
                "(RediSearch 2.10+) and the field must have INDEXMISSING "
                "declared in the schema."
            ) from e
        raise

    # Parse result based on command type
    count = raw_result[0] if raw_result else 0
    rows = []

    if translated.command == "FT.SEARCH":
        with_scores = translated.score_alias is not None
        no_content = self._has_return_0(translated.args)

        score_alias: str | None = None

        if with_scores and no_content:
            # WITHSCORES + RETURN 0: [count, id1, score1, id2, score2, ...]
            score_alias = self._resolve_score_alias(
                translated.score_alias, translated.args
            )
            for i in range(1, len(raw_result) - 1, 2):
                score = raw_result[i + 1]
                row = {score_alias: score}
                rows.append(row)
        elif with_scores:
            # WITHSCORES format: [count, key1, score1, [fields1], ...]
            # First pass: collect all field names across all rows so the
            # alias avoids collisions with any document field.
            all_field_names: set[str] = set()
            parsed_rows: list[tuple[dict, Any]] = []
            for i in range(1, len(raw_result) - 2, 3):
                score = raw_result[i + 1]
                row_data = raw_result[i + 2]
                row = dict(zip(row_data[::2], row_data[1::2]))
                all_field_names.update(row.keys())
                parsed_rows.append((row, score))
            resolved_alias = self._resolve_score_alias(
                translated.score_alias,
                translated.args,
                first_row_fields=all_field_names,
            )
            for row, score in parsed_rows:
                row[resolved_alias] = score
                rows.append(row)
        else:
            # Standard format: [count, key1, [fields1], key2, [fields2], ...]
            for i in range(2, len(raw_result), 2):
                row_data = raw_result[i]
                row = dict(zip(row_data[::2], row_data[1::2]))
                rows.append(row)
    else:
        # FT.AGGREGATE format: [count, [fields1], [fields2], ...]
        for row_data in raw_result[1:]:
            row = dict(zip(row_data[::2], row_data[1::2]))
            rows.append(row)

    return QueryResult(rows=rows, count=count)

create_executor

create_executor

create_executor(
    client: Redis,
    *,
    schema_registry: SchemaRegistry | None = None,
    schema_cache_strategy: SchemaCacheStrategy = "lazy"
) -> Executor

Create a sync SQL executor with the requested schema cache strategy.

Parameters:

Name Type Description Default
client Redis

Redis client used by the executor.

required
schema_registry SchemaRegistry | None

Optional existing registry to reuse.

None
schema_cache_strategy SchemaCacheStrategy

Schema loading strategy. "lazy" defers FT.INFO calls until a referenced index is needed. "load_all" preserves the historical eager behavior by preloading all schemas.

'lazy'
Source code in sql_redis/executor.py
def create_executor(
    client: redis.Redis,
    *,
    schema_registry: SchemaRegistry | None = None,
    schema_cache_strategy: SchemaCacheStrategy = "lazy",
) -> Executor:
    """Create a sync SQL executor with the requested schema cache strategy.

    Args:
        client: Redis client used by the executor.
        schema_registry: Optional existing registry to reuse.
        schema_cache_strategy: Schema loading strategy. ``"lazy"`` defers
            ``FT.INFO`` calls until a referenced index is needed. ``"load_all"``
            preserves the historical eager behavior by preloading all schemas.
    """
    schema_cache_strategy = _validate_schema_cache_strategy(schema_cache_strategy)

    registry = schema_registry or SchemaRegistry(client)
    if schema_cache_strategy == "load_all":
        registry.load_all()

    return Executor(client, registry)

create_async_executor

create_async_executor async

create_async_executor(
    client: "async_redis.Redis",
    *,
    schema_registry: AsyncSchemaRegistry | None = None,
    schema_cache_strategy: SchemaCacheStrategy = "lazy"
) -> AsyncExecutor

Create an async SQL executor with the requested schema cache strategy.

Parameters:

Name Type Description Default
client 'async_redis.Redis'

Async Redis client used by the executor.

required
schema_registry AsyncSchemaRegistry | None

Optional existing async registry to reuse.

None
schema_cache_strategy SchemaCacheStrategy

Schema loading strategy. "lazy" defers FT.INFO calls until a referenced index is needed. "load_all" preserves the historical eager behavior by preloading all schemas.

'lazy'
Source code in sql_redis/executor.py
async def create_async_executor(
    client: "async_redis.Redis",
    *,
    schema_registry: AsyncSchemaRegistry | None = None,
    schema_cache_strategy: SchemaCacheStrategy = "lazy",
) -> AsyncExecutor:
    """Create an async SQL executor with the requested schema cache strategy.

    Args:
        client: Async Redis client used by the executor.
        schema_registry: Optional existing async registry to reuse.
        schema_cache_strategy: Schema loading strategy. ``"lazy"`` defers
            ``FT.INFO`` calls until a referenced index is needed. ``"load_all"``
            preserves the historical eager behavior by preloading all schemas.
    """
    schema_cache_strategy = _validate_schema_cache_strategy(schema_cache_strategy)

    registry = schema_registry or AsyncSchemaRegistry(client)
    if schema_cache_strategy == "load_all":
        await registry.load_all()

    return AsyncExecutor(client, registry)

QueryResult

QueryResult dataclass

QueryResult(rows: list[dict], count: int)

Result of executing a SQL query.

SchemaCacheStrategy

SchemaCacheStrategy module-attribute

SchemaCacheStrategy = Literal['lazy', 'load_all']

__version__

__version__ module-attribute

__version__ = version('sql-redis')

The installed package version, as a string. Useful for log lines, bug reports, and version-gated feature checks.