# Metrics & Observability

This guide covers built-in metrics collection using RedisTimeSeries and Prometheus export.

## Overview

Redis OpenAI Agents provides:

- `AgentMetrics` — RedisTimeSeries-based metrics collection
- `PrometheusExporter` — standard Prometheus metrics endpoint
- `RedisTracingProcessor` — trace storage in Redis Streams
import os
import time

# Supply fallbacks so the examples run without any prior configuration:
# a placeholder API key and a local Redis instance.
os.environ.setdefault("OPENAI_API_KEY", "your-api-key-here")
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")
## AgentMetrics
from redis_openai_agents import AgentMetrics

# Instantiate the metrics collector; the `name` argument is mandatory.
metrics = AgentMetrics(redis_url=REDIS_URL, name="my-agent")
print(f"Metrics collector initialized for '{metrics.name}'")
## Recording Metrics
# Record metrics for agent requests using the unified record() method
metrics.record(latency_ms=150.5, input_tokens=100, output_tokens=50, cache_hit=False)
metrics.record(latency_ms=120.3, input_tokens=200, output_tokens=100, cache_hit=True)
metrics.record(latency_ms=180.7, input_tokens=150, output_tokens=75, cache_hit=False)
print("Recorded 3 agent request metrics")
# You can also record partial metrics
metrics.record(latency_ms=95.2) # Only latency
metrics.record(input_tokens=50, output_tokens=25) # Only tokens
metrics.record(cache_hit=True) # Only cache status
print("Recorded partial metrics")
# Async recording
await metrics.arecord(latency_ms=110.0, input_tokens=80, output_tokens=40, cache_hit=False)
print("Recorded async metric")
## Querying Metrics
# Get aggregated statistics across everything recorded so far.
stats = metrics.get_stats()
print("Metrics Statistics:")
print(f" Total requests: {stats['count']}")
print(f" Avg latency: {stats['latency_avg']:.1f} ms")
print(f" Min latency: {stats['latency_min']:.1f} ms")
print(f" Max latency: {stats['latency_max']:.1f} ms")
print(f" Total input tokens: {stats['input_tokens_sum']:.0f}")
print(f" Total output tokens: {stats['output_tokens_sum']:.0f}")
print(f" Cache hit rate: {stats['cache_hit_rate']:.1%}")
# Query a time range using range() — takes the metric name and timestamps in ms.
# from_time=0 means "from the beginning"; to_time is the current time.
# NOTE: `time` is already imported at the top of the file, so the redundant
# `import time as _time` alias has been removed.
now_ms = int(time.time() * 1000) + 1000  # +1s cushion so just-written samples are included
data = metrics.range("latency", from_time=0, to_time=now_ms)
print(f"Latency data points: {len(data)}")
for ts, val in data[:3]:
    print(f" {ts}: {val:.1f} ms")
# get_stats() is synchronous — there is no async variant
stats = metrics.get_stats()
print(f"Stats - count: {stats['count']}, avg latency: {stats['latency_avg']:.1f} ms")
## Integration with Agent Runs
from agents import Agent, Runner
from redis_openai_agents import with_metrics
agent = Agent(
name="monitored-agent",
instructions="You are a helpful assistant."
)
@with_metrics(metrics)
async def run_monitored(query: str):
"""Run agent with automatic metrics collection."""
result = await Runner.run(agent, input=query)
return result
# Run with metrics
result = await run_monitored("What is 2+2?")
print(f"Response: {result.final_output}")
# Check updated metrics (get_stats takes no arguments)
stats = metrics.get_stats()
print(f"Total runs recorded: {stats['count']}")
## Prometheus Export
from redis_openai_agents import PrometheusExporter
# Create Prometheus exporter — wraps the AgentMetrics instance created above.
exporter = PrometheusExporter(metrics)
# Generate Prometheus text format output
print("Prometheus exporter configured")
# Get metrics in Prometheus format (suitable for serving at a /metrics endpoint)
prometheus_output = exporter.generate()
print("Prometheus output:")
print(prometheus_output)
## Custom Metrics
# record() accepts every metric in one call — this is the recommended
# pattern for instrumenting a single agent request.
metrics.record(
    cache_hit=False,
    input_tokens=300,
    latency_ms=200.0,
    output_tokens=150,
)
print("Combined metric recorded")
## Tracing
from redis_openai_agents import RedisTracingProcessor
# RedisTracingProcessor implements the OpenAI Agents SDK TracingProcessor interface.
# It receives trace/span events via SDK callbacks — you don't call it directly.
#
# Usage with the SDK:
# from agents import set_trace_processors
# tracer = RedisTracingProcessor(redis_url=REDIS_URL)
# await tracer.initialize()
# set_trace_processors([tracer])
# # Now all Agent runs are automatically traced to Redis
#
# The processor buffers events and flushes them to Redis Streams.
tracer = RedisTracingProcessor(
redis_url=REDIS_URL,
stream_name="demo_traces",
buffer_size=10,
trace_ttl=3600, # 1 hour TTL for demo
)
await tracer.initialize()
# Simulate trace events (normally the SDK calls these automatically).
# BUG FIX: SimpleNamespace lives in the stdlib `types` module, not in
# `unittest.mock` — the original `from unittest.mock import SimpleNamespace`
# raises ImportError.
from types import SimpleNamespace

# A fake trace representing one agent run (~150 ms wall time).
mock_trace = SimpleNamespace(
    trace_id="trace-123",
    name="agent_run",
    started_at=time.time(),
    completed_at=time.time() + 0.15,
    error=None,
)
# A fake span inside that trace; span_data.type marks it as an "agent" span.
mock_span = SimpleNamespace(
    trace_id="trace-123",
    span_id="span-1",
    started_at=time.time(),
    completed_at=time.time() + 0.1,
    span_data=SimpleNamespace(type="agent"),
    error=None,
)
# These are the SDK callback methods
tracer.on_trace_start(mock_trace)
tracer.on_span_start(mock_span)
tracer.on_span_end(mock_span)
tracer.on_trace_end(mock_trace)
# Flush buffered events to Redis
await tracer.aforce_flush()
print("Trace events flushed to Redis Stream")
# The tracer stores events in a Redis Stream — you can read them directly
import redis.asyncio as aioredis
r = aioredis.from_url(REDIS_URL, decode_responses=True)
entries = await r.xrange("demo_traces")
print(f"Found {len(entries)} events in the trace stream")
for entry_id, fields in entries[:4]:
print(f" {fields.get('event_type', 'unknown')}: trace={fields.get('trace_id', 'N/A')}")
await r.aclose()
## Best Practices

### 1. Record All Metrics in One Call
# One record() call captures the full request: latency, tokens, cache status.
metrics.record(
    cache_hit=False,
    input_tokens=100,
    latency_ms=latency,
    output_tokens=50,
)
### 2. Use the with_metrics Decorator
@with_metrics(metrics)
async def run_agent(query: str):
    # Latency/token metrics are captured automatically by the decorator.
    outcome = await Runner.run(agent, input=query)
    return outcome
### 3. Create Dashboards

Use Grafana with a Prometheus datasource to visualize:

- Latency percentiles (p50, p95, p99)
- Token usage over time
- Cache hit rates
- Error rates
## Cleanup
# Clean up metrics and tracing created in the examples above.
metrics.delete() # Delete all TimeSeries data
await tracer.close() # Close the tracing processor connection
print("Metrics deleted and tracer closed!")