diff --git a/spec/operators/delay-spec.js b/spec/operators/delay-spec.js index 8db55eeca9..ab1d9c90eb 100644 --- a/spec/operators/delay-spec.js +++ b/spec/operators/delay-spec.js @@ -1,11 +1,96 @@ -/* globals describe, it, expect, expectObservable, hot, rxTestScheduler */ +/* globals describe, it, expect, expectObservable, expectSubscriptions, hot, rxTestScheduler */ var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; describe('Observable.prototype.delay()', function () { it('should delay by specified timeframe', function () { - var source = hot('--a--|'); - var expected = '-----a--|'; + var e1 = hot('--a--b--|'); + var expected = '-----a--b--|'; + var subs = '^ !'; - expectObservable(source.delay(30, rxTestScheduler)).toBe(expected); + expectObservable(e1.delay(30, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should delay by absolute time period', function () { + var e1 = hot('--a--b--|'); + var expected = '-----a--b--|'; + var subs = '^ !'; + var absoluteDelay = new Date(rxTestScheduler.now() + 30); + + expectObservable(e1.delay(absoluteDelay, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should delay by absolute time period after subscription', function () { + var e1 = hot('---^--a--b--|'); + var expected = '------a--b--|'; + var subs = '^ !'; + var absoluteDelay = new Date(rxTestScheduler.now() + 30); + + expectObservable(e1.delay(absoluteDelay, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should raise error when source raises error', function () { + var e1 = hot('---a---b---#'); + var expected = '------a---b#'; + var subs = '^ !'; + + expectObservable(e1.delay(30, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should raise error when source raises error', function () { + var e1 = hot('--a--b--#'); + var expected = '-----a--#'; + var subs = '^ !'; + var absoluteDelay = new Date(rxTestScheduler.now() + 30); + + expectObservable(e1.delay(absoluteDelay, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should raise error when source raises error after subscription', function () { + var e1 = hot('---^---a---b---#'); + var expected = '-------a---b#'; + var e1Sub = '^ !'; + var absoluteDelay = new Date(rxTestScheduler.now() + 30); + + expectObservable(e1.delay(absoluteDelay, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1Sub); + }); + + it('should delay when source does not emits', function () { + var e1 = hot('----|'); + var expected = '-------|'; + var subs = '^ !'; + + expectObservable(e1.delay(30, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should delay when source is empty', function () { + var e1 = Observable.empty(); + var expected = '---|'; + + expectObservable(e1.delay(30, rxTestScheduler)).toBe(expected); + }); + + it('should not complete when source does not completes', function () { + var e1 = hot('---a---b-'); + var expected = '------a---b-'; + var unsub = '----------------!'; + var subs = '^ !'; + + expectObservable(e1.delay(30, rxTestScheduler), unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should not complete when source never completes', function () { + var e1 = Observable.never(); + var expected = '-'; + + expectObservable(e1.delay(30, rxTestScheduler)).toBe(expected); }); }); \ No newline at end of file diff --git a/src/operators/delay.ts b/src/operators/delay.ts index 5b80045fd4..2cdc1944ce 100644 --- a/src/operators/delay.ts +++ b/src/operators/delay.ts @@ -1,22 +1,20 @@ import Operator from '../Operator'; -import Observer from '../Observer'; import Scheduler from '../Scheduler'; import Subscriber from '../Subscriber'; import Notification from '../Notification'; import immediate from '../schedulers/immediate'; +import isDate from '../util/isDate'; -export default function delay(delay: number, scheduler: Scheduler = immediate) { - return this.lift(new DelayOperator(delay, scheduler)); +export default function delay(delay: number|Date, + scheduler: Scheduler = immediate) { + let absoluteDelay = isDate(delay); + let delayFor = absoluteDelay ? (+delay - scheduler.now()) : delay; + return this.lift(new DelayOperator(delayFor, scheduler)); } class DelayOperator implements Operator { - - delay: number; - scheduler: Scheduler; - - constructor(delay: number, scheduler: Scheduler) { - this.delay = delay; - this.scheduler = scheduler; + constructor(private delay: number, + private scheduler: Scheduler) { } call(subscriber: Subscriber): Subscriber { @@ -25,21 +23,20 @@ class DelayOperator implements Operator { } class DelaySubscriber extends Subscriber { + private queue: Array = []; + private active: boolean = false; + private errored: boolean = false; - protected delay: number; - protected queue: Array = []; - protected scheduler: Scheduler; - protected active: boolean = false; - protected errored: boolean = false; - - static dispatch(state) { + private static dispatch(state): void { const source = state.source; const queue = source.queue; const scheduler = state.scheduler; const destination = state.destination; + while (queue.length > 0 && (queue[0].time - scheduler.now()) <= 0) { queue.shift().notification.observe(destination); } + if (queue.length > 0) { let delay = Math.max(0, queue[0].time - scheduler.now()); ( this).schedule(state, delay); @@ -48,56 +45,50 @@ class DelaySubscriber extends Subscriber { } } - constructor(destination: Subscriber, delay: number, scheduler: Scheduler) { + constructor(destination: Subscriber, + private delay: number, + private scheduler: Scheduler) { super(destination); - this.delay = delay; - this.scheduler = scheduler; } - _next(x) { - if (this.errored) { + private _schedule(scheduler: Scheduler): void { + this.active = true; + this.add(scheduler.schedule(DelaySubscriber.dispatch, this.delay, { + source: this, destination: this.destination, scheduler: scheduler + })); + } + + private scheduleNotification(notification: Notification): void { + if (this.errored === true) { return; } + const scheduler = this.scheduler; - this.queue.push(new DelayMessage(scheduler.now() + this.delay, Notification.createNext(x))); + let message = new DelayMessage(scheduler.now() + this.delay, notification); + this.queue.push(message); + if (this.active === false) { this._schedule(scheduler); } } - _error(e) { - const scheduler = this.scheduler; - this.errored = true; - this.queue = [new DelayMessage(scheduler.now() + this.delay, Notification.createError(e))]; - if (this.active === false) { - this._schedule(scheduler); - } + _next(value: T) { + this.scheduleNotification(Notification.createNext(value)); } - _complete() { - if (this.errored) { - return; - } - const scheduler = this.scheduler; - this.queue.push(new DelayMessage(scheduler.now() + this.delay, Notification.createComplete())); - if (this.active === false) { - this._schedule(scheduler); - } + _error(err) { + this.errored = true; + this.queue = []; + this.destination.error(err); } - _schedule(scheduler) { - this.active = true; - this.add(scheduler.schedule(DelaySubscriber.dispatch, this.delay, { - source: this, destination: this.destination, scheduler: scheduler - })); + _complete() { + this.scheduleNotification(Notification.createComplete()); } } class DelayMessage { - time: number; - notification: any; - constructor(time: number, notification: any) { - this.time = time; - this.notification = notification; + constructor(private time: number, + private notification: any) { } } \ No newline at end of file