From 03d941637d191aeb84a46dacb00c422947c8834a Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 23 Oct 2023 17:03:00 +0200 Subject: [PATCH] streams: use Array for Readable buffer --- benchmark/streams/readable-bigread.js | 2 +- lib/internal/streams/readable.js | 136 +++++++++++++++++++++----- 2 files changed, 115 insertions(+), 23 deletions(-) diff --git a/benchmark/streams/readable-bigread.js b/benchmark/streams/readable-bigread.js index 0d963c6803299e..8b36f49d7ffa53 100644 --- a/benchmark/streams/readable-bigread.js +++ b/benchmark/streams/readable-bigread.js @@ -15,7 +15,7 @@ function main({ n }) { bench.start(); for (let k = 0; k < n; ++k) { - for (let i = 0; i < 1e4; ++i) + for (let i = 0; i < 1e3; ++i) s.push(b); while (s.read(128)); } diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index a129b1b6f4b75d..831a8601295fc0 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -73,6 +73,7 @@ const { const { validateObject } = require('internal/validators'); const kState = Symbol('kState'); +const FastBuffer = Buffer[Symbol.species]; const { StringDecoder } = require('string_decoder'); const from = require('internal/streams/from'); @@ -278,7 +279,8 @@ function ReadableState(options, stream, isDuplex) { // A linked list is used to store data chunks instead of an array because the // linked list can remove elements from the beginning faster than // array.shift(). - this.buffer = new BufferList(); + this.buffer = []; + this.bufferIndex = 0; this.length = 0; this.pipes = []; @@ -546,10 +548,15 @@ function addChunk(stream, state, chunk, addToFront) { } else { // Update the buffer info. state.length += (state[kState] & kObjectMode) !== 0 ? 
1 : chunk.length;
-    if (addToFront)
-      state.buffer.unshift(chunk);
-    else
+    if (addToFront) {
+      if (state.bufferIndex > 0) {
+        state.buffer[--state.bufferIndex] = chunk;
+      } else {
+        state.buffer.unshift(chunk); // Slow path
+      }
+    } else {
       state.buffer.push(chunk);
+    }
 
     if ((state[kState] & kNeedReadable) !== 0)
       emitReadable(stream);
@@ -564,21 +571,24 @@ Readable.prototype.isPaused = function() {
 
 // Backwards compatibility.
 Readable.prototype.setEncoding = function(enc) {
+  const state = this._readableState;
+
   const decoder = new StringDecoder(enc);
-  this._readableState.decoder = decoder;
+  state.decoder = decoder;
   // If setEncoding(null), decoder.encoding equals utf8.
-  this._readableState.encoding = this._readableState.decoder.encoding;
+  state.encoding = state.decoder.encoding;
 
-  const buffer = this._readableState.buffer;
   // Iterate over current buffer to convert already stored Buffers:
   let content = '';
-  for (const data of buffer) {
+  for (const data of state.buffer.slice(state.bufferIndex)) {
     content += decoder.write(data);
   }
-  buffer.clear();
+  state.buffer.length = 0;
+  state.bufferIndex = 0;
+
   if (content !== '')
-    buffer.push(content);
-  this._readableState.length = content.length;
+    state.buffer.push(content);
+  state.length = content.length;
   return this;
 };
 
@@ -611,7 +621,7 @@ function howMuchToRead(n, state) {
   if (NumberIsNaN(n)) {
     // Only flow one buffer at a time.
     if ((state[kState] & kFlowing) !== 0 && state.length)
-      return state.buffer.first().length;
+      return state.buffer[state.bufferIndex].length;
     return state.length;
   }
   if (n <= state.length)
@@ -1550,20 +1560,102 @@ function fromList(n, state) {
     return null;
 
   let ret;
-  if (state.objectMode)
-    ret = state.buffer.shift();
-  else if (!n || n >= state.length) {
+  if ((state[kState] & kObjectMode) !== 0) {
+    ret = state.buffer[state.bufferIndex++];
+  } else if (!n || n >= state.length) {
     // Read it all, truncate the list.
-    if (state.decoder)
-      ret = state.buffer.join('');
-    else if (state.buffer.length === 1)
-      ret = state.buffer.first();
-    else
-      ret = state.buffer.concat(state.length);
-    state.buffer.clear();
+    if ((state[kState] & kDecoder) !== 0) {
+      ret = '';
+      for (let n = state.bufferIndex; n < state.buffer.length; n++) {
+        ret += state.buffer[n];
+      }
+    } else if (state.buffer.length - state.bufferIndex === 0) {
+      ret = Buffer.alloc(0);
+    } else if (state.buffer.length - state.bufferIndex === 1) {
+      ret = state.buffer[state.bufferIndex];
+    } else {
+      ret = Buffer.allocUnsafe(state.length);
+      let i = 0;
+      for (let n = state.bufferIndex; n < state.buffer.length; n++) {
+        const data = state.buffer[n];
+        ret.set(data, i);
+        i += data.length;
+      }
+    }
+    state.buffer.length = 0;
+    state.bufferIndex = 0;
   } else {
     // read part of list.
-    ret = state.buffer.consume(n, state.decoder);
+
+    const data = state.buffer[state.bufferIndex];
+
+    if (n < data.length) {
+      // `slice` is the same for buffers and strings.
+      const slice = data.slice(0, n);
+      state.buffer[state.bufferIndex] = data.slice(n);
+      return slice;
+    }
+
+    if (n === data.length) {
+      // First chunk is a perfect match.
+      return state.buffer[state.bufferIndex++];
+    }
+
+    let idx = state.bufferIndex;
+    let buf = state.buffer;
+    const len = buf.length;
+
+    if ((state[kState] & kDecoder) !== 0) {
+      ret = '';
+      while (idx < state.buffer.length) {
+        const str = buf[idx];
+        if (n > str.length) {
+          ret += str;
+          n -= str.length;
+          idx++;
+        } else {
+          if (n === str.length) {
+            ret += str;
+            idx++;
+          } else {
+            ret += str.slice(0, n);
+            buf[idx] = str.slice(n);
+          }
+          break;
+        }
+      }
+    } else {
+      ret = Buffer.allocUnsafe(n);
+
+      const retLen = n;
+      while (idx < len) {
+        const data = buf[idx];
+        if (n > data.length) {
+          ret.set(data, retLen - n);
+          n -= data.length;
+          idx++;
+        } else {
+          if (n === data.length) {
+            ret.set(data, retLen - n);
+            idx++;
+          } else {
+            ret.set(new FastBuffer(data.buffer, data.byteOffset, n), retLen - n);
+            buf[idx] = new FastBuffer(data.buffer, data.byteOffset + n, data.length - n);
+          }
+          break;
+        }
+      }
+    }
+
+    if (idx === buf.length) {
+      state.buffer.length = 0;
+      state.bufferIndex = 0;
+    } else if (idx > 1024) {
+      state.buffer.splice(0, idx);
+      state.bufferIndex = 0;
+    } else {
+      state.bufferIndex = idx;
+    }
   }
 
   return ret;