Skip to content

Commit

Permalink
fix #1853 Prevent Mono#using to trigger onNext+onError if cleanup fails
Browse files Browse the repository at this point in the history
This commit slightly changes the behavior of Mono#using so that, in
eager mode, the cleanup is applied inside `onNext`. Thus it is applied
before the value is passed to downstream (which at this point is no
longer in the protected scope of the `sourceSupplier`).

This is done to prevent the case where the value would be passed down
yet the cleanup throws, resulting in an invalid sequence of onNext then
onError signals.
  • Loading branch information
simonbasle committed Aug 27, 2019
1 parent df34ad9 commit 2d55d7d
Show file tree
Hide file tree
Showing 3 changed files with 409 additions and 19 deletions.
16 changes: 11 additions & 5 deletions reactor-core/src/main/java/reactor/core/publisher/Mono.java
Original file line number Diff line number Diff line change
Expand Up @@ -679,15 +679,20 @@ public static Mono<Context> subscriberContext() {
* Mono derived from the same resource and makes sure the resource is released if the
* sequence terminates or the Subscriber cancels.
* <p>
* <ul> <li>Eager resource cleanup happens just before the source termination and exceptions raised by the cleanup
* Consumer may override the terminal event.</li> <li>Non-eager cleanup will drop any exception.</li> </ul>
* <ul>
* <li>For eager cleanup, unlike in {@link Flux#using(Callable, Function, Consumer, boolean) Flux},
* in the case of a valued {@link Mono} the cleanup happens just before passing the value to downstream.
* In all cases, exceptions raised by the eager cleanup {@link Consumer} may override the terminal event,
* discarding the element if the derived {@link Mono} was valued.</li>
* <li>Non-eager cleanup will drop any exception.</li>
* </ul>
* <p>
* <img class="marble" src="doc-files/marbles/usingForMono.svg" alt="">
*
* @param resourceSupplier a {@link Callable} that is called on subscribe to create the resource
* @param sourceSupplier a {@link Mono} factory to create the Mono depending on the created resource
* @param resourceCleanup invoked on completion to clean-up the resource
* @param eager set to true to clean before terminating downstream subscribers
* @param eager set to true to clean before any signal (including onNext) is passed downstream
* @param <T> emitted type
* @param <D> resource type
*
Expand All @@ -706,8 +711,9 @@ public static <T, D> Mono<T> using(Callable<? extends D> resourceSupplier,
* Mono derived from the same resource and makes sure the resource is released if the
* sequence terminates or the Subscriber cancels.
* <p>
* Eager resource cleanup happens just before the source termination and exceptions raised by the cleanup Consumer
* may override the terminal event.
* Unlike in {@link Flux#using(Callable, Function, Consumer) Flux}, in the case of a valued {@link Mono} the cleanup
* happens just before passing the value to downstream. In all cases, exceptions raised by the cleanup
* {@link Consumer} may override the terminal event, discarding the element if the derived {@link Mono} was valued.
* <p>
* <img class="marble" src="doc-files/marbles/usingForMono.svg" alt="">
*
Expand Down
262 changes: 250 additions & 12 deletions reactor-core/src/main/java/reactor/core/publisher/MonoUsing.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@

import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Consumer;
import java.util.function.Function;

import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

/**
* Uses a resource, generated by a supplier for each individual Subscriber,
Expand Down Expand Up @@ -63,7 +68,6 @@ final class MonoUsing<T, S> extends Mono<T> implements Fuseable, SourceProducer<
}

@Override
@SuppressWarnings("unchecked")
public void subscribe(CoreSubscriber<? super T> actual) {
S resource;

Expand Down Expand Up @@ -95,28 +99,262 @@ public void subscribe(CoreSubscriber<? super T> actual) {
}

if (p instanceof Fuseable) {
p.subscribe(new FluxUsing.UsingFuseableSubscriber<>(actual,
p.subscribe(new MonoUsingSubscriber<>(actual,
resourceCleanup,
resource,
eager));
}
else if (actual instanceof ConditionalSubscriber) {
p.subscribe(new FluxUsing.UsingConditionalSubscriber<>((ConditionalSubscriber<? super
T>) actual,
resourceCleanup,
resource,
eager));
eager,
true));
}
else {
p.subscribe(new FluxUsing.UsingSubscriber<>(actual,
p.subscribe(new MonoUsingSubscriber<>(actual,
resourceCleanup,
resource,
eager));
eager,
false));
}
}

@Override
public Object scanUnsafe(Attr key) {
return null; //no particular key to be represented, still useful in hooks
}

static final class MonoUsingSubscriber<T, S>
implements InnerOperator<T, T>, QueueSubscription<T> {

final CoreSubscriber<? super T> actual;

final Consumer<? super S> resourceCleanup;

final S resource;

final boolean eager;
final boolean allowFusion;

Subscription s;
@Nullable
QueueSubscription<T> qs;

volatile int wip;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<MonoUsingSubscriber> WIP =
AtomicIntegerFieldUpdater.newUpdater(MonoUsingSubscriber.class, "wip");

int mode;
boolean valued;

MonoUsingSubscriber(CoreSubscriber<? super T> actual,
Consumer<? super S> resourceCleanup,
S resource,
boolean eager,
boolean allowFusion) {
this.actual = actual;
this.resourceCleanup = resourceCleanup;
this.resource = resource;
this.eager = eager;
this.allowFusion = allowFusion;
}

@Override
@Nullable
public Object scanUnsafe(Attr key) {
if (key == Attr.TERMINATED || key == Attr.CANCELLED)
return wip == 1;
if (key == Attr.PARENT) return s;

return InnerOperator.super.scanUnsafe(key);
}

@Override
public CoreSubscriber<? super T> actual() {
return actual;
}

@Override
public void request(long n) {
s.request(n);
}

@Override
public void cancel() {
if (WIP.compareAndSet(this, 0, 1)) {
s.cancel();

cleanup();
}
}

void cleanup() {
try {
resourceCleanup.accept(resource);
}
catch (Throwable e) {
Operators.onErrorDropped(e, actual.currentContext());
}
}

@SuppressWarnings("unchecked")
@Override
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueSubscription) {
this.qs = (QueueSubscription<T>) s;
}

actual.onSubscribe(this);
}
}

@Override
public void onNext(T t) {
if (mode == ASYNC) {
actual.onNext(null);
return;
}
this.valued = true;

if (eager && WIP.compareAndSet(this, 0, 1)) {
try {
resourceCleanup.accept(resource);
}
catch (Throwable e) {
Context ctx = actual.currentContext();
actual.onError(Operators.onOperatorError(e, ctx));
Operators.onDiscard(t, ctx);
return;
}
}

actual.onNext(t);
actual.onComplete();

if (!eager && WIP.compareAndSet(this, 0, 1)) {
try {
resourceCleanup.accept(resource);
}
catch (Throwable e) {
Operators.onErrorDropped(e, actual.currentContext());
}
}
}

@Override
public void onError(Throwable t) {
if (valued && mode != ASYNC) {
Operators.onErrorDropped(t, actual.currentContext());
return;
}
if (eager && WIP.compareAndSet(this, 0, 1)) {
try {
resourceCleanup.accept(resource);
}
catch (Throwable e) {
Throwable _e = Operators.onOperatorError(e, actual.currentContext());
t = Exceptions.addSuppressed(_e, t);
}
}

actual.onError(t);

if (!eager && WIP.compareAndSet(this, 0, 1)) {
cleanup();
}
}

@Override
public void onComplete() {
if (valued && mode != ASYNC) {
return;
}
//this should only happen in the empty case
if (eager && WIP.compareAndSet(this, 0, 1)) {
try {
resourceCleanup.accept(resource);
}
catch (Throwable e) {
actual.onError(Operators.onOperatorError(e, actual.currentContext()));
return;
}
}

actual.onComplete();

if (!eager && WIP.compareAndSet(this, 0, 1)) {
try {
resourceCleanup.accept(resource);
}
catch (Throwable e) {
Operators.onErrorDropped(e, actual.currentContext());
}
}
}

@Override
public void clear() {
if (qs != null) {
qs.clear();
}
}

@Override
public boolean isEmpty() {
return qs == null || qs.isEmpty();
}

@Override
@Nullable
public T poll() {
if (mode == NONE || qs == null) {
return null;
}

T v = qs.poll();

if (v != null) {
valued = true;
if (eager && WIP.compareAndSet(this, 0, 1)) {
try {
resourceCleanup.accept(resource); //throws upwards
}
catch (Throwable t) {
Operators.onDiscard(v, actual.currentContext());
throw t;
}
}
}
else if (mode == SYNC) {
if (!eager && WIP.compareAndSet(this, 0, 1)) {
try {
resourceCleanup.accept(resource);
}
catch (Throwable t) {
if (!valued) throw t;
else Operators.onErrorDropped(t, actual.currentContext());
//returns null, ie onComplete, in the second case
}
}
}
return v;
}

@Override
public int requestFusion(int requestedMode) {
if (qs == null) {
mode = NONE;
return NONE;
}
int m = qs.requestFusion(requestedMode);
mode = m;
return m;
}

@Override
public int size() {
if (qs == null) {
return 0;
}
return qs.size();
}
}
}
Loading

0 comments on commit 2d55d7d

Please sign in to comment.