Skip to content

Commit d1412bc

Browse files
committed
fix(ConnectableObservable): fix race conditions in ConnectableObservable and refCount.
1 parent 993a2c3 commit d1412bc

File tree

2 files changed

+129
-110
lines changed

2 files changed

+129
-110
lines changed

spec/operators/refCount-spec.ts

Lines changed: 57 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,17 @@ describe('ConnectableObservable.prototype.refCount', () => {
1919
});
2020

2121
it('should count references', () => {
22-
const source = Observable.never().publish().refCount();
22+
const connectable = Observable.never().publish();
23+
const refCounted = connectable.refCount();
2324

24-
const sub1 = source.subscribe({ next: function () { //noop
25+
const sub1 = refCounted.subscribe({ next: function () { //noop
2526
} });
26-
const sub2 = source.subscribe({ next: function () { //noop
27+
const sub2 = refCounted.subscribe({ next: function () { //noop
2728
} });
28-
const sub3 = source.subscribe({ next: function () { //noop
29+
const sub3 = refCounted.subscribe({ next: function () { //noop
2930
} });
3031

31-
expect((<any>source).refCount).to.equal(3);
32+
expect((<any>connectable)._refCount).to.equal(3);
3233

3334
sub1.unsubscribe();
3435
sub2.unsubscribe();
@@ -37,30 +38,72 @@ describe('ConnectableObservable.prototype.refCount', () => {
3738

3839
it('should unsub from the source when all other subscriptions are unsubbed', (done: MochaDone) => {
3940
let unsubscribeCalled = false;
40-
const source = new Observable((observer: Rx.Observer<boolean>) => {
41+
const connectable = new Observable((observer: Rx.Observer<boolean>) => {
4142
observer.next(true);
42-
4343
return () => {
4444
unsubscribeCalled = true;
4545
};
46-
}).publish().refCount();
46+
}).publish();
47+
const refCounted = connectable.refCount();
4748

48-
const sub1 = source.subscribe(() => {
49+
const sub1 = refCounted.subscribe(() => {
4950
//noop
5051
});
51-
const sub2 = source.subscribe(() => {
52+
const sub2 = refCounted.subscribe(() => {
5253
//noop
5354
});
54-
const sub3 = source.subscribe((x: any) => {
55-
expect((<any>source).refCount).to.equal(1);
55+
const sub3 = refCounted.subscribe((x: any) => {
56+
expect((<any>connectable)._refCount).to.equal(1);
5657
});
5758

5859
sub1.unsubscribe();
5960
sub2.unsubscribe();
6061
sub3.unsubscribe();
6162

62-
expect((<any>source).refCount).to.equal(0);
63+
expect((<any>connectable)._refCount).to.equal(0);
6364
expect(unsubscribeCalled).to.be.true;
6465
done();
6566
});
66-
});
67+
68+
it('should not unsubscribe when a subscriber synchronously unsubscribes if ' +
69+
'other subscribers are present', () => {
70+
let unsubscribeCalled = false;
71+
const connectable = new Observable((observer: Rx.Observer<boolean>) => {
72+
observer.next(true);
73+
return () => {
74+
unsubscribeCalled = true;
75+
};
76+
}).publishReplay(1);
77+
78+
const refCounted = connectable.refCount();
79+
80+
refCounted.subscribe();
81+
refCounted.subscribe().unsubscribe();
82+
83+
expect((<any>connectable)._refCount).to.equal(1);
84+
expect(unsubscribeCalled).to.be.false;
85+
});
86+
87+
it('should not unsubscribe when a subscriber synchronously unsubscribes if ' +
88+
'other subscribers are present and the source is a Subject', () => {
89+
90+
const arr = [];
91+
const subject = new Rx.Subject();
92+
const connectable = subject.publishReplay(1);
93+
const refCounted = connectable.refCount();
94+
95+
refCounted.subscribe((val) => {
96+
arr.push(val);
97+
});
98+
99+
subject.next('the number one');
100+
101+
refCounted.first().subscribe().unsubscribe();
102+
103+
subject.next('the number two');
104+
105+
expect((<any>connectable)._refCount).to.equal(1);
106+
expect(arr[0]).to.equal('the number one');
107+
expect(arr[1]).to.equal('the number two');
108+
});
109+
});
Lines changed: 72 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import {Subject} from '../Subject';
2+
import {Operator} from '../Operator';
3+
import {Observer} from '../Observer';
24
import {Observable} from '../Observable';
35
import {Subscriber} from '../Subscriber';
46
import {Subscription} from '../Subscription';
@@ -8,8 +10,9 @@ import {Subscription} from '../Subscription';
810
*/
911
export class ConnectableObservable<T> extends Observable<T> {
1012

11-
protected subject: Subject<T>;
12-
protected subscription: Subscription;
13+
protected _subject: Subject<T>;
14+
protected _refCount: number = 0;
15+
protected _connection: Subscription;
1316

1417
constructor(protected source: Observable<T>,
1518
protected subjectFactory: () => Subject<T>) {
@@ -20,133 +23,106 @@ export class ConnectableObservable<T> extends Observable<T> {
2023
return this.getSubject().subscribe(subscriber);
2124
}
2225

23-
protected getSubject() {
24-
const subject = this.subject;
25-
if (subject && !subject.isUnsubscribed) {
26-
return subject;
27-
}
28-
return (this.subject = this.subjectFactory());
26+
protected getSubject(): Subject<T> {
27+
return this._subject || (this._subject = this.subjectFactory());
2928
}
3029

3130
connect(): Subscription {
32-
const source = this.source;
33-
let subscription = this.subscription;
34-
if (subscription && !subscription.isUnsubscribed) {
35-
return subscription;
31+
let connection = this._connection;
32+
if (!connection) {
33+
connection = this.source.subscribe(new ConnectableSubscriber(this.getSubject(), this));
34+
if (connection.isUnsubscribed) {
35+
this._connection = null;
36+
connection = Subscription.EMPTY;
37+
} else {
38+
this._connection = connection;
39+
}
3640
}
37-
subscription = source.subscribe(this.getSubject());
38-
subscription.add(new ConnectableSubscription(this));
39-
return (this.subscription = subscription);
41+
return connection;
4042
}
4143

4244
refCount(): Observable<T> {
43-
return new RefCountObservable(this);
44-
}
45-
46-
/**
47-
* This method is opened for `ConnectableSubscription`.
48-
* Not to call from others.
49-
*/
50-
_closeSubscription(): void {
51-
this.subject = null;
52-
this.subscription = null;
45+
return this.lift(new RefCountOperator<T>(this));
5346
}
5447
}
5548

56-
/**
57-
* We need this JSDoc comment for affecting ESDoc.
58-
* @ignore
59-
* @extends {Ignored}
60-
*/
61-
class ConnectableSubscription extends Subscription {
62-
constructor(protected connectable: ConnectableObservable<any>) {
63-
super();
49+
class ConnectableSubscriber<T> extends Subscriber<T> {
50+
constructor(destination: Observer<T>,
51+
private connectable: ConnectableObservable<T>) {
52+
super(destination);
53+
}
54+
protected _error(err: any): void {
55+
this._unsubscribe();
56+
super._error(err);
57+
}
58+
protected _complete(): void {
59+
this._unsubscribe();
60+
super._complete();
6461
}
65-
6662
protected _unsubscribe() {
67-
const connectable = this.connectable;
68-
connectable._closeSubscription();
69-
this.connectable = null;
63+
const { connectable } = this;
64+
if (connectable) {
65+
this.connectable = null;
66+
(<any> connectable)._refCount = 0;
67+
(<any> connectable)._subject = null;
68+
(<any> connectable)._connection = null;
69+
}
7070
}
7171
}
7272

73-
/**
74-
* We need this JSDoc comment for affecting ESDoc.
75-
* @ignore
76-
* @extends {Ignored}
77-
*/
78-
class RefCountObservable<T> extends Observable<T> {
79-
connection: Subscription;
80-
81-
constructor(protected connectable: ConnectableObservable<T>,
82-
public refCount: number = 0) {
83-
super();
73+
class RefCountOperator<T> implements Operator<T, T> {
74+
constructor(private connectable: ConnectableObservable<T>) {
8475
}
76+
call(subscriber: Subscriber<T>, source: any): any {
8577

86-
protected _subscribe(subscriber: Subscriber<T>) {
87-
const connectable = this.connectable;
88-
const refCountSubscriber: RefCountSubscriber<T> = new RefCountSubscriber(subscriber, this);
89-
const subscription = connectable.subscribe(refCountSubscriber);
90-
if (!subscription.isUnsubscribed && ++this.refCount === 1) {
91-
refCountSubscriber.connection = this.connection = connectable.connect();
78+
const { connectable } = this;
79+
(<any> connectable)._refCount++;
80+
81+
const refCounter = new RefCountSubscriber(subscriber, connectable);
82+
const subscription = source._subscribe(refCounter);
83+
84+
if (!refCounter.isUnsubscribed) {
85+
(<any> refCounter).connection = connectable.connect();
9286
}
87+
9388
return subscription;
9489
}
9590
}
9691

97-
/**
98-
* We need this JSDoc comment for affecting ESDoc.
99-
* @ignore
100-
* @extends {Ignored}
101-
*/
10292
class RefCountSubscriber<T> extends Subscriber<T> {
103-
connection: Subscription;
10493

105-
constructor(public destination: Subscriber<T>,
106-
private refCountObservable: RefCountObservable<T>) {
107-
super(null);
108-
this.connection = refCountObservable.connection;
109-
destination.add(this);
110-
}
94+
private connection: Subscription;
11195

112-
protected _next(value: T) {
113-
this.destination.next(value);
96+
constructor(destination: Subscriber<T>,
97+
private connectable: ConnectableObservable<T>) {
98+
super(destination);
11499
}
115100

116-
protected _error(err: any) {
117-
this._resetConnectable();
118-
this.destination.error(err);
119-
}
101+
protected _unsubscribe() {
120102

121-
protected _complete() {
122-
this._resetConnectable();
123-
this.destination.complete();
124-
}
103+
const { connectable } = this;
104+
if (!connectable) {
105+
this.connection = null;
106+
return;
107+
}
125108

126-
private _resetConnectable() {
127-
const observable = this.refCountObservable;
128-
const obsConnection = observable.connection;
129-
const subConnection = this.connection;
130-
if (subConnection && subConnection === obsConnection) {
131-
observable.refCount = 0;
132-
obsConnection.unsubscribe();
133-
observable.connection = null;
134-
this.unsubscribe();
109+
this.connectable = null;
110+
const refCount = (<any> connectable)._refCount;
111+
if (refCount <= 0) {
112+
this.connection = null;
113+
return;
135114
}
136-
}
137115

138-
protected _unsubscribe() {
139-
const observable = this.refCountObservable;
140-
if (observable.refCount === 0) {
116+
(<any> connectable)._refCount = refCount - 1;
117+
if (refCount > 1) {
118+
this.connection = null;
141119
return;
142120
}
143-
if (--observable.refCount === 0) {
144-
const obsConnection = observable.connection;
145-
const subConnection = this.connection;
146-
if (subConnection && subConnection === obsConnection) {
147-
obsConnection.unsubscribe();
148-
observable.connection = null;
149-
}
121+
122+
const { connection } = this;
123+
if (connection) {
124+
this.connection = null;
125+
connection.unsubscribe();
150126
}
151127
}
152128
}

0 commit comments

Comments
 (0)