Skip to content

Commit

Permalink
fix reactor#1131 Add Mono.fromFuture/fromCompletionStage Supplier var…
Browse files Browse the repository at this point in the history
…iants
  • Loading branch information
simonbasle authored Apr 3, 2018
1 parent d734ad8 commit 668330f
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 0 deletions.
33 changes: 33 additions & 0 deletions reactor-core/src/main/java/reactor/core/publisher/Mono.java
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,22 @@ public static <T> Mono<T> fromCompletionStage(CompletionStage<? extends T> compl
return onAssembly(new MonoCompletionStage<>(completionStage));
}

/**
* Create a {@link Mono} that wraps a {@link CompletionStage} on subscription,
* emitting the value produced by the {@link CompletionStage}.
*
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/fromfuture.png" alt="">
* <p>
* @param stageSupplier The {@link Supplier} of a {@link CompletionStage} that will produce a value (or a null to
* complete immediately). This allows lazy triggering of CompletionStage-based APIs.
* @param <T> type of the expected value
* @return A {@link Mono}.
*/
public static <T> Mono<T> fromCompletionStage(Supplier<? extends CompletionStage<? extends T>> stageSupplier) {
return defer(() -> onAssembly(new MonoCompletionStage<>(stageSupplier.get())));
}

/**
* Convert a {@link Publisher} to a {@link Mono} without any cardinality check
* (ie this method doesn't check if the source is already a Mono, nor cancels the
Expand Down Expand Up @@ -404,6 +420,23 @@ public static <T> Mono<T> fromFuture(CompletableFuture<? extends T> future) {
return onAssembly(new MonoCompletionStage<>(future));
}

/**
* Create a {@link Mono} that wraps a {@link CompletableFuture} on subscription,
* emitting the value produced by the Future.
*
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/fromfuture.png" alt="">
* <p>
* @param futureSupplier The {@link Supplier} of a {@link CompletableFuture} that will produce a value (or a null to
* complete immediately). This allows lazy triggering of future-based APIs.
* @param <T> type of the expected value
* @return A {@link Mono}.
* @see #fromCompletionStage(Supplier) fromCompletionStage for a generalization
*/
public static <T> Mono<T> fromFuture(Supplier<? extends CompletableFuture<? extends T>> futureSupplier) {
return defer(() -> onAssembly(new MonoCompletionStage<>(futureSupplier.get())));
}

/**
* Create a {@link Mono} that completes empty once the provided {@link Runnable} has
* been executed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -202,4 +207,40 @@ public void testMonoAndFunctionRightSideEmpty() {
.expectComplete()
.verify();
}

@Test
public void fromFutureSupplier() {
AtomicInteger source = new AtomicInteger();

Supplier<CompletableFuture<Integer>> supplier = () -> CompletableFuture.completedFuture(source.incrementAndGet());
Mono<Number> mono = Mono.fromFuture(supplier);

Assertions.assertThat(source).hasValue(0);

Assertions.assertThat(mono.block())
.isEqualTo(source.get())
.isEqualTo(1);

Assertions.assertThat(mono.block())
.isEqualTo(source.get())
.isEqualTo(2);
}

@Test
public void fromCompletionStageSupplier() {
AtomicInteger source = new AtomicInteger();

Supplier<CompletableFuture<Integer>> supplier = () -> CompletableFuture.completedFuture(source.incrementAndGet());
Mono<Number> mono = Mono.fromCompletionStage(supplier);

Assertions.assertThat(source).hasValue(0);

Assertions.assertThat(mono.block())
.isEqualTo(source.get())
.isEqualTo(1);

Assertions.assertThat(mono.block())
.isEqualTo(source.get())
.isEqualTo(2);
}
}

0 comments on commit 668330f

Please sign in to comment.