Skip to content

Commit 672fa8e

Browse files
committed
1.x: observeOn - fix in-sequence termination/unsubscription
1 parent a42d0bf commit 672fa8e

File tree

2 files changed

+23
-11
lines changed

2 files changed

+23
-11
lines changed

src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -183,15 +183,10 @@ public void call() {
183183
// less frequently (usually after each RxRingBuffer.SIZE elements)
184184

185185
for (;;) {
186-
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
187-
return;
188-
}
189-
190186
long requestAmount = requested.get();
191-
boolean unbounded = requestAmount == Long.MAX_VALUE;
192187
long currentEmission = 0L;
193188

194-
while (requestAmount != 0L) {
189+
while (requestAmount != currentEmission) {
195190
boolean done = finished;
196191
Object v = q.poll();
197192
boolean empty = v == null;
@@ -205,14 +200,19 @@ public void call() {
205200
}
206201

207202
localChild.onNext(localOn.getValue(v));
208-
209-
requestAmount--;
210-
currentEmission--;
203+
204+
currentEmission++;
211205
emitted++;
212206
}
213207

214-
if (currentEmission != 0L && !unbounded) {
215-
requested.addAndGet(currentEmission);
208+
if (requestAmount == currentEmission) {
209+
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
210+
return;
211+
}
212+
}
213+
214+
if (currentEmission != 0L) {
215+
BackpressureUtils.produced(requested, currentEmission);
216216
}
217217

218218
missed = counter.addAndGet(-missed);

src/test/java/rx/internal/operators/OperatorObserveOnTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -834,4 +834,16 @@ public void testErrorDelayedAsync() {
834834
ts.assertError(TestException.class);
835835
ts.assertNotCompleted();
836836
}
837+
838+
@Test
839+
public void requestExactCompletesImmediately() {
840+
TestSubscriber<Integer> ts = TestSubscriber.create(10);
841+
842+
Observable.range(1, 10).observeOn(Schedulers.computation()).subscribe(ts);
843+
844+
ts.awaitTerminalEvent(1, TimeUnit.SECONDS);
845+
ts.assertValueCount(10);
846+
ts.assertNoErrors();
847+
ts.assertCompleted();
848+
}
837849
}

0 commit comments

Comments
 (0)