As far as I know, the Reactive Streams specification imposes no restrictions on when or from which thread(s) a subscriber may subscribe to a source. However, if a subscriber subscribes to a ReplaySubject in parallel with the subject's upstream issuing a legal sequence of signals to that same ReplaySubject, the ReplaySubject can erroneously release an onNext signal carrying a NotificationLite value.
See the following repro:
@Test
public void normal() throws Exception {
    for (int i = 1; i <= 1000000; i++) {
        Subject<String> source = ReplaySubject.create();
        Subject<String> sink = PublishSubject.create();
        TestObserver<String> observer = sink.test();
        Schedulers.computation().scheduleDirect(() -> {
            // issue signals to the source in adherence to the reactive streams specification
            source.onSubscribe(Disposable.empty());
            source.onNext("hello");
            source.onNext("world");
            source.onComplete();
        });
        Schedulers.computation().scheduleDirect(() -> {
            // connect the source to the sink in parallel with the signals issued to the source
            // note the cast() operator, which is here to detect non-String escapees
            source.cast(String.class).subscribe(sink);
        });
        observer.await().assertValues("hello", "world").assertComplete();
    }
}
When I run this, I get the following as output:
ReplaySubjectTest > normal() FAILED
    java.lang.AssertionError: Not completed (latch = 0, values = 2, errors = 1, completions = 0)
        at io.reactivex.rxjava3.observers.BaseTestConsumer.fail(BaseTestConsumer.java:128)
        at io.reactivex.rxjava3.observers.BaseTestConsumer.assertComplete(BaseTestConsumer.java:181)
        at joepembe.ReplaySubjectTest.normal(ReplaySubjectTest.java:57)
        Caused by:
        java.lang.ClassCastException: Cannot cast io.reactivex.rxjava3.internal.util.NotificationLite to java.lang.String
            at java.base/java.lang.Class.cast(Class.java:3606)
            at io.reactivex.rxjava3.internal.functions.Functions$CastToClass.apply(Functions.java:235)
            at io.reactivex.rxjava3.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:58)
            at io.reactivex.rxjava3.subjects.ReplaySubject$UnboundedReplayBuffer.replay(ReplaySubject.java:791)
            at io.reactivex.rxjava3.subjects.ReplaySubject.subscribeActual(ReplaySubject.java:344)
            at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13263)
            at io.reactivex.rxjava3.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
            at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13263)
            at joepembe.ReplaySubjectTest.lambda$normal$3(ReplaySubjectTest.java:55)
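The ClassCastException in this trace is raised by Class.cast inside the cast() operator. As a rough, JDK-only sketch of why that guard is needed to notice the escapee at all: generic type parameters are erased at run time, so a foreign object can sit in a container typed for String without any error until something performs a checked cast. The names CastGuardSketch and Marker are hypothetical and exist only for this illustration; Marker merely stands in for an internal marker object and is not an RxJava class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these names come from RxJava.
final class CastGuardSketch {

    // Stand-in for an internal marker object that should never reach user code.
    enum Marker { COMPLETE }

    // Puts a non-String object into a List<String> through a raw reference.
    // Because of type erasure this succeeds silently (heap pollution).
    @SuppressWarnings({"rawtypes", "unchecked"})
    static List<String> pollutedList() {
        List<String> typed = new ArrayList<>();
        List raw = typed;
        raw.add(Marker.COMPLETE);
        return typed;
    }

    // Returns true if a checked cast to the given type rejects the value.
    static boolean rejectedByCast(Object value, Class<?> type) {
        try {
            type.cast(value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> list = pollutedList();
        // The marker was accepted without any exception.
        System.out.println("size = " + list.size());
        // Reading the element as Object avoids the compiler-inserted cast to String.
        Object first = list.get(0);
        // Only an explicit checked cast exposes the escapee.
        System.out.println("rejected = " + rejectedByCast(first, String.class));
    }
}
```

Without a checked cast somewhere downstream, a consumer that only treats the values as Object might never see an error.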
The same behavior occurs even when using SerializedSubject (not that I would expect anything different to occur, as SerializedSubject merely forwards the subscribe() invocation to the "real" subject):
for (int i = 1; i <= 1000000; i++) {
    Subject<String> source = ReplaySubject.<String>create().toSerialized();
    Subject<String> sink = PublishSubject.<String>create().toSerialized();
    TestObserver<String> observer = sink.test();
    Schedulers.computation().scheduleDirect(() -> {
        // issue signals to the source in adherence to the reactive streams specification
        source.onSubscribe(Disposable.empty());
        source.onNext("hello");
        source.onNext("world");
        source.onComplete();
    });
    Schedulers.computation().scheduleDirect(() -> {
        // connect the source to the sink in parallel with the signals issued to the source
        // note the cast() operator, which is here to detect non-String escapees
        source.cast(String.class).subscribe(sink);
    });
    observer.await().assertValues("hello", "world").assertComplete();
}
Yields:
ReplaySubjectTest > serialized() FAILED
    java.lang.AssertionError: Not completed (latch = 0, values = 2, errors = 1, completions = 0)
        at io.reactivex.rxjava3.observers.BaseTestConsumer.fail(BaseTestConsumer.java:128)
        at io.reactivex.rxjava3.observers.BaseTestConsumer.assertComplete(BaseTestConsumer.java:181)
        at joepembe.ReplaySubjectTest.serialized(ReplaySubjectTest.java:33)
        Caused by:
        java.lang.ClassCastException: Cannot cast io.reactivex.rxjava3.internal.util.NotificationLite to java.lang.String
            at java.base/java.lang.Class.cast(Class.java:3606)
            at io.reactivex.rxjava3.internal.functions.Functions$CastToClass.apply(Functions.java:235)
            at io.reactivex.rxjava3.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:58)
            at io.reactivex.rxjava3.subjects.ReplaySubject$UnboundedReplayBuffer.replay(ReplaySubject.java:791)
            at io.reactivex.rxjava3.subjects.ReplaySubject.subscribeActual(ReplaySubject.java:344)
            at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13263)
            at io.reactivex.rxjava3.subjects.SerializedSubject.subscribeActual(SerializedSubject.java:49)
            at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13263)
            at io.reactivex.rxjava3.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
            at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13263)
            at joepembe.ReplaySubjectTest.lambda$serialized$1(ReplaySubjectTest.java:31)
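In this trace, SerializedSubject.subscribeActual hands off to ReplaySubject.subscribeActual, which matches the point that serialization covers the signals but not the subscribe() call. To make that distinction concrete, here is a deliberately simplified, JDK-only model. It assumes a wrapper that serializes signals with a plain lock, which is a simplification and not a description of how SerializedSubject is actually written; ForwardingSketch, Inner, and SerializingWrapper are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model; it does not reproduce RxJava's implementation.
final class ForwardingSketch {

    // Minimal "real" subject that records whether the wrapper's lock
    // was held by the calling thread during each call.
    static final class Inner {
        final List<String> log = new ArrayList<>();
        Object wrapperLock; // set by the wrapper's constructor

        void onNext(String value) {
            log.add("onNext locked=" + Thread.holdsLock(wrapperLock));
        }

        void subscribe() {
            log.add("subscribe locked=" + Thread.holdsLock(wrapperLock));
        }
    }

    // Wrapper that serializes signals but merely forwards subscribe().
    static final class SerializingWrapper {
        final Inner actual;

        SerializingWrapper(Inner actual) {
            this.actual = actual;
            actual.wrapperLock = this;
        }

        void onNext(String value) {
            synchronized (this) { // signals are mutually exclusive
                actual.onNext(value);
            }
        }

        void subscribe() {
            actual.subscribe(); // forwarded as-is, no lock taken
        }
    }

    static List<String> run() {
        Inner inner = new Inner();
        SerializingWrapper wrapper = new SerializingWrapper(inner);
        wrapper.onNext("hello");
        wrapper.subscribe();
        return inner.log;
    }

    public static void main(String[] args) {
        // prints: [onNext locked=true, subscribe locked=false]
        System.out.println(run());
    }
}
```

In this model, whatever protects the signals from each other offers no protection against a concurrent subscribe(), so any subscribe-versus-signal race has to be handled by the inner subject itself.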