diff --git a/src/operator/delayWhen.ts b/src/operator/delayWhen.ts index b272f2793f..25f822e6a6 100644 --- a/src/operator/delayWhen.ts +++ b/src/operator/delayWhen.ts @@ -1,11 +1,6 @@ -import { Operator } from '../Operator'; -import { Subscriber } from '../Subscriber'; -import { Observable } from '../Observable'; -import { Subscription, TeardownLogic } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; +import { Observable } from '../Observable'; +import { delayWhen as higherOrder } from '../operators'; /** * Delays the emission of items from the source Observable by a given time span @@ -54,151 +49,5 @@ import { subscribeToResult } from '../util/subscribeToResult'; */ export function delayWhen(this: Observable, delayDurationSelector: (value: T) => Observable, subscriptionDelay?: Observable): Observable { - if (subscriptionDelay) { - return new SubscriptionDelayObservable(this, subscriptionDelay) - .lift(new DelayWhenOperator(delayDurationSelector)); - } - return this.lift(new DelayWhenOperator(delayDurationSelector)); -} - -class DelayWhenOperator implements Operator { - constructor(private delayDurationSelector: (value: T) => Observable) { - } - - call(subscriber: Subscriber, source: any): TeardownLogic { - return source.subscribe(new DelayWhenSubscriber(subscriber, this.delayDurationSelector)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class DelayWhenSubscriber extends OuterSubscriber { - private completed: boolean = false; - private delayNotifierSubscriptions: Array = []; - private values: Array = []; - - constructor(destination: Subscriber, - private delayDurationSelector: (value: T) => Observable) { - super(destination); - } - - notifyNext(outerValue: T, innerValue: any, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - this.destination.next(outerValue); - this.removeSubscription(innerSub); - this.tryComplete(); - } - - notifyError(error: any, innerSub: InnerSubscriber): void { - this._error(error); - } - - notifyComplete(innerSub: InnerSubscriber): void { - const value = this.removeSubscription(innerSub); - if (value) { - this.destination.next(value); - } - this.tryComplete(); - } - - protected _next(value: T): void { - try { - const delayNotifier = this.delayDurationSelector(value); - if (delayNotifier) { - this.tryDelay(delayNotifier, value); - } - } catch (err) { - this.destination.error(err); - } - } - - protected _complete(): void { - this.completed = true; - this.tryComplete(); - } - - private removeSubscription(subscription: InnerSubscriber): T { - subscription.unsubscribe(); - - const subscriptionIdx = this.delayNotifierSubscriptions.indexOf(subscription); - let value: T = null; - - if (subscriptionIdx !== -1) { - value = this.values[subscriptionIdx]; - this.delayNotifierSubscriptions.splice(subscriptionIdx, 1); - this.values.splice(subscriptionIdx, 1); - } - - return value; - } - - private tryDelay(delayNotifier: Observable, value: T): void { - const notifierSubscription = subscribeToResult(this, delayNotifier, value); - - if (notifierSubscription && !notifierSubscription.closed) { - this.add(notifierSubscription); - this.delayNotifierSubscriptions.push(notifierSubscription); - } - - this.values.push(value); - } - - private tryComplete(): void { - if (this.completed && this.delayNotifierSubscriptions.length === 0) { - this.destination.complete(); - } - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class SubscriptionDelayObservable extends Observable { - constructor(protected source: Observable, private subscriptionDelay: Observable) { - super(); - } - - protected _subscribe(subscriber: Subscriber) { - this.subscriptionDelay.subscribe(new SubscriptionDelaySubscriber(subscriber, this.source)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class SubscriptionDelaySubscriber extends Subscriber { - private sourceSubscribed: boolean = false; - - constructor(private parent: Subscriber, private source: Observable) { - super(); - } - - protected _next(unused: any) { - this.subscribeToSource(); - } - - protected _error(err: any) { - this.unsubscribe(); - this.parent.error(err); - } - - protected _complete() { - this.subscribeToSource(); - } - - private subscribeToSource(): void { - if (!this.sourceSubscribed) { - this.sourceSubscribed = true; - this.unsubscribe(); - this.source.subscribe(this.parent); - } - } + return higherOrder(delayDurationSelector, subscriptionDelay)(this); } diff --git a/src/operators/delayWhen.ts b/src/operators/delayWhen.ts new file mode 100644 index 0000000000..f9793813ab --- /dev/null +++ b/src/operators/delayWhen.ts @@ -0,0 +1,205 @@ +import { Operator } from '../Operator'; +import { Subscriber } from '../Subscriber'; +import { Observable } from '../Observable'; +import { Subscription, TeardownLogic } from '../Subscription'; +import { OuterSubscriber } from '../OuterSubscriber'; +import { InnerSubscriber } from '../InnerSubscriber'; +import { subscribeToResult } from '../util/subscribeToResult'; +import { MonoTypeOperatorFunction } from '../interfaces'; + +/** + * Delays the emission of items from the source Observable by a given time span + * determined by the emissions of another Observable. + * + * It's like {@link delay}, but the time span of the + * delay duration is determined by a second Observable. + * + * + * + * `delayWhen` time shifts each emitted value from the source Observable by a + * time span determined by another Observable. When the source emits a value, + * the `delayDurationSelector` function is called with the source value as + * argument, and should return an Observable, called the "duration" Observable. + * The source value is emitted on the output Observable only when the duration + * Observable emits a value or completes. + * + * Optionally, `delayWhen` takes a second argument, `subscriptionDelay`, which + * is an Observable. When `subscriptionDelay` emits its first value or + * completes, the source Observable is subscribed to and starts behaving like + * described in the previous paragraph. If `subscriptionDelay` is not provided, + * `delayWhen` will subscribe to the source Observable as soon as the output + * Observable is subscribed. + * + * @example Delay each click by a random amount of time, between 0 and 5 seconds + * var clicks = Rx.Observable.fromEvent(document, 'click'); + * var delayedClicks = clicks.delayWhen(event => + * Rx.Observable.interval(Math.random() * 5000) + * ); + * delayedClicks.subscribe(x => console.log(x)); + * + * @see {@link debounce} + * @see {@link delay} + * + * @param {function(value: T): Observable} delayDurationSelector A function that + * returns an Observable for each value emitted by the source Observable, which + * is then used to delay the emission of that item on the output Observable + * until the Observable returned from this function emits a value. + * @param {Observable} subscriptionDelay An Observable that triggers the + * subscription to the source Observable once it emits any value. + * @return {Observable} An Observable that delays the emissions of the source + * Observable by an amount of time specified by the Observable returned by + * `delayDurationSelector`. + * @method delayWhen + * @owner Observable + */ +export function delayWhen(delayDurationSelector: (value: T) => Observable, + subscriptionDelay?: Observable): MonoTypeOperatorFunction { + if (subscriptionDelay) { + return (source: Observable) => + new SubscriptionDelayObservable(source, subscriptionDelay) + .lift(new DelayWhenOperator(delayDurationSelector)); + } + return (source: Observable) => source.lift(new DelayWhenOperator(delayDurationSelector)); +} + +class DelayWhenOperator implements Operator { + constructor(private delayDurationSelector: (value: T) => Observable) { + } + + call(subscriber: Subscriber, source: any): TeardownLogic { + return source.subscribe(new DelayWhenSubscriber(subscriber, this.delayDurationSelector)); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class DelayWhenSubscriber extends OuterSubscriber { + private completed: boolean = false; + private delayNotifierSubscriptions: Array = []; + private values: Array = []; + + constructor(destination: Subscriber, + private delayDurationSelector: (value: T) => Observable) { + super(destination); + } + + notifyNext(outerValue: T, innerValue: any, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { + this.destination.next(outerValue); + this.removeSubscription(innerSub); + this.tryComplete(); + } + + notifyError(error: any, innerSub: InnerSubscriber): void { + this._error(error); + } + + notifyComplete(innerSub: InnerSubscriber): void { + const value = this.removeSubscription(innerSub); + if (value) { + this.destination.next(value); + } + this.tryComplete(); + } + + protected _next(value: T): void { + try { + const delayNotifier = this.delayDurationSelector(value); + if (delayNotifier) { + this.tryDelay(delayNotifier, value); + } + } catch (err) { + this.destination.error(err); + } + } + + protected _complete(): void { + this.completed = true; + this.tryComplete(); + } + + private removeSubscription(subscription: InnerSubscriber): T { + subscription.unsubscribe(); + + const subscriptionIdx = this.delayNotifierSubscriptions.indexOf(subscription); + let value: T = null; + + if (subscriptionIdx !== -1) { + value = this.values[subscriptionIdx]; + this.delayNotifierSubscriptions.splice(subscriptionIdx, 1); + this.values.splice(subscriptionIdx, 1); + } + + return value; + } + + private tryDelay(delayNotifier: Observable, value: T): void { + const notifierSubscription = subscribeToResult(this, delayNotifier, value); + + if (notifierSubscription && !notifierSubscription.closed) { + this.add(notifierSubscription); + this.delayNotifierSubscriptions.push(notifierSubscription); + } + + this.values.push(value); + } + + private tryComplete(): void { + if (this.completed && this.delayNotifierSubscriptions.length === 0) { + this.destination.complete(); + } + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class SubscriptionDelayObservable extends Observable { + constructor(protected source: Observable, private subscriptionDelay: Observable) { + super(); + } + + protected _subscribe(subscriber: Subscriber) { + this.subscriptionDelay.subscribe(new SubscriptionDelaySubscriber(subscriber, this.source)); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class SubscriptionDelaySubscriber extends Subscriber { + private sourceSubscribed: boolean = false; + + constructor(private parent: Subscriber, private source: Observable) { + super(); + } + + protected _next(unused: any) { + this.subscribeToSource(); + } + + protected _error(err: any) { + this.unsubscribe(); + this.parent.error(err); + } + + protected _complete() { + this.subscribeToSource(); + } + + private subscribeToSource(): void { + if (!this.sourceSubscribed) { + this.sourceSubscribed = true; + this.unsubscribe(); + this.source.subscribe(this.parent); + } + } +} diff --git a/src/operators/index.ts b/src/operators/index.ts index 02a29416c6..2f2355cefa 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -15,6 +15,7 @@ export { debounce } from './debounce'; export { debounceTime } from './debounceTime'; export { defaultIfEmpty } from './defaultIfEmpty'; export { delay } from './delay'; +export { delayWhen } from './delayWhen'; export { dematerialize } from './dematerialize'; export { filter } from './filter'; export { ignoreElements } from './ignoreElements';