Skip to content

Commit

Permalink
stream: fix readable state awaitDrain increase in recursion
Browse files Browse the repository at this point in the history
PR-URL: #27572
Reviewed-By: Anna Henningsen <anna@addaleax.net>
  • Loading branch information
abbshr authored and addaleax committed Aug 26, 2019
1 parent 627bf59 commit 698a294
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 36 deletions.
65 changes: 51 additions & 14 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,10 @@ function ReadableState(options, stream, isDuplex) {
// Everything else in the universe uses 'utf8', though.
this.defaultEncoding = options.defaultEncoding || 'utf8';

// The number of writers that are awaiting a drain event in .pipe()s
this.awaitDrain = 0;
// Ref the piped dest which we need a drain event on it
// type: null | Writable | Set<Writable>
this.awaitDrainWriters = null;
this.multiAwaitDrain = false;

// If true, a maybeReadMore has been scheduled
this.readingMore = false;
Expand Down Expand Up @@ -310,7 +312,13 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {

function addChunk(stream, state, chunk, addToFront) {
if (state.flowing && state.length === 0 && !state.sync) {
state.awaitDrain = 0;
// Use the guard to avoid creating `Set()` repeatedly
// when we have multiple pipes.
if (state.multiAwaitDrain) {
state.awaitDrainWriters.clear();
} else {
state.awaitDrainWriters = null;
}
stream.emit('data', chunk);
} else {
// Update the buffer info.
Expand Down Expand Up @@ -511,7 +519,11 @@ Readable.prototype.read = function(n) {
n = 0;
} else {
state.length -= n;
state.awaitDrain = 0;
if (state.multiAwaitDrain) {
state.awaitDrainWriters.clear();
} else {
state.awaitDrainWriters = null;
}
}

if (state.length === 0) {
Expand Down Expand Up @@ -656,6 +668,15 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
const src = this;
const state = this._readableState;

if (state.pipes.length === 1) {
if (!state.multiAwaitDrain) {
state.multiAwaitDrain = true;
state.awaitDrainWriters = new Set(
state.awaitDrainWriters ? [state.awaitDrainWriters] : []
);
}
}

state.pipes.push(dest);
debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts);

Expand Down Expand Up @@ -709,7 +730,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
// flowing again.
// So, if this is awaiting a drain, then we just call it now.
// If we don't know, then assume that we are waiting for one.
if (ondrain && state.awaitDrain &&
if (ondrain && state.awaitDrainWriters &&
(!dest._writableState || dest._writableState.needDrain))
ondrain();
}
Expand All @@ -724,16 +745,22 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
// to get stuck in a permanently paused state if that write
// also returned false.
// => Check whether `dest` is still a piping destination.
if (state.pipes.length > 0 && state.pipes.includes(dest) && !cleanedUp) {
debug('false write response, pause', state.awaitDrain);
state.awaitDrain++;
if (!cleanedUp) {
if (state.pipes.length === 1 && state.pipes[0] === dest) {
debug('false write response, pause', 0);
state.awaitDrainWriters = dest;
state.multiAwaitDrain = false;
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
debug('false write response, pause', state.awaitDrainWriters.size);
state.awaitDrainWriters.add(dest);
}
}
if (!ondrain) {
// When the dest drains, it reduces the awaitDrain counter
// on the source. This would be more elegant with a .once()
// handler in flow(), but adding and removing repeatedly is
// too slow.
ondrain = pipeOnDrain(src);
ondrain = pipeOnDrain(src, dest);
dest.on('drain', ondrain);
}
src.pause();
Expand Down Expand Up @@ -783,13 +810,23 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
return dest;
};

function pipeOnDrain(src) {
function pipeOnDrain(src, dest) {
return function pipeOnDrainFunctionResult() {
const state = src._readableState;
debug('pipeOnDrain', state.awaitDrain);
if (state.awaitDrain)
state.awaitDrain--;
if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {

// `ondrain` will call directly,
// `this` maybe not a reference to dest,
// so we use the real dest here.
if (state.awaitDrainWriters === dest) {
debug('pipeOnDrain', 1);
state.awaitDrainWriters = null;
} else if (state.multiAwaitDrain) {
debug('pipeOnDrain', state.awaitDrainWriters.size);
state.awaitDrainWriters.delete(dest);
}

if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) &&
EE.listenerCount(src, 'data')) {
state.flowing = true;
flow(src);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
'use strict';
const common = require('../common');
const { PassThrough } = require('stream');

const encode = new PassThrough({
highWaterMark: 1
});

const decode = new PassThrough({
highWaterMark: 1
});

const send = common.mustCall((buf) => {
encode.write(buf);
}, 4);

let i = 0;
const onData = common.mustCall(() => {
if (++i === 2) {
send(Buffer.from([0x3]));
send(Buffer.from([0x4]));
}
}, 4);

encode.pipe(decode).on('data', onData);

send(Buffer.from([0x1]));
send(Buffer.from([0x2]));
25 changes: 13 additions & 12 deletions test/parallel/test-stream-pipe-await-drain-manual-resume.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ readable.pipe(writable);

readable.once('pause', common.mustCall(() => {
assert.strictEqual(
readable._readableState.awaitDrain,
1,
'Expected awaitDrain to equal 1 but instead got ' +
`${readable._readableState.awaitDrain}`
readable._readableState.awaitDrainWriters,
writable,
'Expected awaitDrainWriters to be a Writable but instead got ' +
`${readable._readableState.awaitDrainWriters}`
);
// First pause, resume manually. The next write() to writable will still
// return false, because chunks are still being buffered, so it will increase
Expand All @@ -43,10 +43,10 @@ readable.once('pause', common.mustCall(() => {

readable.once('pause', common.mustCall(() => {
assert.strictEqual(
readable._readableState.awaitDrain,
1,
'.resume() should not reset the counter but instead got ' +
`${readable._readableState.awaitDrain}`
readable._readableState.awaitDrainWriters,
writable,
'.resume() should not reset the awaitDrainWriters, but instead got ' +
`${readable._readableState.awaitDrainWriters}`
);
// Second pause, handle all chunks from now on. Once all callbacks that
// are currently queued up are handled, the awaitDrain drain counter should
Expand All @@ -65,10 +65,11 @@ readable.push(null);

writable.on('finish', common.mustCall(() => {
assert.strictEqual(
readable._readableState.awaitDrain,
0,
'awaitDrain should equal 0 after all chunks are written but instead got' +
`${readable._readableState.awaitDrain}`
readable._readableState.awaitDrainWriters,
null,
`awaitDrainWriters should be reset to null
after all chunks are written but instead got
${readable._readableState.awaitDrainWriters}`
);
// Everything okay, all chunks were written.
}));
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ const assert = require('assert');
const writable = new stream.Writable({
write: common.mustCall(function(chunk, encoding, cb) {
assert.strictEqual(
readable._readableState.awaitDrain,
0
readable._readableState.awaitDrainWriters,
null,
);

if (chunk.length === 32 * 1024) { // first chunk
readable.push(Buffer.alloc(34 * 1024)); // above hwm
// We should check if awaitDrain counter is increased in the next
// tick, because awaitDrain is incremented after this method finished
process.nextTick(() => {
assert.strictEqual(readable._readableState.awaitDrain, 1);
assert.strictEqual(readable._readableState.awaitDrainWriters, writable);
});
}

Expand Down
12 changes: 6 additions & 6 deletions test/parallel/test-stream-pipe-await-drain.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ writer1._write = common.mustCall(function(chunk, encoding, cb) {

writer1.once('chunk-received', () => {
assert.strictEqual(
reader._readableState.awaitDrain,
reader._readableState.awaitDrainWriters.size,
0,
'awaitDrain initial value should be 0, actual is ' +
reader._readableState.awaitDrain
reader._readableState.awaitDrainWriters
);
setImmediate(() => {
// This one should *not* get through to writer1 because writer2 is not
Expand All @@ -39,10 +39,10 @@ writer1.once('chunk-received', () => {
// A "slow" consumer:
writer2._write = common.mustCall((chunk, encoding, cb) => {
assert.strictEqual(
reader._readableState.awaitDrain,
reader._readableState.awaitDrainWriters.size,
1,
'awaitDrain should be 1 after first push, actual is ' +
reader._readableState.awaitDrain
reader._readableState.awaitDrainWriters
);
// Not calling cb here to "simulate" slow stream.
// This should be called exactly once, since the first .write() call
Expand All @@ -51,10 +51,10 @@ writer2._write = common.mustCall((chunk, encoding, cb) => {

writer3._write = common.mustCall((chunk, encoding, cb) => {
assert.strictEqual(
reader._readableState.awaitDrain,
reader._readableState.awaitDrainWriters.size,
2,
'awaitDrain should be 2 after second push, actual is ' +
reader._readableState.awaitDrain
reader._readableState.awaitDrainWriters
);
// Not calling cb here to "simulate" slow stream.
// This should be called exactly once, since the first .write() call
Expand Down
1 change: 0 additions & 1 deletion test/parallel/test-stream2-basic.js
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,6 @@ class TestWriter extends EE {
assert.strictEqual(v, null);

const w = new R();

w.write = function(buffer) {
written = true;
assert.strictEqual(ended, false);
Expand Down

0 comments on commit 698a294

Please sign in to comment.