Bug: shared Observables not actually repeatable/retryable #678

Closed
staltz opened this issue Nov 9, 2015 · 17 comments
Labels
bug Confirmed bug

Comments

@staltz
Member

staltz commented Nov 9, 2015

var source = Rx.Observable.interval(100).take(3)
  .concat(Rx.Observable.throw(new Error('BOOM')))
  .share();

source.retry(3).subscribe(
  x => console.log(x), 
  e => console.error(e.message), 
  () => console.log('|'));

source.retry(3).subscribe(
  x => console.log('   '+x), 
  e => console.error('   '+e.message), 
  () => console.log('   |'));

Runs as

0
   0
1
   1
2
   2

while we expected

0
   0
1
   1
2
   2
0
   0
1
   1
2
   2
0
   0
1
   1
2
   2
BOOM
   BOOM

I'm investigating some solutions in ConnectableObservable to be able to support @Blesh's use cases for retryable/repeatable Observables, but it seems like this is a bug.

@benlesh
Member

benlesh commented Nov 9, 2015

Weird, I thought I had tests around that from the beginning. It was probably my most recent changes to that code that broke the behavior. I was using share and retry in my AngularConnect example (alpha.7 I think), and it was fine.

@staltz
Member Author

staltz commented Nov 9, 2015

We have this test:

  it('should retry just fine', function () {
    var e1 =  cold('---a--b--c--d--e--#');
    var expected = '---a--b--c--d--e-----a--b--c--d--e--#';

    expectObservable(e1.share().retry(1)).toBe(expected);
  })

But notice how there is just one subscriber in this case.

@staltz
Member Author

staltz commented Nov 9, 2015

Also, this test doesn't have multiple subscribers: https://github.com/ReactiveX/RxJS/blob/master/spec/operators/multicast-spec.js#L88

@benlesh
Member

benlesh commented Nov 9, 2015

Hmm... it looks like retryWhen works fine, but the other two do not. My example was using retryWhen.

It doesn't seem like a problem with ConnectableObservable. My suspicion lies with the Subscribers for repeat and retry: I suspect they're being disabled by a flag when the error or completion hits them.

@benlesh
Member

benlesh commented Nov 9, 2015

Also, this test doesn't have multiple subscribers: https://github.com/ReactiveX/RxJS/blob/master/spec/operators/multicast-spec.js#L88

It looks like it resubscribes whenever it completes: https://github.com/ReactiveX/RxJS/blob/e9b13d0178a7ebad2cccd217fc0249449804831d/spec/operators/multicast-spec.js#L102

(also, pro-tip: Hit Y before you copy a link in GitHub to get the SHA in the URL so it doesn't change over time.)

@staltz
Member Author

staltz commented Nov 10, 2015

Ok, I'm narrowing down the problem. RetryWhen works because it usually retries asynchronously some time later. Retry is synchronous, and a race condition happens with RefCounting.

UPDATE: I changed this message to include more details

var source = Rx.Observable.create(observer => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.error(new Error('BOOM'));
}).share();

// Subscriber A
source.retry(1).subscribe(
  x => console.log(x), 
  e => console.error(e.message), 
  () => console.log('|'));

// Subscriber B
source.retry(1).subscribe(
  x => console.log('   '+x), 
  e => console.error('   '+e.message), 
  () => console.log('   |'));

  1. A subscribes to ...refCount().retry(1) (refCount counter becomes 1)
  2. Subject1 is created and cold source is subscribed.
  3. B subscribes to ...refCount().retry(1) (refCount counter becomes 2)
  4. Subject1 raises error to A's RefCountSubscriber (refCount counter becomes 1)
  5. A's Retry kicks in and re-subscribes (refCount counter becomes 2)
  6. Subject1 raises error to B's RefCountSubscriber (refCount counter becomes 1)
  7. B's Retry kicks in and re-subscribes (refCount counter becomes 2)

As you can see, the refCount counter never goes to zero, which means the cold source is not resubscribed.
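
The counter bookkeeping in the steps above can be replayed with a small model. This is only a sketch: `traceRefCount`, `subscribe` and `errorThenRetry` are hypothetical names, and the code models nothing but the counter (not RxJS's actual RefCountSubscriber). It assumes the cold source is subscribed only on the 0 -> 1 transition.

```javascript
// Hypothetical model of the refCount counter only; not the RxJS implementation.
function traceRefCount() {
  let refCount = 0;
  let sourceSubscriptions = 0;
  const trace = [];

  // Subscribing increments the counter; the cold source is subscribed
  // only when the counter goes from 0 to 1.
  function subscribe(name) {
    refCount += 1;
    if (refCount === 1) {
      sourceSubscriptions += 1;
    }
    trace.push(`${name} subscribes -> ${refCount}`);
  }

  // An error removes one subscriber (counter decrements), and retry
  // resubscribes synchronously before anyone else is notified.
  function errorThenRetry(name) {
    refCount -= 1;
    trace.push(`${name} errors -> ${refCount}`);
    subscribe(name);
  }

  subscribe('A');      // steps 1-2
  subscribe('B');      // step 3
  errorThenRetry('A'); // steps 4-5
  errorThenRetry('B'); // steps 6-7

  return { trace, sourceSubscriptions, finalCount: refCount };
}

const refCountTrace = traceRefCount();
console.log(refCountTrace.trace.join('\n'));
console.log('cold source subscriptions:', refCountTrace.sourceSubscriptions);
```

In this model the counter moves 1, 2, 1, 2, 1, 2 and the cold source is subscribed exactly once, which is the symptom reported above.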

Working on a solution...

@staltz
Member Author

staltz commented Nov 10, 2015

I tried a solution where the RefCountSubscriber resets the counter when it encounters an error, but I still have problems with retry's synchronous behavior kicking in too soon (because Subject errors are propagated depth-first to observers, not breadth-first):

  1. A subscribes to ...refCount().retry(1) (refCount counter becomes 1)
  2. Subject1 is created and cold source is subscribed.
  3. B subscribes to ...refCount().retry(1) (refCount counter becomes 2)
  4. Subject1 raises error to A's RefCountSubscriber (refCount counter is RESET to 0)
  5. A's Retry kicks in and re-subscribes (refCount counter becomes 1)
  6. Subject2 is created and cold source is re-subscribed.
  7. Subject1 raises error to B's RefCountSubscriber (refCount counter is RESET to 0)
  8. B's Retry kicks in and re-subscribes (refCount counter becomes 1)
  9. Subject3 is created and cold source is re-subscribed.

Conclusion: only subscriber B actually performs a retry, the subscriber A doesn't because it gets interrupted by "subject1.observer[subscriberB].error()".
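
The numbered steps above can be sketched as a toy model. The names here (`simulateResetOnError`, `raiseError`, `attachedTo`) are hypothetical, and the model only tracks the counter and which subject each subscriber ends up attached to. It assumes a new subject is created on every 0 -> 1 transition and that errors are delivered depth-first, so each observer's reset and synchronous retry run to completion before the next observer is notified.

```javascript
// Hypothetical model of "reset counter on error" with depth-first error delivery.
function simulateResetOnError() {
  let refCount = 0;
  let subjectsCreated = 0;
  let currentSubject = null;
  const attachedTo = {}; // subscriber name -> id of the subject it listens to
  const trace = [];

  function subscribe(name) {
    refCount += 1;
    if (refCount === 1) {
      // 0 -> 1 transition: new subject, cold source (re)subscribed.
      subjectsCreated += 1;
      currentSubject = { id: subjectsCreated, observers: [] };
      trace.push(`Subject${currentSubject.id} created`);
    }
    currentSubject.observers.push(name);
    attachedTo[name] = currentSubject.id;
    trace.push(`${name} on Subject${currentSubject.id} (refCount ${refCount})`);
  }

  function raiseError(subject) {
    // Depth-first: finish one observer's reset and retry before the next.
    for (const name of subject.observers.slice()) {
      refCount = 0;
      trace.push(`Subject${subject.id} errors to ${name} (refCount reset to 0)`);
      subscribe(name); // synchronous retry
    }
  }

  subscribe('A');
  subscribe('B');
  raiseError(currentSubject); // Subject1 errors

  return { trace, subjectsCreated, attachedTo };
}

const resetRun = simulateResetOnError();
console.log(resetRun.trace.join('\n'));
```

Under these assumptions three subjects are created, and A and B end up on different subjects (Subject2 and Subject3), so they no longer share one execution of the source.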

A sloppy solution for this would be to change retry to resubscribe on next tick. But it's sloppy. Any suggestions @trxcllnt @Blesh ?
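
For comparison, here is a rough sketch of what deferring the resubscription would change. It is a toy model, not a proposed patch: `simulateDeferredRetry` and the `pending` queue are hypothetical, and the queue merely stands in for "next tick" so that the example stays deterministic.

```javascript
// Hypothetical model: reset the counter on error, but run retries later.
function simulateDeferredRetry() {
  let refCount = 0;
  let subjectsCreated = 0;
  let currentSubject = null;
  const attachedTo = {}; // subscriber name -> id of the subject it listens to
  const pending = [];    // stands in for work scheduled on the next tick

  function subscribe(name) {
    refCount += 1;
    if (refCount === 1) {
      subjectsCreated += 1;
      currentSubject = { id: subjectsCreated, observers: [] };
    }
    currentSubject.observers.push(name);
    attachedTo[name] = currentSubject.id;
  }

  function raiseError(subject) {
    for (const name of subject.observers.slice()) {
      refCount = 0;                        // reset on error
      pending.push(() => subscribe(name)); // retry later, not right now
    }
  }

  subscribe('A');
  subscribe('B');
  raiseError(currentSubject);

  // Every observer has now seen the error; only then do the retries run.
  while (pending.length > 0) {
    pending.shift()();
  }

  return { subjectsCreated, attachedTo, refCount };
}

const deferredRun = simulateDeferredRetry();
console.log(deferredRun);
```

In this model only two subjects are created and both subscribers land on Subject2, at the cost of making every retry asynchronous.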

@staltz
Member Author

staltz commented Nov 12, 2015

Update to this issue:

I've been exploring how RxJava solves this problem. (I verified that RxJava gives the correct expected output for the same code snippet in this issue.)

My current guess is that it all has to do with refCount and ConnectableObservable; we might need to treat synchronous Observables as a special case. E.g.

public final class OnSubscribeRefCount<T> implements OnSubscribe<T> {
    @Override
    public void call(final Subscriber<? super T> subscriber) {

        lock.lock();
        if (subscriptionCount.incrementAndGet() == 1) {

            final AtomicBoolean writeLocked = new AtomicBoolean(true);

            try {
                // need to use this overload of connect to ensure that
                // baseSubscription is set in the case that source is a
                // synchronous Observable
                source.connect(onSubscribe(subscriber, writeLocked));
            } finally {
                // need to cover the case where the source is subscribed to
                // outside of this class thus preventing the Action1 passed
                // to source.connect above being called
                if (writeLocked.get()) {
                    // Action1 passed to source.connect was not called
                    lock.unlock();
                }
            }
        } else {
            try {
                // ready to subscribe to source so do it
                doSubscribe(subscriber, baseSubscription);
            } finally {
                // release the read lock
                lock.unlock();
            }
        }
    }
}

@staltz
Member Author

staltz commented Nov 12, 2015

Also, I confirmed that the RefCountSubscriber has to reset the counter when it encounters an error, because RxJava does it too; see cleanup():

public final class OnSubscribeRefCount<T> implements OnSubscribe<T> {
    void doSubscribe(final Subscriber<? super T> subscriber, final CompositeSubscription currentBase) {
        // handle unsubscribing from the base subscription
        subscriber.add(disconnect(currentBase));

        source.unsafeSubscribe(new Subscriber<T>(subscriber) {
            @Override
            public void onError(Throwable e) {
                cleanup();
                subscriber.onError(e);
            }
            @Override
            public void onNext(T t) {
                subscriber.onNext(t);
            }
            @Override
            public void onCompleted() {
                cleanup();
                subscriber.onCompleted();
            }
            void cleanup() {
                // on error or completion we need to unsubscribe the base subscription
                // and set the subscriptionCount to 0 
                lock.lock();
                try {
                    if (baseSubscription == currentBase) {
                        baseSubscription.unsubscribe();
                        baseSubscription = new CompositeSubscription();
                        subscriptionCount.set(0);
                    }
                } finally {
                    lock.unlock();
                }
            }
        });
    }
}

@staltz
Member Author

staltz commented Nov 12, 2015

Another mind dump:

I noticed RxJava does the following, and RxJS doesn't, but should. It seems clear that the refCount producer should synchronously emit everything it has to the first subscriber, then error, then retry, and only after that start the same sequence for the second subscriber. In other words, RxJS subscribers are interleaved but should execute serially when the source is entirely synchronous.

  1. A subscribes to ...refCount().retry(1) (refCount counter becomes 1)
  2. Subject1 is created and cold source is subscribed.
  3. Subject1 raises error to A's RefCountSubscriber (refCount counter is RESET to 0)
  4. A's Retry kicks in and re-subscribes (refCount counter becomes 1)
  5. Subject2 is created and cold source is re-subscribed.
  6. Subject2 raises error to A's RefCountSubscriber (refCount counter is RESET to 0)
  7. - - - - - - - - - - -
  8. B subscribes to ...refCount().retry(1) (refCount counter becomes 1)
  9. Subject3 is created and cold source is re-subscribed.
  10. Subject3 raises error to B's RefCountSubscriber (refCount counter is RESET to 0)
  11. B's Retry kicks in and re-subscribes (refCount counter becomes 1)
  12. Subject4 is created and cold source is re-subscribed.
  13. Subject4 raises error to B's RefCountSubscriber (refCount counter is RESET to 0)

0
1
2
0
1
2
BOOM
   0
   1
   2
   0
   1
   2
   BOOM
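
The serial behavior described above can be reproduced with a toy model. This is a sketch under stated assumptions, not the RxJS or RxJava implementation: `coldSource`, `share` and `retry` below are hypothetical stand-ins written as plain functions, the source is fully synchronous, and the shared state is reset before the error is forwarded.

```javascript
// Cold, fully synchronous source: emits 0, 1, 2 and then errors.
function coldSource(observer) {
  for (let i = 0; i < 3; i += 1) observer.next(i);
  observer.error(new Error('BOOM'));
}

// Hypothetical share(): ref-counted, resets its state when the source errors.
function share(source) {
  let refCount = 0;
  let observers = [];
  return function subscribe(observer) {
    refCount += 1;
    observers.push(observer);
    if (refCount === 1) {
      const connected = observers;
      source({
        next: (x) => connected.forEach((o) => o.next(x)),
        error: (e) => {
          // Reset first, so a synchronous retry reconnects to the source.
          refCount = 0;
          observers = [];
          connected.forEach((o) => o.error(e));
        },
      });
    }
  };
}

// Hypothetical retry(): resubscribes synchronously up to `count` times.
function retry(subscribe, count) {
  return function (observer) {
    let remaining = count;
    const attempt = () =>
      subscribe({
        next: (x) => observer.next(x),
        error: (e) => {
          if (remaining > 0) {
            remaining -= 1;
            attempt();
          } else {
            observer.error(e);
          }
        },
      });
    attempt();
  };
}

const lines = [];
const shared = share(coldSource);

// Subscriber A
retry(shared, 1)({
  next: (x) => lines.push(String(x)),
  error: (e) => lines.push(e.message),
});

// Subscriber B
retry(shared, 1)({
  next: (x) => lines.push('   ' + x),
  error: (e) => lines.push('   ' + e.message),
});

console.log(lines.join('\n'));
```

Because the source finishes (including A's retry) before B's subscribe call even starts, the two subscribers run one after the other rather than interleaved.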

@trxcllnt
Member

@staltz In other words, RxJS subscribers are interleaved but should execute serially when the source is entirely synchronous.

Yes, I was wondering about this myself. Let me know if you'd like me to take a look at this as well.

@staltz
Member Author

staltz commented Nov 12, 2015

I solved that one already, using this:

  /**
   * Instructs the ConnectableObservable to begin emitting the items from its
   * underlying source to its Subscribers.
   *
   * @param onSubscribe a function that receives the connection subscription
   * before the subscription to source happens, allowing the caller to
   * synchronously disconnect a synchronous source.
   */
  _callbackConnect(onSubscribe: (subscription: Subscription<T>) => void): void {
    let subscription = this.subscription;
    if (subscription && !subscription.isUnsubscribed) {
      onSubscribe(subscription);
      return;
    }
    this.subscription = subscription = new Subscription();
    onSubscribe(subscription);
    subscription.add(this.source.subscribe(this._getSubject()));
    subscription.add(new ConnectableSubscription(this));
  }

As per RxJava.
Now I'm on to solving the main bug described in this issue.

@benlesh
Member

benlesh commented Nov 12, 2015

@staltz and @trxcllnt ... we have a bit of unfinished business with the es7 observable spec that relates to this, I think.

We need to add a start event to Observer interface that fires on subscription and provides the subscriber. This seems pretty related to the onSubscribe event in the commit above.
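
To illustrate why such a hook matters for synchronous sources, here is a minimal sketch. Everything in it is an assumption for illustration: `subscribeWithStart` is a hypothetical helper, and it passes a subscription object to `start`, which may differ from what the spec ends up providing.

```javascript
// Hypothetical helper: hands the subscription to observer.start()
// before the producer is run.
function subscribeWithStart(producer, observer) {
  const subscription = {
    closed: false,
    unsubscribe() {
      this.closed = true;
    },
  };
  if (observer.start) observer.start(subscription);
  if (!subscription.closed) {
    producer({
      next(value) {
        // Drop values once the consumer has unsubscribed.
        if (!subscription.closed) observer.next(value);
      },
    });
  }
  return subscription;
}

// Synchronous producer: emits 1, 2, 3 before subscribeWithStart() returns.
function syncProducer(sink) {
  [1, 2, 3].forEach((value) => sink.next(value));
}

const received = [];
let handle;

subscribeWithStart(syncProducer, {
  start(subscription) {
    handle = subscription; // available before the first value arrives
  },
  next(value) {
    received.push(value);
    if (value === 2) handle.unsubscribe();
  },
});

console.log(received); // received is [1, 2]
```

Without `start`, the return value of the subscribe call would only be available after the synchronous producer had already emitted everything, so the consumer could not stop it at 2.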

staltz added a commit to staltz/RxJSNext that referenced this issue Nov 13, 2015
…and refCounting

Fix ConnectableObservable, its connect() method, and the RefCountObservable to support synchronous
retry/repeat in the presence of multiple subscribers, and to support retry/repeat in other
asynchronous scenarios. This commit is a major rework of ConnectableObservable.

Resolves bug ReactiveX#678.
@benlesh
Member

benlesh commented Nov 14, 2015

@staltz, once #726 lands, we may be able to leverage the start handler and create a different approach to this. Also @abersnaze had mentioned a method of doing multicast refCounting that used a custom subject with some special logic in the start method. Food for thought.

staltz added a commit to staltz/RxJSNext that referenced this issue Nov 19, 2015
…and refCounting

Fix ConnectableObservable, its connect() method, and the RefCountObservable to support synchronous
retry/repeat in the presence of multiple subscribers, and to support retry/repeat in other
asynchronous scenarios. This commit is a major rework of ConnectableObservable.

Resolves bug ReactiveX#678.
staltz added a commit to staltz/RxJSNext that referenced this issue Nov 19, 2015
…antics

When the ConnectableObservable with refCount always shares the same instance of the underlying
subject (such as in publish, publishReplay, publishBehavior), the subscription to the connectable
observable should NOT incur additional subscriptions to the underlying cold source. See how tests
for publish/publishBehavior/publishReplay were updated to assert that only one subscription to the
underlying cold source happens, not multiple, because as soon as the multicasting subject raises an
error, this error impedes subsequent subscriptions to the cold source from happening.

Related to ReactiveX#678.
staltz added a commit to staltz/RxJSNext that referenced this issue Nov 25, 2015
…and refCounting

When the ConnectableObservable with refCount always shares the same instance of
the underlying subject (such as in publish, publishReplay, publishBehavior), the
subscription to the connectable observable should NOT incur additional subscriptions
to the underlying cold source. See how tests for
publish/publishBehavior/publishReplay were updated to assert that only one
subscription to the underlying cold source happens, not multiple, because as soon
as the multicasting subject raises an error, this error impedes subsequent
subscriptions to the cold source from happening.

Fix ConnectableObservable, its connect() method, and the RefCountObservable to
support synchronous retry/repeat in the presence of multiple subscribers, and to
support retry/repeat in other asynchronous scenarios.

Resolves bug ReactiveX#678.
benlesh pushed a commit that referenced this issue Nov 25, 2015
…and refCounting

When the ConnectableObservable with refCount always shares the same instance of
the underlying subject (such as in publish, publishReplay, publishBehavior), the
subscription to the connectable observable should NOT incur additional subscriptions
to the underlying cold source. See how tests for
publish/publishBehavior/publishReplay were updated to assert that only one
subscription to the underlying cold source happens, not multiple, because as soon
as the multicasting subject raises an error, this error impedes subsequent
subscriptions to the cold source from happening.

Fix ConnectableObservable, its connect() method, and the RefCountObservable to
support synchronous retry/repeat in the presence of multiple subscribers, and to
support retry/repeat in other asynchronous scenarios.

Resolves bug #678.
@benlesh
Member

benlesh commented Dec 3, 2015

@staltz are you satisfied that this can be closed now?

@staltz
Member Author

staltz commented Dec 4, 2015

Definitely can be closed.

@lock

lock bot commented Jun 7, 2018

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.

@lock lock bot locked as resolved and limited conversation to collaborators Jun 7, 2018