-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(takeLast): add higher-order lettable version of takeLast
- Loading branch information
Showing
3 changed files
with
113 additions
and
66 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
import { Operator } from '../Operator'; | ||
import { Subscriber } from '../Subscriber'; | ||
import { ArgumentOutOfRangeError } from '../util/ArgumentOutOfRangeError'; | ||
import { EmptyObservable } from '../observable/EmptyObservable'; | ||
import { Observable } from '../Observable'; | ||
import { TeardownLogic } from '../Subscription'; | ||
import { OperatorFunction } from '../interfaces'; | ||
|
||
/** | ||
* Emits only the last `count` values emitted by the source Observable. | ||
* | ||
* <span class="informal">Remembers the latest `count` values, then emits those | ||
* only when the source completes.</span> | ||
* | ||
* <img src="./img/takeLast.png" width="100%"> | ||
* | ||
* `takeLast` returns an Observable that emits at most the last `count` values | ||
* emitted by the source Observable. If the source emits fewer than `count` | ||
* values then all of its values are emitted. This operator must wait until the | ||
* `complete` notification emission from the source in order to emit the `next` | ||
* values on the output Observable, because otherwise it is impossible to know | ||
* whether or not more values will be emitted on the source. For this reason, | ||
* all values are emitted synchronously, followed by the complete notification. | ||
* | ||
* @example <caption>Take the last 3 values of an Observable with many values</caption> | ||
* var many = Rx.Observable.range(1, 100); | ||
* var lastThree = many.takeLast(3); | ||
* lastThree.subscribe(x => console.log(x)); | ||
* | ||
* @see {@link take} | ||
* @see {@link takeUntil} | ||
* @see {@link takeWhile} | ||
* @see {@link skip} | ||
* | ||
* @throws {ArgumentOutOfRangeError} When using `takeLast(i)`, it delivers an | ||
* ArgumentOutOrRangeError to the Observer's `error` callback if `i < 0`. | ||
* | ||
* @param {number} count The maximum number of values to emit from the end of | ||
* the sequence of values emitted by the source Observable. | ||
* @return {Observable<T>} An Observable that emits at most the last count | ||
* values emitted by the source Observable. | ||
* @method takeLast | ||
* @owner Observable | ||
*/ | ||
export function takeLast<T>(count: number): OperatorFunction<T, T> { | ||
return function takeLastOperatorFunction(source: Observable<T>): Observable<T> { | ||
if (count === 0) { | ||
return new EmptyObservable<T>(); | ||
} else { | ||
return source.lift(new TakeLastOperator(count)); | ||
} | ||
}; | ||
} | ||
|
||
class TakeLastOperator<T> implements Operator<T, T> { | ||
constructor(private total: number) { | ||
if (this.total < 0) { | ||
throw new ArgumentOutOfRangeError; | ||
} | ||
} | ||
|
||
call(subscriber: Subscriber<T>, source: any): TeardownLogic { | ||
return source.subscribe(new TakeLastSubscriber(subscriber, this.total)); | ||
} | ||
} | ||
|
||
/** | ||
* We need this JSDoc comment for affecting ESDoc. | ||
* @ignore | ||
* @extends {Ignored} | ||
*/ | ||
class TakeLastSubscriber<T> extends Subscriber<T> { | ||
private ring: Array<T> = new Array(); | ||
private count: number = 0; | ||
|
||
constructor(destination: Subscriber<T>, private total: number) { | ||
super(destination); | ||
} | ||
|
||
protected _next(value: T): void { | ||
const ring = this.ring; | ||
const total = this.total; | ||
const count = this.count++; | ||
|
||
if (ring.length < total) { | ||
ring.push(value); | ||
} else { | ||
const index = count % total; | ||
ring[index] = value; | ||
} | ||
} | ||
|
||
protected _complete(): void { | ||
const destination = this.destination; | ||
let count = this.count; | ||
|
||
if (count > 0) { | ||
const total = this.count >= this.total ? this.total : this.count; | ||
const ring = this.ring; | ||
|
||
for (let i = 0; i < total; i++) { | ||
const idx = (count++) % total; | ||
destination.next(ring[idx]); | ||
} | ||
} | ||
|
||
destination.complete(); | ||
} | ||
} |