From ee2516080923bd2946254cabcc694035e856df02 Mon Sep 17 00:00:00 2001 From: OJ Kwon Date: Mon, 2 Jan 2017 15:07:58 -0800 Subject: [PATCH] fix(observeOn): remove observed subscription to clean up notification instance - relates to #2244 --- src/operator/observeOn.ts | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/src/operator/observeOn.ts b/src/operator/observeOn.ts index 88d070ed85..31b54939d4 100644 --- a/src/operator/observeOn.ts +++ b/src/operator/observeOn.ts @@ -1,10 +1,9 @@ import { Observable } from '../Observable'; import { Scheduler } from '../Scheduler'; import { Operator } from '../Operator'; -import { PartialObserver } from '../Observer'; import { Subscriber } from '../Subscriber'; import { Notification } from '../Notification'; -import { TeardownLogic } from '../Subscription'; +import { Subscription, TeardownLogic } from '../Subscription'; /** * @see {@link Notification} @@ -34,21 +33,25 @@ export class ObserveOnOperator implements Operator { * @extends {Ignored} */ export class ObserveOnSubscriber extends Subscriber { - static dispatch(arg: ObserveOnMessage) { - const { notification, destination } = arg; - notification.observe(destination); + private static dispatch(context: ObserveOnContext): void { + const { notification, subscriber, subscription } = context; + subscriber.observe(notification, subscription); } - constructor(destination: Subscriber, + constructor(protected destination: Subscriber, private scheduler: Scheduler, private delay: number = 0) { super(destination); } private scheduleMessage(notification: Notification): void { - this.add(this.scheduler.schedule(ObserveOnSubscriber.dispatch, - this.delay, - new ObserveOnMessage(notification, this.destination))); + const message = new ObserveOnContext(notification, this); + const subscription = this.scheduler.schedule(ObserveOnSubscriber.dispatch, this.delay, message); + + //do not add into subscription if scheduled synchronously + if (!subscription.closed) { + this.add(message.subscription = subscription); + } } protected _next(value: T): void { @@ -62,10 +65,16 @@ export class ObserveOnSubscriber extends Subscriber { protected _complete(): void { this.scheduleMessage(Notification.createComplete()); } + + public observe(notification: Notification, subscription: Subscription): void { + notification.observe(this.destination); + this.remove(subscription); + } } -export class ObserveOnMessage { - constructor(public notification: Notification, - public destination: PartialObserver) { +class ObserveOnContext { + public subscription: Subscription; + constructor(public readonly notification: Notification, + public readonly subscriber: ObserveOnSubscriber) { } }