Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/BinaryJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -2938,7 +2938,7 @@ public Long scriptExists(byte[] sha1) {
a[0] = sha1;
return scriptExists(a).get(0);
}

public List<Long> scriptExists(byte[]... sha1) {
client.scriptExists(sha1);
return client.getIntegerMultiBulkReply();
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/redis/clients/jedis/BuilderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public Map<String, String> build(Object data) {
final Iterator<Object> iterator = flatHash.iterator();
while (iterator.hasNext()) {
hash.put(SafeEncoder.encode((byte[]) iterator.next()),
String.valueOf((Long) iterator.next()));
String.valueOf((Long) iterator.next()));
}

return hash;
Expand Down Expand Up @@ -285,8 +285,7 @@ public String toString() {
}

private Object evalResult(Object result) {
if (result instanceof byte[])
return SafeEncoder.encode((byte[]) result);
if (result instanceof byte[]) return SafeEncoder.encode((byte[]) result);

if (result instanceof List<?>) {
List<?> list = (List<?>) result;
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/redis/clients/jedis/HostAndPort.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public class HostAndPort {
private int port;

public HostAndPort(String host, int port) {
this.host = host;
this.host = convertHost(host);
this.port = port;
}

Expand Down Expand Up @@ -46,6 +46,7 @@ public String toString() {
private String convertHost(String host) {
if (host.equals("127.0.0.1")) return LOCALHOST_STR;
else if (host.equals("::1")) return LOCALHOST_STR;
else if (host.isEmpty()) return LOCALHOST_STR;

return host;
}
Expand Down
102 changes: 97 additions & 5 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
package redis.clients.jedis;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

Expand Down Expand Up @@ -1332,6 +1339,10 @@ public Map<String, JedisPool> getClusterNodes() {
return connectionHandler.getNodes();
}

public List<JedisPool> getMasterNodes() {
return connectionHandler.getMasterNodes();
}

@Override
public Long waitReplicas(int replicas, long timeout) {
// TODO Auto-generated method stub
Expand All @@ -1346,7 +1357,19 @@ public ScanResult<Entry<String, String>> hscan(final String key, final String cu
public ScanResult<Entry<String, String>> execute(Jedis connection) {
return connection.hscan(key, cursor);
}
}.run(null);
}.run(key);
}

@Override
public ScanResult<Entry<String, String>> hscan(final String key, final String cursor,
final ScanParams params) {
return new JedisClusterCommand<ScanResult<Entry<String, String>>>(connectionHandler, timeout,
maxRedirections) {
@Override
public ScanResult<Entry<String, String>> execute(Jedis connection) {
return connection.hscan(key, cursor, params);
}
}.run(key);
}

@Override
Expand All @@ -1356,7 +1379,17 @@ public ScanResult<String> sscan(final String key, final String cursor) {
public ScanResult<String> execute(Jedis connection) {
return connection.sscan(key, cursor);
}
}.run(null);
}.run(key);
}

@Override
public ScanResult<String> sscan(final String key, final String cursor, final ScanParams params) {
return new JedisClusterCommand<ScanResult<String>>(connectionHandler, timeout, maxRedirections) {
@Override
public ScanResult<String> execute(Jedis connection) {
return connection.sscan(key, cursor, params);
}
}.run(key);
}

@Override
Expand All @@ -1366,7 +1399,17 @@ public ScanResult<Tuple> zscan(final String key, final String cursor) {
public ScanResult<Tuple> execute(Jedis connection) {
return connection.zscan(key, cursor);
}
}.run(null);
}.run(key);
}

@Override
public ScanResult<Tuple> zscan(final String key, final String cursor, final ScanParams params) {
return new JedisClusterCommand<ScanResult<Tuple>>(connectionHandler, timeout, maxRedirections) {
@Override
public ScanResult<Tuple> execute(Jedis connection) {
return connection.zscan(key, cursor, params);
}
}.run(key);
}

@Override
Expand Down Expand Up @@ -1396,7 +1439,7 @@ public List<String> blpop(final int timeout, final String key) {
public List<String> execute(Jedis connection) {
return connection.blpop(timeout, key);
}
}.run(null);
}.run(key);
}

@Override
Expand All @@ -1406,7 +1449,56 @@ public List<String> brpop(final int timeout, final String key) {
public List<String> execute(Jedis connection) {
return connection.brpop(timeout, key);
}
}.run(null);
}.run(key);
}

public ScanResult<String> scan() {
return scan(null);
}

public ScanResult<String> scan(final ScanParams params) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, Runtime.getRuntime()
.availableProcessors(), 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(
connectionHandler.getNodes().size()));

List<Future<List<String>>> futures = new ArrayList<Future<List<String>>>();
for (final JedisPool entry : connectionHandler.getMasterNodes()) {
futures.add(executor.submit(new Callable<List<String>>() {
@Override
public List<String> call() throws Exception {
Jedis resource = entry.getResource();
try {
List<String> result = new ArrayList<String>();
String cursor = "0";
do {
ScanResult<String> scan;
if (params == null) {
scan = resource.scan(cursor);
} else {
scan = resource.scan(cursor, params);
}
result.addAll(scan.getResult());
cursor = scan.getCursor();
} while (!cursor.equals("0"));
return result;
} finally {
entry.returnResource(resource);
}
}
}));
}

executor.shutdown();

List<String> result = new ArrayList<String>();
for (Future<List<String>> future : futures) {
try {
result.addAll(future.get());
} catch (InterruptedException ex) {
} catch (ExecutionException ex) {
}
}

return new ScanResult<String>("0", result);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package redis.clients.jedis;

import java.util.List;
import static redis.clients.jedis.JedisClusterInfoCache.getNodeKey;

import java.util.Map;
Expand Down Expand Up @@ -39,14 +40,21 @@ public Map<String, JedisPool> getNodes() {
return cache.getNodes();
}

public List<JedisPool> getMasterNodes() {
return cache.getMasterNodes();
}

public void assignSlotToNode(int slot, HostAndPort targetNode) {
cache.assignSlotToNode(slot, targetNode);
}

private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig) {
for (HostAndPort hostAndPort : startNodes) {
Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());
JedisPool jp = new JedisPool(poolConfig, hostAndPort.getHost(), hostAndPort.getPort());

Jedis jedis = null;
try {
jedis = jp.getResource();
cache.discoverClusterNodesAndSlots(jedis);
break;
} catch (JedisConnectionException e) {
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand All @@ -18,6 +21,7 @@ public class JedisClusterInfoCache {

private Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
private Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
private List<JedisPool> masters = new LinkedList<JedisPool>();

private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
Expand All @@ -44,6 +48,8 @@ public void discoverClusterNodesAndSlots(Jedis jedis) {
setNodeIfNotExist(targetNode);
assignSlotsToNode(clusterNodeInfo.getAvailableSlots(), targetNode);
}

discoverClusterSlots(jedis);
} finally {
w.unlock();
}
Expand All @@ -54,6 +60,7 @@ public void discoverClusterSlots(Jedis jedis) {

try {
this.slots.clear();
this.masters.clear();

List<Object> slots = jedis.clusterSlots();

Expand All @@ -76,6 +83,7 @@ public void discoverClusterSlots(Jedis jedis) {
HostAndPort targetNode = generateHostAndPort(hostInfos);

setNodeIfNotExist(targetNode);
masters.add(getNode(getNodeKey(targetNode)));
assignSlotsToNode(slotNums, targetNode);
}
} finally {
Expand Down Expand Up @@ -161,6 +169,15 @@ public Map<String, JedisPool> getNodes() {
}
}

public List<JedisPool> getMasterNodes() {
r.lock();
try {
return new ArrayList<JedisPool>(masters);
} finally {
r.unlock();
}
}

public static String getNodeKey(HostAndPort hnp) {
return hnp.getHost() + ":" + hnp.getPort();
}
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/redis/clients/jedis/JedisCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,17 @@ Set<String> zrevrangeByLex(final String key, final String max, final String min,

ScanResult<Map.Entry<String, String>> hscan(final String key, final String cursor);

ScanResult<Map.Entry<String, String>> hscan(final String key, final String cursor,
final ScanParams params);

ScanResult<String> sscan(final String key, final String cursor);

ScanResult<String> sscan(final String key, final String cursor, final ScanParams params);

ScanResult<Tuple> zscan(final String key, final String cursor);

ScanResult<Tuple> zscan(final String key, final String cursor, final ScanParams params);

Long pfadd(final String key, final String... elements);

long pfcount(final String key);
Expand Down
Loading