Skip to content

Commit 6cd12be

Browse files
committed
fs: add FileHandle.prototype.readableWebStream()
Adds an experimental `readableWebStream()` method to `FileHandle` that returns a web `ReadableStream` Signed-off-by: James M Snell <jasnell@gmail.com> PR-URL: #39331 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net>
1 parent e2a6399 commit 6cd12be

File tree

5 files changed

+195
-3
lines changed

5 files changed

+195
-3
lines changed

doc/api/fs.md

+46
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,52 @@ Reads data from the file and stores that in the given buffer.
302302
If the file is not modified concurrently, the end-of-file is reached when the
303303
number of bytes read is zero.
304304
305+
#### `filehandle.readableWebStream()`
306+
<!-- YAML
307+
added: REPLACEME
308+
-->
309+
310+
> Stability: 1 - Experimental
311+
312+
* Returns: {ReadableStream}
313+
314+
Returns a `ReadableStream` that may be used to read the files data.
315+
316+
An error will be thrown if this method is called more than once or is called
317+
after the `FileHandle` is closed or closing.
318+
319+
```mjs
320+
import {
321+
open,
322+
} from 'node:fs/promises';
323+
324+
const file = await open('./some/file/to/read');
325+
326+
for await (const chunk of file.readableWebStream())
327+
console.log(chunk);
328+
329+
await file.close();
330+
```
331+
332+
```cjs
333+
const {
334+
open,
335+
} = require('fs/promises');
336+
337+
(async () => {
338+
const file = await open('./some/file/to/read');
339+
340+
for await (const chunk of file.readableWebStream())
341+
console.log(chunk);
342+
343+
await file.close();
344+
})();
345+
```
346+
347+
While the `ReadableStream` will read the file to completion, it will not
348+
close the `FileHandle` automatically. User code must still call the
349+
`fileHandle.close()` method.
350+
305351
#### `filehandle.readFile(options)`
306352
<!-- YAML
307353
added: v10.0.0

lib/internal/fs/promises.js

+37-2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const {
3030
codes: {
3131
ERR_FS_FILE_TOO_LARGE,
3232
ERR_INVALID_ARG_VALUE,
33+
ERR_INVALID_STATE,
3334
ERR_METHOD_NOT_IMPLEMENTED,
3435
},
3536
AbortError,
@@ -90,12 +91,21 @@ const kCloseResolve = Symbol('kCloseResolve');
9091
const kCloseReject = Symbol('kCloseReject');
9192
const kRef = Symbol('kRef');
9293
const kUnref = Symbol('kUnref');
94+
const kLocked = Symbol('kLocked');
9395

9496
const { kUsePromises } = binding;
9597
const {
9698
JSTransferable, kDeserialize, kTransfer, kTransferList
9799
} = require('internal/worker/js_transferable');
98100

101+
const {
102+
newReadableStreamFromStreamBase,
103+
} = require('internal/webstreams/adapters');
104+
105+
const {
106+
readableStreamCancel,
107+
} = require('internal/webstreams/readablestream');
108+
99109
const getDirectoryEntriesPromise = promisify(getDirents);
100110
const validateRmOptionsPromise = promisify(validateRmOptions);
101111

@@ -209,6 +219,33 @@ class FileHandle extends EventEmitterMixin(JSTransferable) {
209219
return this[kClosePromise];
210220
}
211221

222+
/**
223+
* @typedef {import('../webstreams/readablestream').ReadableStream
224+
* } ReadableStream
225+
* @returns {ReadableStream}
226+
*/
227+
readableWebStream() {
228+
if (this[kFd] === -1)
229+
throw new ERR_INVALID_STATE('The FileHandle is closed');
230+
if (this[kClosePromise])
231+
throw new ERR_INVALID_STATE('The FileHandle is closing');
232+
if (this[kLocked])
233+
throw new ERR_INVALID_STATE('The FileHandle is locked');
234+
this[kLocked] = true;
235+
236+
const readable = newReadableStreamFromStreamBase(
237+
this[kHandle],
238+
undefined,
239+
{ ondone: () => this[kUnref]() });
240+
241+
this[kRef]();
242+
this.once('close', () => {
243+
readableStreamCancel(readable);
244+
});
245+
246+
return readable;
247+
}
248+
212249
[kTransfer]() {
213250
if (this[kClosePromise] || this[kRefs] > 1) {
214251
throw lazyDOMException('Cannot transfer FileHandle while in use',
@@ -788,8 +825,6 @@ module.exports = {
788825
appendFile,
789826
readFile,
790827
watch,
791-
792-
kHandle,
793828
},
794829

795830
FileHandle,

lib/internal/webstreams/adapters.js

+20-1
Original file line numberDiff line numberDiff line change
@@ -859,12 +859,20 @@ function newWritableStreamFromStreamBase(streamBase, strategy) {
859859
* @param {QueuingStrategy} strategy
860860
* @returns {ReadableStream}
861861
*/
862-
function newReadableStreamFromStreamBase(streamBase, strategy) {
862+
function newReadableStreamFromStreamBase(streamBase, strategy, options = {}) {
863863
validateObject(streamBase, 'streamBase');
864+
validateObject(options, 'options');
865+
866+
const {
867+
ondone = () => {},
868+
} = options;
864869

865870
if (typeof streamBase.onread === 'function')
866871
throw new ERR_INVALID_STATE('StreamBase already has a consumer');
867872

873+
if (typeof ondone !== 'function')
874+
throw new ERR_INVALID_ARG_TYPE('options.ondone', 'Function', ondone);
875+
868876
let controller;
869877

870878
streamBase.onread = (arrayBuffer) => {
@@ -877,6 +885,11 @@ function newReadableStreamFromStreamBase(streamBase, strategy) {
877885
if (nread === UV_EOF) {
878886
controller.close();
879887
streamBase.readStop();
888+
try {
889+
ondone();
890+
} catch (error) {
891+
controller.error(error);
892+
}
880893
return;
881894
}
882895

@@ -899,6 +912,12 @@ function newReadableStreamFromStreamBase(streamBase, strategy) {
899912

900913
cancel() {
901914
const promise = createDeferredPromise();
915+
try {
916+
ondone();
917+
} catch (error) {
918+
promise.reject(error);
919+
return promise.promise;
920+
}
902921
const req = new ShutdownWrap();
903922
req.oncomplete = () => promise.resolve();
904923
const err = streamBase.shutdown(req);

test/parallel/test-bootstrap-modules.js

+5
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,11 @@ const expectedModules = new Set([
120120
'NativeModule internal/util/inspect',
121121
'NativeModule internal/util/iterable_weak_map',
122122
'NativeModule internal/util/types',
123+
'NativeModule internal/webstreams/util',
124+
'NativeModule internal/webstreams/writablestream',
125+
'NativeModule internal/webstreams/readablestream',
126+
'NativeModule internal/webstreams/queuingstrategies',
127+
'NativeModule internal/webstreams/adapters',
123128
'NativeModule internal/validators',
124129
'NativeModule internal/vm/module',
125130
'NativeModule internal/worker/io',
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
6+
const {
7+
readFileSync,
8+
} = require('fs');
9+
10+
const {
11+
open,
12+
} = require('fs/promises');
13+
14+
const check = readFileSync(__filename, { encoding: 'utf8' });
15+
16+
// Make sure the ReadableStream works...
17+
(async () => {
18+
const dec = new TextDecoder();
19+
const file = await open(__filename);
20+
let data = '';
21+
for await (const chunk of file.readableWebStream())
22+
data += dec.decode(chunk);
23+
24+
assert.strictEqual(check, data);
25+
26+
assert.throws(() => file.readableWebStream(), {
27+
code: 'ERR_INVALID_STATE',
28+
});
29+
30+
await file.close();
31+
})().then(common.mustCall());
32+
33+
// Make sure that acquiring a ReadableStream fails if the
34+
// FileHandle is already closed.
35+
(async () => {
36+
const file = await open(__filename);
37+
await file.close();
38+
39+
assert.throws(() => file.readableWebStream(), {
40+
code: 'ERR_INVALID_STATE',
41+
});
42+
})().then(common.mustCall());
43+
44+
// Make sure that acquiring a ReadableStream fails if the
45+
// FileHandle is already closing.
46+
(async () => {
47+
const file = await open(__filename);
48+
file.close();
49+
50+
assert.throws(() => file.readableWebStream(), {
51+
code: 'ERR_INVALID_STATE',
52+
});
53+
})().then(common.mustCall());
54+
55+
// Make sure the ReadableStream is closed when the underlying
56+
// FileHandle is closed.
57+
(async () => {
58+
const file = await open(__filename);
59+
const readable = file.readableWebStream();
60+
const reader = readable.getReader();
61+
file.close();
62+
await reader.closed;
63+
})().then(common.mustCall());
64+
65+
// Make sure the ReadableStream is closed when the underlying
66+
// FileHandle is closed.
67+
(async () => {
68+
const file = await open(__filename);
69+
const readable = file.readableWebStream();
70+
file.close();
71+
const reader = readable.getReader();
72+
await reader.closed;
73+
})().then(common.mustCall());
74+
75+
// Make sure that the FileHandle is properly marked "in use"
76+
// when a ReadableStream has been acquired for it.
77+
(async () => {
78+
const file = await open(__filename);
79+
file.readableWebStream();
80+
const mc = new MessageChannel();
81+
mc.port1.onmessage = common.mustNotCall();
82+
assert.throws(() => mc.port2.postMessage(file, [file]), {
83+
code: 25 // DataCloneError
84+
});
85+
mc.port1.close();
86+
await file.close();
87+
})().then(common.mustCall());

0 commit comments

Comments
 (0)