Skip to content

Commit

Permalink
fix #1924 Discard elements of Collection in collect(), cover more cases
Browse files Browse the repository at this point in the history
 - Flux.collect(Supplier, BiConsumer) now detects if the
 container is of type Collection, in which case it discards each element
 in the collection when appropriate
 - the operator also now additionally discard the element T along the
 container when the collector function fails
 - Flux.collect(Collector) now also discards elements in a similar
 fashion to Flux.collect(Supplier, BiConsumer). The accumulator failure
 case and finisher Function failure case are taken into account. The
 discarding of individual elements inside a Collection is applied to
 the intermediate container, not the result of the finisher()

Behavior change: if you extend Operators.MonoSubscriber, keep the
following in mind: `discard(O v)` no longer nulls out the `this.value`
field. Calling methods now need to do that themselves instead.
  • Loading branch information
simonbasle authored Dec 4, 2019
1 parent f2b940c commit a9eb446
Show file tree
Hide file tree
Showing 7 changed files with 355 additions and 54 deletions.
39 changes: 29 additions & 10 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -3121,7 +3121,10 @@ public final Flux<T> checkpoint(@Nullable String description, boolean forceStack
* @param containerSupplier the supplier of the container instance for each Subscriber
* @param collector a consumer of both the container instance and the value being currently collected
*
* @reactor.discard This operator discards the buffer upon cancellation or error triggered by a data signal.
* @reactor.discard This operator discards the container upon cancellation or error triggered by a data signal.
* Either the container type is a {@link Collection} (in which case individual elements are discarded)
* or not (in which case the entire container is discarded). In case the collector {@link BiConsumer} fails
* to accumulate an element, the container is discarded as above and the triggering element is also discarded.
*
* @return a {@link Mono} of the collected container on complete
*
Expand All @@ -3142,6 +3145,13 @@ public final <E> Mono<E> collect(Supplier<E> containerSupplier, BiConsumer<E, ?
* @param <A> The mutable accumulation type
* @param <R> the container type
*
* @reactor.discard This operator discards the intermediate container (see {@link Collector#supplier()} upon
* cancellation, error or exception while applying the {@link Collector#finisher()}. Either the container type
* is a {@link Collection} (in which case individual elements are discarded) or not (in which case the entire
* container is discarded). In case the accumulator {@link BiConsumer} of the collector fails to accumulate
* an element into the intermediate container, the container is discarded as above and the triggering element
* is also discarded.
*
* @return a {@link Mono} of the collected container on complete
*
*/
Expand All @@ -3156,7 +3166,8 @@ public final <R, A> Mono<R> collect(Collector<? super T, A, ? extends R> collect
* <p>
* <img class="marble" src="doc-files/marbles/collectList.svg" alt="">
*
* @reactor.discard This operator discards the buffer upon cancellation or error triggered by a data signal.
* @reactor.discard This operator discards the elements in the {@link List} upon
* cancellation or error triggered by a data signal.
*
* @return a {@link Mono} of a {@link List} of all values from this {@link Flux}
*/
Expand Down Expand Up @@ -3204,7 +3215,8 @@ public final Mono<List<T>> collectList() {
* <p>
* <img class="marble" src="doc-files/marbles/collectMapWithKeyExtractor.svg" alt="">
*
* @reactor.discard This operator discards the buffer upon cancellation or error triggered by a data signal.
* @reactor.discard This operator discards the whole {@link Map} upon cancellation or error
* triggered by a data signal, so discard handlers will have to unpack the map.
*
* @param keyExtractor a {@link Function} to map elements to a key for the {@link Map}
* @param <K> the type of the key extracted from each source element
Expand All @@ -3227,7 +3239,8 @@ public final <K> Mono<Map<K, T>> collectMap(Function<? super T, ? extends K> key
* <p>
* <img class="marble" src="doc-files/marbles/collectMapWithKeyAndValueExtractors.svg" alt="">
*
* @reactor.discard This operator discards the buffer upon cancellation or error triggered by a data signal.
* @reactor.discard This operator discards the whole {@link Map} upon cancellation or error
* triggered by a data signal, so discard handlers will have to unpack the map.
*
* @param keyExtractor a {@link Function} to map elements to a key for the {@link Map}
* @param valueExtractor a {@link Function} to map elements to a value for the {@link Map}
Expand All @@ -3254,7 +3267,8 @@ public final <K, V> Mono<Map<K, V>> collectMap(Function<? super T, ? extends K>
* <p>
* <img class="marble" src="doc-files/marbles/collectMapWithKeyAndValueExtractors.svg" alt="">
*
* @reactor.discard This operator discards the buffer upon cancellation or error triggered by a data signal.
* @reactor.discard This operator discards the whole {@link Map} upon cancellation or error
* triggered by a data signal, so discard handlers will have to unpack the map.
*
* @param keyExtractor a {@link Function} to map elements to a key for the {@link Map}
* @param valueExtractor a {@link Function} to map elements to a value for the {@link Map}
Expand Down Expand Up @@ -3286,7 +3300,8 @@ public final <K, V> Mono<Map<K, V>> collectMap(
* <p>
* <img class="marble" src="doc-files/marbles/collectMultiMapWithKeyExtractor.svg" alt="">
*
* @reactor.discard This operator discards the buffer upon cancellation or error triggered by a data signal.
* @reactor.discard This operator discards the whole {@link Map} upon cancellation or error
* triggered by a data signal, so discard handlers will have to unpack the list values in the map.
*
* @param keyExtractor a {@link Function} to map elements to a key for the {@link Map}
*
Expand All @@ -3308,7 +3323,8 @@ public final <K> Mono<Map<K, Collection<T>>> collectMultimap(Function<? super T,
* <p>
* <img class="marble" src="doc-files/marbles/collectMultiMapWithKeyAndValueExtractors.svg" alt="">
*
* @reactor.discard This operator discards the buffer upon cancellation or error triggered by a data signal.
* @reactor.discard This operator discards the whole {@link Map} upon cancellation or error
* triggered by a data signal, so discard handlers will have to unpack the list values in the map.
*
* @param keyExtractor a {@link Function} to map elements to a key for the {@link Map}
* @param valueExtractor a {@link Function} to map elements to a value for the {@link Map}
Expand All @@ -3334,7 +3350,8 @@ public final <K, V> Mono<Map<K, Collection<V>>> collectMultimap(Function<? super
* <p>
* <img class="marble" src="doc-files/marbles/collectMultiMapWithKeyAndValueExtractors.svg" alt="">
*
* @reactor.discard This operator discards the buffer upon cancellation or error triggered by a data signal.
* @reactor.discard This operator discards the whole {@link Map} upon cancellation or error
* triggered by a data signal, so discard handlers will have to unpack the list values in the map.
*
* @param keyExtractor a {@link Function} to map elements to a key for the {@link Map}
* @param valueExtractor a {@link Function} to map elements to a value for the {@link Map}
Expand Down Expand Up @@ -3369,7 +3386,8 @@ public final <K, V> Mono<Map<K, Collection<V>>> collectMultimap(
* <p>
* <img class="marble" src="doc-files/marbles/collectSortedList.svg" alt="">
*
* @reactor.discard This operator discards the buffer upon cancellation or error triggered by a data signal.
* @reactor.discard This operator is based on {@link #collectList()}, and as such discards the
* elements in the {@link List} individually upon cancellation or error triggered by a data signal.
*
* @return a {@link Mono} of a sorted {@link List} of all values from this {@link Flux}, in natural order
*/
Expand All @@ -3385,7 +3403,8 @@ public final Mono<List<T>> collectSortedList() {
* <p>
* <img class="marble" src="doc-files/marbles/collectSortedListWithComparator.svg" alt="">
*
* @reactor.discard This operator discards the buffer upon cancellation or error triggered by a data signal.
* @reactor.discard This operator is based on {@link #collectList()}, and as such discards the
* elements in the {@link List} individually upon cancellation or error triggered by a data signal.
*
* @param comparator a {@link Comparator} to sort the items of this sequences
*
Expand Down
19 changes: 17 additions & 2 deletions reactor-core/src/main/java/reactor/core/publisher/MonoCollect.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package reactor.core.publisher;

import java.util.Collection;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
Expand All @@ -24,6 +25,7 @@
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

/**
* Collects the values of the source sequence into a container returned by
Expand Down Expand Up @@ -97,6 +99,17 @@ public void cancel() {
s.cancel();
}

@Override
protected void discard(R v) {
if (v instanceof Collection) {
Collection<?> c = (Collection<?>) v;
Operators.onDiscardMultiple(c, actual.currentContext());
}
else {
super.discard(v);
}
}

@Override
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
Expand All @@ -119,7 +132,9 @@ public void onNext(T t) {
action.accept(value, t);
}
catch (Throwable e) {
onError(Operators.onOperatorError(this, e, t, actual.currentContext()));
Context ctx = actual.currentContext();
Operators.onDiscard(t, ctx);
onError(Operators.onOperatorError(this, e, t, ctx));
}
}

Expand All @@ -131,7 +146,7 @@ public void onError(Throwable t) {
}
done = true;
R v = value;
Operators.onDiscard(v, currentContext());
discard(v);
value = null;
actual.onError(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public void cancel() {
}
if (l != null) {
s.cancel();
Operators.onDiscardMultiple(l, actual.currentContext());
discard(l);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package reactor.core.publisher;

import java.util.Collection;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Function;
Expand All @@ -25,6 +26,7 @@
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

/**
* Collects the values from the source sequence into a {@link java.util.stream.Collector}
Expand Down Expand Up @@ -79,7 +81,7 @@ static final class StreamCollectorSubscriber<T, A, R>

final Function<? super A, ? extends R> finisher;

A container;
A container; //not final to be able to null it out on termination

Subscription s;

Expand All @@ -104,6 +106,17 @@ public Object scanUnsafe(Attr key) {
return super.scanUnsafe(key);
}

protected void discardIntermediateContainer(A a) {
Context ctx = actual.currentContext();
if (a instanceof Collection) {
Operators.onDiscardMultiple((Collection<?>) a, ctx);
}
else {
Operators.onDiscard(a, ctx);
}
}
//NB: value and thus discard are not used

@Override
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
Expand All @@ -125,7 +138,9 @@ public void onNext(T t) {
accumulator.accept(container, t);
}
catch (Throwable ex) {
onError(Operators.onOperatorError(s, ex, t, actual.currentContext()));
Context ctx = actual.currentContext();
Operators.onDiscard(t, ctx);
onError(Operators.onOperatorError(s, ex, t, ctx)); //discards intermediate container
}
}

Expand All @@ -136,6 +151,7 @@ public void onError(Throwable t) {
return;
}
done = true;
discardIntermediateContainer(container);
container = null;
actual.onError(t);
}
Expand All @@ -156,6 +172,7 @@ public void onComplete() {
r = finisher.apply(a);
}
catch (Throwable ex) {
discardIntermediateContainer(a);
actual.onError(Operators.onOperatorError(ex, actual.currentContext()));
return;
}
Expand All @@ -167,6 +184,8 @@ public void onComplete() {
public void cancel() {
super.cancel();
s.cancel();
discardIntermediateContainer(container);
container = null;
}
}
}
12 changes: 10 additions & 2 deletions reactor-core/src/main/java/reactor/core/publisher/Operators.java
Original file line number Diff line number Diff line change
Expand Up @@ -1496,8 +1496,9 @@ public MonoSubscriber(CoreSubscriber<? super O> actual) {

@Override
public void cancel() {
O v = value;
if (STATE.getAndSet(this, CANCELLED) <= HAS_REQUEST_NO_VALUE) {
Operators.onDiscard(value, currentContext());
discard(v);
}
value = null;
}
Expand Down Expand Up @@ -1543,6 +1544,7 @@ public final void complete(O v) {

// if state is >= HAS_CANCELLED or bit zero is set (*_HAS_VALUE) case, return
if ((state & ~HAS_REQUEST_NO_VALUE) != 0) {
this.value = null;
discard(v);
return;
}
Expand All @@ -1561,8 +1563,14 @@ public final void complete(O v) {
}
}

/**
* Discard the given value, generally this.value field. Lets derived subscriber with further knowledge about
* the possible types of the value discard such values in a specific way. Note that fields should generally be
* nulled out along the discard call.
*
* @param v the value to discard
*/
protected void discard(O v) {
this.value = null;
Operators.onDiscard(v, actual.currentContext());
}

Expand Down
Loading

0 comments on commit a9eb446

Please sign in to comment.