Skip to content

Commit

Permalink
refactor(mergeMap/switchMap): add handling for different inner return…
Browse files Browse the repository at this point in the history
… types

- support promise
- support array
- support iterable
- support observable (lowercase o)
- support Observable
  • Loading branch information
benlesh committed Sep 23, 2015
1 parent fef727c commit d249895
Show file tree
Hide file tree
Showing 8 changed files with 242 additions and 128 deletions.
27 changes: 19 additions & 8 deletions spec/operators/merge-map-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,39 @@ var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.mergeMap()', function () {
it('should map and flatten', function (done) {
it('should map and flatten', function (){
var source = Observable.of(1, 2, 3, 4).mergeMap(function (x) {
return Observable.of(x + '!');
});

var expected = ['1!', '2!', '3!', '4!'];
var i = 0;
var completed = false;

var sub = source.subscribe(function (x) {
expect(x).toBe(expected[i++]);
}, null, done);
expect(x).toBe(expected.shift());
}, null, function(){
expect(expected.length).toBe(0);
completed = true;
});

expect(completed).toBe(true);
});
it('should map and flatten an Array', function (done) {

it('should map and flatten an Array', function () {
var source = Observable.of(1, 2, 3, 4).mergeMap(function (x) {
return [x + '!'];
});

var expected = ['1!', '2!', '3!', '4!'];
var i = 0;
var completed = false;

var sub = source.subscribe(function (x) {
expect(x).toBe(expected[i++]);
}, null, done);
expect(x).toBe(expected.shift());
}, null, function(){
expect(expected.length).toBe(0);
completed = true;
});

expect(completed).toBe(true);
});
});
2 changes: 1 addition & 1 deletion spec/operators/switchMap-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -179,5 +179,5 @@ describe('Observable.prototype.switchMap()', function () {
}, function(innerValue, outerValue, innerIndex, outerIndex) {
return [innerValue, outerValue, innerIndex, outerIndex];
})).toBe(expected, expectedValues);
})
});
});
26 changes: 26 additions & 0 deletions src/InnerSubscriber.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import Subscriber from './Subscriber';
import Observer from './Observer';
import OuterSubscriber from './OuterSubscriber';
import tryCatch from './util/tryCatch';
import { errorObject } from './util/errorObject';

export default class InnerSubscriber<T, R> extends Subscriber<R> {
index: number = 0;

constructor(private parent: OuterSubscriber<T, R>, private outerValue: T, private outerIndex: number) {
super();
}

_next(value: R) {
const index = this.index++;
this.parent.notifyNext(value, this.outerValue, index, this.outerIndex);
}

_error(error: any) {
this.parent.notifyError(error, this);
}

_complete() {
this.parent.notifyComplete(this);
}
}
17 changes: 17 additions & 0 deletions src/OuterSubscriber.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import InnerSubscriber from './InnerSubscriber';
import Subscriber from './Subscriber';


export default class OuterSubscriber<T, R> extends Subscriber<T> {
notifyComplete(inner?: InnerSubscriber<T, R>) {
this.destination.complete();
}

notifyNext(innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) {
this.destination.next(innerValue);
}

notifyError(error?: any, inner?: InnerSubscriber<T, R>) {
this.destination.error(error);
}
}
99 changes: 25 additions & 74 deletions src/operators/mergeMap-support.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@ import Subscription from '../Subscription';
import Observer from '../Observer';
import tryCatch from '../util/tryCatch';
import { errorObject } from '../util/errorObject';

export default function mergeMap<T, R, R2>(project: (value: T, index: number) => Observable<R>,
resultSelector?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R,
concurrent: number = Number.POSITIVE_INFINITY) {
return this.lift(new MergeMapOperator(project, resultSelector, concurrent));
}
import subscribeToResult from '../util/subscribeToResult';
import OuterSubscriber from '../OuterSubscriber';
import InnerSubscriber from '../InnerSubscriber';

export class MergeMapOperator<T, R, R2> implements Operator<T, R> {
constructor(private project: (value: T, index: number) => Observable<R>,
Expand All @@ -23,7 +20,7 @@ export class MergeMapOperator<T, R, R2> implements Operator<T, R> {
}
}

export class MergeMapSubscriber<T, R, R2> extends Subscriber<T> {
export class MergeMapSubscriber<T, R, R2> extends OuterSubscriber<T, R> {
private hasCompleted: boolean = false;
private buffer: Observable<any>[] = [];
private active: number = 0;
Expand All @@ -40,50 +37,21 @@ export class MergeMapSubscriber<T, R, R2> extends Subscriber<T> {
if(this.active < this.concurrent) {
const resultSelector = this.resultSelector;
const index = this.index++;
const observable = tryCatch(this.project)(value, index);
if(observable === errorObject) {
this.destination.error(observable.e);
const ish = tryCatch(this.project)(value, index);
const destination = this.destination;
if(ish === errorObject) {
destination.error(ish.e);
} else {
this._innerSubscribe(observable, value, index);
this.active++;
this._innerSub(ish, value, index);
}
} else {
this.buffer.push(value);
}
}

_innerSubscribe(observable, value, index) {
const resultSelector = this.resultSelector;
if(observable._isScalar) {
if(resultSelector) {
let result = tryCatch(resultSelector)(observable.value, value, 0, index);
if(result === errorObject) {
this.destination.error(result.e);
} else {
this.destination.next(result);
}
} else {
this.destination.next(observable.value);
}
} else if (Array.isArray(observable)) {
let arrIndex = -1;
const arrLen = observable.length;
while (++arrIndex < arrLen) {
const arrValue = observable[arrIndex];
if(resultSelector) {
let result = tryCatch(resultSelector)(arrValue, value, arrIndex, index);
if(result === errorObject) {
this.destination.error(result.e);
} else {
this.destination.next(result);
}
} else {
this.destination.next(arrValue);
}
}
} else {
this.active++;
this.add(observable.subscribe(new MergeMapInnerSubscriber(this.destination, this, value, index, resultSelector)));
}
_innerSub(ish: any, value: T, index: number) {
this.add(subscribeToResult<T,R,R2>(this, ish, value, index));
}

_complete() {
Expand All @@ -93,6 +61,19 @@ export class MergeMapSubscriber<T, R, R2> extends Subscriber<T> {
}
}

notifyNext(innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) {
const { destination, resultSelector } = this;
if(resultSelector) {
const result = tryCatch(resultSelector)(innerValue, outerValue, innerIndex, outerIndex);
if(result === errorObject) {
destination.error(errorObject.e);
} else {
destination.next(result);
}
}
destination.next(innerValue);
}

notifyComplete(innerSub: Subscription<T>) {
const buffer = this.buffer;
this.remove(innerSub);
Expand All @@ -103,34 +84,4 @@ export class MergeMapSubscriber<T, R, R2> extends Subscriber<T> {
this.destination.complete();
}
}
}

export class MergeMapInnerSubscriber<T, R, R2> extends Subscriber<R> {
index: number = 0;

constructor(destination: Observer<R>, private parent: MergeMapSubscriber<any, R, R2>,
private outerValue: T,
private outerIndex: number,
private resultSelector?: (innerValue: T, outerValue: R, innerIndex: number, outerIndex: number) => R2) {
super(destination);
}

_next(value: R) {
const resultSelector = this.resultSelector;
const index = this.index++;
if(resultSelector) {
let result = tryCatch(resultSelector)(value, this.outerValue, index, this.outerIndex);
if(result === errorObject) {
this.destination.error(result.e);
} else {
this.destination.next(result);
}
} else {
this.destination.next(value);
}
}

_complete() {
this.parent.notifyComplete(this);
}
}
85 changes: 73 additions & 12 deletions src/operators/mergeMapTo-support.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,91 @@ import Operator from '../Operator';
import Observer from '../Observer';
import Observable from '../Observable';
import Subscriber from '../Subscriber';
import { MergeMapSubscriber } from './mergeMap-support';
import tryCatch from '../util/tryCatch';
import { errorObject } from '../util/errorObject';
import OuterSubscriber from '../OuterSubscriber';
import subscribeToResult from '../util/subscribeToResult';
import InnerSubscriber from '../InnerSubscriber';

export class MergeMapToOperator<T, R, R2> implements Operator<T,R> {
constructor(private observable: Observable<R>,
constructor(private ish: any,
private resultSelector?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2,
private concurrent: number = Number.POSITIVE_INFINITY) {

}

call(observer: Subscriber<R>): Subscriber<T> {
return new MergeMapToSubscriber(observer, this.observable, this.resultSelector, this.concurrent);
return new MergeMapToSubscriber(observer, this.ish, this.resultSelector, this.concurrent);
}
}

export class MergeMapToSubscriber<T, R, R2> extends MergeMapSubscriber<T, R, R2> {
constructor(destination: Observer<T>, private observable: Observable<R>,
resultSelector?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2,
concurrent: number = Number.POSITIVE_INFINITY) {
super(destination, null, resultSelector, concurrent);
export class MergeMapToSubscriber<T, R, R2> extends OuterSubscriber<T, R> {
private hasCompleted: boolean = false;
private buffer: Observable<any>[] = [];
private active: number = 0;
protected index: number = 0;

constructor(destination: Observer<T>,
private ish: any,
private resultSelector?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2,
private concurrent: number = Number.POSITIVE_INFINITY) {
super(destination);
}

_next(value: any) {
if(this.active < this.concurrent) {
const resultSelector = this.resultSelector;
const index = this.index++;
const ish = this.ish;
const destination = this.destination;
if(ish === errorObject) {
destination.error(ish.e);
} else {
this.active--;
this._innerSub(ish, destination, resultSelector, value, index);
}
} else {
this.buffer.push(value);
}
}

_innerSub(ish: any, destination: Observer<R>, resultSelector: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2, value: T, index: number) {
this.add(subscribeToResult<T,R,R2>(this, ish, value, index));
}

_next(value: T) {
const observable = this.observable;
const index = this.index++;
super._innerSubscribe(observable, value, index);
_complete() {
this.hasCompleted = true;
if(this.active === 0 && this.buffer.length === 0) {
this.destination.complete();
}
}

notifyNext(innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) {
const { resultSelector, destination } = this;
if(resultSelector) {
const result = tryCatch(resultSelector)(innerValue, outerValue, innerIndex, outerIndex);
if(result === errorObject) {
destination.error(errorObject.e);
} else {
destination.next(result);
}
} else {
destination.next(innerValue);
}
}

notifyError(err: any) {
this.destination.error(err);
}

notifyComplete(innerSub: InnerSubscriber<T, R>) {
const buffer = this.buffer;
this.remove(innerSub);
this.active--;
if(buffer.length > 0) {
this._next(buffer.shift());
} else if (this.active === 0 && this.hasCompleted) {
this.destination.complete();
}
}
}
Loading

0 comments on commit d249895

Please sign in to comment.