-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(takeLast): adds takeLast operator.
- Loading branch information
Showing
10 changed files
with
252 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
var RxOld = require('rx'); | ||
var RxNew = require('../../../../index'); | ||
|
||
module.exports = function (suite) { | ||
var oldTakeLastWithImmediateScheduler = RxOld.Observable.range(0, 500, RxOld.Scheduler.currentThread).takeLast(50); | ||
var newTakeLastWithImmediateScheduler = RxNew.Observable.range(0, 500, RxNew.Scheduler.queue).takeLast(50); | ||
|
||
function _next(x) { } | ||
function _error(e) { } | ||
function _complete() { } | ||
return suite | ||
.add('old take with immediate scheduler', function () { | ||
oldTakeLastWithImmediateScheduler.subscribe(_next, _error, _complete); | ||
}) | ||
.add('new take with immediate scheduler', function () { | ||
newTakeLastWithImmediateScheduler.subscribe(_next, _error, _complete); | ||
}); | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
var RxOld = require('rx'); | ||
var RxNew = require('../../../../index'); | ||
|
||
module.exports = function (suite) { | ||
var oldTakeLastWithImmediateScheduler = RxOld.Observable.range(0, 500, RxOld.Scheduler.immediate).takeLast(50); | ||
var newTakeLastWithImmediateScheduler = RxNew.Observable.range(0, 500).takeLast(50); | ||
|
||
function _next(x) { } | ||
function _error(e) { } | ||
function _complete() { } | ||
return suite | ||
.add('old take with immediate scheduler', function () { | ||
oldTakeLastWithImmediateScheduler.subscribe(_next, _error, _complete); | ||
}) | ||
.add('new take with immediate scheduler', function () { | ||
newTakeLastWithImmediateScheduler.subscribe(_next, _error, _complete); | ||
}); | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
/* globals describe, it, expect, expectObservable, expectSubscriptions, cold, hot */ | ||
var Rx = require('../../dist/cjs/Rx'); | ||
var Observable = Rx.Observable; | ||
|
||
describe('Observable.prototype.takeLast()', function () { | ||
it.asDiagram('takeLast(2)')('should take two values of an observable with many values', function () { | ||
var e1 = cold('--a-----b----c---d--| '); | ||
var e1subs = '^ ! '; | ||
var expected = '--------------------(cd|)'; | ||
|
||
expectObservable(e1.takeLast(2)).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should work with empty', function () { | ||
var e1 = cold('|'); | ||
var e1subs = '(^!)'; | ||
var expected = '|'; | ||
|
||
expectObservable(e1.takeLast(42)).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should go on forever on never', function () { | ||
var e1 = cold('-'); | ||
var e1subs = '^'; | ||
var expected = '-'; | ||
|
||
expectObservable(e1.takeLast(42)).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should be empty on takeLast(0)', function () { | ||
var e1 = hot('--a--^--b----c---d--|'); | ||
var e1subs = []; // Don't subscribe at all | ||
var expected = '|'; | ||
|
||
expectObservable(e1.takeLast(0)).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should take one value from an observable with one value', function () { | ||
var e1 = hot('---(a|)'); | ||
var e1subs = '^ ! '; | ||
var expected = '---(a|)'; | ||
|
||
expectObservable(e1.takeLast(1)).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should take one value from an observable with many values', function () { | ||
var e1 = hot('--a--^--b----c---d--| '); | ||
var e1subs = '^ ! '; | ||
var expected = '---------------(d|)'; | ||
|
||
expectObservable(e1.takeLast(1)).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should error on empty', function () { | ||
var e1 = hot('--a--^----|'); | ||
var e1subs = '^ !'; | ||
var expected = '-----|'; | ||
|
||
expectObservable(e1.takeLast(42)).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should propagate error from the source observable', function () { | ||
var e1 = hot('---^---#', null, 'too bad'); | ||
var e1subs = '^ !'; | ||
var expected = '----#'; | ||
|
||
expectObservable(e1.takeLast(42)).toBe(expected, null, 'too bad'); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should propagate error from an observable with values', function () { | ||
var e1 = hot('---^--a--b--#'); | ||
var e1subs = '^ !'; | ||
var expected = '---------#'; | ||
|
||
expectObservable(e1.takeLast(42)).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should allow unsubscribing explicitly and early', function () { | ||
var e1 = hot('---^--a--b-----c--d--e--|'); | ||
var unsub = ' ! '; | ||
var e1subs = '^ ! '; | ||
var expected = '---------- '; | ||
|
||
expectObservable(e1.takeLast(42), unsub).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should work with throw', function () { | ||
var e1 = cold('#'); | ||
var e1subs = '(^!)'; | ||
var expected = '#'; | ||
|
||
expectObservable(e1.takeLast(42)).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should throw if total is less than zero', function () { | ||
expect(function () { Observable.range(0,10).takeLast(-1); }) | ||
.toThrow(new Rx.ArgumentOutOfRangeError()); | ||
}); | ||
|
||
it('should not break unsubscription chain when unsubscribed explicitly', function () { | ||
var e1 = hot('---^--a--b-----c--d--e--|'); | ||
var unsub = ' ! '; | ||
var e1subs = '^ ! '; | ||
var expected = '---------- '; | ||
|
||
var result = e1 | ||
.mergeMap(function (x) { return Observable.of(x); }) | ||
.takeLast(42) | ||
.mergeMap(function (x) { return Observable.of(x); }); | ||
|
||
expectObservable(result, unsub).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
/** | ||
* Everything in this file is generated by the 'tools/generate-operator-patches.ts' script. | ||
* Any manual edits to this file will be lost next time the script is run. | ||
**/ | ||
import {Observable} from '../../Observable'; | ||
import {takeLast} from '../../operator/takeLast'; | ||
|
||
Observable.prototype.takeLast = takeLast; | ||
|
||
export var _void: void; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
import {Operator} from '../Operator'; | ||
import {Subscriber} from '../Subscriber'; | ||
import {ArgumentOutOfRangeError} from '../util/ArgumentOutOfRangeError'; | ||
import {EmptyObservable} from '../observable/empty'; | ||
import {Observable} from '../Observable'; | ||
|
||
export function takeLast<T>(total: number): Observable<T> { | ||
if (total === 0) { | ||
return new EmptyObservable<T>(); | ||
} else { | ||
return this.lift(new TakeLastOperator(total)); | ||
} | ||
} | ||
|
||
class TakeLastOperator<T> implements Operator<T, T> { | ||
constructor(private total: number) { | ||
if (this.total < 0) { | ||
throw new ArgumentOutOfRangeError; | ||
} | ||
} | ||
|
||
call(subscriber: Subscriber<T>): Subscriber<T> { | ||
return new TakeLastSubscriber(subscriber, this.total); | ||
} | ||
} | ||
|
||
class TakeLastSubscriber<T> extends Subscriber<T> { | ||
private ring: T[]; | ||
private count: number = 0; | ||
private index: number = 0; | ||
|
||
constructor(destination: Subscriber<T>, private total: number) { | ||
super(destination); | ||
this.ring = new Array(total); | ||
} | ||
|
||
protected _next(value: T): void { | ||
|
||
let index = this.index; | ||
const ring = this.ring; | ||
const total = this.total; | ||
const count = this.count; | ||
|
||
if (total > 1) { | ||
if (count < total) { | ||
this.count = count + 1; | ||
this.index = index + 1; | ||
} else if (index === 0) { | ||
this.index = ++index; | ||
} else if (index < total) { | ||
this.index = index + 1; | ||
} else { | ||
this.index = index = 0; | ||
} | ||
} else if (count < total) { | ||
this.count = total; | ||
} | ||
|
||
ring[index] = value; | ||
} | ||
|
||
protected _complete(): void { | ||
|
||
let iter = -1; | ||
const { ring, count, total, destination } = this; | ||
let index = (total === 1 || count < total) ? 0 : this.index - 1; | ||
|
||
while (++iter < count) { | ||
if (iter + index === total) { | ||
index = total - iter; | ||
} | ||
destination.next(ring[iter + index]); | ||
} | ||
destination.complete(); | ||
} | ||
} |