Agent Coordination#

This guide covers multi-agent coordination using Redis pub/sub and atomic operations.

Overview#

AgentCoordinator enables:

  • Real-time handoffs between agents

  • State synchronization across distributed agents

  • Event-driven communication via pub/sub

import os
import asyncio
os.environ.setdefault("OPENAI_API_KEY", "your-api-key-here")

REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379")

Basic Coordination#

from redis_openai_agents import AgentCoordinator, EventType

# Create a coordinator for a session
coordinator = AgentCoordinator(
    session_id="support-session-001",
    redis_url=REDIS_URL
)

print(f"Coordinator created for session: {coordinator.session_id}")

Event Types#

# Available event types
print("Available event types:")
for event_type in EventType:
    print(f"  - {event_type.name}: {event_type.value}")

Publishing Events#

# Publish a handoff event
await coordinator.publish(EventType.HANDOFF_READY, {
    "from_agent": "triage",
    "to_agent": "billing_specialist",
    "context": {
        "issue_type": "refund",
        "order_id": "ORD-12345",
        "customer_id": "CUST-789"
    },
    "conversation_summary": "Customer requesting refund for order ORD-12345"
})

print("Handoff event published")
# Publish agent status
await coordinator.publish(EventType.AGENT_READY, {
    "agent_id": "billing_specialist",
    "capabilities": ["refunds", "invoices", "subscriptions"],
    "load": 0.3
})

print("Agent ready event published")

Multi-Agent System Example#

from agents import Agent, Runner

# Define specialized agents
triage_agent = Agent(
    name="triage",
    instructions="""You are a triage agent. Analyze the customer's request and 
    determine which specialist should handle it. Categories: billing, technical, sales."""
)

billing_agent = Agent(
    name="billing_specialist",
    instructions="""You are a billing specialist. Handle refunds, invoices, 
    and subscription issues. Be empathetic and solution-oriented."""
)

tech_agent = Agent(
    name="tech_specialist",
    instructions="""You are a technical support specialist. Help troubleshoot 
    issues and provide step-by-step solutions."""
)
async def triage_and_handoff(query: str, session_id: str):
    """Triage a query and handoff to specialist."""
    coord = AgentCoordinator(session_id=session_id, redis_url=REDIS_URL)
    
    # Run triage
    result = await Runner.run(
        triage_agent,
        input=f"Analyze this request and respond with the category (billing/technical/sales): {query}"
    )
    
    # Determine target agent
    category = result.final_output.lower()
    if "billing" in category:
        target = "billing_specialist"
    elif "technical" in category:
        target = "tech_specialist"
    else:
        target = "sales_specialist"
    
    # Signal handoff
    await coord.publish(EventType.HANDOFF_READY, {
        "from_agent": "triage",
        "to_agent": target,
        "query": query,
        "triage_result": result.final_output
    })
    
    return target
# Test triage
target = await triage_and_handoff(
    "I was charged twice for my subscription last month",
    "session-demo-001"
)
print(f"Handoff to: {target}")

Subscribing to Events#

async def specialist_listener(agent_name: str, session_id: str):
    """Listen for handoff events for a specific agent."""
    coord = AgentCoordinator(session_id=session_id, redis_url=REDIS_URL)
    
    print(f"{agent_name} listening for events...")
    
    # In production, this would run continuously
    async for event in coord.subscribe(timeout=5.0):  # 5 second timeout for demo
        if event.type == EventType.HANDOFF_READY:
            if event.data.get("to_agent") == agent_name:
                print(f"\n{agent_name} received handoff!")
                print(f"  From: {event.data.get('from_agent')}")
                print(f"  Query: {event.data.get('query')}")
                return event.data
    
    print(f"{agent_name} timeout - no events received")
    return None
# Simulate specialist listening (with timeout)
result = await specialist_listener("billing_specialist", "session-demo-001")

Atomic Operations#

from redis_openai_agents import AtomicOperations

# Create atomic operations handler
atomic = AtomicOperations(redis_url=REDIS_URL)

# Atomic handoff - ensures only one agent can claim a handoff
claimed = await atomic.claim_handoff(
    session_id="session-demo-001",
    agent_id="billing_specialist"
)

if claimed:
    print("Handoff claimed successfully!")
else:
    print("Handoff already claimed by another agent")
# Try to claim again (should fail)
claimed_again = await atomic.claim_handoff(
    session_id="session-demo-001",
    agent_id="tech_specialist"  # Different agent
)

print(f"Second claim attempt: {claimed_again}")
# Release handoff
await atomic.release_handoff(session_id="session-demo-001")
print("Handoff released")

State Management#

# Set shared state
await coordinator.set_state("current_agent", "billing_specialist")
await coordinator.set_state("issue_status", "in_progress")
await coordinator.set_state("resolution", None)

# Get state
current = await coordinator.get_state("current_agent")
print(f"Current agent: {current}")

# Get all state
all_state = await coordinator.get_all_state()
print(f"All state: {all_state}")

Best Practices#

1. Use Unique Session IDs#

session_id = f"{user_id}:{conversation_id}:{timestamp}"

2. Include Context in Handoffs#

await coordinator.publish(EventType.HANDOFF_READY, {
    "from_agent": "triage",
    "to_agent": "specialist",
    "context": {
        "conversation_history": messages,
        "customer_info": customer_data,
        "intent": detected_intent
    }
})

3. Handle Timeouts#

async for event in coordinator.subscribe(timeout=30.0):
    # Handle event
    pass
else:
    # Timeout - escalate or fallback
    await escalate_to_human()

Cleanup#

# Clean up
await coordinator.clear()
await atomic.clear()

print("Coordination state cleared!")