Skip to content

Commit 4bd401e

Browse files
committed
stream: writable bitfield
1 parent 283e7a4 commit 4bd401e

File tree

1 file changed

+101
-36
lines changed

1 file changed

+101
-36
lines changed

lib/_stream_writable.js

Lines changed: 101 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ function WritableState(options, stream, isDuplex) {
7373
if (typeof isDuplex !== 'boolean')
7474
isDuplex = stream instanceof Stream.Duplex;
7575

76+
this.bitfield = 0;
77+
7678
// Object stream flag to indicate whether or not this stream
7779
// contains buffers or objects.
7880
this.objectMode = !!(options && options.objectMode);
@@ -184,6 +186,73 @@ function WritableState(options, stream, isDuplex) {
184186
this.corkedRequestsFree = corkReq;
185187
}
186188

189+
const F = {};
190+
for (let [ key, options ] of Object.entries({
191+
objectMode: {},
192+
finalCalled: {},
193+
drained: {},
194+
ending: {},
195+
ended: {},
196+
finished: {},
197+
destroyed: {},
198+
decodeStrings: {},
199+
writing: {},
200+
sync: {},
201+
bufferProcessing: {},
202+
prefinished: {},
203+
errorEmitted: {},
204+
emitClose: {},
205+
autoDestroy: {},
206+
errored: {}
207+
})) {
208+
options = options || {}
209+
const mask = 1 << (Object.keys(F).length + 8);
210+
F[key] = mask;
211+
ObjectDefineProperty(WritableState.prototype, key, {
212+
get () {
213+
return !!(this.bitfield & mask);
214+
},
215+
set (val) {
216+
if (val) {
217+
this.bitfield |= mask;
218+
} else {
219+
this.bitfield &= ~mask;
220+
}
221+
}
222+
})
223+
}
224+
225+
ObjectDefineProperty(WritableState.prototype, 'needDrain', {
226+
get () {
227+
return !this.drained;
228+
},
229+
set (val) {
230+
this.drained = !val;
231+
}
232+
});
233+
234+
F.corked = 0xF0;
235+
ObjectDefineProperty(WritableState.prototype, 'corked', {
236+
get () {
237+
return (this.bitfield & F.corked) >> 4;
238+
},
239+
set (val) {
240+
this.bitfield &= ~F.corked;
241+
this.bitfield |= (val << 4) & F.corked;
242+
}
243+
});
244+
245+
F.pendingcb = 0xF;
246+
ObjectDefineProperty(WritableState.prototype, 'pendingcb', {
247+
get () {
248+
return (this.bitfield & F.pendingcb);
249+
},
250+
set (val) {
251+
this.bitfield &= ~F.pendingcb;
252+
this.bitfield |= (val & F.pendingcb);
253+
}
254+
});
255+
187256
WritableState.prototype.getBuffer = function getBuffer() {
188257
var current = this.bufferedRequest;
189258
const out = [];
@@ -294,8 +363,9 @@ function validChunk(stream, state, chunk, cb) {
294363

295364
Writable.prototype.write = function(chunk, encoding, cb) {
296365
const state = this._writableState;
366+
const objectMode = state.bitfield & F.objectMode;
297367
var ret = false;
298-
const isBuf = !state.objectMode && Stream._isUint8Array(chunk);
368+
const isBuf = !objectMode && Stream._isUint8Array(chunk);
299369

300370
// Do not use Object.getPrototypeOf as it is slower since V8 7.3.
301371
if (isBuf && !(chunk instanceof Buffer)) {
@@ -315,12 +385,14 @@ Writable.prototype.write = function(chunk, encoding, cb) {
315385
if (typeof cb !== 'function')
316386
cb = nop;
317387

318-
if (state.ending) {
319-
writeAfterEnd(this, cb);
320-
} else if (state.destroyed) {
321-
const err = new ERR_STREAM_DESTROYED('write');
322-
process.nextTick(cb, err);
323-
errorOrDestroy(this, err);
388+
if (state.bitfield & (F.ending | F.destroyed)) {
389+
if (state.ending) {
390+
writeAfterEnd(this, cb);
391+
} else if (state.destroyed) {
392+
const err = new ERR_STREAM_DESTROYED('write');
393+
process.nextTick(cb, err);
394+
errorOrDestroy(this, err);
395+
}
324396
} else if (isBuf || validChunk(this, state, chunk, cb)) {
325397
state.pendingcb++;
326398
ret = writeOrBuffer(this, state, chunk, encoding, cb);
@@ -401,8 +473,11 @@ ObjectDefineProperty(Writable.prototype, 'writableCorked', {
401473
// in the queue, and wait our turn. Otherwise, call _write
402474
// If we return false, then we need a drain event, so set that flag.
403475
function writeOrBuffer(stream, state, chunk, encoding, cb) {
404-
if (!state.objectMode &&
405-
state.decodeStrings !== false &&
476+
const objectMode = state.bitfield & F.objectMode;
477+
const decodeStrings = state.bitfield & F.decodeStrings;
478+
479+
if (!objectMode &&
480+
decodeStrings &&
406481
encoding !== 'buffer' &&
407482
typeof chunk === 'string') {
408483
chunk = Buffer.from(chunk, encoding);
@@ -415,9 +490,9 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {
415490
const ret = state.length < state.highWaterMark;
416491
// We must ensure that previous needDrain will not be reset to false.
417492
if (!ret)
418-
state.needDrain = true;
493+
state.bitfield &= ~F.drained;
419494

420-
if (state.writing || state.corked || state.errored) {
495+
if (state.bitfield & (F.writing | F.corked | F.errored)) {
421496
var last = state.lastBufferedRequest;
422497
state.lastBufferedRequest = {
423498
chunk,
@@ -437,21 +512,18 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {
437512

438513
// Return false if errored or destroyed in order to break
439514
// any synchronous while(stream.write(data)) loops.
440-
return ret && !state.errored && !state.destroyed;
515+
return ret && !(state.bitfield & (F.errored | F.destroyed));
441516
}
442517

443518
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
444519
state.writelen = len;
445520
state.writecb = cb;
446-
state.writing = true;
447-
state.sync = true;
448-
if (state.destroyed)
449-
state.onwrite(new ERR_STREAM_DESTROYED('write'));
450-
else if (writev)
521+
state.bitfield |= F.writing | F.sync;
522+
if (writev)
451523
stream._writev(chunk, state.onwrite);
452524
else
453525
stream._write(chunk, encoding, state.onwrite);
454-
state.sync = false;
526+
state.bitfield &= ~F.sync;
455527
}
456528

457529
function onwriteError(stream, state, er, cb) {
@@ -464,15 +536,15 @@ function onwriteError(stream, state, er, cb) {
464536

465537
function onwrite(stream, er) {
466538
const state = stream._writableState;
467-
const sync = state.sync;
539+
const sync = state.bitfield & F.sync;
468540
const cb = state.writecb;
469541

470542
if (typeof cb !== 'function') {
471543
errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
472544
return;
473545
}
474546

475-
state.writing = false;
547+
state.bitfield &= ~F.writing;
476548
state.writecb = null;
477549
state.length -= state.writelen;
478550
state.writelen = 0;
@@ -485,12 +557,7 @@ function onwrite(stream, er) {
485557
onwriteError(stream, state, er, cb);
486558
}
487559
} else {
488-
// Check if we're actually ready to finish, but don't emit yet
489-
var finished = needFinish(state) || stream.destroyed;
490-
491-
if (!finished &&
492-
!state.corked &&
493-
!state.bufferProcessing &&
560+
if (!(state.bitfield & (F.corked | F.bufferProcessing)) &&
494561
state.bufferedRequest) {
495562
clearBuffer(stream, state);
496563
}
@@ -519,10 +586,10 @@ function afterWriteTick({ stream, state, count, cb }) {
519586
}
520587

521588
function afterWrite(stream, state, count, cb) {
522-
const needDrain = !state.ending && !stream.destroyed && state.length === 0 &&
523-
state.needDrain;
589+
const needDrain = !(state.bitfield & (F.ending | F.destroyed | F.drained)) &&
590+
state.length === 0;
524591
if (needDrain) {
525-
state.needDrain = false;
592+
state.bitfield |= F.drained;
526593
stream.emit('drain');
527594
}
528595

@@ -536,7 +603,7 @@ function afterWrite(stream, state, count, cb) {
536603

537604
// If there's something in the buffer waiting, then process it
538605
function clearBuffer(stream, state) {
539-
state.bufferProcessing = true;
606+
state.bitfield |= F.bufferProcessing;
540607
var entry = state.bufferedRequest;
541608

542609
if (stream._writev && entry && entry.next) {
@@ -597,7 +664,7 @@ function clearBuffer(stream, state) {
597664
}
598665

599666
state.bufferedRequest = entry;
600-
state.bufferProcessing = false;
667+
state.bitfield &= ~F.bufferProcessing;
601668
}
602669

603670
Writable.prototype._write = function(chunk, encoding, cb) {
@@ -656,12 +723,10 @@ ObjectDefineProperty(Writable.prototype, 'writableLength', {
656723
});
657724

658725
function needFinish(state) {
659-
return (state.ending &&
726+
return ((state.bitfield & F.ending) &&
727+
!(state.bitfield & (F.errored | F.finished | F.writing)) &&
660728
state.length === 0 &&
661-
!state.errored &&
662-
state.bufferedRequest === null &&
663-
!state.finished &&
664-
!state.writing);
729+
state.bufferedRequest === null);
665730
}
666731

667732
function callFinal(stream, state) {

0 commit comments

Comments
 (0)