Skip to content

Commit

Permalink
Take(n) now behaves as take(n,true)/limitRequest (reactor#2969)
Browse files Browse the repository at this point in the history
This commit changes the behavior of `take(long)` to limit the request
made to upstream like the now deprecated operator `limitRequest`,
as documented in the deprecation notice.

Most usages of `take(long)` in the codebase have been switched to use
`take(long, false)` instead to keep relying on the old behavior. All
usages of `limitRequest(n)` have been turned into `take(n)`.

Finally, the javadocs, marble diagrams and reference guide have been
updated to reflect the new behavior.

Fixes reactor#2690.
  • Loading branch information
simonbasle authored Mar 17, 2022
1 parent bf63f7f commit 96bb61f
Show file tree
Hide file tree
Showing 64 changed files with 313 additions and 474 deletions.
3 changes: 1 addition & 2 deletions docs/asciidoc/apdx-operatorChoice.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,10 @@ I want to deal with:

* I want to keep only a subset of the sequence:
** by taking N elements:
*** at the beginning of the sequence: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#take-long-boolean-[Flux#take(long, true)]
*** at the beginning of the sequence: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#take-long-[Flux#take(long)]
**** ...requesting an unbounded amount from upstream: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#take-long-boolean-[Flux#take(long, false)]
**** ...based on a duration: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#take-java.time.Duration-[Flux#take(Duration)]
**** ...only the first element, as a https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html[Mono]: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#next--[Flux#next()]
**** ...using https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/Subscription.html#request(long)[request(N)] rather than cancellation: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#limitRequest-long-[Flux#limitRequest(long)]
*** at the end of the sequence: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#takeLast-int-[Flux#takeLast]
*** until a criteria is met (inclusive): https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#takeUntil-java.util.function.Predicate-[Flux#takeUntil] (predicate-based), https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#takeUntilOther-org.reactivestreams.Publisher-[Flux#takeUntilOther] (companion publisher-based)
*** while a criteria is met (exclusive): https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#takeWhile-java.util.function.Predicate-[Flux#takeWhile]
Expand Down
2 changes: 1 addition & 1 deletion docs/asciidoc/coreFeatures.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ Flux.just("foo", "bar")
<1> `doFinally` consumes a `SignalType` for the type of termination.
<2> Similarly to `finally` blocks, we always record the timing.
<3> Here we also increment statistics in case of cancellation only.
<4> `take(1)` cancels after one item is emitted.
<4> `take(1)` requests exactly 1 from upstream, and cancels after one item is emitted.
====

On the other hand, `using` handles the case where a `Flux` is derived from a
Expand Down
15 changes: 8 additions & 7 deletions docs/asciidoc/debugging.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ This prints out the following (through the logger's console appender):
====
----
10:45:20.200 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) <1>
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | request(unbounded) <2>
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | request(3) <2>
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(1) <3>
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(2)
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(3)
Expand All @@ -611,13 +611,14 @@ characters, the actual event gets printed. Here, we get an `onSubscribe` call, a
to the operator-specific implementation. Between square brackets, we get additional
information, including whether the operator can be automatically optimized through
synchronous or asynchronous fusion.
<2> On the second line, we can see that an unbounded request was propagated up from
downstream.
<2> On the second line, we can see that take limited the request to upstream to 3.
<3> Then the range sends three values in a row.
<4> On the last line, we see `cancel()`.
====

The last line, (4), is the most interesting. We can see the `take` in action there. It
operates by cutting the sequence short after it has seen enough elements emitted. In
short, `take()` causes the source to `cancel()` once it has emitted the user-requested
amount.
The second (2) and last lines (4) are the most interesting. We can see the `take` in action there.
It leverages backpressure in order to ask the source for exactly the expected amount of elements.
After having received enough elements, it tells the source no more items will be needed by calling `cancel()`.
Note that if downstream had itself used backpressure, eg. by requesting only 1 element,
the `take` operator would have honored that (it _caps_ the request when propagating it from downstream
to upstream).
42 changes: 21 additions & 21 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -6001,7 +6001,7 @@ public final Mono<T> last(T defaultValue) {
* Typically used for scenarios where consumer(s) request a large amount of data
* (eg. {@code Long.MAX_VALUE}) but the data source behaves better or can be optimized
* with smaller requests (eg. database paging, etc...). All data is still processed,
* unlike with {@link #limitRequest(long)} which will cap the grand total request
* unlike with {@link #take(long)} which will cap the grand total request
* amount.
* <p>
* Equivalent to {@code flux.publishOn(Schedulers.immediate(), prefetchRate).subscribe() }.
Expand All @@ -6015,7 +6015,7 @@ public final Mono<T> last(T defaultValue) {
*
* @return a {@link Flux} limiting downstream's backpressure
* @see #publishOn(Scheduler, int)
* @see #limitRequest(long)
* @see #take(long)
*/
public final Flux<T> limitRate(int prefetchRate) {
return onAssembly(this.publishOn(Schedulers.immediate(), prefetchRate));
Expand All @@ -6033,7 +6033,7 @@ public final Flux<T> limitRate(int prefetchRate) {
* Typically used for scenarios where consumer(s) request a large amount of data
* (eg. {@code Long.MAX_VALUE}) but the data source behaves better or can be optimized
* with smaller requests (eg. database paging, etc...). All data is still processed,
* unlike with {@link #limitRequest(long)} which will cap the grand total request
* unlike with {@link #take(long)} which will cap the grand total request
* amount.
* <p>
* Similar to {@code flux.publishOn(Schedulers.immediate(), prefetchRate).subscribe() },
Expand All @@ -6057,7 +6057,7 @@ public final Flux<T> limitRate(int prefetchRate) {
* @return a {@link Flux} limiting downstream's backpressure and customizing the
* replenishment request amount
* @see #publishOn(Scheduler, int)
* @see #limitRequest(long)
* @see #take(long)
*/
public final Flux<T> limitRate(int highTide, int lowTide) {
return onAssembly(this.publishOn(Schedulers.immediate(), true, highTide, lowTide));
Expand Down Expand Up @@ -8764,27 +8764,28 @@ public final Flux<T> tag(String key, String value) {

/**
* Take only the first N values from this {@link Flux}, if available.
* If n is zero, the source is subscribed to but immediately cancelled, then the operator completes.
* If n is zero, the source isn't even subscribed to and the operator completes immediately upon subscription.
* <p>
* <img class="marble" src="doc-files/marbles/take.svg" alt="">
* <img class="marble" src="doc-files/marbles/takeLimitRequestTrue.svg" alt="">
* <p>
* <b>Warning:</b> The below behavior will change in 3.5.0 from that of
* {@link #take(long, boolean) take(n, false)} to that of {@link #take(long, boolean) take(n, true)}.
* See https://github.com/reactor/reactor-core/issues/2339
* This ensures that the total amount requested upstream is capped at {@code n}, although smaller
* requests can be made if the downstream makes requests &lt; n. In any case, this operator never lets
* the upstream produce more elements than the cap, and it can be used to more strictly adhere to backpressure.
* <p>
* Note that this operator doesn't propagate the backpressure requested amount.
* Rather, it makes an unbounded request and cancels once N elements have been emitted.
* As a result, the source could produce a lot of extraneous elements in the meantime.
* If that behavior is undesirable and you do not own the request from downstream
* (e.g. prefetching operators), consider using {@link #limitRequest(long)} instead.
*
* @param n the number of items to emit from this {@link Flux}
* This mode is typically useful for cases where a race between request and cancellation can lead
* the upstream to producing a lot of extraneous data, and such a production is undesirable (e.g.
* a source that would send the extraneous data over the network).
* It is equivalent to {@link #take(long, boolean)} with {@code limitRequest == true},
* If there is a requirement for unbounded upstream request (eg. for performance reasons),
* use {@link #take(long, boolean)} with {@code limitRequest=false} instead.
*
* @param n the maximum number of items to request from upstream and emit from this {@link Flux}
*
* @return a {@link Flux} limited to size N
* @see #take(long, boolean)
*/
public final Flux<T> take(long n) {
return take(n, false);
return take(n, true);
}

/**
Expand All @@ -8796,7 +8797,7 @@ public final Flux<T> take(long n) {
* at {@code n}. In that configuration, this operator never let the upstream produce more elements
* than the cap, and it can be used to more strictly adhere to backpressure.
* If n is zero, the source isn't even subscribed to and the operator completes immediately
* upon subscription.
* upon subscription (the behavior inherited from {@link #take(long)}).
* <p>
* This mode is typically useful for cases where a race between request and cancellation can lead
* the upstream to producing a lot of extraneous data, and such a production is undesirable (e.g.
Expand All @@ -8806,8 +8807,7 @@ public final Flux<T> take(long n) {
* <p>
* If {@code limitRequest == false} this operator doesn't propagate the backpressure requested amount.
* Rather, it makes an unbounded request and cancels once N elements have been emitted.
* If n is zero, the source is subscribed to but immediately cancelled, then the operator completes
* (the behavior inherited from {@link #take(long)}).
* If n is zero, the source is subscribed to but immediately cancelled, then the operator completes.
* <p>
* In this mode, the source could produce a lot of extraneous elements despite cancellation.
* If that behavior is undesirable and you do not own the request from downstream
Expand Down Expand Up @@ -8868,7 +8868,7 @@ public final Flux<T> take(Duration timespan, Scheduler timer) {
return takeUntilOther(Mono.delay(timespan, timer));
}
else {
return take(0);
return take(0, false);
}
}

Expand Down
8 changes: 5 additions & 3 deletions reactor-core/src/main/java/reactor/core/publisher/Mono.java
Original file line number Diff line number Diff line change
Expand Up @@ -3973,9 +3973,11 @@ public final Mono<T> repeatWhenEmpty(int maxRepeat, Function<Flux<Long>, ? exten
return repeatFactory.apply(o.index().map(Tuple2::getT1));
}
else {
return repeatFactory.apply(o.index().map(Tuple2::getT1)
.take(maxRepeat)
.concatWith(Flux.error(() -> new IllegalStateException("Exceeded maximum number of repeats"))));
return repeatFactory.apply(o
.index()
.map(Tuple2::getT1)
.take(maxRepeat, false)
.concatWith(Flux.error(() -> new IllegalStateException("Exceeded maximum number of repeats"))));
}
}).next());
}
Expand Down
Loading

0 comments on commit 96bb61f

Please sign in to comment.