Skip to content

Commit

Permalink
Fix reactor#230: the onNext/onComplete callback reversal in async-fus…
Browse files Browse the repository at this point in the history
…ed sequences (reactor#243)
  • Loading branch information
akarnokd authored and smaldini committed Oct 28, 2016
1 parent 36bbec7 commit 1a0b64d
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 53 deletions.
110 changes: 60 additions & 50 deletions src/main/java/reactor/core/publisher/FluxPeekFuseable.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ static final class PeekFuseableSubscriber<T>

int sourceMode;

boolean done;
volatile boolean done;

public PeekFuseableSubscriber(Subscriber<? super T> actual, SignalPeek<T> parent) {
this.actual = actual;
Expand Down Expand Up @@ -211,31 +211,35 @@ public void onComplete() {
return;
}
done = true;
if(parent.onCompleteCall() != null) {
try {
parent.onCompleteCall().run();
}
catch (Throwable e) {
onError(Operators.onOperatorError(e));
return;
}
}

if (sourceMode == NONE) {
if(parent.onCompleteCall() != null) {
try {
parent.onCompleteCall().run();
}
catch (Throwable e) {
onError(Operators.onOperatorError(e));
return;
}
}
}

actual.onComplete();

if(parent.onAfterTerminateCall() != null) {
try {
parent.onAfterTerminateCall().run();
}
catch (Throwable e) {
Throwable _e = Operators.onOperatorError(e);
if(parent.onErrorCall() != null) {
parent.onErrorCall().accept(_e);
}
Operators.onErrorDropped(_e);
}
}
}
if (sourceMode == NONE) {
if(parent.onAfterTerminateCall() != null) {
try {
parent.onAfterTerminateCall().run();
}
catch (Throwable e) {
Throwable _e = Operators.onOperatorError(e);
if(parent.onErrorCall() != null) {
parent.onErrorCall().accept(_e);
}
Operators.onErrorDropped(_e);
}
}
}
}

@Override
public Object downstream() {
Expand All @@ -249,6 +253,7 @@ public Object upstream() {

@Override
public T poll() {
boolean d = done;
T v = s.poll();
if (v != null && parent.onNextCall() != null) {
try {
Expand All @@ -258,7 +263,7 @@ public T poll() {
throw Exceptions.propagate(Operators.onOperatorError(s, e, v));
}
}
if (v == null && sourceMode == SYNC) {
if (v == null && (d || sourceMode == SYNC)) {
Runnable call = parent.onCompleteCall();
if (call != null) {
call.run();
Expand Down Expand Up @@ -310,7 +315,7 @@ static final class PeekFuseableConditionalSubscriber<T>

int sourceMode;

boolean done;
volatile boolean done;

public PeekFuseableConditionalSubscriber(ConditionalSubscriber<? super T> actual, SignalPeek<T> parent) {
this.actual = actual;
Expand Down Expand Up @@ -452,31 +457,35 @@ public void onComplete() {
return;
}
done = true;
if(parent.onCompleteCall() != null) {
try {
parent.onCompleteCall().run();
}
catch (Throwable e) {
onError(Operators.onOperatorError(e));
return;
}
}

if (sourceMode == NONE) {
if(parent.onCompleteCall() != null) {
try {
parent.onCompleteCall().run();
}
catch (Throwable e) {
onError(Operators.onOperatorError(e));
return;
}
}
}

actual.onComplete();

if(parent.onAfterTerminateCall() != null) {
try {
parent.onAfterTerminateCall().run();
}
catch (Throwable e) {
Throwable _e = Operators.onOperatorError(e);
if(parent.onErrorCall() != null) {
parent.onErrorCall().accept(_e);
}
Operators.onErrorDropped(_e);
}
}
}
if (sourceMode == NONE) {
if(parent.onAfterTerminateCall() != null) {
try {
parent.onAfterTerminateCall().run();
}
catch (Throwable e) {
Throwable _e = Operators.onOperatorError(e);
if(parent.onErrorCall() != null) {
parent.onErrorCall().accept(_e);
}
Operators.onErrorDropped(_e);
}
}
}
}

@Override
public Object downstream() {
Expand All @@ -490,6 +499,7 @@ public Object upstream() {

@Override
public T poll() {
boolean d = done;
T v = s.poll();
if (v != null && parent.onNextCall() != null) {
try {
Expand All @@ -499,7 +509,7 @@ public T poll() {
throw Exceptions.propagate(Operators.onOperatorError(s, e, v));
}
}
if (v == null && sourceMode == SYNC) {
if (v == null && (d || sourceMode == SYNC)) {
Runnable call = parent.onCompleteCall();
if (call != null) {
call.run();
Expand Down
48 changes: 45 additions & 3 deletions src/test/java/reactor/core/publisher/FluxPeekTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
*/
package reactor.core.publisher;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static java.lang.Thread.sleep;
import static org.junit.Assert.assertEquals;

import java.util.concurrent.atomic.*;

import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Subscription;
import org.slf4j.*;

import reactor.core.Fuseable;
import reactor.core.scheduler.Schedulers;
import reactor.test.subscriber.AssertSubscriber;
import reactor.util.concurrent.QueueSupplier;
import reactor.core.Exceptions;
Expand Down Expand Up @@ -322,4 +326,42 @@ public void syncdoAfterTerminateCalled() {
Assert.assertTrue("onComplete not called back", onTerminate.get());
}

@Test
public void should_reduce_to_10_events() {
for (int i = 0; i < 20; i++) {
AtomicInteger count = new AtomicInteger();
Flux.range(0, 10).flatMap(x ->
Flux.range(0, 2).map(y -> blockingOp(x, y)).subscribeOn(Schedulers.parallel())
.reduce((l, r) -> l + "_" + r)
.doOnSuccess(s -> {count.incrementAndGet();})
).blockLast();

assertEquals(10, count.get());
}
}

@Test
public void should_reduce_to_10_events_conditional() {
for (int i = 0; i < 20; i++) {
AtomicInteger count = new AtomicInteger();
Flux.range(0, 10).flatMap(x ->
Flux.range(0, 2).map(y -> blockingOp(x, y)).subscribeOn(Schedulers.parallel())
.reduce((l, r) -> l + "_" + r)
.doOnSuccess(s -> { count.incrementAndGet() ; })
.filter(v -> true)
).blockLast();

assertEquals(10, count.get());
}
}

static String blockingOp(Integer x, Integer y) {
try {
sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "x" + x + "y" + y;
}

}

0 comments on commit 1a0b64d

Please sign in to comment.