Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: support decoding buffers for Writables #7425

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions benchmark/streams/writable-simple.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
'use strict';

const common = require('../common');
const Writable = require('stream').Writable;

// Benchmark parameter matrix handed to common.createBenchmark() together
// with main(). main() reads the values as follows:
//   n             - number of write() calls that are timed.
//   inputType     - 'buffer' writes a Buffer built from inputStr,
//                   'string' writes inputStr itself.
//   inputEncoding - encoding used to build the Buffer, or passed to write()
//                   alongside the string.
//   decodeAs      - '' leaves `decodeBuffers` off; any other value turns it
//                   on and is used as the stream's `defaultEncoding`.
// NOTE(review): presumably createBenchmark runs main() once per combination
// of these values - confirm against ../common.
const bench = common.createBenchmark(main, {
n: [50000],
inputType: ['buffer', 'string'],
inputEncoding: ['utf8', 'ucs2'],
decodeAs: ['', 'utf8', 'ucs2']
});

// Payload written on every iteration: mostly CJK characters with some ASCII
// spaces, commas and newlines, so it is not a pure-ASCII input.
const inputStr = `袎逜 釂鱞鸄 碄碆碃 蒰裧頖 鋑, 瞂 覮轀 蔝蓶蓨 踥踕踛 鬐鶤 鄜 忁曨曣
翀胲胵, 鬵鵛嚪 釢髟偛 碞碠粻 漀 涾烰 跬 窱縓 墥墡嬇 禒箈箑, 餤駰 瀁瀎瀊 躆轖轕 蒛, 銙 簎艜薤
樆樦潏 魡鳱 櫱瀯灂 鷜鷙鷵 禒箈箑 綧 駓駗, 鋡 嗛嗕塨 嶭嶴憝 爂犤繵 罫蓱 摮 灉礭蘠 蠬襱覾 脬舑莕
躐鑏, 襆贂 漀 刲匊呥 肒芅邥 泏狔狑, 瀗犡礝 浘涀缹 輲輹 綧`;

/**
 * Benchmark body: times `conf.n` consecutive `write()` calls on a Writable
 * whose `write` implementation completes immediately.
 *
 * @param {Object} conf benchmark configuration
 * @param {number|string} conf.n number of writes to perform
 * @param {string} conf.inputType 'buffer' to write a Buffer, otherwise the
 *   sample string is written
 * @param {string} conf.inputEncoding encoding used to build the Buffer, or
 *   passed along with the string
 * @param {string} conf.decodeAs '' to leave `decodeBuffers` off, otherwise
 *   the encoding used as the stream's `defaultEncoding`
 */
function main(conf) {
  const n = +conf.n;
  const decodeAs = conf.decodeAs;

  // The sink acknowledges every chunk right away.
  const sink = new Writable({
    decodeBuffers: !!decodeAs,
    defaultEncoding: decodeAs || undefined,
    write(chunk, encoding, done) { done(); }
  });

  // Buffers carry no encoding argument; strings are written with theirs.
  let payload;
  let payloadEncoding;
  if (conf.inputType === 'buffer') {
    payload = Buffer.from(inputStr, conf.inputEncoding);
    payloadEncoding = undefined;
  } else {
    payload = inputStr;
    payloadEncoding = conf.inputEncoding;
  }

  bench.start();
  for (let i = 0; i < n; ++i) {
    sink.write(payload, payloadEncoding);
  }
  bench.end(n);
}
3 changes: 3 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1184,6 +1184,9 @@ constructor and implement the `writable._write()` method. The
* `decodeStrings` {Boolean} Whether or not to decode strings into
Buffers before passing them to [`stream._write()`][stream-_write].
Defaults to `true`
* `decodeBuffers` {Boolean} Whether or not to decode Buffers into strings
using the default encoding before passing them to
[`stream._write()`][stream-_write]. Defaults to `false`. Cannot be combined
with `decodeStrings: true`.
* `objectMode` {Boolean} Whether or not the
[`stream.write(anyObj)`][stream-write] is a valid operation. When set,
it becomes possible to write JavaScript values other than string or
Expand Down
68 changes: 57 additions & 11 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const util = require('util');
const internalUtil = require('internal/util');
const Stream = require('stream');
const Buffer = require('buffer').Buffer;
var StringDecoder;

util.inherits(Writable, Stream);

Expand Down Expand Up @@ -46,6 +47,8 @@ function WritableState(options, stream) {
// drain event flag.
this.needDrain = false;
// at the start of calling end()
this.flushing = false;
// at the start of calling endWritable()
this.ending = false;
// when end() has been called, and returned
this.ended = false;
Expand All @@ -55,6 +58,8 @@ function WritableState(options, stream) {
// should we decode strings into buffers before passing to _write?
// this is here so that some node-core streams can optimize string
// handling at a lower level.
// This is confusingly named. For character encodings (like utf8), setting
// decodeStrings to true will *encode* strings as Buffers.
var noDecode = options.decodeStrings === false;
this.decodeStrings = !noDecode;

Expand All @@ -63,6 +68,21 @@ function WritableState(options, stream) {
// Everything else in the universe uses 'utf8', though.
this.defaultEncoding = options.defaultEncoding || 'utf8';

// A StringDecoder instance that is used for decoding incoming Buffers
// if that is desired by the stream implementer, as indicated by the
// `decodeBuffers` option.
this.stringDecoder = null;
if (options.decodeBuffers) {
// Check whether the caller explicitly requested inconsistent options.
if (options.decodeStrings === true) {
throw new Error('decodeBuffers and decodeStrings cannot both be true');
}

if (!StringDecoder)
StringDecoder = require('string_decoder').StringDecoder;
this.stringDecoder = new StringDecoder(this.defaultEncoding);
}

// not an actual buffer we keep track of, but a measurement
// of how much we're waiting to get pushed to some underlying
// socket or file.
Expand Down Expand Up @@ -276,23 +296,43 @@ Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
};

/**
 * Converts a chunk passed to `write()` into the form handed to `_write()`.
 *
 * - In object mode the chunk is returned untouched.
 * - Without a StringDecoder on the state, strings are converted to Buffers
 *   unless `state.decodeStrings` is `false`; other chunks pass through.
 * - With a StringDecoder on the state, the result is the string produced by
 *   the decoder, or the input string itself when it is already in the
 *   decoder's encoding and no partial character is pending.
 *
 * @param {Object} state the stream state; reads `objectMode`,
 *   `decodeStrings`, `stringDecoder` and `flushing`
 * @param {*} chunk the value passed to `write()`
 * @param {string} encoding encoding of `chunk` when it is a string
 * @returns {*} the chunk to deliver
 */
function decodeChunk(state, chunk, encoding) {
  if (state.objectMode)
    return chunk;

  var sd = state.stringDecoder;
  if (typeof chunk === 'string') {
    // A string can only bypass the decoder when nothing is buffered in it;
    // otherwise the pending bytes would be emitted out of order.
    if (sd !== null && encoding === sd.encoding && sd.lastNeed === 0)
      return chunk;  // No re-encoding necessary.

    if (state.decodeStrings !== false || sd !== null)
      chunk = Buffer.from(chunk, encoding);
  }

  if (sd !== null) {
    // Strings were converted above, so the decoder is fed a Buffer here
    // (assumes write() only lets strings and Buffers through outside
    // object mode - TODO confirm against the caller).
    if (state.flushing) {
      // end() also flushes any incomplete trailing character.
      chunk = sd.end(chunk);
    } else {
      chunk = sd.write(chunk);
    }
  }

  return chunk;
}

// if we're already writing something, then just put this
// in the queue, and wait our turn. Otherwise, call _write
// If we return false, then we need a drain event, so set that flag.
function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
if (!isBuf) {
var sd = state.stringDecoder;
if (!isBuf || sd) {
chunk = decodeChunk(state, chunk, encoding);
if (chunk instanceof Buffer)
encoding = 'buffer';
else if (sd)
encoding = sd.encoding;
}

var len = state.objectMode ? 1 : chunk.length;

state.length += len;
Expand Down Expand Up @@ -459,20 +499,26 @@ Writable.prototype._write = function(chunk, encoding, cb) {

Writable.prototype._writev = null;

Writable.prototype.end = function(chunk, encoding, cb) {
const empty = Buffer.alloc(0);

Writable.prototype.end = function(chunk, enc, cb) {
var state = this._writableState;

if (typeof chunk === 'function') {
cb = chunk;
chunk = null;
encoding = null;
} else if (typeof encoding === 'function') {
cb = encoding;
encoding = null;
enc = null;
} else if (typeof enc === 'function') {
cb = enc;
enc = null;
}

state.flushing = true;

if (chunk !== null && chunk !== undefined)
this.write(chunk, encoding);
this.write(chunk, enc);
else if (state.stringDecoder && state.stringDecoder.lastNeed > 0)
this.write(empty);

// .end() fully uncorks
if (state.corked) {
Expand Down
121 changes: 121 additions & 0 deletions test/parallel/test-stream-writable-decode-buffers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
'use strict';
require('../common');
const assert = require('assert');

const stream = require('stream');

/**
 * Writable used by the tests below: it records every chunk handed to
 * `_write()`, together with the encoding it arrived with, and completes
 * each write immediately.
 */
class ChunkStoringWritable extends stream.Writable {
  constructor(options) {
    super(options);

    // Ordered log of `{ data, encoding }` records, one per `_write()` call.
    this.chunks = [];
  }

  _write(chunk, enc, done) {
    const record = { data: chunk, encoding: enc };
    this.chunks.push(record);
    done();
  }
}

// Each scenario below builds a fresh ChunkStoringWritable and compares the
// chunks that reached _write() against the expected decoded strings.

{
  // UTF-8 bytes of '你好' (e4bda0 e5a5bd) split at non-character boundaries:
  // the first write yields an empty string, later writes complete the
  // characters.
  const w = new ChunkStoringWritable({ decodeBuffers: true });
  w.write(Buffer.from('e4bd', 'hex'));
  w.write(Buffer.from('a0e5', 'hex'));
  w.write(Buffer.from('a5bd', 'hex'));
  w.end();

  assert.deepStrictEqual(w.chunks.map((c) => c.data), ['', '你', '好']);
}

{
  // Buffers that each hold one complete character decode one-to-one.
  const w = new ChunkStoringWritable({ decodeBuffers: true });
  w.write(Buffer.from('你', 'utf8'));
  w.write(Buffer.from('好', 'utf8'));
  w.end();

  assert.deepStrictEqual(w.chunks.map((c) => c.data), ['你', '好']);
}

{
  // A lone continuation byte is invalid UTF-8 and decodes to U+FFFD.
  const w = new ChunkStoringWritable({ decodeBuffers: true });
  w.write(Buffer.from('80', 'hex'));
  w.end();

  assert.deepStrictEqual(w.chunks.map((c) => c.data), ['\ufffd']);
}

{
  // An incomplete lead byte is held back on write() and emitted as U+FFFD
  // once end() flushes the decoder.
  const w = new ChunkStoringWritable({ decodeBuffers: true });
  w.write(Buffer.from('c3', 'hex'));
  w.end();

  assert.deepStrictEqual(w.chunks.map((c) => c.data), ['', '\ufffd']);
}

{
  // The decoder follows `defaultEncoding`: UTF-16LE input.
  const w = new ChunkStoringWritable({
    decodeBuffers: true,
    defaultEncoding: 'utf16le'
  });

  w.write(Buffer.from('你好', 'utf16le'));
  w.end();

  assert.deepStrictEqual(w.chunks.map((c) => c.data), ['你好']);
}

{
  // With a base64 default encoding the raw bytes (60 4f 7d 59) come out
  // base64-encoded; the trailing byte is only emitted on end().
  const w = new ChunkStoringWritable({
    decodeBuffers: true,
    defaultEncoding: 'base64'
  });

  w.write(Buffer.from('你好', 'utf16le'));
  w.end();

  assert.deepStrictEqual(w.chunks.map((c) => c.data), ['YE99', 'WQ==']);
}

{
  // A string already in the stream's encoding arrives unchanged.
  const w = new ChunkStoringWritable({
    decodeBuffers: true,
    defaultEncoding: 'utf16le'
  });

  w.write('你好', 'utf16le');
  w.end();

  assert.deepStrictEqual(w.chunks.map((c) => c.data), ['你好']);
}

{
  const w = new ChunkStoringWritable({ decodeBuffers: true });

  w.write(Buffer.from([0x44, 0xc3]));  // Ends on incomplete UTF8.

  // This write should *not* be passed through directly as there's
  // input pending.
  w.write('a');

  w.end(Buffer.from('bc7373656c', 'hex'));

  // U+FFFD is spelled as an escape, like in the scenarios above, so the
  // expectation does not depend on how this file's bytes are decoded.
  assert.deepStrictEqual(w.chunks.map((c) => c.data),
                         ['D', '\ufffda', '\ufffdssel']);
}

{
  // Changing the default encoding after construction does not change the
  // decoder: '换' written as ucs2 is the bytes 62 63, which reach _write()
  // as the utf8 string 'bc'.
  const w = new ChunkStoringWritable({ decodeBuffers: true });

  w.write('a');
  w.setDefaultEncoding('ucs2');
  w.end('换');

  assert.deepStrictEqual(w.chunks, [
    { data: 'a', encoding: 'utf8' },
    { data: 'bc', encoding: 'utf8' }
  ]);
}

// Explicitly requesting both decoding directions is rejected.
assert.throws(() => new stream.Writable({
  decodeBuffers: true,
  decodeStrings: true
}), /decodeBuffers and decodeStrings cannot both be true/);