Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(runtime/spawn): collect output using op_read_all #16596

Merged
merged 6 commits into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions cli/bench/spawn.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.

Deno.bench("echo deno", async () => {
await Deno.spawn("echo", { args: ["deno"] });
});

Deno.bench("cat 128kb", async () => {
await Deno.spawn("cat", {
args: ["./cli/bench/testdata/128k.bin"],
});
});
30 changes: 26 additions & 4 deletions ext/web/06_streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@
const stream = webidl.createBranded(ReadableStream);
stream[promiseIdSymbol] = undefined;
stream[_isUnref] = false;
stream[_resourceBackingUnrefable] = { rid, autoClose: true };
const underlyingSource = {
type: "bytes",
async pull(controller) {
Expand Down Expand Up @@ -767,16 +768,24 @@
return stream;
}

function readableStreamIsUnrefable(stream) {
return _isUnref in stream;
}

function readableStreamForRidUnrefableRef(stream) {
if (!(_isUnref in stream)) throw new TypeError("Not an unrefable stream");
if (!readableStreamIsUnrefable(stream)) {
throw new TypeError("Not an unrefable stream");
}
stream[_isUnref] = false;
if (stream[promiseIdSymbol] !== undefined) {
core.refOp(stream[promiseIdSymbol]);
}
}

function readableStreamForRidUnrefableUnref(stream) {
if (!(_isUnref in stream)) throw new TypeError("Not an unrefable stream");
if (!readableStreamIsUnrefable(stream)) {
throw new TypeError("Not an unrefable stream");
}
stream[_isUnref] = true;
if (stream[promiseIdSymbol] !== undefined) {
core.unrefOp(stream[promiseIdSymbol]);
Expand All @@ -787,15 +796,25 @@
return stream[_resourceBacking];
}

function getReadableStreamResourceBackingUnrefable(stream) {
return stream[_resourceBackingUnrefable];
}

async function readableStreamCollectIntoUint8Array(stream) {
const resourceBacking = getReadableStreamResourceBacking(stream);
const resourceBacking = getReadableStreamResourceBacking(stream) ||
getReadableStreamResourceBackingUnrefable(stream);
const reader = acquireReadableStreamDefaultReader(stream);

if (resourceBacking) {
// fast path, read whole body in a single op call
try {
readableStreamDisturb(stream);
const buf = await core.opAsync("op_read_all", resourceBacking.rid);
const promise = core.opAsync("op_read_all", resourceBacking.rid);
if (readableStreamIsUnrefable(stream)) {
const promiseId = stream[promiseIdSymbol] = promise[promiseIdSymbol];
if (stream[_isUnref]) core.unrefOp(promiseId);
}
const buf = await promise;
readableStreamThrowIfErrored(stream);
readableStreamClose(stream);
return buf;
Expand Down Expand Up @@ -4585,6 +4604,9 @@
}

const _resourceBacking = Symbol("[[resourceBacking]]");
// This distinction exists to prevent unrefable streams being used in
// regular fast streams that are unaware of refability
const _resourceBackingUnrefable = Symbol("[[resourceBackingUnrefable]]");
/** @template R */
class ReadableStream {
/** @type {ReadableStreamDefaultController | ReadableByteStreamController} */
Expand Down
20 changes: 3 additions & 17 deletions runtime/js/40_spawn.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
ObjectEntries,
String,
TypeError,
Uint8Array,
PromisePrototypeThen,
SafePromiseAll,
SymbolFor,
} = window.__bootstrap.primordials;
const {
readableStreamCollectIntoUint8Array,
readableStreamForRidUnrefable,
readableStreamForRidUnrefableRef,
readableStreamForRidUnrefableUnref,
Expand Down Expand Up @@ -64,26 +64,12 @@
};
}

async function collectOutput(readableStream) {
function collectOutput(readableStream) {
if (!(readableStream instanceof ReadableStream)) {
return null;
}

const bufs = [];
let size = 0;
for await (const chunk of readableStream) {
bufs.push(chunk);
size += chunk.byteLength;
}

const buffer = new Uint8Array(size);
let offset = 0;
for (const chunk of bufs) {
buffer.set(chunk, offset);
offset += chunk.byteLength;
}

return buffer;
return readableStreamCollectIntoUint8Array(readableStream);
}

class Child {
Expand Down