diff --git a/perf/micro/immediate-scheduler/operators/debounce.js b/perf/micro/immediate-scheduler/operators/debounce.js new file mode 100644 index 0000000000..0f2a71ab7b --- /dev/null +++ b/perf/micro/immediate-scheduler/operators/debounce.js @@ -0,0 +1,24 @@ +var RxOld = require('rx'); +var RxNew = require('../../../../index'); + +module.exports = function (suite) { + var time = [10, 30, 20, 40, 10]; + + var oldDebounceWithImmediateScheduler = RxOld.Observable.range(0, 5, RxOld.Scheduler.immediate) + .flatMap(function (x) { return RxOld.Observable.of(x, RxOld.Scheduler.immediate).delay(time[x]); }) + .debounce(function (x) { return RxOld.Observable.timer(25); }); + var newDebounceWithImmediateScheduler = RxNew.Observable.range(0, 5) + .mergeMap(function (x) { return RxNew.Observable.of(x).delay(time[x]); }) + .debounce(function (x) { return RxNew.Observable.timer(25); }); + + function _next(x) { } + function _error(e) { } + function _complete() { } + return suite + .add('old debounce() with immediate scheduler', function () { + oldDebounceWithImmediateScheduler.subscribe(_next, _error, _complete); + }) + .add('new debounce() with immediate scheduler', function () { + newDebounceWithImmediateScheduler.subscribe(_next, _error, _complete); + }); +}; \ No newline at end of file diff --git a/spec/operators/debounce-spec.js b/spec/operators/debounce-spec.js index f20051f7e4..4a4197b442 100644 --- a/spec/operators/debounce-spec.js +++ b/spec/operators/debounce-spec.js @@ -1,18 +1,233 @@ -/* globals describe, it, expect */ +/* globals describe, it, expect, expectObservable, hot, cold, rxTestScheduler */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; describe('Observable.prototype.debounce()', function () { - it('should delay calls by the specified amount', function (done) { - var expected = [3, 4]; - var source = Observable.concat(Observable.of(1), + function getTimerSelector(x) { + return function () { + return Observable.timer(x, rxTestScheduler); + }; + } + + it('should delay all element by selector observable', function () { + var e1 = hot('--a--b--c--d---------|'); + var expected = '----a--b--c--d-------|'; + + expectObservable(e1.debounce(getTimerSelector(20))).toBe(expected); + }); + + it('should debounce by selector observable', function () { + var e1 = hot('--a--bc--d----|'); + var expected = '----a---c--d--|'; + + expectObservable(e1.debounce(getTimerSelector(20))).toBe(expected); + }); + + it('should complete when source does not emit', function () { + var e1 = hot('-----|'); + var expected = '-----|'; + + expectObservable(e1.debounce(getTimerSelector(20))).toBe(expected); + }); + + it('should complete when source is empty', function () { + var e1 = Observable.empty(); + var expected = '|'; + + expectObservable(e1.debounce(getTimerSelector(20))).toBe(expected); + }); + + it('should raise error when source does not emit and raises error', function () { + var e1 = hot('-----#'); + var expected = '-----#'; + + expectObservable(e1.debounce(getTimerSelector(20))).toBe(expected); + }); + + it('should raise error when source throws', function () { + var e1 = Observable.throw('error'); + var expected = '#'; + + expectObservable(e1.debounce(getTimerSelector(20))).toBe(expected); + }); + + it('should debounce and does not complete when source does not completes', function () { + var e1 = hot('--a--bc--d---'); + var expected = '----a---c--d--'; + + expectObservable(e1.debounce(getTimerSelector(20))).toBe(expected); + }); + + it('should not completes when source does not completes', function () { + var e1 = hot('-'); + var expected = '-'; + + expectObservable(e1.debounce(getTimerSelector(20))).toBe(expected); + }); + + it('should not completes when source never completes', function () { + var e1 = Observable.never(); + var expected = '-'; + + expectObservable(e1.debounce(getTimerSelector(20))).toBe(expected); + }); + + it('should delay all element until source raises error', function () { + var e1 = hot('--a--b--c--d---------#'); + var expected = '----a--b--c--d-------#'; + + expectObservable(e1.debounce(getTimerSelector(20))).toBe(expected); + }); + + it('should debounce all elements while source emits by selector observable', function () { + var e1 = hot('---a---b---c---d---e|'); + var expected = '--------------------(e|)'; + + expectObservable(e1.debounce(getTimerSelector(40))).toBe(expected); + }); + + it('should debounce all element while source emits by selector observable until raises error', function () { + var e1 = hot('--a--b--c--d-#'); + var expected = '-------------#'; + + expectObservable(e1.debounce(getTimerSelector(50))).toBe(expected); + }); + + it('should delay element by same selector observable emits multiple', function () { + var e1 = hot('----a--b--c----d----e-------|'); + var expected = '------a--b--c----d----e-----|'; + var selector = cold('--x-y-'); + + expectObservable(e1.debounce(function () { return selector; })).toBe(expected); + }); + + it('should debounce by selector observable emits multiple', function () { + var e1 = hot('----a--b--c----d----e-------|'); + var expected = '------a-----c---------e-----|'; + var selector = [cold('--x-y-'), + cold( '----x-y-'), + cold( '--x-y-'), + cold( '------x-y-'), + cold( '--x-y-')]; + + expectObservable(e1.debounce(function () { return selector.shift(); })).toBe(expected); + }); + + it('should debounce by selector observable until source completes', function () { + var e1 = hot('----a--b--c----d----e|'); + var expected = '------a-----c--------(e|)'; + var selector = [cold('--x-y-'), + cold( '----x-y-'), + cold( '--x-y-'), + cold( '------x-y-'), + cold( '--x-y-')]; + + expectObservable(e1.debounce(function () { return selector.shift(); })).toBe(expected); + }); + + it('should raise error when selector observable raises error', function () { + var e1 = hot('--------a--------b--------c---------|'); + var expected = '---------a---------b---------#'; + var selector = [cold( '-x-y-'), + cold( '--x-y-'), + cold( '---#')]; + + expectObservable(e1.debounce(function () { return selector.shift(); })).toBe(expected); + }); + + it('should raise error when source raises error with selector observable', function () { + var e1 = hot('--------a--------b--------c---------d#'); + var expected = '---------a---------b---------c-------#'; + var selector = [cold( '-x-y-'), + cold( '--x-y-'), + cold( '---x-y-'), + cold( '----x-y-')]; + + expectObservable(e1.debounce(function () { return selector.shift(); })).toBe(expected); + }); + + it('should raise error when selector function throws', function () { + var e1 = hot('--------a--------b--------c---------|'); + var expected = '---------a---------b------#'; + var selector = [cold( '-x-y-'), + cold( '--x-y-')]; + + function selectorFunction(x) { + if (x !== 'c') { + return selector.shift(); + } else { + throw 'error'; + } + } + + expectObservable(e1.debounce(selectorFunction)).toBe(expected); + }); + + it('should delay element by selector observable completes when it does not emits', function () { + var e1 = hot('--------a--------b--------c---------|'); + var expected = '---------a---------b---------c------|'; + var selector = [cold( '-|'), + cold( '--|'), + cold( '---|')]; + + expectObservable(e1.debounce(function () { return selector.shift(); })).toBe(expected); + }); + + it('should debounce by selector observable completes when it does not emits', function () { + var e1 = hot('----a--b-c---------de-------------|'); + var expected = '-----a------c------------e--------|'; + var selector = [cold('-|'), + cold( '--|'), + cold( '---|'), + cold( '----|'), + cold( '-----|')]; + + expectObservable(e1.debounce(function () { return selector.shift(); })).toBe(expected); + }); + + it('should delay by promise resolves', function (done) { + var e1 = Observable.concat(Observable.of(1), + Observable.timer(10).mapTo(2), + Observable.timer(10).mapTo(3), + Observable.timer(100).mapTo(4) + ); + var expected = [1,2,3,4]; + + e1.debounce(function () { + return new Promise(function (resolve) { resolve(42); }); + }).subscribe(function (x) { + expect(x).toEqual(expected.shift()); }, + function () { throw 'should not be called'; }, + function () { + expect(expected.length).toBe(0); + done(); + }); + }); + + it('should raises error when promise rejects', function (done) { + var e1 = Observable.concat(Observable.of(1), Observable.timer(10).mapTo(2), Observable.timer(10).mapTo(3), Observable.timer(100).mapTo(4) - ) - .debounce(50) - .subscribe(function (x) { - expect(x).toBe(expected.shift()); - }, null, done); + ); + var expected = [1,2]; + var error = new Error('error'); + + e1.debounce(function (x) { + if (x === 3) { + return new Promise(function (resolve, reject) {reject(error);}); + } else { + return new Promise(function (resolve) {resolve(42);}); + } + }).subscribe(function (x) { + expect(x).toEqual(expected.shift()); }, + function (err) { + expect(err).toBe(error); + expect(expected.length).toBe(0); + done(); + }, + function () { + throw 'should not be called'; + }); }); }); \ No newline at end of file diff --git a/src/CoreOperators.ts b/src/CoreOperators.ts index 2465a0e190..09ffda5a0c 100644 --- a/src/CoreOperators.ts +++ b/src/CoreOperators.ts @@ -19,6 +19,7 @@ export interface CoreOperators { concatMapTo?: (observable: Observable, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable; count?: () => Observable; dematerialize?: () => Observable; + debounce?: (durationSelector: (value: T) => Observable | Promise) => Observable; debounceTime?: (dueTime: number, scheduler?: Scheduler) => Observable; defaultIfEmpty?: (defaultValue: R) => Observable|Observable; delay?: (delay: number, scheduler?: Scheduler) => Observable; diff --git a/src/operators/debounce.ts b/src/operators/debounce.ts index cab66c509e..070518feff 100644 --- a/src/operators/debounce.ts +++ b/src/operators/debounce.ts @@ -1,57 +1,105 @@ import Operator from '../Operator'; -import Observer from '../Observer'; -import Subscriber from '../Subscriber'; -import Scheduler from '../Scheduler'; import Observable from '../Observable'; +import PromiseObservable from '../observables/PromiseObservable'; +import Subscriber from '../Subscriber'; import Subscription from '../Subscription'; -import nextTick from '../schedulers/nextTick'; import tryCatch from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; -import bindCallback from '../util/bindCallback'; -export default function debounce(dueTime: number, scheduler: Scheduler = nextTick): Observable { - return this.lift(new DebounceOperator(dueTime, scheduler)); +export default function debounce(durationSelector: (value: T) => Observable | Promise): Observable { + return this.lift(new DebounceOperator(durationSelector)); } class DebounceOperator implements Operator { - constructor(private dueTime: number, private scheduler: Scheduler) { + constructor(private durationSelector: (value: T) => Observable | Promise) { } - call(subscriber: Subscriber): Subscriber { - return new DebounceSubscriber(subscriber, this.dueTime, this.scheduler); + call(observer: Subscriber): Subscriber { + return new DebounceSubscriber(observer, this.durationSelector); } } class DebounceSubscriber extends Subscriber { - private debounced: Subscription; + private debouncedSubscription: Subscription = null; + private lastValue: any = null; + private _index: number = 0; + get index() { + return this._index; + } constructor(destination: Subscriber, - private dueTime: number, - private scheduler: Scheduler) { + private durationSelector: (value: T) => Observable | Promise) { super(destination); } _next(value: T) { - this.clearDebounce(); - this.add(this.debounced = this.scheduler.schedule(dispatchNext, this.dueTime, { value, subscriber: this })); + const destination = this.destination; + const currentIndex = ++this._index; + let debounce = tryCatch(this.durationSelector)(value); + + if (debounce === errorObject) { + destination.error(errorObject.e); + } else { + if (typeof debounce.subscribe !== 'function' + && typeof debounce.then === 'function') { + debounce = PromiseObservable.create(debounce); + } + + this.lastValue = value; + this.add(this.debouncedSubscription = debounce._subscribe(new DurationSelectorSubscriber(this, currentIndex))); + } + } + + _complete() { + this.debouncedNext(); + this.destination.complete(); } - debouncedNext(value: T) { + debouncedNext(): void { this.clearDebounce(); - this.destination.next(value); + if (this.lastValue != null) { + this.destination.next(this.lastValue); + this.lastValue = null; + } } - clearDebounce() { - const debounced = this.debounced; - if (debounced) { - this.remove(debounced); - debounced.unsubscribe(); - this.debounced = null; + private clearDebounce(): void { + const debouncedSubscription = this.debouncedSubscription; + + if (debouncedSubscription !== null) { + this.remove(debouncedSubscription); + this.debouncedSubscription = null; } } } -function dispatchNext({ value, subscriber }) { - subscriber.debouncedNext(value); -} +class DurationSelectorSubscriber extends Subscriber { + constructor(private parent: DebounceSubscriber, + private currentIndex: number) { + super(null); + } + + private debounceNext(): void { + const parent = this.parent; + + if (this.currentIndex === parent.index) { + parent.debouncedNext(); + if (!this.isUnsubscribed) { + this.unsubscribe(); + } + } + } + + _next(unused: T) { + this.debounceNext(); + } + + _error(err) { + this.parent.error(err); + } + + _complete() { + this.debounceNext(); + } +} \ No newline at end of file