Skip to content

Commit

Permalink
fix reactor#767 Revert publishOn async path detection over-optimization
Browse files Browse the repository at this point in the history
This commit reverts an optimization from 5254299 that causes a bug
in some cases where the ASYNC fusion mode is enabled but the onNext is
called with a non-null value.

This would result in calling `offer` on some QueueSubscription, most of
which will just throw an `UnsupportedOperationException`.
  • Loading branch information
simonbasle committed Jul 31, 2017
1 parent 73daad4 commit 475ca5e
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ void initialRequest() {

@Override
public void onNext(T t) {
if (t == null) {//async fusion
if (sourceMode == ASYNC) {
if (trySchedule() == Scheduler.REJECTED) {
throw Operators.onRejectedExecution(this, null, t);
}
Expand Down Expand Up @@ -697,7 +697,7 @@ void initialRequest() {

@Override
public void onNext(T t) {
if (t == null) {//async fusion
if (sourceMode == ASYNC) {
if (trySchedule() == Scheduler.REJECTED) {
throw Operators.onRejectedExecution(this, null, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Exceptions;
Expand All @@ -55,6 +55,7 @@
import reactor.test.publisher.FluxOperatorTest;
import reactor.test.subscriber.AssertSubscriber;
import reactor.util.concurrent.Queues;
import reactor.util.function.Tuple2;

import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.hamcrest.CoreMatchers.*;
Expand Down Expand Up @@ -1307,6 +1308,49 @@ public void scanConditionalSubscriber() {
Assertions.assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue();
}

//see https://github.com/reactor/reactor-core/issues/767
@Test
public void publishOnAsyncDetection() {
Publisher<String> a = Flux.just("a");
Publisher<String> b = Mono.just("b");

Flux<Tuple2<String, String>> flux =
Flux.from(a)
.flatMap(value -> Mono.just(value)
.and(Mono.from(b)))
.publishOn(Schedulers.single());

StepVerifier.create(flux)
.expectFusion(Fuseable.ASYNC)
.assertNext(tuple -> {
Assertions.assertThat(tuple.getT1()).isEqualTo("a");
Assertions.assertThat(tuple.getT2()).isEqualTo("b");
})
.verifyComplete();
}

//see https://github.com/reactor/reactor-core/issues/767
@Test
public void publishOnAsyncDetectionConditional() {
Publisher<String> a = Flux.just("a");
Publisher<String> b = Mono.just("b");

Flux<Tuple2<String, String>> flux =
Flux.from(a)
.flatMap(value -> Mono.just(value)
.and(Mono.from(b)))
.publishOn(Schedulers.single())
.filter(t -> true);

StepVerifier.create(flux)
.expectFusion(Fuseable.ASYNC)
.assertNext(tuple -> {
Assertions.assertThat(tuple.getT1()).isEqualTo("a");
Assertions.assertThat(tuple.getT2()).isEqualTo("b");
})
.verifyComplete();
}

private static class FailNullWorkerScheduler implements Scheduler {

@Override
Expand Down

0 comments on commit 475ca5e

Please sign in to comment.