Closed
Description
`AbstractOnSubscribe` improperly throws an exception when a subscriber's `onNext`
method calls `request` (all calls are synchronous). The error thrown is:
java.lang.IllegalStateException: This is not reentrant nor threadsafe!
at rx.observables.AbstractOnSubscribe$SubscriptionState.use(AbstractOnSubscribe.java:590)
at rx.observables.AbstractOnSubscribe$SubscriptionProducer.doNext(AbstractOnSubscribe.java:360)
at rx.observables.AbstractOnSubscribe$SubscriptionProducer.request(AbstractOnSubscribe.java:345)
at rx.Subscriber.request(Subscriber.java:145)
at rx.observables.AbstractOnSubscribeTest$20.onNext(AbstractOnSubscribeTest.java:541)
at rx.observables.AbstractOnSubscribeTest$20.onNext(AbstractOnSubscribeTest.java:1)
at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:130)
at rx.observables.AbstractOnSubscribe$SubscriptionState.accept(AbstractOnSubscribe.java:533)
at rx.observables.AbstractOnSubscribe$SubscriptionProducer.doNext(AbstractOnSubscribe.java:367)
at rx.observables.AbstractOnSubscribe$SubscriptionProducer.request(AbstractOnSubscribe.java:337)
at rx.Subscriber.setProducer(Subscriber.java:175)
at rx.Subscriber.setProducer(Subscriber.java:171)
at rx.observables.AbstractOnSubscribe.call(AbstractOnSubscribe.java:191)
at rx.observables.AbstractOnSubscribe.call(AbstractOnSubscribe.java:1)
at rx.Observable.subscribe(Observable.java:7585)
at rx.observables.AbstractOnSubscribeTest.testCanRequestInOnNext(AbstractOnSubscribeTest.java:527)
Here's a failing unit test:
/**
 * Checks that a subscriber can call {@code request(1)} from inside its own
 * {@code onNext} without an error being delivered to {@code onError}.
 *
 * <p>The source is an {@code AbstractOnSubscribe} whose {@code next} emits the
 * value {@code 1} and then completes. Any throwable passed to the subscriber's
 * {@code onError} is captured, printed for diagnosis, and fails the test.
 */
@Test
public void testCanRequestInOnNext() {
    // Holds whatever error the subscriber receives, if any.
    final AtomicReference<Throwable> failure = new AtomicReference<Throwable>();

    // Source under test: one value, then completion.
    AbstractOnSubscribe<Integer, Void> source = new AbstractOnSubscribe<Integer, Void>() {
        @Override
        protected void next(SubscriptionState<Integer, Void> state) {
            state.onNext(1);
            state.onCompleted();
        }
    };

    // Subscriber that asks for more from within onNext.
    Subscriber<Integer> requestingSubscriber = new Subscriber<Integer>() {
        @Override
        public void onCompleted() {
            // nothing to do on completion
        }

        @Override
        public void onError(Throwable e) {
            failure.set(e);
        }

        @Override
        public void onNext(Integer t) {
            request(1);
        }
    };

    source.toObservable().subscribe(requestingSubscriber);

    Throwable captured = failure.get();
    if (captured != null) {
        // Print the trace so the cause is visible alongside the assertion failure.
        captured.printStackTrace();
    }
    assertNull(captured);
}