@@ -114,6 +114,7 @@ function ReadableState(options, stream, isDuplex) {
114114 this . emittedReadable = false ;
115115 this . readableListening = false ;
116116 this . resumeScheduled = false ;
117+ this . paused = true ;
117118
118119 // Should close be emitted on destroy. Defaults to true.
119120 this . emitClose = options . emitClose !== false ;
@@ -862,10 +863,16 @@ Readable.prototype.removeAllListeners = function(ev) {
862863} ;
863864
864865function updateReadableListening ( self ) {
865- self . _readableState . readableListening = self . listenerCount ( 'readable' ) > 0 ;
866+ const state = self . _readableState ;
867+ state . readableListening = self . listenerCount ( 'readable' ) > 0 ;
866868
867- // crude way to check if we should resume
868- if ( self . listenerCount ( 'data' ) > 0 ) {
869+ if ( state . resumeScheduled && ! state . paused ) {
870+ // flowing needs to be set to true now, otherwise
871+ // the upcoming resume will not flow.
872+ state . flowing = true ;
873+
874+ // crude way to check if we should resume
875+ } else if ( self . listenerCount ( 'data' ) > 0 ) {
869876 self . resume ( ) ;
870877 }
871878}
@@ -887,6 +894,7 @@ Readable.prototype.resume = function() {
887894 state . flowing = ! state . readableListening ;
888895 resume ( this , state ) ;
889896 }
897+ state . paused = false ;
890898 return this ;
891899} ;
892900
@@ -917,6 +925,7 @@ Readable.prototype.pause = function() {
917925 this . _readableState . flowing = false ;
918926 this . emit ( 'pause' ) ;
919927 }
928+ this . _readableState . paused = true ;
920929 return this ;
921930} ;
922931
0 commit comments