Skip to content

Concurrent subscription to ReplaySubject leads to bogus onNext(NotificationLite) signals #7878

Open
@joepembe

Description

@joepembe

As far as I know, the reactive streams specification imposes no restrictions on when or from which thread(s) a subscriber may subscribe to a source. However, if a subscriber subscribes to a ReplaySubject in parallel with the subject's upstream issuing a legal sequence of signals to that same ReplaySubject, the ReplaySubject can erroneously release an onNext signal carrying a NotificationLite value.

See the following repro:

@Test
public void normal() throws Exception {
    for (int i = 1; i <= 1000000; i++) {
        Subject<String> source = ReplaySubject.create();
        Subject<String> sink = PublishSubject.create();
        TestObserver<String> observer = sink.test();
        Schedulers.computation().scheduleDirect(() -> {
            // issue signals to the source in adherence to the reactive streams specification
            source.onSubscribe(Disposable.empty());
            source.onNext("hello");
            source.onNext("world");
            source.onComplete();
        });
        Schedulers.computation().scheduleDirect(() -> {
            // connect the source to the sink in parallel with the signals issued to the source
            // note the cast() operator, which is here to detect non-String escapees
            source.cast(String.class).subscribe(sink);
        });
        observer.await().assertValues("hello", "world").assertComplete();
    }
}

When I run this, I get the following as output:

ReplaySubjectTest > normal() FAILED
    java.lang.AssertionError: Not completed (latch = 0, values = 2, errors = 1, completions = 0)
        at io.reactivex.rxjava3.observers.BaseTestConsumer.fail(BaseTestConsumer.java:128)
        at io.reactivex.rxjava3.observers.BaseTestConsumer.assertComplete(BaseTestConsumer.java:181)
        at joepembe.ReplaySubjectTest.normal(ReplaySubjectTest.java:57)

        Caused by:
        java.lang.ClassCastException: Cannot cast io.reactivex.rxjava3.internal.util.NotificationLite to java.lang.String
            at java.base/java.lang.Class.cast(Class.java:3606)
            at io.reactivex.rxjava3.internal.functions.Functions$CastToClass.apply(Functions.java:235)
            at io.reactivex.rxjava3.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:58)
            at io.reactivex.rxjava3.subjects.ReplaySubject$UnboundedReplayBuffer.replay(ReplaySubject.java:791)
            at io.reactivex.rxjava3.subjects.ReplaySubject.subscribeActual(ReplaySubject.java:344)
            at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13263)
            at io.reactivex.rxjava3.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
            at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13263)
            at joepembe.ReplaySubjectTest.lambda$normal$3(ReplaySubjectTest.java:55)

The same behavior occurs even when using SerializedSubject (not that I would expect anything different to occur, as SerializedSubject merely forwards the subscribe() invocation to the "real" subject):

for (int i = 1; i <= 1000000; i++) {
    Subject<String> source = ReplaySubject.<String>create().toSerialized();
    Subject<String> sink = PublishSubject.<String>create().toSerialized();
    TestObserver<String> observer = sink.test();
    Schedulers.computation().scheduleDirect(() -> {
        // issue signals to the source in adherence to the reactive streams specification
        source.onSubscribe(Disposable.empty());
        source.onNext("hello");
        source.onNext("world");
        source.onComplete();
    });
    Schedulers.computation().scheduleDirect(() -> {
        // connect the source to the sink in parallel with the signals issued to the source
        // note the cast() operator, which is here to detect non-String escapees
        source.cast(String.class).subscribe(sink);
    });
    observer.await().assertValues("hello", "world").assertComplete();
}

Yields:

ReplaySubjectTest > serialized() FAILED
    java.lang.AssertionError: Not completed (latch = 0, values = 2, errors = 1, completions = 0)
        at io.reactivex.rxjava3.observers.BaseTestConsumer.fail(BaseTestConsumer.java:128)
        at io.reactivex.rxjava3.observers.BaseTestConsumer.assertComplete(BaseTestConsumer.java:181)
        at joepembe.ReplaySubjectTest.serialized(ReplaySubjectTest.java:33)

        Caused by:
        java.lang.ClassCastException: Cannot cast io.reactivex.rxjava3.internal.util.NotificationLite to java.lang.String
            at java.base/java.lang.Class.cast(Class.java:3606)
            at io.reactivex.rxjava3.internal.functions.Functions$CastToClass.apply(Functions.java:235)
            at io.reactivex.rxjava3.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:58)
            at io.reactivex.rxjava3.subjects.ReplaySubject$UnboundedReplayBuffer.replay(ReplaySubject.java:791)
            at io.reactivex.rxjava3.subjects.ReplaySubject.subscribeActual(ReplaySubject.java:344)
            at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13263)
            at io.reactivex.rxjava3.subjects.SerializedSubject.subscribeActual(SerializedSubject.java:49)
            at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13263)
            at io.reactivex.rxjava3.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
            at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13263)
            at joepembe.ReplaySubjectTest.lambda$serialized$1(ReplaySubjectTest.java:31)

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions