Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(ext/streams): fast path when consuming body of tee'd stream #16329

Merged
merged 2 commits into from
Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions cli/tests/unit/http_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2453,6 +2453,62 @@ Deno.test(
},
);

Deno.test(
{ permissions: { net: true } },
async function httpServerRequestResponseClone() {
const body = "deno".repeat(64 * 1024);
let httpConn: Deno.HttpConn;
const listener = Deno.listen({ port: 4501 });
const promise = (async () => {
const conn = await listener.accept();
listener.close();
httpConn = Deno.serveHttp(conn);
const reqEvent = await httpConn.nextRequest();
assert(reqEvent);
const { request, respondWith } = reqEvent;
const clone = request.clone();
const reader = clone.body!.getReader();

// get first chunk from branch2
const clonedChunks = [];
const { value, done } = await reader.read();
assert(!done);
clonedChunks.push(value);

// consume request after first chunk single read
// readAll should read correctly the rest of the body.
// firstChunk should be in the stream internal buffer
const body1 = await request.text();

while (true) {
const { value, done } = await reader.read();
if (done) break;
clonedChunks.push(value);
}
let offset = 0;
const body2 = new Uint8Array(body.length);
for (const chunk of clonedChunks) {
body2.set(chunk, offset);
offset += chunk.byteLength;
}

assertEquals(body1, body);
assertEquals(body1, new TextDecoder().decode(body2));
await respondWith(new Response(body));
})();

const response = await fetch("http://localhost:4501", {
body,
method: "POST",
});
const clone = response.clone();
assertEquals(await response.text(), await clone.text());

await promise;
httpConn!.close();
},
);

function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader {
// Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
const tp = new TextProtoReader(r);
Expand Down
1 change: 1 addition & 0 deletions core/01_core.js
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@
close: (rid) => ops.op_close(rid),
tryClose: (rid) => ops.op_try_close(rid),
read: opAsync.bind(null, "op_read"),
readAll: opAsync.bind(null, "op_read_all"),
write: opAsync.bind(null, "op_write"),
writeAll: opAsync.bind(null, "op_write_all"),
shutdown: opAsync.bind(null, "op_shutdown"),
Expand Down
27 changes: 27 additions & 0 deletions ext/web/06_streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,9 @@
const RESOURCE_REGISTRY = new FinalizationRegistry((rid) => {
core.tryClose(rid);
});

const _readAll = Symbol("[[readAll]]");
const _original = Symbol("[[original]]");
/**
* Create a new ReadableStream object that is backed by a Resource that
* implements `Resource::read_return`. This object contains enough metadata to
Expand Down Expand Up @@ -681,6 +684,17 @@
async pull(controller) {
const v = controller.byobRequest.view;
try {
if (controller[_readAll] === true) {
// fast path for tee'd streams consuming body
const chunk = await core.readAll(rid);
if (chunk.byteLength > 0) {
controller.enqueue(chunk);
}
controller.close();
tryClose();
return;
}

const bytesRead = await core.read(rid, v);
if (bytesRead === 0) {
tryClose();
Expand Down Expand Up @@ -809,8 +823,17 @@
/** @type {Uint8Array[]} */
const chunks = [];
let totalLength = 0;

// tee'd stream
if (stream[_original]) {
// One of the branches is consuming the stream
// signal controller.pull that we can consume it in a single op
stream[_original][_controller][_readAll] = true;
}

while (true) {
const { value: chunk, done } = await reader.read();

if (done) break;

if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, chunk)) {
Expand Down Expand Up @@ -3029,6 +3052,10 @@
pull2Algorithm,
cancel2Algorithm,
);

branch1[_original] = stream;
branch2[_original] = stream;

forwardReaderError(reader);
return [branch1, branch2];
}
Expand Down