Skip to content

Commit

Permalink
fix(bufferTime): inner intervals will now clean up properly
Browse files Browse the repository at this point in the history
- adds marble tests around bufferTime
- adds `maxFrames` property to `VirtualTimeScheduler` that will limit the execution of tests
  • Loading branch information
benlesh committed Sep 18, 2015
1 parent 97ce36c commit 4ef41b0
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 32 deletions.
103 changes: 75 additions & 28 deletions spec/operators/bufferTime-spec.js
Original file line number Diff line number Diff line change
@@ -1,34 +1,81 @@
/* globals describe, it, expect */
/* globals describe, it, expect, hot, cold, rxTestScheduler, expectObservable */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.bufferTime', function () {
it('should emit buffers at intervals', function (done) {
var expected = [
[0, 1, 2],
[3, 4, 5],
[6, 7, 8]
];
Observable.interval(100)
.bufferTime(320)
.take(3)
.subscribe(function (w) {
expect(w).toEqual(expected.shift())
}, null, done);
}, 2000);
describe('Observable.prototype.bufferTime', function () {
it('should emit buffers at intervals', function (){
var values = {
w: ['a','b'],
x: ['c','d','e'],
y: ['f', 'g'],
z: []
};
var e1 = hot('---a---b---c---d---e---f---g---|');
var expected = '----------w---------x---------y(z|)';

expectObservable(e1.bufferTime(100, null, rxTestScheduler)).toBe(expected, values);
});

it('should emit buffers at intervals test 2', function() {
var e1 = hot('---------a---------b---------c---------d---------e---------g--------|')
var expected = '--------------------------------x-------------------------------y---(z|)';

expectObservable(e1.bufferTime(320, null, rxTestScheduler)).toBe(expected, { x: ['a','b','c'], y: ['d', 'e', 'g'], z: []});
});

it('should emit buffers that have been created at intervals and close after the specified delay', function (done) {
var expected = [
[0, 1, 2, 3, 4],
[2, 3, 4, 5, 6],
[4, 5, 6, 7, 8]
];
Observable.interval(100)
.bufferTime(520, 220)
.take(3)
.subscribe(function (w) {
expect(w).toEqual(expected.shift())
}, null, done);
}, 2000);
it('should emit buffers that have been created at intervals and close after the specified delay', function (){
var e1 = hot('---a---b---c----d----e----f----g----h----i----(k|)');
// --------------------*--------------------*---- start interval
// ---------------------| timespans
// ---------------------|
// -----|
var expected = '---------------------x-------------------y----(z|)';
var values = {
x: ['a', 'b', 'c', 'd', 'e'],
y: ['e', 'f', 'g', 'h', 'i'],
z: ['i', 'k']
};
expectObservable(e1.bufferTime(210, 200, rxTestScheduler)).toBe(expected, values);
});

it('should handle empty', function (){
var e1 = Observable.empty();
expectObservable(e1.bufferTime(100, null, rxTestScheduler)).toBe('(a|)', { a: [] });
});

it('should handle never', function () {
var e1 = Observable.never();
var expected = '----------a---------a---------a---------a---------a---------a---------a-----'; // 750 frame limit
expectObservable(e1.bufferTime(100, null, rxTestScheduler)).toBe(expected, { a: [] });
});

it('should handle throw', function (){
var e1 = Observable.throw(new Error('haha'));
var expected = '#';
expectObservable(e1.bufferTime(100, null, rxTestScheduler)).toBe(expected, undefined, new Error('haha'));
});

it('should handle errors', function () {
var values = {
w: ['a','b']
};
var e1 = hot('---a---b---c---#---e---f---g---|');
var expected = '----------w----#';

expectObservable(e1.bufferTime(100, null, rxTestScheduler)).toBe(expected, values);
});

it('should emit buffers that have been created at intervals and close after the specified delay with errors', function (){
var e1 = hot('---a---b---c----d----e----f----g----h----i--#');
// --------------------*--------------------*---- start interval
// ---------------------| timespans
// ---------------------|
// -----|
var expected = '---------------------x-------------------y--#';
var values = {
x: ['a', 'b', 'c', 'd', 'e'],
y: ['e', 'f', 'g', 'h', 'i']
};
expectObservable(e1.bufferTime(210, 200, rxTestScheduler)).toBe(expected, values);
});
});
10 changes: 7 additions & 3 deletions src/operators/bufferTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,19 @@ function dispatchBufferTimeSpanOnly(state) {
}

state.buffer = subscriber.openBuffer();
(<any>this).schedule(state, state.bufferTimeSpan);
if(!subscriber.isUnsubscribed) {
(<any>this).schedule(state, state.bufferTimeSpan);
}
}

function dispatchBufferCreation(state) {
let { bufferCreationInterval, bufferTimeSpan, subscriber, scheduler } = state;
let buffer = subscriber.openBuffer();
var action = <Action>this;
action.add(scheduler.schedule(dispatchBufferClose, bufferTimeSpan, { subscriber, buffer }));
action.schedule(state, bufferCreationInterval);
if(!subscriber.isUnsubscribed) {
action.add(scheduler.schedule(dispatchBufferClose, bufferTimeSpan, { subscriber, buffer }));
action.schedule(state, bufferCreationInterval);
}
}

function dispatchBufferClose({ subscriber, buffer }) {
Expand Down
9 changes: 8 additions & 1 deletion src/schedulers/VirtualTimeScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,25 @@ export default class VirtualTimeScheduler implements Scheduler {
index: number = 0;
sorted: boolean = false;
frame: number = 0;
maxFrames: number = 750;

now() {
return 0;
}

flush() {
const actions = this.actions;
const maxFrames = this.maxFrames;
while (actions.length > 0) {
let action = actions.shift();
this.frame = action.delay;
action.execute();
if(this.frame <= maxFrames) {
action.execute();
} else {
break;
}
}
actions.length = 0;
this.frame = 0;
}

Expand Down

0 comments on commit 4ef41b0

Please sign in to comment.