From d221870f475d49e251a8cb9bdd41b9b31d2036f4 Mon Sep 17 00:00:00 2001 From: streamich Date: Sun, 18 Jun 2023 16:31:13 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20=F0=9F=8E=B8=20implement=20sync=20messe?= =?UTF-8?q?nger?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- demo/fsa/main.ts | 37 ++++++++++++++ demo/fsa/webpack.config.js | 14 +++++- demo/fsa/worker.ts | 30 ++++++++++++ package.json | 1 + src/fsa-to-node/worker/SyncMessenger.ts | 65 +++++++++++++++++++++++++ yarn.lock | 18 +++++++ 6 files changed, 163 insertions(+), 2 deletions(-) create mode 100644 demo/fsa/worker.ts create mode 100644 src/fsa-to-node/worker/SyncMessenger.ts diff --git a/demo/fsa/main.ts b/demo/fsa/main.ts index 5c0c66be9..32452ec87 100644 --- a/demo/fsa/main.ts +++ b/demo/fsa/main.ts @@ -3,6 +3,8 @@ import type * as fsa from '../../src/fsa/types'; import {fsaToNode} from '../../src/fsa-to-node'; +import {SyncMessenger} from '../../src/fsa-to-node/worker/SyncMessenger'; +import {encode, decode} from 'json-joy/es6/json-pack/msgpack/util'; const demo = async (dir: fsa.IFileSystemDirectoryHandle) => { const fs = fsaToNode(dir); @@ -17,6 +19,41 @@ const demo = async (dir: fsa.IFileSystemDirectoryHandle) => { stream.write('abc'); stream.write('def'); stream.end('ghi'); + + const worker = new Worker('https://localhost:9876/worker.js'); + worker.onerror = (e) => { + console.log("error", e); + }; + + let sab: SharedArrayBuffer | undefined = undefined; + let channel: SyncMessenger | undefined = undefined; + + worker.onmessage = (e) => { + const data = e.data; + if (data && typeof data === 'object') { + console.log('<', data); + switch (data.type) { + case 'init': { + sab = data.sab; + channel = new SyncMessenger(sab!); + worker.postMessage({type: 'set-root', dir, id: 0}); + break; + } + case 'root-set': { + console.log('READY'); + const request = encode({type: 'readdir', path: ''}); + console.log('call sync', request); + const response = channel!.callSync(request); + const responseDecoded = decode(response as any); + console.log('response', responseDecoded); + break; + } + } + } + }; + + + }; const main = async () => { diff --git a/demo/fsa/webpack.config.js b/demo/fsa/webpack.config.js index dcad5f068..d366bcaec 100644 --- a/demo/fsa/webpack.config.js +++ b/demo/fsa/webpack.config.js @@ -5,7 +5,10 @@ const root = require('app-root-path'); module.exports = { mode: 'development', devtool: 'inline-source-map', - entry: __dirname + '/main', + entry: { + bundle: __dirname + '/main', + worker: __dirname + '/worker', + }, plugins: [ // new ForkTsCheckerWebpackPlugin(), new HtmlWebpackPlugin({ @@ -36,10 +39,17 @@ module.exports = { }, }, output: { - filename: 'bundle.js', + filename: '[name].js', path: path.resolve(root.path, 'dist'), }, devServer: { + // HTTPS is required for SharedArrayBuffer to work. + https: true, + headers: { + // These two headers are required for SharedArrayBuffer to work. + "Cross-Origin-Opener-Policy": "same-origin", + "Cross-Origin-Embedder-Policy": "require-corp", + }, port: 9876, hot: false, }, diff --git a/demo/fsa/worker.ts b/demo/fsa/worker.ts new file mode 100644 index 000000000..21d009606 --- /dev/null +++ b/demo/fsa/worker.ts @@ -0,0 +1,30 @@ +import {SyncMessenger} from "../../src/fsa-to-node/worker/SyncMessenger"; +import {encode, decode} from 'json-joy/es6/json-pack/msgpack/util'; +import {IFileSystemDirectoryHandle} from "../../src/fsa/types"; + +const sab: SharedArrayBuffer = new SharedArrayBuffer(1024 * 32); +const messenger = new SyncMessenger(sab); + +onmessage = (e) => { + const data = e.data; + console.log('>', data); + if (data && typeof data === 'object') { + switch (data.type) { + case 'set-root': { + postMessage({type: 'root-set', id: data.id}); + const dir = data.dir as IFileSystemDirectoryHandle; + messenger.serveAsync(async (request) => { + const message = decode(request as any); + const list: string[] = []; + for await (const key of dir.keys()) { + list.push(key); + } + return encode(list); + }); + break; + } + } + } +}; + +postMessage({type: 'init', sab}); diff --git a/package.json b/package.json index d70b20b11..723e40cde 100644 --- a/package.json +++ b/package.json @@ -78,6 +78,7 @@ }, "dependencies": { "fs-monkey": "^1.0.4", + "json-joy": "^9.2.0", "thingies": "^1.11.1" }, "devDependencies": { diff --git a/src/fsa-to-node/worker/SyncMessenger.ts b/src/fsa-to-node/worker/SyncMessenger.ts new file mode 100644 index 000000000..c94f4c834 --- /dev/null +++ b/src/fsa-to-node/worker/SyncMessenger.ts @@ -0,0 +1,65 @@ +type AsyncCallback = (request: Uint8Array) => Promise; + +const microSleepSync = () => { + /** @todo Replace this by synchronous XHR call. */ + Math.random(); +}; + +const sleepUntilSync = (condition: () => boolean) => { + while (!condition()) microSleepSync(); +}; + +/** + * `SyncMessenger` allows to execute asynchronous code synchronously. The + * asynchronous code is executed in a Worker, while the main thread is blocked + * until the asynchronous code is finished. + * + * First, four 4-byte works is header, where the first word is used for Atomics + * notifications. The second word is used for spin-locking the main thread until + * the asynchronous code is finished. The third word is used to specify payload + * length. The fourth word is currently unused. + * + * The maximum payload size is the size of the SharedArrayBuffer minus the + * header size. + */ +export class SyncMessenger { + protected readonly int32: Int32Array; + protected readonly uint8: Uint8Array; + protected readonly headerSize; + protected readonly dataSize; + + public constructor(protected readonly sab: SharedArrayBuffer) { + this.int32 = new Int32Array(sab); + this.uint8 = new Uint8Array(sab); + this.headerSize = 4 * 4; + this.dataSize = sab.byteLength - this.headerSize; + } + + public callSync(data: Uint8Array): Uint8Array { + const requestLength = data.length; + this.int32[1] = 0; + this.int32[2] = requestLength; + this.uint8.set(data, this.headerSize); + Atomics.notify(this.int32, 0); + sleepUntilSync(() => this.int32[1] === 1); + const responseLength = this.int32[2]; + const response = this.uint8.slice(this.headerSize, this.headerSize + responseLength); + return response; + } + + public serveAsync(callback: AsyncCallback): void { + (async () => { + try { + const res = Atomics.wait(this.int32, 0, 0); + if (res !== 'ok') throw new Error(`Unexpected Atomics.wait result: ${res}`); + const requestLength = this.int32[2]; + const request = this.uint8.slice(this.headerSize, this.headerSize + requestLength); + const response = await callback(request); + const responseLength = response.length; + this.int32[2] = responseLength; + this.uint8.set(response, this.headerSize); + this.int32[1] = 1; + } catch {} + })().catch(() => {}); + } +} diff --git a/yarn.lock b/yarn.lock index 5057de49e..be0c494c9 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1666,6 +1666,11 @@ arg@^4.1.0: resolved "https://registry.yarnpkg.com/arg/-/arg-4.1.3.tgz#269fc7ad5b8e42cb63c896d5666017261c144089" integrity sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA== +arg@^5.0.2: + version "5.0.2" + resolved "https://registry.yarnpkg.com/arg/-/arg-5.0.2.tgz#c81433cc427c92c4dcf4865142dbca6f15acd59c" + integrity sha512-PYjyFOLKQ9y57JvQ6QLo8dAgNqswh8M1RMJYdQduT6xbWSgK36P/Z/v+p888pM69jMMfS8Xd8F6I1kQ/I9HUGg== + argparse@^1.0.7: version "1.0.10" resolved "https://registry.yarnpkg.com/argparse/-/argparse-1.0.10.tgz#bcd6791ea5ae09725e17e5ad988134cd40b3d911" @@ -3488,6 +3493,11 @@ husky@^8.0.1: resolved "https://registry.yarnpkg.com/husky/-/husky-8.0.3.tgz#4936d7212e46d1dea28fef29bb3a108872cd9184" integrity sha512-+dQSyqPh4x1hlO1swXBiNb2HzTDN1I2IGLQx1GrBuiqFJfoMrnZWwVmatvSiO+Iz8fBUnf+lekwNo4c2LlXItg== +hyperdyperid@^1.2.0: + version "1.2.0" + resolved "https://registry.yarnpkg.com/hyperdyperid/-/hyperdyperid-1.2.0.tgz#59668d323ada92228d2a869d3e474d5a33b69e6b" + integrity sha512-Y93lCzHYgGWdrJ66yIktxiaGULYc6oGiABxhcO5AufBeOyoIdZF7bIfLaOrbM0iGIOXQQgxxRrFEnb+Y6w1n4A== + iconv-lite@0.4.24: version "0.4.24" resolved "https://registry.yarnpkg.com/iconv-lite/-/iconv-lite-0.4.24.tgz#2022b4b25fbddc21d2f524974a474aafe733908b" @@ -4270,6 +4280,14 @@ jsesc@^2.5.1: resolved "https://registry.yarnpkg.com/jsesc/-/jsesc-2.5.2.tgz#80564d2e483dacf6e8ef209650a67df3f0c283a4" integrity sha512-OYu7XEzjkCQ3C5Ps3QIZsQfNpqoJyZZA99wd9aWd05NCtC5pWOkShK2mkL6HXQR6/Cy2lbNdPlZBpuQHXE63gA== +json-joy@^9.2.0: + version "9.2.0" + resolved "https://registry.yarnpkg.com/json-joy/-/json-joy-9.2.0.tgz#6637140f0518ea73fe7829d20b4f224c50b364a5" + integrity sha512-SofrNJCCRuwOH2OnjUVNGVDswmWJbrlh0AnrWRUyG52FQBi+Tu1PmXV2m1j4S4yYuFbWO+2SvpxQ+6z+J5AM7A== + dependencies: + arg "^5.0.2" + hyperdyperid "^1.2.0" + json-parse-better-errors@^1.0.1: version "1.0.2" resolved "https://registry.yarnpkg.com/json-parse-better-errors/-/json-parse-better-errors-1.0.2.tgz#bb867cfb3450e69107c131d1c514bab3dc8bcaa9"