-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(distinct): add distinct operator
- Loading branch information
Showing
5 changed files
with
352 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,226 @@ | ||
/* globals describe, it, expect, expectObservable, expectSubscriptions, cold, hot */ | ||
var Rx = require('../../dist/cjs/Rx'); | ||
var Observable = Rx.Observable; | ||
|
||
describe('Observable.prototype.distinct()', function () { | ||
it('should distinguish between values', function () { | ||
var e1 = hot('--a--a--a--b--b--a--|'); | ||
var e1subs = '^ !'; | ||
var expected = '--a--------b--------|'; | ||
|
||
expectObservable(e1.distinct()).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should distinguish between values and does not completes', function () { | ||
var e1 = hot('--a--a--a--b--b--a-'); | ||
var e1subs = '^ '; | ||
var expected = '--a--------b-------'; | ||
|
||
expectObservable(e1.distinct()).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should not completes if source never completes', function () { | ||
var e1 = cold('-'); | ||
var e1subs = '^'; | ||
var expected = '-'; | ||
|
||
expectObservable(e1.distinct()).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should not completes if source does not completes', function () { | ||
var e1 = hot('-'); | ||
var e1subs = '^'; | ||
var expected = '-'; | ||
|
||
expectObservable(e1.distinct()).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should complete if source is empty', function () { | ||
var e1 = cold('|'); | ||
var e1subs = '(^!)'; | ||
var expected = '|'; | ||
|
||
expectObservable(e1.distinct()).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should complete if source does not emit', function () { | ||
var e1 = hot('------|'); | ||
var e1subs = '^ !'; | ||
var expected = '------|'; | ||
|
||
expectObservable(e1.distinct()).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should emit if source emits single element only', function () { | ||
var e1 = hot('--a--|'); | ||
var e1subs = '^ !'; | ||
var expected = '--a--|'; | ||
|
||
expectObservable(e1.distinct()).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should emit if source is scalar', function () { | ||
var e1 = Observable.of('a'); | ||
var expected = '(a|)'; | ||
|
||
expectObservable(e1.distinct()).toBe(expected); | ||
}); | ||
|
||
it('should raises error if source raises error', function () { | ||
var e1 = hot('--a--a--#'); | ||
var e1subs = '^ !'; | ||
var expected = '--a-----#'; | ||
|
||
expectObservable(e1.distinct()).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should raises error if source throws', function () { | ||
var e1 = cold('#'); | ||
var e1subs = '(^!)'; | ||
var expected = '#'; | ||
|
||
expectObservable(e1.distinct()).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should not omit if source elements are all different', function () { | ||
var e1 = hot('--a--b--c--d--e--f--|'); | ||
var e1subs = '^ !'; | ||
var expected = '--a--b--c--d--e--f--|'; | ||
|
||
expectObservable(e1.distinct()).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should allow unsubscribing early and explicitly', function () { | ||
var e1 = hot('--a--b--b--d--a--f--|'); | ||
var e1subs = '^ ! '; | ||
var expected = '--a--b----- '; | ||
var unsub = ' ! '; | ||
|
||
var result = e1.distinct(); | ||
|
||
expectObservable(result, unsub).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should not break unsubscription chains when unsubscribed explicitly', function () { | ||
var e1 = hot('--a--b--b--d--a--f--|'); | ||
var e1subs = '^ ! '; | ||
var expected = '--a--b----- '; | ||
var unsub = ' ! '; | ||
|
||
var result = e1 | ||
.mergeMap(function (x) { return Observable.of(x); }) | ||
.distinct() | ||
.mergeMap(function (x) { return Observable.of(x); }); | ||
|
||
expectObservable(result, unsub).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should emit once if source elements are all same', function () { | ||
var e1 = hot('--a--a--a--a--a--a--|'); | ||
var e1subs = '^ !'; | ||
var expected = '--a-----------------|'; | ||
|
||
expectObservable(e1.distinct()).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should emit once if comparer returns true always regardless of source emits', function () { | ||
var e1 = hot('--a--b--c--d--e--f--|'); | ||
var e1subs = '^ !'; | ||
var expected = '--a-----------------|'; | ||
|
||
expectObservable(e1.distinct(function () { return true; })).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should emit all if comparer returns false always regardless of source emits', function () { | ||
var e1 = hot('--a--a--a--a--a--a--|'); | ||
var e1subs = '^ !'; | ||
var expected = '--a--a--a--a--a--a--|'; | ||
|
||
expectObservable(e1.distinct(function () { return false; })).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should distinguish values by selector', function () { | ||
var e1 = hot('--a--b--c--d--e--f--|', {a: 1, b: 2, c: 3, d: 4, e: 5, f: 6}); | ||
var e1subs = '^ !'; | ||
var expected = '--a-----c-----e-----|'; | ||
var selector = function (x, y) { | ||
return y % 2 === 0; | ||
}; | ||
|
||
expectObservable(e1.distinct(selector)).toBe(expected, {a: 1, c: 3, e: 5}); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should raises error when comparer throws', function () { | ||
var e1 = hot('--a--b--c--d--e--f--|'); | ||
var e1subs = '^ ! '; | ||
var expected = '--a--b--c--# '; | ||
var selector = function (x, y) { | ||
if (y === 'd') { | ||
throw 'error'; | ||
} | ||
return x === y; | ||
}; | ||
|
||
expectObservable(e1.distinct(selector)).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should support a flushing stream', function () { | ||
var e1 = hot('--a--b--a--b--a--b--|'); | ||
var e1subs = '^ !'; | ||
var e2 = hot('-----------x--------|'); | ||
var e2subs = '^ !'; | ||
var expected = '--a--b--------a--b--|'; | ||
var selector = function (x, y) { | ||
return x === y; | ||
}; | ||
|
||
expectObservable(e1.distinct(selector, e2)).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
expectSubscriptions(e2.subscriptions).toBe(e2subs); | ||
}); | ||
|
||
it('should unsubscribe from the flushing stream when the main stream is unsubbed', function () { | ||
var e1 = hot('--a--b--a--b--a--b--|'); | ||
var e1subs = '^ ! '; | ||
var e2 = hot('-----------x--------|'); | ||
var e2subs = '^ ! '; | ||
var unsub = ' ! '; | ||
var expected = '--a--b------'; | ||
var selector = function (x, y) { | ||
return x === y; | ||
}; | ||
|
||
expectObservable(e1.distinct(selector, e2), unsub).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
expectSubscriptions(e2.subscriptions).toBe(e2subs); | ||
}); | ||
|
||
it('should allow opting in to default comparator with flush', function () { | ||
var e1 = hot('--a--b--a--b--a--b--|'); | ||
var e1subs = '^ !'; | ||
var e2 = hot('-----------x--------|'); | ||
var e2subs = '^ !'; | ||
var expected = '--a--b--------a--b--|'; | ||
|
||
expectObservable(e1.distinct(null, e2)).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
expectSubscriptions(e2.subscriptions).toBe(e2subs); | ||
}); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
/** | ||
* Everything in this file is generated by the 'tools/generate-operator-patches.ts' script. | ||
* Any manual edits to this file will be lost next time the script is run. | ||
**/ | ||
import {Observable} from '../../Observable'; | ||
import {distinct} from '../../operator/distinct'; | ||
import {KitchenSinkOperators} from '../../Rx.KitchenSink'; | ||
|
||
const observableProto = (<KitchenSinkOperators<any>>Observable.prototype); | ||
observableProto.distinct = distinct; | ||
|
||
export var _void: void; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
import {Observable} from '../Observable'; | ||
import {Operator} from '../Operator'; | ||
import {Subscriber} from '../Subscriber'; | ||
import {Subscription} from '../Subscription'; | ||
import {tryCatch} from '../util/tryCatch'; | ||
import {errorObject} from '../util/errorObject'; | ||
|
||
/** | ||
* Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items. | ||
* If a comparator function is provided, then it will be called for each item to test for whether or not that value should be emitted. | ||
* If a comparator function is not provided, an equality check is used by default. | ||
* As the internal HashSet of this operator grows larger and larger, care should be taken in the domain of inputs this operator may see. | ||
* An optional paramter is also provided such that an Observable can be provided to queue the internal HashSet to flush the values it holds. | ||
* @param {function} [compare] optional comparison function called to test if an item is distinct from previous items in the source. | ||
* @param {Observable} [flushes] optional Observable for flushing the internal HashSet of the operator. | ||
* @returns {Observable} an Observable that emits items from the source Observable with distinct values. | ||
*/ | ||
export function distinct<T>(compare?: (x: T, y: T) => boolean, flushes?: Observable<any>) { | ||
return this.lift(new DistinctOperator(compare, flushes)); | ||
} | ||
|
||
class DistinctOperator<T, R> implements Operator<T, R> { | ||
constructor(private compare: (x: T, y: T) => boolean, private flushes: Observable<any>) { | ||
} | ||
|
||
call(subscriber: Subscriber<T>): Subscriber<T> { | ||
return new DistinctSubscriber(subscriber, this.compare, this.flushes); | ||
} | ||
} | ||
|
||
class HashSet<T> { | ||
private set: Array<T> = []; | ||
|
||
constructor(private compare: (x: T, y: T) => boolean) { | ||
} | ||
|
||
private has(item: T): boolean { | ||
for (var i = 0; i < this.set.length; i++) { | ||
if (this.compare(this.set[i], item)) { | ||
return true; | ||
} | ||
} | ||
|
||
return false; | ||
} | ||
|
||
push(item: T): boolean { | ||
if (this.has(item)) { | ||
return false; | ||
} else { | ||
this.set.push(item); | ||
return true; | ||
} | ||
} | ||
|
||
flush(): void { | ||
this.set = []; | ||
} | ||
} | ||
|
||
class DistinctSubscriber<T> extends Subscriber<T> { | ||
private hashSet: HashSet<T>; | ||
private flushSubscription: Subscription; | ||
|
||
constructor(destination: Subscriber<T>, compare: (x: T, y: T) => boolean, flushes: Observable<any>) { | ||
super(destination); | ||
if (typeof compare === 'function') { | ||
this.compare = compare; | ||
} | ||
this.hashSet = new HashSet(this.compare); | ||
|
||
if (flushes) { | ||
this.flushSubscription = flushes.subscribe(() => this.hashSet.flush()); | ||
} | ||
} | ||
|
||
private compare(x: T, y: T): boolean { | ||
return x === y; | ||
} | ||
|
||
private disposeFlushSubscription(): void { | ||
if (this.flushSubscription) { | ||
this.flushSubscription.unsubscribe(); | ||
} | ||
} | ||
|
||
protected _next(value: T): void { | ||
let result: any = false; | ||
|
||
result = tryCatch(this.hashSet.push.bind(this.hashSet))(value); | ||
if (result === errorObject) { | ||
this.destination.error(errorObject.e); | ||
return; | ||
} | ||
|
||
if (result) { | ||
this.destination.next(value); | ||
} | ||
} | ||
|
||
protected _complete(): void { | ||
this.disposeFlushSubscription(); | ||
super._complete(); | ||
} | ||
|
||
unsubscribe(): void { | ||
this.disposeFlushSubscription(); | ||
super.unsubscribe(); | ||
} | ||
|
||
} |