diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/concurrent/ListenableFuture.java b/dubbo-common/src/main/java/org/apache/dubbo/common/concurrent/ListenableFuture.java deleted file mode 100644 index ce6196e50c3..00000000000 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/concurrent/ListenableFuture.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.common.concurrent; - -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; -import java.util.concurrent.RejectedExecutionException; - -/** - * A {@link Future} that accepts completion listeners. Each listener has an - * associated executor, and it is invoked using this executor once the future's - * computation is {@linkplain Future#isDone() complete}. If the computation has - * already completed when the listener is added, the listener will execute - * immediately. - *

- *

See the Guava User Guide article on - * {@code ListenableFuture}. - *

- *

Purpose

- *

- *

Most commonly, {@code ListenableFuture} is used as an input to another - * derived {@code Future}, as in {@link Futures#allAsList(Iterable) - * Futures.allAsList}. Many such methods are impossible to implement efficiently - * without listener support. - *

- *

It is possible to call {@link #addListener addListener} directly, but this - * is uncommon because the {@code Runnable} interface does not provide direct - * access to the {@code Future} result. (Users who want such access may prefer - * {@link Futures#addCallback Futures.addCallback}.) Still, direct {@code - * addListener} calls are occasionally useful:

   {@code
- *   final String name = ...;
- *   inFlight.add(name);
- *   ListenableFuture future = service.query(name);
- *   future.addListener(new Runnable() {
- *     public void run() {
- *       processedCount.incrementAndGet();
- *       inFlight.remove(name);
- *       lastProcessed.set(name);
- *       logger.info("Done with {0}", name);
- *     }
- *   }, executor);}
- *

- *

How to get an instance

- *

- *

Developers are encouraged to return {@code ListenableFuture} from their - * methods so that users can take advantages of the utilities built atop the - * class. The way that they will create {@code ListenableFuture} instances - * depends on how they currently create {@code Future} instances: - *

- *

- *

Occasionally, an API will return a plain {@code Future} and it will be - * impossible to change the return type. For this case, we provide a more - * expensive workaround in {@code JdkFutureAdapters}. However, when possible, it - * is more efficient and reliable to create a {@code ListenableFuture} directly. - */ -public interface ListenableFuture extends Future { - /** - * Registers a listener to be {@linkplain Executor#execute(Runnable) run} on - * the given executor. The listener will run when the {@code Future}'s - * computation is {@linkplain Future#isDone() complete} or, if the computation - * is already complete, immediately. - *

- *

There is no guaranteed ordering of execution of listeners, but any - * listener added through this method is guaranteed to be called once the - * computation is complete. - *

- *

Exceptions thrown by a listener will be propagated up to the executor. - * Any exception thrown during {@code Executor.execute} (e.g., a {@code - * RejectedExecutionException} or an exception thrown by {@linkplain - * MoreExecutors#sameThreadExecutor inline execution}) will be caught and - * logged. - *

- *

Note: For fast, lightweight listeners that would be safe to execute in - * any thread, consider {@link MoreExecutors#sameThreadExecutor}. For heavier - * listeners, {@code sameThreadExecutor()} carries some caveats. For - * example, the listener may run on an unpredictable or undesirable thread: - *

- *

- *

- *

Also note that, regardless of which thread executes the - * {@code sameThreadExecutor()} listener, all other registered but unexecuted - * listeners are prevented from running during its execution, even if those - * listeners are to run in other executors. - *

- *

This is the most general listener interface. For common operations - * performed using listeners, see {@link - * com.google.common.util.concurrent.Futures}. For a simplified but general - * listener interface, see {@link - * com.google.common.util.concurrent.Futures#addCallback addCallback()}. - * - * @param listener the listener to run when the computation is complete - * @param executor the executor to run the listener in - * @throws NullPointerException if the executor or listener was null - * @throws RejectedExecutionException if we tried to execute the listener - * immediately but the executor rejected it. - */ - void addListener(Runnable listener, Executor executor); - - void addListener(Runnable listener); -} diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/concurrent/ListenableFutureTask.java b/dubbo-common/src/main/java/org/apache/dubbo/common/concurrent/ListenableFutureTask.java deleted file mode 100644 index dd1b8798c6b..00000000000 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/concurrent/ListenableFutureTask.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.common.concurrent; - -import java.util.concurrent.Callable; -import java.util.concurrent.Executor; -import java.util.concurrent.FutureTask; - -/** - * A {@link FutureTask} that also implements the {@link ListenableFuture} - * interface. Unlike {@code FutureTask}, {@code ListenableFutureTask} does not - * provide an overrideable {@link FutureTask#done() done()} method. For similar - * functionality, call {@link #addListener}. - */ -public class ListenableFutureTask extends FutureTask - implements ListenableFuture { - // TODO(cpovirk): explore ways of making ListenableFutureTask final. There are - // some valid reasons such as BoundedQueueExecutorService to allow extends but it - // would be nice to make it final to avoid unintended usage. - - // The execution list to hold our listeners. - private final ExecutionList executionList = new ExecutionList(); - - /** - * Creates a {@code ListenableFutureTask} that will upon running, execute the - * given {@code Callable}. - * - * @param callable the callable task - * @since 10.0 - */ - public static ListenableFutureTask create(Callable callable) { - return new ListenableFutureTask(callable); - } - - /** - * Creates a {@code ListenableFutureTask} that will upon running, execute the - * given {@code Runnable}, and arrange that {@code get} will return the - * given result on successful completion. - * - * @param runnable the runnable task - * @param result the result to return on successful completion. If you don't - * need a particular result, consider using constructions of the form: - * {@code ListenableFuture f = ListenableFutureTask.create(runnable, - * null)} - * @since 10.0 - */ - public static ListenableFutureTask create( - Runnable runnable, V result) { - return new ListenableFutureTask(runnable, result); - } - - ListenableFutureTask(Callable callable) { - super(callable); - } - - ListenableFutureTask(Runnable runnable, V result) { - super(runnable, result); - } - - @Override - public void addListener(Runnable listener, Executor exec) { - executionList.add(listener, exec); - } - - @Override - public void addListener(Runnable listener) { - executionList.add(listener, null); - } - - /** - * Internal implementation detail used to invoke the listeners. - */ - @Override - protected void done() { - executionList.execute(); - } -} \ No newline at end of file diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/concurrent/ListenableFutureTaskTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/concurrent/CompletableFutureTaskTest.java similarity index 56% rename from dubbo-common/src/test/java/org/apache/dubbo/common/concurrent/ListenableFutureTaskTest.java rename to dubbo-common/src/test/java/org/apache/dubbo/common/concurrent/CompletableFutureTaskTest.java index 0dc9e144685..8e426ad707c 100644 --- a/dubbo-common/src/test/java/org/apache/dubbo/common/concurrent/ListenableFutureTaskTest.java +++ b/dubbo-common/src/test/java/org/apache/dubbo/common/concurrent/CompletableFutureTaskTest.java @@ -16,69 +16,72 @@ */ package org.apache.dubbo.common.concurrent; -import org.junit.Test; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; -import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.dubbo.common.utils.NamedThreadFactory; +import org.junit.Test; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -public class ListenableFutureTaskTest { +public class CompletableFutureTaskTest { + + private static final ExecutorService executor = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue(), new NamedThreadFactory("DubboMonitorCreator", true)); + @Test public void testCreate() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); - ListenableFutureTask futureTask = ListenableFutureTask.create(new Callable() { - @Override - public Boolean call() throws Exception { - countDownLatch.countDown(); - return true; - } - }); - futureTask.run(); + CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> { + countDownLatch.countDown(); + return true; + },executor); countDownLatch.await(); } @Test public void testRunnableResponse() throws ExecutionException, InterruptedException { - ListenableFutureTask futureTask = ListenableFutureTask.create(new Runnable() { - @Override - public void run() { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - e.printStackTrace(); - } + CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); } - }, true); - futureTask.run(); + return true; + }, executor); - Boolean result = futureTask.get(); + Boolean result = completableFuture.get(); assertThat(result, is(true)); } @Test public void testListener() throws InterruptedException { - ListenableFutureTask futureTask = ListenableFutureTask.create(new Callable() { - @Override - public String call() throws Exception { + CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> { + try { Thread.sleep(500); - return "hello"; + } catch (InterruptedException e) { + e.printStackTrace(); } - }); + return "hello"; + + },executor); final CountDownLatch countDownLatch = new CountDownLatch(1); - futureTask.addListener(new Runnable() { + completableFuture.thenRunAsync(new Runnable() { @Override public void run() { countDownLatch.countDown(); } }); - futureTask.run(); countDownLatch.await(); } @@ -86,15 +89,9 @@ public void run() { @Test public void testCustomExecutor() { Executor mockedExecutor = mock(Executor.class); - ListenableFutureTask futureTask = ListenableFutureTask.create(new Callable() { - @Override - public Integer call() throws Exception { - return 0; - } + CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> { + return 0; }); - futureTask.addListener(mock(Runnable.class), mockedExecutor); - futureTask.run(); - - verify(mockedExecutor).execute(any(Runnable.class)); + completableFuture.thenRunAsync(mock(Runnable.class), verify(mockedExecutor)); } } \ No newline at end of file diff --git a/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/AbstractMonitorFactory.java b/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/AbstractMonitorFactory.java index 737e2e406c5..c49cb1be3c0 100644 --- a/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/AbstractMonitorFactory.java +++ b/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/AbstractMonitorFactory.java @@ -18,8 +18,6 @@ import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.concurrent.ListenableFuture; -import org.apache.dubbo.common.concurrent.ListenableFutureTask; import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.utils.NamedThreadFactory; @@ -30,7 +28,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; -import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -52,7 +50,7 @@ public abstract class AbstractMonitorFactory implements MonitorFactory { // monitor centers Map private static final Map MONITORS = new ConcurrentHashMap(); - private static final Map> FUTURES = new ConcurrentHashMap>(); + private static final Map> FUTURES = new ConcurrentHashMap>(); private static final ExecutorService executor = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue(), new NamedThreadFactory("DubboMonitorCreator", true)); @@ -79,10 +77,9 @@ public Monitor getMonitor(URL url) { } final URL monitorUrl = url; - final ListenableFutureTask listenableFutureTask = ListenableFutureTask.create(new MonitorCreator(monitorUrl)); - listenableFutureTask.addListener(new MonitorListener(key)); - executor.execute(listenableFutureTask); - FUTURES.put(key, listenableFutureTask); + final CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> AbstractMonitorFactory.this.createMonitor(monitorUrl)); + completableFuture.thenRunAsync(new MonitorListener(key), executor); + FUTURES.put(key, completableFuture); return null; } finally { @@ -93,20 +90,6 @@ public Monitor getMonitor(URL url) { protected abstract Monitor createMonitor(URL url); - class MonitorCreator implements Callable { - - private URL url; - - public MonitorCreator(URL url) { - this.url = url; - } - - @Override - public Monitor call() throws Exception { - Monitor monitor = AbstractMonitorFactory.this.createMonitor(url); - return monitor; - } - } class MonitorListener implements Runnable { @@ -119,8 +102,8 @@ public MonitorListener(String key) { @Override public void run() { try { - ListenableFuture listenableFuture = AbstractMonitorFactory.FUTURES.get(key); - AbstractMonitorFactory.MONITORS.put(key, listenableFuture.get()); + CompletableFuture completableFuture = AbstractMonitorFactory.FUTURES.get(key); + AbstractMonitorFactory.MONITORS.put(key, completableFuture.get()); AbstractMonitorFactory.FUTURES.remove(key); } catch (InterruptedException e) { logger.warn("Thread was interrupted unexpectedly, monitor will never be got."); diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java index c83a0ad064f..17aa372da8d 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java @@ -16,19 +16,16 @@ */ package org.apache.dubbo.remoting.zookeeper.zkclient; -import org.apache.dubbo.common.concurrent.ListenableFutureTask; -import org.apache.dubbo.common.logger.Logger; -import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.common.utils.Assert; - import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkStateListener; import org.I0Itec.zkclient.ZkClient; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.utils.Assert; import org.apache.zookeeper.Watcher.Event.KeeperState; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; /** @@ -38,35 +35,26 @@ * @date 2017/10/29 */ public class ZkClientWrapper { - Logger logger = LoggerFactory.getLogger(ZkClientWrapper.class); - + private Logger logger = LoggerFactory.getLogger(ZkClientWrapper.class); private long timeout; private ZkClient client; private volatile KeeperState state; - private ListenableFutureTask listenableFutureTask; + private CompletableFuture completableFuture; private volatile boolean started = false; - public ZkClientWrapper(final String serverAddr, long timeout) { this.timeout = timeout; - listenableFutureTask = ListenableFutureTask.create(new Callable() { - @Override - public ZkClient call() throws Exception { - return new ZkClient(serverAddr, Integer.MAX_VALUE); - } - }); + completableFuture = CompletableFuture.supplyAsync(() -> new ZkClient(serverAddr, Integer.MAX_VALUE)); } public void start() { if (!started) { - Thread connectThread = new Thread(listenableFutureTask); - connectThread.setName("DubboZkclientConnector"); - connectThread.setDaemon(true); - connectThread.start(); try { - client = listenableFutureTask.get(timeout, TimeUnit.MILLISECONDS); + client = completableFuture.get(timeout, TimeUnit.MILLISECONDS); +// this.client.subscribeStateChanges(IZkStateListener); } catch (Throwable t) { logger.error("Timeout! zookeeper server can not be connected in : " + timeout + "ms!", t); + completableFuture.whenComplete(this::makeClientReady); } started = true; } else { @@ -74,24 +62,18 @@ public void start() { } } - public void addListener(final IZkStateListener listener) { - listenableFutureTask.addListener(new Runnable() { - @Override - public void run() { - try { - client = listenableFutureTask.get(); - client.subscribeStateChanges(listener); - } catch (InterruptedException e) { - logger.warn(Thread.currentThread().getName() + " was interrupted unexpectedly, which may cause unpredictable exception!"); - } catch (ExecutionException e) { - logger.error("Got an exception when trying to create zkclient instance, can not connect to zookeeper server, please check!", e); - } + public void addListener(IZkStateListener listener) { + completableFuture.whenComplete((value, exception) -> { + this.makeClientReady(value, exception); + if (exception == null) { + client.subscribeStateChanges(listener); } }); } public boolean isConnected() { - return client != null && state == KeeperState.SyncConnected; +// return client != null && state == KeeperState.SyncConnected; + return client != null; } public void createPersistent(String path) { @@ -134,5 +116,14 @@ public void unsubscribeChildChanges(String path, IZkChildListener listener) { client.unsubscribeChildChanges(path, listener); } + private void makeClientReady(ZkClient client, Throwable e) { + if (e != null) { + logger.error("Got an exception when trying to create zkclient instance, can not connect to zookeeper server, please check!", e); + } else { + this.client = client; +// this.client.subscribeStateChanges(IZkStateListener); + } + } + }