Skip to content

Commit 93f29d3

Browse files
committed
fix(throttle): fix issue #709, where throttled Stream does not emit done event.
1 parent e8ad800 commit 93f29d3

File tree

2 files changed

+81
-32
lines changed

2 files changed

+81
-32
lines changed

lib/src/transformers/backpressure/backpressure.dart

Lines changed: 52 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class _BackpressureStreamSink<S, T> extends ForwardingSink<S, T> {
5454
@override
5555
void onData(S data) {
5656
_hasData = true;
57-
maybeCreateWindow(data, sink);
57+
maybeCreateWindow(data);
5858

5959
if (skip == 0) {
6060
queue.add(data);
@@ -68,7 +68,7 @@ class _BackpressureStreamSink<S, T> extends ForwardingSink<S, T> {
6868
skip--;
6969
}
7070

71-
maybeCloseWindow(sink);
71+
maybeCloseWindow();
7272
}
7373

7474
@override
@@ -79,20 +79,27 @@ class _BackpressureStreamSink<S, T> extends ForwardingSink<S, T> {
7979
_mainClosed = true;
8080

8181
if (_strategy == WindowStrategy.eventAfterLastWindow) {
82+
resolveWindowEnd(isControllerClosing: true, isWindowClosed: false);
8283
return;
8384
}
8485

8586
// treat the final event as a Window that opens
8687
// and immediately closes again
8788
if (_dispatchOnClose && queue.isNotEmpty) {
88-
resolveWindowStart(queue.last, sink);
89+
resolveWindowStart(queue.last);
8990
}
9091

91-
resolveWindowEnd(sink, true);
92+
resolveWindowEnd(isControllerClosing: true, isWindowClosed: false);
9293

94+
clearAndClose();
95+
}
96+
97+
void clearAndClose() {
9398
queue.clear();
9499

95100
_windowSubscription?.cancel();
101+
_windowSubscription = null;
102+
96103
sink.close();
97104
}
98105

@@ -108,63 +115,65 @@ class _BackpressureStreamSink<S, T> extends ForwardingSink<S, T> {
108115
@override
109116
void onResume() => _windowSubscription?.resume();
110117

111-
void maybeCreateWindow(S event, EventSink<T> sink) {
118+
void maybeCreateWindow(S event) {
112119
switch (_strategy) {
113120
// for example throttle
114121
case WindowStrategy.eventAfterLastWindow:
115122
if (_windowSubscription != null) return;
116123

117-
_windowSubscription = singleWindow(event, sink);
124+
_windowSubscription = singleWindow(event);
118125

119-
resolveWindowStart(event, sink);
126+
resolveWindowStart(event);
120127

121128
break;
122129
// for example scan
123130
case WindowStrategy.firstEventOnly:
124131
if (_windowSubscription != null) return;
125132

126-
_windowSubscription = multiWindow(event, sink);
133+
_windowSubscription = multiWindow(event);
127134

128-
resolveWindowStart(event, sink);
135+
resolveWindowStart(event);
129136

130137
break;
131138
// for example debounce
132139
case WindowStrategy.everyEvent:
133140
_windowSubscription?.cancel();
134141

135-
_windowSubscription = singleWindow(event, sink);
142+
_windowSubscription = singleWindow(event);
136143

137-
resolveWindowStart(event, sink);
144+
resolveWindowStart(event);
138145

139146
break;
140147
case WindowStrategy.onHandler:
141148
break;
142149
}
143150
}
144151

145-
void maybeCloseWindow(EventSink<T> sink) {
152+
void maybeCloseWindow() {
146153
if (_closeWindowWhen != null && _closeWindowWhen!(unmodifiableQueue)) {
147-
resolveWindowEnd(sink);
154+
resolveWindowEnd(isControllerClosing: false, isWindowClosed: false);
148155
}
149156
}
150157

151-
StreamSubscription<dynamic> singleWindow(S event, EventSink<T> sink) =>
152-
buildStream(event, sink).take(1).listen(
158+
StreamSubscription<dynamic> singleWindow(S event) =>
159+
buildStream(event).take(1).listen(
153160
null,
154161
onError: sink.addError,
155-
onDone: () => resolveWindowEnd(sink, _mainClosed),
162+
onDone: () => resolveWindowEnd(
163+
isControllerClosing: _mainClosed, isWindowClosed: true),
156164
);
157165

158166
// opens a new Window which is kept open until the main Stream
159167
// closes.
160-
StreamSubscription<dynamic> multiWindow(S event, EventSink<T> sink) =>
161-
buildStream(event, sink).listen(
162-
(dynamic _) => resolveWindowEnd(sink),
168+
StreamSubscription<dynamic> multiWindow(S event) => buildStream(event).listen(
169+
(dynamic _) => resolveWindowEnd(
170+
isControllerClosing: _mainClosed, isWindowClosed: false),
163171
onError: sink.addError,
164-
onDone: () => resolveWindowEnd(sink),
172+
onDone: () => resolveWindowEnd(
173+
isControllerClosing: _mainClosed, isWindowClosed: true),
165174
);
166175

167-
Stream<dynamic> buildStream(S event, EventSink<T> sink) {
176+
Stream<dynamic> buildStream(S event) {
168177
Stream stream;
169178

170179
_windowSubscription?.cancel();
@@ -174,27 +183,38 @@ class _BackpressureStreamSink<S, T> extends ForwardingSink<S, T> {
174183
return stream;
175184
}
176185

177-
void resolveWindowStart(S event, EventSink<T> sink) {
186+
void resolveWindowStart(S event) {
178187
if (_onWindowStart != null) {
179188
sink.add(_onWindowStart!(event));
180189
}
181190
}
182191

183-
void resolveWindowEnd(EventSink<T> sink, [bool isControllerClosing = false]) {
192+
void resolveWindowEnd({
193+
required bool isControllerClosing,
194+
required bool isWindowClosed,
195+
}) {
184196
if (isControllerClosing &&
185197
_strategy == WindowStrategy.eventAfterLastWindow) {
186-
if (_dispatchOnClose &&
187-
_hasData &&
188-
queue.length > 1 &&
189-
_onWindowEnd != null) {
190-
sink.add(_onWindowEnd!(unmodifiableQueue));
198+
// has no last data, close immediately
199+
if (!_hasData || queue.length == 1) {
200+
clearAndClose();
201+
return;
191202
}
192203

193-
queue.clear();
194-
_windowSubscription?.cancel();
195-
_windowSubscription = null;
204+
// once the Stream has emitted done event, there may still be a pending data
205+
// waiting to be emitted. If so, wait for the window to end and then
206+
// emit it.
207+
if (!isWindowClosed) {
208+
// defer until the window closes
209+
return;
210+
}
211+
212+
// send the last event
213+
if (_dispatchOnClose && _onWindowEnd != null) {
214+
sink.add(_onWindowEnd!(unmodifiableQueue));
215+
}
196216

197-
sink.close();
217+
clearAndClose();
198218
return;
199219
}
200220

test/transformers/backpressure/throttle_time_test.dart

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,26 @@ void main() {
1515
emitsInOrder(<dynamic>[1, 4, 7, emitsDone]));
1616
});
1717

18+
test('Rx.throttleTime.trailing.empty', () async {
19+
await expectLater(
20+
Stream<int>.empty().throttleTime(
21+
const Duration(milliseconds: 250),
22+
leading: false,
23+
trailing: true,
24+
),
25+
emitsDone,
26+
);
27+
28+
await expectLater(
29+
Stream<int>.empty().throttleTime(
30+
const Duration(milliseconds: 250),
31+
leading: true,
32+
trailing: true,
33+
),
34+
emitsDone,
35+
);
36+
});
37+
1838
test('Rx.throttleTime.trailing', () async {
1939
await expectLater(
2040
_stream()
@@ -99,4 +119,13 @@ void main() {
99119
(s) => s.throttleTime(Duration.zero),
100120
);
101121
});
122+
123+
test('issue/709 throttled stream closes', () async {
124+
final c = StreamController<String>();
125+
unawaited(Future<void>.delayed(Duration(milliseconds: 500))
126+
.then<void>((f) => c.close()));
127+
128+
final s = c.stream.throttleTime(Duration(milliseconds: 100));
129+
await for (var _ in s) {}
130+
});
102131
}

0 commit comments

Comments
 (0)