From e9bda1176127fd10290c98c8732e0e8bfdb77a79 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 5 Oct 2023 22:27:45 +0200 Subject: [PATCH] stream: lazy allocate back pressure buffer PR-URL: https://github.com/nodejs/node/pull/50013 Reviewed-By: Matteo Collina Reviewed-By: Benjamin Gruenbaum Reviewed-By: Luigi Pinca --- lib/internal/streams/writable.js | 64 ++++++++++++++++++++------------ 1 file changed, 41 insertions(+), 23 deletions(-) diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js index 9851b77bd8fa98..cd191fb70aa803 100644 --- a/lib/internal/streams/writable.js +++ b/lib/internal/streams/writable.js @@ -77,6 +77,7 @@ const kErroredValue = Symbol('kErroredValue'); const kDefaultEncodingValue = Symbol('kDefaultEncodingValue'); const kWriteCbValue = Symbol('kWriteCbValue'); const kAfterWriteTickInfoValue = Symbol('kAfterWriteTickInfoValue'); +const kBufferedValue = Symbol('kBufferedValue'); const kObjectMode = 1 << 0; const kEnded = 1 << 1; @@ -108,7 +109,7 @@ const kWriteCb = 1 << 26; const kExpectWriteCb = 1 << 27; const kAfterWriteTickInfo = 1 << 28; const kAfterWritePending = 1 << 29; -const kHasBuffer = 1 << 30; +const kBuffered = 1 << 30; // TODO(benjamingr) it is likely slower to do it this way than with free functions function makeBitMapDescriptor(bit) { @@ -270,6 +271,21 @@ ObjectDefineProperties(WritableState.prototype, { } }, }, + + buffered: { + __proto__: null, + enumerable: false, + get() { return (this.state & kBuffered) !== 0 ? this[kBufferedValue] : []; }, + set(value) { + this[kBufferedValue] = value; + if (value) { + this.state |= kBuffered; + } else { + this.state &= ~kBuffered; + } + }, + }, + }); function WritableState(options, stream, isDuplex) { @@ -338,20 +354,20 @@ function WritableState(options, stream, isDuplex) { } function resetBuffer(state) { - state.buffered = []; + state[kBufferedValue] = null; state.bufferedIndex = 0; state.state |= kAllBuffers | kAllNoop; - state.state &= ~kHasBuffer; + state.state &= ~kBuffered; } WritableState.prototype.getBuffer = function getBuffer() { - return ArrayPrototypeSlice(this.buffered, this.bufferedIndex); + return (this.state & kBuffered) === 0 ? [] : ArrayPrototypeSlice(this.buffered, this.bufferedIndex); }; ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', { __proto__: null, get() { - return this.buffered.length - this.bufferedIndex; + return (this.state & kBuffered) === 0 ? 0 : this[kBufferedValue].length - this.bufferedIndex; }, }); @@ -518,8 +534,12 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) { state.length += len; if ((state.state & (kWriting | kErrored | kCorked | kConstructed)) !== kConstructed) { - state.buffered.push({ chunk, encoding, callback }); - state.state |= kHasBuffer; + if ((state.state & kBuffered) === 0) { + state.state |= kBuffered; + state[kBufferedValue] = []; + } + + state[kBufferedValue].push({ chunk, encoding, callback }); if ((state.state & kAllBuffers) !== 0 && encoding !== 'buffer') { state.state &= ~kAllBuffers; } @@ -611,7 +631,7 @@ function onwrite(stream, er) { onwriteError(stream, state, er, cb); } } else { - if ((state.state & kHasBuffer) !== 0) { + if ((state.state & kBuffered) !== 0) { clearBuffer(stream, state); } @@ -687,11 +707,13 @@ function errorBuffer(state) { return; } - for (let n = state.bufferedIndex; n < state.buffered.length; ++n) { - const { chunk, callback } = state.buffered[n]; - const len = (state.state & kObjectMode) !== 0 ? 1 : chunk.length; - state.length -= len; - callback(state.errored ?? new ERR_STREAM_DESTROYED('write')); + if ((state.state & kBuffered) !== 0) { + for (let n = state.bufferedIndex; n < state.buffered.length; ++n) { + const { chunk, callback } = state[kBufferedValue][n]; + const len = (state.state & kObjectMode) !== 0 ? 1 : chunk.length; + state.length -= len; + callback(state.errored ?? new ERR_STREAM_DESTROYED('write')); + } } @@ -702,13 +724,12 @@ function errorBuffer(state) { // If there's something in the buffer waiting, then process it. function clearBuffer(stream, state) { - if ((state.state & (kDestroyed | kBufferProcessing | kCorked | kHasBuffer)) !== kHasBuffer || - (state.state & kConstructed) === 0) { + if ((state.state & (kDestroyed | kBufferProcessing | kCorked | kBuffered)) !== kBuffered) { return; } const objectMode = (state.state & kObjectMode) !== 0; - const { buffered, bufferedIndex } = state; + const { [kBufferedValue]: buffered, bufferedIndex } = state; const bufferedLength = buffered.length - bufferedIndex; if (!bufferedLength) { @@ -838,10 +859,9 @@ function needFinish(state) { kWriting | kErrorEmitted | kCloseEmitted | - kErrored - )) === (kEnding | kConstructed) && - state.length === 0 && - state.buffered.length === 0); + kErrored | + kBuffered + )) === (kEnding | kConstructed) && state.length === 0); } function callFinal(stream, state) { @@ -1083,9 +1103,7 @@ Writable.prototype.destroy = function(err, cb) { const state = this._writableState; // Invoke pending callbacks. - if ((state.state & kDestroyed) === 0 && - (state.bufferedIndex < state.buffered.length || - (state.state & kOnFinished) !== 0)) { + if ((state.state & (kBuffered | kOnFinished | kDestroyed)) !== kDestroyed) { process.nextTick(errorBuffer, state); }