Skip to content

Commit

Permalink
fix(onErrorResumeNext): no longer holds onto subscriptions too long
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Feb 2, 2018
1 parent d487d6b commit abbbdad
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 74 deletions.
57 changes: 33 additions & 24 deletions spec/observables/onErrorResumeNext-spec.ts
Original file line number Diff line number Diff line change
@@ -1,45 +1,54 @@
import * as Rx from '../../src/Rx';
import { onErrorResumeNext } from '../../src/create';
import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports

declare const hot: typeof marbleTestingSignature.hot;
declare const cold: typeof marbleTestingSignature.cold;
declare const expectObservable: typeof marbleTestingSignature.expectObservable;
declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions;

const Observable = Rx.Observable;

describe('Observable.onErrorResumeNext', () => {
describe('onErrorResumeNext', () => {
it('should continue with observables', () => {
const source = hot('--a--b--#');
const next1 = cold( '--c--d--#');
const next2 = cold( '--e--#');
const next3 = cold( '--f--g--|');
const subs = '^ !';
const expected = '--a--b----c--d----e----f--g--|';

expectObservable(Observable.onErrorResumeNext(source, next1, next2, next3)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
const s1 = hot('--a--b--#');
const s2 = cold( '--c--d--#');
const s3 = cold( '--e--#');
const s4 = cold( '--f--g--|');
const subs1 = '^ !';
const subs2 = ' ^ !';
const subs3 = ' ^ !';
const subs4 = ' ^ !';
const expected = '--a--b----c--d----e----f--g--|';

expectObservable(onErrorResumeNext(s1, s2, s3, s4)).toBe(expected);
expectSubscriptions(s1.subscriptions).toBe(subs1);
expectSubscriptions(s2.subscriptions).toBe(subs2);
expectSubscriptions(s3.subscriptions).toBe(subs3);
expectSubscriptions(s4.subscriptions).toBe(subs4);
});

it('should continue array of observables', () => {
const source = hot('--a--b--#');
const next = [ source,
cold( '--c--d--#'),
cold( '--e--#'),
cold( '--f--g--|')];
const subs = '^ !';
const expected = '--a--b----c--d----e----f--g--|';

expectObservable(Observable.onErrorResumeNext(next)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
const s1 = hot('--a--b--#');
const s2 = cold( '--c--d--#');
const s3 = cold( '--e--#');
const s4 = cold( '--f--g--|');
const subs1 = '^ !';
const subs2 = ' ^ !';
const subs3 = ' ^ !';
const subs4 = ' ^ !';
const expected = '--a--b----c--d----e----f--g--|';

expectObservable(onErrorResumeNext([s1, s2, s3, s4])).toBe(expected);
expectSubscriptions(s1.subscriptions).toBe(subs1);
expectSubscriptions(s2.subscriptions).toBe(subs2);
expectSubscriptions(s3.subscriptions).toBe(subs3);
expectSubscriptions(s4.subscriptions).toBe(subs4);
});

it('should complete single observable throws', () => {
const source = hot('#');
const subs = '(^!)';
const expected = '|';

expectObservable(Observable.onErrorResumeNext(source)).toBe(expected);
expectObservable(onErrorResumeNext(source)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});
69 changes: 19 additions & 50 deletions src/internal/observable/onErrorResumeNext.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
import { Observable, ObservableInput } from '../Observable';
import { from } from './from';
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { isArray } from '..//util/isArray';
import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '..//util/subscribeToResult';
import { isArray } from '../util/isArray';
import { EMPTY } from './empty';

/* tslint:disable:max-line-length */
export function onErrorResumeNext<R>(v: ObservableInput<R>): Observable<R>;
Expand Down Expand Up @@ -73,62 +69,35 @@ export function onErrorResumeNext<R>(array: ObservableInput<any>[]): Observable<
* @see {@link concat}
* @see {@link catch}
*
* @param {...ObservableInput} observables Observables passed either directly or as an array.
* @param {...ObservableInput} sources Observables passed either directly or as an array.
* @return {Observable} An Observable that emits values from source Observable, but - if it errors - subscribes
* to the next passed Observable and so on, until it completes or runs out of Observables.
* @method onErrorResumeNext
* @owner Observable
*/
export function onErrorResumeNext<T, R>(...nextSources: Array<ObservableInput<any> |
export function onErrorResumeNext<T, R>(...sources: Array<ObservableInput<any> |
Array<ObservableInput<any>> |
((...values: Array<any>) => R)>): Observable<R> {
let source: ObservableInput<any> = null;

if (nextSources.length === 1 && isArray(nextSources[0])) {
nextSources = <Array<ObservableInput<any>>>nextSources[0];
if (sources.length === 0) {
return EMPTY;
}
source = nextSources.shift();

return from(source, null).lift(new OnErrorResumeNextOperator<T, R>(nextSources));
}

class OnErrorResumeNextOperator<T, R> implements Operator<T, R> {
constructor(private nextSources: Array<ObservableInput<any>>) {
}

call(subscriber: Subscriber<R>, source: any): any {
return source.subscribe(new OnErrorResumeNextSubscriber(subscriber, this.nextSources));
}
}

class OnErrorResumeNextSubscriber<T, R> extends OuterSubscriber<T, R> {
constructor(protected destination: Subscriber<T>,
private nextSources: Array<ObservableInput<any>>) {
super(destination);
}

notifyError(error: any, innerSub: InnerSubscriber<T, any>): void {
this.subscribeToNextSource();
}
const [ first, ...remainder ] = sources;

notifyComplete(innerSub: InnerSubscriber<T, any>): void {
this.subscribeToNextSource();
if (sources.length === 1 && isArray(first)) {
return onErrorResumeNext(...first);
}

protected _error(err: any): void {
this.subscribeToNextSource();
}
return new Observable(subscriber => {
const subNext = () => subscriber.add(
onErrorResumeNext(...remainder).subscribe(subscriber)
);

protected _complete(): void {
this.subscribeToNextSource();
}

private subscribeToNextSource(): void {
const next = this.nextSources.shift();
if (next) {
this.add(subscribeToResult(this, next));
} else {
this.destination.complete();
}
}
return from(first).subscribe({
next(value) { subscriber.next(value); },
error: subNext,
complete: subNext,
});
});
}

0 comments on commit abbbdad

Please sign in to comment.