redis_openai_agents.RedisStreamTransport#

class RedisStreamTransport(stream_name, redis_url='redis://localhost:6379', consumer_group='agents', max_len=10000, pool=None)[source]#

Redis Streams-based event transport for token streaming.

Provides reliable, replayable event streaming using Redis Streams. Supports consumer groups for multiple concurrent clients.

Example

>>> stream = RedisStreamTransport(stream_name="agent_output")
>>> stream.publish(event_type="token", data={"token": "Hello"})
>>> events = stream.read_all(count=10)
Parameters:
  • stream_name (str) – Name of the Redis Stream

  • redis_url (str) – Redis connection URL

  • consumer_group (str) – Consumer group name for reading

  • max_len (int) – Maximum stream length (older entries trimmed)

  • pool (RedisConnectionPool | None)

Initialize the stream transport.

Parameters:
  • stream_name (str) – Name of the Redis Stream

  • redis_url (str) – Redis connection URL

  • consumer_group (str) – Consumer group name for reading

  • max_len (int) – Maximum stream length (older entries trimmed)

  • pool (RedisConnectionPool | None) – Optional shared connection pool

__init__(stream_name, redis_url='redis://localhost:6379', consumer_group='agents', max_len=10000, pool=None)[source]#

Initialize the stream transport.

Parameters:
  • stream_name (str) – Name of the Redis Stream

  • redis_url (str) – Redis connection URL

  • consumer_group (str) – Consumer group name for reading

  • max_len (int) – Maximum stream length (older entries trimmed)

  • pool (RedisConnectionPool | None) – Optional shared connection pool

Return type:

None

Methods

__init__(stream_name[, redis_url, ...])

Initialize the stream transport.

ack(ids)

Acknowledge events as processed (XACK).

apublish(event_type, data[, metadata])

Async version of publish() - publish an event to the stream.

asubscribe([last_id, block_ms])

Async generator that yields events from the stream.

claim(consumer, min_idle_ms, ids)

Claim pending messages from another consumer (XCLAIM).

close()

Close the Redis connection.

delete()

Delete the stream and all its data.

info()

Get stream information.

pending()

Get information about pending messages (XPENDING).

publish(event_type, data[, metadata])

Publish an event to the stream.

read_all([count, start_id])

Read all events from the stream.

read_group(consumer[, count, block_ms])

Read events using consumer group (XREADGROUP).

Attributes

consumer_group

Consumer group name.

stream_name

Name of the Redis Stream.