Skip to content

Commit

Permalink
feat(operator): add debounceTime operator
Browse files Browse the repository at this point in the history
- add debounceTime operator
- expand test coverage including micro perf test

relates to #493
  • Loading branch information
kwonoj authored and benlesh committed Oct 13, 2015
1 parent 26a0696 commit dd2ba40
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 1 deletion.
24 changes: 24 additions & 0 deletions perf/micro/immediate-scheduler/operators/debouncetime.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

module.exports = function (suite) {
var time = [10, 30, 20, 40, 10];

var oldDebounceTimeWithImmediateScheduler = RxOld.Observable.range(0, 5, RxOld.Scheduler.immediate)
.flatMap(function (x) { return RxOld.Observable.of(x, RxOld.Scheduler.immediate).delay(time[x]); })
.debounce(25);
var newDebounceTimeWithImmediateScheduler = RxNew.Observable.range(0, 5)
.mergeMap(function (x) { return RxNew.Observable.of(x).delay(time[x]); })
.debounceTime(25);

function _next(x) { }
function _error(e) { }
function _complete() { }
return suite
.add('old debounceTime() with immediate scheduler', function () {
oldDebounceTimeWithImmediateScheduler.subscribe(_next, _error, _complete);
})
.add('new debounceTime() with immediate scheduler', function () {
newDebounceTimeWithImmediateScheduler.subscribe(_next, _error, _complete);
});
};
89 changes: 89 additions & 0 deletions spec/operators/debouncetime-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/* globals describe, it, expect, expectObservable, hot, rxTestScheduler */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.debounceTime()', function () {
it('should delay all element by the specified time', function () {
var e1 = hot('-a--------b------c----|');
var expected = '------a--------b------(c|)';

expectObservable(e1.debounceTime(50, rxTestScheduler)).toBe(expected);
});

it('should debounce and delay element by the specified time', function () {
var e1 = hot('-a--(bc)-----------d-------|');
var expected = '---------c--------------d--|';

expectObservable(e1.debounceTime(50, rxTestScheduler)).toBe(expected);
});

it('should complete when source does not emit', function () {
var e1 = hot('-----|');
var expected = '-----|';

expectObservable(e1.debounceTime(10, rxTestScheduler)).toBe(expected);
});

it('should complete when source is empty', function () {
var e1 = Observable.empty();
var expected = '|';

expectObservable(e1.debounceTime(10, rxTestScheduler)).toBe(expected);
});

it('should raise error when source does not emit and raises error', function () {
var e1 = hot('-----#');
var expected = '-----#';

expectObservable(e1.debounceTime(10, rxTestScheduler)).toBe(expected);
});

it('should raise error when source throws', function () {
var e1 = Observable.throw('error');
var expected = '#';

expectObservable(e1.debounceTime(10, rxTestScheduler)).toBe(expected);
});

it('should debounce and does not complete when source does not completes', function () {
var e1 = hot('-a--(bc)-----------d-------');
var expected = '---------c--------------d--';

expectObservable(e1.debounceTime(50, rxTestScheduler)).toBe(expected);
});

it('should not completes when source does not completes', function () {
var e1 = hot('-');
var expected = '-';

expectObservable(e1.debounceTime(10, rxTestScheduler)).toBe(expected);
});

it('should not completes when source never completes', function () {
var e1 = Observable.never();
var expected = '-';

expectObservable(e1.debounceTime(10, rxTestScheduler)).toBe(expected);
});

it('should delay all element until source raises error', function () {
var e1 = hot('-a--------b------c----#');
var expected = '------a--------b------#';

expectObservable(e1.debounceTime(50, rxTestScheduler)).toBe(expected);
});

it('should debounce all elements while source emits within given time', function () {
var e1 = hot('--a--b--c--d--e--f--g--h-|');
var expected = '-------------------------(h|)';

expectObservable(e1.debounceTime(40, rxTestScheduler)).toBe(expected);
});

it('should debounce all element while source emits within given time until raises error', function () {
var e1 = hot('--a--b--c--d--e--f--g--h-#');
var expected = '-------------------------#';

expectObservable(e1.debounceTime(40, rxTestScheduler)).toBe(expected);
});
});
2 changes: 1 addition & 1 deletion src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export interface CoreOperators<T> {
concatMapTo?: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
count?: () => Observable<number>;
dematerialize?: () => Observable<any>;
debounce?: <R>(dueTime: number, scheduler?: Scheduler) => Observable<R>;
debounceTime?: <R>(dueTime: number, scheduler?: Scheduler) => Observable<R>;
defaultIfEmpty?: <T, R>(defaultValue: R) => Observable<T>|Observable<R>;
delay?: <T>(delay: number, scheduler?: Scheduler) => Observable<T>;
distinctUntilChanged?: (compare?: (x: T, y: T) => boolean, thisArg?: any) => Observable<T>;
Expand Down
3 changes: 3 additions & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ observableProto.dematerialize = dematerialize;
import debounce from './operators/debounce';
observableProto.debounce = debounce;

import debounceTime from './operators/debounceTime';
observableProto.debounceTime = debounceTime;

import defaultIfEmpty from './operators/defaultIfEmpty';
observableProto.defaultIfEmpty = defaultIfEmpty;

Expand Down
3 changes: 3 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ observableProto.dematerialize = dematerialize;
import debounce from './operators/debounce';
observableProto.debounce = debounce;

import debounceTime from './operators/debounceTime';
observableProto.debounceTime = debounceTime;

import defaultIfEmpty from './operators/defaultIfEmpty';
observableProto.defaultIfEmpty = defaultIfEmpty;

Expand Down
63 changes: 63 additions & 0 deletions src/operators/debounceTime.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import Operator from '../Operator';
import Observable from '../Observable';
import Subscriber from '../Subscriber';
import Scheduler from '../Scheduler';
import Subscription from '../Subscription';
import nextTick from '../schedulers/nextTick';

export default function debounceTime<T>(dueTime: number, scheduler: Scheduler = nextTick): Observable<T> {
return this.lift(new DebounceTimeOperator(dueTime, scheduler));
}

class DebounceTimeOperator<T, R> implements Operator<T, R> {
constructor(private dueTime: number, private scheduler: Scheduler) {
}

call(subscriber: Subscriber<T>): Subscriber<T> {
return new DebounceTimeSubscriber(subscriber, this.dueTime, this.scheduler);
}
}

class DebounceTimeSubscriber<T> extends Subscriber<T> {
private debouncedSubscription: Subscription<any> = null;
private lastValue: any = null;

constructor(destination: Subscriber<T>,
private dueTime: number,
private scheduler: Scheduler) {
super(destination);
}

_next(value: T) {
this.clearDebounce();
this.lastValue = value;
this.add(this.debouncedSubscription = this.scheduler.schedule(dispatchNext, this.dueTime, this));
}

_complete() {
this.debouncedNext();
this.destination.complete();
}

debouncedNext(): void {
this.clearDebounce();
if (this.lastValue != null) {
this.destination.next(this.lastValue);
this.lastValue = null;
}
}

private clearDebounce(): void {
const debouncedSubscription = this.debouncedSubscription;

if (debouncedSubscription !== null) {
this.remove(debouncedSubscription);
debouncedSubscription.unsubscribe();
this.debouncedSubscription = null;
}
}
}

function dispatchNext(subscriber) {
subscriber.debouncedNext();
}

0 comments on commit dd2ba40

Please sign in to comment.