Metrics & Observability#

This guide covers the built-in metrics collection backed by RedisTimeSeries, Prometheus export, and trace storage in Redis Streams.

Overview#

Redis OpenAI Agents provides:

  • AgentMetrics - RedisTimeSeries-based metrics collection

  • PrometheusExporter - Standard Prometheus metrics endpoint

  • RedisTracingProcessor - Trace storage in Redis Streams

import os
import time
os.environ.setdefault("OPENAI_API_KEY", "your-api-key-here")

REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379")
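
AgentMetrics stores its samples in RedisTimeSeries, so the target server needs that module loaded (it ships with Redis Stack). A quick sanity check with plain redis-py; the MODULE LIST reply shape varies across redis-py versions, so this sketch just scans it:

import redis

r = redis.from_url(REDIS_URL, decode_responses=True)
loaded = r.module_list()  # list of module descriptors
assert any("timeseries" in str(m).lower() for m in loaded), "RedisTimeSeries module not loaded"
r.close()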

AgentMetrics#

from redis_openai_agents import AgentMetrics

# Create metrics collector (name is required)
metrics = AgentMetrics(name="my-agent", redis_url=REDIS_URL)

print(f"Metrics collector initialized for '{metrics.name}'")

Recording Metrics#

# Record metrics for agent requests using the unified record() method
metrics.record(latency_ms=150.5, input_tokens=100, output_tokens=50, cache_hit=False)
metrics.record(latency_ms=120.3, input_tokens=200, output_tokens=100, cache_hit=True)
metrics.record(latency_ms=180.7, input_tokens=150, output_tokens=75, cache_hit=False)

print("Recorded 3 agent request metrics")
# You can also record partial metrics
metrics.record(latency_ms=95.2)  # Only latency
metrics.record(input_tokens=50, output_tokens=25)  # Only tokens
metrics.record(cache_hit=True)  # Only cache status

print("Recorded partial metrics")
# Async recording
await metrics.arecord(latency_ms=110.0, input_tokens=80, output_tokens=40, cache_hit=False)

print("Recorded async metric")

Querying Metrics#

# Get aggregated statistics
stats = metrics.get_stats()
print("Metrics Statistics:")
print(f"  Total requests: {stats['count']}")
print(f"  Avg latency: {stats['latency_avg']:.1f} ms")
print(f"  Min latency: {stats['latency_min']:.1f} ms")
print(f"  Max latency: {stats['latency_max']:.1f} ms")
print(f"  Total input tokens: {stats['input_tokens_sum']:.0f}")
print(f"  Total output tokens: {stats['output_tokens_sum']:.0f}")
print(f"  Cache hit rate: {stats['cache_hit_rate']:.1%}")
# Query time range using range() — takes metric name and timestamps in ms
# from_time=0 means "from the beginning", to_time=current time
# time was already imported at the top of this guide; add a second of
# slack so samples written just now fall inside the window
now_ms = int(time.time() * 1000) + 1000
data = metrics.range("latency", from_time=0, to_time=now_ms)
print(f"Latency data points: {len(data)}")
for ts, val in data[:3]:
    print(f"  {ts}: {val:.1f} ms")
# get_stats() is synchronous — there is no async variant
stats = metrics.get_stats()
print(f"Stats - count: {stats['count']}, avg latency: {stats['latency_avg']:.1f} ms")

Integration with Agent Runs#

from agents import Agent, Runner
from redis_openai_agents import with_metrics

agent = Agent(
    name="monitored-agent",
    instructions="You are a helpful assistant."
)

@with_metrics(metrics)
async def run_monitored(query: str):
    """Run agent with automatic metrics collection."""
    result = await Runner.run(agent, input=query)
    return result
# Run with metrics
result = await run_monitored("What is 2+2?")
print(f"Response: {result.final_output}")

# Check updated metrics (get_stats takes no arguments)
stats = metrics.get_stats()
print(f"Total runs recorded: {stats['count']}")

Prometheus Export#

from redis_openai_agents import PrometheusExporter

# Create Prometheus exporter
exporter = PrometheusExporter(metrics)

print("Prometheus exporter configured")
# Get metrics in Prometheus format
prometheus_output = exporter.generate()
print("Prometheus output:")
print(prometheus_output)
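
Prometheus pulls this text over HTTP, and exporter.generate() returns a plain string, so any web server can expose it. Here is a minimal, hypothetical /metrics endpoint using only the standard library (the port and path are conventions, not library requirements):

from http.server import BaseHTTPRequestHandler, HTTPServer

class MetricsHandler(BaseHTTPRequestHandler):
    """Serve the exporter output at /metrics for a Prometheus scraper."""

    def do_GET(self):
        if self.path != "/metrics":
            self.send_error(404)
            return
        body = exporter.generate().encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

# HTTPServer(("0.0.0.0", 9100), MetricsHandler).serve_forever()  # blocks; run in a separate process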

Custom Metrics#

# The record() method accepts all metrics at once
# This is the recommended pattern for recording agent requests
metrics.record(
    latency_ms=200.0,
    input_tokens=300,
    output_tokens=150,
    cache_hit=False,
)

print("Combined metric recorded")

Tracing#

from redis_openai_agents import RedisTracingProcessor

# RedisTracingProcessor implements the OpenAI Agents SDK TracingProcessor interface.
# It receives trace/span events via SDK callbacks — you don't call it directly.
#
# Usage with the SDK:
#   from agents import set_trace_processors
#   tracer = RedisTracingProcessor(redis_url=REDIS_URL)
#   await tracer.initialize()
#   set_trace_processors([tracer])
#   # Now all Agent runs are automatically traced to Redis
#
# The processor buffers events and flushes them to Redis Streams.

tracer = RedisTracingProcessor(
    redis_url=REDIS_URL,
    stream_name="demo_traces",
    buffer_size=10,
    trace_ttl=3600,  # 1 hour TTL for demo
)
await tracer.initialize()

# Simulate trace events (normally the SDK calls these automatically)
from types import SimpleNamespace  # SimpleNamespace lives in types, not unittest.mock

mock_trace = SimpleNamespace(
    trace_id="trace-123",
    name="agent_run",
    started_at=time.time(),
    completed_at=time.time() + 0.15,
    error=None,
)

mock_span = SimpleNamespace(
    trace_id="trace-123",
    span_id="span-1",
    started_at=time.time(),
    completed_at=time.time() + 0.1,
    span_data=SimpleNamespace(type="agent"),
    error=None,
)

# These are the SDK callback methods
tracer.on_trace_start(mock_trace)
tracer.on_span_start(mock_span)
tracer.on_span_end(mock_span)
tracer.on_trace_end(mock_trace)

# Flush buffered events to Redis
await tracer.aforce_flush()

print("Trace events flushed to Redis Stream")
# The tracer stores events in a Redis Stream — you can read them directly
import redis.asyncio as aioredis

r = aioredis.from_url(REDIS_URL, decode_responses=True)
entries = await r.xrange("demo_traces")
print(f"Found {len(entries)} events in the trace stream")
for entry_id, fields in entries[:4]:
    print(f"  {fields.get('event_type', 'unknown')}: trace={fields.get('trace_id', 'N/A')}")
await r.aclose()
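
For long-running agents the stream keeps growing; to inspect just the newest events, read it in reverse with XREVRANGE:

# Fetch the 5 most recent trace events instead of scanning the whole stream
r = aioredis.from_url(REDIS_URL, decode_responses=True)
recent = await r.xrevrange("demo_traces", count=5)
for entry_id, fields in recent:
    print(f"  {entry_id}: {fields.get('event_type', 'unknown')}")
await r.aclose()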

Best Practices#

1. Record All Metrics in One Call#

metrics.record(
    latency_ms=latency,  # your measured latency in milliseconds
    input_tokens=100,
    output_tokens=50,
    cache_hit=False,
)

2. Use the with_metrics Decorator#

@with_metrics(metrics)
async def run_agent(query: str):
    return await Runner.run(agent, input=query)

3. Create Dashboards#

Use Grafana with a Prometheus data source to visualize the following (a quick local percentile check is sketched after this list):

  • Latency percentiles (p50, p95, p99)

  • Token usage over time

  • Cache hit rates

  • Error rates
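
Before wiring up a dashboard, you can sanity-check latency percentiles locally from the raw series; a minimal sketch using the range() API shown earlier, assuming enough samples have been recorded:

import statistics

now_ms = int(time.time() * 1000)
values = [v for _, v in metrics.range("latency", from_time=0, to_time=now_ms)]
if len(values) >= 2:
    q = statistics.quantiles(values, n=100, method="inclusive")
    print(f"p50={q[49]:.1f} ms  p95={q[94]:.1f} ms  p99={q[98]:.1f} ms")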

Cleanup#

# Clean up metrics and tracing
metrics.delete()  # Delete all TimeSeries data
await tracer.close()  # Close the tracing processor connection

print("Metrics deleted and tracer closed!")