Skip to content

Commit 9e04070

Browse files
addaleaxMylesBorins
authored andcommitted
worker: add option to track unmanaged file descriptors
Add a public option for Workers which adds tracking for raw file descriptors, as currently, those resources are not cleaned up, unlike e.g. `FileHandle`s. PR-URL: #34303 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
1 parent 0f6805d commit 9e04070

File tree

5 files changed

+89
-3
lines changed

5 files changed

+89
-3
lines changed

doc/api/worker_threads.md

+12
Original file line numberDiff line numberDiff line change
@@ -620,6 +620,10 @@ if (isMainThread) {
620620
<!-- YAML
621621
added: v10.5.0
622622
changes:
623+
- version:
624+
- REPLACEME
625+
pr-url: https://github.com/nodejs/node/pull/34303
626+
description: The `trackUnmanagedFds` option was introduced.
623627
- version: v14.0.0
624628
pr-url: https://github.com/nodejs/node/pull/32278
625629
description: The `transferList` option was introduced.
@@ -675,6 +679,12 @@ changes:
675679
occur as described in the [HTML structured clone algorithm][], and an error
676680
will be thrown if the object cannot be cloned (e.g. because it contains
677681
`function`s).
682+
* `trackUnmanagedFds` {boolean} If this is set to `true`, then the Worker will
683+
track raw file descriptors managed through [`fs.open()`][] and
684+
[`fs.close()`][], and close them when the Worker exits, similar to other
685+
resources like network sockets or file descriptors managed through
686+
the [`FileHandle`][] API. This option is automatically inherited by all
687+
nested `Worker`s. **Default**: `false`.
678688
* `transferList` {Object[]} If one or more `MessagePort`-like objects
679689
are passed in `workerData`, a `transferList` is required for those
680690
items or [`ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST`][] will be thrown.
@@ -894,6 +904,8 @@ active handle in the event system. If the worker is already `unref()`ed calling
894904
[`WebAssembly.Module`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/WebAssembly/Module
895905
[`Worker`]: #worker_threads_class_worker
896906
[`cluster` module]: cluster.html
907+
[`fs.open()`]: fs.html#fs_fs_open_path_flags_mode_callback
908+
[`fs.close()`]: fs.html#fs_fs_close_fd_callback
897909
[`markAsUntransferable()`]: #worker_threads_worker_markasuntransferable_object
898910
[`port.on('message')`]: #worker_threads_event_message
899911
[`port.onmessage()`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort/onmessage

lib/internal/worker.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,8 @@ class Worker extends EventEmitter {
151151
this[kHandle] = new WorkerImpl(url,
152152
env === process.env ? null : env,
153153
options.execArgv,
154-
parseResourceLimits(options.resourceLimits));
154+
parseResourceLimits(options.resourceLimits),
155+
!!options.trackUnmanagedFds);
155156
if (this[kHandle].invalidExecArgv) {
156157
throw new ERR_WORKER_INVALID_EXEC_ARGV(this[kHandle].invalidExecArgv);
157158
}

src/node_worker.cc

+5-2
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ void Worker::Run() {
309309
context,
310310
std::move(argv_),
311311
std::move(exec_argv_),
312-
EnvironmentFlags::kNoFlags,
312+
static_cast<EnvironmentFlags::Flags>(environment_flags_),
313313
thread_id_,
314314
std::move(inspector_parent_handle_)));
315315
if (is_stopped()) return;
@@ -456,7 +456,6 @@ void Worker::New(const FunctionCallbackInfo<Value>& args) {
456456

457457
std::vector<std::string> exec_argv_out;
458458

459-
CHECK_EQ(args.Length(), 4);
460459
// Argument might be a string or URL
461460
if (!args[0]->IsNullOrUndefined()) {
462461
Utf8Value value(
@@ -582,6 +581,10 @@ void Worker::New(const FunctionCallbackInfo<Value>& args) {
582581
CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
583582
limit_info->CopyContents(worker->resource_limits_,
584583
sizeof(worker->resource_limits_));
584+
585+
CHECK(args[4]->IsBoolean());
586+
if (args[4]->IsTrue() || env->tracks_unmanaged_fds())
587+
worker->environment_flags_ |= EnvironmentFlags::kTrackUnmanagedFds;
585588
}
586589

587590
void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {

src/node_worker.h

+1
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ class Worker : public AsyncWrap {
114114
bool stopped_ = true;
115115

116116
bool has_ref_ = true;
117+
uint64_t environment_flags_ = EnvironmentFlags::kNoFlags;
117118

118119
// The real Environment of the worker object. It has a lesser
119120
// lifespan than the worker object itself - comes to life
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const { Worker } = require('worker_threads');
5+
const { once } = require('events');
6+
const fs = require('fs');
7+
8+
// All the tests here are run sequentially, to avoid accidentally opening an fd
9+
// which another part of the test expects to be closed.
10+
11+
const preamble = `
12+
const fs = require("fs");
13+
const { parentPort } = require('worker_threads');
14+
const __filename = ${JSON.stringify(__filename)};
15+
process.on('warning', (warning) => parentPort.postMessage({ warning }));
16+
`;
17+
18+
(async () => {
19+
// Consistency check: Without trackUnmanagedFds, FDs are *not* closed.
20+
{
21+
const w = new Worker(`${preamble}
22+
parentPort.postMessage(fs.openSync(__filename));
23+
`, { eval: true, trackUnmanagedFds: false });
24+
const [ [ fd ] ] = await Promise.all([once(w, 'message'), once(w, 'exit')]);
25+
assert(fd > 2);
26+
fs.fstatSync(fd); // Does not throw.
27+
fs.closeSync(fd);
28+
}
29+
30+
// With trackUnmanagedFds, FDs are closed automatically.
31+
{
32+
const w = new Worker(`${preamble}
33+
parentPort.postMessage(fs.openSync(__filename));
34+
`, { eval: true, trackUnmanagedFds: true });
35+
const [ [ fd ] ] = await Promise.all([once(w, 'message'), once(w, 'exit')]);
36+
assert(fd > 2);
37+
assert.throws(() => fs.fstatSync(fd), { code: 'EBADF' });
38+
}
39+
40+
// There is a warning when an fd is unexpectedly opened twice.
41+
{
42+
const w = new Worker(`${preamble}
43+
parentPort.postMessage(fs.openSync(__filename));
44+
parentPort.once('message', () => {
45+
const reopened = fs.openSync(__filename);
46+
fs.closeSync(reopened);
47+
});
48+
`, { eval: true, trackUnmanagedFds: true });
49+
const [ fd ] = await once(w, 'message');
50+
fs.closeSync(fd);
51+
w.postMessage('');
52+
const [ { warning } ] = await once(w, 'message');
53+
assert.match(warning.message,
54+
/File descriptor \d+ opened in unmanaged mode twice/);
55+
}
56+
57+
// There is a warning when an fd is unexpectedly closed.
58+
{
59+
const w = new Worker(`${preamble}
60+
parentPort.once('message', (fd) => {
61+
fs.closeSync(fd);
62+
});
63+
`, { eval: true, trackUnmanagedFds: true });
64+
w.postMessage(fs.openSync(__filename));
65+
const [ { warning } ] = await once(w, 'message');
66+
assert.match(warning.message,
67+
/File descriptor \d+ closed but not opened in unmanaged mode/);
68+
}
69+
})().then(common.mustCall());

0 commit comments

Comments
 (0)