diff --git a/src/operator/mergeMap.ts b/src/operator/mergeMap.ts index b5076f0ef4..71e1b98209 100644 --- a/src/operator/mergeMap.ts +++ b/src/operator/mergeMap.ts @@ -2,8 +2,6 @@ import {Observable} from '../Observable'; import {Operator} from '../Operator'; import {Subscriber} from '../Subscriber'; import {Subscription} from '../Subscription'; -import {tryCatch} from '../util/tryCatch'; -import {errorObject} from '../util/errorObject'; import {subscribeToResult} from '../util/subscribeToResult'; import {OuterSubscriber} from '../OuterSubscriber'; @@ -39,7 +37,7 @@ export class MergeMapOperator implements Operator { export class MergeMapSubscriber extends OuterSubscriber { private hasCompleted: boolean = false; - private buffer: T[] = []; + private buffer: Observable[] = []; private active: number = 0; protected index: number = 0; @@ -50,23 +48,28 @@ export class MergeMapSubscriber extends OuterSubscriber { super(destination); } - protected _next(value: T): void { + protected _next(value: any): void { if (this.active < this.concurrent) { - const index = this.index++; - const ish = tryCatch(this.project)(value, index); - const destination = this.destination; - if (ish === errorObject) { - destination.error(errorObject.e); - } else { - this.active++; - this._innerSub(ish, value, index); - } + this._tryNext(value); } else { this.buffer.push(value); } } - private _innerSub(ish: Observable, value: T, index: number): void { + protected _tryNext(value: any) { + let result: any; + const index = this.index++; + try { + result = this.project(value, index); + } catch (err) { + this.destination.error(err); + return; + } + this.active++; + this._innerSub(result, value, index); + } + + private _innerSub(ish: any, value: T, index: number): void { this.add(subscribeToResult(this, ish, value, index)); } @@ -78,17 +81,22 @@ export class MergeMapSubscriber extends OuterSubscriber { } notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void { - const { destination, resultSelector } = this; - if (resultSelector) { - const result = tryCatch(resultSelector)(outerValue, innerValue, outerIndex, innerIndex); - if (result === errorObject) { - destination.error(errorObject.e); - } else { - destination.next(result); - } + if (this.resultSelector) { + this._notifyResultSelector(outerValue, innerValue, outerIndex, innerIndex); } else { - destination.next(innerValue); + this.destination.next(innerValue); + } + } + + _notifyResultSelector(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) { + let result: any; + try { + result = this.resultSelector(outerValue, innerValue, outerIndex, innerIndex); + } catch (err) { + this.destination.error(err); + return; } + this.destination.next(result); } notifyComplete(innerSub: Subscription): void {