Skip to content

Commit

Permalink
fix(delay): accepts absolute time delay
Browse files Browse the repository at this point in the history
- delay operator accepts absolute time as well as relative time
- add additional test coverages

relates to #549
  • Loading branch information
kwonoj authored and benlesh committed Oct 26, 2015
1 parent f67a596 commit b109100
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 53 deletions.
93 changes: 89 additions & 4 deletions spec/operators/delay-spec.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,96 @@
/* globals describe, it, expect, expectObservable, hot, rxTestScheduler */
/* globals describe, it, expect, expectObservable, expectSubscriptions, hot, rxTestScheduler */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.delay()', function () {
it('should delay by specified timeframe', function () {
var source = hot('--a--|');
var expected = '-----a--|';
var e1 = hot('--a--b--|');
var expected = '-----a--b--|';
var subs = '^ !';

expectObservable(source.delay(30, rxTestScheduler)).toBe(expected);
expectObservable(e1.delay(30, rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should delay by absolute time period', function () {
var e1 = hot('--a--b--|');
var expected = '-----a--b--|';
var subs = '^ !';
var absoluteDelay = new Date(rxTestScheduler.now() + 30);

expectObservable(e1.delay(absoluteDelay, rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should delay by absolute time period after subscription', function () {
var e1 = hot('---^--a--b--|');
var expected = '------a--b--|';
var subs = '^ !';
var absoluteDelay = new Date(rxTestScheduler.now() + 30);

expectObservable(e1.delay(absoluteDelay, rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should raise error when source raises error', function () {
var e1 = hot('---a---b---#');
var expected = '------a---b#';
var subs = '^ !';

expectObservable(e1.delay(30, rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should raise error when source raises error', function () {
var e1 = hot('--a--b--#');
var expected = '-----a--#';
var subs = '^ !';
var absoluteDelay = new Date(rxTestScheduler.now() + 30);

expectObservable(e1.delay(absoluteDelay, rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should raise error when source raises error after subscription', function () {
var e1 = hot('---^---a---b---#');
var expected = '-------a---b#';
var e1Sub = '^ !';
var absoluteDelay = new Date(rxTestScheduler.now() + 30);

expectObservable(e1.delay(absoluteDelay, rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1Sub);
});

it('should delay when source does not emits', function () {
var e1 = hot('----|');
var expected = '-------|';
var subs = '^ !';

expectObservable(e1.delay(30, rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should delay when source is empty', function () {
var e1 = Observable.empty();
var expected = '---|';

expectObservable(e1.delay(30, rxTestScheduler)).toBe(expected);
});

it('should not complete when source does not completes', function () {
var e1 = hot('---a---b-');
var expected = '------a---b-';
var unsub = '----------------!';
var subs = '^ !';

expectObservable(e1.delay(30, rxTestScheduler), unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should not complete when source never completes', function () {
var e1 = Observable.never();
var expected = '-';

expectObservable(e1.delay(30, rxTestScheduler)).toBe(expected);
});
});
89 changes: 40 additions & 49 deletions src/operators/delay.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Scheduler from '../Scheduler';
import Subscriber from '../Subscriber';
import Notification from '../Notification';
import immediate from '../schedulers/immediate';
import isDate from '../util/isDate';

export default function delay<T>(delay: number, scheduler: Scheduler = immediate) {
return this.lift(new DelayOperator(delay, scheduler));
export default function delay<T>(delay: number|Date,
scheduler: Scheduler = immediate) {
let absoluteDelay = isDate(delay);
let delayFor = absoluteDelay ? (+delay - scheduler.now()) : <number>delay;
return this.lift(new DelayOperator(delayFor, scheduler));
}

class DelayOperator<T, R> implements Operator<T, R> {

delay: number;
scheduler: Scheduler;

constructor(delay: number, scheduler: Scheduler) {
this.delay = delay;
this.scheduler = scheduler;
constructor(private delay: number,
private scheduler: Scheduler) {
}

call(subscriber: Subscriber<T>): Subscriber<T> {
Expand All @@ -25,21 +23,20 @@ class DelayOperator<T, R> implements Operator<T, R> {
}

class DelaySubscriber<T> extends Subscriber<T> {
private queue: Array<any> = [];
private active: boolean = false;
private errored: boolean = false;

protected delay: number;
protected queue: Array<any> = [];
protected scheduler: Scheduler;
protected active: boolean = false;
protected errored: boolean = false;

static dispatch(state) {
private static dispatch(state): void {
const source = state.source;
const queue = source.queue;
const scheduler = state.scheduler;
const destination = state.destination;

while (queue.length > 0 && (queue[0].time - scheduler.now()) <= 0) {
queue.shift().notification.observe(destination);
}

if (queue.length > 0) {
let delay = Math.max(0, queue[0].time - scheduler.now());
(<any> this).schedule(state, delay);
Expand All @@ -48,56 +45,50 @@ class DelaySubscriber<T> extends Subscriber<T> {
}
}

constructor(destination: Subscriber<T>, delay: number, scheduler: Scheduler) {
constructor(destination: Subscriber<T>,
private delay: number,
private scheduler: Scheduler) {
super(destination);
this.delay = delay;
this.scheduler = scheduler;
}

_next(x) {
if (this.errored) {
private _schedule(scheduler: Scheduler): void {
this.active = true;
this.add(scheduler.schedule(DelaySubscriber.dispatch, this.delay, {
source: this, destination: this.destination, scheduler: scheduler
}));
}

private scheduleNotification(notification: Notification<any>): void {
if (this.errored === true) {
return;
}

const scheduler = this.scheduler;
this.queue.push(new DelayMessage<T>(scheduler.now() + this.delay, Notification.createNext(x)));
let message = new DelayMessage<T>(scheduler.now() + this.delay, notification);
this.queue.push(message);

if (this.active === false) {
this._schedule(scheduler);
}
}

_error(e) {
const scheduler = this.scheduler;
this.errored = true;
this.queue = [new DelayMessage<T>(scheduler.now() + this.delay, Notification.createError(e))];
if (this.active === false) {
this._schedule(scheduler);
}
_next(value: T) {
this.scheduleNotification(Notification.createNext(value));
}

_complete() {
if (this.errored) {
return;
}
const scheduler = this.scheduler;
this.queue.push(new DelayMessage<T>(scheduler.now() + this.delay, Notification.createComplete()));
if (this.active === false) {
this._schedule(scheduler);
}
_error(err) {
this.errored = true;
this.queue = [];
this.destination.error(err);
}

_schedule(scheduler) {
this.active = true;
this.add(scheduler.schedule(DelaySubscriber.dispatch, this.delay, {
source: this, destination: this.destination, scheduler: scheduler
}));
_complete() {
this.scheduleNotification(Notification.createComplete());
}
}

class DelayMessage<T> {
time: number;
notification: any;
constructor(time: number, notification: any) {
this.time = time;
this.notification = notification;
constructor(private time: number,
private notification: any) {
}
}

0 comments on commit b109100

Please sign in to comment.