Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Take(n) now behaves as take(n,true)/limitRequest #2969

Merged
merged 8 commits into from
Mar 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions docs/asciidoc/apdx-operatorChoice.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,10 @@ I want to deal with:

* I want to keep only a subset of the sequence:
** by taking N elements:
*** at the beginning of the sequence: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#take-long-boolean-[Flux#take(long, true)]
*** at the beginning of the sequence: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#take-long-[Flux#take(long)]
**** ...requesting an unbounded amount from upstream: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#take-long-boolean-[Flux#take(long, false)]
**** ...based on a duration: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#take-java.time.Duration-[Flux#take(Duration)]
**** ...only the first element, as a https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html[Mono]: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#next--[Flux#next()]
**** ...using https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/Subscription.html#request(long)[request(N)] rather than cancellation: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#limitRequest-long-[Flux#limitRequest(long)]
*** at the end of the sequence: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#takeLast-int-[Flux#takeLast]
*** until a criteria is met (inclusive): https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#takeUntil-java.util.function.Predicate-[Flux#takeUntil] (predicate-based), https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#takeUntilOther-org.reactivestreams.Publisher-[Flux#takeUntilOther] (companion publisher-based)
*** while a criteria is met (exclusive): https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#takeWhile-java.util.function.Predicate-[Flux#takeWhile]
Expand Down
2 changes: 1 addition & 1 deletion docs/asciidoc/coreFeatures.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ Flux.just("foo", "bar")
<1> `doFinally` consumes a `SignalType` for the type of termination.
<2> Similarly to `finally` blocks, we always record the timing.
<3> Here we also increment statistics in case of cancellation only.
<4> `take(1)` cancels after one item is emitted.
<4> `take(1)` requests exactly 1 from upstream, and cancels after one item is emitted.
====

On the other hand, `using` handles the case where a `Flux` is derived from a
Expand Down
15 changes: 8 additions & 7 deletions docs/asciidoc/debugging.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ This prints out the following (through the logger's console appender):
====
----
10:45:20.200 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) <1>
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | request(unbounded) <2>
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | request(3) <2>
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(1) <3>
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(2)
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(3)
Expand All @@ -611,13 +611,14 @@ characters, the actual event gets printed. Here, we get an `onSubscribe` call, a
to the operator-specific implementation. Between square brackets, we get additional
information, including whether the operator can be automatically optimized through
synchronous or asynchronous fusion.
<2> On the second line, we can see that an unbounded request was propagated up from
downstream.
<2> On the second line, we can see that take limited the request to upstream to 3.
<3> Then the range sends three values in a row.
<4> On the last line, we see `cancel()`.
====

The last line, (4), is the most interesting. We can see the `take` in action there. It
operates by cutting the sequence short after it has seen enough elements emitted. In
short, `take()` causes the source to `cancel()` once it has emitted the user-requested
amount.
The second (2) and last lines (4) are the most interesting. We can see the `take` in action there.
It leverages backpressure in order to ask the source for exactly the expected amount of elements.
After having received enough elements, it tells the source no more items will be needed by calling `cancel()`.
Note that if downstream had itself used backpressure, eg. by requesting only 1 element,
the `take` operator would have honored that (it _caps_ the request when propagating it from downstream
to upstream).
44 changes: 22 additions & 22 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -6014,7 +6014,7 @@ public final Mono<T> last(T defaultValue) {
* Typically used for scenarios where consumer(s) request a large amount of data
* (eg. {@code Long.MAX_VALUE}) but the data source behaves better or can be optimized
* with smaller requests (eg. database paging, etc...). All data is still processed,
* unlike with {@link #limitRequest(long)} which will cap the grand total request
* unlike with {@link #take(long)} which will cap the grand total request
* amount.
* <p>
* Equivalent to {@code flux.publishOn(Schedulers.immediate(), prefetchRate).subscribe() }.
Expand All @@ -6028,7 +6028,7 @@ public final Mono<T> last(T defaultValue) {
*
* @return a {@link Flux} limiting downstream's backpressure
* @see #publishOn(Scheduler, int)
* @see #limitRequest(long)
* @see #take(long)
*/
public final Flux<T> limitRate(int prefetchRate) {
return onAssembly(this.publishOn(Schedulers.immediate(), prefetchRate));
Expand All @@ -6046,7 +6046,7 @@ public final Flux<T> limitRate(int prefetchRate) {
* Typically used for scenarios where consumer(s) request a large amount of data
* (eg. {@code Long.MAX_VALUE}) but the data source behaves better or can be optimized
* with smaller requests (eg. database paging, etc...). All data is still processed,
* unlike with {@link #limitRequest(long)} which will cap the grand total request
* unlike with {@link #take(long)} which will cap the grand total request
* amount.
* <p>
* Similar to {@code flux.publishOn(Schedulers.immediate(), prefetchRate).subscribe() },
Expand All @@ -6070,7 +6070,7 @@ public final Flux<T> limitRate(int prefetchRate) {
* @return a {@link Flux} limiting downstream's backpressure and customizing the
* replenishment request amount
* @see #publishOn(Scheduler, int)
* @see #limitRequest(long)
* @see #take(long)
*/
public final Flux<T> limitRate(int highTide, int lowTide) {
return onAssembly(this.publishOn(Schedulers.immediate(), true, highTide, lowTide));
Expand Down Expand Up @@ -8823,27 +8823,28 @@ public final Flux<T> tag(String key, String value) {

/**
* Take only the first N values from this {@link Flux}, if available.
* If n is zero, the source is subscribed to but immediately cancelled, then the operator completes.
* If n is zero, the source isn't even subscribed to and the operator completes immediately upon subscription.
* <p>
* <img class="marble" src="doc-files/marbles/take.svg" alt="">
* <img class="marble" src="doc-files/marbles/takeLimitRequestTrue.svg" alt="">
* <p>
* <b>Warning:</b> The below behavior will change in 3.5.0 from that of
* {@link #take(long, boolean) take(n, false)} to that of {@link #take(long, boolean) take(n, true)}.
* See https://github.com/reactor/reactor-core/issues/2339
* This ensures that the total amount requested upstream is capped at {@code n}, although smaller
* requests can be made if the downstream makes requests &lt; n. In any case, this operator never lets
* the upstream produce more elements than the cap, and it can be used to more strictly adhere to backpressure.
* <p>
* Note that this operator doesn't propagate the backpressure requested amount.
* Rather, it makes an unbounded request and cancels once N elements have been emitted.
* As a result, the source could produce a lot of extraneous elements in the meantime.
* If that behavior is undesirable and you do not own the request from downstream
* (e.g. prefetching operators), consider using {@link #limitRequest(long)} instead.
*
* @param n the number of items to emit from this {@link Flux}
* This mode is typically useful for cases where a race between request and cancellation can lead
* the upstream to producing a lot of extraneous data, and such a production is undesirable (e.g.
* a source that would send the extraneous data over the network).
* It is equivalent to {@link #take(long, boolean)} with {@code limitRequest == true},
* If there is a requirement for unbounded upstream request (eg. for performance reasons),
* use {@link #take(long, boolean)} with {@code limitRequest=false} instead.
*
* @param n the maximum number of items to request from upstream and emit from this {@link Flux}
*
* @return a {@link Flux} limited to size N
* @see #take(long, boolean)
*/
public final Flux<T> take(long n) {
return take(n, false);
return take(n, true);
}

/**
Expand All @@ -8855,7 +8856,7 @@ public final Flux<T> take(long n) {
* at {@code n}. In that configuration, this operator never let the upstream produce more elements
* than the cap, and it can be used to more strictly adhere to backpressure.
* If n is zero, the source isn't even subscribed to and the operator completes immediately
* upon subscription.
* upon subscription (the behavior inherited from {@link #take(long)}).
* <p>
* This mode is typically useful for cases where a race between request and cancellation can lead
* the upstream to producing a lot of extraneous data, and such a production is undesirable (e.g.
Expand All @@ -8865,8 +8866,7 @@ public final Flux<T> take(long n) {
* <p>
* If {@code limitRequest == false} this operator doesn't propagate the backpressure requested amount.
* Rather, it makes an unbounded request and cancels once N elements have been emitted.
* If n is zero, the source is subscribed to but immediately cancelled, then the operator completes
* (the behavior inherited from {@link #take(long)}).
* If n is zero, the source is subscribed to but immediately cancelled, then the operator completes.
* <p>
* In this mode, the source could produce a lot of extraneous elements despite cancellation.
* If that behavior is undesirable and you do not own the request from downstream
Expand Down Expand Up @@ -8927,7 +8927,7 @@ public final Flux<T> take(Duration timespan, Scheduler timer) {
return takeUntilOther(Mono.delay(timespan, timer));
}
else {
return take(0);
return take(0, false);
}
}

Expand Down
10 changes: 6 additions & 4 deletions reactor-core/src/main/java/reactor/core/publisher/Mono.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -4057,9 +4057,11 @@ public final Mono<T> repeatWhenEmpty(int maxRepeat, Function<Flux<Long>, ? exten
return repeatFactory.apply(o.index().map(Tuple2::getT1));
}
else {
return repeatFactory.apply(o.index().map(Tuple2::getT1)
.take(maxRepeat)
.concatWith(Flux.error(() -> new IllegalStateException("Exceeded maximum number of repeats"))));
return repeatFactory.apply(o
.index()
.map(Tuple2::getT1)
.take(maxRepeat, false)
.concatWith(Flux.error(() -> new IllegalStateException("Exceeded maximum number of repeats"))));
}
}).next());
}
Expand Down
Loading