From c5239e9dbda675009b6f00c63a1b213eee978263 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 22 Sep 2015 09:39:36 -0700 Subject: [PATCH] feat(expand): now handles promises, iterables and lowercase-o observables - also minor refactoring to subscribeToResult function --- spec/operators/expand-spec.js | 62 +++++++++++++++++++++++++++++ spec/operators/switch-spec.js | 2 +- src/operators/expand-support.ts | 24 ++++------- src/operators/mergeMap-support.ts | 2 +- src/operators/mergeMapTo-support.ts | 2 +- src/util/subscribeToResult.ts | 2 +- 6 files changed, 74 insertions(+), 20 deletions(-) diff --git a/spec/operators/expand-spec.js b/spec/operators/expand-spec.js index f18d5292e8..5b605c2bbd 100644 --- a/spec/operators/expand-spec.js +++ b/spec/operators/expand-spec.js @@ -1,6 +1,7 @@ /* globals describe, it, expect, expectObservable, hot, cold */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; +var Promise = require('promise'); describe('Observable.prototype.expand()', function () { it('should map and recursively flatten', function() { @@ -54,4 +55,65 @@ describe('Observable.prototype.expand()', function () { return Observable.of(x + x); // scalar })).toBe(expected, values); }); + + it('should recursively flatten promises', function(done) { + var expected = [1, 2, 4, 8, 16]; + Observable.of(1) + .expand(function(x) { + if(x === 16) { + return Observable.empty(); + } + return Promise.resolve(x + x); + }) + .subscribe(function(x) { + expect(x).toBe(expected.shift()); + }, null, function(){ + expect(expected.length).toBe(0); + done(); + }); + }); + + it('should recursively flatten Arrays', function(done) { + var expected = [1, 2, 4, 8, 16]; + Observable.of(1) + .expand(function(x) { + if(x === 16) { + return Observable.empty(); + } + return [x + x]; + }) + .subscribe(function(x) { + expect(x).toBe(expected.shift()); + }, null, function(){ + expect(expected.length).toBe(0); + done(); + }); + }); + + it('should recursively flatten lowercase-o observables', function(done) { + var expected = [1, 2, 4, 8, 16]; + + Observable.of(1) + .expand(function(x) { + if(x === 16) { + return Observable.empty(); + } + + var ish = { + subscribe: function(observer){ + observer.next(x + x); + observer.complete(); + } + }; + + ish[Symbol.observable] = function(){ return this; }; + return ish; + }) + .subscribe(function(x) { + expect(x).toBe(expected.shift()); + }, null, function(){ + expect(expected.length).toBe(0); + done(); + }); + }); }); \ No newline at end of file diff --git a/spec/operators/switch-spec.js b/spec/operators/switch-spec.js index 4e38163a3a..52599cd0f1 100644 --- a/spec/operators/switch-spec.js +++ b/spec/operators/switch-spec.js @@ -5,7 +5,7 @@ var Promise = require('promise'); var Observable = Rx.Observable; var immediateScheduler = Rx.Scheduler.immediate; -fdescribe('Observable.prototype.switch()', function(){ +describe('Observable.prototype.switch()', function(){ it("should switch to each immediately-scheduled inner Observable", function (done) { var a = Observable.of(1, 2, 3, immediateScheduler); var b = Observable.of(4, 5, 6, immediateScheduler); diff --git a/src/operators/expand-support.ts b/src/operators/expand-support.ts index c93614e20e..c1db02648e 100644 --- a/src/operators/expand-support.ts +++ b/src/operators/expand-support.ts @@ -9,6 +9,8 @@ import ScalarObservable from '../observables/ScalarObservable'; import tryCatch from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; +import OuterSubscriber from '../OuterSubscriber'; +import subscribeToResult from '../util/subscribeToResult'; export class ExpandOperator implements Operator { constructor(private project: (value: T, index: number) => Observable, @@ -20,11 +22,11 @@ export class ExpandOperator implements Operator { } } -export class ExpandSubscriber extends Subscriber { +export class ExpandSubscriber extends OuterSubscriber { private index: number = 0; private active: number = 0; private hasCompleted: boolean = false; - private buffer: T[]; + private buffer: any[]; constructor(destination: Observer, private project: (value: T, index: number) => Observable, private concurrent: number = Number.POSITIVE_INFINITY) { @@ -34,7 +36,7 @@ export class ExpandSubscriber extends Subscriber { } } - _next(value: T) { + _next(value: any) { const index = this.index++; this.destination.next(value); if(this.active < this.concurrent) { @@ -46,7 +48,7 @@ export class ExpandSubscriber extends Subscriber { this._next(result.value); } else { this.active++; - this.add(result.subscribe(new ExpandInnerSubscriber(this.destination, this))); + this.add(subscribeToResult(this, result, value, index)); } } } else { @@ -72,18 +74,8 @@ export class ExpandSubscriber extends Subscriber { this.destination.complete(); } } -} - -export class ExpandInnerSubscriber extends Subscriber { - constructor(destination: Observer, private parent: ExpandSubscriber) { - super(destination); - } - _next(value) { - this.parent._next(value); - } - - _complete() { - this.parent.notifyComplete(this); + notifyNext(innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) { + this._next(innerValue); } } diff --git a/src/operators/mergeMap-support.ts b/src/operators/mergeMap-support.ts index 785342c88a..f8eac64fa1 100644 --- a/src/operators/mergeMap-support.ts +++ b/src/operators/mergeMap-support.ts @@ -51,7 +51,7 @@ export class MergeMapSubscriber extends OuterSubscriber { } _innerSub(ish: any, value: T, index: number) { - this.add(subscribeToResult(this, ish, value, index)); + this.add(subscribeToResult(this, ish, value, index)); } _complete() { diff --git a/src/operators/mergeMapTo-support.ts b/src/operators/mergeMapTo-support.ts index d9846387c0..9a03c339c1 100644 --- a/src/operators/mergeMapTo-support.ts +++ b/src/operators/mergeMapTo-support.ts @@ -51,7 +51,7 @@ export class MergeMapToSubscriber extends OuterSubscriber { } _innerSub(ish: any, destination: Observer, resultSelector: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2, value: T, index: number) { - this.add(subscribeToResult(this, ish, value, index)); + this.add(subscribeToResult(this, ish, value, index)); } _complete() { diff --git a/src/util/subscribeToResult.ts b/src/util/subscribeToResult.ts index 9ed906101a..97a057cb40 100644 --- a/src/util/subscribeToResult.ts +++ b/src/util/subscribeToResult.ts @@ -11,7 +11,7 @@ import OuterSubscriber from '../OuterSubscriber'; const isArray = Array.isArray; -export default function subscribeToResult(outerSubscriber: OuterSubscriber, +export default function subscribeToResult(outerSubscriber: OuterSubscriber, result: any, outerValue?: T, outerIndex?: number): Subscription { let destination: Subscriber = new InnerSubscriber(outerSubscriber, outerValue, outerIndex);