Skip to content

Readable stream pipe made with {end: false} does not emit an 'unpipe' event on the writable stream. #11837

Closed
@zaide-chris

Description

  • Version: v7.7.2
  • Platform: Windows 10 64bit
  • Subsystem: stream

After a readable stream emits an end event would it not be natural for the stream to completely unpipe it's self from all the destinations even if the pipe was made with pipeOpts = {end: false}?

The pipe does partially clean it's self up but it fails to emit an unpipe event on the writable stream and it ends up leaving a reference to the writable stream in the internal state.pipes of the readable stream.

Looking at _stream_readable.js it appear like it would be easy to fix this by changing:
line 512 from var endFn = doEnd ? onend : cleanup; to var endFn = doEnd ? onend : unpipe;
and
line 548 from src.removeListener('end', cleanup); to src.removeListener('end', unpipe);

the unpipe in this scope calls src.unpipe(dest)
that ends with calling dest.emit('unpipe', src)
that triggers the dest.on('unpipe', onunpipe) listener
onunpipe then calls cleanup

So replacing directly calling cleanup with unpipe there shouldn't result in any side effects other then an 'unpipe' event emitted on the writable stream and readable stream losing an reference to an writable stream that it's never going to use again.

Examples:

const stream = require('stream');
function makeEampleSource(name){
	let source = new stream.Readable({read: () => {
		source.push(`I'm ${name}`)
		source.push(null)
	}})
	source.name = name
	return source
}
let nullOut = new stream.Writable({write: (chunk, encoding, callback) => callback()});
nullOut.on('unpipe', (source)=>{
	console.log(`Unpiped ${source.name}`);
})
nullOut.on('pipe', (source)=>{
	console.log(`Piped ${source.name}`);
})

let sourceA = makeEampleSource('sourceA')
sourceA.pipe(nullOut, {end: false})
sourceA.on('end', ()=>{
	setImmediate(() => {
		console.log('I expected this to be 0:', sourceA._readableState.pipesCount)
	})
})


setTimeout(() => {
	let sourceB = makeEampleSource('sourceB')
	sourceB.pipe(nullOut, {end: false})
	//This next line works around partial unpipe issue
	sourceB.on('end', ()=>{sourceB.unpipe()})
	sourceB.on('end', ()=>{
		setImmediate(() => {
			console.log('This time it is 0:', sourceB._readableState.pipesCount)
		})
	})
}, 10)


setTimeout(() => {
	let sourceC = makeEampleSource('sourceC')
	sourceC.pipe(nullOut)
	sourceC.on('end', ()=>{
		setImmediate(() => {
			console.log('This time it is still 0 as the target\'s \'finish\' ended up trigering unpipe:', sourceC._readableState.pipesCount)
		})
	})
}, 20)

Note the line 'Unpiped sourceA' was never logged but sourceA did end and will never send data to the nullOut again.

Metadata

Assignees

No one assigned

    Labels

    streamIssues and PRs related to the stream subsystem.

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions