@@ -431,7 +431,7 @@ function readableAddChunkUnshiftObjectMode(stream, state, chunk) {
431431function readableAddChunkUnshiftValue ( stream , state , chunk ) {
432432 if ( ( state [ kState ] & kEndEmitted ) !== 0 )
433433 errorOrDestroy ( stream , new ERR_STREAM_UNSHIFT_AFTER_END_EVENT ( ) ) ;
434- else if ( state . destroyed || state . errored )
434+ else if ( ( state [ kState ] & ( kDestroyed | kErrored ) ) !== 0 )
435435 return false ;
436436 else
437437 addChunk ( stream , state , chunk , true ) ;
@@ -608,7 +608,7 @@ function computeNewHighWaterMark(n) {
608608// This function is designed to be inlinable, so please take care when making
609609// changes to the function body.
610610function howMuchToRead ( n , state ) {
611- if ( n <= 0 || ( state . length === 0 && state . ended ) )
611+ if ( n <= 0 || ( state . length === 0 && ( state [ kState ] & kEnded ) !== 0 ) )
612612 return 0 ;
613613 if ( ( state [ kState ] & kObjectMode ) !== 0 )
614614 return 1 ;
@@ -652,7 +652,7 @@ Readable.prototype.read = function(n) {
652652 state . length >= state . highWaterMark :
653653 state . length > 0 ) ||
654654 ( state [ kState ] & kEnded ) !== 0 ) ) {
655- debug ( 'read: emitReadable' , state . length , ( state [ kState ] & kEnded ) !== 0 ) ;
655+ debug ( 'read: emitReadable' ) ;
656656 if ( state . length === 0 && ( state [ kState ] & kEnded ) !== 0 )
657657 endReadable ( this ) ;
658658 else
@@ -810,7 +810,7 @@ function emitReadable(stream) {
810810function emitReadable_ ( stream ) {
811811 const state = stream . _readableState ;
812812 debug ( 'emitReadable_' ) ;
813- if ( ( state [ kState ] & ( kDestroyed | kErrored ) ) === 0 && ( state . length || state . ended ) ) {
813+ if ( ( state [ kState ] & ( kDestroyed | kErrored ) ) === 0 && ( state . length || ( state [ kState ] & kEnded ) !== 0 ) ) {
814814 stream . emit ( 'readable' ) ;
815815 state [ kState ] &= ~ kEmittedReadable ;
816816 }
@@ -891,7 +891,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
891891 const state = this . _readableState ;
892892
893893 if ( state . pipes . length === 1 ) {
894- if ( ! state . multiAwaitDrain ) {
894+ if ( ( state [ kState ] & kMultiAwaitDrain ) === 0 ) {
895895 state [ kState ] |= kMultiAwaitDrain ;
896896 state . awaitDrainWriters = new SafeSet (
897897 state . awaitDrainWriters ? [ state . awaitDrainWriters ] : [ ] ,
@@ -907,7 +907,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
907907 dest !== process . stderr ;
908908
909909 const endFn = doEnd ? onend : unpipe ;
910- if ( state . endEmitted )
910+ if ( ( state [ kState ] & kEndEmitted ) !== 0 )
911911 process . nextTick ( endFn ) ;
912912 else
913913 src . once ( 'end' , endFn ) ;
@@ -966,7 +966,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
966966 if ( state . pipes . length === 1 && state . pipes [ 0 ] === dest ) {
967967 debug ( 'false write response, pause' , 0 ) ;
968968 state . awaitDrainWriters = dest ;
969- state . multiAwaitDrain = false ;
969+ state [ kState ] &= ~ kMultiAwaitDrain ;
970970 } else if ( state . pipes . length > 1 && state . pipes . includes ( dest ) ) {
971971 debug ( 'false write response, pause' , state . awaitDrainWriters . size ) ;
972972 state . awaitDrainWriters . add ( dest ) ;
@@ -1038,7 +1038,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
10381038
10391039 if ( dest . writableNeedDrain === true ) {
10401040 pause ( ) ;
1041- } else if ( ! state . flowing ) {
1041+ } else if ( ( state [ kState ] & kFlowing ) === 0 ) {
10421042 debug ( 'pipe resume' ) ;
10431043 src . resume ( ) ;
10441044 }
@@ -1056,7 +1056,7 @@ function pipeOnDrain(src, dest) {
10561056 if ( state . awaitDrainWriters === dest ) {
10571057 debug ( 'pipeOnDrain' , 1 ) ;
10581058 state . awaitDrainWriters = null ;
1059- } else if ( state . multiAwaitDrain ) {
1059+ } else if ( ( state [ kState ] & kMultiAwaitDrain ) !== 0 ) {
10601060 debug ( 'pipeOnDrain' , state . awaitDrainWriters . size ) ;
10611061 state . awaitDrainWriters . delete ( dest ) ;
10621062 }
@@ -1111,20 +1111,20 @@ Readable.prototype.on = function(ev, fn) {
11111111 if ( ev === 'data' ) {
11121112 // Update readableListening so that resume() may be a no-op
11131113 // a few lines down. This is needed to support once('readable').
1114- state . readableListening = this . listenerCount ( 'readable' ) > 0 ;
1114+ state [ kState ] | = this . listenerCount ( 'readable' ) > 0 ? kReadableListening : 0 ;
11151115
11161116 // Try start flowing on next tick if stream isn't explicitly paused.
1117- if ( state . flowing !== false )
1117+ if ( ( state [ kState ] & ( kHasFlowing | kFlowing ) ) !== kHasFlowing ) {
11181118 this . resume ( ) ;
1119+ }
11191120 } else if ( ev === 'readable' ) {
1120- if ( ! state . endEmitted && ! state . readableListening ) {
1121- state . readableListening = state . needReadable = true ;
1122- state . flowing = false ;
1123- state . emittedReadable = false ;
1124- debug ( 'on readable' , state . length , state . reading ) ;
1121+ if ( ( state [ kState ] & ( kEndEmitted | kReadableListening ) ) === 0 ) {
1122+ state [ kState ] |= kReadableListening | kNeedReadable | kHasFlowing ;
1123+ state [ kState ] &= ~ ( kFlowing | kEmittedReadable ) ;
1124+ debug ( 'on readable' ) ;
11251125 if ( state . length ) {
11261126 emitReadable ( this ) ;
1127- } else if ( ! state . reading ) {
1127+ } else if ( ( state [ kState ] & kReading ) === 0 ) {
11281128 process . nextTick ( nReadingNextTick , this ) ;
11291129 }
11301130 }
@@ -1171,7 +1171,12 @@ Readable.prototype.removeAllListeners = function(ev) {
11711171
11721172function updateReadableListening ( self ) {
11731173 const state = self . _readableState ;
1174- state . readableListening = self . listenerCount ( 'readable' ) > 0 ;
1174+
1175+ if ( self . listenerCount ( 'readable' ) > 0 ) {
1176+ state [ kState ] |= kReadableListening ;
1177+ } else {
1178+ state [ kState ] &= ~ kReadableListening ;
1179+ }
11751180
11761181 if ( ( state [ kState ] & ( kHasPaused | kPaused | kResumeScheduled ) ) === ( kHasPaused | kResumeScheduled ) ) {
11771182 // Flowing needs to be set to true now, otherwise
@@ -1201,7 +1206,7 @@ Readable.prototype.resume = function() {
12011206 // for readable, but we still have to call
12021207 // resume().
12031208 state [ kState ] |= kHasFlowing ;
1204- if ( ! state . readableListening ) {
1209+ if ( ( state [ kState ] & kReadableListening ) === 0 ) {
12051210 state [ kState ] |= kFlowing ;
12061211 } else {
12071212 state [ kState ] &= ~ kFlowing ;
@@ -1214,8 +1219,8 @@ Readable.prototype.resume = function() {
12141219} ;
12151220
12161221function resume ( stream , state ) {
1217- if ( ! state . resumeScheduled ) {
1218- state . resumeScheduled = true ;
1222+ if ( ( state [ kState ] & kResumeScheduled ) === 0 ) {
1223+ state [ kState ] |= kResumeScheduled ;
12191224 process . nextTick ( resume_ , stream , state ) ;
12201225 }
12211226}
@@ -1236,7 +1241,7 @@ function resume_(stream, state) {
12361241Readable . prototype . pause = function ( ) {
12371242 const state = this . _readableState ;
12381243 debug ( 'call pause' ) ;
1239- if ( state . flowing !== false ) {
1244+ if ( ( state [ kState ] & ( kHasFlowing | kFlowing ) ) !== kHasFlowing ) {
12401245 debug ( 'pause' ) ;
12411246 state [ kState ] |= kHasFlowing ;
12421247 state [ kState ] &= ~ kFlowing ;
@@ -1651,20 +1656,19 @@ function fromList(n, state) {
16511656function endReadable ( stream ) {
16521657 const state = stream . _readableState ;
16531658
1654- debug ( 'endReadable' , ( state [ kState ] & kEndEmitted ) !== 0 ) ;
1659+ debug ( 'endReadable' ) ;
16551660 if ( ( state [ kState ] & kEndEmitted ) === 0 ) {
16561661 state [ kState ] |= kEnded ;
16571662 process . nextTick ( endReadableNT , state , stream ) ;
16581663 }
16591664}
16601665
16611666function endReadableNT ( state , stream ) {
1662- debug ( 'endReadableNT' , state . endEmitted , state . length ) ;
1667+ debug ( 'endReadableNT' ) ;
16631668
16641669 // Check that we didn't get one last unshift.
1665- if ( ! state . errored && ! state . closeEmitted &&
1666- ! state . endEmitted && state . length === 0 ) {
1667- state . endEmitted = true ;
1670+ if ( ( state [ kState ] & ( kErrored | kCloseEmitted | kEndEmitted ) ) === 0 && state . length === 0 ) {
1671+ state [ kState ] |= kEndEmitted ;
16681672 stream . emit ( 'end' ) ;
16691673
16701674 if ( stream . writable && stream . allowHalfOpen === false ) {
0 commit comments