Closed
Description
Version
>= 17.3.0
Platform
Ubuntu 20.04
Subsystem
stream
What steps will reproduce the bug?
This behavior started in 17.3.0 and can also be reproduced on 18.0.0.
import { Readable, Writable } from "stream";
import { pipeline } from "stream/promises";

class CustomReadable extends Readable {
  constructor(name) {
    super();
    this.name = name;
  }
  _read() {
    console.log(`${this.name}: write hello world`);
    this.push("hello world");
    console.log(`${this.name}: end readable`);
    this.push(null);
  }
}

class CustomWritable extends Writable {
  constructor(name) {
    super();
    this.name = name;
  }
  _write() {
    console.log(`${this.name}: fire write`);
  }
  // end() is overridden on purpose so that every call is logged
  end() {
    console.log(`${this.name}: fire end`);
  }
}

// Comment out either call below to see each flow on its own.
// end() fires once when using pipe
new CustomReadable("pipe").pipe(new CustomWritable("pipe"));
// end() fires twice when using pipeline
await pipeline(new CustomReadable("pipeline"), new CustomWritable("pipeline"));
How often does it reproduce? Is there a required condition?
Always.
What is the expected behavior?
.end should not be called twice; pipeline should behave the same as pipe.
// pipeline
pipeline: write hello world
pipeline: end readable
pipeline: fire write
pipeline: fire end
// pipe
pipe: write hello world
pipe: end readable
pipe: fire write
pipe: fire end
What do you see instead?
.end is called twice when using pipeline.
// pipeline
pipeline: write hello world
pipeline: end readable
pipeline: fire write
pipeline: fire end
pipeline: fire end
// pipe
pipe: write hello world
pipe: end readable
pipe: fire write
pipe: fire end
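For comparison, here is a minimal sketch of a writable that keeps its end-of-stream logic in _final() instead of overriding end(). The class name FinalWritable and the finalRuns counter are hypothetical and only for illustration. As far as I understand the Writable contract, _final() runs at most once per stream, so this variant should not be sensitive to how many times end() is invoked, but I have not checked that on every affected version.

```javascript
const { Readable, Writable, pipeline } = require("node:stream");

// Hypothetical class for illustration only; not part of the original repro.
class FinalWritable extends Writable {
  constructor(name) {
    super();
    this.name = name;
    this.finalRuns = 0; // how many times _final() has run
  }

  _write(chunk, encoding, callback) {
    console.log(`${this.name}: fire write`);
    callback(); // signal that the chunk has been handled
  }

  // Called by the stream machinery after end(), before 'finish' is emitted.
  _final(callback) {
    this.finalRuns += 1;
    console.log(`${this.name}: fire final`);
    callback();
  }
}

const sink = new FinalWritable("pipeline");
pipeline(Readable.from(["hello world"]), sink, (err) => {
  if (err) console.error("pipeline failed:", err);
  else console.log(`final ran ${sink.finalRuns} time(s)`);
});
```

Unlike the repro above, this _write() calls its callback, so the stream can actually finish and the pipeline callback can fire.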
Additional information
This currently affects mongodb usage on node@18. I assume it is a bug in core rather than on their side.
If this is the expected behavior, I will try to fix it on the mongodb side.
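If the fix has to live on the library side, one possible shape is a guard that makes an overridden end() idempotent. This is only a sketch under my own assumptions: GuardedWritable, endCalls, finalizeRuns, and endStarted are hypothetical names, and this is not how the mongodb driver is actually implemented.

```javascript
const { Writable } = require("node:stream");

// Hypothetical writable; not the mongodb driver's implementation.
class GuardedWritable extends Writable {
  constructor() {
    super();
    this.chunks = [];
    this.endCalls = 0;       // how many times end() was invoked
    this.finalizeRuns = 0;   // how many times the one-time logic ran
    this.endStarted = false; // set on the first end() call
  }

  _write(chunk, encoding, callback) {
    this.chunks.push(chunk);
    callback();
  }

  end(chunk, encoding, callback) {
    this.endCalls += 1;
    // Ignore repeated calls. Note that any callback passed to a
    // repeated call is dropped in this simplified sketch.
    if (this.endStarted) return this;
    this.endStarted = true;
    this.finalizeRuns += 1; // one-time end-of-stream work would go here
    return super.end(chunk, encoding, callback);
  }
}

const guarded = new GuardedWritable();
guarded.write("hello world");
guarded.end();
guarded.end(); // second call is counted but otherwise a no-op
```

The trade-off is that the guard hides the duplicate call instead of fixing its cause, so it only makes sense if calling end() more than once turns out to be expected behavior.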
import { pipeline } from 'stream/promises';
import fs from 'fs';
// require() is not defined in an ES module, so import mongodb as well
import { MongoClient, GridFSBucket } from 'mongodb';

const client = await MongoClient.connect('mongodb://localhost:27017');
const database = client.db('test01');
const bucket = new GridFSBucket(database, { bucketName: 'test01' });

// Stream a local file into GridFS through pipeline
const readable = fs.createReadStream('/tmp/test.txt');
const writable = bucket.openUploadStream('test.txt');
await pipeline(readable, writable);