-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(scan): add higher-order lettable version of scan
note arguments length checking in prototype version
- Loading branch information
Showing
3 changed files
with
120 additions
and
63 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
import { Operator } from '../Operator'; | ||
import { Observable } from '../Observable'; | ||
import { Subscriber } from '../Subscriber'; | ||
import { OperatorFunction } from '../interfaces'; | ||
|
||
/* tslint:disable:max-line-length */ | ||
export function scan<T>(accumulator: (acc: T, value: T, index: number) => T, seed?: T): OperatorFunction<T, T>; | ||
export function scan<T>(accumulator: (acc: T[], value: T, index: number) => T[], seed?: T[]): OperatorFunction<T, T[]>; | ||
export function scan<T, R>(accumulator: (acc: R, value: T, index: number) => R, seed?: R): OperatorFunction<T, R>; | ||
/* tslint:enable:max-line-length */ | ||
|
||
/** | ||
* Applies an accumulator function over the source Observable, and returns each | ||
* intermediate result, with an optional seed value. | ||
* | ||
* <span class="informal">It's like {@link reduce}, but emits the current | ||
* accumulation whenever the source emits a value.</span> | ||
* | ||
* <img src="./img/scan.png" width="100%"> | ||
* | ||
* Combines together all values emitted on the source, using an accumulator | ||
* function that knows how to join a new source value into the accumulation from | ||
* the past. Is similar to {@link reduce}, but emits the intermediate | ||
* accumulations. | ||
* | ||
* Returns an Observable that applies a specified `accumulator` function to each | ||
* item emitted by the source Observable. If a `seed` value is specified, then | ||
* that value will be used as the initial value for the accumulator. If no seed | ||
* value is specified, the first item of the source is used as the seed. | ||
* | ||
* @example <caption>Count the number of click events</caption> | ||
* var clicks = Rx.Observable.fromEvent(document, 'click'); | ||
* var ones = clicks.mapTo(1); | ||
* var seed = 0; | ||
* var count = ones.scan((acc, one) => acc + one, seed); | ||
* count.subscribe(x => console.log(x)); | ||
* | ||
* @see {@link expand} | ||
* @see {@link mergeScan} | ||
* @see {@link reduce} | ||
* | ||
* @param {function(acc: R, value: T, index: number): R} accumulator | ||
* The accumulator function called on each source value. | ||
* @param {T|R} [seed] The initial accumulation value. | ||
* @return {Observable<R>} An observable of the accumulated values. | ||
* @method scan | ||
* @owner Observable | ||
*/ | ||
export function scan<T, R>(accumulator: (acc: R, value: T, index: number) => R, seed?: T | R): OperatorFunction<T, R> { | ||
let hasSeed = false; | ||
// providing a seed of `undefined` *should* be valid and trigger | ||
// hasSeed! so don't use `seed !== undefined` checks! | ||
// For this reason, we have to check it here at the original call site | ||
// otherwise inside Operator/Subscriber we won't know if `undefined` | ||
// means they didn't provide anything or if they literally provided `undefined` | ||
if (arguments.length >= 2) { | ||
hasSeed = true; | ||
} | ||
|
||
return function scanOperatorFunction(source: Observable<T>): Observable<R> { | ||
return source.lift(new ScanOperator(accumulator, seed, hasSeed)); | ||
}; | ||
} | ||
|
||
class ScanOperator<T, R> implements Operator<T, R> { | ||
constructor(private accumulator: (acc: R, value: T, index: number) => R, private seed?: T | R, private hasSeed: boolean = false) {} | ||
|
||
call(subscriber: Subscriber<R>, source: any): any { | ||
return source.subscribe(new ScanSubscriber(subscriber, this.accumulator, this.seed, this.hasSeed)); | ||
} | ||
} | ||
|
||
/** | ||
* We need this JSDoc comment for affecting ESDoc. | ||
* @ignore | ||
* @extends {Ignored} | ||
*/ | ||
class ScanSubscriber<T, R> extends Subscriber<T> { | ||
private index: number = 0; | ||
|
||
get seed(): T | R { | ||
return this._seed; | ||
} | ||
|
||
set seed(value: T | R) { | ||
this.hasSeed = true; | ||
this._seed = value; | ||
} | ||
|
||
constructor(destination: Subscriber<R>, private accumulator: (acc: R, value: T, index: number) => R, private _seed: T | R, | ||
private hasSeed: boolean) { | ||
super(destination); | ||
} | ||
|
||
protected _next(value: T): void { | ||
if (!this.hasSeed) { | ||
this.seed = value; | ||
this.destination.next(value); | ||
} else { | ||
return this._tryNext(value); | ||
} | ||
} | ||
|
||
private _tryNext(value: T): void { | ||
const index = this.index++; | ||
let result: any; | ||
try { | ||
result = this.accumulator(<R>this.seed, value, index); | ||
} catch (err) { | ||
this.destination.error(err); | ||
} | ||
this.seed = result; | ||
this.destination.next(result); | ||
} | ||
} |