Description
Version
20.11.0
Platform
Darwin MacBook-Pro-4.local 23.1.0 Darwin Kernel Version 23.1.0: Mon Oct 9 21:27:27 PDT 2023; root:xnu-10002.41.9~6/RELEASE_X86_64 x86_64
Subsystem
stream
What steps will reproduce the bug?
When calling destroy(err) on a stream created using stream.compose(), some streams that may still be actively processing data do not have _destroy called and do not emit an error event. The code below illustrates this.
If a stream that processes data slowly is the last entry in compose(...), all its input has been written (the finish event has fired), and destroy() is then called on the composed stream, I expected the slow stream to be destroyed as well and to emit the error.
Inserting a PassThrough stream as the last entry in the compose chain seems to fix the issue (uncomment the marked lines in the sample code), but it's unclear to me why. Either I've misunderstood something about composing streams with compose() or there's a bug.
import { PassThrough, Duplex, compose } from 'stream'

class SlowProcessor extends Duplex {
  constructor (options) {
    super({ ...options, objectMode: true })
    this.stuff = []
  }

  _write (message, encoding, callback) {
    this.stuff.push(message)
    callback()
  }

  _destroy (err, cb) {
    console.log('SlowProcessor _destroy called', err)
    cb(err)
  }

  _read () {
    // emulate some slow processing
    setTimeout(() => {
      if (this.stuff.length) {
        this.push(this.stuff.shift())
      } else if (this.writableEnded) {
        this.push(null)
      } else {
        setTimeout(() => this._read(), 100)
      }
    }, 100)
  }
}

const composed = compose(
  new PassThrough({ objectMode: true })
    .on('finish', () => console.log('input passthrough finish'))
    .on('end', () => console.log('input passthrough end'))
    .on('error', err => console.log('input passthrough error', err)),
  new SlowProcessor()
    .on('finish', () => console.log('SlowProcessor finish'))
    .on('end', () => console.log('SlowProcessor end'))
    .on('error', err => console.log('SlowProcessor error', err)),
  //
  // UNCOMMENT THIS AND IT WORKS AS EXPECTED
  //
  // new PassThrough({ objectMode: true })
  //   .on('finish', () => console.log('terminal passthrough finish'))
  //   .on('end', () => console.log('terminal passthrough end'))
  //   .on('error', err => console.log('terminal passthrough error', err))
)
  .on('finish', () => console.log('composed finish'))
  .on('error', err => console.log('composed', err))
  .on('end', () => console.log('composed end'))
  .on('data', d => console.log('data:', d))

composed.write('hello')
composed.write('world')
composed.end()

setTimeout(() => {
  composed.destroy(new Error('Uh-oh'))
}, 100)
How often does it reproduce? Is there a required condition?
Reliably reproduces using sample above.
What is the expected behavior? Why is that the expected behavior?
I'd expect any stream in a compose(...) chain that is still processing data to emit an error when the outer stream is destroyed.
I'd expect the script above to print something like:
input passthrough end
input passthrough finish
SlowProcessor finish
SlowProcessor _destroy called Error: Uh-oh
SlowProcessor error Error: Uh-oh
composed Error: Uh-oh
What do you see instead?
The SlowProcessor stream does not get destroyed even though its Readable side has not ended.
input passthrough end
input passthrough finish
SlowProcessor finish
// <=== I'd expect an error event from the SlowProcessor here as it hasn't ended
composed Error: Uh-oh
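One way to check this without relying on log order is to inspect each stream's destroyed flag shortly after destroying the composed stream. The snippet below is only a rough sketch: destroyAndReport is a hypothetical helper (not a Node.js API), the 50 ms delay is an arbitrary assumption, and it is written as a standalone CommonJS script using two plain PassThrough streams rather than the SlowProcessor above. The flags it reports for the inner streams may differ between Node.js versions.

```javascript
const { PassThrough, compose } = require('node:stream')

// Hypothetical helper (not a Node.js API): destroy `outer` with `err`, wait
// `delayMs`, then report the `destroyed` flag of the outer and inner streams.
function destroyAndReport (outer, inner, err, delayMs = 50) {
  // No-op 'error' listeners so an emitted error cannot crash the process.
  for (const stream of [outer, ...inner]) stream.on('error', () => {})
  outer.destroy(err)
  return new Promise(resolve => {
    setTimeout(() => {
      resolve({
        outer: outer.destroyed,
        inner: inner.map(stream => stream.destroyed)
      })
    }, delayMs)
  })
}

const head = new PassThrough({ objectMode: true })
const tail = new PassThrough({ objectMode: true })
const outer = compose(head, tail)
outer.write('hello')
outer.end()

const report = destroyAndReport(outer, [head, tail], new Error('Uh-oh'))
report.then(flags => console.log(flags))
```

Passing the SlowProcessor instance in the inner array, and calling the helper after its finish event, should show whether that stream was left undestroyed.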
When there's an extra PassThrough inserted as the last entry in the compose chain, the results look as I'd expect:
input passthrough end
input passthrough finish
SlowProcessor finish
terminal passthrough error Error: Uh-oh
SlowProcessor _destroy called Error: Uh-oh
SlowProcessor error Error: Uh-oh
composed Error: Uh-oh
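Until the cause is understood, the workaround could be wrapped in a small helper so the extra stage is not forgotten. This is a minimal sketch, assuming every stage runs in object mode as in the sample; composeWithTail and collect are hypothetical names, not part of Node.js, and the snippet is written as a standalone CommonJS script. It only shows that data still flows through the extra stage, not that the destroy behaviour is fixed.

```javascript
const { PassThrough, compose } = require('node:stream')

// Hypothetical helper (not part of Node.js): compose the given streams and
// append an object-mode PassThrough as the last stage, mirroring the
// workaround described above.
function composeWithTail (...streams) {
  return compose(...streams, new PassThrough({ objectMode: true }))
}

// Collect everything the stream emits; resolves with the chunks on 'end'.
function collect (stream) {
  return new Promise((resolve, reject) => {
    const chunks = []
    stream.on('data', chunk => chunks.push(chunk))
    stream.on('end', () => resolve(chunks))
    stream.on('error', reject)
  })
}

const piped = composeWithTail(new PassThrough({ objectMode: true }))
const result = collect(piped)
piped.write('hello')
piped.write('world')
piped.end()
result.then(chunks => console.log('data:', chunks))
```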
Additional information
No response