Skip to content

Commit

Permalink
Merge reactor#1945 into 3.3
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed Nov 7, 2019
2 parents c17e276 + b220eb8 commit cb74c0c
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -765,16 +765,16 @@ void drain() {
if (ctx == null) {
ctx = actual.currentContext();
}
Throwable e_ = Operators.onNextPollError(v, e, ctx);
Throwable e_ = Operators.onNextError(v, e, ctx);
if (e_ == null) {
continue;
}
//now if error mode strategy doesn't apply, let delayError play
if (veryEnd && Exceptions.addThrowable(ERROR, this, e)) {
if (veryEnd && Exceptions.addThrowable(ERROR, this, e_)) {
continue;
}
else {
actual.onError(Operators.onOperatorError(s, e, v, ctx));
actual.onError(Operators.onOperatorError(s, e_, v, ctx));
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ static <T, R> boolean trySubscribeScalarMap(Publisher<? extends T> source,
catch (Throwable e) {
Context ctx = s.currentContext();
Throwable e_ = errorContinueExpected ?
Operators.onNextPollError(null, e, ctx) :
Operators.onNextError(null, e, ctx) :
Operators.onOperatorError(e, ctx);
if (e_ != null) {
Operators.error(s, e_);
Expand All @@ -154,7 +154,7 @@ static <T, R> boolean trySubscribeScalarMap(Publisher<? extends T> source,
catch (Throwable e) {
Context ctx = s.currentContext();
Throwable e_ = errorContinueExpected ?
Operators.onNextPollError(t, e, ctx) :
Operators.onNextError(t, e, ctx) :
Operators.onOperatorError(null, e, t, ctx);
if (e_ != null) {
Operators.error(s, e_);
Expand All @@ -175,7 +175,7 @@ static <T, R> boolean trySubscribeScalarMap(Publisher<? extends T> source,
catch (Throwable e) {
Context ctx = s.currentContext();
Throwable e_ = errorContinueExpected ?
Operators.onNextPollError(t, e, ctx) :
Operators.onNextError(t, e, ctx) :
Operators.onOperatorError(null, e, t, ctx);
if (e_ != null) {
Operators.error(s, e_);
Expand Down Expand Up @@ -399,13 +399,13 @@ public void onNext(T t) {
catch (Throwable e) {
Context ctx = actual.currentContext();
//does the strategy apply? if so, short-circuit the delayError. In any case, don't cancel
Throwable e_ = Operators.onNextPollError(t, e, ctx);
Throwable e_ = Operators.onNextError(t, e, ctx);
if (e_ == null) {
return;
}
//now if error mode strategy doesn't apply, let delayError play
if (!delayError || !Exceptions.addThrowable(ERROR, this, e)) {
onError(Operators.onOperatorError(s, e, t, ctx));
if (!delayError || !Exceptions.addThrowable(ERROR, this, e_)) {
onError(Operators.onOperatorError(s, e_, t, ctx));
}
Operators.onDiscard(t, ctx);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,9 @@ public R poll() {
handler.accept(v, this);
}
catch (Throwable error){
Throwable e_ = Operators.onNextPollError(v, error, actual.currentContext());
RuntimeException e_ = Operators.onNextPollError(v, error, actual.currentContext());
if (e_ != null) {
throw Exceptions.propagate(e_);
throw e_;
}
else {
reset();
Expand All @@ -277,10 +277,10 @@ public R poll() {
data = null;
if (stop) {
if (error != null) {
Throwable e_ = Operators.onNextPollError(v, error, actual.currentContext());
RuntimeException e_ = Operators.onNextPollError(v, error, actual.currentContext());
if (e_ != null) {
done = true; //set done because we throw or go through `actual` directly
throw Exceptions.propagate(e_);
throw e_;
}
//else continue
}
Expand Down Expand Up @@ -313,9 +313,9 @@ else if (dropped != 0L) {
handler.accept(v, this);
}
catch (Throwable error){
Throwable e_ = Operators.onNextPollError(v, error, actual.currentContext());
RuntimeException e_ = Operators.onNextPollError(v, error, actual.currentContext());
if (e_ != null) {
throw Exceptions.propagate(e_);
throw e_;
}
else {
reset();
Expand All @@ -326,10 +326,10 @@ else if (dropped != 0L) {
data = null;
if (stop) {
if (error != null) {
Throwable e_ = Operators.onNextPollError(v, error, actual.currentContext());
RuntimeException e_ = Operators.onNextPollError(v, error, actual.currentContext());
if (e_ != null) {
done = true; //set done because we throw or go through `actual` directly
throw Exceptions.propagate(e_);
throw e_;
}
else {
reset();
Expand Down Expand Up @@ -651,9 +651,9 @@ public R poll() {
handler.accept(v, this);
}
catch (Throwable error){
Throwable e_ = Operators.onNextPollError(v, error, actual.currentContext());
RuntimeException e_ = Operators.onNextPollError(v, error, actual.currentContext());
if (e_ != null) {
throw Exceptions.propagate(e_);
throw e_;
}
else {
reset();
Expand Down Expand Up @@ -703,9 +703,9 @@ else if (dropped != 0L) {
handler.accept(v, this);
}
catch (Throwable error){
Throwable e_ = Operators.onNextPollError(v, error, actual.currentContext());
RuntimeException e_ = Operators.onNextPollError(v, error, actual.currentContext());
if (e_ != null) {
throw Exceptions.propagate(e_);
throw e_;
}
else {
reset();
Expand All @@ -717,9 +717,9 @@ else if (dropped != 0L) {
if (stop) {
done = true; //set done because we throw or go through `actual` directly
if (error != null) {
Throwable e_ = Operators.onNextPollError(v, error, actual.currentContext());
RuntimeException e_ = Operators.onNextPollError(v, error, actual.currentContext());
if (e_ != null) {
throw Exceptions.propagate(e_);
throw e_;
}
else{
reset();
Expand Down
54 changes: 46 additions & 8 deletions reactor-core/src/main/java/reactor/core/publisher/Operators.java
Original file line number Diff line number Diff line change
Expand Up @@ -682,15 +682,18 @@ static final OnNextFailureStrategy onNextErrorStrategy(Context context) {
* return {@code false} to indicate value was not consumed and more must be
* tried.</li>
*
* <li>{@code poll}: use {@link #onNextPollError(Object, Throwable, Context)} instead.</li>
* <li>any of the above where the error is going to be propagated through onError but the
* subscription shouldn't be cancelled: use {@link #onNextError(Object, Throwable, Context)} instead.</li>
*
* <li>{@code poll} (where the error will be thrown): use {@link #onNextPollError(Object, Throwable, Context)} instead.</li>
* </ul>
*
* @param value The onNext value that caused an error.
* @param value The onNext value that caused an error. Can be null.
* @param error The error.
* @param context The most significant {@link Context} in which to look for an {@link OnNextFailureStrategy}.
* @param subscriptionForCancel The {@link Subscription} that should be cancelled if the
* strategy is terminal. Not null, use {@link #onNextPollError(Object, Throwable, Context)}
* when unsubscribing is undesirable (eg. for poll()).
* @param subscriptionForCancel The mandatory {@link Subscription} that should be cancelled if the
* strategy is terminal. See also {@link #onNextError(Object, Throwable, Context)} and
* {@link #onNextPollError(Object, Throwable, Context)} for alternatives that don't cancel a subscription
* @param <T> The type of the value causing the error.
* @return a {@link Throwable} to propagate through onError if the strategy is
* terminal and cancelled the subscription, null if not.
Expand All @@ -714,6 +717,35 @@ public static <T> Throwable onNextError(@Nullable T value, Throwable error, Cont
}
}

/**
* Find the {@link OnNextFailureStrategy} to apply to the calling async operator (which could be
* a local error mode defined in the {@link Context}) and apply it.
* <p>
* This variant never cancels a {@link Subscription}. It returns a {@link Throwable} if the error is
* fatal for the error mode, in which case the operator should call onError with the
* returned error. On the contrary, if the error mode allows the sequence to
* continue, this method returns {@code null}.
*
* @param value The onNext value that caused an error.
* @param error The error.
* @param context The most significant {@link Context} in which to look for an {@link OnNextFailureStrategy}.
* @param <T> The type of the value causing the error.
* @return a {@link Throwable} to propagate through onError if the strategy is terminal, null if not.
* @see #onNextError(Object, Throwable, Context, Subscription)
*/
@Nullable
public static <T> Throwable onNextError(@Nullable T value, Throwable error, Context context) {
error = unwrapOnNextError(error);
OnNextFailureStrategy strategy = onNextErrorStrategy(context);
if (strategy.test(error, value)) {
//some strategies could still return an exception, eg. if the consumer throws
return strategy.process(error, value, context);
}
else {
return onOperatorError(null, error, value, context);
}
}

/**
* Find the {@link OnNextFailureStrategy} to apply to the calling operator (which could be a local
* error mode defined in the {@link Context}) and apply it.
Expand Down Expand Up @@ -747,17 +779,23 @@ public static <T> Throwable onNextInnerError(Throwable error, Context context, S
* Find the {@link OnNextFailureStrategy} to apply to the calling async operator (which could be
* a local error mode defined in the {@link Context}) and apply it.
* <p>
* Returns a {@link RuntimeException} if errors are fatal for the error mode, in which
* Returns a {@link RuntimeException} if the error is fatal for the error mode, in which
* case the operator poll should throw the returned error. On the contrary if the
* error mode allows the sequence to continue, returns {@code null} in which case
* the operator should retry the {@link Queue#poll() poll()}.
* <p>
* Note that this method {@link Exceptions#propagate(Throwable) wraps} checked exceptions in order to
* return a {@link RuntimeException} that can be thrown from an arbitrary method. If you don't want to
* throw the returned exception and this wrapping behavior is undesirable, but you still don't want to
* cancel a subscription, you can use {@link #onNextError(Object, Throwable, Context)} instead.
*
* @param value The onNext value that caused an error.
* @param error The error.
* @param context The most significant {@link Context} in which to look for an {@link OnNextFailureStrategy}.
* @param <T> The type of the value causing the error.
* @return a {@link Throwable} to propagate through onError if the strategy is
* terminal and cancelled the subscription, null if not.
* @return a {@link RuntimeException} to be thrown (eg. within {@link Queue#poll()} if the error is terminal in
* the strategy, null if not.
* @see #onNextError(Object, Throwable, Context)
*/
@Nullable
public static <T> RuntimeException onNextPollError(@Nullable T value, Throwable error, Context context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1688,4 +1688,36 @@ public void errorModeContinueScalarSourceMappedCallableFails() {

assertThat(msg).contains("42 skipped, reason: boom");
}

@Test
public void noWrappingOfCheckedExceptions() {
Flux.just("single")
.flatMap(x -> Mono.error(new NoSuchMethodException()))
.as(StepVerifier::create)
.expectError(NoSuchMethodException.class)
.verify();

Flux.just("a", "b")
.flatMap(x -> Mono.error(new NoSuchMethodException()))
.as(StepVerifier::create)
.expectError(NoSuchMethodException.class)
.verify();
}

@Test
public void noWrappingOfCheckedExceptions_hide() {
Flux.just("single")
.hide()
.flatMap(x -> Mono.error(new NoSuchMethodException()))
.as(StepVerifier::create)
.expectError(NoSuchMethodException.class)
.verify();

Flux.just("a", "b")
.hide()
.flatMap(x -> Mono.error(new NoSuchMethodException()))
.as(StepVerifier::create)
.expectError(NoSuchMethodException.class)
.verify();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;
import reactor.test.subscriber.AssertSubscriber;

Expand Down Expand Up @@ -93,4 +94,23 @@ public void scanInner() {
test.cancel();
assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue();
}

@Test
public void noWrappingOfCheckedExceptions() {
Mono.just("single")
.flatMap(x -> Mono.error(new NoSuchMethodException()))
.as(StepVerifier::create)
.expectError(NoSuchMethodException.class)
.verify();
}

@Test
public void noWrappingOfCheckedExceptions_hide() {
Mono.just("single")
.hide()
.flatMap(x -> Mono.error(new NoSuchMethodException()))
.as(StepVerifier::create)
.expectError(NoSuchMethodException.class)
.verify();
}
}

0 comments on commit cb74c0c

Please sign in to comment.