Closed
Description
Simple reproduction steps:
Dependency versions from pip freeze
:
redis==6.2.0
redisvl==0.7.0
langgraph-checkpoint==2.0.26
langgraph-checkpoint-redis==0.0.6
Using Redis Stack 8.0.0 in cluster configuration.
Setup the AsyncRedisSaver
using a RedisCluster
object directly:
import asyncio
from typing import TypedDict
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.redis.aio import AsyncRedisSaver
from redis.asyncio.cluster import RedisCluster, ClusterNode
class SimpleState(TypedDict):
value: str
async def set_value_node(state: SimpleState):
return {"value": "Hello Redis Cluster"}
async def main():
redis_client = RedisCluster(startup_nodes=[ClusterNode(host="node1", port=6380), ClusterNode(host="node2", port=6380), ClusterNode(host="node3", port=6380)], ssl=True)
checkpointer = AsyncRedisSaver(redis_client=redis_client)
await checkpointer.asetup()
builder = StateGraph(SimpleState)
builder.add_node("setter", set_value_node)
builder.set_entry_point("setter")
builder.add_edge("setter", END)
graph = builder.compile(checkpointer=checkpointer)
thread_config = {"configurable": {"thread_id": "simple-thread-1"}}
initial_input = {"value": "initial"}
final_output = await graph.ainvoke(initial_input, config=thread_config)
print(f"Graph output: {final_output}")
saved_state = await graph.aget_state(thread_config)
print(f"Saved state: {saved_state.values if saved_state else 'Not found'}")
await redis_client.aclose()
if __name__ == "__main__":
asyncio.run(main())
Throws the following exception:
File "/home/www/.local/lib/python3.12/site-packages/langgraph/pregel/__init__.py", line 2788, in ainvoke
async for chunk in self.astream(
File "/home/www/.local/lib/python3.12/site-packages/langgraph/pregel/__init__.py", line 2596, in astream
async with AsyncPregelLoop(
^^^^^^^^^^^^^^^^
File "/home/www/.local/lib/python3.12/site-packages/langgraph/pregel/loop.py", line 1393, in __aexit__
return await exit_task
^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/contextlib.py", line 754, in __aexit__
raise exc_details[1]
File "/usr/local/lib/python3.12/contextlib.py", line 737, in __aexit__
cb_suppress = await cb(*exc_details)
^^^^^^^^^^^^^^^^^^^^^^
File "/home/www/.local/lib/python3.12/site-packages/langgraph/pregel/executor.py", line 209, in __aexit__
raise exc
File "/home/www/.local/lib/python3.12/site-packages/langgraph/pregel/loop.py", line 1260, in _checkpointer_put_after_previous
await prev
File "/home/www/.local/lib/python3.12/site-packages/langgraph/pregel/loop.py", line 1262, in _checkpointer_put_after_previous
await cast(BaseCheckpointSaver, self.checkpointer).aput(
File "/home/www/.local/lib/python3.12/site-packages/langgraph/checkpoint/redis/aio.py", line 585, in aput
raise e
File "/home/www/.local/lib/python3.12/site-packages/langgraph/checkpoint/redis/aio.py", line 505, in aput
await pipeline.json().set(checkpoint_key, "$", checkpoint_data)
^^^^^^^^^^^^^^^
File "/home/www/.local/lib/python3.12/site-packages/redis/commands/redismodules.py", line 24, in json
jj = JSON(client=self, encoder=encoder, decoder=decoder)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/www/.local/lib/python3.12/site-packages/redis/commands/json/__init__.py", line 71, in __init__
if get_protocol_version(self.client) in ["3", 3]:
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/www/.local/lib/python3.12/site-packages/redis/commands/helpers.py", line 118, in get_protocol_version
return client.nodes_manager.connection_kwargs.get("protocol")
^^^^^^^^^^^^^^^^^^^^
AttributeError: 'ClusterPipeline' object has no attribute 'nodes_manager'
From line https://github.com/redis-developer/langgraph-redis/blob/main/langgraph/checkpoint/redis/aio.py#L505
await pipeline.json().set(checkpoint_key, "$", checkpoint_data)
Metadata
Metadata
Assignees
Labels
No labels