@@ -112,7 +112,6 @@ deno_core::extension!(deno_fetch,
112112 ops = [
113113 op_fetch<FP >,
114114 op_fetch_send,
115- op_fetch_response_into_byte_stream,
116115 op_fetch_response_upgrade,
117116 op_fetch_custom_client<FP >,
118117 ] ,
@@ -427,7 +426,6 @@ pub struct FetchResponse {
427426pub async fn op_fetch_send (
428427 state : Rc < RefCell < OpState > > ,
429428 rid : ResourceId ,
430- into_byte_stream : bool ,
431429) -> Result < FetchResponse , AnyError > {
432430 let request = state
433431 . borrow_mut ( )
@@ -459,27 +457,10 @@ pub async fn op_fetch_send(
459457 ( None , None )
460458 } ;
461459
462- let response_rid = if !into_byte_stream {
463- state
464- . borrow_mut ( )
465- . resource_table
466- . add ( FetchResponseResource {
467- response : res,
468- size : content_length,
469- } )
470- } else {
471- let stream: BytesStream = Box :: pin ( res. bytes_stream ( ) . map ( |r| {
472- r. map_err ( |err| std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , err) )
473- } ) ) ;
474- state
475- . borrow_mut ( )
476- . resource_table
477- . add ( FetchResponseBodyResource {
478- reader : AsyncRefCell :: new ( stream. peekable ( ) ) ,
479- cancel : CancelHandle :: default ( ) ,
480- size : content_length,
481- } )
482- } ;
460+ let response_rid = state
461+ . borrow_mut ( )
462+ . resource_table
463+ . add ( FetchResponseResource :: new ( res, content_length) ) ;
483464
484465 Ok ( FetchResponse {
485466 status : status. as_u16 ( ) ,
@@ -493,28 +474,6 @@ pub async fn op_fetch_send(
493474 } )
494475}
495476
496- #[ op]
497- pub fn op_fetch_response_into_byte_stream (
498- state : & mut OpState ,
499- rid : ResourceId ,
500- ) -> Result < ResourceId , AnyError > {
501- let raw_response = state. resource_table . take :: < FetchResponseResource > ( rid) ?;
502- let raw_response = Rc :: try_unwrap ( raw_response)
503- . expect ( "Someone is holding onto FetchResponseResource" ) ;
504- let stream: BytesStream =
505- Box :: pin ( raw_response. response . bytes_stream ( ) . map ( |r| {
506- r. map_err ( |err| std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , err) )
507- } ) ) ;
508-
509- let rid = state. resource_table . add ( FetchResponseBodyResource {
510- reader : AsyncRefCell :: new ( stream. peekable ( ) ) ,
511- cancel : CancelHandle :: default ( ) ,
512- size : raw_response. size ,
513- } ) ;
514-
515- Ok ( rid)
516- }
517-
518477#[ op]
519478pub async fn op_fetch_response_upgrade (
520479 state : Rc < RefCell < OpState > > ,
@@ -530,7 +489,7 @@ pub async fn op_fetch_response_upgrade(
530489 let ( read, write) = tokio:: io:: duplex ( 1024 ) ;
531490 let ( read_rx, write_tx) = tokio:: io:: split ( read) ;
532491 let ( mut write_rx, mut read_tx) = tokio:: io:: split ( write) ;
533- let upgraded = raw_response. response . upgrade ( ) . await ?;
492+ let upgraded = raw_response. upgrade ( ) . await ?;
534493 {
535494 // Stage 3: Pump the data
536495 let ( mut upgraded_rx, mut upgraded_tx) = tokio:: io:: split ( upgraded) ;
@@ -698,35 +657,72 @@ impl Resource for FetchRequestBodyResource {
698657type BytesStream =
699658 Pin < Box < dyn Stream < Item = Result < bytes:: Bytes , std:: io:: Error > > + Unpin > > ;
700659
660+ pub enum FetchResponseReader {
661+ Start ( Response ) ,
662+ BodyReader ( Peekable < BytesStream > ) ,
663+ }
664+
665+ impl Default for FetchResponseReader {
666+ fn default ( ) -> Self {
667+ let stream: BytesStream = Box :: pin ( deno_core:: futures:: stream:: empty ( ) ) ;
668+ Self :: BodyReader ( stream. peekable ( ) )
669+ }
670+ }
701671#[ derive( Debug ) ]
702672pub struct FetchResponseResource {
703- pub response : Response ,
673+ pub response_reader : AsyncRefCell < FetchResponseReader > ,
674+ pub cancel : CancelHandle ,
704675 pub size : Option < u64 > ,
705676}
706677
707- impl Resource for FetchResponseResource {
708- fn name ( & self ) -> Cow < str > {
709- "fetchResponse" . into ( )
678+ impl FetchResponseResource {
679+ pub fn new ( response : Response , size : Option < u64 > ) -> Self {
680+ Self {
681+ response_reader : AsyncRefCell :: new ( FetchResponseReader :: Start ( response) ) ,
682+ cancel : CancelHandle :: default ( ) ,
683+ size,
684+ }
710685 }
711- }
712686
713- pub struct FetchResponseBodyResource {
714- pub reader : AsyncRefCell < Peekable < BytesStream > > ,
715- pub cancel : CancelHandle ,
716- pub size : Option < u64 > ,
687+ pub async fn upgrade ( self ) -> Result < reqwest:: Upgraded , AnyError > {
688+ let reader = self . response_reader . into_inner ( ) ;
689+ match reader {
690+ FetchResponseReader :: Start ( resp) => Ok ( resp. upgrade ( ) . await ?) ,
691+ _ => unreachable ! ( ) ,
692+ }
693+ }
717694}
718695
719- impl Resource for FetchResponseBodyResource {
696+ impl Resource for FetchResponseResource {
720697 fn name ( & self ) -> Cow < str > {
721- "fetchResponseBody " . into ( )
698+ "fetchResponse " . into ( )
722699 }
723700
724701 fn read ( self : Rc < Self > , limit : usize ) -> AsyncResult < BufView > {
725702 Box :: pin ( async move {
726- let reader = RcRef :: map ( & self , |r| & r. reader ) . borrow_mut ( ) . await ;
703+ let mut reader =
704+ RcRef :: map ( & self , |r| & r. response_reader ) . borrow_mut ( ) . await ;
727705
706+ let body = loop {
707+ match & mut * reader {
708+ FetchResponseReader :: BodyReader ( reader) => break reader,
709+ FetchResponseReader :: Start ( _) => { }
710+ }
711+
712+ match std:: mem:: take ( & mut * reader) {
713+ FetchResponseReader :: Start ( resp) => {
714+ let stream: BytesStream = Box :: pin ( resp. bytes_stream ( ) . map ( |r| {
715+ r. map_err ( |err| {
716+ std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , err)
717+ } )
718+ } ) ) ;
719+ * reader = FetchResponseReader :: BodyReader ( stream. peekable ( ) ) ;
720+ }
721+ FetchResponseReader :: BodyReader ( _) => unreachable ! ( ) ,
722+ }
723+ } ;
728724 let fut = async move {
729- let mut reader = Pin :: new ( reader ) ;
725+ let mut reader = Pin :: new ( body ) ;
730726 loop {
731727 match reader. as_mut ( ) . peek_mut ( ) . await {
732728 Some ( Ok ( chunk) ) if !chunk. is_empty ( ) => {
0 commit comments