Skip to content

Commit 10cc8a6

Browse files
authored
refactor: Improve memory pressure (#5613)
this is the duplicate of #5610, it is refactoring to ensure outer values are not retained when they do not have to be. It needs to be done in a separate PR because the branches diverge just enough to require it. This PR also has some mild, internal type fixes.
1 parent 075af28 commit 10cc8a6

30 files changed

+403
-396
lines changed

src/internal/innerSubscribe.ts

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/** @prettier */
2+
import { Subscription } from './Subscription';
3+
import { Subscriber } from './Subscriber';
4+
import { Observable } from './Observable';
5+
import { subscribeTo } from './util/subscribeTo';
6+
7+
interface SimpleOuterSubscriberLike<T> {
8+
/**
9+
* A handler for inner next notifications from the inner subscription
10+
* @param innerValue the value nexted by the inner producer
11+
*/
12+
notifyNext(innerValue: T): void;
13+
/**
14+
* A handler for inner error notifications from the inner subscription
15+
* @param err the error from the inner producer
16+
*/
17+
notifyError(err: any): void;
18+
/**
19+
* A handler for inner complete notifications from the inner subscription.
20+
*/
21+
notifyComplete(): void;
22+
}
23+
24+
export class SimpleInnerSubscriber<T> extends Subscriber<T> {
25+
constructor(private parent: SimpleOuterSubscriberLike<any>) {
26+
super();
27+
}
28+
29+
protected _next(value: T): void {
30+
this.parent.notifyNext(value);
31+
}
32+
33+
protected _error(error: any): void {
34+
this.parent.notifyError(error);
35+
this.unsubscribe();
36+
}
37+
38+
protected _complete(): void {
39+
this.parent.notifyComplete();
40+
this.unsubscribe();
41+
}
42+
}
43+
44+
export class ComplexInnerSubscriber<T, R> extends Subscriber<R> {
45+
constructor(private parent: ComplexOuterSubscriber<T, R>, public outerValue: T, public outerIndex: number) {
46+
super();
47+
}
48+
49+
protected _next(value: R): void {
50+
this.parent.notifyNext(this.outerValue, value, this.outerIndex, this);
51+
}
52+
53+
protected _error(error: any): void {
54+
this.parent.notifyError(error);
55+
this.unsubscribe();
56+
}
57+
58+
protected _complete(): void {
59+
this.parent.notifyComplete(this);
60+
this.unsubscribe();
61+
}
62+
}
63+
64+
export class SimpleOuterSubscriber<T, R> extends Subscriber<T> implements SimpleOuterSubscriberLike<R> {
65+
notifyNext(innerValue: R): void {
66+
this.destination.next(innerValue);
67+
}
68+
69+
notifyError(err: any): void {
70+
this.destination.error(err);
71+
}
72+
73+
notifyComplete(): void {
74+
this.destination.complete();
75+
}
76+
}
77+
78+
/**
79+
* DO NOT USE (formerly "OuterSubscriber")
80+
* TODO: We want to refactor this and remove it. It is retaining values it shouldn't for long
81+
* periods of time.
82+
*/
83+
export class ComplexOuterSubscriber<T, R> extends Subscriber<T> {
84+
/**
85+
* @param _outerValue Used by: bufferToggle, delayWhen, windowToggle
86+
* @param innerValue Used by: subclass default, combineLatest, race, bufferToggle, windowToggle, withLatestFrom
87+
* @param _outerIndex Used by: combineLatest, race, withLatestFrom
88+
* @param _innerSub Used by: delayWhen
89+
*/
90+
notifyNext(_outerValue: T, innerValue: R, _outerIndex: number, _innerSub: ComplexInnerSubscriber<T, R>): void {
91+
this.destination.next(innerValue);
92+
}
93+
94+
notifyError(error: any): void {
95+
this.destination.error(error);
96+
}
97+
98+
/**
99+
* @param _innerSub Used by: race, bufferToggle, delayWhen, windowToggle, windowWhen
100+
*/
101+
notifyComplete(_innerSub: ComplexInnerSubscriber<T, R>): void {
102+
this.destination.complete();
103+
}
104+
}
105+
106+
export function innerSubscribe(result: any, innerSubscriber: Subscriber<any>): Subscription | undefined {
107+
if (innerSubscriber.closed) {
108+
return undefined;
109+
}
110+
if (result instanceof Observable) {
111+
return result.subscribe(innerSubscriber);
112+
}
113+
return subscribeTo(result)(innerSubscriber) as Subscription;
114+
}

src/internal/observable/combineLatest.ts

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -226,8 +226,8 @@ export function combineLatest<R>(...observables: Array<ObservableInput<any> | ((
226226
export function combineLatest<O extends ObservableInput<any>, R>(
227227
...observables: (O | ((...values: ObservedValueOf<O>[]) => R) | SchedulerLike)[]
228228
): Observable<R> {
229-
let resultSelector: (...values: Array<any>) => R = null;
230-
let scheduler: SchedulerLike = null;
229+
let resultSelector: ((...values: Array<any>) => R) | undefined = undefined;
230+
let scheduler: SchedulerLike|undefined = undefined;
231231

232232
if (isScheduler(observables[observables.length - 1])) {
233233
scheduler = observables.pop() as SchedulerLike;
@@ -243,7 +243,7 @@ export function combineLatest<O extends ObservableInput<any>, R>(
243243
observables = observables[0] as any;
244244
}
245245

246-
return fromArray(observables, scheduler).lift(new CombineLatestOperator<ObservedValueOf<O>, R>(resultSelector));
246+
return fromArray(observables, scheduler).lift(new CombineLatestOperator(resultSelector));
247247
}
248248

249249
export class CombineLatestOperator<T, R> implements Operator<T, R> {
@@ -264,7 +264,7 @@ export class CombineLatestSubscriber<T, R> extends OuterSubscriber<T, R> {
264264
private active: number = 0;
265265
private values: any[] = [];
266266
private observables: any[] = [];
267-
private toRespond: number;
267+
private toRespond?: number;
268268

269269
constructor(destination: Subscriber<R>, private resultSelector?: (...values: Array<any>) => R) {
270270
super(destination);
@@ -279,26 +279,25 @@ export class CombineLatestSubscriber<T, R> extends OuterSubscriber<T, R> {
279279
const observables = this.observables;
280280
const len = observables.length;
281281
if (len === 0) {
282-
this.destination.complete();
282+
this.destination.complete!();
283283
} else {
284284
this.active = len;
285285
this.toRespond = len;
286286
for (let i = 0; i < len; i++) {
287287
const observable = observables[i];
288-
this.add(subscribeToResult(this, observable, observable, i));
288+
this.add(subscribeToResult(this, observable, undefined, i));
289289
}
290290
}
291291
}
292292

293293
notifyComplete(unused: Subscriber<R>): void {
294294
if ((this.active -= 1) === 0) {
295-
this.destination.complete();
295+
this.destination.complete!();
296296
}
297297
}
298298

299-
notifyNext(outerValue: T, innerValue: R,
300-
outerIndex: number, innerIndex: number,
301-
innerSub: InnerSubscriber<T, R>): void {
299+
notifyNext(_outerValue: T, innerValue: R,
300+
outerIndex: number): void {
302301
const values = this.values;
303302
const oldVal = values[outerIndex];
304303
const toRespond = !this.toRespond
@@ -310,19 +309,19 @@ export class CombineLatestSubscriber<T, R> extends OuterSubscriber<T, R> {
310309
if (this.resultSelector) {
311310
this._tryResultSelector(values);
312311
} else {
313-
this.destination.next(values.slice());
312+
this.destination.next!(values.slice());
314313
}
315314
}
316315
}
317316

318317
private _tryResultSelector(values: any[]) {
319318
let result: any;
320319
try {
321-
result = this.resultSelector.apply(this, values);
320+
result = this.resultSelector!.apply(this, values);
322321
} catch (err) {
323-
this.destination.error(err);
322+
this.destination.error!(err);
324323
return;
325324
}
326-
this.destination.next(result);
325+
this.destination.next!(result);
327326
}
328327
}

src/internal/observable/race.ts

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -102,24 +102,23 @@ export class RaceSubscriber<T> extends OuterSubscriber<T, T> {
102102
const len = observables.length;
103103

104104
if (len === 0) {
105-
this.destination.complete();
105+
this.destination.complete!();
106106
} else {
107107
for (let i = 0; i < len && !this.hasFirst; i++) {
108-
let observable = observables[i];
109-
let subscription = subscribeToResult(this, observable, observable as any, i);
108+
const observable = observables[i];
109+
const subscription = subscribeToResult(this, observable, undefined, i)!;
110110

111111
if (this.subscriptions) {
112112
this.subscriptions.push(subscription);
113113
}
114114
this.add(subscription);
115115
}
116-
this.observables = null;
116+
this.observables = null!;
117117
}
118118
}
119119

120-
notifyNext(outerValue: T, innerValue: T,
121-
outerIndex: number, innerIndex: number,
122-
innerSub: InnerSubscriber<T, T>): void {
120+
notifyNext(_outerValue: T, innerValue: T,
121+
outerIndex: number): void {
123122
if (!this.hasFirst) {
124123
this.hasFirst = true;
125124

@@ -132,9 +131,9 @@ export class RaceSubscriber<T> extends OuterSubscriber<T, T> {
132131
}
133132
}
134133

135-
this.subscriptions = null;
134+
this.subscriptions = null!;
136135
}
137136

138-
this.destination.next(innerValue);
137+
this.destination.next!(innerValue);
139138
}
140139
}

0 commit comments

Comments
 (0)