Skip to content

Commit

Permalink
fix: unhandled errors in observers correctly scheduled (#6118)
Browse files Browse the repository at this point in the history
* fix: unhandled errors in observers correctly scheduled

- Resolves an issue where errors thrown in a handler provided as a plain function, or as part of a POJO observer, to a `subscribe` call were not being handled appropriately. They will no longer synchronously throw unless `useDeprecatedSynchronousErrorHandling` is configured to `true`.
- Removes two tests that were poorly written, and were not passing (and should never have been passing). The ground they covered is covered adequately by other tests.

* chore: update comments in the code
  • Loading branch information
benlesh authored Mar 13, 2021
1 parent 3ab3f6f commit c02ceb7
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 84 deletions.
31 changes: 29 additions & 2 deletions spec/Subject-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { expect } from 'chai';
import { Subject, ObjectUnsubscribedError, Observable, AsyncSubject, Observer, of } from 'rxjs';
import { Subject, ObjectUnsubscribedError, Observable, AsyncSubject, Observer, of, config } from 'rxjs';
import { AnonymousSubject } from 'rxjs/internal/Subject';
import { delay } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
Expand Down Expand Up @@ -680,14 +680,41 @@ describe('Subject', () => {
expect(results).to.deep.equal([42, 'done']);
});
});

describe('error thrown scenario', () => {
afterEach(() => {
config.onUnhandledError = null;
});

it('should not synchronously error when nexted into', (done) => {
config.onUnhandledError = (err) => {
expect(err.message).to.equal('Boom!');
done();
};

const source = new Subject<number>();
source.subscribe();
source.subscribe(() => {
throw new Error('Boom!');
});
source.subscribe();
try {
source.next(42);
} catch (err) {
// This should not happen!
expect(true).to.be.false;
}
expect(true).to.be.true;
});
});
});

describe('AnonymousSubject', () => {
it('should be exposed', () => {
expect(AnonymousSubject).to.be.a('function');
});

it('should not eager', () => {
it('should not be eager', () => {
let subscribed = false;

const subject = Subject.create(
Expand Down
56 changes: 0 additions & 56 deletions spec/operators/throttle-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -381,62 +381,6 @@ describe('throttle', () => {
});
});

it('should throttle by promise resolves', () => {
testScheduler.run(() => {
const e1 = concat(of(1), timer(10).pipe(mapTo(2)), timer(10).pipe(mapTo(3)), timer(50).pipe(mapTo(4)));
const expected = [1, 2, 3, 4];

e1.pipe(
throttle(() => {
return new Promise((resolve: any) => {
resolve(42);
});
})
).subscribe(
(x: number) => {
expect(x).to.equal(expected.shift());
},
() => {
throw new Error('should not be called');
},
() => {
expect(expected.length).to.equal(0);
}
);
});
});

it('should raise error when promise rejects', () => {
const e1 = concat(of(1), timer(10).pipe(mapTo(2)), timer(10).pipe(mapTo(3)), timer(50).pipe(mapTo(4)));
const expected = [1, 2, 3];
const error = new Error('error');

e1.pipe(
throttle((x: number) => {
if (x === 3) {
return new Promise((resolve: any, reject: any) => {
reject(error);
});
} else {
return new Promise((resolve: any) => {
resolve(42);
});
}
})
).subscribe(
(x: number) => {
expect(x).to.equal(expected.shift());
},
(err: any) => {
expect(err).to.be.an('error', 'error');
expect(expected.length).to.equal(0);
},
() => {
throw new Error('should not be called');
}
);
});

describe('throttle(fn, { leading: true, trailing: true })', () => {
it('should immediately emit the first value in each time window', () => {
testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => {
Expand Down
8 changes: 7 additions & 1 deletion src/internal/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,12 @@ export class Observable<T> implements Subscribable<T> {
// otherwise, it may be from a user-made observable instance, and we want to
// wrap it in a try/catch so we can handle errors appropriately.
const { operator, source } = this;

let dest: any = subscriber;
if (config.useDeprecatedSynchronousErrorHandling) {
dest._syncErrorHack_isSubscribing = true;
}

subscriber.add(
operator
? operator.call(subscriber, source)
Expand All @@ -225,12 +231,12 @@ export class Observable<T> implements Subscribable<T> {
);

if (config.useDeprecatedSynchronousErrorHandling) {
dest._syncErrorHack_isSubscribing = false;
// In the case of the deprecated sync error handling,
// we need to crawl forward through our subscriber chain and
// look to see if there's any synchronously thrown errors.
// Does this suck for perf? Yes. So stop using the deprecated sync
// error handling already. We're removing this in v8.
let dest: any = subscriber;
while (dest) {
if (dest.__syncError) {
throw dest.__syncError;
Expand Down
51 changes: 26 additions & 25 deletions src/internal/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,38 +167,43 @@ export class SafeSubscriber<T> extends Subscriber<T> {
// Once we set the destination, the superclass `Subscriber` will
// do it's magic in the `_next`, `_error`, and `_complete` methods.
this.destination = {
next: next ? maybeWrapForDeprecatedSyncErrorHandling(next, this) : noop,
error: maybeWrapForDeprecatedSyncErrorHandling(error ? error : defaultErrorHandler, this),
complete: complete ? maybeWrapForDeprecatedSyncErrorHandling(complete, this) : noop,
next: next ? wrapForErrorHandling(next, this) : noop,
error: wrapForErrorHandling(error ?? defaultErrorHandler, this),
complete: complete ? wrapForErrorHandling(complete, this) : noop,
};
}
}

/**
* Checks to see if the user has chosen to use the super gross deprecated error handling that
* no one should ever use, ever. If they did choose that path, we need to catch their error
* so we can stick it on a super-secret property and check it after the subscription is done
* in the `Observable` subscribe call.
*
* We have to do this, because if we simply rethrow the error, it will be caught by any upstream
* try/catch blocks and send back down again, basically playing ping-pong with the error until the
* downstream runs out of chances to rethrow and it gives up.
*
* In the general case, for non-crazy people, this just returns the handler directly.
* Wraps a user-provided handler (or our {@link defaultErrorHandler} in one case) to
* ensure that any thrown errors are caught and handled appropriately.
*
* @param handler The handler to wrap
* @param instance The SafeSubscriber instance we're going to mark if there's an error.
*/
function maybeWrapForDeprecatedSyncErrorHandling(handler: (arg?: any) => void, instance: SafeSubscriber<any>) {
return config.useDeprecatedSynchronousErrorHandling
? (arg?: any) => {
try {
handler(arg);
} catch (err) {
function wrapForErrorHandling(handler: (arg?: any) => void, instance: SafeSubscriber<any>) {
return (...args: any[]) => {
try {
handler(...args);
} catch (err) {
if (config.useDeprecatedSynchronousErrorHandling) {
// If the user has opted for "super-gross" mode, we need to check to see
// if we're currently subscribing. If we are, we need to mark the _syncError
// So that it can be rethrown in the `subscribe` call on `Observable`.
if ((instance as any)._syncErrorHack_isSubscribing) {
(instance as any).__syncError = err;
} else {
// We're not currently subscribing, but we're in super-gross mode,
// so throw it immediately.
throw err;
}
} else {
// Ideal path, we report this as an unhandled error,
// which is thrown on a new call stack.
reportUnhandledError(err);
}
: handler;
}
};
}
/**
* An error handler used when no error handler was supplied
Expand All @@ -207,11 +212,7 @@ function maybeWrapForDeprecatedSyncErrorHandling(handler: (arg?: any) => void, i
* @param err The error to handle
*/
function defaultErrorHandler(err: any) {
// TODO: Remove in v8.
if (config.useDeprecatedSynchronousErrorHandling) {
throw err;
}
reportUnhandledError(err);
throw err;
}

/**
Expand Down

0 comments on commit c02ceb7

Please sign in to comment.