Skip to content

Commit

Permalink
feat(operator): add elementAt operator
Browse files Browse the repository at this point in the history
- adds ArgumentOutOfRangeError error type
- adds elementAt operator
  • Loading branch information
kwonoj authored and benlesh committed Sep 16, 2015
1 parent d65e7ea commit cd562c4
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 2 deletions.
20 changes: 20 additions & 0 deletions perf/micro/immediate-scheduler/operators/elementat.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
var RxOld = require("rx");
var RxNew = require("../../../../index");

module.exports = function (suite) {

var oldElementAtWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate).elementAt(5);
var newElementAtWithImmediateScheduler = RxNew.Observable.range(0, 25).elementAt(5);

return suite
.add('old elementAt with immediate scheduler', function () {
oldElementAtWithImmediateScheduler.subscribe(_next, _error, _complete);
})
.add('new elementAt with immediate scheduler', function () {
newElementAtWithImmediateScheduler.subscribe(_next, _error, _complete);
});

function _next(x) { }
function _error(e){ }
function _complete(){ }
};
47 changes: 47 additions & 0 deletions spec/operators/elementat-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/* globals describe, it, expect, hot, cold, expectObservable */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.elementAt', function() {
it("should return first element by zero-based index", function() {
var source = hot('--a--b--c--|');
var expected = '--(a|)';

expectObservable(source.elementAt(0)).toBe(expected);
});

it("should return non-first element by zero-based index", function() {
var source = hot('--a--b--c--d--e--f--|');
var expected = '-----------(d|)';

expectObservable(source.elementAt(3)).toBe(expected);
});

it("should return last element by zero-based index", function() {
var source = hot('--a--b--c--|');
var expected = '--------(c|)';

expectObservable(source.elementAt(2)).toBe(expected);
});

it("should throw if index is smaller than zero", function() {
expect(function() { Observable.range(0,10).elementAt(-1); })
.toThrow(new Rx.ArgumentOutOfRangeError);
});

it("should raise error if index is out of range but does not have default value", function() {
var source = hot('--a--|');
var expected = '-----#';

expectObservable(source.elementAt(3))
.toBe(expected, null, new Rx.ArgumentOutOfRangeError);
});

it("should return default value if index is out of range", function() {
var source = hot('--a--|');
var expected = '-----(x|)';
var defaultValue = '42';

expectObservable(source.elementAt(3, defaultValue)).toBe(expected, { x: defaultValue });
});
});
3 changes: 2 additions & 1 deletion src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ export default class Observable<T> {
startWith: <T>(x: T) => Observable<T>;
debounce: <R>(dueTime: number, scheduler?: Scheduler) => Observable<R>;

elementAt: (index: number, defaultValue?: any) => Observable<T>;
last: (predicate?: (value: T, index:number) => boolean, thisArg?: any, defaultValue?: any) => Observable<T>;

filter: (predicate: (x: T) => boolean, ix?: number, thisArg?: any) => Observable<T>;
Expand Down Expand Up @@ -233,4 +234,4 @@ export default class Observable<T> {
finally: (ensure: () => void, thisArg?: any) => Observable<T>;
timeout: <T>(due: number|Date, errorToSend?: any, scheduler?: Scheduler) => Observable<T>;
timeoutWith: <T>(due: number|Date, withObservable: Observable<any>, scheduler?: Scheduler) => Observable<T>;
}
}
6 changes: 5 additions & 1 deletion src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import Subscriber from './Subscriber';
import Subscription from './Subscription';
import Notification from './Notification';
import EmptyError from './util/EmptyError';
import ArgumentOutOfRangeError from './util/ArgumentOutOfRangeError';

import ReplaySubject from './subjects/ReplaySubject';
import BehaviorSubject from './subjects/BehaviorSubject';
Expand Down Expand Up @@ -102,6 +103,7 @@ import take from './operators/take';
import skip from './operators/skip';
import skipUntil from './operators/skipUntil';
import takeUntil from './operators/takeUntil';
import elementAt from './operators/elementAt';
import filter from './operators/filter';
import distinctUntilChanged from './operators/distinctUntilChanged';
import distinctUntilKeyChanged from './operators/distinctUntilKeyChanged';
Expand All @@ -110,6 +112,7 @@ observableProto.take = take;
observableProto.skip = skip;
observableProto.takeUntil = takeUntil;
observableProto.skipUntil = skipUntil;
observableProto.elementAt = elementAt;
observableProto.filter = filter;
observableProto.distinctUntilChanged = distinctUntilChanged;
observableProto.distinctUntilKeyChanged = distinctUntilKeyChanged;
Expand Down Expand Up @@ -237,5 +240,6 @@ export {
Notification,
VirtualTimeScheduler,
TestScheduler,
EmptyError
EmptyError,
ArgumentOutOfRangeError
};
47 changes: 47 additions & 0 deletions src/operators/elementAt.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import ArgumentOutOfRangeError from '../util/ArgumentOutOfRangeError';

export default function elementAt(index: number, defaultValue?: any) {
return this.lift(new ElementAtOperator(index, defaultValue));
}

class ElementAtOperator<T, R> implements Operator<T,R> {

constructor(private index: number, private defaultValue?: any) {
if (index < 0) {
throw new ArgumentOutOfRangeError;
}
}

call(subscriber: Subscriber<T>): Subscriber<T> {
return new ElementAtSubscriber(subscriber, this.index, this.defaultValue);
}
}

class ElementAtSubscriber<T, R> extends Subscriber<T> {

constructor(destination: Subscriber<T>, private index: number, private defaultValue?: any) {
super(destination);
}

_next(x) {
if (this.index-- === 0) {
this.destination.next(x);
this.destination.complete();
}
}

_complete() {
const destination = this.destination;
if (this.index >= 0) {
if(typeof this.defaultValue !== 'undefined') {
destination.next(this.defaultValue);
} else {
destination.error(new ArgumentOutOfRangeError);
}
}
destination.complete();
}
}
4 changes: 4 additions & 0 deletions src/util/ArgumentOutOfRangeError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export default class ArgumentOutOfRangeError implements Error {
name = 'ArgumentOutOfRangeError';
message = 'argument out of range';
}

0 comments on commit cd562c4

Please sign in to comment.