diff --git a/spec/helpers/test-helper.ts b/spec/helpers/test-helper.ts index 3df33ff484..0647f9348a 100644 --- a/spec/helpers/test-helper.ts +++ b/spec/helpers/test-helper.ts @@ -3,7 +3,10 @@ declare const global: any; declare const Symbol: any; import * as Rx from '../../dist/cjs/Rx'; +import {ObservableInput} from '../../dist/cjs/Observable'; import {root} from '../../dist/cjs/util/root'; +import {$$iterator} from '../../dist/cjs/symbol/iterator'; +import $$symbolObservable from 'symbol-observable'; export function lowerCaseO(...args): Rx.Observable { const values = [].slice.apply(arguments); @@ -24,4 +27,19 @@ export function lowerCaseO(...args): Rx.Observable { return o; }; +export const createObservableInputs = (value: T) => Rx.Observable.of>( + Rx.Observable.of(value), + Rx.Observable.of(value, Rx.Scheduler.async), + [value], + Promise.resolve(value), + ({ [$$iterator]: () => { + return { + next: () => { + return value; + } + }; + }}), + ({ [$$symbolObservable]: () => Rx.Observable.of(value) }) +); + global.__root__ = root; diff --git a/spec/operators/catch-spec.ts b/spec/operators/catch-spec.ts index 1c2aeefc65..787a4e0fc1 100644 --- a/spec/operators/catch-spec.ts +++ b/spec/operators/catch-spec.ts @@ -1,5 +1,6 @@ import {expect} from 'chai'; import * as Rx from '../../dist/cjs/Rx'; +import {createObservableInputs} from '../helpers/test-helper'; declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions}; declare const rxTestSchdeuler: Rx.TestScheduler; @@ -227,4 +228,18 @@ describe('Observable.prototype.catch', () => { done(); }); }); + + it('should accept selector returns any ObservableInput', (done: MochaDone) => { + const input$ = createObservableInputs(42); + + input$.mergeMap(input => + Observable.throw('bad').catch(err => input) + ).subscribe(x => { + expect(x).to.be.equal(42); + }, (err: any) => { + done(new Error('should not be called')); + }, () => { + done(); + }); + }); }); diff --git a/src/operator/catch.ts b/src/operator/catch.ts index 13d6814545..395d052752 100644 --- a/src/operator/catch.ts +++ b/src/operator/catch.ts @@ -1,6 +1,9 @@ import {Operator} from '../Operator'; import {Subscriber} from '../Subscriber'; -import {Observable} from '../Observable'; +import {Observable, ObservableInput} from '../Observable'; + +import {OuterSubscriber} from '../OuterSubscriber'; +import {subscribeToResult} from '../util/subscribeToResult'; /** * Catches errors on the observable to be handled by returning a new observable or throwing an error. @@ -12,21 +15,21 @@ import {Observable} from '../Observable'; * @method catch * @owner Observable */ -export function _catch(selector: (err: any, caught: Observable) => Observable): Observable { +export function _catch(selector: (err: any, caught: Observable) => ObservableInput): Observable { const operator = new CatchOperator(selector); const caught = this.lift(operator); return (operator.caught = caught); } export interface CatchSignature { - (selector: (err: any, caught: Observable) => Observable): Observable; - (selector: (err: any, caught: Observable) => Observable): Observable; + (selector: (err: any, caught: Observable) => ObservableInput): Observable; + (selector: (err: any, caught: Observable) => ObservableInput): Observable; } class CatchOperator implements Operator { - caught: Observable; + caught: Observable; - constructor(private selector: (err: any, caught: Observable) => Observable) { + constructor(private selector: (err: any, caught: Observable) => ObservableInput) { } call(subscriber: Subscriber, source: any): any { @@ -39,11 +42,10 @@ class CatchOperator implements Operator { * @ignore * @extends {Ignored} */ -class CatchSubscriber extends Subscriber { - +class CatchSubscriber extends OuterSubscriber { constructor(destination: Subscriber, - private selector: (err: any, caught: Observable) => Observable, - private caught: Observable) { + private selector: (err: any, caught: Observable) => ObservableInput, + private caught: Observable) { super(destination); } @@ -60,13 +62,9 @@ class CatchSubscriber extends Subscriber { return; } - this._innerSub(result); + this.unsubscribe(); + (this.destination).remove(this); + subscribeToResult(this, result); } } - - private _innerSub(result: Observable) { - this.unsubscribe(); - (this.destination).remove(this); - result.subscribe(this.destination); - } }