Skip to content

Commit 04e94e6

Browse files
committed
Merge pull request #687 from HeartSaVioR/cluster-support-multi-key-new
Supports Multi Key commands to JedisCluster (revised of #673)
2 parents 18308d1 + d584951 commit 04e94e6

File tree

8 files changed

+957
-101
lines changed

8 files changed

+957
-101
lines changed

src/main/java/redis/clients/jedis/BinaryJedisCluster.java

Lines changed: 328 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package redis.clients.jedis;
22

3+
import redis.clients.util.KeyMergeUtil;
34
import redis.clients.util.SafeEncoder;
45

56
import java.io.Closeable;
@@ -11,7 +12,7 @@
1112

1213
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
1314

14-
public class BinaryJedisCluster implements BinaryJedisCommands,
15+
public class BinaryJedisCluster implements BinaryJedisCommands, MultiKeyBinaryJedisClusterCommands,
1516
JedisClusterBinaryScriptingCommands, Closeable {
1617

1718
public static final short HASHSLOTS = 16384;
@@ -52,6 +53,10 @@ public void close() throws IOException {
5253
}
5354
}
5455

56+
public Map<String, JedisPool> getClusterNodes() {
57+
return connectionHandler.getNodes();
58+
}
59+
5560
@Override
5661
public String set(final byte[] key, final byte[] value) {
5762
return new JedisClusterCommand<String>(connectionHandler, maxRedirections) {
@@ -1194,10 +1199,6 @@ public Long execute(Jedis connection) {
11941199
}.runBinary(key);
11951200
}
11961201

1197-
public Map<String, JedisPool> getClusterNodes() {
1198-
return connectionHandler.getNodes();
1199-
}
1200-
12011202
@Override
12021203
public Object eval(final byte[] script, final byte[] keyCount, final byte[]... params) {
12031204
return new JedisClusterCommand<Object>(connectionHandler, maxRedirections) {
@@ -1308,4 +1309,326 @@ public String execute(Jedis connection) {
13081309
}.runBinary(key);
13091310
}
13101311

1312+
@Override
1313+
public Long del(final byte[]... keys) {
1314+
return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
1315+
@Override
1316+
public Long execute(Jedis connection) {
1317+
return connection.del(keys);
1318+
}
1319+
}.runBinary(keys.length, keys);
1320+
}
1321+
1322+
@Override
1323+
public List<byte[]> blpop(final int timeout, final byte[]... keys) {
1324+
return new JedisClusterCommand<List<byte[]>>(connectionHandler, maxRedirections) {
1325+
@Override
1326+
public List<byte[]> execute(Jedis connection) {
1327+
return connection.blpop(timeout, keys);
1328+
}
1329+
}.runBinary(keys.length, keys);
1330+
}
1331+
1332+
@Override
1333+
public List<byte[]> brpop(final int timeout, final byte[]... keys) {
1334+
return new JedisClusterCommand<List<byte[]>>(connectionHandler, maxRedirections) {
1335+
@Override
1336+
public List<byte[]> execute(Jedis connection) {
1337+
return connection.brpop(timeout, keys);
1338+
}
1339+
}.runBinary(keys.length, keys);
1340+
}
1341+
1342+
@Override
1343+
public List<byte[]> mget(final byte[]... keys) {
1344+
return new JedisClusterCommand<List<byte[]>>(connectionHandler, maxRedirections) {
1345+
@Override
1346+
public List<byte[]> execute(Jedis connection) {
1347+
return connection.mget(keys);
1348+
}
1349+
}.runBinary(keys.length - 1, keys);
1350+
}
1351+
1352+
@Override
1353+
public String mset(final byte[]... keysvalues) {
1354+
byte[][] keys = new byte[keysvalues.length / 2][];
1355+
1356+
for (int keyIdx = 0; keyIdx < keys.length; keyIdx++) {
1357+
keys[keyIdx] = keysvalues[keyIdx * 2];
1358+
}
1359+
1360+
return new JedisClusterCommand<String>(connectionHandler, maxRedirections) {
1361+
@Override
1362+
public String execute(Jedis connection) {
1363+
return connection.mset(keysvalues);
1364+
}
1365+
}.runBinary(keys.length, keys);
1366+
}
1367+
1368+
@Override
1369+
public Long msetnx(final byte[]... keysvalues) {
1370+
byte[][] keys = new byte[keysvalues.length / 2][];
1371+
1372+
for (int keyIdx = 0; keyIdx < keys.length; keyIdx++) {
1373+
keys[keyIdx] = keysvalues[keyIdx * 2];
1374+
}
1375+
1376+
return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
1377+
@Override
1378+
public Long execute(Jedis connection) {
1379+
return connection.msetnx(keysvalues);
1380+
}
1381+
}.runBinary(keys.length, keys);
1382+
}
1383+
1384+
@Override
1385+
public String rename(final byte[] oldkey, final byte[] newkey) {
1386+
return new JedisClusterCommand<String>(connectionHandler, maxRedirections) {
1387+
@Override
1388+
public String execute(Jedis connection) {
1389+
return connection.rename(oldkey, newkey);
1390+
}
1391+
}.runBinary(2, oldkey, newkey);
1392+
}
1393+
1394+
@Override
1395+
public Long renamenx(final byte[] oldkey, final byte[] newkey) {
1396+
return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
1397+
@Override
1398+
public Long execute(Jedis connection) {
1399+
return connection.renamenx(oldkey, newkey);
1400+
}
1401+
}.runBinary(2, oldkey, newkey);
1402+
}
1403+
1404+
@Override
1405+
public byte[] rpoplpush(final byte[] srckey, final byte[] dstkey) {
1406+
return new JedisClusterCommand<byte[]>(connectionHandler, maxRedirections) {
1407+
@Override
1408+
public byte[] execute(Jedis connection) {
1409+
return connection.rpoplpush(srckey, dstkey);
1410+
}
1411+
}.runBinary(2, srckey, dstkey);
1412+
}
1413+
1414+
@Override
1415+
public Set<byte[]> sdiff(final byte[]... keys) {
1416+
return new JedisClusterCommand<Set<byte[]>>(connectionHandler, maxRedirections) {
1417+
@Override
1418+
public Set<byte[]> execute(Jedis connection) {
1419+
return connection.sdiff(keys);
1420+
}
1421+
}.runBinary(keys.length, keys);
1422+
}
1423+
1424+
@Override
1425+
public Long sdiffstore(final byte[] dstkey, final byte[]... keys) {
1426+
byte[][] wholeKeys = KeyMergeUtil.merge(dstkey, keys);
1427+
1428+
return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
1429+
@Override
1430+
public Long execute(Jedis connection) {
1431+
return connection.sdiffstore(dstkey, keys);
1432+
}
1433+
}.runBinary(wholeKeys.length, wholeKeys);
1434+
}
1435+
1436+
@Override
1437+
public Set<byte[]> sinter(final byte[]... keys) {
1438+
return new JedisClusterCommand<Set<byte[]>>(connectionHandler, maxRedirections) {
1439+
@Override
1440+
public Set<byte[]> execute(Jedis connection) {
1441+
return connection.sinter(keys);
1442+
}
1443+
}.runBinary(keys.length, keys);
1444+
}
1445+
1446+
@Override
1447+
public Long sinterstore(final byte[] dstkey, final byte[]... keys) {
1448+
byte[][] wholeKeys = KeyMergeUtil.merge(dstkey, keys);
1449+
1450+
return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
1451+
@Override
1452+
public Long execute(Jedis connection) {
1453+
return connection.sinterstore(dstkey, keys);
1454+
}
1455+
}.runBinary(wholeKeys.length, wholeKeys);
1456+
}
1457+
1458+
@Override
1459+
public Long smove(final byte[] srckey, final byte[] dstkey, final byte[] member) {
1460+
return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
1461+
@Override
1462+
public Long execute(Jedis connection) {
1463+
return connection.smove(srckey, dstkey, member);
1464+
}
1465+
}.runBinary(2, srckey, dstkey);
1466+
}
1467+
1468+
@Override
1469+
public Long sort(final byte[] key, final SortingParams sortingParameters, final byte[] dstkey) {
1470+
return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
1471+
@Override
1472+
public Long execute(Jedis connection) {
1473+
return connection.sort(key, sortingParameters, dstkey);
1474+
}
1475+
}.runBinary(2, key, dstkey);
1476+
}
1477+
1478+
@Override
1479+
public Long sort(final byte[] key, final byte[] dstkey) {
1480+
return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
1481+
@Override
1482+
public Long execute(Jedis connection) {
1483+
return connection.sort(key, dstkey);
1484+
}
1485+
}.runBinary(2, key, dstkey);
1486+
}
1487+
1488+
@Override
1489+
public Set<byte[]> sunion(final byte[]... keys) {
1490+
return new JedisClusterCommand<Set<byte[]>>(connectionHandler, maxRedirections) {
1491+
@Override
1492+
public Set<byte[]> execute(Jedis connection) {
1493+
return connection.sunion(keys);
1494+
}
1495+
}.runBinary(keys.length, keys);
1496+
}
1497+
1498+
@Override
1499+
public Long sunionstore(final byte[] dstkey, final byte[]... keys) {
1500+
byte[][] wholeKeys = KeyMergeUtil.merge(dstkey, keys);
1501+
1502+
return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
1503+
@Override
1504+
public Long execute(Jedis connection) {
1505+
return connection.sunionstore(dstkey, keys);
1506+
}
1507+
}.runBinary(wholeKeys.length, wholeKeys);
1508+
}
1509+
1510+
@Override
1511+
public Long zinterstore(final byte[] dstkey, final byte[]... sets) {
1512+
byte[][] wholeKeys = KeyMergeUtil.merge(dstkey, sets);
1513+
1514+
return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
1515+
@Override
1516+
public Long execute(Jedis connection) {
1517+
return connection.zinterstore(dstkey, sets);
1518+
}
1519+
}.runBinary(wholeKeys.length, wholeKeys);
1520+
}
1521+
1522+
@Override
1523+
public Long zinterstore(final byte[] dstkey, final ZParams params, final byte[]... sets) {
1524+
byte[][] wholeKeys = KeyMergeUtil.merge(dstkey, sets);
1525+
1526+
return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
1527+
@Override
1528+
public Long execute(Jedis connection) {
1529+
return connection.zinterstore(dstkey, params, sets);
1530+
}
1531+
}.runBinary(wholeKeys.length, wholeKeys);
1532+
}
1533+
1534+
@Override
1535+
public Long zunionstore(final byte[] dstkey, final byte[]... sets) {
1536+
byte[][] wholeKeys = KeyMergeUtil.merge(dstkey, sets);
1537+
1538+
return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
1539+
@Override
1540+
public Long execute(Jedis connection) {
1541+
return connection.zunionstore(dstkey, sets);
1542+
}
1543+
}.runBinary(wholeKeys.length, wholeKeys);
1544+
}
1545+
1546+
@Override
1547+
public Long zunionstore(final byte[] dstkey, final ZParams params, final byte[]... sets) {
1548+
byte[][] wholeKeys = KeyMergeUtil.merge(dstkey, sets);
1549+
1550+
return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
1551+
@Override
1552+
public Long execute(Jedis connection) {
1553+
return connection.zunionstore(dstkey, params, sets);
1554+
}
1555+
}.runBinary(wholeKeys.length, wholeKeys);
1556+
}
1557+
1558+
@Override
1559+
public byte[] brpoplpush(final byte[] source, final byte[] destination, final int timeout) {
1560+
return new JedisClusterCommand<byte[]>(connectionHandler, maxRedirections) {
1561+
@Override
1562+
public byte[] execute(Jedis connection) {
1563+
return connection.brpoplpush(source, destination, timeout);
1564+
}
1565+
}.runBinary(2, source, destination);
1566+
}
1567+
1568+
@Override
1569+
public Long publish(final byte[] channel, final byte[] message) {
1570+
return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
1571+
@Override
1572+
public Long execute(Jedis connection) {
1573+
return connection.publish(channel, message);
1574+
}
1575+
}.runWithAnyNode();
1576+
}
1577+
1578+
@Override
1579+
public void subscribe(final BinaryJedisPubSub jedisPubSub, final byte[]... channels) {
1580+
new JedisClusterCommand<Integer>(connectionHandler, maxRedirections) {
1581+
@Override
1582+
public Integer execute(Jedis connection) {
1583+
connection.subscribe(jedisPubSub, channels);
1584+
return 0;
1585+
}
1586+
}.runWithAnyNode();
1587+
}
1588+
1589+
@Override
1590+
public void psubscribe(final BinaryJedisPubSub jedisPubSub, final byte[]... patterns) {
1591+
new JedisClusterCommand<Integer>(connectionHandler, maxRedirections) {
1592+
@Override
1593+
public Integer execute(Jedis connection) {
1594+
connection.subscribe(jedisPubSub, patterns);
1595+
return 0;
1596+
}
1597+
}.runWithAnyNode();
1598+
}
1599+
1600+
@Override
1601+
public Long bitop(final BitOP op, final byte[] destKey, final byte[]... srcKeys) {
1602+
byte[][] wholeKeys = KeyMergeUtil.merge(destKey, srcKeys);
1603+
1604+
return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
1605+
@Override
1606+
public Long execute(Jedis connection) {
1607+
return connection.bitop(op, destKey, srcKeys);
1608+
}
1609+
}.runBinary(wholeKeys.length, wholeKeys);
1610+
}
1611+
1612+
@Override
1613+
public String pfmerge(final byte[] destkey, final byte[]... sourcekeys) {
1614+
byte[][] wholeKeys = KeyMergeUtil.merge(destkey, sourcekeys);
1615+
1616+
return new JedisClusterCommand<String>(connectionHandler, maxRedirections) {
1617+
@Override
1618+
public String execute(Jedis connection) {
1619+
return connection.pfmerge(destkey, sourcekeys);
1620+
}
1621+
}.runBinary(wholeKeys.length, wholeKeys);
1622+
}
1623+
1624+
@Override
1625+
public Long pfcount(final byte[]... keys) {
1626+
return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
1627+
@Override
1628+
public Long execute(Jedis connection) {
1629+
return connection.pfcount(keys);
1630+
}
1631+
}.runBinary(keys.length, keys);
1632+
}
1633+
13111634
}

0 commit comments

Comments
 (0)