AbstractOnSubscribe throws exception when onNext contains a request for more #2853

Closed
@davidmoten

AbstractOnSubscribe improperly throws an exception when a subscriber's onNext method calls request, even though all calls are synchronous. The error thrown is:

java.lang.IllegalStateException: This is not reentrant nor threadsafe!
    at rx.observables.AbstractOnSubscribe$SubscriptionState.use(AbstractOnSubscribe.java:590)
    at rx.observables.AbstractOnSubscribe$SubscriptionProducer.doNext(AbstractOnSubscribe.java:360)
    at rx.observables.AbstractOnSubscribe$SubscriptionProducer.request(AbstractOnSubscribe.java:345)
    at rx.Subscriber.request(Subscriber.java:145)
    at rx.observables.AbstractOnSubscribeTest$20.onNext(AbstractOnSubscribeTest.java:541)
    at rx.observables.AbstractOnSubscribeTest$20.onNext(AbstractOnSubscribeTest.java:1)
    at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:130)
    at rx.observables.AbstractOnSubscribe$SubscriptionState.accept(AbstractOnSubscribe.java:533)
    at rx.observables.AbstractOnSubscribe$SubscriptionProducer.doNext(AbstractOnSubscribe.java:367)
    at rx.observables.AbstractOnSubscribe$SubscriptionProducer.request(AbstractOnSubscribe.java:337)
    at rx.Subscriber.setProducer(Subscriber.java:175)
    at rx.Subscriber.setProducer(Subscriber.java:171)
    at rx.observables.AbstractOnSubscribe.call(AbstractOnSubscribe.java:191)
    at rx.observables.AbstractOnSubscribe.call(AbstractOnSubscribe.java:1)
    at rx.Observable.subscribe(Observable.java:7585)
    at rx.observables.AbstractOnSubscribeTest.testCanRequestInOnNext(AbstractOnSubscribeTest.java:527)

Here's a failing unit test:

    @Test
    public void testCanRequestInOnNext() {
        AbstractOnSubscribe<Integer, Void> aos = new AbstractOnSubscribe<Integer, Void>() {
            @Override
            protected void next(SubscriptionState<Integer, Void> state) {
                state.onNext(1);
                state.onCompleted();
            }
        };
        final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
        aos.toObservable().subscribe(new Subscriber<Integer>() {

            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {
                exception.set(e);
            }

            @Override
            public void onNext(Integer t) {
                // Synchronous request from inside onNext: this re-enters the producer
                request(1);
            }
        });
        // Print the stack trace for diagnosis before the assertion fails
        if (exception.get() != null) {
            exception.get().printStackTrace();
        }
        assertNull(exception.get());
    }
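
For comparison, here is a minimal sketch of one common way a producer can tolerate a request made from inside onNext. It is not AbstractOnSubscribe's actual code and does not depend on RxJava: `ReentrantRequestSketch`, `CountingProducer`, and `demo` are hypothetical names used only for illustration. The idea is that only the call that moves the outstanding-request counter away from zero runs the emission loop; a reentrant call just adds to the counter and returns, and the running loop picks up the extra demand. Overflow of the counter is not handled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntConsumer;

public class ReentrantRequestSketch {

    /** Hypothetical producer that emits 1..limit; not part of RxJava. */
    static final class CountingProducer {
        private final AtomicLong requested = new AtomicLong();
        private final int limit;
        private final IntConsumer downstream;
        private int next = 1; // only touched by whoever owns the drain loop

        CountingProducer(int limit, IntConsumer downstream) {
            this.limit = limit;
            this.downstream = downstream;
        }

        void request(long n) {
            if (n <= 0) {
                return;
            }
            // A non-zero previous value means a drain loop is already running
            // (possibly further up this very call stack), so just record demand.
            if (requested.getAndAdd(n) != 0) {
                return;
            }
            for (;;) {
                long r = requested.get();
                long emitted = 0;
                while (emitted < r && next <= limit) {
                    downstream.accept(next++); // may call request() reentrantly
                    emitted++;
                }
                if (next > limit) {
                    // Finished: the counter stays non-zero, so later requests are no-ops.
                    return;
                }
                // Subtract what was delivered; continue if more demand arrived meanwhile.
                if (requested.addAndGet(-emitted) == 0) {
                    return;
                }
            }
        }
    }

    /** Mirrors the failing test: every onNext asks for one more item. */
    static List<Integer> demo() {
        List<Integer> received = new ArrayList<>();
        CountingProducer[] ref = new CountingProducer[1];
        ref[0] = new CountingProducer(3, value -> {
            received.add(value);
            ref[0].request(1); // reentrant request, as in testCanRequestInOnNext
        });
        ref[0].request(1);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With this shape the reentrant request never reaches the emission code a second time, so there is nothing for a reentrancy guard to reject.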
