Skip to content

Commit

Permalink
fix #1752 Make doOnTerminate for Mono same as doOnSuccess
Browse files Browse the repository at this point in the history
This means that the handler is invoked during processing of the `onNext`
signal.

This commit also slightly rework MonoPeekTerminal to make the onSuccess
and onError handlers canonical, instead of the (successOrError) one.

Finally, it makes `doOnError` directly use MonoPeekTerminal instead of
MonoPeek, and modifies the marble diagrams (adding a convention for the
null value / empty set).

Relates-to: #1853
  • Loading branch information
simonbasle committed Aug 27, 2019
1 parent fa86abe commit 8d29500
Show file tree
Hide file tree
Showing 10 changed files with 2,458 additions and 9,097 deletions.
10,929 changes: 2,002 additions & 8,927 deletions docs/svg/conventions.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
65 changes: 34 additions & 31 deletions reactor-core/src/main/java/reactor/core/publisher/Mono.java
Original file line number Diff line number Diff line change
Expand Up @@ -2018,7 +2018,7 @@ public final <X> Mono<X> dematerialize() {
* @return a new {@link Mono}
*/
public final Mono<T> doAfterSuccessOrError(BiConsumer<? super T, Throwable> afterSuccessOrError) {
return onAssembly(new MonoPeekTerminal<>(this, null, null, afterSuccessOrError));
return doOnTerminalSignal(this, null, null, afterSuccessOrError);
}

/**
Expand Down Expand Up @@ -2116,7 +2116,7 @@ public final Mono<T> doFinally(Consumer<SignalType> onFinally) {
*/
public final Mono<T> doOnCancel(Runnable onCancel) {
Objects.requireNonNull(onCancel, "onCancel");
return doOnSignal(this, null, null, null, null, null, onCancel);
return doOnSignal(this, null, null, null, onCancel);
}

/**
Expand Down Expand Up @@ -2158,7 +2158,7 @@ public final <R> Mono<T> doOnDiscard(final Class<R> type, final Consumer<? super
*/
public final Mono<T> doOnNext(Consumer<? super T> onNext) {
Objects.requireNonNull(onNext, "onNext");
return doOnSignal(this, null, onNext, null, null, null, null);
return doOnSignal(this, null, onNext, null, null);
}

/**
Expand All @@ -2180,7 +2180,7 @@ public final Mono<T> doOnNext(Consumer<? super T> onNext) {
*/
public final Mono<T> doOnSuccess(Consumer<? super T> onSuccess) {
Objects.requireNonNull(onSuccess, "onSuccess");
return onAssembly(new MonoPeekTerminal<>(this, onSuccess, null, null));
return doOnTerminalSignal(this, onSuccess, null, null);
}

/**
Expand Down Expand Up @@ -2222,7 +2222,7 @@ public final Mono<T> doOnEach(Consumer<? super Signal<T>> signalConsumer) {
*/
public final Mono<T> doOnError(Consumer<? super Throwable> onError) {
Objects.requireNonNull(onError, "onError");
return doOnSignal(this, null, null, onError, null, null, null);
return doOnTerminalSignal(this, null, onError, null);
}


Expand All @@ -2241,9 +2241,12 @@ public final Mono<T> doOnError(Consumer<? super Throwable> onError) {
public final <E extends Throwable> Mono<T> doOnError(Class<E> exceptionType,
final Consumer<? super E> onError) {
Objects.requireNonNull(exceptionType, "type");
@SuppressWarnings("unchecked")
Consumer<Throwable> handler = (Consumer<Throwable>)onError;
return doOnError(exceptionType::isInstance, handler);
Objects.requireNonNull(onError, "onError");
return doOnTerminalSignal(this, null,
error -> {
if (exceptionType.isInstance(error)) onError.accept(exceptionType.cast(error));
},
null);
}

/**
Expand All @@ -2260,11 +2263,12 @@ public final <E extends Throwable> Mono<T> doOnError(Class<E> exceptionType,
public final Mono<T> doOnError(Predicate<? super Throwable> predicate,
final Consumer<? super Throwable> onError) {
Objects.requireNonNull(predicate, "predicate");
return doOnError(t -> {
if (predicate.test(t)) {
onError.accept(t);
}
});
Objects.requireNonNull(onError, "onError");
return doOnTerminalSignal(this, null,
error -> {
if (predicate.test(error)) onError.accept(error);
},
null);
}
/**
* Add behavior triggering a {@link LongConsumer} when the {@link Mono} receives any request.
Expand All @@ -2281,7 +2285,7 @@ public final Mono<T> doOnError(Predicate<? super Throwable> predicate,
*/
public final Mono<T> doOnRequest(final LongConsumer consumer) {
Objects.requireNonNull(consumer, "consumer");
return doOnSignal(this, null, null, null, null, consumer, null);
return doOnSignal(this, null, null, consumer, null);
}

/**
Expand All @@ -2302,7 +2306,7 @@ public final Mono<T> doOnRequest(final LongConsumer consumer) {
*/
public final Mono<T> doOnSubscribe(Consumer<? super Subscription> onSubscribe) {
Objects.requireNonNull(onSubscribe, "onSubscribe");
return doOnSignal(this, onSubscribe, null, null, null, null, null);
return doOnSignal(this, onSubscribe, null, null, null);
}

/**
Expand All @@ -2323,11 +2327,14 @@ public final Mono<T> doOnSubscribe(Consumer<? super Subscription> onSubscribe) {
*/
public final Mono<T> doOnSuccessOrError(BiConsumer<? super T, Throwable> onSuccessOrError) {
Objects.requireNonNull(onSuccessOrError, "onSuccessOrError");
return onAssembly(new MonoPeekTerminal<>(this, null, onSuccessOrError, null));
return doOnTerminalSignal(this, v -> onSuccessOrError.accept(v, null), e -> onSuccessOrError.accept(null, e), null);
}

/**
* Add behavior triggered when the {@link Mono} terminates, either by completing successfully or with an error.
* Add behavior triggered when the {@link Mono} terminates, either by completing with a value,
* completing empty or completing with an error. Unlike in {@link Flux#doOnTerminate(Runnable)},
* the simple fact that a {@link Mono} emits {@link Subscriber#onNext(Object) onNext} implies
* completion, so the handler is invoked BEFORE the element is propagated (same as with {@link #doOnSuccess(Consumer)}).
*
* <p>
* <img class="marble" src="doc-files/marbles/doOnTerminateForMono.svg" alt="">
Expand All @@ -2338,13 +2345,7 @@ public final Mono<T> doOnSuccessOrError(BiConsumer<? super T, Throwable> onSucce
*/
public final Mono<T> doOnTerminate(Runnable onTerminate) {
Objects.requireNonNull(onTerminate, "onTerminate");
return doOnSignal(this,
null,
null,
e -> onTerminate.run(),
onTerminate,
null,
null);
return doOnTerminalSignal(this, ignoreValue -> onTerminate.run(), ignoreError -> onTerminate.run(), null);
}

/**
Expand Down Expand Up @@ -4614,30 +4615,32 @@ static <T> Mono<Void> empty(Publisher<T> source) {
return then;
}

@SuppressWarnings("unchecked")
static <T> Mono<T> doOnSignal(Mono<T> source,
@Nullable Consumer<? super Subscription> onSubscribe,
@Nullable Consumer<? super T> onNext,
@Nullable Consumer<? super Throwable> onError,
@Nullable Runnable onComplete,
@Nullable LongConsumer onRequest,
@Nullable Runnable onCancel) {
if (source instanceof Fuseable) {
return onAssembly(new MonoPeekFuseable<>(source,
onSubscribe,
onNext,
onError,
onComplete, onRequest,
onRequest,
onCancel));
}
return onAssembly(new MonoPeek<>(source,
onSubscribe,
onNext,
onError,
onComplete, onRequest,
onRequest,
onCancel));
}

static <T> Mono<T> doOnTerminalSignal(Mono<T> source,
@Nullable Consumer<? super T> onSuccess,
@Nullable Consumer<? super Throwable> onError,
@Nullable BiConsumer<? super T, Throwable> onAfterTerminate) {
return onAssembly(new MonoPeekTerminal<>(source, onSuccess, onError, onAfterTerminate));
}

@SuppressWarnings("unchecked")
static <T> BiPredicate<? super T, ? super T> equalsBiPredicate(){
return EQUALS_BIPREDICATE;
Expand Down
12 changes: 2 additions & 10 deletions reactor-core/src/main/java/reactor/core/publisher/MonoPeek.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,18 @@ final class MonoPeek<T> extends InternalMonoOperator<T, T> implements SignalPeek

final Consumer<? super T> onNextCall;

final Consumer<? super Throwable> onErrorCall;

final Runnable onCompleteCall;

final LongConsumer onRequestCall;

final Runnable onCancelCall;

MonoPeek(Mono<? extends T> source,
@Nullable Consumer<? super Subscription> onSubscribeCall,
@Nullable Consumer<? super T> onNextCall,
@Nullable Consumer<? super Throwable> onErrorCall,
@Nullable Runnable onCompleteCall,
@Nullable LongConsumer onRequestCall,
@Nullable Runnable onCancelCall) {
super(source);
this.onSubscribeCall = onSubscribeCall;
this.onNextCall = onNextCall;
this.onErrorCall = onErrorCall;
this.onCompleteCall = onCompleteCall;
this.onRequestCall = onRequestCall;
this.onCancelCall = onCancelCall;
}
Expand Down Expand Up @@ -85,13 +77,13 @@ public Consumer<? super T> onNextCall() {
@Override
@Nullable
public Consumer<? super Throwable> onErrorCall() {
return onErrorCall;
return null;
}

@Override
@Nullable
public Runnable onCompleteCall() {
return onCompleteCall;
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,19 @@ final class MonoPeekFuseable<T> extends InternalMonoOperator<T, T>

final Consumer<? super T> onNextCall;

final Consumer<? super Throwable> onErrorCall;

final Runnable onCompleteCall;

final LongConsumer onRequestCall;

final Runnable onCancelCall;

MonoPeekFuseable(Mono<? extends T> source,
@Nullable Consumer<? super Subscription> onSubscribeCall,
@Nullable Consumer<? super T> onNextCall,
@Nullable Consumer<? super Throwable> onErrorCall,
@Nullable Runnable onCompleteCall,
@Nullable LongConsumer onRequestCall,
@Nullable Runnable onCancelCall) {
super(source);

this.onSubscribeCall = onSubscribeCall;
this.onNextCall = onNextCall;
this.onErrorCall = onErrorCall;
this.onCompleteCall = onCompleteCall;
this.onRequestCall = onRequestCall;
this.onCancelCall = onCancelCall;
}
Expand Down Expand Up @@ -88,13 +80,13 @@ public Consumer<? super T> onNextCall() {
@Override
@Nullable
public Consumer<? super Throwable> onErrorCall() {
return onErrorCall;
return null;
}

@Override
@Nullable
public Runnable onCompleteCall() {
return onCompleteCall;
return null;
}

@Override
Expand Down
Loading

0 comments on commit 8d29500

Please sign in to comment.