Skip to content

Commit 79e9084

Browse files
luisgabrielbenlesh
authored andcommitted
fix(expand): accept scheduler parameter
Also moves the handling of the default value for optional parameters to the expand function instead of the operator's ctor. Closes #841.
1 parent ef83066 commit 79e9084

File tree

5 files changed

+57
-15
lines changed

5 files changed

+57
-15
lines changed

spec/operators/expand-spec.js

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,4 +323,28 @@ describe('Observable.prototype.expand()', function () {
323323
done();
324324
});
325325
});
326-
});
326+
327+
it('should work when passing undefined for the optional arguments', function () {
328+
var values = {
329+
a: 1,
330+
b: 1 + 1, // a + a,
331+
c: 2 + 2, // b + b,
332+
d: 4 + 4, // c + c,
333+
e: 8 + 8, // d + d
334+
};
335+
var e1 = hot('(a|)', values);
336+
var e1subs = '^ ! ';
337+
var e2shape = '---(z|) ';
338+
var expected = 'a--b--c--d--(e|)';
339+
340+
var result = e1.expand(function (x) {
341+
if (x === 16) {
342+
return Observable.empty();
343+
}
344+
return cold(e2shape, { z: x + x });
345+
}, undefined, undefined);
346+
347+
expectObservable(result).toBe(expected, values);
348+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
349+
});
350+
});

src/CoreOperators.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export interface CoreOperators<T> {
2626
delay?: (delay: number, scheduler?: Scheduler) => Observable<T>;
2727
distinctUntilChanged?: (compare?: (x: T, y: T) => boolean, thisArg?: any) => Observable<T>;
2828
do?: (next?: (x: T) => void, error?: (e: any) => void, complete?: () => void) => Observable<T>;
29-
expand?: <R>(project: (x: T, ix: number) => Observable<R>) => Observable<R>;
29+
expand?: <R>(project: (x: T, ix: number) => Observable<R>, concurrent: number, scheduler: Scheduler) => Observable<R>;
3030
filter?: (predicate: (x: T) => boolean, ix?: number, thisArg?: any) => Observable<T>;
3131
finally?: (ensure: () => void, thisArg?: any) => Observable<T>;
3232
first?: <R>(predicate?: (value: T, index: number, source: Observable<T>) => boolean,

src/Observable.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ export class Observable<T> implements CoreOperators<T> {
185185
delay: (delay: number, scheduler?: Scheduler) => Observable<T>;
186186
distinctUntilChanged: (compare?: (x: T, y: T) => boolean, thisArg?: any) => Observable<T>;
187187
do: (next?: (x: T) => void, error?: (e: any) => void, complete?: () => void) => Observable<T>;
188-
expand: <R>(project: (x: T, ix: number) => Observable<R>) => Observable<R>;
188+
expand: <R>(project: (x: T, ix: number) => Observable<R>, concurrent: number, scheduler: Scheduler) => Observable<R>;
189189
filter: (predicate: (x: T) => boolean, ix?: number, thisArg?: any) => Observable<T>;
190190
finally: (ensure: () => void, thisArg?: any) => Observable<T>;
191191
first: <R>(predicate?: (value: T, index: number, source: Observable<T>) => boolean,

src/operator/expand-support.ts

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import {Operator} from '../Operator';
22
import {Observable} from '../Observable';
3+
import {Scheduler} from '../Scheduler';
34
import {Subscriber} from '../Subscriber';
45
import {tryCatch} from '../util/tryCatch';
56
import {errorObject} from '../util/errorObject';
@@ -8,12 +9,13 @@ import {InnerSubscriber} from '../InnerSubscriber';
89
import {subscribeToResult} from '../util/subscribeToResult';
910

1011
export class ExpandOperator<T, R> implements Operator<T, R> {
11-
constructor(private project: (value: T, index: number) => Observable<any>,
12-
private concurrent: number = Number.POSITIVE_INFINITY) {
12+
constructor(private project: (value: T, index: number) => Observable<R>,
13+
private concurrent: number,
14+
private scheduler: Scheduler) {
1315
}
1416

1517
call(subscriber: Subscriber<R>): Subscriber<T> {
16-
return new ExpandSubscriber(subscriber, this.project, this.concurrent);
18+
return new ExpandSubscriber(subscriber, this.project, this.concurrent, this.scheduler);
1719
}
1820
}
1921

@@ -25,13 +27,18 @@ export class ExpandSubscriber<T, R> extends OuterSubscriber<T, R> {
2527

2628
constructor(destination: Subscriber<R>,
2729
private project: (value: T, index: number) => Observable<R>,
28-
private concurrent: number = Number.POSITIVE_INFINITY) {
30+
private concurrent: number,
31+
private scheduler: Scheduler) {
2932
super(destination);
3033
if (concurrent < Number.POSITIVE_INFINITY) {
3134
this.buffer = [];
3235
}
3336
}
3437

38+
private static dispatch({subscriber, result, value, index}): void {
39+
subscriber.subscribeToProjection(result, value, index);
40+
}
41+
3542
_next(value: any): void {
3643
const destination = this.destination;
3744

@@ -46,19 +53,26 @@ export class ExpandSubscriber<T, R> extends OuterSubscriber<T, R> {
4653
let result = tryCatch(this.project)(value, index);
4754
if (result === errorObject) {
4855
destination.error(result.e);
56+
} else if (!this.scheduler) {
57+
this.subscribeToProjection(result, value, index);
4958
} else {
50-
if (result._isScalar) {
51-
this._next(result.value);
52-
} else {
53-
this.active++;
54-
this.add(subscribeToResult<T, R>(this, result, value, index));
55-
}
59+
const state = { subscriber: this, result, value, index };
60+
this.add(this.scheduler.schedule(ExpandSubscriber.dispatch, 0, state));
5661
}
5762
} else {
5863
this.buffer.push(value);
5964
}
6065
}
6166

67+
private subscribeToProjection(result, value: T, index: number): void {
68+
if (result._isScalar) {
69+
this._next(result.value);
70+
} else {
71+
this.active++;
72+
this.add(subscribeToResult<T, R>(this, result, value, index));
73+
}
74+
}
75+
6276
_complete(): void {
6377
this.hasCompleted = true;
6478
if (this.hasCompleted && this.active === 0) {

src/operator/expand.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
import {Observable} from '../Observable';
2+
import {Scheduler} from '../Scheduler';
23
import {ExpandOperator} from './expand-support';
34

45
export function expand<T, R>(project: (value: T, index: number) => Observable<R>,
5-
concurrent: number = Number.POSITIVE_INFINITY): Observable<R> {
6-
return this.lift(new ExpandOperator(project, concurrent));
6+
concurrent: number = Number.POSITIVE_INFINITY,
7+
scheduler: Scheduler = undefined): Observable<R> {
8+
concurrent = (concurrent || 0) < 1 ? Number.POSITIVE_INFINITY : concurrent;
9+
10+
return this.lift(new ExpandOperator(project, concurrent, scheduler));
711
}

0 commit comments

Comments
 (0)