diff --git a/src/operator/mergeScan.ts b/src/operator/mergeScan.ts index 62f9c6f6da..682708c2fa 100644 --- a/src/operator/mergeScan.ts +++ b/src/operator/mergeScan.ts @@ -1,12 +1,6 @@ -import { Operator } from '../Operator'; + import { Observable } from '../Observable'; -import { Subscriber } from '../Subscriber'; -import { Subscription } from '../Subscription'; -import { tryCatch } from '../util/tryCatch'; -import { errorObject } from '../util/errorObject'; -import { subscribeToResult } from '../util/subscribeToResult'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; +import { mergeScan as higherOrder } from '../operators/mergeScan'; /** * Applies an accumulator function over the source Observable where the @@ -43,91 +37,5 @@ export function mergeScan(this: Observable, accumulator: (acc: R, value: T) => Observable, seed: R, concurrent: number = Number.POSITIVE_INFINITY): Observable { - return this.lift(new MergeScanOperator(accumulator, seed, concurrent)); -} - -export class MergeScanOperator implements Operator { - constructor(private accumulator: (acc: R, value: T) => Observable, - private seed: R, - private concurrent: number) { - } - - call(subscriber: Subscriber, source: any): any { - return source.subscribe(new MergeScanSubscriber( - subscriber, this.accumulator, this.seed, this.concurrent - )); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -export class MergeScanSubscriber extends OuterSubscriber { - private hasValue: boolean = false; - private hasCompleted: boolean = false; - private buffer: Observable[] = []; - private active: number = 0; - protected index: number = 0; - - constructor(destination: Subscriber, - private accumulator: (acc: R, value: T) => Observable, - private acc: R, - private concurrent: number) { - super(destination); - } - - protected _next(value: any): void { - if (this.active < this.concurrent) { - const index = this.index++; - const ish = tryCatch(this.accumulator)(this.acc, value); - const destination = this.destination; - if (ish === errorObject) { - destination.error(errorObject.e); - } else { - this.active++; - this._innerSub(ish, value, index); - } - } else { - this.buffer.push(value); - } - } - - private _innerSub(ish: any, value: T, index: number): void { - this.add(subscribeToResult(this, ish, value, index)); - } - - protected _complete(): void { - this.hasCompleted = true; - if (this.active === 0 && this.buffer.length === 0) { - if (this.hasValue === false) { - this.destination.next(this.acc); - } - this.destination.complete(); - } - } - - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - const { destination } = this; - this.acc = innerValue; - this.hasValue = true; - destination.next(innerValue); - } - - notifyComplete(innerSub: Subscription): void { - const buffer = this.buffer; - this.remove(innerSub); - this.active--; - if (buffer.length > 0) { - this._next(buffer.shift()); - } else if (this.active === 0 && this.hasCompleted) { - if (this.hasValue === false) { - this.destination.next(this.acc); - } - this.destination.complete(); - } - } + return higherOrder(accumulator, seed, concurrent)(this); } diff --git a/src/operators/index.ts b/src/operators/index.ts index 5d26a1e038..6838d2fcad 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -41,6 +41,7 @@ export { merge } from './merge'; export { mergeAll } from './mergeAll'; export { mergeMap } from './mergeMap'; export { mergeMapTo } from './mergeMapTo'; +export { mergeScan } from './mergeScan'; export { min } from './min'; export { multicast } from './multicast'; export { observeOn } from './observeOn'; diff --git a/src/operators/mergeScan.ts b/src/operators/mergeScan.ts new file mode 100644 index 0000000000..b45da5a83e --- /dev/null +++ b/src/operators/mergeScan.ts @@ -0,0 +1,133 @@ +import { Operator } from '../Operator'; +import { Observable } from '../Observable'; +import { Subscriber } from '../Subscriber'; +import { Subscription } from '../Subscription'; +import { tryCatch } from '../util/tryCatch'; +import { errorObject } from '../util/errorObject'; +import { subscribeToResult } from '../util/subscribeToResult'; +import { OuterSubscriber } from '../OuterSubscriber'; +import { InnerSubscriber } from '../InnerSubscriber'; +import { OperatorFunction } from '../interfaces'; + +/** + * Applies an accumulator function over the source Observable where the + * accumulator function itself returns an Observable, then each intermediate + * Observable returned is merged into the output Observable. + * + * It's like {@link scan}, but the Observables returned + * by the accumulator are merged into the outer Observable. + * + * @example Count the number of click events + * const click$ = Rx.Observable.fromEvent(document, 'click'); + * const one$ = click$.mapTo(1); + * const seed = 0; + * const count$ = one$.mergeScan((acc, one) => Rx.Observable.of(acc + one), seed); + * count$.subscribe(x => console.log(x)); + * + * // Results: + * 1 + * 2 + * 3 + * 4 + * // ...and so on for each click + * + * @param {function(acc: R, value: T): Observable} accumulator + * The accumulator function called on each source value. + * @param seed The initial accumulation value. + * @param {number} [concurrent=Number.POSITIVE_INFINITY] Maximum number of + * input Observables being subscribed to concurrently. + * @return {Observable} An observable of the accumulated values. + * @method mergeScan + * @owner Observable + */ +export function mergeScan(accumulator: (acc: R, value: T) => Observable, + seed: R, + concurrent: number = Number.POSITIVE_INFINITY): OperatorFunction { + return (source: Observable) => source.lift(new MergeScanOperator(accumulator, seed, concurrent)); +} + +export class MergeScanOperator implements Operator { + constructor(private accumulator: (acc: R, value: T) => Observable, + private seed: R, + private concurrent: number) { + } + + call(subscriber: Subscriber, source: any): any { + return source.subscribe(new MergeScanSubscriber( + subscriber, this.accumulator, this.seed, this.concurrent + )); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +export class MergeScanSubscriber extends OuterSubscriber { + private hasValue: boolean = false; + private hasCompleted: boolean = false; + private buffer: Observable[] = []; + private active: number = 0; + protected index: number = 0; + + constructor(destination: Subscriber, + private accumulator: (acc: R, value: T) => Observable, + private acc: R, + private concurrent: number) { + super(destination); + } + + protected _next(value: any): void { + if (this.active < this.concurrent) { + const index = this.index++; + const ish = tryCatch(this.accumulator)(this.acc, value); + const destination = this.destination; + if (ish === errorObject) { + destination.error(errorObject.e); + } else { + this.active++; + this._innerSub(ish, value, index); + } + } else { + this.buffer.push(value); + } + } + + private _innerSub(ish: any, value: T, index: number): void { + this.add(subscribeToResult(this, ish, value, index)); + } + + protected _complete(): void { + this.hasCompleted = true; + if (this.active === 0 && this.buffer.length === 0) { + if (this.hasValue === false) { + this.destination.next(this.acc); + } + this.destination.complete(); + } + } + + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { + const { destination } = this; + this.acc = innerValue; + this.hasValue = true; + destination.next(innerValue); + } + + notifyComplete(innerSub: Subscription): void { + const buffer = this.buffer; + this.remove(innerSub); + this.active--; + if (buffer.length > 0) { + this._next(buffer.shift()); + } else if (this.active === 0 && this.hasCompleted) { + if (this.hasValue === false) { + this.destination.next(this.acc); + } + this.destination.complete(); + } + } +}