Token Streaming#
This guide covers reliable, replayable token streaming using Redis Streams.
Overview#
Redis Streams provide:
Persistence - Events are stored and can be replayed
Consumer Groups - Multiple consumers can process events
Acknowledgment - Guaranteed delivery with message acknowledgment
Recovery - Resume from any position after disconnection
# Environment setup: supply a placeholder API key when none is configured,
# and resolve the Redis connection string (defaults to a local instance).
import os
import asyncio

os.environ.setdefault("OPENAI_API_KEY", "your-api-key-here")
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")
Basic Streaming#
from redis_openai_agents import RedisStreamTransport

# A stream transport wraps one named Redis Stream and handles event publishing.
transport = RedisStreamTransport(
    redis_url=REDIS_URL,
    stream_name="demo-stream",
)
print(f"Stream transport created: {transport.stream_name}")
Publishing Events#
# Publish token events
tokens = ["Hello", " ", "world", "!", " ", "How", " ", "are", " ", "you", "?"]
for token in tokens:
await transport.publish({
"type": "token",
"data": {"text": token}
})
# Signal completion
await transport.publish({
"type": "complete",
"data": {"total_tokens": len(tokens)}
})
print(f"Published {len(tokens)} tokens")
Consuming Events#
from redis_openai_agents import RobustStreamProcessor
# Create a consumer
processor = RobustStreamProcessor(
stream_name="demo-stream",
consumer_group="demo-consumers",
consumer_name="consumer-1",
redis_url=REDIS_URL
)
# Process events
print("Received: ", end="")
async for event in processor.process():
if event["type"] == "token":
print(event["data"]["text"], end="", flush=True)
elif event["type"] == "complete":
print(f"\n\nStream complete! Total tokens: {event['data']['total_tokens']}")
break
Streaming Agent Output#
from redis_openai_agents import StreamingEventPublisher
from agents import Agent, Runner

# A minimal agent plus a dedicated stream for its output.
agent = Agent(
    name="streaming-agent",
    instructions="You are a helpful assistant. Give concise responses.",
)

agent_stream = RedisStreamTransport(
    redis_url=REDIS_URL,
    stream_name="agent-output-demo",
)
# Run agent with streaming
async def run_agent_streaming(query: str):
    """Run the agent on *query* and republish its token stream to Redis.

    Publishes one "token" event per streamed text chunk, followed by a
    single "complete" event carrying the final output.
    """
    # NOTE(review): confirm this streaming API against the agents SDK in use —
    # recent SDK versions expose Runner.run_streamed(...) / stream_events()
    # rather than a stream=True flag on Runner.run.
    result = await Runner.run(agent, input=query, stream=True)

    # Forward each streamed chunk to the Redis stream as it arrives.
    async for chunk in result.stream:
        await agent_stream.publish({"type": "token", "data": {"text": chunk.text}})

    # Signal completion
    await agent_stream.publish({"type": "complete", "data": {"final_output": result.final_output}})
Multiple Consumers#
Consumer groups allow multiple consumers to process events in parallel.
# Create multiple consumers for load balancing
async def create_consumer(name: str):
processor = RobustStreamProcessor(
stream_name="shared-stream",
consumer_group="workers",
consumer_name=name,
redis_url=REDIS_URL
)
return processor
# Each consumer will get different messages
consumer1 = await create_consumer("worker-1")
consumer2 = await create_consumer("worker-2")
print("Created 2 consumers in 'workers' group")
Resumable Streaming#
from redis_openai_agents import ResumableStreamRunner
# Create a resumable runner
runner = ResumableStreamRunner(
stream_name="resumable-stream",
redis_url=REDIS_URL
)
# Start streaming with checkpoints
session_id = "session-123"
# Publish with checkpoints
for i, token in enumerate(["Part", " ", "1", ".", " ", "Part", " ", "2", "."]):
await runner.publish(
session_id=session_id,
event={"type": "token", "data": {"text": token}},
checkpoint=i # Save position
)
print("Published with checkpoints")
# Resume from checkpoint (e.g., after reconnection)
last_checkpoint = 4 # Simulate where we left off
print(f"Resuming from checkpoint {last_checkpoint}:")
async for event in runner.resume(session_id=session_id, from_checkpoint=last_checkpoint):
if event["type"] == "token":
print(event["data"]["text"], end="", flush=True)
print()
Event Types#
Standard event types for agent streaming:
| Type | Purpose |
|---|---|
| `token` | Text token from LLM |
| `tool_call` | Agent calling a tool |
| `tool_result` | Tool execution result |
| `handoff` | Agent handoff event |
| `error` | Error occurred |
| `complete` | Stream finished |
# Example: Full agent event stream
event_stream = RedisStreamTransport(
stream_name="full-agent-stream",
redis_url=REDIS_URL
)
# Simulate agent events
events = [
{"type": "token", "data": {"text": "Let me search for that..."}},
{"type": "tool_call", "data": {"tool": "search", "args": {"query": "Redis"}}},
{"type": "tool_result", "data": {"tool": "search", "result": "Found 5 results"}},
{"type": "token", "data": {"text": "Based on the search results..."}},
{"type": "complete", "data": {"status": "success"}}
]
for event in events:
await event_stream.publish(event)
print(f"Published: {event['type']}")
Best Practices#
1. Use Unique Stream Names#
# Namespace streams per session and per run so concurrent runs never collide.
# (session_id / run_id are placeholders supplied by the caller.)
stream_name = f"agent-output:{session_id}:{run_id}"
2. Set Stream TTL#
# Cap retained events so the stream cannot grow without bound.
transport = RedisStreamTransport(
    stream_name="...",
    redis_url=REDIS_URL,
    max_len=1000  # Keep last 1000 events
)
3. Handle Errors Gracefully#
# Surface consumer failures to downstream readers as an "error" event
# instead of failing silently.
try:
    async for event in processor.process():
        handle_event(event)
except StreamError as e:
    # Republish the failure so stream consumers can react to it.
    await transport.publish({"type": "error", "data": {"message": str(e)}})
Cleanup#
# Clean up streams
await transport.clear()
await agent_stream.clear()
await event_stream.clear()
print("Streams cleaned up!")