-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(debounce): add higher-order lettable version of debounce
- Loading branch information
Showing
3 changed files
with
141 additions
and
90 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
import { Operator } from '../Operator'; | ||
import { Observable, SubscribableOrPromise } from '../Observable'; | ||
import { Subscriber } from '../Subscriber'; | ||
import { Subscription, TeardownLogic } from '../Subscription'; | ||
|
||
import { OuterSubscriber } from '../OuterSubscriber'; | ||
import { InnerSubscriber } from '../InnerSubscriber'; | ||
import { subscribeToResult } from '../util/subscribeToResult'; | ||
import { MonoTypeOperatorFunction } from '../interfaces'; | ||
|
||
/** | ||
* Emits a value from the source Observable only after a particular time span | ||
* determined by another Observable has passed without another source emission. | ||
* | ||
* <span class="informal">It's like {@link debounceTime}, but the time span of | ||
* emission silence is determined by a second Observable.</span> | ||
* | ||
* <img src="./img/debounce.png" width="100%"> | ||
* | ||
* `debounce` delays values emitted by the source Observable, but drops previous | ||
* pending delayed emissions if a new value arrives on the source Observable. | ||
* This operator keeps track of the most recent value from the source | ||
* Observable, and spawns a duration Observable by calling the | ||
* `durationSelector` function. The value is emitted only when the duration | ||
* Observable emits a value or completes, and if no other value was emitted on | ||
* the source Observable since the duration Observable was spawned. If a new | ||
* value appears before the duration Observable emits, the previous value will | ||
* be dropped and will not be emitted on the output Observable. | ||
* | ||
* Like {@link debounceTime}, this is a rate-limiting operator, and also a | ||
* delay-like operator since output emissions do not necessarily occur at the | ||
* same time as they did on the source Observable. | ||
* | ||
* @example <caption>Emit the most recent click after a burst of clicks</caption> | ||
* var clicks = Rx.Observable.fromEvent(document, 'click'); | ||
* var result = clicks.debounce(() => Rx.Observable.interval(1000)); | ||
* result.subscribe(x => console.log(x)); | ||
* | ||
* @see {@link audit} | ||
* @see {@link debounceTime} | ||
* @see {@link delayWhen} | ||
* @see {@link throttle} | ||
* | ||
* @param {function(value: T): SubscribableOrPromise} durationSelector A function | ||
* that receives a value from the source Observable, for computing the timeout | ||
* duration for each source value, returned as an Observable or a Promise. | ||
* @return {Observable} An Observable that delays the emissions of the source | ||
* Observable by the specified duration Observable returned by | ||
* `durationSelector`, and may drop some values if they occur too frequently. | ||
* @method debounce | ||
* @owner Observable | ||
*/ | ||
export function debounce<T>(durationSelector: (value: T) => SubscribableOrPromise<number>): MonoTypeOperatorFunction<T> { | ||
return (source: Observable<T>) => source.lift(new DebounceOperator(durationSelector)); | ||
} | ||
|
||
class DebounceOperator<T> implements Operator<T, T> { | ||
constructor(private durationSelector: (value: T) => SubscribableOrPromise<number>) { | ||
} | ||
|
||
call(subscriber: Subscriber<T>, source: any): TeardownLogic { | ||
return source.subscribe(new DebounceSubscriber(subscriber, this.durationSelector)); | ||
} | ||
} | ||
|
||
/** | ||
* We need this JSDoc comment for affecting ESDoc. | ||
* @ignore | ||
* @extends {Ignored} | ||
*/ | ||
class DebounceSubscriber<T, R> extends OuterSubscriber<T, R> { | ||
private value: T; | ||
private hasValue: boolean = false; | ||
private durationSubscription: Subscription = null; | ||
|
||
constructor(destination: Subscriber<R>, | ||
private durationSelector: (value: T) => SubscribableOrPromise<number>) { | ||
super(destination); | ||
} | ||
|
||
protected _next(value: T): void { | ||
try { | ||
const result = this.durationSelector.call(this, value); | ||
|
||
if (result) { | ||
this._tryNext(value, result); | ||
} | ||
} catch (err) { | ||
this.destination.error(err); | ||
} | ||
} | ||
|
||
protected _complete(): void { | ||
this.emitValue(); | ||
this.destination.complete(); | ||
} | ||
|
||
private _tryNext(value: T, duration: SubscribableOrPromise<number>): void { | ||
let subscription = this.durationSubscription; | ||
this.value = value; | ||
this.hasValue = true; | ||
if (subscription) { | ||
subscription.unsubscribe(); | ||
this.remove(subscription); | ||
} | ||
|
||
subscription = subscribeToResult(this, duration); | ||
if (!subscription.closed) { | ||
this.add(this.durationSubscription = subscription); | ||
} | ||
} | ||
|
||
notifyNext(outerValue: T, innerValue: R, | ||
outerIndex: number, innerIndex: number, | ||
innerSub: InnerSubscriber<T, R>): void { | ||
this.emitValue(); | ||
} | ||
|
||
notifyComplete(): void { | ||
this.emitValue(); | ||
} | ||
|
||
emitValue(): void { | ||
if (this.hasValue) { | ||
const value = this.value; | ||
const subscription = this.durationSubscription; | ||
if (subscription) { | ||
this.durationSubscription = null; | ||
subscription.unsubscribe(); | ||
this.remove(subscription); | ||
} | ||
this.value = null; | ||
this.hasValue = false; | ||
super._next(value); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters