Skip to content

AsyncRedisSaver fails to aput #55

Closed
@nkilleen-work

Description

@nkilleen-work

Simple reproduction steps:

Dependency versions from pip freeze:

redis==6.2.0
redisvl==0.7.0
langgraph-checkpoint==2.0.26
langgraph-checkpoint-redis==0.0.6

Using Redis Stack 8.0.0 in cluster configuration.

Set up the AsyncRedisSaver using a RedisCluster object directly:

import asyncio
from typing import TypedDict
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.redis.aio import AsyncRedisSaver
from redis.asyncio.cluster import RedisCluster, ClusterNode

class SimpleState(TypedDict):
    """Graph state for the reproduction: a single string field."""

    # The only key in the state; the graph's node overwrites it.
    value: str

async def set_value_node(state: SimpleState):
    """Return a state update that sets ``value`` to a fixed greeting.

    The incoming ``state`` is not read; the same update is returned
    regardless of its contents.
    """
    state_update = {"value": "Hello Redis Cluster"}
    return state_update

async def main():
    """Reproduce the ``AsyncRedisSaver`` failure against a Redis Cluster.

    Builds a one-node graph whose checkpointer is an ``AsyncRedisSaver``
    created directly from a ``RedisCluster`` client, invokes the graph once,
    and then reads the saved state back.

    The cluster client is closed in a ``finally`` clause so that it is
    released even when the run raises; any exception still propagates to
    the caller unchanged.
    """
    redis_client = RedisCluster(
        startup_nodes=[
            ClusterNode(host="node1", port=6380),
            ClusterNode(host="node2", port=6380),
            ClusterNode(host="node3", port=6380),
        ],
        ssl=True,
    )

    try:
        checkpointer = AsyncRedisSaver(redis_client=redis_client)
        await checkpointer.asetup()

        builder = StateGraph(SimpleState)
        builder.add_node("setter", set_value_node)
        builder.set_entry_point("setter")
        builder.add_edge("setter", END)
        graph = builder.compile(checkpointer=checkpointer)

        thread_config = {"configurable": {"thread_id": "simple-thread-1"}}

        initial_input = {"value": "initial"}
        # The reported traceback is raised from this call, when the graph
        # writes its checkpoint through the checkpointer's ``aput``.
        final_output = await graph.ainvoke(initial_input, config=thread_config)
        print(f"Graph output: {final_output}")

        saved_state = await graph.aget_state(thread_config)
        print(f"Saved state: {saved_state.values if saved_state else 'Not found'}")
    finally:
        # Close the client on both the success and the failure path.
        await redis_client.aclose()

# Run the reproduction only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

Running this script throws the following exception:

  File "/home/www/.local/lib/python3.12/site-packages/langgraph/pregel/__init__.py", line 2788, in ainvoke
    async for chunk in self.astream(
  File "/home/www/.local/lib/python3.12/site-packages/langgraph/pregel/__init__.py", line 2596, in astream
    async with AsyncPregelLoop(
                 ^^^^^^^^^^^^^^^^
  File "/home/www/.local/lib/python3.12/site-packages/langgraph/pregel/loop.py", line 1393, in __aexit__
    return await exit_task
           ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/contextlib.py", line 754, in __aexit__
    raise exc_details[1]
  File "/usr/local/lib/python3.12/contextlib.py", line 737, in __aexit__
    cb_suppress = await cb(*exc_details)
                  ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/www/.local/lib/python3.12/site-packages/langgraph/pregel/executor.py", line 209, in __aexit__
    raise exc
  File "/home/www/.local/lib/python3.12/site-packages/langgraph/pregel/loop.py", line 1260, in _checkpointer_put_after_previous
    await prev
  File "/home/www/.local/lib/python3.12/site-packages/langgraph/pregel/loop.py", line 1262, in _checkpointer_put_after_previous
    await cast(BaseCheckpointSaver, self.checkpointer).aput(
  File "/home/www/.local/lib/python3.12/site-packages/langgraph/checkpoint/redis/aio.py", line 585, in aput
    raise e
  File "/home/www/.local/lib/python3.12/site-packages/langgraph/checkpoint/redis/aio.py", line 505, in aput
    await pipeline.json().set(checkpoint_key, "$", checkpoint_data)
          ^^^^^^^^^^^^^^^
  File "/home/www/.local/lib/python3.12/site-packages/redis/commands/redismodules.py", line 24, in json
    jj = JSON(client=self, encoder=encoder, decoder=decoder)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/www/.local/lib/python3.12/site-packages/redis/commands/json/__init__.py", line 71, in __init__
    if get_protocol_version(self.client) in ["3", 3]:
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/www/.local/lib/python3.12/site-packages/redis/commands/helpers.py", line 118, in get_protocol_version
    return client.nodes_manager.connection_kwargs.get("protocol")
           ^^^^^^^^^^^^^^^^^^^^
AttributeError: 'ClusterPipeline' object has no attribute 'nodes_manager'

The failing call is this line: https://github.com/redis-developer/langgraph-redis/blob/main/langgraph/checkpoint/redis/aio.py#L505

await pipeline.json().set(checkpoint_key, "$", checkpoint_data)

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions