redis_openai_agents.DeduplicationService

class DeduplicationService(redis_url='redis://localhost:6379', prefix='dedup', default_error_rate=0.01)[source]

Bloom filter-based deduplication for agent operations.

Prevents:

  • Duplicate tool executions

  • Duplicate message storage

  • Cache stampede (multiple concurrent cache-miss handlers)

  • Request replay attacks

Example

>>> dedup = DeduplicationService(redis_url="redis://localhost:6379")
>>> await dedup.initialize()
>>> is_dup = await dedup.is_duplicate_tool_call("search", {"q": "redis"})
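The checks above rely on standard Bloom filter semantics: membership tests can return false positives (at the configured error rate) but never false negatives, so a "duplicate" verdict may occasionally be wrong, while a "not a duplicate" verdict is always safe. This is not the library's implementation (the service presumably delegates to Redis-side Bloom filters); a minimal pure-Python sketch of the idea:

```python
import hashlib

class TinyBloom:
    """Illustrative Bloom filter: false positives possible, no false negatives."""

    def __init__(self, size_bits: int = 1024, num_hashes: int = 4):
        self.size = size_bits
        self.num_hashes = num_hashes
        self.bits = 0  # an int used as a bit array

    def _positions(self, item: str):
        # Derive num_hashes independent bit positions from salted SHA-256 digests.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{item}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.size

    def add(self, item: str) -> None:
        for pos in self._positions(item):
            self.bits |= 1 << pos

    def might_contain(self, item: str) -> bool:
        # True only if every position is set; any unset bit proves absence.
        return all(self.bits & (1 << pos) for pos in self._positions(item))

bloom = TinyBloom()
bloom.add("search:{'q': 'redis'}")
print(bloom.might_contain("search:{'q': 'redis'}"))     # True (item was added)
print(bloom.might_contain("search:{'q': 'postgres'}"))  # False, with high probability
```

The same trade-off applies to the service: it uses O(1) memory per filter regardless of how many items are tracked, at the cost of occasionally flagging a fresh operation as a duplicate.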

Initialize DeduplicationService.

Parameters:
  • redis_url (str) – Redis connection URL

  • prefix (str) – Key prefix for all deduplication keys

  • default_error_rate (float) – Default false positive rate for Bloom filters

__init__(redis_url='redis://localhost:6379', prefix='dedup', default_error_rate=0.01)[source]#

Initialize DeduplicationService.

Parameters:
  • redis_url (str) – Redis connection URL

  • prefix (str) – Key prefix for all deduplication keys

  • default_error_rate (float) – Default false positive rate for Bloom filters

Return type:

None
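The default_error_rate parameter controls the space/accuracy trade-off. How capacity and error rate translate into filter size follows the standard Bloom filter sizing formulas (a sketch of the math, not the library's internals, which are handled server-side by Redis):

```python
import math

def bloom_sizing(capacity: int, error_rate: float) -> tuple[int, int]:
    """Standard Bloom filter sizing: bit count m and hash count k
    for n expected items at false-positive rate p."""
    m = math.ceil(-capacity * math.log(error_rate) / (math.log(2) ** 2))
    k = max(1, round((m / capacity) * math.log(2)))
    return m, k

bits, hashes = bloom_sizing(capacity=100_000, error_rate=0.01)
print(bits, hashes)  # 958506 7 -> ~117 KiB and 7 hash functions for 100k items at 1%
```

Tightening the rate from 0.01 to 0.001 costs only about 50% more memory, which is why a small default error rate is usually acceptable.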

Methods

__init__([redis_url, prefix, default_error_rate]) – Initialize DeduplicationService.

add_item(filter_name, item) – Add an item to a Bloom filter.

check_exists(filter_name, item) – Check if an item exists in a Bloom filter.

close() – Close the Redis connection.

create_filter(name[, capacity, error_rate]) – Create a new Bloom filter.

initialize() – Initialize the Redis connection.

is_duplicate_message(session_id, message_content) – Check if a message was already added to the session.

is_duplicate_tool_call(tool_name, params[, ...]) – Check if the tool was recently called with the same params.

mark_request_processed(request_id) – Mark a request as processed (for idempotency).

prevent_cache_stampede(query_hash[, ...]) – Acquire a lock to prevent cache stampede.

release_cache_lock(query_hash) – Release a cache stampede lock.
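The stampede-prevention pair (prevent_cache_stampede / release_cache_lock) follows the usual single-winner lock pattern: on a cache miss, only the first caller for a given query hash acquires the lock and recomputes the value, while concurrent callers back off and wait for the cache to fill. A local sketch of that contract (assuming the real implementation uses an atomic Redis operation such as SET ... NX with a TTL; the actual key layout and expiry handling belong to the library):

```python
import threading

class StampedeLock:
    """Single-winner lock sketch: acquire() returns True for exactly one
    holder per key until release() is called, mirroring SET key NX."""

    def __init__(self) -> None:
        self._mutex = threading.Lock()
        self._held: set[str] = set()

    def acquire(self, query_hash: str) -> bool:
        with self._mutex:
            if query_hash in self._held:
                return False  # another caller is already rebuilding this entry
            self._held.add(query_hash)
            return True

    def release(self, query_hash: str) -> None:
        with self._mutex:
            self._held.discard(query_hash)

lock = StampedeLock()
print(lock.acquire("q1"))  # True  -> this caller recomputes the value
print(lock.acquire("q1"))  # False -> a concurrent caller backs off and retries
lock.release("q1")
print(lock.acquire("q1"))  # True again once the lock is released
```

In the Redis-backed version the lock key should carry a TTL so that a crashed winner cannot block other callers forever; the in-memory sketch omits that detail.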