11import { Operator } from '../Operator' ;
22import { Observable } from '../Observable' ;
3+ import { Scheduler } from '../Scheduler' ;
34import { Subscriber } from '../Subscriber' ;
45import { tryCatch } from '../util/tryCatch' ;
56import { errorObject } from '../util/errorObject' ;
@@ -8,12 +9,13 @@ import {InnerSubscriber} from '../InnerSubscriber';
89import { subscribeToResult } from '../util/subscribeToResult' ;
910
1011export class ExpandOperator < T , R > implements Operator < T , R > {
11- constructor ( private project : ( value : T , index : number ) => Observable < any > ,
12- private concurrent : number = Number . POSITIVE_INFINITY ) {
12+ constructor ( private project : ( value : T , index : number ) => Observable < R > ,
13+ private concurrent : number ,
14+ private scheduler : Scheduler ) {
1315 }
1416
1517 call ( subscriber : Subscriber < R > ) : Subscriber < T > {
16- return new ExpandSubscriber ( subscriber , this . project , this . concurrent ) ;
18+ return new ExpandSubscriber ( subscriber , this . project , this . concurrent , this . scheduler ) ;
1719 }
1820}
1921
@@ -25,13 +27,18 @@ export class ExpandSubscriber<T, R> extends OuterSubscriber<T, R> {
2527
2628 constructor ( destination : Subscriber < R > ,
2729 private project : ( value : T , index : number ) => Observable < R > ,
28- private concurrent : number = Number . POSITIVE_INFINITY ) {
30+ private concurrent : number ,
31+ private scheduler : Scheduler ) {
2932 super ( destination ) ;
3033 if ( concurrent < Number . POSITIVE_INFINITY ) {
3134 this . buffer = [ ] ;
3235 }
3336 }
3437
38+ private static dispatch ( { subscriber, result, value, index} ) : void {
39+ subscriber . subscribeToProjection ( result , value , index ) ;
40+ }
41+
3542 _next ( value : any ) : void {
3643 const destination = this . destination ;
3744
@@ -46,19 +53,26 @@ export class ExpandSubscriber<T, R> extends OuterSubscriber<T, R> {
4653 let result = tryCatch ( this . project ) ( value , index ) ;
4754 if ( result === errorObject ) {
4855 destination . error ( result . e ) ;
56+ } else if ( ! this . scheduler ) {
57+ this . subscribeToProjection ( result , value , index ) ;
4958 } else {
50- if ( result . _isScalar ) {
51- this . _next ( result . value ) ;
52- } else {
53- this . active ++ ;
54- this . add ( subscribeToResult < T , R > ( this , result , value , index ) ) ;
55- }
59+ const state = { subscriber : this , result, value, index } ;
60+ this . add ( this . scheduler . schedule ( ExpandSubscriber . dispatch , 0 , state ) ) ;
5661 }
5762 } else {
5863 this . buffer . push ( value ) ;
5964 }
6065 }
6166
67+ private subscribeToProjection ( result , value : T , index : number ) : void {
68+ if ( result . _isScalar ) {
69+ this . _next ( result . value ) ;
70+ } else {
71+ this . active ++ ;
72+ this . add ( subscribeToResult < T , R > ( this , result , value , index ) ) ;
73+ }
74+ }
75+
6276 _complete ( ) : void {
6377 this . hasCompleted = true ;
6478 if ( this . hasCompleted && this . active === 0 ) {
0 commit comments