-
-
Notifications
You must be signed in to change notification settings - Fork 33.9k
Description
Version
v18.16.0
Platform
Linux 6.3.6-zen1-1-zen #1 ZEN SMP PREEMPT_DYNAMIC Mon, 05 Jun 2023 15:12:42 +0000 x86_64 GNU/Linux
Subsystem
stream
What steps will reproduce the bug?
This is the snippet that generate Readable stream by pushing start, finished, and 1024 UUID instances in between, and then using that to pipe to file Writable stream named foobar.txt
There are two code paths in this snippet
- Without flag:
node pipeline.mjswill usestream.pipeline(rs,ws)to stream the file- function name:
pipelineStream
- function name:
- With flag:
node pipeline.mjs mwill users.pipe(ws)+Promiseto stream the file- function name:
manuallyPipeStream
- function name:
import { createWriteStream } from "fs";
import { Readable } from "stream";
import { pipeline } from "stream/promises";
const OUTPUT_FILENAME = "foobar.txt";
const generateContent = async (rs) => {
for (let i = 0; i < 1024; i++) {
rs.push("11bf5b37-e0b8-42e0-8dcf-dc8c4aefc000");
rs.push("\n");
}
rs.push("finished\n");
rs.push(null);
};
const pipelineStream = async (rs) => {
const ws = createWriteStream(OUTPUT_FILENAME);
await pipeline(rs, ws);
console.log("finished piping");
};
const manuallyPipeStream = async (rs) => {
const ws = createWriteStream(OUTPUT_FILENAME);
rs.pipe(ws);
await new Promise((resolve, reject) => {
ws.on("error", reject);
ws.on("finish", resolve);
});
console.log("finished piping");
};
const main = async () => {
const manualFn = process.argv[2] === "m";
console.log("--- node version:", process.version);
console.log(`--- using ${manualFn ? "`rs.pipe(ws)`" : "`stream.pipeline(rs,ws)`"}`)
const rs = new Readable();
rs.push("start\n");
await Promise.all([
generateContent(rs),
manualFn ? manuallyPipeStream(rs) : pipelineStream(rs),
]);
console.log("--- program finished successfully");
};
main();How often does it reproduce? Is there a required condition?
I've tested the code against v16.20.0, v18.15.0, v18.16.0, v20.3.0, in Arch Linux and macOS 12
OSes doesn't seems to be the factor of this problem.
The problem seems to only exist when all of these conditions are met:
- Node Versions:
v18.16.0orv20.3.0 - Running the code without flag:
node pipeline.mjs, to usestream.pipeline(rs,ws)code path
What is the expected behavior? Why is that the expected behavior?
When running the code successfully,
- the standard output is:
--- node version: v18.15.0
--- using `stream.pipeline(rs,ws)`
finished piping
--- program finished successfully
- File named
foobar.txtwith the size of 37 KiB, with properstart,finishedand UUIDs in between
start
11bf5b37-e0b8-42e0-8dcf-dc8c4aefc000
11bf5b37-e0b8-42e0-8dcf-dc8c4aefc000
11bf5b37-e0b8-42e0-8dcf-dc8c4aefc000
11bf5b37-e0b8-42e0-8dcf-dc8c4aefc000
11bf5b37-e0b8-42e0-8dcf-dc8c4aefc000
# ... 1019 other UUIDs
finished
What do you see instead?
- The standard output is only just:
--- node version: v18.16.0
--- using `stream.pipeline(rs,ws)`
-
The node process is abnormally killed without any error/exception thrown and exit code is just
0 -
File
foobar.txtbecome malformed:- Only
startand 443 UUIDs, withoutfinished - Only the size of 16 KiB, which suspiciously match the size of highWaterMark
16384
- Only
Additional information
Regardless of node versions and OSes, running the code using flag: node pipeline.mjs m, to use rs.pipe(ws) + Promise code path, do always produce the correct results.
Minimal snippet that has one code path, and only focus on the issue
import { createWriteStream } from "fs";
import { Readable, promises } from "stream";
const generateContent = async (rs) => {
rs.push("start\n");
for (let i = 0; i < 1024; i++)
rs.push("11bf5b37-e0b8-42e0-8dcf-dc8c4aefc000\n");
rs.push("finished\n");
rs.push(null);
};
const main = async () => {
const [rs, ws] = [new Readable(), createWriteStream("foobar.txt")];
await Promise.all([generateContent(rs), promises.pipeline(rs, ws)]);
console.log("--- program finished successfully");
};
main().catch((e) => console.error("*** ERR:", e));