# Agent Coordination
This guide covers multi-agent coordination using Redis pub/sub and atomic operations.
## Overview
`AgentCoordinator` enables:
- Real-time handoffs between agents
- State synchronization across distributed agents
- Event-driven communication via pub/sub
import os
import asyncio
# Redis connection string; defaults to a local instance when REDIS_URL is unset.
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")
# Install a placeholder key only if OPENAI_API_KEY is not already present.
os.environ.setdefault("OPENAI_API_KEY", "your-api-key-here")
## Basic Coordination
from redis_openai_agents import AgentCoordinator, EventType
# Build the coordinator for session "support-session-001" against REDIS_URL.
coordinator = AgentCoordinator(
    redis_url=REDIS_URL,
    session_id="support-session-001",
)
print(f"Coordinator created for session: {coordinator.session_id}")
## Event Types
# Print each EventType member as "name: value".
print("Available event types:")
for event_type in EventType:
    member_name, member_value = event_type.name, event_type.value
    print(f" - {member_name}: {member_value}")
## Publishing Events
# Publish a handoff event
await coordinator.publish(EventType.HANDOFF_READY, {
"from_agent": "triage",
"to_agent": "billing_specialist",
"context": {
"issue_type": "refund",
"order_id": "ORD-12345",
"customer_id": "CUST-789"
},
"conversation_summary": "Customer requesting refund for order ORD-12345"
})
print("Handoff event published")
# Publish agent status
await coordinator.publish(EventType.AGENT_READY, {
"agent_id": "billing_specialist",
"capabilities": ["refunds", "invoices", "subscriptions"],
"load": 0.3
})
print("Agent ready event published")
## Multi-Agent System Example
from agents import Agent, Runner
# Three agents: one router (triage) and two specialists.
# Instructions are written as adjacent string literals; the embedded "\n"
# keeps the same two-line text each prompt had before.
triage_agent = Agent(
    name="triage",
    instructions=(
        "You are a triage agent. Analyze the customer's request and\n"
        "determine which specialist should handle it. Categories: billing, technical, sales."
    ),
)

billing_agent = Agent(
    name="billing_specialist",
    instructions=(
        "You are a billing specialist. Handle refunds, invoices,\n"
        "and subscription issues. Be empathetic and solution-oriented."
    ),
)

tech_agent = Agent(
    name="tech_specialist",
    instructions=(
        "You are a technical support specialist. Help troubleshoot\n"
        "issues and provide step-by-step solutions."
    ),
)
async def triage_and_handoff(query: str, session_id: str):
    """Classify *query* with the triage agent and announce the handoff.

    Runs ``triage_agent`` on the query, picks a specialist name from keywords
    in the lower-cased reply, publishes a ``HANDOFF_READY`` event through a
    coordinator created for ``session_id``, and returns the specialist name.
    """
    coord = AgentCoordinator(session_id=session_id, redis_url=REDIS_URL)

    # Ask the triage agent for a category.
    prompt = f"Analyze this request and respond with the category (billing/technical/sales): {query}"
    result = await Runner.run(triage_agent, input=prompt)

    # The first keyword found in the reply wins; anything else goes to sales.
    reply = result.final_output.lower()
    routes = (
        ("billing", "billing_specialist"),
        ("technical", "tech_specialist"),
    )
    target = next(
        (specialist for keyword, specialist in routes if keyword in reply),
        "sales_specialist",
    )

    # Announce the handoff together with the original query and raw reply.
    handoff = {
        "from_agent": "triage",
        "to_agent": target,
        "query": query,
        "triage_result": result.final_output,
    }
    await coord.publish(EventType.HANDOFF_READY, handoff)
    return target
# Test triage
target = await triage_and_handoff(
"I was charged twice for my subscription last month",
"session-demo-001"
)
print(f"Handoff to: {target}")
## Subscribing to Events
async def specialist_listener(agent_name: str, session_id: str):
    """Wait for a ``HANDOFF_READY`` event addressed to *agent_name*.

    Subscribes through a coordinator created for ``session_id`` and returns
    the data of the first handoff event whose ``to_agent`` equals
    ``agent_name``. Returns ``None`` if the subscription ends without one.
    """
    coord = AgentCoordinator(session_id=session_id, redis_url=REDIS_URL)
    print(f"{agent_name} listening for events...")

    # Demo uses a 5 second timeout; a production listener would run continuously.
    async for event in coord.subscribe(timeout=5.0):
        is_handoff_for_me = (
            event.type == EventType.HANDOFF_READY
            and event.data.get("to_agent") == agent_name
        )
        if is_handoff_for_me:
            print(f"\n{agent_name} received handoff!")
            print(f" From: {event.data.get('from_agent')}")
            print(f" Query: {event.data.get('query')}")
            return event.data

    # The subscription finished without a matching handoff.
    print(f"{agent_name} timeout - no events received")
    return None
# Simulate specialist listening (with timeout)
result = await specialist_listener("billing_specialist", "session-demo-001")
## Atomic Operations
from redis_openai_agents import AtomicOperations
# Create atomic operations handler
atomic = AtomicOperations(redis_url=REDIS_URL)
# Atomic handoff - ensures only one agent can claim a handoff
claimed = await atomic.claim_handoff(
session_id="session-demo-001",
agent_id="billing_specialist"
)
if claimed:
print("Handoff claimed successfully!")
else:
print("Handoff already claimed by another agent")
# Try to claim again (should fail)
claimed_again = await atomic.claim_handoff(
session_id="session-demo-001",
agent_id="tech_specialist" # Different agent
)
print(f"Second claim attempt: {claimed_again}")
# Release handoff
await atomic.release_handoff(session_id="session-demo-001")
print("Handoff released")
## State Management
# Set shared state
await coordinator.set_state("current_agent", "billing_specialist")
await coordinator.set_state("issue_status", "in_progress")
await coordinator.set_state("resolution", None)
# Get state
current = await coordinator.get_state("current_agent")
print(f"Current agent: {current}")
# Get all state
all_state = await coordinator.get_all_state()
print(f"All state: {all_state}")
## Best Practices
### 1. Use Unique Session IDs
# Join the user id, conversation id and timestamp with colons.
# user_id, conversation_id and timestamp must already be defined.
session_id = f"{user_id}:{conversation_id}:{timestamp}"
### 2. Include Context in Handoffs
await coordinator.publish(EventType.HANDOFF_READY, {
"from_agent": "triage",
"to_agent": "specialist",
"context": {
"conversation_history": messages,
"customer_info": customer_data,
"intent": detected_intent
}
})
### 3. Handle Timeouts
async for event in coordinator.subscribe(timeout=30.0):
# Handle event
pass
else:
# Timeout - escalate or fallback
await escalate_to_human()
## Cleanup
# Clean up
await coordinator.clear()
await atomic.clear()
print("Coordination state cleared!")