Skip to content

Commit 21fba63

Browse files
committed
feat(refCount): add higher-order lettable version of refCount
NOTE: I am a little worried about a circular dependency here between ConnectableObservable and the lettable refCount
1 parent 2f12572 commit 21fba63

File tree

3 files changed

+97
-1
lines changed

3 files changed

+97
-1
lines changed

src/observable/ConnectableObservable.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { Operator } from '../Operator';
33
import { Observable } from '../Observable';
44
import { Subscriber } from '../Subscriber';
55
import { Subscription, TeardownLogic } from '../Subscription';
6+
import { refCount as higherOrderRefCount } from '../operators';
67

78
/**
89
* @class ConnectableObservable<T>
@@ -49,7 +50,7 @@ export class ConnectableObservable<T> extends Observable<T> {
4950
}
5051

5152
refCount(): Observable<T> {
52-
return this.lift(new RefCountOperator<T>(this));
53+
return higherOrderRefCount()(this);
5354
}
5455
}
5556

src/operators/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ export { min } from './min';
99
export { multicast } from './multicast';
1010
export { publish } from './publish';
1111
export { reduce } from './reduce';
12+
export { refCount } from './refCount';
1213
export { scan } from './scan';
1314
export { switchAll } from './switchAll';
1415
export { switchMap } from './switchMap';

src/operators/refCount.ts

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
import { Operator } from '../Operator';
2+
import { Subscriber } from '../Subscriber';
3+
import { Subscription, TeardownLogic } from '../Subscription';
4+
import { MonoTypeOperatorFunction } from '../interfaces';
5+
import { ConnectableObservable } from '../observable/ConnectableObservable';
6+
import { Observable } from '../Observable';
7+
8+
export function refCount<T>(): MonoTypeOperatorFunction<T> {
9+
return function refCountOperatorFunction(source: ConnectableObservable<T>): Observable<T> {
10+
return source.lift(new RefCountOperator(source));
11+
};
12+
}
13+
14+
class RefCountOperator<T> implements Operator<T, T> {
15+
constructor(private connectable: ConnectableObservable<T>) {
16+
}
17+
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
18+
19+
const { connectable } = this;
20+
(<any> connectable)._refCount++;
21+
22+
const refCounter = new RefCountSubscriber(subscriber, connectable);
23+
const subscription = source.subscribe(refCounter);
24+
25+
if (!refCounter.closed) {
26+
(<any> refCounter).connection = connectable.connect();
27+
}
28+
29+
return subscription;
30+
}
31+
}
32+
33+
class RefCountSubscriber<T> extends Subscriber<T> {
34+
35+
private connection: Subscription;
36+
37+
constructor(destination: Subscriber<T>,
38+
private connectable: ConnectableObservable<T>) {
39+
super(destination);
40+
}
41+
42+
protected _unsubscribe() {
43+
44+
const { connectable } = this;
45+
if (!connectable) {
46+
this.connection = null;
47+
return;
48+
}
49+
50+
this.connectable = null;
51+
const refCount = (<any> connectable)._refCount;
52+
if (refCount <= 0) {
53+
this.connection = null;
54+
return;
55+
}
56+
57+
(<any> connectable)._refCount = refCount - 1;
58+
if (refCount > 1) {
59+
this.connection = null;
60+
return;
61+
}
62+
63+
///
64+
// Compare the local RefCountSubscriber's connection Subscription to the
65+
// connection Subscription on the shared ConnectableObservable. In cases
66+
// where the ConnectableObservable source synchronously emits values, and
67+
// the RefCountSubscriber's downstream Observers synchronously unsubscribe,
68+
// execution continues to here before the RefCountOperator has a chance to
69+
// supply the RefCountSubscriber with the shared connection Subscription.
70+
// For example:
71+
// ```
72+
// Observable.range(0, 10)
73+
// .publish()
74+
// .refCount()
75+
// .take(5)
76+
// .subscribe();
77+
// ```
78+
// In order to account for this case, RefCountSubscriber should only dispose
79+
// the ConnectableObservable's shared connection Subscription if the
80+
// connection Subscription exists, *and* either:
81+
// a. RefCountSubscriber doesn't have a reference to the shared connection
82+
// Subscription yet, or,
83+
// b. RefCountSubscriber's connection Subscription reference is identical
84+
// to the shared connection Subscription
85+
///
86+
const { connection } = this;
87+
const sharedConnection = (<any> connectable)._connection;
88+
this.connection = null;
89+
90+
if (sharedConnection && (!connection || sharedConnection === connection)) {
91+
sharedConnection.unsubscribe();
92+
}
93+
}
94+
}

0 commit comments

Comments
 (0)