redis_openai_agents.AgentCoordinator#

class AgentCoordinator(redis_url='redis://localhost:6379', stream_name='agent_events', consumer_group=None, consumer_name=None)[source]#

Streams-based coordination for distributed agents.

Enables: - Real-time handoff notifications (low latency vs polling) - Tool result broadcasting to multiple consumers - State synchronization across replicas - Crash recovery via pending message claiming

Example

>>> coordinator = AgentCoordinator(
...     redis_url="redis://localhost:6379",
...     stream_name="agent_events",
...     consumer_group="workers",
... )
>>> await coordinator.initialize()
>>> await coordinator.publish_handoff_ready(
...     from_agent="research",
...     to_agent="analysis",
...     session_id="sess_123",
...     context={"data": "value"},
... )

Initialize AgentCoordinator.

Parameters:
  • redis_url (str) – Redis connection URL

  • stream_name (str) – Name of the Redis Stream

  • consumer_group (str | None) – Consumer group name (required for subscribing)

  • consumer_name (str | None) – Consumer name within group (auto-generated if not provided)

__init__(redis_url='redis://localhost:6379', stream_name='agent_events', consumer_group=None, consumer_name=None)[source]#

Initialize AgentCoordinator.

Parameters:
  • redis_url (str) – Redis connection URL

  • stream_name (str) – Name of the Redis Stream

  • consumer_group (str | None) – Consumer group name (required for subscribing)

  • consumer_name (str | None) – Consumer name within group (auto-generated if not provided)

Return type:

None

Methods

__init__([redis_url, stream_name, ...])

Initialize AgentCoordinator.

claim_abandoned_messages([min_idle_ms])

Claim messages from crashed consumers.

close()

Close the Redis connection.

get_stream_info()

Get stream statistics.

initialize()

Initialize the coordinator and create consumer group if needed.

publish_agent_completed(agent_name, ...)

Notify that an agent completed processing.

publish_agent_started(agent_name, ...)

Notify that an agent started processing.

publish_error(session_id, error_type, ...[, ...])

Publish error event.

publish_handoff_ready(from_agent, to_agent, ...)

Notify target agent that handoff is ready.

publish_state_changed(session_id, changes)

Notify about session state changes.

publish_tool_result(tool_name, session_id, ...)

Broadcast tool completion to all interested consumers.

subscribe([event_types, timeout_ms, max_events])

Subscribe to coordination events.

trim_stream(max_length[, approximate])

Trim stream to maximum length.