Skip to content

Commit 7bd0dce

Browse files
authored
A blocking version of sendCommand for Redis modules (#2321)
1 parent 4db9203 commit 7bd0dce

File tree

8 files changed

+87
-8
lines changed

8 files changed

+87
-8
lines changed

src/main/java/redis/clients/jedis/BinaryJedis.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,11 @@
4242

4343
public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKeyBinaryCommands,
4444
AdvancedBinaryJedisCommands, BinaryScriptingCommands, Closeable {
45+
4546
protected Client client = null;
4647
protected Transaction transaction = null;
4748
protected Pipeline pipeline = null;
48-
private final byte[][] dummyArray = new byte[0][];
49+
protected static final byte[][] DUMMY_ARRAY = new byte[0][];
4950

5051
public BinaryJedis() {
5152
client = new Client();
@@ -4377,12 +4378,6 @@ public List<byte[]> xclaim(byte[] key, byte[] groupname, byte[] consumername,
43774378
return client.getBinaryMultiBulkReply();
43784379
}
43794380

4380-
public Object sendCommand(ProtocolCommand cmd, byte[]... args) {
4381-
checkIsInMultiOrPipeline();
4382-
client.sendCommand(cmd, args);
4383-
return client.getOne();
4384-
}
4385-
43864381
@Override
43874382
public StreamInfo xinfoStream(byte[] key) {
43884383
checkIsInMultiOrPipeline();
@@ -4407,7 +4402,24 @@ public List<StreamConsumersInfo> xinfoConsumers (byte[] key, byte[] group) {
44074402
return BuilderFactory.STREAM_CONSUMERS_INFO_LIST.build(client.getBinaryMultiBulkReply());
44084403
}
44094404

4405+
public Object sendCommand(ProtocolCommand cmd, byte[]... args) {
4406+
checkIsInMultiOrPipeline();
4407+
client.sendCommand(cmd, args);
4408+
return client.getOne();
4409+
}
4410+
4411+
public Object sendBlockingCommand(ProtocolCommand cmd, byte[]... args) {
4412+
checkIsInMultiOrPipeline();
4413+
client.sendCommand(cmd, args);
4414+
client.setTimeoutInfinite();
4415+
try {
4416+
return client.getOne();
4417+
} finally {
4418+
client.rollbackTimeout();
4419+
}
4420+
}
4421+
44104422
public Object sendCommand(ProtocolCommand cmd) {
4411-
return sendCommand(cmd, dummyArray);
4423+
return sendCommand(cmd, DUMMY_ARRAY);
44124424
}
44134425
}

src/main/java/redis/clients/jedis/BinaryJedisCluster.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2382,4 +2382,13 @@ public Object execute(Jedis connection){
23822382
}
23832383
}.runBinary(sampleKey);
23842384
}
2385+
2386+
public Object sendBlockingCommand(final byte[] sampleKey, final ProtocolCommand cmd, final byte[]... args) {
2387+
return new JedisClusterCommand<Object>(connectionHandler, maxAttempts) {
2388+
@Override
2389+
public Object execute(Jedis connection){
2390+
return connection.sendBlockingCommand(cmd, args);
2391+
}
2392+
}.runBinary(sampleKey);
2393+
}
23852394
}

src/main/java/redis/clients/jedis/BinaryShardedJedis.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1134,6 +1134,13 @@ public Object sendCommand(ProtocolCommand cmd, byte[]... args) {
11341134
return j.sendCommand(cmd, args);
11351135
}
11361136

1137+
public Object sendBlockingCommand(ProtocolCommand cmd, byte[]... args) {
1138+
// default since no sample key provided in JedisCommands interface
1139+
byte[] sampleKey = args.length > 0 ? args[0] : cmd.getRaw();
1140+
Jedis j = getShard(sampleKey);
1141+
return j.sendBlockingCommand(cmd, args);
1142+
}
1143+
11371144
public Object sendCommand(ProtocolCommand cmd) {
11381145
return sendCommand(cmd, dummyArray);
11391146
}

src/main/java/redis/clients/jedis/Jedis.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4104,4 +4104,15 @@ public Object sendCommand(ProtocolCommand cmd, String... args) {
41044104
client.sendCommand(cmd, args);
41054105
return client.getOne();
41064106
}
4107+
4108+
public Object sendBlockingCommand(ProtocolCommand cmd, String... args) {
4109+
checkIsInMultiOrPipeline();
4110+
client.sendCommand(cmd, args);
4111+
client.setTimeoutInfinite();
4112+
try {
4113+
return client.getOne();
4114+
} finally {
4115+
client.rollbackTimeout();
4116+
}
4117+
}
41074118
}

src/main/java/redis/clients/jedis/JedisCluster.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2459,5 +2459,13 @@ public Object execute(Jedis connection){
24592459
}.run(sampleKey);
24602460
}
24612461

2462+
public Object sendBlockingCommand(final String sampleKey, final ProtocolCommand cmd, final String... args) {
2463+
return new JedisClusterCommand<Object>(connectionHandler, maxAttempts) {
2464+
@Override
2465+
public Object execute(Jedis connection){
2466+
return connection.sendBlockingCommand(cmd, args);
2467+
}
2468+
}.run(sampleKey);
2469+
}
24622470

24632471
}

src/main/java/redis/clients/jedis/ShardedJedis.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1142,4 +1142,11 @@ public Object sendCommand(ProtocolCommand cmd, String... args) {
11421142
Jedis j = getShard(sampleKey);
11431143
return j.sendCommand(cmd, args);
11441144
}
1145+
1146+
public Object sendBlockingCommand(ProtocolCommand cmd, String... args) {
1147+
// default since no sample key provided in JedisCommands interface
1148+
String sampleKey = args.length > 0 ? args[0] : cmd.toString();
1149+
Jedis j = getShard(sampleKey);
1150+
return j.sendBlockingCommand(cmd, args);
1151+
}
11451152
}

src/test/java/redis/clients/jedis/tests/commands/AllKindOfValuesCommandsTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import static org.junit.Assert.assertNotNull;
88
import static org.junit.Assert.assertNull;
99
import static org.junit.Assert.fail;
10+
import static redis.clients.jedis.Protocol.Command.BLPOP;
1011
import static redis.clients.jedis.Protocol.Command.HGETALL;
1112
import static redis.clients.jedis.Protocol.Command.GET;
1213
import static redis.clients.jedis.Protocol.Command.LRANGE;
@@ -886,6 +887,15 @@ public void sendCommandTest(){
886887
assertEquals("PONG", SafeEncoder.encode((byte[]) jedis.sendCommand(PING)));
887888
}
888889

890+
@Test
891+
public void sendBlockingCommandTest(){
892+
assertNull(jedis.sendBlockingCommand(BLPOP, "foo", Long.toString(1L)));
893+
894+
jedis.sendCommand(RPUSH, "foo", "bar");
895+
assertEquals(Arrays.asList("foo", "bar"), SafeEncoder.encodeObject(jedis.sendBlockingCommand(BLPOP, "foo", Long.toString(1L))));
896+
897+
assertNull(jedis.sendBlockingCommand(BLPOP, "foo", Long.toString(1L)));
898+
}
889899

890900
@Test
891901
public void encodeCompleteResponse(){

src/test/java/redis/clients/jedis/tests/commands/BinaryValuesCommandsTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import static org.junit.Assert.assertEquals;
55
import static org.junit.Assert.assertNull;
66
import static org.junit.Assert.assertTrue;
7+
import static redis.clients.jedis.Protocol.Command.BLPOP;
78
import static redis.clients.jedis.Protocol.Command.GET;
89
import static redis.clients.jedis.Protocol.Command.LRANGE;
910
import static redis.clients.jedis.Protocol.Command.RPUSH;
@@ -17,6 +18,7 @@
1718

1819
import org.junit.Before;
1920
import org.junit.Test;
21+
import redis.clients.jedis.Protocol;
2022

2123
import redis.clients.jedis.Protocol.Keyword;
2224
import redis.clients.jedis.exceptions.JedisDataException;
@@ -323,4 +325,17 @@ public void sendCommandTest(){
323325
for (int i = 0; i < 3; i++)
324326
assertArrayEquals(expected.get(i), list.get(i));
325327
}
328+
329+
@Test
330+
public void sendBlockingCommandTest() {
331+
assertNull(jedis.sendBlockingCommand(BLPOP, bfoo, Protocol.toByteArray(1L)));
332+
333+
jedis.sendCommand(RPUSH, bfoo, bbar);
334+
List<byte[]> blpop = (List<byte[]>) jedis.sendBlockingCommand(BLPOP, bfoo, Protocol.toByteArray(1L));
335+
assertEquals(2, blpop.size());
336+
assertArrayEquals(bfoo, blpop.get(0));
337+
assertArrayEquals(bbar, blpop.get(1));
338+
339+
assertNull(jedis.sendBlockingCommand(BLPOP, bfoo, Protocol.toByteArray(1L)));
340+
}
326341
}

0 commit comments

Comments
 (0)