stream: fix pipeline(readable, Duplex.from(<async-iterator>), cb) not triggering data or calling the cb #45534

Conversation
readable.pipe(Duplex.from(<async-iterator>)) not triggering data
readable.on('pipe', readable.resume.bind(readable));
readable.on('unpipe', readable.pause.bind(readable));
Spent over 5 hours just for these 2 lines 😬
I don't think this is the correct fix for this issue. resume and pause should be automatically handled by the machinery during pipe and unpipe.
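For reference, here is a minimal, self-contained sketch (my own example, not code from the PR) of the behaviour being described: piping alone should switch the source into flowing mode, with no manual resume()/pause() wiring.

```js
// Minimal sketch (assumed example, not the PR's code): pipe() alone should
// start the flow; no explicit resume()/pause() listeners are needed.
const { Readable, Writable } = require('node:stream');

const src = Readable.from(['a', 'b', 'c']);
const dst = new Writable({
  write(chunk, encoding, callback) {
    console.log('wrote', chunk.toString());
    callback();
  },
});

// No 'data' listener and no src.resume() here; pipe() switches src into
// flowing mode by itself.
src.pipe(dst);
dst.on('finish', () => console.log('done'));
```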
Why do you think so?
@ronag wdyt?
Yea, this is wrong.
readable.on('pipe', readable.resume.bind(readable));
readable.on('unpipe', readable.pause.bind(readable));
On destroy, are those listeners getting cleaned up?
No, I don't think so.
This is incorrect. I'll have a deeper look at this issue next week.
@ronag Maybe I can try to fix it; I want to contribute. Can you explain why it's incorrect, and maybe provide a hint about where I should focus? Is it because it's extending Readable that it shouldn't have this? The problem was that, for some reason, nothing triggered the data to start flowing.
assert.strictEqual(chunk, data[i++]);
}, 4);
Suggested change:
- assert.strictEqual(chunk, data[i++]);
- }, 4);
+ assert.strictEqual(chunk, data.shift());
+ }, data.length);
I'll have a look and try to explain it to you.
changed the title from readable.pipe(Duplex.from(<async-iterator>)) not triggering data to pipeline(readable, Duplex.from(<async-iterator>), cb) not triggering data or calling the cb

changed the title to readable.pipe(Duplex.from(<async-iterator>)) should not trigger data
    yield chunk;
  }
}));
}
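For context, here is a self-contained sketch of the scenario this fragment appears to exercise (the data and variable names are assumptions, not the PR's actual test): a Readable piped into Duplex.from(<async generator>), where the question is whether data ever starts flowing.

```js
// Assumed sketch of the scenario under discussion, not the PR's actual test:
// pipe a Readable into a duplex built from an async generator and observe
// whether chunks come out the other side.
const { Readable, Duplex } = require('node:stream');

const readable = Readable.from(['a', 'b', 'c', 'd']);

const duplex = Duplex.from(async function* (source) {
  for await (const chunk of source) {
    yield chunk; // pass-through transform
  }
});

readable.pipe(duplex);

// Whether these ever fire is exactly what the PR and the linked issue debate.
duplex.on('data', (chunk) => console.log('data:', chunk));
duplex.on('end', () => console.log('end'));
```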
I don't understand this test?
To avoid repeating my mistake of starting the data flow before the 'data' event listener is attached.
The flow should start here, and the callback should be called.
That's right; this is why cb = common.mustNotCall();
I don't understand. You are testing the opposite of what we want?
No, we did not attach a 'data' listener, so the stream should not start...
The 'data' listener should not be relevant here. You should not have to worry about that.
pipe should start the stream regardless.
That's what I thought at first. @mcollina WDYT?
Do any of these tests fail without:

readable.on('pipe', readable.resume.bind(readable));
readable.on('unpipe', readable.pause.bind(readable));

I think the best way forward would be for you to contribute failing tests, and then we can try to fix them.
Yes, they fail; otherwise I wouldn't add them. I first write the tests and then fix the problem.

For the pipeline duplex test:

For the pipe test: the problem is that nothing triggers the data to start flowing.
const cb = common.mustCall((chunk) => {
  assert.strictEqual(chunk, data.shift());
}, data.length);
Actually, this should be called only once, right? Because the highWaterMark for the duplex stream created from an async generator function is 1:

node/lib/internal/streams/from.js, line 41 in 61805fb:

highWaterMark: 1,
And according to the docs:

"The readable.pipe() method attaches a Writable stream to the readable, causing it to switch automatically into flowing mode and push all of its data to the attached Writable. The flow of data will be automatically managed so that the destination Writable stream is not overwhelmed by a faster Readable stream."

And because no 'data' callback exists here, we reached the maximum buffer size.

WDYT @ronag?
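As a general illustration of the backpressure argument being made here (using a plain object-mode Writable with highWaterMark: 1, not the actual Duplex.from internals), write() starts returning false as soon as a single chunk is buffered, which is what makes the piping machinery pause the source:

```js
// General backpressure illustration (assumed example, not Duplex.from's
// internals): with objectMode and highWaterMark: 1, write() returns false
// once a single chunk is buffered, signalling pipe() to pause the source.
const { Writable } = require('node:stream');

const slow = new Writable({
  objectMode: true,
  highWaterMark: 1,
  write(chunk, encoding, callback) {
    setTimeout(callback, 100); // slow consumer, so chunks pile up
  },
});

console.log(slow.write('a')); // false: buffered length has reached highWaterMark
console.log(slow.write('b')); // false: still at or above the mark, wait for 'drain'
```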
So the first case:

pipeline(
  Readable.from(['a', 'b', 'c', 'd']),
  Duplex.from(async function*(stream) {
    console.error("###start")
    for await (const chunk of stream) {
      yield chunk;
    }
  }),
  common.mustCall(),
).on('error', common.mustNotCall());

Here the last stream is still readable, so pipeline() will return a readable stream. So the correct test looks like this:

const readable = pipeline(
  Readable.from(['a', 'b', 'c', 'd']),
  Duplex.from(async function*(stream) {
    console.error("###start")
    for await (const chunk of stream) {
      yield chunk;
    }
  }),
  common.mustCall(),
).on('error', common.mustNotCall());

readable.resume();

Which works as expected. Maybe improved docs would be appropriate?
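An alternative (my own sketch, not something proposed in the thread) is to give pipeline() a final consumer stage, so it drains the duplex's readable side itself and no manual resume() is needed:

```js
// Alternative sketch (assumption, not from the thread): add a final consumer
// stage so pipeline() drains the duplex's readable side and calls the callback
// without any manual resume().
const { pipeline, Readable, Duplex } = require('node:stream');

pipeline(
  Readable.from(['a', 'b', 'c', 'd']),
  Duplex.from(async function* (source) {
    for await (const chunk of source) yield chunk;
  }),
  async function (source) {
    // Final stage: consume everything coming out of the duplex.
    for await (const chunk of source) console.log(chunk);
  },
  (err) => {
    if (err) throw err;
    console.log('pipeline finished');
  },
);
```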
The other test has the same problem. I don't see an issue here.
Shouldn't this:

pipeline(
  Readable.from(['a', 'b', 'c', 'd']),
  Duplex.from(async function*(stream) {
    console.error("###start")
    for await (const chunk of stream) {
      yield chunk;
    }
  }),
  common.mustCall(),
).on('error', common.mustNotCall());

behave the same as this?

pipeline(
  Readable.from(['a', 'b', 'c', 'd']),
  async function*(stream) {
    console.error("###start")
    for await (const chunk of stream) {
      yield chunk;
    }
  },
  common.mustCall(),
).on('error', common.mustNotCall());
Not necessarily. In the case of

If you make
OK, before I close this PR (😢), I just want to make sure. You said that:

so why would I need to call
I assume you mean that you will need to call
pipe will start the stream if/when the destination wants to receive data (i.e.
I know, but thanks anyway!
Fix #45533
I think this should be backported to the other Node.js versions...