redis_openai_agents.RankedOperations

class RankedOperations(redis_url='redis://localhost:6379', prefix='rank')

Sorted set-based ranking for agent operations.

Use cases:
  • Agent success rate leaderboards

  • Tool effectiveness ranking

  • LRU session tracking

  • Rate limit budgets
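To make the leaderboard use case concrete, the following is a minimal in-memory sketch of how a sorted set ranks members by score. It does not use Redis or this library; `InMemorySortedSet` and the "+1 per success, -1 per failure" scoring rule are hypothetical stand-ins, since the scoring formula used by RankedOperations is not described here.

```python
from collections import defaultdict


class InMemorySortedSet:
    """Toy stand-in for a Redis sorted set: unique members, each with a float score."""

    def __init__(self) -> None:
        self._scores: dict = defaultdict(float)

    def incr(self, member: str, amount: float) -> float:
        # Comparable to ZINCRBY: add to the member's score, starting from 0 if absent.
        self._scores[member] += amount
        return self._scores[member]

    def top(self, limit: int) -> list:
        # Comparable to ZREVRANGE ... WITHSCORES: highest scores first.
        # Ties are broken by member name here, which may differ from Redis ordering.
        ranked = sorted(self._scores.items(), key=lambda kv: (-kv[1], kv[0]))
        return ranked[:limit]


# Hypothetical scoring rule (an assumption): +1 per success, -1 per failure.
leaderboard = InMemorySortedSet()
outcomes = [("agent_1", True), ("agent_2", False), ("agent_1", True), ("agent_3", True)]
for agent_id, success in outcomes:
    leaderboard.incr(agent_id, 1.0 if success else -1.0)

top_two = leaderboard.top(limit=2)
print(top_two)
```

Because every update only adjusts one member's score, the "best N" query stays a cheap ordered read, which is the property the ranking use cases above rely on.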

Example

>>> ranking = RankedOperations(redis_url="redis://localhost:6379")
>>> await ranking.initialize()
>>> await ranking.record_agent_success("agent_1", "research", True, 150.0)
>>> top_agents = await ranking.get_best_agents("research", limit=5)

__init__(redis_url='redis://localhost:6379', prefix='rank')

Initialize RankedOperations.

Parameters:
  • redis_url (str) – Redis connection URL

  • prefix (str) – Key prefix for all sorted sets

Return type:

None

Methods

__init__([redis_url, prefix])

Initialize RankedOperations.

check_token_budget(user_id, tokens_needed[, ...])

Check whether the user has token budget remaining.

close()

Close the Redis connection.

evict_stale_sessions([max_age_seconds, ...])

Evict sessions not accessed within max_age_seconds.

get_best_agents(task_type[, limit])

Get the top-performing agents for a task type.

get_best_tools([limit])

Get the top-performing tools.

get_stale_sessions([max_age_seconds, limit])

Get sessions not accessed within max_age_seconds.

get_token_usage(user_id[, budget_per_hour])

Get token usage statistics for a user.

initialize()

Initialize the Redis connection.

record_agent_success(agent_id, task_type, ...)

Update an agent's performance score.

record_tool_success(tool_name, success, ...)

Update a tool's effectiveness score.

touch_session(session_id)

Mark a session as recently used.
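The session methods listed above (touch_session, get_stale_sessions, evict_stale_sessions) follow a common sorted-set pattern: store each session's last-access timestamp as its score, then select or remove everything below a cutoff. The sketch below illustrates that pattern in plain Python. `SessionTracker` and its method names are hypothetical and are not this library's API; the explicit `now` argument is an assumption added only to keep the example deterministic.

```python
import time
from typing import Optional


class SessionTracker:
    """Toy LRU tracker: session id -> last-access timestamp (the 'score')."""

    def __init__(self) -> None:
        self._last_access: dict = {}

    def touch(self, session_id: str, now: Optional[float] = None) -> None:
        # Comparable to ZADD key <timestamp> <session_id>.
        self._last_access[session_id] = time.time() if now is None else now

    def stale(self, max_age_seconds: float, now: Optional[float] = None) -> list:
        # Comparable to ZRANGEBYSCORE key -inf <cutoff>: oldest sessions first.
        current = time.time() if now is None else now
        cutoff = current - max_age_seconds
        old = [(ts, sid) for sid, ts in self._last_access.items() if ts <= cutoff]
        return [sid for _, sid in sorted(old)]

    def evict(self, max_age_seconds: float, now: Optional[float] = None) -> int:
        # Comparable to ZREMRANGEBYSCORE: drop stale entries, return the count removed.
        victims = self.stale(max_age_seconds, now)
        for sid in victims:
            del self._last_access[sid]
        return len(victims)

    def sessions(self) -> list:
        # Remaining session ids, sorted by name for stable output.
        return sorted(self._last_access)


tracker = SessionTracker()
tracker.touch("s1", now=1000.0)
tracker.touch("s2", now=1500.0)
tracker.touch("s3", now=1990.0)

# With now=2000 and max_age_seconds=600 the cutoff is 1400, so only s1 is stale.
stale_ids = tracker.stale(max_age_seconds=600, now=2000.0)
evicted = tracker.evict(max_age_seconds=600, now=2000.0)
print(stale_ids, evicted, tracker.sessions())
```

Keeping the timestamp as the score means "least recently used" is simply "lowest score", so no separate access log is needed.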