Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support telemetry batching and move WebSocket handling to worker #7391

Merged
merged 26 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
69b2f05
Support subscription batching from API, Tables, and Plots
akhenry Jan 9, 2024
a87ffee
Added batching worker
akhenry Jan 11, 2024
2f2af0b
Added configurable batch size and throttling rate
akhenry Jan 13, 2024
0061d16
Support batch size based throttling
akhenry Jan 16, 2024
74c1cdf
Default to latest strategy
akhenry Jan 16, 2024
e530fd8
Merge branch 'master' into subscriptions-support-array-callbacks
akhenry Jan 17, 2024
c28ced5
Don't hide original error
akhenry Jan 18, 2024
947810b
Added copyright statement
akhenry Jan 19, 2024
f079c3a
Renamed BatchingWebSocketProvider to BatchingWebSocket
akhenry Jan 19, 2024
4c86b66
Merge branch 'master' into subscriptions-support-array-callbacks
akhenry Jan 19, 2024
61edd0f
Adding docs
akhenry Jan 19, 2024
2e3dc4d
renamed class. changed throttling strategy to be driven by the main t…
akhenry Jan 21, 2024
0bdd5ef
Renamed classes
akhenry Jan 21, 2024
ad02ba7
Added more documentation
akhenry Jan 22, 2024
c4a3ace
Fixed broken tests
akhenry Jan 23, 2024
e135498
Addressed review comments
akhenry Jan 24, 2024
34b36d5
Merge branch 'master' into subscriptions-support-array-callbacks
akhenry Jan 24, 2024
12b921a
Clean up and reconnect on websocket close
akhenry Jan 24, 2024
a6c4a45
Merge branch 'master' into subscriptions-support-array-callbacks
akhenry Jan 26, 2024
6a04597
Merge branch 'master' into subscriptions-support-array-callbacks
akhenry Jan 26, 2024
3f44613
Better management of subscription strategies
akhenry Jan 28, 2024
c4f83c1
Add tests to catch edge cases where two subscribers request different…
akhenry Jan 28, 2024
2cfd729
Ensure callbacks are invoked with telemetry in the requested format
akhenry Jan 29, 2024
a6bb0da
Merge branch 'master' into subscriptions-support-array-callbacks
akhenry Jan 29, 2024
b93647b
Remove console out. Oops
akhenry Jan 29, 2024
cdf549b
Fix linting errors
akhenry Jan 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added batching worker
  • Loading branch information
akhenry committed Jan 11, 2024
commit a87ffee2649f4d72419417e7266b0996345d9fa1
60 changes: 60 additions & 0 deletions src/api/telemetry/BatchingWebSocketProvider.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import installWorker from './WebSocketWorker.js';

class BatchingWebSocketProvider extends EventTarget {
#worker;

constructor() {
super();
// Install worker, register listeners etc.
const workerFunction = `(${installWorker.toString()})()`;
const workerBlob = new Blob([workerFunction]);
const workerUrl = URL.createObjectURL(workerBlob, { type: 'application/javascript' });
this.#worker = new Worker(workerUrl);

this.routeMessageToHandler = this.routeMessageToHandler.bind(this);
this.#worker.addEventListener('message', this.routeMessageToHandler);
}

connect(url) {
this.#worker.postMessage({
type: 'connect',
url
});
}

disconnect() {
this.#worker.postMessage({ type: 'disconnect' });
}

sendMessage(message) {
this.#worker.postMessage({
type: 'message',
message
});
}

setBatchingStrategy(strategy) {
const serializedStrategy = {
shouldBatchMessage: strategy.shouldBatchMessage.toString(),
getBatchIdFromMessage: strategy.getBatchIdFromMessage.toString()
};

this.#worker.postMessage({
type: 'setBatchingStrategy',
serializedStrategy
});
}

routeMessageToHandler(message) {
// Batch message would need to be handle differently here
if (message.data.type === 'batch') {
this.dispatchEvent(new MessageEvent('batch', { data: message.data }));
} else if (message.data.type === 'message') {
this.dispatchEvent(new MessageEvent('message', { data: message.data }));
} else {
throw new Error(`Unknown message type: ${message.data.type}`);
}
}
}

export default BatchingWebSocketProvider;
2 changes: 2 additions & 0 deletions src/api/telemetry/TelemetryAPI.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import objectUtils from 'objectUtils';

import CustomStringFormatter from '../../plugins/displayLayout/CustomStringFormatter.js';
import BatchingWebSocketProvider from './BatchingWebSocketProvider.js';
import DefaultMetadataProvider from './DefaultMetadataProvider.js';
import TelemetryCollection from './TelemetryCollection.js';
import TelemetryMetadataManager from './TelemetryMetadataManager.js';
Expand Down Expand Up @@ -95,6 +96,7 @@ export default class TelemetryAPI {
this.valueFormatterCache = new WeakMap();
this.requestInterceptorRegistry = new TelemetryRequestInterceptorRegistry();
this.#isGreedyLAD = true;
this.BatchingWebSocketProvider = BatchingWebSocketProvider;
}

abortAllRequests() {
Expand Down
295 changes: 295 additions & 0 deletions src/api/telemetry/WebSocketWorker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,295 @@
/* eslint-disable max-classes-per-file */
export default function installWorker() {
const FALLBACK_AND_WAIT_MS = [1000, 5000, 5000, 10000, 10000, 30000];

class ResilientWebSocket extends EventTarget {
#webSocket;
#isConnected = false;
#isConnecting = false;
#messageQueue = [];
#reconnectTimeoutHandle;
#currentWaitIndex = 0;
#messageCallbacks = [];
#wsUrl;

connect(url) {
this.#wsUrl = url;
if (this.#isConnected) {
throw new Error('WebSocket already connected');
}

if (this.#isConnecting) {
throw new Error('WebSocket connection in progress');
}

this.#isConnecting = true;

this.#webSocket = new WebSocket(url);

const boundConnected = this.#connected.bind(this);
this.#webSocket.addEventListener('open', boundConnected);

const boundCleanUpAndReconnect = this.#cleanUpAndReconnect.bind(this);
this.#webSocket.addEventListener('error', boundCleanUpAndReconnect);

const boundDisconnect = this.disconnect.bind(this);
this.#webSocket.addEventListener('close', boundCleanUpAndReconnect);

const boundMessage = this.#message.bind(this);
this.#webSocket.addEventListener('message', boundMessage);

this.addEventListener(
'disconnected',
() => {
this.#webSocket.removeEventListener('open', boundConnected);
this.#webSocket.removeEventListener('error', boundCleanUpAndReconnect);
this.#webSocket.removeEventListener('close', boundDisconnect);
},
{ once: true }
);
}

//Do not use Event dispatching for this. Unnecessary overhead.
registerMessageCallback(callback) {
this.#messageCallbacks.push(callback);

return () => {
this.#messageCallbacks = this.#messageCallbacks.filter((cb) => cb !== callback);
};
}

#connected() {
console.debug('Websocket connected.');
this.#isConnected = true;
this.#isConnecting = false;
this.#currentWaitIndex = 0;

this.dispatchEvent(new Event('connected'));

this.#flushQueue();
}

#cleanUpAndReconnect() {
console.warn('Websocket closed. Attempting to reconnect...');
this.disconnect();
this.#reconnect();
}

#message(event) {
this.#messageCallbacks.forEach((callback) => callback(event.data));
}
disconnect() {
this.#isConnected = false;
this.#isConnecting = false;

if (this.#webSocket.readyState === WebSocket.OPEN) {
this.#webSocket.close();
}

this.dispatchEvent(new Event('disconnected'));
this.#webSocket = undefined;
}

#reconnect() {
if (this.#reconnectTimeoutHandle) {
return;
}

this.#reconnectTimeoutHandle = setTimeout(() => {
this.connect(this.#wsUrl);

this.#reconnectTimeoutHandle = undefined;
}, FALLBACK_AND_WAIT_MS[this.#currentWaitIndex]);

if (this.#currentWaitIndex < FALLBACK_AND_WAIT_MS.length - 1) {
this.#currentWaitIndex++;
}
}

enqueueMessage(message) {
this.#messageQueue.push(message);
this.#flushQueueIfReady();
}

#flushQueueIfReady() {
if (this.#isConnected) {
this.#flushQueue();
}
}

#flushQueue() {
while (this.#messageQueue.length > 0) {
if (!this.#isConnected) {
break;
}

const message = this.#messageQueue.shift();
this.#webSocket.send(message);
}
}
}

class WorkerToWebSocketMessageBroker {
#websocket;
#messageBatcher;

constructor(websocket, messageBatcher) {
this.#websocket = websocket;
this.#messageBatcher = messageBatcher;
}

routeMessageToHandler(message) {
const { type } = message.data;
switch (type) {
case 'connect':
this.connect(message);
break;
case 'disconnect':
this.disconnect(message);
break;
case 'message':
this.#websocket.enqueueMessage(message.data.message);
break;
case 'setBatchingStrategy':
this.setBatchingStrategy(message);
break;
case 'setRate':
this.setRate(message);
break;
default:
throw new Error(`Unknown message type: ${type}`);
}
}
connect(message) {
const { url } = message.data;
this.#websocket.connect(url);
}
disconnect() {
this.#websocket.disconnect();
}
message(message) {
const { subscribeMessage } = message.data;
this.#websocket.enqueueMessage(subscribeMessage);
}
setBatchingStrategy(message) {
const { serializedStrategy } = message.data;
const batchingStrategy = {
// eslint-disable-next-line no-new-func
shouldBatchMessage: new Function(serializedStrategy.shouldBatchMessage),
// eslint-disable-next-line no-new-func
getBatchIdFromMessage: new Function(serializedStrategy.getBatchIdFromMessage)
// Will also include maximum batch length here
};
this.#messageBatcher.setBatchingStrategy(batchingStrategy);
}
setRate(message) {
const { rate } = message.data;
this.#throttledTelemetryEmitter.setRate(rate);
}
}

class WebSocketToWorkerMessageBroker {
#websocket;
#worker;
#messageBatcher;

constructor(websocket, messageBatcher, worker) {
this.#websocket = websocket;
this.#messageBatcher = messageBatcher;
this.#worker = worker;
}

routeMessageToHandler(data) {
//Implement batching here
if (this.#messageBatcher.shouldBatchMessage(data)) {
this.#messageBatcher.addMessageToBatch(data);
} else {
this.#worker.postMessage(data);
}
}
}

class MessageBatcher {
#batch;
#batchingStrategy;

constructor() {
this.resetBatch();
}
resetBatch() {
this.#batch = {};
}
setBatchingStrategy(strategy) {
this.#batchingStrategy = strategy;
}
shouldBatchMessage(message) {
return (
this.#batchingStrategy.shouldBatchMessage &&
this.#batchingStrategy.shouldBatchMessage(message)
);
}
addMessageToBatch(message) {
const batchId = this.#batchingStrategy.getBatchIdFromMessage(message);

if (this.#batch[batchId] === undefined) {
this.#batch[batchId] = [message];
} else {
this.#batch[batchId].push(message);
}
}
nextBatch() {
const batch = this.#batch;
this.resetBatch();

return batch;
}
}

class ThrottledTelemetryEmitter {
#rate;
#messageBatcher;
#worker;
#intervalHandle;

constructor(messageBatcher, worker) {
this.#messageBatcher = messageBatcher;
this.#worker = worker;
}

setRate(rate) {
this.#rate = rate;
this.#stop();
this.#start();
}

#start() {
if (this.#intervalHandle) {
return;
}

this.#intervalHandle = setInterval(() => {
const batch = this.#messageBatcher.nextBatch();
this.#worker.postMessage(batch);
}, this.#rate);
}

#stop() {
if (this.#intervalHandle) {
clearInterval(this.#intervalHandle);
this.#intervalHandle = undefined;
}
}
}

const websocket = new ResilientWebSocket();
const messageBatcher = new MessageBatcher();
const workerBroker = new WorkerToWebSocketMessageBroker(websocket, messageBatcher);
const websocketBroker = new WebSocketToWorkerMessageBroker(websocket, messageBatcher, self);

self.addEventListener('message', (message) => {
workerBroker.routeMessageToHandler(message);
});
websocket.registerMessageCallback((data) => {
websocketBroker.routeMessageToHandler(data);
});
}