Skip to content

Commit 07cbec4

Browse files
authored
fix(ext/node): handle 'upgrade' responses (#19412)
This commit adds support for "upgrade" events in "node:http" "ClientRequest". Currently only "Websocket" upgrades are handled. Thanks to this change package like "npm:puppeteer" and "npm:discord" should work. Closes #18913 Closes #17847
1 parent 44bd59c commit 07cbec4

File tree

4 files changed

+298
-19
lines changed

4 files changed

+298
-19
lines changed

cli/tests/unit_node/http_test.ts

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -649,3 +649,53 @@ Deno.test("[node/http] HTTPS server", async () => {
649649
await Promise.all([promise, promise2]);
650650
client.close();
651651
});
652+
653+
Deno.test(
654+
"[node/http] client upgrade",
655+
{ permissions: { net: true } },
656+
async () => {
657+
const promise = deferred();
658+
const server = http.createServer((_req, res) => {
659+
res.writeHead(200, { "Content-Type": "text/plain" });
660+
res.end("okay");
661+
});
662+
// @ts-ignore it's a socket for real
663+
let serverSocket;
664+
server.on("upgrade", (_req, socket, _head) => {
665+
socket.write(
666+
"HTTP/1.1 101 Web Socket Protocol Handshake\r\n" +
667+
"Upgrade: WebSocket\r\n" +
668+
"Connection: Upgrade\r\n" +
669+
"\r\n",
670+
);
671+
serverSocket = socket;
672+
});
673+
674+
// Now that server is running
675+
server.listen(1337, "127.0.0.1", () => {
676+
// make a request
677+
const options = {
678+
port: 1337,
679+
host: "127.0.0.1",
680+
headers: {
681+
"Connection": "Upgrade",
682+
"Upgrade": "websocket",
683+
},
684+
};
685+
686+
const req = http.request(options);
687+
req.end();
688+
689+
req.on("upgrade", (_res, socket, _upgradeHead) => {
690+
socket.end();
691+
// @ts-ignore it's a socket for real
692+
serverSocket!.end();
693+
server.close(() => {
694+
promise.resolve();
695+
});
696+
});
697+
});
698+
699+
await promise;
700+
},
701+
);

ext/fetch/26_fetch.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ function opFetch(method, url, headers, clientRid, hasBody, bodyLength, body) {
8686
* @returns {Promise<{ status: number, statusText: string, headers: [string, string][], url: string, responseRid: number }>}
8787
*/
8888
function opFetchSend(rid) {
89-
return core.opAsync("op_fetch_send", rid);
89+
return core.opAsync("op_fetch_send", rid, true);
9090
}
9191

9292
/**

ext/fetch/lib.rs

Lines changed: 178 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use deno_core::op;
2323
use deno_core::BufView;
2424
use deno_core::WriteOutcome;
2525

26+
use deno_core::task::spawn;
2627
use deno_core::url::Url;
2728
use deno_core::AsyncRefCell;
2829
use deno_core::AsyncResult;
@@ -58,6 +59,8 @@ use reqwest::RequestBuilder;
5859
use reqwest::Response;
5960
use serde::Deserialize;
6061
use serde::Serialize;
62+
use tokio::io::AsyncReadExt;
63+
use tokio::io::AsyncWriteExt;
6164
use tokio::sync::mpsc;
6265

6366
// Re-export reqwest and data_url
@@ -109,6 +112,8 @@ deno_core::extension!(deno_fetch,
109112
ops = [
110113
op_fetch<FP>,
111114
op_fetch_send,
115+
op_fetch_response_into_byte_stream,
116+
op_fetch_response_upgrade,
112117
op_fetch_custom_client<FP>,
113118
],
114119
esm = [
@@ -414,12 +419,15 @@ pub struct FetchResponse {
414419
pub url: String,
415420
pub response_rid: ResourceId,
416421
pub content_length: Option<u64>,
422+
pub remote_addr_ip: Option<String>,
423+
pub remote_addr_port: Option<u16>,
417424
}
418425

419426
#[op]
420427
pub async fn op_fetch_send(
421428
state: Rc<RefCell<OpState>>,
422429
rid: ResourceId,
430+
into_byte_stream: bool,
423431
) -> Result<FetchResponse, AnyError> {
424432
let request = state
425433
.borrow_mut()
@@ -436,7 +444,6 @@ pub async fn op_fetch_send(
436444
Err(_) => return Err(type_error("request was cancelled")),
437445
};
438446

439-
//debug!("Fetch response {}", url);
440447
let status = res.status();
441448
let url = res.url().to_string();
442449
let mut res_headers = Vec::new();
@@ -445,29 +452,175 @@ pub async fn op_fetch_send(
445452
}
446453

447454
let content_length = res.content_length();
455+
let remote_addr = res.remote_addr();
456+
let (remote_addr_ip, remote_addr_port) = if let Some(addr) = remote_addr {
457+
(Some(addr.ip().to_string()), Some(addr.port()))
458+
} else {
459+
(None, None)
460+
};
448461

449-
let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| {
450-
r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
451-
}));
452-
let rid = state
453-
.borrow_mut()
454-
.resource_table
455-
.add(FetchResponseBodyResource {
456-
reader: AsyncRefCell::new(stream.peekable()),
457-
cancel: CancelHandle::default(),
458-
size: content_length,
459-
});
462+
let response_rid = if !into_byte_stream {
463+
state
464+
.borrow_mut()
465+
.resource_table
466+
.add(FetchResponseResource {
467+
response: res,
468+
size: content_length,
469+
})
470+
} else {
471+
let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| {
472+
r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
473+
}));
474+
state
475+
.borrow_mut()
476+
.resource_table
477+
.add(FetchResponseBodyResource {
478+
reader: AsyncRefCell::new(stream.peekable()),
479+
cancel: CancelHandle::default(),
480+
size: content_length,
481+
})
482+
};
460483

461484
Ok(FetchResponse {
462485
status: status.as_u16(),
463486
status_text: status.canonical_reason().unwrap_or("").to_string(),
464487
headers: res_headers,
465488
url,
466-
response_rid: rid,
489+
response_rid,
467490
content_length,
491+
remote_addr_ip,
492+
remote_addr_port,
468493
})
469494
}
470495

496+
#[op]
497+
pub fn op_fetch_response_into_byte_stream(
498+
state: &mut OpState,
499+
rid: ResourceId,
500+
) -> Result<ResourceId, AnyError> {
501+
let raw_response = state.resource_table.take::<FetchResponseResource>(rid)?;
502+
let raw_response = Rc::try_unwrap(raw_response)
503+
.expect("Someone is holding onto FetchResponseResource");
504+
let stream: BytesStream =
505+
Box::pin(raw_response.response.bytes_stream().map(|r| {
506+
r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
507+
}));
508+
509+
let rid = state.resource_table.add(FetchResponseBodyResource {
510+
reader: AsyncRefCell::new(stream.peekable()),
511+
cancel: CancelHandle::default(),
512+
size: raw_response.size,
513+
});
514+
515+
Ok(rid)
516+
}
517+
518+
#[op]
519+
pub async fn op_fetch_response_upgrade(
520+
state: Rc<RefCell<OpState>>,
521+
rid: ResourceId,
522+
) -> Result<ResourceId, AnyError> {
523+
let raw_response = state
524+
.borrow_mut()
525+
.resource_table
526+
.take::<FetchResponseResource>(rid)?;
527+
let raw_response = Rc::try_unwrap(raw_response)
528+
.expect("Someone is holding onto FetchResponseResource");
529+
530+
let (read, write) = tokio::io::duplex(1024);
531+
let (read_rx, write_tx) = tokio::io::split(read);
532+
let (mut write_rx, mut read_tx) = tokio::io::split(write);
533+
let upgraded = raw_response.response.upgrade().await?;
534+
{
535+
// Stage 3: Pump the data
536+
let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(upgraded);
537+
538+
spawn(async move {
539+
let mut buf = [0; 1024];
540+
loop {
541+
let read = upgraded_rx.read(&mut buf).await?;
542+
if read == 0 {
543+
break;
544+
}
545+
read_tx.write_all(&buf[..read]).await?;
546+
}
547+
Ok::<_, AnyError>(())
548+
});
549+
spawn(async move {
550+
let mut buf = [0; 1024];
551+
loop {
552+
let read = write_rx.read(&mut buf).await?;
553+
if read == 0 {
554+
break;
555+
}
556+
upgraded_tx.write_all(&buf[..read]).await?;
557+
}
558+
Ok::<_, AnyError>(())
559+
});
560+
}
561+
562+
Ok(
563+
state
564+
.borrow_mut()
565+
.resource_table
566+
.add(UpgradeStream::new(read_rx, write_tx)),
567+
)
568+
}
569+
570+
struct UpgradeStream {
571+
read: AsyncRefCell<tokio::io::ReadHalf<tokio::io::DuplexStream>>,
572+
write: AsyncRefCell<tokio::io::WriteHalf<tokio::io::DuplexStream>>,
573+
cancel_handle: CancelHandle,
574+
}
575+
576+
impl UpgradeStream {
577+
pub fn new(
578+
read: tokio::io::ReadHalf<tokio::io::DuplexStream>,
579+
write: tokio::io::WriteHalf<tokio::io::DuplexStream>,
580+
) -> Self {
581+
Self {
582+
read: AsyncRefCell::new(read),
583+
write: AsyncRefCell::new(write),
584+
cancel_handle: CancelHandle::new(),
585+
}
586+
}
587+
588+
async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> {
589+
let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle);
590+
async {
591+
let read = RcRef::map(self, |this| &this.read);
592+
let mut read = read.borrow_mut().await;
593+
Ok(Pin::new(&mut *read).read(buf).await?)
594+
}
595+
.try_or_cancel(cancel_handle)
596+
.await
597+
}
598+
599+
async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> {
600+
let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle);
601+
async {
602+
let write = RcRef::map(self, |this| &this.write);
603+
let mut write = write.borrow_mut().await;
604+
Ok(Pin::new(&mut *write).write(buf).await?)
605+
}
606+
.try_or_cancel(cancel_handle)
607+
.await
608+
}
609+
}
610+
611+
impl Resource for UpgradeStream {
612+
fn name(&self) -> Cow<str> {
613+
"fetchUpgradedStream".into()
614+
}
615+
616+
deno_core::impl_readable_byob!();
617+
deno_core::impl_writable!();
618+
619+
fn close(self: Rc<Self>) {
620+
self.cancel_handle.cancel();
621+
}
622+
}
623+
471624
type CancelableResponseResult = Result<Result<Response, AnyError>, Canceled>;
472625

473626
pub struct FetchRequestResource(
@@ -545,6 +698,18 @@ impl Resource for FetchRequestBodyResource {
545698
type BytesStream =
546699
Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;
547700

701+
#[derive(Debug)]
702+
pub struct FetchResponseResource {
703+
pub response: Response,
704+
pub size: Option<u64>,
705+
}
706+
707+
impl Resource for FetchResponseResource {
708+
fn name(&self) -> Cow<str> {
709+
"fetchResponse".into()
710+
}
711+
}
712+
548713
pub struct FetchResponseBodyResource {
549714
pub reader: AsyncRefCell<Peekable<BytesStream>>,
550715
pub cancel: CancelHandle,

0 commit comments

Comments
 (0)