diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 49df23cba9f4c2..f551053bf7b79c 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -73,6 +73,7 @@ const { const { validateObject } = require('internal/validators'); const kPaused = Symbol('kPaused'); +const kState = Symbol('kState'); const { StringDecoder } = require('string_decoder'); const from = require('internal/streams/from'); @@ -107,10 +108,10 @@ const kDataEmitted = 1 << 18; function makeBitMapDescriptor(bit) { return { enumerable: false, - get() { return (this.state & bit) !== 0; }, + get() { return (this[kState] & bit) !== 0; }, set(value) { - if (value) this.state |= bit; - else this.state &= ~bit; + if (value) this[kState] |= bit; + else this[kState] &= ~bit; }, }; } @@ -163,13 +164,13 @@ function ReadableState(options, stream, isDuplex) { // Bit map field to store ReadableState more effciently with 1 bit per field // instead of a V8 slot per field. - this.state = kEmitClose | kAutoDestroy | kConstructed | kSync; + this[kState] = kEmitClose | kAutoDestroy | kConstructed | kSync; // Object stream flag. Used to make read(n) ignore n and to // make all the buffer merging and length checks go away. - if (options && options.objectMode) this.state |= kObjectMode; + if (options && options.objectMode) this[kState] |= kObjectMode; if (isDuplex && options && options.readableObjectMode) - this.state |= kObjectMode; + this[kState] |= kObjectMode; // The point at which it stops calling _read() to fill the buffer // Note: 0 is a valid value, means "don't call _read preemptively ever" @@ -188,10 +189,10 @@ function ReadableState(options, stream, isDuplex) { this[kPaused] = null; // Should close be emitted on destroy. Defaults to true. - if (options && options.emitClose === false) this.state &= ~kEmitClose; + if (options && options.emitClose === false) this[kState] &= ~kEmitClose; // Should .destroy() be called after 'end' (and potentially 'finish'). - if (options && options.autoDestroy === false) this.state &= ~kAutoDestroy; + if (options && options.autoDestroy === false) this[kState] &= ~kAutoDestroy; // Indicates whether the stream has errored. When true no further @@ -296,7 +297,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { const state = stream._readableState; let err; - if ((state.state & kObjectMode) === 0) { + if ((state[kState] & kObjectMode) === 0) { if (typeof chunk === 'string') { encoding = encoding || state.defaultEncoding; if (state.encoding !== encoding) { @@ -323,11 +324,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { if (err) { errorOrDestroy(stream, err); } else if (chunk === null) { - state.state &= ~kReading; + state[kState] &= ~kReading; onEofChunk(stream, state); - } else if (((state.state & kObjectMode) !== 0) || (chunk && chunk.length > 0)) { + } else if (((state[kState] & kObjectMode) !== 0) || (chunk && chunk.length > 0)) { if (addToFront) { - if ((state.state & kEndEmitted) !== 0) + if ((state[kState] & kEndEmitted) !== 0) errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); else if (state.destroyed || state.errored) return false; @@ -338,7 +339,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { } else if (state.destroyed || state.errored) { return false; } else { - state.state &= ~kReading; + state[kState] &= ~kReading; if (state.decoder && !encoding) { chunk = state.decoder.write(chunk); if (state.objectMode || chunk.length !== 0) @@ -350,7 +351,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { } } } else if (!addToFront) { - state.state &= ~kReading; + state[kState] &= ~kReading; maybeReadMore(stream, state); } @@ -366,7 +367,7 @@ function addChunk(stream, state, chunk, addToFront) { stream.listenerCount('data') > 0) { // Use the guard to avoid creating `Set()` repeatedly // when we have multiple pipes. - if ((state.state & kMultiAwaitDrain) !== 0) { + if ((state[kState] & kMultiAwaitDrain) !== 0) { state.awaitDrainWriters.clear(); } else { state.awaitDrainWriters = null; @@ -382,7 +383,7 @@ function addChunk(stream, state, chunk, addToFront) { else state.buffer.push(chunk); - if ((state.state & kNeedReadable) !== 0) + if ((state[kState] & kNeedReadable) !== 0) emitReadable(stream); } maybeReadMore(stream, state); @@ -437,7 +438,7 @@ function computeNewHighWaterMark(n) { function howMuchToRead(n, state) { if (n <= 0 || (state.length === 0 && state.ended)) return 0; - if ((state.state & kObjectMode) !== 0) + if ((state[kState] & kObjectMode) !== 0) return 1; if (NumberIsNaN(n)) { // Only flow one buffer at a time. @@ -468,7 +469,7 @@ Readable.prototype.read = function(n) { state.highWaterMark = computeNewHighWaterMark(n); if (n !== 0) - state.state &= ~kEmittedReadable; + state[kState] &= ~kEmittedReadable; // If we're doing read(0) to trigger a readable event, but we // already have a bunch of data in the buffer, then just trigger @@ -519,7 +520,7 @@ Readable.prototype.read = function(n) { // 3. Actually pull the requested chunks out of the buffer and return. // if we need a readable event, then we need to do some reading. - let doRead = (state.state & kNeedReadable) !== 0; + let doRead = (state[kState] & kNeedReadable) !== 0; debug('need readable', doRead); // If we currently have less than the highWaterMark, then also read some. @@ -537,10 +538,10 @@ Readable.prototype.read = function(n) { debug('reading, ended or constructing', doRead); } else if (doRead) { debug('do read'); - state.state |= kReading | kSync; + state[kState] |= kReading | kSync; // If the length is currently zero, then we *need* a readable event. if (state.length === 0) - state.state |= kNeedReadable; + state[kState] |= kNeedReadable; // Call internal read method try { @@ -548,7 +549,7 @@ Readable.prototype.read = function(n) { } catch (err) { errorOrDestroy(this, err); } - state.state &= ~kSync; + state[kState] &= ~kSync; // If _read pushed data synchronously, then `reading` will be false, // and we need to re-evaluate how much data we can return to the user. diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js index 7b1896baeb47c2..5a0390d1c03639 100644 --- a/lib/internal/streams/writable.js +++ b/lib/internal/streams/writable.js @@ -77,6 +77,8 @@ const kErroredValue = Symbol('kErroredValue'); const kDefaultEncodingValue = Symbol('kDefaultEncodingValue'); const kWriteCbValue = Symbol('kWriteCbValue'); const kAfterWriteTickInfoValue = Symbol('kAfterWriteTickInfoValue'); +const kOnFinished = Symbol('kOnFinished'); +const kState = Symbol('kState'); const kObjectMode = 1 << 0; const kEnded = 1 << 1; @@ -113,10 +115,10 @@ const kAfterWritePending = 1 << 29; function makeBitMapDescriptor(bit) { return { enumerable: false, - get() { return (this.state & bit) !== 0; }, + get() { return (this[kState] & bit) !== 0; }, set(value) { - if (value) this.state |= bit; - else this.state &= ~bit; + if (value) this[kState] |= bit; + else this[kState] &= ~bit; }, }; } @@ -198,13 +200,13 @@ ObjectDefineProperties(WritableState.prototype, { errored: { __proto__: null, enumerable: false, - get() { return (this.state & kErrored) !== 0 ? this[kErroredValue] : null; }, + get() { return (this[kState] & kErrored) !== 0 ? this[kErroredValue] : null; }, set(value) { if (value) { this[kErroredValue] = value; - this.state |= kErrored; + this[kState] |= kErrored; } else { - this.state &= ~kErrored; + this[kState] &= ~kErrored; } }, }, @@ -212,15 +214,15 @@ ObjectDefineProperties(WritableState.prototype, { writable: { __proto__: null, enumerable: false, - get() { return (this.state & kHasWritable) !== 0 ? (this.state & kWritable) !== 0 : undefined; }, + get() { return (this[kState] & kHasWritable) !== 0 ? (this[kState] & kWritable) !== 0 : undefined; }, set(value) { if (value == null) { - this.state &= ~(kHasWritable | kWritable); + this[kState] &= ~(kHasWritable | kWritable); } else if (value) { - this.state |= (kHasWritable | kWritable); + this[kState] |= (kHasWritable | kWritable); } else { - this.state |= kHasWritable; - this.state &= ~kWritable; + this[kState] |= kHasWritable; + this[kState] &= ~kWritable; } }, }, @@ -228,12 +230,12 @@ ObjectDefineProperties(WritableState.prototype, { defaultEncoding: { __proto__: null, enumerable: false, - get() { return (this.state & kDefaultUTF8Encoding) !== 0 ? 'utf8' : this[kDefaultEncodingValue]; }, + get() { return (this[kState] & kDefaultUTF8Encoding) !== 0 ? 'utf8' : this[kDefaultEncodingValue]; }, set(value) { if (value === 'utf8' || value === 'utf-8') { - this.state |= kDefaultUTF8Encoding; + this[kState] |= kDefaultUTF8Encoding; } else { - this.state &= ~kDefaultUTF8Encoding; + this[kState] &= ~kDefaultUTF8Encoding; this[kDefaultEncodingValue] = value; } }, @@ -243,13 +245,13 @@ ObjectDefineProperties(WritableState.prototype, { writecb: { __proto__: null, enumerable: false, - get() { return (this.state & kWriteCb) !== 0 ? this[kWriteCbValue] : nop; }, + get() { return (this[kState] & kWriteCb) !== 0 ? this[kWriteCbValue] : nop; }, set(value) { if (value) { this[kWriteCbValue] = value; - this.state |= kWriteCb; + this[kState] |= kWriteCb; } else { - this.state &= ~kWriteCb; + this[kState] &= ~kWriteCb; } }, }, @@ -259,13 +261,13 @@ ObjectDefineProperties(WritableState.prototype, { afterWriteTickInfo: { __proto__: null, enumerable: false, - get() { return (this.state & kAfterWriteTickInfo) !== 0 ? this[kAfterWriteTickInfoValue] : null; }, + get() { return (this[kState] & kAfterWriteTickInfo) !== 0 ? this[kAfterWriteTickInfoValue] : null; }, set(value) { if (value) { this[kAfterWriteTickInfoValue] = value; - this.state |= kAfterWriteTickInfo; + this[kState] |= kAfterWriteTickInfo; } else { - this.state &= ~kAfterWriteTickInfo; + this[kState] &= ~kAfterWriteTickInfo; } }, }, @@ -282,10 +284,10 @@ function WritableState(options, stream, isDuplex) { // Bit map field to store WritableState more effciently with 1 bit per field // instead of a V8 slot per field. - this.state = kSync | kConstructed | kEmitClose | kAutoDestroy; + this[kState] = kSync | kConstructed | kEmitClose | kAutoDestroy; - if (options && options.objectMode) this.state |= kObjectMode; - if (isDuplex && options && options.writableObjectMode) this.state |= kObjectMode; + if (options && options.objectMode) this[kState] |= kObjectMode; + if (isDuplex && options && options.writableObjectMode) this[kState] |= kObjectMode; // The point at which write() starts returning false // Note: 0 is a valid value, means that we always return false if @@ -294,22 +296,22 @@ function WritableState(options, stream, isDuplex) { getHighWaterMark(this, options, 'writableHighWaterMark', isDuplex) : getDefaultHighWaterMark(false); - if (!options || options.decodeStrings !== false) this.state |= kDecodeStrings; + if (!options || options.decodeStrings !== false) this[kState] |= kDecodeStrings; // Should close be emitted on destroy. Defaults to true. - if (options && options.emitClose === false) this.state &= ~kEmitClose; + if (options && options.emitClose === false) this[kState] &= ~kEmitClose; // Should .destroy() be called after 'end' (and potentially 'finish'). - if (options && options.autoDestroy === false) this.state &= ~kAutoDestroy; + if (options && options.autoDestroy === false) this[kState] &= ~kAutoDestroy; // Crypto is kind of old and crusty. Historically, its default string // encoding is 'binary' so we have to make this configurable. // Everything else in the universe uses 'utf8', though. const defaultEncoding = options?.defaultEncoding; if (defaultEncoding == null || defaultEncoding === 'utf8' || defaultEncoding === 'utf-8') { - this.state |= kDefaultUTF8Encoding; + this[kState] |= kDefaultUTF8Encoding; } else if (Buffer.isEncoding(defaultEncoding)) { - this.state &= ~kDefaultUTF8Encoding; + this[kState] &= ~kDefaultUTF8Encoding; this[kDefaultEncodingValue] = defaultEncoding; } else { throw new ERR_UNKNOWN_ENCODING(defaultEncoding); @@ -339,7 +341,7 @@ function WritableState(options, stream, isDuplex) { function resetBuffer(state) { state.buffered = []; state.bufferedIndex = 0; - state.state |= kAllBuffers | kAllNoop; + state[kState] |= kAllBuffers | kAllNoop; } WritableState.prototype.getBuffer = function getBuffer() { @@ -424,10 +426,10 @@ function _write(stream, chunk, encoding, cb) { if (typeof encoding === 'function') { cb = encoding; - encoding = (state.state & kDefaultUTF8Encoding) !== 0 ? 'utf8' : state.defaultEncoding; + encoding = (state[kState] & kDefaultUTF8Encoding) !== 0 ? 'utf8' : state.defaultEncoding; } else { if (!encoding) - encoding = (state.state & kDefaultUTF8Encoding) !== 0 ? 'utf8' : state.defaultEncoding; + encoding = (state[kState] & kDefaultUTF8Encoding) !== 0 ? 'utf8' : state.defaultEncoding; else if (encoding !== 'buffer' && !Buffer.isEncoding(encoding)) throw new ERR_UNKNOWN_ENCODING(encoding); if (typeof cb !== 'function') @@ -436,9 +438,9 @@ function _write(stream, chunk, encoding, cb) { if (chunk === null) { throw new ERR_STREAM_NULL_VALUES(); - } else if ((state.state & kObjectMode) === 0) { + } else if ((state[kState] & kObjectMode) === 0) { if (typeof chunk === 'string') { - if ((state.state & kDecodeStrings) !== 0) { + if ((state[kState] & kDecodeStrings) !== 0) { chunk = Buffer.from(chunk, encoding); encoding = 'buffer'; } @@ -454,9 +456,9 @@ function _write(stream, chunk, encoding, cb) { } let err; - if ((state.state & kEnding) !== 0) { + if ((state[kState] & kEnding) !== 0) { err = new ERR_STREAM_WRITE_AFTER_END(); - } else if ((state.state & kDestroyed) !== 0) { + } else if ((state[kState] & kDestroyed) !== 0) { err = new ERR_STREAM_DESTROYED('write'); } @@ -476,7 +478,7 @@ Writable.prototype.write = function(chunk, encoding, cb) { Writable.prototype.cork = function() { const state = this._writableState; - state.state |= kCorked; + state[kState] |= kCorked; state.corked++; }; @@ -487,10 +489,10 @@ Writable.prototype.uncork = function() { state.corked--; if (!state.corked) { - state.state &= ~kCorked; + state[kState] &= ~kCorked; } - if ((state.state & kWriting) === 0) + if ((state[kState] & kWriting) === 0) clearBuffer(this, state); } }; @@ -509,7 +511,7 @@ Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) { // in the queue, and wait our turn. Otherwise, call _write // If we return false, then we need a drain event, so set that flag. function writeOrBuffer(stream, state, chunk, encoding, callback) { - const len = (state.state & kObjectMode) !== 0 ? 1 : chunk.length; + const len = (state[kState] & kObjectMode) !== 0 ? 1 : chunk.length; state.length += len; @@ -518,30 +520,30 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) { // We must ensure that previous needDrain will not be reset to false. if (!ret) { - state.state |= kNeedDrain; + state[kState] |= kNeedDrain; } - if ((state.state & (kWriting | kErrored | kCorked | kConstructed)) !== kConstructed) { + if ((state[kState] & (kWriting | kErrored | kCorked | kConstructed)) !== kConstructed) { state.buffered.push({ chunk, encoding, callback }); - if ((state.state & kAllBuffers) !== 0 && encoding !== 'buffer') { - state.state &= ~kAllBuffers; + if ((state[kState] & kAllBuffers) !== 0 && encoding !== 'buffer') { + state[kState] &= ~kAllBuffers; } - if ((state.state & kAllNoop) !== 0 && callback !== nop) { - state.state &= ~kAllNoop; + if ((state[kState] & kAllNoop) !== 0 && callback !== nop) { + state[kState] &= ~kAllNoop; } } else { state.writelen = len; if (callback !== nop) { state.writecb = callback; } - state.state |= kWriting | kSync | kExpectWriteCb; + state[kState] |= kWriting | kSync | kExpectWriteCb; stream._write(chunk, encoding, state.onwrite); - state.state &= ~kSync; + state[kState] &= ~kSync; } // Return false if errored or destroyed in order to break // any synchronous while(stream.write(data)) loops. - return ret && (state.state & (kDestroyed | kErrored)) === 0; + return ret && (state[kState] & (kDestroyed | kErrored)) === 0; } function doWrite(stream, state, writev, len, chunk, encoding, cb) { @@ -549,14 +551,14 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) { if (cb !== nop) { state.writecb = cb; } - state.state |= kWriting | kSync | kExpectWriteCb; - if ((state.state & kDestroyed) !== 0) + state[kState] |= kWriting | kSync | kExpectWriteCb; + if ((state[kState] & kDestroyed) !== 0) state.onwrite(new ERR_STREAM_DESTROYED('write')); else if (writev) stream._writev(chunk, state.onwrite); else stream._write(chunk, encoding, state.onwrite); - state.state &= ~kSync; + state[kState] &= ~kSync; } function onwriteError(stream, state, er, cb) { @@ -575,15 +577,15 @@ function onwriteError(stream, state, er, cb) { function onwrite(stream, er) { const state = stream._writableState; - if ((state.state & kExpectWriteCb) === 0) { + if ((state[kState] & kExpectWriteCb) === 0) { errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK()); return; } - const sync = (state.state & kSync) !== 0; - const cb = (state.state & kWriteCb) !== 0 ? state[kWriteCbValue] : nop; + const sync = (state[kState] & kSync) !== 0; + const cb = (state[kState] & kWriteCb) !== 0 ? state[kWriteCbValue] : nop; - state.state &= ~(kWriting | kExpectWriteCb | kWriteCb); + state[kState] &= ~(kWriting | kExpectWriteCb | kWriteCb); state.length -= state.writelen; state.writelen = 0; @@ -617,9 +619,9 @@ function onwrite(stream, er) { // rather just increase a counter, to improve performance and avoid // memory allocations. if (cb === nop) { - if ((state.state & kAfterWritePending) === 0) { + if ((state[kState] & kAfterWritePending) === 0) { process.nextTick(afterWrite, stream, state, 1, cb); - state.state |= kAfterWritePending; + state[kState] |= kAfterWritePending; } else { state.pendingcb -= 1; } @@ -629,7 +631,7 @@ function onwrite(stream, er) { } else { state.afterWriteTickInfo = { count: 1, cb, stream, state }; process.nextTick(afterWriteTick, state.afterWriteTickInfo); - state.state |= kAfterWritePending; + state[kState] |= kAfterWritePending; } } else { afterWrite(stream, state, 1, cb); @@ -643,11 +645,11 @@ function afterWriteTick({ stream, state, count, cb }) { } function afterWrite(stream, state, count, cb) { - state.state &= ~kAfterWritePending; + state[kState] &= ~kAfterWritePending; - const needDrain = (state.state & (kEnding | kNeedDrain | kDestroyed)) === kNeedDrain && state.length === 0; + const needDrain = (state[kState] & (kEnding | kNeedDrain | kDestroyed)) === kNeedDrain && state.length === 0; if (needDrain) { - state.state &= ~kNeedDrain; + state[kState] &= ~kNeedDrain; stream.emit('drain'); } @@ -656,7 +658,7 @@ function afterWrite(stream, state, count, cb) { cb(null); } - if ((state.state & kDestroyed) !== 0) { + if ((state[kState] & kDestroyed) !== 0) { errorBuffer(state); } @@ -665,13 +667,13 @@ function afterWrite(stream, state, count, cb) { // If there's something in the buffer waiting, then invoke callbacks. function errorBuffer(state) { - if ((state.state & kWriting) !== 0) { + if ((state[kState] & kWriting) !== 0) { return; } for (let n = state.bufferedIndex; n < state.buffered.length; ++n) { const { chunk, callback } = state.buffered[n]; - const len = (state.state & kObjectMode) !== 0 ? 1 : chunk.length; + const len = (state[kState] & kObjectMode) !== 0 ? 1 : chunk.length; state.length -= len; callback(state.errored ?? new ERR_STREAM_DESTROYED('write')); } @@ -684,12 +686,12 @@ function errorBuffer(state) { // If there's something in the buffer waiting, then process it. function clearBuffer(stream, state) { - if ((state.state & (kDestroyed | kBufferProcessing | kCorked)) !== 0 || - (state.state & kConstructed) === 0) { + if ((state[kState] & (kDestroyed | kBufferProcessing | kCorked)) !== 0 || + (state[kState] & kConstructed) === 0) { return; } - const objectMode = (state.state & kObjectMode) !== 0; + const objectMode = (state[kState] & kObjectMode) !== 0; const { buffered, bufferedIndex } = state; const bufferedLength = buffered.length - bufferedIndex; @@ -699,20 +701,20 @@ function clearBuffer(stream, state) { let i = bufferedIndex; - state.state |= kBufferProcessing; + state[kState] |= kBufferProcessing; if (bufferedLength > 1 && stream._writev) { state.pendingcb -= bufferedLength - 1; - const callback = (state.state & kAllNoop) !== 0 ? nop : (err) => { + const callback = (state[kState] & kAllNoop) !== 0 ? nop : (err) => { for (let n = i; n < buffered.length; ++n) { buffered[n].callback(err); } }; // Make a copy of `buffered` if it's going to be used by `callback` above, // since `doWrite` will mutate the array. - const chunks = (state.state & kAllNoop) !== 0 && i === 0 ? + const chunks = (state[kState] & kAllNoop) !== 0 && i === 0 ? buffered : ArrayPrototypeSlice(buffered, i); - chunks.allBuffers = (state.state & kAllBuffers) !== 0; + chunks.allBuffers = (state[kState] & kAllBuffers) !== 0; doWrite(stream, state, true, state.length, chunks, '', callback); @@ -723,7 +725,7 @@ function clearBuffer(stream, state) { buffered[i++] = null; const len = objectMode ? 1 : chunk.length; doWrite(stream, state, false, len, chunk, encoding, callback); - } while (i < buffered.length && (state.state & kWriting) === 0); + } while (i < buffered.length && (state[kState] & kWriting) === 0); if (i === buffered.length) { resetBuffer(state); @@ -734,7 +736,7 @@ function clearBuffer(stream, state) { state.bufferedIndex = i; } } - state.state &= ~kBufferProcessing; + state[kState] &= ~kBufferProcessing; } Writable.prototype._write = function(chunk, encoding, cb) { @@ -769,36 +771,36 @@ Writable.prototype.end = function(chunk, encoding, cb) { } // .end() fully uncorks. - if ((state.state & kCorked) !== 0) { + if ((state[kState] & kCorked) !== 0) { state.corked = 1; this.uncork(); } if (err) { // Do nothing... - } else if ((state.state & (kEnding | kErrored)) === 0) { + } else if ((state[kState] & (kEnding | kErrored)) === 0) { // This is forgiving in terms of unnecessary calls to end() and can hide // logic errors. However, usually such errors are harmless and causing a // hard error can be disproportionately destructive. It is not always // trivial for the user to determine whether end() needs to be called // or not. - state.state |= kEnding; + state[kState] |= kEnding; finishMaybe(this, state, true); - state.state |= kEnded; - } else if ((state.state & kFinished) !== 0) { + state[kState] |= kEnded; + } else if ((state[kState] & kFinished) !== 0) { err = new ERR_STREAM_ALREADY_FINISHED('end'); - } else if ((state.state & kDestroyed) !== 0) { + } else if ((state[kState] & kDestroyed) !== 0) { err = new ERR_STREAM_DESTROYED('end'); } if (typeof cb === 'function') { if (err) { process.nextTick(cb, err); - } else if ((state.state & kFinished) !== 0) { + } else if ((state[kState] & kFinished) !== 0) { process.nextTick(cb, null); } else { - state.state |= kOnFinished; + state[kState] |= kOnFinished; state[kOnFinishedValue] ??= []; state[kOnFinishedValue].push(cb); } @@ -810,7 +812,7 @@ Writable.prototype.end = function(chunk, encoding, cb) { function needFinish(state) { return ( // State is ended && constructed but not destroyed, finished, writing, errorEmitted or closedEmitted - (state.state & ( + (state[kState] & ( kEnding | kDestroyed | kConstructed | @@ -837,9 +839,9 @@ function callFinal(stream, state) { state.pendingcb--; if (err) { callFinishedCallbacks(state, err); - errorOrDestroy(stream, err, (state.state & kSync) !== 0); + errorOrDestroy(stream, err, (state[kState] & kSync) !== 0); } else if (needFinish(state)) { - state.state |= kPrefinished; + state[kState] |= kPrefinished; stream.emit('prefinish'); // Backwards compat. Don't check state.sync here. // Some streams assume 'finish' will be emitted @@ -849,7 +851,7 @@ function callFinal(stream, state) { } } - state.state |= kSync; + state[kState] |= kSync; state.pendingcb++; try { @@ -858,16 +860,16 @@ function callFinal(stream, state) { onFinish(err); } - state.state &= ~kSync; + state[kState] &= ~kSync; } function prefinish(stream, state) { - if ((state.state & (kPrefinished | kFinalCalled)) === 0) { - if (typeof stream._final === 'function' && (state.state & kDestroyed) === 0) { - state.state |= kFinalCalled; + if ((state[kState] & (kPrefinished | kFinalCalled)) === 0) { + if (typeof stream._final === 'function' && (state[kState] & kDestroyed) === 0) { + state[kState] |= kFinalCalled; callFinal(stream, state); } else { - state.state |= kPrefinished; + state[kState] |= kPrefinished; stream.emit('prefinish'); } } @@ -896,13 +898,13 @@ function finishMaybe(stream, state, sync) { function finish(stream, state) { state.pendingcb--; - state.state |= kFinished; + state[kState] |= kFinished; callFinishedCallbacks(state, null); stream.emit('finish'); - if ((state.state & kAutoDestroy) !== 0) { + if ((state[kState] & kAutoDestroy) !== 0) { // In case of duplex streams we need a way to detect // if the readable side is ready for autoDestroy as well. const rState = stream._readableState; @@ -919,13 +921,13 @@ function finish(stream, state) { } function callFinishedCallbacks(state, err) { - if ((state.state & kOnFinished) === 0) { + if ((state[kState] & kOnFinished) === 0) { return; } const onfinishCallbacks = state[kOnFinishedValue]; state[kOnFinishedValue] = null; - state.state &= ~kOnFinished; + state[kState] &= ~kOnFinished; for (let i = 0; i < onfinishCallbacks.length; i++) { onfinishCallbacks[i](err); } @@ -935,21 +937,21 @@ ObjectDefineProperties(Writable.prototype, { closed: { __proto__: null, get() { - return this._writableState ? (this._writableState.state & kClosed) !== 0 : false; + return this._writableState ? (this._writableState[kState] & kClosed) !== 0 : false; }, }, destroyed: { __proto__: null, get() { - return this._writableState ? (this._writableState.state & kDestroyed) !== 0 : false; + return this._writableState ? (this._writableState[kState] & kDestroyed) !== 0 : false; }, set(value) { // Backward compatibility, the user is explicitly managing destroyed. if (!this._writableState) return; - if (value) this._writableState.state |= kDestroyed; - else this._writableState.state &= ~kDestroyed; + if (value) this._writableState[kState] |= kDestroyed; + else this._writableState[kState] &= ~kDestroyed; }, }, @@ -962,7 +964,7 @@ ObjectDefineProperties(Writable.prototype, { // Compat. The user might manually disable writable side through // deprecated setter. return !!w && w.writable !== false && !w.errored && - (w.state & (kEnding | kEnded | kDestroyed)) === 0; + (w[kState] & (kEnding | kEnded | kDestroyed)) === 0; }, set(val) { // Backwards compatible. @@ -976,7 +978,7 @@ ObjectDefineProperties(Writable.prototype, { __proto__: null, get() { const state = this._writableState; - return state ? (state.state & kFinished) !== 0 : false; + return state ? (state[kState] & kFinished) !== 0 : false; }, }, @@ -984,11 +986,8 @@ ObjectDefineProperties(Writable.prototype, { __proto__: null, get() { const state = this._writableState; - return state ? (state.state & kObjectMode) !== 0 : false; + return state ? (state[kState] & kObjectMode) !== 0 : false; }, - }, - - writableBuffer: { __proto__: null, get() { const state = this._writableState; @@ -1000,7 +999,7 @@ ObjectDefineProperties(Writable.prototype, { __proto__: null, get() { const state = this._writableState; - return state ? (state.state & kEnding) !== 0 : false; + return state ? (state[kState] & kEnding) !== 0 : false; }, }, @@ -1008,15 +1007,7 @@ ObjectDefineProperties(Writable.prototype, { __proto__: null, get() { const state = this._writableState; - return state ? (state.state & (kDestroyed | kEnding | kNeedDrain)) === kNeedDrain : false; - }, - }, - - writableHighWaterMark: { - __proto__: null, - get() { - const state = this._writableState; - return state && state.highWaterMark; + return state ? (state[kState] & (kDestroyed | kEnding | kNeedDrain)) === kNeedDrain : false; }, }, @@ -1050,9 +1041,9 @@ ObjectDefineProperties(Writable.prototype, { get: function() { const state = this._writableState; return ( - (state.state & (kHasWritable | kWritable)) !== kHasWritable && - (state.state & (kDestroyed | kErrored)) !== 0 && - (state.state & kFinished) === 0 + (state[kState] & (kHasWritable | kWritable)) !== kHasWritable && + (state[kState] & (kDestroyed | kErrored)) !== 0 && + (state[kState] & kFinished) === 0 ); }, }, @@ -1063,9 +1054,9 @@ Writable.prototype.destroy = function(err, cb) { const state = this._writableState; // Invoke pending callbacks. - if ((state.state & kDestroyed) === 0 && + if ((state[kState] & kDestroyed) === 0 && (state.bufferedIndex < state.buffered.length || - (state.state & kOnFinished) !== 0)) { + (state[kState] & kOnFinished) !== 0)) { process.nextTick(errorBuffer, state); }