redis_openai_agents.ResumableStreamRunner

class ResumableStreamRunner(redis_url='redis://localhost:6379', stream_prefix='llm_stream', max_stream_length=None, consumer_group='consumers')

Resumable LLM streaming backed by Redis Streams.

Enables durable, resumable streaming of LLM responses. Events are published to Redis Streams and can be consumed by multiple clients, each tracking their own progress.

The key insight is separating generation from consumption:
  • Generation continues even if clients disconnect.

  • Clients can reconnect and resume from where they left off.

  • Multiple clients can consume the same stream independently.
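The separation of generation from consumption can be illustrated with a small in-memory model. This is only a sketch of the idea, not the library's implementation: `InMemoryStream`, `publish`, and `read_after` are hypothetical names, and a plain Python list stands in for a Redis Stream.

```python
from dataclasses import dataclass, field


@dataclass
class InMemoryStream:
    """Append-only event log standing in for one Redis Stream (illustration only)."""

    events: list = field(default_factory=list)

    def publish(self, event_type: str, data: dict) -> int:
        # IDs increase monotonically, as Redis Stream entry IDs do.
        event_id = len(self.events) + 1
        self.events.append((event_id, event_type, data))
        return event_id

    def read_after(self, last_id: int = 0) -> list:
        # Return every event newer than the ID the client last saw.
        return [event for event in self.events if event[0] > last_id]


stream = InMemoryStream()

# Generation starts; a client reads the first two events, then disconnects.
stream.publish("token", {"text": "Hello"})
stream.publish("token", {"text": ","})
seen = stream.read_after(0)
last_seen_id = seen[-1][0]

# Generation continues while the client is away.
stream.publish("token", {"text": " world"})
stream.publish("done", {})

# On reconnect the client resumes from its own cursor and misses nothing.
resumed = stream.read_after(last_seen_id)
resumed_types = [event_type for _, event_type, _ in resumed]

# A second client keeps an independent cursor and replays from the start.
replay = stream.read_after(0)
```

Because each client supplies its own cursor, the producer never needs to know who is connected. In the real class, the `from_id` argument of `subscribe` presumably plays the role of `last_id` here, although this page does not spell out its semantics.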

stream_prefix

Prefix for Redis Stream keys.

max_stream_length

Maximum number of events retained per stream; older events are trimmed once the limit is exceeded. None means unlimited.

consumer_group

Default consumer group name.
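The effect of a length cap on resumability can be sketched with a bounded deque. This is an analogy under stated assumptions, not the library's trimming logic: `MAX_STREAM_LENGTH` is a hypothetical value, and Redis itself may trim approximately rather than exactly, depending on how the cap is applied.

```python
from collections import deque

MAX_STREAM_LENGTH = 3  # hypothetical cap, chosen small for the demo

# deque(maxlen=...) drops the oldest entry once the cap is reached,
# which is roughly what a length-capped stream does when trimmed.
capped = deque(maxlen=MAX_STREAM_LENGTH)
for event_id in range(1, 6):
    capped.append((event_id, f"token-{event_id}"))

retained_ids = [event_id for event_id, _ in capped]
oldest_retained = retained_ids[0]
```

A client that reconnects after its last-seen event has been trimmed can only resume from the oldest retained entry, so a cap trades memory for how far back a late client can replay.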

Initialize the resumable stream runner.

Parameters:
  • redis_url (str) – Redis connection URL.

  • stream_prefix (str) – Prefix for stream keys.

  • max_stream_length (int | None) – Max events per stream (None = unlimited).

  • consumer_group (str) – Default consumer group name.

__init__(redis_url='redis://localhost:6379', stream_prefix='llm_stream', max_stream_length=None, consumer_group='consumers')

Return type:

None

Methods

__init__([redis_url, stream_prefix, ...])

Initialize the resumable stream runner.

ack(session_id, consumer_id, message_id)

Acknowledge a message as processed.

claim_pending(session_id, consumer_id[, ...])

Claim pending messages from dead consumers.

close()

Close the Redis connection.

delete_stream(session_id)

Delete a stream and all its messages.

get_all_events(session_id)

Get all events from a stream.

get_pending_count(session_id[, consumer_id])

Get count of pending (unacknowledged) messages.

get_stream_info(session_id)

Get information about a stream.

initialize()

Initialize the Redis connection.

publish_event(session_id, event_type, data)

Publish a streaming event to Redis.

subscribe(session_id[, from_id, timeout_ms, ...])

Subscribe to streaming events.

subscribe_as_consumer(session_id, consumer_id)

Subscribe as a consumer in a consumer group.
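The consumer-group methods listed above (subscribe_as_consumer, ack, get_pending_count, claim_pending) follow the usual Redis Streams pattern of deliver, acknowledge, and reclaim. The toy model below sketches that lifecycle in memory. It is not the library's code: `InMemoryGroup` and its methods are hypothetical, and the idle-time check that Redis applies before allowing a claim is omitted for brevity.

```python
from dataclasses import dataclass, field


@dataclass
class InMemoryGroup:
    """Toy model of one consumer group on one stream (illustration only)."""

    events: list = field(default_factory=list)   # (event_id, payload) pairs
    next_index: int = 0                          # group-wide delivery cursor
    pending: dict = field(default_factory=dict)  # event_id -> owning consumer

    def read(self, consumer_id: str, count: int = 1) -> list:
        # Deliver undelivered events and record them as pending for this consumer.
        batch = self.events[self.next_index:self.next_index + count]
        self.next_index += len(batch)
        for event_id, _ in batch:
            self.pending[event_id] = consumer_id
        return batch

    def ack(self, event_id: int) -> None:
        # Acknowledging removes the event from the pending set.
        self.pending.pop(event_id, None)

    def pending_count(self, consumer_id=None) -> int:
        # Count pending events for the whole group or for one consumer.
        if consumer_id is None:
            return len(self.pending)
        return sum(1 for owner in self.pending.values() if owner == consumer_id)

    def claim(self, new_owner: str, dead_owner: str) -> list:
        # Transfer ownership of everything the dead consumer left unacknowledged.
        claimed = [eid for eid, owner in self.pending.items() if owner == dead_owner]
        for eid in claimed:
            self.pending[eid] = new_owner
        return claimed


group = InMemoryGroup(events=[(1, "Hello"), (2, ","), (3, " world")])

group.read("worker-a", count=2)  # worker-a receives events 1 and 2
group.ack(1)                     # worker-a acknowledges 1, then crashes
group.read("worker-b", count=1)  # worker-b receives event 3

pending_before = group.pending_count()
claimed_ids = group.claim("worker-b", dead_owner="worker-a")
pending_for_b = group.pending_count("worker-b")
```

Unlike independent subscribers, consumers in a group share one delivery cursor, so each event goes to exactly one of them. The pending set is what makes recovery possible: an event stays there until it is acknowledged, so another consumer can take it over after a crash.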