Skip to content

Commit

Permalink
Prepare work on 3.2.0, integrate error mode (for MILESTONE 1) (reacto…
Browse files Browse the repository at this point in the history
…r#1021)

See reactor#629 and reactor#840

This commit adds an error strategy concept to some of the operators
that let `Flux` and `Mono` recover from an error mid-stream: the
`continue` strategy.

Such errors are dropped into a special hook, and the sequence can
continue with the next value from the upstream.

Operators that support this mode have a special tag in their javadoc.
These include `map`, `filter`, `flatMap`, `handle`, `doOnNext`.

The error mode is activated using `Context`, so it follows the same
propagation semantics. The API is on `Flux`:

  - `errorStrategyContinue(...)` will activate the continue strategy for
  that particular `Flux`, upstream of it.
  - `errorStrategyStop()` let you go back to the default RS behavior,
  eg. in case you don't want the continue strategy to apply within a
  `flatMap`.
  • Loading branch information
simonbasle committed Feb 16, 2018
1 parent 9da9e50 commit 3206bae
Show file tree
Hide file tree
Showing 26 changed files with 3,505 additions and 198 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ project('reactor-core') {
options.stylesheetFile = file("$rootDir/src/api/stylesheet.css")
options.links(rootProject.ext.javadocLinks )
options.tags = [ "apiNote:a:API Note:", "implSpec:a:Implementation Requirements:",
"implNote:a:Implementation Note:" ]
"implNote:a:Implementation Note:", "reactor.errorMode:m:Error Mode Support" ]

// In Java 9, javadoc generator will complain if it finds invalid class files in the classpath
// And Kotlin produces such classes, plus kotlin plugin puts them in the main sourceSet
Expand Down
182 changes: 182 additions & 0 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -3722,6 +3722,12 @@ public final Flux<T> doOnError(Predicate<? super Throwable> predicate,
* <p>
* @param onNext the callback to call on {@link Subscriber#onNext}
*
* @reactor.errorMode This operator supports {@link #errorStrategyContinue() resuming on errors}
* (including when fusion is enabled). Exceptions thrown by the consumer are passed to
* the {@link #errorStrategyContinue(BiConsumer)} error consumer (the value consumer
* is not invoked, as the source element will be part of the sequence). The onNext
* signal is then propagated as normal.
*
* @return an observed {@link Flux}
*/
public final Flux<T> doOnNext(Consumer<? super T> onNext) {
Expand Down Expand Up @@ -3872,6 +3878,140 @@ public final Mono<T> elementAt(int index, T defaultValue) {
return Mono.onAssembly(new MonoElementAt<>(this, index, defaultValue));
}

/**
* Let compatible operators upstream recover from an error by dropping the incriminating
* element from the sequence and continuing with subsequent elements.
* <p>
* Note that this error handling mode is not necessarily implemented by all operators
* (look for the {@code Error Mode Support} javadoc section to find operators that
* support it).
* The error and associated root-cause element are dropped to {@link Operators#onErrorDropped(Throwable, Context)}
* and {@link Operators#onNextDropped(Object, Context)} respectively.
*
* @return a {@link Flux} that attempts to continue processing on errors.
*/
public final Flux<T> errorStrategyContinue() {
return subscriberContext(Context.of(
OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY,
OnNextFailureStrategy.resumeDrop()));
}

/**
* Let compatible operators upstream recover from some errors by dropping the
* incriminating element from the sequence and continuing with subsequent elements.
* Only errors matching the specified {@code type} are recovered from.
* <p>
* Note that this error handling mode is not necessarily implemented by all operators
* (look for the {@code Error Mode Support} javadoc section to find operators that
* support it).
* The error and associated root-cause element are dropped to {@link Operators#onErrorDropped(Throwable, Context)}
* and {@link Operators#onNextDropped(Object, Context)} respectively.
*
* @param type the class of the exception type to handle.
* @return a {@link Flux} that attempts to continue processing on some errors.
*/
public final Flux<T> errorStrategyContinue(Class<? extends Throwable> type) {
return subscriberContext(Context.of(
OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY,
OnNextFailureStrategy.resumeDropIf(type::isInstance)
));
}

/**
* Let compatible operators upstream recover from some errors by dropping the
* incriminating element from the sequence and continuing with subsequent elements.
* Only errors matching the {@link Predicate} are recovered from.
* <p>
* Note that this error handling mode is not necessarily implemented by all operators
* (look for the {@code Error Mode Support} javadoc section to find operators that
* support it).
* The error and associated root-cause element are dropped to {@link Operators#onErrorDropped(Throwable, Context)}
* and {@link Operators#onNextDropped(Object, Context)} respectively.
*
* @return a {@link Flux} that attempts to continue processing on some errors.
*/
public final Flux<T> errorStrategyContinue(Predicate<Throwable> errorPredicate) {
return subscriberContext(Context.of(
OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY,
OnNextFailureStrategy.resumeDropIf(errorPredicate)
));
}

/**
* Let compatible operators upstream recover from errors by dropping the
* incriminating element from the sequence and continuing with subsequent elements.
* Only errors matching the {@link Predicate} are recovered from.
* The recovered error and associated value are notified via the provided {@link BiConsumer}.
* <p>
* Note that this error handling mode is not necessarily implemented by all operators
* (look for the {@code Error Mode Support} javadoc section to find operators that
* support it).
*
* @return a {@link Flux} that attempts to continue processing on errors.
*/
public final Flux<T> errorStrategyContinue(BiConsumer<Throwable, ? super T> errorConsumer) {
//this cast is ok as only T values will be propagated in this sequence
@SuppressWarnings("unchecked") BiConsumer<Throwable, Object> genericConsumer = (BiConsumer<Throwable, Object>) errorConsumer;
return subscriberContext(Context.of(
OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY,
OnNextFailureStrategy.resume(genericConsumer)
));
}

/**
* Let compatible operators upstream recover from errors by dropping the
* incriminating element from the sequence and continuing with subsequent elements.
* Only errors matching the specified {@code type} are recovered from.
* The recovered error and associated value are notified via the provided {@link BiConsumer}.
* <p>
* Note that this error handling mode is not necessarily implemented by all operators
* (look for the {@code Error Mode Support} javadoc section to find operators that
* support it).
*
* @return a {@link Flux} that attempts to continue processing on some errors.
*/
public final <E extends Throwable> Flux<T> errorStrategyContinue(Class<E> type,
BiConsumer<Throwable, ? super T> errorConsumer) {
return errorStrategyContinue(type::isInstance, errorConsumer);
}

/**
* Let compatible operators upstream recover from errors by dropping the
* incriminating element from the sequence and continuing with subsequent elements.
* Only errors matching the {@link Predicate} are recovered from.
* The recovered error and associated value are notified via the provided {@link BiConsumer}.
* <p>
* Note that this error handling mode is not necessarily implemented by all operators
* (look for the {@code Error Mode Support} javadoc section to find operators that
* support it).
*
* @return a {@link Flux} that attempts to continue processing on some errors.
*/
public final <E extends Throwable> Flux<T> errorStrategyContinue(Predicate<E> errorPredicate,
BiConsumer<Throwable, ? super T> errorConsumer) {
//this cast is ok as only T values will be propagated in this sequence
@SuppressWarnings("unchecked")
Predicate<Throwable> genericPredicate = (Predicate<Throwable>) errorPredicate;
@SuppressWarnings("unchecked")
BiConsumer<Throwable, Object> genericErrorConsumer = (BiConsumer<Throwable, Object>) errorConsumer;
return subscriberContext(Context.of(
OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY,
OnNextFailureStrategy.resumeIf(genericPredicate, genericErrorConsumer)
));
}

/**
* Reset on next failure strategy to the default 'STOP' mode. This can be used to perform simply scoping of the on
* next failure strategy or to override the the inherited strategy in a sub-stream for example in a flatMap.
*
* @return a {@link Flux} that completes exceptionally on errors.
*/
public final Flux<T> errorStrategyStop() {
return subscriberContext(Context.of(
OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY,
OnNextFailureStrategy.stop()));
}

/**
* Recursively expand elements into a graph and emit all the resulting element,
* in a depth-first traversal order.
Expand Down Expand Up @@ -4035,6 +4175,11 @@ public final Flux<T> expand(Function<? super T, ? extends Publisher<? extends T>
*
* @param p the {@link Predicate} to test values against
*
* @reactor.errorMode This operator supports {@link #errorStrategyContinue() resuming on errors}
* (including when fusion is enabled). Exceptions thrown by the predicate are
* considered as if the predicate returned false: they cause the source value to be
* dropped and a new element ({@code request(1)}) being requested from upstream.
*
* @return a new {@link Flux} containing only values that pass the predicate test
*/
public final Flux<T> filter(Predicate<? super T> p) {
Expand Down Expand Up @@ -4107,6 +4252,13 @@ public final Flux<T> filterWhen(Function<? super T, ? extends Publisher<Boolean>
* @param mapper the {@link Function} to transform input sequence into N sequences {@link Publisher}
* @param <R> the merged output sequence type
*
* @reactor.errorMode This operator supports {@link #errorStrategyContinue() resuming on errors}
* in the mapper {@link Function}. Exceptions thrown by the mapper then behave as if
* it had mapped the value to an empty publisher. If the mapper does map to a scalar
* publisher (an optimization in which the value can be resolved immediately without
* subscribing to the publisher, e.g. a {@link Mono#fromCallable(Callable)}) but said
* publisher throws, this can be resumed from in the same manner.
*
* @return a new {@link Flux}
*/
public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper) {
Expand Down Expand Up @@ -4139,6 +4291,13 @@ public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? exten
* @param concurrency the maximum number of in-flight inner sequences
* @param <V> the merged output sequence type
*
* @reactor.errorMode This operator supports {@link #errorStrategyContinue() resuming on errors}
* in the mapper {@link Function}. Exceptions thrown by the mapper then behave as if
* it had mapped the value to an empty publisher. If the mapper does map to a scalar
* publisher (an optimization in which the value can be resolved immediately without
* subscribing to the publisher, e.g. a {@link Mono#fromCallable(Callable)}) but said
* publisher throws, this can be resumed from in the same manner.
*
* @return a new {@link Flux}
*/
public final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int
Expand Down Expand Up @@ -4173,6 +4332,13 @@ public final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? exten
* @param prefetch the maximum in-flight elements from each inner {@link Publisher} sequence
* @param <V> the merged output sequence type
*
* @reactor.errorMode This operator supports {@link #errorStrategyContinue() resuming on errors}
* in the mapper {@link Function}. Exceptions thrown by the mapper then behave as if
* it had mapped the value to an empty publisher. If the mapper does map to a scalar
* publisher (an optimization in which the value can be resolved immediately without
* subscribing to the publisher, e.g. a {@link Mono#fromCallable(Callable)}) but said
* publisher throws, this can be resumed from in the same manner.
*
* @return a merged {@link Flux}
*/
public final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int
Expand Down Expand Up @@ -4208,6 +4374,13 @@ public final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? exten
* @param prefetch the maximum in-flight elements from each inner {@link Publisher} sequence
* @param <V> the merged output sequence type
*
* @reactor.errorMode This operator supports {@link #errorStrategyContinue() resuming on errors}
* in the mapper {@link Function}. Exceptions thrown by the mapper then behave as if
* it had mapped the value to an empty publisher. If the mapper does map to a scalar
* publisher (an optimization in which the value can be resolved immediately without
* subscribing to the publisher, e.g. a {@link Mono#fromCallable(Callable)}) but said
* publisher throws, this can be resumed from in the same manner.
*
* @return a merged {@link Flux}
*/
public final <V> Flux<V> flatMapDelayError(Function<? super T, ? extends Publisher<? extends V>> mapper,
Expand Down Expand Up @@ -4642,6 +4815,10 @@ public final <TRight, TLeftEnd, TRightEnd, R> Flux<R> groupJoin(
* @param handler the handling {@link BiConsumer}
* @param <R> the transformed type
*
* @reactor.errorMode This operator supports {@link #errorStrategyContinue() resuming on errors} (including when
* fusion is enabled) when the {@link BiConsumer} throws an exception or if an error is signaled explicitly via
* {@link SynchronousSink#error(Throwable)}.
*
* @return a transformed {@link Flux}
*/
public final <R> Flux<R> handle(BiConsumer<? super T, SynchronousSink<R>> handler) {
Expand Down Expand Up @@ -5062,6 +5239,11 @@ public final Flux<T> log(Logger logger,
* @param mapper the synchronous transforming {@link Function}
* @param <V> the transformed type
*
* @reactor.errorMode This operator supports {@link #errorStrategyContinue() resuming on errors}
* (including when fusion is enabled). Exceptions thrown by the mapper then cause the
* source value to be dropped and a new element ({@code request(1)}) being requested
* from upstream.
*
* @return a transformed {@link Flux}
*/
public final <V> Flux<V> map(Function<? super T, ? extends V> mapper) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,18 +287,25 @@ public void innerComplete() {

@Override
public void innerError(Throwable e) {
if (Exceptions.addThrowable(ERROR, this, e)) {
s.cancel();
e = Operators.onNextInnerError(e, currentContext(), s);
if(e != null) {
if (Exceptions.addThrowable(ERROR, this, e)) {
s.cancel();

if (GUARD.getAndIncrement(this) == 0) {
e = Exceptions.terminate(ERROR, this);
if (e != TERMINATED) {
actual.onError(e);
if (GUARD.getAndIncrement(this) == 0) {
e = Exceptions.terminate(ERROR, this);
if (e != TERMINATED) {
actual.onError(e);
}
}
}
else {
Operators.onErrorDropped(e, actual.currentContext());
}
}
else {
Operators.onErrorDropped(e, actual.currentContext());
active = false;
drain();
}
}

Expand Down Expand Up @@ -358,9 +365,15 @@ void drain() {
"The mapper returned a null Publisher");
}
catch (Throwable e) {
actual.onError(Operators.onOperatorError(s, e, v,
actual.currentContext()));
return;
Throwable e_ = Operators.onNextError(v, e, actual.currentContext(), s);
if (e_ != null) {
actual.onError(Operators.onOperatorError(s, e, v,
actual.currentContext()));
return;
}
else {
continue;
}
}

if (sourceMode != Fuseable.SYNC) {
Expand All @@ -384,9 +397,15 @@ void drain() {
vr = callable.call();
}
catch (Throwable e) {
actual.onError(Operators.onOperatorError(s, e, v,
actual.currentContext()));
return;
Throwable e_ = Operators.onNextError(v, e, actual.currentContext(), s);
if (e_ != null) {
actual.onError(Operators.onOperatorError(s, e, v,
actual.currentContext()));
return;
}
else {
continue;
}
}

if (vr == null) {
Expand Down Expand Up @@ -612,16 +631,22 @@ public void innerComplete() {

@Override
public void innerError(Throwable e) {
if (Exceptions.addThrowable(ERROR, this, e)) {
if (!veryEnd) {
s.cancel();
done = true;
e = Operators.onNextInnerError(e, currentContext(), s);
if(e != null) {
if (Exceptions.addThrowable(ERROR, this, e)) {
if (!veryEnd) {
s.cancel();
done = true;
}
active = false;
drain();
}
else {
Operators.onErrorDropped(e, actual.currentContext());
}
active = false;
drain();
}
else {
Operators.onErrorDropped(e, actual.currentContext());
active = false;
}
}

Expand Down Expand Up @@ -695,9 +720,15 @@ void drain() {
"The mapper returned a null Publisher");
}
catch (Throwable e) {
actual.onError(Operators.onOperatorError(s, e, v,
actual.currentContext()));
return;
Throwable e_ = Operators.onNextError(v, e, actual.currentContext(), s);
if (e_ != null) {
actual.onError(Operators.onOperatorError(s, e, v,
actual.currentContext()));
return;
}
else {
continue;
}
}

if (sourceMode != Fuseable.SYNC) {
Expand All @@ -721,12 +752,17 @@ void drain() {
vr = supplier.call();
}
catch (Throwable e) {
//does the strategy apply? if so, short-circuit the delayError. In any case, don't cancel
Throwable e_ = Operators.onNextPollError(v, e, actual.currentContext());
if (e_ == null) {
continue;
}
//now if error mode strategy doesn't apply, let delayError play
if (veryEnd && Exceptions.addThrowable(ERROR, this, e)) {
continue;
}
else {
actual.onError(Operators.onOperatorError(s, e, v,
actual.currentContext()));
actual.onError(Operators.onOperatorError(s, e, v, actual.currentContext()));
return;
}
}
Expand Down
Loading

0 comments on commit 3206bae

Please sign in to comment.