Skip to content

Commit 6b77e69

Browse files
committed
feat(sampleTime): reimplement sampleTime with RxJS 4 behavior
BREAKING CHANGE: `sampleTime` now has the same behavior `sample(number, scheduler)` did in RxJS 4
1 parent e93bffc commit 6b77e69

File tree

6 files changed

+163
-0
lines changed

6 files changed

+163
-0
lines changed

spec/operators/sampleTime-spec.js

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/* globals describe, it, expect, expectObservable, expectSubscriptions, cold, hot, rxTestScheduler */
2+
var Rx = require('../../dist/cjs/Rx.KitchenSink');
3+
var Observable = Rx.Observable;
4+
5+
describe('Observable.prototype.sampleTime', function () {
6+
it('should get samples on a delay', function () {
7+
var e1 = hot('----a-^--b----c----d----e----f----|');
8+
var e1subs = '^ !';
9+
var expected = '-----------c----------e-----|';
10+
// timer -----------!----------!---------
11+
12+
expectObservable(e1.sampleTime(110, rxTestScheduler)).toBe(expected);
13+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
14+
});
15+
16+
it('should sample nothing if new value has not arrived', function () {
17+
var e1 = hot('----a-^--b----c--------------f----|');
18+
var e1subs = '^ !';
19+
var expected = '-----------c----------------|';
20+
// timer -----------!----------!---------
21+
22+
expectObservable(e1.sampleTime(110, rxTestScheduler)).toBe(expected);
23+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
24+
});
25+
26+
it('should sample ifnew value has arrived, even if it is the same value', function () {
27+
var e1 = hot('----a-^--b----c----------c---f----|');
28+
var e1subs = '^ !';
29+
var expected = '-----------c----------c-----|';
30+
// timer -----------!----------!---------
31+
32+
expectObservable(e1.sampleTime(110, rxTestScheduler)).toBe(expected);
33+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
34+
});
35+
36+
it('should sample nothing if source has not nexted by time of sample', function () {
37+
var e1 = hot('----a-^-------------b-------------|');
38+
var e1subs = '^ !';
39+
var expected = '----------------------b-----|';
40+
// timer -----------!----------!---------
41+
42+
expectObservable(e1.sampleTime(110, rxTestScheduler)).toBe(expected);
43+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
44+
});
45+
46+
it('should raise error if source raises error', function () {
47+
var e1 = hot('----a-^--b----c----d----#');
48+
var e1subs = '^ !';
49+
var expected = '-----------c------#';
50+
// timer -----------!----------!---------
51+
52+
expectObservable(e1.sampleTime(110, rxTestScheduler)).toBe(expected);
53+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
54+
});
55+
56+
it('should allow unsubscribing explicitly and early', function () {
57+
var e1 = hot('----a-^--b----c----d----e----f----|');
58+
var unsub = ' ! ';
59+
var e1subs = '^ ! ';
60+
var expected = '-----------c----- ';
61+
// timer -----------!----------!---------
62+
63+
expectObservable(e1.sampleTime(110, rxTestScheduler), unsub).toBe(expected);
64+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
65+
});
66+
67+
it('should not break unsubscription chains when result is unsubscribed explicitly', function () {
68+
var e1 = hot('----a-^--b----c----d----e----f----|');
69+
var e1subs = '^ ! ';
70+
// timer -----------!----------!---------
71+
var expected = '-----------c----- ';
72+
var unsub = ' ! ';
73+
74+
var result = e1
75+
.mergeMap(function (x) { return Observable.of(x); })
76+
.sampleTime(110, rxTestScheduler)
77+
.mergeMap(function (x) { return Observable.of(x); });
78+
79+
expectObservable(result, unsub).toBe(expected);
80+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
81+
});
82+
83+
it('should completes if source does not emits', function () {
84+
var e1 = cold('|');
85+
var e1subs = '(^!)';
86+
var expected = '|';
87+
88+
expectObservable(e1.sampleTime(60, rxTestScheduler)).toBe(expected);
89+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
90+
});
91+
92+
it('should raise error if source throws immediately', function () {
93+
var e1 = cold('#');
94+
var e1subs = '(^!)';
95+
var expected = '#';
96+
97+
expectObservable(e1.sampleTime(60, rxTestScheduler)).toBe(expected);
98+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
99+
});
100+
101+
it('should not completes if source does not complete', function () {
102+
var e1 = cold('-');
103+
var e1subs = '^';
104+
var expected = '-';
105+
106+
expectObservable(e1.sampleTime(60, rxTestScheduler)).toBe(expected);
107+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
108+
});
109+
});

src/Observable.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ export class Observable<T> implements CoreOperators<T> {
247247
retry: (count?: number) => Observable<T>;
248248
retryWhen: (notifier: (errors: Observable<any>) => Observable<any>) => Observable<T>;
249249
sample: (notifier: Observable<any>) => Observable<T>;
250+
sampleTime: (delay: number, scheduler?: Scheduler) => Observable<T>;
250251
scan: <R>(accumulator: (acc: R, x: T) => R, seed?: T | R) => Observable<R>;
251252
share: () => Observable<T>;
252253
single: (predicate?: (value: T, index: number) => boolean) => Observable<T>;

src/Rx.KitchenSink.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ import './add/operator/repeat';
103103
import './add/operator/retry';
104104
import './add/operator/retryWhen';
105105
import './add/operator/sample';
106+
import './add/operator/sampleTime';
106107
import './add/operator/scan';
107108
import './add/operator/share';
108109
import './add/operator/single';

src/Rx.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ import './add/operator/repeat';
7575
import './add/operator/retry';
7676
import './add/operator/retryWhen';
7777
import './add/operator/sample';
78+
import './add/operator/sampleTime';
7879
import './add/operator/scan';
7980
import './add/operator/share';
8081
import './add/operator/single';

src/add/operator/sampleTime.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
import {Observable} from '../../Observable';
2+
import {sampleTime} from '../../operator/sampleTime';
3+
Observable.prototype.sampleTime = sampleTime;
4+
5+
export var _void: void;

src/operator/sampleTime.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import {Observable} from '../Observable';
2+
import {Operator} from '../Operator';
3+
import {Subscriber} from '../Subscriber';
4+
import {Scheduler} from '../Scheduler';
5+
import {asap} from '../scheduler/asap';
6+
7+
export function sampleTime<T>(delay: number, scheduler: Scheduler = asap): Observable<T> {
8+
return this.lift(new SampleTimeOperator(delay, scheduler));
9+
}
10+
11+
class SampleTimeOperator<T, R> implements Operator<T, R> {
12+
constructor(private delay: number, private scheduler: Scheduler) {
13+
}
14+
15+
call(subscriber: Subscriber<R>) {
16+
return new SampleTimeSubscriber(subscriber, this.delay, this.scheduler);
17+
}
18+
}
19+
20+
class SampleTimeSubscriber<T> extends Subscriber<T> {
21+
lastValue: T;
22+
hasValue: boolean = false;
23+
24+
constructor(destination: Subscriber<T>, private delay: number, private scheduler: Scheduler) {
25+
super(destination);
26+
this.add(scheduler.schedule(dispatchNotification, delay, { subscriber: this, delay }));
27+
}
28+
29+
_next(value: T) {
30+
this.lastValue = value;
31+
this.hasValue = true;
32+
}
33+
34+
notifyNext() {
35+
if (this.hasValue) {
36+
this.hasValue = false;
37+
this.destination.next(this.lastValue);
38+
}
39+
}
40+
}
41+
42+
function dispatchNotification<T>(state) {
43+
let { subscriber, delay } = state;
44+
subscriber.notifyNext();
45+
(<any>this).schedule(state, delay);
46+
}

0 commit comments

Comments
 (0)