Skip to content

Commit f67a596

Browse files
Andre Medeirosbenlesh
authored andcommitted
fix(repeat): fix inner subscription semantics for repeat
Fix repeat operator to unsubscribe from the repeatable source as soon as possible (when the previous repetition was completed), not when the resulting Observable was unsubscribed (which is as late as possible). Also fix repeat operator to not subscribe to the source at all if count=0. Resolves #554.
1 parent d3ea639 commit f67a596

File tree

1 file changed

+64
-20
lines changed

1 file changed

+64
-20
lines changed

src/operators/repeat.ts

Lines changed: 64 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,48 +2,92 @@ import Operator from '../Operator';
22
import Observer from '../Observer';
33
import Subscriber from '../Subscriber';
44
import Observable from '../Observable';
5+
import EmptyObservable from '../observables/EmptyObservable';
56
import immediate from '../schedulers/immediate';
7+
import Subscription from '../Subscription';
68

79
export default function repeat<T>(count: number = -1): Observable<T> {
8-
return this.lift(new RepeatOperator(count, this));
10+
if (count === 0) {
11+
return EmptyObservable.create();
12+
} else {
13+
return this.lift(new RepeatOperator(count, this));
14+
}
915
}
1016

1117
class RepeatOperator<T, R> implements Operator<T, R> {
12-
constructor(private count: number, private original: Observable<T>) {
18+
constructor(private count: number,
19+
private source: Observable<T>) {
1320
}
1421

1522
call(subscriber: Subscriber<T>): Subscriber<T> {
16-
return new RepeatSubscriber(subscriber, this.count, this.original);
23+
return new FirstRepeatSubscriber(subscriber, this.count, this.source);
1724
}
1825
}
1926

20-
class RepeatSubscriber<T> extends Subscriber<T> {
21-
constructor(destination: Observer<T>, private count: number, private original: Observable<T>) {
22-
super(destination);
23-
this.invalidateRepeat();
27+
class FirstRepeatSubscriber<T> extends Subscriber<T> {
28+
private lastSubscription: Subscription<T>;
29+
30+
constructor(public destination: Subscriber<T>,
31+
private count: number,
32+
private source: Observable<T>) {
33+
super(null);
34+
if (count === 0) {
35+
this.destination.complete();
36+
super.unsubscribe();
37+
}
38+
this.lastSubscription = this;
39+
}
40+
41+
_next(value: T) {
42+
this.destination.next(value);
43+
}
44+
45+
_error(err: any) {
46+
this.destination.error(err);
47+
}
48+
49+
complete() {
50+
if (!this.isUnsubscribed) {
51+
this.resubscribe(this.count);
52+
}
2453
}
2554

26-
private repeatSubscription(): void {
27-
let state = { dest: this.destination, count: this.count, original: this.original };
28-
immediate.scheduleNow(RepeatSubscriber.dispatchSubscription, state);
55+
unsubscribe() {
56+
const lastSubscription = this.lastSubscription;
57+
if (lastSubscription === this) {
58+
super.unsubscribe();
59+
} else {
60+
lastSubscription.unsubscribe();
61+
}
2962
}
3063

31-
private invalidateRepeat(): Boolean {
32-
let completed = this.count === 0;
33-
if (completed) {
64+
resubscribe(count: number) {
65+
this.lastSubscription.unsubscribe();
66+
if (count - 1 === 0) {
3467
this.destination.complete();
68+
} else {
69+
const nextSubscriber = new MoreRepeatSubscriber(this, count - 1);
70+
this.lastSubscription = this.source.subscribe(nextSubscriber);
3571
}
36-
return completed;
3772
}
73+
}
3874

39-
private static dispatchSubscription({ dest, count, original }): void {
40-
return original.subscribe(new RepeatSubscriber(dest, count, original));
75+
class MoreRepeatSubscriber<T> extends Subscriber<T> {
76+
constructor(private parent: FirstRepeatSubscriber<T>,
77+
private count: number) {
78+
super(null);
79+
}
80+
81+
_next(value: T) {
82+
this.parent.destination.next(value);
83+
}
84+
85+
_error(err: any) {
86+
this.parent.destination.error(err);
4187
}
4288

4389
_complete() {
44-
if (!this.invalidateRepeat()) {
45-
this.count--;
46-
this.repeatSubscription();
47-
}
90+
const count = this.count;
91+
this.parent.resubscribe(count < 0 ? -1 : count);
4892
}
4993
}

0 commit comments

Comments
 (0)