Skip to content

Commit f91923f

Browse files
committed
Merge remote-tracking branch 'redis/master' into xread-block-2
2 parents 67a17bb + 182cb96 commit f91923f

22 files changed

+426
-43
lines changed

.github/release-drafter-config.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
name-template: 'Version $NEXT_PATCH_VERSION🌈'
2-
tag-template: 'v$NEXT_PATCH_VERSION'
1+
name-template: '$NEXT_PATCH_VERSION🌈'
2+
tag-template: 'jedis-$NEXT_PATCH_VERSION'
33
categories:
44
- title: '🚀Features'
55
labels:

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ Or use it as a maven dependency:
5858
<dependency>
5959
<groupId>redis.clients</groupId>
6060
<artifactId>jedis</artifactId>
61-
<version>3.5.1</version>
61+
<version>3.5.2</version>
6262
<type>jar</type>
6363
<scope>compile</scope>
6464
</dependency>

src/main/java/redis/clients/jedis/BinaryClient.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1749,7 +1749,7 @@ public void xpendingSummary(final byte[] key, final byte[] groupname) {
17491749
public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime,
17501750
long newIdleTime, int retries, boolean force, byte[][] ids) {
17511751

1752-
ArrayList<byte[]> arguments = new ArrayList<>(10 + ids.length);
1752+
List<byte[]> arguments = new ArrayList<>(10 + ids.length);
17531753

17541754
arguments.add(key);
17551755
arguments.add(groupname);
@@ -1772,6 +1772,37 @@ public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minId
17721772
sendCommand(XCLAIM, arguments.toArray(new byte[arguments.size()][]));
17731773
}
17741774

1775+
private void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime,
1776+
XClaimParams params, byte[][] ids, boolean justId) {
1777+
final byte[][] bparams = params.getByteParams();
1778+
final int paramLength = bparams.length;
1779+
final int idsLength = ids.length;
1780+
final byte[][] args = new byte[4 + paramLength + idsLength + (justId ? 1 : 0)][];
1781+
int index = 0;
1782+
args[index++] = key;
1783+
args[index++] = groupname;
1784+
args[index++] = consumername;
1785+
args[index++] = toByteArray(minIdleTime);
1786+
System.arraycopy(ids, 0, args, index, idsLength);
1787+
index += idsLength;
1788+
System.arraycopy(bparams, 0, args, index, paramLength);
1789+
index += paramLength;
1790+
if (justId) {
1791+
args[index++] = Keyword.JUSTID.getRaw();
1792+
}
1793+
sendCommand(XCLAIM, args);
1794+
}
1795+
1796+
public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime,
1797+
XClaimParams params, byte[]... ids) {
1798+
xclaim(key, groupname, consumername, minIdleTime, params, ids, false);
1799+
}
1800+
1801+
public void xclaimJustId(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime,
1802+
XClaimParams params, byte[]... ids) {
1803+
xclaim(key, groupname, consumername, minIdleTime, params, ids, true);
1804+
}
1805+
17751806
public void xinfoStream(byte[] key) {
17761807
sendCommand(XINFO, Keyword.STREAM.getRaw(), key);
17771808
}

src/main/java/redis/clients/jedis/BinaryJedis.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4583,6 +4583,22 @@ public List<byte[]> xclaim(byte[] key, byte[] groupname, byte[] consumername, lo
45834583
return client.getBinaryMultiBulkReply();
45844584
}
45854585

4586+
@Override
4587+
public List<byte[]> xclaim(byte[] key, byte[] group, byte[] consumername, long minIdleTime,
4588+
XClaimParams params, byte[]... ids) {
4589+
checkIsInMultiOrPipeline();
4590+
client.xclaim(key, group, consumername, minIdleTime, params, ids);
4591+
return client.getBinaryMultiBulkReply();
4592+
}
4593+
4594+
@Override
4595+
public List<byte[]> xclaimJustId(byte[] key, byte[] group, byte[] consumername, long minIdleTime,
4596+
XClaimParams params, byte[]... ids) {
4597+
checkIsInMultiOrPipeline();
4598+
client.xclaimJustId(key, group, consumername, minIdleTime, params, ids);
4599+
return client.getBinaryMultiBulkReply();
4600+
}
4601+
45864602
@Override
45874603
public StreamInfo xinfoStream(byte[] key) {
45884604
checkIsInMultiOrPipeline();

src/main/java/redis/clients/jedis/BinaryJedisCluster.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2530,6 +2530,28 @@ public List<byte[]> execute(Jedis connection) {
25302530
}.runBinary(key);
25312531
}
25322532

2533+
@Override
2534+
public List<byte[]> xclaim(byte[] key, byte[] group, byte[] consumername, long minIdleTime,
2535+
XClaimParams params, byte[]... ids) {
2536+
return new JedisClusterCommand<List<byte[]>>(connectionHandler, maxAttempts) {
2537+
@Override
2538+
public List<byte[]> execute(Jedis connection) {
2539+
return connection.xclaim(key, group, consumername, minIdleTime, params, ids);
2540+
}
2541+
}.runBinary(key);
2542+
}
2543+
2544+
@Override
2545+
public List<byte[]> xclaimJustId(byte[] key, byte[] group, byte[] consumername, long minIdleTime,
2546+
XClaimParams params, byte[]... ids) {
2547+
return new JedisClusterCommand<List<byte[]>>(connectionHandler, maxAttempts) {
2548+
@Override
2549+
public List<byte[]> execute(Jedis connection) {
2550+
return connection.xclaimJustId(key, group, consumername, minIdleTime, params, ids);
2551+
}
2552+
}.runBinary(key);
2553+
}
2554+
25332555
@Override
25342556
public Long waitReplicas(final byte[] key, final int replicas, final long timeout) {
25352557
return new JedisClusterCommand<Long>(connectionHandler, maxAttempts) {

src/main/java/redis/clients/jedis/BinaryShardedJedis.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import redis.clients.jedis.params.GeoRadiusParam;
1515
import redis.clients.jedis.params.GetExParams;
1616
import redis.clients.jedis.params.SetParams;
17+
import redis.clients.jedis.params.XClaimParams;
1718
import redis.clients.jedis.params.ZAddParams;
1819
import redis.clients.jedis.params.ZIncrByParams;
1920
import redis.clients.jedis.params.LPosParams;
@@ -1176,6 +1177,20 @@ public List<byte[]> xclaim(byte[] key, byte[] groupname, byte[] consumername, lo
11761177
return j.xclaim(key, groupname, consumername, minIdleTime, newIdleTime, retries, force, ids);
11771178
}
11781179

1180+
@Override
1181+
public List<byte[]> xclaim(byte[] key, byte[] group, byte[] consumername, long minIdleTime,
1182+
XClaimParams params, byte[]... ids) {
1183+
Jedis j = getShard(key);
1184+
return j.xclaim(key, group, consumername, minIdleTime, params, ids);
1185+
}
1186+
1187+
@Override
1188+
public List<byte[]> xclaimJustId(byte[] key, byte[] group, byte[] consumername, long minIdleTime,
1189+
XClaimParams params, byte[]... ids) {
1190+
Jedis j = getShard(key);
1191+
return j.xclaimJustId(key, group, consumername, minIdleTime, params, ids);
1192+
}
1193+
11791194
@Override
11801195
public StreamInfo xinfoStream(byte[] key) {
11811196
Jedis j = getShard(key);

src/main/java/redis/clients/jedis/BuilderFactory.java

Lines changed: 49 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -688,29 +688,52 @@ public String toString() {
688688
}
689689
};
690690

691-
public static final Builder<List<Map.Entry<String, List<StreamEntry>>>> STREAM_READ_RESPONSE
692-
= new Builder<List<Map.Entry<String, List<StreamEntry>>>>() {
691+
public static final Builder<List<StreamEntryID>> STREAM_ENTRY_ID_LIST = new Builder<List<StreamEntryID>>() {
693692
@Override
694-
public List<Map.Entry<String, List<StreamEntry>>> build(Object data) {
695-
if (data == null) {
693+
@SuppressWarnings("unchecked")
694+
public List<StreamEntryID> build(Object data) {
695+
if (null == data) {
696696
return null;
697697
}
698-
List<Object> streams = (List<Object>) data;
698+
List<Object> objectList = (List<Object>) data;
699+
List<StreamEntryID> responses = new ArrayList<>(objectList.size());
700+
if (!objectList.isEmpty()) {
701+
for(Object object : objectList) {
702+
responses.add(STREAM_ENTRY_ID.build(object));
703+
}
704+
}
705+
return responses;
706+
}
707+
};
699708

700-
List<Map.Entry<String, List<StreamEntry>>> result = new ArrayList<>(streams.size());
701-
for (Object streamObj : streams) {
702-
List<Object> stream = (List<Object>) streamObj;
703-
String streamId = SafeEncoder.encode((byte[]) stream.get(0));
704-
List<StreamEntry> streamEntries = BuilderFactory.STREAM_ENTRY_LIST.build(stream.get(1));
705-
result.add(new AbstractMap.SimpleEntry<>(streamId, streamEntries));
709+
public static final Builder<StreamEntry> STREAM_ENTRY = new Builder<StreamEntry>() {
710+
@Override
711+
@SuppressWarnings("unchecked")
712+
public StreamEntry build(Object data) {
713+
if (null == data) {
714+
return null;
706715
}
716+
List<Object> objectList = (List<Object>) data;
707717

708-
return result;
718+
if (objectList.isEmpty()) {
719+
return null;
720+
}
721+
722+
String entryIdString = SafeEncoder.encode((byte[]) objectList.get(0));
723+
StreamEntryID entryID = new StreamEntryID(entryIdString);
724+
List<byte[]> hash = (List<byte[]>) objectList.get(1);
725+
726+
Iterator<byte[]> hashIterator = hash.iterator();
727+
Map<String, String> map = new HashMap<>(hash.size() / 2);
728+
while (hashIterator.hasNext()) {
729+
map.put(SafeEncoder.encode(hashIterator.next()), SafeEncoder.encode(hashIterator.next()));
730+
}
731+
return new StreamEntry(entryID, map);
709732
}
710733

711734
@Override
712735
public String toString() {
713-
return "List<Entry<String, List<StreamEntry>>>";
736+
return "StreamEntry";
714737
}
715738
};
716739

@@ -754,34 +777,29 @@ public String toString() {
754777
}
755778
};
756779

757-
public static final Builder<StreamEntry> STREAM_ENTRY = new Builder<StreamEntry>() {
780+
public static final Builder<List<Map.Entry<String, List<StreamEntry>>>> STREAM_READ_RESPONSE
781+
= new Builder<List<Map.Entry<String, List<StreamEntry>>>>() {
758782
@Override
759-
@SuppressWarnings("unchecked")
760-
public StreamEntry build(Object data) {
761-
if (null == data) {
783+
public List<Map.Entry<String, List<StreamEntry>>> build(Object data) {
784+
if (data == null) {
762785
return null;
763786
}
764-
List<Object> objectList = (List<Object>) data;
787+
List<Object> streams = (List<Object>) data;
765788

766-
if (objectList.isEmpty()) {
767-
return null;
789+
List<Map.Entry<String, List<StreamEntry>>> result = new ArrayList<>(streams.size());
790+
for (Object streamObj : streams) {
791+
List<Object> stream = (List<Object>) streamObj;
792+
String streamId = SafeEncoder.encode((byte[]) stream.get(0));
793+
List<StreamEntry> streamEntries = BuilderFactory.STREAM_ENTRY_LIST.build(stream.get(1));
794+
result.add(new AbstractMap.SimpleEntry<>(streamId, streamEntries));
768795
}
769796

770-
String entryIdString = SafeEncoder.encode((byte[]) objectList.get(0));
771-
StreamEntryID entryID = new StreamEntryID(entryIdString);
772-
List<byte[]> hash = (List<byte[]>) objectList.get(1);
773-
774-
Iterator<byte[]> hashIterator = hash.iterator();
775-
Map<String, String> map = new HashMap<>(hash.size() / 2);
776-
while (hashIterator.hasNext()) {
777-
map.put(SafeEncoder.encode(hashIterator.next()), SafeEncoder.encode(hashIterator.next()));
778-
}
779-
return new StreamEntry(entryID, map);
797+
return result;
780798
}
781799

782800
@Override
783801
public String toString() {
784-
return "StreamEntry";
802+
return "List<Entry<String, List<StreamEntry>>>";
785803
}
786804
};
787805

src/main/java/redis/clients/jedis/Client.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1476,14 +1476,26 @@ public void xpendingSummary(String key, String groupname) {
14761476
@Override
14771477
public void xclaim(String key, String group, String consumername, long minIdleTime,
14781478
long newIdleTime, int retries, boolean force, StreamEntryID... ids) {
1479-
1480-
final byte[][] bids = new byte[ids.length][];
1481-
for (int i = 0; i < ids.length; i++) {
1482-
bids[i] = SafeEncoder.encode(ids[i].toString());
1483-
}
1479+
final byte[][] bids = convertStreamEntryIDsToBinary(ids);
14841480
xclaim(SafeEncoder.encode(key), SafeEncoder.encode(group), SafeEncoder.encode(consumername), minIdleTime, newIdleTime, retries, force, bids);
14851481
}
14861482

1483+
@Override
1484+
public void xclaim(String key, String group, String consumername, long minIdleTime,
1485+
XClaimParams params, StreamEntryID... ids) {
1486+
final byte[][] bids = convertStreamEntryIDsToBinary(ids);
1487+
xclaim(SafeEncoder.encode(key), SafeEncoder.encode(group), SafeEncoder.encode(consumername),
1488+
minIdleTime, params, bids);
1489+
}
1490+
1491+
@Override
1492+
public void xclaimJustId(String key, String group, String consumername, long minIdleTime,
1493+
XClaimParams params, StreamEntryID... ids) {
1494+
final byte[][] bids = convertStreamEntryIDsToBinary(ids);
1495+
xclaimJustId(SafeEncoder.encode(key), SafeEncoder.encode(group), SafeEncoder.encode(consumername),
1496+
minIdleTime, params, bids);
1497+
}
1498+
14871499
@Override
14881500
public void xinfoStream(String key) {
14891501
xinfoStream(SafeEncoder.encode(key));
@@ -1499,4 +1511,11 @@ public void xinfoConsumers(String key, String group) {
14991511
xinfoConsumers(SafeEncoder.encode(key), SafeEncoder.encode(group));
15001512
}
15011513

1514+
private byte[][] convertStreamEntryIDsToBinary(StreamEntryID... ids) {
1515+
final byte[][] bids = new byte[ids.length][];
1516+
for (int i = 0; i < ids.length; i++) {
1517+
bids[i] = SafeEncoder.encode(ids[i].toString());
1518+
}
1519+
return bids;
1520+
}
15021521
}

src/main/java/redis/clients/jedis/Jedis.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4197,6 +4197,24 @@ public List<StreamEntry> xclaim(String key, String group, String consumername, l
41974197
return BuilderFactory.STREAM_ENTRY_LIST.build(client.getObjectMultiBulkReply());
41984198
}
41994199

4200+
@Override
4201+
public List<StreamEntry> xclaim(String key, String group, String consumername, long minIdleTime,
4202+
XClaimParams params, StreamEntryID... ids) {
4203+
checkIsInMultiOrPipeline();
4204+
client.xclaim(key, group, consumername, minIdleTime, params, ids);
4205+
4206+
return BuilderFactory.STREAM_ENTRY_LIST.build(client.getObjectMultiBulkReply());
4207+
}
4208+
4209+
@Override
4210+
public List<StreamEntryID> xclaimJustId(String key, String group, String consumername,
4211+
long minIdleTime, XClaimParams params, StreamEntryID... ids) {
4212+
checkIsInMultiOrPipeline();
4213+
client.xclaimJustId(key, group, consumername, minIdleTime, params, ids);
4214+
4215+
return BuilderFactory.STREAM_ENTRY_ID_LIST.build(client.getObjectMultiBulkReply());
4216+
}
4217+
42004218
@Override
42014219
public StreamInfo xinfoStream(String key) {
42024220
client.xinfoStream(key);

src/main/java/redis/clients/jedis/JedisCluster.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2622,6 +2622,28 @@ public List<StreamEntry> execute(Jedis connection) {
26222622
}.run(key);
26232623
}
26242624

2625+
@Override
2626+
public List<StreamEntry> xclaim(String key, String group, String consumername, long minIdleTime,
2627+
XClaimParams params, StreamEntryID... ids) {
2628+
return new JedisClusterCommand<List<StreamEntry>>(connectionHandler, maxAttempts) {
2629+
@Override
2630+
public List<StreamEntry> execute(Jedis connection) {
2631+
return connection.xclaim(key, group, consumername, minIdleTime, params, ids);
2632+
}
2633+
}.run(key);
2634+
}
2635+
2636+
@Override
2637+
public List<StreamEntryID> xclaimJustId(String key, String group, String consumername,
2638+
long minIdleTime, XClaimParams params, StreamEntryID... ids) {
2639+
return new JedisClusterCommand<List<StreamEntryID>>(connectionHandler, maxAttempts) {
2640+
@Override
2641+
public List<StreamEntryID> execute(Jedis connection) {
2642+
return connection.xclaimJustId(key, group, consumername, minIdleTime, params, ids);
2643+
}
2644+
}.run(key);
2645+
}
2646+
26252647
public Long waitReplicas(final String key, final int replicas, final long timeout) {
26262648
return new JedisClusterCommand<Long>(connectionHandler, maxAttempts) {
26272649
@Override

0 commit comments

Comments
 (0)