From f0b703b1f2925a20e905a5f8016688c5233e8e56 Mon Sep 17 00:00:00 2001 From: Jason Aden Date: Thu, 28 Sep 2017 15:18:24 -0700 Subject: [PATCH] fix(mergeAll): use higher-order lettable version of mergeAll --- src/operator/mergeAll.ts | 59 ++-------------------------------------- 1 file changed, 2 insertions(+), 57 deletions(-) diff --git a/src/operator/mergeAll.ts b/src/operator/mergeAll.ts index 73fed64cfd..fa540c974d 100644 --- a/src/operator/mergeAll.ts +++ b/src/operator/mergeAll.ts @@ -1,10 +1,6 @@ import { Observable } from '../Observable'; -import { Operator } from '../Operator'; -import { Observer } from '../Observer'; -import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; import { Subscribable } from '../Observable'; -import { subscribeToResult } from '../util/subscribeToResult'; +import { mergeAll as higherOrder } from '../operators/mergeAll'; export function mergeAll(this: Observable, concurrent?: number): T; export function mergeAll(this: Observable, concurrent?: number): Subscribable; @@ -54,56 +50,5 @@ export function mergeAll(this: Observable, concurrent?: number): Subscr * @owner Observable */ export function mergeAll(this: Observable, concurrent: number = Number.POSITIVE_INFINITY): T { - return this.lift(new MergeAllOperator(concurrent)); -} - -export class MergeAllOperator implements Operator, T> { - constructor(private concurrent: number) { - } - - call(observer: Observer, source: any): any { - return source.subscribe(new MergeAllSubscriber(observer, this.concurrent)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -export class MergeAllSubscriber extends OuterSubscriber, T> { - private hasCompleted: boolean = false; - private buffer: Observable[] = []; - private active: number = 0; - - constructor(destination: Observer, private concurrent: number) { - super(destination); - } - - protected _next(observable: Observable) { - if (this.active < this.concurrent) { - this.active++; - this.add(subscribeToResult, T>(this, observable)); - } else { - this.buffer.push(observable); - } - } - - protected _complete() { - this.hasCompleted = true; - if (this.active === 0 && this.buffer.length === 0) { - this.destination.complete(); - } - } - - notifyComplete(innerSub: Subscription) { - const buffer = this.buffer; - this.remove(innerSub); - this.active--; - if (buffer.length > 0) { - this._next(buffer.shift()); - } else if (this.active === 0 && this.hasCompleted) { - this.destination.complete(); - } - } + return higherOrder(concurrent)(this); }