From 14522f844caca2d23c92b7a0a06c0bd86ccc6a01 Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Thu, 19 Nov 2015 16:35:27 +0200 Subject: [PATCH] fix(ConnectableObservable): fix refCount synchronous subscription semantics When the ConnectableObservable with refCount always shares the same instance of the underlying subject (such as in publish, publishReplay, publishBehavior), the subscription to the connectable observable should NOT incur additional subscriptions to the underlying cold source. See how tests for publish/publishBehavior/publishReplay were updated to assert that only one subscription to the underlying cold source happens, not multiple, because as soon as the multicasting subject raises an error, this error impedes subsequent subscriptions to the cold source from happening. Related to #678. --- spec/operators/publish-spec.js | 9 ++------- spec/operators/publishBehavior-spec.js | 9 ++------- spec/operators/publishReplay-spec.js | 9 ++------- src/observables/ConnectableObservable.ts | 13 +++++++------ 4 files changed, 13 insertions(+), 27 deletions(-) diff --git a/spec/operators/publish-spec.js b/spec/operators/publish-spec.js index 4e06f8f0f2b..3b9528f40f8 100644 --- a/spec/operators/publish-spec.js +++ b/spec/operators/publish-spec.js @@ -132,10 +132,7 @@ describe('Observable.prototype.publish()', function () { it('should NOT be retryable', function () { var source = cold('-1-2-3----4-#'); - var sourceSubs = ['^ !', - ' (^!)', - ' (^!)', - ' (^!)']; + var sourceSubs = '^ !'; var published = source.publish().refCount().retry(3); var subscriber1 = hot('a| ').mergeMapTo(published); var expected1 = '-1-2-3----4-#'; @@ -152,9 +149,7 @@ describe('Observable.prototype.publish()', function () { it('should NOT be repeatable', function () { var source = cold('-1-2-3----4-|'); - var sourceSubs = ['^ !', - ' (^!)', - ' (^!)']; + var sourceSubs = '^ !'; var published = source.publish().refCount().repeat(3); var subscriber1 = hot('a| ').mergeMapTo(published); var expected1 = '-1-2-3----4-|'; diff --git a/spec/operators/publishBehavior-spec.js b/spec/operators/publishBehavior-spec.js index ad6e2808eaf..881666ff1e4 100644 --- a/spec/operators/publishBehavior-spec.js +++ b/spec/operators/publishBehavior-spec.js @@ -131,10 +131,7 @@ describe('Observable.prototype.publishBehavior()', function () { it('should NOT be retryable', function () { var source = cold('-1-2-3----4-#'); - var sourceSubs = ['^ !', - ' (^!)', - ' (^!)', - ' (^!)']; + var sourceSubs = '^ !'; var published = source.publishBehavior('0').refCount().retry(3); var subscriber1 = hot('a| ').mergeMapTo(published); var expected1 = '01-2-3----4-#'; @@ -151,9 +148,7 @@ describe('Observable.prototype.publishBehavior()', function () { it('should NOT be repeatable', function () { var source = cold('-1-2-3----4-|'); - var sourceSubs = ['^ !', - ' (^!)', - ' (^!)']; + var sourceSubs = '^ !'; var published = source.publishBehavior('0').refCount().repeat(3); var subscriber1 = hot('a| ').mergeMapTo(published); var expected1 = '01-2-3----4-|'; diff --git a/spec/operators/publishReplay-spec.js b/spec/operators/publishReplay-spec.js index c61c786b889..794bea6920b 100644 --- a/spec/operators/publishReplay-spec.js +++ b/spec/operators/publishReplay-spec.js @@ -150,10 +150,7 @@ describe('Observable.prototype.publishReplay()', function () { it('should NOT be retryable', function () { var source = cold('-1-2-3----4-#'); - var sourceSubs = ['^ !', - ' (^!)', - ' (^!)', - ' (^!)']; + var sourceSubs = '^ !'; var published = source.publishReplay(1).refCount().retry(3); var subscriber1 = hot('a| ').mergeMapTo(published); var expected1 = '-1-2-3----4-(444#)'; @@ -170,9 +167,7 @@ describe('Observable.prototype.publishReplay()', function () { it('should NOT be repeatable', function () { var source = cold('-1-2-3----4-|'); - var sourceSubs = ['^ !', - ' (^!)', - ' (^!)']; + var sourceSubs = '^ !'; var published = source.publishReplay(1).refCount().repeat(3); var subscriber1 = hot('a| ').mergeMapTo(published); var expected1 = '-1-2-3----4-(44|)'; diff --git a/src/observables/ConnectableObservable.ts b/src/observables/ConnectableObservable.ts index 683e2530124..12815a75e9f 100644 --- a/src/observables/ConnectableObservable.ts +++ b/src/observables/ConnectableObservable.ts @@ -96,10 +96,9 @@ class RefCountObservable extends Observable { refCountSubscriber.myConnection = this.connection; const subscription = connectable.subscribe(refCountSubscriber); - if (++this.refCount === 1) { - connectable.connect(subscription => { - this.connection = subscription; - refCountSubscriber.myConnection = subscription; + if (!subscription.isUnsubscribed && ++this.refCount === 1) { + connectable.connect(_subscription => { + refCountSubscriber.myConnection = this.connection = _subscription; }); } return subscription; @@ -131,7 +130,8 @@ class RefCountSubscriber extends Subscriber { _resetConnectable() { const observable = this.refCountObservable; - if (this.myConnection === observable.connection) { + const myConnection = this.myConnection; + if (myConnection && myConnection === observable.connection) { observable.refCount = 0; observable.connection.unsubscribe(); observable.connection = void 0; @@ -144,7 +144,8 @@ class RefCountSubscriber extends Subscriber { if (observable.refCount === 0) { return; } - if (--observable.refCount === 0 && this.myConnection === observable.connection) { + const myConnection = this.myConnection; + if (--observable.refCount === 0 && myConnection && myConnection === observable.connection) { observable.connection.unsubscribe(); observable.connection = void 0; }