From 7abc61f668255fc8ac829a71afcbddcf02a1281c Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 15 Dec 2019 12:19:16 +0100 Subject: [PATCH] stream: refactor Writable buffering Refactors buffering in Writable to use an array instead of a linked list. PR-URL: https://github.com/nodejs/node/pull/31046 Reviewed-By: Ruben Bridgewater Reviewed-By: Denys Otrishko Reviewed-By: Matteo Collina --- lib/_stream_writable.js | 220 ++++++++++++++++------------------------ 1 file changed, 88 insertions(+), 132 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 399c27617d17c8..1d02e2ff8e27f6 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -26,7 +26,6 @@ 'use strict'; const { - Array, FunctionPrototype, ObjectDefineProperty, ObjectDefineProperties, @@ -150,8 +149,7 @@ function WritableState(options, stream, isDuplex) { // synchronous _write() completion. this.afterWriteTickInfo = null; - this.bufferedRequest = null; - this.lastBufferedRequest = null; + resetBuffer(this); // Number of pending user-supplied write callbacks // this must be 0 before 'finish' can be emitted @@ -177,27 +175,25 @@ function WritableState(options, stream, isDuplex) { // Indicates whether the stream has finished destroying. this.closed = false; +} - // Count buffered requests - this.bufferedRequestCount = 0; - - // Allocate the first CorkedRequest, there is always - // one allocated and free to use, and we maintain at most two - const corkReq = { next: null, entry: null, finish: undefined }; - corkReq.finish = onCorkedFinish.bind(undefined, corkReq, this); - this.corkedRequestsFree = corkReq; +function resetBuffer(state) { + state.buffered = []; + state.bufferedIndex = 0; + state.allBuffers = true; + state.allNoop = true; } WritableState.prototype.getBuffer = function getBuffer() { - let current = this.bufferedRequest; - const out = []; - while (current) { - out.push(current); - current = current.next; - } - return out; + return this.buffered.slice(this.bufferedIndex); }; +ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', { + get() { + return this.buffered.length - this.bufferedIndex; + } +}); + // Test _writableState for inheritance to account for Duplex streams, // whose prototype chain only points to Readable. let realHasInstance; @@ -318,10 +314,7 @@ Writable.prototype.uncork = function() { if (state.corked) { state.corked--; - if (!state.writing && - !state.corked && - !state.bufferProcessing && - state.bufferedRequest) + if (!state.writing) clearBuffer(this, state); } }; @@ -339,7 +332,7 @@ Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) { // If we're already writing something, then just put this // in the queue, and wait our turn. Otherwise, call _write // If we return false, then we need a drain event, so set that flag. -function writeOrBuffer(stream, state, chunk, encoding, cb) { +function writeOrBuffer(stream, state, chunk, encoding, callback) { const len = state.objectMode ? 1 : chunk.length; state.length += len; @@ -350,22 +343,16 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) { state.needDrain = true; if (state.writing || state.corked || state.errored) { - const last = state.lastBufferedRequest; - state.lastBufferedRequest = { - chunk, - encoding, - callback: cb, - next: null - }; - if (last) { - last.next = state.lastBufferedRequest; - } else { - state.bufferedRequest = state.lastBufferedRequest; + state.buffered.push({ chunk, encoding, callback }); + if (state.allBuffers && encoding !== 'buffer') { + state.allBuffers = false; + } + if (state.allNoop && callback !== nop) { + state.allNoop = false; } - state.bufferedRequestCount += 1; } else { state.writelen = len; - state.writecb = cb; + state.writecb = callback; state.writing = true; state.sync = true; stream._write(chunk, encoding, state.onwrite); @@ -427,30 +414,27 @@ function onwrite(stream, er) { onwriteError(stream, state, er, cb); } } else { - // Check if we're actually ready to finish, but don't emit yet - const finished = needFinish(state) || stream.destroyed; - - if (!finished && - !state.corked && - !state.bufferProcessing && - state.bufferedRequest) { + if (!state.destroyed) { clearBuffer(stream, state); } - - if (sync) { - // It is a common case that the callback passed to .write() is always - // the same. In that case, we do not schedule a new nextTick(), but rather - // just increase a counter, to improve performance and avoid memory - // allocations. - if (state.afterWriteTickInfo !== null && - state.afterWriteTickInfo.cb === cb) { - state.afterWriteTickInfo.count++; + if (state.needDrain || cb !== nop || state.ending || state.destroyed) { + if (sync) { + // It is a common case that the callback passed to .write() is always + // the same. In that case, we do not schedule a new nextTick(), but + // rather just increase a counter, to improve performance and avoid + // memory allocations. + if (state.afterWriteTickInfo !== null && + state.afterWriteTickInfo.cb === cb) { + state.afterWriteTickInfo.count++; + } else { + state.afterWriteTickInfo = { count: 1, cb, stream, state }; + process.nextTick(afterWriteTick, state.afterWriteTickInfo); + } } else { - state.afterWriteTickInfo = { count: 1, cb, stream, state }; - process.nextTick(afterWriteTick, state.afterWriteTickInfo); + afterWrite(stream, state, 1, cb); } } else { - afterWrite(stream, state, 1, cb); + state.pendingcb--; } } } @@ -482,83 +466,69 @@ function afterWrite(stream, state, count, cb) { // If there's something in the buffer waiting, then invoke callbacks. function errorBuffer(state, err) { - if (state.writing || !state.bufferedRequest) { + if (state.writing) { return; } - for (let entry = state.bufferedRequest; entry; entry = entry.next) { - const len = state.objectMode ? 1 : entry.chunk.length; + for (let n = state.bufferedIndex; n < state.buffered.length; ++n) { + const { chunk, callback } = state.buffered[n]; + const len = state.objectMode ? 1 : chunk.length; state.length -= len; - entry.callback(err); + callback(err); } - state.bufferedRequest = null; - state.lastBufferedRequest = null; - state.bufferedRequestCount = 0; + + resetBuffer(state); } // If there's something in the buffer waiting, then process it function clearBuffer(stream, state) { + if (state.corked || state.bufferProcessing) { + return; + } + + const { buffered, bufferedIndex, objectMode } = state; + const bufferedLength = buffered.length - bufferedIndex; + + if (!bufferedLength) { + return; + } + + let i = bufferedIndex; + state.bufferProcessing = true; - let entry = state.bufferedRequest; - - if (stream._writev && entry && entry.next) { - // Fast case, write everything using _writev() - const l = state.bufferedRequestCount; - const buffer = new Array(l); - const holder = state.corkedRequestsFree; - holder.entry = entry; - - let count = 0; - let allBuffers = true; - while (entry) { - buffer[count] = entry; - if (entry.encoding !== 'buffer') - allBuffers = false; - entry = entry.next; - count += 1; - } - buffer.allBuffers = allBuffers; + if (bufferedLength > 1 && stream._writev) { + state.pendingcb -= bufferedLength - 1; + + const callback = state.allNoop ? nop : (err) => { + for (let n = i; n < buffered.length; ++n) { + buffered[n].callback(err); + } + }; + // Make a copy of `buffered` if it's going to be used by `callback` above, + // since `doWrite` will mutate the array. + const chunks = state.allNoop && i === 0 ? buffered : buffered.slice(i); + chunks.allBuffers = state.allBuffers; - doWrite(stream, state, true, state.length, buffer, '', holder.finish); + doWrite(stream, state, true, state.length, chunks, '', callback); - // doWrite is almost always async, defer these to save a bit of time - // as the hot path ends with doWrite - state.pendingcb++; - state.lastBufferedRequest = null; - if (holder.next) { - state.corkedRequestsFree = holder.next; - holder.next = null; - } else { - const corkReq = { next: null, entry: null, finish: undefined }; - corkReq.finish = onCorkedFinish.bind(undefined, corkReq, state); - state.corkedRequestsFree = corkReq; - } - state.bufferedRequestCount = 0; + resetBuffer(state); } else { - // Slow case, write chunks one-by-one - while (entry) { - const chunk = entry.chunk; - const encoding = entry.encoding; - const cb = entry.callback; - const len = state.objectMode ? 1 : chunk.length; - - doWrite(stream, state, false, len, chunk, encoding, cb); - entry = entry.next; - state.bufferedRequestCount--; - // If we didn't call the onwrite immediately, then - // it means that we need to wait until it does. - // also, that means that the chunk and cb are currently - // being processed, so move the buffer counter past them. - if (state.writing) { - break; - } + do { + const { chunk, encoding, callback } = buffered[i]; + buffered[i++] = null; + const len = objectMode ? 1 : chunk.length; + doWrite(stream, state, false, len, chunk, encoding, callback); + } while (i < buffered.length && !state.writing); + + if (i === buffered.length) { + resetBuffer(state); + } else if (i > 256) { + buffered.splice(0, i); + state.bufferedIndex = 0; + } else { + state.bufferedIndex = i; } - - if (entry === null) - state.lastBufferedRequest = null; } - - state.bufferedRequest = entry; state.bufferProcessing = false; } @@ -622,7 +592,7 @@ function needFinish(state) { return (state.ending && state.length === 0 && !state.errored && - state.bufferedRequest === null && + state.buffered.length === 0 && !state.finished && !state.writing); } @@ -693,20 +663,6 @@ function finish(stream, state) { } } -function onCorkedFinish(corkReq, state, err) { - let entry = corkReq.entry; - corkReq.entry = null; - while (entry) { - const cb = entry.callback; - state.pendingcb--; - cb(err); - entry = entry.next; - } - - // Reuse the free corkReq. - state.corkedRequestsFree.next = corkReq; -} - // TODO(ronag): Avoid using events to implement internal logic. function onFinished(stream, state, cb) { function onerror(err) {