# Source code for redis_openai_agents.middleware.stack

"""MiddlewareStack - composes middleware around a base Model.

The stack implements the OpenAI Agents SDK ``Model`` interface, so it can be
passed directly to ``Agent(model=stack)`` or selected for a whole run via
``RunConfig(model=stack)``. Middlewares are applied outer-to-inner on the
request path and inner-to-outer on the response path.
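
For example, with ``middlewares=[A, B]`` a call flows::

    request:  A -> B -> inner model
    response: inner model -> B -> A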
"""

from __future__ import annotations

from collections.abc import AsyncIterator, Sequence
from typing import Any

from agents.models.interface import Model

from .base import AgentMiddleware, ModelCallHandler, ModelRequest


class MiddlewareStack(Model):
    """Compose middleware around an OpenAI Agents SDK ``Model``.

    Example::

        base = OpenAIResponsesModel(model="gpt-4o", openai_client=AsyncOpenAI())
        stack = MiddlewareStack(
            model=base,
            middlewares=[SemanticCacheMiddleware(cache), MetricsMiddleware(metrics)],
        )
        agent = Agent(name="assistant", model=stack)
        result = await Runner.run(agent, "Hello")

    Args:
        model: The inner ``Model`` instance that performs the actual LLM call.
        middlewares: Sequence of middleware applied outermost-first.
    """

    def __init__(
        self,
        model: Any,
        middlewares: Sequence[AgentMiddleware],
    ) -> None:
        self._inner = model
        self._middlewares: list[AgentMiddleware] = list(middlewares)

    @property
    def inner(self) -> Any:
        """The wrapped ``Model`` instance."""
        return self._inner

    @property
    def middlewares(self) -> list[AgentMiddleware]:
        """Applied middlewares, in outer-to-inner order."""
        return list(self._middlewares)

    async def get_response(
        self,
        system_instructions: str | None,
        input: Any,
        model_settings: Any,
        tools: list[Any],
        output_schema: Any,
        handoffs: list[Any],
        tracing: Any,
        *,
        previous_response_id: str | None = None,
        conversation_id: str | None = None,
        prompt: Any = None,
    ) -> Any:
        """Run the middleware chain, then the underlying model."""
        request = ModelRequest(
            system_instructions=system_instructions,
            input=input,
            model_settings=model_settings,
            tools=tools,
            output_schema=output_schema,
            handoffs=handoffs,
            tracing=tracing,
            previous_response_id=previous_response_id,
            conversation_id=conversation_id,
            prompt=prompt,
        )

        async def terminal(req: ModelRequest) -> Any:
            # Innermost handler: forwards the (possibly rewritten) request to
            # the wrapped model.
            return await self._inner.get_response(
                system_instructions=req.system_instructions,
                input=req.input,
                model_settings=req.model_settings,
                tools=req.tools,
                output_schema=req.output_schema,
                handoffs=req.handoffs,
                tracing=req.tracing,
                previous_response_id=req.previous_response_id,
                conversation_id=req.conversation_id,
                prompt=req.prompt,
            )

        # Fold right-to-left so the first middleware in the list becomes the
        # outermost layer of the call chain.
        chain: ModelCallHandler = terminal
        for middleware in reversed(self._middlewares):
            chain = _wrap(middleware, chain)
        return await chain(request)

    def stream_response(
        self,
        system_instructions: str | None,
        input: Any,
        model_settings: Any,
        tools: list[Any],
        output_schema: Any,
        handoffs: list[Any],
        tracing: Any,
        *,
        previous_response_id: str | None = None,
        conversation_id: str | None = None,
        prompt: Any = None,
    ) -> AsyncIterator[Any]:
        """Delegate streaming directly to the inner model.

        Streaming middleware is a separate concern (frame-level interception).
        For now, middlewares only see complete responses via ``get_response``.
        """
        iterator: AsyncIterator[Any] = self._inner.stream_response(
            system_instructions=system_instructions,
            input=input,
            model_settings=model_settings,
            tools=tools,
            output_schema=output_schema,
            handoffs=handoffs,
            tracing=tracing,
            previous_response_id=previous_response_id,
            conversation_id=conversation_id,
            prompt=prompt,
        )
        return iterator

    async def close(self) -> None:
        """Close the inner model, if it exposes an awaitable ``close``."""
        close = getattr(self._inner, "close", None)
        if close is not None:
            await close()
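
# A minimal sketch of a custom layer (hypothetical ``LoggingMiddleware``; the
# hook this stack invokes, via ``_wrap`` below, is ``awrap_model_call``):
#
#     class LoggingMiddleware(AgentMiddleware):
#         async def awrap_model_call(
#             self, request: ModelRequest, handler: ModelCallHandler
#         ) -> Any:
#             # Inspect or rewrite ``request``, delegate, then observe the result.
#             response = await handler(request)
#             return response
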
def _wrap(middleware: AgentMiddleware, next_handler: ModelCallHandler) -> ModelCallHandler:
    """Bind one middleware layer around ``next_handler``."""

    async def wrapped(request: ModelRequest) -> Any:
        return await middleware.awrap_model_call(request, next_handler)

    return wrapped
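
# Fold expansion (illustrative): for ``middlewares=[A, B]`` the loop in
# ``get_response`` builds
#
#     chain = _wrap(A, _wrap(B, terminal))
#
# so ``A`` sees the request first and the response last.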