From 21fba6387a719f0365b3d5e4e974b89a0026184d Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 15 Jun 2017 18:30:26 -0700 Subject: [PATCH] feat(refCount): add higher-order lettable version of refCount NOTE: I am a little worried about a circular dependency here between ConnectableObservable and the lettable refCount --- src/observable/ConnectableObservable.ts | 3 +- src/operators/index.ts | 1 + src/operators/refCount.ts | 94 +++++++++++++++++++++++++ 3 files changed, 97 insertions(+), 1 deletion(-) create mode 100644 src/operators/refCount.ts diff --git a/src/observable/ConnectableObservable.ts b/src/observable/ConnectableObservable.ts index a53eb1e319..d38e3a3282 100644 --- a/src/observable/ConnectableObservable.ts +++ b/src/observable/ConnectableObservable.ts @@ -3,6 +3,7 @@ import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { Subscription, TeardownLogic } from '../Subscription'; +import { refCount as higherOrderRefCount } from '../operators'; /** * @class ConnectableObservable @@ -49,7 +50,7 @@ export class ConnectableObservable extends Observable { } refCount(): Observable { - return this.lift(new RefCountOperator(this)); + return higherOrderRefCount()(this); } } diff --git a/src/operators/index.ts b/src/operators/index.ts index e08803359f..f99b877438 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -9,6 +9,7 @@ export { min } from './min'; export { multicast } from './multicast'; export { publish } from './publish'; export { reduce } from './reduce'; +export { refCount } from './refCount'; export { scan } from './scan'; export { switchAll } from './switchAll'; export { switchMap } from './switchMap'; diff --git a/src/operators/refCount.ts b/src/operators/refCount.ts new file mode 100644 index 0000000000..e66320e4b0 --- /dev/null +++ b/src/operators/refCount.ts @@ -0,0 +1,94 @@ +import { Operator } from '../Operator'; +import { Subscriber } from '../Subscriber'; +import { Subscription, TeardownLogic } from '../Subscription'; +import { MonoTypeOperatorFunction } from '../interfaces'; +import { ConnectableObservable } from '../observable/ConnectableObservable'; +import { Observable } from '../Observable'; + +export function refCount(): MonoTypeOperatorFunction { + return function refCountOperatorFunction(source: ConnectableObservable): Observable { + return source.lift(new RefCountOperator(source)); + }; +} + +class RefCountOperator implements Operator { + constructor(private connectable: ConnectableObservable) { + } + call(subscriber: Subscriber, source: any): TeardownLogic { + + const { connectable } = this; + ( connectable)._refCount++; + + const refCounter = new RefCountSubscriber(subscriber, connectable); + const subscription = source.subscribe(refCounter); + + if (!refCounter.closed) { + ( refCounter).connection = connectable.connect(); + } + + return subscription; + } +} + +class RefCountSubscriber extends Subscriber { + + private connection: Subscription; + + constructor(destination: Subscriber, + private connectable: ConnectableObservable) { + super(destination); + } + + protected _unsubscribe() { + + const { connectable } = this; + if (!connectable) { + this.connection = null; + return; + } + + this.connectable = null; + const refCount = ( connectable)._refCount; + if (refCount <= 0) { + this.connection = null; + return; + } + + ( connectable)._refCount = refCount - 1; + if (refCount > 1) { + this.connection = null; + return; + } + + /// + // Compare the local RefCountSubscriber's connection Subscription to the + // connection Subscription on the shared ConnectableObservable. In cases + // where the ConnectableObservable source synchronously emits values, and + // the RefCountSubscriber's downstream Observers synchronously unsubscribe, + // execution continues to here before the RefCountOperator has a chance to + // supply the RefCountSubscriber with the shared connection Subscription. + // For example: + // ``` + // Observable.range(0, 10) + // .publish() + // .refCount() + // .take(5) + // .subscribe(); + // ``` + // In order to account for this case, RefCountSubscriber should only dispose + // the ConnectableObservable's shared connection Subscription if the + // connection Subscription exists, *and* either: + // a. RefCountSubscriber doesn't have a reference to the shared connection + // Subscription yet, or, + // b. RefCountSubscriber's connection Subscription reference is identical + // to the shared connection Subscription + /// + const { connection } = this; + const sharedConnection = ( connectable)._connection; + this.connection = null; + + if (sharedConnection && (!connection || sharedConnection === connection)) { + sharedConnection.unsubscribe(); + } + } +}