Skip to content

Commit 143bceb

Browse files
committed
fs: add signal option to filehandle.readableWebStream()
1 parent 139c2e1 commit 143bceb

File tree

3 files changed

+169
-3
lines changed

3 files changed

+169
-3
lines changed

doc/api/fs.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,9 @@ number of bytes read is zero.
481481
<!-- YAML
482482
added: v17.0.0
483483
changes:
484+
- version: REPLACEME
485+
pr-url: https://github.com/nodejs/node/pull/58725
486+
description: Added the `signal` option.
484487
- version: v24.2.0
485488
pr-url: https://github.com/nodejs/node/pull/58548
486489
description: Added the `autoClose` option.
@@ -502,6 +505,7 @@ changes:
502505
* `options` {Object}
503506
* `autoClose` {boolean} When true, causes the {FileHandle} to be closed when the
504507
stream is closed. **Default:** `false`
508+
* `signal` {AbortSignal|undefined} allows aborting the stream. **Default:** `undefined`
505509
* Returns: {ReadableStream}
506510
507511
Returns a byte-oriented `ReadableStream` that may be used to read the file's

lib/internal/fs/promises.js

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,11 @@ class FileHandle extends EventEmitter {
278278
/**
279279
* @typedef {import('../webstreams/readablestream').ReadableStream
280280
* } ReadableStream
281-
* @param {{ type?: 'bytes', autoClose?: boolean }} [options]
281+
* @param {{
282+
type?: 'bytes';
283+
autoClose?: boolean;
284+
signal?: AbortSignal;
285+
}} [options]
282286
* @returns {ReadableStream}
283287
*/
284288
readableWebStream(options = kEmptyObject) {
@@ -294,9 +298,18 @@ class FileHandle extends EventEmitter {
294298
const {
295299
type = 'bytes',
296300
autoClose = false,
301+
signal,
297302
} = options;
298303

299304
validateBoolean(autoClose, 'options.autoClose');
305+
validateAbortSignal(signal, 'options.signal');
306+
307+
// Can't use checkAborted() because we may need to close filehandle first
308+
if (signal?.aborted) {
309+
if (autoClose) this.close();
310+
throw new AbortError(undefined, { cause: signal.reason });
311+
}
312+
let signalListenerCleanup = null;
300313

301314
if (type !== 'bytes') {
302315
process.emitWarning(
@@ -308,6 +321,7 @@ class FileHandle extends EventEmitter {
308321

309322
const readFn = FunctionPrototypeBind(this.read, this);
310323
const ondone = async () => {
324+
signalListenerCleanup?.();
311325
this[kUnref]();
312326
if (autoClose) await this.close();
313327
};
@@ -317,6 +331,21 @@ class FileHandle extends EventEmitter {
317331
type: 'bytes',
318332
autoAllocateChunkSize: 16384,
319333

334+
start: (controller) => {
335+
if (signal) {
336+
const onAbort = async () => {
337+
this.off('close', cancelOnClose);
338+
controller.error(new AbortError(undefined, { cause: signal.reason }));
339+
await ondone();
340+
};
341+
342+
signal.addEventListener('abort', onAbort, { __proto__: null, once: true });
343+
signalListenerCleanup = () => {
344+
signal.removeEventListener('abort', onAbort);
345+
};
346+
}
347+
},
348+
320349
async pull(controller) {
321350
const view = controller.byobRequest.view;
322351
const { bytesRead } = await readFn(view, view.byteOffset, view.byteLength);
@@ -339,9 +368,11 @@ class FileHandle extends EventEmitter {
339368
readableStreamCancel,
340369
} = require('internal/webstreams/readablestream');
341370
this[kRef]();
342-
this.once('close', () => {
371+
372+
const cancelOnClose = () => {
343373
readableStreamCancel(readable);
344-
});
374+
};
375+
this.once('close', cancelOnClose);
345376

346377
return readable;
347378
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
import { skip } from '../common/index.mjs';
2+
import { open, access, constants } from 'node:fs/promises';
3+
import assert from 'node:assert';
4+
5+
// Test that signal option in filehandle.readableWebStream() works
6+
7+
const shortFile = new URL(import.meta.url);
8+
9+
{
10+
await using fh = await open(shortFile);
11+
const cause = new Error('am abort reason');
12+
const controller = new AbortController();
13+
const { signal } = controller;
14+
controller.abort(cause);
15+
assert.throws(() => fh.readableWebStream({ signal, autoClose: false }), {
16+
code: 'ABORT_ERR',
17+
cause,
18+
});
19+
20+
// Filehandle must be still open
21+
await fh.read();
22+
}
23+
24+
{
25+
await using fh = await open(shortFile);
26+
const cause = new Error('am abort reason');
27+
const controller = new AbortController();
28+
const { signal } = controller;
29+
controller.abort(cause);
30+
assert.throws(() => fh.readableWebStream({ signal, autoClose: true }), {
31+
code: 'ABORT_ERR',
32+
cause,
33+
});
34+
35+
// Filehandle must be closed after abort
36+
await assert.rejects(() => fh.read(), {
37+
code: 'EBADF',
38+
});
39+
}
40+
41+
{
42+
await using fh = await open(shortFile);
43+
const cause = new Error('am abort reason');
44+
const controller = new AbortController();
45+
const { signal } = controller;
46+
const stream = fh.readableWebStream({ signal, autoClose: false });
47+
const reader = stream.getReader();
48+
controller.abort(cause);
49+
await assert.rejects(() => reader.read(), {
50+
code: 'ABORT_ERR',
51+
cause,
52+
});
53+
54+
// Filehandle must be still open
55+
await fh.read();
56+
}
57+
58+
{
59+
await using fh = await open(shortFile);
60+
const cause = new Error('am abort reason');
61+
const controller = new AbortController();
62+
const { signal } = controller;
63+
const stream = fh.readableWebStream({ signal, autoClose: true });
64+
const reader = stream.getReader();
65+
controller.abort(cause);
66+
await assert.rejects(() => reader.read(), {
67+
code: 'ABORT_ERR',
68+
cause,
69+
});
70+
71+
// Filehandle must be closed after abort
72+
await assert.rejects(() => fh.read(), {
73+
code: 'EBADF',
74+
});
75+
}
76+
77+
const longFile = new URL('file:///dev/zero');
78+
79+
try {
80+
await access(longFile, constants.R_OK);
81+
} catch {
82+
skip('Can not perform long test');
83+
}
84+
85+
{
86+
await using fh = await open(longFile);
87+
const cause = new Error('am abort reason');
88+
const controller = new AbortController();
89+
const { signal } = controller;
90+
const stream = fh.readableWebStream({ signal, autoClose: false });
91+
const reader = stream.getReader();
92+
setTimeout(() => controller.abort(cause), 100);
93+
await assert.rejects(async () => {
94+
while (true) {
95+
await new Promise((resolve) => setTimeout(resolve, 5));
96+
const { done } = await reader.read();
97+
assert.ok(done === false, 'we exhausted /dev/zero');
98+
}
99+
}, {
100+
code: 'ABORT_ERR',
101+
cause,
102+
});
103+
104+
// Filehandle must be still open
105+
await fh.read();
106+
}
107+
108+
{
109+
await using fh = await open(longFile);
110+
const cause = new Error('am abort reason');
111+
const controller = new AbortController();
112+
const { signal } = controller;
113+
const stream = fh.readableWebStream({ signal, autoClose: true });
114+
const reader = stream.getReader();
115+
setTimeout(() => controller.abort(cause), 100);
116+
await assert.rejects(async () => {
117+
while (true) {
118+
await new Promise((resolve) => setTimeout(resolve, 5));
119+
const { done } = await reader.read();
120+
assert.ok(done === false, 'we exhausted /dev/zero');
121+
}
122+
}, {
123+
code: 'ABORT_ERR',
124+
cause,
125+
});
126+
127+
// Filehandle must be closed after abort
128+
await assert.rejects(() => fh.read(), {
129+
code: 'EBADF',
130+
});
131+
}

0 commit comments

Comments
 (0)