redis_openai_agents.RobustStreamProcessor

class RobustStreamProcessor(redis_url='redis://localhost:6379', stream_name='agent_events', consumer_group='workers', consumer_name=None, dlq_stream=None, max_retries=3, claim_timeout_ms=300000)

Fault-tolerant stream processor with DLQ support.

Features:
  • Automatic pending message recovery (XCLAIM)

  • Dead-letter queue for failed messages

  • Processing timeout detection

  • Health statistics
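The recovery and dead-letter features listed above can be pictured as a per-message decision over the consumer group's pending entries. The following is a minimal sketch of that decision, not the library's implementation: `PendingEntry` and `next_action` are hypothetical names, and the exact comparison rules (`>=` on both thresholds, idle check first) are assumptions.

```python
from dataclasses import dataclass


@dataclass
class PendingEntry:
    """Hypothetical view of one pending entry in a consumer group."""
    message_id: str
    idle_ms: int         # milliseconds since the last delivery
    delivery_count: int  # how many times the message has been delivered


def next_action(entry: PendingEntry,
                max_retries: int = 3,
                claim_timeout_ms: int = 300_000) -> str:
    """Return 'wait', 'claim', or 'dlq' for a pending entry (assumed rules)."""
    # A message that is not yet idle long enough may still be in flight.
    if entry.idle_ms < claim_timeout_ms:
        return "wait"
    # Idle and already delivered max_retries times: route to the dead-letter queue.
    if entry.delivery_count >= max_retries:
        return "dlq"
    # Idle but with attempts remaining: take it over from the stalled consumer.
    return "claim"
```

With the defaults shown in the class signature, an entry idle for six minutes after its first delivery would be claimed, while the same entry after its third delivery would go to the DLQ.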

Example

>>> processor = RobustStreamProcessor(
...     redis_url="redis://localhost:6379",
...     stream_name="agent_events",
...     consumer_group="workers",
... )
>>> await processor.initialize()
>>>
>>> async def handle_message(msg: dict) -> bool:
...     # Process message, return True on success
...     return True
>>>
>>> await processor.process_with_recovery(handle_message)
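The handler in the example above always returns True. A slightly fuller sketch of a handler that reports failure might look like the following. The `payload` and `event` field names are hypothetical, and the assumption that returning False marks the message as failed (so it can be retried or dead-lettered) is inferred from the "return True on success" comment rather than confirmed by this page.

```python
import asyncio
import json


async def handle_message(msg: dict) -> bool:
    """Return True on success, False if the message cannot be processed."""
    try:
        # "payload" is a hypothetical field holding a JSON-encoded event.
        payload = json.loads(msg["payload"])
    except (KeyError, TypeError, ValueError):
        # Missing field, wrong type, or malformed JSON: report failure.
        return False
    # "event" is a hypothetical required key inside the decoded payload.
    return isinstance(payload, dict) and "event" in payload
```

Returning False instead of raising keeps the failure decision explicit in the handler; whether the processor also treats raised exceptions as failures is not stated here.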

Initialize RobustStreamProcessor.

Parameters:
  • redis_url (str) – Redis connection URL

  • stream_name (str) – Name of the main Redis Stream

  • consumer_group (str) – Consumer group name

  • consumer_name (str | None) – Consumer name (auto-generated if not provided)

  • dlq_stream (str | None) – Dead-letter queue stream name (default: {stream_name}:dlq)

  • max_retries (int) – Maximum number of delivery attempts before a message is moved to the DLQ

  • claim_timeout_ms (int) – Idle time in milliseconds before a pending message is claimed (the default of 300000 is 5 minutes)
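The two optional parameters fall back to derived values when left as None. A minimal sketch of how such defaults could be resolved is shown below. `resolve_dlq_stream` and `resolve_consumer_name` are hypothetical helpers, and the hostname-plus-PID consumer name is an assumed format; only the `{stream_name}:dlq` pattern comes from the parameter description.

```python
import os
import socket
from typing import Optional


def resolve_dlq_stream(stream_name: str, dlq_stream: Optional[str] = None) -> str:
    """Use the explicit DLQ name if given, else '{stream_name}:dlq'."""
    return dlq_stream if dlq_stream is not None else f"{stream_name}:dlq"


def resolve_consumer_name(consumer_name: Optional[str] = None) -> str:
    """Use the explicit name if given, else generate one (assumed format)."""
    if consumer_name is not None:
        return consumer_name
    # Hypothetical scheme: hostname plus process ID, to tell workers apart.
    return f"{socket.gethostname()}-{os.getpid()}"
```

With the default `stream_name` of 'agent_events', the dead-letter stream would therefore be 'agent_events:dlq'.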

__init__(redis_url='redis://localhost:6379', stream_name='agent_events', consumer_group='workers', consumer_name=None, dlq_stream=None, max_retries=3, claim_timeout_ms=300000)

Return type:

None

Methods

__init__([redis_url, stream_name, ...])

Initialize RobustStreamProcessor.

claim_pending_messages()

Claim pending messages from crashed or slow consumers.

close()

Close the Redis connection.

get_dlq_messages([count])

Get messages from the dead-letter queue for inspection.

get_health_stats()

Get processor health statistics.

initialize()

Initialize the processor and create the consumer group if needed.

process_batch(handler[, batch_size, ...])

Process a batch of messages from the stream.

process_with_recovery(handler[, batch_size, ...])

Process stream with automatic failure recovery.

replay_dlq_message(dlq_message_id)

Replay a DLQ message back to the main stream.
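The methods above suggest a simple lifecycle: initialize, process, inspect health, close. The sketch below strings those calls together for a single batch. It is an illustration under assumptions: `run_once` and `_maybe_await` are hypothetical helpers, the page does not state which methods other than `initialize()` and `process_with_recovery()` are coroutines (hence the `_maybe_await` wrapper), and the return value of `get_health_stats()` is passed through without assuming its shape.

```python
import inspect
from typing import Any, Awaitable, Callable


async def _maybe_await(value: Any) -> Any:
    """Await the value if it is awaitable; otherwise return it unchanged."""
    if inspect.isawaitable(value):
        return await value
    return value


async def run_once(processor: Any,
                   handler: Callable[[dict], Awaitable[bool]]) -> Any:
    """Process one batch and return whatever get_health_stats() reports."""
    await _maybe_await(processor.initialize())
    try:
        # batch_size and other optional arguments are left at their defaults.
        await _maybe_await(processor.process_batch(handler))
        return await _maybe_await(processor.get_health_stats())
    finally:
        # Always release the Redis connection, even if processing fails.
        await _maybe_await(processor.close())
```

A caller would pass a `RobustStreamProcessor` instance and an async handler, for example `asyncio.run(run_once(processor, handle_message))`. Because the function only relies on method names, it can also be exercised against a stub object in tests.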