redis_openai_agents.StreamingEventPublisher#
- class StreamingEventPublisher(runner, session_id)[source]#
High-level helper for publishing SDK streaming events.
Provides convenient methods for publishing common event types from the OpenAI Agents SDK streaming interface.
Example
>>> publisher = StreamingEventPublisher(runner, session_id="chat_123") >>> >>> # Publish text deltas as they arrive >>> async for event in sdk_stream_events: ... if isinstance(event, RawResponsesStreamEvent): ... await publisher.publish_raw_event(event.data) ... elif isinstance(event, RunItemStreamEvent): ... await publisher.publish_item_event(event.name, event.item)
Initialize the publisher.
- Parameters:
runner (ResumableStreamRunner) – ResumableStreamRunner instance.
session_id (str) – Session identifier for this stream.
- __init__(runner, session_id)[source]#
Initialize the publisher.
- Parameters:
runner (ResumableStreamRunner) – ResumableStreamRunner instance.
session_id (str) – Session identifier for this stream.
- Return type:
None
Methods
__init__(runner, session_id)Initialize the publisher.
publish_error(error_type, message[, details])Publish error event.
publish_handoff(from_agent, to_agent)Publish agent handoff event.
publish_stream_end([reason, metadata])Publish stream end event.
publish_stream_start(agent_name[, metadata])Publish stream start event.
publish_text_delta(delta)Publish a text delta event.
publish_tool_call(tool_name, arguments[, ...])Publish a tool call event.
publish_tool_result(tool_name, result[, call_id])Publish a tool result event.