1
1
import Operator from '../Operator' ;
2
+ import Observer from '../Observer' ;
2
3
import Observable from '../Observable' ;
3
4
import Subscriber from '../Subscriber' ;
4
5
import Subscription from '../Subscription' ;
5
-
6
- import { MergeMapToSubscriber } from './mergeMapTo-support' ;
6
+ import tryCatch from '../util/tryCatch' ;
7
+ import { errorObject } from '../util/errorObject' ;
8
+ import OuterSubscriber from '../OuterSubscriber' ;
9
+ import subscribeToResult from '../util/subscribeToResult' ;
7
10
8
11
export default function switchMapTo < T , R , R2 > ( observable : Observable < R > ,
9
12
projectResult ?: ( outerValue : T ,
@@ -26,13 +29,62 @@ class SwitchMapToOperator<T, R, R2> implements Operator<T, R> {
26
29
}
27
30
}
28
31
29
- class SwitchMapToSubscriber < T , R , R2 > extends MergeMapToSubscriber < T , R , R2 > {
30
- constructor ( destination : Subscriber < R > ,
31
- observable : Observable < R > ,
32
- resultSelector ?: ( outerValue : T ,
33
- innerValue : R ,
34
- outerIndex : number ,
35
- innerIndex : number ) => R2 ) {
36
- super ( destination , observable , resultSelector , 1 ) ;
32
+ class SwitchMapToSubscriber < T , R , R2 > extends OuterSubscriber < T , R > {
33
+ private innerSubscription : Subscription < T > ;
34
+ private hasCompleted = false ;
35
+ index : number = 0 ;
36
+
37
+ constructor ( destination : Observer < T > ,
38
+ private inner : Observable < R > ,
39
+ private resultSelector ?: ( outerValue : T , innerValue : R , outerIndex : number , innerIndex : number ) => R2 ) {
40
+ super ( destination ) ;
41
+ }
42
+
43
+ _next ( value : any ) {
44
+ const index = this . index ++ ;
45
+ const innerSubscription = this . innerSubscription ;
46
+ if ( innerSubscription ) {
47
+ innerSubscription . unsubscribe ( ) ;
48
+ }
49
+ this . add ( this . innerSubscription = subscribeToResult ( this , this . inner , value , index ) ) ;
50
+ }
51
+
52
+ _complete ( ) {
53
+ const innerSubscription = this . innerSubscription ;
54
+ this . hasCompleted = true ;
55
+ if ( ! innerSubscription || innerSubscription . isUnsubscribed ) {
56
+ this . destination . complete ( ) ;
57
+ }
58
+ }
59
+
60
+ notifyComplete ( innerSub : Subscription < R > ) {
61
+ this . remove ( innerSub ) ;
62
+ const prevSubscription = this . innerSubscription ;
63
+ if ( prevSubscription ) {
64
+ prevSubscription . unsubscribe ( ) ;
65
+ }
66
+ this . innerSubscription = null ;
67
+
68
+ if ( this . hasCompleted ) {
69
+ this . destination . complete ( ) ;
70
+ }
71
+ }
72
+
73
+ notifyError ( err : any ) {
74
+ this . destination . error ( err ) ;
75
+ }
76
+
77
+ notifyNext ( outerValue : T , innerValue : R , outerIndex : number , innerIndex : number ) {
78
+ const { resultSelector, destination } = this ;
79
+ if ( resultSelector ) {
80
+ const result = tryCatch ( resultSelector ) ( outerValue , innerValue , outerIndex , innerIndex ) ;
81
+ if ( result === errorObject ) {
82
+ destination . error ( errorObject . e ) ;
83
+ } else {
84
+ destination . next ( result ) ;
85
+ }
86
+ } else {
87
+ destination . next ( innerValue ) ;
88
+ }
37
89
}
38
90
}
0 commit comments