Skip to content

Commit 85f49ea

Browse files
committed
stream: add fast-path for readable streams
1 parent 7675749 commit 85f49ea

File tree

2 files changed

+57
-2
lines changed

2 files changed

+57
-2
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const Readable = require('stream').Readable;
5+
6+
const BASE = 'hello world\n\n';
7+
8+
const bench = common.createBenchmark(main, {
9+
encoding: ['utf-8', 'latin1'],
10+
len: [256, 512, 1024 * 16],
11+
op: ['unshift', 'push'],
12+
n: [1e3]
13+
});
14+
15+
function main({ n, encoding, len, op }) {
16+
const b = BASE.repeat(len);
17+
const s = new Readable({
18+
objectMode: false,
19+
});
20+
21+
bench.start();
22+
switch (op) {
23+
case 'unshift': {
24+
for (let i = 0; i < n; i++)
25+
s.unshift(b, encoding);
26+
break;
27+
}
28+
case 'push': {
29+
for (let i = 0; i < n; i++)
30+
s.push(b, encoding);
31+
break;
32+
}
33+
}
34+
bench.end(n);
35+
}

lib/internal/streams/readable.js

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@ const {
3232
Promise,
3333
SafeSet,
3434
SymbolAsyncIterator,
35-
Symbol
35+
Symbol,
36+
TypedArrayPrototypeGetBuffer,
37+
TypedArrayPrototypeGetByteOffset,
38+
TypedArrayPrototypeGetByteLength,
3639
} = primordials;
3740

3841
module.exports = Readable;
@@ -42,6 +45,8 @@ const EE = require('events');
4245
const { Stream, prependListener } = require('internal/streams/legacy');
4346
const { Buffer } = require('buffer');
4447

48+
const { TextEncoder } = require('internal/encoding');
49+
4550
const {
4651
addAbortSignal,
4752
} = require('internal/streams/add-abort-signal');
@@ -73,6 +78,9 @@ const kPaused = Symbol('kPaused');
7378

7479
const { StringDecoder } = require('string_decoder');
7580
const from = require('internal/streams/from');
81+
const { normalizeEncoding } = require('internal/util');
82+
const { FastBuffer } = require('internal/buffer');
83+
const encoder = new TextEncoder();
7684

7785
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
7886
ObjectSetPrototypeOf(Readable, Stream);
@@ -251,9 +259,21 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
251259
if (addToFront && state.encoding) {
252260
// When unshifting, if state.encoding is set, we have to save
253261
// the string in the BufferList with the state encoding.
262+
254263
chunk = Buffer.from(chunk, encoding).toString(state.encoding);
255264
} else {
256-
chunk = Buffer.from(chunk, encoding);
265+
const enc = normalizeEncoding(encoding);
266+
267+
if (enc === 'utf8') {
268+
const buf = encoder.encode(chunk);
269+
chunk = new FastBuffer(
270+
TypedArrayPrototypeGetBuffer(buf),
271+
TypedArrayPrototypeGetByteOffset(buf),
272+
TypedArrayPrototypeGetByteLength(buf),
273+
);
274+
} else {
275+
chunk = Buffer.from(chunk, enc);
276+
}
257277
encoding = '';
258278
}
259279
}

0 commit comments

Comments
 (0)