Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fs: implement byob mode for readableWebStream() #46933

Merged
merged 3 commits into from
Apr 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fixup! update logic, add tests and doc
  • Loading branch information
debadree25 committed Mar 4, 2023
commit 698ab098fa6271fcd7789fccb992d46fe5520e45
4 changes: 4 additions & 0 deletions doc/api/fs.md
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,10 @@ number of bytes read is zero.

<!-- YAML
added: v17.0.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/46933
description: Added option to create a 'bytes' stream.
-->

> Stability: 1 - Experimental
Expand Down
25 changes: 15 additions & 10 deletions lib/internal/fs/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -286,11 +286,10 @@ class FileHandle extends EventEmitterMixin(JSTransferable) {
readableStreamCancel,
ReadableStream,
} = require('internal/webstreams/readablestream');
this[kRef]();
this.once('close', () => {
readableStreamCancel(readable);
});

const readFn = FunctionPrototypeBind(this.read, this);
const ondone = FunctionPrototypeBind(this[kUnref], this);

readable = new ReadableStream({
type: 'bytes',
autoAllocateChunkSize: 16384,
Expand All @@ -300,16 +299,22 @@ class FileHandle extends EventEmitterMixin(JSTransferable) {
const { bytesRead } = await readFn(view, view.byteOffset, view.byteLength);

if (bytesRead === 0) {
try {
controller.close();
this[kUnref]();
} catch (error) {
controller.error(error);
}
ondone();
controller.close();
}

controller.byobRequest.respond(bytesRead);
},

cancel() {
ondone();
},
});

this[kRef]();

this.once('close', () => {
readableStreamCancel(readable);
});
}

Expand Down
84 changes: 84 additions & 0 deletions test/parallel/test-filehandle-readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,87 @@ const check = readFileSync(__filename, { encoding: 'utf8' });
mc.port1.close();
await file.close();
})().then(common.mustCall());

// Make sure 'bytes' stream works
(async () => {
const file = await open(__filename);
const dec = new TextDecoder();
const readable = file.readableWebStream({ type: 'bytes' });
const reader = readable.getReader({ mode: 'byob' });

let data = '';
let result;
do {
const buff = new ArrayBuffer(100);
result = await reader.read(new DataView(buff));
if (result.value !== undefined) {
data += dec.decode(result.value);
assert.ok(result.value.byteLength <= 100);
}
} while (!result.done);

assert.strictEqual(check, data);

assert.throws(() => file.readableWebStream(), {
code: 'ERR_INVALID_STATE',
});

await file.close();
})().then(common.mustCall());

// Make sure that acquiring a ReadableStream 'bytes' stream
// fails if the FileHandle is already closed.
(async () => {
const file = await open(__filename);
await file.close();

assert.throws(() => file.readableWebStream({ type: 'bytes' }), {
code: 'ERR_INVALID_STATE',
});
})().then(common.mustCall());

// Make sure that acquiring a ReadableStream 'bytes' stream
// fails if the FileHandle is already closing.
(async () => {
const file = await open(__filename);
file.close();

assert.throws(() => file.readableWebStream({ type: 'bytes' }), {
code: 'ERR_INVALID_STATE',
});
})().then(common.mustCall());

// Make sure the 'bytes' ReadableStream is closed when the underlying
// FileHandle is closed.
(async () => {
const file = await open(__filename);
const readable = file.readableWebStream({ type: 'bytes' });
const reader = readable.getReader({ mode: 'byob' });
file.close();
await reader.closed;
})().then(common.mustCall());

// Make sure the 'bytes' ReadableStream is closed when the underlying
// FileHandle is closed.
(async () => {
const file = await open(__filename);
const readable = file.readableWebStream({ type: 'bytes' });
file.close();
const reader = readable.getReader({ mode: 'byob' });
await reader.closed;
})().then(common.mustCall());

// Make sure that the FileHandle is properly marked "in use"
// when a 'bytes' ReadableStream has been acquired for it.
(async () => {
const file = await open(__filename);
file.readableWebStream({ type: 'bytes' });
const mc = new MessageChannel();
mc.port1.onmessage = common.mustNotCall();
assert.throws(() => mc.port2.postMessage(file, [file]), {
code: 25,
name: 'DataCloneError',
});
mc.port1.close();
await file.close();
})().then(common.mustCall());