From e98b20da5c4559cc56cc6c2e5c177feed4274bdf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Tue, 15 Mar 2016 11:17:00 +0100 Subject: [PATCH] 1.x: observeOn - fix in-sequence termination/unsubscription --- .../internal/operators/OperatorObserveOn.java | 22 +++++++++--------- .../operators/OperatorObserveOnTest.java | 23 +++++++++++++++++++ 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorObserveOn.java b/src/main/java/rx/internal/operators/OperatorObserveOn.java index 98464efb89..51d6fc7a23 100644 --- a/src/main/java/rx/internal/operators/OperatorObserveOn.java +++ b/src/main/java/rx/internal/operators/OperatorObserveOn.java @@ -183,15 +183,10 @@ public void call() { // less frequently (usually after each RxRingBuffer.SIZE elements) for (;;) { - if (checkTerminated(finished, q.isEmpty(), localChild, q)) { - return; - } - long requestAmount = requested.get(); - boolean unbounded = requestAmount == Long.MAX_VALUE; long currentEmission = 0L; - while (requestAmount != 0L) { + while (requestAmount != currentEmission) { boolean done = finished; Object v = q.poll(); boolean empty = v == null; @@ -205,14 +200,19 @@ public void call() { } localChild.onNext(localOn.getValue(v)); - - requestAmount--; - currentEmission--; + + currentEmission++; emitted++; } - if (currentEmission != 0L && !unbounded) { - requested.addAndGet(currentEmission); + if (requestAmount == currentEmission) { + if (checkTerminated(finished, q.isEmpty(), localChild, q)) { + return; + } + } + + if (currentEmission != 0L) { + BackpressureUtils.produced(requested, currentEmission); } missed = counter.addAndGet(-missed); diff --git a/src/test/java/rx/internal/operators/OperatorObserveOnTest.java b/src/test/java/rx/internal/operators/OperatorObserveOnTest.java index 0b4b98bc8e..d0ba44be23 100644 --- a/src/test/java/rx/internal/operators/OperatorObserveOnTest.java +++ b/src/test/java/rx/internal/operators/OperatorObserveOnTest.java @@ -834,4 +834,27 @@ public void testErrorDelayedAsync() { ts.assertError(TestException.class); ts.assertNotCompleted(); } + + @Test + public void requestExactCompletesImmediately() { +TestSubscriber ts = TestSubscriber.create(0); + + TestScheduler test = Schedulers.test(); + + Observable.range(1, 10).observeOn(test).subscribe(ts); + + test.advanceTimeBy(1, TimeUnit.SECONDS); + + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + ts.requestMore(10); + + test.advanceTimeBy(1, TimeUnit.SECONDS); + + ts.assertValueCount(10); + ts.assertNoErrors(); + ts.assertCompleted(); + } }