
stream.pipeline abruptly kills the Node.js process #48406

Closed
@Thammachart

Description

Version

v18.16.0

Platform

Linux 6.3.6-zen1-1-zen #1 ZEN SMP PREEMPT_DYNAMIC Mon, 05 Jun 2023 15:12:42 +0000 x86_64 GNU/Linux

Subsystem

stream

What steps will reproduce the bug?

The snippet below generates a Readable stream by pushing start, then 1024 copies of the same UUID, then finished, and pipes it into a file Writable stream named foobar.txt.

There are two code paths in this snippet:

  • Without flag: node pipeline.mjs will use stream.pipeline(rs,ws) to stream the file
    • function name: pipelineStream
  • With flag: node pipeline.mjs m will use rs.pipe(ws) + Promise to stream the file
    • function name: manuallyPipeStream
import { createWriteStream } from "fs";
import { Readable } from "stream";
import { pipeline } from "stream/promises";

const OUTPUT_FILENAME = "foobar.txt";

const generateContent = async (rs) => {
  for (let i = 0; i < 1024; i++) {
    rs.push("11bf5b37-e0b8-42e0-8dcf-dc8c4aefc000");
    rs.push("\n");
  }
  rs.push("finished\n");
  rs.push(null);
};

const pipelineStream = async (rs) => {
  const ws = createWriteStream(OUTPUT_FILENAME);

  await pipeline(rs, ws);
  console.log("finished piping");
};

const manuallyPipeStream = async (rs) => {
  const ws = createWriteStream(OUTPUT_FILENAME);

  rs.pipe(ws);
  await new Promise((resolve, reject) => {
    ws.on("error", reject);
    ws.on("finish", resolve);
  });
  console.log("finished piping");
};

const main = async () => {
  const manualFn = process.argv[2] === "m";
  console.log("--- node version:", process.version);
  console.log(`--- using ${manualFn ? "`rs.pipe(ws)`" : "`stream.pipeline(rs,ws)`"}`);

  const rs = new Readable();
  rs.push("start\n");

  await Promise.all([
    generateContent(rs),
    manualFn ? manuallyPipeStream(rs) : pipelineStream(rs),
  ]);

  console.log("--- program finished successfully");
};

main();
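One detail that may help triage: generateContent is declared async but never awaits, so inside Promise.all it runs to completion synchronously, before pipelineStream is even called. By the time pipeline(rs, ws) starts, every chunk, including the null end-of-stream marker, is already buffered in rs. The sketch below checks this with the public readable.readableLength property; it is an observation about the reproduction, not a claimed root cause, and it mirrors the report's content rather than reusing its exact code.

```javascript
import { Readable } from "node:stream";

const rs = new Readable();
rs.push("start\n");

// Same shape as generateContent in the report: declared async but with no
// `await`, so calling it runs the whole loop (and push(null)) synchronously.
const generateContent = async (stream) => {
  for (let i = 0; i < 1024; i++) {
    stream.push("11bf5b37-e0b8-42e0-8dcf-dc8c4aefc000");
    stream.push("\n");
  }
  stream.push("finished\n");
  stream.push(null);
};

const done = generateContent(rs);

// Nothing has consumed the stream yet, so all content is still buffered:
// 6 ("start\n") + 1024 * 37 (UUID + "\n") + 9 ("finished\n") = 37903 bytes.
const bufferedBytes = rs.readableLength;
console.log("bytes buffered before pipeline() would start:", bufferedBytes);

// Drain the buffer so the sketch exits cleanly.
rs.resume();
done.then(() => console.log("generateContent settled"));
```

In other words, pipeline() in the failing case receives a source that has already ended with its whole payload (more than twice the 16384-byte default highWaterMark) sitting in the buffer.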

How often does it reproduce? Is there a required condition?

I've tested the code against v16.20.0, v18.15.0, v18.16.0, and v20.3.0 on Arch Linux and macOS 12.

The OS does not seem to be a factor in this problem.

The problem seems to occur only when all of these conditions are met:

  • Node.js version: v18.16.0 or v20.3.0
  • Running the code without the flag (node pipeline.mjs), i.e. the stream.pipeline(rs,ws) code path

What is the expected behavior? Why is that the expected behavior?

When the code runs successfully:

  • the standard output is:
--- node version: v18.15.0
--- using `stream.pipeline(rs,ws)`
finished piping
--- program finished successfully
  • A file named foobar.txt of about 37 KiB is written, starting with start, ending with finished, and with the UUIDs in between:
start
11bf5b37-e0b8-42e0-8dcf-dc8c4aefc000
11bf5b37-e0b8-42e0-8dcf-dc8c4aefc000
11bf5b37-e0b8-42e0-8dcf-dc8c4aefc000
11bf5b37-e0b8-42e0-8dcf-dc8c4aefc000
11bf5b37-e0b8-42e0-8dcf-dc8c4aefc000
# ... 1019 other UUIDs
finished
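The expected size can be worked out from the pushed chunks: start\n is 6 bytes, each UUID line is 36 + 1 = 37 bytes, and finished\n is 9 bytes, so 6 + 1024 × 37 + 9 = 37,903 bytes (about 37.0 KiB). A minimal sketch of a checker for the output file, assuming it is read back as a UTF-8 string; checkOutput is a hypothetical helper, not part of the original report.

```javascript
// Hypothetical checker for foobar.txt; not part of the original report.
const UUID = "11bf5b37-e0b8-42e0-8dcf-dc8c4aefc000";

const checkOutput = (text) => {
  const lines = text.split("\n");
  return {
    bytes: Buffer.byteLength(text, "utf8"),
    startsWithStart: lines[0] === "start",
    // A complete file ends with "finished\n", so split() leaves a trailing "".
    endsWithFinished:
      lines[lines.length - 1] === "" && lines[lines.length - 2] === "finished",
    uuidLines: lines.filter((line) => line === UUID).length,
  };
};

// Rebuild the expected content exactly as the snippet pushes it.
const expected = "start\n" + `${UUID}\n`.repeat(1024) + "finished\n";
const report = checkOutput(expected);
console.log(report);
```

Against a real run, the same function can be applied to readFileSync("foobar.txt", "utf8") (imported from "node:fs").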

What do you see instead?

  • The standard output is only:
--- node version: v18.16.0
--- using `stream.pipeline(rs,ws)`
  • The Node.js process terminates abruptly without any error or exception being thrown, and the exit code is 0.

  • The file foobar.txt is malformed:

    • It contains only start and 443 UUIDs, without finished.
    • Its size is only 16 KiB, which suspiciously matches the default highWaterMark of 16384 bytes.
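The 16 KiB figure is consistent with the file holding exactly the first 16,384 bytes of the expected output: after the 6-byte start\n, 16,378 bytes remain, which fits 442 complete 37-byte UUID lines plus the first 24 bytes of a 443rd. A sketch of that arithmetic, assuming (not confirmed by the report) that the truncation falls exactly at 16384 bytes:

```javascript
const HIGH_WATER_MARK = 16384; // default highWaterMark for byte streams in v18/v20
const UUID = "11bf5b37-e0b8-42e0-8dcf-dc8c4aefc000";
const expected = "start\n" + `${UUID}\n`.repeat(1024) + "finished\n";

// Assumption: the malformed file is exactly the first HIGH_WATER_MARK bytes.
const truncated = Buffer.from(expected, "utf8")
  .subarray(0, HIGH_WATER_MARK)
  .toString("utf8");
const lines = truncated.split("\n");

const fullUuidLines = lines.filter((line) => line === UUID).length;
// The last element has no trailing "\n": a partially written UUID line.
const partialTail = lines[lines.length - 1];
const hasFinished = truncated.includes("finished");

console.log({ fullUuidLines, partialTail, hasFinished });
```

Under that assumption the file would end mid-line with 11bf5b37-e0b8-42e0-8dcf-, which would be counted as the 443rd UUID when eyeballing the file.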

Additional information

Regardless of Node.js version and OS, running the code with the flag (node pipeline.mjs m), which uses the rs.pipe(ws) + Promise code path, always produces the correct result.

A minimal snippet with a single code path that focuses only on the issue:

import { createWriteStream } from "fs";
import { Readable, promises } from "stream";

const generateContent = async (rs) => {
  rs.push("start\n");
  for (let i = 0; i < 1024; i++)
    rs.push("11bf5b37-e0b8-42e0-8dcf-dc8c4aefc000\n");
  rs.push("finished\n");
  rs.push(null);
};

const main = async () => {
  const [rs, ws] = [new Readable(), createWriteStream("foobar.txt")];
  await Promise.all([generateContent(rs), promises.pipeline(rs, ws)]);
  console.log("--- program finished successfully");
};

main().catch((e) => console.error("*** ERR:", e));
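No output after the "--- using ..." line, no error, and an exit code of 0 together match what Node.js does when the event loop runs out of work while a promise is still pending: a pending promise alone does not keep the process alive, and a script like this one (no top-level await) then exits normally. Assuming that is what happens here, an exit hook can confirm that the process exits before pipeline() settles. The track helper below is hypothetical (not a Node.js API); only process.on("exit") and Promise.prototype.finally are standard.

```javascript
// Hypothetical diagnostic helper (not a Node.js API): remember which tracked
// promises are still pending, and report them if the process exits anyway.
const pending = new Set();

process.on("exit", (code) => {
  for (const label of pending) {
    console.error(`*** exiting with code ${code} while "${label}" is still pending`);
  }
});

const track = (label, promise) => {
  pending.add(label);
  // finally() passes the original result or rejection through unchanged.
  return promise.finally(() => pending.delete(label));
};

// Demo: a promise with nothing scheduled behind it never settles, yet the
// process still exits normally once the event loop is empty.
track("never-settles", new Promise(() => {}));
```

In the minimal snippet, the pipeline call would be wrapped as track("pipeline", promises.pipeline(rs, ws)) inside Promise.all; if the hook fires with that label, the pipeline promise was abandoned rather than rejected.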

Labels: stream (issues and PRs related to the stream subsystem)
