Skip to content

Commit

Permalink
feat(distinct): add distinct operator
Browse files Browse the repository at this point in the history
  • Loading branch information
justinwoo authored and benlesh committed Jan 27, 2016
1 parent 9d5a01c commit 94a034d
Show file tree
Hide file tree
Showing 5 changed files with 352 additions and 0 deletions.
1 change: 1 addition & 0 deletions doc/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
- [debounce](function/index.html#static-function-debounce)
- [defaultIfEmpty](function/index.html#static-function-defaultIfEmpty)
- [delay](function/index.html#static-function-delay)
- [distinct](function/index.html#static-function-distinct)
- [distinctUntilChanged](function/index.html#static-function-distinctUntilChanged)
- [distinctUntilKeyChanged](function/index.html#static-function-distinctUntilKeyChanged)
- [do](function/index.html#static-function-do)
Expand Down
226 changes: 226 additions & 0 deletions spec/operators/distinct-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
/* globals describe, it, expect, expectObservable, expectSubscriptions, cold, hot */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.distinct()', function () {
it('should distinguish between values', function () {
var e1 = hot('--a--a--a--b--b--a--|');
var e1subs = '^ !';
var expected = '--a--------b--------|';

expectObservable(e1.distinct()).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should distinguish between values and does not completes', function () {
var e1 = hot('--a--a--a--b--b--a-');
var e1subs = '^ ';
var expected = '--a--------b-------';

expectObservable(e1.distinct()).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should not completes if source never completes', function () {
var e1 = cold('-');
var e1subs = '^';
var expected = '-';

expectObservable(e1.distinct()).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should not completes if source does not completes', function () {
var e1 = hot('-');
var e1subs = '^';
var expected = '-';

expectObservable(e1.distinct()).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should complete if source is empty', function () {
var e1 = cold('|');
var e1subs = '(^!)';
var expected = '|';

expectObservable(e1.distinct()).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should complete if source does not emit', function () {
var e1 = hot('------|');
var e1subs = '^ !';
var expected = '------|';

expectObservable(e1.distinct()).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should emit if source emits single element only', function () {
var e1 = hot('--a--|');
var e1subs = '^ !';
var expected = '--a--|';

expectObservable(e1.distinct()).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should emit if source is scalar', function () {
var e1 = Observable.of('a');
var expected = '(a|)';

expectObservable(e1.distinct()).toBe(expected);
});

it('should raises error if source raises error', function () {
var e1 = hot('--a--a--#');
var e1subs = '^ !';
var expected = '--a-----#';

expectObservable(e1.distinct()).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should raises error if source throws', function () {
var e1 = cold('#');
var e1subs = '(^!)';
var expected = '#';

expectObservable(e1.distinct()).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should not omit if source elements are all different', function () {
var e1 = hot('--a--b--c--d--e--f--|');
var e1subs = '^ !';
var expected = '--a--b--c--d--e--f--|';

expectObservable(e1.distinct()).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should allow unsubscribing early and explicitly', function () {
var e1 = hot('--a--b--b--d--a--f--|');
var e1subs = '^ ! ';
var expected = '--a--b----- ';
var unsub = ' ! ';

var result = e1.distinct();

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should not break unsubscription chains when unsubscribed explicitly', function () {
var e1 = hot('--a--b--b--d--a--f--|');
var e1subs = '^ ! ';
var expected = '--a--b----- ';
var unsub = ' ! ';

var result = e1
.mergeMap(function (x) { return Observable.of(x); })
.distinct()
.mergeMap(function (x) { return Observable.of(x); });

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should emit once if source elements are all same', function () {
var e1 = hot('--a--a--a--a--a--a--|');
var e1subs = '^ !';
var expected = '--a-----------------|';

expectObservable(e1.distinct()).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should emit once if comparer returns true always regardless of source emits', function () {
var e1 = hot('--a--b--c--d--e--f--|');
var e1subs = '^ !';
var expected = '--a-----------------|';

expectObservable(e1.distinct(function () { return true; })).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should emit all if comparer returns false always regardless of source emits', function () {
var e1 = hot('--a--a--a--a--a--a--|');
var e1subs = '^ !';
var expected = '--a--a--a--a--a--a--|';

expectObservable(e1.distinct(function () { return false; })).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should distinguish values by selector', function () {
var e1 = hot('--a--b--c--d--e--f--|', {a: 1, b: 2, c: 3, d: 4, e: 5, f: 6});
var e1subs = '^ !';
var expected = '--a-----c-----e-----|';
var selector = function (x, y) {
return y % 2 === 0;
};

expectObservable(e1.distinct(selector)).toBe(expected, {a: 1, c: 3, e: 5});
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should raises error when comparer throws', function () {
var e1 = hot('--a--b--c--d--e--f--|');
var e1subs = '^ ! ';
var expected = '--a--b--c--# ';
var selector = function (x, y) {
if (y === 'd') {
throw 'error';
}
return x === y;
};

expectObservable(e1.distinct(selector)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should support a flushing stream', function () {
var e1 = hot('--a--b--a--b--a--b--|');
var e1subs = '^ !';
var e2 = hot('-----------x--------|');
var e2subs = '^ !';
var expected = '--a--b--------a--b--|';
var selector = function (x, y) {
return x === y;
};

expectObservable(e1.distinct(selector, e2)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should unsubscribe from the flushing stream when the main stream is unsubbed', function () {
var e1 = hot('--a--b--a--b--a--b--|');
var e1subs = '^ ! ';
var e2 = hot('-----------x--------|');
var e2subs = '^ ! ';
var unsub = ' ! ';
var expected = '--a--b------';
var selector = function (x, y) {
return x === y;
};

expectObservable(e1.distinct(selector, e2), unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should allow opting in to default comparator with flush', function () {
var e1 = hot('--a--b--a--b--a--b--|');
var e1subs = '^ !';
var e2 = hot('-----------x--------|');
var e2subs = '^ !';
var expected = '--a--b--------a--b--|';

expectObservable(e1.distinct(null, e2)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});
});
2 changes: 2 additions & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {Scheduler as IScheduler} from './Scheduler';
export interface KitchenSinkOperators<T> extends CoreOperators<T> {
isEmpty?: () => Observable<boolean>;
elementAt?: (index: number, defaultValue?: any) => Observable<T>;
distinct?: (compare?: (x: T, y: T) => boolean, flushes?: Observable<any>) => Observable<T>;
distinctUntilKeyChanged?: (key: string, compare?: (x: any, y: any) => boolean) => Observable<T>;
find?: (predicate: (value: T, index: number, source: Observable<T>) => boolean, thisArg?: any) => Observable<T>;
findIndex?: (predicate: (value: T, index: number, source: Observable<T>) => boolean, thisArg?: any) => Observable<number>;
Expand Down Expand Up @@ -63,6 +64,7 @@ import './add/operator/debounce';
import './add/operator/debounceTime';
import './add/operator/defaultIfEmpty';
import './add/operator/delay';
import './add/operator/distinct';
import './add/operator/distinctUntilChanged';
import './add/operator/distinctUntilKeyChanged';
import './add/operator/do';
Expand Down
12 changes: 12 additions & 0 deletions src/add/operator/distinct.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/**
* Everything in this file is generated by the 'tools/generate-operator-patches.ts' script.
* Any manual edits to this file will be lost next time the script is run.
**/
import {Observable} from '../../Observable';
import {distinct} from '../../operator/distinct';
import {KitchenSinkOperators} from '../../Rx.KitchenSink';

const observableProto = (<KitchenSinkOperators<any>>Observable.prototype);
observableProto.distinct = distinct;

export var _void: void;
111 changes: 111 additions & 0 deletions src/operator/distinct.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import {Observable} from '../Observable';
import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';
import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';

/**
* Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items.
* If a comparator function is provided, then it will be called for each item to test for whether or not that value should be emitted.
* If a comparator function is not provided, an equality check is used by default.
* As the internal HashSet of this operator grows larger and larger, care should be taken in the domain of inputs this operator may see.
* An optional paramter is also provided such that an Observable can be provided to queue the internal HashSet to flush the values it holds.
* @param {function} [compare] optional comparison function called to test if an item is distinct from previous items in the source.
* @param {Observable} [flushes] optional Observable for flushing the internal HashSet of the operator.
* @returns {Observable} an Observable that emits items from the source Observable with distinct values.
*/
export function distinct<T>(compare?: (x: T, y: T) => boolean, flushes?: Observable<any>) {
return this.lift(new DistinctOperator(compare, flushes));
}

class DistinctOperator<T, R> implements Operator<T, R> {
constructor(private compare: (x: T, y: T) => boolean, private flushes: Observable<any>) {
}

call(subscriber: Subscriber<T>): Subscriber<T> {
return new DistinctSubscriber(subscriber, this.compare, this.flushes);
}
}

class HashSet<T> {
private set: Array<T> = [];

constructor(private compare: (x: T, y: T) => boolean) {
}

private has(item: T): boolean {
for (var i = 0; i < this.set.length; i++) {
if (this.compare(this.set[i], item)) {
return true;
}
}

return false;
}

push(item: T): boolean {
if (this.has(item)) {
return false;
} else {
this.set.push(item);
return true;
}
}

flush(): void {
this.set = [];
}
}

class DistinctSubscriber<T> extends Subscriber<T> {
private hashSet: HashSet<T>;
private flushSubscription: Subscription;

constructor(destination: Subscriber<T>, compare: (x: T, y: T) => boolean, flushes: Observable<any>) {
super(destination);
if (typeof compare === 'function') {
this.compare = compare;
}
this.hashSet = new HashSet(this.compare);

if (flushes) {
this.flushSubscription = flushes.subscribe(() => this.hashSet.flush());
}
}

private compare(x: T, y: T): boolean {
return x === y;
}

private disposeFlushSubscription(): void {
if (this.flushSubscription) {
this.flushSubscription.unsubscribe();
}
}

protected _next(value: T): void {
let result: any = false;

result = tryCatch(this.hashSet.push.bind(this.hashSet))(value);
if (result === errorObject) {
this.destination.error(errorObject.e);
return;
}

if (result) {
this.destination.next(value);
}
}

protected _complete(): void {
this.disposeFlushSubscription();
super._complete();
}

unsubscribe(): void {
this.disposeFlushSubscription();
super.unsubscribe();
}

}

0 comments on commit 94a034d

Please sign in to comment.