From 38b4a3fc3b61166aecf23b132272f4a54733bc23 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 27 Sep 2023 18:20:22 +0200 Subject: [PATCH] stream: writable state bitmap --- lib/internal/streams/writable.js | 192 +++++++++++++++++++++---------- 1 file changed, 133 insertions(+), 59 deletions(-) diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js index 595aadc23c8bec..9c0833df44f3ea 100644 --- a/lib/internal/streams/writable.js +++ b/lib/internal/streams/writable.js @@ -73,27 +73,35 @@ ObjectSetPrototypeOf(Writable, Stream); function nop() {} const kOnFinished = Symbol('kOnFinished'); +const kErrored = Symbol('kErrored'); +const kCorkedValue = Symbol('kCorked'); + +const kCorked = 0b111111; // 6 bits +const kObjectMode = 1 << 7; +const kEnded = 1 << 8; +const kConstructed = 1 << 9; +const kSync = 1 << 10; +const kErrorEmitted = 1 << 11; +const kEmitClose = 1 << 12; +const kAutoDestroy = 1 << 13; +const kDestroyed = 1 << 14; +const kClosed = 1 << 15; +const kCloseEmitted = 1 << 16; +const kFinalCalled = 1 << 17; +const kNeedDrain = 1 << 18; +const kEnding = 1 << 19; +const kFinished = 1 << 20; +const kDecodeStrings = 1 << 21; +const kWriting = 1 << 22; +const kBufferProcessing = 1 << 23; +const kPrefinished = 1 << 24; +const kAllBuffers = 1 << 25; +const kAllNoop = 1 << 26; +const kHasOnFinished = 1 << 27; +const kHasErrored = 1 << 28; +const kHasWritable = 1 << 29; +const kWritable = 1 << 30; -const kObjectMode = 1 << 0; -const kEnded = 1 << 1; -const kConstructed = 1 << 2; -const kSync = 1 << 3; -const kErrorEmitted = 1 << 4; -const kEmitClose = 1 << 5; -const kAutoDestroy = 1 << 6; -const kDestroyed = 1 << 7; -const kClosed = 1 << 8; -const kCloseEmitted = 1 << 9; -const kFinalCalled = 1 << 10; -const kNeedDrain = 1 << 11; -const kEnding = 1 << 12; -const kFinished = 1 << 13; -const kDecodeStrings = 1 << 14; -const kWriting = 1 << 15; -const kBufferProcessing = 1 << 16; -const kPrefinished = 1 << 17; -const kAllBuffers = 1 << 18; -const kAllNoop = 1 << 19; // TODO(benjamingr) it is likely slower to do it this way than with free functions function makeBitMapDescriptor(bit) { @@ -176,6 +184,58 @@ ObjectDefineProperties(WritableState.prototype, { allBuffers: makeBitMapDescriptor(kAllBuffers), allNoop: makeBitMapDescriptor(kAllNoop), + + // Indicates whether the stream has errored. When true all write() calls + // should return false. This is needed since when autoDestroy + // is disabled we need a way to tell whether the stream has failed. + // This is/should be a cold path. + errored: { + enumerable: false, + get() { return (this.state & kHasErrored) !== 0 ? this[kErrored] : null; }, + set(value) { + if (value) { + this[kErrored] = value; + this.state |= kHasErrored; + } else { + this.state &= ~kHasErrored; + } + }, + }, + + + writable: { + enumerable: false, + get() { return (this.state & kHasWritable) !== 0 ? (this.state & kWritable) !== 0 : null; }, + set(value) { + if (value == null) { + this.state &= (kHasWritable | kWritable); + } else if (value) { + this.state |= (kHasWritable | kWritable); + } else { + this.state |= kHasWritable; + this.state &= ~kWritable; + } + }, + }, + + // When true all writes will be buffered until .uncork() call. + // This is/should be a cold path. + corked: { + enumerable: false, + get() { + const corked = this.state & kCorked; + return corked !== kCorked ? val : this[kCorkedValue]; + }, + set(value) { + if (value < kCorked) { + this.state &= ~kCorked; + this.state |= value; + } else { + this.state |= kCorked + this[kCorkedValue] = value; + } + }, + }, }); function WritableState(options, stream, isDuplex) { @@ -226,9 +286,6 @@ function WritableState(options, stream, isDuplex) { // socket or file. this.length = 0; - // When true all writes will be buffered until .uncork() call. - this.corked = 0; - // The callback that's passed to _write(chunk, cb). this.onwrite = onwrite.bind(undefined, stream); @@ -247,13 +304,6 @@ function WritableState(options, stream, isDuplex) { // Number of pending user-supplied write callbacks // this must be 0 before 'finish' can be emitted. this.pendingcb = 0; - - // Indicates whether the stream has errored. When true all write() calls - // should return false. This is needed since when autoDestroy - // is disabled we need a way to tell whether the stream has failed. - this.errored = null; - - this[kOnFinished] = []; } function resetBuffer(state) { @@ -394,17 +444,32 @@ Writable.prototype.write = function(chunk, encoding, cb) { }; Writable.prototype.cork = function() { - this._writableState.corked++; + const state = this._writableState; + + const corked = (state & kCorked) + 1; + if (corked < kCorked) { + state.state += 1; + } else { + state.corked++; + } }; Writable.prototype.uncork = function() { const state = this._writableState; - if (state.corked) { + if ((state.state & kCorked) === 0) { + return + } + + const corked = state & kCorked; + if (corked < kCorked) { + state.state -= 1; + } else { state.corked--; + } - if ((state.state & kWriting) === 0) - clearBuffer(this, state); + if ((state.state & kWriting) === 0) { + clearBuffer(this, state); } }; @@ -432,7 +497,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) { if (!ret) state.state |= kNeedDrain; - if ((state.state & kWriting) !== 0 || state.corked || state.errored || (state.state & kConstructed) === 0) { + if ((state.state & kWriting) !== 0 || (state.state & (kHasErrored | kCorked)) || (state.state & kConstructed) === 0) { state.buffered.push({ chunk, encoding, callback }); if ((state.state & kAllBuffers) !== 0 && encoding !== 'buffer') { state.state &= ~kAllBuffers; @@ -450,7 +515,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) { // Return false if errored or destroyed in order to break // any synchronous while(stream.write(data)) loops. - return ret && !state.errored && (state.state & kDestroyed) === 0; + return ret && (state.state & kHasErrored) === 0 && (state.state & kDestroyed) === 0; } function doWrite(stream, state, writev, len, chunk, encoding, cb) { @@ -498,7 +563,7 @@ function onwrite(stream, er) { // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 er.stack; // eslint-disable-line no-unused-expressions - if (!state.errored) { + if ((state.state & kHasErrored) === 0) { state.errored = er; } @@ -573,9 +638,11 @@ function errorBuffer(state) { callback(state.errored ?? new ERR_STREAM_DESTROYED('write')); } - const onfinishCallbacks = state[kOnFinished].splice(0); - for (let i = 0; i < onfinishCallbacks.length; i++) { - onfinishCallbacks[i](state.errored ?? new ERR_STREAM_DESTROYED('end')); + if ((state.state & kHasOnFinished) !== 0) { + const onfinishCallbacks = state[kOnFinished].splice(0); + for (let i = 0; i < onfinishCallbacks.length; i++) { + onfinishCallbacks[i](state.errored ?? new ERR_STREAM_DESTROYED('end')); + } } resetBuffer(state); @@ -583,8 +650,7 @@ function errorBuffer(state) { // If there's something in the buffer waiting, then process it. function clearBuffer(stream, state) { - if (state.corked || - (state.state & (kDestroyed | kBufferProcessing)) !== 0 || + if ((state.state & (kDestroyed | kBufferProcessing | kCorked)) !== 0 || (state.state & kConstructed) === 0) { return; } @@ -669,14 +735,16 @@ Writable.prototype.end = function(chunk, encoding, cb) { } // .end() fully uncorks. - if (state.corked) { - state.corked = 1; - this.uncork(); + if ((state.state & kCorked) !== 0) { + state.state &= ~kCorked; + if ((state.state & kWriting) === 0) { + clearBuffer(this, state); + } } if (err) { // Do nothing... - } else if (!state.errored && (state.state & kEnding) === 0) { + } else if ((state.state & kErrored) === 0 && (state.state & kEnding) === 0) { // This is forgiving in terms of unnecessary calls to end() and can hide // logic errors. However, usually such errors are harmless and causing a // hard error can be disproportionately destructive. It is not always @@ -698,6 +766,8 @@ Writable.prototype.end = function(chunk, encoding, cb) { } else if ((state.state & kFinished) !== 0) { process.nextTick(cb, null); } else { + state.state |= kHasOnFinished; + state[kOnFinished] ??= []; state[kOnFinished].push(cb); } } @@ -715,10 +785,10 @@ function needFinish(state) { kFinished | kWriting | kErrorEmitted | - kCloseEmitted + kCloseEmitted | + kHasErrored )) === (kEnding | kConstructed) && state.length === 0 && - !state.errored && state.buffered.length === 0); } @@ -734,9 +804,11 @@ function callFinal(stream, state) { state.pendingcb--; if (err) { - const onfinishCallbacks = state[kOnFinished].splice(0); - for (let i = 0; i < onfinishCallbacks.length; i++) { - onfinishCallbacks[i](err); + if ((state.state & kHasOnFinished) !== 0) { + const onfinishCallbacks = state[kOnFinished].splice(0); + for (let i = 0; i < onfinishCallbacks.length; i++) { + onfinishCallbacks[i](err); + } } errorOrDestroy(stream, err, (state.state & kSync) !== 0); } else if (needFinish(state)) { @@ -799,9 +871,11 @@ function finish(stream, state) { state.pendingcb--; state.state |= kFinished; - const onfinishCallbacks = state[kOnFinished].splice(0); - for (let i = 0; i < onfinishCallbacks.length; i++) { - onfinishCallbacks[i](null); + if ((state.state & kHasOnFinished) !== 0) { + const onfinishCallbacks = state[kOnFinished].splice(0); + for (let i = 0; i < onfinishCallbacks.length; i++) { + onfinishCallbacks[i](null); + } } stream.emit('finish'); @@ -853,8 +927,8 @@ ObjectDefineProperties(Writable.prototype, { // where the writable side was disabled upon construction. // Compat. The user might manually disable writable side through // deprecated setter. - return !!w && w.writable !== false && !w.errored && - (w.state & (kEnding | kEnded | kDestroyed)) === 0; + return !!w && w.writable !== false && + (w.state & (kEnding | kEnded | kDestroyed | kHasErrored)) === 0; }, set(val) { // Backwards compatible. @@ -928,7 +1002,7 @@ ObjectDefineProperties(Writable.prototype, { __proto__: null, enumerable: false, get() { - return this._writableState ? this._writableState.errored : null; + return this._writableState && (this._writableState.state & kHasErrored) !== 0 ? this._writableState.errored : null; }, }, @@ -938,7 +1012,7 @@ ObjectDefineProperties(Writable.prototype, { get: function() { return !!( this._writableState.writable !== false && - ((this._writableState.state & kDestroyed) !== 0 || this._writableState.errored) && + (this._writableState.state & (kDestroyed | kHasErrored)) !== 0 && (this._writableState.state & kFinished) === 0 ); }, @@ -952,7 +1026,7 @@ Writable.prototype.destroy = function(err, cb) { // Invoke pending callbacks. if ((state.state & kDestroyed) === 0 && (state.bufferedIndex < state.buffered.length || - state[kOnFinished].length)) { + (((state.state & kHasOnFinished) !== 0) && state[kOnFinished].length))) { process.nextTick(errorBuffer, state); }