Skip to content

Commit

Permalink
feat(operator): add debounce
Browse files Browse the repository at this point in the history
closes #193
  • Loading branch information
benlesh committed Aug 22, 2015
1 parent 1d735b9 commit f03adaf
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 1 deletion.
24 changes: 24 additions & 0 deletions spec/operators/debounce-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.debounce()', function () {
it('should debounce events', function (done) {
Observable.of(1, 2, 3).debounce(50)
.subscribe(function (x) {
expect(x).toBe(1);
}, null, done);
});

it('should debounce events multiple times', function (done) {
var expected = ['1-0', '2-0']
Observable.concat(
Observable.timer(0, 10).take(3).map(function (x) { return '1-' + x }),
Observable.timer(80, 10).take(5).map(function (x) { return '2-' + x })
)
.debounce(50)
.subscribe(function (x) {
expect(x).toBe(expected.shift());
}, null, done);
});
});
3 changes: 2 additions & 1 deletion src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ export default class Observable<T> {
scan: <R>(project: (acc: R, x: T) => R, acc?: R) => Observable<R>;
reduce: <R>(project: (acc: R, x: T) => R, acc?: R) => Observable<R>;
startWith: <T>(x: T) => Observable<T>;

debounce: <R>(dueTime: number, scheduler?: Scheduler) => Observable<R>;

filter: (predicate: (x: T) => boolean, ix?: number, thisArg?: any) => Observable<T>;
distinctUntilChanged: (compare?: (x: T, y: T) => boolean, thisArg?: any) => Observable<T>;
distinctUntilKeyChanged: (key: string, compare?: (x: any, y: any) => boolean, thisArg?: any) => Observable<T>;
Expand Down
2 changes: 2 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,11 @@ observableProto.groupBy = groupBy;

import delay from './operators/delay';
import throttle from './operators/throttle';
import debounce from './operators/debounce';

observableProto.delay = delay;
observableProto.throttle = throttle;
observableProto.debounce = debounce;

export {

Expand Down
56 changes: 56 additions & 0 deletions src/operators/debounce.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Scheduler from '../Scheduler';
import Observable from '../Observable';
import Subscription from '../Subscription';

import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import bindCallback from '../util/bindCallback';

export default function debounce<T>(dueTime: number, scheduler: Scheduler = Scheduler.nextTick): Observable<T> {
return this.lift(new DebounceOperator(dueTime, scheduler));
}

export class DebounceOperator<T, R> extends Operator<T, R> {

constructor(private dueTime: number, private scheduler: Scheduler) {
super();
}

call(observer: Observer<T>): Observer<T> {
return new DebounceSubscriber(observer, this.dueTime, this.scheduler);
}
}

export class DebounceSubscriber<T> extends Subscriber<T> {
private debounced: Subscription<any>;

constructor(destination: Observer < T >, private dueTime: number, private scheduler: Scheduler) {
super(destination);
}

_next(value: T) {
if (!this.debounced) {
this.add(this.debounced = this.scheduler.schedule(this.dueTime, { value, subscriber: this }, dispatchNext));
}
}

clearDebounce() {
const debounced = this.debounced;
if (debounced) {
debounced.unsubscribe();
this.remove(debounced);
}
}

debouncedNext(value: T) {
this.clearDebounce();
this.destination.next(value);
}
}

function dispatchNext<T>({ value, subscriber }) {
subscriber.debouncedNext(value);
}

0 comments on commit f03adaf

Please sign in to comment.