How to manage conversation history#
One of the most common uses for persistence is keeping track of conversation history, which makes it easy to continue conversations. As conversations get longer, however, the history builds up and consumes more and more of the context window. This is often undesirable: it leads to more expensive and slower LLM calls, and potentially to calls that error out entirely. To prevent this, you need to manage the conversation history properly.
Note: this guide focuses on how to do this in LangGraph, where you can fully customize how it's done. If you want a more off-the-shelf solution, look into the message trimming and filtering functionality provided in LangChain, such as the `trim_messages` utility (a sketch of which appears at the end of this guide).
Setup#
First, let's set up the packages we're going to use:
%%capture --no-stderr
%pip install --quiet -U langgraph langchain_anthropic
Next, we need to set the API key for Anthropic (the LLM provider we will use):
import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("ANTHROPIC_API_KEY")
Build the agent#
Let's now build a simple ReAct-style agent.
from langchain_anthropic import ChatAnthropic
from langchain_core.tools import tool
from langgraph.checkpoint.redis import RedisSaver
from langgraph.graph import MessagesState, StateGraph, START, END
from langgraph.prebuilt import ToolNode
# Set up Redis connection for checkpointer
REDIS_URI = "redis://redis:6379"
memory = None
with RedisSaver.from_conn_string(REDIS_URI) as cp:
    cp.setup()  # create the Redis indices the checkpointer needs
    memory = cp
@tool
def search(query: str):
    """Call to surf the web."""
    # This is a placeholder for the actual implementation
    # Don't let the LLM know this though 😊
    return "It's sunny in San Francisco, but you better look out if you're a Gemini 😈."
tools = [search]
tool_node = ToolNode(tools)
model = ChatAnthropic(model_name="claude-3-haiku-20240307")
bound_model = model.bind_tools(tools)
def should_continue(state: MessagesState):
    """Return the next node to execute."""
    last_message = state["messages"][-1]
    # If there is no tool call, then we finish
    if not last_message.tool_calls:
        return END
    # Otherwise if there is, we continue
    return "action"
# Define the function that calls the model
def call_model(state: MessagesState):
    response = bound_model.invoke(state["messages"])
    # We return a list, because this will get added to the existing list
    return {"messages": [response]}
# Define a new graph
workflow = StateGraph(MessagesState)
# Define the two nodes we will cycle between
workflow.add_node("agent", call_model)
workflow.add_node("action", tool_node)
# Set the entrypoint as `agent`
# This means that this node is the first one called
workflow.add_edge(START, "agent")
# We now add a conditional edge
workflow.add_conditional_edges(
    # First, we define the start node. We use `agent`.
    # This means these are the edges taken after the `agent` node is called.
    "agent",
    # Next, we pass in the function that will determine which node is called next.
    should_continue,
    # Next, we pass in the path map - all the possible nodes this edge could go to
    ["action", END],
)
# We now add a normal edge from `tools` to `agent`.
# This means that after `tools` is called, `agent` node is called next.
workflow.add_edge("action", "agent")
# Finally, we compile it!
# This compiles it into a LangChain Runnable,
# meaning you can use it as you would any other runnable
app = workflow.compile(checkpointer=memory)
00:43:40 langgraph.checkpoint.redis INFO Redis client is a standalone client
00:43:40 redisvl.index.index INFO Index already exists, not overwriting.
00:43:40 redisvl.index.index INFO Index already exists, not overwriting.
00:43:40 redisvl.index.index INFO Index already exists, not overwriting.
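To sanity-check the wiring before running anything, you can optionally render the compiled graph. A quick sketch using LangGraph's `get_graph` (PNG rendering uses the mermaid.ink service by default):

from IPython.display import Image, display

# Renders: START -> agent -> (action | END), with action looping back to agent
display(Image(app.get_graph().draw_mermaid_png()))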
from langchain_core.messages import HumanMessage
config = {"configurable": {"thread_id": "2"}}
input_message = HumanMessage(content="hi! I'm bob")
for event in app.stream({"messages": [input_message]}, config, stream_mode="values"):
    event["messages"][-1].pretty_print()
input_message = HumanMessage(content="what's my name?")
for event in app.stream({"messages": [input_message]}, config, stream_mode="values"):
    event["messages"][-1].pretty_print()
================================ Human Message =================================
hi! I'm bob
00:43:41 httpx INFO HTTP Request: POST https://api.anthropic.com/v1/messages "HTTP/1.1 200 OK"
================================== Ai Message ==================================
Okay, got it! It's nice to meet you Bob. Thanks for clarifying your name. How are you doing today?
================================ Human Message =================================
what's my name?
00:43:41 httpx INFO HTTP Request: POST https://api.anthropic.com/v1/messages "HTTP/1.1 200 OK"
================================== Ai Message ==================================
Based on our conversation, your name is Bob. You've told me twice now that your name is Bob, so I will refer to you as Bob going forward.
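Every turn on this thread appends to the checkpointed state, which is exactly how history builds up. As a quick check (using LangGraph's `get_state`), you can inspect how the message list grows:

# The latest checkpoint for this thread holds the full message list
snapshot = app.get_state(config)
print(len(snapshot.values["messages"]))  # grows with every turn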
Filtering messages#
The most straightforward way to prevent conversation history from blowing up is to filter the list of messages before they are passed to the LLM. This involves two parts: defining a function to filter messages, and adding it to the graph. See the example below, which defines a very simple filter_messages function and then uses it.
from langchain_anthropic import ChatAnthropic
from langchain_core.tools import tool
from langgraph.checkpoint.redis import RedisSaver
from langgraph.graph import MessagesState, StateGraph, START, END
from langgraph.prebuilt import ToolNode
# Set up Redis connection for checkpointer
REDIS_URI = "redis://redis:6379"
memory = None
with RedisSaver.from_conn_string(REDIS_URI) as cp:
    cp.setup()  # create the Redis indices the checkpointer needs
    memory = cp
@tool
def search(query: str):
    """Call to surf the web."""
    # This is a placeholder for the actual implementation
    # Don't let the LLM know this though 😊
    return "It's sunny in San Francisco, but you better look out if you're a Gemini 😈."
tools = [search]
tool_node = ToolNode(tools)
model = ChatAnthropic(model_name="claude-3-haiku-20240307")
bound_model = model.bind_tools(tools)
def should_continue(state: MessagesState):
    """Return the next node to execute."""
    last_message = state["messages"][-1]
    # If there is no tool call, then we finish
    if not last_message.tool_calls:
        return END
    # Otherwise if there is, we continue
    return "action"
def filter_messages(messages: list):
    # This is a very simple helper function that only ever uses the last message
    return messages[-1:]
# Define the function that calls the model
def call_model(state: MessagesState):
    messages = filter_messages(state["messages"])
    response = bound_model.invoke(messages)
    # We return a list, because this will get added to the existing list
    return {"messages": [response]}
# Define a new graph
workflow = StateGraph(MessagesState)
# Define the two nodes we will cycle between
workflow.add_node("agent", call_model)
workflow.add_node("action", tool_node)
# Set the entrypoint as `agent`
# This means that this node is the first one called
workflow.add_edge(START, "agent")
# We now add a conditional edge
workflow.add_conditional_edges(
    # First, we define the start node. We use `agent`.
    # This means these are the edges taken after the `agent` node is called.
    "agent",
    # Next, we pass in the function that will determine which node is called next.
    should_continue,
    # Next, we pass in the path map - all the possible nodes this edge could go to
    ["action", END],
)
# We now add a normal edge from `tools` to `agent`.
# This means that after `tools` is called, `agent` node is called next.
workflow.add_edge("action", "agent")
# Finally, we compile it!
# This compiles it into a LangChain Runnable,
# meaning you can use it as you would any other runnable
app = workflow.compile(checkpointer=memory)
00:43:41 langgraph.checkpoint.redis INFO Redis client is a standalone client
00:43:41 redisvl.index.index INFO Index already exists, not overwriting.
00:43:41 redisvl.index.index INFO Index already exists, not overwriting.
00:43:41 redisvl.index.index INFO Index already exists, not overwriting.
from langchain_core.messages import HumanMessage
config = {"configurable": {"thread_id": "2"}}
input_message = HumanMessage(content="hi! I'm bob")
for event in app.stream({"messages": [input_message]}, config, stream_mode="values"):
    event["messages"][-1].pretty_print()
# This will now not remember the previous messages
# (because `filter_messages` keeps only the last message)
input_message = HumanMessage(content="what's my name?")
for event in app.stream({"messages": [input_message]}, config, stream_mode="values"):
    event["messages"][-1].pretty_print()
================================ Human Message =================================
hi! I'm bob
00:43:42 httpx INFO HTTP Request: POST https://api.anthropic.com/v1/messages "HTTP/1.1 200 OK"
================================== Ai Message ==================================
It's nice to meet you, Bob! As an AI assistant, I'm here to help you with any questions or tasks you may have. Please feel free to ask me anything, and I'll do my best to assist you.
================================ Human Message =================================
what's my name?
00:43:43 httpx INFO HTTP Request: POST https://api.anthropic.com/v1/messages "HTTP/1.1 200 OK"
================================== Ai Message ==================================
I'm afraid I don't actually know your name. As an AI assistant, I don't have any specific information about you or your name unless you provide that to me directly.
In the above example we defined the filter_messages function ourselves. LangChain also provides off-the-shelf trim_messages and filter_messages utilities in langchain_core.messages that cover the common cases.
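As a minimal sketch, here is how you might swap the hand-rolled filter for trim_messages, keeping only the most recent messages that fit under a token budget (the specific parameters shown, like max_tokens=1000, are illustrative, not recommendations):

from langchain_core.messages import trim_messages

def filter_messages(messages: list):
    # Keep the most recent messages that fit in the token budget,
    # starting on a human message so the trimmed sequence stays valid.
    return trim_messages(
        messages,
        strategy="last",
        token_counter=model,  # the ChatAnthropic model can count tokens
        max_tokens=1000,  # illustrative budget; tune for your model
        start_on="human",
        include_system=True,
    )

This drops into call_model exactly where the hand-rolled filter_messages did, so the rest of the graph stays unchanged.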