Skip to content

end is called twice when using pipeline #42866

Closed
@climba03003

Description

@climba03003

Version

>= 17.3.0

Platform

Ubuntu 20.04

Subsystem

stream

What steps will reproduce the bug?

This behavior started since 17.3.0 and it can also be reproduce on 18.0.0

import { Readable, Writable } from "stream";
import { pipeline } from "stream/promises";

class CustomReadable extends Readable {
  constructor(name) {
    super();
    this.name = name;
  }

  _read() {
    console.log(`${this.name}: write hello world`);
    this.push("hello world");
    console.log(`${this.name}: end readable`);
    this.push(null);
  }
}

class CustomWritable extends Writable {
  constructor(name) {
    super();
    this.name = name;
  }

  _write() {
    console.log(`${this.name}: fire write`);
  }

  end() {
    console.log(`${this.name}: fire end`);
  }
}

// comment either one to see the actual flow
// fire end once when using pipe
new CustomReadable("pipe").pipe(new CustomWritable("pipe"));
// fire end twice when using pipeline
await pipeline(new CustomReadable("pipeline"), new CustomWritable("pipeline"));

How often does it reproduce? Is there a required condition?

Always.

What is the expected behavior?

.end should not call twice and behave similar to pipe.

// pipeline
pipeline: write hello world
pipeline: end readable
pipeline: fire write
pipeline: fire end

// pipe
pipe: write hello world
pipe: end readable
pipe: fire write
pipe: fire end

What do you see instead?

.end being called twice when using pipeline.

// pipeline
pipeline: write hello world
pipeline: end readable
pipeline: fire write
pipeline: fire end
pipeline: fire end

// pipe
pipe: write hello world
pipe: end readable
pipe: fire write
pipe: fire end

Additional information

It is currently affecting mongodb usage in node@18. I assume it is bug from core but not their side.
If it is the expected behavior, I will try to fix it on mongodb side.

import { pipeline } from 'stream/promises'
import fs from 'fs'

const { MongoClient, GridFSBucket } = require('mongodb');

const client = await MongoClient.connect(`mongodb://localhost:27017`);

const database = client.db('test01');
const bucket = new GridFSBucket(database, { bucketName: 'test01' });

const readable = fs.createReadStream('/tmp/test.txt');
const writable = bucket.openUploadStream('test.txt')
await pipeline(readable, writable)

Metadata

Metadata

Assignees

No one assigned

    Labels

    help wantedIssues that need assistance from volunteers or PRs that need help to proceed.streamIssues and PRs related to the stream subsystem.

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions