Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Dubbo-2187] Use CompletetableFuture to rewrite the embedded ListenableFuture #2213

Merged
merged 15 commits into from
Aug 13, 2018
Prev Previous commit
Next Next commit
Reformat code & fix bugs
  • Loading branch information
chickenlj committed Aug 13, 2018
commit 23396923a56aa5b4879ba7c06ac9d9c7a53b3831
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,14 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
Expand Down Expand Up @@ -70,12 +77,8 @@ public Monitor getMonitor(URL url) {
}

final URL monitorUrl = url;
final CompletableFuture<Monitor> completableFuture = CompletableFuture.supplyAsync(()->{
Monitor newMonitor = AbstractMonitorFactory.this.createMonitor(monitorUrl);
return newMonitor;

});
completableFuture.thenRunAsync(new MonitorListener(key),executor);
final CompletableFuture<Monitor> completableFuture = CompletableFuture.supplyAsync(() -> AbstractMonitorFactory.this.createMonitor(monitorUrl));
completableFuture.thenRunAsync(new MonitorListener(key), executor);
FUTURES.put(key, completableFuture);

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,21 @@
*/
package org.apache.dubbo.remoting.zookeeper.zkclient;

import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.Assert;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.Assert;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.zookeeper.Watcher.Event.KeeperState;

import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* Zkclient wrapper class that can monitor the state of the connection automatically after the connection is out of time
Expand All @@ -36,24 +39,20 @@
* @date 2017/10/29
*/
public class ZkClientWrapper {
private static final ExecutorService executor =
new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new NamedThreadFactory("DubboMonitorCreator", true));
Logger logger = LoggerFactory.getLogger(ZkClientWrapper.class);

private long timeout;
private ZkClient client;
private volatile KeeperState state;
private CompletableFuture<ZkClient> completableFuture;
private volatile boolean started = false;
private String serverAddr;
private static final ExecutorService executor =
new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new NamedThreadFactory("DubboMonitorCreator", true));

public ZkClientWrapper(final String serverAddr, long timeout) {
this.timeout = timeout;
completableFuture = CompletableFuture.supplyAsync(() -> {
ZkClient client = new ZkClient(serverAddr, Integer.MAX_VALUE);
return client;
});
completableFuture = CompletableFuture.supplyAsync(() -> new ZkClient(serverAddr, Integer.MAX_VALUE));
}

public void start() {
Expand All @@ -70,18 +69,13 @@ public void start() {
}

public void addListener(final IZkStateListener listener) {
completableFuture.whenComplete((value,exception)->{
if (exception != null){
logger.error("Got an exception when completableFuture finished in ZkClientWrapper, please check!", exception);
}
try {
completableFuture.whenComplete((value, exception) -> {
if (exception != null) {
logger.error("Got an exception when trying to create zkclient instance, can not connect to zookeeper server, please check!", exception);
} else {
client = value;
client.subscribeStateChanges(listener);
} catch (Exception e){
logger.error("Got an exception when trying to create zkclient instance, can not connect to zookeeper server, please check!", e);
}


});
}

Expand Down