Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 166 additions & 2 deletions spec/Subject-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import { Subject, ObjectUnsubscribedError, Observable, AsyncSubject, Observer, of, config } from 'rxjs';
import { Subject, ObjectUnsubscribedError, Observable, AsyncSubject, Observer, of, config, throwError, concat } from 'rxjs';
import { AnonymousSubject } from 'rxjs/internal/Subject';
import { delay } from 'rxjs/operators';
import { catchError, delay, map, mergeMap } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from './helpers/observableMatcher';

Expand Down Expand Up @@ -759,3 +759,167 @@ describe('AnonymousSubject', () => {
expect(subscribed).to.be.true;
});
});

describe('useDeprecatedSynchronousErrorHandling', () => {
beforeEach(() => {
config.useDeprecatedSynchronousErrorHandling = true;
});

afterEach(() => {
config.useDeprecatedSynchronousErrorHandling = false;
});

it('should throw an error when nexting with a flattened, erroring inner observable', () => {
const subject = new Subject<string>();
subject.pipe(mergeMap(() => throwError(() => new Error('bad')))).subscribe();

expect(() => {
subject.next('wee');
}).to.throw(Error, 'bad');
});

it('should throw an error when nexting with a flattened, erroring inner observable with more than one operator', () => {
const subject = new Subject<string>();
subject.pipe(mergeMap(() => throwError(() => new Error('bad'))), map(x => x)).subscribe();

expect(() => {
subject.next('wee');
}).to.throw(Error, 'bad');
});

it('should throw an error when notifying an error with catchError returning an erroring inner observable', () => {
const subject = new Subject<string>();
subject.pipe(catchError(() => throwError(() => new Error('bad')))).subscribe();

expect(() => {
subject.error('wee');
}).to.throw(Error, 'bad');
});

it('should throw an error when nexting with an operator that errors synchronously', () => {
const subject = new Subject<string>();
subject.pipe(mergeMap(() => {
throw new Error('lol');
})).subscribe();

expect(() => {
subject.next('wee');
}).to.throw(Error, 'lol');
});


it('should throw an error when notifying an error with a catchError that errors synchronously', () => {
const subject = new Subject<string>();
subject.pipe(catchError(() => {
throw new Error('lol');
})).subscribe();

expect(() => {
subject.error('wee');
}).to.throw(Error, 'lol');
});

it('should throw an error when nexting with an erroring next handler', () => {
const subject = new Subject<string>();
subject.subscribe(() => {
throw new Error('lol');
});

expect(() => {
subject.next('wee');
}).to.throw(Error, 'lol');
});

it('should throw an error when notifying with an erroring error handler', () => {
const subject = new Subject<string>();
subject.subscribe({
error: () => {
throw new Error('lol');
}
});

expect(() => {
subject.error('wee');
}).to.throw(Error, 'lol');
});

it('should throw an error when notifying with an erroring complete handler', () => {
const subject = new Subject<string>();
subject.subscribe({
complete: () => {
throw new Error('lol');
}
});

expect(() => {
subject.complete();
}).to.throw(Error, 'lol');
});

// TODO: This is still an issue. Not sure how to handle this one yet.
it.skip('should throw an error when notifying an complete, and concatenated with another observable that synchronously errors', () => {
const subject = new Subject<string>();
concat(subject, throwError(new Error('lol'))).subscribe();

expect(() => {
subject.complete();
}).to.throw(Error, 'lol');
});

it('should not throw on second error passed', () => {
const subject = new Subject();

subject.subscribe();

expect(() => {
subject.error(new Error('one'));
}).to.throw(Error, 'one');

expect(() => {
subject.error(new Error('two'));
}).not.to.throw(Error, 'two');
});

it('should not throw on second error passed, even after having been operated on', () => {
const subject = new Subject();

subject.pipe(mergeMap(x => [x])).subscribe();

expect(() => {
subject.error(new Error('one'));
}).to.throw(Error, 'one');

expect(() => {
subject.error('two');
}).not.to.throw();
});

it('deep rethrowing 1', () => {
const subject1 = new Subject();
const subject2 = new Subject();

subject2.subscribe();

subject1.subscribe({
next: () => subject2.error(new Error('hahaha'))
});

expect(() => {
subject1.next('test');
}).to.throw(Error, 'hahaha');
});

it('deep rethrowing 2', () => {
const subject1 = new Subject();

subject1.subscribe({
next: () => {
throwError(new Error('hahaha')).subscribe();
}
});

expect(() => {
subject1.next('test');
}).to.throw(Error, 'hahaha');
});
});
51 changes: 4 additions & 47 deletions src/internal/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { observable as Symbol_observable } from './symbol/observable';
import { pipeFromArray } from './util/pipe';
import { config } from './config';
import { isFunction } from './util/isFunction';
import { errorContext } from './util/errorContext';

/**
* A representation of any set of values over any amount of time. This is the most basic building block
Expand Down Expand Up @@ -216,9 +217,7 @@ export class Observable<T> implements Subscribable<T> {
): Subscription {
const subscriber = isSubscriber(observerOrNext) ? observerOrNext : new SafeSubscriber(observerOrNext, error, complete);

if (config.useDeprecatedSynchronousErrorHandling) {
this._deprecatedSyncErrorSubscribe(subscriber);
} else {
errorContext(() => {
const { operator, source } = this;
subscriber.add(
operator
Expand All @@ -234,51 +233,9 @@ export class Observable<T> implements Subscribable<T> {
// function, so we need to catch errors and handle them appropriately.
this._trySubscribe(subscriber)
);
}
return subscriber;
}

/**
* REMOVE THIS ENTIRE METHOD IN VERSION 8.
*/
private _deprecatedSyncErrorSubscribe(subscriber: Subscriber<unknown>) {
const localSubscriber: any = subscriber;
localSubscriber._syncErrorHack_isSubscribing = true;
const { operator } = this;
if (operator) {
// We don't need to try/catch on operators, as they
// are doing their own try/catching, and will
// properly decorate the subscriber with `__syncError`.
subscriber.add(operator.call(subscriber, this.source));
} else {
try {
subscriber.add(this._subscribe(subscriber));
} catch (err) {
localSubscriber.__syncError = err;
}
}
});

// In the case of the deprecated sync error handling,
// we need to crawl forward through our subscriber chain and
// look to see if there's any synchronously thrown errors.
// Does this suck for perf? Yes. So stop using the deprecated sync
// error handling already. We're removing this in v8.
let dest = localSubscriber;
while (dest) {
// Technically, someone could throw something falsy, like 0, or "",
// so we need to check to see if anything was thrown, and we know
// that by the mere existence of `__syncError`.
if ('__syncError' in dest) {
try {
throw dest.__syncError;
} finally {
subscriber.unsubscribe();
}
}
dest = dest.destination;
}

localSubscriber._syncErrorHack_isSubscribing = false;
return subscriber;
}

/** @internal */
Expand Down
49 changes: 28 additions & 21 deletions src/internal/Subject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Subscription, EMPTY_SUBSCRIPTION } from './Subscription';
import { Observer, SubscriptionLike, TeardownLogic } from './types';
import { ObjectUnsubscribedError } from './util/ObjectUnsubscribedError';
import { arrRemove } from './util/arrRemove';
import { errorContext } from './util/errorContext';

/**
* A Subject is a special type of Observable that allows values to be
Expand Down Expand Up @@ -54,36 +55,42 @@ export class Subject<T> extends Observable<T> implements SubscriptionLike {
}

next(value: T) {
this._throwIfClosed();
if (!this.isStopped) {
const copy = this.observers.slice();
for (const observer of copy) {
observer.next(value);
errorContext(() => {
this._throwIfClosed();
if (!this.isStopped) {
const copy = this.observers.slice();
for (const observer of copy) {
observer.next(value);
}
}
}
});
}

error(err: any) {
this._throwIfClosed();
if (!this.isStopped) {
this.hasError = this.isStopped = true;
this.thrownError = err;
const { observers } = this;
while (observers.length) {
observers.shift()!.error(err);
errorContext(() => {
this._throwIfClosed();
if (!this.isStopped) {
this.hasError = this.isStopped = true;
this.thrownError = err;
const { observers } = this;
while (observers.length) {
observers.shift()!.error(err);
}
}
}
});
}

complete() {
this._throwIfClosed();
if (!this.isStopped) {
this.isStopped = true;
const { observers } = this;
while (observers.length) {
observers.shift()!.complete();
errorContext(() => {
this._throwIfClosed();
if (!this.isStopped) {
this.isStopped = true;
const { observers } = this;
while (observers.length) {
observers.shift()!.complete();
}
}
}
});
}

unsubscribe() {
Expand Down
12 changes: 2 additions & 10 deletions src/internal/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { reportUnhandledError } from './util/reportUnhandledError';
import { noop } from './util/noop';
import { nextNotification, errorNotification, COMPLETE_NOTIFICATION } from './NotificationFactories';
import { timeoutProvider } from './scheduler/timeoutProvider';
import { captureError } from './util/errorContext';

/**
* Implements the {@link Observer} interface and extends the
Expand Down Expand Up @@ -193,16 +194,7 @@ function wrapForErrorHandling(handler: (arg?: any) => void, instance: SafeSubscr
handler(...args);
} catch (err) {
if (config.useDeprecatedSynchronousErrorHandling) {
// If the user has opted for "super-gross" mode, we need to check to see
// if we're currently subscribing. If we are, we need to mark the _syncError
// So that it can be rethrown in the `subscribe` call on `Observable`.
if ((instance as any)._syncErrorHack_isSubscribing) {
(instance as any).__syncError = err;
} else {
// We're not currently subscribing, but we're in super-gross mode,
// so throw it immediately.
throw err;
}
captureError(err);
} else {
// Ideal path, we report this as an unhandled error,
// which is thrown on a new call stack.
Expand Down
42 changes: 42 additions & 0 deletions src/internal/util/errorContext.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { config } from '../config';

let context: { errorThrown: boolean; error: any } | null = null;

/**
* Handles dealing with errors for super-gross mode. Creates a context, in which
* any synchronously thrown errors will be passed to {@link captureError}. Which
* will record the error such that it will be rethrown after the call back is complete.
* TODO: Remove in v8
* @param cb An immediately executed function.
*/
export function errorContext(cb: () => void) {
if (config.useDeprecatedSynchronousErrorHandling) {
const isRoot = !context;
if (isRoot) {
context = { errorThrown: false, error: null };
}
cb();
if (isRoot) {
const { errorThrown, error } = context!;
context = null;
if (errorThrown) {
throw error;
}
}
} else {
// This is the general non-deprecated path for everyone that
// isn't crazy enough to use super-gross mode (useDeprecatedSynchronousErrorHandling)
cb();
}
}

/**
* Captures errors only in super-gross mode.
* @param err the error to capture
*/
export function captureError(err: any) {
if (config.useDeprecatedSynchronousErrorHandling && context) {
context.errorThrown = true;
context.error = err;
}
}