Skip to content

Commit b23daf1

Browse files
committed
feat(TestScheduler): add TestScheduler
adds a test scheduler that features a marble parser as well as cold and hot observable creation methods closes #270
1 parent 96f9386 commit b23daf1

File tree

4 files changed

+140
-2
lines changed

4 files changed

+140
-2
lines changed

spec/schedulers/TestScheduler-spec.js

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/* globals describe, it, expect */
2+
var Rx = require('../../dist/cjs/Rx');
3+
var TestScheduler = Rx.TestScheduler;
4+
var Notification = Rx.Notification;
5+
6+
describe('TestScheduler', function() {
7+
it('should exist', function () {
8+
expect(typeof TestScheduler).toBe('function');
9+
});
10+
11+
describe('parseMarbles()', function () {
12+
it('should parse a marble string into a series of notifications and types', function () {
13+
var result = TestScheduler.parseMarbles('-------a---b---|', { a: 'A', b: 'B' });
14+
expect(result).toDeepEqual([
15+
{ frame: 70, notification: Notification.createNext('A') },
16+
{ frame: 110, notification: Notification.createNext('B') },
17+
{ frame: 150, notification: Notification.createComplete() }
18+
]);
19+
});
20+
21+
it('should parse a marble string with a subscription point', function () {
22+
var result = TestScheduler.parseMarbles('---^---a---b---|', { a: 'A', b: 'B' });
23+
expect(result).toDeepEqual([
24+
{ frame: 40, notification: Notification.createNext('A') },
25+
{ frame: 80, notification: Notification.createNext('B') },
26+
{ frame: 120, notification: Notification.createComplete() }
27+
]);
28+
});
29+
30+
it('should parse a marble string with an error', function () {
31+
var result = TestScheduler.parseMarbles('-------a---b---#', { a: 'A', b: 'B' }, 'omg error!');
32+
expect(result).toDeepEqual([
33+
{ frame: 70, notification: Notification.createNext('A') },
34+
{ frame: 110, notification: Notification.createNext('B') },
35+
{ frame: 150, notification: Notification.createError('omg error!') }
36+
]);
37+
});
38+
});
39+
40+
describe('createColdObservable()', function () {
41+
it('should create a cold observable', function () {
42+
var expected = ['A', 'B'];
43+
var scheduler = new TestScheduler();
44+
var source = scheduler.createColdObservable('--a---b--|', { a: 'A', b: 'B' });
45+
expect(source instanceof Rx.Observable).toBe(true);
46+
source.subscribe(function (x) {
47+
expect(x).toBe(expected.shift());
48+
});
49+
scheduler.flush();
50+
expect(expected.length).toBe(0);
51+
});
52+
});
53+
54+
describe('createHotObservable()', function () {
55+
it('should create a cold observable', function () {
56+
var expected = ['A', 'B'];
57+
var scheduler = new TestScheduler();
58+
var source = scheduler.createHotObservable('--a---b--|', { a: 'A', b: 'B' });
59+
expect(source instanceof Rx.Subject).toBe(true);
60+
source.subscribe(function (x) {
61+
expect(x).toBe(expected.shift());
62+
});
63+
scheduler.flush();
64+
expect(expected.length).toBe(0);
65+
});
66+
});
67+
});

src/Rx.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import Subject from './Subject';
22
import ImmediateScheduler from './schedulers/ImmediateScheduler';
33
import NextTickScheduler from './schedulers/NextTickScheduler';
44
import VirtualTimeScheduler from './schedulers/VirtualTimeScheduler';
5+
import TestScheduler from './schedulers/TestScheduler';
56
import immediate from './schedulers/immediate';
67
import nextTick from './schedulers/nextTick';
78
import Observable from './Observable';
@@ -229,5 +230,6 @@ export {
229230
BehaviorSubject,
230231
ConnectableObservable,
231232
Notification,
232-
VirtualTimeScheduler
233+
VirtualTimeScheduler,
234+
TestScheduler
233235
};

src/schedulers/TestScheduler.ts

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import Observable from '../Observable';
2+
import VirtualTimeScheduler from './VirtualTimeScheduler';
3+
import Notification from '../Notification';
4+
import Subject from '../Subject';
5+
6+
export default class TestScheduler extends VirtualTimeScheduler {
7+
createColdObservable(marbles: string, values?: any, error?: any) {
8+
if (marbles.indexOf('^') !== -1) {
9+
throw new Error('cold observable cannot have subscription offset "^"');
10+
}
11+
let messages = TestScheduler.parseMarbles(marbles, values, error);
12+
return Observable.create(subscriber => {
13+
messages.forEach(({ notification, frame }) => {
14+
this.schedule(() => {
15+
notification.observe(subscriber);
16+
}, frame);
17+
});
18+
});
19+
}
20+
21+
createHotObservable(marbles: string, values?: any, error?: any) {
22+
let messages = TestScheduler.parseMarbles(marbles, values, error);
23+
let subject = new Subject();
24+
messages.forEach(({ notification, frame }) => {
25+
this.schedule(() => {
26+
notification.observe(subject);
27+
}, frame);
28+
});
29+
return subject;
30+
}
31+
32+
static parseMarbles(marbles: string, values?: any, errorValue?: any) : ({ notification: Notification<any>, frame: number })[] {
33+
let len = marbles.length;
34+
let results: ({ notification: Notification<any>, frame: number })[] = [];
35+
let subIndex = marbles.indexOf('^');
36+
let frameOffset = subIndex === -1 ? 0 : (subIndex * -10);
37+
38+
for (let i = 0; i < len; i++) {
39+
let frame = i * 10;
40+
let notification;
41+
let c = marbles[i];
42+
switch (c) {
43+
case '-':
44+
break;
45+
case '|':
46+
notification = Notification.createComplete();
47+
break;
48+
case '^':
49+
break;
50+
case '#':
51+
notification = Notification.createError(errorValue || 'error');
52+
break;
53+
default:
54+
notification = Notification.createNext(values[c]);
55+
break;
56+
}
57+
58+
frame += frameOffset;
59+
60+
if (notification) {
61+
results.push({ notification, frame });
62+
}
63+
}
64+
return results;
65+
}
66+
}

src/schedulers/VirtualTimeScheduler.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ export default class VirtualTimeScheduler implements Scheduler {
88
scheduled: boolean = false;
99
index: number = 0;
1010
sorted: boolean = false;
11+
frame: number = -1;
1112

1213
now() {
1314
return 0;
@@ -24,10 +25,12 @@ export default class VirtualTimeScheduler implements Scheduler {
2425

2526
flush() {
2627
this.sortActions();
27-
this.actions.forEach(action => {
28+
this.actions.forEach((action, frame) => {
29+
this.frame = frame;
2830
action.execute();
2931
});
3032
this.actions.length = 0;
33+
this.frame = -1;
3134
}
3235

3336
schedule<T>(work: (x?: any) => Subscription<T> | void, delay: number = 0, state?: any): Subscription<T> {

0 commit comments

Comments
 (0)