Skip to content

Commit 8b45e8e

Browse files
author
dengliming
authored
Add support IDLE arg to XPENDING command (#2470)
* Add support IDLE arg to XPENDING command * review
1 parent 1db3f04 commit 8b45e8e

18 files changed

+231
-2
lines changed

src/main/java/redis/clients/jedis/BinaryClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1848,6 +1848,10 @@ public void xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int
18481848
}
18491849
}
18501850

1851+
public void xpending(byte[] key, byte[] groupname, XPendingParams params) {
1852+
sendCommand(XPENDING, joinParameters(key, groupname, params.getByteParams()));
1853+
}
1854+
18511855
public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime,
18521856
long newIdleTime, int retries, boolean force, byte[][] ids) {
18531857

src/main/java/redis/clients/jedis/BinaryJedis.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4758,6 +4758,13 @@ public Object xpending(final byte[] key, final byte[] groupname) {
47584758
return client.getOne();
47594759
}
47604760

4761+
@Override
4762+
public List<Object> xpending(final byte[] key, final byte[] groupname, final XPendingParams params) {
4763+
checkIsInMultiOrPipeline();
4764+
client.xpending(key, groupname, params);
4765+
return client.getObjectMultiBulkReply();
4766+
}
4767+
47614768
@Override
47624769
public List<byte[]> xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime,
47634770
long newIdleTime, int retries, boolean force, byte[]... ids) {

src/main/java/redis/clients/jedis/BinaryJedisCluster.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2679,6 +2679,16 @@ public Object execute(Jedis connection) {
26792679
}.runBinary(key);
26802680
}
26812681

2682+
@Override
2683+
public List<Object> xpending(final byte[] key, final byte[] groupname, final XPendingParams params) {
2684+
return new JedisClusterCommand<List<Object>>(connectionHandler, maxAttempts) {
2685+
@Override
2686+
public List<Object> execute(Jedis connection) {
2687+
return connection.xpending(key, groupname, params);
2688+
}
2689+
}.runBinary(key);
2690+
}
2691+
26822692
@Override
26832693
public List<byte[]> xclaim(final byte[] key, final byte[] groupname, final byte[] consumername,
26842694
final long minIdleTime, final long newIdleTime, final int retries, final boolean force,

src/main/java/redis/clients/jedis/BinaryShardedJedis.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import redis.clients.jedis.params.SetParams;
1717
import redis.clients.jedis.params.XAddParams;
1818
import redis.clients.jedis.params.XClaimParams;
19+
import redis.clients.jedis.params.XPendingParams;
1920
import redis.clients.jedis.params.XTrimParams;
2021
import redis.clients.jedis.params.ZAddParams;
2122
import redis.clients.jedis.params.ZIncrByParams;
@@ -1232,6 +1233,12 @@ public Object xpending(final byte[] key, final byte[] groupname) {
12321233
return j.xpending(key, groupname);
12331234
}
12341235

1236+
@Override
1237+
public List<Object> xpending(final byte[] key, final byte[] groupname, final XPendingParams params) {
1238+
Jedis j = getShard(key);
1239+
return j.xpending(key, groupname, params);
1240+
}
1241+
12351242
@Override
12361243
public List<byte[]> xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime,
12371244
long newIdleTime, int retries, boolean force, byte[]... ids) {

src/main/java/redis/clients/jedis/Client.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1561,6 +1561,11 @@ public void xpending(String key, String groupname, StreamEntryID start, StreamEn
15611561
SafeEncoder.encode(end==null ? "+" : end.toString()), count, consumername == null? null : SafeEncoder.encode(consumername));
15621562
}
15631563

1564+
@Override
1565+
public void xpending(String key, String groupname, XPendingParams params) {
1566+
xpending(SafeEncoder.encode(key), SafeEncoder.encode(groupname), params);
1567+
}
1568+
15641569
@Override
15651570
public void xclaim(String key, String group, String consumername, long minIdleTime,
15661571
long newIdleTime, int retries, boolean force, StreamEntryID... ids) {

src/main/java/redis/clients/jedis/Jedis.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4323,6 +4323,13 @@ public List<StreamPendingEntry> xpending(final String key, final String groupnam
43234323
return BuilderFactory.STREAM_PENDING_ENTRY_LIST.build(client.getObjectMultiBulkReply());
43244324
}
43254325

4326+
@Override
4327+
public List<StreamPendingEntry> xpending(final String key, final String groupname, final XPendingParams params) {
4328+
checkIsInMultiOrPipeline();
4329+
client.xpending(key, groupname, params);
4330+
return BuilderFactory.STREAM_PENDING_ENTRY_LIST.build(client.getObjectMultiBulkReply());
4331+
}
4332+
43264333
@Override
43274334
public List<StreamEntry> xclaim(String key, String group, String consumername, long minIdleTime,
43284335
long newIdleTime, int retries, boolean force, StreamEntryID... ids) {

src/main/java/redis/clients/jedis/JedisCluster.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2732,6 +2732,16 @@ public List<StreamPendingEntry> execute(Jedis connection) {
27322732
}.run(key);
27332733
}
27342734

2735+
@Override
2736+
public List<StreamPendingEntry> xpending(final String key, final String groupname, final XPendingParams params) {
2737+
return new JedisClusterCommand<List<StreamPendingEntry>>(connectionHandler, maxAttempts) {
2738+
@Override
2739+
public List<StreamPendingEntry> execute(Jedis connection) {
2740+
return connection.xpending(key, groupname, params);
2741+
}
2742+
}.run(key);
2743+
}
2744+
27352745
@Override
27362746
public Long xdel(final String key, final StreamEntryID... ids) {
27372747
return new JedisClusterCommand<Long>(connectionHandler, maxAttempts) {

src/main/java/redis/clients/jedis/PipelineBase.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import redis.clients.jedis.params.SetParams;
1414
import redis.clients.jedis.params.XAddParams;
1515
import redis.clients.jedis.params.XClaimParams;
16+
import redis.clients.jedis.params.XPendingParams;
1617
import redis.clients.jedis.params.XTrimParams;
1718
import redis.clients.jedis.params.ZAddParams;
1819
import redis.clients.jedis.params.ZIncrByParams;
@@ -2294,6 +2295,18 @@ public Response<List<Object>> xpendingBinary(byte[] key, byte[] groupname, byte[
22942295
return getResponse(BuilderFactory.RAW_OBJECT_LIST);
22952296
}
22962297

2298+
@Override
2299+
public Response<List<Object>> xpending(byte[] key, byte[] groupname, XPendingParams params) {
2300+
getClient(key).xpending(key, groupname, params);
2301+
return getResponse(BuilderFactory.RAW_OBJECT_LIST);
2302+
}
2303+
2304+
@Override
2305+
public Response<List<StreamPendingEntry>> xpending(String key, String groupname, XPendingParams params) {
2306+
getClient(key).xpending(key, groupname, params);
2307+
return getResponse(BuilderFactory.STREAM_PENDING_ENTRY_LIST);
2308+
}
2309+
22972310
@Override
22982311
public Response<Long> xdel(String key, StreamEntryID... ids) {
22992312
getClient(key).xdel(key, ids);

src/main/java/redis/clients/jedis/ShardedJedis.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import redis.clients.jedis.params.SetParams;
1616
import redis.clients.jedis.params.XAddParams;
1717
import redis.clients.jedis.params.XClaimParams;
18+
import redis.clients.jedis.params.XPendingParams;
1819
import redis.clients.jedis.params.XTrimParams;
1920
import redis.clients.jedis.params.ZAddParams;
2021
import redis.clients.jedis.params.ZIncrByParams;
@@ -1228,6 +1229,12 @@ public List<StreamPendingEntry> xpending(String key, String groupname, StreamEnt
12281229
return j.xpending(key, groupname, start, end, count, consumername);
12291230
}
12301231

1232+
@Override
1233+
public List<StreamPendingEntry> xpending(String key, String groupname, XPendingParams params) {
1234+
Jedis j = getShard(key);
1235+
return j.xpending(key, groupname, params);
1236+
}
1237+
12311238
@Override
12321239
public List<StreamEntry> xclaim(String key, String group, String consumername, long minIdleTime,
12331240
long newIdleTime, int retries, boolean force, StreamEntryID... ids) {

src/main/java/redis/clients/jedis/commands/BinaryJedisClusterCommands.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import redis.clients.jedis.params.SetParams;
1515
import redis.clients.jedis.params.XAddParams;
1616
import redis.clients.jedis.params.XClaimParams;
17+
import redis.clients.jedis.params.XPendingParams;
1718
import redis.clients.jedis.params.XTrimParams;
1819
import redis.clients.jedis.params.ZAddParams;
1920
import redis.clients.jedis.params.ZIncrByParams;
@@ -425,6 +426,8 @@ List<GeoRadiusResponse> georadiusByMemberReadonly(byte[] key, byte[] member, dou
425426

426427
List<Object> xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername);
427428

429+
List<Object> xpending(byte[] key, byte[] groupname, XPendingParams params);
430+
428431
List<byte[]> xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime, long newIdleTime, int retries, boolean force, byte[][] ids);
429432

430433
List<byte[]> xclaim(byte[] key, byte[] group, byte[] consumername, long minIdleTime,

0 commit comments

Comments
 (0)