[stream] uncatchable error thrown during piping if source and dest don't have same objectMode setting #54945

@joelrbrandt

Version

22.8.0

Platform

Darwin [hostname] 23.6.0 Darwin Kernel Version 23.6.0: Mon Jul 29 21:14:46 PDT 2024; root:xnu-10063.141.2~1/RELEASE_ARM64_T6031 arm64

Subsystem

stream

What steps will reproduce the bug?

Run the following code:

import { Readable, Transform } from "node:stream";

async function main() {
  const objectReadable = Readable.from([
    { hello: "world" },
    { goodbye: "world" }
  ]);

  objectReadable.on("error", err => {
    console.log("objectReadable error", err);
  });

  const passThrough = new Transform({
    transform(chunk, _encoding, cb) {
      this.push(chunk);
      cb(null);
    },
    objectMode: false // code works if set to true
  });

  passThrough.on("error", err => {
    console.log("passThrough error", err);
  });

  try {
    console.assert(objectReadable.readableObjectMode, "objectReadable is not in object mode");
    console.assert(!passThrough.writableObjectMode, "passThrough is not in byte mode write side");

    console.log("beginning pipe");
    objectReadable.pipe(passThrough);
  } catch (e) {
    console.error("caught error when calling pipe", e);
    return;
  }

  try {
    console.log("beginning consume of passThrough");
    const output = [];
    for await (const v of passThrough) {
      output.push(v);
    }
    console.log("output", output);
  } catch (e) {
    console.error("caught error while consuming output", e);
    return;
  }

  console.log("done");
}

process.setUncaughtExceptionCaptureCallback(err => {
  console.error("uncaught exception", err);
});

main();

How often does it reproduce? Is there a required condition?

Always

What is the expected behavior? Why is that the expected behavior?

Readable.pipe() should probably compare the source's readableObjectMode with the destination's writableObjectMode and throw synchronously if they don't match.

So, in the code above, I would expect the try/catch around the pipe() call to catch the error and log it on the line that reads console.error("caught error when calling pipe", e);

Alternatively, the writable (or transform) stream could emit an error. In that case, in the code above, I would expect to see an error logged by the passThrough.on("error") callback.
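To make the first option concrete, here is a minimal user-land sketch of the check. `pipeChecked` is a hypothetical helper name, not part of node:stream, and this is not a proposal for how Readable.pipe() itself would be patched. It relies only on the public `readableObjectMode` and `writableObjectMode` getters, and it uses `require` so it runs as a plain script.

```javascript
// Sketch only: `pipeChecked` is a hypothetical helper, not a node:stream API.
const { Readable, Transform } = require("node:stream");

function pipeChecked(source, dest, options) {
  // Compare the read side of the source with the write side of the destination.
  if (source.readableObjectMode !== dest.writableObjectMode) {
    throw new TypeError(
      "objectMode mismatch: " +
        `source.readableObjectMode=${source.readableObjectMode}, ` +
        `dest.writableObjectMode=${dest.writableObjectMode}`
    );
  }
  // Modes agree, so delegate to the normal pipe().
  return source.pipe(dest, options);
}

const objectReadable = Readable.from([{ hello: "world" }]);
const byteTransform = new Transform({
  transform(chunk, _encoding, cb) {
    cb(null, chunk);
  },
  objectMode: false
});

let caught = null;
try {
  pipeChecked(objectReadable, byteTransform);
} catch (e) {
  // The caller can handle the mismatch here, before any data flows.
  caught = e;
}
console.log("caught:", caught && caught.message);
```

A strict equality check follows the wording above. A byte-mode source piped into an object-mode destination is probably harmless, so a real check might only reject the object-to-byte direction.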

What do you see instead?

There is an uncaught exception. It is caught by the callback to process.setUncaughtExceptionCaptureCallback. That exception is:

TypeError [ERR_INVALID_ARG_TYPE]: The "chunk" argument must be of type string or an instance of Buffer, TypedArray, or DataView. Received an instance of Object
    at _write (node:internal/streams/writable:480:13)
    at Writable.write (node:internal/streams/writable:508:10)
    at Readable.ondata (node:internal/streams/readable:1007:22)
    at Readable.emit (node:events:520:28)
    at Readable.read (node:internal/streams/readable:780:10)
    at flow (node:internal/streams/readable:1281:53)
    at emitReadable_ (node:internal/streams/readable:845:3)
    at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
  code: 'ERR_INVALID_ARG_TYPE'
}

I don't believe there is any way to catch this exception in the code that has the streams in context. (But if there is, then that'd be swell! And there's no issue here! And in that case, apologies for not understanding the right part of the streams API to handle these errors correctly.)
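The stack trace above suggests the exception originates in Writable.write(), which pipe() calls from its internal "data" handler on a later tick, outside any try/catch in user code. As a hedged illustration of that reading (assuming the behavior of the Node.js version in this report), calling write() directly with an object on a byte-mode stream lets the same kind of error be caught at the call site:

```javascript
// Sketch under the assumption that write() throws synchronously for an
// invalid chunk type, as the stack trace in this report indicates.
const { Transform } = require("node:stream");

const byteTransform = new Transform({
  transform(chunk, _encoding, cb) {
    cb(null, chunk);
  },
  objectMode: false
});

let writeError = null;
try {
  // An object is not a valid chunk for a byte-mode writable side.
  byteTransform.write({ hello: "world" });
} catch (e) {
  // Thrown from write() itself, so a surrounding try/catch sees it.
  writeError = e;
}
console.log("caught from direct write:", writeError && writeError.code);
```

This does not make the pipe() case catchable. It only shows where the throw comes from and why a try/catch around pipe() never sees it.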

Additional information

Broadly, the stream module is great ❤️ ! I love building stuff with it. Thanks for your hard work.

Labels

confirmed-bug: Issues with confirmed bugs.
stream: Issues and PRs related to the stream subsystem.
