55
66const {
77 ArrayIsArray,
8+ Promise,
89 SymbolAsyncIterator,
910} = primordials ;
1011
@@ -13,21 +14,27 @@ const eos = require('internal/streams/end-of-stream');
1314const { once } = require ( 'internal/util' ) ;
1415const destroyImpl = require ( 'internal/streams/destroy' ) ;
1516const {
16- ERR_INVALID_ARG_TYPE ,
17- ERR_INVALID_RETURN_VALUE ,
18- ERR_MISSING_ARGS ,
19- ERR_STREAM_DESTROYED
20- } = require ( 'internal/errors' ) . codes ;
17+ aggregateTwoErrors,
18+ codes : {
19+ ERR_INVALID_ARG_TYPE ,
20+ ERR_INVALID_RETURN_VALUE ,
21+ ERR_MISSING_ARGS ,
22+ ERR_STREAM_DESTROYED ,
23+ ERR_STREAM_PREMATURE_CLOSE ,
24+ } ,
25+ } = require ( 'internal/errors' ) ;
2126
2227const { validateCallback } = require ( 'internal/validators' ) ;
2328
29+ function noop ( ) { }
30+
2431const {
2532 isIterable,
2633 isReadable,
2734 isStream,
2835} = require ( 'internal/streams/utils' ) ;
36+ const assert = require ( 'internal/assert' ) ;
2937
30- let EE ;
3138let PassThrough ;
3239let Readable ;
3340
@@ -101,25 +108,62 @@ async function* fromReadable(val) {
101108}
102109
103110async function pump ( iterable , writable , finish ) {
104- if ( ! EE ) {
105- EE = require ( 'events' ) ;
106- }
107111 let error ;
112+ let callback = noop ;
113+ const resume = ( err ) => {
114+ error = aggregateTwoErrors ( error , err ) ;
115+ const _callback = callback ;
116+ callback = noop ;
117+ _callback ( ) ;
118+ } ;
119+ const onClose = ( ) => {
120+ resume ( new ERR_STREAM_PREMATURE_CLOSE ( ) ) ;
121+ } ;
122+
123+ const waitForDrain = ( ) => new Promise ( ( resolve ) => {
124+ assert ( callback === noop ) ;
125+ if ( error || writable . destroyed ) {
126+ resolve ( ) ;
127+ } else {
128+ callback = resolve ;
129+ }
130+ } ) ;
131+
132+ writable
133+ . on ( 'drain' , resume )
134+ . on ( 'error' , resume )
135+ . on ( 'close' , onClose ) ;
136+
108137 try {
109- if ( writable . writableNeedDrain === true ) {
110- await EE . once ( writable , 'drain' ) ;
138+ if ( writable . writableNeedDrain ) {
139+ await waitForDrain ( ) ;
140+ }
141+
142+ if ( error ) {
143+ return ;
111144 }
112145
113146 for await ( const chunk of iterable ) {
114147 if ( ! writable . write ( chunk ) ) {
115- if ( writable . destroyed ) return ;
116- await EE . once ( writable , 'drain' ) ;
148+ await waitForDrain ( ) ;
149+ }
150+ if ( error ) {
151+ return ;
117152 }
118153 }
154+
155+ if ( error ) {
156+ return ;
157+ }
158+
119159 writable . end ( ) ;
120160 } catch ( err ) {
121- error = err ;
161+ error = aggregateTwoErrors ( error , err ) ;
122162 } finally {
163+ writable
164+ . off ( 'drain' , resume )
165+ . off ( 'error' , resume )
166+ . off ( 'close' , onClose ) ;
123167 finish ( error ) ;
124168 }
125169}
0 commit comments