Skip to content

Commit

Permalink
feat(takeLast): adds takeLast operator.
Browse files Browse the repository at this point in the history
  • Loading branch information
trxcllnt authored and benlesh committed Jan 25, 2016
1 parent 59a8c18 commit 3583cd3
Show file tree
Hide file tree
Showing 10 changed files with 252 additions and 0 deletions.
18 changes: 18 additions & 0 deletions perf/micro/current-thread-scheduler/operators/takelast.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldTakeLastWithImmediateScheduler = RxOld.Observable.range(0, 500, RxOld.Scheduler.currentThread).takeLast(50);
var newTakeLastWithImmediateScheduler = RxNew.Observable.range(0, 500, RxNew.Scheduler.queue).takeLast(50);

function _next(x) { }
function _error(e) { }
function _complete() { }
return suite
.add('old take with immediate scheduler', function () {
oldTakeLastWithImmediateScheduler.subscribe(_next, _error, _complete);
})
.add('new take with immediate scheduler', function () {
newTakeLastWithImmediateScheduler.subscribe(_next, _error, _complete);
});
};
18 changes: 18 additions & 0 deletions perf/micro/immediate-scheduler/operators/takelast.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldTakeLastWithImmediateScheduler = RxOld.Observable.range(0, 500, RxOld.Scheduler.immediate).takeLast(50);
var newTakeLastWithImmediateScheduler = RxNew.Observable.range(0, 500).takeLast(50);

function _next(x) { }
function _error(e) { }
function _complete() { }
return suite
.add('old take with immediate scheduler', function () {
oldTakeLastWithImmediateScheduler.subscribe(_next, _error, _complete);
})
.add('new take with immediate scheduler', function () {
newTakeLastWithImmediateScheduler.subscribe(_next, _error, _complete);
});
};
125 changes: 125 additions & 0 deletions spec/operators/takeLast-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/* globals describe, it, expect, expectObservable, expectSubscriptions, cold, hot */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.takeLast()', function () {
it.asDiagram('takeLast(2)')('should take two values of an observable with many values', function () {
var e1 = cold('--a-----b----c---d--| ');
var e1subs = '^ ! ';
var expected = '--------------------(cd|)';

expectObservable(e1.takeLast(2)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should work with empty', function () {
var e1 = cold('|');
var e1subs = '(^!)';
var expected = '|';

expectObservable(e1.takeLast(42)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should go on forever on never', function () {
var e1 = cold('-');
var e1subs = '^';
var expected = '-';

expectObservable(e1.takeLast(42)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should be empty on takeLast(0)', function () {
var e1 = hot('--a--^--b----c---d--|');
var e1subs = []; // Don't subscribe at all
var expected = '|';

expectObservable(e1.takeLast(0)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should take one value from an observable with one value', function () {
var e1 = hot('---(a|)');
var e1subs = '^ ! ';
var expected = '---(a|)';

expectObservable(e1.takeLast(1)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should take one value from an observable with many values', function () {
var e1 = hot('--a--^--b----c---d--| ');
var e1subs = '^ ! ';
var expected = '---------------(d|)';

expectObservable(e1.takeLast(1)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should error on empty', function () {
var e1 = hot('--a--^----|');
var e1subs = '^ !';
var expected = '-----|';

expectObservable(e1.takeLast(42)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should propagate error from the source observable', function () {
var e1 = hot('---^---#', null, 'too bad');
var e1subs = '^ !';
var expected = '----#';

expectObservable(e1.takeLast(42)).toBe(expected, null, 'too bad');
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should propagate error from an observable with values', function () {
var e1 = hot('---^--a--b--#');
var e1subs = '^ !';
var expected = '---------#';

expectObservable(e1.takeLast(42)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should allow unsubscribing explicitly and early', function () {
var e1 = hot('---^--a--b-----c--d--e--|');
var unsub = ' ! ';
var e1subs = '^ ! ';
var expected = '---------- ';

expectObservable(e1.takeLast(42), unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should work with throw', function () {
var e1 = cold('#');
var e1subs = '(^!)';
var expected = '#';

expectObservable(e1.takeLast(42)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should throw if total is less than zero', function () {
expect(function () { Observable.range(0,10).takeLast(-1); })
.toThrow(new Rx.ArgumentOutOfRangeError());
});

it('should not break unsubscription chain when unsubscribed explicitly', function () {
var e1 = hot('---^--a--b-----c--d--e--|');
var unsub = ' ! ';
var e1subs = '^ ! ';
var expected = '---------- ';

var result = e1
.mergeMap(function (x) { return Observable.of(x); })
.takeLast(42)
.mergeMap(function (x) { return Observable.of(x); });

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});
1 change: 1 addition & 0 deletions src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ export interface CoreOperators<T> {
switchMap?: <R>(project: ((x: T, ix: number) => Observable<any>), projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
switchMapTo?: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
take?: (count: number) => Observable<T>;
takeLast?: (count: number) => Observable<T>;
takeUntil?: (notifier: Observable<any>) => Observable<T>;
takeWhile?: (predicate: (value: T, index: number) => boolean) => Observable<T>;
throttle?: (durationSelector: (value: T) => Observable<any> | Promise<any>) => Observable<T>;
Expand Down
1 change: 1 addition & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ export class Observable<T> implements CoreOperators<T> {
switchMap: <R>(project: ((x: T, ix: number) => Observable<any>), projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
switchMapTo: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
take: (count: number) => Observable<T>;
takeLast: (count: number) => Observable<T>;
takeUntil: (notifier: Observable<any>) => Observable<T>;
takeWhile: (predicate: (value: T, index: number) => boolean) => Observable<T>;
throttle: (durationSelector: (value: T) => Observable<any> | Promise<any>) => Observable<T>;
Expand Down
1 change: 1 addition & 0 deletions src/Rx.DOM.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ import './add/operator/switch';
import './add/operator/switchMap';
import './add/operator/switchMapTo';
import './add/operator/take';
import './add/operator/takeLast';
import './add/operator/takeUntil';
import './add/operator/takeWhile';
import './add/operator/throttle';
Expand Down
1 change: 1 addition & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ import './add/operator/switch';
import './add/operator/switchMap';
import './add/operator/switchMapTo';
import './add/operator/take';
import './add/operator/takeLast';
import './add/operator/takeUntil';
import './add/operator/takeWhile';
import './add/operator/throttle';
Expand Down
1 change: 1 addition & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ import './add/operator/switch';
import './add/operator/switchMap';
import './add/operator/switchMapTo';
import './add/operator/take';
import './add/operator/takeLast';
import './add/operator/takeUntil';
import './add/operator/takeWhile';
import './add/operator/throttle';
Expand Down
10 changes: 10 additions & 0 deletions src/add/operator/takeLast.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/**
* Everything in this file is generated by the 'tools/generate-operator-patches.ts' script.
* Any manual edits to this file will be lost next time the script is run.
**/
import {Observable} from '../../Observable';
import {takeLast} from '../../operator/takeLast';

Observable.prototype.takeLast = takeLast;

export var _void: void;
76 changes: 76 additions & 0 deletions src/operator/takeLast.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
import {ArgumentOutOfRangeError} from '../util/ArgumentOutOfRangeError';
import {EmptyObservable} from '../observable/empty';
import {Observable} from '../Observable';

export function takeLast<T>(total: number): Observable<T> {
if (total === 0) {
return new EmptyObservable<T>();
} else {
return this.lift(new TakeLastOperator(total));
}
}

class TakeLastOperator<T> implements Operator<T, T> {
constructor(private total: number) {
if (this.total < 0) {
throw new ArgumentOutOfRangeError;
}
}

call(subscriber: Subscriber<T>): Subscriber<T> {
return new TakeLastSubscriber(subscriber, this.total);
}
}

class TakeLastSubscriber<T> extends Subscriber<T> {
private ring: T[];
private count: number = 0;
private index: number = 0;

constructor(destination: Subscriber<T>, private total: number) {
super(destination);
this.ring = new Array(total);
}

protected _next(value: T): void {

let index = this.index;
const ring = this.ring;
const total = this.total;
const count = this.count;

if (total > 1) {
if (count < total) {
this.count = count + 1;
this.index = index + 1;
} else if (index === 0) {
this.index = ++index;
} else if (index < total) {
this.index = index + 1;
} else {
this.index = index = 0;
}
} else if (count < total) {
this.count = total;
}

ring[index] = value;
}

protected _complete(): void {

let iter = -1;
const { ring, count, total, destination } = this;
let index = (total === 1 || count < total) ? 0 : this.index - 1;

while (++iter < count) {
if (iter + index === total) {
index = total - iter;
}
destination.next(ring[iter + index]);
}
destination.complete();
}
}

0 comments on commit 3583cd3

Please sign in to comment.