diff --git a/cli/bench/http/deno_http_flash_post_bin.js b/cli/bench/http/deno_http_flash_post_bin.js new file mode 100644 index 00000000000000..cea530e6039db0 --- /dev/null +++ b/cli/bench/http/deno_http_flash_post_bin.js @@ -0,0 +1,16 @@ +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. + +const addr = Deno.args[0] || "127.0.0.1:4500"; +const [hostname, port] = addr.split(":"); +const { serve } = Deno; + +async function handler(request) { + try { + const buffer = await request.arrayBuffer(); + return new Response(buffer.byteLength); + } catch (e) { + console.log(e); + } +} + +serve(handler, { hostname, port }); diff --git a/cli/bench/http/deno_http_flash_post_bin.lua b/cli/bench/http/deno_http_flash_post_bin.lua new file mode 100644 index 00000000000000..c8f5d3e3f7e9df --- /dev/null +++ b/cli/bench/http/deno_http_flash_post_bin.lua @@ -0,0 +1,5 @@ +wrk.method = "POST" +wrk.headers["Content-Type"] = "application/octet-stream" + +file = io.open("./cli/bench/testdata/128k.bin", "rb") +wrk.body = file:read("*a") \ No newline at end of file diff --git a/cli/bench/http/deno_post_bin.js b/cli/bench/http/deno_post_bin.js new file mode 100644 index 00000000000000..33ffeed1b05eb8 --- /dev/null +++ b/cli/bench/http/deno_post_bin.js @@ -0,0 +1,19 @@ +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. + +const addr = Deno.args[0] || "127.0.0.1:4500"; +const [hostname, port] = addr.split(":"); +const listener = Deno.listen({ hostname, port: Number(port) }); +console.log("Server listening on", addr); + +for await (const conn of listener) { + (async () => { + const requests = Deno.serveHttp(conn); + for await (const { respondWith, request } of requests) { + if (request.method == "POST") { + const buffer = await request.arrayBuffer(); + respondWith(new Response(buffer.byteLength)) + .catch((e) => console.log(e)); + } + } + })(); +} diff --git a/cli/bench/http/deno_post_bin.lua b/cli/bench/http/deno_post_bin.lua new file mode 100644 index 00000000000000..c8f5d3e3f7e9df --- /dev/null +++ b/cli/bench/http/deno_post_bin.lua @@ -0,0 +1,5 @@ +wrk.method = "POST" +wrk.headers["Content-Type"] = "application/octet-stream" + +file = io.open("./cli/bench/testdata/128k.bin", "rb") +wrk.body = file:read("*a") \ No newline at end of file diff --git a/cli/bench/http/node_post_bin.js b/cli/bench/http/node_post_bin.js new file mode 100644 index 00000000000000..d0f2d6667cddd4 --- /dev/null +++ b/cli/bench/http/node_post_bin.js @@ -0,0 +1,18 @@ +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. +const http = require("http"); +const port = process.argv[2] || "4544"; +console.log("port", port); +http + .Server((req, res) => { + if (req.method == "POST") { + let chunks = []; + req.on("data", function (data) { + chunks.push(data); + }); + req.on("end", function () { + const buffer = Buffer.concat(chunks); + res.end(buffer.byteLength.toString()); + }); + } + }) + .listen(port); diff --git a/cli/bench/http/node_post_bin.lua b/cli/bench/http/node_post_bin.lua new file mode 100644 index 00000000000000..c8f5d3e3f7e9df --- /dev/null +++ b/cli/bench/http/node_post_bin.lua @@ -0,0 +1,5 @@ +wrk.method = "POST" +wrk.headers["Content-Type"] = "application/octet-stream" + +file = io.open("./cli/bench/testdata/128k.bin", "rb") +wrk.body = file:read("*a") \ No newline at end of file diff --git a/cli/tests/unit/fetch_test.ts b/cli/tests/unit/fetch_test.ts index 36c1926f2fb1bd..e2ff0d5e04b6d4 100644 --- a/cli/tests/unit/fetch_test.ts +++ b/cli/tests/unit/fetch_test.ts @@ -1789,3 +1789,19 @@ Deno.test( assertEquals(await res.text(), "ok"); }, ); + +Deno.test( + { permissions: { net: true } }, + async function fetchResponseStreamIsLockedWhileReading() { + const response = await fetch("http://localhost:4545/echo_server", { + body: new Uint8Array(5000), + method: "POST", + }); + + assertEquals(response.body!.locked, false); + const promise = response.arrayBuffer(); + assertEquals(response.body!.locked, true); + + await promise; + }, +); diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts index 3de93076e4ca8a..7bb16aecc51f75 100644 --- a/cli/tests/unit/http_test.ts +++ b/cli/tests/unit/http_test.ts @@ -2292,6 +2292,87 @@ Deno.test("upgradeHttp unix", { await Promise.all([server, client()]); }); +Deno.test( + { permissions: { net: true } }, + async function httpServerReadLargeBodyWithContentLength() { + const TLS_PACKET_SIZE = 16 * 1024 + 256; + // We want the body to be read in multiple packets + const body = "aa\n" + "deno.land large body\n".repeat(TLS_PACKET_SIZE) + + "zz"; + + let httpConn: Deno.HttpConn; + const promise = (async () => { + const listener = Deno.listen({ port: 4501 }); + const conn = await listener.accept(); + listener.close(); + httpConn = Deno.serveHttp(conn); + const reqEvent = await httpConn.nextRequest(); + assert(reqEvent); + const { request, respondWith } = reqEvent; + assertEquals(await request.text(), body); + await respondWith(new Response(body)); + })(); + + const resp = await fetch("http://127.0.0.1:4501/", { + method: "POST", + headers: { "connection": "close" }, + body, + }); + const text = await resp.text(); + assertEquals(text, body); + await promise; + + httpConn!.close(); + }, +); + +Deno.test( + { permissions: { net: true } }, + async function httpServerReadLargeBodyWithTransferChunked() { + const TLS_PACKET_SIZE = 16 * 1024 + 256; + + // We want the body to be read in multiple packets + const chunks = [ + "aa\n", + "deno.land large body\n".repeat(TLS_PACKET_SIZE), + "zz", + ]; + + const body = chunks.join(""); + + const stream = new TransformStream(); + const writer = stream.writable.getWriter(); + for (const chunk of chunks) { + writer.write(new TextEncoder().encode(chunk)); + } + writer.close(); + + let httpConn: Deno.HttpConn; + const promise = (async () => { + const listener = Deno.listen({ port: 4501 }); + const conn = await listener.accept(); + listener.close(); + httpConn = Deno.serveHttp(conn); + const reqEvent = await httpConn.nextRequest(); + assert(reqEvent); + const { request, respondWith } = reqEvent; + assertEquals(await request.text(), body); + await respondWith(new Response(body)); + })(); + + const resp = await fetch("http://127.0.0.1:4501/", { + method: "POST", + headers: { "connection": "close" }, + body: stream.readable, + }); + const text = await resp.text(); + assertEquals(text, body); + await promise; + + httpConn!.close(); + }, +); + function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader { // Based on https://tools.ietf.org/html/rfc2616#section-19.4.6 const tp = new TextProtoReader(r); diff --git a/cli/tests/unit/response_test.ts b/cli/tests/unit/response_test.ts index c46218b627eec8..c2a2301383c4f1 100644 --- a/cli/tests/unit/response_test.ts +++ b/cli/tests/unit/response_test.ts @@ -90,3 +90,13 @@ Deno.test(function customInspectFunction() { ); assertStringIncludes(Deno.inspect(Response.prototype), "Response"); }); + +Deno.test(async function responseBodyUsed() { + const response = new Response("body"); + assert(!response.bodyUsed); + await response.text(); + assert(response.bodyUsed); + // .body getter is needed so we can test the faulty code path + response.body; + assert(response.bodyUsed); +}); diff --git a/core/ops_builtin.rs b/core/ops_builtin.rs index 02ecabc9cc4b13..a2391ea6d952ec 100644 --- a/core/ops_builtin.rs +++ b/core/ops_builtin.rs @@ -34,6 +34,7 @@ pub(crate) fn init_builtins() -> Extension { op_add::decl(), // // TODO(@AaronO): track IO metrics for builtin streams op_read::decl(), + op_read_all::decl(), op_write::decl(), op_shutdown::decl(), op_metrics::decl(), @@ -168,6 +169,28 @@ async fn op_read( resource.read(buf).await.map(|n| n as u32) } +#[op] +async fn op_read_all( + state: Rc>, + rid: ResourceId, + mut size: usize, +) -> Result { + if size == 0 { + size = 64 * 1024; + } + + let mut buffer = Vec::with_capacity(size); + loop { + let resource = state.borrow().resource_table.get_any(rid)?; + let tmp = ZeroCopyBuf::new_temp(vec![0u8; 64 * 1024]); + let (nread, tmp) = resource.clone().read_return(tmp).await?; + if nread == 0 { + return Ok(buffer.into()); + } + buffer.extend_from_slice(&tmp[..nread]); + } +} + #[op] async fn op_write( state: Rc>, diff --git a/ext/fetch/22_body.js b/ext/fetch/22_body.js index 97a8a8db157742..5dcd28a9d04b97 100644 --- a/ext/fetch/22_body.js +++ b/ext/fetch/22_body.js @@ -26,8 +26,12 @@ const mimesniff = globalThis.__bootstrap.mimesniff; const { BlobPrototype } = globalThis.__bootstrap.file; const { + _disturbed, isReadableStreamDisturbed, + readableStreamClose, errorReadableStream, + readableStreamClose, + readableStreamDisturb, createProxy, ReadableStreamPrototype, } = globalThis.__bootstrap.streams; @@ -65,11 +69,16 @@ class InnerBody { #knownExactLength = null; + #rid = null; + #op = null; + #cloned = false; + #closeRid = false; + #terminator = null; /** * @param {ReadableStream | { body: Uint8Array | string, consumed: boolean }} stream */ - constructor(stream, knownExactLength) { + constructor(stream, opts) { /** @type {ReadableStream | { body: Uint8Array | string, consumed: boolean }} */ this.streamOrStatic = stream ?? { body: new Uint8Array(), consumed: false }; @@ -78,7 +87,12 @@ /** @type {null | number} */ this.length = null; - this.#knownExactLength = knownExactLength; + // used for fast paths when possible + this.#knownExactLength = opts?.knownExactLength; + this.#rid = opts?.rid; + this.#op = opts?.op; // to be called with (rid, buf) + this.#closeRid = opts?.closeRid; + this.#terminator = opts?.terminator; } get stream() { @@ -92,6 +106,8 @@ if (consumed) { this.streamOrStatic = new ReadableStream(); this.streamOrStatic.getReader(); + readableStreamDisturb(this.streamOrStatic); + readableStreamClose(this.streamOrStatic); } else { this.streamOrStatic = new ReadableStream({ start(controller) { @@ -149,6 +165,43 @@ ) ) { const reader = this.stream.getReader(); + + if (this.#op && !this.#cloned) { + // fast path, read whole body in a single op call + // op must have following signature: (rid, usize = 0): Uint8Array + + const closeStream = (err) => { + if (this.#closeRid) { + core.tryClose(this.#rid); + } + if (err) { + throw err; + } + readableStreamClose(this.stream); + }; + + try { + // We need to mimic stream behavior, set to disturbed + this.stream[_disturbed] = true; + const buf = await core.opAsync( + this.#op, + this.#rid, + this.#knownExactLength || 0, + ); + if (this.#terminator?.aborted) { + throw this.#terminator.reason; + } + + closeStream(); + + return buf; + } catch (err) { + closeStream(this.#terminator?.reason || err); + throw err; + } + } + + // slow path /** @type {Uint8Array[]} */ const chunks = []; @@ -218,6 +271,7 @@ * @returns {InnerBody} */ clone() { + this.#cloned = true; const [out1, out2] = this.stream.tee(); this.streamOrStatic = out1; const second = new InnerBody(out2, this.#knownExactLength); diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js index 3e90429ce436c2..2bf1469a0c3869 100644 --- a/ext/fetch/26_fetch.js +++ b/ext/fetch/26_fetch.js @@ -338,7 +338,13 @@ } else { response.body = new InnerBody( createResponseBodyStream(resp.responseRid, terminator), - resp.contentLength, + { + knownExactLength: resp.contentLength, + rid: resp.responseRid, + op: "op_read_all", + closeRid: true, + terminator, + }, ); } } diff --git a/ext/http/01_http.js b/ext/http/01_http.js index 63023a29649e72..04fc548b537b03 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -112,7 +112,7 @@ return null; } - const [streamRid, method, url] = nextRequest; + const [streamRid, method, url, contentLength] = nextRequest; SetPrototypeAdd(this.managedResources, streamRid); /** @type {ReadableStream | undefined} */ @@ -128,7 +128,16 @@ () => method, url, () => ops.op_http_headers(streamRid), - body !== null ? new InnerBody(body) : null, + body !== null + ? new InnerBody( + body, + { + knownExactLength: contentLength, + rid: streamRid, + op: "op_http_read_all", + }, + ) + : null, false, ); const signal = abortSignal.newSignal(); diff --git a/ext/http/lib.rs b/ext/http/lib.rs index d1b38fb42dbb71..dd26588e6c157b 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -79,6 +79,7 @@ pub fn init() -> Extension { .ops(vec![ op_http_accept::decl(), op_http_read::decl(), + op_http_read_all::decl(), op_http_write_headers::decl(), op_http_headers::decl(), op_http_write::decl(), @@ -165,7 +166,8 @@ impl HttpConnResource { // Accepts a new incoming HTTP request. async fn accept( self: &Rc, - ) -> Result, AnyError> { + ) -> Result)>, AnyError> + { let fut = async { let (request_tx, request_rx) = oneshot::channel(); let (response_tx, response_rx) = oneshot::channel(); @@ -187,11 +189,26 @@ impl HttpConnResource { .unwrap_or(Encoding::Identity) }; + // get content-length if content-encoding is empty + let content_length = if request + .headers() + .get(hyper::header::CONTENT_ENCODING) + .is_none() + { + request + .headers() + .get(hyper::header::CONTENT_LENGTH) + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.parse::().ok()) + } else { + None + }; + let method = request.method().to_string(); let url = req_url(&request, self.scheme, &self.addr); let stream = HttpStreamResource::new(self, request, response_tx, accept_encoding); - Some((stream, method, url)) + Some((stream, method, url, content_length)) }; async { @@ -378,6 +395,8 @@ struct NextRequestResponse( String, // url: String, + // Content Length + Option, ); #[op] @@ -388,10 +407,10 @@ async fn op_http_accept( let conn = state.borrow().resource_table.get::(rid)?; match conn.accept().await { - Ok(Some((stream, method, url))) => { + Ok(Some((stream, method, url, content_length))) => { let stream_rid = state.borrow_mut().resource_table.add_rc(Rc::new(stream)); - let r = NextRequestResponse(stream_rid, method, url); + let r = NextRequestResponse(stream_rid, method, url, content_length); Ok(Some(r)) } Ok(None) => Ok(None), @@ -865,6 +884,58 @@ async fn op_http_read( fut.try_or_cancel(cancel_handle).await } +#[op] +async fn op_http_read_all( + state: Rc>, + rid: ResourceId, + mut size: usize, +) -> Result { + let stream = state + .borrow_mut() + .resource_table + .get::(rid)?; + let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await; + + let body = loop { + match &mut *rd { + HttpRequestReader::Headers(_) => {} + HttpRequestReader::Body(_, body) => break body, + HttpRequestReader::Closed => return Err(http_error("request is closed")), + } + match take(&mut *rd) { + HttpRequestReader::Headers(request) => { + let (parts, body) = request.into_parts(); + *rd = HttpRequestReader::Body(parts.headers, body.peekable()); + } + _ => unreachable!(), + }; + }; + + if size == 0 { + size = 64 * 1024; + } + + let mut buffer = Vec::with_capacity(size); + let fut = async { + let mut body = Pin::new(body); + loop { + match body.as_mut().peek_mut().await { + Some(Ok(chunk)) if !chunk.is_empty() => { + buffer.extend_from_slice(&chunk.split_to(chunk.len())); + } + Some(_) => match body.as_mut().next().await.unwrap() { + Ok(chunk) => assert!(chunk.is_empty()), + Err(err) => break Err(AnyError::from(err)), + }, + None => break Ok(buffer.into()), + } + } + }; + + let cancel_handle = RcRef::map(&stream, |r| &r.cancel_handle); + fut.try_or_cancel(cancel_handle).await +} + #[op] fn op_http_websocket_accept_header(key: String) -> Result { let digest = ring::digest::digest( diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index bd1714964ac03f..c5c9ca3ab5d850 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -1153,6 +1153,15 @@ reader[_closedPromise].resolve(undefined); } + /** + * @template R + * @param {ReadableStream} stream + * @returns {void} + */ + function readableStreamDisturb(stream) { + stream[_disturbed] = true; + } + /** @param {ReadableStreamDefaultController} controller */ function readableStreamDefaultControllerCallPullIfNeeded(controller) { const shouldPull = readableStreamDefaultcontrollerShouldCallPull( @@ -5904,12 +5913,14 @@ window.__bootstrap.streams = { // Non-Public + _disturbed, _state, isReadableStreamDisturbed, errorReadableStream, createProxy, writableStreamClose, readableStreamClose, + readableStreamDisturb, readableStreamForRid, getReadableStreamRid, Deferred,