Skip to content

Commit 14c794a

Browse files
committed
XRead(Group) Params with allowing block=0
1 parent a07f707 commit 14c794a

File tree

15 files changed

+460
-87
lines changed

15 files changed

+460
-87
lines changed

src/main/java/redis/clients/jedis/BinaryClient.java

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import static redis.clients.jedis.Protocol.Keyword.WITHSCORES;
1515
import static redis.clients.jedis.Protocol.Keyword.FREQ;
1616
import static redis.clients.jedis.Protocol.Keyword.HELP;
17-
import static redis.clients.jedis.Protocol.Keyword.COUNT;
1817

1918
import java.util.ArrayList;
2019
import java.util.Collections;
@@ -27,13 +26,7 @@
2726
import javax.net.ssl.SSLSocketFactory;
2827

2928
import redis.clients.jedis.Protocol.Keyword;
30-
import redis.clients.jedis.params.ClientKillParams;
31-
import redis.clients.jedis.params.GeoRadiusParam;
32-
import redis.clients.jedis.params.MigrateParams;
33-
import redis.clients.jedis.params.SetParams;
34-
import redis.clients.jedis.params.ZAddParams;
35-
import redis.clients.jedis.params.ZIncrByParams;
36-
import redis.clients.jedis.params.LPosParams;
29+
import redis.clients.jedis.params.*;
3730
import redis.clients.jedis.util.SafeEncoder;
3831

3932
public class BinaryClient extends Connection {
@@ -1424,10 +1417,28 @@ public void xread(final int count, final long block, final Map<byte[], byte[]> s
14241417
params[streamsIndex++] = entry.getKey();
14251418
params[idsIndex++] = entry.getValue();
14261419
}
1427-
1420+
14281421
sendCommand(XREAD, params);
1429-
}
1430-
1422+
}
1423+
1424+
public void xread(final XReadParams params, final Entry<byte[], byte[]>... streams) {
1425+
final byte[][] bparams = params.getByteParams();
1426+
final int paramLength = bparams.length;
1427+
1428+
final byte[][] args = new byte[paramLength + 1 + streams.length * 2][];
1429+
System.arraycopy(bparams, 0, args, 0, paramLength);
1430+
1431+
args[paramLength] = Keyword.STREAMS.raw;
1432+
int keyIndex = paramLength + 1;
1433+
int idsIndex = keyIndex + streams.length;
1434+
for (final Entry<byte[], byte[]> entry : streams) {
1435+
args[keyIndex++] = entry.getKey();
1436+
args[idsIndex++] = entry.getValue();
1437+
}
1438+
1439+
sendCommand(XREAD, args);
1440+
}
1441+
14311442
public void xack(final byte[] key, final byte[] group, final byte[]... ids) {
14321443
final byte[][] params = new byte[2 + ids.length][];
14331444
int index = 0;
@@ -1519,7 +1530,29 @@ public void xreadGroup(byte[] groupname, byte[] consumer, int count, long block,
15191530
sendCommand(XREADGROUP, params);
15201531
}
15211532

1522-
1533+
public void xreadGroup(byte[] groupname, byte[] consumer, final XReadGroupParams params, final Entry<byte[], byte[]>... streams) {
1534+
final byte[][] bparams = params.getByteParams();
1535+
final int paramLength = bparams.length;
1536+
1537+
final byte[][] args = new byte[3 + paramLength + 1 + streams.length * 2][];
1538+
int index = 0;
1539+
args[index++] = Keyword.GROUP.raw;
1540+
args[index++] = groupname;
1541+
args[index++] = consumer;
1542+
System.arraycopy(bparams, 0, args, index, paramLength);
1543+
index += paramLength;
1544+
1545+
args[index++] = Keyword.STREAMS.raw;
1546+
int keyIndex = index;
1547+
int idsIndex = keyIndex + streams.length;
1548+
for (final Entry<byte[], byte[]> entry : streams) {
1549+
args[keyIndex++] = entry.getKey();
1550+
args[idsIndex++] = entry.getValue();
1551+
}
1552+
1553+
sendCommand(XREADGROUP, args);
1554+
}
1555+
15231556
public void xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername) {
15241557
if(consumername == null) {
15251558
sendCommand(XPENDING, key, groupname, start, end, toByteArray(count));

src/main/java/redis/clients/jedis/BinaryJedis.java

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.util.LinkedHashSet;
1515
import java.util.List;
1616
import java.util.Map;
17+
import java.util.Map.Entry;
1718
import java.util.Set;
1819

1920
import javax.net.ssl.HostnameVerifier;
@@ -29,13 +30,7 @@
2930
import redis.clients.jedis.exceptions.InvalidURIException;
3031
import redis.clients.jedis.exceptions.JedisDataException;
3132
import redis.clients.jedis.exceptions.JedisException;
32-
import redis.clients.jedis.params.ClientKillParams;
33-
import redis.clients.jedis.params.GeoRadiusParam;
34-
import redis.clients.jedis.params.MigrateParams;
35-
import redis.clients.jedis.params.SetParams;
36-
import redis.clients.jedis.params.ZAddParams;
37-
import redis.clients.jedis.params.ZIncrByParams;
38-
import redis.clients.jedis.params.LPosParams;
33+
import redis.clients.jedis.params.*;
3934
import redis.clients.jedis.util.JedisByteHashMap;
4035
import redis.clients.jedis.util.JedisURIHelper;
4136

@@ -4179,6 +4174,23 @@ public List<byte[]> xread(int count, long block, Map<byte[], byte[]> streams) {
41794174
}
41804175
}
41814176

4177+
@Override
4178+
public List<byte[]> xread(XReadParams xReadParams, Entry<byte[], byte[]>... streams) {
4179+
checkIsInMultiOrPipeline();
4180+
client.xread(xReadParams, streams);
4181+
4182+
if (!xReadParams.hasBlock()) {
4183+
return client.getBinaryMultiBulkReply();
4184+
}
4185+
4186+
client.setTimeoutInfinite();
4187+
try {
4188+
return client.getBinaryMultiBulkReply();
4189+
} finally {
4190+
client.rollbackTimeout();
4191+
}
4192+
}
4193+
41824194
@Override
41834195
public List<byte[]> xreadGroup(byte[] groupname, byte[] consumer, int count, long block, boolean noAck,
41844196
Map<byte[], byte[]> streams) {
@@ -4192,6 +4204,24 @@ public List<byte[]> xreadGroup(byte[] groupname, byte[] consumer, int count, lon
41924204
}
41934205
}
41944206

4207+
@Override
4208+
public List<byte[]> xreadGroup(byte[] groupname, byte[] consumer, XReadGroupParams xReadGroupParams,
4209+
Entry<byte[], byte[]>... streams) {
4210+
checkIsInMultiOrPipeline();
4211+
client.xreadGroup(groupname, consumer, xReadGroupParams, streams);
4212+
4213+
if (!xReadGroupParams.hasBlock()) {
4214+
return client.getBinaryMultiBulkReply();
4215+
}
4216+
4217+
client.setTimeoutInfinite();
4218+
try {
4219+
return client.getBinaryMultiBulkReply();
4220+
} finally {
4221+
client.rollbackTimeout();
4222+
}
4223+
}
4224+
41954225
@Override
41964226
public byte[] xadd(byte[] key, byte[] id, Map<byte[], byte[]> hash, long maxLen, boolean approximateLength) {
41974227
checkIsInMultiOrPipeline();

src/main/java/redis/clients/jedis/BinaryJedisCluster.java

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,15 @@
44
import redis.clients.jedis.commands.JedisClusterBinaryScriptingCommands;
55
import redis.clients.jedis.commands.MultiKeyBinaryJedisClusterCommands;
66
import redis.clients.jedis.commands.ProtocolCommand;
7-
import redis.clients.jedis.params.GeoRadiusParam;
8-
import redis.clients.jedis.params.SetParams;
9-
import redis.clients.jedis.params.ZAddParams;
10-
import redis.clients.jedis.params.ZIncrByParams;
11-
import redis.clients.jedis.params.LPosParams;
7+
import redis.clients.jedis.params.*;
128
import redis.clients.jedis.util.JedisClusterHashTagUtil;
139
import redis.clients.jedis.util.KeyMergeUtil;
1410
import redis.clients.jedis.util.SafeEncoder;
1511

1612
import java.io.Closeable;
1713
import java.util.List;
1814
import java.util.Map;
15+
import java.util.Map.Entry;
1916
import java.util.Set;
2017
import javax.net.ssl.HostnameVerifier;
2118
import javax.net.ssl.SSLParameters;
@@ -2198,6 +2195,16 @@ public List<byte[]> execute(Jedis connection) {
21982195
}.runBinary(keys.length, keys);
21992196
}
22002197

2198+
@Override
2199+
public List<byte[]> xread(final XReadParams xReadParams, final Entry<byte[], byte[]>... streams) {
2200+
return new JedisClusterCommand<List<byte[]>>(connectionHandler, maxAttempts) {
2201+
@Override
2202+
public List<byte[]> execute(Jedis connection) {
2203+
return connection.xread(xReadParams, streams);
2204+
}
2205+
}.runBinary(streams.length, getKeys(streams));
2206+
}
2207+
22012208
@Override
22022209
public Long xack(final byte[] key, final byte[] group, final byte[]... ids) {
22032210
return new JedisClusterCommand<Long>(connectionHandler, maxAttempts) {
@@ -2262,6 +2269,17 @@ public List<byte[]> execute(Jedis connection) {
22622269
}.runBinary(keys.length, keys);
22632270
}
22642271

2272+
@Override
2273+
public List<byte[]> xreadGroup(final byte[] groupname, final byte[] consumer, final XReadGroupParams xReadGroupParams,
2274+
final Entry<byte[], byte[]>... streams) {
2275+
return new JedisClusterCommand<List<byte[]>>(connectionHandler, maxAttempts) {
2276+
@Override
2277+
public List<byte[]> execute(Jedis connection) {
2278+
return connection.xreadGroup(groupname, consumer, xReadGroupParams, streams);
2279+
}
2280+
}.runBinary(streams.length, getKeys(streams));
2281+
}
2282+
22652283
@Override
22662284
public Long xdel(final byte[] key, final byte[]... ids) {
22672285
return new JedisClusterCommand<Long>(connectionHandler, maxAttempts) {
@@ -2322,4 +2340,12 @@ public Object execute(Jedis connection){
23222340
}
23232341
}.runBinary(sampleKey);
23242342
}
2343+
2344+
private static byte[][] getKeys(final Entry<byte[], ?>... entries) {
2345+
byte[][] keys = new byte[entries.length][];
2346+
for (int i = 0; i < entries.length; i++) {
2347+
keys[i] = entries[i].getKey();
2348+
}
2349+
return keys;
2350+
}
23252351
}

src/main/java/redis/clients/jedis/BuilderFactory.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package redis.clients.jedis;
22

3+
import java.util.AbstractMap;
34
import java.util.ArrayList;
45
import java.util.HashMap;
56
import java.util.HashSet;
@@ -555,7 +556,32 @@ public String toString() {
555556
return "StreamEntryID";
556557
}
557558
};
558-
559+
560+
public static final Builder<List<Map.Entry<String, List<StreamEntry>>>> STREAM_READ_RESPONSE
561+
= new Builder<List<Map.Entry<String, List<StreamEntry>>>>() {
562+
@Override
563+
public List<Map.Entry<String, List<StreamEntry>>> build(Object data) {
564+
if (data == null) {
565+
return null;
566+
}
567+
List<Object> streams = (List<Object>) data;
568+
569+
List<Map.Entry<String, List<StreamEntry>>> result = new ArrayList<>(streams.size());
570+
for (Object streamObj : streams) {
571+
List<Object> stream = (List<Object>) streamObj;
572+
String streamId = SafeEncoder.encode((byte[]) stream.get(0));
573+
List<StreamEntry> streamEntries = BuilderFactory.STREAM_ENTRY_LIST.build(stream.get(1));
574+
result.add(new AbstractMap.SimpleEntry<>(streamId, streamEntries));
575+
}
576+
577+
return result;
578+
}
579+
580+
@Override
581+
public String toString() {
582+
return "List<Entry<String, List<StreamEntry>>>";
583+
}
584+
};
559585

560586
public static final Builder<List<StreamEntry>> STREAM_ENTRY_LIST = new Builder<List<StreamEntry>>() {
561587
@Override

src/main/java/redis/clients/jedis/Client.java

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,7 @@
1414
import javax.net.ssl.SSLSocketFactory;
1515

1616
import redis.clients.jedis.commands.Commands;
17-
import redis.clients.jedis.params.GeoRadiusParam;
18-
import redis.clients.jedis.params.MigrateParams;
19-
import redis.clients.jedis.params.SetParams;
20-
import redis.clients.jedis.params.ZAddParams;
21-
import redis.clients.jedis.params.ZIncrByParams;
22-
import redis.clients.jedis.params.LPosParams;
17+
import redis.clients.jedis.params.*;
2318
import redis.clients.jedis.util.SafeEncoder;
2419

2520
public class Client extends BinaryClient implements Commands {
@@ -1267,7 +1262,26 @@ public void xread(final int count, final long block, final Entry<String, StreamE
12671262
}
12681263
xread(count, block, bhash);
12691264
}
1270-
1265+
1266+
@Override
1267+
public void xread(final XReadParams params, final Map<String, StreamEntryID> streams) {
1268+
final byte[][] bparams = params.getByteParams();
1269+
final int paramLength = bparams.length;
1270+
1271+
final byte[][] args = new byte[paramLength + 1 + streams.size() * 2][];
1272+
System.arraycopy(bparams, 0, args, 0, paramLength);
1273+
1274+
args[paramLength] = Protocol.Keyword.STREAMS.raw;
1275+
int keyIndex = paramLength + 1;
1276+
int idsIndex = keyIndex + streams.size();
1277+
for (Entry<String, StreamEntryID> entry : streams.entrySet()) {
1278+
args[keyIndex++] = SafeEncoder.encode(entry.getKey());
1279+
args[idsIndex++] = SafeEncoder.encode(entry.getValue().toString());
1280+
}
1281+
1282+
sendCommand(Protocol.Command.XREAD, args);
1283+
}
1284+
12711285
@Override
12721286
public void xack(final String key, final String group, final StreamEntryID... ids) {
12731287
final byte[][] bids = new byte[ids.length][];
@@ -1322,6 +1336,30 @@ public void xreadGroup(String groupname, String consumer, int count, long block,
13221336
xreadGroup(SafeEncoder.encode(groupname), SafeEncoder.encode(consumer), count, block, noAck, bhash);
13231337
}
13241338

1339+
@Override
1340+
public void xreadGroup(String groupname, String consumer, XReadGroupParams params, Map<String, StreamEntryID> streams) {
1341+
final byte[][] bparams = params.getByteParams();
1342+
final int paramLength = bparams.length;
1343+
1344+
final byte[][] args = new byte[3 + paramLength + 1 + streams.size() * 2][];
1345+
int index = 0;
1346+
args[index++] = Protocol.Keyword.GROUP.raw;
1347+
args[index++] = SafeEncoder.encode(groupname);
1348+
args[index++] = SafeEncoder.encode(consumer);
1349+
System.arraycopy(bparams, 0, args, index, paramLength);
1350+
index += paramLength;
1351+
1352+
args[index++] = Protocol.Keyword.STREAMS.raw;
1353+
int keyIndex = index;
1354+
int idsIndex = keyIndex + streams.size();
1355+
for (Entry<String, StreamEntryID> entry : streams.entrySet()) {
1356+
args[keyIndex++] = SafeEncoder.encode(entry.getKey());
1357+
args[idsIndex++] = SafeEncoder.encode(entry.getValue().toString());
1358+
}
1359+
1360+
sendCommand(Protocol.Command.XREADGROUP, args);
1361+
}
1362+
13251363
@Override
13261364
public void xpending(String key, String groupname, StreamEntryID start, StreamEntryID end, int count, String consumername) {
13271365
xpending(SafeEncoder.encode(key), SafeEncoder.encode(groupname), SafeEncoder.encode(start==null ? "-" : start.toString()),
@@ -1359,5 +1397,5 @@ public void xinfoConsumers(String key, String group) {
13591397
xinfoConsumers(SafeEncoder.encode(key),SafeEncoder.encode(group));
13601398

13611399
}
1362-
1400+
13631401
}

0 commit comments

Comments
 (0)