diff --git a/src/operator/sequenceEqual.ts b/src/operator/sequenceEqual.ts index ed4444405b..f6babf02b6 100644 --- a/src/operator/sequenceEqual.ts +++ b/src/operator/sequenceEqual.ts @@ -1,9 +1,5 @@ -import { Operator } from '../Operator'; -import { Observer } from '../Observer'; import { Observable } from '../Observable'; -import { Subscriber } from '../Subscriber'; -import { tryCatch } from '../util/tryCatch'; -import { errorObject } from '../util/errorObject'; +import { sequenceEqual as higherOrder } from '../operators/sequenceEqual'; /** * Compares all values of two observables in sequence using an optional comparor function @@ -59,103 +55,5 @@ import { errorObject } from '../util/errorObject'; */ export function sequenceEqual(this: Observable, compareTo: Observable, comparor?: (a: T, b: T) => boolean): Observable { - return this.lift(new SequenceEqualOperator(compareTo, comparor)); -} - -export class SequenceEqualOperator implements Operator { - constructor(private compareTo: Observable, - private comparor: (a: T, b: T) => boolean) { - } - - call(subscriber: Subscriber, source: any): any { - return source.subscribe(new SequenceEqualSubscriber(subscriber, this.compareTo, this.comparor)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -export class SequenceEqualSubscriber extends Subscriber { - private _a: T[] = []; - private _b: T[] = []; - private _oneComplete = false; - - constructor(destination: Observer, - private compareTo: Observable, - private comparor: (a: T, b: T) => boolean) { - super(destination); - this.add(compareTo.subscribe(new SequenceEqualCompareToSubscriber(destination, this))); - } - - protected _next(value: T): void { - if (this._oneComplete && this._b.length === 0) { - this.emit(false); - } else { - this._a.push(value); - this.checkValues(); - } - } - - public _complete(): void { - if (this._oneComplete) { - this.emit(this._a.length === 0 && this._b.length === 0); - } else { - this._oneComplete = true; - } - } - - checkValues() { - const { _a, _b, comparor } = this; - while (_a.length > 0 && _b.length > 0) { - let a = _a.shift(); - let b = _b.shift(); - let areEqual = false; - if (comparor) { - areEqual = tryCatch(comparor)(a, b); - if (areEqual === errorObject) { - this.destination.error(errorObject.e); - } - } else { - areEqual = a === b; - } - if (!areEqual) { - this.emit(false); - } - } - } - - emit(value: boolean) { - const { destination } = this; - destination.next(value); - destination.complete(); - } - - nextB(value: T) { - if (this._oneComplete && this._a.length === 0) { - this.emit(false); - } else { - this._b.push(value); - this.checkValues(); - } - } -} - -class SequenceEqualCompareToSubscriber extends Subscriber { - constructor(destination: Observer, private parent: SequenceEqualSubscriber) { - super(destination); - } - - protected _next(value: T): void { - this.parent.nextB(value); - } - - protected _error(err: any): void { - this.parent.error(err); - } - - protected _complete(): void { - this.parent._complete(); - } + return higherOrder(compareTo, comparor)(this); } diff --git a/src/operators/index.ts b/src/operators/index.ts index 3b8ea26498..0ce5b907d9 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -61,6 +61,7 @@ export { refCount } from './refCount'; export { sample } from './sample'; export { sampleTime } from './sampleTime'; export { scan } from './scan'; +export { sequenceEqual } from './sequenceEqual'; export { subscribeOn } from './subscribeOn'; export { switchAll } from './switchAll'; export { switchMap } from './switchMap'; diff --git a/src/operators/sequenceEqual.ts b/src/operators/sequenceEqual.ts new file mode 100644 index 0000000000..3edf89e0dc --- /dev/null +++ b/src/operators/sequenceEqual.ts @@ -0,0 +1,163 @@ +import { Operator } from '../Operator'; +import { Observer } from '../Observer'; +import { Observable } from '../Observable'; +import { Subscriber } from '../Subscriber'; +import { tryCatch } from '../util/tryCatch'; +import { errorObject } from '../util/errorObject'; + +import { OperatorFunction } from '../interfaces'; + +/** + * Compares all values of two observables in sequence using an optional comparor function + * and returns an observable of a single boolean value representing whether or not the two sequences + * are equal. + * + * Checks to see of all values emitted by both observables are equal, in order. + * + * + * + * `sequenceEqual` subscribes to two observables and buffers incoming values from each observable. Whenever either + * observable emits a value, the value is buffered and the buffers are shifted and compared from the bottom + * up; If any value pair doesn't match, the returned observable will emit `false` and complete. If one of the + * observables completes, the operator will wait for the other observable to complete; If the other + * observable emits before completing, the returned observable will emit `false` and complete. If one observable never + * completes or emits after the other complets, the returned observable will never complete. + * + * @example figure out if the Konami code matches + * var code = Rx.Observable.from([ + * "ArrowUp", + * "ArrowUp", + * "ArrowDown", + * "ArrowDown", + * "ArrowLeft", + * "ArrowRight", + * "ArrowLeft", + * "ArrowRight", + * "KeyB", + * "KeyA", + * "Enter" // no start key, clearly. + * ]); + * + * var keys = Rx.Observable.fromEvent(document, 'keyup') + * .map(e => e.code); + * var matches = keys.bufferCount(11, 1) + * .mergeMap( + * last11 => + * Rx.Observable.from(last11) + * .sequenceEqual(code) + * ); + * matches.subscribe(matched => console.log('Successful cheat at Contra? ', matched)); + * + * @see {@link combineLatest} + * @see {@link zip} + * @see {@link withLatestFrom} + * + * @param {Observable} compareTo The observable sequence to compare the source sequence to. + * @param {function} [comparor] An optional function to compare each value pair + * @return {Observable} An Observable of a single boolean value representing whether or not + * the values emitted by both observables were equal in sequence. + * @method sequenceEqual + * @owner Observable + */ +export function sequenceEqual(compareTo: Observable, + comparor?: (a: T, b: T) => boolean): OperatorFunction { + return (source: Observable) => source.lift(new SequenceEqualOperator(compareTo, comparor)); +} + +export class SequenceEqualOperator implements Operator { + constructor(private compareTo: Observable, + private comparor: (a: T, b: T) => boolean) { + } + + call(subscriber: Subscriber, source: any): any { + return source.subscribe(new SequenceEqualSubscriber(subscriber, this.compareTo, this.comparor)); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +export class SequenceEqualSubscriber extends Subscriber { + private _a: T[] = []; + private _b: T[] = []; + private _oneComplete = false; + + constructor(destination: Observer, + private compareTo: Observable, + private comparor: (a: T, b: T) => boolean) { + super(destination); + this.add(compareTo.subscribe(new SequenceEqualCompareToSubscriber(destination, this))); + } + + protected _next(value: T): void { + if (this._oneComplete && this._b.length === 0) { + this.emit(false); + } else { + this._a.push(value); + this.checkValues(); + } + } + + public _complete(): void { + if (this._oneComplete) { + this.emit(this._a.length === 0 && this._b.length === 0); + } else { + this._oneComplete = true; + } + } + + checkValues() { + const { _a, _b, comparor } = this; + while (_a.length > 0 && _b.length > 0) { + let a = _a.shift(); + let b = _b.shift(); + let areEqual = false; + if (comparor) { + areEqual = tryCatch(comparor)(a, b); + if (areEqual === errorObject) { + this.destination.error(errorObject.e); + } + } else { + areEqual = a === b; + } + if (!areEqual) { + this.emit(false); + } + } + } + + emit(value: boolean) { + const { destination } = this; + destination.next(value); + destination.complete(); + } + + nextB(value: T) { + if (this._oneComplete && this._a.length === 0) { + this.emit(false); + } else { + this._b.push(value); + this.checkValues(); + } + } +} + +class SequenceEqualCompareToSubscriber extends Subscriber { + constructor(destination: Observer, private parent: SequenceEqualSubscriber) { + super(destination); + } + + protected _next(value: T): void { + this.parent.nextB(value); + } + + protected _error(err: any): void { + this.parent.error(err); + } + + protected _complete(): void { + this.parent._complete(); + } +}