feat(store): add hash storage support for vector data #60

Merged
merged 1 commit on Jun 18, 2025
27 changes: 24 additions & 3 deletions langgraph/store/redis/__init__.py
@@ -372,17 +372,38 @@ def _batch_put_ops(

vector_docs: list[dict[str, Any]] = []
vector_keys: list[str] = []

# Check if we're using hash storage for vectors
vector_storage_type = "json" # default
if self.index_config:
index_dict = dict(self.index_config)
vector_storage_type = index_dict.get("vector_storage_type", "json")

for (ns, key, path, _), vector in zip(text_params, vectors):
vector_key: tuple[str, str] = (ns, key)
doc_id = doc_ids[vector_key]

# Prepare vector based on storage type
if vector_storage_type == "hash":
# For hash storage, convert vector to byte string
from redisvl.redis.utils import array_to_buffer

vector_list = (
vector.tolist() if hasattr(vector, "tolist") else vector
)
embedding_value = array_to_buffer(vector_list, "float32")
else:
# For JSON storage, keep as list
embedding_value = (
vector.tolist() if hasattr(vector, "tolist") else vector
)

vector_docs.append(
{
"prefix": ns,
"key": key,
"field_name": path,
"embedding": (
vector.tolist() if hasattr(vector, "tolist") else vector
),
"embedding": embedding_value,
"created_at": datetime.now(timezone.utc).timestamp(),
"updated_at": datetime.now(timezone.utc).timestamp(),
}
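For reference, the hash path above serializes each embedding with redisvl's array_to_buffer before it is written to a Redis hash, while the JSON path keeps the embedding as a plain Python list inside the JSON document. A minimal standalone sketch of the difference (the sample vector is illustrative):

from redisvl.redis.utils import array_to_buffer

vector = [0.1, 0.25, -0.5, 0.75]  # example 4-dim embedding

# JSON storage: the embedding stays a Python list and is serialized with the document.
json_embedding = vector

# Hash storage: the embedding is packed into a float32 byte string.
hash_embedding = array_to_buffer(vector, "float32")

assert isinstance(hash_embedding, bytes)
assert len(hash_embedding) == 4 * len(vector)  # 4 bytes per float32 component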
29 changes: 24 additions & 5 deletions langgraph/store/redis/base.py
@@ -2,11 +2,22 @@

from __future__ import annotations

import copy
import logging
import threading
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Any, Generic, Iterable, Optional, Sequence, TypedDict, TypeVar, Union
from typing import (
Any,
Dict,
Generic,
Iterable,
Optional,
Sequence,
TypedDict,
TypeVar,
Union,
)

from langgraph.store.base import (
GetOp,
@@ -222,7 +233,15 @@ def __init__(

# Configure vector index if needed
if self.index_config:
vector_schema = self.SCHEMAS[1].copy()
# Get storage type from index config, default to "json" for backward compatibility
# Cast to dict to safely access potential extra fields
index_dict = dict(self.index_config)
vector_storage_type = index_dict.get("vector_storage_type", "json")

vector_schema: Dict[str, Any] = copy.deepcopy(self.SCHEMAS[1])
# Update storage type in schema
vector_schema["index"]["storage_type"] = vector_storage_type

vector_fields = vector_schema.get("fields", [])
vector_field = None
for f in vector_fields:
@@ -243,14 +262,14 @@
"l2": "L2",
}[
_ensure_string_or_literal(
self.index_config.get("distance_type", "cosine")
index_dict.get("distance_type", "cosine")
)
],
}

# Apply any additional vector type config
if "ann_index_config" in self.index_config:
vector_field["attrs"].update(self.index_config["ann_index_config"])
if "ann_index_config" in index_dict:
vector_field["attrs"].update(index_dict["ann_index_config"])

self.vector_index = SearchIndex.from_dict(
vector_schema, redis_client=self._redis
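At the store level, the new behavior is opt-in through the index configuration: any vector_storage_type other than the default "json" switches the generated RediSearch schema to hash storage. A minimal usage sketch, assuming a local Redis at redis://localhost:6379 and a toy embedding function standing in for a real model:

from langgraph.store.redis import RedisStore

def embed_texts(texts: list[str]) -> list[list[float]]:
    # Toy embedding for illustration only; replace with a real embedding model.
    return [[float(len(t)), 1.0, 0.0, 0.0] for t in texts]

index_config = {
    "dims": 4,
    "embed": embed_texts,
    "distance_type": "cosine",
    "fields": ["text"],
    "vector_storage_type": "hash",  # omit (or use "json") for the previous behavior
}

with RedisStore.from_conn_string("redis://localhost:6379", index=index_config) as store:
    store.setup()  # creates the vector index with storage_type="hash"
    store.put(("demo",), "doc1", {"text": "hello world"})
    results = store.search(("demo",), query="hello")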
228 changes: 228 additions & 0 deletions tests/test_store.py
@@ -642,3 +642,231 @@ def mock_echo(self, message):
finally:
client.close()
client.connection_pool.disconnect()


def test_vector_storage_json(redis_url, fake_embeddings: CharacterEmbeddings) -> None:
"""Test JSON vector storage (default behavior)."""
# Test data
docs = [
("doc1", {"text": "hello world"}),
("doc2", {"text": "hello universe"}),
("doc3", {"text": "goodbye world"}),
]

index_config = {
"dims": fake_embeddings.dims,
"embed": fake_embeddings,
"distance_type": "cosine",
"fields": ["text"],
# vector_storage_type defaults to "json"
}

ttl_config = {"default_ttl": 2, "refresh_on_read": True}

with RedisStore.from_conn_string(
redis_url, index=index_config, ttl=ttl_config
) as store:
store.setup()

# Insert documents
for key, value in docs:
store.put(("test_json",), key, value)

# Test vector search functionality
results = store.search(("test_json",), query="hello")
assert len(results) >= 2, "Vector search failed for JSON storage"

# Verify both hello documents are found
doc_keys = [r.key for r in results]
assert "doc1" in doc_keys, "doc1 not found in JSON storage"
assert "doc2" in doc_keys, "doc2 not found in JSON storage"

# Test that scores are reasonable (should be > 0 for cosine similarity)
for result in results:
if result.key in ["doc1", "doc2"]:
assert (
result.score > 0
), f"Invalid score for JSON storage: {result.score}"

# Test retrieval by key still works
item = store.get(("test_json",), "doc1")
assert item is not None, "Get operation failed for JSON storage"
assert (
item.value["text"] == "hello world"
), "Retrieved wrong value for JSON storage"


def test_vector_storage_hash(redis_url, fake_embeddings: CharacterEmbeddings) -> None:
"""Test hash vector storage for improved memory efficiency."""
# Test data
docs = [
("doc1", {"text": "hello world"}),
("doc2", {"text": "hello universe"}),
("doc3", {"text": "goodbye world"}),
]

index_config = {
"dims": fake_embeddings.dims,
"embed": fake_embeddings,
"distance_type": "cosine",
"fields": ["text"],
"vector_storage_type": "hash", # Enable hash storage
}

ttl_config = {"default_ttl": 2, "refresh_on_read": True}

with RedisStore.from_conn_string(
redis_url, index=index_config, ttl=ttl_config
) as store:
store.setup()

# Insert documents
for key, value in docs:
store.put(("test_hash",), key, value)

# Test vector search functionality
results = store.search(("test_hash",), query="hello")
assert len(results) >= 2, "Vector search failed for hash storage"

# Verify both hello documents are found
doc_keys = [r.key for r in results]
assert "doc1" in doc_keys, "doc1 not found in hash storage"
assert "doc2" in doc_keys, "doc2 not found in hash storage"

# Test that scores are reasonable (should be > 0 for cosine similarity)
for result in results:
if result.key in ["doc1", "doc2"]:
assert (
result.score > 0
), f"Invalid score for hash storage: {result.score}"

# Test retrieval by key still works
item = store.get(("test_hash",), "doc1")
assert item is not None, "Get operation failed for hash storage"
assert (
item.value["text"] == "hello world"
), "Retrieved wrong value for hash storage"


def test_vector_search_hash(redis_url, fake_embeddings: CharacterEmbeddings) -> None:
"""Test vector search functionality with hash storage."""
index_config = {
"dims": fake_embeddings.dims,
"embed": fake_embeddings,
"distance_type": "cosine",
"fields": ["text"],
"vector_storage_type": "hash",
}

ttl_config = {"default_ttl": 2, "refresh_on_read": True}

with RedisStore.from_conn_string(
redis_url, index=index_config, ttl=ttl_config
) as store:
store.setup()

# Insert documents with text that can be embedded
docs = [
("doc1", {"text": "short text"}),
("doc2", {"text": "longer text document"}),
("doc3", {"text": "longest text document here"}),
]

for key, value in docs:
store.put(("test",), key, value)

# Search with query
results = store.search(("test",), query="longer text")
assert len(results) >= 2

# Doc2 and doc3 should be closer matches to "longer text"
doc_keys = [r.key for r in results]
assert "doc2" in doc_keys
assert "doc3" in doc_keys


def test_vector_search_with_filters_hash(
redis_url, fake_embeddings: CharacterEmbeddings
) -> None:
"""Test vector search with additional filters using hash storage."""
index_config = {
"dims": fake_embeddings.dims,
"embed": fake_embeddings,
"distance_type": "cosine",
"fields": ["text"],
"vector_storage_type": "hash",
}

ttl_config = {"default_ttl": 2, "refresh_on_read": True}

with RedisStore.from_conn_string(
redis_url, index=index_config, ttl=ttl_config
) as store:
store.setup()

# Insert test documents
docs = [
("doc1", {"text": "red apple", "color": "red", "score": 4.5}),
("doc2", {"text": "red car", "color": "red", "score": 3.0}),
("doc3", {"text": "green apple", "color": "green", "score": 4.0}),
("doc4", {"text": "blue car", "color": "blue", "score": 3.5}),
]

for key, value in docs:
store.put(("test",), key, value)

# Search for "apple" within red items
results = store.search(("test",), query="apple", filter={"color": "red"})
assert len(results) >= 1
# Doc1 should be the closest match for "apple" with color=red
assert results[0].key == "doc1"

# Search for "car" within red items
results = store.search(("test",), query="car", filter={"color": "red"})
assert len(results) >= 1
# Doc2 should be the closest match for "car" with color=red
assert results[0].key == "doc2"


def test_vector_update_with_score_verification_hash(
redis_url, fake_embeddings: CharacterEmbeddings
) -> None:
"""Test that updating items properly updates their embeddings with hash storage."""
index_config = {
"dims": fake_embeddings.dims,
"embed": fake_embeddings,
"distance_type": "cosine",
"fields": ["text"],
"vector_storage_type": "hash",
}

ttl_config = {"default_ttl": 2, "refresh_on_read": True}

with RedisStore.from_conn_string(
redis_url, index=index_config, ttl=ttl_config
) as store:
store.setup()

store.put(("test",), "doc1", {"text": "zany zebra xylophone"})
store.put(("test",), "doc2", {"text": "something about dogs"})

# Search for a term similar to doc1's content
results_initial = store.search(("test",), query="zany xylophone")
assert len(results_initial) >= 1
assert results_initial[0].key == "doc1"
initial_score = results_initial[0].score

# Update doc1 to be about dogs instead
store.put(("test",), "doc1", {"text": "new text about dogs"})

# The original query should now match doc1 less strongly
results_after = store.search(("test",), query="zany xylophone")
assert len(results_after) >= 1
after_score = next((r.score for r in results_after if r.key == "doc1"), None)
if after_score is not None:
assert after_score < initial_score

# A dog-related query should now match doc1 more strongly
results_new = store.search(("test",), query="dogs text")
doc1_score = next((r.score for r in results_new if r.key == "doc1"), None)
assert doc1_score is not None