File tree Expand file tree Collapse file tree 1 file changed +30
-21
lines changed
src/main/java/rx/internal/operators Expand file tree Collapse file tree 1 file changed +30
-21
lines changed Original file line number Diff line number Diff line change @@ -218,15 +218,17 @@ private void handleScalarSynchronousObservable(ScalarSynchronousObservable<? ext
218
218
private void handleScalarSynchronousObservableWithoutRequestLimits (ScalarSynchronousObservable <? extends T > t ) {
219
219
T value = t .get ();
220
220
if (getEmitLock ()) {
221
+ boolean moreToDrain ;
221
222
try {
222
223
actual .onNext (value );
223
- return ;
224
224
} finally {
225
- if (releaseEmitLock ()) {
226
- drainQueuesIfNeeded ();
227
- }
228
- request (1 );
225
+ moreToDrain = releaseEmitLock ();
229
226
}
227
+ if (moreToDrain ) {
228
+ drainQueuesIfNeeded ();
229
+ }
230
+ request (1 );
231
+ return ;
230
232
} else {
231
233
initScalarValueQueueIfNeeded ();
232
234
try {
@@ -241,22 +243,28 @@ private void handleScalarSynchronousObservableWithoutRequestLimits(ScalarSynchro
241
243
private void handleScalarSynchronousObservableWithRequestLimits (ScalarSynchronousObservable <? extends T > t ) {
242
244
if (getEmitLock ()) {
243
245
boolean emitted = false ;
246
+ boolean moreToDrain ;
247
+ boolean isReturn = false ;
244
248
try {
245
249
long r = mergeProducer .requested ;
246
250
if (r > 0 ) {
247
251
emitted = true ;
248
252
actual .onNext (t .get ());
249
253
MergeProducer .REQUESTED .decrementAndGet (mergeProducer );
250
254
// we handle this Observable without ever incrementing the wip or touching other machinery so just return here
251
- return ;
255
+ isReturn = true ;
252
256
}
253
257
} finally {
254
- if (releaseEmitLock ()) {
255
- drainQueuesIfNeeded ();
256
- }
257
- if (emitted ) {
258
- request (1 );
259
- }
258
+ moreToDrain = releaseEmitLock ();
259
+ }
260
+ if (moreToDrain ) {
261
+ drainQueuesIfNeeded ();
262
+ }
263
+ if (emitted ) {
264
+ request (1 );
265
+ }
266
+ if (isReturn ) {
267
+ return ;
260
268
}
261
269
}
262
270
@@ -301,20 +309,21 @@ private boolean drainQueuesIfNeeded() {
301
309
while (true ) {
302
310
if (getEmitLock ()) {
303
311
int emitted = 0 ;
312
+ boolean moreToDrain ;
304
313
try {
305
314
emitted = drainScalarValueQueue ();
306
315
drainChildrenQueues ();
307
316
} finally {
308
- boolean moreToDrain = releaseEmitLock ();
309
- // request outside of lock
310
- if (emitted > 0 ) {
311
- request (emitted );
312
- }
313
- if (!moreToDrain ) {
314
- return true ;
315
- }
316
- // otherwise we'll loop and get whatever was added
317
+ moreToDrain = releaseEmitLock ();
318
+ }
319
+ // request outside of lock
320
+ if (emitted > 0 ) {
321
+ request (emitted );
322
+ }
323
+ if (!moreToDrain ) {
324
+ return true ;
317
325
}
326
+ // otherwise we'll loop and get whatever was added
318
327
} else {
319
328
return false ;
320
329
}
You can’t perform that action at this time.
0 commit comments