Readable stream pipe made with {end: false} does not emit an 'unpipe' event on the writable stream. #11837
Description
- Version: v7.7.2
- Platform: Windows 10 64bit
- Subsystem: stream
After a readable stream emits an end event would it not be natural for the stream to completely unpipe it's self from all the destinations even if the pipe was made with pipeOpts = {end: false}
?
The pipe does partially clean it's self up but it fails to emit an unpipe event on the writable stream and it ends up leaving a reference to the writable stream in the internal state.pipes of the readable stream.
Looking at _stream_readable.js it appear like it would be easy to fix this by changing:
line 512 from var endFn = doEnd ? onend : cleanup;
to var endFn = doEnd ? onend : unpipe;
and
line 548 from src.removeListener('end', cleanup);
to src.removeListener('end', unpipe);
the unpipe
in this scope calls src.unpipe(dest)
that ends with calling dest.emit('unpipe', src)
that triggers the dest.on('unpipe', onunpipe)
listener
onunpipe
then calls cleanup
So replacing directly calling cleanup
with unpipe
there shouldn't result in any side effects other then an 'unpipe' event emitted on the writable stream and readable stream losing an reference to an writable stream that it's never going to use again.
Examples:
const stream = require('stream');
function makeEampleSource(name){
let source = new stream.Readable({read: () => {
source.push(`I'm ${name}`)
source.push(null)
}})
source.name = name
return source
}
let nullOut = new stream.Writable({write: (chunk, encoding, callback) => callback()});
nullOut.on('unpipe', (source)=>{
console.log(`Unpiped ${source.name}`);
})
nullOut.on('pipe', (source)=>{
console.log(`Piped ${source.name}`);
})
let sourceA = makeEampleSource('sourceA')
sourceA.pipe(nullOut, {end: false})
sourceA.on('end', ()=>{
setImmediate(() => {
console.log('I expected this to be 0:', sourceA._readableState.pipesCount)
})
})
setTimeout(() => {
let sourceB = makeEampleSource('sourceB')
sourceB.pipe(nullOut, {end: false})
//This next line works around partial unpipe issue
sourceB.on('end', ()=>{sourceB.unpipe()})
sourceB.on('end', ()=>{
setImmediate(() => {
console.log('This time it is 0:', sourceB._readableState.pipesCount)
})
})
}, 10)
setTimeout(() => {
let sourceC = makeEampleSource('sourceC')
sourceC.pipe(nullOut)
sourceC.on('end', ()=>{
setImmediate(() => {
console.log('This time it is still 0 as the target\'s \'finish\' ended up trigering unpipe:', sourceC._readableState.pipesCount)
})
})
}, 20)
Note the line 'Unpiped sourceA' was never logged but sourceA did end and will never send data to the nullOut again.