redis_openai_agents.AgentCoordinator#
- class AgentCoordinator(redis_url='redis://localhost:6379', stream_name='agent_events', consumer_group=None, consumer_name=None)[source]#
Streams-based coordination for distributed agents.
Enables: - Real-time handoff notifications (low latency vs polling) - Tool result broadcasting to multiple consumers - State synchronization across replicas - Crash recovery via pending message claiming
Example
>>> coordinator = AgentCoordinator( ... redis_url="redis://localhost:6379", ... stream_name="agent_events", ... consumer_group="workers", ... ) >>> await coordinator.initialize() >>> await coordinator.publish_handoff_ready( ... from_agent="research", ... to_agent="analysis", ... session_id="sess_123", ... context={"data": "value"}, ... )
Initialize AgentCoordinator.
- Parameters:
redis_url (str) – Redis connection URL
stream_name (str) – Name of the Redis Stream
consumer_group (str | None) – Consumer group name (required for subscribing)
consumer_name (str | None) – Consumer name within group (auto-generated if not provided)
- __init__(redis_url='redis://localhost:6379', stream_name='agent_events', consumer_group=None, consumer_name=None)[source]#
Initialize AgentCoordinator.
- Parameters:
redis_url (str) – Redis connection URL
stream_name (str) – Name of the Redis Stream
consumer_group (str | None) – Consumer group name (required for subscribing)
consumer_name (str | None) – Consumer name within group (auto-generated if not provided)
- Return type:
None
Methods
__init__([redis_url, stream_name, ...])Initialize AgentCoordinator.
claim_abandoned_messages([min_idle_ms])Claim messages from crashed consumers.
close()Close the Redis connection.
get_stream_info()Get stream statistics.
initialize()Initialize the coordinator and create consumer group if needed.
publish_agent_completed(agent_name, ...)Notify that an agent completed processing.
publish_agent_started(agent_name, ...)Notify that an agent started processing.
publish_error(session_id, error_type, ...[, ...])Publish error event.
publish_handoff_ready(from_agent, to_agent, ...)Notify target agent that handoff is ready.
publish_state_changed(session_id, changes)Notify about session state changes.
publish_tool_result(tool_name, session_id, ...)Broadcast tool completion to all interested consumers.
subscribe([event_types, timeout_ms, max_events])Subscribe to coordination events.
trim_stream(max_length[, approximate])Trim stream to maximum length.