# Source code for langgraph.checkpoint.redis.message_exporter
"""Message exporter for extracting conversation messages from checkpoints."""
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Protocol
import orjson
[docs]
class MessageRecipe(Protocol):
    """Structural interface for objects that turn a message into a dict.

    Any object exposing a compatible ``extract`` method can be used as a
    recipe, which is how custom message formats are supported.
    """

    def extract(self, message: Any) -> Optional[Dict[str, Any]]:
        """Convert a single message into structured data.

        Args:
            message: The message to convert.

        Returns:
            A dict containing at least the ``'role'`` and ``'content'`` keys,
            or ``None`` when the message cannot be converted.
        """
        ...
[docs]
class LangChainRecipe:
    """Default recipe for extracting LangChain messages.

    Inputs are recognised in this order:

    1. ``langchain_core`` ``BaseMessage`` instances (only when
       ``langchain_core`` can be imported).
    2. Dicts in serialized form: a truthy ``"lc"`` key and
       ``"type" == "constructor"``.
    3. Dicts that already contain ``"role"`` and ``"content"`` keys.
    4. Plain strings.

    Every other input yields ``None``.
    """

    def extract(self, message: Any) -> Optional[Dict[str, Any]]:
        """Extract data from LangChain message objects.

        Args:
            message: A ``BaseMessage``, a dict, or a string.

        Returns:
            A dict with at least ``'role'`` and ``'content'`` keys, or
            ``None`` if the message has an unsupported shape. For dicts that
            already have ``'role'`` and ``'content'``, a shallow copy is
            returned so callers can add keys without modifying the input.
        """
        try:
            from langchain_core.messages import BaseMessage
        except ImportError:
            # langchain_core not available; fall through to dict/str handling.
            pass
        else:
            if isinstance(message, BaseMessage):
                # Handle actual message objects.
                type_name = message.__class__.__name__
                return {
                    "role": type_name.replace("Message", "").lower(),
                    "content": message.content,
                    "type": type_name,
                    "id": getattr(message, "id", None),
                    "metadata": {
                        "name": getattr(message, "name", None),
                        "tool_calls": getattr(message, "tool_calls", None),
                        "additional_kwargs": getattr(
                            message, "additional_kwargs", {}
                        ),
                    },
                }

        if isinstance(message, dict):
            # Handle serialized LangChain format.
            if message.get("lc") and message.get("type") == "constructor":
                kwargs = message.get("kwargs")
                if not isinstance(kwargs, dict):
                    # Missing or malformed kwargs: treat as empty rather than
                    # failing on ``.get``.
                    kwargs = {}

                # The last element of the "id" list is used as the type name.
                # An empty list or a non-string tail falls back to "unknown"
                # instead of raising.
                id_path = message.get("id")
                if (
                    isinstance(id_path, list)
                    and id_path
                    and isinstance(id_path[-1], str)
                ):
                    message_type = id_path[-1]
                else:
                    message_type = "unknown"

                return {
                    "role": message_type.replace("Message", "").lower(),
                    "content": kwargs.get("content", ""),
                    "type": message_type,
                    "id": kwargs.get("id"),
                    "metadata": kwargs,
                }

            # Handle simple dict format. Return a copy: callers may annotate
            # the result, and that must not write into the source message.
            if "role" in message and "content" in message:
                return dict(message)

            return None

        if isinstance(message, str):
            # Plain string message.
            return {"role": "unknown", "content": message, "type": "string"}

        return None
[docs]
class MessageExporter:
    """Export messages from Redis checkpoints.

    Checkpoints are read through the wrapped saver. Each entry found under a
    checkpoint's ``channel_values["messages"]`` is converted with the
    configured recipe.
    """

    def __init__(
        self, redis_saver: Any, recipe: Optional["MessageRecipe"] = None
    ) -> None:
        """Store the saver and the recipe used for extraction.

        Args:
            redis_saver: Checkpoint saver; its ``get``, ``get_tuple`` and
                ``list`` methods are called by this class.
            recipe: Object with an ``extract(message)`` method. A
                ``LangChainRecipe`` is created when omitted.
        """
        self.saver = redis_saver
        self.recipe = recipe or LangChainRecipe()

    @staticmethod
    def _checkpoint_messages(checkpoint: Any) -> Any:
        """Return the checkpoint's ``channel_values["messages"]``, or ``[]``.

        A missing or ``None`` ``channel_values`` / ``messages`` entry is
        treated as "no messages" so callers can always iterate the result.
        """
        channel_values = checkpoint.get("channel_values") or {}
        return channel_values.get("messages") or []

    def export(
        self, thread_id: str, checkpoint_id: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Export messages from checkpoint data.

        Args:
            thread_id: The conversation thread ID.
            checkpoint_id: Specific checkpoint ID (latest if None).

        Returns:
            List of extracted message dictionaries; empty when no checkpoint
            is found. Messages the recipe cannot extract are skipped.
        """
        if checkpoint_id:
            config = {
                "configurable": {
                    "thread_id": thread_id,
                    "checkpoint_id": checkpoint_id,
                }
            }
            checkpoint = self.saver.get(config)
        else:
            # No explicit checkpoint: ask the saver for the thread's tuple
            # and unwrap its checkpoint, if any.
            checkpoint_tuple = self.saver.get_tuple(
                {"configurable": {"thread_id": thread_id}}
            )
            checkpoint = checkpoint_tuple.checkpoint if checkpoint_tuple else None

        if not checkpoint:
            return []

        candidates = (
            self.recipe.extract(msg)
            for msg in self._checkpoint_messages(checkpoint)
        )
        return [item for item in candidates if item]

    def export_thread(self, thread_id: str) -> Dict[str, Any]:
        """Export all messages from all checkpoints in a thread.

        Messages appear in the order the saver yields checkpoints. A message
        whose extracted ``"id"`` was already seen is skipped; messages
        without an id are never deduplicated.

        Args:
            thread_id: The conversation thread ID.

        Returns:
            Dict with ``thread_id``, ``messages`` (each annotated with
            ``checkpoint_id`` and ``checkpoint_ts``) and ``export_timestamp``
            (current UTC time in ISO 8601 format).
        """
        messages: List[Dict[str, Any]] = []
        seen_ids = set()

        for checkpoint_tuple in self.saver.list(
            {"configurable": {"thread_id": thread_id}}
        ):
            checkpoint = checkpoint_tuple.checkpoint
            for msg in self._checkpoint_messages(checkpoint):
                extracted = self.recipe.extract(msg)
                if not extracted:
                    continue

                # Deduplicate by message ID if available.
                msg_id = extracted.get("id")
                if msg_id:
                    if msg_id in seen_ids:
                        continue
                    seen_ids.add(msg_id)

                # Annotate a copy: the recipe may hand back the very dict
                # stored in the checkpoint, which must not be modified.
                annotated = dict(extracted)
                annotated["checkpoint_id"] = checkpoint.get("id")
                annotated["checkpoint_ts"] = checkpoint.get("ts")
                messages.append(annotated)

        return {
            "thread_id": thread_id,
            "messages": messages,
            "export_timestamp": datetime.now(timezone.utc).isoformat(),
        }