|
| 1 | +import { Action } from '../scheduler/Action'; |
1 | 2 | import { Operator } from '../Operator'; |
2 | 3 | import { Subscriber } from '../Subscriber'; |
3 | 4 | import { IScheduler } from '../Scheduler'; |
4 | 5 | import { async } from '../scheduler/async'; |
5 | | -import { Subscription, TeardownLogic } from '../Subscription'; |
| 6 | +import { TeardownLogic } from '../Subscription'; |
6 | 7 | import { Observable, ObservableInput } from '../Observable'; |
7 | 8 | import { isDate } from '../util/isDate'; |
8 | 9 | import { OuterSubscriber } from '../OuterSubscriber'; |
@@ -49,65 +50,50 @@ class TimeoutWithOperator<T> implements Operator<T, T> { |
49 | 50 | * @extends {Ignored} |
50 | 51 | */ |
51 | 52 | class TimeoutWithSubscriber<T, R> extends OuterSubscriber<T, R> { |
52 | | - private timeoutSubscription: Subscription = undefined; |
53 | | - private index: number = 0; |
54 | | - private _previousIndex: number = 0; |
55 | | - get previousIndex(): number { |
56 | | - return this._previousIndex; |
57 | | - } |
58 | | - private _hasCompleted: boolean = false; |
59 | | - get hasCompleted(): boolean { |
60 | | - return this._hasCompleted; |
61 | | - } |
62 | 53 |
|
63 | | - constructor(public destination: Subscriber<T>, |
| 54 | + private action: Action<TimeoutWithSubscriber<T, R>> = null; |
| 55 | + |
| 56 | + constructor(destination: Subscriber<T>, |
64 | 57 | private absoluteTimeout: boolean, |
65 | 58 | private waitFor: number, |
66 | 59 | private withObservable: ObservableInput<any>, |
67 | 60 | private scheduler: IScheduler) { |
68 | | - super(); |
69 | | - destination.add(this); |
| 61 | + super(destination); |
70 | 62 | this.scheduleTimeout(); |
71 | 63 | } |
72 | 64 |
|
73 | | - private static dispatchTimeout(state: any): void { |
74 | | - const source = state.subscriber; |
75 | | - const currentIndex = state.index; |
76 | | - if (!source.hasCompleted && source.previousIndex === currentIndex) { |
77 | | - source.handleTimeout(); |
78 | | - } |
| 65 | + private static dispatchTimeout<T, R>(subscriber: TimeoutWithSubscriber<T, R>): void { |
| 66 | + const { withObservable } = subscriber; |
| 67 | + (<any> subscriber)._unsubscribeAndRecycle(); |
| 68 | + subscriber.add(subscribeToResult(subscriber, withObservable)); |
79 | 69 | } |
80 | 70 |
|
81 | 71 | private scheduleTimeout(): void { |
82 | | - let currentIndex = this.index; |
83 | | - const timeoutState = { subscriber: this, index: currentIndex }; |
84 | | - this.scheduler.schedule(TimeoutWithSubscriber.dispatchTimeout, this.waitFor, timeoutState); |
85 | | - this.index++; |
86 | | - this._previousIndex = currentIndex; |
| 72 | + const { action } = this; |
| 73 | + if (action) { |
| 74 | + // Recycle the action if we've already scheduled one. All the production |
| 75 | + // Scheduler Actions mutate their state/delay time and return themeselves. |
| 76 | + // VirtualActions are immutable, so they create and return a clone. In this |
| 77 | + // case, we need to set the action reference to the most recent VirtualAction, |
| 78 | + // to ensure that's the one we clone from next time. |
| 79 | + this.action = (<Action<TimeoutWithSubscriber<T, R>>> action.schedule(this, this.waitFor)); |
| 80 | + } else { |
| 81 | + this.add(this.action = (<Action<TimeoutWithSubscriber<T, R>>> this.scheduler.schedule( |
| 82 | + TimeoutWithSubscriber.dispatchTimeout, this.waitFor, this |
| 83 | + ))); |
| 84 | + } |
87 | 85 | } |
88 | 86 |
|
89 | | - protected _next(value: T) { |
90 | | - this.destination.next(value); |
| 87 | + protected _next(value: T): void { |
91 | 88 | if (!this.absoluteTimeout) { |
92 | 89 | this.scheduleTimeout(); |
93 | 90 | } |
| 91 | + super._next(value); |
94 | 92 | } |
95 | 93 |
|
96 | | - protected _error(err: any) { |
97 | | - this.destination.error(err); |
98 | | - this._hasCompleted = true; |
99 | | - } |
100 | | - |
101 | | - protected _complete() { |
102 | | - this.destination.complete(); |
103 | | - this._hasCompleted = true; |
104 | | - } |
105 | | - |
106 | | - handleTimeout(): void { |
107 | | - if (!this.closed) { |
108 | | - const withObservable = this.withObservable; |
109 | | - this.unsubscribe(); |
110 | | - this.destination.add(this.timeoutSubscription = subscribeToResult(this, withObservable)); |
111 | | - } |
| 94 | + protected _unsubscribe() { |
| 95 | + this.action = null; |
| 96 | + this.scheduler = null; |
| 97 | + this.withObservable = null; |
112 | 98 | } |
113 | 99 | } |
0 commit comments