Skip to content

Upgrade dependencies and improve resource handling #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 21 additions & 17 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,26 @@ on:
branches:
- main

workflow_dispatch:

env:
POETRY_VERSION: "1.8.3"

jobs:
test:
name: Python ${{ matrix.python-version }} - ${{ matrix.connection }} [redis-stack ${{matrix.redis-stack-version}}]
name: Python ${{ matrix.python-version }} - [redis ${{ matrix.redis-version }}]
runs-on: ubuntu-latest

strategy:
fail-fast: false
matrix:
python-version: [3.9, '3.10', 3.11, 3.12]
connection: ['hiredis', 'plain']
redis-stack-version: ['6.2.6-v9', 'latest', 'edge']

services:
redis:
image: redis/redis-stack-server:${{matrix.redis-stack-version}}
ports:
- 6379:6379
python-version: [3.9, '3.10', 3.11, 3.12, 3.13]
redis-version: ['6.2.6-v9', 'latest', '8.0-M03']

steps:
- uses: actions/checkout@v2
- name: Check out repository
uses: actions/checkout@v3

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
Expand All @@ -45,15 +42,22 @@ jobs:
run: |
poetry install --all-extras

- name: Install hiredis if needed
if: matrix.connection == 'hiredis'
- name: Set Redis image name
run: |
poetry add hiredis

- name: Set Redis version
if [[ "${{ matrix.redis-version }}" == "8.0-M03" ]]; then
echo "REDIS_IMAGE=redis:${{ matrix.redis-version }}" >> $GITHUB_ENV
else
echo "REDIS_IMAGE=redis/redis-stack-server:${{ matrix.redis-version }}" >> $GITHUB_ENV
fi

- name: Run API tests
if: matrix.redis-version == 'latest'
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
run: |
echo "REDIS_VERSION=${{ matrix.redis-stack-version }}" >> $GITHUB_ENV
make test-all

- name: Run tests
if: matrix.redis-version != 'latest'
run: |
make test
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.PHONY: install format lint test clean redis-start redis-stop check-types check
.PHONY: install format lint test test-all clean redis-start redis-stop check-types check

install:
poetry install --all-extras
Expand All @@ -21,6 +21,9 @@ lint: format check-types
test:
poetry run test-verbose

test-all:
poetry run test-verbose --run-api-tests

check: lint test

clean:
Expand Down
18 changes: 9 additions & 9 deletions langgraph/checkpoint/redis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,20 @@ def configure_client(
) -> None:
"""Configure the Redis client."""
self._owns_its_client = redis_client is None

self._redis = redis_client or RedisConnectionFactory.get_redis_connection(
redis_url, **connection_args
)

def create_indexes(self) -> None:
self.checkpoints_index = SearchIndex.from_dict(self.SCHEMAS[0])
self.checkpoint_blobs_index = SearchIndex.from_dict(self.SCHEMAS[1])
self.checkpoint_writes_index = SearchIndex.from_dict(self.SCHEMAS[2])

# Connect Redis client to indices
self.checkpoints_index.set_client(self._redis)
self.checkpoint_blobs_index.set_client(self._redis)
self.checkpoint_writes_index.set_client(self._redis)
self.checkpoints_index = SearchIndex.from_dict(
self.SCHEMAS[0], redis_client=self._redis
)
self.checkpoint_blobs_index = SearchIndex.from_dict(
self.SCHEMAS[1], redis_client=self._redis
)
self.checkpoint_writes_index = SearchIndex.from_dict(
self.SCHEMAS[2], redis_client=self._redis
)

def list(
self,
Expand Down
17 changes: 9 additions & 8 deletions langgraph/checkpoint/redis/aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,15 @@ def configure_client(

def create_indexes(self) -> None:
"""Create indexes without connecting to Redis."""
self.checkpoints_index = AsyncSearchIndex.from_dict(self.SCHEMAS[0])
self.checkpoint_blobs_index = AsyncSearchIndex.from_dict(self.SCHEMAS[1])
self.checkpoint_writes_index = AsyncSearchIndex.from_dict(self.SCHEMAS[2])
self.checkpoints_index = AsyncSearchIndex.from_dict(
self.SCHEMAS[0], redis_client=self._redis
)
self.checkpoint_blobs_index = AsyncSearchIndex.from_dict(
self.SCHEMAS[1], redis_client=self._redis
)
self.checkpoint_writes_index = AsyncSearchIndex.from_dict(
self.SCHEMAS[2], redis_client=self._redis
)

async def __aenter__(self) -> AsyncRedisSaver:
"""Async context manager enter."""
Expand All @@ -116,11 +122,6 @@ async def __aexit__(

async def asetup(self) -> None:
"""Initialize Redis indexes asynchronously."""
# Connect Redis client to indices asynchronously
await self.checkpoints_index.set_client(self._redis)
await self.checkpoint_blobs_index.set_client(self._redis)
await self.checkpoint_writes_index.set_client(self._redis)

# Create indexes in Redis asynchronously
await self.checkpoints_index.create(overwrite=False)
await self.checkpoint_blobs_index.create(overwrite=False)
Expand Down
17 changes: 9 additions & 8 deletions langgraph/checkpoint/redis/ashallow.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,6 @@ async def from_conn_string(

async def asetup(self) -> None:
"""Initialize Redis indexes asynchronously."""
# Connect Redis client to indices asynchronously
await self.checkpoints_index.set_client(self._redis)
await self.checkpoint_blobs_index.set_client(self._redis)
await self.checkpoint_writes_index.set_client(self._redis)

# Create indexes in Redis asynchronously
await self.checkpoints_index.create(overwrite=False)
await self.checkpoint_blobs_index.create(overwrite=False)
Expand Down Expand Up @@ -557,9 +552,15 @@ def configure_client(

def create_indexes(self) -> None:
"""Create indexes without connecting to Redis."""
self.checkpoints_index = AsyncSearchIndex.from_dict(self.SCHEMAS[0])
self.checkpoint_blobs_index = AsyncSearchIndex.from_dict(self.SCHEMAS[1])
self.checkpoint_writes_index = AsyncSearchIndex.from_dict(self.SCHEMAS[2])
self.checkpoints_index = AsyncSearchIndex.from_dict(
self.SCHEMAS[0], redis_client=self._redis
)
self.checkpoint_blobs_index = AsyncSearchIndex.from_dict(
self.SCHEMAS[1], redis_client=self._redis
)
self.checkpoint_writes_index = AsyncSearchIndex.from_dict(
self.SCHEMAS[2], redis_client=self._redis
)

def setup(self) -> None:
"""Initialize the checkpoint_index in Redis."""
Expand Down
17 changes: 9 additions & 8 deletions langgraph/checkpoint/redis/shallow.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,14 +388,15 @@ def configure_client(
)

def create_indexes(self) -> None:
self.checkpoints_index = SearchIndex.from_dict(self.SCHEMAS[0])
self.checkpoint_blobs_index = SearchIndex.from_dict(self.SCHEMAS[1])
self.checkpoint_writes_index = SearchIndex.from_dict(self.SCHEMAS[2])

# Connect Redis client to indices
self.checkpoints_index.set_client(self._redis)
self.checkpoint_blobs_index.set_client(self._redis)
self.checkpoint_writes_index.set_client(self._redis)
self.checkpoints_index = SearchIndex.from_dict(
self.SCHEMAS[0], redis_client=self._redis
)
self.checkpoint_blobs_index = SearchIndex.from_dict(
self.SCHEMAS[1], redis_client=self._redis
)
self.checkpoint_writes_index = SearchIndex.from_dict(
self.SCHEMAS[2], redis_client=self._redis
)

def put_writes(
self,
Expand Down
2 changes: 1 addition & 1 deletion langgraph/store/redis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from contextlib import contextmanager
from datetime import datetime, timezone
from typing import Any, Iterable, Iterator, Optional, Sequence, cast
from ulid import ULID

from langgraph.store.base import (
BaseStore,
Expand All @@ -26,6 +25,7 @@
from redisvl.query import FilterQuery, VectorQuery
from redisvl.redis.connection import RedisConnectionFactory
from redisvl.utils.token_escaper import TokenEscaper
from ulid import ULID

from langgraph.store.redis.aio import AsyncRedisStore
from langgraph.store.redis.base import (
Expand Down
39 changes: 24 additions & 15 deletions langgraph/store/redis/aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from datetime import datetime, timezone
from types import TracebackType
from typing import Any, AsyncIterator, Iterable, Optional, Sequence, cast
from ulid import ULID

from langgraph.store.base import (
BaseStore,
Expand All @@ -29,6 +28,7 @@
from redisvl.query import FilterQuery, VectorQuery
from redisvl.redis.connection import RedisConnectionFactory
from redisvl.utils.token_escaper import TokenEscaper
from ulid import ULID

from langgraph.store.redis.base import (
REDIS_KEY_SEPARATOR,
Expand Down Expand Up @@ -57,14 +57,15 @@ class AsyncRedisStore(

store_index: AsyncSearchIndex
vector_index: AsyncSearchIndex
_owns_client: bool
_owns_its_client: bool

def __init__(
self,
redis_url: Optional[str] = None,
*,
redis_client: Optional[AsyncRedis] = None,
index: Optional[IndexConfig] = None,
connection_args: Optional[dict[str, Any]] = None,
) -> None:
"""Initialize store with Redis connection and optional index config."""
if redis_url is None and redis_client is None:
Expand Down Expand Up @@ -94,10 +95,16 @@ def __init__(
]

# Configure client
self.configure_client(redis_url=redis_url, redis_client=redis_client)
self.configure_client(
redis_url=redis_url,
redis_client=redis_client,
connection_args=connection_args or {},
)

# Create store index
self.store_index = AsyncSearchIndex.from_dict(self.SCHEMAS[0])
self.store_index = AsyncSearchIndex.from_dict(
self.SCHEMAS[0], redis_client=self._redis
)

# Configure vector index if needed
if self.index_config:
Expand Down Expand Up @@ -131,7 +138,9 @@ def __init__(
vector_field["attrs"].update(self.index_config["ann_index_config"])

try:
self.vector_index = AsyncSearchIndex.from_dict(vector_schema)
self.vector_index = AsyncSearchIndex.from_dict(
vector_schema, redis_client=self._redis
)
except Exception as e:
raise ValueError(
f"Failed to create vector index with schema: {vector_schema}. Error: {str(e)}"
Expand All @@ -145,11 +154,12 @@ def configure_client(
self,
redis_url: Optional[str] = None,
redis_client: Optional[AsyncRedis] = None,
connection_args: Optional[dict[str, Any]] = None,
) -> None:
"""Configure the Redis client."""
self._owns_client = redis_client is None
self._owns_its_client = redis_client is None
self._redis = redis_client or RedisConnectionFactory.get_async_redis_connection(
redis_url
redis_url, **connection_args
)

async def setup(self) -> None:
Expand All @@ -160,11 +170,6 @@ async def setup(self) -> None:
self.index_config.get("embed"),
)

# Now connect Redis client to indices
await self.store_index.set_client(self._redis)
if self.index_config:
await self.vector_index.set_client(self._redis)

# Create indices in Redis
await self.store_index.create(overwrite=False)
if self.index_config:
Expand All @@ -188,9 +193,13 @@ async def from_conn_string(

def create_indexes(self) -> None:
"""Create async indices."""
self.store_index = AsyncSearchIndex.from_dict(self.SCHEMAS[0])
self.store_index = AsyncSearchIndex.from_dict(
self.SCHEMAS[0], redis_client=self._redis
)
if self.index_config:
self.vector_index = AsyncSearchIndex.from_dict(self.SCHEMAS[1])
self.vector_index = AsyncSearchIndex.from_dict(
self.SCHEMAS[1], redis_client=self._redis
)

async def __aenter__(self) -> AsyncRedisStore:
"""Async context manager enter."""
Expand All @@ -210,7 +219,7 @@ async def __aexit__(
except asyncio.CancelledError:
pass

if self._owns_client:
if self._owns_its_client:
await self._redis.aclose() # type: ignore[attr-defined]
await self._redis.connection_pool.disconnect()

Expand Down
10 changes: 6 additions & 4 deletions langgraph/store/redis/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,9 @@ def __init__(
]

# Initialize search indices
self.store_index = SearchIndex.from_dict(self.SCHEMAS[0])
self.store_index.set_client(self._redis)
self.store_index = SearchIndex.from_dict(
self.SCHEMAS[0], redis_client=self._redis
)

# Configure vector index if needed
if self.index_config:
Expand Down Expand Up @@ -156,8 +157,9 @@ def __init__(
if "ann_index_config" in self.index_config:
vector_field["attrs"].update(self.index_config["ann_index_config"])

self.vector_index = SearchIndex.from_dict(vector_schema)
self.vector_index.set_client(self._redis)
self.vector_index = SearchIndex.from_dict(
vector_schema, redis_client=self._redis
)

def _get_batch_GET_ops_queries(
self,
Expand Down
Loading