diff --git a/src/operator/scan.ts b/src/operator/scan.ts index 558f87667d..c3597f5631 100644 --- a/src/operator/scan.ts +++ b/src/operator/scan.ts @@ -1,6 +1,6 @@ -import { Operator } from '../Operator'; + import { Observable } from '../Observable'; -import { Subscriber } from '../Subscriber'; +import { scan as higherOrderScan } from '../operators'; /* tslint:disable:max-line-length */ export function scan(this: Observable, accumulator: (acc: T, value: T, index: number) => T, seed?: T): Observable; @@ -46,67 +46,8 @@ export function scan(this: Observable, accumulator: (acc: R, value: T, * @owner Observable */ export function scan(this: Observable, accumulator: (acc: R, value: T, index: number) => R, seed?: T | R): Observable { - let hasSeed = false; - // providing a seed of `undefined` *should* be valid and trigger - // hasSeed! so don't use `seed !== undefined` checks! - // For this reason, we have to check it here at the original call site - // otherwise inside Operator/Subscriber we won't know if `undefined` - // means they didn't provide anything or if they literally provided `undefined` if (arguments.length >= 2) { - hasSeed = true; - } - - return this.lift(new ScanOperator(accumulator, seed, hasSeed)); -} - -class ScanOperator implements Operator { - constructor(private accumulator: (acc: R, value: T, index: number) => R, private seed?: T | R, private hasSeed: boolean = false) {} - - call(subscriber: Subscriber, source: any): any { - return source.subscribe(new ScanSubscriber(subscriber, this.accumulator, this.seed, this.hasSeed)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class ScanSubscriber extends Subscriber { - private index: number = 0; - - get seed(): T | R { - return this._seed; - } - - set seed(value: T | R) { - this.hasSeed = true; - this._seed = value; - } - - constructor(destination: Subscriber, private accumulator: (acc: R, value: T, index: number) => R, private _seed: T | R, - private hasSeed: boolean) { - super(destination); - } - - protected _next(value: T): void { - if (!this.hasSeed) { - this.seed = value; - this.destination.next(value); - } else { - return this._tryNext(value); - } - } - - private _tryNext(value: T): void { - const index = this.index++; - let result: any; - try { - result = this.accumulator(this.seed, value, index); - } catch (err) { - this.destination.error(err); - } - this.seed = result; - this.destination.next(result); + return higherOrderScan(accumulator, seed)(this); } + return higherOrderScan(accumulator)(this); } diff --git a/src/operators/index.ts b/src/operators/index.ts index 35bb86dbfc..bfb54ca5a3 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -2,4 +2,5 @@ export { concatMap } from './concatMap'; export { filter } from './filter'; export { map } from './map'; export { mergeMap } from './mergeMap'; +export { scan } from './scan'; export { switchMap } from './switchMap'; diff --git a/src/operators/scan.ts b/src/operators/scan.ts new file mode 100644 index 0000000000..4dc6046a45 --- /dev/null +++ b/src/operators/scan.ts @@ -0,0 +1,115 @@ +import { Operator } from '../Operator'; +import { Observable } from '../Observable'; +import { Subscriber } from '../Subscriber'; +import { OperatorFunction } from '../interfaces'; + +/* tslint:disable:max-line-length */ +export function scan(accumulator: (acc: T, value: T, index: number) => T, seed?: T): OperatorFunction; +export function scan(accumulator: (acc: T[], value: T, index: number) => T[], seed?: T[]): OperatorFunction; +export function scan(accumulator: (acc: R, value: T, index: number) => R, seed?: R): OperatorFunction; +/* tslint:enable:max-line-length */ + +/** + * Applies an accumulator function over the source Observable, and returns each + * intermediate result, with an optional seed value. + * + * It's like {@link reduce}, but emits the current + * accumulation whenever the source emits a value. + * + * + * + * Combines together all values emitted on the source, using an accumulator + * function that knows how to join a new source value into the accumulation from + * the past. Is similar to {@link reduce}, but emits the intermediate + * accumulations. + * + * Returns an Observable that applies a specified `accumulator` function to each + * item emitted by the source Observable. If a `seed` value is specified, then + * that value will be used as the initial value for the accumulator. If no seed + * value is specified, the first item of the source is used as the seed. + * + * @example Count the number of click events + * var clicks = Rx.Observable.fromEvent(document, 'click'); + * var ones = clicks.mapTo(1); + * var seed = 0; + * var count = ones.scan((acc, one) => acc + one, seed); + * count.subscribe(x => console.log(x)); + * + * @see {@link expand} + * @see {@link mergeScan} + * @see {@link reduce} + * + * @param {function(acc: R, value: T, index: number): R} accumulator + * The accumulator function called on each source value. + * @param {T|R} [seed] The initial accumulation value. + * @return {Observable} An observable of the accumulated values. + * @method scan + * @owner Observable + */ +export function scan(accumulator: (acc: R, value: T, index: number) => R, seed?: T | R): OperatorFunction { + let hasSeed = false; + // providing a seed of `undefined` *should* be valid and trigger + // hasSeed! so don't use `seed !== undefined` checks! + // For this reason, we have to check it here at the original call site + // otherwise inside Operator/Subscriber we won't know if `undefined` + // means they didn't provide anything or if they literally provided `undefined` + if (arguments.length >= 2) { + hasSeed = true; + } + + return function scanOperatorFunction(source: Observable): Observable { + return source.lift(new ScanOperator(accumulator, seed, hasSeed)); + }; +} + +class ScanOperator implements Operator { + constructor(private accumulator: (acc: R, value: T, index: number) => R, private seed?: T | R, private hasSeed: boolean = false) {} + + call(subscriber: Subscriber, source: any): any { + return source.subscribe(new ScanSubscriber(subscriber, this.accumulator, this.seed, this.hasSeed)); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class ScanSubscriber extends Subscriber { + private index: number = 0; + + get seed(): T | R { + return this._seed; + } + + set seed(value: T | R) { + this.hasSeed = true; + this._seed = value; + } + + constructor(destination: Subscriber, private accumulator: (acc: R, value: T, index: number) => R, private _seed: T | R, + private hasSeed: boolean) { + super(destination); + } + + protected _next(value: T): void { + if (!this.hasSeed) { + this.seed = value; + this.destination.next(value); + } else { + return this._tryNext(value); + } + } + + private _tryNext(value: T): void { + const index = this.index++; + let result: any; + try { + result = this.accumulator(this.seed, value, index); + } catch (err) { + this.destination.error(err); + } + this.seed = result; + this.destination.next(result); + } +}