@@ -11,7 +11,7 @@ const { FormData, setFormDataState } = require('./formdata')
1111const { webidl } = require ( '../webidl' )
1212const assert = require ( 'node:assert' )
1313const { isErrored, isDisturbed } = require ( 'node:stream' )
14- const { isArrayBuffer } = require ( 'node:util/types' )
14+ const { isUint8Array } = require ( 'node:util/types' )
1515const { serializeAMimeType } = require ( './data-url' )
1616const { multipartFormDataParser } = require ( './formdata-parser' )
1717const { createDeferredPromise } = require ( '../../util/promise' )
@@ -45,6 +45,7 @@ const streamRegistry = new FinalizationRegistry((weakRef) => {
4545function extractBody ( object , keepalive = false ) {
4646 // 1. Let stream be null.
4747 let stream = null
48+ let controller = null
4849
4950 // 2. If object is a ReadableStream object, then set stream to object.
5051 if ( webidl . is . ReadableStream ( object ) ) {
@@ -57,16 +58,11 @@ function extractBody (object, keepalive = false) {
5758 // 4. Otherwise, set stream to a new ReadableStream object, and set
5859 // up stream with byte reading support.
5960 stream = new ReadableStream ( {
60- pull ( controller ) {
61- const buffer = typeof source === 'string' ? textEncoder . encode ( source ) : source
62-
63- if ( buffer . byteLength ) {
64- controller . enqueue ( buffer )
65- }
66-
67- queueMicrotask ( ( ) => readableStreamClose ( controller ) )
61+ pull ( ) { } ,
62+ start ( c ) {
63+ controller = c
6864 } ,
69- start ( ) { } ,
65+ cancel ( ) { } ,
7066 type : 'bytes'
7167 } )
7268 }
@@ -108,9 +104,8 @@ function extractBody (object, keepalive = false) {
108104 // Set type to `application/x-www-form-urlencoded;charset=UTF-8`.
109105 type = 'application/x-www-form-urlencoded;charset=UTF-8'
110106 } else if ( webidl . is . BufferSource ( object ) ) {
111- source = isArrayBuffer ( object )
112- ? new Uint8Array ( object . slice ( ) )
113- : new Uint8Array ( object . buffer . slice ( object . byteOffset , object . byteOffset + object . byteLength ) )
107+ // Set source to a copy of the bytes held by object.
108+ source = webidl . util . getCopyOfBytesHeldByBufferSource ( object )
114109 } else if ( webidl . is . FormData ( object ) ) {
115110 const boundary = `----formdata-undici-0${ `${ random ( 1e11 ) } ` . padStart ( 11 , '0' ) } `
116111 const prefix = `--${ boundary } \r\nContent-Disposition: form-data`
@@ -213,45 +208,36 @@ function extractBody (object, keepalive = false) {
213208
214209 // 11. If source is a byte sequence, then set action to a
215210 // step that returns source and length to sourceβs length.
216- if ( typeof source === 'string' || util . isBuffer ( source ) ) {
217- length = Buffer . byteLength ( source )
211+ if ( typeof source === 'string' || isUint8Array ( source ) ) {
212+ action = ( ) => {
213+ length = typeof source === 'string' ? Buffer . byteLength ( source ) : source . length
214+ return source
215+ }
218216 }
219217
220- // 12. If action is non-null, then run these steps in in parallel:
218+ // 12. If action is non-null, then run these steps in parallel:
221219 if ( action != null ) {
222- // Run action.
223- let iterator
224- stream = new ReadableStream ( {
225- start ( ) {
226- iterator = action ( object ) [ Symbol . asyncIterator ] ( )
227- } ,
228- pull ( controller ) {
229- return iterator . next ( ) . then ( ( { value, done } ) => {
230- if ( done ) {
231- // When running action is done, close stream.
232- queueMicrotask ( ( ) => {
233- controller . close ( )
234- controller . byobRequest ?. respond ( 0 )
235- } )
236- } else {
237- // Whenever one or more bytes are available and stream is not errored,
238- // enqueue a Uint8Array wrapping an ArrayBuffer containing the available
239- // bytes into stream.
240- if ( ! isErrored ( stream ) ) {
241- const buffer = new Uint8Array ( value )
242- if ( buffer . byteLength ) {
243- controller . enqueue ( buffer )
244- }
245- }
220+ ; ( async ( ) => {
221+ // 1. Run action.
222+ const result = action ( )
223+
224+ // 2. Whenever one or more bytes are available and stream is not errored,
225+ // enqueue the result of creating a Uint8Array from the available bytes into stream.
226+ const iterator = result ?. [ Symbol . asyncIterator ] ?. ( )
227+ if ( iterator ) {
228+ for await ( const bytes of iterator ) {
229+ if ( isErrored ( stream ) ) break
230+ if ( bytes . length ) {
231+ controller . enqueue ( new Uint8Array ( bytes ) )
246232 }
247- return controller . desiredSize > 0
248- } )
249- } ,
250- cancel ( reason ) {
251- return iterator . return ( )
252- } ,
253- type : 'bytes'
254- } )
233+ }
234+ } else if ( result ?. length && ! isErrored ( stream ) ) {
235+ controller . enqueue ( typeof result === 'string' ? textEncoder . encode ( result ) : new Uint8Array ( result ) )
236+ }
237+
238+ // 3. When running action is done, close stream.
239+ queueMicrotask ( ( ) => readableStreamClose ( controller ) )
240+ } ) ( )
255241 }
256242
257243 // 13. Let body be a body whose stream is stream, source is source,
0 commit comments