redis_openai_agents.RobustStreamProcessor
- class RobustStreamProcessor(redis_url='redis://localhost:6379', stream_name='agent_events', consumer_group='workers', consumer_name=None, dlq_stream=None, max_retries=3, claim_timeout_ms=300000)
Fault-tolerant stream processor with DLQ support.
Features:
- Automatic pending message recovery (XCLAIM)
- Dead-letter queue for failed messages
- Processing timeout detection
- Health statistics
Example
>>> processor = RobustStreamProcessor(
...     redis_url="redis://localhost:6379",
...     stream_name="agent_events",
...     consumer_group="workers",
... )
>>> await processor.initialize()
>>>
>>> async def handle_message(msg: dict) -> bool:
...     # Process message, return True on success
...     return True
>>>
>>> await processor.process_with_recovery(handle_message)
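The example's handler always succeeds. A slightly fuller handler, sketched below, reports failure by returning False instead of raising. The `payload` field name and the JSON encoding are assumptions made for illustration; the actual message layout is not described here, and neither is what the processor does with a False result.

```python
import json


async def handle_message(msg: dict) -> bool:
    """Return True on success, False if the message could not be processed."""
    # "payload" is a hypothetical field name; adapt it to the real message schema.
    raw = msg.get("payload")
    if raw is None:
        return False
    try:
        event = json.loads(raw)
    except (TypeError, ValueError):
        # Malformed or non-text payload: signal failure rather than raising.
        return False
    # Real processing would go here; this sketch only checks the decoded shape.
    return isinstance(event, dict)
```

Such a handler could be passed to `process_with_recovery` in the same way as the one in the example above.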
Initialize RobustStreamProcessor.
- Parameters:
redis_url (str) – Redis connection URL
stream_name (str) – Name of the main Redis Stream
consumer_group (str) – Consumer group name
consumer_name (str | None) – Consumer name (auto-generated if not provided)
dlq_stream (str | None) – Dead-letter queue stream name (default: {stream_name}:dlq)
max_retries (int) – Maximum delivery attempts before moving to DLQ
claim_timeout_ms (int) – Idle time (ms) before claiming pending messages
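How these parameters interact can be sketched with three small helpers. This is one reading of the descriptions above, not the library's code: the helper names are hypothetical, and whether the real comparisons are inclusive (`>=`) is an assumption.

```python
from typing import Optional


def resolve_dlq_stream(stream_name: str, dlq_stream: Optional[str] = None) -> str:
    """Fall back to the documented default '{stream_name}:dlq' when unset."""
    return dlq_stream if dlq_stream is not None else f"{stream_name}:dlq"


def should_dead_letter(delivery_count: int, max_retries: int = 3) -> bool:
    """Assumed rule: dead-letter once the delivery count reaches max_retries."""
    return delivery_count >= max_retries


def is_claimable(idle_ms: int, claim_timeout_ms: int = 300_000) -> bool:
    """Assumed rule: a pending message is claimable once idle for claim_timeout_ms."""
    return idle_ms >= claim_timeout_ms
```

With the defaults, a message from `agent_events` would be routed to `agent_events:dlq`, and a pending message would become claimable after five minutes of idle time.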
- __init__(redis_url='redis://localhost:6379', stream_name='agent_events', consumer_group='workers', consumer_name=None, dlq_stream=None, max_retries=3, claim_timeout_ms=300000)
Initialize RobustStreamProcessor.
- Return type:
None
Methods
__init__([redis_url, stream_name, ...]) – Initialize RobustStreamProcessor.
claim_pending_messages() – Claim messages from crashed/slow consumers.
close() – Close the Redis connection.
get_dlq_messages([count]) – Get messages from dead-letter queue for inspection.
get_health_stats() – Get processor health statistics.
initialize() – Initialize the processor and create consumer group if needed.
process_batch(handler[, batch_size, ...]) – Process a batch of messages from the stream.
process_with_recovery(handler[, batch_size, ...]) – Process stream with automatic failure recovery.
replay_dlq_message(dlq_message_id) – Replay a DLQ message back to main stream.
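The claim and dead-letter steps behind these methods can be pictured in terms of raw Redis Stream commands. The sketch below is not this class's implementation. It assumes a redis-py asyncio client created with `decode_responses=True` and uses its `xpending_range`, `xclaim`, `xadd`, and `xack` calls. The function name `recover_pending` and the `original_id` field are hypothetical.

```python
from typing import Any, List, Tuple


async def recover_pending(
    client: Any,  # assumed: redis.asyncio.Redis with decode_responses=True
    stream: str,
    group: str,
    consumer: str,
    dlq_stream: str,
    max_retries: int = 3,
    claim_timeout_ms: int = 300_000,
    count: int = 10,
) -> List[Tuple[str, dict]]:
    """Claim idle pending entries; dead-letter those that exhausted their retries."""
    # XPENDING with an IDLE filter lists unacknowledged entries that have been
    # idle for at least claim_timeout_ms.
    pending = await client.xpending_range(
        stream, group, min="-", max="+", count=count, idle=claim_timeout_ms
    )
    retryable: List[Tuple[str, dict]] = []
    for entry in pending:
        # XCLAIM transfers ownership to this consumer and returns the entry body.
        claimed = await client.xclaim(
            stream,
            group,
            consumer,
            min_idle_time=claim_timeout_ms,
            message_ids=[entry["message_id"]],
        )
        for message_id, fields in claimed:
            if entry["times_delivered"] >= max_retries:
                # Copy to the DLQ, then acknowledge so it leaves the pending list.
                # "original_id" is a hypothetical bookkeeping field.
                await client.xadd(dlq_stream, {**fields, "original_id": message_id})
                await client.xack(stream, group, message_id)
            else:
                retryable.append((message_id, fields))
    return retryable
```

The function returns the entries that are still eligible for another processing attempt. Entries that reached the retry limit are copied to the DLQ stream and acknowledged on the main stream.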