Skip to content

Commit d4789cd

Browse files
Andre Medeirosbenlesh
authored andcommitted
fix(switchMapTo): reimplement switchMapTo to pass tests
Rewrite switchMapTo operator, which was previously behaving just like concatMapTo, because it reused mergeMapTo behavior, with concurrency parameter=1.
1 parent 56b42ad commit d4789cd

File tree

1 file changed

+62
-10
lines changed

1 file changed

+62
-10
lines changed

src/operators/switchMapTo.ts

Lines changed: 62 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
import Operator from '../Operator';
2+
import Observer from '../Observer';
23
import Observable from '../Observable';
34
import Subscriber from '../Subscriber';
45
import Subscription from '../Subscription';
5-
6-
import { MergeMapToSubscriber } from './mergeMapTo-support';
6+
import tryCatch from '../util/tryCatch';
7+
import { errorObject } from '../util/errorObject';
8+
import OuterSubscriber from '../OuterSubscriber';
9+
import subscribeToResult from '../util/subscribeToResult';
710

811
export default function switchMapTo<T, R, R2>(observable: Observable<R>,
912
projectResult?: (outerValue: T,
@@ -26,13 +29,62 @@ class SwitchMapToOperator<T, R, R2> implements Operator<T, R> {
2629
}
2730
}
2831

29-
class SwitchMapToSubscriber<T, R, R2> extends MergeMapToSubscriber<T, R, R2> {
30-
constructor(destination: Subscriber<R>,
31-
observable: Observable<R>,
32-
resultSelector?: (outerValue: T,
33-
innerValue: R,
34-
outerIndex: number,
35-
innerIndex: number) => R2) {
36-
super(destination, observable, resultSelector, 1);
32+
class SwitchMapToSubscriber<T, R, R2> extends OuterSubscriber<T, R> {
33+
private innerSubscription: Subscription<T>;
34+
private hasCompleted = false;
35+
index: number = 0;
36+
37+
constructor(destination: Observer<T>,
38+
private inner: Observable<R>,
39+
private resultSelector?: (outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) => R2) {
40+
super(destination);
41+
}
42+
43+
_next(value: any) {
44+
const index = this.index++;
45+
const innerSubscription = this.innerSubscription;
46+
if (innerSubscription) {
47+
innerSubscription.unsubscribe();
48+
}
49+
this.add(this.innerSubscription = subscribeToResult(this, this.inner, value, index));
50+
}
51+
52+
_complete() {
53+
const innerSubscription = this.innerSubscription;
54+
this.hasCompleted = true;
55+
if (!innerSubscription || innerSubscription.isUnsubscribed) {
56+
this.destination.complete();
57+
}
58+
}
59+
60+
notifyComplete(innerSub: Subscription<R>) {
61+
this.remove(innerSub);
62+
const prevSubscription = this.innerSubscription;
63+
if (prevSubscription) {
64+
prevSubscription.unsubscribe();
65+
}
66+
this.innerSubscription = null;
67+
68+
if (this.hasCompleted) {
69+
this.destination.complete();
70+
}
71+
}
72+
73+
notifyError(err: any) {
74+
this.destination.error(err);
75+
}
76+
77+
notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) {
78+
const { resultSelector, destination } = this;
79+
if (resultSelector) {
80+
const result = tryCatch(resultSelector)(outerValue, innerValue, outerIndex, innerIndex);
81+
if (result === errorObject) {
82+
destination.error(errorObject.e);
83+
} else {
84+
destination.next(result);
85+
}
86+
} else {
87+
destination.next(innerValue);
88+
}
3789
}
3890
}

0 commit comments

Comments
 (0)