diff --git a/core/src/main/java/com/github/phantomthief/concurrent/MoreFutures.java b/core/src/main/java/com/github/phantomthief/concurrent/MoreFutures.java index 13e08b5..f98add5 100644 --- a/core/src/main/java/com/github/phantomthief/concurrent/MoreFutures.java +++ b/core/src/main/java/com/github/phantomthief/concurrent/MoreFutures.java @@ -33,6 +33,7 @@ import com.github.phantomthief.util.ThrowableFunction; import com.github.phantomthief.util.ThrowableRunnable; import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.ExecutionError; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -315,6 +316,22 @@ public static ListenableFuture transform(ListenableFuture input, } } + public static ListenableFuture transformAsync(ListenableFuture input, + AsyncFunction function, + Executor executor) { + ListenableFuture result = Futures.transformAsync(input, function, executor); + if (input instanceof TimeoutListenableFuture) { + TimeoutListenableFuture newResult = new TimeoutListenableFuture<>(result); + for (ThrowableConsumer timeoutListener : ((TimeoutListenableFuture) input) + .getTimeoutListeners()) { + newResult.addTimeoutListener(timeoutListener); + } + return newResult; + } else { + return result; + } + } + public interface Scheduled { /** diff --git a/core/src/test/java/com/github/phantomthief/concurrent/MoreFuturesTest2.java b/core/src/test/java/com/github/phantomthief/concurrent/MoreFuturesTest2.java index f2af28f..106b354 100644 --- a/core/src/test/java/com/github/phantomthief/concurrent/MoreFuturesTest2.java +++ b/core/src/test/java/com/github/phantomthief/concurrent/MoreFuturesTest2.java @@ -17,6 +17,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; /** * @author w.vela @@ -45,4 +46,26 @@ void test() throws ExecutionException, InterruptedException { assertTrue(timeout[0]); assertEquals("test!", timeout2.get()); } + + @Test + void testTransformAsync() throws ExecutionException, InterruptedException { + ListenableFuture orig = executor.submit(() -> { + sleepUninterruptibly(1, SECONDS); + return "test"; + }); + boolean[] timeout = {false}; + ListenableFuture timeout1 = new TimeoutListenableFuture<>(orig) + .addTimeoutListener(e -> timeout[0] = true); + assertThrows(TimeoutException.class, () -> timeout1.get(1, MILLISECONDS)); + assertTrue(timeout[0]); + + + timeout[0] = false; + ListenableFuture timeout2 = MoreFutures + .transformAsync(timeout1, it -> MoreExecutors.listeningDecorator(executor).submit(() -> it + "!"), + directExecutor()); + assertThrows(TimeoutException.class, () -> timeout2.get(1, MILLISECONDS)); + assertTrue(timeout[0]); + assertEquals("test!", timeout2.get()); + } }