Skip to content

Commit 216bb20

Browse files
committed
stream: resume stream on drain
Previously we would just resume "flowing" the stream without reseting the "paused" state. Fixes this by properly using pause/resume methods for .pipe. Fixes: #41785
1 parent 7faf763 commit 216bb20

File tree

3 files changed

+42
-2
lines changed

3 files changed

+42
-2
lines changed

lib/internal/streams/readable.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -853,8 +853,7 @@ function pipeOnDrain(src, dest) {
853853

854854
if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) &&
855855
EE.listenerCount(src, 'data')) {
856-
state.flowing = true;
857-
flow(src);
856+
src.resume();
858857
}
859858
};
860859
}

test/parallel/test-stream-readable-pause-and-resume.js

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,20 @@ function readAndPause() {
5656
assert(readable.isPaused());
5757
});
5858
}
59+
60+
{
61+
const { PassThrough } = require('stream');
62+
63+
const source3 = new PassThrough();
64+
const target3 = new PassThrough();
65+
66+
const chunk = Buffer.allocUnsafe(1000);
67+
let chunks = 1;
68+
while (target3.write(chunk)) chunks++;
69+
70+
source3.pipe(target3);
71+
target3.on('drain', common.mustCall(() => {
72+
assert(!source3.isPaused());
73+
}));
74+
target3.on('data', () => {});
75+
}

tmp.js

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
const { PassThrough } = require('stream');
2+
3+
// THIRD EXPERIMENT
4+
console.info('\n********** THIRD EXPERIMENT **********');
5+
const source3 = new PassThrough();
6+
const target3 = new PassThrough();
7+
8+
// stall target3
9+
const chunk = Buffer.allocUnsafe(1000);
10+
let chunks = 1;
11+
while (target3.write(chunk)) chunks++;
12+
console.info(`${chunks} chunks of ${chunk.length} bytes to stall target3`);
13+
14+
// `Readable.pipe()` PAUSES the source if the target needs drain (only in
15+
// version >= v14.17.0) and it does not resume it after drain
16+
console.info(`source3 before pipe. Paused: ${source3.isPaused()}`);
17+
source3.pipe(target3);
18+
console.info(`source3 after pipe. Paused: ${source3.isPaused()}`);
19+
target3.on('drain', () => {
20+
console.info('target3 drained');
21+
console.info(`source3 after drain. Paused: ${source3.isPaused()}`);
22+
console.info(source3.readableFlowing);
23+
});
24+
target3.on('data', () => {});

0 commit comments

Comments
 (0)