Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using for await...of with break to read from a stream #46717

Open
Babak-Gholamzadeh opened this issue Feb 18, 2023 · 15 comments
Open

Using for await...of with break to read from a stream #46717

Babak-Gholamzadeh opened this issue Feb 18, 2023 · 15 comments
Labels
stream Issues and PRs related to the stream subsystem.

Comments

@Babak-Gholamzadeh
Copy link

Version

v18.14.1

Platform

Microsoft Windows NT 10.0.19045.0 x64"Microsoft Windows NT 10.0.19045.0 x64

Subsystem

streams

What steps will reproduce the bug?

I'm trying to read from process.stdin by using for await until I get \n (as you can see in the code snippet blow):

const readLine = async () => {
  process.stdin.on('error', err => {
    console.log('stdin error:', err);
  });
  let str = '';
  for await (const chunk of process.stdin) {
    str += chunk.toString();
    if (str.includes('\n'))
      break;
  }
  return str.split('\n')[0];
};

But when it reaches to break, it will throw an error in the stream.

The output of my log:

stdin error: AbortError: The operation was aborted
    at Object.destroyer (node:internal/streams/destroy:305:11)
    at createAsyncIterator (node:internal/streams/readable:1141:19) {
  code: 'ABORT_ERR'
}

I've also tried to read from a file stream like this and got the same error.

I got the same error on a Mac machine as well.

How often does it reproduce? Is there a required condition?

Always

What is the expected behavior?

I don't know what exactly the expected behavior should be here, but throwing an error doesn't make sense to me

What do you see instead?

Additional information

No response

@ronag
Copy link
Member

ronag commented Feb 18, 2023

@nodejs/stream @mcollina wdyt?

@ronag
Copy link
Member

ronag commented Feb 18, 2023

@benjamingr

@debadree25 debadree25 added the stream Issues and PRs related to the stream subsystem. label Feb 18, 2023
@aduh95
Copy link
Contributor

aduh95 commented Feb 18, 2023

Not exactly related to the underlying issue, but we have a built-in module to achieve what the code snippet on the OP is trying to do:

import readline from 'node:readline/promises';

const rl = readline.createInterface({ input: process.stdin })[Symbol.asyncIterator]();

const readLine = async () => {
  const { done, value } = await rl.next();
  if (done) throw new Error('no more data');
  return value;
};

@Babak-Gholamzadeh
Copy link
Author

@aduh95 Actually, the problem that I have is not related to stdin and I'm not aiming to create a readline function at all.
I had this issue with another type of stream in a project, and because that code was a little complicated and also didn't want to put it here, so I just tried to use stdin as an example of reading from a readable stream with for await to show where the error occurs.
I'm sorry if that wasn't a good example and causes distraction 🙏

@mcollina
Copy link
Member

This is working as expected. By default, doing a break destroy the stream to prevent memory leakage. You can avoid this by creating an iterator with https://nodejs.org/dist/latest-v18.x/docs/api/stream.html#readableiteratoroptions and manually close/destroy the stream.

@tonivj5
Copy link

tonivj5 commented Feb 19, 2023

This is working as expected. By default, doing a break destroy the stream to prevent memory leakage. You can avoid this by creating an iterator with https://nodejs.org/dist/latest-v18.x/docs/api/stream.html#readableiteratoroptions and manually close/destroy the stream.

I think @mcollina, what @Babak-Gholamzadeh means is that no error should happen, when he stops consuming the stream from for-await-of

I think, he's expecting this behavior (no error)
Screenshot_20230219-045443.jpg

@benjamingr
Copy link
Member

Breaking from a for await closes the iterator which destroys the stream so you don't leak resources.

This is intentional in the design of async generators and how async iterators generally behave in JS.

In order to enable your use case - we've added .iterator which you can use and not destroy the stream on breaks https://nodejs.org/api/stream.html#readableiteratoroptions

@benjamingr
Copy link
Member

I also think it's fine to destroy the stream without an AbortError (probably) but I recall objections to that last time we discussed it.

@ronag
Copy link
Member

ronag commented Feb 19, 2023

Actually I think there is a third case here which we haven't considered. I'm out travelling atm but will get back to this.

@Babak-Gholamzadeh
Copy link
Author

Destroying the stream on break is totally fine and reasonable.

But as @tonivj5 mentioned too, the problem is with throwing the error.

Also, using the .iterator to avoid destroying the stream on breaks doesn't look like a solution to me, because what will happen if I want the stream to be destroyed on breaks? Then I have to handle this error?

@ronag
Copy link
Member

ronag commented Feb 24, 2023

I'm a little confused, the following does work as expected, i.e. no error.

for await (const z of Readable.from([1,2,3,4])) {
  break
}

Is this a problem only with stdin? Does anyone have a minimal repro?

@rluvaton
Copy link
Member

rluvaton commented Feb 25, 2023

@ronag I was able to reproduce:

const common = require('../common');

const { Readable } = require('stream');

{
  async function* generate() {
    yield 'something';
  }

  const abortedStream = Readable.from(generate());

  abortedStream.on('error', common.mustNotCall());

  (async () => {
    for await (const chunk of abortedStream) {
      break;
    }
  })().then(common.mustCall());
}
error: AbortError: The operation was aborted
    at Object.destroyer (node:internal/streams/destroy:305:11)
    at createAsyncIterator (node:internal/streams/readable:1149:19)
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
  code: 'ABORT_ERR'
}

the async IIFE does not return rejected promise but the error event omits though...

@rluvaton
Copy link
Member

Actually, @ronag your example is ok but you didn't listen to error events

const abortedStream = Readable.from([1,2,3,4]);

abortedStream.on('error', common.mustNotCall());

(async () => {
  for await (const chunk of abortedStream) {
    break;
  }
})().then(common.mustCall());

does emit:

AbortError: The operation was aborted
    at Object.destroyer (node:internal/streams/destroy:311:11)
    at createAsyncIterator (node:internal/streams/readable:1149:19)
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
  code: 'ABORT_ERR'
}

@rluvaton
Copy link
Member

rluvaton commented Feb 25, 2023

@benjamingr is this the expected behavior?

if we want it to not emit an abort error by default when destroying a stream we should remove this line:

err = new AbortError();

but I'm afraid it's a breaking change and not the wanted behavior.

if only on break from iterable we should think of another approach.

maybe a better way is destroy without an error (why destroy must have an error anyway and not be treated as finish in case error is missing?) in here:

destroyImpl.destroyer(stream, null);

@mcollina
Copy link
Member

Because interrupting a flowing stream is an error situation for any of the other consumer.

Note that you can "observe" all the data flowing via asynciterator by adding an on('data') listener. The error emitted in this case is to let them know what happened.

As said before, you can customize this behavior by calling the iterator method and passing a few options.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
stream Issues and PRs related to the stream subsystem.
Projects
None yet
Development

No branches or pull requests

8 participants