diff --git a/src/operator/observeOn.ts b/src/operator/observeOn.ts index 88d070ed850..8cc6afb837e 100644 --- a/src/operator/observeOn.ts +++ b/src/operator/observeOn.ts @@ -1,10 +1,9 @@ -import { Observable } from '../Observable'; -import { Scheduler } from '../Scheduler'; -import { Operator } from '../Operator'; -import { PartialObserver } from '../Observer'; -import { Subscriber } from '../Subscriber'; -import { Notification } from '../Notification'; -import { TeardownLogic } from '../Subscription'; +import {Observable} from '../Observable'; +import {Scheduler} from '../Scheduler'; +import {Operator} from '../Operator'; +import {Subscriber} from '../Subscriber'; +import {Notification} from '../Notification'; +import {Subscription, TeardownLogic} from '../Subscription'; /** * @see {@link Notification} @@ -34,21 +33,20 @@ export class ObserveOnOperator implements Operator { * @extends {Ignored} */ export class ObserveOnSubscriber extends Subscriber { - static dispatch(arg: ObserveOnMessage) { - const { notification, destination } = arg; - notification.observe(destination); - } - - constructor(destination: Subscriber, + constructor(protected destination: Subscriber, private scheduler: Scheduler, private delay: number = 0) { super(destination); } private scheduleMessage(notification: Notification): void { - this.add(this.scheduler.schedule(ObserveOnSubscriber.dispatch, - this.delay, - new ObserveOnMessage(notification, this.destination))); + const message = new ObserveOnContext(notification, this); + const subscription = this.scheduler.schedule(ObserveOnDispatch, this.delay, message); + + //do not add into subscription if scheduled synchronously + if (!subscription.closed) { + this.add(message.subscription = subscription); + } } protected _next(value: T): void { @@ -62,10 +60,21 @@ export class ObserveOnSubscriber extends Subscriber { protected _complete(): void { this.scheduleMessage(Notification.createComplete()); } + + public observe(notification: Notification, subscription: Subscription): void { + notification.observe(this.destination); + this.remove(subscription); + } +} + +function ObserveOnDispatch(context: ObserveOnContext) { + const { notification, subscriber, subscription } = context; + subscriber.observe(notification, subscription); } -export class ObserveOnMessage { - constructor(public notification: Notification, - public destination: PartialObserver) { +class ObserveOnContext { + public subscription: Subscription; + constructor(public readonly notification: Notification, + public readonly subscriber: ObserveOnSubscriber) { } }