Skip to content

Commit cb8ce46

Browse files
committed
feat(debounce): add higher-order lettable version of debounce
1 parent caf713e commit cb8ce46

File tree

3 files changed

+141
-90
lines changed

3 files changed

+141
-90
lines changed

src/operator/debounce.ts

Lines changed: 3 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
1-
import { Operator } from '../Operator';
2-
import { Observable, SubscribableOrPromise } from '../Observable';
3-
import { Subscriber } from '../Subscriber';
4-
import { Subscription, TeardownLogic } from '../Subscription';
51

6-
import { OuterSubscriber } from '../OuterSubscriber';
7-
import { InnerSubscriber } from '../InnerSubscriber';
8-
import { subscribeToResult } from '../util/subscribeToResult';
2+
import { Observable, SubscribableOrPromise } from '../Observable';
3+
import { debounce as higherOrder } from '../operators';
94

105
/**
116
* Emits a value from the source Observable only after a particular time span
@@ -50,87 +45,5 @@ import { subscribeToResult } from '../util/subscribeToResult';
5045
* @owner Observable
5146
*/
5247
export function debounce<T>(this: Observable<T>, durationSelector: (value: T) => SubscribableOrPromise<number>): Observable<T> {
53-
return this.lift(new DebounceOperator(durationSelector));
54-
}
55-
56-
class DebounceOperator<T> implements Operator<T, T> {
57-
constructor(private durationSelector: (value: T) => SubscribableOrPromise<number>) {
58-
}
59-
60-
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
61-
return source.subscribe(new DebounceSubscriber(subscriber, this.durationSelector));
62-
}
63-
}
64-
65-
/**
66-
* We need this JSDoc comment for affecting ESDoc.
67-
* @ignore
68-
* @extends {Ignored}
69-
*/
70-
class DebounceSubscriber<T, R> extends OuterSubscriber<T, R> {
71-
private value: T;
72-
private hasValue: boolean = false;
73-
private durationSubscription: Subscription = null;
74-
75-
constructor(destination: Subscriber<R>,
76-
private durationSelector: (value: T) => SubscribableOrPromise<number>) {
77-
super(destination);
78-
}
79-
80-
protected _next(value: T): void {
81-
try {
82-
const result = this.durationSelector.call(this, value);
83-
84-
if (result) {
85-
this._tryNext(value, result);
86-
}
87-
} catch (err) {
88-
this.destination.error(err);
89-
}
90-
}
91-
92-
protected _complete(): void {
93-
this.emitValue();
94-
this.destination.complete();
95-
}
96-
97-
private _tryNext(value: T, duration: SubscribableOrPromise<number>): void {
98-
let subscription = this.durationSubscription;
99-
this.value = value;
100-
this.hasValue = true;
101-
if (subscription) {
102-
subscription.unsubscribe();
103-
this.remove(subscription);
104-
}
105-
106-
subscription = subscribeToResult(this, duration);
107-
if (!subscription.closed) {
108-
this.add(this.durationSubscription = subscription);
109-
}
110-
}
111-
112-
notifyNext(outerValue: T, innerValue: R,
113-
outerIndex: number, innerIndex: number,
114-
innerSub: InnerSubscriber<T, R>): void {
115-
this.emitValue();
116-
}
117-
118-
notifyComplete(): void {
119-
this.emitValue();
120-
}
121-
122-
emitValue(): void {
123-
if (this.hasValue) {
124-
const value = this.value;
125-
const subscription = this.durationSubscription;
126-
if (subscription) {
127-
this.durationSubscription = null;
128-
subscription.unsubscribe();
129-
this.remove(subscription);
130-
}
131-
this.value = null;
132-
this.hasValue = false;
133-
super._next(value);
134-
}
135-
}
48+
return higherOrder(durationSelector)(this);
13649
}

src/operators/debounce.ts

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
import { Operator } from '../Operator';
2+
import { Observable, SubscribableOrPromise } from '../Observable';
3+
import { Subscriber } from '../Subscriber';
4+
import { Subscription, TeardownLogic } from '../Subscription';
5+
6+
import { OuterSubscriber } from '../OuterSubscriber';
7+
import { InnerSubscriber } from '../InnerSubscriber';
8+
import { subscribeToResult } from '../util/subscribeToResult';
9+
import { MonoTypeOperatorFunction } from '../interfaces';
10+
11+
/**
12+
* Emits a value from the source Observable only after a particular time span
13+
* determined by another Observable has passed without another source emission.
14+
*
15+
* <span class="informal">It's like {@link debounceTime}, but the time span of
16+
* emission silence is determined by a second Observable.</span>
17+
*
18+
* <img src="./img/debounce.png" width="100%">
19+
*
20+
* `debounce` delays values emitted by the source Observable, but drops previous
21+
* pending delayed emissions if a new value arrives on the source Observable.
22+
* This operator keeps track of the most recent value from the source
23+
* Observable, and spawns a duration Observable by calling the
24+
* `durationSelector` function. The value is emitted only when the duration
25+
* Observable emits a value or completes, and if no other value was emitted on
26+
* the source Observable since the duration Observable was spawned. If a new
27+
* value appears before the duration Observable emits, the previous value will
28+
* be dropped and will not be emitted on the output Observable.
29+
*
30+
* Like {@link debounceTime}, this is a rate-limiting operator, and also a
31+
* delay-like operator since output emissions do not necessarily occur at the
32+
* same time as they did on the source Observable.
33+
*
34+
* @example <caption>Emit the most recent click after a burst of clicks</caption>
35+
* var clicks = Rx.Observable.fromEvent(document, 'click');
36+
* var result = clicks.debounce(() => Rx.Observable.interval(1000));
37+
* result.subscribe(x => console.log(x));
38+
*
39+
* @see {@link audit}
40+
* @see {@link debounceTime}
41+
* @see {@link delayWhen}
42+
* @see {@link throttle}
43+
*
44+
* @param {function(value: T): SubscribableOrPromise} durationSelector A function
45+
* that receives a value from the source Observable, for computing the timeout
46+
* duration for each source value, returned as an Observable or a Promise.
47+
* @return {Observable} An Observable that delays the emissions of the source
48+
* Observable by the specified duration Observable returned by
49+
* `durationSelector`, and may drop some values if they occur too frequently.
50+
* @method debounce
51+
* @owner Observable
52+
*/
53+
export function debounce<T>(durationSelector: (value: T) => SubscribableOrPromise<number>): MonoTypeOperatorFunction<T> {
54+
return (source: Observable<T>) => source.lift(new DebounceOperator(durationSelector));
55+
}
56+
57+
class DebounceOperator<T> implements Operator<T, T> {
58+
constructor(private durationSelector: (value: T) => SubscribableOrPromise<number>) {
59+
}
60+
61+
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
62+
return source.subscribe(new DebounceSubscriber(subscriber, this.durationSelector));
63+
}
64+
}
65+
66+
/**
67+
* We need this JSDoc comment for affecting ESDoc.
68+
* @ignore
69+
* @extends {Ignored}
70+
*/
71+
class DebounceSubscriber<T, R> extends OuterSubscriber<T, R> {
72+
private value: T;
73+
private hasValue: boolean = false;
74+
private durationSubscription: Subscription = null;
75+
76+
constructor(destination: Subscriber<R>,
77+
private durationSelector: (value: T) => SubscribableOrPromise<number>) {
78+
super(destination);
79+
}
80+
81+
protected _next(value: T): void {
82+
try {
83+
const result = this.durationSelector.call(this, value);
84+
85+
if (result) {
86+
this._tryNext(value, result);
87+
}
88+
} catch (err) {
89+
this.destination.error(err);
90+
}
91+
}
92+
93+
protected _complete(): void {
94+
this.emitValue();
95+
this.destination.complete();
96+
}
97+
98+
private _tryNext(value: T, duration: SubscribableOrPromise<number>): void {
99+
let subscription = this.durationSubscription;
100+
this.value = value;
101+
this.hasValue = true;
102+
if (subscription) {
103+
subscription.unsubscribe();
104+
this.remove(subscription);
105+
}
106+
107+
subscription = subscribeToResult(this, duration);
108+
if (!subscription.closed) {
109+
this.add(this.durationSubscription = subscription);
110+
}
111+
}
112+
113+
notifyNext(outerValue: T, innerValue: R,
114+
outerIndex: number, innerIndex: number,
115+
innerSub: InnerSubscriber<T, R>): void {
116+
this.emitValue();
117+
}
118+
119+
notifyComplete(): void {
120+
this.emitValue();
121+
}
122+
123+
emitValue(): void {
124+
if (this.hasValue) {
125+
const value = this.value;
126+
const subscription = this.durationSubscription;
127+
if (subscription) {
128+
this.durationSubscription = null;
129+
subscription.unsubscribe();
130+
this.remove(subscription);
131+
}
132+
this.value = null;
133+
this.hasValue = false;
134+
super._next(value);
135+
}
136+
}
137+
}

src/operators/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ export { concatAll } from './concatAll';
1111
export { concatMap } from './concatMap';
1212
export { concatMapTo } from './concatMapTo';
1313
export { count } from './count';
14+
export { debounce } from './debounce';
1415
export { defaultIfEmpty } from './defaultIfEmpty';
1516
export { dematerialize } from './dematerialize';
1617
export { filter } from './filter';

0 commit comments

Comments
 (0)