Skip to content

Commit

Permalink
fix(switchMapTo): reimplement switchMapTo to pass tests
Browse files Browse the repository at this point in the history
Rewrite switchMapTo operator, which was previously behaving just like
concatMapTo, because it reused mergeMapTo behavior, with concurrency
parameter=1.
  • Loading branch information
Andre Medeiros authored and benlesh committed Oct 27, 2015
1 parent 56b42ad commit d4789cd
Showing 1 changed file with 62 additions and 10 deletions.
72 changes: 62 additions & 10 deletions src/operators/switchMapTo.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Observable from '../Observable';
import Subscriber from '../Subscriber';
import Subscription from '../Subscription';

import { MergeMapToSubscriber } from './mergeMapTo-support';
import tryCatch from '../util/tryCatch';
import { errorObject } from '../util/errorObject';
import OuterSubscriber from '../OuterSubscriber';
import subscribeToResult from '../util/subscribeToResult';

export default function switchMapTo<T, R, R2>(observable: Observable<R>,
projectResult?: (outerValue: T,
Expand All @@ -26,13 +29,62 @@ class SwitchMapToOperator<T, R, R2> implements Operator<T, R> {
}
}

class SwitchMapToSubscriber<T, R, R2> extends MergeMapToSubscriber<T, R, R2> {
constructor(destination: Subscriber<R>,
observable: Observable<R>,
resultSelector?: (outerValue: T,
innerValue: R,
outerIndex: number,
innerIndex: number) => R2) {
super(destination, observable, resultSelector, 1);
class SwitchMapToSubscriber<T, R, R2> extends OuterSubscriber<T, R> {
private innerSubscription: Subscription<T>;
private hasCompleted = false;
index: number = 0;

constructor(destination: Observer<T>,
private inner: Observable<R>,
private resultSelector?: (outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) => R2) {
super(destination);
}

_next(value: any) {
const index = this.index++;
const innerSubscription = this.innerSubscription;
if (innerSubscription) {
innerSubscription.unsubscribe();
}
this.add(this.innerSubscription = subscribeToResult(this, this.inner, value, index));
}

_complete() {
const innerSubscription = this.innerSubscription;
this.hasCompleted = true;
if (!innerSubscription || innerSubscription.isUnsubscribed) {
this.destination.complete();
}
}

notifyComplete(innerSub: Subscription<R>) {
this.remove(innerSub);
const prevSubscription = this.innerSubscription;
if (prevSubscription) {
prevSubscription.unsubscribe();
}
this.innerSubscription = null;

if (this.hasCompleted) {
this.destination.complete();
}
}

notifyError(err: any) {
this.destination.error(err);
}

notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) {
const { resultSelector, destination } = this;
if (resultSelector) {
const result = tryCatch(resultSelector)(outerValue, innerValue, outerIndex, innerIndex);
if (result === errorObject) {
destination.error(errorObject.e);
} else {
destination.next(result);
}
} else {
destination.next(innerValue);
}
}
}

0 comments on commit d4789cd

Please sign in to comment.