Skip to content

Commit 2e4300f

Browse files
authored
Add support for 8.2 stream commands (#3374)
* Add support for 8.2 stream commands - Add support for XDELEX and XACKDEL - Extend XADD and XTRIM to support trimming policy * Add @SInCE tags * Update NodeSelection API * Add StreamEntryDeletionResult.UNKNOWN to avoid throwing exceptions * Clean up RedisCommandBuilder - Remove redundant code blocks - Use CommandKeyword for IDS - Add unit tests * Fix broken test
1 parent 371beb0 commit 2e4300f

20 files changed

+1286
-2
lines changed

src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import io.lettuce.core.models.stream.ClaimedMessages;
3838
import io.lettuce.core.models.stream.PendingMessage;
3939
import io.lettuce.core.models.stream.PendingMessages;
40+
import io.lettuce.core.models.stream.StreamEntryDeletionResult;
4041
import io.lettuce.core.output.CommandOutput;
4142
import io.lettuce.core.output.KeyStreamingChannel;
4243
import io.lettuce.core.output.KeyValueStreamingChannel;
@@ -2708,6 +2709,17 @@ public RedisFuture<Long> xack(K key, K group, String... messageIds) {
27082709
return dispatch(commandBuilder.xack(key, group, messageIds));
27092710
}
27102711

2712+
@Override
2713+
public RedisFuture<List<StreamEntryDeletionResult>> xackdel(K key, K group, String... messageIds) {
2714+
return dispatch(commandBuilder.xackdel(key, group, messageIds));
2715+
}
2716+
2717+
@Override
2718+
public RedisFuture<List<StreamEntryDeletionResult>> xackdel(K key, K group, StreamDeletionPolicy policy,
2719+
String... messageIds) {
2720+
return dispatch(commandBuilder.xackdel(key, group, policy, messageIds));
2721+
}
2722+
27112723
@Override
27122724
public RedisFuture<String> xadd(K key, Map<K, V> body) {
27132725
return dispatch(commandBuilder.xadd(key, null, body));
@@ -2748,6 +2760,16 @@ public RedisFuture<Long> xdel(K key, String... messageIds) {
27482760
return dispatch(commandBuilder.xdel(key, messageIds));
27492761
}
27502762

2763+
@Override
2764+
public RedisFuture<List<StreamEntryDeletionResult>> xdelex(K key, String... messageIds) {
2765+
return dispatch(commandBuilder.xdelex(key, messageIds));
2766+
}
2767+
2768+
@Override
2769+
public RedisFuture<List<StreamEntryDeletionResult>> xdelex(K key, StreamDeletionPolicy policy, String... messageIds) {
2770+
return dispatch(commandBuilder.xdelex(key, policy, messageIds));
2771+
}
2772+
27512773
@Override
27522774
public RedisFuture<String> xgroupCreate(XReadArgs.StreamOffset<K> offset, K group) {
27532775
return dispatch(commandBuilder.xgroupCreate(offset, group, null));

src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import io.lettuce.core.models.stream.ClaimedMessages;
3838
import io.lettuce.core.models.stream.PendingMessage;
3939
import io.lettuce.core.models.stream.PendingMessages;
40+
import io.lettuce.core.models.stream.StreamEntryDeletionResult;
4041
import io.lettuce.core.output.CommandOutput;
4142
import io.lettuce.core.output.KeyStreamingChannel;
4243
import io.lettuce.core.output.KeyValueStreamingChannel;
@@ -2790,6 +2791,16 @@ public Mono<Long> xack(K key, K group, String... messageIds) {
27902791
return createMono(() -> commandBuilder.xack(key, group, messageIds));
27912792
}
27922793

2794+
@Override
2795+
public Flux<StreamEntryDeletionResult> xackdel(K key, K group, String... messageIds) {
2796+
return createDissolvingFlux(() -> commandBuilder.xackdel(key, group, messageIds));
2797+
}
2798+
2799+
@Override
2800+
public Flux<StreamEntryDeletionResult> xackdel(K key, K group, StreamDeletionPolicy policy, String... messageIds) {
2801+
return createDissolvingFlux(() -> commandBuilder.xackdel(key, group, policy, messageIds));
2802+
}
2803+
27932804
@Override
27942805
public Mono<String> xadd(K key, Map<K, V> body) {
27952806
return createMono(() -> commandBuilder.xadd(key, null, body));
@@ -2831,6 +2842,16 @@ public Mono<Long> xdel(K key, String... messageIds) {
28312842
return createMono(() -> commandBuilder.xdel(key, messageIds));
28322843
}
28332844

2845+
@Override
2846+
public Flux<StreamEntryDeletionResult> xdelex(K key, String... messageIds) {
2847+
return createDissolvingFlux(() -> commandBuilder.xdelex(key, messageIds));
2848+
}
2849+
2850+
@Override
2851+
public Flux<StreamEntryDeletionResult> xdelex(K key, StreamDeletionPolicy policy, String... messageIds) {
2852+
return createDissolvingFlux(() -> commandBuilder.xdelex(key, policy, messageIds));
2853+
}
2854+
28342855
@Override
28352856
public Mono<String> xgroupCreate(XReadArgs.StreamOffset<K> streamOffset, K group) {
28362857
return createMono(() -> commandBuilder.xgroupCreate(streamOffset, group, null));

src/main/java/io/lettuce/core/RedisCommandBuilder.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
import io.lettuce.core.models.stream.ClaimedMessages;
2828
import io.lettuce.core.models.stream.PendingMessage;
2929
import io.lettuce.core.models.stream.PendingMessages;
30+
import io.lettuce.core.models.stream.StreamEntryDeletionResult;
3031
import io.lettuce.core.output.*;
32+
import io.lettuce.core.output.StreamEntryDeletionResultListOutput;
3133
import io.lettuce.core.protocol.BaseRedisCommandBuilder;
3234
import io.lettuce.core.protocol.Command;
3335
import io.lettuce.core.protocol.CommandArgs;
@@ -3159,6 +3161,32 @@ public Command<K, V, Long> xack(K key, K group, String[] messageIds) {
31593161
return createCommand(XACK, new IntegerOutput<>(codec), args);
31603162
}
31613163

3164+
public Command<K, V, List<StreamEntryDeletionResult>> xackdel(K key, K group, String[] messageIds) {
3165+
return xackdel(key, group, null, messageIds);
3166+
}
3167+
3168+
public Command<K, V, List<StreamEntryDeletionResult>> xackdel(K key, K group, StreamDeletionPolicy policy,
3169+
String[] messageIds) {
3170+
notNullKey(key);
3171+
LettuceAssert.notNull(group, "Group " + MUST_NOT_BE_NULL);
3172+
LettuceAssert.notEmpty(messageIds, "MessageIds " + MUST_NOT_BE_EMPTY);
3173+
LettuceAssert.noNullElements(messageIds, "MessageIds " + MUST_NOT_CONTAIN_NULL_ELEMENTS);
3174+
3175+
CommandArgs<K, V> args = new CommandArgs<>(codec).addKey(key).addKey(group);
3176+
3177+
if (policy != null) {
3178+
args.add(policy);
3179+
}
3180+
3181+
args.add(CommandKeyword.IDS).add(messageIds.length);
3182+
3183+
for (String messageId : messageIds) {
3184+
args.add(messageId);
3185+
}
3186+
3187+
return createCommand(XACKDEL, new StreamEntryDeletionResultListOutput<>(codec), args);
3188+
}
3189+
31623190
public Command<K, V, ClaimedMessages<K, V>> xautoclaim(K key, XAutoClaimArgs<K> xAutoClaimArgs) {
31633191
notNullKey(key);
31643192
LettuceAssert.notNull(xAutoClaimArgs, "XAutoClaimArgs " + MUST_NOT_BE_NULL);
@@ -3243,6 +3271,30 @@ public Command<K, V, Long> xdel(K key, String[] messageIds) {
32433271
return createCommand(XDEL, new IntegerOutput<>(codec), args);
32443272
}
32453273

3274+
public Command<K, V, List<StreamEntryDeletionResult>> xdelex(K key, String[] messageIds) {
3275+
return xdelex(key, null, messageIds);
3276+
}
3277+
3278+
public Command<K, V, List<StreamEntryDeletionResult>> xdelex(K key, StreamDeletionPolicy policy, String[] messageIds) {
3279+
notNullKey(key);
3280+
LettuceAssert.notEmpty(messageIds, "MessageIds " + MUST_NOT_BE_EMPTY);
3281+
LettuceAssert.noNullElements(messageIds, "MessageIds " + MUST_NOT_CONTAIN_NULL_ELEMENTS);
3282+
3283+
CommandArgs<K, V> args = new CommandArgs<>(codec).addKey(key);
3284+
3285+
if (policy != null) {
3286+
args.add(policy);
3287+
}
3288+
3289+
args.add(CommandKeyword.IDS).add(messageIds.length);
3290+
3291+
for (String messageId : messageIds) {
3292+
args.add(messageId);
3293+
}
3294+
3295+
return createCommand(XDELEX, new StreamEntryDeletionResultListOutput<>(codec), args);
3296+
}
3297+
32463298
public Command<K, V, String> xgroupCreate(StreamOffset<K> offset, K group, XGroupCreateArgs commandArgs) {
32473299
LettuceAssert.notNull(offset, "StreamOffset " + MUST_NOT_BE_NULL);
32483300
LettuceAssert.notNull(group, "Group " + MUST_NOT_BE_NULL);
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2025-Present, Redis Ltd. and Contributors
3+
* All rights reserved.
4+
*
5+
* Licensed under the MIT License.
6+
*/
7+
package io.lettuce.core;
8+
9+
import io.lettuce.core.protocol.ProtocolKeyword;
10+
11+
import java.nio.charset.StandardCharsets;
12+
13+
/**
14+
* Deletion policy for stream commands that handle consumer group references. Used with XDELEX, XACKDEL, and enhanced XADD/XTRIM
15+
* commands.
16+
*/
17+
public enum StreamDeletionPolicy implements ProtocolKeyword {
18+
19+
/**
20+
* Preserves existing references to entries in all consumer groups' PEL. This is the default behavior similar to XDEL.
21+
*/
22+
KEEP_REFERENCES("KEEPREF"),
23+
24+
/**
25+
* Removes all references to entries from all consumer groups' pending entry lists, effectively cleaning up all traces of
26+
* the messages.
27+
*/
28+
DELETE_REFERENCES("DELREF"),
29+
30+
/**
31+
* Only operates on entries that were read and acknowledged by all consumer groups.
32+
*/
33+
ACKNOWLEDGED("ACKED");
34+
35+
public final byte[] bytes;
36+
37+
StreamDeletionPolicy(String redisParamName) {
38+
bytes = redisParamName.getBytes(StandardCharsets.US_ASCII);
39+
}
40+
41+
@Override
42+
public byte[] getBytes() {
43+
return bytes;
44+
}
45+
46+
}

src/main/java/io/lettuce/core/XAddArgs.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public class XAddArgs implements CompositeArgument {
4949

5050
private Long limit;
5151

52+
private StreamDeletionPolicy trimmingMode;
53+
5254
/**
5355
* Builder entry points for {@link XAddArgs}.
5456
*/
@@ -155,6 +157,18 @@ public XAddArgs limit(long limit) {
155157
return this;
156158
}
157159

160+
/**
161+
* When trimming, defines desired behaviour for handling consumer group references. See {@link StreamDeletionPolicy} for
162+
* details.
163+
*
164+
* @param trimmingMode the deletion policy to apply during trimming.
165+
* @return {@code this}
166+
*/
167+
public XAddArgs trimmingMode(StreamDeletionPolicy trimmingMode) {
168+
this.trimmingMode = trimmingMode;
169+
return this;
170+
}
171+
158172
/**
159173
* Apply efficient trimming for capped streams using the {@code ~} flag.
160174
*
@@ -253,6 +267,10 @@ public <K, V> void build(CommandArgs<K, V> args) {
253267
args.add(CommandKeyword.LIMIT).add(limit);
254268
}
255269

270+
if (trimmingMode != null) {
271+
args.add(trimmingMode);
272+
}
273+
256274
if (nomkstream) {
257275
args.add(CommandKeyword.NOMKSTREAM);
258276
}

src/main/java/io/lettuce/core/XTrimArgs.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ public class XTrimArgs implements CompositeArgument {
4444

4545
private Long limit;
4646

47+
private StreamDeletionPolicy trimmingMode;
48+
4749
/**
4850
* Builder entry points for {@link XTrimArgs}.
4951
*/
@@ -164,6 +166,18 @@ public XTrimArgs exactTrimming(boolean exactTrimming) {
164166
return this;
165167
}
166168

169+
/**
170+
* Defines desired behaviour for handling consumer group references during trimming. See {@link StreamDeletionPolicy} for
171+
* details.
172+
*
173+
* @param trimmingMode the deletion policy to apply during trimming.
174+
* @return {@code this}
175+
*/
176+
public XTrimArgs trimmingMode(StreamDeletionPolicy trimmingMode) {
177+
this.trimmingMode = trimmingMode;
178+
return this;
179+
}
180+
167181
@Override
168182
public <K, V> void build(CommandArgs<K, V> args) {
169183

@@ -193,6 +207,10 @@ public <K, V> void build(CommandArgs<K, V> args) {
193207
if (limit != null && approximateTrimming) {
194208
args.add(CommandKeyword.LIMIT).add(limit);
195209
}
210+
211+
if (trimmingMode != null) {
212+
args.add(trimmingMode);
213+
}
196214
}
197215

198216
}

src/main/java/io/lettuce/core/api/async/RedisStreamAsyncCommands.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.lettuce.core.models.stream.ClaimedMessages;
2828
import io.lettuce.core.models.stream.PendingMessage;
2929
import io.lettuce.core.models.stream.PendingMessages;
30+
import io.lettuce.core.models.stream.StreamEntryDeletionResult;
3031

3132
/**
3233
* Asynchronous executed commands for Streams.
@@ -49,6 +50,32 @@ public interface RedisStreamAsyncCommands<K, V> {
4950
*/
5051
RedisFuture<Long> xack(K key, K group, String... messageIds);
5152

53+
/**
54+
* Acknowledge and delete one or more messages from the stream and consumer group. Returns detailed results for each message
55+
* ID indicating whether it was deleted, not found, or not deleted due to pending references.
56+
*
57+
* @param key the stream key.
58+
* @param group name of the consumer group.
59+
* @param messageIds message Id's to acknowledge and delete.
60+
* @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID.
61+
* @since 6.8
62+
*/
63+
RedisFuture<List<StreamEntryDeletionResult>> xackdel(K key, K group, String... messageIds);
64+
65+
/**
66+
* Acknowledge and delete one or more messages from the stream and consumer group with a specific deletion policy. Returns
67+
* detailed results for each message ID indicating whether it was deleted, not found, or not deleted due to pending
68+
* references.
69+
*
70+
* @param key the stream key.
71+
* @param group name of the consumer group.
72+
* @param policy the deletion policy to apply.
73+
* @param messageIds message Id's to acknowledge and delete.
74+
* @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID.
75+
* @since 6.8
76+
*/
77+
RedisFuture<List<StreamEntryDeletionResult>> xackdel(K key, K group, StreamDeletionPolicy policy, String... messageIds);
78+
5279
/**
5380
* Append a message to the stream {@code key}.
5481
*
@@ -132,6 +159,30 @@ public interface RedisStreamAsyncCommands<K, V> {
132159
*/
133160
RedisFuture<Long> xdel(K key, String... messageIds);
134161

162+
/**
163+
* Extended delete operation that removes the specified entries from the stream and returns detailed results for each
164+
* message ID indicating whether it was deleted, not found, or not deleted due to acknowledgment status.
165+
*
166+
* @param key the stream key.
167+
* @param messageIds stream message Id's.
168+
* @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID.
169+
* @since 6.8
170+
*/
171+
RedisFuture<List<StreamEntryDeletionResult>> xdelex(K key, String... messageIds);
172+
173+
/**
174+
* Extended delete operation that removes the specified entries from the stream with a specific deletion policy and returns
175+
* detailed results for each message ID indicating whether it was deleted, not found, or not deleted due to acknowledgment
176+
* status.
177+
*
178+
* @param key the stream key.
179+
* @param policy the deletion policy to apply.
180+
* @param messageIds stream message Id's.
181+
* @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID.
182+
* @since 6.8
183+
*/
184+
RedisFuture<List<StreamEntryDeletionResult>> xdelex(K key, StreamDeletionPolicy policy, String... messageIds);
185+
135186
/**
136187
* Create a consumer group.
137188
*

0 commit comments

Comments
 (0)