Description
Version
22.8.0
Platform
Darwin [hostname] 23.6.0 Darwin Kernel Version 23.6.0: Mon Jul 29 21:14:46 PDT 2024; root:xnu-10063.141.2~1/RELEASE_ARM64_T6031 arm64
Subsystem
stream
What steps will reproduce the bug?
Run the following code:
import { Readable, Transform } from "node:stream";

async function main() {
  const objectReadable = Readable.from([
    { hello: "world" },
    { goodbye: "world" }
  ]);
  objectReadable.on("error", err => {
    console.log("objectReadable error", err);
  });

  const passThrough = new Transform({
    transform(chunk, _encoding, cb) {
      this.push(chunk);
      cb(null);
    },
    objectMode: false // code works if set to true
  });
  passThrough.on("error", err => {
    console.log("passThrough error", err);
  });

  try {
    console.assert(objectReadable.readableObjectMode, "objectReadable is not in object mode");
    console.assert(!passThrough.writableObjectMode, "passThrough is not in byte mode write side");
    console.log("beginning pipe");
    objectReadable.pipe(passThrough);
  } catch (e) {
    console.error("caught error when calling pipe", e);
    return;
  }

  try {
    console.log("beginning consume of passThrough");
    const output = [];
    for await (const v of passThrough) {
      output.push(v);
    }
    console.log("output", output);
  } catch (e) {
    console.error("caught error while consuming output", e);
    return;
  }

  console.log("done");
}

process.setUncaughtExceptionCaptureCallback(err => {
  console.error("uncaught exception", err);
});

main();
How often does it reproduce? Is there a required condition?
Always
What is the expected behavior? Why is that the expected behavior?
Probably Readable.pipe() should compare the objectMode state of the source and the write side of the destination, and if they don't match, it should synchronously throw.
So, in the code above, I would expect to catch an error on the line that reads console.error("caught error when calling pipe", e);
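As a rough illustration of the check proposed here, a user-land wrapper might look like the sketch below. The name pipeChecked and the error message are hypothetical and not part of node:stream; the sketch only relies on the readableObjectMode and writableObjectMode properties already used in the reproduction, and it follows the suggestion above by rejecting any mismatch.

```javascript
// Hypothetical helper, not part of node:stream: refuse to pipe when the
// source's readable side and the destination's writable side disagree on
// object mode, and throw synchronously so the caller can catch it.
function pipeChecked(source, destination, options) {
  const sourceMode = Boolean(source.readableObjectMode);
  const destinationMode = Boolean(destination.writableObjectMode);
  if (sourceMode !== destinationMode) {
    throw new TypeError(
      `objectMode mismatch: source readableObjectMode=${sourceMode}, ` +
        `destination writableObjectMode=${destinationMode}`
    );
  }
  // Modes agree, so delegate to the regular pipe() and return its result.
  return source.pipe(destination, options);
}
```

If objectReadable.pipe(passThrough) in the reproduction were replaced with pipeChecked(objectReadable, passThrough), the TypeError would be thrown inside the first try block and reach the "caught error when calling pipe" handler.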
Alternatively, the writable (or transform) stream could emit an error. In that case, in the code above, I would expect to see an error logged by the passThrough.on("error") callback.
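This alternative could be approximated in user code along the following lines. It is only a sketch: pipeOrDestroy is a hypothetical name, and it assumes the destination exposes the standard destroy(error) method, which on a Writable or Transform results in an "error" event.

```javascript
// Hypothetical helper, not part of node:stream: when an object-mode source
// is about to be piped into a byte-mode destination, destroy the
// destination with an error so its "error" listeners run, instead of
// letting a later write() throw from inside the "data" handler.
function pipeOrDestroy(source, destination, options) {
  if (source.readableObjectMode && !destination.writableObjectMode) {
    destination.destroy(
      new TypeError("cannot pipe an object-mode source into a byte-mode destination")
    );
    return destination;
  }
  return source.pipe(destination, options);
}
```

Unlike a synchronous throw, this keeps the failure on the stream's usual error path, so code that consumes the destination sees it there.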
What do you see instead?
There is an uncaught exception. It is caught by the callback to process.setUncaughtExceptionCaptureCallback. That exception is:
TypeError [ERR_INVALID_ARG_TYPE]: The "chunk" argument must be of type string or an instance of Buffer, TypedArray, or DataView. Received an instance of Object
    at _write (node:internal/streams/writable:480:13)
    at Writable.write (node:internal/streams/writable:508:10)
    at Readable.ondata (node:internal/streams/readable:1007:22)
    at Readable.emit (node:events:520:28)
    at Readable.read (node:internal/streams/readable:780:10)
    at flow (node:internal/streams/readable:1281:53)
    at emitReadable_ (node:internal/streams/readable:845:3)
    at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
  code: 'ERR_INVALID_ARG_TYPE'
}
I don't believe there is any way to catch this exception in the code that has the streams in context. (But if there is, then that'd be swell! And there's no issue here! And in that case, apologies for not understanding the right part of the streams API to handle these errors correctly.)
Additional information
Broadly, the stream module is great ❤️! I love building stuff with it. Thanks for your hard work.