redis_openai_agents.RedisStreamTransport
- class RedisStreamTransport(stream_name, redis_url='redis://localhost:6379', consumer_group='agents', max_len=10000, pool=None)
Redis Streams-based event transport for token streaming.
Provides reliable, replayable event streaming using Redis Streams. Supports consumer groups for multiple concurrent clients.
Example
>>> stream = RedisStreamTransport(stream_name="agent_output")
>>> stream.publish(event_type="token", data={"token": "Hello"})
>>> events = stream.read_all(count=10)
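The class also lists asubscribe() as an async generator that yields events. A minimal sketch of consuming a bounded number of events from it might look like the following. The helper names collect_events and collect_events_sync are hypothetical and not part of the library, and the sketch assumes only that asubscribe() can be called without arguments and returns a standard async generator.

```python
import asyncio
from typing import Any, List


async def collect_events(transport: Any, limit: int) -> List[Any]:
    """Gather up to `limit` events from transport.asubscribe() (hypothetical helper)."""
    events: List[Any] = []
    if limit <= 0:
        return events
    # asubscribe() is documented as an async generator; last_id and block_ms
    # are left at whatever defaults the library defines.
    stream = transport.asubscribe()
    try:
        async for event in stream:
            events.append(event)
            if len(events) >= limit:
                break
    finally:
        # Close the generator explicitly so it does not linger after the break.
        await stream.aclose()
    return events


def collect_events_sync(transport: Any, limit: int) -> List[Any]:
    """Blocking wrapper for callers that are not already inside an event loop."""
    return asyncio.run(collect_events(transport, limit))
```

With a real transport this could be called as collect_events_sync(stream, limit=10), where stream is the RedisStreamTransport instance from the example above.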
- Parameters:
stream_name (str) – Name of the Redis Stream
redis_url (str) – Redis connection URL
consumer_group (str) – Consumer group name for reading
max_len (int) – Maximum stream length (older entries trimmed)
pool (RedisConnectionPool | None) – Optional shared connection pool
- __init__(stream_name, redis_url='redis://localhost:6379', consumer_group='agents', max_len=10000, pool=None)
Initialize the stream transport.
- Parameters:
stream_name (str) – Name of the Redis Stream
redis_url (str) – Redis connection URL
consumer_group (str) – Consumer group name for reading
max_len (int) – Maximum stream length (older entries trimmed)
pool (RedisConnectionPool | None) – Optional shared connection pool
- Return type:
None
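The max_len parameter matches the capped-stream behaviour that Redis exposes through XADD with MAXLEN. The sketch below shows that mechanism using a redis-py style client. It is an assumption that RedisStreamTransport trims this way internally, and publish_capped is a hypothetical helper, not part of the library.

```python
from typing import Any, Mapping


def publish_capped(
    client: Any,
    stream_name: str,
    fields: Mapping[str, str],
    max_len: int = 10000,
) -> Any:
    """Append one entry and cap the stream length (hypothetical helper).

    `client` is any object with a redis-py style xadd() method,
    for example redis.Redis.from_url("redis://localhost:6379").
    """
    # XADD ... MAXLEN evicts the oldest entries once the cap is exceeded.
    # approximate=True sends "MAXLEN ~", which lets Redis trim in whole
    # internal nodes, so the stream may briefly hold slightly more entries.
    return client.xadd(stream_name, dict(fields), maxlen=max_len, approximate=True)
```

In redis-py, xadd() returns the ID of the new entry, so the helper passes that value straight through.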
Methods
__init__(stream_name[, redis_url, ...]) – Initialize the stream transport.
ack(ids) – Acknowledge events as processed (XACK).
apublish(event_type, data[, metadata]) – Async version of publish(): publish an event to the stream.
asubscribe([last_id, block_ms]) – Async generator that yields events from the stream.
claim(consumer, min_idle_ms, ids) – Claim pending messages from another consumer (XCLAIM).
close() – Close the Redis connection.
delete() – Delete the stream and all its data.
info() – Get stream information.
pending() – Get information about pending messages (XPENDING).
publish(event_type, data[, metadata]) – Publish an event to the stream.
read_all([count, start_id]) – Read all events from the stream.
read_group(consumer[, count, block_ms]) – Read events using consumer group (XREADGROUP).
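The read_group() and ack() methods above suggest the usual consumer-group loop: read a batch, process each event, then acknowledge only the ones that succeeded so failures stay pending for a later retry or claim(). The sketch below is one way to arrange that. process_batch is a hypothetical helper, and because this page does not document the shape of returned events, the caller supplies get_id to extract each event's stream ID.

```python
from typing import Any, Callable, List


def process_batch(
    transport: Any,
    consumer: str,
    handle: Callable[[Any], None],
    get_id: Callable[[Any], str],
    count: int = 10,
    block_ms: int = 1000,
) -> List[str]:
    """Read one batch via the consumer group and ack what was handled."""
    events = transport.read_group(consumer, count=count, block_ms=block_ms)
    done: List[str] = []
    for event in events or []:
        try:
            handle(event)
        except Exception:
            # Leave the event unacknowledged so it remains in the
            # pending list and can be retried or claimed later.
            continue
        done.append(get_id(event))
    if done:
        # Acknowledge all successfully handled IDs in a single call.
        transport.ack(done)
    return done
```

Acknowledging after processing, rather than before, gives at-least-once delivery: a crash between handle() and ack() means the event is seen again, so handlers should tolerate duplicates.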
Attributes
consumer_group – Consumer group name.
stream_name – Name of the Redis Stream.