diff --git a/src/operators/repeat.ts b/src/operators/repeat.ts index befff15e83..118fa21d3f 100644 --- a/src/operators/repeat.ts +++ b/src/operators/repeat.ts @@ -2,48 +2,92 @@ import Operator from '../Operator'; import Observer from '../Observer'; import Subscriber from '../Subscriber'; import Observable from '../Observable'; +import EmptyObservable from '../observables/EmptyObservable'; import immediate from '../schedulers/immediate'; +import Subscription from '../Subscription'; export default function repeat(count: number = -1): Observable { - return this.lift(new RepeatOperator(count, this)); + if (count === 0) { + return EmptyObservable.create(); + } else { + return this.lift(new RepeatOperator(count, this)); + } } class RepeatOperator implements Operator { - constructor(private count: number, private original: Observable) { + constructor(private count: number, + private source: Observable) { } call(subscriber: Subscriber): Subscriber { - return new RepeatSubscriber(subscriber, this.count, this.original); + return new FirstRepeatSubscriber(subscriber, this.count, this.source); } } -class RepeatSubscriber extends Subscriber { - constructor(destination: Observer, private count: number, private original: Observable) { - super(destination); - this.invalidateRepeat(); +class FirstRepeatSubscriber extends Subscriber { + private lastSubscription: Subscription; + + constructor(public destination: Subscriber, + private count: number, + private source: Observable) { + super(null); + if (count === 0) { + this.destination.complete(); + super.unsubscribe(); + } + this.lastSubscription = this; + } + + _next(value: T) { + this.destination.next(value); + } + + _error(err: any) { + this.destination.error(err); + } + + complete() { + if (!this.isUnsubscribed) { + this.resubscribe(this.count); + } } - private repeatSubscription(): void { - let state = { dest: this.destination, count: this.count, original: this.original }; - immediate.scheduleNow(RepeatSubscriber.dispatchSubscription, state); + unsubscribe() { + const lastSubscription = this.lastSubscription; + if (lastSubscription === this) { + super.unsubscribe(); + } else { + lastSubscription.unsubscribe(); + } } - private invalidateRepeat(): Boolean { - let completed = this.count === 0; - if (completed) { + resubscribe(count: number) { + this.lastSubscription.unsubscribe(); + if (count - 1 === 0) { this.destination.complete(); + } else { + const nextSubscriber = new MoreRepeatSubscriber(this, count - 1); + this.lastSubscription = this.source.subscribe(nextSubscriber); } - return completed; } +} - private static dispatchSubscription({ dest, count, original }): void { - return original.subscribe(new RepeatSubscriber(dest, count, original)); +class MoreRepeatSubscriber extends Subscriber { + constructor(private parent: FirstRepeatSubscriber, + private count: number) { + super(null); + } + + _next(value: T) { + this.parent.destination.next(value); + } + + _error(err: any) { + this.parent.destination.error(err); } _complete() { - if (!this.invalidateRepeat()) { - this.count--; - this.repeatSubscription(); - } + const count = this.count; + this.parent.resubscribe(count < 0 ? -1 : count); } }