Token Streaming#

This guide covers reliable, replayable token streaming using Redis Streams.

Overview#

Redis Streams provide:

  • Persistence - Events are stored and can be replayed

  • Consumer Groups - Multiple consumers can process events

  • Acknowledgment - Guaranteed delivery with message acknowledgment

  • Recovery - Resume from any position after disconnection

import os
import asyncio
# Placeholder credential so the examples run; replace with a real key.
os.environ.setdefault("OPENAI_API_KEY", "your-api-key-here")

# Redis connection string; falls back to a local instance if unset.
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379")

Basic Streaming#

from redis_openai_agents import RedisStreamTransport

# Create a stream transport
# Appends published events to the Redis Stream named "demo-stream".
transport = RedisStreamTransport(
    stream_name="demo-stream",
    redis_url=REDIS_URL
)

print(f"Stream transport created: {transport.stream_name}")

Publishing Events#

# Publish token events
tokens = ["Hello", " ", "world", "!", " ", "How", " ", "are", " ", "you", "?"]

for token in tokens:
    await transport.publish({
        "type": "token",
        "data": {"text": token}
    })

# Signal completion
await transport.publish({
    "type": "complete",
    "data": {"total_tokens": len(tokens)}
})

print(f"Published {len(tokens)} tokens")

Consuming Events#

from redis_openai_agents import RobustStreamProcessor

# Create a consumer
processor = RobustStreamProcessor(
    stream_name="demo-stream",
    consumer_group="demo-consumers",
    consumer_name="consumer-1",
    redis_url=REDIS_URL
)

# Process events
print("Received: ", end="")
async for event in processor.process():
    if event["type"] == "token":
        print(event["data"]["text"], end="", flush=True)
    elif event["type"] == "complete":
        print(f"\n\nStream complete! Total tokens: {event['data']['total_tokens']}")
        break

Streaming Agent Output#

from redis_openai_agents import StreamingEventPublisher
from agents import Agent, Runner
# NOTE(review): StreamingEventPublisher is imported but not used in the
# snippets below — confirm whether it is needed here.

# Create agent
agent = Agent(
    name="streaming-agent",
    instructions="You are a helpful assistant. Give concise responses."
)

# Create a new stream for this agent run
# A dedicated stream keeps this run's tokens separate from other output.
agent_stream = RedisStreamTransport(
    stream_name="agent-output-demo",
    redis_url=REDIS_URL
)

# Run agent with streaming
async def run_agent_streaming(query: str):
    result = await Runner.run(agent, input=query, stream=True)
    
    # Publish tokens to stream
    async for event in result.stream:
        await agent_stream.publish({
            "type": "token",
            "data": {"text": event.text}
        })
    
    # Signal completion
    await agent_stream.publish({
        "type": "complete",
        "data": {"final_output": result.final_output}
    })

Multiple Consumers#

Consumer groups allow multiple consumers to process events in parallel.

# Create multiple consumers for load balancing
async def create_consumer(name: str):
    processor = RobustStreamProcessor(
        stream_name="shared-stream",
        consumer_group="workers",
        consumer_name=name,
        redis_url=REDIS_URL
    )
    return processor

# Each consumer will get different messages
consumer1 = await create_consumer("worker-1")
consumer2 = await create_consumer("worker-2")

print("Created 2 consumers in 'workers' group")

Resumable Streaming#

from redis_openai_agents import ResumableStreamRunner

# Create a resumable runner
runner = ResumableStreamRunner(
    stream_name="resumable-stream",
    redis_url=REDIS_URL
)

# Start streaming with checkpoints
session_id = "session-123"

# Publish with checkpoints
for i, token in enumerate(["Part", " ", "1", ".", " ", "Part", " ", "2", "."]):
    await runner.publish(
        session_id=session_id,
        event={"type": "token", "data": {"text": token}},
        checkpoint=i  # Save position
    )

print("Published with checkpoints")
# Resume from checkpoint (e.g., after reconnection)
last_checkpoint = 4  # Simulate where we left off

print(f"Resuming from checkpoint {last_checkpoint}:")
async for event in runner.resume(session_id=session_id, from_checkpoint=last_checkpoint):
    if event["type"] == "token":
        print(event["data"]["text"], end="", flush=True)
print()

Event Types#

Standard event types for agent streaming:

| Type        | Purpose               |
| ----------- | --------------------- |
| token       | Text token from LLM   |
| tool_call   | Agent calling a tool  |
| tool_result | Tool execution result |
| handoff     | Agent handoff event   |
| error       | Error occurred        |
| complete    | Stream finished       |

# Example: Full agent event stream
# Transport for the complete mix of event types an agent run can emit.
event_stream = RedisStreamTransport(
    stream_name="full-agent-stream",
    redis_url=REDIS_URL
)

# Simulate agent events
# A representative run: text, a tool round-trip, more text, completion.
events = [
    {"type": "token", "data": {"text": "Let me search for that..."}},
    {"type": "tool_call", "data": {"tool": "search", "args": {"query": "Redis"}}},
    {"type": "tool_result", "data": {"tool": "search", "result": "Found 5 results"}},
    {"type": "token", "data": {"text": "Based on the search results..."}},
    {"type": "complete", "data": {"status": "success"}}
]

for event in events:
    await event_stream.publish(event)
    print(f"Published: {event['type']}")

Best Practices#

1. Use Unique Stream Names#

# One stream per (session, run) pair keeps runs isolated and replayable.
stream_name = f"agent-output:{session_id}:{run_id}"

2. Cap Stream Length#

# max_len bounds the stream's length — presumably the oldest entries are
# trimmed once the cap is exceeded (Redis XADD MAXLEN semantics); this is
# length-based capping, not a time-based TTL. TODO confirm with the library.
transport = RedisStreamTransport(
    stream_name="...",
    redis_url=REDIS_URL,
    max_len=1000  # Keep last 1000 events
)

3. Handle Errors Gracefully#

# NOTE(review): `handle_event` and `StreamError` are not defined in this
# guide's snippets — presumably StreamError comes from redis_openai_agents;
# confirm and import it where this pattern is used.
try:
    async for event in processor.process():
        handle_event(event)
except StreamError as e:
    # Surface the failure to downstream consumers as an explicit error event.
    await transport.publish({"type": "error", "data": {"message": str(e)}})

Cleanup#

# Clean up streams
# Remove the demo streams created above so reruns start from empty streams.
await transport.clear()
await agent_stream.clear()
await event_stream.clear()

print("Streams cleaned up!")