Description
Version
v18.16.0
Platform
Linux 6.3.6-zen1-1-zen #1 ZEN SMP PREEMPT_DYNAMIC Mon, 05 Jun 2023 15:12:42 +0000 x86_64 GNU/Linux
Subsystem
stream
What steps will reproduce the bug?
This snippet generates a Readable stream by pushing start, finished, and 1024 UUID strings in between, then pipes that stream to a file Writable stream named foobar.txt.
There are two code paths in this snippet:
- Without flag: node pipeline.mjs uses stream.pipeline(rs,ws) to stream the file
  - function name: pipelineStream
- With flag: node pipeline.mjs m uses rs.pipe(ws) + Promise to stream the file
  - function name: manuallyPipeStream
import { createWriteStream } from "fs";
import { Readable } from "stream";
import { pipeline } from "stream/promises";

const OUTPUT_FILENAME = "foobar.txt";

const generateContent = async (rs) => {
  for (let i = 0; i < 1024; i++) {
    rs.push("11bf5b37-e0b8-42e0-8dcf-dc8c4aefc000");
    rs.push("\n");
  }
  rs.push("finished\n");
  rs.push(null);
};

const pipelineStream = async (rs) => {
  const ws = createWriteStream(OUTPUT_FILENAME);
  await pipeline(rs, ws);
  console.log("finished piping");
};

const manuallyPipeStream = async (rs) => {
  const ws = createWriteStream(OUTPUT_FILENAME);
  rs.pipe(ws);
  await new Promise((resolve, reject) => {
    ws.on("error", reject);
    ws.on("finish", resolve);
  });
  console.log("finished piping");
};

const main = async () => {
  const manualFn = process.argv[2] === "m";
  console.log("--- node version:", process.version);
  console.log(`--- using ${manualFn ? "`rs.pipe(ws)`" : "`stream.pipeline(rs,ws)`"}`);
  const rs = new Readable();
  rs.push("start\n");
  await Promise.all([
    generateContent(rs),
    manualFn ? manuallyPipeStream(rs) : pipelineStream(rs),
  ]);
  console.log("--- program finished successfully");
};

main();
How often does it reproduce? Is there a required condition?
I've tested the code against v16.20.0, v18.15.0, v18.16.0, and v20.3.0, on Arch Linux and macOS 12.
The OS doesn't seem to be a factor in this problem.
The problem seems to exist only when all of these conditions are met:
- Node version: v18.16.0 or v20.3.0
- Running the code without the flag (node pipeline.mjs), which uses the stream.pipeline(rs,ws) code path
What is the expected behavior? Why is that the expected behavior?
When the code runs successfully:
- The standard output is:
--- node version: v18.15.0
--- using `stream.pipeline(rs,ws)`
finished piping
--- program finished successfully
- A file named foobar.txt is created with a size of 37 KiB, containing start, finished, and the UUIDs in between:
start
11bf5b37-e0b8-42e0-8dcf-dc8c4aefc000
11bf5b37-e0b8-42e0-8dcf-dc8c4aefc000
11bf5b37-e0b8-42e0-8dcf-dc8c4aefc000
11bf5b37-e0b8-42e0-8dcf-dc8c4aefc000
11bf5b37-e0b8-42e0-8dcf-dc8c4aefc000
# ... 1019 other UUIDs
finished
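The expected file layout above can be checked mechanically. The following is a minimal sketch, not part of the original report: summarizeOutput and UUID_PATTERN are hypothetical names, and the two sample strings are synthetic stand-ins for a complete and a truncated foobar.txt.

```javascript
// Hypothetical helper: summarizes whether a file's text matches the expected layout.
const UUID_PATTERN = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/;

function summarizeOutput(text) {
  const lines = text.split("\n");
  // Drop the empty element produced by a trailing newline.
  if (lines[lines.length - 1] === "") lines.pop();
  return {
    startsWithStart: lines[0] === "start",
    endsWithFinished: lines[lines.length - 1] === "finished",
    uuidCount: lines.filter((line) => UUID_PATTERN.test(line)).length,
  };
}

// Synthetic samples (assumption: they mimic the good and the bad file).
const uuid = "11bf5b37-e0b8-42e0-8dcf-dc8c4aefc000";
const complete = ["start", ...Array(1024).fill(uuid), "finished", ""].join("\n");
const truncated = ["start", ...Array(443).fill(uuid), ""].join("\n");

console.log(summarizeOutput(complete));
console.log(summarizeOutput(truncated));
```

To check a real run, the text could be read with fs.readFileSync("foobar.txt", "utf8") and passed to the same function.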
What do you see instead?
- The standard output is only:
--- node version: v18.16.0
--- using `stream.pipeline(rs,ws)`
- The node process terminates abnormally without any error or exception being thrown, and the exit code is just 0
- The file foobar.txt is malformed:
  - It contains only start and 443 UUIDs, without finished
  - Its size is only 16 KiB, which suspiciously matches the highWaterMark size of 16384
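The byte counts behind the 37 KiB, 16 KiB, and 443 UUID figures can be worked out directly. This is a small arithmetic sketch with illustrative names (expectedFileSize, uuidLinesToReach); it only counts bytes and makes no claim about what the stream internals actually do.

```javascript
// Strings taken from the reproduction; helper names are hypothetical.
const UUID_LINE = "11bf5b37-e0b8-42e0-8dcf-dc8c4aefc000\n"; // 36 characters + newline
const START = "start\n";
const FINISHED = "finished\n";
const HIGH_WATER_MARK = 16384; // value quoted in the report

// Size in bytes of a complete file with the given number of UUID lines.
function expectedFileSize(uuidCount) {
  return (
    Buffer.byteLength(START) +
    uuidCount * Buffer.byteLength(UUID_LINE) +
    Buffer.byteLength(FINISHED)
  );
}

// Smallest number of UUID lines after which start + UUIDs reaches the threshold.
function uuidLinesToReach(threshold) {
  const remaining = threshold - Buffer.byteLength(START);
  return Math.ceil(remaining / Buffer.byteLength(UUID_LINE));
}

console.log(expectedFileSize(1024)); // 37903 bytes, about 37 KiB
console.log(uuidLinesToReach(HIGH_WATER_MARK)); // 443
```

So 443 is exactly the number of UUID lines at which the pushed data first reaches 16384 bytes, which is consistent with the observation that the truncation lines up with highWaterMark.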
Additional information
Regardless of Node version and OS, running the code with the flag (node pipeline.mjs m), which uses the rs.pipe(ws) + Promise code path, always produces the correct result.
Minimal snippet that has one code path and focuses only on the issue:
import { createWriteStream } from "fs";
import { Readable, promises } from "stream";

const generateContent = async (rs) => {
  rs.push("start\n");
  for (let i = 0; i < 1024; i++)
    rs.push("11bf5b37-e0b8-42e0-8dcf-dc8c4aefc000\n");
  rs.push("finished\n");
  rs.push(null);
};

const main = async () => {
  const [rs, ws] = [new Readable(), createWriteStream("foobar.txt")];
  await Promise.all([generateContent(rs), promises.pipeline(rs, ws)]);
  console.log("--- program finished successfully");
};

main().catch((e) => console.error("*** ERR:", e));
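A silent exit with code 0 is what Node.js does when the event loop runs out of work while a promise is still pending, so it may help to check whether main() ever settles. The following is a diagnostic sketch only, not a fix and not a confirmed explanation of the bug: createSettlementTracker is a hypothetical helper, and the never-settling promise stands in for main().

```javascript
// Hypothetical helper: records whether a tracked promise has settled.
function createSettlementTracker() {
  let settled = false;
  return {
    track(promise) {
      const mark = () => {
        settled = true;
      };
      // Mark on both fulfilment and rejection without altering the original promise.
      promise.then(mark, mark);
      return promise;
    },
    isSettled: () => settled,
  };
}

const tracker = createSettlementTracker();

// The "exit" event fires when the process is about to exit; only synchronous work is safe here.
process.on("exit", (code) => {
  if (!tracker.isSettled()) {
    console.error(`*** exiting with code ${code} while the tracked promise is still pending`);
  }
});

// Stand-in for main(): a promise that never settles (assumption for illustration).
tracker.track(new Promise(() => {}));
```

In the minimal snippet, the last line could become tracker.track(main()).catch((e) => console.error("*** ERR:", e)); if the warning prints on an affected version, the pipeline promise was left unsettled.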