Skip to content

Commit

Permalink
streams: set default hwm properly for Duplex
Browse files Browse the repository at this point in the history
Default highWaterMark is now set properly when using stream Duplex's
writableObjectMode and readableObjectMode options.

Added condition to the already existing split objectMode test to ensure
the highWaterMark is being set to the correct default value on both the
ReadableState and WritableState for readableObjectMode and
writableObjectMode.

Signed-off-by: Fedor Indutny <fedor@indutny.com>
  • Loading branch information
oppenlander authored and indutny committed Jul 10, 2014
1 parent 832d4db commit e1fec22
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 17 deletions.
17 changes: 8 additions & 9 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,17 @@ util.inherits(Readable, Stream);
function ReadableState(options, stream) {
options = options || {};

// object stream flag. Used to make read(n) ignore n and to
// make all the buffer merging and length checks go away
this.objectMode = !!options.objectMode;

if (stream instanceof Stream.Duplex)
this.objectMode = this.objectMode || !!options.readableObjectMode;

// the point at which it stops calling _read() to fill the buffer
// Note: 0 is a valid value, means "don't call _read preemptively ever"
var hwm = options.highWaterMark;
var defaultHwm = options.objectMode ? 16 : 16 * 1024;
var defaultHwm = this.objectMode ? 16 : 16 * 1024;
this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;

// cast to ints.
Expand All @@ -63,14 +70,6 @@ function ReadableState(options, stream) {
this.emittedReadable = false;
this.readableListening = false;


// object stream flag. Used to make read(n) ignore n and to
// make all the buffer merging and length checks go away
this.objectMode = !!options.objectMode;

if (stream instanceof Stream.Duplex)
this.objectMode = this.objectMode || !!options.readableObjectMode;

// Crypto is kind of old and crusty. Historically, its default string
// encoding is 'binary' so we have to make this configurable.
// Everything else in the universe uses 'utf8', though.
Expand Down
14 changes: 7 additions & 7 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,20 @@ function WriteReq(chunk, encoding, cb) {
function WritableState(options, stream) {
options = options || {};

// the point at which write() starts returning false
// Note: 0 is a valid value, means that we always return false if
// the entire buffer is not flushed immediately on write()
var hwm = options.highWaterMark;
var defaultHwm = options.objectMode ? 16 : 16 * 1024;
this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;

// object stream flag to indicate whether or not this stream
// contains buffers or objects.
this.objectMode = !!options.objectMode;

if (stream instanceof Stream.Duplex)
this.objectMode = this.objectMode || !!options.writableObjectMode;

// the point at which write() starts returning false
// Note: 0 is a valid value, means that we always return false if
// the entire buffer is not flushed immediately on write()
var hwm = options.highWaterMark;
var defaultHwm = this.objectMode ? 16 : 16 * 1024;
this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;

// cast to ints.
this.highWaterMark = ~~this.highWaterMark;

Expand Down
6 changes: 5 additions & 1 deletion test/simple/test-stream-transform-split-objectmode.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ var parser = new Transform({ readableObjectMode : true });

assert(parser._readableState.objectMode);
assert(!parser._writableState.objectMode);
assert(parser._readableState.highWaterMark === 16);
assert(parser._writableState.highWaterMark === (16 * 1024));

parser._transform = function (chunk, enc, callback) {
callback(null, { val : chunk[0] });
Expand All @@ -50,10 +52,12 @@ var serializer = new Transform({ writableObjectMode : true });

assert(!serializer._readableState.objectMode);
assert(serializer._writableState.objectMode);
assert(serializer._readableState.highWaterMark === (16 * 1024));
assert(serializer._writableState.highWaterMark === 16);

serializer._transform = function (obj, _, callback) {
callback(null, new Buffer([obj.val]));
}
};

var serialized;

Expand Down

0 comments on commit e1fec22

Please sign in to comment.