Skip to content

Commit

Permalink
fix(observeOn): remove observed subscription to clean up notification…
Browse files Browse the repository at this point in the history
… instance

- relates to ReactiveX#2244
  • Loading branch information
kwonoj committed Jan 2, 2017
1 parent 6922b16 commit 3e6ce97
Showing 1 changed file with 23 additions and 14 deletions.
37 changes: 23 additions & 14 deletions src/operator/observeOn.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { Observable } from '../Observable';
import { Scheduler } from '../Scheduler';
import { Operator } from '../Operator';
import { PartialObserver } from '../Observer';
import { Subscriber } from '../Subscriber';
import { Notification } from '../Notification';
import { TeardownLogic } from '../Subscription';
import { Subscription, TeardownLogic } from '../Subscription';

/**
* @see {@link Notification}
Expand Down Expand Up @@ -34,21 +33,20 @@ export class ObserveOnOperator<T> implements Operator<T, T> {
* @extends {Ignored}
*/
export class ObserveOnSubscriber<T> extends Subscriber<T> {
static dispatch(arg: ObserveOnMessage) {
const { notification, destination } = arg;
notification.observe(destination);
}

constructor(destination: Subscriber<T>,
constructor(protected destination: Subscriber<T>,
private scheduler: Scheduler,
private delay: number = 0) {
super(destination);
}

private scheduleMessage(notification: Notification<any>): void {
this.add(this.scheduler.schedule(ObserveOnSubscriber.dispatch,
this.delay,
new ObserveOnMessage(notification, this.destination)));
const message = new ObserveOnContext(notification, this);
const subscription = this.scheduler.schedule(ObserveOnDispatch, this.delay, message);

//do not add into subscription if scheduled synchronously
if (!subscription.closed) {
this.add(message.subscription = subscription);
}
}

protected _next(value: T): void {
Expand All @@ -62,10 +60,21 @@ export class ObserveOnSubscriber<T> extends Subscriber<T> {
protected _complete(): void {
this.scheduleMessage(Notification.createComplete());
}

public observe(notification: Notification<any>, subscription: Subscription): void {
notification.observe(this.destination);
this.remove(subscription);
}
}

function ObserveOnDispatch(context: ObserveOnContext) {
const { notification, subscriber, subscription } = context;
subscriber.observe(notification, subscription);
}

export class ObserveOnMessage {
constructor(public notification: Notification<any>,
public destination: PartialObserver<any>) {
class ObserveOnContext {
public subscription: Subscription;
constructor(public readonly notification: Notification<any>,
public readonly subscriber: ObserveOnSubscriber<any>) {
}
}

0 comments on commit 3e6ce97

Please sign in to comment.