From 03c41a51910f8bae12578f6f2d7dbac5c712127f Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 25 Sep 2020 18:50:49 +0200 Subject: [PATCH] fix: Client.stream writableNeedDrain Fixes: https://github.com/nodejs/undici/issues/441 Refs: https://github.com/nodejs/node/pull/35348 Refs: https://github.com/nodejs/node/issues/35341 --- lib/client-stream.js | 6 ++++ lib/core/client.js | 82 +++++++++++++++++++++++++++++++++---------- lib/core/request.js | 1 + lib/core/symbols.js | 1 - test/client-stream.js | 44 +++++++++++++++++++++++ 5 files changed, 114 insertions(+), 20 deletions(-) diff --git a/lib/client-stream.js b/lib/client-stream.js index d3188cfd956..6d9f75d4035 100644 --- a/lib/client-stream.js +++ b/lib/client-stream.js @@ -109,6 +109,12 @@ class StreamHandler extends AsyncResource { }) this.res = res + + const needDrain = res.writableNeedDrain !== undefined + ? res.writableNeedDrain + : res._writableState && res._writableState.needDrain + + return needDrain !== true } onData (chunk) { diff --git a/lib/core/client.js b/lib/core/client.js index 4aa5fa21581..25968abc5df 100644 --- a/lib/core/client.js +++ b/lib/core/client.js @@ -21,7 +21,6 @@ const { const { kUrl, kReset, - kPause, kHost, kResume, kClient, @@ -376,9 +375,37 @@ class Parser extends HTTPParser { this.shouldKeepAlive = false this.read = 0 this.request = null + this.paused = false + this.queue = [] + + this._resume = () => { + this.paused = false + + while (this.queue.length) { + const [fn, ...args] = this.queue.shift() + + fn.apply(this, args) + + if (this.paused) { + return + } + } + + socketResume(socket) + } + + this._pause = () => { + this.paused = true + socketPause(socket) + } } [HTTPParser.kOnHeaders] (rawHeaders) { + if (this.paused) { + this.queue.push([this[HTTPParser.kOnHeaders], rawHeaders]) + return + } + if (this.headers) { Array.prototype.push.apply(this.headers, rawHeaders) } else { @@ -387,6 +414,11 @@ class Parser extends HTTPParser { } [HTTPParser.kOnExecute] (ret) { + if (this.paused) { + this.queue.push([this[HTTPParser.kOnExecute], ret]) + return + } + const { upgrade, socket } = this if (!Number.isFinite(ret)) { @@ -438,8 +470,6 @@ class Parser extends HTTPParser { setImmediate((self) => self.close(), socket[kParser]) socket[kParser] = null - socket[kPause] = null - socket[kResume] = null socket[kClient] = null socket[kError] = null socket @@ -459,6 +489,12 @@ class Parser extends HTTPParser { [HTTPParser.kOnHeadersComplete] (versionMajor, versionMinor, rawHeaders, method, url, statusCode, statusMessage, upgrade, shouldKeepAlive) { + if (this.paused) { + this.queue.push([this[HTTPParser.kOnHeadersComplete], versionMajor, versionMinor, rawHeaders, method, + url, statusCode, statusMessage, upgrade, shouldKeepAlive]) + return + } + const { client, socket } = this const request = client[kQueue][client[kRunningIdx]] @@ -541,8 +577,8 @@ class Parser extends HTTPParser { } try { - if (request.onHeaders(statusCode, headers, socket[kResume]) === false) { - socket[kPause]() + if (request.onHeaders(statusCode, headers, this._resume) === false) { + this._pause() } } catch (err) { util.destroy(socket, err) @@ -553,6 +589,11 @@ class Parser extends HTTPParser { } [HTTPParser.kOnBody] (chunk, offset, length) { + if (this.paused) { + this.queue.push([this[HTTPParser.kOnBody], chunk, offset, length]) + return + } + const { socket, statusCode, request } = this if (socket.destroyed) { @@ -563,7 +604,7 @@ class Parser extends HTTPParser { try { if (request.onBody(chunk, offset, length) === false) { - socket[kPause]() + this._pause() } } catch (err) { util.destroy(socket, err) @@ -571,6 +612,11 @@ class Parser extends HTTPParser { } [HTTPParser.kOnMessageComplete] () { + if (this.paused) { + this.queue.push([this[HTTPParser.kOnMessageComplete]]) + return + } + const { client, socket, statusCode, headers, upgrade, request, trailers } = this if (socket.destroyed) { @@ -636,7 +682,7 @@ class Parser extends HTTPParser { // have been queued since then. util.destroy(socket, new InformationalError('reset')) } else { - socket[kResume]() + socketResume(socket) resume(client) } } @@ -766,8 +812,6 @@ function connect (client) { socket[kIdleTimeout] = null socket[kIdleTimeoutValue] = null - socket[kPause] = socketPause.bind(socket) - socket[kResume] = socketResume.bind(socket) socket[kError] = null socket[kParser] = parser socket[kClient] = client @@ -779,24 +823,24 @@ function connect (client) { .on('close', onSocketClose) } -function socketPause () { +function socketPause (socket) { // TODO: Pause parser. - if (this._handle && this._handle.reading) { - this._handle.reading = false - const err = this._handle.readStop() + if (socket._handle && socket._handle.reading) { + socket._handle.reading = false + const err = socket._handle.readStop() if (err) { - this.destroy(util.errnoException(err, 'read')) + socket.destroy(util.errnoException(err, 'read')) } } } -function socketResume () { +function socketResume (socket) { // TODO: Resume parser. - if (this._handle && !this._handle.reading) { - this._handle.reading = true - const err = this._handle.readStart() + if (socket._handle && !socket._handle.reading) { + socket._handle.reading = true + const err = socket._handle.readStart() if (err) { - this.destroy(util.errnoException(err, 'read')) + socket.destroy(util.errnoException(err, 'read')) } } } diff --git a/lib/core/request.js b/lib/core/request.js index d4bed72afb1..63c55ee7929 100644 --- a/lib/core/request.js +++ b/lib/core/request.js @@ -162,6 +162,7 @@ class Request { onBody (chunk, offset, length) { assert(!this.aborted) + assert(!this[kPaused]) if (this[kTimeout] && this[kTimeout].refresh) { this[kTimeout].refresh() diff --git a/lib/core/symbols.js b/lib/core/symbols.js index de9c32c3bfe..29421e07ea8 100644 --- a/lib/core/symbols.js +++ b/lib/core/symbols.js @@ -5,7 +5,6 @@ module.exports = { kQueue: Symbol('queue'), kConnect: Symbol('connect'), kResume: Symbol('resume'), - kPause: Symbol('pause'), kIdleTimeout: Symbol('idle timeout'), kIdleTimeoutValue: Symbol('idle timeout value'), kKeepAliveDefaultTimeout: Symbol('default keep alive timeout'), diff --git a/test/client-stream.js b/test/client-stream.js index d36d9fc1a10..fa558f82e2f 100644 --- a/test/client-stream.js +++ b/test/client-stream.js @@ -657,3 +657,47 @@ test('stream body destroyed on invalid callback', (t) => { } }) }) + +test('stream needDrain', (t) => { + t.plan(1) + + const server = createServer((req, res) => { + res.end(Buffer.alloc(4096)) + }) + t.tearDown(server.close.bind(server)) + + server.listen(0, async () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.tearDown(() => { + console.error(3) + client.destroy() + }) + + const dst = new PassThrough() + dst.pause() + + while (dst.write(Buffer.alloc(4096))) { + + } + + const orgWrite = dst.write + dst.write = () => t.fail() + const p = client.stream({ + path: '/', + method: 'GET' + }, () => { + return dst + }) + + setTimeout(() => { + dst.write = (...args) => { + orgWrite.call(dst, ...args) + } + dst.resume() + }, 1e3) + + await p + + t.pass() + }) +})