Skip to content

Commit

Permalink
perf(ext/fetch): consume body using ops (denoland#16038)
Browse files Browse the repository at this point in the history
This commit adds a fast path to `Request` and `Response` that
make consuming request bodies much faster when using `Body#text`,
`Body#arrayBuffer`, and `Body#blob`, if the body is a FastStream.
Because the response bodies for `fetch` are FastStream, this speeds up
consuming `fetch` response bodies significantly.
  • Loading branch information
marcosc90 authored Oct 4, 2022
1 parent 0b4a6c4 commit 569287b
Show file tree
Hide file tree
Showing 16 changed files with 291 additions and 109 deletions.
16 changes: 16 additions & 0 deletions cli/bench/http/deno_http_flash_post_bin.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.

const addr = Deno.args[0] || "127.0.0.1:4500";
const [hostname, port] = addr.split(":");
const { serve } = Deno;

async function handler(request) {
try {
const buffer = await request.arrayBuffer();
return new Response(buffer.byteLength);
} catch (e) {
console.log(e);
}
}

serve(handler, { hostname, port });
5 changes: 5 additions & 0 deletions cli/bench/http/deno_http_flash_post_bin.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
wrk.method = "POST"
wrk.headers["Content-Type"] = "application/octet-stream"

file = io.open("./cli/bench/testdata/128k.bin", "rb")
wrk.body = file:read("*a")
19 changes: 19 additions & 0 deletions cli/bench/http/deno_post_bin.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.

const addr = Deno.args[0] || "127.0.0.1:4500";
const [hostname, port] = addr.split(":");
const listener = Deno.listen({ hostname, port: Number(port) });
console.log("Server listening on", addr);

for await (const conn of listener) {
(async () => {
const requests = Deno.serveHttp(conn);
for await (const { respondWith, request } of requests) {
if (request.method == "POST") {
const buffer = await request.arrayBuffer();
respondWith(new Response(buffer.byteLength))
.catch((e) => console.log(e));
}
}
})();
}
5 changes: 5 additions & 0 deletions cli/bench/http/deno_post_bin.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
wrk.method = "POST"
wrk.headers["Content-Type"] = "application/octet-stream"

file = io.open("./cli/bench/testdata/128k.bin", "rb")
wrk.body = file:read("*a")
18 changes: 18 additions & 0 deletions cli/bench/http/node_post_bin.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
const http = require("http");
const port = process.argv[2] || "4544";
console.log("port", port);
http
.Server((req, res) => {
if (req.method == "POST") {
let chunks = [];
req.on("data", function (data) {
chunks.push(data);
});
req.on("end", function () {
const buffer = Buffer.concat(chunks);
res.end(buffer.byteLength.toString());
});
}
})
.listen(port);
5 changes: 5 additions & 0 deletions cli/bench/http/node_post_bin.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
wrk.method = "POST"
wrk.headers["Content-Type"] = "application/octet-stream"

file = io.open("./cli/bench/testdata/128k.bin", "rb")
wrk.body = file:read("*a")
16 changes: 16 additions & 0 deletions cli/tests/unit/fetch_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1789,3 +1789,19 @@ Deno.test(
assertEquals(await res.text(), "ok");
},
);

Deno.test(
{ permissions: { net: true } },
async function fetchResponseStreamIsLockedWhileReading() {
const response = await fetch("http://localhost:4545/echo_server", {
body: new Uint8Array(5000),
method: "POST",
});

assertEquals(response.body!.locked, false);
const promise = response.arrayBuffer();
assertEquals(response.body!.locked, true);

await promise;
},
);
81 changes: 81 additions & 0 deletions cli/tests/unit/http_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2292,6 +2292,87 @@ Deno.test("upgradeHttp unix", {
await Promise.all([server, client()]);
});

Deno.test(
{ permissions: { net: true } },
async function httpServerReadLargeBodyWithContentLength() {
const TLS_PACKET_SIZE = 16 * 1024 + 256;
// We want the body to be read in multiple packets
const body = "aa\n" + "deno.land large body\n".repeat(TLS_PACKET_SIZE) +
"zz";

let httpConn: Deno.HttpConn;
const promise = (async () => {
const listener = Deno.listen({ port: 4501 });
const conn = await listener.accept();
listener.close();
httpConn = Deno.serveHttp(conn);
const reqEvent = await httpConn.nextRequest();
assert(reqEvent);
const { request, respondWith } = reqEvent;
assertEquals(await request.text(), body);
await respondWith(new Response(body));
})();

const resp = await fetch("http://127.0.0.1:4501/", {
method: "POST",
headers: { "connection": "close" },
body,
});
const text = await resp.text();
assertEquals(text, body);
await promise;

httpConn!.close();
},
);

Deno.test(
{ permissions: { net: true } },
async function httpServerReadLargeBodyWithTransferChunked() {
const TLS_PACKET_SIZE = 16 * 1024 + 256;

// We want the body to be read in multiple packets
const chunks = [
"aa\n",
"deno.land large body\n".repeat(TLS_PACKET_SIZE),
"zz",
];

const body = chunks.join("");

const stream = new TransformStream();
const writer = stream.writable.getWriter();
for (const chunk of chunks) {
writer.write(new TextEncoder().encode(chunk));
}
writer.close();

let httpConn: Deno.HttpConn;
const promise = (async () => {
const listener = Deno.listen({ port: 4501 });
const conn = await listener.accept();
listener.close();
httpConn = Deno.serveHttp(conn);
const reqEvent = await httpConn.nextRequest();
assert(reqEvent);
const { request, respondWith } = reqEvent;
assertEquals(await request.text(), body);
await respondWith(new Response(body));
})();

const resp = await fetch("http://127.0.0.1:4501/", {
method: "POST",
headers: { "connection": "close" },
body: stream.readable,
});
const text = await resp.text();
assertEquals(text, body);
await promise;

httpConn!.close();
},
);

function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader {
// Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
const tp = new TextProtoReader(r);
Expand Down
21 changes: 21 additions & 0 deletions core/ops_builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub(crate) fn init_builtins() -> Extension {
op_add::decl(),
// // TODO(@AaronO): track IO metrics for builtin streams
op_read::decl(),
op_read_all::decl(),
op_write::decl(),
op_shutdown::decl(),
op_metrics::decl(),
Expand Down Expand Up @@ -168,6 +169,26 @@ async fn op_read(
resource.read_return(buf).await.map(|(n, _)| n as u32)
}

#[op]
async fn op_read_all(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
) -> Result<ZeroCopyBuf, Error> {
let resource = state.borrow().resource_table.get_any(rid)?;
let (min, maximum) = resource.size_hint();
let size = maximum.unwrap_or(min) as usize;

let mut buffer = Vec::with_capacity(size);
loop {
let tmp = ZeroCopyBuf::new_temp(vec![0u8; 64 * 1024]);
let (nread, tmp) = resource.clone().read_return(tmp).await?;
if nread == 0 {
return Ok(buffer.into());
}
buffer.extend_from_slice(&tmp[..nread]);
}
}

#[op]
async fn op_write(
state: Rc<RefCell<OpState>>,
Expand Down
4 changes: 4 additions & 0 deletions core/resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ pub trait Resource: Any + 'static {
fn backing_fd(self: Rc<Self>) -> Option<std::os::unix::prelude::RawFd> {
None
}

fn size_hint(&self) -> (u64, Option<u64>) {
(0, None)
}
}

impl dyn Resource {
Expand Down
48 changes: 5 additions & 43 deletions ext/fetch/22_body.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,18 @@
errorReadableStream,
readableStreamClose,
readableStreamDisturb,
readableStreamCollectIntoUint8Array,
createProxy,
ReadableStreamPrototype,
} = globalThis.__bootstrap.streams;
const {
ArrayBufferPrototype,
ArrayBufferIsView,
ArrayPrototypePush,
ArrayPrototypeMap,
JSONParse,
ObjectDefineProperties,
ObjectPrototypeIsPrototypeOf,
PromiseResolve,
TypedArrayPrototypeSet,
TypedArrayPrototypeSlice,
TypeError,
Uint8Array,
Expand All @@ -66,21 +65,17 @@
}

class InnerBody {
#knownExactLength = null;

/**
* @param {ReadableStream<Uint8Array> | { body: Uint8Array | string, consumed: boolean }} stream
*/
constructor(stream, knownExactLength) {
constructor(stream) {
/** @type {ReadableStream<Uint8Array> | { body: Uint8Array | string, consumed: boolean }} */
this.streamOrStatic = stream ??
{ body: new Uint8Array(), consumed: false };
/** @type {null | Uint8Array | string | Blob | FormData} */
this.source = null;
/** @type {null | number} */
this.length = null;

this.#knownExactLength = knownExactLength;
}

get stream() {
Expand Down Expand Up @@ -144,48 +139,15 @@
* https://fetch.spec.whatwg.org/#concept-body-consume-body
* @returns {Promise<Uint8Array>}
*/
async consume() {
consume() {
if (this.unusable()) throw new TypeError("Body already consumed.");
if (
ObjectPrototypeIsPrototypeOf(
ReadableStreamPrototype,
this.streamOrStatic,
)
) {
const reader = this.stream.getReader();
/** @type {Uint8Array[]} */
const chunks = [];

let finalBuffer = this.#knownExactLength
? new Uint8Array(this.#knownExactLength)
: null;

let totalLength = 0;
while (true) {
const { value: chunk, done } = await reader.read();
if (done) break;

if (finalBuffer) {
// fast path, content-length is present
TypedArrayPrototypeSet(finalBuffer, chunk, totalLength);
} else {
// slow path, content-length is not present
ArrayPrototypePush(chunks, chunk);
}
totalLength += chunk.byteLength;
}

if (finalBuffer) {
return finalBuffer;
}

finalBuffer = new Uint8Array(totalLength);
let i = 0;
for (const chunk of chunks) {
TypedArrayPrototypeSet(finalBuffer, chunk, i);
i += chunk.byteLength;
}
return finalBuffer;
return readableStreamCollectIntoUint8Array(this.stream);
} else {
this.streamOrStatic.consumed = true;
return this.streamOrStatic.body;
Expand Down Expand Up @@ -224,7 +186,7 @@
clone() {
const [out1, out2] = this.stream.tee();
this.streamOrStatic = out1;
const second = new InnerBody(out2, this.#knownExactLength);
const second = new InnerBody(out2);
second.source = core.deserialize(core.serialize(this.source));
second.length = this.length;
return second;
Expand Down
Loading

0 comments on commit 569287b

Please sign in to comment.