From fea08e965dd4ad82dfe1eab2865eb48908fc564c Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 24 Oct 2016 13:56:12 -0700 Subject: [PATCH] fix(ReplaySubject): observer now subscribed prior to running subscription function (#2046) Removed reliance on inherited `_subscribe` method from `Subject`. ReplaySubject has different behaviors during subscription for notifying new subscribers after a completion or error. After moving the `super._subscribe` call before the notification from saved `_events`, it was apparent that the inherited behavior from `Subject` was stopping the subscriber before it could be notified properly. fixes #2044 --- spec/subjects/ReplaySubject-spec.ts | 15 +++++++++++++++ src/ReplaySubject.ts | 23 +++++++++++++++++++++-- 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/spec/subjects/ReplaySubject-spec.ts b/spec/subjects/ReplaySubject-spec.ts index 0b5a8c7f11..a415f98994 100644 --- a/spec/subjects/ReplaySubject-spec.ts +++ b/spec/subjects/ReplaySubject-spec.ts @@ -16,6 +16,21 @@ describe('ReplaySubject', () => { done(); }); + it('should add the observer before running subscription code', () => { + const subject = new ReplaySubject(); + subject.next(1); + const results = []; + + subject.subscribe((value) => { + results.push(value); + if (value < 3) { + subject.next(value + 1); + } + }); + + expect(results).to.deep.equal([1, 2, 3]); + }); + it('should replay values upon subscription', (done: MochaDone) => { const subject = new ReplaySubject(); const expects = [1, 2, 3]; diff --git a/src/ReplaySubject.ts b/src/ReplaySubject.ts index 72bd7691d5..0ab64ee603 100644 --- a/src/ReplaySubject.ts +++ b/src/ReplaySubject.ts @@ -4,7 +4,8 @@ import { queue } from './scheduler/queue'; import { Subscriber } from './Subscriber'; import { Subscription } from './Subscription'; import { ObserveOnSubscriber } from './operator/observeOn'; - +import { ObjectUnsubscribedError } from './util/ObjectUnsubscribedError'; +import { SubjectSubscription } from './SubjectSubscription'; /** * @class ReplaySubject */ @@ -31,6 +32,18 @@ export class ReplaySubject extends Subject { protected _subscribe(subscriber: Subscriber): Subscription { const _events = this._trimBufferThenGetEvents(); const scheduler = this.scheduler; + let subscription: Subscription; + + if (this.closed) { + throw new ObjectUnsubscribedError(); + } else if (this.hasError) { + subscription = Subscription.EMPTY; + } else if (this.isStopped) { + subscription = Subscription.EMPTY; + } else { + this.observers.push(subscriber); + subscription = new SubjectSubscription(this, subscriber); + } if (scheduler) { subscriber.add(subscriber = new ObserveOnSubscriber(subscriber, scheduler)); @@ -41,7 +54,13 @@ export class ReplaySubject extends Subject { subscriber.next(_events[i].value); } - return super._subscribe(subscriber); + if (this.hasError) { + subscriber.error(this.thrownError); + } else if (this.isStopped) { + subscriber.complete(); + } + + return subscription; } _getNow(): number {