@@ -681,20 +681,17 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
681681 dest . end ( ) ;
682682 }
683683
684- // When the dest drains, it reduces the awaitDrain counter
685- // on the source. This would be more elegant with a .once()
686- // handler in flow(), but adding and removing repeatedly is
687- // too slow.
688- const ondrain = pipeOnDrain ( src ) ;
689- dest . on ( 'drain' , ondrain ) ;
684+ let ondrain ;
690685
691686 var cleanedUp = false ;
692687 function cleanup ( ) {
693688 debug ( 'cleanup' ) ;
694689 // Cleanup event handlers once the pipe is broken
695690 dest . removeListener ( 'close' , onclose ) ;
696691 dest . removeListener ( 'finish' , onfinish ) ;
697- dest . removeListener ( 'drain' , ondrain ) ;
692+ if ( ondrain ) {
693+ dest . removeListener ( 'drain' , ondrain ) ;
694+ }
698695 dest . removeListener ( 'error' , onerror ) ;
699696 dest . removeListener ( 'unpipe' , onunpipe ) ;
700697 src . removeListener ( 'end' , onend ) ;
@@ -708,7 +705,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
708705 // flowing again.
709706 // So, if this is awaiting a drain, then we just call it now.
710707 // If we don't know, then assume that we are waiting for one.
711- if ( state . awaitDrain &&
708+ if ( ondrain && state . awaitDrain &&
712709 ( ! dest . _writableState || dest . _writableState . needDrain ) )
713710 ondrain ( ) ;
714711 }
@@ -729,6 +726,14 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
729726 debug ( 'false write response, pause' , state . awaitDrain ) ;
730727 state . awaitDrain ++ ;
731728 }
729+ if ( ! ondrain ) {
730+ // When the dest drains, it reduces the awaitDrain counter
731+ // on the source. This would be more elegant with a .once()
732+ // handler in flow(), but adding and removing repeatedly is
733+ // too slow.
734+ ondrain = pipeOnDrain ( src ) ;
735+ dest . on ( 'drain' , ondrain ) ;
736+ }
732737 src . pause ( ) ;
733738 }
734739 }
0 commit comments