Commit 87bca26

feat: use AbstractPipeline for Redis Cluster compatibility
Port of Python RedisVL fix for issue #365. Ensures storage classes use AbstractPipeline instead of Pipeline to support both regular Pipeline and MultiClusterPipeline objects.

Changes:
- BaseStorage: changed Pipeline to AbstractPipeline in write() and get() methods, removed unsafe casts to Pipeline
- HashStorage: updated set() and getResponse() signatures to accept AbstractPipeline, updated get() to use AbstractPipeline
- JsonStorage: updated set() and getResponse() signatures to accept AbstractPipeline
- EmbeddingsCache: updated mset(), mget(), and mexists() to use AbstractPipeline
- Added AbstractPipelineTest with 4 tests verifying compatibility

This ensures compatibility with both standalone Redis and Redis Cluster deployments, matching Python's graceful handling of both Pipeline and ClusterPipeline types via isinstance() checks.

In Java/Jedis:
- Pipeline extends PipelineBase (deprecated)
- MultiClusterPipeline extends PipelineBase
- Both extend AbstractPipeline as a common base
- UnifiedJedis.pipelined() returns AbstractPipeline (may be Pipeline or MultiClusterPipeline depending on the provider)

Without this fix, code would fail with a ClassCastException when using MultiClusterPipeline in cluster deployments.
1 parent 3acf928 commit 87bca26
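
To make the failure mode concrete before the per-file hunks: the old code downcast the result of redisClient.pipelined() to Pipeline, which throws a ClassCastException at runtime when a cluster/failover provider hands back a MultiClusterPipeline. Coding against AbstractPipeline removes the cast entirely. The sketch below is illustrative only; the host, port, and key are placeholders, not values from this repository.

import java.util.Map;
import redis.clients.jedis.AbstractPipeline;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.UnifiedJedis;

public class PipelineCastDemo {
  public static void main(String[] args) {
    // Placeholder standalone connection; with a multi-cluster provider,
    // pipelined() returns a MultiClusterPipeline rather than a Pipeline.
    try (UnifiedJedis client = new UnifiedJedis(new HostAndPort("localhost", 6379))) {

      // Old pattern (pre-fix): Pipeline p = (Pipeline) client.pipelined();
      // -> fails with ClassCastException when a MultiClusterPipeline is returned.

      // New pattern: program against the common base class.
      try (AbstractPipeline pipeline = client.pipelined()) {
        pipeline.hset("demo:key", Map.of("field1", "value1"));
        pipeline.sync(); // flush queued commands
      }
    }
  }
}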

File tree

5 files changed: +151 −16 lines

core/src/main/java/com/redis/vl/extensions/cache/EmbeddingsCache.java

Lines changed: 4 additions & 4 deletions
@@ -8,7 +8,7 @@
  import java.util.List;
  import java.util.Map;
  import java.util.Optional;
- import redis.clients.jedis.Pipeline;
+ import redis.clients.jedis.AbstractPipeline;
  import redis.clients.jedis.Response;
  import redis.clients.jedis.UnifiedJedis;

@@ -160,7 +160,7 @@ public void mset(Map<String, float[]> embeddings, String modelName) {
      return;
    }

-   try (Pipeline pipeline = (Pipeline) redisClient.pipelined()) {
+   try (AbstractPipeline pipeline = redisClient.pipelined()) {
      for (Map.Entry<String, float[]> entry : embeddings.entrySet()) {
        String key = generateKey(entry.getKey(), modelName);
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);

@@ -192,7 +192,7 @@ public Map<String, float[]> mget(List<String> texts, String modelName) {
    Map<String, float[]> results = new HashMap<>();
    Map<String, Response<byte[]>> responses = new HashMap<>();

-   try (Pipeline pipeline = (Pipeline) redisClient.pipelined()) {
+   try (AbstractPipeline pipeline = redisClient.pipelined()) {
      for (String text : texts) {
        String key = generateKey(text, modelName);
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);

@@ -227,7 +227,7 @@ public Map<String, Boolean> mexists(List<String> texts, String modelName) {
    Map<String, Boolean> results = new HashMap<>();
    Map<String, Response<Boolean>> responses = new HashMap<>();

-   try (Pipeline pipeline = (Pipeline) redisClient.pipelined()) {
+   try (AbstractPipeline pipeline = redisClient.pipelined()) {
      for (String text : texts) {
        String key = generateKey(text, modelName);
        responses.put(text, pipeline.exists(key));
core/src/main/java/com/redis/vl/storage/BaseStorage.java

Lines changed: 14 additions & 5 deletions
@@ -7,7 +7,7 @@
  import java.util.function.Function;
  import lombok.AccessLevel;
  import lombok.Getter;
- import redis.clients.jedis.Pipeline;
+ import redis.clients.jedis.AbstractPipeline;
  import redis.clients.jedis.Response;
  import redis.clients.jedis.UnifiedJedis;

@@ -325,7 +325,7 @@ public List<String> write(
    // Pass 2: Write all valid objects in batches
    List<String> addedKeys = new ArrayList<>();

-   try (Pipeline pipeline = (Pipeline) redisClient.pipelined()) {
+   try (AbstractPipeline pipeline = redisClient.pipelined()) {
      for (int i = 0; i < preparedObjects.size(); i++) {
        KeyValuePair kvp = preparedObjects.get(i);
        set(pipeline, kvp.key, kvp.value);

@@ -465,7 +465,7 @@ public List<Map<String, Object>> get(
    // Use a pipeline to batch the retrieval
    List<Response<Map<String, Object>>> responses = new ArrayList<>();

-   try (Pipeline pipeline = (Pipeline) redisClient.pipelined()) {
+   try (AbstractPipeline pipeline = redisClient.pipelined()) {
      for (String key : keys) {
        Response<Map<String, Object>> response = getResponse(pipeline, key);
        responses.add(response);

@@ -517,20 +517,29 @@ protected Map<String, Object> convertBytes(Map<String, Object> map) {
  /**
   * Set a key-value pair in Redis using a pipeline.
   *
+  * <p>Python port: Uses AbstractPipeline to support both regular Pipeline and
+  * MultiClusterPipeline, matching Python's handling of both Pipeline and ClusterPipeline types
+  * (issue #365).
+  *
   * @param pipeline The Redis pipeline to use
   * @param key The Redis key
   * @param obj The object to store
   */
- protected abstract void set(Pipeline pipeline, String key, Map<String, Object> obj);
+ protected abstract void set(AbstractPipeline pipeline, String key, Map<String, Object> obj);

  /**
   * Get a response for retrieving a value from Redis using a pipeline.
   *
+  * <p>Python port: Uses AbstractPipeline to support both regular Pipeline and
+  * MultiClusterPipeline, matching Python's handling of both Pipeline and ClusterPipeline types
+  * (issue #365).
+  *
   * @param pipeline The Redis pipeline to use
   * @param key The Redis key
   * @return Response containing the retrieved object
   */
- protected abstract Response<Map<String, Object>> getResponse(Pipeline pipeline, String key);
+ protected abstract Response<Map<String, Object>> getResponse(
+     AbstractPipeline pipeline, String key);

  /** Helper class for key-value pairs used during preprocessing and validation. */
  protected static class KeyValuePair {
core/src/main/java/com/redis/vl/storage/HashStorage.java

Lines changed: 4 additions & 4 deletions
@@ -9,7 +9,7 @@
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
- import redis.clients.jedis.Pipeline;
+ import redis.clients.jedis.AbstractPipeline;
  import redis.clients.jedis.Response;

  /**

@@ -29,7 +29,7 @@ public HashStorage(IndexSchema indexSchema) {
  }

  @Override
- protected void set(Pipeline pipeline, String key, Map<String, Object> obj) {
+ protected void set(AbstractPipeline pipeline, String key, Map<String, Object> obj) {
    Map<byte[], byte[]> binaryFields = new HashMap<>();
    Map<String, String> stringFields = new HashMap<>();

@@ -75,7 +75,7 @@ protected void set(Pipeline pipeline, String key, Map<String, Object> obj) {

  @Override
  @SuppressWarnings("unchecked")
- protected Response<Map<String, Object>> getResponse(Pipeline pipeline, String key) {
+ protected Response<Map<String, Object>> getResponse(AbstractPipeline pipeline, String key) {
    // For hash, we use hgetAll to get all fields
    Response<Map<String, String>> response = pipeline.hgetAll(key);
    // We need to return Response<Map<String, Object>> so cast it

@@ -101,7 +101,7 @@ public List<Map<String, Object>> get(
    List<Response<Map<String, String>>> stringResponses = new ArrayList<>();
    Map<String, List<Response<byte[]>>> vectorResponses = new HashMap<>();

-   try (Pipeline pipeline = (Pipeline) redisClient.pipelined()) {
+   try (AbstractPipeline pipeline = redisClient.pipelined()) {
      // Get all string fields and identify vector fields
      for (String key : keys) {
        Response<Map<String, String>> response = pipeline.hgetAll(key);
core/src/main/java/com/redis/vl/storage/JsonStorage.java

Lines changed: 3 additions & 3 deletions
@@ -7,7 +7,7 @@
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
- import redis.clients.jedis.Pipeline;
+ import redis.clients.jedis.AbstractPipeline;
  import redis.clients.jedis.Response;
  import redis.clients.jedis.json.Path2;

@@ -28,7 +28,7 @@ public JsonStorage(IndexSchema indexSchema) {
  }

  @Override
- protected void set(Pipeline pipeline, String key, Map<String, Object> obj) {
+ protected void set(AbstractPipeline pipeline, String key, Map<String, Object> obj) {
    // For JSON storage, vectors are stored as JSON arrays
    Map<String, Object> jsonDocument = new HashMap<>();

@@ -83,7 +83,7 @@ protected void set(Pipeline pipeline, String key, Map<String, Object> obj) {

  @Override
  @SuppressWarnings("unchecked")
- protected Response<Map<String, Object>> getResponse(Pipeline pipeline, String key) {
+ protected Response<Map<String, Object>> getResponse(AbstractPipeline pipeline, String key) {
    // For JSON, we get the entire document
    Response<Object> response = pipeline.jsonGet(key);
    // We need to return Response<Map<String, Object>> so cast it
AbstractPipelineIntegrationTest.java (new file, package com.redis.vl.storage)

Lines changed: 126 additions & 0 deletions
@@ -0,0 +1,126 @@
package com.redis.vl.storage;

import static org.assertj.core.api.Assertions.assertThat;

import com.redis.vl.BaseIntegrationTest;
import com.redis.vl.schema.IndexSchema;
import java.util.*;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import redis.clients.jedis.AbstractPipeline;

/**
 * Integration tests for AbstractPipeline compatibility in storage classes.
 *
 * <p>Tests the fix for issue #365: Ensures BaseStorage uses AbstractPipeline instead of Pipeline to
 * support both regular Pipeline and MultiClusterPipeline objects, matching Python's graceful
 * handling of both Pipeline and ClusterPipeline types.
 *
 * <p>Python port: Matches Python's isinstance() checks for (AsyncPipeline, AsyncClusterPipeline) by
 * using the common base class AbstractPipeline in Java.
 */
@DisplayName("AbstractPipeline Compatibility Tests")
class AbstractPipelineIntegrationTest extends BaseIntegrationTest {

  @Test
  @DisplayName("Should accept AbstractPipeline in HashStorage.set()")
  void testHashStorageAcceptsAbstractPipeline() {
    // Create a simple schema
    Map<String, Object> schemaDict =
        Map.of(
            "index",
            Map.of("name", "test-index", "prefix", "test", "storage_type", "hash"),
            "fields",
            List.of(Map.of("name", "field1", "type", "text")));

    IndexSchema schema = IndexSchema.fromDict(schemaDict);
    HashStorage storage = new HashStorage(schema);

    // Verify the set() method signature accepts AbstractPipeline
    Map<String, Object> testData = Map.of("field1", "value1");

    // Use the unifiedJedis from BaseIntegrationTest (Testcontainers)
    try (AbstractPipeline pipeline = unifiedJedis.pipelined()) {
      // This should compile without ClassCastException
      storage.set(pipeline, "test:key", testData);
      pipeline.sync();
    }

    assertThat(true).as("Method signature accepts AbstractPipeline").isTrue();
  }

  @Test
  @DisplayName("Should accept AbstractPipeline in JsonStorage.set()")
  void testJsonStorageAcceptsAbstractPipeline() {
    // Create a simple schema
    Map<String, Object> schemaDict =
        Map.of(
            "index",
            Map.of("name", "test-index", "prefix", "test", "storage_type", "json"),
            "fields",
            List.of(Map.of("name", "field1", "type", "text", "path", "$.field1")));

    IndexSchema schema = IndexSchema.fromDict(schemaDict);
    JsonStorage storage = new JsonStorage(schema);

    // Verify the set() method signature accepts AbstractPipeline
    Map<String, Object> testData = Map.of("field1", "value1");

    // Use the unifiedJedis from BaseIntegrationTest (Testcontainers)
    try (AbstractPipeline pipeline = unifiedJedis.pipelined()) {
      // This should compile without ClassCastException
      storage.set(pipeline, "test:key", testData);
      pipeline.sync();
    }

    assertThat(true).as("Method signature accepts AbstractPipeline").isTrue();
  }

  @Test
  @DisplayName("Should accept AbstractPipeline in HashStorage.getResponse()")
  void testHashStorageGetResponseAcceptsAbstractPipeline() {
    // Create a simple schema
    Map<String, Object> schemaDict =
        Map.of(
            "index",
            Map.of("name", "test-index", "prefix", "test", "storage_type", "hash"),
            "fields",
            List.of(Map.of("name", "field1", "type", "text")));

    IndexSchema schema = IndexSchema.fromDict(schemaDict);
    HashStorage storage = new HashStorage(schema);

    // Use the unifiedJedis from BaseIntegrationTest (Testcontainers)
    try (AbstractPipeline pipeline = unifiedJedis.pipelined()) {
      // This should compile without ClassCastException
      storage.getResponse(pipeline, "test:key");
      pipeline.sync();
    }

    assertThat(true).as("Method signature accepts AbstractPipeline").isTrue();
  }

  @Test
  @DisplayName("Should accept AbstractPipeline in JsonStorage.getResponse()")
  void testJsonStorageGetResponseAcceptsAbstractPipeline() {
    // Create a simple schema
    Map<String, Object> schemaDict =
        Map.of(
            "index",
            Map.of("name", "test-index", "prefix", "test", "storage_type", "json"),
            "fields",
            List.of(Map.of("name", "field1", "type", "text", "path", "$.field1")));

    IndexSchema schema = IndexSchema.fromDict(schemaDict);
    JsonStorage storage = new JsonStorage(schema);

    // Use the unifiedJedis from BaseIntegrationTest (Testcontainers)
    try (AbstractPipeline pipeline = unifiedJedis.pipelined()) {
      // This should compile without ClassCastException
      storage.getResponse(pipeline, "test:key");
      pipeline.sync();
    }

    assertThat(true).as("Method signature accepts AbstractPipeline").isTrue();
  }
}
