Skip to content

Commit 182cb96

Browse files
denglimingsazzad16
andauthored
Add support JUSTID flag to XCLAIM command (#2428)
* Add support JUSTID flag to XCLAIM command * Use full JUSTID in method name. * Add new xclaim method without JUSTID but with XClaimParams. * Apply suggestions from code review Co-authored-by: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com>
1 parent 2f7b74e commit 182cb96

20 files changed

+396
-7
lines changed

src/main/java/redis/clients/jedis/BinaryClient.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import redis.clients.jedis.params.GetExParams;
3434
import redis.clients.jedis.params.MigrateParams;
3535
import redis.clients.jedis.params.SetParams;
36+
import redis.clients.jedis.params.XClaimParams;
3637
import redis.clients.jedis.params.ZAddParams;
3738
import redis.clients.jedis.params.ZIncrByParams;
3839
import redis.clients.jedis.params.LPosParams;
@@ -1706,7 +1707,7 @@ public void xpendingSummary(final byte[] key, final byte[] groupname) {
17061707
public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime,
17071708
long newIdleTime, int retries, boolean force, byte[][] ids) {
17081709

1709-
ArrayList<byte[]> arguments = new ArrayList<>(10 + ids.length);
1710+
List<byte[]> arguments = new ArrayList<>(10 + ids.length);
17101711

17111712
arguments.add(key);
17121713
arguments.add(groupname);
@@ -1729,6 +1730,37 @@ public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minId
17291730
sendCommand(XCLAIM, arguments.toArray(new byte[arguments.size()][]));
17301731
}
17311732

1733+
private void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime,
1734+
XClaimParams params, byte[][] ids, boolean justId) {
1735+
final byte[][] bparams = params.getByteParams();
1736+
final int paramLength = bparams.length;
1737+
final int idsLength = ids.length;
1738+
final byte[][] args = new byte[4 + paramLength + idsLength + (justId ? 1 : 0)][];
1739+
int index = 0;
1740+
args[index++] = key;
1741+
args[index++] = groupname;
1742+
args[index++] = consumername;
1743+
args[index++] = toByteArray(minIdleTime);
1744+
System.arraycopy(ids, 0, args, index, idsLength);
1745+
index += idsLength;
1746+
System.arraycopy(bparams, 0, args, index, paramLength);
1747+
index += paramLength;
1748+
if (justId) {
1749+
args[index++] = Keyword.JUSTID.getRaw();
1750+
}
1751+
sendCommand(XCLAIM, args);
1752+
}
1753+
1754+
public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime,
1755+
XClaimParams params, byte[]... ids) {
1756+
xclaim(key, groupname, consumername, minIdleTime, params, ids, false);
1757+
}
1758+
1759+
public void xclaimJustId(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime,
1760+
XClaimParams params, byte[]... ids) {
1761+
xclaim(key, groupname, consumername, minIdleTime, params, ids, true);
1762+
}
1763+
17321764
public void xinfoStream(byte[] key) {
17331765
sendCommand(XINFO, Keyword.STREAM.getRaw(), key);
17341766
}

src/main/java/redis/clients/jedis/BinaryJedis.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import redis.clients.jedis.params.GetExParams;
3737
import redis.clients.jedis.params.MigrateParams;
3838
import redis.clients.jedis.params.SetParams;
39+
import redis.clients.jedis.params.XClaimParams;
3940
import redis.clients.jedis.params.ZAddParams;
4041
import redis.clients.jedis.params.ZIncrByParams;
4142
import redis.clients.jedis.params.LPosParams;
@@ -4556,6 +4557,22 @@ public List<byte[]> xclaim(byte[] key, byte[] groupname, byte[] consumername, lo
45564557
return client.getBinaryMultiBulkReply();
45574558
}
45584559

4560+
@Override
4561+
public List<byte[]> xclaim(byte[] key, byte[] group, byte[] consumername, long minIdleTime,
4562+
XClaimParams params, byte[]... ids) {
4563+
checkIsInMultiOrPipeline();
4564+
client.xclaim(key, group, consumername, minIdleTime, params, ids);
4565+
return client.getBinaryMultiBulkReply();
4566+
}
4567+
4568+
@Override
4569+
public List<byte[]> xclaimJustId(byte[] key, byte[] group, byte[] consumername, long minIdleTime,
4570+
XClaimParams params, byte[]... ids) {
4571+
checkIsInMultiOrPipeline();
4572+
client.xclaimJustId(key, group, consumername, minIdleTime, params, ids);
4573+
return client.getBinaryMultiBulkReply();
4574+
}
4575+
45594576
@Override
45604577
public StreamInfo xinfoStream(byte[] key) {
45614578
checkIsInMultiOrPipeline();

src/main/java/redis/clients/jedis/BinaryJedisCluster.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import redis.clients.jedis.params.GeoRadiusStoreParam;
1010
import redis.clients.jedis.params.GetExParams;
1111
import redis.clients.jedis.params.SetParams;
12+
import redis.clients.jedis.params.XClaimParams;
1213
import redis.clients.jedis.params.ZAddParams;
1314
import redis.clients.jedis.params.ZIncrByParams;
1415
import redis.clients.jedis.params.LPosParams;
@@ -2515,6 +2516,28 @@ public List<byte[]> execute(Jedis connection) {
25152516
}.runBinary(key);
25162517
}
25172518

2519+
@Override
2520+
public List<byte[]> xclaim(byte[] key, byte[] group, byte[] consumername, long minIdleTime,
2521+
XClaimParams params, byte[]... ids) {
2522+
return new JedisClusterCommand<List<byte[]>>(connectionHandler, maxAttempts) {
2523+
@Override
2524+
public List<byte[]> execute(Jedis connection) {
2525+
return connection.xclaim(key, group, consumername, minIdleTime, params, ids);
2526+
}
2527+
}.runBinary(key);
2528+
}
2529+
2530+
@Override
2531+
public List<byte[]> xclaimJustId(byte[] key, byte[] group, byte[] consumername, long minIdleTime,
2532+
XClaimParams params, byte[]... ids) {
2533+
return new JedisClusterCommand<List<byte[]>>(connectionHandler, maxAttempts) {
2534+
@Override
2535+
public List<byte[]> execute(Jedis connection) {
2536+
return connection.xclaimJustId(key, group, consumername, minIdleTime, params, ids);
2537+
}
2538+
}.runBinary(key);
2539+
}
2540+
25182541
@Override
25192542
public Long waitReplicas(final byte[] key, final int replicas, final long timeout) {
25202543
return new JedisClusterCommand<Long>(connectionHandler, maxAttempts) {

src/main/java/redis/clients/jedis/BinaryShardedJedis.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import redis.clients.jedis.params.GeoRadiusParam;
1515
import redis.clients.jedis.params.GetExParams;
1616
import redis.clients.jedis.params.SetParams;
17+
import redis.clients.jedis.params.XClaimParams;
1718
import redis.clients.jedis.params.ZAddParams;
1819
import redis.clients.jedis.params.ZIncrByParams;
1920
import redis.clients.jedis.params.LPosParams;
@@ -1176,6 +1177,20 @@ public List<byte[]> xclaim(byte[] key, byte[] groupname, byte[] consumername, lo
11761177
return j.xclaim(key, groupname, consumername, minIdleTime, newIdleTime, retries, force, ids);
11771178
}
11781179

1180+
@Override
1181+
public List<byte[]> xclaim(byte[] key, byte[] group, byte[] consumername, long minIdleTime,
1182+
XClaimParams params, byte[]... ids) {
1183+
Jedis j = getShard(key);
1184+
return j.xclaim(key, group, consumername, minIdleTime, params, ids);
1185+
}
1186+
1187+
@Override
1188+
public List<byte[]> xclaimJustId(byte[] key, byte[] group, byte[] consumername, long minIdleTime,
1189+
XClaimParams params, byte[]... ids) {
1190+
Jedis j = getShard(key);
1191+
return j.xclaimJustId(key, group, consumername, minIdleTime, params, ids);
1192+
}
1193+
11791194
@Override
11801195
public StreamInfo xinfoStream(byte[] key) {
11811196
Jedis j = getShard(key);

src/main/java/redis/clients/jedis/BuilderFactory.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,29 @@ public String toString() {
687687
}
688688
};
689689

690+
public static final Builder<List<StreamEntryID>> STREAM_ENTRY_ID_LIST = new Builder<List<StreamEntryID>>() {
691+
@Override
692+
@SuppressWarnings("unchecked")
693+
public List<StreamEntryID> build(Object data) {
694+
if (null == data) {
695+
return null;
696+
}
697+
List<Object> objectList = (List<Object>) data;
698+
List<StreamEntryID> responses = new ArrayList<>(objectList.size());
699+
if (!objectList.isEmpty()) {
700+
for(Object object : objectList) {
701+
responses.add(STREAM_ENTRY_ID.build(object));
702+
}
703+
}
704+
return responses;
705+
}
706+
707+
@Override
708+
public String toString() {
709+
return "List<StreamEntryID>";
710+
}
711+
};
712+
690713
public static final Builder<List<StreamEntry>> STREAM_ENTRY_LIST = new Builder<List<StreamEntry>>() {
691714
@Override
692715
@SuppressWarnings("unchecked")

src/main/java/redis/clients/jedis/Client.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import redis.clients.jedis.params.GetExParams;
2121
import redis.clients.jedis.params.MigrateParams;
2222
import redis.clients.jedis.params.SetParams;
23+
import redis.clients.jedis.params.XClaimParams;
2324
import redis.clients.jedis.params.ZAddParams;
2425
import redis.clients.jedis.params.ZIncrByParams;
2526
import redis.clients.jedis.params.LPosParams;
@@ -1441,14 +1442,26 @@ public void xpendingSummary(String key, String groupname) {
14411442
@Override
14421443
public void xclaim(String key, String group, String consumername, long minIdleTime,
14431444
long newIdleTime, int retries, boolean force, StreamEntryID... ids) {
1444-
1445-
final byte[][] bids = new byte[ids.length][];
1446-
for (int i = 0; i < ids.length; i++) {
1447-
bids[i] = SafeEncoder.encode(ids[i].toString());
1448-
}
1445+
final byte[][] bids = convertStreamEntryIDsToBinary(ids);
14491446
xclaim(SafeEncoder.encode(key), SafeEncoder.encode(group), SafeEncoder.encode(consumername), minIdleTime, newIdleTime, retries, force, bids);
14501447
}
14511448

1449+
@Override
1450+
public void xclaim(String key, String group, String consumername, long minIdleTime,
1451+
XClaimParams params, StreamEntryID... ids) {
1452+
final byte[][] bids = convertStreamEntryIDsToBinary(ids);
1453+
xclaim(SafeEncoder.encode(key), SafeEncoder.encode(group), SafeEncoder.encode(consumername),
1454+
minIdleTime, params, bids);
1455+
}
1456+
1457+
@Override
1458+
public void xclaimJustId(String key, String group, String consumername, long minIdleTime,
1459+
XClaimParams params, StreamEntryID... ids) {
1460+
final byte[][] bids = convertStreamEntryIDsToBinary(ids);
1461+
xclaimJustId(SafeEncoder.encode(key), SafeEncoder.encode(group), SafeEncoder.encode(consumername),
1462+
minIdleTime, params, bids);
1463+
}
1464+
14521465
@Override
14531466
public void xinfoStream(String key) {
14541467
xinfoStream(SafeEncoder.encode(key));
@@ -1464,4 +1477,11 @@ public void xinfoConsumers(String key, String group) {
14641477
xinfoConsumers(SafeEncoder.encode(key), SafeEncoder.encode(group));
14651478
}
14661479

1480+
private byte[][] convertStreamEntryIDsToBinary(StreamEntryID... ids) {
1481+
final byte[][] bids = new byte[ids.length][];
1482+
for (int i = 0; i < ids.length; i++) {
1483+
bids[i] = SafeEncoder.encode(ids[i].toString());
1484+
}
1485+
return bids;
1486+
}
14671487
}

src/main/java/redis/clients/jedis/Jedis.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import redis.clients.jedis.params.GetExParams;
2929
import redis.clients.jedis.params.MigrateParams;
3030
import redis.clients.jedis.params.SetParams;
31+
import redis.clients.jedis.params.XClaimParams;
3132
import redis.clients.jedis.params.ZAddParams;
3233
import redis.clients.jedis.params.ZIncrByParams;
3334
import redis.clients.jedis.params.LPosParams;
@@ -4177,6 +4178,24 @@ public List<StreamEntry> xclaim(String key, String group, String consumername, l
41774178
return BuilderFactory.STREAM_ENTRY_LIST.build(client.getObjectMultiBulkReply());
41784179
}
41794180

4181+
@Override
4182+
public List<StreamEntry> xclaim(String key, String group, String consumername, long minIdleTime,
4183+
XClaimParams params, StreamEntryID... ids) {
4184+
checkIsInMultiOrPipeline();
4185+
client.xclaim(key, group, consumername, minIdleTime, params, ids);
4186+
4187+
return BuilderFactory.STREAM_ENTRY_LIST.build(client.getObjectMultiBulkReply());
4188+
}
4189+
4190+
@Override
4191+
public List<StreamEntryID> xclaimJustId(String key, String group, String consumername,
4192+
long minIdleTime, XClaimParams params, StreamEntryID... ids) {
4193+
checkIsInMultiOrPipeline();
4194+
client.xclaimJustId(key, group, consumername, minIdleTime, params, ids);
4195+
4196+
return BuilderFactory.STREAM_ENTRY_ID_LIST.build(client.getObjectMultiBulkReply());
4197+
}
4198+
41804199
@Override
41814200
public StreamInfo xinfoStream(String key) {
41824201
client.xinfoStream(key);

src/main/java/redis/clients/jedis/JedisCluster.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import redis.clients.jedis.params.GeoRadiusStoreParam;
77
import redis.clients.jedis.params.GetExParams;
88
import redis.clients.jedis.params.SetParams;
9+
import redis.clients.jedis.params.XClaimParams;
910
import redis.clients.jedis.params.ZAddParams;
1011
import redis.clients.jedis.params.ZIncrByParams;
1112
import redis.clients.jedis.params.LPosParams;
@@ -2607,6 +2608,28 @@ public List<StreamEntry> execute(Jedis connection) {
26072608
}.run(key);
26082609
}
26092610

2611+
@Override
2612+
public List<StreamEntry> xclaim(String key, String group, String consumername, long minIdleTime,
2613+
XClaimParams params, StreamEntryID... ids) {
2614+
return new JedisClusterCommand<List<StreamEntry>>(connectionHandler, maxAttempts) {
2615+
@Override
2616+
public List<StreamEntry> execute(Jedis connection) {
2617+
return connection.xclaim(key, group, consumername, minIdleTime, params, ids);
2618+
}
2619+
}.run(key);
2620+
}
2621+
2622+
@Override
2623+
public List<StreamEntryID> xclaimJustId(String key, String group, String consumername,
2624+
long minIdleTime, XClaimParams params, StreamEntryID... ids) {
2625+
return new JedisClusterCommand<List<StreamEntryID>>(connectionHandler, maxAttempts) {
2626+
@Override
2627+
public List<StreamEntryID> execute(Jedis connection) {
2628+
return connection.xclaimJustId(key, group, consumername, minIdleTime, params, ids);
2629+
}
2630+
}.run(key);
2631+
}
2632+
26102633
public Long waitReplicas(final String key, final int replicas, final long timeout) {
26112634
return new JedisClusterCommand<Long>(connectionHandler, maxAttempts) {
26122635
@Override

src/main/java/redis/clients/jedis/PipelineBase.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import redis.clients.jedis.params.GeoRadiusParam;
1212
import redis.clients.jedis.params.GetExParams;
1313
import redis.clients.jedis.params.SetParams;
14+
import redis.clients.jedis.params.XClaimParams;
1415
import redis.clients.jedis.params.ZAddParams;
1516
import redis.clients.jedis.params.ZIncrByParams;
1617
import redis.clients.jedis.params.LPosParams;
@@ -2221,6 +2222,34 @@ public Response<List<byte[]>> xclaim(byte[] key, byte[] group, byte[] consumerna
22212222
return getResponse(BuilderFactory.BYTE_ARRAY_LIST);
22222223
}
22232224

2225+
@Override
2226+
public Response<List<StreamEntry>> xclaim(String key, String group, String consumername,
2227+
long minIdleTime, XClaimParams params, StreamEntryID... ids) {
2228+
getClient(key).xclaim(key, group, consumername, minIdleTime, params, ids);
2229+
return getResponse(BuilderFactory.STREAM_ENTRY_LIST);
2230+
}
2231+
2232+
@Override
2233+
public Response<List<byte[]>> xclaim(byte[] key, byte[] group, byte[] consumername,
2234+
long minIdleTime, XClaimParams params, byte[]... ids) {
2235+
getClient(key).xclaim(key, group, consumername, minIdleTime, params, ids);
2236+
return getResponse(BuilderFactory.BYTE_ARRAY_LIST);
2237+
}
2238+
2239+
@Override
2240+
public Response<List<StreamEntryID>> xclaimJustId(String key, String group, String consumername,
2241+
long minIdleTime, XClaimParams params, StreamEntryID... ids) {
2242+
getClient(key).xclaimJustId(key, group, consumername, minIdleTime, params, ids);
2243+
return getResponse(BuilderFactory.STREAM_ENTRY_ID_LIST);
2244+
}
2245+
2246+
@Override
2247+
public Response<List<byte[]>> xclaimJustId(byte[] key, byte[] group, byte[] consumername,
2248+
long minIdleTime, XClaimParams params, byte[]... ids) {
2249+
getClient(key).xclaimJustId(key, group, consumername, minIdleTime, params, ids);
2250+
return getResponse(BuilderFactory.BYTE_ARRAY_LIST);
2251+
}
2252+
22242253
public Response<Object> sendCommand(final String sampleKey, final ProtocolCommand cmd,
22252254
final String... args) {
22262255
getClient(sampleKey).sendCommand(cmd, args);

src/main/java/redis/clients/jedis/Protocol.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ public static enum Keyword {
282282
GETNAME, SETNAME, LIST, MATCH, COUNT, PING, PONG, UNLOAD, REPLACE, KEYS, PAUSE, DOCTOR, BLOCK,
283283
NOACK, STREAMS, KEY, CREATE, MKSTREAM, SETID, DESTROY, DELCONSUMER, MAXLEN, GROUP, ID, IDLE,
284284
TIME, RETRYCOUNT, FORCE, USAGE, SAMPLES, STREAM, GROUPS, CONSUMERS, HELP, FREQ, SETUSER,
285-
GETUSER, DELUSER, WHOAMI, CAT, GENPASS, USERS, LOG, INCR, SAVE;
285+
GETUSER, DELUSER, WHOAMI, CAT, GENPASS, USERS, LOG, INCR, SAVE, JUSTID;
286286

287287
/**
288288
* @deprecated This will be private in future. Use {@link #getRaw()}.

0 commit comments

Comments
 (0)