Closed
Description
The `aget_tuple` function in `AsyncRedisSaver` does not return the correct `checkpoint_id` unless it is specified in the `RunnableConfig`. If you call `aget_tuple` with a runnable config that contains only `thread_id` and `checkpoint_ns`, the returned `checkpoint_id` will always be `None`. `RedisSaver` does not have this problem; its `checkpoint_id` appears to always be set.
Replication
import asyncio
import uuid
from langgraph.checkpoint.base import empty_checkpoint
from langgraph.checkpoint.redis import RedisSaver
from langgraph.checkpoint.redis.aio import AsyncRedisSaver
def sync_checkpointer():
    """Store three checkpoints via ``RedisSaver`` and print the id ``get_tuple`` reports.

    After every ``put`` the tuple is fetched with a config that carries only
    ``thread_id`` and ``checkpoint_ns``; the ``checkpoint_id`` found in the
    returned tuple's config is printed for that run.
    """
    thread_id = str(uuid.uuid4())
    checkpoint_ns = ""
    # The lookup config intentionally has no "checkpoint_id" entry.
    lookup_config = {
        "configurable": {"thread_id": thread_id, "checkpoint_ns": checkpoint_ns}
    }

    with RedisSaver.from_conn_string("redis://localhost:6379") as saver:
        saver.setup()
        for run in range(3):
            # Each write names its checkpoint explicitly after the loop index.
            write_config = {
                "configurable": {
                    "thread_id": thread_id,
                    "checkpoint_id": str(run),
                    "checkpoint_ns": checkpoint_ns,
                }
            }
            checkpoint = empty_checkpoint()
            metadata = {"source": "loop", "step": run, "writes": {}}
            saver.put(write_config, checkpoint, metadata, {})

            latest = saver.get_tuple(lookup_config)
            reported_id = latest.config["configurable"]["checkpoint_id"]
            print(f"Get tuple ({run}): {reported_id}")
async def async_checkpointer():
    """Store three checkpoints via ``AsyncRedisSaver`` and print the id ``aget_tuple`` reports.

    After every ``aput`` the tuple is fetched with a config that carries only
    ``thread_id`` and ``checkpoint_ns``; the ``checkpoint_id`` found in the
    returned tuple's config is printed for that run.
    """
    thread_id = str(uuid.uuid4())
    checkpoint_ns = ""
    # The lookup config intentionally has no "checkpoint_id" entry.
    lookup_config = {
        "configurable": {"thread_id": thread_id, "checkpoint_ns": checkpoint_ns}
    }

    async with AsyncRedisSaver.from_conn_string("redis://localhost:6379") as saver:
        await saver.asetup()
        for run in range(3):
            # Each write names its checkpoint explicitly after the loop index.
            write_config = {
                "configurable": {
                    "thread_id": thread_id,
                    "checkpoint_id": str(run),
                    "checkpoint_ns": checkpoint_ns,
                }
            }
            checkpoint = empty_checkpoint()
            metadata = {"source": "loop", "step": run, "writes": {}}
            await saver.aput(write_config, checkpoint, metadata, {})

            latest = await saver.aget_tuple(lookup_config)
            reported_id = latest.config["configurable"]["checkpoint_id"]
            print(f"Get tuple ({run}): {reported_id}")
if __name__ == "__main__":
    # Run the synchronous saver first so its output can be compared with the
    # asynchronous saver's output printed below the separator.
    print("Sync checkpointer:")
    sync_checkpointer()
    # One call emits both the separator and the next header, one per line.
    print("=======================================", "Async checkpointer:", sep="\n")
    asyncio.run(async_checkpointer())
Replication Output
Sync checkpointer:
Get tuple (0): 0
Get tuple (1): 1
Get tuple (2): 2
=======================================
Async checkpointer:
Get tuple (0): None
Get tuple (1): None
Get tuple (2): None