Skip to content

Commit

Permalink
MoreFutures 支持 transformAsync (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
ymwangzq authored and PhantomThief committed Jan 6, 2020
1 parent e64335b commit 4bb156b
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.github.phantomthief.util.ThrowableFunction;
import com.github.phantomthief.util.ThrowableRunnable;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.ExecutionError;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -315,6 +316,22 @@ public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
}
}

public static <I, O> ListenableFuture<O> transformAsync(ListenableFuture<I> input,
AsyncFunction<? super I, ? extends O> function,
Executor executor) {
ListenableFuture<O> result = Futures.transformAsync(input, function, executor);
if (input instanceof TimeoutListenableFuture) {
TimeoutListenableFuture<O> newResult = new TimeoutListenableFuture<>(result);
for (ThrowableConsumer<TimeoutException, Exception> timeoutListener : ((TimeoutListenableFuture<I>) input)
.getTimeoutListeners()) {
newResult.addTimeoutListener(timeoutListener);
}
return newResult;
} else {
return result;
}
}

public interface Scheduled {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

/**
* @author w.vela
Expand Down Expand Up @@ -45,4 +46,26 @@ void test() throws ExecutionException, InterruptedException {
assertTrue(timeout[0]);
assertEquals("test!", timeout2.get());
}

@Test
void testTransformAsync() throws ExecutionException, InterruptedException {
ListenableFuture<String> orig = executor.submit(() -> {
sleepUninterruptibly(1, SECONDS);
return "test";
});
boolean[] timeout = {false};
ListenableFuture<String> timeout1 = new TimeoutListenableFuture<>(orig)
.addTimeoutListener(e -> timeout[0] = true);
assertThrows(TimeoutException.class, () -> timeout1.get(1, MILLISECONDS));
assertTrue(timeout[0]);


timeout[0] = false;
ListenableFuture<String> timeout2 = MoreFutures
.transformAsync(timeout1, it -> MoreExecutors.listeningDecorator(executor).submit(() -> it + "!"),
directExecutor());
assertThrows(TimeoutException.class, () -> timeout2.get(1, MILLISECONDS));
assertTrue(timeout[0]);
assertEquals("test!", timeout2.get());
}
}

0 comments on commit 4bb156b

Please sign in to comment.