# Source code for redis_openai_agents.hybrid

"""HybridSearchService - Combined BM25 + Vector search.

This module provides hybrid search combining lexical (BM25) and semantic (vector)
search using Reciprocal Rank Fusion (RRF) for score combination.

Features:
- Combined BM25 + Vector similarity search
- Configurable weights for text vs vector
- Automatic result deduplication
- Metadata filtering
"""

import asyncio
import hashlib
from dataclasses import dataclass, field
from typing import Any

from .search import RedisFullTextSearch
from .vector import RedisVectorStore


@dataclass
class SearchResult:
    """A single hit returned by :class:`HybridSearchService`.

    Attributes:
        id: Document ID
        content: Document content
        metadata: Document metadata dictionary
        score: Combined RRF score (0-1 range)
    """

    id: str
    content: str
    metadata: dict[str, Any] = field(default_factory=dict)
    score: float = 0.0
class HybridSearchService:
    """Combined BM25 + Vector search using RedisVL.

    Performs both lexical (BM25) and semantic (vector) search, then combines
    results using Reciprocal Rank Fusion (RRF).

    Example:
        >>> service = HybridSearchService(
        ...     redis_url="redis://localhost:6379",
        ...     index_name="documents",
        ... )
        >>> await service.index_documents([
        ...     {"content": "Redis is a fast database.", "metadata": {"source": "docs"}},
        ... ])
        >>> results = await service.search("fast database", k=5)

    Args:
        redis_url: Redis connection URL
        index_name: Name for the search indices
        default_vector_weight: Default weight for vector similarity (0-1)
        default_text_weight: Default weight for BM25 text search (0-1)
    """

    # RRF constant for smoothing rankings. 60 is the standard value from the
    # original RRF formulation; larger values flatten the rank contribution.
    _RRF_K = 60

    def __init__(
        self,
        redis_url: str,
        index_name: str,
        default_vector_weight: float = 0.7,
        default_text_weight: float = 0.3,
    ) -> None:
        """Initialize the hybrid search service.

        Args:
            redis_url: Redis connection URL
            index_name: Name for the search indices
            default_vector_weight: Default weight for vector similarity (0-1)
            default_text_weight: Default weight for BM25 text search (0-1)

        Raises:
            ValueError: If weights are not in valid range
        """
        # Validate weights up front so misconfiguration fails fast.
        if not 0.0 <= default_vector_weight <= 1.0:
            raise ValueError(
                f"default_vector_weight must be between 0 and 1, got {default_vector_weight}"
            )
        if not 0.0 <= default_text_weight <= 1.0:
            raise ValueError(
                f"default_text_weight must be between 0 and 1, got {default_text_weight}"
            )

        self._redis_url = redis_url
        self._index_name = index_name
        self._default_vector_weight = default_vector_weight
        self._default_text_weight = default_text_weight

        # Two underlying stores share one logical index name, suffixed so the
        # vector and text indices do not collide in Redis.
        self._vector_store = RedisVectorStore(
            name=f"{index_name}_vec",
            redis_url=redis_url,
        )
        self._text_search = RedisFullTextSearch(
            name=f"{index_name}_txt",
            redis_url=redis_url,
        )

    @property
    def index_name(self) -> str:
        """The index name for this service."""
        return self._index_name

    @property
    def default_vector_weight(self) -> float:
        """Default weight for vector similarity."""
        return self._default_vector_weight

    @property
    def default_text_weight(self) -> float:
        """Default weight for BM25 text search."""
        return self._default_text_weight

    async def index_documents(
        self,
        documents: list[dict[str, Any]],
    ) -> list[str]:
        """Index documents for hybrid search.

        Documents are indexed in both vector and text stores.

        Args:
            documents: List of documents with 'content' and optional 'metadata'

        Returns:
            List of document IDs
        """
        # Index in vector store (async)
        ids = await self._vector_store.aadd_documents(documents)

        # Index in text store (sync client, run in a worker thread).
        # Transform docs to match the text search schema.
        text_docs = []
        for doc in documents:
            metadata = doc.get("metadata", {})
            text_doc = {
                "title": "",  # Optional, empty for now
                "content": doc.get("content", ""),
                "category": metadata.get("category", ""),
                "tags": metadata.get("tags", []),
                **metadata,
            }
            text_docs.append(text_doc)

        await asyncio.to_thread(self._text_search.add_documents, text_docs)

        return ids

    async def count(self) -> int:
        """Count indexed documents.

        Returns:
            Number of documents in the index
        """
        # Offload the sync client call so the event loop is not blocked
        # (consistent with index_documents/search, which use to_thread).
        return await asyncio.to_thread(self._vector_store.count)

    async def search(
        self,
        query: str,
        k: int = 10,
        vector_weight: float | None = None,
        text_weight: float | None = None,
        filter_expression: dict[str, Any] | None = None,
    ) -> list[SearchResult]:
        """Hybrid search combining semantic and lexical matching.

        Uses Reciprocal Rank Fusion (RRF) to combine results from vector
        similarity search and BM25 text search.

        Args:
            query: Search query text
            k: Number of results to return
            vector_weight: Weight for vector similarity (0-1), uses default if None
            text_weight: Weight for BM25 text search (0-1), uses default if None
            filter_expression: Optional metadata filter dict

        Returns:
            Combined and re-ranked results as SearchResult objects
        """
        # Use defaults if not specified
        if vector_weight is None:
            vector_weight = self._default_vector_weight
        if text_weight is None:
            text_weight = self._default_text_weight

        # Normalize weights to sum to 1
        total_weight = vector_weight + text_weight
        if total_weight > 0:
            vector_weight = vector_weight / total_weight
            text_weight = text_weight / total_weight
        else:
            # Both zero - equal weights
            vector_weight = 0.5
            text_weight = 0.5

        # Fetch more results than requested for better fusion
        fetch_k = k * 2

        # The two searches are independent: run the native-async vector search
        # and the thread-offloaded BM25 search concurrently instead of
        # sequentially, halving worst-case search latency.
        vector_results, text_results = await asyncio.gather(
            self._vector_store.asearch(
                query=query,
                k=fetch_k,
                filter=filter_expression,
            ),
            asyncio.to_thread(
                self._text_search.search,
                query=query,
                k=fetch_k,
                filter=filter_expression,
            ),
        )

        # Combine using RRF
        return self._rrf_fusion(
            vector_results=vector_results,
            text_results=text_results,
            k=k,
            vector_weight=vector_weight,
            text_weight=text_weight,
        )

    def _rrf_fusion(
        self,
        vector_results: list[dict[str, Any]],
        text_results: list[dict[str, Any]],
        k: int,
        vector_weight: float,
        text_weight: float,
    ) -> list[SearchResult]:
        """Combine results using Reciprocal Rank Fusion.

        RRF score = sum of weight * (1 / (k + rank)) for each list

        Args:
            vector_results: Results from vector search
            text_results: Results from text search
            k: Number of results to return
            vector_weight: Weight for vector results
            text_weight: Weight for text results

        Returns:
            Combined and deduplicated results
        """
        # Build score and document maps.
        # Deduplicate on a content hash because the vector and text stores
        # assign different IDs to the same document.
        combined_scores: dict[str, float] = {}
        combined_docs: dict[str, dict[str, Any]] = {}
        content_to_id: dict[str, str] = {}  # Map content hash to original ID

        def accumulate(results: list[dict[str, Any]], weight: float) -> None:
            """Fold one ranked result list into the combined RRF scores."""
            for rank, doc in enumerate(results):
                content = doc.get("content", "")
                content_key = self._content_hash(content)
                original_id = doc.get("id", content_key)

                # rank is 0-based; RRF uses 1-based ranks, hence rank + 1.
                rrf_score = weight * (1.0 / (self._RRF_K + rank + 1))
                combined_scores[content_key] = (
                    combined_scores.get(content_key, 0.0) + rrf_score
                )

                # First occurrence wins for the representative doc and ID.
                if content_key not in combined_docs:
                    combined_docs[content_key] = doc
                    content_to_id[content_key] = original_id

        accumulate(vector_results, vector_weight)
        accumulate(text_results, text_weight)

        # Sort by combined score, best first
        sorted_keys = sorted(
            combined_scores.keys(),
            key=lambda key: combined_scores[key],
            reverse=True,
        )

        # Build the top-k results
        results: list[SearchResult] = []
        for content_key in sorted_keys[:k]:
            doc = combined_docs[content_key]
            results.append(
                SearchResult(
                    id=content_to_id[content_key],
                    content=doc.get("content", ""),
                    metadata=doc.get("metadata", {}),
                    score=combined_scores[content_key],
                )
            )

        return results

    def _content_hash(self, content: str) -> str:
        """Generate a hash for content as fallback ID.

        Args:
            content: Document content

        Returns:
            Hash string for the content
        """
        # 16 hex chars (64 bits) of SHA-256 is plenty for dedup keys.
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    async def delete_all(self) -> None:
        """Delete all documents from both stores."""
        # These are sync client calls; offload them so the event loop
        # is not blocked (consistent with the other async methods).
        await asyncio.to_thread(self._vector_store.delete_all)
        await asyncio.to_thread(self._text_search.delete_all)