5
5
6
6
const {
7
7
ArrayIsArray,
8
+ Promise,
8
9
SymbolAsyncIterator,
9
10
} = primordials ;
10
11
@@ -13,21 +14,27 @@ const eos = require('internal/streams/end-of-stream');
13
14
const { once } = require ( 'internal/util' ) ;
14
15
const destroyImpl = require ( 'internal/streams/destroy' ) ;
15
16
const {
16
- ERR_INVALID_ARG_TYPE ,
17
- ERR_INVALID_RETURN_VALUE ,
18
- ERR_MISSING_ARGS ,
19
- ERR_STREAM_DESTROYED
20
- } = require ( 'internal/errors' ) . codes ;
17
+ aggregateTwoErrors,
18
+ codes : {
19
+ ERR_INVALID_ARG_TYPE ,
20
+ ERR_INVALID_RETURN_VALUE ,
21
+ ERR_MISSING_ARGS ,
22
+ ERR_STREAM_DESTROYED ,
23
+ ERR_STREAM_PREMATURE_CLOSE ,
24
+ } ,
25
+ } = require ( 'internal/errors' ) ;
21
26
22
27
const { validateCallback } = require ( 'internal/validators' ) ;
23
28
29
+ function noop ( ) { }
30
+
24
31
const {
25
32
isIterable,
26
33
isReadable,
27
34
isStream,
28
35
} = require ( 'internal/streams/utils' ) ;
36
+ const assert = require ( 'internal/assert' ) ;
29
37
30
- let EE ;
31
38
let PassThrough ;
32
39
let Readable ;
33
40
@@ -101,25 +108,62 @@ async function* fromReadable(val) {
101
108
}
102
109
103
110
async function pump ( iterable , writable , finish ) {
104
- if ( ! EE ) {
105
- EE = require ( 'events' ) ;
106
- }
107
111
let error ;
112
+ let callback = noop ;
113
+ const resume = ( err ) => {
114
+ error = aggregateTwoErrors ( error , err ) ;
115
+ const _callback = callback ;
116
+ callback = noop ;
117
+ _callback ( ) ;
118
+ } ;
119
+ const onClose = ( ) => {
120
+ resume ( new ERR_STREAM_PREMATURE_CLOSE ( ) ) ;
121
+ } ;
122
+
123
+ const waitForDrain = ( ) => new Promise ( ( resolve ) => {
124
+ assert ( callback === noop ) ;
125
+ if ( error || writable . destroyed ) {
126
+ resolve ( ) ;
127
+ } else {
128
+ callback = resolve ;
129
+ }
130
+ } ) ;
131
+
132
+ writable
133
+ . on ( 'drain' , resume )
134
+ . on ( 'error' , resume )
135
+ . on ( 'close' , onClose ) ;
136
+
108
137
try {
109
- if ( writable . writableNeedDrain === true ) {
110
- await EE . once ( writable , 'drain' ) ;
138
+ if ( writable . writableNeedDrain ) {
139
+ await waitForDrain ( ) ;
140
+ }
141
+
142
+ if ( error ) {
143
+ return ;
111
144
}
112
145
113
146
for await ( const chunk of iterable ) {
114
147
if ( ! writable . write ( chunk ) ) {
115
- if ( writable . destroyed ) return ;
116
- await EE . once ( writable , 'drain' ) ;
148
+ await waitForDrain ( ) ;
149
+ }
150
+ if ( error ) {
151
+ return ;
117
152
}
118
153
}
154
+
155
+ if ( error ) {
156
+ return ;
157
+ }
158
+
119
159
writable . end ( ) ;
120
160
} catch ( err ) {
121
- error = err ;
161
+ error = aggregateTwoErrors ( error , err ) ;
122
162
} finally {
163
+ writable
164
+ . off ( 'drain' , resume )
165
+ . off ( 'error' , resume )
166
+ . off ( 'close' , onClose ) ;
123
167
finish ( error ) ;
124
168
}
125
169
}
0 commit comments