-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(delayWhen): add higher-order lettable version of delayWhen
- Loading branch information
Showing
3 changed files
with
209 additions
and
154 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,205 @@ | ||
import { Operator } from '../Operator'; | ||
import { Subscriber } from '../Subscriber'; | ||
import { Observable } from '../Observable'; | ||
import { Subscription, TeardownLogic } from '../Subscription'; | ||
import { OuterSubscriber } from '../OuterSubscriber'; | ||
import { InnerSubscriber } from '../InnerSubscriber'; | ||
import { subscribeToResult } from '../util/subscribeToResult'; | ||
import { MonoTypeOperatorFunction } from '../interfaces'; | ||
|
||
/** | ||
* Delays the emission of items from the source Observable by a given time span | ||
* determined by the emissions of another Observable. | ||
* | ||
* <span class="informal">It's like {@link delay}, but the time span of the | ||
* delay duration is determined by a second Observable.</span> | ||
* | ||
* <img src="./img/delayWhen.png" width="100%"> | ||
* | ||
* `delayWhen` time shifts each emitted value from the source Observable by a | ||
* time span determined by another Observable. When the source emits a value, | ||
* the `delayDurationSelector` function is called with the source value as | ||
* argument, and should return an Observable, called the "duration" Observable. | ||
* The source value is emitted on the output Observable only when the duration | ||
* Observable emits a value or completes. | ||
* | ||
* Optionally, `delayWhen` takes a second argument, `subscriptionDelay`, which | ||
* is an Observable. When `subscriptionDelay` emits its first value or | ||
* completes, the source Observable is subscribed to and starts behaving like | ||
* described in the previous paragraph. If `subscriptionDelay` is not provided, | ||
* `delayWhen` will subscribe to the source Observable as soon as the output | ||
* Observable is subscribed. | ||
* | ||
* @example <caption>Delay each click by a random amount of time, between 0 and 5 seconds</caption> | ||
* var clicks = Rx.Observable.fromEvent(document, 'click'); | ||
* var delayedClicks = clicks.delayWhen(event => | ||
* Rx.Observable.interval(Math.random() * 5000) | ||
* ); | ||
* delayedClicks.subscribe(x => console.log(x)); | ||
* | ||
* @see {@link debounce} | ||
* @see {@link delay} | ||
* | ||
* @param {function(value: T): Observable} delayDurationSelector A function that | ||
* returns an Observable for each value emitted by the source Observable, which | ||
* is then used to delay the emission of that item on the output Observable | ||
* until the Observable returned from this function emits a value. | ||
* @param {Observable} subscriptionDelay An Observable that triggers the | ||
* subscription to the source Observable once it emits any value. | ||
* @return {Observable} An Observable that delays the emissions of the source | ||
* Observable by an amount of time specified by the Observable returned by | ||
* `delayDurationSelector`. | ||
* @method delayWhen | ||
* @owner Observable | ||
*/ | ||
export function delayWhen<T>(delayDurationSelector: (value: T) => Observable<any>, | ||
subscriptionDelay?: Observable<any>): MonoTypeOperatorFunction<T> { | ||
if (subscriptionDelay) { | ||
return (source: Observable<T>) => | ||
new SubscriptionDelayObservable(source, subscriptionDelay) | ||
.lift(new DelayWhenOperator(delayDurationSelector)); | ||
} | ||
return (source: Observable<T>) => source.lift(new DelayWhenOperator(delayDurationSelector)); | ||
} | ||
|
||
class DelayWhenOperator<T> implements Operator<T, T> { | ||
constructor(private delayDurationSelector: (value: T) => Observable<any>) { | ||
} | ||
|
||
call(subscriber: Subscriber<T>, source: any): TeardownLogic { | ||
return source.subscribe(new DelayWhenSubscriber(subscriber, this.delayDurationSelector)); | ||
} | ||
} | ||
|
||
/** | ||
* We need this JSDoc comment for affecting ESDoc. | ||
* @ignore | ||
* @extends {Ignored} | ||
*/ | ||
class DelayWhenSubscriber<T, R> extends OuterSubscriber<T, R> { | ||
private completed: boolean = false; | ||
private delayNotifierSubscriptions: Array<Subscription> = []; | ||
private values: Array<T> = []; | ||
|
||
constructor(destination: Subscriber<T>, | ||
private delayDurationSelector: (value: T) => Observable<any>) { | ||
super(destination); | ||
} | ||
|
||
notifyNext(outerValue: T, innerValue: any, | ||
outerIndex: number, innerIndex: number, | ||
innerSub: InnerSubscriber<T, R>): void { | ||
this.destination.next(outerValue); | ||
this.removeSubscription(innerSub); | ||
this.tryComplete(); | ||
} | ||
|
||
notifyError(error: any, innerSub: InnerSubscriber<T, R>): void { | ||
this._error(error); | ||
} | ||
|
||
notifyComplete(innerSub: InnerSubscriber<T, R>): void { | ||
const value = this.removeSubscription(innerSub); | ||
if (value) { | ||
this.destination.next(value); | ||
} | ||
this.tryComplete(); | ||
} | ||
|
||
protected _next(value: T): void { | ||
try { | ||
const delayNotifier = this.delayDurationSelector(value); | ||
if (delayNotifier) { | ||
this.tryDelay(delayNotifier, value); | ||
} | ||
} catch (err) { | ||
this.destination.error(err); | ||
} | ||
} | ||
|
||
protected _complete(): void { | ||
this.completed = true; | ||
this.tryComplete(); | ||
} | ||
|
||
private removeSubscription(subscription: InnerSubscriber<T, R>): T { | ||
subscription.unsubscribe(); | ||
|
||
const subscriptionIdx = this.delayNotifierSubscriptions.indexOf(subscription); | ||
let value: T = null; | ||
|
||
if (subscriptionIdx !== -1) { | ||
value = this.values[subscriptionIdx]; | ||
this.delayNotifierSubscriptions.splice(subscriptionIdx, 1); | ||
this.values.splice(subscriptionIdx, 1); | ||
} | ||
|
||
return value; | ||
} | ||
|
||
private tryDelay(delayNotifier: Observable<any>, value: T): void { | ||
const notifierSubscription = subscribeToResult(this, delayNotifier, value); | ||
|
||
if (notifierSubscription && !notifierSubscription.closed) { | ||
this.add(notifierSubscription); | ||
this.delayNotifierSubscriptions.push(notifierSubscription); | ||
} | ||
|
||
this.values.push(value); | ||
} | ||
|
||
private tryComplete(): void { | ||
if (this.completed && this.delayNotifierSubscriptions.length === 0) { | ||
this.destination.complete(); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* We need this JSDoc comment for affecting ESDoc. | ||
* @ignore | ||
* @extends {Ignored} | ||
*/ | ||
class SubscriptionDelayObservable<T> extends Observable<T> { | ||
constructor(protected source: Observable<T>, private subscriptionDelay: Observable<any>) { | ||
super(); | ||
} | ||
|
||
protected _subscribe(subscriber: Subscriber<T>) { | ||
this.subscriptionDelay.subscribe(new SubscriptionDelaySubscriber(subscriber, this.source)); | ||
} | ||
} | ||
|
||
/** | ||
* We need this JSDoc comment for affecting ESDoc. | ||
* @ignore | ||
* @extends {Ignored} | ||
*/ | ||
class SubscriptionDelaySubscriber<T> extends Subscriber<T> { | ||
private sourceSubscribed: boolean = false; | ||
|
||
constructor(private parent: Subscriber<T>, private source: Observable<T>) { | ||
super(); | ||
} | ||
|
||
protected _next(unused: any) { | ||
this.subscribeToSource(); | ||
} | ||
|
||
protected _error(err: any) { | ||
this.unsubscribe(); | ||
this.parent.error(err); | ||
} | ||
|
||
protected _complete() { | ||
this.subscribeToSource(); | ||
} | ||
|
||
private subscribeToSource(): void { | ||
if (!this.sourceSubscribed) { | ||
this.sourceSubscribed = true; | ||
this.unsubscribe(); | ||
this.source.subscribe(this.parent); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters