Skip to content

Commit 3583cd3

Browse files
trxcllntbenlesh
authored andcommitted
feat(takeLast): adds takeLast operator.
1 parent 59a8c18 commit 3583cd3

File tree

10 files changed

+252
-0
lines changed

10 files changed

+252
-0
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
var RxOld = require('rx');
2+
var RxNew = require('../../../../index');
3+
4+
module.exports = function (suite) {
5+
var oldTakeLastWithImmediateScheduler = RxOld.Observable.range(0, 500, RxOld.Scheduler.currentThread).takeLast(50);
6+
var newTakeLastWithImmediateScheduler = RxNew.Observable.range(0, 500, RxNew.Scheduler.queue).takeLast(50);
7+
8+
function _next(x) { }
9+
function _error(e) { }
10+
function _complete() { }
11+
return suite
12+
.add('old take with immediate scheduler', function () {
13+
oldTakeLastWithImmediateScheduler.subscribe(_next, _error, _complete);
14+
})
15+
.add('new take with immediate scheduler', function () {
16+
newTakeLastWithImmediateScheduler.subscribe(_next, _error, _complete);
17+
});
18+
};
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
var RxOld = require('rx');
2+
var RxNew = require('../../../../index');
3+
4+
module.exports = function (suite) {
5+
var oldTakeLastWithImmediateScheduler = RxOld.Observable.range(0, 500, RxOld.Scheduler.immediate).takeLast(50);
6+
var newTakeLastWithImmediateScheduler = RxNew.Observable.range(0, 500).takeLast(50);
7+
8+
function _next(x) { }
9+
function _error(e) { }
10+
function _complete() { }
11+
return suite
12+
.add('old take with immediate scheduler', function () {
13+
oldTakeLastWithImmediateScheduler.subscribe(_next, _error, _complete);
14+
})
15+
.add('new take with immediate scheduler', function () {
16+
newTakeLastWithImmediateScheduler.subscribe(_next, _error, _complete);
17+
});
18+
};

spec/operators/takeLast-spec.js

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/* globals describe, it, expect, expectObservable, expectSubscriptions, cold, hot */
2+
var Rx = require('../../dist/cjs/Rx');
3+
var Observable = Rx.Observable;
4+
5+
describe('Observable.prototype.takeLast()', function () {
6+
it.asDiagram('takeLast(2)')('should take two values of an observable with many values', function () {
7+
var e1 = cold('--a-----b----c---d--| ');
8+
var e1subs = '^ ! ';
9+
var expected = '--------------------(cd|)';
10+
11+
expectObservable(e1.takeLast(2)).toBe(expected);
12+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
13+
});
14+
15+
it('should work with empty', function () {
16+
var e1 = cold('|');
17+
var e1subs = '(^!)';
18+
var expected = '|';
19+
20+
expectObservable(e1.takeLast(42)).toBe(expected);
21+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
22+
});
23+
24+
it('should go on forever on never', function () {
25+
var e1 = cold('-');
26+
var e1subs = '^';
27+
var expected = '-';
28+
29+
expectObservable(e1.takeLast(42)).toBe(expected);
30+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
31+
});
32+
33+
it('should be empty on takeLast(0)', function () {
34+
var e1 = hot('--a--^--b----c---d--|');
35+
var e1subs = []; // Don't subscribe at all
36+
var expected = '|';
37+
38+
expectObservable(e1.takeLast(0)).toBe(expected);
39+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
40+
});
41+
42+
it('should take one value from an observable with one value', function () {
43+
var e1 = hot('---(a|)');
44+
var e1subs = '^ ! ';
45+
var expected = '---(a|)';
46+
47+
expectObservable(e1.takeLast(1)).toBe(expected);
48+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
49+
});
50+
51+
it('should take one value from an observable with many values', function () {
52+
var e1 = hot('--a--^--b----c---d--| ');
53+
var e1subs = '^ ! ';
54+
var expected = '---------------(d|)';
55+
56+
expectObservable(e1.takeLast(1)).toBe(expected);
57+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
58+
});
59+
60+
it('should error on empty', function () {
61+
var e1 = hot('--a--^----|');
62+
var e1subs = '^ !';
63+
var expected = '-----|';
64+
65+
expectObservable(e1.takeLast(42)).toBe(expected);
66+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
67+
});
68+
69+
it('should propagate error from the source observable', function () {
70+
var e1 = hot('---^---#', null, 'too bad');
71+
var e1subs = '^ !';
72+
var expected = '----#';
73+
74+
expectObservable(e1.takeLast(42)).toBe(expected, null, 'too bad');
75+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
76+
});
77+
78+
it('should propagate error from an observable with values', function () {
79+
var e1 = hot('---^--a--b--#');
80+
var e1subs = '^ !';
81+
var expected = '---------#';
82+
83+
expectObservable(e1.takeLast(42)).toBe(expected);
84+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
85+
});
86+
87+
it('should allow unsubscribing explicitly and early', function () {
88+
var e1 = hot('---^--a--b-----c--d--e--|');
89+
var unsub = ' ! ';
90+
var e1subs = '^ ! ';
91+
var expected = '---------- ';
92+
93+
expectObservable(e1.takeLast(42), unsub).toBe(expected);
94+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
95+
});
96+
97+
it('should work with throw', function () {
98+
var e1 = cold('#');
99+
var e1subs = '(^!)';
100+
var expected = '#';
101+
102+
expectObservable(e1.takeLast(42)).toBe(expected);
103+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
104+
});
105+
106+
it('should throw if total is less than zero', function () {
107+
expect(function () { Observable.range(0,10).takeLast(-1); })
108+
.toThrow(new Rx.ArgumentOutOfRangeError());
109+
});
110+
111+
it('should not break unsubscription chain when unsubscribed explicitly', function () {
112+
var e1 = hot('---^--a--b-----c--d--e--|');
113+
var unsub = ' ! ';
114+
var e1subs = '^ ! ';
115+
var expected = '---------- ';
116+
117+
var result = e1
118+
.mergeMap(function (x) { return Observable.of(x); })
119+
.takeLast(42)
120+
.mergeMap(function (x) { return Observable.of(x); });
121+
122+
expectObservable(result, unsub).toBe(expected);
123+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
124+
});
125+
});

src/CoreOperators.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ export interface CoreOperators<T> {
7676
switchMap?: <R>(project: ((x: T, ix: number) => Observable<any>), projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
7777
switchMapTo?: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
7878
take?: (count: number) => Observable<T>;
79+
takeLast?: (count: number) => Observable<T>;
7980
takeUntil?: (notifier: Observable<any>) => Observable<T>;
8081
takeWhile?: (predicate: (value: T, index: number) => boolean) => Observable<T>;
8182
throttle?: (durationSelector: (value: T) => Observable<any> | Promise<any>) => Observable<T>;

src/Observable.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,7 @@ export class Observable<T> implements CoreOperators<T> {
261261
switchMap: <R>(project: ((x: T, ix: number) => Observable<any>), projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
262262
switchMapTo: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
263263
take: (count: number) => Observable<T>;
264+
takeLast: (count: number) => Observable<T>;
264265
takeUntil: (notifier: Observable<any>) => Observable<T>;
265266
takeWhile: (predicate: (value: T, index: number) => boolean) => Observable<T>;
266267
throttle: (durationSelector: (value: T) => Observable<any> | Promise<any>) => Observable<T>;

src/Rx.DOM.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ import './add/operator/switch';
9191
import './add/operator/switchMap';
9292
import './add/operator/switchMapTo';
9393
import './add/operator/take';
94+
import './add/operator/takeLast';
9495
import './add/operator/takeUntil';
9596
import './add/operator/takeWhile';
9697
import './add/operator/throttle';

src/Rx.KitchenSink.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ import './add/operator/switch';
118118
import './add/operator/switchMap';
119119
import './add/operator/switchMapTo';
120120
import './add/operator/take';
121+
import './add/operator/takeLast';
121122
import './add/operator/takeUntil';
122123
import './add/operator/takeWhile';
123124
import './add/operator/throttle';

src/Rx.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ import './add/operator/switch';
9393
import './add/operator/switchMap';
9494
import './add/operator/switchMapTo';
9595
import './add/operator/take';
96+
import './add/operator/takeLast';
9697
import './add/operator/takeUntil';
9798
import './add/operator/takeWhile';
9899
import './add/operator/throttle';

src/add/operator/takeLast.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
/**
2+
* Everything in this file is generated by the 'tools/generate-operator-patches.ts' script.
3+
* Any manual edits to this file will be lost next time the script is run.
4+
**/
5+
import {Observable} from '../../Observable';
6+
import {takeLast} from '../../operator/takeLast';
7+
8+
Observable.prototype.takeLast = takeLast;
9+
10+
export var _void: void;

src/operator/takeLast.ts

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import {Operator} from '../Operator';
2+
import {Subscriber} from '../Subscriber';
3+
import {ArgumentOutOfRangeError} from '../util/ArgumentOutOfRangeError';
4+
import {EmptyObservable} from '../observable/empty';
5+
import {Observable} from '../Observable';
6+
7+
export function takeLast<T>(total: number): Observable<T> {
8+
if (total === 0) {
9+
return new EmptyObservable<T>();
10+
} else {
11+
return this.lift(new TakeLastOperator(total));
12+
}
13+
}
14+
15+
class TakeLastOperator<T> implements Operator<T, T> {
16+
constructor(private total: number) {
17+
if (this.total < 0) {
18+
throw new ArgumentOutOfRangeError;
19+
}
20+
}
21+
22+
call(subscriber: Subscriber<T>): Subscriber<T> {
23+
return new TakeLastSubscriber(subscriber, this.total);
24+
}
25+
}
26+
27+
class TakeLastSubscriber<T> extends Subscriber<T> {
28+
private ring: T[];
29+
private count: number = 0;
30+
private index: number = 0;
31+
32+
constructor(destination: Subscriber<T>, private total: number) {
33+
super(destination);
34+
this.ring = new Array(total);
35+
}
36+
37+
protected _next(value: T): void {
38+
39+
let index = this.index;
40+
const ring = this.ring;
41+
const total = this.total;
42+
const count = this.count;
43+
44+
if (total > 1) {
45+
if (count < total) {
46+
this.count = count + 1;
47+
this.index = index + 1;
48+
} else if (index === 0) {
49+
this.index = ++index;
50+
} else if (index < total) {
51+
this.index = index + 1;
52+
} else {
53+
this.index = index = 0;
54+
}
55+
} else if (count < total) {
56+
this.count = total;
57+
}
58+
59+
ring[index] = value;
60+
}
61+
62+
protected _complete(): void {
63+
64+
let iter = -1;
65+
const { ring, count, total, destination } = this;
66+
let index = (total === 1 || count < total) ? 0 : this.index - 1;
67+
68+
while (++iter < count) {
69+
if (iter + index === total) {
70+
index = total - iter;
71+
}
72+
destination.next(ring[iter + index]);
73+
}
74+
destination.complete();
75+
}
76+
}

0 commit comments

Comments
 (0)