redis_openai_agents.StreamingEventPublisher#

class StreamingEventPublisher(runner, session_id)[source]#

High-level helper for publishing SDK streaming events.

Provides convenient methods for publishing common event types from the OpenAI Agents SDK streaming interface.

Example

>>> publisher = StreamingEventPublisher(runner, session_id="chat_123")
>>>
>>> # Publish text deltas as they arrive
>>> async for event in sdk_stream_events:
...     if isinstance(event, RawResponsesStreamEvent):
...         await publisher.publish_raw_event(event.data)
...     elif isinstance(event, RunItemStreamEvent):
...         await publisher.publish_item_event(event.name, event.item)

Initialize the publisher.

Parameters:
  • runner (ResumableStreamRunner) – ResumableStreamRunner instance.

  • session_id (str) – Session identifier for this stream.

__init__(runner, session_id)[source]#

Initialize the publisher.

Parameters:
  • runner (ResumableStreamRunner) – ResumableStreamRunner instance.

  • session_id (str) – Session identifier for this stream.

Return type:

None

Methods

__init__(runner, session_id)

Initialize the publisher.

publish_error(error_type, message[, details])

Publish error event.

publish_handoff(from_agent, to_agent)

Publish agent handoff event.

publish_stream_end([reason, metadata])

Publish stream end event.

publish_stream_start(agent_name[, metadata])

Publish stream start event.

publish_text_delta(delta)

Publish a text delta event.

publish_tool_call(tool_name, arguments[, ...])

Publish a tool call event.

publish_tool_result(tool_name, result[, call_id])

Publish a tool result event.