From f67a5968ebb4b3004171abaec4c33ba74c515adf Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Mon, 19 Oct 2015 15:47:15 +0300 Subject: [PATCH] fix(repeat): fix inner subscription semantics for repeat Fix repeat operator to unsubscribe from the repeatable source as soon as possible (when the previous repetition was completed), not when the resulting Observable was unsubscribed (which is as late as possible). Also fix repeat operator to not subscribe to the source at all if count=0. Resolves #554. --- src/operators/repeat.ts | 84 +++++++++++++++++++++++++++++++---------- 1 file changed, 64 insertions(+), 20 deletions(-) diff --git a/src/operators/repeat.ts b/src/operators/repeat.ts index befff15e83..118fa21d3f 100644 --- a/src/operators/repeat.ts +++ b/src/operators/repeat.ts @@ -2,48 +2,92 @@ import Operator from '../Operator'; import Observer from '../Observer'; import Subscriber from '../Subscriber'; import Observable from '../Observable'; +import EmptyObservable from '../observables/EmptyObservable'; import immediate from '../schedulers/immediate'; +import Subscription from '../Subscription'; export default function repeat(count: number = -1): Observable { - return this.lift(new RepeatOperator(count, this)); + if (count === 0) { + return EmptyObservable.create(); + } else { + return this.lift(new RepeatOperator(count, this)); + } } class RepeatOperator implements Operator { - constructor(private count: number, private original: Observable) { + constructor(private count: number, + private source: Observable) { } call(subscriber: Subscriber): Subscriber { - return new RepeatSubscriber(subscriber, this.count, this.original); + return new FirstRepeatSubscriber(subscriber, this.count, this.source); } } -class RepeatSubscriber extends Subscriber { - constructor(destination: Observer, private count: number, private original: Observable) { - super(destination); - this.invalidateRepeat(); +class FirstRepeatSubscriber extends Subscriber { + private lastSubscription: Subscription; + + constructor(public destination: Subscriber, + private count: number, + private source: Observable) { + super(null); + if (count === 0) { + this.destination.complete(); + super.unsubscribe(); + } + this.lastSubscription = this; + } + + _next(value: T) { + this.destination.next(value); + } + + _error(err: any) { + this.destination.error(err); + } + + complete() { + if (!this.isUnsubscribed) { + this.resubscribe(this.count); + } } - private repeatSubscription(): void { - let state = { dest: this.destination, count: this.count, original: this.original }; - immediate.scheduleNow(RepeatSubscriber.dispatchSubscription, state); + unsubscribe() { + const lastSubscription = this.lastSubscription; + if (lastSubscription === this) { + super.unsubscribe(); + } else { + lastSubscription.unsubscribe(); + } } - private invalidateRepeat(): Boolean { - let completed = this.count === 0; - if (completed) { + resubscribe(count: number) { + this.lastSubscription.unsubscribe(); + if (count - 1 === 0) { this.destination.complete(); + } else { + const nextSubscriber = new MoreRepeatSubscriber(this, count - 1); + this.lastSubscription = this.source.subscribe(nextSubscriber); } - return completed; } +} - private static dispatchSubscription({ dest, count, original }): void { - return original.subscribe(new RepeatSubscriber(dest, count, original)); +class MoreRepeatSubscriber extends Subscriber { + constructor(private parent: FirstRepeatSubscriber, + private count: number) { + super(null); + } + + _next(value: T) { + this.parent.destination.next(value); + } + + _error(err: any) { + this.parent.destination.error(err); } _complete() { - if (!this.invalidateRepeat()) { - this.count--; - this.repeatSubscription(); - } + const count = this.count; + this.parent.resubscribe(count < 0 ? -1 : count); } }