Skip to content

fix: AsyncRedisSaver aget_tuple returning None for checkpoint_id #65

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions langgraph/checkpoint/redis/aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
"configurable": {
"thread_id": thread_id,
"checkpoint_ns": checkpoint_ns,
"checkpoint_id": checkpoint_id,
"checkpoint_id": doc_checkpoint_id,
}
}

Expand All @@ -361,7 +361,7 @@ async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
)

pending_writes = await self._aload_pending_writes(
thread_id, checkpoint_ns, checkpoint_id or EMPTY_ID_SENTINEL
thread_id, checkpoint_ns, doc_checkpoint_id
)

return CheckpointTuple(
Expand Down
139 changes: 139 additions & 0 deletions tests/test_async_aget_tuple_checkpoint_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
"""Test for AsyncRedisSaver aget_tuple checkpoint_id issue (GitHub issue #64)."""

import asyncio
import uuid
from typing import AsyncGenerator

import pytest
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.base import empty_checkpoint

from langgraph.checkpoint.redis.aio import AsyncRedisSaver


@pytest.fixture
async def saver(redis_url: str) -> AsyncGenerator[AsyncRedisSaver, None]:
"""Async saver fixture for this test."""
saver = AsyncRedisSaver(redis_url)
await saver.asetup()
yield saver


@pytest.mark.asyncio
async def test_aget_tuple_returns_correct_checkpoint_id(saver: AsyncRedisSaver):
"""Test that aget_tuple returns the correct checkpoint_id when not specified in config.

This test reproduces the issue described in GitHub issue #64 where AsyncRedisSaver
aget_tuple was returning None for checkpoint_id while the sync version worked correctly.
"""
# Create a unique thread ID
thread_id = str(uuid.uuid4())

# Config with only thread_id and checkpoint_ns (no checkpoint_id)
runnable_config: RunnableConfig = {
"configurable": {"thread_id": thread_id, "checkpoint_ns": ""}
}

# Put several checkpoints
checkpoint_ids = []
for run in range(3):
checkpoint_id = str(run)
checkpoint_ids.append(checkpoint_id)

await saver.aput(
{
"configurable": {
"thread_id": thread_id,
"checkpoint_id": checkpoint_id,
"checkpoint_ns": "",
}
},
empty_checkpoint(),
{
"source": "loop",
"step": run,
"writes": {},
},
{},
)

# Get the tuple using the config without checkpoint_id
# This should return the latest checkpoint
get_tuple = await saver.aget_tuple(runnable_config)

# Verify the checkpoint_id is not None and matches the expected value
assert get_tuple is not None, f"Expected checkpoint tuple, got None for run {run}"

returned_checkpoint_id = get_tuple.config["configurable"]["checkpoint_id"]
assert returned_checkpoint_id is not None, (
f"Expected checkpoint_id to be set, got None for run {run}. "
f"This indicates the bug where aget_tuple returns None for checkpoint_id."
)

# Since we're getting the latest checkpoint each time, it should be the current checkpoint_id
assert returned_checkpoint_id == checkpoint_id, (
f"Expected checkpoint_id {checkpoint_id}, got {returned_checkpoint_id} for run {run}"
)


@pytest.mark.asyncio
async def test_aget_tuple_with_explicit_checkpoint_id(saver: AsyncRedisSaver):
"""Test that aget_tuple works correctly when checkpoint_id is explicitly provided."""
# Create a unique thread ID
thread_id = str(uuid.uuid4())

# Put several checkpoints
checkpoint_ids = []
for run in range(3):
checkpoint_id = str(run)
checkpoint_ids.append(checkpoint_id)

await saver.aput(
{
"configurable": {
"thread_id": thread_id,
"checkpoint_id": checkpoint_id,
"checkpoint_ns": "",
}
},
empty_checkpoint(),
{
"source": "loop",
"step": run,
"writes": {},
},
{},
)

# Test retrieving each checkpoint by explicit checkpoint_id
for checkpoint_id in checkpoint_ids:
config_with_id: RunnableConfig = {
"configurable": {
"thread_id": thread_id,
"checkpoint_id": checkpoint_id,
"checkpoint_ns": ""
}
}

get_tuple = await saver.aget_tuple(config_with_id)

assert get_tuple is not None, f"Expected checkpoint tuple, got None for checkpoint_id {checkpoint_id}"

returned_checkpoint_id = get_tuple.config["configurable"]["checkpoint_id"]
assert returned_checkpoint_id == checkpoint_id, (
f"Expected checkpoint_id {checkpoint_id}, got {returned_checkpoint_id}"
)


@pytest.mark.asyncio
async def test_aget_tuple_no_checkpoint_returns_none(saver: AsyncRedisSaver):
"""Test that aget_tuple returns None when no checkpoint exists for the thread."""
# Use a thread ID that doesn't exist
thread_id = str(uuid.uuid4())

runnable_config: RunnableConfig = {
"configurable": {"thread_id": thread_id, "checkpoint_ns": ""}
}

get_tuple = await saver.aget_tuple(runnable_config)
assert get_tuple is None, "Expected None when no checkpoint exists for thread"