Skip to content

Commit f947f3a

Browse files
committed
worker: add everysync
Signed-off-by: Matteo Collina <hello@matteocollina.com>
1 parent ca74d64 commit f947f3a

File tree

10 files changed

+442
-0
lines changed

10 files changed

+442
-0
lines changed

LICENSE

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2639,3 +2639,28 @@ The externally maintained libraries used by Node.js are:
26392639
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26402640
SOFTWARE.
26412641
"""
2642+
2643+
- everysync, located at lib/internal/worker/everysync, is licensed as follows:
2644+
"""
2645+
MIT License
2646+
2647+
Copyright (c) 2024 Matteo Collina
2648+
2649+
Permission is hereby granted, free of charge, to any person obtaining a copy
2650+
of this software and associated documentation files (the "Software"), to deal
2651+
in the Software without restriction, including without limitation the rights
2652+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
2653+
copies of the Software, and to permit persons to whom the Software is
2654+
furnished to do so, subject to the following conditions:
2655+
2656+
The above copyright notice and this permission notice shall be included in all
2657+
copies or substantial portions of the Software.
2658+
2659+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
2660+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
2661+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
2662+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
2663+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
2664+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
2665+
SOFTWARE.
2666+
"""

doc/api/worker_threads.md

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1527,6 +1527,79 @@ Calling `unref()` on a worker allows the thread to exit if this is the only
15271527
active handle in the event system. If the worker is already `unref()`ed calling
15281528
`unref()` again has no effect.
15291529

1530+
## `worker.makeSync(buffer[, options])`
1531+
1532+
<!-- YAML
1533+
added: REPLACEME
1534+
-->
1535+
1536+
* `buffer` {SharedArrayBuffer} A shared memory buffer to use for communication.
1537+
* `options` {Object}
1538+
* `timeout` {number} The timeout in milliseconds for synchronous calls. **Default:** `5000`.
1539+
* `expandable` {boolean} Whether the buffer can be resized. **Default:** `true` if the buffer
1540+
supports `growable` option.
1541+
* Returns: {Object} An object with synchronous methods mirroring those exposed through [`worker.wire()`][].
1542+
1543+
Creates a synchronous API facade that communicates with a worker thread over a shared memory buffer.
1544+
The worker thread must call [`worker.wire()`][] on the same buffer to register the methods that can be called.
1545+
1546+
This function enables making synchronous calls to a worker thread, which is particularly useful
1547+
when code requires blocking operations but still wants to benefit from the worker thread's isolation.
1548+
1549+
```js
1550+
const { Worker, makeSync } = require('node:worker_threads');
1551+
1552+
// Create a SharedArrayBuffer for communication
1553+
const buffer = new SharedArrayBuffer(1024, {
1554+
maxByteLength: 64 * 1024 * 1024,
1555+
});
1556+
1557+
// Create a worker, passing the buffer
1558+
const worker = new Worker('worker-script.js', {
1559+
workerData: { buffer },
1560+
});
1561+
1562+
// Create a synchronous API facade
1563+
const api = makeSync(buffer);
1564+
1565+
// Call a method synchronously - this will block until the worker responds
1566+
const result = api.methodName(arg1, arg2);
1567+
```
1568+
1569+
## `worker.wire(buffer, methods)`
1570+
1571+
<!-- YAML
1572+
added: REPLACEME
1573+
-->
1574+
1575+
* `buffer` {SharedArrayBuffer} A shared memory buffer to use for communication.
1576+
* `methods` {Object} An object whose properties are methods to expose to the main thread.
1577+
1578+
Exposes methods to the main thread that can be called synchronously using [`worker.makeSync()`][].
1579+
The methods can be async functions or return promises, and the main thread will wait
1580+
for the promise to resolve or reject.
1581+
1582+
```js
1583+
const { workerData, wire } = require('node:worker_threads');
1584+
1585+
// Expose methods synchronously to the main thread
1586+
wire(workerData.buffer, {
1587+
async methodName(arg1, arg2) {
1588+
// Do work asynchronously
1589+
return result;
1590+
},
1591+
1592+
syncMethod(arg) {
1593+
// Do synchronous work
1594+
return result;
1595+
},
1596+
});
1597+
```
1598+
1599+
The `wire()` function should be called early in the worker's lifecycle to register
1600+
the methods before the main thread attempts to call them. Any values returned by
1601+
these methods are serialized and passed back to the main thread.
1602+
15301603
## Notes
15311604
15321605
### Synchronous blocking of stdio
@@ -1633,10 +1706,12 @@ thread spawned will spawn another until the application crashes.
16331706
[`v8.getHeapSnapshot()`]: v8.md#v8getheapsnapshotoptions
16341707
[`vm`]: vm.md
16351708
[`worker.SHARE_ENV`]: #workershare_env
1709+
[`worker.makeSync()`]: #workermakesyncbuffer-options
16361710
[`worker.on('message')`]: #event-message_1
16371711
[`worker.postMessage()`]: #workerpostmessagevalue-transferlist
16381712
[`worker.terminate()`]: #workerterminate
16391713
[`worker.threadId`]: #workerthreadid_1
1714+
[`worker.wire()`]: #workerwirebuffer-methods
16401715
[async-resource-worker-pool]: async_context.md#using-asyncresource-for-a-worker-thread-pool
16411716
[browser `MessagePort`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort
16421717
[child processes]: child_process.md
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
'use strict';
2+
3+
const {
4+
AtomicsNotify,
5+
AtomicsStore,
6+
AtomicsWait,
7+
AtomicsWaitAsync,
8+
Int32Array,
9+
ObjectKeys,
10+
} = primordials;
11+
12+
const {
13+
codes: {
14+
ERR_WORKER_MESSAGING_TIMEOUT,
15+
},
16+
} = require('internal/errors');
17+
18+
const { read, write } = require('internal/worker/everysync/objects');
19+
const {
20+
OFFSET,
21+
TO_MAIN,
22+
TO_WORKER,
23+
} = require('internal/worker/everysync/indexes');
24+
25+
/**
26+
* Creates a synchronous API facade from a shared memory buffer.
27+
* This function is meant to be used in the main thread to communicate with
28+
* a worker thread that has called `wire()` on the same shared memory.
29+
* @param {SharedArrayBuffer} data - The shared memory buffer for communication
30+
* @param {object} [opts={}] - Options object
31+
* @param {number} [opts.timeout=1000] - Timeout in milliseconds for synchronous operations
32+
* @returns {object} - An object with methods that match the ones exposed by the worker
33+
*/
34+
function makeSync(data, opts = {}) {
35+
const timeout = opts.timeout || 1000;
36+
const metaView = new Int32Array(data);
37+
38+
const res = AtomicsWait(metaView, TO_WORKER, 0, timeout);
39+
AtomicsStore(metaView, TO_WORKER, 0);
40+
41+
if (res === 'ok') {
42+
const obj = read(data, OFFSET);
43+
44+
const api = {};
45+
for (const key of obj) {
46+
api[key] = (...args) => {
47+
write(data, { key, args }, OFFSET);
48+
AtomicsStore(metaView, TO_MAIN, 1);
49+
AtomicsNotify(metaView, TO_MAIN, 1);
50+
const res = AtomicsWait(metaView, TO_WORKER, 0, timeout);
51+
AtomicsStore(metaView, TO_WORKER, 0);
52+
if (res === 'ok') {
53+
const obj = read(data, OFFSET);
54+
return obj;
55+
}
56+
throw new ERR_WORKER_MESSAGING_TIMEOUT();
57+
};
58+
}
59+
60+
return api;
61+
}
62+
throw new ERR_WORKER_MESSAGING_TIMEOUT();
63+
}
64+
65+
/**
66+
* Wires up a shared memory buffer to invoke methods on an object.
67+
* This function is meant to be used in a worker thread to expose methods
68+
* to the main thread that has called `makeSync()` on the same shared memory.
69+
* @param {SharedArrayBuffer} data - The shared memory buffer for communication
70+
* @param {object} obj - Object with methods to expose to the main thread
71+
* @returns {Promise<void>} - A promise that never resolves unless there's an error
72+
*/
73+
async function wire(data, obj) {
74+
write(data, ObjectKeys(obj), OFFSET);
75+
76+
const metaView = new Int32Array(data);
77+
78+
AtomicsStore(metaView, TO_WORKER, 1);
79+
AtomicsNotify(metaView, TO_WORKER);
80+
81+
while (true) {
82+
const waitAsync = AtomicsWaitAsync(metaView, TO_MAIN, 0);
83+
const res = await waitAsync.value;
84+
AtomicsStore(metaView, TO_MAIN, 0);
85+
86+
if (res === 'ok') {
87+
const { key, args } = read(data, OFFSET);
88+
// This is where the magic happens - invoke the requested method
89+
const result = await obj[key](...args);
90+
write(data, result, OFFSET);
91+
AtomicsStore(metaView, TO_WORKER, 1);
92+
AtomicsNotify(metaView, TO_WORKER, 1);
93+
}
94+
}
95+
}
96+
97+
module.exports = {
98+
makeSync,
99+
wire,
100+
};
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
'use strict';
2+
3+
/**
4+
* Byte offset where the actual data begins in the shared memory
5+
* @type {number}
6+
*/
7+
const OFFSET = 64;
8+
9+
/**
10+
* Index in the Int32Array for signaling from worker to main thread
11+
* 0: writing from worker, reading from main
12+
* @type {number}
13+
*/
14+
const TO_WORKER = 0;
15+
16+
/**
17+
* Index in the Int32Array for signaling from main to worker thread
18+
* 1: writing from main, reading from worker
19+
* @type {number}
20+
*/
21+
const TO_MAIN = 1;
22+
23+
module.exports = {
24+
OFFSET,
25+
TO_WORKER,
26+
TO_MAIN,
27+
};
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
'use strict';
2+
3+
const {
4+
DataView,
5+
Uint8Array,
6+
} = primordials;
7+
8+
const {
9+
codes: {
10+
ERR_INVALID_BUFFER_SIZE,
11+
},
12+
} = require('internal/errors');
13+
14+
const { serialize, deserialize } = require('v8');
15+
16+
/**
17+
* Reads an object from a shared memory buffer
18+
* @param {SharedArrayBuffer} buffer - The shared memory buffer containing serialized data
19+
* @param {number} [byteOffset=0] - Byte offset where the data begins
20+
* @returns {any} - The deserialized object
21+
*/
22+
function read(buffer, byteOffset = 0) {
23+
const view = new DataView(buffer, byteOffset);
24+
const length = view.getUint32(0, true);
25+
const object = deserialize(new Uint8Array(buffer, byteOffset + 4, length));
26+
return object;
27+
}
28+
29+
/**
30+
* Writes an object to a shared memory buffer
31+
* @param {SharedArrayBuffer} buffer - The shared memory buffer to write to
32+
* @param {any} object - The object to serialize and write
33+
* @param {number} [byteOffset=0] - Byte offset where to write the data
34+
* @throws {Error} If the buffer is too small and not growable
35+
*/
36+
function write(buffer, object, byteOffset = 0) {
37+
const data = serialize(object);
38+
39+
if (buffer.byteLength < data.byteLength + 4 + byteOffset) {
40+
// Check if buffer is growable (has grow method from ShareArrayBuffer.prototype)
41+
if (typeof buffer.grow !== 'function') {
42+
throw new ERR_INVALID_BUFFER_SIZE('Buffer is too small and not growable');
43+
}
44+
buffer.grow(data.byteLength + 4 + byteOffset);
45+
}
46+
47+
const view = new DataView(buffer, byteOffset);
48+
view.setUint32(0, data.byteLength, true);
49+
new Uint8Array(buffer, byteOffset + 4).set(data);
50+
}
51+
52+
module.exports = {
53+
read,
54+
write,
55+
};

lib/worker_threads.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ const {
2929
isMarkedAsUntransferable,
3030
} = require('internal/buffer');
3131

32+
const {
33+
makeSync,
34+
wire,
35+
} = require('internal/worker/everysync/index');
36+
3237
module.exports = {
3338
isInternalThread,
3439
isMainThread,
@@ -49,4 +54,6 @@ module.exports = {
4954
BroadcastChannel,
5055
setEnvironmentData,
5156
getEnvironmentData,
57+
makeSync,
58+
wire,
5259
};

test/fixtures/everysync/echo.mjs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import { workerData, wire } from 'node:worker_threads';
2+
3+
wire(workerData.data, {
4+
async echo(arg) {
5+
return arg;
6+
},
7+
});
8+
9+
// Keep the event loop alive
10+
setInterval(() => {}, 100000);
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import { workerData, wire } from 'node:worker_threads';
2+
3+
wire(workerData.data, {
4+
fail(arg) {
5+
return new Promise((resolve, reject) => {
6+
// nothing to do here, we will fail
7+
});
8+
},
9+
});

0 commit comments

Comments
 (0)