Skip to content

Commit

Permalink
refactor: make *_read_all ops return a ZeroCopyBuffer
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosc90 committed Sep 29, 2022
1 parent cfbcb3b commit 3e07a56
Show file tree
Hide file tree
Showing 15 changed files with 358 additions and 9 deletions.
16 changes: 16 additions & 0 deletions cli/bench/http/deno_http_flash_post_bin.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.

const addr = Deno.args[0] || "127.0.0.1:4500";
const [hostname, port] = addr.split(":");
const { serve } = Deno;

async function handler(request) {
try {
const buffer = await request.arrayBuffer();
return new Response(buffer.byteLength);
} catch (e) {
console.log(e);
}
}

serve(handler, { hostname, port });
5 changes: 5 additions & 0 deletions cli/bench/http/deno_http_flash_post_bin.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
wrk.method = "POST"
wrk.headers["Content-Type"] = "application/octet-stream"

file = io.open("./cli/bench/testdata/128k.bin", "rb")
wrk.body = file:read("*a")
19 changes: 19 additions & 0 deletions cli/bench/http/deno_post_bin.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.

const addr = Deno.args[0] || "127.0.0.1:4500";
const [hostname, port] = addr.split(":");
const listener = Deno.listen({ hostname, port: Number(port) });
console.log("Server listening on", addr);

for await (const conn of listener) {
(async () => {
const requests = Deno.serveHttp(conn);
for await (const { respondWith, request } of requests) {
if (request.method == "POST") {
const buffer = await request.arrayBuffer();
respondWith(new Response(buffer.byteLength))
.catch((e) => console.log(e));
}
}
})();
}
5 changes: 5 additions & 0 deletions cli/bench/http/deno_post_bin.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
wrk.method = "POST"
wrk.headers["Content-Type"] = "application/octet-stream"

file = io.open("./cli/bench/testdata/128k.bin", "rb")
wrk.body = file:read("*a")
18 changes: 18 additions & 0 deletions cli/bench/http/node_post_bin.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
const http = require("http");
const port = process.argv[2] || "4544";
console.log("port", port);
http
.Server((req, res) => {
if (req.method == "POST") {
let chunks = [];
req.on("data", function (data) {
chunks.push(data);
});
req.on("end", function () {
const buffer = Buffer.concat(chunks);
res.end(buffer.byteLength.toString());
});
}
})
.listen(port);
5 changes: 5 additions & 0 deletions cli/bench/http/node_post_bin.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
wrk.method = "POST"
wrk.headers["Content-Type"] = "application/octet-stream"

file = io.open("./cli/bench/testdata/128k.bin", "rb")
wrk.body = file:read("*a")
16 changes: 16 additions & 0 deletions cli/tests/unit/fetch_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1789,3 +1789,19 @@ Deno.test(
assertEquals(await res.text(), "ok");
},
);

Deno.test(
{ permissions: { net: true } },
async function fetchResponseStreamIsLockedWhileReading() {
const response = await fetch("http://localhost:4545/echo_server", {
body: new Uint8Array(5000),
method: "POST",
});

assertEquals(response.body!.locked, false);
const promise = response.arrayBuffer();
assertEquals(response.body!.locked, true);

await promise;
},
);
81 changes: 81 additions & 0 deletions cli/tests/unit/http_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2292,6 +2292,87 @@ Deno.test("upgradeHttp unix", {
await Promise.all([server, client()]);
});

Deno.test(
{ permissions: { net: true } },
async function httpServerReadLargeBodyWithContentLength() {
const TLS_PACKET_SIZE = 16 * 1024 + 256;
// We want the body to be read in multiple packets
const body = "aa\n" + "deno.land large body\n".repeat(TLS_PACKET_SIZE) +
"zz";

let httpConn: Deno.HttpConn;
const promise = (async () => {
const listener = Deno.listen({ port: 4501 });
const conn = await listener.accept();
listener.close();
httpConn = Deno.serveHttp(conn);
const reqEvent = await httpConn.nextRequest();
assert(reqEvent);
const { request, respondWith } = reqEvent;
assertEquals(await request.text(), body);
await respondWith(new Response(body));
})();

const resp = await fetch("http://127.0.0.1:4501/", {
method: "POST",
headers: { "connection": "close" },
body,
});
const text = await resp.text();
assertEquals(text, body);
await promise;

httpConn!.close();
},
);

Deno.test(
{ permissions: { net: true } },
async function httpServerReadLargeBodyWithTransferChunked() {
const TLS_PACKET_SIZE = 16 * 1024 + 256;

// We want the body to be read in multiple packets
const chunks = [
"aa\n",
"deno.land large body\n".repeat(TLS_PACKET_SIZE),
"zz",
];

const body = chunks.join("");

const stream = new TransformStream();
const writer = stream.writable.getWriter();
for (const chunk of chunks) {
writer.write(new TextEncoder().encode(chunk));
}
writer.close();

let httpConn: Deno.HttpConn;
const promise = (async () => {
const listener = Deno.listen({ port: 4501 });
const conn = await listener.accept();
listener.close();
httpConn = Deno.serveHttp(conn);
const reqEvent = await httpConn.nextRequest();
assert(reqEvent);
const { request, respondWith } = reqEvent;
assertEquals(await request.text(), body);
await respondWith(new Response(body));
})();

const resp = await fetch("http://127.0.0.1:4501/", {
method: "POST",
headers: { "connection": "close" },
body: stream.readable,
});
const text = await resp.text();
assertEquals(text, body);
await promise;

httpConn!.close();
},
);

function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader {
// Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
const tp = new TextProtoReader(r);
Expand Down
10 changes: 10 additions & 0 deletions cli/tests/unit/response_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,13 @@ Deno.test(function customInspectFunction() {
);
assertStringIncludes(Deno.inspect(Response.prototype), "Response");
});

Deno.test(async function responseBodyUsed() {
const response = new Response("body");
assert(!response.bodyUsed);
await response.text();
assert(response.bodyUsed);
// .body getter is needed so we can test the faulty code path
response.body;
assert(response.bodyUsed);
});
23 changes: 23 additions & 0 deletions core/ops_builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub(crate) fn init_builtins() -> Extension {
op_add::decl(),
// // TODO(@AaronO): track IO metrics for builtin streams
op_read::decl(),
op_read_all::decl(),
op_write::decl(),
op_shutdown::decl(),
op_metrics::decl(),
Expand Down Expand Up @@ -168,6 +169,28 @@ async fn op_read(
resource.read(buf).await.map(|n| n as u32)
}

#[op]
async fn op_read_all(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
mut size: usize,
) -> Result<ZeroCopyBuf, Error> {
if size == 0 {
size = 64 * 1024;
}

let mut buffer = Vec::with_capacity(size);
loop {
let resource = state.borrow().resource_table.get_any(rid)?;
let tmp = ZeroCopyBuf::new_temp(vec![0u8; 64 * 1024]);
let (nread, tmp) = resource.clone().read_return(tmp).await?;
if nread == 0 {
return Ok(buffer.into());
}
buffer.extend_from_slice(&tmp[..nread]);
}
}

#[op]
async fn op_write(
state: Rc<RefCell<OpState>>,
Expand Down
58 changes: 56 additions & 2 deletions ext/fetch/22_body.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@
const mimesniff = globalThis.__bootstrap.mimesniff;
const { BlobPrototype } = globalThis.__bootstrap.file;
const {
_disturbed,
isReadableStreamDisturbed,
readableStreamClose,
errorReadableStream,
readableStreamClose,
readableStreamDisturb,
createProxy,
ReadableStreamPrototype,
} = globalThis.__bootstrap.streams;
Expand Down Expand Up @@ -65,11 +69,16 @@

class InnerBody {
#knownExactLength = null;
#rid = null;
#op = null;
#cloned = false;
#closeRid = false;
#terminator = null;

/**
* @param {ReadableStream<Uint8Array> | { body: Uint8Array | string, consumed: boolean }} stream
*/
constructor(stream, knownExactLength) {
constructor(stream, opts) {
/** @type {ReadableStream<Uint8Array> | { body: Uint8Array | string, consumed: boolean }} */
this.streamOrStatic = stream ??
{ body: new Uint8Array(), consumed: false };
Expand All @@ -78,7 +87,12 @@
/** @type {null | number} */
this.length = null;

this.#knownExactLength = knownExactLength;
// used for fast paths when possible
this.#knownExactLength = opts?.knownExactLength;
this.#rid = opts?.rid;
this.#op = opts?.op; // to be called with (rid, buf)
this.#closeRid = opts?.closeRid;
this.#terminator = opts?.terminator;
}

get stream() {
Expand All @@ -92,6 +106,8 @@
if (consumed) {
this.streamOrStatic = new ReadableStream();
this.streamOrStatic.getReader();
readableStreamDisturb(this.streamOrStatic);
readableStreamClose(this.streamOrStatic);
} else {
this.streamOrStatic = new ReadableStream({
start(controller) {
Expand Down Expand Up @@ -149,6 +165,43 @@
)
) {
const reader = this.stream.getReader();

if (this.#op && !this.#cloned) {
// fast path, read whole body in a single op call
// op must have following signature: (rid, usize = 0): Uint8Array

const closeStream = (err) => {
if (this.#closeRid) {
core.tryClose(this.#rid);
}
if (err) {
throw err;
}
readableStreamClose(this.stream);
};

try {
// We need to mimic stream behavior, set to disturbed
this.stream[_disturbed] = true;
const buf = await core.opAsync(
this.#op,
this.#rid,
this.#knownExactLength || 0,
);
if (this.#terminator?.aborted) {
throw this.#terminator.reason;
}

closeStream();

return buf;
} catch (err) {
closeStream(this.#terminator?.reason || err);
throw err;
}
}

// slow path
/** @type {Uint8Array[]} */
const chunks = [];

Expand Down Expand Up @@ -218,6 +271,7 @@
* @returns {InnerBody}
*/
clone() {
this.#cloned = true;
const [out1, out2] = this.stream.tee();
this.streamOrStatic = out1;
const second = new InnerBody(out2, this.#knownExactLength);
Expand Down
8 changes: 7 additions & 1 deletion ext/fetch/26_fetch.js
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,13 @@
} else {
response.body = new InnerBody(
createResponseBodyStream(resp.responseRid, terminator),
resp.contentLength,
{
knownExactLength: resp.contentLength,
rid: resp.responseRid,
op: "op_read_all",
closeRid: true,
terminator,
},
);
}
}
Expand Down
13 changes: 11 additions & 2 deletions ext/http/01_http.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@
return null;
}

const [streamRid, method, url] = nextRequest;
const [streamRid, method, url, contentLength] = nextRequest;
SetPrototypeAdd(this.managedResources, streamRid);

/** @type {ReadableStream<Uint8Array> | undefined} */
Expand All @@ -128,7 +128,16 @@
() => method,
url,
() => ops.op_http_headers(streamRid),
body !== null ? new InnerBody(body) : null,
body !== null
? new InnerBody(
body,
{
knownExactLength: contentLength,
rid: streamRid,
op: "op_http_read_all",
},
)
: null,
false,
);
const signal = abortSignal.newSignal();
Expand Down
Loading

0 comments on commit 3e07a56

Please sign in to comment.