Skip to content

Commit

Permalink
fix(ConnectableObservable): fix refCount synchronous subscription sem…
Browse files Browse the repository at this point in the history
…antics

When the ConnectableObservable with refCount always shares the same instance of the underlying
subject (such as in publish, publishReplay, publishBehavior), the subscription to the connectable
observable should NOT incur additional subscriptions to the underlying cold source. See how tests
for publish/publishBehavior/publishReplay were updated to assert that only one subscription to the
underlying cold source happens, not multiple, because as soon as the multicasting subject raises an
error, this error impedes subsequent subscriptions to the cold source from happening.

Related to ReactiveX#678.
  • Loading branch information
staltz committed Nov 19, 2015
1 parent 4dcfcc3 commit 14522f8
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 27 deletions.
9 changes: 2 additions & 7 deletions spec/operators/publish-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,7 @@ describe('Observable.prototype.publish()', function () {

it('should NOT be retryable', function () {
var source = cold('-1-2-3----4-#');
var sourceSubs = ['^ !',
' (^!)',
' (^!)',
' (^!)'];
var sourceSubs = '^ !';
var published = source.publish().refCount().retry(3);
var subscriber1 = hot('a| ').mergeMapTo(published);
var expected1 = '-1-2-3----4-#';
Expand All @@ -152,9 +149,7 @@ describe('Observable.prototype.publish()', function () {

it('should NOT be repeatable', function () {
var source = cold('-1-2-3----4-|');
var sourceSubs = ['^ !',
' (^!)',
' (^!)'];
var sourceSubs = '^ !';
var published = source.publish().refCount().repeat(3);
var subscriber1 = hot('a| ').mergeMapTo(published);
var expected1 = '-1-2-3----4-|';
Expand Down
9 changes: 2 additions & 7 deletions spec/operators/publishBehavior-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,7 @@ describe('Observable.prototype.publishBehavior()', function () {

it('should NOT be retryable', function () {
var source = cold('-1-2-3----4-#');
var sourceSubs = ['^ !',
' (^!)',
' (^!)',
' (^!)'];
var sourceSubs = '^ !';
var published = source.publishBehavior('0').refCount().retry(3);
var subscriber1 = hot('a| ').mergeMapTo(published);
var expected1 = '01-2-3----4-#';
Expand All @@ -151,9 +148,7 @@ describe('Observable.prototype.publishBehavior()', function () {

it('should NOT be repeatable', function () {
var source = cold('-1-2-3----4-|');
var sourceSubs = ['^ !',
' (^!)',
' (^!)'];
var sourceSubs = '^ !';
var published = source.publishBehavior('0').refCount().repeat(3);
var subscriber1 = hot('a| ').mergeMapTo(published);
var expected1 = '01-2-3----4-|';
Expand Down
9 changes: 2 additions & 7 deletions spec/operators/publishReplay-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,7 @@ describe('Observable.prototype.publishReplay()', function () {

it('should NOT be retryable', function () {
var source = cold('-1-2-3----4-#');
var sourceSubs = ['^ !',
' (^!)',
' (^!)',
' (^!)'];
var sourceSubs = '^ !';
var published = source.publishReplay(1).refCount().retry(3);
var subscriber1 = hot('a| ').mergeMapTo(published);
var expected1 = '-1-2-3----4-(444#)';
Expand All @@ -170,9 +167,7 @@ describe('Observable.prototype.publishReplay()', function () {

it('should NOT be repeatable', function () {
var source = cold('-1-2-3----4-|');
var sourceSubs = ['^ !',
' (^!)',
' (^!)'];
var sourceSubs = '^ !';
var published = source.publishReplay(1).refCount().repeat(3);
var subscriber1 = hot('a| ').mergeMapTo(published);
var expected1 = '-1-2-3----4-(44|)';
Expand Down
13 changes: 7 additions & 6 deletions src/observables/ConnectableObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,9 @@ class RefCountObservable<T> extends Observable<T> {
refCountSubscriber.myConnection = this.connection;
const subscription = connectable.subscribe(refCountSubscriber);

if (++this.refCount === 1) {
connectable.connect(subscription => {
this.connection = subscription;
refCountSubscriber.myConnection = subscription;
if (!subscription.isUnsubscribed && ++this.refCount === 1) {
connectable.connect(_subscription => {
refCountSubscriber.myConnection = this.connection = _subscription;
});
}
return subscription;
Expand Down Expand Up @@ -131,7 +130,8 @@ class RefCountSubscriber<T> extends Subscriber<T> {

_resetConnectable() {
const observable = this.refCountObservable;
if (this.myConnection === observable.connection) {
const myConnection = this.myConnection;
if (myConnection && myConnection === observable.connection) {
observable.refCount = 0;
observable.connection.unsubscribe();
observable.connection = void 0;
Expand All @@ -144,7 +144,8 @@ class RefCountSubscriber<T> extends Subscriber<T> {
if (observable.refCount === 0) {
return;
}
if (--observable.refCount === 0 && this.myConnection === observable.connection) {
const myConnection = this.myConnection;
if (--observable.refCount === 0 && myConnection && myConnection === observable.connection) {
observable.connection.unsubscribe();
observable.connection = void 0;
}
Expand Down

0 comments on commit 14522f8

Please sign in to comment.