Skip to content

Commit

Permalink
2.x: enable operator fusion in onBackpressureBuffer (#4622)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Sep 28, 2016
1 parent c3a1d91 commit 1c82063
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.reactivex.functions.Action;
import io.reactivex.internal.fuseable.SimpleQueue;
import io.reactivex.internal.queue.*;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.subscriptions.*;
import io.reactivex.internal.util.BackpressureHelper;

public final class FlowableOnBackpressureBuffer<T> extends AbstractFlowableWithUpstream<T, T> {
Expand All @@ -44,9 +44,10 @@ protected void subscribeActual(Subscriber<? super T> s) {
source.subscribe(new BackpressureBufferSubscriber<T>(s, bufferSize, unbounded, delayError, onOverflow));
}

static final class BackpressureBufferSubscriber<T> extends AtomicInteger implements Subscriber<T>, Subscription {
static final class BackpressureBufferSubscriber<T> extends BasicIntQueueSubscription<T> implements Subscriber<T> {

private static final long serialVersionUID = -2514538129242366402L;

final Subscriber<? super T> actual;
final SimpleQueue<T> queue;
final boolean delayError;
Expand All @@ -61,6 +62,8 @@ static final class BackpressureBufferSubscriber<T> extends AtomicInteger impleme

final AtomicLong requested = new AtomicLong();

boolean outputFused;

BackpressureBufferSubscriber(Subscriber<? super T> actual, int bufferSize,
boolean unbounded, boolean delayError, Action onOverflow) {
this.actual = actual;
Expand Down Expand Up @@ -101,38 +104,52 @@ public void onNext(T t) {
onError(ex);
return;
}
drain();
if (outputFused) {
actual.onNext(null);
} else {
drain();
}
}

@Override
public void onError(Throwable t) {
error = t;
done = true;
drain();
if (outputFused) {
actual.onError(t);
} else {
drain();
}
}

@Override
public void onComplete() {
done = true;
drain();
if (outputFused) {
actual.onComplete();
} else {
drain();
}
}

@Override
public void request(long n) {
if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(requested, n);
drain();
if (!outputFused) {
if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(requested, n);
drain();
}
}
}

@Override
public void cancel() {
if (!cancelled) {
cancelled = true;
s.cancel();

if (getAndIncrement() == 0) {
queue.clear();
s.cancel();
}
}
}
Expand Down Expand Up @@ -204,7 +221,6 @@ void drain() {

boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> a) {
if (cancelled) {
s.cancel();
queue.clear();
return true;
}
Expand Down Expand Up @@ -234,5 +250,29 @@ boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> a) {
}
return false;
}

@Override
public int requestFusion(int mode) {
if ((mode & ASYNC) != 0) {
outputFused = true;
return ASYNC;
}
return NONE;
}

@Override
public T poll() throws Exception {
return queue.poll();
}

@Override
public void clear() {
queue.clear();
}

@Override
public boolean isEmpty() {
return queue.isEmpty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.reactivex.Flowable;
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.fuseable.QueueSubscription;
import io.reactivex.internal.subscriptions.BooleanSubscription;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.*;
Expand Down Expand Up @@ -247,4 +248,47 @@ public void delayErrorBuffer() {
ts.request(1);
ts.assertFailure(TestException.class, 1);
}

@Test
public void fusedNormal() {
TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueSubscription.ANY);

Flowable.range(1, 10).onBackpressureBuffer().subscribe(ts);

ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
.assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueSubscription.ASYNC))
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}

@Test
public void fusedError() {
TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueSubscription.ANY);

Flowable.<Integer>error(new TestException()).onBackpressureBuffer().subscribe(ts);

ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
.assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueSubscription.ASYNC))
.assertFailure(TestException.class);
}

@Test
public void fusedPreconsume() throws Exception {

TestSubscriber<Integer> ts = Flowable.range(1, 1000 * 1000)
.onBackpressureBuffer()
.observeOn(Schedulers.single())
.test(0L);

ts.assertEmpty();

Thread.sleep(100);

ts.request(1000 * 1000);

ts
.awaitDone(5, TimeUnit.SECONDS)
.assertValueCount(1000 * 1000)
.assertNoErrors()
.assertComplete();
}
}

0 comments on commit 1c82063

Please sign in to comment.