diff --git a/spec/Subject-spec.js b/spec/Subject-spec.js index 777a28f9876..dff48247e68 100644 --- a/spec/Subject-spec.js +++ b/spec/Subject-spec.js @@ -129,9 +129,6 @@ describe('Subject', function () { subscription1.unsubscribe(); subject.complete(); - subject.next(9); - subject.complete(); - subject.error(new Error('err')); subscription2.unsubscribe(); @@ -179,9 +176,6 @@ describe('Subject', function () { subscription1.unsubscribe(); - subject.error(new Error('err')); - subject.next(9); - subject.complete(); subject.error(new Error('err')); subscription2.unsubscribe(); @@ -221,9 +215,6 @@ describe('Subject', function () { subscription1.unsubscribe(); subject.complete(); - subject.next(9); - subject.complete(); - subject.error(new Error('err')); subscription2.unsubscribe(); @@ -509,6 +500,57 @@ describe('Subject', function () { source.subscribe(subject); }); + it('should throw ObjectUnsubscribedError when emit after unsubscribed', function () { + var subject = new Rx.Subject(); + subject.unsubscribe(); + + expect(function () { + subject.next('a'); + }).toThrow(new Rx.ObjectUnsubscribedError()); + + expect(function () { + subject.error('a'); + }).toThrow(new Rx.ObjectUnsubscribedError()); + + expect(function () { + subject.complete(); + }).toThrow(new Rx.ObjectUnsubscribedError()); + }); + + it('should throw ObjectUnsubscribedError when emit after completed', function () { + var subject = new Rx.Subject(); + subject.complete(); + + expect(function () { + subject.next('a'); + }).toThrow(new Rx.ObjectUnsubscribedError()); + + expect(function () { + subject.error('a'); + }).toThrow(new Rx.ObjectUnsubscribedError()); + + expect(function () { + subject.complete(); + }).toThrow(new Rx.ObjectUnsubscribedError()); + }); + + it('should throw ObjectUnsubscribedError when emit after error', function () { + var subject = new Rx.Subject(); + subject.error('e'); + + expect(function () { + subject.next('a'); + }).toThrow(new Rx.ObjectUnsubscribedError()); + + expect(function () { + subject.error('a'); + }).toThrow(new Rx.ObjectUnsubscribedError()); + + expect(function () { + subject.complete(); + }).toThrow(new Rx.ObjectUnsubscribedError()); + }); + describe('asObservable', function () { it('should hide subject', function () { var subject = new Rx.Subject(); diff --git a/spec/subjects/AsyncSubject-spec.js b/spec/subjects/AsyncSubject-spec.js index e676114fcab..3c8c65fb4e1 100644 --- a/spec/subjects/AsyncSubject-spec.js +++ b/spec/subjects/AsyncSubject-spec.js @@ -70,8 +70,6 @@ describe('AsyncSubject', function () { expect(observer.results).toEqual([]); subject.complete(); expect(observer.results).toEqual([2, 'done']); - subject.next(3); - expect(observer.results).toEqual([2, 'done']); }); it('should not emit values if unsubscribed before complete', function () { diff --git a/src/Subject.ts b/src/Subject.ts index 315dc6817fa..da8b502cb32 100644 --- a/src/Subject.ts +++ b/src/Subject.ts @@ -6,6 +6,9 @@ import {Subscription} from './Subscription'; import {SubjectSubscription} from './subject/SubjectSubscription'; import {rxSubscriber} from './symbol/rxSubscriber'; +import {throwError} from './util/throwError'; +import {ObjectUnsubscribedError} from './util/ObjectUnsubscribedError'; + export class Subject extends Observable implements Observer, Subscription { static create: Function = (destination: Observer, source: Observable): Subject => { @@ -53,10 +56,10 @@ export class Subject extends Observable implements Observer, Subscripti return subscriber.error(this.errorValue); } else if (this.hasCompleted) { return subscriber.complete(); - } else if (this.isUnsubscribed) { - throw new Error('Cannot subscribe to a disposed Subject.'); } + this.throwIfUnsubscribed(); + const subscription = new SubjectSubscription(this, subscriber); this.observers.push(subscriber); @@ -73,6 +76,8 @@ export class Subject extends Observable implements Observer, Subscripti } next(value: T): void { + this.throwIfUnsubscribed(); + if (this.isStopped) { return; } @@ -89,6 +94,8 @@ export class Subject extends Observable implements Observer, Subscripti } error(err?: any): void { + this.throwIfUnsubscribed(); + if (this.isStopped) { return; } @@ -105,6 +112,8 @@ export class Subject extends Observable implements Observer, Subscripti } complete(): void { + this.throwIfUnsubscribed(); + if (this.isStopped) { return; } @@ -200,6 +209,12 @@ export class Subject extends Observable implements Observer, Subscripti this.unsubscribe(); } + private throwIfUnsubscribed(): void { + if (this.isUnsubscribed) { + throwError(new ObjectUnsubscribedError()); + } + } + [rxSubscriber]() { return new Subscriber(this); }