# Source code for redis_openai_agents.vector

"""RedisVectorStore - Vector storage and search for RAG.

This module provides vector storage and similarity search using Redis,
built on top of RedisVL's SearchIndex.

Features:
- Document storage with automatic embedding generation
- Semantic similarity search
- Metadata filtering
- HNSW algorithm for fast approximate nearest neighbor search
"""

import asyncio
import json
import logging
import uuid
from typing import TYPE_CHECKING, Any

import numpy as np
from redisvl.index import SearchIndex  # type: ignore[import-untyped]
from redisvl.query import VectorQuery  # type: ignore[import-untyped]
from redisvl.utils.vectorize import HFTextVectorizer  # type: ignore[import-untyped]

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    from redisvl.extensions.cache.embeddings import (  # type: ignore[import-untyped]
        EmbeddingsCache,
    )

DEFAULT_VECTORIZER_MODEL = "sentence-transformers/all-MiniLM-L6-v2"


[docs] class RedisVectorStore: """Vector store for document storage and semantic search. Uses Redis with RedisVL for high-performance vector similarity search. Example: >>> store = RedisVectorStore(name="docs", redis_url="redis://localhost:6379") >>> store.add_documents([{"content": "Hello world", "metadata": {"source": "test"}}]) >>> results = store.search(query="greeting", k=5) Args: name: Index name in Redis redis_url: Redis connection URL vector_dims: Dimension of embedding vectors (default 384 for all-MiniLM-L6-v2) distance_metric: Distance metric (COSINE, L2, IP) """
[docs] def __init__( self, name: str, redis_url: str = "redis://localhost:6379", vector_dims: int = 384, distance_metric: str = "COSINE", embeddings_cache: "EmbeddingsCache | None" = None, ) -> None: """Initialize the vector store. Args: name: Index name in Redis redis_url: Redis connection URL vector_dims: Dimension of embedding vectors (384 for all-MiniLM-L6-v2) distance_metric: Distance metric (COSINE, L2, IP) embeddings_cache: Optional RedisVL EmbeddingsCache. When provided, repeated embeddings of identical content are served from the cache rather than re-invoking the vectorizer. """ self._name = name self._redis_url = redis_url self._vector_dims = vector_dims self._distance_metric = distance_metric self._vectorizer_model = DEFAULT_VECTORIZER_MODEL # Initialize vectorizer for embedding generation # Use a general-purpose embedding model for document retrieval # (redis/langcache-embed-v1 is optimized for caching, not retrieval) self._vectorizer = HFTextVectorizer( model=self._vectorizer_model, cache=embeddings_cache, ) # Create index schema schema = { "index": { "name": name, "prefix": f"doc:{name}:", "storage_type": "hash", }, "fields": [ {"name": "content", "type": "text"}, {"name": "metadata", "type": "text"}, { "name": "embedding", "type": "vector", "attrs": { "dims": vector_dims, "distance_metric": distance_metric, "algorithm": "HNSW", "datatype": "FLOAT32", }, }, ], } self._index = SearchIndex.from_dict(schema, redis_url=redis_url) # Create index if it doesn't exist try: self._index.create(overwrite=False) except Exception as exc: # Index might already exist logger.debug("Index '%s' creation skipped (may already exist): %s", name, exc)
@property def name(self) -> str: """Index name in Redis.""" return self._name @property def vectorizer_model(self) -> str: """Name of the underlying embedding model.""" return self._vectorizer_model def add_documents( self, documents: list[dict[str, Any]], ) -> list[str]: """Add documents to the vector store. Documents are automatically embedded using the vectorizer. Args: documents: List of documents with 'content' and optional 'metadata' Returns: List of document IDs """ ids: list[str] = [] records: list[dict[str, Any]] = [] for doc in documents: content = doc.get("content", "") metadata = doc.get("metadata", {}) # Generate ID doc_id = str(uuid.uuid4().hex[:16]) ids.append(doc_id) # Generate embedding and convert to bytes for Redis storage embedding = self._vectorizer.embed(content) embedding_bytes = np.array(embedding, dtype=np.float32).tobytes() # Create record record = { "id": doc_id, "content": content, "metadata": json.dumps(metadata) if metadata else "{}", "embedding": embedding_bytes, } records.append(record) # Load records into Redis if records: self._index.load(records, id_field="id") return ids def search( self, query: str, k: int = 5, filter: dict[str, Any] | None = None, ) -> list[dict[str, Any]]: """Search for similar documents. 
Args: query: Search query text k: Number of results to return filter: Optional metadata filter Returns: List of matching documents with content, metadata, and score """ # Generate query embedding query_embedding = self._vectorizer.embed(query) # Build filter expression if provided filter_expr = None if filter: # For now, simple tag-based filtering # Format: @field:{value} conditions = [] for _field, value in filter.items(): conditions.append(f"@metadata:*{value}*") if conditions: filter_expr = " ".join(conditions) # Create vector query vector_query = VectorQuery( vector=query_embedding, vector_field_name="embedding", return_fields=["content", "metadata"], num_results=k, filter_expression=filter_expr, ) # Execute search try: results = self._index.query(vector_query) except Exception as exc: logger.debug("Vector search query failed for index '%s': %s", self._name, exc) return [] # Format results formatted_results: list[dict[str, Any]] = [] for result in results: # Parse metadata from JSON string metadata_str = result.get("metadata", "{}") try: metadata = json.loads(metadata_str) if metadata_str else {} except (json.JSONDecodeError, TypeError): metadata = {} # Calculate similarity from distance distance = float(result.get("vector_distance", 0.0)) score = 1.0 - distance # Convert distance to similarity formatted_results.append( { "content": result.get("content", ""), "metadata": metadata, "score": score, "id": result.get("id", ""), } ) return formatted_results def hybrid_search( self, query: str, k: int = 10, text_weight: float = 0.5, vector_weight: float = 0.5, filter: dict[str, Any] | None = None, ) -> list[dict[str, Any]]: """Hybrid search combining vector similarity and BM25 text search. This method runs both semantic (vector) and lexical (BM25) searches, then combines the results using weighted scoring. 
Args: query: Search query text k: Number of results to return text_weight: Weight for BM25 text search scores (0.0 to 1.0) vector_weight: Weight for vector similarity scores (0.0 to 1.0) filter: Optional metadata filter Returns: List of matching documents with combined scores """ from redisvl.query import FilterQuery from redisvl.query.filter import Text # type: ignore[import-untyped] # Normalize weights total_weight = text_weight + vector_weight if total_weight > 0: text_weight = text_weight / total_weight vector_weight = vector_weight / total_weight # Run vector search vector_results = self.search(query=query, k=k * 2, filter=filter) # Run text search using FT.SEARCH with text query text_results: list[dict[str, Any]] = [] try: # Build text filter text_filter = Text("content") % query # Build metadata filter if provided filter_expr = text_filter if filter: for _field, value in filter.items(): filter_expr = filter_expr & (Text("metadata") % f"*{value}*") fq = FilterQuery( return_fields=["content", "metadata"], filter_expression=filter_expr, num_results=k * 2, ) results = self._index.query(fq) for result in results: metadata_str = result.get("metadata", "{}") try: metadata = json.loads(metadata_str) if metadata_str else {} except (json.JSONDecodeError, TypeError): metadata = {} text_results.append( { "content": result.get("content", ""), "metadata": metadata, "score": 1.0, # Text matches get score 1.0 "id": result.get("id", ""), } ) except Exception as exc: # Text search failed, continue with vector results only logger.debug( "Text search failed for index '%s', using vector results only: %s", self._name, exc ) # Combine results using Reciprocal Rank Fusion (RRF) # Score = sum of 1/(rank + k) for each result list rrf_k = 60 # RRF constant # Build score maps combined_scores: dict[str, float] = {} combined_docs: dict[str, dict[str, Any]] = {} # Add vector results for rank, doc in enumerate(vector_results): doc_id = doc.get("id") or doc.get("content", "")[:50] rrf_score 
= vector_weight * (1.0 / (rank + rrf_k)) combined_scores[doc_id] = combined_scores.get(doc_id, 0) + rrf_score combined_docs[doc_id] = doc # Add text results for rank, doc in enumerate(text_results): doc_id = doc.get("id") or doc.get("content", "")[:50] rrf_score = text_weight * (1.0 / (rank + rrf_k)) combined_scores[doc_id] = combined_scores.get(doc_id, 0) + rrf_score if doc_id not in combined_docs: combined_docs[doc_id] = doc # Sort by combined score sorted_ids = sorted(combined_scores.keys(), key=lambda x: combined_scores[x], reverse=True) # Return top k results final_results: list[dict[str, Any]] = [] for doc_id in sorted_ids[:k]: doc = combined_docs[doc_id] doc["score"] = combined_scores[doc_id] final_results.append(doc) return final_results def count(self) -> int: """Count documents in the store. Returns: Number of documents """ try: info = self._index.info() return int(info.get("num_docs", 0)) except Exception as exc: logger.debug("Failed to get document count for index '%s': %s", self._name, exc) return 0 def delete(self, ids: list[str]) -> None: """Delete documents by ID. 
Args: ids: List of document IDs to delete """ from redis import Redis client = Redis.from_url(self._redis_url) try: for doc_id in ids: key = f"doc:{self._name}:{doc_id}" client.delete(key) finally: client.close() def delete_all(self) -> None: """Delete all documents and the index.""" try: self._index.delete(drop=True) except Exception as exc: logger.debug("Failed to drop index '%s' during delete_all: %s", self._name, exc) # Recreate the empty index schema = { "index": { "name": self._name, "prefix": f"doc:{self._name}:", "storage_type": "hash", }, "fields": [ {"name": "content", "type": "text"}, {"name": "metadata", "type": "text"}, { "name": "embedding", "type": "vector", "attrs": { "dims": self._vector_dims, "distance_metric": self._distance_metric, "algorithm": "HNSW", "datatype": "FLOAT32", }, }, ], } self._index = SearchIndex.from_dict(schema, redis_url=self._redis_url) try: self._index.create(overwrite=True) except Exception as exc: logger.error( "Failed to recreate index '%s' after drop; store is in a broken state: %s", self._name, exc, ) # Async methods async def aadd_documents( self, documents: list[dict[str, Any]], ) -> list[str]: """Async version of add_documents(). Args: documents: List of documents with 'content' and optional 'metadata' Returns: List of document IDs """ return await asyncio.to_thread(self.add_documents, documents) async def asearch( self, query: str, k: int = 5, filter: dict[str, Any] | None = None, ) -> list[dict[str, Any]]: """Async version of search(). Args: query: Search query text k: Number of results to return filter: Optional metadata filter Returns: List of matching documents """ return await asyncio.to_thread(self.search, query=query, k=k, filter=filter) async def ahybrid_search( self, query: str, k: int = 10, text_weight: float = 0.5, vector_weight: float = 0.5, filter: dict[str, Any] | None = None, ) -> list[dict[str, Any]]: """Async version of hybrid_search(). 
Args: query: Search query text k: Number of results to return text_weight: Weight for BM25 text search scores vector_weight: Weight for vector similarity scores filter: Optional metadata filter Returns: List of matching documents with combined scores """ return await asyncio.to_thread( self.hybrid_search, query=query, k=k, text_weight=text_weight, vector_weight=vector_weight, filter=filter, )