Skip to content

Commit

Permalink
fix(subscribe): report errors that occur in subscribe after the initi…
Browse files Browse the repository at this point in the history
…al error (#4089)

* chore(reportError): implement reportError

* fix(subscribe): don't swallow internal errors

Closes #3803

* chore(reportError): remove console.log from test

* chore(reportError): test destination is Subscriber

* test(reportError): use closed observer

* chore(reportError): test subscriber symbol too

* chore(reportError): fix logic

* chore(reportError): rename to consoleWarn

* test(reportError): stub the console

* chore(reportError): fix whitespace

* refactor(reportError): use canReportError instead

* refactor(canReport): remove recursion

* test(subscribe): test internal error reporting

* test(bindCallback): test error reporting

* test(bindNodeCallback): test error reporting

* chore(canReport): fix JSDoc

* chore(test): fix test description

* test(canReport): use noop error handlers

* chore(canReport): use isTrustedSubscriber

* chore(canReport): restrict observer type

* chore(canReport): fix tests

* chore(canReport): use console.warn directly
  • Loading branch information
cartant authored and benlesh committed Sep 12, 2018
1 parent b72f0fe commit 9b4b2bc
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 5 deletions.
17 changes: 16 additions & 1 deletion spec/Observable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -901,5 +901,20 @@ describe('Observable.lift', () => {
]);
done();
});
});
});

it('should not swallow internal errors', () => {
const consoleStub = sinon.stub(console, 'warn');
try {
let source = new Observable<number>(observer => observer.next(42));
for (let i = 0; i < 10000; ++i) {
let base = source;
source = new Observable<number>(observer => base.subscribe(observer));
}
source.subscribe();
expect(consoleStub).to.have.property('called', true);
} finally {
consoleStub.restore();
}
});
});
14 changes: 14 additions & 0 deletions spec/observables/bindCallback-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,4 +280,18 @@ describe('bindCallback', () => {
expect(calls).to.equal(0);
});
});

it('should not swallow post-callback errors', () => {
function badFunction(callback: (answer: number) => void): void {
callback(42);
throw new Error('kaboom');
}
const consoleStub = sinon.stub(console, 'warn');
try {
bindCallback(badFunction)().subscribe();
expect(consoleStub).to.have.property('called', true);
} finally {
consoleStub.restore();
}
});
});
14 changes: 14 additions & 0 deletions spec/observables/bindNodeCallback-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,4 +278,18 @@ describe('bindNodeCallback', () => {
expect(results1).to.deep.equal([42, 'done']);
expect(results2).to.deep.equal([42, 'done']);
});

it('should not swallow post-callback errors', () => {
function badFunction(callback: (error: Error, answer: number) => void): void {
callback(null, 42);
throw new Error('kaboom');
}
const consoleStub = sinon.stub(console, 'warn');
try {
bindNodeCallback(badFunction)().subscribe();
expect(consoleStub).to.have.property('called', true);
} finally {
consoleStub.restore();
}
});
});
29 changes: 29 additions & 0 deletions spec/util/canReportError-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { expect } from 'chai';
import { noop, Subject, Subscriber } from 'rxjs';
import { canReportError } from 'rxjs/internal/util/canReportError';

describe('canReportError', () => {
it('should report errors to an observer if possible', () => {
const subscriber = new Subscriber<{}>(noop, noop);
expect(canReportError(subscriber)).to.be.true;
});

it('should not report errors to a stopped observer', () => {
const subscriber = new Subscriber<{}>(noop, noop);
subscriber.error(new Error('kaboom'));
expect(canReportError(subscriber)).to.be.false;
});

it('should not report errors to a closed subject', () => {
const subject = new Subject<{}>();
subject.unsubscribe();
expect(canReportError(subject)).to.be.false;
});

it('should not report errors an observer with a stopped destination', () => {
const destination = new Subscriber<{}>(noop, noop);
const subscriber = new Subscriber<{}>(destination);
destination.error(new Error('kaboom'));
expect(canReportError(subscriber)).to.be.false;
});
});
7 changes: 6 additions & 1 deletion src/internal/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Operator } from './Operator';
import { Subscriber } from './Subscriber';
import { Subscription } from './Subscription';
import { TeardownLogic, OperatorFunction, PartialObserver, Subscribable } from './types';
import { canReportError } from './util/canReportError';
import { toSubscriber } from './util/toSubscriber';
import { iif } from './observable/iif';
import { throwError } from './observable/throwError';
Expand Down Expand Up @@ -226,7 +227,11 @@ export class Observable<T> implements Subscribable<T> {
sink.syncErrorThrown = true;
sink.syncErrorValue = err;
}
sink.error(err);
if (canReportError(sink)) {
sink.error(err);
} else {
console.warn(err);
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/internal/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,6 @@ export class SafeSubscriber<T> extends Subscriber<T> {
}
}

function isTrustedSubscriber(obj: any) {
export function isTrustedSubscriber(obj: any) {
return obj instanceof Subscriber || ('_addParentTeardownLogic' in obj && obj[rxSubscriberSymbol]);
}
7 changes: 6 additions & 1 deletion src/internal/observable/bindCallback.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Observable } from '../Observable';
import { AsyncSubject } from '../AsyncSubject';
import { Subscriber } from '../Subscriber';
import { map } from '../operators/map';
import { canReportError } from '../util/canReportError';
import { isArray } from '../util/isArray';
import { isScheduler } from '../util/isScheduler';

Expand Down Expand Up @@ -204,7 +205,11 @@ export function bindCallback<T>(
try {
callbackFunc.apply(context, [...args, handler]);
} catch (err) {
subject.error(err);
if (canReportError(subject)) {
subject.error(err);
} else {
console.warn(err);
}
}
}
return subject.subscribe(subscriber);
Expand Down
7 changes: 6 additions & 1 deletion src/internal/observable/bindNodeCallback.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { AsyncSubject } from '../AsyncSubject';
import { Subscriber } from '../Subscriber';
import { SchedulerAction, SchedulerLike } from '../types';
import { map } from '../operators/map';
import { canReportError } from '../util/canReportError';
import { isScheduler } from '../util/isScheduler';
import { isArray } from '../util/isArray';

Expand Down Expand Up @@ -198,7 +199,11 @@ export function bindNodeCallback<T>(
try {
callbackFunc.apply(context, [...args, handler]);
} catch (err) {
subject.error(err);
if (canReportError(subject)) {
subject.error(err);
} else {
console.warn(err);
}
}
}
return subject.subscribe(subscriber);
Expand Down
22 changes: 22 additions & 0 deletions src/internal/util/canReportError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { isTrustedSubscriber, Subscriber } from '../Subscriber';
import { Subject } from '../Subject';

/**
* Determines whether the ErrorObserver is closed or stopped or has a
* destination that is closed or stopped - in which case errors will
* need to be reported via a different mechanism.
* @param observer the observer
*/
export function canReportError(observer: Subscriber<any> | Subject<any>): boolean {
while (observer) {
const { closed, destination, isStopped } = observer as any;
if (closed || isStopped) {
return false;
} else if (destination && isTrustedSubscriber(destination)) {
observer = destination;
} else {
observer = null;
}
}
return true;
}

0 comments on commit 9b4b2bc

Please sign in to comment.