@@ -3,6 +3,8 @@ import Operator from '../Operator';
3
3
import Subscriber from '../Subscriber' ;
4
4
import Observer from '../Observer' ;
5
5
import Subscription from '../Subscription' ;
6
+ import OuterSubscriber from '../OuterSubscriber' ;
7
+ import subscribeToResult from '../util/subscribeToResult' ;
6
8
7
9
export class MergeAllOperator < T , R > implements Operator < T , R > {
8
10
constructor ( private concurrent : number ) {
@@ -14,10 +16,11 @@ export class MergeAllOperator<T, R> implements Operator<T, R> {
14
16
}
15
17
}
16
18
17
- export class MergeAllSubscriber < T > extends Subscriber < T > {
19
+ export class MergeAllSubscriber < T , R > extends OuterSubscriber < T , R > {
18
20
private hasCompleted : boolean = false ;
19
21
private buffer : Observable < any > [ ] = [ ] ;
20
22
private active : number = 0 ;
23
+
21
24
constructor ( destination : Observer < T > , private concurrent :number ) {
22
25
super ( destination ) ;
23
26
}
@@ -28,7 +31,7 @@ export class MergeAllSubscriber<T> extends Subscriber<T> {
28
31
this . destination . next ( observable . value ) ;
29
32
} else {
30
33
this . active ++ ;
31
- this . add ( observable . subscribe ( new MergeAllInnerSubscriber ( this . destination , this ) ) )
34
+ this . add ( subscribeToResult < T , R > ( this , observable ) ) ;
32
35
}
33
36
} else {
34
37
this . buffer . push ( observable ) ;
@@ -52,14 +55,4 @@ export class MergeAllSubscriber<T> extends Subscriber<T> {
52
55
this . destination . complete ( ) ;
53
56
}
54
57
}
55
- }
56
-
57
- export class MergeAllInnerSubscriber < T > extends Subscriber < T > {
58
- constructor ( destination : Observer < T > , private parent : MergeAllSubscriber < T > ) {
59
- super ( destination ) ;
60
- }
61
-
62
- _complete ( ) {
63
- this . parent . notifyComplete ( this ) ;
64
- }
65
58
}
0 commit comments