redis_openai_agents.RankedOperations#
- class RankedOperations(redis_url='redis://localhost:6379', prefix='rank')[source]#
Sorted set-based ranking for agent operations.
Use cases: - Agent success rate leaderboards - Tool effectiveness ranking - LRU session tracking - Rate limit budgets
Example
>>> ranking = RankedOperations(redis_url="redis://localhost:6379") >>> await ranking.initialize() >>> await ranking.record_agent_success("agent_1", "research", True, 150.0) >>> top_agents = await ranking.get_best_agents("research", limit=5)
Initialize RankedOperations.
- Parameters:
redis_url (str) – Redis connection URL
prefix (str) – Key prefix for all sorted sets
- __init__(redis_url='redis://localhost:6379', prefix='rank')[source]#
Initialize RankedOperations.
- Parameters:
redis_url (str) – Redis connection URL
prefix (str) – Key prefix for all sorted sets
- Return type:
None
Methods
__init__([redis_url, prefix])Initialize RankedOperations.
check_token_budget(user_id, tokens_needed[, ...])Check if user has token budget remaining.
close()Close the Redis connection.
evict_stale_sessions([max_age_seconds, ...])Evict sessions not accessed within max_age.
get_best_agents(task_type[, limit])Get top-performing agents for task type.
get_best_tools([limit])Get top-performing tools.
get_stale_sessions([max_age_seconds, limit])Get sessions not accessed within max_age.
get_token_usage(user_id[, budget_per_hour])Get token usage statistics for user.
initialize()Initialize the Redis connection.
record_agent_success(agent_id, task_type, ...)Update agent performance score.
record_tool_success(tool_name, success, ...)Update tool effectiveness score.
touch_session(session_id)Mark session as recently used.