stream: fix pipeline(readable, Duplex.from(<async-iterator>), cb) not triggering data or calling the cb #45534

Closed
wants to merge 5 commits into from

rluvaton
Member

@rluvaton rluvaton commented Nov 20, 2022

Fix #45533

I think this should be backported to the other Node.js release lines...

@nodejs-github-bot
Collaborator

Review requested:

  • @nodejs/streams

@nodejs-github-bot nodejs-github-bot added the needs-ci PRs that need a full CI run. label Nov 20, 2022
@rluvaton rluvaton changed the title from "stream: fix readable.pipe(Duplex.from(<async-iterator>)) not triggering" to "stream: fix readable.pipe(Duplex.from(<async-iterator>)) not triggering data" Nov 20, 2022
Comment on lines +111 to +112
readable.on('pipe', readable.resume.bind(readable));
readable.on('unpipe', readable.pause.bind(readable));
Member Author

Spent over 5 hours just for these 2 lines 😬

Member

I don't think this is the correct fix for this issue. resume and pause should be handled automatically by the machinery during pipe and unpipe.

Why do you think so?

@ronag wdyt?

Member

Yea, this is wrong.

Comment on lines +111 to +112
readable.on('pipe', readable.resume.bind(readable));
readable.on('unpipe', readable.pause.bind(readable));
Member Author

Are those listeners cleaned up on destroy?

Member

No, I don't think so.

Member

@ronag ronag left a comment

This is incorrect. I'll have a deeper look at this issue next week.

@rluvaton
Member Author

rluvaton commented Nov 20, 2022

> This is incorrect. I'll have a deeper look at this issue next week.

@ronag Maybe I can try to fix it; I'd like to contribute.

Can you explain why it's incorrect, and maybe give me a hint about where I should focus?

Is it because it extends Readable, so it shouldn't need this?

The problem was that, for some reason, nothing triggered the data to start flowing.

Comment on lines 310 to 311
assert.strictEqual(chunk, data[i++]);
}, 4);
Contributor

Suggested change
- assert.strictEqual(chunk, data[i++]);
- }, 4);
+ assert.strictEqual(chunk, data.shift());
+ }, data.length);

@ronag
Member

ronag commented Nov 20, 2022

> > This is incorrect. I'll have a deeper look at this issue next week.
>
> @ronag Maybe I can try to fix it; I'd like to contribute.
>
> Can you explain why it's incorrect, and maybe give me a hint about where I should focus?
>
> Is it because it extends Readable, so it shouldn't need this?
>
> The problem was that, for some reason, nothing triggered the data to start flowing.

I'll have a look and try to explain it to you.

@rluvaton rluvaton changed the title from "stream: fix readable.pipe(Duplex.from(<async-iterator>)) not triggering data" to "stream: fix pipeline(readable, Duplex.from(<async-iterator>), cb) not triggering data or calling the cb" Nov 20, 2022
readable.pipe(Duplex.from(<async-iterator>)) should not trigger data
yield chunk;
}
}));
}
Member

I don't understand this test?

Member Author

To avoid repeating my mistake of starting the data flow before a 'data' event listener is attached.

Member

The flow should start here, the callback should be called.

Member Author

that's right, this is why cb = common.mustNotCall();

Member

I don't understand. You are testing the opposite of what we want?

Member Author

No, we did not attach a 'data' listener, so the stream should not start...

#45533 (comment)

Member

the data listener should not be relevant here. You should not have to worry about that.

Member

@ronag ronag Nov 20, 2022

pipe should start the stream regardless.

Member Author

that's what I thought at first, @mcollina WDYT?

@ronag
Member

ronag commented Nov 20, 2022

Do any of these tests fail without:

  readable.on('pipe', readable.resume.bind(readable));
  readable.on('unpipe', readable.pause.bind(readable));

I think the best way forward would be for you to contribute failing tests, and then we can try to fix the issue.

@rluvaton
Member Author

rluvaton commented Nov 20, 2022

Yes, they fail; otherwise I wouldn't have added them. I write the tests first and then fix the problem.

For the pipeline duplex test:

Mismatched <anonymous> function calls. Expected exactly 1, actual 0.
    at mustCall (/Users/rluvaton/dev/open-source/node/node-fork/test/common/index.js:406:10)
    at Proxy.mustSucceed (/Users/rluvaton/dev/open-source/node/node-fork/test/common/index.js:410:10)
    at Object.<anonymous> (/Users/rluvaton/dev/open-source/node/node-fork/test/parallel/test-stream-pipeline-duplex.js:47:12)
    at Module._compile (node:internal/modules/cjs/loader:1213:14)
    at Module._extensions..js (node:internal/modules/cjs/loader:1267:10)
    at Module.load (node:internal/modules/cjs/loader:1076:32)
    at Module._load (node:internal/modules/cjs/loader:917:12)
    at Function.executeUserEntryPoint [as runMain] (node:internal/modules/run_main:82:12)
    at node:internal/main/run_main_module:23:47

For the pipe test:

Mismatched <anonymous> function calls. Expected exactly 4, actual 0.
    at Proxy.mustCall (/Users/rluvaton/dev/open-source/node/node-fork/test/common/index.js:406:10)
    at Object.<anonymous> (/Users/rluvaton/dev/open-source/node/node-fork/test/parallel/test-stream-duplex-from.js:308:21)
    at Module._compile (node:internal/modules/cjs/loader:1213:14)
    at Module._extensions..js (node:internal/modules/cjs/loader:1267:10)
    at Module.load (node:internal/modules/cjs/loader:1076:32)
    at Module._load (node:internal/modules/cjs/loader:917:12)
    at Function.executeUserEntryPoint [as runMain] (node:internal/modules/run_main:82:12)
    at node:internal/main/run_main_module:23:47

the problem is that nothing triggers the data to start flowing

Comment on lines +308 to +310
const cb = common.mustCall((chunk) => {
assert.strictEqual(chunk, data.shift());
}, data.length);
Member Author

Actually, this should be called only once, right?

Because the highWaterMark for the duplex stream that is created from an async generator function is 1:

highWaterMark: 1,

And according to the docs:

The readable.pipe() method attaches a Writable stream to the readable, causing it to switch automatically into flowing mode and push all of its data to the attached Writable. The flow of data will be automatically managed so that the destination Writable stream is not overwhelmed by a faster Readable stream.

readable.pipe(destination[, options])

And because there is no 'data' callback here, we reach the maximum buffer size.

WDYT @ronag?

@ronag
Member

ronag commented Nov 20, 2022

So the first case:

 pipeline(
  Readable.from(['a', 'b', 'c', 'd']),
  Duplex.from(async function*(stream) {
    console.error("###start")
    for await (const chunk of stream) {
      yield chunk;
    }
  }),
  common.mustCall(),
).on('error', common.mustNotCall());

When the last stream is still readable, pipeline() returns a readable stream that still has to be consumed. So the correct test looks like this:

const readable = pipeline(
  Readable.from(['a', 'b', 'c', 'd']),
  Duplex.from(async function*(stream) {
    console.error("###start")
    for await (const chunk of stream) {
      yield chunk;
    }
  }),
  common.mustCall(),
).on('error', common.mustNotCall());
readable.resume();

Which works as expected. Maybe improved docs would be appropriate?
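
A minimal, self-contained sketch of the point above, written without the `common` test helpers. The helper name `runThroughDuplex` is hypothetical, and a 'data' listener stands in for `readable.resume()` so that the chunks can be inspected. It assumes a Node.js version that provides `Duplex.from()`.

```javascript
const { pipeline, Readable, Duplex } = require('node:stream');

// Hypothetical helper: runs the pipeline from the discussion and resolves
// with the received chunks once the pipeline callback has fired and the
// returned stream has emitted 'end'.
function runThroughDuplex(items) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    let pending = 2; // the pipeline callback and the 'end' event
    const settle = () => {
      pending -= 1;
      if (pending === 0) resolve(chunks);
    };

    const last = pipeline(
      Readable.from(items),
      Duplex.from(async function* (source) {
        for await (const chunk of source) {
          yield chunk;
        }
      }),
      (err) => {
        if (err) reject(err);
        else settle();
      },
    );

    // pipeline() returns the last stream. Its readable side still has to be
    // consumed; readable.resume() would also do when the chunks themselves
    // are not needed.
    last.on('data', (chunk) => chunks.push(chunk));
    last.on('end', settle);
  });
}

runThroughDuplex(['a', 'b', 'c', 'd']).then((chunks) => {
  console.log(chunks);
});
```

Without the 'data' listener (or a `resume()` call), nothing reads from the returned stream, which is the situation reported in #45533.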

@ronag
Member

ronag commented Nov 20, 2022

The other test has the same problem. I don't see an issue here.

@rluvaton
Member Author

rluvaton commented Nov 20, 2022

Shouldn't:

 pipeline(
  Readable.from(['a', 'b', 'c', 'd']),
  Duplex.from(async function*(stream) {
    console.error("###start")
    for await (const chunk of stream) {
      yield chunk;
    }
  }),
  common.mustCall(),
).on('error', common.mustNotCall());

behave the same as:

 pipeline(
  Readable.from(['a', 'b', 'c', 'd']),
  async function*(stream) {
    console.error("###start")
    for await (const chunk of stream) {
      yield chunk;
    }
  },
  common.mustCall(),
).on('error', common.mustNotCall());

@ronag
Member

ronag commented Nov 20, 2022

Not necessarily. In the case of Duplex.from some buffering is introduced.

@ronag
Member

ronag commented Nov 20, 2022

If you make Readable.from(['a', 'b', 'c', 'd']) contain more elements (i.e. more than the high-water mark), they will behave the same.
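
As a rough sketch for comparing the two forms, the snippet below runs the same identity stage once wrapped in `Duplex.from()` and once as a bare async generator function, with 100 items. The figure of 100 is an arbitrary choice for illustration, picked only to be larger than either form is likely to buffer. The helper names (`passThrough`, `collect`, `compareStages`) are hypothetical. It only covers the case where the returned stream is consumed; it does not reproduce the unconsumed case discussed above.

```javascript
const { pipeline, Readable, Duplex } = require('node:stream');

// Identity stage shared by both variants.
async function* passThrough(source) {
  for await (const chunk of source) {
    yield chunk;
  }
}

// Hypothetical helper: pipes `items` through `stage`, consumes the stream
// returned by pipeline(), and resolves once both the pipeline callback and
// the 'end' event have been seen.
function collect(items, stage) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    let pending = 2; // the pipeline callback and the 'end' event
    const settle = () => {
      pending -= 1;
      if (pending === 0) resolve(chunks);
    };
    const last = pipeline(Readable.from(items), stage, (err) => {
      if (err) reject(err);
      else settle();
    });
    last.on('data', (chunk) => chunks.push(chunk));
    last.on('end', settle);
  });
}

async function compareStages(items) {
  const viaDuplex = await collect(items, Duplex.from(passThrough));
  const viaGenerator = await collect(items, passThrough);
  return { viaDuplex, viaGenerator };
}

const manyItems = Array.from({ length: 100 }, (_, i) => `item-${i}`);
compareStages(manyItems).then(({ viaDuplex, viaGenerator }) => {
  console.log(viaDuplex.length, viaGenerator.length);
});
```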

@rluvaton
Member Author

Ok, before I close this PR (😢 ) just wanna make sure

You said that:

> pipe should start the stream regardless.
> #45534 (comment)

so why would I need to call readable.resume()?

> If you make Readable.from(['a', 'b', 'c', 'd']) contain more elements (i.e. more than the high-water mark), they will behave the same.

I assume you mean that I would need to call resume, but as far as I remember, when I worked with streams and pipeline to process files of 1-7 GB, I never had to do that at all (that was on Node 14, I think).

@ronag
Member

ronag commented Nov 21, 2022

pipe will start the stream if/when the destination wants to receive data (i.e. !writableNeedDrain). Also Readable.pipe and stream.pipeline are different things.
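
To illustrate the simple case only, here is a sketch in which the destination is a plain Writable that accepts every chunk immediately, so it never needs a drain. The helper name `pipeToWritable` is hypothetical. It records `readableFlowing` before and after `pipe()` and collects what the destination receives. The case where the destination needs a drain is not covered here.

```javascript
const { Readable, Writable } = require('node:stream');

// Hypothetical helper: pipes `items` into a Writable that accepts every
// chunk immediately, and reports the source's flowing state around pipe().
function pipeToWritable(items) {
  return new Promise((resolve, reject) => {
    const received = [];
    const source = Readable.from(items);
    const sink = new Writable({
      objectMode: true,
      write(chunk, _encoding, callback) {
        received.push(chunk);
        callback(); // signal that the sink is ready for the next chunk
      },
    });

    const flowingBefore = source.readableFlowing;
    source.pipe(sink);
    const flowingAfter = source.readableFlowing;

    // pipe() ends the destination when the source ends, so 'finish'
    // marks the point where every chunk has been written.
    sink.on('finish', () => resolve({ flowingBefore, flowingAfter, received }));
    sink.on('error', reject);
  });
}

pipeToWritable(['a', 'b', 'c', 'd']).then((result) => {
  console.log(result);
});
```

Here the last stream is write-only, so there is no readable side left to consume. That differs from the pipeline case above, where the last stream is a Duplex.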

@rluvaton
Member Author

> Also Readable.pipe and stream.pipeline are different things.

I know, but pipeline uses pipe under the hood.

thanks anyway!

@rluvaton rluvaton closed this Nov 21, 2022
@rluvaton rluvaton deleted the fix-45533 branch November 21, 2022 07:33
Development

Successfully merging this pull request may close these issues.

pipeline callback is not called
5 participants