Skip to content

Commit

Permalink
perf: optimize Writable
Browse files Browse the repository at this point in the history
PR-URL: #50012
  • Loading branch information
ronag committed Oct 2, 2023
1 parent 85c09f1 commit 2ea8885
Showing 1 changed file with 18 additions and 9 deletions.
27 changes: 18 additions & 9 deletions lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ const kWriteCb = 1 << 26;
const kExpectWriteCb = 1 << 27;
const kAfterWriteTickInfo = 1 << 28;
const kAfterWritePending = 1 << 29;
const kEmptyBuffer = 1 << 30;

// TODO(benjamingr) it is likely slower to do it this way than with free functions
function makeBitMapDescriptor(bit) {
Expand Down Expand Up @@ -339,7 +340,7 @@ function WritableState(options, stream, isDuplex) {
function resetBuffer(state) {
state.buffered = [];
state.bufferedIndex = 0;
state.state |= kAllBuffers | kAllNoop;
state.state |= kAllBuffers | kAllNoop | kEmptyBuffer;
}

WritableState.prototype.getBuffer = function getBuffer() {
Expand Down Expand Up @@ -523,6 +524,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {

if ((state.state & (kWriting | kErrored | kCorked | kConstructed)) !== kConstructed) {
state.buffered.push({ chunk, encoding, callback });
state.state &= ~kEmptyBuffer;
if ((state.state & kAllBuffers) !== 0 && encoding !== 'buffer') {
state.state &= ~kAllBuffers;
}
Expand Down Expand Up @@ -591,8 +593,9 @@ function onwrite(stream, er) {
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
er.stack; // eslint-disable-line no-unused-expressions

if (!state.errored) {
state.errored = er;
if ((state.state & kErrored) === 0) {
state[kErroredValue] = er;
state.state |= kErrored;
}

// In case of duplex streams we need to notify the readable side of the
Expand All @@ -607,12 +610,12 @@ function onwrite(stream, er) {
onwriteError(stream, state, er, cb);
}
} else {
if (state.buffered.length > state.bufferedIndex) {
if ((state.state & kEmptyBuffer) === 0) {
clearBuffer(stream, state);
}

if (sync) {
const needDrain = state.length === 0 && (state.state & kNeedDrain) !== 0;
const needDrain = (state.state & kNeedDrain) !== 0 && state.length === 0;
const needTick = needDrain || (state.state & kDestroyed !== 0) || cb !== nop;

// It is a common case that the callback passed to .write() is always
Expand All @@ -625,7 +628,9 @@ function onwrite(stream, er) {
state.state |= kAfterWritePending;
} else {
state.pendingcb--;
finishMaybe(stream, state, true);
if ((state.state & kEnding) !== 0) {
finishMaybe(stream, state, true);
}
}
} else if ((state.state & kAfterWriteTickInfo) !== 0 &&
state[kAfterWriteTickInfoValue].cb === cb) {
Expand All @@ -636,7 +641,9 @@ function onwrite(stream, er) {
state.state |= (kAfterWritePending | kAfterWriteTickInfo);
} else {
state.pendingcb--;
finishMaybe(stream, state, true);
if ((state.state & kEnding) !== 0) {
finishMaybe(stream, state, true);
}
}
} else {
afterWrite(stream, state, 1, cb);
Expand Down Expand Up @@ -668,7 +675,9 @@ function afterWrite(stream, state, count, cb) {
errorBuffer(state);
}

finishMaybe(stream, state);
if ((state.state & kEnding) !== 0) {
finishMaybe(stream, state, true);
}
}

// If there's something in the buffer waiting, then invoke callbacks.
Expand All @@ -692,7 +701,7 @@ function errorBuffer(state) {

// If there's something in the buffer waiting, then process it.
function clearBuffer(stream, state) {
if ((state.state & (kDestroyed | kBufferProcessing | kCorked)) !== 0 ||
if ((state.state & (kDestroyed | kBufferProcessing | kCorked | kEmptyBuffer)) !== 0 ||
(state.state & kConstructed) === 0) {
return;
}
Expand Down

0 comments on commit 2ea8885

Please sign in to comment.