Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(ext/fetch): consume body using ops #16038

Merged
merged 11 commits into from
Oct 4, 2022
16 changes: 16 additions & 0 deletions cli/bench/http/deno_http_flash_post_bin.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.

const addr = Deno.args[0] || "127.0.0.1:4500";
const [hostname, port] = addr.split(":");
const { serve } = Deno;

async function handler(request) {
try {
const buffer = await request.arrayBuffer();
return new Response(buffer.byteLength);
} catch (e) {
console.log(e);
}
}

serve(handler, { hostname, port });
5 changes: 5 additions & 0 deletions cli/bench/http/deno_http_flash_post_bin.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
wrk.method = "POST"
wrk.headers["Content-Type"] = "application/octet-stream"

file = io.open("./cli/bench/testdata/128k.bin", "rb")
wrk.body = file:read("*a")
19 changes: 19 additions & 0 deletions cli/bench/http/deno_post_bin.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.

const addr = Deno.args[0] || "127.0.0.1:4500";
const [hostname, port] = addr.split(":");
const listener = Deno.listen({ hostname, port: Number(port) });
console.log("Server listening on", addr);

for await (const conn of listener) {
(async () => {
const requests = Deno.serveHttp(conn);
for await (const { respondWith, request } of requests) {
if (request.method == "POST") {
const buffer = await request.arrayBuffer();
respondWith(new Response(buffer.byteLength))
.catch((e) => console.log(e));
}
}
})();
}
5 changes: 5 additions & 0 deletions cli/bench/http/deno_post_bin.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
wrk.method = "POST"
wrk.headers["Content-Type"] = "application/octet-stream"

file = io.open("./cli/bench/testdata/128k.bin", "rb")
wrk.body = file:read("*a")
18 changes: 18 additions & 0 deletions cli/bench/http/node_post_bin.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
const http = require("http");
const port = process.argv[2] || "4544";
console.log("port", port);
http
.Server((req, res) => {
if (req.method == "POST") {
let chunks = [];
req.on("data", function (data) {
chunks.push(data);
});
req.on("end", function () {
const buffer = Buffer.concat(chunks);
res.end(buffer.byteLength.toString());
});
}
})
.listen(port);
5 changes: 5 additions & 0 deletions cli/bench/http/node_post_bin.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
wrk.method = "POST"
wrk.headers["Content-Type"] = "application/octet-stream"

file = io.open("./cli/bench/testdata/128k.bin", "rb")
wrk.body = file:read("*a")
16 changes: 16 additions & 0 deletions cli/tests/unit/fetch_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1789,3 +1789,19 @@ Deno.test(
assertEquals(await res.text(), "ok");
},
);

Deno.test(
{ permissions: { net: true } },
async function fetchResponseStreamIsLockedWhileReading() {
const response = await fetch("http://localhost:4545/echo_server", {
body: new Uint8Array(5000),
method: "POST",
});

assertEquals(response.body!.locked, false);
const promise = response.arrayBuffer();
assertEquals(response.body!.locked, true);

await promise;
},
);
81 changes: 81 additions & 0 deletions cli/tests/unit/http_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2292,6 +2292,87 @@ Deno.test("upgradeHttp unix", {
await Promise.all([server, client()]);
});

Deno.test(
{ permissions: { net: true } },
async function httpServerReadLargeBodyWithContentLength() {
const TLS_PACKET_SIZE = 16 * 1024 + 256;
// We want the body to be read in multiple packets
const body = "aa\n" + "deno.land large body\n".repeat(TLS_PACKET_SIZE) +
"zz";

let httpConn: Deno.HttpConn;
const promise = (async () => {
const listener = Deno.listen({ port: 4501 });
const conn = await listener.accept();
listener.close();
httpConn = Deno.serveHttp(conn);
const reqEvent = await httpConn.nextRequest();
assert(reqEvent);
const { request, respondWith } = reqEvent;
assertEquals(await request.text(), body);
await respondWith(new Response(body));
})();

const resp = await fetch("http://127.0.0.1:4501/", {
method: "POST",
headers: { "connection": "close" },
body,
});
const text = await resp.text();
assertEquals(text, body);
await promise;

httpConn!.close();
},
);

Deno.test(
{ permissions: { net: true } },
async function httpServerReadLargeBodyWithTransferChunked() {
const TLS_PACKET_SIZE = 16 * 1024 + 256;

// We want the body to be read in multiple packets
const chunks = [
"aa\n",
"deno.land large body\n".repeat(TLS_PACKET_SIZE),
"zz",
];

const body = chunks.join("");

const stream = new TransformStream();
const writer = stream.writable.getWriter();
for (const chunk of chunks) {
writer.write(new TextEncoder().encode(chunk));
}
writer.close();

let httpConn: Deno.HttpConn;
const promise = (async () => {
const listener = Deno.listen({ port: 4501 });
const conn = await listener.accept();
listener.close();
httpConn = Deno.serveHttp(conn);
const reqEvent = await httpConn.nextRequest();
assert(reqEvent);
const { request, respondWith } = reqEvent;
assertEquals(await request.text(), body);
await respondWith(new Response(body));
})();

const resp = await fetch("http://127.0.0.1:4501/", {
method: "POST",
headers: { "connection": "close" },
body: stream.readable,
});
const text = await resp.text();
assertEquals(text, body);
await promise;

httpConn!.close();
},
);

function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader {
// Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
const tp = new TextProtoReader(r);
Expand Down
21 changes: 21 additions & 0 deletions core/ops_builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub(crate) fn init_builtins() -> Extension {
op_add::decl(),
// // TODO(@AaronO): track IO metrics for builtin streams
op_read::decl(),
op_read_all::decl(),
op_write::decl(),
op_shutdown::decl(),
op_metrics::decl(),
Expand Down Expand Up @@ -168,6 +169,26 @@ async fn op_read(
resource.read_return(buf).await.map(|(n, _)| n as u32)
}

#[op]
async fn op_read_all(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice. #16115 will allow us to get rid of multiple allocations in this op in a follow up.

state: Rc<RefCell<OpState>>,
rid: ResourceId,
) -> Result<ZeroCopyBuf, Error> {
let resource = state.borrow().resource_table.get_any(rid)?;
let (min, maximum) = resource.size_hint();
let size = maximum.unwrap_or(min) as usize;

let mut buffer = Vec::with_capacity(size);
loop {
let tmp = ZeroCopyBuf::new_temp(vec![0u8; 64 * 1024]);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I presume the tmp could be moved out of the loop and only allocated once?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error[E0382]: use of moved value: `tmp`

Copy link
Member

@lucacasonato lucacasonato Oct 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aapoalas the whole allocation + copy can go away for known size bodies when #16115 lands

let (nread, tmp) = resource.clone().read_return(tmp).await?;
if nread == 0 {
return Ok(buffer.into());
}
buffer.extend_from_slice(&tmp[..nread]);
}
}

#[op]
async fn op_write(
state: Rc<RefCell<OpState>>,
Expand Down
4 changes: 4 additions & 0 deletions core/resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ pub trait Resource: Any + 'static {
fn backing_fd(self: Rc<Self>) -> Option<std::os::unix::prelude::RawFd> {
None
}

fn size_hint(&self) -> (u64, Option<u64>) {
(64 * 1024, None)
lucacasonato marked this conversation as resolved.
Show resolved Hide resolved
}
}

impl dyn Resource {
Expand Down
49 changes: 7 additions & 42 deletions ext/fetch/22_body.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,18 @@
errorReadableStream,
readableStreamClose,
readableStreamDisturb,
readableStreamCollectIntoUint8Array,
createProxy,
ReadableStreamPrototype,
} = globalThis.__bootstrap.streams;
const {
ArrayBufferPrototype,
ArrayBufferIsView,
ArrayPrototypePush,
ArrayPrototypeMap,
JSONParse,
ObjectDefineProperties,
ObjectPrototypeIsPrototypeOf,
PromiseResolve,
TypedArrayPrototypeSet,
TypedArrayPrototypeSlice,
TypeError,
Uint8Array,
Expand All @@ -66,21 +65,19 @@
}

class InnerBody {
#knownExactLength = null;
#cloned = false;

/**
* @param {ReadableStream<Uint8Array> | { body: Uint8Array | string, consumed: boolean }} stream
*/
constructor(stream, knownExactLength) {
constructor(stream) {
/** @type {ReadableStream<Uint8Array> | { body: Uint8Array | string, consumed: boolean }} */
this.streamOrStatic = stream ??
{ body: new Uint8Array(), consumed: false };
/** @type {null | Uint8Array | string | Blob | FormData} */
this.source = null;
/** @type {null | number} */
this.length = null;

this.#knownExactLength = knownExactLength;
}

get stream() {
Expand Down Expand Up @@ -144,48 +141,15 @@
* https://fetch.spec.whatwg.org/#concept-body-consume-body
* @returns {Promise<Uint8Array>}
*/
async consume() {
consume() {
if (this.unusable()) throw new TypeError("Body already consumed.");
if (
ObjectPrototypeIsPrototypeOf(
ReadableStreamPrototype,
this.streamOrStatic,
)
) {
const reader = this.stream.getReader();
/** @type {Uint8Array[]} */
const chunks = [];

let finalBuffer = this.#knownExactLength
? new Uint8Array(this.#knownExactLength)
: null;

let totalLength = 0;
while (true) {
const { value: chunk, done } = await reader.read();
if (done) break;

if (finalBuffer) {
// fast path, content-length is present
TypedArrayPrototypeSet(finalBuffer, chunk, totalLength);
} else {
// slow path, content-length is not present
ArrayPrototypePush(chunks, chunk);
}
totalLength += chunk.byteLength;
}

if (finalBuffer) {
return finalBuffer;
}

finalBuffer = new Uint8Array(totalLength);
let i = 0;
for (const chunk of chunks) {
TypedArrayPrototypeSet(finalBuffer, chunk, i);
i += chunk.byteLength;
}
return finalBuffer;
return readableStreamCollectIntoUint8Array(this.stream, this.#cloned);
} else {
this.streamOrStatic.consumed = true;
return this.streamOrStatic.body;
Expand Down Expand Up @@ -222,9 +186,10 @@
* @returns {InnerBody}
*/
clone() {
this.#cloned = true;
lucacasonato marked this conversation as resolved.
Show resolved Hide resolved
const [out1, out2] = this.stream.tee();
this.streamOrStatic = out1;
const second = new InnerBody(out2, this.#knownExactLength);
const second = new InnerBody(out2);
second.source = core.deserialize(core.serialize(this.source));
second.length = this.length;
return second;
Expand Down
Loading