Skip to content

Commit

Permalink
Fix volatile read & conditionalSub classcast in switchOnFirst (reacto…
Browse files Browse the repository at this point in the history
…r#2166)

This commit prevents double application of the function due to the ordering of
reading the `this.first` volatile field.

The out-of-order read could lead to a ClassCastException where the downstream
subscriber is expected to be a ConditionalSubscriber but hasn't been wrapped
accordingly.
  • Loading branch information
OlegDokuka authored May 19, 2020
1 parent 887bd8f commit 9b14da6
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ public void onNext(T t) {

@Override
public void onError(Throwable t) {
// read of the first should occur before the read of inner since otherwise
// first may be nulled while the previous read has shown that inner is still
// null hence double invocation of transformer occurs
final T f = this.first;
final CoreSubscriber<? super T> i = this.inner;
if (this.done || i == Operators.EMPTY_SUBSCRIBER) {
Operators.onErrorDropped(t, currentContext());
Expand All @@ -169,7 +173,6 @@ public void onError(Throwable t) {
this.throwable = t;
this.done = true;

final T f = this.first;
if (f == null && i == null) {
final Publisher<? extends R> result;
final CoreSubscriber<? super R> o = this.outer;
Expand All @@ -194,14 +197,17 @@ public void onError(Throwable t) {

@Override
public void onComplete() {
// read of the first should occur before the read of inner since otherwise
// first may be nulled while the previous read has shown that inner is still
// null hence double invocation of transformer occurs
final T f = this.first;
final CoreSubscriber<? super T> i = this.inner;
if (this.done || i == Operators.EMPTY_SUBSCRIBER) {
return;
}

this.done = true;

final T f = this.first;
if (f == null && i == null) {
final Publisher<? extends R> result;
final CoreSubscriber<? super R> o = outer;
Expand Down Expand Up @@ -368,7 +374,7 @@ static final class SwitchOnFirstConditionalMain<T, R> extends AbstractSwitchOnFi

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
if (this.inner == null && INNER.compareAndSet(this, null, actual)) {
if (this.inner == null && INNER.compareAndSet(this, null, Operators.toConditionalSubscriber(actual))) {
if (this.first == null && this.done) {
final Throwable t = this.throwable;
if (t != null) {
Expand All @@ -379,7 +385,6 @@ public void subscribe(CoreSubscriber<? super T> actual) {
}
return;
}
this.inner = Operators.toConditionalSubscriber(actual);
actual.onSubscribe(this);
}
else if (this.inner != Operators.EMPTY_SUBSCRIBER) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,22 +318,23 @@ public void shouldSendOnNextSignal() {

@Test
public void shouldSendOnNextAsyncSignal() {
@SuppressWarnings("unchecked")
Signal<? extends Long>[] first = new Signal[1];
for (int i = 0; i < 10000; i++) {
@SuppressWarnings("unchecked") Signal<? extends Long>[] first = new Signal[1];

StepVerifier.create(Flux.just(1L)
.switchOnFirst((s, f) -> {
first[0] = s;
StepVerifier.create(Flux.just(1L)
.switchOnFirst((s, f) -> {
first[0] = s;

return f.subscribeOn(Schedulers.elastic());
}))
.expectSubscription()
.expectNext(1L)
.expectComplete()
.verify(Duration.ofSeconds(5));
return f.subscribeOn(Schedulers.elastic());
}))
.expectSubscription()
.expectNext(1L)
.expectComplete()
.verify(Duration.ofSeconds(5));


Assertions.assertThat((long) first[0].get()).isEqualTo(1L);
Assertions.assertThat((long) first[0].get())
.isEqualTo(1L);
}
}

@Test
Expand Down

0 comments on commit 9b14da6

Please sign in to comment.