diff --git a/perf/micro/current-thread-scheduler/operators/skiplast.js b/perf/micro/current-thread-scheduler/operators/skiplast.js new file mode 100644 index 0000000000..a885498e17 --- /dev/null +++ b/perf/micro/current-thread-scheduler/operators/skiplast.js @@ -0,0 +1,18 @@ +var RxOld = require('rx'); +var RxNew = require('../../../../index'); + +module.exports = function (suite) { + var oldSkipLastWithImmediateScheduler = RxOld.Observable.range(0, 500, RxOld.Scheduler.currentThread).skipLast(50); + var newSkipLastWithImmediateScheduler = RxNew.Observable.range(0, 500, RxNew.Scheduler.queue).skipLast(50); + + function _next(x) { } + function _error(e) { } + function _complete() { } + return suite + .add('old skipLast with current thread scheduler', function () { + oldSkipLastWithImmediateScheduler.subscribe(_next, _error, _complete); + }) + .add('new skipLast with current thread scheduler', function () { + newSkipLastWithImmediateScheduler.subscribe(_next, _error, _complete); + }); +}; diff --git a/perf/micro/immediate-scheduler/operators/skiplast.js b/perf/micro/immediate-scheduler/operators/skiplast.js new file mode 100644 index 0000000000..390f076561 --- /dev/null +++ b/perf/micro/immediate-scheduler/operators/skiplast.js @@ -0,0 +1,18 @@ +var RxOld = require('rx'); +var RxNew = require('../../../../index'); + +module.exports = function (suite) { + var oldSkipLastWithImmediateScheduler = RxOld.Observable.range(0, 500, RxOld.Scheduler.immediate).skipLast(50); + var newSkipLastWithImmediateScheduler = RxNew.Observable.range(0, 500).skipLast(50); + + function _next(x) { } + function _error(e) { } + function _complete() { } + return suite + .add('old skipLast with immediate scheduler', function () { + oldSkipLastWithImmediateScheduler.subscribe(_next, _error, _complete); + }) + .add('new skipLast with immediate scheduler', function () { + newSkipLastWithImmediateScheduler.subscribe(_next, _error, _complete); + }); +}; diff --git a/spec/operators/skipLast-spec.ts b/spec/operators/skipLast-spec.ts new file mode 100644 index 0000000000..e3688b71ec --- /dev/null +++ b/spec/operators/skipLast-spec.ts @@ -0,0 +1,155 @@ +import {expect} from 'chai'; +import * as Rx from '../../dist/cjs/Rx'; +declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions}; + +const Observable = Rx.Observable; + +/** @test {takeLast} */ +describe('Observable.prototype.skipLast', () => { + asDiagram('skipLast(2)')('should skip two values of an observable with many values', () => { + const e1 = cold('--a-----b----c---d--|'); + const e1subs = '^ !'; + const expected = '-------------a---b--|'; + + expectObservable(e1.skipLast(2)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should skip last three values', () => { + const e1 = cold('--a-----b----c---d--|'); + const e1subs = '^ !'; + const expected = '-----------------a--|'; + + expectObservable(e1.skipLast(3)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should skip all values when trying to take larger then source', () => { + const e1 = cold('--a-----b----c---d--|'); + const e1subs = '^ !'; + const expected = '--------------------|'; + + expectObservable(e1.skipLast(5)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should skip all element when try to take exact', () => { + const e1 = cold('--a-----b----c---d--|'); + const e1subs = '^ !'; + const expected = '--------------------|'; + + expectObservable(e1.skipLast(4)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should not skip any values', () => { + const e1 = cold('--a-----b----c---d--|'); + const e1subs = '^ !'; + const expected = '--a-----b----c---d--|'; + + expectObservable(e1.skipLast(0)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should work with empty', () => { + const e1 = cold('|'); + const e1subs = '(^!)'; + const expected = '|'; + + expectObservable(e1.skipLast(42)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should go on forever on never', () => { + const e1 = cold('-'); + const e1subs = '^'; + const expected = '-'; + + expectObservable(e1.skipLast(42)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should skip one value from an observable with one value', () => { + const e1 = hot('---(a|)'); + const e1subs = '^ ! '; + const expected = '---| '; + + expectObservable(e1.skipLast(1)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should skip one value from an observable with many values', () => { + const e1 = hot('--a--^--b----c---d--|'); + const e1subs = '^ !'; + const expected = '--------b---c--|'; + + expectObservable(e1.skipLast(1)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should work with empty and early emission', () => { + const e1 = hot('--a--^----|'); + const e1subs = '^ !'; + const expected = '-----|'; + + expectObservable(e1.skipLast(42)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should propagate error from the source observable', () => { + const e1 = hot('---^---#', null, 'too bad'); + const e1subs = '^ !'; + const expected = '----#'; + + expectObservable(e1.skipLast(42)).toBe(expected, null, 'too bad'); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should propagate error from an observable with values', () => { + const e1 = hot('---^--a--b--#'); + const e1subs = '^ !'; + const expected = '---------#'; + + expectObservable(e1.skipLast(42)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should allow unsubscribing explicitly and early', () => { + const e1 = hot('---^--a--b-----c--d--e--|'); + const unsub = ' ! '; + const e1subs = '^ ! '; + const expected = '---------- '; + + expectObservable(e1.skipLast(42), unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should work with throw', () => { + const e1 = cold('#'); + const e1subs = '(^!)'; + const expected = '#'; + + expectObservable(e1.skipLast(42)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should throw if total is less than zero', () => { + expect(() => { Observable.range(0, 10).skipLast(-1); }) + .to.throw(Rx.ArgumentOutOfRangeError); + }); + + it('should not break unsubscription chain when unsubscribed explicitly', () => { + const e1 = hot('---^--a--b-----c--d--e--|'); + const unsub = ' ! '; + const e1subs = '^ ! '; + const expected = '---------- '; + + const result = e1 + .mergeMap((x: string) => Observable.of(x)) + .skipLast(42) + .mergeMap((x: string) => Observable.of(x)); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); +}); \ No newline at end of file diff --git a/src/Rx.ts b/src/Rx.ts index b04c99c505..a1457e3402 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -112,6 +112,7 @@ import './add/operator/sequenceEqual'; import './add/operator/share'; import './add/operator/single'; import './add/operator/skip'; +import './add/operator/skipLast'; import './add/operator/skipUntil'; import './add/operator/skipWhile'; import './add/operator/startWith'; diff --git a/src/add/operator/skipLast.ts b/src/add/operator/skipLast.ts new file mode 100644 index 0000000000..31484fe523 --- /dev/null +++ b/src/add/operator/skipLast.ts @@ -0,0 +1,10 @@ +import { Observable } from '../../Observable'; +import { skipLast } from '../../operator/skipLast'; + +Observable.prototype.skipLast = skipLast; + +declare module '../../Observable' { + interface Observable { + skipLast: typeof skipLast; + } +} \ No newline at end of file diff --git a/src/operator/skipLast.ts b/src/operator/skipLast.ts new file mode 100644 index 0000000000..55fdc0ef09 --- /dev/null +++ b/src/operator/skipLast.ts @@ -0,0 +1,88 @@ +import { Operator } from '../Operator'; +import { Subscriber } from '../Subscriber'; +import { ArgumentOutOfRangeError } from '../util/ArgumentOutOfRangeError'; +import { Observable } from '../Observable'; +import { TeardownLogic } from '../Subscription'; + +/** + * Skip the last `count` values emitted by the source Observable. + * + * + * + * `skipLast` returns an Observable that accumulates a queue with a length + * enough to store the first `count` values. As more values are received, + * values are taken from the front of the queue and produced on the result + * sequence. This causes values to be delayed. + * + * @example Skip the last 2 values of an Observable with many values + * var many = Rx.Observable.range(1, 5); + * var skipLastTwo = many.skipLast(2); + * skipLastTwo.subscribe(x => console.log(x)); + * + * // Results in: + * // 1 2 3 + * + * @see {@link skip} + * @see {@link skipUntil} + * @see {@link skipWhile} + * @see {@link take} + * + * @throws {ArgumentOutOfRangeError} When using `skipLast(i)`, it throws + * ArgumentOutOrRangeError if `i < 0`. + * + * @param {number} count Number of elements to skip from the end of the source Observable. + * @returns {Observable} An Observable that skips the last count values + * emitted by the source Observable. + * @method skipLast + * @owner Observable + */ +export function skipLast(this: Observable, count: number): Observable { + return this.lift(new SkipLastOperator(count)); +} + +class SkipLastOperator implements Operator { + constructor(private total: number) { + if (this.total < 0) { + throw new ArgumentOutOfRangeError; + } + } + + call(subscriber: Subscriber, source: any): TeardownLogic { + if (this.total === 0) { + // If we don't want to skip any values then just subscribe + // to Subscriber without any further logic. + return source.subscribe(new Subscriber(subscriber)); + } else { + return source.subscribe(new SkipLastSubscriber(subscriber, this.total)); + } + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class SkipLastSubscriber extends Subscriber { + private ring: T[] = []; + private count: number = 0; + + constructor(destination: Subscriber, private total: number) { + super(destination); + } + + protected _next(value: T): void { + const len = this.ring.length; + + if (len < this.total) { + this.ring.push(value); + this.count++; + } else { + const idx = this.count++ % this.total; + const oldValue = this.ring[idx]; + + this.ring[idx] = value; + this.destination.next(oldValue); + } + } +} \ No newline at end of file