Skip to content

Commit 34270c7

Browse files
author
dengliming
authored
Add support for blocking zpopmax, zpopmin commands (#2425)
* Add support for blocking zpopmax, zpopmin commands * Fix review * review
1 parent 99e9836 commit 34270c7

17 files changed

+248
-4
lines changed

src/main/java/redis/clients/jedis/BinaryClient.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -692,6 +692,22 @@ public void blpop(final int timeout, final byte[]... keys) {
692692
blpop(args.toArray(new byte[args.size()][]));
693693
}
694694

695+
public void bzpopmax(final int timeout, final byte[]... keys) {
696+
final List<byte[]> args = new ArrayList<>();
697+
Collections.addAll(args, keys);
698+
699+
args.add(Protocol.toByteArray(timeout));
700+
sendCommand(BZPOPMAX, args.toArray(new byte[args.size()][]));
701+
}
702+
703+
public void bzpopmin(final int timeout, final byte[]... keys) {
704+
final List<byte[]> args = new ArrayList<>();
705+
Collections.addAll(args, keys);
706+
707+
args.add(Protocol.toByteArray(timeout));
708+
sendCommand(BZPOPMIN, args.toArray(new byte[args.size()][]));
709+
}
710+
695711
public void sort(final byte[] key, final SortingParams sortingParameters, final byte[] dstkey) {
696712
final List<byte[]> args = new ArrayList<>();
697713
args.add(key);

src/main/java/redis/clients/jedis/BinaryJedis.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2363,6 +2363,20 @@ public List<byte[]> blpop(final int timeout, final byte[]... keys) {
23632363
return blpop(getArgsAddTimeout(timeout, keys));
23642364
}
23652365

2366+
@Override
2367+
public KeyedTuple bzpopmax(final int timeout, final byte[]... keys) {
2368+
checkIsInMultiOrPipeline();
2369+
client.bzpopmax(timeout, keys);
2370+
return BuilderFactory.KEYED_TUPLE.build(client.getBinaryMultiBulkReply());
2371+
}
2372+
2373+
@Override
2374+
public KeyedTuple bzpopmin(final int timeout, final byte[]... keys) {
2375+
checkIsInMultiOrPipeline();
2376+
client.bzpopmin(timeout, keys);
2377+
return BuilderFactory.KEYED_TUPLE.build(client.getBinaryMultiBulkReply());
2378+
}
2379+
23662380
private byte[][] getArgsAddTimeout(int timeout, byte[][] keys) {
23672381
int size = keys.length;
23682382
final byte[][] args = new byte[size + 1][];

src/main/java/redis/clients/jedis/BinaryJedisCluster.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1666,6 +1666,26 @@ public List<byte[]> execute(Jedis connection) {
16661666
}.runBinary(keys.length, keys);
16671667
}
16681668

1669+
@Override
1670+
public KeyedTuple bzpopmax(int timeout, byte[]... keys) {
1671+
return new JedisClusterCommand<KeyedTuple>(connectionHandler, maxAttempts) {
1672+
@Override
1673+
public KeyedTuple execute(Jedis connection) {
1674+
return connection.bzpopmax(timeout, keys);
1675+
}
1676+
}.runBinary(keys.length, keys);
1677+
}
1678+
1679+
@Override
1680+
public KeyedTuple bzpopmin(int timeout, byte[]... keys) {
1681+
return new JedisClusterCommand<KeyedTuple>(connectionHandler, maxAttempts) {
1682+
@Override
1683+
public KeyedTuple execute(Jedis connection) {
1684+
return connection.bzpopmin(timeout, keys);
1685+
}
1686+
}.runBinary(keys.length, keys);
1687+
}
1688+
16691689
@Override
16701690
public List<byte[]> brpop(final int timeout, final byte[]... keys) {
16711691
return new JedisClusterCommand<List<byte[]>>(connectionHandler, maxAttempts) {

src/main/java/redis/clients/jedis/BuilderFactory.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,24 @@ public String toString() {
326326

327327
};
328328

329+
public static final Builder<KeyedTuple> KEYED_TUPLE = new Builder<KeyedTuple>() {
330+
@Override
331+
@SuppressWarnings("unchecked")
332+
public KeyedTuple build(Object data) {
333+
List<byte[]> l = (List<byte[]>) data; // never null
334+
if (l.isEmpty()) {
335+
return null;
336+
}
337+
return new KeyedTuple(l.get(0), l.get(1), DOUBLE.build(l.get(2)));
338+
}
339+
340+
@Override
341+
public String toString() {
342+
return "KeyedTuple";
343+
}
344+
345+
};
346+
329347
public static final Builder<Object> EVAL_RESULT = new Builder<Object>() {
330348

331349
@Override

src/main/java/redis/clients/jedis/Client.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -592,6 +592,14 @@ public void blpop(final int timeout, final String... keys) {
592592
blpop(args.toArray(new String[size]));
593593
}
594594

595+
public void bzpopmax(final int timeout, final String... keys) {
596+
bzpopmax(timeout, SafeEncoder.encodeMany(keys));
597+
}
598+
599+
public void bzpopmin(final int timeout, final String... keys) {
600+
bzpopmin(timeout, SafeEncoder.encodeMany(keys));
601+
}
602+
595603
@Override
596604
public void sort(final String key, final SortingParams sortingParameters, final String dstkey) {
597605
sort(SafeEncoder.encode(key), sortingParameters, SafeEncoder.encode(dstkey));

src/main/java/redis/clients/jedis/Jedis.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2024,6 +2024,20 @@ public List<String> blpop(final int timeout, final String... keys) {
20242024
return blpop(getArgsAddTimeout(timeout, keys));
20252025
}
20262026

2027+
@Override
2028+
public KeyedTuple bzpopmax(int timeout, String... keys) {
2029+
checkIsInMultiOrPipeline();
2030+
client.bzpopmax(timeout, keys);
2031+
return BuilderFactory.KEYED_TUPLE.build(client.getObjectMultiBulkReply());
2032+
}
2033+
2034+
@Override
2035+
public KeyedTuple bzpopmin(int timeout, String... keys) {
2036+
checkIsInMultiOrPipeline();
2037+
client.bzpopmin(timeout, keys);
2038+
return BuilderFactory.KEYED_TUPLE.build(client.getObjectMultiBulkReply());
2039+
}
2040+
20272041
private String[] getArgsAddTimeout(int timeout, String[] keys) {
20282042
final int keyCount = keys.length;
20292043
final String[] args = new String[keyCount + 1];

src/main/java/redis/clients/jedis/JedisCluster.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1756,6 +1756,26 @@ public List<String> execute(Jedis connection) {
17561756
}.run(keys.length, keys);
17571757
}
17581758

1759+
@Override
1760+
public KeyedTuple bzpopmax(int timeout, String... keys) {
1761+
return new JedisClusterCommand<KeyedTuple>(connectionHandler, maxAttempts) {
1762+
@Override
1763+
public KeyedTuple execute(Jedis connection) {
1764+
return connection.bzpopmax(timeout, keys);
1765+
}
1766+
}.run(keys.length, keys);
1767+
}
1768+
1769+
@Override
1770+
public KeyedTuple bzpopmin(int timeout, String... keys) {
1771+
return new JedisClusterCommand<KeyedTuple>(connectionHandler, maxAttempts) {
1772+
@Override
1773+
public KeyedTuple execute(Jedis connection) {
1774+
return connection.bzpopmin(timeout, keys);
1775+
}
1776+
}.run(keys.length, keys);
1777+
}
1778+
17591779
@Override
17601780
public List<String> mget(final String... keys) {
17611781
return new JedisClusterCommand<List<String>>(connectionHandler, maxAttempts) {
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package redis.clients.jedis;
2+
3+
import redis.clients.jedis.util.SafeEncoder;
4+
5+
import java.util.Arrays;
6+
7+
public class KeyedTuple extends Tuple {
8+
private byte[] key;
9+
10+
public KeyedTuple(byte[] key, byte[] element, Double score) {
11+
super(element, score);
12+
this.key = key;
13+
}
14+
15+
public KeyedTuple(String key, String element, Double score) {
16+
super(element, score);
17+
this.key = SafeEncoder.encode(key);
18+
}
19+
20+
public String getKey() {
21+
if (null != key) {
22+
return SafeEncoder.encode(key);
23+
}
24+
return null;
25+
}
26+
27+
@Override
28+
public boolean equals(Object o) {
29+
if (this == o) return true;
30+
if (!(o instanceof KeyedTuple)) return false;
31+
if (!super.equals(o)) return false;
32+
33+
KeyedTuple that = (KeyedTuple) o;
34+
return Arrays.equals(key, that.key);
35+
}
36+
37+
@Override
38+
public int hashCode() {
39+
return 31 * (key != null ? Arrays.hashCode(key) : 0) + super.hashCode();
40+
}
41+
42+
@Override
43+
public String toString() {
44+
return "KeyedTuple{" + "key=" + SafeEncoder.encode(key) + ", element='" + getElement() + "'"
45+
+ ", score=" + getScore() + "} ";
46+
}
47+
}

src/main/java/redis/clients/jedis/MultiKeyPipelineBase.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,30 @@ public Response<List<String>> blpop(int timeout, byte[]... keys) {
6969
return getResponse(BuilderFactory.STRING_LIST);
7070
}
7171

72+
@Override
73+
public Response<KeyedTuple> bzpopmax(int timeout, String... keys) {
74+
client.bzpopmax(timeout, keys);
75+
return getResponse(BuilderFactory.KEYED_TUPLE);
76+
}
77+
78+
@Override
79+
public Response<KeyedTuple> bzpopmin(int timeout, String... keys) {
80+
client.bzpopmin(timeout, keys);
81+
return getResponse(BuilderFactory.KEYED_TUPLE);
82+
}
83+
84+
@Override
85+
public Response<KeyedTuple> bzpopmax(int timeout, byte[]... keys) {
86+
client.bzpopmax(timeout, keys);
87+
return getResponse(BuilderFactory.KEYED_TUPLE);
88+
}
89+
90+
@Override
91+
public Response<KeyedTuple> bzpopmin(int timeout, byte[]... keys) {
92+
client.bzpopmin(timeout, keys);
93+
return getResponse(BuilderFactory.KEYED_TUPLE);
94+
}
95+
7296
@Override
7397
public Response<Long> del(String... keys) {
7498
client.del(keys);

src/main/java/redis/clients/jedis/Protocol.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ public static enum Command implements ProtocolCommand {
261261
READONLY, GEOADD, GEODIST, GEOHASH, GEOPOS, GEORADIUS, GEORADIUS_RO, GEORADIUSBYMEMBER,
262262
GEORADIUSBYMEMBER_RO, MODULE, BITFIELD, HSTRLEN, TOUCH, SWAPDB, MEMORY, XADD, XLEN, XDEL,
263263
XTRIM, XRANGE, XREVRANGE, XREAD, XACK, XGROUP, XREADGROUP, XPENDING, XCLAIM, ACL, XINFO,
264-
BITFIELD_RO, LPOS, SMISMEMBER, ZMSCORE;
264+
BITFIELD_RO, LPOS, SMISMEMBER, ZMSCORE, BZPOPMIN, BZPOPMAX;
265265

266266
private final byte[] raw;
267267

0 commit comments

Comments
 (0)