Skip to content

Commit

Permalink
feat: 🎸 implement sync messenger
Browse files Browse the repository at this point in the history
  • Loading branch information
streamich committed Jun 18, 2023
1 parent f874849 commit d221870
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 2 deletions.
37 changes: 37 additions & 0 deletions demo/fsa/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

import type * as fsa from '../../src/fsa/types';
import {fsaToNode} from '../../src/fsa-to-node';
import {SyncMessenger} from '../../src/fsa-to-node/worker/SyncMessenger';
import {encode, decode} from 'json-joy/es6/json-pack/msgpack/util';

const demo = async (dir: fsa.IFileSystemDirectoryHandle) => {
const fs = fsaToNode(dir);
Expand All @@ -17,6 +19,41 @@ const demo = async (dir: fsa.IFileSystemDirectoryHandle) => {
stream.write('abc');
stream.write('def');
stream.end('ghi');

const worker = new Worker('https://localhost:9876/worker.js');
worker.onerror = (e) => {
console.log("error", e);
};

let sab: SharedArrayBuffer | undefined = undefined;
let channel: SyncMessenger | undefined = undefined;

worker.onmessage = (e) => {
const data = e.data;
if (data && typeof data === 'object') {
console.log('<', data);
switch (data.type) {
case 'init': {
sab = data.sab;
channel = new SyncMessenger(sab!);
worker.postMessage({type: 'set-root', dir, id: 0});
break;
}
case 'root-set': {
console.log('READY');
const request = encode({type: 'readdir', path: ''});
console.log('call sync', request);
const response = channel!.callSync(request);
const responseDecoded = decode(response as any);
console.log('response', responseDecoded);
break;
}
}
}
};



};

const main = async () => {
Expand Down
14 changes: 12 additions & 2 deletions demo/fsa/webpack.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ const root = require('app-root-path');
module.exports = {
mode: 'development',
devtool: 'inline-source-map',
entry: __dirname + '/main',
entry: {
bundle: __dirname + '/main',
worker: __dirname + '/worker',
},
plugins: [
// new ForkTsCheckerWebpackPlugin(),
new HtmlWebpackPlugin({
Expand Down Expand Up @@ -36,10 +39,17 @@ module.exports = {
},
},
output: {
filename: 'bundle.js',
filename: '[name].js',
path: path.resolve(root.path, 'dist'),
},
devServer: {
// HTTPS is required for SharedArrayBuffer to work.
https: true,
headers: {
// These two headers are required for SharedArrayBuffer to work.
"Cross-Origin-Opener-Policy": "same-origin",
"Cross-Origin-Embedder-Policy": "require-corp",
},
port: 9876,
hot: false,
},
Expand Down
30 changes: 30 additions & 0 deletions demo/fsa/worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import {SyncMessenger} from "../../src/fsa-to-node/worker/SyncMessenger";
import {encode, decode} from 'json-joy/es6/json-pack/msgpack/util';
import {IFileSystemDirectoryHandle} from "../../src/fsa/types";

const sab: SharedArrayBuffer = new SharedArrayBuffer(1024 * 32);
const messenger = new SyncMessenger(sab);

onmessage = (e) => {
const data = e.data;
console.log('>', data);
if (data && typeof data === 'object') {
switch (data.type) {
case 'set-root': {
postMessage({type: 'root-set', id: data.id});
const dir = data.dir as IFileSystemDirectoryHandle;
messenger.serveAsync(async (request) => {
const message = decode(request as any);
const list: string[] = [];
for await (const key of dir.keys()) {
list.push(key);
}
return encode(list);
});
break;
}
}
}
};

postMessage({type: 'init', sab});
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
},
"dependencies": {
"fs-monkey": "^1.0.4",
"json-joy": "^9.2.0",
"thingies": "^1.11.1"
},
"devDependencies": {
Expand Down
65 changes: 65 additions & 0 deletions src/fsa-to-node/worker/SyncMessenger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
type AsyncCallback = (request: Uint8Array) => Promise<Uint8Array>;

const microSleepSync = () => {
/** @todo Replace this by synchronous XHR call. */
Math.random();
};

const sleepUntilSync = (condition: () => boolean) => {
while (!condition()) microSleepSync();
};

/**
* `SyncMessenger` allows to execute asynchronous code synchronously. The
* asynchronous code is executed in a Worker, while the main thread is blocked
* until the asynchronous code is finished.
*
* First, four 4-byte works is header, where the first word is used for Atomics
* notifications. The second word is used for spin-locking the main thread until
* the asynchronous code is finished. The third word is used to specify payload
* length. The fourth word is currently unused.
*
* The maximum payload size is the size of the SharedArrayBuffer minus the
* header size.
*/
export class SyncMessenger {
protected readonly int32: Int32Array;
protected readonly uint8: Uint8Array;
protected readonly headerSize;
protected readonly dataSize;

public constructor(protected readonly sab: SharedArrayBuffer) {
this.int32 = new Int32Array(sab);
this.uint8 = new Uint8Array(sab);
this.headerSize = 4 * 4;
this.dataSize = sab.byteLength - this.headerSize;
}

public callSync(data: Uint8Array): Uint8Array {
const requestLength = data.length;
this.int32[1] = 0;
this.int32[2] = requestLength;
this.uint8.set(data, this.headerSize);
Atomics.notify(this.int32, 0);
sleepUntilSync(() => this.int32[1] === 1);
const responseLength = this.int32[2];
const response = this.uint8.slice(this.headerSize, this.headerSize + responseLength);
return response;
}

public serveAsync(callback: AsyncCallback): void {
(async () => {
try {
const res = Atomics.wait(this.int32, 0, 0);
if (res !== 'ok') throw new Error(`Unexpected Atomics.wait result: ${res}`);
const requestLength = this.int32[2];
const request = this.uint8.slice(this.headerSize, this.headerSize + requestLength);
const response = await callback(request);
const responseLength = response.length;
this.int32[2] = responseLength;
this.uint8.set(response, this.headerSize);
this.int32[1] = 1;
} catch {}
})().catch(() => {});
}
}
18 changes: 18 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1666,6 +1666,11 @@ arg@^4.1.0:
resolved "https://registry.yarnpkg.com/arg/-/arg-4.1.3.tgz#269fc7ad5b8e42cb63c896d5666017261c144089"
integrity sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA==

arg@^5.0.2:
version "5.0.2"
resolved "https://registry.yarnpkg.com/arg/-/arg-5.0.2.tgz#c81433cc427c92c4dcf4865142dbca6f15acd59c"
integrity sha512-PYjyFOLKQ9y57JvQ6QLo8dAgNqswh8M1RMJYdQduT6xbWSgK36P/Z/v+p888pM69jMMfS8Xd8F6I1kQ/I9HUGg==

argparse@^1.0.7:
version "1.0.10"
resolved "https://registry.yarnpkg.com/argparse/-/argparse-1.0.10.tgz#bcd6791ea5ae09725e17e5ad988134cd40b3d911"
Expand Down Expand Up @@ -3488,6 +3493,11 @@ husky@^8.0.1:
resolved "https://registry.yarnpkg.com/husky/-/husky-8.0.3.tgz#4936d7212e46d1dea28fef29bb3a108872cd9184"
integrity sha512-+dQSyqPh4x1hlO1swXBiNb2HzTDN1I2IGLQx1GrBuiqFJfoMrnZWwVmatvSiO+Iz8fBUnf+lekwNo4c2LlXItg==

hyperdyperid@^1.2.0:
version "1.2.0"
resolved "https://registry.yarnpkg.com/hyperdyperid/-/hyperdyperid-1.2.0.tgz#59668d323ada92228d2a869d3e474d5a33b69e6b"
integrity sha512-Y93lCzHYgGWdrJ66yIktxiaGULYc6oGiABxhcO5AufBeOyoIdZF7bIfLaOrbM0iGIOXQQgxxRrFEnb+Y6w1n4A==

iconv-lite@0.4.24:
version "0.4.24"
resolved "https://registry.yarnpkg.com/iconv-lite/-/iconv-lite-0.4.24.tgz#2022b4b25fbddc21d2f524974a474aafe733908b"
Expand Down Expand Up @@ -4270,6 +4280,14 @@ jsesc@^2.5.1:
resolved "https://registry.yarnpkg.com/jsesc/-/jsesc-2.5.2.tgz#80564d2e483dacf6e8ef209650a67df3f0c283a4"
integrity sha512-OYu7XEzjkCQ3C5Ps3QIZsQfNpqoJyZZA99wd9aWd05NCtC5pWOkShK2mkL6HXQR6/Cy2lbNdPlZBpuQHXE63gA==

json-joy@^9.2.0:
version "9.2.0"
resolved "https://registry.yarnpkg.com/json-joy/-/json-joy-9.2.0.tgz#6637140f0518ea73fe7829d20b4f224c50b364a5"
integrity sha512-SofrNJCCRuwOH2OnjUVNGVDswmWJbrlh0AnrWRUyG52FQBi+Tu1PmXV2m1j4S4yYuFbWO+2SvpxQ+6z+J5AM7A==
dependencies:
arg "^5.0.2"
hyperdyperid "^1.2.0"

json-parse-better-errors@^1.0.1:
version "1.0.2"
resolved "https://registry.yarnpkg.com/json-parse-better-errors/-/json-parse-better-errors-1.0.2.tgz#bb867cfb3450e69107c131d1c514bab3dc8bcaa9"
Expand Down

0 comments on commit d221870

Please sign in to comment.