From abbbdadebcacec7bf0ca57da465697719af3e028 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 1 Feb 2018 22:38:07 -0800 Subject: [PATCH 1/2] fix(onErrorResumeNext): no longer holds onto subscriptions too long related #2459 closes #3178 --- spec/observables/onErrorResumeNext-spec.ts | 57 +++++++++------- src/internal/observable/onErrorResumeNext.ts | 69 ++++++-------------- 2 files changed, 52 insertions(+), 74 deletions(-) diff --git a/spec/observables/onErrorResumeNext-spec.ts b/spec/observables/onErrorResumeNext-spec.ts index 54e354b0fe..3f9c8a9e5d 100644 --- a/spec/observables/onErrorResumeNext-spec.ts +++ b/spec/observables/onErrorResumeNext-spec.ts @@ -1,4 +1,4 @@ -import * as Rx from '../../src/Rx'; +import { onErrorResumeNext } from '../../src/create'; import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports declare const hot: typeof marbleTestingSignature.hot; @@ -6,32 +6,41 @@ declare const cold: typeof marbleTestingSignature.cold; declare const expectObservable: typeof marbleTestingSignature.expectObservable; declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions; -const Observable = Rx.Observable; - -describe('Observable.onErrorResumeNext', () => { +describe('onErrorResumeNext', () => { it('should continue with observables', () => { - const source = hot('--a--b--#'); - const next1 = cold( '--c--d--#'); - const next2 = cold( '--e--#'); - const next3 = cold( '--f--g--|'); - const subs = '^ !'; - const expected = '--a--b----c--d----e----f--g--|'; - - expectObservable(Observable.onErrorResumeNext(source, next1, next2, next3)).toBe(expected); - expectSubscriptions(source.subscriptions).toBe(subs); + const s1 = hot('--a--b--#'); + const s2 = cold( '--c--d--#'); + const s3 = cold( '--e--#'); + const s4 = cold( '--f--g--|'); + const subs1 = '^ !'; + const subs2 = ' ^ !'; + const subs3 = ' ^ !'; + const subs4 = ' ^ !'; + const expected = '--a--b----c--d----e----f--g--|'; + + expectObservable(onErrorResumeNext(s1, s2, s3, s4)).toBe(expected); + expectSubscriptions(s1.subscriptions).toBe(subs1); + expectSubscriptions(s2.subscriptions).toBe(subs2); + expectSubscriptions(s3.subscriptions).toBe(subs3); + expectSubscriptions(s4.subscriptions).toBe(subs4); }); it('should continue array of observables', () => { - const source = hot('--a--b--#'); - const next = [ source, - cold( '--c--d--#'), - cold( '--e--#'), - cold( '--f--g--|')]; - const subs = '^ !'; - const expected = '--a--b----c--d----e----f--g--|'; - - expectObservable(Observable.onErrorResumeNext(next)).toBe(expected); - expectSubscriptions(source.subscriptions).toBe(subs); + const s1 = hot('--a--b--#'); + const s2 = cold( '--c--d--#'); + const s3 = cold( '--e--#'); + const s4 = cold( '--f--g--|'); + const subs1 = '^ !'; + const subs2 = ' ^ !'; + const subs3 = ' ^ !'; + const subs4 = ' ^ !'; + const expected = '--a--b----c--d----e----f--g--|'; + + expectObservable(onErrorResumeNext([s1, s2, s3, s4])).toBe(expected); + expectSubscriptions(s1.subscriptions).toBe(subs1); + expectSubscriptions(s2.subscriptions).toBe(subs2); + expectSubscriptions(s3.subscriptions).toBe(subs3); + expectSubscriptions(s4.subscriptions).toBe(subs4); }); it('should complete single observable throws', () => { @@ -39,7 +48,7 @@ describe('Observable.onErrorResumeNext', () => { const subs = '(^!)'; const expected = '|'; - expectObservable(Observable.onErrorResumeNext(source)).toBe(expected); + expectObservable(onErrorResumeNext(source)).toBe(expected); expectSubscriptions(source.subscriptions).toBe(subs); }); }); diff --git a/src/internal/observable/onErrorResumeNext.ts b/src/internal/observable/onErrorResumeNext.ts index d1c89e65b9..edd468cd82 100644 --- a/src/internal/observable/onErrorResumeNext.ts +++ b/src/internal/observable/onErrorResumeNext.ts @@ -1,11 +1,7 @@ import { Observable, ObservableInput } from '../Observable'; import { from } from './from'; -import { Operator } from '../Operator'; -import { Subscriber } from '../Subscriber'; -import { isArray } from '..//util/isArray'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '..//util/subscribeToResult'; +import { isArray } from '../util/isArray'; +import { EMPTY } from './empty'; /* tslint:disable:max-line-length */ export function onErrorResumeNext(v: ObservableInput): Observable; @@ -73,62 +69,35 @@ export function onErrorResumeNext(array: ObservableInput[]): Observable< * @see {@link concat} * @see {@link catch} * - * @param {...ObservableInput} observables Observables passed either directly or as an array. + * @param {...ObservableInput} sources Observables passed either directly or as an array. * @return {Observable} An Observable that emits values from source Observable, but - if it errors - subscribes * to the next passed Observable and so on, until it completes or runs out of Observables. * @method onErrorResumeNext * @owner Observable */ -export function onErrorResumeNext(...nextSources: Array | +export function onErrorResumeNext(...sources: Array | Array> | ((...values: Array) => R)>): Observable { - let source: ObservableInput = null; - if (nextSources.length === 1 && isArray(nextSources[0])) { - nextSources = >>nextSources[0]; + if (sources.length === 0) { + return EMPTY; } - source = nextSources.shift(); - return from(source, null).lift(new OnErrorResumeNextOperator(nextSources)); -} - -class OnErrorResumeNextOperator implements Operator { - constructor(private nextSources: Array>) { - } - - call(subscriber: Subscriber, source: any): any { - return source.subscribe(new OnErrorResumeNextSubscriber(subscriber, this.nextSources)); - } -} - -class OnErrorResumeNextSubscriber extends OuterSubscriber { - constructor(protected destination: Subscriber, - private nextSources: Array>) { - super(destination); - } - - notifyError(error: any, innerSub: InnerSubscriber): void { - this.subscribeToNextSource(); - } + const [ first, ...remainder ] = sources; - notifyComplete(innerSub: InnerSubscriber): void { - this.subscribeToNextSource(); + if (sources.length === 1 && isArray(first)) { + return onErrorResumeNext(...first); } - protected _error(err: any): void { - this.subscribeToNextSource(); - } + return new Observable(subscriber => { + const subNext = () => subscriber.add( + onErrorResumeNext(...remainder).subscribe(subscriber) + ); - protected _complete(): void { - this.subscribeToNextSource(); - } - - private subscribeToNextSource(): void { - const next = this.nextSources.shift(); - if (next) { - this.add(subscribeToResult(this, next)); - } else { - this.destination.complete(); - } - } + return from(first).subscribe({ + next(value) { subscriber.next(value); }, + error: subNext, + complete: subNext, + }); + }); } From b781e278ae938f7eb3988769832941c0e4d6f087 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 1 Feb 2018 22:55:18 -0800 Subject: [PATCH 2/2] docs(onErrorResumeNext): update docs for onErrorResumeNext --- src/internal/observable/onErrorResumeNext.ts | 65 +++++++++----------- 1 file changed, 30 insertions(+), 35 deletions(-) diff --git a/src/internal/observable/onErrorResumeNext.ts b/src/internal/observable/onErrorResumeNext.ts index edd468cd82..7fddb26547 100644 --- a/src/internal/observable/onErrorResumeNext.ts +++ b/src/internal/observable/onErrorResumeNext.ts @@ -22,40 +22,37 @@ export function onErrorResumeNext(array: ObservableInput[]): Observable< * * * - * `onErrorResumeNext` is an operator that accepts a series of Observables, provided either directly as - * arguments or as an array. If no single Observable is provided, returned Observable will simply behave the same - * as the source. + * `onErrorResumeNext` Will subscribe to each observable source it is provided, in order. + * If the source it's subscribed to emits an error or completes, it will move to the next source + * without error. * - * `onErrorResumeNext` returns an Observable that starts by subscribing and re-emitting values from the source Observable. - * When its stream of values ends - no matter if Observable completed or emitted an error - `onErrorResumeNext` - * will subscribe to the first Observable that was passed as an argument to the method. It will start re-emitting - * its values as well and - again - when that stream ends, `onErrorResumeNext` will proceed to subscribing yet another - * Observable in provided series, no matter if previous Observable completed or ended with an error. This will - * be happening until there is no more Observables left in the series, at which point returned Observable will - * complete - even if the last subscribed stream ended with an error. + * If `onErrorResumeNext` is provided no arguments, or a single, empty array, it will return {@link EMPTY}. * - * `onErrorResumeNext` can be therefore thought of as version of {@link concat} operator, which is more permissive - * when it comes to the errors emitted by its input Observables. While `concat` subscribes to the next Observable - * in series only if previous one successfully completed, `onErrorResumeNext` subscribes even if it ended with - * an error. - * - * Note that you do not get any access to errors emitted by the Observables. In particular do not - * expect these errors to appear in error callback passed to {@link subscribe}. If you want to take - * specific actions based on what error was emitted by an Observable, you should try out {@link catch} instead. + * `onErrorResumeNext` is basically {@link concat}, only it will continue, even if one of its + * sources emits an error. * + * Note that there is no way to handle any errors thrown by sources via the resuult of + * `onErrorResumeNext`. If you want to handle errors thrown in any given source, you can + * always use the {@link catchError} operator on them before passing them into `onErrorResumeNext`. * * @example Subscribe to the next Observable after map fails - * Rx.Observable.of(1, 2, 3, 0) - * .map(x => { - * if (x === 0) { throw Error(); } - return 10 / x; - * }) - * .onErrorResumeNext(Rx.Observable.of(1, 2, 3)) - * .subscribe( - * val => console.log(val), - * err => console.log(err), // Will never be called. - * () => console.log('that\'s it!') - * ); + * import { onErrorResumeNext, of } from 'rxjs/create'; + * import { map } from 'rxjs/operators'; + * + * onErrorResumeNext( + * of(1, 2, 3, 0).pipe( + * map(x => { + * if (x === 0) throw Error(); + * return 10 / x; + * }) + * ), + * of(1, 2, 3), + * ) + * .subscribe( + * val => console.log(val), + * err => console.log(err), // Will never be called. + * () => console.log('done') + * ); * * // Logs: * // 10 @@ -64,16 +61,14 @@ export function onErrorResumeNext(array: ObservableInput[]): Observable< * // 1 * // 2 * // 3 - * // "that's it!" + * // "done" * * @see {@link concat} * @see {@link catch} * - * @param {...ObservableInput} sources Observables passed either directly or as an array. - * @return {Observable} An Observable that emits values from source Observable, but - if it errors - subscribes - * to the next passed Observable and so on, until it completes or runs out of Observables. - * @method onErrorResumeNext - * @owner Observable + * @param {...ObservableInput} sources Observables (or anything that *is* observable) passed either directly or as an array. + * @return {Observable} An Observable that concatenates all sources, one after the other, + * ignoring all errors, such that any error causes it to move on to the next source. */ export function onErrorResumeNext(...sources: Array | Array> |