Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.AbstractPipeline;
import redis.clients.jedis.Response;
import redis.clients.jedis.UnifiedJedis;

Expand Down Expand Up @@ -160,7 +160,7 @@ public void mset(Map<String, float[]> embeddings, String modelName) {
return;
}

try (Pipeline pipeline = (Pipeline) redisClient.pipelined()) {
try (AbstractPipeline pipeline = redisClient.pipelined()) {
for (Map.Entry<String, float[]> entry : embeddings.entrySet()) {
String key = generateKey(entry.getKey(), modelName);
byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
Expand Down Expand Up @@ -192,7 +192,7 @@ public Map<String, float[]> mget(List<String> texts, String modelName) {
Map<String, float[]> results = new HashMap<>();
Map<String, Response<byte[]>> responses = new HashMap<>();

try (Pipeline pipeline = (Pipeline) redisClient.pipelined()) {
try (AbstractPipeline pipeline = redisClient.pipelined()) {
for (String text : texts) {
String key = generateKey(text, modelName);
byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
Expand Down Expand Up @@ -227,7 +227,7 @@ public Map<String, Boolean> mexists(List<String> texts, String modelName) {
Map<String, Boolean> results = new HashMap<>();
Map<String, Response<Boolean>> responses = new HashMap<>();

try (Pipeline pipeline = (Pipeline) redisClient.pipelined()) {
try (AbstractPipeline pipeline = redisClient.pipelined()) {
for (String text : texts) {
String key = generateKey(text, modelName);
responses.put(text, pipeline.exists(key));
Expand Down
19 changes: 14 additions & 5 deletions core/src/main/java/com/redis/vl/storage/BaseStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import java.util.function.Function;
import lombok.AccessLevel;
import lombok.Getter;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.AbstractPipeline;
import redis.clients.jedis.Response;
import redis.clients.jedis.UnifiedJedis;

Expand Down Expand Up @@ -325,7 +325,7 @@ public List<String> write(
// Pass 2: Write all valid objects in batches
List<String> addedKeys = new ArrayList<>();

try (Pipeline pipeline = (Pipeline) redisClient.pipelined()) {
try (AbstractPipeline pipeline = redisClient.pipelined()) {
for (int i = 0; i < preparedObjects.size(); i++) {
KeyValuePair kvp = preparedObjects.get(i);
set(pipeline, kvp.key, kvp.value);
Expand Down Expand Up @@ -465,7 +465,7 @@ public List<Map<String, Object>> get(
// Use a pipeline to batch the retrieval
List<Response<Map<String, Object>>> responses = new ArrayList<>();

try (Pipeline pipeline = (Pipeline) redisClient.pipelined()) {
try (AbstractPipeline pipeline = redisClient.pipelined()) {
for (String key : keys) {
Response<Map<String, Object>> response = getResponse(pipeline, key);
responses.add(response);
Expand Down Expand Up @@ -517,20 +517,29 @@ protected Map<String, Object> convertBytes(Map<String, Object> map) {
/**
* Set a key-value pair in Redis using a pipeline.
*
* <p>Python port: Uses AbstractPipeline to support both regular Pipeline and
* MultiClusterPipeline, matching Python's handling of both Pipeline and ClusterPipeline types
* (issue #365).
*
* @param pipeline The Redis pipeline to use
* @param key The Redis key
* @param obj The object to store
*/
protected abstract void set(Pipeline pipeline, String key, Map<String, Object> obj);
protected abstract void set(AbstractPipeline pipeline, String key, Map<String, Object> obj);

/**
* Get a response for retrieving a value from Redis using a pipeline.
*
* <p>Python port: Uses AbstractPipeline to support both regular Pipeline and
* MultiClusterPipeline, matching Python's handling of both Pipeline and ClusterPipeline types
* (issue #365).
*
* @param pipeline The Redis pipeline to use
* @param key The Redis key
* @return Response containing the retrieved object
*/
protected abstract Response<Map<String, Object>> getResponse(Pipeline pipeline, String key);
protected abstract Response<Map<String, Object>> getResponse(
AbstractPipeline pipeline, String key);

/** Helper class for key-value pairs used during preprocessing and validation. */
protected static class KeyValuePair {
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/java/com/redis/vl/storage/HashStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.AbstractPipeline;
import redis.clients.jedis.Response;

/**
Expand All @@ -29,7 +29,7 @@ public HashStorage(IndexSchema indexSchema) {
}

@Override
protected void set(Pipeline pipeline, String key, Map<String, Object> obj) {
protected void set(AbstractPipeline pipeline, String key, Map<String, Object> obj) {
Map<byte[], byte[]> binaryFields = new HashMap<>();
Map<String, String> stringFields = new HashMap<>();

Expand Down Expand Up @@ -75,7 +75,7 @@ protected void set(Pipeline pipeline, String key, Map<String, Object> obj) {

@Override
@SuppressWarnings("unchecked")
protected Response<Map<String, Object>> getResponse(Pipeline pipeline, String key) {
protected Response<Map<String, Object>> getResponse(AbstractPipeline pipeline, String key) {
// For hash, we use hgetAll to get all fields
Response<Map<String, String>> response = pipeline.hgetAll(key);
// We need to return Response<Map<String, Object>> so cast it
Expand All @@ -101,7 +101,7 @@ public List<Map<String, Object>> get(
List<Response<Map<String, String>>> stringResponses = new ArrayList<>();
Map<String, List<Response<byte[]>>> vectorResponses = new HashMap<>();

try (Pipeline pipeline = (Pipeline) redisClient.pipelined()) {
try (AbstractPipeline pipeline = redisClient.pipelined()) {
// Get all string fields and identify vector fields
for (String key : keys) {
Response<Map<String, String>> response = pipeline.hgetAll(key);
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/java/com/redis/vl/storage/JsonStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.AbstractPipeline;
import redis.clients.jedis.Response;
import redis.clients.jedis.json.Path2;

Expand All @@ -28,7 +28,7 @@ public JsonStorage(IndexSchema indexSchema) {
}

@Override
protected void set(Pipeline pipeline, String key, Map<String, Object> obj) {
protected void set(AbstractPipeline pipeline, String key, Map<String, Object> obj) {
// For JSON storage, vectors are stored as JSON arrays
Map<String, Object> jsonDocument = new HashMap<>();

Expand Down Expand Up @@ -83,7 +83,7 @@ protected void set(Pipeline pipeline, String key, Map<String, Object> obj) {

@Override
@SuppressWarnings("unchecked")
protected Response<Map<String, Object>> getResponse(Pipeline pipeline, String key) {
protected Response<Map<String, Object>> getResponse(AbstractPipeline pipeline, String key) {
// For JSON, we get the entire document
Response<Object> response = pipeline.jsonGet(key);
// We need to return Response<Map<String, Object>> so cast it
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package com.redis.vl.storage;

import static org.assertj.core.api.Assertions.assertThat;

import com.redis.vl.BaseIntegrationTest;
import com.redis.vl.schema.IndexSchema;
import java.util.*;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import redis.clients.jedis.AbstractPipeline;

/**
* Integration tests for AbstractPipeline compatibility in storage classes.
*
* <p>Tests the fix for issue #365: Ensures BaseStorage uses AbstractPipeline instead of Pipeline to
* support both regular Pipeline and MultiClusterPipeline objects, matching Python's graceful
* handling of both Pipeline and ClusterPipeline types.
*
* <p>Python port: Matches Python's isinstance() checks for (AsyncPipeline, AsyncClusterPipeline) by
* using the common base class AbstractPipeline in Java.
*/
@DisplayName("AbstractPipeline Compatibility Tests")
class AbstractPipelineIntegrationTest extends BaseIntegrationTest {

  /**
   * Builds a minimal schema with a single text field for the given storage type.
   *
   * @param storageType either {@code "hash"} or {@code "json"}
   * @param field the single field definition to index
   * @return the parsed {@link IndexSchema}
   */
  private static IndexSchema simpleSchema(String storageType, Map<String, Object> field) {
    return IndexSchema.fromDict(
        Map.of(
            "index",
            Map.of("name", "test-index", "prefix", "test", "storage_type", storageType),
            "fields",
            List.of(field)));
  }

  /** Schema for hash storage: one plain text field. */
  private static IndexSchema hashSchema() {
    return simpleSchema("hash", Map.of("name", "field1", "type", "text"));
  }

  /** Schema for JSON storage: one text field addressed by a JSONPath. */
  private static IndexSchema jsonSchema() {
    return simpleSchema("json", Map.of("name", "field1", "type", "text", "path", "$.field1"));
  }

  @Test
  @DisplayName("Should accept AbstractPipeline in HashStorage.set()")
  void testHashStorageAcceptsAbstractPipeline() {
    HashStorage storage = new HashStorage(hashSchema());
    Map<String, Object> testData = Map.of("field1", "value1");

    // The parameter type is AbstractPipeline, so no cast (and no possible
    // ClassCastException) is needed when passing a pipelined UnifiedJedis.
    try (AbstractPipeline pipeline = unifiedJedis.pipelined()) {
      storage.set(pipeline, "test:key", testData);
      pipeline.sync();
    }

    // The write must actually have reached Redis through the pipeline.
    assertThat(unifiedJedis.exists("test:key")).isTrue();
    unifiedJedis.del("test:key");
  }

  @Test
  @DisplayName("Should accept AbstractPipeline in JsonStorage.set()")
  void testJsonStorageAcceptsAbstractPipeline() {
    JsonStorage storage = new JsonStorage(jsonSchema());
    Map<String, Object> testData = Map.of("field1", "value1");

    // The parameter type is AbstractPipeline, so no cast (and no possible
    // ClassCastException) is needed when passing a pipelined UnifiedJedis.
    try (AbstractPipeline pipeline = unifiedJedis.pipelined()) {
      storage.set(pipeline, "test:key", testData);
      pipeline.sync();
    }

    // The write must actually have reached Redis through the pipeline.
    assertThat(unifiedJedis.exists("test:key")).isTrue();
    unifiedJedis.del("test:key");
  }

  @Test
  @DisplayName("Should accept AbstractPipeline in HashStorage.getResponse()")
  void testHashStorageGetResponseAcceptsAbstractPipeline() {
    HashStorage storage = new HashStorage(hashSchema());

    try (AbstractPipeline pipeline = unifiedJedis.pipelined()) {
      // getResponse must accept the AbstractPipeline and queue a command,
      // handing back a (not-yet-resolved) Response handle.
      assertThat(storage.getResponse(pipeline, "test:key")).isNotNull();
      pipeline.sync();
    }
  }

  @Test
  @DisplayName("Should accept AbstractPipeline in JsonStorage.getResponse()")
  void testJsonStorageGetResponseAcceptsAbstractPipeline() {
    JsonStorage storage = new JsonStorage(jsonSchema());

    try (AbstractPipeline pipeline = unifiedJedis.pipelined()) {
      // getResponse must accept the AbstractPipeline and queue a command,
      // handing back a (not-yet-resolved) Response handle.
      assertThat(storage.getResponse(pipeline, "test:key")).isNotNull();
      pipeline.sync();
    }
  }
}