diff --git a/src/test/java/io/reactivex/flowable/FlowableBackpressureTests.java b/src/test/java/io/reactivex/flowable/FlowableBackpressureTests.java index 8e658cd5d7..4b0618ed7f 100644 --- a/src/test/java/io/reactivex/flowable/FlowableBackpressureTests.java +++ b/src/test/java/io/reactivex/flowable/FlowableBackpressureTests.java @@ -3,9 +3,9 @@ * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software distributed under the License is * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. @@ -18,14 +18,22 @@ import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import java.util.function.*; import org.junit.*; import org.junit.rules.TestName; import org.reactivestreams.*; +import io.reactivex.BackpressureStrategy; import io.reactivex.Flowable; +import io.reactivex.FlowableEmitter; +import io.reactivex.FlowableOnSubscribe; import io.reactivex.exceptions.MissingBackpressureException; import io.reactivex.functions.*; +import io.reactivex.functions.BiFunction; +import io.reactivex.functions.Consumer; +import io.reactivex.functions.Function; +import io.reactivex.functions.Predicate; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.BackpressureHelper; import io.reactivex.schedulers.Schedulers; @@ -97,17 +105,17 @@ public void testObserveOnWithSlowConsumer() { AtomicInteger c = new AtomicInteger(); TestSubscriber ts = new TestSubscriber(); incrementingIntegers(c).observeOn(Schedulers.computation()).map( - new Function() { - @Override - public Integer apply(Integer i) { - try { - Thread.sleep(1); - } catch (InterruptedException e) { - e.printStackTrace(); + new Function() { + @Override + public Integer apply(Integer i) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return i; } - return i; } - } ).take(NUM).subscribe(ts); ts.awaitTerminalEvent(); ts.assertNoErrors(); @@ -177,9 +185,9 @@ public void testMergeAsyncThenObserveOnLoop() { incrementingIntegers(c2).subscribeOn(Schedulers.computation())); merged - .observeOn(Schedulers.io()) - .take(NUM) - .subscribe(ts); + .observeOn(Schedulers.io()) + .take(NUM) + .subscribe(ts); ts.awaitTerminalEvent(5, TimeUnit.SECONDS); @@ -220,13 +228,13 @@ public void testFlatMapSync() { TestSubscriber ts = new TestSubscriber(); incrementingIntegers(c) - .flatMap(new Function>() { - @Override - public Publisher apply(Integer i) { - return incrementingIntegers(new AtomicInteger()).take(10); - } - }) - .take(NUM).subscribe(ts); + .flatMap(new Function>() { + @Override + public Publisher apply(Integer i) { + return incrementingIntegers(new AtomicInteger()).take(10); + } + }) + .take(NUM).subscribe(ts); ts.awaitTerminalEvent(); ts.assertNoErrors(); @@ -244,17 +252,17 @@ public void testFlatMapAsync() { TestSubscriber ts = new TestSubscriber(); incrementingIntegers(c) - .subscribeOn(Schedulers.computation()) - .flatMap(new Function>() { - @Override - public Publisher apply(Integer i) { - return incrementingIntegers(new AtomicInteger()) - .take(10) - .subscribeOn(Schedulers.computation()); - } - } - ) - .take(NUM).subscribe(ts); + .subscribeOn(Schedulers.computation()) + .flatMap(new Function>() { + @Override + public Publisher apply(Integer i) { + return incrementingIntegers(new AtomicInteger()) + .take(10) + .subscribeOn(Schedulers.computation()); + } + } + ) + .take(NUM).subscribe(ts); ts.awaitTerminalEvent(); ts.assertNoErrors(); @@ -284,7 +292,7 @@ public Integer apply(Integer t1, Integer t2) { }); zipped.take(NUM) - .subscribe(ts); + .subscribe(ts); ts.awaitTerminalEvent(); ts.assertNoErrors(); @@ -435,43 +443,43 @@ public void testUserSubscriberUsingRequestAsync() throws InterruptedException { incrementingIntegers(c).subscribeOn(Schedulers.newThread()).subscribe( new ResourceSubscriber() { - @Override - public void onStart() { - request(100); - } + @Override + public void onStart() { + request(100); + } - @Override - public void onComplete() { - latch.countDown(); - } + @Override + public void onComplete() { + latch.countDown(); + } - @Override - public void onError(Throwable e) { - latch.countDown(); - } + @Override + public void onError(Throwable e) { + latch.countDown(); + } - @Override - public void onNext(Integer t) { - int total = totalReceived.incrementAndGet(); - received.incrementAndGet(); - boolean done = false; - if (total >= 2000) { - done = true; - dispose(); - } - if (received.get() == 100) { - batches.incrementAndGet(); - received.set(0); - if (!done) { - request(100); + @Override + public void onNext(Integer t) { + int total = totalReceived.incrementAndGet(); + received.incrementAndGet(); + boolean done = false; + if (total >= 2000) { + done = true; + dispose(); + } + if (received.get() == 100) { + batches.incrementAndGet(); + received.set(0); + if (!done) { + request(100); + } + } + if (done) { + latch.countDown(); + } } - } - if (done) { - latch.countDown(); - } - } - }); + }); latch.await(); System.out.println("testUserSubscriberUsingRequestAsync => Received: " + totalReceived.get() + " Emitted: " + c.get() + " Request Batches: " + batches.get()); @@ -486,18 +494,18 @@ public void testFirehoseFailsAsExpected() { TestSubscriber ts = new TestSubscriber(); firehose(c).observeOn(Schedulers.computation()) - .map(new Function() { - @Override - public Integer apply(Integer v) { - try { - Thread.sleep(10); - } catch (Exception e) { - e.printStackTrace(); - } - return v; - } - }) - .subscribe(ts); + .map(new Function() { + @Override + public Integer apply(Integer v) { + try { + Thread.sleep(10); + } catch (Exception e) { + e.printStackTrace(); + } + return v; + } + }) + .subscribe(ts); ts.awaitTerminalEvent(); System.out.println("testFirehoseFailsAsExpected => Received: " + ts.valueCount() + " Emitted: " + c.get()); @@ -529,8 +537,8 @@ public void testOnBackpressureDrop() { AtomicInteger c = new AtomicInteger(); TestSubscriber ts = new TestSubscriber(); firehose(c).onBackpressureDrop() - .observeOn(Schedulers.computation()) - .map(SLOW_PASS_THRU).take(NUM).subscribe(ts); + .observeOn(Schedulers.computation()) + .map(SLOW_PASS_THRU).take(NUM).subscribe(ts); ts.awaitTerminalEvent(); ts.assertNoErrors(); @@ -555,21 +563,21 @@ public void testOnBackpressureDropWithAction() { TestSubscriber ts = new TestSubscriber(); firehose(emitCount) - .onBackpressureDrop(new Consumer() { - @Override - public void accept(Integer v) { - dropCount.incrementAndGet(); - } - }) - .doOnNext(new Consumer() { - @Override - public void accept(Integer v) { - passCount.incrementAndGet(); - } - }) - .observeOn(Schedulers.computation()) - .map(SLOW_PASS_THRU) - .take(NUM).subscribe(ts); + .onBackpressureDrop(new Consumer() { + @Override + public void accept(Integer v) { + dropCount.incrementAndGet(); + } + }) + .doOnNext(new Consumer() { + @Override + public void accept(Integer v) { + passCount.incrementAndGet(); + } + }) + .observeOn(Schedulers.computation()) + .map(SLOW_PASS_THRU) + .take(NUM).subscribe(ts); ts.awaitTerminalEvent(); ts.assertNoErrors(); @@ -594,7 +602,7 @@ public void testOnBackpressureDropSynchronous() { AtomicInteger c = new AtomicInteger(); TestSubscriber ts = new TestSubscriber(); firehose(c).onBackpressureDrop() - .map(SLOW_PASS_THRU).take(NUM).subscribe(ts); + .map(SLOW_PASS_THRU).take(NUM).subscribe(ts); ts.awaitTerminalEvent(); ts.assertNoErrors(); @@ -622,7 +630,7 @@ public void accept(Integer j) { dropCount.incrementAndGet(); } }) - .map(SLOW_PASS_THRU).take(NUM).subscribe(ts); + .map(SLOW_PASS_THRU).take(NUM).subscribe(ts); ts.awaitTerminalEvent(); ts.assertNoErrors(); @@ -652,9 +660,9 @@ public boolean test(Integer t1) { return t1 < 100000; } }) - .onBackpressureBuffer() - .observeOn(Schedulers.computation()) - .map(SLOW_PASS_THRU).take(NUM).subscribe(ts); + .onBackpressureBuffer() + .observeOn(Schedulers.computation()) + .map(SLOW_PASS_THRU).take(NUM).subscribe(ts); ts.awaitTerminalEvent(); ts.assertNoErrors(); @@ -664,6 +672,59 @@ public boolean test(Integer t1) { assertEquals(NUM - 1, ts.values().get(NUM - 1).intValue()); } + @Test + public void testFlowable2() throws Exception { + + final Flowable flowable = Flowable.create(new FlowableOnSubscribe() { + @Override + public void subscribe(FlowableEmitter e) throws Exception { + AtomicInteger integer = new AtomicInteger(0); + + while (integer.get() < 1000) { + e.onNext(integer.getAndIncrement()); + } + Thread.sleep(10); + while (integer.get() < 1000) { + e.onNext(integer.getAndIncrement()); + } + + e.onComplete(); + } + }, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io()); + + flowable.blockingSubscribe(new Subscriber() { + Subscription subscription; + + @Override + public void onSubscribe(Subscription s) { + subscription = s; + s.request(1); + } + + @Override + public void onNext(Integer integer) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + System.out.println("value:" + integer); + subscription.request(1); + } + + @Override + public void onError(Throwable t) { + + } + + @Override + public void onComplete() { + System.out.println("onComplete"); + + } + }); + + } + /** * A synchronous Observable that will emit incrementing integers as requested. * @@ -735,6 +796,7 @@ public void subscribe(Subscriber s) { static final Function SLOW_PASS_THRU = new Function() { volatile int sink; + @Override public Integer apply(Integer t1) { // be slow ... but faster than Thread.sleep(1)