1
+ /* istanbul ignore file: TODO add more coverage */
2
+
3
+ // Ported from https://github.com/nodejs/undici/pull/907
4
+
1
5
'use strict'
2
6
7
+ const assert = require ( 'assert' )
3
8
const { Readable } = require ( 'stream' )
4
- const { InvalidArgumentError } = require ( '../core/errors' )
9
+ const { InvalidArgumentError, RequestAbortedError } = require ( '../core/errors' )
5
10
6
11
let StringDecoder
7
12
let Blob
8
13
9
14
const kConsume = Symbol ( 'kConsume' )
10
15
const kReading = Symbol ( 'kReading' )
16
+ const kBody = Symbol ( 'kBody' )
11
17
12
- const kWebStreamType = 1
13
- const kTextType = 2
14
- const kBlobType = 3
15
- const kArrayBufferType = 4
16
- const kJSONType = 5
17
-
18
- class AbortError extends Error {
19
- constructor ( message ) {
20
- super ( message )
21
- Error . captureStackTrace ( this , AbortError )
22
- this . name = 'AbortError'
23
- this . message = 'aborted'
24
- this . code = 'UND_ERR_ABORTED'
25
- }
26
- }
18
+ const kTextType = 1
19
+ const kBlobType = 2
20
+ const kArrayBufferType = 3
21
+ const kJSONType = 4
27
22
28
23
module . exports = class BodyReadable extends Readable {
29
24
constructor ( opts ) {
@@ -32,12 +27,24 @@ module.exports = class BodyReadable extends Readable {
32
27
this . _readableState . dataEmitted = false
33
28
34
29
this [ kConsume ] = null
30
+ this [ kBody ] = null
35
31
this [ kReading ] = false // Is stream being consumed through Readable API?
36
32
}
37
33
34
+ destroy ( err ) {
35
+ if ( this . destroyed ) {
36
+ // Node < 16
37
+ return this
38
+ }
39
+ return super . destroy ( err )
40
+ }
41
+
38
42
emit ( ev , ...args ) {
39
43
if ( ev === 'data' ) {
40
44
this . _readableState . dataEmitted = true
45
+ } else if ( ev === 'error' ) {
46
+ // Node < 16
47
+ this . _readableState . errorEmitted = true
41
48
}
42
49
return super . emit ( ev , ...args )
43
50
}
@@ -49,6 +56,13 @@ module.exports = class BodyReadable extends Readable {
49
56
return super . on ( ev , ...args )
50
57
}
51
58
59
+ addListener ( ev , ...args ) {
60
+ if ( ev === 'data' || ev === 'readable' ) {
61
+ this [ kReading ] = true
62
+ }
63
+ return super . addListener ( ev , ...args )
64
+ }
65
+
52
66
push ( chunk , encoding ) {
53
67
if ( this [ kConsume ] && chunk !== null && ! this [ kReading ] ) {
54
68
// Fast path.
@@ -90,12 +104,16 @@ module.exports = class BodyReadable extends Readable {
90
104
return isDisturbed ( this )
91
105
}
92
106
107
+ // https://fetch.spec.whatwg.org/#dom-body-body
93
108
get body ( ) {
94
- if ( this [ kConsume ] && this [ kConsume ] . type === kWebStreamType ) {
95
- return this [ kConsume ] . stream
109
+ if ( ! this [ kBody ] ) {
110
+ if ( isUnusable ( this ) ) {
111
+ throw new TypeError ( 'unusable' )
112
+ }
113
+ this [ kBody ] = Readable . toWeb ( this )
96
114
}
97
115
98
- return consume ( this , kWebStreamType )
116
+ return this [ kBody ]
99
117
}
100
118
}
101
119
@@ -123,32 +141,17 @@ function isUnusable (self) {
123
141
return isDisturbed ( self ) || isLocked ( self )
124
142
}
125
143
126
- async function consume ( parent , type ) {
127
- if ( isUnusable ( parent ) ) {
128
- // eslint-disable-next-line no-restricted-syntax
144
+ async function consume ( stream , type ) {
145
+ if ( isUnusable ( stream ) ) {
129
146
throw new TypeError ( 'unusable' )
130
147
}
131
148
132
- if ( parent [ kConsume ] ) {
133
- // TODO: Should multiple consume in same tick be possible?
134
- // eslint-disable-next-line no-restricted-syntax
135
- throw new TypeError ( 'unusable' )
136
- }
137
-
138
- if ( type === kWebStreamType ) {
139
- const consume = parent [ kConsume ] = {
140
- type,
141
- // TODO: Optimized implementation for web streams.
142
- stream : Readable . toWeb ( parent )
143
- }
144
-
145
- return consume . stream
146
- }
149
+ assert ( ! stream [ kConsume ] )
147
150
148
151
return new Promise ( ( resolve , reject ) => {
149
- parent [ kConsume ] = {
152
+ stream [ kConsume ] = {
150
153
type,
151
- parent ,
154
+ stream ,
152
155
resolve,
153
156
reject,
154
157
length : 0 ,
@@ -159,17 +162,18 @@ async function consume (parent, type) {
159
162
ended : false
160
163
}
161
164
162
- parent
165
+ stream
163
166
. once ( 'error' , function ( err ) {
164
167
consumeFinish ( this [ kConsume ] , err )
165
168
} )
166
169
. once ( 'close' , function ( ) {
167
170
if ( this [ kConsume ] . body !== null ) {
168
- consumeFinish ( this [ kConsume ] , new AbortError ( ) )
171
+ // TODO: Use Node error?
172
+ consumeFinish ( this [ kConsume ] , new RequestAbortedError ( ) )
169
173
}
170
174
} )
171
175
172
- process . nextTick ( consumeStart , parent [ kConsume ] )
176
+ process . nextTick ( consumeStart , stream [ kConsume ] )
173
177
} )
174
178
}
175
179
@@ -178,7 +182,7 @@ function consumeStart (consume) {
178
182
return
179
183
}
180
184
181
- const { _readableState : state } = consume . parent
185
+ const { _readableState : state } = consume . stream
182
186
183
187
for ( const chunk of state . buffer ) {
184
188
consumePush ( consume , chunk )
@@ -187,20 +191,18 @@ function consumeStart (consume) {
187
191
if ( state . endEmitted ) {
188
192
consumeEnd ( this [ kConsume ] )
189
193
} else {
190
- consume . parent . once ( 'end' , function ( ) {
194
+ consume . stream . once ( 'end' , function ( ) {
191
195
consumeEnd ( this [ kConsume ] )
192
196
} )
193
197
}
194
198
195
- if ( consume . parent . isPaused ( ) ) {
196
- consume . parent . resume ( )
197
- }
199
+ consume . stream . resume ( )
198
200
199
- while ( consume . parent . read ( ) != null ) ;
201
+ while ( consume . stream . read ( ) != null ) ;
200
202
}
201
203
202
204
function consumeEnd ( consume ) {
203
- const { type, body, resolve, decoder, parent , length } = consume
205
+ const { type, body, resolve, decoder, stream , length } = consume
204
206
205
207
try {
206
208
if ( type === kTextType ) {
@@ -226,7 +228,7 @@ function consumeEnd (consume) {
226
228
227
229
consumeFinish ( consume )
228
230
} catch ( err ) {
229
- parent . destroy ( err )
231
+ stream . destroy ( err )
230
232
}
231
233
}
232
234
@@ -241,7 +243,7 @@ function consumePush (consume, chunk, encoding) {
241
243
242
244
if ( chunk === null ) {
243
245
consume . ended = true
244
- consume . parent . read ( )
246
+ consume . stream . read ( )
245
247
return false
246
248
}
247
249
@@ -253,7 +255,7 @@ function consumePush (consume, chunk, encoding) {
253
255
consumePushBuffer ( consume , chunk , encoding )
254
256
}
255
257
256
- if ( ! consume . parent [ kReading ] && ! consume . reading ) {
258
+ if ( ! consume . stream [ kReading ] && ! consume . reading ) {
257
259
consume . reading = true
258
260
process . nextTick ( consumeReadMore , consume )
259
261
}
@@ -283,6 +285,7 @@ function consumePushString (consume, chunk, encoding) {
283
285
} else {
284
286
// TODO: What if objectMode? Should we just fail consume
285
287
// or throw?
288
+ // TODO: Use Node error?
286
289
throw new InvalidArgumentError ( 'chunk' )
287
290
}
288
291
@@ -300,6 +303,7 @@ function consumePushBuffer (consume, chunk, encoding) {
300
303
} else if ( ! ArrayBuffer . isView ( chunk ) ) {
301
304
// TODO: What if objectMode? Should we just fail consume
302
305
// or throw?
306
+ // TODO: Use Node error?
303
307
throw new InvalidArgumentError ( 'chunk' )
304
308
}
305
309
@@ -308,21 +312,22 @@ function consumePushBuffer (consume, chunk, encoding) {
308
312
}
309
313
310
314
function consumeReadMore ( consume ) {
311
- if ( consume . parent [ kReading ] ) {
312
- consume . reading = false
313
- return
314
- }
315
-
316
315
consume . pushed = true
317
- while ( consume . pushed ) {
316
+
317
+ while ( consume . pushed && ! consume . stream [ kReading ] ) {
318
318
consume . pushed = false
319
- consume . parent . _read ( consume . parent )
319
+ consume . stream . _read ( consume . stream )
320
320
}
321
321
322
+ consume . pushed = false
322
323
consume . reading = false
323
324
}
324
325
325
326
function consumeFinish ( consume , err ) {
327
+ if ( consume . body === null ) {
328
+ return
329
+ }
330
+
326
331
if ( err ) {
327
332
consume . reject ( err )
328
333
} else {
0 commit comments