From 997d6802a4b73da623ba17c7c6b596ffcadcfac0 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 7 Jul 2023 14:06:46 +0200 Subject: [PATCH] stream: fix deadlock when pipeing to full sink When piping a paused Readable to a full Writable we didn't register a drain listener which cause the src to never resume. Refs: https://github.com/nodejs/node/issues/48666 --- lib/internal/streams/readable.js | 28 ++++++++++++++-------- test/parallel/test-stream-pipe-deadlock.js | 27 +++++++++++++++++++++ 2 files changed, 45 insertions(+), 10 deletions(-) create mode 100644 test/parallel/test-stream-pipe-deadlock.js diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 7a2aad41ee2baa..aeba701b5da4e2 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -747,17 +747,23 @@ Readable.prototype.pipe = function(dest, pipeOpts) { // to get stuck in a permanently paused state if that write // also returned false. // => Check whether `dest` is still a piping destination. - if (!cleanedUp) { - if (state.pipes.length === 1 && state.pipes[0] === dest) { - debug('false write response, pause', 0); - state.awaitDrainWriters = dest; - state.multiAwaitDrain = false; - } else if (state.pipes.length > 1 && state.pipes.includes(dest)) { - debug('false write response, pause', state.awaitDrainWriters.size); - state.awaitDrainWriters.add(dest); - } - src.pause(); + if (cleanedUp) { + return + } + + if (state.pipes.length === 1 && state.pipes[0] === dest) { + debug('false write response, pause', 0); + state.awaitDrainWriters = dest; + state.multiAwaitDrain = false; + } else if (state.pipes.length > 1 && state.pipes.includes(dest)) { + debug('false write response, pause', state.awaitDrainWriters.size); + state.awaitDrainWriters.add(dest); } + + src.pause(); + } + + function registerDrain () { if (!ondrain) { // When the dest drains, it reduces the awaitDrain counter // on the source. This would be more elegant with a .once() @@ -775,6 +781,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { debug('dest.write', ret); if (ret === false) { pause(); + registerDrain(); } } @@ -825,6 +832,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { if (state.flowing) { pause(); } + registerDrain(); } else if (!state.flowing) { debug('pipe resume'); src.resume(); diff --git a/test/parallel/test-stream-pipe-deadlock.js b/test/parallel/test-stream-pipe-deadlock.js new file mode 100644 index 00000000000000..aca31fc813af57 --- /dev/null +++ b/test/parallel/test-stream-pipe-deadlock.js @@ -0,0 +1,27 @@ +"use strict"; + +const common = require('../common'); +const { Readable, Writable } = require('stream'); + +// https://github.com/nodejs/node/issues/48666 +(async () => { + // Prepare src that is internally ended, with buffered data pending + const src = new Readable({ read() {} }); + src.push(Buffer.alloc(100)); + src.push(null); + src.pause(); + + // Give it time to settle + await new Promise((resolve) => setImmediate(resolve)); + + const dst = new Writable({ + highWaterMark: 1000, + write(buf, enc, cb) { + process.nextTick(cb); + } + }); + + dst.write(Buffer.alloc(1000)); // Fill write buffer + dst.on('finish', common.mustCall()); + src.pipe(dst); +})();