Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Jun 21, 2020
1 parent eebec50 commit fbe7a31
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 18 deletions.
19 changes: 12 additions & 7 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ const {
module.exports = Writable;

const Stream = require('stream');
const { WritableBase } = require('internal/streams/base');
const {
WritableBase,
WritableStateBase,
errorOrDestroy
} = require('internal/streams/base');

const {
getHighWaterMark,
Expand All @@ -54,7 +58,7 @@ const {
function nop() {}

const kFlush = Symbol('kFlush');
class WritableState extends WritableBase.WritableState {
class WritableState extends WritableStateBase {
constructor(options, stream, isDuplex) {
super(options);

Expand Down Expand Up @@ -203,11 +207,12 @@ function Writable(options) {

WritableBase.call(this, {
...options,
start: (stream, state) => {
start: function() {
const state = this._writableState;
if (!state.writing) {
clearBuffer(stream, state);
clearBuffer(this, state);
}
finishMaybe(stream, state);
finishMaybe(this, state);
},
write: writeOrBuffer,
flush: function(state, cb) {
Expand Down Expand Up @@ -305,7 +310,7 @@ function onwriteError(stream, state, er, cb) {
// writes.
errorBuffer(state, new ERR_STREAM_DESTROYED('write'));
// This can emit error, but error must always follow cb.
WritableBase.errorOrDestroy(stream, er);
errorOrDestroy(stream, er);
}

function onwrite(stream, er) {
Expand All @@ -314,7 +319,7 @@ function onwrite(stream, er) {
const cb = state.writecb;

if (typeof cb !== 'function') {
WritableBase.errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
return;
}

Expand Down
21 changes: 15 additions & 6 deletions lib/internal/streams/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const {
ERR_STREAM_CANNOT_PIPE,
ERR_STREAM_DESTROYED,
ERR_STREAM_ALREADY_FINISHED,
ERR_MULTIPLE_CALLBACK,
ERR_STREAM_NULL_VALUES,
ERR_STREAM_WRITE_AFTER_END,
ERR_UNKNOWN_ENCODING
Expand Down Expand Up @@ -74,6 +75,10 @@ class WritableStateBase {

// Indicates whether the stream has finished destroying.
this.closed = false;

// True if close has been emitted or would have been emitted
// depending on emitClose.
this.closeEmitted = false;
}
}

Expand All @@ -98,8 +103,6 @@ function WritableBase(options, isDuplex, State) {
destroyImpl.construct(this, options.start);
}
WritableBase.prototype = ObjectCreate(Stream.prototype);
WritableBase.errorOrDestroy = errorOrDestroy;
WritableBase.WritableState = WritableStateBase;
WritableBase.prototype.write = function(chunk, encoding, cb) {
const state = this._writableState;

Expand All @@ -108,6 +111,8 @@ WritableBase.prototype.write = function(chunk, encoding, cb) {
encoding = state.defaultEncoding;
} else if (!encoding) {
encoding = state.defaultEncoding;
} else if (encoding !== 'buffer' && !Buffer.isEncoding(encoding)) {
throw new ERR_UNKNOWN_ENCODING(encoding);
}

if (chunk === null) {
Expand Down Expand Up @@ -178,8 +183,10 @@ WritableBase.prototype.end = function(chunk, encoding, cb) {
encoding = null;
}

if (chunk !== null && chunk !== undefined)
if (chunk !== null && chunk !== undefined) {
// TODO (ronag): Propagate callback error.
this.write(chunk, encoding);
}

// This is forgiving in terms of unnecessary calls to end() and can hide
// logic errors. However, usually such errors are harmless and causing a
Expand All @@ -191,7 +198,7 @@ WritableBase.prototype.end = function(chunk, encoding, cb) {
let called = false;
this[kFlush](state, (err) => {
if (called) {
// TODO(ronag): ERR_MULTIPLE_CALLBACK?
errorOrDestroy(this, new ERR_MULTIPLE_CALLBACK());
return;
}
called = true;
Expand Down Expand Up @@ -225,7 +232,7 @@ WritableBase.prototype.end = function(chunk, encoding, cb) {
function finish(stream, state) {
// TODO(ronag): state.closed, state.errored, state.destroyed?

if (state.errorEmitted)
if (state.errorEmitted || state.closeEmitted)
return;

// TODO(ronag): This could occur after 'close' is emitted.
Expand Down Expand Up @@ -329,5 +336,7 @@ ObjectDefineProperties(WritableBase.prototype, {
});

module.exports = {
WritableBase
WritableBase,
WritableStateBase,
errorOrDestroy
};
10 changes: 5 additions & 5 deletions test/parallel/test-stream-writable-write-error.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ function test(autoDestroy) {
{
const w = new Writable({
autoDestroy,
_write() {}
write() {}
});
w.end();
expectError(w, ['asd'], 'ERR_STREAM_WRITE_AFTER_END');
Expand All @@ -40,23 +40,23 @@ function test(autoDestroy) {
{
const w = new Writable({
autoDestroy,
_write() {}
write() {}
});
w.destroy();
}

{
const w = new Writable({
autoDestroy,
_write() {}
write() {}
});
expectError(w, [null], 'ERR_STREAM_NULL_VALUES', true);
}

{
const w = new Writable({
autoDestroy,
_write() {}
write() {}
});
expectError(w, [{}], 'ERR_INVALID_ARG_TYPE', true);
}
Expand All @@ -65,7 +65,7 @@ function test(autoDestroy) {
const w = new Writable({
decodeStrings: false,
autoDestroy,
_write() {}
write() {}
});
expectError(w, ['asd', 'noencoding'], 'ERR_UNKNOWN_ENCODING', true);
}
Expand Down

0 comments on commit fbe7a31

Please sign in to comment.