Skip to content

Commit f934706

Browse files
Merge pull request ReactiveX#1947 from akarnokd/ReplaySubjectFirstEmissionFix
Fixed first emission racing with pre and post subscription.
2 parents b02e572 + 053cc4f commit f934706

File tree

4 files changed

+303
-24
lines changed

4 files changed

+303
-24
lines changed

src/main/java/rx/subjects/ReplaySubject.java

Lines changed: 97 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,42 @@ public void call(SubjectObserver<T> o) {
102102
o.index(lastIndex);
103103
}
104104
};
105+
ssm.onAdded = new Action1<SubjectObserver<T>>() {
106+
@Override
107+
public void call(SubjectObserver<T> o) {
108+
synchronized (o) {
109+
if (!o.first || o.emitting) {
110+
return;
111+
}
112+
o.first = false;
113+
o.emitting = true;
114+
}
115+
boolean skipFinal = false;
116+
try {
117+
for (;;) {
118+
int idx = o.index();
119+
int sidx = state.index;
120+
if (idx != sidx) {
121+
Integer j = state.replayObserverFromIndex(idx, o);
122+
o.index(j);
123+
}
124+
synchronized (o) {
125+
if (sidx == state.index) {
126+
o.emitting = false;
127+
skipFinal = true;
128+
break;
129+
}
130+
}
131+
}
132+
} finally {
133+
if (!skipFinal) {
134+
synchronized (o) {
135+
o.emitting = false;
136+
}
137+
}
138+
}
139+
}
140+
};
105141
ssm.onTerminated = new Action1<SubjectObserver<T>>() {
106142
@Override
107143
public void call(SubjectObserver<T> o) {
@@ -264,6 +300,42 @@ static final <T> ReplaySubject<T> createWithState(final BoundedState<T> state,
264300
Action1<SubjectObserver<T>> onStart) {
265301
SubjectSubscriptionManager<T> ssm = new SubjectSubscriptionManager<T>();
266302
ssm.onStart = onStart;
303+
ssm.onAdded = new Action1<SubjectObserver<T>>() {
304+
@Override
305+
public void call(SubjectObserver<T> o) {
306+
synchronized (o) {
307+
if (!o.first || o.emitting) {
308+
return;
309+
}
310+
o.first = false;
311+
o.emitting = true;
312+
}
313+
boolean skipFinal = false;
314+
try {
315+
for (;;) {
316+
NodeList.Node<Object> idx = o.index();
317+
NodeList.Node<Object> sidx = state.tail();
318+
if (idx != sidx) {
319+
NodeList.Node<Object> j = state.replayObserverFromIndex(idx, o);
320+
o.index(j);
321+
}
322+
synchronized (o) {
323+
if (sidx == state.tail()) {
324+
o.emitting = false;
325+
skipFinal = true;
326+
break;
327+
}
328+
}
329+
}
330+
} finally {
331+
if (!skipFinal) {
332+
synchronized (o) {
333+
o.emitting = false;
334+
}
335+
}
336+
}
337+
}
338+
};
267339
ssm.onTerminated = new Action1<SubjectObserver<T>>() {
268340

269341
@Override
@@ -355,9 +427,10 @@ public boolean hasObservers() {
355427

356428
private boolean caughtUp(SubjectObserver<? super T> o) {
357429
if (!o.caughtUp) {
358-
o.caughtUp = true;
359-
state.replayObserver(o);
360-
o.index(null); // once caught up, no need for the index anymore
430+
if (state.replayObserver(o)) {
431+
o.caughtUp = true;
432+
o.index(null); // once caught up, no need for the index anymore
433+
}
361434
return false;
362435
} else {
363436
// it was caught up so proceed the "raw route"
@@ -423,11 +496,20 @@ public boolean terminated() {
423496
}
424497

425498
@Override
426-
public void replayObserver(SubjectObserver<? super T> observer) {
499+
public boolean replayObserver(SubjectObserver<? super T> observer) {
500+
501+
synchronized (observer) {
502+
observer.first = false;
503+
if (observer.emitting) {
504+
return false;
505+
}
506+
}
507+
427508
Integer lastEmittedLink = observer.index();
428509
if (lastEmittedLink != null) {
429510
int l = replayObserverFromIndex(lastEmittedLink, observer);
430511
observer.index(l);
512+
return true;
431513
} else {
432514
throw new IllegalStateException("failed to find lastEmittedLink for: " + observer);
433515
}
@@ -525,10 +607,18 @@ public Node<Object> tail() {
525607
return tail;
526608
}
527609
@Override
528-
public void replayObserver(SubjectObserver<? super T> observer) {
610+
public boolean replayObserver(SubjectObserver<? super T> observer) {
611+
synchronized (observer) {
612+
observer.first = false;
613+
if (observer.emitting) {
614+
return false;
615+
}
616+
}
617+
529618
NodeList.Node<Object> lastEmittedLink = observer.index();
530619
NodeList.Node<Object> l = replayObserverFromIndex(lastEmittedLink, observer);
531620
observer.index(l);
621+
return true;
532622
}
533623

534624
@Override
@@ -571,8 +661,9 @@ interface ReplayState<T, I> {
571661
/**
572662
* Replay contents to the given observer.
573663
* @param observer the receiver of events
664+
* @return true if the subject has caught up
574665
*/
575-
void replayObserver(SubjectObserver<? super T> observer);
666+
boolean replayObserver(SubjectObserver<? super T> observer);
576667
/**
577668
* Replay the buffered values from an index position and return a new index
578669
* @param idx the current index position

src/test/java/rx/subjects/BehaviorSubjectTest.java

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,19 @@
2525
import static org.mockito.Mockito.times;
2626
import static org.mockito.Mockito.verify;
2727

28-
import org.junit.Test;
28+
import java.util.concurrent.*;
29+
import java.util.concurrent.atomic.AtomicReference;
30+
31+
import org.junit.*;
2932
import org.mockito.InOrder;
3033
import org.mockito.Mockito;
3134

32-
import rx.Observable;
33-
import rx.Observer;
34-
import rx.Subscription;
35+
import rx.*;
3536
import rx.exceptions.CompositeException;
3637
import rx.exceptions.OnErrorNotImplementedException;
37-
import rx.functions.Func1;
38+
import rx.functions.*;
3839
import rx.observers.TestSubscriber;
40+
import rx.schedulers.Schedulers;
3941

4042
public class BehaviorSubjectTest {
4143

@@ -417,4 +419,67 @@ public void testOnErrorThrowsDoesntPreventDelivery2() {
417419
// even though the onError above throws we should still receive it on the other subscriber
418420
assertEquals(1, ts.getOnErrorEvents().size());
419421
}
422+
@Test
423+
public void testEmissionSubscriptionRace() throws Exception {
424+
Scheduler s = Schedulers.io();
425+
Scheduler.Worker worker = Schedulers.io().createWorker();
426+
for (int i = 0; i < 50000; i++) {
427+
if (i % 1000 == 0) {
428+
System.out.println(i);
429+
}
430+
final BehaviorSubject<Object> rs = BehaviorSubject.create();
431+
432+
final CountDownLatch finish = new CountDownLatch(1);
433+
final CountDownLatch start = new CountDownLatch(1);
434+
435+
worker.schedule(new Action0() {
436+
@Override
437+
public void call() {
438+
try {
439+
start.await();
440+
} catch (Exception e1) {
441+
e1.printStackTrace();
442+
}
443+
rs.onNext(1);
444+
}
445+
});
446+
447+
final AtomicReference<Object> o = new AtomicReference<Object>();
448+
449+
rs.subscribeOn(s).observeOn(Schedulers.io())
450+
.subscribe(new Observer<Object>() {
451+
452+
@Override
453+
public void onCompleted() {
454+
o.set(-1);
455+
finish.countDown();
456+
}
457+
458+
@Override
459+
public void onError(Throwable e) {
460+
o.set(e);
461+
finish.countDown();
462+
}
463+
464+
@Override
465+
public void onNext(Object t) {
466+
o.set(t);
467+
finish.countDown();
468+
}
469+
470+
});
471+
start.countDown();
472+
473+
if (!finish.await(5, TimeUnit.SECONDS)) {
474+
System.out.println(o.get());
475+
System.out.println(rs.hasObservers());
476+
rs.onCompleted();
477+
Assert.fail("Timeout @ " + i);
478+
break;
479+
} else {
480+
Assert.assertEquals(1, o.get());
481+
rs.onCompleted();
482+
}
483+
}
484+
}
420485
}

src/test/java/rx/subjects/ReplaySubjectBoundedConcurrencyTest.java

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@
2525
import java.util.concurrent.TimeUnit;
2626
import java.util.concurrent.atomic.AtomicReference;
2727

28-
import org.junit.Test;
28+
import org.junit.*;
2929

30-
import rx.Observable;
30+
import rx.*;
3131
import rx.Observable.OnSubscribe;
3232
import rx.Subscriber;
33-
import rx.functions.Action1;
33+
import rx.functions.*;
3434
import rx.observers.TestSubscriber;
3535
import rx.schedulers.Schedulers;
3636

@@ -337,4 +337,67 @@ public void run() {
337337
}
338338
}
339339
}
340+
@Test
341+
public void testReplaySubjectEmissionSubscriptionRace() throws Exception {
342+
Scheduler s = Schedulers.io();
343+
Scheduler.Worker worker = Schedulers.io().createWorker();
344+
for (int i = 0; i < 50000; i++) {
345+
if (i % 1000 == 0) {
346+
System.out.println(i);
347+
}
348+
final ReplaySubject<Object> rs = ReplaySubject.createWithSize(2);
349+
350+
final CountDownLatch finish = new CountDownLatch(1);
351+
final CountDownLatch start = new CountDownLatch(1);
352+
353+
worker.schedule(new Action0() {
354+
@Override
355+
public void call() {
356+
try {
357+
start.await();
358+
} catch (Exception e1) {
359+
e1.printStackTrace();
360+
}
361+
rs.onNext(1);
362+
}
363+
});
364+
365+
final AtomicReference<Object> o = new AtomicReference<Object>();
366+
367+
rs.subscribeOn(s).observeOn(Schedulers.io())
368+
.subscribe(new Observer<Object>() {
369+
370+
@Override
371+
public void onCompleted() {
372+
o.set(-1);
373+
finish.countDown();
374+
}
375+
376+
@Override
377+
public void onError(Throwable e) {
378+
o.set(e);
379+
finish.countDown();
380+
}
381+
382+
@Override
383+
public void onNext(Object t) {
384+
o.set(t);
385+
finish.countDown();
386+
}
387+
388+
});
389+
start.countDown();
390+
391+
if (!finish.await(5, TimeUnit.SECONDS)) {
392+
System.out.println(o.get());
393+
System.out.println(rs.hasObservers());
394+
rs.onCompleted();
395+
Assert.fail("Timeout @ " + i);
396+
break;
397+
} else {
398+
Assert.assertEquals(1, o.get());
399+
rs.onCompleted();
400+
}
401+
}
402+
}
340403
}

0 commit comments

Comments
 (0)